summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPavan Sondur <pavan@gluster.com>2010-07-25 18:13:08 +0000
committerAnand V. Avati <avati@dev.gluster.com>2010-07-28 23:58:13 -0700
commitfac3ff8bfb3958a3bdc34dc9bff7cb281597e40f (patch)
treeed16cfdbdb386ef81f957533f9d563076da953c7
parentcadd1256355f4e1a5bd43c3a71dbd6cb97dff66d (diff)
syncop: initial implementation
Resending Avati's syncop patch with a few bug fixes. (please do not skip the IMPORTANT NOTES section) * Framework for SYNChronous OPerations -------------------------------------- This patch provides a framework for performing synchronous operations over the underlying actual asynchronous GlusterFS FOPS. * Use cases ----------- 1. Convenient implementation of crawler thread in replicate/pump 2. Convenient implementation of high level control flow in DVM * Background ------------ All (almost) threads in GlusterFS are hosts for executing aysnchronous file operations using the STACK_WIND and STACK_UNWIND primitives - as calls and callbacks. While the STACK_WIND and STACK_UNWIND macros provide high control for efficiently implementing file operations in a clustered/parallel environment, there are tasks where the nature of the task itself is sequential and the execution performance of the task is not critical. In these cases the complexity to implement the task with STACK_WIND/STACK_UNWIND based operations as calls and callbacks is an overkill. * Introduction --------------- syncop: are wrappers around the STACK_WIND/STACK_UNWIND based asynchronous fops. synctask: a sequential task (a C function) which uses syncops. syncenv: an environement to schedule and execute synctasks. The synchronicity is implemented via ucontext.h based continuations. Execution of synchronous tasks is possible only in a synchronous environment. Therefore, the first step is to create such an environment - struct syncenv *env = syncenv_new (0); This creates a synchronous environment, with a thread (scheduler) to host the synchronous tasks. Creation of this environment is generally to be done at the time of process initialization. Next is to spawn a synchronous task in this environment - int slow_self_heal (void *data); int completion_func (int ret, void *data); ret = synctask_new (env, slow_self_heal, completion_func, data); Here slow_self_heal is a task which is implemented using synchronous operations. When slow_self_heal() completes, completion_func() is called with the first parameter as the return value of slow_self_heal(). Both these functions get the @data argument as the same value passed to synctask_new(). int slow_self_heal (void *data) { xlator_t *child = FIRST_CHILD (THIS); fd_t *dir = NULL; ... dir = syncop_opendir (child, loc); entry = syncop_readdir (dir); ... return ret; } * IMPORTANT NOTES ----------------- - calling syncops in code executing outside the synchronous environment will very likely cause and undesired blocking of the executing thread leading to deadlocks!! The synchronous environment is a special thread where such sleeps are safe, and these sleeps result in the scheduler to 'swap in' other synctasks. - syncops can put the task to sleep. DO NOT issue syncops while holding mutexes. This is very similar to the blunder of holding a mutex and doing STACK_WIND. - It works best when synctasks use only syncops. If a call_frame is created and STACK_WIND'ed, the callback would very likely happen in a thread outside the synchronous enviroment, at an undefined time - as expected. So note that the synchronous environment does not tame the notorious behaviour of STACK_WIND. Signed-off-by: Anand V. Avati <avati@blackhole.gluster.com> Signed-off-by: Anand V. Avati <avati@dev.gluster.com> BUG: 971 (dynamic volume management) URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=971
-rw-r--r--libglusterfs/src/Makefile.am4
-rw-r--r--libglusterfs/src/globals.c43
-rw-r--r--libglusterfs/src/globals.h5
-rw-r--r--libglusterfs/src/syncop.c350
-rw-r--r--libglusterfs/src/syncop.h164
5 files changed, 564 insertions, 2 deletions
diff --git a/libglusterfs/src/Makefile.am b/libglusterfs/src/Makefile.am
index 7a13d6955dd..f6c2cf8555b 100644
--- a/libglusterfs/src/Makefile.am
+++ b/libglusterfs/src/Makefile.am
@@ -6,9 +6,9 @@ libglusterfs_la_LIBADD = @LEXLIB@
lib_LTLIBRARIES = libglusterfs.la
-libglusterfs_la_SOURCES = dict.c graph.lex.c y.tab.c xlator.c logging.c hashfn.c defaults.c common-utils.c timer.c inode.c call-stub.c compat.c fd.c compat-errno.c event.c mem-pool.c gf-dirent.c syscall.c iobuf.c globals.c statedump.c stack.c checksum.c $(CONTRIBDIR)/md5/md5.c $(CONTRIBDIR)/rbtree/rb.c rbthash.c latency.c graph.c $(CONTRIBDIR)/uuid/clear.c $(CONTRIBDIR)/uuid/copy.c $(CONTRIBDIR)/uuid/gen_uuid.c $(CONTRIBDIR)/uuid/pack.c $(CONTRIBDIR)/uuid/tst_uuid.c $(CONTRIBDIR)/uuid/parse.c $(CONTRIBDIR)/uuid/unparse.c $(CONTRIBDIR)/uuid/uuid_time.c $(CONTRIBDIR)/uuid/compare.c $(CONTRIBDIR)/uuid/isnull.c $(CONTRIBDIR)/uuid/unpack.c
+libglusterfs_la_SOURCES = dict.c graph.lex.c y.tab.c xlator.c logging.c hashfn.c defaults.c common-utils.c timer.c inode.c call-stub.c compat.c fd.c compat-errno.c event.c mem-pool.c gf-dirent.c syscall.c iobuf.c globals.c statedump.c stack.c checksum.c $(CONTRIBDIR)/md5/md5.c $(CONTRIBDIR)/rbtree/rb.c rbthash.c latency.c graph.c $(CONTRIBDIR)/uuid/clear.c $(CONTRIBDIR)/uuid/copy.c $(CONTRIBDIR)/uuid/gen_uuid.c $(CONTRIBDIR)/uuid/pack.c $(CONTRIBDIR)/uuid/tst_uuid.c $(CONTRIBDIR)/uuid/parse.c $(CONTRIBDIR)/uuid/unparse.c $(CONTRIBDIR)/uuid/uuid_time.c $(CONTRIBDIR)/uuid/compare.c $(CONTRIBDIR)/uuid/isnull.c $(CONTRIBDIR)/uuid/unpack.c syncop.c
-noinst_HEADERS = common-utils.h defaults.h dict.h glusterfs.h hashfn.h logging.h xlator.h stack.h timer.h list.h inode.h call-stub.h compat.h fd.h revision.h compat-errno.h event.h mem-pool.h byte-order.h gf-dirent.h locking.h syscall.h iobuf.h globals.h statedump.h checksum.h $(CONTRIBDIR)/md5/md5.h $(CONTRIBDIR)/rbtree/rb.h rbthash.h iatt.h latency.h mem-types.h $(CONTRIBDIR)/uuid/uuidd.h $(CONTRIBDIR)/uuid/uuid.h $(CONTRIBDIR)/uuid/uuidP.h $(CONTRIBDIR)/uuid/uuid_types.h
+noinst_HEADERS = common-utils.h defaults.h dict.h glusterfs.h hashfn.h logging.h xlator.h stack.h timer.h list.h inode.h call-stub.h compat.h fd.h revision.h compat-errno.h event.h mem-pool.h byte-order.h gf-dirent.h locking.h syscall.h iobuf.h globals.h statedump.h checksum.h $(CONTRIBDIR)/md5/md5.h $(CONTRIBDIR)/rbtree/rb.h rbthash.h iatt.h latency.h mem-types.h $(CONTRIBDIR)/uuid/uuidd.h $(CONTRIBDIR)/uuid/uuid.h $(CONTRIBDIR)/uuid/uuidP.h $(CONTRIBDIR)/uuid/uuid_types.h syncop.h
EXTRA_DIST = graph.l graph.y
diff --git a/libglusterfs/src/globals.c b/libglusterfs/src/globals.c
index c0040db1448..e845b3dcb66 100644
--- a/libglusterfs/src/globals.c
+++ b/libglusterfs/src/globals.c
@@ -267,6 +267,45 @@ glusterfs_central_log_flag_unset ()
}
+
+/* SYNCTASK */
+
+static pthread_key_t synctask_key;
+
+
+int
+synctask_init ()
+{
+ int ret = 0;
+
+ ret = pthread_key_create (&synctask_key, NULL);
+
+ return ret;
+}
+
+
+void *
+synctask_get ()
+{
+ void *synctask = NULL;
+
+ synctask = pthread_getspecific (synctask_key);
+
+ return synctask;
+}
+
+
+int
+synctask_set (void *synctask)
+{
+ int ret = 0;
+
+ pthread_setspecific (synctask_key, synctask);
+
+ return ret;
+}
+
+
int
glusterfs_globals_init ()
{
@@ -288,6 +327,10 @@ glusterfs_globals_init ()
gf_mem_acct_enable_set ();
+ ret = synctask_init ();
+ if (ret)
+ goto out;
+
out:
return ret;
}
diff --git a/libglusterfs/src/globals.h b/libglusterfs/src/globals.h
index e09c695113c..bdd9e891046 100644
--- a/libglusterfs/src/globals.h
+++ b/libglusterfs/src/globals.h
@@ -42,10 +42,15 @@ xlator_t **__glusterfs_this_location ();
xlator_t *glusterfs_this_get ();
int glusterfs_this_set (xlator_t *);
+/* central log */
+
void glusterfs_central_log_flag_set ();
long glusterfs_central_log_flag_get ();
void glusterfs_central_log_flag_unset ();
+/* task */
+void *synctask_get ();
+int synctask_set (void *);
/* init */
int glusterfs_globals_init (void);
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c
new file mode 100644
index 00000000000..beb5d9db4a1
--- /dev/null
+++ b/libglusterfs/src/syncop.c
@@ -0,0 +1,350 @@
+/*
+ Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "syncop.h"
+
+
+void
+synctask_yield (struct synctask *task)
+{
+ struct syncenv *env = NULL;
+
+ env = task->env;
+
+ if (swapcontext (&task->ctx, &env->sched) < 0) {
+ gf_log ("syncop", GF_LOG_ERROR,
+ "swapcontext failed (%s)", strerror (errno));
+ }
+}
+
+
+void
+synctask_yawn (struct synctask *task)
+{
+ struct syncenv *env = NULL;
+
+ env = task->env;
+
+ pthread_mutex_lock (&env->mutex);
+ {
+ list_del_init (&task->all_tasks);
+ list_add (&task->all_tasks, &env->waitq);
+ }
+ pthread_mutex_unlock (&env->mutex);
+}
+
+
+void
+synctask_zzzz (struct synctask *task)
+{
+ synctask_yawn (task);
+
+ synctask_yield (task);
+}
+
+
+void
+synctask_wake (struct synctask *task)
+{
+ struct syncenv *env = NULL;
+
+ env = task->env;
+
+ pthread_mutex_lock (&env->mutex);
+ {
+ list_del_init (&task->all_tasks);
+ list_add_tail (&task->all_tasks, &env->runq);
+ }
+ pthread_mutex_unlock (&env->mutex);
+
+ pthread_cond_broadcast (&env->cond);
+}
+
+
+void
+synctask_wrap (struct synctask *task)
+{
+ int ret;
+
+ ret = task->syncfn (task->opaque);
+ task->synccbk (ret, task->opaque);
+
+ /* cannot destroy @task right here as we are
+ in the execution stack of @task itself
+ */
+ task->complete = 1;
+ synctask_wake (task);
+}
+
+
+void
+synctask_destroy (struct synctask *task)
+{
+ if (!task)
+ return;
+
+ if (task->stack)
+ FREE (task);
+ FREE (task);
+}
+
+
+int
+synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
+ void *opaque)
+{
+ struct synctask *newtask = NULL;
+
+ newtask = CALLOC (1, sizeof (*newtask));
+ if (!newtask)
+ return -ENOMEM;
+
+ newtask->env = env;
+ newtask->xl = THIS;
+ newtask->syncfn = fn;
+ newtask->synccbk = cbk;
+ newtask->opaque = opaque;
+
+ INIT_LIST_HEAD (&newtask->all_tasks);
+
+ if (getcontext (&newtask->ctx) < 0) {
+ gf_log ("syncop", GF_LOG_ERROR,
+ "getcontext failed (%s)",
+ strerror (errno));
+ goto err;
+ }
+
+ newtask->stack = CALLOC (1, env->stacksize);
+ if (!newtask->stack) {
+ gf_log ("syncop", GF_LOG_ERROR,
+ "out of memory for stack");
+ goto err;
+ }
+
+ newtask->ctx.uc_stack.ss_sp = newtask->stack;
+ newtask->ctx.uc_stack.ss_size = env->stacksize;
+
+ makecontext (&newtask->ctx, (void *) synctask_wrap, 2, newtask);
+
+ synctask_wake (newtask);
+
+ return 0;
+err:
+ if (newtask) {
+ if (newtask->stack)
+ FREE (newtask->stack);
+ FREE (newtask);
+ }
+ return -1;
+}
+
+
+struct synctask *
+syncenv_task (struct syncenv *env)
+{
+ struct synctask *task = NULL;
+
+ pthread_mutex_lock (&env->mutex);
+ {
+ while (list_empty (&env->runq))
+ pthread_cond_wait (&env->cond, &env->mutex);
+
+ task = list_entry (env->runq.next, struct synctask, all_tasks);
+
+ list_del_init (&task->all_tasks);
+ }
+ pthread_mutex_unlock (&env->mutex);
+
+ return task;
+}
+
+
+void
+synctask_switchto (struct synctask *task)
+{
+ struct syncenv *env = NULL;
+
+ env = task->env;
+
+ synctask_set (task);
+ THIS = task->xl;
+
+ if (swapcontext (&env->sched, &task->ctx) < 0) {
+ gf_log ("syncop", GF_LOG_ERROR,
+ "swapcontext failed (%s)", strerror (errno));
+ }
+}
+
+
+void *
+syncenv_processor (void *thdata)
+{
+ struct syncenv *env = NULL;
+ struct synctask *task = NULL;
+
+ env = thdata;
+
+ for (;;) {
+ task = syncenv_task (env);
+
+ if (task->complete) {
+ synctask_destroy (task);
+ continue;
+ }
+
+ synctask_switchto (task);
+ }
+
+ return NULL;
+}
+
+
+void
+syncenv_destroy (struct syncenv *env)
+{
+
+}
+
+
+struct syncenv *
+syncenv_new (size_t stacksize)
+{
+ struct syncenv *newenv = NULL;
+ int ret = 0;
+
+ newenv = CALLOC (1, sizeof (*newenv));
+
+ if (!newenv)
+ return NULL;
+
+ pthread_mutex_init (&newenv->mutex, NULL);
+ pthread_cond_init (&newenv->cond, NULL);
+
+ INIT_LIST_HEAD (&newenv->runq);
+ INIT_LIST_HEAD (&newenv->waitq);
+
+ newenv->stacksize = SYNCENV_DEFAULT_STACKSIZE;
+ if (stacksize)
+ newenv->stacksize = stacksize;
+
+ ret = pthread_create (&newenv->processor, NULL,
+ syncenv_processor, newenv);
+
+ if (ret != 0)
+ syncenv_destroy (newenv);
+
+ return newenv;
+}
+
+
+/* FOPS */
+
+
+int
+syncop_lookup_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno, inode_t *inode,
+ struct iatt *iatt, dict_t *xattr, struct iatt *parent)
+{
+ struct syncargs *args = NULL;
+
+ args = cookie;
+
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+
+ if (op_ret == 0) {
+ args->iatt1 = *iatt;
+ args->xattr = xattr;
+ args->iatt2 = *parent;
+ }
+
+ __wake (args);
+
+ return 0;
+}
+
+
+int
+syncop_lookup (xlator_t *subvol, loc_t *loc, dict_t *xattr_req,
+ struct iatt *iatt, dict_t **xattr_rsp, struct iatt *parent)
+{
+ struct syncargs args = {0, };
+
+ SYNCOP (subvol, (&args), syncop_lookup_cbk, subvol->fops->lookup,
+ loc, xattr_req);
+
+ if (iatt)
+ *iatt = args.iatt1;
+ if (xattr_rsp)
+ *xattr_rsp = args.xattr;
+ if (parent)
+ *parent = args.iatt2;
+
+ errno = args.op_errno;
+ return args.op_ret;
+}
+
+
+
+int
+syncop_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno,
+ struct iatt *preop, struct iatt *postop)
+{
+ struct syncargs *args = NULL;
+
+ args = cookie;
+
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+
+ if (op_ret == 0) {
+ args->iatt1 = *preop;
+ args->iatt2 = *postop;
+ }
+
+ __wake (args);
+
+ return 0;
+}
+
+
+int
+syncop_setattr (xlator_t *subvol, loc_t *loc, struct iatt *iatt, int valid,
+ struct iatt *preop, struct iatt *postop)
+{
+ struct syncargs args = {0, };
+
+ SYNCOP (subvol, (&args), syncop_setattr_cbk, subvol->fops->setattr,
+ loc, iatt, valid);
+
+ if (preop)
+ *preop = args.iatt1;
+ if (postop)
+ *postop = args.iatt2;
+
+ errno = args.op_errno;
+ return args.op_ret;
+}
+
diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h
new file mode 100644
index 00000000000..ce364b07301
--- /dev/null
+++ b/libglusterfs/src/syncop.h
@@ -0,0 +1,164 @@
+/*
+ Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com>
+ This file is part of GlusterFS.
+
+ GlusterFS is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3 of the License,
+ or (at your option) any later version.
+
+ GlusterFS is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see
+ <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef _SYNCOP_H
+#define _SYNCOP_H
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "xlator.h"
+#include <sys/time.h>
+#include <pthread.h>
+#include <ucontext.h>
+
+
+struct synctask;
+struct syncenv;
+
+
+typedef int (*synctask_cbk_t) (int ret, void *opaque);
+
+typedef int (*synctask_fn_t) (void *opaque);
+
+
+/* for one sequential execution of @syncfn */
+struct synctask {
+ struct list_head all_tasks;
+ struct syncenv *env;
+ xlator_t *xl;
+ synctask_cbk_t synccbk;
+ synctask_fn_t syncfn;
+ void *opaque;
+ void *stack;
+ int complete;
+
+ ucontext_t ctx;
+};
+
+/* hosts the scheduler thread and framework for executing synctasks */
+struct syncenv {
+ pthread_t processor;
+ struct synctask *current;
+
+ struct list_head runq;
+ struct list_head waitq;
+
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+
+ ucontext_t sched;
+ size_t stacksize;
+};
+
+
+struct syncargs {
+ int op_ret;
+ int op_errno;
+ struct iatt iatt1;
+ struct iatt iatt2;
+ dict_t *xattr;
+
+ /* do not touch */
+ pthread_mutex_t mutex;
+ char complete;
+ pthread_cond_t cond;
+ struct synctask *task;
+};
+
+
+#define __yawn(args) do { \
+ struct synctask *task = NULL; \
+ \
+ task = synctask_get (); \
+ if (task) { \
+ args->task = task; \
+ synctask_yawn (task); \
+ } else { \
+ pthread_mutex_init (&args->mutex, NULL); \
+ pthread_cond_init (&args->cond, NULL); \
+ } \
+} while (0)
+
+
+#define __yield(args) do { \
+ if (args->task) { \
+ synctask_yield (args->task); \
+ } else { \
+ pthread_mutex_lock (&args->mutex); \
+ { \
+ while (!args->complete) \
+ pthread_cond_wait (&args->cond, \
+ &args->mutex); \
+ } \
+ pthread_mutex_unlock (&args->mutex); \
+ \
+ pthread_mutex_destroy (&args->mutex); \
+ pthread_cond_destroy (&args->cond); \
+ } \
+} while (0)
+
+
+#define __wake(args) do { \
+ if (args->task) { \
+ synctask_wake (args->task); \
+ } else { \
+ pthread_mutex_lock (&args->mutex); \
+ { \
+ args->complete = 1; \
+ pthread_cond_broadcast (&args->cond); \
+ } \
+ pthread_mutex_unlock (&args->mutex); \
+ } \
+} while (0)
+
+
+#define SYNCOP(subvol, stb, cbk, op, params ...) do { \
+ call_frame_t *frame = NULL; \
+ \
+ frame = create_frame (THIS, THIS->ctx->pool); \
+ \
+ __yawn (stb); \
+ STACK_WIND_COOKIE (frame, (void *)stb, cbk, subvol, op, params);\
+ __yield (stb); \
+} while (0)
+
+
+#define SYNCENV_DEFAULT_STACKSIZE (16 * 1024)
+
+struct syncenv * syncenv_new ();
+void syncenv_destroy (struct syncenv *);
+
+int synctask_new (struct syncenv *, synctask_fn_t, synctask_cbk_t, void *);
+void synctask_zzzz (struct synctask *task);
+void synctask_yawn (struct synctask *task);
+void synctask_wake (struct synctask *task);
+void synctask_yield (struct synctask *task);
+
+int syncop_lookup (xlator_t *subvol, loc_t *loc, dict_t *xattr_req,
+ /* out */
+ struct iatt *iatt, dict_t **xattr_rsp, struct iatt *parent);
+
+int syncop_setattr (xlator_t *subvol, loc_t *loc, struct iatt *iatt, int valid,
+ /* out */
+ struct iatt *preop, struct iatt *postop);
+
+#endif /* _SYNCOP_H */