Branch data Line data Source code
1 : : /* GStreamer
2 : : * Copyright (C) 2006 Edward Hervey <edward@fluendo.com>
3 : : * Copyright (C) 2007 Jan Schmidt <jan@fluendo.com>
4 : : * Copyright (C) 2007 Wim Taymans <wim@fluendo.com>
5 : : * Copyright (C) 2011 Sebastian Dröge <sebastian.droege@collabora.co.uk>
6 : : *
7 : : * gstmultiqueue.c:
8 : : *
9 : : * This library is free software; you can redistribute it and/or
10 : : * modify it under the terms of the GNU Library General Public
11 : : * License as published by the Free Software Foundation; either
12 : : * version 2 of the License, or (at your option) any later version.
13 : : *
14 : : * This library is distributed in the hope that it will be useful,
15 : : * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 : : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 : : * Library General Public License for more details.
18 : : *
19 : : * You should have received a copy of the GNU Library General Public
20 : : * License along with this library; if not, write to the
21 : : * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
22 : : * Boston, MA 02111-1307, USA.
23 : : */
24 : :
25 : : /**
26 : : * SECTION:element-multiqueue
27 : : * @see_also: #GstQueue
28 : : *
29 : : * <refsect2>
30 : : * <para>
31 : : * Multiqueue is similar to a normal #GstQueue with the following additional
32 : : * features:
33 : : * <orderedlist>
34 : : * <listitem>
35 : : * <itemizedlist><title>Multiple streamhandling</title>
36 : : * <listitem><para>
37 : : * The element handles queueing data on more than one stream at once. To
38 : : * achieve such a feature it has request sink pads (sink%d) and
39 : : * 'sometimes' src pads (src%d).
40 : : * </para><para>
41 : : * When requesting a given sinkpad with gst_element_get_request_pad(),
42 : : * the associated srcpad for that stream will be created.
43 : : * Example: requesting sink1 will generate src1.
44 : : * </para></listitem>
45 : : * </itemizedlist>
46 : : * </listitem>
47 : : * <listitem>
48 : : * <itemizedlist><title>Non-starvation on multiple streams</title>
49 : : * <listitem><para>
50 : : * If more than one stream is used with the element, the streams' queues
51 : : * will be dynamically grown (up to a limit), in order to ensure that no
52 : : * stream is risking data starvation. This guarantees that at any given
53 : : * time there are at least N bytes queued and available for each individual
54 : : * stream.
55 : : * </para><para>
56 : : * If an EOS event comes through a srcpad, the associated queue will be
57 : : * considered as 'not-empty' in the queue-size-growing algorithm.
58 : : * </para></listitem>
59 : : * </itemizedlist>
60 : : * </listitem>
61 : : * <listitem>
62 : : * <itemizedlist><title>Non-linked srcpads graceful handling</title>
63 : : * <listitem><para>
64 : : * In order to better support dynamic switching between streams, the multiqueue
65 : : * (unlike the current GStreamer queue) continues to push buffers on non-linked
66 : : * pads rather than shutting down.
67 : : * </para><para>
68 : : * In addition, to prevent a non-linked stream from very quickly consuming all
69 : : * available buffers and thus 'racing ahead' of the other streams, the element
70 : : * must ensure that buffers and inlined events for a non-linked stream are pushed
71 : : * in the same order as they were received, relative to the other streams
72 : : * controlled by the element. This means that a buffer cannot be pushed to a
73 : : * non-linked pad any sooner than buffers in any other stream which were received
74 : : * before it.
75 : : * </para></listitem>
76 : : * </itemizedlist>
77 : : * </listitem>
78 : : * </orderedlist>
79 : : * </para>
80 : : * <para>
81 : : * Data is queued until one of the limits specified by the
82 : : * #GstMultiQueue:max-size-buffers, #GstMultiQueue:max-size-bytes and/or
83 : : * #GstMultiQueue:max-size-time properties has been reached. Any attempt to push
84 : : * more buffers into the queue will block the pushing thread until more space
85 : : * becomes available. #GstMultiQueue:extra-size-buffers,
86 : : * </para>
87 : : * <para>
88 : : * #GstMultiQueue:extra-size-bytes and #GstMultiQueue:extra-size-time are
89 : : * currently unused.
90 : : * </para>
91 : : * <para>
92 : : * The default queue size limits are 5 buffers, 10MB of data, or
93 : : * two second worth of data, whichever is reached first. Note that the number
94 : : * of buffers will dynamically grow depending on the fill level of
95 : : * other queues.
96 : : * </para>
97 : : * <para>
98 : : * The #GstMultiQueue::underrun signal is emitted when all of the queues
99 : : * are empty. The #GstMultiQueue::overrun signal is emitted when one of the
100 : : * queues is filled.
101 : : * Both signals are emitted from the context of the streaming thread.
102 : : * </para>
103 : : * </refsect2>
104 : : *
105 : : * Last reviewed on 2008-01-25 (0.10.17)
106 : : */
107 : :
108 : : #ifdef HAVE_CONFIG_H
109 : : # include "config.h"
110 : : #endif
111 : :
112 : : #include <gst/gst.h>
113 : : #include "gstmultiqueue.h"
114 : :
115 : : /**
116 : : * GstSingleQueue:
117 : : * @sinkpad: associated sink #GstPad
118 : : * @srcpad: associated source #GstPad
119 : : *
120 : : * Structure containing all information and properties about
121 : : * a single queue.
122 : : */
123 : : typedef struct _GstSingleQueue GstSingleQueue;
124 : :
125 : : struct _GstSingleQueue
126 : : {
127 : : /* unique identifier of the queue */
128 : : guint id;
129 : :
130 : : GstMultiQueue *mqueue;
131 : :
132 : : GstPad *sinkpad;
133 : : GstPad *srcpad;
134 : :
135 : : /* flowreturn of previous srcpad push */
136 : : GstFlowReturn srcresult;
137 : :
138 : : /* segments */
139 : : GstSegment sink_segment;
140 : : GstSegment src_segment;
141 : :
142 : : /* position of src/sink */
143 : : GstClockTime sinktime, srctime;
144 : : /* TRUE if either position needs to be recalculated */
145 : : gboolean sink_tainted, src_tainted;
146 : :
147 : : /* queue of data */
148 : : GstDataQueue *queue;
149 : : GstDataQueueSize max_size, extra_size;
150 : : GstClockTime cur_time;
151 : : gboolean is_eos;
152 : : gboolean flushing;
153 : :
154 : : /* Protected by global lock */
155 : : guint32 nextid; /* ID of the next object waiting to be pushed */
156 : : guint32 oldid; /* ID of the last object pushed (last in a series) */
157 : : guint32 last_oldid; /* Previously observed old_id, reset to MAXUINT32 on flush */
158 : : GCond *turn; /* SingleQueue turn waiting conditional */
159 : : };
160 : :
161 : :
162 : : /* Extension of GstDataQueueItem structure for our usage */
163 : : typedef struct _GstMultiQueueItem GstMultiQueueItem;
164 : :
165 : : struct _GstMultiQueueItem
166 : : {
167 : : GstMiniObject *object;
168 : : guint size;
169 : : guint64 duration;
170 : : gboolean visible;
171 : :
172 : : GDestroyNotify destroy;
173 : : guint32 posid;
174 : : };
175 : :
176 : : static GstSingleQueue *gst_single_queue_new (GstMultiQueue * mqueue);
177 : : static void gst_single_queue_free (GstSingleQueue * squeue);
178 : :
179 : : static void wake_up_next_non_linked (GstMultiQueue * mq);
180 : : static void compute_high_id (GstMultiQueue * mq);
181 : : static void single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
182 : : static void single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq);
183 : :
184 : : static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink%d",
185 : : GST_PAD_SINK,
186 : : GST_PAD_REQUEST,
187 : : GST_STATIC_CAPS_ANY);
188 : :
189 : : static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src%d",
190 : : GST_PAD_SRC,
191 : : GST_PAD_SOMETIMES,
192 : : GST_STATIC_CAPS_ANY);
193 : :
194 : : GST_DEBUG_CATEGORY_STATIC (multi_queue_debug);
195 : : #define GST_CAT_DEFAULT (multi_queue_debug)
196 : :
197 : : /* Signals and args */
198 : : enum
199 : : {
200 : : SIGNAL_UNDERRUN,
201 : : SIGNAL_OVERRUN,
202 : : LAST_SIGNAL
203 : : };
204 : :
205 : : /* default limits, we try to keep up to 2 seconds of data and if there is not
206 : : * time, up to 10 MB. The number of buffers is dynamically scaled to make sure
207 : : * there is data in the queues. Normally, the byte and time limits are not hit
208 : : * in theses conditions. */
209 : : #define DEFAULT_MAX_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */
210 : : #define DEFAULT_MAX_SIZE_BUFFERS 5
211 : : #define DEFAULT_MAX_SIZE_TIME 2 * GST_SECOND
212 : :
213 : : /* second limits. When we hit one of the above limits we are probably dealing
214 : : * with a badly muxed file and we scale the limits to these emergency values.
215 : : * This is currently not yet implemented.
216 : : * Since we dynamically scale the queue buffer size up to the limits but avoid
217 : : * going above the max-size-buffers when we can, we don't really need this
218 : : * aditional extra size. */
219 : : #define DEFAULT_EXTRA_SIZE_BYTES 10 * 1024 * 1024 /* 10 MB */
220 : : #define DEFAULT_EXTRA_SIZE_BUFFERS 5
221 : : #define DEFAULT_EXTRA_SIZE_TIME 3 * GST_SECOND
222 : :
223 : : #define DEFAULT_USE_BUFFERING FALSE
224 : : #define DEFAULT_LOW_PERCENT 10
225 : : #define DEFAULT_HIGH_PERCENT 99
226 : :
227 : : enum
228 : : {
229 : : PROP_0,
230 : : PROP_EXTRA_SIZE_BYTES,
231 : : PROP_EXTRA_SIZE_BUFFERS,
232 : : PROP_EXTRA_SIZE_TIME,
233 : : PROP_MAX_SIZE_BYTES,
234 : : PROP_MAX_SIZE_BUFFERS,
235 : : PROP_MAX_SIZE_TIME,
236 : : PROP_USE_BUFFERING,
237 : : PROP_LOW_PERCENT,
238 : : PROP_HIGH_PERCENT,
239 : : PROP_LAST
240 : : };
241 : :
242 : : #define GST_MULTI_QUEUE_MUTEX_LOCK(q) G_STMT_START { \
243 : : g_mutex_lock (q->qlock); \
244 : : } G_STMT_END
245 : :
246 : : #define GST_MULTI_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \
247 : : g_mutex_unlock (q->qlock); \
248 : : } G_STMT_END
249 : :
250 : : static void gst_multi_queue_finalize (GObject * object);
251 : : static void gst_multi_queue_set_property (GObject * object,
252 : : guint prop_id, const GValue * value, GParamSpec * pspec);
253 : : static void gst_multi_queue_get_property (GObject * object,
254 : : guint prop_id, GValue * value, GParamSpec * pspec);
255 : :
256 : : static GstPad *gst_multi_queue_request_new_pad (GstElement * element,
257 : : GstPadTemplate * temp, const gchar * name);
258 : : static void gst_multi_queue_release_pad (GstElement * element, GstPad * pad);
259 : : static GstStateChangeReturn gst_multi_queue_change_state (GstElement *
260 : : element, GstStateChange transition);
261 : :
262 : : static void gst_multi_queue_loop (GstPad * pad);
263 : :
264 : : #define _do_init(bla) \
265 : : GST_DEBUG_CATEGORY_INIT (multi_queue_debug, "multiqueue", 0, "multiqueue element");
266 : :
267 [ + + ][ + - ]: 285 : GST_BOILERPLATE_FULL (GstMultiQueue, gst_multi_queue, GstElement,
268 : 285 : GST_TYPE_ELEMENT, _do_init);
269 : :
270 : : static guint gst_multi_queue_signals[LAST_SIGNAL] = { 0 };
271 : :
272 : : static void
273 : 14 : gst_multi_queue_base_init (gpointer g_class)
274 : : {
275 : 14 : GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
276 : :
277 : 14 : gst_element_class_set_details_simple (gstelement_class,
278 : : "MultiQueue",
279 : : "Generic", "Multiple data queue", "Edward Hervey <edward@fluendo.com>");
280 : 14 : gst_element_class_add_pad_template (gstelement_class,
281 : : gst_static_pad_template_get (&sinktemplate));
282 : 14 : gst_element_class_add_pad_template (gstelement_class,
283 : : gst_static_pad_template_get (&srctemplate));
284 : 14 : }
285 : :
286 : : static void
287 : 14 : gst_multi_queue_class_init (GstMultiQueueClass * klass)
288 : : {
289 : 14 : GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
290 : 14 : GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
291 : :
292 : 14 : gobject_class->set_property = gst_multi_queue_set_property;
293 : 14 : gobject_class->get_property = gst_multi_queue_get_property;
294 : :
295 : : /* SIGNALS */
296 : :
297 : : /**
298 : : * GstMultiQueue::underrun:
299 : : * @multiqueue: the multqueue instance
300 : : *
301 : : * This signal is emitted from the streaming thread when there is
302 : : * no data in any of the queues inside the multiqueue instance (underrun).
303 : : *
304 : : * This indicates either starvation or EOS from the upstream data sources.
305 : : */
306 : 14 : gst_multi_queue_signals[SIGNAL_UNDERRUN] =
307 : 14 : g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
308 : : G_STRUCT_OFFSET (GstMultiQueueClass, underrun), NULL, NULL,
309 : : g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
310 : :
311 : : /**
312 : : * GstMultiQueue::overrun:
313 : : * @multiqueue: the multiqueue instance
314 : : *
315 : : * Reports that one of the queues in the multiqueue is full (overrun).
316 : : * A queue is full if the total amount of data inside it (num-buffers, time,
317 : : * size) is higher than the boundary values which can be set through the
318 : : * GObject properties.
319 : : *
320 : : * This can be used as an indicator of pre-roll.
321 : : */
322 : 14 : gst_multi_queue_signals[SIGNAL_OVERRUN] =
323 : 14 : g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
324 : : G_STRUCT_OFFSET (GstMultiQueueClass, overrun), NULL, NULL,
325 : : g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
326 : :
327 : : /* PROPERTIES */
328 : :
329 : 14 : g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
330 : : g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
331 : : "Max. amount of data in the queue (bytes, 0=disable)",
332 : : 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
333 : : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
334 : 14 : g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
335 : : g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
336 : : "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
337 : : DEFAULT_MAX_SIZE_BUFFERS,
338 : : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
339 : 14 : g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
340 : : g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
341 : : "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
342 : : DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
343 : :
344 : 14 : g_object_class_install_property (gobject_class, PROP_EXTRA_SIZE_BYTES,
345 : : g_param_spec_uint ("extra-size-bytes", "Extra Size (kB)",
346 : : "Amount of data the queues can grow if one of them is empty (bytes, 0=disable)"
347 : : " (NOT IMPLEMENTED)",
348 : : 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BYTES,
349 : : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
350 : 14 : g_object_class_install_property (gobject_class, PROP_EXTRA_SIZE_BUFFERS,
351 : : g_param_spec_uint ("extra-size-buffers", "Extra Size (buffers)",
352 : : "Amount of buffers the queues can grow if one of them is empty (0=disable)"
353 : : " (NOT IMPLEMENTED)",
354 : : 0, G_MAXUINT, DEFAULT_EXTRA_SIZE_BUFFERS,
355 : : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
356 : 14 : g_object_class_install_property (gobject_class, PROP_EXTRA_SIZE_TIME,
357 : : g_param_spec_uint64 ("extra-size-time", "Extra Size (ns)",
358 : : "Amount of time the queues can grow if one of them is empty (in ns, 0=disable)"
359 : : " (NOT IMPLEMENTED)",
360 : : 0, G_MAXUINT64, DEFAULT_EXTRA_SIZE_TIME,
361 : : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
362 : :
363 : : /**
364 : : * GstMultiQueue:use-buffering
365 : : *
366 : : * Enable the buffering option in multiqueue so that BUFFERING messages are
367 : : * emited based on low-/high-percent thresholds.
368 : : *
369 : : * Since: 0.10.26
370 : : */
371 : 14 : g_object_class_install_property (gobject_class, PROP_USE_BUFFERING,
372 : : g_param_spec_boolean ("use-buffering", "Use buffering",
373 : : "Emit GST_MESSAGE_BUFFERING based on low-/high-percent thresholds",
374 : : DEFAULT_USE_BUFFERING, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
375 : : /**
376 : : * GstMultiQueue:low-percent
377 : : *
378 : : * Low threshold percent for buffering to start.
379 : : *
380 : : * Since: 0.10.26
381 : : */
382 : 14 : g_object_class_install_property (gobject_class, PROP_LOW_PERCENT,
383 : : g_param_spec_int ("low-percent", "Low percent",
384 : : "Low threshold for buffering to start", 0, 100,
385 : : DEFAULT_LOW_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
386 : : /**
387 : : * GstMultiQueue:high-percent
388 : : *
389 : : * High threshold percent for buffering to finish.
390 : : *
391 : : * Since: 0.10.26
392 : : */
393 : 14 : g_object_class_install_property (gobject_class, PROP_HIGH_PERCENT,
394 : : g_param_spec_int ("high-percent", "High percent",
395 : : "High threshold for buffering to finish", 0, 100,
396 : : DEFAULT_HIGH_PERCENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
397 : :
398 : :
399 : 14 : gobject_class->finalize = gst_multi_queue_finalize;
400 : :
401 : 14 : gstelement_class->request_new_pad =
402 : 14 : GST_DEBUG_FUNCPTR (gst_multi_queue_request_new_pad);
403 : 14 : gstelement_class->release_pad =
404 : 14 : GST_DEBUG_FUNCPTR (gst_multi_queue_release_pad);
405 : 14 : gstelement_class->change_state =
406 : 14 : GST_DEBUG_FUNCPTR (gst_multi_queue_change_state);
407 : 14 : }
408 : :
409 : : static void
410 : 10 : gst_multi_queue_init (GstMultiQueue * mqueue, GstMultiQueueClass * klass)
411 : : {
412 : 10 : mqueue->nbqueues = 0;
413 : 10 : mqueue->queues = NULL;
414 : :
415 : 10 : mqueue->max_size.bytes = DEFAULT_MAX_SIZE_BYTES;
416 : 10 : mqueue->max_size.visible = DEFAULT_MAX_SIZE_BUFFERS;
417 : 10 : mqueue->max_size.time = DEFAULT_MAX_SIZE_TIME;
418 : :
419 : 10 : mqueue->extra_size.bytes = DEFAULT_EXTRA_SIZE_BYTES;
420 : 10 : mqueue->extra_size.visible = DEFAULT_EXTRA_SIZE_BUFFERS;
421 : 10 : mqueue->extra_size.time = DEFAULT_EXTRA_SIZE_TIME;
422 : :
423 : 10 : mqueue->use_buffering = DEFAULT_USE_BUFFERING;
424 : 10 : mqueue->low_percent = DEFAULT_LOW_PERCENT;
425 : 10 : mqueue->high_percent = DEFAULT_HIGH_PERCENT;
426 : :
427 : 10 : mqueue->counter = 1;
428 : 10 : mqueue->highid = -1;
429 : :
430 : 10 : mqueue->qlock = g_mutex_new ();
431 : 10 : }
432 : :
433 : : static void
434 : 10 : gst_multi_queue_finalize (GObject * object)
435 : : {
436 : 10 : GstMultiQueue *mqueue = GST_MULTI_QUEUE (object);
437 : :
438 : 10 : g_list_foreach (mqueue->queues, (GFunc) gst_single_queue_free, NULL);
439 : 10 : g_list_free (mqueue->queues);
440 : 10 : mqueue->queues = NULL;
441 : 10 : mqueue->queues_cookie++;
442 : :
443 : : /* free/unref instance data */
444 : 10 : g_mutex_free (mqueue->qlock);
445 : :
446 : 10 : G_OBJECT_CLASS (parent_class)->finalize (object);
447 : 10 : }
448 : :
449 : : #define SET_CHILD_PROPERTY(mq,format) G_STMT_START { \
450 : : GList * tmp = mq->queues; \
451 : : while (tmp) { \
452 : : GstSingleQueue *q = (GstSingleQueue*)tmp->data; \
453 : : q->max_size.format = mq->max_size.format; \
454 : : tmp = g_list_next(tmp); \
455 : : }; \
456 : : } G_STMT_END
457 : :
458 : : static void
459 : 18 : gst_multi_queue_set_property (GObject * object, guint prop_id,
460 : : const GValue * value, GParamSpec * pspec)
461 : : {
462 : 18 : GstMultiQueue *mq = GST_MULTI_QUEUE (object);
463 : :
464 [ + + + + : 18 : switch (prop_id) {
+ + - - -
- ]
465 : : case PROP_MAX_SIZE_BYTES:
466 : 3 : GST_MULTI_QUEUE_MUTEX_LOCK (mq);
467 : 3 : mq->max_size.bytes = g_value_get_uint (value);
468 [ # # ][ - + ]: 3 : SET_CHILD_PROPERTY (mq, bytes);
469 : 3 : GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
470 : 3 : break;
471 : : case PROP_MAX_SIZE_BUFFERS:
472 : 3 : GST_MULTI_QUEUE_MUTEX_LOCK (mq);
473 : 3 : mq->max_size.visible = g_value_get_uint (value);
474 [ # # ][ - + ]: 3 : SET_CHILD_PROPERTY (mq, visible);
475 : 3 : GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
476 : 3 : break;
477 : : case PROP_MAX_SIZE_TIME:
478 : 3 : GST_MULTI_QUEUE_MUTEX_LOCK (mq);
479 : 3 : mq->max_size.time = g_value_get_uint64 (value);
480 [ # # ][ - + ]: 3 : SET_CHILD_PROPERTY (mq, time);
481 : 3 : GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
482 : 3 : break;
483 : : case PROP_EXTRA_SIZE_BYTES:
484 : 3 : mq->extra_size.bytes = g_value_get_uint (value);
485 : 3 : break;
486 : : case PROP_EXTRA_SIZE_BUFFERS:
487 : 3 : mq->extra_size.visible = g_value_get_uint (value);
488 : 3 : break;
489 : : case PROP_EXTRA_SIZE_TIME:
490 : 3 : mq->extra_size.time = g_value_get_uint64 (value);
491 : 3 : break;
492 : : case PROP_USE_BUFFERING:
493 : 0 : mq->use_buffering = g_value_get_boolean (value);
494 : 0 : break;
495 : : case PROP_LOW_PERCENT:
496 : 0 : mq->low_percent = g_value_get_int (value);
497 : 0 : break;
498 : : case PROP_HIGH_PERCENT:
499 : 0 : mq->high_percent = g_value_get_int (value);
500 : 0 : break;
501 : : default:
502 : 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
503 : 0 : break;
504 : : }
505 : 18 : }
506 : :
507 : : static void
508 : 0 : gst_multi_queue_get_property (GObject * object, guint prop_id,
509 : : GValue * value, GParamSpec * pspec)
510 : : {
511 : 0 : GstMultiQueue *mq = GST_MULTI_QUEUE (object);
512 : :
513 : 0 : GST_MULTI_QUEUE_MUTEX_LOCK (mq);
514 : :
515 [ # # # # : 0 : switch (prop_id) {
# # # # #
# ]
516 : : case PROP_EXTRA_SIZE_BYTES:
517 : 0 : g_value_set_uint (value, mq->extra_size.bytes);
518 : 0 : break;
519 : : case PROP_EXTRA_SIZE_BUFFERS:
520 : 0 : g_value_set_uint (value, mq->extra_size.visible);
521 : 0 : break;
522 : : case PROP_EXTRA_SIZE_TIME:
523 : 0 : g_value_set_uint64 (value, mq->extra_size.time);
524 : 0 : break;
525 : : case PROP_MAX_SIZE_BYTES:
526 : 0 : g_value_set_uint (value, mq->max_size.bytes);
527 : 0 : break;
528 : : case PROP_MAX_SIZE_BUFFERS:
529 : 0 : g_value_set_uint (value, mq->max_size.visible);
530 : 0 : break;
531 : : case PROP_MAX_SIZE_TIME:
532 : 0 : g_value_set_uint64 (value, mq->max_size.time);
533 : 0 : break;
534 : : case PROP_USE_BUFFERING:
535 : 0 : g_value_set_boolean (value, mq->use_buffering);
536 : 0 : break;
537 : : case PROP_LOW_PERCENT:
538 : 0 : g_value_set_int (value, mq->low_percent);
539 : 0 : break;
540 : : case PROP_HIGH_PERCENT:
541 : 0 : g_value_set_int (value, mq->high_percent);
542 : 0 : break;
543 : : default:
544 : 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
545 : 0 : break;
546 : : }
547 : :
548 : 0 : GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
549 : 0 : }
550 : :
551 : : static GstIterator *
552 : 0 : gst_multi_queue_iterate_internal_links (GstPad * pad)
553 : : {
554 : 0 : GstIterator *it = NULL;
555 : : GstPad *opad;
556 : : GstSingleQueue *squeue;
557 : 0 : GstMultiQueue *mq = GST_MULTI_QUEUE (gst_pad_get_parent (pad));
558 : :
559 : 0 : GST_MULTI_QUEUE_MUTEX_LOCK (mq);
560 : 0 : squeue = gst_pad_get_element_private (pad);
561 [ # # ]: 0 : if (!squeue)
562 : 0 : goto out;
563 : :
564 [ # # ]: 0 : if (squeue->sinkpad == pad)
565 : 0 : opad = gst_object_ref (squeue->srcpad);
566 [ # # ]: 0 : else if (squeue->srcpad == pad)
567 : 0 : opad = gst_object_ref (squeue->sinkpad);
568 : : else
569 : 0 : goto out;
570 : :
571 : 0 : it = gst_iterator_new_single (GST_TYPE_PAD, opad,
572 : : (GstCopyFunction) gst_object_ref, (GFreeFunc) gst_object_unref);
573 : :
574 : 0 : gst_object_unref (opad);
575 : :
576 : : out:
577 : 0 : GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
578 : 0 : gst_object_unref (mq);
579 : :
580 : 0 : return it;
581 : : }
582 : :
583 : :
584 : : /*
585 : : * GstElement methods
586 : : */
587 : :
588 : : static GstPad *
589 : 16 : gst_multi_queue_request_new_pad (GstElement * element, GstPadTemplate * temp,
590 : : const gchar * name)
591 : : {
592 : 16 : GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
593 : : GstSingleQueue *squeue;
594 : :
595 [ - + ][ # # ]: 16 : GST_LOG_OBJECT (element, "name : %s", GST_STR_NULL (name));
596 : :
597 : : /* Create a new single queue, add the sink and source pad and return the sink pad */
598 : 16 : squeue = gst_single_queue_new (mqueue);
599 : :
600 : 16 : GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
601 : 16 : mqueue->queues = g_list_append (mqueue->queues, squeue);
602 : 16 : mqueue->queues_cookie++;
603 : 16 : GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
604 : :
605 [ - + ][ # # ]: 16 : GST_DEBUG_OBJECT (mqueue, "Returning pad %s:%s",
[ # # ][ # # ]
[ # # ][ # # ]
606 : : GST_DEBUG_PAD_NAME (squeue->sinkpad));
607 : :
608 : 16 : return squeue->sinkpad;
609 : : }
610 : :
611 : : static void
612 : 12 : gst_multi_queue_release_pad (GstElement * element, GstPad * pad)
613 : : {
614 : 12 : GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
615 : 12 : GstSingleQueue *sq = NULL;
616 : : GList *tmp;
617 : :
618 [ - + ][ # # ]: 12 : GST_LOG_OBJECT (element, "pad %s:%s", GST_DEBUG_PAD_NAME (pad));
[ # # ][ # # ]
[ # # ][ # # ]
619 : :
620 : 12 : GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
621 : : /* Find which single queue it belongs to, knowing that it should be a sinkpad */
622 [ # # ][ + - ]: 12 : for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
623 : 12 : sq = (GstSingleQueue *) tmp->data;
624 : :
625 [ + - ]: 12 : if (sq->sinkpad == pad)
626 : 12 : break;
627 : : }
628 : :
629 [ - + ]: 12 : if (!tmp) {
630 [ # # ]: 0 : GST_WARNING_OBJECT (mqueue, "That pad doesn't belong to this element ???");
631 : 0 : GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
632 : 12 : return;
633 : : }
634 : :
635 : : /* FIXME: The removal of the singlequeue should probably not happen until it
636 : : * finishes draining */
637 : :
638 : : /* remove it from the list */
639 : 12 : mqueue->queues = g_list_delete_link (mqueue->queues, tmp);
640 : 12 : mqueue->queues_cookie++;
641 : :
642 : : /* FIXME : recompute next-non-linked */
643 : 12 : GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
644 : :
645 : : /* delete SingleQueue */
646 : 12 : gst_data_queue_set_flushing (sq->queue, TRUE);
647 : :
648 : 12 : gst_pad_set_active (sq->srcpad, FALSE);
649 : 12 : gst_pad_set_active (sq->sinkpad, FALSE);
650 : 12 : gst_pad_set_element_private (sq->srcpad, NULL);
651 : 12 : gst_pad_set_element_private (sq->sinkpad, NULL);
652 : 12 : gst_element_remove_pad (element, sq->srcpad);
653 : 12 : gst_element_remove_pad (element, sq->sinkpad);
654 : 12 : gst_single_queue_free (sq);
655 : : }
656 : :
657 : : static GstStateChangeReturn
658 : 62 : gst_multi_queue_change_state (GstElement * element, GstStateChange transition)
659 : : {
660 : 62 : GstMultiQueue *mqueue = GST_MULTI_QUEUE (element);
661 : 62 : GstSingleQueue *sq = NULL;
662 : : GstStateChangeReturn result;
663 : :
664 [ + + + ]: 62 : switch (transition) {
665 : : case GST_STATE_CHANGE_READY_TO_PAUSED:{
666 : : GList *tmp;
667 : :
668 : : /* Set all pads to non-flushing */
669 : 12 : GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
670 [ + - ][ + + ]: 26 : for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
671 : 14 : sq = (GstSingleQueue *) tmp->data;
672 : 14 : sq->flushing = FALSE;
673 : : }
674 : 12 : GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
675 : 12 : break;
676 : : }
677 : : case GST_STATE_CHANGE_PAUSED_TO_READY:{
678 : : GList *tmp;
679 : :
680 : : /* Un-wait all waiting pads */
681 : 12 : GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
682 [ + - ][ + + ]: 14 : for (tmp = mqueue->queues; tmp; tmp = g_list_next (tmp)) {
683 : 2 : sq = (GstSingleQueue *) tmp->data;
684 : 2 : sq->flushing = TRUE;
685 : 2 : g_cond_signal (sq->turn);
686 : : }
687 : 12 : GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
688 : 12 : break;
689 : : }
690 : : default:
691 : 38 : break;
692 : : }
693 : :
694 : 62 : result = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
695 : :
696 : : switch (transition) {
697 : : default:
698 : 62 : break;
699 : : }
700 : :
701 : 62 : return result;
702 : :
703 : :
704 : :
705 : : }
706 : :
707 : : static gboolean
708 : 28 : gst_single_queue_flush (GstMultiQueue * mq, GstSingleQueue * sq, gboolean flush)
709 : : {
710 : : gboolean result;
711 : :
712 [ - + ][ # # ]: 28 : GST_DEBUG_OBJECT (mq, "flush %s queue %d", (flush ? "start" : "stop"),
713 : : sq->id);
714 : :
715 [ + + ]: 28 : if (flush) {
716 : 14 : sq->srcresult = GST_FLOW_WRONG_STATE;
717 : 14 : gst_data_queue_set_flushing (sq->queue, TRUE);
718 : :
719 : 14 : sq->flushing = TRUE;
720 : :
721 : : /* wake up non-linked task */
722 [ - + ]: 14 : GST_LOG_OBJECT (mq, "SingleQueue %d : waking up eventually waiting task",
723 : : sq->id);
724 : 14 : GST_MULTI_QUEUE_MUTEX_LOCK (mq);
725 : 14 : g_cond_signal (sq->turn);
726 : 14 : GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
727 : :
728 [ - + ]: 14 : GST_LOG_OBJECT (mq, "SingleQueue %d : pausing task", sq->id);
729 : 14 : result = gst_pad_pause_task (sq->srcpad);
730 : 14 : sq->sink_tainted = sq->src_tainted = TRUE;
731 : : } else {
732 : 14 : gst_data_queue_flush (sq->queue);
733 : 14 : gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
734 : 14 : gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
735 : : /* All pads start off not-linked for a smooth kick-off */
736 : 14 : sq->srcresult = GST_FLOW_OK;
737 : 14 : sq->cur_time = 0;
738 : 14 : sq->max_size.visible = mq->max_size.visible;
739 : 14 : sq->is_eos = FALSE;
740 : 14 : sq->nextid = 0;
741 : 14 : sq->oldid = 0;
742 : 14 : sq->last_oldid = G_MAXUINT32;
743 : 14 : gst_data_queue_set_flushing (sq->queue, FALSE);
744 : :
745 : 14 : sq->flushing = FALSE;
746 : :
747 [ - + ]: 14 : GST_LOG_OBJECT (mq, "SingleQueue %d : starting task", sq->id);
748 : 14 : result =
749 : 14 : gst_pad_start_task (sq->srcpad, (GstTaskFunction) gst_multi_queue_loop,
750 : 14 : sq->srcpad);
751 : : }
752 : 28 : return result;
753 : : }
754 : :
755 : : static void
756 : 4514 : update_buffering (GstMultiQueue * mq, GstSingleQueue * sq)
757 : : {
758 : : GstDataQueueSize size;
759 : : gint percent, tmp;
760 : 4514 : gboolean post = FALSE;
761 : :
762 : : /* nothing to dowhen we are not in buffering mode */
763 [ + - ]: 4514 : if (!mq->use_buffering)
764 : 4514 : return;
765 : :
766 : 0 : gst_data_queue_get_level (sq->queue, &size);
767 : :
768 [ # # ]: 0 : GST_DEBUG_OBJECT (mq,
769 : : "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
770 : : G_GUINT64_FORMAT, sq->id, size.visible, sq->max_size.visible,
771 : : size.bytes, sq->max_size.bytes, sq->cur_time, sq->max_size.time);
772 : :
773 : : /* get bytes and time percentages and take the max */
774 [ # # ]: 0 : if (sq->is_eos) {
775 : 0 : percent = 100;
776 : : } else {
777 : 0 : percent = 0;
778 [ # # ]: 0 : if (sq->max_size.time > 0) {
779 : 0 : tmp = (sq->cur_time * 100) / sq->max_size.time;
780 : 0 : percent = MAX (percent, tmp);
781 : : }
782 [ # # ]: 0 : if (sq->max_size.bytes > 0) {
783 : 0 : tmp = (size.bytes * 100) / sq->max_size.bytes;
784 : 0 : percent = MAX (percent, tmp);
785 : : }
786 : : }
787 : :
788 [ # # ]: 0 : if (mq->buffering) {
789 : 0 : post = TRUE;
790 [ # # ]: 0 : if (percent >= mq->high_percent) {
791 : 0 : mq->buffering = FALSE;
792 : : }
793 : : /* make sure it increases */
794 : 0 : percent = MAX (mq->percent, percent);
795 : :
796 [ # # ]: 0 : if (percent == mq->percent)
797 : : /* don't post if nothing changed */
798 : 0 : post = FALSE;
799 : : else
800 : : /* else keep last value we posted */
801 : 0 : mq->percent = percent;
802 : : } else {
803 [ # # ]: 0 : if (percent < mq->low_percent) {
804 : 0 : mq->buffering = TRUE;
805 : 0 : mq->percent = percent;
806 : 0 : post = TRUE;
807 : : }
808 : : }
809 [ # # ]: 0 : if (post) {
810 : : GstMessage *message;
811 : :
812 : : /* scale to high percent so that it becomes the 100% mark */
813 : 0 : percent = percent * 100 / mq->high_percent;
814 : : /* clip */
815 [ # # ]: 0 : if (percent > 100)
816 : 0 : percent = 100;
817 : :
818 [ # # ]: 0 : GST_DEBUG_OBJECT (mq, "buffering %d percent", percent);
819 : 0 : message = gst_message_new_buffering (GST_OBJECT_CAST (mq), percent);
820 : :
821 : 0 : gst_element_post_message (GST_ELEMENT_CAST (mq), message);
822 : : } else {
823 [ # # ]: 0 : GST_DEBUG_OBJECT (mq, "filled %d percent", percent);
824 : : }
825 : : }
826 : :
827 : : /* calculate the diff between running time on the sink and src of the queue.
828 : : * This is the total amount of time in the queue.
829 : : * WITH LOCK TAKEN */
830 : : static void
831 : 4501 : update_time_level (GstMultiQueue * mq, GstSingleQueue * sq)
832 : : {
833 : : gint64 sink_time, src_time;
834 : :
835 [ + + ]: 4501 : if (sq->sink_tainted) {
836 : 2469 : sink_time = sq->sinktime =
837 : 2469 : gst_segment_to_running_time (&sq->sink_segment, GST_FORMAT_TIME,
838 : : sq->sink_segment.last_stop);
839 : :
840 [ + - ]: 2469 : if (G_UNLIKELY (sink_time != GST_CLOCK_TIME_NONE))
841 : : /* if we have a time, we become untainted and use the time */
842 : 2469 : sq->sink_tainted = FALSE;
843 : : } else
844 : 2032 : sink_time = sq->sinktime;
845 : :
846 [ + + ]: 4501 : if (sq->src_tainted) {
847 : 2046 : src_time = sq->srctime =
848 : 2046 : gst_segment_to_running_time (&sq->src_segment, GST_FORMAT_TIME,
849 : : sq->src_segment.last_stop);
850 : : /* if we have a time, we become untainted and use the time */
851 [ + - ]: 2046 : if (G_UNLIKELY (src_time != GST_CLOCK_TIME_NONE))
852 : 2046 : sq->src_tainted = FALSE;
853 : : } else
854 : 2455 : src_time = sq->srctime;
855 : :
856 [ - + ][ # # ]: 4501 : GST_DEBUG_OBJECT (mq,
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
857 : : "queue %d, sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT, sq->id,
858 : : GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
859 : :
860 : : /* This allows for streams with out of order timestamping - sometimes the
861 : : * emerging timestamp is later than the arriving one(s) */
862 [ + - ][ + - ]: 4501 : if (G_LIKELY (sink_time != -1 && src_time != -1 && sink_time > src_time))
[ + + ]
863 : 3550 : sq->cur_time = sink_time - src_time;
864 : : else
865 : 951 : sq->cur_time = 0;
866 : :
867 : : /* updating the time level can change the buffering state */
868 : 4501 : update_buffering (mq, sq);
869 : :
870 : 4501 : return;
871 : : }
872 : :
873 : : /* take a NEWSEGMENT event and apply the values to segment, updating the time
874 : : * level of queue. */
875 : : static void
876 : 208 : apply_segment (GstMultiQueue * mq, GstSingleQueue * sq, GstEvent * event,
877 : : GstSegment * segment)
878 : : {
879 : : gboolean update;
880 : : GstFormat format;
881 : : gdouble rate, arate;
882 : : gint64 start, stop, time;
883 : :
884 : 208 : gst_event_parse_new_segment_full (event, &update, &rate, &arate,
885 : : &format, &start, &stop, &time);
886 : :
887 : : /* now configure the values, we use these to track timestamps on the
888 : : * sinkpad. */
889 [ + + ]: 208 : if (format != GST_FORMAT_TIME) {
890 : : /* non-time format, pretent the current time segment is closed with a
891 : : * 0 start and unknown stop time. */
892 : 4 : update = FALSE;
893 : 4 : format = GST_FORMAT_TIME;
894 : 4 : start = 0;
895 : 4 : stop = -1;
896 : 4 : time = 0;
897 : : }
898 : :
899 : 208 : GST_MULTI_QUEUE_MUTEX_LOCK (mq);
900 : :
901 : 208 : gst_segment_set_newsegment_full (segment, update,
902 : : rate, arate, format, start, stop, time);
903 : :
904 [ + + ]: 208 : if (segment == &sq->sink_segment)
905 : 104 : sq->sink_tainted = TRUE;
906 : : else
907 : 104 : sq->src_tainted = TRUE;
908 : :
909 [ - + ]: 208 : GST_DEBUG_OBJECT (mq,
910 : : "queue %d, configured NEWSEGMENT %" GST_SEGMENT_FORMAT, sq->id, segment);
911 : :
912 : : /* segment can update the time level of the queue */
913 : 208 : update_time_level (mq, sq);
914 : :
915 : 208 : GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
916 : 208 : }
917 : :
918 : : /* take a buffer and update segment, updating the time level of the queue. */
919 : : static void
920 : 4293 : apply_buffer (GstMultiQueue * mq, GstSingleQueue * sq, GstClockTime timestamp,
921 : : GstClockTime duration, GstSegment * segment)
922 : : {
923 : 4293 : GST_MULTI_QUEUE_MUTEX_LOCK (mq);
924 : :
925 : : /* if no timestamp is set, assume it's continuous with the previous
926 : : * time */
927 [ - + ]: 4293 : if (timestamp == GST_CLOCK_TIME_NONE)
928 : 0 : timestamp = segment->last_stop;
929 : :
930 : : /* add duration */
931 [ - + ]: 4293 : if (duration != GST_CLOCK_TIME_NONE)
932 : 0 : timestamp += duration;
933 : :
934 [ - + ][ # # ]: 4293 : GST_DEBUG_OBJECT (mq, "queue %d, last_stop updated to %" GST_TIME_FORMAT,
[ # # ][ # # ]
[ # # ]
935 : : sq->id, GST_TIME_ARGS (timestamp));
936 : :
937 : 4293 : gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp);
938 : :
939 [ + + ]: 4293 : if (segment == &sq->sink_segment)
940 : 2363 : sq->sink_tainted = TRUE;
941 : : else
942 : 1930 : sq->src_tainted = TRUE;
943 : :
944 : : /* calc diff with other end */
945 : 4293 : update_time_level (mq, sq);
946 : 4293 : GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
947 : 4293 : }
948 : :
949 : : static GstFlowReturn
950 : 2043 : gst_single_queue_push_one (GstMultiQueue * mq, GstSingleQueue * sq,
951 : : GstMiniObject * object)
952 : : {
953 : 2043 : GstFlowReturn result = GST_FLOW_OK;
954 : :
955 [ - + ][ + - ]: 2043 : if (GST_IS_BUFFER (object)) {
[ + + ][ + + ]
956 : : GstBuffer *buffer;
957 : : GstClockTime timestamp, duration;
958 : : GstCaps *caps;
959 : :
960 : 1930 : buffer = GST_BUFFER_CAST (object);
961 : 1930 : timestamp = GST_BUFFER_TIMESTAMP (buffer);
962 : 1930 : duration = GST_BUFFER_DURATION (buffer);
963 : 1930 : caps = GST_BUFFER_CAPS (buffer);
964 : :
965 : 1930 : apply_buffer (mq, sq, timestamp, duration, &sq->src_segment);
966 : :
967 : : /* Applying the buffer may have made the queue non-full again, unblock it if needed */
968 : 1930 : gst_data_queue_limits_changed (sq->queue);
969 : :
970 [ - + ][ # # ]: 1930 : GST_DEBUG_OBJECT (mq,
[ # # ][ # # ]
[ # # ]
971 : : "SingleQueue %d : Pushing buffer %p with ts %" GST_TIME_FORMAT,
972 : : sq->id, buffer, GST_TIME_ARGS (timestamp));
973 : :
974 : : /* Set caps on pad before pushing, this avoids core calling the acceptcaps
975 : : * function on the srcpad, which will call acceptcaps upstream, which might
976 : : * not accept these caps (anymore). */
977 [ - + ][ # # ]: 1930 : if (caps && caps != GST_PAD_CAPS (sq->srcpad))
978 : 0 : gst_pad_set_caps (sq->srcpad, caps);
979 : :
980 : 1930 : result = gst_pad_push (sq->srcpad, buffer);
981 [ - + ][ + - ]: 113 : } else if (GST_IS_EVENT (object)) {
[ + - ][ + - ]
982 : : GstEvent *event;
983 : :
984 : 113 : event = GST_EVENT_CAST (object);
985 : :
986 [ + + - ]: 113 : switch (GST_EVENT_TYPE (event)) {
987 : : case GST_EVENT_EOS:
988 : 9 : result = GST_FLOW_UNEXPECTED;
989 : 9 : break;
990 : : case GST_EVENT_NEWSEGMENT:
991 : 104 : apply_segment (mq, sq, event, &sq->src_segment);
992 : : /* Applying the segment may have made the queue non-full again, unblock it if needed */
993 : 104 : gst_data_queue_limits_changed (sq->queue);
994 : 104 : break;
995 : : default:
996 : 0 : break;
997 : : }
998 : :
999 [ - + ]: 113 : GST_DEBUG_OBJECT (mq,
1000 : : "SingleQueue %d : Pushing event %p of type %s",
1001 : : sq->id, event, GST_EVENT_TYPE_NAME (event));
1002 : :
1003 : 113 : gst_pad_push_event (sq->srcpad, event);
1004 : : } else {
1005 : 0 : g_warning ("Unexpected object in singlequeue %d (refcounting problem?)",
1006 : : sq->id);
1007 : : }
1008 : 2043 : return result;
1009 : :
1010 : : /* ERRORS */
1011 : : }
1012 : :
1013 : : static GstMiniObject *
1014 : 2047 : gst_multi_queue_item_steal_object (GstMultiQueueItem * item)
1015 : : {
1016 : : GstMiniObject *res;
1017 : :
1018 : 2047 : res = item->object;
1019 : 2047 : item->object = NULL;
1020 : :
1021 : 2047 : return res;
1022 : : }
1023 : :
1024 : : static void
1025 : 2480 : gst_multi_queue_item_destroy (GstMultiQueueItem * item)
1026 : : {
1027 [ + + ]: 2480 : if (item->object)
1028 : 434 : gst_mini_object_unref (item->object);
1029 : 2480 : g_slice_free (GstMultiQueueItem, item);
1030 : 2481 : }
1031 : :
1032 : : /* takes ownership of passed mini object! */
1033 : : static GstMultiQueueItem *
1034 : 2364 : gst_multi_queue_buffer_item_new (GstMiniObject * object, guint32 curid)
1035 : : {
1036 : : GstMultiQueueItem *item;
1037 : :
1038 : 2364 : item = g_slice_new (GstMultiQueueItem);
1039 : 2364 : item->object = object;
1040 : 2364 : item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
1041 : 2364 : item->posid = curid;
1042 : :
1043 : 2364 : item->size = GST_BUFFER_SIZE (object);
1044 : 2364 : item->duration = GST_BUFFER_DURATION (object);
1045 [ + - ]: 2364 : if (item->duration == GST_CLOCK_TIME_NONE)
1046 : 2364 : item->duration = 0;
1047 : 2364 : item->visible = TRUE;
1048 : 2364 : return item;
1049 : : }
1050 : :
1051 : : static GstMultiQueueItem *
1052 : 117 : gst_multi_queue_event_item_new (GstMiniObject * object, guint32 curid)
1053 : : {
1054 : : GstMultiQueueItem *item;
1055 : :
1056 : 117 : item = g_slice_new (GstMultiQueueItem);
1057 : 117 : item->object = object;
1058 : 117 : item->destroy = (GDestroyNotify) gst_multi_queue_item_destroy;
1059 : 117 : item->posid = curid;
1060 : :
1061 : 117 : item->size = 0;
1062 : 117 : item->duration = 0;
1063 : 117 : item->visible = FALSE;
1064 : 117 : return item;
1065 : : }
1066 : :
1067 : : /* Each main loop attempts to push buffers until the return value
1068 : : * is not-linked. not-linked pads are not allowed to push data beyond
1069 : : * any linked pads, so they don't 'rush ahead of the pack'.
1070 : : */
1071 : : static void
1072 : 2056 : gst_multi_queue_loop (GstPad * pad)
1073 : : {
1074 : : GstSingleQueue *sq;
1075 : : GstMultiQueueItem *item;
1076 : : GstDataQueueItem *sitem;
1077 : : GstMultiQueue *mq;
1078 : 2056 : GstMiniObject *object = NULL;
1079 : : guint32 newid;
1080 : : GstFlowReturn result;
1081 : :
1082 : 2056 : sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
1083 : 2056 : mq = sq->mqueue;
1084 : :
1085 [ - + ]: 2056 : GST_DEBUG_OBJECT (mq, "SingleQueue %d : trying to pop an object", sq->id);
1086 : :
1087 [ - + ]: 2056 : if (sq->flushing)
1088 : 0 : goto out_flushing;
1089 : :
1090 : : /* Get something from the queue, blocking until that happens, or we get
1091 : : * flushed */
1092 [ + + ]: 2056 : if (!(gst_data_queue_pop (sq->queue, &sitem)))
1093 : 9 : goto out_flushing;
1094 : :
1095 : 2047 : item = (GstMultiQueueItem *) sitem;
1096 : 2047 : newid = item->posid;
1097 : :
1098 : : /* steal the object and destroy the item */
1099 : 2047 : object = gst_multi_queue_item_steal_object (item);
1100 : 2047 : gst_multi_queue_item_destroy (item);
1101 : :
1102 [ - + ]: 2047 : GST_LOG_OBJECT (mq, "SingleQueue %d : newid:%d , oldid:%d",
1103 : : sq->id, newid, sq->last_oldid);
1104 : :
1105 : : /* If we're not-linked, we do some extra work because we might need to
1106 : : * wait before pushing. If we're linked but there's a gap in the IDs,
1107 : : * or it's the first loop, or we just passed the previous highid,
1108 : : * we might need to wake some sleeping pad up, so there's extra work
1109 : : * there too */
1110 [ + + ][ + + ]: 2047 : if (sq->srcresult == GST_FLOW_NOT_LINKED ||
1111 [ + + ][ + + ]: 1062 : (sq->last_oldid == G_MAXUINT32) || (newid != (sq->last_oldid + 1)) ||
1112 : 527 : sq->last_oldid > mq->highid) {
1113 [ - + ]: 1990 : GST_LOG_OBJECT (mq, "CHECKING sq->srcresult: %s",
1114 : : gst_flow_get_name (sq->srcresult));
1115 : :
1116 : 1990 : GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1117 : :
1118 : : /* Update the nextid so other threads know when to wake us up */
1119 : 1990 : sq->nextid = newid;
1120 : :
1121 : : /* Update the oldid (the last ID we output) for highid tracking */
1122 [ + + ]: 1990 : if (sq->last_oldid != G_MAXUINT32)
1123 : 1976 : sq->oldid = sq->last_oldid;
1124 : :
1125 [ + + ]: 1990 : if (sq->srcresult == GST_FLOW_NOT_LINKED) {
1126 : : /* Go to sleep until it's time to push this buffer */
1127 : :
1128 : : /* Recompute the highid */
1129 : 971 : compute_high_id (mq);
1130 [ + + ][ + - ]: 1437 : while (newid > mq->highid && sq->srcresult == GST_FLOW_NOT_LINKED) {
1131 [ - + ]: 470 : GST_DEBUG_OBJECT (mq, "queue %d sleeping for not-linked wakeup with "
1132 : : "newid %u and highid %u", sq->id, newid, mq->highid);
1133 : :
1134 : :
1135 : : /* Wake up all non-linked pads before we sleep */
1136 : 470 : wake_up_next_non_linked (mq);
1137 : :
1138 : 470 : mq->numwaiting++;
1139 : 470 : g_cond_wait (sq->turn, mq->qlock);
1140 : 470 : mq->numwaiting--;
1141 : :
1142 [ + + ]: 470 : if (sq->flushing) {
1143 : 4 : GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1144 : 4 : goto out_flushing;
1145 : : }
1146 : :
1147 [ - + ]: 466 : GST_DEBUG_OBJECT (mq, "queue %d woken from sleeping for not-linked "
1148 : : "wakeup with newid %u and highid %u", sq->id, newid, mq->highid);
1149 : : }
1150 : :
1151 : : /* Re-compute the high_id in case someone else pushed */
1152 : 967 : compute_high_id (mq);
1153 : : } else {
1154 : 1019 : compute_high_id (mq);
1155 : : /* Wake up all non-linked pads */
1156 : 1019 : wake_up_next_non_linked (mq);
1157 : : }
1158 : : /* We're done waiting, we can clear the nextid */
1159 : 1986 : sq->nextid = 0;
1160 : :
1161 : 1986 : GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1162 : : }
1163 : :
1164 [ - + ]: 2043 : if (sq->flushing)
1165 : 0 : goto out_flushing;
1166 : :
1167 [ - + ]: 2043 : GST_LOG_OBJECT (mq, "BEFORE PUSHING sq->srcresult: %s",
1168 : : gst_flow_get_name (sq->srcresult));
1169 : :
1170 : : /* Try to push out the new object */
1171 : 2043 : result = gst_single_queue_push_one (mq, sq, object);
1172 : 2043 : sq->srcresult = result;
1173 : 2043 : object = NULL;
1174 : :
1175 [ + + ][ + + ]: 2043 : if (result != GST_FLOW_OK && result != GST_FLOW_NOT_LINKED
1176 [ + + ]: 10 : && result != GST_FLOW_UNEXPECTED)
1177 : 1 : goto out_flushing;
1178 : :
1179 [ - + ]: 2042 : GST_LOG_OBJECT (mq, "AFTER PUSHING sq->srcresult: %s",
1180 : : gst_flow_get_name (sq->srcresult));
1181 : :
1182 : 2042 : sq->last_oldid = newid;
1183 : 2042 : return;
1184 : :
1185 : : out_flushing:
1186 : : {
1187 [ + + ]: 14 : if (object)
1188 : 4 : gst_mini_object_unref (object);
1189 : :
1190 : : /* Need to make sure wake up any sleeping pads when we exit */
1191 : 14 : GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1192 : 14 : compute_high_id (mq);
1193 : 14 : wake_up_next_non_linked (mq);
1194 : 14 : GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1195 : :
1196 : : /* upstream needs to see fatal result ASAP to shut things down,
1197 : : * but might be stuck in one of our other full queues;
1198 : : * so empty this one and trigger dynamic queue growth. At
1199 : : * this point the srcresult is not OK, NOT_LINKED
1200 : : * or UNEXPECTED, i.e. a real failure */
1201 : 14 : gst_data_queue_flush (sq->queue);
1202 : 14 : single_queue_underrun_cb (sq->queue, sq);
1203 : 14 : gst_data_queue_set_flushing (sq->queue, TRUE);
1204 : 14 : gst_pad_pause_task (sq->srcpad);
1205 [ - + ]: 14 : GST_CAT_LOG_OBJECT (multi_queue_debug, mq,
1206 : : "SingleQueue[%d] task paused, reason:%s",
1207 : : sq->id, gst_flow_get_name (sq->srcresult));
1208 : 2056 : return;
1209 : : }
1210 : : }
1211 : :
1212 : : /**
1213 : : * gst_multi_queue_chain:
1214 : : *
1215 : : * This is similar to GstQueue's chain function, except:
1216 : : * _ we don't have leak behavioures,
1217 : : * _ we push with a unique id (curid)
1218 : : */
1219 : : static GstFlowReturn
1220 : 2364 : gst_multi_queue_chain (GstPad * pad, GstBuffer * buffer)
1221 : : {
1222 : : GstSingleQueue *sq;
1223 : : GstMultiQueue *mq;
1224 : : GstMultiQueueItem *item;
1225 : : guint32 curid;
1226 : : GstClockTime timestamp, duration;
1227 : :
1228 : 2364 : sq = gst_pad_get_element_private (pad);
1229 : 2364 : mq = sq->mqueue;
1230 : :
1231 : : /* if eos, we are always full, so avoid hanging incoming indefinitely */
1232 [ - + ]: 2364 : if (sq->is_eos)
1233 : 0 : goto was_eos;
1234 : :
1235 : : /* Get a unique incrementing id */
1236 : 2364 : curid = g_atomic_int_exchange_and_add ((gint *) & mq->counter, 1);
1237 : :
1238 [ - + ]: 2364 : GST_LOG_OBJECT (mq, "SingleQueue %d : about to enqueue buffer %p with id %d",
1239 : : sq->id, buffer, curid);
1240 : :
1241 : 2364 : item = gst_multi_queue_buffer_item_new (GST_MINI_OBJECT_CAST (buffer), curid);
1242 : :
1243 : 2364 : timestamp = GST_BUFFER_TIMESTAMP (buffer);
1244 : 2364 : duration = GST_BUFFER_DURATION (buffer);
1245 : :
1246 [ + + ]: 2364 : if (!(gst_data_queue_push (sq->queue, (GstDataQueueItem *) item)))
1247 : 1 : goto flushing;
1248 : :
1249 : : /* update time level, we must do this after pushing the data in the queue so
1250 : : * that we never end up filling the queue first. */
1251 : 2363 : apply_buffer (mq, sq, timestamp, duration, &sq->sink_segment);
1252 : :
1253 : : done:
1254 : 2364 : return sq->srcresult;
1255 : :
1256 : : /* ERRORS */
1257 : : flushing:
1258 : : {
1259 [ - + ]: 1 : GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
1260 : : sq->id, gst_flow_get_name (sq->srcresult));
1261 : 1 : gst_multi_queue_item_destroy (item);
1262 : 1 : goto done;
1263 : : }
1264 : : was_eos:
1265 : : {
1266 [ # # ]: 0 : GST_DEBUG_OBJECT (mq, "we are EOS, dropping buffer, return UNEXPECTED");
1267 : 0 : gst_buffer_unref (buffer);
1268 : 2364 : return GST_FLOW_UNEXPECTED;
1269 : : }
1270 : : }
1271 : :
1272 : : static gboolean
1273 : 28 : gst_multi_queue_sink_activate_push (GstPad * pad, gboolean active)
1274 : : {
1275 : : GstSingleQueue *sq;
1276 : :
1277 : 28 : sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
1278 : :
1279 [ + + ]: 28 : if (active) {
1280 : : /* All pads start off linked until they push one buffer */
1281 : 14 : sq->srcresult = GST_FLOW_OK;
1282 : : } else {
1283 : 14 : sq->srcresult = GST_FLOW_WRONG_STATE;
1284 : 14 : gst_data_queue_flush (sq->queue);
1285 : : }
1286 : 28 : return TRUE;
1287 : : }
1288 : :
1289 : : static gboolean
1290 : 117 : gst_multi_queue_sink_event (GstPad * pad, GstEvent * event)
1291 : : {
1292 : : GstSingleQueue *sq;
1293 : : GstMultiQueue *mq;
1294 : : guint32 curid;
1295 : : GstMultiQueueItem *item;
1296 : : gboolean res;
1297 : : GstEventType type;
1298 : 117 : GstEvent *sref = NULL;
1299 : :
1300 : 117 : sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
1301 : 117 : mq = (GstMultiQueue *) gst_pad_get_parent (pad);
1302 : :
1303 : 117 : type = GST_EVENT_TYPE (event);
1304 : :
1305 [ - - + + ]: 117 : switch (type) {
1306 : : case GST_EVENT_FLUSH_START:
1307 [ # # ]: 0 : GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush start event",
1308 : : sq->id);
1309 : :
1310 : 0 : res = gst_pad_push_event (sq->srcpad, event);
1311 : :
1312 : 0 : gst_single_queue_flush (mq, sq, TRUE);
1313 : 0 : goto done;
1314 : :
1315 : : case GST_EVENT_FLUSH_STOP:
1316 [ # # ]: 0 : GST_DEBUG_OBJECT (mq, "SingleQueue %d : received flush stop event",
1317 : : sq->id);
1318 : :
1319 : 0 : res = gst_pad_push_event (sq->srcpad, event);
1320 : :
1321 : 0 : gst_single_queue_flush (mq, sq, FALSE);
1322 : 0 : goto done;
1323 : : case GST_EVENT_NEWSEGMENT:
1324 : : /* take ref because the queue will take ownership and we need the event
1325 : : * afterwards to update the segment */
1326 : 104 : sref = gst_event_ref (event);
1327 : 104 : break;
1328 : :
1329 : : default:
1330 [ - + ]: 13 : if (!(GST_EVENT_IS_SERIALIZED (event))) {
1331 : 0 : res = gst_pad_push_event (sq->srcpad, event);
1332 : 0 : goto done;
1333 : : }
1334 : 13 : break;
1335 : : }
1336 : :
1337 : : /* if eos, we are always full, so avoid hanging incoming indefinitely */
1338 [ - + ]: 117 : if (sq->is_eos)
1339 : 0 : goto was_eos;
1340 : :
1341 : : /* Get an unique incrementing id. */
1342 : 117 : curid = g_atomic_int_exchange_and_add ((gint *) & mq->counter, 1);
1343 : :
1344 : 117 : item = gst_multi_queue_event_item_new ((GstMiniObject *) event, curid);
1345 : :
1346 [ - + ]: 117 : GST_DEBUG_OBJECT (mq,
1347 : : "SingleQueue %d : Enqueuing event %p of type %s with id %d",
1348 : : sq->id, event, GST_EVENT_TYPE_NAME (event), curid);
1349 : :
1350 [ - + ]: 117 : if (!(res = gst_data_queue_push (sq->queue, (GstDataQueueItem *) item)))
1351 : 0 : goto flushing;
1352 : :
1353 : : /* mark EOS when we received one, we must do that after putting the
1354 : : * buffer in the queue because EOS marks the buffer as filled. No need to take
1355 : : * a lock, the _check_full happens from this thread only, right before pushing
1356 : : * into dataqueue. */
1357 [ + + - ]: 117 : switch (type) {
1358 : : case GST_EVENT_EOS:
1359 : 13 : sq->is_eos = TRUE;
1360 : : /* EOS affects the buffering state */
1361 : 13 : update_buffering (mq, sq);
1362 : 13 : single_queue_overrun_cb (sq->queue, sq);
1363 : 13 : break;
1364 : : case GST_EVENT_NEWSEGMENT:
1365 : 104 : apply_segment (mq, sq, sref, &sq->sink_segment);
1366 : 104 : gst_event_unref (sref);
1367 : 104 : break;
1368 : : default:
1369 : 0 : break;
1370 : : }
1371 : : done:
1372 : 117 : gst_object_unref (mq);
1373 : 117 : return res;
1374 : :
1375 : : flushing:
1376 : : {
1377 [ # # ]: 0 : GST_LOG_OBJECT (mq, "SingleQueue %d : exit because task paused, reason: %s",
1378 : : sq->id, gst_flow_get_name (sq->srcresult));
1379 [ # # ]: 0 : if (sref)
1380 : 0 : gst_event_unref (sref);
1381 : 0 : gst_multi_queue_item_destroy (item);
1382 : 0 : goto done;
1383 : : }
1384 : : was_eos:
1385 : : {
1386 [ # # ]: 0 : GST_DEBUG_OBJECT (mq, "we are EOS, dropping event, return FALSE");
1387 : 0 : gst_event_unref (event);
1388 : 0 : res = FALSE;
1389 : 0 : goto done;
1390 : : }
1391 : : }
1392 : :
1393 : : static GstCaps *
1394 : 28 : gst_multi_queue_getcaps (GstPad * pad)
1395 : : {
1396 : 28 : GstSingleQueue *sq = gst_pad_get_element_private (pad);
1397 : : GstPad *otherpad;
1398 : : GstCaps *result;
1399 : :
1400 [ + + ]: 28 : otherpad = (pad == sq->srcpad) ? sq->sinkpad : sq->srcpad;
1401 : :
1402 [ - + ]: 28 : GST_LOG_OBJECT (otherpad, "Getting caps from the peer of this pad");
1403 : :
1404 : 28 : result = gst_pad_peer_get_caps (otherpad);
1405 [ + + ]: 28 : if (result == NULL)
1406 : 14 : result = gst_caps_new_any ();
1407 : :
1408 : 28 : return result;
1409 : : }
1410 : :
1411 : : static gboolean
1412 : 0 : gst_multi_queue_acceptcaps (GstPad * pad, GstCaps * caps)
1413 : : {
1414 : 0 : GstSingleQueue *sq = gst_pad_get_element_private (pad);
1415 : : GstPad *otherpad;
1416 : : gboolean result;
1417 : :
1418 [ # # ]: 0 : otherpad = (pad == sq->srcpad) ? sq->sinkpad : sq->srcpad;
1419 : :
1420 [ # # ]: 0 : GST_LOG_OBJECT (otherpad, "Accept caps from the peer of this pad");
1421 : :
1422 : 0 : result = gst_pad_peer_accept_caps (otherpad, caps);
1423 : :
1424 : 0 : return result;
1425 : : }
1426 : :
1427 : : static GstFlowReturn
1428 : 0 : gst_multi_queue_bufferalloc (GstPad * pad, guint64 offset, guint size,
1429 : : GstCaps * caps, GstBuffer ** buf)
1430 : : {
1431 : 0 : GstSingleQueue *sq = gst_pad_get_element_private (pad);
1432 : :
1433 : 0 : return gst_pad_alloc_buffer (sq->srcpad, offset, size, caps, buf);
1434 : : }
1435 : :
1436 : : static gboolean
1437 : 28 : gst_multi_queue_src_activate_push (GstPad * pad, gboolean active)
1438 : : {
1439 : : GstMultiQueue *mq;
1440 : : GstSingleQueue *sq;
1441 : 28 : gboolean result = FALSE;
1442 : :
1443 : 28 : sq = (GstSingleQueue *) gst_pad_get_element_private (pad);
1444 : 28 : mq = sq->mqueue;
1445 : :
1446 [ - + ]: 28 : GST_DEBUG_OBJECT (mq, "SingleQueue %d", sq->id);
1447 : :
1448 [ + + ]: 28 : if (active) {
1449 : 14 : result = gst_single_queue_flush (mq, sq, FALSE);
1450 : : } else {
1451 : 14 : result = gst_single_queue_flush (mq, sq, TRUE);
1452 : : /* make sure streaming finishes */
1453 : 14 : result |= gst_pad_stop_task (pad);
1454 : : }
1455 : 28 : return result;
1456 : : }
1457 : :
1458 : : static gboolean
1459 : 1 : gst_multi_queue_src_event (GstPad * pad, GstEvent * event)
1460 : : {
1461 : 1 : GstSingleQueue *sq = gst_pad_get_element_private (pad);
1462 : :
1463 : 1 : return gst_pad_push_event (sq->sinkpad, event);
1464 : : }
1465 : :
1466 : : static gboolean
1467 : 1 : gst_multi_queue_src_query (GstPad * pad, GstQuery * query)
1468 : : {
1469 : 1 : GstSingleQueue *sq = gst_pad_get_element_private (pad);
1470 : : GstPad *peerpad;
1471 : : gboolean res;
1472 : :
1473 : : /* FIXME, Handle position offset depending on queue size */
1474 : :
1475 : : /* default handling */
1476 [ - + ]: 1 : if (!(peerpad = gst_pad_get_peer (sq->sinkpad)))
1477 : 0 : goto no_peer;
1478 : :
1479 : 1 : res = gst_pad_query (peerpad, query);
1480 : :
1481 : 1 : gst_object_unref (peerpad);
1482 : :
1483 : 1 : return res;
1484 : :
1485 : : /* ERRORS */
1486 : : no_peer:
1487 : : {
1488 [ # # ]: 0 : GST_LOG_OBJECT (sq->sinkpad, "Couldn't send query because we have no peer");
1489 : 1 : return FALSE;
1490 : : }
1491 : : }
1492 : :
1493 : : /*
1494 : : * Next-non-linked functions
1495 : : */
1496 : :
1497 : : /* WITH LOCK TAKEN */
1498 : : static void
1499 : 1503 : wake_up_next_non_linked (GstMultiQueue * mq)
1500 : : {
1501 : : GList *tmp;
1502 : :
1503 : : /* maybe no-one is waiting */
1504 [ + + ]: 1503 : if (mq->numwaiting < 1)
1505 : 1503 : return;
1506 : :
1507 : : /* Else figure out which singlequeue(s) need waking up */
1508 [ + - ][ + + ]: 3457 : for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
1509 : 2872 : GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
1510 : :
1511 [ + + ]: 2872 : if (sq->srcresult == GST_FLOW_NOT_LINKED) {
1512 [ + + ][ + + ]: 2495 : if (sq->nextid != 0 && sq->nextid <= mq->highid) {
1513 [ - + ]: 480 : GST_LOG_OBJECT (mq, "Waking up singlequeue %d", sq->id);
1514 : 480 : g_cond_signal (sq->turn);
1515 : : }
1516 : : }
1517 : : }
1518 : : }
1519 : :
1520 : : /* WITH LOCK TAKEN */
1521 : : static void
1522 : 2971 : compute_high_id (GstMultiQueue * mq)
1523 : : {
1524 : : /* The high-id is either the highest id among the linked pads, or if all
1525 : : * pads are not-linked, it's the lowest not-linked pad */
1526 : : GList *tmp;
1527 : 2971 : guint32 lowest = G_MAXUINT32;
1528 : 2971 : guint32 highid = G_MAXUINT32;
1529 : :
1530 [ + - ][ + + ]: 15799 : for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
1531 : 12828 : GstSingleQueue *sq = (GstSingleQueue *) tmp->data;
1532 : :
1533 [ - + ]: 12828 : GST_LOG_OBJECT (mq, "inspecting sq:%d , nextid:%d, oldid:%d, srcresult:%s",
1534 : : sq->id, sq->nextid, sq->oldid, gst_flow_get_name (sq->srcresult));
1535 : :
1536 [ + + ]: 12828 : if (sq->srcresult == GST_FLOW_NOT_LINKED) {
1537 : : /* No need to consider queues which are not waiting */
1538 [ + + ]: 9399 : if (sq->nextid == 0) {
1539 [ - + ]: 2702 : GST_LOG_OBJECT (mq, "sq:%d is not waiting - ignoring", sq->id);
1540 : 2702 : continue;
1541 : : }
1542 : :
1543 [ + + ]: 6697 : if (sq->nextid < lowest)
1544 : 3212 : lowest = sq->nextid;
1545 [ + + ]: 3429 : } else if (sq->srcresult != GST_FLOW_UNEXPECTED) {
1546 : : /* If we don't have a global highid, or the global highid is lower than
1547 : : * this single queue's last outputted id, store the queue's one,
1548 : : * unless the singlequeue is at EOS (srcresult = UNEXPECTED) */
1549 [ + + ][ + + ]: 3405 : if ((highid == G_MAXUINT32) || (sq->oldid > highid))
1550 : 2588 : highid = sq->oldid;
1551 : : }
1552 : : }
1553 : :
1554 [ + + ][ + + ]: 2971 : if (highid == G_MAXUINT32 || lowest < highid)
1555 : 1958 : mq->highid = lowest;
1556 : : else
1557 : 1013 : mq->highid = highid;
1558 : :
1559 [ - + ]: 2971 : GST_LOG_OBJECT (mq, "Highid is now : %u, lowest non-linked %u", mq->highid,
1560 : : lowest);
1561 : 2971 : }
1562 : :
1563 : : #define IS_FILLED(q, format, value) (((q)->max_size.format) != 0 && \
1564 : : ((q)->max_size.format) <= (value))
1565 : :
1566 : : /*
1567 : : * GstSingleQueue functions
1568 : : */
1569 : : static void
1570 : 92 : single_queue_overrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
1571 : : {
1572 : 92 : GstMultiQueue *mq = sq->mqueue;
1573 : : GList *tmp;
1574 : : GstDataQueueSize size;
1575 : 92 : gboolean filled = FALSE;
1576 : :
1577 : 92 : gst_data_queue_get_level (sq->queue, &size);
1578 : :
1579 [ - + ]: 92 : GST_LOG_OBJECT (mq, "Single Queue %d is full", sq->id);
1580 : :
1581 : 92 : GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1582 [ + - ][ + + ]: 201 : for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
1583 : 115 : GstSingleQueue *oq = (GstSingleQueue *) tmp->data;
1584 : : GstDataQueueSize ssize;
1585 : :
1586 [ - + ]: 115 : GST_LOG_OBJECT (mq, "Checking Queue %d", oq->id);
1587 : :
1588 [ + + ]: 115 : if (gst_data_queue_is_empty (oq->queue)) {
1589 [ - + ]: 6 : GST_LOG_OBJECT (mq, "Queue %d is empty", oq->id);
1590 [ - + ][ # # ]: 6 : if (IS_FILLED (sq, visible, size.visible)) {
1591 : 0 : sq->max_size.visible = size.visible + 1;
1592 [ # # ]: 0 : GST_DEBUG_OBJECT (mq,
1593 : : "Another queue is empty, bumping single queue %d max visible to %d",
1594 : : sq->id, sq->max_size.visible);
1595 : : }
1596 : 6 : GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1597 : 6 : goto beach;
1598 : : }
1599 : : /* check if we reached the hard time/bytes limits */
1600 : 109 : gst_data_queue_get_level (oq->queue, &ssize);
1601 : :
1602 [ - + ]: 109 : GST_DEBUG_OBJECT (mq,
1603 : : "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
1604 : : G_GUINT64_FORMAT, oq->id, ssize.visible, oq->max_size.visible,
1605 : : ssize.bytes, oq->max_size.bytes, oq->cur_time, oq->max_size.time);
1606 : :
1607 : : /* if this queue is filled completely we must signal overrun.
1608 : : * FIXME, this seems wrong in many ways
1609 : : * - we're comparing the filled level of this queue against the
1610 : : * values of the other one
1611 : : * - we should only do this after we found no empty queues, ie, move
1612 : : * this check outside of the loop
1613 : : * - the debug statement talks about a different queue than the one
1614 : : * we are checking here.
1615 : : */
1616 [ + + ][ + - ]: 109 : if (sq->is_eos || IS_FILLED (sq, bytes, ssize.bytes) ||
[ + - ][ + - ]
1617 [ - + ]: 79 : IS_FILLED (sq, time, sq->cur_time)) {
1618 [ - + ]: 30 : GST_LOG_OBJECT (mq, "Queue %d is filled", oq->id);
1619 : 30 : filled = TRUE;
1620 : : }
1621 : : }
1622 : : /* no queues were empty */
1623 : 86 : GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1624 : :
1625 : : /* Overrun is always forwarded, since this is blocking the upstream element */
1626 [ + + ]: 86 : if (filled) {
1627 [ - + ]: 7 : GST_DEBUG_OBJECT (mq, "A queue is filled, signalling overrun");
1628 : 7 : g_signal_emit (mq, gst_multi_queue_signals[SIGNAL_OVERRUN], 0);
1629 : : }
1630 : :
1631 : : beach:
1632 : 92 : return;
1633 : : }
1634 : :
1635 : : static void
1636 : 340 : single_queue_underrun_cb (GstDataQueue * dq, GstSingleQueue * sq)
1637 : : {
1638 : 340 : gboolean empty = TRUE;
1639 : 340 : GstMultiQueue *mq = sq->mqueue;
1640 : : GList *tmp;
1641 : :
1642 [ - + ]: 340 : GST_LOG_OBJECT (mq,
1643 : : "Single Queue %d is empty, Checking other single queues", sq->id);
1644 : :
1645 : 340 : GST_MULTI_QUEUE_MUTEX_LOCK (mq);
1646 [ + - ][ + + ]: 1550 : for (tmp = mq->queues; tmp; tmp = g_list_next (tmp)) {
1647 : 1210 : GstSingleQueue *oq = (GstSingleQueue *) tmp->data;
1648 : :
1649 [ + + ]: 1210 : if (gst_data_queue_is_full (oq->queue)) {
1650 : : GstDataQueueSize size;
1651 : :
1652 : 45 : gst_data_queue_get_level (oq->queue, &size);
1653 [ + + ][ - + ]: 45 : if (IS_FILLED (oq, visible, size.visible)) {
1654 : 0 : oq->max_size.visible = size.visible + 1;
1655 [ # # ]: 0 : GST_DEBUG_OBJECT (mq,
1656 : : "queue %d is filled, bumping its max visible to %d", oq->id,
1657 : : oq->max_size.visible);
1658 : 0 : gst_data_queue_limits_changed (oq->queue);
1659 : : }
1660 : : }
1661 [ + + ]: 1210 : if (!gst_data_queue_is_empty (oq->queue))
1662 : 299 : empty = FALSE;
1663 : : }
1664 : 340 : GST_MULTI_QUEUE_MUTEX_UNLOCK (mq);
1665 : :
1666 [ + + ]: 340 : if (empty) {
1667 [ - + ]: 141 : GST_DEBUG_OBJECT (mq, "All queues are empty, signalling it");
1668 : 141 : g_signal_emit (mq, gst_multi_queue_signals[SIGNAL_UNDERRUN], 0);
1669 : : }
1670 : 340 : }
1671 : :
1672 : : static gboolean
1673 : 3848 : single_queue_check_full (GstDataQueue * dataq, guint visible, guint bytes,
1674 : : guint64 time, GstSingleQueue * sq)
1675 : : {
1676 : : gboolean res;
1677 : 3848 : GstMultiQueue *mq = sq->mqueue;
1678 : :
1679 [ - + ]: 3848 : GST_DEBUG_OBJECT (mq,
1680 : : "queue %d: visible %u/%u, bytes %u/%u, time %" G_GUINT64_FORMAT "/%"
1681 : : G_GUINT64_FORMAT, sq->id, visible, sq->max_size.visible, bytes,
1682 : : sq->max_size.bytes, sq->cur_time, sq->max_size.time);
1683 : :
1684 : : /* we are always filled on EOS */
1685 [ + + ]: 3848 : if (sq->is_eos)
1686 : 45 : return TRUE;
1687 : :
1688 : : /* we never go past the max visible items unless we are in buffering mode */
1689 [ + - ][ + + ]: 3803 : if (!mq->use_buffering && IS_FILLED (sq, visible, visible))
[ + + ]
1690 : 158 : return TRUE;
1691 : :
1692 : : /* check time or bytes */
1693 [ + + ][ + - ]: 3645 : res = IS_FILLED (sq, time, sq->cur_time) || IS_FILLED (sq, bytes, bytes);
[ + + ][ - + ]
1694 : :
1695 : 3848 : return res;
1696 : : }
1697 : :
1698 : : static void
1699 : 16 : gst_single_queue_free (GstSingleQueue * sq)
1700 : : {
1701 : : /* DRAIN QUEUE */
1702 : 16 : gst_data_queue_flush (sq->queue);
1703 : 16 : g_object_unref (sq->queue);
1704 : 16 : g_cond_free (sq->turn);
1705 : 16 : g_free (sq);
1706 : 16 : }
1707 : :
1708 : : static GstSingleQueue *
1709 : 16 : gst_single_queue_new (GstMultiQueue * mqueue)
1710 : : {
1711 : : GstSingleQueue *sq;
1712 : : gchar *tmp;
1713 : :
1714 : 16 : sq = g_new0 (GstSingleQueue, 1);
1715 : :
1716 : 16 : GST_MULTI_QUEUE_MUTEX_LOCK (mqueue);
1717 : 16 : sq->id = mqueue->nbqueues++;
1718 : :
1719 : : /* copy over max_size and extra_size so we don't need to take the lock
1720 : : * any longer when checking if the queue is full. */
1721 : 16 : sq->max_size.visible = mqueue->max_size.visible;
1722 : 16 : sq->max_size.bytes = mqueue->max_size.bytes;
1723 : 16 : sq->max_size.time = mqueue->max_size.time;
1724 : :
1725 : 16 : sq->extra_size.visible = mqueue->extra_size.visible;
1726 : 16 : sq->extra_size.bytes = mqueue->extra_size.bytes;
1727 : 16 : sq->extra_size.time = mqueue->extra_size.time;
1728 : :
1729 : 16 : GST_MULTI_QUEUE_MUTEX_UNLOCK (mqueue);
1730 : :
1731 [ - + ]: 16 : GST_DEBUG_OBJECT (mqueue, "Creating GstSingleQueue id:%d", sq->id);
1732 : :
1733 : 16 : sq->mqueue = mqueue;
1734 : 16 : sq->srcresult = GST_FLOW_WRONG_STATE;
1735 : 16 : sq->queue = gst_data_queue_new_full ((GstDataQueueCheckFullFunction)
1736 : : single_queue_check_full,
1737 : : (GstDataQueueFullCallback) single_queue_overrun_cb,
1738 : : (GstDataQueueEmptyCallback) single_queue_underrun_cb, sq);
1739 : 16 : sq->is_eos = FALSE;
1740 : 16 : sq->flushing = FALSE;
1741 : 16 : gst_segment_init (&sq->sink_segment, GST_FORMAT_TIME);
1742 : 16 : gst_segment_init (&sq->src_segment, GST_FORMAT_TIME);
1743 : :
1744 : 16 : sq->nextid = 0;
1745 : 16 : sq->oldid = 0;
1746 : 16 : sq->turn = g_cond_new ();
1747 : :
1748 : 16 : sq->sinktime = GST_CLOCK_TIME_NONE;
1749 : 16 : sq->srctime = GST_CLOCK_TIME_NONE;
1750 : 16 : sq->sink_tainted = TRUE;
1751 : 16 : sq->src_tainted = TRUE;
1752 : :
1753 : 16 : tmp = g_strdup_printf ("sink%d", sq->id);
1754 : 16 : sq->sinkpad = gst_pad_new_from_static_template (&sinktemplate, tmp);
1755 : 16 : g_free (tmp);
1756 : :
1757 : 16 : gst_pad_set_chain_function (sq->sinkpad,
1758 : 16 : GST_DEBUG_FUNCPTR (gst_multi_queue_chain));
1759 : 16 : gst_pad_set_activatepush_function (sq->sinkpad,
1760 : 16 : GST_DEBUG_FUNCPTR (gst_multi_queue_sink_activate_push));
1761 : 16 : gst_pad_set_event_function (sq->sinkpad,
1762 : 16 : GST_DEBUG_FUNCPTR (gst_multi_queue_sink_event));
1763 : 16 : gst_pad_set_getcaps_function (sq->sinkpad,
1764 : 16 : GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps));
1765 : 16 : gst_pad_set_acceptcaps_function (sq->sinkpad,
1766 : 16 : GST_DEBUG_FUNCPTR (gst_multi_queue_acceptcaps));
1767 : 16 : gst_pad_set_bufferalloc_function (sq->sinkpad,
1768 : 16 : GST_DEBUG_FUNCPTR (gst_multi_queue_bufferalloc));
1769 : 16 : gst_pad_set_iterate_internal_links_function (sq->sinkpad,
1770 : 16 : GST_DEBUG_FUNCPTR (gst_multi_queue_iterate_internal_links));
1771 : :
1772 : 16 : tmp = g_strdup_printf ("src%d", sq->id);
1773 : 16 : sq->srcpad = gst_pad_new_from_static_template (&srctemplate, tmp);
1774 : 16 : g_free (tmp);
1775 : :
1776 : 16 : gst_pad_set_activatepush_function (sq->srcpad,
1777 : 16 : GST_DEBUG_FUNCPTR (gst_multi_queue_src_activate_push));
1778 : 16 : gst_pad_set_getcaps_function (sq->srcpad,
1779 : 16 : GST_DEBUG_FUNCPTR (gst_multi_queue_getcaps));
1780 : 16 : gst_pad_set_acceptcaps_function (sq->srcpad,
1781 : 16 : GST_DEBUG_FUNCPTR (gst_multi_queue_acceptcaps));
1782 : 16 : gst_pad_set_event_function (sq->srcpad,
1783 : 16 : GST_DEBUG_FUNCPTR (gst_multi_queue_src_event));
1784 : 16 : gst_pad_set_query_function (sq->srcpad,
1785 : 16 : GST_DEBUG_FUNCPTR (gst_multi_queue_src_query));
1786 : 16 : gst_pad_set_iterate_internal_links_function (sq->srcpad,
1787 : 16 : GST_DEBUG_FUNCPTR (gst_multi_queue_iterate_internal_links));
1788 : :
1789 : 16 : gst_pad_set_element_private (sq->sinkpad, (gpointer) sq);
1790 : 16 : gst_pad_set_element_private (sq->srcpad, (gpointer) sq);
1791 : :
1792 : : /* only activate the pads when we are not in the NULL state
1793 : : * and add the pad under the state_lock to prevend state changes
1794 : : * between activating and adding */
1795 : 16 : g_static_rec_mutex_lock (GST_STATE_GET_LOCK (mqueue));
1796 [ - + ]: 16 : if (GST_STATE_TARGET (mqueue) != GST_STATE_NULL) {
1797 : 0 : gst_pad_set_active (sq->srcpad, TRUE);
1798 : 0 : gst_pad_set_active (sq->sinkpad, TRUE);
1799 : : }
1800 : 16 : gst_element_add_pad (GST_ELEMENT (mqueue), sq->srcpad);
1801 : 16 : gst_element_add_pad (GST_ELEMENT (mqueue), sq->sinkpad);
1802 : 16 : g_static_rec_mutex_unlock (GST_STATE_GET_LOCK (mqueue));
1803 : :
1804 [ - + ]: 16 : GST_DEBUG_OBJECT (mqueue, "GstSingleQueue [%d] created and pads added",
1805 : : sq->id);
1806 : :
1807 : 16 : return sq;
1808 : : }
|