summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-lib/src/rpc-clnt.c
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-lib/src/rpc-clnt.c')
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.c1979
1 files changed, 1979 insertions, 0 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
new file mode 100644
index 00000000000..517037c4a5d
--- /dev/null
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -0,0 +1,1979 @@
+/*
+ Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#define RPC_CLNT_DEFAULT_REQUEST_COUNT 512
+
+#include "rpc-clnt.h"
+#include "rpc-clnt-ping.h"
+#include <glusterfs/byte-order.h>
+#include "xdr-rpcclnt.h"
+#include "rpc-transport.h"
+#include "protocol-common.h"
+#include <glusterfs/mem-pool.h>
+#include "xdr-rpc.h"
+#include "rpc-common-xdr.h"
+
+void
+rpc_clnt_reply_deinit(struct rpc_req *req, struct mem_pool *pool);
+
+struct saved_frame *
+__saved_frames_get_timedout(struct saved_frames *frames, uint32_t timeout,
+ struct timeval *current)
+{
+ struct saved_frame *bailout_frame = NULL, *tmp = NULL;
+
+ if (!list_empty(&frames->sf.list)) {
+ tmp = list_entry(frames->sf.list.next, typeof(*tmp), list);
+ if ((tmp->saved_at.tv_sec + timeout) <= current->tv_sec) {
+ bailout_frame = tmp;
+ list_del_init(&bailout_frame->list);
+ frames->count--;
+ }
+ }
+
+ return bailout_frame;
+}
+
+static int
+_is_lock_fop(struct saved_frame *sframe)
+{
+ int fop = 0;
+
+ if (SFRAME_GET_PROGNUM(sframe) == GLUSTER_FOP_PROGRAM &&
+ SFRAME_GET_PROGVER(sframe) == GLUSTER_FOP_VERSION)
+ fop = SFRAME_GET_PROCNUM(sframe);
+
+ return ((fop == GFS3_OP_LK) || (fop == GFS3_OP_INODELK) ||
+ (fop == GFS3_OP_FINODELK) || (fop == GFS3_OP_ENTRYLK) ||
+ (fop == GFS3_OP_FENTRYLK));
+}
+
+static struct saved_frame *
+__saved_frames_put(struct saved_frames *frames, void *frame,
+ struct rpc_req *rpcreq)
+{
+ struct saved_frame *saved_frame = mem_get(
+ rpcreq->conn->rpc_clnt->saved_frames_pool);
+
+ if (!saved_frame) {
+ goto out;
+ }
+ /* THIS should be saved and set back */
+
+ INIT_LIST_HEAD(&saved_frame->list);
+
+ saved_frame->capital_this = THIS;
+ saved_frame->frame = frame;
+ saved_frame->rpcreq = rpcreq;
+ gettimeofday(&saved_frame->saved_at, NULL);
+ memset(&saved_frame->rsp, 0, sizeof(rpc_transport_rsp_t));
+
+ if (_is_lock_fop(saved_frame))
+ list_add_tail(&saved_frame->list, &frames->lk_sf.list);
+ else
+ list_add_tail(&saved_frame->list, &frames->sf.list);
+
+ frames->count++;
+
+out:
+ return saved_frame;
+}
+
+static void
+call_bail(void *data)
+{
+ rpc_transport_t *trans = NULL;
+ struct rpc_clnt *clnt = NULL;
+ rpc_clnt_connection_t *conn = NULL;
+ struct timeval current;
+ struct list_head list;
+ struct saved_frame *saved_frame = NULL;
+ struct saved_frame *trav = NULL;
+ struct saved_frame *tmp = NULL;
+ char frame_sent[GF_TIMESTR_SIZE] = {
+ 0,
+ };
+ struct timespec timeout = {
+ 0,
+ };
+ char peerid[UNIX_PATH_MAX] = {0};
+ gf_boolean_t need_unref = _gf_false;
+
+ GF_VALIDATE_OR_GOTO("client", data, out);
+
+ clnt = data;
+
+ conn = &clnt->conn;
+ pthread_mutex_lock(&conn->lock);
+ {
+ trans = conn->trans;
+ if (trans) {
+ (void)snprintf(peerid, sizeof(peerid), "%s",
+ conn->trans->peerinfo.identifier);
+ }
+ }
+ pthread_mutex_unlock(&conn->lock);
+ /*rpc_clnt_connection_cleanup will be unwinding all saved frames,
+ * bailed or otherwise*/
+ if (!trans)
+ goto out;
+
+ gettimeofday(&current, NULL);
+ INIT_LIST_HEAD(&list);
+
+ pthread_mutex_lock(&conn->lock);
+ {
+ /* Chaining to get call-always functionality from
+ call-once timer */
+ if (conn->timer) {
+ timeout.tv_sec = 10;
+ timeout.tv_nsec = 0;
+
+ /* Ref rpc as it's added to timer event queue */
+ rpc_clnt_ref(clnt);
+ gf_timer_call_cancel(clnt->ctx, conn->timer);
+ conn->timer = gf_timer_call_after(clnt->ctx, timeout, call_bail,
+ (void *)clnt);
+
+ if (conn->timer == NULL) {
+ gf_log(conn->name, GF_LOG_WARNING,
+ "Cannot create bailout timer for %s", peerid);
+ need_unref = _gf_true;
+ }
+ }
+
+ do {
+ saved_frame = __saved_frames_get_timedout(
+ conn->saved_frames, conn->frame_timeout, &current);
+ if (saved_frame)
+ list_add(&saved_frame->list, &list);
+
+ } while (saved_frame);
+ }
+ pthread_mutex_unlock(&conn->lock);
+
+ if (list_empty(&list))
+ goto out;
+
+ list_for_each_entry_safe(trav, tmp, &list, list)
+ {
+ gf_time_fmt_tv(frame_sent, sizeof frame_sent, &trav->saved_at,
+ gf_timefmt_FT);
+
+ gf_log(conn->name, GF_LOG_ERROR,
+ "bailing out frame type(%s), op(%s(%d)), xid = 0x%x, "
+ "unique = %" PRIu64 ", sent = %s, timeout = %d for %s",
+ trav->rpcreq->prog->progname,
+ (trav->rpcreq->prog->procnames)
+ ? trav->rpcreq->prog->procnames[trav->rpcreq->procnum]
+ : "--",
+ trav->rpcreq->procnum, trav->rpcreq->xid,
+ ((call_frame_t *)(trav->frame))->root->unique, frame_sent,
+ conn->frame_timeout, peerid);
+
+ clnt = rpc_clnt_ref(clnt);
+ trav->rpcreq->rpc_status = -1;
+ trav->rpcreq->cbkfn(trav->rpcreq, NULL, 0, trav->frame);
+
+ rpc_clnt_reply_deinit(trav->rpcreq, clnt->reqpool);
+ clnt = rpc_clnt_unref(clnt);
+ list_del_init(&trav->list);
+ mem_put(trav);
+ }
+out:
+ rpc_clnt_unref(clnt);
+ if (need_unref)
+ rpc_clnt_unref(clnt);
+ return;
+}
+
+/* to be called with conn->lock held */
+static struct saved_frame *
+__save_frame(struct rpc_clnt *rpc_clnt, call_frame_t *frame,
+ struct rpc_req *rpcreq)
+{
+ rpc_clnt_connection_t *conn = &rpc_clnt->conn;
+ struct timespec timeout = {
+ 0,
+ };
+ struct saved_frame *saved_frame = __saved_frames_put(conn->saved_frames,
+ frame, rpcreq);
+
+ if (saved_frame == NULL) {
+ goto out;
+ }
+
+ /* TODO: make timeout configurable */
+ if (conn->timer == NULL) {
+ timeout.tv_sec = 10;
+ timeout.tv_nsec = 0;
+ rpc_clnt_ref(rpc_clnt);
+ conn->timer = gf_timer_call_after(rpc_clnt->ctx, timeout, call_bail,
+ (void *)rpc_clnt);
+ }
+
+out:
+ return saved_frame;
+}
+
+struct saved_frames *
+saved_frames_new(void)
+{
+ struct saved_frames *saved_frames = NULL;
+
+ saved_frames = GF_CALLOC(1, sizeof(*saved_frames),
+ gf_common_mt_rpcclnt_savedframe_t);
+ if (!saved_frames) {
+ return NULL;
+ }
+
+ INIT_LIST_HEAD(&saved_frames->sf.list);
+ INIT_LIST_HEAD(&saved_frames->lk_sf.list);
+
+ return saved_frames;
+}
+
+int
+__saved_frame_copy(struct saved_frames *frames, int64_t callid,
+ struct saved_frame *saved_frame)
+{
+ struct saved_frame *tmp = NULL;
+ int ret = -1;
+
+ if (!saved_frame) {
+ ret = 0;
+ goto out;
+ }
+
+ list_for_each_entry(tmp, &frames->sf.list, list)
+ {
+ if (tmp->rpcreq->xid == callid) {
+ *saved_frame = *tmp;
+ ret = 0;
+ goto out;
+ }
+ }
+
+ list_for_each_entry(tmp, &frames->lk_sf.list, list)
+ {
+ if (tmp->rpcreq->xid == callid) {
+ *saved_frame = *tmp;
+ ret = 0;
+ goto out;
+ }
+ }
+
+out:
+ return ret;
+}
+
+struct saved_frame *
+__saved_frame_get(struct saved_frames *frames, int64_t callid)
+{
+ struct saved_frame *saved_frame = NULL;
+ struct saved_frame *tmp = NULL;
+
+ list_for_each_entry(tmp, &frames->sf.list, list)
+ {
+ if (tmp->rpcreq->xid == callid) {
+ list_del_init(&tmp->list);
+ frames->count--;
+ saved_frame = tmp;
+ goto out;
+ }
+ }
+
+ list_for_each_entry(tmp, &frames->lk_sf.list, list)
+ {
+ if (tmp->rpcreq->xid == callid) {
+ list_del_init(&tmp->list);
+ frames->count--;
+ saved_frame = tmp;
+ goto out;
+ }
+ }
+
+out:
+ if (saved_frame) {
+ THIS = saved_frame->capital_this;
+ }
+
+ return saved_frame;
+}
+
+void
+saved_frames_unwind(struct saved_frames *saved_frames)
+{
+ struct saved_frame *trav = NULL;
+ struct saved_frame *tmp = NULL;
+ char timestr[GF_TIMESTR_SIZE] = {
+ 0,
+ };
+
+ list_splice_init(&saved_frames->lk_sf.list, &saved_frames->sf.list);
+
+ list_for_each_entry_safe(trav, tmp, &saved_frames->sf.list, list)
+ {
+ gf_time_fmt_tv(timestr, sizeof timestr, &trav->saved_at, gf_timefmt_FT);
+
+ if (!trav->rpcreq || !trav->rpcreq->prog)
+ continue;
+
+ gf_log_callingfn(
+ trav->rpcreq->conn->name, GF_LOG_ERROR,
+ "forced unwinding frame type(%s) op(%s(%d)) "
+ "called at %s (xid=0x%x)",
+ trav->rpcreq->prog->progname,
+ ((trav->rpcreq->prog->procnames)
+ ? trav->rpcreq->prog->procnames[trav->rpcreq->procnum]
+ : "--"),
+ trav->rpcreq->procnum, timestr, trav->rpcreq->xid);
+ saved_frames->count--;
+
+ trav->rpcreq->rpc_status = -1;
+ trav->rpcreq->cbkfn(trav->rpcreq, NULL, 0, trav->frame);
+
+ rpc_clnt_reply_deinit(trav->rpcreq,
+ trav->rpcreq->conn->rpc_clnt->reqpool);
+
+ list_del_init(&trav->list);
+ mem_put(trav);
+ }
+}
+
+void
+saved_frames_destroy(struct saved_frames *frames)
+{
+ if (!frames)
+ return;
+
+ saved_frames_unwind(frames);
+
+ GF_FREE(frames);
+}
+
+void
+rpc_clnt_reconnect(void *conn_ptr)
+{
+ rpc_transport_t *trans = NULL;
+ rpc_clnt_connection_t *conn = NULL;
+ struct timespec ts = {0, 0};
+ struct rpc_clnt *clnt = NULL;
+ gf_boolean_t need_unref = _gf_false;
+ gf_boolean_t canceled_unref = _gf_false;
+
+ conn = conn_ptr;
+ clnt = conn->rpc_clnt;
+ pthread_mutex_lock(&conn->lock);
+ {
+ trans = conn->trans;
+ if (!trans)
+ goto out_unlock;
+
+ if (conn->reconnect) {
+ if (!gf_timer_call_cancel(clnt->ctx, conn->reconnect))
+ canceled_unref = _gf_true;
+ }
+ conn->reconnect = 0;
+
+ if ((conn->connected == 0) && !clnt->disabled) {
+ ts.tv_sec = 3;
+ ts.tv_nsec = 0;
+
+ gf_log(conn->name, GF_LOG_TRACE, "attempting reconnect");
+ (void)rpc_transport_connect(trans, conn->config.remote_port);
+ rpc_clnt_ref(clnt);
+ conn->reconnect = gf_timer_call_after(clnt->ctx, ts,
+ rpc_clnt_reconnect, conn);
+ if (!conn->reconnect) {
+ need_unref = _gf_true;
+ gf_log(conn->name, GF_LOG_ERROR,
+ "Error adding to timer event queue");
+ }
+ } else {
+ gf_log(conn->name, GF_LOG_TRACE, "breaking reconnect chain");
+ }
+ }
+out_unlock:
+ pthread_mutex_unlock(&conn->lock);
+
+ rpc_clnt_unref(clnt);
+ if (need_unref)
+ rpc_clnt_unref(clnt);
+ if (canceled_unref)
+ rpc_clnt_unref(clnt);
+ return;
+}
+
+int
+rpc_clnt_fill_request_info(struct rpc_clnt *clnt, rpc_request_info_t *info)
+{
+ struct saved_frame saved_frame;
+ int ret = -1;
+
+ pthread_mutex_lock(&clnt->conn.lock);
+ {
+ ret = __saved_frame_copy(clnt->conn.saved_frames, info->xid,
+ &saved_frame);
+ }
+ pthread_mutex_unlock(&clnt->conn.lock);
+
+ if (ret == -1) {
+ gf_log(clnt->conn.name, GF_LOG_CRITICAL,
+ "cannot lookup the saved "
+ "frame corresponding to xid (%d)",
+ info->xid);
+ goto out;
+ }
+
+ info->prognum = saved_frame.rpcreq->prog->prognum;
+ info->procnum = saved_frame.rpcreq->procnum;
+ info->progver = saved_frame.rpcreq->prog->progver;
+ info->rpc_req = saved_frame.rpcreq;
+ info->rsp = saved_frame.rsp;
+
+ ret = 0;
+out:
+ return ret;
+}
+
+int
+rpc_clnt_reconnect_cleanup(rpc_clnt_connection_t *conn)
+{
+ struct rpc_clnt *clnt = NULL;
+ int ret = 0;
+ gf_boolean_t reconnect_unref = _gf_false;
+
+ if (!conn) {
+ goto out;
+ }
+
+ clnt = conn->rpc_clnt;
+
+ pthread_mutex_lock(&conn->lock);
+ {
+ if (conn->reconnect) {
+ ret = gf_timer_call_cancel(clnt->ctx, conn->reconnect);
+ if (!ret) {
+ reconnect_unref = _gf_true;
+ conn->cleanup_gen++;
+ }
+ conn->reconnect = NULL;
+ }
+ }
+ pthread_mutex_unlock(&conn->lock);
+
+ if (reconnect_unref)
+ rpc_clnt_unref(clnt);
+
+out:
+ return 0;
+}
+
+/*
+ * client_protocol_cleanup - cleanup function
+ * @trans: transport object
+ *
+ */
+int
+rpc_clnt_connection_cleanup(rpc_clnt_connection_t *conn)
+{
+ struct saved_frames *saved_frames = NULL;
+ struct rpc_clnt *clnt = NULL;
+ int unref = 0;
+ int ret = 0;
+ gf_boolean_t timer_unref = _gf_false;
+ gf_boolean_t reconnect_unref = _gf_false;
+
+ if (!conn) {
+ goto out;
+ }
+
+ clnt = conn->rpc_clnt;
+
+ pthread_mutex_lock(&conn->lock);
+ {
+ saved_frames = conn->saved_frames;
+ conn->saved_frames = saved_frames_new();
+
+ /* bailout logic cleanup */
+ if (conn->timer) {
+ ret = gf_timer_call_cancel(clnt->ctx, conn->timer);
+ if (!ret)
+ timer_unref = _gf_true;
+ conn->timer = NULL;
+ }
+ if (conn->reconnect) {
+ ret = gf_timer_call_cancel(clnt->ctx, conn->reconnect);
+ if (!ret)
+ reconnect_unref = _gf_true;
+ conn->reconnect = NULL;
+ }
+
+ conn->connected = 0;
+ conn->disconnected = 1;
+
+ unref = rpc_clnt_remove_ping_timer_locked(clnt);
+ /*reset rpc msgs stats*/
+ conn->pingcnt = 0;
+ conn->msgcnt = 0;
+ conn->cleanup_gen++;
+ }
+ pthread_mutex_unlock(&conn->lock);
+
+ saved_frames_destroy(saved_frames);
+ if (unref)
+ rpc_clnt_unref(clnt);
+
+ if (timer_unref)
+ rpc_clnt_unref(clnt);
+
+ if (reconnect_unref)
+ rpc_clnt_unref(clnt);
+out:
+ return 0;
+}
+
+/*
+ * lookup_frame - lookup call frame corresponding to a given callid
+ * @trans: transport object
+ * @callid: call id of the frame
+ *
+ * not for external reference
+ */
+
+static struct saved_frame *
+lookup_frame(rpc_clnt_connection_t *conn, int64_t callid)
+{
+ struct saved_frame *frame = NULL;
+
+ pthread_mutex_lock(&conn->lock);
+ {
+ frame = __saved_frame_get(conn->saved_frames, callid);
+ }
+ pthread_mutex_unlock(&conn->lock);
+
+ return frame;
+}
+
+int
+rpc_clnt_reply_fill(rpc_transport_pollin_t *msg, rpc_clnt_connection_t *conn,
+ struct rpc_msg *replymsg, struct iovec progmsg,
+ struct rpc_req *req, struct saved_frame *saved_frame)
+{
+ int ret = -1;
+
+ if ((!conn) || (!replymsg) || (!req) || (!saved_frame) || (!msg)) {
+ goto out;
+ }
+
+ req->rpc_status = 0;
+ if ((rpc_reply_status(replymsg) == MSG_DENIED) ||
+ (rpc_accepted_reply_status(replymsg) != SUCCESS)) {
+ req->rpc_status = -1;
+ }
+
+ req->rsp[0] = progmsg;
+ req->rsp_iobref = iobref_ref(msg->iobref);
+
+ if (msg->vectored) {
+ req->rsp[1] = msg->vector[1];
+ req->rspcnt = 2;
+ } else {
+ req->rspcnt = 1;
+ }
+
+ /* By this time, the data bytes for the auth scheme would have already
+ * been copied into the required sections of the req structure,
+ * we just need to fill in the meta-data about it now.
+ */
+ if (req->rpc_status == 0) {
+ /*
+ * req->verf.flavour = rpc_reply_verf_flavour (replymsg);
+ * req->verf.datalen = rpc_reply_verf_len (replymsg);
+ */
+ }
+
+ ret = 0;
+
+out:
+ return ret;
+}
+
+void
+rpc_clnt_reply_deinit(struct rpc_req *req, struct mem_pool *pool)
+{
+ if (!req) {
+ goto out;
+ }
+
+ if (req->rsp_iobref) {
+ iobref_unref(req->rsp_iobref);
+ }
+
+ mem_put(req);
+out:
+ return;
+}
+
+/* TODO: use mem-pool for allocating requests */
+int
+rpc_clnt_reply_init(rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg,
+ struct rpc_req *req, struct saved_frame *saved_frame)
+{
+ char *msgbuf = NULL;
+ struct rpc_msg rpcmsg;
+ struct iovec progmsg; /* RPC Program payload */
+ size_t msglen = 0;
+ int ret = -1;
+
+ msgbuf = msg->vector[0].iov_base;
+ msglen = msg->vector[0].iov_len;
+
+ ret = xdr_to_rpc_reply(msgbuf, msglen, &rpcmsg, &progmsg,
+ req->verf.authdata);
+ if (ret != 0) {
+ gf_log(conn->name, GF_LOG_WARNING, "RPC reply decoding failed");
+ goto out;
+ }
+
+ ret = rpc_clnt_reply_fill(msg, conn, &rpcmsg, progmsg, req, saved_frame);
+ if (ret != 0) {
+ goto out;
+ }
+
+ gf_log(conn->name, GF_LOG_TRACE,
+ "received rpc message (RPC XID: 0x%x"
+ " Program: %s, ProgVers: %d, Proc: %d) from rpc-transport (%s)",
+ saved_frame->rpcreq->xid, saved_frame->rpcreq->prog->progname,
+ saved_frame->rpcreq->prog->progver, saved_frame->rpcreq->procnum,
+ conn->name);
+
+out:
+ if (ret != 0) {
+ req->rpc_status = -1;
+ }
+
+ return ret;
+}
+
+int
+rpc_clnt_handle_cbk(struct rpc_clnt *clnt, rpc_transport_pollin_t *msg)
+{
+ char *msgbuf = NULL;
+ rpcclnt_cb_program_t *program = NULL;
+ struct rpc_msg rpcmsg;
+ struct iovec progmsg; /* RPC Program payload */
+ size_t msglen = 0;
+ int found = 0;
+ int ret = -1;
+ int procnum = 0;
+
+ msgbuf = msg->vector[0].iov_base;
+ msglen = msg->vector[0].iov_len;
+
+ clnt = rpc_clnt_ref(clnt);
+ ret = xdr_to_rpc_call(msgbuf, msglen, &rpcmsg, &progmsg, NULL, NULL);
+ if (ret == -1) {
+ gf_log(clnt->conn.name, GF_LOG_WARNING, "RPC call decoding failed");
+ goto out;
+ }
+
+ gf_log(clnt->conn.name, GF_LOG_TRACE,
+ "receivd rpc message (XID: 0x%" GF_PRI_RPC_XID
+ ", "
+ "Ver: %" GF_PRI_RPC_VERSION ", Program: %" GF_PRI_RPC_PROG_ID
+ ", "
+ "ProgVers: %" GF_PRI_RPC_PROG_VERS ", Proc: %" GF_PRI_RPC_PROC
+ ") "
+ "from rpc-transport (%s)",
+ rpc_call_xid(&rpcmsg), rpc_call_rpcvers(&rpcmsg),
+ rpc_call_program(&rpcmsg), rpc_call_progver(&rpcmsg),
+ rpc_call_progproc(&rpcmsg), clnt->conn.name);
+
+ procnum = rpc_call_progproc(&rpcmsg);
+
+ pthread_mutex_lock(&clnt->lock);
+ {
+ list_for_each_entry(program, &clnt->programs, program)
+ {
+ if ((program->prognum == rpc_call_program(&rpcmsg)) &&
+ (program->progver == rpc_call_progver(&rpcmsg))) {
+ found = 1;
+ break;
+ }
+ }
+ }
+ pthread_mutex_unlock(&clnt->lock);
+
+ if (found && (procnum < program->numactors) &&
+ (program->actors[procnum].actor)) {
+ program->actors[procnum].actor(clnt, program->mydata, &progmsg);
+ }
+
+out:
+ rpc_clnt_unref(clnt);
+ return ret;
+}
+
+int
+rpc_clnt_handle_reply(struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin)
+{
+ rpc_clnt_connection_t *conn = NULL;
+ struct saved_frame *saved_frame = NULL;
+ int ret = -1;
+ struct rpc_req *req = NULL;
+ uint32_t xid = 0;
+
+ clnt = rpc_clnt_ref(clnt);
+ conn = &clnt->conn;
+
+ xid = ntoh32(*((uint32_t *)pollin->vector[0].iov_base));
+ saved_frame = lookup_frame(conn, xid);
+ if (saved_frame == NULL) {
+ gf_log(conn->name, GF_LOG_ERROR,
+ "cannot lookup the saved frame for reply with xid (%u)", xid);
+ goto out;
+ }
+
+ req = saved_frame->rpcreq;
+ if (req == NULL) {
+ gf_log(conn->name, GF_LOG_ERROR, "no request with frame for xid (%u)",
+ xid);
+ goto out;
+ }
+
+ ret = rpc_clnt_reply_init(conn, pollin, req, saved_frame);
+ if (ret != 0) {
+ req->rpc_status = -1;
+ gf_log(conn->name, GF_LOG_WARNING, "initialising rpc reply failed");
+ }
+
+ req->cbkfn(req, req->rsp, req->rspcnt, saved_frame->frame);
+
+ if (req) {
+ rpc_clnt_reply_deinit(req, conn->rpc_clnt->reqpool);
+ }
+out:
+
+ if (saved_frame) {
+ mem_put(saved_frame);
+ }
+
+ rpc_clnt_unref(clnt);
+ return ret;
+}
+
+gf_boolean_t
+is_rpc_clnt_disconnected(rpc_clnt_connection_t *conn)
+{
+ gf_boolean_t disconnected = _gf_true;
+
+ if (!conn)
+ return disconnected;
+
+ pthread_mutex_lock(&conn->lock);
+ {
+ disconnected = conn->disconnected;
+ }
+ pthread_mutex_unlock(&conn->lock);
+
+ return disconnected;
+}
+
+static void
+rpc_clnt_destroy(struct rpc_clnt *rpc);
+
+#define RPC_THIS_SAVE(xl) \
+ do { \
+ old_THIS = THIS; \
+ if (!old_THIS) \
+ gf_log_callingfn("rpc", GF_LOG_CRITICAL, \
+ "THIS is not initialised."); \
+ THIS = xl; \
+ } while (0)
+
+#define RPC_THIS_RESTORE (THIS = old_THIS)
+
+static int
+rpc_clnt_handle_disconnect(struct rpc_clnt *clnt, rpc_clnt_connection_t *conn)
+{
+ struct timespec ts = {
+ 0,
+ };
+ gf_boolean_t unref_clnt = _gf_false;
+ uint64_t pre_notify_gen = 0, post_notify_gen = 0;
+
+ pthread_mutex_lock(&conn->lock);
+ {
+ pre_notify_gen = conn->cleanup_gen;
+ }
+ pthread_mutex_unlock(&conn->lock);
+
+ if (clnt->notifyfn)
+ clnt->notifyfn(clnt, clnt->mydata, RPC_CLNT_DISCONNECT, NULL);
+
+ pthread_mutex_lock(&conn->lock);
+ {
+ post_notify_gen = conn->cleanup_gen;
+ }
+ pthread_mutex_unlock(&conn->lock);
+
+ if (pre_notify_gen == post_notify_gen) {
+ /* program didn't invoke cleanup, so rpc has to do it */
+ rpc_clnt_connection_cleanup(conn);
+ }
+
+ pthread_mutex_lock(&conn->lock);
+ {
+ if (!conn->rpc_clnt->disabled && (conn->reconnect == NULL)) {
+ ts.tv_sec = 3;
+ ts.tv_nsec = 0;
+
+ rpc_clnt_ref(clnt);
+ conn->reconnect = gf_timer_call_after(clnt->ctx, ts,
+ rpc_clnt_reconnect, conn);
+ if (conn->reconnect == NULL) {
+ gf_log(conn->name, GF_LOG_WARNING,
+ "Cannot create rpc_clnt_reconnect timer");
+ unref_clnt = _gf_true;
+ }
+ }
+ }
+ pthread_mutex_unlock(&conn->lock);
+
+ if (unref_clnt)
+ rpc_clnt_unref(clnt);
+
+ return 0;
+}
+
+int
+rpc_clnt_notify(rpc_transport_t *trans, void *mydata,
+ rpc_transport_event_t event, void *data, ...)
+{
+ rpc_clnt_connection_t *conn = NULL;
+ struct rpc_clnt *clnt = NULL;
+ int ret = -1;
+ rpc_request_info_t *req_info = NULL;
+ rpc_transport_pollin_t *pollin = NULL;
+ void *clnt_mydata = NULL;
+ DECLARE_OLD_THIS;
+
+ conn = mydata;
+ if (conn == NULL) {
+ goto out;
+ }
+ clnt = conn->rpc_clnt;
+ if (!clnt)
+ goto out;
+
+ RPC_THIS_SAVE(clnt->owner);
+
+ switch (event) {
+ case RPC_TRANSPORT_DISCONNECT: {
+ rpc_clnt_handle_disconnect(clnt, conn);
+ /* The auth_value was being reset to AUTH_GLUSTERFS_v2.
+ * if (clnt->auth_value)
+ * clnt->auth_value = AUTH_GLUSTERFS_v2;
+ * It should not be reset here. The disconnect during
+ * portmap request can race with handshake. If handshake
+ * happens first and disconnect later, auth_value would set
+ * to default value and it never sets back to actual auth_value
+ * supported by server. But it's important to set to lower
+ * version supported in the case where the server downgrades.
+ * So moving this code to RPC_TRANSPORT_CONNECT. Note that
+ * CONNECT cannot race with handshake as by nature it is
+ * serialized with handhake. An handshake can happen only
+ * on a connected transport and hence its strictly serialized.
+ */
+ break;
+ }
+
+ case RPC_TRANSPORT_CLEANUP:
+ if (clnt->notifyfn) {
+ clnt_mydata = clnt->mydata;
+ clnt->mydata = NULL;
+ ret = clnt->notifyfn(clnt, clnt_mydata, RPC_CLNT_DESTROY, NULL);
+ if (ret < 0) {
+ gf_log(trans->name, GF_LOG_WARNING,
+ "client notify handler returned error "
+ "while handling RPC_CLNT_DESTROY");
+ }
+ }
+ rpc_clnt_destroy(clnt);
+ ret = 0;
+ break;
+
+ case RPC_TRANSPORT_MAP_XID_REQUEST: {
+ req_info = data;
+ ret = rpc_clnt_fill_request_info(clnt, req_info);
+ break;
+ }
+
+ case RPC_TRANSPORT_MSG_RECEIVED: {
+ timespec_now_realtime(&conn->last_received);
+
+ pollin = data;
+ if (pollin->is_reply)
+ ret = rpc_clnt_handle_reply(clnt, pollin);
+ else
+ ret = rpc_clnt_handle_cbk(clnt, pollin);
+ /* ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_MSG,
+ * data);
+ */
+ break;
+ }
+
+ case RPC_TRANSPORT_MSG_SENT: {
+ timespec_now_realtime(&conn->last_sent);
+ ret = 0;
+ break;
+ }
+
+ case RPC_TRANSPORT_CONNECT: {
+ pthread_mutex_lock(&conn->lock);
+ {
+ /* Every time there is a disconnection, processes
+ * should try to connect to 'glusterd' (ie, default
+ * port) or whichever port given as 'option remote-port'
+ * in volume file. */
+ /* Below code makes sure the (re-)configured port lasts
+ * for just one successful attempt */
+ conn->config.remote_port = 0;
+ conn->connected = 1;
+ conn->disconnected = 0;
+ pthread_cond_broadcast(&conn->cond);
+ }
+ pthread_mutex_unlock(&conn->lock);
+
+ /* auth value should be set to lower version available
+ * and will be set to appropriate version supported by
+ * server after the handshake.
+ */
+ if (clnt->auth_value)
+ clnt->auth_value = AUTH_GLUSTERFS_v2;
+ if (clnt->notifyfn)
+ ret = clnt->notifyfn(clnt, clnt->mydata, RPC_CLNT_CONNECT,
+ NULL);
+
+ break;
+ }
+
+ case RPC_TRANSPORT_ACCEPT:
+ /* only meaningful on a server, no need of handling this event
+ * in a client.
+ */
+ ret = 0;
+ break;
+
+ case RPC_TRANSPORT_EVENT_THREAD_DIED:
+ /* only meaningful on a server, no need of handling this event on a
+ * client */
+ ret = 0;
+ break;
+ }
+
+out:
+ RPC_THIS_RESTORE;
+ return ret;
+}
+
+static int
+rpc_clnt_connection_init(struct rpc_clnt *clnt, glusterfs_ctx_t *ctx,
+ dict_t *options, char *name)
+{
+ int ret = -1;
+ rpc_clnt_connection_t *conn = NULL;
+ rpc_transport_t *trans = NULL;
+
+ conn = &clnt->conn;
+ pthread_mutex_init(&clnt->conn.lock, NULL);
+ pthread_cond_init(&clnt->conn.cond, NULL);
+
+ conn->name = gf_strdup(name);
+ if (!conn->name) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = dict_get_int32(options, "frame-timeout", &conn->frame_timeout);
+ if (ret >= 0) {
+ gf_log(name, GF_LOG_INFO, "setting frame-timeout to %d",
+ conn->frame_timeout);
+ } else {
+ gf_log(name, GF_LOG_DEBUG, "defaulting frame-timeout to 30mins");
+ conn->frame_timeout = 1800;
+ }
+ conn->rpc_clnt = clnt;
+
+ ret = dict_get_int32(options, "ping-timeout", &conn->ping_timeout);
+ if (ret >= 0) {
+ gf_log(name, GF_LOG_DEBUG, "setting ping-timeout to %d",
+ conn->ping_timeout);
+ } else {
+ /*TODO: Once the epoll thread model is fixed,
+ change the default ping-timeout to 30sec */
+ gf_log(name, GF_LOG_DEBUG, "disable ping-timeout");
+ conn->ping_timeout = 0;
+ }
+
+ trans = rpc_transport_load(ctx, options, name);
+ if (!trans) {
+ gf_log(name, GF_LOG_WARNING,
+ "loading of new rpc-transport"
+ " failed");
+ ret = -1;
+ goto out;
+ }
+ rpc_transport_ref(trans);
+
+ pthread_mutex_lock(&conn->lock);
+ {
+ conn->trans = trans;
+ trans = NULL;
+ }
+ pthread_mutex_unlock(&conn->lock);
+
+ ret = rpc_transport_register_notify(conn->trans, rpc_clnt_notify, conn);
+ if (ret == -1) {
+ gf_log(name, GF_LOG_WARNING, "registering notify failed");
+ goto out;
+ }
+
+ conn->saved_frames = saved_frames_new();
+ if (!conn->saved_frames) {
+ gf_log(name, GF_LOG_WARNING,
+ "creation of saved_frames "
+ "failed");
+ ret = -1;
+ goto out;
+ }
+
+ ret = 0;
+
+out:
+ if (ret) {
+ pthread_mutex_lock(&conn->lock);
+ {
+ trans = conn->trans;
+ conn->trans = NULL;
+ }
+ pthread_mutex_unlock(&conn->lock);
+ if (trans)
+ rpc_transport_unref(trans);
+ // conn cleanup needs to be done since we might have failed to
+ // register notification.
+ rpc_clnt_connection_cleanup(conn);
+ }
+ return ret;
+}
+
+struct rpc_clnt *
+rpc_clnt_new(dict_t *options, xlator_t *owner, char *name,
+ uint32_t reqpool_size)
+{
+ int ret = -1;
+ struct rpc_clnt *rpc = NULL;
+ glusterfs_ctx_t *ctx = owner->ctx;
+
+ rpc = GF_CALLOC(1, sizeof(*rpc), gf_common_mt_rpcclnt_t);
+ if (!rpc) {
+ goto out;
+ }
+
+ pthread_mutex_init(&rpc->lock, NULL);
+ rpc->ctx = ctx;
+ rpc->owner = owner;
+ GF_ATOMIC_INIT(rpc->xid, 1);
+
+ if (!reqpool_size)
+ reqpool_size = RPC_CLNT_DEFAULT_REQUEST_COUNT;
+
+ rpc->reqpool = mem_pool_new(struct rpc_req, reqpool_size);
+ if (rpc->reqpool == NULL) {
+ pthread_mutex_destroy(&rpc->lock);
+ GF_FREE(rpc);
+ rpc = NULL;
+ goto out;
+ }
+
+ rpc->saved_frames_pool = mem_pool_new(struct saved_frame, reqpool_size);
+ if (rpc->saved_frames_pool == NULL) {
+ pthread_mutex_destroy(&rpc->lock);
+ mem_pool_destroy(rpc->reqpool);
+ GF_FREE(rpc);
+ rpc = NULL;
+ goto out;
+ }
+
+ ret = rpc_clnt_connection_init(rpc, ctx, options, name);
+ if (ret == -1) {
+ pthread_mutex_destroy(&rpc->lock);
+ mem_pool_destroy(rpc->reqpool);
+ mem_pool_destroy(rpc->saved_frames_pool);
+ GF_FREE(rpc);
+ rpc = NULL;
+ goto out;
+ }
+
+ /* This is handled to make sure we have modularity in getting the
+ auth data changed */
+ gf_boolean_t auth_null = dict_get_str_boolean(options, "auth-null", 0);
+
+ rpc->auth_value = (auth_null) ? 0 : AUTH_GLUSTERFS_v2;
+
+ rpc = rpc_clnt_ref(rpc);
+ INIT_LIST_HEAD(&rpc->programs);
+
+out:
+ return rpc;
+}
+
+int
+rpc_clnt_start(struct rpc_clnt *rpc)
+{
+ struct rpc_clnt_connection *conn = NULL;
+
+ if (!rpc)
+ return -1;
+
+ conn = &rpc->conn;
+
+ pthread_mutex_lock(&conn->lock);
+ {
+ rpc->disabled = 0;
+ }
+ pthread_mutex_unlock(&conn->lock);
+ /* Corresponding unref will be either on successful timer cancel or last
+ * rpc_clnt_reconnect fire event.
+ */
+ rpc_clnt_ref(rpc);
+ rpc_clnt_reconnect(conn);
+
+ return 0;
+}
+
+int
+rpc_clnt_cleanup_and_start(struct rpc_clnt *rpc)
+{
+ struct rpc_clnt_connection *conn = NULL;
+
+ if (!rpc)
+ return -1;
+
+ conn = &rpc->conn;
+
+ rpc_clnt_connection_cleanup(conn);
+
+ pthread_mutex_lock(&conn->lock);
+ {
+ rpc->disabled = 0;
+ }
+ pthread_mutex_unlock(&conn->lock);
+ /* Corresponding unref will be either on successful timer cancel or last
+ * rpc_clnt_reconnect fire event.
+ */
+ rpc_clnt_ref(rpc);
+ rpc_clnt_reconnect(conn);
+
+ return 0;
+}
+
+int
+rpc_clnt_register_notify(struct rpc_clnt *rpc, rpc_clnt_notify_t fn,
+ void *mydata)
+{
+ rpc->mydata = mydata;
+ rpc->notifyfn = fn;
+
+ return 0;
+}
+
+/* used for GF_LOG_OCCASIONALLY() */
+static int gf_auth_max_groups_log = 0;
+
+static inline int
+setup_glusterfs_auth_param_v3(call_frame_t *frame, auth_glusterfs_params_v3 *au,
+ int lk_owner_len, char *owner_data)
+{
+ int ret = -1;
+ unsigned int max_groups = 0;
+ int max_lkowner_len = 0;
+
+ au->pid = frame->root->pid;
+ au->uid = frame->root->uid;
+ au->gid = frame->root->gid;
+
+ au->flags = frame->root->flags;
+ au->ctime_sec = frame->root->ctime.tv_sec;
+ au->ctime_nsec = frame->root->ctime.tv_nsec;
+
+ au->lk_owner.lk_owner_val = owner_data;
+ au->lk_owner.lk_owner_len = lk_owner_len;
+ au->groups.groups_val = frame->root->groups;
+ au->groups.groups_len = frame->root->ngrps;
+
+ /* The number of groups and the size of lk_owner depend on oneother.
+ * We can truncate the groups, but should not touch the lk_owner. */
+ max_groups = GF_AUTH_GLUSTERFS_MAX_GROUPS(lk_owner_len, AUTH_GLUSTERFS_v3);
+ if (au->groups.groups_len > max_groups) {
+ GF_LOG_OCCASIONALLY(gf_auth_max_groups_log, "rpc-auth", GF_LOG_WARNING,
+ "truncating grouplist "
+ "from %d to %d",
+ au->groups.groups_len, max_groups);
+
+ au->groups.groups_len = max_groups;
+ }
+
+ max_lkowner_len = GF_AUTH_GLUSTERFS_MAX_LKOWNER(au->groups.groups_len,
+ AUTH_GLUSTERFS_v3);
+ if (lk_owner_len > max_lkowner_len) {
+ gf_log("rpc-clnt", GF_LOG_ERROR,
+ "lkowner field is too "
+ "big (%d), it does not fit in the rpc-header",
+ au->lk_owner.lk_owner_len);
+ errno = E2BIG;
+ goto out;
+ }
+
+ ret = 0;
+out:
+ return ret;
+}
+
+static inline int
+setup_glusterfs_auth_param_v2(call_frame_t *frame, auth_glusterfs_parms_v2 *au,
+ int lk_owner_len, char *owner_data)
+{
+ unsigned int max_groups = 0;
+ int max_lkowner_len = 0;
+ int ret = -1;
+
+ au->pid = frame->root->pid;
+ au->uid = frame->root->uid;
+ au->gid = frame->root->gid;
+
+ au->lk_owner.lk_owner_val = owner_data;
+ au->lk_owner.lk_owner_len = lk_owner_len;
+ au->groups.groups_val = frame->root->groups;
+ au->groups.groups_len = frame->root->ngrps;
+
+ /* The number of groups and the size of lk_owner depend on oneother.
+ * We can truncate the groups, but should not touch the lk_owner. */
+ max_groups = GF_AUTH_GLUSTERFS_MAX_GROUPS(lk_owner_len, AUTH_GLUSTERFS_v2);
+ if (au->groups.groups_len > max_groups) {
+ GF_LOG_OCCASIONALLY(gf_auth_max_groups_log, "rpc-auth", GF_LOG_WARNING,
+ "truncating grouplist "
+ "from %d to %d",
+ au->groups.groups_len, max_groups);
+
+ au->groups.groups_len = max_groups;
+ }
+
+ max_lkowner_len = GF_AUTH_GLUSTERFS_MAX_LKOWNER(au->groups.groups_len,
+ AUTH_GLUSTERFS_v2);
+ if (lk_owner_len > max_lkowner_len) {
+ gf_log("rpc-auth", GF_LOG_ERROR,
+ "lkowner field is too "
+ "big (%d), it does not fit in the rpc-header",
+ au->lk_owner.lk_owner_len);
+ errno = E2BIG;
+ goto out;
+ }
+
+ ret = 0;
+out:
+ return ret;
+}
+
+static ssize_t
+xdr_serialize_glusterfs_auth(struct rpc_clnt *clnt, call_frame_t *frame,
+ char *dest)
+{
+ ssize_t ret = -1;
+ XDR xdr;
+ char owner[4] = {
+ 0,
+ };
+ int32_t pid = 0;
+ char *lk_owner_data = NULL;
+ int lk_owner_len = 0;
+
+ if ((!dest))
+ return -1;
+
+ xdrmem_create(&xdr, dest, GF_MAX_AUTH_BYTES, XDR_ENCODE);
+
+ if (frame->root->lk_owner.len) {
+ lk_owner_data = frame->root->lk_owner.data;
+ lk_owner_len = frame->root->lk_owner.len;
+ } else {
+ pid = frame->root->pid;
+ owner[0] = (char)(pid & 0xff);
+ owner[1] = (char)((pid >> 8) & 0xff);
+ owner[2] = (char)((pid >> 16) & 0xff);
+ owner[3] = (char)((pid >> 24) & 0xff);
+
+ lk_owner_data = owner;
+ lk_owner_len = 4;
+ }
+
+ if (clnt->auth_value == AUTH_GLUSTERFS_v2) {
+ auth_glusterfs_parms_v2 au_v2 = {
+ 0,
+ };
+
+ ret = setup_glusterfs_auth_param_v2(frame, &au_v2, lk_owner_len,
+ lk_owner_data);
+ if (ret)
+ goto out;
+ if (!xdr_auth_glusterfs_parms_v2(&xdr, &au_v2)) {
+ gf_log(THIS->name, GF_LOG_WARNING,
+ "failed to encode auth glusterfs elements");
+ ret = -1;
+ goto out;
+ }
+ } else if (clnt->auth_value == AUTH_GLUSTERFS_v3) {
+ auth_glusterfs_params_v3 au_v3 = {
+ 0,
+ };
+
+ ret = setup_glusterfs_auth_param_v3(frame, &au_v3, lk_owner_len,
+ lk_owner_data);
+ if (ret)
+ goto out;
+
+ if (!xdr_auth_glusterfs_params_v3(&xdr, &au_v3)) {
+ gf_log(THIS->name, GF_LOG_WARNING,
+ "failed to encode auth glusterfs elements");
+ ret = -1;
+ goto out;
+ }
+ } else {
+ gf_log(THIS->name, GF_LOG_WARNING,
+ "failed to encode auth glusterfs elements");
+ ret = -1;
+ goto out;
+ }
+
+ ret = (((size_t)(&xdr)->x_private) - ((size_t)(&xdr)->x_base));
+
+out:
+ return ret;
+}
+
+int
+rpc_clnt_fill_request(struct rpc_clnt *clnt, int prognum, int progver,
+ int procnum, uint64_t xid, call_frame_t *fr,
+ struct rpc_msg *request, char *auth_data)
+{
+ int ret = -1;
+
+ if (!request) {
+ goto out;
+ }
+
+ memset(request, 0, sizeof(*request));
+
+ request->rm_xid = xid;
+ request->rm_direction = CALL;
+
+ request->rm_call.cb_rpcvers = 2;
+ request->rm_call.cb_prog = prognum;
+ request->rm_call.cb_vers = progver;
+ request->rm_call.cb_proc = procnum;
+
+ if (!clnt->auth_value) {
+ request->rm_call.cb_cred.oa_flavor = AUTH_NULL;
+ request->rm_call.cb_cred.oa_base = NULL;
+ request->rm_call.cb_cred.oa_length = 0;
+ } else {
+ ret = xdr_serialize_glusterfs_auth(clnt, fr, auth_data);
+ if (ret == -1) {
+ gf_log("rpc-clnt", GF_LOG_WARNING,
+ "cannot encode auth credentials");
+ goto out;
+ }
+
+ request->rm_call.cb_cred.oa_flavor = clnt->auth_value;
+ request->rm_call.cb_cred.oa_base = auth_data;
+ request->rm_call.cb_cred.oa_length = ret;
+ }
+ request->rm_call.cb_verf.oa_flavor = AUTH_NONE;
+ request->rm_call.cb_verf.oa_base = NULL;
+ request->rm_call.cb_verf.oa_length = 0;
+
+ ret = 0;
+out:
+ return ret;
+}
+
+struct iovec
+rpc_clnt_record_build_header(char *recordstart, size_t rlen,
+ struct rpc_msg *request, size_t payload)
+{
+ struct iovec requesthdr = {
+ 0,
+ };
+ struct iovec txrecord = {0, 0};
+ int ret = -1;
+ size_t fraglen = 0;
+
+ ret = rpc_request_to_xdr(request, recordstart, rlen, &requesthdr);
+ if (ret == -1) {
+ gf_log("rpc-clnt", GF_LOG_DEBUG, "Failed to create RPC request");
+ goto out;
+ }
+
+ fraglen = payload + requesthdr.iov_len;
+ gf_log("rpc-clnt", GF_LOG_TRACE,
+ "Request fraglen %zu, payload: %zu, "
+ "rpc hdr: %zu",
+ fraglen, payload, requesthdr.iov_len);
+
+ txrecord.iov_base = recordstart;
+
+ /* Remember, this is only the vec for the RPC header and does not
+ * include the payload above. We needed the payload only to calculate
+ * the size of the full fragment. This size is sent in the fragment
+ * header.
+ */
+ txrecord.iov_len = requesthdr.iov_len;
+
+out:
+ return txrecord;
+}
+
+struct iobuf *
+rpc_clnt_record_build_record(struct rpc_clnt *clnt, call_frame_t *fr,
+ int prognum, int progver, int procnum,
+ size_t hdrsize, uint64_t xid, struct iovec *recbuf)
+{
+ struct rpc_msg request = {
+ 0,
+ };
+ struct iobuf *request_iob = NULL;
+ char *record = NULL;
+ struct iovec recordhdr = {
+ 0,
+ };
+ size_t pagesize = 0;
+ int ret = -1;
+ size_t xdr_size = 0;
+ char auth_data[GF_MAX_AUTH_BYTES] = {
+ 0,
+ };
+
+ if ((!clnt) || (!recbuf)) {
+ goto out;
+ }
+
+ /* Fill the rpc structure and XDR it into the buffer got above. */
+ ret = rpc_clnt_fill_request(clnt, prognum, progver, procnum, xid, fr,
+ &request, auth_data);
+
+ if (ret == -1) {
+ gf_log(clnt->conn.name, GF_LOG_WARNING,
+ "cannot build a rpc-request xid (%" PRIu64 ")", xid);
+ goto out;
+ }
+
+ xdr_size = xdr_sizeof((xdrproc_t)xdr_callmsg, &request);
+
+ /* First, try to get a pointer into the buffer which the RPC
+ * layer can use.
+ */
+ request_iob = iobuf_get2(clnt->ctx->iobuf_pool, (xdr_size + hdrsize));
+ if (!request_iob) {
+ goto out;
+ }
+
+ pagesize = iobuf_pagesize(request_iob);
+
+ record = iobuf_ptr(request_iob); /* Now we have it. */
+
+ recordhdr = rpc_clnt_record_build_header(record, pagesize, &request,
+ hdrsize);
+
+ if (!recordhdr.iov_base) {
+ gf_log(clnt->conn.name, GF_LOG_ERROR, "Failed to build record header");
+ iobuf_unref(request_iob);
+ request_iob = NULL;
+ recbuf->iov_base = NULL;
+ goto out;
+ }
+
+ recbuf->iov_base = recordhdr.iov_base;
+ recbuf->iov_len = recordhdr.iov_len;
+
+out:
+ return request_iob;
+}
+
+static inline struct iobuf *
+rpc_clnt_record(struct rpc_clnt *clnt, call_frame_t *call_frame,
+ rpc_clnt_prog_t *prog, int procnum, size_t hdrlen,
+ struct iovec *rpchdr, uint64_t callid)
+{
+ if (!prog || !rpchdr || !call_frame) {
+ return NULL;
+ }
+
+ return rpc_clnt_record_build_record(clnt, call_frame, prog->prognum,
+ prog->progver, procnum, hdrlen, callid,
+ rpchdr);
+}
+
+int
+rpcclnt_cbk_program_register(struct rpc_clnt *clnt,
+ rpcclnt_cb_program_t *program, void *mydata)
+{
+ int ret = -1;
+ char already_registered = 0;
+ rpcclnt_cb_program_t *tmp = NULL;
+
+ if (!clnt)
+ goto out;
+
+ if (program->actors == NULL)
+ goto out;
+
+ pthread_mutex_lock(&clnt->lock);
+ {
+ list_for_each_entry(tmp, &clnt->programs, program)
+ {
+ if ((program->prognum == tmp->prognum) &&
+ (program->progver == tmp->progver)) {
+ already_registered = 1;
+ break;
+ }
+ }
+ }
+ pthread_mutex_unlock(&clnt->lock);
+
+ if (already_registered) {
+ gf_log_callingfn(clnt->conn.name, GF_LOG_DEBUG, "already registered");
+ ret = 0;
+ goto out;
+ }
+
+ tmp = GF_MALLOC(sizeof(*tmp), gf_common_mt_rpcclnt_cb_program_t);
+ if (tmp == NULL) {
+ goto out;
+ }
+
+ memcpy(tmp, program, sizeof(*tmp));
+ INIT_LIST_HEAD(&tmp->program);
+
+ tmp->mydata = mydata;
+
+ pthread_mutex_lock(&clnt->lock);
+ {
+ list_add_tail(&tmp->program, &clnt->programs);
+ }
+ pthread_mutex_unlock(&clnt->lock);
+
+ ret = 0;
+ gf_log(clnt->conn.name, GF_LOG_DEBUG,
+ "New program registered: %s, Num: %d, Ver: %d", program->progname,
+ program->prognum, program->progver);
+
+out:
+ if (ret == -1 && clnt) {
+ gf_log(clnt->conn.name, GF_LOG_ERROR,
+ "Program registration failed:"
+ " %s, Num: %d, Ver: %d",
+ program->progname, program->prognum, program->progver);
+ }
+
+ return ret;
+}
+
+int
+rpc_clnt_submit(struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, int procnum,
+ fop_cbk_fn_t cbkfn, struct iovec *proghdr, int proghdrcount,
+ struct iovec *progpayload, int progpayloadcount,
+ struct iobref *iobref, void *frame, struct iovec *rsphdr,
+ int rsphdr_count, struct iovec *rsp_payload,
+ int rsp_payload_count, struct iobref *rsp_iobref)
+{
+ rpc_clnt_connection_t *conn = NULL;
+ struct iobuf *request_iob = NULL;
+ struct iovec rpchdr = {
+ 0,
+ };
+ struct rpc_req *rpcreq = NULL;
+ rpc_transport_req_t req;
+ int ret = -1;
+ int proglen = 0;
+ char new_iobref = 0;
+ uint64_t callid = 0;
+ gf_boolean_t need_unref = _gf_false;
+ call_frame_t *cframe = frame;
+
+ if (!rpc || !prog || !frame) {
+ goto out;
+ }
+
+ conn = &rpc->conn;
+
+ rpcreq = mem_get(rpc->reqpool);
+ if (rpcreq == NULL) {
+ goto out;
+ }
+
+ memset(rpcreq, 0, sizeof(*rpcreq));
+ memset(&req, 0, sizeof(req));
+
+ if (!iobref) {
+ iobref = iobref_new();
+ if (!iobref) {
+ goto out;
+ }
+
+ new_iobref = 1;
+ }
+
+ callid = GF_ATOMIC_INC(rpc->xid);
+
+ rpcreq->prog = prog;
+ rpcreq->procnum = procnum;
+ rpcreq->conn = conn;
+ rpcreq->xid = callid;
+ rpcreq->cbkfn = cbkfn;
+
+ ret = -1;
+
+ if (proghdr) {
+ proglen += iov_length(proghdr, proghdrcount);
+ }
+
+ request_iob = rpc_clnt_record(rpc, frame, prog, procnum, proglen, &rpchdr,
+ callid);
+ if (!request_iob) {
+ gf_log(conn->name, GF_LOG_WARNING, "cannot build rpc-record");
+ goto out;
+ }
+
+ iobref_add(iobref, request_iob);
+
+ req.msg.rpchdr = &rpchdr;
+ req.msg.rpchdrcount = 1;
+ req.msg.proghdr = proghdr;
+ req.msg.proghdrcount = proghdrcount;
+ req.msg.progpayload = progpayload;
+ req.msg.progpayloadcount = progpayloadcount;
+ req.msg.iobref = iobref;
+
+ req.rsp.rsphdr = rsphdr;
+ req.rsp.rsphdr_count = rsphdr_count;
+ req.rsp.rsp_payload = rsp_payload;
+ req.rsp.rsp_payload_count = rsp_payload_count;
+ req.rsp.rsp_iobref = rsp_iobref;
+ req.rpc_req = rpcreq;
+
+ pthread_mutex_lock(&conn->lock);
+ {
+ if (conn->connected == 0) {
+ if (rpc->disabled)
+ goto unlock;
+ ret = rpc_transport_connect(conn->trans, conn->config.remote_port);
+ if (ret < 0) {
+ gf_log(conn->name,
+ (errno == EINPROGRESS) ? GF_LOG_DEBUG : GF_LOG_WARNING,
+ "error returned while attempting to "
+ "connect to host:%s, port:%d",
+ conn->config.remote_host, conn->config.remote_port);
+ goto unlock;
+ }
+ }
+
+ ret = rpc_transport_submit_request(conn->trans, &req);
+ if (ret == -1) {
+ gf_log(conn->name, GF_LOG_WARNING,
+ "failed to submit rpc-request "
+ "(unique: %" PRIu64
+ ", XID: 0x%x Program: %s, "
+ "ProgVers: %d, Proc: %d) to rpc-transport (%s)",
+ cframe->root->unique, rpcreq->xid, rpcreq->prog->progname,
+ rpcreq->prog->progver, rpcreq->procnum, conn->name);
+ } else if ((ret >= 0) && frame) {
+ /* Save the frame in queue */
+ __save_frame(rpc, frame, rpcreq);
+
+ /* A ref on rpc-clnt object is taken while registering
+ * call_bail to timer in __save_frame. If it fails to
+ * register, it needs an unref and should happen outside
+ * conn->lock which otherwise leads to deadlocks */
+ if (conn->timer == NULL)
+ need_unref = _gf_true;
+
+ conn->msgcnt++;
+
+ gf_log("rpc-clnt", GF_LOG_TRACE,
+ "submitted request "
+ "(unique: %" PRIu64
+ ", XID: 0x%x, Program: %s, "
+ "ProgVers: %d, Proc: %d) to rpc-transport (%s)",
+ cframe->root->unique, rpcreq->xid, rpcreq->prog->progname,
+ rpcreq->prog->progver, rpcreq->procnum, conn->name);
+ }
+ }
+unlock:
+ pthread_mutex_unlock(&conn->lock);
+
+ if (need_unref)
+ rpc_clnt_unref(rpc);
+
+ if (ret == -1) {
+ goto out;
+ }
+
+ rpc_clnt_check_and_start_ping(rpc);
+ ret = 0;
+
+out:
+ if (request_iob) {
+ iobuf_unref(request_iob);
+ }
+
+ if (new_iobref && iobref) {
+ iobref_unref(iobref);
+ }
+
+ if (frame && (ret == -1)) {
+ if (rpcreq) {
+ rpcreq->rpc_status = -1;
+ cbkfn(rpcreq, NULL, 0, frame);
+ mem_put(rpcreq);
+ }
+ }
+ return ret;
+}
+
+struct rpc_clnt *
+rpc_clnt_ref(struct rpc_clnt *rpc)
+{
+ if (!rpc)
+ return NULL;
+
+ GF_ATOMIC_INC(rpc->refcount);
+ return rpc;
+}
+
+static void
+rpc_clnt_trigger_destroy(struct rpc_clnt *rpc)
+{
+ rpc_clnt_connection_t *conn = NULL;
+ rpc_transport_t *trans = NULL;
+
+ if (!rpc)
+ return;
+
+ /* reading conn->trans outside conn->lock is OK, since this is the last
+ * ref*/
+ conn = &rpc->conn;
+ trans = conn->trans;
+ rpc_clnt_disable(rpc);
+
+ /* This is to account for rpc_clnt_disable that might have been called
+ * before rpc_clnt_unref */
+ if (trans) {
+ /* set conn->trans to NULL before rpc_transport_unref
+ * as rpc_transport_unref can potentially free conn
+ */
+ conn->trans = NULL;
+ rpc_transport_unref(trans);
+ }
+}
+
+static void
+rpc_clnt_destroy(struct rpc_clnt *rpc)
+{
+ rpcclnt_cb_program_t *program = NULL;
+ rpcclnt_cb_program_t *tmp = NULL;
+ struct saved_frames *saved_frames = NULL;
+ rpc_clnt_connection_t *conn = NULL;
+
+ if (!rpc)
+ return;
+
+ conn = &rpc->conn;
+ GF_FREE(rpc->conn.name);
+ /* Access saved_frames in critical-section to avoid
+ crash in rpc_clnt_connection_cleanup at the time
+ of destroying saved frames
+ */
+ pthread_mutex_lock(&conn->lock);
+ {
+ saved_frames = conn->saved_frames;
+ conn->saved_frames = NULL;
+ }
+ pthread_mutex_unlock(&conn->lock);
+
+ saved_frames_destroy(saved_frames);
+ pthread_mutex_destroy(&rpc->lock);
+ pthread_mutex_destroy(&rpc->conn.lock);
+ pthread_cond_destroy(&rpc->conn.cond);
+
+ /* mem-pool should be destroyed, otherwise,
+ it will cause huge memory leaks */
+ mem_pool_destroy(rpc->reqpool);
+ mem_pool_destroy(rpc->saved_frames_pool);
+
+ list_for_each_entry_safe(program, tmp, &rpc->programs, program)
+ {
+ GF_FREE(program);
+ }
+
+ GF_FREE(rpc);
+ return;
+}
+
+struct rpc_clnt *
+rpc_clnt_unref(struct rpc_clnt *rpc)
+{
+ int count = 0;
+
+ if (!rpc)
+ return NULL;
+
+ count = GF_ATOMIC_DEC(rpc->refcount);
+
+ if (!count) {
+ rpc_clnt_trigger_destroy(rpc);
+ return NULL;
+ }
+ return rpc;
+}
+
+int
+rpc_clnt_disable(struct rpc_clnt *rpc)
+{
+ rpc_clnt_connection_t *conn = NULL;
+ rpc_transport_t *trans = NULL;
+ int unref = 0;
+ int ret = 0;
+ gf_boolean_t timer_unref = _gf_false;
+ gf_boolean_t reconnect_unref = _gf_false;
+
+ if (!rpc) {
+ goto out;
+ }
+
+ conn = &rpc->conn;
+
+ pthread_mutex_lock(&conn->lock);
+ {
+ rpc->disabled = 1;
+
+ if (conn->timer) {
+ ret = gf_timer_call_cancel(rpc->ctx, conn->timer);
+ /* If the event is not fired and it actually cancelled
+ * the timer, do the unref else registered call back
+ * function will take care of it.
+ */
+ if (!ret)
+ timer_unref = _gf_true;
+ conn->timer = NULL;
+ }
+
+ if (conn->reconnect) {
+ ret = gf_timer_call_cancel(rpc->ctx, conn->reconnect);
+ if (!ret)
+ reconnect_unref = _gf_true;
+ conn->reconnect = NULL;
+ }
+ conn->connected = 0;
+
+ unref = rpc_clnt_remove_ping_timer_locked(rpc);
+ trans = conn->trans;
+ }
+ pthread_mutex_unlock(&conn->lock);
+
+ ret = -1;
+ if (trans) {
+ ret = rpc_transport_disconnect(trans, _gf_true);
+ /* The auth_value was being reset to AUTH_GLUSTERFS_v2.
+ * if (clnt->auth_value)
+ * clnt->auth_value = AUTH_GLUSTERFS_v2;
+ * It should not be reset here. The disconnect during
+ * portmap request can race with handshake. If handshake
+ * happens first and disconnect later, auth_value would set
+ * to default value and it never sets back to actual auth_value
+ * supported by server. But it's important to set to lower
+ * version supported in the case where the server downgrades.
+ * So moving this code to RPC_TRANSPORT_CONNECT. Note that
+ * CONNECT cannot race with handshake as by nature it is
+ * serialized with handhake. An handshake can happen only
+ * on a connected transport and hence its strictly serialized.
+ */
+ }
+ if (unref)
+ rpc_clnt_unref(rpc);
+
+ if (timer_unref)
+ rpc_clnt_unref(rpc);
+
+ if (reconnect_unref)
+ rpc_clnt_unref(rpc);
+
+out:
+ return ret;
+}
+
+void
+rpc_clnt_reconfig(struct rpc_clnt *rpc, struct rpc_clnt_config *config)
+{
+ if (config->ping_timeout) {
+ if (config->ping_timeout != rpc->conn.ping_timeout)
+ gf_log(rpc->conn.name, GF_LOG_INFO,
+ "changing ping timeout to %d (from %d)",
+ config->ping_timeout, rpc->conn.ping_timeout);
+
+ pthread_mutex_lock(&rpc->conn.lock);
+ {
+ rpc->conn.ping_timeout = config->ping_timeout;
+ }
+ pthread_mutex_unlock(&rpc->conn.lock);
+ }
+
+ if (config->rpc_timeout) {
+ if (config->rpc_timeout != rpc->conn.config.rpc_timeout)
+ gf_log(rpc->conn.name, GF_LOG_INFO,
+ "changing timeout to %d (from %d)", config->rpc_timeout,
+ rpc->conn.config.rpc_timeout);
+ rpc->conn.config.rpc_timeout = config->rpc_timeout;
+ }
+
+ if (config->remote_port) {
+ if (config->remote_port != rpc->conn.config.remote_port)
+ gf_log(rpc->conn.name, GF_LOG_INFO, "changing port to %d (from %d)",
+ config->remote_port, rpc->conn.config.remote_port);
+
+ rpc->conn.config.remote_port = config->remote_port;
+ }
+
+ if (config->remote_host) {
+ if (rpc->conn.config.remote_host) {
+ if (strcmp(rpc->conn.config.remote_host, config->remote_host))
+ gf_log(rpc->conn.name, GF_LOG_INFO,
+ "changing hostname to %s (from %s)", config->remote_host,
+ rpc->conn.config.remote_host);
+ GF_FREE(rpc->conn.config.remote_host);
+ } else {
+ gf_log(rpc->conn.name, GF_LOG_INFO, "setting hostname to %s",
+ config->remote_host);
+ }
+
+ rpc->conn.config.remote_host = gf_strdup(config->remote_host);
+ }
+}