Compare commits

...

8 Commits

Author SHA1 Message Date
Felix Pankratz
70d2029a9f smart newlines 2025-07-21 15:31:56 +02:00
Felix Pankratz
868820a955 todos 2025-07-21 15:02:21 +02:00
Felix Pankratz
fd3dff1ebb target optional 2025-07-21 14:55:27 +02:00
Felix Pankratz
2d22c99c92 docstrings 2025-07-21 14:42:28 +02:00
Felix Pankratz
2a9ab40739 black 2025-07-21 14:15:59 +02:00
Felix Pankratz
51a30d5147 terminal query functions, refactoring 2025-07-21 14:15:46 +02:00
Felix Pankratz
037ce01efd type hints 2025-07-21 11:09:18 +02:00
Felix Pankratz
4e6ff22dba remove PIL images, Black formatting 2025-07-21 10:42:23 +02:00
2 changed files with 181 additions and 97 deletions

113
kglobe.py
View File

@ -1,8 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from kitty import draw_to_terminal, get_position, set_position, hide_cursor, show_cursor #, draw_animation import kitty
from PIL import Image
import pyvista as pv import pyvista as pv
import numpy as np import numpy as np
@ -11,9 +9,17 @@ import subprocess
import re import re
import requests import requests
import argparse import argparse
from io import BytesIO
# TODO: Color arches based on latency
# TODO: Interactive globe (spin w/ keys)
# TODO: Text info (num hops etc.)
# TODO: Image spacing
# TODO: Async rendering?
# Convert lat/lon to Cartesian coordinates # Convert lat/lon to Cartesian coordinates
def latlon_to_xyz(lat, lon, radius=1.0): def latlon_to_xyz(lat: float, lon: float, radius=1.0):
lat_rad = np.radians(lat) lat_rad = np.radians(lat)
lon_rad = np.radians(lon) lon_rad = np.radians(lon)
x = radius * np.cos(lat_rad) * np.cos(lon_rad) x = radius * np.cos(lat_rad) * np.cos(lon_rad)
@ -21,6 +27,7 @@ def latlon_to_xyz(lat, lon, radius=1.0):
z = radius * np.sin(lat_rad) z = radius * np.sin(lat_rad)
return np.array([x, y, z]) return np.array([x, y, z])
# Create an arch between two 3D points # Create an arch between two 3D points
def generate_arch(p1, p2, height_factor=0.2, n_points=100): def generate_arch(p1, p2, height_factor=0.2, n_points=100):
# Normalize input points to lie on the unit sphere # Normalize input points to lie on the unit sphere
@ -34,10 +41,12 @@ def generate_arch(p1, p2, height_factor=0.2, n_points=100):
t = np.linspace(0, 1, n_points) t = np.linspace(0, 1, n_points)
sin_omega = np.sin(omega) sin_omega = np.sin(omega)
# Spherical linear interpolation (slerp) # Spherical linear interpolation (slerp)
arch_points = (np.sin((1 - t)[:, None] * omega) * p1[None, :] + arch_points = (
np.sin(t[:, None] * omega) * p2[None, :]) / sin_omega np.sin((1 - t)[:, None] * omega) * p1[None, :]
+ np.sin(t[:, None] * omega) * p2[None, :]
) / sin_omega
# Add radial height offset based on sine curve # Add radial height offset based on sine curve
heights = 1 + np.sin(np.pi * t) * height_factor heights = 1 + np.sin(np.pi * t) * height_factor
@ -45,76 +54,100 @@ def generate_arch(p1, p2, height_factor=0.2, n_points=100):
return arch_points return arch_points
def traceroute(target):
# Run traceroute command
result = subprocess.run(['traceroute', '-n', target, '-q', '1', '-w', '1,3,10'], capture_output=True, text=True)
hops = re.findall(r'\n\s*\d+\s+([\d.]+)', result.stdout)
coords = [] def traceroute(target: str) -> list[tuple[int, int]]:
# Run traceroute command
result = subprocess.run(
["traceroute", "-n", target, "-q", "1", "-w", "1,3,10"],
capture_output=True,
text=True,
)
hops: list[str] = re.findall(r"\n\s*\d+\s+([\d.]+)", result.stdout)
coords: list[tuple[int, int]] = []
for ip in hops: for ip in hops:
try: try:
response = requests.get(f"http://ip-api.com/json/{ip}").json() response: dict = requests.get(f"http://ip-api.com/json/{ip}").json()
if response['status'] == 'success': if response["status"] == "success":
coords.append((response['lat'], response['lon'])) coords.append((response["lat"], response["lon"]))
except Exception: except Exception:
continue continue
return coords return coords
def main(): def main():
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog='kglobe', prog="kglobe",
description='Traceroute on a globe', description="Traceroute on a globe",
epilog='Requires kitty graphics protocol support in terminal') epilog="Requires kitty graphics protocol support in terminal",
)
parser.add_argument('target') parser.add_argument("-t", "--traceroute", default=None)
parser.add_argument('-e', '--external', action='store_true') parser.add_argument("-e", "--external", action="store_true")
args = parser.parse_args() args = parser.parse_args()
locations = traceroute(args.target) locations = []
if args.traceroute:
locations = traceroute(args.traceroute)
globe = pv.Sphere(radius=1.0, theta_resolution=120, phi_resolution=120, globe = pv.Sphere(
start_theta=270.001, end_theta=270) radius=1.0,
theta_resolution=120,
phi_resolution=120,
start_theta=270.001,
end_theta=270,
)
tex = pv.examples.load_globe_texture() tex = pv.examples.load_globe_texture()
globe.active_texture_coordinates = np.zeros((globe.points.shape[0], 2)) globe.active_texture_coordinates = np.zeros((globe.points.shape[0], 2))
globe.active_texture_coordinates[:, 0] = 0.5 + np.arctan2(globe.points[:, 1], globe.points[:, 0]) / (2 * math.pi) globe.active_texture_coordinates[:, 0] = 0.5 + np.arctan2(
globe.active_texture_coordinates[:, 1] = 0.5 + np.arcsin(globe.points[:, 2]) / math.pi globe.points[:, 1], globe.points[:, 0]
) / (2 * math.pi)
globe.active_texture_coordinates[:, 1] = (
0.5 + np.arcsin(globe.points[:, 2]) / math.pi
)
# Convert to 3D coordinates # Convert to 3D coordinates
points_3d = [latlon_to_xyz(lat, lon) for lat, lon in locations] points_3d = [latlon_to_xyz(lat, lon) for lat, lon in locations]
pl=pv.Plotter(off_screen=(not args.external)) pl: pv.Plotter = pv.Plotter(off_screen=(not args.external))
pl.add_mesh(globe, color='tan', smooth_shading=True, texture=tex, show_edges=False) pl.add_mesh(globe, color="tan", smooth_shading=True, texture=tex, show_edges=False)
for pt in points_3d: for pt in points_3d:
city_marker = pv.Sphere(center=pt, radius=0.02) city_marker = pv.Sphere(center=pt, radius=0.02)
pl.add_mesh(city_marker, color='blue') pl.add_mesh(city_marker, color="blue")
for i in range(len(points_3d[:-1])): for i in range(len(points_3d[:-1])):
arch = generate_arch(points_3d[i], points_3d[i+1], height_factor=0.2) arch = generate_arch(points_3d[i], points_3d[i + 1], height_factor=0.2)
line = pv.lines_from_points(arch, close=False) line = pv.lines_from_points(arch, close=False)
pl.add_mesh(line, color='red', line_width=2) pl.add_mesh(line, color="red", line_width=2)
hide_cursor() kitty.hide_cursor()
y, x = get_position() y, x = kitty.get_position()
print('\n' * 25, end='')
height, width = kitty.get_terminal_size_pixel()
h_pix, w_pix = kitty.get_terminal_cell_size()
# the image requires height/cell_height lines
needed_lines = round(height/h_pix)
print("\n" * needed_lines, end="")
frames = []
try: try:
if not args.external: if not args.external:
while True: while True:
pl.camera.Azimuth(1) pl.camera.Azimuth(1)
image = pl.screenshot(transparent_background=True, window_size=(512, 512)) buf: BytesIO = BytesIO()
frames.append(Image.fromarray(image)) pl.screenshot(buf, transparent_background=True, window_size=(height, width))
set_position(y-25, x) kitty.set_position(y - needed_lines, 0)
draw_to_terminal(Image.fromarray(image)) kitty.draw_to_terminal(buf)
else: else:
pl.show() pl.show()
finally: finally:
show_cursor() kitty.show_cursor()
if __name__ == '__main__':
if __name__ == "__main__":
main() main()

