1use std::{
2 collections::HashMap,
3 mem,
4 sync::{atomic, Arc, Mutex, MutexGuard},
5};
6
7use gst::{glib, prelude::*};
8use std::sync::LazyLock;
9use thiserror::Error;
10
11#[derive(Debug)]
15struct WrappedAtomicU64 {
16 #[cfg(not(target_has_atomic = "64"))]
17 atomic: Mutex<u64>,
18 #[cfg(target_has_atomic = "64")]
19 atomic: atomic::AtomicU64,
20}
21
22#[cfg(target_has_atomic = "64")]
23impl WrappedAtomicU64 {
24 fn new(value: u64) -> WrappedAtomicU64 {
25 WrappedAtomicU64 {
26 atomic: atomic::AtomicU64::new(value),
27 }
28 }
29 fn fetch_add(&self, value: u64, order: atomic::Ordering) -> u64 {
30 self.atomic.fetch_add(value, order)
31 }
32 fn store(&self, value: u64, order: atomic::Ordering) {
33 self.atomic.store(value, order);
34 }
35
36 fn load(&self, order: atomic::Ordering) -> u64 {
37 self.atomic.load(order)
38 }
39}
40
41#[cfg(not(target_has_atomic = "64"))]
42impl WrappedAtomicU64 {
43 fn new(value: u64) -> WrappedAtomicU64 {
44 WrappedAtomicU64 {
45 atomic: Mutex::new(value),
46 }
47 }
48 fn fetch_add(&self, value: u64, _order: atomic::Ordering) -> u64 {
49 let mut guard = self.atomic.lock().unwrap();
50 let old = *guard;
51 *guard += value;
52 old
53 }
54 fn store(&self, value: u64, _order: atomic::Ordering) {
55 *self.atomic.lock().unwrap() = value;
56 }
57 fn load(&self, _order: atomic::Ordering) -> u64 {
58 *self.atomic.lock().unwrap()
59 }
60}
61
62static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
63 gst::DebugCategory::new(
64 "utilsrs-stream-producer",
65 gst::DebugColorFlags::empty(),
66 Some("gst_app Stream Producer interface"),
67 )
68});
69
70#[derive(Debug, Clone)]
76pub struct StreamProducer(Arc<StreamProducerInner>);
77
78impl PartialEq for StreamProducer {
79 fn eq(&self, other: &Self) -> bool {
80 self.0.appsink.eq(&other.0.appsink)
81 }
82}
83
84impl Eq for StreamProducer {}
85
86#[derive(Debug)]
87struct StreamProducerInner {
88 appsink: gst_app::AppSink,
90 appsink_probe_id: Option<gst::PadProbeId>,
92 consumers: Arc<Mutex<StreamConsumers>>,
94}
95
96impl Drop for StreamProducerInner {
97 fn drop(&mut self) {
98 if let Some(probe_id) = self.appsink_probe_id.take() {
99 let pad = self.appsink.static_pad("sink").unwrap();
100 pad.remove_probe(probe_id);
101 }
102
103 self.appsink
104 .set_callbacks(gst_app::AppSinkCallbacks::builder().build());
105 }
106}
107
108#[derive(Debug)]
111#[must_use]
112pub struct ConsumptionLink {
113 consumer: gst_app::AppSrc,
114 producer: Option<StreamProducer>,
115 dropped: Arc<WrappedAtomicU64>,
117 pushed: Arc<WrappedAtomicU64>,
119 discard: Arc<atomic::AtomicBool>,
121 wait_for_keyframe: Arc<atomic::AtomicBool>,
123}
124
125impl ConsumptionLink {
126 pub fn disconnected(consumer: gst_app::AppSrc) -> ConsumptionLink {
128 StreamProducer::configure_consumer(&consumer);
129
130 ConsumptionLink {
131 consumer,
132 producer: None,
133 dropped: Arc::new(WrappedAtomicU64::new(0)),
134 pushed: Arc::new(WrappedAtomicU64::new(0)),
135 discard: Arc::new(atomic::AtomicBool::new(false)),
136 wait_for_keyframe: Arc::new(atomic::AtomicBool::new(true)),
137 }
138 }
139
140 pub fn change_producer(
142 &mut self,
143 new_producer: &StreamProducer,
144 reset_stats: bool,
145 ) -> Result<(), AddConsumerError> {
146 self.disconnect();
147 if reset_stats {
148 self.dropped.store(0, atomic::Ordering::SeqCst);
149 self.pushed.store(0, atomic::Ordering::SeqCst);
150 }
151 new_producer.add_consumer_internal(
152 &self.consumer,
153 false,
154 self.dropped.clone(),
155 self.pushed.clone(),
156 self.discard.clone(),
157 self.wait_for_keyframe.clone(),
158 )?;
159 self.producer = Some(new_producer.clone());
160 Ok(())
161 }
162
163 pub fn disconnect(&mut self) {
165 if let Some(producer) = self.producer.take() {
166 producer.remove_consumer(&self.consumer);
167 }
168 }
169
170 pub fn dropped(&self) -> u64 {
172 self.dropped.load(atomic::Ordering::SeqCst)
173 }
174
175 pub fn pushed(&self) -> u64 {
177 self.pushed.load(atomic::Ordering::SeqCst)
178 }
179
180 pub fn discard(&self) -> bool {
182 self.discard.load(atomic::Ordering::SeqCst)
183 }
184
185 pub fn set_discard(&self, discard: bool) {
187 self.discard.store(discard, atomic::Ordering::SeqCst)
188 }
189
190 pub fn wait_for_keyframe(&self) -> bool {
192 self.wait_for_keyframe.load(atomic::Ordering::SeqCst)
193 }
194
195 pub fn set_wait_for_keyframe(&self, wait: bool) {
198 self.wait_for_keyframe.store(wait, atomic::Ordering::SeqCst)
199 }
200
201 pub fn appsrc(&self) -> &gst_app::AppSrc {
203 &self.consumer
204 }
205
206 pub fn stream_producer(&self) -> Option<&StreamProducer> {
208 self.producer.as_ref()
209 }
210}
211
212impl Drop for ConsumptionLink {
213 fn drop(&mut self) {
214 self.disconnect();
215 }
216}
217
218#[derive(Debug, Error)]
219pub enum AddConsumerError {
221 #[error("Consumer already added")]
222 AlreadyAdded,
224}
225
226impl StreamProducer {
227 pub fn configure_consumer(consumer: &gst_app::AppSrc) {
231 consumer.set_latency(gst::ClockTime::ZERO, gst::ClockTime::NONE);
234 consumer.set_format(gst::Format::Time);
235 consumer.set_is_live(true);
236 consumer.set_handle_segment_change(true);
237 consumer.set_max_buffers(0);
238 consumer.set_max_bytes(0);
239 consumer.set_max_time(500 * gst::ClockTime::MSECOND);
240 consumer.set_leaky_type(gst_app::AppLeakyType::Downstream);
241 consumer.set_automatic_eos(false);
242 }
243
244 pub fn add_consumer(
248 &self,
249 consumer: &gst_app::AppSrc,
250 ) -> Result<ConsumptionLink, AddConsumerError> {
251 let dropped = Arc::new(WrappedAtomicU64::new(0));
252 let pushed = Arc::new(WrappedAtomicU64::new(0));
253 let discard = Arc::new(atomic::AtomicBool::new(false));
254 let wait_for_keyframe = Arc::new(atomic::AtomicBool::new(true));
255
256 self.add_consumer_internal(
257 consumer,
258 true,
259 dropped.clone(),
260 pushed.clone(),
261 discard.clone(),
262 wait_for_keyframe.clone(),
263 )?;
264
265 Ok(ConsumptionLink {
266 consumer: consumer.clone(),
267 producer: Some(self.clone()),
268 dropped,
269 pushed,
270 discard,
271 wait_for_keyframe,
272 })
273 }
274
275 fn add_consumer_internal(
276 &self,
277 consumer: &gst_app::AppSrc,
278 configure_consumer: bool,
279 dropped: Arc<WrappedAtomicU64>,
280 pushed: Arc<WrappedAtomicU64>,
281 discard: Arc<atomic::AtomicBool>,
282 wait_for_keyframe: Arc<atomic::AtomicBool>,
283 ) -> Result<(), AddConsumerError> {
284 let mut consumers = self.0.consumers.lock().unwrap();
285 if consumers.consumers.contains_key(consumer) {
286 gst::error!(
287 CAT,
288 obj = &self.0.appsink,
289 "Consumer {} ({:?}) already added",
290 consumer.name(),
291 consumer
292 );
293 return Err(AddConsumerError::AlreadyAdded);
294 }
295
296 gst::debug!(
297 CAT,
298 obj = &self.0.appsink,
299 "Adding consumer {} ({:?})",
300 consumer.name(),
301 consumer
302 );
303
304 if configure_consumer {
305 Self::configure_consumer(consumer);
306 }
307
308 let srcpad = consumer.static_pad("src").unwrap();
310 let fku_probe_id = srcpad
311 .add_probe(
312 gst::PadProbeType::EVENT_UPSTREAM,
313 glib::clone!(
314 #[weak(rename_to = appsink)]
315 self.0.appsink,
316 #[upgrade_or_panic]
317 move |_pad, info| {
318 let Some(event) = info.event() else {
319 return gst::PadProbeReturn::Ok;
320 };
321
322 if gst_video::UpstreamForceKeyUnitEvent::parse(event).is_ok() {
323 gst::debug!(CAT, obj = &appsink, "Requesting keyframe");
324 let pad = appsink.static_pad("sink").unwrap();
326 let _ = pad.push_event(event.clone());
327 }
328
329 gst::PadProbeReturn::Ok
330 }
331 ),
332 )
333 .unwrap();
334
335 let stream_consumer = StreamConsumer::new(
336 consumer,
337 fku_probe_id,
338 dropped,
339 pushed,
340 discard,
341 wait_for_keyframe,
342 );
343
344 consumers
345 .consumers
346 .insert(consumer.clone(), stream_consumer);
347
348 let events_to_forward = consumers.events_to_forward.clone();
351 drop(consumers);
353
354 let appsink_pad = self.0.appsink.static_pad("sink").unwrap();
355 appsink_pad.sticky_events_foreach(|event| {
356 if events_to_forward.contains(&event.type_()) {
357 gst::debug!(
358 CAT,
359 obj = &self.0.appsink,
360 "forward sticky event {:?}",
361 event
362 );
363 consumer.send_event(event.clone());
364 }
365
366 std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
367 });
368
369 Ok(())
370 }
371
372 fn process_sample(
373 sample: gst::Sample,
374 appsink: &gst_app::AppSink,
375 mut consumers: MutexGuard<StreamConsumers>,
376 ) -> Result<gst::FlowSuccess, gst::FlowError> {
377 let (is_discont, is_keyframe) = if let Some(buf) = sample.buffer() {
378 let flags = buf.flags();
379
380 (
381 flags.contains(gst::BufferFlags::DISCONT),
382 !flags.contains(gst::BufferFlags::DELTA_UNIT),
383 )
384 } else {
385 (false, true)
386 };
387
388 gst::trace!(
389 CAT,
390 obj = appsink,
391 "processing sample {:?}",
392 sample.buffer()
393 );
394
395 let latency = consumers.current_latency;
396 let latency_updated = mem::replace(&mut consumers.latency_updated, false);
397
398 let mut needs_keyframe_request = false;
399
400 let current_consumers = consumers
401 .consumers
402 .values()
403 .filter_map(|consumer| {
404 if let Some(latency) = latency {
405 if consumer
406 .forwarded_latency
407 .compare_exchange(
408 false,
409 true,
410 atomic::Ordering::SeqCst,
411 atomic::Ordering::SeqCst,
412 )
413 .is_ok()
414 || latency_updated
415 {
416 gst::info!(CAT, obj = appsink, "setting new latency: {latency}");
417 consumer.appsrc.set_latency(latency, gst::ClockTime::NONE);
418 }
419 }
420
421 if consumer.discard.load(atomic::Ordering::SeqCst) {
422 consumer
423 .needs_keyframe
424 .store(true, atomic::Ordering::SeqCst);
425 return None;
426 }
427
428 if is_discont
429 && !is_keyframe
430 && consumer.wait_for_keyframe.load(atomic::Ordering::SeqCst)
431 {
432 consumer
434 .needs_keyframe
435 .store(true, atomic::Ordering::SeqCst);
436 }
437
438 if !is_keyframe && consumer.needs_keyframe.load(atomic::Ordering::SeqCst) {
439 if !needs_keyframe_request {
441 gst::debug!(CAT, obj = appsink, "Requesting keyframe for first buffer");
442 needs_keyframe_request = true;
443 }
444
445 consumer.dropped.fetch_add(1, atomic::Ordering::SeqCst);
446
447 gst::error!(
448 CAT,
449 obj = appsink,
450 "Ignoring frame for {} while waiting for a keyframe",
451 consumer.appsrc.name()
452 );
453 None
454 } else {
455 consumer
456 .needs_keyframe
457 .store(false, atomic::Ordering::SeqCst);
458 consumer.pushed.fetch_add(1, atomic::Ordering::SeqCst);
459
460 Some(consumer.appsrc.clone())
461 }
462 })
463 .collect::<Vec<_>>();
464
465 drop(consumers);
466
467 if needs_keyframe_request {
468 let pad = appsink.static_pad("sink").unwrap();
470 pad.push_event(
471 gst_video::UpstreamForceKeyUnitEvent::builder()
472 .all_headers(true)
473 .build(),
474 );
475 }
476
477 for consumer in current_consumers {
478 if let Err(err) = consumer.push_sample(&sample) {
479 gst::warning!(CAT, obj = appsink, "Failed to push sample: {}", err);
480 }
481 }
482 Ok(gst::FlowSuccess::Ok)
483 }
484
485 pub fn remove_consumer(&self, consumer: &gst_app::AppSrc) {
487 let name = consumer.name();
488 if self
489 .0
490 .consumers
491 .lock()
492 .unwrap()
493 .consumers
494 .remove(consumer)
495 .is_some()
496 {
497 gst::debug!(
498 CAT,
499 obj = &self.0.appsink,
500 "Removed consumer {} ({:?})",
501 name,
502 consumer
503 );
504 consumer.set_callbacks(gst_app::AppSrcCallbacks::builder().build());
505 } else {
506 gst::debug!(
507 CAT,
508 obj = &self.0.appsink,
509 "Consumer {} ({:?}) not found",
510 name,
511 consumer
512 );
513 }
514 }
515
516 pub fn set_forward_events(&self, events_to_forward: impl IntoIterator<Item = gst::EventType>) {
518 self.0.consumers.lock().unwrap().events_to_forward =
519 events_to_forward.into_iter().collect();
520 }
521
522 pub fn get_forwarded_events(&self) -> Vec<gst::EventType> {
524 self.0.consumers.lock().unwrap().events_to_forward.clone()
525 }
526
527 pub fn set_forward_preroll(&self, forward_preroll: bool) {
529 self.0.consumers.lock().unwrap().forward_preroll = forward_preroll;
530 }
531
532 pub fn appsink(&self) -> &gst_app::AppSink {
534 &self.0.appsink
535 }
536
537 pub fn error(&self, error: &gst::glib::Error, debug: Option<&str>) {
539 let consumers = self.0.consumers.lock().unwrap();
540
541 for consumer in consumers.consumers.keys() {
542 let mut msg_builder =
543 gst::message::Error::builder_from_error(error.clone()).src(consumer);
544 if let Some(debug) = debug {
545 msg_builder = msg_builder.debug(debug);
546 }
547
548 let _ = consumer.post_message(msg_builder.build());
549 }
550 }
551
552 pub fn last_sample(&self) -> Option<gst::Sample> {
554 self.0.appsink.property("last-sample")
555 }
556}
557
558impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
559 fn from(appsink: &'a gst_app::AppSink) -> Self {
560 let consumers = Arc::new(Mutex::new(StreamConsumers {
561 current_latency: None,
562 latency_updated: false,
563 consumers: HashMap::new(),
564 events_to_forward: vec![gst::EventType::Eos, gst::EventType::Gap],
567 forward_preroll: true,
568 just_forwarded_preroll: false,
569 }));
570
571 appsink.set_callbacks(
572 gst_app::AppSinkCallbacks::builder()
573 .new_sample(glib::clone!(
574 #[strong]
575 consumers,
576 move |appsink| {
577 let mut consumers = consumers.lock().unwrap();
578
579 let sample = match appsink.pull_sample() {
580 Ok(sample) => sample,
581 Err(_err) => {
582 gst::debug!(CAT, obj = appsink, "Failed to pull sample");
583 return Err(gst::FlowError::Flushing);
584 }
585 };
586
587 let just_forwarded_preroll =
588 mem::replace(&mut consumers.just_forwarded_preroll, false);
589
590 if just_forwarded_preroll {
591 return Ok(gst::FlowSuccess::Ok);
592 }
593
594 StreamProducer::process_sample(sample, appsink, consumers)
595 }
596 ))
597 .new_preroll(glib::clone!(
598 #[strong]
599 consumers,
600 move |appsink| {
601 let mut consumers = consumers.lock().unwrap();
602
603 let sample = match appsink.pull_preroll() {
604 Ok(sample) => sample,
605 Err(_err) => {
606 gst::debug!(CAT, obj = appsink, "Failed to pull preroll");
607 return Err(gst::FlowError::Flushing);
608 }
609 };
610
611 if consumers.forward_preroll {
612 consumers.just_forwarded_preroll = true;
613
614 StreamProducer::process_sample(sample, appsink, consumers)
615 } else {
616 Ok(gst::FlowSuccess::Ok)
617 }
618 }
619 ))
620 .new_event(glib::clone!(
621 #[strong]
622 consumers,
623 move |appsink| {
624 match appsink
625 .pull_object()
626 .map(|obj| obj.downcast::<gst::Event>())
627 {
628 Ok(Ok(event)) => {
629 let (events_to_forward, appsrcs) = {
630 let consumers = consumers.lock().unwrap();
632 let events = consumers.events_to_forward.clone();
633 let appsrcs =
634 consumers.consumers.keys().cloned().collect::<Vec<_>>();
635
636 (events, appsrcs)
637 };
638
639 if events_to_forward.contains(&event.type_()) {
640 for appsrc in appsrcs {
641 appsrc.send_event(event.clone());
642 }
643 }
644 }
645 Ok(Err(_)) => {} Err(_err) => gst::warning!(CAT, obj = appsink, "Failed to pull event"),
647 }
648
649 false
650 }
651 ))
652 .eos(glib::clone!(
653 #[strong]
654 consumers,
655 move |appsink| {
656 let stream_consumers = consumers.lock().unwrap();
657
658 if stream_consumers
659 .events_to_forward
660 .contains(&gst::EventType::Eos)
661 {
662 let current_consumers = stream_consumers
663 .consumers
664 .values()
665 .map(|c| c.appsrc.clone())
666 .collect::<Vec<_>>();
667 drop(stream_consumers);
668
669 for consumer in current_consumers {
670 gst::debug!(
671 CAT,
672 obj = appsink,
673 "set EOS on consumer {}",
674 consumer.name()
675 );
676 let _ = consumer.end_of_stream();
677 }
678 } else {
679 gst::debug!(CAT, obj = appsink, "don't forward EOS to consumers");
680 }
681 }
682 ))
683 .build(),
684 );
685
686 let sinkpad = appsink.static_pad("sink").unwrap();
687 let appsink_probe_id = sinkpad
688 .add_probe(
689 gst::PadProbeType::EVENT_UPSTREAM,
690 glib::clone!(
691 #[strong]
692 consumers,
693 move |_pad, info| {
694 let Some(event) = info.event() else {
695 return gst::PadProbeReturn::Ok;
696 };
697
698 let gst::EventView::Latency(event) = event.view() else {
699 return gst::PadProbeReturn::Ok;
700 };
701
702 let latency = event.latency();
703 let mut consumers = consumers.lock().unwrap();
704 consumers.current_latency = Some(latency);
705 consumers.latency_updated = true;
706
707 gst::PadProbeReturn::Ok
708 }
709 ),
710 )
711 .unwrap();
712
713 StreamProducer(Arc::new(StreamProducerInner {
714 appsink: appsink.clone(),
715 appsink_probe_id: Some(appsink_probe_id),
716 consumers,
717 }))
718 }
719}
720
721#[derive(Debug)]
724struct StreamConsumers {
725 current_latency: Option<gst::ClockTime>,
727 latency_updated: bool,
729 consumers: HashMap<gst_app::AppSrc, StreamConsumer>,
731 events_to_forward: Vec<gst::EventType>,
733 forward_preroll: bool,
735 just_forwarded_preroll: bool,
739}
740
741#[derive(Debug)]
743struct StreamConsumer {
744 appsrc: gst_app::AppSrc,
746 fku_probe_id: Option<gst::PadProbeId>,
748 forwarded_latency: atomic::AtomicBool,
750 needs_keyframe: Arc<atomic::AtomicBool>,
754 dropped: Arc<WrappedAtomicU64>,
756 pushed: Arc<WrappedAtomicU64>,
758 discard: Arc<atomic::AtomicBool>,
760 wait_for_keyframe: Arc<atomic::AtomicBool>,
762}
763
764impl StreamConsumer {
765 fn new(
767 appsrc: &gst_app::AppSrc,
768 fku_probe_id: gst::PadProbeId,
769 dropped: Arc<WrappedAtomicU64>,
770 pushed: Arc<WrappedAtomicU64>,
771 discard: Arc<atomic::AtomicBool>,
772 wait_for_keyframe: Arc<atomic::AtomicBool>,
773 ) -> Self {
774 let needs_keyframe = Arc::new(atomic::AtomicBool::new(
775 wait_for_keyframe.load(atomic::Ordering::SeqCst),
776 ));
777 let needs_keyframe_clone = needs_keyframe.clone();
778 let wait_for_keyframe_clone = wait_for_keyframe.clone();
779 let dropped_clone = dropped.clone();
780
781 appsrc.set_callbacks(
782 gst_app::AppSrcCallbacks::builder()
783 .enough_data(move |appsrc| {
784 gst::debug!(
785 CAT,
786 obj = appsrc,
787 "consumer {} ({:?}) is not consuming fast enough, old samples are getting dropped",
788 appsrc.name(),
789 appsrc,
790 );
791
792 needs_keyframe_clone.store(wait_for_keyframe_clone.load(atomic::Ordering::SeqCst), atomic::Ordering::SeqCst);
793 dropped_clone.fetch_add(1, atomic::Ordering::SeqCst);
794 })
795 .build(),
796 );
797
798 StreamConsumer {
799 appsrc: appsrc.clone(),
800 fku_probe_id: Some(fku_probe_id),
801 forwarded_latency: atomic::AtomicBool::new(false),
802 needs_keyframe,
803 dropped,
804 pushed,
805 discard,
806 wait_for_keyframe,
807 }
808 }
809}
810
811impl Drop for StreamConsumer {
812 fn drop(&mut self) {
813 if let Some(fku_probe_id) = self.fku_probe_id.take() {
814 let srcpad = self.appsrc.static_pad("src").unwrap();
815 srcpad.remove_probe(fku_probe_id);
816 }
817 }
818}
819
820impl PartialEq for StreamConsumer {
821 fn eq(&self, other: &Self) -> bool {
822 self.appsrc.eq(&other.appsrc)
823 }
824}
825
826impl Eq for StreamConsumer {}
827
828impl std::hash::Hash for StreamConsumer {
829 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
830 std::hash::Hash::hash(&self.appsrc, state);
831 }
832}
833
834impl std::borrow::Borrow<gst_app::AppSrc> for StreamConsumer {
835 #[inline]
836 fn borrow(&self) -> &gst_app::AppSrc {
837 &self.appsrc
838 }
839}
840
841#[cfg(test)]
842mod tests {
843 use std::{
844 str::FromStr,
845 sync::{Arc, Mutex},
846 };
847
848 use futures::{
849 channel::{mpsc, mpsc::Receiver},
850 SinkExt, StreamExt,
851 };
852 use gst::prelude::*;
853
854 use crate::{ConsumptionLink, StreamProducer};
855
856 fn create_producer() -> (
857 gst::Pipeline,
858 gst_app::AppSrc,
859 gst_app::AppSink,
860 StreamProducer,
861 ) {
862 let producer_pipe =
863 gst::parse::launch("appsrc name=producer_src ! appsink name=producer_sink")
864 .unwrap()
865 .downcast::<gst::Pipeline>()
866 .unwrap();
867 let producer_sink = producer_pipe
868 .by_name("producer_sink")
869 .unwrap()
870 .downcast::<gst_app::AppSink>()
871 .unwrap();
872
873 (
874 producer_pipe.clone(),
875 producer_pipe
876 .by_name("producer_src")
877 .unwrap()
878 .downcast::<gst_app::AppSrc>()
879 .unwrap(),
880 producer_sink.clone(),
881 StreamProducer::from(&producer_sink),
882 )
883 }
884
885 struct Consumer {
886 pipeline: gst::Pipeline,
887 src: gst_app::AppSrc,
888 sink: gst_app::AppSink,
889 receiver: Mutex<Receiver<gst::Sample>>,
890 connected: Mutex<bool>,
891 }
892
893 impl Consumer {
894 fn new(id: &str) -> Self {
895 let pipeline = gst::parse::launch(&format!("appsrc name={id} ! appsink name=sink"))
896 .unwrap()
897 .downcast::<gst::Pipeline>()
898 .unwrap();
899
900 let (sender, receiver) = mpsc::channel::<gst::Sample>(1000);
901 let sender = Arc::new(Mutex::new(sender));
902 let sink = pipeline
903 .by_name("sink")
904 .unwrap()
905 .downcast::<gst_app::AppSink>()
906 .unwrap();
907
908 sink.set_callbacks(
909 gst_app::AppSinkCallbacks::builder()
910 .new_sample(move |appsink| {
912 let sender_clone = sender.clone();
914 futures::executor::block_on(
915 sender_clone
916 .lock()
917 .unwrap()
918 .send(appsink.pull_sample().unwrap()),
919 )
920 .unwrap();
921
922 Ok(gst::FlowSuccess::Ok)
923 })
924 .build(),
925 );
926
927 Self {
928 pipeline: pipeline.clone(),
929 src: pipeline
930 .by_name(id)
931 .unwrap()
932 .downcast::<gst_app::AppSrc>()
933 .unwrap(),
934 sink,
935 receiver: Mutex::new(receiver),
936 connected: Mutex::new(false),
937 }
938 }
939
940 fn connect(&self, producer: &StreamProducer) -> ConsumptionLink {
941 {
942 let mut connected = self.connected.lock().unwrap();
943 *connected = true;
944 }
945
946 producer.add_consumer(&self.src).unwrap()
947 }
948
949 fn disconnect(&self, producer: &StreamProducer) {
950 {
951 let mut connected = self.connected.lock().unwrap();
952 *connected = false;
953 }
954
955 producer.remove_consumer(&self.src);
956 }
957 }
958
959 #[test]
960 fn simple() {
961 gst::init().unwrap();
962
963 let (producer_pipe, producer_src, _producer_sink, producer) = create_producer();
964 producer_pipe
965 .set_state(gst::State::Playing)
966 .expect("Couldn't set producer pipeline state");
967
968 let mut consumers: Vec<Consumer> = Vec::new();
969 let consumer = Consumer::new("consumer1");
970 let link1 = consumer.connect(&producer);
971 consumer
972 .pipeline
973 .set_state(gst::State::Playing)
974 .expect("Couldn't set producer pipeline state");
975 consumers.push(consumer);
976
977 let consumer = Consumer::new("consumer2");
978 let link2 = consumer.connect(&producer);
979 consumer
980 .pipeline
981 .set_state(gst::State::Playing)
982 .expect("Couldn't set producer pipeline state");
983 consumers.push(consumer);
984
985 assert!(producer.last_sample().is_none());
986
987 for i in 0..10 {
988 let caps = gst::Caps::from_str(&format!("test,n={i}")).unwrap();
989 producer_src.set_caps(Some(&caps));
990 producer_src.push_buffer(gst::Buffer::new()).unwrap();
991
992 for consumer in &consumers {
993 if *consumer.connected.lock().unwrap() {
994 let sample =
995 futures::executor::block_on(consumer.receiver.lock().unwrap().next())
996 .expect("Received an empty buffer?");
997 sample.buffer().expect("No buffer on the sample?");
998 assert_eq!(sample.caps(), Some(caps.as_ref()));
999 } else {
1000 debug_assert!(
1001 consumer
1002 .sink
1003 .try_pull_sample(gst::ClockTime::from_nseconds(0))
1004 .is_none(),
1005 "Disconnected consumer got a new sample?!"
1006 );
1007 }
1008 }
1009
1010 if i == 5 {
1011 consumers.first().unwrap().disconnect(&producer);
1012 }
1013 }
1014
1015 assert!(producer.last_sample().is_some());
1016
1017 assert_eq!(link1.pushed(), 6);
1018 assert_eq!(link1.dropped(), 0);
1019 assert_eq!(link2.pushed(), 10);
1020 assert_eq!(link2.dropped(), 0);
1021 }
1022}