ts-intersrc

Thread-sharing source for inter-pipeline communication.

ts-intersrc is an element that proxies events travelling downstream, non-serialized queries, and buffers (including metas) from another pipeline that contains a matching ts-intersink element. The purpose is to allow one-to-many decoupled pipelines to function as though they were one, without having to manually shuttle buffers, events, queries, etc.

The ts-intersink and ts-intersrc elements take advantage of the threadshare runtime, reducing the number of threads and context switches that would be necessary with other forms of inter-pipeline elements.
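
The snippet below is a minimal sketch of how the pairing works when the elements are built programmatically rather than parsed from a launch string: the two pipelines are tied together solely by giving ts-intersink and ts-intersrc the same inter-context name. The element and property names are taken from this page; the surrounding pipeline contents (audiotestsrc, fakesink) are only illustrative. A complete, runnable example follows in the Usage section.

 use gst::prelude::*;

 gst::init().unwrap();

 // Producer pipeline: ts-intersink publishes its stream under "my-inter-ctx".
 let pipe_up = gst::Pipeline::new();
 let src = gst::ElementFactory::make("audiotestsrc").build().unwrap();
 let intersink = gst::ElementFactory::make("ts-intersink")
     .property("inter-context", "my-inter-ctx")
     .build()
     .unwrap();
 pipe_up.add_many([&src, &intersink]).unwrap();
 gst::Element::link_many([&src, &intersink]).unwrap();

 // Consumer pipeline: ts-intersrc subscribes to the same inter-context.
 let pipe_down = gst::Pipeline::new();
 let intersrc = gst::ElementFactory::make("ts-intersrc")
     .property("inter-context", "my-inter-ctx")
     .build()
     .unwrap();
 let sink = gst::ElementFactory::make("fakesink").build().unwrap();
 pipe_down.add_many([&intersrc, &sink]).unwrap();
 gst::Element::link_many([&intersrc, &sink]).unwrap();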

Usage

 use futures::prelude::*;
 use gst::prelude::*;

 let g_ctx = gst::glib::MainContext::default();
 gst::init().unwrap();

 // An upstream pipeline producing a 1 second Opus encoded audio stream
 let pipe_up = gst::parse::launch(
     "
         audiotestsrc is-live=true num-buffers=50 volume=0.02
         ! opusenc
         ! ts-intersink inter-context=my-inter-ctx
     ",
 )
 .unwrap()
 .downcast::<gst::Pipeline>()
 .unwrap();

 // A downstream pipeline which will receive the Opus encoded audio stream
 // and render it locally.
 let pipe_down = gst::parse::launch(
     "
         ts-intersrc inter-context=my-inter-ctx context=ts-group-01 context-wait=20
         ! opusdec
         ! audioconvert
         ! audioresample
         ! ts-queue context=ts-group-01 context-wait=20 max-size-buffers=1 max-size-bytes=0 max-size-time=0
         ! autoaudiosink
     ",
 )
 .unwrap()
 .downcast::<gst::Pipeline>()
 .unwrap();

 // Both pipelines must agree on the timing information or we'll get glitches
 // or overruns/underruns. Ideally, we should tell pipe_up to use the same clock
 // as pipe_down, but since that will be set asynchronously to the audio clock, it
 // is simpler and likely accurate enough to use the system clock for both
 // pipelines. If no element in either pipeline will provide a clock, this
 // is not needed.
 let clock = gst::SystemClock::obtain();
 pipe_up.set_clock(Some(&clock)).unwrap();
 pipe_down.set_clock(Some(&clock)).unwrap();

 // This is not really needed in this case since the pipelines are created and
 // started at the same time. However, an application that dynamically
 // generates pipelines must ensure that all the pipelines that will be
 // connected together share the same base time.
 pipe_up.set_base_time(gst::ClockTime::ZERO);
 pipe_up.set_start_time(gst::ClockTime::NONE);
 pipe_down.set_base_time(gst::ClockTime::ZERO);
 pipe_down.set_start_time(gst::ClockTime::NONE);

 pipe_up.set_state(gst::State::Playing).unwrap();
 pipe_down.set_state(gst::State::Playing).unwrap();

 g_ctx.block_on(async {
     use gst::MessageView::*;

     let mut bus_up_stream = pipe_up.bus().unwrap().stream();
     let mut bus_down_stream = pipe_down.bus().unwrap().stream();

     loop {
         futures::select! {
             msg = bus_up_stream.next() => {
                 let Some(msg) = msg else { continue };
                 match msg.view() {
                     Latency(_) => {
                         let _ = pipe_down.recalculate_latency();
                     }
                     Error(err) => {
                         eprintln!("Error with downstream pipeline {err:?}");
                         break;
                     }
                     _ => (),
                 }
             }
             msg = bus_down_stream.next() => {
                 let Some(msg) = msg else { continue };
                 match msg.view() {
                     Latency(_) => {
                         let _ = pipe_down.recalculate_latency();
                     }
                     Eos(_) => {
                         println!("Got EoS");
                         break;
                     }
                     Error(err) => {
                         eprintln!("Error with downstream pipeline {err:?}");
                         break;
                     }
                     _ => (),
                 }
             }
         };
     }
 });

 pipe_up.set_state(gst::State::Null).unwrap();
 pipe_down.set_state(gst::State::Null).unwrap();

Hierarchy

GObject
    ╰──GInitiallyUnowned
        ╰──GstObject
            ╰──GstElement
                ╰──ts-intersrc

Factory details

Authors – François Laignel

Classification – Source/Generic

Rank – none

Plugin – threadshare

Package – gst-plugin-threadshare

Pad Templates

src

ANY

Presence : always

Direction : src

Object type : GstPad


Properties

context

“context” gchararray

Context name to share threads with

Flags : Read / Write / Construct Only


context-wait

“context-wait” guint

Throttle poll loop to run at most once every this many ms

Flags : Read / Write / Construct Only

Default value : 0
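
Because context and context-wait are construct-only, they have to be provided when the element is created. The sketch below reuses the names from the Usage example above, where ts-intersrc joins the same threadshare context as the downstream ts-queue; the context name is an arbitrary choice.

 let intersrc = gst::ElementFactory::make("ts-intersrc")
     .property("inter-context", "my-inter-ctx")
     .property("context", "ts-group-01") // same context as the ts-queue below
     .property("context-wait", 20u32)    // run the poll loop at most once every 20 ms
     .build()
     .unwrap();

 let queue = gst::ElementFactory::make("ts-queue")
     .property("context", "ts-group-01")
     .property("context-wait", 20u32)
     .build()
     .unwrap();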


current-level-buffers

“current-level-buffers” guint

Current number of buffers in the queue

Flags : Read

Default value : 0


current-level-bytes

“current-level-bytes” guint

Current amount of data in the queue (bytes)

Flags : Read

Default value : 0


current-level-time

“current-level-time” guint64

Current amount of data in the queue (in ns)

Flags : Read

Default value : 0
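
The current-level-* properties are read-only counters that can be polled at runtime, for instance to monitor how full the element's internal queue is. A short sketch, assuming intersrc is a ts-intersrc element created as above and that gst::prelude::* is in scope:

 let buffers = intersrc.property::<u32>("current-level-buffers");
 let bytes = intersrc.property::<u32>("current-level-bytes");
 let time = intersrc.property::<u64>("current-level-time");
 println!("queue level: {buffers} buffers, {bytes} bytes, {time} ns");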


inter-context

“inter-context” gchararray

Context name of the inter elements to share with

Flags : Read / Write


max-size-buffers

“max-size-buffers” guint

Maximum number of buffers to queue (0=unlimited)

Flags : Read / Write / Construct Only

Default value : 200


max-size-bytes

“max-size-bytes” guint

Maximum number of bytes to queue (0=unlimited)

Flags : Read / Write / Construct Only

Default value : 1048576


max-size-time

“max-size-time” guint64

Maximum number of nanoseconds to queue (0=unlimited)

Flags : Read / Write / Construct Only

Default value : 1000000000
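
Like context, the max-size-* limits are construct-only, so the queue limits are fixed when the element is built. A sketch mirroring the ts-queue settings used in the Usage example, limiting by buffer count only (0 means unlimited, per the descriptions above):

 let intersrc = gst::ElementFactory::make("ts-intersrc")
     .property("inter-context", "my-inter-ctx")
     .property("max-size-buffers", 1u32) // keep at most one buffer queued
     .property("max-size-bytes", 0u32)   // unlimited
     .property("max-size-time", 0u64)    // unlimited
     .build()
     .unwrap();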

