gstreamer/
bus.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    future,
5    mem::transmute,
6    pin::Pin,
7    sync::{Arc, Mutex},
8    task::{Context, Poll},
9};
10
11use futures_channel::mpsc::{self, UnboundedReceiver};
12use futures_core::Stream;
13use futures_util::{stream::FusedStream, StreamExt};
14use glib::{
15    ffi::{gboolean, gpointer},
16    prelude::*,
17    source::Priority,
18    translate::*,
19    ControlFlow,
20};
21
22use crate::{ffi, Bus, BusSyncReply, Message, MessageType};
23
24unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(
25    bus: *mut ffi::GstBus,
26    msg: *mut ffi::GstMessage,
27    func: gpointer,
28) -> gboolean {
29    let func: &mut F = &mut *(func as *mut F);
30    func(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
31}
32
33unsafe extern "C" fn destroy_closure_watch<
34    F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
35>(
36    ptr: gpointer,
37) {
38    let _ = Box::<F>::from_raw(ptr as *mut _);
39}
40
41fn into_raw_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(func: F) -> gpointer {
42    #[allow(clippy::type_complexity)]
43    let func: Box<F> = Box::new(func);
44    Box::into_raw(func) as gpointer
45}
46
47unsafe extern "C" fn trampoline_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(
48    bus: *mut ffi::GstBus,
49    msg: *mut ffi::GstMessage,
50    func: gpointer,
51) -> gboolean {
52    let func: &mut glib::thread_guard::ThreadGuard<F> =
53        &mut *(func as *mut glib::thread_guard::ThreadGuard<F>);
54    (func.get_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
55}
56
57unsafe extern "C" fn destroy_closure_watch_local<
58    F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
59>(
60    ptr: gpointer,
61) {
62    let _ = Box::<glib::thread_guard::ThreadGuard<F>>::from_raw(ptr as *mut _);
63}
64
65fn into_raw_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(func: F) -> gpointer {
66    #[allow(clippy::type_complexity)]
67    let func: Box<glib::thread_guard::ThreadGuard<F>> =
68        Box::new(glib::thread_guard::ThreadGuard::new(func));
69    Box::into_raw(func) as gpointer
70}
71
72unsafe extern "C" fn trampoline_sync<
73    F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
74>(
75    bus: *mut ffi::GstBus,
76    msg: *mut ffi::GstMessage,
77    func: gpointer,
78) -> ffi::GstBusSyncReply {
79    let f: &F = &*(func as *const F);
80    let res = f(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib();
81
82    if res == ffi::GST_BUS_DROP {
83        ffi::gst_mini_object_unref(msg as *mut _);
84    }
85
86    res
87}
88
89unsafe extern "C" fn destroy_closure_sync<
90    F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
91>(
92    ptr: gpointer,
93) {
94    let _ = Box::<F>::from_raw(ptr as *mut _);
95}
96
97fn into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>(
98    func: F,
99) -> gpointer {
100    let func: Box<F> = Box::new(func);
101    Box::into_raw(func) as gpointer
102}
103
104impl Bus {
105    /// Adds a bus signal watch to the default main context with the given `priority`
106    /// (e.g. `G_PRIORITY_DEFAULT`). It is also possible to use a non-default main
107    /// context set up using [`glib::MainContext::push_thread_default()`][crate::glib::MainContext::push_thread_default()]
108    /// (before one had to create a bus watch source and attach it to the desired
109    /// main context 'manually').
110    ///
111    /// After calling this statement, the bus will emit the "message" signal for each
112    /// message posted on the bus when the `GMainLoop` is running.
113    ///
114    /// This function may be called multiple times. To clean up, the caller is
115    /// responsible for calling [`remove_signal_watch()`][Self::remove_signal_watch()] as many times as this
116    /// function is called.
117    ///
118    /// There can only be a single bus watch per bus, you must remove any signal
119    /// watch before you can set another type of watch.
120    /// ## `priority`
121    /// The priority of the watch.
122    #[doc(alias = "gst_bus_add_signal_watch")]
123    #[doc(alias = "gst_bus_add_signal_watch_full")]
124    pub fn add_signal_watch_full(&self, priority: Priority) {
125        unsafe {
126            ffi::gst_bus_add_signal_watch_full(self.to_glib_none().0, priority.into_glib());
127        }
128    }
129
130    /// Create watch for this bus. The [`glib::Source`][crate::glib::Source] will be dispatched whenever
131    /// a message is on the bus. After the GSource is dispatched, the
132    /// message is popped off the bus and unreffed.
133    ///
134    /// As with other watches, there can only be one watch on the bus, including
135    /// any signal watch added with `gst_bus_add_signal_watch`.
136    ///
137    /// # Returns
138    ///
139    /// a [`glib::Source`][crate::glib::Source] that can be added to a `GMainLoop`.
140    #[doc(alias = "gst_bus_create_watch")]
141    pub fn create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source
142    where
143        F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
144    {
145        skip_assert_initialized!();
146        unsafe {
147            let source = ffi::gst_bus_create_watch(self.to_glib_none().0);
148            glib::ffi::g_source_set_callback(
149                source,
150                Some(transmute::<
151                    *mut (),
152                    unsafe extern "C" fn(glib::ffi::gpointer) -> i32,
153                >(trampoline_watch::<F> as *mut ())),
154                into_raw_watch(func),
155                Some(destroy_closure_watch::<F>),
156            );
157            glib::ffi::g_source_set_priority(source, priority.into_glib());
158
159            if let Some(name) = name {
160                glib::ffi::g_source_set_name(source, name.to_glib_none().0);
161            }
162
163            from_glib_full(source)
164        }
165    }
166
167    /// Adds a bus watch to the default main context with the default priority
168    /// ( `G_PRIORITY_DEFAULT` ). It is also possible to use a non-default main
169    /// context set up using [`glib::MainContext::push_thread_default()`][crate::glib::MainContext::push_thread_default()] (before
170    /// one had to create a bus watch source and attach it to the desired main
171    /// context 'manually').
172    ///
173    /// This function is used to receive asynchronous messages in the main loop.
174    /// There can only be a single bus watch per bus, you must remove it before you
175    /// can set a new one.
176    ///
177    /// The bus watch will only work if a `GMainLoop` is being run.
178    ///
179    /// The watch can be removed using [`remove_watch()`][Self::remove_watch()] or by returning [`false`]
180    /// from `func`. If the watch was added to the default main context it is also
181    /// possible to remove the watch using [`glib::Source::remove()`][crate::glib::Source::remove()].
182    ///
183    /// The bus watch will take its own reference to the `self`, so it is safe to unref
184    /// `self` using `gst_object_unref()` after setting the bus watch.
185    /// ## `func`
186    /// A function to call when a message is received.
187    ///
188    /// # Returns
189    ///
190    /// The event source id or 0 if `self` already got an event source.
191    #[doc(alias = "gst_bus_add_watch")]
192    #[doc(alias = "gst_bus_add_watch_full")]
193    pub fn add_watch<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
194    where
195        F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
196    {
197        unsafe {
198            let res = ffi::gst_bus_add_watch_full(
199                self.to_glib_none().0,
200                glib::ffi::G_PRIORITY_DEFAULT,
201                Some(trampoline_watch::<F>),
202                into_raw_watch(func),
203                Some(destroy_closure_watch::<F>),
204            );
205
206            if res == 0 {
207                Err(glib::bool_error!("Bus already has a watch"))
208            } else {
209                Ok(BusWatchGuard { bus: self.clone() })
210            }
211        }
212    }
213
214    #[doc(alias = "gst_bus_add_watch")]
215    #[doc(alias = "gst_bus_add_watch_full")]
216    pub fn add_watch_local<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
217    where
218        F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
219    {
220        unsafe {
221            let ctx = glib::MainContext::ref_thread_default();
222            let _acquire = ctx
223                .acquire()
224                .expect("thread default main context already acquired by another thread");
225
226            let res = ffi::gst_bus_add_watch_full(
227                self.to_glib_none().0,
228                glib::ffi::G_PRIORITY_DEFAULT,
229                Some(trampoline_watch_local::<F>),
230                into_raw_watch_local(func),
231                Some(destroy_closure_watch_local::<F>),
232            );
233
234            if res == 0 {
235                Err(glib::bool_error!("Bus already has a watch"))
236            } else {
237                Ok(BusWatchGuard { bus: self.clone() })
238            }
239        }
240    }
241
242    /// Sets the synchronous handler on the bus. The function will be called
243    /// every time a new message is posted on the bus. Note that the function
244    /// will be called in the same thread context as the posting object. This
245    /// function is usually only called by the creator of the bus. Applications
246    /// should handle messages asynchronously using the gst_bus watch and poll
247    /// functions.
248    ///
249    /// Before 1.16.3 it was not possible to replace an existing handler and
250    /// clearing an existing handler with [`None`] was not thread-safe.
251    /// ## `func`
252    /// The handler function to install
253    /// ## `notify`
254    /// called when `user_data` becomes unused
255    #[doc(alias = "gst_bus_set_sync_handler")]
256    pub fn set_sync_handler<F>(&self, func: F)
257    where
258        F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
259    {
260        unsafe {
261            let bus = self.to_glib_none().0;
262
263            #[cfg(not(feature = "v1_18"))]
264            {
265                static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
266                    std::sync::OnceLock::new();
267
268                let set_once_quark = SET_ONCE_QUARK
269                    .get_or_init(|| glib::Quark::from_str("gstreamer-rs-sync-handler"));
270
271                // This is not thread-safe before 1.16.3, see
272                // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
273                if crate::version() < (1, 16, 3, 0) {
274                    if !glib::gobject_ffi::g_object_get_qdata(
275                        bus as *mut _,
276                        set_once_quark.into_glib(),
277                    )
278                    .is_null()
279                    {
280                        panic!("Bus sync handler can only be set once");
281                    }
282
283                    glib::gobject_ffi::g_object_set_qdata(
284                        bus as *mut _,
285                        set_once_quark.into_glib(),
286                        1 as *mut _,
287                    );
288                }
289            }
290
291            ffi::gst_bus_set_sync_handler(
292                bus,
293                Some(trampoline_sync::<F>),
294                into_raw_sync(func),
295                Some(destroy_closure_sync::<F>),
296            )
297        }
298    }
299
300    pub fn unset_sync_handler(&self) {
301        #[cfg(not(feature = "v1_18"))]
302        {
303            // This is not thread-safe before 1.16.3, see
304            // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
305            if crate::version() < (1, 16, 3, 0) {
306                return;
307            }
308        }
309
310        unsafe {
311            use std::ptr;
312
313            ffi::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None)
314        }
315    }
316
317    #[doc(alias = "gst_bus_pop")]
318    pub fn iter(&self) -> Iter {
319        self.iter_timed(Some(crate::ClockTime::ZERO))
320    }
321
322    #[doc(alias = "gst_bus_timed_pop")]
323    pub fn iter_timed(&self, timeout: impl Into<Option<crate::ClockTime>>) -> Iter {
324        Iter {
325            bus: self,
326            timeout: timeout.into(),
327        }
328    }
329
330    #[doc(alias = "gst_bus_pop_filtered")]
331    pub fn iter_filtered<'a>(
332        &'a self,
333        msg_types: &'a [MessageType],
334    ) -> impl Iterator<Item = Message> + 'a {
335        self.iter_timed_filtered(Some(crate::ClockTime::ZERO), msg_types)
336    }
337
338    #[doc(alias = "gst_bus_timed_pop_filtered")]
339    pub fn iter_timed_filtered<'a>(
340        &'a self,
341        timeout: impl Into<Option<crate::ClockTime>>,
342        msg_types: &'a [MessageType],
343    ) -> impl Iterator<Item = Message> + 'a {
344        self.iter_timed(timeout)
345            .filter(move |msg| msg_types.contains(&msg.type_()))
346    }
347
348    /// Gets a message from the bus whose type matches the message type mask `types`,
349    /// waiting up to the specified timeout (and discarding any messages that do not
350    /// match the mask provided).
351    ///
352    /// If `timeout` is 0, this function behaves like [`pop_filtered()`][Self::pop_filtered()]. If
353    /// `timeout` is `GST_CLOCK_TIME_NONE`, this function will block forever until a
354    /// matching message was posted on the bus.
355    /// ## `timeout`
356    /// a timeout in nanoseconds, or `GST_CLOCK_TIME_NONE` to wait forever
357    /// ## `types`
358    /// message types to take into account, `GST_MESSAGE_ANY` for any type
359    ///
360    /// # Returns
361    ///
362    /// a [`Message`][crate::Message] matching the
363    ///  filter in `types`, or [`None`] if no matching message was found on
364    ///  the bus until the timeout expired.
365    #[doc(alias = "gst_bus_timed_pop_filtered")]
366    pub fn timed_pop_filtered(
367        &self,
368        timeout: impl Into<Option<crate::ClockTime>> + Clone,
369        msg_types: &[MessageType],
370    ) -> Option<Message> {
371        loop {
372            let msg = self.timed_pop(timeout.clone())?;
373            if msg_types.contains(&msg.type_()) {
374                return Some(msg);
375            }
376        }
377    }
378
379    /// Gets a message matching `type_` from the bus. Will discard all messages on
380    /// the bus that do not match `type_` and that have been posted before the first
381    /// message that does match `type_`. If there is no message matching `type_` on
382    /// the bus, all messages will be discarded. It is not possible to use message
383    /// enums beyond `GST_MESSAGE_EXTENDED` in the `events` mask.
384    /// ## `types`
385    /// message types to take into account
386    ///
387    /// # Returns
388    ///
389    /// the next [`Message`][crate::Message] matching
390    ///  `type_` that is on the bus, or [`None`] if the bus is empty or there
391    ///  is no message matching `type_`.
392    #[doc(alias = "gst_bus_pop_filtered")]
393    pub fn pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message> {
394        loop {
395            let msg = self.pop()?;
396            if msg_types.contains(&msg.type_()) {
397                return Some(msg);
398            }
399        }
400    }
401
402    pub fn stream(&self) -> BusStream {
403        BusStream::new(self)
404    }
405
406    pub fn stream_filtered<'a>(
407        &self,
408        message_types: &'a [MessageType],
409    ) -> impl FusedStream<Item = Message> + Unpin + Send + 'a {
410        self.stream().filter(move |message| {
411            let message_type = message.type_();
412
413            future::ready(message_types.contains(&message_type))
414        })
415    }
416}
417
418#[must_use = "iterators are lazy and do nothing unless consumed"]
419#[derive(Debug)]
420pub struct Iter<'a> {
421    bus: &'a Bus,
422    timeout: Option<crate::ClockTime>,
423}
424
425impl Iterator for Iter<'_> {
426    type Item = Message;
427
428    fn next(&mut self) -> Option<Message> {
429        self.bus.timed_pop(self.timeout)
430    }
431}
432
433#[derive(Debug)]
434pub struct BusStream {
435    bus: glib::WeakRef<Bus>,
436    receiver: UnboundedReceiver<Message>,
437}
438
439impl BusStream {
440    fn new(bus: &Bus) -> Self {
441        skip_assert_initialized!();
442
443        let mutex = Arc::new(Mutex::new(()));
444        let (sender, receiver) = mpsc::unbounded();
445
446        // Use a mutex to ensure that the sync handler is not putting any messages into the sender
447        // until we have removed all previously queued messages from the bus.
448        // This makes sure that the messages are staying in order.
449        //
450        // We could use the bus' object lock here but a separate mutex seems safer.
451        let _mutex_guard = mutex.lock().unwrap();
452        bus.set_sync_handler({
453            let sender = sender.clone();
454            let mutex = mutex.clone();
455
456            move |_bus, message| {
457                let _mutex_guard = mutex.lock().unwrap();
458
459                let _ = sender.unbounded_send(message.to_owned());
460
461                BusSyncReply::Drop
462            }
463        });
464
465        // First pop all messages that might've been previously queued before creating the bus stream.
466        while let Some(message) = bus.pop() {
467            let _ = sender.unbounded_send(message);
468        }
469
470        Self {
471            bus: bus.downgrade(),
472            receiver,
473        }
474    }
475}
476
477impl Drop for BusStream {
478    fn drop(&mut self) {
479        if let Some(bus) = self.bus.upgrade() {
480            bus.unset_sync_handler();
481        }
482    }
483}
484
485impl Stream for BusStream {
486    type Item = Message;
487
488    fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
489        self.receiver.poll_next_unpin(context)
490    }
491}
492
493impl FusedStream for BusStream {
494    fn is_terminated(&self) -> bool {
495        self.receiver.is_terminated()
496    }
497}
498
499// rustdoc-stripper-ignore-next
500/// Manages ownership of the bus watch added to a bus with [`Bus::add_watch`] or [`Bus::add_watch_local`]
501///
502/// When dropped the bus watch is removed from the bus.
503#[derive(Debug)]
504#[must_use = "if unused the bus watch will immediately be removed"]
505pub struct BusWatchGuard {
506    bus: Bus,
507}
508
509impl Drop for BusWatchGuard {
510    fn drop(&mut self) {
511        let _ = self.bus.remove_watch();
512    }
513}
514
515#[cfg(test)]
516mod tests {
517    use std::sync::{Arc, Mutex};
518
519    use super::*;
520
521    #[test]
522    fn test_sync_handler() {
523        crate::init().unwrap();
524
525        let bus = Bus::new();
526        let msgs = Arc::new(Mutex::new(Vec::new()));
527        let msgs_clone = msgs.clone();
528        bus.set_sync_handler(move |_, msg| {
529            msgs_clone.lock().unwrap().push(msg.clone());
530            BusSyncReply::Pass
531        });
532
533        bus.post(crate::message::Eos::new()).unwrap();
534
535        let msgs = msgs.lock().unwrap();
536        assert_eq!(msgs.len(), 1);
537        match msgs[0].view() {
538            crate::MessageView::Eos(_) => (),
539            _ => unreachable!(),
540        }
541    }
542
543    #[test]
544    fn test_bus_stream() {
545        crate::init().unwrap();
546
547        let bus = Bus::new();
548        let bus_stream = bus.stream();
549
550        let eos_message = crate::message::Eos::new();
551        bus.post(eos_message).unwrap();
552
553        let bus_future = bus_stream.into_future();
554        let (message, _) = futures_executor::block_on(bus_future);
555
556        match message.unwrap().view() {
557            crate::MessageView::Eos(_) => (),
558            _ => unreachable!(),
559        }
560    }
561}