summaryrefslogtreecommitdiffstats
path: root/rpc
diff options
context:
space:
mode:
authorPranith K <pranithk@gluster.com>2011-03-10 02:18:22 +0000
committerVijay Bellur <vijay@dev.gluster.com>2011-03-10 08:09:30 -0800
commitd23585307a0e333c9b1ff627df4c7e30b3642201 (patch)
tree0e900775fff91686474051cf55375966e7028eec /rpc
parent45fd0d904d11f07f8b523af2d1357f081e3c5dc1 (diff)
rpc: Changes for handling unix domain sockets avoid race
Signed-off-by: Pranith Kumar K <pranithk@gluster.com> Signed-off-by: Vijay Bellur <vijay@dev.gluster.com> BUG: 1965 (need a cmd to get io-stat details) URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=1965
Diffstat (limited to 'rpc')
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.c109
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.h10
-rw-r--r--rpc/rpc-lib/src/rpc-transport.c79
-rw-r--r--rpc/rpc-lib/src/rpc-transport.h6
-rw-r--r--rpc/rpc-lib/src/rpcsvc.c65
-rw-r--r--rpc/rpc-lib/src/rpcsvc.h2
-rw-r--r--rpc/rpc-transport/socket/src/socket.c16
7 files changed, 211 insertions, 76 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index d496673..99bba8c 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -970,68 +970,8 @@ out:
return ret;
}
-
struct rpc_clnt *
-rpc_clnt_init (struct rpc_clnt_config *config, dict_t *options,
- glusterfs_ctx_t *ctx, char *name)
-{
- int ret = -1;
- struct rpc_clnt *rpc = NULL;
- struct rpc_clnt_connection *conn = NULL;
-
- rpc = GF_CALLOC (1, sizeof (*rpc), gf_common_mt_rpcclnt_t);
- if (!rpc) {
- gf_log ("rpc-clnt", GF_LOG_ERROR, "out of memory");
- goto out;
- }
-
- pthread_mutex_init (&rpc->lock, NULL);
- rpc->ctx = ctx;
-
- rpc->reqpool = mem_pool_new (struct rpc_req,
- RPC_CLNT_DEFAULT_REQUEST_COUNT);
- if (rpc->reqpool == NULL) {
- pthread_mutex_destroy (&rpc->lock);
- GF_FREE (rpc);
- rpc = NULL;
- goto out;
- }
-
- rpc->saved_frames_pool = mem_pool_new (struct saved_frame,
- RPC_CLNT_DEFAULT_REQUEST_COUNT);
- if (rpc->saved_frames_pool == NULL) {
- pthread_mutex_destroy (&rpc->lock);
- mem_pool_destroy (rpc->reqpool);
- GF_FREE (rpc);
- rpc = NULL;
- goto out;
- }
-
- ret = rpc_clnt_connection_init (rpc, ctx, options, name);
- if (ret == -1) {
- pthread_mutex_destroy (&rpc->lock);
- mem_pool_destroy (rpc->reqpool);
- mem_pool_destroy (rpc->saved_frames_pool);
- GF_FREE (rpc);
- rpc = NULL;
- if (options)
- dict_unref (options);
- goto out;
- }
-
- conn = &rpc->conn;
- rpc_clnt_reconnect (conn->trans);
-
- rpc = rpc_clnt_ref (rpc);
- INIT_LIST_HEAD (&rpc->programs);
-
-out:
- return rpc;
-}
-
-
-struct rpc_clnt *
-rpc_clnt_new (struct rpc_clnt_config *config, dict_t *options,
+rpc_clnt_new (dict_t *options,
glusterfs_ctx_t *ctx, char *name)
{
int ret = -1;
@@ -1607,3 +1547,50 @@ rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config)
rpc->conn.config.remote_host = gf_strdup (config->remote_host);
}
}
+
+int
+rpc_clnt_transport_unix_options_build (dict_t **options, char *filepath)
+{
+ dict_t *dict = NULL;
+ char *fpath = NULL;
+ int ret = -1;
+
+ GF_ASSERT (filepath);
+ GF_ASSERT (options);
+
+ dict = dict_new ();
+ if (!dict)
+ goto out;
+
+ fpath = gf_strdup (filepath);
+ if (!fpath) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = dict_set_dynstr (dict, "transport.socket.connect-path", fpath);
+ if (ret)
+ goto out;
+
+ ret = dict_set_str (dict, "transport.address-family", "unix");
+ if (ret)
+ goto out;
+
+ ret = dict_set_str (dict, "transport.socket.nodelay", "off");
+ if (ret)
+ goto out;
+
+ ret = dict_set_str (dict, "transport-type", "socket");
+ if (ret)
+ goto out;
+
+ *options = dict;
+out:
+ if (ret) {
+ if (fpath)
+ GF_FREE (fpath);
+ if (dict)
+ dict_unref (dict);
+ }
+ return ret;
+}
diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h
index f8da51b..d3e7f2f 100644
--- a/rpc/rpc-lib/src/rpc-clnt.h
+++ b/rpc/rpc-lib/src/rpc-clnt.h
@@ -187,13 +187,7 @@ struct rpc_clnt {
};
-struct rpc_clnt * rpc_clnt_init (struct rpc_clnt_config *config,
- dict_t *options, glusterfs_ctx_t *ctx,
- char *name);
-
-
-struct rpc_clnt *rpc_clnt_new (struct rpc_clnt_config *config,
- dict_t *options, glusterfs_ctx_t *ctx,
+struct rpc_clnt *rpc_clnt_new (dict_t *options, glusterfs_ctx_t *ctx,
char *name);
int rpc_clnt_start (struct rpc_clnt *rpc);
@@ -244,4 +238,6 @@ void rpc_clnt_reconfig (struct rpc_clnt *rpc, struct rpc_clnt_config *config);
int rpcclnt_cbk_program_register (struct rpc_clnt *svc,
rpcclnt_cb_program_t *program);
+int
+rpc_clnt_transport_unix_options_build (dict_t **options, char *filepath);
#endif /* !_RPC_CLNT_H */
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c
index e7ffb06..b3bf4c9 100644
--- a/rpc/rpc-lib/src/rpc-transport.c
+++ b/rpc/rpc-lib/src/rpc-transport.c
@@ -808,7 +808,7 @@ rpc_transport_load (glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
"dlsym (gf_rpc_transport_fini) on %s", dlerror ());
goto fail;
}
-
+
trans->reconfigure = dlsym (handle, "reconfigure");
if (trans->fini == NULL) {
gf_log ("rpc-transport", GF_LOG_DEBUG,
@@ -954,6 +954,8 @@ rpc_transport_destroy (rpc_transport_t *this)
GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
+ if (this->options)
+ dict_unref (this->options);
if (this->fini)
this->fini (this);
@@ -1051,3 +1053,78 @@ rpc_transport_register_notify (rpc_transport_t *trans,
out:
return ret;
}
+
+//give negative values to skip setting that value
+//this function asserts if both the values are negative.
+//why call it if you dont set it.
+int
+rpc_transport_keepalive_options_set (dict_t *options, int32_t interval,
+ int32_t time)
+{
+ int ret = -1;
+
+ GF_ASSERT (options);
+ GF_ASSERT ((interval > 0) || (time > 0));
+
+ ret = dict_set_int32 (options,
+ "transport.socket.keepalive-interval", interval);
+ if (ret)
+ goto out;
+
+ ret = dict_set_int32 (options,
+ "transport.socket.keepalive-time", time);
+ if (ret)
+ goto out;
+out:
+ return ret;
+}
+
+int
+rpc_transport_inet_options_build (dict_t **options, const char *hostname, int port)
+{
+ dict_t *dict = NULL;
+ char *host = NULL;
+ int ret = -1;
+
+ GF_ASSERT (options);
+ GF_ASSERT (hostname);
+ GF_ASSERT (port >= 1024);
+
+ dict = dict_new ();
+ if (!dict)
+ goto out;
+
+
+ host = gf_strdup ((char*)hostname);
+ if (!hostname) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = dict_set_dynstr (dict, "remote-host", host);
+ if (ret)
+ goto out;
+
+ ret = dict_set_int32 (dict, "remote-port", port);
+ if (ret)
+ goto out;
+
+ ret = dict_set_str (dict, "transport.address-family", "inet");
+ if (ret)
+ goto out;
+
+ ret = dict_set_str (dict, "transport-type", "socket");
+ if (ret)
+ goto out;
+
+ *options = dict;
+out:
+ if (ret) {
+ if (host)
+ GF_FREE (host);
+ if (dict)
+ dict_unref (dict);
+ }
+
+ return ret;
+}
diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h
index 9db24c0..a955df1 100644
--- a/rpc/rpc-lib/src/rpc-transport.h
+++ b/rpc/rpc-lib/src/rpc-transport.h
@@ -295,4 +295,10 @@ rpc_transport_pollin_alloc (rpc_transport_t *this, struct iovec *vector,
void
rpc_transport_pollin_destroy (rpc_transport_pollin_t *pollin);
+int
+rpc_transport_keepalive_options_set (dict_t *options, int32_t interval,
+ int32_t time);
+
+int
+rpc_transport_inet_options_build (dict_t **options, const char *hostname, int port);
#endif /* __RPC_TRANSPORT_H__ */
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index d949677..3e9d585 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -937,6 +937,7 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
rpcsvc_request_t *req = NULL;
int ret = -1;
uint16_t port = 0;
+ gf_boolean_t is_unix = _gf_false;
if (!trans || !svc)
return -1;
@@ -949,7 +950,9 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
case AF_INET6:
port = ((struct sockaddr_in6 *)&trans->peerinfo.sockaddr)->sin6_port;
break;
-
+ case AF_UNIX:
+ is_unix = _gf_true;
+ break;
default:
gf_log (GF_RPCSVC, GF_LOG_DEBUG,
"invalid address family (%d)",
@@ -959,14 +962,16 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
- port = ntohs (port);
+ if (is_unix == _gf_false) {
+ port = ntohs (port);
- gf_log ("rpcsvc", GF_LOG_TRACE, "Client port: %d", (int)port);
+ gf_log ("rpcsvc", GF_LOG_TRACE, "Client port: %d", (int)port);
- if (port > 1024) { //Non-privilaged user, fail request
- gf_log ("glusterd", GF_LOG_ERROR, "Request received from non-"
- "privileged port. Failing request");
- return -1;
+ if (port > 1024) { //Non-privilaged user, fail request
+ gf_log ("glusterd", GF_LOG_ERROR, "Request received from non-"
+ "privileged port. Failing request");
+ return -1;
+ }
}
req = rpcsvc_request_create (svc, trans, msg);
@@ -2193,6 +2198,52 @@ rpcsvc_init_options (rpcsvc_t *svc, dict_t *options)
return 0;
}
+int
+rpcsvc_transport_unix_options_build (dict_t **options, char *filepath)
+{
+ dict_t *dict = NULL;
+ char *fpath = NULL;
+ int ret = -1;
+
+ GF_ASSERT (filepath);
+ GF_ASSERT (options);
+
+ dict = dict_new ();
+ if (!dict)
+ goto out;
+
+ fpath = gf_strdup (filepath);
+ if (!fpath) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = dict_set_dynstr (dict, "transport.socket.listen-path", fpath);
+ if (ret)
+ goto out;
+
+ ret = dict_set_str (dict, "transport.address-family", "unix");
+ if (ret)
+ goto out;
+
+ ret = dict_set_str (dict, "transport.socket.nodelay", "off");
+ if (ret)
+ goto out;
+
+ ret = dict_set_str (dict, "transport-type", "socket");
+ if (ret)
+ goto out;
+
+ *options = dict;
+out:
+ if (ret) {
+ if (fpath)
+ GF_FREE (fpath);
+ if (dict)
+ dict_unref (dict);
+ }
+ return ret;
+}
/* The global RPC service initializer.
*/
diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h
index 8625600..3d5abc2 100644
--- a/rpc/rpc-lib/src/rpcsvc.h
+++ b/rpc/rpc-lib/src/rpcsvc.h
@@ -538,4 +538,6 @@ int rpcsvc_callback_submit (rpcsvc_t *rpc, rpc_transport_t *trans,
rpcsvc_cbk_program_t *prog, int procnum,
struct iovec *proghdr, int proghdrcount);
+int
+rpcsvc_transport_unix_options_build (dict_t **options, char *filepath);
#endif
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 4ae4981..cbd3034 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -298,6 +298,8 @@ __socket_server_bind (rpc_transport_t *this)
socket_private_t *priv = NULL;
int ret = -1;
int opt = 1;
+ int reuse_check_sock = -1;
+ struct sockaddr_storage unix_addr = {0};
if (!this || !this->private)
goto out;
@@ -312,6 +314,20 @@ __socket_server_bind (rpc_transport_t *this)
"setsockopt() for SO_REUSEADDR failed (%s)",
strerror (errno));
}
+ //reuse-address doesnt work for unix type sockets
+ if (AF_UNIX == SA (&this->myinfo.sockaddr)->sa_family) {
+ memcpy (&unix_addr, SA (&this->myinfo.sockaddr),
+ this->myinfo.sockaddr_len);
+ reuse_check_sock = socket (AF_UNIX, SOCK_STREAM, 0);
+ if (reuse_check_sock > 0) {
+ ret = connect (reuse_check_sock, SA (&unix_addr),
+ this->myinfo.sockaddr_len);
+ if ((ret == -1) && (ECONNREFUSED == errno)) {
+ unlink (((struct sockaddr_un*)&unix_addr)->sun_path);
+ }
+ close (reuse_check_sock);
+ }
+ }
ret = bind (priv->sock, (struct sockaddr *)&this->myinfo.sockaddr,
this->myinfo.sockaddr_len);