gstreamer_utils/
streamproducer.rs

1use std::{
2    collections::HashMap,
3    mem,
4    sync::{atomic, Arc, Mutex, MutexGuard},
5};
6
7use gst::{glib, prelude::*};
8use std::sync::LazyLock;
9use thiserror::Error;
10
// Small wrapper around `AtomicU64` and a `Mutex`, so that regular `AtomicU64`
// operations are used on targets that support them, with a mutex-protected `u64`
// as the fallback on targets that do not. Only the operations needed by this file
// are exposed, not the whole atomic API.
#[derive(Debug)]
struct WrappedAtomicU64 {
    #[cfg(not(target_has_atomic = "64"))]
    atomic: Mutex<u64>,
    #[cfg(target_has_atomic = "64")]
    atomic: atomic::AtomicU64,
}

// Native implementation: every call is forwarded to the underlying `AtomicU64`.
#[cfg(target_has_atomic = "64")]
impl WrappedAtomicU64 {
    /// Create a counter holding `value`.
    fn new(value: u64) -> WrappedAtomicU64 {
        WrappedAtomicU64 {
            atomic: atomic::AtomicU64::new(value),
        }
    }

    /// Add `value` to the counter and return the previous value.
    fn fetch_add(&self, value: u64, order: atomic::Ordering) -> u64 {
        self.atomic.fetch_add(value, order)
    }

    /// Overwrite the counter with `value`.
    fn store(&self, value: u64, order: atomic::Ordering) {
        self.atomic.store(value, order);
    }

    /// Read the current value of the counter.
    fn load(&self, order: atomic::Ordering) -> u64 {
        self.atomic.load(order)
    }
}

// Fallback implementation: the value lives behind a mutex and the requested
// memory ordering is ignored.
#[cfg(not(target_has_atomic = "64"))]
impl WrappedAtomicU64 {
    /// Create a counter holding `value`.
    fn new(value: u64) -> WrappedAtomicU64 {
        WrappedAtomicU64 {
            atomic: Mutex::new(value),
        }
    }

    /// Add `value` to the counter and return the previous value.
    fn fetch_add(&self, value: u64, _order: atomic::Ordering) -> u64 {
        let mut guard = self.atomic.lock().unwrap();
        let old = *guard;
        // Wrap around on overflow like `AtomicU64::fetch_add` does, so both
        // implementations behave the same (a plain `+=` panics in debug builds).
        *guard = old.wrapping_add(value);
        old
    }

    /// Overwrite the counter with `value`.
    fn store(&self, value: u64, _order: atomic::Ordering) {
        *self.atomic.lock().unwrap() = value;
    }

