Compare commits
1 Commits
terminal-c
...
master
Author | SHA1 | Date | |
---|---|---|---|
cdf71a3f06 |
329
kitty.py
329
kitty.py
@ -1,19 +1,20 @@
|
||||
#!/usr/bin/env python3
|
||||
from base64 import standard_b64encode
|
||||
from enum import Enum
|
||||
import array
|
||||
import asyncio
|
||||
import fcntl
|
||||
import mmap
|
||||
import os
|
||||
import re
|
||||
import select
|
||||
import sys
|
||||
import termios
|
||||
import tty
|
||||
from base64 import standard_b64encode
|
||||
import array
|
||||
import fcntl
|
||||
|
||||
import zlib
|
||||
|
||||
from terminalio import TerminalIO
|
||||
|
||||
import mmap
|
||||
import os
|
||||
import select
|
||||
from enum import Enum
|
||||
|
||||
SHM_NAME = "/kitty-shm-frame"
|
||||
|
||||
@ -23,149 +24,44 @@ class Kitty_Format(Enum):
|
||||
RGBA = 32
|
||||
PNG = 100
|
||||
|
||||
class TerminalProtocolError(Exception):
|
||||
pass
|
||||
|
||||
async def draw_to_terminal(
|
||||
buffer,
|
||||
width: int | None,
|
||||
height: int | None,
|
||||
pixel_format: Kitty_Format = Kitty_Format.RGBA,
|
||||
image_num: int = 1,
|
||||
compress=False,
|
||||
use_shm: bool = False,
|
||||
) -> None:
|
||||
"""Display an image in the terminal."""
|
||||
if (pixel_format != Kitty_Format.PNG or use_shm) and (width is None or height is None):
|
||||
raise ValueError(
|
||||
"shm transfer or using image formats other than PNG and require height and width"
|
||||
)
|
||||
|
||||
class Terminal():
|
||||
def __init__(self):
|
||||
self.terminal_io = TerminalIO()
|
||||
self.terminal_io.start()
|
||||
self.supports_kitty_graphics = supports_kitty_graphics()
|
||||
kwargs = {"a": "T", "i": image_num, "f": pixel_format.value, "t": "s" if use_shm else "d", "q": 2}
|
||||
|
||||
def get_terminal_size(self) -> tuple[int, int]:
|
||||
"""Get (rows, cols) of the terminal"""
|
||||
buf = array.array("H", [0, 0, 0, 0])
|
||||
fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf)
|
||||
rows, cols, width, height = buf
|
||||
return (rows, cols)
|
||||
if width:
|
||||
kwargs["s"] = width
|
||||
if height:
|
||||
kwargs["v"] = height
|
||||
|
||||
async def get_terminal_cell_size(self) -> tuple[int, int]:
|
||||
"""Get (height, width) of a single cell in px"""
|
||||
reply = await self._query_terminal("\x1b[16t", "t")
|
||||
match = re.search(r"\[6;(\d+);(\d+)t", reply)
|
||||
if match:
|
||||
v_pix, h_pix = map(int, match.groups())
|
||||
return v_pix, h_pix
|
||||
print(reply)
|
||||
raise ValueError("Failed to parse terminal cell size response")
|
||||
if compress:
|
||||
kwargs["o"] = "z"
|
||||
data = zlib.compress(buffer)
|
||||
else:
|
||||
data = buffer
|
||||
|
||||
if use_shm:
|
||||
# write to shm
|
||||
_mmap(data, width, height)
|
||||
# set shm name as payload
|
||||
data = SHM_NAME
|
||||
|
||||
async def get_terminal_size_pixel(self) -> tuple[int, int]:
|
||||
"""Get (height, width) of the terminal in px"""
|
||||
reply = await self._query_terminal("\x1b[14t", "t")
|
||||
match = re.search(r"\[4;(\d+);(\d+)t", reply)
|
||||
if match:
|
||||
height, width = map(int, match.groups())
|
||||
return height, width
|
||||
raise ValueError("Failed to parse terminal pixel size response")
|
||||
await _write_chunked(**kwargs, data=data)
|
||||
await asyncio.sleep(0.001)
|
||||
|
||||
def hide_cursor(self) -> None:
|
||||
"""Tell the terminal to hide the cursor."""
|
||||
_write_stdout("\x1b[?25l")
|
||||
|
||||
|
||||
def show_cursor(self) -> None:
|
||||
"""Tell the terminal to show the cursor."""
|
||||
_write_stdout("\x1b[?25h")
|
||||
|
||||
|
||||
def set_position(self, y: int, x: int) -> None:
|
||||
"""Set the cursor position to y, x"""
|
||||
_write_stdout(f"\x1b[{y};{x}H")
|
||||
|
||||
async def get_position(self) -> tuple[int, int]:
|
||||
"""Get the (y, x) position of the cursor"""
|
||||
reply = await self._query_terminal("\x1b[6n", "R")
|
||||
|
||||
match = re.search(r"\[(\d+);(\d+)R", reply)
|
||||
if match:
|
||||
y, x = map(int, match.groups())
|
||||
return y, x
|
||||
raise ValueError("Failed to parse cursor position response")
|
||||
|
||||
async def draw(
|
||||
self,
|
||||
buffer,
|
||||
width: int | None,
|
||||
height: int | None,
|
||||
pixel_format: Kitty_Format = Kitty_Format.RGBA,
|
||||
image_num: int = 1,
|
||||
compress=False,
|
||||
use_shm: bool = False,
|
||||
) -> None:
|
||||
"""Display an image in the terminal."""
|
||||
if (pixel_format != Kitty_Format.PNG or use_shm) and (width is None or height is None):
|
||||
raise ValueError(
|
||||
"shm transfer or using image formats other than PNG and require height and width"
|
||||
)
|
||||
|
||||
kwargs = {"a": "T", "i": image_num, "f": pixel_format.value, "t": "s" if use_shm else "d"}
|
||||
|
||||
if width:
|
||||
kwargs["s"] = width
|
||||
if height:
|
||||
kwargs["v"] = height
|
||||
|
||||
if compress:
|
||||
kwargs["o"] = "z"
|
||||
data = zlib.compress(buffer)
|
||||
else:
|
||||
data = buffer
|
||||
|
||||
if use_shm:
|
||||
# write to shm
|
||||
_mmap(data, width, height)
|
||||
# set shm name as payload
|
||||
data = SHM_NAME
|
||||
|
||||
await _write_chunked(**kwargs, data=data, wait_for_ok=True if use_shm else False)
|
||||
await asyncio.sleep(0.001)
|
||||
|
||||
async def _query_terminal(self, escape: str, endchar: str) -> str:
|
||||
"""
|
||||
Send `escape` to the terminal, read the response until
|
||||
`endchar`, return response (including `endchar`)
|
||||
"""
|
||||
loop = asyncio.get_running_loop()
|
||||
fd = sys.stdin.fileno()
|
||||
old_settings = termios.tcgetattr(fd)
|
||||
|
||||
def read_terminal_response() -> str:
|
||||
tty.setraw(fd)
|
||||
_write_stdout(escape)
|
||||
response = ""
|
||||
while True:
|
||||
char = sys.stdin.read(1)
|
||||
response += char
|
||||
if char == endchar:
|
||||
break
|
||||
return response
|
||||
|
||||
try:
|
||||
response = await loop.run_in_executor(None, read_terminal_response)
|
||||
finally:
|
||||
# Restore the terminal settings
|
||||
termios.tcsetattr(fd, termios.TCSANOW, old_settings)
|
||||
return response
|
||||
|
||||
async def _write_chunked(self, wait_for_ok: bool = True, **cmd) -> None:
|
||||
image_id = cmd['i']
|
||||
if cmd["t"] == "s":
|
||||
data = standard_b64encode(bytes(cmd.pop("data"), "utf-8"))
|
||||
else:
|
||||
data = standard_b64encode(cmd.pop("data"))
|
||||
while data:
|
||||
chunk, data = data[:4096], data[4096:]
|
||||
m = 1 if data else 0
|
||||
payload = _serialize_gr_command(payload=chunk, m=m, **cmd)
|
||||
sys.stdout.buffer.write(payload)
|
||||
sys.stdout.flush()
|
||||
cmd.clear()
|
||||
|
||||
if wait_for_ok:
|
||||
# Wait for terminal to confirm OK response
|
||||
await self.terminal_io.wait_for_ok()
|
||||
|
||||
def _mmap(np_image, width: int, height: int) -> None:
|
||||
"""Write image data to shared memory"""
|
||||
@ -208,6 +104,37 @@ def supports_kitty_graphics(timeout=0.1) -> bool:
|
||||
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
|
||||
|
||||
|
||||
async def get_terminal_cell_size() -> tuple[int, int]:
|
||||
"""Get (height, width) of a single cell in px"""
|
||||
reply = await _query_terminal("\x1b[16t", "t")
|
||||
|
||||
match = re.search(r"\[6;(\d+);(\d+)t", reply)
|
||||
if match:
|
||||
v_pix, h_pix = map(int, match.groups())
|
||||
return v_pix, h_pix
|
||||
print(reply)
|
||||
raise ValueError("Failed to parse terminal cell size response")
|
||||
|
||||
|
||||
async def get_terminal_size_pixel() -> tuple[int, int]:
|
||||
"""Get (height, width) of the terminal in px"""
|
||||
reply = await _query_terminal("\x1b[14t", "t")
|
||||
|
||||
match = re.search(r"\[4;(\d+);(\d+)t", reply)
|
||||
if match:
|
||||
height, width = map(int, match.groups())
|
||||
return height, width
|
||||
raise ValueError("Failed to parse terminal pixel size response")
|
||||
|
||||
|
||||
def get_terminal_size() -> tuple[int, int]:
|
||||
"""Get (rows, cols) of the terminal"""
|
||||
buf = array.array("H", [0, 0, 0, 0])
|
||||
fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf)
|
||||
rows, cols, width, height = buf
|
||||
return (rows, cols)
|
||||
|
||||
|
||||
def _serialize_gr_command(**cmd) -> bytes:
|
||||
payload = cmd.pop("payload", None)
|
||||
cmd = ",".join(f"{k}={v}" for k, v in cmd.items())
|
||||
@ -221,59 +148,45 @@ def _serialize_gr_command(**cmd) -> bytes:
|
||||
return b"".join(ans)
|
||||
|
||||
|
||||
#async def _write_chunked(**cmd) -> None:
|
||||
# if cmd["t"] == "s":
|
||||
# data = standard_b64encode(bytes(cmd.pop("data"), "utf-8"))
|
||||
# else:
|
||||
# data = standard_b64encode(cmd.pop("data"))
|
||||
# while data:
|
||||
# chunk, data = data[:4096], data[4096:]
|
||||
# m = 1 if data else 0
|
||||
# payload = _serialize_gr_command(payload=chunk, m=m, **cmd)
|
||||
# sys.stdout.buffer.write(payload)
|
||||
# sys.stdout.flush()
|
||||
# cmd.clear()
|
||||
|
||||
async def _write_chunked(wait_for_ok: bool = True, **cmd) -> None:
|
||||
image_id = cmd['i']
|
||||
async def _write_chunked(**cmd) -> None:
|
||||
if cmd["t"] == "s":
|
||||
data = standard_b64encode(bytes(cmd.pop("data"), "utf-8"))
|
||||
else:
|
||||
data = standard_b64encode(cmd.pop("data"))
|
||||
while data:
|
||||
chunk, data = data[:4096], data[4096:]
|
||||
m = 1 if data else 0
|
||||
payload = _serialize_gr_command(payload=chunk, m=m, **cmd)
|
||||
sys.stdout.buffer.write(payload)
|
||||
sys.stdout.flush()
|
||||
cmd.clear()
|
||||
|
||||
fd = sys.stdin.fileno()
|
||||
old_settings = termios.tcgetattr(fd)
|
||||
tty.setraw(fd)
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
def hide_cursor() -> None:
|
||||
"""Tell the terminal to hide the cursor."""
|
||||
_write_stdout("\x1b[?25l")
|
||||
|
||||
try:
|
||||
while data:
|
||||
chunk, data = data[:4096], data[4096:]
|
||||
m = 1 if data else 0
|
||||
payload = _serialize_gr_command(payload=chunk, m=m, **cmd)
|
||||
sys.stdout.buffer.write(payload)
|
||||
sys.stdout.flush()
|
||||
cmd.clear()
|
||||
|
||||
if wait_for_ok:
|
||||
# Wait for terminal to confirm OK response
|
||||
response = os.read(fd, 1024).decode("utf-8", "ignore")
|
||||
if not response.startswith(f"\033_Gi={image_id};OK"):
|
||||
return
|
||||
#raise RuntimeError(f"Unexpected response: {repr(response)}")
|
||||
#return response.startswith("\033_GOK")
|
||||
# def _read_ok():
|
||||
# response = ""
|
||||
# while not response.endswith("\033\\"):
|
||||
# response += sys.stdin.read(1)
|
||||
# return response
|
||||
def show_cursor() -> None:
|
||||
"""Tell the terminal to show the cursor."""
|
||||
_write_stdout("\x1b[?25h")
|
||||
|
||||
|
||||
def set_position(y: int, x: int) -> None:
|
||||
"""Set the cursor position to y, x"""
|
||||
_write_stdout(f"\x1b[{y};{x}H")
|
||||
|
||||
|
||||
async def get_position() -> tuple[int, int]:
|
||||
"""Get the (y, x) position of the cursor"""
|
||||
reply = await _query_terminal("\x1b[6n", "R")
|
||||
|
||||
match = re.search(r"\[(\d+);(\d+)R", reply)
|
||||
if match:
|
||||
y, x = map(int, match.groups())
|
||||
return y, x
|
||||
raise ValueError("Failed to parse cursor position response")
|
||||
|
||||
# response = await loop.run_in_executor(None, _read_ok)
|
||||
# if not response.startswith(f"\033_Gi={image_id};OK"):
|
||||
# raise RuntimeError(f"Unexpected response: {repr(response)}")
|
||||
finally:
|
||||
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
|
||||
|
||||
def _write_stdout(cmd: str) -> None:
|
||||
"""Write a command string to stdout and flush."""
|
||||
@ -281,18 +194,46 @@ def _write_stdout(cmd: str) -> None:
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
async def _query_terminal(escape: str, endchar: str) -> str:
|
||||
"""
|
||||
Send `escape` to the terminal, read the response until
|
||||
`endchar`, return response (including `endchar`)
|
||||
"""
|
||||
# Save the current terminal settings
|
||||
loop = asyncio.get_running_loop()
|
||||
fd = sys.stdin.fileno()
|
||||
old_settings = termios.tcgetattr(fd)
|
||||
|
||||
def read_terminal_response() -> str:
|
||||
tty.setraw(fd)
|
||||
_write_stdout(escape)
|
||||
response = ""
|
||||
while True:
|
||||
char = sys.stdin.read(1)
|
||||
response += char
|
||||
if char == endchar:
|
||||
break
|
||||
return response
|
||||
|
||||
try:
|
||||
response = await loop.run_in_executor(None, read_terminal_response)
|
||||
finally:
|
||||
# Restore the terminal settings
|
||||
termios.tcsetattr(fd, termios.TCSANOW, old_settings)
|
||||
return response
|
||||
|
||||
|
||||
async def main():
|
||||
term = Terminal()
|
||||
rows, cols = term.get_terminal_size()
|
||||
v_pix, h_pix = await term.get_terminal_cell_size()
|
||||
height, width = await term.get_terminal_size_pixel()
|
||||
y, x = await term.get_position()
|
||||
rows, cols = get_terminal_size()
|
||||
v_pix, h_pix = await get_terminal_cell_size()
|
||||
height, width = await get_terminal_size_pixel()
|
||||
y, x = await get_position()
|
||||
|
||||
print(f"Terminal has {rows} rows, {cols} cols = {rows * cols} cells")
|
||||
print(f"Cell size: {h_pix}x{v_pix} px")
|
||||
print(f"Dimensions: {width}x{height} px")
|
||||
print("Cursor is at y, x:", y, x)
|
||||
print(f"Supports graphics: {term.supports_kitty_graphics}")
|
||||
print(f"Supports graphics: {supports_kitty_graphics()}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -1,69 +0,0 @@
|
||||
import asyncio
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import tty
|
||||
import termios
|
||||
|
||||
class TerminalIO:
|
||||
def __init__(self):
|
||||
self.ok_event = asyncio.Event()
|
||||
self.input_callbacks = []
|
||||
self._reader_task = None
|
||||
self._stop = False
|
||||
self._buffer = ""
|
||||
self._old_settings = None
|
||||
|
||||
def register_input_callback(self, callback):
|
||||
"""Register a function that takes a single character input."""
|
||||
self.input_callbacks.append(callback)
|
||||
|
||||
def start(self):
|
||||
"""Start the stdin reading loop."""
|
||||
if not sys.stdin.isatty():
|
||||
raise RuntimeError("stdin is not a TTY")
|
||||
|
||||
self._old_settings = termios.tcgetattr(sys.stdin)
|
||||
tty.setraw(sys.stdin.fileno())
|
||||
self._reader_task = asyncio.create_task(self._stdin_loop())
|
||||
|
||||
async def stop(self):
|
||||
"""Stop the reader and restore terminal settings."""
|
||||
self._stop = True
|
||||
if self._reader_task:
|
||||
await self._reader_task
|
||||
if self._old_settings:
|
||||
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self._old_settings)
|
||||
|
||||
async def wait_for_ok(self, timeout=1.0):
|
||||
"""Wait for a Kitty OK message (non-blocking)."""
|
||||
try:
|
||||
await asyncio.wait_for(self.ok_event.wait(), timeout)
|
||||
except asyncio.TimeoutError:
|
||||
raise RuntimeError("Timed out waiting for Kitty OK response")
|
||||
self.ok_event.clear()
|
||||
|
||||
async def _stdin_loop(self):
|
||||
loop = asyncio.get_running_loop()
|
||||
fd = sys.stdin.fileno()
|
||||
|
||||
def read_byte():
|
||||
return os.read(fd, 1).decode("utf-8", errors="ignore")
|
||||
|
||||
while not self._stop:
|
||||
char = await loop.run_in_executor(None, read_byte)
|
||||
|
||||
self._buffer += char
|
||||
if self._buffer.endswith("\033\\"):
|
||||
#if "\033_GOK" in self._buffer:
|
||||
if re.search('\033_G(i=\\d+;)?OK', self._buffer):
|
||||
self.ok_event.set()
|
||||
self._buffer = ""
|
||||
continue
|
||||
|
||||
# Dispatch to key input handlers (non-escape)
|
||||
if not char.startswith("\033"):
|
||||
for callback in self.input_callbacks:
|
||||
callback(char)
|
||||
await asyncio.sleep(0.01)
|
||||
|
@ -29,13 +29,12 @@ class TerminalPlotter(pv.Plotter):
|
||||
self.set_background([0.0, 0.0, 0.0])
|
||||
self.ren_win.SetAlphaBitPlanes(1)
|
||||
self.ren_win.SetMultiSamples(0)
|
||||
self.terminal = kitty.Terminal()
|
||||
|
||||
|
||||
async def initialize(self):
|
||||
h_pix, _ = await self.terminal.get_terminal_cell_size()
|
||||
self.rows, _ = self.terminal.get_terminal_size()
|
||||
self.start_y, self.start_x = await self.terminal.get_position()
|
||||
h_pix, _ = await kitty.get_terminal_cell_size()
|
||||
self.rows, _ = kitty.get_terminal_size()
|
||||
self.start_y, self.start_x = await kitty.get_position()
|
||||
self.needed_lines = math.ceil(self.height / h_pix)
|
||||
|
||||
# Add vertical space if needed
|
||||
@ -43,61 +42,41 @@ class TerminalPlotter(pv.Plotter):
|
||||
missing = self.needed_lines - (self.rows - self.start_y)
|
||||
self.start_y -= missing
|
||||
print("\n" * self.needed_lines, end="")
|
||||
self.terminal.set_position(self.start_y, self.start_x)
|
||||
kitty.set_position(self.start_y, self.start_x)
|
||||
self.orbit = self.generate_orbital_path()
|
||||
|
||||
def _handle_key(c):
|
||||
if c == "a":
|
||||
self.camera.Azimuth(10)
|
||||
elif c == "d":
|
||||
self.camera.Azimuth(-10)
|
||||
elif c == "w":
|
||||
self.camera.Elevation(10)
|
||||
elif c == "s":
|
||||
self.camera.Elevation(-10)
|
||||
elif c == "+":
|
||||
self.camera.zoom(1.1)
|
||||
elif c == "-":
|
||||
self.camera.zoom(0.9)
|
||||
elif c == "p":
|
||||
self.fly_to(self.orbit.points[15])
|
||||
elif c == "q":
|
||||
self._running = False
|
||||
self.terminal.terminal_io.register_input_callback(_handle_key)
|
||||
async def _handle_key(self, c):
|
||||
if c == "a":
|
||||
self.camera.Azimuth(10)
|
||||
elif c == "d":
|
||||
self.camera.Azimuth(-10)
|
||||
elif c == "w":
|
||||
self.camera.Elevation(10)
|
||||
elif c == "s":
|
||||
self.camera.Elevation(-10)
|
||||
elif c == "+":
|
||||
self.camera.zoom(1.1)
|
||||
elif c == "-":
|
||||
self.camera.zoom(0.9)
|
||||
elif c == "p":
|
||||
self.fly_to(self.orbit.points[15])
|
||||
|
||||
|
||||
# async def _handle_key(self, c):
|
||||
# if c == "a":
|
||||
# self.camera.Azimuth(10)
|
||||
# elif c == "d":
|
||||
# self.camera.Azimuth(-10)
|
||||
# elif c == "w":
|
||||
# self.camera.Elevation(10)
|
||||
# elif c == "s":
|
||||
# self.camera.Elevation(-10)
|
||||
# elif c == "+":
|
||||
# self.camera.zoom(1.1)
|
||||
# elif c == "-":
|
||||
# self.camera.zoom(0.9)
|
||||
# elif c == "p":
|
||||
# self.fly_to(self.orbit.points[15])
|
||||
|
||||
#async def _input_loop(self):
|
||||
# fd = sys.stdin.fileno()
|
||||
# old_settings = termios.tcgetattr(fd)
|
||||
# try:
|
||||
# tty.setcbreak(fd)
|
||||
# while self._running:
|
||||
# rlist, _, _ = select.select([sys.stdin], [], [], 0)
|
||||
# if rlist:
|
||||
# c = sys.stdin.read(1)
|
||||
# if c == "q": # quit on q
|
||||
# self._running = False
|
||||
# else:
|
||||
# await self._handle_key(c)
|
||||
# await asyncio.sleep(0.001)
|
||||
# finally:
|
||||
# termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
|
||||
async def _input_loop(self):
|
||||
fd = sys.stdin.fileno()
|
||||
old_settings = termios.tcgetattr(fd)
|
||||
try:
|
||||
tty.setcbreak(fd)
|
||||
while self._running:
|
||||
rlist, _, _ = select.select([sys.stdin], [], [], 0)
|
||||
if rlist:
|
||||
c = sys.stdin.read(1)
|
||||
if c == "q": # quit on q
|
||||
self._running = False
|
||||
else:
|
||||
await self._handle_key(c)
|
||||
await asyncio.sleep(0.001)
|
||||
finally:
|
||||
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
|
||||
|
||||
async def _render_loop(self):
|
||||
import time
|
||||
@ -114,23 +93,22 @@ class TerminalPlotter(pv.Plotter):
|
||||
np_image = numpy_support.vtk_to_numpy(vtk_array).reshape(height, width, 4)
|
||||
np_image = np_image[::-1] # Flip vertically
|
||||
np_image = np.ascontiguousarray(np_image) # Ensure memory layout is C-contiguous
|
||||
#print("Render & copy:", time.perf_counter() - start)
|
||||
print("Render & copy:", time.perf_counter() - start)
|
||||
|
||||
#await self.terminal.draw_to_terminal(np_image, width, height, compress=True)
|
||||
await self.terminal.draw(np_image, width, height, use_shm=True)
|
||||
#print("Draw:", time.perf_counter() - start)
|
||||
self.terminal.set_position(self.start_y, self.start_x)
|
||||
#await kitty.draw_to_terminal(np_image, width, height, compress=True)
|
||||
await kitty.draw_to_terminal(np_image, width, height, use_shm=True)
|
||||
print("Draw:", time.perf_counter() - start)
|
||||
kitty.set_position(self.start_y, self.start_x)
|
||||
self.camera.Azimuth(1)
|
||||
|
||||
async def run(self):
|
||||
await self.initialize()
|
||||
self.iren.initialize()
|
||||
#tasks = [
|
||||
# #asyncio.create_task(self._input_loop()),
|
||||
# asyncio.create_task(self._render_loop()),
|
||||
#]
|
||||
#await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
|
||||
await self._render_loop()
|
||||
tasks = [
|
||||
asyncio.create_task(self._input_loop()),
|
||||
asyncio.create_task(self._render_loop()),
|
||||
]
|
||||
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
|
||||
self._running = False
|
||||
|
||||
async def main():
|
||||
|
11
uv.lock
generated
11
uv.lock
generated
@ -125,11 +125,21 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/cb/bd/b394387b598ed84d8d0fa90611a90bee0adc2021820ad5729f7ced74a8e2/imageio-2.37.0-py3-none-any.whl", hash = "sha256:11efa15b87bc7871b61590326b2d635439acc321cf7f8ce996f812543ce10eed", size = 315796 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ip2location"
|
||||
version = "8.10.5"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/26/d1/6c8d2e3e0076412186827eae5722f9eddb3d7a9bac1c906f1ee00182ff21/ip2location-8.10.5.tar.gz", hash = "sha256:e26bdd214be15a41030f4e7e6771df6ccbc75d743c8853933252369993a6368b", size = 13792 }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/f8/32/83d9735da14416b48e571bfc624823e5b29b36384a2f5f2235c79b21d4ea/IP2Location-8.10.5-py3-none-any.whl", hash = "sha256:eb7e2a54d168a2f082927ca71e166e5c767d48a6c04851e3230cddebc4d34c79", size = 14283 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "kglobe"
|
||||
version = "0.1.0"
|
||||
source = { virtual = "." }
|
||||
dependencies = [
|
||||
{ name = "ip2location" },
|
||||
{ name = "numpy" },
|
||||
{ name = "pyrender" },
|
||||
{ name = "pyvista" },
|
||||
@ -137,6 +147,7 @@ dependencies = [
|
||||
|
||||
[package.metadata]
|
||||
requires-dist = [
|
||||
{ name = "ip2location", specifier = ">=8.10.5" },
|
||||
{ name = "numpy", specifier = "<2" },
|
||||
{ name = "pyrender", specifier = ">=0.1.45" },
|
||||
{ name = "pyvista", specifier = ">=0.45.3" },
|
||||
|
Loading…
Reference in New Issue
Block a user