Compare commits

..

No commits in common. "a0376ae9a4328555564ec0dbf20ae106ca219752" and "9bddcfaa7ed68920ac1f2f3494523d7b9e970621" have entirely different histories.

3 changed files with 52 additions and 150 deletions

View File

@ -7,16 +7,20 @@ import numpy as np
import math import math
import subprocess import subprocess
import re import re
import requests
import argparse import argparse
import asyncio import asyncio
import os
import IP2Location import IP2Location
from datetime import timezone, datetime
from terminalplotter import TerminalPlotter from terminalplotter import TerminalPlotter
# DONE: Color arches based on latency # TODO: Color arches based on latency
# TODO: Text info (num hops etc.) # TODO: Text info (num hops etc.)
# TODO: Mouse support # TODO: Mouse support
# DONE: Interactive globe (spin w/ keys)
# DONE: Image spacing
# DONE: Async rendering?
EXAMPLE_ROUTE = [ EXAMPLE_ROUTE = [
(47.996, 7.849), # freiburg (47.996, 7.849), # freiburg
@ -28,8 +32,8 @@ EXAMPLE_ROUTE = [
] ]
# Convert lat/lon to Cartesian coordinates
def latlon_to_xyz(lat: float, lon: float, radius=1.0): def latlon_to_xyz(lat: float, lon: float, radius=1.0):
'''Convert lat/lon to Cartesian coordinates'''
lat_rad = np.radians(lat) lat_rad = np.radians(lat)
lon_rad = np.radians(lon) lon_rad = np.radians(lon)
x = radius * np.cos(lat_rad) * np.cos(lon_rad) x = radius * np.cos(lat_rad) * np.cos(lon_rad)
@ -66,7 +70,6 @@ def generate_arch(p1, p2, height_factor=0.2, n_points=100):
def traceroute(target: str): def traceroute(target: str):
'''Run traceroute and look up the coordinates in geo database'''
database = IP2Location.IP2Location("IP2LOCATION-LITE-DB5.BIN", "SHARED_MEMORY") database = IP2Location.IP2Location("IP2LOCATION-LITE-DB5.BIN", "SHARED_MEMORY")
# Run traceroute command # Run traceroute command
@ -154,7 +157,7 @@ async def main():
width, height = args.size width, height = args.size
else: else:
height, width = await kitty.get_terminal_size_pixel() height, width = await kitty.get_terminal_size_pixel()
plotter = TerminalPlotter(width, height, lighting='none') plotter = TerminalPlotter(width, height)
plotter.add_mesh( plotter.add_mesh(
globe, color="tan", smooth_shading=False, texture=tex, show_edges=False globe, color="tan", smooth_shading=False, texture=tex, show_edges=False
) )
@ -182,12 +185,6 @@ async def main():
color = "red" color = "red"
plotter.add_mesh(line, color=color, line_width=2) plotter.add_mesh(line, color=color, line_width=2)
now = datetime.now(timezone.utc)
sun_angle = (now.hour - 12) * (360/24)
light = pv.Light()
light.set_direction_angle(0.55864, sun_angle)
plotter.add_light(light)
kitty.hide_cursor() kitty.hide_cursor()
try: try:
await plotter.run() await plotter.run()

146
kitty.py Normal file → Executable file
View File

@ -5,103 +5,14 @@ import sys
import termios import termios
import tty import tty
from base64 import standard_b64encode from base64 import standard_b64encode
from io import BytesIO
import array import array
import fcntl import fcntl
import zlib
async def draw_to_terminal(buffer: BytesIO) -> None:
import mmap """Display a PNG image in the terminal."""
import os await _write_chunked(a="T", i=1, f=100, q=2, data=buffer)
import select
from enum import Enum
SHM_NAME = "/kitty-shm-frame"
class Kitty_Format(Enum):
RGB = 24
RGBA = 32
PNG = 100
async def draw_to_terminal(
buffer,
width: int | None,
height: int | None,
pixel_format: Kitty_Format = Kitty_Format.RGBA,
image_num: int = 1,
compress=False,
use_shm: bool = False,
) -> None:
"""Display an image in the terminal."""
if (pixel_format != Kitty_Format.PNG or use_shm) and (width is None or height is None):
raise ValueError(
"shm transfer or using image formats other than PNG and require height and width"
)
kwargs = {"a": "T", "i": image_num, "f": pixel_format.value, "t": "s" if use_shm else "d", "q": 2}
if width:
kwargs["s"] = width
if height:
kwargs["v"] = height
if compress:
kwargs["o"] = "z"
data = zlib.compress(buffer)
else:
data = buffer
if use_shm:
# write to shm
_mmap(data, width, height)
# set shm name as payload
data = SHM_NAME
await _write_chunked(**kwargs, data=data)
await asyncio.sleep(0.001)
def _mmap(np_image, width: int, height: int) -> None:
"""Write image data to shared memory"""
shm_size = width * height * 4 # RGBA
fd = os.open(f"/dev/shm{SHM_NAME}", os.O_CREAT | os.O_RDWR)
os.ftruncate(fd, shm_size)
shm = mmap.mmap(fd, shm_size, mmap.MAP_SHARED, mmap.PROT_WRITE)
# write to shared memory
shm.seek(0)
shm.write(np_image.tobytes())
shm.flush()
def supports_kitty_graphics(timeout=0.1) -> bool:
"""Check if the terminal has support for the graphics protocol."""
if not sys.stdout.isatty():
return False
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
# Set terminal to raw mode
tty.setraw(fd)
# Send Kitty graphics query escape code
sys.stdout.write("\033_Gs=1,v=1,a=q,t=d,f=24;AAAA\033\\")
sys.stdout.flush()
# Wait for response with timeout
rlist, _, _ = select.select([fd], [], [], timeout)
if not rlist:
return False
# Read response
response = os.read(fd, 1024).decode("utf-8", "ignore")
return response.startswith("\033_GOK")
finally:
# Restore terminal settings
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
async def get_terminal_cell_size() -> tuple[int, int]: async def get_terminal_cell_size() -> tuple[int, int]:
@ -149,16 +60,14 @@ def _serialize_gr_command(**cmd) -> bytes:
async def _write_chunked(**cmd) -> None: async def _write_chunked(**cmd) -> None:
if cmd["t"] == "s": data = standard_b64encode(cmd.pop("data"))
data = standard_b64encode(bytes(cmd.pop("data"), "utf-8")) loop = asyncio.get_running_loop()
else:
data = standard_b64encode(cmd.pop("data"))
while data: while data:
chunk, data = data[:4096], data[4096:] chunk, data = data[:4096], data[4096:]
m = 1 if data else 0 m = 1 if data else 0
payload = _serialize_gr_command(payload=chunk, m=m, **cmd) payload = _serialize_gr_command(payload=chunk, m=m, **cmd)
sys.stdout.buffer.write(payload) await loop.run_in_executor(None, sys.stdout.buffer.write, payload)
sys.stdout.flush() await loop.run_in_executor(None, sys.stdout.flush)
cmd.clear() cmd.clear()
@ -177,19 +86,7 @@ def set_position(y: int, x: int) -> None:
_write_stdout(f"\x1b[{y};{x}H") _write_stdout(f"\x1b[{y};{x}H")
async def get_position() -> tuple[int, int]:
"""Get the (y, x) position of the cursor"""
reply = await _query_terminal("\x1b[6n", "R")
match = re.search(r"\[(\d+);(\d+)R", reply)
if match:
y, x = map(int, match.groups())
return y, x
raise ValueError("Failed to parse cursor position response")
def _write_stdout(cmd: str) -> None: def _write_stdout(cmd: str) -> None:
"""Write a command string to stdout and flush."""
sys.stdout.write(cmd) sys.stdout.write(cmd)
sys.stdout.flush() sys.stdout.flush()
@ -223,18 +120,43 @@ async def _query_terminal(escape: str, endchar: str) -> str:
return response return response
async def get_position() -> tuple[int, int]:
"""Get the (y, x) position of the cursor"""
reply = await _query_terminal("\x1b[6n", "R")
match = re.search(r"\[(\d+);(\d+)R", reply)
if match:
y, x = map(int, match.groups())
return y, x
raise ValueError("Failed to parse cursor position response")
async def main(): async def main():
import pyvista as pv
rows, cols = get_terminal_size() rows, cols = get_terminal_size()
v_pix, h_pix = await get_terminal_cell_size() v_pix, h_pix = await get_terminal_cell_size()
height, width = await get_terminal_size_pixel() height, width = await get_terminal_size_pixel()
y, x = await get_position() y, x = await get_position()
y, x = await get_position()
print(f"Terminal has {rows} rows, {cols} cols = {rows * cols} cells") print(f"Terminal has {rows} rows, {cols} cols = {rows * cols} cells")
print(f"Cell size: {h_pix}x{v_pix} px") print(f"Cell size: {h_pix}x{v_pix} px")
print(f"Dimensions: {width}x{height} px") print(f"Dimensions: {width}x{height} px")
print("Cursor is at y, x:", y, x) print("Cursor is at y, x:", y, x)
print(f"Supports graphics: {supports_kitty_graphics()}")
new_lines = int((height / v_pix) - 6)
print("\n" * new_lines, end="")
set_position(y - new_lines - 6, x)
s = pv.Sphere()
pl = pv.Plotter(off_screen=True)
pl.add_mesh(s)
b = BytesIO()
pl.screenshot(b, transparent_background=True, window_size=(width, height))
await draw_to_terminal(b)
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(main()) asyncio.run(main())

View File

@ -6,12 +6,14 @@ import termios
import tty import tty
import vtk import vtk
from vtk.util import numpy_support from vtk.util import numpy_support
import numpy as np
from io import BytesIO
from PIL import Image
import pyvista as pv import pyvista as pv
import kitty import kitty
import numpy as np
class TerminalPlotter(pv.Plotter): class TerminalPlotter(pv.Plotter):
def __init__(self, width, height, **kwargs): def __init__(self, width, height, **kwargs):
@ -22,9 +24,11 @@ class TerminalPlotter(pv.Plotter):
# setup vtk rendering chain # setup vtk rendering chain
self.w2i_filter = vtk.vtkWindowToImageFilter() self.w2i_filter = vtk.vtkWindowToImageFilter()
self.w2i_filter.SetInputBufferTypeToRGBA() self.w2i_filter.SetInputBufferTypeToRGBA()
#self.w2i_filter.ReadFrontBufferOff() self.w2i_filter.ReadFrontBufferOff()
self.w2i_filter.ReadFrontBufferOn()
self.w2i_filter.SetInput(self.ren_win) self.w2i_filter.SetInput(self.ren_win)
self.writer = vtk.vtkPNGWriter()
self.writer.WriteToMemoryOn()
self.writer.SetInputConnection(self.w2i_filter.GetOutputPort())
# enable transparency # enable transparency
self.set_background([0.0, 0.0, 0.0]) self.set_background([0.0, 0.0, 0.0])
self.ren_win.SetAlphaBitPlanes(1) self.ren_win.SetAlphaBitPlanes(1)
@ -79,25 +83,14 @@ class TerminalPlotter(pv.Plotter):
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
async def _render_loop(self): async def _render_loop(self):
import time
while self._running: while self._running:
start = time.perf_counter()
# keep renedring the scene # keep renedring the scene
self.ren_win.Render() self.ren_win.Render()
# Update the filter to grab the current buffer # Update the filter to grab the current buffer
self.w2i_filter.Modified() self.w2i_filter.Modified()
self.w2i_filter.Update() self.w2i_filter.Update()
vtk_image = self.w2i_filter.GetOutput() self.writer.Write()
width, height, _ = vtk_image.GetDimensions() await kitty.draw_to_terminal(self.writer.GetResult())
vtk_array = vtk_image.GetPointData().GetScalars()
np_image = numpy_support.vtk_to_numpy(vtk_array).reshape(height, width, 4)
np_image = np_image[::-1] # Flip vertically
np_image = np.ascontiguousarray(np_image) # Ensure memory layout is C-contiguous
print("Render & copy:", time.perf_counter() - start)
#await kitty.draw_to_terminal(np_image, width, height, compress=True)
await kitty.draw_to_terminal(np_image, width, height, use_shm=True)
print("Draw:", time.perf_counter() - start)
kitty.set_position(self.start_y, self.start_x) kitty.set_position(self.start_y, self.start_x)
self.camera.Azimuth(1) self.camera.Azimuth(1)
@ -109,14 +102,4 @@ class TerminalPlotter(pv.Plotter):
asyncio.create_task(self._render_loop()), asyncio.create_task(self._render_loop()),
] ]
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
self._running = False self._running = False
async def main():
plotter = TerminalPlotter(1000, 1000)
mesh = pv.Sphere()
plotter.add_mesh(mesh)
await plotter.run()
if __name__ == '__main__':
import cProfile
cProfile.run("asyncio.run(main())", "profile")