demux: remove logic duplication from packet read functions

There were 3 packet reading functions: the "old" demux_read_packet()
that blocked (leftover from MPlayer times, but was still used until
recently by some obscure code), the "new" demux_read_packet_async(), and
the special demux_read_any_packet(), that is used by pseudo-demuxers
like demux_edl.

The first two could be used both in threaded and un-threaded mode. This
made 5 cases in total. Some bits of logic was spread across all of them.

Unify the logic. A recent commit made demux_read_packet() private, and
the code for it in threaded mode disappears. The difference between
threaded and un-threaded is minimized.

It's possible that this commit causes random regression. Enjoy.
This commit is contained in:
wm4 2019-05-17 20:53:09 +02:00
parent 287166b02e
commit f08387c552
1 changed files with 57 additions and 68 deletions

View File

@ -1855,22 +1855,52 @@ static struct demux_packet *advance_reader_head(struct demux_stream *ds)
return pkt;
}
static struct demux_packet *dequeue_packet(struct demux_stream *ds)
// Returns:
// < 0: EOF was reached, *res is not set
// == 0: no new packet yet, wait, *res is not set
// > 0: new packet is moved to *res
static int dequeue_packet(struct demux_stream *ds, struct demux_packet **res)
{
struct demux_internal *in = ds->in;
if (!ds->selected)
return -1;
if (in->blocked)
return 0;
if (ds->sh->attached_picture) {
ds->eof = true;
if (ds->attached_picture_added)
return NULL;
return -1;
ds->attached_picture_added = true;
struct demux_packet *pkt = demux_copy_packet(ds->sh->attached_picture);
if (!pkt)
abort();
pkt->stream = ds->sh->index;
return pkt;
*res = pkt;
return 1;
}
if (!ds->reader_head || ds->in->blocked)
return NULL;
if (ds->eager) {
in->reading = true; // enable readahead
in->eof = false; // force retry
pthread_cond_signal(&in->wakeup); // possibly read more
}
ds->need_wakeup = !ds->reader_head;
if (!ds->reader_head) {
if (!ds->eager) {
// Non-eager streams temporarily return EOF. If they returned 0,
// the reader would have to wait for new packets, which does not
// make sense due to the sparseness and passiveness of non-eager
// streams.
return -1;
}
return ds->eof ? -1 : 0;
}
struct demux_packet *pkt = advance_reader_head(ds);
assert(pkt);
// The returned packet is mutated etc. and will be owned by the user.
pkt = demux_copy_packet(pkt);
@ -1924,42 +1954,8 @@ static struct demux_packet *dequeue_packet(struct demux_stream *ds)
}
prune_old_packets(ds->in);
return pkt;
}
// Read a packet from the given stream. The returned packet belongs to the
// caller, who has to free it with talloc_free(). Might block. Returns NULL
// on EOF.
static struct demux_packet *demux_read_packet(struct sh_stream *sh)
{
struct demux_stream *ds = sh ? sh->ds : NULL;
if (!ds)
return NULL;
struct demux_internal *in = ds->in;
pthread_mutex_lock(&in->lock);
if (ds->eager) {
const char *t = stream_type_name(ds->type);
MP_DBG(in, "reading packet for %s\n", t);
in->eof = false; // force retry
ds->need_wakeup = true;
while (ds->selected && !ds->reader_head && !in->blocked) {
in->reading = true;
// Note: the following code marks EOF if it can't continue
if (in->threading) {
MP_VERBOSE(in, "waiting for demux thread (%s)\n", t);
pthread_cond_signal(&in->wakeup);
pthread_cond_wait(&in->wakeup, &in->lock);
} else {
thread_work(in);
}
if (ds->eof)
break;
}
}
struct demux_packet *pkt = dequeue_packet(ds);
pthread_cond_signal(&in->wakeup); // possibly read more
pthread_mutex_unlock(&in->lock);
return pkt;
*res = pkt;
return 1;
}
// Poll the demuxer queue, and if there's a packet, return it. Otherwise, just
@ -1977,32 +1973,21 @@ static struct demux_packet *demux_read_packet(struct sh_stream *sh)
int demux_read_packet_async(struct sh_stream *sh, struct demux_packet **out_pkt)
{
struct demux_stream *ds = sh ? sh->ds : NULL;
int r = -1;
*out_pkt = NULL;
if (!ds)
return r;
if (ds->in->threading) {
pthread_mutex_lock(&ds->in->lock);
*out_pkt = dequeue_packet(ds);
if (ds->eager) {
r = *out_pkt ? 1 : (ds->eof ? -1 : 0);
ds->in->reading = true; // enable readahead
ds->in->eof = false; // force retry
pthread_cond_signal(&ds->in->wakeup); // possibly read more
} else {
r = *out_pkt ? 1 : -1;
}
ds->need_wakeup = r != 1;
pthread_mutex_unlock(&ds->in->lock);
} else {
if (ds->in->blocked) {
r = 0;
} else {
*out_pkt = demux_read_packet(sh);
r = *out_pkt ? 1 : -1;
}
ds->need_wakeup = r != 1;
return -1;
struct demux_internal *in = ds->in;
pthread_mutex_lock(&in->lock);
int r = -1;
while (1) {
r = dequeue_packet(ds, out_pkt);
if (in->threading || in->blocked || r != 0)
break;
// Needs to actually read packets until we got a packet or EOF.
thread_work(in);
}
pthread_mutex_unlock(&in->lock);
return r;
}
@ -2013,16 +1998,20 @@ struct demux_packet *demux_read_any_packet(struct demuxer *demuxer)
assert(!in->threading); // doesn't work with threading
bool read_more = true;
while (read_more && !in->blocked) {
bool all_eof = true;
for (int n = 0; n < in->num_streams; n++) {
in->reading = true; // force read_packet() to read
struct demux_packet *pkt = dequeue_packet(in->streams[n]->ds);
if (pkt)
return pkt;
struct demux_packet *out_pkt = NULL;
int r = dequeue_packet(in->streams[n]->ds, &out_pkt);
if (r > 0)
return out_pkt;
if (r == 0)
all_eof = false;
}
// retry after calling this
pthread_mutex_lock(&in->lock); // lock only because thread_work unlocks
read_more = thread_work(in);
read_more &= !in->eof;
read_more &= !all_eof;
pthread_mutex_unlock(&in->lock);
}
return NULL;