summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAnand Avati <avati@redhat.com>2013-02-21 00:17:26 -0800
committerAnand Avati <avati@redhat.com>2013-02-21 22:57:04 -0800
commit72ad9a3a8b684595dc394252c88c76c859919a45 (patch)
treeef6eb57e7d817b9bf3f8f942ea9ba137cd61d16f
parent1dbe9a05feac5032990457058f7cef686a293973 (diff)
synctask: support for (assymetric) counted barriers
This patch introduces a new set of primitives: - synctask_barrier_init (stub) - synctask_barrier_waitfor (stub, count) - synctask_barrier_wake (stub) Unlike pthread_barrier_t, this barrier has an explicit notion of "waiter" and "waker". The "waiter" waits for @count number of "wakers" to call synctask_barrier_wake() before returning. The wait performed by the waiter via synctask_barrier_waitfor() is co-operative in nature and yields the thread for scheduling other synctasks in the mean time. Intended use case: Eliminate excessive serialization in glusterd and allow for concurrent RPC transactions. Code which are currently in this format: ---old--- list_for_each_entry (peerinfo, peers, op_peers_list) { ... GD_SYNCOP (peerinfo->rpc, stub, rpc_cbk, ...); } ... int rpc_cbk (rpc, stub, ...) { ... __wake (stub); } ---old--- Can be restructred into the format: ---new--- synctask_barrier_init (stub); { list_for_each_entry (peerinfo, peers, op_peers_list) { ... rpc_submit (peerinfo->rpc, stub, rpc_cbk, ...); count++; } } synctask_barrier_wait (stub, count); ... int rpc_cbk (rpc, stub, ...) { ... synctask_barrier_wake (stub); } ---new--- In the above structure, from the synctask's point of view, the region between synctask_barrier_init() and synctask_barrier_wait() are spawning off asynchronous "threads" (or RPC) and keep count of how many such threads have been spawned. Each of those threads are expected to make one call to synctask_barrier_wake(). The call to synctask_barrier_wait() makes the synctask thread co-operatively wait/sleep till @count such threads call their wake function. This way, the synctask thread retains the "synchronous" flow in the code, yet at the same time allows for asynchronous "threads" to acheive parallelism over RPC. Change-Id: Ie037f99b2d306b71e63e3a56353daec06fb0bf41 BUG: 913662 Signed-off-by: Anand Avati <avati@redhat.com> Reviewed-on: http://review.gluster.org/4558 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Krishnan Parthasarathi <kparthas@redhat.com> Tested-by: Krishnan Parthasarathi <kparthas@redhat.com>
-rw-r--r--libglusterfs/src/syncop.c41
-rw-r--r--libglusterfs/src/syncop.h51
2 files changed, 69 insertions, 23 deletions
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c
index c996b8fdd01..115debbfbf7 100644
--- a/libglusterfs/src/syncop.c
+++ b/libglusterfs/src/syncop.c
@@ -80,15 +80,24 @@ __wait (struct synctask *task)
void
-synctask_yield (struct synctask *task)
+synctask_waitfor (struct synctask *task, int waitfor)
{
+ struct syncenv *env = NULL;
xlator_t *oldTHIS = THIS;
+ env = task->env;
+
#if defined(__NetBSD__) && defined(_UC_TLSBASE)
/* Preserve pthread private pointer through swapcontex() */
task->proc->sched.uc_flags &= ~_UC_TLSBASE;
#endif
+ pthread_mutex_lock (&env->mutex);
+ {
+ task->waitfor = waitfor;
+ }
+ pthread_mutex_unlock (&env->mutex);
+
if (swapcontext (&task->ctx, &task->proc->sched) < 0) {
gf_log ("syncop", GF_LOG_ERROR,
"swapcontext failed (%s)", strerror (errno));
@@ -99,6 +108,29 @@ synctask_yield (struct synctask *task)
void
+synctask_yield (struct synctask *task)
+{
+ synctask_waitfor (task, 1);
+}
+
+
+void
+synctask_yawn (struct synctask *task)
+{
+ struct syncenv *env = NULL;
+
+ env = task->env;
+
+ pthread_mutex_lock (&env->mutex);
+ {
+ task->woken = 0;
+ task->waitfor = 0;
+ }
+ pthread_mutex_unlock (&env->mutex);
+}
+
+
+void
synctask_wake (struct synctask *task)
{
struct syncenv *env = NULL;
@@ -107,9 +139,9 @@ synctask_wake (struct synctask *task)
pthread_mutex_lock (&env->mutex);
{
- task->woken = 1;
+ task->woken++;
- if (task->slept)
+ if (task->slept && task->woken >= task->waitfor)
__run (task);
}
pthread_mutex_unlock (&env->mutex);
@@ -338,6 +370,7 @@ synctask_switchto (struct synctask *task)
task->woken = 0;
task->slept = 0;
+ task->waitfor = 0;
#if defined(__NetBSD__) && defined(_UC_TLSBASE)
/* Preserve pthread private pointer through swapcontex() */
@@ -356,7 +389,7 @@ synctask_switchto (struct synctask *task)
pthread_mutex_lock (&env->mutex);
{
- if (task->woken) {
+ if (task->woken >= task->waitfor) {
__run (task);
} else {
task->slept = 1;
diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h
index 001c68ff5f0..ba0440cd790 100644
--- a/libglusterfs/src/syncop.h
+++ b/libglusterfs/src/syncop.h
@@ -57,6 +57,7 @@ struct synctask {
void *stack;
int woken;
int slept;
+ int waitfor;
int ret;
uid_t uid;
@@ -118,15 +119,18 @@ struct syncargs {
struct synctask *task;
pthread_mutex_t mutex;
pthread_cond_t cond;
- int done;
+ int wakecnt;
};
#define __yawn(args) do { \
- if (!args->task) { \
+ args->task = synctask_get (); \
+ if (args->task) { \
+ synctask_yawn (args->task); \
+ } else { \
pthread_mutex_init (&args->mutex, NULL); \
pthread_cond_init (&args->cond, NULL); \
- args->done = 0; \
+ args->wakecnt = 0; \
} \
} while (0)
@@ -137,7 +141,7 @@ struct syncargs {
} else { \
pthread_mutex_lock (&args->mutex); \
{ \
- args->done = 1; \
+ args->wakecnt++; \
pthread_cond_signal (&args->cond); \
} \
pthread_mutex_unlock (&args->mutex); \
@@ -145,21 +149,24 @@ struct syncargs {
} while (0)
-#define __yield(args) do { \
- if (args->task) { \
- synctask_yield (args->task); \
- } else { \
- pthread_mutex_lock (&args->mutex); \
- { \
- while (!args->done) \
- pthread_cond_wait (&args->cond, \
- &args->mutex); \
- } \
- pthread_mutex_unlock (&args->mutex); \
- pthread_mutex_destroy (&args->mutex); \
- pthread_cond_destroy (&args->cond); \
- } \
- } while (0)
+#define __waitfor(args, cnt) do { \
+ if (args->task) { \
+ synctask_waitfor (args->task, cnt); \
+ } else { \
+ pthread_mutex_lock (&args->mutex); \
+ { \
+ while (args->wakecnt < cnt) \
+ pthread_cond_wait (&args->cond, \
+ &args->mutex); \
+ } \
+ pthread_mutex_unlock (&args->mutex); \
+ pthread_mutex_destroy (&args->mutex); \
+ pthread_cond_destroy (&args->cond); \
+ } \
+ } while (0)
+
+
+#define __yield(args) __waitfor(args, 1)
#define SYNCOP(subvol, stb, cbk, op, params ...) do { \
@@ -202,6 +209,12 @@ void syncenv_scale (struct syncenv *env);
int synctask_new (struct syncenv *, synctask_fn_t, synctask_cbk_t, call_frame_t* frame, void *);
void synctask_wake (struct synctask *task);
void synctask_yield (struct synctask *task);
+void synctask_yawn (struct synctask *task);
+void synctask_waitfor (struct synctask *task, int count);
+
+#define synctask_barrier_init(args) __yawn (args)
+#define synctask_barrier_wait(args, n) __waitfor (args, n)
+#define synctask_barrier_wake(args) __wake (args)
int synctask_setid (struct synctask *task, uid_t uid, gid_t gid);
#define SYNCTASK_SETID(uid, gid) synctask_setid (synctask_get(), uid, gid);