Skip to main content

gstreamer_app/
app_sink.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    mem, panic,
5    pin::Pin,
6    ptr,
7    sync::{Arc, Mutex},
8    task::{Context, Poll, Waker},
9};
10
11#[cfg(not(panic = "abort"))]
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use futures_core::Stream;
15use glib::{ffi::gpointer, prelude::*, translate::*};
16
17use crate::{AppSink, ffi};
18
19#[allow(clippy::type_complexity)]
20pub struct AppSinkCallbacks {
21    eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
22    new_preroll: Option<
23        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
24    >,
25    new_sample: Option<
26        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
27    >,
28    new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
29    propose_allocation:
30        Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
31    #[cfg(not(panic = "abort"))]
32    panicked: AtomicBool,
33    callbacks: ffi::GstAppSinkCallbacks,
34}
35
36unsafe impl Send for AppSinkCallbacks {}
37unsafe impl Sync for AppSinkCallbacks {}
38
39impl AppSinkCallbacks {
40    pub fn builder() -> AppSinkCallbacksBuilder {
41        skip_assert_initialized!();
42        AppSinkCallbacksBuilder {
43            eos: None,
44            new_preroll: None,
45            new_sample: None,
46            new_event: None,
47            propose_allocation: None,
48        }
49    }
50}
51
52#[allow(clippy::type_complexity)]
53#[must_use = "The builder must be built to be used"]
54pub struct AppSinkCallbacksBuilder {
55    eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
56    new_preroll: Option<
57        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
58    >,
59    new_sample: Option<
60        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
61    >,
62    new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
63    propose_allocation:
64        Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
65}
66
67impl AppSinkCallbacksBuilder {
68    pub fn eos<F: FnMut(&AppSink) + Send + 'static>(self, eos: F) -> Self {
69        Self {
70            eos: Some(Box::new(eos)),
71            ..self
72        }
73    }
74
75    pub fn eos_if<F: FnMut(&AppSink) + Send + 'static>(self, eos: F, predicate: bool) -> Self {
76        if predicate { self.eos(eos) } else { self }
77    }
78
79    pub fn eos_if_some<F: FnMut(&AppSink) + Send + 'static>(self, eos: Option<F>) -> Self {
80        if let Some(eos) = eos {
81            self.eos(eos)
82        } else {
83            self
84        }
85    }
86
87    pub fn new_preroll<
88        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
89    >(
90        self,
91        new_preroll: F,
92    ) -> Self {
93        Self {
94            new_preroll: Some(Box::new(new_preroll)),
95            ..self
96        }
97    }
98
99    pub fn new_preroll_if<
100        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
101    >(
102        self,
103        new_preroll: F,
104        predicate: bool,
105    ) -> Self {
106        if predicate {
107            self.new_preroll(new_preroll)
108        } else {
109            self
110        }
111    }
112
113    pub fn new_preroll_if_some<
114        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
115    >(
116        self,
117        new_preroll: Option<F>,
118    ) -> Self {
119        if let Some(new_preroll) = new_preroll {
120            self.new_preroll(new_preroll)
121        } else {
122            self
123        }
124    }
125
126    pub fn new_sample<
127        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
128    >(
129        self,
130        new_sample: F,
131    ) -> Self {
132        Self {
133            new_sample: Some(Box::new(new_sample)),
134            ..self
135        }
136    }
137
138    pub fn new_sample_if<
139        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
140    >(
141        self,
142        new_sample: F,
143        predicate: bool,
144    ) -> Self {
145        if predicate {
146            self.new_sample(new_sample)
147        } else {
148            self
149        }
150    }
151
152    pub fn new_sample_if_some<
153        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
154    >(
155        self,
156        new_sample: Option<F>,
157    ) -> Self {
158        if let Some(new_sample) = new_sample {
159            self.new_sample(new_sample)
160        } else {
161            self
162        }
163    }
164
165    #[cfg(feature = "v1_20")]
166    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
167    pub fn new_event<F: FnMut(&AppSink) -> bool + Send + 'static>(self, new_event: F) -> Self {
168        Self {
169            new_event: Some(Box::new(new_event)),
170            ..self
171        }
172    }
173
174    #[cfg(feature = "v1_20")]
175    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
176    pub fn new_event_if<F: FnMut(&AppSink) -> bool + Send + 'static>(
177        self,
178        new_event: F,
179        predicate: bool,
180    ) -> Self {
181        if predicate {
182            self.new_event(new_event)
183        } else {
184            self
185        }
186    }
187
188    #[cfg(feature = "v1_20")]
189    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
190    pub fn new_event_if_some<F: FnMut(&AppSink) -> bool + Send + 'static>(
191        self,
192        new_event: Option<F>,
193    ) -> Self {
194        if let Some(new_event) = new_event {
195            self.new_event(new_event)
196        } else {
197            self
198        }
199    }
200
201    #[cfg(feature = "v1_24")]
202    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
203    pub fn propose_allocation<
204        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
205    >(
206        self,
207        propose_allocation: F,
208    ) -> Self {
209        Self {
210            propose_allocation: Some(Box::new(propose_allocation)),
211            ..self
212        }
213    }
214
215    #[cfg(feature = "v1_24")]
216    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
217    pub fn propose_allocation_if<
218        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
219    >(
220        self,
221        propose_allocation: F,
222        predicate: bool,
223    ) -> Self {
224        if predicate {
225            self.propose_allocation(propose_allocation)
226        } else {
227            self
228        }
229    }
230
231    #[cfg(feature = "v1_24")]
232    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
233    pub fn propose_allocation_if_some<
234        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
235    >(
236        self,
237        propose_allocation: Option<F>,
238    ) -> Self {
239        if let Some(propose_allocation) = propose_allocation {
240            self.propose_allocation(propose_allocation)
241        } else {
242            self
243        }
244    }
245
246    #[must_use = "Building the callbacks without using them has no effect"]
247    pub fn build(self) -> AppSinkCallbacks {
248        let have_eos = self.eos.is_some();
249        let have_new_preroll = self.new_preroll.is_some();
250        let have_new_sample = self.new_sample.is_some();
251        let have_new_event = self.new_event.is_some();
252        let have_propose_allocation = self.propose_allocation.is_some();
253
254        AppSinkCallbacks {
255            eos: self.eos,
256            new_preroll: self.new_preroll,
257            new_sample: self.new_sample,
258            new_event: self.new_event,
259            propose_allocation: self.propose_allocation,
260            #[cfg(not(panic = "abort"))]
261            panicked: AtomicBool::new(false),
262            callbacks: ffi::GstAppSinkCallbacks {
263                eos: if have_eos { Some(trampoline_eos) } else { None },
264                new_preroll: if have_new_preroll {
265                    Some(trampoline_new_preroll)
266                } else {
267                    None
268                },
269                new_sample: if have_new_sample {
270                    Some(trampoline_new_sample)
271                } else {
272                    None
273                },
274                new_event: if have_new_event {
275                    Some(trampoline_new_event)
276                } else {
277                    None
278                },
279                propose_allocation: if have_propose_allocation {
280                    Some(trampoline_propose_allocation)
281                } else {
282                    None
283                },
284                _gst_reserved: [ptr::null_mut(), ptr::null_mut()],
285            },
286        }
287    }
288}
289
290unsafe extern "C" fn trampoline_eos(appsink: *mut ffi::GstAppSink, callbacks: gpointer) {
291    unsafe {
292        let callbacks = callbacks as *mut AppSinkCallbacks;
293        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
294
295        #[cfg(not(panic = "abort"))]
296        if (*callbacks).panicked.load(Ordering::Relaxed) {
297            let element: Borrowed<AppSink> = from_glib_borrow(appsink);
298            gst::subclass::post_panic_error_message(
299                element.upcast_ref(),
300                element.upcast_ref(),
301                None,
302            );
303            return;
304        }
305
306        if let Some(ref mut eos) = (*callbacks).eos {
307            let result = panic::catch_unwind(panic::AssertUnwindSafe(|| eos(&element)));
308            match result {
309                Ok(result) => result,
310                Err(err) => {
311                    #[cfg(panic = "abort")]
312                    {
313                        unreachable!("{err:?}");
314                    }
315                    #[cfg(not(panic = "abort"))]
316                    {
317                        (*callbacks).panicked.store(true, Ordering::Relaxed);
318                        gst::subclass::post_panic_error_message(
319                            element.upcast_ref(),
320                            element.upcast_ref(),
321                            Some(err),
322                        );
323                    }
324                }
325            }
326        }
327    }
328}
329
330unsafe extern "C" fn trampoline_new_preroll(
331    appsink: *mut ffi::GstAppSink,
332    callbacks: gpointer,
333) -> gst::ffi::GstFlowReturn {
334    unsafe {
335        let callbacks = callbacks as *mut AppSinkCallbacks;
336        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
337
338        #[cfg(not(panic = "abort"))]
339        if (*callbacks).panicked.load(Ordering::Relaxed) {
340            let element: Borrowed<AppSink> = from_glib_borrow(appsink);
341            gst::subclass::post_panic_error_message(
342                element.upcast_ref(),
343                element.upcast_ref(),
344                None,
345            );
346            return gst::FlowReturn::Error.into_glib();
347        }
348
349        let ret = if let Some(ref mut new_preroll) = (*callbacks).new_preroll {
350            let result =
351                panic::catch_unwind(panic::AssertUnwindSafe(|| new_preroll(&element).into()));
352            match result {
353                Ok(result) => result,
354                Err(err) => {
355                    #[cfg(panic = "abort")]
356                    {
357                        unreachable!("{err:?}");
358                    }
359                    #[cfg(not(panic = "abort"))]
360                    {
361                        (*callbacks).panicked.store(true, Ordering::Relaxed);
362                        gst::subclass::post_panic_error_message(
363                            element.upcast_ref(),
364                            element.upcast_ref(),
365                            Some(err),
366                        );
367
368                        gst::FlowReturn::Error
369                    }
370                }
371            }
372        } else {
373            gst::FlowReturn::Error
374        };
375
376        ret.into_glib()
377    }
378}
379
380unsafe extern "C" fn trampoline_new_sample(
381    appsink: *mut ffi::GstAppSink,
382    callbacks: gpointer,
383) -> gst::ffi::GstFlowReturn {
384    unsafe {
385        let callbacks = callbacks as *mut AppSinkCallbacks;
386        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
387
388        #[cfg(not(panic = "abort"))]
389        if (*callbacks).panicked.load(Ordering::Relaxed) {
390            let element: Borrowed<AppSink> = from_glib_borrow(appsink);
391            gst::subclass::post_panic_error_message(
392                element.upcast_ref(),
393                element.upcast_ref(),
394                None,
395            );
396            return gst::FlowReturn::Error.into_glib();
397        }
398
399        let ret = if let Some(ref mut new_sample) = (*callbacks).new_sample {
400            let result =
401                panic::catch_unwind(panic::AssertUnwindSafe(|| new_sample(&element).into()));
402            match result {
403                Ok(result) => result,
404                Err(err) => {
405                    #[cfg(panic = "abort")]
406                    {
407                        unreachable!("{err:?}");
408                    }
409                    #[cfg(not(panic = "abort"))]
410                    {
411                        (*callbacks).panicked.store(true, Ordering::Relaxed);
412                        gst::subclass::post_panic_error_message(
413                            element.upcast_ref(),
414                            element.upcast_ref(),
415                            Some(err),
416                        );
417
418                        gst::FlowReturn::Error
419                    }
420                }
421            }
422        } else {
423            gst::FlowReturn::Error
424        };
425
426        ret.into_glib()
427    }
428}
429
430unsafe extern "C" fn trampoline_new_event(
431    appsink: *mut ffi::GstAppSink,
432    callbacks: gpointer,
433) -> glib::ffi::gboolean {
434    unsafe {
435        let callbacks = callbacks as *mut AppSinkCallbacks;
436        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
437
438        #[cfg(not(panic = "abort"))]
439        if (*callbacks).panicked.load(Ordering::Relaxed) {
440            let element: Borrowed<AppSink> = from_glib_borrow(appsink);
441            gst::subclass::post_panic_error_message(
442                element.upcast_ref(),
443                element.upcast_ref(),
444                None,
445            );
446            return false.into_glib();
447        }
448
449        let ret = if let Some(ref mut new_event) = (*callbacks).new_event {
450            let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_event(&element)));
451            match result {
452                Ok(result) => result,
453                Err(err) => {
454                    #[cfg(panic = "abort")]
455                    {
456                        unreachable!("{err:?}");
457                    }
458                    #[cfg(not(panic = "abort"))]
459                    {
460                        (*callbacks).panicked.store(true, Ordering::Relaxed);
461                        gst::subclass::post_panic_error_message(
462                            element.upcast_ref(),
463                            element.upcast_ref(),
464                            Some(err),
465                        );
466
467                        false
468                    }
469                }
470            }
471        } else {
472            false
473        };
474
475        ret.into_glib()
476    }
477}
478
479unsafe extern "C" fn trampoline_propose_allocation(
480    appsink: *mut ffi::GstAppSink,
481    query: *mut gst::ffi::GstQuery,
482    callbacks: gpointer,
483) -> glib::ffi::gboolean {
484    unsafe {
485        let callbacks = callbacks as *mut AppSinkCallbacks;
486        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
487
488        #[cfg(not(panic = "abort"))]
489        if (*callbacks).panicked.load(Ordering::Relaxed) {
490            let element: Borrowed<AppSink> = from_glib_borrow(appsink);
491            gst::subclass::post_panic_error_message(
492                element.upcast_ref(),
493                element.upcast_ref(),
494                None,
495            );
496            return false.into_glib();
497        }
498
499        let ret = if let Some(ref mut propose_allocation) = (*callbacks).propose_allocation {
500            let query = match gst::QueryRef::from_mut_ptr(query).view_mut() {
501                gst::QueryViewMut::Allocation(allocation) => allocation,
502                _ => unreachable!(),
503            };
504            let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
505                propose_allocation(&element, query)
506            }));
507            match result {
508                Ok(result) => result,
509                Err(err) => {
510                    #[cfg(panic = "abort")]
511                    {
512                        unreachable!("{err:?}");
513                    }
514                    #[cfg(not(panic = "abort"))]
515                    {
516                        (*callbacks).panicked.store(true, Ordering::Relaxed);
517                        gst::subclass::post_panic_error_message(
518                            element.upcast_ref(),
519                            element.upcast_ref(),
520                            Some(err),
521                        );
522                        false
523                    }
524                }
525            }
526        } else {
527            false
528        };
529
530        ret.into_glib()
531    }
532}
533
534unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
535    unsafe {
536        let _ = Box::<AppSinkCallbacks>::from_raw(ptr as *mut _);
537    }
538}
539
540impl AppSink {
541    // rustdoc-stripper-ignore-next
542    /// Creates a new builder-pattern struct instance to construct [`AppSink`] objects.
543    ///
544    /// This method returns an instance of [`AppSinkBuilder`](crate::builders::AppSinkBuilder) which can be used to create [`AppSink`] objects.
545    pub fn builder<'a>() -> AppSinkBuilder<'a> {
546        assert_initialized_main_thread!();
547        AppSinkBuilder {
548            builder: gst::Object::builder(),
549            callbacks: None,
550            drop_out_of_segment: None,
551        }
552    }
553
554    /// Set callbacks which will be executed for each new preroll, new sample and eos.
555    /// This is an alternative to using the signals, it has lower overhead and is thus
556    /// less expensive, but also less flexible.
557    ///
558    /// If callbacks are installed, no signals will be emitted for performance
559    /// reasons.
560    ///
561    /// Before 1.16.3 it was not possible to change the callbacks in a thread-safe
562    /// way.
563    ///
564    /// Since 1.28.3 it is allowed to set the `callbacks` to [`None`] to unset them.
565    ///
566    /// Note that `gst_app_sink_set_callbacks()` and
567    /// `gst_app_sink_set_simple_callbacks()` are mutually exclusive and setting one
568    /// will unset the other.
569    /// ## `callbacks`
570    /// the callbacks
571    /// ## `notify`
572    /// a destroy notify function
573    #[doc(alias = "gst_app_sink_set_callbacks")]
574    pub fn set_callbacks(&self, callbacks: AppSinkCallbacks) {
575        unsafe {
576            let sink = self.to_glib_none().0;
577
578            #[allow(clippy::manual_dangling_ptr)]
579            #[cfg(not(feature = "v1_18"))]
580            {
581                static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
582                    std::sync::OnceLock::new();
583
584                let set_once_quark = SET_ONCE_QUARK
585                    .get_or_init(|| glib::Quark::from_str("gstreamer-rs-app-sink-callbacks"));
586
587                // This is not thread-safe before 1.16.3, see
588                // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
589                if gst::version() < (1, 16, 3, 0) {
590                    if !glib::gobject_ffi::g_object_get_qdata(
591                        sink as *mut _,
592                        set_once_quark.into_glib(),
593                    )
594                    .is_null()
595                    {
596                        panic!("AppSink callbacks can only be set once");
597                    }
598
599                    glib::gobject_ffi::g_object_set_qdata(
600                        sink as *mut _,
601                        set_once_quark.into_glib(),
602                        1 as *mut _,
603                    );
604                }
605            }
606
607            ffi::gst_app_sink_set_callbacks(
608                sink,
609                mut_override(&callbacks.callbacks),
610                Box::into_raw(Box::new(callbacks)) as *mut _,
611                Some(destroy_callbacks),
612            );
613        }
614    }
615
616    #[doc(alias = "drop-out-of-segment")]
617    pub fn drops_out_of_segment(&self) -> bool {
618        unsafe {
619            from_glib(gst_base::ffi::gst_base_sink_get_drop_out_of_segment(
620                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
621            ))
622        }
623    }
624
625    #[doc(alias = "max-bitrate")]
626    #[doc(alias = "gst_base_sink_get_max_bitrate")]
627    pub fn max_bitrate(&self) -> u64 {
628        unsafe {
629            gst_base::ffi::gst_base_sink_get_max_bitrate(
630                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
631            )
632        }
633    }
634
635    #[doc(alias = "max-lateness")]
636    #[doc(alias = "gst_base_sink_get_max_lateness")]
637    pub fn max_lateness(&self) -> i64 {
638        unsafe {
639            gst_base::ffi::gst_base_sink_get_max_lateness(
640                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
641            )
642        }
643    }
644
645    #[doc(alias = "processing-deadline")]
646    #[cfg(feature = "v1_16")]
647    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
648    #[doc(alias = "gst_base_sink_get_processing_deadline")]
649    pub fn processing_deadline(&self) -> gst::ClockTime {
650        unsafe {
651            try_from_glib(gst_base::ffi::gst_base_sink_get_processing_deadline(
652                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
653            ))
654            .expect("undefined processing_deadline")
655        }
656    }
657
658    #[doc(alias = "render-delay")]
659    #[doc(alias = "gst_base_sink_get_render_delay")]
660    pub fn render_delay(&self) -> gst::ClockTime {
661        unsafe {
662            try_from_glib(gst_base::ffi::gst_base_sink_get_render_delay(
663                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
664            ))
665            .expect("undefined render_delay")
666        }
667    }
668
669    #[cfg(feature = "v1_18")]
670    #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
671    #[doc(alias = "gst_base_sink_get_stats")]
672    pub fn stats(&self) -> gst::Structure {
673        unsafe {
674            from_glib_full(gst_base::ffi::gst_base_sink_get_stats(
675                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
676            ))
677        }
678    }
679
680    #[doc(alias = "sync")]
681    pub fn is_sync(&self) -> bool {
682        unsafe {
683            from_glib(gst_base::ffi::gst_base_sink_get_sync(
684                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
685            ))
686        }
687    }
688
689    #[doc(alias = "throttle-time")]
690    #[doc(alias = "gst_base_sink_get_throttle_time")]
691    pub fn throttle_time(&self) -> u64 {
692        unsafe {
693            gst_base::ffi::gst_base_sink_get_throttle_time(
694                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
695            )
696        }
697    }
698
699    #[doc(alias = "ts-offset")]
700    #[doc(alias = "gst_base_sink_get_ts_offset")]
701    pub fn ts_offset(&self) -> gst::ClockTimeDiff {
702        unsafe {
703            gst_base::ffi::gst_base_sink_get_ts_offset(
704                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
705            )
706        }
707    }
708
709    #[doc(alias = "async")]
710    #[doc(alias = "gst_base_sink_is_async_enabled")]
711    pub fn is_async(&self) -> bool {
712        unsafe {
713            from_glib(gst_base::ffi::gst_base_sink_is_async_enabled(
714                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
715            ))
716        }
717    }
718
719    #[doc(alias = "last-sample")]
720    pub fn enables_last_sample(&self) -> bool {
721        unsafe {
722            from_glib(gst_base::ffi::gst_base_sink_is_last_sample_enabled(
723                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
724            ))
725        }
726    }
727
728    #[doc(alias = "qos")]
729    #[doc(alias = "gst_base_sink_is_qos_enabled")]
730    pub fn is_qos(&self) -> bool {
731        unsafe {
732            from_glib(gst_base::ffi::gst_base_sink_is_qos_enabled(
733                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
734            ))
735        }
736    }
737
738    #[doc(alias = "async")]
739    #[doc(alias = "gst_base_sink_set_async_enabled")]
740    pub fn set_async(&self, enabled: bool) {
741        unsafe {
742            gst_base::ffi::gst_base_sink_set_async_enabled(
743                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
744                enabled.into_glib(),
745            );
746        }
747    }
748
749    #[doc(alias = "drop-out-of-segment")]
750    #[doc(alias = "gst_base_sink_set_drop_out_of_segment")]
751    pub fn set_drop_out_of_segment(&self, drop_out_of_segment: bool) {
752        unsafe {
753            gst_base::ffi::gst_base_sink_set_drop_out_of_segment(
754                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
755                drop_out_of_segment.into_glib(),
756            );
757        }
758    }
759
760    #[doc(alias = "last-sample")]
761    pub fn set_enable_last_sample(&self, enabled: bool) {
762        unsafe {
763            gst_base::ffi::gst_base_sink_set_last_sample_enabled(
764                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
765                enabled.into_glib(),
766            );
767        }
768    }
769
770    #[doc(alias = "max-bitrate")]
771    #[doc(alias = "gst_base_sink_set_max_bitrate")]
772    pub fn set_max_bitrate(&self, max_bitrate: u64) {
773        unsafe {
774            gst_base::ffi::gst_base_sink_set_max_bitrate(
775                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
776                max_bitrate,
777            );
778        }
779    }
780
781    #[doc(alias = "max-lateness")]
782    #[doc(alias = "gst_base_sink_set_max_lateness")]
783    pub fn set_max_lateness(&self, max_lateness: i64) {
784        unsafe {
785            gst_base::ffi::gst_base_sink_set_max_lateness(
786                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
787                max_lateness,
788            );
789        }
790    }
791
792    #[doc(alias = "processing-deadline")]
793    #[cfg(feature = "v1_16")]
794    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
795    #[doc(alias = "gst_base_sink_set_processing_deadline")]
796    pub fn set_processing_deadline(&self, processing_deadline: gst::ClockTime) {
797        unsafe {
798            gst_base::ffi::gst_base_sink_set_processing_deadline(
799                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
800                processing_deadline.into_glib(),
801            );
802        }
803    }
804
805    #[doc(alias = "qos")]
806    #[doc(alias = "gst_base_sink_set_qos_enabled")]
807    pub fn set_qos(&self, enabled: bool) {
808        unsafe {
809            gst_base::ffi::gst_base_sink_set_qos_enabled(
810                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
811                enabled.into_glib(),
812            );
813        }
814    }
815
816    #[doc(alias = "render-delay")]
817    #[doc(alias = "gst_base_sink_set_render_delay")]
818    pub fn set_render_delay(&self, delay: gst::ClockTime) {
819        unsafe {
820            gst_base::ffi::gst_base_sink_set_render_delay(
821                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
822                delay.into_glib(),
823            );
824        }
825    }
826
827    #[doc(alias = "sync")]
828    #[doc(alias = "gst_base_sink_set_sync")]
829    pub fn set_sync(&self, sync: bool) {
830        unsafe {
831            gst_base::ffi::gst_base_sink_set_sync(
832                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
833                sync.into_glib(),
834            );
835        }
836    }
837
838    #[doc(alias = "throttle-time")]
839    #[doc(alias = "gst_base_sink_set_throttle_time")]
840    pub fn set_throttle_time(&self, throttle: u64) {
841        unsafe {
842            gst_base::ffi::gst_base_sink_set_throttle_time(
843                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
844                throttle,
845            );
846        }
847    }
848
849    #[doc(alias = "ts-offset")]
850    #[doc(alias = "gst_base_sink_set_ts_offset")]
851    pub fn set_ts_offset(&self, offset: gst::ClockTimeDiff) {
852        unsafe {
853            gst_base::ffi::gst_base_sink_set_ts_offset(
854                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
855                offset,
856            );
857        }
858    }
859
860    #[doc(alias = "async")]
861    pub fn connect_async_notify<F: Fn(&Self) + Send + Sync + 'static>(
862        &self,
863        f: F,
864    ) -> glib::SignalHandlerId {
865        unsafe extern "C" fn notify_async_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
866            this: *mut ffi::GstAppSink,
867            _param_spec: glib::ffi::gpointer,
868            f: glib::ffi::gpointer,
869        ) {
870            unsafe {
871                let f: &F = &*(f as *const F);
872                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
873            }
874        }
875        unsafe {
876            let f: Box<F> = Box::new(f);
877            glib::signal::connect_raw(
878                self.as_ptr() as *mut _,
879                b"notify::async\0".as_ptr() as *const _,
880                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
881                    notify_async_trampoline::<F> as *const (),
882                )),
883                Box::into_raw(f),
884            )
885        }
886    }
887
888    #[doc(alias = "blocksize")]
889    pub fn connect_blocksize_notify<F: Fn(&Self) + Send + Sync + 'static>(
890        &self,
891        f: F,
892    ) -> glib::SignalHandlerId {
893        unsafe extern "C" fn notify_blocksize_trampoline<
894            F: Fn(&AppSink) + Send + Sync + 'static,
895        >(
896            this: *mut ffi::GstAppSink,
897            _param_spec: glib::ffi::gpointer,
898            f: glib::ffi::gpointer,
899        ) {
900            unsafe {
901                let f: &F = &*(f as *const F);
902                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
903            }
904        }
905        unsafe {
906            let f: Box<F> = Box::new(f);
907            glib::signal::connect_raw(
908                self.as_ptr() as *mut _,
909                b"notify::blocksize\0".as_ptr() as *const _,
910                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
911                    notify_blocksize_trampoline::<F> as *const (),
912                )),
913                Box::into_raw(f),
914            )
915        }
916    }
917
918    #[doc(alias = "enable-last-sample")]
919    pub fn connect_enable_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
920        &self,
921        f: F,
922    ) -> glib::SignalHandlerId {
923        unsafe extern "C" fn notify_enable_last_sample_trampoline<
924            F: Fn(&AppSink) + Send + Sync + 'static,
925        >(
926            this: *mut ffi::GstAppSink,
927            _param_spec: glib::ffi::gpointer,
928            f: glib::ffi::gpointer,
929        ) {
930            unsafe {
931                let f: &F = &*(f as *const F);
932                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
933            }
934        }
935        unsafe {
936            let f: Box<F> = Box::new(f);
937            glib::signal::connect_raw(
938                self.as_ptr() as *mut _,
939                b"notify::enable-last-sample\0".as_ptr() as *const _,
940                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
941                    notify_enable_last_sample_trampoline::<F> as *const (),
942                )),
943                Box::into_raw(f),
944            )
945        }
946    }
947
948    #[doc(alias = "last-sample")]
949    pub fn connect_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
950        &self,
951        f: F,
952    ) -> glib::SignalHandlerId {
953        unsafe extern "C" fn notify_last_sample_trampoline<
954            F: Fn(&AppSink) + Send + Sync + 'static,
955        >(
956            this: *mut ffi::GstAppSink,
957            _param_spec: glib::ffi::gpointer,
958            f: glib::ffi::gpointer,
959        ) {
960            unsafe {
961                let f: &F = &*(f as *const F);
962                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
963            }
964        }
965        unsafe {
966            let f: Box<F> = Box::new(f);
967            glib::signal::connect_raw(
968                self.as_ptr() as *mut _,
969                b"notify::last-sample\0".as_ptr() as *const _,
970                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
971                    notify_last_sample_trampoline::<F> as *const (),
972                )),
973                Box::into_raw(f),
974            )
975        }
976    }
977
978    #[doc(alias = "max-bitrate")]
979    pub fn connect_max_bitrate_notify<F: Fn(&Self) + Send + Sync + 'static>(
980        &self,
981        f: F,
982    ) -> glib::SignalHandlerId {
983        unsafe extern "C" fn notify_max_bitrate_trampoline<
984            F: Fn(&AppSink) + Send + Sync + 'static,
985        >(
986            this: *mut ffi::GstAppSink,
987            _param_spec: glib::ffi::gpointer,
988            f: glib::ffi::gpointer,
989        ) {
990            unsafe {
991                let f: &F = &*(f as *const F);
992                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
993            }
994        }
995        unsafe {
996            let f: Box<F> = Box::new(f);
997            glib::signal::connect_raw(
998                self.as_ptr() as *mut _,
999                b"notify::max-bitrate\0".as_ptr() as *const _,
1000                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1001                    notify_max_bitrate_trampoline::<F> as *const (),
1002                )),
1003                Box::into_raw(f),
1004            )
1005        }
1006    }
1007
1008    #[doc(alias = "max-lateness")]
1009    pub fn connect_max_lateness_notify<F: Fn(&Self) + Send + Sync + 'static>(
1010        &self,
1011        f: F,
1012    ) -> glib::SignalHandlerId {
1013        unsafe extern "C" fn notify_max_lateness_trampoline<
1014            F: Fn(&AppSink) + Send + Sync + 'static,
1015        >(
1016            this: *mut ffi::GstAppSink,
1017            _param_spec: glib::ffi::gpointer,
1018            f: glib::ffi::gpointer,
1019        ) {
1020            unsafe {
1021                let f: &F = &*(f as *const F);
1022                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1023            }
1024        }
1025        unsafe {
1026            let f: Box<F> = Box::new(f);
1027            glib::signal::connect_raw(
1028                self.as_ptr() as *mut _,
1029                b"notify::max-lateness\0".as_ptr() as *const _,
1030                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1031                    notify_max_lateness_trampoline::<F> as *const (),
1032                )),
1033                Box::into_raw(f),
1034            )
1035        }
1036    }
1037
1038    #[cfg(feature = "v1_16")]
1039    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
1040    #[doc(alias = "processing-deadline")]
1041    pub fn connect_processing_deadline_notify<F: Fn(&Self) + Send + Sync + 'static>(
1042        &self,
1043        f: F,
1044    ) -> glib::SignalHandlerId {
1045        unsafe extern "C" fn notify_processing_deadline_trampoline<
1046            F: Fn(&AppSink) + Send + Sync + 'static,
1047        >(
1048            this: *mut ffi::GstAppSink,
1049            _param_spec: glib::ffi::gpointer,
1050            f: glib::ffi::gpointer,
1051        ) {
1052            unsafe {
1053                let f: &F = &*(f as *const F);
1054                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1055            }
1056        }
1057        unsafe {
1058            let f: Box<F> = Box::new(f);
1059            glib::signal::connect_raw(
1060                self.as_ptr() as *mut _,
1061                b"notify::processing-deadline\0".as_ptr() as *const _,
1062                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1063                    notify_processing_deadline_trampoline::<F> as *const (),
1064                )),
1065                Box::into_raw(f),
1066            )
1067        }
1068    }
1069
1070    #[doc(alias = "qos")]
1071    pub fn connect_qos_notify<F: Fn(&Self) + Send + Sync + 'static>(
1072        &self,
1073        f: F,
1074    ) -> glib::SignalHandlerId {
1075        unsafe extern "C" fn notify_qos_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1076            this: *mut ffi::GstAppSink,
1077            _param_spec: glib::ffi::gpointer,
1078            f: glib::ffi::gpointer,
1079        ) {
1080            unsafe {
1081                let f: &F = &*(f as *const F);
1082                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1083            }
1084        }
1085        unsafe {
1086            let f: Box<F> = Box::new(f);
1087            glib::signal::connect_raw(
1088                self.as_ptr() as *mut _,
1089                b"notify::qos\0".as_ptr() as *const _,
1090                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1091                    notify_qos_trampoline::<F> as *const (),
1092                )),
1093                Box::into_raw(f),
1094            )
1095        }
1096    }
1097
1098    #[doc(alias = "render-delay")]
1099    pub fn connect_render_delay_notify<F: Fn(&Self) + Send + Sync + 'static>(
1100        &self,
1101        f: F,
1102    ) -> glib::SignalHandlerId {
1103        unsafe extern "C" fn notify_render_delay_trampoline<
1104            F: Fn(&AppSink) + Send + Sync + 'static,
1105        >(
1106            this: *mut ffi::GstAppSink,
1107            _param_spec: glib::ffi::gpointer,
1108            f: glib::ffi::gpointer,
1109        ) {
1110            unsafe {
1111                let f: &F = &*(f as *const F);
1112                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1113            }
1114        }
1115        unsafe {
1116            let f: Box<F> = Box::new(f);
1117            glib::signal::connect_raw(
1118                self.as_ptr() as *mut _,
1119                b"notify::render-delay\0".as_ptr() as *const _,
1120                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1121                    notify_render_delay_trampoline::<F> as *const (),
1122                )),
1123                Box::into_raw(f),
1124            )
1125        }
1126    }
1127
1128    #[cfg(feature = "v1_18")]
1129    #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
1130    #[doc(alias = "stats")]
1131    pub fn connect_stats_notify<F: Fn(&Self) + Send + Sync + 'static>(
1132        &self,
1133        f: F,
1134    ) -> glib::SignalHandlerId {
1135        unsafe extern "C" fn notify_stats_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1136            this: *mut ffi::GstAppSink,
1137            _param_spec: glib::ffi::gpointer,
1138            f: glib::ffi::gpointer,
1139        ) {
1140            unsafe {
1141                let f: &F = &*(f as *const F);
1142                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1143            }
1144        }
1145        unsafe {
1146            let f: Box<F> = Box::new(f);
1147            glib::signal::connect_raw(
1148                self.as_ptr() as *mut _,
1149                b"notify::stats\0".as_ptr() as *const _,
1150                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1151                    notify_stats_trampoline::<F> as *const (),
1152                )),
1153                Box::into_raw(f),
1154            )
1155        }
1156    }
1157
1158    #[doc(alias = "sync")]
1159    pub fn connect_sync_notify<F: Fn(&Self) + Send + Sync + 'static>(
1160        &self,
1161        f: F,
1162    ) -> glib::SignalHandlerId {
1163        unsafe extern "C" fn notify_sync_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1164            this: *mut ffi::GstAppSink,
1165            _param_spec: glib::ffi::gpointer,
1166            f: glib::ffi::gpointer,
1167        ) {
1168            unsafe {
1169                let f: &F = &*(f as *const F);
1170                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1171            }
1172        }
1173        unsafe {
1174            let f: Box<F> = Box::new(f);
1175            glib::signal::connect_raw(
1176                self.as_ptr() as *mut _,
1177                b"notify::sync\0".as_ptr() as *const _,
1178                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1179                    notify_sync_trampoline::<F> as *const (),
1180                )),
1181                Box::into_raw(f),
1182            )
1183        }
1184    }
1185
1186    #[doc(alias = "throttle-time")]
1187    pub fn connect_throttle_time_notify<F: Fn(&Self) + Send + Sync + 'static>(
1188        &self,
1189        f: F,
1190    ) -> glib::SignalHandlerId {
1191        unsafe extern "C" fn notify_throttle_time_trampoline<
1192            F: Fn(&AppSink) + Send + Sync + 'static,
1193        >(
1194            this: *mut ffi::GstAppSink,
1195            _param_spec: glib::ffi::gpointer,
1196            f: glib::ffi::gpointer,
1197        ) {
1198            unsafe {
1199                let f: &F = &*(f as *const F);
1200                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1201            }
1202        }
1203        unsafe {
1204            let f: Box<F> = Box::new(f);
1205            glib::signal::connect_raw(
1206                self.as_ptr() as *mut _,
1207                b"notify::throttle-time\0".as_ptr() as *const _,
1208                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1209                    notify_throttle_time_trampoline::<F> as *const (),
1210                )),
1211                Box::into_raw(f),
1212            )
1213        }
1214    }
1215
1216    #[doc(alias = "ts-offset")]
1217    pub fn connect_ts_offset_notify<F: Fn(&Self) + Send + Sync + 'static>(
1218        &self,
1219        f: F,
1220    ) -> glib::SignalHandlerId {
1221        unsafe extern "C" fn notify_ts_offset_trampoline<
1222            F: Fn(&AppSink) + Send + Sync + 'static,
1223        >(
1224            this: *mut ffi::GstAppSink,
1225            _param_spec: glib::ffi::gpointer,
1226            f: glib::ffi::gpointer,
1227        ) {
1228            unsafe {
1229                let f: &F = &*(f as *const F);
1230                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1231            }
1232        }
1233        unsafe {
1234            let f: Box<F> = Box::new(f);
1235            glib::signal::connect_raw(
1236                self.as_ptr() as *mut _,
1237                b"notify::ts-offset\0".as_ptr() as *const _,
1238                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1239                    notify_ts_offset_trampoline::<F> as *const (),
1240                )),
1241                Box::into_raw(f),
1242            )
1243        }
1244    }
1245
    // rustdoc-stripper-ignore-next
    /// Returns an [`AppSinkStream`] that yields the samples of this sink.
    ///
    /// Constructing the stream calls `set_callbacks` on this sink with
    /// `new_sample` and `eos` handlers of its own. Callbacks installed earlier
    /// are presumably replaced by that call (verify against `set_callbacks`).
    pub fn stream(&self) -> AppSinkStream {
        AppSinkStream::new(self)
    }
