executor: introduce new executor API

Introduce a new API to execute "runnables" from background threads.

The final design is a result of discussions on the mailing-list:
 - <https://mailman.videolan.org/pipermail/vlc-devel/2020-August/136696.html>
 - <https://mailman.videolan.org/pipermail/vlc-devel/2020-September/136944.html>

Signed-off-by: Alexandre Janniaux <ajanni@videolabs.io>
This commit is contained in:
Romain Vimont 2020-09-01 18:09:09 +02:00 committed by Alexandre Janniaux
parent c2ca19baf8
commit fb48df6fdf
4 changed files with 478 additions and 0 deletions

183
include/vlc_executor.h Normal file
View File

@ -0,0 +1,183 @@
/*****************************************************************************
* vlc_executor.h
*****************************************************************************
* Copyright (C) 2020 Videolabs, VLC authors and VideoLAN
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 2.1 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
*****************************************************************************/
#ifndef VLC_EXECUTOR_H
#define VLC_EXECUTOR_H
#include <vlc_common.h>
#include <vlc_list.h>
# ifdef __cplusplus
extern "C" {
# endif
/** Executor type (opaque) */
typedef struct vlc_executor vlc_executor_t;
/**
* A Runnable encapsulates a task to be run from an executor thread.
*/
struct vlc_runnable {
/**
* This function is to be executed by a vlc_executor_t.
*
* It must implement the actions (arbitrarily long) to execute from an
* executor thread, synchronously. As soon as run() returns, the execution
* of this runnable is complete.
*
* After the runnable is submitted to an executor via
* vlc_executor_Submit(), the run() function is executed at most once (zero
* if the execution is canceled before it was started).
*
* It must not be NULL.
*
* \param userdata the userdata provided to vlc_executor_Submit()
*/
void (*run)(void *userdata);
/**
* Userdata passed back to run().
*/
void *userdata;
/* Private data used by the vlc_executor_t (do not touch) */
struct vlc_list node;
};
/**
* Create a new executor.
*
* \param max_threads the maximum number of threads used to execute runnables
* \return a pointer to a new executor, or NULL if an error occurred
*/
VLC_API vlc_executor_t *
vlc_executor_New(unsigned max_threads);
/**
* Delete an executor.
*
* Wait for all the threads to complete, and delete the executor instance.
*
* All submitted tasks must be either started or explicitly canceled. To wait
* for all tasks to complete, use vlc_executor_WaitIdle().
*
* It is an error to submit a new runnable after vlc_executor_Delete() is
* called. In particular, a running task must not submit a new runnable once
* deletion has been requested.
*
* \param executor the executor
*/
VLC_API void
vlc_executor_Delete(vlc_executor_t *executor);
/**
* Submit a runnable for execution.
*
* The struct vlc_runnable is not copied, it must exist until the end of the
* execution (the user is expected to embed it in its own task structure).
*
* Here is a simple example:
*
* \code{c}
* struct my_task {
* char *str;
* struct vlc_runnable runnable;
* };
*
* static void Run(void *userdata)
* {
* struct my_task *task = userdata;
*
* printf("start of %s\n", task->str);
* vlc_tick_sleep(VLC_TICK_FROM_SEC(3)); // long action
* printf("end of %s\n", task->str);
*
* free(task->str);
* free(task);
* }
*
* void foo(vlc_executor_t *executor, const char *str)
* {
* // no error handling for brevity
* struct my_task *task = malloc(sizeof(*task));
* task->str = strdup(str);
* task->runnable.run = Run;
* task->runnable.userdata = task;
* vlc_executor_Submit(executor, &task->runnable);
* }
* \endcode
*
* A runnable instance is intended to be submitted at most once. The caller is
* expected to allocate a new task structure (embedding the runnable) for every
* submission.
*
* More precisely, it is incorrect to submit a runnable already submitted that
* is still in the pending queue (i.e. not canceled or started). This is due to
* the intrusive linked list of runnables.
*
* It is strongly discouraged to submit a runnable that is currently running on
* the executor (unless you are prepared for the run() callback to be run
* several times in parallel).
*
* For simplicity, it is discouraged to submit a runnable previously submitted.
*
* \param executor the executor
* \param runnable the task to run
*/
VLC_API void
vlc_executor_Submit(vlc_executor_t *executor, struct vlc_runnable *runnable);
/**
* Cancel a runnable previously submitted.
*
* If this runnable is still queued (i.e. it has not be run yet), then dequeue
* it so that it will never be run, and return true.
*
* Otherwise, this runnable has already been taken by an executor thread (it is
* still running or is complete). In that case, do nothing, and return false.
*
* This is an error to pass a runnable not submitted to this executor (the
* result is undefined in that case).
*
* Note that the runnable instance is owned by the caller, so the executor will
* never attempt to free it.
*
* \param executor the executor
* \param runnable the task to cancel
* \retval true if the runnable has been canceled before execution
* \retval false if the runnable has not been canceled
*/
VLC_API bool
vlc_executor_Cancel(vlc_executor_t *executor, struct vlc_runnable *runnable);
/**
* Wait until all submitted tasks are completed or canceled.
*
* \param executor the executor
*/
VLC_API void
vlc_executor_WaitIdle(vlc_executor_t *executor);
# ifdef __cplusplus
}
# endif
#endif

