kglobe/kitty.py

300 lines
9.2 KiB
Python

#!/usr/bin/env python3
from base64 import standard_b64encode
from enum import Enum
import array
import asyncio
import fcntl
import mmap
import os
import re
import select
import sys
import termios
import tty
import zlib
from terminalio import TerminalIO
SHM_NAME = "/kitty-shm-frame"
class Kitty_Format(Enum):
RGB = 24
RGBA = 32
PNG = 100
class TerminalProtocolError(Exception):
pass
class Terminal():
def __init__(self):
self.terminal_io = TerminalIO()
self.terminal_io.start()
self.supports_kitty_graphics = supports_kitty_graphics()
def get_terminal_size(self) -> tuple[int, int]:
"""Get (rows, cols) of the terminal"""
buf = array.array("H", [0, 0, 0, 0])
fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf)
rows, cols, width, height = buf
return (rows, cols)
async def get_terminal_cell_size(self) -> tuple[int, int]:
"""Get (height, width) of a single cell in px"""
reply = await self._query_terminal("\x1b[16t", "t")
match = re.search(r"\[6;(\d+);(\d+)t", reply)
if match:
v_pix, h_pix = map(int, match.groups())
return v_pix, h_pix
print(reply)
raise ValueError("Failed to parse terminal cell size response")
async def get_terminal_size_pixel(self) -> tuple[int, int]:
"""Get (height, width) of the terminal in px"""
reply = await self._query_terminal("\x1b[14t", "t")
match = re.search(r"\[4;(\d+);(\d+)t", reply)
if match:
height, width = map(int, match.groups())
return height, width
raise ValueError("Failed to parse terminal pixel size response")
def hide_cursor(self) -> None:
"""Tell the terminal to hide the cursor."""
_write_stdout("\x1b[?25l")
def show_cursor(self) -> None:
"""Tell the terminal to show the cursor."""
_write_stdout("\x1b[?25h")
def set_position(self, y: int, x: int) -> None:
"""Set the cursor position to y, x"""
_write_stdout(f"\x1b[{y};{x}H")
async def get_position(self) -> tuple[int, int]:
"""Get the (y, x) position of the cursor"""
reply = await self._query_terminal("\x1b[6n", "R")
match = re.search(r"\[(\d+);(\d+)R", reply)
if match:
y, x = map(int, match.groups())
return y, x
raise ValueError("Failed to parse cursor position response")
async def draw(
self,
buffer,
width: int | None,
height: int | None,
pixel_format: Kitty_Format = Kitty_Format.RGBA,
image_num: int = 1,
compress=False,
use_shm: bool = False,
) -> None:
"""Display an image in the terminal."""
if (pixel_format != Kitty_Format.PNG or use_shm) and (width is None or height is None):
raise ValueError(
"shm transfer or using image formats other than PNG and require height and width"
)
kwargs = {"a": "T", "i": image_num, "f": pixel_format.value, "t": "s" if use_shm else "d"}
if width:
kwargs["s"] = width
if height:
kwargs["v"] = height
if compress:
kwargs["o"] = "z"
data = zlib.compress(buffer)
else:
data = buffer
if use_shm:
# write to shm
_mmap(data, width, height)
# set shm name as payload
data = SHM_NAME
await _write_chunked(**kwargs, data=data, wait_for_ok=True if use_shm else False)
await asyncio.sleep(0.001)
async def _query_terminal(self, escape: str, endchar: str) -> str:
"""
Send `escape` to the terminal, read the response until
`endchar`, return response (including `endchar`)
"""
loop = asyncio.get_running_loop()
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
def read_terminal_response() -> str:
tty.setraw(fd)
_write_stdout(escape)
response = ""
while True:
char = sys.stdin.read(1)
response += char
if char == endchar:
break
return response
try:
response = await loop.run_in_executor(None, read_terminal_response)
finally:
# Restore the terminal settings
termios.tcsetattr(fd, termios.TCSANOW, old_settings)
return response
async def _write_chunked(self, wait_for_ok: bool = True, **cmd) -> None:
image_id = cmd['i']
if cmd["t"] == "s":
data = standard_b64encode(bytes(cmd.pop("data"), "utf-8"))
else:
data = standard_b64encode(cmd.pop("data"))
while data:
chunk, data = data[:4096], data[4096:]
m = 1 if data else 0
payload = _serialize_gr_command(payload=chunk, m=m, **cmd)
sys.stdout.buffer.write(payload)
sys.stdout.flush()
cmd.clear()
if wait_for_ok:
# Wait for terminal to confirm OK response
await self.terminal_io.wait_for_ok()
def _mmap(np_image, width: int, height: int) -> None:
"""Write image data to shared memory"""
shm_size = width * height * 4 # RGBA
fd = os.open(f"/dev/shm{SHM_NAME}", os.O_CREAT | os.O_RDWR)
os.ftruncate(fd, shm_size)
shm = mmap.mmap(fd, shm_size, mmap.MAP_SHARED, mmap.PROT_WRITE)
# write to shared memory
shm.seek(0)
shm.write(np_image.tobytes())
shm.flush()
def supports_kitty_graphics(timeout=0.1) -> bool:
"""Check if the terminal has support for the graphics protocol."""
if not sys.stdout.isatty():
return False
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
# Set terminal to raw mode
tty.setraw(fd)
# Send Kitty graphics query escape code
sys.stdout.write("\033_Gs=1,v=1,a=q,t=d,f=24;AAAA\033\\")
sys.stdout.flush()
# Wait for response with timeout
rlist, _, _ = select.select([fd], [], [], timeout)
if not rlist:
return False
# Read response
response = os.read(fd, 1024).decode("utf-8", "ignore")
return response.startswith("\033_GOK")
finally:
# Restore terminal settings
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
def _serialize_gr_command(**cmd) -> bytes:
payload = cmd.pop("payload", None)
cmd = ",".join(f"{k}={v}" for k, v in cmd.items())
ans = []
w = ans.append
w(b"\033_G"), w(cmd.encode("ascii"))
if payload:
w(b";")
w(payload)
w(b"\033\\")
return b"".join(ans)
#async def _write_chunked(**cmd) -> None:
# if cmd["t"] == "s":
# data = standard_b64encode(bytes(cmd.pop("data"), "utf-8"))
# else:
# data = standard_b64encode(cmd.pop("data"))
# while data:
# chunk, data = data[:4096], data[4096:]
# m = 1 if data else 0
# payload = _serialize_gr_command(payload=chunk, m=m, **cmd)
# sys.stdout.buffer.write(payload)
# sys.stdout.flush()
# cmd.clear()
async def _write_chunked(wait_for_ok: bool = True, **cmd) -> None:
image_id = cmd['i']
if cmd["t"] == "s":
data = standard_b64encode(bytes(cmd.pop("data"), "utf-8"))
else:
data = standard_b64encode(cmd.pop("data"))
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
tty.setraw(fd)
loop = asyncio.get_running_loop()
try:
while data:
chunk, data = data[:4096], data[4096:]
m = 1 if data else 0
payload = _serialize_gr_command(payload=chunk, m=m, **cmd)
sys.stdout.buffer.write(payload)
sys.stdout.flush()
cmd.clear()
if wait_for_ok:
# Wait for terminal to confirm OK response
response = os.read(fd, 1024).decode("utf-8", "ignore")
if not response.startswith(f"\033_Gi={image_id};OK"):
return
#raise RuntimeError(f"Unexpected response: {repr(response)}")
#return response.startswith("\033_GOK")
# def _read_ok():
# response = ""
# while not response.endswith("\033\\"):
# response += sys.stdin.read(1)
# return response
# response = await loop.run_in_executor(None, _read_ok)
# if not response.startswith(f"\033_Gi={image_id};OK"):
# raise RuntimeError(f"Unexpected response: {repr(response)}")
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
def _write_stdout(cmd: str) -> None:
"""Write a command string to stdout and flush."""
sys.stdout.write(cmd)
sys.stdout.flush()
async def main():
term = Terminal()
rows, cols = term.get_terminal_size()
v_pix, h_pix = await term.get_terminal_cell_size()
height, width = await term.get_terminal_size_pixel()
y, x = await term.get_position()
print(f"Terminal has {rows} rows, {cols} cols = {rows * cols} cells")
print(f"Cell size: {h_pix}x{v_pix} px")
print(f"Dimensions: {width}x{height} px")
print("Cursor is at y, x:", y, x)
print(f"Supports graphics: {term.supports_kitty_graphics}")
if __name__ == "__main__":
asyncio.run(main())