300 lines
9.2 KiB
Python
300 lines
9.2 KiB
Python
#!/usr/bin/env python3
|
|
from base64 import standard_b64encode
|
|
from enum import Enum
|
|
import array
|
|
import asyncio
|
|
import fcntl
|
|
import mmap
|
|
import os
|
|
import re
|
|
import select
|
|
import sys
|
|
import termios
|
|
import tty
|
|
import zlib
|
|
|
|
from terminalio import TerminalIO
|
|
|
|
# Name of the shared-memory file used for frame transfer: `_mmap` writes the
# pixel data to f"/dev/shm{SHM_NAME}", and `Terminal.draw` sends this name as
# the command payload when `use_shm` is set.
SHM_NAME = "/kitty-shm-frame"
|
|
|
|
|
|
class Kitty_Format(Enum):
    """Pixel formats selectable for an image transfer.

    The member value is what `Terminal.draw` places in the ``f`` key of the
    graphics command.
    """

    # Raw pixel data formats.
    RGB = 24
    RGBA = 32

    # Encoded image; `Terminal.draw` does not require width/height for it
    # unless shared-memory transfer is used.
    PNG = 100
|
|
|
|
class TerminalProtocolError(Exception):
    """Error type for failures in the escape-sequence exchange with the terminal."""
