From d5a9ad9a1979c7e9f02b7943fe37f331620ac044 Mon Sep 17 00:00:00 2001 From: Felix Pankratz Date: Mon, 21 Jul 2025 21:55:44 +0200 Subject: [PATCH] cleanup/refactor --- kglobe.py | 98 ++++++---------------------------------------- kitty.py | 76 +++++++++++++++++------------------ terminalplotter.py | 70 +++++++++++++++++++++++++++++++++ 3 files changed, 118 insertions(+), 126 deletions(-) create mode 100644 terminalplotter.py diff --git a/kglobe.py b/kglobe.py index dc00989..9644ab8 100644 --- a/kglobe.py +++ b/kglobe.py @@ -9,80 +9,15 @@ import subprocess import re import requests import argparse -from io import BytesIO import asyncio -import sys -import termios -import tty -import select + +from terminalplotter import TerminalPlotter # TODO: Color arches based on latency -# TODO: Interactive globe (spin w/ keys) # TODO: Text info (num hops etc.) -# TODO: Image spacing -# TODO: Async rendering? - -class TerminalPlotter(pv.Plotter): - def __init__(self, width, height, **kwargs): - super().__init__(off_screen=True, window_size=(height, width), **kwargs) - self._running = True - h_pix, _ = kitty.get_terminal_cell_size() - self.rows, _ = kitty.get_terminal_size() - self.start_y, self.start_x = kitty.get_position() - # the image requires height/cell_height lines - self.needed_lines = math.ceil(height/h_pix) - self.set_background([0.0, 1.0, 0.0]) - #if self.start_y == self.rows: - if self.rows - self.start_y < self.needed_lines: - self.set_background([1.0, 0.0, 0.0]) - missing = self.needed_lines - (self.rows - self.start_y) - self.start_y -= missing - #self.start_y -= - #print("\n" * self.needed_lines, end="") - print("\n" * self.needed_lines, end="") - kitty.set_position(self.start_y, self.start_x) - - def render_to_kitty(self): - import io - self.render() - buf = io.BytesIO() - self.screenshot(buf, transparent_background=True) - kitty.draw_to_terminal(buf) - print('y:', self.start_y, 'rows:', self.rows, end='') - kitty.set_position(self.start_y, self.start_x) - - async def _input_loop(self): - fd = sys.stdin.fileno() - old_settings = termios.tcgetattr(fd) - try: - tty.setcbreak(fd) - while self._running: - rlist, _, _ = select.select([sys.stdin], [], [], 0) - if rlist: - c = sys.stdin.read(1) - if c == 'q': # quit on q - self._running = False - else: - self._handle_key(c) - await asyncio.sleep(0.01) - finally: - termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) - - def _handle_key(self, c): - # Simple interaction: rotate camera with a/s keys - if c == 'a': - self.camera.Azimuth(10) - elif c == 'd': - self.camera.Azimuth(-10) - elif c == 'w': - self.camera.Elevation(10) - elif c == 's': - self.camera.Elevation(-10) - self.render_to_kitty() - - async def run(self): - self.render_to_kitty() - await self._input_loop() +# DONE: Interactive globe (spin w/ keys) +# DONE: Image spacing +# DONE: Async rendering? # Convert lat/lon to Cartesian coordinates def latlon_to_xyz(lat: float, lon: float, radius=1.0): @@ -93,7 +28,6 @@ def latlon_to_xyz(lat: float, lon: float, radius=1.0): z = radius * np.sin(lat_rad) return np.array([x, y, z]) - # Create an arch between two 3D points def generate_arch(p1, p2, height_factor=0.2, n_points=100): # Normalize input points to lie on the unit sphere @@ -148,9 +82,7 @@ def main(): description="Traceroute on a globe", epilog="Requires kitty graphics protocol support in terminal", ) - parser.add_argument("-t", "--traceroute", default=None) - args = parser.parse_args() locations = [] @@ -166,9 +98,7 @@ def main(): ) tex = pv.examples.load_globe_texture() - globe.active_texture_coordinates = np.zeros((globe.points.shape[0], 2)) - globe.active_texture_coordinates[:, 0] = 0.5 + np.arctan2( globe.points[:, 1], globe.points[:, 0] ) / (2 * math.pi) @@ -179,22 +109,20 @@ def main(): # Convert to 3D coordinates points_3d = [latlon_to_xyz(lat, lon) for lat, lon in locations] - #pl: pv.Plotter = pv.Plotter(off_screen=(not args.external)) - height, width = kitty.get_terminal_size_pixel() - pl = TerminalPlotter(450, 450) - pl.add_mesh(globe, color="tan", smooth_shading=True, texture=tex, show_edges=False) + plotter = TerminalPlotter(450, 450) + plotter.add_mesh(globe, color="tan", smooth_shading=True, texture=tex, show_edges=False) - for pt in points_3d: - city_marker = pv.Sphere(center=pt, radius=0.02) - pl.add_mesh(city_marker, color="blue") + for point in points_3d: + city_marker = pv.Sphere(center=point, radius=0.02) + plotter.add_mesh(city_marker, color="blue") for i in range(len(points_3d[:-1])): arch = generate_arch(points_3d[i], points_3d[i + 1], height_factor=0.2) line = pv.lines_from_points(arch, close=False) - pl.add_mesh(line, color="red", line_width=2) + plotter.add_mesh(line, color="red", line_width=2) - #kitty.hide_cursor() + kitty.hide_cursor() try: - asyncio.run(pl.run()) + asyncio.run(plotter.run()) finally: kitty.show_cursor() diff --git a/kitty.py b/kitty.py index 9725618..caa3aed 100755 --- a/kitty.py +++ b/kitty.py @@ -10,40 +10,42 @@ import fcntl def draw_to_terminal(buffer: BytesIO) -> None: - '''Display a PNG image in the terminal.''' - write_chunked(a="T", i=1, f=100, q=2, data=buffer.getvalue()) + """Display a PNG image in the terminal.""" + _write_chunked(a="T", i=1, f=100, q=2, data=buffer.getvalue()) + def get_terminal_cell_size() -> tuple[int, int]: - '''Get (height, width) of a single cell in px''' + """Get (height, width) of a single cell in px""" reply = _query_terminal("\x1b[16t", "t") match = re.search(r"\[6;(\d+);(\d+)t", reply) if match: v_pix, h_pix = map(int, match.groups()) return v_pix, h_pix - else: - print(reply) - raise ValueError("Failed to parse terminal cell size response") + print(reply) + raise ValueError("Failed to parse terminal cell size response") + def get_terminal_size_pixel() -> tuple[int, int]: - '''Get (height, width) of the terminal in px''' + """Get (height, width) of the terminal in px""" reply = _query_terminal("\x1b[14t", "t") match = re.search(r"\[4;(\d+);(\d+)t", reply) if match: height, width = map(int, match.groups()) return height, width - else: - raise ValueError("Failed to parse terminal pixel size response") + raise ValueError("Failed to parse terminal pixel size response") + def get_terminal_size() -> tuple[int, int]: - '''Get (rows, cols) of the terminal''' - buf = array.array('H', [0, 0, 0, 0]) + """Get (rows, cols) of the terminal""" + buf = array.array("H", [0, 0, 0, 0]) fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf) rows, cols, width, height = buf return (rows, cols) -def serialize_gr_command(**cmd) -> bytes: + +def _serialize_gr_command(**cmd) -> bytes: payload = cmd.pop("payload", None) cmd = ",".join(f"{k}={v}" for k, v in cmd.items()) ans = [] @@ -56,39 +58,41 @@ def serialize_gr_command(**cmd) -> bytes: return b"".join(ans) -def write_chunked(**cmd) -> None: +def _write_chunked(**cmd) -> None: data = standard_b64encode(cmd.pop("data")) while data: chunk, data = data[:4096], data[4096:] m = 1 if data else 0 - sys.stdout.buffer.write(serialize_gr_command(payload=chunk, m=m, **cmd)) + sys.stdout.buffer.write(_serialize_gr_command(payload=chunk, m=m, **cmd)) sys.stdout.flush() cmd.clear() def hide_cursor() -> None: - '''Tell the terminal to hide the cursor.''' + """Tell the terminal to hide the cursor.""" _write_stdout("\x1b[?25l") def show_cursor() -> None: - '''Tell the terminal to show the cursor.''' + """Tell the terminal to show the cursor.""" _write_stdout("\x1b[?25h") def set_position(y: int, x: int) -> None: - '''Set the cursor position to y, x''' + """Set the cursor position to y, x""" _write_stdout(f"\x1b[{y};{x}H") + def _write_stdout(cmd: str) -> None: sys.stdout.write(cmd) sys.stdout.flush() + def _query_terminal(escape: str, endchar: str) -> str: - ''' + """ Send `escape` to the terminal, read the response until `endchar`, return response (including `endchar`) - ''' + """ # Save the current terminal settings fd = sys.stdin.fileno() old_settings = termios.tcgetattr(fd) @@ -100,9 +104,9 @@ def _query_terminal(escape: str, endchar: str) -> str: response = "" while True: - ch = sys.stdin.read(1) - response += ch - if ch == endchar: + char = sys.stdin.read(1) + response += char + if char == endchar: break finally: @@ -110,16 +114,16 @@ def _query_terminal(escape: str, endchar: str) -> str: termios.tcsetattr(fd, termios.TCSANOW, old_settings) return response + def get_position() -> tuple[int, int]: - '''Get the (y, x) position of the cursor''' + """Get the (y, x) position of the cursor""" reply = _query_terminal("\x1b[6n", "R") match = re.search(r"\[(\d+);(\d+)R", reply) if match: y, x = map(int, match.groups()) return y, x - else: - raise ValueError("Failed to parse cursor position response") + raise ValueError("Failed to parse cursor position response") if __name__ == "__main__": @@ -130,16 +134,13 @@ if __name__ == "__main__": height, width = get_terminal_size_pixel() y, x = get_position() - print(f'Terminal has {rows} rows, {cols} cols = {rows * cols} cells') - print(f'Cell size: {h_pix}x{v_pix} px') - print(f'Dimensions: {width}x{height} px') - print('Cursor is at y, x:', get_position()) - import time - time.sleep(2) + print(f"Terminal has {rows} rows, {cols} cols = {rows * cols} cells") + print(f"Cell size: {h_pix}x{v_pix} px") + print(f"Dimensions: {width}x{height} px") + print("Cursor is at y, x:", get_position()) - - new_lines = int((height/v_pix)-6) - print('\n' * new_lines, end='') + new_lines = int((height / v_pix) - 6) + print("\n" * new_lines, end="") set_position(y - new_lines - 6, x) @@ -149,11 +150,4 @@ if __name__ == "__main__": b = BytesIO() pl.screenshot(b, transparent_background=True, window_size=(width, height)) - # i = Image.new("RGB", (100, 100), (0, 0, 0)) - # d = ImageDraw.Draw(i) - # d.ellipse([(5, 5), (95, 95)]) - # d.ellipse([(10, 10), (90, 90)]) - # d.line(((50, 0), (50, 100))) - # d.line(((0, 50), (100, 50))) - draw_to_terminal(b) diff --git a/terminalplotter.py b/terminalplotter.py new file mode 100644 index 0000000..dbf447e --- /dev/null +++ b/terminalplotter.py @@ -0,0 +1,70 @@ +import asyncio +import io +import math +import select +import sys +import termios +import tty + +import pyvista as pv + +import kitty + + +class TerminalPlotter(pv.Plotter): + def __init__(self, width, height, **kwargs): + super().__init__(off_screen=True, window_size=(height, width), **kwargs) + self._running = True + h_pix, _ = kitty.get_terminal_cell_size() + self.rows, _ = kitty.get_terminal_size() + self.start_y, self.start_x = kitty.get_position() + # the image requires height/cell_height lines + self.needed_lines = math.ceil(height / h_pix) + self.set_background([0.0, 1.0, 0.0]) + # if we are too close to the bottom of the terminal, create some space. + if self.rows - self.start_y < self.needed_lines: + self.set_background([1.0, 0.0, 0.0]) + missing = self.needed_lines - (self.rows - self.start_y) + self.start_y -= missing + print("\n" * self.needed_lines, end="") + kitty.set_position(self.start_y, self.start_x) + + def render_to_kitty(self): + self.render() + buf = io.BytesIO() + self.screenshot(buf, transparent_background=True) + kitty.draw_to_terminal(buf) + # print("y:", self.start_y, "rows:", self.rows, end="") + kitty.set_position(self.start_y, self.start_x) + + async def _input_loop(self): + fd = sys.stdin.fileno() + old_settings = termios.tcgetattr(fd) + try: + tty.setcbreak(fd) + while self._running: + rlist, _, _ = select.select([sys.stdin], [], [], 0) + if rlist: + c = sys.stdin.read(1) + if c == "q": # quit on q + self._running = False + else: + self._handle_key(c) + await asyncio.sleep(0.01) + finally: + termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) + + def _handle_key(self, c): + if c == "a": + self.camera.Azimuth(10) + elif c == "d": + self.camera.Azimuth(-10) + elif c == "w": + self.camera.Elevation(10) + elif c == "s": + self.camera.Elevation(-10) + self.render_to_kitty() + + async def run(self): + self.render_to_kitty() + await self._input_loop()