From 14599f4df5c697aae6050b712b60a8691821cc7a Mon Sep 17 00:00:00 2001 From: Stu Leak Date: Sat, 10 Jan 2026 15:46:32 -0500 Subject: [PATCH] Add GStreamer player state machine --- internal/player/gstreamer_player.go | 75 +++++++++++++++++++++++++++-- 1 file changed, 71 insertions(+), 4 deletions(-) diff --git a/internal/player/gstreamer_player.go b/internal/player/gstreamer_player.go index b62275e..824b7ba 100644 --- a/internal/player/gstreamer_player.go +++ b/internal/player/gstreamer_player.go @@ -84,6 +84,7 @@ var gstInitOnce sync.Once type GStreamerPlayer struct { mu sync.Mutex + seekMu sync.Mutex pipeline *C.GstElement appsink *C.GstElement bus *C.GstBus @@ -101,6 +102,7 @@ type GStreamerPlayer struct { eos bool state C.GstState duration time.Duration + mode PlayerState } type busEvent struct { @@ -109,6 +111,20 @@ type busEvent struct { State C.GstState } +type PlayerState int + +const ( + StateIdle PlayerState = iota + StateLoading + StatePaused + StatePlaying + StateSeeking + StateStepping + StateStopped + StateError + StateEOS +) + func NewGStreamerPlayer(config Config) (*GStreamerPlayer, error) { var initErr error gstInitOnce.Do(func() { @@ -125,6 +141,7 @@ func NewGStreamerPlayer(config Config) (*GStreamerPlayer, error) { paused: true, volume: config.Volume, preview: config.PreviewMode, + mode: StateIdle, }, nil } @@ -224,9 +241,11 @@ func (p *GStreamerPlayer) Load(path string, offset time.Duration) error { p.paused = true p.eos = false p.lastErr = "" + p.mode = StateLoading // Set to PAUSED to preroll (loads first frame) if C.gst_element_set_state(playbin, C.GST_STATE_PAUSED) == C.GST_STATE_CHANGE_FAILURE { + p.mode = StateError p.closeLocked() return errors.New("gstreamer failed to enter paused state") } @@ -244,8 +263,10 @@ func (p *GStreamerPlayer) Load(path string, offset time.Duration) error { p.closeLocked() if errMsg != nil { defer C.vt_gst_free_error(errMsg) + p.mode = StateError return errors.New(C.GoString(errMsg)) } + p.mode = StateError return errors.New("gstreamer error while loading") } C.gst_message_unref(msg) @@ -256,6 +277,7 @@ func (p *GStreamerPlayer) Load(path string, offset time.Duration) error { _ = p.seekLocked(offset) } + p.mode = StatePaused p.startBusLoopLocked() return nil } @@ -268,9 +290,11 @@ func (p *GStreamerPlayer) Play() error { return errors.New("no pipeline loaded") } if C.gst_element_set_state(p.pipeline, C.GST_STATE_PLAYING) == C.GST_STATE_CHANGE_FAILURE { + p.mode = StateError return errors.New("gstreamer failed to enter playing state") } p.paused = false + p.mode = StatePlaying return nil } @@ -282,16 +306,33 @@ func (p *GStreamerPlayer) Pause() error { return errors.New("no pipeline loaded") } if C.gst_element_set_state(p.pipeline, C.GST_STATE_PAUSED) == C.GST_STATE_CHANGE_FAILURE { + p.mode = StateError return errors.New("gstreamer failed to enter paused state") } p.paused = true + p.mode = StatePaused return nil } func (p *GStreamerPlayer) SeekToTime(offset time.Duration) error { + p.seekMu.Lock() + defer p.seekMu.Unlock() + p.mu.Lock() - defer p.mu.Unlock() - return p.seekLocked(offset) + prevMode := p.mode + p.mode = StateSeeking + p.mu.Unlock() + + err := p.seekLocked(offset) + + p.mu.Lock() + if err != nil { + p.mode = StateError + } else { + p.mode = prevMode + } + p.mu.Unlock() + return err } func (p *GStreamerPlayer) seekLocked(offset time.Duration) error { @@ -311,14 +352,30 @@ func (p *GStreamerPlayer) seekLockedWithFlags(offset time.Duration, flags C.GstS } func (p *GStreamerPlayer) SeekToFrame(frame int64) error { + p.seekMu.Lock() + defer p.seekMu.Unlock() + p.mu.Lock() - defer p.mu.Unlock() if p.fps <= 0 { + p.mu.Unlock() return nil } + prevMode := p.mode + p.mode = StateStepping seconds := float64(frame) / p.fps + p.mu.Unlock() + flags := C.GstSeekFlags(C.GST_SEEK_FLAG_FLUSH | C.GST_SEEK_FLAG_ACCURATE) - return p.seekLockedWithFlags(time.Duration(seconds*float64(time.Second)), flags) + err := p.seekLockedWithFlags(time.Duration(seconds*float64(time.Second)), flags) + + p.mu.Lock() + if err != nil { + p.mode = StateError + } else { + p.mode = prevMode + } + p.mu.Unlock() + return err } func (p *GStreamerPlayer) GetCurrentTime() time.Duration { @@ -472,12 +529,14 @@ func (p *GStreamerPlayer) Stop() error { if p.pipeline != nil { C.gst_element_set_state(p.pipeline, C.GST_STATE_NULL) } + p.mode = StateStopped return nil } func (p *GStreamerPlayer) Close() { p.mu.Lock() defer p.mu.Unlock() + p.mode = StateStopped p.closeLocked() } @@ -485,6 +544,12 @@ func (p *GStreamerPlayer) Events() <-chan busEvent { return p.events } +func (p *GStreamerPlayer) State() PlayerState { + p.mu.Lock() + defer p.mu.Unlock() + return p.mode +} + func (p *GStreamerPlayer) closeLocked() { p.stopBusLoopLocked() if p.pipeline != nil { @@ -568,12 +633,14 @@ func (p *GStreamerPlayer) busLoop() { } else { p.lastErr = "gstreamer error" } + p.mode = StateError evt := busEvent{Kind: "error", Info: p.lastErr} p.mu.Unlock() p.pushEvent(evt) case C.GST_MESSAGE_EOS: p.mu.Lock() p.eos = true + p.mode = StateEOS p.mu.Unlock() p.pushEvent(busEvent{Kind: "eos"}) case C.GST_MESSAGE_STATE_CHANGED: