gstreamer_utils/
streamproducer.rs

1use std::{
2    collections::HashMap,
3    mem,
4    sync::{atomic, Arc, Mutex, MutexGuard},
5};
6
7use gst::{glib, prelude::*};
8use once_cell::sync::Lazy;
9use thiserror::Error;
10
/// Small 64-bit counter that is usable on every target.
///
/// Where the target supports 64-bit atomics the value lives in an
/// `AtomicU64`; otherwise it falls back to a `Mutex<u64>`. Only the
/// operations needed by this file are wrapped, not the full atomic API.
#[derive(Debug)]
struct WrappedAtomicU64 {
    // Fallback storage for targets without 64-bit atomics.
    #[cfg(not(target_has_atomic = "64"))]
    atomic: Mutex<u64>,
    // Native storage for targets with 64-bit atomics.
    #[cfg(target_has_atomic = "64")]
    atomic: atomic::AtomicU64,
}
21
22#[cfg(target_has_atomic = "64")]
23impl WrappedAtomicU64 {
24    fn new(value: u64) -> WrappedAtomicU64 {
25        WrappedAtomicU64 {
26            atomic: atomic::AtomicU64::new(value),
27        }
28    }
29    fn fetch_add(&self, value: u64, order: atomic::Ordering) -> u64 {
30        self.atomic.fetch_add(value, order)
31    }
32    fn store(&self, value: u64, order: atomic::Ordering) {
33        self.atomic.store(value, order);
34    }
35
36    fn load(&self, order: atomic::Ordering) -> u64 {
37        self.atomic.load(order)
38    }
39}
40
// Fallback variant for targets without 64-bit atomics: the value is guarded
// by a mutex and the requested memory ordering is ignored, since the lock
// already serialises all accesses.
#[cfg(not(target_has_atomic = "64"))]
impl WrappedAtomicU64 {
    /// Create a counter initialised to `value`.
    fn new(value: u64) -> WrappedAtomicU64 {
        WrappedAtomicU64 {
            atomic: Mutex::new(value),
        }
    }

    /// Add `value` to the counter and return the value it held before.
    ///
    /// The addition wraps around on overflow so that this fallback behaves
    /// like `AtomicU64::fetch_add` instead of panicking in builds with
    /// overflow checks enabled.
    fn fetch_add(&self, value: u64, _order: atomic::Ordering) -> u64 {
        let mut guard = self.atomic.lock().unwrap();
        let previous = *guard;
        *guard = previous.wrapping_add(value);
        previous
    }

    /// Overwrite the counter with `value`.
    fn store(&self, value: u64, _order: atomic::Ordering) {
        *self.atomic.lock().unwrap() = value;
    }

