feat: unify terminal color logging across FeedLoader, RSSHandler, and FeedIndex

Stu Leak 2025-11-03 23:08:53 -05:00
parent ddc5576ec1
commit 509ff92722
4 changed files with 221 additions and 30 deletions
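All three modules now emit log lines in the same shape: a bracketed level tag wrapped in a color constant and reset immediately afterwards, so only the tag is colored. A minimal sketch of that pattern using the constants added in src/core/terminal_colors.py (the messages, counts, paths, and URLs below are placeholders, not output from the code):

from src.core.terminal_colors import TerminalColors as C

# Color only the level tag; reset before the message body.
print(f"{C.INFO}[info]{C.RESET} Loaded 5 active feeds")
print(f"{C.WARN}[warn]{C.RESET} RSS fetch failed (404) for https://example.invalid/rss")
print(f"{C.ERROR}[error]{C.RESET} Missing feeds.json at /path/to/feeds.json")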

src/config/feed_loader.py

@@ -9,6 +9,7 @@
import os
import json
+from src.core.terminal_colors import TerminalColors as C

# ------------------------------------------------------------
# Region alias table for common Canadian cities/suburbs
@@ -74,16 +75,16 @@ class FeedLoader:
    def _load_json(self) -> None:
        """Loads feeds.json and parses all sections."""
        if not os.path.exists(self.config_path):
-            raise FileNotFoundError(f"Missing feeds.json at {self.config_path}")
+            raise FileNotFoundError(f"{C.ERROR}[error]{C.RESET} Missing feeds.json at {self.config_path}")

        with open(self.config_path, "r", encoding="utf-8") as file:
            try:
                data = json.load(file)
            except json.JSONDecodeError as e:
-                raise ValueError(f"Invalid JSON in feeds.json: {e}")
+                raise ValueError(f"{C.ERROR}[error]{C.RESET} Invalid JSON in feeds.json: {e}")

        if "Feeds" not in data or not isinstance(data["Feeds"], dict):
-            raise ValueError("feeds.json must contain a 'Feeds' dictionary.")
+            raise ValueError(f"{C.ERROR}[error]{C.RESET} feeds.json must contain a 'Feeds' dictionary.")

        self.all_feeds = data["Feeds"]
@@ -110,7 +111,7 @@ class FeedLoader:
            })

        self.active_feeds = active
-        print(f"[info] Loaded {len(self.active_feeds)} active feeds for region: {self.region}")
+        print(f"{C.INFO}[info]{C.RESET} Loaded {len(self.active_feeds)} active feeds for region: {C.SUCCESS}{self.region}{C.RESET}")

    # ------------------------------------------------------------
    def get_active_feeds(self) -> list[dict]:
@@ -129,6 +130,6 @@ if __name__ == "__main__":
    loader = FeedLoader()
    feeds = loader.get_active_feeds()
-    print(f"Detected Region: {loader.get_region()}")
+    print(f"{C.INFO}Detected Region:{C.RESET} {C.SUCCESS}{loader.get_region()}{C.RESET}")
    for feed in feeds:
-        print(f"- {feed['Name']} ({feed['Url']}) [{feed['Provider']}]")
+        print(f"{C.DEBUG}- {feed['Name']} ({feed['Url']}) [{feed['Provider']}] {C.RESET}")

src/core/terminal_colors.py (new file)

@@ -0,0 +1,31 @@
# ============================================================
# File: src/core/terminal_colors.py
# Description:
# Provides ANSI color codes for structured console logging.
# Used across all Telefact modules (FeedLoader, RSSHandler,
# Renderer, etc.) for consistent, readable terminal output.
# ============================================================


class TerminalColors:
    """ANSI terminal color codes for clean, readable logs."""

    RESET = "\033[0m"

    # Core log levels
    INFO = "\033[96m"     # Cyan
    WARN = "\033[93m"     # Yellow
    ERROR = "\033[91m"    # Red
    SUCCESS = "\033[92m"  # Green
    DEBUG = "\033[90m"    # Grey

    # Optional stylistic colors for subsystem tags
    HEADER = "\033[95m"   # Magenta
    TITLE = "\033[94m"    # Blue
    DATE = "\033[32m"     # Green (for timestamps)
    SECTION = "\033[36m"  # Cyan for dividers

    @staticmethod
    def format(text: str, color: str) -> str:
        """Quick helper for inline colored text."""
        return f"{color}{text}{TerminalColors.RESET}"
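A short usage sketch for this class (illustrative only, not part of the commit; the feed name and story count are placeholders):

from src.core.terminal_colors import TerminalColors as C

# Direct f-string interpolation, as the handlers below do:
print(f"{C.INFO}[info]{C.RESET} Fetching: {C.TITLE}Top Stories{C.RESET}")

# Or the format() helper for one-off colored fragments:
print(C.format("Cached 12 stories", C.SUCCESS))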

src/rss/rss_feedHandler.py

@@ -18,6 +18,7 @@ import xml.etree.ElementTree as ET
from html import unescape
from datetime import datetime
from src.config.feed_loader import FeedLoader
+from src.core.terminal_colors import TerminalColors as C


class RSSFeedHandler:
@@ -74,20 +75,19 @@ class RSSFeedHandler:
                response = requests.get(url, headers=headers, timeout=20)
                if response.status_code == 200:
                    return response.text
-                print(f"[warn] RSS fetch failed ({response.status_code}) for {url}")
+                print(f"{C.WARN}[warn]{C.RESET} RSS fetch failed ({response.status_code}) for {url}")
                break  # non-200 is not retryable
            except requests.Timeout:
-                print(f"[warn] Timeout {attempt + 1}/3 for {url}")
+                print(f"{C.WARN}[warn]{C.RESET} Timeout {attempt + 1}/3 for {url}")
            except requests.RequestException as e:
-                print(f"[warn] RSS fetch error ({attempt + 1}/3): {e}")
+                print(f"{C.WARN}[warn]{C.RESET} RSS fetch error ({attempt + 1}/3): {e}")
-            time.sleep(2 ** attempt)  # backoff: 1 s, 2 s, 4 s
+            time.sleep(2 ** attempt)  # backoff: 1s, 2s, 4s
        return None

    # ------------------------------------------------------------
    def _strip_html(self, text: str) -> str:
        """Removes HTML tags and decodes entities."""
-        # Remove <img> and <a> and all tags
        clean = re.sub(r"<img[^>]*>", "", text)
        clean = re.sub(r"<[^>]+>", "", clean)
        clean = unescape(clean)
@@ -99,7 +99,7 @@ class RSSFeedHandler:
        try:
            root = ET.fromstring(xml_data)
        except ET.ParseError as e:
-            print(f"[warn] XML parse error: {e}")
+            print(f"{C.WARN}[warn]{C.RESET} XML parse error: {e}")
            return []

        channel = root.find("channel")
@@ -108,7 +108,6 @@ class RSSFeedHandler:
        stories = []
        for item in channel.findall("item"):
-            # --- CBC namespaced attributes ---
            cbc_type = ""
            deptid = ""
@@ -132,14 +131,10 @@ class RSSFeedHandler:
            title = title.replace("<![CDATA[", "").replace("]]>", "").strip()
            description = description.replace("<![CDATA[", "").replace("]]>", "").strip()

-            # Strip HTML
            clean_text = self._strip_html(description)

-            # Skip empty or image-only items
            if not clean_text or len(clean_text) < 20:
                continue

-            # Truncate for Teletext readability
            summary = clean_text[:500].rstrip()

            stories.append({
@@ -152,14 +147,12 @@ class RSSFeedHandler:
            })

        if not stories:
-            print("[debug] No valid <cbc:type>story</cbc:type> entries found — may be namespace issue.")
+            print(f"{C.DEBUG}[debug]{C.RESET} No valid <cbc:type>story</cbc:type> entries found — possible namespace issue.")
        else:
-            print(f"[debug] Parsed {len(stories)} stories successfully.")
+            print(f"{C.DEBUG}[debug]{C.RESET} Parsed {len(stories)} stories successfully.")

-        # Sort newest → oldest
        stories = self._sort_stories(stories)

-        # Limit to N stories
        if self.story_limit > 0:
            stories = stories[:self.story_limit]
@@ -183,7 +176,7 @@ class RSSFeedHandler:
            with open(path, "r", encoding="utf-8") as file:
                return json.load(file)
        except Exception as e:
-            print(f"[warn] Could not load cache: {e}")
+            print(f"{C.WARN}[warn]{C.RESET} Could not load cache: {e}")
            return []
@@ -198,17 +191,17 @@
                    indent=2,
                )
        except Exception as e:
-            print(f"[warn] Failed to save cache for {path}: {e}")
+            print(f"{C.WARN}[warn]{C.RESET} Failed to save cache for {path}: {e}")

    # ------------------------------------------------------------
    def update_feeds(self, force: bool = False) -> None:
        """Fetches and caches all active feeds."""
        active_feeds = self.feed_loader.get_active_feeds()
        if not active_feeds:
-            print("[warn] No active feeds to update.")
+            print(f"{C.WARN}[warn]{C.RESET} No active feeds to update.")
            return

-        print(f"[info] Updating {len(active_feeds)} feeds for region: {self.region}")
+        print(f"{C.INFO}[info]{C.RESET} Updating {len(active_feeds)} feeds for region: {C.SUCCESS}{self.region}{C.RESET}")

        for feed in active_feeds:
            feed_name = feed["Name"]
@@ -216,21 +209,21 @@ class RSSFeedHandler:
            cache_path = self._get_cache_path(feed_name)

            if not force and self._is_cache_valid(cache_path):
-                print(f"[info] Cache valid for: {feed_name}")
+                print(f"{C.DEBUG}[debug]{C.RESET} Cache valid for: {feed_name}")
                continue

-            print(f"[info] Fetching: {feed_name}")
+            print(f"{C.INFO}[info]{C.RESET} Fetching: {C.TITLE}{feed_name}{C.RESET}")
            xml_data = self._fetch_rss(feed_url)
            if not xml_data:
                continue

            stories = self._parse_rss(xml_data)
            if not stories:
-                print(f"[warn] No valid stories found in {feed_name}")
+                print(f"{C.WARN}[warn]{C.RESET} No valid stories found in {feed_name}")
                continue

            self._save_cache(cache_path, stories)
-            print(f"[info] Cached {len(stories)} stories from {feed_name}")
+            print(f"{C.SUCCESS}[success]{C.RESET} Cached {len(stories)} stories from {feed_name}")

    # ------------------------------------------------------------
    def load_cached_feeds(self) -> dict[str, list[dict]]:
@@ -260,7 +253,7 @@ if __name__ == "__main__":
    cached = handler.load_cached_feeds()

    for feed, data in cached.items():
-        print(f"\n=== {feed} ===")
+        print(f"\n{C.SECTION}=== {feed} ==={C.RESET}")
        if isinstance(data, dict):
            stories = data.get("Stories", [])
        else:

src/rss/rss_feedIndex.py (new file, 166 lines added)

@@ -0,0 +1,166 @@
# ============================================================
# File: src/rss/rss_feedIndex.py
# Description:
# Converts cached RSS feed data into Telefact pages.
# Builds index pages (headlines) and paginated subpages
# (story summaries) using the TelefactFrame grid (40×24).
# Designed for use in broadcaster rotation.
# ============================================================
import os
import textwrap
from datetime import datetime, timezone
from src.core.telefact_frame import TelefactFrame
from src.rss.rss_feedHandler import RSSFeedHandler
from src.core.terminal_colors import TerminalColors as C


class RSSFeedIndex:
    """
    Generates TelefactFrame pages and subpages from cached RSS feeds.
    Produces:
        - Index page (list of headlines)
        - One or more subpages per story (paginated if long)
    """

    def __init__(self, cache_dir: str = "Cache/Feeds", region: str | None = None):
        self.handler = RSSFeedHandler(cache_dir=cache_dir)
        self.region = region or self.handler.region
        self.frames = {}  # { page_number: [TelefactFrame, TelefactFrame, ...] }

        # layout constants
        self.max_width = 40
        self.header_rows = 1
        self.footer_rows = 1
        self.body_start = self.header_rows
        self.body_end = 23  # up to line 23 (0-indexed)
        self.body_lines = self.body_end - self.body_start
    # ------------------------------------------------------------
    def _wrap_text(self, text: str) -> list[str]:
        """Breaks plain text into 40-character lines."""
        wrapped = textwrap.wrap(text, width=self.max_width)
        return wrapped or ["(no content)"]

    # ------------------------------------------------------------
    def _new_frame(self) -> TelefactFrame:
        """Create a new blank Telefact frame."""
        return TelefactFrame()

    # ------------------------------------------------------------
    def _draw_centered(self, frame: TelefactFrame, row: int, text: str, color: str = "white"):
        """Center text horizontally within the 40-column grid."""
        text = text.strip()
        if len(text) > self.max_width:
            text = text[:self.max_width]
        start_col = max(0, (self.max_width - len(text)) // 2)
        for i, ch in enumerate(text):
            frame.set_cell(start_col + i, row, ch, color)

    # ------------------------------------------------------------
    def _build_index_page(self, feed_name: str, stories: list[dict]) -> TelefactFrame:
        """Builds a single index page listing headlines."""
        frame = self._new_frame()
        self._draw_centered(frame, 1, f"{feed_name.upper()} INDEX", "yellow")

        # timestamp (UTC)
        timestamp = datetime.now(timezone.utc).strftime("%b %d %H:%M UTC")
        self._draw_centered(frame, 2, f"Updated {timestamp}", "green")

        row = 4
        for i, story in enumerate(stories[: self.body_lines - 4]):
            title = story.get("Title", "").strip()
            if not title:
                continue
            display = f"{i + 1:02d}. {title[: self.max_width - 5]}"
            for j, ch in enumerate(display):
                if j < self.max_width:
                    frame.set_cell(j, row, ch, "white")
            row += 1

        self._draw_centered(frame, 23, "Telefact: The world at your fingertips", "cyan")
        return frame
    # ------------------------------------------------------------
    def _build_story_subpages(self, story: dict) -> list[TelefactFrame]:
        """Builds one or more subpages for a story (paginated)."""
        title = story.get("Title", "").strip()
        body = story.get("Summary", "").strip()
        pubdate = story.get("PubDate", "").strip()

        wrapped_body = self._wrap_text(body)
        lines_per_page = self.body_lines - 4  # reserve rows for title/date/footer
        chunks = [
            wrapped_body[i : i + lines_per_page]
            for i in range(0, len(wrapped_body), lines_per_page)
        ]

        subpages = []
        total_pages = len(chunks)

        for page_num, chunk in enumerate(chunks, start=1):
            frame = self._new_frame()
            self._draw_centered(frame, 1, title, "yellow")
            if pubdate:
                self._draw_centered(frame, 2, pubdate, "green")

            row = 4
            for line in chunk:
                for j, ch in enumerate(line[: self.max_width]):
                    frame.set_cell(j, row, ch, "white")
                row += 1

            footer_text = f"Page {page_num}/{total_pages}"
            self._draw_centered(frame, 23, footer_text, "cyan")
            subpages.append(frame)

        return subpages
    # ------------------------------------------------------------
    def build_all_pages(self):
        """
        Loads cached RSS feeds and builds Teletext-ready pages.
        Returns a dict mapping feed names to their page structures:
            { "Top Stories": {"index": TelefactFrame, "subpages": [frames...]}, ... }
        """
        cached = self.handler.load_cached_feeds()
        if not cached:
            print(f"{C.WARN}[warn]{C.RESET} No cached feeds found.")
            return {}

        page_map = {}

        for feed_name, data in cached.items():
            stories = []
            if isinstance(data, dict):
                stories = data.get("Stories", [])
            elif isinstance(data, list):
                stories = data
            else:
                continue

            if not stories:
                continue

            index_frame = self._build_index_page(feed_name, stories)
            subpages = []
            for story in stories:
                subpages.extend(self._build_story_subpages(story))

            page_map[feed_name] = {"index": index_frame, "subpages": subpages}

        print(f"{C.INFO}[info]{C.RESET} Built {len(page_map)} indexed feeds into TelefactFrames.")
        return page_map
# ------------------------------------------------------------
# Manual test harness
# ------------------------------------------------------------
if __name__ == "__main__":
    indexer = RSSFeedIndex()
    pages = indexer.build_all_pages()

    for feed_name, page_data in pages.items():
        print(f"\n{C.SUCCESS}Feed:{C.RESET} {feed_name}")
        subpage_count = len(page_data['subpages'])
        print(f" {C.DEBUG}- Index ready, {subpage_count} story subpages.{C.RESET}")