cleanup/refactor

This commit is contained in:
Felix Pankratz 2025-07-21 21:55:44 +02:00
parent 8464ceae24
commit d5a9ad9a19
3 changed files with 118 additions and 126 deletions

View File

@ -9,80 +9,15 @@ import subprocess
import re import re
import requests import requests
import argparse import argparse
from io import BytesIO
import asyncio import asyncio
import sys
import termios from terminalplotter import TerminalPlotter
import tty
import select
# TODO: Color arches based on latency # TODO: Color arches based on latency
# TODO: Interactive globe (spin w/ keys)
# TODO: Text info (num hops etc.) # TODO: Text info (num hops etc.)
# TODO: Image spacing # DONE: Interactive globe (spin w/ keys)
# TODO: Async rendering? # DONE: Image spacing
# DONE: Async rendering?
class TerminalPlotter(pv.Plotter):
def __init__(self, width, height, **kwargs):
super().__init__(off_screen=True, window_size=(height, width), **kwargs)
self._running = True
h_pix, _ = kitty.get_terminal_cell_size()
self.rows, _ = kitty.get_terminal_size()
self.start_y, self.start_x = kitty.get_position()
# the image requires height/cell_height lines
self.needed_lines = math.ceil(height/h_pix)
self.set_background([0.0, 1.0, 0.0])
#if self.start_y == self.rows:
if self.rows - self.start_y < self.needed_lines:
self.set_background([1.0, 0.0, 0.0])
missing = self.needed_lines - (self.rows - self.start_y)
self.start_y -= missing
#self.start_y -=
#print("\n" * self.needed_lines, end="")
print("\n" * self.needed_lines, end="")
kitty.set_position(self.start_y, self.start_x)
def render_to_kitty(self):
import io
self.render()
buf = io.BytesIO()
self.screenshot(buf, transparent_background=True)
kitty.draw_to_terminal(buf)
print('y:', self.start_y, 'rows:', self.rows, end='')
kitty.set_position(self.start_y, self.start_x)
async def _input_loop(self):
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setcbreak(fd)
while self._running:
rlist, _, _ = select.select([sys.stdin], [], [], 0)
if rlist:
c = sys.stdin.read(1)
if c == 'q': # quit on q
self._running = False
else:
self._handle_key(c)
await asyncio.sleep(0.01)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
def _handle_key(self, c):
# Simple interaction: rotate camera with a/s keys
if c == 'a':
self.camera.Azimuth(10)
elif c == 'd':
self.camera.Azimuth(-10)
elif c == 'w':
self.camera.Elevation(10)
elif c == 's':
self.camera.Elevation(-10)
self.render_to_kitty()
async def run(self):
self.render_to_kitty()
await self._input_loop()
# Convert lat/lon to Cartesian coordinates # Convert lat/lon to Cartesian coordinates
def latlon_to_xyz(lat: float, lon: float, radius=1.0): def latlon_to_xyz(lat: float, lon: float, radius=1.0):
@ -93,7 +28,6 @@ def latlon_to_xyz(lat: float, lon: float, radius=1.0):
z = radius * np.sin(lat_rad) z = radius * np.sin(lat_rad)
return np.array([x, y, z]) return np.array([x, y, z])
# Create an arch between two 3D points # Create an arch between two 3D points
def generate_arch(p1, p2, height_factor=0.2, n_points=100): def generate_arch(p1, p2, height_factor=0.2, n_points=100):
# Normalize input points to lie on the unit sphere # Normalize input points to lie on the unit sphere
@ -148,9 +82,7 @@ def main():
description="Traceroute on a globe", description="Traceroute on a globe",
epilog="Requires kitty graphics protocol support in terminal", epilog="Requires kitty graphics protocol support in terminal",
) )
parser.add_argument("-t", "--traceroute", default=None) parser.add_argument("-t", "--traceroute", default=None)
args = parser.parse_args() args = parser.parse_args()
locations = [] locations = []
@ -166,9 +98,7 @@ def main():
) )
tex = pv.examples.load_globe_texture() tex = pv.examples.load_globe_texture()
globe.active_texture_coordinates = np.zeros((globe.points.shape[0], 2)) globe.active_texture_coordinates = np.zeros((globe.points.shape[0], 2))
globe.active_texture_coordinates[:, 0] = 0.5 + np.arctan2( globe.active_texture_coordinates[:, 0] = 0.5 + np.arctan2(
globe.points[:, 1], globe.points[:, 0] globe.points[:, 1], globe.points[:, 0]
) / (2 * math.pi) ) / (2 * math.pi)
@ -179,22 +109,20 @@ def main():
# Convert to 3D coordinates # Convert to 3D coordinates
points_3d = [latlon_to_xyz(lat, lon) for lat, lon in locations] points_3d = [latlon_to_xyz(lat, lon) for lat, lon in locations]
#pl: pv.Plotter = pv.Plotter(off_screen=(not args.external)) plotter = TerminalPlotter(450, 450)
height, width = kitty.get_terminal_size_pixel() plotter.add_mesh(globe, color="tan", smooth_shading=True, texture=tex, show_edges=False)
pl = TerminalPlotter(450, 450)
pl.add_mesh(globe, color="tan", smooth_shading=True, texture=tex, show_edges=False)
for pt in points_3d: for point in points_3d:
city_marker = pv.Sphere(center=pt, radius=0.02) city_marker = pv.Sphere(center=point, radius=0.02)
pl.add_mesh(city_marker, color="blue") plotter.add_mesh(city_marker, color="blue")
for i in range(len(points_3d[:-1])): for i in range(len(points_3d[:-1])):
arch = generate_arch(points_3d[i], points_3d[i + 1], height_factor=0.2) arch = generate_arch(points_3d[i], points_3d[i + 1], height_factor=0.2)
line = pv.lines_from_points(arch, close=False) line = pv.lines_from_points(arch, close=False)
pl.add_mesh(line, color="red", line_width=2) plotter.add_mesh(line, color="red", line_width=2)
#kitty.hide_cursor() kitty.hide_cursor()
try: try:
asyncio.run(pl.run()) asyncio.run(plotter.run())
finally: finally:
kitty.show_cursor() kitty.show_cursor()

