diff options
Diffstat (limited to 'rpc/rpc-lib')
| -rw-r--r-- | rpc/rpc-lib/Makefile.am | 1 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/Makefile.am | 15 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/auth-glusterfs.c | 199 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/auth-null.c | 70 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/auth-unix.c | 90 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 1283 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.h | 166 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-transport.c | 1301 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpc-transport.h | 288 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc-auth.c | 409 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc-common.h | 83 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.c | 2015 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/rpcsvc.h | 582 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/xdr-common.h | 68 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/xdr-rpc.c | 193 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/xdr-rpc.h | 74 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/xdr-rpcclnt.c | 131 | ||||
| -rw-r--r-- | rpc/rpc-lib/src/xdr-rpcclnt.h | 51 | 
18 files changed, 7019 insertions, 0 deletions
diff --git a/rpc/rpc-lib/Makefile.am b/rpc/rpc-lib/Makefile.am new file mode 100644 index 00000000000..af437a64d6d --- /dev/null +++ b/rpc/rpc-lib/Makefile.am @@ -0,0 +1 @@ +SUBDIRS = src diff --git a/rpc/rpc-lib/src/Makefile.am b/rpc/rpc-lib/src/Makefile.am new file mode 100644 index 00000000000..4df8888a08d --- /dev/null +++ b/rpc/rpc-lib/src/Makefile.am @@ -0,0 +1,15 @@ +lib_LTLIBRARIES = libgfrpc.la +libgfrpc_la_LDFLAGS = -module -avoidversion + +libgfrpc_la_SOURCES = auth-unix.c rpcsvc-auth.c rpcsvc.c auth-null.c \ +	rpc-transport.c xdr-rpc.c xdr-rpcclnt.c rpc-clnt.c auth-glusterfs.c +libgfrpc_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la + +noinst_HEADERS = rpcsvc.h rpc-transport.h xdr-common.h xdr-rpc.h xdr-rpcclnt.h \ +	rpc-clnt.h rpcsvc-common.h +AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS)\ +	-I$(top_srcdir)/libglusterfs/src -shared -nostartfiles $(GF_CFLAGS) \ +	-I$(top_srcdir)/xlators/protocol/lib/src \ +	-DRPC_TRANSPORTDIR=\"$(libdir)/glusterfs/$(PACKAGE_VERSION)/rpc-transport\" + +CLEANFILES = *~ diff --git a/rpc/rpc-lib/src/auth-glusterfs.c b/rpc/rpc-lib/src/auth-glusterfs.c new file mode 100644 index 00000000000..e248bf142aa --- /dev/null +++ b/rpc/rpc-lib/src/auth-glusterfs.c @@ -0,0 +1,199 @@ +/* +  Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. + +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + + + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "rpcsvc.h" +#include "list.h" +#include "dict.h" +#include "xdr-rpc.h" +#include "xdr-common.h" + +bool_t +xdr_auth_glusterfs_parms (XDR *xdrs, auth_glusterfs_parms *objp) +{ +	register int32_t *buf; + +	int i; + +	if (xdrs->x_op == XDR_ENCODE) { +		 if (!xdr_u_quad_t (xdrs, &objp->lk_owner)) +			 return FALSE; +		buf = XDR_INLINE (xdrs, (4 +  16 )* BYTES_PER_XDR_UNIT); +		if (buf == NULL) { +			 if (!xdr_u_int (xdrs, &objp->pid)) +				 return FALSE; +			 if (!xdr_u_int (xdrs, &objp->uid)) +				 return FALSE; +			 if (!xdr_u_int (xdrs, &objp->gid)) +				 return FALSE; +			 if (!xdr_u_int (xdrs, &objp->ngrps)) +				 return FALSE; +			 if (!xdr_vector (xdrs, (char *)objp->groups, 16, +				sizeof (u_int), (xdrproc_t) xdr_u_int)) +				 return FALSE; +		} else { +			IXDR_PUT_U_LONG(buf, objp->pid); +			IXDR_PUT_U_LONG(buf, objp->uid); +			IXDR_PUT_U_LONG(buf, objp->gid); +			IXDR_PUT_U_LONG(buf, objp->ngrps); +			{ +				register u_int *genp; + +				for (i = 0, genp = objp->groups; +					i < 16; ++i) { +					IXDR_PUT_U_LONG(buf, *genp++); +				} +			} +		} +		return TRUE; +	} else if (xdrs->x_op == XDR_DECODE) { +		 if (!xdr_u_quad_t (xdrs, &objp->lk_owner)) +			 return FALSE; +		buf = XDR_INLINE (xdrs, (4 +  16 )* BYTES_PER_XDR_UNIT); +		if (buf == NULL) { +			 if (!xdr_u_int (xdrs, &objp->pid)) +				 return FALSE; +			 if (!xdr_u_int (xdrs, &objp->uid)) +				 return FALSE; +			 if (!xdr_u_int (xdrs, &objp->gid)) +				 return FALSE; +			 if (!xdr_u_int (xdrs, &objp->ngrps)) +				 return FALSE; +			 if (!xdr_vector (xdrs, (char *)objp->groups, 16, +				sizeof (u_int), (xdrproc_t) xdr_u_int)) +				 return FALSE; +		} else { +			objp->pid = IXDR_GET_U_LONG(buf); +			objp->uid = IXDR_GET_U_LONG(buf); +			objp->gid = IXDR_GET_U_LONG(buf); +			objp->ngrps = IXDR_GET_U_LONG(buf); +			{ +				register u_int *genp; + +				for (i = 0, genp = objp->groups; +					i < 16; ++i) { +					*genp++ = IXDR_GET_U_LONG(buf); +				} +			} +		} +	 return TRUE; +	} + +	 if (!xdr_u_quad_t (xdrs, &objp->lk_owner)) +		 return FALSE; +	 if (!xdr_u_int (xdrs, &objp->pid)) +		 return FALSE; +	 if (!xdr_u_int (xdrs, &objp->uid)) +		 return FALSE; +	 if (!xdr_u_int (xdrs, &objp->gid)) +		 return FALSE; +	 if (!xdr_u_int (xdrs, &objp->ngrps)) +		 return FALSE; +	 if (!xdr_vector (xdrs, (char *)objp->groups, 16, +		sizeof (u_int), (xdrproc_t) xdr_u_int)) +		 return FALSE; +	return TRUE; +} + +ssize_t +xdr_to_glusterfs_auth (char *buf, struct auth_glusterfs_parms *req) +{ +        XDR     xdr; +        ssize_t ret = -1; + +        if ((!buf) || (!req)) +                return -1; + +        xdrmem_create (&xdr, buf, sizeof (struct auth_glusterfs_parms), +                       XDR_DECODE); +        if (!xdr_auth_glusterfs_parms (&xdr, req)) { +                ret  = -1; +                goto ret; +        } + +        ret = (((size_t)(&xdr)->x_private) - ((size_t)(&xdr)->x_base)); +ret: +        return ret; + +} +int +auth_glusterfs_request_init (rpcsvc_request_t *req, void *priv) +{ +        if (!req) +                return -1; +        memset (req->verf.authdata, 0, RPCSVC_MAX_AUTH_BYTES); +        req->verf.datalen = 0; +        req->verf.flavour = AUTH_NULL; + +        return 0; +} + +int auth_glusterfs_authenticate (rpcsvc_request_t *req, void *priv) +{ +        int                          ret = RPCSVC_AUTH_REJECT; +        struct auth_glusterfs_parms  au = {0,}; + +        if (!req) +                return ret; + +        ret = xdr_to_glusterfs_auth (req->cred.authdata, &au); +        if (ret == -1) { +                ret = RPCSVC_AUTH_REJECT; +                goto err; +        } + +        req->pid = au.pid; +        req->uid = au.uid; +        req->gid = au.gid; +        req->lk_owner = au.lk_owner; +        req->auxgidcount = au.ngrps; + +        gf_log (GF_RPCSVC, GF_LOG_TRACE, "Auth Info: pid: %u, uid: %d" +                ", gid: %d, owner: %"PRId64, +                req->pid, req->uid, req->gid, req->lk_owner); +        ret = RPCSVC_AUTH_ACCEPT; +err: +        return ret; +} + +rpcsvc_auth_ops_t auth_glusterfs_ops = { +        .conn_init              = NULL, +        .request_init           = auth_glusterfs_request_init, +        .authenticate           = auth_glusterfs_authenticate +}; + +rpcsvc_auth_t rpcsvc_auth_glusterfs = { +        .authname       = "AUTH_GLUSTERFS", +        .authnum        = AUTH_GLUSTERFS, +        .authops        = &auth_glusterfs_ops, +        .authprivate    = NULL +}; + + +rpcsvc_auth_t * +rpcsvc_auth_glusterfs_init (rpcsvc_t *svc, dict_t *options) +{ +        return &rpcsvc_auth_glusterfs; +} diff --git a/rpc/rpc-lib/src/auth-null.c b/rpc/rpc-lib/src/auth-null.c new file mode 100644 index 00000000000..a2581a1718d --- /dev/null +++ b/rpc/rpc-lib/src/auth-null.c @@ -0,0 +1,70 @@ +/* +  Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. + +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "rpcsvc.h" +#include "list.h" +#include "dict.h" + + +int +auth_null_request_init (rpcsvc_request_t *req, void *priv) +{ +        if (!req) +                return -1; + +        memset (req->cred.authdata, 0, RPCSVC_MAX_AUTH_BYTES); +        req->cred.datalen = 0; + +        memset (req->verf.authdata, 0, RPCSVC_MAX_AUTH_BYTES); +        req->verf.datalen = 0; + +        return 0; +} + +int auth_null_authenticate (rpcsvc_request_t *req, void *priv) +{ +        /* Always succeed. */ +        return RPCSVC_AUTH_ACCEPT; +} + +rpcsvc_auth_ops_t auth_null_ops = { +        .conn_init              = NULL, +        .request_init           = auth_null_request_init, +        .authenticate           = auth_null_authenticate +}; + +rpcsvc_auth_t rpcsvc_auth_null = { +        .authname       = "AUTH_NULL", +        .authnum        = AUTH_NULL, +        .authops        = &auth_null_ops, +        .authprivate    = NULL +}; + + +rpcsvc_auth_t * +rpcsvc_auth_null_init (rpcsvc_t *svc, dict_t *options) +{ +        return &rpcsvc_auth_null; +} diff --git a/rpc/rpc-lib/src/auth-unix.c b/rpc/rpc-lib/src/auth-unix.c new file mode 100644 index 00000000000..aed3c1f9d46 --- /dev/null +++ b/rpc/rpc-lib/src/auth-unix.c @@ -0,0 +1,90 @@ +/* +  Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. + +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + + + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "rpcsvc.h" +#include "list.h" +#include "dict.h" +#include "xdr-rpc.h" + + +int +auth_unix_request_init (rpcsvc_request_t *req, void *priv) +{ +        if (!req) +                return -1; +        memset (req->verf.authdata, 0, RPCSVC_MAX_AUTH_BYTES); +        req->verf.datalen = 0; +        req->verf.flavour = AUTH_NULL; + +        return 0; +} + +int auth_unix_authenticate (rpcsvc_request_t *req, void *priv) +{ +        int                     ret = RPCSVC_AUTH_REJECT; +        struct authunix_parms   aup; +        char                    machname[MAX_MACHINE_NAME]; + +        if (!req) +                return ret; + +        ret = xdr_to_auth_unix_cred (req->cred.authdata, req->cred.datalen, +                                     &aup, machname, req->auxgids); +        if (ret == -1) { +                ret = RPCSVC_AUTH_REJECT; +                goto err; +        } + +        req->uid = aup.aup_uid; +        req->gid = aup.aup_gid; +        req->auxgidcount = aup.aup_len; + +        gf_log (GF_RPCSVC, GF_LOG_TRACE, "Auth Info: machine name: %s, uid: %d" +                ", gid: %d", machname, req->uid, req->gid); +        ret = RPCSVC_AUTH_ACCEPT; +err: +        return ret; +} + +rpcsvc_auth_ops_t auth_unix_ops = { +        .conn_init              = NULL, +        .request_init           = auth_unix_request_init, +        .authenticate           = auth_unix_authenticate +}; + +rpcsvc_auth_t rpcsvc_auth_unix = { +        .authname       = "AUTH_UNIX", +        .authnum        = AUTH_UNIX, +        .authops        = &auth_unix_ops, +        .authprivate    = NULL +}; + + +rpcsvc_auth_t * +rpcsvc_auth_unix_init (rpcsvc_t *svc, dict_t *options) +{ +        return &rpcsvc_auth_unix; +} diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c new file mode 100644 index 00000000000..84fdb9bb55f --- /dev/null +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -0,0 +1,1283 @@ +/* +  Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. + +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "rpc-clnt.h" +#include "xdr-rpcclnt.h" +#include "rpc-transport.h" +#include "protocol-common.h" + +uint64_t +rpc_clnt_new_callid (struct rpc_clnt *clnt) +{ +        uint64_t callid = 0; + +        pthread_mutex_lock (&clnt->lock); +        { +                callid = ++clnt->xid; +        } +        pthread_mutex_unlock (&clnt->lock); + +        return callid; +} + + +struct saved_frame * +__saved_frames_get_timedout (struct saved_frames *frames, uint32_t timeout, +                             struct timeval *current) +{ +	struct saved_frame *bailout_frame = NULL, *tmp = NULL; + +	if (!list_empty(&frames->sf.list)) { +		tmp = list_entry (frames->sf.list.next, typeof (*tmp), list); +		if ((tmp->saved_at.tv_sec + timeout) < current->tv_sec) { +			bailout_frame = tmp; +			list_del_init (&bailout_frame->list); +			frames->count--; +		} +	} + +	return bailout_frame; +} + + +struct saved_frame * +__saved_frames_put (struct saved_frames *frames, void *frame, int32_t procnum, +                    rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, int64_t callid) +{ +	struct saved_frame *saved_frame = NULL; + +	saved_frame = GF_CALLOC (1, sizeof (*saved_frame), +                                 gf_common_mt_rpcclnt_savedframe_t); +	if (!saved_frame) { +                gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory"); +                goto out; +	} +        /* THIS should be saved and set back */ + +	INIT_LIST_HEAD (&saved_frame->list); + +	saved_frame->capital_this = THIS; +	saved_frame->frame        = frame; +	saved_frame->procnum      = procnum; +	saved_frame->callid       = callid; +        saved_frame->prog         = prog; +        saved_frame->cbkfn        = cbk; + +	gettimeofday (&saved_frame->saved_at, NULL); + +	list_add_tail (&saved_frame->list, &frames->sf.list); +	frames->count++; + +out: +	return saved_frame; +} + + +void +saved_frames_delete (struct saved_frame *saved_frame, +                     rpc_clnt_connection_t *conn) +{ +        if (!saved_frame || !conn) { +                goto out; +        } + +        pthread_mutex_lock (&conn->lock); +        { +                list_del_init (&saved_frame->list); +                conn->saved_frames->count--; +        } +        pthread_mutex_unlock (&conn->lock); + +        GF_FREE (saved_frame); +out: +        return; +} + + +static void +call_bail (void *data) +{ +        struct rpc_clnt       *clnt = NULL; +        rpc_clnt_connection_t *conn = NULL; +        struct timeval         current; +        struct list_head       list; +        struct saved_frame    *saved_frame = NULL; +        struct saved_frame    *trav = NULL; +        struct saved_frame    *tmp = NULL; +        struct tm              frame_sent_tm; +        char                   frame_sent[32] = {0,}; +        struct timeval         timeout = {0,}; +        gf_timer_cbk_t         timer_cbk = NULL; +        struct rpc_req         req; +        struct iovec           iov = {0,}; + +        GF_VALIDATE_OR_GOTO ("client", data, out); + +        clnt = data; + +        conn = &clnt->conn; + +        gettimeofday (¤t, NULL); +        INIT_LIST_HEAD (&list); + +        pthread_mutex_lock (&conn->lock); +        { +                /* Chaining to get call-always functionality from +                   call-once timer */ +                if (conn->timer) { +                        timer_cbk = conn->timer->callbk; + +                        timeout.tv_sec = 10; +                        timeout.tv_usec = 0; + +                        gf_timer_call_cancel (clnt->ctx, conn->timer); +                        conn->timer = gf_timer_call_after (clnt->ctx, +                                                           timeout, +                                                           call_bail, +                                                           (void *) clnt); + +                        if (conn->timer == NULL) { +                                gf_log (conn->trans->name, GF_LOG_DEBUG, +                                        "Cannot create bailout timer"); +                        } +                } + +                do { +                        saved_frame = +                                __saved_frames_get_timedout (conn->saved_frames, +                                                             conn->frame_timeout, +                                                             ¤t); +                        if (saved_frame) +                                list_add (&saved_frame->list, &list); + +                } while (saved_frame); +        } +        pthread_mutex_unlock (&conn->lock); + +        list_for_each_entry_safe (trav, tmp, &list, list) { +                localtime_r (&trav->saved_at.tv_sec, &frame_sent_tm); +                strftime (frame_sent, 32, "%Y-%m-%d %H:%M:%S", &frame_sent_tm); + +		gf_log (conn->trans->name, GF_LOG_ERROR, +			"bailing out frame type(%s) op(%s(%d)) sent = %s. " +                        "timeout = %d", +			trav->prog->progname, (trav->prog->procnames) ? +                        trav->prog->procnames[trav->procnum] : "--", +                        trav->procnum, frame_sent, +                        conn->frame_timeout); + +		trav->cbkfn (&req, &iov, 1, trav->frame); + +                list_del_init (&trav->list); +                GF_FREE (trav); +        } +out: +        return; +} + + +/* to be called with conn->lock held */ +struct saved_frame * +__save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame, int procnum, +              rpc_clnt_prog_t *prog, fop_cbk_fn_t cbk, uint64_t callid) +{ +        rpc_clnt_connection_t *conn        = NULL; +        struct timeval         timeout     = {0, }; +        struct saved_frame    *saved_frame = NULL; + +        conn = &rpc_clnt->conn; + +        saved_frame = __saved_frames_put (conn->saved_frames, frame, +                                          procnum, prog, cbk, callid); +        if (saved_frame == NULL) { +                goto out; +        } + +        /* TODO: make timeout configurable */ +        if (conn->timer == NULL) { +                timeout.tv_sec  = 10; +                timeout.tv_usec = 0; +                conn->timer = gf_timer_call_after (rpc_clnt->ctx, +                                                   timeout, +                                                   call_bail, +                                                   (void *) rpc_clnt); +        } + +out: +        return saved_frame; +} + + +struct saved_frames * +saved_frames_new (void) +{ +	struct saved_frames *saved_frames = NULL; + +	saved_frames = GF_CALLOC (1, sizeof (*saved_frames), +                                  gf_common_mt_rpcclnt_savedframe_t); +	if (!saved_frames) { +                gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory"); +		return NULL; +	} + +	INIT_LIST_HEAD (&saved_frames->sf.list); + +	return saved_frames; +} + + +int +__saved_frame_copy (struct saved_frames *frames, int64_t callid, +                    struct saved_frame *saved_frame) +{ +	struct saved_frame *tmp   = NULL; +        int                 ret   = -1; + +        if (!saved_frame) { +                ret = 0; +                goto out; +        } + +	list_for_each_entry (tmp, &frames->sf.list, list) { +		if (tmp->callid == callid) { +			*saved_frame = *tmp; +                        ret = 0; +			break; +		} +	} + +out: +	return ret; +} + + +struct saved_frame * +__saved_frame_get (struct saved_frames *frames, int64_t callid) +{ +	struct saved_frame *saved_frame = NULL; +	struct saved_frame *tmp = NULL; + +	list_for_each_entry (tmp, &frames->sf.list, list) { +		if (tmp->callid == callid) { +			list_del_init (&tmp->list); +			frames->count--; +			saved_frame = tmp; +			break; +		} +	} + +	if (saved_frame) { +                THIS  = saved_frame->capital_this; +        } + +	return saved_frame; +} + +void +saved_frames_unwind (struct saved_frames *saved_frames) +{ +	struct saved_frame   *trav = NULL; +	struct saved_frame   *tmp = NULL; + +        struct rpc_req        req; +        struct iovec          iov = {0,}; + +        memset (&req, 0, sizeof (req)); + +        req.rpc_status = -1; + +	list_for_each_entry_safe (trav, tmp, &saved_frames->sf.list, list) { +		gf_log ("rpc-clnt", GF_LOG_ERROR, +			"forced unwinding frame type(%s) op(%s(%d))", +			trav->prog->progname, (trav->prog->procnames) ? +                        trav->prog->procnames[trav->procnum] : "--", +                        trav->procnum); + +		saved_frames->count--; + +		trav->cbkfn (&req, &iov, 1, trav->frame); + +		list_del_init (&trav->list); +		GF_FREE (trav); +	} +} + + +void +saved_frames_destroy (struct saved_frames *frames) +{ +	saved_frames_unwind (frames); + +	GF_FREE (frames); +} + + +void +rpc_clnt_reconnect (void *trans_ptr) +{ +        rpc_transport_t         *trans = NULL; +        rpc_clnt_connection_t   *conn  = NULL; +        struct timeval           tv    = {0, 0}; +        int32_t                  ret   = 0; +        struct rpc_clnt         *clnt  = NULL; + +        trans = trans_ptr; +        if (!trans || !trans->mydata) +                return; + +        conn  = trans->mydata; +        clnt = conn->rpc_clnt; + +        pthread_mutex_lock (&conn->lock); +        { +                if (conn->reconnect) +                        gf_timer_call_cancel (clnt->ctx, +                                              conn->reconnect); +                conn->reconnect = 0; + +                if (conn->connected == 0) { +                        tv.tv_sec = 3; + +                        gf_log (trans->name, GF_LOG_TRACE, +                                "attempting reconnect"); +                        ret = rpc_transport_connect (trans); + +                        conn->reconnect = +                                gf_timer_call_after (clnt->ctx, tv, +                                                     rpc_clnt_reconnect, +                                                     trans); +                } else { +                        gf_log (trans->name, GF_LOG_TRACE, +                                "breaking reconnect chain"); +                } +        } +        pthread_mutex_unlock (&conn->lock); + +        if ((ret == -1) && (errno != EINPROGRESS) && (clnt->notifyfn)) { +                clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_DISCONNECT, NULL); +        } + +        return; +} + + +int +rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info) +{ +        struct saved_frame  saved_frame = {{}, 0}; +        int                 ret         = -1; + +        pthread_mutex_lock (&clnt->conn.lock); +        { +                ret = __saved_frame_copy (clnt->conn.saved_frames, info->xid, +                                          &saved_frame); +        } +        pthread_mutex_unlock (&clnt->conn.lock); + +        if (ret == -1) { +                gf_log ("rpc-clnt", GF_LOG_CRITICAL, "cannot lookup the saved " +                        "frame corresponding to xid (%d)", info->xid); +                goto out; +        } + +        info->prognum = saved_frame.prog->prognum; +        info->procnum = saved_frame.procnum; +        info->progver = saved_frame.prog->progver; +        info->rsp     = saved_frame.rsp; + +        ret = 0; +out: +        return ret; +} + + +/* + * client_protocol_cleanup - cleanup function + * @trans: transport object + * + */ +int +rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn) +{ +        struct saved_frames    *saved_frames = NULL; +        struct rpc_clnt         *clnt  = NULL; + +        if (!conn) { +                goto out; +        } + +        clnt = conn->rpc_clnt; + +        gf_log ("rpc-clnt", GF_LOG_DEBUG, +                "cleaning up state in transport object %p", conn->trans); + +        pthread_mutex_lock (&conn->lock); +        { +                saved_frames = conn->saved_frames; +                conn->saved_frames = saved_frames_new (); + +                /* bailout logic cleanup */ +                if (conn->timer) { +                        gf_timer_call_cancel (clnt->ctx, conn->timer); +                        conn->timer = NULL; +                } + +                if (conn->reconnect == NULL) { +                        /* :O This part is empty.. any thing missing? */ +                } + +                conn->connected = 0; +        } +        pthread_mutex_unlock (&conn->lock); + +        saved_frames_destroy (saved_frames); + +out: +        return 0; +} + +/* + * lookup_frame - lookup call frame corresponding to a given callid + * @trans: transport object + * @callid: call id of the frame + * + * not for external reference + */ + +static struct saved_frame * +lookup_frame (rpc_clnt_connection_t *conn, int64_t callid) +{ +        struct saved_frame *frame = NULL; + +        pthread_mutex_lock (&conn->lock); +        { +                frame = __saved_frame_get (conn->saved_frames, callid); +        } +        pthread_mutex_unlock (&conn->lock); + +        return frame; +} + + +int +rpc_clnt_reply_fill (rpc_transport_pollin_t *msg, +                     rpc_clnt_connection_t *conn, +                     struct rpc_msg *replymsg, struct iovec progmsg, +                     struct rpc_req *req, struct saved_frame *saved_frame) +{ +        int           ret   = -1; + +        if ((!conn) || (!replymsg)|| (!req) || (!saved_frame) || (!msg)) { +                goto out; +        } + +        req->rpc_status = 0; +        if ((rpc_reply_status (replymsg) == MSG_DENIED) +            || (rpc_accepted_reply_status (replymsg) != SUCCESS)) { +                req->rpc_status = -1; +        } + +        req->xid = rpc_reply_xid (replymsg); +        req->prog = saved_frame->prog; +        req->procnum = saved_frame->procnum; +        req->conn = conn; + +        req->rsp[0] = progmsg; + +        if (msg->vectored) { +                req->rsp[1].iov_base = iobuf_ptr (msg->data.vector.iobuf2); +                req->rsp[1].iov_len = msg->data.vector.size2; + +                req->rspcnt = 2; + +                req->rsp_prochdr = iobuf_ref (msg->data.vector.iobuf1); +                req->rsp_procpayload = iobuf_ref (msg->data.vector.iobuf2); +        } else { +                req->rspcnt = 1; + +                req->rsp_prochdr = iobuf_ref (msg->data.simple.iobuf); +        } + +        /* By this time, the data bytes for the auth scheme would have already +         * been copied into the required sections of the req structure, +         * we just need to fill in the meta-data about it now. +         */ +        if (req->rpc_status == 0) { +                /* +                 * req->verf.flavour = rpc_reply_verf_flavour (replymsg); +                 * req->verf.datalen = rpc_reply_verf_len (replymsg); +                 */ +        } + +        ret = 0; + +out: +        return ret; +} + + +void +rpc_clnt_reply_deinit (struct rpc_req *req) +{ +        if (!req) { +                goto out; +        } + +        if (req->rsp_prochdr) { +                iobuf_unref (req->rsp_prochdr); +        } + +        if (req->rsp_procpayload) { +                iobuf_unref (req->rsp_procpayload); +        } + +out: +        return; +} + + +/* TODO: use mem-pool for allocating requests */ +int +rpc_clnt_reply_init (rpc_clnt_connection_t *conn, rpc_transport_pollin_t *msg, +                     struct rpc_req *req, struct saved_frame *saved_frame) +{ +        char                    *msgbuf = NULL; +        struct rpc_msg          rpcmsg; +        struct iovec            progmsg;        /* RPC Program payload */ +        size_t                  msglen  = 0; +        int                     ret     = -1; + +        if (msg->vectored) { +                msgbuf = iobuf_ptr (msg->data.vector.iobuf1); +                msglen = msg->data.vector.size1; +        } else { +                msgbuf = iobuf_ptr (msg->data.simple.iobuf); +                msglen = msg->data.simple.size; +        } + +        ret = xdr_to_rpc_reply (msgbuf, msglen, &rpcmsg, &progmsg, +                                req->verf.authdata); +        if (ret != 0) { +                gf_log ("rpc-clnt", GF_LOG_ERROR, "RPC reply decoding failed"); +                goto out; +        } + +        ret = rpc_clnt_reply_fill (msg, conn, &rpcmsg, progmsg, req, +                                   saved_frame); +        if (ret != 0) { +                goto out; +        } + +        gf_log ("rpc-clnt", GF_LOG_TRACE, "RPC XID: %"PRIx64", Program: %s," +                " ProgVers: %d, Proc: %d", saved_frame->callid, +                saved_frame->prog->progname, saved_frame->prog->progver, +                saved_frame->procnum); +/* TODO: */ +        /* TODO: AUTH */ +        /* The verifier that is sent in a reply is a string that can be used as +         * a shorthand in credentials for future transactions. We can opt not to +         * use this shorthand, preffering to use the original AUTH_UNIX method +         * for authentication (containing all the details for authentication in +         * credential itself). Hence it is not mandatory for us to be checking +         * the verifier. See Appendix A of rfc-5531 for more details. +         */ + +        /* +         * ret = rpc_authenticate (req); +         * if (ret == RPC_AUTH_REJECT) { +         * gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed authentication"); +         * ret = -1; +         * goto out; +         * } +         */ + +        /* If the error is not RPC_MISMATCH, we consider the call as accepted +         * since we are not handling authentication failures for now. +         */ +        req->rpc_status = 0; + +out: +        if (ret != 0) { +                req->rpc_status = -1; +        } + +        return ret; +} + + +int +rpc_clnt_handle_reply (struct rpc_clnt *clnt, rpc_transport_pollin_t *pollin) +{ +        rpc_clnt_connection_t *conn         = NULL; +        struct saved_frame    *saved_frame  = NULL; +        rpc_request_info_t    *request_info = NULL; +        int                    ret          = -1; +        struct rpc_req         req          = {0, }; + +        conn = &clnt->conn; + +        request_info = pollin->private; + +        saved_frame = lookup_frame (conn, (int64_t)request_info->xid); +        if (saved_frame == NULL) { +                gf_log ("rpc-clnt", GF_LOG_CRITICAL, "cannot lookup the " +                        "saved frame for reply with xid (%d), " +                        "prog-version (%d), prog-num (%d)," +                        "procnum (%d)", request_info->xid, +                        request_info->progver, request_info->prognum, +                        request_info->procnum); +                goto out; +        } + +        ret = rpc_clnt_reply_init (conn, pollin, &req, saved_frame); +        if (ret != 0) { +                req.rpc_status = -1; +                gf_log ("rpc-clnt", GF_LOG_DEBUG, "initialising rpc reply " +                        "failed"); +        } + +        saved_frame->cbkfn (&req, req.rsp, req.rspcnt, saved_frame->frame); + +        if (ret == 0) { +                rpc_clnt_reply_deinit (&req); +        } + +        ret = 0; +out: + +        if (saved_frame) { +                GF_FREE (saved_frame); +        } + +        return ret; +} + + +inline void +rpc_clnt_set_connected (rpc_clnt_connection_t *conn) +{ +        if (!conn) { +                goto out; +        } + +        pthread_mutex_lock (&conn->lock); +        { +                conn->connected = 1; +        } +        pthread_mutex_unlock (&conn->lock); + +out: +        return; +} + + +void +rpc_clnt_unset_connected (rpc_clnt_connection_t *conn) +{ +        if (!conn) { +                goto out; +        } + +        pthread_mutex_lock (&conn->lock); +        { +                conn->connected = 0; +        } +        pthread_mutex_unlock (&conn->lock); + +out: +        return; +} + + +int +rpc_clnt_notify (rpc_transport_t *trans, void *mydata, +                 rpc_transport_event_t event, void *data, ...) +{ +        rpc_clnt_connection_t  *conn     = NULL; +        struct rpc_clnt        *clnt     = NULL; +        int                     ret      = -1; +        rpc_request_info_t     *req_info = NULL; +        rpc_transport_pollin_t *pollin   = NULL; +        struct timeval          tv       = {0, }; + +        conn = mydata; +        if (conn == NULL) { +                goto out; +        } +        clnt = conn->rpc_clnt; + +        switch (event) { +        case RPC_TRANSPORT_DISCONNECT: +        { +                rpc_clnt_connection_cleanup (&clnt->conn); + +                pthread_mutex_lock (&conn->lock); +                { +                        if (conn->reconnect == NULL) { +                                tv.tv_sec = 10; + +                                conn->reconnect = +                                        gf_timer_call_after (clnt->ctx, tv, +                                                             rpc_clnt_reconnect, +                                                             conn->trans); +                        } +                } +                pthread_mutex_unlock (&conn->lock); + +                ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_DISCONNECT, +                                      NULL); +                break; +        } + +        case RPC_TRANSPORT_CLEANUP: +                /* this event should not be received on a client for, a +                 * transport is only disconnected, but never destroyed. +                 */ +                ret = 0; +                break; + +        case RPC_TRANSPORT_MAP_XID_REQUEST: +        { +                req_info = data; +                ret = rpc_clnt_fill_request_info (clnt, req_info); +                break; +        } + +        case RPC_TRANSPORT_MSG_RECEIVED: +        { +                pollin = data; +                ret = rpc_clnt_handle_reply (clnt, pollin); +                /* ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_MSG, +                 * data); +                 */ +                break; +        } + +        case RPC_TRANSPORT_MSG_SENT: +        { +                pthread_mutex_lock (&conn->lock); +                { +                        gettimeofday (&conn->last_sent, NULL); +                } +                pthread_mutex_unlock (&conn->lock); + +                ret = 0; +                break; +        } + +        case RPC_TRANSPORT_CONNECT: +        { +                ret = clnt->notifyfn (clnt, clnt->mydata, RPC_CLNT_CONNECT, NULL); +                break; +        } + +        case RPC_TRANSPORT_ACCEPT: +                /* only meaningful on a server, no need of handling this event +                 * in a client. +                 */ +                ret = 0; +                break; +        } + +out: +        return ret; +} + + +void +rpc_clnt_connection_deinit (rpc_clnt_connection_t *conn) +{ +        return; +} + + +inline int +rpc_clnt_connection_init (struct rpc_clnt *clnt, glusterfs_ctx_t *ctx, +                          dict_t *options, char *name) +{ +        int                    ret  = -1; +        rpc_clnt_connection_t *conn = NULL; + +        conn = &clnt->conn; +        pthread_mutex_init (&clnt->conn.lock, NULL); + +        ret = dict_get_int32 (options, "frame-timeout", +                              &conn->frame_timeout); +        if (ret >= 0) { +                gf_log (name, GF_LOG_DEBUG, +                        "setting frame-timeout to %d", conn->frame_timeout); +        } else { +                gf_log (name, GF_LOG_DEBUG, +                        "defaulting frame-timeout to 30mins"); +                conn->frame_timeout = 1800; +        } + +        conn->trans = rpc_transport_load (ctx, options, name); +        if (!conn->trans) { +                gf_log ("rpc-clnt", GF_LOG_DEBUG, "loading of new rpc-transport" +                        " failed"); +                goto out; +        } + +        rpc_transport_ref (conn->trans); + +        conn->rpc_clnt = clnt; + +        ret = rpc_transport_register_notify (conn->trans, rpc_clnt_notify, +                                             conn); +        if (ret == -1) { +                gf_log ("rpc-clnt", GF_LOG_DEBUG, "registering notify failed"); +                rpc_clnt_connection_cleanup (conn); +                conn = NULL; +                goto out; +        } + +        conn->saved_frames = saved_frames_new (); +        if (!conn->saved_frames) { +                gf_log ("rpc-clnt", GF_LOG_DEBUG, "creation of saved_frames " +                        "failed"); +                rpc_clnt_connection_cleanup (conn); +                goto out; +        } + +        rpc_clnt_reconnect (conn->trans); + +        ret = 0; + +out: +        return ret; +} + + +struct rpc_clnt * +rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options, +               glusterfs_ctx_t *ctx, char *name) +{ +        int                    ret  = -1; +        struct rpc_clnt       *rpc  = NULL; + +        rpc = GF_CALLOC (1, sizeof (*rpc), gf_common_mt_rpcclnt_t); +        if (!rpc) { +                gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory"); +                goto out; +        } + +        pthread_mutex_init (&rpc->lock, NULL); +        rpc->ctx = ctx; + +        ret = rpc_clnt_connection_init (rpc, ctx, options, name); +        if (ret == -1) { +                pthread_mutex_destroy (&rpc->lock); +                GF_FREE (rpc); +                rpc = NULL; +                goto out; +        } +out: +        return rpc; +} + + +int +rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn, +                          void *mydata) +{ +        rpc->mydata = mydata; +        rpc->notifyfn = fn; + +        return 0; +} + +ssize_t +xdr_serialize_glusterfs_auth (char *dest, struct auth_glusterfs_parms *au) +{ +        ssize_t ret = -1; +        XDR     xdr; + +        if ((!dest) || (!au)) +                return -1; + +        xdrmem_create (&xdr, dest, 1024, +                       XDR_ENCODE); + +        if (!xdr_auth_glusterfs_parms (&xdr, au)) { +                ret = -1; +                goto ret; +        } + +        ret = (((size_t)(&xdr)->x_private) - ((size_t)(&xdr)->x_base)); + +ret: +        return ret; +} + + +int +rpc_clnt_fill_request (int prognum, int progver, int procnum, int payload, +                       uint64_t xid, struct auth_glusterfs_parms *au, +                       struct rpc_msg *request) +{ +        int   ret          = -1; +        char  dest[1024]   = {0,}; + +        if (!request) { +                goto out; +        } + +        memset (request, 0, sizeof (*request)); + +        request->rm_xid = xid; +        request->rm_direction = CALL; + +        request->rm_call.cb_rpcvers = 2; +        request->rm_call.cb_prog = prognum; +        request->rm_call.cb_vers = progver; +        request->rm_call.cb_proc = procnum; + +        /* TODO: Using AUTH_GLUSTERFS for time-being. Make it modular in +         * future so it is easy to plug-in new authentication schemes. +         */ +        ret = xdr_serialize_glusterfs_auth (dest, au); +        if (ret == -1) { +                gf_log ("rpc-clnt", GF_LOG_DEBUG, "cannot encode credentials"); +                goto out; +        } + +        request->rm_call.cb_cred.oa_flavor = AUTH_GLUSTERFS; +        request->rm_call.cb_cred.oa_base   = dest; +        request->rm_call.cb_cred.oa_length = ret; + +        request->rm_call.cb_verf.oa_flavor = AUTH_NONE; +        request->rm_call.cb_verf.oa_base = NULL; +        request->rm_call.cb_verf.oa_length = 0; + +        ret = 0; +out: +        return ret; +} + + +void +rpc_clnt_set_lastfrag (uint32_t *fragsize) { +        (*fragsize) |= 0x80000000U; +} + + +void +rpc_clnt_set_frag_header_size (uint32_t size, char *haddr) +{ +        size = htonl (size); +        memcpy (haddr, &size, sizeof (size)); +} + + +void +rpc_clnt_set_last_frag_header_size (uint32_t size, char *haddr) +{ +        rpc_clnt_set_lastfrag (&size); +        rpc_clnt_set_frag_header_size (size, haddr); +} + + +struct iovec +rpc_clnt_record_build_header (char *recordstart, size_t rlen, +                              struct rpc_msg *request, size_t payload) +{ +        struct iovec    requesthdr = {0, }; +        struct iovec    txrecord   = {0, 0}; +        size_t          fraglen    = 0; +        int             ret        = -1; + +        /* After leaving aside the 4 bytes for the fragment header, lets +         * encode the RPC reply structure into the buffer given to us. +         */ +        ret = rpc_request_to_xdr (request, (recordstart + RPC_FRAGHDR_SIZE), +                                  rlen, &requesthdr); +        if (ret == -1) { +                gf_log ("rpc-clnt", GF_LOG_DEBUG, +                        "Failed to create RPC request"); +                goto out; +        } + +        fraglen = payload + requesthdr.iov_len; +        gf_log ("rpc-clnt", GF_LOG_TRACE, "Request fraglen %zu, payload: %zu, " +                "rpc hdr: %zu", fraglen, payload, requesthdr.iov_len); + +        /* Since we're not spreading RPC records over mutiple fragments +         * we just set this fragment as the first and last fragment for this +         * record. +         */ +        rpc_clnt_set_last_frag_header_size (fraglen, recordstart); + +        /* Even though the RPC record starts at recordstart+RPCSVC_FRAGHDR_SIZE +         * we need to transmit the record with the fragment header, which starts +         * at recordstart. +         */ +        txrecord.iov_base = recordstart; + +        /* Remember, this is only the vec for the RPC header and does not +         * include the payload above. We needed the payload only to calculate +         * the size of the full fragment. This size is sent in the fragment +         * header. +         */ +        txrecord.iov_len = RPC_FRAGHDR_SIZE + requesthdr.iov_len; + +out: +        return txrecord; +} + + +struct iobuf * +rpc_clnt_record_build_record (struct rpc_clnt *clnt, int prognum, int progver, +                              int procnum, size_t payload, uint64_t xid, +                              struct auth_glusterfs_parms *au, struct iovec *recbuf) +{ +        struct rpc_msg           request     = {0, }; +        struct iobuf            *request_iob = NULL; +        char                    *record      = NULL; +        struct iovec             recordhdr   = {0, }; +        size_t                   pagesize    = 0; +        int                      ret         = -1; + +        if ((!clnt) || (!recbuf) || (!au)) { +                goto out; +        } + +        /* First, try to get a pointer into the buffer which the RPC +         * layer can use. +         */ +        request_iob = iobuf_get (clnt->ctx->iobuf_pool); +        if (!request_iob) { +                gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed to get iobuf"); +                goto out; +        } + +        pagesize = ((struct iobuf_pool *)clnt->ctx->iobuf_pool)->page_size; + +        record = iobuf_ptr (request_iob);  /* Now we have it. */ + +        /* Fill the rpc structure and XDR it into the buffer got above. */ +        ret = rpc_clnt_fill_request (prognum, progver, procnum, payload, xid, +                                     au, &request); +        if (ret == -1) { +                gf_log ("rpc-clnt", GF_LOG_DEBUG, "cannot build a rpc-request " +                        "xid (%"PRIu64")", xid); +                goto out; +        } + +        recordhdr = rpc_clnt_record_build_header (record, pagesize, &request, +                                                  payload); + +        //GF_FREE (request.rm_call.cb_cred.oa_base); + +        if (!recordhdr.iov_base) { +                gf_log ("rpc-clnt", GF_LOG_ERROR, "Failed to build record " +                        " header"); +                iobuf_unref (request_iob); +                request_iob = NULL; +                recbuf->iov_base = NULL; +                goto out; +        } + +        recbuf->iov_base = recordhdr.iov_base; +        recbuf->iov_len = recordhdr.iov_len; + +out: +        return request_iob; +} + + +struct iobuf * +rpc_clnt_record (struct rpc_clnt *clnt, call_frame_t *call_frame, +                 rpc_clnt_prog_t *prog,int procnum, size_t payload_len, +                 struct iovec *rpchdr, uint64_t callid) +{ +        struct auth_glusterfs_parms  au                    = {0, }; +        struct iobuf                *request_iob           = NULL; + +        if (!prog || !rpchdr || !call_frame) { +                goto out; +        } + +        au.pid      = call_frame->root->pid; +        au.uid      = call_frame->root->uid; +        au.gid      = call_frame->root->gid; +        au.ngrps    = call_frame->root->ngrps; +        au.lk_owner = call_frame->root->lk_owner; +        if (!au.lk_owner) +                au.lk_owner = au.pid; + +        gf_log ("", GF_LOG_TRACE, "Auth Info: pid: %u, uid: %d" +                ", gid: %d, owner: %"PRId64, +                au.pid, au.uid, au.gid, au.lk_owner); + +        memcpy (au.groups, call_frame->root->groups, 16); + +        //rpc_transport_get_myname (clnt->conn.trans, myname, UNIX_PATH_MAX); +        //au.aup_machname = myname; + +        /* Assuming the client program would like to speak to the same versioned +         * program on server. +         */ +        request_iob = rpc_clnt_record_build_record (clnt, prog->prognum, +                                                    prog->progver, +                                                    procnum, payload_len, +                                                    callid, &au, +                                                    rpchdr); +        if (!request_iob) { +                gf_log ("rpc-clnt", GF_LOG_DEBUG, "cannot build rpc-record"); +                goto out; +        } + +out: +        return request_iob; +} + + +int +rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, +                 int procnum, fop_cbk_fn_t cbkfn, +                 struct iovec *proghdr, int proghdrcount, +                 struct iovec *progpayload, int progpayloadcount, +                 struct iobref *iobref, void *frame) +{ +        rpc_clnt_connection_t *conn        = NULL; +        struct iobuf          *request_iob = NULL; +        struct iovec           rpchdr      = {0,}; +        struct rpc_req         rpcreq      = {0,}; +        rpc_transport_req_t    req; +        int                    ret         = -1; +        int                    proglen     = 0; +        char                   new_iobref  = 0; +        uint64_t               callid      = 0; + +        if (!rpc || !prog || !frame) { +                goto out; +        } + +        memset (&req, 0, sizeof (req)); + +        if (!iobref) { +                iobref = iobref_new (); +                if (!iobref) { +                        gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory"); +                        goto out; +                } + +                new_iobref = 1; +        } + +        callid = rpc_clnt_new_callid (rpc); + +        conn = &rpc->conn; + +        pthread_mutex_lock (&conn->lock); +        { +                if (conn->connected == 0) { +                        rpc_transport_connect (conn->trans); +                } + +                ret = -1; + +                if (conn->connected || +                    /* FIXME: hack!! hack!! find a neater way to do this */ +                    ((prog->prognum == GLUSTER_HNDSK_PROGRAM) && +                     ((procnum == GF_HNDSK_SETVOLUME) || +                      (procnum == GF_HNDSK_DUMP_VERSION)))) { +                        if (proghdr) { +                                proglen += iov_length (proghdr, proghdrcount); +                        } + +                        if (progpayload) { +                                proglen += iov_length (progpayload, +                                                       progpayloadcount); +                        } + +                        request_iob = rpc_clnt_record (rpc, frame, prog, +                                                       procnum, proglen, +                                                       &rpchdr, callid); +                        if (!request_iob) { +                                gf_log ("rpc-clnt", GF_LOG_DEBUG, +                                        "cannot build rpc-record"); +                                goto unlock; +                        } + +                        iobref_add (iobref, request_iob); + +                        req.msg.rpchdr = &rpchdr; +                        req.msg.rpchdrcount = 1; +                        req.msg.proghdr = proghdr; +                        req.msg.proghdrcount = proghdrcount; +                        req.msg.progpayload = progpayload; +                        req.msg.progpayloadcount = progpayloadcount; +                        req.msg.iobref = iobref; + +                        ret = rpc_transport_submit_request (rpc->conn.trans, +                                                            &req); +                        if (ret == -1) { +                                gf_log ("rpc-clnt", GF_LOG_DEBUG, +                                        "transmission of rpc-request failed"); +                        } +                } + +                if ((ret >= 0) && frame) { +                        gettimeofday (&conn->last_sent, NULL); +                        /* Save the frame in queue */ +                        __save_frame (rpc, frame, procnum, prog, cbkfn, callid); +                } + +        } +unlock: +        pthread_mutex_unlock (&conn->lock); + +        if (ret == -1) { +                goto out; +        } + +        ret = 0; + +out: +        iobuf_unref (request_iob); + +        if (new_iobref && iobref) { +                iobref_unref (iobref); +        } + +        if (frame && (ret == -1)) { +                rpcreq.rpc_status = -1; +                cbkfn (&rpcreq, NULL, 0, frame); +        } +        return ret; +} + + +void +rpc_clnt_destroy (struct rpc_clnt *rpc) +{ +        rpc_clnt_connection_cleanup (&rpc->conn); +        pthread_mutex_destroy (&rpc->lock); +        pthread_mutex_destroy (&rpc->conn.lock); +        GF_FREE (rpc); +        return; +} diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h new file mode 100644 index 00000000000..efc256cd261 --- /dev/null +++ b/rpc/rpc-lib/src/rpc-clnt.h @@ -0,0 +1,166 @@ +/* +  Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. + +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + +#ifndef _RPC_CLNT_H +#define _RPC_CLNT_H + +#include "stack.h" +#include "rpc-transport.h" +#include "timer.h" +#include "xdr-common.h" + +typedef enum { +        RPC_CLNT_CONNECT, +        RPC_CLNT_DISCONNECT, +        RPC_CLNT_MSG +} rpc_clnt_event_t; + +#define AUTH_GLUSTERFS  5 + +struct xptr_clnt; +struct rpc_req; +struct rpc_clnt; +struct rpc_clnt_config; +struct rpc_clnt_program; + +typedef int (*rpc_clnt_notify_t) (struct rpc_clnt *rpc, void *mydata, +                                  rpc_clnt_event_t fn, void *data); + +typedef int (*fop_cbk_fn_t) (struct rpc_req *req, struct iovec *iov, int count, +                             void *myframe); + +typedef int (*clnt_fn_t) (call_frame_t *fr, xlator_t *xl, void *args); + +struct saved_frame { +	union { +		struct list_head list; +		struct { +			struct saved_frame *frame_next; +			struct saved_frame *frame_prev; +		}; +	}; +        void                    *capital_this; +	void                    *frame; +	struct timeval           saved_at; +	int32_t                  procnum; +        struct rpc_clnt_program *prog; +        fop_cbk_fn_t             cbkfn; +	uint64_t                 callid; +        rpc_transport_rsp_t      rsp; +}; + +struct saved_frames { +	int64_t            count; +	struct saved_frame sf; +}; + + +/* Initialized by procnum */ +typedef struct rpc_clnt_procedure { +        char         *procname; +        clnt_fn_t     fn; +} rpc_clnt_procedure_t; + +typedef struct rpc_clnt_program { +        char                 *progname; +        int                   prognum; +        int                   progver; +        rpc_clnt_procedure_t *proctable; +        char                **procnames; +        int                   numproc; +} rpc_clnt_prog_t; + +#define RPC_MAX_AUTH_BYTES   400 +typedef struct rpc_auth_data { +        int             flavour; +        int             datalen; +        char            authdata[RPC_MAX_AUTH_BYTES]; +} rpc_auth_data_t; + +#define rpc_auth_flavour(au)    ((au).flavour) + +struct rpc_clnt_connection { +        pthread_mutex_t        lock; +        rpc_transport_t       *trans; +        gf_timer_t            *reconnect; +        gf_timer_t            *timer; +        gf_timer_t            *ping_timer; +        struct rpc_clnt       *rpc_clnt; +        char                   connected; +        struct saved_frames   *saved_frames; +        int32_t                frame_timeout; +	struct timeval         last_sent; +	struct timeval         last_received; +	int32_t                ping_started; +}; +typedef struct rpc_clnt_connection rpc_clnt_connection_t; + +struct rpc_req { +        rpc_clnt_connection_t *conn; +        uint32_t               xid; +        struct iovec           req[2]; +        int                    reqcnt; +        struct iovec           rsp[2]; +        int                    rspcnt; +        struct iobuf          *rsp_prochdr; +        struct iobuf          *rsp_procpayload; +        int                    rpc_status; +        rpc_auth_data_t        verf; +        rpc_clnt_prog_t       *prog; +        int                    procnum; +}; + +struct rpc_clnt { +        pthread_mutex_t        lock; +        rpc_clnt_notify_t      notifyfn; +        rpc_clnt_connection_t  conn; +        void                  *mydata; +        uint64_t               xid; +        glusterfs_ctx_t       *ctx; +}; + +struct rpc_clnt_config { +        int    rpc_timeout; +        int    remote_port; +        char * remote_host; +}; + + +struct rpc_clnt * rpc_clnt_init (struct rpc_clnt_config *config, +                                 dict_t *options, glusterfs_ctx_t *ctx, +                                 char *name); + +int rpc_clnt_register_notify (struct rpc_clnt *rpc, rpc_clnt_notify_t fn, +                              void *mydata); + +int rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, +                     int procnum, fop_cbk_fn_t cbkfn, +                     struct iovec *proghdr, int proghdrcount, +                     struct iovec *progpayload, int progpayloadcount, +                     struct iobref *iobref, void *frame); + +void rpc_clnt_destroy (struct rpc_clnt *rpc); + +void rpc_clnt_set_connected (rpc_clnt_connection_t *conn); + +void rpc_clnt_unset_connected (rpc_clnt_connection_t *conn); + +void rpc_clnt_reconnect (void *trans_ptr); + +#endif /* !_RPC_CLNT_H */ diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c new file mode 100644 index 00000000000..82ea9a74bfd --- /dev/null +++ b/rpc/rpc-lib/src/rpc-transport.c @@ -0,0 +1,1301 @@ +/* +  Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. + +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + +#include <dlfcn.h> +#include <stdlib.h> +#include <stdio.h> +#include <sys/poll.h> +#include <fnmatch.h> +#include <stdint.h> + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "logging.h" +#include "rpc-transport.h" +#include "glusterfs.h" +/* FIXME: xlator.h is needed for volume_option_t, need to define the datatype + * in some other header + */ +#include "xlator.h" +#include "list.h" + +#ifndef GF_OPTION_LIST_EMPTY +#define GF_OPTION_LIST_EMPTY(_opt) (_opt->value[0] == NULL) +#endif + +/* RFC 1123 & 952 */ +static char +valid_host_name (char *address, int length) +{ +        int i = 0; +        char ret = 1; + +        if ((length > 75) || (length == 1)) { +                ret = 0; +                goto out; +        } + +        if (!isalnum (address[length - 1])) { +                ret = 0; +                goto out; +        } + +        for (i = 0; i < length; i++) { +                if (!isalnum (address[i]) && (address[i] != '.') +                    && (address[i] != '-')) { +                        ret = 0; +                        goto out; +                } +        } + +out: +        return ret; +} + +static char +valid_ipv4_address (char *address, int length) +{ +        int octets = 0; +        int value = 0; +        char *tmp = NULL, *ptr = NULL, *prev = NULL, *endptr = NULL; +        char ret = 1; + +        prev = tmp = gf_strdup (address); +        prev = strtok_r (tmp, ".", &ptr); + +        while (prev != NULL) +        { +                octets++; +                value = strtol (prev, &endptr, 10); +                if ((value > 255) || (value < 0) || (endptr != NULL)) { +                        ret = 0; +                        goto out; +                } + +                prev = strtok_r (NULL, ".", &ptr); +        } + +        if (octets != 4) { +                ret = 0; +        } + +out: +        GF_FREE (tmp); +        return ret; +} + + +static char +valid_ipv6_address (char *address, int length) +{ +        int hex_numbers = 0; +        int value = 0; +        char *tmp = NULL, *ptr = NULL, *prev = NULL, *endptr = NULL; +        char ret = 1; + +        tmp = gf_strdup (address); +        prev = strtok_r (tmp, ":", &ptr); + +        while (prev != NULL) +        { +                hex_numbers++; +                value = strtol (prev, &endptr, 16); +                if ((value > 0xffff) || (value < 0) +                    || (endptr != NULL && *endptr != '\0')) { +                        ret = 0; +                        goto out; +                } + +                prev = strtok_r (NULL, ":", &ptr); +        } + +        if (hex_numbers > 8) { +                ret = 0; +        } + +out: +        GF_FREE (tmp); +        return ret; +} + + +static char +valid_internet_address (char *address) +{ +        char ret = 0; +        int length = 0; + +        if (address == NULL) { +                goto out; +        } + +        length = strlen (address); +        if (length == 0) { +                goto out; +        } + +        if (valid_ipv4_address (address, length) +            || valid_ipv6_address (address, length) +            || valid_host_name (address, length)) { +                ret = 1; +        } + +out: +        return ret; +} + + +int +__volume_option_value_validate (char *name, +                                data_pair_t *pair, +                                volume_option_t *opt) +{ +	int       i = 0; +	int       ret = -1; +	uint64_t  input_size = 0; +	long long inputll = 0; + +	/* Key is valid, validate the option */ +	switch (opt->type) { +        case GF_OPTION_TYPE_XLATOR: +                break; + +	case GF_OPTION_TYPE_PATH: +	{ +                if (strstr (pair->value->data, "../")) { +                        gf_log (name, GF_LOG_ERROR, +                                "invalid path given '%s'", +                                pair->value->data); +                        ret = -1; +                        goto out; +                } + +                /* Make sure the given path is valid */ +		if (pair->value->data[0] != '/') { +			gf_log (name, GF_LOG_WARNING, +				"option %s %s: '%s' is not an " +				"absolute path name", +				pair->key, pair->value->data, +				pair->value->data); +		} +		ret = 0; +	} +	break; +	case GF_OPTION_TYPE_INT: +	{ +		/* Check the range */ +		if (gf_string2longlong (pair->value->data, +					&inputll) != 0) { +			gf_log (name, GF_LOG_ERROR, +				"invalid number format \"%s\" in " +				"\"option %s\"", +				pair->value->data, pair->key); +			goto out; +		} + +		if ((opt->min == 0) && (opt->max == 0)) { +			gf_log (name, GF_LOG_DEBUG, +				"no range check required for " +				"'option %s %s'", +				pair->key, pair->value->data); +			ret = 0; +			break; +		} +		if ((inputll < opt->min) || +		    (inputll > opt->max)) { +			gf_log (name, GF_LOG_WARNING, +				"'%lld' in 'option %s %s' is out of " +				"range [%"PRId64" - %"PRId64"]", +				inputll, pair->key, +				pair->value->data, +				opt->min, opt->max); +		} +		ret = 0; +	} +	break; +	case GF_OPTION_TYPE_SIZET: +	{ +		/* Check the range */ +		if (gf_string2bytesize (pair->value->data, +					&input_size) != 0) { +			gf_log (name, GF_LOG_ERROR, +				"invalid size format \"%s\" in " +				"\"option %s\"", +				pair->value->data, pair->key); +			goto out; +		} + +		if ((opt->min == 0) && (opt->max == 0)) { +			gf_log (name, GF_LOG_DEBUG, +				"no range check required for " +				"'option %s %s'", +				pair->key, pair->value->data); +			ret = 0; +			break; +		} +		if ((input_size < opt->min) || +		    (input_size > opt->max)) { +			gf_log (name, GF_LOG_ERROR, +				"'%"PRId64"' in 'option %s %s' is " +				"out of range [%"PRId64" - %"PRId64"]", +				input_size, pair->key, +				pair->value->data, +				opt->min, opt->max); +		} +		ret = 0; +	} +	break; +	case GF_OPTION_TYPE_BOOL: +	{ +		/* Check if the value is one of +		   '0|1|on|off|no|yes|true|false|enable|disable' */ +		gf_boolean_t bool_value; +		if (gf_string2boolean (pair->value->data, +				       &bool_value) != 0) { +			gf_log (name, GF_LOG_ERROR, +				"option %s %s: '%s' is not a valid " +				"boolean value", +				pair->key, pair->value->data, +				pair->value->data); +			goto out; +		} +		ret = 0; +	} +	break; +	case GF_OPTION_TYPE_STR: +	{ +		/* Check if the '*str' is valid */ +                if (GF_OPTION_LIST_EMPTY(opt)) { +                        ret = 0; +                        goto out; +                } + +		for (i = 0; (i < ZR_OPTION_MAX_ARRAY_SIZE) && +			     opt->value[i]; i++) { +			if (strcasecmp (opt->value[i], +					pair->value->data) == 0) { +				ret = 0; +				break; +			} +		} + +		if ((i == ZR_OPTION_MAX_ARRAY_SIZE) +		    || ((i < ZR_OPTION_MAX_ARRAY_SIZE) +			&& (!opt->value[i]))) { +			/* enter here only if +			 * 1. reached end of opt->value array and haven't +                         *    validated input +			 *                      OR +			 * 2. valid input list is less than +                         *    ZR_OPTION_MAX_ARRAY_SIZE and input has not +                         *    matched all possible input values. +			 */ +			char given_array[4096] = {0,}; +			for (i = 0; (i < ZR_OPTION_MAX_ARRAY_SIZE) && +				     opt->value[i]; i++) { +				strcat (given_array, opt->value[i]); +				strcat (given_array, ", "); +			} + +			gf_log (name, GF_LOG_ERROR, +				"option %s %s: '%s' is not valid " +				"(possible options are %s)", +				pair->key, pair->value->data, +				pair->value->data, given_array); + +			goto out; +		} +	} +	break; +	case GF_OPTION_TYPE_PERCENT: +	{ +		uint32_t percent = 0; + + +		/* Check if the value is valid percentage */ +		if (gf_string2percent (pair->value->data, +				       &percent) != 0) { +			gf_log (name, GF_LOG_ERROR, +				"invalid percent format \"%s\" " +				"in \"option %s\"", +				pair->value->data, pair->key); +			goto out; +		} + +		if ((percent < 0) || (percent > 100)) { +			gf_log (name, GF_LOG_ERROR, +				"'%d' in 'option %s %s' is out of " +				"range [0 - 100]", +				percent, pair->key, +				pair->value->data); +		} +		ret = 0; +	} +	break; +	case GF_OPTION_TYPE_PERCENT_OR_SIZET: +	{ +		uint32_t percent = 0; +		uint64_t input_size = 0; + +		/* Check if the value is valid percentage */ +		if (gf_string2percent (pair->value->data, +				       &percent) == 0) { +			if (percent > 100) { +				gf_log (name, GF_LOG_DEBUG, +					"value given was greater than 100, " +					"assuming this is actually a size"); +		        if (gf_string2bytesize (pair->value->data, +				                &input_size) == 0) { +				        /* Check the range */ +				if ((opt->min == 0) && +                                            (opt->max == 0)) { +				        gf_log (name, GF_LOG_DEBUG, +				        "no range check " +                                                        "required for " +					"'option %s %s'", +				pair->key, +                                                        pair->value->data); +						// It is a size +			                        ret = 0; +				                goto out; +				} +			if ((input_size < opt->min) || +				            (input_size > opt->max)) { +				        gf_log (name, GF_LOG_ERROR, +				        "'%"PRId64"' in " +                                                        "'option %s %s' is out" +					" of range [%"PRId64"" +                                                        "- %"PRId64"]", +				input_size, pair->key, +				pair->value->data, +				                opt->min, opt->max); +				} +					// It is a size +					ret = 0; +					goto out; +				} else { +					// It's not a percent or size +					gf_log (name, GF_LOG_ERROR, +					"invalid number format \"%s\" " +					"in \"option %s\"", +					pair->value->data, pair->key); +				} + +			} +			// It is a percent +			ret = 0; +			goto out; +		} else { +		        if (gf_string2bytesize (pair->value->data, +				        &input_size) == 0) { +			        /* Check the range */ +			if ((opt->min == 0) && (opt->max == 0)) { +			        gf_log (name, GF_LOG_DEBUG, +				        "no range check required for " +					"'option %s %s'", +			pair->key, pair->value->data); +					// It is a size +		                        ret = 0; +				        goto out; +			} +		        if ((input_size < opt->min) || +			            (input_size > opt->max)) { +					gf_log (name, GF_LOG_ERROR, +				        "'%"PRId64"' in 'option %s %s'" +                                                " is out of range [%"PRId64" -" +                                                " %"PRId64"]", +			input_size, pair->key, +			pair->value->data, +			                opt->min, opt->max); +				} +			} else { +				// It's not a percent or size +				gf_log (name, GF_LOG_ERROR, +					"invalid number format \"%s\" " +					"in \"option %s\"", +					pair->value->data, pair->key); +			} +			//It is a size +                        ret = 0; +		        goto out; +		} + +	} +	break; +	case GF_OPTION_TYPE_TIME: +	{ +		uint32_t input_time = 0; + +		/* Check if the value is valid percentage */ +		if (gf_string2time (pair->value->data, +				    &input_time) != 0) { +			gf_log (name, +				GF_LOG_ERROR, +				"invalid time format \"%s\" in " +				"\"option %s\"", +				pair->value->data, pair->key); +			goto out; +		} + +		if ((opt->min == 0) && (opt->max == 0)) { +			gf_log (name, GF_LOG_DEBUG, +				"no range check required for " +				"'option %s %s'", +				pair->key, pair->value->data); +			ret = 0; +			goto out; +		} +		if ((input_time < opt->min) || +		    (input_time > opt->max)) { +			gf_log (name, GF_LOG_ERROR, +				"'%"PRIu32"' in 'option %s %s' is " +				"out of range [%"PRId64" - %"PRId64"]", +				input_time, pair->key, +				pair->value->data, +				opt->min, opt->max); +		} +		ret = 0; +	} +	break; +	case GF_OPTION_TYPE_DOUBLE: +	{ +		double input_time = 0.0; + +		/* Check if the value is valid double */ +		if (gf_string2double (pair->value->data, +				      &input_time) != 0) { +			gf_log (name, +				GF_LOG_ERROR, +				"invalid time format \"%s\" in \"option %s\"", +				pair->value->data, pair->key); +			goto out; +		} + +		if (input_time < 0.0) { +			gf_log (name, +				GF_LOG_ERROR, +				"invalid time format \"%s\" in \"option %s\"", +				pair->value->data, pair->key); +			goto out; +		} + +		if ((opt->min == 0) && (opt->max == 0)) { +			gf_log (name, GF_LOG_DEBUG, +				"no range check required for 'option %s %s'", +				pair->key, pair->value->data); +			ret = 0; +			goto out; +		} +		ret = 0; +	} +	break; +        case GF_OPTION_TYPE_INTERNET_ADDRESS: +        { +                if (valid_internet_address (pair->value->data)) { +                        ret = 0; +                } +	} +        break; +	case GF_OPTION_TYPE_ANY: +		/* NO CHECK */ +		ret = 0; +		break; +	} + +out: +	return ret; +} + +/* FIXME: this procedure should be removed from transport */ +int +validate_volume_options (char *name, dict_t *options, volume_option_t *opt) +{ +	int i = 0; +	int ret = -1; +	int index = 0; +	volume_option_t *trav  = NULL; +	data_pair_t     *pairs = NULL; + +	if (!opt) { +		ret = 0; +		goto out; +	} + +	/* First search for not supported options, if any report error */ +	pairs = options->members_list; +	while (pairs) { +		ret = -1; +		for (index = 0; +		     opt[index].key && opt[index].key[0] ; index++) { +			trav = &(opt[index]); +			for (i = 0 ; +			     (i < ZR_VOLUME_MAX_NUM_KEY) && +				     trav->key[i]; i++) { +				/* Check if the key is valid */ +				if (fnmatch (trav->key[i], +					     pairs->key, FNM_NOESCAPE) == 0) { +					ret = 0; +					break; +				} +			} +			if (!ret) { +				if (i) { +					gf_log (name, GF_LOG_WARNING, +						"option '%s' is deprecated, " +						"preferred is '%s', continuing" +						" with correction", +						trav->key[i], trav->key[0]); +					/* TODO: some bytes lost */ +					pairs->key = gf_strdup (trav->key[0]); +				} +				break; +			} +		} +		if (!ret) { +			ret = __volume_option_value_validate (name, pairs, trav); +			if (-1 == ret) { +				goto out; +			} +		} + +		pairs = pairs->next; +	} + +	ret = 0; + out: +	return ret; +} + +int32_t +rpc_transport_get_myaddr (rpc_transport_t *this, char *peeraddr, int addrlen, +                          struct sockaddr *sa, size_t salen) +{ +        if (!this) +                return -1; + +        return this->ops->get_myaddr (this, peeraddr, addrlen, sa, salen); +} + +int32_t +rpc_transport_get_myname (rpc_transport_t *this, char *hostname, int hostlen) +{ +        if (!this) +                return -1; + +        return this->ops->get_myname (this, hostname, hostlen); +} + +int32_t +rpc_transport_get_peername (rpc_transport_t *this, char *hostname, int hostlen) +{ +        if (!this) +                return -1; +        return this->ops->get_peername (this, hostname, hostlen); +} + +int32_t +rpc_transport_get_peeraddr (rpc_transport_t *this, char *peeraddr, int addrlen, +                            struct sockaddr *sa, size_t salen) +{ +        if (!this) +                return -1; +        return this->ops->get_peeraddr (this, peeraddr, addrlen, sa, salen); +} + +void +rpc_transport_pollin_destroy (rpc_transport_pollin_t *pollin) +{ +        if (!pollin) { +                goto out; +        } + +        if (pollin->vectored) { +                if (pollin->data.vector.iobuf1) { +                        iobuf_unref (pollin->data.vector.iobuf1); +                } + +                if (pollin->data.vector.iobuf2) { +                        iobuf_unref (pollin->data.vector.iobuf2); +                } +        } else { +                if (pollin->data.simple.iobuf) { +                        iobuf_unref (pollin->data.simple.iobuf); +                } +        } + +        if (pollin->private) { +                /* */ +                GF_FREE (pollin->private); +        } + +        GF_FREE (pollin); +out: +        return; +} + + +rpc_transport_pollin_t * +rpc_transport_pollin_alloc (rpc_transport_t *this, struct iobuf *iobuf, +                            size_t size, struct iobuf *vectored_buf, +                            size_t vectored_size, void *private) +{ +        rpc_transport_pollin_t *msg = NULL; +        msg = GF_CALLOC (1, sizeof (*msg), gf_common_mt_rpc_trans_pollin_t); +        if (!msg) { +                gf_log ("rpc-transport", GF_LOG_ERROR, "out of memory"); +                goto out; +        } + +        if (vectored_buf) { +                msg->vectored = 1; +                msg->data.vector.iobuf1 = iobuf_ref (iobuf); +                msg->data.vector.size1  = size; + +                msg->data.vector.iobuf2 = iobuf_ref (vectored_buf); +                msg->data.vector.size2  = vectored_size; +        } else { +                msg->data.simple.iobuf = iobuf_ref (iobuf); +                msg->data.simple.size = size; +        } + +        msg->private = private; +out: +        return msg; +} + + +rpc_transport_pollin_t * +rpc_transport_same_process_pollin_alloc (rpc_transport_t *this, +                                         struct iovec *rpchdr, int rpchdrcount, +                                         struct iovec *proghdr, +                                         int proghdrcount, +                                         struct iovec *progpayload, +                                         int progpayloadcount, +                                         rpc_transport_rsp_t *rsp, +                                         char is_request) +{ +        rpc_transport_pollin_t *msg            = NULL; +        int                     rpchdrlen      = 0, proghdrlen = 0; +        int                     progpayloadlen = 0; +        char                    vectored       = 0; +        char                   *hdr            = NULL, *progpayloadbuf = NULL; + +        if (!rpchdr || !proghdr) { +                goto err; +        } + +        msg = GF_CALLOC (1, sizeof (*msg), gf_common_mt_rpc_trans_pollin_t); +        if (!msg) { +                gf_log ("rpc-transport", GF_LOG_ERROR, "out of memory"); +                goto err; +        } + +        rpchdrlen = iov_length (rpchdr, rpchdrcount); +        proghdrlen = iov_length (proghdr, proghdrcount); + +        if (progpayload) { +                vectored = 1; +                progpayloadlen = iov_length (progpayload, progpayloadcount); +        } + +        /* FIXME: we are assuming rpchdr and proghdr will fit into +         * an iobuf (128KB) +         */ +        if ((rpchdrlen + proghdrlen) > this->ctx->page_size) { +                gf_log ("rpc_transport", GF_LOG_DEBUG, "program hdr and rpc" +                        " hdr together combined (%d) is bigger than " +                        "iobuf size (%zu)", (rpchdrlen + proghdrlen), +                        this->ctx->page_size); +                goto err; +        } + +        if (vectored) { +                msg->data.vector.iobuf1 = iobuf_get (this->ctx->iobuf_pool); +                if (!msg->data.vector.iobuf1) { +                        gf_log ("rpc_transport", GF_LOG_ERROR, +                                "out of memory"); +                        goto err; +                } + +                msg->data.vector.size1 = rpchdrlen + proghdrlen; +                hdr = iobuf_ptr (msg->data.vector.iobuf1); + +                if (!is_request && rsp) { +                        msg->data.vector.iobuf2 = rsp->rspbuf; +                        progpayloadbuf = rsp->rspvec->iov_base; +                } else { +                        msg->data.vector.iobuf2 = iobuf_get (this->ctx->iobuf_pool); +                        if (!msg->data.vector.iobuf2) { +                                gf_log ("rpc_transport", GF_LOG_ERROR, +                                        "out of memory"); +                                goto err; +                        } + +                        progpayloadbuf = iobuf_ptr (msg->data.vector.iobuf2); +                } +                msg->data.vector.size2 = progpayloadlen; +        } else { +                if (!is_request && rsp) { +                        /* FIXME: Assuming rspvec contains only one vector */ +                        hdr = rsp->rspvec->iov_base; +                        msg->data.simple.iobuf = rsp->rspbuf; +                } else { +                        msg->data.simple.iobuf = iobuf_get (this->ctx->iobuf_pool); +                        if (!msg->data.simple.iobuf) { +                                gf_log ("rpc_transport", GF_LOG_ERROR, +                                        "out of memory"); +                                goto err; +                        } + +                        hdr = iobuf_ptr (msg->data.simple.iobuf); +                } + +                msg->data.simple.size = rpchdrlen + proghdrlen; +        } + +        iov_unload (hdr, rpchdr, rpchdrcount); +        hdr += rpchdrlen; +        iov_unload (hdr, proghdr, proghdrcount); + +        if (progpayload) { +                iov_unload (progpayloadbuf, progpayload, +                            progpayloadcount); +        } + +        if (is_request) { +                msg->private = rsp; +        } +        return msg; +err: +        if (msg) { +                rpc_transport_pollin_destroy (msg); +        } + +        return NULL; +} + + +rpc_transport_handover_t * +rpc_transport_handover_alloc (rpc_transport_pollin_t *pollin) +{ +        rpc_transport_handover_t *msg = NULL; + +        msg = GF_CALLOC (1, sizeof (*msg), gf_common_mt_rpc_trans_handover_t); +        if (!msg) { +                gf_log ("rpc_transport", GF_LOG_ERROR, "out of memory"); +                goto out; +        } + +        msg->pollin = pollin; +        INIT_LIST_HEAD (&msg->list); +out: +        return msg; +} + + +void +rpc_transport_handover_destroy (rpc_transport_handover_t *msg) +{ +        if (!msg) { +                goto out; +        } + +        if (msg->pollin) { +                rpc_transport_pollin_destroy (msg->pollin); +        } + +        GF_FREE (msg); + +out: +        return; +} + + +rpc_transport_t * +rpc_transport_load (glusterfs_ctx_t *ctx, dict_t *options, char *trans_name) +{ +	struct rpc_transport *trans = NULL, *return_trans = NULL; +	char *name = NULL; +	void *handle = NULL; +	char *type = NULL; +	char str[] = "ERROR"; +	int32_t ret = -1; +	int8_t is_tcp = 0, is_unix = 0, is_ibsdp = 0; +	volume_opt_list_t *vol_opt = NULL; + +	GF_VALIDATE_OR_GOTO("rpc-transport", options, fail); +	GF_VALIDATE_OR_GOTO("rpc-transport", ctx, fail); +	GF_VALIDATE_OR_GOTO("rpc-transport", trans_name, fail); + +	trans = GF_CALLOC (1, sizeof (struct rpc_transport), gf_common_mt_rpc_trans_t); +	GF_VALIDATE_OR_GOTO("rpc-transport", trans, fail); + +        trans->name = gf_strdup (trans_name); +        GF_VALIDATE_OR_GOTO ("rpc-transport", trans->name, fail); + +	trans->ctx = ctx; +	type = str; + +	/* Backward compatibility */ +	ret = dict_get_str (options, "transport-type", &type); +	if (ret < 0) { +		ret = dict_set_str (options, "transport-type", "socket"); +		if (ret < 0) +			gf_log ("dict", GF_LOG_DEBUG, +				"setting transport-type failed"); +		gf_log ("rpc-transport", GF_LOG_WARNING, +			"missing 'option transport-type'. defaulting to " +			"\"socket\""); +	} else { +		{ +			/* Backword compatibility to handle * /client, +			 * * /server. +			 */ +			char *tmp = strchr (type, '/'); +			if (tmp) +				*tmp = '\0'; +		} + +		is_tcp = strcmp (type, "tcp"); +		is_unix = strcmp (type, "unix"); +		is_ibsdp = strcmp (type, "ib-sdp"); +		if ((is_tcp == 0) || +		    (is_unix == 0) || +		    (is_ibsdp == 0)) { +			if (is_unix == 0) +				ret = dict_set_str (options, +						    "transport.address-family", +						    "unix"); +			if (is_ibsdp == 0) +				ret = dict_set_str (options, +						    "transport.address-family", +						    "inet-sdp"); + +			if (ret < 0) +				gf_log ("dict", GF_LOG_DEBUG, +					"setting address-family failed"); + +			ret = dict_set_str (options, +					    "transport-type", "socket"); +			if (ret < 0) +				gf_log ("dict", GF_LOG_DEBUG, +					"setting transport-type failed"); +		} +	} + +	ret = dict_get_str (options, "transport-type", &type); +	if (ret < 0) { +		gf_log ("rpc-transport", GF_LOG_ERROR, +			"'option transport-type <xx>' missing in volume '%s'", +			trans_name); +		goto fail; +	} + +	ret = gf_asprintf (&name, "%s/%s.so", RPC_TRANSPORTDIR, type); +        if (-1 == ret) { +                gf_log ("rpc-transport", GF_LOG_ERROR, "asprintf failed"); +                goto fail; +        } +	gf_log ("rpc-transport", GF_LOG_DEBUG, +		"attempt to load file %s", name); + +	handle = dlopen (name, RTLD_NOW|RTLD_GLOBAL); +	if (handle == NULL) { +		gf_log ("rpc-transport", GF_LOG_ERROR, "%s", dlerror ()); +		gf_log ("rpc-transport", GF_LOG_ERROR, +			"volume '%s': transport-type '%s' is not valid or " +			"not found on this machine", +			trans_name, type); +		goto fail; +	} + +	trans->ops = dlsym (handle, "tops"); +	if (trans->ops == NULL) { +		gf_log ("rpc-transport", GF_LOG_ERROR, +			"dlsym (rpc_transport_ops) on %s", dlerror ()); +		goto fail; +	} + +	trans->init = dlsym (handle, "init"); +	if (trans->init == NULL) { +		gf_log ("rpc-transport", GF_LOG_ERROR, +			"dlsym (gf_rpc_transport_init) on %s", dlerror ()); +		goto fail; +	} + +	trans->fini = dlsym (handle, "fini"); +	if (trans->fini == NULL) { +		gf_log ("rpc-transport", GF_LOG_ERROR, +			"dlsym (gf_rpc_transport_fini) on %s", dlerror ()); +		goto fail; +	} + +	vol_opt = GF_CALLOC (1, sizeof (volume_opt_list_t), +                             gf_common_mt_volume_opt_list_t); +        if (!vol_opt) { +                gf_log (trans_name, GF_LOG_ERROR, "out of memory"); +                goto fail; +        } + +	vol_opt->given_opt = dlsym (handle, "options"); +	if (vol_opt->given_opt == NULL) { +		gf_log ("rpc-transport", GF_LOG_DEBUG, +			"volume option validation not specified"); +	} else { +                /* FIXME: is adding really needed? */ +		/* list_add_tail (&vol_opt->list, &xl->volume_options); */ +		if (-1 == +		    validate_volume_options (trans_name, options, +                                             vol_opt->given_opt)) { +			gf_log ("rpc-transport", GF_LOG_ERROR, +				"volume option validation failed"); +			goto fail; +		} +	} + +	ret = trans->init (trans); +	if (ret != 0) { +		gf_log ("rpc-transport", GF_LOG_ERROR, +			"'%s' initialization failed", type); +		goto fail; +	} + +        trans->options = options; + +	pthread_mutex_init (&trans->lock, NULL); +	return_trans = trans; +	return return_trans; + +fail: +        if (trans) { +                if (trans->name) { +                        GF_FREE (trans->name); +                } + +                GF_FREE (trans); +        } + +        if (name) { +                GF_FREE (name); +        } + +        if (vol_opt) { +                GF_FREE (vol_opt); +        } + +        return NULL; +} + + +int32_t +rpc_transport_submit_request (rpc_transport_t *this, rpc_transport_req_t *req) +{ +	int32_t                       ret          = -1; +        rpc_transport_t              *peer_trans   = NULL; +        rpc_transport_pollin_t       *pollin       = NULL; +        rpc_transport_handover_t     *handover_msg = NULL; +        rpc_transport_rsp_t          *rsp          = NULL; + +        if (this->peer_trans) { +                peer_trans = this->peer_trans; + +                rsp = GF_CALLOC (1, sizeof (*rsp), gf_common_mt_rpc_trans_rsp_t); +                if (!rsp) { +                        ret = -ENOMEM; +                        goto fail; +                } + +                *rsp = req->rsp; + +                pollin = rpc_transport_same_process_pollin_alloc (this, req->msg.rpchdr, +                                                                  req->msg.rpchdrcount, +                                                                  req->msg.proghdr, +                                                                  req->msg.proghdrcount, +                                                                  req->msg.progpayload, +                                                                  req->msg.progpayloadcount, +                                                                  rsp, 1); +                if (!pollin) { +                        GF_FREE (rsp); +                        ret = -ENOMEM; +                        goto fail; +                } + +                handover_msg = rpc_transport_handover_alloc (pollin); +                if (!handover_msg) { +                        rpc_transport_pollin_destroy (pollin); +                        ret = -ENOMEM; +                        goto fail; +                } + +                pthread_mutex_lock (&peer_trans->handover.mutex); +                { +                        list_add_tail (&handover_msg->list, +                                       &peer_trans->handover.msgs); +                        pthread_cond_broadcast (&peer_trans->handover.cond); +                } +                pthread_mutex_unlock (&peer_trans->handover.mutex); + +                return 0; +        } + +	GF_VALIDATE_OR_GOTO("rpc_transport", this, fail); +	GF_VALIDATE_OR_GOTO("rpc_transport", this->ops, fail); + +	ret = this->ops->submit_request (this, req); +fail: +	return ret; +} + + +int32_t +rpc_transport_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply) +{ +	int32_t                   ret          = -1; +        rpc_transport_t              *peer_trans   = NULL; +        rpc_transport_pollin_t       *pollin       = NULL; +        rpc_transport_handover_t     *handover_msg = NULL; + +        if (this->peer_trans) { +                peer_trans = this->peer_trans; + +                pollin = rpc_transport_same_process_pollin_alloc (this, reply->msg.rpchdr, +                                                                  reply->msg.rpchdrcount, +                                                                  reply->msg.proghdr, +                                                                  reply->msg.proghdrcount, +                                                                  reply->msg.progpayload, +                                                                  reply->msg.progpayloadcount, +                                                                  reply->private, 0); +                if (!pollin) { +                        return -ENOMEM; +                } + +                handover_msg = rpc_transport_handover_alloc (pollin); +                if (!handover_msg) { +                        rpc_transport_pollin_destroy (pollin); +                        return -ENOMEM; +                } + +                pthread_mutex_lock (&peer_trans->handover.mutex); +                { +                        list_add_tail (&handover_msg->list, +                                       &peer_trans->handover.msgs); +                        pthread_cond_broadcast (&peer_trans->handover.cond); +                } +                pthread_mutex_unlock (&peer_trans->handover.mutex); + +                return 0; +        } + +	GF_VALIDATE_OR_GOTO("rpc_transport", this, fail); +	GF_VALIDATE_OR_GOTO("rpc_transport", this->ops, fail); + +	ret = this->ops->submit_reply (this, reply); +fail: +	return ret; +} + + +int32_t +rpc_transport_connect (rpc_transport_t *this) +{ +	int ret = -1; + +	GF_VALIDATE_OR_GOTO("rpc_transport", this, fail); + +	ret = this->ops->connect (this); +fail: +	return ret; +} + + +int32_t +rpc_transport_listen (rpc_transport_t *this) +{ +	int ret = -1; + +	GF_VALIDATE_OR_GOTO("rpc_transport", this, fail); + +	ret = this->ops->listen (this); +fail: +	return ret; +} + + +int32_t +rpc_transport_disconnect (rpc_transport_t *this) +{ +	int32_t ret = -1; + +	GF_VALIDATE_OR_GOTO("rpc_transport", this, fail); + +	ret = this->ops->disconnect (this); +fail: +	return ret; +} + + +int32_t +rpc_transport_destroy (rpc_transport_t *this) +{ +	int32_t ret = -1; + +	GF_VALIDATE_OR_GOTO("rpc_transport", this, fail); + +	if (this->fini) +		this->fini (this); +	pthread_mutex_destroy (&this->lock); +	GF_FREE (this); +fail: +	return ret; +} + + +rpc_transport_t * +rpc_transport_ref (rpc_transport_t *this) +{ +	rpc_transport_t *return_this = NULL; + +	GF_VALIDATE_OR_GOTO("rpc_transport", this, fail); + +	pthread_mutex_lock (&this->lock); +	{ +		this->refcount ++; +	} +	pthread_mutex_unlock (&this->lock); + +	return_this = this; +fail: +	return return_this; +} + + +int32_t +rpc_transport_unref (rpc_transport_t *this) +{ +	int32_t refcount = 0; +	int32_t ret = -1; + +	GF_VALIDATE_OR_GOTO("rpc_transport", this, fail); + +	pthread_mutex_lock (&this->lock); +	{ +		refcount = --this->refcount; +	} +	pthread_mutex_unlock (&this->lock); + +	if (refcount == 0) { +		/* xlator_notify (this->xl, GF_EVENT_RPC_TRANSPORT_CLEANUP, +                   this); */ +		rpc_transport_destroy (this); +	} + +	ret = 0; +fail: +	return ret; +} + + +int32_t +rpc_transport_notify (rpc_transport_t *this, rpc_transport_event_t event, +                      void *data, ...) +{ +        int32_t ret = -1; + +        if (this == NULL) { +                goto out; +        } + +        //ret = this->notify (this, this->notify_data, event, data); +        ret = this->notify (this, this->mydata, event, data); +out: +        return ret; +} + + +void * +rpc_transport_peerproc (void *trans_data) +{ +        rpc_transport_t                     *trans = NULL; +        rpc_transport_handover_t            *msg   = NULL; + +        trans = trans_data; + +        while (1) { +                pthread_mutex_lock (&trans->handover.mutex); +                { +                        while (list_empty (&trans->handover.msgs)) +                                pthread_cond_wait (&trans->handover.cond, +                                                   &trans->handover.mutex); + +                        msg = list_entry (trans->handover.msgs.next, +                                          rpc_transport_handover_t, list); + +                        list_del_init (&msg->list); +                } +                pthread_mutex_unlock (&trans->handover.mutex); + +                rpc_transport_notify (trans, RPC_TRANSPORT_MSG_RECEIVED, msg->pollin); +                rpc_transport_handover_destroy (msg); +        } +} + + +int +rpc_transport_setpeer (rpc_transport_t *trans, rpc_transport_t *peer_trans) +{ +        trans->peer_trans = rpc_transport_ref (peer_trans); + +        INIT_LIST_HEAD (&trans->handover.msgs); +        pthread_cond_init (&trans->handover.cond, NULL); +        pthread_mutex_init (&trans->handover.mutex, NULL); +        pthread_create (&trans->handover.thread, NULL, +                        rpc_transport_peerproc, trans); + +        peer_trans->peer_trans = rpc_transport_ref (trans); + +        INIT_LIST_HEAD (&peer_trans->handover.msgs); +        pthread_cond_init (&peer_trans->handover.cond, NULL); +        pthread_mutex_init (&peer_trans->handover.mutex, NULL); +        pthread_create (&peer_trans->handover.thread, NULL, +                        rpc_transport_peerproc, peer_trans); + +        return 0; +} + + +inline int +rpc_transport_register_notify (rpc_transport_t *trans, +                               rpc_transport_notify_t notify, void *mydata) +{ +        int ret = -1; + +        if (trans == NULL) { +                goto out; +        } + +        trans->notify = notify; +        trans->mydata = mydata; + +        ret = 0; +out: +        return ret; +} diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h new file mode 100644 index 00000000000..b3c7985e095 --- /dev/null +++ b/rpc/rpc-lib/src/rpc-transport.h @@ -0,0 +1,288 @@ +/* +  Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. + +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + +#ifndef __RPC_TRANSPORT_H__ +#define __RPC_TRANSPORT_H__ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include <inttypes.h> +#include <rpc/rpc.h> +#include <rpc/auth.h> +#include <rpc/rpc_msg.h> + +/* Given the 4-byte fragment header, returns non-zero if this fragment + * is the last fragment for the RPC record being assemebled. + * RPC Record marking standard defines a 32 bit value as the fragment + * header with the MSB signifying whether the fragment is the last + * fragment for the record being asembled. + */ +#define RPC_LASTFRAG(fraghdr) ((uint32_t)(fraghdr & 0x80000000U)) + +/* Given the 4-byte fragment header, extracts the bits that contain + * the fragment size. + */ +#define RPC_FRAGSIZE(fraghdr) ((uint32_t)(fraghdr & 0x7fffffffU)) + +#define RPC_FRAGHDR_SIZE               4 +#define RPC_MSGTYPE_SIZE               8 + +/* size of the msg from the start of call-body till and including credlen */ +#define RPC_CALL_BODY_SIZE             24 + +#define RPC_REPLY_STATUS_SIZE          4 + +#define RPC_AUTH_FLAVOUR_N_LENGTH_SIZE 8 + +#define RPC_ACCEPT_STATUS_LEN          4 + +struct rpc_transport_ops; +typedef struct rpc_transport rpc_transport_t; + +#include "dict.h" +#include "compat.h" +#include "rpcsvc-common.h" + +struct peer_info { +	struct sockaddr_storage sockaddr; +	socklen_t sockaddr_len; +	char identifier[UNIX_PATH_MAX]; +}; +typedef struct peer_info peer_info_t; + +typedef enum msg_type msg_type_t; + +typedef enum { +        RPC_TRANSPORT_ACCEPT,      /* New client has been accepted */ +        RPC_TRANSPORT_DISCONNECT,  /* Connection is disconnected */ +        RPC_TRANSPORT_CLEANUP,     /* connection is about to be freed */ +        /*RPC_TRANSPORT_READ,*/    /* An event used to enable rpcsvc to instruct +                                    * transport the number of bytes to read. +                                    * This helps in reading large msgs, wherein +                                    * the rpc actors might decide to place the +                                    * actor's payload in new iobufs separate +                                    * from the rpc header, proghdr and +                                    * authentication information. glusterfs/nfs +                                    * read and write actors are few examples +                                    * that might beniefit from this. While +                                    * reading a single msg, this event may be +                                    * delivered more than once. +                                    */ +        RPC_TRANSPORT_MAP_XID_REQUEST, /* reciever of this event should send +                                        * the prognum and procnum corresponding +                                        * to xid. +                                        */ +        RPC_TRANSPORT_MSG_RECEIVED,         /* Complete rpc msg has been read */ +        RPC_TRANSPORT_CONNECT,              /* client is connected to server */ +        RPC_TRANSPORT_MSG_SENT, +} rpc_transport_event_t; + +struct rpc_transport_msg { +        struct iovec     *rpchdr; +        int               rpchdrcount; +        struct iovec     *proghdr; +        int               proghdrcount; +        struct iovec     *progpayload; +        int               progpayloadcount; +        struct iobref    *iobref; +}; +typedef struct rpc_transport_msg rpc_transport_msg_t; + +struct rpc_transport_rsp { +        /* as of now, the entire rsp payload is read into rspbuf and hence +         * rspcount is always set to one. +         */ +        struct iovec    *rspvec; +        int              rspcount; +        struct iobuf    *rspbuf; +}; +typedef struct rpc_transport_rsp rpc_transport_rsp_t; + +struct rpc_transport_req { +        rpc_transport_msg_t msg; +        rpc_transport_rsp_t rsp; +}; +typedef struct rpc_transport_req rpc_transport_req_t; + +struct rpc_transport_reply { +        rpc_transport_msg_t  msg; +        void                *private; +}; +typedef struct rpc_transport_reply rpc_transport_reply_t; + +struct rpc_request_info { +        uint32_t            xid; +        int                 prognum; +        int                 progver; +        int                 procnum; +        rpc_transport_rsp_t rsp; +}; +typedef struct rpc_request_info rpc_request_info_t; + + +struct rpc_transport_pollin { +        union { +                struct vectored { +                        struct iobuf *iobuf1; +                        size_t        size1; +                        struct iobuf *iobuf2; +                        size_t        size2; +                } vector; +                struct simple { +                        struct iobuf *iobuf; +                        size_t        size; +                } simple; +        } data; +        char vectored; +        void *private; +}; +typedef struct rpc_transport_pollin rpc_transport_pollin_t; + +typedef int (*rpc_transport_notify_t) (rpc_transport_t *, void *mydata, +                                       rpc_transport_event_t, void *data, ...); +struct rpc_transport { +	struct rpc_transport_ops  *ops; +	void                      *private; +        void                      *xl_private; +	void                      *mydata; +	pthread_mutex_t            lock; +	int32_t                    refcount; + +        glusterfs_ctx_t           *ctx; +        dict_t                    *options; +        char                      *name; +	void                      *dnscache; +	data_t                    *buf; +	int32_t                  (*init)   (rpc_transport_t *this); +	void                     (*fini)   (rpc_transport_t *this); +        rpc_transport_notify_t     notify; +        void                      *notify_data; +	peer_info_t                peerinfo; +	peer_info_t                myinfo; + +        rpc_transport_t           *peer_trans; +        struct { +                pthread_mutex_t       mutex; +                pthread_cond_t        cond; +                pthread_t             thread; +                struct list_head      msgs; +                /* any request/reply will be transformed as pollin data on the +                 * peer, hence we are building up a pollin data even before +                 * handing it over to peer rpc_transport. In order to decide whether +                 * the pollin data is vectored or simple, we follow a simple +                 * algo i.e., if there is a progpayload in request/reply, its +                 * considered vectored, otherwise its a simple pollin data. +                 */ +                rpc_transport_pollin_t   *msg; +        } handover; +}; + +typedef struct { +        rpc_transport_pollin_t *pollin; +        struct list_head    list; +} rpc_transport_handover_t; + +struct rpc_transport_ops { +        /* no need of receive op, msg will be delivered through an event +         * notification +         */ +        int32_t (*submit_request) (rpc_transport_t *this, +                                   rpc_transport_req_t *req); +        int32_t (*submit_reply)   (rpc_transport_t *this, +                                   rpc_transport_reply_t *reply); +	int32_t (*connect)        (rpc_transport_t *this); +	int32_t (*listen)         (rpc_transport_t *this); +	int32_t (*disconnect)     (rpc_transport_t *this); +        int32_t (*get_peername)   (rpc_transport_t *this, char *hostname, +                                   int hostlen); +        int32_t (*get_peeraddr)   (rpc_transport_t *this, char *peeraddr, +                                   int addrlen, struct sockaddr *sa, +                                   socklen_t sasize); +        int32_t (*get_myname)     (rpc_transport_t *this, char *hostname, +                                   int hostlen); +        int32_t (*get_myaddr)     (rpc_transport_t *this, char *peeraddr, +                                   int addrlen, struct sockaddr *sa, +                                   socklen_t sasize); +}; + + +int32_t +rpc_transport_listen (rpc_transport_t *this); + +int32_t +rpc_transport_connect (rpc_transport_t *this); + +int32_t +rpc_transport_disconnect (rpc_transport_t *this); + +int32_t +rpc_transport_notify (rpc_transport_t *this, rpc_transport_event_t event, +                      void *data, ...); + +int32_t +rpc_transport_submit_request (rpc_transport_t *this, rpc_transport_req_t *req); + +int32_t +rpc_transport_submit_reply (rpc_transport_t *this, +                            rpc_transport_reply_t *reply); + +int32_t +rpc_transport_destroy (rpc_transport_t *this); + +rpc_transport_t * +rpc_transport_load (glusterfs_ctx_t *ctx, dict_t *options, char *name); + +rpc_transport_t * +rpc_transport_ref   (rpc_transport_t *trans); + +int32_t +rpc_transport_unref (rpc_transport_t *trans); + +int +rpc_transport_setpeer (rpc_transport_t *trans, rpc_transport_t *trans_peer); + +int +rpc_transport_register_notify (rpc_transport_t *trans, rpc_transport_notify_t, +                               void *mydata); + +int32_t +rpc_transport_get_peername (rpc_transport_t *this, char *hostname, int hostlen); + +int32_t +rpc_transport_get_peeraddr (rpc_transport_t *this, char *peeraddr, int addrlen, +                            struct sockaddr *sa, size_t salen); + +int32_t +rpc_transport_get_myname (rpc_transport_t *this, char *hostname, int hostlen); + +int32_t +rpc_transport_get_myaddr (rpc_transport_t *this, char *peeraddr, int addrlen, +                          struct sockaddr *sa, size_t salen); + +rpc_transport_pollin_t * +rpc_transport_pollin_alloc (rpc_transport_t *this, struct iobuf *iobuf, +                            size_t iobuf_size, struct iobuf *vectoriob, +                            size_t vectoriob_size, void *private); +void +rpc_transport_pollin_destroy (rpc_transport_pollin_t *pollin); + +#endif /* __RPC_TRANSPORT_H__ */ diff --git a/rpc/rpc-lib/src/rpcsvc-auth.c b/rpc/rpc-lib/src/rpcsvc-auth.c new file mode 100644 index 00000000000..fa039c38677 --- /dev/null +++ b/rpc/rpc-lib/src/rpcsvc-auth.c @@ -0,0 +1,409 @@ +/* +  Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. + +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + +#include "rpcsvc.h" +#include "logging.h" +#include "dict.h" + +extern rpcsvc_auth_t * +rpcsvc_auth_null_init (rpcsvc_t *svc, dict_t *options); + +extern rpcsvc_auth_t * +rpcsvc_auth_unix_init (rpcsvc_t *svc, dict_t *options); + +extern rpcsvc_auth_t * +rpcsvc_auth_glusterfs_init (rpcsvc_t *svc, dict_t *options); + +int +rpcsvc_auth_add_initer (struct list_head *list, char *idfier, +                        rpcsvc_auth_initer_t init) +{ +        struct rpcsvc_auth_list         *new = NULL; + +        if ((!list) || (!init) || (!idfier)) +                return -1; + +        new = GF_CALLOC (1, sizeof (*new), gf_common_mt_rpcsvc_auth_list); +        if (!new) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Memory allocation failed"); +                return -1; +        } + +        new->init = init; +        strcpy (new->name, idfier); +        INIT_LIST_HEAD (&new->authlist); +        list_add_tail (&new->authlist, list); +        return 0; +} + + + +int +rpcsvc_auth_add_initers (rpcsvc_t *svc) +{ +        int     ret = -1; + +        ret = rpcsvc_auth_add_initer (&svc->authschemes, "auth-glusterfs", +                                      (rpcsvc_auth_initer_t) +                                      rpcsvc_auth_glusterfs_init); +        if (ret == -1) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to add AUTH_GLUSTERFS"); +                goto err; +        } + +        ret = rpcsvc_auth_add_initer (&svc->authschemes, "auth-unix", +                                      (rpcsvc_auth_initer_t) +                                      rpcsvc_auth_unix_init); +        if (ret == -1) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to add AUTH_UNIX"); +                goto err; +        } + +        ret = rpcsvc_auth_add_initer (&svc->authschemes, "auth-null", +                                      (rpcsvc_auth_initer_t) +                                      rpcsvc_auth_null_init); +        if (ret == -1) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to add AUTH_NULL"); +                goto err; +        } + +        ret = 0; +err: +        return 0; +} + + +int +rpcsvc_auth_init_auth (rpcsvc_t *svc, dict_t *options, +                       struct rpcsvc_auth_list *authitem) +{ +        int             ret = -1; + +        if ((!svc) || (!options) || (!authitem)) +                return -1; + +        if (!authitem->init) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "No init function defined"); +                ret = -1; +                goto err; +        } + +        authitem->auth = authitem->init (svc, options); +        if (!authitem->auth) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Registration of auth failed:" +                        " %s", authitem->name); +                ret = -1; +                goto err; +        } + +        authitem->enable = 1; +        gf_log (GF_RPCSVC, GF_LOG_TRACE, "Authentication enabled: %s", +                authitem->auth->authname); + +        ret = 0; +err: +        return ret; +} + + +int +rpcsvc_auth_init_auths (rpcsvc_t *svc, dict_t *options) +{ +        int                     ret = -1; +        struct rpcsvc_auth_list *auth = NULL; +        struct rpcsvc_auth_list *tmp = NULL; + +        if (!svc) +                return -1; + +        if (list_empty (&svc->authschemes)) { +                gf_log (GF_RPCSVC, GF_LOG_WARNING, "No authentication!"); +                ret = 0; +                goto err; +        } + +        /* If auth null and sys are not disabled by the user, we must enable +         * it by default. This is a globally default rule, the user is still +         * allowed to disable the two for particular subvolumes. +         */ +        if (!dict_get (options, "rpc-auth.auth-null")) +                ret = dict_set_str (options, "rpc-auth.auth-null", "on"); + +        if (!dict_get (options, "rpc-auth.auth-unix")) +                ret = dict_set_str (options, "rpc-auth.auth-unix", "on"); + +        if (!dict_get (options, "rpc-auth.auth-glusterfs")) +                ret = dict_set_str (options, "rpc-auth.auth-glusterfs", "on"); + +        list_for_each_entry_safe (auth, tmp, &svc->authschemes, authlist) { +                ret = rpcsvc_auth_init_auth (svc, options, auth); +                if (ret == -1) +                        goto err; +        } + +        ret = 0; +err: +        return ret; + +} + +int +rpcsvc_auth_init (rpcsvc_t *svc, dict_t *options) +{ +        int             ret = -1; + +        if ((!svc) || (!options)) +                return -1; + +        ret = rpcsvc_auth_add_initers (svc); +        if (ret == -1) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to add initers"); +                goto out; +        } + +        ret = rpcsvc_auth_init_auths (svc, options); +        if (ret == -1) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to init auth schemes"); +                goto out; +        } + +out: +        return ret; +} + + +rpcsvc_auth_t * +__rpcsvc_auth_get_handler (rpcsvc_request_t *req) +{ +        int                     ret = -1; +        struct rpcsvc_auth_list *auth = NULL; +        struct rpcsvc_auth_list *tmp = NULL; +        rpcsvc_t                *svc = NULL; + +        if (!req) +                return NULL; + +        svc = rpcsvc_request_service (req); +        if (!svc) +                gf_log ("", 1, "something wrong, !svc"); + +        if (list_empty (&svc->authschemes)) { +                gf_log (GF_RPCSVC, GF_LOG_WARNING, "No authentication!"); +                ret = 0; +                goto err; +        } + +        list_for_each_entry_safe (auth, tmp, &svc->authschemes, authlist) { +                if (!auth->enable) +                        continue; +                if (auth->auth->authnum == req->cred.flavour) +                        goto err; + +        } + +        auth = NULL; +err: +        if (auth) +                return auth->auth; +        else +                return NULL; +} + +rpcsvc_auth_t * +rpcsvc_auth_get_handler (rpcsvc_request_t *req) +{ +        rpcsvc_auth_t           *auth = NULL; + +        auth = __rpcsvc_auth_get_handler (req); +        if (auth) +                goto ret; + +        gf_log (GF_RPCSVC, GF_LOG_TRACE, "No auth handler: %d", +                req->cred.flavour); + +        /* The requested scheme was not available so fall back the to one +         * scheme that will always be present. +         */ +        req->cred.flavour = AUTH_NULL; +        req->verf.flavour = AUTH_NULL; +        auth = __rpcsvc_auth_get_handler (req); +ret: +        return auth; +} + + +int +rpcsvc_auth_request_init (rpcsvc_request_t *req) +{ +        int                     ret = -1; +        rpcsvc_auth_t           *auth = NULL; + +        if (!req) +                return -1; + +        auth = rpcsvc_auth_get_handler (req); +        if (!auth) +                goto err; +        ret = 0; +        gf_log (GF_RPCSVC, GF_LOG_TRACE, "Auth handler: %s", auth->authname); +        if (!auth->authops->request_init) +                ret = auth->authops->request_init (req, auth->authprivate); + +err: +        return ret; +} + + +int +rpcsvc_authenticate (rpcsvc_request_t *req) +{ +        int                     ret = RPCSVC_AUTH_REJECT; +        rpcsvc_auth_t           *auth = NULL; +        int                     minauth = 0; + +        if (!req) +                return ret; + +        //minauth = rpcsvc_request_prog_minauth (req); +        minauth = 1; +        if (minauth > rpcsvc_request_cred_flavour (req)) { +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Auth too weak"); +                rpcsvc_request_set_autherr (req, AUTH_TOOWEAK); +                goto err; +        } + +        auth = rpcsvc_auth_get_handler (req); +        if (!auth) { +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "No auth handler found"); +                goto err; +        } + +        if (auth->authops->authenticate) +                ret = auth->authops->authenticate (req, auth->authprivate); + +err: +        return ret; +} + + +int +rpcsvc_auth_array (rpcsvc_t *svc, char *volname, int *autharr, int arrlen) +{ +        int             count = 0; +        int             gen = RPCSVC_AUTH_REJECT; +        int             spec = RPCSVC_AUTH_REJECT; +        int             final = RPCSVC_AUTH_REJECT; +        char            *srchstr = NULL; +        char            *valstr = NULL; +        gf_boolean_t    boolval = _gf_false; +        int             ret = 0; + +        struct rpcsvc_auth_list *auth = NULL; +        struct rpcsvc_auth_list *tmp = NULL; + +        if ((!svc) || (!autharr) || (!volname)) +                return -1; + +        memset (autharr, 0, arrlen * sizeof(int)); +        if (list_empty (&svc->authschemes)) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "No authentication!"); +                goto err; +        } + +        list_for_each_entry_safe (auth, tmp, &svc->authschemes, authlist) { +                if (count >= arrlen) +                        break; + +                gen = gf_asprintf (&srchstr, "rpc-auth.%s", auth->name); +                if (gen == -1) { +                        count = -1; +                        goto err; +                } + +                gen = RPCSVC_AUTH_REJECT; +                if (dict_get (svc->options, srchstr)) { +                        ret = dict_get_str (svc->options, srchstr, &valstr); +                        if (ret == 0) { +                                ret = gf_string2boolean (valstr, &boolval); +                                if (ret == 0) { +                                        if (boolval == _gf_true) +                                                gen = RPCSVC_AUTH_ACCEPT; +                                } else +                                        gf_log (GF_RPCSVC, GF_LOG_ERROR, "Faile" +                                                "d to read auth val"); +                        } else +                                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Faile" +                                        "d to read auth val"); +                } + +                GF_FREE (srchstr); +                spec = gf_asprintf (&srchstr, "rpc-auth.%s.%s", auth->name, +                                    volname); +                if (spec == -1) { +                        count = -1; +                        goto err; +                } + +                spec = RPCSVC_AUTH_DONTCARE; +                if (dict_get (svc->options, srchstr)) { +                        ret = dict_get_str (svc->options, srchstr, &valstr); +                        if (ret == 0) { +                                ret = gf_string2boolean (valstr, &boolval); +                                if (ret == 0) { +                                        if (boolval == _gf_true) +                                                spec = RPCSVC_AUTH_ACCEPT; +                                        else +                                                spec = RPCSVC_AUTH_REJECT; +                                } else +                                        gf_log (GF_RPCSVC, GF_LOG_ERROR, "Faile" +                                                "d to read auth val"); +                        } else +                                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Faile" +                                        "d to read auth val"); +                } + +                GF_FREE (srchstr); +                final = rpcsvc_combine_gen_spec_volume_checks (gen, spec); +                if (final == RPCSVC_AUTH_ACCEPT) { +                        autharr[count] = auth->auth->authnum; +                        ++count; +                } +        } + +err: +        return count; +} + + +gid_t * +rpcsvc_auth_unix_auxgids (rpcsvc_request_t *req, int *arrlen) +{ +        if ((!req) || (!arrlen)) +                return NULL; + +        if ((req->cred.flavour != AUTH_UNIX) || +            (req->cred.flavour != AUTH_GLUSTERFS)) +                return NULL; + +        *arrlen = req->auxgidcount; +        if (*arrlen == 0) +                return NULL; + +        return &req->auxgids[0]; +} diff --git a/rpc/rpc-lib/src/rpcsvc-common.h b/rpc/rpc-lib/src/rpcsvc-common.h new file mode 100644 index 00000000000..0b9d84cfdf0 --- /dev/null +++ b/rpc/rpc-lib/src/rpcsvc-common.h @@ -0,0 +1,83 @@ +/* +  Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. + +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + +#ifndef _RPCSVC_COMMON_H +#define _RPCSVC_COMMON_H + +#include <pthread.h> +#include "list.h" +#include "compat.h" +#include "glusterfs.h" +#include "dict.h" + +typedef enum { +        RPCSVC_EVENT_ACCEPT, +        RPCSVC_EVENT_DISCONNECT, +        RPCSVC_EVENT_LISTENER_DEAD, +} rpcsvc_event_t; + + +struct rpcsvc_state; + +typedef int (*rpcsvc_notify_t) (struct rpcsvc_state *, void *mydata, +                                rpcsvc_event_t, void *data); + + +/* Contains global state required for all the RPC services. + */ +typedef struct rpcsvc_state { + +        /* Contains list of (program, version) handlers. +         * other options. +         */ + +        pthread_mutex_t         rpclock; + +        unsigned int            memfactor; + +        /* List of the authentication schemes available. */ +        struct list_head        authschemes; + +        /* Reference to the options */ +        dict_t                  *options; + +        /* Allow insecure ports. */ +        int                     allow_insecure; + +        glusterfs_ctx_t         *ctx; + +        void                    *listener; + +        /* list of connections which will listen for incoming connections */ +        struct list_head         listeners; + +        /* list of programs registered with rpcsvc */ +        struct list_head         programs; + +        /* list of notification callbacks */ +        struct list_head         notify; +        int                      notify_count; + +        void                    *mydata; /* This is xlator */ +        rpcsvc_notify_t          notifyfn; + +} rpcsvc_t; + + +#endif /* #ifndef _RPCSVC_COMMON_H */ diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c new file mode 100644 index 00000000000..e07cb8ee714 --- /dev/null +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -0,0 +1,2015 @@ +/* +  Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. + +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "rpcsvc.h" +#include "rpc-transport.h" +#include "dict.h" +#include "logging.h" +#include "byte-order.h" +#include "common-utils.h" +#include "compat-errno.h" +#include "list.h" +#include "xdr-rpc.h" +#include "iobuf.h" +#include "globals.h" + +#include <errno.h> +#include <pthread.h> +#include <stdlib.h> +#include <rpc/rpc.h> +#include <rpc/pmap_clnt.h> +#include <arpa/inet.h> +#include <rpc/xdr.h> +#include <fnmatch.h> +#include <stdarg.h> +#include <stdio.h> + + +#define rpcsvc_alloc_request(con, request)                              \ +        do {                                                            \ +                request = (rpcsvc_request_t *) mem_get ((con)->rxpool); \ +                memset (request, 0, sizeof (rpcsvc_request_t));         \ +        } while (0) + + +int +rpcsvc_conn_peer_check_search (dict_t *options, char *pattern, char *clstr) +{ +        int                     ret = -1; +        char                    *addrtok = NULL; +        char                    *addrstr = NULL; +        char                    *svptr = NULL; + +        if ((!options) || (!clstr)) +                return -1; + +        if (!dict_get (options, pattern)) +                return -1; + +        ret = dict_get_str (options, pattern, &addrstr); +        if (ret < 0) { +                ret = -1; +                goto err; +        } + +        if (!addrstr) { +                ret = -1; +                goto err; +        } + +        addrtok = strtok_r (addrstr, ",", &svptr); +        while (addrtok) { + +                ret = fnmatch (addrtok, clstr, FNM_CASEFOLD); +                if (ret == 0) +                        goto err; + +                addrtok = strtok_r (NULL, ",", &svptr); +        } + +        ret = -1; +err: + +        return ret; +} + + +int +rpcsvc_conn_peer_check_allow (dict_t *options, char *volname, char *clstr) +{ +        int     ret = RPCSVC_AUTH_DONTCARE; +        char    *srchstr = NULL; +        char    globalrule[] = "rpc-auth.addr.allow"; + +        if ((!options) || (!clstr)) +                return ret; + +        /* If volname is NULL, then we're searching for the general rule to +         * determine the current address in clstr is allowed or not for all +         * subvolumes. +         */ +        if (volname) { +                ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname); +                if (ret == -1) { +                        gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); +                        ret = RPCSVC_AUTH_DONTCARE; +                        goto out; +                } +        } else +                srchstr = globalrule; + +        ret = rpcsvc_conn_peer_check_search (options, srchstr, clstr); +        if (volname) +                GF_FREE (srchstr); + +        if (ret == 0) +                ret = RPCSVC_AUTH_ACCEPT; +        else +                ret = RPCSVC_AUTH_DONTCARE; +out: +        return ret; +} + +int +rpcsvc_conn_peer_check_reject (dict_t *options, char *volname, char *clstr) +{ +        int     ret = RPCSVC_AUTH_DONTCARE; +        char    *srchstr = NULL; +        char    generalrule[] = "rpc-auth.addr.reject"; + +        if ((!options) || (!clstr)) +                return ret; + +        if (volname) { +                ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.reject", volname); +                if (ret == -1) { +                        gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); +                        ret = RPCSVC_AUTH_REJECT; +                        goto out; +                } +        } else +                srchstr = generalrule; + +        ret = rpcsvc_conn_peer_check_search (options, srchstr, clstr); +        if (volname) +                GF_FREE (srchstr); + +        if (ret == 0) +                ret = RPCSVC_AUTH_REJECT; +        else +                ret = RPCSVC_AUTH_DONTCARE; +out: +        return ret; +} + + +/* This function tests the results of the allow rule and the reject rule to + * combine them into a single result that can be used to determine if the + * connection should be allowed to proceed. + * Heres the test matrix we need to follow in this function. + * + * A -  Allow, the result of the allow test. Never returns R. + * R - Reject, result of the reject test. Never returns A. + * Both can return D or dont care if no rule was given. + * + * | @allow | @reject | Result | + * |    A   |   R     | R      | + * |    D   |   D     | D      | + * |    A   |   D     | A      | + * |    D   |   R     | R      | + */ +int +rpcsvc_combine_allow_reject_volume_check (int allow, int reject) +{ +        int     final = RPCSVC_AUTH_REJECT; + +        /* If allowed rule allows but reject rule rejects, we stay cautious +         * and reject. */ +        if ((allow == RPCSVC_AUTH_ACCEPT) && (reject == RPCSVC_AUTH_REJECT)) +                final = RPCSVC_AUTH_REJECT; +        /* if both are dont care, that is user did not specify for either allow +         * or reject, we leave it up to the general rule to apply, in the hope +         * that there is one. +         */ +        else if ((allow == RPCSVC_AUTH_DONTCARE) && +                 (reject == RPCSVC_AUTH_DONTCARE)) +                final = RPCSVC_AUTH_DONTCARE; +        /* If one is dont care, the other one applies. */ +        else if ((allow == RPCSVC_AUTH_ACCEPT) && +                 (reject == RPCSVC_AUTH_DONTCARE)) +                final = RPCSVC_AUTH_ACCEPT; +        else if ((allow == RPCSVC_AUTH_DONTCARE) && +                 (reject == RPCSVC_AUTH_REJECT)) +                final = RPCSVC_AUTH_REJECT; + +        return final; +} + + +/* Combines the result of the general rule test against, the specific rule + * to determine final permission for the client's address. + * + * | @gen   | @spec   | Result | + * |    A   |   A     | A      | + * |    A   |   R     | R      | + * |    A   |   D     | A      | + * |    D   |   A     | A      | + * |    D   |   R     | R      | + * |    D   |   D     | D      | + * |    R   |   A     | A      | + * |    R   |   D     | R      | + * |    R   |   R     | R      | + */ +int +rpcsvc_combine_gen_spec_addr_checks (int gen, int spec) +{ +        int     final = RPCSVC_AUTH_REJECT; + +        if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_ACCEPT)) +                final = RPCSVC_AUTH_ACCEPT; +        else if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_REJECT)) +                final = RPCSVC_AUTH_REJECT; +        else if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_DONTCARE)) +                final = RPCSVC_AUTH_ACCEPT; +        else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec == RPCSVC_AUTH_ACCEPT)) +                final = RPCSVC_AUTH_ACCEPT; +        else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec == RPCSVC_AUTH_REJECT)) +                final = RPCSVC_AUTH_REJECT; +        else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec== RPCSVC_AUTH_DONTCARE)) +                final = RPCSVC_AUTH_DONTCARE; +        else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_ACCEPT)) +                final = RPCSVC_AUTH_ACCEPT; +        else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_DONTCARE)) +                final = RPCSVC_AUTH_REJECT; +        else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_REJECT)) +                final = RPCSVC_AUTH_REJECT; + +        return final; +} + + + +/* Combines the result of the general rule test against, the specific rule + * to determine final test for the connection coming in for a given volume. + * + * | @gen   | @spec   | Result | + * |    A   |   A     | A      | + * |    A   |   R     | R      | + * |    A   |   D     | A      | + * |    D   |   A     | A      | + * |    D   |   R     | R      | + * |    D   |   D     | R      |, special case, we intentionally disallow this. + * |    R   |   A     | A      | + * |    R   |   D     | R      | + * |    R   |   R     | R      | + */ +int +rpcsvc_combine_gen_spec_volume_checks (int gen, int spec) +{ +        int     final = RPCSVC_AUTH_REJECT; + +        if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_ACCEPT)) +                final = RPCSVC_AUTH_ACCEPT; +        else if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_REJECT)) +                final = RPCSVC_AUTH_REJECT; +        else if ((gen == RPCSVC_AUTH_ACCEPT) && (spec == RPCSVC_AUTH_DONTCARE)) +                final = RPCSVC_AUTH_ACCEPT; +        else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec == RPCSVC_AUTH_ACCEPT)) +                final = RPCSVC_AUTH_ACCEPT; +        else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec == RPCSVC_AUTH_REJECT)) +                final = RPCSVC_AUTH_REJECT; +        /* On no rule, we reject. */ +        else if ((gen == RPCSVC_AUTH_DONTCARE) && (spec== RPCSVC_AUTH_DONTCARE)) +                final = RPCSVC_AUTH_REJECT; +        else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_ACCEPT)) +                final = RPCSVC_AUTH_ACCEPT; +        else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_DONTCARE)) +                final = RPCSVC_AUTH_REJECT; +        else if ((gen == RPCSVC_AUTH_REJECT) && (spec == RPCSVC_AUTH_REJECT)) +                final = RPCSVC_AUTH_REJECT; + +        return final; +} + + +int +rpcsvc_conn_peer_check_name (dict_t *options, char *volname, +                             rpcsvc_conn_t *conn) +{ +        int     ret = RPCSVC_AUTH_REJECT; +        int     aret = RPCSVC_AUTH_REJECT; +        int     rjret = RPCSVC_AUTH_REJECT; +        char    clstr[RPCSVC_PEER_STRLEN]; + +        if (!conn) +                return ret; + +        ret = rpcsvc_conn_peername (conn, clstr, RPCSVC_PEER_STRLEN); +        if (ret != 0) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get remote addr: " +                        "%s", gai_strerror (ret)); +                ret = RPCSVC_AUTH_REJECT; +                goto err; +        } + +        aret = rpcsvc_conn_peer_check_allow (options, volname, clstr); +        rjret = rpcsvc_conn_peer_check_reject (options, volname, clstr); + +        ret = rpcsvc_combine_allow_reject_volume_check (aret, rjret); + +err: +        return ret; +} + + +int +rpcsvc_conn_peer_check_addr (dict_t *options, char *volname,rpcsvc_conn_t *conn) +{ +        int     ret = RPCSVC_AUTH_REJECT; +        int     aret = RPCSVC_AUTH_DONTCARE; +        int     rjret = RPCSVC_AUTH_REJECT; +        char    clstr[RPCSVC_PEER_STRLEN]; + +        if (!conn) +                return ret; + +        ret = rpcsvc_conn_peeraddr (conn, clstr, RPCSVC_PEER_STRLEN, NULL, 0); +        if (ret != 0) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get remote addr: " +                        "%s", gai_strerror (ret)); +                ret = RPCSVC_AUTH_REJECT; +                goto err; +        } + +        aret = rpcsvc_conn_peer_check_allow (options, volname, clstr); +        rjret = rpcsvc_conn_peer_check_reject (options, volname, clstr); + +        ret = rpcsvc_combine_allow_reject_volume_check (aret, rjret); +err: +        return ret; +} + + +int +rpcsvc_conn_check_volume_specific (dict_t *options, char *volname, +                                   rpcsvc_conn_t *conn) +{ +        int             namechk = RPCSVC_AUTH_REJECT; +        int             addrchk = RPCSVC_AUTH_REJECT; +        gf_boolean_t    namelookup = _gf_true; +        char            *namestr = NULL; +        int             ret = 0; + +        if ((!options) || (!volname) || (!conn)) +                return RPCSVC_AUTH_REJECT; + +        /* Enabled by default */ +        if ((dict_get (options, "rpc-auth.addr.namelookup"))) { +                ret = dict_get_str (options, "rpc-auth.addr.namelookup" +                                    , &namestr); +                if (ret == 0) +                        ret = gf_string2boolean (namestr, &namelookup); +        } + +        /* We need two separate checks because the rules with addresses in them +         * can be network addresses which can be general and names can be +         * specific which will over-ride the network address rules. +         */ +        if (namelookup) +                namechk = rpcsvc_conn_peer_check_name (options, volname, conn); +        addrchk = rpcsvc_conn_peer_check_addr (options, volname, conn); + +        if (namelookup) +                ret = rpcsvc_combine_gen_spec_addr_checks (addrchk, namechk); +        else +                ret = addrchk; + +        return ret; +} + + +int +rpcsvc_conn_check_volume_general (dict_t *options, rpcsvc_conn_t *conn) +{ +        int             addrchk = RPCSVC_AUTH_REJECT; +        int             namechk = RPCSVC_AUTH_REJECT; +        gf_boolean_t    namelookup = _gf_true; +        char            *namestr = NULL; +        int             ret = 0; + +        if ((!options) || (!conn)) +                return RPCSVC_AUTH_REJECT; + +        /* Enabled by default */ +        if ((dict_get (options, "rpc-auth.addr.namelookup"))) { +                ret = dict_get_str (options, "rpc-auth.addr.namelookup" +                                    , &namestr); +                if (ret == 0) +                        ret = gf_string2boolean (namestr, &namelookup); +        } + +        /* We need two separate checks because the rules with addresses in them +         * can be network addresses which can be general and names can be +         * specific which will over-ride the network address rules. +         */ +        if (namelookup) +                namechk = rpcsvc_conn_peer_check_name (options, NULL, conn); +        addrchk = rpcsvc_conn_peer_check_addr (options, NULL, conn); + +        if (namelookup) +                ret = rpcsvc_combine_gen_spec_addr_checks (addrchk, namechk); +        else +                ret = addrchk; + +        return ret; +} + +int +rpcsvc_conn_peer_check (dict_t *options, char *volname, rpcsvc_conn_t *conn) +{ +        int     general_chk = RPCSVC_AUTH_REJECT; +        int     specific_chk = RPCSVC_AUTH_REJECT; + +        if ((!options) || (!volname) || (!conn)) +                return RPCSVC_AUTH_REJECT; + +        general_chk = rpcsvc_conn_check_volume_general (options, conn); +        specific_chk = rpcsvc_conn_check_volume_specific (options, volname, +                                                          conn); + +        return rpcsvc_combine_gen_spec_volume_checks (general_chk,specific_chk); +} + + +char * +rpcsvc_volume_allowed (dict_t *options, char *volname) +{ +        char    globalrule[] = "rpc-auth.addr.allow"; +        char    *srchstr = NULL; +        char    *addrstr = NULL; +        int     ret = -1; + +        if ((!options) || (!volname)) +                return NULL; + +        ret = gf_asprintf (&srchstr, "rpc-auth.addr.%s.allow", volname); +        if (ret == -1) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); +                goto out; +        } + +        if (!dict_get (options, srchstr)) { +                GF_FREE (srchstr); +                srchstr = globalrule; +                ret = dict_get_str (options, srchstr, &addrstr); +        } else +                ret = dict_get_str (options, srchstr, &addrstr); + +out: +        return addrstr; +} + + + +/* Initialize the core of a connection */ +rpcsvc_conn_t * +rpcsvc_conn_alloc (rpcsvc_t *svc, rpc_transport_t *trans) +{ +        rpcsvc_conn_t  *conn = NULL; +        int             ret = -1; +        unsigned int    poolcount = 0; + +        conn = GF_CALLOC (1, sizeof(*conn), gf_common_mt_rpcsvc_conn_t); +        if (!conn) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "memory allocation failed"); +                return NULL; +        } + +        conn->trans = trans; +        conn->svc   = svc; +        poolcount   = RPCSVC_POOLCOUNT_MULT * svc->memfactor; + +        gf_log (GF_RPCSVC, GF_LOG_TRACE, "rx pool: %d", poolcount); +        conn->rxpool = mem_pool_new (rpcsvc_request_t, poolcount); +        /* TODO: leak */ +        if (!conn->rxpool) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "mem pool allocation failed"); +                goto free_conn; +        } + +        /* Cannot consider a connection connected unless the user of this +         * connection decides it is ready to use. It is possible that we have +         * to free this connection soon after. That free will not happpen +         * unless the state is disconnected. +         */ +        conn->connstate = RPCSVC_CONNSTATE_DISCONNECTED; +        pthread_mutex_init (&conn->connlock, NULL); +        conn->connref = 0; + +        ret = 0; + +free_conn: +        if (ret == -1) { +                GF_FREE (conn); +                conn = NULL; +        } + +        return conn; +} + +int +rpcsvc_notify (rpc_transport_t *trans, void *mydata, +               rpc_transport_event_t event, void *data, ...); + +void +rpcsvc_conn_state_init (rpcsvc_conn_t *conn) +{ +        if (!conn) +                return; + +        ++conn->connref; +        conn->connstate = RPCSVC_CONNSTATE_CONNECTED; +} + + +rpcsvc_notify_wrapper_t * +rpcsvc_notify_wrapper_alloc (void) +{ +        rpcsvc_notify_wrapper_t *wrapper = NULL; + +        wrapper = GF_CALLOC (1, sizeof (*wrapper), gf_common_mt_rpcsvc_wrapper_t); +        if (!wrapper) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "memory allocation failed"); +                goto out; +        } + +        INIT_LIST_HEAD (&wrapper->list); +out: +        return wrapper; +} + + +void +rpcsvc_listener_destroy (rpcsvc_listener_t *listener) +{ +        rpcsvc_t *svc = NULL; + +        if (!listener) { +                goto out; +        } + +        if (!listener->conn) { +                goto listener_free; +        } + +        svc = listener->conn->svc; +        if (!svc) { +                goto listener_free; +        } + +        pthread_mutex_lock (&svc->rpclock); +        { +                list_del_init (&listener->list); +        } +        pthread_mutex_unlock (&svc->rpclock); + +listener_free: +        GF_FREE (listener); +out: +        return; +} + + +void +rpcsvc_conn_destroy (rpcsvc_conn_t *conn) +{ +        rpcsvc_notify_wrapper_t *wrapper  = NULL; +        rpcsvc_event_t           event    = 0; +        rpcsvc_listener_t       *listener = NULL; +        rpcsvc_t                *svc      = NULL; +        rpcsvc_notify_wrapper_t *wrappers = NULL; +        int                      i        = 0, wrapper_count = 0; + +        if (!conn) +                goto out; + +        mem_pool_destroy (conn->rxpool); + +        listener = conn->listener; +        if (!listener) +                goto out; + +        event = (listener->conn == conn) ? RPCSVC_EVENT_LISTENER_DEAD +                : RPCSVC_EVENT_DISCONNECT; + +        svc = conn->svc; +        if (!svc) +                goto out; + +        pthread_mutex_lock (&svc->rpclock); +        { +                wrappers = GF_CALLOC (svc->notify_count, sizeof (*wrapper), +                                      gf_common_mt_rpcsvc_wrapper_t); +                if (!wrappers) { +                        goto unlock; +                } + +                list_for_each_entry (wrapper, &conn->listener->list, +                                     list) { +                        if (wrapper->notify) { +                                wrappers[i++] = *wrapper; +                        } +                } +                wrapper_count = i; +        } +unlock: +        pthread_mutex_unlock (&svc->rpclock); + +        if (wrappers) { +                for (i = 0; i < wrapper_count; i++) { +                        wrappers[i].notify (conn->svc, wrappers[i].data, +                                            event, conn); +                } + +                GF_FREE (wrappers); +        } + +        if (listener->conn == conn) { +                rpcsvc_listener_destroy (listener); +        } + +        /* Need to destory record state, txlists etc. */ +        GF_FREE (conn); +out: +        gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Connection destroyed"); +} + + +rpcsvc_conn_t * +rpcsvc_conn_init (rpcsvc_t *svc, rpc_transport_t *trans) +{ +        int            ret  = -1; +        rpcsvc_conn_t *conn = NULL; + +        conn = rpcsvc_conn_alloc (svc, trans); +        if (!conn) { +                ret = -1; +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "cannot init a connection"); +                goto out; +        } + +        ret = rpc_transport_register_notify (trans, rpcsvc_notify, conn); +        if (ret == -1) { +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "registering notify failed"); +                rpcsvc_conn_destroy (conn); +                conn = NULL; +                goto out; +        } + +        rpcsvc_conn_state_init (conn); + +out: +        return conn; +} + + +int +__rpcsvc_conn_unref (rpcsvc_conn_t *conn) +{ +        --conn->connref; +        return conn->connref; +} + + +void +__rpcsvc_conn_deinit (rpcsvc_conn_t *conn) +{ +        if (!conn) +                return; + +        if (rpcsvc_conn_check_active (conn)) { +                conn->connstate = RPCSVC_CONNSTATE_DISCONNECTED; +        } + +        if (conn->trans) { +                rpc_transport_disconnect (conn->trans); +                conn->trans = NULL; +        } +} + + +void +rpcsvc_conn_deinit (rpcsvc_conn_t *conn) +{ +        int ref = 0; + +        if (!conn) +                return; + +        pthread_mutex_lock (&conn->connlock); +        { +                __rpcsvc_conn_deinit (conn); +                ref = __rpcsvc_conn_unref (conn); +        } +        pthread_mutex_unlock (&conn->connlock); + +        if (ref == 0) +                rpcsvc_conn_destroy (conn); + +        return; +} + + +void +rpcsvc_conn_unref (rpcsvc_conn_t *conn) +{ +        int ref = 0; +        if (!conn) +                return; + +        pthread_mutex_lock (&conn->connlock); +        { +                ref = __rpcsvc_conn_unref (conn); +        } +        pthread_mutex_unlock (&conn->connlock); + +        if (ref == 0) { +                rpcsvc_conn_destroy (conn); +        } +} + + +int +rpcsvc_conn_active (rpcsvc_conn_t *conn) +{ +        int     status = 0; + +        if (!conn) +                return 0; + +        pthread_mutex_lock (&conn->connlock); +        { +                status = rpcsvc_conn_check_active (conn); +        } +        pthread_mutex_unlock (&conn->connlock); + +        return status; +} + + +void +rpcsvc_conn_ref (rpcsvc_conn_t *conn) +{ +        if (!conn) +                return; + +        pthread_mutex_lock (&conn->connlock); +        { +                ++conn->connref; +        } +        pthread_mutex_unlock (&conn->connlock); + +        return; +} + + +int +rpcsvc_conn_privport_check (rpcsvc_t *svc, char *volname, rpcsvc_conn_t *conn) +{ +        struct sockaddr_in      sa; +        int                     ret = RPCSVC_AUTH_REJECT; +        socklen_t               sasize = sizeof (sa); +        char                    *srchstr = NULL; +        char                    *valstr = NULL; +        int                     globalinsecure = RPCSVC_AUTH_REJECT; +        int                     exportinsecure = RPCSVC_AUTH_DONTCARE; +        uint16_t                port = 0; +        gf_boolean_t            insecure = _gf_false; + +        if ((!svc) || (!volname) || (!conn)) +                return ret; + +        ret = rpcsvc_conn_peeraddr (conn, NULL, 0, (struct sockaddr *)&sa, +                                    sasize); +        if (ret != 0) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get peer addr: %s", +                        gai_strerror (ret)); +                ret = RPCSVC_AUTH_REJECT; +                goto err; +        } + +        port = ntohs (sa.sin_port); +        gf_log (GF_RPCSVC, GF_LOG_TRACE, "Client port: %d", (int)port); +        /* If the port is already a privileged one, dont bother with checking +         * options. +         */ +        if (port <= 1024) { +                ret = RPCSVC_AUTH_ACCEPT; +                goto err; +        } + +        /* Disabled by default */ +        if ((dict_get (svc->options, "rpc-auth.ports.insecure"))) { +                ret = dict_get_str (svc->options, "rpc-auth.ports.insecure" +                                    , &srchstr); +                if (ret == 0) { +                        ret = gf_string2boolean (srchstr, &insecure); +                        if (ret == 0) { +                                if (insecure == _gf_true) +                                        globalinsecure = RPCSVC_AUTH_ACCEPT; +                        } else +                                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to" +                                        " read rpc-auth.ports.insecure value"); +                } else +                        gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to" +                                " read rpc-auth.ports.insecure value"); +        } + +        /* Disabled by default */ +        ret = gf_asprintf (&srchstr, "rpc-auth.ports.%s.insecure", volname); +        if (ret == -1) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "asprintf failed"); +                ret = RPCSVC_AUTH_REJECT; +                goto err; +        } + +        if (dict_get (svc->options, srchstr)) { +                ret = dict_get_str (svc->options, srchstr, &valstr); +                if (ret == 0) { +                        ret = gf_string2boolean (srchstr, &insecure); +                        if (ret == 0) { +                                if (insecure == _gf_true) +                                        exportinsecure = RPCSVC_AUTH_ACCEPT; +                                else +                                        exportinsecure = RPCSVC_AUTH_REJECT; +                        } else +                                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to" +                                        " read rpc-auth.ports.insecure value"); +                } else +                        gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to" +                                " read rpc-auth.ports.insecure value"); +        } + +        ret = rpcsvc_combine_gen_spec_volume_checks (globalinsecure, +                                                     exportinsecure); +        if (ret == RPCSVC_AUTH_ACCEPT) +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Unprivileged port allowed"); +        else +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Unprivileged port not" +                        " allowed"); + +err: +        return ret; +} + + +/* This needs to change to returning errors, since + * we need to return RPC specific error messages when some + * of the pointers below are NULL. + */ +rpcsvc_actor_t * +rpcsvc_program_actor (rpcsvc_conn_t *conn, rpcsvc_request_t *req) +{ +        rpcsvc_program_t        *program = NULL; +        int                     err      = SYSTEM_ERR; +        rpcsvc_actor_t          *actor   = NULL; +        rpcsvc_t                *svc     = NULL; +        char                    found    = 0; + +        if ((!conn) || (!req)) +                goto err; + +        svc = conn->svc; +        pthread_mutex_lock (&svc->rpclock); +        { +                list_for_each_entry (program, &svc->programs, program) { +                        if (program->prognum == req->prognum) { +                                err = PROG_MISMATCH; +                        } + +                        if ((program->prognum == req->prognum) +                            && (program->progver == req->progver)) { +                                found = 1; +                                break; +                        } +                } +        } +        pthread_mutex_unlock (&svc->rpclock); + +        if (!found) { +                if (err != PROG_MISMATCH) { +                        gf_log (GF_RPCSVC, GF_LOG_ERROR, +                                "RPC program not available"); +                        err = PROG_UNAVAIL; +                        goto err; +                } + +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC program version not" +                        " available"); +                goto err; +        } +        req->prog = program; +        if (!program->actors) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC System error"); +                err = SYSTEM_ERR; +                goto err; +        } + +        if ((req->procnum < 0) || (req->procnum >= program->numactors)) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC Program procedure not" +                        " available"); +                err = PROC_UNAVAIL; +                goto err; +        } + +        actor = &program->actors[req->procnum]; +        if (!actor->actor) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC Program procedure not" +                        " available"); +                err = PROC_UNAVAIL; +                actor = NULL; +                goto err; +        } + +        err = SUCCESS; +        gf_log (GF_RPCSVC, GF_LOG_TRACE, "Actor found: %s - %s", +                program->progname, actor->procname); +err: +        if (req) +                req->rpc_err = err; + +        return actor; +} + + +/* this procedure can only pass 4 arguments to registered notifyfn. To send more + * arguements call wrapper->notify directly. + */ +inline void +rpcsvc_program_notify (rpcsvc_listener_t *listener, rpcsvc_event_t event, +                       void *data) +{ +        rpcsvc_notify_wrapper_t *wrapper = NULL; + +        if (!listener) { +                goto out; +        } + +        list_for_each_entry (wrapper, &listener->list, list) { +                if (wrapper->notify) { +                        wrapper->notify (listener->conn->svc, +                                         wrapper->data, +                                         event, data); +                } +        } + +out: +        return; +} + + +int +rpcsvc_accept (rpcsvc_conn_t *listen_conn, rpc_transport_t *new_trans) +{ +        rpcsvc_listener_t *listener = NULL; +        rpcsvc_conn_t     *conn     = NULL; +        char               clstr[RPCSVC_PEER_STRLEN]; + +        listener = listen_conn->listener; +        conn = rpcsvc_conn_init (listen_conn->svc, new_trans); +        if (!conn) { +                rpc_transport_disconnect (new_trans); +                memset (clstr, 0, RPCSVC_PEER_STRLEN); +                rpc_transport_get_peername (new_trans, clstr, +                                            RPCSVC_PEER_STRLEN); +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "allocating connection for " +                        "new transport (%s) failed", clstr); +                goto out; +        } + +        conn->listener = listener; + +        //rpcsvc_program_notify (listener, RPCSVC_EVENT_ACCEPT, conn); +out: +        return 0; +} + + +void +rpcsvc_request_destroy (rpcsvc_conn_t *conn, rpcsvc_request_t *req) +{ +        if (!conn || !req) { +                goto out; +        } + +        if (req->recordiob) { +                iobuf_unref (req->recordiob); +        } + +        if (req->vectorediob) { +                iobuf_unref (req->vectorediob); +        } + +        mem_put (conn->rxpool, req); +out: +        return; +} + + +rpcsvc_request_t * +rpcsvc_request_init (rpcsvc_conn_t *conn, struct rpc_msg *callmsg, +                     struct iovec progmsg, rpc_transport_pollin_t *msg, +                     rpcsvc_request_t *req) +{ +        if ((!conn) || (!callmsg)|| (!req) || (!msg)) +                return NULL; + +        /* We start a RPC request as always denied. */ +        req->rpc_status = MSG_DENIED; +        req->xid = rpc_call_xid (callmsg); +        req->prognum = rpc_call_program (callmsg); +        req->progver = rpc_call_progver (callmsg); +        req->procnum = rpc_call_progproc (callmsg); +        req->conn = conn; +        req->msg[0] = progmsg; +        if (msg->vectored) { +                req->msg[1].iov_base = iobuf_ptr (msg->data.vector.iobuf2); +                req->msg[1].iov_len = msg->data.vector.size2; + +                req->recordiob = iobuf_ref (msg->data.vector.iobuf1); +                req->vectorediob = iobuf_ref (msg->data.vector.iobuf2); +        } else { +                req->recordiob = iobuf_ref (msg->data.simple.iobuf); +        } + +        req->trans_private = msg->private; + +        INIT_LIST_HEAD (&req->txlist); +        req->payloadsize = 0; + +        /* By this time, the data bytes for the auth scheme would have already +         * been copied into the required sections of the req structure, +         * we just need to fill in the meta-data about it now. +         */ +        req->cred.flavour = rpc_call_cred_flavour (callmsg); +        req->cred.datalen = rpc_call_cred_len (callmsg); +        req->verf.flavour = rpc_call_verf_flavour (callmsg); +        req->verf.datalen = rpc_call_verf_len (callmsg); + +        /* AUTH */ +        rpcsvc_auth_request_init (req); +        return req; +} + + +rpcsvc_request_t * +rpcsvc_request_create (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg) +{ +        char                    *msgbuf = NULL; +        struct rpc_msg          rpcmsg; +        struct iovec            progmsg;        /* RPC Program payload */ +        rpcsvc_request_t        *req    = NULL; +        size_t                  msglen  = 0; +        int                     ret     = -1; + +        if (!conn) +                return NULL; + +        /* We need to allocate the request before actually calling +         * rpcsvc_request_init on the request so that we, can fill the auth +         * data directly into the request structure from the message iobuf. +         * This avoids a need to keep a temp buffer into which the auth data +         * would've been copied otherwise. +         */ +        rpcsvc_alloc_request (conn, req); +        if (!req) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to alloc request"); +                goto err; +        } + +        if (msg->vectored) { +                msgbuf = iobuf_ptr (msg->data.vector.iobuf1); +                msglen = msg->data.vector.size1; +        } else { +                msgbuf = iobuf_ptr (msg->data.simple.iobuf); +                msglen = msg->data.simple.size; +        } + +        ret = xdr_to_rpc_call (msgbuf, msglen, &rpcmsg, &progmsg, +                               req->cred.authdata,req->verf.authdata); + +        if (ret == -1) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC call decoding failed"); +                rpcsvc_request_seterr (req, GARBAGE_ARGS); +                goto err; +        } + +        ret = -1; +        rpcsvc_request_init (conn, &rpcmsg, progmsg, msg, req); + +        gf_log (GF_RPCSVC, GF_LOG_TRACE, "RPC XID: %lx, Ver: %ld, Program: %ld," +                " ProgVers: %ld, Proc: %ld", rpc_call_xid (&rpcmsg), +                rpc_call_rpcvers (&rpcmsg), rpc_call_program (&rpcmsg), +                rpc_call_progver (&rpcmsg), rpc_call_progproc (&rpcmsg)); + +        if (rpc_call_rpcvers (&rpcmsg) != 2) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "RPC version not supported"); +                rpcsvc_request_seterr (req, RPC_MISMATCH); +                goto err; +        } + +        ret = rpcsvc_authenticate (req); +        if (ret == RPCSVC_AUTH_REJECT) { +                /* No need to set auth_err, that is the responsibility of +                 * the authentication handler since only that know what exact +                 * error happened. +                 */ +                rpcsvc_request_seterr (req, AUTH_ERROR); +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed authentication"); +                ret = -1; +                goto err; +        } + + +        /* If the error is not RPC_MISMATCH, we consider the call as accepted +         * since we are not handling authentication failures for now. +         */ +        req->rpc_status = MSG_ACCEPTED; +        ret = 0; +err: +        if (ret == -1) { +                ret = rpcsvc_error_reply (req); +                req = NULL; +        } + +        return req; +} + + +int +rpcsvc_handle_rpc_call (rpcsvc_conn_t *conn, rpc_transport_pollin_t *msg) +{ +        rpcsvc_actor_t          *actor = NULL; +        rpcsvc_request_t        *req = NULL; +        int                     ret = -1; + +        if (!conn) +                return -1; + +        req = rpcsvc_request_create (conn, msg); +        if (!req) +                goto err; + +        if (!rpcsvc_request_accepted (req)) +                goto err_reply; + +        actor = rpcsvc_program_actor (conn, req); +        if (!actor) +                goto err_reply; + +        if (actor) { +                if (req->vectorediob) { +                        if (actor->vector_actor) { +                                rpcsvc_conn_ref (conn); +                                ret = actor->vector_actor (req, +                                                           req->vectorediob); +                        } else { +                                rpcsvc_request_seterr (req, PROC_UNAVAIL); +                                gf_log (GF_RPCSVC, GF_LOG_ERROR, +                                        "No vectored handler present"); +                                ret = RPCSVC_ACTOR_ERROR; +                        } +                } else if (actor->actor) { +                        rpcsvc_conn_ref (req->conn); +                        /* Before going to xlator code, set the THIS properly */ +                        THIS = conn->svc->mydata; +                        ret = actor->actor (req); +                } +        } + +err_reply: +        if (ret == RPCSVC_ACTOR_ERROR) +                ret = rpcsvc_error_reply (req); + +        /* No need to propagate error beyond this function since the reply +         * has now been queued. */ +        ret = 0; +err: +        return ret; +} + + +int +rpcsvc_notify (rpc_transport_t *trans, void *mydata, +               rpc_transport_event_t event, void *data, ...) +{ +        rpcsvc_conn_t          *conn      = NULL; +        rpcsvc_t               *svc       = NULL; +        int                     ret       = -1; +        rpc_transport_pollin_t *msg       = NULL; +        rpc_transport_t        *new_trans = NULL; + +        conn = mydata; +        if (conn == NULL) { +                goto out; +        } + +        svc = conn->svc; + +        switch (event) { +        case RPC_TRANSPORT_ACCEPT: +                new_trans = data; +                ret = rpcsvc_accept (conn, new_trans); +                break; + +        case RPC_TRANSPORT_DISCONNECT: +                //rpcsvc_conn_deinit (conn); +                ret = 0; +                break; + +        case RPC_TRANSPORT_MSG_RECEIVED: +                msg = data; +                ret = rpcsvc_handle_rpc_call (conn, msg); +                break; + +        case RPC_TRANSPORT_MSG_SENT: +                ret = 0; +                break; + +        case RPC_TRANSPORT_CONNECT: +                /* do nothing, no need for rpcsvc to handle this, client should +                 * handle this event +                 */ +                gf_log ("rpcsvc", GF_LOG_CRITICAL, +                        "got CONNECT event, which should have not come"); +                ret = 0; +                break; + +        case RPC_TRANSPORT_CLEANUP: +                /* FIXME: think about this later */ +                ret = 0; +                break; + +        case RPC_TRANSPORT_MAP_XID_REQUEST: +                /* FIXME: think about this later */ +                gf_log ("rpcsvc", GF_LOG_CRITICAL, +                        "got MAP_XID event, which should have not come"); +                ret = 0; +                break; +        } + +out: +        return ret; +} + + +void +rpcsvc_set_lastfrag (uint32_t *fragsize) { +        (*fragsize) |= 0x80000000U; +} + +void +rpcsvc_set_frag_header_size (uint32_t size, char *haddr) +{ +        size = htonl (size); +        memcpy (haddr, &size, sizeof (size)); +} + +void +rpcsvc_set_last_frag_header_size (uint32_t size, char *haddr) +{ +        rpcsvc_set_lastfrag (&size); +        rpcsvc_set_frag_header_size (size, haddr); +} + + +/* Given the RPC reply structure and the payload handed by the RPC program, + * encode the RPC record header into the buffer pointed by recordstart. + */ +struct iovec +rpcsvc_record_build_header (char *recordstart, size_t rlen, +                            struct rpc_msg reply, size_t payload) +{ +        struct iovec    replyhdr; +        struct iovec    txrecord = {0, 0}; +        size_t          fraglen = 0; +        int             ret = -1; + +        /* After leaving aside the 4 bytes for the fragment header, lets +         * encode the RPC reply structure into the buffer given to us. +         */ +        ret = rpc_reply_to_xdr (&reply,(recordstart + RPCSVC_FRAGHDR_SIZE), +                                rlen, &replyhdr); +        if (ret == -1) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to create RPC reply"); +                goto err; +        } + +        fraglen = payload + replyhdr.iov_len; +        gf_log (GF_RPCSVC, GF_LOG_TRACE, "Reply fraglen %zu, payload: %zu, " +                "rpc hdr: %zu", fraglen, payload, replyhdr.iov_len); + +        /* Since we're not spreading RPC records over mutiple fragments +         * we just set this fragment as the first and last fragment for this +         * record. +         */ +        rpcsvc_set_last_frag_header_size (fraglen, recordstart); + +        /* Even though the RPC record starts at recordstart+RPCSVC_FRAGHDR_SIZE +         * we need to transmit the record with the fragment header, which starts +         * at recordstart. +         */ +        txrecord.iov_base = recordstart; + +        /* Remember, this is only the vec for the RPC header and does not +         * include the payload above. We needed the payload only to calculate +         * the size of the full fragment. This size is sent in the fragment +         * header. +         */ +        txrecord.iov_len = RPCSVC_FRAGHDR_SIZE + replyhdr.iov_len; +err: +        return txrecord; +} + + +int +rpcsvc_conn_submit (rpcsvc_conn_t *conn, struct iovec *hdrvec, +                    int hdrcount, struct iovec *proghdr, int proghdrcount, +                    struct iovec *progpayload, int progpayloadcount, +                    struct iobref *iobref, void *priv) +{ +        int                   ret   = -1; +        rpc_transport_reply_t reply = {{0, }}; + +        if ((!conn) || (!hdrvec) || (!hdrvec->iov_base) || (!conn->trans)) { +                goto out; +        } + +        reply.msg.rpchdr = hdrvec; +        reply.msg.rpchdrcount = hdrcount; +        reply.msg.proghdr = proghdr; +        reply.msg.proghdrcount = proghdrcount; +        reply.msg.progpayload = progpayload; +        reply.msg.progpayloadcount = progpayloadcount; +        reply.msg.iobref = iobref; +        reply.private = priv; + +        /* Now that we have both the RPC and Program buffers in xdr format +         * lets hand it to the transmission layer. +         */ +        if (!rpcsvc_conn_check_active (conn)) { +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Connection inactive"); +                goto out; +        } + +        ret = rpc_transport_submit_reply (conn->trans, &reply); + +out: +        return ret; +} + + +int +rpcsvc_fill_reply (rpcsvc_request_t *req, struct rpc_msg *reply) +{ +        rpcsvc_program_t        *prog = NULL; +        if ((!req) || (!reply)) +                return -1; + +        prog = rpcsvc_request_program (req); +        rpc_fill_empty_reply (reply, req->xid); + +        if (req->rpc_status == MSG_DENIED) +                rpc_fill_denied_reply (reply, req->rpc_err, req->auth_err); +        else if (req->rpc_status == MSG_ACCEPTED) +                rpc_fill_accepted_reply (reply, req->rpc_err, prog->proglowvers, +                                         prog->proghighvers, req->verf.flavour, +                                         req->verf.datalen, +                                         req->verf.authdata); +        else +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Invalid rpc_status value"); + +        return 0; +} + + +/* Given a request and the reply payload, build a reply and encodes the reply + * into a record header. This record header is encoded into the vector pointed + * to be recbuf. + * msgvec is the buffer that points to the payload of the RPC program. + * This buffer can be NULL, if an RPC error reply is being constructed. + * The only reason it is needed here is that in case the buffer is provided, + * we should account for the length of that buffer in the RPC fragment header. + */ +struct iobuf * +rpcsvc_record_build_record (rpcsvc_request_t *req, size_t payload, +                            struct iovec *recbuf) +{ +        struct rpc_msg          reply; +        struct iobuf            *replyiob = NULL; +        char                    *record = NULL; +        struct iovec            recordhdr = {0, }; +        size_t                  pagesize = 0; +        rpcsvc_conn_t           *conn = NULL; +        rpcsvc_t                *svc = NULL; + +        if ((!req) || (!req->conn) || (!recbuf)) +                return NULL; + +        /* First, try to get a pointer into the buffer which the RPC +         * layer can use. +         */ +        conn = req->conn; +        svc = rpcsvc_conn_rpcsvc (conn); +        replyiob = iobuf_get (svc->ctx->iobuf_pool); +        pagesize = iobpool_pagesize ((struct iobuf_pool *)svc->ctx->iobuf_pool); +        if (!replyiob) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to get iobuf"); +                goto err_exit; +        } + +        record = iobuf_ptr (replyiob);  /* Now we have it. */ + +        /* Fill the rpc structure and XDR it into the buffer got above. */ +        rpcsvc_fill_reply (req, &reply); +        recordhdr = rpcsvc_record_build_header (record, pagesize, reply, +                                                payload); +        if (!recordhdr.iov_base) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to build record " +                        " header"); +                iobuf_unref (replyiob); +                replyiob = NULL; +                recbuf->iov_base = NULL; +                goto err_exit; +        } + +        recbuf->iov_base = recordhdr.iov_base; +        recbuf->iov_len = recordhdr.iov_len; +err_exit: +        return replyiob; +} + + +/* + * The function to submit a program message to the RPC service. + * This message is added to the transmission queue of the + * conn. + * + * Program callers are not expected to use the msgvec->iov_base + * address for anything else. + * Nor are they expected to free it once this function returns. + * Once the transmission of the buffer is completed by the RPC service, + * the memory area as referenced through @msg will be unrefed. + * If a higher layer does not want anything to do with this iobuf + * after this function returns, it should call unref on it. For keeping + * it around till the transmission is actually complete, rpcsvc also refs it. + *  * + * If this function returns an error by returning -1, the + * higher layer programs should assume that a disconnection happened + * and should know that the conn memory area as well as the req structure + * has been freed internally. + * + * For now, this function assumes that a submit is always called + * to send a new record. Later, if there is a situation where different + * buffers for the same record come from different sources, then we'll + * need to change this code to account for multiple submit calls adding + * the buffers into a single record. + */ + +int +rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr, +                       int hdrcount, struct iovec *payload, int payloadcount, +                       struct iobref *iobref) +{ +        int                     ret         = -1, i = 0; +        struct iobuf            *replyiob   = NULL; +        struct iovec            recordhdr   = {0, }; +        rpcsvc_conn_t           *conn       = NULL; +        size_t                   msglen     = 0; +        char                     new_iobref = 0; + +        if ((!req) || (!req->conn)) +                return -1; + +        conn = req->conn; + +        for (i = 0; i < hdrcount; i++) { +                msglen += proghdr[i].iov_len; +        } + +        for (i = 0; i < payloadcount; i++) { +                msglen += payload[i].iov_len; +        } + +        gf_log (GF_RPCSVC, GF_LOG_TRACE, "Tx message: %zu", msglen); + +        /* Build the buffer containing the encoded RPC reply. */ +        replyiob = rpcsvc_record_build_record (req, msglen, &recordhdr); +        if (!replyiob) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR,"Reply record creation failed"); +                goto disconnect_exit; +        } + +        if (!iobref) { +                iobref = iobref_new (); +                if (!iobref) { +                        gf_log (GF_RPCSVC, GF_LOG_ERROR, "memory allocation " +                                "failed"); +                        goto disconnect_exit; +                } + +                new_iobref = 1; +        } + +        iobref_add (iobref, replyiob); + +        ret = rpcsvc_conn_submit (conn, &recordhdr, 1, proghdr, hdrcount, +                                  payload, payloadcount, iobref, +                                  req->trans_private); + +        rpcsvc_request_destroy (conn, req); + +        if (ret == -1) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to submit message"); +        } + +disconnect_exit: +        if (replyiob) { +                iobuf_unref (replyiob); +        } + +        if (new_iobref) { +                iobref_unref (iobref); +        } + +        /* Note that a unref is called everytime a reply is sent. This is in +         * response to the ref that is performed on the conn when a request is +         * handed to the RPC program. +         * +         * The catch, however, is that if the reply is an rpc error, we must +         * not unref. This is because the ref only contains +         * references for the actors to which the request was handed plus one +         * reference maintained by the RPC layer. By unrefing for a case where +         * no actor was called, we will be losing the ref held for the RPC +         * layer. +         */ +        if ((rpcsvc_request_accepted (req)) && +            (rpcsvc_request_accepted_success (req))) +                rpcsvc_conn_unref (conn); + +        return ret; +} + + +int +rpcsvc_error_reply (rpcsvc_request_t *req) +{ +        struct iovec    dummyvec = {0, }; + +        if (!req) +                return -1; + +        /* At this point the req should already have been filled with the +         * appropriate RPC error numbers. +         */ +        return rpcsvc_submit_generic (req, &dummyvec, 0, NULL, 0, NULL); +} + + +/* Register the program with the local portmapper service. */ +int +rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, rpcsvc_conn_t *conn) +{ +        int                ret   = 0; +        struct sockaddr_in sa    = {0, }; + +        if (!newprog || !conn->trans) { +                goto out; +        } + +        if (!(pmap_set (newprog->prognum, newprog->progver, IPPROTO_TCP, +                        sa.sin_port))) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Could not register with" +                        " portmap"); +                goto out; +        } + +        ret = 0; +out: +        return ret; +} + + +int +rpcsvc_program_unregister_portmap (rpcsvc_program_t *prog) +{ +        if (!prog) +                return -1; + +        if (!(pmap_unset(prog->prognum, prog->progver))) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Could not unregister with" +                        " portmap"); +                return -1; +        } + +        return 0; +} + + +rpcsvc_listener_t * +rpcsvc_get_listener (rpcsvc_t *svc, uint16_t port) +{ +        rpcsvc_listener_t  *listener = NULL; +        char                found    = 0; + +        if (!svc) { +                goto out; +        } + +        pthread_mutex_lock (&svc->rpclock); +        { +                list_for_each_entry (listener, &svc->listeners, list) { +                        if (((struct sockaddr_in *)&listener->sa)->sin_port +                            == port) { +                                found = 1; +                                break; +                        } +                } +        } +        pthread_mutex_unlock (&svc->rpclock); + +        if (!found) { +                listener = NULL; +        } + +out: +        return listener; +} + + +/* The only difference between the generic submit and this one is that the + * generic submit is also used for submitting RPC error replies in where there + * are no payloads so the msgvec and msgbuf can be NULL. + * Since RPC programs should be using this function along with their payloads + * we must perform NULL checks before calling the generic submit. + */ +int +rpcsvc_submit_message (rpcsvc_request_t *req, struct iovec *proghdr, +                       int hdrcount, struct iovec *payload, int payloadcount, +                       struct iobref *iobref) +{ +        if ((!req) || (!req->conn) || (!proghdr) || (!proghdr->iov_base)) +                return -1; + +        return rpcsvc_submit_generic (req, proghdr, hdrcount, payload, +                                      payloadcount, iobref); +} + + +int +rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t prog) +{ +        int                     ret = -1; + +        if (!svc) +                return -1; + +        /* TODO: De-init the listening connection for this program. */ +        ret = rpcsvc_program_unregister_portmap (&prog); +        if (ret == -1) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "portmap unregistration of" +                        " program failed"); +                goto err; +        } + +        ret = 0; +        gf_log (GF_RPCSVC, GF_LOG_DEBUG, "Program unregistered: %s, Num: %d," +                " Ver: %d, Port: %d", prog.progname, prog.prognum, +                prog.progver, prog.progport); + +err: +        if (ret == -1) +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Program unregistration failed" +                        ": %s, Num: %d, Ver: %d, Port: %d", prog.progname, +                        prog.prognum, prog.progver, prog.progport); + +        return ret; +} + + +int +rpcsvc_conn_peername (rpcsvc_conn_t *conn, char *hostname, int hostlen) +{ +        if (!conn || !conn->trans) +                return -1; + +        return rpc_transport_get_peername (conn->trans, hostname, hostlen); +} + + +int +rpcsvc_conn_peeraddr (rpcsvc_conn_t *conn, char *addrstr, int addrlen, +                      struct sockaddr *sa, socklen_t sasize) +{ +        if (!conn || !conn->trans) +                return -1; + +        return rpc_transport_get_peeraddr(conn->trans, addrstr, addrlen, sa, +                                          sasize); +} + + +rpcsvc_conn_t * +rpcsvc_conn_create (rpcsvc_t *svc, dict_t *options, char *name) +{ +        int            ret   = -1; +        rpc_transport_t   *trans = NULL; +        rpcsvc_conn_t *conn  = NULL; + +        trans = rpc_transport_load (svc->ctx, options, name); +        if (!trans) { +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, "cannot create listener, " +                        "initing the transport failed"); +                goto out; +        } + +        ret = rpc_transport_listen (trans); +        if (ret == -1) { +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, +                        "listening on transport failed"); +                goto out; +        } + +        conn = rpcsvc_conn_init (svc, trans); +        if (!conn) { +                ret = -1; +                gf_log (GF_RPCSVC, GF_LOG_DEBUG, +                        "initializing connection for transport failed"); +                goto out; +        } + +        ret = 0; +out: +        if ((ret == -1) && (trans)) { +                rpc_transport_disconnect (trans); +        } + +        return conn; +} + +rpcsvc_listener_t * +rpcsvc_listener_alloc (rpcsvc_t *svc, rpcsvc_conn_t *conn) +{ +        rpcsvc_listener_t *listener = NULL; +        int                ret      = -1; + +        listener = GF_CALLOC (1, sizeof (*listener), +                              gf_common_mt_rpcsvc_listener_t); +        if (!listener) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "memory allocation failed"); +                goto out; +        } + +        /* TODO: unresolved symbol */ +        ret = rpc_transport_get_myaddr (conn->trans, NULL, 0, +                                        &listener->sa, +                                        sizeof (listener->sa)); +        ret = 0; +        if (ret == -1) { +                GF_FREE (listener); +                listener = NULL; +                goto out; +        } + +        listener->conn = conn; + +        INIT_LIST_HEAD (&listener->list); + +        pthread_mutex_lock (&svc->rpclock); +        { +                list_add_tail (&listener->list, &svc->listeners); +        } +        pthread_mutex_unlock (&svc->rpclock); +out: +        return listener; +} + + +rpcsvc_listener_t * +rpcsvc_create_listener (rpcsvc_t *svc, dict_t *options, char *name) +{ +        rpcsvc_conn_t     *conn     = NULL; +        rpcsvc_listener_t *listener = NULL; + +        if (!svc || !options) { +                goto out; +        } + +        conn = rpcsvc_conn_create (svc, options, name); +        if (!conn) { +                goto out; +        } + +        listener = rpcsvc_listener_alloc (svc, conn); +        if (listener == NULL) { +                goto out; +        } + +        conn->listener = listener; +out: +        if (!listener && conn) { +                rpcsvc_conn_deinit (conn); +        } + +        return listener; +} + + +int +rpcsvc_unregister_notify (rpcsvc_t *svc, rpcsvc_notify_t notify, void *mydata) +{ +        rpcsvc_notify_wrapper_t *wrapper = NULL, *tmp = NULL; +        int                      ret     = 0; + +        if (!svc || !notify) { +                goto out; +        } + +        pthread_mutex_lock (&svc->rpclock); +        { +                list_for_each_entry_safe (wrapper, tmp, &svc->notify, list) { +                        if ((wrapper->notify == notify) +                            && (mydata == wrapper->data)) { +                                list_del_init (&wrapper->list); +                                GF_FREE (wrapper); +                                ret++; +                        } +                } +        } +        pthread_mutex_unlock (&svc->rpclock); + +out: +        return ret; +} + +int +rpcsvc_register_notify (rpcsvc_t *svc, rpcsvc_notify_t notify, void *mydata) +{ +        rpcsvc_notify_wrapper_t *wrapper = NULL; +        int                      ret     = -1; + +        wrapper = rpcsvc_notify_wrapper_alloc (); +        if (!wrapper) { +                goto out; +        } +        svc->mydata   = mydata;  /* this_xlator */ +        wrapper->data = mydata; +        wrapper->notify = notify; + +        pthread_mutex_lock (&svc->rpclock); +        { +                list_add_tail (&wrapper->list, &svc->notify); +                svc->notify_count++; +        } +        pthread_mutex_unlock (&svc->rpclock); + +        ret = 0; +out: +        return ret; +} + + + +int +rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t program) +{ +        rpcsvc_program_t        *newprog       = NULL; +        int                      ret           = -1; +        rpcsvc_listener_t       *listener      = NULL; + +        if (!svc) +                return -1; + +        newprog = GF_CALLOC (1, sizeof(*newprog), gf_common_mt_rpcsvc_program_t); +        if (!newprog) +                return -1; + +        if (!program.actors) +                goto free_prog; + +        memcpy (newprog, &program, sizeof (program)); + +        listener = svc->listener; + +        ret = rpcsvc_program_register_portmap (newprog, listener->conn); +        if (ret == -1) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "portmap registration of" +                        " program failed"); +                goto free_prog; +        } + +        pthread_mutex_lock (&svc->rpclock); +        { +                list_add_tail (&newprog->program, &svc->programs); +        } +        pthread_mutex_unlock (&svc->rpclock); + +        ret = 0; +        gf_log (GF_RPCSVC, GF_LOG_DEBUG, "New program registered: %s, Num: %d," +                " Ver: %d, Port: %d", newprog->progname, newprog->prognum, +                newprog->progver, newprog->progport); + +free_prog: +        if (ret == -1) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Program registration failed:" +                        " %s, Num: %d, Ver: %d, Port: %d", newprog->progname, +                        newprog->prognum, newprog->progver, newprog->progport); +                GF_FREE (newprog); +        } + +        return ret; +} + + +int +rpcsvc_init_options (rpcsvc_t *svc, dict_t *options) +{ +        svc->memfactor = RPCSVC_DEFAULT_MEMFACTOR; +        return 0; +} + + +/* The global RPC service initializer. + */ +rpcsvc_t * +rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options) +{ +        rpcsvc_t          *svc      = NULL; +        int                ret      = -1; +        rpcsvc_listener_t *listener = NULL; + +        if ((!ctx) || (!options)) +                return NULL; + +        svc = GF_CALLOC (1, sizeof (*svc), gf_common_mt_rpcsvc_t); +        if (!svc) +                return NULL; + +        pthread_mutex_init (&svc->rpclock, NULL); +        INIT_LIST_HEAD (&svc->authschemes); +        INIT_LIST_HEAD (&svc->notify); +        INIT_LIST_HEAD (&svc->listeners); +        INIT_LIST_HEAD (&svc->programs); + +        ret = rpcsvc_init_options (svc, options); +        if (ret == -1) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to init options"); +                goto free_svc; +        } + +        ret = rpcsvc_auth_init (svc, options); +        if (ret == -1) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "Failed to init " +                        "authentication"); +                goto free_svc; +        } + +        ret = -1; +        svc->options = options; +        svc->ctx = ctx; +        gf_log (GF_RPCSVC, GF_LOG_DEBUG, "RPC service inited."); + +        /* One listen port per RPC */ +        listener = rpcsvc_get_listener (svc, 0); +        if (!listener) { +                /* FIXME: listener is given the name of first program that +                 * creates it. This is not always correct. For eg., multiple +                 * programs can be listening on the same listener +                 * (glusterfs 3.1.0, 3.1.2, 3.1.3 etc). +                 */ +                listener = rpcsvc_create_listener (svc, options, "RPC"); +                if (!listener) { +                        gf_log (GF_RPCSVC, GF_LOG_ERROR, "creation of listener" +                                " for program failed"); +                        goto free_svc; +                } +        } + +        if (!listener->conn) { +                gf_log (GF_RPCSVC, GF_LOG_ERROR, "listener with no connection " +                        "found"); +                goto free_svc; +        } + +        svc->listener = listener; + +        ret = 0; +free_svc: +        if (ret == -1) { +                GF_FREE (svc); +                svc = NULL; +        } + +        return svc; +} diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h new file mode 100644 index 00000000000..10dc32698ad --- /dev/null +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -0,0 +1,582 @@ +/* +  Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. + +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + +#ifndef _RPCSVC_H +#define _RPCSVC_H + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include "event.h" +#include "rpc-transport.h" +#include "logging.h" +#include "dict.h" +#include "mem-pool.h" +#include "list.h" +#include "iobuf.h" +#include "xdr-rpc.h" +#include "glusterfs.h" +#include "rpcsvc-common.h" + +#include <pthread.h> +#include <sys/uio.h> +#include <inttypes.h> +#include "compat.h" + +#ifndef NGRPS +#define NGRPS 16 +#endif /* !NGRPS */ + +#define GF_RPCSVC       "rpc-service" +#define RPCSVC_THREAD_STACK_SIZE ((size_t)(1024 * GF_UNIT_KB)) + +#define RPCSVC_FRAGHDR_SIZE  4       /* 4-byte RPC fragment header size */ + +#define RPCSVC_DEFAULT_MEMFACTOR        15 +#define RPCSVC_EVENTPOOL_SIZE_MULT      1024 +#define RPCSVC_POOLCOUNT_MULT           35 +#define RPCSVC_CONN_READ        (128 * GF_UNIT_KB) +#define RPCSVC_PAGE_SIZE        (128 * GF_UNIT_KB) + +/* RPC Record States */ +#define RPCSVC_READ_FRAGHDR     1 +#define RPCSVC_READ_FRAG        2 +/* The size in bytes, if crossed by a fragment will be handed over to the + * vectored actor so that it can allocate its buffers the way it wants. + * In our RPC layer, we assume that vectored RPC requests/records are never + * spread over multiple RPC fragments since that prevents us from determining + * whether the record should be handled in RPC layer completely or handed to + * the vectored handler. + */ +#define RPCSVC_VECTORED_FRAGSZ  4096 +#define RPCSVC_VECTOR_READCRED          1003 +#define RPCSVC_VECTOR_READVERFSZ        1004 +#define RPCSVC_VECTOR_READVERF          1005 +#define RPCSVC_VECTOR_IGNORE            1006 +#define RPCSVC_VECTOR_READVEC           1007 +#define RPCSVC_VECTOR_READPROCHDR       1008 + +#define rpcsvc_record_vectored_baremsg(rs) (((rs)->state == RPCSVC_READ_FRAG) && (rs)->vecstate == 0) +#define rpcsvc_record_vectored_cred(rs) ((rs)->vecstate == RPCSVC_VECTOR_READCRED) +#define rpcsvc_record_vectored_verfsz(rs) ((rs)->vecstate == RPCSVC_VECTOR_READVERFSZ) +#define rpcsvc_record_vectored_verfread(rs) ((rs)->vecstate == RPCSVC_VECTOR_READVERF) +#define rpcsvc_record_vectored_ignore(rs) ((rs)->vecstate == RPCSVC_VECTOR_IGNORE) +#define rpcsvc_record_vectored_readvec(rs) ((rs)->vecstate == RPCSVC_VECTOR_READVEC) +#define rpcsvc_record_vectored_readprochdr(rs) ((rs)->vecstate == RPCSVC_VECTOR_READPROCHDR) +#define rpcsvc_record_vectored(rs) ((rs)->fragsize > RPCSVC_VECTORED_FRAGSZ) +/* Includes bytes up to and including the credential length field. The credlen + * will be followed by @credlen bytes of credential data which will have to be + * read separately by the vectored reader. After the credentials comes the + * verifier which will also have to be read separately including the 8 bytes of + * verf flavour and verflen. + */ +#define RPCSVC_BARERPC_MSGSZ    32 +#define rpcsvc_record_readfraghdr(rs)   ((rs)->state == RPCSVC_READ_FRAGHDR) +#define rpcsvc_record_readfrag(rs)      ((rs)->state == RPCSVC_READ_FRAG) + +#define RPCSVC_LOWVERS  2 +#define RPCSVC_HIGHVERS 2 + + +#if 0 +#error "defined in /usr/include/rpc/auth.h" + +#define AUTH_NONE	0		/* no authentication */ +#define	AUTH_NULL	0		/* backward compatibility */ +#define	AUTH_SYS	1		/* unix style (uid, gids) */ +#define	AUTH_UNIX	AUTH_SYS +#define	AUTH_SHORT	2		/* short hand unix style */ +#define AUTH_DES	3		/* des style (encrypted timestamps) */ +#define AUTH_DH		AUTH_DES	/* Diffie-Hellman (this is DES) */ +#define AUTH_KERB       4               /* kerberos style */ +#endif /* */ + +#define AUTH_GLUSTERFS  5 + +typedef struct rpcsvc_program rpcsvc_program_t; + +struct rpcsvc_notify_wrapper { +        struct list_head  list; +        void             *data; +        rpcsvc_notify_t   notify; +}; +typedef struct rpcsvc_notify_wrapper rpcsvc_notify_wrapper_t; + +#define RPCSVC_CONNSTATE_CONNECTED      1 +#define RPCSVC_CONNSTATE_DISCONNECTED   2 + +#define rpcsvc_conn_check_active(conn) ((conn)->connstate==RPCSVC_CONNSTATE_CONNECTED) + +typedef struct rpcsvc_request rpcsvc_request_t; + +typedef struct rpc_conn_state rpcsvc_conn_t; +typedef struct { +        rpcsvc_conn_t   *conn; +        struct sockaddr  sa; +        struct list_head list; +} rpcsvc_listener_t; + +struct rpcsvc_config { +        int    max_block_size; +}; + +/* Contains the state for each connection that is used for transmitting and + * receiving RPC messages. + * + * Anything that can be accessed by a RPC program must be synced through + * connlock. + */ +struct rpc_conn_state { + +        /* Transport or connection state */ +        rpc_transport_t             *trans; + +        rpcsvc_t                *svc; +        /* RPC Records and Fragments assembly state. +         * All incoming data is staged here before being +         * called a full RPC message. +         */ +        /* rpcsvc_record_state_t   rstate; */ + +         /* It is possible that a client disconnects while +         * the higher layer RPC service is busy in a call. +         * In this case, we cannot just free the conn +         * structure, since the higher layer service could +         * still have a reference to it. +         * The refcount avoids freeing until all references +         * have been given up, although the connection is clos()ed at the first +         * call to unref. +         */ +        int                     connref; +        pthread_mutex_t         connlock; +        int                     connstate; + +        /* Memory pool for rpcsvc_request_t */ +        struct mem_pool         *rxpool; + +        /* The request which hasnt yet been handed to the RPC program because +         * this request is being treated as a vector request and so needs some +         * more data to be got from the network. +         */ +        /* rpcsvc_request_t        *vectoredreq; */ +        rpcsvc_listener_t        *listener; +}; + +#define RPCSVC_CONNSTATE_CONNECTED      1 +#define RPCSVC_CONNSTATE_DISCONNECTED   2 + +#define RPCSVC_MAX_AUTH_BYTES   400 +typedef struct rpcsvc_auth_data { +        int             flavour; +        int             datalen; +        char            authdata[RPCSVC_MAX_AUTH_BYTES]; +} rpcsvc_auth_data_t; + +#define rpcsvc_auth_flavour(au)    ((au).flavour) + +/* The container for the RPC call handed up to an actor. + * Dynamically allocated. Lives till the call reply is completely + * transmitted. + * */ +struct rpcsvc_request { +        /* connection over which this request came. */ +        rpcsvc_conn_t         *conn; + +        rpcsvc_program_t      *prog; + +        /* The identifier for the call from client. +         * Needed to pair the reply with the call. +         */ +        uint32_t                xid; + +        int                     prognum; + +        int                     progver; + +        int                     procnum; + +        int                     type; + +        /* Uid and gid filled by the rpc-auth module during the authentication +         * phase. +         */ +        uid_t                   uid; +        gid_t                   gid; +        pid_t                   pid; + +        uint64_t                lk_owner; +        uint64_t                gfs_id; + +        /* Might want to move this to AUTH_UNIX specifix state since this array +         * is not available for every authenticatino scheme. +         */ +        gid_t                   auxgids[NGRPS]; +        int                     auxgidcount; + + +        /* The RPC message payload, contains the data required +         * by the program actors. This is the buffer that will need to +         * be de-xdred by the actor. +         */ +        struct iovec            msg[2]; + +        /* The full message buffer allocated to store the RPC headers. +         * This buffer is ref'd when allocated why RPC svc and unref'd after +         * the buffer is handed to the actor. That means if the actor or any +         * higher layer wants to keep this buffer around, they too must ref it +         * right after entering the program actor. +         */ +        struct iobuf            *recordiob; + +        /* iobuf to hold payload of calls like write. By storing large payloads +         * starting from page-aligned addresses, performance increases while +         * accessing the payload +         */ +        struct iobuf            *vectorediob; + + +        /* Status of the RPC call, whether it was accepted or denied. */ +        int                     rpc_status; + +        /* In case, the call was denied, the RPC error is stored here +         * till the reply is sent. +         */ +        int                     rpc_err; + +        /* In case the failure happened because of an authentication problem +         * , this value needs to be assigned the correct auth error number. +         */ +        int                     auth_err; + +        /* There can be cases of RPC requests where the reply needs to +         * be built from multiple sources. For eg. where even the NFS reply can +         * contain a payload, as in the NFSv3 read reply. Here the RPC header +         * ,NFS header and the read data are brought together separately from +         * different buffers, so we need to stage the buffers temporarily here +         * before all of them get added to the connection's transmission list. +         */ +        struct list_head        txlist; + +        /* While the reply record is being built, this variable keeps track +         * of how many bytes have been added to the record. +         */ +        size_t                  payloadsize; + +        /* The credentials extracted from the rpc request */ +        rpcsvc_auth_data_t      cred; + +        /* The verified extracted from the rpc request. In request side +         * processing this contains the verifier sent by the client, on reply +         * side processing, it is filled with the verified that will be +         * sent to the client. +         */ +        rpcsvc_auth_data_t      verf; + +        /* Container for a RPC program wanting to store a temp +         * request-specific item. +         */ +        void                    *private; + +        /* Container for transport to store request-specific item */ +        void                    *trans_private; +}; + +#define rpcsvc_request_program(req) ((rpcsvc_program_t *)((req)->prog)) +#define rpcsvc_request_program_private(req) (((rpcsvc_program_t *)((req)->program))->private) +#define rpcsvc_request_conn(req)        (req)->conn +#define rpcsvc_request_accepted(req)    ((req)->rpc_status == MSG_ACCEPTED) +#define rpcsvc_request_accepted_success(req) ((req)->rpc_err == SUCCESS) +#define rpcsvc_request_uid(req)         ((req)->uid) +#define rpcsvc_request_gid(req)         ((req)->gid) +#define rpcsvc_conn_rpcsvc(conn)        ((conn)->svc) +#define rpcsvc_request_service(req)     (rpcsvc_conn_rpcsvc(rpcsvc_request_conn(req))) +#define rpcsvc_request_prog_minauth(req) (rpcsvc_request_program(req)->min_auth) +#define rpcsvc_request_cred_flavour(req) (rpcsvc_auth_flavour(req->cred)) +#define rpcsvc_request_verf_flavour(req) (rpcsvc_auth_flavour(req->verf)) + +#define rpcsvc_request_uid(req)         ((req)->uid) +#define rpcsvc_request_gid(req)         ((req)->gid) +#define rpcsvc_request_private(req)     ((req)->private) +#define rpcsvc_request_xid(req)         ((req)->xid) +#define rpcsvc_request_set_private(req,prv)  (req)->private = (void *)(prv) +#define rpcsvc_request_record_iob(rq)   ((rq)->recordiob) +#define rpcsvc_request_record_ref(req)  (iobuf_ref ((req)->recordiob)) +#define rpcsvc_request_record_unref(req) (iobuf_unref ((req)->recordiob)) + + +#define RPCSVC_ACTOR_SUCCESS    0 +#define RPCSVC_ACTOR_ERROR      (-1) + +/* Functor for every type of protocol actor + * must be defined like this. + * + * See the request structure for info on how to handle the request + * in the program actor. + * + * On successful santify checks inside the actor, it should return + * RPCSVC_ACTOR_SUCCESS. + * On an error, on which the RPC layer is expected to return a reply, the actor + * should return RPCSVC_ACTOR_ERROR. + * + */ +typedef int (*rpcsvc_actor) (rpcsvc_request_t *req); +typedef int (*rpcsvc_vector_actor) (rpcsvc_request_t *req, struct iobuf *iob); +typedef int (*rpcsvc_vector_sizer) (rpcsvc_request_t *req, ssize_t *readsize, +                                    int *newiob); + +/* Every protocol actor will also need to specify the function the RPC layer + * will use to serialize or encode the message into XDR format just before + * transmitting on the connection. + */ +typedef void *(*rpcsvc_encode_reply) (void *msg); + +/* Once the reply has been transmitted, the message will have to be de-allocated + * , so every actor will need to provide a function that deallocates the message + * it had allocated as a response. + */ +typedef void (*rpcsvc_deallocate_reply) (void *msg); + + +#define RPCSVC_NAME_MAX            32 +/* The descriptor for each procedure/actor that runs + * over the RPC service. + */ +typedef struct rpcsvc_actor_desc { +        char                    procname[RPCSVC_NAME_MAX]; +        int                     procnum; +        rpcsvc_actor            actor; + +        /* Handler for cases where the RPC requests fragments are large enough +         * to benefit from being decoded into aligned memory addresses. While +         * decoding the request in a non-vectored manner, due to the nature of +         * the XDR scheme, RPC cannot guarantee memory aligned addresses for +         * the resulting message-specific structures. Allowing a specialized +         * handler for letting the RPC program read the data from the network +         * directly into its alligned buffers. +         */ +        rpcsvc_vector_actor     vector_actor; +        rpcsvc_vector_sizer     vector_sizer; + +} rpcsvc_actor_t; + +/* Describes a program and its version along with the function pointers + * required to handle the procedures/actors of each program/version. + * Never changed ever by any thread so no need for a lock. + */ +struct rpcsvc_program { +        char                    progname[RPCSVC_NAME_MAX]; +        int                     prognum; +        int                     progver; +        /* FIXME */ +        dict_t                 *options;        /* An opaque dictionary +                                                 * populated by the program +                                                 * (probably from xl->options) +                                                 * which contain enough +                                                 * information for transport to +                                                 * initialize. As a part of +                                                 * cleanup, the members of +                                                 * options which are of interest +                                                 * to transport should be put +                                                 * into a structure for better +                                                 * readability and structure +                                                 * should replace options member +                                                 * here. +                                                 */ +        uint16_t                progport;       /* Registered with portmap */ +#if 0 +        int                     progaddrfamily; /* AF_INET or AF_INET6 */ +        char                    *proghost;      /* Bind host, can be NULL */ +#endif +        rpcsvc_actor_t          *actors;        /* All procedure handlers */ +        int                     numactors;      /* Num actors in actor array */ +        int                     proghighvers;   /* Highest ver for program +                                                   supported by the system. */ +        int                     proglowvers;    /* Lowest ver */ + +        /* Program specific state handed to actors */ +        void                    *private; + + +        /* This upcall is provided by the program during registration. +         * It is used to notify the program about events like connection being +         * destroyed etc. The rpc program may take appropriate actions, for eg., +         * in the case of connection being destroyed, it should cleanup its +         * state stored in the connection. +         */ +        rpcsvc_notify_t         notify; + +        /* An integer that identifies the min auth strength that is required +         * by this protocol, for eg. MOUNT3 needs AUTH_UNIX at least. +         * See RFC 1813, Section 5.2.1. +         */ +        int                     min_auth; + +        /* list member to link to list of registered services with rpcsvc */ +        struct list_head        program; +}; + + +/* All users of RPC services should use this API to register their + * procedure handlers. + */ +extern int +rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t program); + +extern int +rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t program); + +/* This will create and add a listener to listener pool. Programs can + * use any of the listener in this pool. A single listener can be used by + * multiple programs and vice versa. There can also be a one to one mapping + * between a program and a listener. After registering a program with rpcsvc, + * the program has to be associated with a listener using + * rpcsvc_program_register_portmap. + */ +/* FIXME: can multiple programs registered on same port? */ +extern rpcsvc_listener_t * +rpcsvc_create_listener (rpcsvc_t *svc, dict_t *options, char *name); + +extern int +rpcsvc_program_register_portmap (rpcsvc_program_t *newprog, +                                 rpcsvc_conn_t *conn); + +/* Inits the global RPC service data structures. + * Called in main. + */ +extern rpcsvc_t * +rpcsvc_init (glusterfs_ctx_t *ctx, dict_t *options); + +int +rpcsvc_register_notify (rpcsvc_t *svc, rpcsvc_notify_t notify, void *mydata); + +/* unregister a notification callback @notify with data @mydata from svc. + * returns the number of notification callbacks unregistered. + */ +int +rpcsvc_unregister_notify (rpcsvc_t *svc, rpcsvc_notify_t notify, void *mydata); + +int +rpcsvc_submit_message (rpcsvc_request_t *req, struct iovec *proghdr, +                       int hdrcount, struct iovec *payload, int payloadcount, +                       struct iobref *iobref); + +int +rpcsvc_submit_generic (rpcsvc_request_t *req, struct iovec *proghdr, +                       int hdrcount, struct iovec *payload, int payloadcount, +                       struct iobref *iobref); + +extern int +rpcsvc_error_reply (rpcsvc_request_t *req); + +#define RPCSVC_PEER_STRLEN      1024 +#define RPCSVC_AUTH_ACCEPT      1 +#define RPCSVC_AUTH_REJECT      2 +#define RPCSVC_AUTH_DONTCARE    3 + +extern int +rpcsvc_conn_peername (rpcsvc_conn_t *conn, char *hostname, int hostlen); + +extern int +rpcsvc_conn_peeraddr (rpcsvc_conn_t *conn, char *addrstr, int addrlen, +                      struct sockaddr *returnsa, socklen_t sasize); + +extern int +rpcsvc_conn_peer_check (dict_t *options, char *volname, rpcsvc_conn_t *conn); + +extern int +rpcsvc_conn_privport_check (rpcsvc_t *svc, char *volname, rpcsvc_conn_t *conn); +#define rpcsvc_request_seterr(req, err)                 (req)->rpc_err = err +#define rpcsvc_request_set_autherr(req, err)            (req)->auth_err = err + +extern int rpcsvc_submit_vectors (rpcsvc_request_t *req); + +extern int rpcsvc_request_attach_vector (rpcsvc_request_t *req, +                                         struct iovec msgvec, struct iobuf *iob, +                                         struct iobref *ioref, int finalvector); + + +typedef int (*auth_init_conn) (rpcsvc_conn_t *conn, void *priv); +typedef int (*auth_init_request) (rpcsvc_request_t *req, void *priv); +typedef int (*auth_request_authenticate) (rpcsvc_request_t *req, void *priv); + +/* This structure needs to be registered by every authentication scheme. + * Our authentication schemes are stored per connection because + * each connection will end up using a different authentication scheme. + */ +typedef struct rpcsvc_auth_ops { +        auth_init_conn                conn_init; +        auth_init_request             request_init; +        auth_request_authenticate     authenticate; +} rpcsvc_auth_ops_t; + +typedef struct rpcsvc_auth_flavour_desc { +        char                    authname[RPCSVC_NAME_MAX]; +        int                     authnum; +        rpcsvc_auth_ops_t       *authops; +        void                    *authprivate; +} rpcsvc_auth_t; + +typedef void * (*rpcsvc_auth_initer_t) (rpcsvc_t *svc, dict_t *options); + +struct rpcsvc_auth_list { +        struct list_head        authlist; +        rpcsvc_auth_initer_t    init; +        /* Should be the name with which we identify the auth scheme given +         * in the volfile options. +         * This should be different from the authname in rpc_auth_t +         * in way that makes it easier to specify this scheme in the volfile. +         * This is because the technical names of the schemes can be a bit +         * arcane. +         */ +        char                    name[RPCSVC_NAME_MAX]; +        rpcsvc_auth_t           *auth; +        int                     enable; +}; + +extern int +rpcsvc_auth_request_init (rpcsvc_request_t *req); + +extern int +rpcsvc_auth_init (rpcsvc_t *svc, dict_t *options); + +extern int +rpcsvc_auth_conn_init (rpcsvc_conn_t *xprt); + +extern int +rpcsvc_authenticate (rpcsvc_request_t *req); + +extern int +rpcsvc_auth_array (rpcsvc_t *svc, char *volname, int *autharr, int arrlen); + +/* If the request has been sent using AUTH_UNIX, this function returns the + * auxiliary gids as an array, otherwise, it returns NULL. + * Move to auth-unix specific source file when we need to modularize the + * authentication code even further to support mode auth schemes. + */ +extern gid_t * +rpcsvc_auth_unix_auxgids (rpcsvc_request_t *req, int *arrlen); + +extern int +rpcsvc_combine_gen_spec_volume_checks (int gen, int spec); + +extern char * +rpcsvc_volume_allowed (dict_t *options, char *volname); +#endif diff --git a/rpc/rpc-lib/src/xdr-common.h b/rpc/rpc-lib/src/xdr-common.h new file mode 100644 index 00000000000..0c9ffb2f827 --- /dev/null +++ b/rpc/rpc-lib/src/xdr-common.h @@ -0,0 +1,68 @@ +/* +  Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. + +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + +#ifndef _XDR_COMMON_H_ +#define _XDR_COMMON_H_ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include <rpc/rpc.h> + +#if GF_DARWIN_HOST_OS +#define xdr_u_quad_t xdr_u_int64_t +#define xdr_quad_t   xdr_int64_t +#define xdr_uint32_t xdr_u_int32_t +#endif + +struct auth_glusterfs_parms { +	u_quad_t lk_owner; +	u_int pid; +	u_int uid; +	u_int gid; +	u_int ngrps; +	u_int groups[16]; +}; +typedef struct auth_glusterfs_parms auth_glusterfs_parms; + +bool_t +xdr_auth_glusterfs_parms (XDR *xdrs, auth_glusterfs_parms *objp); + +#define XDR_BYTES_PER_UNIT      4 + +/* Returns the address of the byte that follows the + * last byte used for decoding the previous xdr component. + * For eg, once the RPC call for NFS has been decoded, thie macro will return + * the address from which the NFS header starts. + */ +#define xdr_decoded_remaining_addr(xdr)        ((&xdr)->x_private) + +/* Returns the length of the remaining record after the previous decode + * operation completed. + */ +#define xdr_decoded_remaining_len(xdr)         ((&xdr)->x_handy) + +/* Returns the number of bytes used by the last encode operation. */ +#define xdr_encoded_length(xdr) (((size_t)(&xdr)->x_private) - ((size_t)(&xdr)->x_base)) + +#define xdr_decoded_length(xdr) (((size_t)(&xdr)->x_private) - ((size_t)(&xdr)->x_base)) + +#endif diff --git a/rpc/rpc-lib/src/xdr-rpc.c b/rpc/rpc-lib/src/xdr-rpc.c new file mode 100644 index 00000000000..088bddbf5b7 --- /dev/null +++ b/rpc/rpc-lib/src/xdr-rpc.c @@ -0,0 +1,193 @@ +/* +  Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. + +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include <rpc/rpc.h> +#include <rpc/pmap_clnt.h> +#include <arpa/inet.h> +#include <rpc/xdr.h> +#include <sys/uio.h> +#include <rpc/auth_unix.h> + +#include "mem-pool.h" +#include "xdr-rpc.h" +#include "xdr-common.h" +#include "logging.h" + +/* Decodes the XDR format in msgbuf into rpc_msg. + * The remaining payload is returned into payload. + */ +int +xdr_to_rpc_call (char *msgbuf, size_t len, struct rpc_msg *call, +                 struct iovec *payload, char *credbytes, char *verfbytes) +{ +        XDR                     xdr; +        char                    opaquebytes[MAX_AUTH_BYTES]; +        struct opaque_auth      *oa = NULL; + +        if ((!msgbuf) || (!call)) +                return -1; + +        memset (call, 0, sizeof (*call)); + +        oa = &call->rm_call.cb_cred; +        if (!credbytes) +                oa->oa_base = opaquebytes; +        else +                oa->oa_base = credbytes; + +        oa = &call->rm_call.cb_verf; +        if (!verfbytes) +                oa->oa_base = opaquebytes; +        else +                oa->oa_base = verfbytes; + +        xdrmem_create (&xdr, msgbuf, len, XDR_DECODE); +        if (!xdr_callmsg (&xdr, call)) +                return -1; + +        if (payload) { +                payload->iov_base = xdr_decoded_remaining_addr (xdr); +                payload->iov_len = xdr_decoded_remaining_len (xdr); +        } + +        return 0; +} + + +bool_t +true_func (XDR *s, caddr_t *a) +{ +        return TRUE; +} + + +int +rpc_fill_empty_reply (struct rpc_msg *reply, uint32_t xid) +{ +        if (!reply) +                return -1; + +        /* Setting to 0 also results in reply verifier flavor to be +         * set to AUTH_NULL which is what we want right now. +         */ +        memset (reply, 0, sizeof (*reply)); +        reply->rm_xid = xid; +        reply->rm_direction = REPLY; + +        return 0; +} + +int +rpc_fill_denied_reply (struct rpc_msg *reply, int rjstat, int auth_err) +{ +        if (!reply) +                return -1; + +        reply->rm_reply.rp_stat = MSG_DENIED; +        reply->rjcted_rply.rj_stat = rjstat; +        if (rjstat == RPC_MISMATCH) { +                /* No problem with hardocoding +                 * RPC version numbers. We only support +                 * v2 anyway. +                 */ +                reply->rjcted_rply.rj_vers.low = 2; +                reply->rjcted_rply.rj_vers.high = 2; +        } else if (rjstat == AUTH_ERROR) +                reply->rjcted_rply.rj_why = auth_err; + +        return 0; +} + + +int +rpc_fill_accepted_reply (struct rpc_msg *reply, int arstat, int proglow, +                         int proghigh, int verf, int len, char *vdata) +{ +        if (!reply) +                return -1; + +        reply->rm_reply.rp_stat = MSG_ACCEPTED; +        reply->acpted_rply.ar_stat = arstat; + +        reply->acpted_rply.ar_verf.oa_flavor = verf; +        reply->acpted_rply.ar_verf.oa_length = len; +        reply->acpted_rply.ar_verf.oa_base = vdata; +        if (arstat == PROG_MISMATCH) { +                reply->acpted_rply.ar_vers.low = proglow; +                reply->acpted_rply.ar_vers.high = proghigh; +        } else if (arstat == SUCCESS) { + +                /* This is a hack. I'd really like to build a custom +                 * XDR library because Sun RPC interface is not very flexible. +                 */ +                reply->acpted_rply.ar_results.proc = (xdrproc_t)true_func; +                reply->acpted_rply.ar_results.where = NULL; +        } + +        return 0; +} + +int +rpc_reply_to_xdr (struct rpc_msg *reply, char *dest, size_t len, +                  struct iovec *dst) +{ +        XDR             xdr; + +        if ((!dest) || (!reply) || (!dst)) +                return -1; + +        xdrmem_create (&xdr, dest, len, XDR_ENCODE); +        if (!xdr_replymsg(&xdr, reply)) +                return -1; + +        dst->iov_base = dest; +        dst->iov_len = xdr_encoded_length (xdr); + +        return 0; +} + + +int +xdr_to_auth_unix_cred (char *msgbuf, int msglen, struct authunix_parms *au, +                       char *machname, gid_t *gids) +{ +        XDR             xdr; + +        if ((!msgbuf) || (!machname) || (!gids) || (!au)) +                return -1; + +        au->aup_machname = machname; +#ifdef GF_DARWIN_HOST_OS +        au->aup_gids = (int *)gids; +#else +        au->aup_gids = gids; +#endif + +        xdrmem_create (&xdr, msgbuf, msglen, XDR_DECODE); + +        if (!xdr_authunix_parms (&xdr, au)) +                return -1; + +        return 0; +} diff --git a/rpc/rpc-lib/src/xdr-rpc.h b/rpc/rpc-lib/src/xdr-rpc.h new file mode 100644 index 00000000000..4c0ee69b138 --- /dev/null +++ b/rpc/rpc-lib/src/xdr-rpc.h @@ -0,0 +1,74 @@ +/* +  Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. + +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + +#ifndef _XDR_RPC_H +#define _XDR_RPC_H_ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include <rpc/rpc.h> +#include <rpc/pmap_clnt.h> +#include <arpa/inet.h> +#include <rpc/xdr.h> +#include <sys/uio.h> + +/* Converts a given network buffer from its XDR format to a structure + * that contains everything an RPC call needs to work. + */ +extern int +xdr_to_rpc_call (char *msgbuf, size_t len, struct rpc_msg *call, +                 struct iovec *payload, char *credbytes, char *verfbytes); + +extern int +rpc_fill_empty_reply (struct rpc_msg *reply, uint32_t xid); + +extern int +rpc_fill_denied_reply (struct rpc_msg *reply, int rjstat, int auth_err); + +extern int +rpc_fill_accepted_reply (struct rpc_msg *reply, int arstat, int proglow, +                         int proghigh, int verf, int len, char *vdata); +extern int +rpc_reply_to_xdr (struct rpc_msg *reply, char *dest, size_t len, +                  struct iovec *dst); + +extern int +xdr_to_auth_unix_cred (char *msgbuf, int msglen, struct authunix_parms *au, +                       char *machname, gid_t *gids); +/* Macros that simplify accesing the members of an RPC call structure. */ +#define rpc_call_xid(call)              ((call)->rm_xid) +#define rpc_call_direction(call)        ((call)->rm_direction) +#define rpc_call_rpcvers(call)          ((call)->ru.RM_cmb.cb_rpcvers) +#define rpc_call_program(call)          ((call)->ru.RM_cmb.cb_prog) +#define rpc_call_progver(call)          ((call)->ru.RM_cmb.cb_vers) +#define rpc_call_progproc(call)         ((call)->ru.RM_cmb.cb_proc) +#define rpc_opaque_auth_flavour(oa)     ((oa)->oa_flavor) +#define rpc_opaque_auth_len(oa)         ((oa)->oa_length) + +#define rpc_call_cred_flavour(call)     (rpc_opaque_auth_flavour ((&(call)->ru.RM_cmb.cb_cred))) +#define rpc_call_cred_len(call)         (rpc_opaque_auth_len ((&(call)->ru.RM_cmb.cb_cred))) + + +#define rpc_call_verf_flavour(call)     (rpc_opaque_auth_flavour ((&(call)->ru.RM_cmb.cb_verf))) +#define rpc_call_verf_len(call)         (rpc_opaque_auth_len ((&(call)->ru.RM_cmb.cb_verf))) + +#endif diff --git a/rpc/rpc-lib/src/xdr-rpcclnt.c b/rpc/rpc-lib/src/xdr-rpcclnt.c new file mode 100644 index 00000000000..98676ae61ab --- /dev/null +++ b/rpc/rpc-lib/src/xdr-rpcclnt.c @@ -0,0 +1,131 @@ +/* +  Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. + +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include <rpc/rpc.h> +#include <rpc/pmap_clnt.h> +#include <arpa/inet.h> +#include <rpc/xdr.h> +#include <sys/uio.h> +#include <rpc/auth_unix.h> +#include <errno.h> + +#include "mem-pool.h" +#include "xdr-rpc.h" +#include "xdr-common.h" +#include "logging.h" + +/* Decodes the XDR format in msgbuf into rpc_msg. + * The remaining payload is returned into payload. + */ +int +xdr_to_rpc_reply (char *msgbuf, size_t len, struct rpc_msg *reply, +                  struct iovec *payload, char *verfbytes) +{ +        XDR                     xdr; +        int                     ret = -1; + +        if ((!msgbuf) || (!reply)) { +                ret = -EINVAL; +                goto out; +        } + +        memset (reply, 0, sizeof (struct rpc_msg)); + +        reply->acpted_rply.ar_verf = _null_auth; +        reply->acpted_rply.ar_results.where = NULL; +        reply->acpted_rply.ar_results.proc = (xdrproc_t)(xdr_void); + +        xdrmem_create (&xdr, msgbuf, len, XDR_DECODE); +        if (!xdr_replymsg (&xdr, reply)) { +                ret = -errno; +                goto out; +        } +        if (payload) { +                payload->iov_base = xdr_decoded_remaining_addr (xdr); +                payload->iov_len = xdr_decoded_remaining_len (xdr); +        } + +        ret = 0; +out: +        return ret; +} + +#if 0 +bool_t +true_func (XDR *s, caddr_t *a) +{ +        return TRUE; +} +#endif + +int +rpc_request_to_xdr (struct rpc_msg *request, char *dest, size_t len, +                    struct iovec *dst) +{ +        XDR xdr; +        int ret = -1; + +        if ((!dest) || (!request) || (!dst)) { +                goto out; +        } + +        xdrmem_create (&xdr, dest, len, XDR_ENCODE); +        if (!xdr_callmsg (&xdr, request)) { +                goto out; +        } + +        dst->iov_base = dest; +        dst->iov_len = xdr_encoded_length (xdr); + +        ret = 0; + +out: +        return ret; +} + + +int +auth_unix_cred_to_xdr (struct authunix_parms *au, char *dest, size_t len, +                       struct iovec *iov) +{ +        XDR xdr; +        int ret = -1; + +        if (!au || !dest || !iov) { +                goto out; +        } + +        xdrmem_create (&xdr, dest, len, XDR_DECODE); + +        if (!xdr_authunix_parms (&xdr, au)) { +                goto out; +        } + +        iov->iov_base = dest; +        iov->iov_len = xdr_encoded_length (xdr); + +        ret = 0; +out: +        return ret; +} diff --git a/rpc/rpc-lib/src/xdr-rpcclnt.h b/rpc/rpc-lib/src/xdr-rpcclnt.h new file mode 100644 index 00000000000..37c3046d330 --- /dev/null +++ b/rpc/rpc-lib/src/xdr-rpcclnt.h @@ -0,0 +1,51 @@ +/* +  Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com> +  This file is part of GlusterFS. + +  GlusterFS is free software; you can redistribute it and/or modify +  it under the terms of the GNU General Public License as published +  by the Free Software Foundation; either version 3 of the License, +  or (at your option) any later version. + +  GlusterFS is distributed in the hope that it will be useful, but +  WITHOUT ANY WARRANTY; without even the implied warranty of +  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU +  General Public License for more details. + +  You should have received a copy of the GNU General Public License +  along with this program.  If not, see +  <http://www.gnu.org/licenses/>. +*/ + +#ifndef _XDR_RPCCLNT_H +#define _XDR_RPCCLNT_H + +#ifndef _CONFIG_H +#define _CONFIG_H +#include "config.h" +#endif + +#include <rpc/rpc.h> +#include <rpc/pmap_clnt.h> +#include <arpa/inet.h> +#include <rpc/xdr.h> +#include <sys/uio.h> +#include <rpc/rpc_msg.h> +#include <rpc/auth_unix.h> + +/* Macros that simplify accesing the members of an RPC call structure. */ +#define rpc_reply_xid(reply)              ((reply)->rm_xid) +#define rpc_reply_status(reply)           ((reply)->ru.RM_rmb.rp_stat) +#define rpc_accepted_reply_status(reply)  ((reply)->acpted_rply.ar_stat) +#define rpc_reply_verf_flavour(reply)     ((reply)->acpted_rply.ar_verf.oa_flavor) + +int xdr_to_rpc_reply (char *msgbuf, size_t len, struct rpc_msg *reply, +                      struct iovec *payload, char *verfbytes); +int +rpc_request_to_xdr (struct rpc_msg *request, char *dest, size_t len, +                    struct iovec *dst); +int +auth_unix_cred_to_xdr (struct authunix_parms *au, char *dest, size_t len, +                       struct iovec *iov); + +#endif  | 
