Compare commits

...

3 Commits

3 changed files with 330 additions and 180 deletions

265
kitty.py
View File

@ -1,20 +1,19 @@
#!/usr/bin/env python3
from base64 import standard_b64encode
from enum import Enum
import array
import asyncio
import fcntl
import mmap
import os
import re
import select
import sys
import termios
import tty
from base64 import standard_b64encode
import array
import fcntl
import zlib
import mmap
import os
import select
from enum import Enum
from terminalio import TerminalIO
SHM_NAME = "/kitty-shm-frame"
@ -24,8 +23,69 @@ class Kitty_Format(Enum):
RGBA = 32
PNG = 100
class TerminalProtocolError(Exception):
pass
async def draw_to_terminal(
class Terminal():
def __init__(self):
self.terminal_io = TerminalIO()
self.terminal_io.start()
self.supports_kitty_graphics = supports_kitty_graphics()
def get_terminal_size(self) -> tuple[int, int]:
"""Get (rows, cols) of the terminal"""
buf = array.array("H", [0, 0, 0, 0])
fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf)
rows, cols, width, height = buf
return (rows, cols)
async def get_terminal_cell_size(self) -> tuple[int, int]:
"""Get (height, width) of a single cell in px"""
reply = await self._query_terminal("\x1b[16t", "t")
match = re.search(r"\[6;(\d+);(\d+)t", reply)
if match:
v_pix, h_pix = map(int, match.groups())
return v_pix, h_pix
print(reply)
raise ValueError("Failed to parse terminal cell size response")
async def get_terminal_size_pixel(self) -> tuple[int, int]:
"""Get (height, width) of the terminal in px"""
reply = await self._query_terminal("\x1b[14t", "t")
match = re.search(r"\[4;(\d+);(\d+)t", reply)
if match:
height, width = map(int, match.groups())
return height, width
raise ValueError("Failed to parse terminal pixel size response")
def hide_cursor(self) -> None:
"""Tell the terminal to hide the cursor."""
_write_stdout("\x1b[?25l")
def show_cursor(self) -> None:
"""Tell the terminal to show the cursor."""
_write_stdout("\x1b[?25h")
def set_position(self, y: int, x: int) -> None:
"""Set the cursor position to y, x"""
_write_stdout(f"\x1b[{y};{x}H")
async def get_position(self) -> tuple[int, int]:
"""Get the (y, x) position of the cursor"""
reply = await self._query_terminal("\x1b[6n", "R")
match = re.search(r"\[(\d+);(\d+)R", reply)
if match:
y, x = map(int, match.groups())
return y, x
raise ValueError("Failed to parse cursor position response")
async def draw(
self,
buffer,
width: int | None,
height: int | None,
@ -40,7 +100,7 @@ async def draw_to_terminal(
"shm transfer or using image formats other than PNG and require height and width"
)
kwargs = {"a": "T", "i": image_num, "f": pixel_format.value, "t": "s" if use_shm else "d", "q": 2}
kwargs = {"a": "T", "i": image_num, "f": pixel_format.value, "t": "s" if use_shm else "d"}
if width:
kwargs["s"] = width
@ -59,9 +119,53 @@ async def draw_to_terminal(
# set shm name as payload
data = SHM_NAME
await _write_chunked(**kwargs, data=data)
await _write_chunked(**kwargs, data=data, wait_for_ok=True if use_shm else False)
await asyncio.sleep(0.001)
async def _query_terminal(self, escape: str, endchar: str) -> str:
"""
Send `escape` to the terminal, read the response until
`endchar`, return response (including `endchar`)
"""
loop = asyncio.get_running_loop()
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
def read_terminal_response() -> str:
tty.setraw(fd)
_write_stdout(escape)
response = ""
while True:
char = sys.stdin.read(1)
response += char
if char == endchar:
break
return response
try:
response = await loop.run_in_executor(None, read_terminal_response)
finally:
# Restore the terminal settings
termios.tcsetattr(fd, termios.TCSANOW, old_settings)
return response
async def _write_chunked(self, wait_for_ok: bool = True, **cmd) -> None:
image_id = cmd['i']
if cmd["t"] == "s":
data = standard_b64encode(bytes(cmd.pop("data"), "utf-8"))
else:
data = standard_b64encode(cmd.pop("data"))
while data:
chunk, data = data[:4096], data[4096:]
m = 1 if data else 0
payload = _serialize_gr_command(payload=chunk, m=m, **cmd)
sys.stdout.buffer.write(payload)
sys.stdout.flush()
cmd.clear()
if wait_for_ok:
# Wait for terminal to confirm OK response
await self.terminal_io.wait_for_ok()
def _mmap(np_image, width: int, height: int) -> None:
"""Write image data to shared memory"""
@ -104,37 +208,6 @@ def supports_kitty_graphics(timeout=0.1) -> bool:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
async def get_terminal_cell_size() -> tuple[int, int]:
"""Get (height, width) of a single cell in px"""
reply = await _query_terminal("\x1b[16t", "t")
match = re.search(r"\[6;(\d+);(\d+)t", reply)
if match:
v_pix, h_pix = map(int, match.groups())
return v_pix, h_pix
print(reply)
raise ValueError("Failed to parse terminal cell size response")
async def get_terminal_size_pixel() -> tuple[int, int]:
"""Get (height, width) of the terminal in px"""
reply = await _query_terminal("\x1b[14t", "t")
match = re.search(r"\[4;(\d+);(\d+)t", reply)
if match:
height, width = map(int, match.groups())
return height, width
raise ValueError("Failed to parse terminal pixel size response")
def get_terminal_size() -> tuple[int, int]:
"""Get (rows, cols) of the terminal"""
buf = array.array("H", [0, 0, 0, 0])
fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf)
rows, cols, width, height = buf
return (rows, cols)
def _serialize_gr_command(**cmd) -> bytes:
payload = cmd.pop("payload", None)
cmd = ",".join(f"{k}={v}" for k, v in cmd.items())
@ -148,11 +221,33 @@ def _serialize_gr_command(**cmd) -> bytes:
return b"".join(ans)
async def _write_chunked(**cmd) -> None:
#async def _write_chunked(**cmd) -> None:
# if cmd["t"] == "s":
# data = standard_b64encode(bytes(cmd.pop("data"), "utf-8"))
# else:
# data = standard_b64encode(cmd.pop("data"))
# while data:
# chunk, data = data[:4096], data[4096:]
# m = 1 if data else 0
# payload = _serialize_gr_command(payload=chunk, m=m, **cmd)
# sys.stdout.buffer.write(payload)
# sys.stdout.flush()
# cmd.clear()
async def _write_chunked(wait_for_ok: bool = True, **cmd) -> None:
image_id = cmd['i']
if cmd["t"] == "s":
data = standard_b64encode(bytes(cmd.pop("data"), "utf-8"))
else:
data = standard_b64encode(cmd.pop("data"))
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
tty.setraw(fd)
loop = asyncio.get_running_loop()
try:
while data:
chunk, data = data[:4096], data[4096:]
m = 1 if data else 0
@ -161,32 +256,24 @@ async def _write_chunked(**cmd) -> None:
sys.stdout.flush()
cmd.clear()
if wait_for_ok:
# Wait for terminal to confirm OK response
response = os.read(fd, 1024).decode("utf-8", "ignore")
if not response.startswith(f"\033_Gi={image_id};OK"):
return
#raise RuntimeError(f"Unexpected response: {repr(response)}")
#return response.startswith("\033_GOK")
# def _read_ok():
# response = ""
# while not response.endswith("\033\\"):
# response += sys.stdin.read(1)
# return response
def hide_cursor() -> None:
"""Tell the terminal to hide the cursor."""
_write_stdout("\x1b[?25l")
def show_cursor() -> None:
"""Tell the terminal to show the cursor."""
_write_stdout("\x1b[?25h")
def set_position(y: int, x: int) -> None:
"""Set the cursor position to y, x"""
_write_stdout(f"\x1b[{y};{x}H")
async def get_position() -> tuple[int, int]:
"""Get the (y, x) position of the cursor"""
reply = await _query_terminal("\x1b[6n", "R")
match = re.search(r"\[(\d+);(\d+)R", reply)
if match:
y, x = map(int, match.groups())
return y, x
raise ValueError("Failed to parse cursor position response")
# response = await loop.run_in_executor(None, _read_ok)
# if not response.startswith(f"\033_Gi={image_id};OK"):
# raise RuntimeError(f"Unexpected response: {repr(response)}")
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
def _write_stdout(cmd: str) -> None:
"""Write a command string to stdout and flush."""
@ -194,46 +281,18 @@ def _write_stdout(cmd: str) -> None:
sys.stdout.flush()
async def _query_terminal(escape: str, endchar: str) -> str:
"""
Send `escape` to the terminal, read the response until
`endchar`, return response (including `endchar`)
"""
# Save the current terminal settings
loop = asyncio.get_running_loop()
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
def read_terminal_response() -> str:
tty.setraw(fd)
_write_stdout(escape)
response = ""
while True:
char = sys.stdin.read(1)
response += char
if char == endchar:
break
return response
try:
response = await loop.run_in_executor(None, read_terminal_response)
finally:
# Restore the terminal settings
termios.tcsetattr(fd, termios.TCSANOW, old_settings)
return response
async def main():
rows, cols = get_terminal_size()
v_pix, h_pix = await get_terminal_cell_size()
height, width = await get_terminal_size_pixel()
y, x = await get_position()
term = Terminal()
rows, cols = term.get_terminal_size()
v_pix, h_pix = await term.get_terminal_cell_size()
height, width = await term.get_terminal_size_pixel()
y, x = await term.get_position()
print(f"Terminal has {rows} rows, {cols} cols = {rows * cols} cells")
print(f"Cell size: {h_pix}x{v_pix} px")
print(f"Dimensions: {width}x{height} px")
print("Cursor is at y, x:", y, x)
print(f"Supports graphics: {supports_kitty_graphics()}")
print(f"Supports graphics: {term.supports_kitty_graphics}")
if __name__ == "__main__":

69
terminalio.py Normal file
View File

@ -0,0 +1,69 @@
import asyncio
import os
import re
import sys
import tty
import termios
class TerminalIO:
def __init__(self):
self.ok_event = asyncio.Event()
self.input_callbacks = []
self._reader_task = None
self._stop = False
self._buffer = ""
self._old_settings = None
def register_input_callback(self, callback):
"""Register a function that takes a single character input."""
self.input_callbacks.append(callback)
def start(self):
"""Start the stdin reading loop."""
if not sys.stdin.isatty():
raise RuntimeError("stdin is not a TTY")
self._old_settings = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())
self._reader_task = asyncio.create_task(self._stdin_loop())
async def stop(self):
"""Stop the reader and restore terminal settings."""
self._stop = True
if self._reader_task:
await self._reader_task
if self._old_settings:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self._old_settings)
async def wait_for_ok(self, timeout=1.0):
"""Wait for a Kitty OK message (non-blocking)."""
try:
await asyncio.wait_for(self.ok_event.wait(), timeout)
except asyncio.TimeoutError:
raise RuntimeError("Timed out waiting for Kitty OK response")
self.ok_event.clear()
async def _stdin_loop(self):
loop = asyncio.get_running_loop()
fd = sys.stdin.fileno()
def read_byte():
return os.read(fd, 1).decode("utf-8", errors="ignore")
while not self._stop:
char = await loop.run_in_executor(None, read_byte)
self._buffer += char
if self._buffer.endswith("\033\\"):
#if "\033_GOK" in self._buffer:
if re.search('\033_G(i=\\d+;)?OK', self._buffer):
self.ok_event.set()
self._buffer = ""
continue
# Dispatch to key input handlers (non-escape)
if not char.startswith("\033"):
for callback in self.input_callbacks:
callback(char)
await asyncio.sleep(0.01)

