summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-lib/src/rpc-clnt.c
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-lib/src/rpc-clnt.c')
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.c161
1 files changed, 101 insertions, 60 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index 4920eda..22513b7 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -137,6 +137,7 @@ out:
static void
call_bail (void *data)
{
+ rpc_transport_t *trans = NULL;
struct rpc_clnt *clnt = NULL;
rpc_clnt_connection_t *conn = NULL;
struct timeval current;
@@ -147,12 +148,27 @@ call_bail (void *data)
char frame_sent[256] = {0,};
struct timespec timeout = {0,};
struct iovec iov = {0,};
+ char peerid[UNIX_PATH_MAX] = {0};
GF_VALIDATE_OR_GOTO ("client", data, out);
clnt = data;
conn = &clnt->conn;
+ pthread_mutex_lock (&conn->lock);
+ {
+ trans = conn->trans;
+ if (trans) {
+ strncpy (peerid, conn->trans->peerinfo.identifier,
+ sizeof (peerid)-1);
+
+ }
+ }
+ pthread_mutex_unlock (&conn->lock);
+ /*rpc_clnt_connection_cleanup will be unwinding all saved frames,
+ * bailed or otherwise*/
+ if (!trans)
+ goto out;
gettimeofday (&current, NULL);
INIT_LIST_HEAD (&list);
@@ -172,9 +188,9 @@ call_bail (void *data)
(void *) clnt);
if (conn->timer == NULL) {
- gf_log (conn->trans->name, GF_LOG_WARNING,
+ gf_log (conn->name, GF_LOG_WARNING,
"Cannot create bailout timer for %s",
- conn->trans->peerinfo.identifier);
+ peerid);
}
}
@@ -197,7 +213,7 @@ call_bail (void *data)
256 - strlen (frame_sent),
".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec);
- gf_log (conn->trans->name, GF_LOG_ERROR,
+ gf_log (conn->name, GF_LOG_ERROR,
"bailing out frame type(%s) op(%s(%d)) xid = 0x%x "
"sent = %s. timeout = %d for %s",
trav->rpcreq->prog->progname,
@@ -205,7 +221,7 @@ call_bail (void *data)
trav->rpcreq->prog->procnames[trav->rpcreq->procnum] :
"--",
trav->rpcreq->procnum, trav->rpcreq->xid, frame_sent,
- conn->frame_timeout, conn->trans->peerinfo.identifier);
+ conn->frame_timeout, peerid);
clnt = rpc_clnt_ref (clnt);
trav->rpcreq->rpc_status = -1;
@@ -357,7 +373,7 @@ saved_frames_unwind (struct saved_frames *saved_frames)
if (!trav->rpcreq || !trav->rpcreq->prog)
continue;
- gf_log_callingfn (trav->rpcreq->conn->trans->name,
+ gf_log_callingfn (trav->rpcreq->conn->name,
GF_LOG_ERROR,
"forced unwinding frame type(%s) op(%s(%d)) "
"called at %s (xid=0x%x)",
@@ -394,7 +410,7 @@ saved_frames_destroy (struct saved_frames *frames)
void
-rpc_clnt_reconnect (void *trans_ptr)
+rpc_clnt_reconnect (void *conn_ptr)
{
rpc_transport_t *trans = NULL;
rpc_clnt_connection_t *conn = NULL;
@@ -402,15 +418,16 @@ rpc_clnt_reconnect (void *trans_ptr)
int32_t ret = 0;
struct rpc_clnt *clnt = NULL;
- trans = trans_ptr;
- if (!trans || !trans->mydata)
- return;
-
- conn = trans->mydata;
+ conn = conn_ptr;
clnt = conn->rpc_clnt;
pthread_mutex_lock (&conn->lock);
{
+ trans = conn->trans;
+ if (!trans) {
+ pthread_mutex_unlock (&conn->lock);
+ return;
+ }
if (conn->reconnect)
gf_timer_call_cancel (clnt->ctx,
conn->reconnect);
@@ -420,16 +437,16 @@ rpc_clnt_reconnect (void *trans_ptr)
ts.tv_sec = 3;
ts.tv_nsec = 0;
- gf_log (trans->name, GF_LOG_TRACE,
+ gf_log (conn->name, GF_LOG_TRACE,
"attempting reconnect");
ret = rpc_transport_connect (trans,
conn->config.remote_port);
conn->reconnect =
gf_timer_call_after (clnt->ctx, ts,
rpc_clnt_reconnect,
- trans);
+ conn);
} else {
- gf_log (trans->name, GF_LOG_TRACE,
+ gf_log (conn->name, GF_LOG_TRACE,
"breaking reconnect chain");
}
}
@@ -457,7 +474,7 @@ rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info)
pthread_mutex_unlock (&clnt->conn.lock);
if (ret == -1) {
- gf_log (clnt->conn.trans->name, GF_LOG_CRITICAL,
+ gf_log (clnt->conn.name, GF_LOG_CRITICAL,
"cannot lookup the saved "
"frame corresponding to xid (%d)", info->xid);
goto out;
@@ -517,11 +534,9 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn)
clnt = conn->rpc_clnt;
- gf_log (conn->trans->name, GF_LOG_TRACE,
- "cleaning up state in transport object %p", conn->trans);
-
pthread_mutex_lock (&conn->lock);
{
+
saved_frames = conn->saved_frames;
conn->saved_frames = saved_frames_new ();
@@ -651,7 +666,7 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg,
ret = xdr_to_rpc_reply (msgbuf, msglen, &rpcmsg, &progmsg,
req->verf.authdata);
if (ret != 0) {
- gf_log (conn->trans->name, GF_LOG_WARNING,
+ gf_log (conn->name, GF_LOG_WARNING,
"RPC reply decoding failed");
goto out;
}
@@ -662,13 +677,13 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg,
goto out;
}
- gf_log (conn->trans->name, GF_LOG_TRACE,
+ gf_log (conn->name, GF_LOG_TRACE,
"received rpc message (RPC XID: 0x%x"
" Program: %s, ProgVers: %d, Proc: %d) from rpc-transport (%s)",
saved_frame->rpcreq->xid,
saved_frame->rpcreq->prog->progname,
saved_frame->rpcreq->prog->progver,
- saved_frame->rpcreq->procnum, conn->trans->name);
+ saved_frame->rpcreq->procnum, conn->name);
out:
if (ret != 0) {
@@ -696,18 +711,18 @@ rpc_clnt_handle_cbk (struct rpc_clnt *clnt, rpc_transport_pollin_t *msg)
clnt = rpc_clnt_ref (clnt);
ret = xdr_to_rpc_call (msgbuf, msglen, &rpcmsg, &progmsg, NULL,NULL);
if (ret == -1) {
- gf_log (clnt->conn.trans->name, GF_LOG_WARNING,
+ gf_log (clnt->conn.name, GF_LOG_WARNING,
"RPC call decoding failed");
goto out;
}
- gf_log (clnt->conn.trans->name, GF_LOG_TRACE,
+ gf_log (clnt->conn.name, GF_LOG_TRACE,
"received rpc message (XID: 0x%lx, "
"Ver: %ld, Program: %ld, ProgVers: %ld, Proc: %ld) "
"from rpc-transport (%s)", rpc_call_xid (&rpcmsg),
rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg),
rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg),
- clnt->conn.trans->name);
+ clnt->conn.name);
procnum = rpc_call_progproc (&rpcmsg);
@@ -750,7 +765,7 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin)
xid = ntoh32 (*((uint32_t *)pollin->vector[0].iov_base));
saved_frame = lookup_frame (conn, xid);
if (saved_frame == NULL) {
- gf_log (conn->trans->name, GF_LOG_ERROR,
+ gf_log (conn->name, GF_LOG_ERROR,
"cannot lookup the saved frame for reply with xid (%u)",
xid);
goto out;
@@ -758,7 +773,7 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin)
req = saved_frame->rpcreq;
if (req == NULL) {
- gf_log (conn->trans->name, GF_LOG_ERROR,
+ gf_log (conn->name, GF_LOG_ERROR,
"no request with frame for xid (%u)", xid);
goto out;
}
@@ -766,7 +781,7 @@ rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin)
ret = rpc_clnt_reply_init (conn, pollin, req, saved_frame);
if (ret != 0) {
req->rpc_status = -1;
- gf_log (conn->trans->name, GF_LOG_WARNING,
+ gf_log (conn->name, GF_LOG_WARNING,
"initialising rpc reply failed");
}
@@ -859,7 +874,7 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
conn->reconnect =
gf_timer_call_after (clnt->ctx, ts,
rpc_clnt_reconnect,
- conn->trans);
+ conn);
}
}
pthread_mutex_unlock (&conn->lock);
@@ -961,10 +976,17 @@ rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx,
{
int ret = -1;
rpc_clnt_connection_t *conn = NULL;
+ rpc_transport_t *trans = NULL;
conn = &clnt->conn;
pthread_mutex_init (&clnt->conn.lock, NULL);
+ conn->name = gf_strdup (name);
+ if (!conn->name) {
+ ret = -1;
+ goto out;
+ }
+
ret = dict_get_int32 (options, "frame-timeout",
&conn->frame_timeout);
if (ret >= 0) {
@@ -975,25 +997,28 @@ rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx,
"defaulting frame-timeout to 30mins");
conn->frame_timeout = 1800;
}
+ conn->rpc_clnt = clnt;
- conn->trans = rpc_transport_load (ctx, options, name);
- if (!conn->trans) {
+ trans = rpc_transport_load (ctx, options, name);
+ if (!trans) {
gf_log (name, GF_LOG_WARNING, "loading of new rpc-transport"
" failed");
ret = -1;
goto out;
}
+ rpc_transport_ref (trans);
- rpc_transport_ref (conn->trans);
-
- conn->rpc_clnt = clnt;
+ pthread_mutex_lock (&conn->lock);
+ {
+ conn->trans = trans;
+ trans = NULL;
+ }
+ pthread_mutex_unlock (&conn->lock);
ret = rpc_transport_register_notify (conn->trans, rpc_clnt_notify,
conn);
if (ret == -1) {
gf_log (name, GF_LOG_WARNING, "registering notify failed");
- rpc_clnt_connection_cleanup (conn);
- conn = NULL;
goto out;
}
@@ -1001,13 +1026,26 @@ rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx,
if (!conn->saved_frames) {
gf_log (name, GF_LOG_WARNING, "creation of saved_frames "
"failed");
- rpc_clnt_connection_cleanup (conn);
+ ret = -1;
goto out;
}
ret = 0;
out:
+ if (ret) {
+ pthread_mutex_lock (&conn->lock);
+ {
+ trans = conn->trans;
+ conn->trans = NULL;
+ }
+ pthread_mutex_unlock (&conn->lock);
+ if (trans)
+ rpc_transport_unref (trans);
+ //conn cleanup needs to be done since we might have failed to
+ // register notification.
+ rpc_clnt_connection_cleanup (conn);
+ }
return ret;
}
@@ -1079,7 +1117,7 @@ rpc_clnt_start (struct rpc_clnt *rpc)
conn = &rpc->conn;
- rpc_clnt_reconnect (conn->trans);
+ rpc_clnt_reconnect (conn);
return 0;
}
@@ -1234,7 +1272,7 @@ rpc_clnt_record_build_record (struct rpc_clnt *clnt, int prognum, int progver,
xid, au, &request, auth_data);
if (ret == -1) {
- gf_log (clnt->conn.trans->name, GF_LOG_WARNING,
+ gf_log (clnt->conn.name, GF_LOG_WARNING,
"cannot build a rpc-request xid (%"PRIu64")", xid);
goto out;
}
@@ -1257,7 +1295,7 @@ rpc_clnt_record_build_record (struct rpc_clnt *clnt, int prognum, int progver,
hdrsize);
if (!recordhdr.iov_base) {
- gf_log (clnt->conn.trans->name, GF_LOG_ERROR,
+ gf_log (clnt->conn.name, GF_LOG_ERROR,
"Failed to build record header");
iobuf_unref (request_iob);
request_iob = NULL;
@@ -1307,7 +1345,7 @@ rpc_clnt_record (struct rpc_clnt *clnt, call_frame_t *call_frame,
au.lk_owner.lk_owner_len = 4;
}
- gf_log (clnt->conn.trans->name, GF_LOG_TRACE, "Auth Info: pid: %u, uid: %d"
+ gf_log (clnt->conn.name, GF_LOG_TRACE, "Auth Info: pid: %u, uid: %d"
", gid: %d, owner: %s", au.pid, au.uid, au.gid,
lkowner_utoa (&call_frame->root->lk_owner));
@@ -1317,7 +1355,7 @@ rpc_clnt_record (struct rpc_clnt *clnt, call_frame_t *call_frame,
callid, &au,
rpchdr);
if (!request_iob) {
- gf_log (clnt->conn.trans->name, GF_LOG_WARNING,
+ gf_log (clnt->conn.name, GF_LOG_WARNING,
"cannot build rpc-record");
goto out;
}
@@ -1353,7 +1391,7 @@ rpcclnt_cbk_program_register (struct rpc_clnt *clnt,
pthread_mutex_unlock (&clnt->lock);
if (already_registered) {
- gf_log_callingfn (clnt->conn.trans->name, GF_LOG_DEBUG,
+ gf_log_callingfn (clnt->conn.name, GF_LOG_DEBUG,
"already registered");
ret = 0;
goto out;
@@ -1377,14 +1415,14 @@ rpcclnt_cbk_program_register (struct rpc_clnt *clnt,
pthread_mutex_unlock (&clnt->lock);
ret = 0;
- gf_log (clnt->conn.trans->name, GF_LOG_DEBUG,
+ gf_log (clnt->conn.name, GF_LOG_DEBUG,
"New program registered: %s, Num: %d, Ver: %d",
program->progname, program->prognum,
program->progver);
out:
if (ret == -1) {
- gf_log (clnt->conn.trans->name, GF_LOG_ERROR,
+ gf_log (clnt->conn.name, GF_LOG_ERROR,
"Program registration failed:"
" %s, Num: %d, Ver: %d", program->progname,
program->prognum, program->progver);
@@ -1419,10 +1457,6 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
conn = &rpc->conn;
- if (conn->trans == NULL) {
- goto out;
- }
-
rpcreq = mem_get (rpc->reqpool);
if (rpcreq == NULL) {
goto out;
@@ -1458,7 +1492,7 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
procnum, proglen,
&rpchdr, callid);
if (!request_iob) {
- gf_log (conn->trans->name, GF_LOG_WARNING,
+ gf_log (conn->name, GF_LOG_WARNING,
"cannot build rpc-record");
goto out;
}
@@ -1487,15 +1521,14 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
conn->config.remote_port);
}
- ret = rpc_transport_submit_request (rpc->conn.trans,
- &req);
+ ret = rpc_transport_submit_request (conn->trans, &req);
if (ret == -1) {
- gf_log (conn->trans->name, GF_LOG_WARNING,
+ gf_log (conn->name, GF_LOG_WARNING,
"failed to submit rpc-request "
"(XID: 0x%x Program: %s, ProgVers: %d, "
"Proc: %d) to rpc-transport (%s)", rpcreq->xid,
rpcreq->prog->progname, rpcreq->prog->progver,
- rpcreq->procnum, rpc->conn.trans->name);
+ rpcreq->procnum, conn->name);
}
if ((ret >= 0) && frame) {
@@ -1506,7 +1539,7 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
"(XID: 0x%x Program: %s, ProgVers: %d, "
"Proc: %d) to rpc-transport (%s)", rpcreq->xid,
rpcreq->prog->progname, rpcreq->prog->progver,
- rpcreq->procnum, rpc->conn.trans->name);
+ rpcreq->procnum, conn->name);
}
}
pthread_mutex_unlock (&conn->lock);
@@ -1554,11 +1587,14 @@ rpc_clnt_ref (struct rpc_clnt *rpc)
static void
rpc_clnt_trigger_destroy (struct rpc_clnt *rpc)
{
+ rpc_clnt_connection_t *conn = NULL;
+
if (!rpc)
return;
+ conn = &rpc->conn;
rpc_clnt_disable (rpc);
- rpc_transport_unref (rpc->conn.trans);
+ rpc_transport_unref (conn->trans);
}
static void
@@ -1627,6 +1663,7 @@ void
rpc_clnt_disable (struct rpc_clnt *rpc)
{
rpc_clnt_connection_t *conn = NULL;
+ rpc_transport_t *trans = NULL;
if (!rpc) {
goto out;
@@ -1654,11 +1691,15 @@ rpc_clnt_disable (struct rpc_clnt *rpc)
conn->ping_timer = NULL;
conn->ping_started = 0;
}
+ trans = conn->trans;
+ conn->trans = NULL;
}
pthread_mutex_unlock (&conn->lock);
- rpc_transport_disconnect (rpc->conn.trans);
+ if (trans) {
+ rpc_transport_disconnect (trans);
+ }
out:
return;
@@ -1670,7 +1711,7 @@ rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config)
{
if (config->rpc_timeout) {
if (config->rpc_timeout != rpc->conn.config.rpc_timeout)
- gf_log (rpc->conn.trans->name, GF_LOG_INFO,
+ gf_log (rpc->conn.name, GF_LOG_INFO,
"changing timeout to %d (from %d)",
config->rpc_timeout,
rpc->conn.config.rpc_timeout);
@@ -1679,7 +1720,7 @@ rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config)
if (config->remote_port) {
if (config->remote_port != rpc->conn.config.remote_port)
- gf_log (rpc->conn.trans->name, GF_LOG_INFO,
+ gf_log (rpc->conn.name, GF_LOG_INFO,
"changing port to %d (from %d)",
config->remote_port,
rpc->conn.config.remote_port);
@@ -1691,13 +1732,13 @@ rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config)
if (rpc->conn.config.remote_host) {
if (strcmp (rpc->conn.config.remote_host,
config->remote_host))
- gf_log (rpc->conn.trans->name, GF_LOG_INFO,
+ gf_log (rpc->conn.name, GF_LOG_INFO,
"changing hostname to %s (from %s)",
config->remote_host,
rpc->conn.config.remote_host);
GF_FREE (rpc->conn.config.remote_host);
} else {
- gf_log (rpc->conn.trans->name, GF_LOG_INFO,
+ gf_log (rpc->conn.name, GF_LOG_INFO,
"setting hostname to %s",
config->remote_host);
}