diff options
Diffstat (limited to 'xlators/features/qemu-block/src/coroutine-synctask.c')
-rw-r--r-- | xlators/features/qemu-block/src/coroutine-synctask.c | 215 |
1 files changed, 59 insertions, 156 deletions
diff --git a/xlators/features/qemu-block/src/coroutine-synctask.c b/xlators/features/qemu-block/src/coroutine-synctask.c index c3538f60e25..e43988a953f 100644 --- a/xlators/features/qemu-block/src/coroutine-synctask.c +++ b/xlators/features/qemu-block/src/coroutine-synctask.c @@ -22,192 +22,95 @@ #include "qemu-block-memory-types.h" #include "qemu-block.h" -#include "coroutine-synctask.h" -void -qemu_coroutine_delete (Coroutine *co_) -{ - struct synctask *synctask = NULL; - CoroutineSynctask *cs = NULL; - - cs = DO_UPCAST(CoroutineSynctask, base, co_); - synctask = cs->synctask; - - cs->die = 1; - synctask_wake (synctask); - - /* Do not free either @cs or @synctask here. - @synctask is naturally destroyed when - cs_proc() returns (after "break"ing out of - the loop because of setting cs->die=1 above. - - We free @cs too just before returning from - cs_proc() - */ - return; -} - - -CoroutineAction -qemu_coroutine_switch (Coroutine *from_, Coroutine *to_, CoroutineAction action) +/* + * This code serves as the bridge from the main glusterfs context to the qemu + * coroutine context via synctask. We create a single threaded syncenv with a + * single synctask responsible for processing a queue of coroutines. The qemu + * code invoked from within the synctask function handlers uses the ucontext + * coroutine implementation and scheduling logic internal to qemu. This + * effectively donates a thread of execution to qemu and its internal coroutine + * management. + * + * NOTE: The existence of concurrent synctasks has proven quite racy with regard + * to qemu coroutine management, particularly related to the lifecycle + * differences with top-level synctasks and internally created coroutines and + * interactions with qemu-internal queues (and locks, in turn). We explicitly + * disallow this scenario, via the queue, until it is more well supported. + */ + +static struct { + struct list_head queue; + gf_lock_t lock; + struct synctask *task; +} qb_co; + +static void +init_qbco() { - struct synctask *to = NULL; - struct synctask *from = NULL; - CoroutineSynctask *csto = NULL; - CoroutineSynctask *csfrom = NULL; - - csto = DO_UPCAST(CoroutineSynctask, base, to_); - csfrom = DO_UPCAST(CoroutineSynctask, base, from_); - to = csto->synctask; - from = csfrom->synctask; - - /* TODO: need mutex/cond guarding when making syncenv - multithreaded - */ - csfrom->run = false; - csto->run = true; - - /* the next three lines must be in this specific order only */ - csfrom->action = action; - - synctask_wake (to); - - synctask_yield (from); - - /* the yielder set @action value in @csfrom, but for the - resumer it is @csto - */ - return csto->action; + INIT_LIST_HEAD(&qb_co.queue); + LOCK_INIT(&qb_co.lock); } - -int -cs_fin (int ret, call_frame_t *frame, void *opaque) +static int +synctask_nop_cbk (int ret, call_frame_t *frame, void *opaque) { - /* nop */ return 0; } - static int -cs_proc (void *opaque) +qb_synctask_wrap (void *opaque) { - CoroutineSynctask *cs = opaque; - struct synctask *synctask = NULL; - - synctask = synctask_get (); /* == cs->synctask */ + qb_local_t *qb_local, *tmp; - for (;;) { - while (!cs->run && !cs->die) - /* entry function (i.e cs->base.entry) will - not be set just yet first time. Wait for - caller to set it and call switch() - */ - synctask_yield (synctask); + LOCK(&qb_co.lock); - if (cs->die) + while (!list_empty(&qb_co.queue)) { + list_for_each_entry_safe(qb_local, tmp, &qb_co.queue, list) { + list_del_init(&qb_local->list); break; + } - cs->base.entry (cs->base.entry_arg); - qemu_coroutine_switch (&cs->base, cs->base.caller, - COROUTINE_TERMINATE); - } - - GF_FREE (cs); - - return 0; -} - - -Coroutine * -qemu_coroutine_new() -{ - qb_conf_t *conf = NULL; - CoroutineSynctask *cs = NULL; - struct synctask *task = NULL; - - conf = THIS->private; - - cs = GF_CALLOC (1, sizeof (*cs), gf_qb_mt_coroutinesynctask_t); - if (!cs) - return NULL; - - task = synctask_get (); - /* Inherit the frame from the parent synctask, as this will - carry forward things like uid, gid, pid, lkowner etc. of the - caller properly. - */ - cs->synctask = synctask_create (conf->env, cs_proc, cs_fin, - task ? task->frame : NULL, cs); - if (!cs->synctask) - return NULL; - - return &cs->base; -} - + UNLOCK(&qb_co.lock); -Coroutine * -qemu_coroutine_self() -{ - struct synctask *synctask = NULL; - CoroutineSynctask *cs = NULL; - - synctask = synctask_get(); - - cs = synctask->opaque; - - return &cs->base; -} - - -bool -qemu_in_coroutine () -{ - Coroutine *co = NULL; + qb_local->synctask_fn(qb_local); + /* qb_local is now unwound and gone! */ - co = qemu_coroutine_self (); - - return co && co->caller; -} + LOCK(&qb_co.lock); + } + qb_co.task = NULL; -/* These are calls for the "top" xlator to invoke/submit - coroutines -*/ + UNLOCK(&qb_co.lock); -static int -synctask_nop_cbk (int ret, call_frame_t *frame, void *opaque) -{ return 0; } - -int -qb_synctask_wrap (void *opaque) -{ - struct synctask *task = NULL; - CoroutineSynctask *cs = NULL; - qb_local_t *qb_local = NULL; - - task = synctask_get (); - cs = opaque; - cs->synctask = task; - qb_local = DO_UPCAST (qb_local_t, cs, cs); - - return qb_local->synctask_fn (opaque); -} - - int qb_coroutine (call_frame_t *frame, synctask_fn_t fn) { qb_local_t *qb_local = NULL; qb_conf_t *qb_conf = NULL; + static int init = 0; qb_local = frame->local; qb_local->synctask_fn = fn; qb_conf = frame->this->private; - return synctask_new (qb_conf->env, qb_synctask_wrap, synctask_nop_cbk, - frame, &qb_local->cs); + if (!init) { + init = 1; + init_qbco(); + } + + LOCK(&qb_co.lock); + + if (!qb_co.task) + qb_co.task = synctask_create(qb_conf->env, qb_synctask_wrap, + synctask_nop_cbk, frame, NULL); + + list_add_tail(&qb_local->list, &qb_co.queue); + + UNLOCK(&qb_co.lock); + + return 0; } |