Compare commits
8 Commits
770f9c077c
...
70d2029a9f
Author | SHA1 | Date | |
---|---|---|---|
|
70d2029a9f | ||
|
868820a955 | ||
|
fd3dff1ebb | ||
|
2d22c99c92 | ||
|
2a9ab40739 | ||
|
51a30d5147 | ||
|
037ce01efd | ||
|
4e6ff22dba |
113
kglobe.py
113
kglobe.py
@ -1,8 +1,6 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
from kitty import draw_to_terminal, get_position, set_position, hide_cursor, show_cursor #, draw_animation
|
import kitty
|
||||||
|
|
||||||
from PIL import Image
|
|
||||||
|
|
||||||
import pyvista as pv
|
import pyvista as pv
|
||||||
import numpy as np
|
import numpy as np
|
||||||
@ -11,9 +9,17 @@ import subprocess
|
|||||||
import re
|
import re
|
||||||
import requests
|
import requests
|
||||||
import argparse
|
import argparse
|
||||||
|
from io import BytesIO
|
||||||
|
|
||||||
|
# TODO: Color arches based on latency
|
||||||
|
# TODO: Interactive globe (spin w/ keys)
|
||||||
|
# TODO: Text info (num hops etc.)
|
||||||
|
# TODO: Image spacing
|
||||||
|
# TODO: Async rendering?
|
||||||
|
|
||||||
|
|
||||||
# Convert lat/lon to Cartesian coordinates
|
# Convert lat/lon to Cartesian coordinates
|
||||||
def latlon_to_xyz(lat, lon, radius=1.0):
|
def latlon_to_xyz(lat: float, lon: float, radius=1.0):
|
||||||
lat_rad = np.radians(lat)
|
lat_rad = np.radians(lat)
|
||||||
lon_rad = np.radians(lon)
|
lon_rad = np.radians(lon)
|
||||||
x = radius * np.cos(lat_rad) * np.cos(lon_rad)
|
x = radius * np.cos(lat_rad) * np.cos(lon_rad)
|
||||||
@ -21,6 +27,7 @@ def latlon_to_xyz(lat, lon, radius=1.0):
|
|||||||
z = radius * np.sin(lat_rad)
|
z = radius * np.sin(lat_rad)
|
||||||
return np.array([x, y, z])
|
return np.array([x, y, z])
|
||||||
|
|
||||||
|
|
||||||
# Create an arch between two 3D points
|
# Create an arch between two 3D points
|
||||||
def generate_arch(p1, p2, height_factor=0.2, n_points=100):
|
def generate_arch(p1, p2, height_factor=0.2, n_points=100):
|
||||||
# Normalize input points to lie on the unit sphere
|
# Normalize input points to lie on the unit sphere
|
||||||
@ -34,10 +41,12 @@ def generate_arch(p1, p2, height_factor=0.2, n_points=100):
|
|||||||
|
|
||||||
t = np.linspace(0, 1, n_points)
|
t = np.linspace(0, 1, n_points)
|
||||||
sin_omega = np.sin(omega)
|
sin_omega = np.sin(omega)
|
||||||
|
|
||||||
# Spherical linear interpolation (slerp)
|
# Spherical linear interpolation (slerp)
|
||||||
arch_points = (np.sin((1 - t)[:, None] * omega) * p1[None, :] +
|
arch_points = (
|
||||||
np.sin(t[:, None] * omega) * p2[None, :]) / sin_omega
|
np.sin((1 - t)[:, None] * omega) * p1[None, :]
|
||||||
|
+ np.sin(t[:, None] * omega) * p2[None, :]
|
||||||
|
) / sin_omega
|
||||||
|
|
||||||
# Add radial height offset based on sine curve
|
# Add radial height offset based on sine curve
|
||||||
heights = 1 + np.sin(np.pi * t) * height_factor
|
heights = 1 + np.sin(np.pi * t) * height_factor
|
||||||
@ -45,76 +54,100 @@ def generate_arch(p1, p2, height_factor=0.2, n_points=100):
|
|||||||
|
|
||||||
return arch_points
|
return arch_points
|
||||||
|
|
||||||
def traceroute(target):
|
|
||||||
# Run traceroute command
|
|
||||||
result = subprocess.run(['traceroute', '-n', target, '-q', '1', '-w', '1,3,10'], capture_output=True, text=True)
|
|
||||||
hops = re.findall(r'\n\s*\d+\s+([\d.]+)', result.stdout)
|
|
||||||
|
|
||||||
coords = []
|
def traceroute(target: str) -> list[tuple[int, int]]:
|
||||||
|
# Run traceroute command
|
||||||
|
result = subprocess.run(
|
||||||
|
["traceroute", "-n", target, "-q", "1", "-w", "1,3,10"],
|
||||||
|
capture_output=True,
|
||||||
|
text=True,
|
||||||
|
)
|
||||||
|
hops: list[str] = re.findall(r"\n\s*\d+\s+([\d.]+)", result.stdout)
|
||||||
|
|
||||||
|
coords: list[tuple[int, int]] = []
|
||||||
for ip in hops:
|
for ip in hops:
|
||||||
try:
|
try:
|
||||||
response = requests.get(f"http://ip-api.com/json/{ip}").json()
|
response: dict = requests.get(f"http://ip-api.com/json/{ip}").json()
|
||||||
if response['status'] == 'success':
|
if response["status"] == "success":
|
||||||
coords.append((response['lat'], response['lon']))
|
coords.append((response["lat"], response["lon"]))
|
||||||
except Exception:
|
except Exception:
|
||||||
continue
|
continue
|
||||||
return coords
|
return coords
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
|
|
||||||
parser = argparse.ArgumentParser(
|
parser = argparse.ArgumentParser(
|
||||||
prog='kglobe',
|
prog="kglobe",
|
||||||
description='Traceroute on a globe',
|
description="Traceroute on a globe",
|
||||||
epilog='Requires kitty graphics protocol support in terminal')
|
epilog="Requires kitty graphics protocol support in terminal",
|
||||||
|
)
|
||||||
|
|
||||||
parser.add_argument('target')
|
parser.add_argument("-t", "--traceroute", default=None)
|
||||||
parser.add_argument('-e', '--external', action='store_true')
|
parser.add_argument("-e", "--external", action="store_true")
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
locations = traceroute(args.target)
|
locations = []
|
||||||
|
if args.traceroute:
|
||||||
|
locations = traceroute(args.traceroute)
|
||||||
|
|
||||||
globe = pv.Sphere(radius=1.0, theta_resolution=120, phi_resolution=120,
|
globe = pv.Sphere(
|
||||||
start_theta=270.001, end_theta=270)
|
radius=1.0,
|
||||||
|
theta_resolution=120,
|
||||||
|
phi_resolution=120,
|
||||||
|
start_theta=270.001,
|
||||||
|
end_theta=270,
|
||||||
|
)
|
||||||
|
|
||||||
tex = pv.examples.load_globe_texture()
|
tex = pv.examples.load_globe_texture()
|
||||||
|
|
||||||
globe.active_texture_coordinates = np.zeros((globe.points.shape[0], 2))
|
globe.active_texture_coordinates = np.zeros((globe.points.shape[0], 2))
|
||||||
|
|
||||||
globe.active_texture_coordinates[:, 0] = 0.5 + np.arctan2(globe.points[:, 1], globe.points[:, 0]) / (2 * math.pi)
|
globe.active_texture_coordinates[:, 0] = 0.5 + np.arctan2(
|
||||||
globe.active_texture_coordinates[:, 1] = 0.5 + np.arcsin(globe.points[:, 2]) / math.pi
|
globe.points[:, 1], globe.points[:, 0]
|
||||||
|
) / (2 * math.pi)
|
||||||
|
globe.active_texture_coordinates[:, 1] = (
|
||||||
|
0.5 + np.arcsin(globe.points[:, 2]) / math.pi
|
||||||
|
)
|
||||||
|
|
||||||
# Convert to 3D coordinates
|
# Convert to 3D coordinates
|
||||||
points_3d = [latlon_to_xyz(lat, lon) for lat, lon in locations]
|
points_3d = [latlon_to_xyz(lat, lon) for lat, lon in locations]
|
||||||
|
|
||||||
pl=pv.Plotter(off_screen=(not args.external))
|
pl: pv.Plotter = pv.Plotter(off_screen=(not args.external))
|
||||||
pl.add_mesh(globe, color='tan', smooth_shading=True, texture=tex, show_edges=False)
|
pl.add_mesh(globe, color="tan", smooth_shading=True, texture=tex, show_edges=False)
|
||||||
|
|
||||||
for pt in points_3d:
|
for pt in points_3d:
|
||||||
city_marker = pv.Sphere(center=pt, radius=0.02)
|
city_marker = pv.Sphere(center=pt, radius=0.02)
|
||||||
pl.add_mesh(city_marker, color='blue')
|
pl.add_mesh(city_marker, color="blue")
|
||||||
for i in range(len(points_3d[:-1])):
|
for i in range(len(points_3d[:-1])):
|
||||||
arch = generate_arch(points_3d[i], points_3d[i+1], height_factor=0.2)
|
arch = generate_arch(points_3d[i], points_3d[i + 1], height_factor=0.2)
|
||||||
line = pv.lines_from_points(arch, close=False)
|
line = pv.lines_from_points(arch, close=False)
|
||||||
pl.add_mesh(line, color='red', line_width=2)
|
pl.add_mesh(line, color="red", line_width=2)
|
||||||
|
|
||||||
hide_cursor()
|
kitty.hide_cursor()
|
||||||
y, x = get_position()
|
y, x = kitty.get_position()
|
||||||
print('\n' * 25, end='')
|
|
||||||
|
height, width = kitty.get_terminal_size_pixel()
|
||||||
|
h_pix, w_pix = kitty.get_terminal_cell_size()
|
||||||
|
|
||||||
|
# the image requires height/cell_height lines
|
||||||
|
needed_lines = round(height/h_pix)
|
||||||
|
print("\n" * needed_lines, end="")
|
||||||
|
|
||||||
frames = []
|
|
||||||
try:
|
try:
|
||||||
if not args.external:
|
if not args.external:
|
||||||
while True:
|
while True:
|
||||||
pl.camera.Azimuth(1)
|
pl.camera.Azimuth(1)
|
||||||
image = pl.screenshot(transparent_background=True, window_size=(512, 512))
|
buf: BytesIO = BytesIO()
|
||||||
frames.append(Image.fromarray(image))
|
pl.screenshot(buf, transparent_background=True, window_size=(height, width))
|
||||||
set_position(y-25, x)
|
kitty.set_position(y - needed_lines, 0)
|
||||||
draw_to_terminal(Image.fromarray(image))
|
kitty.draw_to_terminal(buf)
|
||||||
else:
|
else:
|
||||||
pl.show()
|
pl.show()
|
||||||
finally:
|
finally:
|
||||||
show_cursor()
|
kitty.show_cursor()
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
165
kitty.py
165
kitty.py
@ -5,49 +5,90 @@ import termios
|
|||||||
import tty
|
import tty
|
||||||
from base64 import standard_b64encode
|
from base64 import standard_b64encode
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
|
import array
|
||||||
|
import fcntl
|
||||||
|
|
||||||
from PIL import Image, ImageDraw
|
|
||||||
|
|
||||||
def draw_to_terminal(img: Image.Image) -> None:
|
def draw_to_terminal(buffer: BytesIO) -> None:
|
||||||
buffer: BytesIO = BytesIO()
|
'''Display a PNG image in the terminal.'''
|
||||||
img.save(buffer, format='PNG')
|
write_chunked(a="T", i=1, f=100, q=2, data=buffer.getvalue())
|
||||||
write_chunked(a='T', i=1, f=100, q=2, data=buffer.getvalue())
|
|
||||||
|
|
||||||
def serialize_gr_command(**cmd):
|
def get_terminal_cell_size() -> tuple[int, int]:
|
||||||
payload = cmd.pop('payload', None)
|
'''Get (height, width) of a single cell in px'''
|
||||||
cmd = ','.join(f'{k}={v}' for k, v in cmd.items())
|
reply = _query_terminal("\x1b[16t", "t")
|
||||||
|
|
||||||
|
match = re.search(r"\[6;(\d+);(\d+)t", reply)
|
||||||
|
if match:
|
||||||
|
v_pix, h_pix = map(int, match.groups())
|
||||||
|
return v_pix, h_pix
|
||||||
|
else:
|
||||||
|
print(reply)
|
||||||
|
raise ValueError("Failed to parse terminal cell size response")
|
||||||
|
|
||||||
|
def get_terminal_size_pixel() -> tuple[int, int]:
|
||||||
|
'''Get (height, width) of the terminal in px'''
|
||||||
|
reply = _query_terminal("\x1b[14t", "t")
|
||||||
|
|
||||||
|
match = re.search(r"\[4;(\d+);(\d+)t", reply)
|
||||||
|
if match:
|
||||||
|
height, width = map(int, match.groups())
|
||||||
|
return height, width
|
||||||
|
else:
|
||||||
|
raise ValueError("Failed to parse terminal pixel size response")
|
||||||
|
|
||||||
|
def get_terminal_size() -> tuple[int, int]:
|
||||||
|
'''Get (rows, cols) of the terminal'''
|
||||||
|
buf = array.array('H', [0, 0, 0, 0])
|
||||||
|
fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, buf)
|
||||||
|
rows, cols, width, height = buf
|
||||||
|
return (rows, cols)
|
||||||
|
|
||||||
|
def serialize_gr_command(**cmd) -> bytes:
|
||||||
|
payload = cmd.pop("payload", None)
|
||||||
|
cmd = ",".join(f"{k}={v}" for k, v in cmd.items())
|
||||||
ans = []
|
ans = []
|
||||||
w = ans.append
|
w = ans.append
|
||||||
w(b'\033_G'), w(cmd.encode('ascii'))
|
w(b"\033_G"), w(cmd.encode("ascii"))
|
||||||
if payload:
|
if payload:
|
||||||
w(b';')
|
w(b";")
|
||||||
w(payload)
|
w(payload)
|
||||||
w(b'\033\\')
|
w(b"\033\\")
|
||||||
return b''.join(ans)
|
return b"".join(ans)
|
||||||
|
|
||||||
def write_chunked(**cmd):
|
|
||||||
data = standard_b64encode(cmd.pop('data'))
|
def write_chunked(**cmd) -> None:
|
||||||
|
data = standard_b64encode(cmd.pop("data"))
|
||||||
while data:
|
while data:
|
||||||
chunk, data = data[:4096], data[4096:]
|
chunk, data = data[:4096], data[4096:]
|
||||||
m = 1 if data else 0
|
m = 1 if data else 0
|
||||||
sys.stdout.buffer.write(serialize_gr_command(payload=chunk, m=m,
|
sys.stdout.buffer.write(serialize_gr_command(payload=chunk, m=m, **cmd))
|
||||||
**cmd))
|
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
cmd.clear()
|
cmd.clear()
|
||||||
|
|
||||||
def hide_cursor():
|
|
||||||
sys.stdout.write("\x1b[?25l")
|
def hide_cursor() -> None:
|
||||||
|
'''Tell the terminal to hide the cursor.'''
|
||||||
|
_write_stdout("\x1b[?25l")
|
||||||
|
|
||||||
|
|
||||||
|
def show_cursor() -> None:
|
||||||
|
'''Tell the terminal to show the cursor.'''
|
||||||
|
_write_stdout("\x1b[?25h")
|
||||||
|
|
||||||
|
|
||||||
|
def set_position(y: int, x: int) -> None:
|
||||||
|
'''Set the cursor position to y, x'''
|
||||||
|
_write_stdout(f"\x1b[{y};{x}H")
|
||||||
|
|
||||||
|
def _write_stdout(cmd: str) -> None:
|
||||||
|
sys.stdout.write(cmd)
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
|
|
||||||
def show_cursor():
|
def _query_terminal(escape: str, endchar: str) -> str:
|
||||||
sys.stdout.write("\x1b[?25h")
|
'''
|
||||||
sys.stdout.flush()
|
Send `escape` to the terminal, read the response until
|
||||||
|
`endchar`, return response (including `endchar`)
|
||||||
def set_position(y, x):
|
'''
|
||||||
sys.stdout.write(f"\x1b[{y};{x}H")
|
|
||||||
sys.stdout.flush()
|
|
||||||
|
|
||||||
def get_position():
|
|
||||||
# Save the current terminal settings
|
# Save the current terminal settings
|
||||||
fd = sys.stdin.fileno()
|
fd = sys.stdin.fileno()
|
||||||
old_settings = termios.tcgetattr(fd)
|
old_settings = termios.tcgetattr(fd)
|
||||||
@ -55,51 +96,61 @@ def get_position():
|
|||||||
try:
|
try:
|
||||||
# Set terminal to raw mode
|
# Set terminal to raw mode
|
||||||
tty.setraw(fd)
|
tty.setraw(fd)
|
||||||
|
_write_stdout(escape)
|
||||||
# Send the ESC[6n command to request cursor position
|
|
||||||
sys.stdout.write("\x1b[6n")
|
|
||||||
sys.stdout.flush()
|
|
||||||
|
|
||||||
# Read the response: ESC [ row ; col R
|
response = ""
|
||||||
response = ''
|
|
||||||
while True:
|
while True:
|
||||||
ch = sys.stdin.read(1)
|
ch = sys.stdin.read(1)
|
||||||
response += ch
|
response += ch
|
||||||
if ch == 'R':
|
if ch == endchar:
|
||||||
break
|
break
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# Restore the terminal settings
|
# Restore the terminal settings
|
||||||
termios.tcsetattr(fd, termios.TCSANOW, old_settings)
|
termios.tcsetattr(fd, termios.TCSANOW, old_settings)
|
||||||
|
return response
|
||||||
|
|
||||||
# Parse the response using regex
|
def get_position() -> tuple[int, int]:
|
||||||
match = re.search(r'\[(\d+);(\d+)R', response)
|
'''Get the (y, x) position of the cursor'''
|
||||||
|
reply = _query_terminal("\x1b[6n", "R")
|
||||||
|
|
||||||
|
match = re.search(r"\[(\d+);(\d+)R", reply)
|
||||||
if match:
|
if match:
|
||||||
y, x = map(int, match.groups())
|
y, x = map(int, match.groups())
|
||||||
return y, x
|
return y, x
|
||||||
else:
|
else:
|
||||||
raise ValueError("Failed to parse cursor position response")
|
raise ValueError("Failed to parse cursor position response")
|
||||||
|
|
||||||
|
|
||||||
# sys.stdout.write("\x1b[6n")
|
|
||||||
# sys.stdout.flush()
|
|
||||||
# response = ''
|
|
||||||
# while True:
|
|
||||||
# ch = sys.stdin.read(1)
|
|
||||||
# response += ch
|
|
||||||
# if ch == 'R':
|
|
||||||
# break
|
|
||||||
# match = re.search(r'\[(\d+);(\d+)R', response)
|
|
||||||
# if match:
|
|
||||||
# y, x = map(int, match.groups())
|
|
||||||
# return y, x
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == "__main__":
|
||||||
i = Image.new("RGB", (100, 100), (0, 0, 0))
|
import pyvista as pv
|
||||||
d = ImageDraw.Draw(i)
|
|
||||||
d.ellipse([(5, 5), (95, 95)])
|
|
||||||
d.ellipse([(10, 10), (90, 90)])
|
|
||||||
d.line(((50, 0), (50, 100)))
|
|
||||||
d.line(((0, 50), (100, 50)))
|
|
||||||
|
|
||||||
draw_to_terminal(i)
|
cols, rows = get_terminal_size()
|
||||||
|
v_pix, h_pix = get_terminal_cell_size()
|
||||||
|
height, width = get_terminal_size_pixel()
|
||||||
|
|
||||||
|
y, x = get_position()
|
||||||
|
print(f'Terminal has {rows} rows, {cols} cols = {rows * cols} cells')
|
||||||
|
print(f'Cell size: {h_pix}x{v_pix} px')
|
||||||
|
print(f'Dimensions: {width}x{height} px')
|
||||||
|
|
||||||
|
|
||||||
|
new_lines = int((height/v_pix)-6)
|
||||||
|
print('\n' * new_lines, end='')
|
||||||
|
|
||||||
|
set_position(y - new_lines - 6, x)
|
||||||
|
|
||||||
|
s = pv.Sphere()
|
||||||
|
pl = pv.Plotter(off_screen=True)
|
||||||
|
pl.add_mesh(s)
|
||||||
|
b = BytesIO()
|
||||||
|
pl.screenshot(b, transparent_background=True, window_size=(width, height))
|
||||||
|
|
||||||
|
# i = Image.new("RGB", (100, 100), (0, 0, 0))
|
||||||
|
# d = ImageDraw.Draw(i)
|
||||||
|
# d.ellipse([(5, 5), (95, 95)])
|
||||||
|
# d.ellipse([(10, 10), (90, 90)])
|
||||||
|
# d.line(((50, 0), (50, 100)))
|
||||||
|
# d.line(((0, 50), (100, 50)))
|
||||||
|
|
||||||
|
draw_to_terminal(b)
|
||||||
|
Loading…
Reference in New Issue
Block a user