add shm support
This commit is contained in:
parent
cfe0227ca6
commit
cfeb818190
146
kitty.py
Executable file → Normal file
146
kitty.py
Executable file → Normal file
@ -5,14 +5,103 @@ import sys
|
||||
import termios
|
||||
import tty
|
||||
from base64 import standard_b64encode
|
||||
from io import BytesIO
|
||||
import array
|
||||
import fcntl
|
||||
|
||||
import zlib
|
||||
|
||||
async def draw_to_terminal(buffer: BytesIO) -> None:
|
||||
"""Display a PNG image in the terminal."""
|
||||
await _write_chunked(a="T", i=1, f=100, q=2, data=buffer)
|
||||
|
||||
import mmap
|
||||
import os
|
||||
import select
|
||||
from enum import Enum
|
||||
|
||||
SHM_NAME = "/kitty-shm-frame"
|
||||
|
||||
|
||||
class Kitty_Format(Enum):
|
||||
RGB = 24
|
||||
RGBA = 32
|
||||
PNG = 100
|
||||
|
||||
|
||||
async def draw_to_terminal(
|
||||
buffer,
|
||||
width: int | None,
|
||||
height: int | None,
|
||||
pixel_format: Kitty_Format = Kitty_Format.RGBA,
|
||||
image_num: int = 1,
|
||||
compress=False,
|
||||
use_shm: bool = False,
|
||||
) -> None:
|
||||
"""Display an image in the terminal."""
|
||||
if (pixel_format != Kitty_Format.PNG or use_shm) and (width is None or height is None):
|
||||
raise ValueError(
|
||||
"shm transfer or using image formats other than PNG and require height and width"
|
||||
)
|
||||
|
||||
kwargs = {"a": "T", "i": image_num, "f": pixel_format.value, "t": "s" if use_shm else "d", "q": 2}
|
||||
|
||||
if width:
|
||||
kwargs["s"] = width
|
||||
if height:
|
||||
kwargs["v"] = height
|
||||
|
||||
if compress:
|
||||
kwargs["o"] = "z"
|
||||
data = zlib.compress(buffer)
|
||||
else:
|
||||
data = buffer
|
||||
|
||||
if use_shm:
|
||||
# write to shm
|
||||
_mmap(data, width, height)
|
||||
# set shm name as payload
|
||||
data = SHM_NAME
|
||||
|
||||
await _write_chunked(**kwargs, data=data)
|
||||
await asyncio.sleep(0.001)
|
||||
|
||||
|
||||
def _mmap(np_image, width: int, height: int) -> None:
|
||||
"""Write image data to shared memory"""
|
||||
shm_size = width * height * 4 # RGBA
|
||||
|
||||
fd = os.open(f"/dev/shm{SHM_NAME}", os.O_CREAT | os.O_RDWR)
|
||||
os.ftruncate(fd, shm_size)
|
||||
shm = mmap.mmap(fd, shm_size, mmap.MAP_SHARED, mmap.PROT_WRITE)
|
||||
|
||||
# write to shared memory
|
||||
shm.seek(0)
|
||||
shm.write(np_image.tobytes())
|
||||
shm.flush()
|
||||
|
||||
|
||||
def supports_kitty_graphics(timeout=0.1) -> bool:
|
||||
"""Check if the terminal has support for the graphics protocol."""
|
||||
if not sys.stdout.isatty():
|
||||
return False
|
||||
|
||||
fd = sys.stdin.fileno()
|
||||
old_settings = termios.tcgetattr(fd)
|
||||
try:
|
||||
# Set terminal to raw mode
|
||||
tty.setraw(fd)
|
||||
# Send Kitty graphics query escape code
|
||||
sys.stdout.write("\033_Gs=1,v=1,a=q,t=d,f=24;AAAA\033\\")
|
||||
sys.stdout.flush()
|
||||
|
||||
# Wait for response with timeout
|
||||
rlist, _, _ = select.select([fd], [], [], timeout)
|
||||
if not rlist:
|
||||
return False
|
||||
|
||||
# Read response
|
||||
response = os.read(fd, 1024).decode("utf-8", "ignore")
|
||||
return response.startswith("\033_GOK")
|
||||
finally:
|
||||
# Restore terminal settings
|
||||
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
|
||||
|
||||
|
||||
async def get_terminal_cell_size() -> tuple[int, int]:
|
||||
@ -60,14 +149,16 @@ def _serialize_gr_command(**cmd) -> bytes:
|
||||
|
||||
|
||||
async def _write_chunked(**cmd) -> None:
|
||||
data = standard_b64encode(cmd.pop("data"))
|
||||
loop = asyncio.get_running_loop()
|
||||
if cmd["t"] == "s":
|
||||
data = standard_b64encode(bytes(cmd.pop("data"), "utf-8"))
|
||||
else:
|
||||
data = standard_b64encode(cmd.pop("data"))
|
||||
while data:
|
||||
chunk, data = data[:4096], data[4096:]
|
||||
m = 1 if data else 0
|
||||
payload = _serialize_gr_command(payload=chunk, m=m, **cmd)
|
||||
await loop.run_in_executor(None, sys.stdout.buffer.write, payload)
|
||||
await loop.run_in_executor(None, sys.stdout.flush)
|
||||
sys.stdout.buffer.write(payload)
|
||||
sys.stdout.flush()
|
||||
cmd.clear()
|
||||
|
||||
|
||||
@ -86,7 +177,19 @@ def set_position(y: int, x: int) -> None:
|
||||
_write_stdout(f"\x1b[{y};{x}H")
|
||||
|
||||
|
||||
async def get_position() -> tuple[int, int]:
|
||||
"""Get the (y, x) position of the cursor"""
|
||||
reply = await _query_terminal("\x1b[6n", "R")
|
||||
|
||||
match = re.search(r"\[(\d+);(\d+)R", reply)
|
||||
if match:
|
||||
y, x = map(int, match.groups())
|
||||
return y, x
|
||||
raise ValueError("Failed to parse cursor position response")
|
||||
|
||||
|
||||
def _write_stdout(cmd: str) -> None:
|
||||
"""Write a command string to stdout and flush."""
|
||||
sys.stdout.write(cmd)
|
||||
sys.stdout.flush()
|
||||
|
||||
@ -120,43 +223,18 @@ async def _query_terminal(escape: str, endchar: str) -> str:
|
||||
return response
|
||||
|
||||
|
||||
async def get_position() -> tuple[int, int]:
|
||||
"""Get the (y, x) position of the cursor"""
|
||||
reply = await _query_terminal("\x1b[6n", "R")
|
||||
|
||||
match = re.search(r"\[(\d+);(\d+)R", reply)
|
||||
if match:
|
||||
y, x = map(int, match.groups())
|
||||
return y, x
|
||||
raise ValueError("Failed to parse cursor position response")
|
||||
|
||||
|
||||
async def main():
|
||||
import pyvista as pv
|
||||
|
||||
rows, cols = get_terminal_size()
|
||||
v_pix, h_pix = await get_terminal_cell_size()
|
||||
height, width = await get_terminal_size_pixel()
|
||||
y, x = await get_position()
|
||||
|
||||
y, x = await get_position()
|
||||
print(f"Terminal has {rows} rows, {cols} cols = {rows * cols} cells")
|
||||
print(f"Cell size: {h_pix}x{v_pix} px")
|
||||
print(f"Dimensions: {width}x{height} px")
|
||||
print("Cursor is at y, x:", y, x)
|
||||
print(f"Supports graphics: {supports_kitty_graphics()}")
|
||||
|
||||
new_lines = int((height / v_pix) - 6)
|
||||
print("\n" * new_lines, end="")
|
||||
|
||||
set_position(y - new_lines - 6, x)
|
||||
|
||||
s = pv.Sphere()
|
||||
pl = pv.Plotter(off_screen=True)
|
||||
pl.add_mesh(s)
|
||||
b = BytesIO()
|
||||
pl.screenshot(b, transparent_background=True, window_size=(width, height))
|
||||
|
||||
await draw_to_terminal(b)
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
|
Loading…
Reference in New Issue
Block a user