165
kitty.py
View File

@ -5,49 +5,90 @@ import termios
import tty import tty
from base64 import standard_b64encode from base64 import standard_b64encode
from io import BytesIO from io import BytesIO
import array
import fcntl
from PIL import Image, ImageDraw
def draw_to_terminal(img: Image.Image) -> None: def draw_to_terminal(buffer: BytesIO) -> None:
buffer: BytesIO = BytesIO() '''Display a PNG image in the terminal.'''
img.save(buffer, format='PNG') write_chunked(a="T", i=1, f=100, q=2, data=buffer.getvalue())
write_chunked(a='T', i=1, f=100, q=2, data=buffer.getvalue())
def serialize_gr_command(**cmd): def get_terminal_cell_size() -> tuple[int, int]:
payload = cmd.pop('payload', None) '''Get (height, width) of a single cell in px'''
cmd = ','.join(f'{k}={v}' for k, v in cmd.items()) reply = _query_terminal("\x1b[16t", "t")
match = re.search(r"\[6;(\d+);(\d+)t", reply)
if match:
v_pix, h_pix = map(int, match.groups())
return v_pix, h_pix
else:
print(reply)
raise ValueError("Failed to parse terminal cell size response")
def get_terminal_size_pixel() -> tuple[int, int]:
'''Get (height, width) of the terminal in px'''
reply = _query_terminal("\x1b[14t", "t")
match = re.search(r"\[4;(\d+);(\d+)t", reply)
if match:
height, width = map(int, match.groups())
return height, width
else:
raise ValueError("Failed to parse terminal pixel size response")
def get_terminal_size() -> tuple[int, int]:
'''Get (rows, cols) of the terminal'''
buf = array.array('H', [0, 0, 0, 0])
fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf)
rows, cols, width, height = buf
return (rows, cols)
def serialize_gr_command(**cmd) -> bytes:
payload = cmd.pop("payload", None)
cmd = ",".join(f"{k}={v}" for k, v in cmd.items())
ans = [] ans = []
w = ans.append w = ans.append
w(b'\033_G'), w(cmd.encode('ascii')) w(b"\033_G"), w(cmd.encode("ascii"))
if payload: if payload:
w(b';') w(b";")
w(payload) w(payload)
w(b'\033\\') w(b"\033\\")
return b''.join(ans) return b"".join(ans)
def write_chunked(**cmd):
data = standard_b64encode(cmd.pop('data')) def write_chunked(**cmd) -> None:
data = standard_b64encode(cmd.pop("data"))
while data: while data:
chunk, data = data[:4096], data[4096:] chunk, data = data[:4096], data[4096:]
m = 1 if data else 0 m = 1 if data else 0
sys.stdout.buffer.write(serialize_gr_command(payload=chunk, m=m, sys.stdout.buffer.write(serialize_gr_command(payload=chunk, m=m, **cmd))
**cmd))
sys.stdout.flush() sys.stdout.flush()
cmd.clear() cmd.clear()
def hide_cursor():
sys.stdout.write("\x1b[?25l") def hide_cursor() -> None:
'''Tell the terminal to hide the cursor.'''
_write_stdout("\x1b[?25l")
def show_cursor() -> None:
'''Tell the terminal to show the cursor.'''
_write_stdout("\x1b[?25h")
def set_position(y: int, x: int) -> None:
'''Set the cursor position to y, x'''
_write_stdout(f"\x1b[{y};{x}H")
def _write_stdout(cmd: str) -> None:
sys.stdout.write(cmd)
sys.stdout.flush() sys.stdout.flush()
def show_cursor(): def _query_terminal(escape: str, endchar: str) -> str:
sys.stdout.write("\x1b[?25h") '''
sys.stdout.flush() Send `escape` to the terminal, read the response until
`endchar`, return response (including `endchar`)
def set_position(y, x): '''
sys.stdout.write(f"\x1b[{y};{x}H")
sys.stdout.flush()
def get_position():
# Save the current terminal settings # Save the current terminal settings
fd = sys.stdin.fileno() fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd) old_settings = termios.tcgetattr(fd)
@ -55,51 +96,61 @@ def get_position():
try: try:
# Set terminal to raw mode # Set terminal to raw mode
tty.setraw(fd) tty.setraw(fd)
_write_stdout(escape)
# Send the ESC[6n command to request cursor position
sys.stdout.write("\x1b[6n")
sys.stdout.flush()
# Read the response: ESC [ row ; col R response = ""
response = ''
while True: while True:
ch = sys.stdin.read(1) ch = sys.stdin.read(1)
response += ch response += ch
if ch == 'R': if ch == endchar:
break break
finally: finally:
# Restore the terminal settings # Restore the terminal settings
termios.tcsetattr(fd, termios.TCSANOW, old_settings) termios.tcsetattr(fd, termios.TCSANOW, old_settings)
return response
# Parse the response using regex def get_position() -> tuple[int, int]:
match = re.search(r'\[(\d+);(\d+)R', response) '''Get the (y, x) position of the cursor'''
reply = _query_terminal("\x1b[6n", "R")
match = re.search(r"\[(\d+);(\d+)R", reply)
if match: if match:
y, x = map(int, match.groups()) y, x = map(int, match.groups())
return y, x return y, x
else: else:
raise ValueError("Failed to parse cursor position response") raise ValueError("Failed to parse cursor position response")
# sys.stdout.write("\x1b[6n")
# sys.stdout.flush()
# response = ''
# while True:
# ch = sys.stdin.read(1)
# response += ch
# if ch == 'R':
# break
# match = re.search(r'\[(\d+);(\d+)R', response)
# if match:
# y, x = map(int, match.groups())
# return y, x
if __name__ == '__main__': if __name__ == "__main__":
i = Image.new("RGB", (100, 100), (0, 0, 0)) import pyvista as pv
d = ImageDraw.Draw(i)
d.ellipse([(5, 5), (95, 95)])
d.ellipse([(10, 10), (90, 90)])
d.line(((50, 0), (50, 100)))
d.line(((0, 50), (100, 50)))
draw_to_terminal(i) cols, rows = get_terminal_size()
v_pix, h_pix = get_terminal_cell_size()
height, width = get_terminal_size_pixel()
y, x = get_position()
print(f'Terminal has {rows} rows, {cols} cols = {rows * cols} cells')
print(f'Cell size: {h_pix}x{v_pix} px')
print(f'Dimensions: {width}x{height} px')
new_lines = int((height/v_pix)-6)
print('\n' * new_lines, end='')
set_position(y - new_lines - 6, x)
s = pv.Sphere()
pl = pv.Plotter(off_screen=True)
pl.add_mesh(s)
b = BytesIO()
pl.screenshot(b, transparent_background=True, window_size=(width, height))
# i = Image.new("RGB", (100, 100), (0, 0, 0))
# d = ImageDraw.Draw(i)
# d.ellipse([(5, 5), (95, 95)])
# d.ellipse([(10, 10), (90, 90)])
# d.line(((50, 0), (50, 100)))
# d.line(((0, 50), (100, 50)))
draw_to_terminal(b)