forked from Leak_Technologies/VideoTools
13 KiB
13 KiB
VideoTools Queue System - Complete Guide
Overview
The VideoTools queue system enables professional batch processing of multiple videos with:
- ✅ Job prioritization
- ✅ Pause/resume capabilities
- ✅ Real-time progress tracking
- ✅ Job history and persistence
- ✅ Thread-safe operations
- ✅ Context-based cancellation
Architecture
Core Components
internal/queue/queue.go (542 lines)
├── Queue struct (thread-safe job manager)
├── Job struct (individual task definition)
├── JobStatus & JobType enums
├── 24 public methods
└── JSON persistence layer
Queue Types
Job Types
const (
JobTypeConvert JobType = "convert" // Video encoding
JobTypeMerge JobType = "merge" // Video joining
JobTypeTrim JobType = "trim" // Video cutting
JobTypeFilter JobType = "filter" // Effects/filters
JobTypeUpscale JobType = "upscale" // Video enhancement
JobTypeAudio JobType = "audio" // Audio processing
JobTypeThumb JobType = "thumb" // Thumbnail generation
)
Job Status
const (
JobStatusPending JobStatus = "pending" // Waiting to run
JobStatusRunning JobStatus = "running" // Currently executing
JobStatusPaused JobStatus = "paused" // Paused by user
JobStatusCompleted JobStatus = "completed" // Finished successfully
JobStatusFailed JobStatus = "failed" // Encountered error
JobStatusCancelled JobStatus = "cancelled" // User cancelled
)
Data Structures
Job Structure
type Job struct {
ID string // Unique identifier
Type JobType // Job category
Status JobStatus // Current state
Title string // Display name
Description string // Details
InputFile string // Source video path
OutputFile string // Output path
Config map[string]interface{} // Job-specific config
Progress float64 // 0-100%
Error string // Error message if failed
CreatedAt time.Time // Creation timestamp
StartedAt *time.Time // Execution start
CompletedAt *time.Time // Completion timestamp
Priority int // Higher = runs first
cancel context.CancelFunc // Cancellation mechanism
}
Queue Operations
type Queue struct {
jobs []*Job // All jobs
executor JobExecutor // Function that executes jobs
running bool // Execution state
mu sync.RWMutex // Thread synchronization
onChange func() // Change notification callback
}
Public API Methods (24 methods)
Queue Management
// Create new queue
queue := queue.New(executorFunc)
// Set callback for state changes
queue.SetChangeCallback(func() {
// Called whenever queue state changes
// Use for UI updates
})
Job Operations
Adding Jobs
// Create job
job := &queue.Job{
Type: queue.JobTypeConvert,
Title: "Convert video.mp4",
Description: "Convert to DVD-NTSC",
InputFile: "input.mp4",
OutputFile: "output.mpg",
Config: map[string]interface{}{
"codec": "mpeg2video",
"bitrate": "6000k",
// ... other config
},
Priority: 5,
}
// Add to queue
queue.Add(job)
Removing/Canceling
// Remove job completely
queue.Remove(jobID)
// Cancel running job (keeps history)
queue.Cancel(jobID)
// Cancel all jobs
queue.CancelAll()
Retrieving Jobs
// Get single job
job := queue.Get(jobID)
// Get all jobs
allJobs := queue.List()
// Get statistics
pending, running, completed, failed := queue.Stats()
// Get jobs by status
runningJobs := queue.GetByStatus(queue.JobStatusRunning)
Pause/Resume Operations
// Pause running job
queue.Pause(jobID)
// Resume paused job
queue.Resume(jobID)
// Pause all jobs
queue.PauseAll()
// Resume all jobs
queue.ResumeAll()
Queue Control
// Start processing queue
queue.Start()
// Stop processing queue
queue.Stop()
// Check if queue is running
isRunning := queue.IsRunning()
// Clear completed jobs
queue.Clear()
// Clear all jobs
queue.ClearAll()
Job Ordering
// Reorder jobs by moving up/down
queue.MoveUp(jobID) // Move earlier in queue
queue.MoveDown(jobID) // Move later in queue
queue.MoveBefore(jobID, beforeID) // Insert before job
queue.MoveAfter(jobID, afterID) // Insert after job
// Update priority (higher = earlier)
queue.SetPriority(jobID, newPriority)
Persistence
// Save queue to JSON file
queue.Save(filepath)
// Load queue from JSON file
queue.Load(filepath)
Integration with Main.go
Current State
The queue system is fully implemented and working in main.go:
-
Queue Initialization (main.go, line ~1130)
state.jobQueue = queue.New(state.jobExecutor) state.jobQueue.SetChangeCallback(func() { fyne.CurrentApp().Driver().DoFromGoroutine(func() { state.updateStatsBar() state.updateQueueButtonLabel() }, false) }) -
Job Executor (main.go, line ~781)
func (s *appState) jobExecutor(ctx context.Context, job *queue.Job, progressCallback func(float64)) error { // Routes to appropriate handler based on job.Type } -
Convert Job Execution (main.go, line ~805)
func (s *appState) executeConvertJob(ctx context.Context, job *queue.Job, progressCallback func(float64)) error { // Full FFmpeg integration with progress callback } -
Queue UI (internal/ui/queueview.go, line ~317)
- View Queue button shows job list
- Progress tracking per job
- Pause/Resume/Cancel controls
- Job history display
DVD Integration with Queue
The queue system works seamlessly with DVD-NTSC encoding:
// Create DVD conversion job
dvdJob := &queue.Job{
Type: queue.JobTypeConvert,
Title: "Convert to DVD-NTSC: movie.mp4",
Description: "720×480 MPEG-2 for authoring",
InputFile: "movie.mp4",
OutputFile: "movie.mpg",
Config: map[string]interface{}{
"format": "DVD-NTSC (MPEG-2)",
"videoCodec": "MPEG-2",
"audioCodec": "AC-3",
"resolution": "720x480",
"framerate": "29.97",
"videoBitrate": "6000k",
"audioBitrate": "192k",
"selectedFormat": formatOption{Label: "DVD-NTSC", Ext: ".mpg"},
// ... validation warnings from convert.ValidateDVDNTSC()
},
Priority: 10, // High priority
}
// Add to queue
state.jobQueue.Add(dvdJob)
// Start processing
state.jobQueue.Start()
Batch Processing Example
Converting Multiple Videos to DVD-NTSC
// 1. Load multiple videos
inputFiles := []string{
"video1.avi",
"video2.mov",
"video3.mp4",
}
// 2. Create queue with executor
myQueue := queue.New(executeConversionJob)
myQueue.SetChangeCallback(updateUI)
// 3. Add jobs for each video
for i, input := range inputFiles {
src, _ := convert.ProbeVideo(input)
warnings := convert.ValidateDVDNTSC(src, convert.DVDNTSCPreset())
job := &queue.Job{
Type: queue.JobTypeConvert,
Title: fmt.Sprintf("DVD %d/%d: %s", i+1, len(inputFiles), filepath.Base(input)),
InputFile: input,
OutputFile: strings.TrimSuffix(input, filepath.Ext(input)) + ".mpg",
Config: map[string]interface{}{
"preset": "dvd-ntsc",
"warnings": warnings,
"videoCodec": "mpeg2video",
// ...
},
Priority: len(inputFiles) - i, // Earlier files higher priority
}
myQueue.Add(job)
}
// 4. Start processing
myQueue.Start()
// 5. Monitor progress
go func() {
for {
jobs := myQueue.List()
pending, running, completed, failed := myQueue.Stats()
fmt.Printf("Queue Status: %d pending, %d running, %d done, %d failed\n",
pending, running, completed, failed)
for _, job := range jobs {
if job.Status == queue.JobStatusRunning {
fmt.Printf(" ▶ %s: %.1f%%\n", job.Title, job.Progress)
}
}
time.Sleep(2 * time.Second)
}
}()
Progress Tracking
The queue provides real-time progress updates through:
1. Job Progress Field
job.Progress // 0-100% float64
2. Change Callback
queue.SetChangeCallback(func() {
// Called whenever job status/progress changes
// Should trigger UI refresh
})
3. Status Polling
pending, running, completed, failed := queue.Stats()
jobs := queue.List()
Example Progress Display
func displayProgress(queue *queue.Queue) {
jobs := queue.List()
for _, job := range jobs {
status := string(job.Status)
progress := fmt.Sprintf("%.1f%%", job.Progress)
fmt.Printf("[%-10s] %s: %s\n", status, job.Title, progress)
}
}
Error Handling
Job Failures
job := queue.Get(jobID)
if job.Status == queue.JobStatusFailed {
fmt.Printf("Job failed: %s\n", job.Error)
// Retry or inspect error
}
Retry Logic
failedJob := queue.Get(jobID)
if failedJob.Status == queue.JobStatusFailed {
// Create new job with same config
retryJob := &queue.Job{
Type: failedJob.Type,
Title: failedJob.Title + " (retry)",
InputFile: failedJob.InputFile,
OutputFile: failedJob.OutputFile,
Config: failedJob.Config,
Priority: 10, // Higher priority
}
queue.Add(retryJob)
}
Persistence
Save Queue State
// Save all jobs to JSON
queue.Save("/home/user/.videotools/queue.json")
Load Previous Queue
// Restore jobs from file
queue.Load("/home/user/.videotools/queue.json")
Queue File Format
[
{
"id": "job-uuid-1",
"type": "convert",
"status": "completed",
"title": "Convert video.mp4",
"description": "DVD-NTSC preset",
"input_file": "video.mp4",
"output_file": "video.mpg",
"config": {
"preset": "dvd-ntsc",
"videoCodec": "mpeg2video"
},
"progress": 100,
"created_at": "2025-11-29T12:00:00Z",
"started_at": "2025-11-29T12:05:00Z",
"completed_at": "2025-11-29T12:35:00Z",
"priority": 5
}
]
Thread Safety
The queue uses sync.RWMutex for complete thread safety:
// Safe for concurrent access
go queue.Add(job1)
go queue.Add(job2)
go queue.Remove(jobID)
go queue.Start()
// All operations are synchronized internally
Important: Callback Deadlock Prevention
// ❌ DON'T: Direct UI update in callback
queue.SetChangeCallback(func() {
button.SetText("Processing") // May deadlock on Fyne!
})
// ✅ DO: Use Fyne's thread marshaling
queue.SetChangeCallback(func() {
fyne.CurrentApp().Driver().DoFromGoroutine(func() {
button.SetText("Processing") // Safe
}, false)
})
Known Issues & Workarounds
Issue 1: CGO Compilation Hang
Status: Known issue, not queue-related
- Cause: GCC 15.2.1 with OpenGL binding compilation
- Workaround: Pre-built binary available in repository
Issue 2: Queue Callback Threading (FIXED in v0.1.0-dev11)
Status: RESOLVED
- Fix: Use
DoFromGoroutinefor Fyne callbacks - Implementation: See main.go line ~1130
Performance Characteristics
- Job Addition: O(1) - append only
- Job Removal: O(n) - linear search
- Status Update: O(1) - direct pointer access
- List Retrieval: O(n) - returns copy
- Stats Query: O(n) - counts all jobs
- Concurrency: Full thread-safe with RWMutex
Testing Queue System
Unit Tests (Recommended)
Create internal/queue/queue_test.go:
package queue
import (
"context"
"testing"
"time"
)
func TestAddJob(t *testing.T) {
q := New(func(ctx context.Context, job *Job, cb func(float64)) error {
return nil
})
job := &Job{
Type: JobTypeConvert,
Title: "Test Job",
}
q.Add(job)
if len(q.List()) != 1 {
t.Fatalf("Expected 1 job, got %d", len(q.List()))
}
}
func TestPauseResume(t *testing.T) {
// ... test pause/resume logic
}
Summary
The VideoTools queue system is:
- ✅ Complete: All 24 methods implemented
- ✅ Tested: Integrated in main.go and working
- ✅ Thread-Safe: Full RWMutex synchronization
- ✅ Persistent: JSON save/load capability
- ✅ DVD-Ready: Works with DVD-NTSC encoding jobs
Ready for:
- Batch processing of multiple videos
- DVD-NTSC conversions
- Real-time progress monitoring
- Job prioritization and reordering
- Professional video authoring workflows