1use std::{
4 mem, panic,
5 pin::Pin,
6 ptr,
7 sync::{Arc, Mutex},
8 task::{Context, Poll, Waker},
9};
10
11#[cfg(not(panic = "abort"))]
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use futures_core::Stream;
15use glib::{ffi::gpointer, prelude::*, translate::*};
16
17use crate::{AppSink, ffi};
18
/// Bundle of user closures plus the C callback table that dispatches to them.
///
/// Values are created through [`AppSinkCallbacks::builder`] and handed to
/// `AppSink::set_callbacks`, which boxes the whole struct and passes it as the
/// user-data pointer that the `trampoline_*` functions cast back.
#[allow(clippy::type_complexity)]
pub struct AppSinkCallbacks {
    // Closure invoked by `trampoline_eos`.
    eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
    // Closure invoked by `trampoline_new_preroll`.
    new_preroll: Option<
        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
    >,
    // Closure invoked by `trampoline_new_sample`.
    new_sample: Option<
        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
    >,
    // Closure invoked by `trampoline_new_event`.
    new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
    // Closure invoked by `trampoline_propose_allocation`.
    propose_allocation:
        Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
    // Set by a trampoline once a closure has panicked; while set, trampolines
    // report the error again instead of calling any closure.
    #[cfg(not(panic = "abort"))]
    panicked: AtomicBool,
    // C callback table; the builder fills an entry only when the matching
    // closure is present.
    callbacks: ffi::GstAppSinkCallbacks,
}