|
|
|
|
|
|
class Terminal:
    """Helper for querying the terminal and drawing images via graphics commands.

    Construction starts a `TerminalIO` instance and probes graphics support
    once; the result is stored in `supports_kitty_graphics`.
    """

    # Size, in bytes, of the base64 payload carried by one graphics command.
    _CHUNK_SIZE = 4096

    def __init__(self):
        self.terminal_io = TerminalIO()
        self.terminal_io.start()
        # NOTE(review): the probe reads stdin directly after terminal_io has
        # been started; confirm the two do not compete for the reply.
        self.supports_kitty_graphics = supports_kitty_graphics()

    def get_terminal_size(self) -> tuple[int, int]:
        """Get (rows, cols) of the terminal"""
        # TIOCGWINSZ fills four unsigned shorts; only the first two are used.
        buf = array.array("H", [0, 0, 0, 0])
        fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf)
        rows, cols, _unused_a, _unused_b = buf
        return (rows, cols)

    async def get_terminal_cell_size(self) -> tuple[int, int]:
        """Get (height, width) of a single cell in px

        Raises:
            ValueError: if the terminal's reply cannot be parsed.
        """
        return await self._query_int_pair(
            "\x1b[16t", "t", r"\[6;(\d+);(\d+)t", "terminal cell size"
        )

    async def get_terminal_size_pixel(self) -> tuple[int, int]:
        """Get (height, width) of the terminal in px

        Raises:
            ValueError: if the terminal's reply cannot be parsed.
        """
        return await self._query_int_pair(
            "\x1b[14t", "t", r"\[4;(\d+);(\d+)t", "terminal pixel size"
        )

    def hide_cursor(self) -> None:
        """Tell the terminal to hide the cursor."""
        _write_stdout("\x1b[?25l")

    def show_cursor(self) -> None:
        """Tell the terminal to show the cursor."""
        _write_stdout("\x1b[?25h")

    def set_position(self, y: int, x: int) -> None:
        """Set the cursor position to y, x"""
        _write_stdout(f"\x1b[{y};{x}H")

    async def get_position(self) -> tuple[int, int]:
        """Get the (y, x) position of the cursor

        Raises:
            ValueError: if the terminal's reply cannot be parsed.
        """
        return await self._query_int_pair(
            "\x1b[6n", "R", r"\[(\d+);(\d+)R", "cursor position"
        )

    async def draw(
        self,
        buffer,
        width: int | None,
        height: int | None,
        pixel_format: Kitty_Format = Kitty_Format.RGBA,
        image_num: int = 1,
        compress: bool = False,
        use_shm: bool = False,
    ) -> None:
        """Display an image in the terminal.

        Args:
            buffer: image data. It is passed through `zlib.compress` when
                `compress` is set, otherwise forwarded unchanged.
            width: image width, sent as the ``s`` key when truthy.
            height: image height, sent as the ``v`` key when truthy.
            pixel_format: its value is sent as the ``f`` key.
            image_num: sent as the ``i`` key.
            compress: compress the data with zlib and send ``o=z``.
            use_shm: write the data to shared memory via `_mmap` and send
                `SHM_NAME` as the payload (``t=s``) instead of the data
                itself (``t=d``). The OK reply is only awaited in this mode.

        Raises:
            ValueError: if `width` or `height` is None while `use_shm` is set
                or `pixel_format` is not PNG.
        """
        needs_dimensions = use_shm or pixel_format != Kitty_Format.PNG
        if needs_dimensions and (width is None or height is None):
            raise ValueError(
                "shm transfer or image formats other than PNG require height and width"
            )

        kwargs = {
            "a": "T",
            "i": image_num,
            "f": pixel_format.value,
            "t": "s" if use_shm else "d",
        }

        # Falsy dimensions (None or 0) are left out of the command.
        if width:
            kwargs["s"] = width
        if height:
            kwargs["v"] = height

        if compress:
            kwargs["o"] = "z"
            data = zlib.compress(buffer)
        else:
            data = buffer

        if use_shm:
            # NOTE(review): with compress=True the shm payload is zlib output
            # (bytes) rather than the original buffer; confirm `_mmap` and the
            # receiving terminal handle that combination.
            _mmap(data, width, height)
            # The command then carries only the shared-memory name.
            data = SHM_NAME

        # NOTE(review): this calls the module-level `_write_chunked`, not the
        # `self._write_chunked` method of this class; confirm which is intended.
        await _write_chunked(**kwargs, data=data, wait_for_ok=use_shm)
        # Short pause kept from the original implementation; presumably gives
        # the terminal time to consume the frame — TODO confirm.
        await asyncio.sleep(0.001)

    async def _query_int_pair(
        self, escape: str, endchar: str, pattern: str, what: str
    ) -> tuple[int, int]:
        """Send `escape`, then parse two integers out of the reply with `pattern`.

        `pattern` must contain exactly two capturing groups of digits. `what`
        names the queried quantity in the error message.

        Raises:
            ValueError: if `pattern` does not match the reply; the message
                includes the repr of the reply for diagnosis.
        """
        reply = await self._query_terminal(escape, endchar)
        match = re.search(pattern, reply)
        if match is None:
            raise ValueError(f"Failed to parse {what} response: {reply!r}")
        first, second = map(int, match.groups())
        return first, second

    async def _query_terminal(self, escape: str, endchar: str) -> str:
        """
        Send `escape` to the terminal, read the response until
        `endchar`, return response (including `endchar`)

        The blocking read runs in the event loop's default executor. stdin is
        switched to raw mode for the exchange and its previous settings are
        restored afterwards, also on error.

        Raises:
            TerminalProtocolError: if stdin reaches end-of-file before
                `endchar` is seen.
        """
        loop = asyncio.get_running_loop()
        fd = sys.stdin.fileno()
        old_settings = termios.tcgetattr(fd)

        def read_terminal_response() -> str:
            tty.setraw(fd)
            _write_stdout(escape)
            received = []
            while True:
                char = sys.stdin.read(1)
                if not char:
                    # read(1) returns "" at EOF; without this check the loop
                    # would never terminate.
                    raise TerminalProtocolError(
                        f"stdin closed before {endchar!r} was received: "
                        f"{''.join(received)!r}"
                    )
                received.append(char)
                if char == endchar:
                    return "".join(received)

        try:
            return await loop.run_in_executor(None, read_terminal_response)
        finally:
            # Restore the terminal settings
            termios.tcsetattr(fd, termios.TCSANOW, old_settings)

    async def _write_chunked(self, wait_for_ok: bool = True, **cmd) -> None:
        """Send `cmd` with its ``data`` entry base64-encoded and split into chunks.

        Every chunk is serialized with `_serialize_gr_command`; the ``m`` key
        is 1 on all chunks but the last, where it is 0. The control keys in
        `cmd` are sent with the first chunk only.

        Args:
            wait_for_ok: await `self.terminal_io.wait_for_ok()` after sending.
            **cmd: control keys plus ``data``. ``i`` and ``t`` are required.
                With ``t == "s"`` the data is a str and is UTF-8 encoded
                first; otherwise it must be bytes-like.

        Raises:
            KeyError: if ``i``, ``t`` or ``data`` is missing.
        """
        if "i" not in cmd:
            # An image id has always been mandatory here.
            raise KeyError("i")
        if cmd["t"] == "s":
            data = standard_b64encode(bytes(cmd.pop("data"), "utf-8"))
        else:
            data = standard_b64encode(cmd.pop("data"))

        total = len(data)
        out = sys.stdout.buffer
        # Walk the payload by offset so the remainder is not re-copied on
        # every iteration.
        for start in range(0, total, self._CHUNK_SIZE):
            stop = start + self._CHUNK_SIZE
            more = 1 if stop < total else 0
            out.write(_serialize_gr_command(payload=data[start:stop], m=more, **cmd))
            # Only the first chunk carries the control keys.
            cmd.clear()
        sys.stdout.flush()

        # Nothing was sent for an empty payload, so no reply can be pending.
        if wait_for_ok and total:
            # Wait for terminal to confirm OK response
            await self.terminal_io.wait_for_ok()
