summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-lib/src/rpc-transport.c
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-lib/src/rpc-transport.c')
-rw-r--r--rpc/rpc-lib/src/rpc-transport.c679
1 files changed, 679 insertions, 0 deletions
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c
new file mode 100644
index 000000000..c24d41084
--- /dev/null
+++ b/rpc/rpc-lib/src/rpc-transport.c
@@ -0,0 +1,679 @@
+/*
+ Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#include <dlfcn.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <sys/poll.h>
+#include <fnmatch.h>
+#include <stdint.h>
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "logging.h"
+#include "rpc-transport.h"
+#include "glusterfs.h"
+/* FIXME: xlator.h is needed for volume_option_t, need to define the datatype
+ * in some other header
+ */
+#include "xlator.h"
+#include "list.h"
+
+#ifndef GF_OPTION_LIST_EMPTY
+#define GF_OPTION_LIST_EMPTY(_opt) (_opt->value[0] == NULL)
+#endif
+
+
+int
+rpc_transport_get_myaddr (rpc_transport_t *this, char *peeraddr, int addrlen,
+ struct sockaddr_storage *sa, size_t salen)
+{
+ int32_t ret = -1;
+ GF_VALIDATE_OR_GOTO ("rpc", this, out);
+
+ ret = this->ops->get_myaddr (this, peeraddr, addrlen, sa, salen);
+
+out:
+ return ret;
+}
+
+int32_t
+rpc_transport_get_myname (rpc_transport_t *this, char *hostname, int hostlen)
+{
+ int32_t ret = -1;
+ GF_VALIDATE_OR_GOTO ("rpc", this, out);
+
+ ret = this->ops->get_myname (this, hostname, hostlen);
+out:
+ return ret;
+}
+
+int32_t
+rpc_transport_get_peername (rpc_transport_t *this, char *hostname, int hostlen)
+{
+ int32_t ret = -1;
+ GF_VALIDATE_OR_GOTO ("rpc", this, out);
+
+ ret = this->ops->get_peername (this, hostname, hostlen);
+out:
+ return ret;
+}
+
+int
+rpc_transport_throttle (rpc_transport_t *this, gf_boolean_t onoff)
+{
+ int ret = 0;
+
+ if (!this->ops->throttle)
+ return -ENOSYS;
+
+ ret = this->ops->throttle (this, onoff);
+
+ return ret;
+}
+
+int32_t
+rpc_transport_get_peeraddr (rpc_transport_t *this, char *peeraddr, int addrlen,
+ struct sockaddr_storage *sa, size_t salen)
+{
+ int32_t ret = -1;
+ GF_VALIDATE_OR_GOTO ("rpc", this, out);
+
+ ret = this->ops->get_peeraddr (this, peeraddr, addrlen, sa, salen);
+out:
+ return ret;
+}
+
+void
+rpc_transport_pollin_destroy (rpc_transport_pollin_t *pollin)
+{
+ GF_VALIDATE_OR_GOTO ("rpc", pollin, out);
+
+ if (pollin->iobref) {
+ iobref_unref (pollin->iobref);
+ }
+
+ if (pollin->hdr_iobuf) {
+ iobuf_unref (pollin->hdr_iobuf);
+ }
+
+ if (pollin->private) {
+ /* */
+ GF_FREE (pollin->private);
+ }
+
+ GF_FREE (pollin);
+out:
+ return;
+}
+
+
+rpc_transport_pollin_t *
+rpc_transport_pollin_alloc (rpc_transport_t *this, struct iovec *vector,
+ int count, struct iobuf *hdr_iobuf,
+ struct iobref *iobref, void *private)
+{
+ rpc_transport_pollin_t *msg = NULL;
+ msg = GF_CALLOC (1, sizeof (*msg), gf_common_mt_rpc_trans_pollin_t);
+ if (!msg) {
+ goto out;
+ }
+
+ if (count > 1) {
+ msg->vectored = 1;
+ }
+
+ memcpy (msg->vector, vector, count * sizeof (*vector));
+ msg->count = count;
+ msg->iobref = iobref_ref (iobref);
+ msg->private = private;
+ if (hdr_iobuf)
+ msg->hdr_iobuf = iobuf_ref (hdr_iobuf);
+
+out:
+ return msg;
+}
+
+
+
+rpc_transport_t *
+rpc_transport_load (glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
+{
+ struct rpc_transport *trans = NULL, *return_trans = NULL;
+ char *name = NULL;
+ void *handle = NULL;
+ char *type = NULL;
+ char str[] = "ERROR";
+ int32_t ret = -1;
+ int8_t is_tcp = 0, is_unix = 0, is_ibsdp = 0;
+ volume_opt_list_t *vol_opt = NULL;
+ gf_boolean_t bind_insecure = _gf_false;
+ xlator_t *this = NULL;
+
+ GF_VALIDATE_OR_GOTO("rpc-transport", options, fail);
+ GF_VALIDATE_OR_GOTO("rpc-transport", ctx, fail);
+ GF_VALIDATE_OR_GOTO("rpc-transport", trans_name, fail);
+
+ trans = GF_CALLOC (1, sizeof (struct rpc_transport), gf_common_mt_rpc_trans_t);
+ if (!trans)
+ goto fail;
+
+ trans->name = gf_strdup (trans_name);
+ if (!trans->name)
+ goto fail;
+
+ trans->ctx = ctx;
+ type = str;
+
+ /* Backward compatibility */
+ ret = dict_get_str (options, "transport-type", &type);
+ if (ret < 0) {
+ ret = dict_set_str (options, "transport-type", "socket");
+ if (ret < 0)
+ gf_log ("dict", GF_LOG_DEBUG,
+ "setting transport-type failed");
+ else
+ gf_log ("rpc-transport", GF_LOG_DEBUG,
+ "missing 'option transport-type'. defaulting to "
+ "\"socket\"");
+ } else {
+ {
+ /* Backword compatibility to handle * /client,
+ * * /server.
+ */
+ char *tmp = strchr (type, '/');
+ if (tmp)
+ *tmp = '\0';
+ }
+
+ is_tcp = strcmp (type, "tcp");
+ is_unix = strcmp (type, "unix");
+ is_ibsdp = strcmp (type, "ib-sdp");
+ if ((is_tcp == 0) ||
+ (is_unix == 0) ||
+ (is_ibsdp == 0)) {
+ if (is_unix == 0)
+ ret = dict_set_str (options,
+ "transport.address-family",
+ "unix");
+ if (is_ibsdp == 0)
+ ret = dict_set_str (options,
+ "transport.address-family",
+ "inet-sdp");
+
+ if (ret < 0)
+ gf_log ("dict", GF_LOG_DEBUG,
+ "setting address-family failed");
+
+ ret = dict_set_str (options,
+ "transport-type", "socket");
+ if (ret < 0)
+ gf_log ("dict", GF_LOG_DEBUG,
+ "setting transport-type failed");
+ }
+ }
+
+ /* client-bind-insecure is for clients protocol, and
+ * bind-insecure for glusterd. Both mutually exclusive
+ */
+ ret = dict_get_str (options, "client-bind-insecure", &type);
+ if (ret)
+ ret = dict_get_str (options, "bind-insecure", &type);
+ if (ret == 0) {
+ ret = gf_string2boolean (type, &bind_insecure);
+ if (ret < 0) {
+ gf_log ("rcp-transport", GF_LOG_WARNING,
+ "bind-insecure option %s is not a"
+ " valid bool option", type);
+ goto fail;
+ }
+ if (_gf_true == bind_insecure)
+ trans->bind_insecure = 1;
+ else
+ trans->bind_insecure = 0;
+ } else {
+ trans->bind_insecure = 0;
+ }
+
+ ret = dict_get_str (options, "transport-type", &type);
+ if (ret < 0) {
+ gf_log ("rpc-transport", GF_LOG_ERROR,
+ "'option transport-type <xx>' missing in volume '%s'",
+ trans_name);
+ goto fail;
+ }
+
+ ret = gf_asprintf (&name, "%s/%s.so", RPC_TRANSPORTDIR, type);
+ if (-1 == ret) {
+ goto fail;
+ }
+
+ gf_log ("rpc-transport", GF_LOG_DEBUG,
+ "attempt to load file %s", name);
+
+ handle = dlopen (name, RTLD_NOW|RTLD_GLOBAL);
+ if (handle == NULL) {
+ gf_log ("rpc-transport", GF_LOG_ERROR, "%s", dlerror ());
+ gf_log ("rpc-transport", GF_LOG_WARNING,
+ "volume '%s': transport-type '%s' is not valid or "
+ "not found on this machine",
+ trans_name, type);
+ goto fail;
+ }
+
+ trans->dl_handle = handle;
+
+ trans->ops = dlsym (handle, "tops");
+ if (trans->ops == NULL) {
+ gf_log ("rpc-transport", GF_LOG_ERROR,
+ "dlsym (rpc_transport_ops) on %s", dlerror ());
+ goto fail;
+ }
+
+ *VOID(&(trans->init)) = dlsym (handle, "init");
+ if (trans->init == NULL) {
+ gf_log ("rpc-transport", GF_LOG_ERROR,
+ "dlsym (gf_rpc_transport_init) on %s", dlerror ());
+ goto fail;
+ }
+
+ *VOID(&(trans->fini)) = dlsym (handle, "fini");
+ if (trans->fini == NULL) {
+ gf_log ("rpc-transport", GF_LOG_ERROR,
+ "dlsym (gf_rpc_transport_fini) on %s", dlerror ());
+ goto fail;
+ }
+
+ *VOID(&(trans->reconfigure)) = dlsym (handle, "reconfigure");
+ if (trans->reconfigure == NULL) {
+ gf_log ("rpc-transport", GF_LOG_DEBUG,
+ "dlsym (gf_rpc_transport_reconfigure) on %s", dlerror());
+ }
+
+ vol_opt = GF_CALLOC (1, sizeof (volume_opt_list_t),
+ gf_common_mt_volume_opt_list_t);
+ if (!vol_opt) {
+ goto fail;
+ }
+
+ this = THIS;
+ vol_opt->given_opt = dlsym (handle, "options");
+ if (vol_opt->given_opt == NULL) {
+ gf_log ("rpc-transport", GF_LOG_DEBUG,
+ "volume option validation not specified");
+ } else {
+ INIT_LIST_HEAD (&vol_opt->list);
+ list_add_tail (&vol_opt->list, &(this->volume_options));
+ if (xlator_options_validate_list (this, options, vol_opt,
+ NULL)) {
+ gf_log ("rpc-transport", GF_LOG_ERROR,
+ "volume option validation failed");
+ goto fail;
+ }
+ }
+
+ trans->options = options;
+
+ pthread_mutex_init (&trans->lock, NULL);
+ trans->xl = this;
+
+ ret = trans->init (trans);
+ if (ret != 0) {
+ gf_log ("rpc-transport", GF_LOG_ERROR,
+ "'%s' initialization failed", type);
+ goto fail;
+ }
+
+ return_trans = trans;
+
+ GF_FREE (name);
+
+ return return_trans;
+
+fail:
+ if (trans) {
+ GF_FREE (trans->name);
+
+ if (trans->dl_handle)
+ dlclose (trans->dl_handle);
+
+ GF_FREE (trans);
+ }
+
+ GF_FREE (name);
+
+ if (vol_opt && !list_empty (&vol_opt->list)) {
+ list_del_init (&vol_opt->list);
+ GF_FREE (vol_opt);
+ }
+
+ return NULL;
+}
+
+
+int32_t
+rpc_transport_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
+ GF_VALIDATE_OR_GOTO("rpc_transport", this->ops, fail);
+
+ ret = this->ops->submit_request (this, req);
+fail:
+ return ret;
+}
+
+
+int32_t
+rpc_transport_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
+ GF_VALIDATE_OR_GOTO("rpc_transport", this->ops, fail);
+
+ ret = this->ops->submit_reply (this, reply);
+fail:
+ return ret;
+}
+
+
+int32_t
+rpc_transport_connect (rpc_transport_t *this, int port)
+{
+ int ret = -1;
+
+ GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
+
+ ret = this->ops->connect (this, port);
+fail:
+ return ret;
+}
+
+
+int32_t
+rpc_transport_listen (rpc_transport_t *this)
+{
+ int ret = -1;
+
+ GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
+
+ ret = this->ops->listen (this);
+fail:
+ return ret;
+}
+
+
+int32_t
+rpc_transport_disconnect (rpc_transport_t *this)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
+
+ ret = this->ops->disconnect (this);
+fail:
+ return ret;
+}
+
+
+int32_t
+rpc_transport_destroy (rpc_transport_t *this)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
+
+ if (this->options)
+ dict_unref (this->options);
+ if (this->fini)
+ this->fini (this);
+
+ pthread_mutex_destroy (&this->lock);
+
+ GF_FREE (this->name);
+
+ if (this->dl_handle)
+ dlclose (this->dl_handle);
+
+ GF_FREE (this);
+fail:
+ return ret;
+}
+
+
+rpc_transport_t *
+rpc_transport_ref (rpc_transport_t *this)
+{
+ rpc_transport_t *return_this = NULL;
+
+ GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
+
+ pthread_mutex_lock (&this->lock);
+ {
+ this->refcount ++;
+ }
+ pthread_mutex_unlock (&this->lock);
+
+ return_this = this;
+fail:
+ return return_this;
+}
+
+
+int32_t
+rpc_transport_unref (rpc_transport_t *this)
+{
+ int32_t refcount = 0;
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
+
+ pthread_mutex_lock (&this->lock);
+ {
+ refcount = --this->refcount;
+ }
+ pthread_mutex_unlock (&this->lock);
+
+ if (refcount == 0) {
+ if (this->mydata)
+ this->notify (this, this->mydata, RPC_TRANSPORT_CLEANUP,
+ NULL);
+ this->mydata = NULL;
+ this->notify = NULL;
+ rpc_transport_destroy (this);
+ }
+
+ ret = 0;
+fail:
+ return ret;
+}
+
+
+int32_t
+rpc_transport_notify (rpc_transport_t *this, rpc_transport_event_t event,
+ void *data, ...)
+{
+ int32_t ret = -1;
+ GF_VALIDATE_OR_GOTO ("rpc", this, out);
+
+ if (this->notify != NULL) {
+ ret = this->notify (this, this->mydata, event, data);
+ } else {
+ ret = 0;
+ }
+out:
+ return ret;
+}
+
+
+
+inline int
+rpc_transport_register_notify (rpc_transport_t *trans,
+ rpc_transport_notify_t notify, void *mydata)
+{
+ int32_t ret = -1;
+ GF_VALIDATE_OR_GOTO ("rpc", trans, out);
+
+ trans->notify = notify;
+ trans->mydata = mydata;
+
+ ret = 0;
+out:
+ return ret;
+}
+
+
+
+//give negative values to skip setting that value
+//this function asserts if both the values are negative.
+//why call it if you dont set it.
+int
+rpc_transport_keepalive_options_set (dict_t *options, int32_t interval,
+ int32_t time)
+{
+ int ret = -1;
+
+ GF_ASSERT (options);
+ GF_ASSERT ((interval > 0) || (time > 0));
+
+ ret = dict_set_int32 (options,
+ "transport.socket.keepalive-interval", interval);
+ if (ret)
+ goto out;
+
+ ret = dict_set_int32 (options,
+ "transport.socket.keepalive-time", time);
+ if (ret)
+ goto out;
+out:
+ return ret;
+}
+
+int
+rpc_transport_unix_options_build (dict_t **options, char *filepath,
+ int frame_timeout)
+{
+ dict_t *dict = NULL;
+ char *fpath = NULL;
+ int ret = -1;
+
+ GF_ASSERT (filepath);
+ GF_ASSERT (options);
+
+ dict = dict_new ();
+ if (!dict)
+ goto out;
+
+ fpath = gf_strdup (filepath);
+ if (!fpath) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = dict_set_dynstr (dict, "transport.socket.connect-path", fpath);
+ if (ret)
+ goto out;
+
+ ret = dict_set_str (dict, "transport.address-family", "unix");
+ if (ret)
+ goto out;
+
+ ret = dict_set_str (dict, "transport.socket.nodelay", "off");
+ if (ret)
+ goto out;
+
+ ret = dict_set_str (dict, "transport-type", "socket");
+ if (ret)
+ goto out;
+
+ ret = dict_set_str (dict, "transport.socket.keepalive", "off");
+ if (ret)
+ goto out;
+
+ if (frame_timeout > 0) {
+ ret = dict_set_int32 (dict, "frame-timeout", frame_timeout);
+ if (ret)
+ goto out;
+ }
+
+ *options = dict;
+out:
+ if (ret) {
+ GF_FREE (fpath);
+ if (dict)
+ dict_unref (dict);
+ }
+ return ret;
+}
+
+int
+rpc_transport_inet_options_build (dict_t **options, const char *hostname,
+ int port)
+{
+ dict_t *dict = NULL;
+ char *host = NULL;
+ int ret = -1;
+
+ GF_ASSERT (options);
+ GF_ASSERT (hostname);
+ GF_ASSERT (port >= 1024);
+
+ dict = dict_new ();
+ if (!dict)
+ goto out;
+
+ host = gf_strdup ((char*)hostname);
+ if (!hostname) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = dict_set_dynstr (dict, "remote-host", host);
+ if (ret) {
+ gf_log (THIS->name, GF_LOG_WARNING,
+ "failed to set remote-host with %s", host);
+ goto out;
+ }
+
+ ret = dict_set_int32 (dict, "remote-port", port);
+ if (ret) {
+ gf_log (THIS->name, GF_LOG_WARNING,
+ "failed to set remote-port with %d", port);
+ goto out;
+ }
+ ret = dict_set_str (dict, "transport.address-family", "inet");
+ if (ret) {
+ gf_log (THIS->name, GF_LOG_WARNING,
+ "failed to set addr-family with inet");
+ goto out;
+ }
+
+ ret = dict_set_str (dict, "transport-type", "socket");
+ if (ret) {
+ gf_log (THIS->name, GF_LOG_WARNING,
+ "failed to set trans-type with socket");
+ goto out;
+ }
+
+ *options = dict;
+out:
+ if (ret) {
+ GF_FREE (host);
+ if (dict)
+ dict_unref (dict);
+ }
+
+ return ret;
+}