#!/usr/bin/env python3
"""Terminal helpers: size and cursor queries plus kitty graphics output."""
from base64 import standard_b64encode
from enum import Enum
import array
import asyncio
import fcntl
import mmap
import os
import re
import select
import sys
import termios
import tty
import zlib

from terminalio import TerminalIO

# Name of the shared-memory object used for "t=s" transfers; the backing
# file is created at /dev/shm + SHM_NAME.
SHM_NAME = "/kitty-shm-frame"

# Number of base64 bytes written per graphics escape sequence.
_CHUNK_SIZE = 4096

# Image id attached to the support query so the reply can be recognised.
_QUERY_IMAGE_ID = 31


class Kitty_Format(Enum):
    """Pixel formats; the value is sent as the ``f`` key of a graphics command."""

    RGB = 24
    RGBA = 32
    PNG = 100


class TerminalProtocolError(Exception):
    """Error type for terminal protocol failures."""


class Terminal:
    """Query the controlling terminal and draw images via graphics commands."""

    def __init__(self):
        """Start the TerminalIO helper and probe for graphics support."""
        self.terminal_io = TerminalIO()
        self.terminal_io.start()
        self.supports_kitty_graphics = supports_kitty_graphics()

    def get_terminal_size(self) -> tuple[int, int]:
        """Get (rows, cols) of the terminal"""
        # TIOCGWINSZ fills four unsigned shorts: rows, cols, width, height.
        buf = array.array("H", [0, 0, 0, 0])
        fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf)
        return (buf[0], buf[1])

    async def get_terminal_cell_size(self) -> tuple[int, int]:
        """Get (height, width) of a single cell in px

        Raises:
            ValueError: if the terminal reply cannot be parsed.
        """
        reply = await self._query_terminal("\x1b[16t", "t")
        match = re.search(r"\[6;(\d+);(\d+)t", reply)
        if match:
            v_pix, h_pix = map(int, match.groups())
            return v_pix, h_pix
        # The unparsed reply goes into the exception instead of being
        # printed to stdout, where it would corrupt the terminal output.
        raise ValueError(
            f"Failed to parse terminal cell size response: {reply!r}"
        )

    async def get_terminal_size_pixel(self) -> tuple[int, int]:
        """Get (height, width) of the terminal in px

        Raises:
            ValueError: if the terminal reply cannot be parsed.
        """
        reply = await self._query_terminal("\x1b[14t", "t")
        match = re.search(r"\[4;(\d+);(\d+)t", reply)
        if match:
            height, width = map(int, match.groups())
            return height, width
        raise ValueError("Failed to parse terminal pixel size response")

    def hide_cursor(self) -> None:
        """Tell the terminal to hide the cursor."""
        _write_stdout("\x1b[?25l")

    def show_cursor(self) -> None:
        """Tell the terminal to show the cursor."""
        _write_stdout("\x1b[?25h")

    def set_position(self, y: int, x: int) -> None:
        """Set the cursor position to y, x"""
        _write_stdout(f"\x1b[{y};{x}H")

    async def get_position(self) -> tuple[int, int]:
        """Get the (y, x) position of the cursor

        Raises:
            ValueError: if the terminal reply cannot be parsed.
        """
        reply = await self._query_terminal("\x1b[6n", "R")
        match = re.search(r"\[(\d+);(\d+)R", reply)
        if match:
            y, x = map(int, match.groups())
            return y, x
        raise ValueError("Failed to parse cursor position response")

    async def draw(
        self,
        buffer,
        width: int | None,
        height: int | None,
        pixel_format: Kitty_Format = Kitty_Format.RGBA,
        image_num: int = 1,
        compress=False,
        use_shm: bool = False,
    ) -> None:
        """Display an image in the terminal.

        Args:
            buffer: image data; passed to ``zlib.compress`` when ``compress``
                is set, otherwise sent (or written to shared memory) as is.
            width: image width, sent as the ``s`` key when truthy.
            height: image height, sent as the ``v`` key when truthy.
            pixel_format: value sent as the ``f`` key.
            image_num: value sent as the ``i`` key.
            compress: zlib-compress the data and send ``o=z``.
            use_shm: write the data to shared memory and send only its name
                (``t=s``) instead of the data itself (``t=d``).

        Raises:
            ValueError: if width or height is None while they are required
                (any non-PNG format, or any shm transfer).
        """
        needs_size = pixel_format != Kitty_Format.PNG or use_shm
        if needs_size and (width is None or height is None):
            raise ValueError(
                "shm transfer or using image formats other than PNG and require height and width"
            )

        kwargs = {
            "a": "T",
            "i": image_num,
            "f": pixel_format.value,
            "t": "s" if use_shm else "d",
        }
        if width:
            kwargs["s"] = width
        if height:
            kwargs["v"] = height

        if compress:
            kwargs["o"] = "z"
            data = zlib.compress(buffer)
        else:
            data = buffer

        if use_shm:
            # Write the pixels to shared memory and send only its name.
            _mmap(data, width, height)
            data = SHM_NAME

        # Only shm transfers wait for the terminal's confirmation.
        await _write_chunked(**kwargs, data=data, wait_for_ok=bool(use_shm))
        await asyncio.sleep(0.001)

    async def _query_terminal(self, escape: str, endchar: str) -> str:
        """
        Send `escape` to the terminal, read the response until `endchar`,
        return response (including `endchar`)

        If stdin reaches end-of-file before `endchar` arrives, the partial
        response read so far is returned instead of looping forever.
        """
        loop = asyncio.get_running_loop()
        fd = sys.stdin.fileno()
        old_settings = termios.tcgetattr(fd)

        def read_terminal_response() -> str:
            tty.setraw(fd)
            _write_stdout(escape)
            chars = []
            while True:
                char = sys.stdin.read(1)
                if not char:
                    # read() returns "" at EOF; without this check the loop
                    # would never terminate.
                    break
                chars.append(char)
                if char == endchar:
                    break
            return "".join(chars)

        try:
            # The blocking read runs in the default executor so the event
            # loop is not blocked while waiting for the terminal.
            return await loop.run_in_executor(None, read_terminal_response)
        finally:
            # Restore the terminal settings
            termios.tcsetattr(fd, termios.TCSANOW, old_settings)

    async def _write_chunked(self, wait_for_ok: bool = True, **cmd) -> None:
        """Send a graphics command in chunks, confirming via ``terminal_io``.

        Args:
            wait_for_ok: await ``self.terminal_io.wait_for_ok()`` after all
                chunks have been written.
            **cmd: command keys; must contain ``t`` and ``data``.
        """
        _send_chunks(cmd)
        if wait_for_ok:
            # Wait for terminal to confirm OK response
            await self.terminal_io.wait_for_ok()


