This commit is contained in:
Felix Pankratz 2025-07-26 19:12:47 +02:00
parent a0376ae9a4
commit e56d6f9734
2 changed files with 193 additions and 123 deletions

192
kitty.py
View File

@ -16,6 +16,8 @@ import os
import select import select
from enum import Enum from enum import Enum
from terminalio import TerminalIO
SHM_NAME = "/kitty-shm-frame" SHM_NAME = "/kitty-shm-frame"
@ -24,8 +26,67 @@ class Kitty_Format(Enum):
RGBA = 32 RGBA = 32
PNG = 100 PNG = 100
class Terminal():
def __init__(self):
self.terminal_io = TerminalIO()
self.terminal_io.start()
self.supports_kitty_graphics = supports_kitty_graphics()
async def draw_to_terminal( def get_terminal_size(self) -> tuple[int, int]:
"""Get (rows, cols) of the terminal"""
buf = array.array("H", [0, 0, 0, 0])
fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf)
rows, cols, width, height = buf
return (rows, cols)
async def get_terminal_cell_size(self) -> tuple[int, int]:
"""Get (height, width) of a single cell in px"""
reply = await self._query_terminal("\x1b[16t", "t")
match = re.search(r"\[6;(\d+);(\d+)t", reply)
if match:
v_pix, h_pix = map(int, match.groups())
return v_pix, h_pix
print(reply)
raise ValueError("Failed to parse terminal cell size response")
async def get_terminal_size_pixel(self) -> tuple[int, int]:
"""Get (height, width) of the terminal in px"""
reply = await self._query_terminal("\x1b[14t", "t")
match = re.search(r"\[4;(\d+);(\d+)t", reply)
if match:
height, width = map(int, match.groups())
return height, width
raise ValueError("Failed to parse terminal pixel size response")
def hide_cursor(self) -> None:
"""Tell the terminal to hide the cursor."""
_write_stdout("\x1b[?25l")
def show_cursor(self) -> None:
"""Tell the terminal to show the cursor."""
_write_stdout("\x1b[?25h")
def set_position(self, y: int, x: int) -> None:
"""Set the cursor position to y, x"""
_write_stdout(f"\x1b[{y};{x}H")
async def get_position(self) -> tuple[int, int]:
"""Get the (y, x) position of the cursor"""
reply = await self._query_terminal("\x1b[6n", "R")
match = re.search(r"\[(\d+);(\d+)R", reply)
if match:
y, x = map(int, match.groups())
return y, x
raise ValueError("Failed to parse cursor position response")
async def draw(
self,
buffer, buffer,
width: int | None, width: int | None,
height: int | None, height: int | None,
@ -33,14 +94,14 @@ async def draw_to_terminal(
image_num: int = 1, image_num: int = 1,
compress=False, compress=False,
use_shm: bool = False, use_shm: bool = False,
) -> None: ) -> None:
"""Display an image in the terminal.""" """Display an image in the terminal."""
if (pixel_format != Kitty_Format.PNG or use_shm) and (width is None or height is None): if (pixel_format != Kitty_Format.PNG or use_shm) and (width is None or height is None):
raise ValueError( raise ValueError(
"shm transfer or using image formats other than PNG and require height and width" "shm transfer or using image formats other than PNG and require height and width"
) )
kwargs = {"a": "T", "i": image_num, "f": pixel_format.value, "t": "s" if use_shm else "d", "q": 2} kwargs = {"a": "T", "i": image_num, "f": pixel_format.value, "t": "s" if use_shm else "d"}
if width: if width:
kwargs["s"] = width kwargs["s"] = width
@ -62,6 +123,33 @@ async def draw_to_terminal(
await _write_chunked(**kwargs, data=data) await _write_chunked(**kwargs, data=data)
await asyncio.sleep(0.001) await asyncio.sleep(0.001)
async def _query_terminal(self, escape: str, endchar: str) -> str:
"""
Send `escape` to the terminal, read the response until
`endchar`, return response (including `endchar`)
"""
# Save the current terminal settings
loop = asyncio.get_running_loop()
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
def read_terminal_response() -> str:
tty.setraw(fd)
_write_stdout(escape)
response = ""
while True:
char = sys.stdin.read(1)
response += char
if char == endchar:
break
return response
try:
response = await loop.run_in_executor(None, read_terminal_response)
finally:
# Restore the terminal settings
termios.tcsetattr(fd, termios.TCSANOW, old_settings)
return response
def _mmap(np_image, width: int, height: int) -> None: def _mmap(np_image, width: int, height: int) -> None:
"""Write image data to shared memory""" """Write image data to shared memory"""
@ -104,37 +192,6 @@ def supports_kitty_graphics(timeout=0.1) -> bool:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
async def get_terminal_cell_size() -> tuple[int, int]:
"""Get (height, width) of a single cell in px"""
reply = await _query_terminal("\x1b[16t", "t")
match = re.search(r"\[6;(\d+);(\d+)t", reply)
if match:
v_pix, h_pix = map(int, match.groups())
return v_pix, h_pix
print(reply)
raise ValueError("Failed to parse terminal cell size response")
async def get_terminal_size_pixel() -> tuple[int, int]:
"""Get (height, width) of the terminal in px"""
reply = await _query_terminal("\x1b[14t", "t")
match = re.search(r"\[4;(\d+);(\d+)t", reply)
if match:
height, width = map(int, match.groups())
return height, width
raise ValueError("Failed to parse terminal pixel size response")
def get_terminal_size() -> tuple[int, int]:
"""Get (rows, cols) of the terminal"""
buf = array.array("H", [0, 0, 0, 0])
fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf)
rows, cols, width, height = buf
return (rows, cols)
def _serialize_gr_command(**cmd) -> bytes: def _serialize_gr_command(**cmd) -> bytes:
payload = cmd.pop("payload", None) payload = cmd.pop("payload", None)
cmd = ",".join(f"{k}={v}" for k, v in cmd.items()) cmd = ",".join(f"{k}={v}" for k, v in cmd.items())
@ -161,79 +218,24 @@ async def _write_chunked(**cmd) -> None:
sys.stdout.flush() sys.stdout.flush()
cmd.clear() cmd.clear()
def hide_cursor() -> None:
"""Tell the terminal to hide the cursor."""
_write_stdout("\x1b[?25l")
def show_cursor() -> None:
"""Tell the terminal to show the cursor."""
_write_stdout("\x1b[?25h")
def set_position(y: int, x: int) -> None:
"""Set the cursor position to y, x"""
_write_stdout(f"\x1b[{y};{x}H")
async def get_position() -> tuple[int, int]:
"""Get the (y, x) position of the cursor"""
reply = await _query_terminal("\x1b[6n", "R")
match = re.search(r"\[(\d+);(\d+)R", reply)
if match:
y, x = map(int, match.groups())
return y, x
raise ValueError("Failed to parse cursor position response")
def _write_stdout(cmd: str) -> None: def _write_stdout(cmd: str) -> None:
"""Write a command string to stdout and flush.""" """Write a command string to stdout and flush."""
sys.stdout.write(cmd) sys.stdout.write(cmd)
sys.stdout.flush() sys.stdout.flush()
async def _query_terminal(escape: str, endchar: str) -> str:
"""
Send `escape` to the terminal, read the response until
`endchar`, return response (including `endchar`)
"""
# Save the current terminal settings
loop = asyncio.get_running_loop()
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
def read_terminal_response() -> str:
tty.setraw(fd)
_write_stdout(escape)
response = ""
while True:
char = sys.stdin.read(1)
response += char
if char == endchar:
break
return response
try:
response = await loop.run_in_executor(None, read_terminal_response)
finally:
# Restore the terminal settings
termios.tcsetattr(fd, termios.TCSANOW, old_settings)
return response
async def main(): async def main():
rows, cols = get_terminal_size() term = Terminal()
v_pix, h_pix = await get_terminal_cell_size() rows, cols = term.get_terminal_size()
height, width = await get_terminal_size_pixel() v_pix, h_pix = await term.get_terminal_cell_size()
y, x = await get_position() height, width = await term.get_terminal_size_pixel()
y, x = await term.get_position()
print(f"Terminal has {rows} rows, {cols} cols = {rows * cols} cells") print(f"Terminal has {rows} rows, {cols} cols = {rows * cols} cells")
print(f"Cell size: {h_pix}x{v_pix} px") print(f"Cell size: {h_pix}x{v_pix} px")
print(f"Dimensions: {width}x{height} px") print(f"Dimensions: {width}x{height} px")
print("Cursor is at y, x:", y, x) print("Cursor is at y, x:", y, x)
print(f"Supports graphics: {supports_kitty_graphics()}") print(f"Supports graphics: {term.supports_kitty_graphics}")
if __name__ == "__main__": if __name__ == "__main__":

