#!/usr/bin/env python3 from base64 import standard_b64encode from enum import Enum import array import asyncio import fcntl import mmap import os import re import select import sys import termios import tty import zlib SHM_NAME = "/kitty-shm-frame" class Kitty_Format(Enum): RGB = 24 RGBA = 32 PNG = 100 class TerminalProtocolError(Exception): pass async def draw_to_terminal( buffer, width: int | None, height: int | None, pixel_format: Kitty_Format = Kitty_Format.RGBA, image_num: int = 1, compress=False, use_shm: bool = False, ) -> None: """Display an image in the terminal.""" if (pixel_format != Kitty_Format.PNG or use_shm) and (width is None or height is None): raise TerminalProtocolError( "shm transfer or using image formats other than PNG and require height and width" ) kwargs = {"a": "T", "i": image_num, "f": pixel_format.value, "t": "s" if use_shm else "d"} if width: kwargs["s"] = width if height: kwargs["v"] = height if compress: kwargs["o"] = "z" data = zlib.compress(buffer) else: data = buffer if use_shm: # write to shm _mmap(data, width, height) # set shm name as payload data = SHM_NAME await _write_chunked(**kwargs, data=data, wait_for_ok=True) await asyncio.sleep(0.001) def _mmap(np_image, width: int, height: int) -> None: """Write image data to shared memory""" shm_size = width * height * 4 # RGBA fd = os.open(f"/dev/shm{SHM_NAME}", os.O_CREAT | os.O_RDWR) os.ftruncate(fd, shm_size) shm = mmap.mmap(fd, shm_size, mmap.MAP_SHARED, mmap.PROT_WRITE) # write to shared memory shm.seek(0) shm.write(np_image.tobytes()) shm.flush() def supports_kitty_graphics(timeout=0.1) -> bool: """Check if the terminal has support for the graphics protocol.""" if not sys.stdout.isatty(): return False fd = sys.stdin.fileno() old_settings = termios.tcgetattr(fd) try: # Set terminal to raw mode tty.setraw(fd) # Send Kitty graphics query escape code sys.stdout.write("\033_Gs=1,v=1,a=q,t=d,f=24;AAAA\033\\") sys.stdout.flush() # Wait for response with timeout rlist, _, _ = select.select([fd], [], [], timeout) if not rlist: return False # Read response response = os.read(fd, 1024).decode("utf-8", "ignore") return response.startswith("\033_GOK") finally: # Restore terminal settings termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) async def get_terminal_cell_size() -> tuple[int, int]: """Get (height, width) of a single cell in px""" reply = await _query_terminal("\x1b[16t", "t") match = re.search(r"\[6;(\d+);(\d+)t", reply) if match: v_pix, h_pix = map(int, match.groups()) return v_pix, h_pix print(reply) raise TerminalProtocolError("Failed to parse terminal cell size response") async def get_terminal_size_pixel() -> tuple[int, int]: """Get (height, width) of the terminal in px""" reply = await _query_terminal("\x1b[14t", "t") match = re.search(r"\[4;(\d+);(\d+)t", reply) if match: height, width = map(int, match.groups()) return height, width raise TerminalProtocolError("Failed to parse terminal pixel size response") def get_terminal_size() -> tuple[int, int]: """Get (rows, cols) of the terminal""" buf = array.array("H", [0, 0, 0, 0]) fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf) rows, cols, width, height = buf return (rows, cols) def _serialize_gr_command(**cmd) -> bytes: payload = cmd.pop("payload", None) cmd = ",".join(f"{k}={v}" for k, v in cmd.items()) ans = [] w = ans.append w(b"\033_G"), w(cmd.encode("ascii")) if payload: w(b";") w(payload) w(b"\033\\") return b"".join(ans) #async def _write_chunked(**cmd) -> None: # if cmd["t"] == "s": # data = standard_b64encode(bytes(cmd.pop("data"), "utf-8")) # else: # data = standard_b64encode(cmd.pop("data")) # while data: # chunk, data = data[:4096], data[4096:] # m = 1 if data else 0 # payload = _serialize_gr_command(payload=chunk, m=m, **cmd) # sys.stdout.buffer.write(payload) # sys.stdout.flush() # cmd.clear() async def _write_chunked(wait_for_ok: bool = True, **cmd) -> None: image_id = cmd['i'] if cmd["t"] == "s": data = standard_b64encode(bytes(cmd.pop("data"), "utf-8")) else: data = standard_b64encode(cmd.pop("data")) fd = sys.stdin.fileno() old_settings = termios.tcgetattr(fd) tty.setraw(fd) loop = asyncio.get_running_loop() try: while data: chunk, data = data[:4096], data[4096:] m = 1 if data else 0 payload = _serialize_gr_command(payload=chunk, m=m, **cmd) sys.stdout.buffer.write(payload) sys.stdout.flush() cmd.clear() if wait_for_ok: # Wait for terminal to confirm OK response response = os.read(fd, 1024).decode("utf-8", "ignore") if not response.startswith(f"\033_Gi={image_id};OK"): return #raise RuntimeError(f"Unexpected response: {repr(response)}") #return response.startswith("\033_GOK") # def _read_ok(): # response = "" # while not response.endswith("\033\\"): # response += sys.stdin.read(1) # return response # response = await loop.run_in_executor(None, _read_ok) # if not response.startswith(f"\033_Gi={image_id};OK"): # raise RuntimeError(f"Unexpected response: {repr(response)}") finally: termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) def hide_cursor() -> None: """Tell the terminal to hide the cursor.""" _write_stdout("\x1b[?25l") def show_cursor() -> None: """Tell the terminal to show the cursor.""" _write_stdout("\x1b[?25h") def set_position(y: int, x: int) -> None: """Set the cursor position to y, x""" _write_stdout(f"\x1b[{y};{x}H") async def get_position() -> tuple[int, int]: """Get the (y, x) position of the cursor""" reply = await _query_terminal("\x1b[6n", "R") match = re.search(r"\[(\d+);(\d+)R", reply) if match: y, x = map(int, match.groups()) return y, x raise ValueError("Failed to parse cursor position response") def _write_stdout(cmd: str) -> None: """Write a command string to stdout and flush.""" sys.stdout.write(cmd) sys.stdout.flush() async def _query_terminal(escape: str, endchar: str) -> str: """ Send `escape` to the terminal, read the response until `endchar`, return response (including `endchar`) """ # Save the current terminal settings loop = asyncio.get_running_loop() fd = sys.stdin.fileno() old_settings = termios.tcgetattr(fd) def read_terminal_response() -> str: tty.setraw(fd) _write_stdout(escape) response = "" while True: char = sys.stdin.read(1) response += char if char == endchar: break return response try: response = await loop.run_in_executor(None, read_terminal_response) finally: # Restore the terminal settings termios.tcsetattr(fd, termios.TCSANOW, old_settings) return response async def main(): rows, cols = get_terminal_size() v_pix, h_pix = await get_terminal_cell_size() height, width = await get_terminal_size_pixel() y, x = await get_position() print(f"Terminal has {rows} rows, {cols} cols = {rows * cols} cells") print(f"Cell size: {h_pix}x{v_pix} px") print(f"Dimensions: {width}x{height} px") print("Cursor is at y, x:", y, x) print(f"Supports graphics: {supports_kitty_graphics()}") if __name__ == "__main__": asyncio.run(main())