|
|
|
|
def _mmap(np_image, width: int, height: int) -> None:
    """Write image data to shared memory

    The file f"/dev/shm{SHM_NAME}" is created if needed, resized to
    ``width * height * 4`` bytes and filled from the start with the image
    bytes. The mapping and the file descriptor are released before returning.

    Args:
        np_image: object exposing ``tobytes()`` (presumably a numpy array —
            confirm against callers), or any other bytes-like object.
        width: image width, used only to size the shared-memory file.
        height: image height, used only to size the shared-memory file.

    Raises:
        ValueError: from `mmap` if the image bytes exceed the mapped size or
            the size is zero.
        OSError: if the file cannot be opened or resized.
    """
    # The size is computed for 4 bytes per pixel regardless of the data
    # actually passed in.
    shm_size = width * height * 4  # RGBA

    if hasattr(np_image, "tobytes"):
        payload = np_image.tobytes()
    else:
        # Plain bytes/bytearray have no tobytes().
        payload = bytes(np_image)

    fd = os.open(f"/dev/shm{SHM_NAME}", os.O_CREAT | os.O_RDWR)
    try:
        os.ftruncate(fd, shm_size)
        with mmap.mmap(fd, shm_size, mmap.MAP_SHARED, mmap.PROT_WRITE) as shm:
            # A fresh mapping starts at offset 0.
            shm.write(payload)
            shm.flush()
    finally:
        # Without this every call leaked one file descriptor.
        os.close(fd)
|
|
|
|
|
|
def supports_kitty_graphics(timeout=0.1) -> bool:
    """Check if the terminal has support for the graphics protocol.

    A 1x1 RGB query command (``a=q``) is written to stdout and the reply is
    read from stdin in raw mode. stdin's previous settings are always
    restored.

    Args:
        timeout: seconds to wait for the terminal's reply.

    Returns:
        True if an OK graphics reply arrives within `timeout`. False if it
        does not, or if stdout or stdin is not a tty.
    """
    # The query goes out on stdout and the answer comes back on stdin, so
    # both must be terminals; termios calls would fail on a non-tty stdin.
    if not (sys.stdout.isatty() and sys.stdin.isatty()):
        return False

    fd = sys.stdin.fileno()
    old_settings = termios.tcgetattr(fd)
    try:
        # Set terminal to raw mode
        tty.setraw(fd)
        # Send Kitty graphics query escape code. It carries an image id
        # (i=31, as in the protocol documentation's example) so that the
        # terminal addresses a reply to it.
        sys.stdout.write("\033_Gi=31,s=1,v=1,a=q,t=d,f=24;AAAA\033\\")
        sys.stdout.flush()

        # Wait for response with timeout
        rlist, _, _ = select.select([fd], [], [], timeout)
        if not rlist:
            return False

        # Read response
        response = os.read(fd, 1024).decode("utf-8", "ignore")
        # Replies look like ESC _G<keys>;OK ESC \ ; the key section may be
        # empty. The bare "ESC _GOK" form checked previously is still accepted.
        return re.match(r"\x1b_G(?:[^;\x1b]*;)?OK", response) is not None
    finally:
        # Restore terminal settings
        termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
