f_async_queue: add various helper functions

Shouldn't change the behavior if not used. Will probably be used in a
later commit.
This commit is contained in:
wm4 2020-08-28 20:08:32 +02:00
parent 71d118733a
commit 3d0eb4c26c
2 changed files with 105 additions and 2 deletions

View File

@ -155,6 +155,24 @@ void mp_async_queue_reset(struct mp_async_queue *queue)
reset_queue(queue->q);
}
// Report whether the queue is currently in the "active" state (see header).
bool mp_async_queue_is_active(struct mp_async_queue *queue)
{
    struct async_queue *q = queue->q;
    pthread_mutex_lock(&q->lock);
    bool active = q->active;
    pthread_mutex_unlock(&q->lock);
    return active;
}
// Report whether the queue has reached its configured capacity.
bool mp_async_queue_is_full(struct mp_async_queue *queue)
{
    struct async_queue *q = queue->q;
    pthread_mutex_lock(&q->lock);
    bool full = is_full(q);
    pthread_mutex_unlock(&q->lock);
    return full;
}
void mp_async_queue_resume(struct mp_async_queue *queue)
{
struct async_queue *q = queue->q;
@ -169,8 +187,44 @@ void mp_async_queue_resume(struct mp_async_queue *queue)
pthread_mutex_unlock(&q->lock);
}
// Switch the queue to active + reading state, waking up both connected
// filters so the producer and consumer can start working immediately.
void mp_async_queue_resume_reading(struct mp_async_queue *queue)
{
    struct async_queue *q = queue->q;
    pthread_mutex_lock(&q->lock);
    bool already_running = q->active && q->reading;
    if (!already_running) {
        q->active = true;
        q->reading = true;
        // Kick both ends of the queue, if connected.
        for (int i = 0; i < 2; i++) {
            if (q->conn[i])
                mp_filter_wakeup(q->conn[i]);
        }
    }
    pthread_mutex_unlock(&q->lock);
}
// Return the number of buffered samples (in configured sample units) held
// inside the queue itself.
int64_t mp_async_queue_get_samples(struct mp_async_queue *queue)
{
    struct async_queue *q = queue->q;
    pthread_mutex_lock(&q->lock);
    int64_t samples = q->samples_size;
    pthread_mutex_unlock(&q->lock);
    return samples;
}
// Return the number of frames currently buffered inside the queue itself.
int mp_async_queue_get_frames(struct mp_async_queue *queue)
{
    struct async_queue *q = queue->q;
    pthread_mutex_lock(&q->lock);
    int frames = q->num_frames;
    pthread_mutex_unlock(&q->lock);
    return frames;
}
// Per-filter private state for the queue access filters.
struct priv {
    struct async_queue *q;        // shared queue state (reference held by this filter)
    struct mp_filter *notify;     // optional filter woken on certain queue state
                                  // changes; set via mp_async_queue_set_notifier()
};
static void destroy(struct mp_filter *f)
@ -212,9 +266,14 @@ static void process_in(struct mp_filter *f)
// Notify reader that we have new frames.
if (q->conn[1])
mp_filter_wakeup(q->conn[1]);
if (!is_full(q))
bool full = is_full(q);
if (!full)
mp_pin_out_request_data_next(f->ppins[0]);
if (p->notify && full)
mp_filter_wakeup(p->notify);
}
if (p->notify && !q->num_frames)
mp_filter_wakeup(p->notify);
pthread_mutex_unlock(&q->lock);
}
@ -275,6 +334,17 @@ static const struct mp_filter_info info_out = {
.process = process_out,
};
// Install (or replace) the notify filter on an input-side access filter.
// A newly set non-NULL notifier is woken up once immediately.
void mp_async_queue_set_notifier(struct mp_filter *f, struct mp_filter *notify)
{
    assert(mp_filter_get_info(f) == &info_in);
    struct priv *p = f->priv;
    if (p->notify == notify)
        return; // no change
    p->notify = notify;
    if (notify)
        mp_filter_wakeup(notify);
}
struct mp_filter *mp_async_queue_create_filter(struct mp_filter *parent,
enum mp_pin_dir dir,
struct mp_async_queue *queue)

View File

@ -28,6 +28,28 @@ void mp_async_queue_reset(struct mp_async_queue *queue);
// fill up.
void mp_async_queue_resume(struct mp_async_queue *queue);
// Like mp_async_queue_resume(), but also allows the producer writing to the
// queue, even if the consumer will not request any data yet.
void mp_async_queue_resume_reading(struct mp_async_queue *queue);
// Returns true if, out of mp_async_queue_reset() and mp_async_queue_resume(),
// the latter was called most recently.
bool mp_async_queue_is_active(struct mp_async_queue *queue);
// Returns true if the queue has reached its configured size, i.e. the input
// filter accepts no further frames. Always returns false if not active (then
// it does not accept any input at all).
bool mp_async_queue_is_full(struct mp_async_queue *queue);
// Get the total of samples buffered within the queue itself. This doesn't count
// samples buffered in the access filters. mp_async_queue_config.sample_unit is
// used to define what "1 sample" means.
int64_t mp_async_queue_get_samples(struct mp_async_queue *queue);
// Get the total number of frames buffered within the queue itself. Frames
// buffered in the access filters are not included.
int mp_async_queue_get_frames(struct mp_async_queue *queue);
// Create a filter to access the queue, and connect it. It's not allowed to
// connect an already connected end of the queue. The filter can be freed at
// any time.
@ -44,7 +66,7 @@ void mp_async_queue_resume(struct mp_async_queue *queue);
// queue state is the API user's responsibility. Note that resetting an input
// filter (dir==MP_PIN_IN) while the queue is active and in "reading" state
// (the output filter requested data at any point before the last
// mp_async_queue_reset() was called), the
// mp_async_queue_reset(), or mp_async_queue_resume_reading() was called), the
// filter will immediately request data after the reset.
//
// For proper global reset, this order should be preferred:
@ -63,6 +85,17 @@ struct mp_filter *mp_async_queue_create_filter(struct mp_filter *parent,
enum mp_pin_dir dir,
struct mp_async_queue *queue);
// Set a filter that should be woken up with mp_filter_wakeup() in the following
// situations:
// - mp_async_queue_is_full() changes to true (at least for a short moment)
// - mp_async_queue_get_frames() changes to 0 (at least until new data is fed)
// This is a workaround for the filter design, which does not allow you to write
// to the queue in a "sequential" way (write, then check condition).
// Calling this again on the same filter removes the previous notify filter.
// f: must be a filter returned by mp_async_queue_create_filter(, MP_PIN_IN,)
// notify: filter to be woken up
void mp_async_queue_set_notifier(struct mp_filter *f, struct mp_filter *notify);
enum mp_async_queue_sample_unit {
AQUEUE_UNIT_FRAME = 0, // a frame counts as 1 sample
AQUEUE_UNIT_SAMPLES, // number of audio samples (1 for other media types,