stream: redo playback abort handling

This mechanism originates from MPlayer's way of dealing with blocking
network, but it's still useful. On opening and closing, mpv waits for
network synchronously, and also some obscure commands and use-cases can
lead to such blocking. In these situations, the stream is asynchronously
forced to stop by "interrupting" it.

The old design interrupting I/O was a bit broken: polling with a
callback, instead of actively interrupting it. Change the direction of
this. There is no callback anymore, and the player calls
mp_cancel_trigger() to force the stream to return.

libavformat (via stream_lavf.c) has the old broken design, and fixing it
would require fixing libavformat, which won't happen so quickly. So we
have to keep that part. But everything above the stream layer is
prepared for a better design, and more sophisticated methods than
mp_cancel_test() could be easily introduced.

There's still one problem: commands are still run in the central
playback loop, which we assume can block on I/O in the worst case.
That's not a problem yet, because we simply mark some commands as being
able to stop playback of the current file ("quit" etc.), so input.c
could abort playback as soon as such a command is queued. But there are
also commands abort playback only conditionally, and the logic for that
is in the playback core and thus "unreachable". For example,
"playlist_next" aborts playback only if there's a next file. We don't
want it to always abort playback.

As a quite ugly hack, abort playback only if at least 2 abort commands
are queued - this pretty much happens only if the core is frozen and
doesn't react to input.
This commit is contained in:
wm4 2014-09-13 14:23:08 +02:00
parent 2dd819705d
commit 2e91d44e20
13 changed files with 86 additions and 47 deletions

View File

@ -7,9 +7,6 @@
struct mpv_global {
struct MPOpts *opts;
struct mp_log *log;
int (*stream_interrupt_cb)(void *ctx);
void *stream_interrupt_cb_ctx;
};
#endif

View File

