summaryrefslogtreecommitdiffstats
path: root/xlators/protocol/server/src/server.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/protocol/server/src/server.c')
-rw-r--r--xlators/protocol/server/src/server.c215
1 files changed, 181 insertions, 34 deletions
diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c
index aa091e2e5..589bd7b36 100644
--- a/xlators/protocol/server/src/server.c
+++ b/xlators/protocol/server/src/server.c
@@ -25,8 +25,6 @@
#include "statedump.h"
#include "defaults.h"
#include "authenticate.h"
-#include "rpcsvc.h"
-#include "client_t.h"
void
grace_time_handler (void *data)
@@ -34,6 +32,7 @@ grace_time_handler (void *data)
client_t *client = NULL;
xlator_t *this = NULL;
gf_timer_t *timer = NULL;
+ server_ctx_t *serv_ctx = NULL;
gf_boolean_t cancelled = _gf_false;
gf_boolean_t detached = _gf_false;
@@ -43,16 +42,23 @@ grace_time_handler (void *data)
GF_VALIDATE_OR_GOTO (THIS->name, this, out);
gf_log (this->name, GF_LOG_INFO, "grace timer expired for %s",
- client->server_ctx.client_uid);
+ client->client_uid);
- LOCK (&client->server_ctx.fdtable_lock);
+ serv_ctx = server_ctx_get (client, this);
+
+ if (serv_ctx == NULL) {
+ gf_log (this->name, GF_LOG_INFO, "server_ctx_get() failed");
+ goto out;
+ }
+
+ LOCK (&serv_ctx->fdtable_lock);
{
- if (client->server_ctx.grace_timer) {
- timer = client->server_ctx.grace_timer;
- client->server_ctx.grace_timer = NULL;
+ if (serv_ctx->grace_timer) {
+ timer = serv_ctx->grace_timer;
+ serv_ctx->grace_timer = NULL;
}
}
- UNLOCK (&client->server_ctx.fdtable_lock);
+ UNLOCK (&serv_ctx->fdtable_lock);
if (timer) {
gf_timer_call_cancel (this->ctx, timer);
cancelled = _gf_true;
@@ -67,7 +73,7 @@ grace_time_handler (void *data)
gf_client_put (client, &detached);
if (detached)//reconnection did not happen :-(
server_connection_cleanup (this, client,
- INTERNAL_LOCKS | POSIX_LOCKS);
+ INTERNAL_LOCKS | POSIX_LOCKS);
gf_client_unref (client);
}
out:
@@ -124,8 +130,6 @@ ret:
return iob;
}
-
-
int
server_submit_reply (call_frame_t *frame, rpcsvc_request_t *req, void *arg,
struct iovec *payload, int payloadcount,
@@ -138,13 +142,18 @@ server_submit_reply (call_frame_t *frame, rpcsvc_request_t *req, void *arg,
char new_iobref = 0;
client_t *client = NULL;
gf_boolean_t lk_heal = _gf_false;
+ server_conf_t *conf = NULL;
+ gf_barrier_t *barrier = NULL;
+ gf_barrier_payload_t *stub = NULL;
+ gf_boolean_t barriered = _gf_false;
GF_VALIDATE_OR_GOTO ("server", req, ret);
if (frame) {
state = CALL_STATE (frame);
frame->local = NULL;
- client = state->client;
+ client = frame->root->client;
+ conf = (server_conf_t *) client->this->private;
}
if (client)
@@ -167,6 +176,32 @@ server_submit_reply (call_frame_t *frame, rpcsvc_request_t *req, void *arg,
iobref_add (iobref, iob);
+ if (conf)
+ barrier = conf->barrier;
+ if (barrier) {
+ /* todo: write's with fd flags set to O_SYNC and O_DIRECT */
+ LOCK (&barrier->lock);
+ {
+ if (is_fop_barriered (barrier->fops, req->procnum) &&
+ (barrier_add_to_queue (barrier))) {
+ stub = gf_barrier_payload (req, &rsp, frame,
+ payload,
+ payloadcount, iobref,
+ iob, new_iobref);
+ if (stub) {
+ gf_barrier_enqueue (barrier, stub);
+ barriered = _gf_true;
+ } else {
+ gf_log ("", GF_LOG_ERROR, "Failed to "
+ " barrier fop %"PRIu64,
+ ((uint64_t)1 << req->procnum));
+ }
+ }
+ }
+ UNLOCK (&barrier->lock);
+ if (barriered == _gf_true)
+ goto out;
+ }
/* Then, submit the message for transmission. */
ret = rpcsvc_submit_generic (req, &rsp, 1, payload, payloadcount,
iobref);
@@ -208,7 +243,7 @@ ret:
if (new_iobref) {
iobref_unref (iobref);
}
-
+out:
return ret;
}
@@ -463,6 +498,7 @@ server_rpc_notify (rpcsvc_t *rpc, void *xl, rpcsvc_event_t event,
rpc_transport_t *trans = NULL;
server_conf_t *conf = NULL;
client_t *client = NULL;
+ server_ctx_t *serv_ctx = NULL;
if (!xl || !data) {
gf_log_callingfn ("server", GF_LOG_WARNING,
@@ -471,7 +507,7 @@ server_rpc_notify (rpcsvc_t *rpc, void *xl, rpcsvc_event_t event,
}
this = xl;
- trans= data;
+ trans = data;
conf = this->private;
switch (event) {
@@ -511,7 +547,7 @@ server_rpc_notify (rpcsvc_t *rpc, void *xl, rpcsvc_event_t event,
break;
gf_log (this->name, GF_LOG_INFO, "disconnecting connection"
- "from %s", client->server_ctx.client_uid);
+ "from %s", client->client_uid);
/* If lock self heal is off, then destroy the
conn object, else register a grace timer event */
@@ -527,22 +563,30 @@ server_rpc_notify (rpcsvc_t *rpc, void *xl, rpcsvc_event_t event,
trans->xl_private = NULL;
server_connection_cleanup (this, client, INTERNAL_LOCKS);
- LOCK (&client->server_ctx.fdtable_lock);
+ serv_ctx = server_ctx_get (client, this);
+
+ if (serv_ctx == NULL) {
+ gf_log (this->name, GF_LOG_INFO,
+ "server_ctx_get() failed");
+ goto out;
+ }
+
+ LOCK (&serv_ctx->fdtable_lock);
{
- if (!client->server_ctx.grace_timer) {
+ if (!serv_ctx->grace_timer) {
gf_log (this->name, GF_LOG_INFO,
"starting a grace timer for %s",
- client->server_ctx.client_uid);
+ client->client_uid);
- client->server_ctx.grace_timer =
+ serv_ctx->grace_timer =
gf_timer_call_after (this->ctx,
- conf->grace_tv,
+ conf->grace_ts,
grace_time_handler,
client);
}
}
- UNLOCK (&client->server_ctx.fdtable_lock);
+ UNLOCK (&serv_ctx->fdtable_lock);
break;
case RPCSVC_EVENT_TRANSPORT_DESTROY:
/*- conn obj has been disassociated from trans on first
@@ -639,14 +683,14 @@ server_init_grace_timer (xlator_t *this, dict_t *options,
ret = dict_get_int32 (options, "grace-timeout", &grace_timeout);
if (!ret)
- conf->grace_tv.tv_sec = grace_timeout;
+ conf->grace_ts.tv_sec = grace_timeout;
else
- conf->grace_tv.tv_sec = 10;
+ conf->grace_ts.tv_sec = 10;
gf_log (this->name, GF_LOG_DEBUG, "Server grace timeout "
- "value = %"PRIu64, conf->grace_tv.tv_sec);
+ "value = %"PRIu64, conf->grace_ts.tv_sec);
- conf->grace_tv.tv_usec = 0;
+ conf->grace_ts.tv_nsec = 0;
ret = 0;
out:
@@ -693,12 +737,6 @@ reconfigure (xlator_t *this, dict_t *options)
}
- /*ret = dict_get_str (options, "statedump-path", &statedump_path);
- if (!ret) {
- gf_path_strip_trailing_slashes (statedump_path);
- GF_FREE (this->ctx->statedump_path);
- this->ctx->statedump_path = gf_strdup (statedump_path);
- }*/
GF_OPTION_RECONF ("statedump-path", statedump_path,
options, path, out);
if (!statedump_path) {
@@ -737,6 +775,7 @@ reconfigure (xlator_t *this, dict_t *options)
(void) rpcsvc_set_allow_insecure (rpc_conf, options);
(void) rpcsvc_set_root_squash (rpc_conf, options);
+ (void) rpcsvc_set_outstanding_rpc_limit (rpc_conf, options);
list_for_each_entry (listeners, &(rpc_conf->listeners), list) {
if (listeners->trans != NULL) {
if (listeners->trans->reconfigure )
@@ -753,6 +792,26 @@ out:
return ret;
}
+static int32_t
+client_destroy_cbk (xlator_t *this, client_t *client)
+{
+ void *tmp = NULL;
+ server_ctx_t *ctx = NULL;
+
+ client_ctx_del (client, this, &tmp);
+
+ ctx = tmp;
+
+ if (ctx == NULL)
+ return 0;
+
+ gf_fd_fdtable_destroy (ctx->fdtable);
+ LOCK_DESTROY (&ctx->fdtable_lock);
+ GF_FREE (ctx);
+
+ return 0;
+}
+
int
init (xlator_t *this)
{
@@ -760,6 +819,8 @@ init (xlator_t *this)
server_conf_t *conf = NULL;
rpcsvc_listener_t *listener = NULL;
char *statedump_path = NULL;
+ gf_barrier_t *barrier = NULL;
+ char *str = NULL;
GF_VALIDATE_OR_GOTO ("init", this, out);
if (this->children == NULL) {
@@ -900,6 +961,37 @@ init (xlator_t *this)
}
}
#endif
+ /* barrier related */
+ barrier = GF_CALLOC (1, sizeof (*barrier),1);
+ if (!barrier) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "WARNING: Failed to allocate barrier");
+ ret = -1;
+ goto out;
+ }
+
+ LOCK_INIT (&barrier->lock);
+ INIT_LIST_HEAD (&barrier->queue);
+ barrier->on = _gf_false;
+
+ GF_OPTION_INIT ("barrier-queue-length", barrier->max_size,
+ int64, out);
+ GF_OPTION_INIT ("barrier-timeout", barrier->time_out,
+ uint64, out);
+
+ ret = dict_get_str (this->options, "barrier-fops", &str);
+ if (ret) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "setting barrier fops to default value");
+ }
+ ret = gf_barrier_fops_configure (this, barrier, str);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "invalid barrier fops specified");
+ goto out;
+ }
+
+ conf->barrier = barrier;
this->private = conf;
ret = 0;
@@ -953,19 +1045,57 @@ int
notify (xlator_t *this, int32_t event, void *data, ...)
{
int ret = 0;
+ int32_t val = 0;
+ dict_t *dict = NULL;
+ dict_t *output = NULL;
+ va_list ap;
+
+ dict = data;
+ va_start (ap, data);
+ output = va_arg (ap, dict_t*);
+ va_end (ap);
+
switch (event) {
+ case GF_EVENT_VOLUME_BARRIER_OP:
+ ret = dict_get_int32 (dict, "barrier", &val);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Wrong BARRIER event");
+ goto out;
+ }
+ /* !val un-barrier, if val, barrier */
+ if (val) {
+ ret = gf_barrier_start (this);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR,
+ "Barrier start failed");
+ } else {
+ ret = gf_barrier_stop (this);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR,
+ "Barrier stop failed");
+ }
+ ret = dict_set_int32 (output, "barrier-status", ret);
+ if (ret)
+ gf_log (this->name, GF_LOG_ERROR,
+ "Failed to set barrier-status in dict");
+ break;
+
+ /* todo: call default_notify to make other xlators handle it.*/
default:
default_notify (this, event, data);
break;
}
-
+out:
return ret;
}
struct xlator_fops fops;
-struct xlator_cbks cbks;
+struct xlator_cbks cbks = {
+ .client_destroy = client_destroy_cbk,
+};
struct xlator_dumpops dumpops = {
.priv = server_priv,
@@ -1062,6 +1192,23 @@ struct volume_options options[] = {
"hostnames to connect to the server. By default, all"
" connections are allowed."
},
-
+ {.key = {"barrier-timeout"},
+ .type = GF_OPTION_TYPE_INT,
+ .default_value = "60",
+ .min = 0,
+ .max = 360,
+ .description = "Barrier timeout in seconds",
+ },
+ {.key = {"barrier-queue-length"},
+ .type = GF_OPTION_TYPE_INT,
+ .default_value = "4096",
+ .min = 0,
+ .max = 16384,
+ .description = "Barrier queue length",
+ },
+ {.key = {"barrier-fops"},
+ .type = GF_OPTION_TYPE_STR,
+ .description = "Allow a comma seperated fop lists",
+ },
{ .key = {NULL} },
};