Compare commits

...

4 Commits

3 changed files with 150 additions and 52 deletions

View File

@ -7,20 +7,16 @@ import numpy as np
import math
import subprocess
import re
import requests
import argparse
import asyncio
import os
import IP2Location
from datetime import timezone, datetime
from terminalplotter import TerminalPlotter
# TODO: Color arches based on latency
# DONE: Color arches based on latency
# TODO: Text info (num hops etc.)
# TODO: Mouse support
# DONE: Interactive globe (spin w/ keys)
# DONE: Image spacing
# DONE: Async rendering?
EXAMPLE_ROUTE = [
(47.996, 7.849), # freiburg
@ -32,8 +28,8 @@ EXAMPLE_ROUTE = [
]
# Convert lat/lon to Cartesian coordinates
def latlon_to_xyz(lat: float, lon: float, radius=1.0):
'''Convert lat/lon to Cartesian coordinates'''
lat_rad = np.radians(lat)
lon_rad = np.radians(lon)
x = radius * np.cos(lat_rad) * np.cos(lon_rad)
@ -70,6 +66,7 @@ def generate_arch(p1, p2, height_factor=0.2, n_points=100):
def traceroute(target: str):
'''Run traceroute and look up the coordinates in geo database'''
database = IP2Location.IP2Location("IP2LOCATION-LITE-DB5.BIN", "SHARED_MEMORY")
# Run traceroute command
@ -157,7 +154,7 @@ async def main():
width, height = args.size
else:
height, width = await kitty.get_terminal_size_pixel()
plotter = TerminalPlotter(width, height)
plotter = TerminalPlotter(width, height, lighting='none')
plotter.add_mesh(
globe, color="tan", smooth_shading=False, texture=tex, show_edges=False
)
@ -185,6 +182,12 @@ async def main():
color = "red"
plotter.add_mesh(line, color=color, line_width=2)
now = datetime.now(timezone.utc)
sun_angle = (now.hour - 12) * (360/24)
light = pv.Light()
light.set_direction_angle(0.55864, sun_angle)
plotter.add_light(light)
kitty.hide_cursor()
try:
await plotter.run()

146
kitty.py Executable file → Normal file
View File

@ -5,14 +5,103 @@ import sys
import termios
import tty
from base64 import standard_b64encode
from io import BytesIO
import array
import fcntl
import zlib
async def draw_to_terminal(buffer: BytesIO) -> None:
"""Display a PNG image in the terminal."""
await _write_chunked(a="T", i=1, f=100, q=2, data=buffer)
import mmap
import os
import select
from enum import Enum
SHM_NAME = "/kitty-shm-frame"
class Kitty_Format(Enum):
RGB = 24
RGBA = 32
PNG = 100
async def draw_to_terminal(
buffer,
width: int | None,
height: int | None,
pixel_format: Kitty_Format = Kitty_Format.RGBA,
image_num: int = 1,
compress=False,
use_shm: bool = False,
) -> None:
"""Display an image in the terminal."""
if (pixel_format != Kitty_Format.PNG or use_shm) and (width is None or height is None):
raise ValueError(
"shm transfer or using image formats other than PNG and require height and width"
)
kwargs = {"a": "T", "i": image_num, "f": pixel_format.value, "t": "s" if use_shm else "d", "q": 2}
if width:
kwargs["s"] = width
if height:
kwargs["v"] = height
if compress:
kwargs["o"] = "z"
data = zlib.compress(buffer)
else:
data = buffer
if use_shm:
# write to shm
_mmap(data, width, height)
# set shm name as payload
data = SHM_NAME
await _write_chunked(**kwargs, data=data)
await asyncio.sleep(0.001)
def _mmap(np_image, width: int, height: int) -> None:
"""Write image data to shared memory"""
shm_size = width * height * 4 # RGBA
fd = os.open(f"/dev/shm{SHM_NAME}", os.O_CREAT | os.O_RDWR)
os.ftruncate(fd, shm_size)
shm = mmap.mmap(fd, shm_size, mmap.MAP_SHARED, mmap.PROT_WRITE)
# write to shared memory
shm.seek(0)
shm.write(np_image.tobytes())
shm.flush()
def supports_kitty_graphics(timeout=0.1) -> bool:
"""Check if the terminal has support for the graphics protocol."""
if not sys.stdout.isatty():
return False
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
# Set terminal to raw mode
tty.setraw(fd)
# Send Kitty graphics query escape code
sys.stdout.write("\033_Gs=1,v=1,a=q,t=d,f=24;AAAA\033\\")
sys.stdout.flush()
# Wait for response with timeout
rlist, _, _ = select.select([fd], [], [], timeout)
if not rlist:
return False
# Read response
response = os.read(fd, 1024).decode("utf-8", "ignore")
return response.startswith("\033_GOK")
finally:
# Restore terminal settings
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
async def get_terminal_cell_size() -> tuple[int, int]:
@ -60,14 +149,16 @@ def _serialize_gr_command(**cmd) -> bytes:
async def _write_chunked(**cmd) -> None:
data = standard_b64encode(cmd.pop("data"))
loop = asyncio.get_running_loop()
if cmd["t"] == "s":
data = standard_b64encode(bytes(cmd.pop("data"), "utf-8"))
else:
data = standard_b64encode(cmd.pop("data"))
while data:
chunk, data = data[:4096], data[4096:]
m = 1 if data else 0
payload = _serialize_gr_command(payload=chunk, m=m, **cmd)
await loop.run_in_executor(None, sys.stdout.buffer.write, payload)
await loop.run_in_executor(None, sys.stdout.flush)
sys.stdout.buffer.write(payload)
sys.stdout.flush()
cmd.clear()
@ -86,7 +177,19 @@ def set_position(y: int, x: int) -> None:
_write_stdout(f"\x1b[{y};{x}H")
async def get_position() -> tuple[int, int]:
"""Get the (y, x) position of the cursor"""
reply = await _query_terminal("\x1b[6n", "R")
match = re.search(r"\[(\d+);(\d+)R", reply)
if match:
y, x = map(int, match.groups())
return y, x
raise ValueError("Failed to parse cursor position response")
def _write_stdout(cmd: str) -> None:
"""Write a command string to stdout and flush."""
sys.stdout.write(cmd)
sys.stdout.flush()
@ -120,43 +223,18 @@ async def _query_terminal(escape: str, endchar: str) -> str:
return response
async def get_position() -> tuple[int, int]:
"""Get the (y, x) position of the cursor"""
reply = await _query_terminal("\x1b[6n", "R")
match = re.search(r"\[(\d+);(\d+)R", reply)
if match:
y, x = map(int, match.groups())
return y, x
raise ValueError("Failed to parse cursor position response")
async def main():
import pyvista as pv
rows, cols = get_terminal_size()
v_pix, h_pix = await get_terminal_cell_size()
height, width = await get_terminal_size_pixel()
y, x = await get_position()
y, x = await get_position()
print(f"Terminal has {rows} rows, {cols} cols = {rows * cols} cells")
print(f"Cell size: {h_pix}x{v_pix} px")
print(f"Dimensions: {width}x{height} px")
print("Cursor is at y, x:", y, x)
print(f"Supports graphics: {supports_kitty_graphics()}")
new_lines = int((height / v_pix) - 6)
print("\n" * new_lines, end="")
set_position(y - new_lines - 6, x)
s = pv.Sphere()
pl = pv.Plotter(off_screen=True)
pl.add_mesh(s)
b = BytesIO()
pl.screenshot(b, transparent_background=True, window_size=(width, height))
await draw_to_terminal(b)
if __name__ == "__main__":
asyncio.run(main())

View File

@ -6,14 +6,12 @@ import termios
import tty
import vtk
from vtk.util import numpy_support
import numpy as np
from io import BytesIO
from PIL import Image
import pyvista as pv
import kitty
import numpy as np
class TerminalPlotter(pv.Plotter):
def __init__(self, width, height, **kwargs):
@ -24,11 +22,9 @@ class TerminalPlotter(pv.Plotter):
# setup vtk rendering chain
self.w2i_filter = vtk.vtkWindowToImageFilter()
self.w2i_filter.SetInputBufferTypeToRGBA()
self.w2i_filter.ReadFrontBufferOff()
#self.w2i_filter.ReadFrontBufferOff()
self.w2i_filter.ReadFrontBufferOn()
self.w2i_filter.SetInput(self.ren_win)
self.writer = vtk.vtkPNGWriter()
self.writer.WriteToMemoryOn()
self.writer.SetInputConnection(self.w2i_filter.GetOutputPort())
# enable transparency
self.set_background([0.0, 0.0, 0.0])
self.ren_win.SetAlphaBitPlanes(1)
@ -83,14 +79,25 @@ class TerminalPlotter(pv.Plotter):
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
async def _render_loop(self):
import time
while self._running:
start = time.perf_counter()
# keep renedring the scene
self.ren_win.Render()
# Update the filter to grab the current buffer
self.w2i_filter.Modified()
self.w2i_filter.Update()
self.writer.Write()
await kitty.draw_to_terminal(self.writer.GetResult())
vtk_image = self.w2i_filter.GetOutput()
width, height, _ = vtk_image.GetDimensions()
vtk_array = vtk_image.GetPointData().GetScalars()
np_image = numpy_support.vtk_to_numpy(vtk_array).reshape(height, width, 4)
np_image = np_image[::-1] # Flip vertically
np_image = np.ascontiguousarray(np_image) # Ensure memory layout is C-contiguous
print("Render & copy:", time.perf_counter() - start)
#await kitty.draw_to_terminal(np_image, width, height, compress=True)
await kitty.draw_to_terminal(np_image, width, height, use_shm=True)
print("Draw:", time.perf_counter() - start)
kitty.set_position(self.start_y, self.start_x)
self.camera.Azimuth(1)
@ -102,4 +109,14 @@ class TerminalPlotter(pv.Plotter):
asyncio.create_task(self._render_loop()),
]
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
self._running = False
self._running = False
async def main():
plotter = TerminalPlotter(1000, 1000)
mesh = pv.Sphere()
plotter.add_mesh(mesh)
await plotter.run()
if __name__ == '__main__':
import cProfile
cProfile.run("asyncio.run(main())", "profile")