diff --git a/kitty.py b/kitty.py index 74814f3..4f631b0 100644 --- a/kitty.py +++ b/kitty.py @@ -1,21 +1,19 @@ #!/usr/bin/env python3 +from base64 import standard_b64encode +from enum import Enum +import array import asyncio +import fcntl +import mmap +import os import re +import select import sys import termios import tty -from base64 import standard_b64encode -import array -import fcntl - import zlib -import mmap -import os -import select -from enum import Enum - SHM_NAME = "/kitty-shm-frame" @@ -24,6 +22,9 @@ class Kitty_Format(Enum): RGBA = 32 PNG = 100 +class TerminalProtocolError(Exception): + pass + async def draw_to_terminal( buffer, @@ -36,11 +37,11 @@ async def draw_to_terminal( ) -> None: """Display an image in the terminal.""" if (pixel_format != Kitty_Format.PNG or use_shm) and (width is None or height is None): - raise ValueError( + raise TerminalProtocolError( "shm transfer or using image formats other than PNG and require height and width" ) - kwargs = {"a": "T", "i": image_num, "f": pixel_format.value, "t": "s" if use_shm else "d", "q": 2} + kwargs = {"a": "T", "i": image_num, "f": pixel_format.value, "t": "s" if use_shm else "d"} if width: kwargs["s"] = width @@ -59,7 +60,7 @@ async def draw_to_terminal( # set shm name as payload data = SHM_NAME - await _write_chunked(**kwargs, data=data) + await _write_chunked(**kwargs, data=data, wait_for_ok=True) await asyncio.sleep(0.001) @@ -113,7 +114,7 @@ async def get_terminal_cell_size() -> tuple[int, int]: v_pix, h_pix = map(int, match.groups()) return v_pix, h_pix print(reply) - raise ValueError("Failed to parse terminal cell size response") + raise TerminalProtocolError("Failed to parse terminal cell size response") async def get_terminal_size_pixel() -> tuple[int, int]: @@ -124,7 +125,7 @@ async def get_terminal_size_pixel() -> tuple[int, int]: if match: height, width = map(int, match.groups()) return height, width - raise ValueError("Failed to parse terminal pixel size response") + raise TerminalProtocolError("Failed to parse terminal pixel size response") def get_terminal_size() -> tuple[int, int]: @@ -148,19 +149,59 @@ def _serialize_gr_command(**cmd) -> bytes: return b"".join(ans) -async def _write_chunked(**cmd) -> None: +#async def _write_chunked(**cmd) -> None: +# if cmd["t"] == "s": +# data = standard_b64encode(bytes(cmd.pop("data"), "utf-8")) +# else: +# data = standard_b64encode(cmd.pop("data")) +# while data: +# chunk, data = data[:4096], data[4096:] +# m = 1 if data else 0 +# payload = _serialize_gr_command(payload=chunk, m=m, **cmd) +# sys.stdout.buffer.write(payload) +# sys.stdout.flush() +# cmd.clear() + +async def _write_chunked(wait_for_ok: bool = True, **cmd) -> None: + image_id = cmd['i'] if cmd["t"] == "s": data = standard_b64encode(bytes(cmd.pop("data"), "utf-8")) else: data = standard_b64encode(cmd.pop("data")) - while data: - chunk, data = data[:4096], data[4096:] - m = 1 if data else 0 - payload = _serialize_gr_command(payload=chunk, m=m, **cmd) - sys.stdout.buffer.write(payload) - sys.stdout.flush() - cmd.clear() + fd = sys.stdin.fileno() + old_settings = termios.tcgetattr(fd) + tty.setraw(fd) + + loop = asyncio.get_running_loop() + + try: + while data: + chunk, data = data[:4096], data[4096:] + m = 1 if data else 0 + payload = _serialize_gr_command(payload=chunk, m=m, **cmd) + sys.stdout.buffer.write(payload) + sys.stdout.flush() + cmd.clear() + + if wait_for_ok: + # Wait for terminal to confirm OK response + response = os.read(fd, 1024).decode("utf-8", "ignore") + if not response.startswith(f"\033_Gi={image_id};OK"): + return + #raise RuntimeError(f"Unexpected response: {repr(response)}") + #return response.startswith("\033_GOK") + # def _read_ok(): + # response = "" + # while not response.endswith("\033\\"): + # response += sys.stdin.read(1) + # return response + + # response = await loop.run_in_executor(None, _read_ok) + # if not response.startswith(f"\033_Gi={image_id};OK"): + # raise RuntimeError(f"Unexpected response: {repr(response)}") + finally: + termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) def hide_cursor() -> None: """Tell the terminal to hide the cursor."""