summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-lib/src/rpc-transport.c
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-lib/src/rpc-transport.c')
-rw-r--r--rpc/rpc-lib/src/rpc-transport.c1657
1 files changed, 514 insertions, 1143 deletions
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c
index 82ea9a74bfd..a6e201a9b36 100644
--- a/rpc/rpc-lib/src/rpc-transport.c
+++ b/rpc/rpc-lib/src/rpc-transport.c
@@ -1,1301 +1,672 @@
/*
- Copyright (c) 2010 Gluster, Inc. <http://www.gluster.com>
+ Copyright (c) 2008-2012 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
- GlusterFS is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3 of the License,
- or (at your option) any later version.
-
- GlusterFS is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program. If not, see
- <http://www.gnu.org/licenses/>.
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
*/
#include <dlfcn.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/poll.h>
-#include <fnmatch.h>
#include <stdint.h>
-#ifndef _CONFIG_H
-#define _CONFIG_H
-#include "config.h"
-#endif
-
-#include "logging.h"
#include "rpc-transport.h"
-#include "glusterfs.h"
-/* FIXME: xlator.h is needed for volume_option_t, need to define the datatype
- * in some other header
- */
-#include "xlator.h"
-#include "list.h"
#ifndef GF_OPTION_LIST_EMPTY
#define GF_OPTION_LIST_EMPTY(_opt) (_opt->value[0] == NULL)
#endif
-/* RFC 1123 & 952 */
-static char
-valid_host_name (char *address, int length)
-{
- int i = 0;
- char ret = 1;
-
- if ((length > 75) || (length == 1)) {
- ret = 0;
- goto out;
- }
-
- if (!isalnum (address[length - 1])) {
- ret = 0;
- goto out;
- }
-
- for (i = 0; i < length; i++) {
- if (!isalnum (address[i]) && (address[i] != '.')
- && (address[i] != '-')) {
- ret = 0;
- goto out;
- }
- }
-
-out:
- return ret;
-}
-
-static char
-valid_ipv4_address (char *address, int length)
+int32_t
+rpc_transport_count(const char *transport_type)
{
- int octets = 0;
- int value = 0;
- char *tmp = NULL, *ptr = NULL, *prev = NULL, *endptr = NULL;
- char ret = 1;
-
- prev = tmp = gf_strdup (address);
- prev = strtok_r (tmp, ".", &ptr);
-
- while (prev != NULL)
- {
- octets++;
- value = strtol (prev, &endptr, 10);
- if ((value > 255) || (value < 0) || (endptr != NULL)) {
- ret = 0;
- goto out;
- }
-
- prev = strtok_r (NULL, ".", &ptr);
- }
-
- if (octets != 4) {
- ret = 0;
- }
-
-out:
- GF_FREE (tmp);
- return ret;
+ char *transport_dup = NULL;
+ char *saveptr = NULL;
+ char *ptr = NULL;
+ int count = 0;
+
+ if (transport_type == NULL)
+ return -1;
+
+ transport_dup = gf_strdup(transport_type);
+ if (transport_dup == NULL) {
+ return -1;
+ }
+
+ ptr = strtok_r(transport_dup, ",", &saveptr);
+ while (ptr != NULL) {
+ count++;
+ ptr = strtok_r(NULL, ",", &saveptr);
+ }
+
+ GF_FREE(transport_dup);
+ return count;
}
-
-static char
-valid_ipv6_address (char *address, int length)
+int
+rpc_transport_get_myaddr(rpc_transport_t *this, char *peeraddr, int addrlen,
+ struct sockaddr_storage *sa, size_t salen)
{
- int hex_numbers = 0;
- int value = 0;
- char *tmp = NULL, *ptr = NULL, *prev = NULL, *endptr = NULL;
- char ret = 1;
-
- tmp = gf_strdup (address);
- prev = strtok_r (tmp, ":", &ptr);
+ int32_t ret = -1;
+ GF_VALIDATE_OR_GOTO("rpc", this, out);
- while (prev != NULL)
- {
- hex_numbers++;
- value = strtol (prev, &endptr, 16);
- if ((value > 0xffff) || (value < 0)
- || (endptr != NULL && *endptr != '\0')) {
- ret = 0;
- goto out;
- }
-
- prev = strtok_r (NULL, ":", &ptr);
- }
-
- if (hex_numbers > 8) {
- ret = 0;
- }
+ ret = this->ops->get_myaddr(this, peeraddr, addrlen, sa, salen);
out:
- GF_FREE (tmp);
- return ret;
+ return ret;
}
-
-static char
-valid_internet_address (char *address)
-{
- char ret = 0;
- int length = 0;
-
- if (address == NULL) {
- goto out;
- }
-
- length = strlen (address);
- if (length == 0) {
- goto out;
- }
-
- if (valid_ipv4_address (address, length)
- || valid_ipv6_address (address, length)
- || valid_host_name (address, length)) {
- ret = 1;
- }
-
-out:
- return ret;
-}
-
-
-int
-__volume_option_value_validate (char *name,
- data_pair_t *pair,
- volume_option_t *opt)
+int32_t
+rpc_transport_get_peername(rpc_transport_t *this, char *hostname, int hostlen)
{
- int i = 0;
- int ret = -1;
- uint64_t input_size = 0;
- long long inputll = 0;
-
- /* Key is valid, validate the option */
- switch (opt->type) {
- case GF_OPTION_TYPE_XLATOR:
- break;
-
- case GF_OPTION_TYPE_PATH:
- {
- if (strstr (pair->value->data, "../")) {
- gf_log (name, GF_LOG_ERROR,
- "invalid path given '%s'",
- pair->value->data);
- ret = -1;
- goto out;
- }
-
- /* Make sure the given path is valid */
- if (pair->value->data[0] != '/') {
- gf_log (name, GF_LOG_WARNING,
- "option %s %s: '%s' is not an "
- "absolute path name",
- pair->key, pair->value->data,
- pair->value->data);
- }
- ret = 0;
- }
- break;
- case GF_OPTION_TYPE_INT:
- {
- /* Check the range */
- if (gf_string2longlong (pair->value->data,
- &inputll) != 0) {
- gf_log (name, GF_LOG_ERROR,
- "invalid number format \"%s\" in "
- "\"option %s\"",
- pair->value->data, pair->key);
- goto out;
- }
-
- if ((opt->min == 0) && (opt->max == 0)) {
- gf_log (name, GF_LOG_DEBUG,
- "no range check required for "
- "'option %s %s'",
- pair->key, pair->value->data);
- ret = 0;
- break;
- }
- if ((inputll < opt->min) ||
- (inputll > opt->max)) {
- gf_log (name, GF_LOG_WARNING,
- "'%lld' in 'option %s %s' is out of "
- "range [%"PRId64" - %"PRId64"]",
- inputll, pair->key,
- pair->value->data,
- opt->min, opt->max);
- }
- ret = 0;
- }
- break;
- case GF_OPTION_TYPE_SIZET:
- {
- /* Check the range */
- if (gf_string2bytesize (pair->value->data,
- &input_size) != 0) {
- gf_log (name, GF_LOG_ERROR,
- "invalid size format \"%s\" in "
- "\"option %s\"",
- pair->value->data, pair->key);
- goto out;
- }
-
- if ((opt->min == 0) && (opt->max == 0)) {
- gf_log (name, GF_LOG_DEBUG,
- "no range check required for "
- "'option %s %s'",
- pair->key, pair->value->data);
- ret = 0;
- break;
- }
- if ((input_size < opt->min) ||
- (input_size > opt->max)) {
- gf_log (name, GF_LOG_ERROR,
- "'%"PRId64"' in 'option %s %s' is "
- "out of range [%"PRId64" - %"PRId64"]",
- input_size, pair->key,
- pair->value->data,
- opt->min, opt->max);
- }
- ret = 0;
- }
- break;
- case GF_OPTION_TYPE_BOOL:
- {
- /* Check if the value is one of
- '0|1|on|off|no|yes|true|false|enable|disable' */
- gf_boolean_t bool_value;
- if (gf_string2boolean (pair->value->data,
- &bool_value) != 0) {
- gf_log (name, GF_LOG_ERROR,
- "option %s %s: '%s' is not a valid "
- "boolean value",
- pair->key, pair->value->data,
- pair->value->data);
- goto out;
- }
- ret = 0;
- }
- break;
- case GF_OPTION_TYPE_STR:
- {
- /* Check if the '*str' is valid */
- if (GF_OPTION_LIST_EMPTY(opt)) {
- ret = 0;
- goto out;
- }
-
- for (i = 0; (i < ZR_OPTION_MAX_ARRAY_SIZE) &&
- opt->value[i]; i++) {
- if (strcasecmp (opt->value[i],
- pair->value->data) == 0) {
- ret = 0;
- break;
- }
- }
-
- if ((i == ZR_OPTION_MAX_ARRAY_SIZE)
- || ((i < ZR_OPTION_MAX_ARRAY_SIZE)
- && (!opt->value[i]))) {
- /* enter here only if
- * 1. reached end of opt->value array and haven't
- * validated input
- * OR
- * 2. valid input list is less than
- * ZR_OPTION_MAX_ARRAY_SIZE and input has not
- * matched all possible input values.
- */
- char given_array[4096] = {0,};
- for (i = 0; (i < ZR_OPTION_MAX_ARRAY_SIZE) &&
- opt->value[i]; i++) {
- strcat (given_array, opt->value[i]);
- strcat (given_array, ", ");
- }
-
- gf_log (name, GF_LOG_ERROR,
- "option %s %s: '%s' is not valid "
- "(possible options are %s)",
- pair->key, pair->value->data,
- pair->value->data, given_array);
-
- goto out;
- }
- }
- break;
- case GF_OPTION_TYPE_PERCENT:
- {
- uint32_t percent = 0;
-
-
- /* Check if the value is valid percentage */
- if (gf_string2percent (pair->value->data,
- &percent) != 0) {
- gf_log (name, GF_LOG_ERROR,
- "invalid percent format \"%s\" "
- "in \"option %s\"",
- pair->value->data, pair->key);
- goto out;
- }
-
- if ((percent < 0) || (percent > 100)) {
- gf_log (name, GF_LOG_ERROR,
- "'%d' in 'option %s %s' is out of "
- "range [0 - 100]",
- percent, pair->key,
- pair->value->data);
- }
- ret = 0;
- }
- break;
- case GF_OPTION_TYPE_PERCENT_OR_SIZET:
- {
- uint32_t percent = 0;
- uint64_t input_size = 0;
-
- /* Check if the value is valid percentage */
- if (gf_string2percent (pair->value->data,
- &percent) == 0) {
- if (percent > 100) {
- gf_log (name, GF_LOG_DEBUG,
- "value given was greater than 100, "
- "assuming this is actually a size");
- if (gf_string2bytesize (pair->value->data,
- &input_size) == 0) {
- /* Check the range */
- if ((opt->min == 0) &&
- (opt->max == 0)) {
- gf_log (name, GF_LOG_DEBUG,
- "no range check "
- "required for "
- "'option %s %s'",
- pair->key,
- pair->value->data);
- // It is a size
- ret = 0;
- goto out;
- }
- if ((input_size < opt->min) ||
- (input_size > opt->max)) {
- gf_log (name, GF_LOG_ERROR,
- "'%"PRId64"' in "
- "'option %s %s' is out"
- " of range [%"PRId64""
- "- %"PRId64"]",
- input_size, pair->key,
- pair->value->data,
- opt->min, opt->max);
- }
- // It is a size
- ret = 0;
- goto out;
- } else {
- // It's not a percent or size
- gf_log (name, GF_LOG_ERROR,
- "invalid number format \"%s\" "
- "in \"option %s\"",
- pair->value->data, pair->key);
- }
-
- }
- // It is a percent
- ret = 0;
- goto out;
- } else {
- if (gf_string2bytesize (pair->value->data,
- &input_size) == 0) {
- /* Check the range */
- if ((opt->min == 0) && (opt->max == 0)) {
- gf_log (name, GF_LOG_DEBUG,
- "no range check required for "
- "'option %s %s'",
- pair->key, pair->value->data);
- // It is a size
- ret = 0;
- goto out;
- }
- if ((input_size < opt->min) ||
- (input_size > opt->max)) {
- gf_log (name, GF_LOG_ERROR,
- "'%"PRId64"' in 'option %s %s'"
- " is out of range [%"PRId64" -"
- " %"PRId64"]",
- input_size, pair->key,
- pair->value->data,
- opt->min, opt->max);
- }
- } else {
- // It's not a percent or size
- gf_log (name, GF_LOG_ERROR,
- "invalid number format \"%s\" "
- "in \"option %s\"",
- pair->value->data, pair->key);
- }
- //It is a size
- ret = 0;
- goto out;
- }
-
- }
- break;
- case GF_OPTION_TYPE_TIME:
- {
- uint32_t input_time = 0;
-
- /* Check if the value is valid percentage */
- if (gf_string2time (pair->value->data,
- &input_time) != 0) {
- gf_log (name,
- GF_LOG_ERROR,
- "invalid time format \"%s\" in "
- "\"option %s\"",
- pair->value->data, pair->key);
- goto out;
- }
-
- if ((opt->min == 0) && (opt->max == 0)) {
- gf_log (name, GF_LOG_DEBUG,
- "no range check required for "
- "'option %s %s'",
- pair->key, pair->value->data);
- ret = 0;
- goto out;
- }
- if ((input_time < opt->min) ||
- (input_time > opt->max)) {
- gf_log (name, GF_LOG_ERROR,
- "'%"PRIu32"' in 'option %s %s' is "
- "out of range [%"PRId64" - %"PRId64"]",
- input_time, pair->key,
- pair->value->data,
- opt->min, opt->max);
- }
- ret = 0;
- }
- break;
- case GF_OPTION_TYPE_DOUBLE:
- {
- double input_time = 0.0;
-
- /* Check if the value is valid double */
- if (gf_string2double (pair->value->data,
- &input_time) != 0) {
- gf_log (name,
- GF_LOG_ERROR,
- "invalid time format \"%s\" in \"option %s\"",
- pair->value->data, pair->key);
- goto out;
- }
-
- if (input_time < 0.0) {
- gf_log (name,
- GF_LOG_ERROR,
- "invalid time format \"%s\" in \"option %s\"",
- pair->value->data, pair->key);
- goto out;
- }
-
- if ((opt->min == 0) && (opt->max == 0)) {
- gf_log (name, GF_LOG_DEBUG,
- "no range check required for 'option %s %s'",
- pair->key, pair->value->data);
- ret = 0;
- goto out;
- }
- ret = 0;
- }
- break;
- case GF_OPTION_TYPE_INTERNET_ADDRESS:
- {
- if (valid_internet_address (pair->value->data)) {
- ret = 0;
- }
- }
- break;
- case GF_OPTION_TYPE_ANY:
- /* NO CHECK */
- ret = 0;
- break;
- }
+ int32_t ret = -1;
+ GF_VALIDATE_OR_GOTO("rpc", this, out);
+ ret = this->ops->get_peername(this, hostname, hostlen);
out:
- return ret;
+ return ret;
}
-/* FIXME: this procedure should be removed from transport */
int
-validate_volume_options (char *name, dict_t *options, volume_option_t *opt)
+rpc_transport_throttle(rpc_transport_t *this, gf_boolean_t onoff)
{
- int i = 0;
- int ret = -1;
- int index = 0;
- volume_option_t *trav = NULL;
- data_pair_t *pairs = NULL;
-
- if (!opt) {
- ret = 0;
- goto out;
- }
-
- /* First search for not supported options, if any report error */
- pairs = options->members_list;
- while (pairs) {
- ret = -1;
- for (index = 0;
- opt[index].key && opt[index].key[0] ; index++) {
- trav = &(opt[index]);
- for (i = 0 ;
- (i < ZR_VOLUME_MAX_NUM_KEY) &&
- trav->key[i]; i++) {
- /* Check if the key is valid */
- if (fnmatch (trav->key[i],
- pairs->key, FNM_NOESCAPE) == 0) {
- ret = 0;
- break;
- }
- }
- if (!ret) {
- if (i) {
- gf_log (name, GF_LOG_WARNING,
- "option '%s' is deprecated, "
- "preferred is '%s', continuing"
- " with correction",
- trav->key[i], trav->key[0]);
- /* TODO: some bytes lost */
- pairs->key = gf_strdup (trav->key[0]);
- }
- break;
- }
- }
- if (!ret) {
- ret = __volume_option_value_validate (name, pairs, trav);
- if (-1 == ret) {
- goto out;
- }
- }
-
- pairs = pairs->next;
- }
-
- ret = 0;
- out:
- return ret;
-}
+ if (!this->ops->throttle)
+ return -ENOSYS;
-int32_t
-rpc_transport_get_myaddr (rpc_transport_t *this, char *peeraddr, int addrlen,
- struct sockaddr *sa, size_t salen)
-{
- if (!this)
- return -1;
-
- return this->ops->get_myaddr (this, peeraddr, addrlen, sa, salen);
+ return this->ops->throttle(this, onoff);
}
int32_t
-rpc_transport_get_myname (rpc_transport_t *this, char *hostname, int hostlen)
+rpc_transport_get_peeraddr(rpc_transport_t *this, char *peeraddr, int addrlen,
+ struct sockaddr_storage *sa, size_t salen)
{
- if (!this)
- return -1;
-
- return this->ops->get_myname (this, hostname, hostlen);
-}
-
-int32_t
-rpc_transport_get_peername (rpc_transport_t *this, char *hostname, int hostlen)
-{
- if (!this)
- return -1;
- return this->ops->get_peername (this, hostname, hostlen);
-}
+ int32_t ret = -1;
+ GF_VALIDATE_OR_GOTO("rpc", this, out);
-int32_t
-rpc_transport_get_peeraddr (rpc_transport_t *this, char *peeraddr, int addrlen,
- struct sockaddr *sa, size_t salen)
-{
- if (!this)
- return -1;
- return this->ops->get_peeraddr (this, peeraddr, addrlen, sa, salen);
+ ret = this->ops->get_peeraddr(this, peeraddr, addrlen, sa, salen);
+out:
+ return ret;
}
void
-rpc_transport_pollin_destroy (rpc_transport_pollin_t *pollin)
+rpc_transport_pollin_destroy(rpc_transport_pollin_t *pollin)
{
- if (!pollin) {
- goto out;
- }
+ GF_VALIDATE_OR_GOTO("rpc", pollin, out);
- if (pollin->vectored) {
- if (pollin->data.vector.iobuf1) {
- iobuf_unref (pollin->data.vector.iobuf1);
- }
-
- if (pollin->data.vector.iobuf2) {
- iobuf_unref (pollin->data.vector.iobuf2);
- }
- } else {
- if (pollin->data.simple.iobuf) {
- iobuf_unref (pollin->data.simple.iobuf);
- }
- }
+ if (pollin->iobref) {
+ iobref_unref(pollin->iobref);
+ }
- if (pollin->private) {
- /* */
- GF_FREE (pollin->private);
- }
+ if (pollin->private) {
+ /* */
+ GF_FREE(pollin->private);
+ }
- GF_FREE (pollin);
+ GF_FREE(pollin);
out:
- return;
+ return;
}
-
rpc_transport_pollin_t *
-rpc_transport_pollin_alloc (rpc_transport_t *this, struct iobuf *iobuf,
- size_t size, struct iobuf *vectored_buf,
- size_t vectored_size, void *private)
+rpc_transport_pollin_alloc(rpc_transport_t *this, struct iovec *vector,
+ int count, struct iobuf *hdr_iobuf,
+ struct iobref *iobref, void *private)
{
- rpc_transport_pollin_t *msg = NULL;
- msg = GF_CALLOC (1, sizeof (*msg), gf_common_mt_rpc_trans_pollin_t);
- if (!msg) {
- gf_log ("rpc-transport", GF_LOG_ERROR, "out of memory");
- goto out;
- }
+ rpc_transport_pollin_t *msg = NULL;
+ msg = GF_CALLOC(1, sizeof(*msg), gf_common_mt_rpc_trans_pollin_t);
+ if (!msg) {
+ goto out;
+ }
- if (vectored_buf) {
- msg->vectored = 1;
- msg->data.vector.iobuf1 = iobuf_ref (iobuf);
- msg->data.vector.size1 = size;
+ msg->trans = this;
- msg->data.vector.iobuf2 = iobuf_ref (vectored_buf);
- msg->data.vector.size2 = vectored_size;
- } else {
- msg->data.simple.iobuf = iobuf_ref (iobuf);
- msg->data.simple.size = size;
- }
+ if (count > 1) {
+ msg->vectored = 1;
+ }
+
+ memcpy(msg->vector, vector, count * sizeof(*vector));
+ msg->count = count;
+ msg->iobref = iobref_ref(iobref);
+ msg->private = private;
+ if (hdr_iobuf)
+ iobref_add(iobref, hdr_iobuf);
- msg->private = private;
out:
- return msg;
+ return msg;
}
-
-rpc_transport_pollin_t *
-rpc_transport_same_process_pollin_alloc (rpc_transport_t *this,
- struct iovec *rpchdr, int rpchdrcount,
- struct iovec *proghdr,
- int proghdrcount,
- struct iovec *progpayload,
- int progpayloadcount,
- rpc_transport_rsp_t *rsp,
- char is_request)
+void
+rpc_transport_cleanup(rpc_transport_t *trans)
{
- rpc_transport_pollin_t *msg = NULL;
- int rpchdrlen = 0, proghdrlen = 0;
- int progpayloadlen = 0;
- char vectored = 0;
- char *hdr = NULL, *progpayloadbuf = NULL;
-
- if (!rpchdr || !proghdr) {
- goto err;
- }
-
- msg = GF_CALLOC (1, sizeof (*msg), gf_common_mt_rpc_trans_pollin_t);
- if (!msg) {
- gf_log ("rpc-transport", GF_LOG_ERROR, "out of memory");
- goto err;
- }
-
- rpchdrlen = iov_length (rpchdr, rpchdrcount);
- proghdrlen = iov_length (proghdr, proghdrcount);
+ if (!trans)
+ return;
- if (progpayload) {
- vectored = 1;
- progpayloadlen = iov_length (progpayload, progpayloadcount);
- }
+ if (trans->fini)
+ trans->fini(trans);
- /* FIXME: we are assuming rpchdr and proghdr will fit into
- * an iobuf (128KB)
- */
- if ((rpchdrlen + proghdrlen) > this->ctx->page_size) {
- gf_log ("rpc_transport", GF_LOG_DEBUG, "program hdr and rpc"
- " hdr together combined (%d) is bigger than "
- "iobuf size (%zu)", (rpchdrlen + proghdrlen),
- this->ctx->page_size);
- goto err;
- }
-
- if (vectored) {
- msg->data.vector.iobuf1 = iobuf_get (this->ctx->iobuf_pool);
- if (!msg->data.vector.iobuf1) {
- gf_log ("rpc_transport", GF_LOG_ERROR,
- "out of memory");
- goto err;
- }
-
- msg->data.vector.size1 = rpchdrlen + proghdrlen;
- hdr = iobuf_ptr (msg->data.vector.iobuf1);
-
- if (!is_request && rsp) {
- msg->data.vector.iobuf2 = rsp->rspbuf;
- progpayloadbuf = rsp->rspvec->iov_base;
- } else {
- msg->data.vector.iobuf2 = iobuf_get (this->ctx->iobuf_pool);
- if (!msg->data.vector.iobuf2) {
- gf_log ("rpc_transport", GF_LOG_ERROR,
- "out of memory");
- goto err;
- }
-
- progpayloadbuf = iobuf_ptr (msg->data.vector.iobuf2);
- }
- msg->data.vector.size2 = progpayloadlen;
- } else {
- if (!is_request && rsp) {
- /* FIXME: Assuming rspvec contains only one vector */
- hdr = rsp->rspvec->iov_base;
- msg->data.simple.iobuf = rsp->rspbuf;
- } else {
- msg->data.simple.iobuf = iobuf_get (this->ctx->iobuf_pool);
- if (!msg->data.simple.iobuf) {
- gf_log ("rpc_transport", GF_LOG_ERROR,
- "out of memory");
- goto err;
- }
-
- hdr = iobuf_ptr (msg->data.simple.iobuf);
- }
-
- msg->data.simple.size = rpchdrlen + proghdrlen;
- }
+ if (trans->options) {
+ dict_unref(trans->options);
+ trans->options = NULL;
+ }
- iov_unload (hdr, rpchdr, rpchdrcount);
- hdr += rpchdrlen;
- iov_unload (hdr, proghdr, proghdrcount);
+ GF_FREE(trans->name);
- if (progpayload) {
- iov_unload (progpayloadbuf, progpayload,
- progpayloadcount);
- }
+ if (trans->xl)
+ pthread_mutex_destroy(&trans->lock);
- if (is_request) {
- msg->private = rsp;
- }
- return msg;
-err:
- if (msg) {
- rpc_transport_pollin_destroy (msg);
- }
+ if (trans->dl_handle)
+ dlclose(trans->dl_handle);
- return NULL;
+ GF_FREE(trans);
}
-
-rpc_transport_handover_t *
-rpc_transport_handover_alloc (rpc_transport_pollin_t *pollin)
+rpc_transport_t *
+rpc_transport_load(glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
{
- rpc_transport_handover_t *msg = NULL;
-
- msg = GF_CALLOC (1, sizeof (*msg), gf_common_mt_rpc_trans_handover_t);
- if (!msg) {
- gf_log ("rpc_transport", GF_LOG_ERROR, "out of memory");
- goto out;
- }
-
- msg->pollin = pollin;
- INIT_LIST_HEAD (&msg->list);
-out:
- return msg;
-}
+ struct rpc_transport *trans = NULL, *return_trans = NULL;
+ char *name = NULL;
+ void *handle = NULL;
+ char *type = NULL;
+ static char str[] = "ERROR";
+ int32_t ret = -1;
+ int is_tcp = 0, is_unix = 0, is_ibsdp = 0;
+ volume_opt_list_t *vol_opt = NULL;
+ gf_boolean_t bind_insecure = _gf_false;
+ xlator_t *this = NULL;
+ gf_boolean_t success = _gf_false;
+
+ GF_VALIDATE_OR_GOTO("rpc-transport", options, fail);
+ GF_VALIDATE_OR_GOTO("rpc-transport", ctx, fail);
+ GF_VALIDATE_OR_GOTO("rpc-transport", trans_name, fail);
+
+ trans = GF_CALLOC(1, sizeof(struct rpc_transport),
+ gf_common_mt_rpc_trans_t);
+ if (!trans)
+ goto fail;
+
+ trans->name = gf_strdup(trans_name);
+ if (!trans->name)
+ goto fail;
+
+ trans->ctx = ctx;
+ type = str;
+
+ /* Backward compatibility */
+ ret = dict_get_str_sizen(options, "transport-type", &type);
+ if (ret < 0) {
+ ret = dict_set_str_sizen(options, "transport-type", "socket");
+ if (ret < 0)
+ gf_log("dict", GF_LOG_DEBUG, "setting transport-type failed");
+ else
+ gf_log("rpc-transport", GF_LOG_DEBUG,
+ "missing 'option transport-type'. defaulting to "
+ "\"socket\"");
+ } else {
+ {
+ /* Backward compatibility to handle * /client,
+ * * /server.
+ */
+ char *tmp = strchr(type, '/');
+ if (tmp)
+ *tmp = '\0';
+ }
+
+ is_tcp = strcmp(type, "tcp");
+ is_unix = strcmp(type, "unix");
+ is_ibsdp = strcmp(type, "ib-sdp");
+ if ((is_tcp == 0) || (is_unix == 0) || (is_ibsdp == 0)) {
+ if (is_unix == 0)
+ ret = dict_set_str_sizen(options, "transport.address-family",
+ "unix");
+ if (is_ibsdp == 0)
+ ret = dict_set_str_sizen(options, "transport.address-family",
+ "inet-sdp");
+
+ if (ret < 0)
+ gf_log("dict", GF_LOG_DEBUG, "setting address-family failed");
+
+ ret = dict_set_str_sizen(options, "transport-type", "socket");
+ if (ret < 0)
+ gf_log("dict", GF_LOG_DEBUG, "setting transport-type failed");
+ }
+ }
+
+ /* client-bind-insecure is for clients protocol, and
+ * bind-insecure for glusterd. Both mutually exclusive
+ */
+ ret = dict_get_str_sizen(options, "client-bind-insecure", &type);
+ if (ret)
+ ret = dict_get_str_sizen(options, "bind-insecure", &type);
+ if (ret == 0) {
+ ret = gf_string2boolean(type, &bind_insecure);
+ if (ret < 0) {
+ gf_log("rcp-transport", GF_LOG_WARNING,
+ "bind-insecure option %s is not a"
+ " valid bool option",
+ type);
+ goto fail;
+ }
+ if (_gf_true == bind_insecure)
+ trans->bind_insecure = 1;
+ else
+ trans->bind_insecure = 0;
+ } else {
+ /* By default allow bind insecure */
+ trans->bind_insecure = 1;
+ }
+
+ ret = dict_get_str_sizen(options, "transport-type", &type);
+ if (ret < 0) {
+ gf_log("rpc-transport", GF_LOG_ERROR,
+ "'option transport-type <xx>' missing in volume '%s'",
+ trans_name);
+ goto fail;
+ }
+
+ ret = gf_asprintf(&name, "%s/%s.so", RPC_TRANSPORTDIR, type);
+ if (-1 == ret) {
+ goto fail;
+ }
+
+ if (dict_get_sizen(options, "notify-poller-death")) {
+ trans->notify_poller_death = 1;
+ }
+
+ gf_log("rpc-transport", GF_LOG_DEBUG, "attempt to load file %s", name);
+
+ handle = dlopen(name, RTLD_NOW);
+ if (handle == NULL) {
+ gf_log("rpc-transport", GF_LOG_ERROR, "%s", dlerror());
+ gf_log("rpc-transport", GF_LOG_WARNING,
+ "volume '%s': transport-type '%s' is not valid or "
+ "not found on this machine",
+ trans_name, type);
+ goto fail;
+ }
+
+ trans->dl_handle = handle;
+
+ trans->ops = dlsym(handle, "tops");
+ if (trans->ops == NULL) {
+ gf_log("rpc-transport", GF_LOG_ERROR, "dlsym (rpc_transport_ops) on %s",
+ dlerror());
+ goto fail;
+ }
+
+ *VOID(&(trans->init)) = dlsym(handle, "init");
+ if (trans->init == NULL) {
+ gf_log("rpc-transport", GF_LOG_ERROR,
+ "dlsym (gf_rpc_transport_init) on %s", dlerror());
+ goto fail;
+ }
+
+ *VOID(&(trans->fini)) = dlsym(handle, "fini");
+ if (trans->fini == NULL) {
+ gf_log("rpc-transport", GF_LOG_ERROR,
+ "dlsym (gf_rpc_transport_fini) on %s", dlerror());
+ goto fail;
+ }
+
+ *VOID(&(trans->reconfigure)) = dlsym(handle, "reconfigure");
+ if (trans->reconfigure == NULL) {
+ gf_log("rpc-transport", GF_LOG_DEBUG,
+ "dlsym (gf_rpc_transport_reconfigure) on %s", dlerror());
+ }
+
+ vol_opt = GF_CALLOC(1, sizeof(volume_opt_list_t),
+ gf_common_mt_volume_opt_list_t);
+ if (!vol_opt) {
+ goto fail;
+ }
+
+ this = THIS;
+ vol_opt->given_opt = dlsym(handle, "options");
+ if (vol_opt->given_opt == NULL) {
+ gf_log("rpc-transport", GF_LOG_DEBUG,
+ "volume option validation not specified");
+ } else {
+ INIT_LIST_HEAD(&vol_opt->list);
+ list_add_tail(&vol_opt->list, &(this->volume_options));
+ if (xlator_options_validate_list(this, options, vol_opt, NULL)) {
+ gf_log("rpc-transport", GF_LOG_ERROR,
+ "volume option validation failed");
+ goto fail;
+ }
+ }
+
+ trans->options = dict_ref(options);
+
+ pthread_mutex_init(&trans->lock, NULL);
+ trans->xl = this;
+
+ ret = trans->init(trans);
+ if (ret != 0) {
+ gf_log("rpc-transport", GF_LOG_WARNING, "'%s' initialization failed",
+ type);
+ goto fail;
+ }
+
+ INIT_LIST_HEAD(&trans->list);
+ GF_ATOMIC_INIT(trans->disconnect_progress, 0);
+
+ return_trans = trans;
+
+ GF_FREE(name);
+
+ success = _gf_true;
+fail:
+ if (!success) {
+ rpc_transport_cleanup(trans);
+ GF_FREE(name);
-void
-rpc_transport_handover_destroy (rpc_transport_handover_t *msg)
-{
- if (!msg) {
- goto out;
- }
+ return_trans = NULL;
+ }
- if (msg->pollin) {
- rpc_transport_pollin_destroy (msg->pollin);
+ if (vol_opt) {
+ if (!list_empty(&vol_opt->list)) {
+ list_del_init(&vol_opt->list);
}
+ GF_FREE(vol_opt);
+ }
- GF_FREE (msg);
-
-out:
- return;
+ return return_trans;
}
-
-rpc_transport_t *
-rpc_transport_load (glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
+int32_t
+rpc_transport_submit_request(rpc_transport_t *this, rpc_transport_req_t *req)
{
- struct rpc_transport *trans = NULL, *return_trans = NULL;
- char *name = NULL;
- void *handle = NULL;
- char *type = NULL;
- char str[] = "ERROR";
- int32_t ret = -1;
- int8_t is_tcp = 0, is_unix = 0, is_ibsdp = 0;
- volume_opt_list_t *vol_opt = NULL;
-
- GF_VALIDATE_OR_GOTO("rpc-transport", options, fail);
- GF_VALIDATE_OR_GOTO("rpc-transport", ctx, fail);
- GF_VALIDATE_OR_GOTO("rpc-transport", trans_name, fail);
-
- trans = GF_CALLOC (1, sizeof (struct rpc_transport), gf_common_mt_rpc_trans_t);
- GF_VALIDATE_OR_GOTO("rpc-transport", trans, fail);
-
- trans->name = gf_strdup (trans_name);
- GF_VALIDATE_OR_GOTO ("rpc-transport", trans->name, fail);
-
- trans->ctx = ctx;
- type = str;
-
- /* Backward compatibility */
- ret = dict_get_str (options, "transport-type", &type);
- if (ret < 0) {
- ret = dict_set_str (options, "transport-type", "socket");
- if (ret < 0)
- gf_log ("dict", GF_LOG_DEBUG,
- "setting transport-type failed");
- gf_log ("rpc-transport", GF_LOG_WARNING,
- "missing 'option transport-type'. defaulting to "
- "\"socket\"");
- } else {
- {
- /* Backword compatibility to handle * /client,
- * * /server.
- */
- char *tmp = strchr (type, '/');
- if (tmp)
- *tmp = '\0';
- }
-
- is_tcp = strcmp (type, "tcp");
- is_unix = strcmp (type, "unix");
- is_ibsdp = strcmp (type, "ib-sdp");
- if ((is_tcp == 0) ||
- (is_unix == 0) ||
- (is_ibsdp == 0)) {
- if (is_unix == 0)
- ret = dict_set_str (options,
- "transport.address-family",
- "unix");
- if (is_ibsdp == 0)
- ret = dict_set_str (options,
- "transport.address-family",
- "inet-sdp");
-
- if (ret < 0)
- gf_log ("dict", GF_LOG_DEBUG,
- "setting address-family failed");
-
- ret = dict_set_str (options,
- "transport-type", "socket");
- if (ret < 0)
- gf_log ("dict", GF_LOG_DEBUG,
- "setting transport-type failed");
- }
- }
-
- ret = dict_get_str (options, "transport-type", &type);
- if (ret < 0) {
- gf_log ("rpc-transport", GF_LOG_ERROR,
- "'option transport-type <xx>' missing in volume '%s'",
- trans_name);
- goto fail;
- }
-
- ret = gf_asprintf (&name, "%s/%s.so", RPC_TRANSPORTDIR, type);
- if (-1 == ret) {
- gf_log ("rpc-transport", GF_LOG_ERROR, "asprintf failed");
- goto fail;
- }
- gf_log ("rpc-transport", GF_LOG_DEBUG,
- "attempt to load file %s", name);
-
- handle = dlopen (name, RTLD_NOW|RTLD_GLOBAL);
- if (handle == NULL) {
- gf_log ("rpc-transport", GF_LOG_ERROR, "%s", dlerror ());
- gf_log ("rpc-transport", GF_LOG_ERROR,
- "volume '%s': transport-type '%s' is not valid or "
- "not found on this machine",
- trans_name, type);
- goto fail;
- }
-
- trans->ops = dlsym (handle, "tops");
- if (trans->ops == NULL) {
- gf_log ("rpc-transport", GF_LOG_ERROR,
- "dlsym (rpc_transport_ops) on %s", dlerror ());
- goto fail;
- }
-
- trans->init = dlsym (handle, "init");
- if (trans->init == NULL) {
- gf_log ("rpc-transport", GF_LOG_ERROR,
- "dlsym (gf_rpc_transport_init) on %s", dlerror ());
- goto fail;
- }
-
- trans->fini = dlsym (handle, "fini");
- if (trans->fini == NULL) {
- gf_log ("rpc-transport", GF_LOG_ERROR,
- "dlsym (gf_rpc_transport_fini) on %s", dlerror ());
- goto fail;
- }
-
- vol_opt = GF_CALLOC (1, sizeof (volume_opt_list_t),
- gf_common_mt_volume_opt_list_t);
- if (!vol_opt) {
- gf_log (trans_name, GF_LOG_ERROR, "out of memory");
- goto fail;
- }
+ int32_t ret = -1;
- vol_opt->given_opt = dlsym (handle, "options");
- if (vol_opt->given_opt == NULL) {
- gf_log ("rpc-transport", GF_LOG_DEBUG,
- "volume option validation not specified");
- } else {
- /* FIXME: is adding really needed? */
- /* list_add_tail (&vol_opt->list, &xl->volume_options); */
- if (-1 ==
- validate_volume_options (trans_name, options,
- vol_opt->given_opt)) {
- gf_log ("rpc-transport", GF_LOG_ERROR,
- "volume option validation failed");
- goto fail;
- }
- }
-
- ret = trans->init (trans);
- if (ret != 0) {
- gf_log ("rpc-transport", GF_LOG_ERROR,
- "'%s' initialization failed", type);
- goto fail;
- }
-
- trans->options = options;
-
- pthread_mutex_init (&trans->lock, NULL);
- return_trans = trans;
- return return_trans;
+ GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
+ GF_VALIDATE_OR_GOTO("rpc_transport", this->ops, fail);
+ ret = this->ops->submit_request(this, req);
fail:
- if (trans) {
- if (trans->name) {
- GF_FREE (trans->name);
- }
-
- GF_FREE (trans);
- }
-
- if (name) {
- GF_FREE (name);
- }
-
- if (vol_opt) {
- GF_FREE (vol_opt);
- }
-
- return NULL;
+ return ret;
}
-
int32_t
-rpc_transport_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
+rpc_transport_submit_reply(rpc_transport_t *this, rpc_transport_reply_t *reply)
{
- int32_t ret = -1;
- rpc_transport_t *peer_trans = NULL;
- rpc_transport_pollin_t *pollin = NULL;
- rpc_transport_handover_t *handover_msg = NULL;
- rpc_transport_rsp_t *rsp = NULL;
-
- if (this->peer_trans) {
- peer_trans = this->peer_trans;
-
- rsp = GF_CALLOC (1, sizeof (*rsp), gf_common_mt_rpc_trans_rsp_t);
- if (!rsp) {
- ret = -ENOMEM;
- goto fail;
- }
-
- *rsp = req->rsp;
-
- pollin = rpc_transport_same_process_pollin_alloc (this, req->msg.rpchdr,
- req->msg.rpchdrcount,
- req->msg.proghdr,
- req->msg.proghdrcount,
- req->msg.progpayload,
- req->msg.progpayloadcount,
- rsp, 1);
- if (!pollin) {
- GF_FREE (rsp);
- ret = -ENOMEM;
- goto fail;
- }
-
- handover_msg = rpc_transport_handover_alloc (pollin);
- if (!handover_msg) {
- rpc_transport_pollin_destroy (pollin);
- ret = -ENOMEM;
- goto fail;
- }
-
- pthread_mutex_lock (&peer_trans->handover.mutex);
- {
- list_add_tail (&handover_msg->list,
- &peer_trans->handover.msgs);
- pthread_cond_broadcast (&peer_trans->handover.cond);
- }
- pthread_mutex_unlock (&peer_trans->handover.mutex);
-
- return 0;
- }
+ int32_t ret = -1;
- GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
- GF_VALIDATE_OR_GOTO("rpc_transport", this->ops, fail);
+ GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
+ GF_VALIDATE_OR_GOTO("rpc_transport", this->ops, fail);
- ret = this->ops->submit_request (this, req);
+ ret = this->ops->submit_reply(this, reply);
fail:
- return ret;
+ return ret;
}
-
int32_t
-rpc_transport_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
+rpc_transport_connect(rpc_transport_t *this, int port)
{
- int32_t ret = -1;
- rpc_transport_t *peer_trans = NULL;
- rpc_transport_pollin_t *pollin = NULL;
- rpc_transport_handover_t *handover_msg = NULL;
-
- if (this->peer_trans) {
- peer_trans = this->peer_trans;
-
- pollin = rpc_transport_same_process_pollin_alloc (this, reply->msg.rpchdr,
- reply->msg.rpchdrcount,
- reply->msg.proghdr,
- reply->msg.proghdrcount,
- reply->msg.progpayload,
- reply->msg.progpayloadcount,
- reply->private, 0);
- if (!pollin) {
- return -ENOMEM;
- }
-
- handover_msg = rpc_transport_handover_alloc (pollin);
- if (!handover_msg) {
- rpc_transport_pollin_destroy (pollin);
- return -ENOMEM;
- }
-
- pthread_mutex_lock (&peer_trans->handover.mutex);
- {
- list_add_tail (&handover_msg->list,
- &peer_trans->handover.msgs);
- pthread_cond_broadcast (&peer_trans->handover.cond);
- }
- pthread_mutex_unlock (&peer_trans->handover.mutex);
-
- return 0;
- }
+ int ret = -1;
- GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
- GF_VALIDATE_OR_GOTO("rpc_transport", this->ops, fail);
+ GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
- ret = this->ops->submit_reply (this, reply);
+ ret = this->ops->connect(this, port);
fail:
- return ret;
+ return ret;
}
-
int32_t
-rpc_transport_connect (rpc_transport_t *this)
+rpc_transport_listen(rpc_transport_t *this)
{
- int ret = -1;
+ int ret = -1;
- GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
+ GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
- ret = this->ops->connect (this);
+ ret = this->ops->listen(this);
fail:
- return ret;
+ return ret;
}
-
int32_t
-rpc_transport_listen (rpc_transport_t *this)
+rpc_transport_disconnect(rpc_transport_t *this, gf_boolean_t wait)
{
- int ret = -1;
+ int32_t ret = -1;
- GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
+ GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
+
+ ret = this->ops->disconnect(this, wait);
- ret = this->ops->listen (this);
fail:
- return ret;
+ return ret;
}
-
-int32_t
-rpc_transport_disconnect (rpc_transport_t *this)
+static void
+rpc_transport_destroy(rpc_transport_t *this)
{
- int32_t ret = -1;
+ struct dnscache6 *cache = NULL;
- GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
+ if (this->clnt_options)
+ dict_unref(this->clnt_options);
+ if (this->options)
+ dict_unref(this->options);
+ if (this->fini)
+ this->fini(this);
- ret = this->ops->disconnect (this);
-fail:
- return ret;
-}
+ pthread_mutex_destroy(&this->lock);
+ GF_FREE(this->name);
-int32_t
-rpc_transport_destroy (rpc_transport_t *this)
-{
- int32_t ret = -1;
+ if (this->dl_handle)
+ dlclose(this->dl_handle);
- GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
+ if (this->ssl_name) {
+ GF_FREE(this->ssl_name);
+ }
- if (this->fini)
- this->fini (this);
- pthread_mutex_destroy (&this->lock);
- GF_FREE (this);
-fail:
- return ret;
-}
+ if (this->dnscache) {
+ cache = this->dnscache;
+ if (cache->first)
+ freeaddrinfo(cache->first);
+ GF_FREE(this->dnscache);
+ }
+ GF_FREE(this);
+}
rpc_transport_t *
-rpc_transport_ref (rpc_transport_t *this)
+rpc_transport_ref(rpc_transport_t *this)
{
- rpc_transport_t *return_this = NULL;
+ rpc_transport_t *return_this = NULL;
- GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
+ GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
- pthread_mutex_lock (&this->lock);
- {
- this->refcount ++;
- }
- pthread_mutex_unlock (&this->lock);
+ GF_ATOMIC_INC(this->refcount);
- return_this = this;
+ return_this = this;
fail:
- return return_this;
+ return return_this;
}
-
int32_t
-rpc_transport_unref (rpc_transport_t *this)
+rpc_transport_unref(rpc_transport_t *this)
{
- int32_t refcount = 0;
- int32_t ret = -1;
+ int32_t refcount = 0;
+ int32_t ret = -1;
- GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
+ GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
- pthread_mutex_lock (&this->lock);
- {
- refcount = --this->refcount;
- }
- pthread_mutex_unlock (&this->lock);
+ refcount = GF_ATOMIC_DEC(this->refcount);
- if (refcount == 0) {
- /* xlator_notify (this->xl, GF_EVENT_RPC_TRANSPORT_CLEANUP,
- this); */
- rpc_transport_destroy (this);
- }
+ if (refcount == 0) {
+ if (this->mydata)
+ this->notify(this, this->mydata, RPC_TRANSPORT_CLEANUP, NULL);
+ this->mydata = NULL;
+ this->notify = NULL;
+ rpc_transport_destroy(this);
+ }
- ret = 0;
+ ret = 0;
fail:
- return ret;
+ return ret;
}
-
int32_t
-rpc_transport_notify (rpc_transport_t *this, rpc_transport_event_t event,
- void *data, ...)
+rpc_transport_notify(rpc_transport_t *this, rpc_transport_event_t event,
+ void *data, ...)
{
- int32_t ret = -1;
+ int32_t ret = -1;
+ GF_VALIDATE_OR_GOTO("rpc", this, out);
- if (this == NULL) {
- goto out;
- }
-
- //ret = this->notify (this, this->notify_data, event, data);
- ret = this->notify (this, this->mydata, event, data);
+ if (this->notify != NULL) {
+ ret = this->notify(this, this->mydata, event, data);
+ } else {
+ ret = 0;
+ }
out:
- return ret;
+ return ret;
}
-
-void *
-rpc_transport_peerproc (void *trans_data)
+int
+rpc_transport_register_notify(rpc_transport_t *trans,
+ rpc_transport_notify_t notify, void *mydata)
{
- rpc_transport_t *trans = NULL;
- rpc_transport_handover_t *msg = NULL;
-
- trans = trans_data;
-
- while (1) {
- pthread_mutex_lock (&trans->handover.mutex);
- {
- while (list_empty (&trans->handover.msgs))
- pthread_cond_wait (&trans->handover.cond,
- &trans->handover.mutex);
+ int32_t ret = -1;
+ GF_VALIDATE_OR_GOTO("rpc", trans, out);
- msg = list_entry (trans->handover.msgs.next,
- rpc_transport_handover_t, list);
+ trans->notify = notify;
+ trans->mydata = mydata;
- list_del_init (&msg->list);
- }
- pthread_mutex_unlock (&trans->handover.mutex);
-
- rpc_transport_notify (trans, RPC_TRANSPORT_MSG_RECEIVED, msg->pollin);
- rpc_transport_handover_destroy (msg);
- }
+ ret = 0;
+out:
+ return ret;
}
-
+// give negative values to skip setting that value
+// this function asserts if both the values are negative.
+// why call it if you don't set it.
int
-rpc_transport_setpeer (rpc_transport_t *trans, rpc_transport_t *peer_trans)
+rpc_transport_keepalive_options_set(dict_t *options, int32_t interval,
+ int32_t time, int32_t timeout)
{
- trans->peer_trans = rpc_transport_ref (peer_trans);
+ int ret = -1;
- INIT_LIST_HEAD (&trans->handover.msgs);
- pthread_cond_init (&trans->handover.cond, NULL);
- pthread_mutex_init (&trans->handover.mutex, NULL);
- pthread_create (&trans->handover.thread, NULL,
- rpc_transport_peerproc, trans);
+ GF_ASSERT(options);
+ GF_ASSERT((interval > 0) || (time > 0));
- peer_trans->peer_trans = rpc_transport_ref (trans);
+ ret = dict_set_int32_sizen(options, "transport.socket.keepalive-interval",
+ interval);
+ if (ret)
+ goto out;
- INIT_LIST_HEAD (&peer_trans->handover.msgs);
- pthread_cond_init (&peer_trans->handover.cond, NULL);
- pthread_mutex_init (&peer_trans->handover.mutex, NULL);
- pthread_create (&peer_trans->handover.thread, NULL,
- rpc_transport_peerproc, peer_trans);
+ ret = dict_set_int32_sizen(options, "transport.socket.keepalive-time",
+ time);
+ if (ret)
+ goto out;
- return 0;
+ ret = dict_set_int32_sizen(options, "transport.tcp-user-timeout", timeout);
+ if (ret)
+ goto out;
+out:
+ return ret;
}
-
-inline int
-rpc_transport_register_notify (rpc_transport_t *trans,
- rpc_transport_notify_t notify, void *mydata)
+int
+rpc_transport_unix_options_build(dict_t *dict, char *filepath,
+ int frame_timeout)
{
- int ret = -1;
-
- if (trans == NULL) {
- goto out;
- }
+ char *fpath = NULL;
+ int ret = -1;
+
+ GF_ASSERT(filepath);
+ GF_VALIDATE_OR_GOTO("rpc-transport", dict, out);
+
+ fpath = gf_strdup(filepath);
+ if (!fpath) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = dict_set_dynstr_sizen(dict, "transport.socket.connect-path", fpath);
+ if (ret) {
+ GF_FREE(fpath);
+ goto out;
+ }
+
+ ret = dict_set_str_sizen(dict, "transport.address-family", "unix");
+ if (ret)
+ goto out;
+
+ ret = dict_set_str_sizen(dict, "transport.socket.nodelay", "off");
+ if (ret)
+ goto out;
+
+ ret = dict_set_str_sizen(dict, "transport-type", "socket");
+ if (ret)
+ goto out;
+
+ ret = dict_set_str_sizen(dict, "transport.socket.keepalive", "off");
+ if (ret)
+ goto out;
+
+ if (frame_timeout > 0) {
+ ret = dict_set_int32_sizen(dict, "frame-timeout", frame_timeout);
+ if (ret)
+ goto out;
+ }
+out:
+ return ret;
+}
- trans->notify = notify;
- trans->mydata = mydata;
+int
+rpc_transport_inet_options_build(dict_t *dict, const char *hostname, int port,
+ char *af)
+{
+ char *host = NULL;
+ int ret = -1;
+#ifdef IPV6_DEFAULT
+ static char *addr_family = "inet6";
+#else
+ static char *addr_family = "inet";
+#endif
- ret = 0;
+ GF_ASSERT(hostname);
+ GF_ASSERT(port >= 1024);
+ GF_VALIDATE_OR_GOTO("rpc-transport", dict, out);
+
+ host = gf_strdup((char *)hostname);
+ if (!host) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = dict_set_dynstr_sizen(dict, "remote-host", host);
+ if (ret) {
+ gf_log(THIS->name, GF_LOG_WARNING, "failed to set remote-host with %s",
+ host);
+ GF_FREE(host);
+ goto out;
+ }
+
+ ret = dict_set_int32_sizen(dict, "remote-port", port);
+ if (ret) {
+ gf_log(THIS->name, GF_LOG_WARNING, "failed to set remote-port with %d",
+ port);
+ goto out;
+ }
+
+ ret = dict_set_str_sizen(dict, "address-family",
+ (af != NULL ? af : addr_family));
+ if (ret) {
+ gf_log(THIS->name, GF_LOG_WARNING, "failed to set address-family to %s",
+ addr_family);
+ goto out;
+ }
+
+ ret = dict_set_str_sizen(dict, "transport-type", "socket");
+ if (ret) {
+ gf_log(THIS->name, GF_LOG_WARNING,
+ "failed to set trans-type with socket");
+ goto out;
+ }
out:
- return ret;
+ return ret;
}