1use std::{
4 future,
5 mem::transmute,
6 pin::Pin,
7 sync::{Arc, Mutex},
8 task::{Context, Poll},
9};
10
11use futures_channel::mpsc::{self, UnboundedReceiver};
12use futures_core::Stream;
13use futures_util::{stream::FusedStream, StreamExt};
14use glib::{
15 ffi::{gboolean, gpointer},
16 prelude::*,
17 source::Priority,
18 translate::*,
19 ControlFlow,
20};
21
22use crate::{ffi, Bus, BusSyncReply, Message, MessageType};
23
24unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(
25 bus: *mut ffi::GstBus,
26 msg: *mut ffi::GstMessage,
27 func: gpointer,
28) -> gboolean {
29 let func: &mut F = &mut *(func as *mut F);
30 func(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
31}
32
33unsafe extern "C" fn destroy_closure_watch<
34 F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
35>(
36 ptr: gpointer,
37) {
38 let _ = Box::<F>::from_raw(ptr as *mut _);
39}
40
41fn into_raw_watch<F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static>(func: F) -> gpointer {
42 #[allow(clippy::type_complexity)]
43 let func: Box<F> = Box::new(func);
44 Box::into_raw(func) as gpointer
45}
46
47unsafe extern "C" fn trampoline_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(
48 bus: *mut ffi::GstBus,
49 msg: *mut ffi::GstMessage,
50 func: gpointer,
51) -> gboolean {
52 let func: &mut glib::thread_guard::ThreadGuard<F> =
53 &mut *(func as *mut glib::thread_guard::ThreadGuard<F>);
54 (func.get_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
55}
56
57unsafe extern "C" fn destroy_closure_watch_local<
58 F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
59>(
60 ptr: gpointer,
61) {
62 let _ = Box::<glib::thread_guard::ThreadGuard<F>>::from_raw(ptr as *mut _);
63}
64
65fn into_raw_watch_local<F: FnMut(&Bus, &Message) -> ControlFlow + 'static>(func: F) -> gpointer {
66 #[allow(clippy::type_complexity)]
67 let func: Box<glib::thread_guard::ThreadGuard<F>> =
68 Box::new(glib::thread_guard::ThreadGuard::new(func));
69 Box::into_raw(func) as gpointer
70}
71
72unsafe extern "C" fn trampoline_sync<
73 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
74>(
75 bus: *mut ffi::GstBus,
76 msg: *mut ffi::GstMessage,
77 func: gpointer,
78) -> ffi::GstBusSyncReply {
79 let f: &F = &*(func as *const F);
80 let res = f(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib();
81
82 if res == ffi::GST_BUS_DROP {
83 ffi::gst_mini_object_unref(msg as *mut _);
84 }
85
86 res
87}
88
89unsafe extern "C" fn destroy_closure_sync<
90 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
91>(
92 ptr: gpointer,
93) {
94 let _ = Box::<F>::from_raw(ptr as *mut _);
95}
96
97fn into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>(
98 func: F,
99) -> gpointer {
100 let func: Box<F> = Box::new(func);
101 Box::into_raw(func) as gpointer
102}
103
104impl Bus {
105 #[doc(alias = "gst_bus_add_signal_watch")]
123 #[doc(alias = "gst_bus_add_signal_watch_full")]
124 pub fn add_signal_watch_full(&self, priority: Priority) {
125 unsafe {
126 ffi::gst_bus_add_signal_watch_full(self.to_glib_none().0, priority.into_glib());
127 }
128 }
129
130 #[doc(alias = "gst_bus_create_watch")]
141 pub fn create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source
142 where
143 F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
144 {
145 skip_assert_initialized!();
146 unsafe {
147 let source = ffi::gst_bus_create_watch(self.to_glib_none().0);
148 glib::ffi::g_source_set_callback(
149 source,
150 Some(transmute::<
151 *mut (),
152 unsafe extern "C" fn(glib::ffi::gpointer) -> i32,
153 >(trampoline_watch::<F> as *mut ())),
154 into_raw_watch(func),
155 Some(destroy_closure_watch::<F>),
156 );
157 glib::ffi::g_source_set_priority(source, priority.into_glib());
158
159 if let Some(name) = name {
160 glib::ffi::g_source_set_name(source, name.to_glib_none().0);
161 }
162
163 from_glib_full(source)
164 }
165 }
166
167 #[doc(alias = "gst_bus_add_watch")]
192 #[doc(alias = "gst_bus_add_watch_full")]
193 pub fn add_watch<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
194 where
195 F: FnMut(&Bus, &Message) -> ControlFlow + Send + 'static,
196 {
197 unsafe {
198 let res = ffi::gst_bus_add_watch_full(
199 self.to_glib_none().0,
200 glib::ffi::G_PRIORITY_DEFAULT,
201 Some(trampoline_watch::<F>),
202 into_raw_watch(func),
203 Some(destroy_closure_watch::<F>),
204 );
205
206 if res == 0 {
207 Err(glib::bool_error!("Bus already has a watch"))
208 } else {
209 Ok(BusWatchGuard { bus: self.clone() })
210 }
211 }
212 }
213
214 #[doc(alias = "gst_bus_add_watch")]
215 #[doc(alias = "gst_bus_add_watch_full")]
216 pub fn add_watch_local<F>(&self, func: F) -> Result<BusWatchGuard, glib::BoolError>
217 where
218 F: FnMut(&Bus, &Message) -> ControlFlow + 'static,
219 {
220 unsafe {
221 let ctx = glib::MainContext::ref_thread_default();
222 let _acquire = ctx
223 .acquire()
224 .expect("thread default main context already acquired by another thread");
225
226 let res = ffi::gst_bus_add_watch_full(
227 self.to_glib_none().0,
228 glib::ffi::G_PRIORITY_DEFAULT,
229 Some(trampoline_watch_local::<F>),
230 into_raw_watch_local(func),
231 Some(destroy_closure_watch_local::<F>),
232 );
233
234 if res == 0 {
235 Err(glib::bool_error!("Bus already has a watch"))
236 } else {
237 Ok(BusWatchGuard { bus: self.clone() })
238 }
239 }
240 }
241
242 #[doc(alias = "gst_bus_set_sync_handler")]
256 pub fn set_sync_handler<F>(&self, func: F)
257 where
258 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
259 {
260 unsafe {
261 let bus = self.to_glib_none().0;
262
263 #[allow(clippy::manual_dangling_ptr)]
264 #[cfg(not(feature = "v1_18"))]
265 {
266 static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
267 std::sync::OnceLock::new();
268
269 let set_once_quark = SET_ONCE_QUARK
270 .get_or_init(|| glib::Quark::from_str("gstreamer-rs-sync-handler"));
271
272 if crate::version() < (1, 16, 3, 0) {
275 if !glib::gobject_ffi::g_object_get_qdata(
276 bus as *mut _,
277 set_once_quark.into_glib(),
278 )
279 .is_null()
280 {
281 panic!("Bus sync handler can only be set once");
282 }
283
284 glib::gobject_ffi::g_object_set_qdata(
285 bus as *mut _,
286 set_once_quark.into_glib(),
287 1 as *mut _,
288 );
289 }
290 }
291
292 ffi::gst_bus_set_sync_handler(
293 bus,
294 Some(trampoline_sync::<F>),
295 into_raw_sync(func),
296 Some(destroy_closure_sync::<F>),
297 )
298 }
299 }
300
301 pub fn unset_sync_handler(&self) {
302 #[cfg(not(feature = "v1_18"))]
303 {
304 if crate::version() < (1, 16, 3, 0) {
307 return;
308 }
309 }
310
311 unsafe {
312 use std::ptr;
313
314 ffi::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None)
315 }
316 }
317
318 #[doc(alias = "gst_bus_pop")]
319 pub fn iter(&self) -> Iter<'_> {
320 self.iter_timed(Some(crate::ClockTime::ZERO))
321 }
322
323 #[doc(alias = "gst_bus_timed_pop")]
324 pub fn iter_timed(&self, timeout: impl Into<Option<crate::ClockTime>>) -> Iter<'_> {
325 Iter {
326 bus: self,
327 timeout: timeout.into(),
328 }
329 }
330
331 #[doc(alias = "gst_bus_pop_filtered")]
332 pub fn iter_filtered<'a>(
333 &'a self,
334 msg_types: &'a [MessageType],
335 ) -> impl Iterator<Item = Message> + 'a {
336 self.iter_timed_filtered(Some(crate::ClockTime::ZERO), msg_types)
337 }
338
339 #[doc(alias = "gst_bus_timed_pop_filtered")]
340 pub fn iter_timed_filtered<'a>(
341 &'a self,
342 timeout: impl Into<Option<crate::ClockTime>>,
343 msg_types: &'a [MessageType],
344 ) -> impl Iterator<Item = Message> + 'a {
345 self.iter_timed(timeout)
346 .filter(move |msg| msg_types.contains(&msg.type_()))
347 }
348
349 #[doc(alias = "gst_bus_timed_pop_filtered")]
367 pub fn timed_pop_filtered(
368 &self,
369 timeout: impl Into<Option<crate::ClockTime>> + Clone,
370 msg_types: &[MessageType],
371 ) -> Option<Message> {
372 loop {
373 let msg = self.timed_pop(timeout.clone())?;
374 if msg_types.contains(&msg.type_()) {
375 return Some(msg);
376 }
377 }
378 }
379
380 #[doc(alias = "gst_bus_pop_filtered")]
394 pub fn pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message> {
395 loop {
396 let msg = self.pop()?;
397 if msg_types.contains(&msg.type_()) {
398 return Some(msg);
399 }
400 }
401 }
402
403 pub fn stream(&self) -> BusStream {
404 BusStream::new(self)
405 }
406
407 pub fn stream_filtered<'a>(
408 &self,
409 message_types: &'a [MessageType],
410 ) -> impl FusedStream<Item = Message> + Unpin + Send + 'a {
411 self.stream().filter(move |message| {
412 let message_type = message.type_();
413
414 future::ready(message_types.contains(&message_type))
415 })
416 }
417}
418
419#[must_use = "iterators are lazy and do nothing unless consumed"]
420#[derive(Debug)]
421pub struct Iter<'a> {
422 bus: &'a Bus,
423 timeout: Option<crate::ClockTime>,
424}
425
426impl Iterator for Iter<'_> {
427 type Item = Message;
428
429 fn next(&mut self) -> Option<Message> {
430 self.bus.timed_pop(self.timeout)
431 }
432}
433
434#[derive(Debug)]
435pub struct BusStream {
436 bus: glib::WeakRef<Bus>,
437 receiver: UnboundedReceiver<Message>,
438}
439
440impl BusStream {
441 fn new(bus: &Bus) -> Self {
442 skip_assert_initialized!();
443
444 let mutex = Arc::new(Mutex::new(()));
445 let (sender, receiver) = mpsc::unbounded();
446
447 let _mutex_guard = mutex.lock().unwrap();
453 bus.set_sync_handler({
454 let sender = sender.clone();
455 let mutex = mutex.clone();
456
457 move |_bus, message| {
458 let _mutex_guard = mutex.lock().unwrap();
459
460 let _ = sender.unbounded_send(message.to_owned());
461
462 BusSyncReply::Drop
463 }
464 });
465
466 while let Some(message) = bus.pop() {
468 let _ = sender.unbounded_send(message);
469 }
470
471 Self {
472 bus: bus.downgrade(),
473 receiver,
474 }
475 }
476}
477
478impl Drop for BusStream {
479 fn drop(&mut self) {
480 if let Some(bus) = self.bus.upgrade() {
481 bus.unset_sync_handler();
482 }
483 }
484}
485
486impl Stream for BusStream {
487 type Item = Message;
488
489 fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
490 self.receiver.poll_next_unpin(context)
491 }
492}
493
494impl FusedStream for BusStream {
495 fn is_terminated(&self) -> bool {
496 self.receiver.is_terminated()
497 }
498}
499
500#[derive(Debug)]
505#[must_use = "if unused the bus watch will immediately be removed"]
506pub struct BusWatchGuard {
507 bus: Bus,
508}
509
510impl Drop for BusWatchGuard {
511 fn drop(&mut self) {
512 let _ = self.bus.remove_watch();
513 }
514}
515
516#[cfg(test)]
517mod tests {
518 use std::sync::{Arc, Mutex};
519
520 use super::*;
521
522 #[test]
523 fn test_sync_handler() {
524 crate::init().unwrap();
525
526 let bus = Bus::new();
527 let msgs = Arc::new(Mutex::new(Vec::new()));
528 let msgs_clone = msgs.clone();
529 bus.set_sync_handler(move |_, msg| {
530 msgs_clone.lock().unwrap().push(msg.clone());
531 BusSyncReply::Pass
532 });
533
534 bus.post(crate::message::Eos::new()).unwrap();
535
536 let msgs = msgs.lock().unwrap();
537 assert_eq!(msgs.len(), 1);
538 match msgs[0].view() {
539 crate::MessageView::Eos(_) => (),
540 _ => unreachable!(),
541 }
542 }
543
544 #[test]
545 fn test_bus_stream() {
546 crate::init().unwrap();
547
548 let bus = Bus::new();
549 let bus_stream = bus.stream();
550
551 let eos_message = crate::message::Eos::new();
552 bus.post(eos_message).unwrap();
553
554 let bus_future = bus_stream.into_future();
555 let (message, _) = futures_executor::block_on(bus_future);
556
557 match message.unwrap().view() {
558 crate::MessageView::Eos(_) => (),
559 _ => unreachable!(),
560 }
561 }
562}