1use std::{
2 collections::HashMap,
3 mem,
4 sync::{atomic, Arc, Mutex, MutexGuard},
5};
6
7use gst::{glib, prelude::*};
8use once_cell::sync::Lazy;
9use thiserror::Error;
10
/// Small 64-bit counter that is backed by a real `AtomicU64` on targets with
/// 64-bit atomics and by a `Mutex<u64>` everywhere else.
///
/// Only the operations this file needs are provided. Both variants expose the
/// same signatures; the mutex variant ignores the requested memory ordering.
#[derive(Debug)]
struct WrappedAtomicU64 {
    #[cfg(not(target_has_atomic = "64"))]
    atomic: Mutex<u64>,
    #[cfg(target_has_atomic = "64")]
    atomic: atomic::AtomicU64,
}

#[cfg(target_has_atomic = "64")]
impl WrappedAtomicU64 {
    /// Creates a counter initialised to `value`.
    fn new(value: u64) -> WrappedAtomicU64 {
        WrappedAtomicU64 {
            atomic: atomic::AtomicU64::new(value),
        }
    }

    /// Adds `value` to the counter and returns the previous value.
    fn fetch_add(&self, value: u64, order: atomic::Ordering) -> u64 {
        self.atomic.fetch_add(value, order)
    }

    /// Overwrites the counter with `value`.
    fn store(&self, value: u64, order: atomic::Ordering) {
        self.atomic.store(value, order);
    }

    /// Returns the current value of the counter.
    fn load(&self, order: atomic::Ordering) -> u64 {
        self.atomic.load(order)
    }
}

#[cfg(not(target_has_atomic = "64"))]
impl WrappedAtomicU64 {
    /// Creates a counter initialised to `value`.
    fn new(value: u64) -> WrappedAtomicU64 {
        WrappedAtomicU64 {
            atomic: Mutex::new(value),
        }
    }

