summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKrishnan Parthasarathi <kparthas@redhat.com>2013-03-06 11:45:48 +0530
committerAnand Avati <avati@redhat.com>2013-03-06 21:32:34 -0800
commit6f2dc529faba92f10a5fee618bed05ebf752ef9e (patch)
treebabf8cd8f1cb2234b84b4eae2379af6eb080a1e2
parente52dc374bce30ed8a223e89628324ae18f433b96 (diff)
synctask: support for (assymetric) counted barriers
[Backport of Avati's patch on master - http://review.gluster.org/4558] This patch introduces a new set of primitives: - synctask_barrier_init (stub) - synctask_barrier_waitfor (stub, count) - synctask_barrier_wake (stub) Unlike pthread_barrier_t, this barrier has an explicit notion of "waiter" and "waker". The "waiter" waits for @count number of "wakers" to call synctask_barrier_wake() before returning. The wait performed by the waiter via synctask_barrier_waitfor() is co-operative in nature and yields the thread for scheduling other synctasks in the mean time. Intended use case: Eliminate excessive serialization in glusterd and allow for concurrent RPC transactions. Code which are currently in this format: ---old--- list_for_each_entry (peerinfo, peers, op_peers_list) { ... GD_SYNCOP (peerinfo->rpc, stub, rpc_cbk, ...); } ... int rpc_cbk (rpc, stub, ...) { ... __wake (stub); } ---old--- Can be restructred into the format: ---new--- synctask_barrier_init (stub); { list_for_each_entry (peerinfo, peers, op_peers_list) { ... rpc_submit (peerinfo->rpc, stub, rpc_cbk, ...); count++; } } synctask_barrier_wait (stub, count); ... int rpc_cbk (rpc, stub, ...) { ... synctask_barrier_wake (stub); } ---new--- In the above structure, from the synctask's point of view, the region between synctask_barrier_init() and synctask_barrier_wait() are spawning off asynchronous "threads" (or RPC) and keep count of how many such threads have been spawned. Each of those threads are expected to make one call to synctask_barrier_wake(). The call to synctask_barrier_wait() makes the synctask thread co-operatively wait/sleep till @count such threads call their wake function. This way, the synctask thread retains the "synchronous" flow in the code, yet at the same time allows for asynchronous "threads" to acheive parallelism over RPC. Change-Id: Ie037f99b2d306b71e63e3a56353daec06fb0bf41 BUG: 913662 Signed-off-by: Anand Avati <avati@redhat.com> Signed-off-by: Krishnan Parthasarathi <kparthas@redhat.com> Reviewed-on: http://review.gluster.org/4636 Reviewed-by: Jeff Darcy <jdarcy@redhat.com> Tested-by: Gluster Build System <jenkins@build.gluster.com>
-rw-r--r--libglusterfs/src/syncop.c43
-rw-r--r--libglusterfs/src/syncop.h155
2 files changed, 122 insertions, 76 deletions
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c
index b15ee31ba..a8faa65e0 100644
--- a/libglusterfs/src/syncop.c
+++ b/libglusterfs/src/syncop.c
@@ -80,15 +80,24 @@ __wait (struct synctask *task)
void
-synctask_yield (struct synctask *task)
+synctask_waitfor (struct synctask *task, int waitfor)
{
- xlator_t *oldTHIS = THIS;
+ struct syncenv *env = NULL;
+ xlator_t *oldTHIS = THIS;
+
+ env = task->env;
#if defined(__NetBSD__) && defined(_UC_TLSBASE)
/* Preserve pthread private pointer through swapcontex() */
task->proc->sched.uc_flags &= ~_UC_TLSBASE;
#endif
+ pthread_mutex_lock (&env->mutex);
+ {
+ task->waitfor = waitfor;
+ }
+ pthread_mutex_unlock (&env->mutex);
+
if (swapcontext (&task->ctx, &task->proc->sched) < 0) {
gf_log ("syncop", GF_LOG_ERROR,
"swapcontext failed (%s)", strerror (errno));
@@ -99,6 +108,29 @@ synctask_yield (struct synctask *task)
void
+synctask_yield (struct synctask *task)
+{
+ synctask_waitfor (task, 1);
+}
+
+
+void
+synctask_yawn (struct synctask *task)
+{
+ struct syncenv *env = NULL;
+
+ env = task->env;
+
+ pthread_mutex_lock (&env->mutex);
+ {
+ task->woken = 0;
+ task->waitfor = 0;
+ }
+ pthread_mutex_unlock (&env->mutex);
+}
+
+
+void
synctask_wake (struct synctask *task)
{
struct syncenv *env = NULL;
@@ -107,9 +139,9 @@ synctask_wake (struct synctask *task)
pthread_mutex_lock (&env->mutex);
{
- task->woken = 1;
+ task->woken++;
- if (task->slept)
+ if (task->slept && task->woken >= task->waitfor)
__run (task);
}
pthread_mutex_unlock (&env->mutex);
@@ -338,6 +370,7 @@ synctask_switchto (struct synctask *task)
task->woken = 0;
task->slept = 0;
+ task->waitfor = 0;
#if defined(__NetBSD__) && defined(_UC_TLSBASE)
/* Preserve pthread private pointer through swapcontex() */
@@ -356,7 +389,7 @@ synctask_switchto (struct synctask *task)
pthread_mutex_lock (&env->mutex);
{
- if (task->woken) {
+ if (task->woken >= task->waitfor) {
__run (task);
} else {
task->slept = 1;
diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h
index 37e2b0e28..d4086291a 100644
--- a/libglusterfs/src/syncop.h
+++ b/libglusterfs/src/syncop.h
@@ -36,11 +36,11 @@ typedef int (*synctask_fn_t) (void *opaque);
typedef enum {
- SYNCTASK_INIT = 0,
- SYNCTASK_RUN,
+ SYNCTASK_INIT = 0,
+ SYNCTASK_RUN,
SYNCTASK_SUSPEND,
- SYNCTASK_WAIT,
- SYNCTASK_DONE,
+ SYNCTASK_WAIT,
+ SYNCTASK_DONE,
} synctask_state_t;
/* for one sequential execution of @syncfn */
@@ -52,22 +52,23 @@ struct synctask {
call_frame_t *opframe;
synctask_cbk_t synccbk;
synctask_fn_t syncfn;
- synctask_state_t state;
+ synctask_state_t state;
void *opaque;
void *stack;
int woken;
int slept;
- int ret;
+ int waitfor;
+ int ret;
- uid_t uid;
- gid_t gid;
+ uid_t uid;
+ gid_t gid;
ucontext_t ctx;
- struct syncproc *proc;
+ struct syncproc *proc;
- pthread_mutex_t mutex; /* for synchronous spawning of synctask */
- pthread_cond_t cond;
- int done;
+ pthread_mutex_t mutex; /* for synchronous spawning of synctask */
+ pthread_cond_t cond;
+ int done;
};
@@ -116,79 +117,85 @@ struct syncargs {
/* do not touch */
struct synctask *task;
- pthread_mutex_t mutex;
- pthread_cond_t cond;
- int done;
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ int wakecnt;
};
-#define __yawn(args) do { \
- if (!args->task) { \
- pthread_mutex_init (&args->mutex, NULL); \
- pthread_cond_init (&args->cond, NULL); \
- args->done = 0; \
- } \
- } while (0)
-
-
-#define __wake(args) do { \
- if (args->task) { \
- synctask_wake (args->task); \
- } else { \
- pthread_mutex_lock (&args->mutex); \
- { \
- args->done = 1; \
- pthread_cond_signal (&args->cond); \
- } \
- pthread_mutex_unlock (&args->mutex); \
- } \
- } while (0)
-
-
-#define __yield(args) do { \
- if (args->task) { \
- synctask_yield (args->task); \
- } else { \
- pthread_mutex_lock (&args->mutex); \
- { \
- while (!args->done) \
- pthread_cond_wait (&args->cond, \
- &args->mutex); \
- } \
- pthread_mutex_unlock (&args->mutex); \
- pthread_mutex_destroy (&args->mutex); \
- pthread_cond_destroy (&args->cond); \
- } \
- } while (0)
+#define __yawn(args) do { \
+ args->task = synctask_get (); \
+ if (args->task) { \
+ synctask_yawn (args->task); \
+ } else { \
+ pthread_mutex_init (&args->mutex, NULL); \
+ pthread_cond_init (&args->cond, NULL); \
+ args->wakecnt = 0; \
+ } \
+ } while (0)
+
+
+#define __wake(args) do { \
+ if (args->task) { \
+ synctask_wake (args->task); \
+ } else { \
+ pthread_mutex_lock (&args->mutex); \
+ { \
+ args->wakecnt++; \
+ pthread_cond_signal (&args->cond); \
+ } \
+ pthread_mutex_unlock (&args->mutex); \
+ } \
+ } while (0)
+
+
+#define __waitfor(args, cnt) do { \
+ if (args->task) { \
+ synctask_waitfor (args->task, cnt); \
+ } else { \
+ pthread_mutex_lock (&args->mutex); \
+ { \
+ while (args->wakecnt < cnt) \
+ pthread_cond_wait (&args->cond, \
+ &args->mutex); \
+ } \
+ pthread_mutex_unlock (&args->mutex); \
+ pthread_mutex_destroy (&args->mutex); \
+ pthread_cond_destroy (&args->cond); \
+ } \
+ } while (0)
+
+
+#define __yield(args) __waitfor(args, 1)
#define SYNCOP(subvol, stb, cbk, op, params ...) do { \
struct synctask *task = NULL; \
- call_frame_t *frame = NULL; \
+ call_frame_t *frame = NULL; \
\
task = synctask_get (); \
stb->task = task; \
- if (task) \
- frame = task->opframe; \
- else \
- frame = create_frame (THIS, THIS->ctx->pool); \
+ if (task) \
+ frame = task->opframe; \
+ else \
+ frame = create_frame (THIS, THIS->ctx->pool); \
if (task) { \
frame->root->uid = task->uid; \
frame->root->gid = task->gid; \
} \
- \
- __yawn (stb); \
\
- STACK_WIND_COOKIE (frame, cbk, (void *)stb, subvol, \
- op, params); \
- if (task) \
- task->state = SYNCTASK_SUSPEND; \
- \
- __yield (stb); \
- if (task) \
- STACK_RESET (frame->root); \
- else \
- STACK_DESTROY (frame->root); \
+ __yawn (stb); \
+ \
+ STACK_WIND_COOKIE (frame, cbk, (void *)stb, subvol, \
+ op, params); \
+ if (task) \
+ task->state = SYNCTASK_SUSPEND; \
+ \
+ __yield (stb); \
+ if (task) \
+ STACK_RESET (frame->root); \
+ else \
+ STACK_DESTROY (frame->root); \
} while (0)
@@ -201,6 +208,12 @@ void syncenv_scale (struct syncenv *env);
int synctask_new (struct syncenv *, synctask_fn_t, synctask_cbk_t, call_frame_t* frame, void *);
void synctask_wake (struct synctask *task);
void synctask_yield (struct synctask *task);
+void synctask_yawn (struct synctask *task);
+void synctask_waitfor (struct synctask *task, int count);
+
+#define synctask_barrier_init(args) __yawn (args)
+#define synctask_barrier_wait(args, n) __waitfor (args, n)
+#define synctask_barrier_wake(args) __wake (args)
int synctask_setid (struct synctask *task, uid_t uid, gid_t gid);
#define SYNCTASK_SETID(uid, gid) synctask_setid (synctask_get(), uid, gid);
@@ -264,7 +277,7 @@ int syncop_fstat (xlator_t *subvol, fd_t *fd, struct iatt *stbuf);
int syncop_stat (xlator_t *subvol, loc_t *loc, struct iatt *stbuf);
int syncop_symlink (xlator_t *subvol, loc_t *loc, const char *newpath,
- dict_t *dict);
+ dict_t *dict);
int syncop_readlink (xlator_t *subvol, loc_t *loc, char **buffer, size_t size);
int syncop_mknod (xlator_t *subvol, loc_t *loc, mode_t mode, dev_t rdev,
dict_t *dict);