def _mmap(np_image, width: int, height: int) -> None:
    """Write image data to shared memory

    The backing file /dev/shm + SHM_NAME is created if needed and resized to
    ``width * height * 4`` bytes. The file descriptor and the mapping are
    closed before returning; the file itself is left in place.

    Args:
        np_image: object exposing ``tobytes()``, or any bytes-like object.
        width: image width.
        height: image height.
    """
    shm_size = width * height * 4  # RGBA
    if hasattr(np_image, "tobytes"):
        payload = np_image.tobytes()
    else:
        # e.g. the bytes produced by zlib.compress
        payload = bytes(np_image)

    fd = os.open(f"/dev/shm{SHM_NAME}", os.O_CREAT | os.O_RDWR)
    try:
        os.ftruncate(fd, shm_size)
        with mmap.mmap(fd, shm_size, mmap.MAP_SHARED, mmap.PROT_WRITE) as shm:
            shm.write(payload)
            shm.flush()
    finally:
        # Previously the descriptor (and the mapping) leaked on every call.
        os.close(fd)


def supports_kitty_graphics(timeout=0.1) -> bool:
    """Check if the terminal has support for the graphics protocol.

    Args:
        timeout: seconds to wait for a reply to the query.

    Returns:
        True if an OK reply arrives in time; False otherwise, including when
        stdout or stdin is not a TTY.
    """
    if not sys.stdout.isatty() or not sys.stdin.isatty():
        # tcgetattr() below would raise on a non-TTY stdin.
        return False

    fd = sys.stdin.fileno()
    old_settings = termios.tcgetattr(fd)
    try:
        # Set terminal to raw mode
        tty.setraw(fd)

        # Send the graphics query. An image id is included because, per the
        # kitty protocol documentation, commands without an id get no reply.
        sys.stdout.write(
            f"\033_Gi={_QUERY_IMAGE_ID},s=1,v=1,a=q,t=d,f=24;AAAA\033\\"
        )
        sys.stdout.flush()

        # Wait for response with timeout
        rlist, _, _ = select.select([fd], [], [], timeout)
        if not rlist:
            return False

        # Read response
        response = os.read(fd, 1024).decode("utf-8", "ignore")
        # Accept the id-tagged reply as well as the untagged form that the
        # original check looked for.
        return response.startswith(
            (f"\033_Gi={_QUERY_IMAGE_ID};OK", "\033_GOK")
        )
    finally:
        # Restore terminal settings
        termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)


