Compare commits

...

3 Commits

3 changed files with 330 additions and 180 deletions

265
kitty.py
View File

@ -1,20 +1,19 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from base64 import standard_b64encode
from enum import Enum
import array
import asyncio import asyncio
import fcntl
import mmap
import os
import re import re
import select
import sys import sys
import termios import termios
import tty import tty
from base64 import standard_b64encode
import array
import fcntl
import zlib import zlib
from terminalio import TerminalIO
import mmap
import os
import select
from enum import Enum
SHM_NAME = "/kitty-shm-frame" SHM_NAME = "/kitty-shm-frame"
@ -24,8 +23,69 @@ class Kitty_Format(Enum):
RGBA = 32 RGBA = 32
PNG = 100 PNG = 100
class TerminalProtocolError(Exception):
pass
async def draw_to_terminal(
class Terminal():
def __init__(self):
self.terminal_io = TerminalIO()
self.terminal_io.start()
self.supports_kitty_graphics = supports_kitty_graphics()
def get_terminal_size(self) -> tuple[int, int]:
"""Get (rows, cols) of the terminal"""
buf = array.array("H", [0, 0, 0, 0])
fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf)
rows, cols, width, height = buf
return (rows, cols)
async def get_terminal_cell_size(self) -> tuple[int, int]:
"""Get (height, width) of a single cell in px"""
reply = await self._query_terminal("\x1b[16t", "t")
match = re.search(r"\[6;(\d+);(\d+)t", reply)
if match:
v_pix, h_pix = map(int, match.groups())
return v_pix, h_pix
print(reply)
raise ValueError("Failed to parse terminal cell size response")
async def get_terminal_size_pixel(self) -> tuple[int, int]:
"""Get (height, width) of the terminal in px"""
reply = await self._query_terminal("\x1b[14t", "t")
match = re.search(r"\[4;(\d+);(\d+)t", reply)
if match:
height, width = map(int, match.groups())
return height, width
raise ValueError("Failed to parse terminal pixel size response")
def hide_cursor(self) -> None:
"""Tell the terminal to hide the cursor."""
_write_stdout("\x1b[?25l")
def show_cursor(self) -> None:
"""Tell the terminal to show the cursor."""
_write_stdout("\x1b[?25h")
def set_position(self, y: int, x: int) -> None:
"""Set the cursor position to y, x"""
_write_stdout(f"\x1b[{y};{x}H")
async def get_position(self) -> tuple[int, int]:
"""Get the (y, x) position of the cursor"""
reply = await self._query_terminal("\x1b[6n", "R")
match = re.search(r"\[(\d+);(\d+)R", reply)
if match:
y, x = map(int, match.groups())
return y, x
raise ValueError("Failed to parse cursor position response")
async def draw(
self,
buffer, buffer,
width: int | None, width: int | None,
height: int | None, height: int | None,
@ -40,7 +100,7 @@ async def draw_to_terminal(
"shm transfer or using image formats other than PNG and require height and width" "shm transfer or using image formats other than PNG and require height and width"
) )
kwargs = {"a": "T", "i": image_num, "f": pixel_format.value, "t": "s" if use_shm else "d", "q": 2} kwargs = {"a": "T", "i": image_num, "f": pixel_format.value, "t": "s" if use_shm else "d"}
if width: if width:
kwargs["s"] = width kwargs["s"] = width
@ -59,9 +119,53 @@ async def draw_to_terminal(
# set shm name as payload # set shm name as payload
data = SHM_NAME data = SHM_NAME
await _write_chunked(**kwargs, data=data) await _write_chunked(**kwargs, data=data, wait_for_ok=True if use_shm else False)
await asyncio.sleep(0.001) await asyncio.sleep(0.001)
async def _query_terminal(self, escape: str, endchar: str) -> str:
"""
Send `escape` to the terminal, read the response until
`endchar`, return response (including `endchar`)
"""
loop = asyncio.get_running_loop()
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
def read_terminal_response() -> str:
tty.setraw(fd)
_write_stdout(escape)
response = ""
while True:
char = sys.stdin.read(1)
response += char
if char == endchar:
break
return response
try:
response = await loop.run_in_executor(None, read_terminal_response)
finally:
# Restore the terminal settings
termios.tcsetattr(fd, termios.TCSANOW, old_settings)
return response
async def _write_chunked(self, wait_for_ok: bool = True, **cmd) -> None:
image_id = cmd['i']
if cmd["t"] == "s":
data = standard_b64encode(bytes(cmd.pop("data"), "utf-8"))
else:
data = standard_b64encode(cmd.pop("data"))
while data:
chunk, data = data[:4096], data[4096:]
m = 1 if data else 0
payload = _serialize_gr_command(payload=chunk, m=m, **cmd)
sys.stdout.buffer.write(payload)
sys.stdout.flush()
cmd.clear()
if wait_for_ok:
# Wait for terminal to confirm OK response
await self.terminal_io.wait_for_ok()
def _mmap(np_image, width: int, height: int) -> None: def _mmap(np_image, width: int, height: int) -> None:
"""Write image data to shared memory""" """Write image data to shared memory"""
@ -104,37 +208,6 @@ def supports_kitty_graphics(timeout=0.1) -> bool:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
async def get_terminal_cell_size() -> tuple[int, int]:
"""Get (height, width) of a single cell in px"""
reply = await _query_terminal("\x1b[16t", "t")
match = re.search(r"\[6;(\d+);(\d+)t", reply)
if match:
v_pix, h_pix = map(int, match.groups())
return v_pix, h_pix
print(reply)
raise ValueError("Failed to parse terminal cell size response")
async def get_terminal_size_pixel() -> tuple[int, int]:
"""Get (height, width) of the terminal in px"""
reply = await _query_terminal("\x1b[14t", "t")
match = re.search(r"\[4;(\d+);(\d+)t", reply)
if match:
height, width = map(int, match.groups())
return height, width
raise ValueError("Failed to parse terminal pixel size response")
def get_terminal_size() -> tuple[int, int]:
"""Get (rows, cols) of the terminal"""
buf = array.array("H", [0, 0, 0, 0])
fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf)
rows, cols, width, height = buf
return (rows, cols)
def _serialize_gr_command(**cmd) -> bytes: def _serialize_gr_command(**cmd) -> bytes:
payload = cmd.pop("payload", None) payload = cmd.pop("payload", None)
cmd = ",".join(f"{k}={v}" for k, v in cmd.items()) cmd = ",".join(f"{k}={v}" for k, v in cmd.items())
@ -148,11 +221,33 @@ def _serialize_gr_command(**cmd) -> bytes:
return b"".join(ans) return b"".join(ans)
async def _write_chunked(**cmd) -> None: #async def _write_chunked(**cmd) -> None:
# if cmd["t"] == "s":
# data = standard_b64encode(bytes(cmd.pop("data"), "utf-8"))
# else:
# data = standard_b64encode(cmd.pop("data"))
# while data:
# chunk, data = data[:4096], data[4096:]
# m = 1 if data else 0
# payload = _serialize_gr_command(payload=chunk, m=m, **cmd)
# sys.stdout.buffer.write(payload)
# sys.stdout.flush()
# cmd.clear()
async def _write_chunked(wait_for_ok: bool = True, **cmd) -> None:
image_id = cmd['i']
if cmd["t"] == "s": if cmd["t"] == "s":
data = standard_b64encode(bytes(cmd.pop("data"), "utf-8")) data = standard_b64encode(bytes(cmd.pop("data"), "utf-8"))
else: else:
data = standard_b64encode(cmd.pop("data")) data = standard_b64encode(cmd.pop("data"))
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
tty.setraw(fd)
loop = asyncio.get_running_loop()
try:
while data: while data:
chunk, data = data[:4096], data[4096:] chunk, data = data[:4096], data[4096:]
m = 1 if data else 0 m = 1 if data else 0
@ -161,32 +256,24 @@ async def _write_chunked(**cmd) -> None:
sys.stdout.flush() sys.stdout.flush()
cmd.clear() cmd.clear()
if wait_for_ok:
# Wait for terminal to confirm OK response
response = os.read(fd, 1024).decode("utf-8", "ignore")
if not response.startswith(f"\033_Gi={image_id};OK"):
return
#raise RuntimeError(f"Unexpected response: {repr(response)}")
#return response.startswith("\033_GOK")
# def _read_ok():
# response = ""
# while not response.endswith("\033\\"):
# response += sys.stdin.read(1)
# return response
def hide_cursor() -> None: # response = await loop.run_in_executor(None, _read_ok)
"""Tell the terminal to hide the cursor.""" # if not response.startswith(f"\033_Gi={image_id};OK"):
_write_stdout("\x1b[?25l") # raise RuntimeError(f"Unexpected response: {repr(response)}")
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
def show_cursor() -> None:
"""Tell the terminal to show the cursor."""
_write_stdout("\x1b[?25h")
def set_position(y: int, x: int) -> None:
"""Set the cursor position to y, x"""
_write_stdout(f"\x1b[{y};{x}H")
async def get_position() -> tuple[int, int]:
"""Get the (y, x) position of the cursor"""
reply = await _query_terminal("\x1b[6n", "R")
match = re.search(r"\[(\d+);(\d+)R", reply)
if match:
y, x = map(int, match.groups())
return y, x
raise ValueError("Failed to parse cursor position response")
def _write_stdout(cmd: str) -> None: def _write_stdout(cmd: str) -> None:
"""Write a command string to stdout and flush.""" """Write a command string to stdout and flush."""
@ -194,46 +281,18 @@ def _write_stdout(cmd: str) -> None:
sys.stdout.flush() sys.stdout.flush()
async def _query_terminal(escape: str, endchar: str) -> str:
"""
Send `escape` to the terminal, read the response until
`endchar`, return response (including `endchar`)
"""
# Save the current terminal settings
loop = asyncio.get_running_loop()
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
def read_terminal_response() -> str:
tty.setraw(fd)
_write_stdout(escape)
response = ""
while True:
char = sys.stdin.read(1)
response += char
if char == endchar:
break
return response
try:
response = await loop.run_in_executor(None, read_terminal_response)
finally:
# Restore the terminal settings
termios.tcsetattr(fd, termios.TCSANOW, old_settings)
return response
async def main(): async def main():
rows, cols = get_terminal_size() term = Terminal()
v_pix, h_pix = await get_terminal_cell_size() rows, cols = term.get_terminal_size()
height, width = await get_terminal_size_pixel() v_pix, h_pix = await term.get_terminal_cell_size()
y, x = await get_position() height, width = await term.get_terminal_size_pixel()
y, x = await term.get_position()
print(f"Terminal has {rows} rows, {cols} cols = {rows * cols} cells") print(f"Terminal has {rows} rows, {cols} cols = {rows * cols} cells")
print(f"Cell size: {h_pix}x{v_pix} px") print(f"Cell size: {h_pix}x{v_pix} px")
print(f"Dimensions: {width}x{height} px") print(f"Dimensions: {width}x{height} px")
print("Cursor is at y, x:", y, x) print("Cursor is at y, x:", y, x)
print(f"Supports graphics: {supports_kitty_graphics()}") print(f"Supports graphics: {term.supports_kitty_graphics}")
if __name__ == "__main__": if __name__ == "__main__":