View File

@ -10,40 +10,42 @@ import fcntl
def draw_to_terminal(buffer: BytesIO) -> None: def draw_to_terminal(buffer: BytesIO) -> None:
'''Display a PNG image in the terminal.''' """Display a PNG image in the terminal."""
write_chunked(a="T", i=1, f=100, q=2, data=buffer.getvalue()) _write_chunked(a="T", i=1, f=100, q=2, data=buffer.getvalue())
def get_terminal_cell_size() -> tuple[int, int]: def get_terminal_cell_size() -> tuple[int, int]:
'''Get (height, width) of a single cell in px''' """Get (height, width) of a single cell in px"""
reply = _query_terminal("\x1b[16t", "t") reply = _query_terminal("\x1b[16t", "t")
match = re.search(r"\[6;(\d+);(\d+)t", reply) match = re.search(r"\[6;(\d+);(\d+)t", reply)
if match: if match:
v_pix, h_pix = map(int, match.groups()) v_pix, h_pix = map(int, match.groups())
return v_pix, h_pix return v_pix, h_pix
else: print(reply)
print(reply) raise ValueError("Failed to parse terminal cell size response")
raise ValueError("Failed to parse terminal cell size response")
def get_terminal_size_pixel() -> tuple[int, int]: def get_terminal_size_pixel() -> tuple[int, int]:
'''Get (height, width) of the terminal in px''' """Get (height, width) of the terminal in px"""
reply = _query_terminal("\x1b[14t", "t") reply = _query_terminal("\x1b[14t", "t")
match = re.search(r"\[4;(\d+);(\d+)t", reply) match = re.search(r"\[4;(\d+);(\d+)t", reply)
if match: if match:
height, width = map(int, match.groups()) height, width = map(int, match.groups())
return height, width return height, width
else: raise ValueError("Failed to parse terminal pixel size response")
raise ValueError("Failed to parse terminal pixel size response")
def get_terminal_size() -> tuple[int, int]: def get_terminal_size() -> tuple[int, int]:
'''Get (rows, cols) of the terminal''' """Get (rows, cols) of the terminal"""
buf = array.array('H', [0, 0, 0, 0]) buf = array.array("H", [0, 0, 0, 0])
fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf) fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf)
rows, cols, width, height = buf rows, cols, width, height = buf
return (rows, cols) return (rows, cols)
def serialize_gr_command(**cmd) -> bytes:
def _serialize_gr_command(**cmd) -> bytes:
payload = cmd.pop("payload", None) payload = cmd.pop("payload", None)
cmd = ",".join(f"{k}={v}" for k, v in cmd.items()) cmd = ",".join(f"{k}={v}" for k, v in cmd.items())
ans = [] ans = []
@ -56,39 +58,41 @@ def serialize_gr_command(**cmd) -> bytes:
return b"".join(ans) return b"".join(ans)
def write_chunked(**cmd) -> None: def _write_chunked(**cmd) -> None:
data = standard_b64encode(cmd.pop("data")) data = standard_b64encode(cmd.pop("data"))
while data: while data:
chunk, data = data[:4096], data[4096:] chunk, data = data[:4096], data[4096:]
m = 1 if data else 0 m = 1 if data else 0
sys.stdout.buffer.write(serialize_gr_command(payload=chunk, m=m, **cmd)) sys.stdout.buffer.write(_serialize_gr_command(payload=chunk, m=m, **cmd))
sys.stdout.flush() sys.stdout.flush()
cmd.clear() cmd.clear()
def hide_cursor() -> None: def hide_cursor() -> None:
'''Tell the terminal to hide the cursor.''' """Tell the terminal to hide the cursor."""
_write_stdout("\x1b[?25l") _write_stdout("\x1b[?25l")
def show_cursor() -> None: def show_cursor() -> None:
'''Tell the terminal to show the cursor.''' """Tell the terminal to show the cursor."""
_write_stdout("\x1b[?25h") _write_stdout("\x1b[?25h")
def set_position(y: int, x: int) -> None: def set_position(y: int, x: int) -> None:
'''Set the cursor position to y, x''' """Set the cursor position to y, x"""
_write_stdout(f"\x1b[{y};{x}H") _write_stdout(f"\x1b[{y};{x}H")
def _write_stdout(cmd: str) -> None: def _write_stdout(cmd: str) -> None:
sys.stdout.write(cmd) sys.stdout.write(cmd)
sys.stdout.flush() sys.stdout.flush()
def _query_terminal(escape: str, endchar: str) -> str: def _query_terminal(escape: str, endchar: str) -> str:
''' """
Send `escape` to the terminal, read the response until Send `escape` to the terminal, read the response until
`endchar`, return response (including `endchar`) `endchar`, return response (including `endchar`)
''' """
# Save the current terminal settings # Save the current terminal settings
fd = sys.stdin.fileno() fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd) old_settings = termios.tcgetattr(fd)
@ -100,9 +104,9 @@ def _query_terminal(escape: str, endchar: str) -> str:
response = "" response = ""
while True: while True:
ch = sys.stdin.read(1) char = sys.stdin.read(1)
response += ch response += char
if ch == endchar: if char == endchar:
break break
finally: finally:
@ -110,16 +114,16 @@ def _query_terminal(escape: str, endchar: str) -> str:
termios.tcsetattr(fd, termios.TCSANOW, old_settings) termios.tcsetattr(fd, termios.TCSANOW, old_settings)
return response return response
def get_position() -> tuple[int, int]: def get_position() -> tuple[int, int]:
'''Get the (y, x) position of the cursor''' """Get the (y, x) position of the cursor"""
reply = _query_terminal("\x1b[6n", "R") reply = _query_terminal("\x1b[6n", "R")
match = re.search(r"\[(\d+);(\d+)R", reply) match = re.search(r"\[(\d+);(\d+)R", reply)
if match: if match:
y, x = map(int, match.groups()) y, x = map(int, match.groups())
return y, x return y, x
else: raise ValueError("Failed to parse cursor position response")
raise ValueError("Failed to parse cursor position response")
if __name__ == "__main__": if __name__ == "__main__":
@ -130,16 +134,13 @@ if __name__ == "__main__":
height, width = get_terminal_size_pixel() height, width = get_terminal_size_pixel()
y, x = get_position() y, x = get_position()
print(f'Terminal has {rows} rows, {cols} cols = {rows * cols} cells') print(f"Terminal has {rows} rows, {cols} cols = {rows * cols} cells")
print(f'Cell size: {h_pix}x{v_pix} px') print(f"Cell size: {h_pix}x{v_pix} px")
print(f'Dimensions: {width}x{height} px') print(f"Dimensions: {width}x{height} px")
print('Cursor is at y, x:', get_position()) print("Cursor is at y, x:", get_position())
import time
time.sleep(2)
new_lines = int((height / v_pix) - 6)
new_lines = int((height/v_pix)-6) print("\n" * new_lines, end="")
print('\n' * new_lines, end='')
set_position(y - new_lines - 6, x) set_position(y - new_lines - 6, x)
@ -149,11 +150,4 @@ if __name__ == "__main__":
b = BytesIO() b = BytesIO()
pl.screenshot(b, transparent_background=True, window_size=(width, height)) pl.screenshot(b, transparent_background=True, window_size=(width, height))
# i = Image.new("RGB", (100, 100), (0, 0, 0))
# d = ImageDraw.Draw(i)
# d.ellipse([(5, 5), (95, 95)])
# d.ellipse([(10, 10), (90, 90)])
# d.line(((50, 0), (50, 100)))
# d.line(((0, 50), (100, 50)))
draw_to_terminal(b) draw_to_terminal(b)

70
terminalplotter.py Normal file
View File

@ -0,0 +1,70 @@
import asyncio
import io
import math
import select
import sys
import termios
import tty
import pyvista as pv
import kitty
class TerminalPlotter(pv.Plotter):
def __init__(self, width, height, **kwargs):
super().__init__(off_screen=True, window_size=(height, width), **kwargs)
self._running = True
h_pix, _ = kitty.get_terminal_cell_size()
self.rows, _ = kitty.get_terminal_size()
self.start_y, self.start_x = kitty.get_position()
# the image requires height/cell_height lines
self.needed_lines = math.ceil(height / h_pix)
self.set_background([0.0, 1.0, 0.0])
# if we are too close to the bottom of the terminal, create some space.
if self.rows - self.start_y < self.needed_lines:
self.set_background([1.0, 0.0, 0.0])
missing = self.needed_lines - (self.rows - self.start_y)
self.start_y -= missing
print("\n" * self.needed_lines, end="")
kitty.set_position(self.start_y, self.start_x)
def render_to_kitty(self):
self.render()
buf = io.BytesIO()
self.screenshot(buf, transparent_background=True)
kitty.draw_to_terminal(buf)
# print("y:", self.start_y, "rows:", self.rows, end="")
kitty.set_position(self.start_y, self.start_x)
async def _input_loop(self):
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setcbreak(fd)
while self._running:
rlist, _, _ = select.select([sys.stdin], [], [], 0)
if rlist:
c = sys.stdin.read(1)
if c == "q": # quit on q
self._running = False
else:
self._handle_key(c)
await asyncio.sleep(0.01)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
def _handle_key(self, c):
if c == "a":
self.camera.Azimuth(10)
elif c == "d":
self.camera.Azimuth(-10)
elif c == "w":
self.camera.Elevation(10)
elif c == "s":
self.camera.Elevation(-10)
self.render_to_kitty()
async def run(self):
self.render_to_kitty()
await self._input_loop()