summaryrefslogtreecommitdiffstats
path: root/xlators/features/qemu-block/src/coroutine-synctask.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/qemu-block/src/coroutine-synctask.c')
-rw-r--r--xlators/features/qemu-block/src/coroutine-synctask.c215
1 files changed, 59 insertions, 156 deletions
diff --git a/xlators/features/qemu-block/src/coroutine-synctask.c b/xlators/features/qemu-block/src/coroutine-synctask.c
index c3538f60e..e43988a95 100644
--- a/xlators/features/qemu-block/src/coroutine-synctask.c
+++ b/xlators/features/qemu-block/src/coroutine-synctask.c
@@ -22,192 +22,95 @@
#include "qemu-block-memory-types.h"
#include "qemu-block.h"
-#include "coroutine-synctask.h"
-void
-qemu_coroutine_delete (Coroutine *co_)
-{
- struct synctask *synctask = NULL;
- CoroutineSynctask *cs = NULL;
-
- cs = DO_UPCAST(CoroutineSynctask, base, co_);
- synctask = cs->synctask;
-
- cs->die = 1;
- synctask_wake (synctask);
-
- /* Do not free either @cs or @synctask here.
- @synctask is naturally destroyed when
- cs_proc() returns (after "break"ing out of
- the loop because of setting cs->die=1 above.
-
- We free @cs too just before returning from
- cs_proc()
- */
- return;
-}
-
-
-CoroutineAction
-qemu_coroutine_switch (Coroutine *from_, Coroutine *to_, CoroutineAction action)
+/*
+ * This code serves as the bridge from the main glusterfs context to the qemu
+ * coroutine context via synctask. We create a single threaded syncenv with a
+ * single synctask responsible for processing a queue of coroutines. The qemu
+ * code invoked from within the synctask function handlers uses the ucontext
+ * coroutine implementation and scheduling logic internal to qemu. This
+ * effectively donates a thread of execution to qemu and its internal coroutine
+ * management.
+ *
+ * NOTE: The existence of concurrent synctasks has proven quite racy with regard
+ * to qemu coroutine management, particularly related to the lifecycle
+ * differences with top-level synctasks and internally created coroutines and
+ * interactions with qemu-internal queues (and locks, in turn). We explicitly
+ * disallow this scenario, via the queue, until it is more well supported.
+ */
+
+static struct {
+ struct list_head queue;
+ gf_lock_t lock;
+ struct synctask *task;
+} qb_co;
+
+static void
+init_qbco()
{
- struct synctask *to = NULL;
- struct synctask *from = NULL;
- CoroutineSynctask *csto = NULL;
- CoroutineSynctask *csfrom = NULL;
-
- csto = DO_UPCAST(CoroutineSynctask, base, to_);
- csfrom = DO_UPCAST(CoroutineSynctask, base, from_);
- to = csto->synctask;
- from = csfrom->synctask;
-
- /* TODO: need mutex/cond guarding when making syncenv
- multithreaded
- */
- csfrom->run = false;
- csto->run = true;
-
- /* the next three lines must be in this specific order only */
- csfrom->action = action;
-
- synctask_wake (to);
-
- synctask_yield (from);
-
- /* the yielder set @action value in @csfrom, but for the
- resumer it is @csto
- */
- return csto->action;
+ INIT_LIST_HEAD(&qb_co.queue);
+ LOCK_INIT(&qb_co.lock);
}
-
-int
-cs_fin (int ret, call_frame_t *frame, void *opaque)
+static int
+synctask_nop_cbk (int ret, call_frame_t *frame, void *opaque)
{
- /* nop */
return 0;
}
-
static int
-cs_proc (void *opaque)
+qb_synctask_wrap (void *opaque)
{
- CoroutineSynctask *cs = opaque;
- struct synctask *synctask = NULL;
-
- synctask = synctask_get (); /* == cs->synctask */
+ qb_local_t *qb_local, *tmp;
- for (;;) {
- while (!cs->run && !cs->die)
- /* entry function (i.e cs->base.entry) will
- not be set just yet first time. Wait for
- caller to set it and call switch()
- */
- synctask_yield (synctask);
+ LOCK(&qb_co.lock);
- if (cs->die)
+ while (!list_empty(&qb_co.queue)) {
+ list_for_each_entry_safe(qb_local, tmp, &qb_co.queue, list) {
+ list_del_init(&qb_local->list);
break;
+ }
- cs->base.entry (cs->base.entry_arg);
- qemu_coroutine_switch (&cs->base, cs->base.caller,
- COROUTINE_TERMINATE);
- }
-
- GF_FREE (cs);
-
- return 0;
-}
-
-
-Coroutine *
-qemu_coroutine_new()
-{
- qb_conf_t *conf = NULL;
- CoroutineSynctask *cs = NULL;
- struct synctask *task = NULL;
-
- conf = THIS->private;
-
- cs = GF_CALLOC (1, sizeof (*cs), gf_qb_mt_coroutinesynctask_t);
- if (!cs)
- return NULL;
-
- task = synctask_get ();
- /* Inherit the frame from the parent synctask, as this will
- carry forward things like uid, gid, pid, lkowner etc. of the
- caller properly.
- */
- cs->synctask = synctask_create (conf->env, cs_proc, cs_fin,
- task ? task->frame : NULL, cs);
- if (!cs->synctask)
- return NULL;
-
- return &cs->base;
-}
-
+ UNLOCK(&qb_co.lock);
-Coroutine *
-qemu_coroutine_self()
-{
- struct synctask *synctask = NULL;
- CoroutineSynctask *cs = NULL;
-
- synctask = synctask_get();
-
- cs = synctask->opaque;
-
- return &cs->base;
-}
-
-
-bool
-qemu_in_coroutine ()
-{
- Coroutine *co = NULL;
+ qb_local->synctask_fn(qb_local);
+ /* qb_local is now unwound and gone! */
- co = qemu_coroutine_self ();
-
- return co && co->caller;
-}
+ LOCK(&qb_co.lock);
+ }
+ qb_co.task = NULL;
-/* These are calls for the "top" xlator to invoke/submit
- coroutines
-*/
+ UNLOCK(&qb_co.lock);
-static int
-synctask_nop_cbk (int ret, call_frame_t *frame, void *opaque)
-{
return 0;
}
-
-int
-qb_synctask_wrap (void *opaque)
-{
- struct synctask *task = NULL;
- CoroutineSynctask *cs = NULL;
- qb_local_t *qb_local = NULL;
-
- task = synctask_get ();
- cs = opaque;
- cs->synctask = task;
- qb_local = DO_UPCAST (qb_local_t, cs, cs);
-
- return qb_local->synctask_fn (opaque);
-}
-
-
int
qb_coroutine (call_frame_t *frame, synctask_fn_t fn)
{
qb_local_t *qb_local = NULL;
qb_conf_t *qb_conf = NULL;
+ static int init = 0;
qb_local = frame->local;
qb_local->synctask_fn = fn;
qb_conf = frame->this->private;
- return synctask_new (qb_conf->env, qb_synctask_wrap, synctask_nop_cbk,
- frame, &qb_local->cs);
+ if (!init) {
+ init = 1;
+ init_qbco();
+ }
+
+ LOCK(&qb_co.lock);
+
+ if (!qb_co.task)
+ qb_co.task = synctask_create(qb_conf->env, qb_synctask_wrap,
+ synctask_nop_cbk, frame, NULL);
+
+ list_add_tail(&qb_local->list, &qb_co.queue);
+
+ UNLOCK(&qb_co.lock);
+
+ return 0;
}