ts-intersrc

Thread-sharing source for inter-pipeline communication.

ts-intersrc is an element that proxies downstream events, non-serialized queries and buffers (including metas) from another pipeline that contains a matching ts-intersink element. The purpose is to allow one-to-many decoupled pipelines to function as though they were a single pipeline, without having to manually shuttle buffers, events, queries, etc. between them.

Dynamically changing the matching ts-intersink is not supported for now.

The ts-intersink and ts-intersrc elements take advantage of the threadshare runtime, reducing the number of threads and context switches that would be necessary with other forms of inter-pipeline elements.

Usage

 use futures::prelude::*;
 use gst::prelude::*;

 let g_ctx = gst::glib::MainContext::default();
 gst::init().unwrap();

 // An upstream pipeline producing a 1 second Opus encoded audio stream
 let pipe_up = gst::parse::launch(
     "
         audiotestsrc is-live=true num-buffers=50 volume=0.02
         ! opusenc
         ! ts-intersink inter-context=my-inter-ctx
     ",
 )
 .unwrap()
 .downcast::<gst::Pipeline>()
 .unwrap();

 // A downstream pipeline which will receive the Opus encoded audio stream
 // and render it locally.
 let pipe_down = gst::parse::launch(
     "
         ts-intersrc inter-context=my-inter-ctx context=ts-group-01 context-wait=20
         ! opusdec
         ! audioconvert
         ! audioresample
         ! ts-queue context=ts-group-01 context-wait=20 max-size-buffers=1 max-size-bytes=0 max-size-time=0
         ! autoaudiosink
     ",
 )
 .unwrap()
 .downcast::<gst::Pipeline>()
 .unwrap();

 // Both pipelines must agree on the timing information or we'll get glitches
 // or overruns/underruns. Ideally, we should tell pipe_up to use the same clock
 // as pipe_down, but since that will be set asynchronously to the audio clock, it
 // is simpler and likely accurate enough to use the system clock for both
 // pipelines. If no element in either pipeline will provide a clock, this
 // is not needed.
 let clock = gst::SystemClock::obtain();
 pipe_up.set_clock(Some(&clock)).unwrap();
 pipe_down.set_clock(Some(&clock)).unwrap();

 // This is not really needed in this case since the pipelines are created and
 // started at the same time. However, an application that dynamically
 // generates pipelines must ensure that all the pipelines that will be
 // connected together share the same base time.
 pipe_up.set_base_time(gst::ClockTime::ZERO);
 pipe_up.set_start_time(gst::ClockTime::NONE);
 pipe_down.set_base_time(gst::ClockTime::ZERO);
 pipe_down.set_start_time(gst::ClockTime::NONE);

 pipe_up.set_state(gst::State::Playing).unwrap();
 pipe_down.set_state(gst::State::Playing).unwrap();

 g_ctx.block_on(async {
     use gst::MessageView::*;

     let mut bus_up_stream = pipe_up.bus().unwrap().stream();
     let mut bus_down_stream = pipe_down.bus().unwrap().stream();

     loop {
         futures::select! {
             msg = bus_up_stream.next() => {
                 let Some(msg) = msg else { continue };
                 match msg.view() {
                     Latency(_) => {
                         let _ = pipe_down.recalculate_latency();
                     }
                     Error(err) => {
                         eprintln!("Error with upstream pipeline {err:?}");
                         break;
                     }
                     _ => (),
                 }
             }
             msg = bus_down_stream.next() => {
                 let Some(msg) = msg else { continue };
                 match msg.view() {
                     Latency(_) => {
                         let _ = pipe_down.recalculate_latency();
                     }
                     Eos(_) => {
                         println!("Got EoS");
                         break;
                     }
                     Error(err) => {
                         eprintln!("Error with downstream pipeline {err:?}");
                         break;
                     }
                     _ => (),
                 }
             }
         };
     }
 });

 pipe_up.set_state(gst::State::Null).unwrap();
 pipe_down.set_state(gst::State::Null).unwrap();

Hierarchy

GObject
    ╰──GInitiallyUnowned
        ╰──GstObject
            ╰──GstElement
                ╰──ts-intersrc

Factory details

Authors: – François Laignel

Classification: – Source/Generic

Rank – none

Plugin – threadshare

Package – gst-plugin-threadshare

Pad Templates

src

ANY

Presence – always

Direction – src

Object type – GstPad


Properties

context

“context” gchararray

Context name to share threads with

Flags : Read / Write / Construct Only


context-wait

“context-wait” guint

Throttle poll loop to run at most once every this many ms

Flags : Read / Write / Construct Only

Default value : 0
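
As a rough way to reason about context-wait, the arithmetic below derives an upper bound on poll-loop iterations per second from the property description. It is a sketch with hypothetical helper names, not the threadshare runtime's logic, and it assumes that a value of 0 means the loop is not throttled.

```rust
/// Upper bound on poll-loop iterations per second for a given `context-wait`
/// value in milliseconds. Returns None for 0, which this sketch ASSUMES
/// means "no throttling".
fn max_iterations_per_second(context_wait_ms: u32) -> Option<u32> {
    if context_wait_ms == 0 {
        None
    } else {
        Some(1000 / context_wait_ms)
    }
}

/// Approximate number of buffers that accumulate between two iterations when
/// one buffer arrives every `buffer_duration_ms` (rounded up, at least 1).
fn buffers_per_iteration(context_wait_ms: u32, buffer_duration_ms: u32) -> u32 {
    assert!(buffer_duration_ms > 0, "buffer duration must be non-zero");
    let rounded_up = (context_wait_ms + buffer_duration_ms - 1) / buffer_duration_ms;
    rounded_up.max(1)
}
```

With context-wait=20, as in the usage example, and 20 ms Opus frames (the opusenc default frame size), this gives at most 50 iterations per second and roughly one buffer per iteration. Larger values trade fewer wake-ups for more buffers handled per iteration.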


inter-context

“inter-context” gchararray

Context name of the inter elements to share with

Flags : Read / Write / Construct Only


max-size-buffers

“max-size-buffers” guint

Maximum number of buffers to queue (0=unlimited)

Flags : Read / Write / Construct Only

Default value : 200


max-size-bytes

“max-size-bytes” guint

Maximum number of bytes to queue (0=unlimited)

Flags : Read / Write / Construct Only

Default value : 1048576


max-size-time

“max-size-time” guint64

Maximum number of nanoseconds to queue (0=unlimited)

Flags : Read / Write / Construct Only

Default value : 1000000000
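
The three max-size-* properties share the "0=unlimited" convention. The sketch below models one plausible reading of how they combine: the queue counts as full as soon as any non-zero limit is reached, which is how GStreamer's queue element behaves. The types and names are hypothetical and this is not the element's actual code.

```rust
/// Hypothetical mirror of the three size-limit properties.
#[derive(Debug, Clone, Copy)]
struct QueueLimits {
    max_size_buffers: u32,
    max_size_bytes: u32,
    max_size_time_ns: u64,
}

impl Default for QueueLimits {
    /// Default values as listed in the property descriptions above.
    fn default() -> Self {
        QueueLimits {
            max_size_buffers: 200,
            max_size_bytes: 1_048_576,
            max_size_time_ns: 1_000_000_000,
        }
    }
}

/// Hypothetical snapshot of what is currently queued.
#[derive(Debug, Clone, Copy, Default)]
struct QueueLevel {
    buffers: u32,
    bytes: u32,
    time_ns: u64,
}

/// A limit of 0 is never reached (0 = unlimited).
fn limit_reached(limit: u64, level: u64) -> bool {
    limit != 0 && level >= limit
}

impl QueueLimits {
    /// ASSUMPTION: the queue is full when ANY enabled limit is reached.
    fn is_full(&self, level: &QueueLevel) -> bool {
        limit_reached(u64::from(self.max_size_buffers), u64::from(level.buffers))
            || limit_reached(u64::from(self.max_size_bytes), u64::from(level.bytes))
            || limit_reached(self.max_size_time_ns, level.time_ns)
    }
}
```

Under this model, the ts-queue settings in the usage example (max-size-buffers=1 max-size-bytes=0 max-size-time=0) leave the buffer count as the only active limit.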

