MAJOR FEATURES ADDED: ====================== 🤖 ML Analysis System: - Comprehensive scene image analysis with per-scene predictions - Enhanced database schema with scene_ml_analysis table - Advanced detection for clothing colors, body types, age categories, positions, settings - Support for multiple prediction types (clothing, body, sexual acts, etc.) - Confidence scoring and ML source tracking 🧠 Enhanced Search Capabilities: - Natural language parser for complex queries (e.g., "Teenage Riley Reid creampie older man pink thong black heels red couch") - Category-based search with confidence-weighted results - ML-enhanced tag matching with automatic fallback to traditional search - Support for "Money Shot: Creampie" vs "Cum in Open Mouth" detection 🗄️ Advanced Database Schema: - Male detection: circumcised field (0/1) - Pubic hair types: natural, shaved, trimmed, landing strip, bushy, hairy - Scene ML analysis table for storing per-scene predictions - Comprehensive seed tags for all detection categories 🏗️ Dual Scraper Architecture: - Flexible import service supporting both TPDB and Adult Empire scrapers - Bulk scraper implementation for Adult Empire using multiple search strategies - Progress tracking with Server-Sent Events (SSE) for real-time updates - Graceful fallback from Adult Empire to TPDB when needed 📝 Enhanced Import System: - Individual bulk imports (performers, studios, scenes, movies) - Combined "import all" operation - Real-time progress tracking with job management - Error handling and retry mechanisms - Support for multiple import sources and strategies 🔧 Technical Improvements: - Modular component architecture for maintainability - Enhanced error handling and logging - Performance-optimized database queries with proper indexing - Configurable import limits and rate limiting - Comprehensive testing framework This commit establishes Goondex as a comprehensive adult content discovery platform with ML-powered analysis and advanced search capabilities, 
ready for integration with computer vision models for automated tagging and scene analysis.
525 lines
15 KiB
Go
package import_service
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
|
|
"git.leaktechnologies.dev/stu/Goondex/internal/db"
|
|
"git.leaktechnologies.dev/stu/Goondex/internal/model"
|
|
"git.leaktechnologies.dev/stu/Goondex/internal/scraper"
|
|
"git.leaktechnologies.dev/stu/Goondex/internal/scraper/tpdb"
|
|
)
|
|
|
|
// ProgressUpdate represents a progress update during import.
type ProgressUpdate struct {
	// EntityType is the kind of entity being imported
	// ("performers", "studios", or "scenes").
	EntityType string `json:"entity_type"`
	// Current is the cumulative number of entities imported so far.
	Current int `json:"current"`
	// Total is the total number of entities expected for this run.
	Total int `json:"total"`
	// Percent is Current/Total expressed on a 0-100 scale.
	Percent float64 `json:"percent"`
	// Message is a human-readable description of the current progress.
	Message string `json:"message"`
}
// ProgressCallback is called when progress is made during a bulk import.
// Callbacks receive cumulative counts; they are invoked synchronously from
// the import loop.
type ProgressCallback func(update ProgressUpdate)
// Service handles bulk import operations.
// Exactly one of scraper (TPDB) or bulkScraper (e.g. Adult Empire) is
// typically set, depending on which constructor was used; the *Flexible
// methods fall back to TPDB when bulkScraper is nil.
type Service struct {
	db          *db.DB              // target database
	scraper     *tpdb.Scraper       // TPDB scraper; nil when constructed via NewFlexibleService
	bulkScraper scraper.BulkScraper // bulk scraper; nil when constructed via NewService
	enricher    *Enricher           // optional enrichment, set via WithEnricher
}
// NewService creates a new import service
|
|
func NewService(database *db.DB, scraper *tpdb.Scraper) *Service {
|
|
return &Service{
|
|
db: database,
|
|
scraper: scraper,
|
|
bulkScraper: nil,
|
|
enricher: nil,
|
|
}
|
|
}
|
|
|
|
// NewFlexibleService creates a new import service with Adult Empire scraper
|
|
func NewFlexibleService(database *db.DB, bulkScraper scraper.BulkScraper) *Service {
|
|
return &Service{
|
|
db: database,
|
|
scraper: nil,
|
|
bulkScraper: bulkScraper,
|
|
enricher: nil,
|
|
}
|
|
}
|
|
|
|
// WithEnricher configures enrichment (optional).
|
|
func (s *Service) WithEnricher(enricher *Enricher) {
|
|
s.enricher = enricher
|
|
}
|
|
|
|
// BulkImportAllPerformersFlexible imports all performers using Adult Empire scraper
|
|
func (s *Service) BulkImportAllPerformersFlexible(ctx context.Context) (*ImportResult, error) {
|
|
if s.bulkScraper == nil {
|
|
return s.BulkImportAllPerformers(ctx)
|
|
}
|
|
|
|
result := &ImportResult{
|
|
EntityType: "performers",
|
|
}
|
|
|
|
performerStore := db.NewPerformerStore(s.db)
|
|
|
|
// Get all performers from scraper
|
|
searchResults, err := s.bulkScraper.SearchAllPerformers(ctx)
|
|
if err != nil {
|
|
return result, fmt.Errorf("failed to fetch performers: %w", err)
|
|
}
|
|
|
|
result.Total = len(searchResults)
|
|
log.Printf("Found %d performer search results to import", len(searchResults))
|
|
|
|
// Import each performer
|
|
imported := 0
|
|
failed := 0
|
|
|
|
for _, searchResult := range searchResults {
|
|
// Convert to model
|
|
performer := s.bulkScraper.ConvertPerformerToModel(&searchResult)
|
|
if performer == nil {
|
|
failed++
|
|
continue
|
|
}
|
|
|
|
// Set source metadata
|
|
performer.Source = "adultempire"
|
|
performer.SourceID = searchResult.URL
|
|
|
|
// Try to create performer
|
|
if err := performerStore.Create(performer); err != nil {
|
|
log.Printf("Failed to import performer %s: %v", performer.Name, err)
|
|
failed++
|
|
} else {
|
|
imported++
|
|
log.Printf("Imported performer: %s", performer.Name)
|
|
}
|
|
}
|
|
|
|
result.Imported = imported
|
|
result.Failed = failed
|
|
|
|
log.Printf("Performers import complete: %d imported, %d failed", imported, failed)
|
|
return result, nil
|
|
}
|
|
|
|
// BulkImportAllScenesFlexible imports all scenes using Adult Empire scraper
|
|
func (s *Service) BulkImportAllScenesFlexible(ctx context.Context) (*ImportResult, error) {
|
|
if s.bulkScraper == nil {
|
|
return s.BulkImportAllScenes(ctx)
|
|
}
|
|
|
|
result := &ImportResult{
|
|
EntityType: "scenes",
|
|
}
|
|
|
|
sceneStore := db.NewSceneStore(s.db)
|
|
|
|
// Get all scenes from scraper
|
|
searchResults, err := s.bulkScraper.SearchAllScenes(ctx)
|
|
if err != nil {
|
|
return result, fmt.Errorf("failed to fetch scenes: %w", err)
|
|
}
|
|
|
|
result.Total = len(searchResults)
|
|
log.Printf("Found %d scene search results to import", len(searchResults))
|
|
|
|
// Import each scene
|
|
imported := 0
|
|
failed := 0
|
|
|
|
for _, searchResult := range searchResults {
|
|
// Convert to model
|
|
scene := s.bulkScraper.ConvertSceneToModel(&searchResult)
|
|
if scene == nil {
|
|
failed++
|
|
continue
|
|
}
|
|
|
|
// Set source metadata
|
|
scene.Source = "adultempire"
|
|
scene.SourceID = searchResult.URL
|
|
|
|
// Try to create scene
|
|
if err := sceneStore.Create(scene); err != nil {
|
|
log.Printf("Failed to import scene %s: %v", scene.Title, err)
|
|
failed++
|
|
} else {
|
|
imported++
|
|
log.Printf("Imported scene: %s", scene.Title)
|
|
}
|
|
}
|
|
|
|
result.Imported = imported
|
|
result.Failed = failed
|
|
|
|
log.Printf("Scenes import complete: %d imported, %d failed", imported, failed)
|
|
return result, nil
|
|
}
|
|
|
|
// ImportResult contains the results of an import operation.
type ImportResult struct {
	// EntityType is the kind of entity imported
	// ("performers", "studios", or "scenes").
	EntityType string
	// Imported is the number of entities successfully stored.
	Imported int
	// Failed is the number of entities that failed to convert or persist.
	Failed int
	// Total is the total reported by the source. For TPDB this may be a
	// capped value (the importers log when it reports >= 10000).
	Total int
}
// BulkImportAllPerformers imports all performers from TPDB
|
|
func (s *Service) BulkImportAllPerformers(ctx context.Context) (*ImportResult, error) {
|
|
return s.BulkImportAllPerformersWithProgress(ctx, nil)
|
|
}
|
|
|
|
// BulkImportAllPerformersWithProgress imports all performers from TPDB with progress updates
|
|
func (s *Service) BulkImportAllPerformersWithProgress(ctx context.Context, progress ProgressCallback) (*ImportResult, error) {
|
|
result := &ImportResult{
|
|
EntityType: "performers",
|
|
}
|
|
|
|
performerStore := db.NewPerformerStore(s.db)
|
|
|
|
page := 1
|
|
for {
|
|
performers, meta, err := s.scraper.ListPerformers(ctx, page)
|
|
if err != nil {
|
|
return result, fmt.Errorf("failed to fetch page %d: %w", page, err)
|
|
}
|
|
|
|
// Update total on first page
|
|
if meta != nil && page == 1 {
|
|
result.Total = meta.Total
|
|
if meta.Total >= 10000 {
|
|
log.Printf("TPDB performers total reports %d (cap?). Continuing to paginate until empty.", meta.Total)
|
|
}
|
|
}
|
|
|
|
// Stop when no data is returned
|
|
if len(performers) == 0 {
|
|
log.Printf("No performers returned at page %d; stopping import.", page)
|
|
break
|
|
}
|
|
|
|
// Import each performer
|
|
for _, performer := range performers {
|
|
if err := performerStore.Upsert(&performer); err != nil {
|
|
log.Printf("Failed to import performer %s: %v", performer.Name, err)
|
|
result.Failed++
|
|
} else {
|
|
result.Imported++
|
|
if s.enricher != nil {
|
|
s.enricher.EnrichPerformer(ctx, &performer)
|
|
}
|
|
}
|
|
|
|
// Send progress update
|
|
if progress != nil && result.Total > 0 {
|
|
progress(ProgressUpdate{
|
|
EntityType: "performers",
|
|
Current: result.Imported,
|
|
Total: result.Total,
|
|
Percent: float64(result.Imported) / float64(result.Total) * 100,
|
|
Message: fmt.Sprintf("Imported %d/%d performers", result.Imported, result.Total),
|
|
})
|
|
}
|
|
}
|
|
|
|
log.Printf("Imported page %d/%d of performers (%d/%d total)", page, meta.LastPage, result.Imported, result.Total)
|
|
|
|
page++
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// BulkImportAllStudios imports all studios from TPDB
|
|
func (s *Service) BulkImportAllStudios(ctx context.Context) (*ImportResult, error) {
|
|
return s.BulkImportAllStudiosWithProgress(ctx, nil)
|
|
}
|
|
|
|
// BulkImportAllStudiosWithProgress imports all studios from TPDB with progress updates
|
|
func (s *Service) BulkImportAllStudiosWithProgress(ctx context.Context, progress ProgressCallback) (*ImportResult, error) {
|
|
result := &ImportResult{
|
|
EntityType: "studios",
|
|
}
|
|
|
|
studioStore := db.NewStudioStore(s.db)
|
|
|
|
page := 1
|
|
for {
|
|
studios, meta, err := s.scraper.ListStudios(ctx, page)
|
|
if err != nil {
|
|
return result, fmt.Errorf("failed to fetch page %d: %w", page, err)
|
|
}
|
|
|
|
// Update total on first page
|
|
if meta != nil && page == 1 {
|
|
result.Total = meta.Total
|
|
if meta.Total >= 10000 {
|
|
log.Printf("TPDB studios total reports %d (cap?). Continuing to paginate until empty.", meta.Total)
|
|
}
|
|
}
|
|
|
|
if len(studios) == 0 {
|
|
log.Printf("No studios returned at page %d; stopping import.", page)
|
|
break
|
|
}
|
|
|
|
// Import each studio
|
|
for _, studio := range studios {
|
|
if err := studioStore.Upsert(&studio); err != nil {
|
|
log.Printf("Failed to import studio %s: %v", studio.Name, err)
|
|
result.Failed++
|
|
} else {
|
|
result.Imported++
|
|
}
|
|
|
|
// Send progress update
|
|
if progress != nil && result.Total > 0 {
|
|
progress(ProgressUpdate{
|
|
EntityType: "studios",
|
|
Current: result.Imported,
|
|
Total: result.Total,
|
|
Percent: float64(result.Imported) / float64(result.Total) * 100,
|
|
Message: fmt.Sprintf("Imported %d/%d studios", result.Imported, result.Total),
|
|
})
|
|
}
|
|
}
|
|
|
|
log.Printf("Imported page %d/%d of studios (%d/%d total)", page, meta.LastPage, result.Imported, result.Total)
|
|
|
|
page++
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// BulkImportAllScenes imports all scenes from TPDB
|
|
func (s *Service) BulkImportAllScenes(ctx context.Context) (*ImportResult, error) {
|
|
return s.BulkImportAllScenesWithProgress(ctx, nil)
|
|
}
|
|
|
|
// BulkImportAllScenesWithProgress imports all scenes from TPDB with progress updates
|
|
func (s *Service) BulkImportAllScenesWithProgress(ctx context.Context, progress ProgressCallback) (*ImportResult, error) {
|
|
result := &ImportResult{
|
|
EntityType: "scenes",
|
|
}
|
|
|
|
performerStore := db.NewPerformerStore(s.db)
|
|
studioStore := db.NewStudioStore(s.db)
|
|
sceneStore := db.NewSceneStore(s.db)
|
|
tagStore := db.NewTagStore(s.db)
|
|
|
|
page := 1
|
|
for {
|
|
scenes, meta, err := s.scraper.ListScenes(ctx, page)
|
|
if err != nil {
|
|
return result, fmt.Errorf("failed to fetch page %d: %w", page, err)
|
|
}
|
|
|
|
// Update total on first page
|
|
if meta != nil && page == 1 {
|
|
result.Total = meta.Total
|
|
if meta.Total >= 10000 {
|
|
log.Printf("TPDB scenes total reports %d (cap?). Continuing to paginate until empty.", meta.Total)
|
|
}
|
|
}
|
|
|
|
if len(scenes) == 0 {
|
|
log.Printf("No scenes returned at page %d; stopping import.", page)
|
|
break
|
|
}
|
|
|
|
// Import each scene with its performers and tags
|
|
for _, scene := range scenes {
|
|
// First import performers from the scene
|
|
for _, performer := range scene.Performers {
|
|
if err := performerStore.Upsert(&performer); err != nil {
|
|
log.Printf("Failed to import performer %s for scene %s: %v", performer.Name, scene.Title, err)
|
|
}
|
|
}
|
|
|
|
// Import studio if present
|
|
if scene.Studio != nil {
|
|
if err := studioStore.Upsert(scene.Studio); err != nil {
|
|
log.Printf("Failed to import studio %s for scene %s: %v", scene.Studio.Name, scene.Title, err)
|
|
}
|
|
// Look up the studio ID
|
|
existingStudio, err := studioStore.GetBySourceID("tpdb", scene.Studio.SourceID)
|
|
if err == nil && existingStudio != nil {
|
|
scene.StudioID = &existingStudio.ID
|
|
}
|
|
}
|
|
|
|
// Import tags
|
|
for _, tag := range scene.Tags {
|
|
if err := tagStore.Upsert(&tag); err != nil {
|
|
log.Printf("Failed to import tag %s for scene %s: %v", tag.Name, scene.Title, err)
|
|
}
|
|
}
|
|
|
|
// Import the scene
|
|
if err := sceneStore.Upsert(&scene); err != nil {
|
|
log.Printf("Failed to import scene %s: %v", scene.Title, err)
|
|
result.Failed++
|
|
continue
|
|
}
|
|
|
|
// Get the scene ID
|
|
existingScene, err := sceneStore.GetBySourceID("tpdb", scene.SourceID)
|
|
if err != nil {
|
|
log.Printf("Failed to lookup scene %s after import: %v", scene.Title, err)
|
|
result.Failed++
|
|
continue
|
|
}
|
|
|
|
// Link performers to scene
|
|
for _, performer := range scene.Performers {
|
|
existingPerformer, err := performerStore.GetBySourceID("tpdb", performer.SourceID)
|
|
if err == nil && existingPerformer != nil {
|
|
if err := sceneStore.AddPerformer(existingScene.ID, existingPerformer.ID); err != nil {
|
|
log.Printf("Failed to link performer %s to scene %s: %v", performer.Name, scene.Title, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Link tags to scene
|
|
for _, tag := range scene.Tags {
|
|
existingTag, err := tagStore.GetBySourceID("tpdb", tag.SourceID)
|
|
if err == nil && existingTag != nil {
|
|
if err := sceneStore.AddTag(existingScene.ID, existingTag.ID); err != nil {
|
|
log.Printf("Failed to link tag %s to scene %s: %v", tag.Name, scene.Title, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
result.Imported++
|
|
|
|
// Send progress update
|
|
if progress != nil && result.Total > 0 {
|
|
progress(ProgressUpdate{
|
|
EntityType: "scenes",
|
|
Current: result.Imported,
|
|
Total: result.Total,
|
|
Percent: float64(result.Imported) / float64(result.Total) * 100,
|
|
Message: fmt.Sprintf("Imported %d/%d scenes", result.Imported, result.Total),
|
|
})
|
|
}
|
|
}
|
|
|
|
log.Printf("Imported page %d/%d of scenes (%d/%d total)", page, meta.LastPage, result.Imported, result.Total)
|
|
|
|
page++
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// BulkImportAll imports all data from TPDB (performers, studios, scenes)
|
|
func (s *Service) BulkImportAll(ctx context.Context) ([]ImportResult, error) {
|
|
var results []ImportResult
|
|
|
|
log.Println("Starting bulk import of all TPDB data...")
|
|
|
|
// Import performers first
|
|
log.Println("Importing performers...")
|
|
performerResult, err := s.BulkImportAllPerformers(ctx)
|
|
if err != nil {
|
|
return results, fmt.Errorf("failed to import performers: %w", err)
|
|
}
|
|
results = append(results, *performerResult)
|
|
|
|
// Import studios
|
|
log.Println("Importing studios...")
|
|
studioResult, err := s.BulkImportAllStudios(ctx)
|
|
if err != nil {
|
|
return results, fmt.Errorf("failed to import studios: %w", err)
|
|
}
|
|
results = append(results, *studioResult)
|
|
|
|
// Import scenes (with their performers and tags)
|
|
log.Println("Importing scenes...")
|
|
sceneResult, err := s.BulkImportAllScenes(ctx)
|
|
if err != nil {
|
|
return results, fmt.Errorf("failed to import scenes: %w", err)
|
|
}
|
|
results = append(results, *sceneResult)
|
|
|
|
log.Println("Bulk import complete!")
|
|
|
|
return results, nil
|
|
}
|
|
|
|
// ImportScene imports a single scene with all its related data: performers,
// tags and studio are upserted first, then the scene row, then the
// performer/tag link rows. Unlike the bulk importers, any upsert failure here
// aborts immediately with a wrapped error.
//
// NOTE(review): ctx is accepted but not used by any call in this body —
// presumably kept for interface symmetry; confirm before relying on
// cancellation here.
func (s *Service) ImportScene(ctx context.Context, scene *model.Scene) error {
	performerStore := db.NewPerformerStore(s.db)
	studioStore := db.NewStudioStore(s.db)
	sceneStore := db.NewSceneStore(s.db)
	tagStore := db.NewTagStore(s.db)

	// Import performers first so link rows can resolve them later.
	for _, performer := range scene.Performers {
		if err := performerStore.Upsert(&performer); err != nil {
			return fmt.Errorf("failed to import performer %s: %w", performer.Name, err)
		}
	}

	// Import tags.
	for _, tag := range scene.Tags {
		if err := tagStore.Upsert(&tag); err != nil {
			return fmt.Errorf("failed to import tag %s: %w", tag.Name, err)
		}
	}

	// Import studio if present, then look up its database ID so the scene
	// row can reference it. A failed lookup silently leaves StudioID unset.
	if scene.Studio != nil {
		if err := studioStore.Upsert(scene.Studio); err != nil {
			return fmt.Errorf("failed to import studio %s: %w", scene.Studio.Name, err)
		}
		// Look up the studio ID.
		existingStudio, err := studioStore.GetBySourceID("tpdb", scene.Studio.SourceID)
		if err == nil && existingStudio != nil {
			scene.StudioID = &existingStudio.ID
		}
	}

	// Import the scene row itself.
	if err := sceneStore.Upsert(scene); err != nil {
		return fmt.Errorf("failed to import scene: %w", err)
	}

	// Re-read the scene to learn its database ID for the link rows.
	existingScene, err := sceneStore.GetBySourceID("tpdb", scene.SourceID)
	if err != nil {
		return fmt.Errorf("failed to lookup scene after import: %w", err)
	}

	// Link performers to scene. A failed lookup silently skips the link;
	// only AddPerformer errors propagate.
	for _, performer := range scene.Performers {
		existingPerformer, err := performerStore.GetBySourceID("tpdb", performer.SourceID)
		if err == nil && existingPerformer != nil {
			if err := sceneStore.AddPerformer(existingScene.ID, existingPerformer.ID); err != nil {
				return fmt.Errorf("failed to link performer %s: %w", performer.Name, err)
			}
		}
	}

	// Link tags to scene, same policy as performers.
	for _, tag := range scene.Tags {
		existingTag, err := tagStore.GetBySourceID("tpdb", tag.SourceID)
		if err == nil && existingTag != nil {
			if err := sceneStore.AddTag(existingScene.ID, existingTag.ID); err != nil {
				return fmt.Errorf("failed to link tag %s: %w", tag.Name, err)
			}
		}
	}

	return nil
}