gstreamer_app/
app_sink.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    mem, panic,
5    pin::Pin,
6    ptr,
7    sync::{Arc, Mutex},
8    task::{Context, Poll, Waker},
9};
10
11#[cfg(not(panic = "abort"))]
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use futures_core::Stream;
15use glib::{ffi::gpointer, prelude::*, translate::*};
16
17use crate::{ffi, AppSink};
18
19#[allow(clippy::type_complexity)]
20pub struct AppSinkCallbacks {
21    eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
22    new_preroll: Option<
23        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
24    >,
25    new_sample: Option<
26        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
27    >,
28    new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
29    propose_allocation:
30        Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
31    #[cfg(not(panic = "abort"))]
32    panicked: AtomicBool,
33    callbacks: ffi::GstAppSinkCallbacks,
34}
35
36unsafe impl Send for AppSinkCallbacks {}
37unsafe impl Sync for AppSinkCallbacks {}
38
39impl AppSinkCallbacks {
40    pub fn builder() -> AppSinkCallbacksBuilder {
41        skip_assert_initialized!();
42        AppSinkCallbacksBuilder {
43            eos: None,
44            new_preroll: None,
45            new_sample: None,
46            new_event: None,
47            propose_allocation: None,
48        }
49    }
50}
51
52#[allow(clippy::type_complexity)]
53#[must_use = "The builder must be built to be used"]
54pub struct AppSinkCallbacksBuilder {
55    eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
56    new_preroll: Option<
57        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
58    >,
59    new_sample: Option<
60        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
61    >,
62    new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
63    propose_allocation:
64        Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
65}
66
67impl AppSinkCallbacksBuilder {
68    pub fn eos<F: FnMut(&AppSink) + Send + 'static>(self, eos: F) -> Self {
69        Self {
70            eos: Some(Box::new(eos)),
71            ..self
72        }
73    }
74
75    pub fn eos_if<F: FnMut(&AppSink) + Send + 'static>(self, eos: F, predicate: bool) -> Self {
76        if predicate {
77            self.eos(eos)
78        } else {
79            self
80        }
81    }
82
83    pub fn eos_if_some<F: FnMut(&AppSink) + Send + 'static>(self, eos: Option<F>) -> Self {
84        if let Some(eos) = eos {
85            self.eos(eos)
86        } else {
87            self
88        }
89    }
90
91    pub fn new_preroll<
92        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
93    >(
94        self,
95        new_preroll: F,
96    ) -> Self {
97        Self {
98            new_preroll: Some(Box::new(new_preroll)),
99            ..self
100        }
101    }
102
103    pub fn new_preroll_if<
104        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
105    >(
106        self,
107        new_preroll: F,
108        predicate: bool,
109    ) -> Self {
110        if predicate {
111            self.new_preroll(new_preroll)
112        } else {
113            self
114        }
115    }
116
117    pub fn new_preroll_if_some<
118        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
119    >(
120        self,
121        new_preroll: Option<F>,
122    ) -> Self {
123        if let Some(new_preroll) = new_preroll {
124            self.new_preroll(new_preroll)
125        } else {
126            self
127        }
128    }
129
130    pub fn new_sample<
131        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
132    >(
133        self,
134        new_sample: F,
135    ) -> Self {
136        Self {
137            new_sample: Some(Box::new(new_sample)),
138            ..self
139        }
140    }
141
142    pub fn new_sample_if<
143        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
144    >(
145        self,
146        new_sample: F,
147        predicate: bool,
148    ) -> Self {
149        if predicate {
150            self.new_sample(new_sample)
151        } else {
152            self
153        }
154    }
155
156    pub fn new_sample_if_some<
157        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
158    >(
159        self,
160        new_sample: Option<F>,
161    ) -> Self {
162        if let Some(new_sample) = new_sample {
163            self.new_sample(new_sample)
164        } else {
165            self
166        }
167    }
168
169    #[cfg(feature = "v1_20")]
170    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
171    pub fn new_event<F: FnMut(&AppSink) -> bool + Send + 'static>(self, new_event: F) -> Self {
172        Self {
173            new_event: Some(Box::new(new_event)),
174            ..self
175        }
176    }
177
178    #[cfg(feature = "v1_20")]
179    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
180    pub fn new_event_if<F: FnMut(&AppSink) -> bool + Send + 'static>(
181        self,
182        new_event: F,
183        predicate: bool,
184    ) -> Self {
185        if predicate {
186            self.new_event(new_event)
187        } else {
188            self
189        }
190    }
191
192    #[cfg(feature = "v1_20")]
193    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
194    pub fn new_event_if_some<F: FnMut(&AppSink) -> bool + Send + 'static>(
195        self,
196        new_event: Option<F>,
197    ) -> Self {
198        if let Some(new_event) = new_event {
199            self.new_event(new_event)
200        } else {
201            self
202        }
203    }
204
205    #[cfg(feature = "v1_24")]
206    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
207    pub fn propose_allocation<
208        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
209    >(
210        self,
211        propose_allocation: F,
212    ) -> Self {
213        Self {
214            propose_allocation: Some(Box::new(propose_allocation)),
215            ..self
216        }
217    }
218
219    #[cfg(feature = "v1_24")]
220    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
221    pub fn propose_allocation_if<
222        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
223    >(
224        self,
225        propose_allocation: F,
226        predicate: bool,
227    ) -> Self {
228        if predicate {
229            self.propose_allocation(propose_allocation)
230        } else {
231            self
232        }
233    }
234
235    #[cfg(feature = "v1_24")]
236    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
237    pub fn propose_allocation_if_some<
238        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
239    >(
240        self,
241        propose_allocation: Option<F>,
242    ) -> Self {
243        if let Some(propose_allocation) = propose_allocation {
244            self.propose_allocation(propose_allocation)
245        } else {
246            self
247        }
248    }
249
250    #[must_use = "Building the callbacks without using them has no effect"]
251    pub fn build(self) -> AppSinkCallbacks {
252        let have_eos = self.eos.is_some();
253        let have_new_preroll = self.new_preroll.is_some();
254        let have_new_sample = self.new_sample.is_some();
255        let have_new_event = self.new_event.is_some();
256        let have_propose_allocation = self.propose_allocation.is_some();
257
258        AppSinkCallbacks {
259            eos: self.eos,
260            new_preroll: self.new_preroll,
261            new_sample: self.new_sample,
262            new_event: self.new_event,
263            propose_allocation: self.propose_allocation,
264            #[cfg(not(panic = "abort"))]
265            panicked: AtomicBool::new(false),
266            callbacks: ffi::GstAppSinkCallbacks {
267                eos: if have_eos { Some(trampoline_eos) } else { None },
268                new_preroll: if have_new_preroll {
269                    Some(trampoline_new_preroll)
270                } else {
271                    None
272                },
273                new_sample: if have_new_sample {
274                    Some(trampoline_new_sample)
275                } else {
276                    None
277                },
278                new_event: if have_new_event {
279                    Some(trampoline_new_event)
280                } else {
281                    None
282                },
283                propose_allocation: if have_propose_allocation {
284                    Some(trampoline_propose_allocation)
285                } else {
286                    None
287                },
288                _gst_reserved: [ptr::null_mut(), ptr::null_mut()],
289            },
290        }
291    }
292}
293
294unsafe extern "C" fn trampoline_eos(appsink: *mut ffi::GstAppSink, callbacks: gpointer) {
295    let callbacks = callbacks as *mut AppSinkCallbacks;
296    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
297
298    #[cfg(not(panic = "abort"))]
299    if (*callbacks).panicked.load(Ordering::Relaxed) {
300        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
301        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
302        return;
303    }
304
305    if let Some(ref mut eos) = (*callbacks).eos {
306        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| eos(&element)));
307        match result {
308            Ok(result) => result,
309            Err(err) => {
310                #[cfg(panic = "abort")]
311                {
312                    unreachable!("{err:?}");
313                }
314                #[cfg(not(panic = "abort"))]
315                {
316                    (*callbacks).panicked.store(true, Ordering::Relaxed);
317                    gst::subclass::post_panic_error_message(
318                        element.upcast_ref(),
319                        element.upcast_ref(),
320                        Some(err),
321                    );
322                }
323            }
324        }
325    }
326}
327
328unsafe extern "C" fn trampoline_new_preroll(
329    appsink: *mut ffi::GstAppSink,
330    callbacks: gpointer,
331) -> gst::ffi::GstFlowReturn {
332    let callbacks = callbacks as *mut AppSinkCallbacks;
333    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
334
335    #[cfg(not(panic = "abort"))]
336    if (*callbacks).panicked.load(Ordering::Relaxed) {
337        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
338        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
339        return gst::FlowReturn::Error.into_glib();
340    }
341
342    let ret = if let Some(ref mut new_preroll) = (*callbacks).new_preroll {
343        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_preroll(&element).into()));
344        match result {
345            Ok(result) => result,
346            Err(err) => {
347                #[cfg(panic = "abort")]
348                {
349                    unreachable!("{err:?}");
350                }
351                #[cfg(not(panic = "abort"))]
352                {
353                    (*callbacks).panicked.store(true, Ordering::Relaxed);
354                    gst::subclass::post_panic_error_message(
355                        element.upcast_ref(),
356                        element.upcast_ref(),
357                        Some(err),
358                    );
359
360                    gst::FlowReturn::Error
361                }
362            }
363        }
364    } else {
365        gst::FlowReturn::Error
366    };
367
368    ret.into_glib()
369}
370
371unsafe extern "C" fn trampoline_new_sample(
372    appsink: *mut ffi::GstAppSink,
373    callbacks: gpointer,
374) -> gst::ffi::GstFlowReturn {
375    let callbacks = callbacks as *mut AppSinkCallbacks;
376    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
377
378    #[cfg(not(panic = "abort"))]
379    if (*callbacks).panicked.load(Ordering::Relaxed) {
380        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
381        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
382        return gst::FlowReturn::Error.into_glib();
383    }
384
385    let ret = if let Some(ref mut new_sample) = (*callbacks).new_sample {
386        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_sample(&element).into()));
387        match result {
388            Ok(result) => result,
389            Err(err) => {
390                #[cfg(panic = "abort")]
391                {
392                    unreachable!("{err:?}");
393                }
394                #[cfg(not(panic = "abort"))]
395                {
396                    (*callbacks).panicked.store(true, Ordering::Relaxed);
397                    gst::subclass::post_panic_error_message(
398                        element.upcast_ref(),
399                        element.upcast_ref(),
400                        Some(err),
401                    );
402
403                    gst::FlowReturn::Error
404                }
405            }
406        }
407    } else {
408        gst::FlowReturn::Error
409    };
410
411    ret.into_glib()
412}
413
414unsafe extern "C" fn trampoline_new_event(
415    appsink: *mut ffi::GstAppSink,
416    callbacks: gpointer,
417) -> glib::ffi::gboolean {
418    let callbacks = callbacks as *mut AppSinkCallbacks;
419    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
420
421    #[cfg(not(panic = "abort"))]
422    if (*callbacks).panicked.load(Ordering::Relaxed) {
423        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
424        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
425        return false.into_glib();
426    }
427
428    let ret = if let Some(ref mut new_event) = (*callbacks).new_event {
429        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_event(&element)));
430        match result {
431            Ok(result) => result,
432            Err(err) => {
433                #[cfg(panic = "abort")]
434                {
435                    unreachable!("{err:?}");
436                }
437                #[cfg(not(panic = "abort"))]
438                {
439                    (*callbacks).panicked.store(true, Ordering::Relaxed);
440                    gst::subclass::post_panic_error_message(
441                        element.upcast_ref(),
442                        element.upcast_ref(),
443                        Some(err),
444                    );
445
446                    false
447                }
448            }
449        }
450    } else {
451        false
452    };
453
454    ret.into_glib()
455}
456
457unsafe extern "C" fn trampoline_propose_allocation(
458    appsink: *mut ffi::GstAppSink,
459    query: *mut gst::ffi::GstQuery,
460    callbacks: gpointer,
461) -> glib::ffi::gboolean {
462    let callbacks = callbacks as *mut AppSinkCallbacks;
463    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
464
465    #[cfg(not(panic = "abort"))]
466    if (*callbacks).panicked.load(Ordering::Relaxed) {
467        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
468        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
469        return false.into_glib();
470    }
471
472    let ret = if let Some(ref mut propose_allocation) = (*callbacks).propose_allocation {
473        let query = match gst::QueryRef::from_mut_ptr(query).view_mut() {
474            gst::QueryViewMut::Allocation(allocation) => allocation,
475            _ => unreachable!(),
476        };
477        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
478            propose_allocation(&element, query)
479        }));
480        match result {
481            Ok(result) => result,
482            Err(err) => {
483                #[cfg(panic = "abort")]
484                {
485                    unreachable!("{err:?}");
486                }
487                #[cfg(not(panic = "abort"))]
488                {
489                    (*callbacks).panicked.store(true, Ordering::Relaxed);
490                    gst::subclass::post_panic_error_message(
491                        element.upcast_ref(),
492                        element.upcast_ref(),
493                        Some(err),
494                    );
495                    false
496                }
497            }
498        }
499    } else {
500        false
501    };
502
503    ret.into_glib()
504}
505
506unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
507    let _ = Box::<AppSinkCallbacks>::from_raw(ptr as *mut _);
508}
509
510impl AppSink {
511    // rustdoc-stripper-ignore-next
512    /// Creates a new builder-pattern struct instance to construct [`AppSink`] objects.
513    ///
514    /// This method returns an instance of [`AppSinkBuilder`](crate::builders::AppSinkBuilder) which can be used to create [`AppSink`] objects.
515    pub fn builder<'a>() -> AppSinkBuilder<'a> {
516        assert_initialized_main_thread!();
517        AppSinkBuilder {
518            builder: gst::Object::builder(),
519            callbacks: None,
520            drop_out_of_segment: None,
521        }
522    }
523
524    /// Set callbacks which will be executed for each new preroll, new sample and eos.
525    /// This is an alternative to using the signals, it has lower overhead and is thus
526    /// less expensive, but also less flexible.
527    ///
528    /// If callbacks are installed, no signals will be emitted for performance
529    /// reasons.
530    ///
531    /// Before 1.16.3 it was not possible to change the callbacks in a thread-safe
532    /// way.
533    /// ## `callbacks`
534    /// the callbacks
535    /// ## `notify`
536    /// a destroy notify function
537    #[doc(alias = "gst_app_sink_set_callbacks")]
538    pub fn set_callbacks(&self, callbacks: AppSinkCallbacks) {
539        unsafe {
540            let sink = self.to_glib_none().0;
541
542            #[allow(clippy::manual_dangling_ptr)]
543            #[cfg(not(feature = "v1_18"))]
544            {
545                static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
546                    std::sync::OnceLock::new();
547
548                let set_once_quark = SET_ONCE_QUARK
549                    .get_or_init(|| glib::Quark::from_str("gstreamer-rs-app-sink-callbacks"));
550
551                // This is not thread-safe before 1.16.3, see
552                // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
553                if gst::version() < (1, 16, 3, 0) {
554                    if !glib::gobject_ffi::g_object_get_qdata(
555                        sink as *mut _,
556                        set_once_quark.into_glib(),
557                    )
558                    .is_null()
559                    {
560                        panic!("AppSink callbacks can only be set once");
561                    }
562
563                    glib::gobject_ffi::g_object_set_qdata(
564                        sink as *mut _,
565                        set_once_quark.into_glib(),
566                        1 as *mut _,
567                    );
568                }
569            }
570
571            ffi::gst_app_sink_set_callbacks(
572                sink,
573                mut_override(&callbacks.callbacks),
574                Box::into_raw(Box::new(callbacks)) as *mut _,
575                Some(destroy_callbacks),
576            );
577        }
578    }
579
580    #[doc(alias = "drop-out-of-segment")]
581    pub fn drops_out_of_segment(&self) -> bool {
582        unsafe {
583            from_glib(gst_base::ffi::gst_base_sink_get_drop_out_of_segment(
584                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
585            ))
586        }
587    }
588
589    #[doc(alias = "max-bitrate")]
590    #[doc(alias = "gst_base_sink_get_max_bitrate")]
591    pub fn max_bitrate(&self) -> u64 {
592        unsafe {
593            gst_base::ffi::gst_base_sink_get_max_bitrate(
594                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
595            )
596        }
597    }
598
599    #[doc(alias = "max-lateness")]
600    #[doc(alias = "gst_base_sink_get_max_lateness")]
601    pub fn max_lateness(&self) -> i64 {
602        unsafe {
603            gst_base::ffi::gst_base_sink_get_max_lateness(
604                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
605            )
606        }
607    }
608
609    #[doc(alias = "processing-deadline")]
610    #[cfg(feature = "v1_16")]
611    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
612    #[doc(alias = "gst_base_sink_get_processing_deadline")]
613    pub fn processing_deadline(&self) -> gst::ClockTime {
614        unsafe {
615            try_from_glib(gst_base::ffi::gst_base_sink_get_processing_deadline(
616                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
617            ))
618            .expect("undefined processing_deadline")
619        }
620    }
621
622    #[doc(alias = "render-delay")]
623    #[doc(alias = "gst_base_sink_get_render_delay")]
624    pub fn render_delay(&self) -> gst::ClockTime {
625        unsafe {
626            try_from_glib(gst_base::ffi::gst_base_sink_get_render_delay(
627                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
628            ))
629            .expect("undefined render_delay")
630        }
631    }
632
633    #[cfg(feature = "v1_18")]
634    #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
635    #[doc(alias = "gst_base_sink_get_stats")]
636    pub fn stats(&self) -> gst::Structure {
637        unsafe {
638            from_glib_full(gst_base::ffi::gst_base_sink_get_stats(
639                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
640            ))
641        }
642    }
643
644    #[doc(alias = "sync")]
645    pub fn is_sync(&self) -> bool {
646        unsafe {
647            from_glib(gst_base::ffi::gst_base_sink_get_sync(
648                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
649            ))
650        }
651    }
652
653    #[doc(alias = "throttle-time")]
654    #[doc(alias = "gst_base_sink_get_throttle_time")]
655    pub fn throttle_time(&self) -> u64 {
656        unsafe {
657            gst_base::ffi::gst_base_sink_get_throttle_time(
658                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
659            )
660        }
661    }
662
663    #[doc(alias = "ts-offset")]
664    #[doc(alias = "gst_base_sink_get_ts_offset")]
665    pub fn ts_offset(&self) -> gst::ClockTimeDiff {
666        unsafe {
667            gst_base::ffi::gst_base_sink_get_ts_offset(
668                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
669            )
670        }
671    }
672
673    #[doc(alias = "async")]
674    #[doc(alias = "gst_base_sink_is_async_enabled")]
675    pub fn is_async(&self) -> bool {
676        unsafe {
677            from_glib(gst_base::ffi::gst_base_sink_is_async_enabled(
678                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
679            ))
680        }
681    }
682
683    #[doc(alias = "last-sample")]
684    pub fn enables_last_sample(&self) -> bool {
685        unsafe {
686            from_glib(gst_base::ffi::gst_base_sink_is_last_sample_enabled(
687                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
688            ))
689        }
690    }
691
692    #[doc(alias = "qos")]
693    #[doc(alias = "gst_base_sink_is_qos_enabled")]
694    pub fn is_qos(&self) -> bool {
695        unsafe {
696            from_glib(gst_base::ffi::gst_base_sink_is_qos_enabled(
697                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
698            ))
699        }
700    }
701
702    #[doc(alias = "async")]
703    #[doc(alias = "gst_base_sink_set_async_enabled")]
704    pub fn set_async(&self, enabled: bool) {
705        unsafe {
706            gst_base::ffi::gst_base_sink_set_async_enabled(
707                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
708                enabled.into_glib(),
709            );
710        }
711    }
712
713    #[doc(alias = "drop-out-of-segment")]
714    #[doc(alias = "gst_base_sink_set_drop_out_of_segment")]
715    pub fn set_drop_out_of_segment(&self, drop_out_of_segment: bool) {
716        unsafe {
717            gst_base::ffi::gst_base_sink_set_drop_out_of_segment(
718                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
719                drop_out_of_segment.into_glib(),
720            );
721        }
722    }
723
724    #[doc(alias = "last-sample")]
725    pub fn set_enable_last_sample(&self, enabled: bool) {
726        unsafe {
727            gst_base::ffi::gst_base_sink_set_last_sample_enabled(
728                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
729                enabled.into_glib(),
730            );
731        }
732    }
733
734    #[doc(alias = "max-bitrate")]
735    #[doc(alias = "gst_base_sink_set_max_bitrate")]
736    pub fn set_max_bitrate(&self, max_bitrate: u64) {
737        unsafe {
738            gst_base::ffi::gst_base_sink_set_max_bitrate(
739                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
740                max_bitrate,
741            );
742        }
743    }
744
745    #[doc(alias = "max-lateness")]
746    #[doc(alias = "gst_base_sink_set_max_lateness")]
747    pub fn set_max_lateness(&self, max_lateness: i64) {
748        unsafe {
749            gst_base::ffi::gst_base_sink_set_max_lateness(
750                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
751                max_lateness,
752            );
753        }
754    }
755
756    #[doc(alias = "processing-deadline")]
757    #[cfg(feature = "v1_16")]
758    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
759    #[doc(alias = "gst_base_sink_set_processing_deadline")]
760    pub fn set_processing_deadline(&self, processing_deadline: gst::ClockTime) {
761        unsafe {
762            gst_base::ffi::gst_base_sink_set_processing_deadline(
763                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
764                processing_deadline.into_glib(),
765            );
766        }
767    }
768
769    #[doc(alias = "qos")]
770    #[doc(alias = "gst_base_sink_set_qos_enabled")]
771    pub fn set_qos(&self, enabled: bool) {
772        unsafe {
773            gst_base::ffi::gst_base_sink_set_qos_enabled(
774                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
775                enabled.into_glib(),
776            );
777        }
778    }
779
780    #[doc(alias = "render-delay")]
781    #[doc(alias = "gst_base_sink_set_render_delay")]
782    pub fn set_render_delay(&self, delay: gst::ClockTime) {
783        unsafe {
784            gst_base::ffi::gst_base_sink_set_render_delay(
785                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
786                delay.into_glib(),
787            );
788        }
789    }
790
791    #[doc(alias = "sync")]
792    #[doc(alias = "gst_base_sink_set_sync")]
793    pub fn set_sync(&self, sync: bool) {
794        unsafe {
795            gst_base::ffi::gst_base_sink_set_sync(
796                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
797                sync.into_glib(),
798            );
799        }
800    }
801
802    #[doc(alias = "throttle-time")]
803    #[doc(alias = "gst_base_sink_set_throttle_time")]
804    pub fn set_throttle_time(&self, throttle: u64) {
805        unsafe {
806            gst_base::ffi::gst_base_sink_set_throttle_time(
807                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
808                throttle,
809            );
810        }
811    }
812
813    #[doc(alias = "ts-offset")]
814    #[doc(alias = "gst_base_sink_set_ts_offset")]
815    pub fn set_ts_offset(&self, offset: gst::ClockTimeDiff) {
816        unsafe {
817            gst_base::ffi::gst_base_sink_set_ts_offset(
818                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
819                offset,
820            );
821        }
822    }
823
824    #[doc(alias = "async")]
825    pub fn connect_async_notify<F: Fn(&Self) + Send + Sync + 'static>(
826        &self,
827        f: F,
828    ) -> glib::SignalHandlerId {
829        unsafe extern "C" fn notify_async_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
830            this: *mut ffi::GstAppSink,
831            _param_spec: glib::ffi::gpointer,
832            f: glib::ffi::gpointer,
833        ) {
834            let f: &F = &*(f as *const F);
835            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
836        }
837        unsafe {
838            let f: Box<F> = Box::new(f);
839            glib::signal::connect_raw(
840                self.as_ptr() as *mut _,
841                b"notify::async\0".as_ptr() as *const _,
842                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
843                    notify_async_trampoline::<F> as *const (),
844                )),
845                Box::into_raw(f),
846            )
847        }
848    }
849
850    #[doc(alias = "blocksize")]
851    pub fn connect_blocksize_notify<F: Fn(&Self) + Send + Sync + 'static>(
852        &self,
853        f: F,
854    ) -> glib::SignalHandlerId {
855        unsafe extern "C" fn notify_blocksize_trampoline<
856            F: Fn(&AppSink) + Send + Sync + 'static,
857        >(
858            this: *mut ffi::GstAppSink,
859            _param_spec: glib::ffi::gpointer,
860            f: glib::ffi::gpointer,
861        ) {
862            let f: &F = &*(f as *const F);
863            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
864        }
865        unsafe {
866            let f: Box<F> = Box::new(f);
867            glib::signal::connect_raw(
868                self.as_ptr() as *mut _,
869                b"notify::blocksize\0".as_ptr() as *const _,
870                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
871                    notify_blocksize_trampoline::<F> as *const (),
872                )),
873                Box::into_raw(f),
874            )
875        }
876    }
877
878    #[doc(alias = "enable-last-sample")]
879    pub fn connect_enable_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
880        &self,
881        f: F,
882    ) -> glib::SignalHandlerId {
883        unsafe extern "C" fn notify_enable_last_sample_trampoline<
884            F: Fn(&AppSink) + Send + Sync + 'static,
885        >(
886            this: *mut ffi::GstAppSink,
887            _param_spec: glib::ffi::gpointer,
888            f: glib::ffi::gpointer,
889        ) {
890            let f: &F = &*(f as *const F);
891            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
892        }
893        unsafe {
894            let f: Box<F> = Box::new(f);
895            glib::signal::connect_raw(
896                self.as_ptr() as *mut _,
897                b"notify::enable-last-sample\0".as_ptr() as *const _,
898                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
899                    notify_enable_last_sample_trampoline::<F> as *const (),
900                )),
901                Box::into_raw(f),
902            )
903        }
904    }
905
906    #[doc(alias = "last-sample")]
907    pub fn connect_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
908        &self,
909        f: F,
910    ) -> glib::SignalHandlerId {
911        unsafe extern "C" fn notify_last_sample_trampoline<
912            F: Fn(&AppSink) + Send + Sync + 'static,
913        >(
914            this: *mut ffi::GstAppSink,
915            _param_spec: glib::ffi::gpointer,
916            f: glib::ffi::gpointer,
917        ) {
918            let f: &F = &*(f as *const F);
919            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
920        }
921        unsafe {
922            let f: Box<F> = Box::new(f);
923            glib::signal::connect_raw(
924                self.as_ptr() as *mut _,
925                b"notify::last-sample\0".as_ptr() as *const _,
926                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
927                    notify_last_sample_trampoline::<F> as *const (),
928                )),
929                Box::into_raw(f),
930            )
931        }
932    }
933
934    #[doc(alias = "max-bitrate")]
935    pub fn connect_max_bitrate_notify<F: Fn(&Self) + Send + Sync + 'static>(
936        &self,
937        f: F,
938    ) -> glib::SignalHandlerId {
939        unsafe extern "C" fn notify_max_bitrate_trampoline<
940            F: Fn(&AppSink) + Send + Sync + 'static,
941        >(
942            this: *mut ffi::GstAppSink,
943            _param_spec: glib::ffi::gpointer,
944            f: glib::ffi::gpointer,
945        ) {
946            let f: &F = &*(f as *const F);
947            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
948        }
949        unsafe {
950            let f: Box<F> = Box::new(f);
951            glib::signal::connect_raw(
952                self.as_ptr() as *mut _,
953                b"notify::max-bitrate\0".as_ptr() as *const _,
954                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
955                    notify_max_bitrate_trampoline::<F> as *const (),
956                )),
957                Box::into_raw(f),
958            )
959        }
960    }
961
962    #[doc(alias = "max-lateness")]
963    pub fn connect_max_lateness_notify<F: Fn(&Self) + Send + Sync + 'static>(
964        &self,
965        f: F,
966    ) -> glib::SignalHandlerId {
967        unsafe extern "C" fn notify_max_lateness_trampoline<
968            F: Fn(&AppSink) + Send + Sync + 'static,
969        >(
970            this: *mut ffi::GstAppSink,
971            _param_spec: glib::ffi::gpointer,
972            f: glib::ffi::gpointer,
973        ) {
974            let f: &F = &*(f as *const F);
975            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
976        }
977        unsafe {
978            let f: Box<F> = Box::new(f);
979            glib::signal::connect_raw(
980                self.as_ptr() as *mut _,
981                b"notify::max-lateness\0".as_ptr() as *const _,
982                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
983                    notify_max_lateness_trampoline::<F> as *const (),
984                )),
985                Box::into_raw(f),
986            )
987        }
988    }
989
990    #[cfg(feature = "v1_16")]
991    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
992    #[doc(alias = "processing-deadline")]
993    pub fn connect_processing_deadline_notify<F: Fn(&Self) + Send + Sync + 'static>(
994        &self,
995        f: F,
996    ) -> glib::SignalHandlerId {
997        unsafe extern "C" fn notify_processing_deadline_trampoline<
998            F: Fn(&AppSink) + Send + Sync + 'static,
999        >(
1000            this: *mut ffi::GstAppSink,
1001            _param_spec: glib::ffi::gpointer,
1002            f: glib::ffi::gpointer,
1003        ) {
1004            let f: &F = &*(f as *const F);
1005            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1006        }
1007        unsafe {
1008            let f: Box<F> = Box::new(f);
1009            glib::signal::connect_raw(
1010                self.as_ptr() as *mut _,
1011                b"notify::processing-deadline\0".as_ptr() as *const _,
1012                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1013                    notify_processing_deadline_trampoline::<F> as *const (),
1014                )),
1015                Box::into_raw(f),
1016            )
1017        }
1018    }
1019
1020    #[doc(alias = "qos")]
1021    pub fn connect_qos_notify<F: Fn(&Self) + Send + Sync + 'static>(
1022        &self,
1023        f: F,
1024    ) -> glib::SignalHandlerId {
1025        unsafe extern "C" fn notify_qos_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1026            this: *mut ffi::GstAppSink,
1027            _param_spec: glib::ffi::gpointer,
1028            f: glib::ffi::gpointer,
1029        ) {
1030            let f: &F = &*(f as *const F);
1031            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1032        }
1033        unsafe {
1034            let f: Box<F> = Box::new(f);
1035            glib::signal::connect_raw(
1036                self.as_ptr() as *mut _,
1037                b"notify::qos\0".as_ptr() as *const _,
1038                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1039                    notify_qos_trampoline::<F> as *const (),
1040                )),
1041                Box::into_raw(f),
1042            )
1043        }
1044    }
1045
1046    #[doc(alias = "render-delay")]
1047    pub fn connect_render_delay_notify<F: Fn(&Self) + Send + Sync + 'static>(
1048        &self,
1049        f: F,
1050    ) -> glib::SignalHandlerId {
1051        unsafe extern "C" fn notify_render_delay_trampoline<
1052            F: Fn(&AppSink) + Send + Sync + 'static,
1053        >(
1054            this: *mut ffi::GstAppSink,
1055            _param_spec: glib::ffi::gpointer,
1056            f: glib::ffi::gpointer,
1057        ) {
1058            let f: &F = &*(f as *const F);
1059            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1060        }
1061        unsafe {
1062            let f: Box<F> = Box::new(f);
1063            glib::signal::connect_raw(
1064                self.as_ptr() as *mut _,
1065                b"notify::render-delay\0".as_ptr() as *const _,
1066                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1067                    notify_render_delay_trampoline::<F> as *const (),
1068                )),
1069                Box::into_raw(f),
1070            )
1071        }
1072    }
1073
1074    #[cfg(feature = "v1_18")]
1075    #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
1076    #[doc(alias = "stats")]
1077    pub fn connect_stats_notify<F: Fn(&Self) + Send + Sync + 'static>(
1078        &self,
1079        f: F,
1080    ) -> glib::SignalHandlerId {
1081        unsafe extern "C" fn notify_stats_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1082            this: *mut ffi::GstAppSink,
1083            _param_spec: glib::ffi::gpointer,
1084            f: glib::ffi::gpointer,
1085        ) {
1086            let f: &F = &*(f as *const F);
1087            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1088        }
1089        unsafe {
1090            let f: Box<F> = Box::new(f);
1091            glib::signal::connect_raw(
1092                self.as_ptr() as *mut _,
1093                b"notify::stats\0".as_ptr() as *const _,
1094                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1095                    notify_stats_trampoline::<F> as *const (),
1096                )),
1097                Box::into_raw(f),
1098            )
1099        }
1100    }
1101
1102    #[doc(alias = "sync")]
1103    pub fn connect_sync_notify<F: Fn(&Self) + Send + Sync + 'static>(
1104        &self,
1105        f: F,
1106    ) -> glib::SignalHandlerId {
1107        unsafe extern "C" fn notify_sync_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1108            this: *mut ffi::GstAppSink,
1109            _param_spec: glib::ffi::gpointer,
1110            f: glib::ffi::gpointer,
1111        ) {
1112            let f: &F = &*(f as *const F);
1113            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1114        }
1115        unsafe {
1116            let f: Box<F> = Box::new(f);
1117            glib::signal::connect_raw(
1118                self.as_ptr() as *mut _,
1119                b"notify::sync\0".as_ptr() as *const _,
1120                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1121                    notify_sync_trampoline::<F> as *const (),
1122                )),
1123                Box::into_raw(f),
1124            )
1125        }
1126    }
1127
1128    #[doc(alias = "throttle-time")]
1129    pub fn connect_throttle_time_notify<F: Fn(&Self) + Send + Sync + 'static>(
1130        &self,
1131        f: F,
1132    ) -> glib::SignalHandlerId {
1133        unsafe extern "C" fn notify_throttle_time_trampoline<
1134            F: Fn(&AppSink) + Send + Sync + 'static,
1135        >(
1136            this: *mut ffi::GstAppSink,
1137            _param_spec: glib::ffi::gpointer,
1138            f: glib::ffi::gpointer,
1139        ) {
1140            let f: &F = &*(f as *const F);
1141            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1142        }
1143        unsafe {
1144            let f: Box<F> = Box::new(f);
1145            glib::signal::connect_raw(
1146                self.as_ptr() as *mut _,
1147                b"notify::throttle-time\0".as_ptr() as *const _,
1148                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1149                    notify_throttle_time_trampoline::<F> as *const (),
1150                )),
1151                Box::into_raw(f),
1152            )
1153        }
1154    }
1155
1156    #[doc(alias = "ts-offset")]
1157    pub fn connect_ts_offset_notify<F: Fn(&Self) + Send + Sync + 'static>(
1158        &self,
1159        f: F,
1160    ) -> glib::SignalHandlerId {
1161        unsafe extern "C" fn notify_ts_offset_trampoline<
1162            F: Fn(&AppSink) + Send + Sync + 'static,
1163        >(
1164            this: *mut ffi::GstAppSink,
1165            _param_spec: glib::ffi::gpointer,
1166            f: glib::ffi::gpointer,
1167        ) {
1168            let f: &F = &*(f as *const F);
1169            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1170        }
1171        unsafe {
1172            let f: Box<F> = Box::new(f);
1173            glib::signal::connect_raw(
1174                self.as_ptr() as *mut _,
1175                b"notify::ts-offset\0".as_ptr() as *const _,
1176                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1177                    notify_ts_offset_trampoline::<F> as *const (),
1178                )),
1179                Box::into_raw(f),
1180            )
1181        }
1182    }
1183
1184    pub fn stream(&self) -> AppSinkStream {
1185        AppSinkStream::new(self)
1186    }
1187}
1188
1189// rustdoc-stripper-ignore-next
1190/// A [builder-pattern] type to construct [`AppSink`] objects.
1191///
1192/// [builder-pattern]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html
1193#[must_use = "The builder must be built to be used"]
1194pub struct AppSinkBuilder<'a> {
1195    builder: gst::gobject::GObjectBuilder<'a, AppSink>,
1196    callbacks: Option<AppSinkCallbacks>,
1197    drop_out_of_segment: Option<bool>,
1198}
1199
1200impl<'a> AppSinkBuilder<'a> {
1201    // rustdoc-stripper-ignore-next
1202    /// Build the [`AppSink`].
1203    ///
1204    /// # Panics
1205    ///
1206    /// This panics if the [`AppSink`] doesn't have all the given properties or
1207    /// property values of the wrong type are provided.
1208    #[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects"]
1209    pub fn build(self) -> AppSink {
1210        let appsink = self.builder.build().unwrap();
1211
1212        if let Some(callbacks) = self.callbacks {
1213            appsink.set_callbacks(callbacks);
1214        }
1215
1216        if let Some(drop_out_of_segment) = self.drop_out_of_segment {
1217            appsink.set_drop_out_of_segment(drop_out_of_segment);
1218        }
1219
1220        appsink
1221    }
1222
1223    pub fn async_(self, async_: bool) -> Self {
1224        Self {
1225            builder: self.builder.property("async", async_),
1226            ..self
1227        }
1228    }
1229
1230    pub fn buffer_list(self, buffer_list: bool) -> Self {
1231        Self {
1232            builder: self.builder.property("buffer-list", buffer_list),
1233            ..self
1234        }
1235    }
1236
1237    pub fn callbacks(self, callbacks: AppSinkCallbacks) -> Self {
1238        Self {
1239            callbacks: Some(callbacks),
1240            ..self
1241        }
1242    }
1243
1244    pub fn caps(self, caps: &'a gst::Caps) -> Self {
1245        Self {
1246            builder: self.builder.property("caps", caps),
1247            ..self
1248        }
1249    }
1250
1251    #[cfg_attr(feature = "v1_28", deprecated = "Since 1.28")]
1252    #[allow(deprecated)]
1253    pub fn drop(self, drop: bool) -> Self {
1254        Self {
1255            builder: self.builder.property("drop", drop),
1256            ..self
1257        }
1258    }
1259
1260    pub fn drop_out_of_segment(self, drop_out_of_segment: bool) -> Self {
1261        Self {
1262            builder: self
1263                .builder
1264                .property("drop-out-of-segment", drop_out_of_segment),
1265            ..self
1266        }
1267    }
1268
1269    pub fn enable_last_sample(self, enable_last_sample: bool) -> Self {
1270        Self {
1271            builder: self
1272                .builder
1273                .property("enable-last-sample", enable_last_sample),
1274            ..self
1275        }
1276    }
1277
1278    pub fn max_bitrate(self, max_bitrate: u64) -> Self {
1279        Self {
1280            builder: self.builder.property("max-bitrate", max_bitrate),
1281            ..self
1282        }
1283    }
1284
1285    pub fn max_buffers(self, max_buffers: u32) -> Self {
1286        Self {
1287            builder: self.builder.property("max-buffers", max_buffers),
1288            ..self
1289        }
1290    }
1291
1292    pub fn max_lateness(self, max_lateness: i64) -> Self {
1293        Self {
1294            builder: self.builder.property("max-lateness", max_lateness),
1295            ..self
1296        }
1297    }
1298
1299    #[cfg(feature = "v1_16")]
1300    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
1301    pub fn processing_deadline(self, processing_deadline: gst::ClockTime) -> Self {
1302        Self {
1303            builder: self
1304                .builder
1305                .property("processing-deadline", processing_deadline),
1306            ..self
1307        }
1308    }
1309
1310    pub fn qos(self, qos: bool) -> Self {
1311        Self {
1312            builder: self.builder.property("qos", qos),
1313            ..self
1314        }
1315    }
1316
1317    pub fn render_delay(self, render_delay: Option<gst::ClockTime>) -> Self {
1318        Self {
1319            builder: self.builder.property("render-delay", render_delay),
1320            ..self
1321        }
1322    }
1323
1324    pub fn sync(self, sync: bool) -> Self {
1325        Self {
1326            builder: self.builder.property("sync", sync),
1327            ..self
1328        }
1329    }
1330
1331    pub fn throttle_time(self, throttle_time: u64) -> Self {
1332        Self {
1333            builder: self.builder.property("throttle-time", throttle_time),
1334            ..self
1335        }
1336    }
1337
1338    pub fn ts_offset(self, ts_offset: gst::ClockTimeDiff) -> Self {
1339        Self {
1340            builder: self.builder.property("ts-offset", ts_offset),
1341            ..self
1342        }
1343    }
1344
1345    pub fn wait_on_eos(self, wait_on_eos: bool) -> Self {
1346        Self {
1347            builder: self.builder.property("wait-on-eos", wait_on_eos),
1348            ..self
1349        }
1350    }
1351
1352    #[cfg(feature = "v1_24")]
1353    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1354    pub fn max_time(self, max_time: Option<gst::ClockTime>) -> Self {
1355        Self {
1356            builder: self.builder.property("max-time", max_time),
1357            ..self
1358        }
1359    }
1360
1361    #[cfg(feature = "v1_24")]
1362    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1363    pub fn max_bytes(self, max_bytes: u64) -> Self {
1364        Self {
1365            builder: self.builder.property("max-bytes", max_bytes),
1366            ..self
1367        }
1368    }
1369
1370    #[cfg(feature = "v1_28")]
1371    #[cfg_attr(docsrs, doc(cfg(feature = "v1_28")))]
1372    pub fn leaky_type(self, leaky_type: crate::AppLeakyType) -> Self {
1373        Self {
1374            builder: self.builder.property("leaky-type", leaky_type),
1375            ..self
1376        }
1377    }
1378
1379    #[cfg(feature = "v1_28")]
1380    #[cfg_attr(docsrs, doc(cfg(feature = "v1_28")))]
1381    pub fn silent(self, silent: bool) -> Self {
1382        Self {
1383            builder: self.builder.property("silent", silent),
1384            ..self
1385        }
1386    }
1387
1388    // rustdoc-stripper-ignore-next
1389    /// Sets property `name` to the given value `value`.
1390    ///
1391    /// Overrides any default or previously defined value for `name`.
1392    #[inline]
1393    pub fn property(self, name: &'a str, value: impl Into<glib::Value> + 'a) -> Self {
1394        Self {
1395            builder: self.builder.property(name, value),
1396            ..self
1397        }
1398    }
1399
1400    // rustdoc-stripper-ignore-next
1401    /// Sets property `name` to the given string value `value`.
1402    #[inline]
1403    pub fn property_from_str(self, name: &'a str, value: &'a str) -> Self {
1404        Self {
1405            builder: self.builder.property_from_str(name, value),
1406            ..self
1407        }
1408    }
1409
1410    gst::impl_builder_gvalue_extra_setters!(property_and_name);
1411}
1412
1413#[derive(Debug)]
1414pub struct AppSinkStream {
1415    app_sink: glib::WeakRef<AppSink>,
1416    waker_reference: Arc<Mutex<Option<Waker>>>,
1417}
1418
1419impl AppSinkStream {
1420    fn new(app_sink: &AppSink) -> Self {
1421        skip_assert_initialized!();
1422
1423        let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
1424
1425        app_sink.set_callbacks(
1426            AppSinkCallbacks::builder()
1427                .new_sample({
1428                    let waker_reference = Arc::clone(&waker_reference);
1429
1430                    move |_| {
1431                        if let Some(waker) = waker_reference.lock().unwrap().take() {
1432                            waker.wake();
1433                        }
1434
1435                        Ok(gst::FlowSuccess::Ok)
1436                    }
1437                })
1438                .eos({
1439                    let waker_reference = Arc::clone(&waker_reference);
1440
1441                    move |_| {
1442                        if let Some(waker) = waker_reference.lock().unwrap().take() {
1443                            waker.wake();
1444                        }
1445                    }
1446                })
1447                .build(),
1448        );
1449
1450        Self {
1451            app_sink: app_sink.downgrade(),
1452            waker_reference,
1453        }
1454    }
1455}
1456
1457impl Drop for AppSinkStream {
1458    fn drop(&mut self) {
1459        #[cfg(not(feature = "v1_18"))]
1460        {
1461            // This is not thread-safe before 1.16.3, see
1462            // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
1463            if gst::version() >= (1, 16, 3, 0) {
1464                if let Some(app_sink) = self.app_sink.upgrade() {
1465                    app_sink.set_callbacks(AppSinkCallbacks::builder().build());
1466                }
1467            }
1468        }
1469    }
1470}
1471
1472impl Stream for AppSinkStream {
1473    type Item = gst::Sample;
1474
1475    fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
1476        let mut waker = self.waker_reference.lock().unwrap();
1477
1478        let Some(app_sink) = self.app_sink.upgrade() else {
1479            return Poll::Ready(None);
1480        };
1481
1482        app_sink
1483            .try_pull_sample(gst::ClockTime::ZERO)
1484            .map(|sample| Poll::Ready(Some(sample)))
1485            .unwrap_or_else(|| {
1486                if app_sink.is_eos() {
1487                    return Poll::Ready(None);
1488                }
1489
1490                waker.replace(context.waker().to_owned());
1491
1492                Poll::Pending
1493            })
1494    }
1495}
1496
1497#[cfg(test)]
1498mod tests {
1499    use futures_util::StreamExt;
1500    use gst::prelude::*;
1501
1502    use super::*;
1503
1504    #[test]
1505    fn test_app_sink_stream() {
1506        gst::init().unwrap();
1507
1508        let videotestsrc = gst::ElementFactory::make("videotestsrc")
1509            .property("num-buffers", 5)
1510            .build()
1511            .unwrap();
1512        let appsink = gst::ElementFactory::make("appsink").build().unwrap();
1513
1514        let pipeline = gst::Pipeline::new();
1515        pipeline.add(&videotestsrc).unwrap();
1516        pipeline.add(&appsink).unwrap();
1517
1518        videotestsrc.link(&appsink).unwrap();
1519
1520        let app_sink_stream = appsink.dynamic_cast::<AppSink>().unwrap().stream();
1521        let samples_future = app_sink_stream.collect::<Vec<gst::Sample>>();
1522
1523        pipeline.set_state(gst::State::Playing).unwrap();
1524        let samples = futures_executor::block_on(samples_future);
1525        pipeline.set_state(gst::State::Null).unwrap();
1526
1527        assert_eq!(samples.len(), 5);
1528    }
1529}