This commit is contained in:
Felix Pankratz 2025-07-21 23:24:55 +02:00
parent 5748917acf
commit 6cff5f247d
3 changed files with 99 additions and 37 deletions

View File

@ -85,7 +85,7 @@ def traceroute(target: str) -> list[tuple[int, int]]:
return coords return coords
def main(): async def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog="kglobe", prog="kglobe",
@ -123,7 +123,7 @@ def main():
else: else:
points_3d = [latlon_to_xyz(lat, lon) for lat, lon in locations] points_3d = [latlon_to_xyz(lat, lon) for lat, lon in locations]
height, width = kitty.get_terminal_size_pixel() height, width = await kitty.get_terminal_size_pixel()
plotter = TerminalPlotter(width, height) plotter = TerminalPlotter(width, height)
plotter.add_mesh(globe, color="tan", smooth_shading=False, texture=tex, show_edges=False) plotter.add_mesh(globe, color="tan", smooth_shading=False, texture=tex, show_edges=False)
@ -138,10 +138,11 @@ def main():
#kitty.hide_cursor() #kitty.hide_cursor()
try: try:
asyncio.run(plotter.run()) #asyncio.run(plotter.run())
await plotter.run()
finally: finally:
kitty.show_cursor() kitty.show_cursor()
if __name__ == "__main__": if __name__ == "__main__":
main() asyncio.run(main())

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import asyncio
import re import re
import sys import sys
import termios import termios
@ -9,14 +10,14 @@ import array
import fcntl import fcntl
def draw_to_terminal(buffer: BytesIO) -> None: async def draw_to_terminal(buffer: BytesIO) -> None:
"""Display a PNG image in the terminal.""" """Display a PNG image in the terminal."""
_write_chunked(a="T", i=1, f=100, q=2, data=buffer.getvalue()) await _write_chunked(a="T", i=1, f=100, q=2, data=buffer.getvalue())
def get_terminal_cell_size() -> tuple[int, int]: async def get_terminal_cell_size() -> tuple[int, int]:
"""Get (height, width) of a single cell in px""" """Get (height, width) of a single cell in px"""
reply = _query_terminal("\x1b[16t", "t") reply = await _query_terminal("\x1b[16t", "t")
match = re.search(r"\[6;(\d+);(\d+)t", reply) match = re.search(r"\[6;(\d+);(\d+)t", reply)
if match: if match:
@ -26,9 +27,9 @@ def get_terminal_cell_size() -> tuple[int, int]:
raise ValueError("Failed to parse terminal cell size response") raise ValueError("Failed to parse terminal cell size response")
def get_terminal_size_pixel() -> tuple[int, int]: async def get_terminal_size_pixel() -> tuple[int, int]:
"""Get (height, width) of the terminal in px""" """Get (height, width) of the terminal in px"""
reply = _query_terminal("\x1b[14t", "t") reply = await _query_terminal("\x1b[14t", "t")
match = re.search(r"\[4;(\d+);(\d+)t", reply) match = re.search(r"\[4;(\d+);(\d+)t", reply)
if match: if match:
@ -58,13 +59,15 @@ def _serialize_gr_command(**cmd) -> bytes:
return b"".join(ans) return b"".join(ans)
def _write_chunked(**cmd) -> None: async def _write_chunked(**cmd) -> None:
data = standard_b64encode(cmd.pop("data")) data = standard_b64encode(cmd.pop("data"))
loop = asyncio.get_running_loop()
while data: while data:
chunk, data = data[:4096], data[4096:] chunk, data = data[:4096], data[4096:]
m = 1 if data else 0 m = 1 if data else 0
sys.stdout.buffer.write(_serialize_gr_command(payload=chunk, m=m, **cmd)) payload = _serialize_gr_command(payload=chunk, m=m, **cmd)
sys.stdout.flush() await loop.run_in_executor(None, sys.stdout.buffer.write, payload)
await loop.run_in_executor(None, sys.stdout.flush)
cmd.clear() cmd.clear()
@ -88,36 +91,38 @@ def _write_stdout(cmd: str) -> None:
sys.stdout.flush() sys.stdout.flush()
def _query_terminal(escape: str, endchar: str) -> str: async def _query_terminal(escape: str, endchar: str) -> str:
""" """
Send `escape` to the terminal, read the response until Send `escape` to the terminal, read the response until
`endchar`, return response (including `endchar`) `endchar`, return response (including `endchar`)
""" """
# Save the current terminal settings # Save the current terminal settings
loop = asyncio.get_running_loop()
fd = sys.stdin.fileno() fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd) old_settings = termios.tcgetattr(fd)
try: def read_terminal_response() -> str:
# Set terminal to raw mode
tty.setraw(fd) tty.setraw(fd)
_write_stdout(escape) _write_stdout(escape)
response = "" response = ""
while True: while True:
char = sys.stdin.read(1) char = sys.stdin.read(1)
response += char response += char
if char == endchar: if char == endchar:
break break
return response
try:
response = await loop.run_in_executor(None, read_terminal_response)
finally: finally:
# Restore the terminal settings # Restore the terminal settings
termios.tcsetattr(fd, termios.TCSANOW, old_settings) termios.tcsetattr(fd, termios.TCSANOW, old_settings)
return response return response
def get_position() -> tuple[int, int]: async def get_position() -> tuple[int, int]:
"""Get the (y, x) position of the cursor""" """Get the (y, x) position of the cursor"""
reply = _query_terminal("\x1b[6n", "R") reply = await _query_terminal("\x1b[6n", "R")
match = re.search(r"\[(\d+);(\d+)R", reply) match = re.search(r"\[(\d+);(\d+)R", reply)
if match: if match:
@ -126,18 +131,19 @@ def get_position() -> tuple[int, int]:
raise ValueError("Failed to parse cursor position response") raise ValueError("Failed to parse cursor position response")
if __name__ == "__main__": async def main():
import pyvista as pv import pyvista as pv
rows, cols = get_terminal_size() rows, cols = get_terminal_size()
v_pix, h_pix = get_terminal_cell_size() v_pix, h_pix = await get_terminal_cell_size()
height, width = get_terminal_size_pixel() height, width = await get_terminal_size_pixel()
y, x = await get_position()
y, x = get_position() y, x = await get_position()
print(f"Terminal has {rows} rows, {cols} cols = {rows * cols} cells") print(f"Terminal has {rows} rows, {cols} cols = {rows * cols} cells")
print(f"Cell size: {h_pix}x{v_pix} px") print(f"Cell size: {h_pix}x{v_pix} px")
print(f"Dimensions: {width}x{height} px") print(f"Dimensions: {width}x{height} px")
print("Cursor is at y, x:", get_position()) print("Cursor is at y, x:", y, x)
new_lines = int((height / v_pix) - 6) new_lines = int((height / v_pix) - 6)
print("\n" * new_lines, end="") print("\n" * new_lines, end="")
@ -150,4 +156,7 @@ if __name__ == "__main__":
b = BytesIO() b = BytesIO()
pl.screenshot(b, transparent_background=True, window_size=(width, height)) pl.screenshot(b, transparent_background=True, window_size=(width, height))
draw_to_terminal(b) await draw_to_terminal(b)
if __name__ == "__main__":
asyncio.run(main())

