diff --git a/kglobe.py b/kglobe.py index 1c4914a..d2528f1 100644 --- a/kglobe.py +++ b/kglobe.py @@ -1,8 +1,6 @@ #!/usr/bin/env python3 -from kitty import draw_to_terminal, get_position, set_position, hide_cursor, show_cursor #, draw_animation - -from PIL import Image +from kitty import draw_to_terminal, get_position, set_position, hide_cursor, show_cursor import pyvista as pv import numpy as np @@ -11,6 +9,12 @@ import subprocess import re import requests import argparse +from io import BytesIO + +# TODO: Color arches based on latency +# TODO: Interactive globe (spin w/ keys) +# TODO: Text info (num hops etc.) + # Convert lat/lon to Cartesian coordinates def latlon_to_xyz(lat, lon, radius=1.0): @@ -21,6 +25,7 @@ def latlon_to_xyz(lat, lon, radius=1.0): z = radius * np.sin(lat_rad) return np.array([x, y, z]) + # Create an arch between two 3D points def generate_arch(p1, p2, height_factor=0.2, n_points=100): # Normalize input points to lie on the unit sphere @@ -34,10 +39,12 @@ def generate_arch(p1, p2, height_factor=0.2, n_points=100): t = np.linspace(0, 1, n_points) sin_omega = np.sin(omega) - + # Spherical linear interpolation (slerp) - arch_points = (np.sin((1 - t)[:, None] * omega) * p1[None, :] + - np.sin(t[:, None] * omega) * p2[None, :]) / sin_omega + arch_points = ( + np.sin((1 - t)[:, None] * omega) * p1[None, :] + + np.sin(t[:, None] * omega) * p2[None, :] + ) / sin_omega # Add radial height offset based on sine curve heights = 1 + np.sin(np.pi * t) * height_factor @@ -45,76 +52,93 @@ def generate_arch(p1, p2, height_factor=0.2, n_points=100): return arch_points + def traceroute(target): # Run traceroute command - result = subprocess.run(['traceroute', '-n', target, '-q', '1', '-w', '1,3,10'], capture_output=True, text=True) - hops = re.findall(r'\n\s*\d+\s+([\d.]+)', result.stdout) + result = subprocess.run( + ["traceroute", "-n", target, "-q", "1", "-w", "1,3,10"], + capture_output=True, + text=True, + ) + hops = re.findall(r"\n\s*\d+\s+([\d.]+)", result.stdout) coords = [] for ip in hops: try: response = requests.get(f"http://ip-api.com/json/{ip}").json() - if response['status'] == 'success': - coords.append((response['lat'], response['lon'])) + if response["status"] == "success": + coords.append((response["lat"], response["lon"])) except Exception: continue return coords + def main(): parser = argparse.ArgumentParser( - prog='kglobe', - description='Traceroute on a globe', - epilog='Requires kitty graphics protocol support in terminal') + prog="kglobe", + description="Traceroute on a globe", + epilog="Requires kitty graphics protocol support in terminal", + ) - parser.add_argument('target') - parser.add_argument('-e', '--external', action='store_true') + parser.add_argument("target") + parser.add_argument("-e", "--external", action="store_true") args = parser.parse_args() locations = traceroute(args.target) - globe = pv.Sphere(radius=1.0, theta_resolution=120, phi_resolution=120, - start_theta=270.001, end_theta=270) + globe = pv.Sphere( + radius=1.0, + theta_resolution=120, + phi_resolution=120, + start_theta=270.001, + end_theta=270, + ) tex = pv.examples.load_globe_texture() globe.active_texture_coordinates = np.zeros((globe.points.shape[0], 2)) - globe.active_texture_coordinates[:, 0] = 0.5 + np.arctan2(globe.points[:, 1], globe.points[:, 0]) / (2 * math.pi) - globe.active_texture_coordinates[:, 1] = 0.5 + np.arcsin(globe.points[:, 2]) / math.pi + globe.active_texture_coordinates[:, 0] = 0.5 + np.arctan2( + globe.points[:, 1], globe.points[:, 0] + ) / (2 * math.pi) + globe.active_texture_coordinates[:, 1] = ( + 0.5 + np.arcsin(globe.points[:, 2]) / math.pi + ) # Convert to 3D coordinates points_3d = [latlon_to_xyz(lat, lon) for lat, lon in locations] - pl=pv.Plotter(off_screen=(not args.external)) - pl.add_mesh(globe, color='tan', smooth_shading=True, texture=tex, show_edges=False) + pl = pv.Plotter(off_screen=(not args.external)) + pl.add_mesh(globe, color="tan", smooth_shading=True, texture=tex, show_edges=False) for pt in points_3d: city_marker = pv.Sphere(center=pt, radius=0.02) - pl.add_mesh(city_marker, color='blue') + pl.add_mesh(city_marker, color="blue") for i in range(len(points_3d[:-1])): - arch = generate_arch(points_3d[i], points_3d[i+1], height_factor=0.2) + arch = generate_arch(points_3d[i], points_3d[i + 1], height_factor=0.2) line = pv.lines_from_points(arch, close=False) - pl.add_mesh(line, color='red', line_width=2) + pl.add_mesh(line, color="red", line_width=2) hide_cursor() y, x = get_position() - print('\n' * 25, end='') + print("\n" * 25, end="") frames = [] try: if not args.external: while True: pl.camera.Azimuth(1) - image = pl.screenshot(transparent_background=True, window_size=(512, 512)) - frames.append(Image.fromarray(image)) - set_position(y-25, x) - draw_to_terminal(Image.fromarray(image)) + buf: BytesIO = BytesIO() + pl.screenshot(buf, transparent_background=True, window_size=(512, 512)) + set_position(y - 25, x) + draw_to_terminal(buf) else: pl.show() finally: show_cursor() -if __name__ == '__main__': + +if __name__ == "__main__": main() diff --git a/kitty.py b/kitty.py index 6889396..b11029a 100755 --- a/kitty.py +++ b/kitty.py @@ -6,48 +6,50 @@ import tty from base64 import standard_b64encode from io import BytesIO -from PIL import Image, ImageDraw -def draw_to_terminal(img: Image.Image) -> None: - buffer: BytesIO = BytesIO() - img.save(buffer, format='PNG') - write_chunked(a='T', i=1, f=100, q=2, data=buffer.getvalue()) +def draw_to_terminal(buffer: BytesIO) -> None: + write_chunked(a="T", i=1, f=100, q=2, data=buffer.getvalue()) -def serialize_gr_command(**cmd): - payload = cmd.pop('payload', None) - cmd = ','.join(f'{k}={v}' for k, v in cmd.items()) + +def serialize_gr_command(**cmd) -> bytes: + payload = cmd.pop("payload", None) + cmd = ",".join(f"{k}={v}" for k, v in cmd.items()) ans = [] w = ans.append - w(b'\033_G'), w(cmd.encode('ascii')) + w(b"\033_G"), w(cmd.encode("ascii")) if payload: - w(b';') + w(b";") w(payload) - w(b'\033\\') - return b''.join(ans) + w(b"\033\\") + return b"".join(ans) -def write_chunked(**cmd): - data = standard_b64encode(cmd.pop('data')) + +def write_chunked(**cmd) -> None: + data = standard_b64encode(cmd.pop("data")) while data: chunk, data = data[:4096], data[4096:] m = 1 if data else 0 - sys.stdout.buffer.write(serialize_gr_command(payload=chunk, m=m, - **cmd)) + sys.stdout.buffer.write(serialize_gr_command(payload=chunk, m=m, **cmd)) sys.stdout.flush() cmd.clear() -def hide_cursor(): + +def hide_cursor() -> None: sys.stdout.write("\x1b[?25l") sys.stdout.flush() -def show_cursor(): + +def show_cursor() -> None: sys.stdout.write("\x1b[?25h") sys.stdout.flush() -def set_position(y, x): + +def set_position(y: int, x: int) -> None: sys.stdout.write(f"\x1b[{y};{x}H") sys.stdout.flush() -def get_position(): + +def get_position() -> tuple[int, int]: # Save the current terminal settings fd = sys.stdin.fileno() old_settings = termios.tcgetattr(fd) @@ -55,17 +57,16 @@ def get_position(): try: # Set terminal to raw mode tty.setraw(fd) - # Send the ESC[6n command to request cursor position sys.stdout.write("\x1b[6n") sys.stdout.flush() # Read the response: ESC [ row ; col R - response = '' + response = "" while True: ch = sys.stdin.read(1) response += ch - if ch == 'R': + if ch == "R": break finally: @@ -73,33 +74,28 @@ def get_position(): termios.tcsetattr(fd, termios.TCSANOW, old_settings) # Parse the response using regex - match = re.search(r'\[(\d+);(\d+)R', response) + match = re.search(r"\[(\d+);(\d+)R", response) if match: y, x = map(int, match.groups()) return y, x else: raise ValueError("Failed to parse cursor position response") - -# sys.stdout.write("\x1b[6n") -# sys.stdout.flush() -# response = '' -# while True: -# ch = sys.stdin.read(1) -# response += ch -# if ch == 'R': -# break -# match = re.search(r'\[(\d+);(\d+)R', response) -# if match: -# y, x = map(int, match.groups()) -# return y, x -if __name__ == '__main__': - i = Image.new("RGB", (100, 100), (0, 0, 0)) - d = ImageDraw.Draw(i) - d.ellipse([(5, 5), (95, 95)]) - d.ellipse([(10, 10), (90, 90)]) - d.line(((50, 0), (50, 100))) - d.line(((0, 50), (100, 50))) +if __name__ == "__main__": + import pyvista as pv - draw_to_terminal(i) + s = pv.Sphere() + pl = pv.Plotter() + pl.add_mesh(s) + b = BytesIO() + pl.show() + pl.screenshot(b) + # i = Image.new("RGB", (100, 100), (0, 0, 0)) + # d = ImageDraw.Draw(i) + # d.ellipse([(5, 5), (95, 95)]) + # d.ellipse([(10, 10), (90, 90)]) + # d.line(((50, 0), (50, 100))) + # d.line(((0, 50), (100, 50))) + + draw_to_terminal(b)