204 lines
6.1 KiB
Python
204 lines
6.1 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import kitty
|
|
|
|
import pyvista as pv
|
|
import numpy as np
|
|
import math
|
|
import subprocess
|
|
import re
|
|
import requests
|
|
import argparse
|
|
from io import BytesIO
|
|
import asyncio
|
|
import sys
|
|
import termios
|
|
import tty
|
|
import select
|
|
|
|
# TODO: Color arches based on latency
|
|
# TODO: Interactive globe (spin w/ keys)
|
|
# TODO: Text info (num hops etc.)
|
|
# TODO: Image spacing
|
|
# TODO: Async rendering?
|
|
|
|
class TerminalPlotter(pv.Plotter):
|
|
def __init__(self, width, height, **kwargs):
|
|
super().__init__(off_screen=True, window_size=(height, width), **kwargs)
|
|
self._running = True
|
|
h_pix, _ = kitty.get_terminal_cell_size()
|
|
self.rows, _ = kitty.get_terminal_size()
|
|
self.start_y, self.start_x = kitty.get_position()
|
|
# the image requires height/cell_height lines
|
|
self.needed_lines = math.ceil(height/h_pix)
|
|
self.set_background([0.0, 1.0, 0.0])
|
|
#if self.start_y == self.rows:
|
|
if self.rows - self.start_y < self.needed_lines:
|
|
self.set_background([1.0, 0.0, 0.0])
|
|
missing = self.needed_lines - (self.rows - self.start_y)
|
|
self.start_y -= missing
|
|
#self.start_y -=
|
|
#print("\n" * self.needed_lines, end="")
|
|
print("\n" * self.needed_lines, end="")
|
|
kitty.set_position(self.start_y, self.start_x)
|
|
|
|
def render_to_kitty(self):
|
|
import io
|
|
self.render()
|
|
buf = io.BytesIO()
|
|
self.screenshot(buf, transparent_background=True)
|
|
kitty.draw_to_terminal(buf)
|
|
print('y:', self.start_y, 'rows:', self.rows, end='')
|
|
kitty.set_position(self.start_y, self.start_x)
|
|
|
|
async def _input_loop(self):
|
|
fd = sys.stdin.fileno()
|
|
old_settings = termios.tcgetattr(fd)
|
|
try:
|
|
tty.setcbreak(fd)
|
|
while self._running:
|
|
rlist, _, _ = select.select([sys.stdin], [], [], 0)
|
|
if rlist:
|
|
c = sys.stdin.read(1)
|
|
if c == 'q': # quit on q
|
|
self._running = False
|
|
else:
|
|
self._handle_key(c)
|
|
await asyncio.sleep(0.01)
|
|
finally:
|
|
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
|
|
|
|
def _handle_key(self, c):
|
|
# Simple interaction: rotate camera with a/s keys
|
|
if c == 'a':
|
|
self.camera.Azimuth(10)
|
|
elif c == 'd':
|
|
self.camera.Azimuth(-10)
|
|
elif c == 'w':
|
|
self.camera.Elevation(10)
|
|
elif c == 's':
|
|
self.camera.Elevation(-10)
|
|
self.render_to_kitty()
|
|
|
|
async def run(self):
|
|
self.render_to_kitty()
|
|
await self._input_loop()
|
|
|
|
# Convert lat/lon to Cartesian coordinates
|
|
def latlon_to_xyz(lat: float, lon: float, radius=1.0):
|
|
lat_rad = np.radians(lat)
|
|
lon_rad = np.radians(lon)
|
|
x = radius * np.cos(lat_rad) * np.cos(lon_rad)
|
|
y = radius * np.cos(lat_rad) * np.sin(lon_rad)
|
|
z = radius * np.sin(lat_rad)
|
|
return np.array([x, y, z])
|
|
|
|
|
|
# Create an arch between two 3D points
|
|
def generate_arch(p1, p2, height_factor=0.2, n_points=100):
|
|
# Normalize input points to lie on the unit sphere
|
|
p1 = p1 / np.linalg.norm(p1)
|
|
p2 = p2 / np.linalg.norm(p2)
|
|
|
|
# Compute angle between p1 and p2
|
|
omega = np.arccos(np.clip(np.dot(p1, p2), -1, 1))
|
|
if omega == 0:
|
|
return np.tile(p1, (n_points, 1)) # degenerate case
|
|
|
|
t = np.linspace(0, 1, n_points)
|
|
sin_omega = np.sin(omega)
|
|
|
|
# Spherical linear interpolation (slerp)
|
|
arch_points = (
|
|
np.sin((1 - t)[:, None] * omega) * p1[None, :]
|
|
+ np.sin(t[:, None] * omega) * p2[None, :]
|
|
) / sin_omega
|
|
|
|
# Add radial height offset based on sine curve
|
|
heights = 1 + np.sin(np.pi * t) * height_factor
|
|
arch_points *= heights[:, None] # Scale outward from center
|
|
|
|
return arch_points
|
|
|
|
|
|
def traceroute(target: str) -> list[tuple[int, int]]:
|
|
# Run traceroute command
|
|
result = subprocess.run(
|
|
["traceroute", "-n", target, "-q", "1", "-w", "1,3,10"],
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
hops: list[str] = re.findall(r"\n\s*\d+\s+([\d.]+)", result.stdout)
|
|
|
|
coords: list[tuple[int, int]] = []
|
|
for ip in hops:
|
|
try:
|
|
response: dict = requests.get(f"http://ip-api.com/json/{ip}").json()
|
|
if response["status"] == "success":
|
|
coords.append((response["lat"], response["lon"]))
|
|
except Exception:
|
|
continue
|
|
return coords
|
|
|
|
|
|
def main():
|
|
|
|
parser = argparse.ArgumentParser(
|
|
prog="kglobe",
|
|
description="Traceroute on a globe",
|
|
epilog="Requires kitty graphics protocol support in terminal",
|
|
)
|
|
|
|
parser.add_argument("-t", "--traceroute", default=None)
|
|
|
|
args = parser.parse_args()
|
|
|
|
locations = []
|
|
if args.traceroute:
|
|
locations = traceroute(args.traceroute)
|
|
|
|
globe = pv.Sphere(
|
|
radius=1.0,
|
|
theta_resolution=120,
|
|
phi_resolution=120,
|
|
start_theta=270.001,
|
|
end_theta=270,
|
|
)
|
|
|
|
tex = pv.examples.load_globe_texture()
|
|
|
|
globe.active_texture_coordinates = np.zeros((globe.points.shape[0], 2))
|
|
|
|
globe.active_texture_coordinates[:, 0] = 0.5 + np.arctan2(
|
|
globe.points[:, 1], globe.points[:, 0]
|
|
) / (2 * math.pi)
|
|
globe.active_texture_coordinates[:, 1] = (
|
|
0.5 + np.arcsin(globe.points[:, 2]) / math.pi
|
|
)
|
|
|
|
# Convert to 3D coordinates
|
|
points_3d = [latlon_to_xyz(lat, lon) for lat, lon in locations]
|
|
|
|
#pl: pv.Plotter = pv.Plotter(off_screen=(not args.external))
|
|
height, width = kitty.get_terminal_size_pixel()
|
|
pl = TerminalPlotter(450, 450)
|
|
pl.add_mesh(globe, color="tan", smooth_shading=True, texture=tex, show_edges=False)
|
|
|
|
for pt in points_3d:
|
|
city_marker = pv.Sphere(center=pt, radius=0.02)
|
|
pl.add_mesh(city_marker, color="blue")
|
|
for i in range(len(points_3d[:-1])):
|
|
arch = generate_arch(points_3d[i], points_3d[i + 1], height_factor=0.2)
|
|
line = pv.lines_from_points(arch, close=False)
|
|
pl.add_mesh(line, color="red", line_width=2)
|
|
|
|
#kitty.hide_cursor()
|
|
try:
|
|
asyncio.run(pl.run())
|
|
finally:
|
|
kitty.show_cursor()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|