gstreamer_app/
app_src.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    mem, panic,
5    pin::Pin,
6    ptr,
7    sync::{Arc, Mutex},
8    task::{Context, Poll, Waker},
9};
10
11#[cfg(not(panic = "abort"))]
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use futures_sink::Sink;
15use glib::{
16    ffi::{gboolean, gpointer},
17    prelude::*,
18    translate::*,
19};
20
21use crate::{ffi, AppSrc};
22
23#[allow(clippy::type_complexity)]
24pub struct AppSrcCallbacks {
25    need_data: Option<Box<dyn FnMut(&AppSrc, u32) + Send + 'static>>,
26    enough_data: Option<Box<dyn Fn(&AppSrc) + Send + Sync + 'static>>,
27    seek_data: Option<Box<dyn Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>>,
28    #[cfg(not(panic = "abort"))]
29    panicked: AtomicBool,
30    callbacks: ffi::GstAppSrcCallbacks,
31}
32
33unsafe impl Send for AppSrcCallbacks {}
34unsafe impl Sync for AppSrcCallbacks {}
35
36impl AppSrcCallbacks {
37    pub fn builder() -> AppSrcCallbacksBuilder {
38        skip_assert_initialized!();
39
40        AppSrcCallbacksBuilder {
41            need_data: None,
42            enough_data: None,
43            seek_data: None,
44        }
45    }
46}
47
48#[allow(clippy::type_complexity)]
49#[must_use = "The builder must be built to be used"]
50pub struct AppSrcCallbacksBuilder {
51    need_data: Option<Box<dyn FnMut(&AppSrc, u32) + Send + 'static>>,
52    enough_data: Option<Box<dyn Fn(&AppSrc) + Send + Sync + 'static>>,
53    seek_data: Option<Box<dyn Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>>,
54}
55
56impl AppSrcCallbacksBuilder {
57    pub fn need_data<F: FnMut(&AppSrc, u32) + Send + 'static>(self, need_data: F) -> Self {
58        Self {
59            need_data: Some(Box::new(need_data)),
60            ..self
61        }
62    }
63
64    pub fn need_data_if<F: FnMut(&AppSrc, u32) + Send + 'static>(
65        self,
66        need_data: F,
67        predicate: bool,
68    ) -> Self {
69        if predicate {
70            self.need_data(need_data)
71        } else {
72            self
73        }
74    }
75
76    pub fn need_data_if_some<F: FnMut(&AppSrc, u32) + Send + 'static>(
77        self,
78        need_data: Option<F>,
79    ) -> Self {
80        if let Some(need_data) = need_data {
81            self.need_data(need_data)
82        } else {
83            self
84        }
85    }
86
87    pub fn enough_data<F: Fn(&AppSrc) + Send + Sync + 'static>(self, enough_data: F) -> Self {
88        Self {
89            enough_data: Some(Box::new(enough_data)),
90            ..self
91        }
92    }
93
94    pub fn enough_data_if<F: Fn(&AppSrc) + Send + Sync + 'static>(
95        self,
96        enough_data: F,
97        predicate: bool,
98    ) -> Self {
99        if predicate {
100            self.enough_data(enough_data)
101        } else {
102            self
103        }
104    }
105
106    pub fn enough_data_if_some<F: Fn(&AppSrc) + Send + Sync + 'static>(
107        self,
108        enough_data: Option<F>,
109    ) -> Self {
110        if let Some(enough_data) = enough_data {
111            self.enough_data(enough_data)
112        } else {
113            self
114        }
115    }
116
117    pub fn seek_data<F: Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>(
118        self,
119        seek_data: F,
120    ) -> Self {
121        Self {
122            seek_data: Some(Box::new(seek_data)),
123            ..self
124        }
125    }
126
127    pub fn seek_data_if<F: Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>(
128        self,
129        seek_data: F,
130        predicate: bool,
131    ) -> Self {
132        if predicate {
133            self.seek_data(seek_data)
134        } else {
135            self
136        }
137    }
138
139    pub fn seek_data_if_some<F: Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>(
140        self,
141        seek_data: Option<F>,
142    ) -> Self {
143        if let Some(seek_data) = seek_data {
144            self.seek_data(seek_data)
145        } else {
146            self
147        }
148    }
149
150    #[must_use = "Building the callbacks without using them has no effect"]
151    pub fn build(self) -> AppSrcCallbacks {
152        let have_need_data = self.need_data.is_some();
153        let have_enough_data = self.enough_data.is_some();
154        let have_seek_data = self.seek_data.is_some();
155
156        AppSrcCallbacks {
157            need_data: self.need_data,
158            enough_data: self.enough_data,
159            seek_data: self.seek_data,
160            #[cfg(not(panic = "abort"))]
161            panicked: AtomicBool::new(false),
162            callbacks: ffi::GstAppSrcCallbacks {
163                need_data: if have_need_data {
164                    Some(trampoline_need_data)
165                } else {
166                    None
167                },
168                enough_data: if have_enough_data {
169                    Some(trampoline_enough_data)
170                } else {
171                    None
172                },
173                seek_data: if have_seek_data {
174                    Some(trampoline_seek_data)
175                } else {
176                    None
177                },
178                _gst_reserved: [
179                    ptr::null_mut(),
180                    ptr::null_mut(),
181                    ptr::null_mut(),
182                    ptr::null_mut(),
183                ],
184            },
185        }
186    }
187}
188
189unsafe extern "C" fn trampoline_need_data(
190    appsrc: *mut ffi::GstAppSrc,
191    length: u32,
192    callbacks: gpointer,
193) {
194    let callbacks = callbacks as *mut AppSrcCallbacks;
195    let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
196
197    #[cfg(not(panic = "abort"))]
198    if (*callbacks).panicked.load(Ordering::Relaxed) {
199        let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
200        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
201        return;
202    }
203
204    if let Some(ref mut need_data) = (*callbacks).need_data {
205        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| need_data(&element, length)));
206        match result {
207            Ok(result) => result,
208            Err(err) => {
209                #[cfg(panic = "abort")]
210                {
211                    unreachable!("{err:?}");
212                }
213                #[cfg(not(panic = "abort"))]
214                {
215                    (*callbacks).panicked.store(true, Ordering::Relaxed);
216                    gst::subclass::post_panic_error_message(
217                        element.upcast_ref(),
218                        element.upcast_ref(),
219                        Some(err),
220                    );
221                }
222            }
223        }
224    }
225}
226
227unsafe extern "C" fn trampoline_enough_data(appsrc: *mut ffi::GstAppSrc, callbacks: gpointer) {
228    let callbacks = callbacks as *const AppSrcCallbacks;
229    let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
230
231    #[cfg(not(panic = "abort"))]
232    if (*callbacks).panicked.load(Ordering::Relaxed) {
233        let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
234        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
235        return;
236    }
237
238    if let Some(ref enough_data) = (*callbacks).enough_data {
239        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| enough_data(&element)));
240        match result {
241            Ok(result) => result,
242            Err(err) => {
243                #[cfg(panic = "abort")]
244                {
245                    unreachable!("{err:?}");
246                }
247                #[cfg(not(panic = "abort"))]
248                {
249                    (*callbacks).panicked.store(true, Ordering::Relaxed);
250                    gst::subclass::post_panic_error_message(
251                        element.upcast_ref(),
252                        element.upcast_ref(),
253                        Some(err),
254                    );
255                }
256            }
257        }
258    }
259}
260
261unsafe extern "C" fn trampoline_seek_data(
262    appsrc: *mut ffi::GstAppSrc,
263    offset: u64,
264    callbacks: gpointer,
265) -> gboolean {
266    let callbacks = callbacks as *const AppSrcCallbacks;
267    let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
268
269    #[cfg(not(panic = "abort"))]
270    if (*callbacks).panicked.load(Ordering::Relaxed) {
271        let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
272        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
273        return false.into_glib();
274    }
275
276    let ret = if let Some(ref seek_data) = (*callbacks).seek_data {
277        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| seek_data(&element, offset)));
278        match result {
279            Ok(result) => result,
280            Err(err) => {
281                #[cfg(panic = "abort")]
282                {
283                    unreachable!("{err:?}");
284                }
285                #[cfg(not(panic = "abort"))]
286                {
287                    (*callbacks).panicked.store(true, Ordering::Relaxed);
288                    gst::subclass::post_panic_error_message(
289                        element.upcast_ref(),
290                        element.upcast_ref(),
291                        Some(err),
292                    );
293
294                    false
295                }
296            }
297        }
298    } else {
299        false
300    };
301
302    ret.into_glib()
303}
304
305unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
306    let _ = Box::<AppSrcCallbacks>::from_raw(ptr as *mut _);
307}
308
309impl AppSrc {
310    // rustdoc-stripper-ignore-next
311    /// Creates a new builder-pattern struct instance to construct [`AppSrc`] objects.
312    ///
313    /// This method returns an instance of [`AppSrcBuilder`](crate::builders::AppSrcBuilder) which can be used to create [`AppSrc`] objects.
314    pub fn builder<'a>() -> AppSrcBuilder<'a> {
315        assert_initialized_main_thread!();
316        AppSrcBuilder {
317            builder: gst::Object::builder(),
318            callbacks: None,
319            automatic_eos: None,
320        }
321    }
322
323    /// Set callbacks which will be executed when data is needed, enough data has
324    /// been collected or when a seek should be performed.
325    /// This is an alternative to using the signals, it has lower overhead and is thus
326    /// less expensive, but also less flexible.
327    ///
328    /// If callbacks are installed, no signals will be emitted for performance
329    /// reasons.
330    ///
331    /// Before 1.16.3 it was not possible to change the callbacks in a thread-safe
332    /// way.
333    /// ## `callbacks`
334    /// the callbacks
335    /// ## `notify`
336    /// a destroy notify function
337    #[doc(alias = "gst_app_src_set_callbacks")]
338    pub fn set_callbacks(&self, callbacks: AppSrcCallbacks) {
339        unsafe {
340            let src = self.to_glib_none().0;
341            #[allow(clippy::manual_dangling_ptr)]
342            #[cfg(not(feature = "v1_18"))]
343            {
344                static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
345                    std::sync::OnceLock::new();
346
347                let set_once_quark = SET_ONCE_QUARK
348                    .get_or_init(|| glib::Quark::from_str("gstreamer-rs-app-src-callbacks"));
349
350                // This is not thread-safe before 1.16.3, see
351                // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
352                if gst::version() < (1, 16, 3, 0) {
353                    if !glib::gobject_ffi::g_object_get_qdata(
354                        src as *mut _,
355                        set_once_quark.into_glib(),
356                    )
357                    .is_null()
358                    {
359                        panic!("AppSrc callbacks can only be set once");
360                    }
361
362                    glib::gobject_ffi::g_object_set_qdata(
363                        src as *mut _,
364                        set_once_quark.into_glib(),
365                        1 as *mut _,
366                    );
367                }
368            }
369
370            ffi::gst_app_src_set_callbacks(
371                src,
372                mut_override(&callbacks.callbacks),
373                Box::into_raw(Box::new(callbacks)) as *mut _,
374                Some(destroy_callbacks),
375            );
376        }
377    }
378
379    /// Configure the `min` and `max` latency in `src`. If `min` is set to -1, the
380    /// default latency calculations for pseudo-live sources will be used.
381    /// ## `min`
382    /// the min latency
383    /// ## `max`
384    /// the max latency
385    #[doc(alias = "gst_app_src_set_latency")]
386    pub fn set_latency(
387        &self,
388        min: impl Into<Option<gst::ClockTime>>,
389        max: impl Into<Option<gst::ClockTime>>,
390    ) {
391        unsafe {
392            ffi::gst_app_src_set_latency(
393                self.to_glib_none().0,
394                min.into().into_glib(),
395                max.into().into_glib(),
396            );
397        }
398    }
399
400    /// Retrieve the min and max latencies in `min` and `max` respectively.
401    ///
402    /// # Returns
403    ///
404    ///
405    /// ## `min`
406    /// the min latency
407    ///
408    /// ## `max`
409    /// the max latency
410    #[doc(alias = "get_latency")]
411    #[doc(alias = "gst_app_src_get_latency")]
412    pub fn latency(&self) -> (Option<gst::ClockTime>, Option<gst::ClockTime>) {
413        unsafe {
414            let mut min = mem::MaybeUninit::uninit();
415            let mut max = mem::MaybeUninit::uninit();
416            ffi::gst_app_src_get_latency(self.to_glib_none().0, min.as_mut_ptr(), max.as_mut_ptr());
417            (from_glib(min.assume_init()), from_glib(max.assume_init()))
418        }
419    }
420
421    #[doc(alias = "do-timestamp")]
422    #[doc(alias = "gst_base_src_set_do_timestamp")]
423    pub fn set_do_timestamp(&self, timestamp: bool) {
424        unsafe {
425            gst_base::ffi::gst_base_src_set_do_timestamp(
426                self.as_ptr() as *mut gst_base::ffi::GstBaseSrc,
427                timestamp.into_glib(),
428            );
429        }
430    }
431
432    #[doc(alias = "do-timestamp")]
433    #[doc(alias = "gst_base_src_get_do_timestamp")]
434    pub fn do_timestamp(&self) -> bool {
435        unsafe {
436            from_glib(gst_base::ffi::gst_base_src_get_do_timestamp(
437                self.as_ptr() as *mut gst_base::ffi::GstBaseSrc
438            ))
439        }
440    }
441
442    #[doc(alias = "do-timestamp")]
443    pub fn connect_do_timestamp_notify<F: Fn(&Self) + Send + Sync + 'static>(
444        &self,
445        f: F,
446    ) -> glib::SignalHandlerId {
447        unsafe extern "C" fn notify_do_timestamp_trampoline<
448            F: Fn(&AppSrc) + Send + Sync + 'static,
449        >(
450            this: *mut ffi::GstAppSrc,
451            _param_spec: glib::ffi::gpointer,
452            f: glib::ffi::gpointer,
453        ) {
454            let f: &F = &*(f as *const F);
455            f(&AppSrc::from_glib_borrow(this))
456        }
457        unsafe {
458            let f: Box<F> = Box::new(f);
459            glib::signal::connect_raw(
460                self.as_ptr() as *mut _,
461                b"notify::do-timestamp\0".as_ptr() as *const _,
462                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
463                    notify_do_timestamp_trampoline::<F> as *const (),
464                )),
465                Box::into_raw(f),
466            )
467        }
468    }
469
470    #[doc(alias = "set-automatic-eos")]
471    #[doc(alias = "gst_base_src_set_automatic_eos")]
472    pub fn set_automatic_eos(&self, automatic_eos: bool) {
473        unsafe {
474            gst_base::ffi::gst_base_src_set_automatic_eos(
475                self.as_ptr() as *mut gst_base::ffi::GstBaseSrc,
476                automatic_eos.into_glib(),
477            );
478        }
479    }
480
481    pub fn sink(&self) -> AppSrcSink {
482        AppSrcSink::new(self)
483    }
484}
485
486// rustdoc-stripper-ignore-next
487/// A [builder-pattern] type to construct [`AppSrc`] objects.
488///
489/// [builder-pattern]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html
490#[must_use = "The builder must be built to be used"]
491pub struct AppSrcBuilder<'a> {
492    builder: gst::gobject::GObjectBuilder<'a, AppSrc>,
493    callbacks: Option<AppSrcCallbacks>,
494    automatic_eos: Option<bool>,
495}
496
497impl<'a> AppSrcBuilder<'a> {
498    // rustdoc-stripper-ignore-next
499    /// Build the [`AppSrc`].
500    ///
501    /// # Panics
502    ///
503    /// This panics if the [`AppSrc`] doesn't have all the given properties or
504    /// property values of the wrong type are provided.
505    #[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects"]
506    pub fn build(self) -> AppSrc {
507        let appsrc = self.builder.build().unwrap();
508
509        if let Some(callbacks) = self.callbacks {
510            appsrc.set_callbacks(callbacks);
511        }
512
513        if let Some(automatic_eos) = self.automatic_eos {
514            appsrc.set_automatic_eos(automatic_eos);
515        }
516
517        appsrc
518    }
519
520    pub fn automatic_eos(self, automatic_eos: bool) -> Self {
521        Self {
522            automatic_eos: Some(automatic_eos),
523            ..self
524        }
525    }
526
527    pub fn block(self, block: bool) -> Self {
528        Self {
529            builder: self.builder.property("block", block),
530            ..self
531        }
532    }
533
534    pub fn callbacks(self, callbacks: AppSrcCallbacks) -> Self {
535        Self {
536            callbacks: Some(callbacks),
537            ..self
538        }
539    }
540
541    pub fn caps(self, caps: &'a gst::Caps) -> Self {
542        Self {
543            builder: self.builder.property("caps", caps),
544            ..self
545        }
546    }
547
548    pub fn do_timestamp(self, do_timestamp: bool) -> Self {
549        Self {
550            builder: self.builder.property("do-timestamp", do_timestamp),
551            ..self
552        }
553    }
554
555    pub fn duration(self, duration: u64) -> Self {
556        Self {
557            builder: self.builder.property("duration", duration),
558            ..self
559        }
560    }
561
562    pub fn format(self, format: gst::Format) -> Self {
563        Self {
564            builder: self.builder.property("format", format),
565            ..self
566        }
567    }
568
569    #[cfg(feature = "v1_18")]
570    #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
571    pub fn handle_segment_change(self, handle_segment_change: bool) -> Self {
572        Self {
573            builder: self
574                .builder
575                .property("handle-segment-change", handle_segment_change),
576            ..self
577        }
578    }
579
580    pub fn is_live(self, is_live: bool) -> Self {
581        Self {
582            builder: self.builder.property("is-live", is_live),
583            ..self
584        }
585    }
586
587    #[cfg(feature = "v1_20")]
588    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
589    pub fn leaky_type(self, leaky_type: crate::AppLeakyType) -> Self {
590        Self {
591            builder: self.builder.property("leaky-type", leaky_type),
592            ..self
593        }
594    }
595
596    #[cfg(feature = "v1_20")]
597    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
598    pub fn max_buffers(self, max_buffers: u64) -> Self {
599        Self {
600            builder: self.builder.property("max-buffers", max_buffers),
601            ..self
602        }
603    }
604
605    pub fn max_bytes(self, max_bytes: u64) -> Self {
606        Self {
607            builder: self.builder.property("max-bytes", max_bytes),
608            ..self
609        }
610    }
611
612    pub fn max_latency(self, max_latency: i64) -> Self {
613        Self {
614            builder: self.builder.property("max-latency", max_latency),
615            ..self
616        }
617    }
618
619    #[cfg(feature = "v1_20")]
620    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
621    pub fn max_time(self, max_time: gst::ClockTime) -> Self {
622        Self {
623            builder: self.builder.property("max-time", max_time),
624            ..self
625        }
626    }
627
628    pub fn min_latency(self, min_latency: i64) -> Self {
629        Self {
630            builder: self.builder.property("min-latency", min_latency),
631            ..self
632        }
633    }
634
635    pub fn min_percent(self, min_percent: u32) -> Self {
636        Self {
637            builder: self.builder.property("min-percent", min_percent),
638            ..self
639        }
640    }
641
642    pub fn size(self, size: i64) -> Self {
643        Self {
644            builder: self.builder.property("size", size),
645            ..self
646        }
647    }
648
649    pub fn stream_type(self, stream_type: crate::AppStreamType) -> Self {
650        Self {
651            builder: self.builder.property("stream-type", stream_type),
652            ..self
653        }
654    }
655
656    #[cfg(feature = "v1_28")]
657    #[cfg_attr(docsrs, doc(cfg(feature = "v1_28")))]
658    pub fn silent(self, silent: bool) -> Self {
659        Self {
660            builder: self.builder.property("silent", silent),
661            ..self
662        }
663    }
664
665    // rustdoc-stripper-ignore-next
666    /// Sets property `name` to the given value `value`.
667    ///
668    /// Overrides any default or previously defined value for `name`.
669    #[inline]
670    pub fn property(self, name: &'a str, value: impl Into<glib::Value> + 'a) -> Self {
671        Self {
672            builder: self.builder.property(name, value),
673            ..self
674        }
675    }
676
677    // rustdoc-stripper-ignore-next
678    /// Sets property `name` to the given string value `value`.
679    #[inline]
680    pub fn property_from_str(self, name: &'a str, value: &'a str) -> Self {
681        Self {
682            builder: self.builder.property_from_str(name, value),
683            ..self
684        }
685    }
686
687    gst::impl_builder_gvalue_extra_setters!(property_and_name);
688}
689
690#[derive(Debug)]
691pub struct AppSrcSink {
692    app_src: glib::WeakRef<AppSrc>,
693    waker_reference: Arc<Mutex<Option<Waker>>>,
694}
695
696impl AppSrcSink {
697    fn new(app_src: &AppSrc) -> Self {
698        skip_assert_initialized!();
699
700        let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
701
702        app_src.set_callbacks(
703            AppSrcCallbacks::builder()
704                .need_data({
705                    let waker_reference = Arc::clone(&waker_reference);
706
707                    move |_, _| {
708                        if let Some(waker) = waker_reference.lock().unwrap().take() {
709                            waker.wake();
710                        }
711                    }
712                })
713                .build(),
714        );
715
716        Self {
717            app_src: app_src.downgrade(),
718            waker_reference,
719        }
720    }
721}
722
723impl Drop for AppSrcSink {
724    fn drop(&mut self) {
725        #[cfg(not(feature = "v1_18"))]
726        {
727            // This is not thread-safe before 1.16.3, see
728            // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
729            if gst::version() >= (1, 16, 3, 0) {
730                if let Some(app_src) = self.app_src.upgrade() {
731                    app_src.set_callbacks(AppSrcCallbacks::builder().build());
732                }
733            }
734        }
735    }
736}
737
738impl Sink<gst::Sample> for AppSrcSink {
739    type Error = gst::FlowError;
740
741    fn poll_ready(self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<(), Self::Error>> {
742        let mut waker = self.waker_reference.lock().unwrap();
743
744        let Some(app_src) = self.app_src.upgrade() else {
745            return Poll::Ready(Err(gst::FlowError::Eos));
746        };
747
748        let current_level_bytes = app_src.current_level_bytes();
749        let max_bytes = app_src.max_bytes();
750
751        if current_level_bytes >= max_bytes && max_bytes != 0 {
752            waker.replace(context.waker().to_owned());
753
754            Poll::Pending
755        } else {
756            Poll::Ready(Ok(()))
757        }
758    }
759
760    fn start_send(self: Pin<&mut Self>, sample: gst::Sample) -> Result<(), Self::Error> {
761        let Some(app_src) = self.app_src.upgrade() else {
762            return Err(gst::FlowError::Eos);
763        };
764
765        app_src.push_sample(&sample)?;
766
767        Ok(())
768    }
769
770    fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
771        Poll::Ready(Ok(()))
772    }
773
774    fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
775        let Some(app_src) = self.app_src.upgrade() else {
776            return Poll::Ready(Ok(()));
777        };
778
779        app_src.end_of_stream()?;
780
781        Poll::Ready(Ok(()))
782    }
783}
784
785#[cfg(test)]
786mod tests {
787    use std::sync::atomic::{AtomicUsize, Ordering};
788
789    use futures_util::{sink::SinkExt, stream::StreamExt};
790    use gst::prelude::*;
791
792    use super::*;
793
794    #[test]
795    fn test_app_src_sink() {
796        gst::init().unwrap();
797
798        let appsrc = gst::ElementFactory::make("appsrc").build().unwrap();
799        let fakesink = gst::ElementFactory::make("fakesink")
800            .property("signal-handoffs", true)
801            .build()
802            .unwrap();
803
804        let pipeline = gst::Pipeline::new();
805        pipeline.add(&appsrc).unwrap();
806        pipeline.add(&fakesink).unwrap();
807
808        appsrc.link(&fakesink).unwrap();
809
810        let mut bus_stream = pipeline.bus().unwrap().stream();
811        let mut app_src_sink = appsrc.dynamic_cast::<AppSrc>().unwrap().sink();
812
813        let sample_quantity = 5;
814
815        let samples = (0..sample_quantity)
816            .map(|_| gst::Sample::builder().buffer(&gst::Buffer::new()).build())
817            .collect::<Vec<gst::Sample>>();
818
819        let mut sample_stream = futures_util::stream::iter(samples).map(Ok);
820
821        let handoff_count_reference = Arc::new(AtomicUsize::new(0));
822
823        fakesink.connect("handoff", false, {
824            let handoff_count_reference = Arc::clone(&handoff_count_reference);
825
826            move |_| {
827                handoff_count_reference.fetch_add(1, Ordering::AcqRel);
828
829                None
830            }
831        });
832
833        pipeline.set_state(gst::State::Playing).unwrap();
834
835        futures_executor::block_on(app_src_sink.send_all(&mut sample_stream)).unwrap();
836        futures_executor::block_on(app_src_sink.close()).unwrap();
837
838        while let Some(message) = futures_executor::block_on(bus_stream.next()) {
839            match message.view() {
840                gst::MessageView::Eos(_) => break,
841                gst::MessageView::Error(_) => unreachable!(),
842                _ => continue,
843            }
844        }
845
846        pipeline.set_state(gst::State::Null).unwrap();
847
848        assert_eq!(
849            handoff_count_reference.load(Ordering::Acquire),
850            sample_quantity
851        );
852    }
853
854    #[test]
855    fn builder_caps_lt() {
856        gst::init().unwrap();
857
858        let caps = &gst::Caps::new_any();
859        {
860            let stream_type = "random-access".to_owned();
861            let appsrc = AppSrc::builder()
862                .property_from_str("stream-type", &stream_type)
863                .caps(caps)
864                .build();
865            assert_eq!(
866                appsrc.property::<crate::AppStreamType>("stream-type"),
867                crate::AppStreamType::RandomAccess
868            );
869            assert!(appsrc.property::<gst::Caps>("caps").is_any());
870        }
871
872        let stream_type = &"random-access".to_owned();
873        {
874            let caps = &gst::Caps::new_any();
875            let appsrc = AppSrc::builder()
876                .property_from_str("stream-type", stream_type)
877                .caps(caps)
878                .build();
879            assert_eq!(
880                appsrc.property::<crate::AppStreamType>("stream-type"),
881                crate::AppStreamType::RandomAccess
882            );
883            assert!(appsrc.property::<gst::Caps>("caps").is_any());
884        }
885    }
886}