From 0da2a6d08cc8fc2315742d010c8a8cab38ef75bb Mon Sep 17 00:00:00 2001 From: Vijay Bellur Date: Mon, 28 Jun 2010 02:49:46 +0000 Subject: Move rpc to top-level Signed-off-by: Vijay Bellur Signed-off-by: Anand V. Avati BUG: 875 (Implement a new protocol to provide proper backward/forward compatibility) URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=875 --- xlators/protocol/rpc/rpc-transport/Makefile.am | 1 - .../protocol/rpc/rpc-transport/socket/Makefile.am | 1 - .../rpc/rpc-transport/socket/src/Makefile.am | 15 - .../protocol/rpc/rpc-transport/socket/src/name.c | 737 ------- .../protocol/rpc/rpc-transport/socket/src/name.h | 44 - .../protocol/rpc/rpc-transport/socket/src/socket.c | 2308 -------------------- .../protocol/rpc/rpc-transport/socket/src/socket.h | 190 -- 7 files changed, 3296 deletions(-) delete mode 100644 xlators/protocol/rpc/rpc-transport/Makefile.am delete mode 100644 xlators/protocol/rpc/rpc-transport/socket/Makefile.am delete mode 100644 xlators/protocol/rpc/rpc-transport/socket/src/Makefile.am delete mode 100644 xlators/protocol/rpc/rpc-transport/socket/src/name.c delete mode 100644 xlators/protocol/rpc/rpc-transport/socket/src/name.h delete mode 100644 xlators/protocol/rpc/rpc-transport/socket/src/socket.c delete mode 100644 xlators/protocol/rpc/rpc-transport/socket/src/socket.h (limited to 'xlators/protocol/rpc/rpc-transport') diff --git a/xlators/protocol/rpc/rpc-transport/Makefile.am b/xlators/protocol/rpc/rpc-transport/Makefile.am deleted file mode 100644 index 7dd9f026cfc..00000000000 --- a/xlators/protocol/rpc/rpc-transport/Makefile.am +++ /dev/null @@ -1 +0,0 @@ -SUBDIRS = socket diff --git a/xlators/protocol/rpc/rpc-transport/socket/Makefile.am b/xlators/protocol/rpc/rpc-transport/socket/Makefile.am deleted file mode 100644 index f963effea22..00000000000 --- a/xlators/protocol/rpc/rpc-transport/socket/Makefile.am +++ /dev/null @@ -1 +0,0 @@ -SUBDIRS = src \ No newline at end of file diff --git a/xlators/protocol/rpc/rpc-transport/socket/src/Makefile.am b/xlators/protocol/rpc/rpc-transport/socket/src/Makefile.am deleted file mode 100644 index 325a58bb05d..00000000000 --- a/xlators/protocol/rpc/rpc-transport/socket/src/Makefile.am +++ /dev/null @@ -1,15 +0,0 @@ -noinst_HEADERS = socket.h name.h - -rpctransport_LTLIBRARIES = socket.la -rpctransportdir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/rpc-transport - -socket_la_LDFLAGS = -module -avoidversion - -socket_la_SOURCES = socket.c name.c -socket_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la - -AM_CFLAGS = -fPIC -D_FILE_OFFSET_BITS=64 -D_GNU_SOURCE -Wall -D$(GF_HOST_OS)\ - -I$(top_srcdir)/libglusterfs/src -I$(top_srcdir)/xlators/protocol/rpc/rpc-lib/src/ \ - -I$(top_srcdir)/xlators/protocol/lib/src/ -shared -nostartfiles $(GF_CFLAGS) - -CLEANFILES = *~ diff --git a/xlators/protocol/rpc/rpc-transport/socket/src/name.c b/xlators/protocol/rpc/rpc-transport/socket/src/name.c deleted file mode 100644 index 763fa3dd09b..00000000000 --- a/xlators/protocol/rpc/rpc-transport/socket/src/name.c +++ /dev/null @@ -1,737 +0,0 @@ -/* - Copyright (c) 2010 Gluster, Inc. - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - . -*/ - -#include -#include -#include -#include -#include -#include - -#ifdef CLIENT_PORT_CEILING -#undef CLIENT_PORT_CEILING -#endif - -#define CLIENT_PORT_CEILING 1024 - -#ifndef AF_INET_SDP -#define AF_INET_SDP 27 -#endif - -#include "rpc-transport.h" -#include "socket.h" - -int32_t -gf_resolve_ip6 (const char *hostname, - uint16_t port, - int family, - void **dnscache, - struct addrinfo **addr_info); - -static int32_t -af_inet_bind_to_port_lt_ceiling (int fd, struct sockaddr *sockaddr, - socklen_t sockaddr_len, int ceiling) -{ - int32_t ret = -1; - /* struct sockaddr_in sin = {0, }; */ - uint16_t port = ceiling - 1; - - while (port) - { - switch (sockaddr->sa_family) - { - case AF_INET6: - ((struct sockaddr_in6 *)sockaddr)->sin6_port = htons (port); - break; - - case AF_INET_SDP: - case AF_INET: - ((struct sockaddr_in *)sockaddr)->sin_port = htons (port); - break; - } - - ret = bind (fd, sockaddr, sockaddr_len); - - if (ret == 0) - break; - - if (ret == -1 && errno == EACCES) - break; - - port--; - } - - return ret; -} - -static int32_t -af_unix_client_bind (rpc_transport_t *this, - struct sockaddr *sockaddr, - socklen_t sockaddr_len, - int sock) -{ - data_t *path_data = NULL; - struct sockaddr_un *addr = NULL; - int32_t ret = 0; - - path_data = dict_get (this->options, "transport.socket.bind-path"); - if (path_data) { - char *path = data_to_str (path_data); - if (!path || strlen (path) > UNIX_PATH_MAX) { - gf_log (this->name, GF_LOG_TRACE, - "bind-path not specfied for unix socket, " - "letting connect to assign default value"); - goto err; - } - - addr = (struct sockaddr_un *) sockaddr; - strcpy (addr->sun_path, path); - ret = bind (sock, (struct sockaddr *)addr, sockaddr_len); - if (ret == -1) { - gf_log (this->name, GF_LOG_ERROR, - "cannot bind to unix-domain socket %d (%s)", - sock, strerror (errno)); - goto err; - } - } else { - gf_log (this->name, GF_LOG_TRACE, - "bind-path not specfied for unix socket, " - "letting connect to assign default value"); - } - -err: - return ret; -} - -int32_t -client_fill_address_family (rpc_transport_t *this, sa_family_t *sa_family) -{ - data_t *address_family_data = NULL; - int32_t ret = -1; - - if (sa_family == NULL) { - goto out; - } - - address_family_data = dict_get (this->options, - "transport.address-family"); - if (!address_family_data) { - data_t *remote_host_data = NULL, *connect_path_data = NULL; - remote_host_data = dict_get (this->options, "remote-host"); - connect_path_data = dict_get (this->options, - "transport.socket.connect-path"); - - if (!(remote_host_data || connect_path_data) || - (remote_host_data && connect_path_data)) { - gf_log (this->name, GF_LOG_ERROR, - "transport.address-family not specified and " - "not able to determine the " - "same from other options (remote-host:%s and " - "transport.unix.connect-path:%s)", - data_to_str (remote_host_data), - data_to_str (connect_path_data)); - goto out; - } - - if (remote_host_data) { - gf_log (this->name, GF_LOG_DEBUG, - "address-family not specified, guessing it " - "to be inet/inet6"); - *sa_family = AF_UNSPEC; - } else { - gf_log (this->name, GF_LOG_DEBUG, - "address-family not specified, guessing it " - "to be unix"); - *sa_family = AF_UNIX; - } - - } else { - char *address_family = data_to_str (address_family_data); - if (!strcasecmp (address_family, "unix")) { - *sa_family = AF_UNIX; - } else if (!strcasecmp (address_family, "inet")) { - *sa_family = AF_INET; - } else if (!strcasecmp (address_family, "inet6")) { - *sa_family = AF_INET6; - } else if (!strcasecmp (address_family, "inet-sdp")) { - *sa_family = AF_INET_SDP; - } else if (!strcasecmp (address_family, "inet/inet6") - || !strcasecmp (address_family, "inet6/inet")) { - *sa_family = AF_UNSPEC; - } else { - gf_log (this->name, GF_LOG_ERROR, - "unknown address-family (%s) specified", - address_family); - goto out; - } - } - - ret = 0; - -out: - return ret; -} - -static int32_t -af_inet_client_get_remote_sockaddr (rpc_transport_t *this, - struct sockaddr *sockaddr, - socklen_t *sockaddr_len) -{ - dict_t *options = this->options; - data_t *remote_host_data = NULL; - data_t *remote_port_data = NULL; - char *remote_host = NULL; - uint16_t remote_port = 0; - struct addrinfo *addr_info = NULL; - int32_t ret = 0; - - remote_host_data = dict_get (options, "remote-host"); - if (remote_host_data == NULL) - { - gf_log (this->name, GF_LOG_ERROR, - "option remote-host missing in volume %s", this->name); - ret = -1; - goto err; - } - - remote_host = data_to_str (remote_host_data); - if (remote_host == NULL) - { - gf_log (this->name, GF_LOG_ERROR, - "option remote-host has data NULL in volume %s", this->name); - ret = -1; - goto err; - } - - remote_port_data = dict_get (options, "remote-port"); - if (remote_port_data == NULL) - { - gf_log (this->name, GF_LOG_TRACE, - "option remote-port missing in volume %s. Defaulting to %d", - this->name, GF_DEFAULT_SOCKET_LISTEN_PORT); - - remote_port = GF_DEFAULT_SOCKET_LISTEN_PORT; - } - else - { - remote_port = data_to_uint16 (remote_port_data); - } - - if (remote_port == (uint16_t)-1) - { - gf_log (this->name, GF_LOG_ERROR, - "option remote-port has invalid port in volume %s", - this->name); - ret = -1; - goto err; - } - - /* TODO: gf_resolve is a blocking call. kick in some - non blocking dns techniques */ - ret = gf_resolve_ip6 (remote_host, remote_port, - sockaddr->sa_family, &this->dnscache, &addr_info); - if (ret == -1) { - gf_log (this->name, GF_LOG_ERROR, - "DNS resolution failed on host %s", remote_host); - goto err; - } - - memcpy (sockaddr, addr_info->ai_addr, addr_info->ai_addrlen); - *sockaddr_len = addr_info->ai_addrlen; - -err: - return ret; -} - -static int32_t -af_unix_client_get_remote_sockaddr (rpc_transport_t *this, - struct sockaddr *sockaddr, - socklen_t *sockaddr_len) -{ - struct sockaddr_un *sockaddr_un = NULL; - char *connect_path = NULL; - data_t *connect_path_data = NULL; - int32_t ret = 0; - - connect_path_data = dict_get (this->options, - "transport.socket.connect-path"); - if (!connect_path_data) { - gf_log (this->name, GF_LOG_ERROR, - "option transport.unix.connect-path not specified for " - "address-family unix"); - ret = -1; - goto err; - } - - connect_path = data_to_str (connect_path_data); - if (!connect_path) { - gf_log (this->name, GF_LOG_ERROR, - "transport.unix.connect-path is null-string"); - ret = -1; - goto err; - } - - if (strlen (connect_path) > UNIX_PATH_MAX) { - gf_log (this->name, GF_LOG_ERROR, - "connect-path value length %"GF_PRI_SIZET" > %d octets", - strlen (connect_path), UNIX_PATH_MAX); - ret = -1; - goto err; - } - - gf_log (this->name, GF_LOG_TRACE, - "using connect-path %s", connect_path); - sockaddr_un = (struct sockaddr_un *)sockaddr; - strcpy (sockaddr_un->sun_path, connect_path); - *sockaddr_len = sizeof (struct sockaddr_un); - -err: - return ret; -} - -static int32_t -af_unix_server_get_local_sockaddr (rpc_transport_t *this, - struct sockaddr *addr, - socklen_t *addr_len) -{ - data_t *listen_path_data = NULL; - char *listen_path = NULL; - int32_t ret = 0; - struct sockaddr_un *sunaddr = (struct sockaddr_un *)addr; - - - listen_path_data = dict_get (this->options, - "transport.socket.listen-path"); - if (!listen_path_data) { - gf_log (this->name, GF_LOG_ERROR, - "missing option transport.socket.listen-path"); - ret = -1; - goto err; - } - - listen_path = data_to_str (listen_path_data); - -#ifndef UNIX_PATH_MAX -#define UNIX_PATH_MAX 108 -#endif - - if (strlen (listen_path) > UNIX_PATH_MAX) { - gf_log (this->name, GF_LOG_ERROR, - "option transport.unix.listen-path has value length " - "%"GF_PRI_SIZET" > %d", - strlen (listen_path), UNIX_PATH_MAX); - ret = -1; - goto err; - } - - sunaddr->sun_family = AF_UNIX; - strcpy (sunaddr->sun_path, listen_path); - *addr_len = sizeof (struct sockaddr_un); - -err: - return ret; -} - -static int32_t -af_inet_server_get_local_sockaddr (rpc_transport_t *this, - struct sockaddr *addr, - socklen_t *addr_len) -{ - struct addrinfo hints, *res = 0; - data_t *listen_port_data = NULL, *listen_host_data = NULL; - uint16_t listen_port = -1; - char service[NI_MAXSERV], *listen_host = NULL; - dict_t *options = NULL; - int32_t ret = 0; - - options = this->options; - - listen_port_data = dict_get (options, "transport.socket.listen-port"); - listen_host_data = dict_get (options, "transport.socket.bind-address"); - - if (listen_port_data) - { - listen_port = data_to_uint16 (listen_port_data); - } - - if (listen_port == (uint16_t) -1) - listen_port = GF_DEFAULT_SOCKET_LISTEN_PORT; - - - if (listen_host_data) - { - listen_host = data_to_str (listen_host_data); - } else { - if (addr->sa_family == AF_INET6) { - struct sockaddr_in6 *in = (struct sockaddr_in6 *) addr; - in->sin6_addr = in6addr_any; - in->sin6_port = htons(listen_port); - *addr_len = sizeof(struct sockaddr_in6); - goto out; - } else if (addr->sa_family == AF_INET) { - struct sockaddr_in *in = (struct sockaddr_in *) addr; - in->sin_addr.s_addr = htonl(INADDR_ANY); - in->sin_port = htons(listen_port); - *addr_len = sizeof(struct sockaddr_in); - goto out; - } - } - - memset (service, 0, sizeof (service)); - sprintf (service, "%d", listen_port); - - memset (&hints, 0, sizeof (hints)); - hints.ai_family = addr->sa_family; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_ADDRCONFIG | AI_PASSIVE; - - ret = getaddrinfo(listen_host, service, &hints, &res); - if (ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "getaddrinfo failed for host %s, service %s (%s)", - listen_host, service, gai_strerror (ret)); - ret = -1; - goto out; - } - - memcpy (addr, res->ai_addr, res->ai_addrlen); - *addr_len = res->ai_addrlen; - - freeaddrinfo (res); - -out: - return ret; -} - -int32_t -client_bind (rpc_transport_t *this, - struct sockaddr *sockaddr, - socklen_t *sockaddr_len, - int sock) -{ - int ret = 0; - - *sockaddr_len = sizeof (struct sockaddr_in6); - switch (sockaddr->sa_family) - { - case AF_INET_SDP: - case AF_INET: - *sockaddr_len = sizeof (struct sockaddr_in); - - case AF_INET6: - ret = af_inet_bind_to_port_lt_ceiling (sock, sockaddr, - *sockaddr_len, CLIENT_PORT_CEILING); - if (ret == -1) { - gf_log (this->name, GF_LOG_WARNING, - "cannot bind inet socket (%d) to port less than %d (%s)", - sock, CLIENT_PORT_CEILING, strerror (errno)); - ret = 0; - } - break; - - case AF_UNIX: - *sockaddr_len = sizeof (struct sockaddr_un); - ret = af_unix_client_bind (this, (struct sockaddr *)sockaddr, - *sockaddr_len, sock); - break; - - default: - gf_log (this->name, GF_LOG_ERROR, - "unknown address family %d", sockaddr->sa_family); - ret = -1; - break; - } - - return ret; -} - -int32_t -socket_client_get_remote_sockaddr (rpc_transport_t *this, - struct sockaddr *sockaddr, - socklen_t *sockaddr_len, - sa_family_t *sa_family) -{ - int32_t ret = 0; - - if ((sockaddr == NULL) || (sockaddr_len == NULL) - || (sa_family == NULL)) { - ret = -1; - goto err; - } - - - ret = client_fill_address_family (this, &sockaddr->sa_family); - if (ret) { - ret = -1; - goto err; - } - - *sa_family = sockaddr->sa_family; - - switch (sockaddr->sa_family) - { - case AF_INET_SDP: - sockaddr->sa_family = AF_INET; - - case AF_INET: - case AF_INET6: - case AF_UNSPEC: - ret = af_inet_client_get_remote_sockaddr (this, sockaddr, - sockaddr_len); - break; - - case AF_UNIX: - ret = af_unix_client_get_remote_sockaddr (this, sockaddr, - sockaddr_len); - break; - - default: - gf_log (this->name, GF_LOG_ERROR, - "unknown address-family %d", sockaddr->sa_family); - ret = -1; - } - - if (*sa_family == AF_UNSPEC) { - *sa_family = sockaddr->sa_family; - } - -err: - return ret; -} - - -int32_t -server_fill_address_family (rpc_transport_t *this, sa_family_t *sa_family) -{ - data_t *address_family_data = NULL; - int32_t ret = -1; - - if (sa_family == NULL) { - goto out; - } - - address_family_data = dict_get (this->options, - "transport.address-family"); - if (address_family_data) { - char *address_family = NULL; - address_family = data_to_str (address_family_data); - - if (!strcasecmp (address_family, "inet")) { - *sa_family = AF_INET; - } else if (!strcasecmp (address_family, "inet6")) { - *sa_family = AF_INET6; - } else if (!strcasecmp (address_family, "inet-sdp")) { - *sa_family = AF_INET_SDP; - } else if (!strcasecmp (address_family, "unix")) { - *sa_family = AF_UNIX; - } else if (!strcasecmp (address_family, "inet/inet6") - || !strcasecmp (address_family, "inet6/inet")) { - *sa_family = AF_UNSPEC; - } else { - gf_log (this->name, GF_LOG_ERROR, - "unknown address family (%s) specified", address_family); - goto out; - } - } else { - gf_log (this->name, GF_LOG_DEBUG, - "option address-family not specified, defaulting to inet/inet6"); - *sa_family = AF_UNSPEC; - } - - ret = 0; -out: - return ret; -} - - -int32_t -socket_server_get_local_sockaddr (rpc_transport_t *this, struct sockaddr *addr, - socklen_t *addr_len, sa_family_t *sa_family) -{ - int32_t ret = -1; - - if ((addr == NULL) || (addr_len == NULL) || (sa_family == NULL)) { - goto err; - } - - ret = server_fill_address_family (this, &addr->sa_family); - if (ret == -1) { - goto err; - } - - *sa_family = addr->sa_family; - - switch (addr->sa_family) - { - case AF_INET_SDP: - addr->sa_family = AF_INET; - - case AF_INET: - case AF_INET6: - case AF_UNSPEC: - ret = af_inet_server_get_local_sockaddr (this, addr, addr_len); - break; - - case AF_UNIX: - ret = af_unix_server_get_local_sockaddr (this, addr, addr_len); - break; - } - - if (*sa_family == AF_UNSPEC) { - *sa_family = addr->sa_family; - } - -err: - return ret; -} - -int32_t -fill_inet6_inet_identifiers (rpc_transport_t *this, struct sockaddr_storage *addr, - int32_t addr_len, char *identifier) -{ - int32_t ret = 0, tmpaddr_len = 0; - char service[NI_MAXSERV], host[NI_MAXHOST]; - struct sockaddr_storage tmpaddr; - - memset (&tmpaddr, 0, sizeof (tmpaddr)); - tmpaddr = *addr; - tmpaddr_len = addr_len; - - if (((struct sockaddr *) &tmpaddr)->sa_family == AF_INET6) { - int32_t one_to_four, four_to_eight, twelve_to_sixteen; - int16_t eight_to_ten, ten_to_twelve; - - one_to_four = four_to_eight = twelve_to_sixteen = 0; - eight_to_ten = ten_to_twelve = 0; - - one_to_four = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[0]; - four_to_eight = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[1]; -#ifdef GF_SOLARIS_HOST_OS - eight_to_ten = S6_ADDR16(((struct sockaddr_in6 *) &tmpaddr)->sin6_addr)[4]; -#else - eight_to_ten = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr16[4]; -#endif - -#ifdef GF_SOLARIS_HOST_OS - ten_to_twelve = S6_ADDR16(((struct sockaddr_in6 *) &tmpaddr)->sin6_addr)[5]; -#else - ten_to_twelve = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr16[5]; -#endif - - twelve_to_sixteen = ((struct sockaddr_in6 *) &tmpaddr)->sin6_addr.s6_addr32[3]; - - /* ipv4 mapped ipv6 address has - bits 0-80: 0 - bits 80-96: 0xffff - bits 96-128: ipv4 address - */ - - if (one_to_four == 0 && - four_to_eight == 0 && - eight_to_ten == 0 && - ten_to_twelve == -1) { - struct sockaddr_in *in_ptr = (struct sockaddr_in *)&tmpaddr; - memset (&tmpaddr, 0, sizeof (tmpaddr)); - - in_ptr->sin_family = AF_INET; - in_ptr->sin_port = ((struct sockaddr_in6 *)addr)->sin6_port; - in_ptr->sin_addr.s_addr = twelve_to_sixteen; - tmpaddr_len = sizeof (*in_ptr); - } - } - - ret = getnameinfo ((struct sockaddr *) &tmpaddr, - tmpaddr_len, - host, sizeof (host), - service, sizeof (service), - NI_NUMERICHOST | NI_NUMERICSERV); - if (ret != 0) { - gf_log (this->name, GF_LOG_ERROR, - "getnameinfo failed (%s)", gai_strerror (ret)); - } - - sprintf (identifier, "%s:%s", host, service); - - return ret; -} - -int32_t -get_transport_identifiers (rpc_transport_t *this) -{ - int32_t ret = 0; - char is_inet_sdp = 0; - - switch (((struct sockaddr *) &this->myinfo.sockaddr)->sa_family) - { - case AF_INET_SDP: - is_inet_sdp = 1; - ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET; - - case AF_INET: - case AF_INET6: - { - ret = fill_inet6_inet_identifiers (this, - &this->myinfo.sockaddr, - this->myinfo.sockaddr_len, - this->myinfo.identifier); - if (ret == -1) { - gf_log (this->name, GF_LOG_ERROR, - "cannot fill inet/inet6 identifier for server"); - goto err; - } - - ret = fill_inet6_inet_identifiers (this, - &this->peerinfo.sockaddr, - this->peerinfo.sockaddr_len, - this->peerinfo.identifier); - if (ret == -1) { - gf_log (this->name, GF_LOG_ERROR, - "cannot fill inet/inet6 identifier for client"); - goto err; - } - - if (is_inet_sdp) { - ((struct sockaddr *) &this->peerinfo.sockaddr)->sa_family = ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family = AF_INET_SDP; - } - } - break; - - case AF_UNIX: - { - struct sockaddr_un *sunaddr = NULL; - - sunaddr = (struct sockaddr_un *) &this->myinfo.sockaddr; - strcpy (this->myinfo.identifier, sunaddr->sun_path); - - sunaddr = (struct sockaddr_un *) &this->peerinfo.sockaddr; - strcpy (this->peerinfo.identifier, sunaddr->sun_path); - } - break; - - default: - gf_log (this->name, GF_LOG_ERROR, - "unknown address family (%d)", - ((struct sockaddr *) &this->myinfo.sockaddr)->sa_family); - ret = -1; - break; - } - -err: - return ret; -} diff --git a/xlators/protocol/rpc/rpc-transport/socket/src/name.h b/xlators/protocol/rpc/rpc-transport/socket/src/name.h deleted file mode 100644 index 6a89d383b65..00000000000 --- a/xlators/protocol/rpc/rpc-transport/socket/src/name.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - Copyright (c) 2010 Gluster, Inc. - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - . -*/ - -#ifndef _SOCKET_NAME_H -#define _SOCKET_NAME_H - -#include "compat.h" - -int32_t -client_bind (rpc_transport_t *this, - struct sockaddr *sockaddr, - socklen_t *sockaddr_len, - int sock); - -int32_t -socket_client_get_remote_sockaddr (rpc_transport_t *this, - struct sockaddr *sockaddr, - socklen_t *sockaddr_len, - sa_family_t *sa_family); - -int32_t -socket_server_get_local_sockaddr (rpc_transport_t *this, struct sockaddr *addr, - socklen_t *addr_len, sa_family_t *sa_family); - -int32_t -get_transport_identifiers (rpc_transport_t *this); - -#endif /* _SOCKET_NAME_H */ diff --git a/xlators/protocol/rpc/rpc-transport/socket/src/socket.c b/xlators/protocol/rpc/rpc-transport/socket/src/socket.c deleted file mode 100644 index 4ca7121e8af..00000000000 --- a/xlators/protocol/rpc/rpc-transport/socket/src/socket.c +++ /dev/null @@ -1,2308 +0,0 @@ -/* - Copyright (c) 2010 Gluster, Inc. - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - . -*/ - - -#ifndef _CONFIG_H -#define _CONFIG_H -#include "config.h" -#endif - -#include "socket.h" -#include "name.h" -#include "dict.h" -#include "rpc-transport.h" -#include "logging.h" -#include "xlator.h" -#include "byte-order.h" -#include "common-utils.h" -#include "compat-errno.h" -#include "protocol-common.h" - -#include "glusterfs-xdr.h" -#include -#include -#include - -#define GF_LOG_ERRNO(errno) ((errno == ENOTCONN) ? GF_LOG_DEBUG : GF_LOG_ERROR) -#define SA(ptr) ((struct sockaddr *)ptr) - -#define __socket_proto_reset_pending(priv) do { \ - memset (&priv->incoming.frag.vector, 0, \ - sizeof (priv->incoming.frag.vector)); \ - priv->incoming.frag.pending_vector = \ - &priv->incoming.frag.vector; \ - priv->incoming.frag.pending_vector->iov_base = \ - priv->incoming.frag.fragcurrent; \ - priv->incoming.pending_vector = \ - priv->incoming.frag.pending_vector; \ - } while (0); - - -#define __socket_proto_update_pending(priv) \ - do { \ - uint32_t remaining_fragsize = 0; \ - if (priv->incoming.frag.pending_vector->iov_len == 0) { \ - remaining_fragsize = RPC_FRAGSIZE (priv->incoming.fraghdr) \ - - priv->incoming.frag.bytes_read; \ - \ - priv->incoming.frag.pending_vector->iov_len = \ - remaining_fragsize > priv->incoming.frag.remaining_size \ - ? priv->incoming.frag.remaining_size : remaining_fragsize; \ - \ - priv->incoming.frag.remaining_size -= \ - priv->incoming.frag.pending_vector->iov_len; \ - } \ - } while (0); - -#define __socket_proto_update_priv_after_read(priv, ret, bytes_read) \ - { \ - priv->incoming.frag.fragcurrent += bytes_read; \ - priv->incoming.frag.bytes_read += bytes_read; \ - \ - if ((ret > 0) || (priv->incoming.frag.remaining_size != 0)) { \ - if (priv->incoming.frag.remaining_size != 0) { \ - __socket_proto_reset_pending (priv); \ - } \ - \ - gf_log (this->name, GF_LOG_TRACE, "partial read on non-blocking socket"); \ - \ - break; \ - } \ - } - -#define __socket_proto_init_pending(priv, size) \ - do { \ - uint32_t remaining_fragsize = 0; \ - remaining_fragsize = RPC_FRAGSIZE (priv->incoming.fraghdr) \ - - priv->incoming.frag.bytes_read; \ - \ - __socket_proto_reset_pending (priv); \ - \ - priv->incoming.frag.pending_vector->iov_len = \ - remaining_fragsize > size ? size : remaining_fragsize; \ - \ - priv->incoming.frag.remaining_size = \ - size - priv->incoming.frag.pending_vector->iov_len; \ - \ -} while (0); - - -/* This will be used in a switch case and breaks from the switch case if all - * the pending data is not read. - */ -#define __socket_proto_read(priv, ret) \ - { \ - size_t bytes_read = 0; \ - \ - __socket_proto_update_pending (priv); \ - \ - ret = __socket_readv (this, \ - priv->incoming.pending_vector, 1, \ - &priv->incoming.pending_vector, \ - &priv->incoming.pending_count, \ - &bytes_read); \ - if (ret == -1) { \ - gf_log (this->name, GF_LOG_TRACE, \ - "reading from socket failed. Error (%s), " \ - "peer (%s)", strerror (errno), \ - this->peerinfo.identifier); \ - break; \ - } \ - __socket_proto_update_priv_after_read (priv, ret, bytes_read); \ - } - - -int socket_init (rpc_transport_t *this); - -/* - * return value: - * 0 = success (completed) - * -1 = error - * > 0 = incomplete - */ - -int -__socket_rwv (rpc_transport_t *this, struct iovec *vector, int count, - struct iovec **pending_vector, int *pending_count, size_t *bytes, - int write) -{ - socket_private_t *priv = NULL; - int sock = -1; - int ret = -1; - struct iovec *opvector = NULL; - int opcount = 0; - int moved = 0; - - priv = this->private; - sock = priv->sock; - - opvector = vector; - opcount = count; - - if (bytes != NULL) { - *bytes = 0; - } - - while (opcount) { - if (write) { - ret = writev (sock, opvector, opcount); - - if (ret == 0 || (ret == -1 && errno == EAGAIN)) { - /* done for now */ - break; - } - } else { - ret = readv (sock, opvector, opcount); - if (ret == -1 && errno == EAGAIN) { - /* done for now */ - break; - } - } - - if (ret == 0) { - /* Mostly due to 'umount' in client */ - - gf_log (this->name, GF_LOG_TRACE, - "EOF from peer %s", this->peerinfo.identifier); - opcount = -1; - errno = ENOTCONN; - break; - } - if (ret == -1) { - if (errno == EINTR) - continue; - - gf_log (this->name, GF_LOG_TRACE, - "%s failed (%s)", write ? "writev" : "readv", - strerror (errno)); - opcount = -1; - break; - } - - if (bytes != NULL) { - *bytes += ret; - } - - moved = 0; - - while (moved < ret) { - if ((ret - moved) >= opvector[0].iov_len) { - moved += opvector[0].iov_len; - opvector++; - opcount--; - } else { - opvector[0].iov_len -= (ret - moved); - opvector[0].iov_base += (ret - moved); - moved += (ret - moved); - } - while (opcount && !opvector[0].iov_len) { - opvector++; - opcount--; - } - } - } - - if (pending_vector) - *pending_vector = opvector; - - if (pending_count) - *pending_count = opcount; - - return opcount; -} - - -int -__socket_readv (rpc_transport_t *this, struct iovec *vector, int count, - struct iovec **pending_vector, int *pending_count, - size_t *bytes) -{ - int ret = -1; - - ret = __socket_rwv (this, vector, count, - pending_vector, pending_count, bytes, 0); - - return ret; -} - - -int -__socket_writev (rpc_transport_t *this, struct iovec *vector, int count, - struct iovec **pending_vector, int *pending_count) -{ - int ret = -1; - - ret = __socket_rwv (this, vector, count, - pending_vector, pending_count, NULL, 1); - - return ret; -} - - -int -__socket_disconnect (rpc_transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = -1; - - priv = this->private; - - if (priv->sock != -1) { - ret = shutdown (priv->sock, SHUT_RDWR); - priv->connected = -1; - gf_log (this->name, GF_LOG_TRACE, - "shutdown() returned %d. set connection state to -1", - ret); - } - - return ret; -} - - -int -__socket_server_bind (rpc_transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = -1; - int opt = 1; - - priv = this->private; - - ret = setsockopt (priv->sock, SOL_SOCKET, SO_REUSEADDR, - &opt, sizeof (opt)); - - if (ret == -1) { - gf_log (this->name, GF_LOG_ERROR, - "setsockopt() for SO_REUSEADDR failed (%s)", - strerror (errno)); - } - - ret = bind (priv->sock, (struct sockaddr *)&this->myinfo.sockaddr, - this->myinfo.sockaddr_len); - - if (ret == -1) { - gf_log (this->name, GF_LOG_ERROR, - "binding to %s failed: %s", - this->myinfo.identifier, strerror (errno)); - if (errno == EADDRINUSE) { - gf_log (this->name, GF_LOG_ERROR, - "Port is already in use"); - } - } - - return ret; -} - - -int -__socket_nonblock (int fd) -{ - int flags = 0; - int ret = -1; - - flags = fcntl (fd, F_GETFL); - - if (flags != -1) - ret = fcntl (fd, F_SETFL, flags | O_NONBLOCK); - - return ret; -} - - -int -__socket_nodelay (int fd) -{ - int on = 1; - int ret = -1; - - ret = setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, - &on, sizeof (on)); - if (!ret) - gf_log ("", GF_LOG_TRACE, - "NODELAY enabled for socket %d", fd); - - return ret; -} - -int -__socket_connect_finish (int fd) -{ - int ret = -1; - int optval = 0; - socklen_t optlen = sizeof (int); - - ret = getsockopt (fd, SOL_SOCKET, SO_ERROR, (void *)&optval, &optlen); - - if (ret == 0 && optval) { - errno = optval; - ret = -1; - } - - return ret; -} - - -void -__socket_reset (rpc_transport_t *this) -{ - socket_private_t *priv = NULL; - - priv = this->private; - - /* TODO: use mem-pool on incoming data */ - - if (priv->incoming.iobuf) { - iobuf_unref (priv->incoming.iobuf); - } - - if (priv->incoming.vectoriob) { - iobuf_unref (priv->incoming.vectoriob); - } - - memset (&priv->incoming, 0, sizeof (priv->incoming)); - - event_unregister (this->ctx->event_pool, priv->sock, priv->idx); - - close (priv->sock); - priv->sock = -1; - priv->idx = -1; - priv->connected = -1; -} - - -struct ioq * -__socket_ioq_new (rpc_transport_t *this, rpc_transport_msg_t *msg) -{ - socket_private_t *priv = NULL; - struct ioq *entry = NULL; - int count = 0; - - priv = this->private; - - /* TODO: use mem-pool */ - entry = GF_CALLOC (1, sizeof (*entry), gf_common_mt_ioq); - if (!entry) - return NULL; - - count = msg->rpchdrcount + msg->proghdrcount + msg->progpayloadcount; - - assert (count <= MAX_IOVEC); - - if (msg->rpchdr != NULL) { - memcpy (&entry->vector[0], msg->rpchdr, - sizeof (struct iovec) * msg->rpchdrcount); - entry->count += msg->rpchdrcount; - } - - if (msg->proghdr != NULL) { - memcpy (&entry->vector[entry->count], msg->proghdr, - sizeof (struct iovec) * msg->proghdrcount); - entry->count += msg->proghdrcount; - } - - if (msg->progpayload != NULL) { - memcpy (&entry->vector[entry->count], msg->progpayload, - sizeof (struct iovec) * msg->progpayloadcount); - entry->count += msg->progpayloadcount; - } - - entry->pending_vector = entry->vector; - entry->pending_count = entry->count; - - if (msg->iobref != NULL) - entry->iobref = iobref_ref (msg->iobref); - - INIT_LIST_HEAD (&entry->list); - - return entry; -} - - -void -__socket_ioq_entry_free (struct ioq *entry) -{ - list_del_init (&entry->list); - if (entry->iobref) - iobref_unref (entry->iobref); - - /* TODO: use mem-pool */ - GF_FREE (entry); -} - - -void -__socket_ioq_flush (rpc_transport_t *this) -{ - socket_private_t *priv = NULL; - struct ioq *entry = NULL; - - priv = this->private; - - while (!list_empty (&priv->ioq)) { - entry = priv->ioq_next; - __socket_ioq_entry_free (entry); - } - - return; -} - - -int -__socket_ioq_churn_entry (rpc_transport_t *this, struct ioq *entry) -{ - int ret = -1; - - ret = __socket_writev (this, entry->pending_vector, - entry->pending_count, - &entry->pending_vector, - &entry->pending_count); - - if (ret == 0) { - /* current entry was completely written */ - assert (entry->pending_count == 0); - __socket_ioq_entry_free (entry); - } - - return ret; -} - - -int -__socket_ioq_churn (rpc_transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = 0; - struct ioq *entry = NULL; - - priv = this->private; - - while (!list_empty (&priv->ioq)) { - /* pick next entry */ - entry = priv->ioq_next; - - ret = __socket_ioq_churn_entry (this, entry); - - if (ret != 0) - break; - } - - if (list_empty (&priv->ioq)) { - /* all pending writes done, not interested in POLLOUT */ - priv->idx = event_select_on (this->ctx->event_pool, - priv->sock, priv->idx, -1, 0); - } - - return ret; -} - - -int -socket_event_poll_err (rpc_transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = -1; - - priv = this->private; - - pthread_mutex_lock (&priv->lock); - { - __socket_ioq_flush (this); - __socket_reset (this); - } - pthread_mutex_unlock (&priv->lock); - - rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this); - - return ret; -} - - -int -socket_event_poll_out (rpc_transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = -1; - - priv = this->private; - - pthread_mutex_lock (&priv->lock); - { - if (priv->connected == 1) { - ret = __socket_ioq_churn (this); - - if (ret == -1) { - __socket_disconnect (this); - } - } - } - pthread_mutex_unlock (&priv->lock); - - ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_SENT, NULL); - - return ret; -} - - -inline int -__socket_read_simple_msg (rpc_transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = 0; - uint32_t remaining_size = 0; - size_t bytes_read = 0; - - priv = this->private; - - switch (priv->incoming.frag.simple_state) { - - case SP_STATE_SIMPLE_MSG_INIT: - remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) - - priv->incoming.frag.bytes_read; - - __socket_proto_init_pending (priv, remaining_size); - - priv->incoming.frag.simple_state = - SP_STATE_READING_SIMPLE_MSG; - - /* fall through */ - - case SP_STATE_READING_SIMPLE_MSG: - ret = 0; - - remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) - - priv->incoming.frag.bytes_read; - - if (remaining_size > 0) { - ret = __socket_readv (this, - priv->incoming.pending_vector, 1, - &priv->incoming.pending_vector, - &priv->incoming.pending_count, - &bytes_read); - } - - if (ret == -1) { - gf_log (this->name, GF_LOG_TRACE, - "reading from socket failed. Error (%s), " - "peer (%s)", strerror (errno), - this->peerinfo.identifier); - break; - } - - priv->incoming.frag.bytes_read += bytes_read; - priv->incoming.frag.fragcurrent += bytes_read; - - if (ret > 0) { - gf_log (this->name, GF_LOG_TRACE, - "partial read on non-blocking socket."); - break; - } - - if (ret == 0) { - priv->incoming.frag.simple_state - = SP_STATE_SIMPLE_MSG_INIT; - } - } - - return ret; -} - - -inline int -__socket_read_simple_request (rpc_transport_t *this) -{ - return __socket_read_simple_msg (this); -} - - -#define rpc_cred_addr(buf) (buf + RPC_MSGTYPE_SIZE + RPC_CALL_BODY_SIZE - 4) - -#define rpc_verf_addr(fragcurrent) (fragcurrent - 4) - - -inline int -__socket_read_vectored_request (rpc_transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = 0; - uint32_t credlen = 0, verflen = 0; - char *addr = NULL; - struct iobuf *iobuf = NULL; - uint32_t remaining_size = 0; - uint32_t gluster_write_proc_len = 0; - - priv = this->private; - - switch (priv->incoming.frag.call_body.request.vector_state) { - case SP_STATE_VECTORED_REQUEST_INIT: - addr = rpc_cred_addr (iobuf_ptr (priv->incoming.iobuf)); - - /* also read verf flavour and verflen */ - credlen = ntoh32 (*((uint32_t *)addr)) - + RPC_AUTH_FLAVOUR_N_LENGTH_SIZE; - - __socket_proto_init_pending (priv, credlen); - - priv->incoming.frag.call_body.request.vector_state = - SP_STATE_READING_CREDBYTES; - - /* fall through */ - - case SP_STATE_READING_CREDBYTES: - __socket_proto_read (priv, ret); - - priv->incoming.frag.call_body.request.vector_state = - SP_STATE_READ_CREDBYTES; - - /* fall through */ - - case SP_STATE_READ_CREDBYTES: - addr = rpc_verf_addr (priv->incoming.frag.fragcurrent); - - /* FIXME: Also handle procedures other than glusterfs-write - * here - */ - /* also read proc-header */ - gluster_write_proc_len = sizeof (gfs3_write_req); - - verflen = ntoh32 (*((uint32_t *)addr)) - + gluster_write_proc_len; - - __socket_proto_init_pending (priv, verflen); - - priv->incoming.frag.call_body.request.vector_state - = SP_STATE_READING_VERFBYTES; - - /* fall through */ - - case SP_STATE_READING_VERFBYTES: - __socket_proto_read (priv, ret); - - priv->incoming.frag.call_body.request.vector_state = - SP_STATE_READ_VERFBYTES; - - /* fall through */ - - case SP_STATE_READ_VERFBYTES: - if (priv->incoming.vectoriob == NULL) { - iobuf = iobuf_get (this->ctx->iobuf_pool); - if (!iobuf) { - gf_log (this->name, GF_LOG_ERROR, - "unable to allocate IO buffer " - "for peer %s", - this->peerinfo.identifier); - ret = -1; - break; - } - - priv->incoming.vectoriob = iobuf; - priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf); - } - - priv->incoming.frag.call_body.request.vector_state = - SP_STATE_READING_PROG; - - /* fall through */ - - case SP_STATE_READING_PROG: - /* now read the remaining rpc msg into buffer pointed by - * fragcurrent - */ - - ret = __socket_read_simple_msg (this); - - remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) - - priv->incoming.frag.bytes_read; - - if ((ret == -1) - || ((ret == 0) - && (remaining_size == 0) - && RPC_LASTFRAG (priv->incoming.fraghdr))) { - priv->incoming.frag.call_body.request.vector_state - = SP_STATE_VECTORED_REQUEST_INIT; - priv->incoming.vectoriob_size - = (unsigned long)priv->incoming.frag.fragcurrent - - (unsigned long)iobuf_ptr (priv->incoming.vectoriob); - } - break; - } - - return ret; -} - - -#define rpc_msgtype_addr(buf) (buf + 4) - -#define rpc_prognum_addr(buf) (buf + RPC_MSGTYPE_SIZE + 4) - -#define rpc_procnum_addr(buf) (buf + RPC_MSGTYPE_SIZE + 12) - - -inline int -__socket_read_request (rpc_transport_t *this) -{ - socket_private_t *priv = NULL; - uint32_t prognum = 0, procnum = 0; - uint32_t remaining_size = 0; - int ret = -1; - char *buf = NULL; - - priv = this->private; - - switch (priv->incoming.frag.call_body.request.header_state) { - - case SP_STATE_REQUEST_HEADER_INIT: - - __socket_proto_init_pending (priv, RPC_CALL_BODY_SIZE); - - priv->incoming.frag.call_body.request.header_state - = SP_STATE_READING_RPCHDR1; - - /* fall through */ - - case SP_STATE_READING_RPCHDR1: - __socket_proto_read (priv, ret); - - priv->incoming.frag.call_body.request.header_state = - SP_STATE_READ_RPCHDR1; - - /* fall through */ - - case SP_STATE_READ_RPCHDR1: - buf = rpc_prognum_addr (iobuf_ptr (priv->incoming.iobuf)); - prognum = ntoh32 (*((uint32_t *)buf)); - - buf = rpc_procnum_addr (iobuf_ptr (priv->incoming.iobuf)); - procnum = ntoh32 (*((uint32_t *)buf)); - - if ((prognum == GLUSTER3_1_FOP_PROGRAM) - && (procnum == GF_FOP_WRITE)) { - ret = __socket_read_vectored_request (this); - } else { - ret = __socket_read_simple_request (this); - } - - remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) - - priv->incoming.frag.bytes_read; - - if ((ret == -1) - || ((ret == 0) - && (remaining_size == 0) - && (RPC_LASTFRAG (priv->incoming.fraghdr)))) { - priv->incoming.frag.call_body.request.header_state = - SP_STATE_REQUEST_HEADER_INIT; - } - - break; - } - - return ret; -} - - -inline int -__socket_read_accepted_successful_reply (rpc_transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = 0; - struct iobuf *iobuf = NULL; - uint32_t gluster_read_rsp_hdr_len = 0; - - priv = this->private; - - switch (priv->incoming.frag.call_body.reply.accepted_success_state) { - - case SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT: - gluster_read_rsp_hdr_len = sizeof (gfs3_read_rsp); - - __socket_proto_init_pending (priv, gluster_read_rsp_hdr_len); - - priv->incoming.frag.call_body.reply.accepted_success_state - = SP_STATE_READING_PROC_HEADER; - - /* fall through */ - - case SP_STATE_READING_PROC_HEADER: - __socket_proto_read (priv, ret); - - priv->incoming.frag.call_body.reply.accepted_success_state - = SP_STATE_READ_PROC_HEADER; - - /* fall through */ - - case SP_STATE_READ_PROC_HEADER: - if (priv->incoming.vectoriob == NULL) { - iobuf = iobuf_get (this->ctx->iobuf_pool); - if (iobuf == NULL) { - ret = -1; - goto out; - } - - priv->incoming.vectoriob = iobuf; - } - - priv->incoming.frag.fragcurrent - = iobuf_ptr (priv->incoming.vectoriob); - - /* now read the entire remaining msg into new iobuf */ - ret = __socket_read_simple_msg (this); - if ((ret == -1) - || ((ret == 0) - && RPC_LASTFRAG (priv->incoming.fraghdr))) { - priv->incoming.frag.call_body.reply.accepted_success_state - = SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT; - } - - break; - } - -out: - return ret; -} - -#define rpc_reply_verflen_addr(fragcurrent) ((char *)fragcurrent - 4) -#define rpc_reply_accept_status_addr(fragcurrent) ((char *)fragcurrent - 4) - -inline int -__socket_read_accepted_reply (rpc_transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = -1; - char *buf = NULL; - uint32_t verflen = 0, len = 0; - uint32_t remaining_size = 0; - - priv = this->private; - - switch (priv->incoming.frag.call_body.reply.accepted_state) { - - case SP_STATE_ACCEPTED_REPLY_INIT: - __socket_proto_init_pending (priv, - RPC_AUTH_FLAVOUR_N_LENGTH_SIZE); - - priv->incoming.frag.call_body.reply.accepted_state - = SP_STATE_READING_REPLY_VERFLEN; - - /* fall through */ - - case SP_STATE_READING_REPLY_VERFLEN: - __socket_proto_read (priv, ret); - - priv->incoming.frag.call_body.reply.accepted_state - = SP_STATE_READ_REPLY_VERFLEN; - - /* fall through */ - - case SP_STATE_READ_REPLY_VERFLEN: - buf = rpc_reply_verflen_addr (priv->incoming.frag.fragcurrent); - - verflen = ntoh32 (*((uint32_t *) buf)); - - /* also read accept status along with verf data */ - len = verflen + RPC_ACCEPT_STATUS_LEN; - - __socket_proto_init_pending (priv, len); - - priv->incoming.frag.call_body.reply.accepted_state - = SP_STATE_READING_REPLY_VERFBYTES; - - /* fall through */ - - case SP_STATE_READING_REPLY_VERFBYTES: - __socket_proto_read (priv, ret); - - priv->incoming.frag.call_body.reply.accepted_state - = SP_STATE_READ_REPLY_VERFBYTES; - - buf = rpc_reply_accept_status_addr (priv->incoming.frag.fragcurrent); - - priv->incoming.frag.call_body.reply.accept_status - = ntoh32 (*(uint32_t *) buf); - - /* fall through */ - - case SP_STATE_READ_REPLY_VERFBYTES: - - if (priv->incoming.frag.call_body.reply.accept_status - == SUCCESS) { - ret = __socket_read_accepted_successful_reply (this); - } else { - /* read entire remaining msg into buffer pointed to by - * fragcurrent - */ - ret = __socket_read_simple_msg (this); - } - - remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) - - priv->incoming.frag.bytes_read; - - if ((ret == -1) - || ((ret == 0) - && (remaining_size == 0) - && (RPC_LASTFRAG (priv->incoming.fraghdr)))) { - priv->incoming.frag.call_body.reply.accepted_state - = SP_STATE_ACCEPTED_REPLY_INIT; - } - - break; - } - - return ret; -} - - -inline int -__socket_read_denied_reply (rpc_transport_t *this) -{ - return __socket_read_simple_msg (this); -} - - -#define rpc_reply_status_addr(fragcurrent) ((char *)fragcurrent - 4) - - -inline int -__socket_read_vectored_reply (rpc_transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = 0; - char *buf = NULL; - uint32_t remaining_size = 0; - - priv = this->private; - - switch (priv->incoming.frag.call_body.reply.status_state) { - - case SP_STATE_ACCEPTED_REPLY_INIT: - __socket_proto_init_pending (priv, RPC_REPLY_STATUS_SIZE); - - priv->incoming.frag.call_body.reply.status_state - = SP_STATE_READING_REPLY_STATUS; - - /* fall through */ - - case SP_STATE_READING_REPLY_STATUS: - __socket_proto_read (priv, ret); - - buf = rpc_reply_status_addr (priv->incoming.frag.fragcurrent); - - priv->incoming.frag.call_body.reply.accept_status - = ntoh32 (*((uint32_t *) buf)); - - priv->incoming.frag.call_body.reply.status_state - = SP_STATE_READ_REPLY_STATUS; - - /* fall through */ - - case SP_STATE_READ_REPLY_STATUS: - if (priv->incoming.frag.call_body.reply.accept_status - == MSG_ACCEPTED) { - ret = __socket_read_accepted_reply (this); - } else { - ret = __socket_read_denied_reply (this); - } - - remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) - - priv->incoming.frag.bytes_read; - - if ((ret == -1) - || ((ret == 0) - && (remaining_size == 0) - && (RPC_LASTFRAG (priv->incoming.fraghdr)))) { - priv->incoming.frag.call_body.reply.status_state - = SP_STATE_ACCEPTED_REPLY_INIT; - } - break; - } - - return ret; -} - - -inline int -__socket_read_simple_reply (rpc_transport_t *this) -{ - return __socket_read_simple_msg (this); -} - -#define rpc_xid_addr(buf) (buf) - -inline int -__socket_read_reply (rpc_transport_t *this) -{ - socket_private_t *priv = NULL; - char *buf = NULL; - int32_t ret = -1; - rpc_request_info_t *request_info = NULL; - - priv = this->private; - - buf = rpc_xid_addr (iobuf_ptr (priv->incoming.iobuf)); - - request_info = GF_CALLOC (1, sizeof (*request_info), gf_common_mt_rpc_trans_reqinfo_t); - if (request_info == NULL) { - gf_log (this->name, GF_LOG_ERROR, "out of memory"); - goto out; - } - - priv->incoming.request_info = request_info; - - request_info->xid = ntoh32 (*((uint32_t *) buf)); - - /* release priv->lock, so as to avoid deadlock b/w conn->lock and - * priv->lock, since we are doing an upcall here. - */ - pthread_mutex_unlock (&priv->lock); - { - ret = rpc_transport_notify (this, RPC_TRANSPORT_MAP_XID_REQUEST, - priv->incoming.request_info); - } - pthread_mutex_lock (&priv->lock); - - if (ret == -1) { - goto out; - } - - if ((request_info->prognum == GLUSTER3_1_FOP_PROGRAM) - && (request_info->procnum == GF_FOP_READ)) { - if (request_info->rsp.rspbuf != NULL) { - priv->incoming.vectoriob - = iobuf_ref (request_info->rsp.rspbuf); - } - - ret = __socket_read_vectored_reply (this); - } else { - ret = __socket_read_simple_reply (this); - } -out: - return ret; -} - - -/* returns the number of bytes yet to be read in a fragment */ -inline int -__socket_read_frag (rpc_transport_t *this) -{ - socket_private_t *priv = NULL; - int32_t ret = 0; - char *buf = NULL; - uint32_t remaining_size = 0; - - priv = this->private; - - switch (priv->incoming.frag.state) { - case SP_STATE_NADA: - __socket_proto_init_pending (priv, RPC_MSGTYPE_SIZE); - - priv->incoming.frag.state = SP_STATE_READING_MSGTYPE; - - /* fall through */ - - case SP_STATE_READING_MSGTYPE: - __socket_proto_read (priv, ret); - - priv->incoming.frag.state = SP_STATE_READ_MSGTYPE; - /* fall through */ - - case SP_STATE_READ_MSGTYPE: - buf = rpc_msgtype_addr (iobuf_ptr (priv->incoming.iobuf)); - priv->incoming.msg_type = ntoh32 (*((uint32_t *)buf)); - - if (priv->incoming.msg_type == CALL) { - ret = __socket_read_request (this); - } else if (priv->incoming.msg_type == REPLY) { - ret = __socket_read_reply (this); - } else { - gf_log ("rpc", GF_LOG_ERROR, - "wrong MSG-TYPE (%d) received", - priv->incoming.msg_type); - ret = -1; - } - - remaining_size = RPC_FRAGSIZE (priv->incoming.fraghdr) - - priv->incoming.frag.bytes_read; - - if ((ret == -1) - || ((ret == 0) - && (remaining_size == 0) - && (RPC_LASTFRAG (priv->incoming.fraghdr)))) { - priv->incoming.frag.state = SP_STATE_NADA; - } - - break; - } - - return ret; -} - - -inline -void __socket_reset_priv (socket_private_t *priv) -{ - if (priv->incoming.iobuf) { - iobuf_unref (priv->incoming.iobuf); - priv->incoming.iobuf = NULL; - } - - if (priv->incoming.vectoriob) { - iobuf_unref (priv->incoming.vectoriob); - priv->incoming.vectoriob = NULL; - } -} - - -int -__socket_proto_state_machine (rpc_transport_t *this, - rpc_transport_pollin_t **pollin) -{ - int ret = -1; - socket_private_t *priv = NULL; - struct iobuf *iobuf = NULL; - - priv = this->private; - while (priv->incoming.record_state != SP_STATE_COMPLETE) { - switch (priv->incoming.record_state) { - - case SP_STATE_NADA: - iobuf = iobuf_get (this->ctx->iobuf_pool); - if (!iobuf) { - gf_log (this->name, GF_LOG_ERROR, - "unable to allocate IO buffer " - "for peer %s", - this->peerinfo.identifier); - ret = -ENOMEM; - goto out; - } - - priv->incoming.iobuf = iobuf; - priv->incoming.iobuf_size = 0; - priv->incoming.vectoriob_size = 0; - - priv->incoming.pending_vector = priv->incoming.vector; - priv->incoming.pending_vector->iov_base = - &priv->incoming.fraghdr; - - priv->incoming.frag.fragcurrent = iobuf_ptr (iobuf); - priv->incoming.pending_vector->iov_len = - sizeof (priv->incoming.fraghdr); - - priv->incoming.record_state = SP_STATE_READING_FRAGHDR; - - /* fall through */ - - case SP_STATE_READING_FRAGHDR: - ret = __socket_readv (this, - priv->incoming.pending_vector, 1, - &priv->incoming.pending_vector, - &priv->incoming.pending_count, - NULL); - if (ret == -1) { - gf_log (this->name, GF_LOG_TRACE, - "reading from socket failed. Error (%s), " - "peer (%s)", strerror (errno), - this->peerinfo.identifier); - goto out; - } - - if (ret > 0) { - gf_log (this->name, GF_LOG_TRACE, "partial " - "fragment header read"); - goto out; - } - - if (ret == 0) { - priv->incoming.record_state = - SP_STATE_READ_FRAGHDR; - } - /* fall through */ - - case SP_STATE_READ_FRAGHDR: - - priv->incoming.fraghdr = ntoh32 (priv->incoming.fraghdr); - priv->incoming.record_state = SP_STATE_READING_FRAG; - priv->incoming.total_bytes_read - += RPC_FRAGSIZE(priv->incoming.fraghdr); - /* fall through */ - - case SP_STATE_READING_FRAG: - ret = __socket_read_frag (this); - - if ((ret == -1) - || (priv->incoming.frag.bytes_read != - RPC_FRAGSIZE (priv->incoming.fraghdr))) { - goto out; - } - - priv->incoming.frag.bytes_read = 0; - - if (!RPC_LASTFRAG (priv->incoming.fraghdr)) { - priv->incoming.record_state = - SP_STATE_READING_FRAGHDR; - break; - } - - /* we've read the entire rpc record, notify the - * upper layers. - */ - if (pollin != NULL) { - priv->incoming.iobuf_size - = priv->incoming.total_bytes_read - - priv->incoming.vectoriob_size; - - *pollin = rpc_transport_pollin_alloc (this, - priv->incoming.iobuf, - priv->incoming.iobuf_size, - priv->incoming.vectoriob, - priv->incoming.vectoriob_size, - priv->incoming.request_info); - if (*pollin == NULL) { - ret = -1; - goto out; - } - - priv->incoming.request_info = NULL; - } - priv->incoming.record_state = SP_STATE_COMPLETE; - break; - - case SP_STATE_COMPLETE: - /* control should not reach here */ - gf_log (this->name, GF_LOG_DEBUG, "control reached to " - "SP_STATE_COMPLETE, which should not have " - "happened"); - break; - } - } - - if (priv->incoming.record_state == SP_STATE_COMPLETE) { - priv->incoming.record_state = SP_STATE_NADA; - __socket_reset_priv (priv); - } - -out: - if ((ret == -1) && (errno == EAGAIN)) { - ret = 0; - } - return ret; -} - - -int -socket_proto_state_machine (rpc_transport_t *this, - rpc_transport_pollin_t **pollin) -{ - socket_private_t *priv = NULL; - int ret = 0; - - priv = this->private; - - pthread_mutex_lock (&priv->lock); - { - ret = __socket_proto_state_machine (this, pollin); - } - pthread_mutex_unlock (&priv->lock); - - return ret; -} - - -int -socket_event_poll_in (rpc_transport_t *this) -{ - int ret = -1; - rpc_transport_pollin_t *pollin = NULL; - - ret = socket_proto_state_machine (this, &pollin); - - if (pollin != NULL) { - ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED, - pollin); - - rpc_transport_pollin_destroy (pollin); - } - - return ret; -} - - -int -socket_connect_finish (rpc_transport_t *this) -{ - int ret = -1; - socket_private_t *priv = NULL; - rpc_transport_event_t event = 0; - char notify_rpc = 0; - - priv = this->private; - - pthread_mutex_lock (&priv->lock); - { - if (priv->connected) - goto unlock; - - ret = __socket_connect_finish (priv->sock); - - if (ret == -1 && errno == EINPROGRESS) - ret = 1; - - if (ret == -1 && errno != EINPROGRESS) { - if (!priv->connect_finish_log) { - gf_log (this->name, GF_LOG_ERROR, - "connection to %s failed (%s)", - this->peerinfo.identifier, - strerror (errno)); - priv->connect_finish_log = 1; - } - __socket_disconnect (this); - notify_rpc = 1; - event = RPC_TRANSPORT_DISCONNECT; - goto unlock; - } - - if (ret == 0) { - notify_rpc = 1; - - this->myinfo.sockaddr_len = - sizeof (this->myinfo.sockaddr); - - ret = getsockname (priv->sock, - SA (&this->myinfo.sockaddr), - &this->myinfo.sockaddr_len); - if (ret == -1) { - gf_log (this->name, GF_LOG_DEBUG, - "getsockname on (%d) failed (%s)", - priv->sock, strerror (errno)); - __socket_disconnect (this); - event = GF_EVENT_POLLERR; - goto unlock; - } - - priv->connected = 1; - priv->connect_finish_log = 0; - event = RPC_TRANSPORT_CONNECT; - get_transport_identifiers (this); - } - } -unlock: - pthread_mutex_unlock (&priv->lock); - - if (notify_rpc) { - rpc_transport_notify (this, event, this); - } - - return 0; -} - - -/* reads rpc_requests during pollin */ -int -socket_event_handler (int fd, int idx, void *data, - int poll_in, int poll_out, int poll_err) -{ - rpc_transport_t *this = NULL; - socket_private_t *priv = NULL; - int ret = 0; - - this = data; - priv = this->private; - - pthread_mutex_lock (&priv->lock); - { - priv->idx = idx; - } - pthread_mutex_unlock (&priv->lock); - - if (!priv->connected) { - ret = socket_connect_finish (this); - } - - if (!ret && poll_out) { - ret = socket_event_poll_out (this); - } - - if (!ret && poll_in) { - ret = socket_event_poll_in (this); - } - - if ((ret < 0) || poll_err) { - gf_log ("transport", GF_LOG_TRACE, "disconnecting now"); - socket_event_poll_err (this); - rpc_transport_unref (this); - } - - return 0; -} - - -int -socket_server_event_handler (int fd, int idx, void *data, - int poll_in, int poll_out, int poll_err) -{ - rpc_transport_t *this = NULL; - socket_private_t *priv = NULL; - int ret = 0; - int new_sock = -1; - rpc_transport_t *new_trans = NULL; - struct sockaddr_storage new_sockaddr = {0, }; - socklen_t addrlen = sizeof (new_sockaddr); - socket_private_t *new_priv = NULL; - glusterfs_ctx_t *ctx = NULL; - - this = data; - priv = this->private; - ctx = this->ctx; - - pthread_mutex_lock (&priv->lock); - { - priv->idx = idx; - - if (poll_in) { - new_sock = accept (priv->sock, SA (&new_sockaddr), - &addrlen); - - if (new_sock == -1) - goto unlock; - - if (!priv->bio) { - ret = __socket_nonblock (new_sock); - - if (ret == -1) { - gf_log (this->name, GF_LOG_DEBUG, - "NBIO on %d failed (%s)", - new_sock, strerror (errno)); - - close (new_sock); - goto unlock; - } - } - - if (priv->nodelay) { - ret = __socket_nodelay (new_sock); - if (ret == -1) { - gf_log (this->name, GF_LOG_ERROR, - "setsockopt() failed for " - "NODELAY (%s)", - strerror (errno)); - } - } - - new_trans = GF_CALLOC (1, sizeof (*new_trans), - gf_common_mt_rpc_trans_t); - new_trans->fini = this->fini; - new_trans->name = gf_strdup (this->name); - - memcpy (&new_trans->peerinfo.sockaddr, &new_sockaddr, - addrlen); - new_trans->peerinfo.sockaddr_len = addrlen; - - new_trans->myinfo.sockaddr_len = - sizeof (new_trans->myinfo.sockaddr); - - ret = getsockname (new_sock, - SA (&new_trans->myinfo.sockaddr), - &new_trans->myinfo.sockaddr_len); - if (ret == -1) { - gf_log (this->name, GF_LOG_DEBUG, - "getsockname on %d failed (%s)", - new_sock, strerror (errno)); - close (new_sock); - goto unlock; - } - - get_transport_identifiers (new_trans); - socket_init (new_trans); - new_trans->ops = this->ops; - new_trans->init = this->init; - new_trans->fini = this->fini; - new_trans->ctx = ctx; - new_trans->mydata = this->mydata; - new_trans->notify = this->notify; - new_priv = new_trans->private; - - pthread_mutex_lock (&new_priv->lock); - { - new_priv->sock = new_sock; - new_priv->connected = 1; - rpc_transport_ref (new_trans); - - new_priv->idx = - event_register (ctx->event_pool, - new_sock, - socket_event_handler, - new_trans, 1, 0); - - if (new_priv->idx == -1) - ret = -1; - } - pthread_mutex_unlock (&new_priv->lock); - ret = rpc_transport_notify (this, RPC_TRANSPORT_ACCEPT, new_trans); - } - } -unlock: - pthread_mutex_unlock (&priv->lock); - - return ret; -} - - -int -socket_disconnect (rpc_transport_t *this) -{ - socket_private_t *priv = NULL; - int ret = -1; - - priv = this->private; - - pthread_mutex_lock (&priv->lock); - { - ret = __socket_disconnect (this); - } - pthread_mutex_unlock (&priv->lock); - - return ret; -} - - -int -socket_connect (rpc_transport_t *this) -{ - int ret = -1; - int sock = -1; - socket_private_t *priv = NULL; - struct sockaddr_storage sockaddr = {0, }; - socklen_t sockaddr_len = 0; - glusterfs_ctx_t *ctx = NULL; - sa_family_t sa_family = {0, }; - - priv = this->private; - ctx = this->ctx; - - if (!priv) { - gf_log (this->name, GF_LOG_DEBUG, - "connect() called on uninitialized transport"); - goto err; - } - - pthread_mutex_lock (&priv->lock); - { - sock = priv->sock; - } - pthread_mutex_unlock (&priv->lock); - - if (sock != -1) { - gf_log (this->name, GF_LOG_TRACE, - "connect () called on transport already connected"); - ret = 0; - goto err; - } - - ret = socket_client_get_remote_sockaddr (this, SA (&sockaddr), - &sockaddr_len, &sa_family); - if (ret == -1) { - /* logged inside client_get_remote_sockaddr */ - goto err; - } - - pthread_mutex_lock (&priv->lock); - { - if (priv->sock != -1) { - gf_log (this->name, GF_LOG_TRACE, - "connect() -- already connected"); - goto unlock; - } - - memcpy (&this->peerinfo.sockaddr, &sockaddr, sockaddr_len); - this->peerinfo.sockaddr_len = sockaddr_len; - - priv->sock = socket (sa_family, SOCK_STREAM, 0); - if (priv->sock == -1) { - gf_log (this->name, GF_LOG_ERROR, - "socket creation failed (%s)", - strerror (errno)); - goto unlock; - } - - /* Cant help if setting socket options fails. We can continue - * working nonetheless. - */ - if (setsockopt (priv->sock, SOL_SOCKET, SO_RCVBUF, - &priv->windowsize, - sizeof (priv->windowsize)) < 0) { - gf_log (this->name, GF_LOG_ERROR, - "setting receive window size failed: %d: %d: " - "%s", priv->sock, priv->windowsize, - strerror (errno)); - } - - if (setsockopt (priv->sock, SOL_SOCKET, SO_SNDBUF, - &priv->windowsize, - sizeof (priv->windowsize)) < 0) { - gf_log (this->name, GF_LOG_ERROR, - "setting send window size failed: %d: %d: " - "%s", priv->sock, priv->windowsize, - strerror (errno)); - } - - - if (priv->nodelay && priv->lowlat) { - ret = __socket_nodelay (priv->sock); - if (ret == -1) { - gf_log (this->name, GF_LOG_ERROR, - "setsockopt() failed for NODELAY (%s)", - strerror (errno)); - } - } - - if (!priv->bio) { - ret = __socket_nonblock (priv->sock); - - if (ret == -1) { - gf_log (this->name, GF_LOG_ERROR, - "NBIO on %d failed (%s)", - priv->sock, strerror (errno)); - close (priv->sock); - priv->sock = -1; - goto unlock; - } - } - - SA (&this->myinfo.sockaddr)->sa_family = - SA (&this->peerinfo.sockaddr)->sa_family; - - ret = client_bind (this, SA (&this->myinfo.sockaddr), - &this->myinfo.sockaddr_len, priv->sock); - if (ret == -1) { - gf_log (this->name, GF_LOG_WARNING, - "client bind failed: %s", strerror (errno)); - close (priv->sock); - priv->sock = -1; - goto unlock; - } - - ret = connect (priv->sock, SA (&this->peerinfo.sockaddr), - this->peerinfo.sockaddr_len); - - if (ret == -1 && errno != EINPROGRESS) { - gf_log (this->name, GF_LOG_ERROR, - "connection attempt failed (%s)", - strerror (errno)); - close (priv->sock); - priv->sock = -1; - goto unlock; - } - - priv->connected = 0; - - rpc_transport_ref (this); - - priv->idx = event_register (ctx->event_pool, priv->sock, - socket_event_handler, this, 1, 1); - if (priv->idx == -1) - ret = -1; - } -unlock: - pthread_mutex_unlock (&priv->lock); - -err: - return ret; -} - - -int -socket_listen (rpc_transport_t *this) -{ - socket_private_t * priv = NULL; - int ret = -1; - int sock = -1; - struct sockaddr_storage sockaddr; - socklen_t sockaddr_len; - peer_info_t *myinfo = NULL; - glusterfs_ctx_t *ctx = NULL; - sa_family_t sa_family = {0, }; - - priv = this->private; - myinfo = &this->myinfo; - ctx = this->ctx; - - pthread_mutex_lock (&priv->lock); - { - sock = priv->sock; - } - pthread_mutex_unlock (&priv->lock); - - if (sock != -1) { - gf_log (this->name, GF_LOG_DEBUG, - "alreading listening"); - return ret; - } - - ret = socket_server_get_local_sockaddr (this, SA (&sockaddr), - &sockaddr_len, &sa_family); - if (ret == -1) { - return ret; - } - - pthread_mutex_lock (&priv->lock); - { - if (priv->sock != -1) { - gf_log (this->name, GF_LOG_DEBUG, - "already listening"); - goto unlock; - } - - memcpy (&myinfo->sockaddr, &sockaddr, sockaddr_len); - myinfo->sockaddr_len = sockaddr_len; - - priv->sock = socket (sa_family, SOCK_STREAM, 0); - - if (priv->sock == -1) { - gf_log (this->name, GF_LOG_ERROR, - "socket creation failed (%s)", - strerror (errno)); - goto unlock; - } - - /* Cant help if setting socket options fails. We can continue - * working nonetheless. - */ - if (setsockopt (priv->sock, SOL_SOCKET, SO_RCVBUF, - &priv->windowsize, - sizeof (priv->windowsize)) < 0) { - gf_log (this->name, GF_LOG_ERROR, - "setting receive window size failed: %d: %d: " - "%s", priv->sock, priv->windowsize, - strerror (errno)); - } - - if (setsockopt (priv->sock, SOL_SOCKET, SO_SNDBUF, - &priv->windowsize, - sizeof (priv->windowsize)) < 0) { - gf_log (this->name, GF_LOG_ERROR, - "setting send window size failed: %d: %d: " - "%s", priv->sock, priv->windowsize, - strerror (errno)); - } - - if (priv->nodelay) { - ret = __socket_nodelay (priv->sock); - if (ret == -1) { - gf_log (this->name, GF_LOG_ERROR, - "setsockopt() failed for NODELAY (%s)", - strerror (errno)); - } - } - - if (!priv->bio) { - ret = __socket_nonblock (priv->sock); - - if (ret == -1) { - gf_log (this->name, GF_LOG_ERROR, - "NBIO on %d failed (%s)", - priv->sock, strerror (errno)); - close (priv->sock); - priv->sock = -1; - goto unlock; - } - } - - ret = __socket_server_bind (this); - - if (ret == -1) { - /* logged inside __socket_server_bind() */ - close (priv->sock); - priv->sock = -1; - goto unlock; - } - - ret = listen (priv->sock, 10); - - if (ret == -1) { - gf_log (this->name, GF_LOG_ERROR, - "could not set socket %d to listen mode (%s)", - priv->sock, strerror (errno)); - close (priv->sock); - priv->sock = -1; - goto unlock; - } - - rpc_transport_ref (this); - - priv->idx = event_register (ctx->event_pool, priv->sock, - socket_server_event_handler, - this, 1, 0); - - if (priv->idx == -1) { - gf_log (this->name, GF_LOG_DEBUG, - "could not register socket %d with events", - priv->sock); - ret = -1; - close (priv->sock); - priv->sock = -1; - goto unlock; - } - } -unlock: - pthread_mutex_unlock (&priv->lock); - - return ret; -} - - -/* TODO: implement per transfer limit */ -#if 0 -int -socket_submit (rpc_transport_t *this, char *buf, int len, - struct iovec *vector, int count, - struct iobref *iobref) -{ - socket_private_t *priv = NULL; - int ret = -1; - char need_poll_out = 0; - char need_append = 1; - struct ioq *entry = NULL; - glusterfs_ctx_t *ctx = NULL; - - priv = this->private; - ctx = this->ctx; - - pthread_mutex_lock (&priv->lock); - { - if (priv->connected != 1) { - if (!priv->submit_log && !priv->connect_finish_log) { - gf_log (this->name, GF_LOG_DEBUG, - "not connected (priv->connected = %d)", - priv->connected); - priv->submit_log = 1; - } - goto unlock; - } - - priv->submit_log = 0; - entry = __socket_ioq_new (this, buf, len, vector, count, iobref); - if (!entry) - goto unlock; - - if (list_empty (&priv->ioq)) { - ret = __socket_ioq_churn_entry (this, entry); - - if (ret == 0) - need_append = 0; - - if (ret > 0) - need_poll_out = 1; - } - - if (need_append) { - list_add_tail (&entry->list, &priv->ioq); - ret = 0; - } - - if (need_poll_out) { - /* first entry to wait. continue writing on POLLOUT */ - priv->idx = event_select_on (ctx->event_pool, - priv->sock, - priv->idx, -1, 1); - } - } -unlock: - pthread_mutex_unlock (&priv->lock); - - return ret; -} -#endif - - -int32_t -socket_submit_request (rpc_transport_t *this, rpc_transport_req_t *req) -{ - socket_private_t *priv = NULL; - int ret = -1; - char need_poll_out = 0; - char need_append = 1; - struct ioq *entry = NULL; - glusterfs_ctx_t *ctx = NULL; - - priv = this->private; - ctx = this->ctx; - - pthread_mutex_lock (&priv->lock); - { - if (priv->connected != 1) { - if (!priv->submit_log && !priv->connect_finish_log) { - gf_log (this->name, GF_LOG_DEBUG, - "not connected (priv->connected = %d)", - priv->connected); - priv->submit_log = 1; - } - goto unlock; - } - - priv->submit_log = 0; - entry = __socket_ioq_new (this, &req->msg); - if (!entry) - goto unlock; - - if (list_empty (&priv->ioq)) { - ret = __socket_ioq_churn_entry (this, entry); - - if (ret == 0) - need_append = 0; - - if (ret > 0) - need_poll_out = 1; - } - - if (need_append) { - list_add_tail (&entry->list, &priv->ioq); - ret = 0; - } - - if (need_poll_out) { - /* first entry to wait. continue writing on POLLOUT */ - priv->idx = event_select_on (ctx->event_pool, - priv->sock, - priv->idx, -1, 1); - } - } -unlock: - pthread_mutex_unlock (&priv->lock); - - return ret; -} - - -int32_t -socket_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply) -{ - socket_private_t *priv = NULL; - int ret = -1; - char need_poll_out = 0; - char need_append = 1; - struct ioq *entry = NULL; - glusterfs_ctx_t *ctx = NULL; - - priv = this->private; - ctx = this->ctx; - - pthread_mutex_lock (&priv->lock); - { - if (priv->connected != 1) { - if (!priv->submit_log && !priv->connect_finish_log) { - gf_log (this->name, GF_LOG_DEBUG, - "not connected (priv->connected = %d)", - priv->connected); - priv->submit_log = 1; - } - goto unlock; - } - priv->submit_log = 0; - entry = __socket_ioq_new (this, &reply->msg); - if (!entry) - goto unlock; - if (list_empty (&priv->ioq)) { - ret = __socket_ioq_churn_entry (this, entry); - - if (ret == 0) - need_append = 0; - - if (ret > 0) - need_poll_out = 1; - } - - if (need_append) { - list_add_tail (&entry->list, &priv->ioq); - ret = 0; - } - - if (need_poll_out) { - /* first entry to wait. continue writing on POLLOUT */ - priv->idx = event_select_on (ctx->event_pool, - priv->sock, - priv->idx, -1, 1); - } - } - -unlock: - pthread_mutex_unlock (&priv->lock); - - return ret; -} - - -int32_t -socket_getpeername (rpc_transport_t *this, char *hostname, int hostlen) -{ - int32_t ret = -1; - - if ((this == NULL) || (hostname == NULL)) { - goto out; - } - - if (hostlen < (strlen (this->peerinfo.identifier) + 1)) { - goto out; - } - - strcpy (hostname, this->peerinfo.identifier); - ret = 0; -out: - return ret; -} - - -int32_t -socket_getpeeraddr (rpc_transport_t *this, char *peeraddr, int addrlen, - struct sockaddr *sa, socklen_t salen) -{ - int32_t ret = -1; - - if ((this == NULL) || (sa == NULL)) { - goto out; - } - - *sa = *((struct sockaddr *)&this->peerinfo.sockaddr); - - if (peeraddr != NULL) { - ret = socket_getpeername (this, peeraddr, addrlen); - } - -out: - return ret; -} - - -int32_t -socket_getmyname (rpc_transport_t *this, char *hostname, int hostlen) -{ - int32_t ret = -1; - - if ((this == NULL) || (hostname == NULL)) { - goto out; - } - - if (hostlen < (strlen (this->myinfo.identifier) + 1)) { - goto out; - } - - strcpy (hostname, this->myinfo.identifier); - ret = 0; -out: - return ret; -} - - -int32_t -socket_getmyaddr (rpc_transport_t *this, char *myaddr, int addrlen, - struct sockaddr *sa, socklen_t salen) -{ - int32_t ret = -1; - - if ((this == NULL) || (sa == NULL)) { - goto out; - } - - *sa = *((struct sockaddr *)&this->myinfo.sockaddr); - - if (myaddr != NULL) { - ret = socket_getmyname (this, myaddr, addrlen); - } - -out: - return ret; -} - - -struct rpc_transport_ops tops = { - .listen = socket_listen, - .connect = socket_connect, - .disconnect = socket_disconnect, - .submit_request = socket_submit_request, - .submit_reply = socket_submit_reply, - .get_peername = socket_getpeername, - .get_peeraddr = socket_getpeeraddr, - .get_myname = socket_getmyname, - .get_myaddr = socket_getmyaddr -}; - - -int -socket_init (rpc_transport_t *this) -{ - socket_private_t *priv = NULL; - gf_boolean_t tmp_bool = 0; - uint64_t windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE; - char *optstr = NULL; - - if (this->private) { - gf_log (this->name, GF_LOG_DEBUG, - "double init attempted"); - return -1; - } - - priv = GF_CALLOC (1, sizeof (*priv), gf_common_mt_socket_private_t); - if (!priv) { - gf_log (this->name, GF_LOG_ERROR, - "calloc (1, %"GF_PRI_SIZET") returned NULL", - sizeof (*priv)); - return -1; - } - - pthread_mutex_init (&priv->lock, NULL); - - priv->sock = -1; - priv->idx = -1; - priv->connected = -1; - - INIT_LIST_HEAD (&priv->ioq); - - if (dict_get (this->options, "non-blocking-io")) { - optstr = data_to_str (dict_get (this->options, - "non-blocking-io")); - - if (gf_string2boolean (optstr, &tmp_bool) == -1) { - gf_log (this->name, GF_LOG_ERROR, - "'non-blocking-io' takes only boolean options," - " not taking any action"); - tmp_bool = 1; - } - priv->bio = 0; - if (!tmp_bool) { - priv->bio = 1; - gf_log (this->name, GF_LOG_WARNING, - "disabling non-blocking IO"); - } - } - - optstr = NULL; - - // By default, we enable NODELAY - priv->nodelay = 1; - if (dict_get (this->options, "transport.socket.nodelay")) { - optstr = data_to_str (dict_get (this->options, - "transport.socket.nodelay")); - - if (gf_string2boolean (optstr, &tmp_bool) == -1) { - gf_log (this->name, GF_LOG_ERROR, - "'transport.socket.nodelay' takes only " - "boolean options, not taking any action"); - tmp_bool = 1; - } - if (!tmp_bool) { - priv->nodelay = 0; - gf_log (this->name, GF_LOG_DEBUG, - "disabling nodelay"); - } - } - - - optstr = NULL; - if (dict_get_str (this->options, "transport.window-size", - &optstr) == 0) { - if (gf_string2bytesize (optstr, &windowsize) != 0) { - gf_log (this->name, GF_LOG_ERROR, - "invalid number format: %s", optstr); - return -1; - } - } - - optstr = NULL; - - if (dict_get_str (this->options, "transport.socket.lowlat", - &optstr) == 0) { - priv->lowlat = 1; - } - - priv->windowsize = (int)windowsize; - this->private = priv; - - return 0; -} - - -void -fini (rpc_transport_t *this) -{ - socket_private_t *priv = this->private; - - gf_log (this->name, GF_LOG_TRACE, - "transport %p destroyed", this); - - pthread_mutex_destroy (&priv->lock); - - GF_FREE (this->name); - GF_FREE (priv); -} - - -int32_t -init (rpc_transport_t *this) -{ - int ret = -1; - - ret = socket_init (this); - - if (ret == -1) { - gf_log (this->name, GF_LOG_DEBUG, "socket_init() failed"); - } - - return ret; -} - -struct volume_options options[] = { - { .key = {"remote-port", - "transport.remote-port", - "transport.socket.remote-port"}, - .type = GF_OPTION_TYPE_INT - }, - { .key = {"transport.socket.listen-port", "listen-port"}, - .type = GF_OPTION_TYPE_INT - }, - { .key = {"transport.socket.bind-address", "bind-address" }, - .type = GF_OPTION_TYPE_INTERNET_ADDRESS - }, - { .key = {"transport.socket.connect-path", "connect-path"}, - .type = GF_OPTION_TYPE_ANY - }, - { .key = {"transport.socket.bind-path", "bind-path"}, - .type = GF_OPTION_TYPE_ANY - }, - { .key = {"transport.socket.listen-path", "listen-path"}, - .type = GF_OPTION_TYPE_ANY - }, - { .key = { "transport.address-family", - "address-family" }, - .value = {"inet", "inet6", "inet/inet6", "inet6/inet", - "unix", "inet-sdp" }, - .type = GF_OPTION_TYPE_STR - }, - - { .key = {"non-blocking-io"}, - .type = GF_OPTION_TYPE_BOOL - }, - { .key = {"transport.window-size"}, - .type = GF_OPTION_TYPE_SIZET, - .min = GF_MIN_SOCKET_WINDOW_SIZE, - .max = GF_MAX_SOCKET_WINDOW_SIZE, - }, - { .key = {"transport.socket.nodelay"}, - .type = GF_OPTION_TYPE_BOOL - }, - { .key = {"transport.socket.lowlat"}, - .type = GF_OPTION_TYPE_BOOL - }, - { .key = {NULL} } -}; diff --git a/xlators/protocol/rpc/rpc-transport/socket/src/socket.h b/xlators/protocol/rpc/rpc-transport/socket/src/socket.h deleted file mode 100644 index aa31ee2a7ef..00000000000 --- a/xlators/protocol/rpc/rpc-transport/socket/src/socket.h +++ /dev/null @@ -1,190 +0,0 @@ -/* - Copyright (c) 2010 Gluster, Inc. - This file is part of GlusterFS. - - GlusterFS is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published - by the Free Software Foundation; either version 3 of the License, - or (at your option) any later version. - - GlusterFS is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see - . -*/ - -#ifndef _SOCKET_H -#define _SOCKET_H - - -#ifndef _CONFIG_H -#define _CONFIG_H -#include "config.h" -#endif - -#include "event.h" -#include "rpc-transport.h" -#include "logging.h" -#include "dict.h" -#include "mem-pool.h" - -#ifndef MAX_IOVEC -#define MAX_IOVEC 16 -#endif /* MAX_IOVEC */ - -#define GF_DEFAULT_SOCKET_LISTEN_PORT 6969 - -/* This is the size set through setsockopt for - * both the TCP receive window size and the - * send buffer size. - * Till the time iobuf size becomes configurable, this size is set to include - * two iobufs + the GlusterFS protocol headers. - * Linux allows us to over-ride the max values for the system. - * Should we over-ride them? Because if we set a value larger than the default - * setsockopt will fail. Having larger values might be beneficial for - * IB links. - */ -#define GF_DEFAULT_SOCKET_WINDOW_SIZE (512 * GF_UNIT_KB) -#define GF_MAX_SOCKET_WINDOW_SIZE (1 * GF_UNIT_MB) -#define GF_MIN_SOCKET_WINDOW_SIZE (128 * GF_UNIT_KB) - -typedef enum { - SP_STATE_NADA = 0, - SP_STATE_COMPLETE, - SP_STATE_READING_FRAGHDR, - SP_STATE_READ_FRAGHDR, - SP_STATE_READING_FRAG, -} sp_rpcrecord_state_t; - -typedef enum { - SP_STATE_RPCFRAG_INIT, - SP_STATE_READING_MSGTYPE, - SP_STATE_READ_MSGTYPE, -} sp_rpcfrag_state_t; - -typedef enum { - SP_STATE_SIMPLE_MSG_INIT, - SP_STATE_READING_SIMPLE_MSG, -} sp_rpcfrag_simple_msg_state_t; - -typedef enum { - SP_STATE_VECTORED_REQUEST_INIT, - SP_STATE_READING_CREDBYTES, - SP_STATE_READ_CREDBYTES, /* read credential data. */ - SP_STATE_READING_VERFBYTES, - SP_STATE_READ_VERFBYTES, /* read verifier data */ - SP_STATE_READING_PROG, -} sp_rpcfrag_vectored_request_state_t; - -typedef enum { - SP_STATE_REQUEST_HEADER_INIT, - SP_STATE_READING_RPCHDR1, - SP_STATE_READ_RPCHDR1, /* read msg from beginning till and - * including credlen - */ -} sp_rpcfrag_request_header_state_t; - -struct ioq { - union { - struct list_head list; - struct { - struct ioq *next; - struct ioq *prev; - }; - }; - - struct iovec vector[MAX_IOVEC]; - int count; - struct iovec *pending_vector; - int pending_count; - struct iobref *iobref; -}; - -typedef struct { - sp_rpcfrag_request_header_state_t header_state; - sp_rpcfrag_vectored_request_state_t vector_state; -} sp_rpcfrag_request_state_t; - -typedef enum { - SP_STATE_VECTORED_REPLY_STATUS_INIT, - SP_STATE_READING_REPLY_STATUS, - SP_STATE_READ_REPLY_STATUS, -} sp_rpcfrag_vectored_reply_status_state_t; - -typedef enum { - SP_STATE_ACCEPTED_SUCCESS_REPLY_INIT, - SP_STATE_READING_PROC_HEADER, - SP_STATE_READ_PROC_HEADER, -} sp_rpcfrag_vectored_reply_accepted_success_state_t; - -typedef enum { - SP_STATE_ACCEPTED_REPLY_INIT, - SP_STATE_READING_REPLY_VERFLEN, - SP_STATE_READ_REPLY_VERFLEN, - SP_STATE_READING_REPLY_VERFBYTES, - SP_STATE_READ_REPLY_VERFBYTES, -} sp_rpcfrag_vectored_reply_accepted_state_t; - -typedef struct { - uint32_t accept_status; - sp_rpcfrag_vectored_reply_status_state_t status_state; - sp_rpcfrag_vectored_reply_accepted_state_t accepted_state; - sp_rpcfrag_vectored_reply_accepted_success_state_t accepted_success_state; -} sp_rpcfrag_vectored_reply_state_t; - -typedef struct { - int32_t sock; - int32_t idx; - unsigned char connected; // -1 = not connected. 0 = in progress. 1 = connected - char bio; - char connect_finish_log; - char submit_log; - union { - struct list_head ioq; - struct { - struct ioq *ioq_next; - struct ioq *ioq_prev; - }; - }; - struct { - sp_rpcrecord_state_t record_state; - struct { - char *fragcurrent; - uint32_t bytes_read; - uint32_t remaining_size; - struct iovec vector; - struct iovec *pending_vector; - union { - sp_rpcfrag_request_state_t request; - sp_rpcfrag_vectored_reply_state_t reply; - } call_body; - - sp_rpcfrag_simple_msg_state_t simple_state; - sp_rpcfrag_state_t state; - } frag; - struct iobuf *iobuf; - size_t iobuf_size; - struct iovec vector[2]; - int count; - struct iobuf *vectoriob; - size_t vectoriob_size; - rpc_request_info_t *request_info; - struct iovec *pending_vector; - int pending_count; - uint32_t fraghdr; - char complete_record; - msg_type_t msg_type; - size_t total_bytes_read; - } incoming; - pthread_mutex_t lock; - int windowsize; - char lowlat; - char nodelay; -} socket_private_t; - - -#endif -- cgit