    /// Read the current value of the counter.
    fn load(&self, _order: atomic::Ordering) -> u64 {
        *self.atomic.lock().unwrap()
    }
}
61
62static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
63    gst::DebugCategory::new(
64        "utilsrs-stream-producer",
65        gst::DebugColorFlags::empty(),
66        Some("gst_app Stream Producer interface"),
67    )
68});
69
70/// The interface for transporting media data from one node
71/// to another.
72///
73/// A producer is essentially a GStreamer `appsink` whose output
74/// is sent to a set of consumers, who are essentially `appsrc` wrappers
75#[derive(Debug, Clone)]
76pub struct StreamProducer(Arc<StreamProducerInner>);
77
78impl PartialEq for StreamProducer {
79    fn eq(&self, other: &Self) -> bool {
80        self.0.appsink.eq(&other.0.appsink)
81    }
82}
83
84impl Eq for StreamProducer {}
85
86#[derive(Debug)]
87struct StreamProducerInner {
88    /// The appsink to dispatch data for
89    appsink: gst_app::AppSink,
90    /// The pad probe on the appsink=
91    appsink_probe_id: Option<gst::PadProbeId>,
92    /// The consumers to dispatch data to
93    consumers: Arc<Mutex<StreamConsumers>>,
94}
95
96impl Drop for StreamProducerInner {
97    fn drop(&mut self) {
98        if let Some(probe_id) = self.appsink_probe_id.take() {
99            let pad = self.appsink.static_pad("sink").unwrap();
100            pad.remove_probe(probe_id);
101        }
102
103        self.appsink
104            .set_callbacks(gst_app::AppSinkCallbacks::builder().build());
105    }
106}
107
108/// Link between a `StreamProducer` and a consumer, disconnecting the link on `Drop`.
109/// The producer and consumer will stay alive while the link is.
110#[derive(Debug)]
111#[must_use]
112pub struct ConsumptionLink {
113    consumer: gst_app::AppSrc,
114    producer: Option<StreamProducer>,
115    /// number of buffers dropped because `consumer` internal queue was full
116    dropped: Arc<WrappedAtomicU64>,
117    /// number of buffers pushed through `consumer`
118    pushed: Arc<WrappedAtomicU64>,
119    /// if buffers should not be pushed to the `consumer` right now
120    discard: Arc<atomic::AtomicBool>,
121    /// whether the link will drop delta frames until next keyframe on discont
122    wait_for_keyframe: Arc<atomic::AtomicBool>,
123}
124
125impl ConsumptionLink {
126    /// Create a new disconnected `ConsumptionLink`.
127    pub fn disconnected(consumer: gst_app::AppSrc) -> ConsumptionLink {
128        StreamProducer::configure_consumer(&consumer);
129
130        ConsumptionLink {
131            consumer,
132            producer: None,
133            dropped: Arc::new(WrappedAtomicU64::new(0)),
134            pushed: Arc::new(WrappedAtomicU64::new(0)),
135            discard: Arc::new(atomic::AtomicBool::new(false)),
136            wait_for_keyframe: Arc::new(atomic::AtomicBool::new(true)),
137        }
138    }
139
140    /// Replace the producer by a new one, keeping the existing consumer.
141    pub fn change_producer(
142        &mut self,
143        new_producer: &StreamProducer,
144        reset_stats: bool,
145    ) -> Result<(), AddConsumerError> {
146        self.disconnect();
147        if reset_stats {
148            self.dropped.store(0, atomic::Ordering::SeqCst);
149            self.pushed.store(0, atomic::Ordering::SeqCst);
150        }
151        new_producer.add_consumer_internal(
152            &self.consumer,
153            false,
154            self.dropped.clone(),
155            self.pushed.clone(),
156            self.discard.clone(),
157            self.wait_for_keyframe.clone(),
158        )?;
159        self.producer = Some(new_producer.clone());
160        Ok(())
161    }
162
163    /// Disconnect the consumer from the producer
164    pub fn disconnect(&mut self) {
165        if let Some(producer) = self.producer.take() {
166            producer.remove_consumer(&self.consumer);
167        }
168    }
169
170    /// number of dropped buffers because the consumer internal queue was full
171    pub fn dropped(&self) -> u64 {
172        self.dropped.load(atomic::Ordering::SeqCst)
173    }
174
175    /// number of buffers pushed through this link
176    pub fn pushed(&self) -> u64 {
177        self.pushed.load(atomic::Ordering::SeqCst)
178    }
179
180    /// if buffers are currently pushed through this link
181    pub fn discard(&self) -> bool {
182        self.discard.load(atomic::Ordering::SeqCst)
183    }
184
185    /// If set to `true` then no buffers will be pushed through this link
186    pub fn set_discard(&self, discard: bool) {
187        self.discard.store(discard, atomic::Ordering::SeqCst)
188    }
189
190    /// if the link will drop frames until the next keyframe on discont
191    pub fn wait_for_keyframe(&self) -> bool {
192        self.wait_for_keyframe.load(atomic::Ordering::SeqCst)
193    }
194
195    /// If set to `true` then the link will drop delta-frames until the next
196    /// keyframe on discont (default behavior).
197    pub fn set_wait_for_keyframe(&self, wait: bool) {
198        self.wait_for_keyframe.store(wait, atomic::Ordering::SeqCst)
199    }
200
201    /// Get the GStreamer `appsrc` wrapped by this link
202    pub fn appsrc(&self) -> &gst_app::AppSrc {
203        &self.consumer
204    }
205
206    /// Get the `StreamProducer` currently by this link, if any.
207    pub fn stream_producer(&self) -> Option<&StreamProducer> {
208        self.producer.as_ref()
209    }
210}
211
212impl Drop for ConsumptionLink {
213    fn drop(&mut self) {
214        self.disconnect();
215    }
216}
217
218#[derive(Debug, Error)]
219/// Error type returned when adding consumers to producers.
220pub enum AddConsumerError {
221    #[error("Consumer already added")]
222    /// Consumer has already been added to this producer.
223    AlreadyAdded,
224}
225
226impl StreamProducer {
227    /// Configure a consumer `appsrc` for later use in a `StreamProducer`
228    ///
229    /// This is automatically called when calling `add_consumer()`.
230    pub fn configure_consumer(consumer: &gst_app::AppSrc) {
231        // Latency on the appsrc is set by the publisher before the first buffer
232        // and whenever it changes
233        consumer.set_latency(gst::ClockTime::ZERO, gst::ClockTime::NONE);
234        consumer.set_format(gst::Format::Time);
235        consumer.set_is_live(true);
236        consumer.set_handle_segment_change(true);
237        consumer.set_max_buffers(0);
238        consumer.set_max_bytes(0);
239        consumer.set_max_time(500 * gst::ClockTime::MSECOND);
240        consumer.set_leaky_type(gst_app::AppLeakyType::Downstream);
241        consumer.set_automatic_eos(false);
242    }
243
244    /// Add an appsrc to dispatch data to.
245    ///
246    /// Dropping the returned `ConsumptionLink` will automatically disconnect the consumer from the producer.
247    pub fn add_consumer(
248        &self,
249        consumer: &gst_app::AppSrc,
250    ) -> Result<ConsumptionLink, AddConsumerError> {
251        let dropped = Arc::new(WrappedAtomicU64::new(0));
252        let pushed = Arc::new(WrappedAtomicU64::new(0));
253        let discard = Arc::new(atomic::AtomicBool::new(false));
254        let wait_for_keyframe = Arc::new(atomic::AtomicBool::new(true));
255
256        self.add_consumer_internal(
257            consumer,
258            true,
259            dropped.clone(),
260            pushed.clone(),
261            discard.clone(),
262            wait_for_keyframe.clone(),
263        )?;
264
265        Ok(ConsumptionLink {
266            consumer: consumer.clone(),
267            producer: Some(self.clone()),
268            dropped,
269            pushed,
270            discard,
271            wait_for_keyframe,
272        })
273    }
274
275    fn add_consumer_internal(
276        &self,
277        consumer: &gst_app::AppSrc,
278        configure_consumer: bool,
279        dropped: Arc<WrappedAtomicU64>,
280        pushed: Arc<WrappedAtomicU64>,
281        discard: Arc<atomic::AtomicBool>,
282        wait_for_keyframe: Arc<atomic::AtomicBool>,
283    ) -> Result<(), AddConsumerError> {
284        let mut consumers = self.0.consumers.lock().unwrap();
285        if consumers.consumers.contains_key(consumer) {
286            gst::error!(
287                CAT,
288                obj = &self.0.appsink,
289                "Consumer {} ({:?}) already added",
290                consumer.name(),
291                consumer
292            );
293            return Err(AddConsumerError::AlreadyAdded);
294        }
295
296        gst::debug!(
297            CAT,
298            obj = &self.0.appsink,
299            "Adding consumer {} ({:?})",
300            consumer.name(),
301            consumer
302        );
303
304        if configure_consumer {
305            Self::configure_consumer(consumer);
306        }
307
308        // Forward force-keyunit events upstream to the appsink
309        let srcpad = consumer.static_pad("src").unwrap();
310        let fku_probe_id = srcpad
311            .add_probe(
312                gst::PadProbeType::EVENT_UPSTREAM,
313                glib::clone!(
314                    #[weak(rename_to = appsink)]
315                    self.0.appsink,
316                    #[upgrade_or_panic]
317                    move |_pad, info| {
318                        let Some(event) = info.event() else {
319                            return gst::PadProbeReturn::Ok;
320                        };
321
322                        if gst_video::UpstreamForceKeyUnitEvent::parse(event).is_ok() {
323                            gst::debug!(CAT, obj = &appsink, "Requesting keyframe");
324                            // Do not use `gst_element_send_event()` as it takes the state lock which may lead to dead locks.
325                            let pad = appsink.static_pad("sink").unwrap();
326                            let _ = pad.push_event(event.clone());
327                        }
328
329                        gst::PadProbeReturn::Ok
330                    }
331                ),
332            )
333            .unwrap();
334
335        let stream_consumer = StreamConsumer::new(
336            consumer,
337            fku_probe_id,
338            dropped,
339            pushed,
340            discard,
341            wait_for_keyframe,
342        );
343
344        consumers
345            .consumers
346            .insert(consumer.clone(), stream_consumer);
347
348        // forward selected sticky events. We can send those now as appsrc will delay the events
349        // until stream-start, caps and segment are sent.
350        let events_to_forward = consumers.events_to_forward.clone();
351        // drop the lock before sending events
352        drop(consumers);
353
354        let appsink_pad = self.0.appsink.static_pad("sink").unwrap();
355        appsink_pad.sticky_events_foreach(|event| {
356            if events_to_forward.contains(&event.type_()) {
357                gst::debug!(
358                    CAT,
359                    obj = &self.0.appsink,
360                    "forward sticky event {:?}",
361                    event
362                );
363                consumer.send_event(event.clone());
364            }
365
366            std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
367        });
368
369        Ok(())
370    }
371
372    fn process_sample(
373        sample: gst::Sample,
374        appsink: &gst_app::AppSink,
375        mut consumers: MutexGuard<StreamConsumers>,
376    ) -> Result<gst::FlowSuccess, gst::FlowError> {
377        let (is_discont, is_keyframe) = if let Some(buf) = sample.buffer() {
378            let flags = buf.flags();
379
380            (
381                flags.contains(gst::BufferFlags::DISCONT),
382                !flags.contains(gst::BufferFlags::DELTA_UNIT),
383            )
384        } else {
385            (false, true)
386        };
387
388        gst::trace!(
389            CAT,
390            obj = appsink,
391            "processing sample {:?}",
392            sample.buffer()
393        );
394
395        let latency = consumers.current_latency;
396        let latency_updated = mem::replace(&mut consumers.latency_updated, false);
397
398        let mut needs_keyframe_request = false;
399
400        let current_consumers = consumers
401            .consumers
402            .values()
403            .filter_map(|consumer| {
404                if let Some(latency) = latency {
405                    if consumer
406                        .forwarded_latency
407                        .compare_exchange(
408                            false,
409                            true,
410                            atomic::Ordering::SeqCst,
411                            atomic::Ordering::SeqCst,
412                        )
413                        .is_ok()
414                        || latency_updated
415                    {
416                        gst::info!(CAT, obj = appsink, "setting new latency: {latency}");
417                        consumer.appsrc.set_latency(latency, gst::ClockTime::NONE);
418                    }
419                }
420
421                if consumer.discard.load(atomic::Ordering::SeqCst) {
422                    consumer
423                        .needs_keyframe
424                        .store(true, atomic::Ordering::SeqCst);
425                    return None;
426                }
427
428                if is_discont
429                    && !is_keyframe
430                    && consumer.wait_for_keyframe.load(atomic::Ordering::SeqCst)
431                {
432                    // Whenever we have a discontinuity, we need a new keyframe
433                    consumer
434                        .needs_keyframe
435                        .store(true, atomic::Ordering::SeqCst);
436                }
437
438                if !is_keyframe && consumer.needs_keyframe.load(atomic::Ordering::SeqCst) {
439                    // If we need a keyframe (and this one isn't) request a keyframe upstream
440                    if !needs_keyframe_request {
441                        gst::debug!(CAT, obj = appsink, "Requesting keyframe for first buffer");
442                        needs_keyframe_request = true;
443                    }
444
445                    consumer.dropped.fetch_add(1, atomic::Ordering::SeqCst);
446
447                    gst::error!(
448                        CAT,
449                        obj = appsink,
450                        "Ignoring frame for {} while waiting for a keyframe",
451                        consumer.appsrc.name()
452                    );
453                    None
454                } else {
455                    consumer
456                        .needs_keyframe
457                        .store(false, atomic::Ordering::SeqCst);
458                    consumer.pushed.fetch_add(1, atomic::Ordering::SeqCst);
459
460                    Some(consumer.appsrc.clone())
461                }
462            })
463            .collect::<Vec<_>>();
464
465        drop(consumers);
466
467        if needs_keyframe_request {
468            // Do not use `gst_element_send_event()` as it takes the state lock which may lead to dead locks.
469            let pad = appsink.static_pad("sink").unwrap();
470            pad.push_event(
471                gst_video::UpstreamForceKeyUnitEvent::builder()
472                    .all_headers(true)
473                    .build(),
474            );
475        }
476
477        for consumer in current_consumers {
478            if let Err(err) = consumer.push_sample(&sample) {
479                gst::warning!(CAT, obj = appsink, "Failed to push sample: {}", err);
480            }
481        }
482        Ok(gst::FlowSuccess::Ok)
483    }
484
485    /// Remove a consumer appsrc by id
486    pub fn remove_consumer(&self, consumer: &gst_app::AppSrc) {
487        let name = consumer.name();
488        if self
489            .0
490            .consumers
491            .lock()
492            .unwrap()
493            .consumers
494            .remove(consumer)
495            .is_some()
496        {
497            gst::debug!(
498                CAT,
499                obj = &self.0.appsink,
500                "Removed consumer {} ({:?})",
501                name,
502                consumer
503            );
504            consumer.set_callbacks(gst_app::AppSrcCallbacks::builder().build());
505        } else {
506            gst::debug!(
507                CAT,
508                obj = &self.0.appsink,
509                "Consumer {} ({:?}) not found",
510                name,
511                consumer
512            );
513        }
514    }
515
516    /// configure event types the appsink should forward to all its consumers (default: `Eos`).
517    pub fn set_forward_events(&self, events_to_forward: impl IntoIterator<Item = gst::EventType>) {
518        self.0.consumers.lock().unwrap().events_to_forward =
519            events_to_forward.into_iter().collect();
520    }
521
522    /// get event types the appsink should forward to all its consumers
523    pub fn get_forwarded_events(&self) -> Vec<gst::EventType> {
524        self.0.consumers.lock().unwrap().events_to_forward.clone()
525    }
526
527    /// configure whether the preroll sample should be forwarded (default: `true`)
528    pub fn set_forward_preroll(&self, forward_preroll: bool) {
529        self.0.consumers.lock().unwrap().forward_preroll = forward_preroll;
530    }
531
532    /// Get the GStreamer `appsink` wrapped by this producer
533    pub fn appsink(&self) -> &gst_app::AppSink {
534        &self.0.appsink
535    }
536
537    /// Signals an error on all consumers
538    pub fn error(&self, error: &gst::glib::Error, debug: Option<&str>) {
539        let consumers = self.0.consumers.lock().unwrap();
540
541        for consumer in consumers.consumers.keys() {
542            let mut msg_builder =
543                gst::message::Error::builder_from_error(error.clone()).src(consumer);
544            if let Some(debug) = debug {
545                msg_builder = msg_builder.debug(debug);
546            }
547
548            let _ = consumer.post_message(msg_builder.build());
549        }
550    }
551
552    /// The last sample produced by this producer.
553    pub fn last_sample(&self) -> Option<gst::Sample> {
554        self.0.appsink.property("last-sample")
555    }
556}
557
558impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
559    fn from(appsink: &'a gst_app::AppSink) -> Self {
560        let consumers = Arc::new(Mutex::new(StreamConsumers {
561            current_latency: None,
562            latency_updated: false,
563            consumers: HashMap::new(),
564            // it would make sense to automatically forward more events such as Tag but that would break
565            // with older GStreamer, see https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4297
566            events_to_forward: vec![gst::EventType::Eos, gst::EventType::Gap],
567            forward_preroll: true,
568            just_forwarded_preroll: false,
569        }));
570
571        appsink.set_callbacks(
572            gst_app::AppSinkCallbacks::builder()
573                .new_sample(glib::clone!(
574                    #[strong]
575                    consumers,
576                    move |appsink| {
577                        let mut consumers = consumers.lock().unwrap();
578
579                        let sample = match appsink.pull_sample() {
580                            Ok(sample) => sample,
581                            Err(_err) => {
582                                gst::debug!(CAT, obj = appsink, "Failed to pull sample");
583                                return Err(gst::FlowError::Flushing);
584                            }
585                        };
586
587                        let just_forwarded_preroll =
588                            mem::replace(&mut consumers.just_forwarded_preroll, false);
589
590                        if just_forwarded_preroll {
591                            return Ok(gst::FlowSuccess::Ok);
592                        }
593
594                        StreamProducer::process_sample(sample, appsink, consumers)
595                    }
596                ))
597                .new_preroll(glib::clone!(
598                    #[strong]
599                    consumers,
600                    move |appsink| {
601                        let mut consumers = consumers.lock().unwrap();
602
603                        let sample = match appsink.pull_preroll() {
604                            Ok(sample) => sample,
605                            Err(_err) => {
606                                gst::debug!(CAT, obj = appsink, "Failed to pull preroll");
607                                return Err(gst::FlowError::Flushing);
608                            }
609                        };
610
611                        if consumers.forward_preroll {
612                            consumers.just_forwarded_preroll = true;
613
614                            StreamProducer::process_sample(sample, appsink, consumers)
615                        } else {
616                            Ok(gst::FlowSuccess::Ok)
617                        }
618                    }
619                ))
620                .new_event(glib::clone!(
621                    #[strong]
622                    consumers,
623                    move |appsink| {
624                        match appsink
625                            .pull_object()
626                            .map(|obj| obj.downcast::<gst::Event>())
627                        {
628                            Ok(Ok(event)) => {
629                                let (events_to_forward, appsrcs) = {
630                                    // clone so we don't keep the lock while pushing events
631                                    let consumers = consumers.lock().unwrap();
632                                    let events = consumers.events_to_forward.clone();
633                                    let appsrcs =
634                                        consumers.consumers.keys().cloned().collect::<Vec<_>>();
635
636                                    (events, appsrcs)
637                                };
638
639                                if events_to_forward.contains(&event.type_()) {
640                                    for appsrc in appsrcs {
641                                        appsrc.send_event(event.clone());
642                                    }
643                                }
644                            }
645                            Ok(Err(_)) => {} // pulled another unsupported object type, ignore
646                            Err(_err) => gst::warning!(CAT, obj = appsink, "Failed to pull event"),
647                        }
648
649                        false
650                    }
651                ))
652                .eos(glib::clone!(
653                    #[strong]
654                    consumers,
655                    move |appsink| {
656                        let stream_consumers = consumers.lock().unwrap();
657
658                        if stream_consumers
659                            .events_to_forward
660                            .contains(&gst::EventType::Eos)
661                        {
662                            let current_consumers = stream_consumers
663                                .consumers
664                                .values()
665                                .map(|c| c.appsrc.clone())
666                                .collect::<Vec<_>>();
667                            drop(stream_consumers);
668
669                            for consumer in current_consumers {
670                                gst::debug!(
671                                    CAT,
672                                    obj = appsink,
673                                    "set EOS on consumer {}",
674                                    consumer.name()
675                                );
676                                let _ = consumer.end_of_stream();
677                            }
678                        } else {
679                            gst::debug!(CAT, obj = appsink, "don't forward EOS to consumers");
680                        }
681                    }
682                ))
683                .build(),
684        );
685
686        let sinkpad = appsink.static_pad("sink").unwrap();
687        let appsink_probe_id = sinkpad
688            .add_probe(
689                gst::PadProbeType::EVENT_UPSTREAM,
690                glib::clone!(
691                    #[strong]
692                    consumers,
693                    move |_pad, info| {
694                        let Some(event) = info.event() else {
695                            return gst::PadProbeReturn::Ok;
696                        };
697
698                        let gst::EventView::Latency(event) = event.view() else {
699                            return gst::PadProbeReturn::Ok;
700                        };
701
702                        let latency = event.latency();
703                        let mut consumers = consumers.lock().unwrap();
704                        consumers.current_latency = Some(latency);
705                        consumers.latency_updated = true;
706
707                        gst::PadProbeReturn::Ok
708                    }
709                ),
710            )
711            .unwrap();
712
713        StreamProducer(Arc::new(StreamProducerInner {
714            appsink: appsink.clone(),
715            appsink_probe_id: Some(appsink_probe_id),
716            consumers,
717        }))
718    }
719}
720
721/// Wrapper around a HashMap of consumers, exists for thread safety
722/// and also protects some of the producer state
723#[derive(Debug)]
724struct StreamConsumers {
725    /// The currently-observed latency
726    current_latency: Option<gst::ClockTime>,
727    /// Whether the consumers' appsrc latency needs updating
728    latency_updated: bool,
729    /// The consumers, AppSrc pointer value -> consumer
730    consumers: HashMap<gst_app::AppSrc, StreamConsumer>,
731    /// What events should be forwarded to consumers
732    events_to_forward: Vec<gst::EventType>,
733    /// Whether the preroll sample should be forwarded at all
734    forward_preroll: bool,
735    /// Whether we just forwarded the preroll sample. When we did we want to
736    /// discard the next sample from on_new_sample as it would cause us to
737    /// otherwise push out the same sample twice to consumers.
738    just_forwarded_preroll: bool,
739}
740
741/// Wrapper around a consumer's `appsrc`
742#[derive(Debug)]
743struct StreamConsumer {
744    /// The GStreamer `appsrc` of the consumer
745    appsrc: gst_app::AppSrc,
746    /// The id of a pad probe that intercepts force-key-unit events
747    fku_probe_id: Option<gst::PadProbeId>,
748    /// Whether an initial latency was forwarded to the `appsrc`
749    forwarded_latency: atomic::AtomicBool,
750    /// Whether a first buffer has made it through, used to determine
751    /// whether a new key unit should be requested. Only useful for encoded
752    /// streams.
753    needs_keyframe: Arc<atomic::AtomicBool>,
754    /// number of buffers dropped because `appsrc` internal queue was full
755    dropped: Arc<WrappedAtomicU64>,
756    /// number of buffers pushed through `appsrc`
757    pushed: Arc<WrappedAtomicU64>,
758    /// if buffers should not be pushed to the `appsrc` right now
759    discard: Arc<atomic::AtomicBool>,
760    /// whether the consumer should drop delta frames until next keyframe on discont
761    wait_for_keyframe: Arc<atomic::AtomicBool>,
762}
763
764impl StreamConsumer {
765    /// Create a new consumer
766    fn new(
767        appsrc: &gst_app::AppSrc,
768        fku_probe_id: gst::PadProbeId,
769        dropped: Arc<WrappedAtomicU64>,
770        pushed: Arc<WrappedAtomicU64>,
771        discard: Arc<atomic::AtomicBool>,
772        wait_for_keyframe: Arc<atomic::AtomicBool>,
773    ) -> Self {
774        let needs_keyframe = Arc::new(atomic::AtomicBool::new(
775            wait_for_keyframe.load(atomic::Ordering::SeqCst),
776        ));
777        let needs_keyframe_clone = needs_keyframe.clone();
778        let wait_for_keyframe_clone = wait_for_keyframe.clone();
779        let dropped_clone = dropped.clone();
780
781        appsrc.set_callbacks(
782            gst_app::AppSrcCallbacks::builder()
783                .enough_data(move |appsrc| {
784                    gst::debug!(
785                        CAT,
786                        obj = appsrc,
787                        "consumer {} ({:?}) is not consuming fast enough, old samples are getting dropped",
788                        appsrc.name(),
789                        appsrc,
790                    );
791
792                    needs_keyframe_clone.store(wait_for_keyframe_clone.load(atomic::Ordering::SeqCst), atomic::Ordering::SeqCst);
793                    dropped_clone.fetch_add(1, atomic::Ordering::SeqCst);
794                })
795                .build(),
796        );
797
798        StreamConsumer {
799            appsrc: appsrc.clone(),
800            fku_probe_id: Some(fku_probe_id),
801            forwarded_latency: atomic::AtomicBool::new(false),
802            needs_keyframe,
803            dropped,
804            pushed,
805            discard,
806            wait_for_keyframe,
807        }
808    }
809}
810
811impl Drop for StreamConsumer {
812    fn drop(&mut self) {
813        if let Some(fku_probe_id) = self.fku_probe_id.take() {
814            let srcpad = self.appsrc.static_pad("src").unwrap();
815            srcpad.remove_probe(fku_probe_id);
816        }
817    }
818}
819
820impl PartialEq for StreamConsumer {
821    fn eq(&self, other: &Self) -> bool {
822        self.appsrc.eq(&other.appsrc)
823    }
824}
825
826impl Eq for StreamConsumer {}
827
828impl std::hash::Hash for StreamConsumer {
829    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
830        std::hash::Hash::hash(&self.appsrc, state);
831    }
832}
833
834impl std::borrow::Borrow<gst_app::AppSrc> for StreamConsumer {
835    #[inline]
836    fn borrow(&self) -> &gst_app::AppSrc {
837        &self.appsrc
838    }
839}
840
841#[cfg(test)]
842mod tests {
843    use std::{
844        str::FromStr,
845        sync::{Arc, Mutex},
846    };
847
848    use futures::{
849        channel::{mpsc, mpsc::Receiver},
850        SinkExt, StreamExt,
851    };
852    use gst::prelude::*;
853
854    use crate::{ConsumptionLink, StreamProducer};
855
856    fn create_producer() -> (
857        gst::Pipeline,
858        gst_app::AppSrc,
859        gst_app::AppSink,
860        StreamProducer,
861    ) {
862        let producer_pipe =
863            gst::parse::launch("appsrc name=producer_src ! appsink name=producer_sink")
864                .unwrap()
865                .downcast::<gst::Pipeline>()
866                .unwrap();
867        let producer_sink = producer_pipe
868            .by_name("producer_sink")
869            .unwrap()
870            .downcast::<gst_app::AppSink>()
871            .unwrap();
872
873        (
874            producer_pipe.clone(),
875            producer_pipe
876                .by_name("producer_src")
877                .unwrap()
878                .downcast::<gst_app::AppSrc>()
879                .unwrap(),
880            producer_sink.clone(),
881            StreamProducer::from(&producer_sink),
882        )
883    }
884
885    struct Consumer {
886        pipeline: gst::Pipeline,
887        src: gst_app::AppSrc,
888        sink: gst_app::AppSink,
889        receiver: Mutex<Receiver<gst::Sample>>,
890        connected: Mutex<bool>,
891    }
892
893    impl Consumer {
894        fn new(id: &str) -> Self {
895            let pipeline = gst::parse::launch(&format!("appsrc name={id} ! appsink name=sink"))
896                .unwrap()
897                .downcast::<gst::Pipeline>()
898                .unwrap();
899
900            let (sender, receiver) = mpsc::channel::<gst::Sample>(1000);
901            let sender = Arc::new(Mutex::new(sender));
902            let sink = pipeline
903                .by_name("sink")
904                .unwrap()
905                .downcast::<gst_app::AppSink>()
906                .unwrap();
907
908            sink.set_callbacks(
909                gst_app::AppSinkCallbacks::builder()
910                    // Add a handler to the "new-sample" signal.
911                    .new_sample(move |appsink| {
912                        // Pull the sample in question out of the appsink's buffer.
913                        let sender_clone = sender.clone();
914                        futures::executor::block_on(
915                            sender_clone
916                                .lock()
917                                .unwrap()
918                                .send(appsink.pull_sample().unwrap()),
919                        )
920                        .unwrap();
921
922                        Ok(gst::FlowSuccess::Ok)
923                    })
924                    .build(),
925            );
926
927            Self {
928                pipeline: pipeline.clone(),
929                src: pipeline
930                    .by_name(id)
931                    .unwrap()
932                    .downcast::<gst_app::AppSrc>()
933                    .unwrap(),
934                sink,
935                receiver: Mutex::new(receiver),
936                connected: Mutex::new(false),
937            }
938        }
939
940        fn connect(&self, producer: &StreamProducer) -> ConsumptionLink {
941            {
942                let mut connected = self.connected.lock().unwrap();
943                *connected = true;
944            }
945
946            producer.add_consumer(&self.src).unwrap()
947        }
948
949        fn disconnect(&self, producer: &StreamProducer) {
950            {
951                let mut connected = self.connected.lock().unwrap();
952                *connected = false;
953            }
954
955            producer.remove_consumer(&self.src);
956        }
957    }
958
959    #[test]
960    fn simple() {
961        gst::init().unwrap();
962
963        let (producer_pipe, producer_src, _producer_sink, producer) = create_producer();
964        producer_pipe
965            .set_state(gst::State::Playing)
966            .expect("Couldn't set producer pipeline state");
967
968        let mut consumers: Vec<Consumer> = Vec::new();
969        let consumer = Consumer::new("consumer1");
970        let link1 = consumer.connect(&producer);
971        consumer
972            .pipeline
973            .set_state(gst::State::Playing)
974            .expect("Couldn't set producer pipeline state");
975        consumers.push(consumer);
976
977        let consumer = Consumer::new("consumer2");
978        let link2 = consumer.connect(&producer);
979        consumer
980            .pipeline
981            .set_state(gst::State::Playing)
982            .expect("Couldn't set producer pipeline state");
983        consumers.push(consumer);
984
985        assert!(producer.last_sample().is_none());
986
987        for i in 0..10 {
988            let caps = gst::Caps::from_str(&format!("test,n={i}")).unwrap();
989            producer_src.set_caps(Some(&caps));
990            producer_src.push_buffer(gst::Buffer::new()).unwrap();
991
992            for consumer in &consumers {
993                if *consumer.connected.lock().unwrap() {
994                    let sample =
995                        futures::executor::block_on(consumer.receiver.lock().unwrap().next())
996                            .expect("Received an empty buffer?");
997                    sample.buffer().expect("No buffer on the sample?");
998                    assert_eq!(sample.caps(), Some(caps.as_ref()));
999                } else {
1000                    debug_assert!(
1001                        consumer
1002                            .sink
1003                            .try_pull_sample(gst::ClockTime::from_nseconds(0))
1004                            .is_none(),
1005                        "Disconnected consumer got a new sample?!"
1006                    );
1007                }
1008            }
1009
1010            if i == 5 {
1011                consumers.first().unwrap().disconnect(&producer);
1012            }
1013        }
1014
1015        assert!(producer.last_sample().is_some());
1016
1017        assert_eq!(link1.pushed(), 6);
1018        assert_eq!(link1.dropped(), 0);
1019        assert_eq!(link2.pushed(), 10);
1020        assert_eq!(link2.dropped(), 0);
1021    }
1022}