diff --git a/misc/thread_tools.c b/misc/thread_tools.c index b11ffd07e4..ecf6bc2381 100644 --- a/misc/thread_tools.c +++ b/misc/thread_tools.c @@ -26,8 +26,10 @@ #endif #include "common/common.h" +#include "misc/linked_list.h" #include "osdep/atomic.h" #include "osdep/io.h" +#include "osdep/timer.h" #include "thread_tools.h" @@ -74,45 +76,118 @@ bool mp_waiter_poll(struct mp_waiter *waiter) return r; } -#ifndef __MINGW32__ struct mp_cancel { + pthread_mutex_t lock; + pthread_cond_t wakeup; + + // Semaphore state and "mirrors". atomic_bool triggered; + void (*cb)(void *ctx); + void *cb_ctx; int wakeup_pipe[2]; + void *win32_event; // actually HANDLE + + // Slave list. These are automatically notified as well. + struct { + struct mp_cancel *head, *tail; + } slaves; + + // For slaves. Synchronization is managed by parent.lock! + struct mp_cancel *parent; + struct { + struct mp_cancel *next, *prev; + } siblings; }; static void cancel_destroy(void *p) { struct mp_cancel *c = p; + + assert(!c->slaves.head); // API user error + + // We can access c->parent without synchronization, because: + // - since c is being destroyed, nobody can explicitly remove it as slave + // at the same time + // - c->parent needs to stay valid as long as the slave exists + if (c->parent) + mp_cancel_remove_slave(c->parent, c); + if (c->wakeup_pipe[0] >= 0) { close(c->wakeup_pipe[0]); close(c->wakeup_pipe[1]); } + +#ifdef __MINGW32__ + if (c->win32_event) + CloseHandle(c->win32_event); +#endif + + pthread_mutex_destroy(&c->lock); + pthread_cond_destroy(&c->wakeup); } struct mp_cancel *mp_cancel_new(void *talloc_ctx) { struct mp_cancel *c = talloc_ptrtype(talloc_ctx, c); talloc_set_destructor(c, cancel_destroy); - *c = (struct mp_cancel){.triggered = ATOMIC_VAR_INIT(false)}; - mp_make_wakeup_pipe(c->wakeup_pipe); + *c = (struct mp_cancel){ + .triggered = ATOMIC_VAR_INIT(false), + .wakeup_pipe = {-1, -1}, + }; + pthread_mutex_init(&c->lock, NULL); + pthread_cond_init(&c->wakeup, NULL); return c; } +static void trigger_locked(struct mp_cancel *c) +{ + atomic_store(&c->triggered, true); + + pthread_cond_broadcast(&c->wakeup); // condition bound to c->triggered + + if (c->cb) + c->cb(c->cb_ctx); + + for (struct mp_cancel *sub = c->slaves.head; sub; sub = sub->siblings.next) + mp_cancel_trigger(sub); + + if (c->wakeup_pipe[1] >= 0) + (void)write(c->wakeup_pipe[1], &(char){0}, 1); + +#ifdef __MINGW32__ + if (c->win32_event) + SetEvent(c->win32_event); +#endif +} + void mp_cancel_trigger(struct mp_cancel *c) { - atomic_store(&c->triggered, true); - (void)write(c->wakeup_pipe[1], &(char){0}, 1); + pthread_mutex_lock(&c->lock); + trigger_locked(c); + pthread_mutex_unlock(&c->lock); } void mp_cancel_reset(struct mp_cancel *c) { + pthread_mutex_lock(&c->lock); + atomic_store(&c->triggered, false); - // Flush it fully. - while (1) { - int r = read(c->wakeup_pipe[0], &(char[256]){0}, 256); - if (r <= 0 && !(r < 0 && errno == EINTR)) - break; + + if (c->wakeup_pipe[0] >= 0) { + // Flush it fully. + while (1) { + int r = read(c->wakeup_pipe[0], &(char[256]){0}, 256); + if (r <= 0 && !(r < 0 && errno == EINTR)) + break; + } } + +#ifdef __MINGW32__ + if (c->win32_event) + ResetEvent(c->win32_event); +#endif + + pthread_mutex_unlock(&c->lock); } bool mp_cancel_test(struct mp_cancel *c) @@ -122,69 +197,78 @@ bool mp_cancel_test(struct mp_cancel *c) bool mp_cancel_wait(struct mp_cancel *c, double timeout) { - struct pollfd fd = { .fd = c->wakeup_pipe[0], .events = POLLIN }; - poll(&fd, 1, timeout * 1000); - return fd.revents & POLLIN; + struct timespec ts = mp_rel_time_to_timespec(timeout); + pthread_mutex_lock(&c->lock); + while (!mp_cancel_test(c)) { + if (pthread_cond_timedwait(&c->wakeup, &c->lock, &ts)) + break; + } + pthread_mutex_unlock(&c->lock); + + return mp_cancel_test(c); +} + +// If a new notification mechanism was added, and the mp_cancel state was +// already triggered, make sure the newly added mechanism is also triggered. +static void retrigger_locked(struct mp_cancel *c) +{ + if (mp_cancel_test(c)) + trigger_locked(c); +} + +void mp_cancel_set_cb(struct mp_cancel *c, void (*cb)(void *ctx), void *ctx) +{ + pthread_mutex_lock(&c->lock); + c->cb = cb; + c->cb_ctx = ctx; + retrigger_locked(c); + pthread_mutex_unlock(&c->lock); +} + +void mp_cancel_add_slave(struct mp_cancel *c, struct mp_cancel *slave) +{ + pthread_mutex_lock(&c->lock); + assert(!slave->parent); + slave->parent = c; + LL_APPEND(siblings, &c->slaves, slave); + retrigger_locked(c); + pthread_mutex_unlock(&c->lock); +} + +void mp_cancel_remove_slave(struct mp_cancel *c, struct mp_cancel *slave) +{ + pthread_mutex_lock(&c->lock); + if (slave->parent) { + assert(slave->parent == c); + slave->parent = NULL; + LL_REMOVE(siblings, &c->slaves, slave); + } + pthread_mutex_unlock(&c->lock); } int mp_cancel_get_fd(struct mp_cancel *c) { + pthread_mutex_lock(&c->lock); + if (c->wakeup_pipe[0] < 0) { + mp_make_wakeup_pipe(c->wakeup_pipe); + retrigger_locked(c); + } + pthread_mutex_unlock(&c->lock); + + return c->wakeup_pipe[0]; } -#else - -struct mp_cancel { - atomic_bool triggered; - HANDLE event; -}; - -static void cancel_destroy(void *p) -{ - struct mp_cancel *c = p; - CloseHandle(c->event); -} - -struct mp_cancel *mp_cancel_new(void *talloc_ctx) -{ - struct mp_cancel *c = talloc_ptrtype(talloc_ctx, c); - talloc_set_destructor(c, cancel_destroy); - *c = (struct mp_cancel){.triggered = ATOMIC_VAR_INIT(false)}; - c->event = CreateEventW(NULL, TRUE, FALSE, NULL); - return c; -} - -void mp_cancel_trigger(struct mp_cancel *c) -{ - atomic_store(&c->triggered, true); - SetEvent(c->event); -} - -void mp_cancel_reset(struct mp_cancel *c) -{ - atomic_store(&c->triggered, false); - ResetEvent(c->event); -} - -bool mp_cancel_test(struct mp_cancel *c) -{ - return c ? atomic_load_explicit(&c->triggered, memory_order_relaxed) : false; -} - -bool mp_cancel_wait(struct mp_cancel *c, double timeout) -{ - return WaitForSingleObject(c->event, timeout < 0 ? INFINITE : timeout * 1000) - == WAIT_OBJECT_0; -} - +#ifdef __MINGW32__ void *mp_cancel_get_event(struct mp_cancel *c) { - return c->event; -} + pthread_mutex_lock(&c->lock); + if (!c->win32_event) { + c->win32_event = CreateEventW(NULL, TRUE, FALSE, NULL); + retrigger_locked(c); + } + pthread_mutex_unlock(&c->lock); -int mp_cancel_get_fd(struct mp_cancel *c) -{ - return -1; + return c->win32_event; } - #endif diff --git a/misc/thread_tools.h b/misc/thread_tools.h index a734ac85b0..2198181e6c 100644 --- a/misc/thread_tools.h +++ b/misc/thread_tools.h @@ -59,6 +59,23 @@ bool mp_cancel_wait(struct mp_cancel *c, double timeout); // Restore original state. (Allows reusing a mp_cancel.) void mp_cancel_reset(struct mp_cancel *c); +// Add a callback to invoke when mp_cancel gets triggered. If it's already +// triggered, call it from mp_cancel_add_cb() directly. May be called multiple +// times even if the trigger state changes; not called if it resets. In all +// cases, this may be called with internal locks held (either in mp_cancel, or +// other locks held by whoever calls mp_cancel_trigger()). +// There is only one callback. Create a slave mp_cancel to get a private one. +void mp_cancel_set_cb(struct mp_cancel *c, void (*cb)(void *ctx), void *ctx); + +// If c gets triggered, automatically trigger slave. Trying to add a slave more +// than once or to multiple parents is undefined behavior. +// The parent mp_cancel must remain valid until the slave is manually removed +// or destroyed. Destroying a mp_cancel that still has slaves is an error. +void mp_cancel_add_slave(struct mp_cancel *c, struct mp_cancel *slave); + +// Undo mp_cancel_add_slave(). Ignores never added slaves for easier cleanup. +void mp_cancel_remove_slave(struct mp_cancel *c, struct mp_cancel *slave); + // win32 "Event" HANDLE that indicates the current mp_cancel state. void *mp_cancel_get_event(struct mp_cancel *c);