kglobe/kglobe.py

188 lines
5.2 KiB
Python

#!/usr/bin/env python3
import kitty
import pyvista as pv
import numpy as np
import math
import subprocess
import re
import requests
import argparse
import asyncio
import os
import IP2Location
from terminalplotter import TerminalPlotter
# TODO: Color arches based on latency
# TODO: Text info (num hops etc.)
# TODO: Mouse support
# DONE: Interactive globe (spin w/ keys)
# DONE: Image spacing
# DONE: Async rendering?
# Built-in demo route: (latitude, longitude) pairs in degrees, listed in the
# order the hops are connected.
EXAMPLE_ROUTE = [
    (47.996, 7.849),  # Freiburg
    (50.110, 8.682),  # Frankfurt am Main
    (52.231, 21.006),  # Warsaw
    (12.988, 77.622),  # Bangalore
    (22.350, 114.184),  # Hong Kong
    (-33.869, 151.208),  # Sydney
]
# Convert lat/lon to Cartesian coordinates
def latlon_to_xyz(lat: float, lon: float, radius=1.0):
    """Map a latitude/longitude pair given in degrees onto a sphere.

    The result is a NumPy array ``[x, y, z]``. Latitude 0 / longitude 0 lies
    on the +x axis and latitude 90 (the north pole) on the +z axis.
    """
    phi = np.radians(lat)
    lam = np.radians(lon)
    # Radius of the circle of latitude the point sits on.
    ring = radius * np.cos(phi)
    return np.array([ring * np.cos(lam), ring * np.sin(lam), radius * np.sin(phi)])
# Create an arch between two 3D points
def generate_arch(p1, p2, height_factor=0.2, n_points=100):
    """Return sample points of a raised arc between two directions.

    Both inputs are normalised to unit length, joined by spherical linear
    interpolation (slerp) and then scaled outward by
    ``1 + sin(pi * t) * height_factor`` so the arc sits on the unit sphere at
    both ends and is highest in the middle.

    Args:
        p1: Start point as a coordinate vector (any non-zero length).
        p2: End point, same form as ``p1``.
        height_factor: Extra radius reached at the middle of the arc.
        n_points: Number of samples along the arc.

    Returns:
        Array of shape ``(n_points, len(p1))``. If both points lie in exactly
        the same direction, ``n_points`` copies of the normalised ``p1`` are
        returned without any height offset.
    """
    # Accept plain sequences as well as arrays.
    p1 = np.asarray(p1, dtype=float)
    p2 = np.asarray(p2, dtype=float)

    # Normalize input points to lie on the unit sphere
    p1 = p1 / np.linalg.norm(p1)
    p2 = p2 / np.linalg.norm(p2)

    # Angle between p1 and p2; clip guards against rounding outside [-1, 1].
    omega = np.arccos(np.clip(np.dot(p1, p2), -1, 1))
    if omega == 0:
        return np.tile(p1, (n_points, 1))  # degenerate case

    t = np.linspace(0, 1, n_points)

    if np.pi - omega < 1e-6:
        # (Nearly) antipodal points: sin(omega) is ~0, so the slerp formula
        # divides rounding noise by rounding noise, and the great circle is
        # not unique anyway. Pick a direction perpendicular to p1 and sweep
        # the half circle through it instead.
        axis = np.zeros_like(p1)
        axis[np.argmin(np.abs(p1))] = 1.0  # basis vector least aligned with p1
        perp = axis - np.dot(axis, p1) * p1
        perp /= np.linalg.norm(perp)
        angle = (t * omega)[:, None]
        arch_points = np.cos(angle) * p1[None, :] + np.sin(angle) * perp[None, :]
    else:
        # Spherical linear interpolation (slerp)
        sin_omega = np.sin(omega)
        arch_points = (
            np.sin((1 - t)[:, None] * omega) * p1[None, :]
            + np.sin(t[:, None] * omega) * p2[None, :]
        ) / sin_omega

    # Add radial height offset based on sine curve
    heights = 1 + np.sin(np.pi * t) * height_factor
    arch_points *= heights[:, None]  # Scale outward from center
    return arch_points
# Address prefixes that are never geolocated: RFC 1918 private ranges
# (10/8, 172.16/12, 192.168/16), loopback (127/8) and link-local (169.254/16).
_LOCAL_PREFIXES = (
    "10.",
    "127.",
    "169.254.",
    # 172.16.0.0/12 spans the second octets 16 through 31 inclusive.
    *(f"172.{octet}." for octet in range(16, 32)),
    "192.168.",
)


