# kglobe/terminalio.py
#
# Listing metadata captured with this file: 70 lines, 2.2 KiB, Python.

import asyncio
import codecs
import os
import re
import select
import sys
import termios
import tty
class TerminalIO:
    """Asynchronous raw-mode stdin reader that also detects Kitty OK replies.

    ``start()`` switches the terminal to raw mode and spawns a background
    task that reads stdin. Every decoded character except ESC is handed to
    the registered input callbacks (this includes the characters that make
    up escape sequences). Whenever a string terminator (ESC followed by a
    backslash) arrives and the input buffered since the previous terminator
    contains ``ESC _ G [i=<digits>;] OK``, ``ok_event`` is set.
    """

    # Start of a Kitty "OK" reply, with or without an ``i=<digits>;`` field.
    _OK_PATTERN = re.compile("\033_G(i=\\d+;)?OK")
    # String terminator (ESC + backslash) that closes a terminal reply.
    _TERMINATOR = "\033\\"
    # Number of trailing characters kept when the pending buffer is trimmed.
    _MAX_BUFFER = 4096
    # Maximum number of bytes requested per read() call.
    _READ_SIZE = 1024
    # Seconds between checks of the stop flag while stdin is idle.
    _POLL_INTERVAL = 0.05

    def __init__(self):
        self.ok_event = asyncio.Event()
        self.input_callbacks = []
        self._reader_task = None
        self._stop = False
        self._buffer = ""
        self._old_settings = None
        # A multi-byte UTF-8 character can be split across reads, so decoding
        # must be incremental; invalid bytes are dropped, as before.
        self._decoder = codecs.getincrementaldecoder("utf-8")(errors="ignore")

    def register_input_callback(self, callback):
        """Register a function that takes a single character input."""
        self.input_callbacks.append(callback)

    def start(self):
        """Put the terminal into raw mode and start the stdin reading loop.

        Must be called while an asyncio event loop is running. Calling it
        again while the reader is still active does nothing.

        Raises:
            RuntimeError: If stdin is not a TTY or no event loop is running.
        """
        if self._reader_task is not None and not self._reader_task.done():
            return
        if not sys.stdin.isatty():
            raise RuntimeError("stdin is not a TTY")
        # Fail *before* touching the terminal, so it is never left in raw
        # mode without a reader task that stop() could wind down.
        asyncio.get_running_loop()
        self._stop = False
        if self._old_settings is None:
            # Only remember the settings once; a later call would otherwise
            # save the raw-mode settings and make restoration impossible.
            self._old_settings = termios.tcgetattr(sys.stdin)
        tty.setraw(sys.stdin.fileno())
        self._reader_task = asyncio.create_task(self._stdin_loop())

    async def stop(self):
        """Stop the reader and restore terminal settings.

        The terminal settings are restored even if the reader task ended
        with an exception; that exception is re-raised to the caller.
        """
        self._stop = True
        task, self._reader_task = self._reader_task, None
        try:
            if task is not None:
                await task
        finally:
            if self._old_settings is not None:
                termios.tcsetattr(
                    sys.stdin, termios.TCSADRAIN, self._old_settings
                )
                self._old_settings = None

    async def wait_for_ok(self, timeout=1.0):
        """Wait until a Kitty OK reply has been seen, then clear the flag.

        Args:
            timeout: Maximum number of seconds to wait.

        Raises:
            RuntimeError: If no OK reply is seen within ``timeout``.
        """
        try:
            await asyncio.wait_for(self.ok_event.wait(), timeout)
        except asyncio.TimeoutError as err:
            raise RuntimeError(
                "Timed out waiting for Kitty OK response"
            ) from err
        self.ok_event.clear()

    async def _stdin_loop(self):
        """Read stdin until stopped or EOF and feed the bytes to the parser."""
        loop = asyncio.get_running_loop()
        fd = sys.stdin.fileno()

        def read_chunk():
            # Poll with a timeout instead of blocking in read(): otherwise
            # stop() would hang until the user happens to press another key.
            while not self._stop:
                readable, _, _ = select.select(
                    [fd], [], [], self._POLL_INTERVAL
                )
                if readable:
                    return os.read(fd, self._READ_SIZE)
            return None

        try:
            while not self._stop:
                data = await loop.run_in_executor(None, read_chunk)
                if not data:
                    # None: stop was requested; b"": stdin reached EOF.
                    break
                self._feed_bytes(data)
        finally:
            # Also reached on cancellation or a failing callback: make sure
            # the polling worker thread terminates.
            self._stop = True

    def _feed_bytes(self, data):
        """Decode raw input bytes and process each resulting character."""
        for char in self._decoder.decode(data):
            self._handle_char(char)

    def _handle_char(self, char):
        """Update reply detection with one character and dispatch it."""
        self._buffer += char
        if self._buffer.endswith(self._TERMINATOR):
            if self._OK_PATTERN.search(self._buffer):
                self.ok_event.set()
            self._buffer = ""
            return
        # Ordinary key presses never end with a terminator; trim (amortised)
        # so the buffer cannot grow without bound.
        if len(self._buffer) > 2 * self._MAX_BUFFER:
            self._buffer = self._buffer[-self._MAX_BUFFER:]
        # Dispatch to key input handlers (everything except ESC itself).
        if char != "\033":
            for callback in self.input_callbacks:
                callback(char)