diff --git a/internal/player/gstreamer_player.go b/internal/player/gstreamer_player.go index 137ab71..b62275e 100644 --- a/internal/player/gstreamer_player.go +++ b/internal/player/gstreamer_player.go @@ -52,6 +52,20 @@ static GstSample* vt_gst_pull_sample(GstAppSink* sink, GstClockTime timeout, gbo } return gst_app_sink_try_pull_sample(sink, timeout); } +static GstMessageType vt_gst_message_mask(void) { + return GST_MESSAGE_ERROR + | GST_MESSAGE_EOS + | GST_MESSAGE_STATE_CHANGED + | GST_MESSAGE_DURATION_CHANGED + | GST_MESSAGE_ASYNC_DONE + | GST_MESSAGE_CLOCK_LOST; +} +static GstMessageType vt_gst_message_type(GstMessage* msg) { + return GST_MESSAGE_TYPE(msg); +} +static void vt_gst_parse_state_changed(GstMessage* msg, GstState* old_state, GstState* new_state, GstState* pending) { + gst_message_parse_state_changed(msg, old_state, new_state, pending); +} */ import "C" @@ -72,6 +86,10 @@ type GStreamerPlayer struct { mu sync.Mutex pipeline *C.GstElement appsink *C.GstElement + bus *C.GstBus + busQuit chan struct{} + busDone chan struct{} + events chan busEvent paused bool volume float64 preview bool @@ -79,6 +97,16 @@ type GStreamerPlayer struct { height int fps float64 queued *image.RGBA + lastErr string + eos bool + state C.GstState + duration time.Duration +} + +type busEvent struct { + Kind string + Info string + State C.GstState } func NewGStreamerPlayer(config Config) (*GStreamerPlayer, error) { @@ -93,6 +121,7 @@ func NewGStreamerPlayer(config Config) (*GStreamerPlayer, error) { } return &GStreamerPlayer{ + events: make(chan busEvent, 8), paused: true, volume: config.Volume, preview: config.PreviewMode, @@ -193,6 +222,8 @@ func (p *GStreamerPlayer) Load(path string, offset time.Duration) error { p.pipeline = playbin p.appsink = appsink p.paused = true + p.eos = false + p.lastErr = "" // Set to PAUSED to preroll (loads first frame) if C.gst_element_set_state(playbin, C.GST_STATE_PAUSED) == C.GST_STATE_CHANGE_FAILURE { @@ -225,6 +256,7 @@ func (p *GStreamerPlayer) Load(path string, offset time.Duration) error { _ = p.seekLocked(offset) } + p.startBusLoopLocked() return nil } @@ -449,7 +481,12 @@ func (p *GStreamerPlayer) Close() { p.closeLocked() } +func (p *GStreamerPlayer) Events() <-chan busEvent { + return p.events +} + func (p *GStreamerPlayer) closeLocked() { + p.stopBusLoopLocked() if p.pipeline != nil { C.gst_element_set_state(p.pipeline, C.GST_STATE_NULL) C.gst_object_unref(C.gpointer(p.pipeline)) @@ -459,6 +496,133 @@ func (p *GStreamerPlayer) closeLocked() { C.gst_object_unref(C.gpointer(p.appsink)) p.appsink = nil } + if p.bus != nil { + C.gst_object_unref(C.gpointer(p.bus)) + p.bus = nil + } +} + +func (p *GStreamerPlayer) startBusLoopLocked() { + if p.pipeline == nil || p.bus != nil { + return + } + bus := C.gst_element_get_bus(p.pipeline) + if bus == nil { + return + } + p.bus = bus + p.busQuit = make(chan struct{}) + p.busDone = make(chan struct{}) + go p.busLoop() +} + +func (p *GStreamerPlayer) stopBusLoopLocked() { + if p.busQuit == nil { + return + } + close(p.busQuit) + if p.busDone != nil { + <-p.busDone + } + p.busQuit = nil + p.busDone = nil +} + +func (p *GStreamerPlayer) busLoop() { + defer func() { + p.mu.Lock() + if p.busDone != nil { + close(p.busDone) + } + p.mu.Unlock() + }() + + for { + select { + case <-p.busQuit: + return + default: + } + + p.mu.Lock() + bus := p.bus + p.mu.Unlock() + if bus == nil { + time.Sleep(100 * time.Millisecond) + continue + } + + msg := C.gst_bus_timed_pop_filtered(bus, 200*1000*1000, C.vt_gst_message_mask()) + if msg == nil { + continue + } + + msgType := C.vt_gst_message_type(msg) + switch msgType { + case C.GST_MESSAGE_ERROR: + errMsg := C.vt_gst_error_from_message(msg) + p.mu.Lock() + if errMsg != nil { + p.lastErr = C.GoString(errMsg) + C.vt_gst_free_error(errMsg) + } else { + p.lastErr = "gstreamer error" + } + evt := busEvent{Kind: "error", Info: p.lastErr} + p.mu.Unlock() + p.pushEvent(evt) + case C.GST_MESSAGE_EOS: + p.mu.Lock() + p.eos = true + p.mu.Unlock() + p.pushEvent(busEvent{Kind: "eos"}) + case C.GST_MESSAGE_STATE_CHANGED: + var oldState C.GstState + var newState C.GstState + var pending C.GstState + C.vt_gst_parse_state_changed(msg, &oldState, &newState, &pending) + p.mu.Lock() + p.state = newState + p.mu.Unlock() + p.pushEvent(busEvent{Kind: "state_changed", State: newState}) + case C.GST_MESSAGE_DURATION_CHANGED: + p.updateDuration() + p.pushEvent(busEvent{Kind: "duration_changed"}) + case C.GST_MESSAGE_CLOCK_LOST: + p.mu.Lock() + shouldRecover := !p.paused && p.pipeline != nil + p.mu.Unlock() + if shouldRecover { + C.gst_element_set_state(p.pipeline, C.GST_STATE_PAUSED) + C.gst_element_set_state(p.pipeline, C.GST_STATE_PLAYING) + } + p.pushEvent(busEvent{Kind: "clock_lost"}) + } + C.gst_message_unref(msg) + } +} + +func (p *GStreamerPlayer) updateDuration() { + p.mu.Lock() + defer p.mu.Unlock() + if p.pipeline == nil { + return + } + var dur C.gint64 + if C.gst_element_query_duration(p.pipeline, C.GST_FORMAT_TIME, &dur) == 0 { + return + } + p.duration = time.Duration(dur) +} + +func (p *GStreamerPlayer) pushEvent(evt busEvent) { + if p.events == nil { + return + } + select { + case p.events <- evt: + default: + } } func fileURI(path string) string {