VideoTools/internal/queue/queue.go
Stu Leak 27e038e1a1 Fix queue stats to properly distinguish cancelled from failed jobs
The queue Stats() method was grouping cancelled and failed jobs together,
causing cancelled jobs to be displayed as "failed" in the status bar.
Updated Stats() to return a separate cancelled count and modified all
callers (updateStatsBar, queueProgressCounts, showMainMenu) to handle
the new return value. Also updated ConversionStatsBar to display
cancelled jobs separately in the status bar.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-17 14:25:18 -05:00

581 lines
12 KiB
Go

package queue
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"
)
// JobType represents the type of job to execute
type JobType string
const (
JobTypeConvert JobType = "convert"
JobTypeMerge JobType = "merge"
JobTypeTrim JobType = "trim"
JobTypeFilter JobType = "filter"
JobTypeUpscale JobType = "upscale"
JobTypeAudio JobType = "audio"
JobTypeThumb JobType = "thumb"
JobTypeSnippet JobType = "snippet"
)
// JobStatus represents the current state of a job
type JobStatus string
const (
JobStatusPending JobStatus = "pending"
JobStatusRunning JobStatus = "running"
JobStatusPaused JobStatus = "paused"
JobStatusCompleted JobStatus = "completed"
JobStatusFailed JobStatus = "failed"
JobStatusCancelled JobStatus = "cancelled"
)
// Job represents a single job in the queue
type Job struct {
ID string `json:"id"`
Type JobType `json:"type"`
Status JobStatus `json:"status"`
Title string `json:"title"`
Description string `json:"description"`
InputFile string `json:"input_file"`
OutputFile string `json:"output_file"`
LogPath string `json:"log_path,omitempty"`
Config map[string]interface{} `json:"config"`
Progress float64 `json:"progress"`
Error string `json:"error,omitempty"`
CreatedAt time.Time `json:"created_at"`
StartedAt *time.Time `json:"started_at,omitempty"`
CompletedAt *time.Time `json:"completed_at,omitempty"`
Priority int `json:"priority"` // Higher priority = runs first
cancel context.CancelFunc `json:"-"`
}
// JobExecutor is a function that executes a job
type JobExecutor func(ctx context.Context, job *Job, progressCallback func(float64)) error
// Queue manages a queue of jobs
type Queue struct {
jobs []*Job
executor JobExecutor
running bool
mu sync.RWMutex
onChange func() // Callback when queue state changes
}
// New creates a new queue with the given executor
func New(executor JobExecutor) *Queue {
return &Queue{
jobs: make([]*Job, 0),
executor: executor,
running: false,
}
}
// SetChangeCallback sets a callback to be called when the queue state changes
func (q *Queue) SetChangeCallback(callback func()) {
q.mu.Lock()
defer q.mu.Unlock()
q.onChange = callback
}
// notifyChange triggers the onChange callback if set
// Must be called without holding the mutex lock
func (q *Queue) notifyChange() {
if q.onChange != nil {
// Call in goroutine to avoid blocking and potential deadlocks
go q.onChange()
}
}
// Add adds a job to the queue
func (q *Queue) Add(job *Job) {
q.mu.Lock()
if job.ID == "" {
job.ID = generateID()
}
if job.CreatedAt.IsZero() {
job.CreatedAt = time.Now()
}
if job.Status == "" {
job.Status = JobStatusPending
}
q.jobs = append(q.jobs, job)
q.rebalancePrioritiesLocked()
q.mu.Unlock()
q.notifyChange()
}
// Remove removes a job from the queue by ID
func (q *Queue) Remove(id string) error {
q.mu.Lock()
var removed bool
for i, job := range q.jobs {
if job.ID == id {
// Cancel if running
if job.Status == JobStatusRunning && job.cancel != nil {
job.cancel()
}
q.jobs = append(q.jobs[:i], q.jobs[i+1:]...)
q.rebalancePrioritiesLocked()
removed = true
break
}
}
q.mu.Unlock()
if removed {
q.notifyChange()
return nil
}
return fmt.Errorf("job not found: %s", id)
}
// Get retrieves a job by ID
func (q *Queue) Get(id string) (*Job, error) {
q.mu.RLock()
defer q.mu.RUnlock()
for _, job := range q.jobs {
if job.ID == id {
return job, nil
}
}
return nil, fmt.Errorf("job not found: %s", id)
}
// List returns all jobs in the queue
func (q *Queue) List() []*Job {
q.mu.RLock()
defer q.mu.RUnlock()
// Return a copy of the jobs to avoid races on the live queue state
result := make([]*Job, len(q.jobs))
for i, job := range q.jobs {
clone := *job
result[i] = &clone
}
return result
}
// Stats returns queue statistics
func (q *Queue) Stats() (pending, running, completed, failed, cancelled int) {
q.mu.RLock()
defer q.mu.RUnlock()
for _, job := range q.jobs {
switch job.Status {
case JobStatusPending, JobStatusPaused:
pending++
case JobStatusRunning:
running++
case JobStatusCompleted:
completed++
case JobStatusFailed:
failed++
case JobStatusCancelled:
cancelled++
}
}
return
}
// CurrentRunning returns the currently running job, if any.
func (q *Queue) CurrentRunning() *Job {
q.mu.RLock()
defer q.mu.RUnlock()
for _, job := range q.jobs {
if job.Status == JobStatusRunning {
clone := *job
return &clone
}
}
return nil
}
// Pause pauses a running job
func (q *Queue) Pause(id string) error {
q.mu.Lock()
result := fmt.Errorf("job not found: %s", id)
for _, job := range q.jobs {
if job.ID == id {
if job.Status != JobStatusRunning {
result = fmt.Errorf("job is not running")
break
}
if job.cancel != nil {
job.cancel()
}
job.Status = JobStatusPaused
// Keep position; just stop current run
result = nil
break
}
}
q.mu.Unlock()
if result == nil {
q.notifyChange()
}
return result
}
// Resume resumes a paused job
func (q *Queue) Resume(id string) error {
q.mu.Lock()
result := fmt.Errorf("job not found: %s", id)
for _, job := range q.jobs {
if job.ID == id {
if job.Status != JobStatusPaused {
result = fmt.Errorf("job is not paused")
break
}
job.Status = JobStatusPending
// Keep position; move selection via priorities
result = nil
break
}
}
q.mu.Unlock()
if result == nil {
q.notifyChange()
}
return result
}
// Cancel cancels a job
func (q *Queue) Cancel(id string) error {
q.mu.Lock()
var cancelled bool
now := time.Now()
for _, job := range q.jobs {
if job.ID == id {
if job.Status == JobStatusRunning && job.cancel != nil {
job.cancel()
}
job.Status = JobStatusCancelled
job.CompletedAt = &now
q.rebalancePrioritiesLocked()
cancelled = true
break
}
}
q.mu.Unlock()
if cancelled {
q.notifyChange()
return nil
}
return fmt.Errorf("job not found: %s", id)
}
// Start starts processing jobs in the queue
func (q *Queue) Start() {
q.mu.Lock()
if q.running {
q.mu.Unlock()
return
}
q.running = true
q.mu.Unlock()
go q.processJobs()
}
// Stop stops processing jobs
func (q *Queue) Stop() {
q.mu.Lock()
defer q.mu.Unlock()
q.running = false
}
// IsRunning returns true if the queue is currently processing jobs
func (q *Queue) IsRunning() bool {
q.mu.RLock()
defer q.mu.RUnlock()
return q.running
}
// PauseAll pauses any running job and stops processing
func (q *Queue) PauseAll() {
q.mu.Lock()
for _, job := range q.jobs {
if job.Status == JobStatusRunning && job.cancel != nil {
job.cancel()
job.Status = JobStatusPaused
job.cancel = nil
job.StartedAt = nil
job.CompletedAt = nil
job.Error = ""
}
}
q.running = false
q.mu.Unlock()
q.notifyChange()
}
// ResumeAll restarts processing the queue
func (q *Queue) ResumeAll() {
q.mu.Lock()
if q.running {
q.mu.Unlock()
return
}
q.running = true
q.mu.Unlock()
q.notifyChange()
go q.processJobs()
}
// processJobs continuously processes pending jobs
func (q *Queue) processJobs() {
for {
q.mu.Lock()
if !q.running {
q.mu.Unlock()
return
}
// Check if there's already a running job (only process one at a time)
hasRunningJob := false
for _, job := range q.jobs {
if job.Status == JobStatusRunning {
hasRunningJob = true
break
}
}
// If a job is already running, wait and check again later
if hasRunningJob {
q.mu.Unlock()
time.Sleep(500 * time.Millisecond)
continue
}
// Find highest priority pending job
var nextJob *Job
highestPriority := -1
for _, job := range q.jobs {
if job.Status == JobStatusPending && job.Priority > highestPriority {
nextJob = job
highestPriority = job.Priority
}
}
if nextJob == nil {
q.mu.Unlock()
time.Sleep(500 * time.Millisecond)
continue
}
// Mark as running
nextJob.Status = JobStatusRunning
now := time.Now()
nextJob.StartedAt = &now
ctx, cancel := context.WithCancel(context.Background())
nextJob.cancel = cancel
q.mu.Unlock()
q.notifyChange()
// Execute job
err := q.executor(ctx, nextJob, func(progress float64) {
q.mu.Lock()
nextJob.Progress = progress
q.mu.Unlock()
q.notifyChange()
})
// Update job status
q.mu.Lock()
now = time.Now()
if err != nil {
if ctx.Err() == context.Canceled {
if nextJob.Status == JobStatusPaused {
// Leave as paused without timestamps/error
nextJob.StartedAt = nil
nextJob.CompletedAt = nil
nextJob.Error = ""
} else {
// Cancelled
nextJob.Status = JobStatusCancelled
nextJob.CompletedAt = &now
nextJob.Error = ""
}
} else {
nextJob.Status = JobStatusFailed
nextJob.CompletedAt = &now
nextJob.Error = err.Error()
}
} else {
nextJob.Status = JobStatusCompleted
nextJob.Progress = 100.0
nextJob.CompletedAt = &now
}
nextJob.cancel = nil
q.mu.Unlock()
q.notifyChange()
}
}
// MoveUp moves a pending or paused job one position up in the queue
func (q *Queue) MoveUp(id string) error {
return q.move(id, -1)
}
// MoveDown moves a pending or paused job one position down in the queue
func (q *Queue) MoveDown(id string) error {
return q.move(id, 1)
}
func (q *Queue) move(id string, delta int) error {
q.mu.Lock()
defer q.mu.Unlock()
var idx int = -1
for i, job := range q.jobs {
if job.ID == id {
idx = i
if job.Status != JobStatusPending && job.Status != JobStatusPaused {
return fmt.Errorf("job must be pending or paused to reorder")
}
break
}
}
if idx == -1 {
return fmt.Errorf("job not found: %s", id)
}
newIdx := idx + delta
if newIdx < 0 || newIdx >= len(q.jobs) {
return nil // already at boundary; no-op
}
q.jobs[idx], q.jobs[newIdx] = q.jobs[newIdx], q.jobs[idx]
q.rebalancePrioritiesLocked()
return nil
}
// Save saves the queue to a JSON file
func (q *Queue) Save(path string) error {
q.mu.RLock()
defer q.mu.RUnlock()
// Create directory if it doesn't exist
dir := filepath.Dir(path)
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("failed to create directory: %w", err)
}
data, err := json.MarshalIndent(q.jobs, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal queue: %w", err)
}
if err := os.WriteFile(path, data, 0644); err != nil {
return fmt.Errorf("failed to write queue file: %w", err)
}
return nil
}
// Load loads the queue from a JSON file
func (q *Queue) Load(path string) error {
data, err := os.ReadFile(path)
if err != nil {
if os.IsNotExist(err) {
return nil // No saved queue, that's OK
}
return fmt.Errorf("failed to read queue file: %w", err)
}
var jobs []*Job
if err := json.Unmarshal(data, &jobs); err != nil {
return fmt.Errorf("failed to unmarshal queue: %w", err)
}
q.mu.Lock()
// Reset running jobs to pending
for _, job := range jobs {
if job.Status == JobStatusRunning {
job.Status = JobStatusPending
job.Progress = 0
}
}
q.jobs = jobs
q.rebalancePrioritiesLocked()
q.mu.Unlock()
q.notifyChange()
return nil
}
// Clear removes all completed, failed, and cancelled jobs
func (q *Queue) Clear() {
q.mu.Lock()
// Keep only pending, running, and paused jobs
filtered := make([]*Job, 0)
for _, job := range q.jobs {
if job.Status == JobStatusPending || job.Status == JobStatusRunning || job.Status == JobStatusPaused {
filtered = append(filtered, job)
}
}
q.jobs = filtered
q.rebalancePrioritiesLocked()
q.mu.Unlock()
q.notifyChange()
}
// ClearAll removes all jobs from the queue
func (q *Queue) ClearAll() {
q.mu.Lock()
// Cancel any running work and stop the processor
q.cancelRunningLocked()
q.running = false
q.jobs = make([]*Job, 0)
q.rebalancePrioritiesLocked()
q.mu.Unlock()
q.notifyChange()
}
// generateID generates a unique ID for a job
func generateID() string {
return fmt.Sprintf("job-%d", time.Now().UnixNano())
}
// rebalancePrioritiesLocked assigns descending priorities so earlier items are selected first
func (q *Queue) rebalancePrioritiesLocked() {
for i := range q.jobs {
q.jobs[i].Priority = len(q.jobs) - i
}
}
// cancelRunningLocked cancels any currently running job and marks it cancelled.
func (q *Queue) cancelRunningLocked() {
now := time.Now()
for _, job := range q.jobs {
if job.Status == JobStatusRunning {
if job.cancel != nil {
job.cancel()
}
job.Status = JobStatusCancelled
job.CompletedAt = &now
}
}
}