diff --git a/kglobe.py b/kglobe.py index bb76c5f..8d33e55 100644 --- a/kglobe.py +++ b/kglobe.py @@ -85,7 +85,7 @@ def traceroute(target: str) -> list[tuple[int, int]]: return coords -def main(): +async def main(): parser = argparse.ArgumentParser( prog="kglobe", @@ -123,7 +123,7 @@ def main(): else: points_3d = [latlon_to_xyz(lat, lon) for lat, lon in locations] - height, width = kitty.get_terminal_size_pixel() + height, width = await kitty.get_terminal_size_pixel() plotter = TerminalPlotter(width, height) plotter.add_mesh(globe, color="tan", smooth_shading=False, texture=tex, show_edges=False) @@ -138,10 +138,11 @@ def main(): #kitty.hide_cursor() try: - asyncio.run(plotter.run()) + #asyncio.run(plotter.run()) + await plotter.run() finally: kitty.show_cursor() if __name__ == "__main__": - main() + asyncio.run(main()) diff --git a/kitty.py b/kitty.py index caa3aed..979de70 100755 --- a/kitty.py +++ b/kitty.py @@ -1,4 +1,5 @@ #!/usr/bin/env python3 +import asyncio import re import sys import termios @@ -9,14 +10,14 @@ import array import fcntl -def draw_to_terminal(buffer: BytesIO) -> None: +async def draw_to_terminal(buffer: BytesIO) -> None: """Display a PNG image in the terminal.""" - _write_chunked(a="T", i=1, f=100, q=2, data=buffer.getvalue()) + await _write_chunked(a="T", i=1, f=100, q=2, data=buffer.getvalue()) -def get_terminal_cell_size() -> tuple[int, int]: +async def get_terminal_cell_size() -> tuple[int, int]: """Get (height, width) of a single cell in px""" - reply = _query_terminal("\x1b[16t", "t") + reply = await _query_terminal("\x1b[16t", "t") match = re.search(r"\[6;(\d+);(\d+)t", reply) if match: @@ -26,9 +27,9 @@ def get_terminal_cell_size() -> tuple[int, int]: raise ValueError("Failed to parse terminal cell size response") -def get_terminal_size_pixel() -> tuple[int, int]: +async def get_terminal_size_pixel() -> tuple[int, int]: """Get (height, width) of the terminal in px""" - reply = _query_terminal("\x1b[14t", "t") + reply = await _query_terminal("\x1b[14t", "t") match = re.search(r"\[4;(\d+);(\d+)t", reply) if match: @@ -58,13 +59,15 @@ def _serialize_gr_command(**cmd) -> bytes: return b"".join(ans) -def _write_chunked(**cmd) -> None: +async def _write_chunked(**cmd) -> None: data = standard_b64encode(cmd.pop("data")) + loop = asyncio.get_running_loop() while data: chunk, data = data[:4096], data[4096:] m = 1 if data else 0 - sys.stdout.buffer.write(_serialize_gr_command(payload=chunk, m=m, **cmd)) - sys.stdout.flush() + payload = _serialize_gr_command(payload=chunk, m=m, **cmd) + await loop.run_in_executor(None, sys.stdout.buffer.write, payload) + await loop.run_in_executor(None, sys.stdout.flush) cmd.clear() @@ -88,36 +91,38 @@ def _write_stdout(cmd: str) -> None: sys.stdout.flush() -def _query_terminal(escape: str, endchar: str) -> str: +async def _query_terminal(escape: str, endchar: str) -> str: """ Send `escape` to the terminal, read the response until `endchar`, return response (including `endchar`) """ # Save the current terminal settings + loop = asyncio.get_running_loop() fd = sys.stdin.fileno() old_settings = termios.tcgetattr(fd) - try: - # Set terminal to raw mode + def read_terminal_response() -> str: tty.setraw(fd) _write_stdout(escape) - response = "" while True: char = sys.stdin.read(1) response += char if char == endchar: break + return response + try: + response = await loop.run_in_executor(None, read_terminal_response) finally: # Restore the terminal settings termios.tcsetattr(fd, termios.TCSANOW, old_settings) return response -def get_position() -> tuple[int, int]: +async def get_position() -> tuple[int, int]: """Get the (y, x) position of the cursor""" - reply = _query_terminal("\x1b[6n", "R") + reply = await _query_terminal("\x1b[6n", "R") match = re.search(r"\[(\d+);(\d+)R", reply) if match: @@ -126,18 +131,19 @@ def get_position() -> tuple[int, int]: raise ValueError("Failed to parse cursor position response") -if __name__ == "__main__": +async def main(): import pyvista as pv rows, cols = get_terminal_size() - v_pix, h_pix = get_terminal_cell_size() - height, width = get_terminal_size_pixel() + v_pix, h_pix = await get_terminal_cell_size() + height, width = await get_terminal_size_pixel() + y, x = await get_position() - y, x = get_position() + y, x = await get_position() print(f"Terminal has {rows} rows, {cols} cols = {rows * cols} cells") print(f"Cell size: {h_pix}x{v_pix} px") print(f"Dimensions: {width}x{height} px") - print("Cursor is at y, x:", get_position()) + print("Cursor is at y, x:", y, x) new_lines = int((height / v_pix) - 6) print("\n" * new_lines, end="") @@ -150,4 +156,7 @@ if __name__ == "__main__": b = BytesIO() pl.screenshot(b, transparent_background=True, window_size=(width, height)) - draw_to_terminal(b) + await draw_to_terminal(b) + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/terminalplotter.py b/terminalplotter.py index 8f7f493..262a589 100644 --- a/terminalplotter.py +++ b/terminalplotter.py @@ -17,24 +17,33 @@ class TerminalPlotter(pv.Plotter): self.width = width self.height = height self._running = True - h_pix, _ = kitty.get_terminal_cell_size() - self.rows, _ = kitty.get_terminal_size() - self.start_y, self.start_x = kitty.get_position() - # the image requires height/cell_height lines - self.needed_lines = math.ceil(height / h_pix) + self._needs_render = True + self._render_complete = False + self._rendering = False + self._render_pending = True self.set_background([0.0, 0.0, 0.0]) + self.add_on_render_callback(self._on_render) # if we are too close to the bottom of the terminal, create some space. + + async def initialize(self): + h_pix, _ = await kitty.get_terminal_cell_size() + self.rows, _ = kitty.get_terminal_size() + self.start_y, self.start_x = await kitty.get_position() + self.needed_lines = math.ceil(self.height / h_pix) + + # Add vertical space if needed if self.rows - self.start_y < self.needed_lines: missing = self.needed_lines - (self.rows - self.start_y) self.start_y -= missing print("\n" * self.needed_lines, end="") kitty.set_position(self.start_y, self.start_x) - def render_to_kitty(self): + + async def render_to_kitty(self): self.render() buf = io.BytesIO() self.screenshot(buf, transparent_background=True, window_size=(self.width, self.height)) - kitty.draw_to_terminal(buf) + await kitty.draw_to_terminal(buf) # print("y:", self.start_y, "rows:", self.rows, end="") kitty.set_position(self.start_y, self.start_x) @@ -50,12 +59,12 @@ class TerminalPlotter(pv.Plotter): if c == "q": # quit on q self._running = False else: - self._handle_key(c) + await self._handle_key(c) await asyncio.sleep(0.01) finally: termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) - def _handle_key(self, c): + async def _handle_key(self, c): if c == "a": self.camera.Azimuth(10) elif c == "d": @@ -64,8 +73,51 @@ class TerminalPlotter(pv.Plotter): self.camera.Elevation(10) elif c == "s": self.camera.Elevation(-10) - self.render_to_kitty() + self._needs_render = True + # await self.render_to_kitty() + + def _on_render(self, *args): + """Callback that runs after each render.""" + self._rendering = False + self._render_complete = True + + async def _display_loop(self): + """Loop that captures the screen and updates the terminal after rendering.""" + while self._running: + if self._render_complete: + self._render_complete = False + buf = io.BytesIO() + # Safe: Screenshot only after render is confirmed complete + self.screenshot(buf, transparent_background=True, window_size=(self.width, self.height)) + await kitty.draw_to_terminal(buf) + kitty.set_position(self.start_y, self.start_x) + await asyncio.sleep(0.001) + + async def _render_loop(self): + while self._running: + if self._needs_render and not self._rendering: + self._rendering = True + self._needs_render = False + self.update() # triggers a render and calls _on_render + await asyncio.sleep(0.001) + + async def _display_in_terminal(self, buf): + await kitty.draw_to_terminal(buf) + kitty.set_position(self.start_y, self.start_x) + async def run(self): - self.render_to_kitty() - await self._input_loop() + await self.initialize() + self.iren.initialize() + self._needs_render = True + tasks = [ + asyncio.create_task(self._input_loop()), + asyncio.create_task(self._render_loop()), + asyncio.create_task(self._display_loop()), + ] + await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + self._running = False + + + #await self.render_to_kitty() + #await self._input_loop()