demux: add a way to destroy the demuxer asynchronously

This will enable the player core to terminate the demuxers in a "nicer"
way without having to block on network. If it just used demux_free(), it
would either have to block on network, or like currently, essentially
kill all I/O forcefully.

The API is slightly awkward, because demuxer lifetime is bound to its
allocation. On the other hand, changing that would also be awkward, and
introduce weird in-between states that would have to be handled in tons
of places.

Currently unused, to be user later.
This commit is contained in:
wm4 2018-05-19 18:19:07 +02:00
parent ee88ae15b3
commit c24520b7f3
2 changed files with 96 additions and 11 deletions

View File

@ -142,6 +142,7 @@ struct demux_internal {
bool thread_terminate;
bool threading;
bool shutdown_async;
void (*wakeup_cb)(void *ctx);
void *wakeup_cb_ctx;
@ -945,6 +946,32 @@ int demux_get_num_stream(struct demuxer *demuxer)
return r;
}
static void demux_shutdown(struct demux_internal *in)
{
struct demuxer *demuxer = in->d_user;
if (demuxer->desc->close)
demuxer->desc->close(in->d_thread);
demuxer->priv = NULL;
in->d_thread->priv = NULL;
demux_flush(demuxer);
assert(in->total_bytes == 0);
if (in->owns_stream)
free_stream(demuxer->stream);
demuxer->stream = NULL;
}
static void demux_dealloc(struct demux_internal *in)
{
for (int n = 0; n < in->num_streams; n++)
talloc_free(in->streams[n]);
pthread_mutex_destroy(&in->lock);
pthread_cond_destroy(&in->wakeup);
talloc_free(in->d_user);
}
void demux_free(struct demuxer *demuxer)
{
if (!demuxer)
@ -953,21 +980,63 @@ void demux_free(struct demuxer *demuxer)
assert(demuxer == in->d_user);
demux_stop_thread(demuxer);
demux_shutdown(in);
demux_dealloc(in);
}
if (demuxer->desc->close)
demuxer->desc->close(in->d_thread);
// Start closing the demuxer and eventually freeing the demuxer asynchronously.
// You must not access the demuxer once this has been started. Once the demuxer
// is shutdown, the wakeup callback is invoked. Then you need to call
// demux_free_async_finish() to end the operation (it must not be called from
// the wakeup callback).
// This can return NULL. Then the demuxer cannot be free'd asynchronously, and
// you need to call demux_free() instead.
struct demux_free_async_state *demux_free_async(struct demuxer *demuxer)
{
struct demux_internal *in = demuxer->in;
assert(demuxer == in->d_user);
demux_flush(demuxer);
assert(in->total_bytes == 0);
if (!in->threading)
return NULL;
if (in->owns_stream)
free_stream(demuxer->stream);
pthread_mutex_lock(&in->lock);
in->thread_terminate = true;
in->shutdown_async = true;
pthread_cond_signal(&in->wakeup);
pthread_mutex_unlock(&in->lock);
for (int n = 0; n < in->num_streams; n++)
talloc_free(in->streams[n]);
pthread_mutex_destroy(&in->lock);
pthread_cond_destroy(&in->wakeup);
talloc_free(demuxer);
return (struct demux_free_async_state *)demuxer->in; // lies
}
// As long as state is valid, you can call this to request immediate abort.
// Roughly behaves as demux_cancel_and_free(), except you still need to wait
// for the result.
void demux_free_async_force(struct demux_free_async_state *state)
{
struct demux_internal *in = (struct demux_internal *)state; // reverse lies
mp_cancel_trigger(in->d_user->cancel);
}
// Check whether the demuxer is shutdown yet. If not, return false, and you
// need to call this again in the future (preferably after you were notified by
// the wakeup callback). If yes, deallocate all state, and return true (in
// particular, the state ptr becomes invalid, and the wakeup callback will never
// be called again).
bool demux_free_async_finish(struct demux_free_async_state *state)
{
struct demux_internal *in = (struct demux_internal *)state; // reverse lies
pthread_mutex_lock(&in->lock);
bool busy = in->shutdown_async;
pthread_mutex_unlock(&in->lock);
if (busy)
return false;
demux_stop_thread(in->d_user);
demux_dealloc(in);
return true;
}
// Like demux_free(), but trigger an abort, which will force the demuxer to
@ -1688,12 +1757,23 @@ static void *demux_thread(void *pctx)
struct demux_internal *in = pctx;
mpthread_set_name("demux");
pthread_mutex_lock(&in->lock);
while (!in->thread_terminate) {
if (thread_work(in))
continue;
pthread_cond_signal(&in->wakeup);
pthread_cond_wait(&in->wakeup, &in->lock);
}
if (in->shutdown_async) {
pthread_mutex_unlock(&in->lock);
demux_shutdown(in);
pthread_mutex_lock(&in->lock);
in->shutdown_async = false;
if (in->wakeup_cb)
in->wakeup_cb(in->wakeup_cb_ctx);
}
pthread_mutex_unlock(&in->lock);
return NULL;
}

View File

@ -253,6 +253,11 @@ typedef struct {
void demux_free(struct demuxer *demuxer);
void demux_cancel_and_free(struct demuxer *demuxer);
struct demux_free_async_state;
struct demux_free_async_state *demux_free_async(struct demuxer *demuxer);
void demux_free_async_force(struct demux_free_async_state *state);
bool demux_free_async_finish(struct demux_free_async_state *state);
void demux_add_packet(struct sh_stream *stream, demux_packet_t *dp);
void demuxer_feed_caption(struct sh_stream *stream, demux_packet_t *dp);