3 QUEUE_SYSTEM_GUIDE
Gemini CLI edited this page 2026-03-13 11:05:17 -04:00
This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

VideoTools Queue System - Complete Guide

Overview

The VideoTools queue system enables professional batch processing of multiple videos with:

  • Job prioritization
  • Pause/resume capabilities
  • Real-time progress tracking
  • Job history and persistence
  • Thread-safe operations
  • Context-based cancellation

Architecture

Core Components

internal/queue/queue.go (542 lines)
├── Queue struct (thread-safe job manager)
├── Job struct (individual task definition)
├── JobStatus & JobType enums
├── 24 public methods
└── JSON persistence layer

Queue Types

Job Types

const (
    JobTypeConvert JobType = "convert"  // Video encoding
    JobTypeMerge   JobType = "merge"    // Video joining
    JobTypeTrim    JobType = "trim"     // Video cutting
    JobTypeFilter  JobType = "filter"   // Effects/filters
    JobTypeUpscale JobType = "upscale"  // Video enhancement
    JobTypeAudio   JobType = "audio"    // Audio processing
    JobTypeThumbnail JobType = "thumbnail" // Thumbnail generation
)

Job Status

const (
    JobStatusPending   JobStatus = "pending"    // Waiting to run
    JobStatusRunning   JobStatus = "running"    // Currently executing
    JobStatusPaused    JobStatus = "paused"     // Paused by user
    JobStatusCompleted JobStatus = "completed"  // Finished successfully
    JobStatusFailed    JobStatus = "failed"     // Encountered error
    JobStatusCancelled JobStatus = "cancelled"  // User cancelled
)

Data Structures

Job Structure

type Job struct {
    ID          string                 // Unique identifier
    Type        JobType                // Job category
    Status      JobStatus              // Current state
    Title       string                 // Display name
    Description string                 // Details
    InputFile   string                 // Source video path
    OutputFile  string                 // Output path
    Config      map[string]interface{} // Job-specific config
    Progress    float64                // 0-100%
    Error       string                 // Error message if failed
    CreatedAt   time.Time              // Creation timestamp
    StartedAt   *time.Time             // Execution start
    CompletedAt *time.Time             // Completion timestamp
    Priority    int                    // Higher = runs first
    cancel      context.CancelFunc     // Cancellation mechanism
}

Queue Operations

type Queue struct {
    jobs     []*Job              // All jobs
    executor JobExecutor         // Function that executes jobs
    running  bool                // Execution state
    mu       sync.RWMutex        // Thread synchronization
    onChange func()              // Change notification callback
}

Public API Methods (24 methods)

Queue Management

// Create new queue
queue := queue.New(executorFunc)

// Set callback for state changes
queue.SetChangeCallback(func() {
    // Called whenever queue state changes
    // Use for UI updates
})

Job Operations

Adding Jobs

// Create job
job := &queue.Job{
    Type:        queue.JobTypeConvert,
    Title:       "Convert video.mp4",
    Description: "Convert to DVD-NTSC",
    InputFile:   "input.mp4",
    OutputFile:  "output.mpg",
    Config:      map[string]interface{}{
        "codec":       "mpeg2video",
        "bitrate":     "6000k",
        // ... other config
    },
    Priority:    5,
}

// Add to queue
queue.Add(job)

Removing/Canceling

// Remove job completely
queue.Remove(jobID)

// Cancel running job (keeps history)
queue.Cancel(jobID)

// Cancel all jobs
queue.CancelAll()

Retrieving Jobs

// Get single job
job := queue.Get(jobID)

// Get all jobs
allJobs := queue.List()

// Get statistics
pending, running, completed, failed := queue.Stats()

// Get jobs by status
runningJobs := queue.GetByStatus(queue.JobStatusRunning)

Pause/Resume Operations

// Pause running job
queue.Pause(jobID)

// Resume paused job
queue.Resume(jobID)

// Pause all jobs
queue.PauseAll()

// Resume all jobs
queue.ResumeAll()

Queue Control

// Start processing queue
queue.Start()

// Stop processing queue
queue.Stop()

// Check if queue is running
isRunning := queue.IsRunning()

// Clear completed jobs
queue.Clear()

// Clear all jobs
queue.ClearAll()

Job Ordering

// Reorder jobs by moving up/down
queue.MoveUp(jobID)       // Move earlier in queue
queue.MoveDown(jobID)     // Move later in queue
queue.MoveBefore(jobID, beforeID)  // Insert before job
queue.MoveAfter(jobID, afterID)    // Insert after job

