diff --git a/kitty.py b/kitty.py index 74814f3..36725d4 100644 --- a/kitty.py +++ b/kitty.py @@ -16,6 +16,8 @@ import os import select from enum import Enum +from terminalio import TerminalIO + SHM_NAME = "/kitty-shm-frame" @@ -24,44 +26,130 @@ class Kitty_Format(Enum): RGBA = 32 PNG = 100 +class Terminal(): + def __init__(self): + self.terminal_io = TerminalIO() + self.terminal_io.start() + self.supports_kitty_graphics = supports_kitty_graphics() -async def draw_to_terminal( - buffer, - width: int | None, - height: int | None, - pixel_format: Kitty_Format = Kitty_Format.RGBA, - image_num: int = 1, - compress=False, - use_shm: bool = False, -) -> None: - """Display an image in the terminal.""" - if (pixel_format != Kitty_Format.PNG or use_shm) and (width is None or height is None): - raise ValueError( - "shm transfer or using image formats other than PNG and require height and width" - ) + def get_terminal_size(self) -> tuple[int, int]: + """Get (rows, cols) of the terminal""" + buf = array.array("H", [0, 0, 0, 0]) + fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf) + rows, cols, width, height = buf + return (rows, cols) - kwargs = {"a": "T", "i": image_num, "f": pixel_format.value, "t": "s" if use_shm else "d", "q": 2} + async def get_terminal_cell_size(self) -> tuple[int, int]: + """Get (height, width) of a single cell in px""" + reply = await self._query_terminal("\x1b[16t", "t") - if width: - kwargs["s"] = width - if height: - kwargs["v"] = height + match = re.search(r"\[6;(\d+);(\d+)t", reply) + if match: + v_pix, h_pix = map(int, match.groups()) + return v_pix, h_pix + print(reply) + raise ValueError("Failed to parse terminal cell size response") - if compress: - kwargs["o"] = "z" - data = zlib.compress(buffer) - else: - data = buffer - if use_shm: - # write to shm - _mmap(data, width, height) - # set shm name as payload - data = SHM_NAME + async def get_terminal_size_pixel(self) -> tuple[int, int]: + """Get (height, width) of the terminal in px""" + reply = await self._query_terminal("\x1b[14t", "t") - await _write_chunked(**kwargs, data=data) - await asyncio.sleep(0.001) + match = re.search(r"\[4;(\d+);(\d+)t", reply) + if match: + height, width = map(int, match.groups()) + return height, width + raise ValueError("Failed to parse terminal pixel size response") + def hide_cursor(self) -> None: + """Tell the terminal to hide the cursor.""" + _write_stdout("\x1b[?25l") + + + def show_cursor(self) -> None: + """Tell the terminal to show the cursor.""" + _write_stdout("\x1b[?25h") + + + def set_position(self, y: int, x: int) -> None: + """Set the cursor position to y, x""" + _write_stdout(f"\x1b[{y};{x}H") + + async def get_position(self) -> tuple[int, int]: + """Get the (y, x) position of the cursor""" + reply = await self._query_terminal("\x1b[6n", "R") + + match = re.search(r"\[(\d+);(\d+)R", reply) + if match: + y, x = map(int, match.groups()) + return y, x + raise ValueError("Failed to parse cursor position response") + + async def draw( + self, + buffer, + width: int | None, + height: int | None, + pixel_format: Kitty_Format = Kitty_Format.RGBA, + image_num: int = 1, + compress=False, + use_shm: bool = False, + ) -> None: + """Display an image in the terminal.""" + if (pixel_format != Kitty_Format.PNG or use_shm) and (width is None or height is None): + raise ValueError( + "shm transfer or using image formats other than PNG and require height and width" + ) + + kwargs = {"a": "T", "i": image_num, "f": pixel_format.value, "t": "s" if use_shm else "d"} + + if width: + kwargs["s"] = width + if height: + kwargs["v"] = height + + if compress: + kwargs["o"] = "z" + data = zlib.compress(buffer) + else: + data = buffer + + if use_shm: + # write to shm + _mmap(data, width, height) + # set shm name as payload + data = SHM_NAME + + await _write_chunked(**kwargs, data=data) + await asyncio.sleep(0.001) + + async def _query_terminal(self, escape: str, endchar: str) -> str: + """ + Send `escape` to the terminal, read the response until + `endchar`, return response (including `endchar`) + """ + # Save the current terminal settings + loop = asyncio.get_running_loop() + fd = sys.stdin.fileno() + old_settings = termios.tcgetattr(fd) + + def read_terminal_response() -> str: + tty.setraw(fd) + _write_stdout(escape) + response = "" + while True: + char = sys.stdin.read(1) + response += char + if char == endchar: + break + return response + + try: + response = await loop.run_in_executor(None, read_terminal_response) + finally: + # Restore the terminal settings + termios.tcsetattr(fd, termios.TCSANOW, old_settings) + return response def _mmap(np_image, width: int, height: int) -> None: """Write image data to shared memory""" @@ -104,37 +192,6 @@ def supports_kitty_graphics(timeout=0.1) -> bool: termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) -async def get_terminal_cell_size() -> tuple[int, int]: - """Get (height, width) of a single cell in px""" - reply = await _query_terminal("\x1b[16t", "t") - - match = re.search(r"\[6;(\d+);(\d+)t", reply) - if match: - v_pix, h_pix = map(int, match.groups()) - return v_pix, h_pix - print(reply) - raise ValueError("Failed to parse terminal cell size response") - - -async def get_terminal_size_pixel() -> tuple[int, int]: - """Get (height, width) of the terminal in px""" - reply = await _query_terminal("\x1b[14t", "t") - - match = re.search(r"\[4;(\d+);(\d+)t", reply) - if match: - height, width = map(int, match.groups()) - return height, width - raise ValueError("Failed to parse terminal pixel size response") - - -def get_terminal_size() -> tuple[int, int]: - """Get (rows, cols) of the terminal""" - buf = array.array("H", [0, 0, 0, 0]) - fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf) - rows, cols, width, height = buf - return (rows, cols) - - def _serialize_gr_command(**cmd) -> bytes: payload = cmd.pop("payload", None) cmd = ",".join(f"{k}={v}" for k, v in cmd.items()) @@ -161,79 +218,24 @@ async def _write_chunked(**cmd) -> None: sys.stdout.flush() cmd.clear() - -def hide_cursor() -> None: - """Tell the terminal to hide the cursor.""" - _write_stdout("\x1b[?25l") - - -def show_cursor() -> None: - """Tell the terminal to show the cursor.""" - _write_stdout("\x1b[?25h") - - -def set_position(y: int, x: int) -> None: - """Set the cursor position to y, x""" - _write_stdout(f"\x1b[{y};{x}H") - - -async def get_position() -> tuple[int, int]: - """Get the (y, x) position of the cursor""" - reply = await _query_terminal("\x1b[6n", "R") - - match = re.search(r"\[(\d+);(\d+)R", reply) - if match: - y, x = map(int, match.groups()) - return y, x - raise ValueError("Failed to parse cursor position response") - - def _write_stdout(cmd: str) -> None: """Write a command string to stdout and flush.""" sys.stdout.write(cmd) sys.stdout.flush() -async def _query_terminal(escape: str, endchar: str) -> str: - """ - Send `escape` to the terminal, read the response until - `endchar`, return response (including `endchar`) - """ - # Save the current terminal settings - loop = asyncio.get_running_loop() - fd = sys.stdin.fileno() - old_settings = termios.tcgetattr(fd) - - def read_terminal_response() -> str: - tty.setraw(fd) - _write_stdout(escape) - response = "" - while True: - char = sys.stdin.read(1) - response += char - if char == endchar: - break - return response - - try: - response = await loop.run_in_executor(None, read_terminal_response) - finally: - # Restore the terminal settings - termios.tcsetattr(fd, termios.TCSANOW, old_settings) - return response - - async def main(): - rows, cols = get_terminal_size() - v_pix, h_pix = await get_terminal_cell_size() - height, width = await get_terminal_size_pixel() - y, x = await get_position() + term = Terminal() + rows, cols = term.get_terminal_size() + v_pix, h_pix = await term.get_terminal_cell_size() + height, width = await term.get_terminal_size_pixel() + y, x = await term.get_position() print(f"Terminal has {rows} rows, {cols} cols = {rows * cols} cells") print(f"Cell size: {h_pix}x{v_pix} px") print(f"Dimensions: {width}x{height} px") print("Cursor is at y, x:", y, x) - print(f"Supports graphics: {supports_kitty_graphics()}") + print(f"Supports graphics: {term.supports_kitty_graphics}") if __name__ == "__main__": diff --git a/terminalio.py b/terminalio.py new file mode 100644 index 0000000..995815d --- /dev/null +++ b/terminalio.py @@ -0,0 +1,68 @@ +import asyncio +import os +import re +import sys +import tty +import termios + +class TerminalIO: + def __init__(self): + self.ok_event = asyncio.Event() + self.input_callbacks = [] + self._reader_task = None + self._stop = False + self._buffer = "" + self._old_settings = None + + def register_input_callback(self, callback): + """Register a function that takes a single character input.""" + self.input_callbacks.append(callback) + + def start(self): + """Start the stdin reading loop.""" + if not sys.stdin.isatty(): + raise RuntimeError("stdin is not a TTY") + + self._old_settings = termios.tcgetattr(sys.stdin) + tty.setraw(sys.stdin.fileno()) + self._reader_task = asyncio.create_task(self._stdin_loop()) + + async def stop(self): + """Stop the reader and restore terminal settings.""" + self._stop = True + if self._reader_task: + await self._reader_task + if self._old_settings: + termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self._old_settings) + + async def wait_for_ok(self, timeout=1.0): + """Wait for a Kitty OK message (non-blocking).""" + try: + await asyncio.wait_for(self.ok_event.wait(), timeout) + except asyncio.TimeoutError: + raise RuntimeError("Timed out waiting for Kitty OK response") + self.ok_event.clear() + + async def _stdin_loop(self): + loop = asyncio.get_running_loop() + fd = sys.stdin.fileno() + + def read_byte(): + return os.read(fd, 1).decode("utf-8", errors="ignore") + + while not self._stop: + char = await loop.run_in_executor(None, read_byte) + + self._buffer += char + if self._buffer.endswith("\033\\"): + #if "\033_GOK" in self._buffer: + if re.search('\033_G(i=\d+;)?OK', self._buffer): + self.ok_event.set() + self._buffer = "" + continue + + # Dispatch to key input handlers (non-escape) + if not char.startswith("\033"): + for callback in self.input_callbacks: + callback(char) +