def traceroute(target: str):
    """Trace the route to *target* and look up each non-local hop.

    Runs the system ``traceroute`` command, extracts the address and the
    first number following it (used as the latency) from every hop line, and
    looks the address up in the ``IP2LOCATION-LITE-DB5.BIN`` database, which
    is opened relative to the current working directory.

    Args:
        target: Host name or address handed to ``traceroute``.

    Returns:
        A list of the records returned by ``database.get_all``, in hop order,
        each with an added ``latency`` attribute holding the latency as the
        string captured from the command output. Hops without a parsed
        address and hops in local/private ranges are left out.
    """
    database = IP2Location.IP2Location("IP2LOCATION-LITE-DB5.BIN", "SHARED_MEMORY")

    # Run traceroute command
    result = subprocess.run(
        ["traceroute", "-n", target, "-q", "1", "-w", "1,3,10"],
        capture_output=True,
        text=True,
    )

    # Two capture groups, so findall yields (address, latency) tuples.
    ips: list[tuple[str, str]] = re.findall(
        r"\n\s*\d+\s+([\d.]+)\s+([\d.]+)", result.stdout
    )
    print(ips)

    hops = []
    for ip, latency in ips:
        if ip.startswith(_LOCAL_PREFIXES):
            # exclude common local network addresses
            continue
        hop = database.get_all(ip)
        hop.latency = latency
        hops.append(hop)
    return hops
async def main():
    """Parse the command line, build the globe scene and run the plotter.

    Command-line options:
        -t/--traceroute TARGET: trace the route to TARGET and plot its hops.
        --example: plot the built-in EXAMPLE_ROUTE instead of traced hops.
        -s/--size W H: render size in pixels; when omitted the size is taken
            from ``kitty.get_terminal_size_pixel()``.

    The cursor is hidden while the plotter runs and is always restored
    afterwards, even if the plotter raises.
    """
    parser = argparse.ArgumentParser(
        prog="kglobe",
        description="Traceroute on a globe",
        epilog="Requires kitty graphics protocol support in terminal",
    )
    parser.add_argument("-t", "--traceroute", default=None)
    parser.add_argument("--example", action="store_true")
    parser.add_argument("-s", "--size", nargs=2, type=int, help="width x height in px")
    args = parser.parse_args()

    locations = traceroute(args.traceroute) if args.traceroute else []

    # NOTE(review): the theta range 270.001 -> 270 presumably positions the
    # mesh seam for the texture mapping below - confirm before changing it.
    globe = pv.Sphere(
        radius=1.0,
        theta_resolution=60,
        phi_resolution=60,
        start_theta=270.001,
        end_theta=270,
    )
    tex = pv.examples.load_globe_texture()

    # Texture coordinates derived from each vertex position:
    # u from the angle around the z axis, v from the elevation angle.
    globe.active_texture_coordinates = np.zeros((globe.points.shape[0], 2))
    globe.active_texture_coordinates[:, 0] = 0.5 + np.arctan2(
        globe.points[:, 1], globe.points[:, 0]
    ) / (2 * math.pi)
    globe.active_texture_coordinates[:, 1] = (
        0.5 + np.arcsin(globe.points[:, 2]) / math.pi
    )

    # Convert to 3D coordinates. Points and labels are built from the same
    # source so there is always exactly one label per point, which
    # add_point_labels requires.
    if args.example:
        points_3d = [latlon_to_xyz(lat, lon) for lat, lon in EXAMPLE_ROUTE]
        labels = [f"{lat:.3f}, {lon:.3f}" for lat, lon in EXAMPLE_ROUTE]
    else:
        points_3d = [
            latlon_to_xyz(float(hop.latitude), float(hop.longitude))
            for hop in locations
        ]
        labels = [f'{hop.country_short}: {hop.latency}ms' for hop in locations]

    if args.size:
        width, height = args.size
    else:
        # NOTE(review): unpacked as (height, width) - confirm this matches
        # the order kitty.get_terminal_size_pixel() returns.
        height, width = await kitty.get_terminal_size_pixel()

    plotter = TerminalPlotter(width, height)
    plotter.add_mesh(
        globe, color="tan", smooth_shading=False, texture=tex, show_edges=False
    )

    for point in points_3d:
        city_marker = pv.Sphere(center=point, radius=0.02)
        plotter.add_mesh(city_marker, color="blue")

    # Labels are anchored slightly above the surface (radius scaled by 1.1).
    raised_points = [point * 1.1 for point in points_3d]
    if raised_points:
        plotter.add_point_labels(raised_points, labels, point_size=0, font_size=14)

    # One arc per pair of consecutive points.
    for start, end in zip(points_3d, points_3d[1:]):
        arch = generate_arch(start, end, height_factor=0.2)
        line = pv.lines_from_points(arch, close=False)
        plotter.add_mesh(line, color="red", line_width=2)

    kitty.hide_cursor()
    try:
        await plotter.run()
    finally:
        kitty.show_cursor()
if __name__ == "__main__":
    # Script entry point: run the async main() to completion on a new event loop.
    entry_point = main()
    asyncio.run(entry_point)