|
|
|
|
|
|
def _serialize_gr_command(**cmd) -> bytes:
    """Build one graphics escape sequence from control keys and a payload.

    Args:
        **cmd: control keys, emitted as comma-separated ``key=value`` pairs
            in the order given. The optional ``payload`` entry (bytes) is
            not a control key; when non-empty it is appended after a ``;``.

    Returns:
        ``ESC _G <keys> [; payload] ESC \\`` as bytes.

    Raises:
        UnicodeEncodeError: if a key or value is not ASCII-encodable.
    """
    payload = cmd.pop("payload", None)
    control = ",".join(f"{key}={value}" for key, value in cmd.items())

    parts = [b"\033_G", control.encode("ascii")]
    if payload:
        parts += [b";", payload]
    parts.append(b"\033\\")
    return b"".join(parts)
|
|
|
|
|
|
#async def _write_chunked(**cmd) -> None:
|
|
# if cmd["t"] == "s":
|
|
# data = standard_b64encode(bytes(cmd.pop("data"), "utf-8"))
|
|
# else:
|
|
# data = standard_b64encode(cmd.pop("data"))
|
|
# while data:
|
|
# chunk, data = data[:4096], data[4096:]
|
|
# m = 1 if data else 0
|
|
# payload = _serialize_gr_command(payload=chunk, m=m, **cmd)
|
|
# sys.stdout.buffer.write(payload)
|
|
# sys.stdout.flush()
|
|
# cmd.clear()
|
|
|
|
async def _write_chunked(wait_for_ok: bool = True, **cmd) -> None:
    """Send `cmd` with its ``data`` entry base64-encoded and split into chunks.

    Every chunk is serialized with `_serialize_gr_command`; the ``m`` key is
    1 on all chunks but the last, where it is 0. The control keys in `cmd`
    are sent with the first chunk only. stdin is in raw mode for the duration
    of the call and its previous settings are restored afterwards, also on
    error.

    Args:
        wait_for_ok: after sending, block on a single read of up to 1024
            bytes from stdin to consume the terminal's reply. The reply is
            not validated: this is best effort, as before.
        **cmd: control keys plus ``data``. ``i`` and ``t`` are required. With
            ``t == "s"`` the data is a str and is UTF-8 encoded first;
            otherwise it must be bytes-like.

    Raises:
        KeyError: if ``i``, ``t`` or ``data`` is missing.
    """
    chunk_size = 4096

    if "i" not in cmd:
        # An image id has always been mandatory here.
        raise KeyError("i")
    if cmd["t"] == "s":
        data = standard_b64encode(bytes(cmd.pop("data"), "utf-8"))
    else:
        data = standard_b64encode(cmd.pop("data"))

    fd = sys.stdin.fileno()
    old_settings = termios.tcgetattr(fd)
    try:
        # Inside the try so the settings are restored even if this fails.
        tty.setraw(fd)

        total = len(data)
        out = sys.stdout.buffer
        # Walk the payload by offset so the remainder is not re-copied on
        # every iteration.
        for start in range(0, total, chunk_size):
            stop = start + chunk_size
            more = 1 if stop < total else 0
            out.write(_serialize_gr_command(payload=data[start:stop], m=more, **cmd))
            # Only the first chunk carries the control keys.
            cmd.clear()
        sys.stdout.flush()

        # Nothing was sent for an empty payload, so no reply can be pending
        # and the read would block indefinitely.
        if wait_for_ok and total:
            # Consume the terminal's reply. NOTE(review): this is a blocking
            # read inside a coroutine, and the reply content is ignored.
            os.read(fd, 1024)
    finally:
        termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
|
|
|
|
def _write_stdout(cmd: str) -> None:
    """Emit `cmd` on stdout and flush it out immediately."""
    stream = sys.stdout
    stream.write(cmd)
    stream.flush()
|
|
|
|
|
|
async def main():
    """Query the terminal and print its geometry, cursor position and graphics support."""
    terminal = Terminal()

    # Collect everything first, then report.
    n_rows, n_cols = terminal.get_terminal_size()
    cell_height, cell_width = await terminal.get_terminal_cell_size()
    px_height, px_width = await terminal.get_terminal_size_pixel()
    cursor_y, cursor_x = await terminal.get_position()

    print(f"Terminal has {n_rows} rows, {n_cols} cols = {n_rows * n_cols} cells")
    print(f"Cell size: {cell_width}x{cell_height} px")
    print(f"Dimensions: {px_width}x{px_height} px")
    print("Cursor is at y, x:", cursor_y, cursor_x)
    print(f"Supports graphics: {terminal.supports_kitty_graphics}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: drive the async demo to completion.
    asyncio.run(main())
|