ts-intersrc
Thread-sharing source for inter-pipeline communication.
ts-intersrc is an element that proxies downstream events, non-serialized queries and buffers (including metas) from another pipeline that contains a matching ts-intersink element. The purpose is to allow one to many decoupled pipelines to function as though they were one, without having to manually shuttle buffers, events, queries, etc.
The ts-intersink and ts-intersrc elements take advantage of the threadshare runtime, reducing the number of threads and context switches that other forms of inter-pipeline elements would require.
Usage
use futures::prelude::*;
use gst::prelude::*;
let g_ctx = gst::glib::MainContext::default();
gst::init().unwrap();
// An upstream pipeline producing a 1 second Opus encoded audio stream
let pipe_up = gst::parse::launch(
    "
    audiotestsrc is-live=true num-buffers=50 volume=0.02
    ! opusenc
    ! ts-intersink inter-context=my-inter-ctx
    ",
)
.unwrap()
.downcast::<gst::Pipeline>()
.unwrap();
// A downstream pipeline which will receive the Opus encoded audio stream
// and render it locally.
let pipe_down = gst::parse::launch(
    "
    ts-intersrc inter-context=my-inter-ctx context=ts-group-01 context-wait=20
    ! opusdec
    ! audioconvert
    ! audioresample
    ! ts-queue context=ts-group-01 context-wait=20 max-size-buffers=1 max-size-bytes=0 max-size-time=0
    ! autoaudiosink
    ",
)
.unwrap()
.downcast::<gst::Pipeline>()
.unwrap();
// Both pipelines must agree on the timing information or we'll get glitches
// or overruns/underruns. Ideally, we should tell pipe_up to use the same clock
// as pipe_down, but since that will be set asynchronously to the audio clock, it
// is simpler and likely accurate enough to use the system clock for both
// pipelines. If no element in either pipeline will provide a clock, this
// is not needed.
let clock = gst::SystemClock::obtain();
pipe_up.set_clock(Some(&clock)).unwrap();
pipe_down.set_clock(Some(&clock)).unwrap();
// This is not really needed in this case since the pipelines are created and
// started at the same time. However, an application that dynamically
// generates pipelines must ensure that all the pipelines that will be
// connected together share the same base time.
pipe_up.set_base_time(gst::ClockTime::ZERO);
pipe_up.set_start_time(gst::ClockTime::NONE);
pipe_down.set_base_time(gst::ClockTime::ZERO);
pipe_down.set_start_time(gst::ClockTime::NONE);
pipe_up.set_state(gst::State::Playing).unwrap();
pipe_down.set_state(gst::State::Playing).unwrap();
g_ctx.block_on(async {
    use gst::MessageView::*;
    let mut bus_up_stream = pipe_up.bus().unwrap().stream();
    let mut bus_down_stream = pipe_down.bus().unwrap().stream();
    loop {
        futures::select! {
            msg = bus_up_stream.next() => {
                let Some(msg) = msg else { continue };
                match msg.view() {
                    Latency(_) => {
                        let _ = pipe_down.recalculate_latency();
                    }
                    Error(err) => {
                        eprintln!("Error with upstream pipeline {err:?}");
                        break;
                    }
                    _ => (),
                }
            }
            msg = bus_down_stream.next() => {
                let Some(msg) = msg else { continue };
                match msg.view() {
                    Latency(_) => {
                        let _ = pipe_down.recalculate_latency();
                    }
                    Eos(_) => {
                        println!("Got EoS");
                        break;
                    }
                    Error(err) => {
                        eprintln!("Error with downstream pipeline {err:?}");
                        break;
                    }
                    _ => (),
                }
            }
        };
    }
});
pipe_up.set_state(gst::State::Null).unwrap();
pipe_down.set_state(gst::State::Null).unwrap();
Hierarchy
GObject
    ╰──GInitiallyUnowned
        ╰──GstObject
            ╰──GstElement
                ╰──ts-intersrc
Factory details
Authors – François Laignel
Classification – Source/Generic
Rank – none
Plugin – threadshare
Package – gst-plugin-threadshare
Pad Templates
Properties
context
“context” gchararray
Context name to share threads with
Flags : Read / Write / Construct Only
context-wait
“context-wait” guint
Throttle poll loop to run at most once every this many ms
Flags : Read / Write / Construct Only
Default value : 0
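The effect of a given context-wait value can be estimated with simple arithmetic. The sketch below is illustrative only: the function names are hypothetical, and it assumes that a value of 0 (the default) means the poll loop is not throttled at all.

```rust
use std::time::Duration;

/// Hypothetical helper: upper bound on the average poll-loop rate (in Hz)
/// for a given `context-wait` value in milliseconds.
/// Returns `None` for 0, which is ASSUMED here to mean "no throttling".
fn max_poll_rate_hz(context_wait_ms: u32) -> Option<f64> {
    if context_wait_ms == 0 {
        None
    } else {
        Some(1000.0 / f64::from(context_wait_ms))
    }
}

/// Hypothetical helper: the throttling interval as a `Duration`.
fn throttle_interval(context_wait_ms: u32) -> Duration {
    Duration::from_millis(u64::from(context_wait_ms))
}
```

With context-wait=20, as in the Usage example, max_poll_rate_hz(20) gives an upper bound of 50 iterations per second and throttle_interval(20) is 20 ms.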
current-level-buffers
“current-level-buffers” guint
Current number of buffers in the queue
Flags : Read
Default value : 0
current-level-bytes
“current-level-bytes” guint
Current amount of data in the queue (bytes)
Flags : Read
Default value : 0
current-level-time
“current-level-time” guint64
Current amount of data in the queue (in ns)
Flags : Read
Default value : 0
inter-context
“inter-context” gchararray
Context name of the inter elements to share with
Flags : Read / Write
max-size-buffers
“max-size-buffers” guint
Maximum number of buffers to queue (0=unlimited)
Flags : Read / Write / Construct Only
Default value : 200
max-size-bytes
“max-size-bytes” guint
Maximum number of bytes to queue (0=unlimited)
Flags : Read / Write / Construct Only
Default value : 1048576
max-size-time
“max-size-time” guint64
Maximum number of nanoseconds to queue (0=unlimited)
Flags : Read / Write / Construct Only
Default value : 1000000000
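The properties above can be gathered in a small configuration type that renders the ts-intersrc part of a launch description. This is a minimal sketch, not part of the plugin: InterSrcConfig and its methods are hypothetical names, the default values are copied from the property list above, and the "0=unlimited" convention is modelled with Option. Names containing spaces or special characters would need quoting, which this sketch does not handle.

```rust
/// Hypothetical helper type, not part of the threadshare plugin.
/// `None` in a `max_size_*` field stands for "unlimited" and is rendered as 0.
struct InterSrcConfig {
    inter_context: String,
    context: String,
    context_wait_ms: u32,
    max_size_buffers: Option<u32>,
    max_size_bytes: Option<u32>,
    max_size_time_ns: Option<u64>,
}

impl InterSrcConfig {
    /// Starts from the default values listed in the property reference.
    fn new(inter_context: &str, context: &str) -> Self {
        Self {
            inter_context: inter_context.to_owned(),
            context: context.to_owned(),
            context_wait_ms: 0,
            max_size_buffers: Some(200),
            max_size_bytes: Some(1_048_576),
            max_size_time_ns: Some(1_000_000_000),
        }
    }

    /// Renders the element and its properties in launch-description syntax.
    fn to_launch_fragment(&self) -> String {
        format!(
            "ts-intersrc inter-context={} context={} context-wait={} \
             max-size-buffers={} max-size-bytes={} max-size-time={}",
            self.inter_context,
            self.context,
            self.context_wait_ms,
            self.max_size_buffers.unwrap_or(0),
            self.max_size_bytes.unwrap_or(0),
            self.max_size_time_ns.unwrap_or(0),
        )
    }
}
```

The resulting string could be placed at the start of a pipeline description passed to gst::parse::launch, in the same way as the downstream pipeline in the Usage section.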