demux: simplify some internals, stop trying to read packets after EOF

Remove some redundant fields that controlled or indicated whether the
demuxer was/could/should prefetch. Redefine how the eof/reading fields
work.

The in->eof field is now always valid, instead of weirdly being reset to
false in random situations. The in->reading field now corresponds to
whether the demuxer thread is working at all, and is reset if it stops
doing anything.

Also, I always found it stupid that dequeue_packet() forced the demuxer
thread to retry reading if it was EOF. This makes little sense, but was
probably added for files that are being appended to (running downloads).
It makes no sense, because if the cache really tried to read until file
EOF, it would encounter partial packets and throw errors, so all is lost
anyway. Plus stream_file now handles this better. So stop this behavior,
but add a temporary option that enables the old behavior.

I think checking for ds->eager when enabling prefetching never really
made sense (could be debated, but no, not really). On the other hand,
the change above exposed a missing wakeup in the backward demuxing code.

There is some chance of regressions that could leave it stuck in certain
states, or cause incorrect demuxer cache state reporting to the player
frontend.
This commit is contained in:
wm4 2020-02-27 22:16:30 +01:00
parent 611c92ef1d
commit b56e2efd5f
2 changed files with 37 additions and 27 deletions

View File

@@ -3528,6 +3528,16 @@ Demuxer
``--cache-secs`` is used (i.e. when the stream appears to be a network
stream or the stream cache is enabled).
``--demuxer-force-retry-on-eof=<yes|no>``
Whether to keep retrying to make the demuxer thread read more packets each
time the decoder dequeues a packet, even if the end of the file was reached
(default: no). This does not really make sense, but was the default behavior
in mpv 0.32.0 and earlier. This option will be silently removed after a
while, and exists only to restore the old behavior for testing, in case this
was actually needed somewhere. This does _not_ help with files that are
being appended to (in these cases use ``appending://``, or disable the
cache).
``--demuxer-thread=<yes|no>``
Run the demuxer in a separate thread, and let it prefetch a certain amount
of packets (default: yes). Having this enabled leads to smoother playback,

View File