View File

@ -17,24 +17,33 @@ class TerminalPlotter(pv.Plotter):
self.width = width self.width = width
self.height = height self.height = height
self._running = True self._running = True
h_pix, _ = kitty.get_terminal_cell_size() self._needs_render = True
self.rows, _ = kitty.get_terminal_size() self._render_complete = False
self.start_y, self.start_x = kitty.get_position() self._rendering = False
# the image requires height/cell_height lines self._render_pending = True
self.needed_lines = math.ceil(height / h_pix)
self.set_background([0.0, 0.0, 0.0]) self.set_background([0.0, 0.0, 0.0])
self.add_on_render_callback(self._on_render)
# if we are too close to the bottom of the terminal, create some space. # if we are too close to the bottom of the terminal, create some space.
async def initialize(self):
h_pix, _ = await kitty.get_terminal_cell_size()
self.rows, _ = kitty.get_terminal_size()
self.start_y, self.start_x = await kitty.get_position()
self.needed_lines = math.ceil(self.height / h_pix)
# Add vertical space if needed
if self.rows - self.start_y < self.needed_lines: if self.rows - self.start_y < self.needed_lines:
missing = self.needed_lines - (self.rows - self.start_y) missing = self.needed_lines - (self.rows - self.start_y)
self.start_y -= missing self.start_y -= missing
print("\n" * self.needed_lines, end="") print("\n" * self.needed_lines, end="")
kitty.set_position(self.start_y, self.start_x) kitty.set_position(self.start_y, self.start_x)
def render_to_kitty(self):
async def render_to_kitty(self):
self.render() self.render()
buf = io.BytesIO() buf = io.BytesIO()
self.screenshot(buf, transparent_background=True, window_size=(self.width, self.height)) self.screenshot(buf, transparent_background=True, window_size=(self.width, self.height))
kitty.draw_to_terminal(buf) await kitty.draw_to_terminal(buf)
# print("y:", self.start_y, "rows:", self.rows, end="") # print("y:", self.start_y, "rows:", self.rows, end="")
kitty.set_position(self.start_y, self.start_x) kitty.set_position(self.start_y, self.start_x)
@ -50,12 +59,12 @@ class TerminalPlotter(pv.Plotter):
if c == "q": # quit on q if c == "q": # quit on q
self._running = False self._running = False
else: else:
self._handle_key(c) await self._handle_key(c)
await asyncio.sleep(0.01) await asyncio.sleep(0.01)
finally: finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
def _handle_key(self, c): async def _handle_key(self, c):
if c == "a": if c == "a":
self.camera.Azimuth(10) self.camera.Azimuth(10)
elif c == "d": elif c == "d":
@ -64,8 +73,51 @@ class TerminalPlotter(pv.Plotter):
self.camera.Elevation(10) self.camera.Elevation(10)
elif c == "s": elif c == "s":
self.camera.Elevation(-10) self.camera.Elevation(-10)
self.render_to_kitty() self._needs_render = True
# await self.render_to_kitty()
def _on_render(self, *args):
"""Callback that runs after each render."""
self._rendering = False
self._render_complete = True
async def _display_loop(self):
"""Loop that captures the screen and updates the terminal after rendering."""
while self._running:
if self._render_complete:
self._render_complete = False
buf = io.BytesIO()
# Safe: Screenshot only after render is confirmed complete
self.screenshot(buf, transparent_background=True, window_size=(self.width, self.height))
await kitty.draw_to_terminal(buf)
kitty.set_position(self.start_y, self.start_x)
await asyncio.sleep(0.001)
async def _render_loop(self):
while self._running:
if self._needs_render and not self._rendering:
self._rendering = True
self._needs_render = False
self.update() # triggers a render and calls _on_render
await asyncio.sleep(0.001)
async def _display_in_terminal(self, buf):
await kitty.draw_to_terminal(buf)
kitty.set_position(self.start_y, self.start_x)
async def run(self): async def run(self):
self.render_to_kitty() await self.initialize()
await self._input_loop() self.iren.initialize()
self._needs_render = True
tasks = [
asyncio.create_task(self._input_loop()),
asyncio.create_task(self._render_loop()),
asyncio.create_task(self._display_loop()),
]
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
self._running = False
#await self.render_to_kitty()
#await self._input_loop()