View File

@ -29,12 +29,13 @@ class TerminalPlotter(pv.Plotter):
self.set_background([0.0, 0.0, 0.0])
self.ren_win.SetAlphaBitPlanes(1)
self.ren_win.SetMultiSamples(0)
self.terminal = kitty.Terminal()
async def initialize(self):
h_pix, _ = await kitty.get_terminal_cell_size()
self.rows, _ = kitty.get_terminal_size()
self.start_y, self.start_x = await kitty.get_position()
h_pix, _ = await self.terminal.get_terminal_cell_size()
self.rows, _ = self.terminal.get_terminal_size()
self.start_y, self.start_x = await self.terminal.get_position()
self.needed_lines = math.ceil(self.height / h_pix)
# Add vertical space if needed
@ -42,10 +43,10 @@ class TerminalPlotter(pv.Plotter):
missing = self.needed_lines - (self.rows - self.start_y)
self.start_y -= missing
print("\n" * self.needed_lines, end="")
kitty.set_position(self.start_y, self.start_x)
self.terminal.set_position(self.start_y, self.start_x)
self.orbit = self.generate_orbital_path()
async def _handle_key(self, c):
def _handle_key(c):
if c == "a":
self.camera.Azimuth(10)
elif c == "d":
@ -60,23 +61,43 @@ class TerminalPlotter(pv.Plotter):
self.camera.zoom(0.9)
elif c == "p":
self.fly_to(self.orbit.points[15])
async def _input_loop(self):
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setcbreak(fd)
while self._running:
rlist, _, _ = select.select([sys.stdin], [], [], 0)
if rlist:
c = sys.stdin.read(1)
if c == "q": # quit on q
elif c == "q":
self._running = False
else:
await self._handle_key(c)
await asyncio.sleep(0.001)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
self.terminal.terminal_io.register_input_callback(_handle_key)
# async def _handle_key(self, c):
# if c == "a":
# self.camera.Azimuth(10)
# elif c == "d":
# self.camera.Azimuth(-10)
# elif c == "w":
# self.camera.Elevation(10)
# elif c == "s":
# self.camera.Elevation(-10)
# elif c == "+":
# self.camera.zoom(1.1)
# elif c == "-":
# self.camera.zoom(0.9)
# elif c == "p":
# self.fly_to(self.orbit.points[15])
#async def _input_loop(self):
# fd = sys.stdin.fileno()
# old_settings = termios.tcgetattr(fd)
# try:
# tty.setcbreak(fd)
# while self._running:
# rlist, _, _ = select.select([sys.stdin], [], [], 0)
# if rlist:
# c = sys.stdin.read(1)
# if c == "q": # quit on q
# self._running = False
# else:
# await self._handle_key(c)
# await asyncio.sleep(0.001)
# finally:
# termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
async def _render_loop(self):
import time
@ -93,22 +114,23 @@ class TerminalPlotter(pv.Plotter):
np_image = numpy_support.vtk_to_numpy(vtk_array).reshape(height, width, 4)
np_image = np_image[::-1] # Flip vertically
np_image = np.ascontiguousarray(np_image) # Ensure memory layout is C-contiguous
print("Render & copy:", time.perf_counter() - start)
#print("Render & copy:", time.perf_counter() - start)
#await kitty.draw_to_terminal(np_image, width, height, compress=True)
await kitty.draw_to_terminal(np_image, width, height, use_shm=True)
print("Draw:", time.perf_counter() - start)
kitty.set_position(self.start_y, self.start_x)
#await self.terminal.draw_to_terminal(np_image, width, height, compress=True)
await self.terminal.draw(np_image, width, height, use_shm=True)
#print("Draw:", time.perf_counter() - start)
self.terminal.set_position(self.start_y, self.start_x)
self.camera.Azimuth(1)
async def run(self):
await self.initialize()
self.iren.initialize()
tasks = [
asyncio.create_task(self._input_loop()),
asyncio.create_task(self._render_loop()),
]
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
#tasks = [
# #asyncio.create_task(self._input_loop()),
# asyncio.create_task(self._render_loop()),
#]
#await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
await self._render_loop()
self._running = False
async def main():