summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-lib/src/rpcsvc.c
diff options
context:
space:
mode:
authorGluster Ant <bugzilla-bot@gluster.org>2018-09-12 17:52:45 +0530
committerNigel Babu <nigelb@redhat.com>2018-09-12 17:52:45 +0530
commite16868dede6455cab644805af6fe1ac312775e13 (patch)
tree15aebdb4fff2d87cf8a72f836816b3aa634da58d /rpc/rpc-lib/src/rpcsvc.c
parent45a71c0548b6fd2c757aa2e7b7671a1411948894 (diff)
Land part 2 of clang-format changes
Change-Id: Ia84cc24c8924e6d22d02ac15f611c10e26db99b4 Signed-off-by: Nigel Babu <nigelb@redhat.com>
Diffstat (limited to 'rpc/rpc-lib/src/rpcsvc.c')
-rw-r--r--rpc/rpc-lib/src/rpcsvc.c4525
1 files changed, 2265 insertions, 2260 deletions
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index c8aaf4c7fa9..c6545193a11 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -46,1175 +46,1176 @@
struct rpcsvc_program gluster_dump_prog;
-#define rpcsvc_alloc_request(svc, request) \
- do { \
- request = (rpcsvc_request_t *)mem_get ((svc)->rxpool); \
- if (request) { \
- memset (request, 0, sizeof (rpcsvc_request_t)); \
- } else { \
- gf_log ("rpcsvc", GF_LOG_ERROR, \
- "error getting memory for rpc request"); \
- } \
- } while (0)
+#define rpcsvc_alloc_request(svc, request) \
+ do { \
+ request = (rpcsvc_request_t *)mem_get((svc)->rxpool); \
+ if (request) { \
+ memset(request, 0, sizeof(rpcsvc_request_t)); \
+ } else { \
+ gf_log("rpcsvc", GF_LOG_ERROR, \
+ "error getting memory for rpc request"); \
+ } \
+ } while (0)
rpcsvc_listener_t *
-rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans);
+rpcsvc_get_listener(rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans);
int
-rpcsvc_notify (rpc_transport_t *trans, void *mydata,
- rpc_transport_event_t event, void *data, ...);
+rpcsvc_notify(rpc_transport_t *trans, void *mydata, rpc_transport_event_t event,
+ void *data, ...);
static int
-rpcsvc_match_subnet_v4 (const char *addrtok, const char *ipaddr);
+rpcsvc_match_subnet_v4(const char *addrtok, const char *ipaddr);
rpcsvc_notify_wrapper_t *
-rpcsvc_notify_wrapper_alloc (void)
+rpcsvc_notify_wrapper_alloc(void)
{
- rpcsvc_notify_wrapper_t *wrapper = NULL;
+ rpcsvc_notify_wrapper_t *wrapper = NULL;
- wrapper = GF_CALLOC (1, sizeof (*wrapper), gf_common_mt_rpcsvc_wrapper_t);
- if (!wrapper) {
- goto out;
- }
+ wrapper = GF_CALLOC(1, sizeof(*wrapper), gf_common_mt_rpcsvc_wrapper_t);
+ if (!wrapper) {
+ goto out;
+ }
- INIT_LIST_HEAD (&wrapper->list);
+ INIT_LIST_HEAD(&wrapper->list);
out:
- return wrapper;
+ return wrapper;
}
-
void
-rpcsvc_listener_destroy (rpcsvc_listener_t *listener)
+rpcsvc_listener_destroy(rpcsvc_listener_t *listener)
{
- rpcsvc_t *svc = NULL;
+ rpcsvc_t *svc = NULL;
- if (!listener) {
- goto out;
- }
+ if (!listener) {
+ goto out;
+ }
- svc = listener->svc;
- if (!svc) {
- goto listener_free;
- }
+ svc = listener->svc;
+ if (!svc) {
+ goto listener_free;
+ }
- pthread_rwlock_wrlock (&svc->rpclock);
- {
- list_del_init (&listener->list);
- }
- pthread_rwlock_unlock (&svc->rpclock);
+ pthread_rwlock_wrlock(&svc->rpclock);
+ {
+ list_del_init(&listener->list);
+ }
+ pthread_rwlock_unlock(&svc->rpclock);
listener_free:
- GF_FREE (listener);
+ GF_FREE(listener);
out:
- return;
+ return;
}
rpcsvc_vector_sizer
-rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum,
- uint32_t progver, int procnum)
+rpcsvc_get_program_vector_sizer(rpcsvc_t *svc, uint32_t prognum,
+ uint32_t progver, int procnum)
{
- rpcsvc_program_t *program = NULL;
- char found = 0;
+ rpcsvc_program_t *program = NULL;
+ char found = 0;
- if (!svc)
- return NULL;
+ if (!svc)
+ return NULL;
- pthread_rwlock_rdlock (&svc->rpclock);
+ pthread_rwlock_rdlock(&svc->rpclock);
+ {
+ /* Find the matching RPC program from registered list */
+ list_for_each_entry(program, &svc->programs, program)
{
- /* Find the matching RPC program from registered list */
- list_for_each_entry (program, &svc->programs, program) {
- if ((program->prognum == prognum)
- && (program->progver == progver)) {
- found = 1;
- break;
- }
- }
+ if ((program->prognum == prognum) &&
+ (program->progver == progver)) {
+ found = 1;
+ break;
+ }
}
- pthread_rwlock_unlock (&svc->rpclock);
-
- if (found) {
- /* Make sure the requested procnum is supported by RPC prog */
- if ((procnum < 0) || (procnum >= program->numactors)) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR,
- "RPC procedure %d not available for Program %s",
- procnum, program->progname);
- return NULL;
- }
+ }
+ pthread_rwlock_unlock(&svc->rpclock);
- /* SUCCESS: Supported procedure */
- return program->actors[procnum].vector_sizer;
+ if (found) {
+ /* Make sure the requested procnum is supported by RPC prog */
+ if ((procnum < 0) || (procnum >= program->numactors)) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR,
+ "RPC procedure %d not available for Program %s", procnum,
+ program->progname);
+ return NULL;
}
- return NULL; /* FAIL */
+ /* SUCCESS: Supported procedure */
+ return program->actors[procnum].vector_sizer;
+ }
+
+ return NULL; /* FAIL */
}
gf_boolean_t
-rpcsvc_can_outstanding_req_be_ignored (rpcsvc_request_t *req)
-{
- /*
- * If outstanding_rpc_limit is reached because of blocked locks and
- * throttling is attempted then no unlock requests will be received. So
- * the outstanding request count will never change i.e. it will always
- * be equal to the limit. This also leads to ping timer expiry on
- * client.
- */
-
- /*
- * This is a hack and a necessity until grantedlock == fop completion.
- * Ideally if we get a blocking lock request which cannot be granted
- * right now, we should unwind the fop saying “request registered, will
- * notify you when granted”, which is very hard to implement at the
- * moment. Until we bring in such mechanism, we will need to live with
- * not rate-limiting INODELK/ENTRYLK/LK fops
- */
-
- if ((req->prognum == GLUSTER_FOP_PROGRAM) &&
- (req->progver == GLUSTER_FOP_VERSION)) {
- if ((req->procnum == GFS3_OP_INODELK) ||
- (req->procnum == GFS3_OP_FINODELK) ||
- (req->procnum == GFS3_OP_ENTRYLK) ||
- (req->procnum == GFS3_OP_FENTRYLK) ||
- (req->procnum == GFS3_OP_LK))
- return _gf_true;
- }
- return _gf_false;
+rpcsvc_can_outstanding_req_be_ignored(rpcsvc_request_t *req)
+{
+ /*
+ * If outstanding_rpc_limit is reached because of blocked locks and
+ * throttling is attempted then no unlock requests will be received. So
+ * the outstanding request count will never change i.e. it will always
+ * be equal to the limit. This also leads to ping timer expiry on
+ * client.
+ */
+
+ /*
+ * This is a hack and a necessity until grantedlock == fop completion.
+ * Ideally if we get a blocking lock request which cannot be granted
+ * right now, we should unwind the fop saying “request registered, will
+ * notify you when granted”, which is very hard to implement at the
+ * moment. Until we bring in such mechanism, we will need to live with
+ * not rate-limiting INODELK/ENTRYLK/LK fops
+ */
+
+ if ((req->prognum == GLUSTER_FOP_PROGRAM) &&
+ (req->progver == GLUSTER_FOP_VERSION)) {
+ if ((req->procnum == GFS3_OP_INODELK) ||
+ (req->procnum == GFS3_OP_FINODELK) ||
+ (req->procnum == GFS3_OP_ENTRYLK) ||
+ (req->procnum == GFS3_OP_FENTRYLK) || (req->procnum == GFS3_OP_LK))
+ return _gf_true;
+ }
+ return _gf_false;
}
int
-rpcsvc_request_outstanding (rpcsvc_request_t *req, int delta)
+rpcsvc_request_outstanding(rpcsvc_request_t *req, int delta)
{
- int ret = -1;
- int old_count = 0;
- int new_count = 0;
- int limit = 0;
- gf_boolean_t throttle = _gf_false;
+ int ret = -1;
+ int old_count = 0;
+ int new_count = 0;
+ int limit = 0;
+ gf_boolean_t throttle = _gf_false;
- if (!req)
- goto out;
+ if (!req)
+ goto out;
- throttle = rpcsvc_get_throttle (req->svc);
- if (!throttle) {
- ret = 0;
- goto out;
- }
+ throttle = rpcsvc_get_throttle(req->svc);
+ if (!throttle) {
+ ret = 0;
+ goto out;
+ }
- if (rpcsvc_can_outstanding_req_be_ignored (req)) {
- ret = 0;
- goto out;
- }
+ if (rpcsvc_can_outstanding_req_be_ignored(req)) {
+ ret = 0;
+ goto out;
+ }
- pthread_mutex_lock (&req->trans->lock);
- {
- limit = req->svc->outstanding_rpc_limit;
- if (!limit)
- goto unlock;
+ pthread_mutex_lock(&req->trans->lock);
+ {
+ limit = req->svc->outstanding_rpc_limit;
+ if (!limit)
+ goto unlock;
- old_count = req->trans->outstanding_rpc_count;
- req->trans->outstanding_rpc_count += delta;
- new_count = req->trans->outstanding_rpc_count;
+ old_count = req->trans->outstanding_rpc_count;
+ req->trans->outstanding_rpc_count += delta;
+ new_count = req->trans->outstanding_rpc_count;
- if (old_count <= limit && new_count > limit)
- ret = rpc_transport_throttle (req->trans, _gf_true);
+ if (old_count <= limit && new_count > limit)
+ ret = rpc_transport_throttle(req->trans, _gf_true);
- if (old_count > limit && new_count <= limit)
- ret = rpc_transport_throttle (req->trans, _gf_false);
- }
+ if (old_count > limit && new_count <= limit)
+ ret = rpc_transport_throttle(req->trans, _gf_false);
+ }
unlock:
- pthread_mutex_unlock (&req->trans->lock);
+ pthread_mutex_unlock(&req->trans->lock);
out:
- return ret;
+ return ret;
}
-
/* This needs to change to returning errors, since
* we need to return RPC specific error messages when some
* of the pointers below are NULL.
*/
rpcsvc_actor_t *
-rpcsvc_program_actor (rpcsvc_request_t *req)
-{
- rpcsvc_program_t *program = NULL;
- int err = SYSTEM_ERR;
- rpcsvc_actor_t *actor = NULL;
- rpcsvc_t *svc = NULL;
- char found = 0;
- char *peername = NULL;
-
- if (!req)
- goto err;
-
- svc = req->svc;
- peername = req->trans->peerinfo.identifier;
- pthread_rwlock_rdlock (&svc->rpclock);
+rpcsvc_program_actor(rpcsvc_request_t *req)
+{
+ rpcsvc_program_t *program = NULL;
+ int err = SYSTEM_ERR;
+ rpcsvc_actor_t *actor = NULL;
+ rpcsvc_t *svc = NULL;
+ char found = 0;
+ char *peername = NULL;
+
+ if (!req)
+ goto err;
+
+ svc = req->svc;
+ peername = req->trans->peerinfo.identifier;
+ pthread_rwlock_rdlock(&svc->rpclock);
+ {
+ list_for_each_entry(program, &svc->programs, program)
{
- list_for_each_entry (program, &svc->programs, program) {
- if (program->prognum == req->prognum) {
- err = PROG_MISMATCH;
- }
-
- if ((program->prognum == req->prognum)
- && (program->progver == req->progver)) {
- found = 1;
- break;
- }
- }
- }
- pthread_rwlock_unlock (&svc->rpclock);
-
- if (!found) {
- if (err != PROG_MISMATCH) {
- /* log in DEBUG when nfs clients try to see if
- * ACL requests are accepted by nfs server
- */
- gf_log (GF_RPCSVC, (req->prognum == ACL_PROGRAM) ?
- GF_LOG_DEBUG : GF_LOG_WARNING,
- "RPC program not available (req %u %u) for %s",
- req->prognum, req->progver,
- peername);
- err = PROG_UNAVAIL;
- goto err;
- }
+ if (program->prognum == req->prognum) {
+ err = PROG_MISMATCH;
+ }
- gf_log (GF_RPCSVC, GF_LOG_WARNING,
- "RPC program version not available (req %u %u) for %s",
- req->prognum, req->progver,
- peername);
- goto err;
- }
- req->prog = program;
- if (!program->actors) {
- gf_log (GF_RPCSVC, GF_LOG_WARNING,
- "RPC Actor not found for program %s %d for %s",
- program->progname, program->prognum,
- peername);
- err = SYSTEM_ERR;
- goto err;
- }
-
- if ((req->procnum < 0) || (req->procnum >= program->numactors)) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC Program procedure not"
- " available for procedure %d in %s for %s",
- req->procnum, program->progname,
- peername);
- err = PROC_UNAVAIL;
- goto err;
- }
-
- actor = &program->actors[req->procnum];
- if (!actor->actor) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC Program procedure not"
- " available for procedure %d in %s for %s",
- req->procnum, program->progname,
- peername);
- err = PROC_UNAVAIL;
- actor = NULL;
- goto err;
- }
-
- req->ownthread = program->ownthread;
- req->synctask = program->synctask;
-
- err = SUCCESS;
- gf_log (GF_RPCSVC, GF_LOG_TRACE, "Actor found: %s - %s for %s",
- program->progname, actor->procname,
- peername);
+ if ((program->prognum == req->prognum) &&
+ (program->progver == req->progver)) {
+ found = 1;
+ break;
+ }
+ }
+ }
+ pthread_rwlock_unlock(&svc->rpclock);
+
+ if (!found) {
+ if (err != PROG_MISMATCH) {
+ /* log in DEBUG when nfs clients try to see if
+ * ACL requests are accepted by nfs server
+ */
+ gf_log(
+ GF_RPCSVC,
+ (req->prognum == ACL_PROGRAM) ? GF_LOG_DEBUG : GF_LOG_WARNING,
+ "RPC program not available (req %u %u) for %s", req->prognum,
+ req->progver, peername);
+ err = PROG_UNAVAIL;
+ goto err;
+ }
+
+ gf_log(GF_RPCSVC, GF_LOG_WARNING,
+ "RPC program version not available (req %u %u) for %s",
+ req->prognum, req->progver, peername);
+ goto err;
+ }
+ req->prog = program;
+ if (!program->actors) {
+ gf_log(GF_RPCSVC, GF_LOG_WARNING,
+ "RPC Actor not found for program %s %d for %s",
+ program->progname, program->prognum, peername);
+ err = SYSTEM_ERR;
+ goto err;
+ }
+
+ if ((req->procnum < 0) || (req->procnum >= program->numactors)) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR,
+ "RPC Program procedure not"
+ " available for procedure %d in %s for %s",
+ req->procnum, program->progname, peername);
+ err = PROC_UNAVAIL;
+ goto err;
+ }
+
+ actor = &program->actors[req->procnum];
+ if (!actor->actor) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR,
+ "RPC Program procedure not"
+ " available for procedure %d in %s for %s",
+ req->procnum, program->progname, peername);
+ err = PROC_UNAVAIL;
+ actor = NULL;
+ goto err;
+ }
+
+ req->ownthread = program->ownthread;
+ req->synctask = program->synctask;
+
+ err = SUCCESS;
+ gf_log(GF_RPCSVC, GF_LOG_TRACE, "Actor found: %s - %s for %s",
+ program->progname, actor->procname, peername);
err:
- if (req)
- req->rpc_err = err;
+ if (req)
+ req->rpc_err = err;
- return actor;
+ return actor;
}
-
/* this procedure can only pass 4 arguments to registered notifyfn. To send more
* arguments call wrapper->notify directly.
*/
static void
-rpcsvc_program_notify (rpcsvc_listener_t *listener, rpcsvc_event_t event,
- void *data)
+rpcsvc_program_notify(rpcsvc_listener_t *listener, rpcsvc_event_t event,
+ void *data)
{
- rpcsvc_notify_wrapper_t *wrapper = NULL;
+ rpcsvc_notify_wrapper_t *wrapper = NULL;
- if (!listener) {
- goto out;
- }
+ if (!listener) {
+ goto out;
+ }
- list_for_each_entry (wrapper, &listener->svc->notify, list) {
- if (wrapper->notify) {
- wrapper->notify (listener->svc,
- wrapper->data,
- event, data);
- }
+ list_for_each_entry(wrapper, &listener->svc->notify, list)
+ {
+ if (wrapper->notify) {
+ wrapper->notify(listener->svc, wrapper->data, event, data);
}
+ }
out:
- return;
+ return;
}
-
static int
-rpcsvc_accept (rpcsvc_t *svc, rpc_transport_t *listen_trans,
- rpc_transport_t *new_trans)
+rpcsvc_accept(rpcsvc_t *svc, rpc_transport_t *listen_trans,
+ rpc_transport_t *new_trans)
{
- rpcsvc_listener_t *listener = NULL;
- int32_t ret = -1;
+ rpcsvc_listener_t *listener = NULL;
+ int32_t ret = -1;
- listener = rpcsvc_get_listener (svc, -1, listen_trans);
- if (listener == NULL) {
- goto out;
- }
+ listener = rpcsvc_get_listener(svc, -1, listen_trans);
+ if (listener == NULL) {
+ goto out;
+ }
- rpcsvc_program_notify (listener, RPCSVC_EVENT_ACCEPT, new_trans);
- ret = 0;
+ rpcsvc_program_notify(listener, RPCSVC_EVENT_ACCEPT, new_trans);
+ ret = 0;
out:
- return ret;
+ return ret;
}
-
void
-rpcsvc_request_destroy (rpcsvc_request_t *req)
+rpcsvc_request_destroy(rpcsvc_request_t *req)
{
- if (!req) {
- goto out;
- }
+ if (!req) {
+ goto out;
+ }
- if (req->iobref) {
- iobref_unref (req->iobref);
- }
+ if (req->iobref) {
+ iobref_unref(req->iobref);
+ }
- /* This marks the "end" of an RPC request. Reply is
- completely written to the socket and is on the way
- to the client. It is time to decrement the
- outstanding request counter by 1.
- */
- if (req->prognum) //Only for initialized requests
- rpcsvc_request_outstanding (req, -1);
+ /* This marks the "end" of an RPC request. Reply is
+ completely written to the socket and is on the way
+ to the client. It is time to decrement the
+ outstanding request counter by 1.
+ */
+ if (req->prognum) // Only for initialized requests
+ rpcsvc_request_outstanding(req, -1);
- rpc_transport_unref (req->trans);
+ rpc_transport_unref(req->trans);
- GF_FREE (req->auxgidlarge);
+ GF_FREE(req->auxgidlarge);
- mem_put (req);
+ mem_put(req);
out:
- return;
+ return;
}
-
rpcsvc_request_t *
-rpcsvc_request_init (rpcsvc_t *svc, rpc_transport_t *trans,
- struct rpc_msg *callmsg,
- struct iovec progmsg, rpc_transport_pollin_t *msg,
- rpcsvc_request_t *req)
+rpcsvc_request_init(rpcsvc_t *svc, rpc_transport_t *trans,
+ struct rpc_msg *callmsg, struct iovec progmsg,
+ rpc_transport_pollin_t *msg, rpcsvc_request_t *req)
{
- int i = 0;
-
- if ((!trans) || (!callmsg)|| (!req) || (!msg))
- return NULL;
+ int i = 0;
- /* We start a RPC request as always denied. */
- req->rpc_status = MSG_DENIED;
- req->xid = rpc_call_xid (callmsg);
- req->prognum = rpc_call_program (callmsg);
- req->progver = rpc_call_progver (callmsg);
- req->procnum = rpc_call_progproc (callmsg);
- req->trans = rpc_transport_ref (trans);
- req->count = msg->count;
- req->msg[0] = progmsg;
- req->iobref = iobref_ref (msg->iobref);
- if (msg->vectored) {
- /* msg->vector[MAX_IOVEC] is defined in structure. prevent a
- out of bound access */
- for (i = 1; i < min (msg->count, MAX_IOVEC); i++) {
- req->msg[i] = msg->vector[i];
- }
- }
-
- req->svc = svc;
- req->trans_private = msg->private;
-
- INIT_LIST_HEAD (&req->txlist);
- INIT_LIST_HEAD (&req->request_list);
- req->payloadsize = 0;
+ if ((!trans) || (!callmsg) || (!req) || (!msg))
+ return NULL;
- /* By this time, the data bytes for the auth scheme would have already
- * been copied into the required sections of the req structure,
- * we just need to fill in the meta-data about it now.
- */
- rpcsvc_auth_request_init (req, callmsg);
- return req;
+ /* We start a RPC request as always denied. */
+ req->rpc_status = MSG_DENIED;
+ req->xid = rpc_call_xid(callmsg);
+ req->prognum = rpc_call_program(callmsg);
+ req->progver = rpc_call_progver(callmsg);
+ req->procnum = rpc_call_progproc(callmsg);
+ req->trans = rpc_transport_ref(trans);
+ req->count = msg->count;
+ req->msg[0] = progmsg;
+ req->iobref = iobref_ref(msg->iobref);
+ if (msg->vectored) {
+ /* msg->vector[MAX_IOVEC] is defined in structure. prevent a
+ out of bound access */
+ for (i = 1; i < min(msg->count, MAX_IOVEC); i++) {
+ req->msg[i] = msg->vector[i];
+ }
+ }
+
+ req->svc = svc;
+ req->trans_private = msg->private;
+
+ INIT_LIST_HEAD(&req->txlist);
+ INIT_LIST_HEAD(&req->request_list);
+ req->payloadsize = 0;
+
+ /* By this time, the data bytes for the auth scheme would have already
+ * been copied into the required sections of the req structure,
+ * we just need to fill in the meta-data about it now.
+ */
+ rpcsvc_auth_request_init(req, callmsg);
+ return req;
}
-
rpcsvc_request_t *
-rpcsvc_request_create (rpcsvc_t *svc, rpc_transport_t *trans,
- rpc_transport_pollin_t *msg)
+rpcsvc_request_create(rpcsvc_t *svc, rpc_transport_t *trans,
+ rpc_transport_pollin_t *msg)
{
- char *msgbuf = NULL;
- struct rpc_msg rpcmsg;
- struct iovec progmsg; /* RPC Program payload */
- rpcsvc_request_t *req = NULL;
- size_t msglen = 0;
- int ret = -1;
-
- if (!svc || !trans || !svc->rxpool)
- return NULL;
-
- /* We need to allocate the request before actually calling
- * rpcsvc_request_init on the request so that we, can fill the auth
- * data directly into the request structure from the message iobuf.
- * This avoids a need to keep a temp buffer into which the auth data
- * would've been copied otherwise.
- */
- rpcsvc_alloc_request (svc, req);
- if (!req) {
- goto err;
- }
-
- msgbuf = msg->vector[0].iov_base;
- msglen = msg->vector[0].iov_len;
-
- ret = xdr_to_rpc_call (msgbuf, msglen, &rpcmsg, &progmsg,
- req->cred.authdata,req->verf.authdata);
-
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_WARNING, "RPC call decoding failed");
- rpcsvc_request_seterr (req, GARBAGE_ARGS);
- req->trans = rpc_transport_ref (trans);
- req->svc = svc;
- goto err;
- }
-
- ret = -1;
- rpcsvc_request_init (svc, trans, &rpcmsg, progmsg, msg, req);
-
- gf_log (GF_RPCSVC, GF_LOG_TRACE, "received rpc-message "
- "(XID: 0x%" GF_PRI_RPC_XID ", Ver: %" GF_PRI_RPC_VERSION ", Program: %" GF_PRI_RPC_PROG_ID ", "
- "ProgVers: %" GF_PRI_RPC_PROG_VERS ", Proc: %" GF_PRI_RPC_PROC ") "
- "from rpc-transport (%s)", rpc_call_xid (&rpcmsg),
- rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg),
- rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg),
- trans->name);
-
- /* We just received a new request from the wire. Account for
- it in the outsanding request counter to make sure we don't
- ingest too many concurrent requests from the same client.
- */
- if (req->prognum) //Only for initialized requests
- ret = rpcsvc_request_outstanding (req, +1);
-
- if (rpc_call_rpcvers (&rpcmsg) != 2) {
- /* LOG- TODO: print rpc version, also print the peerinfo
- from transport */
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC version not supported "
- "(XID: 0x%" GF_PRI_RPC_XID ", Ver: %" GF_PRI_RPC_VERSION ", Program: %" GF_PRI_RPC_PROG_ID ", "
- "ProgVers: %" GF_PRI_RPC_PROG_VERS ", Proc: %" GF_PRI_RPC_PROC ") "
- "from trans (%s)", rpc_call_xid (&rpcmsg),
- rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg),
- rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg),
- trans->name);
- rpcsvc_request_seterr (req, RPC_MISMATCH);
- goto err;
- }
-
- ret = rpcsvc_authenticate (req);
- if (ret == RPCSVC_AUTH_REJECT) {
- /* No need to set auth_err, that is the responsibility of
- * the authentication handler since only that know what exact
- * error happened.
- */
- rpcsvc_request_seterr (req, AUTH_ERROR);
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "auth failed on request. "
- "(XID: 0x%" GF_PRI_RPC_XID ", Ver: %" GF_PRI_RPC_VERSION ", Program: %" GF_PRI_RPC_PROG_ID ", "
- "ProgVers: %" GF_PRI_RPC_PROG_VERS ", Proc: %" GF_PRI_RPC_PROC ") "
- "from trans (%s)", rpc_call_xid (&rpcmsg),
- rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg),
- rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg),
- trans->name);
- ret = -1;
- goto err;
- }
+ char *msgbuf = NULL;
+ struct rpc_msg rpcmsg;
+ struct iovec progmsg; /* RPC Program payload */
+ rpcsvc_request_t *req = NULL;
+ size_t msglen = 0;
+ int ret = -1;
+ if (!svc || !trans || !svc->rxpool)
+ return NULL;
- /* If the error is not RPC_MISMATCH, we consider the call as accepted
- * since we are not handling authentication failures for now.
+ /* We need to allocate the request before actually calling
+ * rpcsvc_request_init on the request so that we, can fill the auth
+ * data directly into the request structure from the message iobuf.
+ * This avoids a need to keep a temp buffer into which the auth data
+ * would've been copied otherwise.
+ */
+ rpcsvc_alloc_request(svc, req);
+ if (!req) {
+ goto err;
+ }
+
+ msgbuf = msg->vector[0].iov_base;
+ msglen = msg->vector[0].iov_len;
+
+ ret = xdr_to_rpc_call(msgbuf, msglen, &rpcmsg, &progmsg, req->cred.authdata,
+ req->verf.authdata);
+
+ if (ret == -1) {
+ gf_log(GF_RPCSVC, GF_LOG_WARNING, "RPC call decoding failed");
+ rpcsvc_request_seterr(req, GARBAGE_ARGS);
+ req->trans = rpc_transport_ref(trans);
+ req->svc = svc;
+ goto err;
+ }
+
+ ret = -1;
+ rpcsvc_request_init(svc, trans, &rpcmsg, progmsg, msg, req);
+
+ gf_log(GF_RPCSVC, GF_LOG_TRACE,
+ "received rpc-message "
+ "(XID: 0x%" GF_PRI_RPC_XID ", Ver: %" GF_PRI_RPC_VERSION
+ ", Program: %" GF_PRI_RPC_PROG_ID
+ ", "
+ "ProgVers: %" GF_PRI_RPC_PROG_VERS ", Proc: %" GF_PRI_RPC_PROC
+ ") "
+ "from rpc-transport (%s)",
+ rpc_call_xid(&rpcmsg), rpc_call_rpcvers(&rpcmsg),
+ rpc_call_program(&rpcmsg), rpc_call_progver(&rpcmsg),
+ rpc_call_progproc(&rpcmsg), trans->name);
+
+ /* We just received a new request from the wire. Account for
+ it in the outsanding request counter to make sure we don't
+ ingest too many concurrent requests from the same client.
+ */
+ if (req->prognum) // Only for initialized requests
+ ret = rpcsvc_request_outstanding(req, +1);
+
+ if (rpc_call_rpcvers(&rpcmsg) != 2) {
+ /* LOG- TODO: print rpc version, also print the peerinfo
+ from transport */
+ gf_log(GF_RPCSVC, GF_LOG_ERROR,
+ "RPC version not supported "
+ "(XID: 0x%" GF_PRI_RPC_XID ", Ver: %" GF_PRI_RPC_VERSION
+ ", Program: %" GF_PRI_RPC_PROG_ID
+ ", "
+ "ProgVers: %" GF_PRI_RPC_PROG_VERS ", Proc: %" GF_PRI_RPC_PROC
+ ") "
+ "from trans (%s)",
+ rpc_call_xid(&rpcmsg), rpc_call_rpcvers(&rpcmsg),
+ rpc_call_program(&rpcmsg), rpc_call_progver(&rpcmsg),
+ rpc_call_progproc(&rpcmsg), trans->name);
+ rpcsvc_request_seterr(req, RPC_MISMATCH);
+ goto err;
+ }
+
+ ret = rpcsvc_authenticate(req);
+ if (ret == RPCSVC_AUTH_REJECT) {
+ /* No need to set auth_err, that is the responsibility of
+ * the authentication handler since only that know what exact
+ * error happened.
*/
- req->rpc_status = MSG_ACCEPTED;
- req->reply = NULL;
- ret = 0;
+ rpcsvc_request_seterr(req, AUTH_ERROR);
+ gf_log(GF_RPCSVC, GF_LOG_ERROR,
+ "auth failed on request. "
+ "(XID: 0x%" GF_PRI_RPC_XID ", Ver: %" GF_PRI_RPC_VERSION
+ ", Program: %" GF_PRI_RPC_PROG_ID
+ ", "
+ "ProgVers: %" GF_PRI_RPC_PROG_VERS ", Proc: %" GF_PRI_RPC_PROC
+ ") "
+ "from trans (%s)",
+ rpc_call_xid(&rpcmsg), rpc_call_rpcvers(&rpcmsg),
+ rpc_call_program(&rpcmsg), rpc_call_progver(&rpcmsg),
+ rpc_call_progproc(&rpcmsg), trans->name);
+ ret = -1;
+ goto err;
+ }
+
+ /* If the error is not RPC_MISMATCH, we consider the call as accepted
+ * since we are not handling authentication failures for now.
+ */
+ req->rpc_status = MSG_ACCEPTED;
+ req->reply = NULL;
+ ret = 0;
err:
- if (ret == -1) {
- ret = rpcsvc_error_reply (req);
- if (ret)
- gf_log ("rpcsvc", GF_LOG_WARNING,
- "failed to queue error reply");
- req = NULL;
- }
+ if (ret == -1) {
+ ret = rpcsvc_error_reply(req);
+ if (ret)
+ gf_log("rpcsvc", GF_LOG_WARNING, "failed to queue error reply");
+ req = NULL;
+ }
- return req;
+ return req;
}
-
int
-rpcsvc_check_and_reply_error (int ret, call_frame_t *frame, void *opaque)
+rpcsvc_check_and_reply_error(int ret, call_frame_t *frame, void *opaque)
{
- rpcsvc_request_t *req = NULL;
+ rpcsvc_request_t *req = NULL;
+
+ req = opaque;
- req = opaque;
+ if (ret)
+ gf_log("rpcsvc", GF_LOG_ERROR,
+ "rpc actor (%d:%d:%d) failed to complete successfully",
+ req->prognum, req->progver, req->procnum);
+ if (ret == RPCSVC_ACTOR_ERROR) {
+ ret = rpcsvc_error_reply(req);
if (ret)
- gf_log ("rpcsvc", GF_LOG_ERROR,
- "rpc actor (%d:%d:%d) failed to complete successfully",
- req->prognum, req->progver, req->procnum);
-
- if (ret == RPCSVC_ACTOR_ERROR) {
- ret = rpcsvc_error_reply (req);
- if (ret)
- gf_log ("rpcsvc", GF_LOG_WARNING,
- "failed to queue error reply");
- }
+ gf_log("rpcsvc", GF_LOG_WARNING, "failed to queue error reply");
+ }
- return 0;
+ return 0;
}
int
-rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
- rpc_transport_pollin_t *msg)
-{
- rpcsvc_actor_t *actor = NULL;
- rpcsvc_actor actor_fn = NULL;
- rpcsvc_request_t *req = NULL;
- int ret = -1;
- uint16_t port = 0;
- gf_boolean_t is_unix = _gf_false, empty = _gf_false;
- gf_boolean_t unprivileged = _gf_false;
- drc_cached_op_t *reply = NULL;
- rpcsvc_drc_globals_t *drc = NULL;
-
- if (!trans || !svc)
- return -1;
-
- switch (trans->peerinfo.sockaddr.ss_family) {
+rpcsvc_handle_rpc_call(rpcsvc_t *svc, rpc_transport_t *trans,
+ rpc_transport_pollin_t *msg)
+{
+ rpcsvc_actor_t *actor = NULL;
+ rpcsvc_actor actor_fn = NULL;
+ rpcsvc_request_t *req = NULL;
+ int ret = -1;
+ uint16_t port = 0;
+ gf_boolean_t is_unix = _gf_false, empty = _gf_false;
+ gf_boolean_t unprivileged = _gf_false;
+ drc_cached_op_t *reply = NULL;
+ rpcsvc_drc_globals_t *drc = NULL;
+
+ if (!trans || !svc)
+ return -1;
+
+ switch (trans->peerinfo.sockaddr.ss_family) {
case AF_INET:
- port = ((struct sockaddr_in *)&trans->peerinfo.sockaddr)->sin_port;
- break;
+ port = ((struct sockaddr_in *)&trans->peerinfo.sockaddr)->sin_port;
+ break;
case AF_INET6:
- port = ((struct sockaddr_in6 *)&trans->peerinfo.sockaddr)->sin6_port;
- break;
+ port = ((struct sockaddr_in6 *)&trans->peerinfo.sockaddr)
+ ->sin6_port;
+ break;
case AF_UNIX:
- is_unix = _gf_true;
- break;
+ is_unix = _gf_true;
+ break;
default:
- gf_log (GF_RPCSVC, GF_LOG_ERROR,
- "invalid address family (%d)",
- trans->peerinfo.sockaddr.ss_family);
- return -1;
- }
+ gf_log(GF_RPCSVC, GF_LOG_ERROR, "invalid address family (%d)",
+ trans->peerinfo.sockaddr.ss_family);
+ return -1;
+ }
+ if (is_unix == _gf_false) {
+ port = ntohs(port);
+ gf_log("rpcsvc", GF_LOG_TRACE, "Client port: %d", (int)port);
- if (is_unix == _gf_false) {
- port = ntohs (port);
+ if (port >= 1024)
+ unprivileged = _gf_true;
+ }
- gf_log ("rpcsvc", GF_LOG_TRACE, "Client port: %d", (int)port);
+ req = rpcsvc_request_create(svc, trans, msg);
+ if (!req)
+ goto out;
- if (port >= 1024)
- unprivileged = _gf_true;
- }
+ if (!rpcsvc_request_accepted(req))
+ goto err_reply;
+
+ actor = rpcsvc_program_actor(req);
+ if (!actor)
+ goto err_reply;
+
+ if (0 == svc->allow_insecure && unprivileged && !actor->unprivileged) {
+ /* Non-privileged user, fail request */
+ gf_log(GF_RPCSVC, GF_LOG_ERROR,
+ "Request received from non-"
+ "privileged port. Failing request for %s.",
+ req->trans->peerinfo.identifier);
+ req->rpc_status = MSG_DENIED;
+ req->rpc_err = AUTH_ERROR;
+ req->auth_err = RPCSVC_AUTH_REJECT;
+ goto err_reply;
+ }
- req = rpcsvc_request_create (svc, trans, msg);
- if (!req)
+ /* DRC */
+ if (rpcsvc_need_drc(req)) {
+ drc = req->svc->drc;
+
+ LOCK(&drc->lock);
+ {
+ reply = rpcsvc_drc_lookup(req);
+
+ /* retransmission of completed request, send cached reply */
+ if (reply && reply->state == DRC_OP_CACHED) {
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "duplicate request:"
+ " XID: 0x%x",
+ req->xid);
+ ret = rpcsvc_send_cached_reply(req, reply);
+ drc->cache_hits++;
+ UNLOCK(&drc->lock);
+ goto out;
+
+ } /* retransmitted request, original op in transit, drop it */
+ else if (reply && reply->state == DRC_OP_IN_TRANSIT) {
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "op in transit,"
+ " discarding. XID: 0x%x",
+ req->xid);
+ ret = 0;
+ drc->intransit_hits++;
+ rpcsvc_request_destroy(req);
+ UNLOCK(&drc->lock);
goto out;
- if (!rpcsvc_request_accepted (req))
- goto err_reply;
-
- actor = rpcsvc_program_actor (req);
- if (!actor)
- goto err_reply;
-
- if (0 == svc->allow_insecure && unprivileged && !actor->unprivileged) {
- /* Non-privileged user, fail request */
- gf_log (GF_RPCSVC, GF_LOG_ERROR,
- "Request received from non-"
- "privileged port. Failing request for %s.",
- req->trans->peerinfo.identifier);
- req->rpc_status = MSG_DENIED;
- req->rpc_err = AUTH_ERROR;
- req->auth_err = RPCSVC_AUTH_REJECT;
- goto err_reply;
+ } /* fresh request, cache it as in-transit and proceed */
+ else {
+ ret = rpcsvc_cache_request(req);
+ }
}
+ UNLOCK(&drc->lock);
+ }
- /* DRC */
- if (rpcsvc_need_drc (req)) {
- drc = req->svc->drc;
-
- LOCK (&drc->lock);
- {
- reply = rpcsvc_drc_lookup (req);
-
- /* retransmission of completed request, send cached reply */
- if (reply && reply->state == DRC_OP_CACHED) {
- gf_log (GF_RPCSVC, GF_LOG_INFO, "duplicate request:"
- " XID: 0x%x", req->xid);
- ret = rpcsvc_send_cached_reply (req, reply);
- drc->cache_hits++;
- UNLOCK (&drc->lock);
- goto out;
-
- } /* retransmitted request, original op in transit, drop it */
- else if (reply && reply->state == DRC_OP_IN_TRANSIT) {
- gf_log (GF_RPCSVC, GF_LOG_INFO, "op in transit,"
- " discarding. XID: 0x%x", req->xid);
- ret = 0;
- drc->intransit_hits++;
- rpcsvc_request_destroy (req);
- UNLOCK (&drc->lock);
- goto out;
-
- } /* fresh request, cache it as in-transit and proceed */
- else {
- ret = rpcsvc_cache_request (req);
- }
- }
- UNLOCK (&drc->lock);
+ if (req->rpc_err == SUCCESS) {
+ /* Before going to xlator code, set the THIS properly */
+ THIS = svc->xl;
+
+ actor_fn = actor->actor;
+
+ if (!actor_fn) {
+ rpcsvc_request_seterr(req, PROC_UNAVAIL);
+ /* LOG TODO: print more info about procnum,
+ prognum etc, also print transport info */
+ gf_log(GF_RPCSVC, GF_LOG_ERROR, "No vectored handler present");
+ ret = RPCSVC_ACTOR_ERROR;
+ goto err_reply;
}
- if (req->rpc_err == SUCCESS) {
- /* Before going to xlator code, set the THIS properly */
- THIS = svc->xl;
+ if (req->synctask) {
+ ret = synctask_new(THIS->ctx->env, (synctask_fn_t)actor_fn,
+ rpcsvc_check_and_reply_error, NULL, req);
+ } else if (req->ownthread) {
+ pthread_mutex_lock(&req->prog->queue_lock);
+ {
+ empty = list_empty(&req->prog->request_queue);
- actor_fn = actor->actor;
+ list_add_tail(&req->request_list, &req->prog->request_queue);
- if (!actor_fn) {
- rpcsvc_request_seterr (req, PROC_UNAVAIL);
- /* LOG TODO: print more info about procnum,
- prognum etc, also print transport info */
- gf_log (GF_RPCSVC, GF_LOG_ERROR,
- "No vectored handler present");
- ret = RPCSVC_ACTOR_ERROR;
- goto err_reply;
- }
+ if (empty)
+ pthread_cond_signal(&req->prog->queue_cond);
+ }
+ pthread_mutex_unlock(&req->prog->queue_lock);
- if (req->synctask) {
- ret = synctask_new (THIS->ctx->env,
- (synctask_fn_t) actor_fn,
- rpcsvc_check_and_reply_error, NULL,
- req);
- } else if (req->ownthread) {
- pthread_mutex_lock (&req->prog->queue_lock);
- {
- empty = list_empty (&req->prog->request_queue);
-
- list_add_tail (&req->request_list,
- &req->prog->request_queue);
-
- if (empty)
- pthread_cond_signal (&req->prog->queue_cond);
- }
- pthread_mutex_unlock (&req->prog->queue_lock);
-
- ret = 0;
- } else {
- ret = actor_fn (req);
- }
+ ret = 0;
+ } else {
+ ret = actor_fn(req);
}
+ }
err_reply:
- ret = rpcsvc_check_and_reply_error (ret, NULL, req);
- /* No need to propagate error beyond this function since the reply
- * has now been queued. */
- ret = 0;
+ ret = rpcsvc_check_and_reply_error(ret, NULL, req);
+ /* No need to propagate error beyond this function since the reply
+ * has now been queued. */
+ ret = 0;
out:
- return ret;
+ return ret;
}
-
int
-rpcsvc_handle_disconnect (rpcsvc_t *svc, rpc_transport_t *trans)
+rpcsvc_handle_disconnect(rpcsvc_t *svc, rpc_transport_t *trans)
{
- rpcsvc_event_t event;
- rpcsvc_notify_wrapper_t *wrappers = NULL, *wrapper;
- int32_t ret = -1, i = 0, wrapper_count = 0;
- rpcsvc_listener_t *listener = NULL;
-
- event = (trans->listener == NULL) ? RPCSVC_EVENT_LISTENER_DEAD
- : RPCSVC_EVENT_DISCONNECT;
+ rpcsvc_event_t event;
+ rpcsvc_notify_wrapper_t *wrappers = NULL, *wrapper;
+ int32_t ret = -1, i = 0, wrapper_count = 0;
+ rpcsvc_listener_t *listener = NULL;
- pthread_rwlock_rdlock (&svc->rpclock);
- {
- if (!svc->notify_count)
- goto unlock;
+ event = (trans->listener == NULL) ? RPCSVC_EVENT_LISTENER_DEAD
+ : RPCSVC_EVENT_DISCONNECT;
- wrappers = GF_CALLOC (svc->notify_count, sizeof (*wrapper),
- gf_common_mt_rpcsvc_wrapper_t);
- if (!wrappers) {
- goto unlock;
- }
+ pthread_rwlock_rdlock(&svc->rpclock);
+ {
+ if (!svc->notify_count)
+ goto unlock;
- list_for_each_entry (wrapper, &svc->notify, list) {
- if (wrapper->notify) {
- wrappers[i++] = *wrapper;
- }
- }
+ wrappers = GF_CALLOC(svc->notify_count, sizeof(*wrapper),
+ gf_common_mt_rpcsvc_wrapper_t);
+ if (!wrappers) {
+ goto unlock;
+ }
- wrapper_count = i;
+ list_for_each_entry(wrapper, &svc->notify, list)
+ {
+ if (wrapper->notify) {
+ wrappers[i++] = *wrapper;
+ }
}
-unlock:
- pthread_rwlock_unlock (&svc->rpclock);
- if (wrappers) {
- for (i = 0; i < wrapper_count; i++) {
- wrappers[i].notify (svc, wrappers[i].data,
- event, trans);
- }
+ wrapper_count = i;
+ }
+unlock:
+ pthread_rwlock_unlock(&svc->rpclock);
- GF_FREE (wrappers);
+ if (wrappers) {
+ for (i = 0; i < wrapper_count; i++) {
+ wrappers[i].notify(svc, wrappers[i].data, event, trans);
}
- if (event == RPCSVC_EVENT_LISTENER_DEAD) {
- listener = rpcsvc_get_listener (svc, -1, trans->listener);
- rpcsvc_listener_destroy (listener);
- }
+ GF_FREE(wrappers);
+ }
- return ret;
-}
+ if (event == RPCSVC_EVENT_LISTENER_DEAD) {
+ listener = rpcsvc_get_listener(svc, -1, trans->listener);
+ rpcsvc_listener_destroy(listener);
+ }
+ return ret;
+}
int
-rpcsvc_notify (rpc_transport_t *trans, void *mydata,
- rpc_transport_event_t event, void *data, ...)
+rpcsvc_notify(rpc_transport_t *trans, void *mydata, rpc_transport_event_t event,
+ void *data, ...)
{
- int ret = -1;
- rpc_transport_pollin_t *msg = NULL;
- rpc_transport_t *new_trans = NULL;
- rpcsvc_t *svc = NULL;
- rpcsvc_listener_t *listener = NULL;
+ int ret = -1;
+ rpc_transport_pollin_t *msg = NULL;
+ rpc_transport_t *new_trans = NULL;
+ rpcsvc_t *svc = NULL;
+ rpcsvc_listener_t *listener = NULL;
- svc = mydata;
- if (svc == NULL) {
- goto out;
- }
+ svc = mydata;
+ if (svc == NULL) {
+ goto out;
+ }
- switch (event) {
+ switch (event) {
case RPC_TRANSPORT_ACCEPT:
- new_trans = data;
- ret = rpcsvc_accept (svc, trans, new_trans);
- break;
+ new_trans = data;
+ ret = rpcsvc_accept(svc, trans, new_trans);
+ break;
case RPC_TRANSPORT_DISCONNECT:
- ret = rpcsvc_handle_disconnect (svc, trans);
- break;
+ ret = rpcsvc_handle_disconnect(svc, trans);
+ break;
case RPC_TRANSPORT_MSG_RECEIVED:
- msg = data;
- ret = rpcsvc_handle_rpc_call (svc, trans, msg);
- break;
+ msg = data;
+ ret = rpcsvc_handle_rpc_call(svc, trans, msg);
+ break;
case RPC_TRANSPORT_MSG_SENT:
- ret = 0;
- break;
+ ret = 0;
+ break;
case RPC_TRANSPORT_CONNECT:
- /* do nothing, no need for rpcsvc to handle this, client should
- * handle this event
- */
- /* print info about transport too : LOG TODO */
- gf_log ("rpcsvc", GF_LOG_CRITICAL,
- "got CONNECT event, which should have not come");
- ret = 0;
- break;
+ /* do nothing, no need for rpcsvc to handle this, client should
+ * handle this event
+ */
+ /* print info about transport too : LOG TODO */
+ gf_log("rpcsvc", GF_LOG_CRITICAL,
+ "got CONNECT event, which should have not come");
+ ret = 0;
+ break;
case RPC_TRANSPORT_CLEANUP:
- listener = rpcsvc_get_listener (svc, -1, trans->listener);
- if (listener == NULL) {
- goto out;
- }
+ listener = rpcsvc_get_listener(svc, -1, trans->listener);
+ if (listener == NULL) {
+ goto out;
+ }
- rpcsvc_program_notify (listener, RPCSVC_EVENT_TRANSPORT_DESTROY,
- trans);
- ret = 0;
- break;
+ rpcsvc_program_notify(listener, RPCSVC_EVENT_TRANSPORT_DESTROY,
+ trans);
+ ret = 0;
+ break;
case RPC_TRANSPORT_MAP_XID_REQUEST:
- /* FIXME: think about this later */
- gf_log ("rpcsvc", GF_LOG_CRITICAL,
- "got MAP_XID event, which should have not come");
- ret = 0;
- break;
- }
+ /* FIXME: think about this later */
+ gf_log("rpcsvc", GF_LOG_CRITICAL,
+ "got MAP_XID event, which should have not come");
+ ret = 0;
+ break;
+ }
out:
- return ret;
+ return ret;
}
-
/* Given the RPC reply structure and the payload handed by the RPC program,
* encode the RPC record header into the buffer pointed by recordstart.
*/
struct iovec
-rpcsvc_record_build_header (char *recordstart, size_t rlen,
- struct rpc_msg reply, size_t payload)
-{
- struct iovec replyhdr;
- struct iovec txrecord = {0, 0};
- size_t fraglen = 0;
- int ret = -1;
-
- /* After leaving aside the 4 bytes for the fragment header, lets
- * encode the RPC reply structure into the buffer given to us.
- */
- ret = rpc_reply_to_xdr (&reply, recordstart, rlen, &replyhdr);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_WARNING, "Failed to create RPC reply");
- goto err;
- }
-
- fraglen = payload + replyhdr.iov_len;
- gf_log (GF_RPCSVC, GF_LOG_TRACE, "Reply fraglen %zu, payload: %zu, "
- "rpc hdr: %zu", fraglen, payload, replyhdr.iov_len);
-
- txrecord.iov_base = recordstart;
-
- /* Remember, this is only the vec for the RPC header and does not
- * include the payload above. We needed the payload only to calculate
- * the size of the full fragment. This size is sent in the fragment
- * header.
- */
- txrecord.iov_len = replyhdr.iov_len;
+rpcsvc_record_build_header(char *recordstart, size_t rlen, struct rpc_msg reply,
+ size_t payload)
+{
+ struct iovec replyhdr;
+ struct iovec txrecord = {0, 0};
+ size_t fraglen = 0;
+ int ret = -1;
+
+ /* After leaving aside the 4 bytes for the fragment header, lets
+ * encode the RPC reply structure into the buffer given to us.
+ */
+ ret = rpc_reply_to_xdr(&reply, recordstart, rlen, &replyhdr);
+ if (ret == -1) {
+ gf_log(GF_RPCSVC, GF_LOG_WARNING, "Failed to create RPC reply");
+ goto err;
+ }
+
+ fraglen = payload + replyhdr.iov_len;
+ gf_log(GF_RPCSVC, GF_LOG_TRACE,
+ "Reply fraglen %zu, payload: %zu, "
+ "rpc hdr: %zu",
+ fraglen, payload, replyhdr.iov_len);
+
+ txrecord.iov_base = recordstart;
+
+ /* Remember, this is only the vec for the RPC header and does not
+ * include the payload above. We needed the payload only to calculate
+ * the size of the full fragment. This size is sent in the fragment
+ * header.
+ */
+ txrecord.iov_len = replyhdr.iov_len;
err:
- return txrecord;
+ return txrecord;
}
static uint32_t
-rpc_callback_new_callid (struct rpc_transport *trans)
+rpc_callback_new_callid(struct rpc_transport *trans)
{
- uint32_t callid = 0;
+ uint32_t callid = 0;
- pthread_mutex_lock (&trans->lock);
- {
- callid = ++trans->xid;
- }
- pthread_mutex_unlock (&trans->lock);
+ pthread_mutex_lock(&trans->lock);
+ {
+ callid = ++trans->xid;
+ }
+ pthread_mutex_unlock(&trans->lock);
- return callid;
+ return callid;
}
int
-rpcsvc_fill_callback (int prognum, int progver, int procnum, int payload,
- uint32_t xid, struct rpc_msg *request)
+rpcsvc_fill_callback(int prognum, int progver, int procnum, int payload,
+ uint32_t xid, struct rpc_msg *request)
{
- int ret = -1;
+ int ret = -1;
- if (!request) {
- goto out;
- }
+ if (!request) {
+ goto out;
+ }
- memset (request, 0, sizeof (*request));
+ memset(request, 0, sizeof(*request));
- request->rm_xid = xid;
- request->rm_direction = CALL;
+ request->rm_xid = xid;
+ request->rm_direction = CALL;
- request->rm_call.cb_rpcvers = 2;
- request->rm_call.cb_prog = prognum;
- request->rm_call.cb_vers = progver;
- request->rm_call.cb_proc = procnum;
+ request->rm_call.cb_rpcvers = 2;
+ request->rm_call.cb_prog = prognum;
+ request->rm_call.cb_vers = progver;
+ request->rm_call.cb_proc = procnum;
- request->rm_call.cb_cred.oa_flavor = AUTH_NONE;
- request->rm_call.cb_cred.oa_base = NULL;
- request->rm_call.cb_cred.oa_length = 0;
+ request->rm_call.cb_cred.oa_flavor = AUTH_NONE;
+ request->rm_call.cb_cred.oa_base = NULL;
+ request->rm_call.cb_cred.oa_length = 0;
- request->rm_call.cb_verf.oa_flavor = AUTH_NONE;
- request->rm_call.cb_verf.oa_base = NULL;
- request->rm_call.cb_verf.oa_length = 0;
+ request->rm_call.cb_verf.oa_flavor = AUTH_NONE;
+ request->rm_call.cb_verf.oa_base = NULL;
+ request->rm_call.cb_verf.oa_length = 0;
- ret = 0;
+ ret = 0;
out:
- return ret;
+ return ret;
}
-
struct iovec
-rpcsvc_callback_build_header (char *recordstart, size_t rlen,
+rpcsvc_callback_build_header(char *recordstart, size_t rlen,
struct rpc_msg *request, size_t payload)
{
- struct iovec requesthdr = {0, };
- struct iovec txrecord = {0, 0};
- int ret = -1;
- size_t fraglen = 0;
-
- ret = rpc_request_to_xdr (request, recordstart, rlen, &requesthdr);
- if (ret == -1) {
- gf_log ("rpcsvc", GF_LOG_WARNING,
- "Failed to create RPC request");
- goto out;
- }
-
- fraglen = payload + requesthdr.iov_len;
- gf_log ("rpcsvc", GF_LOG_TRACE, "Request fraglen %zu, payload: %zu, "
- "rpc hdr: %zu", fraglen, payload, requesthdr.iov_len);
-
- txrecord.iov_base = recordstart;
-
- /* Remember, this is only the vec for the RPC header and does not
- * include the payload above. We needed the payload only to calculate
- * the size of the full fragment. This size is sent in the fragment
- * header.
- */
- txrecord.iov_len = requesthdr.iov_len;
+ struct iovec requesthdr = {
+ 0,
+ };
+ struct iovec txrecord = {0, 0};
+ int ret = -1;
+ size_t fraglen = 0;
+
+ ret = rpc_request_to_xdr(request, recordstart, rlen, &requesthdr);
+ if (ret == -1) {
+ gf_log("rpcsvc", GF_LOG_WARNING, "Failed to create RPC request");
+ goto out;
+ }
+
+ fraglen = payload + requesthdr.iov_len;
+ gf_log("rpcsvc", GF_LOG_TRACE,
+ "Request fraglen %zu, payload: %zu, "
+ "rpc hdr: %zu",
+ fraglen, payload, requesthdr.iov_len);
+
+ txrecord.iov_base = recordstart;
+
+ /* Remember, this is only the vec for the RPC header and does not
+ * include the payload above. We needed the payload only to calculate
+ * the size of the full fragment. This size is sent in the fragment
+ * header.
+ */
+ txrecord.iov_len = requesthdr.iov_len;
out:
- return txrecord;
+ return txrecord;
}
static struct iobuf *
-rpcsvc_callback_build_record (rpcsvc_t *rpc, int prognum, int progver,
- int procnum, size_t payload, u_long xid,
- struct iovec *recbuf)
-{
- struct rpc_msg request = {0, };
- struct iobuf *request_iob = NULL;
- char *record = NULL;
- struct iovec recordhdr = {0, };
- size_t pagesize = 0;
- size_t xdr_size = 0;
- int ret = -1;
-
- if ((!rpc) || (!recbuf)) {
- goto out;
- }
-
- /* Fill the rpc structure and XDR it into the buffer got above. */
- ret = rpcsvc_fill_callback (prognum, progver, procnum, payload, xid,
- &request);
- if (ret == -1) {
- gf_log ("rpcsvc", GF_LOG_WARNING, "cannot build a rpc-request "
- "xid (%lu)", xid);
- goto out;
- }
-
- /* First, try to get a pointer into the buffer which the RPC
- * layer can use.
- */
- xdr_size = xdr_sizeof ((xdrproc_t)xdr_callmsg, &request);
-
- request_iob = iobuf_get2 (rpc->ctx->iobuf_pool, (xdr_size + payload));
- if (!request_iob) {
- goto out;
- }
-
- pagesize = iobuf_pagesize (request_iob);
-
- record = iobuf_ptr (request_iob); /* Now we have it. */
-
- recordhdr = rpcsvc_callback_build_header (record, pagesize, &request,
- payload);
-
- if (!recordhdr.iov_base) {
- gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed to build record "
- " header");
- iobuf_unref (request_iob);
- request_iob = NULL;
- recbuf->iov_base = NULL;
- goto out;
- }
-
- recbuf->iov_base = recordhdr.iov_base;
- recbuf->iov_len = recordhdr.iov_len;
+rpcsvc_callback_build_record(rpcsvc_t *rpc, int prognum, int progver,
+ int procnum, size_t payload, u_long xid,
+ struct iovec *recbuf)
+{
+ struct rpc_msg request = {
+ 0,
+ };
+ struct iobuf *request_iob = NULL;
+ char *record = NULL;
+ struct iovec recordhdr = {
+ 0,
+ };
+ size_t pagesize = 0;
+ size_t xdr_size = 0;
+ int ret = -1;
+
+ if ((!rpc) || (!recbuf)) {
+ goto out;
+ }
+
+ /* Fill the rpc structure and XDR it into the buffer got above. */
+ ret = rpcsvc_fill_callback(prognum, progver, procnum, payload, xid,
+ &request);
+ if (ret == -1) {
+ gf_log("rpcsvc", GF_LOG_WARNING,
+ "cannot build a rpc-request "
+ "xid (%lu)",
+ xid);
+ goto out;
+ }
+
+ /* First, try to get a pointer into the buffer which the RPC
+ * layer can use.
+ */
+ xdr_size = xdr_sizeof((xdrproc_t)xdr_callmsg, &request);
+
+ request_iob = iobuf_get2(rpc->ctx->iobuf_pool, (xdr_size + payload));
+ if (!request_iob) {
+ goto out;
+ }
+
+ pagesize = iobuf_pagesize(request_iob);
+
+ record = iobuf_ptr(request_iob); /* Now we have it. */
+
+ recordhdr = rpcsvc_callback_build_header(record, pagesize, &request,
+ payload);
+
+ if (!recordhdr.iov_base) {
+ gf_log("rpc-clnt", GF_LOG_ERROR,
+ "Failed to build record "
+ " header");
+ iobuf_unref(request_iob);
+ request_iob = NULL;
+ recbuf->iov_base = NULL;
+ goto out;
+ }
+
+ recbuf->iov_base = recordhdr.iov_base;
+ recbuf->iov_len = recordhdr.iov_len;
out:
- return request_iob;
+ return request_iob;
}
-int rpcsvc_request_submit (rpcsvc_t *rpc, rpc_transport_t *trans,
- rpcsvc_cbk_program_t *prog, int procnum,
- void *req, glusterfs_ctx_t *ctx,
- xdrproc_t xdrproc)
-{
- int ret = -1;
- int count = 0;
- struct iovec iov = {0, };
- struct iobuf *iobuf = NULL;
- ssize_t xdr_size = 0;
- struct iobref *iobref = NULL;
-
- if (!req)
- goto out;
-
- xdr_size = xdr_sizeof (xdrproc, req);
-
- iobuf = iobuf_get2 (ctx->iobuf_pool, xdr_size);
- if (!iobuf)
- goto out;
-
- iov.iov_base = iobuf->ptr;
- iov.iov_len = iobuf_pagesize (iobuf);
-
- ret = xdr_serialize_generic (iov, req, xdrproc);
- if (ret == -1) {
- gf_log (THIS->name, GF_LOG_WARNING,
- "failed to create XDR payload");
- goto out;
- }
- iov.iov_len = ret;
- count = 1;
-
- iobref = iobref_new ();
- if (!iobref) {
- ret = -1;
- gf_log ("rpcsvc", GF_LOG_WARNING, "Failed to create iobref");
- goto out;
- }
+int
+rpcsvc_request_submit(rpcsvc_t *rpc, rpc_transport_t *trans,
+ rpcsvc_cbk_program_t *prog, int procnum, void *req,
+ glusterfs_ctx_t *ctx, xdrproc_t xdrproc)
+{
+ int ret = -1;
+ int count = 0;
+ struct iovec iov = {
+ 0,
+ };
+ struct iobuf *iobuf = NULL;
+ ssize_t xdr_size = 0;
+ struct iobref *iobref = NULL;
+
+ if (!req)
+ goto out;
+
+ xdr_size = xdr_sizeof(xdrproc, req);
+
+ iobuf = iobuf_get2(ctx->iobuf_pool, xdr_size);
+ if (!iobuf)
+ goto out;
+
+ iov.iov_base = iobuf->ptr;
+ iov.iov_len = iobuf_pagesize(iobuf);
+
+ ret = xdr_serialize_generic(iov, req, xdrproc);
+ if (ret == -1) {
+ gf_log(THIS->name, GF_LOG_WARNING, "failed to create XDR payload");
+ goto out;
+ }
+ iov.iov_len = ret;
+ count = 1;
+
+ iobref = iobref_new();
+ if (!iobref) {
+ ret = -1;
+ gf_log("rpcsvc", GF_LOG_WARNING, "Failed to create iobref");
+ goto out;
+ }
- iobref_add (iobref, iobuf);
+ iobref_add(iobref, iobuf);
- ret = rpcsvc_callback_submit (rpc, trans, prog, procnum,
- &iov, count, iobref);
+ ret = rpcsvc_callback_submit(rpc, trans, prog, procnum, &iov, count,
+ iobref);
out:
- if (iobuf)
- iobuf_unref (iobuf);
+ if (iobuf)
+ iobuf_unref(iobuf);
- if (iobref)
- iobref_unref (iobref);
+ if (iobref)
+ iobref_unref(iobref);
- return ret;
+ return ret;
}
int
-rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans,
- rpcsvc_cbk_program_t *prog, int procnum,
- struct iovec *proghdr, int proghdrcount,
- struct iobref *iobref)
-{
- struct iobuf *request_iob = NULL;
- struct iovec rpchdr = {0,};
- rpc_transport_req_t req;
- int ret = -1;
- int proglen = 0;
- uint32_t xid = 0;
- gf_boolean_t new_iobref = _gf_false;
-
- if (!rpc) {
- goto out;
- }
-
- memset (&req, 0, sizeof (req));
-
- if (proghdr) {
- proglen += iov_length (proghdr, proghdrcount);
- }
-
- xid = rpc_callback_new_callid (trans);
-
- request_iob = rpcsvc_callback_build_record (rpc, prog->prognum,
- prog->progver, procnum,
- proglen, xid, &rpchdr);
- if (!request_iob) {
- gf_log ("rpcsvc", GF_LOG_WARNING,
- "cannot build rpc-record");
- goto out;
- }
+rpcsvc_callback_submit(rpcsvc_t *rpc, rpc_transport_t *trans,
+ rpcsvc_cbk_program_t *prog, int procnum,
+ struct iovec *proghdr, int proghdrcount,
+ struct iobref *iobref)
+{
+ struct iobuf *request_iob = NULL;
+ struct iovec rpchdr = {
+ 0,
+ };
+ rpc_transport_req_t req;
+ int ret = -1;
+ int proglen = 0;
+ uint32_t xid = 0;
+ gf_boolean_t new_iobref = _gf_false;
+
+ if (!rpc) {
+ goto out;
+ }
+
+ memset(&req, 0, sizeof(req));
+
+ if (proghdr) {
+ proglen += iov_length(proghdr, proghdrcount);
+ }
+
+ xid = rpc_callback_new_callid(trans);
+
+ request_iob = rpcsvc_callback_build_record(
+ rpc, prog->prognum, prog->progver, procnum, proglen, xid, &rpchdr);
+ if (!request_iob) {
+ gf_log("rpcsvc", GF_LOG_WARNING, "cannot build rpc-record");
+ goto out;
+ }
+ if (!iobref) {
+ iobref = iobref_new();
if (!iobref) {
- iobref = iobref_new ();
- if (!iobref) {
- gf_log ("rpcsvc", GF_LOG_WARNING, "Failed to create iobref");
- goto out;
- }
- new_iobref = 1;
+ gf_log("rpcsvc", GF_LOG_WARNING, "Failed to create iobref");
+ goto out;
}
+ new_iobref = 1;
+ }
- iobref_add (iobref, request_iob);
+ iobref_add(iobref, request_iob);
- req.msg.rpchdr = &rpchdr;
- req.msg.rpchdrcount = 1;
- req.msg.proghdr = proghdr;
- req.msg.proghdrcount = proghdrcount;
- req.msg.iobref = iobref;
+ req.msg.rpchdr = &rpchdr;
+ req.msg.rpchdrcount = 1;
+ req.msg.proghdr = proghdr;
+ req.msg.proghdrcount = proghdrcount;
+ req.msg.iobref = iobref;
- ret = rpc_transport_submit_request (trans, &req);
- if (ret == -1) {
- gf_log ("rpcsvc", GF_LOG_WARNING,
- "transmission of rpc-request failed");
- goto out;
- }
+ ret = rpc_transport_submit_request(trans, &req);
+ if (ret == -1) {
+ gf_log("rpcsvc", GF_LOG_WARNING, "transmission of rpc-request failed");
+ goto out;
+ }
- ret = 0;
+ ret = 0;
out:
- iobuf_unref (request_iob);
+ iobuf_unref(request_iob);
- if (new_iobref)
- iobref_unref (iobref);
+ if (new_iobref)
+ iobref_unref(iobref);
- return ret;
+ return ret;
}
int
-rpcsvc_transport_submit (rpc_transport_t *trans, struct iovec *rpchdr,
- int rpchdrcount, struct iovec *proghdr,
- int proghdrcount, struct iovec *progpayload,
- int progpayloadcount, struct iobref *iobref,
- void *priv)
-{
- int ret = -1;
- rpc_transport_reply_t reply = {{0, }};
-
- if ((!trans) || (!rpchdr) || (!rpchdr->iov_base)) {
- goto out;
- }
-
- reply.msg.rpchdr = rpchdr;
- reply.msg.rpchdrcount = rpchdrcount;
- reply.msg.proghdr = proghdr;
- reply.msg.proghdrcount = proghdrcount;
- reply.msg.progpayload = progpayload;
- reply.msg.progpayloadcount = progpayloadcount;
- reply.msg.iobref = iobref;
- reply.private = priv;
-
- ret = rpc_transport_submit_reply (trans, &reply);
+rpcsvc_transport_submit(rpc_transport_t *trans, struct iovec *rpchdr,
+ int rpchdrcount, struct iovec *proghdr,
+ int proghdrcount, struct iovec *progpayload,
+ int progpayloadcount, struct iobref *iobref, void *priv)
+{
+ int ret = -1;
+ rpc_transport_reply_t reply = {{
+ 0,
+ }};
+
+ if ((!trans) || (!rpchdr) || (!rpchdr->iov_base)) {
+ goto out;
+ }
+
+ reply.msg.rpchdr = rpchdr;
+ reply.msg.rpchdrcount = rpchdrcount;
+ reply.msg.proghdr = proghdr;
+ reply.msg.proghdrcount = proghdrcount;
+ reply.msg.progpayload = progpayload;
+ reply.msg.progpayloadcount = progpayloadcount;
+ reply.msg.iobref = iobref;
+ reply.private = priv;
+
+ ret = rpc_transport_submit_reply(trans, &reply);
out:
- return ret;
+ return ret;
}
-
int
-rpcsvc_fill_reply (rpcsvc_request_t *req, struct rpc_msg *reply)
-{
- int ret = -1;
- rpcsvc_program_t *prog = NULL;
- if ((!req) || (!reply))
- goto out;
-
- ret = 0;
- rpc_fill_empty_reply (reply, req->xid);
- if (req->rpc_status == MSG_DENIED) {
- rpc_fill_denied_reply (reply, req->rpc_err, req->auth_err);
- goto out;
- }
-
- prog = rpcsvc_request_program (req);
-
- if (req->rpc_status == MSG_ACCEPTED)
- rpc_fill_accepted_reply (reply, req->rpc_err,
- (prog) ? prog->proglowvers : 0,
- (prog) ? prog->proghighvers: 0,
- req->verf.flavour, req->verf.datalen,
- req->verf.authdata);
- else
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Invalid rpc_status value");
+rpcsvc_fill_reply(rpcsvc_request_t *req, struct rpc_msg *reply)
+{
+ int ret = -1;
+ rpcsvc_program_t *prog = NULL;
+ if ((!req) || (!reply))
+ goto out;
+
+ ret = 0;
+ rpc_fill_empty_reply(reply, req->xid);
+ if (req->rpc_status == MSG_DENIED) {
+ rpc_fill_denied_reply(reply, req->rpc_err, req->auth_err);
+ goto out;
+ }
+
+ prog = rpcsvc_request_program(req);
+
+ if (req->rpc_status == MSG_ACCEPTED)
+ rpc_fill_accepted_reply(
+ reply, req->rpc_err, (prog) ? prog->proglowvers : 0,
+ (prog) ? prog->proghighvers : 0, req->verf.flavour,
+ req->verf.datalen, req->verf.authdata);
+ else
+ gf_log(GF_RPCSVC, GF_LOG_ERROR, "Invalid rpc_status value");
out:
- return ret;
+ return ret;
}
-
/* Given a request and the reply payload, build a reply and encodes the reply
* into a record header. This record header is encoded into the vector pointed
* to be recbuf.
@@ -1224,59 +1225,60 @@ out:
* we should account for the length of that buffer in the RPC fragment header.
*/
struct iobuf *
-rpcsvc_record_build_record (rpcsvc_request_t *req, size_t payload,
- size_t hdrlen, struct iovec *recbuf)
-{
- struct rpc_msg reply;
- struct iobuf *replyiob = NULL;
- char *record = NULL;
- struct iovec recordhdr = {0, };
- size_t pagesize = 0;
- size_t xdr_size = 0;
- rpcsvc_t *svc = NULL;
- int ret = -1;
+rpcsvc_record_build_record(rpcsvc_request_t *req, size_t payload, size_t hdrlen,
+ struct iovec *recbuf)
+{
+ struct rpc_msg reply;
+ struct iobuf *replyiob = NULL;
+ char *record = NULL;
+ struct iovec recordhdr = {
+ 0,
+ };
+ size_t pagesize = 0;
+ size_t xdr_size = 0;
+ rpcsvc_t *svc = NULL;
+ int ret = -1;
+
+ if ((!req) || (!req->trans) || (!req->svc) || (!recbuf))
+ return NULL;
- if ((!req) || (!req->trans) || (!req->svc) || (!recbuf))
- return NULL;
+ svc = req->svc;
- svc = req->svc;
+ /* Fill the rpc structure and XDR it into the buffer got above. */
+ ret = rpcsvc_fill_reply(req, &reply);
+ if (ret)
+ goto err_exit;
- /* Fill the rpc structure and XDR it into the buffer got above. */
- ret = rpcsvc_fill_reply (req, &reply);
- if (ret)
- goto err_exit;
+ xdr_size = xdr_sizeof((xdrproc_t)xdr_replymsg, &reply);
- xdr_size = xdr_sizeof ((xdrproc_t)xdr_replymsg, &reply);
+ /* Payload would include 'readv' size etc too, where as
+ that comes as another payload iobuf */
+ replyiob = iobuf_get2(svc->ctx->iobuf_pool, (xdr_size + hdrlen));
+ if (!replyiob) {
+ goto err_exit;
+ }
- /* Payload would include 'readv' size etc too, where as
- that comes as another payload iobuf */
- replyiob = iobuf_get2 (svc->ctx->iobuf_pool, (xdr_size + hdrlen));
- if (!replyiob) {
- goto err_exit;
- }
-
- pagesize = iobuf_pagesize (replyiob);
+ pagesize = iobuf_pagesize(replyiob);
- record = iobuf_ptr (replyiob); /* Now we have it. */
+ record = iobuf_ptr(replyiob); /* Now we have it. */
- recordhdr = rpcsvc_record_build_header (record, pagesize, reply,
- payload);
- if (!recordhdr.iov_base) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to build record "
- " header");
- iobuf_unref (replyiob);
- replyiob = NULL;
- recbuf->iov_base = NULL;
- goto err_exit;
- }
+ recordhdr = rpcsvc_record_build_header(record, pagesize, reply, payload);
+ if (!recordhdr.iov_base) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR,
+ "Failed to build record "
+ " header");
+ iobuf_unref(replyiob);
+ replyiob = NULL;
+ recbuf->iov_base = NULL;
+ goto err_exit;
+ }
- recbuf->iov_base = recordhdr.iov_base;
- recbuf->iov_len = recordhdr.iov_len;
+ recbuf->iov_base = recordhdr.iov_base;
+ recbuf->iov_len = recordhdr.iov_len;
err_exit:
- return replyiob;
+ return replyiob;
}
-
/*
* The function to submit a program message to the RPC service.
* This message is added to the transmission queue of the
@@ -1304,331 +1306,336 @@ err_exit:
*/
int
-rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr,
- int hdrcount, struct iovec *payload, int payloadcount,
- struct iobref *iobref)
-{
- int ret = -1, i = 0;
- struct iobuf *replyiob = NULL;
- struct iovec recordhdr = {0, };
- rpc_transport_t *trans = NULL;
- size_t msglen = 0;
- size_t hdrlen = 0;
- char new_iobref = 0;
- rpcsvc_drc_globals_t *drc = NULL;
-
- if ((!req) || (!req->trans))
- return -1;
-
- trans = req->trans;
-
- for (i = 0; i < hdrcount; i++) {
- msglen += proghdr[i].iov_len;
- }
-
- for (i = 0; i < payloadcount; i++) {
- msglen += payload[i].iov_len;
- }
-
- gf_log (GF_RPCSVC, GF_LOG_TRACE, "Tx message: %zu", msglen);
-
- /* Build the buffer containing the encoded RPC reply. */
- replyiob = rpcsvc_record_build_record (req, msglen, hdrlen, &recordhdr);
- if (!replyiob) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR,"Reply record creation failed");
- goto disconnect_exit;
- }
-
+rpcsvc_submit_generic(rpcsvc_request_t *req, struct iovec *proghdr,
+ int hdrcount, struct iovec *payload, int payloadcount,
+ struct iobref *iobref)
+{
+ int ret = -1, i = 0;
+ struct iobuf *replyiob = NULL;
+ struct iovec recordhdr = {
+ 0,
+ };
+ rpc_transport_t *trans = NULL;
+ size_t msglen = 0;
+ size_t hdrlen = 0;
+ char new_iobref = 0;
+ rpcsvc_drc_globals_t *drc = NULL;
+
+ if ((!req) || (!req->trans))
+ return -1;
+
+ trans = req->trans;
+
+ for (i = 0; i < hdrcount; i++) {
+ msglen += proghdr[i].iov_len;
+ }
+
+ for (i = 0; i < payloadcount; i++) {
+ msglen += payload[i].iov_len;
+ }
+
+ gf_log(GF_RPCSVC, GF_LOG_TRACE, "Tx message: %zu", msglen);
+
+ /* Build the buffer containing the encoded RPC reply. */
+ replyiob = rpcsvc_record_build_record(req, msglen, hdrlen, &recordhdr);
+ if (!replyiob) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR, "Reply record creation failed");
+ goto disconnect_exit;
+ }
+
+ if (!iobref) {
+ iobref = iobref_new();
if (!iobref) {
- iobref = iobref_new ();
- if (!iobref) {
- goto disconnect_exit;
- }
-
- new_iobref = 1;
+ goto disconnect_exit;
}
- iobref_add (iobref, replyiob);
-
- /* cache the request in the duplicate request cache for appropriate ops */
- if ((req->reply) && (rpcsvc_need_drc (req))) {
- drc = req->svc->drc;
+ new_iobref = 1;
+ }
- LOCK (&drc->lock);
- ret = rpcsvc_cache_reply (req, iobref, &recordhdr, 1,
- proghdr, hdrcount,
- payload, payloadcount);
- UNLOCK (&drc->lock);
- if (ret < 0) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR,
- "failed to cache reply");
- }
- }
+ iobref_add(iobref, replyiob);
- ret = rpcsvc_transport_submit (trans, &recordhdr, 1, proghdr, hdrcount,
- payload, payloadcount, iobref,
- req->trans_private);
+ /* cache the request in the duplicate request cache for appropriate ops */
+ if ((req->reply) && (rpcsvc_need_drc(req))) {
+ drc = req->svc->drc;
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "failed to submit message "
- "(XID: 0x%x, Program: %s, ProgVers: %d, Proc: %d) to "
- "rpc-transport (%s)", req->xid,
- req->prog ? req->prog->progname : "(not matched)",
- req->prog ? req->prog->progver : 0,
- req->procnum, trans ? trans->name : "");
- } else {
- gf_log (GF_RPCSVC, GF_LOG_TRACE,
- "submitted reply for rpc-message (XID: 0x%x, "
- "Program: %s, ProgVers: %d, Proc: %d) to rpc-transport "
- "(%s)", req->xid, req->prog ? req->prog->progname: "-",
- req->prog ? req->prog->progver : 0,
- req->procnum, trans ? trans->name : "");
- }
+ LOCK(&drc->lock);
+ ret = rpcsvc_cache_reply(req, iobref, &recordhdr, 1, proghdr, hdrcount,
+ payload, payloadcount);
+ UNLOCK(&drc->lock);
+ if (ret < 0) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR, "failed to cache reply");
+ }
+ }
+
+ ret = rpcsvc_transport_submit(trans, &recordhdr, 1, proghdr, hdrcount,
+ payload, payloadcount, iobref,
+ req->trans_private);
+
+ if (ret == -1) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR,
+ "failed to submit message "
+ "(XID: 0x%x, Program: %s, ProgVers: %d, Proc: %d) to "
+ "rpc-transport (%s)",
+ req->xid, req->prog ? req->prog->progname : "(not matched)",
+ req->prog ? req->prog->progver : 0, req->procnum,
+ trans ? trans->name : "");
+ } else {
+ gf_log(GF_RPCSVC, GF_LOG_TRACE,
+ "submitted reply for rpc-message (XID: 0x%x, "
+ "Program: %s, ProgVers: %d, Proc: %d) to rpc-transport "
+ "(%s)",
+ req->xid, req->prog ? req->prog->progname : "-",
+ req->prog ? req->prog->progver : 0, req->procnum,
+ trans ? trans->name : "");
+ }
disconnect_exit:
- if (replyiob) {
- iobuf_unref (replyiob);
- }
+ if (replyiob) {
+ iobuf_unref(replyiob);
+ }
- if (new_iobref) {
- iobref_unref (iobref);
- }
+ if (new_iobref) {
+ iobref_unref(iobref);
+ }
- rpcsvc_request_destroy (req);
+ rpcsvc_request_destroy(req);
- return ret;
+ return ret;
}
-
int
-rpcsvc_error_reply (rpcsvc_request_t *req)
+rpcsvc_error_reply(rpcsvc_request_t *req)
{
- struct iovec dummyvec = {0, };
+ struct iovec dummyvec = {
+ 0,
+ };
- if (!req)
- return -1;
+ if (!req)
+ return -1;
- gf_log_callingfn ("", GF_LOG_DEBUG, "sending a RPC error reply");
+ gf_log_callingfn("", GF_LOG_DEBUG, "sending a RPC error reply");
- /* At this point the req should already have been filled with the
- * appropriate RPC error numbers.
- */
- return rpcsvc_submit_generic (req, &dummyvec, 0, NULL, 0, NULL);
+ /* At this point the req should already have been filled with the
+ * appropriate RPC error numbers.
+ */
+ return rpcsvc_submit_generic(req, &dummyvec, 0, NULL, 0, NULL);
}
#ifdef IPV6_DEFAULT
int
-rpcsvc_program_register_rpcbind6 (rpcsvc_program_t *newprog, uint32_t port)
-{
- const int IP_BUF_LEN = 64;
- char addr_buf[IP_BUF_LEN];
-
- int err = 0;
- bool_t success = 0;
- struct netconfig *nc;
- struct netbuf *nb;
-
- if (!newprog) {
- goto out;
- }
-
- nc = getnetconfigent ("tcp6");
- if (!nc) {
- err = -1;
- goto out;
- }
-
-
- err = sprintf (addr_buf, "::.%d.%d", port >> 8 & 0xff,
- port & 0xff);
- if (err < 0) {
- err = -1;
- goto out;
- }
-
- nb = uaddr2taddr (nc, addr_buf);
- if (!nb) {
- err = -1;
- goto out;
- }
-
- /* Force the unregistration of the program first.
- * This call may fail if nothing has been registered,
- * which is fine.
- */
- rpcsvc_program_unregister_rpcbind6 (newprog);
-
- success = rpcb_set (newprog->prognum, newprog->progver, nc, nb);
- if (!success) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Could not register the IPv6"
- " service with rpcbind");
- }
-
- err = 0;
+rpcsvc_program_register_rpcbind6(rpcsvc_program_t *newprog, uint32_t port)
+{
+ const int IP_BUF_LEN = 64;
+ char addr_buf[IP_BUF_LEN];
+
+ int err = 0;
+ bool_t success = 0;
+ struct netconfig *nc;
+ struct netbuf *nb;
+
+ if (!newprog) {
+ goto out;
+ }
+
+ nc = getnetconfigent("tcp6");
+ if (!nc) {
+ err = -1;
+ goto out;
+ }
+
+ err = sprintf(addr_buf, "::.%d.%d", port >> 8 & 0xff, port & 0xff);
+ if (err < 0) {
+ err = -1;
+ goto out;
+ }
+
+ nb = uaddr2taddr(nc, addr_buf);
+ if (!nb) {
+ err = -1;
+ goto out;
+ }
+
+ /* Force the unregistration of the program first.
+ * This call may fail if nothing has been registered,
+ * which is fine.
+ */
+ rpcsvc_program_unregister_rpcbind6(newprog);
+
+ success = rpcb_set(newprog->prognum, newprog->progver, nc, nb);
+ if (!success) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR,
+ "Could not register the IPv6"
+ " service with rpcbind");
+ }
+
+ err = 0;
out:
- return err;
+ return err;
}
int
-rpcsvc_program_unregister_rpcbind6 (rpcsvc_program_t *newprog)
-{
- int err = 0;
- bool_t success = 0;
- struct netconfig *nc;
-
- if (!newprog) {
- goto out;
- }
-
- nc = getnetconfigent ("tcp6");
- if (!nc) {
- err = -1;
- goto out;
- }
-
- success = rpcb_unset (newprog->prognum, newprog->progver, nc);
- if (!success) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Could not unregister the IPv6"
- " service with rpcbind");
- }
-
- err = 0;
+rpcsvc_program_unregister_rpcbind6(rpcsvc_program_t *newprog)
+{
+ int err = 0;
+ bool_t success = 0;
+ struct netconfig *nc;
+
+ if (!newprog) {
+ goto out;
+ }
+
+ nc = getnetconfigent("tcp6");
+ if (!nc) {
+ err = -1;
+ goto out;
+ }
+
+ success = rpcb_unset(newprog->prognum, newprog->progver, nc);
+ if (!success) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR,
+ "Could not unregister the IPv6"
+ " service with rpcbind");
+ }
+
+ err = 0;
out:
- return err;
+ return err;
}
#endif
/* Register the program with the local portmapper service. */
int
-rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, uint32_t port)
+rpcsvc_program_register_portmap(rpcsvc_program_t *newprog, uint32_t port)
{
- int ret = -1; /* FAIL */
+ int ret = -1; /* FAIL */
- if (!newprog) {
- goto out;
- }
+ if (!newprog) {
+ goto out;
+ }
- /* pmap_set() returns 0 for FAIL and 1 for SUCCESS */
- if (!(pmap_set (newprog->prognum, newprog->progver, IPPROTO_TCP,
- port))) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Could not register with"
- " portmap %d %d %u", newprog->prognum, newprog->progver, port);
- goto out;
- }
+ /* pmap_set() returns 0 for FAIL and 1 for SUCCESS */
+ if (!(pmap_set(newprog->prognum, newprog->progver, IPPROTO_TCP, port))) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR,
+ "Could not register with"
+ " portmap %d %d %u",
+ newprog->prognum, newprog->progver, port);
+ goto out;
+ }
- ret = 0; /* SUCCESS */
+ ret = 0; /* SUCCESS */
out:
- return ret;
+ return ret;
}
-
int
-rpcsvc_program_unregister_portmap (rpcsvc_program_t *prog)
+rpcsvc_program_unregister_portmap(rpcsvc_program_t *prog)
{
- int ret = -1;
+ int ret = -1;
- if (!prog)
- goto out;
+ if (!prog)
+ goto out;
- if (!(pmap_unset(prog->prognum, prog->progver))) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Could not unregister with"
- " portmap");
- goto out;
- }
+ if (!(pmap_unset(prog->prognum, prog->progver))) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR,
+ "Could not unregister with"
+ " portmap");
+ goto out;
+ }
- ret = 0;
+ ret = 0;
out:
- return ret;
+ return ret;
}
int
-rpcsvc_register_portmap_enabled (rpcsvc_t *svc)
+rpcsvc_register_portmap_enabled(rpcsvc_t *svc)
{
- return svc->register_portmap;
+ return svc->register_portmap;
}
int32_t
-rpcsvc_get_listener_port (rpcsvc_listener_t *listener)
+rpcsvc_get_listener_port(rpcsvc_listener_t *listener)
{
- int32_t listener_port = -1;
+ int32_t listener_port = -1;
- if ((listener == NULL) || (listener->trans == NULL)) {
- goto out;
- }
+ if ((listener == NULL) || (listener->trans == NULL)) {
+ goto out;
+ }
- switch (listener->trans->myinfo.sockaddr.ss_family) {
+ switch (listener->trans->myinfo.sockaddr.ss_family) {
case AF_INET:
- listener_port = ((struct sockaddr_in *)&listener->trans->myinfo.sockaddr)->sin_port;
- break;
+ listener_port = ((struct sockaddr_in *)&listener->trans->myinfo
+ .sockaddr)
+ ->sin_port;
+ break;
case AF_INET6:
- listener_port = ((struct sockaddr_in6 *)&listener->trans->myinfo.sockaddr)->sin6_port;
- break;
+ listener_port = ((struct sockaddr_in6 *)&listener->trans->myinfo
+ .sockaddr)
+ ->sin6_port;
+ break;
default:
- gf_log (GF_RPCSVC, GF_LOG_DEBUG,
- "invalid address family (%d)",
- listener->trans->myinfo.sockaddr.ss_family);
- goto out;
- }
+ gf_log(GF_RPCSVC, GF_LOG_DEBUG, "invalid address family (%d)",
+ listener->trans->myinfo.sockaddr.ss_family);
+ goto out;
+ }
- listener_port = ntohs (listener_port);
+ listener_port = ntohs(listener_port);
out:
- return listener_port;
+ return listener_port;
}
-
rpcsvc_listener_t *
-rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans)
+rpcsvc_get_listener(rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans)
{
- rpcsvc_listener_t *listener = NULL;
- char found = 0;
- rpcsvc_listener_t *next = NULL;
- uint32_t listener_port = 0;
+ rpcsvc_listener_t *listener = NULL;
+ char found = 0;
+ rpcsvc_listener_t *next = NULL;
+ uint32_t listener_port = 0;
- if (!svc) {
- goto out;
- }
+ if (!svc) {
+ goto out;
+ }
- pthread_rwlock_rdlock (&svc->rpclock);
+ pthread_rwlock_rdlock(&svc->rpclock);
+ {
+ list_for_each_entry_safe(listener, next, &svc->listeners, list)
{
- list_for_each_entry_safe (listener, next, &svc->listeners, list) {
- if (trans != NULL) {
- if (listener->trans == trans) {
- found = 1;
- break;
- }
-
- continue;
- }
-
- listener_port = rpcsvc_get_listener_port (listener);
- if (listener_port == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR,
- "invalid port for listener %s",
- listener->trans->name);
- continue;
- }
-
- if (listener_port == port) {
- found = 1;
- break;
- }
+ if (trans != NULL) {
+ if (listener->trans == trans) {
+ found = 1;
+ break;
}
- }
- pthread_rwlock_unlock (&svc->rpclock);
- if (!found) {
- listener = NULL;
+ continue;
+ }
+
+ listener_port = rpcsvc_get_listener_port(listener);
+ if (listener_port == -1) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR, "invalid port for listener %s",
+ listener->trans->name);
+ continue;
+ }
+
+ if (listener_port == port) {
+ found = 1;
+ break;
+ }
}
+ }
+ pthread_rwlock_unlock(&svc->rpclock);
+
+ if (!found) {
+ listener = NULL;
+ }
out:
- return listener;
+ return listener;
}
-
/* The only difference between the generic submit and this one is that the
* generic submit is also used for submitting RPC error replies in where there
* are no payloads so the msgvec and msgbuf can be NULL.
@@ -1636,819 +1643,824 @@ out:
* we must perform NULL checks before calling the generic submit.
*/
int
-rpcsvc_submit_message (rpcsvc_request_t *req, struct iovec *proghdr,
- int hdrcount, struct iovec *payload, int payloadcount,
- struct iobref *iobref)
+rpcsvc_submit_message(rpcsvc_request_t *req, struct iovec *proghdr,
+ int hdrcount, struct iovec *payload, int payloadcount,
+ struct iobref *iobref)
{
- if ((!req) || (!req->trans) || (!proghdr) || (!proghdr->iov_base))
- return -1;
+ if ((!req) || (!req->trans) || (!proghdr) || (!proghdr->iov_base))
+ return -1;
- return rpcsvc_submit_generic (req, proghdr, hdrcount, payload,
- payloadcount, iobref);
+ return rpcsvc_submit_generic(req, proghdr, hdrcount, payload, payloadcount,
+ iobref);
}
-
int
-rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t *program)
-{
- int ret = -1;
- rpcsvc_program_t *prog = NULL;
- if (!svc || !program) {
- goto out;
- }
-
- ret = rpcsvc_program_unregister_portmap (program);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "portmap unregistration of"
- " program failed");
- goto out;
- }
+rpcsvc_program_unregister(rpcsvc_t *svc, rpcsvc_program_t *program)
+{
+ int ret = -1;
+ rpcsvc_program_t *prog = NULL;
+ if (!svc || !program) {
+ goto out;
+ }
+
+ ret = rpcsvc_program_unregister_portmap(program);
+ if (ret == -1) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR,
+ "portmap unregistration of"
+ " program failed");
+ goto out;
+ }
#ifdef IPV6_DEFAULT
- ret = rpcsvc_program_unregister_rpcbind6 (program);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "rpcbind (ipv6)"
- " unregistration of program failed");
- goto out;
- }
+ ret = rpcsvc_program_unregister_rpcbind6(program);
+ if (ret == -1) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR,
+ "rpcbind (ipv6)"
+ " unregistration of program failed");
+ goto out;
+ }
#endif
- pthread_rwlock_rdlock (&svc->rpclock);
+ pthread_rwlock_rdlock(&svc->rpclock);
+ {
+ list_for_each_entry(prog, &svc->programs, program)
{
- list_for_each_entry (prog, &svc->programs, program) {
- if ((prog->prognum == program->prognum)
- && (prog->progver == program->progver)) {
- break;
- }
- }
+ if ((prog->prognum == program->prognum) &&
+ (prog->progver == program->progver)) {
+ break;
+ }
}
- pthread_rwlock_unlock (&svc->rpclock);
+ }
+ pthread_rwlock_unlock(&svc->rpclock);
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Program unregistered: %s, Num: %d,"
- " Ver: %d, Port: %d", prog->progname, prog->prognum,
- prog->progver, prog->progport);
+ gf_log(GF_RPCSVC, GF_LOG_DEBUG,
+ "Program unregistered: %s, Num: %d,"
+ " Ver: %d, Port: %d",
+ prog->progname, prog->prognum, prog->progver, prog->progport);
- if (prog->ownthread) {
- prog->alive = _gf_false;
- ret = 0;
- goto out;
- }
+ if (prog->ownthread) {
+ prog->alive = _gf_false;
+ ret = 0;
+ goto out;
+ }
- pthread_rwlock_wrlock (&svc->rpclock);
- {
- list_del_init (&prog->program);
- }
- pthread_rwlock_unlock (&svc->rpclock);
+ pthread_rwlock_wrlock(&svc->rpclock);
+ {
+ list_del_init(&prog->program);
+ }
+ pthread_rwlock_unlock(&svc->rpclock);
- ret = 0;
+ ret = 0;
out:
- if (ret == -1) {
- if (program) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Program "
- "unregistration failed"
- ": %s, Num: %d, Ver: %d, Port: %d",
- program->progname, program->prognum,
- program->progver, program->progport);
- } else {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Program not found");
- }
+ if (ret == -1) {
+ if (program) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR,
+ "Program "
+ "unregistration failed"
+ ": %s, Num: %d, Ver: %d, Port: %d",
+ program->progname, program->prognum, program->progver,
+ program->progport);
+ } else {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR, "Program not found");
}
+ }
- return ret;
+ return ret;
}
-
int
-rpcsvc_transport_peername (rpc_transport_t *trans, char *hostname, int hostlen)
+rpcsvc_transport_peername(rpc_transport_t *trans, char *hostname, int hostlen)
{
- if (!trans) {
- return -1;
- }
+ if (!trans) {
+ return -1;
+ }
- return rpc_transport_get_peername (trans, hostname, hostlen);
+ return rpc_transport_get_peername(trans, hostname, hostlen);
}
-
int
-rpcsvc_transport_peeraddr (rpc_transport_t *trans, char *addrstr, int addrlen,
- struct sockaddr_storage *sa, socklen_t sasize)
+rpcsvc_transport_peeraddr(rpc_transport_t *trans, char *addrstr, int addrlen,
+ struct sockaddr_storage *sa, socklen_t sasize)
{
- if (!trans) {
- return -1;
- }
+ if (!trans) {
+ return -1;
+ }
- return rpc_transport_get_peeraddr(trans, addrstr, addrlen, sa,
- sasize);
+ return rpc_transport_get_peeraddr(trans, addrstr, addrlen, sa, sasize);
}
rpcsvc_listener_t *
-rpcsvc_listener_alloc (rpcsvc_t *svc, rpc_transport_t *trans)
+rpcsvc_listener_alloc(rpcsvc_t *svc, rpc_transport_t *trans)
{
- rpcsvc_listener_t *listener = NULL;
+ rpcsvc_listener_t *listener = NULL;
- listener = GF_CALLOC (1, sizeof (*listener),
- gf_common_mt_rpcsvc_listener_t);
- if (!listener) {
- goto out;
- }
+ listener = GF_CALLOC(1, sizeof(*listener), gf_common_mt_rpcsvc_listener_t);
+ if (!listener) {
+ goto out;
+ }
- listener->trans = trans;
- listener->svc = svc;
+ listener->trans = trans;
+ listener->svc = svc;
- INIT_LIST_HEAD (&listener->list);
+ INIT_LIST_HEAD(&listener->list);
- pthread_rwlock_wrlock (&svc->rpclock);
- {
- list_add_tail (&listener->list, &svc->listeners);
- }
- pthread_rwlock_unlock (&svc->rpclock);
+ pthread_rwlock_wrlock(&svc->rpclock);
+ {
+ list_add_tail(&listener->list, &svc->listeners);
+ }
+ pthread_rwlock_unlock(&svc->rpclock);
out:
- return listener;
+ return listener;
}
-
int32_t
-rpcsvc_create_listener (rpcsvc_t *svc, dict_t *options, char *name)
-{
- rpc_transport_t *trans = NULL;
- rpcsvc_listener_t *listener = NULL;
- int32_t ret = -1;
-
- if (!svc || !options) {
- goto out;
- }
-
- trans = rpc_transport_load (svc->ctx, options, name);
- if (!trans) {
- gf_log (GF_RPCSVC, GF_LOG_WARNING, "cannot create listener, "
- "initing the transport failed");
- goto out;
- }
-
- ret = rpc_transport_listen (trans);
- if (ret == -EADDRINUSE || ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_WARNING,
- "listening on transport failed");
- goto out;
- }
-
- ret = rpc_transport_register_notify (trans, rpcsvc_notify, svc);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_WARNING, "registering notify failed");
- goto out;
- }
-
- listener = rpcsvc_listener_alloc (svc, trans);
- if (listener == NULL) {
- goto out;
- }
-
- ret = 0;
+rpcsvc_create_listener(rpcsvc_t *svc, dict_t *options, char *name)
+{
+ rpc_transport_t *trans = NULL;
+ rpcsvc_listener_t *listener = NULL;
+ int32_t ret = -1;
+
+ if (!svc || !options) {
+ goto out;
+ }
+
+ trans = rpc_transport_load(svc->ctx, options, name);
+ if (!trans) {
+ gf_log(GF_RPCSVC, GF_LOG_WARNING,
+ "cannot create listener, "
+ "initing the transport failed");
+ goto out;
+ }
+
+ ret = rpc_transport_listen(trans);
+ if (ret == -EADDRINUSE || ret == -1) {
+ gf_log(GF_RPCSVC, GF_LOG_WARNING, "listening on transport failed");
+ goto out;
+ }
+
+ ret = rpc_transport_register_notify(trans, rpcsvc_notify, svc);
+ if (ret == -1) {
+ gf_log(GF_RPCSVC, GF_LOG_WARNING, "registering notify failed");
+ goto out;
+ }
+
+ listener = rpcsvc_listener_alloc(svc, trans);
+ if (listener == NULL) {
+ goto out;
+ }
+
+ ret = 0;
out:
- if (!listener && trans) {
- rpc_transport_disconnect (trans, _gf_true);
- }
+ if (!listener && trans) {
+ rpc_transport_disconnect(trans, _gf_true);
+ }
- return ret;
+ return ret;
}
-
int32_t
-rpcsvc_create_listeners (rpcsvc_t *svc, dict_t *options, char *name)
+rpcsvc_create_listeners(rpcsvc_t *svc, dict_t *options, char *name)
{
- int32_t ret = -1, count = 0;
- data_t *data = NULL;
- char *str = NULL, *ptr = NULL, *transport_name = NULL;
- char *transport_type = NULL, *saveptr = NULL, *tmp = NULL;
+ int32_t ret = -1, count = 0;
+ data_t *data = NULL;
+ char *str = NULL, *ptr = NULL, *transport_name = NULL;
+ char *transport_type = NULL, *saveptr = NULL, *tmp = NULL;
- if ((svc == NULL) || (options == NULL) || (name == NULL)) {
- goto out;
- }
+ if ((svc == NULL) || (options == NULL) || (name == NULL)) {
+ goto out;
+ }
- data = dict_get (options, "transport-type");
- if (data == NULL) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR,
- "option transport-type not set");
- goto out;
- }
+ data = dict_get(options, "transport-type");
+ if (data == NULL) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR, "option transport-type not set");
+ goto out;
+ }
- transport_type = data_to_str (data);
- if (transport_type == NULL) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR,
- "option transport-type not set");
- goto out;
- }
+ transport_type = data_to_str(data);
+ if (transport_type == NULL) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR, "option transport-type not set");
+ goto out;
+ }
- /* duplicate transport_type, since following dict_set will free it */
- transport_type = gf_strdup (transport_type);
- if (transport_type == NULL) {
- goto out;
- }
+ /* duplicate transport_type, since following dict_set will free it */
+ transport_type = gf_strdup(transport_type);
+ if (transport_type == NULL) {
+ goto out;
+ }
- str = gf_strdup (transport_type);
- if (str == NULL) {
- goto out;
- }
+ str = gf_strdup(transport_type);
+ if (str == NULL) {
+ goto out;
+ }
- ptr = strtok_r (str, ",", &saveptr);
+ ptr = strtok_r(str, ",", &saveptr);
- while (ptr != NULL) {
- tmp = gf_strdup (ptr);
- if (tmp == NULL) {
- goto out;
- }
-
- ret = gf_asprintf (&transport_name, "%s.%s", tmp, name);
- if (ret == -1) {
- goto out;
- }
+ while (ptr != NULL) {
+ tmp = gf_strdup(ptr);
+ if (tmp == NULL) {
+ goto out;
+ }
- ret = dict_set_dynstr (options, "transport-type", tmp);
- if (ret == -1) {
- goto out;
- }
+ ret = gf_asprintf(&transport_name, "%s.%s", tmp, name);
+ if (ret == -1) {
+ goto out;
+ }
- tmp = NULL;
- ptr = strtok_r (NULL, ",", &saveptr);
+ ret = dict_set_dynstr(options, "transport-type", tmp);
+ if (ret == -1) {
+ goto out;
+ }
- ret = rpcsvc_create_listener (svc, options, transport_name);
- if (ret != 0) {
- goto out;
- }
+ tmp = NULL;
+ ptr = strtok_r(NULL, ",", &saveptr);
- GF_FREE (transport_name);
- transport_name = NULL;
- count++;
+ ret = rpcsvc_create_listener(svc, options, transport_name);
+ if (ret != 0) {
+ goto out;
}
- ret = dict_set_dynstr (options, "transport-type", transport_type);
- if (ret == -1) {
- goto out;
- }
+ GF_FREE(transport_name);
+ transport_name = NULL;
+ count++;
+ }
+
+ ret = dict_set_dynstr(options, "transport-type", transport_type);
+ if (ret == -1) {
+ goto out;
+ }
- transport_type = NULL;
+ transport_type = NULL;
out:
- GF_FREE (str);
+ GF_FREE(str);
- GF_FREE (transport_type);
+ GF_FREE(transport_type);
- GF_FREE (tmp);
+ GF_FREE(tmp);
- GF_FREE (transport_name);
+ GF_FREE(transport_name);
- if (count > 0) {
- return count;
- } else {
- return ret;
- }
+ if (count > 0) {
+ return count;
+ } else {
+ return ret;
+ }
}
-
int
-rpcsvc_unregister_notify (rpcsvc_t *svc, rpcsvc_notify_t notify, void *mydata)
+rpcsvc_unregister_notify(rpcsvc_t *svc, rpcsvc_notify_t notify, void *mydata)
{
- rpcsvc_notify_wrapper_t *wrapper = NULL, *tmp = NULL;
- int ret = 0;
+ rpcsvc_notify_wrapper_t *wrapper = NULL, *tmp = NULL;
+ int ret = 0;
- if (!svc || !notify) {
- goto out;
- }
+ if (!svc || !notify) {
+ goto out;
+ }
- pthread_rwlock_wrlock (&svc->rpclock);
+ pthread_rwlock_wrlock(&svc->rpclock);
+ {
+ list_for_each_entry_safe(wrapper, tmp, &svc->notify, list)
{
- list_for_each_entry_safe (wrapper, tmp, &svc->notify, list) {
- if ((wrapper->notify == notify)
- && (mydata == wrapper->data)) {
- list_del_init (&wrapper->list);
- GF_FREE (wrapper);
- ret++;
- }
- }
+ if ((wrapper->notify == notify) && (mydata == wrapper->data)) {
+ list_del_init(&wrapper->list);
+ GF_FREE(wrapper);
+ ret++;
+ }
}
- pthread_rwlock_unlock (&svc->rpclock);
+ }
+ pthread_rwlock_unlock(&svc->rpclock);
out:
- return ret;
+ return ret;
}
int
-rpcsvc_register_notify (rpcsvc_t *svc, rpcsvc_notify_t notify, void *mydata)
+rpcsvc_register_notify(rpcsvc_t *svc, rpcsvc_notify_t notify, void *mydata)
+{
+ rpcsvc_notify_wrapper_t *wrapper = NULL;
+ int ret = -1;
+
+ wrapper = rpcsvc_notify_wrapper_alloc();
+ if (!wrapper) {
+ goto out;
+ }
+ svc->mydata = mydata;
+ wrapper->data = mydata;
+ wrapper->notify = notify;
+
+ pthread_rwlock_wrlock(&svc->rpclock);
+ {
+ list_add_tail(&wrapper->list, &svc->notify);
+ svc->notify_count++;
+ }
+ pthread_rwlock_unlock(&svc->rpclock);
+
+ ret = 0;
+out:
+ return ret;
+}
+
+void *
+rpcsvc_request_handler(void *arg)
{
- rpcsvc_notify_wrapper_t *wrapper = NULL;
- int ret = -1;
+ rpcsvc_program_t *program = arg;
+ rpcsvc_request_t *req = NULL;
+ rpcsvc_actor_t *actor = NULL;
+ gf_boolean_t done = _gf_false;
+ int ret = 0;
- wrapper = rpcsvc_notify_wrapper_alloc ();
- if (!wrapper) {
- goto out;
- }
- svc->mydata = mydata;
- wrapper->data = mydata;
- wrapper->notify = notify;
+ if (!program)
+ return NULL;
- pthread_rwlock_wrlock (&svc->rpclock);
+ while (1) {
+ pthread_mutex_lock(&program->queue_lock);
{
- list_add_tail (&wrapper->list, &svc->notify);
- svc->notify_count++;
- }
- pthread_rwlock_unlock (&svc->rpclock);
+ if (!program->alive && list_empty(&program->request_queue)) {
+ done = 1;
+ goto unlock;
+ }
- ret = 0;
-out:
- return ret;
-}
+ while (list_empty(&program->request_queue) &&
+ (program->threadcount <= program->eventthreadcount)) {
+ pthread_cond_wait(&program->queue_cond, &program->queue_lock);
+ }
-void *
-rpcsvc_request_handler (void *arg)
-{
- rpcsvc_program_t *program = arg;
- rpcsvc_request_t *req = NULL;
- rpcsvc_actor_t *actor = NULL;
- gf_boolean_t done = _gf_false;
- int ret = 0;
-
- if (!program)
- return NULL;
-
- while (1) {
- pthread_mutex_lock (&program->queue_lock);
- {
- if (!program->alive
- && list_empty (&program->request_queue)) {
- done = 1;
- goto unlock;
- }
-
- while (list_empty (&program->request_queue) &&
- (program->threadcount <=
- program->eventthreadcount)) {
- pthread_cond_wait (&program->queue_cond,
- &program->queue_lock);
- }
-
- if (program->threadcount > program->eventthreadcount) {
- done = 1;
- program->threadcount--;
-
- gf_log (GF_RPCSVC, GF_LOG_INFO,
- "program '%s' thread terminated; "
- "total count:%d",
- program->progname,
- program->threadcount);
- } else if (!list_empty (&program->request_queue)) {
- req = list_entry (program->request_queue.next,
- typeof (*req), request_list);
-
- list_del_init (&req->request_list);
- }
- }
- unlock:
- pthread_mutex_unlock (&program->queue_lock);
-
- if (req) {
- THIS = req->svc->xl;
- actor = rpcsvc_program_actor (req);
- ret = actor->actor (req);
-
- if (ret != 0) {
- rpcsvc_check_and_reply_error (ret, NULL, req);
- }
- req = NULL;
- }
+ if (program->threadcount > program->eventthreadcount) {
+ done = 1;
+ program->threadcount--;
+
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "program '%s' thread terminated; "
+ "total count:%d",
+ program->progname, program->threadcount);
+ } else if (!list_empty(&program->request_queue)) {
+ req = list_entry(program->request_queue.next, typeof(*req),
+ request_list);
- if (done)
- break;
+ list_del_init(&req->request_list);
+ }
}
+ unlock:
+ pthread_mutex_unlock(&program->queue_lock);
- return NULL;
+ if (req) {
+ THIS = req->svc->xl;
+ actor = rpcsvc_program_actor(req);
+ ret = actor->actor(req);
+
+ if (ret != 0) {
+ rpcsvc_check_and_reply_error(ret, NULL, req);
+ }
+ req = NULL;
+ }
+
+ if (done)
+ break;
+ }
+
+ return NULL;
}
int
-rpcsvc_spawn_threads (rpcsvc_t *svc, rpcsvc_program_t *program)
+rpcsvc_spawn_threads(rpcsvc_t *svc, rpcsvc_program_t *program)
{
- int ret = 0, delta = 0, creates = 0;
+ int ret = 0, delta = 0, creates = 0;
- if (!program || !svc)
- goto out;
+ if (!program || !svc)
+ goto out;
- pthread_mutex_lock (&program->queue_lock);
- {
- delta = program->eventthreadcount - program->threadcount;
-
- if (delta >= 0) {
- while (delta--) {
- ret = gf_thread_create (&program->thread, NULL,
- rpcsvc_request_handler,
- program, "rpcrqhnd");
- if (!ret) {
- program->threadcount++;
- creates++;
- }
- }
-
- if (creates) {
- gf_log (GF_RPCSVC, GF_LOG_INFO,
- "spawned %d threads for program '%s'; "
- "total count:%d",
- creates,
- program->progname,
- program->threadcount);
- }
- } else {
- gf_log (GF_RPCSVC, GF_LOG_INFO,
- "terminating %d threads for program '%s'",
- -delta, program->progname);
-
- /* this signal is to just wake up the threads so they
- * test for the change in eventthreadcount and kill
- * themselves until the program thread count becomes
- * equal to the event thread count
- */
- pthread_cond_broadcast (&program->queue_cond);
+ pthread_mutex_lock(&program->queue_lock);
+ {
+ delta = program->eventthreadcount - program->threadcount;
+
+ if (delta >= 0) {
+ while (delta--) {
+ ret = gf_thread_create(&program->thread, NULL,
+ rpcsvc_request_handler, program,
+ "rpcrqhnd");
+ if (!ret) {
+ program->threadcount++;
+ creates++;
}
+ }
+
+ if (creates) {
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "spawned %d threads for program '%s'; "
+ "total count:%d",
+ creates, program->progname, program->threadcount);
+ }
+ } else {
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "terminating %d threads for program '%s'", -delta,
+ program->progname);
+
+ /* this signal is to just wake up the threads so they
+ * test for the change in eventthreadcount and kill
+ * themselves until the program thread count becomes
+ * equal to the event thread count
+ */
+ pthread_cond_broadcast(&program->queue_cond);
}
- pthread_mutex_unlock (&program->queue_lock);
+ }
+ pthread_mutex_unlock(&program->queue_lock);
out:
- return creates;
+ return creates;
}
int
-rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program,
- gf_boolean_t add_to_head)
+rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program,
+ gf_boolean_t add_to_head)
{
- int ret = -1;
- int creates = -1;
- rpcsvc_program_t *newprog = NULL;
- char already_registered = 0;
+ int ret = -1;
+ int creates = -1;
+ rpcsvc_program_t *newprog = NULL;
+ char already_registered = 0;
- if (!svc) {
- goto out;
- }
+ if (!svc) {
+ goto out;
+ }
- if (program->actors == NULL) {
- goto out;
- }
+ if (program->actors == NULL) {
+ goto out;
+ }
- pthread_rwlock_rdlock (&svc->rpclock);
+ pthread_rwlock_rdlock(&svc->rpclock);
+ {
+ list_for_each_entry(newprog, &svc->programs, program)
{
- list_for_each_entry (newprog, &svc->programs, program) {
- if ((newprog->prognum == program->prognum)
- && (newprog->progver == program->progver)) {
- already_registered = 1;
- break;
- }
- }
+ if ((newprog->prognum == program->prognum) &&
+ (newprog->progver == program->progver)) {
+ already_registered = 1;
+ break;
+ }
}
- pthread_rwlock_unlock (&svc->rpclock);
+ }
+ pthread_rwlock_unlock(&svc->rpclock);
- if (already_registered) {
- ret = 0;
- goto out;
- }
+ if (already_registered) {
+ ret = 0;
+ goto out;
+ }
- newprog = GF_CALLOC (1, sizeof(*newprog),gf_common_mt_rpcsvc_program_t);
- if (newprog == NULL) {
- goto out;
- }
+ newprog = GF_CALLOC(1, sizeof(*newprog), gf_common_mt_rpcsvc_program_t);
+ if (newprog == NULL) {
+ goto out;
+ }
- memcpy (newprog, program, sizeof (*program));
+ memcpy(newprog, program, sizeof(*program));
- INIT_LIST_HEAD (&newprog->program);
- INIT_LIST_HEAD (&newprog->request_queue);
- pthread_mutex_init (&newprog->queue_lock, NULL);
- pthread_cond_init (&newprog->queue_cond, NULL);
+ INIT_LIST_HEAD(&newprog->program);
+ INIT_LIST_HEAD(&newprog->request_queue);
+ pthread_mutex_init(&newprog->queue_lock, NULL);
+ pthread_cond_init(&newprog->queue_cond, NULL);
- newprog->alive = _gf_true;
+ newprog->alive = _gf_true;
- /* make sure synctask gets priority over ownthread */
- if (newprog->synctask)
- newprog->ownthread = _gf_false;
+ /* make sure synctask gets priority over ownthread */
+ if (newprog->synctask)
+ newprog->ownthread = _gf_false;
- if (newprog->ownthread) {
- newprog->eventthreadcount = 1;
- creates = rpcsvc_spawn_threads (svc, newprog);
+ if (newprog->ownthread) {
+ newprog->eventthreadcount = 1;
+ creates = rpcsvc_spawn_threads(svc, newprog);
- if (creates < 1) {
- goto out;
- }
+ if (creates < 1) {
+ goto out;
}
+ }
- pthread_rwlock_wrlock (&svc->rpclock);
- {
- if (add_to_head)
- list_add (&newprog->program, &svc->programs);
- else
- list_add_tail (&newprog->program, &svc->programs);
- }
- pthread_rwlock_unlock (&svc->rpclock);
+ pthread_rwlock_wrlock(&svc->rpclock);
+ {
+ if (add_to_head)
+ list_add(&newprog->program, &svc->programs);
+ else
+ list_add_tail(&newprog->program, &svc->programs);
+ }
+ pthread_rwlock_unlock(&svc->rpclock);
- ret = 0;
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "New program registered: %s, Num: %d,"
- " Ver: %d, Port: %d", newprog->progname, newprog->prognum,
- newprog->progver, newprog->progport);
+ ret = 0;
+ gf_log(GF_RPCSVC, GF_LOG_DEBUG,
+ "New program registered: %s, Num: %d,"
+ " Ver: %d, Port: %d",
+ newprog->progname, newprog->prognum, newprog->progver,
+ newprog->progport);
out:
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Program registration failed:"
- " %s, Num: %d, Ver: %d, Port: %d", program->progname,
- program->prognum, program->progver, program->progport);
- }
+ if (ret == -1) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR,
+ "Program registration failed:"
+ " %s, Num: %d, Ver: %d, Port: %d",
+ program->progname, program->prognum, program->progver,
+ program->progport);
+ }
- return ret;
+ return ret;
}
static void
-free_prog_details (gf_dump_rsp *rsp)
+free_prog_details(gf_dump_rsp *rsp)
{
- gf_prog_detail *prev = NULL;
- gf_prog_detail *trav = NULL;
+ gf_prog_detail *prev = NULL;
+ gf_prog_detail *trav = NULL;
- trav = rsp->prog;
- while (trav) {
- prev = trav;
- trav = trav->next;
- GF_FREE (prev);
- }
+ trav = rsp->prog;
+ while (trav) {
+ prev = trav;
+ trav = trav->next;
+ GF_FREE(prev);
+ }
}
static int
-build_prog_details (rpcsvc_request_t *req, gf_dump_rsp *rsp)
+build_prog_details(rpcsvc_request_t *req, gf_dump_rsp *rsp)
{
- int ret = -1;
- rpcsvc_program_t *program = NULL;
- gf_prog_detail *prog = NULL;
- gf_prog_detail *prev = NULL;
+ int ret = -1;
+ rpcsvc_program_t *program = NULL;
+ gf_prog_detail *prog = NULL;
+ gf_prog_detail *prev = NULL;
- if (!req || !req->trans || !req->svc)
- goto out;
+ if (!req || !req->trans || !req->svc)
+ goto out;
- pthread_rwlock_rdlock (&req->svc->rpclock);
+ pthread_rwlock_rdlock(&req->svc->rpclock);
+ {
+ list_for_each_entry(program, &req->svc->programs, program)
{
- list_for_each_entry (program, &req->svc->programs, program) {
- prog = GF_CALLOC (1, sizeof (*prog), 0);
- if (!prog)
- goto unlock;
-
- prog->progname = program->progname;
- prog->prognum = program->prognum;
- prog->progver = program->progver;
-
- if (!rsp->prog)
- rsp->prog = prog;
- if (prev)
- prev->next = prog;
- prev = prog;
- }
- if (prev)
- ret = 0;
- }
+ prog = GF_CALLOC(1, sizeof(*prog), 0);
+ if (!prog)
+ goto unlock;
+
+ prog->progname = program->progname;
+ prog->prognum = program->prognum;
+ prog->progver = program->progver;
+
+ if (!rsp->prog)
+ rsp->prog = prog;
+ if (prev)
+ prev->next = prog;
+ prev = prog;
+ }
+ if (prev)
+ ret = 0;
+ }
unlock:
- pthread_rwlock_unlock (&req->svc->rpclock);
+ pthread_rwlock_unlock(&req->svc->rpclock);
out:
- return ret;
+ return ret;
}
static int
-rpcsvc_ping (rpcsvc_request_t *req)
+rpcsvc_ping(rpcsvc_request_t *req)
{
- char rsp_buf[8 * 1024] = {0,};
- gf_common_rsp rsp = {0,};
- struct iovec iov = {0,};
- int ret = -1;
- uint32_t ping_rsp_len = 0;
+ char rsp_buf[8 * 1024] = {
+ 0,
+ };
+ gf_common_rsp rsp = {
+ 0,
+ };
+ struct iovec iov = {
+ 0,
+ };
+ int ret = -1;
+ uint32_t ping_rsp_len = 0;
- ping_rsp_len = xdr_sizeof ((xdrproc_t) xdr_gf_common_rsp,
- &rsp);
+ ping_rsp_len = xdr_sizeof((xdrproc_t)xdr_gf_common_rsp, &rsp);
- iov.iov_base = rsp_buf;
- iov.iov_len = ping_rsp_len;
+ iov.iov_base = rsp_buf;
+ iov.iov_len = ping_rsp_len;
- ret = xdr_serialize_generic (iov, &rsp, (xdrproc_t)xdr_gf_common_rsp);
- if (ret < 0) {
- ret = RPCSVC_ACTOR_ERROR;
- } else {
- rsp.op_ret = 0;
- rpcsvc_submit_generic (req, &iov, 1, NULL, 0, NULL);
- }
+ ret = xdr_serialize_generic(iov, &rsp, (xdrproc_t)xdr_gf_common_rsp);
+ if (ret < 0) {
+ ret = RPCSVC_ACTOR_ERROR;
+ } else {
+ rsp.op_ret = 0;
+ rpcsvc_submit_generic(req, &iov, 1, NULL, 0, NULL);
+ }
- return 0;
+ return 0;
}
static int
-rpcsvc_dump (rpcsvc_request_t *req)
-{
- char rsp_buf[8 * 1024] = {0,};
- gf_dump_rsp rsp = {0,};
- struct iovec iov = {0,};
- int op_errno = EINVAL;
- int ret = -1;
- uint32_t dump_rsp_len = 0;
-
- if (!req)
- goto sendrsp;
-
- ret = build_prog_details (req, &rsp);
- if (ret < 0) {
- op_errno = -ret;
- goto sendrsp;
- }
-
- op_errno = 0;
+rpcsvc_dump(rpcsvc_request_t *req)
+{
+ char rsp_buf[8 * 1024] = {
+ 0,
+ };
+ gf_dump_rsp rsp = {
+ 0,
+ };
+ struct iovec iov = {
+ 0,
+ };
+ int op_errno = EINVAL;
+ int ret = -1;
+ uint32_t dump_rsp_len = 0;
+
+ if (!req)
+ goto sendrsp;
+
+ ret = build_prog_details(req, &rsp);
+ if (ret < 0) {
+ op_errno = -ret;
+ goto sendrsp;
+ }
+
+ op_errno = 0;
sendrsp:
- rsp.op_errno = gf_errno_to_error (op_errno);
- rsp.op_ret = ret;
+ rsp.op_errno = gf_errno_to_error(op_errno);
+ rsp.op_ret = ret;
- dump_rsp_len = xdr_sizeof ((xdrproc_t) xdr_gf_dump_rsp,
- &rsp);
+ dump_rsp_len = xdr_sizeof((xdrproc_t)xdr_gf_dump_rsp, &rsp);
- iov.iov_base = rsp_buf;
- iov.iov_len = dump_rsp_len;
+ iov.iov_base = rsp_buf;
+ iov.iov_len = dump_rsp_len;
- ret = xdr_serialize_generic (iov, &rsp, (xdrproc_t)xdr_gf_dump_rsp);
- if (ret < 0) {
- ret = RPCSVC_ACTOR_ERROR;
- } else {
- rpcsvc_submit_generic (req, &iov, 1, NULL, 0, NULL);
- ret = 0;
- }
+ ret = xdr_serialize_generic(iov, &rsp, (xdrproc_t)xdr_gf_dump_rsp);
+ if (ret < 0) {
+ ret = RPCSVC_ACTOR_ERROR;
+ } else {
+ rpcsvc_submit_generic(req, &iov, 1, NULL, 0, NULL);
+ ret = 0;
+ }
- free_prog_details (&rsp);
+ free_prog_details(&rsp);
- return ret;
+ return ret;
}
int
-rpcsvc_init_options (rpcsvc_t *svc, dict_t *options)
+rpcsvc_init_options(rpcsvc_t *svc, dict_t *options)
{
- char *optstr = NULL;
- int ret = -1;
+ char *optstr = NULL;
+ int ret = -1;
- if ((!svc) || (!options))
- return -1;
+ if ((!svc) || (!options))
+ return -1;
- svc->memfactor = RPCSVC_DEFAULT_MEMFACTOR;
+ svc->memfactor = RPCSVC_DEFAULT_MEMFACTOR;
- svc->register_portmap = _gf_true;
- if (dict_get (options, "rpc.register-with-portmap")) {
- ret = dict_get_str (options, "rpc.register-with-portmap",
- &optstr);
- if (ret < 0) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to parse "
- "dict");
- goto out;
- }
-
- ret = gf_string2boolean (optstr, &svc->register_portmap);
- if (ret < 0) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to parse bool "
- "string");
- goto out;
- }
+ svc->register_portmap = _gf_true;
+ if (dict_get(options, "rpc.register-with-portmap")) {
+ ret = dict_get_str(options, "rpc.register-with-portmap", &optstr);
+ if (ret < 0) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR,
+ "Failed to parse "
+ "dict");
+ goto out;
}
- if (!svc->register_portmap)
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Portmap registration "
- "disabled");
- ret = 0;
+ ret = gf_string2boolean(optstr, &svc->register_portmap);
+ if (ret < 0) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR,
+ "Failed to parse bool "
+ "string");
+ goto out;
+ }
+ }
+
+ if (!svc->register_portmap)
+ gf_log(GF_RPCSVC, GF_LOG_DEBUG,
+ "Portmap registration "
+ "disabled");
+ ret = 0;
out:
- return ret;
+ return ret;
}
int
-rpcsvc_reconfigure_options (rpcsvc_t *svc, dict_t *options)
-{
- xlator_t *xlator = NULL;
- xlator_list_t *volentry = NULL;
- char *srchkey = NULL;
- char *keyval = NULL;
- int ret = -1;
-
- if ((!svc) || (!svc->options) || (!options))
- return (-1);
-
- /* Fetch the xlator from svc */
- xlator = svc->xl;
- if (!xlator)
+rpcsvc_reconfigure_options(rpcsvc_t *svc, dict_t *options)
+{
+ xlator_t *xlator = NULL;
+ xlator_list_t *volentry = NULL;
+ char *srchkey = NULL;
+ char *keyval = NULL;
+ int ret = -1;
+
+ if ((!svc) || (!svc->options) || (!options))
+ return (-1);
+
+ /* Fetch the xlator from svc */
+ xlator = svc->xl;
+ if (!xlator)
+ return (-1);
+
+ /* Reconfigure the volume specific rpc-auth.addr allow part */
+ volentry = xlator->children;
+ while (volentry) {
+ ret = gf_asprintf(&srchkey, "rpc-auth.addr.%s.allow",
+ volentry->xlator->name);
+ if (ret == -1) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ return (-1);
+ }
+
+ /* key-string: rpc-auth.addr.<volname>.allow
+ *
+ * IMP: Delete the OLD key/value pair from dict.
+ * And set the NEW key/value pair IFF the option is SET
+ * in reconfigured volfile.
+ *
+ * NB: If rpc-auth.addr.<volname>.allow is not SET explicitly,
+ * build_nfs_graph() sets it as "*" i.e. anonymous.
+ */
+ dict_del(svc->options, srchkey);
+ if (!dict_get_str(options, srchkey, &keyval)) {
+ ret = dict_set_str(svc->options, srchkey, keyval);
+ if (ret < 0) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR, "dict_set_str error");
+ GF_FREE(srchkey);
return (-1);
-
- /* Reconfigure the volume specific rpc-auth.addr allow part */
- volentry = xlator->children;
- while (volentry) {
- ret = gf_asprintf (&srchkey, "rpc-auth.addr.%s.allow",
- volentry->xlator->name);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
- return (-1);
- }
-
- /* key-string: rpc-auth.addr.<volname>.allow
- *
- * IMP: Delete the OLD key/value pair from dict.
- * And set the NEW key/value pair IFF the option is SET
- * in reconfigured volfile.
- *
- * NB: If rpc-auth.addr.<volname>.allow is not SET explicitly,
- * build_nfs_graph() sets it as "*" i.e. anonymous.
- */
- dict_del (svc->options, srchkey);
- if (!dict_get_str (options, srchkey, &keyval)) {
- ret = dict_set_str (svc->options, srchkey, keyval);
- if (ret < 0) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR,
- "dict_set_str error");
- GF_FREE (srchkey);
- return (-1);
- }
- }
-
- GF_FREE (srchkey);
- volentry = volentry->next;
+ }
}
- /* Reconfigure the volume specific rpc-auth.addr reject part */
- volentry = xlator->children;
- while (volentry) {
- ret = gf_asprintf (&srchkey, "rpc-auth.addr.%s.reject",
- volentry->xlator->name);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
- return (-1);
- }
-
- /* key-string: rpc-auth.addr.<volname>.reject
- *
- * IMP: Delete the OLD key/value pair from dict.
- * And set the NEW key/value pair IFF the option is SET
- * in reconfigured volfile.
- *
- * NB: No default value for reject key.
- */
- dict_del (svc->options, srchkey);
- if (!dict_get_str (options, srchkey, &keyval)) {
- ret = dict_set_str (svc->options, srchkey, keyval);
- if (ret < 0) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR,
- "dict_set_str error");
- GF_FREE (srchkey);
- return (-1);
- }
- }
+ GF_FREE(srchkey);
+ volentry = volentry->next;
+ }
- GF_FREE (srchkey);
- volentry = volentry->next;
+ /* Reconfigure the volume specific rpc-auth.addr reject part */
+ volentry = xlator->children;
+ while (volentry) {
+ ret = gf_asprintf(&srchkey, "rpc-auth.addr.%s.reject",
+ volentry->xlator->name);
+ if (ret == -1) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ return (-1);
}
- ret = rpcsvc_init_options (svc, options);
- if (ret)
+ /* key-string: rpc-auth.addr.<volname>.reject
+ *
+ * IMP: Delete the OLD key/value pair from dict.
+ * And set the NEW key/value pair IFF the option is SET
+ * in reconfigured volfile.
+ *
+ * NB: No default value for reject key.
+ */
+ dict_del(svc->options, srchkey);
+ if (!dict_get_str(options, srchkey, &keyval)) {
+ ret = dict_set_str(svc->options, srchkey, keyval);
+ if (ret < 0) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR, "dict_set_str error");
+ GF_FREE(srchkey);
return (-1);
+ }
+ }
- return rpcsvc_auth_reconf (svc, options);
+ GF_FREE(srchkey);
+ volentry = volentry->next;
+ }
+
+ ret = rpcsvc_init_options(svc, options);
+ if (ret)
+ return (-1);
+
+ return rpcsvc_auth_reconf(svc, options);
}
int
-rpcsvc_transport_unix_options_build (dict_t **options, char *filepath)
+rpcsvc_transport_unix_options_build(dict_t **options, char *filepath)
{
- dict_t *dict = NULL;
- char *fpath = NULL;
- int ret = -1;
+ dict_t *dict = NULL;
+ char *fpath = NULL;
+ int ret = -1;
- GF_ASSERT (filepath);
- GF_ASSERT (options);
+ GF_ASSERT(filepath);
+ GF_ASSERT(options);
- dict = dict_new ();
- if (!dict)
- goto out;
+ dict = dict_new();
+ if (!dict)
+ goto out;
- fpath = gf_strdup (filepath);
- if (!fpath) {
- ret = -1;
- goto out;
- }
+ fpath = gf_strdup(filepath);
+ if (!fpath) {
+ ret = -1;
+ goto out;
+ }
- ret = dict_set_dynstr (dict, "transport.socket.listen-path", fpath);
- if (ret)
- goto out;
+ ret = dict_set_dynstr(dict, "transport.socket.listen-path", fpath);
+ if (ret)
+ goto out;
- ret = dict_set_str (dict, "transport.address-family", "unix");
- if (ret)
- goto out;
+ ret = dict_set_str(dict, "transport.address-family", "unix");
+ if (ret)
+ goto out;
- ret = dict_set_str (dict, "transport.socket.nodelay", "off");
- if (ret)
- goto out;
+ ret = dict_set_str(dict, "transport.socket.nodelay", "off");
+ if (ret)
+ goto out;
- ret = dict_set_str (dict, "transport-type", "socket");
- if (ret)
- goto out;
+ ret = dict_set_str(dict, "transport-type", "socket");
+ if (ret)
+ goto out;
- *options = dict;
+ *options = dict;
out:
- if (ret) {
- GF_FREE (fpath);
- if (dict)
- dict_unref (dict);
- }
- return ret;
+ if (ret) {
+ GF_FREE(fpath);
+ if (dict)
+ dict_unref(dict);
+ }
+ return ret;
}
/*
@@ -2460,42 +2472,42 @@ out:
* NB: defval or set-value "0" is special which means unlimited/65536.
*/
int
-rpcsvc_set_outstanding_rpc_limit (rpcsvc_t *svc, dict_t *options, int defvalue)
+rpcsvc_set_outstanding_rpc_limit(rpcsvc_t *svc, dict_t *options, int defvalue)
{
- int ret = -1; /* FAILURE */
- int rpclim = 0;
- static char *rpclimkey = "rpc.outstanding-rpc-limit";
+ int ret = -1; /* FAILURE */
+ int rpclim = 0;
+ static char *rpclimkey = "rpc.outstanding-rpc-limit";
- if ((!svc) || (!options))
- return (-1);
+ if ((!svc) || (!options))
+ return (-1);
- if ((defvalue < RPCSVC_MIN_OUTSTANDING_RPC_LIMIT) ||
- (defvalue > RPCSVC_MAX_OUTSTANDING_RPC_LIMIT)) {
- return (-1);
- }
+ if ((defvalue < RPCSVC_MIN_OUTSTANDING_RPC_LIMIT) ||
+ (defvalue > RPCSVC_MAX_OUTSTANDING_RPC_LIMIT)) {
+ return (-1);
+ }
- /* Fetch the rpc.outstanding-rpc-limit from dict. */
- ret = dict_get_int32 (options, rpclimkey, &rpclim);
- if (ret < 0) {
- /* Fall back to default for FAILURE */
- rpclim = defvalue;
- }
+ /* Fetch the rpc.outstanding-rpc-limit from dict. */
+ ret = dict_get_int32(options, rpclimkey, &rpclim);
+ if (ret < 0) {
+ /* Fall back to default for FAILURE */
+ rpclim = defvalue;
+ }
- /* Round up to multiple-of-8. It must not exceed
- * RPCSVC_MAX_OUTSTANDING_RPC_LIMIT.
- */
- rpclim = ((rpclim + 8 - 1) >> 3) * 8;
- if (rpclim > RPCSVC_MAX_OUTSTANDING_RPC_LIMIT) {
- rpclim = RPCSVC_MAX_OUTSTANDING_RPC_LIMIT;
- }
+ /* Round up to multiple-of-8. It must not exceed
+ * RPCSVC_MAX_OUTSTANDING_RPC_LIMIT.
+ */
+ rpclim = ((rpclim + 8 - 1) >> 3) * 8;
+ if (rpclim > RPCSVC_MAX_OUTSTANDING_RPC_LIMIT) {
+ rpclim = RPCSVC_MAX_OUTSTANDING_RPC_LIMIT;
+ }
- if (svc->outstanding_rpc_limit != rpclim) {
- svc->outstanding_rpc_limit = rpclim;
- gf_log (GF_RPCSVC, GF_LOG_INFO,
- "Configured %s with value %d", rpclimkey, rpclim);
- }
+ if (svc->outstanding_rpc_limit != rpclim) {
+ svc->outstanding_rpc_limit = rpclim;
+ gf_log(GF_RPCSVC, GF_LOG_INFO, "Configured %s with value %d", rpclimkey,
+ rpclim);
+ }
- return (0);
+ return (0);
}
/*
@@ -2503,15 +2515,14 @@ rpcsvc_set_outstanding_rpc_limit (rpcsvc_t *svc, dict_t *options, int defvalue)
* Returns 0 on success, -1 otherwise.
*/
int
-rpcsvc_set_throttle_on (rpcsvc_t *svc)
+rpcsvc_set_throttle_on(rpcsvc_t *svc)
{
+ if (!svc)
+ return -1;
- if (!svc)
- return -1;
-
- svc->throttle = _gf_true;
+ svc->throttle = _gf_true;
- return 0;
+ return 0;
}
/*
@@ -2519,15 +2530,14 @@ rpcsvc_set_throttle_on (rpcsvc_t *svc)
* Returns 0 on success, -1 otherwise.
*/
int
-rpcsvc_set_throttle_off (rpcsvc_t *svc)
+rpcsvc_set_throttle_off(rpcsvc_t *svc)
{
+ if (!svc)
+ return -1;
- if (!svc)
- return -1;
+ svc->throttle = _gf_false;
- svc->throttle = _gf_false;
-
- return 0;
+ return 0;
}
/*
@@ -2535,213 +2545,205 @@ rpcsvc_set_throttle_off (rpcsvc_t *svc)
* Returns value of attribute throttle on success, _gf_false otherwise.
*/
gf_boolean_t
-rpcsvc_get_throttle (rpcsvc_t *svc)
+rpcsvc_get_throttle(rpcsvc_t *svc)
{
+ if (!svc)
+ return _gf_false;
- if (!svc)
- return _gf_false;
-
- return svc->throttle;
+ return svc->throttle;
}
/* The global RPC service initializer.
*/
rpcsvc_t *
-rpcsvc_init (xlator_t *xl, glusterfs_ctx_t *ctx, dict_t *options,
- uint32_t poolcount)
+rpcsvc_init(xlator_t *xl, glusterfs_ctx_t *ctx, dict_t *options,
+ uint32_t poolcount)
{
- rpcsvc_t *svc = NULL;
- int ret = -1;
-
- if ((!xl) || (!ctx) || (!options))
- return NULL;
-
- svc = GF_CALLOC (1, sizeof (*svc), gf_common_mt_rpcsvc_t);
- if (!svc)
- return NULL;
-
- pthread_rwlock_init (&svc->rpclock, NULL);
- INIT_LIST_HEAD (&svc->authschemes);
- INIT_LIST_HEAD (&svc->notify);
- INIT_LIST_HEAD (&svc->listeners);
- INIT_LIST_HEAD (&svc->programs);
+ rpcsvc_t *svc = NULL;
+ int ret = -1;
- ret = rpcsvc_init_options (svc, options);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to init options");
- goto free_svc;
- }
-
- if (!poolcount)
- poolcount = RPCSVC_POOLCOUNT_MULT * svc->memfactor;
-
- gf_log (GF_RPCSVC, GF_LOG_TRACE, "rx pool: %d", poolcount);
- svc->rxpool = mem_pool_new (rpcsvc_request_t, poolcount);
- /* TODO: leak */
- if (!svc->rxpool) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "mem pool allocation failed");
- goto free_svc;
- }
-
- ret = rpcsvc_auth_init (svc, options);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to init "
- "authentication");
- goto free_svc;
- }
-
- ret = -1;
- svc->options = options;
- svc->ctx = ctx;
- svc->xl = xl;
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "RPC service inited.");
-
- gluster_dump_prog.options = options;
+ if ((!xl) || (!ctx) || (!options))
+ return NULL;
- ret = rpcsvc_program_register (svc, &gluster_dump_prog, _gf_false);
- if (ret) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR,
- "failed to register DUMP program");
- goto free_svc;
- }
+ svc = GF_CALLOC(1, sizeof(*svc), gf_common_mt_rpcsvc_t);
+ if (!svc)
+ return NULL;
- ret = 0;
+ pthread_rwlock_init(&svc->rpclock, NULL);
+ INIT_LIST_HEAD(&svc->authschemes);
+ INIT_LIST_HEAD(&svc->notify);
+ INIT_LIST_HEAD(&svc->listeners);
+ INIT_LIST_HEAD(&svc->programs);
+
+ ret = rpcsvc_init_options(svc, options);
+ if (ret == -1) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR, "Failed to init options");
+ goto free_svc;
+ }
+
+ if (!poolcount)
+ poolcount = RPCSVC_POOLCOUNT_MULT * svc->memfactor;
+
+ gf_log(GF_RPCSVC, GF_LOG_TRACE, "rx pool: %d", poolcount);
+ svc->rxpool = mem_pool_new(rpcsvc_request_t, poolcount);
+ /* TODO: leak */
+ if (!svc->rxpool) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR, "mem pool allocation failed");
+ goto free_svc;
+ }
+
+ ret = rpcsvc_auth_init(svc, options);
+ if (ret == -1) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR,
+ "Failed to init "
+ "authentication");
+ goto free_svc;
+ }
+
+ ret = -1;
+ svc->options = options;
+ svc->ctx = ctx;
+ svc->xl = xl;
+ gf_log(GF_RPCSVC, GF_LOG_DEBUG, "RPC service inited.");
+
+ gluster_dump_prog.options = options;
+
+ ret = rpcsvc_program_register(svc, &gluster_dump_prog, _gf_false);
+ if (ret) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR, "failed to register DUMP program");
+ goto free_svc;
+ }
+
+ ret = 0;
free_svc:
- if (ret == -1) {
- GF_FREE (svc);
- svc = NULL;
- }
+ if (ret == -1) {
+ GF_FREE(svc);
+ svc = NULL;
+ }
- return svc;
+ return svc;
}
-
int
-rpcsvc_transport_peer_check_search (dict_t *options, char *pattern,
- char *ip, char *hostname)
+rpcsvc_transport_peer_check_search(dict_t *options, char *pattern, char *ip,
+ char *hostname)
{
- int ret = -1;
- char *addrtok = NULL;
- char *addrstr = NULL;
- char *dup_addrstr = NULL;
- char *svptr = NULL;
+ int ret = -1;
+ char *addrtok = NULL;
+ char *addrstr = NULL;
+ char *dup_addrstr = NULL;
+ char *svptr = NULL;
- if ((!options) || (!ip))
- return -1;
+ if ((!options) || (!ip))
+ return -1;
- ret = dict_get_str (options, pattern, &addrstr);
- if (ret < 0) {
- ret = -1;
- goto err;
- }
-
- if (!addrstr) {
- ret = -1;
- goto err;
- }
+ ret = dict_get_str(options, pattern, &addrstr);
+ if (ret < 0) {
+ ret = -1;
+ goto err;
+ }
- dup_addrstr = gf_strdup (addrstr);
- addrtok = strtok_r (dup_addrstr, ",", &svptr);
- while (addrtok) {
+ if (!addrstr) {
+ ret = -1;
+ goto err;
+ }
- /* CASEFOLD not present on Solaris */
+ dup_addrstr = gf_strdup(addrstr);
+ addrtok = strtok_r(dup_addrstr, ",", &svptr);
+ while (addrtok) {
+ /* CASEFOLD not present on Solaris */
#ifdef FNM_CASEFOLD
- ret = fnmatch (addrtok, ip, FNM_CASEFOLD);
+ ret = fnmatch(addrtok, ip, FNM_CASEFOLD);
#else
- ret = fnmatch (addrtok, ip, 0);
+ ret = fnmatch(addrtok, ip, 0);
#endif
- if (ret == 0)
- goto err;
+ if (ret == 0)
+ goto err;
- /* compare hostnames if applicable */
- if (hostname) {
+ /* compare hostnames if applicable */
+ if (hostname) {
#ifdef FNM_CASEFOLD
- ret = fnmatch (addrtok, hostname, FNM_CASEFOLD);
+ ret = fnmatch(addrtok, hostname, FNM_CASEFOLD);
#else
- ret = fnmatch (addrtok, hostname, 0);
+ ret = fnmatch(addrtok, hostname, 0);
#endif
- if (ret == 0)
- goto err;
- }
-
- /* Compare IPv4 subnetwork, TODO: IPv6 subnet support */
- if (strchr (addrtok, '/')) {
- ret = rpcsvc_match_subnet_v4 (addrtok, ip);
- if (ret == 0)
- goto err;
- }
+ if (ret == 0)
+ goto err;
+ }
- addrtok = strtok_r (NULL, ",", &svptr);
+ /* Compare IPv4 subnetwork, TODO: IPv6 subnet support */
+ if (strchr(addrtok, '/')) {
+ ret = rpcsvc_match_subnet_v4(addrtok, ip);
+ if (ret == 0)
+ goto err;
}
- ret = -1;
+ addrtok = strtok_r(NULL, ",", &svptr);
+ }
+
+ ret = -1;
err:
- GF_FREE (dup_addrstr);
+ GF_FREE(dup_addrstr);
- return ret;
+ return ret;
}
-
static int
-rpcsvc_transport_peer_check_allow (dict_t *options, char *volname,
- char *ip, char *hostname)
+rpcsvc_transport_peer_check_allow(dict_t *options, char *volname, char *ip,
+ char *hostname)
{
- int ret = RPCSVC_AUTH_DONTCARE;
- char *srchstr = NULL;
+ int ret = RPCSVC_AUTH_DONTCARE;
+ char *srchstr = NULL;
- if ((!options) || (!ip) || (!volname))
- return ret;
+ if ((!options) || (!ip) || (!volname))
+ return ret;
- ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
- ret = RPCSVC_AUTH_DONTCARE;
- goto out;
- }
+ ret = gf_asprintf(&srchstr, "rpc-auth.addr.%s.allow", volname);
+ if (ret == -1) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ ret = RPCSVC_AUTH_DONTCARE;
+ goto out;
+ }
- ret = rpcsvc_transport_peer_check_search (options, srchstr,
- ip, hostname);
- GF_FREE (srchstr);
+ ret = rpcsvc_transport_peer_check_search(options, srchstr, ip, hostname);
+ GF_FREE(srchstr);
- if (ret == 0)
- ret = RPCSVC_AUTH_ACCEPT;
- else
- ret = RPCSVC_AUTH_REJECT;
+ if (ret == 0)
+ ret = RPCSVC_AUTH_ACCEPT;
+ else
+ ret = RPCSVC_AUTH_REJECT;
out:
- return ret;
+ return ret;
}
static int
-rpcsvc_transport_peer_check_reject (dict_t *options, char *volname,
- char *ip, char *hostname)
+rpcsvc_transport_peer_check_reject(dict_t *options, char *volname, char *ip,
+ char *hostname)
{
- int ret = RPCSVC_AUTH_DONTCARE;
- char *srchstr = NULL;
+ int ret = RPCSVC_AUTH_DONTCARE;
+ char *srchstr = NULL;
- if ((!options) || (!ip) || (!volname))
- return ret;
+ if ((!options) || (!ip) || (!volname))
+ return ret;
- ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.reject",
- volname);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
- ret = RPCSVC_AUTH_REJECT;
- goto out;
- }
+ ret = gf_asprintf(&srchstr, "rpc-auth.addr.%s.reject", volname);
+ if (ret == -1) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ ret = RPCSVC_AUTH_REJECT;
+ goto out;
+ }
- ret = rpcsvc_transport_peer_check_search (options, srchstr,
- ip, hostname);
- GF_FREE (srchstr);
+ ret = rpcsvc_transport_peer_check_search(options, srchstr, ip, hostname);
+ GF_FREE(srchstr);
- if (ret == 0)
- ret = RPCSVC_AUTH_REJECT;
- else
- ret = RPCSVC_AUTH_DONTCARE;
+ if (ret == 0)
+ ret = RPCSVC_AUTH_REJECT;
+ else
+ ret = RPCSVC_AUTH_DONTCARE;
out:
- return ret;
+ return ret;
}
-
/* Combines rpc auth's allow and reject options.
* Order of checks is important.
* First, REJECT if either rejects.
@@ -2749,188 +2751,188 @@ out:
* If neither accepts, DONTCARE
*/
int
-rpcsvc_combine_allow_reject_volume_check (int allow, int reject)
+rpcsvc_combine_allow_reject_volume_check(int allow, int reject)
{
- if (allow == RPCSVC_AUTH_REJECT ||
- reject == RPCSVC_AUTH_REJECT)
- return RPCSVC_AUTH_REJECT;
+ if (allow == RPCSVC_AUTH_REJECT || reject == RPCSVC_AUTH_REJECT)
+ return RPCSVC_AUTH_REJECT;
- if (allow == RPCSVC_AUTH_ACCEPT ||
- reject == RPCSVC_AUTH_ACCEPT)
- return RPCSVC_AUTH_ACCEPT;
+ if (allow == RPCSVC_AUTH_ACCEPT || reject == RPCSVC_AUTH_ACCEPT)
+ return RPCSVC_AUTH_ACCEPT;
- return RPCSVC_AUTH_DONTCARE;
+ return RPCSVC_AUTH_DONTCARE;
}
int
-rpcsvc_auth_check (rpcsvc_t *svc, char *volname, char *ipaddr)
-{
- int ret = RPCSVC_AUTH_REJECT;
- int accept = RPCSVC_AUTH_REJECT;
- int reject = RPCSVC_AUTH_REJECT;
- char *hostname = NULL;
- char *allow_str = NULL;
- char *reject_str = NULL;
- char *srchstr = NULL;
- dict_t *options = NULL;
-
- if (!svc || !volname || !ipaddr)
- return ret;
-
- /* Fetch the options from svc struct and validate */
- options = svc->options;
- if (!options)
- return ret;
-
- /* Accept if its the default case: Allow all, Reject none
- * The default volfile always contains a 'allow *' rule
- * for each volume. If allow rule is missing (which implies
- * there is some bad volfile generating code doing this), we
- * assume no one is allowed mounts, and thus, we reject mounts.
- */
- ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
- return RPCSVC_AUTH_REJECT;
- }
-
- ret = dict_get_str (options, srchstr, &allow_str);
- GF_FREE (srchstr);
- if (ret < 0)
- return RPCSVC_AUTH_REJECT;
-
- ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.reject", volname);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
- return RPCSVC_AUTH_REJECT;
- }
-
- ret = dict_get_str (options, srchstr, &reject_str);
- GF_FREE (srchstr);
+rpcsvc_auth_check(rpcsvc_t *svc, char *volname, char *ipaddr)
+{
+ int ret = RPCSVC_AUTH_REJECT;
+ int accept = RPCSVC_AUTH_REJECT;
+ int reject = RPCSVC_AUTH_REJECT;
+ char *hostname = NULL;
+ char *allow_str = NULL;
+ char *reject_str = NULL;
+ char *srchstr = NULL;
+ dict_t *options = NULL;
+
+ if (!svc || !volname || !ipaddr)
+ return ret;
- /*
- * If "reject_str" is being set as '*' (anonymous), then NFS-server
- * would reject everything. If the "reject_str" is not set and
- * "allow_str" is set as '*' (anonymous), then NFS-server would
- * accept mount requests from all clients.
- */
- if (reject_str != NULL) {
- if (!strcmp ("*", reject_str))
- return RPCSVC_AUTH_REJECT;
- } else {
- if (!strcmp ("*", allow_str))
- return RPCSVC_AUTH_ACCEPT;
- }
+ /* Fetch the options from svc struct and validate */
+ options = svc->options;
+ if (!options)
+ return ret;
- /* addr-namelookup check */
- if (svc->addr_namelookup == _gf_true) {
- ret = gf_get_hostname_from_ip (ipaddr, &hostname);
- if (ret) {
- if (hostname)
- GF_FREE (hostname);
- /* failed to get hostname, but hostname auth
- * is enabled, so authentication will not be
- * 100% correct. reject mounts
- */
- return RPCSVC_AUTH_REJECT;
- }
+ /* Accept if its the default case: Allow all, Reject none
+ * The default volfile always contains a 'allow *' rule
+ * for each volume. If allow rule is missing (which implies
+ * there is some bad volfile generating code doing this), we
+ * assume no one is allowed mounts, and thus, we reject mounts.
+ */
+ ret = gf_asprintf(&srchstr, "rpc-auth.addr.%s.allow", volname);
+ if (ret == -1) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ return RPCSVC_AUTH_REJECT;
+ }
+
+ ret = dict_get_str(options, srchstr, &allow_str);
+ GF_FREE(srchstr);
+ if (ret < 0)
+ return RPCSVC_AUTH_REJECT;
+
+ ret = gf_asprintf(&srchstr, "rpc-auth.addr.%s.reject", volname);
+ if (ret == -1) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ return RPCSVC_AUTH_REJECT;
+ }
+
+ ret = dict_get_str(options, srchstr, &reject_str);
+ GF_FREE(srchstr);
+
+ /*
+ * If "reject_str" is being set as '*' (anonymous), then NFS-server
+ * would reject everything. If the "reject_str" is not set and
+ * "allow_str" is set as '*' (anonymous), then NFS-server would
+ * accept mount requests from all clients.
+ */
+ if (reject_str != NULL) {
+ if (!strcmp("*", reject_str))
+ return RPCSVC_AUTH_REJECT;
+ } else {
+ if (!strcmp("*", allow_str))
+ return RPCSVC_AUTH_ACCEPT;
+ }
+
+ /* addr-namelookup check */
+ if (svc->addr_namelookup == _gf_true) {
+ ret = gf_get_hostname_from_ip(ipaddr, &hostname);
+ if (ret) {
+ if (hostname)
+ GF_FREE(hostname);
+ /* failed to get hostname, but hostname auth
+ * is enabled, so authentication will not be
+ * 100% correct. reject mounts
+ */
+ return RPCSVC_AUTH_REJECT;
}
+ }
- accept = rpcsvc_transport_peer_check_allow (options, volname,
- ipaddr, hostname);
+ accept = rpcsvc_transport_peer_check_allow(options, volname, ipaddr,
+ hostname);
- reject = rpcsvc_transport_peer_check_reject (options, volname,
- ipaddr, hostname);
+ reject = rpcsvc_transport_peer_check_reject(options, volname, ipaddr,
+ hostname);
- if (hostname)
- GF_FREE (hostname);
- return rpcsvc_combine_allow_reject_volume_check (accept, reject);
+ if (hostname)
+ GF_FREE(hostname);
+ return rpcsvc_combine_allow_reject_volume_check(accept, reject);
}
int
-rpcsvc_transport_privport_check (rpcsvc_t *svc, char *volname, uint16_t port)
+rpcsvc_transport_privport_check(rpcsvc_t *svc, char *volname, uint16_t port)
{
- int ret = RPCSVC_AUTH_REJECT;
- char *srchstr = NULL;
- char *valstr = NULL;
- gf_boolean_t insecure = _gf_false;
-
- if ((!svc) || (!volname))
- return ret;
-
- gf_log (GF_RPCSVC, GF_LOG_TRACE, "Client port: %d", (int)port);
- /* If the port is already a privileged one, don't bother with checking
- * options.
- */
- if (port <= 1024) {
- ret = RPCSVC_AUTH_ACCEPT;
- goto err;
- }
-
- /* Disabled by default */
- ret = gf_asprintf (&srchstr, "rpc-auth.ports.%s.insecure", volname);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
- ret = RPCSVC_AUTH_REJECT;
- goto err;
- }
+ int ret = RPCSVC_AUTH_REJECT;
+ char *srchstr = NULL;
+ char *valstr = NULL;
+ gf_boolean_t insecure = _gf_false;
- ret = dict_get_str (svc->options, srchstr, &valstr);
- if (ret) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to"
- " read rpc-auth.ports.insecure value");
- goto err;
- }
-
- ret = gf_string2boolean (valstr, &insecure);
- if (ret) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to"
- " convert rpc-auth.ports.insecure value");
- goto err;
- }
+ if ((!svc) || (!volname))
+ return ret;
- ret = insecure ? RPCSVC_AUTH_ACCEPT : RPCSVC_AUTH_REJECT;
-
- if (ret == RPCSVC_AUTH_ACCEPT)
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Unprivileged port allowed");
- else
- gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Unprivileged port not"
- " allowed");
+ gf_log(GF_RPCSVC, GF_LOG_TRACE, "Client port: %d", (int)port);
+ /* If the port is already a privileged one, don't bother with checking
+ * options.
+ */
+ if (port <= 1024) {
+ ret = RPCSVC_AUTH_ACCEPT;
+ goto err;
+ }
+
+ /* Disabled by default */
+ ret = gf_asprintf(&srchstr, "rpc-auth.ports.%s.insecure", volname);
+ if (ret == -1) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ ret = RPCSVC_AUTH_REJECT;
+ goto err;
+ }
+
+ ret = dict_get_str(svc->options, srchstr, &valstr);
+ if (ret) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR,
+ "Failed to"
+ " read rpc-auth.ports.insecure value");
+ goto err;
+ }
+
+ ret = gf_string2boolean(valstr, &insecure);
+ if (ret) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR,
+ "Failed to"
+ " convert rpc-auth.ports.insecure value");
+ goto err;
+ }
+
+ ret = insecure ? RPCSVC_AUTH_ACCEPT : RPCSVC_AUTH_REJECT;
+
+ if (ret == RPCSVC_AUTH_ACCEPT)
+ gf_log(GF_RPCSVC, GF_LOG_DEBUG, "Unprivileged port allowed");
+ else
+ gf_log(GF_RPCSVC, GF_LOG_DEBUG,
+ "Unprivileged port not"
+ " allowed");
err:
- if (srchstr)
- GF_FREE (srchstr);
+ if (srchstr)
+ GF_FREE(srchstr);
- return ret;
+ return ret;
}
-
char *
-rpcsvc_volume_allowed (dict_t *options, char *volname)
+rpcsvc_volume_allowed(dict_t *options, char *volname)
{
- char globalrule[] = "rpc-auth.addr.allow";
- char *srchstr = NULL;
- char *addrstr = NULL;
- int ret = -1;
+ char globalrule[] = "rpc-auth.addr.allow";
+ char *srchstr = NULL;
+ char *addrstr = NULL;
+ int ret = -1;
- if ((!options) || (!volname))
- return NULL;
+ if ((!options) || (!volname))
+ return NULL;
- ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname);
- if (ret == -1) {
- gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
- goto out;
- }
+ ret = gf_asprintf(&srchstr, "rpc-auth.addr.%s.allow", volname);
+ if (ret == -1) {
+ gf_log(GF_RPCSVC, GF_LOG_ERROR, "asprintf failed");
+ goto out;
+ }
- if (!dict_get (options, srchstr))
- ret = dict_get_str (options, globalrule, &addrstr);
- else
- ret = dict_get_str (options, srchstr, &addrstr);
+ if (!dict_get(options, srchstr))
+ ret = dict_get_str(options, globalrule, &addrstr);
+ else
+ ret = dict_get_str(options, srchstr, &addrstr);
out:
- GF_FREE (srchstr);
+ GF_FREE(srchstr);
- return addrstr;
+ return addrstr;
}
/*
@@ -2944,56 +2946,61 @@ out:
* as it's already being done at the time of CLI SET.
*/
static int
-rpcsvc_match_subnet_v4 (const char *addrtok, const char *ipaddr)
-{
- char *slash = NULL;
- char *netaddr = NULL;
- int ret = -1;
- uint32_t prefixlen = 0;
- uint32_t shift = 0;
- struct sockaddr_in sin1 = {0, };
- struct sockaddr_in sin2 = {0, };
- struct sockaddr_in mask = {0, };
-
- /* Copy the input */
- netaddr = gf_strdup (addrtok);
- if (netaddr == NULL) /* ENOMEM */
- goto out;
-
- /* Find the network socket addr of target */
- if (inet_pton (AF_INET, ipaddr, &sin1.sin_addr) == 0)
- goto out;
-
- /* Find the network socket addr of subnet pattern */
- if (inet_pton (AF_INET, netaddr, &sin2.sin_addr) == 0)
- goto out;
-
- slash = strchr (netaddr, '/');
- if (slash) {
- *slash = '\0';
- /*
- * Find the IPv4 network mask in network byte order.
- * IMP: String slash+1 is already validated, it can't have value
- * more than IPv4_ADDR_SIZE (32).
- */
- prefixlen = (uint32_t) atoi (slash + 1);
- if (prefixlen > 31)
- goto out;
- } else {
- goto out;
- }
-
- shift = IPv4_ADDR_SIZE - prefixlen;
- mask.sin_addr.s_addr = htonl ((uint32_t)~0 << shift);
-
- if (mask_match (sin1.sin_addr.s_addr,
- sin2.sin_addr.s_addr,
- mask.sin_addr.s_addr)) {
- ret = 0; /* SUCCESS */
- }
+rpcsvc_match_subnet_v4(const char *addrtok, const char *ipaddr)
+{
+ char *slash = NULL;
+ char *netaddr = NULL;
+ int ret = -1;
+ uint32_t prefixlen = 0;
+ uint32_t shift = 0;
+ struct sockaddr_in sin1 = {
+ 0,
+ };
+ struct sockaddr_in sin2 = {
+ 0,
+ };
+ struct sockaddr_in mask = {
+ 0,
+ };
+
+ /* Copy the input */
+ netaddr = gf_strdup(addrtok);
+ if (netaddr == NULL) /* ENOMEM */
+ goto out;
+
+ /* Find the network socket addr of target */
+ if (inet_pton(AF_INET, ipaddr, &sin1.sin_addr) == 0)
+ goto out;
+
+ /* Find the network socket addr of subnet pattern */
+ if (inet_pton(AF_INET, netaddr, &sin2.sin_addr) == 0)
+ goto out;
+
+ slash = strchr(netaddr, '/');
+ if (slash) {
+ *slash = '\0';
+ /*
+ * Find the IPv4 network mask in network byte order.
+ * IMP: String slash+1 is already validated, it can't have value
+ * more than IPv4_ADDR_SIZE (32).
+ */
+ prefixlen = (uint32_t)atoi(slash + 1);
+ if (prefixlen > 31)
+ goto out;
+ } else {
+ goto out;
+ }
+
+ shift = IPv4_ADDR_SIZE - prefixlen;
+ mask.sin_addr.s_addr = htonl((uint32_t)~0 << shift);
+
+ if (mask_match(sin1.sin_addr.s_addr, sin2.sin_addr.s_addr,
+ mask.sin_addr.s_addr)) {
+ ret = 0; /* SUCCESS */
+ }
out:
- GF_FREE (netaddr);
- return ret;
+ GF_FREE(netaddr);
+ return ret;
}
/* During reconfigure, Make sure to call this function after event-threads are
@@ -3001,45 +3008,43 @@ out:
*/
int
-rpcsvc_ownthread_reconf (rpcsvc_t *svc, int new_eventthreadcount)
+rpcsvc_ownthread_reconf(rpcsvc_t *svc, int new_eventthreadcount)
{
- int ret = -1;
- rpcsvc_program_t *program = NULL;
+ int ret = -1;
+ rpcsvc_program_t *program = NULL;
- if (!svc) {
- ret = 0;
- goto out;
- }
+ if (!svc) {
+ ret = 0;
+ goto out;
+ }
- pthread_rwlock_wrlock (&svc->rpclock);
+ pthread_rwlock_wrlock(&svc->rpclock);
+ {
+ list_for_each_entry(program, &svc->programs, program)
{
- list_for_each_entry (program, &svc->programs, program) {
- if (program->ownthread) {
- program->eventthreadcount =
- new_eventthreadcount;
- rpcsvc_spawn_threads (svc, program);
- }
- }
+ if (program->ownthread) {
+ program->eventthreadcount = new_eventthreadcount;
+ rpcsvc_spawn_threads(svc, program);
+ }
}
- pthread_rwlock_unlock (&svc->rpclock);
+ }
+ pthread_rwlock_unlock(&svc->rpclock);
- ret = 0;
+ ret = 0;
out:
- return ret;
+ return ret;
}
-
rpcsvc_actor_t gluster_dump_actors[GF_DUMP_MAXVALUE] = {
- [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0, DRC_NA},
- [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0, DRC_NA},
- [GF_DUMP_PING] = {"PING", GF_DUMP_PING, rpcsvc_ping, NULL, 0, DRC_NA},
+ [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0, DRC_NA},
+ [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0, DRC_NA},
+ [GF_DUMP_PING] = {"PING", GF_DUMP_PING, rpcsvc_ping, NULL, 0, DRC_NA},
};
-
struct rpcsvc_program gluster_dump_prog = {
- .progname = "GF-DUMP",
- .prognum = GLUSTER_DUMP_PROGRAM,
- .progver = GLUSTER_DUMP_VERSION,
- .actors = gluster_dump_actors,
- .numactors = GF_DUMP_MAXVALUE,
+ .progname = "GF-DUMP",
+ .prognum = GLUSTER_DUMP_PROGRAM,
+ .progver = GLUSTER_DUMP_VERSION,
+ .actors = gluster_dump_actors,
+ .numactors = GF_DUMP_MAXVALUE,
};