// Update priority (higher = earlier)
queue.SetPriority(jobID, newPriority)

Persistence

// Save queue to JSON file
queue.Save(filepath)

// Load queue from JSON file
queue.Load(filepath)

Integration with Main.go

Current State

The queue system is fully implemented and working in main.go:

  1. Queue Initialization (main.go, line ~1130)

    state.jobQueue = queue.New(state.jobExecutor)
    state.jobQueue.SetChangeCallback(func() {
        fyne.CurrentApp().Driver().DoFromGoroutine(func() {
            state.updateStatsBar()
            state.updateQueueButtonLabel()
        }, false)
    })
    
  2. Job Executor (main.go, line ~781)

    func (s *appState) jobExecutor(ctx context.Context, job *queue.Job, progressCallback func(float64)) error {
        // Routes to appropriate handler based on job.Type
    }
    
  3. Convert Job Execution (main.go, line ~805)

    func (s *appState) executeConvertJob(ctx context.Context, job *queue.Job, progressCallback func(float64)) error {
        // Full FFmpeg integration with progress callback
    }
    
  4. Queue UI (internal/ui/queueview.go, line ~317)

    • View Queue button shows job list
    • Progress tracking per job
    • Pause/Resume/Cancel controls
    • Job history display

DVD Integration with Queue

The queue system works seamlessly with DVD-NTSC encoding:

// Create DVD conversion job
DVDJob := &queue.Job{
    Type:        queue.JobTypeConvert,
    Title:       "Convert to DVD-NTSC: movie.mp4",
    Description: "720×480 MPEG-2 for authoring",
    InputFile:   "movie.mp4",
    OutputFile:  "movie.mpg",
    Config: map[string]interface{}{
        "format":              "DVD-NTSC (MPEG-2)",
        "videoCodec":          "MPEG-2",
        "audioCodec":          "AC-3",
        "resolution":          "720x480",
        "framerate":           "29.97",
        "videoBitrate":        "6000k",
        "audioBitrate":        "192k",
        "selectedFormat":      formatOption{Label: "DVD-NTSC", Ext: ".mpg"},
        // ... validation warnings from convert.ValidateDVDNTSC()
    },
    Priority: 10,  // High priority
}

// Add to queue
state.jobQueue.Add(DVDJob)

// Start processing
state.jobQueue.Start()

Batch Processing Example

Converting Multiple Videos to DVD-NTSC

// 1. Load multiple videos
inputFiles := []string{
    "video1.avi",
    "video2.mov",
    "video3.mp4",
}

// 2. Create queue with executor
myQueue := queue.New(executeConversionJob)
myQueue.SetChangeCallback(updateUI)

// 3. Add jobs for each video
for i, input := range inputFiles {
    src, _ := convert.ProbeVideo(input)
    warnings := convert.ValidateDVDNTSC(src, convert.DVDNTSCPreset())

    job := &queue.Job{
        Type:       queue.JobTypeConvert,
        Title:      fmt.Sprintf("DVD %d/%d: %s", i+1, len(inputFiles), filepath.Base(input)),
        InputFile:  input,
        OutputFile: strings.TrimSuffix(input, filepath.Ext(input)) + ".mpg",
        Config: map[string]interface{}{
            "preset":    "DVD-ntsc",
            "warnings":  warnings,
            "videoCodec": "mpeg2video",
            // ...
        },
        Priority: len(inputFiles) - i,  // Earlier files higher priority
    }
    myQueue.Add(job)
}

// 4. Start processing
myQueue.Start()

// 5. Monitor progress
go func() {
    for {
        jobs := myQueue.List()
        pending, running, completed, failed := myQueue.Stats()

        fmt.Printf("Queue Status: %d pending, %d running, %d done, %d failed\n",
            pending, running, completed, failed)

        for _, job := range jobs {
            if job.Status == queue.JobStatusRunning {
                fmt.Printf("  ▶ %s: %.1f%%\n", job.Title, job.Progress)
            }
        }

        time.Sleep(2 * time.Second)
    }
}()

Progress Tracking

The queue provides real-time progress updates through:

1. Job Progress Field

job.Progress  // 0-100% float64

2. Change Callback

queue.SetChangeCallback(func() {
    // Called whenever job status/progress changes
    // Should trigger UI refresh
})

