Chapter 19. Pipeline manipulation

Table of Contents

Using probes
Data probes
Play a region of a media file
Manually adding or removing data from/to a pipeline
Inserting data with appsrc
Grabbing data with appsink
Forcing a format
Changing format in a PLAYING pipeline
Dynamically changing the pipeline
Changing elements in a pipeline

This chapter discusses several ways in which you can manipulate your pipeline from your application. Parts of this chapter are very low-level, so you will need some programming knowledge and a good understanding of GStreamer before you start reading.

Topics discussed here include how to insert data into a pipeline from your application, how to read data from a pipeline, how to manipulate the pipeline's speed, length and starting point, and how to listen to a pipeline's data processing.

Using probes

Probing is best envisioned as a pad listener. Technically, a probe is nothing more than a callback that can be attached to a pad. You can attach a probe using gst_pad_add_probe (). Similarly, you can use gst_pad_remove_probe () to remove the callback again. The probe notifies you of any activity that happens on the pad, like buffers, events and queries. You can define what kind of notifications you are interested in when you add the probe.

The probe can notify you of the following activity on pads:

  • A buffer is pushed or pulled. You want to specify the GST_PAD_PROBE_TYPE_BUFFER when registering the probe. Because the pad can be scheduled in different ways, it is possible to also specify in what scheduling mode you are interested with the optional GST_PAD_PROBE_TYPE_PUSH and GST_PAD_PROBE_TYPE_PULL flags.

    You can use this probe to inspect, modify or drop the buffer. See the section called “Data probes”.

  • A bufferlist is pushed. Use the GST_PAD_PROBE_TYPE_BUFFER_LIST when registering the probe.

  • An event travels over a pad. Use the GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM and GST_PAD_PROBE_TYPE_EVENT_UPSTREAM flags to select downstream and upstream events. There is also a convenience GST_PAD_PROBE_TYPE_EVENT_BOTH to be notified of events going both upstream and downstream. By default, flush events do not cause a notification. You need to explicitly enable GST_PAD_PROBE_TYPE_EVENT_FLUSH to receive callbacks from flushing events. Event probes are only notified in push mode.

    You can use this probe to inspect, modify or drop the event.

  • A query travels over a pad. Use the GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM and GST_PAD_PROBE_TYPE_QUERY_UPSTREAM flags to select downstream and upstream queries. The convenience GST_PAD_PROBE_TYPE_QUERY_BOTH can also be used to select both directions. Query probes will be notified twice, once when the query travels upstream/downstream and once when the query result is returned. You can select the stage at which the callback is called with GST_PAD_PROBE_TYPE_PUSH (when the query is performed) and GST_PAD_PROBE_TYPE_PULL (when the query result is returned).

    You can use this probe to inspect or modify the query. You can also answer the query in the probe callback by placing the result value in the query and by returning GST_PAD_PROBE_DROP from the callback.

  • In addition to notifying you of dataflow, you can also ask the probe to block the dataflow when the callback returns. This is called a blocking probe and is activated by specifying the GST_PAD_PROBE_TYPE_BLOCK flag. You can combine this flag with the other flags to block dataflow only on selected activity. A pad becomes unblocked again when you remove the probe or when you return GST_PAD_PROBE_REMOVE from the callback. To let only the currently blocked item pass, return GST_PAD_PROBE_PASS from the callback; the pad will block again on the next item.

    Blocking probes are used to temporarily block pads because they are unlinked or because you are going to unlink them. If the dataflow were not blocked, the pipeline would go into an error state when data is pushed on an unlinked pad. We will see how to use blocking probes to partially preroll a pipeline. See also the section called “Play a region of a media file”.

  • Be notified when no activity is happening on a pad. You install this probe with the GST_PAD_PROBE_TYPE_IDLE flag. You can specify GST_PAD_PROBE_TYPE_PUSH and/or GST_PAD_PROBE_TYPE_PULL to only be notified depending on the pad scheduling mode. The IDLE probe is also a blocking probe in that it will not let any data pass on the pad for as long as the IDLE probe is installed.

    You can use idle probes to dynamically relink a pad. We will see how to use idle probes to replace an element in the pipeline. See also the section called “Dynamically changing the pipeline”.

Data probes

Data probes allow you to be notified when there is data passing on a pad. When adding the probe, specify the GST_PAD_PROBE_TYPE_BUFFER and/or GST_PAD_PROBE_TYPE_BUFFER_LIST.

Data probes run in the pipeline's streaming thread context, so callbacks should avoid blocking and should not perform expensive or unusual operations, since this could have a negative impact on pipeline performance or, in case of bugs, cause deadlocks or crashes. In particular, you should usually not call any GUI-related functions from within a probe callback, nor try to change the state of the pipeline. An application may, however, post custom messages on the pipeline's bus to communicate with the main application thread and have it do things like stop the pipeline.

In any case, most common buffer operations that elements can do in _chain () functions can be done in probe callbacks as well. The example below gives a short impression of how to use them.



#include <gst/gst.h>

static GstPadProbeReturn
cb_have_data (GstPad          *pad,
              GstPadProbeInfo *info,
              gpointer         user_data)
{
  gint x, y;
  GstMapInfo map;
  guint16 *ptr, t;
  GstBuffer *buffer;

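  buffer = GST_PAD_PROBE_INFO_BUFFER (info);

  /* the returned buffer may differ from the one passed in */
  buffer = gst_buffer_make_writable (buffer);

  /* making a buffer writable can fail (for example if it
   * cannot be copied and is used more than once) */
  if (buffer == NULL)
    return GST_PAD_PROBE_OK;

  /* mapping a buffer can fail (non-writable) */
  if (gst_buffer_map (buffer, &map, GST_MAP_WRITE)) {
    ptr = (guint16 *) map.data;
    /* mirror each row horizontally */
    for (y = 0; y < 288; y++) {
      for (x = 0; x < 384 / 2; x++) {
        t = ptr[384 - 1 - x];
        ptr[384 - 1 - x] = ptr[x];
        ptr[x] = t;
      }
      ptr += 384;
    }
    gst_buffer_unmap (buffer, &map);
  }

  /* hand the (possibly new) buffer back to the pad */
  GST_PAD_PROBE_INFO_DATA (info) = buffer;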

  return GST_PAD_PROBE_OK;
}

gint
main (gint   argc,
      gchar *argv[])
{
  GMainLoop *loop;
  GstElement *pipeline, *src, *sink, *filter, *csp;
  GstCaps *filtercaps;
  GstPad *pad;

  /* init GStreamer */
  gst_init (&argc, &argv);
  loop = g_main_loop_new (NULL, FALSE);

  /* build */
  pipeline = gst_pipeline_new ("my-pipeline");
  src = gst_element_factory_make ("videotestsrc", "src");
  if (src == NULL)
    g_error ("Could not create 'videotestsrc' element");

  filter = gst_element_factory_make ("capsfilter", "filter");
  g_assert (filter != NULL); /* should always exist */

  csp = gst_element_factory_make ("videoconvert", "csp");
  if (csp == NULL)
    g_error ("Could not create 'videoconvert' element");

  sink = gst_element_factory_make ("xvimagesink", "sink");
  if (sink == NULL) {
    sink = gst_element_factory_make ("ximagesink", "sink");
    if (sink == NULL)
      g_error ("Could not create either 'xvimagesink' or 'ximagesink' element");
  }

  gst_bin_add_many (GST_BIN (pipeline), src, filter, csp, sink, NULL);
  gst_element_link_many (src, filter, csp, sink, NULL);
  filtercaps = gst_caps_new_simple ("video/x-raw",
			   "format", G_TYPE_STRING, "RGB16",
			   "width", G_TYPE_INT, 384,
			   "height", G_TYPE_INT, 288,
			   "framerate", GST_TYPE_FRACTION, 25, 1,
			   NULL);
  g_object_set (G_OBJECT (filter), "caps", filtercaps, NULL);
  gst_caps_unref (filtercaps);

  pad = gst_element_get_static_pad (src, "src");
  gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BUFFER,
      (GstPadProbeCallback) cb_have_data, NULL, NULL);
  gst_object_unref (pad);

  /* run */
  gst_element_set_state (pipeline, GST_STATE_PLAYING);

  /* wait until it's up and running or failed */
  if (gst_element_get_state (pipeline, NULL, NULL, -1) == GST_STATE_CHANGE_FAILURE) {
    g_error ("Failed to go into PLAYING state");
  }

  g_print ("Running ...\n");
  g_main_loop_run (loop);

  /* exit */
  gst_element_set_state (pipeline, GST_STATE_NULL);
  gst_object_unref (pipeline);

  return 0;
}


      

Compare that output with the output of gst-launch-1.0 videotestsrc ! xvimagesink, just so you know what you're looking for.
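The inner loops of the callback above swap pixels within each row, so the picture comes out mirrored horizontally. The following is a minimal, GStreamer-free sketch of the same loop with the frame geometry passed in as parameters instead of hard-coded as 384 and 288. The function name mirror_rows_u16 and its size_bytes and stride parameters are assumptions made for this illustration; they are not part of the GStreamer API.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Mirror every row of a packed 16-bit-per-pixel image in place.
 * stride is the distance between two row starts, in pixels (the example
 * above assumes stride == width == 384). Returns false, without touching
 * the data, when the geometry is invalid or the memory is too small. */
static bool
mirror_rows_u16 (uint16_t *pixels, size_t size_bytes,
                 size_t width, size_t height, size_t stride)
{
  assert (pixels != NULL);

  if (width == 0 || height == 0 || stride < width)
    return false;

  /* the last row only needs width pixels, not a full stride */
  size_t needed = ((height - 1) * stride + width) * sizeof (uint16_t);
  if (size_bytes < needed)
    return false;

  for (size_t y = 0; y < height; y++) {
    uint16_t *row = pixels + y * stride;

    /* swap pixel x with its counterpart from the right edge */
    for (size_t x = 0; x < width / 2; x++) {
      uint16_t t = row[width - 1 - x];
      row[width - 1 - x] = row[x];
      row[x] = t;
    }
  }
  return true;
}
```

Inside a probe callback, such a helper would presumably be called with the mapped data pointer and size (map.data and map.size) and the width and height negotiated in the caps. The size check guards against buffers that are smaller than the caps suggest.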

Strictly speaking, a pad probe callback is only allowed to modify the buffer content if the buffer is writable. Whether this is the case or not depends a lot on the pipeline and the elements involved. Often enough, this is the case, but sometimes it is not, and if it is not then unexpected modification of the data or metadata can introduce bugs that are very hard to debug and track down. You can check if a buffer is writable with gst_buffer_is_writable (). Since you can pass back a different buffer than the one passed in, it is a good idea to make the buffer writable in the callback function with gst_buffer_make_writable ().
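To illustrate why the callback must hand back the buffer returned by gst_buffer_make_writable (), here is a simplified copy-on-write model in plain C. It assumes that "writable" simply means "the caller holds the only reference", which leaves out details of the real implementation. ToyBuffer and the toy_buffer_* functions are hypothetical names invented for this sketch; they are not GStreamer types or functions, and the model is not thread-safe.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Toy stand-in for a reference-counted buffer (not a GStreamer type). */
typedef struct {
  int refcount;
  size_t size;
  unsigned char *data;
} ToyBuffer;

static ToyBuffer *
toy_buffer_new (size_t size)
{
  ToyBuffer *b = malloc (sizeof *b);
  assert (b != NULL);
  b->refcount = 1;
  b->size = size;
  b->data = calloc (size ? size : 1, 1);
  assert (b->data != NULL);
  return b;
}

static ToyBuffer *
toy_buffer_ref (ToyBuffer *b)
{
  b->refcount++;
  return b;
}

static void
toy_buffer_unref (ToyBuffer *b)
{
  if (--b->refcount == 0) {
    free (b->data);
    free (b);
  }
}

/* In this model a buffer is writable when nobody else holds a reference. */
static bool
toy_buffer_is_writable (const ToyBuffer *b)
{
  return b->refcount == 1;
}

/* Takes ownership of the caller's reference and returns a buffer that the
 * caller may modify. If the buffer is shared, the result is a private copy
 * and therefore a different pointer than the one passed in. */
static ToyBuffer *
toy_buffer_make_writable (ToyBuffer *b)
{
  if (toy_buffer_is_writable (b))
    return b;

  ToyBuffer *copy = toy_buffer_new (b->size);
  memcpy (copy->data, b->data, b->size);
  toy_buffer_unref (b);         /* drop our reference to the shared buffer */
  return copy;
}
```

Because the returned pointer can differ from the input, code that keeps using the old pointer would modify data that other holders can still see. This is the reason the example callback stores the result back with GST_PAD_PROBE_INFO_DATA ().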

Pad probes are best suited for looking at data as it passes through the pipeline. If you need to modify data, you are better off writing your own GStreamer element. Base classes like GstAudioFilter, GstVideoFilter or GstBaseTransform make this fairly easy.

If you just want to inspect buffers as they pass through the pipeline, you don't even need to set up pad probes. You could also just insert an identity element into the pipeline and connect to its "handoff" signal. The identity element also provides a few useful debugging tools like the "dump" property or the "last-message" property (the latter is enabled by passing the '-v' switch to gst-launch and by setting the silent property on the identity to FALSE).

Play a region of a media file

In this example we will show you how to play back a region of a media file. The goal is to only play the part of a file from 2 seconds to 5 seconds and then EOS.

In a first step we will set a uridecodebin element to the PAUSED state and make sure that we block all the source pads that are created. When all the source pads are blocked, we have data on all source pads and we say that the uridecodebin is prerolled.

In a prerolled pipeline we can ask for the duration of the media and we can also perform seeks. We are interested in performing a seek operation on the pipeline to select the range of media that we are interested in.

After we configure the region we are interested in, we can link the sink element, unblock the source pads and set the pipeline to the PLAYING state. You will see that exactly the requested region is played by the sink before it goes to EOS.

What follows is an example application that loosely follows this algorithm.



#include <gst/gst.h>

static GMainLoop *loop;
static volatile gint counter;
static GstBus *bus;
static gboolean prerolled = FALSE;
static GstPad *sinkpad;

static void
dec_counter (GstElement * pipeline)
{
  if (prerolled)
    return;

  if (g_atomic_int_dec_and_test (&counter)) {
    /* all probes blocked and no-more-pads signaled, post
     * message on the bus. */
    prerolled = TRUE;

    gst_bus_post (bus, gst_message_new_application (
          GST_OBJECT_CAST (pipeline),
          gst_structure_new_empty ("ExPrerolled")));
  }
}

/* called when a source pad of uridecodebin is blocked */
static GstPadProbeReturn
cb_blocked (GstPad          *pad,
            GstPadProbeInfo *info,
            gpointer         user_data)
{
  GstElement *pipeline = GST_ELEMENT (user_data);

  if (prerolled)
    return GST_PAD_PROBE_REMOVE;

  dec_counter (pipeline);

  return GST_PAD_PROBE_OK;
}

/* called when uridecodebin has a new pad */
static void
cb_pad_added (GstElement *element,
              GstPad     *pad,
              gpointer    user_data)
{
  GstElement *pipeline = GST_ELEMENT (user_data);

  if (prerolled)
    return;

  g_atomic_int_inc (&counter);

  gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
      (GstPadProbeCallback) cb_blocked, pipeline, NULL);

  /* try to link to the video pad */
  gst_pad_link (pad, sinkpad);
}

/* called when uridecodebin has created all pads */
static void
cb_no_more_pads (GstElement *element,
                 gpointer    user_data)
{
  GstElement *pipeline = GST_ELEMENT (user_data);

  if (prerolled)
    return;

  dec_counter (pipeline);
}

/* called when a new message is posted on the bus */
static void
cb_message (GstBus     *bus,
            GstMessage *message,
            gpointer    user_data)
{
  GstElement *pipeline = GST_ELEMENT (user_data);

  switch (GST_MESSAGE_TYPE (message)) {
    case GST_MESSAGE_ERROR:
      g_print ("we received an error!\n");
      g_main_loop_quit (loop);
      break;
    case GST_MESSAGE_EOS:
      g_print ("we reached EOS\n");
      g_main_loop_quit (loop);
      break;
    case GST_MESSAGE_APPLICATION:
    {
      if (gst_message_has_name (message, "ExPrerolled")) {
        /* it's our message */
        g_print ("we are all prerolled, do seek\n");
        gst_element_seek (pipeline,
            1.0, GST_FORMAT_TIME,
            GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_ACCURATE,
            GST_SEEK_TYPE_SET, 2 * GST_SECOND,
            GST_SEEK_TYPE_SET, 5 * GST_SECOND);

        gst_element_set_state (pipeline, GST_STATE_PLAYING);
      }
      break;
    }
    default:
      break;
  }
}

gint
main (gint   argc,
      gchar *argv[])
{
  GstElement *pipeline, *src, *csp, *vs, *sink;

  /* init GStreamer */
  gst_init (&argc, &argv);
  loop = g_main_loop_new (NULL, FALSE);

  if (argc < 2) {
    g_print ("usage: %s <uri>\n", argv[0]);
    return -1;
  }

  /* build */
  pipeline = gst_pipeline_new ("my-pipeline");

  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
  gst_bus_add_signal_watch (bus);
  g_signal_connect (bus, "message", (GCallback) cb_message,
      pipeline);

  src = gst_element_factory_make ("uridecodebin", "src");
  if (src == NULL)
    g_error ("Could not create 'uridecodebin' element");

  g_object_set (src, "uri", argv[1], NULL);

  csp = gst_element_factory_make ("videoconvert", "csp");
  if (csp == NULL)
    g_error ("Could not create 'videoconvert' element");

  vs = gst_element_factory_make ("videoscale", "vs");
  if (vs == NULL)
    g_error ("Could not create 'videoscale' element");

  sink = gst_element_factory_make ("autovideosink", "sink");
  if (sink == NULL)
    g_error ("Could not create 'autovideosink' element");

  gst_bin_add_many (GST_BIN (pipeline), src, csp, vs, sink, NULL);

  /* can't link src yet, it has no pads */
  gst_element_link_many (csp, vs, sink, NULL);

  sinkpad = gst_element_get_static_pad (csp, "sink");

  /* for each pad block that is installed, we will increment
   * the counter. for each pad block that is signaled, we
   * decrement the counter. When the counter is 0 we post
   * an app message to tell the app that all pads are
   * blocked. Start with 1 that is decremented when no-more-pads
   * is signaled to make sure that we only post the message
   * after no-more-pads */
  g_atomic_int_set (&counter, 1);

  g_signal_connect (src, "pad-added",
      (GCallback) cb_pad_added, pipeline);
  g_signal_connect (src, "no-more-pads",
      (GCallback) cb_no_more_pads, pipeline);

  gst_element_set_state (pipeline, GST_STATE_PAUSED);

  g_main_loop_run (loop);

  gst_element_set_state (pipeline, GST_STATE_NULL);

  gst_object_unref (sinkpad);
  gst_object_unref (bus);
  gst_object_unref (pipeline);
  g_main_loop_unref (loop);

  return 0;
}


Note that we use a custom application message to signal the main thread that the uridecodebin is prerolled. The main thread then issues a flushing seek to the requested region. The flush temporarily unblocks the pads, which block again when new data arrives. We detect this second block to remove the probes. Then we set the pipeline to PLAYING and it should play from 2 to 5 seconds, then EOS and exit the application.
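The counting scheme used by dec_counter (), cb_pad_added () and cb_no_more_pads () can be studied in isolation. The following sketch models it with C11 atomics instead of the GLib atomic functions: the counter starts at 1, goes up once per pad that gets a blocking probe, and goes down once per blocked pad and once for no-more-pads. PrerollGate and the preroll_gate_* functions are hypothetical names introduced for this illustration only.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Models the counter and the prerolled flag of the example above. */
typedef struct {
  atomic_int pending;   /* blocks still outstanding, plus 1 for no-more-pads */
  atomic_bool posted;   /* set once the "prerolled" notification was issued */
} PrerollGate;

static void
preroll_gate_init (PrerollGate *g)
{
  assert (g != NULL);
  /* start at 1 so the gate cannot open before no-more-pads */
  atomic_init (&g->pending, 1);
  atomic_init (&g->posted, false);
}

/* Call when a blocking probe is installed on a new pad. */
static void
preroll_gate_pad_added (PrerollGate *g)
{
  atomic_fetch_add (&g->pending, 1);
}

/* Call when a pad has blocked, and once when no-more-pads is signaled.
 * Returns true for the call that brings the counter to zero, which is
 * the point where the example posts its application message. */
static bool
preroll_gate_release (PrerollGate *g)
{
  if (atomic_load (&g->posted))
    return false;

  /* atomic_fetch_sub returns the value before the subtraction */
  if (atomic_fetch_sub (&g->pending, 1) == 1) {
    atomic_store (&g->posted, true);
    return true;
  }
  return false;
}
```

Starting at 1 rather than 0 matters when the first pad blocks before the remaining pads have been announced: without the extra count, the counter would reach zero too early and the seek would be issued on a partially prerolled uridecodebin.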