    /// Adds `value` to the counter and returns the previous value.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    fn fetch_add(&self, value: u64, _order: atomic::Ordering) -> u64 {
        let mut guard = self.atomic.lock().unwrap();
        let old = *guard;
        // `AtomicU64::fetch_add` wraps around on overflow; a plain `+=` would
        // panic in debug builds, so wrap explicitly to keep both variants in
        // agreement.
        *guard = old.wrapping_add(value);
        old
    }

    /// Overwrites the counter with `value`.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    fn store(&self, value: u64, _order: atomic::Ordering) {
        *self.atomic.lock().unwrap() = value;
    }

    /// Returns the current value of the counter.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    fn load(&self, _order: atomic::Ordering) -> u64 {
        *self.atomic.lock().unwrap()
    }
}
61
/// Debug category used by every log statement in this file; created on first use.
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
    gst::DebugCategory::new(
        "utilsrs-stream-producer",
        gst::DebugColorFlags::empty(),
        Some("gst_app Stream Producer interface"),
    )
});
69
/// Handle that dispatches the data received by one `appsink` to any number of
/// consumer `appsrc` elements.
///
/// The handle is reference counted: cloning it yields another handle to the
/// same shared state (`StreamProducerInner`).
#[derive(Debug, Clone)]
pub struct StreamProducer(Arc<StreamProducerInner>);
77
78impl PartialEq for StreamProducer {
79 fn eq(&self, other: &Self) -> bool {
80 self.0.appsink.eq(&other.0.appsink)
81 }
82}
83
// Marker impl: the `PartialEq` comparison of `StreamProducer` is declared to be
// a full equivalence relation.
impl Eq for StreamProducer {}
85
/// Shared state behind every clone of a `StreamProducer`.
#[derive(Debug)]
struct StreamProducerInner {
    /// The appsink whose samples and events are dispatched to the consumers.
    appsink: gst_app::AppSink,
    /// Id of the probe installed on the appsink's sink pad; taken and removed
    /// again when this value is dropped.
    appsink_probe_id: Option<gst::PadProbeId>,
    /// Consumer bookkeeping, shared with the callbacks installed on the appsink.
    consumers: Arc<Mutex<StreamConsumers>>,
}
95
96impl Drop for StreamProducerInner {
97 fn drop(&mut self) {
98 if let Some(probe_id) = self.appsink_probe_id.take() {
99 let pad = self.appsink.static_pad("sink").unwrap();
100 pad.remove_probe(probe_id);
101 }
102
103 self.appsink
104 .set_callbacks(gst_app::AppSinkCallbacks::builder().build());
105 }
106}
107
/// Link between a consumer `appsrc` and (optionally) the `StreamProducer` that
/// feeds it.
///
/// Dropping the link disconnects the consumer from its producer, hence the
/// `#[must_use]`.
#[derive(Debug)]
#[must_use]
pub struct ConsumptionLink {
    /// The consumer element this link belongs to.
    consumer: gst_app::AppSrc,
    /// The producer currently feeding `consumer`, or `None` while disconnected.
    producer: Option<StreamProducer>,
    /// Counter of samples dropped for this consumer (shared with the producer side).
    dropped: Arc<WrappedAtomicU64>,
    /// Counter of samples pushed to this consumer (shared with the producer side).
    pushed: Arc<WrappedAtomicU64>,
    /// While `true`, the producer does not forward samples to this consumer.
    discard: Arc<atomic::AtomicBool>,
    /// Whether the consumer should wait for a keyframe before receiving
    /// samples (again) after a discontinuity or after samples were dropped.
    wait_for_keyframe: Arc<atomic::AtomicBool>,
}
124
125impl ConsumptionLink {
126 pub fn disconnected(consumer: gst_app::AppSrc) -> ConsumptionLink {
128 ConsumptionLink {
129 consumer,
130 producer: None,
131 dropped: Arc::new(WrappedAtomicU64::new(0)),
132 pushed: Arc::new(WrappedAtomicU64::new(0)),
133 discard: Arc::new(atomic::AtomicBool::new(false)),
134 wait_for_keyframe: Arc::new(atomic::AtomicBool::new(true)),
135 }
136 }
137
138 pub fn change_producer(
140 &mut self,
141 new_producer: &StreamProducer,
142 reset_stats: bool,
143 ) -> Result<(), AddConsumerError> {
144 self.disconnect();
145 if reset_stats {
146 self.dropped.store(0, atomic::Ordering::SeqCst);
147 self.pushed.store(0, atomic::Ordering::SeqCst);
148 }
149 new_producer.add_consumer_internal(
150 &self.consumer,
151 self.dropped.clone(),
152 self.pushed.clone(),
153 self.discard.clone(),
154 self.wait_for_keyframe.clone(),
155 )?;
156 self.producer = Some(new_producer.clone());
157 Ok(())
158 }
159
160 pub fn disconnect(&mut self) {
162 if let Some(producer) = self.producer.take() {
163 producer.remove_consumer(&self.consumer);
164 }
165 }
166
167 pub fn dropped(&self) -> u64 {
169 self.dropped.load(atomic::Ordering::SeqCst)
170 }
171
172 pub fn pushed(&self) -> u64 {
174 self.pushed.load(atomic::Ordering::SeqCst)
175 }
176
177 pub fn discard(&self) -> bool {
179 self.discard.load(atomic::Ordering::SeqCst)
180 }
181
182 pub fn set_discard(&self, discard: bool) {
184 self.discard.store(discard, atomic::Ordering::SeqCst)
185 }
186
187 pub fn wait_for_keyframe(&self) -> bool {
189 self.wait_for_keyframe.load(atomic::Ordering::SeqCst)
190 }
191
192 pub fn set_wait_for_keyframe(&self, wait: bool) {
195 self.wait_for_keyframe.store(wait, atomic::Ordering::SeqCst)
196 }
197
198 pub fn appsrc(&self) -> &gst_app::AppSrc {
200 &self.consumer
201 }
202
203 pub fn stream_producer(&self) -> Option<&StreamProducer> {
205 self.producer.as_ref()
206 }
207}
208
impl Drop for ConsumptionLink {
    /// Disconnects the consumer from its producer (if any) when the link goes
    /// out of scope.
    fn drop(&mut self) {
        self.disconnect();
    }
}
214
/// Error returned when a consumer cannot be added to a `StreamProducer`.
#[derive(Debug, Error)]
pub enum AddConsumerError {
    /// The consumer is already registered with the producer.
    #[error("Consumer already added")]
    AlreadyAdded,
}
222
223impl StreamProducer {
224 pub fn configure_consumer(consumer: &gst_app::AppSrc) {
228 consumer.set_latency(gst::ClockTime::ZERO, gst::ClockTime::NONE);
231 consumer.set_format(gst::Format::Time);
232 consumer.set_is_live(true);
233 consumer.set_handle_segment_change(true);
234 consumer.set_max_buffers(0);
235 consumer.set_max_bytes(0);
236 consumer.set_max_time(500 * gst::ClockTime::MSECOND);
237 consumer.set_leaky_type(gst_app::AppLeakyType::Downstream);
238 consumer.set_automatic_eos(false);
239 }
240
241 pub fn add_consumer(
245 &self,
246 consumer: &gst_app::AppSrc,
247 ) -> Result<ConsumptionLink, AddConsumerError> {
248 let dropped = Arc::new(WrappedAtomicU64::new(0));
249 let pushed = Arc::new(WrappedAtomicU64::new(0));
250 let discard = Arc::new(atomic::AtomicBool::new(false));
251 let wait_for_keyframe = Arc::new(atomic::AtomicBool::new(true));
252
253 self.add_consumer_internal(
254 consumer,
255 dropped.clone(),
256 pushed.clone(),
257 discard.clone(),
258 wait_for_keyframe.clone(),
259 )?;
260
261 Ok(ConsumptionLink {
262 consumer: consumer.clone(),
263 producer: Some(self.clone()),
264 dropped,
265 pushed,
266 discard,
267 wait_for_keyframe,
268 })
269 }
270
/// Registers `consumer` with this producer, wiring it to the given shared
/// counters and flags.
///
/// Steps, in order: reject duplicates, configure the `appsrc`, install a probe
/// on its source pad that relays force-key-unit requests to the appsink,
/// store the consumer, release the lock, and finally replay the appsink's
/// sticky events whose type is listed in `events_to_forward`.
///
/// # Errors
///
/// Returns `AddConsumerError::AlreadyAdded` when `consumer` is already
/// registered.
///
/// # Panics
///
/// Panics if the consumers mutex is poisoned, if the expected static pads are
/// missing, or if the probe cannot be added.
fn add_consumer_internal(
    &self,
    consumer: &gst_app::AppSrc,
    dropped: Arc<WrappedAtomicU64>,
    pushed: Arc<WrappedAtomicU64>,
    discard: Arc<atomic::AtomicBool>,
    wait_for_keyframe: Arc<atomic::AtomicBool>,
) -> Result<(), AddConsumerError> {
    let mut consumers = self.0.consumers.lock().unwrap();
    if consumers.consumers.contains_key(consumer) {
        gst::error!(
            CAT,
            obj = &self.0.appsink,
            "Consumer {} ({:?}) already added",
            consumer.name(),
            consumer
        );
        return Err(AddConsumerError::AlreadyAdded);
    }

    gst::debug!(
        CAT,
        obj = &self.0.appsink,
        "Adding consumer {} ({:?})",
        consumer.name(),
        consumer
    );

    Self::configure_consumer(consumer);

    // Relay upstream force-key-unit events seen on the consumer's source pad
    // to the producer's appsink. The probe only holds a weak reference to the
    // appsink and panics if it can no longer be upgraded.
    let srcpad = consumer.static_pad("src").unwrap();
    let fku_probe_id = srcpad
        .add_probe(
            gst::PadProbeType::EVENT_UPSTREAM,
            glib::clone!(
                #[weak(rename_to = appsink)]
                self.0.appsink,
                #[upgrade_or_panic]
                move |_pad, info| {
                    let Some(event) = info.event() else {
                        return gst::PadProbeReturn::Ok;
                    };

                    if gst_video::UpstreamForceKeyUnitEvent::parse(event).is_ok() {
                        gst::debug!(CAT, obj = &appsink, "Requesting keyframe");
                        // The event is pushed through the appsink's sink pad;
                        // the result is deliberately ignored.
                        let pad = appsink.static_pad("sink").unwrap();
                        let _ = pad.push_event(event.clone());
                    }

                    gst::PadProbeReturn::Ok
                }
            ),
        )
        .unwrap();

    let stream_consumer = StreamConsumer::new(
        consumer,
        fku_probe_id,
        dropped,
        pushed,
        discard,
        wait_for_keyframe,
    );

    consumers
        .consumers
        .insert(consumer.clone(), stream_consumer);

    // Copy the filter and release the lock before sending events to the
    // consumer below.
    let events_to_forward = consumers.events_to_forward.clone();
    drop(consumers);

    // Replay the matching sticky events currently stored on the appsink's
    // sink pad to the new consumer; every sticky event is kept on the pad.
    let appsink_pad = self.0.appsink.static_pad("sink").unwrap();
    appsink_pad.sticky_events_foreach(|event| {
        if events_to_forward.contains(&event.type_()) {
            gst::debug!(
                CAT,
                obj = &self.0.appsink,
                "forward sticky event {:?}",
                event
            );
            consumer.send_event(event.clone());
        }

        std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
    });

    Ok(())
}
364
/// Dispatches one `sample` pulled from `appsink` to every consumer that
/// should receive it.
///
/// The already-locked consumer state is taken by value; the guard is dropped
/// before anything is pushed upstream or downstream.
///
/// For every registered consumer this function:
/// - applies the known latency, either the first time for that consumer or
///   whenever the latency was updated since the previous sample;
/// - skips the consumer while its `discard` flag is set;
/// - while the consumer needs a keyframe, drops non-keyframe samples (counted
///   in `dropped`) and arranges for a keyframe request to be sent upstream;
/// - otherwise counts the sample in `pushed` and selects the consumer for
///   delivery.
///
/// Always returns `Ok(gst::FlowSuccess::Ok)`; a failed push to a consumer is
/// only logged.
fn process_sample(
    sample: gst::Sample,
    appsink: &gst_app::AppSink,
    mut consumers: MutexGuard<StreamConsumers>,
) -> Result<gst::FlowSuccess, gst::FlowError> {
    // A sample without a buffer is treated as a non-discont keyframe.
    let (is_discont, is_keyframe) = if let Some(buf) = sample.buffer() {
        let flags = buf.flags();

        (
            flags.contains(gst::BufferFlags::DISCONT),
            !flags.contains(gst::BufferFlags::DELTA_UNIT),
        )
    } else {
        (false, true)
    };

    gst::trace!(
        CAT,
        obj = appsink,
        "processing sample {:?}",
        sample.buffer()
    );

    let latency = consumers.current_latency;
    // Read and clear the "latency changed" flag in one step.
    let latency_updated = mem::replace(&mut consumers.latency_updated, false);

    // Set at most once per sample, no matter how many consumers need a keyframe.
    let mut needs_keyframe_request = false;

    let current_consumers = consumers
        .consumers
        .values()
        .filter_map(|consumer| {
            if let Some(latency) = latency {
                // The compare-exchange succeeds only the first time for a
                // given consumer (it flips `forwarded_latency` to `true`).
                if consumer
                    .forwarded_latency
                    .compare_exchange(
                        false,
                        true,
                        atomic::Ordering::SeqCst,
                        atomic::Ordering::SeqCst,
                    )
                    .is_ok()
                    || latency_updated
                {
                    gst::info!(CAT, obj = appsink, "setting new latency: {latency}");
                    consumer.appsrc.set_latency(latency, gst::ClockTime::NONE);
                }
            }

            // Discarding consumers receive nothing and are not counted.
            // NOTE(review): this marks the consumer as needing a keyframe
            // without consulting `wait_for_keyframe`, unlike the other places
            // in this file that set `needs_keyframe` — confirm this is intended.
            if consumer.discard.load(atomic::Ordering::SeqCst) {
                consumer
                    .needs_keyframe
                    .store(true, atomic::Ordering::SeqCst);
                return None;
            }

            // A discontinuity on a delta unit requires a fresh keyframe, if
            // the consumer asked to wait for keyframes.
            if is_discont
                && !is_keyframe
                && consumer.wait_for_keyframe.load(atomic::Ordering::SeqCst)
            {
                consumer
                    .needs_keyframe
                    .store(true, atomic::Ordering::SeqCst);
            }

            if !is_keyframe && consumer.needs_keyframe.load(atomic::Ordering::SeqCst) {
                // This sample is not a keyframe but the consumer needs one:
                // drop it for this consumer and request a keyframe upstream.
                if !needs_keyframe_request {
                    gst::debug!(CAT, obj = appsink, "Requesting keyframe for first buffer");
                    needs_keyframe_request = true;
                }

                consumer.dropped.fetch_add(1, atomic::Ordering::SeqCst);

                gst::error!(
                    CAT,
                    obj = appsink,
                    "Ignoring frame for {} while waiting for a keyframe",
                    consumer.appsrc.name()
                );
                None
            } else {
                // Either a keyframe or no keyframe is required: deliver.
                consumer
                    .needs_keyframe
                    .store(false, atomic::Ordering::SeqCst);
                consumer.pushed.fetch_add(1, atomic::Ordering::SeqCst);

                Some(consumer.appsrc.clone())
            }
        })
        .collect::<Vec<_>>();

    // Release the lock before pushing the event and the sample.
    drop(consumers);

    if needs_keyframe_request {
        // The request is pushed through the appsink's sink pad; the result of
        // the push is not checked.
        let pad = appsink.static_pad("sink").unwrap();
        pad.push_event(
            gst_video::UpstreamForceKeyUnitEvent::builder()
                .all_headers(true)
                .build(),
        );
    }

    for consumer in current_consumers {
        if let Err(err) = consumer.push_sample(&sample) {
            gst::warning!(CAT, obj = appsink, "Failed to push sample: {}", err);
        }
    }
    Ok(gst::FlowSuccess::Ok)
}
477
478 pub fn remove_consumer(&self, consumer: &gst_app::AppSrc) {
480 let name = consumer.name();
481 if self
482 .0
483 .consumers
484 .lock()
485 .unwrap()
486 .consumers
487 .remove(consumer)
488 .is_some()
489 {
490 gst::debug!(
491 CAT,
492 obj = &self.0.appsink,
493 "Removed consumer {} ({:?})",
494 name,
495 consumer
496 );
497 consumer.set_callbacks(gst_app::AppSrcCallbacks::builder().build());
498 } else {
499 gst::debug!(
500 CAT,
501 obj = &self.0.appsink,
502 "Consumer {} ({:?}) not found",
503 name,
504 consumer
505 );
506 }
507 }
508
509 pub fn set_forward_events(&self, events_to_forward: impl IntoIterator<Item = gst::EventType>) {
511 self.0.consumers.lock().unwrap().events_to_forward =
512 events_to_forward.into_iter().collect();
513 }
514
515 pub fn set_forward_preroll(&self, forward_preroll: bool) {
517 self.0.consumers.lock().unwrap().forward_preroll = forward_preroll;
518 }
519
/// Returns the `appsink` this producer dispatches data for.
pub fn appsink(&self) -> &gst_app::AppSink {
    &self.0.appsink
}
524
525 pub fn error(&self, error: &gst::glib::Error, debug: Option<&str>) {
527 let consumers = self.0.consumers.lock().unwrap();
528
529 for consumer in consumers.consumers.keys() {
530 let mut msg_builder =
531 gst::message::Error::builder_from_error(error.clone()).src(consumer);
532 if let Some(debug) = debug {
533 msg_builder = msg_builder.debug(debug);
534 }
535
536 let _ = consumer.post_message(msg_builder.build());
537 }
538 }
539
540 pub fn last_sample(&self) -> Option<gst::Sample> {
542 self.0.appsink.property("last-sample")
543 }
544}
545
impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
    /// Turns `appsink` into a producer.
    ///
    /// This installs callbacks on the appsink (new sample, new preroll, new
    /// event, EOS) that dispatch to the registered consumers, and a probe on
    /// its sink pad that records the latency carried by upstream latency
    /// events. Initially no consumer is registered, EOS and gap events are
    /// forwarded, and preroll samples are forwarded.
    ///
    /// # Panics
    ///
    /// Panics if the appsink has no `sink` pad or the probe cannot be added.
    fn from(appsink: &'a gst_app::AppSink) -> Self {
        let consumers = Arc::new(Mutex::new(StreamConsumers {
            current_latency: None,
            latency_updated: false,
            consumers: HashMap::new(),
            events_to_forward: vec![gst::EventType::Eos, gst::EventType::Gap],
            forward_preroll: true,
            just_forwarded_preroll: false,
        }));

        appsink.set_callbacks(
            gst_app::AppSinkCallbacks::builder()
                .new_sample(glib::clone!(
                    #[strong]
                    consumers,
                    move |appsink| {
                        // The lock is taken before pulling and handed over to
                        // `process_sample()`, which releases it.
                        let mut consumers = consumers.lock().unwrap();

                        let sample = match appsink.pull_sample() {
                            Ok(sample) => sample,
                            Err(_err) => {
                                gst::debug!(CAT, obj = appsink, "Failed to pull sample");
                                return Err(gst::FlowError::Flushing);
                            }
                        };

                        // Read and clear the flag set by the preroll callback.
                        let just_forwarded_preroll =
                            mem::replace(&mut consumers.just_forwarded_preroll, false);

                        // The sample following a forwarded preroll is skipped
                        // (presumably because it is the same buffer that was
                        // already forwarded as preroll — confirm).
                        if just_forwarded_preroll {
                            return Ok(gst::FlowSuccess::Ok);
                        }

                        StreamProducer::process_sample(sample, appsink, consumers)
                    }
                ))
                .new_preroll(glib::clone!(
                    #[strong]
                    consumers,
                    move |appsink| {
                        let mut consumers = consumers.lock().unwrap();

                        let sample = match appsink.pull_preroll() {
                            Ok(sample) => sample,
                            Err(_err) => {
                                gst::debug!(CAT, obj = appsink, "Failed to pull preroll");
                                return Err(gst::FlowError::Flushing);
                            }
                        };

                        if consumers.forward_preroll {
                            // Remember that the preroll was forwarded so the
                            // next `new_sample` invocation can skip its sample.
                            consumers.just_forwarded_preroll = true;

                            StreamProducer::process_sample(sample, appsink, consumers)
                        } else {
                            Ok(gst::FlowSuccess::Ok)
                        }
                    }
                ))
                .new_event(glib::clone!(
                    #[strong]
                    consumers,
                    move |appsink| {
                        match appsink
                            .pull_object()
                            .map(|obj| obj.downcast::<gst::Event>())
                        {
                            Ok(Ok(event)) => {
                                // Snapshot the filter and the consumers, then
                                // release the lock before sending anything.
                                let (events_to_forward, appsrcs) = {
                                    let consumers = consumers.lock().unwrap();
                                    let events = consumers.events_to_forward.clone();
                                    let appsrcs =
                                        consumers.consumers.keys().cloned().collect::<Vec<_>>();

                                    (events, appsrcs)
                                };

                                if events_to_forward.contains(&event.type_()) {
                                    for appsrc in appsrcs {
                                        appsrc.send_event(event.clone());
                                    }
                                }
                            }
                            // The pulled object is not an event: nothing to do.
                            Ok(Err(_)) => {}
                            Err(_err) => gst::warning!(CAT, obj = appsink, "Failed to pull event"),
                        }

                        // Always reports `false` to the appsink.
                        false
                    }
                ))
                .eos(glib::clone!(
                    #[strong]
                    consumers,
                    move |appsink| {
                        let stream_consumers = consumers.lock().unwrap();

                        if stream_consumers
                            .events_to_forward
                            .contains(&gst::EventType::Eos)
                        {
                            // Snapshot the consumers and release the lock
                            // before signalling end-of-stream on them.
                            let current_consumers = stream_consumers
                                .consumers
                                .values()
                                .map(|c| c.appsrc.clone())
                                .collect::<Vec<_>>();
                            drop(stream_consumers);

                            for consumer in current_consumers {
                                gst::debug!(
                                    CAT,
                                    obj = appsink,
                                    "set EOS on consumer {}",
                                    consumer.name()
                                );
                                let _ = consumer.end_of_stream();
                            }
                        } else {
                            gst::debug!(CAT, obj = appsink, "don't forward EOS to consumers");
                        }
                    }
                ))
                .build(),
        );

        // Watch upstream events on the appsink's sink pad and remember the
        // latency of every latency event; `process_sample()` applies it to the
        // consumers. All events are let through unchanged.
        let sinkpad = appsink.static_pad("sink").unwrap();
        let appsink_probe_id = sinkpad
            .add_probe(
                gst::PadProbeType::EVENT_UPSTREAM,
                glib::clone!(
                    #[strong]
                    consumers,
                    move |_pad, info| {
                        let Some(event) = info.event() else {
                            return gst::PadProbeReturn::Ok;
                        };

                        let gst::EventView::Latency(event) = event.view() else {
                            return gst::PadProbeReturn::Ok;
                        };

                        let latency = event.latency();
                        let mut consumers = consumers.lock().unwrap();
                        consumers.current_latency = Some(latency);
                        consumers.latency_updated = true;

                        gst::PadProbeReturn::Ok
                    }
                ),
            )
            .unwrap();

        StreamProducer(Arc::new(StreamProducerInner {
            appsink: appsink.clone(),
            appsink_probe_id: Some(appsink_probe_id),
            consumers,
        }))
    }
}
708
/// Mutable state shared between a `StreamProducer` and the callbacks and probe
/// it installs on its appsink.
#[derive(Debug)]
struct StreamConsumers {
    /// Latency carried by the most recent latency event seen on the appsink's
    /// sink pad, if any.
    current_latency: Option<gst::ClockTime>,
    /// Set when `current_latency` is written; cleared by `process_sample()`.
    latency_updated: bool,
    /// The registered consumers, keyed by their `appsrc`.
    consumers: HashMap<gst_app::AppSrc, StreamConsumer>,
    /// Types of the events that are forwarded to the consumers.
    events_to_forward: Vec<gst::EventType>,
    /// Whether preroll samples are forwarded to the consumers.
    forward_preroll: bool,
    /// Set after a preroll sample was forwarded; the next `new_sample`
    /// callback clears it and skips its sample.
    just_forwarded_preroll: bool,
}
728
/// Producer-side bookkeeping for one registered consumer.
#[derive(Debug)]
struct StreamConsumer {
    /// The consumer element samples are pushed to.
    appsrc: gst_app::AppSrc,
    /// Id of the probe installed on the appsrc's source pad; taken and removed
    /// again when this value is dropped.
    fku_probe_id: Option<gst::PadProbeId>,
    /// Whether a latency has already been applied to this consumer.
    forwarded_latency: atomic::AtomicBool,
    /// Whether non-keyframe samples are currently withheld from this consumer
    /// until a keyframe arrives.
    needs_keyframe: Arc<atomic::AtomicBool>,
    /// Counter of samples dropped for this consumer (shared with its link).
    dropped: Arc<WrappedAtomicU64>,
    /// Counter of samples pushed to this consumer (shared with its link).
    pushed: Arc<WrappedAtomicU64>,
    /// While `true`, no samples are forwarded to this consumer.
    discard: Arc<atomic::AtomicBool>,
    /// Whether the consumer wants to wait for a keyframe after a discontinuity
    /// or after samples were dropped.
    wait_for_keyframe: Arc<atomic::AtomicBool>,
}
751
752impl StreamConsumer {
753 fn new(
755 appsrc: &gst_app::AppSrc,
756 fku_probe_id: gst::PadProbeId,
757 dropped: Arc<WrappedAtomicU64>,
758 pushed: Arc<WrappedAtomicU64>,
759 discard: Arc<atomic::AtomicBool>,
760 wait_for_keyframe: Arc<atomic::AtomicBool>,
761 ) -> Self {
762 let needs_keyframe = Arc::new(atomic::AtomicBool::new(
763 wait_for_keyframe.load(atomic::Ordering::SeqCst),
764 ));
765 let needs_keyframe_clone = needs_keyframe.clone();
766 let wait_for_keyframe_clone = wait_for_keyframe.clone();
767 let dropped_clone = dropped.clone();
768
769 appsrc.set_callbacks(
770 gst_app::AppSrcCallbacks::builder()
771 .enough_data(move |appsrc| {
772 gst::debug!(
773 CAT,
774 obj = appsrc,
775 "consumer {} ({:?}) is not consuming fast enough, old samples are getting dropped",
776 appsrc.name(),
777 appsrc,
778 );
779
780 needs_keyframe_clone.store(wait_for_keyframe_clone.load(atomic::Ordering::SeqCst), atomic::Ordering::SeqCst);
781 dropped_clone.fetch_add(1, atomic::Ordering::SeqCst);
782 })
783 .build(),
784 );
785
786 StreamConsumer {
787 appsrc: appsrc.clone(),
788 fku_probe_id: Some(fku_probe_id),
789 forwarded_latency: atomic::AtomicBool::new(false),
790 needs_keyframe,
791 dropped,
792 pushed,
793 discard,
794 wait_for_keyframe,
795 }
796 }
797}
798
799impl Drop for StreamConsumer {
800 fn drop(&mut self) {
801 if let Some(fku_probe_id) = self.fku_probe_id.take() {
802 let srcpad = self.appsrc.static_pad("src").unwrap();
803 srcpad.remove_probe(fku_probe_id);
804 }
805 }
806}
807
808impl PartialEq for StreamConsumer {
809 fn eq(&self, other: &Self) -> bool {
810 self.appsrc.eq(&other.appsrc)
811 }
812}
813
// Marker impl: the `PartialEq` comparison of `StreamConsumer` is declared to be
// a full equivalence relation.
impl Eq for StreamConsumer {}
815
816impl std::hash::Hash for StreamConsumer {
817 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
818 std::hash::Hash::hash(&self.appsrc, state);
819 }
820}
821
// Lets a `StreamConsumer` be borrowed as its `appsrc`; equality and hashing of
// `StreamConsumer` are both based on that same field.
impl std::borrow::Borrow<gst_app::AppSrc> for StreamConsumer {
    /// Returns the consumer's `appsrc`.
    #[inline]
    fn borrow(&self) -> &gst_app::AppSrc {
        &self.appsrc
    }
}
828
829#[cfg(test)]
830mod tests {
831 use std::{
832 str::FromStr,
833 sync::{Arc, Mutex},
834 };
835
836 use futures::{
837 channel::{mpsc, mpsc::Receiver},
838 SinkExt, StreamExt,
839 };
840 use gst::prelude::*;
841
842 use crate::{ConsumptionLink, StreamProducer};
843
844 fn create_producer() -> (
845 gst::Pipeline,
846 gst_app::AppSrc,
847 gst_app::AppSink,
848 StreamProducer,
849 ) {
850 let producer_pipe =
851 gst::parse::launch("appsrc name=producer_src ! appsink name=producer_sink")
852 .unwrap()
853 .downcast::<gst::Pipeline>()
854 .unwrap();
855 let producer_sink = producer_pipe
856 .by_name("producer_sink")
857 .unwrap()
858 .downcast::<gst_app::AppSink>()
859 .unwrap();
860
861 (
862 producer_pipe.clone(),
863 producer_pipe
864 .by_name("producer_src")
865 .unwrap()
866 .downcast::<gst_app::AppSrc>()
867 .unwrap(),
868 producer_sink.clone(),
869 StreamProducer::from(&producer_sink),
870 )
871 }
872
/// Test helper: an `appsrc ! appsink` pipeline that can be fed by a
/// `StreamProducer`.
struct Consumer {
    /// The consumer pipeline.
    pipeline: gst::Pipeline,
    /// The appsrc that gets registered with the producer.
    src: gst_app::AppSrc,
    /// The appsink at the end of the consumer pipeline.
    sink: gst_app::AppSink,
    /// Receiving end of the channel the appsink callback sends samples into.
    receiver: Mutex<Receiver<gst::Sample>>,
    /// Whether the test considers this consumer connected to the producer.
    connected: Mutex<bool>,
}
880
881 impl Consumer {
882 fn new(id: &str) -> Self {
883 let pipeline = gst::parse::launch(&format!("appsrc name={id} ! appsink name=sink"))
884 .unwrap()
885 .downcast::<gst::Pipeline>()
886 .unwrap();
887
888 let (sender, receiver) = mpsc::channel::<gst::Sample>(1000);
889 let sender = Arc::new(Mutex::new(sender));
890 let sink = pipeline
891 .by_name("sink")
892 .unwrap()
893 .downcast::<gst_app::AppSink>()
894 .unwrap();
895
896 sink.set_callbacks(
897 gst_app::AppSinkCallbacks::builder()
898 .new_sample(move |appsink| {
900 let sender_clone = sender.clone();
902 futures::executor::block_on(
903 sender_clone
904 .lock()
905 .unwrap()
906 .send(appsink.pull_sample().unwrap()),
907 )
908 .unwrap();
909
910 Ok(gst::FlowSuccess::Ok)
911 })
912 .build(),
913 );
914
915 Self {
916 pipeline: pipeline.clone(),
917 src: pipeline
918 .by_name(id)
919 .unwrap()
920 .downcast::<gst_app::AppSrc>()
921 .unwrap(),
922 sink,
923 receiver: Mutex::new(receiver),
924 connected: Mutex::new(false),
925 }
926 }
927
928 fn connect(&self, producer: &StreamProducer) -> ConsumptionLink {
929 {
930 let mut connected = self.connected.lock().unwrap();
931 *connected = true;
932 }
933
934 producer.add_consumer(&self.src).unwrap()
935 }
936
937 fn disconnect(&self, producer: &StreamProducer) {
938 {
939 let mut connected = self.connected.lock().unwrap();
940 *connected = false;
941 }
942
943 producer.remove_consumer(&self.src);
944 }
945 }
946
/// Feeds ten buffers through a producer with two consumers, disconnecting the
/// first consumer after the sixth buffer, and checks what each consumer
/// received as well as the link statistics.
#[test]
fn simple() {
    gst::init().unwrap();

    let (producer_pipe, producer_src, _producer_sink, producer) = create_producer();
    producer_pipe
        .set_state(gst::State::Playing)
        .expect("Couldn't set producer pipeline state");

    let mut consumers: Vec<Consumer> = Vec::new();
    let consumer = Consumer::new("consumer1");
    let link1 = consumer.connect(&producer);
    consumer
        .pipeline
        .set_state(gst::State::Playing)
        .expect("Couldn't set consumer pipeline state");
    consumers.push(consumer);

    let consumer = Consumer::new("consumer2");
    let link2 = consumer.connect(&producer);
    consumer
        .pipeline
        .set_state(gst::State::Playing)
        .expect("Couldn't set consumer pipeline state");
    consumers.push(consumer);

    assert!(producer.last_sample().is_none());

    for i in 0..10 {
        // Use distinct caps per buffer so each sample can be matched to the
        // iteration that produced it.
        let caps = gst::Caps::from_str(&format!("test,n={i}")).unwrap();
        producer_src.set_caps(Some(&caps));
        producer_src.push_buffer(gst::Buffer::new()).unwrap();

        for consumer in &consumers {
            if *consumer.connected.lock().unwrap() {
                let sample =
                    futures::executor::block_on(consumer.receiver.lock().unwrap().next())
                        .expect("Received an empty buffer?");
                sample.buffer().expect("No buffer on the sample?");
                assert_eq!(sample.caps(), Some(caps.as_ref()));
            } else {
                // `assert!` rather than `debug_assert!` so the check also runs
                // when the tests are built in release mode.
                assert!(
                    consumer
                        .sink
                        .try_pull_sample(gst::ClockTime::from_nseconds(0))
                        .is_none(),
                    "Disconnected consumer got a new sample?!"
                );
            }
        }

        if i == 5 {
            consumers.first().unwrap().disconnect(&producer);
        }
    }

    assert!(producer.last_sample().is_some());

    // The first consumer was removed after six buffers, the second got all ten.
    assert_eq!(link1.pushed(), 6);
    assert_eq!(link1.dropped(), 0);
    assert_eq!(link2.pushed(), 10);
    assert_eq!(link2.dropped(), 0);
}
1010}