68
terminalio.py Normal file
View File

@ -0,0 +1,68 @@
import asyncio
import os
import re
import sys
import tty
import termios
class TerminalIO:
def __init__(self):
self.ok_event = asyncio.Event()
self.input_callbacks = []
self._reader_task = None
self._stop = False
self._buffer = ""
self._old_settings = None
def register_input_callback(self, callback):
"""Register a function that takes a single character input."""
self.input_callbacks.append(callback)
def start(self):
"""Start the stdin reading loop."""
if not sys.stdin.isatty():
raise RuntimeError("stdin is not a TTY")
self._old_settings = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())
self._reader_task = asyncio.create_task(self._stdin_loop())
async def stop(self):
"""Stop the reader and restore terminal settings."""
self._stop = True
if self._reader_task:
await self._reader_task
if self._old_settings:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self._old_settings)
async def wait_for_ok(self, timeout=1.0):
"""Wait for a Kitty OK message (non-blocking)."""
try:
await asyncio.wait_for(self.ok_event.wait(), timeout)
except asyncio.TimeoutError:
raise RuntimeError("Timed out waiting for Kitty OK response")
self.ok_event.clear()
async def _stdin_loop(self):
loop = asyncio.get_running_loop()
fd = sys.stdin.fileno()
def read_byte():
return os.read(fd, 1).decode("utf-8", errors="ignore")
while not self._stop:
char = await loop.run_in_executor(None, read_byte)
self._buffer += char
if self._buffer.endswith("\033\\"):
#if "\033_GOK" in self._buffer:
if re.search('\033_G(i=\d+;)?OK', self._buffer):
self.ok_event.set()
self._buffer = ""
continue
# Dispatch to key input handlers (non-escape)
if not char.startswith("\033"):
for callback in self.input_callbacks:
callback(char)