package queue import ( "bufio" "context" "encoding/json" "fmt" "os/exec" "git.leaktechnologies.dev/stu/VideoTools/internal/logging" "git.leaktechnologies.dev/stu/VideoTools/internal/ui/utils" ) // ExecuteEditJob executes an editable job with dynamic FFmpeg command func ExecuteEditJob(ctx context.Context, job *Job, progressCallback func(float64), ffmpegPath string) error { logging.Debug(logging.CatSystem, "executing edit job %s: %s", job.ID, job.Title) // Get FFmpeg command from job config if job.Config == nil { return fmt.Errorf("edit job has no config") } cmdInterface, exists := job.Config["ffmpeg_command"] if !exists { return fmt.Errorf("edit job has no ffmpeg_command in config") } // Convert to FFmpegCommand var cmd queue.FFmpegCommand if cmdBytes, err := json.Marshal(cmdInterface); err == nil { if err := json.Unmarshal(cmdBytes, &cmd); err != nil { return fmt.Errorf("failed to parse FFmpeg command: %w", err) } } else { return fmt.Errorf("failed to serialize FFmpeg command: %w", err) } // Validate command editManager := queue.NewEditJobManager(s.jobQueue) if err := editManager.ValidateCommand(&cmd); err != nil { return fmt.Errorf("invalid FFmpeg command: %w", err) } // Build final command args finalArgs := cmd.Args if cmd.InputFile != "" { finalArgs = append([]string{"-i", cmd.InputFile}, finalArgs...) } if cmd.OutputFile != "" { finalArgs = append(finalArgs, cmd.OutputFile) } // Execute FFmpeg command ffmpegPath := utils.GetFFmpegPath() fullCmd := append([]string{ffmpegPath}, finalArgs...) logging.Info(logging.CatFFMPEG, "Executing edit job: %v", fullCmd) // Create and execute command execCmd := exec.CommandContext(ctx, fullCmd[0], fullCmd[1:]...) // Set up pipes for stdout/stderr stdout, err := execCmd.StdoutPipe() if err != nil { return fmt.Errorf("failed to create stdout pipe: %w", err) } stderr, err := execCmd.StderrPipe() if err != nil { return fmt.Errorf("failed to create stderr pipe: %w", err) } // Start command if err := execCmd.Start(); err != nil { return fmt.Errorf("failed to start FFmpeg: %w", err) } // Parse output for progress progressParser := utils.NewFFmpegProgressParser() // Combine stdout and stderr for processing go func() { scanner := bufio.NewScanner(stdout) for scanner.Scan() { if progress := progressParser.ParseLine(scanner.Text()); progress >= 0 { progressCallback(progress) } } }() go func() { scanner := bufio.NewScanner(stderr) for scanner.Scan() { if progress := progressParser.ParseLine(scanner.Text()); progress >= 0 { progressCallback(progress) } // Log stderr for debugging logging.Debug(logging.CatFFMPEG, "FFmpeg stderr: %s", scanner.Text()) } }() // Wait for command to complete err = execCmd.Wait() if err != nil { return fmt.Errorf("FFmpeg execution failed: %w", err) } // Mark job as completed progressCallback(100.0) logging.Info(logging.CatFFMPEG, "Edit job %s completed successfully", job.ID) return nil }