Diffstat (limited to 'xlators/protocol/client/src/client-handshake.c')
| -rw-r--r-- | xlators/protocol/client/src/client-handshake.c | 1516 |
1 file changed, 1364 insertions, 152 deletions
diff --git a/xlators/protocol/client/src/client-handshake.c b/xlators/protocol/client/src/client-handshake.c index f45cc84aa..5668fea53 100644 --- a/xlators/protocol/client/src/client-handshake.c +++ b/xlators/protocol/client/src/client-handshake.c @@ -1,20 +1,11 @@ /* - Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> + Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com> This file is part of GlusterFS. - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - <http://www.gnu.org/licenses/>. + This file is licensed to you under your choice of the GNU Lesser + General Public License, version 3 or any later version (LGPLv3 or + later), or the GNU General Public License, version 2 (GPLv2), in all + cases as published by the Free Software Foundation. */ #ifndef _CONFIG_H @@ -22,20 +13,37 @@ #include "config.h" #endif +#include "fd-lk.h" #include "client.h" #include "xlator.h" #include "defaults.h" #include "glusterfs.h" -#include "msg-xdr.h" #include "statedump.h" #include "compat-errno.h" -extern rpc_clnt_prog_t clnt3_1_fop_prog; -extern rpc_clnt_prog_t clnt3_1_mgmt_prog; +#include "glusterfs3.h" +#include "portmap-xdr.h" +#include "rpc-common-xdr.h" + +#define CLIENT_REOPEN_MAX_ATTEMPTS 1024 +extern rpc_clnt_prog_t clnt3_3_fop_prog; +extern rpc_clnt_prog_t clnt_pmap_prog; int client_ping_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe); +int client_set_lk_version_cbk (struct rpc_req *req, struct iovec *iov, + int count, void *myframe); + +int client_set_lk_version (xlator_t *this); + +typedef struct client_fd_lk_local { + int ref; + gf_boolean_t error; + gf_lock_t lock; + clnt_fd_ctx_t *fdctx; +}clnt_fd_lk_local_t; + /* Handshake */ void @@ -45,23 +53,32 @@ rpc_client_ping_timer_expired (void *data) rpc_clnt_connection_t *conn = NULL; int disconnect = 0; int transport_activity = 0; - struct timeval timeout = {0, }; + struct timespec timeout = {0, }; struct timeval current = {0, }; struct rpc_clnt *clnt = NULL; xlator_t *this = NULL; clnt_conf_t *conf = NULL; - if (!data) { + this = data; + + if (!this || !this->private) { + gf_log (THIS->name, GF_LOG_WARNING, "xlator initialization not done"); goto out; } - this = data; conf = this->private; - conn = &conf->rpc->conn; + clnt = conf->rpc; + if (!clnt) { + gf_log (this->name, GF_LOG_WARNING, "rpc not initialized"); + goto out; + } + + conn = &clnt->conn; trans = conn->trans; - if (!clnt || !trans) { + if (!trans) { + gf_log (this->name, GF_LOG_WARNING, "transport not initialized"); goto out; } @@ -84,15 +101,15 @@ rpc_client_ping_timer_expired (void *data) "ping timer expired but transport activity " "detected - not bailing transport"); timeout.tv_sec = conf->opt.ping_timeout; - timeout.tv_usec = 0; + timeout.tv_nsec = 0; conn->ping_timer = gf_timer_call_after (this->ctx, timeout, rpc_client_ping_timer_expired, (void *) this); if (conn->ping_timer == NULL) - gf_log (trans->name, GF_LOG_DEBUG, - "unable to setup timer"); + gf_log (trans->name, GF_LOG_WARNING, + "unable to 
setup ping timer"); } else { conn->ping_started = 0; @@ -103,8 +120,8 @@ rpc_client_ping_timer_expired (void *data) pthread_mutex_unlock (&conn->lock); if (disconnect) { - gf_log (trans->name, GF_LOG_ERROR, - "Server %s has not responded in the last %d " + gf_log (trans->name, GF_LOG_CRITICAL, + "server %s has not responded in the last %d " "seconds, disconnecting.", conn->trans->peerinfo.identifier, conf->opt.ping_timeout); @@ -123,19 +140,27 @@ client_start_ping (void *data) clnt_conf_t *conf = NULL; rpc_clnt_connection_t *conn = NULL; int32_t ret = -1; - struct timeval timeout = {0, }; + struct timespec timeout = {0, }; call_frame_t *frame = NULL; int frame_count = 0; - rpc_transport_t *trans = NULL; this = data; - conf = this->private; + if (!this || !this->private) { + gf_log (THIS->name, GF_LOG_WARNING, "xlator not initialized"); + goto fail; + } + conf = this->private; + if (!conf->rpc) { + gf_log (this->name, GF_LOG_WARNING, "rpc not initialized"); + goto fail; + } conn = &conf->rpc->conn; - trans = conn->trans; - if (conf->opt.ping_timeout == 0) + if (conf->opt.ping_timeout == 0) { + gf_log (this->name, GF_LOG_INFO, "ping timeout is 0, returning"); return; + } pthread_mutex_lock (&conn->lock); { @@ -154,19 +179,24 @@ client_start_ping (void *data) /* using goto looked ugly here, * hence getting out this way */ /* unlock */ + gf_log (this->name, GF_LOG_DEBUG, + "returning as transport is already disconnected" + " OR there are no frames (%d || %d)", + frame_count, !conn->connected); + pthread_mutex_unlock (&conn->lock); return; } if (frame_count < 0) { - gf_log (this->name, GF_LOG_DEBUG, + gf_log (this->name, GF_LOG_WARNING, "saved_frames->count is %"PRId64, conn->saved_frames->count); conn->saved_frames->count = 0; } timeout.tv_sec = conf->opt.ping_timeout; - timeout.tv_usec = 0; + timeout.tv_nsec = 0; conn->ping_timer = gf_timer_call_after (this->ctx, timeout, @@ -174,8 +204,8 @@ client_start_ping (void *data) (void *) this); if (conn->ping_timer == NULL) { - gf_log (this->name, GF_LOG_DEBUG, - "unable to setup timer"); + gf_log (this->name, GF_LOG_WARNING, + "unable to setup ping timer"); } else { conn->ping_started = 1; } @@ -187,11 +217,16 @@ client_start_ping (void *data) goto fail; ret = client_submit_request (this, NULL, frame, conf->handshake, - GF_HNDSK_PING, client_ping_cbk, NULL, NULL); + GF_HNDSK_PING, client_ping_cbk, NULL, + NULL, 0, NULL, 0, NULL, (xdrproc_t)NULL); + if (ret) { + gf_log (THIS->name, GF_LOG_ERROR, + "failed to start ping timer"); + } return; -fail: +fail: if (frame) { STACK_DESTROY (frame->root); } @@ -206,26 +241,47 @@ client_ping_cbk (struct rpc_req *req, struct iovec *iov, int count, { xlator_t *this = NULL; rpc_clnt_connection_t *conn = NULL; - struct timeval timeout = {0, }; + struct timespec timeout = {0, }; call_frame_t *frame = NULL; clnt_conf_t *conf = NULL; + if (!myframe) { + gf_log (THIS->name, GF_LOG_WARNING, + "frame with the request is NULL"); + goto out; + } frame = myframe; - this = frame->this; - conf = this->private; - conn = &conf->rpc->conn; - - if (req->rpc_status == -1) { - /* timer expired and transport bailed out */ - gf_log (this->name, GF_LOG_DEBUG, "timer must have expired"); + if (!this || !this->private) { + gf_log (THIS->name, GF_LOG_WARNING, + "xlator private is not set"); goto out; } + conf = this->private; + conn = &conf->rpc->conn; + pthread_mutex_lock (&conn->lock); { + if (req->rpc_status == -1) { + if (conn->ping_timer != NULL) { + gf_log (this->name, GF_LOG_WARNING, + "socket or ib related error"); + 
gf_timer_call_cancel (this->ctx, + conn->ping_timer); + conn->ping_timer = NULL; + } else { + /* timer expired and transport bailed out */ + gf_log (this->name, GF_LOG_WARNING, + "timer must have expired"); + } + + goto unlock; + } + + timeout.tv_sec = conf->opt.ping_timeout; - timeout.tv_usec = 0; + timeout.tv_nsec = 0; gf_timer_call_cancel (this->ctx, conn->ping_timer); @@ -235,53 +291,64 @@ client_ping_cbk (struct rpc_req *req, struct iovec *iov, int count, client_start_ping, (void *)this); if (conn->ping_timer == NULL) - gf_log (this->name, GF_LOG_DEBUG, - "gf_timer_call_after() returned NULL"); + gf_log (this->name, GF_LOG_WARNING, + "failed to set the ping timer"); } +unlock: pthread_mutex_unlock (&conn->lock); out: - STACK_DESTROY (frame->root); + if (frame) + STACK_DESTROY (frame->root); return 0; } int -client3_getspec_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) +client3_getspec_cbk (struct rpc_req *req, struct iovec *iov, int count, + void *myframe) { gf_getspec_rsp rsp = {0,}; call_frame_t *frame = NULL; - clnt_conf_t *conf = NULL; int ret = 0; frame = myframe; - conf = frame->this->private; - if (-1 == req->rpc_status) { + if (!frame || !frame->this) { + gf_log (THIS->name, GF_LOG_ERROR, "frame not found with the request, " + "returning EINVAL"); rsp.op_ret = -1; rsp.op_errno = EINVAL; goto out; } + if (-1 == req->rpc_status) { + gf_log (frame->this->name, GF_LOG_WARNING, + "received RPC status error, returning ENOTCONN"); + rsp.op_ret = -1; + rsp.op_errno = ENOTCONN; + goto out; + } - ret = xdr_to_getspec_rsp (*iov, &rsp); + ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gf_getspec_rsp); if (ret < 0) { - gf_log ("", GF_LOG_ERROR, "error"); + gf_log (frame->this->name, GF_LOG_ERROR, + "XDR decoding failed, returning EINVAL"); rsp.op_ret = -1; rsp.op_errno = EINVAL; goto out; } if (-1 == rsp.op_ret) { - gf_log (frame->this->name, GF_LOG_ERROR, + gf_log (frame->this->name, GF_LOG_WARNING, "failed to get the 'volume file' from server"); goto out; } out: - STACK_UNWIND_STRICT (getspec, frame, rsp.op_ret, rsp.op_errno, rsp.spec); + CLIENT_STACK_UNWIND (getspec, frame, rsp.op_ret, rsp.op_errno, + rsp.spec); /* Don't use 'GF_FREE', this is allocated by libc */ - if (rsp.spec) - free (rsp.spec); + free (rsp.spec); return 0; } @@ -292,6 +359,7 @@ int32_t client3_getspec (call_frame_t *frame, xlator_t *this, void *data) clnt_args_t *args = NULL; gf_getspec_req req = {0,}; int op_errno = ESTALE; + int ret = 0; if (!frame || !this || !data) goto unwind; @@ -301,25 +369,916 @@ int32_t client3_getspec (call_frame_t *frame, xlator_t *this, void *data) req.flags = args->flags; req.key = (char *)args->name; - client_submit_request (this, &req, frame, conf->handshake, GF_HNDSK_GETSPEC, - client3_getspec_cbk, NULL, xdr_from_getspec_req); + ret = client_submit_request (this, &req, frame, conf->handshake, + GF_HNDSK_GETSPEC, client3_getspec_cbk, + NULL, NULL, 0, NULL, 0, NULL, + (xdrproc_t)xdr_gf_getspec_req); + + if (ret) { + gf_log (this->name, GF_LOG_WARNING, + "failed to send the request"); + } return 0; unwind: - STACK_UNWIND_STRICT (getspec, frame, -1, op_errno, NULL); + CLIENT_STACK_UNWIND (getspec, frame, -1, op_errno, NULL); return 0; } int +client_notify_parents_child_up (xlator_t *this) +{ + clnt_conf_t *conf = NULL; + int ret = 0; + + conf = this->private; + ret = default_notify (this, GF_EVENT_CHILD_UP, NULL); + if (ret) + gf_log (this->name, GF_LOG_INFO, + "notify of CHILD_UP failed"); + + conf->last_sent_event = GF_EVENT_CHILD_UP; + return 0; +} + +int 
+clnt_fd_lk_reacquire_failed (xlator_t *this, clnt_fd_ctx_t *fdctx, + clnt_conf_t *conf) +{ + int ret = -1; + + GF_VALIDATE_OR_GOTO ("client", this, out); + GF_VALIDATE_OR_GOTO (this->name, conf, out); + GF_VALIDATE_OR_GOTO (this->name, fdctx, out); + + pthread_mutex_lock (&conf->lock); + { + fdctx->remote_fd = -1; + fdctx->lk_heal_state = GF_LK_HEAL_DONE; + } + pthread_mutex_unlock (&conf->lock); + + ret = 0; +out: + return ret; +} + +int +client_set_lk_version_cbk (struct rpc_req *req, struct iovec *iov, + int count, void *myframe) +{ + int32_t ret = -1; + call_frame_t *fr = NULL; + gf_set_lk_ver_rsp rsp = {0,}; + + fr = (call_frame_t *) myframe; + GF_VALIDATE_OR_GOTO ("client", fr, out); + + if (req->rpc_status == -1) { + gf_log (fr->this->name, GF_LOG_WARNING, + "received RPC status error"); + goto out; + } + + ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gf_set_lk_ver_rsp); + if (ret < 0) + gf_log (fr->this->name, GF_LOG_WARNING, + "xdr decoding failed"); + else + gf_log (fr->this->name, GF_LOG_INFO, + "Server lk version = %d", rsp.lk_ver); + + ret = 0; +out: + if (fr) + STACK_DESTROY (fr->root); + + return ret; +} + +//TODO: Check for all released fdctx and destroy them +int +client_set_lk_version (xlator_t *this) +{ + int ret = -1; + clnt_conf_t *conf = NULL; + call_frame_t *frame = NULL; + gf_set_lk_ver_req req = {0, }; + + GF_VALIDATE_OR_GOTO ("client", this, err); + + conf = (clnt_conf_t *) this->private; + + req.lk_ver = client_get_lk_ver (conf); + ret = gf_asprintf (&req.uid, "%s-%s-%d", + this->ctx->process_uuid, this->name, + this->graph->id); + if (ret == -1) + goto err; + + frame = create_frame (this, this->ctx->pool); + if (!frame) { + ret = -1; + goto out; + } + + gf_log (this->name, GF_LOG_DEBUG, "Sending SET_LK_VERSION"); + + ret = client_submit_request (this, &req, frame, + conf->handshake, + GF_HNDSK_SET_LK_VER, + client_set_lk_version_cbk, + NULL, NULL, 0, NULL, 0, NULL, + (xdrproc_t)xdr_gf_set_lk_ver_req); +out: + GF_FREE (req.uid); + return ret; +err: + gf_log (this->name, GF_LOG_WARNING, + "Failed to send SET_LK_VERSION to server"); + + return ret; +} + +int +client_fd_lk_count (fd_lk_ctx_t *lk_ctx) +{ + int count = 0; + fd_lk_ctx_node_t *fd_lk = NULL; + + GF_VALIDATE_OR_GOTO ("client", lk_ctx, err); + + LOCK (&lk_ctx->lock); + { + list_for_each_entry (fd_lk, &lk_ctx->lk_list, next) + count++; + } + UNLOCK (&lk_ctx->lock); + + return count; +err: + return -1; +} + +clnt_fd_lk_local_t * +clnt_fd_lk_local_ref (xlator_t *this, clnt_fd_lk_local_t *local) +{ + GF_VALIDATE_OR_GOTO (this->name, local, out); + + LOCK (&local->lock); + { + local->ref++; + } + UNLOCK (&local->lock); +out: + return local; +} + +int +clnt_fd_lk_local_unref (xlator_t *this, clnt_fd_lk_local_t *local) +{ + int ref = -1; + + GF_VALIDATE_OR_GOTO (this->name, local, out); + + LOCK (&local->lock); + { + ref = --local->ref; + } + UNLOCK (&local->lock); + + if (ref == 0) { + LOCK_DESTROY (&local->lock); + GF_FREE (local); + } +out: + return ref; +} + +clnt_fd_lk_local_t * +clnt_fd_lk_local_create (clnt_fd_ctx_t *fdctx) +{ + clnt_fd_lk_local_t *local = NULL; + + local = GF_CALLOC (1, sizeof (clnt_fd_lk_local_t), + gf_client_mt_clnt_fd_lk_local_t); + if (!local) + goto out; + + local->ref = 1; + local->error = _gf_false; + local->fdctx = fdctx; + + LOCK_INIT (&local->lock); +out: + return local; +} + +void +clnt_mark_fd_bad (clnt_conf_t *conf, clnt_fd_ctx_t *fdctx) +{ + pthread_mutex_lock (&conf->lock); + { + fdctx->remote_fd = -1; + } + pthread_mutex_unlock (&conf->lock); +} + +int 
+clnt_release_reopen_fd_cbk (struct rpc_req *req, struct iovec *iov, + int count, void *myframe) +{ + xlator_t *this = NULL; + call_frame_t *frame = NULL; + clnt_conf_t *conf = NULL; + clnt_fd_ctx_t *fdctx = NULL; + + frame = myframe; + this = frame->this; + fdctx = (clnt_fd_ctx_t *) frame->local; + conf = (clnt_conf_t *) this->private; + + clnt_fd_lk_reacquire_failed (this, fdctx, conf); + + fdctx->reopen_done (fdctx, this); + + frame->local = NULL; + STACK_DESTROY (frame->root); + + return 0; +} + +int +clnt_release_reopen_fd (xlator_t *this, clnt_fd_ctx_t *fdctx) +{ + int ret = -1; + clnt_conf_t *conf = NULL; + call_frame_t *frame = NULL; + gfs3_release_req req = {{0,},}; + + conf = (clnt_conf_t *) this->private; + + frame = create_frame (this, this->ctx->pool); + if (!frame) + goto out; + + frame->local = (void *) fdctx; + req.fd = fdctx->remote_fd; + + ret = client_submit_request (this, &req, frame, conf->fops, + GFS3_OP_RELEASE, + clnt_release_reopen_fd_cbk, NULL, + NULL, 0, NULL, 0, NULL, + (xdrproc_t)xdr_gfs3_releasedir_req); + return 0; + out: + if (ret) { + clnt_fd_lk_reacquire_failed (this, fdctx, conf); + fdctx->reopen_done (fdctx, this); + if (frame) { + frame->local = NULL; + STACK_DESTROY (frame->root); + } + } + return 0; +} + +int +clnt_reacquire_lock_error (xlator_t *this, clnt_fd_ctx_t *fdctx, + clnt_conf_t *conf) +{ + int32_t ret = -1; + + GF_VALIDATE_OR_GOTO ("client", this, out); + GF_VALIDATE_OR_GOTO (this->name, fdctx, out); + GF_VALIDATE_OR_GOTO (this->name, conf, out); + + clnt_release_reopen_fd (this, fdctx); + + ret = 0; +out: + return ret; +} + +gf_boolean_t +clnt_fd_lk_local_error_status (xlator_t *this, + clnt_fd_lk_local_t *local) +{ + gf_boolean_t error = _gf_false; + + LOCK (&local->lock); + { + error = local->error; + } + UNLOCK (&local->lock); + + return error; +} + +int +clnt_fd_lk_local_mark_error (xlator_t *this, + clnt_fd_lk_local_t *local) +{ + int32_t ret = -1; + clnt_conf_t *conf = NULL; + gf_boolean_t error = _gf_false; + + GF_VALIDATE_OR_GOTO ("client", this, out); + GF_VALIDATE_OR_GOTO (this->name, local, out); + + conf = (clnt_conf_t *) this->private; + + LOCK (&local->lock); + { + error = local->error; + local->error = _gf_true; + } + UNLOCK (&local->lock); + + if (!error) + clnt_reacquire_lock_error (this, local->fdctx, conf); + ret = 0; +out: + return ret; +} + +int +client_reacquire_lock_cbk (struct rpc_req *req, struct iovec *iov, + int count, void *myframe) +{ + int32_t ret = -1; + xlator_t *this = NULL; + gfs3_lk_rsp rsp = {0,}; + call_frame_t *frame = NULL; + clnt_conf_t *conf = NULL; + clnt_fd_ctx_t *fdctx = NULL; + clnt_fd_lk_local_t *local = NULL; + struct gf_flock lock = {0,}; + + frame = (call_frame_t *) myframe; + this = frame->this; + local = (clnt_fd_lk_local_t *) frame->local; + conf = (clnt_conf_t *) this->private; + + if (req->rpc_status == -1) { + gf_log ("client", GF_LOG_WARNING, + "request failed at rpc"); + goto out; + } + + ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gfs3_lk_rsp); + if (ret < 0) { + gf_log (this->name, GF_LOG_ERROR, "XDR decoding failed"); + goto out; + } + + if (rsp.op_ret == -1) { + gf_log (this->name, GF_LOG_ERROR, "lock request failed"); + ret = -1; + goto out; + } + + fdctx = local->fdctx; + + gf_proto_flock_to_flock (&rsp.flock, &lock); + + gf_log (this->name, GF_LOG_DEBUG, "%s type lock reacquired on file " + "with gfid %s from %"PRIu64 " to %"PRIu64, + get_lk_type (lock.l_type), uuid_utoa (fdctx->gfid), + lock.l_start, lock.l_start + lock.l_len); + + if (!clnt_fd_lk_local_error_status 
(this, local) && + clnt_fd_lk_local_unref (this, local) == 0) { + pthread_mutex_lock (&conf->lock); + { + fdctx->lk_heal_state = GF_LK_HEAL_DONE; + } + pthread_mutex_unlock (&conf->lock); + + fdctx->reopen_done (fdctx, this); + } + + ret = 0; +out: + if (ret < 0) { + clnt_fd_lk_local_mark_error (this, local); + + clnt_fd_lk_local_unref (this, local); + } + + frame->local = NULL; + STACK_DESTROY (frame->root); + + return ret; +} + +int +_client_reacquire_lock (xlator_t *this, clnt_fd_ctx_t *fdctx) +{ + int32_t ret = -1; + int32_t gf_cmd = 0; + int32_t gf_type = 0; + gfs3_lk_req req = {{0,},}; + struct gf_flock flock = {0,}; + fd_lk_ctx_t *lk_ctx = NULL; + clnt_fd_lk_local_t *local = NULL; + fd_lk_ctx_node_t *fd_lk = NULL; + call_frame_t *frame = NULL; + clnt_conf_t *conf = NULL; + + conf = (clnt_conf_t *) this->private; + lk_ctx = fdctx->lk_ctx; + + local = clnt_fd_lk_local_create (fdctx); + if (!local) { + gf_log (this->name, GF_LOG_WARNING, "clnt_fd_lk_local_create " + "failed, aborting reacquring of locks on %s.", + uuid_utoa (fdctx->gfid)); + clnt_reacquire_lock_error (this, fdctx, conf); + goto out; + } + + list_for_each_entry (fd_lk, &lk_ctx->lk_list, next) { + memcpy (&flock, &fd_lk->user_flock, + sizeof (struct gf_flock)); + + /* Always send F_SETLK even if the cmd was F_SETLKW */ + /* to avoid frame being blocked if lock cannot be granted. */ + ret = client_cmd_to_gf_cmd (F_SETLK, &gf_cmd); + if (ret) { + gf_log (this->name, GF_LOG_WARNING, + "client_cmd_to_gf_cmd failed, " + "aborting reacquiring of locks"); + break; + } + + gf_type = client_type_to_gf_type (flock.l_type); + req.fd = fdctx->remote_fd; + req.cmd = gf_cmd; + req.type = gf_type; + (void) gf_proto_flock_from_flock (&req.flock, + &flock); + + memcpy (req.gfid, fdctx->gfid, 16); + + frame = create_frame (this, this->ctx->pool); + if (!frame) { + ret = -1; + break; + } + + frame->local = clnt_fd_lk_local_ref (this, local); + frame->root->lk_owner = fd_lk->user_flock.l_owner; + + ret = client_submit_request (this, &req, frame, + conf->fops, GFS3_OP_LK, + client_reacquire_lock_cbk, + NULL, NULL, 0, NULL, 0, NULL, + (xdrproc_t)xdr_gfs3_lk_req); + if (ret) { + gf_log (this->name, GF_LOG_WARNING, + "reacquiring locks failed on file with gfid %s", + uuid_utoa (fdctx->gfid)); + break; + } + + ret = 0; + frame = NULL; + } + + if (local) + (void) clnt_fd_lk_local_unref (this, local); +out: + return ret; +} + +int +client_reacquire_lock (xlator_t *this, clnt_fd_ctx_t *fdctx) +{ + int32_t ret = -1; + fd_lk_ctx_t *lk_ctx = NULL; + + GF_VALIDATE_OR_GOTO ("client", this, out); + GF_VALIDATE_OR_GOTO (this->name, fdctx, out); + + if (client_fd_lk_list_empty (fdctx->lk_ctx, _gf_false)) { + gf_log (this->name, GF_LOG_DEBUG, + "fd lock list is empty"); + fdctx->reopen_done (fdctx, this); + } else { + lk_ctx = fdctx->lk_ctx; + + LOCK (&lk_ctx->lock); + { + (void) _client_reacquire_lock (this, fdctx); + } + UNLOCK (&lk_ctx->lock); + } + ret = 0; +out: + return ret; +} + +void +client_default_reopen_done (clnt_fd_ctx_t *fdctx, xlator_t *this) +{ + gf_log_callingfn (this->name, GF_LOG_WARNING, + "This function should never be called"); +} + +void +client_reopen_done (clnt_fd_ctx_t *fdctx, xlator_t *this) +{ + clnt_conf_t *conf = NULL; + gf_boolean_t destroy = _gf_false; + + conf = this->private; + + pthread_mutex_lock (&conf->lock); + { + fdctx->reopen_attempts = 0; + if (!fdctx->released) + list_add_tail (&fdctx->sfd_pos, &conf->saved_fds); + else + destroy = _gf_true; + fdctx->reopen_done = client_default_reopen_done; + } + 
pthread_mutex_unlock (&conf->lock); + + if (destroy) + client_fdctx_destroy (this, fdctx); +} + +void +client_child_up_reopen_done (clnt_fd_ctx_t *fdctx, xlator_t *this) +{ + clnt_conf_t *conf = NULL; + uint64_t fd_count = 0; + + conf = this->private; + + LOCK (&conf->rec_lock); + { + fd_count = --(conf->reopen_fd_count); + } + UNLOCK (&conf->rec_lock); + + client_reopen_done (fdctx, this); + if (fd_count == 0) { + gf_log (this->name, GF_LOG_INFO, + "last fd open'd/lock-self-heal'd - notifying CHILD-UP"); + client_set_lk_version (this); + client_notify_parents_child_up (this); + } +} + +int +client3_3_reopen_cbk (struct rpc_req *req, struct iovec *iov, int count, + void *myframe) +{ + int32_t ret = -1; + gfs3_open_rsp rsp = {0,}; + gf_boolean_t attempt_lock_recovery = _gf_false; + clnt_local_t *local = NULL; + clnt_conf_t *conf = NULL; + clnt_fd_ctx_t *fdctx = NULL; + call_frame_t *frame = NULL; + xlator_t *this = NULL; + + frame = myframe; + this = frame->this; + conf = this->private; + local = frame->local; + fdctx = local->fdctx; + + if (-1 == req->rpc_status) { + gf_log (frame->this->name, GF_LOG_WARNING, + "received RPC status error, returning ENOTCONN"); + rsp.op_ret = -1; + rsp.op_errno = ENOTCONN; + goto out; + } + + ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gfs3_open_rsp); + if (ret < 0) { + gf_log (frame->this->name, GF_LOG_ERROR, "XDR decoding failed"); + rsp.op_ret = -1; + rsp.op_errno = EINVAL; + goto out; + } + + if (rsp.op_ret < 0) { + gf_log (frame->this->name, GF_LOG_WARNING, + "reopen on %s failed (%s)", + local->loc.path, strerror (rsp.op_errno)); + } else { + gf_log (frame->this->name, GF_LOG_DEBUG, + "reopen on %s succeeded (remote-fd = %"PRId64")", + local->loc.path, rsp.fd); + } + + if (rsp.op_ret == -1) { + ret = -1; + goto out; + } + + pthread_mutex_lock (&conf->lock); + { + fdctx->remote_fd = rsp.fd; + if (!fdctx->released) { + if (conf->lk_heal && + !client_fd_lk_list_empty (fdctx->lk_ctx, + _gf_false)) { + attempt_lock_recovery = _gf_true; + fdctx->lk_heal_state = GF_LK_HEAL_IN_PROGRESS; + } + } + } + pthread_mutex_unlock (&conf->lock); + + ret = 0; + + if (attempt_lock_recovery) { + /* Delay decrementing the reopen fd count untill all the + locks corresponding to this fd are acquired.*/ + gf_log (this->name, GF_LOG_DEBUG, "acquiring locks " + "on %s", local->loc.path); + ret = client_reacquire_lock (frame->this, local->fdctx); + if (ret) { + clnt_reacquire_lock_error (this, local->fdctx, conf); + gf_log (this->name, GF_LOG_WARNING, "acquiring locks " + "failed on %s", local->loc.path); + } + } + +out: + if (!attempt_lock_recovery) + fdctx->reopen_done (fdctx, this); + + frame->local = NULL; + STACK_DESTROY (frame->root); + + client_local_wipe (local); + + return 0; +} + +int +client3_3_reopendir_cbk (struct rpc_req *req, struct iovec *iov, int count, + void *myframe) +{ + int32_t ret = -1; + gfs3_open_rsp rsp = {0,}; + clnt_local_t *local = NULL; + clnt_conf_t *conf = NULL; + clnt_fd_ctx_t *fdctx = NULL; + call_frame_t *frame = NULL; + + frame = myframe; + local = frame->local; + fdctx = local->fdctx; + conf = frame->this->private; + + + if (-1 == req->rpc_status) { + gf_log (frame->this->name, GF_LOG_WARNING, + "received RPC status error, returning ENOTCONN"); + rsp.op_ret = -1; + rsp.op_errno = ENOTCONN; + goto out; + } + + ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gfs3_opendir_rsp); + if (ret < 0) { + gf_log (frame->this->name, GF_LOG_ERROR, "XDR decoding failed"); + rsp.op_ret = -1; + rsp.op_errno = EINVAL; + goto out; + } + + if (rsp.op_ret < 
0) { + gf_log (frame->this->name, GF_LOG_WARNING, + "reopendir on %s failed (%s)", + local->loc.path, strerror (rsp.op_errno)); + } else { + gf_log (frame->this->name, GF_LOG_INFO, + "reopendir on %s succeeded (fd = %"PRId64")", + local->loc.path, rsp.fd); + } + + if (-1 == rsp.op_ret) { + ret = -1; + goto out; + } + + pthread_mutex_lock (&conf->lock); + { + fdctx->remote_fd = rsp.fd; + } + pthread_mutex_unlock (&conf->lock); + +out: + fdctx->reopen_done (fdctx, frame->this); + + frame->local = NULL; + STACK_DESTROY (frame->root); + client_local_wipe (local); + + return 0; +} + +static int +protocol_client_reopendir (clnt_fd_ctx_t *fdctx, xlator_t *this) +{ + int ret = -1; + gfs3_opendir_req req = {{0,},}; + clnt_local_t *local = NULL; + call_frame_t *frame = NULL; + clnt_conf_t *conf = NULL; + + conf = this->private; + + local = mem_get0 (this->local_pool); + if (!local) { + ret = -1; + goto out; + } + local->fdctx = fdctx; + + uuid_copy (local->loc.gfid, fdctx->gfid); + ret = loc_path (&local->loc, NULL); + if (ret < 0) + goto out; + + frame = create_frame (this, this->ctx->pool); + if (!frame) { + ret = -1; + goto out; + } + + memcpy (req.gfid, fdctx->gfid, 16); + + gf_log (frame->this->name, GF_LOG_DEBUG, + "attempting reopen on %s", local->loc.path); + + frame->local = local; + + ret = client_submit_request (this, &req, frame, conf->fops, + GFS3_OP_OPENDIR, + client3_3_reopendir_cbk, NULL, + NULL, 0, NULL, 0, NULL, + (xdrproc_t)xdr_gfs3_opendir_req); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "failed to send the re-opendir request"); + } + + return 0; + +out: + if (frame) { + frame->local = NULL; + STACK_DESTROY (frame->root); + } + + if (local) + client_local_wipe (local); + + fdctx->reopen_done (fdctx, this); + + return 0; + +} + +static int +protocol_client_reopenfile (clnt_fd_ctx_t *fdctx, xlator_t *this) +{ + int ret = -1; + gfs3_open_req req = {{0,},}; + clnt_local_t *local = NULL; + call_frame_t *frame = NULL; + clnt_conf_t *conf = NULL; + + conf = this->private; + + frame = create_frame (this, this->ctx->pool); + if (!frame) { + ret = -1; + goto out; + } + + local = mem_get0 (this->local_pool); + if (!local) { + ret = -1; + goto out; + } + + local->fdctx = fdctx; + uuid_copy (local->loc.gfid, fdctx->gfid); + ret = loc_path (&local->loc, NULL); + if (ret < 0) + goto out; + + frame->local = local; + + memcpy (req.gfid, fdctx->gfid, 16); + req.flags = gf_flags_from_flags (fdctx->flags); + req.flags = req.flags & (~(O_TRUNC|O_CREAT|O_EXCL)); + + gf_log (frame->this->name, GF_LOG_DEBUG, + "attempting reopen on %s", local->loc.path); + + ret = client_submit_request (this, &req, frame, conf->fops, + GFS3_OP_OPEN, client3_3_reopen_cbk, NULL, + NULL, 0, NULL, 0, NULL, + (xdrproc_t)xdr_gfs3_open_req); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "failed to send the re-open request"); + } + + return 0; + +out: + if (frame) { + frame->local = NULL; + STACK_DESTROY (frame->root); + } + + if (local) + client_local_wipe (local); + + fdctx->reopen_done (fdctx, this); + + return 0; + +} + +static void +protocol_client_reopen (clnt_fd_ctx_t *fdctx, xlator_t *this) +{ + if (fdctx->is_dir) + protocol_client_reopendir (fdctx, this); + else + protocol_client_reopenfile (fdctx, this); +} + +gf_boolean_t +__is_fd_reopen_in_progress (clnt_fd_ctx_t *fdctx) +{ + if (fdctx->reopen_done == client_default_reopen_done) + return _gf_false; + return _gf_true; +} + +void +client_attempt_reopen (fd_t *fd, xlator_t *this) +{ + clnt_conf_t *conf = NULL; + clnt_fd_ctx_t *fdctx = NULL; + gf_boolean_t 
reopen = _gf_false; + + if (!fd || !this) + goto out; + + conf = this->private; + pthread_mutex_lock (&conf->lock); + { + fdctx = this_fd_get_ctx (fd, this); + if (!fdctx) + goto unlock; + if (__is_fd_reopen_in_progress (fdctx)) + goto unlock; + if (fdctx->remote_fd != -1) + goto unlock; + + if (fdctx->reopen_attempts == CLIENT_REOPEN_MAX_ATTEMPTS) { + reopen = _gf_true; + fdctx->reopen_done = client_reopen_done; + list_del_init (&fdctx->sfd_pos); + } else { + fdctx->reopen_attempts++; + } + } +unlock: + pthread_mutex_unlock (&conf->lock); + if (reopen) + protocol_client_reopen (fdctx, this); +out: + return; +} + +int client_post_handshake (call_frame_t *frame, xlator_t *this) { clnt_conf_t *conf = NULL; clnt_fd_ctx_t *tmp = NULL; clnt_fd_ctx_t *fdctx = NULL; - xlator_list_t *parent = NULL; struct list_head reopen_head; + int count = 0; + if (!this || !this->private) goto out; @@ -333,29 +1292,33 @@ client_post_handshake (call_frame_t *frame, xlator_t *this) if (fdctx->remote_fd != -1) continue; + fdctx->reopen_done = client_child_up_reopen_done; list_del_init (&fdctx->sfd_pos); list_add_tail (&fdctx->sfd_pos, &reopen_head); + count++; } } pthread_mutex_unlock (&conf->lock); - list_for_each_entry_safe (fdctx, tmp, &reopen_head, sfd_pos) { - list_del_init (&fdctx->sfd_pos); - - if (fdctx->is_dir) - protocol_client_reopendir (this, fdctx); - else - protocol_client_reopen (this, fdctx); - } + /* Delay notifying CHILD_UP to parents + until all locks are recovered */ + if (count > 0) { + gf_log (this->name, GF_LOG_INFO, + "%d fds open - Delaying child_up until they are re-opened", + count); + client_save_number_fds (conf, count); - parent = this->parents; + list_for_each_entry_safe (fdctx, tmp, &reopen_head, sfd_pos) { + list_del_init (&fdctx->sfd_pos); - while (parent) { - xlator_notify (parent->xlator, GF_EVENT_CHILD_UP, - this); - parent = parent->next; + protocol_client_reopen (fdctx, this); + } + } else { + gf_log (this->name, GF_LOG_DEBUG, + "No fds to open - notifying all parents child up"); + client_set_lk_version (this); + client_notify_parents_child_up (this); } - out: return 0; } @@ -367,31 +1330,30 @@ client_setvolume_cbk (struct rpc_req *req, struct iovec *iov, int count, void *m clnt_conf_t *conf = NULL; xlator_t *this = NULL; dict_t *reply = NULL; - xlator_list_t *parent = NULL; char *process_uuid = NULL; char *remote_error = NULL; char *remote_subvol = NULL; - rpc_transport_t *peer_trans = NULL; gf_setvolume_rsp rsp = {0,}; - uint64_t peertrans_int = 0; int ret = 0; - int op_ret = 0; - int op_errno = 0; + int32_t op_ret = 0; + int32_t op_errno = 0; + gf_boolean_t auth_fail = _gf_false; + uint32_t lk_ver = 0; frame = myframe; this = frame->this; conf = this->private; if (-1 == req->rpc_status) { + gf_log (frame->this->name, GF_LOG_WARNING, + "received RPC status error"); op_ret = -1; - op_errno = EINVAL; goto out; } - ret = xdr_to_setvolume_rsp (*iov, &rsp); + ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gf_setvolume_rsp); if (ret < 0) { - gf_log ("", GF_LOG_ERROR, "error"); - op_errno = EINVAL; + gf_log (this->name, GF_LOG_ERROR, "XDR decoding failed"); op_ret = -1; goto out; } @@ -399,7 +1361,8 @@ client_setvolume_cbk (struct rpc_req *req, struct iovec *iov, int count, void *m op_errno = gf_error_to_errno (rsp.op_errno); if (-1 == rsp.op_ret) { gf_log (frame->this->name, GF_LOG_WARNING, - "failed to set the volume"); + "failed to set the volume (%s)", + (op_errno)? 
strerror (op_errno) : "--"); } reply = dict_new (); @@ -410,7 +1373,7 @@ client_setvolume_cbk (struct rpc_req *req, struct iovec *iov, int count, void *m ret = dict_unserialize (rsp.dict.dict_val, rsp.dict.dict_len, &reply); if (ret < 0) { - gf_log (frame->this->name, GF_LOG_DEBUG, + gf_log (frame->this->name, GF_LOG_WARNING, "failed to unserialize buffer to dict"); goto out; } @@ -418,13 +1381,13 @@ client_setvolume_cbk (struct rpc_req *req, struct iovec *iov, int count, void *m ret = dict_get_str (reply, "ERROR", &remote_error); if (ret < 0) { - gf_log (this->name, GF_LOG_DEBUG, + gf_log (this->name, GF_LOG_WARNING, "failed to get ERROR string from reply dict"); } ret = dict_get_str (reply, "process-uuid", &process_uuid); if (ret < 0) { - gf_log (this->name, GF_LOG_DEBUG, + gf_log (this->name, GF_LOG_WARNING, "failed to get 'process-uuid' from reply dict"); } @@ -433,40 +1396,61 @@ client_setvolume_cbk (struct rpc_req *req, struct iovec *iov, int count, void *m "SETVOLUME on remote-host failed: %s", remote_error ? remote_error : strerror (op_errno)); errno = op_errno; + if (remote_error && + (strcmp ("Authentication failed", remote_error) == 0)) { + auth_fail = _gf_true; + op_ret = 0; + } if (op_errno == ESTALE) { - parent = this->parents; - while (parent) { - xlator_notify (parent->xlator, - GF_EVENT_VOLFILE_MODIFIED, - this); - parent = parent->next; - } + ret = default_notify (this, GF_EVENT_VOLFILE_MODIFIED, NULL); + if (ret) + gf_log (this->name, GF_LOG_INFO, + "notify of VOLFILE_MODIFIED failed"); + conf->last_sent_event = GF_EVENT_VOLFILE_MODIFIED; } goto out; } + ret = dict_get_str (this->options, "remote-subvolume", &remote_subvol); - if (!remote_subvol) + if (ret || !remote_subvol) { + gf_log (this->name, GF_LOG_WARNING, + "failed to find key 'remote-subvolume' in the options"); goto out; + } + + ret = dict_get_uint32 (reply, "clnt-lk-version", &lk_ver); + if (ret) { + gf_log (this->name, GF_LOG_WARNING, + "failed to find key 'clnt-lk-version' in the options"); + goto out; + } - if (process_uuid && + gf_log (this->name, GF_LOG_DEBUG, "clnt-lk-version = %d, " + "server-lk-version = %d", client_get_lk_ver (conf), lk_ver); + /* TODO: currently setpeer path is broken */ + /* + if (process_uuid && req->conn && !strcmp (this->ctx->process_uuid, process_uuid)) { + rpc_transport_t *peer_trans = NULL; + uint64_t peertrans_int = 0; + ret = dict_get_uint64 (reply, "transport-ptr", &peertrans_int); - - peer_trans = (void *) (long) (peertrans_int); + if (ret) + goto out; gf_log (this->name, GF_LOG_WARNING, "attaching to the local volume '%s'", remote_subvol); - if (req->conn) { - /* TODO: Some issues with this logic at present */ - //rpc_transport_setpeer (req->conn->trans, peer_trans); - } + peer_trans = (void *) (long) (peertrans_int); + + rpc_transport_setpeer (req->conn->trans, peer_trans); } + */ - gf_log (this->name, GF_LOG_NORMAL, + gf_log (this->name, GF_LOG_INFO, "Connected to %s, attached to remote volume '%s'.", conf->rpc->conn.trans->peerinfo.identifier, remote_subvol); @@ -475,43 +1459,66 @@ client_setvolume_cbk (struct rpc_req *req, struct iovec *iov, int count, void *m op_ret = 0; conf->connecting = 0; - - /* TODO: more to test */ - client_post_handshake (frame, frame->this); + conf->connected = 1; + + conf->need_different_port = 0; + + if (lk_ver != client_get_lk_ver (conf)) { + gf_log (this->name, GF_LOG_INFO, "Server and Client " + "lk-version numbers are not same, reopening the fds"); + client_mark_fd_bad (this); + client_post_handshake (frame, frame->this); + } else { + 
/*TODO: Traverse the saved fd list, and send + release to the server on fd's that were closed + during grace period */ + gf_log (this->name, GF_LOG_INFO, "Server and Client " + "lk-version numbers are same, no need to " + "reopen the fds"); + } out: - + if (auth_fail) { + gf_log (this->name, GF_LOG_INFO, "sending AUTH_FAILED event"); + ret = default_notify (this, GF_EVENT_AUTH_FAILED, NULL); + if (ret) + gf_log (this->name, GF_LOG_INFO, + "notify of AUTH_FAILED failed"); + conf->connecting = 0; + conf->connected = 0; + conf->last_sent_event = GF_EVENT_AUTH_FAILED; + ret = -1; + } if (-1 == op_ret) { /* Let the connection/re-connection happen in * background, for now, don't hang here, * tell the parents that i am all ok.. */ - parent = this->parents; - while (parent) { - xlator_notify (parent->xlator, - GF_EVENT_CHILD_CONNECTING, this); - parent = parent->next; - } - + gf_log (this->name, GF_LOG_INFO, "sending CHILD_CONNECTING event"); + ret = default_notify (this, GF_EVENT_CHILD_CONNECTING, NULL); + if (ret) + gf_log (this->name, GF_LOG_INFO, + "notify of CHILD_CONNECTING failed"); + conf->last_sent_event = GF_EVENT_CHILD_CONNECTING; conf->connecting= 1; + ret = 0; } - if (rsp.dict.dict_val) - free (rsp.dict.dict_val); + free (rsp.dict.dict_val); STACK_DESTROY (frame->root); if (reply) dict_unref (reply); - return 0; + return ret; } int client_setvolume (xlator_t *this, struct rpc_clnt *rpc) { int ret = 0; - gf_setvolume_req req = {0,}; + gf_setvolume_req req = {{0,},}; call_frame_t *fr = NULL; char *process_uuid_xl = NULL; clnt_conf_t *conf = NULL; @@ -541,13 +1548,19 @@ client_setvolume (xlator_t *this, struct rpc_clnt *rpc) } } - ret = gf_asprintf (&process_uuid_xl, "%s-%s", this->ctx->process_uuid, - this->name); + /* With multiple graphs possible in the same process, we need a + field to bring the uniqueness. 
Graph-ID should be enough to get the + job done + */ + ret = gf_asprintf (&process_uuid_xl, "%s-%s-%d", + this->ctx->process_uuid, this->name, + this->graph->id); if (-1 == ret) { gf_log (this->name, GF_LOG_ERROR, "asprintf failed while setting process_uuid"); goto fail; } + ret = dict_set_dynstr (options, "process-uuid", process_uuid_xl); if (ret < 0) { gf_log (this->name, GF_LOG_ERROR, @@ -556,21 +1569,44 @@ client_setvolume (xlator_t *this, struct rpc_clnt *rpc) goto fail; } + ret = dict_set_str (options, "client-version", PACKAGE_VERSION); + if (ret < 0) { + gf_log (this->name, GF_LOG_WARNING, + "failed to set client-version(%s) in handshake msg", + PACKAGE_VERSION); + } + if (this->ctx->cmd_args.volfile_server) { - if (this->ctx->cmd_args.volfile_id) + if (this->ctx->cmd_args.volfile_id) { ret = dict_set_str (options, "volfile-key", this->ctx->cmd_args.volfile_id); + if (ret) + gf_log (this->name, GF_LOG_ERROR, + "failed to set 'volfile-key'"); + } ret = dict_set_uint32 (options, "volfile-checksum", this->graph->volfile_checksum); + if (ret) + gf_log (this->name, GF_LOG_ERROR, + "failed to set 'volfile-checksum'"); + } + + ret = dict_set_int16 (options, "clnt-lk-version", + client_get_lk_ver (conf)); + if (ret < 0) { + gf_log (this->name, GF_LOG_WARNING, + "failed to set clnt-lk-version(%"PRIu32") in handshake msg", + client_get_lk_ver (conf)); } - req.dict.dict_len = dict_serialized_length (options); - if (req.dict.dict_len < 0) { + ret = dict_serialized_length (options); + if (ret < 0) { gf_log (this->name, GF_LOG_ERROR, "failed to get serialized length of dict"); ret = -1; goto fail; } + req.dict.dict_len = ret; req.dict.dict_val = GF_CALLOC (1, req.dict.dict_len, gf_client_mt_clnt_req_buf_t); ret = dict_serialize (options, req.dict.dict_val); @@ -586,11 +1622,11 @@ client_setvolume (xlator_t *this, struct rpc_clnt *rpc) ret = client_submit_request (this, &req, fr, conf->handshake, GF_HNDSK_SETVOLUME, client_setvolume_cbk, - NULL, xdr_from_setvolume_req); + NULL, NULL, 0, NULL, 0, NULL, + (xdrproc_t)xdr_gf_setvolume_req); fail: - if (req.dict.dict_val) - GF_FREE (req.dict.dict_val); + GF_FREE (req.dict.dict_val); return ret; } @@ -602,26 +1638,20 @@ select_server_supported_programs (xlator_t *this, gf_prog_detail *prog) clnt_conf_t *conf = NULL; int ret = -1; - if (!this || !prog) + if (!this || !prog) { + gf_log (THIS->name, GF_LOG_WARNING, + "xlator not found OR RPC program not found"); goto out; + } conf = this->private; trav = prog; while (trav) { /* Select 'programs' */ - if ((clnt3_1_fop_prog.prognum == trav->prognum) && - (clnt3_1_fop_prog.progver == trav->progver)) { - conf->fops = &clnt3_1_fop_prog; - gf_log (this->name, GF_LOG_INFO, - "Using Program %s, Num (%"PRId64"), " - "Version (%"PRId64")", - trav->progname, trav->prognum, trav->progver); - ret = 0; - } - if ((clnt3_1_mgmt_prog.prognum == trav->prognum) && - (clnt3_1_mgmt_prog.progver == trav->progver)) { - conf->mgmt = &clnt3_1_mgmt_prog; + if ((clnt3_3_fop_prog.prognum == trav->prognum) && + (clnt3_3_fop_prog.progver == trav->progver)) { + conf->fops = &clnt3_3_fop_prog; gf_log (this->name, GF_LOG_INFO, "Using Program %s, Num (%"PRId64"), " "Version (%"PRId64")", @@ -640,8 +1670,165 @@ out: return ret; } + +int +server_has_portmap (xlator_t *this, gf_prog_detail *prog) +{ + gf_prog_detail *trav = NULL; + int ret = -1; + + if (!this || !prog) { + gf_log (THIS->name, GF_LOG_WARNING, + "xlator not found OR RPC program not found"); + goto out; + } + + trav = prog; + + while (trav) { + if ((trav->prognum == 
GLUSTER_PMAP_PROGRAM) && + (trav->progver == GLUSTER_PMAP_VERSION)) { + gf_log (this->name, GF_LOG_DEBUG, + "detected portmapper on server"); + ret = 0; + break; + } + trav = trav->next; + } + +out: + return ret; +} + + +int +client_query_portmap_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) +{ + struct pmap_port_by_brick_rsp rsp = {0,}; + call_frame_t *frame = NULL; + clnt_conf_t *conf = NULL; + int ret = -1; + struct rpc_clnt_config config = {0, }; + xlator_t *this = NULL; + + frame = myframe; + if (!frame || !frame->this || !frame->this->private) { + gf_log (THIS->name, GF_LOG_WARNING, + "frame not found with rpc request"); + goto out; + } + this = frame->this; + conf = frame->this->private; + + if (-1 == req->rpc_status) { + gf_log (this->name, GF_LOG_WARNING, + "received RPC status error, try again later"); + goto out; + } + + ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_pmap_port_by_brick_rsp); + if (ret < 0) { + gf_log (this->name, GF_LOG_ERROR, "XDR decoding failed"); + goto out; + } + + if (-1 == rsp.op_ret) { + ret = -1; + gf_log (this->name, ((!conf->portmap_err_logged) ? + GF_LOG_ERROR : GF_LOG_DEBUG), + "failed to get the port number for remote subvolume. " + "Please run 'gluster volume status' on server to see " + "if brick process is running."); + conf->portmap_err_logged = 1; + goto out; + } + + conf->portmap_err_logged = 0; + conf->disconnect_err_logged = 0; + + config.remote_port = rsp.port; + rpc_clnt_reconfig (conf->rpc, &config); + + conf->skip_notify = 1; + conf->quick_reconnect = 1; + +out: + if (frame) + STACK_DESTROY (frame->root); + + if (conf) { + /* Need this to connect the same transport on different port */ + /* ie, glusterd to glusterfsd */ + rpc_transport_disconnect (conf->rpc->conn.trans); + } + + return ret; +} + + +int +client_query_portmap (xlator_t *this, struct rpc_clnt *rpc) +{ + int ret = -1; + pmap_port_by_brick_req req = {0,}; + call_frame_t *fr = NULL; + clnt_conf_t *conf = NULL; + dict_t *options = NULL; + char *remote_subvol = NULL; + char *xprt = NULL; + char brick_name[PATH_MAX] = {0,}; + + options = this->options; + conf = this->private; + + ret = dict_get_str (options, "remote-subvolume", &remote_subvol); + if (ret < 0) { + gf_log (this->name, GF_LOG_ERROR, + "remote-subvolume not set in volfile"); + goto fail; + } + + req.brick = remote_subvol; + + /* FIXME: Dirty work around */ + if (!dict_get_str (options, "transport-type", &xprt)) { + /* This logic is required only in case of 'rdma' client + transport-type and the volume is of 'tcp,rdma' + transport type. 
*/ + if (!strcmp (xprt, "rdma")) { + if (!conf->need_different_port) { + snprintf (brick_name, PATH_MAX, "%s.rdma", + remote_subvol); + req.brick = brick_name; + conf->need_different_port = 1; + conf->skip_notify = 1; + } else { + conf->need_different_port = 0; + conf->skip_notify = 0; + } + } + } + + fr = create_frame (this, this->ctx->pool); + if (!fr) { + ret = -1; + goto fail; + } + + ret = client_submit_request (this, &req, fr, &clnt_pmap_prog, + GF_PMAP_PORTBYBRICK, + client_query_portmap_cbk, + NULL, NULL, 0, NULL, 0, NULL, + (xdrproc_t)xdr_pmap_port_by_brick_req); + +fail: + return ret; +} + + int -client_dump_version_cbk (struct rpc_req *req, struct iovec *iov, int count, void *myframe) +client_dump_version_cbk (struct rpc_req *req, struct iovec *iov, int count, + void *myframe) { gf_dump_rsp rsp = {0,}; gf_prog_detail *trav = NULL; @@ -654,28 +1841,33 @@ client_dump_version_cbk (struct rpc_req *req, struct iovec *iov, int count, void conf = frame->this->private; if (-1 == req->rpc_status) { - gf_log ("", 1, "some error, retry again later"); + gf_log (frame->this->name, GF_LOG_WARNING, + "received RPC status error"); goto out; } - ret = xdr_to_dump_rsp (*iov, &rsp); + ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gf_dump_rsp); if (ret < 0) { - gf_log ("", GF_LOG_ERROR, "error"); + gf_log (frame->this->name, GF_LOG_ERROR, "XDR decoding failed"); goto out; } if (-1 == rsp.op_ret) { - gf_log (frame->this->name, GF_LOG_ERROR, + gf_log (frame->this->name, GF_LOG_WARNING, "failed to get the 'versions' from server"); goto out; } + if (server_has_portmap (frame->this, rsp.prog) == 0) { + ret = client_query_portmap (frame->this, conf->rpc); + goto out; + } + /* Check for the proper version string */ /* Reply in "Name:Program-Number:Program-Version,..." format */ ret = select_server_supported_programs (frame->this, rsp.prog); if (ret) { gf_log (frame->this->name, GF_LOG_ERROR, - "Server versions are not present in this " - "release"); + "server doesn't support the version"); goto out; } @@ -687,12 +1879,17 @@ out: trav = rsp.prog; while (trav) { next = trav->next; + free (trav->progname); free (trav); trav = next; } } STACK_DESTROY (frame->root); + + if (ret != 0) + rpc_transport_disconnect (conf->rpc->conn.trans); + return ret; } @@ -705,8 +1902,10 @@ client_handshake (xlator_t *this, struct rpc_clnt *rpc) int ret = 0; conf = this->private; - if (!conf->handshake) + if (!conf->handshake) { + gf_log (this->name, GF_LOG_WARNING, "handshake program not found"); goto out; + } frame = create_frame (this, this->ctx->pool); if (!frame) @@ -715,7 +1914,8 @@ client_handshake (xlator_t *this, struct rpc_clnt *rpc) req.gfs_id = 0xbabe; ret = client_submit_request (this, &req, frame, conf->dump, GF_DUMP_DUMP, client_dump_version_cbk, - NULL, xdr_from_dump_req); + NULL, NULL, 0, NULL, 0, + NULL, (xdrproc_t)xdr_gf_dump_req); out: return ret; @@ -726,6 +1926,7 @@ char *clnt_handshake_procs[GF_HNDSK_MAXVALUE] = { [GF_HNDSK_SETVOLUME] = "SETVOLUME", [GF_HNDSK_GETSPEC] = "GETSPEC", [GF_HNDSK_PING] = "PING", + [GF_HNDSK_SET_LK_VER] = "SET_LK_VER" }; rpc_clnt_prog_t clnt_handshake_prog = { @@ -746,3 +1947,14 @@ rpc_clnt_prog_t clnt_dump_prog = { .progver = GLUSTER_DUMP_VERSION, .procnames = clnt_dump_proc, }; + +char *clnt_pmap_procs[GF_PMAP_MAXVALUE] = { + [GF_PMAP_PORTBYBRICK] = "PORTBYBRICK", +}; + +rpc_clnt_prog_t clnt_pmap_prog = { + .progname = "PORTMAP", + .prognum = GLUSTER_PMAP_PROGRAM, + .progver = GLUSTER_PMAP_VERSION, + .procnames = clnt_pmap_procs, +}; |
