Add GStreamer player state machine

This commit is contained in:
Stu Leak 2026-01-10 15:46:32 -05:00
parent bc56642d3b
commit 14599f4df5

View File

@ -84,6 +84,7 @@ var gstInitOnce sync.Once
type GStreamerPlayer struct {
mu sync.Mutex
seekMu sync.Mutex
pipeline *C.GstElement
appsink *C.GstElement
bus *C.GstBus
@ -101,6 +102,7 @@ type GStreamerPlayer struct {
eos bool
state C.GstState
duration time.Duration
mode PlayerState
}
type busEvent struct {
@ -109,6 +111,20 @@ type busEvent struct {
State C.GstState
}
type PlayerState int
const (
StateIdle PlayerState = iota
StateLoading
StatePaused
StatePlaying
StateSeeking
StateStepping
StateStopped
StateError
StateEOS
)
func NewGStreamerPlayer(config Config) (*GStreamerPlayer, error) {
var initErr error
gstInitOnce.Do(func() {
@ -125,6 +141,7 @@ func NewGStreamerPlayer(config Config) (*GStreamerPlayer, error) {
paused: true,
volume: config.Volume,
preview: config.PreviewMode,
mode: StateIdle,
}, nil
}
@ -224,9 +241,11 @@ func (p *GStreamerPlayer) Load(path string, offset time.Duration) error {
p.paused = true
p.eos = false
p.lastErr = ""
p.mode = StateLoading
// Set to PAUSED to preroll (loads first frame)
if C.gst_element_set_state(playbin, C.GST_STATE_PAUSED) == C.GST_STATE_CHANGE_FAILURE {
p.mode = StateError
p.closeLocked()
return errors.New("gstreamer failed to enter paused state")
}
@ -244,8 +263,10 @@ func (p *GStreamerPlayer) Load(path string, offset time.Duration) error {
p.closeLocked()
if errMsg != nil {
defer C.vt_gst_free_error(errMsg)
p.mode = StateError
return errors.New(C.GoString(errMsg))
}
p.mode = StateError
return errors.New("gstreamer error while loading")
}
C.gst_message_unref(msg)
@ -256,6 +277,7 @@ func (p *GStreamerPlayer) Load(path string, offset time.Duration) error {
_ = p.seekLocked(offset)
}
p.mode = StatePaused
p.startBusLoopLocked()
return nil
}
@ -268,9 +290,11 @@ func (p *GStreamerPlayer) Play() error {
return errors.New("no pipeline loaded")
}
if C.gst_element_set_state(p.pipeline, C.GST_STATE_PLAYING) == C.GST_STATE_CHANGE_FAILURE {
p.mode = StateError
return errors.New("gstreamer failed to enter playing state")
}
p.paused = false
p.mode = StatePlaying
return nil
}
@ -282,16 +306,33 @@ func (p *GStreamerPlayer) Pause() error {
return errors.New("no pipeline loaded")
}
if C.gst_element_set_state(p.pipeline, C.GST_STATE_PAUSED) == C.GST_STATE_CHANGE_FAILURE {
p.mode = StateError
return errors.New("gstreamer failed to enter paused state")
}
p.paused = true
p.mode = StatePaused
return nil
}
func (p *GStreamerPlayer) SeekToTime(offset time.Duration) error {
p.seekMu.Lock()
defer p.seekMu.Unlock()
p.mu.Lock()
defer p.mu.Unlock()
return p.seekLocked(offset)
prevMode := p.mode
p.mode = StateSeeking
p.mu.Unlock()
err := p.seekLocked(offset)
p.mu.Lock()
if err != nil {
p.mode = StateError
} else {
p.mode = prevMode
}
p.mu.Unlock()
return err
}
func (p *GStreamerPlayer) seekLocked(offset time.Duration) error {
@ -311,14 +352,30 @@ func (p *GStreamerPlayer) seekLockedWithFlags(offset time.Duration, flags C.GstS
}
func (p *GStreamerPlayer) SeekToFrame(frame int64) error {
p.seekMu.Lock()
defer p.seekMu.Unlock()
p.mu.Lock()
defer p.mu.Unlock()
if p.fps <= 0 {
p.mu.Unlock()
return nil
}
prevMode := p.mode
p.mode = StateStepping
seconds := float64(frame) / p.fps
p.mu.Unlock()
flags := C.GstSeekFlags(C.GST_SEEK_FLAG_FLUSH | C.GST_SEEK_FLAG_ACCURATE)
return p.seekLockedWithFlags(time.Duration(seconds*float64(time.Second)), flags)
err := p.seekLockedWithFlags(time.Duration(seconds*float64(time.Second)), flags)
p.mu.Lock()
if err != nil {
p.mode = StateError
} else {
p.mode = prevMode
}
p.mu.Unlock()
return err
}
func (p *GStreamerPlayer) GetCurrentTime() time.Duration {
@ -472,12 +529,14 @@ func (p *GStreamerPlayer) Stop() error {
if p.pipeline != nil {
C.gst_element_set_state(p.pipeline, C.GST_STATE_NULL)
}
p.mode = StateStopped
return nil
}
func (p *GStreamerPlayer) Close() {
p.mu.Lock()
defer p.mu.Unlock()
p.mode = StateStopped
p.closeLocked()
}
@ -485,6 +544,12 @@ func (p *GStreamerPlayer) Events() <-chan busEvent {
return p.events
}
func (p *GStreamerPlayer) State() PlayerState {
p.mu.Lock()
defer p.mu.Unlock()
return p.mode
}
func (p *GStreamerPlayer) closeLocked() {
p.stopBusLoopLocked()
if p.pipeline != nil {
@ -568,12 +633,14 @@ func (p *GStreamerPlayer) busLoop() {
} else {
p.lastErr = "gstreamer error"
}
p.mode = StateError
evt := busEvent{Kind: "error", Info: p.lastErr}
p.mu.Unlock()
p.pushEvent(evt)
case C.GST_MESSAGE_EOS:
p.mu.Lock()
p.eos = true
p.mode = StateEOS
p.mu.Unlock()
p.pushEvent(busEvent{Kind: "eos"})
case C.GST_MESSAGE_STATE_CHANGED: