Goondex/src/performers/bridge/enrichment_bridge.py

#!/usr/bin/env python3
"""
enrichment_bridge.py — Goondex v0.3.5-r3
────────────────────────────────────────
Performer enrichment orchestrator combining multiple data bridges:
TPDB → StashDB → PornPics → Local DB/JSON.
This module serves as a unification layer for all external data
sources, merging their metadata into the Goondex performer schema
and writing updated JSON + SQLite entries.
It can be triggered directly from the CLI using:

    gx enrich-bridge [limit]
"""
import json
import traceback
from pathlib import Path
from typing import Optional, Dict
from utils.cli_colours import lilac, cyan, yellow, green, red, heading
from src.performers.utils import normalize_name
from src.performers.db_manager import add_or_update_performer
from src.performers.bridge.tpdb_bridge import fetch_tpdb_performers
from src.performers.bridge.stashdb_bridge import fetch_stashdb_performer
from src.importer.pornpics_bridge import fetch_pornpics_profile
# ─────────────────────────────────────────────
# Paths
# ─────────────────────────────────────────────
# Project root (Goondex/): four levels up from src/performers/bridge/.
BASE_DIR = Path(__file__).resolve().parents[3]
DATA_DIR = BASE_DIR / "data" / "performers"
DATA_DIR.mkdir(parents=True, exist_ok=True)
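# Example resolved layout (assuming the repository root is "Goondex/"):
#     Goondex/data/performers/<performer_id>.json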
# ─────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────
def _merge_performer_data(base: Dict, overlay: Optional[Dict]) -> Dict:
    """
    Merge overlay dict into base performer data where base values
    are missing or null. Deep-merges nested 'stats' and 'sources'.
    """
    if not overlay:
        return base
    for k, v in overlay.items():
        if k == "stats":
            base.setdefault("stats", {}).update(v or {})
        elif k == "sources":
            base.setdefault("sources", {}).update(v or {})
        # Overlay only fills gaps: missing, falsy, or "-" placeholder values.
        # Existing base data is never overwritten.
        elif not base.get(k) or base[k] == "-":
            base[k] = v
    return base
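
# Illustrative merge (hypothetical values, not from any real bridge):
#     base    = {"name": "Jane Doe", "birthday": "", "stats": {"height": 170}}
#     overlay = {"birthday": "1990-01-01", "stats": {"weight": 55}}
#     result  = {"name": "Jane Doe", "birthday": "1990-01-01",
#                "stats": {"height": 170, "weight": 55}}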


def _save_json_cache(performer: Dict):
    """Write unified performer metadata to /data/performers."""
    pid = performer.get("id") or normalize_name(performer.get("name", "unknown"))
    path = DATA_DIR / f"{pid}.json"
    try:
        path.write_text(json.dumps(performer, indent=2, ensure_ascii=False), encoding="utf-8")
        print(green(f"[💾] Cached performer → {path.name}"))
    except Exception as e:
        print(red(f"[ERROR] Failed to save cache for {pid}: {e}"))


def _load_json_cache(name: str) -> Optional[Dict]:
    """Load an existing performer cache if present."""
    normalized = normalize_name(name)
    # Try common filename variants (underscore vs. hyphen separators).
    for candidate in [
        DATA_DIR / f"{normalized}.json",
        DATA_DIR / f"{normalized.replace('_', '-')}.json",
        DATA_DIR / f"{normalized.replace('-', '_')}.json",
    ]:
        if candidate.exists():
            try:
                return json.loads(candidate.read_text(encoding="utf-8"))
            except Exception:
                continue
    return None
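
# Cache lookup example (assuming normalize_name("Jane Doe") -> "jane_doe";
# the exact rules live in src.performers.utils.normalize_name):
#     tries data/performers/jane_doe.json, then the hyphen/underscore variants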
# ─────────────────────────────────────────────
# Enrichment Logic
# ─────────────────────────────────────────────


def enrich_performer(name: str) -> Optional[Dict]:
    """
    Run a full enrichment sequence for a single performer.
    TPDB → StashDB → PornPics → DB/JSON
    """
    print(heading(f"Enriching Performer: {name}", icon="💖"))
    try:
        # Load any existing cache first
        performer = _load_json_cache(name) or {}

        # ─ TPDB Fetch: bulk-fetch, then match by case-insensitive substring
        print(cyan(f"[TPDB] Searching for '{name}'..."))
        tpdb_results = fetch_tpdb_performers(limit=200)
        tpdb_match = next((p for p in tpdb_results if name.lower() in p.get("name", "").lower()), None)
        if tpdb_match:
            print(green(f"[TPDB] Found → {tpdb_match['name']}"))
            performer = _merge_performer_data(performer, tpdb_match)
        else:
            print(yellow(f"[TPDB] No direct match for '{name}'"))

        # ─ StashDB Fetch
        print(cyan(f"[STASHDB] Searching for '{name}'..."))
        stashdb_data = fetch_stashdb_performer(name)
        if stashdb_data:
            print(green(f"[STASHDB] Found → {stashdb_data.get('name', name)}"))
            performer = _merge_performer_data(performer, stashdb_data)
        else:
            print(yellow("[STASHDB] No results."))

        # ─ PornPics Fetch
        print(cyan(f"[PORNPICS] Searching for '{name}'..."))
        pp_data = fetch_pornpics_profile(name)
        if pp_data:
            print(green(f"[PORNPICS] Found → {pp_data.get('name', name)}"))
            performer = _merge_performer_data(performer, pp_data)
        else:
            print(yellow("[PORNPICS] No profile found."))

        # Save and update DB
        if performer:
            _save_json_cache(performer)
            add_or_update_performer(performer)
            print(green(f"[OK] Enrichment complete for {performer.get('name', name)}"))
        else:
            print(red(f"[ERROR] No data found for '{name}'"))
        return performer
    except Exception as e:
        print(red(f"[CRITICAL] Failed to enrich {name}: {e}"))
        print(traceback.format_exc())
        return None
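
# Typical single-performer call (hypothetical name):
#     enrich_performer("Jane Doe")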


def enrich_all_performers(limit: Optional[int] = None):
    """
    Run enrichment across all performers in /data/performers.
    If limit is provided, only process that many entries.
    """
    all_files = sorted(DATA_DIR.glob("*.json"))
    if limit:
        all_files = all_files[:limit]
    print(heading(f"Launching Enrichment Bridge ({len(all_files)} performers)", icon="🧩"))
    for idx, file in enumerate(all_files, start=1):
        try:
            data = json.loads(file.read_text(encoding="utf-8"))
            name = data.get("name", file.stem)
            print(lilac(f"\n[{idx}/{len(all_files)}] {name}"))
            enrich_performer(name)
        except Exception as e:
            print(red(f"[ERROR] Failed to process {file.name}: {e}"))
            continue
    print(green("\n[OK] Bridge enrichment complete."))
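

if __name__ == "__main__":
    # Minimal standalone entry point: a sketch, assuming the
    # `gx enrich-bridge [limit]` command documented above ultimately
    # dispatches to enrich_all_performers(); the real argument parsing
    # lives in the gx CLI, not here.
    import sys

    _limit = int(sys.argv[1]) if len(sys.argv) > 1 else None
    enrich_all_performers(limit=_limit)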