View File

@ -49,6 +49,7 @@ pluginsinclude_HEADERS = \
../include/vlc_es.h \
../include/vlc_es_out.h \
../include/vlc_events.h \
../include/vlc_executor.h \
../include/vlc_filter.h \
../include/vlc_fingerprinter.h \
../include/vlc_fourcc.h \
@ -353,6 +354,7 @@ libvlccore_la_SOURCES = \
misc/actions.c \
misc/background_worker.c \
misc/background_worker.h \
misc/executor.c \
misc/md5.c \
misc/probe.c \
misc/rand.c \

View File

@ -976,3 +976,8 @@ vlc_video_context_GetType
vlc_video_context_GetPrivate
vlc_video_context_Hold
vlc_video_context_HoldDevice
vlc_executor_New
vlc_executor_Delete
vlc_executor_Submit
vlc_executor_Cancel
vlc_executor_WaitIdle

288
src/misc/executor.c Normal file
View File

@ -0,0 +1,288 @@
/*****************************************************************************
* misc/executor.c
*****************************************************************************
* Copyright (C) 2020 Videolabs, VLC authors and VideoLAN
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 2.1 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
*****************************************************************************/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <vlc_executor.h>
#include <vlc_atomic.h>
#include <vlc_list.h>
#include <vlc_threads.h>
#include "libvlc.h"
/**
* An executor can spawn several threads.
*
* This structure contains the data specific to one thread.
*/
struct vlc_executor_thread {
/** Node of vlc_executor.threads list */
struct vlc_list node;
/** The executor owning the thread */
vlc_executor_t *owner;
/** The system thread */
vlc_thread_t thread;
/** The current task executed by the thread, NULL if none */
struct vlc_runnable *current_task;
};
/**
* The executor (also vlc_executor_t, exposed as opaque type in the public
* header).
*/
struct vlc_executor {
vlc_mutex_t lock;
/** Maximum number of threads to run the tasks */
unsigned max_threads;
/** List of active vlc_executor_thread */
struct vlc_list threads;
/** Thread count (in a separate field to quickly compare to max_threads) */
unsigned nthreads;
/* Number of tasks requested but not finished. */
unsigned unfinished;
/** Wait for the executor to be idle (i.e. unfinished == 0) */
vlc_cond_t idle_wait;
/** Queue of vlc_runnable */
struct vlc_list queue;
/** Wait for the queue to be non-empty */
vlc_cond_t queue_wait;
/** True if executor deletion is requested */
bool closing;
};
static void
QueuePush(vlc_executor_t *executor, struct vlc_runnable *runnable)
{
vlc_mutex_assert(&executor->lock);
vlc_list_append(&runnable->node, &executor->queue);
vlc_cond_signal(&executor->queue_wait);
}
static struct vlc_runnable *
QueueTake(vlc_executor_t *executor)
{
vlc_mutex_assert(&executor->lock);
while (!executor->closing && vlc_list_is_empty(&executor->queue))
vlc_cond_wait(&executor->queue_wait, &executor->lock);
if (executor->closing)
return NULL;
struct vlc_runnable *runnable =
vlc_list_first_entry_or_null(&executor->queue, struct vlc_runnable,
node);
assert(runnable);
vlc_list_remove(&runnable->node);
/* Set links to NULL to know that it has been taken by a thread in
* vlc_executor_Cancel() */
runnable->node.prev = runnable->node.next = NULL;
return runnable;
}
static void *
ThreadRun(void *userdata)
{
struct vlc_executor_thread *thread = userdata;
vlc_executor_t *executor = thread->owner;
vlc_mutex_lock(&executor->lock);
struct vlc_runnable *runnable;
/* When the executor is closing, QueueTake() returns NULL */
while ((runnable = QueueTake(executor)))
{
thread->current_task = runnable;
vlc_mutex_unlock(&executor->lock);
/* Execute the user-provided runnable, without the executor lock */
runnable->run(runnable->userdata);
vlc_mutex_lock(&executor->lock);
thread->current_task = NULL;
assert(executor->unfinished > 0);
--executor->unfinished;
if (!executor->unfinished)
vlc_cond_signal(&executor->idle_wait);
}
vlc_mutex_unlock(&executor->lock);
return NULL;
}
static int
SpawnThread(vlc_executor_t *executor)
{
assert(executor->nthreads < executor->max_threads);
struct vlc_executor_thread *thread = malloc(sizeof(*thread));
if (!thread)
return VLC_ENOMEM;
thread->owner = executor;
thread->current_task = NULL;
if (vlc_clone(&thread->thread, ThreadRun, thread, VLC_THREAD_PRIORITY_LOW))
{
free(thread);
return VLC_EGENERIC;
}
executor->nthreads++;
vlc_list_append(&thread->node, &executor->threads);
return VLC_SUCCESS;
}
vlc_executor_t *
vlc_executor_New(unsigned max_threads)
{
assert(max_threads);
vlc_executor_t *executor = malloc(sizeof(*executor));
if (!executor)
return NULL;
vlc_mutex_init(&executor->lock);
executor->max_threads = max_threads;
executor->nthreads = 0;
executor->unfinished = 0;
vlc_list_init(&executor->threads);
vlc_list_init(&executor->queue);
vlc_cond_init(&executor->idle_wait);
vlc_cond_init(&executor->queue_wait);
executor->closing = false;
/* Create one thread on init so that vlc_executor_Submit() may never fail */
int ret = SpawnThread(executor);
if (ret != VLC_SUCCESS)
{
free(executor);
return NULL;
}
return executor;
}
void
vlc_executor_Submit(vlc_executor_t *executor, struct vlc_runnable *runnable)
{
vlc_mutex_lock(&executor->lock);
assert(!executor->closing);
QueuePush(executor, runnable);
if (++executor->unfinished > executor->nthreads
&& executor->nthreads < executor->max_threads)
/* If it fails, this is not an error, there is at least one thread */
SpawnThread(executor);
vlc_mutex_unlock(&executor->lock);
}
bool
vlc_executor_Cancel(vlc_executor_t *executor, struct vlc_runnable *runnable)
{
vlc_mutex_lock(&executor->lock);
/* Either both prev and next are set, either both are NULL */
assert(!runnable->node.prev == !runnable->node.next);
bool in_queue = runnable->node.prev;
if (in_queue)
{
vlc_list_remove(&runnable->node);
assert(executor->unfinished > 0);
--executor->unfinished;
if (!executor->unfinished)
vlc_cond_signal(&executor->idle_wait);
}
vlc_mutex_unlock(&executor->lock);
return in_queue;
}
void
vlc_executor_WaitIdle(vlc_executor_t *executor)
{
vlc_mutex_lock(&executor->lock);
while (executor->unfinished)
vlc_cond_wait(&executor->idle_wait, &executor->lock);
vlc_mutex_unlock(&executor->lock);
}
void
vlc_executor_Delete(vlc_executor_t *executor)
{
vlc_mutex_lock(&executor->lock);
executor->closing = true;
/* All the tasks must be canceled on delete */
assert(vlc_list_is_empty(&executor->queue));
vlc_mutex_unlock(&executor->lock);
/* "closing" is now true, this will wake up threads */
vlc_cond_broadcast(&executor->queue_wait);
/* The threads list may not be written at this point, so it is safe to read
* it without mutex locked (the mutex must be released to join the
* threads). */
struct vlc_executor_thread *thread;
vlc_list_foreach(thread, &executor->threads, node)
{
vlc_join(thread->thread, NULL);
free(thread);
}
/* The queue must still be empty (no runnable submitted a new runnable) */
assert(vlc_list_is_empty(&executor->queue));
/* There are no tasks anymore */
assert(!executor->unfinished);
free(executor);
}