#!/usr/bin/env python3
"""A small curses client for Hacker News.

Key bindings: arrow keys to move, Enter to open the story link,
'c' to open the comment thread, 'r' to reload, 'q' to quit.
"""
import asyncio
import curses
import math
import sys
import webbrowser

import aiohttp

import api

spinner_states = ['-', '\\', '|', '/']


class Client:
    def __init__(self):
        # set up curses
        self.screen = curses.initscr()
        curses.start_color()
        curses.use_default_colors()
        curses.init_pair(1, 255, 208)
        curses.noecho()
        curses.cbreak()
        self.screen.keypad(True)

        self.topstories = api.get_topstories()
        self.loadedstories = {}
        self.story_pos = 0
        self.cursor_pos = 0
        self.lines = curses.LINES
        self.cols = curses.COLS
        self.stories_in_a_site = self.lines - 3

    def set_footer(self, footer):
        self.screen.addstr(curses.LINES - 1, 0, footer, curses.A_REVERSE)
        self.screen.clrtoeol()

    def draw(self):
        self.screen.clear()
        self.lines = curses.LINES
        self.cols = curses.COLS
        # header, detail, footer:
        self.stories_in_a_site = self.lines - 3

        title_string = '[Y] Hacker News'
        title_string += ' ' * (self.cols - len(title_string) - 1) + '\n'
        self.screen.addstr(title_string, curses.color_pair(1))

        for i, story_id in enumerate(self.topstories[self.story_pos:self.story_pos + self.stories_in_a_site]):
            story = self.loadedstories[story_id]
            prefix = '>' if i == self.cursor_pos else ''

            # calculate how much of the line the title and URL may use
            text = f'{prefix} ()\n'
            chars_available = self.cols - len(text)
            max_title_len = min((chars_available // 4) * 3, len(story.title))
            max_url_len = chars_available - max_title_len
            title = story.title[:max_title_len - 1] + "…" if len(story.title) > max_title_len else story.title
            link = story.link.replace('https://', '').replace('http://', '')
            link = link[:max_url_len - 1] + "…" if len(link) > max_url_len else link

            self.screen.addstr(prefix)
            if i == self.cursor_pos:
                if story.read:
                    self.screen.addstr(title, curses.A_DIM | curses.A_UNDERLINE)
                else:
                    self.screen.addstr(title, curses.A_UNDERLINE)
                self.screen.addstr(f' ({link})\n', curses.A_DIM)
                detail = f' by {story.author} | {story.comments} comments | {story.votes} points\n'
                self.screen.addstr(detail)
            else:
                if story.read:
                    self.screen.addstr(title, curses.A_DIM)
                else:
                    self.screen.addstr(title)
                self.screen.addstr(f' ({link})\n', curses.A_DIM)

        page = self.story_pos // self.stories_in_a_site + 1
        # total pages across the whole top-stories list
        total_pages = math.ceil(len(self.topstories) / self.stories_in_a_site)
        self.set_footer(f'Page {page}/{total_pages}, loaded {len(self.loadedstories)} stories.')
        #self.set_footer(f'{self.loadedstories}')

    async def handle_input(self):
        c = self.screen.getch()
        story = self.loadedstories[self.topstories[self.story_pos + self.cursor_pos]]
        if c == ord('q'):
            # Quit
            self.exit()
        elif c == curses.KEY_UP:
            self.cursor_pos -= 1
            if self.cursor_pos < 0:
                self.cursor_pos = self.stories_in_a_site - 1
                # scroll up a page (:
                self.story_pos -= self.stories_in_a_site
                self.story_pos = 0 if self.story_pos < 0 else self.story_pos
        elif c == curses.KEY_DOWN:
            self.cursor_pos += 1
            if self.cursor_pos >= self.stories_in_a_site:
                self.cursor_pos = 0
                # scroll down a page :)
                self.story_pos += self.stories_in_a_site
                await self.load_more_if_needed()
        elif c == ord('c'):
            # open comments
            webbrowser.open(f'https://news.ycombinator.com/item?id={story.id}')
        elif c == curses.KEY_ENTER or c == 10:
            # open link; fall back to the comment page for stories without a URL
            if story.link == 'No URL':
                webbrowser.open(f'https://news.ycombinator.com/item?id={story.id}')
            else:
                webbrowser.open(story.link)
            story.read = True
        elif c == ord('r'):
            await self.reload()
        elif c == curses.KEY_RESIZE:
            curses.resize_term(*self.screen.getmaxyx())
            self.lines, self.cols = self.screen.getmaxyx()
            self.stories_in_a_site = self.lines - 3
            await self.load_more_if_needed()
    async def load_more_if_needed(self):
        if len(self.loadedstories) < self.story_pos + self.stories_in_a_site:
            # load more
            await self.load_stories(self.story_pos, self.story_pos + self.stories_in_a_site)

    async def load_stories(self, from_pos, to_pos):
        #self.set_footer(f'[{spinner_states[idx%4]}] Loading { to_pos - from_pos } stories...')
        self.set_footer('Loading stories...')
        self.screen.refresh()
        tasks = []
        #async with self.session as session:
        session = self.session
        for idx, i in enumerate(self.topstories[from_pos:to_pos]):
            tasks.append(asyncio.ensure_future(api.get_story(session, i)))
        story_list = await asyncio.gather(*tasks)
        for story in story_list:
            if story.id in self.loadedstories:
                # upon reloading, refresh counts + title but keep the read flag
                self.loadedstories[story.id].votes = story.votes
                self.loadedstories[story.id].comments = story.comments
                self.loadedstories[story.id].title = story.title
            else:
                self.loadedstories[story.id] = story

    async def run(self):
        self.session = aiohttp.ClientSession()
        try:
            await self.load_stories(0, self.stories_in_a_site)
            while True:
                self.draw()
                await self.handle_input()
        finally:
            # close the aiohttp session even when we leave via an exception
            # or sys.exit(); ClientSession.close() is a coroutine, so it has
            # to be awaited here rather than called from the sync exit() path
            await self.session.close()

    def exit(self):
        curses.endwin()
        sys.exit(0)

    async def reload(self):
        self.set_footer("Reloading...")
        self.screen.refresh()
        self.topstories = api.get_topstories()
        #self.loadedstories = []
        self.story_pos = 0
        self.cursor_pos = 0
        await self.load_stories(self.cursor_pos, self.cursor_pos + self.stories_in_a_site)


def main():
    try:
        client = Client()
        #async with client.run():
        asyncio.run(client.run())
    except Exception:
        # restore the terminal before re-raising so the traceback is readable
        curses.endwin()
        raise


if __name__ == '__main__':
    main()
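
# ---------------------------------------------------------------------------
# Note: the `api` module imported above is not part of this file. Based only
# on how it is used here, it needs to expose a synchronous get_topstories()
# returning a list of story ids and an asynchronous get_story(session, id)
# returning an object with id, title, link, author, comments, votes and read
# attributes. The Story class and endpoints below are an assumed minimal
# sketch against the public Hacker News Firebase API, not the actual
# implementation:
#
#   import requests
#
#   class Story:
#       def __init__(self, item):
#           self.id = item['id']
#           self.title = item.get('title', '')
#           self.link = item.get('url', 'No URL')
#           self.author = item.get('by', '')
#           self.comments = item.get('descendants', 0)
#           self.votes = item.get('score', 0)
#           self.read = False
#
#   def get_topstories():
#       return requests.get(
#           'https://hacker-news.firebaseio.com/v0/topstories.json').json()
#
#   async def get_story(session, story_id):
#       url = f'https://hacker-news.firebaseio.com/v0/item/{story_id}.json'
#       async with session.get(url) as resp:
#           return Story(await resp.json())
# ---------------------------------------------------------------------------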