From e31467a255ca5d1b787ccb3e49c3d79c7b0eeff0 Mon Sep 17 00:00:00 2001 From: shishir gowda Date: Tue, 20 Aug 2013 16:30:06 +0530 Subject: protocol/server: Implement barriering mechanisms for fops Barriering allows cbk's responses to be held back until either a timeout or a un-barrier event is received. The event is triggered by a brick-op. barrier-timeout, barrier-queue-lenght, and barrier-fops are currently exposed as a xlator-options only Change-Id: Ifb62d5b2bd2fbb35aca3b537e534156c877ce3a6 Signed-off-by: shishir gowda --- xlators/protocol/server/src/server-helpers.c | 307 +++++++++++++++++++++++++++ xlators/protocol/server/src/server-helpers.h | 15 +- xlators/protocol/server/src/server.c | 120 ++++++++++- xlators/protocol/server/src/server.h | 29 +++ 4 files changed, 465 insertions(+), 6 deletions(-) diff --git a/xlators/protocol/server/src/server-helpers.c b/xlators/protocol/server/src/server-helpers.c index 14d074124..053670765 100644 --- a/xlators/protocol/server/src/server-helpers.c +++ b/xlators/protocol/server/src/server-helpers.c @@ -1076,3 +1076,310 @@ server_cancel_grace_timer (xlator_t *this, client_t *client) } return cancelled; } + +int32_t +gf_barrier_transmit (server_conf_t *conf, gf_barrier_payload_t *payload) +{ + gf_barrier_t *barrier = NULL; + int32_t ret = -1; + client_t *client = NULL; + gf_boolean_t lk_heal = _gf_false; + call_frame_t *frame = NULL; + server_state_t *state = NULL; + + GF_VALIDATE_OR_GOTO ("barrier", conf, out); + GF_VALIDATE_OR_GOTO ("barrier", conf->barrier, out); + GF_VALIDATE_OR_GOTO ("barrier", payload, out); + + barrier = conf->barrier; + + frame = payload->frame; + if (frame) { + state = CALL_STATE (frame); + frame->local = NULL; + client = state->client; + } + + if (client) + lk_heal = ((server_conf_t *) client->this->private)->lk_heal; + + ret = rpcsvc_submit_generic (payload->req, payload->rsp, 1, + payload->payload, payload->payload_count, + payload->iobref); + iobuf_unref (payload->iob); + if (ret == -1) { + gf_log_callingfn ("", GF_LOG_ERROR, "Reply submission failed"); + if (frame && client && !lk_heal) { + server_connection_cleanup (frame->this, client, + INTERNAL_LOCKS | POSIX_LOCKS); + } else { + /* TODO: Failure of open(dir), create, inodelk, entrylk + or lk fops send failure must be handled specially. */ + } + goto ret; + } + + ret = 0; +ret: + if (state) { + free_state (state); + } + + if (frame) { + gf_client_unref (client); + STACK_DESTROY (frame->root); + } + + if (payload->free_iobref) { + iobref_unref (payload->iobref); + } +out: + return ret; +} +gf_barrier_payload_t * +gf_barrier_dequeue (gf_barrier_t *barrier) +{ + gf_barrier_payload_t *payload = NULL; + + if (!barrier || list_empty (&barrier->queue)) + return NULL; + + LOCK (&barrier->lock); + { + payload = list_entry (barrier->queue.next, + gf_barrier_payload_t, list); + list_del_init (&payload->list); + barrier->cur_size--; + } + UNLOCK (&barrier->lock); + + return payload; +} + + +void +gf_barrier_dequeue_start (void *data) +{ + server_conf_t *conf = NULL; + gf_barrier_t *barrier = NULL; + gf_barrier_payload_t *payload = NULL; + + conf = (server_conf_t *)data; + if (!conf || !conf->barrier) + return; + barrier = conf->barrier; + + + while (!list_empty (&barrier->queue)) { + payload = gf_barrier_dequeue (barrier); + if (payload) { + if (gf_barrier_transmit (conf, payload)) { + gf_log ("server", GF_LOG_WARNING, + "Failed to transmit"); + } + GF_FREE (payload); + } + } + return; +} + +void +gf_barrier_timeout (void *data) +{ + server_conf_t *conf = NULL; + gf_barrier_t *barrier = NULL; + gf_boolean_t need_dequeue = _gf_false; + + conf = (server_conf_t *)data; + if (!conf || !conf->barrier) + goto out; + barrier = conf->barrier; + + LOCK (&barrier->lock); + { + need_dequeue = barrier->on; + barrier->on = _gf_false; + } + UNLOCK (&barrier->lock); + + if (need_dequeue == _gf_true) + gf_barrier_dequeue_start (data); +out: + return; +} + + +int32_t +gf_barrier_start (xlator_t *this) +{ + server_conf_t *conf = NULL; + gf_barrier_t *barrier = NULL; + int32_t ret = -1; + struct timeval time = {0,}; + + conf = this->private; + + GF_VALIDATE_OR_GOTO ("server", this, out); + GF_VALIDATE_OR_GOTO (this->name, conf, out); + GF_VALIDATE_OR_GOTO (this->name, conf->barrier, out); + + barrier = conf->barrier; + + LOCK (&barrier->lock); + { + /* if barrier is on, reset timer */ + if (barrier->on == _gf_true) { + ret = gf_timer_call_cancel (this->ctx, barrier->timer); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, "Failed to " + "unset timer, failing barrier start"); + goto unlock; + } + } + + barrier->on = _gf_true; + time.tv_sec = barrier->time_out; + time.tv_usec = 0; + barrier->timer = gf_timer_call_after (this->ctx, time, + gf_barrier_timeout, + (void *)conf); + if (!barrier->timer) { + gf_log (this->name, GF_LOG_ERROR, "Failed to set " + "timer, failing barrier start"); + barrier->on = _gf_false; + } + } +unlock: + UNLOCK (&barrier->lock); + + ret = 0; +out: + return ret; +} + +int32_t +gf_barrier_stop (xlator_t *this) +{ + server_conf_t *conf = NULL; + gf_barrier_t *barrier = NULL; + int32_t ret = -1; + gf_boolean_t need_dequeue = _gf_false; + + conf = this->private; + + GF_VALIDATE_OR_GOTO ("server", this, out); + GF_VALIDATE_OR_GOTO (this->name, conf, out); + GF_VALIDATE_OR_GOTO (this->name, conf->barrier, out); + + barrier = conf->barrier; + + LOCK (&barrier->lock); + { + need_dequeue = barrier->on; + barrier->on = _gf_false; + } + UNLOCK (&barrier->lock); + + if (need_dequeue == _gf_true) { + gf_timer_call_cancel (this->ctx, barrier->timer); + gf_barrier_dequeue_start (conf); + } + ret = 0; +out: + return ret; +} + +int32_t +gf_barrier_fops_configure (xlator_t *this, gf_barrier_t *barrier, char *str) +{ + int32_t ret = -1; + char *dup_str = NULL; + char *str_tok = NULL; + char *save_ptr = NULL; + uint64_t fops = 0; + + /* by defaul fsync & flush needs to be barriered */ + + fops |= 1 << GFS3_OP_FSYNC; + fops |= 1 << GFS3_OP_FLUSH; + + if (!str) + goto done; + + dup_str = gf_strdup (str); + if (!dup_str) + goto done; + + str_tok = strtok_r (dup_str, ",", &save_ptr); + if (!str_tok) + goto done; + + fops = 0; + while (str_tok) { + if (!strcmp(str_tok, "writev")) { + fops |= ((uint64_t)1 << GFS3_OP_WRITE); + } else if (!strcmp(str_tok, "fsync")) { + fops |= ((uint64_t)1 << GFS3_OP_FSYNC); + } else if (!strcmp(str_tok, "read")) { + fops |= ((uint64_t)1 << GFS3_OP_READ); + } else if (!strcmp(str_tok, "rename")) { + fops |= ((uint64_t)1 << GFS3_OP_RENAME); + } else if (!strcmp(str_tok, "flush")) { + fops |= ((uint64_t)1 << GFS3_OP_FLUSH); + } else if (!strcmp(str_tok, "ftruncate")) { + fops |= ((uint64_t)1 << GFS3_OP_FTRUNCATE); + } else if (!strcmp(str_tok, "fallocate")) { + fops |= ((uint64_t)1 << GFS3_OP_FALLOCATE); + } else if (!strcmp(str_tok, "rmdir")) { + fops |= ((uint64_t)1 << GFS3_OP_RMDIR); + } else { + gf_log ("barrier", GF_LOG_ERROR, + "Invalid barrier fop %s", str_tok); + } + + str_tok = strtok_r (NULL, ",", &save_ptr); + } +done: + LOCK (&barrier->lock); + { + barrier->fops = fops; + } + ret = 0; + + GF_FREE (dup_str); + return ret; +} + + +void +gf_barrier_enqueue (gf_barrier_t *barrier, gf_barrier_payload_t *payload) +{ + list_add_tail (&payload->list, &barrier->queue); + barrier->cur_size++; +} + +gf_barrier_payload_t * +gf_barrier_payload (rpcsvc_request_t *req, struct iovec *rsp, + call_frame_t *frame, struct iovec *payload_orig, + int payloadcount, struct iobref *iobref, + struct iobuf *iob, gf_boolean_t free_iobref) +{ + gf_barrier_payload_t *payload = NULL; + + payload = GF_CALLOC (1, sizeof (*payload),1); + if (!payload) + return NULL; + + INIT_LIST_HEAD (&payload->list); + + payload->req = req; + payload->rsp = rsp; + payload->frame = frame; + payload->payload = payload_orig; + payload->payload_count = payloadcount; + payload->iobref = iobref; + payload->iob = iob; + payload->free_iobref = free_iobref; + + return payload; +} diff --git a/xlators/protocol/server/src/server-helpers.h b/xlators/protocol/server/src/server-helpers.h index 987528fbd..bbbe0650d 100644 --- a/xlators/protocol/server/src/server-helpers.h +++ b/xlators/protocol/server/src/server-helpers.h @@ -31,6 +31,10 @@ #define IS_NOT_ROOT(pathlen) ((pathlen > 2)? 1 : 0) +#define is_fop_barriered(fops, procnum) (fops & ((uint64_t)1 << procnum)) + +#define barrier_add_to_queue(barrier) (barrier->on || barrier->cur_size) + void free_state (server_state_t *state); void server_loc_wipe (loc_t *loc); @@ -55,5 +59,14 @@ int serialize_rsp_dirent (gf_dirent_t *entries, gfs3_readdir_rsp *rsp); int serialize_rsp_direntp (gf_dirent_t *entries, gfs3_readdirp_rsp *rsp); int readdirp_rsp_cleanup (gfs3_readdirp_rsp *rsp); int readdir_rsp_cleanup (gfs3_readdir_rsp *rsp); - +int32_t gf_barrier_start (xlator_t *this); +int32_t gf_barrier_stop (xlator_t *this); +int32_t gf_barrier_fops_configure (xlator_t *this, gf_barrier_t *barrier, + char *str); +void gf_barrier_enqueue (gf_barrier_t *barrier, gf_barrier_payload_t *stub); +gf_barrier_payload_t * +gf_barrier_payload (rpcsvc_request_t *req, struct iovec *rsp, + call_frame_t *frame, struct iovec *payload, + int payloadcount, struct iobref *iobref, + struct iobuf *iob, gf_boolean_t free_iobref); #endif /* !_SERVER_HELPERS_H */ diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c index aa091e2e5..9a59634a9 100644 --- a/xlators/protocol/server/src/server.c +++ b/xlators/protocol/server/src/server.c @@ -124,8 +124,6 @@ ret: return iob; } - - int server_submit_reply (call_frame_t *frame, rpcsvc_request_t *req, void *arg, struct iovec *payload, int payloadcount, @@ -138,6 +136,9 @@ server_submit_reply (call_frame_t *frame, rpcsvc_request_t *req, void *arg, char new_iobref = 0; client_t *client = NULL; gf_boolean_t lk_heal = _gf_false; + server_conf_t *conf = NULL; + gf_barrier_t *barrier = NULL; + gf_barrier_payload_t *stub = NULL; GF_VALIDATE_OR_GOTO ("server", req, ret); @@ -145,6 +146,7 @@ server_submit_reply (call_frame_t *frame, rpcsvc_request_t *req, void *arg, state = CALL_STATE (frame); frame->local = NULL; client = state->client; + conf = (server_conf_t *) client->this->private; } if (client) @@ -167,6 +169,30 @@ server_submit_reply (call_frame_t *frame, rpcsvc_request_t *req, void *arg, iobref_add (iobref, iob); + if (conf) + barrier = conf->barrier; + if (barrier) { + /* todo: write's with fd flags set to O_SYNC and O_DIRECT */ + LOCK (&barrier->lock); + { + if (is_fop_barriered (barrier->fops, req->procnum) && + (barrier_add_to_queue (barrier))) { + stub = gf_barrier_payload (req, &rsp, frame, + payload, + payloadcount, iobref, + iob, new_iobref); + if (stub) { + gf_barrier_enqueue (barrier, stub); + goto ret; + } else { + gf_log ("", GF_LOG_ERROR, "Failed to " + " barrier fop %"PRIu64, + ((uint64_t)1 << req->procnum)); + } + } + } + UNLOCK (&barrier->lock); + } /* Then, submit the message for transmission. */ ret = rpcsvc_submit_generic (req, &rsp, 1, payload, payloadcount, iobref); @@ -208,7 +234,6 @@ ret: if (new_iobref) { iobref_unref (iobref); } - return ret; } @@ -760,6 +785,8 @@ init (xlator_t *this) server_conf_t *conf = NULL; rpcsvc_listener_t *listener = NULL; char *statedump_path = NULL; + gf_barrier_t *barrier = NULL; + char *str = NULL; GF_VALIDATE_OR_GOTO ("init", this, out); if (this->children == NULL) { @@ -900,6 +927,35 @@ init (xlator_t *this) } } #endif + /* barrier related */ + barrier = GF_CALLOC (1, sizeof (*barrier),1); + if (!barrier) { + gf_log (this->name, GF_LOG_WARNING, + "WARNING: Failed to allocate barrier"); + ret = -1; + goto out; + } + + LOCK_INIT (&barrier->lock); + + GF_OPTION_INIT ("barrier-queue-length", barrier->max_size, + int64, out); + GF_OPTION_INIT ("barrier-timeout", barrier->time_out, + uint64, out); + + ret = dict_get_str (this->options, "barrier-fops", &str); + if (ret) { + gf_log (this->name, GF_LOG_DEBUG, + "setting barrier fops to default value"); + } + ret = gf_barrier_fops_configure (this, barrier, str); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "invalid barrier fops specified"); + goto out; + } + + conf->barrier = barrier; this->private = conf; ret = 0; @@ -953,12 +1009,49 @@ int notify (xlator_t *this, int32_t event, void *data, ...) { int ret = 0; + int32_t val = 0; + dict_t *dict = NULL; + dict_t *output = NULL; + va_list ap; + + dict = data; + va_start (ap, data); + output = va_arg (ap, dict_t*); + va_end (ap); + switch (event) { + /* todo: GF_EVENT_BARRIER */ + case 100: + ret = dict_get_int32 (dict, "barrier", &val); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "Wrong BARRIER event"); + goto out; + } + /* !val un-barrier, if val, barrier */ + if (val) { + ret = gf_barrier_start (this); + if (ret) + gf_log (this->name, GF_LOG_ERROR, + "Barrier start failed"); + } else { + ret = gf_barrier_stop (this); + if (ret) + gf_log (this->name, GF_LOG_ERROR, + "Barrier stop failed"); + } + ret = dict_set_int32 (dict, "barrier-status", ret); + if (ret) + gf_log (this->name, GF_LOG_ERROR, + "Failed to set barrier-status in dict"); + break; + + /* todo: call default_notify to make other xlators handle it.*/ default: default_notify (this, event, data); break; } - +out: return ret; } @@ -1062,6 +1155,23 @@ struct volume_options options[] = { "hostnames to connect to the server. By default, all" " connections are allowed." }, - + {.key = {"barrier-timeout"}, + .type = GF_OPTION_TYPE_INT, + .default_value = "60", + .min = 0, + .max = 360, + .description = "Barrier timeout in seconds", + }, + {.key = {"barrier-queue-length"}, + .type = GF_OPTION_TYPE_INT, + .default_value = "4096", + .min = 0, + .max = 16384, + .description = "Barrier queue length", + }, + {.key = {"barrier-fops"}, + .type = GF_OPTION_TYPE_STR, + .description = "Allow a comma seperated fop lists", + }, { .key = {NULL} }, }; diff --git a/xlators/protocol/server/src/server.h b/xlators/protocol/server/src/server.h index 6675d5b1a..0484ae988 100644 --- a/xlators/protocol/server/src/server.h +++ b/xlators/protocol/server/src/server.h @@ -26,6 +26,34 @@ #define GF_MAX_SOCKET_WINDOW_SIZE (1 * GF_UNIT_MB) #define GF_MIN_SOCKET_WINDOW_SIZE (0) +struct _gf_barrier_payload { + rpcsvc_request_t *req; + struct iovec *rsp; + call_frame_t *frame; + struct iovec *payload; + struct iobref *iobref; + struct iobuf *iob; + int payload_count; + gf_boolean_t free_iobref; + struct list_head list; +}; + +typedef struct _gf_barrier_payload gf_barrier_payload_t; + +struct _gf_barrier { + gf_lock_t lock; + gf_boolean_t on; + gf_boolean_t force; + size_t cur_size; + int64_t max_size; + uint64_t fops; + gf_timer_t *timer; + uint64_t time_out; + struct list_head queue; +}; + +typedef struct _gf_barrier gf_barrier_t; + typedef enum { INTERNAL_LOCKS = 1, POSIX_LOCKS = 2, @@ -54,6 +82,7 @@ struct server_conf { struct timeval grace_tv; dict_t *auth_modules; pthread_mutex_t mutex; + gf_barrier_t *barrier; struct list_head xprt_list; }; typedef struct server_conf server_conf_t; -- cgit