ts-intersrc
Thread-sharing source for inter-pipelines communication.
ts-intersrc
is an element that proxies events travelling downstream, non-serialized
queries and buffers (including metas) from another pipeline that contains a matching
ts-intersink
element. The purpose is to allow one to many decoupled pipelines
to function as though they were one without having to manually shuttle buffers,
events, queries, etc.
This doesn't support dynamically changing ts-intersink
for now.
The ts-intersink
& ts-intersrc
elements take advantage of the threadshare
runtime, reducing the number of threads & context switches which would be
necessary with other forms of inter-pipelines elements.
Usage
use futures::prelude::*;
use gst::prelude::*;
let g_ctx = gst::glib::MainContext::default();
gst::init().unwrap();
// An upstream pipeline producing a 1 second Opus encoded audio stream
let pipe_up = gst::parse::launch(
"
audiotestsrc is-live=true num-buffers=50 volume=0.02
! opusenc
! ts-intersink inter-context=my-inter-ctx
",
)
.unwrap()
.downcast::<gst::Pipeline>()
.unwrap();
// A downstream pipeline which will receive the Opus encoded audio stream
// and render it locally.
let pipe_down = gst::parse::launch(
"
ts-intersrc inter-context=my-inter-ctx context=ts-group-01 context-wait=20
! opusdec
! audioconvert
! audioresample
! ts-queue context=ts-group-01 context-wait=20 max-size-buffers=1 max-size-bytes=0 max-size-time=0
! autoaudiosink
",
)
.unwrap()
.downcast::<gst::Pipeline>()
.unwrap();
// Both pipelines must agree on the timing information or we'll get glitches
// or overruns/underruns. Ideally, we should tell pipe_up to use the same clock
// as pipe_down, but since that will be set asynchronously to the audio clock, it
// is simpler and likely accurate enough to use the system clock for both
// pipelines. If no element in either pipeline will provide a clock, this
// is not needed.
let clock = gst::SystemClock::obtain();
pipe_up.set_clock(Some(&clock)).unwrap();
pipe_down.set_clock(Some(&clock)).unwrap();
// This is not really needed in this case since the pipelines are created and
// started at the same time. However, an application that dynamically
// generates pipelines must ensure that all the pipelines that will be
// connected together share the same base time.
pipe_up.set_base_time(gst::ClockTime::ZERO);
pipe_up.set_start_time(gst::ClockTime::NONE);
pipe_down.set_base_time(gst::ClockTime::ZERO);
pipe_down.set_start_time(gst::ClockTime::NONE);
pipe_up.set_state(gst::State::Playing).unwrap();
pipe_down.set_state(gst::State::Playing).unwrap();
g_ctx.block_on(async {
use gst::MessageView::*;
let mut bus_up_stream = pipe_up.bus().unwrap().stream();
let mut bus_down_stream = pipe_down.bus().unwrap().stream();
loop {
futures::select! {
msg = bus_up_stream.next() => {
let Some(msg) = msg else { continue };
match msg.view() {
Latency(_) => {
let _ = pipe_down.recalculate_latency();
}
Error(err) => {
eprintln!("Error with downstream pipeline {err:?}");
break;
}
_ => (),
}
}
msg = bus_down_stream.next() => {
let Some(msg) = msg else { continue };
match msg.view() {
Latency(_) => {
let _ = pipe_down.recalculate_latency();
}
Eos(_) => {
println!("Got EoS");
break;
}
Error(err) => {
eprintln!("Error with downstream pipeline {err:?}");
break;
}
_ => (),
}
}
};
}
});
pipe_up.set_state(gst::State::Null).unwrap();
pipe_down.set_state(gst::State::Null).unwrap();
Hierarchy
GObject ╰──GInitiallyUnowned ╰──GstObject ╰──GstElement ╰──ts-intersrc
Factory details
Authors: – François Laignel
Classification: – Source/Generic
Rank – none
Plugin – threadshare
Package – gst-plugin-threadshare
Pad Templates
Properties
context
“context” gchararray
Context name to share threads with
Flags : Read / Write / Construct Only
context-wait
“context-wait” guint
Throttle poll loop to run at most once every this many ms
Flags : Read / Write / Construct Only
Default value : 0
inter-context
“inter-context” gchararray
Context name of the inter elements to share with
Flags : Read / Write / Construct Only
max-size-buffers
“max-size-buffers” guint
Maximum number of buffers to queue (0=unlimited)
Flags : Read / Write / Construct Only
Default value : 200
max-size-bytes
“max-size-bytes” guint
Maximum number of bytes to queue (0=unlimited)
Flags : Read / Write / Construct Only
Default value : 1048576
max-size-time
“max-size-time” guint64
Maximum number of nanoseconds to queue (0=unlimited)
Flags : Read / Write / Construct Only
Default value : 1000000000
The results of the search are