diff --git a/kitty.py b/kitty.py old mode 100755 new mode 100644 index 18394b7..74814f3 --- a/kitty.py +++ b/kitty.py @@ -5,14 +5,103 @@ import sys import termios import tty from base64 import standard_b64encode -from io import BytesIO import array import fcntl +import zlib -async def draw_to_terminal(buffer: BytesIO) -> None: - """Display a PNG image in the terminal.""" - await _write_chunked(a="T", i=1, f=100, q=2, data=buffer) + +import mmap +import os +import select +from enum import Enum + +SHM_NAME = "/kitty-shm-frame" + + +class Kitty_Format(Enum): + RGB = 24 + RGBA = 32 + PNG = 100 + + +async def draw_to_terminal( + buffer, + width: int | None, + height: int | None, + pixel_format: Kitty_Format = Kitty_Format.RGBA, + image_num: int = 1, + compress=False, + use_shm: bool = False, +) -> None: + """Display an image in the terminal.""" + if (pixel_format != Kitty_Format.PNG or use_shm) and (width is None or height is None): + raise ValueError( + "shm transfer or using image formats other than PNG and require height and width" + ) + + kwargs = {"a": "T", "i": image_num, "f": pixel_format.value, "t": "s" if use_shm else "d", "q": 2} + + if width: + kwargs["s"] = width + if height: + kwargs["v"] = height + + if compress: + kwargs["o"] = "z" + data = zlib.compress(buffer) + else: + data = buffer + + if use_shm: + # write to shm + _mmap(data, width, height) + # set shm name as payload + data = SHM_NAME + + await _write_chunked(**kwargs, data=data) + await asyncio.sleep(0.001) + + +def _mmap(np_image, width: int, height: int) -> None: + """Write image data to shared memory""" + shm_size = width * height * 4 # RGBA + + fd = os.open(f"/dev/shm{SHM_NAME}", os.O_CREAT | os.O_RDWR) + os.ftruncate(fd, shm_size) + shm = mmap.mmap(fd, shm_size, mmap.MAP_SHARED, mmap.PROT_WRITE) + + # write to shared memory + shm.seek(0) + shm.write(np_image.tobytes()) + shm.flush() + + +def supports_kitty_graphics(timeout=0.1) -> bool: + """Check if the terminal has support for the graphics protocol.""" + if not sys.stdout.isatty(): + return False + + fd = sys.stdin.fileno() + old_settings = termios.tcgetattr(fd) + try: + # Set terminal to raw mode + tty.setraw(fd) + # Send Kitty graphics query escape code + sys.stdout.write("\033_Gs=1,v=1,a=q,t=d,f=24;AAAA\033\\") + sys.stdout.flush() + + # Wait for response with timeout + rlist, _, _ = select.select([fd], [], [], timeout) + if not rlist: + return False + + # Read response + response = os.read(fd, 1024).decode("utf-8", "ignore") + return response.startswith("\033_GOK") + finally: + # Restore terminal settings + termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) async def get_terminal_cell_size() -> tuple[int, int]: @@ -60,14 +149,16 @@ def _serialize_gr_command(**cmd) -> bytes: async def _write_chunked(**cmd) -> None: - data = standard_b64encode(cmd.pop("data")) - loop = asyncio.get_running_loop() + if cmd["t"] == "s": + data = standard_b64encode(bytes(cmd.pop("data"), "utf-8")) + else: + data = standard_b64encode(cmd.pop("data")) while data: chunk, data = data[:4096], data[4096:] m = 1 if data else 0 payload = _serialize_gr_command(payload=chunk, m=m, **cmd) - await loop.run_in_executor(None, sys.stdout.buffer.write, payload) - await loop.run_in_executor(None, sys.stdout.flush) + sys.stdout.buffer.write(payload) + sys.stdout.flush() cmd.clear() @@ -86,7 +177,19 @@ def set_position(y: int, x: int) -> None: _write_stdout(f"\x1b[{y};{x}H") +async def get_position() -> tuple[int, int]: + """Get the (y, x) position of the cursor""" + reply = await _query_terminal("\x1b[6n", "R") + + match = re.search(r"\[(\d+);(\d+)R", reply) + if match: + y, x = map(int, match.groups()) + return y, x + raise ValueError("Failed to parse cursor position response") + + def _write_stdout(cmd: str) -> None: + """Write a command string to stdout and flush.""" sys.stdout.write(cmd) sys.stdout.flush() @@ -120,43 +223,18 @@ async def _query_terminal(escape: str, endchar: str) -> str: return response -async def get_position() -> tuple[int, int]: - """Get the (y, x) position of the cursor""" - reply = await _query_terminal("\x1b[6n", "R") - - match = re.search(r"\[(\d+);(\d+)R", reply) - if match: - y, x = map(int, match.groups()) - return y, x - raise ValueError("Failed to parse cursor position response") - - async def main(): - import pyvista as pv - rows, cols = get_terminal_size() v_pix, h_pix = await get_terminal_cell_size() height, width = await get_terminal_size_pixel() y, x = await get_position() - y, x = await get_position() print(f"Terminal has {rows} rows, {cols} cols = {rows * cols} cells") print(f"Cell size: {h_pix}x{v_pix} px") print(f"Dimensions: {width}x{height} px") print("Cursor is at y, x:", y, x) + print(f"Supports graphics: {supports_kitty_graphics()}") - new_lines = int((height / v_pix) - 6) - print("\n" * new_lines, end="") - - set_position(y - new_lines - 6, x) - - s = pv.Sphere() - pl = pv.Plotter(off_screen=True) - pl.add_mesh(s) - b = BytesIO() - pl.screenshot(b, transparent_background=True, window_size=(width, height)) - - await draw_to_terminal(b) if __name__ == "__main__": asyncio.run(main())