1249}
1250
// rustdoc-stripper-ignore-next
/// A [builder-pattern] type to construct [`AppSink`] objects.
///
/// [builder-pattern]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html
#[must_use = "The builder must be built to be used"]
pub struct AppSinkBuilder<'a> {
    // Underlying object builder that accumulates the property values.
    builder: gst::gobject::GObjectBuilder<'a, AppSink>,
    // Callbacks to install with `set_callbacks` once the object has been built.
    callbacks: Option<AppSinkCallbacks>,
    // Applied with `set_drop_out_of_segment` after construction instead of
    // being passed to `builder` as a property.
    drop_out_of_segment: Option<bool>,
}
1261
1262impl<'a> AppSinkBuilder<'a> {
1263    // rustdoc-stripper-ignore-next
1264    /// Build the [`AppSink`].
1265    ///
1266    /// # Panics
1267    ///
1268    /// This panics if the [`AppSink`] doesn't have all the given properties or
1269    /// property values of the wrong type are provided.
1270    #[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects"]
1271    pub fn build(self) -> AppSink {
1272        let appsink = self.builder.build().unwrap();
1273
1274        if let Some(callbacks) = self.callbacks {
1275            appsink.set_callbacks(callbacks);
1276        }
1277
1278        if let Some(drop_out_of_segment) = self.drop_out_of_segment {
1279            appsink.set_drop_out_of_segment(drop_out_of_segment);
1280        }
1281
1282        appsink
1283    }
1284
1285    pub fn async_(self, async_: bool) -> Self {
1286        Self {
1287            builder: self.builder.property("async", async_),
1288            ..self
1289        }
1290    }
1291
1292    pub fn buffer_list(self, buffer_list: bool) -> Self {
1293        Self {
1294            builder: self.builder.property("buffer-list", buffer_list),
1295            ..self
1296        }
1297    }
1298
1299    pub fn callbacks(self, callbacks: AppSinkCallbacks) -> Self {
1300        Self {
1301            callbacks: Some(callbacks),
1302            ..self
1303        }
1304    }
1305
1306    pub fn caps(self, caps: &'a gst::Caps) -> Self {
1307        Self {
1308            builder: self.builder.property("caps", caps),
1309            ..self
1310        }
1311    }
1312
1313    #[cfg_attr(feature = "v1_28", deprecated = "Since 1.28")]
1314    #[allow(deprecated)]
1315    pub fn drop(self, drop: bool) -> Self {
1316        Self {
1317            builder: self.builder.property("drop", drop),
1318            ..self
1319        }
1320    }
1321
1322    pub fn drop_out_of_segment(self, drop_out_of_segment: bool) -> Self {
1323        Self {
1324            drop_out_of_segment: Some(drop_out_of_segment),
1325            ..self
1326        }
1327    }
1328
1329    pub fn enable_last_sample(self, enable_last_sample: bool) -> Self {
1330        Self {
1331            builder: self
1332                .builder
1333                .property("enable-last-sample", enable_last_sample),
1334            ..self
1335        }
1336    }
1337
1338    pub fn max_bitrate(self, max_bitrate: u64) -> Self {
1339        Self {
1340            builder: self.builder.property("max-bitrate", max_bitrate),
1341            ..self
1342        }
1343    }
1344
1345    pub fn max_buffers(self, max_buffers: u32) -> Self {
1346        Self {
1347            builder: self.builder.property("max-buffers", max_buffers),
1348            ..self
1349        }
1350    }
1351
1352    pub fn max_lateness(self, max_lateness: i64) -> Self {
1353        Self {
1354            builder: self.builder.property("max-lateness", max_lateness),
1355            ..self
1356        }
1357    }
1358
1359    #[cfg(feature = "v1_16")]
1360    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
1361    pub fn processing_deadline(self, processing_deadline: gst::ClockTime) -> Self {
1362        Self {
1363            builder: self
1364                .builder
1365                .property("processing-deadline", processing_deadline),
1366            ..self
1367        }
1368    }
1369
1370    pub fn qos(self, qos: bool) -> Self {
1371        Self {
1372            builder: self.builder.property("qos", qos),
1373            ..self
1374        }
1375    }
1376
1377    pub fn render_delay(self, render_delay: Option<gst::ClockTime>) -> Self {
1378        Self {
1379            builder: self.builder.property("render-delay", render_delay),
1380            ..self
1381        }
1382    }
1383
1384    pub fn sync(self, sync: bool) -> Self {
1385        Self {
1386            builder: self.builder.property("sync", sync),
1387            ..self
1388        }
1389    }
1390
1391    pub fn throttle_time(self, throttle_time: u64) -> Self {
1392        Self {
1393            builder: self.builder.property("throttle-time", throttle_time),
1394            ..self
1395        }
1396    }
1397
1398    pub fn ts_offset(self, ts_offset: gst::ClockTimeDiff) -> Self {
1399        Self {
1400            builder: self.builder.property("ts-offset", ts_offset),
1401            ..self
1402        }
1403    }
1404
1405    pub fn wait_on_eos(self, wait_on_eos: bool) -> Self {
1406        Self {
1407            builder: self.builder.property("wait-on-eos", wait_on_eos),
1408            ..self
1409        }
1410    }
1411
1412    #[cfg(feature = "v1_24")]
1413    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1414    pub fn max_time(self, max_time: Option<gst::ClockTime>) -> Self {
1415        Self {
1416            builder: self.builder.property("max-time", max_time),
1417            ..self
1418        }
1419    }
1420
1421    #[cfg(feature = "v1_24")]
1422    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1423    pub fn max_bytes(self, max_bytes: u64) -> Self {
1424        Self {
1425            builder: self.builder.property("max-bytes", max_bytes),
1426            ..self
1427        }
1428    }
1429
1430    #[cfg(feature = "v1_28")]
1431    #[cfg_attr(docsrs, doc(cfg(feature = "v1_28")))]
1432    pub fn leaky_type(self, leaky_type: crate::AppLeakyType) -> Self {
1433        Self {
1434            builder: self.builder.property("leaky-type", leaky_type),
1435            ..self
1436        }
1437    }
1438
1439    #[cfg(feature = "v1_28")]
1440    #[cfg_attr(docsrs, doc(cfg(feature = "v1_28")))]
1441    pub fn silent(self, silent: bool) -> Self {
1442        Self {
1443            builder: self.builder.property("silent", silent),
1444            ..self
1445        }
1446    }
1447
1448    // rustdoc-stripper-ignore-next
1449    /// Sets property `name` to the given value `value`.
1450    ///
1451    /// Overrides any default or previously defined value for `name`.
1452    #[inline]
1453    pub fn property(self, name: &'a str, value: impl Into<glib::Value> + 'a) -> Self {
1454        Self {
1455            builder: self.builder.property(name, value),
1456            ..self
1457        }
1458    }
1459
1460    // rustdoc-stripper-ignore-next
1461    /// Sets property `name` to the given string value `value`.
1462    #[inline]
1463    pub fn property_from_str(self, name: &'a str, value: &'a str) -> Self {
1464        Self {
1465            builder: self.builder.property_from_str(name, value),
1466            ..self
1467        }
1468    }
1469
1470    gst::impl_builder_gvalue_extra_setters!(property_and_name);
1471}
1472
// rustdoc-stripper-ignore-next
/// A `Stream` of [`gst::Sample`]s pulled from an [`AppSink`].
///
/// Created through [`AppSink::stream`].
#[derive(Debug)]
pub struct AppSinkStream {
    // Weak handle to the sink; polling ends the stream once it can no longer
    // be upgraded.
    app_sink: glib::WeakRef<AppSink>,
    // Slot shared with the sink callbacks: holds the waker of the task that
    // last returned `Poll::Pending`, if any.
    waker_reference: Arc<Mutex<Option<Waker>>>,
}
1478
1479impl AppSinkStream {
1480    fn new(app_sink: &AppSink) -> Self {
1481        skip_assert_initialized!();
1482
1483        let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
1484
1485        app_sink.set_callbacks(
1486            AppSinkCallbacks::builder()
1487                .new_sample({
1488                    let waker_reference = Arc::clone(&waker_reference);
1489
1490                    move |_| {
1491                        if let Some(waker) = waker_reference.lock().unwrap().take() {
1492                            waker.wake();
1493                        }
1494
1495                        Ok(gst::FlowSuccess::Ok)
1496                    }
1497                })
1498                .eos({
1499                    let waker_reference = Arc::clone(&waker_reference);
1500
1501                    move |_| {
1502                        if let Some(waker) = waker_reference.lock().unwrap().take() {
1503                            waker.wake();
1504                        }
1505                    }
1506                })
1507                .build(),
1508        );
1509
1510        Self {
1511            app_sink: app_sink.downgrade(),
1512            waker_reference,
1513        }
1514    }
1515}
1516
1517impl Drop for AppSinkStream {
1518    fn drop(&mut self) {
1519        #[cfg(not(feature = "v1_18"))]
1520        {
1521            // This is not thread-safe before 1.16.3, see
1522            // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
1523            if gst::version() >= (1, 16, 3, 0)
1524                && let Some(app_sink) = self.app_sink.upgrade()
1525            {
1526                app_sink.set_callbacks(AppSinkCallbacks::builder().build());
1527            }
1528        }
1529    }
1530}
1531
1532impl Stream for AppSinkStream {
1533    type Item = gst::Sample;
1534
1535    fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
1536        let mut waker = self.waker_reference.lock().unwrap();
1537
1538        let Some(app_sink) = self.app_sink.upgrade() else {
1539            return Poll::Ready(None);
1540        };
1541
1542        app_sink
1543            .try_pull_sample(gst::ClockTime::ZERO)
1544            .map(|sample| Poll::Ready(Some(sample)))
1545            .unwrap_or_else(|| {
1546                if app_sink.is_eos() {
1547                    return Poll::Ready(None);
1548                }
1549
1550                waker.replace(context.waker().to_owned());
1551
1552                Poll::Pending
1553            })
1554    }
1555}
1556
#[cfg(test)]
mod tests {
    use futures_util::StreamExt;
    use gst::prelude::*;

    use super::*;

    // Runs a five-buffer `videotestsrc ! appsink` pipeline and checks that the
    // stream yields exactly five samples before it ends.
    #[test]
    fn test_app_sink_stream() {
        gst::init().unwrap();

        let source = gst::ElementFactory::make("videotestsrc")
            .property("num-buffers", 5)
            .build()
            .unwrap();
        let sink = gst::ElementFactory::make("appsink").build().unwrap();

        let pipeline = gst::Pipeline::new();
        for element in [&source, &sink] {
            pipeline.add(element).unwrap();
        }

        source.link(&sink).unwrap();

        let stream = sink.dynamic_cast::<AppSink>().unwrap().stream();
        let collected = stream.collect::<Vec<gst::Sample>>();

        pipeline.set_state(gst::State::Playing).unwrap();
        let samples = futures_executor::block_on(collected);
        pipeline.set_state(gst::State::Null).unwrap();

        assert_eq!(samples.len(), 5);
    }
}