Pads driving the pipeline

An element whose sinkpads operate in pull-mode while its sourcepads operate in push-mode (or which has no sourcepads at all, when it is a sink) can start a task that drives the pipeline data flow. Within this task function, you have random access to the data on all of the sinkpads, and you push data over the sourcepads. This is useful for several different kinds of elements, for example demuxers and parsers that need random access to their input, or audio outputs that need control over their input data flow.

First you need to perform a SCHEDULING query to check whether the upstream element(s) support pull-mode scheduling. If they do, you can activate the sinkpad in pull-mode. Inside the activate_mode function you can then start the task.

#include "filter.h"
#include <string.h>

/* Pad activation callbacks installed on the sinkpad in gst_my_filter_init(),
 * and the task function started when the sinkpad is activated in pull-mode. */
static gboolean gst_my_filter_activate      (GstPad      * pad,
                                             GstObject   * parent);
static gboolean gst_my_filter_activate_mode (GstPad      * pad,
                                             GstObject   * parent,
                                             GstPadMode    mode,
                                             gboolean      active);
static void     gst_my_filter_loop          (GstMyFilter * filter);

G_DEFINE_TYPE (GstMyFilter, gst_my_filter, GST_TYPE_ELEMENT);


/* Instance initializer for GstMyFilter (excerpt; unrelated setup is elided
 * with "[..]"). Installs the activation callbacks on the sinkpad. */
static void
gst_my_filter_init (GstMyFilter * filter)
{

[..]

  /* gst_my_filter_activate chooses between pull- and push-mode;
   * gst_my_filter_activate_mode starts or stops the streaming task. */
  gst_pad_set_activate_function (filter->sinkpad, gst_my_filter_activate);
  gst_pad_set_activatemode_function (filter->sinkpad,
      gst_my_filter_activate_mode);


[..]
}

[..]

/* Activate callback for the sinkpad.
 *
 * Asks the upstream peer which scheduling modes it supports. The pad is
 * activated in pull-mode only when the peer answers the query and reports
 * seekable pull-mode scheduling; in every other case it is activated in
 * push-mode.
 *
 * Returns the result of gst_pad_activate_mode(). */
static gboolean
gst_my_filter_activate (GstPad * pad, GstObject * parent)
{
  GstQuery *sched;
  gboolean use_pull;

  sched = gst_query_new_scheduling ();

  /* short-circuit: the mode check only runs when the peer query succeeded */
  use_pull = gst_pad_peer_query (pad, sched) &&
      gst_query_has_scheduling_mode_with_flags (sched,
          GST_PAD_MODE_PULL, GST_SCHEDULING_FLAG_SEEKABLE);

  /* the query is no longer needed whichever mode was chosen */
  gst_query_unref (sched);

  /* when activating in pull-mode, GStreamer will also activate the
   * upstream peer in pull-mode */
  return gst_pad_activate_mode (pad,
      use_pull ? GST_PAD_MODE_PULL : GST_PAD_MODE_PUSH, TRUE);
}

/* Activate-mode callback for the sinkpad.
 *
 * Named gst_my_filter_activate_mode so that it matches the forward
 * declaration and the function installed with
 * gst_pad_set_activatemode_function().
 *
 * pad:    the sinkpad being (de)activated
 * parent: the owning element, cast to GstMyFilter
 * mode:   the scheduling mode being switched
 * active: TRUE when activating, FALSE when deactivating
 *
 * In push-mode nothing needs to be done. In pull-mode, activation resets the
 * read offset and starts gst_my_filter_loop() as the pad task; deactivation
 * stops that task.
 *
 * Returns TRUE on success, FALSE for an unknown mode or when the task could
 * not be started/stopped. */
static gboolean
gst_my_filter_activate_mode (GstPad    * pad,
                             GstObject * parent,
                             GstPadMode  mode,
                             gboolean    active)
{
  gboolean res;
  GstMyFilter *filter = GST_MY_FILTER (parent);

  switch (mode) {
    case GST_PAD_MODE_PUSH:
      /* no task needed: upstream drives the data flow */
      res = TRUE;
      break;
    case GST_PAD_MODE_PULL:
      if (active) {
        /* start reading from the beginning of the stream */
        filter->offset = 0;
        res = gst_pad_start_task (pad,
            (GstTaskFunction) gst_my_filter_loop, filter, NULL);
      } else {
        res = gst_pad_stop_task (pad);
      }
      break;
    default:
      /* unknown scheduling mode */
      res = FALSE;
      break;
  }
  return res;
}
    

Once started, your task has full control over input and output. The simplest case of a task function is one that reads input and pushes it over its source pad. It's not all that useful, but it provides some more flexibility than the old push-mode case that we've been looking at so far.

/* Number of bytes requested from upstream per iteration of the task. */
#define BLOCKSIZE 2048

/* Task function driving the data flow while the sinkpad is in pull-mode.
 *
 * Each invocation pulls up to BLOCKSIZE bytes from upstream at
 * filter->offset, pushes the buffer over the srcpad and advances the offset
 * by the number of bytes actually read. The task pauses itself when the
 * duration query fails, when the end of the input is reached (after sending
 * EOS downstream), or when pulling or pushing does not return GST_FLOW_OK. */
static void
gst_my_filter_loop (GstMyFilter * filter)
{
  GstFlowReturn ret;
  gint64 len = -1;              /* duration queries report a signed value */
  gsize size;
  GstBuffer *buf = NULL;

  /* the data comes from upstream, so ask the sinkpad's peer for the size */
  if (!gst_pad_peer_query_duration (filter->sinkpad, GST_FORMAT_BYTES, &len)) {
    GST_DEBUG_OBJECT (filter, "failed to query duration, pausing");
    goto stop;
  }

  /* a negative length means "unknown": keep reading until upstream says EOS */
  if (len >= 0 && filter->offset >= (guint64) len) {
    GST_DEBUG_OBJECT (filter, "at end of input, sending EOS, pausing");
    gst_pad_push_event (filter->srcpad, gst_event_new_eos ());
    goto stop;
  }

  /* now, read BLOCKSIZE bytes from byte offset filter->offset */
  ret = gst_pad_pull_range (filter->sinkpad, filter->offset,
      BLOCKSIZE, &buf);

  if (ret != GST_FLOW_OK) {
    GST_DEBUG_OBJECT (filter, "pull_range failed: %s", gst_flow_get_name (ret));
    /* we drive the pipeline, so downstream only learns about the end of
     * the stream if we tell it */
    if (ret == GST_FLOW_EOS)
      gst_pad_push_event (filter->srcpad, gst_event_new_eos ());
    goto stop;
  }

  /* remember how much was really read; the buffer may be shorter than
   * BLOCKSIZE and must not be touched after it has been pushed */
  size = gst_buffer_get_size (buf);

  /* now push buffer downstream */
  ret = gst_pad_push (filter->srcpad, buf);

  buf = NULL; /* gst_pad_push() took ownership of buffer */

  if (ret != GST_FLOW_OK) {
    GST_DEBUG_OBJECT (filter, "pad_push failed: %s", gst_flow_get_name (ret));
    goto stop;
  }

  /* everything is fine, increase offset and wait for us to be called again */
  filter->offset += size;
  return;

stop:
  GST_DEBUG_OBJECT (filter, "pausing task");
  gst_pad_pause_task (filter->sinkpad);
}