3. Status Polling

pending, running, completed, failed := queue.Stats()
jobs := queue.List()

Example Progress Display

func displayProgress(queue *queue.Queue) {
    jobs := queue.List()
    for _, job := range jobs {
        status := string(job.Status)
        progress := fmt.Sprintf("%.1f%%", job.Progress)
        fmt.Printf("[%-10s] %s: %s\n", status, job.Title, progress)
    }
}

Error Handling

Job Failures

job := queue.Get(jobID)
if job.Status == queue.JobStatusFailed {
    fmt.Printf("Job failed: %s\n", job.Error)
    // Retry or inspect error
}

Retry Logic

failedJob := queue.Get(jobID)
if failedJob.Status == queue.JobStatusFailed {
    // Create new job with same config
    retryJob := &queue.Job{
        Type:        failedJob.Type,
        Title:       failedJob.Title + " (retry)",
        InputFile:   failedJob.InputFile,
        OutputFile:  failedJob.OutputFile,
        Config:      failedJob.Config,
        Priority:    10,  // Higher priority
    }
    queue.Add(retryJob)
}

Persistence

Save Queue State

// Save all jobs to JSON
queue.Save("/home/user/.videotools/queue.json")

Load Previous Queue

// Restore jobs from file
queue.Load("/home/user/.videotools/queue.json")

Queue File Format

[
  {
    "id": "job-uuid-1",
    "type": "convert",
    "status": "completed",
    "title": "Convert video.mp4",
    "description": "DVD-NTSC preset",
    "input_file": "video.mp4",
    "output_file": "video.mpg",
    "config": {
      "preset": "DVD-ntsc",
      "videoCodec": "mpeg2video"
    },
    "progress": 100,
    "created_at": "2025-11-29T12:00:00Z",
    "started_at": "2025-11-29T12:05:00Z",
    "completed_at": "2025-11-29T12:35:00Z",
    "priority": 5
  }
]

Thread Safety

The queue uses sync.RWMutex for complete thread safety:

// Safe for concurrent access
go queue.Add(job1)
go queue.Add(job2)
go queue.Remove(jobID)
go queue.Start()

// All operations are synchronized internally

Important: Callback Deadlock Prevention

//  DON'T: Direct UI update in callback
queue.SetChangeCallback(func() {
    button.SetText("Processing")  // May deadlock on Fyne!
})

//  DO: Use Fyne's thread marshaling
queue.SetChangeCallback(func() {
    fyne.CurrentApp().Driver().DoFromGoroutine(func() {
        button.SetText("Processing")  // Safe
    }, false)
})

Known Issues & Workarounds

Issue 1: CGO Compilation Hang

Status: Known issue, not queue-related

  • Cause: GCC 15.2.1 with OpenGL binding compilation
  • Workaround: Pre-built binary available in repository

Issue 2: Queue Callback Threading (FIXED in v0.1.0-dev11)

Status: RESOLVED

  • Fix: Use DoFromGoroutine for Fyne callbacks
  • Implementation: See main.go line ~1130

Performance Characteristics

  • Job Addition: O(1) - append only
  • Job Removal: O(n) - linear search
  • Status Update: O(1) - direct pointer access
  • List Retrieval: O(n) - returns copy
  • Stats Query: O(n) - counts all jobs
  • Concurrency: Full thread-safe with RWMutex

Testing Queue System

Create internal/queue/queue_test.go:

package queue

import (
    "context"
    "testing"
    "time"
)

func TestAddJob(t *testing.T) {
    q := New(func(ctx context.Context, job *Job, cb func(float64)) error {
        return nil
    })

    job := &Job{
        Type:  JobTypeConvert,
        Title: "Test Job",
    }

    q.Add(job)

    if len(q.List()) != 1 {
        t.Fatalf("Expected 1 job, got %d", len(q.List()))
    }
}

func TestPauseResume(t *testing.T) {
    // ... test pause/resume logic
}

Summary

The VideoTools queue system is:

  • Complete: All 24 methods implemented
  • Tested: Integrated in main.go and working
  • Thread-Safe: Full RWMutex synchronization
  • Persistent: JSON save/load capability
  • DVD-Ready: Works with DVD-NTSC encoding jobs

Ready for:

  • Batch processing of multiple videos
  • DVD-NTSC conversions
  • Real-time progress monitoring
  • Job prioritization and reordering
  • Professional video authoring workflows