You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

198 lines
6.8 KiB
Python

#!/usr/bin/env python3
import requests
from bs4 import BeautifulSoup as Soup
import curses
import webbrowser
import math
import aiohttp
import asyncio
import api
# Frames of a text spinner, in display order.
spinner_states = list('-\\|/')
class Client:
    """Curses front end for browsing the Hacker News top stories.

    Key bindings (see ``handle_input``): Up/Down move the cursor and flip
    pages, Enter opens the story link, ``c`` opens the comments page, ``r``
    reloads the story list and ``q`` quits.
    """

    # Marker appended to titles and links that had to be shortened.
    ELLIPSIS = '…'
    # Rows that never hold a story line: header bar, detail line of the
    # selected story, footer.
    RESERVED_ROWS = 3

    def __init__(self):
        """Fetch the top-story ids, then put the terminal into curses mode."""
        # Fetch before touching the terminal: if this raises, there is no
        # curses state to undo.
        self.topstories = api.get_topstories()
        self.loadedstories = {}
        self.story_pos = 0
        self.cursor_pos = 0

        self.screen = curses.initscr()
        # Fallback header style for terminals without colour support.
        self._header_attr = curses.A_REVERSE
        if curses.has_colors():
            curses.start_color()
            curses.use_default_colors()
            if curses.COLORS > 255:
                curses.init_pair(1, 255, 208)
            else:
                # Colour numbers 255/208 do not exist on 8/16-colour
                # terminals and would make init_pair() raise.
                curses.init_pair(1, curses.COLOR_WHITE, curses.COLOR_YELLOW)
            self._header_attr = curses.color_pair(1)
        curses.noecho()
        curses.cbreak()
        self.screen.keypad(True)
        self._update_dimensions()

    # ------------------------------------------------------------------
    # Pure helpers (no curses state involved)
    # ------------------------------------------------------------------
    @staticmethod
    def _truncate(text, width):
        """Return ``text`` cut to at most ``width`` characters.

        A shortened string ends with ``ELLIPSIS``; a non-positive ``width``
        yields an empty string.
        """
        if width <= 0:
            return ''
        if len(text) <= width:
            return text
        return text[:width - 1] + Client.ELLIPSIS

    @staticmethod
    def _fit_story_line(prefix, title, link, cols):
        """Shorten ``title`` and ``link`` so one story line fits ``cols``.

        The rendered line is ``prefix + title + ' (' + link + ')'``. The title
        gets up to three quarters of the free space, the link gets the rest.
        A leading ``https://`` or ``http://`` is dropped from the link.

        Returns:
            A ``(title, link)`` tuple of the strings to display.
        """
        for scheme in ('https://', 'http://'):
            if link.startswith(scheme):
                link = link[len(scheme):]
                break
        # 3 = the fixed decoration ' (' and ')'.
        available = max(0, cols - len(prefix) - 3)
        title_width = min((available // 4) * 3, len(title))
        link_width = available - title_width
        return (Client._truncate(title, title_width),
                Client._truncate(link, link_width))

    @staticmethod
    def _page_numbers(story_pos, page_size, story_count):
        """Return ``(current_page, total_pages)``, both 1-based and >= 1."""
        page_size = max(1, page_size)
        current = story_pos // page_size + 1
        total = max(1, math.ceil(story_count / page_size))
        return current, total

    @staticmethod
    def _comments_url(story_id):
        """Return the Hacker News discussion URL for ``story_id``."""
        return f'https://news.ycombinator.com/item?id={story_id}'

    # ------------------------------------------------------------------
    # State helpers
    # ------------------------------------------------------------------
    def _update_dimensions(self):
        """Refresh the cached window size and the derived page size."""
        self.lines, self.cols = self.screen.getmaxyx()
        # Never below 1 so paging arithmetic cannot divide by zero on a
        # very small terminal.
        self.stories_in_a_site = max(1, self.lines - self.RESERVED_ROWS)

    def _visible_ids(self):
        """Return the story ids belonging to the current page."""
        start = self.story_pos
        return self.topstories[start:start + self.stories_in_a_site]

    def _clamp_cursor(self):
        """Keep the cursor on a row of the current page that holds a story."""
        last_row = len(self._visible_ids()) - 1
        self.cursor_pos = max(0, min(self.cursor_pos, last_row))

    def _selected_story(self):
        """Return the story under the cursor, or None if it is unavailable."""
        index = self.story_pos + self.cursor_pos
        if not 0 <= index < len(self.topstories):
            return None
        return self.loadedstories.get(self.topstories[index])

    # ------------------------------------------------------------------
    # Drawing
    # ------------------------------------------------------------------
    def _put(self, row, col, text, attr=0):
        """Write ``text`` at ``(row, col)``, clipped to the window.

        curses raises when a write runs past the window or reaches the
        bottom-right cell, so the text is cut instead.
        """
        lines, cols = self.screen.getmaxyx()
        if not (0 <= row < lines and 0 <= col < cols):
            return
        room = cols - col
        if row == lines - 1:
            room -= 1
        if room > 0 and text:
            self.screen.addstr(row, col, text[:room], attr)

    def set_footer(self, footer):
        """Show ``footer`` in reverse video on the last line of the screen."""
        lines, cols = self.screen.getmaxyx()
        # Clip to cols - 1: writing the bottom-right cell makes curses raise.
        clipped = footer[:max(0, cols - 1)]
        self.screen.addstr(lines - 1, 0, clipped, curses.A_REVERSE)
        self.screen.clrtoeol()

    def _draw_story(self, row, story, selected):
        """Draw one story at ``row``; a selected story also gets a detail row."""
        prefix = '>' if selected else ''
        title, link = self._fit_story_line(
            prefix, story.title, story.link, self.cols)
        title_attr = curses.A_DIM if story.read else curses.A_NORMAL
        if selected:
            title_attr |= curses.A_UNDERLINE
        self._put(row, 0, prefix)
        self._put(row, len(prefix), title, title_attr)
        self._put(row, len(prefix) + len(title), f' ({link})', curses.A_DIM)
        if selected:
            detail = (f' by {story.author} | {story.comments} comments'
                      f' | {story.votes} points')
            self._put(row + 1, 0, detail)

    def draw(self):
        """Redraw the header, the current page of stories and the footer."""
        self.screen.clear()
        self._update_dimensions()
        # Pad to the full width so the whole bar is coloured.
        self._put(0, 0, '[Y] Hacker News'.ljust(self.cols), self._header_attr)

        row = 1
        for i, story_id in enumerate(self._visible_ids()):
            story = self.loadedstories.get(story_id)
            selected = i == self.cursor_pos
            if story is None:
                # Not loaded (yet): keep the row empty instead of crashing.
                row += 1
                continue
            self._draw_story(row, story, selected)
            row += 2 if selected else 1

        page, total_pages = self._page_numbers(
            self.story_pos, self.stories_in_a_site, len(self.topstories))
        self.set_footer(
            f'Page {page}/{total_pages}, loaded {len(self.loadedstories)} stories.')

    # ------------------------------------------------------------------
    # Input handling
    # ------------------------------------------------------------------
    async def handle_input(self):
        """Wait for one key press (blocking in ``getch``) and act on it."""
        key = self.screen.getch()
        if key == ord('q'):  # Quit
            self.exit()
        elif key == curses.KEY_UP:
            self.cursor_pos -= 1
            if self.cursor_pos < 0:
                # Go to the bottom of the previous page (or wrap on page 1).
                self.story_pos = max(
                    0, self.story_pos - self.stories_in_a_site)
                self.cursor_pos = self.stories_in_a_site - 1
            self._clamp_cursor()
            await self.load_more_if_needed()
        elif key == curses.KEY_DOWN:
            self.cursor_pos += 1
            if self.cursor_pos >= len(self._visible_ids()):
                next_pos = self.story_pos + self.stories_in_a_site
                if next_pos < len(self.topstories):
                    self.story_pos = next_pos
                    self.cursor_pos = 0
                    await self.load_more_if_needed()
                else:
                    # Already on the last story: stay there.
                    self._clamp_cursor()
        elif key == ord('c'):
            story = self._selected_story()
            if story is not None:
                webbrowser.open(self._comments_url(story.id))
        elif key in (curses.KEY_ENTER, 10):
            story = self._selected_story()
            if story is not None:
                if story.link == 'No URL':
                    # No external link: fall back to the discussion page.
                    webbrowser.open(self._comments_url(story.id))
                else:
                    webbrowser.open(story.link)
                story.read = True
        elif key == ord('r'):
            await self.reload()
        elif key == curses.KEY_RESIZE:
            curses.resize_term(*self.screen.getmaxyx())
            self._update_dimensions()
            self._clamp_cursor()
            await self.load_more_if_needed()

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------
    async def load_more_if_needed(self):
        """Load the current page if any of its stories is not loaded yet."""
        # Check the ids themselves: a plain count is wrong once the page
        # size or the top-story list has changed.
        if any(sid not in self.loadedstories for sid in self._visible_ids()):
            await self.load_stories(
                self.story_pos, self.story_pos + self.stories_in_a_site)

    async def load_stories(self, from_pos, to_pos):
        """Fetch ``topstories[from_pos:to_pos]`` concurrently and store them.

        A story that is already loaded keeps its object (and therefore its
        ``read`` flag); only votes, comment count and title are refreshed.
        """
        self.set_footer('Loading stories...')
        self.screen.refresh()
        story_ids = self.topstories[from_pos:to_pos]
        fetched = await asyncio.gather(
            *(api.get_story(self.session, story_id) for story_id in story_ids))
        for story in fetched:
            if story is None:
                # Defensive: tolerate a fetch that produced nothing.
                continue
            known = self.loadedstories.get(story.id)
            if known is None:
                self.loadedstories[story.id] = story
            else:
                known.votes = story.votes
                known.comments = story.comments
                known.title = story.title

    async def run(self):
        """Open the HTTP session, load the first page and run the UI loop."""
        # ``async with`` closes the session on every way out of the loop,
        # including the SystemExit raised by ``exit``.
        async with aiohttp.ClientSession() as session:
            self.session = session
            await self.load_stories(0, self.stories_in_a_site)
            while True:
                self.draw()
                await self.handle_input()

    def exit(self):
        """Restore the terminal and terminate the program with status 0.

        The HTTP session is not closed here: ``ClientSession.close()`` is a
        coroutine and cannot be awaited from this synchronous method. ``run``
        closes the session while the SystemExit raised below propagates.
        """
        curses.endwin()
        import sys
        sys.exit(0)

    async def reload(self):
        """Fetch the top-story list again and go back to the first page.

        Stories that are already loaded are kept, so their ``read`` flags
        survive a reload.
        """
        self.set_footer("Reloading...")
        self.screen.refresh()
        self.topstories = api.get_topstories()
        self.story_pos = 0
        self.cursor_pos = 0
        await self.load_stories(
            self.story_pos, self.story_pos + self.stories_in_a_site)
def main():
    """Create the client, run its event loop and always restore the terminal."""
    try:
        client = Client()
        asyncio.run(client.run())
    finally:
        # ``finally`` rather than ``except Exception`` so the terminal is
        # also restored on KeyboardInterrupt. The exception itself keeps
        # propagating with its original traceback.
        # No session.close() here: it is a coroutine, and the event loop
        # that could have awaited it is already gone at this point.
        try:
            curses.endwin()
        except curses.error:
            # Client() failed before curses.initscr() ran, so there is no
            # curses state to undo.
            pass


if __name__ == '__main__':
    main()