1use std::{
4 mem, panic,
5 pin::Pin,
6 ptr,
7 sync::{Arc, Mutex},
8 task::{Context, Poll, Waker},
9};
10
11#[cfg(not(panic = "abort"))]
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use futures_sink::Sink;
15use glib::{
16 ffi::{gboolean, gpointer},
17 prelude::*,
18 translate::*,
19};
20
21use crate::{ffi, AppSrc};
22
23#[allow(clippy::type_complexity)]
24pub struct AppSrcCallbacks {
25 need_data: Option<Box<dyn FnMut(&AppSrc, u32) + Send + 'static>>,
26 enough_data: Option<Box<dyn Fn(&AppSrc) + Send + Sync + 'static>>,
27 seek_data: Option<Box<dyn Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>>,
28 #[cfg(not(panic = "abort"))]
29 panicked: AtomicBool,
30 callbacks: ffi::GstAppSrcCallbacks,
31}
32
33unsafe impl Send for AppSrcCallbacks {}
34unsafe impl Sync for AppSrcCallbacks {}
35
36impl AppSrcCallbacks {
37 pub fn builder() -> AppSrcCallbacksBuilder {
38 skip_assert_initialized!();
39
40 AppSrcCallbacksBuilder {
41 need_data: None,
42 enough_data: None,
43 seek_data: None,
44 }
45 }
46}
47
48#[allow(clippy::type_complexity)]
49#[must_use = "The builder must be built to be used"]
50pub struct AppSrcCallbacksBuilder {
51 need_data: Option<Box<dyn FnMut(&AppSrc, u32) + Send + 'static>>,
52 enough_data: Option<Box<dyn Fn(&AppSrc) + Send + Sync + 'static>>,
53 seek_data: Option<Box<dyn Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>>,
54}
55
56impl AppSrcCallbacksBuilder {
57 pub fn need_data<F: FnMut(&AppSrc, u32) + Send + 'static>(self, need_data: F) -> Self {
58 Self {
59 need_data: Some(Box::new(need_data)),
60 ..self
61 }
62 }
63
64 pub fn need_data_if<F: FnMut(&AppSrc, u32) + Send + 'static>(
65 self,
66 need_data: F,
67 predicate: bool,
68 ) -> Self {
69 if predicate {
70 self.need_data(need_data)
71 } else {
72 self
73 }
74 }
75
76 pub fn need_data_if_some<F: FnMut(&AppSrc, u32) + Send + 'static>(
77 self,
78 need_data: Option<F>,
79 ) -> Self {
80 if let Some(need_data) = need_data {
81 self.need_data(need_data)
82 } else {
83 self
84 }
85 }
86
87 pub fn enough_data<F: Fn(&AppSrc) + Send + Sync + 'static>(self, enough_data: F) -> Self {
88 Self {
89 enough_data: Some(Box::new(enough_data)),
90 ..self
91 }
92 }
93
94 pub fn enough_data_if<F: Fn(&AppSrc) + Send + Sync + 'static>(
95 self,
96 enough_data: F,
97 predicate: bool,
98 ) -> Self {
99 if predicate {
100 self.enough_data(enough_data)
101 } else {
102 self
103 }
104 }
105
106 pub fn enough_data_if_some<F: Fn(&AppSrc) + Send + Sync + 'static>(
107 self,
108 enough_data: Option<F>,
109 ) -> Self {
110 if let Some(enough_data) = enough_data {
111 self.enough_data(enough_data)
112 } else {
113 self
114 }
115 }
116
117 pub fn seek_data<F: Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>(
118 self,
119 seek_data: F,
120 ) -> Self {
121 Self {
122 seek_data: Some(Box::new(seek_data)),
123 ..self
124 }
125 }
126
127 pub fn seek_data_if<F: Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>(
128 self,
129 seek_data: F,
130 predicate: bool,
131 ) -> Self {
132 if predicate {
133 self.seek_data(seek_data)
134 } else {
135 self
136 }
137 }
138
139 pub fn seek_data_if_some<F: Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>(
140 self,
141 seek_data: Option<F>,
142 ) -> Self {
143 if let Some(seek_data) = seek_data {
144 self.seek_data(seek_data)
145 } else {
146 self
147 }
148 }
149
150 #[must_use = "Building the callbacks without using them has no effect"]
151 pub fn build(self) -> AppSrcCallbacks {
152 let have_need_data = self.need_data.is_some();
153 let have_enough_data = self.enough_data.is_some();
154 let have_seek_data = self.seek_data.is_some();
155
156 AppSrcCallbacks {
157 need_data: self.need_data,
158 enough_data: self.enough_data,
159 seek_data: self.seek_data,
160 #[cfg(not(panic = "abort"))]
161 panicked: AtomicBool::new(false),
162 callbacks: ffi::GstAppSrcCallbacks {
163 need_data: if have_need_data {
164 Some(trampoline_need_data)
165 } else {
166 None
167 },
168 enough_data: if have_enough_data {
169 Some(trampoline_enough_data)
170 } else {
171 None
172 },
173 seek_data: if have_seek_data {
174 Some(trampoline_seek_data)
175 } else {
176 None
177 },
178 _gst_reserved: [
179 ptr::null_mut(),
180 ptr::null_mut(),
181 ptr::null_mut(),
182 ptr::null_mut(),
183 ],
184 },
185 }
186 }
187}
188
189unsafe extern "C" fn trampoline_need_data(
190 appsrc: *mut ffi::GstAppSrc,
191 length: u32,
192 callbacks: gpointer,
193) {
194 let callbacks = callbacks as *mut AppSrcCallbacks;
195 let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
196
197 #[cfg(not(panic = "abort"))]
198 if (*callbacks).panicked.load(Ordering::Relaxed) {
199 let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
200 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
201 return;
202 }
203
204 if let Some(ref mut need_data) = (*callbacks).need_data {
205 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| need_data(&element, length)));
206 match result {
207 Ok(result) => result,
208 Err(err) => {
209 #[cfg(panic = "abort")]
210 {
211 unreachable!("{err:?}");
212 }
213 #[cfg(not(panic = "abort"))]
214 {
215 (*callbacks).panicked.store(true, Ordering::Relaxed);
216 gst::subclass::post_panic_error_message(
217 element.upcast_ref(),
218 element.upcast_ref(),
219 Some(err),
220 );
221 }
222 }
223 }
224 }
225}
226
227unsafe extern "C" fn trampoline_enough_data(appsrc: *mut ffi::GstAppSrc, callbacks: gpointer) {
228 let callbacks = callbacks as *const AppSrcCallbacks;
229 let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
230
231 #[cfg(not(panic = "abort"))]
232 if (*callbacks).panicked.load(Ordering::Relaxed) {
233 let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
234 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
235 return;
236 }
237
238 if let Some(ref enough_data) = (*callbacks).enough_data {
239 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| enough_data(&element)));
240 match result {
241 Ok(result) => result,
242 Err(err) => {
243 #[cfg(panic = "abort")]
244 {
245 unreachable!("{err:?}");
246 }
247 #[cfg(not(panic = "abort"))]
248 {
249 (*callbacks).panicked.store(true, Ordering::Relaxed);
250 gst::subclass::post_panic_error_message(
251 element.upcast_ref(),
252 element.upcast_ref(),
253 Some(err),
254 );
255 }
256 }
257 }
258 }
259}
260
261unsafe extern "C" fn trampoline_seek_data(
262 appsrc: *mut ffi::GstAppSrc,
263 offset: u64,
264 callbacks: gpointer,
265) -> gboolean {
266 let callbacks = callbacks as *const AppSrcCallbacks;
267 let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
268
269 #[cfg(not(panic = "abort"))]
270 if (*callbacks).panicked.load(Ordering::Relaxed) {
271 let element: Borrowed<AppSrc> = from_glib_borrow(appsrc);
272 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
273 return false.into_glib();
274 }
275
276 let ret = if let Some(ref seek_data) = (*callbacks).seek_data {
277 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| seek_data(&element, offset)));
278 match result {
279 Ok(result) => result,
280 Err(err) => {
281 #[cfg(panic = "abort")]
282 {
283 unreachable!("{err:?}");
284 }
285 #[cfg(not(panic = "abort"))]
286 {
287 (*callbacks).panicked.store(true, Ordering::Relaxed);
288 gst::subclass::post_panic_error_message(
289 element.upcast_ref(),
290 element.upcast_ref(),
291 Some(err),
292 );
293
294 false
295 }
296 }
297 }
298 } else {
299 false
300 };
301
302 ret.into_glib()
303}
304
305unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
306 let _ = Box::<AppSrcCallbacks>::from_raw(ptr as *mut _);
307}
308
309impl AppSrc {
310 pub fn builder<'a>() -> AppSrcBuilder<'a> {
315 assert_initialized_main_thread!();
316 AppSrcBuilder {
317 builder: gst::Object::builder(),
318 callbacks: None,
319 automatic_eos: None,
320 }
321 }
322
323 #[doc(alias = "gst_app_src_set_callbacks")]
338 pub fn set_callbacks(&self, callbacks: AppSrcCallbacks) {
339 unsafe {
340 let src = self.to_glib_none().0;
341 #[allow(clippy::manual_dangling_ptr)]
342 #[cfg(not(feature = "v1_18"))]
343 {
344 static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
345 std::sync::OnceLock::new();
346
347 let set_once_quark = SET_ONCE_QUARK
348 .get_or_init(|| glib::Quark::from_str("gstreamer-rs-app-src-callbacks"));
349
350 if gst::version() < (1, 16, 3, 0) {
353 if !glib::gobject_ffi::g_object_get_qdata(
354 src as *mut _,
355 set_once_quark.into_glib(),
356 )
357 .is_null()
358 {
359 panic!("AppSrc callbacks can only be set once");
360 }
361
362 glib::gobject_ffi::g_object_set_qdata(
363 src as *mut _,
364 set_once_quark.into_glib(),
365 1 as *mut _,
366 );
367 }
368 }
369
370 ffi::gst_app_src_set_callbacks(
371 src,
372 mut_override(&callbacks.callbacks),
373 Box::into_raw(Box::new(callbacks)) as *mut _,
374 Some(destroy_callbacks),
375 );
376 }
377 }
378
379 #[doc(alias = "gst_app_src_set_latency")]
386 pub fn set_latency(
387 &self,
388 min: impl Into<Option<gst::ClockTime>>,
389 max: impl Into<Option<gst::ClockTime>>,
390 ) {
391 unsafe {
392 ffi::gst_app_src_set_latency(
393 self.to_glib_none().0,
394 min.into().into_glib(),
395 max.into().into_glib(),
396 );
397 }
398 }
399
400 #[doc(alias = "get_latency")]
411 #[doc(alias = "gst_app_src_get_latency")]
412 pub fn latency(&self) -> (Option<gst::ClockTime>, Option<gst::ClockTime>) {
413 unsafe {
414 let mut min = mem::MaybeUninit::uninit();
415 let mut max = mem::MaybeUninit::uninit();
416 ffi::gst_app_src_get_latency(self.to_glib_none().0, min.as_mut_ptr(), max.as_mut_ptr());
417 (from_glib(min.assume_init()), from_glib(max.assume_init()))
418 }
419 }
420
421 #[doc(alias = "do-timestamp")]
422 #[doc(alias = "gst_base_src_set_do_timestamp")]
423 pub fn set_do_timestamp(&self, timestamp: bool) {
424 unsafe {
425 gst_base::ffi::gst_base_src_set_do_timestamp(
426 self.as_ptr() as *mut gst_base::ffi::GstBaseSrc,
427 timestamp.into_glib(),
428 );
429 }
430 }
431
432 #[doc(alias = "do-timestamp")]
433 #[doc(alias = "gst_base_src_get_do_timestamp")]
434 pub fn do_timestamp(&self) -> bool {
435 unsafe {
436 from_glib(gst_base::ffi::gst_base_src_get_do_timestamp(
437 self.as_ptr() as *mut gst_base::ffi::GstBaseSrc
438 ))
439 }
440 }
441
442 #[doc(alias = "do-timestamp")]
443 pub fn connect_do_timestamp_notify<F: Fn(&Self) + Send + Sync + 'static>(
444 &self,
445 f: F,
446 ) -> glib::SignalHandlerId {
447 unsafe extern "C" fn notify_do_timestamp_trampoline<
448 F: Fn(&AppSrc) + Send + Sync + 'static,
449 >(
450 this: *mut ffi::GstAppSrc,
451 _param_spec: glib::ffi::gpointer,
452 f: glib::ffi::gpointer,
453 ) {
454 let f: &F = &*(f as *const F);
455 f(&AppSrc::from_glib_borrow(this))
456 }
457 unsafe {
458 let f: Box<F> = Box::new(f);
459 glib::signal::connect_raw(
460 self.as_ptr() as *mut _,
461 b"notify::do-timestamp\0".as_ptr() as *const _,
462 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
463 notify_do_timestamp_trampoline::<F> as *const (),
464 )),
465 Box::into_raw(f),
466 )
467 }
468 }
469
470 #[doc(alias = "set-automatic-eos")]
471 #[doc(alias = "gst_base_src_set_automatic_eos")]
472 pub fn set_automatic_eos(&self, automatic_eos: bool) {
473 unsafe {
474 gst_base::ffi::gst_base_src_set_automatic_eos(
475 self.as_ptr() as *mut gst_base::ffi::GstBaseSrc,
476 automatic_eos.into_glib(),
477 );
478 }
479 }
480
481 pub fn sink(&self) -> AppSrcSink {
482 AppSrcSink::new(self)
483 }
484}
485
486#[must_use = "The builder must be built to be used"]
491pub struct AppSrcBuilder<'a> {
492 builder: gst::gobject::GObjectBuilder<'a, AppSrc>,
493 callbacks: Option<AppSrcCallbacks>,
494 automatic_eos: Option<bool>,
495}
496
497impl<'a> AppSrcBuilder<'a> {
498 #[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects"]
506 pub fn build(self) -> AppSrc {
507 let appsrc = self.builder.build().unwrap();
508
509 if let Some(callbacks) = self.callbacks {
510 appsrc.set_callbacks(callbacks);
511 }
512
513 if let Some(automatic_eos) = self.automatic_eos {
514 appsrc.set_automatic_eos(automatic_eos);
515 }
516
517 appsrc
518 }
519
520 pub fn automatic_eos(self, automatic_eos: bool) -> Self {
521 Self {
522 automatic_eos: Some(automatic_eos),
523 ..self
524 }
525 }
526
527 pub fn block(self, block: bool) -> Self {
528 Self {
529 builder: self.builder.property("block", block),
530 ..self
531 }
532 }
533
534 pub fn callbacks(self, callbacks: AppSrcCallbacks) -> Self {
535 Self {
536 callbacks: Some(callbacks),
537 ..self
538 }
539 }
540
541 pub fn caps(self, caps: &'a gst::Caps) -> Self {
542 Self {
543 builder: self.builder.property("caps", caps),
544 ..self
545 }
546 }
547
548 pub fn do_timestamp(self, do_timestamp: bool) -> Self {
549 Self {
550 builder: self.builder.property("do-timestamp", do_timestamp),
551 ..self
552 }
553 }
554
555 pub fn duration(self, duration: u64) -> Self {
556 Self {
557 builder: self.builder.property("duration", duration),
558 ..self
559 }
560 }
561
562 pub fn format(self, format: gst::Format) -> Self {
563 Self {
564 builder: self.builder.property("format", format),
565 ..self
566 }
567 }
568
569 #[cfg(feature = "v1_18")]
570 #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
571 pub fn handle_segment_change(self, handle_segment_change: bool) -> Self {
572 Self {
573 builder: self
574 .builder
575 .property("handle-segment-change", handle_segment_change),
576 ..self
577 }
578 }
579
580 pub fn is_live(self, is_live: bool) -> Self {
581 Self {
582 builder: self.builder.property("is-live", is_live),
583 ..self
584 }
585 }
586
587 #[cfg(feature = "v1_20")]
588 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
589 pub fn leaky_type(self, leaky_type: crate::AppLeakyType) -> Self {
590 Self {
591 builder: self.builder.property("leaky-type", leaky_type),
592 ..self
593 }
594 }
595
596 #[cfg(feature = "v1_20")]
597 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
598 pub fn max_buffers(self, max_buffers: u64) -> Self {
599 Self {
600 builder: self.builder.property("max-buffers", max_buffers),
601 ..self
602 }
603 }
604
605 pub fn max_bytes(self, max_bytes: u64) -> Self {
606 Self {
607 builder: self.builder.property("max-bytes", max_bytes),
608 ..self
609 }
610 }
611
612 pub fn max_latency(self, max_latency: i64) -> Self {
613 Self {
614 builder: self.builder.property("max-latency", max_latency),
615 ..self
616 }
617 }
618
619 #[cfg(feature = "v1_20")]
620 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
621 pub fn max_time(self, max_time: gst::ClockTime) -> Self {
622 Self {
623 builder: self.builder.property("max-time", max_time),
624 ..self
625 }
626 }
627
628 pub fn min_latency(self, min_latency: i64) -> Self {
629 Self {
630 builder: self.builder.property("min-latency", min_latency),
631 ..self
632 }
633 }
634
635 pub fn min_percent(self, min_percent: u32) -> Self {
636 Self {
637 builder: self.builder.property("min-percent", min_percent),
638 ..self
639 }
640 }
641
642 pub fn size(self, size: i64) -> Self {
643 Self {
644 builder: self.builder.property("size", size),
645 ..self
646 }
647 }
648
649 pub fn stream_type(self, stream_type: crate::AppStreamType) -> Self {
650 Self {
651 builder: self.builder.property("stream-type", stream_type),
652 ..self
653 }
654 }
655
656 #[cfg(feature = "v1_28")]
657 #[cfg_attr(docsrs, doc(cfg(feature = "v1_28")))]
658 pub fn silent(self, silent: bool) -> Self {
659 Self {
660 builder: self.builder.property("silent", silent),
661 ..self
662 }
663 }
664
665 #[inline]
670 pub fn property(self, name: &'a str, value: impl Into<glib::Value> + 'a) -> Self {
671 Self {
672 builder: self.builder.property(name, value),
673 ..self
674 }
675 }
676
677 #[inline]
680 pub fn property_from_str(self, name: &'a str, value: &'a str) -> Self {
681 Self {
682 builder: self.builder.property_from_str(name, value),
683 ..self
684 }
685 }
686
687 gst::impl_builder_gvalue_extra_setters!(property_and_name);
688}
689
690#[derive(Debug)]
691pub struct AppSrcSink {
692 app_src: glib::WeakRef<AppSrc>,
693 waker_reference: Arc<Mutex<Option<Waker>>>,
694}
695
696impl AppSrcSink {
697 fn new(app_src: &AppSrc) -> Self {
698 skip_assert_initialized!();
699
700 let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
701
702 app_src.set_callbacks(
703 AppSrcCallbacks::builder()
704 .need_data({
705 let waker_reference = Arc::clone(&waker_reference);
706
707 move |_, _| {
708 if let Some(waker) = waker_reference.lock().unwrap().take() {
709 waker.wake();
710 }
711 }
712 })
713 .build(),
714 );
715
716 Self {
717 app_src: app_src.downgrade(),
718 waker_reference,
719 }
720 }
721}
722
723impl Drop for AppSrcSink {
724 fn drop(&mut self) {
725 #[cfg(not(feature = "v1_18"))]
726 {
727 if gst::version() >= (1, 16, 3, 0) {
730 if let Some(app_src) = self.app_src.upgrade() {
731 app_src.set_callbacks(AppSrcCallbacks::builder().build());
732 }
733 }
734 }
735 }
736}
737
738impl Sink<gst::Sample> for AppSrcSink {
739 type Error = gst::FlowError;
740
741 fn poll_ready(self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<(), Self::Error>> {
742 let mut waker = self.waker_reference.lock().unwrap();
743
744 let Some(app_src) = self.app_src.upgrade() else {
745 return Poll::Ready(Err(gst::FlowError::Eos));
746 };
747
748 let current_level_bytes = app_src.current_level_bytes();
749 let max_bytes = app_src.max_bytes();
750
751 if current_level_bytes >= max_bytes && max_bytes != 0 {
752 waker.replace(context.waker().to_owned());
753
754 Poll::Pending
755 } else {
756 Poll::Ready(Ok(()))
757 }
758 }
759
760 fn start_send(self: Pin<&mut Self>, sample: gst::Sample) -> Result<(), Self::Error> {
761 let Some(app_src) = self.app_src.upgrade() else {
762 return Err(gst::FlowError::Eos);
763 };
764
765 app_src.push_sample(&sample)?;
766
767 Ok(())
768 }
769
770 fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
771 Poll::Ready(Ok(()))
772 }
773
774 fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
775 let Some(app_src) = self.app_src.upgrade() else {
776 return Poll::Ready(Ok(()));
777 };
778
779 app_src.end_of_stream()?;
780
781 Poll::Ready(Ok(()))
782 }
783}
784
785#[cfg(test)]
786mod tests {
787 use std::sync::atomic::{AtomicUsize, Ordering};
788
789 use futures_util::{sink::SinkExt, stream::StreamExt};
790 use gst::prelude::*;
791
792 use super::*;
793
794 #[test]
795 fn test_app_src_sink() {
796 gst::init().unwrap();
797
798 let appsrc = gst::ElementFactory::make("appsrc").build().unwrap();
799 let fakesink = gst::ElementFactory::make("fakesink")
800 .property("signal-handoffs", true)
801 .build()
802 .unwrap();
803
804 let pipeline = gst::Pipeline::new();
805 pipeline.add(&appsrc).unwrap();
806 pipeline.add(&fakesink).unwrap();
807
808 appsrc.link(&fakesink).unwrap();
809
810 let mut bus_stream = pipeline.bus().unwrap().stream();
811 let mut app_src_sink = appsrc.dynamic_cast::<AppSrc>().unwrap().sink();
812
813 let sample_quantity = 5;
814
815 let samples = (0..sample_quantity)
816 .map(|_| gst::Sample::builder().buffer(&gst::Buffer::new()).build())
817 .collect::<Vec<gst::Sample>>();
818
819 let mut sample_stream = futures_util::stream::iter(samples).map(Ok);
820
821 let handoff_count_reference = Arc::new(AtomicUsize::new(0));
822
823 fakesink.connect("handoff", false, {
824 let handoff_count_reference = Arc::clone(&handoff_count_reference);
825
826 move |_| {
827 handoff_count_reference.fetch_add(1, Ordering::AcqRel);
828
829 None
830 }
831 });
832
833 pipeline.set_state(gst::State::Playing).unwrap();
834
835 futures_executor::block_on(app_src_sink.send_all(&mut sample_stream)).unwrap();
836 futures_executor::block_on(app_src_sink.close()).unwrap();
837
838 while let Some(message) = futures_executor::block_on(bus_stream.next()) {
839 match message.view() {
840 gst::MessageView::Eos(_) => break,
841 gst::MessageView::Error(_) => unreachable!(),
842 _ => continue,
843 }
844 }
845
846 pipeline.set_state(gst::State::Null).unwrap();
847
848 assert_eq!(
849 handoff_count_reference.load(Ordering::Acquire),
850 sample_quantity
851 );
852 }
853
854 #[test]
855 fn builder_caps_lt() {
856 gst::init().unwrap();
857
858 let caps = &gst::Caps::new_any();
859 {
860 let stream_type = "random-access".to_owned();
861 let appsrc = AppSrc::builder()
862 .property_from_str("stream-type", &stream_type)
863 .caps(caps)
864 .build();
865 assert_eq!(
866 appsrc.property::<crate::AppStreamType>("stream-type"),
867 crate::AppStreamType::RandomAccess
868 );
869 assert!(appsrc.property::<gst::Caps>("caps").is_any());
870 }
871
872 let stream_type = &"random-access".to_owned();
873 {
874 let caps = &gst::Caps::new_any();
875 let appsrc = AppSrc::builder()
876 .property_from_str("stream-type", stream_type)
877 .caps(caps)
878 .build();
879 assert_eq!(
880 appsrc.property::<crate::AppStreamType>("stream-type"),
881 crate::AppStreamType::RandomAccess
882 );
883 assert!(appsrc.property::<gst::Caps>("caps").is_any());
884 }
885 }
886}