Goondex/internal/import/service.go
Stu Leak 3b8adad57d 🚀 Goondex v0.1.0-dev3 - Comprehensive ML-Powered Search & Import System
MAJOR FEATURES ADDED:
======================

🤖 ML Analysis System:
- Comprehensive scene image analysis with per-scene predictions
- Enhanced database schema with scene_ml_analysis table
- Advanced detection for clothing colors, body types, age categories, positions, settings
- Support for multiple prediction types (clothing, body, sexual acts, etc.)
- Confidence scoring and ML source tracking

🧠 Enhanced Search Capabilities:
- Natural language parser for complex queries (e.g., "Teenage Riley Reid creampie older man pink thong black heels red couch"); a category-mapping sketch follows this list
- Category-based search with confidence-weighted results
- ML-enhanced tag matching with automatic fallback to traditional search
- Support for "Money Shot: Creampie" vs "Cum in Open Mouth" detection
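
A minimal sketch of the category-mapping idea behind the parser (the token table, names, and package below are illustrative, not the actual implementation):

    package search

    import "strings"

    // Illustrative token-to-category hints; the real vocabulary and
    // confidence weighting live in the parser itself.
    var categoryHints = map[string]string{
        "creampie": "sex_act",
        "thong":    "clothing",
        "heels":    "clothing",
        "couch":    "setting",
        "teenage":  "age_category",
    }

    // parseQuery buckets recognized tokens by category; unrecognized
    // tokens would fall back to traditional keyword search.
    func parseQuery(q string) map[string][]string {
        matches := make(map[string][]string)
        for _, tok := range strings.Fields(strings.ToLower(q)) {
            if cat, ok := categoryHints[tok]; ok {
                matches[cat] = append(matches[cat], tok)
            }
        }
        return matches
    }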

🗄️ Advanced Database Schema:
- Male detection: circumcised field (0/1)
- Pubic hair types: natural, shaved, trimmed, landing strip, bushy, hairy
- Scene ML analysis table for storing per-scene predictions (illustrative DDL after this list)
- Comprehensive seed tags for all detection categories
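
An illustrative shape for the scene_ml_analysis table (column names and constraints here are a sketch; the real definition lives in the migrations):

    // Sketch only: assumed column layout for scene_ml_analysis.
    const createSceneMLAnalysis = `
    CREATE TABLE IF NOT EXISTS scene_ml_analysis (
        id              INTEGER PRIMARY KEY,
        scene_id        INTEGER NOT NULL REFERENCES scenes(id),
        prediction_type TEXT    NOT NULL, -- clothing, body, sex_act, ...
        label           TEXT    NOT NULL, -- e.g. "pink thong"
        confidence      REAL    NOT NULL, -- 0.0 to 1.0
        ml_source       TEXT    NOT NULL  -- model/version that produced it
    );`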

🏗️ Dual Scraper Architecture:
- Flexible import service supporting both TPDB and Adult Empire scrapers
- Bulk scraper implementation for Adult Empire using multiple search strategies
- Progress tracking with Server-Sent Events (SSE) for real-time updates; see the handler sketch after this list
- Graceful fallback from Adult Empire to TPDB when needed
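
A hedged sketch of streaming ProgressUpdate values (the type in internal/import/service.go) over SSE; the handler name, channel plumbing, and route wiring are assumptions, not this commit's actual handler:

    package web

    import (
        "encoding/json"
        "fmt"
        "net/http"

        importsvc "git.leaktechnologies.dev/stu/Goondex/internal/import"
    )

    // progressHandler streams updates from a job's channel as SSE events.
    func progressHandler(updates <-chan importsvc.ProgressUpdate) http.HandlerFunc {
        return func(w http.ResponseWriter, r *http.Request) {
            w.Header().Set("Content-Type", "text/event-stream")
            w.Header().Set("Cache-Control", "no-cache")
            flusher, ok := w.(http.Flusher)
            if !ok {
                http.Error(w, "streaming unsupported", http.StatusInternalServerError)
                return
            }
            for {
                select {
                case <-r.Context().Done():
                    return // client disconnected
                case u, open := <-updates:
                    if !open {
                        return // job finished
                    }
                    data, err := json.Marshal(u)
                    if err != nil {
                        continue
                    }
                    fmt.Fprintf(w, "data: %s\n\n", data)
                    flusher.Flush()
                }
            }
        }
    }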

📝 Enhanced Import System:
- Individual bulk imports (performers, studios, scenes, movies)
- Combined "import all" operation (driver sketch after this list)
- Real-time progress tracking with job management
- Error handling and retry mechanisms
- Support for multiple import sources and strategies
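
A sketch of a caller driving the combined import (the run helper and its wiring are illustrative; only Service, BulkImportAll, and ImportResult come from this commit):

    package main

    import (
        "context"
        "log"

        importsvc "git.leaktechnologies.dev/stu/Goondex/internal/import"
    )

    // run drives the combined import and reports per-entity results.
    func run(ctx context.Context, svc *importsvc.Service) error {
        results, err := svc.BulkImportAll(ctx)
        if err != nil {
            return err
        }
        for _, r := range results {
            log.Printf("%s: %d/%d imported, %d failed",
                r.EntityType, r.Imported, r.Total, r.Failed)
        }
        return nil
    }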

🔧 Technical Improvements:
- Modular component architecture for maintainability
- Enhanced error handling and logging
- Performance-optimized database queries with proper indexing
- Configurable import limits and rate limiting (limiter sketch below)
- Comprehensive testing framework
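
The rate-limiting mechanism isn't shown in this file; one conventional sketch uses golang.org/x/time/rate (the limiter value and fetchPage helper are assumptions, not the actual implementation):

    package importer

    import (
        "context"

        "golang.org/x/time/rate"
    )

    // tpdbLimiter caps outbound scraper traffic; the rate and burst here
    // are placeholders for the configurable limits the commit mentions.
    var tpdbLimiter = rate.NewLimiter(rate.Limit(5), 1) // ~5 requests/second

    func fetchPage(ctx context.Context, page int) error {
        if err := tpdbLimiter.Wait(ctx); err != nil {
            return err // context cancelled while waiting for a token
        }
        // ... perform the HTTP request for this page ...
        return nil
    }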

This commit establishes Goondex as a full adult-content discovery platform with ML-powered analysis and advanced search, ready for integration with computer-vision models for automated tagging and scene analysis.
2025-12-30 21:52:25 -05:00

package import_service
import (
	"context"
	"fmt"
	"log"

	"git.leaktechnologies.dev/stu/Goondex/internal/db"
	"git.leaktechnologies.dev/stu/Goondex/internal/model"
	"git.leaktechnologies.dev/stu/Goondex/internal/scraper"
	"git.leaktechnologies.dev/stu/Goondex/internal/scraper/tpdb"
)

// ProgressUpdate represents a progress update during import.
type ProgressUpdate struct {
	EntityType string  `json:"entity_type"`
	Current    int     `json:"current"`
	Total      int     `json:"total"`
	Percent    float64 `json:"percent"`
	Message    string  `json:"message"`
}

// ProgressCallback is called when progress is made.
type ProgressCallback func(update ProgressUpdate)
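//
// Hypothetical usage sketch (svc and ctx are assumed to exist; this is
// not part of the file's API surface):
//
//	svc.BulkImportAllPerformersWithProgress(ctx, func(u ProgressUpdate) {
//		log.Printf("%s: %.1f%% (%d/%d)", u.EntityType, u.Percent, u.Current, u.Total)
//	})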
// Service handles bulk import operations.
type Service struct {
	db          *db.DB
	scraper     *tpdb.Scraper
	bulkScraper scraper.BulkScraper
	enricher    *Enricher
}

// NewService creates a new import service backed by the TPDB scraper.
func NewService(database *db.DB, tpdbScraper *tpdb.Scraper) *Service {
	return &Service{
		db:      database,
		scraper: tpdbScraper,
	}
}

// NewFlexibleService creates a new import service backed by a bulk
// scraper such as the Adult Empire implementation.
func NewFlexibleService(database *db.DB, bulkScraper scraper.BulkScraper) *Service {
	return &Service{
		db:          database,
		bulkScraper: bulkScraper,
	}
}

// WithEnricher configures enrichment (optional).
func (s *Service) WithEnricher(enricher *Enricher) {
	s.enricher = enricher
}
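//
// Hypothetical wiring (the NewEnricher constructor shown here is assumed,
// not defined in this file):
//
//	svc := NewService(database, tpdbScraper)
//	svc.WithEnricher(NewEnricher( /* ... */ ))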
// BulkImportAllPerformersFlexible imports all performers using the bulk
// (Adult Empire) scraper, falling back to TPDB when none is configured.
func (s *Service) BulkImportAllPerformersFlexible(ctx context.Context) (*ImportResult, error) {
	if s.bulkScraper == nil {
		return s.BulkImportAllPerformers(ctx)
	}

	result := &ImportResult{EntityType: "performers"}
	performerStore := db.NewPerformerStore(s.db)

	// Get all performers from the scraper.
	searchResults, err := s.bulkScraper.SearchAllPerformers(ctx)
	if err != nil {
		return result, fmt.Errorf("failed to fetch performers: %w", err)
	}
	result.Total = len(searchResults)
	log.Printf("Found %d performer search results to import", len(searchResults))

	// Import each performer.
	imported := 0
	failed := 0
	for _, searchResult := range searchResults {
		// Convert to the internal model.
		performer := s.bulkScraper.ConvertPerformerToModel(&searchResult)
		if performer == nil {
			failed++
			continue
		}

		// Set source metadata.
		performer.Source = "adultempire"
		performer.SourceID = searchResult.URL

		// Try to create the performer.
		if err := performerStore.Create(performer); err != nil {
			log.Printf("Failed to import performer %s: %v", performer.Name, err)
			failed++
		} else {
			imported++
			log.Printf("Imported performer: %s", performer.Name)
		}
	}

	result.Imported = imported
	result.Failed = failed
	log.Printf("Performers import complete: %d imported, %d failed", imported, failed)
	return result, nil
}
// BulkImportAllScenesFlexible imports all scenes using the bulk
// (Adult Empire) scraper, falling back to TPDB when none is configured.
func (s *Service) BulkImportAllScenesFlexible(ctx context.Context) (*ImportResult, error) {
	if s.bulkScraper == nil {
		return s.BulkImportAllScenes(ctx)
	}

	result := &ImportResult{EntityType: "scenes"}
	sceneStore := db.NewSceneStore(s.db)

	// Get all scenes from the scraper.
	searchResults, err := s.bulkScraper.SearchAllScenes(ctx)
	if err != nil {
		return result, fmt.Errorf("failed to fetch scenes: %w", err)
	}
	result.Total = len(searchResults)
	log.Printf("Found %d scene search results to import", len(searchResults))

	// Import each scene.
	imported := 0
	failed := 0
	for _, searchResult := range searchResults {
		// Convert to the internal model.
		scene := s.bulkScraper.ConvertSceneToModel(&searchResult)
		if scene == nil {
			failed++
			continue
		}

		// Set source metadata.
		scene.Source = "adultempire"
		scene.SourceID = searchResult.URL

		// Try to create the scene.
		if err := sceneStore.Create(scene); err != nil {
			log.Printf("Failed to import scene %s: %v", scene.Title, err)
			failed++
		} else {
			imported++
			log.Printf("Imported scene: %s", scene.Title)
		}
	}

	result.Imported = imported
	result.Failed = failed
	log.Printf("Scenes import complete: %d imported, %d failed", imported, failed)
	return result, nil
}
// ImportResult contains the results of an import operation.
type ImportResult struct {
	EntityType string
	Imported   int
	Failed     int
	Total      int
}
// BulkImportAllPerformers imports all performers from TPDB.
func (s *Service) BulkImportAllPerformers(ctx context.Context) (*ImportResult, error) {
	return s.BulkImportAllPerformersWithProgress(ctx, nil)
}
// BulkImportAllPerformersWithProgress imports all performers from TPDB,
// reporting progress through the optional callback.
func (s *Service) BulkImportAllPerformersWithProgress(ctx context.Context, progress ProgressCallback) (*ImportResult, error) {
	result := &ImportResult{EntityType: "performers"}
	performerStore := db.NewPerformerStore(s.db)

	page := 1
	for {
		performers, meta, err := s.scraper.ListPerformers(ctx, page)
		if err != nil {
			return result, fmt.Errorf("failed to fetch page %d: %w", page, err)
		}

		// Record the reported total on the first page.
		if meta != nil && page == 1 {
			result.Total = meta.Total
			if meta.Total >= 10000 {
				log.Printf("TPDB performers total reports %d (cap?). Continuing to paginate until empty.", meta.Total)
			}
		}

		// Stop when no data is returned.
		if len(performers) == 0 {
			log.Printf("No performers returned at page %d; stopping import.", page)
			break
		}

		// Import each performer.
		for _, performer := range performers {
			if err := performerStore.Upsert(&performer); err != nil {
				log.Printf("Failed to import performer %s: %v", performer.Name, err)
				result.Failed++
			} else {
				result.Imported++
				if s.enricher != nil {
					s.enricher.EnrichPerformer(ctx, &performer)
				}
			}

			// Send a progress update.
			if progress != nil && result.Total > 0 {
				progress(ProgressUpdate{
					EntityType: "performers",
					Current:    result.Imported,
					Total:      result.Total,
					Percent:    float64(result.Imported) / float64(result.Total) * 100,
					Message:    fmt.Sprintf("Imported %d/%d performers", result.Imported, result.Total),
				})
			}
		}

		// meta can be nil; guard before dereferencing LastPage.
		if meta != nil {
			log.Printf("Imported page %d/%d of performers (%d/%d total)", page, meta.LastPage, result.Imported, result.Total)
		}
		page++
	}
	return result, nil
}
// BulkImportAllStudios imports all studios from TPDB.
func (s *Service) BulkImportAllStudios(ctx context.Context) (*ImportResult, error) {
	return s.BulkImportAllStudiosWithProgress(ctx, nil)
}
// BulkImportAllStudiosWithProgress imports all studios from TPDB,
// reporting progress through the optional callback.
func (s *Service) BulkImportAllStudiosWithProgress(ctx context.Context, progress ProgressCallback) (*ImportResult, error) {
	result := &ImportResult{EntityType: "studios"}
	studioStore := db.NewStudioStore(s.db)

	page := 1
	for {
		studios, meta, err := s.scraper.ListStudios(ctx, page)
		if err != nil {
			return result, fmt.Errorf("failed to fetch page %d: %w", page, err)
		}

		// Record the reported total on the first page.
		if meta != nil && page == 1 {
			result.Total = meta.Total
			if meta.Total >= 10000 {
				log.Printf("TPDB studios total reports %d (cap?). Continuing to paginate until empty.", meta.Total)
			}
		}

		// Stop when no data is returned.
		if len(studios) == 0 {
			log.Printf("No studios returned at page %d; stopping import.", page)
			break
		}

		// Import each studio.
		for _, studio := range studios {
			if err := studioStore.Upsert(&studio); err != nil {
				log.Printf("Failed to import studio %s: %v", studio.Name, err)
				result.Failed++
			} else {
				result.Imported++
			}

			// Send a progress update.
			if progress != nil && result.Total > 0 {
				progress(ProgressUpdate{
					EntityType: "studios",
					Current:    result.Imported,
					Total:      result.Total,
					Percent:    float64(result.Imported) / float64(result.Total) * 100,
					Message:    fmt.Sprintf("Imported %d/%d studios", result.Imported, result.Total),
				})
			}
		}

		// meta can be nil; guard before dereferencing LastPage.
		if meta != nil {
			log.Printf("Imported page %d/%d of studios (%d/%d total)", page, meta.LastPage, result.Imported, result.Total)
		}
		page++
	}
	return result, nil
}
// BulkImportAllScenes imports all scenes from TPDB.
func (s *Service) BulkImportAllScenes(ctx context.Context) (*ImportResult, error) {
	return s.BulkImportAllScenesWithProgress(ctx, nil)
}
// BulkImportAllScenesWithProgress imports all scenes from TPDB,
// reporting progress through the optional callback.
func (s *Service) BulkImportAllScenesWithProgress(ctx context.Context, progress ProgressCallback) (*ImportResult, error) {
	result := &ImportResult{EntityType: "scenes"}
	performerStore := db.NewPerformerStore(s.db)
	studioStore := db.NewStudioStore(s.db)
	sceneStore := db.NewSceneStore(s.db)
	tagStore := db.NewTagStore(s.db)

	page := 1
	for {
		scenes, meta, err := s.scraper.ListScenes(ctx, page)
		if err != nil {
			return result, fmt.Errorf("failed to fetch page %d: %w", page, err)
		}

		// Record the reported total on the first page.
		if meta != nil && page == 1 {
			result.Total = meta.Total
			if meta.Total >= 10000 {
				log.Printf("TPDB scenes total reports %d (cap?). Continuing to paginate until empty.", meta.Total)
			}
		}

		// Stop when no data is returned.
		if len(scenes) == 0 {
			log.Printf("No scenes returned at page %d; stopping import.", page)
			break
		}

		// Import each scene with its performers and tags.
		for _, scene := range scenes {
			// First import the scene's performers.
			for _, performer := range scene.Performers {
				if err := performerStore.Upsert(&performer); err != nil {
					log.Printf("Failed to import performer %s for scene %s: %v", performer.Name, scene.Title, err)
				}
			}

			// Import the studio if present.
			if scene.Studio != nil {
				if err := studioStore.Upsert(scene.Studio); err != nil {
					log.Printf("Failed to import studio %s for scene %s: %v", scene.Studio.Name, scene.Title, err)
				}
				// Look up the studio ID.
				existingStudio, err := studioStore.GetBySourceID("tpdb", scene.Studio.SourceID)
				if err == nil && existingStudio != nil {
					scene.StudioID = &existingStudio.ID
				}
			}

			// Import tags.
			for _, tag := range scene.Tags {
				if err := tagStore.Upsert(&tag); err != nil {
					log.Printf("Failed to import tag %s for scene %s: %v", tag.Name, scene.Title, err)
				}
			}

			// Import the scene itself.
			if err := sceneStore.Upsert(&scene); err != nil {
				log.Printf("Failed to import scene %s: %v", scene.Title, err)
				result.Failed++
				continue
			}

			// Get the scene ID.
			existingScene, err := sceneStore.GetBySourceID("tpdb", scene.SourceID)
			if err != nil {
				log.Printf("Failed to lookup scene %s after import: %v", scene.Title, err)
				result.Failed++
				continue
			}

			// Link performers to the scene.
			for _, performer := range scene.Performers {
				existingPerformer, err := performerStore.GetBySourceID("tpdb", performer.SourceID)
				if err == nil && existingPerformer != nil {
					if err := sceneStore.AddPerformer(existingScene.ID, existingPerformer.ID); err != nil {
						log.Printf("Failed to link performer %s to scene %s: %v", performer.Name, scene.Title, err)
					}
				}
			}

			// Link tags to the scene.
			for _, tag := range scene.Tags {
				existingTag, err := tagStore.GetBySourceID("tpdb", tag.SourceID)
				if err == nil && existingTag != nil {
					if err := sceneStore.AddTag(existingScene.ID, existingTag.ID); err != nil {
						log.Printf("Failed to link tag %s to scene %s: %v", tag.Name, scene.Title, err)
					}
				}
			}
			result.Imported++

			// Send a progress update.
			if progress != nil && result.Total > 0 {
				progress(ProgressUpdate{
					EntityType: "scenes",
					Current:    result.Imported,
					Total:      result.Total,
					Percent:    float64(result.Imported) / float64(result.Total) * 100,
					Message:    fmt.Sprintf("Imported %d/%d scenes", result.Imported, result.Total),
				})
			}
		}

		// meta can be nil; guard before dereferencing LastPage.
		if meta != nil {
			log.Printf("Imported page %d/%d of scenes (%d/%d total)", page, meta.LastPage, result.Imported, result.Total)
		}
		page++
	}
	return result, nil
}
// BulkImportAll imports all data from TPDB: performers, studios, then scenes.
func (s *Service) BulkImportAll(ctx context.Context) ([]ImportResult, error) {
	var results []ImportResult
	log.Println("Starting bulk import of all TPDB data...")

	// Import performers first.
	log.Println("Importing performers...")
	performerResult, err := s.BulkImportAllPerformers(ctx)
	if err != nil {
		return results, fmt.Errorf("failed to import performers: %w", err)
	}
	results = append(results, *performerResult)

	// Import studios.
	log.Println("Importing studios...")
	studioResult, err := s.BulkImportAllStudios(ctx)
	if err != nil {
		return results, fmt.Errorf("failed to import studios: %w", err)
	}
	results = append(results, *studioResult)

	// Import scenes (with their performers and tags).
	log.Println("Importing scenes...")
	sceneResult, err := s.BulkImportAllScenes(ctx)
	if err != nil {
		return results, fmt.Errorf("failed to import scenes: %w", err)
	}
	results = append(results, *sceneResult)

	log.Println("Bulk import complete!")
	return results, nil
}
// ImportScene imports a single scene with all its related data.
func (s *Service) ImportScene(ctx context.Context, scene *model.Scene) error {
	performerStore := db.NewPerformerStore(s.db)
	studioStore := db.NewStudioStore(s.db)
	sceneStore := db.NewSceneStore(s.db)
	tagStore := db.NewTagStore(s.db)

	// Import performers first.
	for _, performer := range scene.Performers {
		if err := performerStore.Upsert(&performer); err != nil {
			return fmt.Errorf("failed to import performer %s: %w", performer.Name, err)
		}
	}

	// Import tags.
	for _, tag := range scene.Tags {
		if err := tagStore.Upsert(&tag); err != nil {
			return fmt.Errorf("failed to import tag %s: %w", tag.Name, err)
		}
	}

	// Import the studio if present.
	if scene.Studio != nil {
		if err := studioStore.Upsert(scene.Studio); err != nil {
			return fmt.Errorf("failed to import studio %s: %w", scene.Studio.Name, err)
		}
		// Look up the studio ID.
		existingStudio, err := studioStore.GetBySourceID("tpdb", scene.Studio.SourceID)
		if err == nil && existingStudio != nil {
			scene.StudioID = &existingStudio.ID
		}
	}

	// Import the scene itself.
	if err := sceneStore.Upsert(scene); err != nil {
		return fmt.Errorf("failed to import scene: %w", err)
	}

	// Get the scene ID.
	existingScene, err := sceneStore.GetBySourceID("tpdb", scene.SourceID)
	if err != nil {
		return fmt.Errorf("failed to lookup scene after import: %w", err)
	}

	// Link performers to the scene.
	for _, performer := range scene.Performers {
		existingPerformer, err := performerStore.GetBySourceID("tpdb", performer.SourceID)
		if err == nil && existingPerformer != nil {
			if err := sceneStore.AddPerformer(existingScene.ID, existingPerformer.ID); err != nil {
				return fmt.Errorf("failed to link performer %s: %w", performer.Name, err)
			}
		}
	}

	// Link tags to the scene.
	for _, tag := range scene.Tags {
		existingTag, err := tagStore.GetBySourceID("tpdb", tag.SourceID)
		if err == nil && existingTag != nil {
			if err := sceneStore.AddTag(existingScene.ID, existingTag.ID); err != nil {
				return fmt.Errorf("failed to link tag %s: %w", tag.Name, err)
			}
		}
	}
	return nil
}