@ -156,6 +156,8 @@ struct input_ctx {
int num_sources;
struct cmd_queue cmd_queue;
struct mp_cancel *cancel;
};
static int parse_config(struct input_ctx *ictx, bool builtin, bstr data,
@ -243,10 +245,10 @@ static int queue_count_cmds(struct cmd_queue *queue)
return res;
}
static bool queue_has_abort_cmds(struct cmd_queue *queue)
static bool has_abort_cmds(struct input_ctx *ictx)
{
bool ret = false;
for (struct mp_cmd *cmd = queue->first; cmd; cmd = cmd->queue_next)
for (struct mp_cmd *cmd = ictx->cmd_queue.first; cmd; cmd = cmd->queue_next)
if (mp_input_is_abort_cmd(cmd)) {
ret = true;
break;
@ -556,8 +558,8 @@ static bool key_updown_ok(enum mp_command_type cmd)
static bool should_drop_cmd(struct input_ctx *ictx, struct mp_cmd *cmd)
{
struct cmd_queue *queue = &ictx->cmd_queue;
return (queue_count_cmds(queue) >= ictx->key_fifo_size &&
(!mp_input_is_abort_cmd(cmd) || queue_has_abort_cmds(queue)));
return queue_count_cmds(queue) >= ictx->key_fifo_size &&
!mp_input_is_abort_cmd(cmd);
}
static struct mp_cmd *resolve_key(struct input_ctx *ictx, int code)
@ -796,6 +798,9 @@ int mp_input_queue_cmd(struct input_ctx *ictx, mp_cmd_t *cmd)
{
input_lock(ictx);
if (cmd) {
// Abort only if there are going to be at least 2 commands in the queue.
if (ictx->cancel && mp_input_is_abort_cmd(cmd) && has_abort_cmds(ictx))
mp_cancel_trigger(ictx->cancel);
queue_add_tail(&ictx->cmd_queue, cmd);
mp_input_wakeup(ictx);
}
@ -1304,12 +1309,11 @@ void mp_input_uninit(struct input_ctx *ictx)
talloc_free(ictx);
}
bool mp_input_check_interrupt(struct input_ctx *ictx)
void mp_input_set_cancel(struct input_ctx *ictx, struct mp_cancel *cancel)
{
input_lock(ictx);
bool res = queue_has_abort_cmds(&ictx->cmd_queue);
ictx->cancel = cancel;
input_unlock(ictx);
return res;
}
bool mp_input_use_alt_gr(struct input_ctx *ictx)

View File

@ -228,8 +228,10 @@ void mp_input_wakeup(struct input_ctx *ictx);
void mp_input_wakeup_nolock(struct input_ctx *ictx);
// Interruptible usleep: (used by demux)
bool mp_input_check_interrupt(struct input_ctx *ictx);
// Used to asynchronously abort playback. Needed because the core still can
// block on network in some situations.
struct mp_cancel;
void mp_input_set_cancel(struct input_ctx *ictx, struct mp_cancel *cancel);
// If this returns true, use Right Alt key as Alt Gr to produce special
// characters. If false, count Right Alt as the modifier Alt key.

View File

@ -169,6 +169,7 @@ typedef struct MPContext {
struct input_ctx *input;
struct mp_client_api *clients;
struct mp_dispatch_queue *dispatch;
struct mp_cancel *playback_abort;
struct mp_log *statusline;
struct osd_state *osd;

View File

@ -1005,6 +1005,8 @@ static void play_current_file(struct MPContext *mpctx)
mp_notify(mpctx, MPV_EVENT_START_FILE, NULL);
mp_cancel_reset(mpctx->playback_abort);
mpctx->stop_play = 0;
mpctx->filename = NULL;
mpctx->shown_aframes = 0;
@ -1085,7 +1087,8 @@ static void play_current_file(struct MPContext *mpctx)
int stream_flags = STREAM_READ;
if (!opts->load_unsafe_playlists)
stream_flags |= mpctx->playing->stream_flags;
mpctx->stream = stream_create(stream_filename, stream_flags, mpctx->global);
mpctx->stream = stream_create(stream_filename, stream_flags,
mpctx->playback_abort, mpctx->global);
if (!mpctx->stream) { // error...
mp_process_input(mpctx);
goto terminate_playback;
@ -1307,6 +1310,8 @@ terminate_playback:
if (mpctx->step_frames)
opts->pause = 1;
mp_cancel_trigger(mpctx->playback_abort);
MP_INFO(mpctx, "\n");
// time to uninit all, except global stuff:

View File

@ -308,6 +308,7 @@ struct MPContext *mp_create(void)
.osd_progbar = { .type = -1 },
.playlist = talloc_struct(mpctx, struct playlist, {0}),
.dispatch = mp_dispatch_create(mpctx),
.playback_abort = mp_cancel_new(mpctx),
};
mpctx->global = talloc_zero(mpctx, struct mpv_global);
@ -341,12 +342,6 @@ struct MPContext *mp_create(void)
return mpctx;
}
static int check_stream_interrupt(void *ctx)
{
struct MPContext *mpctx = ctx;
return mp_input_check_interrupt(mpctx->input);
}
static void wakeup_playloop(void *ctx)
{
struct MPContext *mpctx = ctx;
@ -383,8 +378,7 @@ int mp_initialize(struct MPContext *mpctx)
}
mpctx->input = mp_input_init(mpctx->global);
mpctx->global->stream_interrupt_cb = check_stream_interrupt;
mpctx->global->stream_interrupt_cb_ctx = mpctx;
mp_input_set_cancel(mpctx->input, mpctx->playback_abort);
mp_dispatch_set_wakeup_fn(mpctx->dispatch, wakeup_playloop, mpctx);

View File

@ -138,7 +138,7 @@ static int64_t mp_clipi64(int64_t val, int64_t min, int64_t max)
// Returns CACHE_INTERRUPTED if the caller is supposed to abort.
static int cache_wakeup_and_wait(struct priv *s, double *retry_time)
{
if (stream_check_interrupt(s->cache))
if (mp_cancel_test(s->cache->cancel))
return CACHE_INTERRUPTED;
double start = mp_time_sec();
@ -652,7 +652,7 @@ int stream_cache_init(stream_t *cache, stream_t *stream,
if (min < 1)
return 1;
for (;;) {
if (stream_check_interrupt(cache))
if (mp_cancel_test(cache->cancel))
return -1;
int64_t fill;
int idle;

View File

@ -386,7 +386,8 @@ int RarParse(struct stream *s, int *count, rar_file_t ***file)
if (!volume_mrl)
goto done;
vol = stream_create(volume_mrl, STREAM_READ | STREAM_NO_FILTERS, s->global);
vol = stream_create(volume_mrl, STREAM_READ | STREAM_NO_FILTERS,
s->cancel, s->global);
if (!vol)
goto done;
@ -423,7 +424,7 @@ int RarSeek(rar_file_t *file, uint64_t position)
free_stream(file->s);
file->s = stream_create(file->current_chunk->mrl,
STREAM_READ | STREAM_NO_FILTERS,
file->global);
file->cancel, file->global);
}
return file->s ? stream_seek(file->s, offset) : 0;
}

View File

@ -45,6 +45,7 @@ typedef struct {
// When actually reading the data
struct mpv_global *global;
struct mp_cancel *cancel;
uint64_t i_pos;
stream_t *s;
rar_file_chunk_t *current_chunk;

View File

@ -32,6 +32,7 @@
#include <libavutil/common.h>
#include "osdep/mpbswap.h"
#include "osdep/atomics.h"
#include "talloc.h"
@ -252,8 +253,8 @@ static const char *match_proto(const char *url, const char *proto)
}
static int open_internal(const stream_info_t *sinfo, struct stream *underlying,
const char *url, int flags, struct mpv_global *global,
struct stream **ret)
const char *url, int flags, struct mp_cancel *c,
struct mpv_global *global, struct stream **ret)
{
if (sinfo->stream_filter != !!underlying)
return STREAM_NO_MATCH;
@ -281,6 +282,7 @@ static int open_internal(const stream_info_t *sinfo, struct stream *underlying,
s->log = mp_log_new(s, global->log, sinfo->name);
s->info = sinfo;
s->opts = global->opts;
s->cancel = c;
s->global = global;
s->url = talloc_strdup(s, url);
s->path = talloc_strdup(s, path);
@ -335,7 +337,8 @@ static int open_internal(const stream_info_t *sinfo, struct stream *underlying,
return STREAM_OK;
}
struct stream *stream_create(const char *url, int flags, struct mpv_global *global)
struct stream *stream_create(const char *url, int flags,
struct mp_cancel *c, struct mpv_global *global)
{
struct mp_log *log = mp_log_new(NULL, global->log, "!stream");
struct stream *s = NULL;
@ -344,7 +347,7 @@ struct stream *stream_create(const char *url, int flags, struct mpv_global *glob
// Open stream proper
bool unsafe = false;
for (int i = 0; stream_list[i]; i++) {
int r = open_internal(stream_list[i], NULL, url, flags, global, &s);
int r = open_internal(stream_list[i], NULL, url, flags, c, global, &s);
if (r == STREAM_OK)
break;
if (r == STREAM_NO_MATCH || r == STREAM_UNSUPPORTED)
@ -375,7 +378,7 @@ struct stream *stream_create(const char *url, int flags, struct mpv_global *glob
for (;;) {
struct stream *new = NULL;
for (int i = 0; stream_list[i]; i++) {
int r = open_internal(stream_list[i], s, s->url, flags, global, &new);
int r = open_internal(stream_list[i], s, s->url, flags, c, global, &new);
if (r == STREAM_OK)
break;
}
@ -391,12 +394,12 @@ done:
struct stream *stream_open(const char *filename, struct mpv_global *global)
{
return stream_create(filename, STREAM_READ, global);
return stream_create(filename, STREAM_READ, NULL, global);
}
stream_t *open_output_stream(const char *filename, struct mpv_global *global)
{
return stream_create(filename, STREAM_WRITE, global);
return stream_create(filename, STREAM_WRITE, NULL, global);
}
static int stream_reconnect(stream_t *s)
@ -407,7 +410,7 @@ static int stream_reconnect(stream_t *s)
return 0;
if (!s->seekable)
return 0;
if (stream_check_interrupt(s))
if (mp_cancel_test(s->cancel))
return 0;
int64_t pos = s->pos;
int sleep_ms = 5;
@ -419,7 +422,7 @@ static int stream_reconnect(stream_t *s)
sleep_ms = MPMIN(sleep_ms * 2, RECONNECT_SLEEP_MAX_MS);
}
if (stream_check_interrupt(s))
if (mp_cancel_test(s->cancel))
return 0;
s->eof = 1;
@ -766,13 +769,6 @@ void free_stream(stream_t *s)
talloc_free(s);
}
bool stream_check_interrupt(struct stream *s)
{
if (!s->global || !s->global->stream_interrupt_cb)
return false;
return s->global->stream_interrupt_cb(s->global->stream_interrupt_cb_ctx);
}
stream_t *open_memory_stream(void *data, int len)
{
assert(len >= 0);
@ -802,6 +798,7 @@ static stream_t *open_cache(stream_t *orig, const char *name)
cache->streaming = orig->streaming,
cache->is_network = orig->is_network;
cache->opts = orig->opts;
cache->cancel = orig->cancel;
cache->global = orig->global;
cache->log = mp_log_new(cache, cache->global->log, name);
@ -989,6 +986,36 @@ struct bstr stream_read_complete(struct stream *s, void *talloc_ctx,
return (struct bstr){buf, total_read};
}
struct mp_cancel {
atomic_bool triggered;
};
struct mp_cancel *mp_cancel_new(void *talloc_ctx)
{
struct mp_cancel *c = talloc_ptrtype(talloc_ctx, c);
*c = (struct mp_cancel){.triggered = ATOMIC_VAR_INIT(false)};
return c;
}
// Request abort.
void mp_cancel_trigger(struct mp_cancel *c)
{
c->triggered = true;
}
// Restore original state. (Allows reusing a mp_cancel.)
void mp_cancel_reset(struct mp_cancel *c)
{
c->triggered = false;
}
// Return whether the caller should abort.
// For convenience, c==NULL is allowed.
bool mp_cancel_test(struct mp_cancel *c)
{
return c ? c->triggered : false;
}
void stream_print_proto_list(struct mp_log *log)
{
int count = 0;

View File

@ -189,6 +189,8 @@ typedef struct stream {
struct MPOpts *opts;
struct mpv_global *global;
struct mp_cancel *cancel; // cancellation notification
FILE *capture_file;
char *capture_filename;
@ -248,16 +250,20 @@ struct bstr stream_read_complete(struct stream *s, void *talloc_ctx,
int max_size);
int stream_control(stream_t *s, int cmd, void *arg);
void free_stream(stream_t *s);
struct stream *stream_create(const char *url, int flags, struct mpv_global *global);
struct stream *stream_create(const char *url, int flags,
struct mp_cancel *c, struct mpv_global *global);
struct stream *stream_open(const char *filename, struct mpv_global *global);
stream_t *open_output_stream(const char *filename, struct mpv_global *global);
stream_t *open_memory_stream(void *data, int len);
bool stream_check_interrupt(struct stream *s);
void mp_url_unescape_inplace(char *buf);
char *mp_url_escape(void *talloc_ctx, const char *s, const char *ok);
struct mp_cancel *mp_cancel_new(void *talloc_ctx);
void mp_cancel_trigger(struct mp_cancel *c);
bool mp_cancel_test(struct mp_cancel *c);
void mp_cancel_reset(struct mp_cancel *c);
// stream_file.c
char *mp_file_url_to_filename(void *talloc_ctx, bstr url);

View File

@ -138,7 +138,7 @@ static int control(stream_t *s, int cmd, void *arg)
static int interrupt_cb(void *ctx)
{
struct stream *stream = ctx;
return stream_check_interrupt(stream);
return mp_cancel_test(stream->cancel);
}
static const char * const prefix[] = { "lavf://", "ffmpeg://" };

View File

@ -96,8 +96,8 @@ static int rar_entry_open(stream_t *stream)
*name++ = '\0';
mp_url_unescape_inplace(base);
struct stream *rar =
stream_create(base, STREAM_READ | STREAM_NO_FILTERS, stream->global);
struct stream *rar = stream_create(base, STREAM_READ | STREAM_NO_FILTERS,
stream->cancel, stream->global);
if (!rar)
return STREAM_ERROR;
@ -126,6 +126,7 @@ static int rar_entry_open(stream_t *stream)
};
file->current_chunk = &dummy;
file->s = rar; // transfer ownership
file->cancel = stream->cancel;
file->global = stream->global;
RarSeek(file, 0);