69
terminalio.py Normal file
View File

@ -0,0 +1,69 @@
import asyncio
import os
import re
import sys
import tty
import termios
class TerminalIO:
def __init__(self):
self.ok_event = asyncio.Event()
self.input_callbacks = []
self._reader_task = None
self._stop = False
self._buffer = ""
self._old_settings = None
def register_input_callback(self, callback):
"""Register a function that takes a single character input."""
self.input_callbacks.append(callback)
def start(self):
"""Start the stdin reading loop."""
if not sys.stdin.isatty():
raise RuntimeError("stdin is not a TTY")
self._old_settings = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())
self._reader_task = asyncio.create_task(self._stdin_loop())
async def stop(self):
"""Stop the reader and restore terminal settings."""
self._stop = True
if self._reader_task:
await self._reader_task
if self._old_settings:
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self._old_settings)
async def wait_for_ok(self, timeout=1.0):
"""Wait for a Kitty OK message (non-blocking)."""
try:
await asyncio.wait_for(self.ok_event.wait(), timeout)
except asyncio.TimeoutError:
raise RuntimeError("Timed out waiting for Kitty OK response")
self.ok_event.clear()
async def _stdin_loop(self):
loop = asyncio.get_running_loop()
fd = sys.stdin.fileno()
def read_byte():
return os.read(fd, 1).decode("utf-8", errors="ignore")
while not self._stop:
char = await loop.run_in_executor(None, read_byte)
self._buffer += char
if self._buffer.endswith("\033\\"):
#if "\033_GOK" in self._buffer:
if re.search('\033_G(i=\\d+;)?OK', self._buffer):
self.ok_event.set()
self._buffer = ""
continue
# Dispatch to key input handlers (non-escape)
if not char.startswith("\033"):
for callback in self.input_callbacks:
callback(char)
await asyncio.sleep(0.01)

