thread_tools: unify mp_cancel POSIX/win32 paths, add features

The OS specifics are merged because the resulting ifdeffery is not much
worse than the old ifdeffery, but the logic that is now shared is
becoming more complex.

Create all objects lazily. The intention is to make mp_cancel instances
cheaper. POSIX pipes and win32 Events are pretty heavy weight, and are
only needed in special situations.

Add a mechanism to "chain" mp_cancel instances. Needed by the later
commits for whatever reasons.

Untested on win32.
This commit is contained in:
wm4 2018-05-17 21:20:26 +02:00
parent 782e428284
commit a253c72dbb
2 changed files with 165 additions and 64 deletions

View File

@ -26,8 +26,10 @@
#endif
#include "common/common.h"
#include "misc/linked_list.h"
#include "osdep/atomic.h"
#include "osdep/io.h"
#include "osdep/timer.h"
#include "thread_tools.h"
@ -74,45 +76,118 @@ bool mp_waiter_poll(struct mp_waiter *waiter)
return r;
}
#ifndef __MINGW32__
struct mp_cancel {
pthread_mutex_t lock;
pthread_cond_t wakeup;
// Semaphore state and "mirrors".
atomic_bool triggered;
void (*cb)(void *ctx);
void *cb_ctx;
int wakeup_pipe[2];
void *win32_event; // actually HANDLE
// Slave list. These are automatically notified as well.
struct {
struct mp_cancel *head, *tail;
} slaves;
// For slaves. Synchronization is managed by parent.lock!
struct mp_cancel *parent;
struct {
struct mp_cancel *next, *prev;
} siblings;
};
static void cancel_destroy(void *p)
{
struct mp_cancel *c = p;
assert(!c->slaves.head); // API user error
// We can access c->parent without synchronization, because:
// - since c is being destroyed, nobody can explicitly remove it as slave
// at the same time
// - c->parent needs to stay valid as long as the slave exists
if (c->parent)
mp_cancel_remove_slave(c->parent, c);
if (c->wakeup_pipe[0] >= 0) {
close(c->wakeup_pipe[0]);
close(c->wakeup_pipe[1]);
}
#ifdef __MINGW32__
if (c->win32_event)
CloseHandle(c->win32_event);
#endif
pthread_mutex_destroy(&c->lock);
pthread_cond_destroy(&c->wakeup);
}
struct mp_cancel *mp_cancel_new(void *talloc_ctx)
{
struct mp_cancel *c = talloc_ptrtype(talloc_ctx, c);
talloc_set_destructor(c, cancel_destroy);
*c = (struct mp_cancel){.triggered = ATOMIC_VAR_INIT(false)};
mp_make_wakeup_pipe(c->wakeup_pipe);
*c = (struct mp_cancel){
.triggered = ATOMIC_VAR_INIT(false),
.wakeup_pipe = {-1, -1},
};
pthread_mutex_init(&c->lock, NULL);
pthread_cond_init(&c->wakeup, NULL);
return c;
}
static void trigger_locked(struct mp_cancel *c)
{
atomic_store(&c->triggered, true);
pthread_cond_broadcast(&c->wakeup); // condition bound to c->triggered
if (c->cb)
c->cb(c->cb_ctx);
for (struct mp_cancel *sub = c->slaves.head; sub; sub = sub->siblings.next)
mp_cancel_trigger(sub);
if (c->wakeup_pipe[1] >= 0)
(void)write(c->wakeup_pipe[1], &(char){0}, 1);
#ifdef __MINGW32__
if (c->win32_event)
SetEvent(c->win32_event);
#endif
}
void mp_cancel_trigger(struct mp_cancel *c)
{
atomic_store(&c->triggered, true);
(void)write(c->wakeup_pipe[1], &(char){0}, 1);
pthread_mutex_lock(&c->lock);
trigger_locked(c);
pthread_mutex_unlock(&c->lock);
}
void mp_cancel_reset(struct mp_cancel *c)
{
pthread_mutex_lock(&c->lock);
atomic_store(&c->triggered, false);
// Flush it fully.
while (1) {
int r = read(c->wakeup_pipe[0], &(char[256]){0}, 256);
if (r <= 0 && !(r < 0 && errno == EINTR))
break;
if (c->wakeup_pipe[0] >= 0) {
// Flush it fully.
while (1) {
int r = read(c->wakeup_pipe[0], &(char[256]){0}, 256);
if (r <= 0 && !(r < 0 && errno == EINTR))
break;
}
}
#ifdef __MINGW32__
if (c->win32_event)
ResetEvent(c->win32_event);
#endif
pthread_mutex_unlock(&c->lock);
}
bool mp_cancel_test(struct mp_cancel *c)
@ -122,69 +197,78 @@ bool mp_cancel_test(struct mp_cancel *c)
bool mp_cancel_wait(struct mp_cancel *c, double timeout)
{
struct pollfd fd = { .fd = c->wakeup_pipe[0], .events = POLLIN };
poll(&fd, 1, timeout * 1000);
return fd.revents & POLLIN;
struct timespec ts = mp_rel_time_to_timespec(timeout);
pthread_mutex_lock(&c->lock);
while (!mp_cancel_test(c)) {
if (pthread_cond_timedwait(&c->wakeup, &c->lock, &ts))
break;
}
pthread_mutex_unlock(&c->lock);
return mp_cancel_test(c);
}
// If a new notification mechanism was added, and the mp_cancel state was
// already triggered, make sure the newly added mechanism is also triggered.
static void retrigger_locked(struct mp_cancel *c)
{
if (mp_cancel_test(c))
trigger_locked(c);
}
void mp_cancel_set_cb(struct mp_cancel *c, void (*cb)(void *ctx), void *ctx)
{
pthread_mutex_lock(&c->lock);
c->cb = cb;
c->cb_ctx = ctx;
retrigger_locked(c);
pthread_mutex_unlock(&c->lock);
}
void mp_cancel_add_slave(struct mp_cancel *c, struct mp_cancel *slave)
{
pthread_mutex_lock(&c->lock);
assert(!slave->parent);
slave->parent = c;
LL_APPEND(siblings, &c->slaves, slave);
retrigger_locked(c);
pthread_mutex_unlock(&c->lock);
}
void mp_cancel_remove_slave(struct mp_cancel *c, struct mp_cancel *slave)
{
pthread_mutex_lock(&c->lock);
if (slave->parent) {
assert(slave->parent == c);
slave->parent = NULL;
LL_REMOVE(siblings, &c->slaves, slave);
}
pthread_mutex_unlock(&c->lock);
}
int mp_cancel_get_fd(struct mp_cancel *c)
{
pthread_mutex_lock(&c->lock);
if (c->wakeup_pipe[0] < 0) {
mp_make_wakeup_pipe(c->wakeup_pipe);
retrigger_locked(c);
}
pthread_mutex_unlock(&c->lock);
return c->wakeup_pipe[0];
}
#else
struct mp_cancel {
atomic_bool triggered;
HANDLE event;
};
static void cancel_destroy(void *p)
{
struct mp_cancel *c = p;
CloseHandle(c->event);
}
struct mp_cancel *mp_cancel_new(void *talloc_ctx)
{
struct mp_cancel *c = talloc_ptrtype(talloc_ctx, c);
talloc_set_destructor(c, cancel_destroy);
*c = (struct mp_cancel){.triggered = ATOMIC_VAR_INIT(false)};
c->event = CreateEventW(NULL, TRUE, FALSE, NULL);
return c;
}
void mp_cancel_trigger(struct mp_cancel *c)
{
atomic_store(&c->triggered, true);
SetEvent(c->event);
}
void mp_cancel_reset(struct mp_cancel *c)
{
atomic_store(&c->triggered, false);
ResetEvent(c->event);
}
bool mp_cancel_test(struct mp_cancel *c)
{
return c ? atomic_load_explicit(&c->triggered, memory_order_relaxed) : false;
}
bool mp_cancel_wait(struct mp_cancel *c, double timeout)
{
return WaitForSingleObject(c->event, timeout < 0 ? INFINITE : timeout * 1000)
== WAIT_OBJECT_0;
}
#ifdef __MINGW32__
void *mp_cancel_get_event(struct mp_cancel *c)
{
return c->event;
}
pthread_mutex_lock(&c->lock);
if (!c->win32_event) {
c->win32_event = CreateEventW(NULL, TRUE, FALSE, NULL);
retrigger_locked(c);
}
pthread_mutex_unlock(&c->lock);
int mp_cancel_get_fd(struct mp_cancel *c)
{
return -1;
return c->win32_event;
}
#endif

View File

@ -59,6 +59,23 @@ bool mp_cancel_wait(struct mp_cancel *c, double timeout);
// Restore original state. (Allows reusing a mp_cancel.)
void mp_cancel_reset(struct mp_cancel *c);
// Add a callback to invoke when mp_cancel gets triggered. If it's already
// triggered, call it from mp_cancel_add_cb() directly. May be called multiple
// times even if the trigger state changes; not called if it resets. In all
// cases, this may be called with internal locks held (either in mp_cancel, or
// other locks held by whoever calls mp_cancel_trigger()).
// There is only one callback. Create a slave mp_cancel to get a private one.
void mp_cancel_set_cb(struct mp_cancel *c, void (*cb)(void *ctx), void *ctx);
// If c gets triggered, automatically trigger slave. Trying to add a slave more
// than once or to multiple parents is undefined behavior.
// The parent mp_cancel must remain valid until the slave is manually removed
// or destroyed. Destroying a mp_cancel that still has slaves is an error.
void mp_cancel_add_slave(struct mp_cancel *c, struct mp_cancel *slave);
// Undo mp_cancel_add_slave(). Ignores never added slaves for easier cleanup.
void mp_cancel_remove_slave(struct mp_cancel *c, struct mp_cancel *slave);
// win32 "Event" HANDLE that indicates the current mp_cancel state.
void *mp_cancel_get_event(struct mp_cancel *c);