@@ -100,6 +100,7 @@ struct demux_opts {
int back_batch[STREAM_TYPE_COUNT];
double back_seek_size;
char *meta_cp;
int force_retry_eof;
};
#define OPT_BASE_STRUCT struct demux_opts
@@ -135,6 +136,7 @@ const struct m_sub_options demux_conf = {
OPT_DOUBLE("demuxer-backward-playback-step", back_seek_size, M_OPT_MIN,
.min = 0),
OPT_STRING("metadata-codepage", meta_cp, 0),
OPT_FLAG("demuxer-force-retry-on-eof", force_retry_eof, 0),
{0}
},
.size = sizeof(struct demux_opts),
@@ -202,9 +204,7 @@ struct demux_internal {
struct demux_cache *cache;
bool warned_queue_overflow;
bool last_eof; // last actual global EOF status
bool eof; // whether we're in EOF state (reset for retry)
bool idle;
bool eof; // whether we're in EOF state
double min_secs;
size_t max_bytes;
size_t max_bytes_bw;
@@ -212,8 +212,11 @@ struct demux_internal {
bool using_network_cache_opts;
char *record_filename;
// At least one decoder actually requested data since init or the last seek.
// Do this to allow the decoder thread to select streams before starting.
// Whether the demuxer thread should prefetch packets. This is set to false
// if EOF was reached or the demuxer cache is full. This is also important
// in the initial state: the decoder thread needs to select streams before
// the first packet is read, so this is set to true by packet reading only.
// Reset to false again on EOF or if prefetching is done.
bool reading;
// Set if we just performed a seek, without reading packets yet. Used to
@@ -1559,6 +1562,7 @@ resume_earlier:
ds->reader_head = t;
ds->back_need_recheck = true;
in->back_any_need_recheck = true;
pthread_cond_signal(&in->wakeup);
} else {
ds->back_seek_pos -= in->opts->back_seek_size;
in->need_back_seek = true;
@@ -2082,7 +2086,7 @@ static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp)
if (!ds->ignore_eof) {
// obviously not true anymore
ds->eof = false;
in->last_eof = in->eof = false;
in->eof = false;
}
// For video, PTS determination is not trivial, but for other media types
@@ -2148,7 +2152,7 @@ static bool lazy_stream_needs_wait(struct demux_stream *ds)
// Attempt to read until force_read_until was reached, or reading has
// stopped for some reason (true EOF, queue overflow).
return !ds->eager && !ds->reader_head && !in->back_demuxing &&
!in->last_eof && ds->force_read_until != MP_NOPTS_VALUE &&
!in->eof && ds->force_read_until != MP_NOPTS_VALUE &&
(in->demux_ts == MP_NOPTS_VALUE ||
in->demux_ts <= ds->force_read_until);
}
@@ -2156,10 +2160,10 @@ static bool lazy_stream_needs_wait(struct demux_stream *ds)
// Returns true if there was "progress" (lock was released temporarily).
static bool read_packet(struct demux_internal *in)
{
in->eof = false;
in->idle = true;
bool was_reading = in->reading;
in->reading = false;
if (!in->reading || in->blocked || demux_cancel_test(in->d_thread))
if (!was_reading || in->blocked || demux_cancel_test(in->d_thread))
return false;
// Check if we need to read a new packet. We do this if all queues are below
@@ -2237,7 +2241,7 @@ static bool read_packet(struct demux_internal *in)
// Actually read a packet. Drop the lock while doing so, because waiting
// for disk or network I/O can take time.
in->idle = false;
in->reading = true;
in->after_seek = false;
in->after_seek_to_start = false;
pthread_mutex_unlock(&in->lock);
@@ -2262,14 +2266,15 @@
for (int n = 0; n < in->num_streams; n++)
mark_stream_eof(in->streams[n]->ds);
// If we had EOF previously, then don't wakeup (avoids wakeup loop)
if (!in->last_eof) {
if (!in->eof) {
if (in->wakeup_cb)
in->wakeup_cb(in->wakeup_cb_ctx);
pthread_cond_signal(&in->wakeup);
MP_VERBOSE(in, "EOF reached.\n");
}
}
in->eof = in->last_eof = eof;
in->eof = eof;
in->reading = !eof;
}
return true;
}
@@ -2409,7 +2414,7 @@ static void execute_seek(struct demux_internal *in)
{
int flags = in->seek_flags;
double pts = in->seek_pts;
in->last_eof = in->eof = false;
in->eof = false;
in->seeking = false;
in->seeking_in_progress = pts;
in->demux_ts = MP_NOPTS_VALUE;
@@ -2524,10 +2529,8 @@ static bool thread_work(struct demux_internal *in)
execute_seek(in);
return true;
}
if (!in->eof) {
if (read_packet(in))
return true; // read_packet unlocked, so recheck conditions
}
if (read_packet(in))
return true; // read_packet unlocked, so recheck conditions
if (mp_time_us() >= in->next_cache_update) {
update_cache(in);
return true;
@@ -2630,10 +2633,9 @@ static int dequeue_packet(struct demux_stream *ds, double min_pts,
return 1;
}
if (ds->eager) {
in->reading = true; // enable readahead
in->eof = false; // force retry
pthread_cond_signal(&in->wakeup); // possibly read more
if (!in->reading && (!in->eof || in->opts->force_retry_eof)) {
in->reading = true; // enable demuxer thread prefetching
pthread_cond_signal(&in->wakeup);
}
ds->force_read_until = min_pts;
@@ -2762,7 +2764,7 @@ int demux_read_packet_async(struct sh_stream *sh, struct demux_packet **out_pkt)
// Like demux_read_packet_async(). They are the same for min_pts==MP_NOPTS_VALUE.
// If min_pts is set, and the stream is lazily read (eager=false, interleaved
// subtitles), then return 0 until demuxing has reached min_pts, or the queue
// overflowed, or EOF was reached, or packet was read for this stream.
// overflowed, or EOF was reached, or a packet was read for this stream.
int demux_read_packet_async_until(struct sh_stream *sh, double min_pts,
struct demux_packet **out_pkt)
{
@@ -2797,7 +2799,6 @@ struct demux_packet *demux_read_any_packet(struct demuxer *demuxer)
while (read_more && !in->blocked) {
bool all_eof = true;
for (int n = 0; n < in->num_streams; n++) {
in->reading = true; // force read_packet() to read
int r = dequeue_packet(in->streams[n]->ds, MP_NOPTS_VALUE, &out_pkt);
if (r > 0)
goto done;
@@ -3787,7 +3788,6 @@ static bool queue_seek(struct demux_internal *in, double seek_pts, int flags,
}
in->eof = false;
in->idle = true;
in->reading = false;
in->back_demuxing = set_backwards;
@@ -4411,7 +4411,7 @@ void demux_get_reader_state(struct demuxer *demuxer, struct demux_reader_state *
pthread_mutex_lock(&in->lock);
*r = (struct demux_reader_state){
.eof = in->last_eof,
.eof = in->eof,
.ts_reader = MP_NOPTS_VALUE,
.ts_end = MP_NOPTS_VALUE,
.ts_duration = -1,
@@ -4434,7 +4434,7 @@ void demux_get_reader_state(struct demuxer *demuxer, struct demux_reader_state *
}
r->fw_bytes += get_foward_buffered_bytes(ds);
}
r->idle = (in->idle && !r->underrun) || r->eof;
r->idle = (!in->reading && !r->underrun) || r->eof;
r->underrun &= !r->idle && in->threading;
r->ts_reader = MP_ADD_PTS(r->ts_reader, in->ts_offset);
r->ts_end = MP_ADD_PTS(r->ts_end, in->ts_offset);