    /// Read the current value of the counter.
    fn load(&self, _order: atomic::Ordering) -> u64 {
        *self.atomic.lock().unwrap()
    }
}
61
/// GStreamer debug category (`utilsrs-stream-producer`) passed to the logging
/// macros in this file. It is created lazily on first access.
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
    gst::DebugCategory::new(
        "utilsrs-stream-producer",
        gst::DebugColorFlags::empty(),
        Some("gst_app Stream Producer interface"),
    )
});
69
/// The interface for transporting media data from one node
/// to another.
///
/// A producer is essentially a GStreamer `appsink` whose output
/// is sent to a set of consumers, which are essentially `appsrc` wrappers.
///
/// The type is a handle around an `Arc`, so clones share the same inner state.
#[derive(Debug, Clone)]
pub struct StreamProducer(Arc<StreamProducerInner>);
77
78impl PartialEq for StreamProducer {
79    fn eq(&self, other: &Self) -> bool {
80        self.0.appsink.eq(&other.0.appsink)
81    }
82}
83
84impl Eq for StreamProducer {}
85
/// State shared by all clones of a `StreamProducer`.
#[derive(Debug)]
struct StreamProducerInner {
    /// The appsink to dispatch data for
    appsink: gst_app::AppSink,
    /// The id of the pad probe installed on the appsink's sink pad; taken
    /// (and the probe removed) when this struct is dropped
    appsink_probe_id: Option<gst::PadProbeId>,
    /// The consumers to dispatch data to
    consumers: Arc<Mutex<StreamConsumers>>,
}
95
96impl Drop for StreamProducerInner {
97    fn drop(&mut self) {
98        if let Some(probe_id) = self.appsink_probe_id.take() {
99            let pad = self.appsink.static_pad("sink").unwrap();
100            pad.remove_probe(probe_id);
101        }
102
103        self.appsink
104            .set_callbacks(gst_app::AppSinkCallbacks::builder().build());
105    }
106}
107
/// Link between a `StreamProducer` and a consumer, disconnecting the link on `Drop`.
/// The producer and consumer will stay alive while the link is.
#[derive(Debug)]
#[must_use]
pub struct ConsumptionLink {
    /// The `appsrc` receiving the data
    consumer: gst_app::AppSrc,
    /// The producer this link is currently attached to, `None` when disconnected
    producer: Option<StreamProducer>,
    /// number of buffers dropped, either because `consumer` internal queue was
    /// full or because the link was waiting for a keyframe
    dropped: Arc<WrappedAtomicU64>,
    /// number of buffers pushed through `consumer`
    pushed: Arc<WrappedAtomicU64>,
    /// if buffers should not be pushed to the `consumer` right now
    discard: Arc<atomic::AtomicBool>,
    /// whether the link will drop delta frames until next keyframe on discont
    wait_for_keyframe: Arc<atomic::AtomicBool>,
}
124
125impl ConsumptionLink {
126    /// Create a new disconnected `ConsumptionLink`.
127    pub fn disconnected(consumer: gst_app::AppSrc) -> ConsumptionLink {
128        ConsumptionLink {
129            consumer,
130            producer: None,
131            dropped: Arc::new(WrappedAtomicU64::new(0)),
132            pushed: Arc::new(WrappedAtomicU64::new(0)),
133            discard: Arc::new(atomic::AtomicBool::new(false)),
134            wait_for_keyframe: Arc::new(atomic::AtomicBool::new(true)),
135        }
136    }
137
138    /// Replace the producer by a new one, keeping the existing consumer.
139    pub fn change_producer(
140        &mut self,
141        new_producer: &StreamProducer,
142        reset_stats: bool,
143    ) -> Result<(), AddConsumerError> {
144        self.disconnect();
145        if reset_stats {
146            self.dropped.store(0, atomic::Ordering::SeqCst);
147            self.pushed.store(0, atomic::Ordering::SeqCst);
148        }
149        new_producer.add_consumer_internal(
150            &self.consumer,
151            self.dropped.clone(),
152            self.pushed.clone(),
153            self.discard.clone(),
154            self.wait_for_keyframe.clone(),
155        )?;
156        self.producer = Some(new_producer.clone());
157        Ok(())
158    }
159
160    /// Disconnect the consumer from the producer
161    pub fn disconnect(&mut self) {
162        if let Some(producer) = self.producer.take() {
163            producer.remove_consumer(&self.consumer);
164        }
165    }
166
167    /// number of dropped buffers because the consumer internal queue was full
168    pub fn dropped(&self) -> u64 {
169        self.dropped.load(atomic::Ordering::SeqCst)
170    }
171
172    /// number of buffers pushed through this link
173    pub fn pushed(&self) -> u64 {
174        self.pushed.load(atomic::Ordering::SeqCst)
175    }
176
177    /// if buffers are currently pushed through this link
178    pub fn discard(&self) -> bool {
179        self.discard.load(atomic::Ordering::SeqCst)
180    }
181
182    /// If set to `true` then no buffers will be pushed through this link
183    pub fn set_discard(&self, discard: bool) {
184        self.discard.store(discard, atomic::Ordering::SeqCst)
185    }
186
187    /// if the link will drop frames until the next keyframe on discont
188    pub fn wait_for_keyframe(&self) -> bool {
189        self.wait_for_keyframe.load(atomic::Ordering::SeqCst)
190    }
191
192    /// If set to `true` then the link will drop delta-frames until the next
193    /// keyframe on discont (default behavior).
194    pub fn set_wait_for_keyframe(&self, wait: bool) {
195        self.wait_for_keyframe.store(wait, atomic::Ordering::SeqCst)
196    }
197
198    /// Get the GStreamer `appsrc` wrapped by this link
199    pub fn appsrc(&self) -> &gst_app::AppSrc {
200        &self.consumer
201    }
202
203    /// Get the `StreamProducer` currently by this link, if any.
204    pub fn stream_producer(&self) -> Option<&StreamProducer> {
205        self.producer.as_ref()
206    }
207}
208
209impl Drop for ConsumptionLink {
210    fn drop(&mut self) {
211        self.disconnect();
212    }
213}
214
#[derive(Debug, Error)]
/// Error type returned when adding a consumer to a producer fails.
pub enum AddConsumerError {
    #[error("Consumer already added")]
    /// The consumer `appsrc` is already registered with this producer.
    AlreadyAdded,
}
222
223impl StreamProducer {
224    /// Configure a consumer `appsrc` for later use in a `StreamProducer`
225    ///
226    /// This is automatically called when calling `add_consumer()`.
227    pub fn configure_consumer(consumer: &gst_app::AppSrc) {
228        // Latency on the appsrc is set by the publisher before the first buffer
229        // and whenever it changes
230        consumer.set_latency(gst::ClockTime::ZERO, gst::ClockTime::NONE);
231        consumer.set_format(gst::Format::Time);
232        consumer.set_is_live(true);
233        consumer.set_handle_segment_change(true);
234        consumer.set_max_buffers(0);
235        consumer.set_max_bytes(0);
236        consumer.set_max_time(500 * gst::ClockTime::MSECOND);
237        consumer.set_leaky_type(gst_app::AppLeakyType::Downstream);
238        consumer.set_automatic_eos(false);
239    }
240
241    /// Add an appsrc to dispatch data to.
242    ///
243    /// Dropping the returned `ConsumptionLink` will automatically disconnect the consumer from the producer.
244    pub fn add_consumer(
245        &self,
246        consumer: &gst_app::AppSrc,
247    ) -> Result<ConsumptionLink, AddConsumerError> {
248        let dropped = Arc::new(WrappedAtomicU64::new(0));
249        let pushed = Arc::new(WrappedAtomicU64::new(0));
250        let discard = Arc::new(atomic::AtomicBool::new(false));
251        let wait_for_keyframe = Arc::new(atomic::AtomicBool::new(true));
252
253        self.add_consumer_internal(
254            consumer,
255            dropped.clone(),
256            pushed.clone(),
257            discard.clone(),
258            wait_for_keyframe.clone(),
259        )?;
260
261        Ok(ConsumptionLink {
262            consumer: consumer.clone(),
263            producer: Some(self.clone()),
264            dropped,
265            pushed,
266            discard,
267            wait_for_keyframe,
268        })
269    }
270
    /// Register `consumer` with this producer, sharing the given counters and flags.
    ///
    /// Steps, in order: reject an already registered `appsrc`, configure the
    /// `appsrc`, install a probe on its `src` pad that forwards upstream
    /// force-key-unit events to the appsink's `sink` pad, store the resulting
    /// `StreamConsumer`, and finally replay the appsink's sticky events whose
    /// type is listed in `events_to_forward`.
    ///
    /// Returns `AddConsumerError::AlreadyAdded` if `consumer` is already registered.
    fn add_consumer_internal(
        &self,
        consumer: &gst_app::AppSrc,
        dropped: Arc<WrappedAtomicU64>,
        pushed: Arc<WrappedAtomicU64>,
        discard: Arc<atomic::AtomicBool>,
        wait_for_keyframe: Arc<atomic::AtomicBool>,
    ) -> Result<(), AddConsumerError> {
        // Held until the new consumer is inserted, so the duplicate check and
        // the insertion happen under the same lock.
        let mut consumers = self.0.consumers.lock().unwrap();
        if consumers.consumers.contains_key(consumer) {
            gst::error!(
                CAT,
                obj = &self.0.appsink,
                "Consumer {} ({:?}) already added",
                consumer.name(),
                consumer
            );
            return Err(AddConsumerError::AlreadyAdded);
        }

        gst::debug!(
            CAT,
            obj = &self.0.appsink,
            "Adding consumer {} ({:?})",
            consumer.name(),
            consumer
        );

        Self::configure_consumer(consumer);

        // Forward force-keyunit events upstream to the appsink
        let srcpad = consumer.static_pad("src").unwrap();
        let fku_probe_id = srcpad
            .add_probe(
                gst::PadProbeType::EVENT_UPSTREAM,
                glib::clone!(
                    // Only a weak reference to the appsink is captured by the probe.
                    #[weak(rename_to = appsink)]
                    self.0.appsink,
                    #[upgrade_or_panic]
                    move |_pad, info| {
                        let Some(event) = info.event() else {
                            return gst::PadProbeReturn::Ok;
                        };

                        if gst_video::UpstreamForceKeyUnitEvent::parse(event).is_ok() {
                            gst::debug!(CAT, obj = &appsink, "Requesting keyframe");
                            // Do not use `gst_element_send_event()` as it takes the state lock which may lead to dead locks.
                            let pad = appsink.static_pad("sink").unwrap();
                            let _ = pad.push_event(event.clone());
                        }

                        gst::PadProbeReturn::Ok
                    }
                ),
            )
            .unwrap();

        // The probe id is handed to the `StreamConsumer`, whose `Drop`
        // removes the probe again.
        let stream_consumer = StreamConsumer::new(
            consumer,
            fku_probe_id,
            dropped,
            pushed,
            discard,
            wait_for_keyframe,
        );

        consumers
            .consumers
            .insert(consumer.clone(), stream_consumer);

        // forward selected sticky events. We can send those now as appsrc will delay the events
        // until stream-start, caps and segment are sent.
        let events_to_forward = consumers.events_to_forward.clone();
        // drop the lock before sending events
        drop(consumers);

        let appsink_pad = self.0.appsink.static_pad("sink").unwrap();
        appsink_pad.sticky_events_foreach(|event| {
            if events_to_forward.contains(&event.type_()) {
                gst::debug!(
                    CAT,
                    obj = &self.0.appsink,
                    "forward sticky event {:?}",
                    event
                );
                consumer.send_event(event.clone());
            }

            // Always keep the event on the appsink pad and continue iterating.
            std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
        });

        Ok(())
    }
364
365    fn process_sample(
366        sample: gst::Sample,
367        appsink: &gst_app::AppSink,
368        mut consumers: MutexGuard<StreamConsumers>,
369    ) -> Result<gst::FlowSuccess, gst::FlowError> {
370        let (is_discont, is_keyframe) = if let Some(buf) = sample.buffer() {
371            let flags = buf.flags();
372
373            (
374                flags.contains(gst::BufferFlags::DISCONT),
375                !flags.contains(gst::BufferFlags::DELTA_UNIT),
376            )
377        } else {
378            (false, true)
379        };
380
381        gst::trace!(
382            CAT,
383            obj = appsink,
384            "processing sample {:?}",
385            sample.buffer()
386        );
387
388        let latency = consumers.current_latency;
389        let latency_updated = mem::replace(&mut consumers.latency_updated, false);
390
391        let mut needs_keyframe_request = false;
392
393        let current_consumers = consumers
394            .consumers
395            .values()
396            .filter_map(|consumer| {
397                if let Some(latency) = latency {
398                    if consumer
399                        .forwarded_latency
400                        .compare_exchange(
401                            false,
402                            true,
403                            atomic::Ordering::SeqCst,
404                            atomic::Ordering::SeqCst,
405                        )
406                        .is_ok()
407                        || latency_updated
408                    {
409                        gst::info!(CAT, obj = appsink, "setting new latency: {latency}");
410                        consumer.appsrc.set_latency(latency, gst::ClockTime::NONE);
411                    }
412                }
413
414                if consumer.discard.load(atomic::Ordering::SeqCst) {
415                    consumer
416                        .needs_keyframe
417                        .store(true, atomic::Ordering::SeqCst);
418                    return None;
419                }
420
421                if is_discont
422                    && !is_keyframe
423                    && consumer.wait_for_keyframe.load(atomic::Ordering::SeqCst)
424                {
425                    // Whenever we have a discontinuity, we need a new keyframe
426                    consumer
427                        .needs_keyframe
428                        .store(true, atomic::Ordering::SeqCst);
429                }
430
431                if !is_keyframe && consumer.needs_keyframe.load(atomic::Ordering::SeqCst) {
432                    // If we need a keyframe (and this one isn't) request a keyframe upstream
433                    if !needs_keyframe_request {
434                        gst::debug!(CAT, obj = appsink, "Requesting keyframe for first buffer");
435                        needs_keyframe_request = true;
436                    }
437
438                    consumer.dropped.fetch_add(1, atomic::Ordering::SeqCst);
439
440                    gst::error!(
441                        CAT,
442                        obj = appsink,
443                        "Ignoring frame for {} while waiting for a keyframe",
444                        consumer.appsrc.name()
445                    );
446                    None
447                } else {
448                    consumer
449                        .needs_keyframe
450                        .store(false, atomic::Ordering::SeqCst);
451                    consumer.pushed.fetch_add(1, atomic::Ordering::SeqCst);
452
453                    Some(consumer.appsrc.clone())
454                }
455            })
456            .collect::<Vec<_>>();
457
458        drop(consumers);
459
460        if needs_keyframe_request {
461            // Do not use `gst_element_send_event()` as it takes the state lock which may lead to dead locks.
462            let pad = appsink.static_pad("sink").unwrap();
463            pad.push_event(
464                gst_video::UpstreamForceKeyUnitEvent::builder()
465                    .all_headers(true)
466                    .build(),
467            );
468        }
469
470        for consumer in current_consumers {
471            if let Err(err) = consumer.push_sample(&sample) {
472                gst::warning!(CAT, obj = appsink, "Failed to push sample: {}", err);
473            }
474        }
475        Ok(gst::FlowSuccess::Ok)
476    }
477
478    /// Remove a consumer appsrc by id
479    pub fn remove_consumer(&self, consumer: &gst_app::AppSrc) {
480        let name = consumer.name();
481        if self
482            .0
483            .consumers
484            .lock()
485            .unwrap()
486            .consumers
487            .remove(consumer)
488            .is_some()
489        {
490            gst::debug!(
491                CAT,
492                obj = &self.0.appsink,
493                "Removed consumer {} ({:?})",
494                name,
495                consumer
496            );
497            consumer.set_callbacks(gst_app::AppSrcCallbacks::builder().build());
498        } else {
499            gst::debug!(
500                CAT,
501                obj = &self.0.appsink,
502                "Consumer {} ({:?}) not found",
503                name,
504                consumer
505            );
506        }
507    }
508
509    /// configure event types the appsink should forward to all its consumers (default: `Eos`).
510    pub fn set_forward_events(&self, events_to_forward: impl IntoIterator<Item = gst::EventType>) {
511        self.0.consumers.lock().unwrap().events_to_forward =
512            events_to_forward.into_iter().collect();
513    }
514
515    /// configure whether the preroll sample should be forwarded (default: `true`)
516    pub fn set_forward_preroll(&self, forward_preroll: bool) {
517        self.0.consumers.lock().unwrap().forward_preroll = forward_preroll;
518    }
519
520    /// Get the GStreamer `appsink` wrapped by this producer
521    pub fn appsink(&self) -> &gst_app::AppSink {
522        &self.0.appsink
523    }
524
525    /// Signals an error on all consumers
526    pub fn error(&self, error: &gst::glib::Error, debug: Option<&str>) {
527        let consumers = self.0.consumers.lock().unwrap();
528
529        for consumer in consumers.consumers.keys() {
530            let mut msg_builder =
531                gst::message::Error::builder_from_error(error.clone()).src(consumer);
532            if let Some(debug) = debug {
533                msg_builder = msg_builder.debug(debug);
534            }
535
536            let _ = consumer.post_message(msg_builder.build());
537        }
538    }
539
540    /// The last sample produced by this producer.
541    pub fn last_sample(&self) -> Option<gst::Sample> {
542        self.0.appsink.property("last-sample")
543    }
544}
545
/// Build a `StreamProducer` around an existing `appsink`.
///
/// This installs the producer's callbacks on the appsink (new sample, new
/// preroll, new event, EOS) and an upstream event probe on its `sink` pad
/// that records latency events. The callbacks and the probe are removed again
/// when the last clone of the producer is dropped.
impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
    fn from(appsink: &'a gst_app::AppSink) -> Self {
        // State shared between the producer handle, the appsink callbacks and the pad probe.
        let consumers = Arc::new(Mutex::new(StreamConsumers {
            current_latency: None,
            latency_updated: false,
            consumers: HashMap::new(),
            // it would make sense to automatically forward more events such as Tag but that would break
            // with older GStreamer, see https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4297
            events_to_forward: vec![gst::EventType::Eos, gst::EventType::Gap],
            forward_preroll: true,
            just_forwarded_preroll: false,
        }));

        appsink.set_callbacks(
            gst_app::AppSinkCallbacks::builder()
                // Regular samples: dispatch to consumers, unless the same sample
                // was already forwarded as preroll.
                .new_sample(glib::clone!(
                    #[strong]
                    consumers,
                    move |appsink| {
                        let mut consumers = consumers.lock().unwrap();

                        let sample = match appsink.pull_sample() {
                            Ok(sample) => sample,
                            Err(_err) => {
                                gst::debug!(CAT, obj = appsink, "Failed to pull sample");
                                return Err(gst::FlowError::Flushing);
                            }
                        };

                        // Read and reset the flag in one step.
                        let just_forwarded_preroll =
                            mem::replace(&mut consumers.just_forwarded_preroll, false);

                        if just_forwarded_preroll {
                            return Ok(gst::FlowSuccess::Ok);
                        }

                        // `process_sample` takes over the guard and releases it before pushing.
                        StreamProducer::process_sample(sample, appsink, consumers)
                    }
                ))
                // Preroll samples: forwarded only when `forward_preroll` is set.
                .new_preroll(glib::clone!(
                    #[strong]
                    consumers,
                    move |appsink| {
                        let mut consumers = consumers.lock().unwrap();

                        let sample = match appsink.pull_preroll() {
                            Ok(sample) => sample,
                            Err(_err) => {
                                gst::debug!(CAT, obj = appsink, "Failed to pull preroll");
                                return Err(gst::FlowError::Flushing);
                            }
                        };

                        if consumers.forward_preroll {
                            // Remember it so the next `new_sample` callback skips its sample.
                            consumers.just_forwarded_preroll = true;

                            StreamProducer::process_sample(sample, appsink, consumers)
                        } else {
                            Ok(gst::FlowSuccess::Ok)
                        }
                    }
                ))
                // Events: forwarded to every consumer when their type is listed
                // in `events_to_forward`.
                .new_event(glib::clone!(
                    #[strong]
                    consumers,
                    move |appsink| {
                        match appsink
                            .pull_object()
                            .map(|obj| obj.downcast::<gst::Event>())
                        {
                            Ok(Ok(event)) => {
                                let (events_to_forward, appsrcs) = {
                                    // clone so we don't keep the lock while pushing events
                                    let consumers = consumers.lock().unwrap();
                                    let events = consumers.events_to_forward.clone();
                                    let appsrcs =
                                        consumers.consumers.keys().cloned().collect::<Vec<_>>();

                                    (events, appsrcs)
                                };

                                if events_to_forward.contains(&event.type_()) {
                                    for appsrc in appsrcs {
                                        appsrc.send_event(event.clone());
                                    }
                                }
                            }
                            Ok(Err(_)) => {} // pulled another unsupported object type, ignore
                            Err(_err) => gst::warning!(CAT, obj = appsink, "Failed to pull event"),
                        }

                        // NOTE(review): `false` presumably tells appsink the event was not
                        // consumed by this callback — confirm against the appsink docs.
                        false
                    }
                ))
                // EOS: signalled on every consumer when `Eos` is listed in `events_to_forward`.
                .eos(glib::clone!(
                    #[strong]
                    consumers,
                    move |appsink| {
                        let stream_consumers = consumers.lock().unwrap();

                        if stream_consumers
                            .events_to_forward
                            .contains(&gst::EventType::Eos)
                        {
                            let current_consumers = stream_consumers
                                .consumers
                                .values()
                                .map(|c| c.appsrc.clone())
                                .collect::<Vec<_>>();
                            // Release the lock before signalling EOS on the consumers.
                            drop(stream_consumers);

                            for consumer in current_consumers {
                                gst::debug!(
                                    CAT,
                                    obj = appsink,
                                    "set EOS on consumer {}",
                                    consumer.name()
                                );
                                let _ = consumer.end_of_stream();
                            }
                        } else {
                            gst::debug!(CAT, obj = appsink, "don't forward EOS to consumers");
                        }
                    }
                ))
                .build(),
        );

        // Record latency events travelling upstream through the appsink's sink
        // pad; `process_sample` applies the recorded latency to the consumers.
        let sinkpad = appsink.static_pad("sink").unwrap();
        let appsink_probe_id = sinkpad
            .add_probe(
                gst::PadProbeType::EVENT_UPSTREAM,
                glib::clone!(
                    #[strong]
                    consumers,
                    move |_pad, info| {
                        let Some(event) = info.event() else {
                            return gst::PadProbeReturn::Ok;
                        };

                        let gst::EventView::Latency(event) = event.view() else {
                            return gst::PadProbeReturn::Ok;
                        };

                        let latency = event.latency();
                        let mut consumers = consumers.lock().unwrap();
                        consumers.current_latency = Some(latency);
                        consumers.latency_updated = true;

                        gst::PadProbeReturn::Ok
                    }
                ),
            )
            .unwrap();

        StreamProducer(Arc::new(StreamProducerInner {
            appsink: appsink.clone(),
            appsink_probe_id: Some(appsink_probe_id),
            consumers,
        }))
    }
}
708
/// Wrapper around a HashMap of consumers. It is kept behind a `Mutex` for
/// thread safety and also holds some of the producer state.
#[derive(Debug)]
struct StreamConsumers {
    /// The currently-observed latency, `None` until a latency event was seen
    current_latency: Option<gst::ClockTime>,
    /// Whether the consumers' appsrc latency needs updating
    latency_updated: bool,
    /// The consumers, keyed by their `AppSrc`
    consumers: HashMap<gst_app::AppSrc, StreamConsumer>,
    /// What events should be forwarded to consumers
    events_to_forward: Vec<gst::EventType>,
    /// Whether the preroll sample should be forwarded at all
    forward_preroll: bool,
    /// Whether we just forwarded the preroll sample. When we did we want to
    /// discard the next sample from on_new_sample as it would cause us to
    /// otherwise push out the same sample twice to consumers.
    just_forwarded_preroll: bool,
}
728
/// Wrapper around a consumer's `appsrc`, holding the per-consumer state the
/// producer needs while dispatching samples.
#[derive(Debug)]
struct StreamConsumer {
    /// The GStreamer `appsrc` of the consumer
    appsrc: gst_app::AppSrc,
    /// The id of a pad probe that intercepts force-key-unit events; the probe
    /// is removed when the consumer is dropped
    fku_probe_id: Option<gst::PadProbeId>,
    /// Whether an initial latency was forwarded to the `appsrc`
    forwarded_latency: atomic::AtomicBool,
    /// Whether the consumer is currently waiting for a keyframe; while set,
    /// delta frames are dropped and a new key unit is requested. Only useful
    /// for encoded streams.
    needs_keyframe: Arc<atomic::AtomicBool>,
    /// number of buffers dropped, either because `appsrc` internal queue was
    /// full or because the consumer was waiting for a keyframe
    dropped: Arc<WrappedAtomicU64>,
    /// number of buffers pushed through `appsrc`
    pushed: Arc<WrappedAtomicU64>,
    /// if buffers should not be pushed to the `appsrc` right now
    discard: Arc<atomic::AtomicBool>,
    /// whether the consumer should drop delta frames until next keyframe on discont
    wait_for_keyframe: Arc<atomic::AtomicBool>,
}
751
752impl StreamConsumer {
753    /// Create a new consumer
754    fn new(
755        appsrc: &gst_app::AppSrc,
756        fku_probe_id: gst::PadProbeId,
757        dropped: Arc<WrappedAtomicU64>,
758        pushed: Arc<WrappedAtomicU64>,
759        discard: Arc<atomic::AtomicBool>,
760        wait_for_keyframe: Arc<atomic::AtomicBool>,
761    ) -> Self {
762        let needs_keyframe = Arc::new(atomic::AtomicBool::new(
763            wait_for_keyframe.load(atomic::Ordering::SeqCst),
764        ));
765        let needs_keyframe_clone = needs_keyframe.clone();
766        let wait_for_keyframe_clone = wait_for_keyframe.clone();
767        let dropped_clone = dropped.clone();
768
769        appsrc.set_callbacks(
770            gst_app::AppSrcCallbacks::builder()
771                .enough_data(move |appsrc| {
772                    gst::debug!(
773                        CAT,
774                        obj = appsrc,
775                        "consumer {} ({:?}) is not consuming fast enough, old samples are getting dropped",
776                        appsrc.name(),
777                        appsrc,
778                    );
779
780                    needs_keyframe_clone.store(wait_for_keyframe_clone.load(atomic::Ordering::SeqCst), atomic::Ordering::SeqCst);
781                    dropped_clone.fetch_add(1, atomic::Ordering::SeqCst);
782                })
783                .build(),
784        );
785
786        StreamConsumer {
787            appsrc: appsrc.clone(),
788            fku_probe_id: Some(fku_probe_id),
789            forwarded_latency: atomic::AtomicBool::new(false),
790            needs_keyframe,
791            dropped,
792            pushed,
793            discard,
794            wait_for_keyframe,
795        }
796    }
797}
798
799impl Drop for StreamConsumer {
800    fn drop(&mut self) {
801        if let Some(fku_probe_id) = self.fku_probe_id.take() {
802            let srcpad = self.appsrc.static_pad("src").unwrap();
803            srcpad.remove_probe(fku_probe_id);
804        }
805    }
806}
807
808impl PartialEq for StreamConsumer {
809    fn eq(&self, other: &Self) -> bool {
810        self.appsrc.eq(&other.appsrc)
811    }
812}
813
814impl Eq for StreamConsumer {}
815
816impl std::hash::Hash for StreamConsumer {
817    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
818        std::hash::Hash::hash(&self.appsrc, state);
819    }
820}
821
822impl std::borrow::Borrow<gst_app::AppSrc> for StreamConsumer {
823    #[inline]
824    fn borrow(&self) -> &gst_app::AppSrc {
825        &self.appsrc
826    }
827}
828
829#[cfg(test)]
830mod tests {
831    use std::{
832        str::FromStr,
833        sync::{Arc, Mutex},
834    };
835
836    use futures::{
837        channel::{mpsc, mpsc::Receiver},
838        SinkExt, StreamExt,
839    };
840    use gst::prelude::*;
841
842    use crate::{ConsumptionLink, StreamProducer};
843
844    fn create_producer() -> (
845        gst::Pipeline,
846        gst_app::AppSrc,
847        gst_app::AppSink,
848        StreamProducer,
849    ) {
850        let producer_pipe =
851            gst::parse::launch("appsrc name=producer_src ! appsink name=producer_sink")
852                .unwrap()
853                .downcast::<gst::Pipeline>()
854                .unwrap();
855        let producer_sink = producer_pipe
856            .by_name("producer_sink")
857            .unwrap()
858            .downcast::<gst_app::AppSink>()
859            .unwrap();
860
861        (
862            producer_pipe.clone(),
863            producer_pipe
864                .by_name("producer_src")
865                .unwrap()
866                .downcast::<gst_app::AppSrc>()
867                .unwrap(),
868            producer_sink.clone(),
869            StreamProducer::from(&producer_sink),
870        )
871    }
872
    /// Test-side consumer: an `appsrc ! appsink` pipeline plus the bookkeeping
    /// the test needs to observe what arrives at the appsink.
    struct Consumer {
        /// Pipeline parsed from `appsrc name={id} ! appsink name=sink`.
        pipeline: gst::Pipeline,
        /// The pipeline's appsrc, named after the consumer id; this is the
        /// element passed to the producer in `connect` / `disconnect`.
        src: gst_app::AppSrc,
        /// The pipeline's appsink, named `sink`.
        sink: gst_app::AppSink,
        /// Receiving end of the channel fed by the appsink's `new_sample` callback.
        receiver: Mutex<Receiver<gst::Sample>>,
        /// Set to `true` by `connect` and back to `false` by `disconnect`.
        connected: Mutex<bool>,
    }
880
881    impl Consumer {
882        fn new(id: &str) -> Self {
883            let pipeline = gst::parse::launch(&format!("appsrc name={id} ! appsink name=sink"))
884                .unwrap()
885                .downcast::<gst::Pipeline>()
886                .unwrap();
887
888            let (sender, receiver) = mpsc::channel::<gst::Sample>(1000);
889            let sender = Arc::new(Mutex::new(sender));
890            let sink = pipeline
891                .by_name("sink")
892                .unwrap()
893                .downcast::<gst_app::AppSink>()
894                .unwrap();
895
896            sink.set_callbacks(
897                gst_app::AppSinkCallbacks::builder()
898                    // Add a handler to the "new-sample" signal.
899                    .new_sample(move |appsink| {
900                        // Pull the sample in question out of the appsink's buffer.
901                        let sender_clone = sender.clone();
902                        futures::executor::block_on(
903                            sender_clone
904                                .lock()
905                                .unwrap()
906                                .send(appsink.pull_sample().unwrap()),
907                        )
908                        .unwrap();
909
910                        Ok(gst::FlowSuccess::Ok)
911                    })
912                    .build(),
913            );
914
915            Self {
916                pipeline: pipeline.clone(),
917                src: pipeline
918                    .by_name(id)
919                    .unwrap()
920                    .downcast::<gst_app::AppSrc>()
921                    .unwrap(),
922                sink,
923                receiver: Mutex::new(receiver),
924                connected: Mutex::new(false),
925            }
926        }
927
928        fn connect(&self, producer: &StreamProducer) -> ConsumptionLink {
929            {
930                let mut connected = self.connected.lock().unwrap();
931                *connected = true;
932            }
933
934            producer.add_consumer(&self.src).unwrap()
935        }
936
937        fn disconnect(&self, producer: &StreamProducer) {
938            {
939                let mut connected = self.connected.lock().unwrap();
940                *connected = false;
941            }
942
943            producer.remove_consumer(&self.src);
944        }
945    }
946
947    #[test]
948    fn simple() {
949        gst::init().unwrap();
950
951        let (producer_pipe, producer_src, _producer_sink, producer) = create_producer();
952        producer_pipe
953            .set_state(gst::State::Playing)
954            .expect("Couldn't set producer pipeline state");
955
956        let mut consumers: Vec<Consumer> = Vec::new();
957        let consumer = Consumer::new("consumer1");
958        let link1 = consumer.connect(&producer);
959        consumer
960            .pipeline
961            .set_state(gst::State::Playing)
962            .expect("Couldn't set producer pipeline state");
963        consumers.push(consumer);
964
965        let consumer = Consumer::new("consumer2");
966        let link2 = consumer.connect(&producer);
967        consumer
968            .pipeline
969            .set_state(gst::State::Playing)
970            .expect("Couldn't set producer pipeline state");
971        consumers.push(consumer);
972
973        assert!(producer.last_sample().is_none());
974
975        for i in 0..10 {
976            let caps = gst::Caps::from_str(&format!("test,n={i}")).unwrap();
977            producer_src.set_caps(Some(&caps));
978            producer_src.push_buffer(gst::Buffer::new()).unwrap();
979
980            for consumer in &consumers {
981                if *consumer.connected.lock().unwrap() {
982                    let sample =
983                        futures::executor::block_on(consumer.receiver.lock().unwrap().next())
984                            .expect("Received an empty buffer?");
985                    sample.buffer().expect("No buffer on the sample?");
986                    assert_eq!(sample.caps(), Some(caps.as_ref()));
987                } else {
988                    debug_assert!(
989                        consumer
990                            .sink
991                            .try_pull_sample(gst::ClockTime::from_nseconds(0))
992                            .is_none(),
993                        "Disconnected consumer got a new sample?!"
994                    );
995                }
996            }
997
998            if i == 5 {
999                consumers.first().unwrap().disconnect(&producer);
1000            }
1001        }
1002
1003        assert!(producer.last_sample().is_some());
1004
1005        assert_eq!(link1.pushed(), 6);
1006        assert_eq!(link1.dropped(), 0);
1007        assert_eq!(link2.pushed(), 10);
1008        assert_eq!(link2.dropped(), 0);
1009    }
1010}