summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-lib/src/rpc-clnt.c
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-lib/src/rpc-clnt.c')
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.c244
1 files changed, 101 insertions, 143 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index 428a82b76..ac98a5c91 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -1,20 +1,11 @@
/*
- Copyright (c) 2010-2011 Gluster, Inc. <http://www.gluster.com>
+ Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
- GlusterFS is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3 of the License,
- or (at your option) any later version.
-
- GlusterFS is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program. If not, see
- <http://www.gnu.org/licenses/>.
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
*/
@@ -75,8 +66,8 @@ _is_lock_fop (struct saved_frame *sframe)
{
int fop = 0;
- if (SFRAME_GET_PROGNUM (sframe) == GLUSTER3_1_FOP_PROGRAM &&
- SFRAME_GET_PROGVER (sframe) == GLUSTER3_1_FOP_VERSION)
+ if (SFRAME_GET_PROGNUM (sframe) == GLUSTER_FOP_PROGRAM &&
+ SFRAME_GET_PROGVER (sframe) == GLUSTER_FOP_VERSION)
fop = SFRAME_GET_PROCNUM (sframe);
return ((fop == GFS3_OP_LK) ||
@@ -153,9 +144,8 @@ call_bail (void *data)
struct saved_frame *saved_frame = NULL;
struct saved_frame *trav = NULL;
struct saved_frame *tmp = NULL;
- struct tm frame_sent_tm;
char frame_sent[256] = {0,};
- struct timeval timeout = {0,};
+ struct timespec timeout = {0,};
struct iovec iov = {0,};
GF_VALIDATE_OR_GOTO ("client", data, out);
@@ -173,7 +163,7 @@ call_bail (void *data)
call-once timer */
if (conn->timer) {
timeout.tv_sec = 10;
- timeout.tv_usec = 0;
+ timeout.tv_nsec = 0;
gf_timer_call_cancel (clnt->ctx, conn->timer);
conn->timer = gf_timer_call_after (clnt->ctx,
@@ -183,7 +173,8 @@ call_bail (void *data)
if (conn->timer == NULL) {
gf_log (conn->trans->name, GF_LOG_WARNING,
- "Cannot create bailout timer");
+ "Cannot create bailout timer for %s",
+ conn->trans->peerinfo.identifier);
}
}
@@ -200,21 +191,21 @@ call_bail (void *data)
pthread_mutex_unlock (&conn->lock);
list_for_each_entry_safe (trav, tmp, &list, list) {
- localtime_r (&trav->saved_at.tv_sec, &frame_sent_tm);
- strftime (frame_sent, 32, "%Y-%m-%d %H:%M:%S", &frame_sent_tm);
+ gf_time_fmt (frame_sent, sizeof frame_sent,
+ trav->saved_at.tv_sec, gf_timefmt_FT);
snprintf (frame_sent + strlen (frame_sent),
256 - strlen (frame_sent),
".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec);
gf_log (conn->trans->name, GF_LOG_ERROR,
- "bailing out frame type(%s) op(%s(%d)) xid = 0x%ux "
- "sent = %s. timeout = %d",
+ "bailing out frame type(%s) op(%s(%d)) xid = 0x%x "
+ "sent = %s. timeout = %d for %s",
trav->rpcreq->prog->progname,
(trav->rpcreq->prog->procnames) ?
trav->rpcreq->prog->procnames[trav->rpcreq->procnum] :
"--",
trav->rpcreq->procnum, trav->rpcreq->xid, frame_sent,
- conn->frame_timeout);
+ conn->frame_timeout, conn->trans->peerinfo.identifier);
clnt = rpc_clnt_ref (clnt);
trav->rpcreq->rpc_status = -1;
@@ -236,7 +227,7 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame,
struct rpc_req *rpcreq)
{
rpc_clnt_connection_t *conn = NULL;
- struct timeval timeout = {0, };
+ struct timespec timeout = {0, };
struct saved_frame *saved_frame = NULL;
conn = &rpc_clnt->conn;
@@ -250,7 +241,7 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame,
/* TODO: make timeout configurable */
if (conn->timer == NULL) {
timeout.tv_sec = 10;
- timeout.tv_usec = 0;
+ timeout.tv_nsec = 0;
conn->timer = gf_timer_call_after (rpc_clnt->ctx,
timeout,
call_bail,
@@ -349,20 +340,16 @@ out:
void
saved_frames_unwind (struct saved_frames *saved_frames)
{
- struct rpc_clnt *clnt = NULL;
struct saved_frame *trav = NULL;
struct saved_frame *tmp = NULL;
- struct tm *frame_sent_tm = NULL;
- char timestr[256] = {0,};
-
+ char timestr[1024] = {0,};
struct iovec iov = {0,};
list_splice_init (&saved_frames->lk_sf.list, &saved_frames->sf.list);
list_for_each_entry_safe (trav, tmp, &saved_frames->sf.list, list) {
- frame_sent_tm = localtime (&trav->saved_at.tv_sec);
- strftime (timestr, sizeof(timestr), "%Y-%m-%d %H:%M:%S",
- frame_sent_tm);
+ gf_time_fmt (timestr, sizeof timestr,
+ trav->saved_at.tv_sec, gf_timefmt_FT);
snprintf (timestr + strlen (timestr),
sizeof(timestr) - strlen (timestr),
".%"GF_PRI_SUSECONDS, trav->saved_at.tv_usec);
@@ -373,7 +360,7 @@ saved_frames_unwind (struct saved_frames *saved_frames)
gf_log_callingfn (trav->rpcreq->conn->trans->name,
GF_LOG_ERROR,
"forced unwinding frame type(%s) op(%s(%d)) "
- "called at %s (xid=0x%ux)",
+ "called at %s (xid=0x%x)",
trav->rpcreq->prog->progname,
((trav->rpcreq->prog->procnames) ?
trav->rpcreq->prog->procnames[trav->rpcreq->procnum]
@@ -382,14 +369,12 @@ saved_frames_unwind (struct saved_frames *saved_frames)
trav->rpcreq->xid);
saved_frames->count--;
- clnt = rpc_clnt_ref (trav->rpcreq->conn->rpc_clnt);
trav->rpcreq->rpc_status = -1;
trav->rpcreq->cbkfn (trav->rpcreq, &iov, 1, trav->frame);
rpc_clnt_reply_deinit (trav->rpcreq,
trav->rpcreq->conn->rpc_clnt->reqpool);
- clnt = rpc_clnt_unref (clnt);
list_del_init (&trav->list);
mem_put (trav);
}
@@ -413,7 +398,7 @@ rpc_clnt_reconnect (void *trans_ptr)
{
rpc_transport_t *trans = NULL;
rpc_clnt_connection_t *conn = NULL;
- struct timeval tv = {0, 0};
+ struct timespec ts = {0, 0};
int32_t ret = 0;
struct rpc_clnt *clnt = NULL;
@@ -432,23 +417,15 @@ rpc_clnt_reconnect (void *trans_ptr)
conn->reconnect = 0;
if (conn->connected == 0) {
- tv.tv_sec = 3;
+ ts.tv_sec = 3;
+ ts.tv_nsec = 0;
gf_log (trans->name, GF_LOG_TRACE,
"attempting reconnect");
ret = rpc_transport_connect (trans,
conn->config.remote_port);
- /* Every time there is a disconnection, processes
- should try to connect to 'glusterd' (ie, default
- port) or whichever port given as 'option remote-port'
- in volume file. */
- /* Below code makes sure the (re-)configured port lasts
- for just one successful attempt */
- if (!ret)
- conn->config.remote_port = 0;
-
conn->reconnect =
- gf_timer_call_after (clnt->ctx, tv,
+ gf_timer_call_after (clnt->ctx, ts,
rpc_clnt_reconnect,
trans);
} else {
@@ -469,7 +446,7 @@ rpc_clnt_reconnect (void *trans_ptr)
int
rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info)
{
- struct saved_frame saved_frame = {{}, 0};
+ struct saved_frame saved_frame;
int ret = -1;
pthread_mutex_lock (&clnt->conn.lock);
@@ -686,15 +663,13 @@ rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg,
}
gf_log (conn->trans->name, GF_LOG_TRACE,
- "received rpc message (RPC XID: 0x%ux"
+ "received rpc message (RPC XID: 0x%x"
" Program: %s, ProgVers: %d, Proc: %d) from rpc-transport (%s)",
saved_frame->rpcreq->xid,
saved_frame->rpcreq->prog->progname,
saved_frame->rpcreq->prog->progver,
saved_frame->rpcreq->procnum, conn->trans->name);
- req->rpc_status = 0;
-
out:
if (ret != 0) {
req->rpc_status = -1;
@@ -751,7 +726,8 @@ rpc_clnt_handle_cbk (struct rpc_clnt *clnt, rpc_transport_pollin_t *msg)
if (found && (procnum < program->numactors) &&
(program->actors[procnum].actor)) {
- program->actors[procnum].actor (&progmsg);
+ program->actors[procnum].actor (clnt, program->mydata,
+ &progmsg);
}
out:
@@ -845,6 +821,9 @@ out:
return;
}
+static void
+rpc_clnt_destroy (struct rpc_clnt *rpc);
+
int
rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
rpc_transport_event_t event, void *data, ...)
@@ -854,7 +833,7 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
int ret = -1;
rpc_request_info_t *req_info = NULL;
rpc_transport_pollin_t *pollin = NULL;
- struct timeval tv = {0, };
+ struct timespec ts = {0, };
conn = mydata;
if (conn == NULL) {
@@ -873,10 +852,11 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
{
if (!conn->rpc_clnt->disabled
&& (conn->reconnect == NULL)) {
- tv.tv_sec = 10;
+ ts.tv_sec = 10;
+ ts.tv_nsec = 0;
conn->reconnect =
- gf_timer_call_after (clnt->ctx, tv,
+ gf_timer_call_after (clnt->ctx, ts,
rpc_clnt_reconnect,
conn->trans);
}
@@ -890,9 +870,7 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
}
case RPC_TRANSPORT_CLEANUP:
- /* this event should not be received on a client for, a
- * transport is only disconnected, but never destroyed.
- */
+ rpc_clnt_destroy (clnt);
ret = 0;
break;
@@ -936,6 +914,14 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
case RPC_TRANSPORT_CONNECT:
{
+ /* Every time there is a disconnection, processes
+ should try to connect to 'glusterd' (ie, default
+ port) or whichever port given as 'option remote-port'
+ in volume file. */
+ /* Below code makes sure the (re-)configured port lasts
+ for just one successful attempt */
+ conn->config.remote_port = 0;
+
if (clnt->notifyfn)
ret = clnt->notifyfn (clnt, clnt->mydata,
RPC_CLNT_CONNECT, NULL);
@@ -962,7 +948,7 @@ rpc_clnt_connection_deinit (rpc_clnt_connection_t *conn)
}
-inline int
+static inline int
rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx,
dict_t *options, char *name)
{
@@ -1232,20 +1218,6 @@ rpc_clnt_record_build_record (struct rpc_clnt *clnt, int prognum, int progver,
goto out;
}
- xdr_size = xdr_sizeof ((xdrproc_t)xdr_callmsg, &request);
-
- /* First, try to get a pointer into the buffer which the RPC
- * layer can use.
- */
- request_iob = iobuf_get2 (clnt->ctx->iobuf_pool, (xdr_size + hdrsize));
- if (!request_iob) {
- goto out;
- }
-
- pagesize = iobuf_pagesize (request_iob);
-
- record = iobuf_ptr (request_iob); /* Now we have it. */
-
/* Fill the rpc structure and XDR it into the buffer got above. */
if (clnt->auth_null)
ret = rpc_clnt_fill_request (prognum, progver, procnum,
@@ -1260,6 +1232,20 @@ rpc_clnt_record_build_record (struct rpc_clnt *clnt, int prognum, int progver,
goto out;
}
+ xdr_size = xdr_sizeof ((xdrproc_t)xdr_callmsg, &request);
+
+ /* First, try to get a pointer into the buffer which the RPC
+ * layer can use.
+ */
+ request_iob = iobuf_get2 (clnt->ctx->iobuf_pool, (xdr_size + hdrsize));
+ if (!request_iob) {
+ goto out;
+ }
+
+ pagesize = iobuf_pagesize (request_iob);
+
+ record = iobuf_ptr (request_iob); /* Now we have it. */
+
recordhdr = rpc_clnt_record_build_header (record, pagesize, &request,
hdrsize);
@@ -1335,7 +1321,7 @@ out:
int
rpcclnt_cbk_program_register (struct rpc_clnt *clnt,
- rpcclnt_cb_program_t *program)
+ rpcclnt_cb_program_t *program, void *mydata)
{
int ret = -1;
char already_registered = 0;
@@ -1375,6 +1361,8 @@ rpcclnt_cbk_program_register (struct rpc_clnt *clnt,
memcpy (tmp, program, sizeof (*tmp));
INIT_LIST_HEAD (&tmp->program);
+ tmp->mydata = mydata;
+
pthread_mutex_lock (&clnt->lock);
{
list_add_tail (&tmp->program, &clnt->programs);
@@ -1490,10 +1478,6 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
if (conn->connected == 0) {
ret = rpc_transport_connect (conn->trans,
conn->config.remote_port);
- /* Below code makes sure the (re-)configured port lasts
- for just one successful connect attempt */
- if (!ret)
- conn->config.remote_port = 0;
}
ret = rpc_transport_submit_request (rpc->conn.trans,
@@ -1501,19 +1485,18 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
if (ret == -1) {
gf_log (conn->trans->name, GF_LOG_WARNING,
"failed to submit rpc-request "
- "(XID: 0x%ux Program: %s, ProgVers: %d, "
+ "(XID: 0x%x Program: %s, ProgVers: %d, "
"Proc: %d) to rpc-transport (%s)", rpcreq->xid,
rpcreq->prog->progname, rpcreq->prog->progver,
rpcreq->procnum, rpc->conn.trans->name);
}
if ((ret >= 0) && frame) {
- gettimeofday (&conn->last_sent, NULL);
/* Save the frame in queue */
__save_frame (rpc, frame, rpcreq);
gf_log ("rpc-clnt", GF_LOG_TRACE, "submitted request "
- "(XID: 0x%ux Program: %s, ProgVers: %d, "
+ "(XID: 0x%x Program: %s, ProgVers: %d, "
"Proc: %d) to rpc-transport (%s)", rpcreq->xid,
rpcreq->prog->progname, rpcreq->prog->progver,
rpcreq->procnum, rpc->conn.trans->name);
@@ -1562,18 +1545,21 @@ rpc_clnt_ref (struct rpc_clnt *rpc)
static void
-rpc_clnt_destroy (struct rpc_clnt *rpc)
+rpc_clnt_trigger_destroy (struct rpc_clnt *rpc)
{
if (!rpc)
return;
- if (rpc->conn.trans) {
- rpc_transport_unregister_notify (rpc->conn.trans);
- rpc_transport_disconnect (rpc->conn.trans);
- rpc_transport_unref (rpc->conn.trans);
- }
+ rpc_clnt_disable (rpc);
+ rpc_transport_unref (rpc->conn.trans);
+}
+
+static void
+rpc_clnt_destroy (struct rpc_clnt *rpc)
+{
+ if (!rpc)
+ return;
- rpc_clnt_reconnect_cleanup (&rpc->conn);
saved_frames_destroy (rpc->conn.saved_frames);
pthread_mutex_destroy (&rpc->lock);
pthread_mutex_destroy (&rpc->conn.lock);
@@ -1600,13 +1586,36 @@ rpc_clnt_unref (struct rpc_clnt *rpc)
}
pthread_mutex_unlock (&rpc->lock);
if (!count) {
- rpc_clnt_destroy (rpc);
+ rpc_clnt_trigger_destroy (rpc);
return NULL;
}
return rpc;
}
+char
+rpc_clnt_is_disabled (struct rpc_clnt *rpc)
+{
+
+ rpc_clnt_connection_t *conn = NULL;
+ char disabled = 0;
+
+ if (!rpc) {
+ goto out;
+ }
+
+ conn = &rpc->conn;
+
+ pthread_mutex_lock (&conn->lock);
+ {
+ disabled = rpc->disabled;
+ }
+ pthread_mutex_unlock (&conn->lock);
+
+out:
+ return disabled;
+}
+
void
rpc_clnt_disable (struct rpc_clnt *rpc)
{
@@ -1676,7 +1685,7 @@ rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config)
if (strcmp (rpc->conn.config.remote_host,
config->remote_host))
gf_log (rpc->conn.trans->name, GF_LOG_INFO,
- "changing port to %s (from %s)",
+ "changing hostname to %s (from %s)",
config->remote_host,
rpc->conn.config.remote_host);
FREE (rpc->conn.config.remote_host);
@@ -1689,54 +1698,3 @@ rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config)
rpc->conn.config.remote_host = gf_strdup (config->remote_host);
}
}
-
-int
-rpc_clnt_transport_unix_options_build (dict_t **options, char *filepath)
-{
- dict_t *dict = NULL;
- char *fpath = NULL;
- int ret = -1;
-
- GF_ASSERT (filepath);
- GF_ASSERT (options);
-
- dict = dict_new ();
- if (!dict)
- goto out;
-
- fpath = gf_strdup (filepath);
- if (!fpath) {
- ret = -1;
- goto out;
- }
-
- ret = dict_set_dynstr (dict, "transport.socket.connect-path", fpath);
- if (ret)
- goto out;
-
- ret = dict_set_str (dict, "transport.address-family", "unix");
- if (ret)
- goto out;
-
- ret = dict_set_str (dict, "transport.socket.nodelay", "off");
- if (ret)
- goto out;
-
- ret = dict_set_str (dict, "transport-type", "socket");
- if (ret)
- goto out;
-
- ret = dict_set_str (dict, "transport.socket.keepalive", "off");
- if (ret)
- goto out;
-
- *options = dict;
-out:
- if (ret) {
- if (fpath)
- GF_FREE (fpath);
- if (dict)
- dict_unref (dict);
- }
- return ret;
-}