// The C callback table carries raw pointers (its reserved slots), so the auto
// traits are not derived and have to be asserted manually.
// NOTE(review): the boxed closures are only `Send`, not `Sync`; the `Sync`
// impl presumably relies on appsink never running callbacks concurrently —
// confirm.
unsafe impl Send for AppSinkCallbacks {}
unsafe impl Sync for AppSinkCallbacks {}
38
impl AppSinkCallbacks {
    /// Returns a builder with every callback slot unset.
    pub fn builder() -> AppSinkCallbacksBuilder {
        skip_assert_initialized!();
        AppSinkCallbacksBuilder {
            eos: None,
            new_preroll: None,
            new_sample: None,
            new_event: None,
            propose_allocation: None,
        }
    }
}
51
/// Builder collecting the optional closures that end up in an
/// [`AppSinkCallbacks`] value; each field mirrors the field of the same name
/// there and is moved over unchanged by `build`.
#[allow(clippy::type_complexity)]
#[must_use = "The builder must be built to be used"]
pub struct AppSinkCallbacksBuilder {
    // Optional `eos` closure.
    eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
    // Optional `new_preroll` closure.
    new_preroll: Option<
        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
    >,
    // Optional `new_sample` closure.
    new_sample: Option<
        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
    >,
    // Optional `new_event` closure (its setters are gated on `v1_20`).
    new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
    // Optional `propose_allocation` closure (its setters are gated on `v1_24`).
    propose_allocation:
        Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
}
66
67impl AppSinkCallbacksBuilder {
68 pub fn eos<F: FnMut(&AppSink) + Send + 'static>(self, eos: F) -> Self {
69 Self {
70 eos: Some(Box::new(eos)),
71 ..self
72 }
73 }
74
75 pub fn eos_if<F: FnMut(&AppSink) + Send + 'static>(self, eos: F, predicate: bool) -> Self {
76 if predicate { self.eos(eos) } else { self }
77 }
78
79 pub fn eos_if_some<F: FnMut(&AppSink) + Send + 'static>(self, eos: Option<F>) -> Self {
80 if let Some(eos) = eos {
81 self.eos(eos)
82 } else {
83 self
84 }
85 }
86
87 pub fn new_preroll<
88 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
89 >(
90 self,
91 new_preroll: F,
92 ) -> Self {
93 Self {
94 new_preroll: Some(Box::new(new_preroll)),
95 ..self
96 }
97 }
98
99 pub fn new_preroll_if<
100 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
101 >(
102 self,
103 new_preroll: F,
104 predicate: bool,
105 ) -> Self {
106 if predicate {
107 self.new_preroll(new_preroll)
108 } else {
109 self
110 }
111 }
112
113 pub fn new_preroll_if_some<
114 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
115 >(
116 self,
117 new_preroll: Option<F>,
118 ) -> Self {
119 if let Some(new_preroll) = new_preroll {
120 self.new_preroll(new_preroll)
121 } else {
122 self
123 }
124 }
125
126 pub fn new_sample<
127 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
128 >(
129 self,
130 new_sample: F,
131 ) -> Self {
132 Self {
133 new_sample: Some(Box::new(new_sample)),
134 ..self
135 }
136 }
137
138 pub fn new_sample_if<
139 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
140 >(
141 self,
142 new_sample: F,
143 predicate: bool,
144 ) -> Self {
145 if predicate {
146 self.new_sample(new_sample)
147 } else {
148 self
149 }
150 }
151
152 pub fn new_sample_if_some<
153 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
154 >(
155 self,
156 new_sample: Option<F>,
157 ) -> Self {
158 if let Some(new_sample) = new_sample {
159 self.new_sample(new_sample)
160 } else {
161 self
162 }
163 }
164
165 #[cfg(feature = "v1_20")]
166 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
167 pub fn new_event<F: FnMut(&AppSink) -> bool + Send + 'static>(self, new_event: F) -> Self {
168 Self {
169 new_event: Some(Box::new(new_event)),
170 ..self
171 }
172 }
173
174 #[cfg(feature = "v1_20")]
175 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
176 pub fn new_event_if<F: FnMut(&AppSink) -> bool + Send + 'static>(
177 self,
178 new_event: F,
179 predicate: bool,
180 ) -> Self {
181 if predicate {
182 self.new_event(new_event)
183 } else {
184 self
185 }
186 }
187
188 #[cfg(feature = "v1_20")]
189 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
190 pub fn new_event_if_some<F: FnMut(&AppSink) -> bool + Send + 'static>(
191 self,
192 new_event: Option<F>,
193 ) -> Self {
194 if let Some(new_event) = new_event {
195 self.new_event(new_event)
196 } else {
197 self
198 }
199 }
200
201 #[cfg(feature = "v1_24")]
202 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
203 pub fn propose_allocation<
204 F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
205 >(
206 self,
207 propose_allocation: F,
208 ) -> Self {
209 Self {
210 propose_allocation: Some(Box::new(propose_allocation)),
211 ..self
212 }
213 }
214
215 #[cfg(feature = "v1_24")]
216 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
217 pub fn propose_allocation_if<
218 F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
219 >(
220 self,
221 propose_allocation: F,
222 predicate: bool,
223 ) -> Self {
224 if predicate {
225 self.propose_allocation(propose_allocation)
226 } else {
227 self
228 }
229 }
230
231 #[cfg(feature = "v1_24")]
232 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
233 pub fn propose_allocation_if_some<
234 F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
235 >(
236 self,
237 propose_allocation: Option<F>,
238 ) -> Self {
239 if let Some(propose_allocation) = propose_allocation {
240 self.propose_allocation(propose_allocation)
241 } else {
242 self
243 }
244 }
245
246 #[must_use = "Building the callbacks without using them has no effect"]
247 pub fn build(self) -> AppSinkCallbacks {
248 let have_eos = self.eos.is_some();
249 let have_new_preroll = self.new_preroll.is_some();
250 let have_new_sample = self.new_sample.is_some();
251 let have_new_event = self.new_event.is_some();
252 let have_propose_allocation = self.propose_allocation.is_some();
253
254 AppSinkCallbacks {
255 eos: self.eos,
256 new_preroll: self.new_preroll,
257 new_sample: self.new_sample,
258 new_event: self.new_event,
259 propose_allocation: self.propose_allocation,
260 #[cfg(not(panic = "abort"))]
261 panicked: AtomicBool::new(false),
262 callbacks: ffi::GstAppSinkCallbacks {
263 eos: if have_eos { Some(trampoline_eos) } else { None },
264 new_preroll: if have_new_preroll {
265 Some(trampoline_new_preroll)
266 } else {
267 None
268 },
269 new_sample: if have_new_sample {
270 Some(trampoline_new_sample)
271 } else {
272 None
273 },
274 new_event: if have_new_event {
275 Some(trampoline_new_event)
276 } else {
277 None
278 },
279 propose_allocation: if have_propose_allocation {
280 Some(trampoline_propose_allocation)
281 } else {
282 None
283 },
284 _gst_reserved: [ptr::null_mut(), ptr::null_mut()],
285 },
286 }
287 }
288}
289
290unsafe extern "C" fn trampoline_eos(appsink: *mut ffi::GstAppSink, callbacks: gpointer) {
291 unsafe {
292 let callbacks = callbacks as *mut AppSinkCallbacks;
293 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
294
295 #[cfg(not(panic = "abort"))]
296 if (*callbacks).panicked.load(Ordering::Relaxed) {
297 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
298 gst::subclass::post_panic_error_message(
299 element.upcast_ref(),
300 element.upcast_ref(),
301 None,
302 );
303 return;
304 }
305
306 if let Some(ref mut eos) = (*callbacks).eos {
307 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| eos(&element)));
308 match result {
309 Ok(result) => result,
310 Err(err) => {
311 #[cfg(panic = "abort")]
312 {
313 unreachable!("{err:?}");
314 }
315 #[cfg(not(panic = "abort"))]
316 {
317 (*callbacks).panicked.store(true, Ordering::Relaxed);
318 gst::subclass::post_panic_error_message(
319 element.upcast_ref(),
320 element.upcast_ref(),
321 Some(err),
322 );
323 }
324 }
325 }
326 }
327 }
328}
329
330unsafe extern "C" fn trampoline_new_preroll(
331 appsink: *mut ffi::GstAppSink,
332 callbacks: gpointer,
333) -> gst::ffi::GstFlowReturn {
334 unsafe {
335 let callbacks = callbacks as *mut AppSinkCallbacks;
336 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
337
338 #[cfg(not(panic = "abort"))]
339 if (*callbacks).panicked.load(Ordering::Relaxed) {
340 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
341 gst::subclass::post_panic_error_message(
342 element.upcast_ref(),
343 element.upcast_ref(),
344 None,
345 );
346 return gst::FlowReturn::Error.into_glib();
347 }
348
349 let ret = if let Some(ref mut new_preroll) = (*callbacks).new_preroll {
350 let result =
351 panic::catch_unwind(panic::AssertUnwindSafe(|| new_preroll(&element).into()));
352 match result {
353 Ok(result) => result,
354 Err(err) => {
355 #[cfg(panic = "abort")]
356 {
357 unreachable!("{err:?}");
358 }
359 #[cfg(not(panic = "abort"))]
360 {
361 (*callbacks).panicked.store(true, Ordering::Relaxed);
362 gst::subclass::post_panic_error_message(
363 element.upcast_ref(),
364 element.upcast_ref(),
365 Some(err),
366 );
367
368 gst::FlowReturn::Error
369 }
370 }
371 }
372 } else {
373 gst::FlowReturn::Error
374 };
375
376 ret.into_glib()
377 }
378}
379
380unsafe extern "C" fn trampoline_new_sample(
381 appsink: *mut ffi::GstAppSink,
382 callbacks: gpointer,
383) -> gst::ffi::GstFlowReturn {
384 unsafe {
385 let callbacks = callbacks as *mut AppSinkCallbacks;
386 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
387
388 #[cfg(not(panic = "abort"))]
389 if (*callbacks).panicked.load(Ordering::Relaxed) {
390 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
391 gst::subclass::post_panic_error_message(
392 element.upcast_ref(),
393 element.upcast_ref(),
394 None,
395 );
396 return gst::FlowReturn::Error.into_glib();
397 }
398
399 let ret = if let Some(ref mut new_sample) = (*callbacks).new_sample {
400 let result =
401 panic::catch_unwind(panic::AssertUnwindSafe(|| new_sample(&element).into()));
402 match result {
403 Ok(result) => result,
404 Err(err) => {
405 #[cfg(panic = "abort")]
406 {
407 unreachable!("{err:?}");
408 }
409 #[cfg(not(panic = "abort"))]
410 {
411 (*callbacks).panicked.store(true, Ordering::Relaxed);
412 gst::subclass::post_panic_error_message(
413 element.upcast_ref(),
414 element.upcast_ref(),
415 Some(err),
416 );
417
418 gst::FlowReturn::Error
419 }
420 }
421 }
422 } else {
423 gst::FlowReturn::Error
424 };
425
426 ret.into_glib()
427 }
428}
429
430unsafe extern "C" fn trampoline_new_event(
431 appsink: *mut ffi::GstAppSink,
432 callbacks: gpointer,
433) -> glib::ffi::gboolean {
434 unsafe {
435 let callbacks = callbacks as *mut AppSinkCallbacks;
436 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
437
438 #[cfg(not(panic = "abort"))]
439 if (*callbacks).panicked.load(Ordering::Relaxed) {
440 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
441 gst::subclass::post_panic_error_message(
442 element.upcast_ref(),
443 element.upcast_ref(),
444 None,
445 );
446 return false.into_glib();
447 }
448
449 let ret = if let Some(ref mut new_event) = (*callbacks).new_event {
450 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_event(&element)));
451 match result {
452 Ok(result) => result,
453 Err(err) => {
454 #[cfg(panic = "abort")]
455 {
456 unreachable!("{err:?}");
457 }
458 #[cfg(not(panic = "abort"))]
459 {
460 (*callbacks).panicked.store(true, Ordering::Relaxed);
461 gst::subclass::post_panic_error_message(
462 element.upcast_ref(),
463 element.upcast_ref(),
464 Some(err),
465 );
466
467 false
468 }
469 }
470 }
471 } else {
472 false
473 };
474
475 ret.into_glib()
476 }
477}
478
479unsafe extern "C" fn trampoline_propose_allocation(
480 appsink: *mut ffi::GstAppSink,
481 query: *mut gst::ffi::GstQuery,
482 callbacks: gpointer,
483) -> glib::ffi::gboolean {
484 unsafe {
485 let callbacks = callbacks as *mut AppSinkCallbacks;
486 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
487
488 #[cfg(not(panic = "abort"))]
489 if (*callbacks).panicked.load(Ordering::Relaxed) {
490 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
491 gst::subclass::post_panic_error_message(
492 element.upcast_ref(),
493 element.upcast_ref(),
494 None,
495 );
496 return false.into_glib();
497 }
498
499 let ret = if let Some(ref mut propose_allocation) = (*callbacks).propose_allocation {
500 let query = match gst::QueryRef::from_mut_ptr(query).view_mut() {
501 gst::QueryViewMut::Allocation(allocation) => allocation,
502 _ => unreachable!(),
503 };
504 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
505 propose_allocation(&element, query)
506 }));
507 match result {
508 Ok(result) => result,
509 Err(err) => {
510 #[cfg(panic = "abort")]
511 {
512 unreachable!("{err:?}");
513 }
514 #[cfg(not(panic = "abort"))]
515 {
516 (*callbacks).panicked.store(true, Ordering::Relaxed);
517 gst::subclass::post_panic_error_message(
518 element.upcast_ref(),
519 element.upcast_ref(),
520 Some(err),
521 );
522 false
523 }
524 }
525 }
526 } else {
527 false
528 };
529
530 ret.into_glib()
531 }
532}
533
534unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
535 unsafe {
536 let _ = Box::<AppSinkCallbacks>::from_raw(ptr as *mut _);
537 }
538}
539
540impl AppSink {
541 pub fn builder<'a>() -> AppSinkBuilder<'a> {
546 assert_initialized_main_thread!();
547 AppSinkBuilder {
548 builder: gst::Object::builder(),
549 callbacks: None,
550 drop_out_of_segment: None,
551 }
552 }
553
554 #[doc(alias = "gst_app_sink_set_callbacks")]
574 pub fn set_callbacks(&self, callbacks: AppSinkCallbacks) {
575 unsafe {
576 let sink = self.to_glib_none().0;
577
578 #[allow(clippy::manual_dangling_ptr)]
579 #[cfg(not(feature = "v1_18"))]
580 {
581 static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
582 std::sync::OnceLock::new();
583
584 let set_once_quark = SET_ONCE_QUARK
585 .get_or_init(|| glib::Quark::from_str("gstreamer-rs-app-sink-callbacks"));
586
587 if gst::version() < (1, 16, 3, 0) {
590 if !glib::gobject_ffi::g_object_get_qdata(
591 sink as *mut _,
592 set_once_quark.into_glib(),
593 )
594 .is_null()
595 {
596 panic!("AppSink callbacks can only be set once");
597 }
598
599 glib::gobject_ffi::g_object_set_qdata(
600 sink as *mut _,
601 set_once_quark.into_glib(),
602 1 as *mut _,
603 );
604 }
605 }
606
607 ffi::gst_app_sink_set_callbacks(
608 sink,
609 mut_override(&callbacks.callbacks),
610 Box::into_raw(Box::new(callbacks)) as *mut _,
611 Some(destroy_callbacks),
612 );
613 }
614 }
615
616 #[doc(alias = "drop-out-of-segment")]
617 pub fn drops_out_of_segment(&self) -> bool {
618 unsafe {
619 from_glib(gst_base::ffi::gst_base_sink_get_drop_out_of_segment(
620 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
621 ))
622 }
623 }
624
625 #[doc(alias = "max-bitrate")]
626 #[doc(alias = "gst_base_sink_get_max_bitrate")]
627 pub fn max_bitrate(&self) -> u64 {
628 unsafe {
629 gst_base::ffi::gst_base_sink_get_max_bitrate(
630 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
631 )
632 }
633 }
634
635 #[doc(alias = "max-lateness")]
636 #[doc(alias = "gst_base_sink_get_max_lateness")]
637 pub fn max_lateness(&self) -> i64 {
638 unsafe {
639 gst_base::ffi::gst_base_sink_get_max_lateness(
640 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
641 )
642 }
643 }
644
645 #[doc(alias = "processing-deadline")]
646 #[cfg(feature = "v1_16")]
647 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
648 #[doc(alias = "gst_base_sink_get_processing_deadline")]
649 pub fn processing_deadline(&self) -> gst::ClockTime {
650 unsafe {
651 try_from_glib(gst_base::ffi::gst_base_sink_get_processing_deadline(
652 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
653 ))
654 .expect("undefined processing_deadline")
655 }
656 }
657
658 #[doc(alias = "render-delay")]
659 #[doc(alias = "gst_base_sink_get_render_delay")]
660 pub fn render_delay(&self) -> gst::ClockTime {
661 unsafe {
662 try_from_glib(gst_base::ffi::gst_base_sink_get_render_delay(
663 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
664 ))
665 .expect("undefined render_delay")
666 }
667 }
668
669 #[cfg(feature = "v1_18")]
670 #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
671 #[doc(alias = "gst_base_sink_get_stats")]
672 pub fn stats(&self) -> gst::Structure {
673 unsafe {
674 from_glib_full(gst_base::ffi::gst_base_sink_get_stats(
675 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
676 ))
677 }
678 }
679
680 #[doc(alias = "sync")]
681 pub fn is_sync(&self) -> bool {
682 unsafe {
683 from_glib(gst_base::ffi::gst_base_sink_get_sync(
684 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
685 ))
686 }
687 }
688
689 #[doc(alias = "throttle-time")]
690 #[doc(alias = "gst_base_sink_get_throttle_time")]
691 pub fn throttle_time(&self) -> u64 {
692 unsafe {
693 gst_base::ffi::gst_base_sink_get_throttle_time(
694 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
695 )
696 }
697 }
698
699 #[doc(alias = "ts-offset")]
700 #[doc(alias = "gst_base_sink_get_ts_offset")]
701 pub fn ts_offset(&self) -> gst::ClockTimeDiff {
702 unsafe {
703 gst_base::ffi::gst_base_sink_get_ts_offset(
704 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
705 )
706 }
707 }
708
709 #[doc(alias = "async")]
710 #[doc(alias = "gst_base_sink_is_async_enabled")]
711 pub fn is_async(&self) -> bool {
712 unsafe {
713 from_glib(gst_base::ffi::gst_base_sink_is_async_enabled(
714 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
715 ))
716 }
717 }
718
719 #[doc(alias = "last-sample")]
720 pub fn enables_last_sample(&self) -> bool {
721 unsafe {
722 from_glib(gst_base::ffi::gst_base_sink_is_last_sample_enabled(
723 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
724 ))
725 }
726 }
727
728 #[doc(alias = "qos")]
729 #[doc(alias = "gst_base_sink_is_qos_enabled")]
730 pub fn is_qos(&self) -> bool {
731 unsafe {
732 from_glib(gst_base::ffi::gst_base_sink_is_qos_enabled(
733 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
734 ))
735 }
736 }
737
738 #[doc(alias = "async")]
739 #[doc(alias = "gst_base_sink_set_async_enabled")]
740 pub fn set_async(&self, enabled: bool) {
741 unsafe {
742 gst_base::ffi::gst_base_sink_set_async_enabled(
743 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
744 enabled.into_glib(),
745 );
746 }
747 }
748
749 #[doc(alias = "drop-out-of-segment")]
750 #[doc(alias = "gst_base_sink_set_drop_out_of_segment")]
751 pub fn set_drop_out_of_segment(&self, drop_out_of_segment: bool) {
752 unsafe {
753 gst_base::ffi::gst_base_sink_set_drop_out_of_segment(
754 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
755 drop_out_of_segment.into_glib(),
756 );
757 }
758 }
759
760 #[doc(alias = "last-sample")]
761 pub fn set_enable_last_sample(&self, enabled: bool) {
762 unsafe {
763 gst_base::ffi::gst_base_sink_set_last_sample_enabled(
764 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
765 enabled.into_glib(),
766 );
767 }
768 }
769
770 #[doc(alias = "max-bitrate")]
771 #[doc(alias = "gst_base_sink_set_max_bitrate")]
772 pub fn set_max_bitrate(&self, max_bitrate: u64) {
773 unsafe {
774 gst_base::ffi::gst_base_sink_set_max_bitrate(
775 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
776 max_bitrate,
777 );
778 }
779 }
780
781 #[doc(alias = "max-lateness")]
782 #[doc(alias = "gst_base_sink_set_max_lateness")]
783 pub fn set_max_lateness(&self, max_lateness: i64) {
784 unsafe {
785 gst_base::ffi::gst_base_sink_set_max_lateness(
786 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
787 max_lateness,
788 );
789 }
790 }
791
792 #[doc(alias = "processing-deadline")]
793 #[cfg(feature = "v1_16")]
794 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
795 #[doc(alias = "gst_base_sink_set_processing_deadline")]
796 pub fn set_processing_deadline(&self, processing_deadline: gst::ClockTime) {
797 unsafe {
798 gst_base::ffi::gst_base_sink_set_processing_deadline(
799 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
800 processing_deadline.into_glib(),
801 );
802 }
803 }
804
805 #[doc(alias = "qos")]
806 #[doc(alias = "gst_base_sink_set_qos_enabled")]
807 pub fn set_qos(&self, enabled: bool) {
808 unsafe {
809 gst_base::ffi::gst_base_sink_set_qos_enabled(
810 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
811 enabled.into_glib(),
812 );
813 }
814 }
815
816 #[doc(alias = "render-delay")]
817 #[doc(alias = "gst_base_sink_set_render_delay")]
818 pub fn set_render_delay(&self, delay: gst::ClockTime) {
819 unsafe {
820 gst_base::ffi::gst_base_sink_set_render_delay(
821 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
822 delay.into_glib(),
823 );
824 }
825 }
826
827 #[doc(alias = "sync")]
828 #[doc(alias = "gst_base_sink_set_sync")]
829 pub fn set_sync(&self, sync: bool) {
830 unsafe {
831 gst_base::ffi::gst_base_sink_set_sync(
832 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
833 sync.into_glib(),
834 );
835 }
836 }
837
838 #[doc(alias = "throttle-time")]
839 #[doc(alias = "gst_base_sink_set_throttle_time")]
840 pub fn set_throttle_time(&self, throttle: u64) {
841 unsafe {
842 gst_base::ffi::gst_base_sink_set_throttle_time(
843 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
844 throttle,
845 );
846 }
847 }
848
849 #[doc(alias = "ts-offset")]
850 #[doc(alias = "gst_base_sink_set_ts_offset")]
851 pub fn set_ts_offset(&self, offset: gst::ClockTimeDiff) {
852 unsafe {
853 gst_base::ffi::gst_base_sink_set_ts_offset(
854 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
855 offset,
856 );
857 }
858 }
859
860 #[doc(alias = "async")]
861 pub fn connect_async_notify<F: Fn(&Self) + Send + Sync + 'static>(
862 &self,
863 f: F,
864 ) -> glib::SignalHandlerId {
865 unsafe extern "C" fn notify_async_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
866 this: *mut ffi::GstAppSink,
867 _param_spec: glib::ffi::gpointer,
868 f: glib::ffi::gpointer,
869 ) {
870 unsafe {
871 let f: &F = &*(f as *const F);
872 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
873 }
874 }
875 unsafe {
876 let f: Box<F> = Box::new(f);
877 glib::signal::connect_raw(
878 self.as_ptr() as *mut _,
879 b"notify::async\0".as_ptr() as *const _,
880 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
881 notify_async_trampoline::<F> as *const (),
882 )),
883 Box::into_raw(f),
884 )
885 }
886 }
887
888 #[doc(alias = "blocksize")]
889 pub fn connect_blocksize_notify<F: Fn(&Self) + Send + Sync + 'static>(
890 &self,
891 f: F,
892 ) -> glib::SignalHandlerId {
893 unsafe extern "C" fn notify_blocksize_trampoline<
894 F: Fn(&AppSink) + Send + Sync + 'static,
895 >(
896 this: *mut ffi::GstAppSink,
897 _param_spec: glib::ffi::gpointer,
898 f: glib::ffi::gpointer,
899 ) {
900 unsafe {
901 let f: &F = &*(f as *const F);
902 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
903 }
904 }
905 unsafe {
906 let f: Box<F> = Box::new(f);
907 glib::signal::connect_raw(
908 self.as_ptr() as *mut _,
909 b"notify::blocksize\0".as_ptr() as *const _,
910 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
911 notify_blocksize_trampoline::<F> as *const (),
912 )),
913 Box::into_raw(f),
914 )
915 }
916 }
917
918 #[doc(alias = "enable-last-sample")]
919 pub fn connect_enable_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
920 &self,
921 f: F,
922 ) -> glib::SignalHandlerId {
923 unsafe extern "C" fn notify_enable_last_sample_trampoline<
924 F: Fn(&AppSink) + Send + Sync + 'static,
925 >(
926 this: *mut ffi::GstAppSink,
927 _param_spec: glib::ffi::gpointer,
928 f: glib::ffi::gpointer,
929 ) {
930 unsafe {
931 let f: &F = &*(f as *const F);
932 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
933 }
934 }
935 unsafe {
936 let f: Box<F> = Box::new(f);
937 glib::signal::connect_raw(
938 self.as_ptr() as *mut _,
939 b"notify::enable-last-sample\0".as_ptr() as *const _,
940 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
941 notify_enable_last_sample_trampoline::<F> as *const (),
942 )),
943 Box::into_raw(f),
944 )
945 }
946 }
947
948 #[doc(alias = "last-sample")]
949 pub fn connect_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
950 &self,
951 f: F,
952 ) -> glib::SignalHandlerId {
953 unsafe extern "C" fn notify_last_sample_trampoline<
954 F: Fn(&AppSink) + Send + Sync + 'static,
955 >(
956 this: *mut ffi::GstAppSink,
957 _param_spec: glib::ffi::gpointer,
958 f: glib::ffi::gpointer,
959 ) {
960 unsafe {
961 let f: &F = &*(f as *const F);
962 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
963 }
964 }
965 unsafe {
966 let f: Box<F> = Box::new(f);
967 glib::signal::connect_raw(
968 self.as_ptr() as *mut _,
969 b"notify::last-sample\0".as_ptr() as *const _,
970 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
971 notify_last_sample_trampoline::<F> as *const (),
972 )),
973 Box::into_raw(f),
974 )
975 }
976 }
977
978 #[doc(alias = "max-bitrate")]
979 pub fn connect_max_bitrate_notify<F: Fn(&Self) + Send + Sync + 'static>(
980 &self,
981 f: F,
982 ) -> glib::SignalHandlerId {
983 unsafe extern "C" fn notify_max_bitrate_trampoline<
984 F: Fn(&AppSink) + Send + Sync + 'static,
985 >(
986 this: *mut ffi::GstAppSink,
987 _param_spec: glib::ffi::gpointer,
988 f: glib::ffi::gpointer,
989 ) {
990 unsafe {
991 let f: &F = &*(f as *const F);
992 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
993 }
994 }
995 unsafe {
996 let f: Box<F> = Box::new(f);
997 glib::signal::connect_raw(
998 self.as_ptr() as *mut _,
999 b"notify::max-bitrate\0".as_ptr() as *const _,
1000 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1001 notify_max_bitrate_trampoline::<F> as *const (),
1002 )),
1003 Box::into_raw(f),
1004 )
1005 }
1006 }
1007
1008 #[doc(alias = "max-lateness")]
1009 pub fn connect_max_lateness_notify<F: Fn(&Self) + Send + Sync + 'static>(
1010 &self,
1011 f: F,
1012 ) -> glib::SignalHandlerId {
1013 unsafe extern "C" fn notify_max_lateness_trampoline<
1014 F: Fn(&AppSink) + Send + Sync + 'static,
1015 >(
1016 this: *mut ffi::GstAppSink,
1017 _param_spec: glib::ffi::gpointer,
1018 f: glib::ffi::gpointer,
1019 ) {
1020 unsafe {
1021 let f: &F = &*(f as *const F);
1022 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1023 }
1024 }
1025 unsafe {
1026 let f: Box<F> = Box::new(f);
1027 glib::signal::connect_raw(
1028 self.as_ptr() as *mut _,
1029 b"notify::max-lateness\0".as_ptr() as *const _,
1030 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1031 notify_max_lateness_trampoline::<F> as *const (),
1032 )),
1033 Box::into_raw(f),
1034 )
1035 }
1036 }
1037
1038 #[cfg(feature = "v1_16")]
1039 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
1040 #[doc(alias = "processing-deadline")]
1041 pub fn connect_processing_deadline_notify<F: Fn(&Self) + Send + Sync + 'static>(
1042 &self,
1043 f: F,
1044 ) -> glib::SignalHandlerId {
1045 unsafe extern "C" fn notify_processing_deadline_trampoline<
1046 F: Fn(&AppSink) + Send + Sync + 'static,
1047 >(
1048 this: *mut ffi::GstAppSink,
1049 _param_spec: glib::ffi::gpointer,
1050 f: glib::ffi::gpointer,
1051 ) {
1052 unsafe {
1053 let f: &F = &*(f as *const F);
1054 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1055 }
1056 }
1057 unsafe {
1058 let f: Box<F> = Box::new(f);
1059 glib::signal::connect_raw(
1060 self.as_ptr() as *mut _,
1061 b"notify::processing-deadline\0".as_ptr() as *const _,
1062 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1063 notify_processing_deadline_trampoline::<F> as *const (),
1064 )),
1065 Box::into_raw(f),
1066 )
1067 }
1068 }
1069
1070 #[doc(alias = "qos")]
1071 pub fn connect_qos_notify<F: Fn(&Self) + Send + Sync + 'static>(
1072 &self,
1073 f: F,
1074 ) -> glib::SignalHandlerId {
1075 unsafe extern "C" fn notify_qos_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1076 this: *mut ffi::GstAppSink,
1077 _param_spec: glib::ffi::gpointer,
1078 f: glib::ffi::gpointer,
1079 ) {
1080 unsafe {
1081 let f: &F = &*(f as *const F);
1082 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1083 }
1084 }
1085 unsafe {
1086 let f: Box<F> = Box::new(f);
1087 glib::signal::connect_raw(
1088 self.as_ptr() as *mut _,
1089 b"notify::qos\0".as_ptr() as *const _,
1090 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1091 notify_qos_trampoline::<F> as *const (),
1092 )),
1093 Box::into_raw(f),
1094 )
1095 }
1096 }
1097
1098 #[doc(alias = "render-delay")]
1099 pub fn connect_render_delay_notify<F: Fn(&Self) + Send + Sync + 'static>(
1100 &self,
1101 f: F,
1102 ) -> glib::SignalHandlerId {
1103 unsafe extern "C" fn notify_render_delay_trampoline<
1104 F: Fn(&AppSink) + Send + Sync + 'static,
1105 >(
1106 this: *mut ffi::GstAppSink,
1107 _param_spec: glib::ffi::gpointer,
1108 f: glib::ffi::gpointer,
1109 ) {
1110 unsafe {
1111 let f: &F = &*(f as *const F);
1112 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1113 }
1114 }
1115 unsafe {
1116 let f: Box<F> = Box::new(f);
1117 glib::signal::connect_raw(
1118 self.as_ptr() as *mut _,
1119 b"notify::render-delay\0".as_ptr() as *const _,
1120 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1121 notify_render_delay_trampoline::<F> as *const (),
1122 )),
1123 Box::into_raw(f),
1124 )
1125 }
1126 }
1127
1128 #[cfg(feature = "v1_18")]
1129 #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
1130 #[doc(alias = "stats")]
1131 pub fn connect_stats_notify<F: Fn(&Self) + Send + Sync + 'static>(
1132 &self,
1133 f: F,
1134 ) -> glib::SignalHandlerId {
1135 unsafe extern "C" fn notify_stats_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1136 this: *mut ffi::GstAppSink,
1137 _param_spec: glib::ffi::gpointer,
1138 f: glib::ffi::gpointer,
1139 ) {
1140 unsafe {
1141 let f: &F = &*(f as *const F);
1142 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1143 }
1144 }
1145 unsafe {
1146 let f: Box<F> = Box::new(f);
1147 glib::signal::connect_raw(
1148 self.as_ptr() as *mut _,
1149 b"notify::stats\0".as_ptr() as *const _,
1150 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1151 notify_stats_trampoline::<F> as *const (),
1152 )),
1153 Box::into_raw(f),
1154 )
1155 }
1156 }
1157
1158 #[doc(alias = "sync")]
1159 pub fn connect_sync_notify<F: Fn(&Self) + Send + Sync + 'static>(
1160 &self,
1161 f: F,
1162 ) -> glib::SignalHandlerId {
1163 unsafe extern "C" fn notify_sync_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1164 this: *mut ffi::GstAppSink,
1165 _param_spec: glib::ffi::gpointer,
1166 f: glib::ffi::gpointer,
1167 ) {
1168 unsafe {
1169 let f: &F = &*(f as *const F);
1170 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1171 }
1172 }
1173 unsafe {
1174 let f: Box<F> = Box::new(f);
1175 glib::signal::connect_raw(
1176 self.as_ptr() as *mut _,
1177 b"notify::sync\0".as_ptr() as *const _,
1178 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1179 notify_sync_trampoline::<F> as *const (),
1180 )),
1181 Box::into_raw(f),
1182 )
1183 }
1184 }
1185
1186 #[doc(alias = "throttle-time")]
1187 pub fn connect_throttle_time_notify<F: Fn(&Self) + Send + Sync + 'static>(
1188 &self,
1189 f: F,
1190 ) -> glib::SignalHandlerId {
1191 unsafe extern "C" fn notify_throttle_time_trampoline<
1192 F: Fn(&AppSink) + Send + Sync + 'static,
1193 >(
1194 this: *mut ffi::GstAppSink,
1195 _param_spec: glib::ffi::gpointer,
1196 f: glib::ffi::gpointer,
1197 ) {
1198 unsafe {
1199 let f: &F = &*(f as *const F);
1200 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1201 }
1202 }
1203 unsafe {
1204 let f: Box<F> = Box::new(f);
1205 glib::signal::connect_raw(
1206 self.as_ptr() as *mut _,
1207 b"notify::throttle-time\0".as_ptr() as *const _,
1208 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1209 notify_throttle_time_trampoline::<F> as *const (),
1210 )),
1211 Box::into_raw(f),
1212 )
1213 }
1214 }
1215
1216 #[doc(alias = "ts-offset")]
1217 pub fn connect_ts_offset_notify<F: Fn(&Self) + Send + Sync + 'static>(
1218 &self,
1219 f: F,
1220 ) -> glib::SignalHandlerId {
1221 unsafe extern "C" fn notify_ts_offset_trampoline<
1222 F: Fn(&AppSink) + Send + Sync + 'static,
1223 >(
1224 this: *mut ffi::GstAppSink,
1225 _param_spec: glib::ffi::gpointer,
1226 f: glib::ffi::gpointer,
1227 ) {
1228 unsafe {
1229 let f: &F = &*(f as *const F);
1230 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1231 }
1232 }
1233 unsafe {
1234 let f: Box<F> = Box::new(f);
1235 glib::signal::connect_raw(
1236 self.as_ptr() as *mut _,
1237 b"notify::ts-offset\0".as_ptr() as *const _,
1238 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1239 notify_ts_offset_trampoline::<F> as *const (),
1240 )),
1241 Box::into_raw(f),
1242 )
1243 }
1244 }
1245
/// Returns an [`AppSinkStream`] bound to this sink.
///
/// Construction goes through `AppSinkStream::new`, which calls
/// `set_callbacks` on this sink to install its own `new_sample` and `eos`
/// callbacks.
pub fn stream(&self) -> AppSinkStream {
    AppSinkStream::new(self)
}
1249}
1250
/// Builder for [`AppSink`] instances.
///
/// Property values are collected in the wrapped object builder; callbacks and
/// the `drop_out_of_segment` flag are kept separately and applied to the
/// created sink in `build()`.
#[must_use = "The builder must be built to be used"]
pub struct AppSinkBuilder<'a> {
    /// Underlying object builder that accumulates property values.
    builder: gst::gobject::GObjectBuilder<'a, AppSink>,
    /// Callbacks to install with `set_callbacks` after the sink is created.
    callbacks: Option<AppSinkCallbacks>,
    /// Value to apply with `set_drop_out_of_segment` after the sink is created.
    drop_out_of_segment: Option<bool>,
}
1261
1262impl<'a> AppSinkBuilder<'a> {
1263 #[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects"]
1271 pub fn build(self) -> AppSink {
1272 let appsink = self.builder.build().unwrap();
1273
1274 if let Some(callbacks) = self.callbacks {
1275 appsink.set_callbacks(callbacks);
1276 }
1277
1278 if let Some(drop_out_of_segment) = self.drop_out_of_segment {
1279 appsink.set_drop_out_of_segment(drop_out_of_segment);
1280 }
1281
1282 appsink
1283 }
1284
1285 pub fn async_(self, async_: bool) -> Self {
1286 Self {
1287 builder: self.builder.property("async", async_),
1288 ..self
1289 }
1290 }
1291
1292 pub fn buffer_list(self, buffer_list: bool) -> Self {
1293 Self {
1294 builder: self.builder.property("buffer-list", buffer_list),
1295 ..self
1296 }
1297 }
1298
1299 pub fn callbacks(self, callbacks: AppSinkCallbacks) -> Self {
1300 Self {
1301 callbacks: Some(callbacks),
1302 ..self
1303 }
1304 }
1305
1306 pub fn caps(self, caps: &'a gst::Caps) -> Self {
1307 Self {
1308 builder: self.builder.property("caps", caps),
1309 ..self
1310 }
1311 }
1312
1313 #[cfg_attr(feature = "v1_28", deprecated = "Since 1.28")]
1314 #[allow(deprecated)]
1315 pub fn drop(self, drop: bool) -> Self {
1316 Self {
1317 builder: self.builder.property("drop", drop),
1318 ..self
1319 }
1320 }
1321
1322 pub fn drop_out_of_segment(self, drop_out_of_segment: bool) -> Self {
1323 Self {
1324 drop_out_of_segment: Some(drop_out_of_segment),
1325 ..self
1326 }
1327 }
1328
1329 pub fn enable_last_sample(self, enable_last_sample: bool) -> Self {
1330 Self {
1331 builder: self
1332 .builder
1333 .property("enable-last-sample", enable_last_sample),
1334 ..self
1335 }
1336 }
1337
1338 pub fn max_bitrate(self, max_bitrate: u64) -> Self {
1339 Self {
1340 builder: self.builder.property("max-bitrate", max_bitrate),
1341 ..self
1342 }
1343 }
1344
1345 pub fn max_buffers(self, max_buffers: u32) -> Self {
1346 Self {
1347 builder: self.builder.property("max-buffers", max_buffers),
1348 ..self
1349 }
1350 }
1351
1352 pub fn max_lateness(self, max_lateness: i64) -> Self {
1353 Self {
1354 builder: self.builder.property("max-lateness", max_lateness),
1355 ..self
1356 }
1357 }
1358
1359 #[cfg(feature = "v1_16")]
1360 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
1361 pub fn processing_deadline(self, processing_deadline: gst::ClockTime) -> Self {
1362 Self {
1363 builder: self
1364 .builder
1365 .property("processing-deadline", processing_deadline),
1366 ..self
1367 }
1368 }
1369
1370 pub fn qos(self, qos: bool) -> Self {
1371 Self {
1372 builder: self.builder.property("qos", qos),
1373 ..self
1374 }
1375 }
1376
1377 pub fn render_delay(self, render_delay: Option<gst::ClockTime>) -> Self {
1378 Self {
1379 builder: self.builder.property("render-delay", render_delay),
1380 ..self
1381 }
1382 }
1383
1384 pub fn sync(self, sync: bool) -> Self {
1385 Self {
1386 builder: self.builder.property("sync", sync),
1387 ..self
1388 }
1389 }
1390
1391 pub fn throttle_time(self, throttle_time: u64) -> Self {
1392 Self {
1393 builder: self.builder.property("throttle-time", throttle_time),
1394 ..self
1395 }
1396 }
1397
1398 pub fn ts_offset(self, ts_offset: gst::ClockTimeDiff) -> Self {
1399 Self {
1400 builder: self.builder.property("ts-offset", ts_offset),
1401 ..self
1402 }
1403 }
1404
1405 pub fn wait_on_eos(self, wait_on_eos: bool) -> Self {
1406 Self {
1407 builder: self.builder.property("wait-on-eos", wait_on_eos),
1408 ..self
1409 }
1410 }
1411
1412 #[cfg(feature = "v1_24")]
1413 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1414 pub fn max_time(self, max_time: Option<gst::ClockTime>) -> Self {
1415 Self {
1416 builder: self.builder.property("max-time", max_time),
1417 ..self
1418 }
1419 }
1420
1421 #[cfg(feature = "v1_24")]
1422 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1423 pub fn max_bytes(self, max_bytes: u64) -> Self {
1424 Self {
1425 builder: self.builder.property("max-bytes", max_bytes),
1426 ..self
1427 }
1428 }
1429
1430 #[cfg(feature = "v1_28")]
1431 #[cfg_attr(docsrs, doc(cfg(feature = "v1_28")))]
1432 pub fn leaky_type(self, leaky_type: crate::AppLeakyType) -> Self {
1433 Self {
1434 builder: self.builder.property("leaky-type", leaky_type),
1435 ..self
1436 }
1437 }
1438
1439 #[cfg(feature = "v1_28")]
1440 #[cfg_attr(docsrs, doc(cfg(feature = "v1_28")))]
1441 pub fn silent(self, silent: bool) -> Self {
1442 Self {
1443 builder: self.builder.property("silent", silent),
1444 ..self
1445 }
1446 }
1447
1448 #[inline]
1453 pub fn property(self, name: &'a str, value: impl Into<glib::Value> + 'a) -> Self {
1454 Self {
1455 builder: self.builder.property(name, value),
1456 ..self
1457 }
1458 }
1459
1460 #[inline]
1463 pub fn property_from_str(self, name: &'a str, value: &'a str) -> Self {
1464 Self {
1465 builder: self.builder.property_from_str(name, value),
1466 ..self
1467 }
1468 }
1469
1470 gst::impl_builder_gvalue_extra_setters!(property_and_name);
1471}
1472
/// Stream of [`gst::Sample`]s pulled from an [`AppSink`].
///
/// Holds only a weak reference to the sink, plus a waker slot that is shared
/// with the callbacks installed on the sink.
#[derive(Debug)]
pub struct AppSinkStream {
    /// Weak reference to the sink the samples are pulled from.
    app_sink: glib::WeakRef<AppSink>,
    /// Waker of the most recent pending poll, if any; shared with the sink callbacks.
    waker_reference: Arc<Mutex<Option<Waker>>>,
}
1478
1479impl AppSinkStream {
1480 fn new(app_sink: &AppSink) -> Self {
1481 skip_assert_initialized!();
1482
1483 let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
1484
1485 app_sink.set_callbacks(
1486 AppSinkCallbacks::builder()
1487 .new_sample({
1488 let waker_reference = Arc::clone(&waker_reference);
1489
1490 move |_| {
1491 if let Some(waker) = waker_reference.lock().unwrap().take() {
1492 waker.wake();
1493 }
1494
1495 Ok(gst::FlowSuccess::Ok)
1496 }
1497 })
1498 .eos({
1499 let waker_reference = Arc::clone(&waker_reference);
1500
1501 move |_| {
1502 if let Some(waker) = waker_reference.lock().unwrap().take() {
1503 waker.wake();
1504 }
1505 }
1506 })
1507 .build(),
1508 );
1509
1510 Self {
1511 app_sink: app_sink.downgrade(),
1512 waker_reference,
1513 }
1514 }
1515}
1516
1517impl Drop for AppSinkStream {
1518 fn drop(&mut self) {
1519 #[cfg(not(feature = "v1_18"))]
1520 {
1521 if gst::version() >= (1, 16, 3, 0)
1524 && let Some(app_sink) = self.app_sink.upgrade()
1525 {
1526 app_sink.set_callbacks(AppSinkCallbacks::builder().build());
1527 }
1528 }
1529 }
1530}
1531
1532impl Stream for AppSinkStream {
1533 type Item = gst::Sample;
1534
1535 fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
1536 let mut waker = self.waker_reference.lock().unwrap();
1537
1538 let Some(app_sink) = self.app_sink.upgrade() else {
1539 return Poll::Ready(None);
1540 };
1541
1542 app_sink
1543 .try_pull_sample(gst::ClockTime::ZERO)
1544 .map(|sample| Poll::Ready(Some(sample)))
1545 .unwrap_or_else(|| {
1546 if app_sink.is_eos() {
1547 return Poll::Ready(None);
1548 }
1549
1550 waker.replace(context.waker().to_owned());
1551
1552 Poll::Pending
1553 })
1554 }
1555}
1556
#[cfg(test)]
mod tests {
    use futures_util::StreamExt;
    use gst::prelude::*;

    use super::*;

    /// Runs `videotestsrc num-buffers=5 ! appsink` and checks that the stream
    /// obtained from the sink yields exactly five samples before it ends.
    #[test]
    fn test_app_sink_stream() {
        gst::init().unwrap();

        // Source limited to five buffers, feeding a plain appsink.
        let source = gst::ElementFactory::make("videotestsrc")
            .property("num-buffers", 5)
            .build()
            .unwrap();
        let sink = gst::ElementFactory::make("appsink").build().unwrap();

        let pipeline = gst::Pipeline::new();
        pipeline.add(&source).unwrap();
        pipeline.add(&sink).unwrap();

        source.link(&sink).unwrap();

        // The stream is created before the pipeline starts playing.
        let stream = sink.dynamic_cast::<AppSink>().unwrap().stream();
        let collect_all = stream.collect::<Vec<gst::Sample>>();

        pipeline.set_state(gst::State::Playing).unwrap();
        let collected = futures_executor::block_on(collect_all);
        pipeline.set_state(gst::State::Null).unwrap();

        assert_eq!(collected.len(), 5);
    }
}