diff options
Diffstat (limited to 'libglusterfs')
-rw-r--r-- | libglusterfs/src/glusterfs/syncop.h | 53 | ||||
-rw-r--r-- | libglusterfs/src/libglusterfs.sym | 7 | ||||
-rw-r--r-- | libglusterfs/src/syncop.c | 306 |
3 files changed, 291 insertions, 75 deletions
diff --git a/libglusterfs/src/glusterfs/syncop.h b/libglusterfs/src/glusterfs/syncop.h index bfdec491ba8..1a801a91444 100644 --- a/libglusterfs/src/glusterfs/syncop.h +++ b/libglusterfs/src/glusterfs/syncop.h @@ -16,6 +16,8 @@ #include <ucontext.h> #include "glusterfs/dict.h" // for dict_t #include "glusterfs/stack.h" // for call_frame_t, STACK_DESTROY, STACK_... +#include "glusterfs/timer.h" + #define SYNCENV_PROC_MAX 16 #define SYNCENV_PROC_MIN 2 #define SYNCPROC_IDLE_TIME 600 @@ -32,6 +34,7 @@ struct synctask; struct syncproc; struct syncenv; +struct synccond; typedef int (*synctask_cbk_t)(int ret, call_frame_t *frame, void *opaque); @@ -55,9 +58,12 @@ struct synctask { call_frame_t *opframe; synctask_cbk_t synccbk; synctask_fn_t syncfn; - synctask_state_t state; + struct timespec *delta; + gf_timer_t *timer; + struct synccond *synccond; void *opaque; void *stack; + synctask_state_t state; int woken; int slept; int ret; @@ -85,19 +91,21 @@ struct syncproc { /* hosts the scheduler thread and framework for executing synctasks */ struct syncenv { struct syncproc proc[SYNCENV_PROC_MAX]; - int procs; + + pthread_mutex_t mutex; + pthread_cond_t cond; struct list_head runq; - int runcount; struct list_head waitq; - int waitcount; + + int procs; + int procs_idle; + + int runcount; int procmin; int procmax; - pthread_mutex_t mutex; - pthread_cond_t cond; - size_t stacksize; int destroy; /* FLAG to mark syncenv is in destroy mode @@ -123,6 +131,13 @@ struct synclock { }; typedef struct synclock synclock_t; +struct synccond { + pthread_mutex_t pmutex; + pthread_cond_t pcond; + struct list_head waitq; +}; +typedef struct synccond synccond_t; + struct syncbarrier { gf_boolean_t initialized; /*Set on successful initialization*/ pthread_mutex_t guard; /* guard the remaining members, pair @cond */ @@ -219,7 +234,7 @@ struct syncopctx { #define __yield(args) \ do { \ if (args->task) { \ - synctask_yield(args->task); \ + synctask_yield(args->task, NULL); \ } else { \ pthread_mutex_lock(&args->mutex); \ { \ @@ -310,7 +325,9 @@ synctask_join(struct synctask *task); void synctask_wake(struct synctask *task); void -synctask_yield(struct synctask *task); +synctask_yield(struct synctask *task, struct timespec *delta); +void +synctask_sleep(int32_t secs); void synctask_waitfor(struct synctask *task, int count); @@ -408,6 +425,24 @@ synclock_trylock(synclock_t *lock); int synclock_unlock(synclock_t *lock); +int32_t +synccond_init(synccond_t *cond); + +void +synccond_destroy(synccond_t *cond); + +int +synccond_wait(synccond_t *cond, synclock_t *lock); + +int +synccond_timedwait(synccond_t *cond, synclock_t *lock, struct timespec *delta); + +void +synccond_signal(synccond_t *cond); + +void +synccond_broadcast(synccond_t *cond); + int syncbarrier_init(syncbarrier_t *barrier); int diff --git a/libglusterfs/src/libglusterfs.sym b/libglusterfs/src/libglusterfs.sym index 47e0872ca59..f5e273e4ca0 100644 --- a/libglusterfs/src/libglusterfs.sym +++ b/libglusterfs/src/libglusterfs.sym @@ -937,6 +937,12 @@ syncbarrier_destroy syncbarrier_init syncbarrier_wait syncbarrier_wake +synccond_init +synccond_destroy +synccond_wait +synccond_timedwait +synccond_signal +synccond_broadcast syncenv_destroy syncenv_new synclock_destroy @@ -1014,6 +1020,7 @@ synctask_new synctask_new1 synctask_set synctask_setid +synctask_sleep synctask_wake synctask_yield sys_access diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c index 0de53c6f4cf..e20cf006493 100644 --- a/libglusterfs/src/syncop.c +++ b/libglusterfs/src/syncop.c @@ -154,10 +154,14 @@ out: return ret; } +void * +syncenv_processor(void *thdata); + static void __run(struct synctask *task) { struct syncenv *env = NULL; + int32_t total, ret, i; env = task->env; @@ -173,7 +177,6 @@ __run(struct synctask *task) env->runcount--; break; case SYNCTASK_WAIT: - env->waitcount--; break; case SYNCTASK_DONE: gf_msg(task->xl->name, GF_LOG_WARNING, 0, LG_MSG_COMPLETED_TASK, @@ -187,8 +190,27 @@ __run(struct synctask *task) } list_add_tail(&task->all_tasks, &env->runq); - env->runcount++; task->state = SYNCTASK_RUN; + + env->runcount++; + + total = env->procs + env->runcount - env->procs_idle; + if (total > env->procmax) { + total = env->procmax; + } + if (total > env->procs) { + for (i = 0; i < env->procmax; i++) { + if (env->proc[i].env == NULL) { + env->proc[i].env = env; + ret = gf_thread_create(&env->proc[i].processor, NULL, + syncenv_processor, &env->proc[i], + "sproc%d", i); + if ((ret < 0) || (++env->procs >= total)) { + break; + } + } + } + } } static void @@ -210,7 +232,6 @@ __wait(struct synctask *task) gf_msg(task->xl->name, GF_LOG_WARNING, 0, LG_MSG_REWAITING_TASK, "re-waiting already waiting " "task"); - env->waitcount--; break; case SYNCTASK_DONE: gf_msg(task->xl->name, GF_LOG_WARNING, 0, LG_MSG_COMPLETED_TASK, @@ -223,12 +244,11 @@ __wait(struct synctask *task) } list_add_tail(&task->all_tasks, &env->waitq); - env->waitcount++; task->state = SYNCTASK_WAIT; } void -synctask_yield(struct synctask *task) +synctask_yield(struct synctask *task, struct timespec *delta) { xlator_t *oldTHIS = THIS; @@ -237,6 +257,8 @@ synctask_yield(struct synctask *task) task->proc->sched.uc_flags &= ~_UC_TLSBASE; #endif + task->delta = delta; + if (task->state != SYNCTASK_DONE) { task->state = SYNCTASK_SUSPEND; } @@ -249,6 +271,35 @@ synctask_yield(struct synctask *task) } void +synctask_sleep(int32_t secs) +{ + struct timespec delta; + struct synctask *task; + + task = synctask_get(); + + if (task == NULL) { + sleep(secs); + } else { + delta.tv_sec = secs; + delta.tv_nsec = 0; + + synctask_yield(task, &delta); + } +} + +static void +__synctask_wake(struct synctask *task) +{ + task->woken = 1; + + if (task->slept) + __run(task); + + pthread_cond_broadcast(&task->env->cond); +} + +void synctask_wake(struct synctask *task) { struct syncenv *env = NULL; @@ -257,13 +308,18 @@ synctask_wake(struct synctask *task) pthread_mutex_lock(&env->mutex); { - task->woken = 1; + if (task->timer != NULL) { + if (gf_timer_call_cancel(task->xl->ctx, task->timer) != 0) { + goto unlock; + } - if (task->slept) - __run(task); + task->timer = NULL; + task->synccond = NULL; + } - pthread_cond_broadcast(&env->cond); + __synctask_wake(task); } +unlock: pthread_mutex_unlock(&env->mutex); } @@ -282,7 +338,7 @@ synctask_wrap(void) task->state = SYNCTASK_DONE; - synctask_yield(task); + synctask_yield(task, NULL); } void @@ -422,11 +478,6 @@ synctask_create(struct syncenv *env, size_t stacksize, synctask_fn_t fn, } synctask_wake(newtask); - /* - * Make sure someone's there to execute anything we just put on the - * run queue. - */ - syncenv_scale(env); return newtask; err: @@ -520,8 +571,12 @@ syncenv_task(struct syncproc *proc) goto unlock; } + env->procs_idle++; + sleep_till.tv_sec = time(NULL) + SYNCPROC_IDLE_TIME; ret = pthread_cond_timedwait(&env->cond, &env->mutex, &sleep_till); + + env->procs_idle--; } task = list_entry(env->runq.next, struct synctask, all_tasks); @@ -540,6 +595,34 @@ unlock: return task; } +static void +synctask_timer(void *data) +{ + struct synctask *task = data; + struct synccond *cond; + + cond = task->synccond; + if (cond != NULL) { + pthread_mutex_lock(&cond->pmutex); + + list_del_init(&task->waitq); + task->synccond = NULL; + + pthread_mutex_unlock(&cond->pmutex); + + task->ret = -ETIMEDOUT; + } + + pthread_mutex_lock(&task->env->mutex); + + gf_timer_call_cancel(task->xl->ctx, task->timer); + task->timer = NULL; + + __synctask_wake(task); + + pthread_mutex_unlock(&task->env->mutex); +} + void synctask_switchto(struct synctask *task) { @@ -572,7 +655,14 @@ synctask_switchto(struct synctask *task) } else { task->slept = 1; __wait(task); + + if (task->delta != NULL) { + task->timer = gf_timer_call_after(task->xl->ctx, *task->delta, + synctask_timer, task); + } } + + task->delta = NULL; } pthread_mutex_unlock(&env->mutex); } @@ -580,65 +670,18 @@ synctask_switchto(struct synctask *task) void * syncenv_processor(void *thdata) { - struct syncenv *env = NULL; struct syncproc *proc = NULL; struct synctask *task = NULL; proc = thdata; - env = proc->env; - - for (;;) { - task = syncenv_task(proc); - if (!task) - break; + while ((task = syncenv_task(proc)) != NULL) { synctask_switchto(task); - - syncenv_scale(env); } return NULL; } -void -syncenv_scale(struct syncenv *env) -{ - int diff = 0; - int scale = 0; - int i = 0; - int ret = 0; - - pthread_mutex_lock(&env->mutex); - { - if (env->procs > env->runcount) - goto unlock; - - scale = env->runcount; - if (scale > env->procmax) - scale = env->procmax; - if (scale > env->procs) - diff = scale - env->procs; - while (diff) { - diff--; - for (; (i < env->procmax); i++) { - if (env->proc[i].processor == 0) - break; - } - - env->proc[i].env = env; - ret = gf_thread_create(&env->proc[i].processor, NULL, - syncenv_processor, &env->proc[i], - "sproc%03hx", env->procs & 0x3ff); - if (ret) - break; - env->procs++; - i++; - } - } -unlock: - pthread_mutex_unlock(&env->mutex); -} - /* The syncenv threads are cleaned up in this routine. */ void @@ -715,12 +758,13 @@ syncenv_new(size_t stacksize, int procmin, int procmax) newenv->stacksize = stacksize; newenv->procmin = procmin; newenv->procmax = procmax; + newenv->procs_idle = 0; for (i = 0; i < newenv->procmin; i++) { newenv->proc[i].env = newenv; ret = gf_thread_create(&newenv->proc[i].processor, NULL, syncenv_processor, &newenv->proc[i], "sproc%d", - newenv->procs); + i); if (ret) break; newenv->procs++; @@ -810,7 +854,7 @@ __synclock_lock(struct synclock *lock) task->woken = 0; list_add_tail(&task->waitq, &lock->waitq); pthread_mutex_unlock(&lock->guard); - synctask_yield(task); + synctask_yield(task, NULL); /* task is removed from waitq in unlock, * under lock->guard.*/ pthread_mutex_lock(&lock->guard); @@ -963,6 +1007,136 @@ synclock_unlock(synclock_t *lock) return ret; } +/* Condition variables */ + +int32_t +synccond_init(synccond_t *cond) +{ + int32_t ret; + + INIT_LIST_HEAD(&cond->waitq); + + ret = pthread_mutex_init(&cond->pmutex, NULL); + if (ret != 0) { + return -ret; + } + + ret = pthread_cond_init(&cond->pcond, NULL); + if (ret != 0) { + pthread_mutex_destroy(&cond->pmutex); + } + + return -ret; +} + +void +synccond_destroy(synccond_t *cond) +{ + pthread_cond_destroy(&cond->pcond); + pthread_mutex_destroy(&cond->pmutex); +} + +int +synccond_timedwait(synccond_t *cond, synclock_t *lock, struct timespec *delta) +{ + struct timespec now; + struct synctask *task = NULL; + int ret; + + task = synctask_get(); + + if (task == NULL) { + if (delta != NULL) { + timespec_now_realtime(&now); + timespec_adjust_delta(&now, *delta); + } + + pthread_mutex_lock(&cond->pmutex); + + if (delta == NULL) { + ret = -pthread_cond_wait(&cond->pcond, &cond->pmutex); + } else { + ret = -pthread_cond_timedwait(&cond->pcond, &cond->pmutex, &now); + } + } else { + pthread_mutex_lock(&cond->pmutex); + + list_add_tail(&task->waitq, &cond->waitq); + task->synccond = cond; + + ret = synclock_unlock(lock); + if (ret == 0) { + pthread_mutex_unlock(&cond->pmutex); + + synctask_yield(task, delta); + + ret = synclock_lock(lock); + if (ret == 0) { + ret = task->ret; + } + task->ret = 0; + + return ret; + } + + list_del_init(&task->waitq); + } + + pthread_mutex_unlock(&cond->pmutex); + + return ret; +} + +int +synccond_wait(synccond_t *cond, synclock_t *lock) +{ + return synccond_timedwait(cond, lock, NULL); +} + +void +synccond_signal(synccond_t *cond) +{ + struct synctask *task; + + pthread_mutex_lock(&cond->pmutex); + + if (!list_empty(&cond->waitq)) { + task = list_first_entry(&cond->waitq, struct synctask, waitq); + list_del_init(&task->waitq); + + pthread_mutex_unlock(&cond->pmutex); + + synctask_wake(task); + } else { + pthread_cond_signal(&cond->pcond); + + pthread_mutex_unlock(&cond->pmutex); + } +} + +void +synccond_broadcast(synccond_t *cond) +{ + struct list_head list; + struct synctask *task; + + INIT_LIST_HEAD(&list); + + pthread_mutex_lock(&cond->pmutex); + + list_splice_init(&cond->waitq, &list); + pthread_cond_broadcast(&cond->pcond); + + pthread_mutex_unlock(&cond->pmutex); + + while (!list_empty(&list)) { + task = list_first_entry(&list, struct synctask, waitq); + list_del_init(&task->waitq); + + synctask_wake(task); + } +} + /* Barriers */ int @@ -1032,7 +1206,7 @@ __syncbarrier_wait(struct syncbarrier *barrier, int waitfor) /* called within a synctask */ list_add_tail(&task->waitq, &barrier->waitq); pthread_mutex_unlock(&barrier->guard); - synctask_yield(task); + synctask_yield(task, NULL); pthread_mutex_lock(&barrier->guard); } else { /* called by a non-synctask */ |