def _serialize_gr_command(**cmd) -> bytes:
    """Build one graphics escape sequence from key/value pairs.

    The optional ``payload`` keyword (bytes) is appended after a ``;`` when
    non-empty; all other keywords are joined as ``k=v`` separated by commas.
    """
    payload = cmd.pop("payload", None)
    keys = ",".join(f"{k}={v}" for k, v in cmd.items())
    parts = [b"\033_G", keys.encode("ascii")]
    if payload:
        parts += [b";", payload]
    parts.append(b"\033\\")
    return b"".join(parts)


def _send_chunks(cmd: dict) -> None:
    """Base64-encode ``cmd["data"]`` and write it to stdout in chunks.

    ``cmd`` is consumed: ``data`` is popped, and the remaining keys are
    cleared after the first chunk so later chunks carry only the ``m`` flag
    (1 while more chunks follow, 0 on the last one).
    """
    if cmd["t"] == "s":
        # For shm transfers the payload is a name (str), not pixel data.
        data = standard_b64encode(bytes(cmd.pop("data"), "utf-8"))
    else:
        data = standard_b64encode(cmd.pop("data"))

    while data:
        chunk, data = data[:_CHUNK_SIZE], data[_CHUNK_SIZE:]
        more = 1 if data else 0
        sys.stdout.buffer.write(
            _serialize_gr_command(payload=chunk, m=more, **cmd)
        )
        sys.stdout.flush()
        cmd.clear()


async def _write_chunked(wait_for_ok: bool = True, **cmd) -> None:
    """Send a graphics command in chunks and optionally read the reply.

    stdin is switched to raw mode for the duration of the call and restored
    afterwards, even on error.

    Args:
        wait_for_ok: after all chunks are written, read one reply from stdin.
            The read blocks the calling thread. A reply that is not the
            expected OK is ignored (best effort).
        **cmd: command keys; must contain ``i``, ``t`` and ``data``.
    """
    image_id = cmd["i"]
    fd = sys.stdin.fileno()
    old_settings = termios.tcgetattr(fd)
    try:
        tty.setraw(fd)
        _send_chunks(cmd)
        if wait_for_ok:
            # Wait for terminal to confirm OK response
            response = os.read(fd, 1024).decode("utf-8", "ignore")
            if not response.startswith(f"\033_Gi={image_id};OK"):
                # Unexpected replies are deliberately not raised.
                return
    finally:
        termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)


def _write_stdout(cmd: str) -> None:
    """Write a command string to stdout and flush."""
    sys.stdout.write(cmd)
    sys.stdout.flush()


async def main():
    """Print size, cursor and graphics-support information for the terminal."""
    term = Terminal()
    rows, cols = term.get_terminal_size()
    v_pix, h_pix = await term.get_terminal_cell_size()
    height, width = await term.get_terminal_size_pixel()
    y, x = await term.get_position()

    print(f"Terminal has {rows} rows, {cols} cols = {rows * cols} cells")
    print(f"Cell size: {h_pix}x{v_pix} px")
    print(f"Dimensions: {width}x{height} px")
    print("Cursor is at y, x:", y, x)
    print(f"Supports graphics: {term.supports_kitty_graphics}")


if __name__ == "__main__":
    asyncio.run(main())