106 lines
3.4 KiB
Python
106 lines
3.4 KiB
Python
import asyncio
|
|
import math
|
|
import select
|
|
import sys
|
|
import termios
|
|
import tty
|
|
import vtk
|
|
from vtk.util import numpy_support
|
|
import numpy as np
|
|
from io import BytesIO
|
|
from PIL import Image
|
|
|
|
import pyvista as pv
|
|
|
|
import kitty
|
|
|
|
|
|
class TerminalPlotter(pv.Plotter):
|
|
def __init__(self, width, height, **kwargs):
|
|
super().__init__(off_screen=True, window_size=(width, height), **kwargs)
|
|
self.width = width
|
|
self.height = height
|
|
self._running = True
|
|
# setup vtk rendering chain
|
|
self.w2i_filter = vtk.vtkWindowToImageFilter()
|
|
self.w2i_filter.SetInputBufferTypeToRGBA()
|
|
self.w2i_filter.ReadFrontBufferOff()
|
|
self.w2i_filter.SetInput(self.ren_win)
|
|
self.writer = vtk.vtkPNGWriter()
|
|
self.writer.WriteToMemoryOn()
|
|
self.writer.SetInputConnection(self.w2i_filter.GetOutputPort())
|
|
# enable transparency
|
|
self.set_background([0.0, 0.0, 0.0])
|
|
self.ren_win.SetAlphaBitPlanes(1)
|
|
self.ren_win.SetMultiSamples(0)
|
|
|
|
|
|
async def initialize(self):
|
|
h_pix, _ = await kitty.get_terminal_cell_size()
|
|
self.rows, _ = kitty.get_terminal_size()
|
|
self.start_y, self.start_x = await kitty.get_position()
|
|
self.needed_lines = math.ceil(self.height / h_pix)
|
|
|
|
# Add vertical space if needed
|
|
if self.rows - self.start_y < self.needed_lines:
|
|
missing = self.needed_lines - (self.rows - self.start_y)
|
|
self.start_y -= missing
|
|
print("\n" * self.needed_lines, end="")
|
|
kitty.set_position(self.start_y, self.start_x)
|
|
self.orbit = self.generate_orbital_path()
|
|
|
|
async def _handle_key(self, c):
|
|
if c == "a":
|
|
self.camera.Azimuth(10)
|
|
elif c == "d":
|
|
self.camera.Azimuth(-10)
|
|
elif c == "w":
|
|
self.camera.Elevation(10)
|
|
elif c == "s":
|
|
self.camera.Elevation(-10)
|
|
elif c == "+":
|
|
self.camera.zoom(1.1)
|
|
elif c == "-":
|
|
self.camera.zoom(0.9)
|
|
elif c == "p":
|
|
self.fly_to(self.orbit.points[15])
|
|
|
|
async def _input_loop(self):
|
|
fd = sys.stdin.fileno()
|
|
old_settings = termios.tcgetattr(fd)
|
|
try:
|
|
tty.setcbreak(fd)
|
|
while self._running:
|
|
rlist, _, _ = select.select([sys.stdin], [], [], 0)
|
|
if rlist:
|
|
c = sys.stdin.read(1)
|
|
if c == "q": # quit on q
|
|
self._running = False
|
|
else:
|
|
await self._handle_key(c)
|
|
await asyncio.sleep(0.001)
|
|
finally:
|
|
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
|
|
|
|
async def _render_loop(self):
|
|
while self._running:
|
|
# keep renedring the scene
|
|
self.ren_win.Render()
|
|
# Update the filter to grab the current buffer
|
|
self.w2i_filter.Modified()
|
|
self.w2i_filter.Update()
|
|
self.writer.Write()
|
|
await kitty.draw_to_terminal(self.writer.GetResult())
|
|
kitty.set_position(self.start_y, self.start_x)
|
|
self.camera.Azimuth(1)
|
|
|
|
async def run(self):
|
|
await self.initialize()
|
|
self.iren.initialize()
|
|
tasks = [
|
|
asyncio.create_task(self._input_loop()),
|
|
asyncio.create_task(self._render_loop()),
|
|
]
|
|
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
|
|
self._running = False
|