From c0e433f4912cbf6a397c7728ba91811a6af634e9 Mon Sep 17 00:00:00 2001 From: VideoTools CI Date: Thu, 22 Jan 2026 08:48:34 -0500 Subject: [PATCH] fix(player): repair gstreamer bus loop --- internal/player/gstreamer_player.go | 83 ++--------------------------- 1 file changed, 4 insertions(+), 79 deletions(-) diff --git a/internal/player/gstreamer_player.go b/internal/player/gstreamer_player.go index 9e9dc80..9e1fccf 100644 --- a/internal/player/gstreamer_player.go +++ b/internal/player/gstreamer_player.go @@ -108,11 +108,10 @@ type GStreamerPlayer struct { duration time.Duration // Bus handling - busCh chan *C.GstMessage - eos chan struct{} + busQuit chan struct{} busDone chan struct{} -} - + eos bool + state C.GstState } type busEvent struct { @@ -628,6 +627,7 @@ func (p *GStreamerPlayer) busLoop() { msgType := C.vt_gst_message_type(msg) switch msgType { case C.GST_MESSAGE_ERROR: + errMsg := C.vt_gst_error_from_message(msg) p.mu.Lock() if errMsg != nil { p.lastErr = C.GoString(errMsg) @@ -669,81 +669,6 @@ func (p *GStreamerPlayer) busLoop() { C.gst_message_unref(msg) } } - p.mu.Unlock() - }() - - for { - select { - case <-p.busQuit: - return - default: - } - - p.mu.Lock() - bus := p.bus - p.mu.Unlock() - if bus == nil { - time.Sleep(100 * time.Millisecond) - continue - } - - msg := C.gst_bus_timed_pop_filtered(bus, 200*1000*1000, C.vt_gst_message_mask()) - if msg == nil { - continue - } - - msgType := C.vt_gst_message_type(msg) - switch msgType { - case C.GST_MESSAGE_ERROR: - errMsg := C.vt_gst_error_from_message(msg) - p.mu.Lock() - if errMsg != nil { - p.lastErr = C.GoString(errMsg) - C.vt_gst_free_error(errMsg) - } else { - p.lastErr = "gstreamer error" - } - p.mode = StateError - evt := busEvent{Kind: "error", Info: p.lastErr} - p.mu.Unlock() - p.pushEvent(evt) - case C.GST_MESSAGE_EOS: - p.mu.Lock() - p.eos = true - p.mode = StateEOS - p.mu.Unlock() - p.pushEvent(busEvent{Kind: "eos"}) - case C.GST_MESSAGE_STATE_CHANGED: - var oldState C.GstState - var newState C.GstState - var pending C.GstState - C.vt_gst_parse_state_changed(msg, &oldState, &newState, &pending) - p.mu.Lock() - p.state = newState - p.mu.Unlock() - p.pushEvent(busEvent{Kind: "state_changed", State: newState}) - case C.GST_MESSAGE_DURATION_CHANGED: - p.updateDuration() - p.pushEvent(busEvent{Kind: "duration_changed"}) - case C.GST_MESSAGE_CLOCK_LOST: - p.mu.Lock() - shouldRecover := !p.paused && p.pipeline != nil - p.mu.Unlock() - if shouldRecover { - C.gst_element_set_state(p.pipeline, C.GST_STATE_PAUSED) - C.gst_element_set_state(p.pipeline, C.GST_STATE_PLAYING) - } - p.pushEvent(busEvent{Kind: "clock_lost"}) - } - C.gst_message_unref(msg) - default: - } - p.mu.Unlock() - p.pushEvent(busEvent{Kind: "clock_lost"}) - } - C.gst_message_unref(msg) - } -} func (p *GStreamerPlayer) updateDuration() { p.mu.Lock()