View File

@ -29,12 +29,13 @@ class TerminalPlotter(pv.Plotter):
self.set_background([0.0, 0.0, 0.0]) self.set_background([0.0, 0.0, 0.0])
self.ren_win.SetAlphaBitPlanes(1) self.ren_win.SetAlphaBitPlanes(1)
self.ren_win.SetMultiSamples(0) self.ren_win.SetMultiSamples(0)
self.terminal = kitty.Terminal()
async def initialize(self): async def initialize(self):
h_pix, _ = await kitty.get_terminal_cell_size() h_pix, _ = await self.terminal.get_terminal_cell_size()
self.rows, _ = kitty.get_terminal_size() self.rows, _ = self.terminal.get_terminal_size()
self.start_y, self.start_x = await kitty.get_position() self.start_y, self.start_x = await self.terminal.get_position()
self.needed_lines = math.ceil(self.height / h_pix) self.needed_lines = math.ceil(self.height / h_pix)
# Add vertical space if needed # Add vertical space if needed
@ -42,10 +43,10 @@ class TerminalPlotter(pv.Plotter):
missing = self.needed_lines - (self.rows - self.start_y) missing = self.needed_lines - (self.rows - self.start_y)
self.start_y -= missing self.start_y -= missing
print("\n" * self.needed_lines, end="") print("\n" * self.needed_lines, end="")
kitty.set_position(self.start_y, self.start_x) self.terminal.set_position(self.start_y, self.start_x)
self.orbit = self.generate_orbital_path() self.orbit = self.generate_orbital_path()
async def _handle_key(self, c): def _handle_key(c):
if c == "a": if c == "a":
self.camera.Azimuth(10) self.camera.Azimuth(10)
elif c == "d": elif c == "d":
@ -60,23 +61,43 @@ class TerminalPlotter(pv.Plotter):
self.camera.zoom(0.9) self.camera.zoom(0.9)
elif c == "p": elif c == "p":
self.fly_to(self.orbit.points[15]) self.fly_to(self.orbit.points[15])
elif c == "q":
async def _input_loop(self):
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setcbreak(fd)
while self._running:
rlist, _, _ = select.select([sys.stdin], [], [], 0)
if rlist:
c = sys.stdin.read(1)
if c == "q": # quit on q
self._running = False self._running = False
else: self.terminal.terminal_io.register_input_callback(_handle_key)
await self._handle_key(c)
await asyncio.sleep(0.001)
finally: # async def _handle_key(self, c):
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) # if c == "a":
# self.camera.Azimuth(10)
# elif c == "d":
# self.camera.Azimuth(-10)
# elif c == "w":
# self.camera.Elevation(10)
# elif c == "s":
# self.camera.Elevation(-10)
# elif c == "+":
# self.camera.zoom(1.1)
# elif c == "-":
# self.camera.zoom(0.9)
# elif c == "p":
# self.fly_to(self.orbit.points[15])
#async def _input_loop(self):
# fd = sys.stdin.fileno()
# old_settings = termios.tcgetattr(fd)
# try:
# tty.setcbreak(fd)
# while self._running:
# rlist, _, _ = select.select([sys.stdin], [], [], 0)
# if rlist:
# c = sys.stdin.read(1)
# if c == "q": # quit on q
# self._running = False
# else:
# await self._handle_key(c)
# await asyncio.sleep(0.001)
# finally:
# termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
async def _render_loop(self): async def _render_loop(self):
import time import time
@ -93,22 +114,23 @@ class TerminalPlotter(pv.Plotter):
np_image = numpy_support.vtk_to_numpy(vtk_array).reshape(height, width, 4) np_image = numpy_support.vtk_to_numpy(vtk_array).reshape(height, width, 4)
np_image = np_image[::-1] # Flip vertically np_image = np_image[::-1] # Flip vertically
np_image = np.ascontiguousarray(np_image) # Ensure memory layout is C-contiguous np_image = np.ascontiguousarray(np_image) # Ensure memory layout is C-contiguous
print("Render & copy:", time.perf_counter() - start) #print("Render & copy:", time.perf_counter() - start)
#await kitty.draw_to_terminal(np_image, width, height, compress=True) #await self.terminal.draw_to_terminal(np_image, width, height, compress=True)
await kitty.draw_to_terminal(np_image, width, height, use_shm=True) await self.terminal.draw(np_image, width, height, use_shm=True)
print("Draw:", time.perf_counter() - start) #print("Draw:", time.perf_counter() - start)
kitty.set_position(self.start_y, self.start_x) self.terminal.set_position(self.start_y, self.start_x)
self.camera.Azimuth(1) self.camera.Azimuth(1)
async def run(self): async def run(self):
await self.initialize() await self.initialize()
self.iren.initialize() self.iren.initialize()
tasks = [ #tasks = [
asyncio.create_task(self._input_loop()), # #asyncio.create_task(self._input_loop()),
asyncio.create_task(self._render_loop()), # asyncio.create_task(self._render_loop()),
] #]
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) #await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
await self._render_loop()
self._running = False self._running = False
async def main(): async def main():