Branch data Line data Source code
1 : : /* GStreamer
2 : : * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3 : : * 2000 Wim Taymans <wtay@chello.be>
4 : : * 2003 Colin Walters <cwalters@gnome.org>
5 : : * 2005 Wim Taymans <wim@fluendo.com>
6 : : *
7 : : * gstqueue.c:
8 : : *
9 : : * This library is free software; you can redistribute it and/or
10 : : * modify it under the terms of the GNU Library General Public
11 : : * License as published by the Free Software Foundation; either
12 : : * version 2 of the License, or (at your option) any later version.
13 : : *
14 : : * This library is distributed in the hope that it will be useful,
15 : : * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 : : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 : : * Library General Public License for more details.
18 : : *
19 : : * You should have received a copy of the GNU Library General Public
20 : : * License along with this library; if not, write to the
21 : : * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
22 : : * Boston, MA 02111-1307, USA.
23 : : */
24 : :
25 : : /**
26 : : * SECTION:element-queue
27 : : *
28 : : * Data is queued until one of the limits specified by the
29 : : * #GstQueue:max-size-buffers, #GstQueue:max-size-bytes and/or
30 : : * #GstQueue:max-size-time properties has been reached. Any attempt to push
31 : : * more buffers into the queue will block the pushing thread until more space
32 : : * becomes available.
33 : : *
34 : : * The queue will create a new thread on the source pad to decouple the
35 : : * processing on sink and source pad.
36 : : *
37 : : * You can query how many buffers are queued by reading the
38 : : * #GstQueue:current-level-buffers property. You can track changes
39 : : * by connecting to the notify::current-level-buffers signal (which
40 : : * like all signals will be emitted from the streaming thread). The same
41 : : * applies to the #GstQueue:current-level-time and
42 : : * #GstQueue:current-level-bytes properties.
43 : : *
44 : : * The default queue size limits are 200 buffers, 10MB of data, or
45 : : * one second worth of data, whichever is reached first.
46 : : *
47 : : * As said earlier, the queue blocks by default when one of the specified
48 : : * maximums (bytes, time, buffers) has been reached. You can set the
49 : : * #GstQueue:leaky property to specify that instead of blocking it should
50 : : * leak (drop) new or old buffers.
51 : : *
52 : : * The #GstQueue::underrun signal is emitted when the queue has less data than
53 : : * the specified minimum thresholds require (by default: when the queue is
54 : : * empty). The #GstQueue::overrun signal is emitted when the queue is filled
55 : : * up. Both signals are emitted from the context of the streaming thread.
56 : : */
57 : :
58 : : #include "gst/gst_private.h"
59 : :
60 : : #include <gst/gst.h>
61 : : #include "gstqueue.h"
62 : :
63 : : #include "../../gst/gst-i18n-lib.h"
64 : :
65 : : static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
66 : : GST_PAD_SINK,
67 : : GST_PAD_ALWAYS,
68 : : GST_STATIC_CAPS_ANY);
69 : :
70 : : static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
71 : : GST_PAD_SRC,
72 : : GST_PAD_ALWAYS,
73 : : GST_STATIC_CAPS_ANY);
74 : :
75 : : GST_DEBUG_CATEGORY_STATIC (queue_debug);
76 : : #define GST_CAT_DEFAULT (queue_debug)
77 : : GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
78 : :
79 : : #define STATUS(queue, pad, msg) \
80 : : GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
81 : : "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
82 : : "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
83 : : "-%" G_GUINT64_FORMAT " ns, %u items", \
84 : : GST_DEBUG_PAD_NAME (pad), \
85 : : queue->cur_level.buffers, \
86 : : queue->min_threshold.buffers, \
87 : : queue->max_size.buffers, \
88 : : queue->cur_level.bytes, \
89 : : queue->min_threshold.bytes, \
90 : : queue->max_size.bytes, \
91 : : queue->cur_level.time, \
92 : : queue->min_threshold.time, \
93 : : queue->max_size.time, \
94 : : queue->queue->length)
95 : :
96 : : /* Queue signals and args */
97 : : enum
98 : : {
99 : : SIGNAL_UNDERRUN,
100 : : SIGNAL_RUNNING,
101 : : SIGNAL_OVERRUN,
102 : : SIGNAL_PUSHING,
103 : : LAST_SIGNAL
104 : : };
105 : :
106 : : enum
107 : : {
108 : : PROP_0,
109 : : /* FIXME: don't we have another way of doing this
110 : : * "Gstreamer format" (frame/byte/time) queries? */
111 : : PROP_CUR_LEVEL_BUFFERS,
112 : : PROP_CUR_LEVEL_BYTES,
113 : : PROP_CUR_LEVEL_TIME,
114 : : PROP_MAX_SIZE_BUFFERS,
115 : : PROP_MAX_SIZE_BYTES,
116 : : PROP_MAX_SIZE_TIME,
117 : : PROP_MIN_THRESHOLD_BUFFERS,
118 : : PROP_MIN_THRESHOLD_BYTES,
119 : : PROP_MIN_THRESHOLD_TIME,
120 : : PROP_LEAKY,
121 : : PROP_SILENT
122 : : };
123 : :
124 : : /* default property values */
125 : : #define DEFAULT_MAX_SIZE_BUFFERS 200 /* 200 buffers */
126 : : #define DEFAULT_MAX_SIZE_BYTES (10 * 1024 * 1024) /* 10 MB */
127 : : #define DEFAULT_MAX_SIZE_TIME GST_SECOND /* 1 second */
128 : :
129 : : #define GST_QUEUE_MUTEX_LOCK(q) G_STMT_START { \
130 : : g_mutex_lock (q->qlock); \
131 : : } G_STMT_END
132 : :
133 : : #define GST_QUEUE_MUTEX_LOCK_CHECK(q,label) G_STMT_START { \
134 : : GST_QUEUE_MUTEX_LOCK (q); \
135 : : if (q->srcresult != GST_FLOW_OK) \
136 : : goto label; \
137 : : } G_STMT_END
138 : :
139 : : #define GST_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \
140 : : g_mutex_unlock (q->qlock); \
141 : : } G_STMT_END
142 : :
143 : : #define GST_QUEUE_WAIT_DEL_CHECK(q, label) G_STMT_START { \
144 : : STATUS (q, q->sinkpad, "wait for DEL"); \
145 : : q->waiting_del = TRUE; \
146 : : g_cond_wait (q->item_del, q->qlock); \
147 : : q->waiting_del = FALSE; \
148 : : if (q->srcresult != GST_FLOW_OK) { \
149 : : STATUS (q, q->srcpad, "received DEL wakeup"); \
150 : : goto label; \
151 : : } \
152 : : STATUS (q, q->sinkpad, "received DEL"); \
153 : : } G_STMT_END
154 : :
155 : : #define GST_QUEUE_WAIT_ADD_CHECK(q, label) G_STMT_START { \
156 : : STATUS (q, q->srcpad, "wait for ADD"); \
157 : : q->waiting_add = TRUE; \
158 : : g_cond_wait (q->item_add, q->qlock); \
159 : : q->waiting_add = FALSE; \
160 : : if (q->srcresult != GST_FLOW_OK) { \
161 : : STATUS (q, q->srcpad, "received ADD wakeup"); \
162 : : goto label; \
163 : : } \
164 : : STATUS (q, q->srcpad, "received ADD"); \
165 : : } G_STMT_END
166 : :
167 : : #define GST_QUEUE_SIGNAL_DEL(q) G_STMT_START { \
168 : : if (q->waiting_del) { \
169 : : STATUS (q, q->srcpad, "signal DEL"); \
170 : : g_cond_signal (q->item_del); \
171 : : } \
172 : : } G_STMT_END
173 : :
174 : : #define GST_QUEUE_SIGNAL_ADD(q) G_STMT_START { \
175 : : if (q->waiting_add) { \
176 : : STATUS (q, q->sinkpad, "signal ADD"); \
177 : : g_cond_signal (q->item_add); \
178 : : } \
179 : : } G_STMT_END
180 : :
181 : : #define _do_init(bla) \
182 : : GST_DEBUG_CATEGORY_INIT (queue_debug, "queue", 0, "queue element"); \
183 : : GST_DEBUG_CATEGORY_INIT (queue_dataflow, "queue_dataflow", 0, \
184 : : "dataflow inside the queue element");
185 : :
186 [ + + ][ + - ]: 148096 : GST_BOILERPLATE_FULL (GstQueue, gst_queue, GstElement,
[ + - ]
187 : 148096 : GST_TYPE_ELEMENT, _do_init);
188 : :
189 : : static void gst_queue_finalize (GObject * object);
190 : :
191 : : static void gst_queue_set_property (GObject * object,
192 : : guint prop_id, const GValue * value, GParamSpec * pspec);
193 : : static void gst_queue_get_property (GObject * object,
194 : : guint prop_id, GValue * value, GParamSpec * pspec);
195 : :
196 : : static GstFlowReturn gst_queue_chain (GstPad * pad, GstBuffer * buffer);
197 : : static GstFlowReturn gst_queue_bufferalloc (GstPad * pad, guint64 offset,
198 : : guint size, GstCaps * caps, GstBuffer ** buf);
199 : : static GstFlowReturn gst_queue_push_one (GstQueue * queue);
200 : : static void gst_queue_loop (GstPad * pad);
201 : :
202 : : static gboolean gst_queue_handle_sink_event (GstPad * pad, GstEvent * event);
203 : :
204 : : static gboolean gst_queue_handle_src_event (GstPad * pad, GstEvent * event);
205 : : static gboolean gst_queue_handle_src_query (GstPad * pad, GstQuery * query);
206 : :
207 : : static gboolean gst_queue_acceptcaps (GstPad * pad, GstCaps * caps);
208 : : static GstCaps *gst_queue_getcaps (GstPad * pad);
209 : : static GstPadLinkReturn gst_queue_link_sink (GstPad * pad, GstPad * peer);
210 : : static GstPadLinkReturn gst_queue_link_src (GstPad * pad, GstPad * peer);
211 : : static void gst_queue_locked_flush (GstQueue * queue);
212 : :
213 : : static gboolean gst_queue_src_activate_push (GstPad * pad, gboolean active);
214 : : static gboolean gst_queue_sink_activate_push (GstPad * pad, gboolean active);
215 : :
216 : : static gboolean gst_queue_is_empty (GstQueue * queue);
217 : : static gboolean gst_queue_is_filled (GstQueue * queue);
218 : :
219 : : #define GST_TYPE_QUEUE_LEAKY (queue_leaky_get_type ())
220 : :
221 : : static GType
222 : 23 : queue_leaky_get_type (void)
223 : : {
224 : : static GType queue_leaky_type = 0;
225 : : static const GEnumValue queue_leaky[] = {
226 : : {GST_QUEUE_NO_LEAK, "Not Leaky", "no"},
227 : : {GST_QUEUE_LEAK_UPSTREAM, "Leaky on upstream (new buffers)", "upstream"},
228 : : {GST_QUEUE_LEAK_DOWNSTREAM, "Leaky on downstream (old buffers)",
229 : : "downstream"},
230 : : {0, NULL, NULL},
231 : : };
232 : :
233 [ + - ]: 23 : if (!queue_leaky_type) {
234 : 23 : queue_leaky_type = g_enum_register_static ("GstQueueLeaky", queue_leaky);
235 : : }
236 : 23 : return queue_leaky_type;
237 : : }
238 : :
239 : : static guint gst_queue_signals[LAST_SIGNAL] = { 0 };
240 : :
241 : : static void
242 : 23 : gst_queue_base_init (gpointer g_class)
243 : : {
244 : 23 : GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
245 : :
246 : 23 : gst_element_class_set_details_simple (gstelement_class,
247 : : "Queue",
248 : : "Generic", "Simple data queue", "Erik Walthinsen <omega@cse.ogi.edu>");
249 : 23 : gst_element_class_add_pad_template (gstelement_class,
250 : : gst_static_pad_template_get (&srctemplate));
251 : 23 : gst_element_class_add_pad_template (gstelement_class,
252 : : gst_static_pad_template_get (&sinktemplate));
253 : 23 : }
254 : :
255 : : static void
256 : 23 : gst_queue_class_init (GstQueueClass * klass)
257 : : {
258 : 23 : GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
259 : :
260 : 23 : gobject_class->set_property = gst_queue_set_property;
261 : 23 : gobject_class->get_property = gst_queue_get_property;
262 : :
263 : : /* signals */
264 : : /**
265 : : * GstQueue::underrun:
266 : : * @queue: the queue instance
267 : : *
268 : : * Reports that the buffer became empty (underrun).
269 : : * A buffer is empty if the total amount of data inside it (num-buffers, time,
270 : : * size) is lower than the boundary values which can be set through the
271 : : * GObject properties.
272 : : */
273 : 23 : gst_queue_signals[SIGNAL_UNDERRUN] =
274 : 23 : g_signal_new ("underrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
275 : : G_STRUCT_OFFSET (GstQueueClass, underrun), NULL, NULL,
276 : : g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
277 : : /**
278 : : * GstQueue::running:
279 : : * @queue: the queue instance
280 : : *
281 : : * Reports that enough (min-threshold) data is in the queue. Use this signal
282 : : * together with the underrun signal to pause the pipeline on underrun and
283 : : * wait for the queue to fill-up before resume playback.
284 : : */
285 : 23 : gst_queue_signals[SIGNAL_RUNNING] =
286 : 23 : g_signal_new ("running", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
287 : : G_STRUCT_OFFSET (GstQueueClass, running), NULL, NULL,
288 : : g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
289 : : /**
290 : : * GstQueue::overrun:
291 : : * @queue: the queue instance
292 : : *
293 : : * Reports that the buffer became full (overrun).
294 : : * A buffer is full if the total amount of data inside it (num-buffers, time,
295 : : * size) is higher than the boundary values which can be set through the
296 : : * GObject properties.
297 : : */
298 : 23 : gst_queue_signals[SIGNAL_OVERRUN] =
299 : 23 : g_signal_new ("overrun", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
300 : : G_STRUCT_OFFSET (GstQueueClass, overrun), NULL, NULL,
301 : : g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
302 : : /**
303 : : * GstQueue::pushing:
304 : : * @queue: the queue instance
305 : : *
306 : : * Reports when the queue has enough data to start pushing data again on the
307 : : * source pad.
308 : : */
309 : 23 : gst_queue_signals[SIGNAL_PUSHING] =
310 : 23 : g_signal_new ("pushing", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_FIRST,
311 : : G_STRUCT_OFFSET (GstQueueClass, pushing), NULL, NULL,
312 : : g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
313 : :
314 : : /* properties */
315 : 23 : g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES,
316 : : g_param_spec_uint ("current-level-bytes", "Current level (kB)",
317 : : "Current amount of data in the queue (bytes)",
318 : : 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
319 : 23 : g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BUFFERS,
320 : : g_param_spec_uint ("current-level-buffers", "Current level (buffers)",
321 : : "Current number of buffers in the queue",
322 : : 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
323 : 23 : g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME,
324 : : g_param_spec_uint64 ("current-level-time", "Current level (ns)",
325 : : "Current amount of data in the queue (in ns)",
326 : : 0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
327 : :
328 : 23 : g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BYTES,
329 : : g_param_spec_uint ("max-size-bytes", "Max. size (kB)",
330 : : "Max. amount of data in the queue (bytes, 0=disable)",
331 : : 0, G_MAXUINT, DEFAULT_MAX_SIZE_BYTES,
332 : : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
333 : 23 : g_object_class_install_property (gobject_class, PROP_MAX_SIZE_BUFFERS,
334 : : g_param_spec_uint ("max-size-buffers", "Max. size (buffers)",
335 : : "Max. number of buffers in the queue (0=disable)", 0, G_MAXUINT,
336 : : DEFAULT_MAX_SIZE_BUFFERS,
337 : : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
338 : 23 : g_object_class_install_property (gobject_class, PROP_MAX_SIZE_TIME,
339 : : g_param_spec_uint64 ("max-size-time", "Max. size (ns)",
340 : : "Max. amount of data in the queue (in ns, 0=disable)", 0, G_MAXUINT64,
341 : : DEFAULT_MAX_SIZE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
342 : :
343 : 23 : g_object_class_install_property (gobject_class, PROP_MIN_THRESHOLD_BYTES,
344 : : g_param_spec_uint ("min-threshold-bytes", "Min. threshold (kB)",
345 : : "Min. amount of data in the queue to allow reading (bytes, 0=disable)",
346 : : 0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
347 : 23 : g_object_class_install_property (gobject_class, PROP_MIN_THRESHOLD_BUFFERS,
348 : : g_param_spec_uint ("min-threshold-buffers", "Min. threshold (buffers)",
349 : : "Min. number of buffers in the queue to allow reading (0=disable)",
350 : : 0, G_MAXUINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
351 : 23 : g_object_class_install_property (gobject_class, PROP_MIN_THRESHOLD_TIME,
352 : : g_param_spec_uint64 ("min-threshold-time", "Min. threshold (ns)",
353 : : "Min. amount of data in the queue to allow reading (in ns, 0=disable)",
354 : : 0, G_MAXUINT64, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
355 : :
356 : 23 : g_object_class_install_property (gobject_class, PROP_LEAKY,
357 : : g_param_spec_enum ("leaky", "Leaky",
358 : : "Where the queue leaks, if at all",
359 : : GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK,
360 : : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
361 : :
362 : : /**
363 : : * GstQueue:silent
364 : : *
365 : : * Don't emit queue signals. Makes queues more lightweight if no signals are
366 : : * needed.
367 : : *
368 : : * Since: 0.10.31
369 : : */
370 : 23 : g_object_class_install_property (gobject_class, PROP_SILENT,
371 : : g_param_spec_boolean ("silent", "Silent",
372 : : "Don't emit queue signals", FALSE,
373 : : G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
374 : :
375 : 23 : gobject_class->finalize = gst_queue_finalize;
376 : :
377 : : /* Registering debug symbols for function pointers */
378 : 23 : GST_DEBUG_REGISTER_FUNCPTR (gst_queue_chain);
379 : 23 : GST_DEBUG_REGISTER_FUNCPTR (gst_queue_sink_activate_push);
380 : 23 : GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_sink_event);
381 : 23 : GST_DEBUG_REGISTER_FUNCPTR (gst_queue_link_sink);
382 : 23 : GST_DEBUG_REGISTER_FUNCPTR (gst_queue_getcaps);
383 : 23 : GST_DEBUG_REGISTER_FUNCPTR (gst_queue_acceptcaps);
384 : 23 : GST_DEBUG_REGISTER_FUNCPTR (gst_queue_bufferalloc);
385 : 23 : GST_DEBUG_REGISTER_FUNCPTR (gst_queue_src_activate_push);
386 : 23 : GST_DEBUG_REGISTER_FUNCPTR (gst_queue_link_src);
387 : 23 : GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_event);
388 : 23 : GST_DEBUG_REGISTER_FUNCPTR (gst_queue_handle_src_query);
389 : 23 : }
390 : :
391 : : static void
392 : 45 : gst_queue_init (GstQueue * queue, GstQueueClass * g_class)
393 : : {
394 : 45 : queue->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
395 : :
396 : 45 : gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain);
397 : 45 : gst_pad_set_activatepush_function (queue->sinkpad,
398 : : gst_queue_sink_activate_push);
399 : 45 : gst_pad_set_event_function (queue->sinkpad, gst_queue_handle_sink_event);
400 : 45 : gst_pad_set_link_function (queue->sinkpad, gst_queue_link_sink);
401 : 45 : gst_pad_set_getcaps_function (queue->sinkpad, gst_queue_getcaps);
402 : 45 : gst_pad_set_acceptcaps_function (queue->sinkpad, gst_queue_acceptcaps);
403 : 45 : gst_pad_set_bufferalloc_function (queue->sinkpad, gst_queue_bufferalloc);
404 : 45 : gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
405 : :
406 : 45 : queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
407 : :
408 : 45 : gst_pad_set_activatepush_function (queue->srcpad,
409 : : gst_queue_src_activate_push);
410 : 45 : gst_pad_set_link_function (queue->srcpad, gst_queue_link_src);
411 : 45 : gst_pad_set_acceptcaps_function (queue->srcpad, gst_queue_acceptcaps);
412 : 45 : gst_pad_set_getcaps_function (queue->srcpad, gst_queue_getcaps);
413 : 45 : gst_pad_set_event_function (queue->srcpad, gst_queue_handle_src_event);
414 : 45 : gst_pad_set_query_function (queue->srcpad, gst_queue_handle_src_query);
415 : 45 : gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
416 : :
417 : 45 : GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
418 : 45 : queue->max_size.buffers = DEFAULT_MAX_SIZE_BUFFERS;
419 : 45 : queue->max_size.bytes = DEFAULT_MAX_SIZE_BYTES;
420 : 45 : queue->max_size.time = DEFAULT_MAX_SIZE_TIME;
421 : 45 : GST_QUEUE_CLEAR_LEVEL (queue->min_threshold);
422 : 45 : GST_QUEUE_CLEAR_LEVEL (queue->orig_min_threshold);
423 : 45 : gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
424 : 45 : gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
425 : 45 : queue->head_needs_discont = queue->tail_needs_discont = FALSE;
426 : :
427 : 45 : queue->leaky = GST_QUEUE_NO_LEAK;
428 : 45 : queue->srcresult = GST_FLOW_WRONG_STATE;
429 : :
430 : 45 : queue->qlock = g_mutex_new ();
431 : 45 : queue->item_add = g_cond_new ();
432 : 45 : queue->item_del = g_cond_new ();
433 : 45 : queue->queue = g_queue_new ();
434 : :
435 : 45 : queue->sinktime = GST_CLOCK_TIME_NONE;
436 : 45 : queue->srctime = GST_CLOCK_TIME_NONE;
437 : :
438 : 45 : queue->sink_tainted = TRUE;
439 : 45 : queue->src_tainted = TRUE;
440 : :
441 : 45 : queue->newseg_applied_to_src = FALSE;
442 : :
443 [ - + ]: 45 : GST_DEBUG_OBJECT (queue,
444 : : "initialized queue's not_empty & not_full conditions");
445 : 45 : }
446 : :
447 : : /* called only once, as opposed to dispose */
448 : : static void
449 : 43 : gst_queue_finalize (GObject * object)
450 : : {
451 : 43 : GstQueue *queue = GST_QUEUE (object);
452 : :
453 [ - + ]: 43 : GST_DEBUG_OBJECT (queue, "finalizing queue");
454 : :
455 [ - + ]: 43 : while (!g_queue_is_empty (queue->queue)) {
456 : 0 : GstMiniObject *data = g_queue_pop_head (queue->queue);
457 : :
458 : 0 : gst_mini_object_unref (data);
459 : : }
460 : 43 : g_queue_free (queue->queue);
461 : 43 : g_mutex_free (queue->qlock);
462 : 43 : g_cond_free (queue->item_add);
463 : 43 : g_cond_free (queue->item_del);
464 : :
465 : 43 : G_OBJECT_CLASS (parent_class)->finalize (object);
466 : 43 : }
467 : :
468 : : static gboolean
469 : 0 : gst_queue_acceptcaps (GstPad * pad, GstCaps * caps)
470 : : {
471 : : gboolean result;
472 : : GstQueue *queue;
473 : : GstPad *otherpad;
474 : :
475 : 0 : queue = GST_QUEUE (GST_PAD_PARENT (pad));
476 : :
477 [ # # ]: 0 : otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
478 : 0 : result = gst_pad_peer_accept_caps (otherpad, caps);
479 : :
480 : 0 : return result;
481 : : }
482 : :
483 : : static GstCaps *
484 : 152 : gst_queue_getcaps (GstPad * pad)
485 : : {
486 : : GstQueue *queue;
487 : : GstPad *otherpad;
488 : : GstCaps *result;
489 : :
490 : 152 : queue = GST_QUEUE (GST_PAD_PARENT (pad));
491 : :
492 [ + + ]: 152 : otherpad = (pad == queue->srcpad ? queue->sinkpad : queue->srcpad);
493 : 152 : result = gst_pad_peer_get_caps (otherpad);
494 [ + + ]: 152 : if (result == NULL)
495 : 79 : result = gst_caps_new_any ();
496 : :
497 : 152 : return result;
498 : : }
499 : :
500 : : static GstPadLinkReturn
501 : 42 : gst_queue_link_sink (GstPad * pad, GstPad * peer)
502 : : {
503 : 42 : return GST_PAD_LINK_OK;
504 : : }
505 : :
506 : : static GstPadLinkReturn
507 : 41 : gst_queue_link_src (GstPad * pad, GstPad * peer)
508 : : {
509 : 41 : GstPadLinkReturn result = GST_PAD_LINK_OK;
510 : : GstQueue *queue;
511 : :
512 : 41 : queue = GST_QUEUE (gst_pad_get_parent (pad));
513 : :
514 [ - + ]: 41 : GST_DEBUG_OBJECT (queue, "queue linking source pad");
515 : :
516 [ - + ]: 41 : if (GST_PAD_LINKFUNC (peer)) {
517 : 0 : result = GST_PAD_LINKFUNC (peer) (peer, pad);
518 : : }
519 : :
520 [ + - ]: 41 : if (GST_PAD_LINK_SUCCESSFUL (result)) {
521 : 41 : GST_QUEUE_MUTEX_LOCK (queue);
522 [ + + ]: 41 : if (queue->srcresult == GST_FLOW_OK) {
523 : 5 : queue->push_newsegment = TRUE;
524 : 5 : gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
525 [ - + ]: 5 : GST_DEBUG_OBJECT (queue, "starting task as pad is linked");
526 : : } else {
527 [ - + ]: 36 : GST_DEBUG_OBJECT (queue, "not starting task reason %s",
528 : : gst_flow_get_name (queue->srcresult));
529 : : }
530 : 41 : GST_QUEUE_MUTEX_UNLOCK (queue);
531 : : }
532 : 41 : gst_object_unref (queue);
533 : :
534 : 41 : return result;
535 : : }
536 : :
537 : : static GstFlowReturn
538 : 0 : gst_queue_bufferalloc (GstPad * pad, guint64 offset, guint size, GstCaps * caps,
539 : : GstBuffer ** buf)
540 : : {
541 : : GstQueue *queue;
542 : : GstFlowReturn result;
543 : :
544 : 0 : queue = GST_QUEUE (GST_PAD_PARENT (pad));
545 : :
546 : : /* Forward to src pad, without setting caps on the src pad */
547 : 0 : result = gst_pad_alloc_buffer (queue->srcpad, offset, size, caps, buf);
548 : :
549 : 0 : return result;
550 : : }
551 : :
552 : : /* calculate the diff between running time on the sink and src of the queue.
553 : : * This is the total amount of time in the queue. */
554 : : static void
555 : 305260 : update_time_level (GstQueue * queue)
556 : : {
557 : : gint64 sink_time, src_time;
558 : :
559 [ + + ]: 305260 : if (queue->sink_tainted) {
560 : 152745 : queue->sinktime =
561 : 152750 : gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
562 : : queue->sink_segment.last_stop);
563 : 152745 : queue->sink_tainted = FALSE;
564 : : }
565 : 305255 : sink_time = queue->sinktime;
566 : :
567 [ + + ]: 305255 : if (queue->src_tainted) {
568 : 152653 : queue->srctime =
569 : 152667 : gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
570 : : queue->src_segment.last_stop);
571 : 152653 : queue->src_tainted = FALSE;
572 : : }
573 : 305241 : src_time = queue->srctime;
574 : :
575 [ - + ][ # # ]: 305241 : GST_LOG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
576 : : GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
577 : :
578 [ + - ]: 305311 : if (sink_time >= src_time)
579 : 305311 : queue->cur_level.time = sink_time - src_time;
580 : : else
581 : 0 : queue->cur_level.time = 0;
582 : 305311 : }
583 : :
584 : : /* take a NEWSEGMENT event and apply the values to segment, updating the time
585 : : * level of queue. */
586 : : static void
587 : 75 : apply_segment (GstQueue * queue, GstEvent * event, GstSegment * segment,
588 : : gboolean sink)
589 : : {
590 : : gboolean update;
591 : : GstFormat format;
592 : : gdouble rate, arate;
593 : : gint64 start, stop, time;
594 : :
595 : 75 : gst_event_parse_new_segment_full (event, &update, &rate, &arate,
596 : : &format, &start, &stop, &time);
597 : :
598 : : /* now configure the values, we use these to track timestamps on the
599 : : * sinkpad. */
600 [ + + ]: 75 : if (format != GST_FORMAT_TIME) {
601 : : /* non-time format, pretent the current time segment is closed with a
602 : : * 0 start and unknown stop time. */
603 : 66 : update = FALSE;
604 : 66 : format = GST_FORMAT_TIME;
605 : 66 : start = 0;
606 : 66 : stop = -1;
607 : 66 : time = 0;
608 : : }
609 : 75 : gst_segment_set_newsegment_full (segment, update,
610 : : rate, arate, format, start, stop, time);
611 : :
612 [ + + ]: 75 : if (sink)
613 : 38 : queue->sink_tainted = TRUE;
614 : : else
615 : 37 : queue->src_tainted = TRUE;
616 : :
617 [ - + ]: 75 : GST_DEBUG_OBJECT (queue,
618 : : "configured NEWSEGMENT %" GST_SEGMENT_FORMAT, segment);
619 : :
620 : : /* segment can update the time level of the queue */
621 : 75 : update_time_level (queue);
622 : 75 : }
623 : :
624 : : /* take a buffer and update segment, updating the time level of the queue. */
625 : : static void
626 : 305220 : apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
627 : : gboolean with_duration, gboolean sink)
628 : : {
629 : : GstClockTime duration, timestamp;
630 : :
631 : 305220 : timestamp = GST_BUFFER_TIMESTAMP (buffer);
632 : 305220 : duration = GST_BUFFER_DURATION (buffer);
633 : :
634 : : /* if no timestamp is set, assume it's continuous with the previous
635 : : * time */
636 [ + + ]: 305220 : if (timestamp == GST_CLOCK_TIME_NONE)
637 : 20 : timestamp = segment->last_stop;
638 : :
639 : : /* add duration */
640 [ + + ][ + + ]: 305220 : if (with_duration && duration != GST_CLOCK_TIME_NONE)
641 : 10533 : timestamp += duration;
642 : :
643 [ - + ][ # # ]: 305220 : GST_LOG_OBJECT (queue, "last_stop updated to %" GST_TIME_FORMAT,
[ # # ][ # # ]
[ # # ]
644 : : GST_TIME_ARGS (timestamp));
645 : :
646 : 305220 : gst_segment_set_last_stop (segment, GST_FORMAT_TIME, timestamp);
647 [ + + ]: 305294 : if (sink)
648 : 152705 : queue->sink_tainted = TRUE;
649 : : else
650 : 152589 : queue->src_tainted = TRUE;
651 : :
652 : :
653 : : /* calc diff with other end */
654 : 305294 : update_time_level (queue);
655 : 305241 : }
656 : :
657 : : static void
658 : 47 : gst_queue_locked_flush (GstQueue * queue)
659 : : {
660 [ + + ]: 176 : while (!g_queue_is_empty (queue->queue)) {
661 : 129 : GstMiniObject *data = g_queue_pop_head (queue->queue);
662 : :
663 : : /* Then lose another reference because we are supposed to destroy that
664 : : data when flushing */
665 : 129 : gst_mini_object_unref (data);
666 : : }
667 : 47 : GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
668 : 47 : queue->min_threshold.buffers = queue->orig_min_threshold.buffers;
669 : 47 : queue->min_threshold.bytes = queue->orig_min_threshold.bytes;
670 : 47 : queue->min_threshold.time = queue->orig_min_threshold.time;
671 : 47 : gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
672 : 47 : gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
673 : 47 : queue->head_needs_discont = queue->tail_needs_discont = FALSE;
674 : :
675 : 47 : queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
676 : 47 : queue->sink_tainted = queue->src_tainted = TRUE;
677 : :
678 : : /* we deleted a lot of something */
679 [ + + ][ - + ]: 47 : GST_QUEUE_SIGNAL_DEL (queue);
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
680 : 47 : }
681 : :
682 : : /* enqueue an item an update the level stats, with QUEUE_LOCK */
683 : : static inline void
684 : 152691 : gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item)
685 : : {
686 : 152691 : GstBuffer *buffer = GST_BUFFER_CAST (item);
687 : :
688 : : /* add buffer to the statistics */
689 : 152691 : queue->cur_level.buffers++;
690 : 152691 : queue->cur_level.bytes += GST_BUFFER_SIZE (buffer);
691 : 152691 : apply_buffer (queue, buffer, &queue->sink_segment, TRUE, TRUE);
692 : :
693 : 152712 : g_queue_push_tail (queue->queue, item);
694 [ + + ][ - + ]: 152700 : GST_QUEUE_SIGNAL_ADD (queue);
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
695 : 152701 : }
696 : :
697 : : static inline void
698 : 59 : gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item)
699 : : {
700 : 59 : GstEvent *event = GST_EVENT_CAST (item);
701 : :
702 [ + + + ]: 59 : switch (GST_EVENT_TYPE (event)) {
703 : : case GST_EVENT_EOS:
704 : : /* Zero the thresholds, this makes sure the queue is completely
705 : : * filled and we can read all data from the queue. */
706 : 19 : GST_QUEUE_CLEAR_LEVEL (queue->min_threshold);
707 : : /* mark the queue as EOS. This prevents us from accepting more data. */
708 [ - + ]: 19 : GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from upstream");
709 : 19 : queue->eos = TRUE;
710 : 19 : break;
711 : : case GST_EVENT_NEWSEGMENT:
712 : 38 : apply_segment (queue, event, &queue->sink_segment, TRUE);
713 : : /* if the queue is empty, apply sink segment on the source */
714 [ + + ]: 38 : if (queue->queue->length == 0) {
715 [ - + ]: 35 : GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Apply segment on srcpad");
716 : 35 : apply_segment (queue, event, &queue->src_segment, FALSE);
717 : 35 : queue->newseg_applied_to_src = TRUE;
718 : : }
719 : : /* a new segment allows us to accept more buffers if we got UNEXPECTED
720 : : * from downstream */
721 : 38 : queue->unexpected = FALSE;
722 : 38 : break;
723 : : default:
724 : 2 : break;
725 : : }
726 : :
727 : 59 : g_queue_push_tail (queue->queue, item);
728 [ + + ][ - + ]: 59 : GST_QUEUE_SIGNAL_ADD (queue);
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
729 : 59 : }
730 : :
731 : : /* dequeue an item from the queue and update level stats, with QUEUE_LOCK */
732 : : static GstMiniObject *
733 : 152643 : gst_queue_locked_dequeue (GstQueue * queue, gboolean * is_buffer)
734 : : {
735 : : GstMiniObject *item;
736 : :
737 : 152643 : item = g_queue_pop_head (queue->queue);
738 [ - + ]: 152640 : if (item == NULL)
739 : 0 : goto no_item;
740 : :
741 [ - + ][ + ]: 152640 : if (GST_IS_BUFFER (item)) {
[ + + ][ + + ]
742 : 152586 : GstBuffer *buffer = GST_BUFFER_CAST (item);
743 : :
744 [ - + ]: 152586 : GST_CAT_LOG_OBJECT (queue_dataflow, queue,
745 : : "retrieved buffer %p from queue", buffer);
746 : :
747 : 152586 : queue->cur_level.buffers--;
748 : 152586 : queue->cur_level.bytes -= GST_BUFFER_SIZE (buffer);
749 : 152586 : apply_buffer (queue, buffer, &queue->src_segment, TRUE, FALSE);
750 : :
751 : : /* if the queue is empty now, update the other side */
752 [ + + ]: 152585 : if (queue->cur_level.buffers == 0)
753 : 88068 : queue->cur_level.time = 0;
754 : :
755 : 152585 : *is_buffer = TRUE;
756 [ - + ][ + - ]: 55 : } else if (GST_IS_EVENT (item)) {
[ + - ][ + - ]
757 : 55 : GstEvent *event = GST_EVENT_CAST (item);
758 : :
759 [ - + ]: 55 : GST_CAT_LOG_OBJECT (queue_dataflow, queue,
760 : : "retrieved event %p from queue", event);
761 : :
762 [ + + + ]: 55 : switch (GST_EVENT_TYPE (event)) {
763 : : case GST_EVENT_EOS:
764 : : /* queue is empty now that we dequeued the EOS */
765 : 17 : GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
766 : 17 : break;
767 : : case GST_EVENT_NEWSEGMENT:
768 : : /* apply newsegment if it has not already been applied */
769 [ + + ]: 36 : if (G_LIKELY (!queue->newseg_applied_to_src)) {
770 : 2 : apply_segment (queue, event, &queue->src_segment, FALSE);
771 : : } else {
772 : 34 : queue->newseg_applied_to_src = FALSE;
773 : : }
774 : 36 : break;
775 : : default:
776 : 2 : break;
777 : : }
778 : :
779 : 55 : *is_buffer = FALSE;
780 : : } else {
781 : 0 : g_warning
782 : : ("Unexpected item %p dequeued from queue %s (refcounting problem?)",
783 : : item, GST_OBJECT_NAME (queue));
784 : 0 : item = NULL;
785 : : }
786 [ + + ][ - + ]: 152640 : GST_QUEUE_SIGNAL_DEL (queue);
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
787 : :
788 : 152633 : return item;
789 : :
790 : : /* ERRORS */
791 : : no_item:
792 : : {
793 [ # # ]: 0 : GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "the queue is empty");
794 : 152633 : return NULL;
795 : : }
796 : : }
797 : :
798 : : static gboolean
799 : 147392 : gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
800 : : {
801 : : GstQueue *queue;
802 : :
803 : 147392 : queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
804 : :
805 [ - - + ]: 147392 : switch (GST_EVENT_TYPE (event)) {
806 : : case GST_EVENT_FLUSH_START:
807 : : {
808 [ # # ][ # # ]: 0 : STATUS (queue, pad, "received flush start event");
[ # # ][ # # ]
[ # # ][ # # ]
809 : : /* forward event */
810 : 0 : gst_pad_push_event (queue->srcpad, event);
811 : :
812 : : /* now unblock the chain function */
813 : 0 : GST_QUEUE_MUTEX_LOCK (queue);
814 : 0 : queue->srcresult = GST_FLOW_WRONG_STATE;
815 : : /* unblock the loop and chain functions */
816 [ # # ][ # # ]: 0 : GST_QUEUE_SIGNAL_ADD (queue);
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
817 [ # # ][ # # ]: 0 : GST_QUEUE_SIGNAL_DEL (queue);
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
818 : 0 : GST_QUEUE_MUTEX_UNLOCK (queue);
819 : :
820 : : /* make sure it pauses, this should happen since we sent
821 : : * flush_start downstream. */
822 : 0 : gst_pad_pause_task (queue->srcpad);
823 [ # # ]: 0 : GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
824 : 0 : goto done;
825 : : }
826 : : case GST_EVENT_FLUSH_STOP:
827 : : {
828 [ # # ][ # # ]: 0 : STATUS (queue, pad, "received flush stop event");
[ # # ][ # # ]
[ # # ][ # # ]
829 : : /* forward event */
830 : 0 : gst_pad_push_event (queue->srcpad, event);
831 : :
832 : 0 : GST_QUEUE_MUTEX_LOCK (queue);
833 : 0 : gst_queue_locked_flush (queue);
834 : 0 : queue->srcresult = GST_FLOW_OK;
835 : 0 : queue->eos = FALSE;
836 : 0 : queue->unexpected = FALSE;
837 [ # # ]: 0 : if (gst_pad_is_linked (queue->srcpad)) {
838 : 0 : gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
839 : 0 : queue->srcpad);
840 : : } else {
841 [ # # ]: 0 : GST_INFO_OBJECT (queue, "not re-starting task as pad is not linked");
842 : : }
843 : 0 : GST_QUEUE_MUTEX_UNLOCK (queue);
844 : :
845 [ # # ][ # # ]: 0 : STATUS (queue, pad, "after flush");
[ # # ][ # # ]
[ # # ][ # # ]
846 : 0 : goto done;
847 : : }
848 : : default:
849 [ + + ]: 147392 : if (GST_EVENT_IS_SERIALIZED (event)) {
850 : : /* serialized events go in the queue */
851 [ + - ]: 59 : GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
852 : : /* refuse more events on EOS */
853 [ - + ]: 59 : if (queue->eos)
854 : 0 : goto out_eos;
855 : 59 : gst_queue_locked_enqueue_event (queue, event);
856 : 59 : GST_QUEUE_MUTEX_UNLOCK (queue);
857 : : } else {
858 : : /* non-serialized events are passed upstream. */
859 : 147333 : gst_pad_push_event (queue->srcpad, event);
860 : : }
861 : 147393 : break;
862 : : }
863 : : done:
864 : 147393 : return TRUE;
865 : :
866 : : /* ERRORS */
867 : : out_flushing:
868 : : {
869 [ # # ]: 0 : GST_CAT_LOG_OBJECT (queue_dataflow, queue,
870 : : "refusing event, we are flushing");
871 : 0 : GST_QUEUE_MUTEX_UNLOCK (queue);
872 : 0 : gst_event_unref (event);
873 : 0 : return FALSE;
874 : : }
875 : : out_eos:
876 : : {
877 [ # # ]: 0 : GST_CAT_LOG_OBJECT (queue_dataflow, queue, "refusing event, we are EOS");
878 : 0 : GST_QUEUE_MUTEX_UNLOCK (queue);
879 : 0 : gst_event_unref (event);
880 : 147393 : return FALSE;
881 : : }
882 : : }
883 : :
884 : : static gboolean
885 : 406777 : gst_queue_is_empty (GstQueue * queue)
886 : : {
887 [ + + ]: 406777 : if (queue->queue->length == 0)
888 : 169424 : return TRUE;
889 : :
890 : : /* It is possible that a max size is reached before all min thresholds are.
891 : : * Therefore, only consider it empty if it is not filled. */
892 [ - + ][ # # ]: 644142 : return ((queue->min_threshold.buffers > 0 &&
893 [ - + ]: 237365 : queue->cur_level.buffers < queue->min_threshold.buffers) ||
894 [ # # ]: 0 : (queue->min_threshold.bytes > 0 &&
895 [ - + ]: 237365 : queue->cur_level.bytes < queue->min_threshold.bytes) ||
896 [ # # ]: 0 : (queue->min_threshold.time > 0 &&
897 [ # # ]: 0 : queue->cur_level.time < queue->min_threshold.time)) &&
898 : 0 : !gst_queue_is_filled (queue);
899 : : }
900 : :
901 : : static gboolean
902 : 334865 : gst_queue_is_filled (GstQueue * queue)
903 : : {
904 [ + + ][ + + ]: 548696 : return (((queue->max_size.buffers > 0 &&
905 [ + + ]: 213831 : queue->cur_level.buffers >= queue->max_size.buffers) ||
906 [ + ]: 212867 : (queue->max_size.bytes > 0 &&
907 [ + + ]: 213836 : queue->cur_level.bytes >= queue->max_size.bytes) ||
908 [ + + ]: 213831 : (queue->max_size.time > 0 &&
909 : 213831 : queue->cur_level.time >= queue->max_size.time)));
910 : : }
911 : :
912 : : static void
913 : 2 : gst_queue_leak_downstream (GstQueue * queue)
914 : : {
915 : : /* for as long as the queue is filled, dequeue an item and discard it */
916 [ + + ]: 7 : while (gst_queue_is_filled (queue)) {
917 : : GstMiniObject *leak;
918 : : gboolean is_buffer;
919 : :
920 : 5 : leak = gst_queue_locked_dequeue (queue, &is_buffer);
921 : : /* there is nothing to dequeue and the queue is still filled.. This should
922 : : * not happen */
923 [ - + ]: 5 : g_assert (leak != NULL);
924 : :
925 [ - + ]: 5 : GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
926 : : "queue is full, leaking item %p on downstream end", leak);
927 : 5 : gst_mini_object_unref (leak);
928 : :
929 : : /* last buffer needs to get a DISCONT flag */
930 : 5 : queue->head_needs_discont = TRUE;
931 : : }
932 : 2 : }
933 : :
934 : : static GstFlowReturn
935 : 152726 : gst_queue_chain (GstPad * pad, GstBuffer * buffer)
936 : : {
937 : : GstQueue *queue;
938 : : GstClockTime duration, timestamp;
939 : :
940 : 152726 : queue = (GstQueue *) GST_OBJECT_PARENT (pad);
941 : :
942 : : /* we have to lock the queue since we span threads */
943 [ + + ]: 152726 : GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
944 : : /* when we received EOS, we refuse any more data */
945 [ - + ]: 152724 : if (queue->eos)
946 : 0 : goto out_eos;
947 [ - + ]: 152724 : if (queue->unexpected)
948 : 0 : goto out_unexpected;
949 : :
950 : 152724 : timestamp = GST_BUFFER_TIMESTAMP (buffer);
951 : 152724 : duration = GST_BUFFER_DURATION (buffer);
952 : :
953 [ - + ][ # # ]: 152724 : GST_CAT_LOG_OBJECT (queue_dataflow, queue,
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
954 : : "received buffer %p of size %d, time %" GST_TIME_FORMAT ", duration %"
955 : : GST_TIME_FORMAT, buffer, GST_BUFFER_SIZE (buffer),
956 : : GST_TIME_ARGS (timestamp), GST_TIME_ARGS (duration));
957 : :
958 : : /* We make space available if we're "full" according to whatever
959 : : * the user defined as "full". Note that this only applies to buffers.
960 : : * We always handle events and they don't count in our statistics. */
961 [ + + ]: 213426 : while (gst_queue_is_filled (queue)) {
962 [ + - ]: 60740 : if (!queue->silent) {
963 : 60740 : GST_QUEUE_MUTEX_UNLOCK (queue);
964 : 60740 : g_signal_emit (queue, gst_queue_signals[SIGNAL_OVERRUN], 0);
965 [ + - ]: 60740 : GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
966 : : /* we recheck, the signal could have changed the thresholds */
967 [ + + ]: 60740 : if (!gst_queue_is_filled (queue))
968 : 31 : break;
969 : : }
970 : :
971 : : /* how are we going to make space for this buffer? */
972 [ + + - + ]: 60709 : switch (queue->leaky) {
973 : : case GST_QUEUE_LEAK_UPSTREAM:
974 : : /* next buffer needs to get a DISCONT flag */
975 : 1 : queue->tail_needs_discont = TRUE;
976 : : /* leak current buffer */
977 [ - + ]: 1 : GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
978 : : "queue is full, leaking buffer on upstream end");
979 : : /* now we can clean up and exit right away */
980 : 1 : goto out_unref;
981 : : case GST_QUEUE_LEAK_DOWNSTREAM:
982 : 2 : gst_queue_leak_downstream (queue);
983 : 2 : break;
984 : : default:
985 : 0 : g_warning ("Unknown leaky type, using default");
986 : : /* fall-through */
987 : : case GST_QUEUE_NO_LEAK:
988 : : {
989 [ - + ]: 60700 : GST_CAT_DEBUG_OBJECT (queue_dataflow, queue,
990 : : "queue is full, waiting for free space");
991 : :
992 : : /* don't leak. Instead, wait for space to be available */
993 : : do {
994 : : /* for as long as the queue is filled, wait till an item was deleted. */
995 [ - + ][ # # ]: 60701 : GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing);
[ # # ][ # # ]
[ # # ][ # # ]
[ + + ][ - + ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ - + ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
996 [ + + ]: 60700 : } while (gst_queue_is_filled (queue));
997 : :
998 [ - + ]: 60700 : GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is not full");
999 : :
1000 [ + - ]: 60700 : if (!queue->silent) {
1001 : 60700 : GST_QUEUE_MUTEX_UNLOCK (queue);
1002 : 60697 : g_signal_emit (queue, gst_queue_signals[SIGNAL_RUNNING], 0);
1003 [ + - ]: 60700 : GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1004 : : }
1005 : 60700 : break;
1006 : : }
1007 : : }
1008 : : }
1009 : :
1010 [ - + ]: 152669 : if (queue->tail_needs_discont) {
1011 : 0 : GstBuffer *subbuffer = gst_buffer_make_metadata_writable (buffer);
1012 : :
1013 [ # # ]: 0 : if (subbuffer) {
1014 : 0 : buffer = subbuffer;
1015 : 0 : GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1016 : : } else {
1017 [ # # ]: 0 : GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
1018 : : }
1019 : 0 : queue->tail_needs_discont = FALSE;
1020 : : }
1021 : :
1022 : : /* put buffer in queue now */
1023 : 152669 : gst_queue_locked_enqueue_buffer (queue, buffer);
1024 : 152697 : GST_QUEUE_MUTEX_UNLOCK (queue);
1025 : :
1026 : 152709 : return GST_FLOW_OK;
1027 : :
1028 : : /* special conditions */
1029 : : out_unref:
1030 : : {
1031 : 1 : GST_QUEUE_MUTEX_UNLOCK (queue);
1032 : :
1033 : 1 : gst_buffer_unref (buffer);
1034 : :
1035 : 1 : return GST_FLOW_OK;
1036 : : }
1037 : : out_flushing:
1038 : : {
1039 : 9 : GstFlowReturn ret = queue->srcresult;
1040 : :
1041 [ - + ]: 9 : GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1042 : : "exit because task paused, reason: %s", gst_flow_get_name (ret));
1043 : 9 : GST_QUEUE_MUTEX_UNLOCK (queue);
1044 : 9 : gst_buffer_unref (buffer);
1045 : :
1046 : 9 : return ret;
1047 : : }
1048 : : out_eos:
1049 : : {
1050 [ # # ]: 0 : GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
1051 : 0 : GST_QUEUE_MUTEX_UNLOCK (queue);
1052 : :
1053 : 0 : gst_buffer_unref (buffer);
1054 : :
1055 : 0 : return GST_FLOW_UNEXPECTED;
1056 : : }
1057 : : out_unexpected:
1058 : : {
1059 [ # # ]: 0 : GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1060 : : "exit because we received UNEXPECTED");
1061 : 0 : GST_QUEUE_MUTEX_UNLOCK (queue);
1062 : :
1063 : 0 : gst_buffer_unref (buffer);
1064 : :
1065 : 152719 : return GST_FLOW_UNEXPECTED;
1066 : : }
1067 : : }
1068 : :
1069 : : static void
1070 : 5 : gst_queue_push_newsegment (GstQueue * queue)
1071 : : {
1072 : : GstSegment *s;
1073 : : GstEvent *event;
1074 : :
1075 : 5 : s = &queue->src_segment;
1076 : :
1077 [ + + ]: 5 : if (s->accum != 0) {
1078 : 1 : event = gst_event_new_new_segment_full (FALSE, 1.0, 1.0, s->format, 0,
1079 : : s->accum, 0);
1080 [ - + ]: 1 : GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1081 : : "pushing accum newsegment event");
1082 : 1 : gst_pad_push_event (queue->srcpad, event);
1083 : : }
1084 : :
1085 : 5 : event = gst_event_new_new_segment_full (FALSE, s->rate, s->applied_rate,
1086 : : s->format, s->start, s->stop, s->time);
1087 [ - + ]: 5 : GST_CAT_LOG_OBJECT (queue_dataflow, queue, "pushing real newsegment event");
1088 : 5 : gst_pad_push_event (queue->srcpad, event);
1089 : 5 : }
1090 : :
1091 : : /* dequeue an item from the queue an push it downstream. This functions returns
1092 : : * the result of the push. */
1093 : : static GstFlowReturn
1094 : 152635 : gst_queue_push_one (GstQueue * queue)
1095 : : {
1096 : 152635 : GstFlowReturn result = GST_FLOW_OK;
1097 : : GstMiniObject *data;
1098 : : gboolean is_buffer;
1099 : :
1100 : 152635 : data = gst_queue_locked_dequeue (queue, &is_buffer);
1101 [ - + ]: 152632 : if (data == NULL)
1102 : 0 : goto no_item;
1103 : :
1104 : : next:
1105 [ + + ]: 152632 : if (is_buffer) {
1106 : : GstBuffer *buffer;
1107 : : GstCaps *caps;
1108 : :
1109 : 152580 : buffer = GST_BUFFER_CAST (data);
1110 : :
1111 [ + + ]: 152580 : if (queue->head_needs_discont) {
1112 : 2 : GstBuffer *subbuffer = gst_buffer_make_metadata_writable (buffer);
1113 : :
1114 [ + - ]: 2 : if (subbuffer) {
1115 : 2 : buffer = subbuffer;
1116 : 2 : GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DISCONT);
1117 : : } else {
1118 [ # # ]: 0 : GST_DEBUG_OBJECT (queue, "Could not mark buffer as DISCONT");
1119 : : }
1120 : 2 : queue->head_needs_discont = FALSE;
1121 : : }
1122 : :
1123 : 152580 : caps = GST_BUFFER_CAPS (buffer);
1124 : :
1125 : 152580 : GST_QUEUE_MUTEX_UNLOCK (queue);
1126 : : /* set the right caps on the pad now. We do this before pushing the buffer
1127 : : * because the pad_push call will check (using acceptcaps) if the buffer can
1128 : : * be set on the pad, which might fail because this will be propagated
1129 : : * upstream. Also note that if the buffer has NULL caps, it means that the
1130 : : * caps did not change, so we don't have to change caps on the pad. */
1131 [ + + ][ + - ]: 152584 : if (caps && caps != GST_PAD_CAPS (queue->srcpad))
1132 : 5001 : gst_pad_set_caps (queue->srcpad, caps);
1133 : :
1134 [ + + ]: 152587 : if (queue->push_newsegment) {
1135 : 5 : gst_queue_push_newsegment (queue);
1136 : : }
1137 : 152587 : result = gst_pad_push (queue->srcpad, buffer);
1138 : :
1139 : : /* need to check for srcresult here as well */
1140 [ + + ]: 152590 : GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1141 : :
1142 [ - + ]: 152584 : if (result == GST_FLOW_UNEXPECTED) {
1143 [ # # ]: 0 : GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1144 : : "got UNEXPECTED from downstream");
1145 : : /* stop pushing buffers, we dequeue all items until we see an item that we
1146 : : * can push again, which is EOS or NEWSEGMENT. If there is nothing in the
1147 : : * queue we can push, we set a flag to make the sinkpad refuse more
1148 : : * buffers with an UNEXPECTED return value. */
1149 [ # # ]: 0 : while ((data = gst_queue_locked_dequeue (queue, &is_buffer))) {
1150 [ # # ]: 0 : if (is_buffer) {
1151 [ # # ]: 0 : GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1152 : : "dropping UNEXPECTED buffer %p", data);
1153 : 0 : gst_buffer_unref (GST_BUFFER_CAST (data));
1154 : : } else {
1155 : 0 : GstEvent *event = GST_EVENT_CAST (data);
1156 : 0 : GstEventType type = GST_EVENT_TYPE (event);
1157 : :
1158 [ # # ][ # # ]: 0 : if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) {
1159 : : /* we found a pushable item in the queue, push it out */
1160 [ # # ]: 0 : GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1161 : : "pushing pushable event %s after UNEXPECTED",
1162 : : GST_EVENT_TYPE_NAME (event));
1163 : 0 : goto next;
1164 : : }
1165 [ # # ]: 0 : GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1166 : : "dropping UNEXPECTED event %p", event);
1167 : 0 : gst_event_unref (event);
1168 : : }
1169 : : }
1170 : : /* no more items in the queue. Set the unexpected flag so that upstream
1171 : : * make us refuse any more buffers on the sinkpad. Since we will still
1172 : : * accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the
1173 : : * task function does not shut down. */
1174 : 0 : queue->unexpected = TRUE;
1175 : 0 : result = GST_FLOW_OK;
1176 : : }
1177 : : } else {
1178 : 52 : GstEvent *event = GST_EVENT_CAST (data);
1179 : 52 : GstEventType type = GST_EVENT_TYPE (event);
1180 : :
1181 : 52 : GST_QUEUE_MUTEX_UNLOCK (queue);
1182 : :
1183 [ - + ][ # # ]: 52 : if (queue->push_newsegment && type != GST_EVENT_NEWSEGMENT) {
1184 : 0 : gst_queue_push_newsegment (queue);
1185 : : }
1186 : 52 : gst_pad_push_event (queue->srcpad, event);
1187 : :
1188 [ + - ]: 52 : GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1189 : : /* if we're EOS, return UNEXPECTED so that the task pauses. */
1190 [ + + ]: 52 : if (type == GST_EVENT_EOS) {
1191 [ - + ]: 17 : GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1192 : : "pushed EOS event %p, return UNEXPECTED", event);
1193 : 17 : result = GST_FLOW_UNEXPECTED;
1194 : : }
1195 : : }
1196 : 152636 : return result;
1197 : :
1198 : : /* ERRORS */
1199 : : no_item:
1200 : : {
1201 [ # # ]: 0 : GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1202 : : "exit because we have no item in the queue");
1203 : 0 : return GST_FLOW_ERROR;
1204 : : }
1205 : : out_flushing:
1206 : : {
1207 [ - + ]: 6 : GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
1208 : 152642 : return GST_FLOW_WRONG_STATE;
1209 : : }
1210 : : }
1211 : :
1212 : : static void
1213 : 152650 : gst_queue_loop (GstPad * pad)
1214 : : {
1215 : : GstQueue *queue;
1216 : : GstFlowReturn ret;
1217 : :
1218 : 152650 : queue = (GstQueue *) GST_PAD_PARENT (pad);
1219 : :
1220 : : /* have to lock for thread-safety */
1221 [ + - ]: 152650 : GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1222 : :
1223 [ + + ]: 237399 : while (gst_queue_is_empty (queue)) {
1224 [ - + ]: 84757 : GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is empty");
1225 [ + - ]: 84757 : if (!queue->silent) {
1226 : 84757 : GST_QUEUE_MUTEX_UNLOCK (queue);
1227 : 84757 : g_signal_emit (queue, gst_queue_signals[SIGNAL_UNDERRUN], 0);
1228 [ + + ]: 84757 : GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1229 : : }
1230 : :
1231 : : /* we recheck, the signal could have changed the thresholds */
1232 [ + + ]: 169416 : while (gst_queue_is_empty (queue)) {
1233 [ - + ][ # # ]: 84668 : GST_QUEUE_WAIT_ADD_CHECK (queue, out_flushing);
[ # # ][ # # ]
[ # # ][ # # ]
[ + + ][ - + ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ][ - + ]
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
1234 : : }
1235 : :
1236 [ - + ]: 84748 : GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "queue is not empty");
1237 [ + - ]: 84749 : if (!queue->silent) {
1238 : 84749 : GST_QUEUE_MUTEX_UNLOCK (queue);
1239 : 84743 : g_signal_emit (queue, gst_queue_signals[SIGNAL_RUNNING], 0);
1240 : 84749 : g_signal_emit (queue, gst_queue_signals[SIGNAL_PUSHING], 0);
1241 [ + - ]: 84749 : GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
1242 : : }
1243 : : }
1244 : :
1245 : 152635 : ret = gst_queue_push_one (queue);
1246 : 152642 : queue->push_newsegment = FALSE;
1247 : 152642 : queue->srcresult = ret;
1248 [ + + ]: 152642 : if (ret != GST_FLOW_OK)
1249 : 31 : goto out_flushing;
1250 : :
1251 : 152611 : GST_QUEUE_MUTEX_UNLOCK (queue);
1252 : :
1253 : 152611 : return;
1254 : :
1255 : : /* ERRORS */
1256 : : out_flushing:
1257 : : {
1258 : 39 : gboolean eos = queue->eos;
1259 : 39 : GstFlowReturn ret = queue->srcresult;
1260 : :
1261 : 39 : gst_pad_pause_task (queue->srcpad);
1262 [ - + ]: 39 : GST_CAT_LOG_OBJECT (queue_dataflow, queue,
1263 : : "pause task, reason: %s", gst_flow_get_name (ret));
1264 [ + + ][ - + ]: 39 : GST_QUEUE_SIGNAL_DEL (queue);
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
1265 : 39 : GST_QUEUE_MUTEX_UNLOCK (queue);
1266 : : /* let app know about us giving up if upstream is not expected to do so */
1267 : : /* UNEXPECTED is already taken care of elsewhere */
1268 [ + + ][ + - ]: 39 : if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_UNEXPECTED)) {
[ + + ]
1269 [ + - ][ - + ]: 1 : GST_ELEMENT_ERROR (queue, STREAM, FAILED,
[ + - ][ - + ]
1270 : : (_("Internal data flow error.")),
1271 : : ("streaming task paused, reason %s (%d)",
1272 : : gst_flow_get_name (ret), ret));
1273 : 1 : gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
1274 : : }
1275 : 152650 : return;
1276 : : }
1277 : : }
1278 : :
1279 : : static gboolean
1280 : 36 : gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
1281 : : {
1282 : 36 : gboolean res = TRUE;
1283 : 36 : GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad));
1284 : :
1285 : : #ifndef GST_DISABLE_GST_DEBUG
1286 [ - + ]: 36 : GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
1287 : : event, GST_EVENT_TYPE (event));
1288 : : #endif
1289 : :
1290 : 36 : res = gst_pad_push_event (queue->sinkpad, event);
1291 : :
1292 : 36 : return res;
1293 : : }
1294 : :
1295 : : static gboolean
1296 : 33 : gst_queue_handle_src_query (GstPad * pad, GstQuery * query)
1297 : : {
1298 : 33 : GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad));
1299 : : GstPad *peer;
1300 : : gboolean res;
1301 : :
1302 [ - + ]: 33 : if (!(peer = gst_pad_get_peer (queue->sinkpad)))
1303 : 0 : return FALSE;
1304 : :
1305 : 33 : res = gst_pad_query (peer, query);
1306 : 33 : gst_object_unref (peer);
1307 [ - + ]: 33 : if (!res)
1308 : 0 : return FALSE;
1309 : :
1310 [ - + - ]: 33 : switch (GST_QUERY_TYPE (query)) {
1311 : : case GST_QUERY_POSITION:
1312 : : {
1313 : : gint64 peer_pos;
1314 : : GstFormat format;
1315 : :
1316 : : /* get peer position */
1317 : 0 : gst_query_parse_position (query, &format, &peer_pos);
1318 : :
1319 : : /* FIXME: this code assumes that there's no discont in the queue */
1320 [ # # # ]: 0 : switch (format) {
1321 : : case GST_FORMAT_BYTES:
1322 : 0 : peer_pos -= queue->cur_level.bytes;
1323 : 0 : break;
1324 : : case GST_FORMAT_TIME:
1325 : 0 : peer_pos -= queue->cur_level.time;
1326 : 0 : break;
1327 : : default:
1328 [ # # ]: 0 : GST_DEBUG_OBJECT (queue, "Can't adjust query in %s format, don't "
1329 : : "know how to adjust value", gst_format_get_name (format));
1330 : 0 : return TRUE;
1331 : : }
1332 : : /* set updated position */
1333 : 0 : gst_query_set_position (query, format, peer_pos);
1334 : 0 : break;
1335 : : }
1336 : : case GST_QUERY_LATENCY:
1337 : : {
1338 : : gboolean live;
1339 : : GstClockTime min, max;
1340 : :
1341 : 33 : gst_query_parse_latency (query, &live, &min, &max);
1342 : :
1343 : : /* we can delay up to the limit of the queue in time. If we have no time
1344 : : * limit, the best thing we can do is to return an infinite delay. In
1345 : : * reality a better estimate would be the byte/buffer rate but that is not
1346 : : * possible right now. */
1347 [ + - ][ - + ]: 33 : if (queue->max_size.time > 0 && max != -1)
1348 : 0 : max += queue->max_size.time;
1349 : : else
1350 : 33 : max = -1;
1351 : :
1352 : : /* adjust for min-threshold */
1353 [ - + ][ # # ]: 33 : if (queue->min_threshold.time > 0 && min != -1)
1354 : 0 : min += queue->min_threshold.time;
1355 : :
1356 : 33 : gst_query_set_latency (query, live, min, max);
1357 : 33 : break;
1358 : : }
1359 : : default:
1360 : : /* peer handled other queries */
1361 : 0 : break;
1362 : : }
1363 : :
1364 : 33 : return TRUE;
1365 : : }
1366 : :
1367 : : static gboolean
1368 : 94 : gst_queue_sink_activate_push (GstPad * pad, gboolean active)
1369 : : {
1370 : 94 : gboolean result = TRUE;
1371 : : GstQueue *queue;
1372 : :
1373 : 94 : queue = GST_QUEUE (gst_pad_get_parent (pad));
1374 : :
1375 [ + + ]: 94 : if (active) {
1376 : 47 : GST_QUEUE_MUTEX_LOCK (queue);
1377 : 47 : queue->srcresult = GST_FLOW_OK;
1378 : 47 : queue->eos = FALSE;
1379 : 47 : queue->unexpected = FALSE;
1380 : 47 : GST_QUEUE_MUTEX_UNLOCK (queue);
1381 : : } else {
1382 : : /* step 1, unblock chain function */
1383 : 47 : GST_QUEUE_MUTEX_LOCK (queue);
1384 : 47 : queue->srcresult = GST_FLOW_WRONG_STATE;
1385 : 47 : gst_queue_locked_flush (queue);
1386 : 47 : GST_QUEUE_MUTEX_UNLOCK (queue);
1387 : : }
1388 : :
1389 : 94 : gst_object_unref (queue);
1390 : :
1391 : 94 : return result;
1392 : : }
1393 : :
1394 : : static gboolean
1395 : 94 : gst_queue_src_activate_push (GstPad * pad, gboolean active)
1396 : : {
1397 : 94 : gboolean result = FALSE;
1398 : : GstQueue *queue;
1399 : :
1400 : 94 : queue = GST_QUEUE (gst_pad_get_parent (pad));
1401 : :
1402 [ + + ]: 94 : if (active) {
1403 : 47 : GST_QUEUE_MUTEX_LOCK (queue);
1404 : 47 : queue->srcresult = GST_FLOW_OK;
1405 : 47 : queue->eos = FALSE;
1406 : 47 : queue->unexpected = FALSE;
1407 : : /* we do not start the task yet if the pad is not connected */
1408 [ + + ]: 47 : if (gst_pad_is_linked (pad))
1409 : 34 : result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
1410 : : else {
1411 [ - + ]: 13 : GST_INFO_OBJECT (queue, "not starting task as pad is not linked");
1412 : 13 : result = TRUE;
1413 : : }
1414 : 47 : GST_QUEUE_MUTEX_UNLOCK (queue);
1415 : : } else {
1416 : : /* step 1, unblock loop function */
1417 : 47 : GST_QUEUE_MUTEX_LOCK (queue);
1418 : 47 : queue->srcresult = GST_FLOW_WRONG_STATE;
1419 : : /* the item add signal will unblock */
1420 : 47 : g_cond_signal (queue->item_add);
1421 : 47 : GST_QUEUE_MUTEX_UNLOCK (queue);
1422 : :
1423 : : /* step 2, make sure streaming finishes */
1424 : 47 : result = gst_pad_stop_task (pad);
1425 : : }
1426 : :
1427 : 94 : gst_object_unref (queue);
1428 : :
1429 : 94 : return result;
1430 : : }
1431 : :
1432 : : static void
1433 : 22 : queue_capacity_change (GstQueue * queue)
1434 : : {
1435 [ - + ]: 22 : if (queue->leaky == GST_QUEUE_LEAK_DOWNSTREAM) {
1436 : 0 : gst_queue_leak_downstream (queue);
1437 : : }
1438 : :
1439 : : /* changing the capacity of the queue must wake up
1440 : : * the _chain function, it might have more room now
1441 : : * to store the buffer/event in the queue */
1442 [ - + ][ # # ]: 22 : GST_QUEUE_SIGNAL_DEL (queue);
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
1443 : 22 : }
1444 : :
1445 : : /* Changing the minimum required fill level must
1446 : : * wake up the _loop function as it might now
1447 : : * be able to preceed.
1448 : : */
1449 : : #define QUEUE_THRESHOLD_CHANGE(q)\
1450 : : GST_QUEUE_SIGNAL_ADD (q);
1451 : :
1452 : : static void
1453 : 26 : gst_queue_set_property (GObject * object,
1454 : : guint prop_id, const GValue * value, GParamSpec * pspec)
1455 : : {
1456 : 26 : GstQueue *queue = GST_QUEUE (object);
1457 : :
1458 : : /* someone could change levels here, and since this
1459 : : * affects the get/put funcs, we need to lock for safety. */
1460 : 26 : GST_QUEUE_MUTEX_LOCK (queue);
1461 : :
1462 [ + + + - : 26 : switch (prop_id) {
+ - + -
- ]
1463 : : case PROP_MAX_SIZE_BYTES:
1464 : 1 : queue->max_size.bytes = g_value_get_uint (value);
1465 : 1 : queue_capacity_change (queue);
1466 : 1 : break;
1467 : : case PROP_MAX_SIZE_BUFFERS:
1468 : 18 : queue->max_size.buffers = g_value_get_uint (value);
1469 : 18 : queue_capacity_change (queue);
1470 : 18 : break;
1471 : : case PROP_MAX_SIZE_TIME:
1472 : 3 : queue->max_size.time = g_value_get_uint64 (value);
1473 : 3 : queue_capacity_change (queue);
1474 : 3 : break;
1475 : : case PROP_MIN_THRESHOLD_BYTES:
1476 : 0 : queue->min_threshold.bytes = g_value_get_uint (value);
1477 : 0 : queue->orig_min_threshold.bytes = queue->min_threshold.bytes;
1478 [ # # ][ # # ]: 0 : QUEUE_THRESHOLD_CHANGE (queue);
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
1479 : 0 : break;
1480 : : case PROP_MIN_THRESHOLD_BUFFERS:
1481 : 1 : queue->min_threshold.buffers = g_value_get_uint (value);
1482 : 1 : queue->orig_min_threshold.buffers = queue->min_threshold.buffers;
1483 [ - + ][ # # ]: 1 : QUEUE_THRESHOLD_CHANGE (queue);
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
1484 : 1 : break;
1485 : : case PROP_MIN_THRESHOLD_TIME:
1486 : 0 : queue->min_threshold.time = g_value_get_uint64 (value);
1487 : 0 : queue->orig_min_threshold.time = queue->min_threshold.time;
1488 [ # # ][ # # ]: 0 : QUEUE_THRESHOLD_CHANGE (queue);
[ # # ][ # # ]
[ # # ][ # # ]
[ # # ]
1489 : 0 : break;
1490 : : case PROP_LEAKY:
1491 : 3 : queue->leaky = g_value_get_enum (value);
1492 : 3 : break;
1493 : : case PROP_SILENT:
1494 : 0 : queue->silent = g_value_get_boolean (value);
1495 : 0 : break;
1496 : : default:
1497 : 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1498 : 0 : break;
1499 : : }
1500 : :
1501 : 26 : GST_QUEUE_MUTEX_UNLOCK (queue);
1502 : 26 : }
1503 : :
1504 : : static void
1505 : 8 : gst_queue_get_property (GObject * object,
1506 : : guint prop_id, GValue * value, GParamSpec * pspec)
1507 : : {
1508 : 8 : GstQueue *queue = GST_QUEUE (object);
1509 : :
1510 : 8 : GST_QUEUE_MUTEX_LOCK (queue);
1511 : :
1512 [ - - + - : 8 : switch (prop_id) {
- - - - -
- - - ]
1513 : : case PROP_CUR_LEVEL_BYTES:
1514 : 0 : g_value_set_uint (value, queue->cur_level.bytes);
1515 : 0 : break;
1516 : : case PROP_CUR_LEVEL_BUFFERS:
1517 : 0 : g_value_set_uint (value, queue->cur_level.buffers);
1518 : 0 : break;
1519 : : case PROP_CUR_LEVEL_TIME:
1520 : 8 : g_value_set_uint64 (value, queue->cur_level.time);
1521 : 8 : break;
1522 : : case PROP_MAX_SIZE_BYTES:
1523 : 0 : g_value_set_uint (value, queue->max_size.bytes);
1524 : 0 : break;
1525 : : case PROP_MAX_SIZE_BUFFERS:
1526 : 0 : g_value_set_uint (value, queue->max_size.buffers);
1527 : 0 : break;
1528 : : case PROP_MAX_SIZE_TIME:
1529 : 0 : g_value_set_uint64 (value, queue->max_size.time);
1530 : 0 : break;
1531 : : case PROP_MIN_THRESHOLD_BYTES:
1532 : 0 : g_value_set_uint (value, queue->min_threshold.bytes);
1533 : 0 : break;
1534 : : case PROP_MIN_THRESHOLD_BUFFERS:
1535 : 0 : g_value_set_uint (value, queue->min_threshold.buffers);
1536 : 0 : break;
1537 : : case PROP_MIN_THRESHOLD_TIME:
1538 : 0 : g_value_set_uint64 (value, queue->min_threshold.time);
1539 : 0 : break;
1540 : : case PROP_LEAKY:
1541 : 0 : g_value_set_enum (value, queue->leaky);
1542 : 0 : break;
1543 : : case PROP_SILENT:
1544 : 0 : g_value_set_boolean (value, queue->silent);
1545 : 0 : break;
1546 : : default:
1547 : 0 : G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1548 : 0 : break;
1549 : : }
1550 : :
1551 : 8 : GST_QUEUE_MUTEX_UNLOCK (queue);
1552 : 8 : }
|