summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-lib/src
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-lib/src')
-rw-r--r--rpc/rpc-lib/src/Makefile.am2
-rw-r--r--rpc/rpc-lib/src/auth-glusterfs.c3
-rw-r--r--rpc/rpc-lib/src/auth-null.c3
-rw-r--r--rpc/rpc-lib/src/auth-unix.c3
-rw-r--r--rpc/rpc-lib/src/autoscale-threads.c5
-rw-r--r--rpc/rpc-lib/src/libgfrpc.sym4
-rw-r--r--rpc/rpc-lib/src/mgmt-pmap.c7
-rw-r--r--rpc/rpc-lib/src/protocol-common.h4
-rw-r--r--rpc/rpc-lib/src/rpc-clnt-ping.c40
-rw-r--r--rpc/rpc-lib/src/rpc-clnt-ping.h1
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.c212
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.h45
-rw-r--r--rpc/rpc-lib/src/rpc-drc.c51
-rw-r--r--rpc/rpc-lib/src/rpc-drc.h20
-rw-r--r--rpc/rpc-lib/src/rpc-lib-messages.h2
-rw-r--r--rpc/rpc-lib/src/rpc-transport.c164
-rw-r--r--rpc/rpc-lib/src/rpc-transport.h74
-rw-r--r--rpc/rpc-lib/src/rpcsvc-auth.c46
-rw-r--r--rpc/rpc-lib/src/rpcsvc-common.h20
-rw-r--r--rpc/rpc-lib/src/rpcsvc.c563
-rw-r--r--rpc/rpc-lib/src/rpcsvc.h184
-rw-r--r--rpc/rpc-lib/src/xdr-common.h2
-rw-r--r--rpc/rpc-lib/src/xdr-rpc.c6
-rw-r--r--rpc/rpc-lib/src/xdr-rpc.h1
-rw-r--r--rpc/rpc-lib/src/xdr-rpcclnt.c6
-rw-r--r--rpc/rpc-lib/src/xdr-rpcclnt.h2
26 files changed, 832 insertions, 638 deletions
diff --git a/rpc/rpc-lib/src/Makefile.am b/rpc/rpc-lib/src/Makefile.am
index 81a96476883..35c9db07e7f 100644
--- a/rpc/rpc-lib/src/Makefile.am
+++ b/rpc/rpc-lib/src/Makefile.am
@@ -2,7 +2,7 @@ lib_LTLIBRARIES = libgfrpc.la
libgfrpc_la_SOURCES = auth-unix.c rpcsvc-auth.c rpcsvc.c auth-null.c \
rpc-transport.c xdr-rpc.c xdr-rpcclnt.c rpc-clnt.c auth-glusterfs.c \
- rpc-drc.c $(CONTRIBDIR)/sunrpc/xdr_sizeof.c rpc-clnt-ping.c \
+ rpc-drc.c rpc-clnt-ping.c \
autoscale-threads.c mgmt-pmap.c
EXTRA_DIST = libgfrpc.sym
diff --git a/rpc/rpc-lib/src/auth-glusterfs.c b/rpc/rpc-lib/src/auth-glusterfs.c
index d569a0403f8..69a96f7512f 100644
--- a/rpc/rpc-lib/src/auth-glusterfs.c
+++ b/rpc/rpc-lib/src/auth-glusterfs.c
@@ -9,8 +9,7 @@
*/
#include "rpcsvc.h"
-#include "list.h"
-#include "dict.h"
+#include <glusterfs/dict.h>
#include "xdr-rpc.h"
#include "xdr-common.h"
#include "rpc-common-xdr.h"
diff --git a/rpc/rpc-lib/src/auth-null.c b/rpc/rpc-lib/src/auth-null.c
index 46046e8e440..6d059b9da50 100644
--- a/rpc/rpc-lib/src/auth-null.c
+++ b/rpc/rpc-lib/src/auth-null.c
@@ -9,8 +9,7 @@
*/
#include "rpcsvc.h"
-#include "list.h"
-#include "dict.h"
+#include <glusterfs/dict.h>
int
auth_null_request_init(rpcsvc_request_t *req, void *priv)
diff --git a/rpc/rpc-lib/src/auth-unix.c b/rpc/rpc-lib/src/auth-unix.c
index c53870fcf94..61d475a5e84 100644
--- a/rpc/rpc-lib/src/auth-unix.c
+++ b/rpc/rpc-lib/src/auth-unix.c
@@ -9,8 +9,7 @@
*/
#include "rpcsvc.h"
-#include "list.h"
-#include "dict.h"
+#include <glusterfs/dict.h>
#include "xdr-rpc.h"
int
diff --git a/rpc/rpc-lib/src/autoscale-threads.c b/rpc/rpc-lib/src/autoscale-threads.c
index 337f002df10..a954ae7a27a 100644
--- a/rpc/rpc-lib/src/autoscale-threads.c
+++ b/rpc/rpc-lib/src/autoscale-threads.c
@@ -8,7 +8,7 @@
cases as published by the Free Software Foundation.
*/
-#include "gf-event.h"
+#include <glusterfs/gf-event.h>
#include "rpcsvc.h"
void
@@ -18,6 +18,5 @@ rpcsvc_autoscale_threads(glusterfs_ctx_t *ctx, rpcsvc_t *rpc, int incr)
int thread_count = pool->eventthreadcount;
pool->auto_thread_count += incr;
- (void)event_reconfigure_threads(pool, thread_count + incr);
- rpcsvc_ownthread_reconf(rpc, pool->eventthreadcount);
+ (void)gf_event_reconfigure_threads(pool, thread_count + incr);
}
diff --git a/rpc/rpc-lib/src/libgfrpc.sym b/rpc/rpc-lib/src/libgfrpc.sym
index a7cb5f6b5cb..e026d80259b 100644
--- a/rpc/rpc-lib/src/libgfrpc.sym
+++ b/rpc/rpc-lib/src/libgfrpc.sym
@@ -3,7 +3,6 @@ rpcclnt_cbk_program_register
rpc_clnt_cleanup_and_start
rpc_clnt_connection_cleanup
rpc_clnt_disable
-rpc_clnt_disconnect
rpc_clnt_new
rpc_clnt_reconfig
rpc_clnt_reconnect
@@ -26,6 +25,7 @@ rpcsvc_drc_priv
rpcsvc_drc_reconfigure
rpcsvc_get_program_vector_sizer
rpcsvc_init
+rpcsvc_destroy
rpcsvc_init_options
rpcsvc_listener_destroy
rpcsvc_program_register
@@ -51,7 +51,6 @@ rpcsvc_transport_connect
rpcsvc_transport_getpeeraddr
rpcsvc_unregister_notify
rpcsvc_volume_allowed
-rpcsvc_ownthread_reconf
rpc_transport_count
rpc_transport_connect
rpc_transport_disconnect
@@ -66,3 +65,4 @@ rpc_transport_unix_options_build
rpc_transport_unref
rpc_clnt_mgmt_pmap_signout
rpcsvc_autoscale_threads
+rpcsvc_statedump
diff --git a/rpc/rpc-lib/src/mgmt-pmap.c b/rpc/rpc-lib/src/mgmt-pmap.c
index 344ec56bbf7..25a7148e5a3 100644
--- a/rpc/rpc-lib/src/mgmt-pmap.c
+++ b/rpc/rpc-lib/src/mgmt-pmap.c
@@ -12,7 +12,6 @@
#include "protocol-common.h"
#include "rpc-clnt.h"
#include "xdr-generic.h"
-#include "xlator.h"
/* Defining a minimal RPC client program for portmap signout
*/
@@ -35,7 +34,9 @@ mgmt_pmap_signout_cbk(struct rpc_req *req, struct iovec *iov, int count,
0,
};
int ret = 0;
+ call_frame_t *frame = NULL;
+ frame = myframe;
if (-1 == req->rpc_status) {
rsp.op_ret = -1;
rsp.op_errno = EINVAL;
@@ -56,6 +57,10 @@ mgmt_pmap_signout_cbk(struct rpc_req *req, struct iovec *iov, int count,
goto out;
}
out:
+ if (frame) {
+ STACK_DESTROY(frame->root);
+ }
+
return 0;
}
diff --git a/rpc/rpc-lib/src/protocol-common.h b/rpc/rpc-lib/src/protocol-common.h
index 4950857ae9e..0cb5862e9a9 100644
--- a/rpc/rpc-lib/src/protocol-common.h
+++ b/rpc/rpc-lib/src/protocol-common.h
@@ -68,6 +68,7 @@ enum gf_fop_procnum {
GFS3_OP_ICREATE,
GFS3_OP_NAMELINK,
GFS3_OP_PUT,
+ GFS3_OP_COPY_FILE_RANGE,
GFS3_OP_MAXVALUE,
};
@@ -244,6 +245,8 @@ enum glusterd_brick_procnum {
GLUSTERD_NODE_BITROT,
GLUSTERD_BRICK_ATTACH,
GLUSTERD_DUMP_METRICS,
+ GLUSTERD_SVC_ATTACH,
+ GLUSTERD_SVC_DETACH,
GLUSTERD_BRICK_MAXVALUE,
};
@@ -306,6 +309,7 @@ enum glusterd_mgmt_v3_procnum {
GLUSTERD_MGMT_V3_PRE_VALIDATE,
GLUSTERD_MGMT_V3_BRICK_OP,
GLUSTERD_MGMT_V3_COMMIT,
+ GLUSTERD_MGMT_V3_POST_COMMIT,
GLUSTERD_MGMT_V3_POST_VALIDATE,
GLUSTERD_MGMT_V3_UNLOCK,
GLUSTERD_MGMT_V3_MAXVALUE,
diff --git a/rpc/rpc-lib/src/rpc-clnt-ping.c b/rpc/rpc-lib/src/rpc-clnt-ping.c
index a98a83dd8c8..31f17841bea 100644
--- a/rpc/rpc-lib/src/rpc-clnt-ping.c
+++ b/rpc/rpc-lib/src/rpc-clnt-ping.c
@@ -10,14 +10,14 @@
#include "rpc-clnt.h"
#include "rpc-clnt-ping.h"
-#include "byte-order.h"
+#include <glusterfs/byte-order.h>
#include "xdr-rpcclnt.h"
#include "rpc-transport.h"
#include "protocol-common.h"
-#include "mem-pool.h"
+#include <glusterfs/mem-pool.h>
#include "xdr-rpc.h"
#include "rpc-common-xdr.h"
-#include "timespec.h"
+#include <glusterfs/timespec.h>
char *clnt_ping_procs[GF_DUMP_MAXVALUE] = {
[GF_DUMP_PING] = "NULL",
@@ -108,7 +108,6 @@ rpc_clnt_ping_timer_expired(void *rpc_ptr)
rpc_transport_t *trans = NULL;
rpc_clnt_connection_t *conn = NULL;
int disconnect = 0;
- int transport_activity = 0;
struct timespec current = {
0,
};
@@ -123,28 +122,22 @@ rpc_clnt_ping_timer_expired(void *rpc_ptr)
goto out;
}
+ timespec_now_realtime(&current);
pthread_mutex_lock(&conn->lock);
{
unref = rpc_clnt_remove_ping_timer_locked(rpc);
- clock_gettime(CLOCK_REALTIME, &current);
if (((current.tv_sec - conn->last_received.tv_sec) <
conn->ping_timeout) ||
((current.tv_sec - conn->last_sent.tv_sec) < conn->ping_timeout)) {
- transport_activity = 1;
- }
-
- if (transport_activity) {
gf_log(trans->name, GF_LOG_TRACE,
"ping timer expired but transport activity "
"detected - not bailing transport");
-
if (__rpc_clnt_rearm_ping_timer(rpc, rpc_clnt_ping_timer_expired) ==
-1) {
gf_log(trans->name, GF_LOG_WARNING,
"unable to setup ping timer");
}
-
} else {
conn->ping_started = 0;
disconnect = 1;
@@ -198,14 +191,16 @@ rpc_clnt_ping_cbk(struct rpc_req *req, struct iovec *iov, int count,
timespec_sub(&local->submit_time, &now, &delta);
latency_msec = delta.tv_sec * 1000 + delta.tv_nsec / 1000000;
+ gf_log(THIS->name, GF_LOG_DEBUG, "Ping latency is %" PRIu64 "ms",
+ latency_msec);
+ call_notify = _gf_true;
+
pthread_mutex_lock(&conn->lock);
{
- gf_log(THIS->name, GF_LOG_DEBUG, "Ping latency is %" PRIu64 "ms",
- latency_msec);
-
- call_notify = _gf_true;
+ unref = rpc_clnt_remove_ping_timer_locked(local->rpc);
if (req->rpc_status == -1) {
- unref = rpc_clnt_remove_ping_timer_locked(local->rpc);
+ conn->ping_started = 0;
+ pthread_mutex_unlock(&conn->lock);
if (unref) {
gf_log(this->name, GF_LOG_WARNING,
"socket or ib related error");
@@ -214,19 +209,20 @@ rpc_clnt_ping_cbk(struct rpc_req *req, struct iovec *iov, int count,
/* timer expired and transport bailed out */
gf_log(this->name, GF_LOG_WARNING, "socket disconnected");
}
- conn->ping_started = 0;
- goto unlock;
+ goto after_unlock;
}
- unref = rpc_clnt_remove_ping_timer_locked(local->rpc);
if (__rpc_clnt_rearm_ping_timer(local->rpc, rpc_clnt_start_ping) ==
-1) {
+ /* unlock before logging error */
+ pthread_mutex_unlock(&conn->lock);
gf_log(this->name, GF_LOG_WARNING, "failed to set the ping timer");
+ } else {
+ /* just unlock the mutex */
+ pthread_mutex_unlock(&conn->lock);
}
}
-unlock:
- pthread_mutex_unlock(&conn->lock);
-
+after_unlock:
if (call_notify) {
ret = local->rpc->notifyfn(local->rpc, this, RPC_CLNT_PING,
(void *)(uintptr_t)latency_msec);
diff --git a/rpc/rpc-lib/src/rpc-clnt-ping.h b/rpc/rpc-lib/src/rpc-clnt-ping.h
index fcbac6f9e21..e5466a828c2 100644
--- a/rpc/rpc-lib/src/rpc-clnt-ping.h
+++ b/rpc/rpc-lib/src/rpc-clnt-ping.h
@@ -8,6 +8,7 @@
cases as published by the Free Software Foundation.
*/
+struct rpc_clnt;
#define RPC_DEFAULT_PING_TIMEOUT 30
void
rpc_clnt_check_and_start_ping(struct rpc_clnt *rpc_ptr);
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index c5236251549..517037c4a5d 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -12,11 +12,11 @@
#include "rpc-clnt.h"
#include "rpc-clnt-ping.h"
-#include "byte-order.h"
+#include <glusterfs/byte-order.h>
#include "xdr-rpcclnt.h"
#include "rpc-transport.h"
#include "protocol-common.h"
-#include "mem-pool.h"
+#include <glusterfs/mem-pool.h>
#include "xdr-rpc.h"
#include "rpc-common-xdr.h"
@@ -55,25 +55,25 @@ _is_lock_fop(struct saved_frame *sframe)
(fop == GFS3_OP_FENTRYLK));
}
-struct saved_frame *
+static struct saved_frame *
__saved_frames_put(struct saved_frames *frames, void *frame,
struct rpc_req *rpcreq)
{
- struct saved_frame *saved_frame = NULL;
+ struct saved_frame *saved_frame = mem_get(
+ rpcreq->conn->rpc_clnt->saved_frames_pool);
- saved_frame = mem_get(rpcreq->conn->rpc_clnt->saved_frames_pool);
if (!saved_frame) {
goto out;
}
/* THIS should be saved and set back */
- memset(saved_frame, 0, sizeof(*saved_frame));
INIT_LIST_HEAD(&saved_frame->list);
saved_frame->capital_this = THIS;
saved_frame->frame = frame;
saved_frame->rpcreq = rpcreq;
gettimeofday(&saved_frame->saved_at, NULL);
+ memset(&saved_frame->rsp, 0, sizeof(rpc_transport_rsp_t));
if (_is_lock_fop(saved_frame))
list_add_tail(&saved_frame->list, &frames->lk_sf.list);
@@ -97,7 +97,7 @@ call_bail(void *data)
struct saved_frame *saved_frame = NULL;
struct saved_frame *trav = NULL;
struct saved_frame *tmp = NULL;
- char frame_sent[256] = {
+ char frame_sent[GF_TIMESTR_SIZE] = {
0,
};
struct timespec timeout = {
@@ -105,7 +105,6 @@ call_bail(void *data)
};
char peerid[UNIX_PATH_MAX] = {0};
gf_boolean_t need_unref = _gf_false;
- int len;
GF_VALIDATE_OR_GOTO("client", data, out);
@@ -116,8 +115,8 @@ call_bail(void *data)
{
trans = conn->trans;
if (trans) {
- strncpy(peerid, conn->trans->peerinfo.identifier,
- sizeof(peerid) - 1);
+ (void)snprintf(peerid, sizeof(peerid), "%s",
+ conn->trans->peerinfo.identifier);
}
}
pthread_mutex_unlock(&conn->lock);
@@ -160,13 +159,13 @@ call_bail(void *data)
}
pthread_mutex_unlock(&conn->lock);
+ if (list_empty(&list))
+ goto out;
+
list_for_each_entry_safe(trav, tmp, &list, list)
{
- gf_time_fmt(frame_sent, sizeof frame_sent, trav->saved_at.tv_sec,
- gf_timefmt_FT);
- len = strlen(frame_sent);
- snprintf(frame_sent + len, sizeof(frame_sent) - len,
- ".%" GF_PRI_SUSECONDS, trav->saved_at.tv_usec);
+ gf_time_fmt_tv(frame_sent, sizeof frame_sent, &trav->saved_at,
+ gf_timefmt_FT);
gf_log(conn->name, GF_LOG_ERROR,
"bailing out frame type(%s), op(%s(%d)), xid = 0x%x, "
@@ -196,19 +195,16 @@ out:
}
/* to be called with conn->lock held */
-struct saved_frame *
+static struct saved_frame *
__save_frame(struct rpc_clnt *rpc_clnt, call_frame_t *frame,
struct rpc_req *rpcreq)
{
- rpc_clnt_connection_t *conn = NULL;
+ rpc_clnt_connection_t *conn = &rpc_clnt->conn;
struct timespec timeout = {
0,
};
- struct saved_frame *saved_frame = NULL;
-
- conn = &rpc_clnt->conn;
-
- saved_frame = __saved_frames_put(conn->saved_frames, frame, rpcreq);
+ struct saved_frame *saved_frame = __saved_frames_put(conn->saved_frames,
+ frame, rpcreq);
if (saved_frame == NULL) {
goto out;
@@ -317,20 +313,15 @@ saved_frames_unwind(struct saved_frames *saved_frames)
{
struct saved_frame *trav = NULL;
struct saved_frame *tmp = NULL;
- char timestr[1024] = {
+ char timestr[GF_TIMESTR_SIZE] = {
0,
};
- int len;
list_splice_init(&saved_frames->lk_sf.list, &saved_frames->sf.list);
list_for_each_entry_safe(trav, tmp, &saved_frames->sf.list, list)
{
- gf_time_fmt(timestr, sizeof timestr, trav->saved_at.tv_sec,
- gf_timefmt_FT);
- len = strlen(timestr);
- snprintf(timestr + len, sizeof(timestr) - len, ".%" GF_PRI_SUSECONDS,
- trav->saved_at.tv_usec);
+ gf_time_fmt_tv(timestr, sizeof timestr, &trav->saved_at, gf_timefmt_FT);
if (!trav->rpcreq || !trav->rpcreq->prog)
continue;
@@ -376,19 +367,20 @@ rpc_clnt_reconnect(void *conn_ptr)
struct timespec ts = {0, 0};
struct rpc_clnt *clnt = NULL;
gf_boolean_t need_unref = _gf_false;
+ gf_boolean_t canceled_unref = _gf_false;
conn = conn_ptr;
clnt = conn->rpc_clnt;
-
pthread_mutex_lock(&conn->lock);
{
trans = conn->trans;
- if (!trans) {
- pthread_mutex_unlock(&conn->lock);
- return;
+ if (!trans)
+ goto out_unlock;
+
+ if (conn->reconnect) {
+ if (!gf_timer_call_cancel(clnt->ctx, conn->reconnect))
+ canceled_unref = _gf_true;
}
- if (conn->reconnect)
- gf_timer_call_cancel(clnt->ctx, conn->reconnect);
conn->reconnect = 0;
if ((conn->connected == 0) && !clnt->disabled) {
@@ -409,11 +401,14 @@ rpc_clnt_reconnect(void *conn_ptr)
gf_log(conn->name, GF_LOG_TRACE, "breaking reconnect chain");
}
}
+out_unlock:
pthread_mutex_unlock(&conn->lock);
rpc_clnt_unref(clnt);
if (need_unref)
rpc_clnt_unref(clnt);
+ if (canceled_unref)
+ rpc_clnt_unref(clnt);
return;
}
@@ -495,6 +490,7 @@ rpc_clnt_connection_cleanup(rpc_clnt_connection_t *conn)
int unref = 0;
int ret = 0;
gf_boolean_t timer_unref = _gf_false;
+ gf_boolean_t reconnect_unref = _gf_false;
if (!conn) {
goto out;
@@ -514,6 +510,12 @@ rpc_clnt_connection_cleanup(rpc_clnt_connection_t *conn)
timer_unref = _gf_true;
conn->timer = NULL;
}
+ if (conn->reconnect) {
+ ret = gf_timer_call_cancel(clnt->ctx, conn->reconnect);
+ if (!ret)
+ reconnect_unref = _gf_true;
+ conn->reconnect = NULL;
+ }
conn->connected = 0;
conn->disconnected = 1;
@@ -533,6 +535,8 @@ rpc_clnt_connection_cleanup(rpc_clnt_connection_t *conn)
if (timer_unref)
rpc_clnt_unref(clnt);
+ if (reconnect_unref)
+ rpc_clnt_unref(clnt);
out:
return 0;
}
@@ -777,8 +781,7 @@ is_rpc_clnt_disconnected(rpc_clnt_connection_t *conn)
pthread_mutex_lock(&conn->lock);
{
- if (conn->disconnected == _gf_false)
- disconnected = _gf_false;
+ disconnected = conn->disconnected;
}
pthread_mutex_unlock(&conn->lock);
@@ -831,7 +834,7 @@ rpc_clnt_handle_disconnect(struct rpc_clnt *clnt, rpc_clnt_connection_t *conn)
pthread_mutex_lock(&conn->lock);
{
if (!conn->rpc_clnt->disabled && (conn->reconnect == NULL)) {
- ts.tv_sec = 10;
+ ts.tv_sec = 3;
ts.tv_nsec = 0;
rpc_clnt_ref(clnt);
@@ -916,7 +919,7 @@ rpc_clnt_notify(rpc_transport_t *trans, void *mydata,
}
case RPC_TRANSPORT_MSG_RECEIVED: {
- clock_gettime(CLOCK_REALTIME, &conn->last_received);
+ timespec_now_realtime(&conn->last_received);
pollin = data;
if (pollin->is_reply)
@@ -930,8 +933,7 @@ rpc_clnt_notify(rpc_transport_t *trans, void *mydata,
}
case RPC_TRANSPORT_MSG_SENT: {
- clock_gettime(CLOCK_REALTIME, &conn->last_sent);
-
+ timespec_now_realtime(&conn->last_sent);
ret = 0;
break;
}
@@ -948,6 +950,7 @@ rpc_clnt_notify(rpc_transport_t *trans, void *mydata,
conn->config.remote_port = 0;
conn->connected = 1;
conn->disconnected = 0;
+ pthread_cond_broadcast(&conn->cond);
}
pthread_mutex_unlock(&conn->lock);
@@ -970,6 +973,12 @@ rpc_clnt_notify(rpc_transport_t *trans, void *mydata,
*/
ret = 0;
break;
+
+ case RPC_TRANSPORT_EVENT_THREAD_DIED:
+ /* only meaningful on a server, no need of handling this event on a
+ * client */
+ ret = 0;
+ break;
}
out:
@@ -987,6 +996,7 @@ rpc_clnt_connection_init(struct rpc_clnt *clnt, glusterfs_ctx_t *ctx,
conn = &clnt->conn;
pthread_mutex_init(&clnt->conn.lock, NULL);
+ pthread_cond_init(&clnt->conn.cond, NULL);
conn->name = gf_strdup(name);
if (!conn->name) {
@@ -1111,8 +1121,6 @@ rpc_clnt_new(dict_t *options, xlator_t *owner, char *name,
mem_pool_destroy(rpc->saved_frames_pool);
GF_FREE(rpc);
rpc = NULL;
- if (options)
- dict_unref(options);
goto out;
}
@@ -1674,13 +1682,17 @@ rpc_clnt_submit(struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, int procnum,
pthread_mutex_lock(&conn->lock);
{
- if (conn->connected == 0 && !rpc->disabled) {
+ if (conn->connected == 0) {
+ if (rpc->disabled)
+ goto unlock;
ret = rpc_transport_connect(conn->trans, conn->config.remote_port);
if (ret < 0) {
- gf_log(conn->name, GF_LOG_WARNING,
+ gf_log(conn->name,
+ (errno == EINPROGRESS) ? GF_LOG_DEBUG : GF_LOG_WARNING,
"error returned while attempting to "
"connect to host:%s, port:%d",
conn->config.remote_host, conn->config.remote_port);
+ goto unlock;
}
}
@@ -1693,9 +1705,7 @@ rpc_clnt_submit(struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, int procnum,
"ProgVers: %d, Proc: %d) to rpc-transport (%s)",
cframe->root->unique, rpcreq->xid, rpcreq->prog->progname,
rpcreq->prog->progver, rpcreq->procnum, conn->name);
- }
-
- if ((ret >= 0) && frame) {
+ } else if ((ret >= 0) && frame) {
/* Save the frame in queue */
__save_frame(rpc, frame, rpcreq);
@@ -1717,6 +1727,7 @@ rpc_clnt_submit(struct rpc_clnt *rpc, rpc_clnt_prog_t *prog, int procnum,
rpcreq->prog->progver, rpcreq->procnum, conn->name);
}
}
+unlock:
pthread_mutex_unlock(&conn->lock);
if (need_unref)
@@ -1771,7 +1782,7 @@ rpc_clnt_trigger_destroy(struct rpc_clnt *rpc)
* ref*/
conn = &rpc->conn;
trans = conn->trans;
- rpc_clnt_disconnect(rpc);
+ rpc_clnt_disable(rpc);
/* This is to account for rpc_clnt_disable that might have been called
* before rpc_clnt_unref */
@@ -1811,6 +1822,7 @@ rpc_clnt_destroy(struct rpc_clnt *rpc)
saved_frames_destroy(saved_frames);
pthread_mutex_destroy(&rpc->lock);
pthread_mutex_destroy(&rpc->conn.lock);
+ pthread_cond_destroy(&rpc->conn.cond);
/* mem-pool should be destroyed, otherwise,
it will cause huge memory leaks */
@@ -1843,29 +1855,7 @@ rpc_clnt_unref(struct rpc_clnt *rpc)
return rpc;
}
-char
-rpc_clnt_is_disabled(struct rpc_clnt *rpc)
-{
- rpc_clnt_connection_t *conn = NULL;
- char disabled = 0;
-
- if (!rpc) {
- goto out;
- }
-
- conn = &rpc->conn;
-
- pthread_mutex_lock(&conn->lock);
- {
- disabled = rpc->disabled;
- }
- pthread_mutex_unlock(&conn->lock);
-
-out:
- return disabled;
-}
-
-void
+int
rpc_clnt_disable(struct rpc_clnt *rpc)
{
rpc_clnt_connection_t *conn = NULL;
@@ -1909,81 +1899,9 @@ rpc_clnt_disable(struct rpc_clnt *rpc)
}
pthread_mutex_unlock(&conn->lock);
+ ret = -1;
if (trans) {
- rpc_transport_disconnect(trans, _gf_true);
- /* The auth_value was being reset to AUTH_GLUSTERFS_v2.
- * if (clnt->auth_value)
- * clnt->auth_value = AUTH_GLUSTERFS_v2;
- * It should not be reset here. The disconnect during
- * portmap request can race with handshake. If handshake
- * happens first and disconnect later, auth_value would set
- * to default value and it never sets back to actual auth_value
- * supported by server. But it's important to set to lower
- * version supported in the case where the server downgrades.
- * So moving this code to RPC_TRANSPORT_CONNECT. Note that
- * CONNECT cannot race with handshake as by nature it is
- * serialized with handhake. An handshake can happen only
- * on a connected transport and hence its strictly serialized.
- */
- }
-
- if (unref)
- rpc_clnt_unref(rpc);
-
- if (timer_unref)
- rpc_clnt_unref(rpc);
-
- if (reconnect_unref)
- rpc_clnt_unref(rpc);
-
-out:
- return;
-}
-
-void
-rpc_clnt_disconnect(struct rpc_clnt *rpc)
-{
- rpc_clnt_connection_t *conn = NULL;
- rpc_transport_t *trans = NULL;
- int unref = 0;
- int ret = 0;
- gf_boolean_t timer_unref = _gf_false;
- gf_boolean_t reconnect_unref = _gf_false;
-
- if (!rpc)
- goto out;
-
- conn = &rpc->conn;
-
- pthread_mutex_lock(&conn->lock);
- {
- rpc->disabled = 1;
- if (conn->timer) {
- ret = gf_timer_call_cancel(rpc->ctx, conn->timer);
- /* If the event is not fired and it actually cancelled
- * the timer, do the unref else registered call back
- * function will take care of unref.
- */
- if (!ret)
- timer_unref = _gf_true;
- conn->timer = NULL;
- }
-
- if (conn->reconnect) {
- ret = gf_timer_call_cancel(rpc->ctx, conn->reconnect);
- if (!ret)
- reconnect_unref = _gf_true;
- conn->reconnect = NULL;
- }
- conn->connected = 0;
-
- unref = rpc_clnt_remove_ping_timer_locked(rpc);
- trans = conn->trans;
- }
- pthread_mutex_unlock(&conn->lock);
-
- if (trans) {
- rpc_transport_disconnect(trans, _gf_true);
+ ret = rpc_transport_disconnect(trans, _gf_true);
/* The auth_value was being reset to AUTH_GLUSTERFS_v2.
* if (clnt->auth_value)
* clnt->auth_value = AUTH_GLUSTERFS_v2;
@@ -2009,7 +1927,7 @@ rpc_clnt_disconnect(struct rpc_clnt *rpc)
rpc_clnt_unref(rpc);
out:
- return;
+ return ret;
}
void
diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h
index 28b6f0e7f4d..2945265200b 100644
--- a/rpc/rpc-lib/src/rpc-clnt.h
+++ b/rpc/rpc-lib/src/rpc-clnt.h
@@ -11,9 +11,9 @@
#ifndef __RPC_CLNT_H
#define __RPC_CLNT_H
-#include "stack.h"
+#include <glusterfs/stack.h>
#include "rpc-transport.h"
-#include "timer.h"
+#include <glusterfs/timer.h>
#include "xdr-common.h"
#include "glusterfs3.h"
@@ -52,8 +52,8 @@ struct saved_frame {
};
void *capital_this;
void *frame;
- struct timeval saved_at;
struct rpc_req *rpcreq;
+ struct timeval saved_at;
rpc_transport_rsp_t rsp;
};
@@ -85,8 +85,8 @@ typedef int (*rpcclnt_cb_fn)(struct rpc_clnt *rpc, void *mydata, void *data);
*/
typedef struct rpcclnt_actor_desc {
char procname[32];
- int procnum;
rpcclnt_cb_fn actor;
+ int procnum;
} rpcclnt_cb_actor_t;
/* Describes a program and its version along with the function pointers
@@ -98,8 +98,6 @@ typedef struct rpcclnt_cb_program {
int prognum;
int progver;
rpcclnt_cb_actor_t *actors; /* All procedure handlers */
- int numactors; /* Num actors in actor array */
-
/* Program specific state handed to actors */
void *private;
@@ -108,6 +106,8 @@ typedef struct rpcclnt_cb_program {
/* Needed for passing back in cb_actor */
void *mydata;
+ int numactors; /* Num actors in actor array */
+
} rpcclnt_cb_program_t;
typedef struct rpc_auth_data {
@@ -127,42 +127,43 @@ struct rpc_clnt_config {
struct rpc_clnt_connection {
pthread_mutex_t lock;
+ pthread_cond_t cond;
rpc_transport_t *trans;
struct rpc_clnt_config config;
gf_timer_t *reconnect;
gf_timer_t *timer;
gf_timer_t *ping_timer;
struct rpc_clnt *rpc_clnt;
- char connected;
- gf_boolean_t disconnected;
struct saved_frames *saved_frames;
- int32_t frame_timeout;
struct timespec last_sent;
struct timespec last_received;
- int32_t ping_started;
- char *name;
- int32_t ping_timeout;
uint64_t pingcnt;
uint64_t msgcnt;
uint64_t cleanup_gen;
+ char *name;
+ int32_t ping_started;
+ int32_t frame_timeout;
+ int32_t ping_timeout;
+ gf_boolean_t disconnected;
+ char connected;
};
typedef struct rpc_clnt_connection rpc_clnt_connection_t;
struct rpc_req {
rpc_clnt_connection_t *conn;
- uint32_t xid;
struct iovec req[2];
- int reqcnt;
struct iobref *req_iobref;
struct iovec rsp[2];
int rspcnt;
+ int reqcnt;
struct iobref *rsp_iobref;
- int rpc_status;
- rpc_auth_data_t verf;
rpc_clnt_prog_t *prog;
- int procnum;
+ rpc_auth_data_t verf;
fop_cbk_fn_t cbkfn;
void *conn_private;
+ int procnum;
+ int rpc_status;
+ uint32_t xid;
};
typedef struct rpc_clnt {
@@ -182,9 +183,9 @@ typedef struct rpc_clnt {
glusterfs_ctx_t *ctx;
gf_atomic_t refcount;
+ xlator_t *owner;
int auth_value;
char disabled;
- xlator_t *owner;
} rpc_clnt_t;
struct rpc_clnt *
@@ -250,15 +251,9 @@ int
rpcclnt_cbk_program_register(struct rpc_clnt *svc,
rpcclnt_cb_program_t *program, void *mydata);
-void
+int
rpc_clnt_disable(struct rpc_clnt *rpc);
-void
-rpc_clnt_disconnect(struct rpc_clnt *rpc);
-
-char
-rpc_clnt_is_disabled(struct rpc_clnt *rpc);
-
int
rpc_clnt_mgmt_pmap_signout(glusterfs_ctx_t *ctx, char *brick_name);
diff --git a/rpc/rpc-lib/src/rpc-drc.c b/rpc/rpc-lib/src/rpc-drc.c
index 50013776c86..de8dc630626 100644
--- a/rpc/rpc-lib/src/rpc-drc.c
+++ b/rpc/rpc-lib/src/rpc-drc.c
@@ -12,11 +12,9 @@
#ifndef RPC_DRC_H
#include "rpc-drc.h"
#endif
-#include "locking.h"
-#include "hashfn.h"
-#include "common-utils.h"
-#include "statedump.h"
-#include "mem-pool.h"
+#include <glusterfs/locking.h>
+#include <glusterfs/statedump.h>
+#include <glusterfs/mem-pool.h>
#include <netinet/in.h>
#include <unistd.h>
@@ -192,7 +190,7 @@ rpcsvc_get_drc_client(rpcsvc_drc_globals_t *drc,
if (!client)
goto out;
- client->ref = 0;
+ GF_ATOMIC_INIT(client->ref, 0);
client->sock_union = (union gf_sock_union) * sockaddr;
client->op_count = 0;
INIT_LIST_HEAD(&client->client_list);
@@ -248,7 +246,7 @@ static drc_client_t *
rpcsvc_drc_client_ref(drc_client_t *client)
{
GF_ASSERT(client);
- client->ref++;
+ GF_ATOMIC_INC(client->ref);
return client;
}
@@ -263,11 +261,12 @@ rpcsvc_drc_client_ref(drc_client_t *client)
static drc_client_t *
rpcsvc_drc_client_unref(rpcsvc_drc_globals_t *drc, drc_client_t *client)
{
+ uint32_t refcount;
+
GF_ASSERT(drc);
- GF_ASSERT(client->ref);
- client->ref--;
- if (!client->ref) {
+ refcount = GF_ATOMIC_DEC(client->ref);
+ if (!refcount) {
drc->client_count--;
rpcsvc_remove_drc_client(client);
client = NULL;
@@ -591,7 +590,7 @@ rpcsvc_drc_priv(rpcsvc_drc_globals_t *drc)
}
gf_proc_dump_build_key(key, "client", "%d.ref_count", i);
- gf_proc_dump_write(key, "%d", client->ref);
+ gf_proc_dump_write(key, "%" PRIu32, GF_ATOMIC_GET(client->ref));
gf_proc_dump_build_key(key, "client", "%d.op_count", i);
gf_proc_dump_write(key, "%d", client->op_count);
i++;
@@ -699,47 +698,42 @@ rpcsvc_drc_init(rpcsvc_t *svc, dict_t *options)
LOCK_INIT(&drc->lock);
svc->drc = drc;
- LOCK(&drc->lock);
-
/* Specify type of DRC to be used */
ret = dict_get_uint32(options, "nfs.drc-type", &drc_type);
if (ret) {
gf_log(GF_RPCSVC, GF_LOG_DEBUG,
- "drc type not set."
- " Continuing with default");
+ "drc type not set. Continuing with default");
drc_type = DRC_DEFAULT_TYPE;
}
- drc->type = drc_type;
-
/* Set the global cache size (no. of ops to cache) */
ret = dict_get_uint32(options, "nfs.drc-size", &drc_size);
if (ret) {
gf_log(GF_RPCSVC, GF_LOG_DEBUG,
- "drc size not set."
- " Continuing with default size");
+ "drc size not set. Continuing with default size");
drc_size = DRC_DEFAULT_CACHE_SIZE;
}
+ LOCK(&drc->lock);
+
+ drc->type = drc_type;
drc->global_cache_size = drc_size;
/* Mempool for cached ops */
drc->mempool = mem_pool_new(drc_cached_op_t, drc->global_cache_size);
if (!drc->mempool) {
+ UNLOCK(&drc->lock);
gf_log(GF_RPCSVC, GF_LOG_ERROR,
- "Failed to get mempool for"
- " DRC, drc-size: %d",
- drc->global_cache_size);
+ "Failed to get mempool for DRC, drc-size: %d", drc_size);
ret = -1;
- goto out;
+ goto post_unlock;
}
/* What percent of cache to be evicted whenever it fills up */
ret = dict_get_uint32(options, "nfs.drc-lru-factor", &drc_factor);
if (ret) {
gf_log(GF_RPCSVC, GF_LOG_DEBUG,
- "drc lru factor not set."
- " Continuing with policy default");
+ "drc lru factor not set. Continuing with policy default");
drc_factor = DRC_DEFAULT_LRU_FACTOR;
}
@@ -750,15 +744,16 @@ rpcsvc_drc_init(rpcsvc_t *svc, dict_t *options)
ret = rpcsvc_register_notify(svc, rpcsvc_drc_notify, THIS);
if (ret) {
+ UNLOCK(&drc->lock);
gf_log(GF_RPCSVC, GF_LOG_ERROR,
"registration of drc_notify function failed");
- goto out;
+ goto post_unlock;
}
- gf_log(GF_RPCSVC, GF_LOG_DEBUG, "drc init successful");
drc->status = DRC_INITIATED;
-out:
UNLOCK(&drc->lock);
+ gf_log(GF_RPCSVC, GF_LOG_DEBUG, "drc init successful");
+post_unlock:
if (ret == -1) {
if (drc->mempool) {
mem_pool_destroy(drc->mempool);
diff --git a/rpc/rpc-lib/src/rpc-drc.h b/rpc/rpc-lib/src/rpc-drc.h
index 1fd1e5e24ef..ce66430809b 100644
--- a/rpc/rpc-lib/src/rpc-drc.h
+++ b/rpc/rpc-lib/src/rpc-drc.h
@@ -13,24 +13,23 @@
#include "rpcsvc-common.h"
#include "rpcsvc.h"
-#include "locking.h"
-#include "dict.h"
+#include <glusterfs/locking.h>
+#include <glusterfs/dict.h>
#include "rb.h"
/* per-client cache structure */
struct drc_client {
- uint32_t ref;
union gf_sock_union sock_union;
/* pointers to the cache */
struct rb_table *rbtree;
/* no. of ops currently cached */
uint32_t op_count;
+ gf_atomic_uint32_t ref;
struct list_head client_list;
};
struct drc_cached_op {
drc_op_state_t state;
- uint32_t xid;
int prognum;
int progversion;
int procnum;
@@ -39,6 +38,7 @@ struct drc_cached_op {
struct list_head client_list;
struct list_head global_list;
int32_t ref;
+ uint32_t xid;
};
/* global drc definitions */
@@ -50,19 +50,19 @@ struct drc_globals {
* it is used so in gf_libavl_allocator
*/
struct libavl_allocator allocator;
- drc_type_t type;
/* configurable size parameter */
- uint32_t global_cache_size;
- drc_lru_factor_t lru_factor;
gf_lock_t lock;
- drc_status_t status;
- uint32_t op_count;
uint64_t cache_hits;
uint64_t intransit_hits;
struct mem_pool *mempool;
struct list_head cache_head;
- uint32_t client_count;
struct list_head clients_head;
+ uint32_t op_count;
+ uint32_t client_count;
+ uint32_t global_cache_size;
+ drc_type_t type;
+ drc_lru_factor_t lru_factor;
+ drc_status_t status;
};
int
diff --git a/rpc/rpc-lib/src/rpc-lib-messages.h b/rpc/rpc-lib/src/rpc-lib-messages.h
index 9251578a121..2c0b820dbf9 100644
--- a/rpc/rpc-lib/src/rpc-lib-messages.h
+++ b/rpc/rpc-lib/src/rpc-lib-messages.h
@@ -11,7 +11,7 @@
#ifndef _RPC_LIB_MESSAGES_H_
#define _RPC_LIB_MESSAGES_H_
-#include "glfs-message-id.h"
+#include <glusterfs/glfs-message-id.h>
/* To add new message IDs, append new identifiers at the end of the list.
*
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c
index d70334476c7..a6e201a9b36 100644
--- a/rpc/rpc-lib/src/rpc-transport.c
+++ b/rpc/rpc-lib/src/rpc-transport.c
@@ -12,17 +12,9 @@
#include <stdlib.h>
#include <stdio.h>
#include <sys/poll.h>
-#include <fnmatch.h>
#include <stdint.h>
-#include "logging.h"
#include "rpc-transport.h"
-#include "glusterfs.h"
-/* FIXME: xlator.h is needed for volume_option_t, need to define the datatype
- * in some other header
- */
-#include "xlator.h"
-#include "list.h"
#ifndef GF_OPTION_LIST_EMPTY
#define GF_OPTION_LIST_EMPTY(_opt) (_opt->value[0] == NULL)
@@ -68,17 +60,6 @@ out:
}
int32_t
-rpc_transport_get_myname(rpc_transport_t *this, char *hostname, int hostlen)
-{
- int32_t ret = -1;
- GF_VALIDATE_OR_GOTO("rpc", this, out);
-
- ret = this->ops->get_myname(this, hostname, hostlen);
-out:
- return ret;
-}
-
-int32_t
rpc_transport_get_peername(rpc_transport_t *this, char *hostname, int hostlen)
{
int32_t ret = -1;
@@ -92,14 +73,10 @@ out:
int
rpc_transport_throttle(rpc_transport_t *this, gf_boolean_t onoff)
{
- int ret = 0;
-
if (!this->ops->throttle)
return -ENOSYS;
- ret = this->ops->throttle(this, onoff);
-
- return ret;
+ return this->ops->throttle(this, onoff);
}
int32_t
@@ -144,6 +121,8 @@ rpc_transport_pollin_alloc(rpc_transport_t *this, struct iovec *vector,
goto out;
}
+ msg->trans = this;
+
if (count > 1) {
msg->vectored = 1;
}
@@ -159,6 +138,31 @@ out:
return msg;
}
+void
+rpc_transport_cleanup(rpc_transport_t *trans)
+{
+ if (!trans)
+ return;
+
+ if (trans->fini)
+ trans->fini(trans);
+
+ if (trans->options) {
+ dict_unref(trans->options);
+ trans->options = NULL;
+ }
+
+ GF_FREE(trans->name);
+
+ if (trans->xl)
+ pthread_mutex_destroy(&trans->lock);
+
+ if (trans->dl_handle)
+ dlclose(trans->dl_handle);
+
+ GF_FREE(trans);
+}
+
rpc_transport_t *
rpc_transport_load(glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
{
@@ -166,7 +170,7 @@ rpc_transport_load(glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
char *name = NULL;
void *handle = NULL;
char *type = NULL;
- char str[] = "ERROR";
+ static char str[] = "ERROR";
int32_t ret = -1;
int is_tcp = 0, is_unix = 0, is_ibsdp = 0;
volume_opt_list_t *vol_opt = NULL;
@@ -191,9 +195,9 @@ rpc_transport_load(glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
type = str;
/* Backward compatibility */
- ret = dict_get_str(options, "transport-type", &type);
+ ret = dict_get_str_sizen(options, "transport-type", &type);
if (ret < 0) {
- ret = dict_set_str(options, "transport-type", "socket");
+ ret = dict_set_str_sizen(options, "transport-type", "socket");
if (ret < 0)
gf_log("dict", GF_LOG_DEBUG, "setting transport-type failed");
else
@@ -215,15 +219,16 @@ rpc_transport_load(glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
is_ibsdp = strcmp(type, "ib-sdp");
if ((is_tcp == 0) || (is_unix == 0) || (is_ibsdp == 0)) {
if (is_unix == 0)
- ret = dict_set_str(options, "transport.address-family", "unix");
+ ret = dict_set_str_sizen(options, "transport.address-family",
+ "unix");
if (is_ibsdp == 0)
- ret = dict_set_str(options, "transport.address-family",
- "inet-sdp");
+ ret = dict_set_str_sizen(options, "transport.address-family",
+ "inet-sdp");
if (ret < 0)
gf_log("dict", GF_LOG_DEBUG, "setting address-family failed");
- ret = dict_set_str(options, "transport-type", "socket");
+ ret = dict_set_str_sizen(options, "transport-type", "socket");
if (ret < 0)
gf_log("dict", GF_LOG_DEBUG, "setting transport-type failed");
}
@@ -232,9 +237,9 @@ rpc_transport_load(glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
/* client-bind-insecure is for clients protocol, and
* bind-insecure for glusterd. Both mutually exclusive
*/
- ret = dict_get_str(options, "client-bind-insecure", &type);
+ ret = dict_get_str_sizen(options, "client-bind-insecure", &type);
if (ret)
- ret = dict_get_str(options, "bind-insecure", &type);
+ ret = dict_get_str_sizen(options, "bind-insecure", &type);
if (ret == 0) {
ret = gf_string2boolean(type, &bind_insecure);
if (ret < 0) {
@@ -253,7 +258,7 @@ rpc_transport_load(glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
trans->bind_insecure = 1;
}
- ret = dict_get_str(options, "transport-type", &type);
+ ret = dict_get_str_sizen(options, "transport-type", &type);
if (ret < 0) {
gf_log("rpc-transport", GF_LOG_ERROR,
"'option transport-type <xx>' missing in volume '%s'",
@@ -266,6 +271,10 @@ rpc_transport_load(glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
goto fail;
}
+ if (dict_get_sizen(options, "notify-poller-death")) {
+ trans->notify_poller_death = 1;
+ }
+
gf_log("rpc-transport", GF_LOG_DEBUG, "attempt to load file %s", name);
handle = dlopen(name, RTLD_NOW);
@@ -328,7 +337,7 @@ rpc_transport_load(glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
}
}
- trans->options = options;
+ trans->options = dict_ref(options);
pthread_mutex_init(&trans->lock, NULL);
trans->xl = this;
@@ -341,6 +350,7 @@ rpc_transport_load(glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
}
INIT_LIST_HEAD(&trans->list);
+ GF_ATOMIC_INIT(trans->disconnect_progress, 0);
return_trans = trans;
@@ -350,15 +360,7 @@ rpc_transport_load(glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
fail:
if (!success) {
- if (trans) {
- GF_FREE(trans->name);
-
- if (trans->dl_handle)
- dlclose(trans->dl_handle);
-
- GF_FREE(trans);
- }
-
+ rpc_transport_cleanup(trans);
GF_FREE(name);
return_trans = NULL;
@@ -437,13 +439,10 @@ fail:
return ret;
}
-int32_t
+static void
rpc_transport_destroy(rpc_transport_t *this)
{
struct dnscache6 *cache = NULL;
- int32_t ret = -1;
-
- GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
if (this->clnt_options)
dict_unref(this->clnt_options);
@@ -471,10 +470,6 @@ rpc_transport_destroy(rpc_transport_t *this)
}
GF_FREE(this);
-
- ret = 0;
-fail:
- return ret;
}
rpc_transport_t *
@@ -557,16 +552,17 @@ rpc_transport_keepalive_options_set(dict_t *options, int32_t interval,
GF_ASSERT(options);
GF_ASSERT((interval > 0) || (time > 0));
- ret = dict_set_int32(options, "transport.socket.keepalive-interval",
- interval);
+ ret = dict_set_int32_sizen(options, "transport.socket.keepalive-interval",
+ interval);
if (ret)
goto out;
- ret = dict_set_int32(options, "transport.socket.keepalive-time", time);
+ ret = dict_set_int32_sizen(options, "transport.socket.keepalive-time",
+ time);
if (ret)
goto out;
- ret = dict_set_int32(options, "transport.tcp-user-timeout", timeout);
+ ret = dict_set_int32_sizen(options, "transport.tcp-user-timeout", timeout);
if (ret)
goto out;
out:
@@ -574,19 +570,14 @@ out:
}
int
-rpc_transport_unix_options_build(dict_t **options, char *filepath,
+rpc_transport_unix_options_build(dict_t *dict, char *filepath,
int frame_timeout)
{
- dict_t *dict = NULL;
char *fpath = NULL;
int ret = -1;
GF_ASSERT(filepath);
- GF_ASSERT(options);
-
- dict = dict_new();
- if (!dict)
- goto out;
+ GF_VALIDATE_OR_GOTO("rpc-transport", dict, out);
fpath = gf_strdup(filepath);
if (!fpath) {
@@ -594,62 +585,52 @@ rpc_transport_unix_options_build(dict_t **options, char *filepath,
goto out;
}
- ret = dict_set_dynstr(dict, "transport.socket.connect-path", fpath);
+ ret = dict_set_dynstr_sizen(dict, "transport.socket.connect-path", fpath);
if (ret) {
GF_FREE(fpath);
goto out;
}
- ret = dict_set_str(dict, "transport.address-family", "unix");
+ ret = dict_set_str_sizen(dict, "transport.address-family", "unix");
if (ret)
goto out;
- ret = dict_set_str(dict, "transport.socket.nodelay", "off");
+ ret = dict_set_str_sizen(dict, "transport.socket.nodelay", "off");
if (ret)
goto out;
- ret = dict_set_str(dict, "transport-type", "socket");
+ ret = dict_set_str_sizen(dict, "transport-type", "socket");
if (ret)
goto out;
- ret = dict_set_str(dict, "transport.socket.keepalive", "off");
+ ret = dict_set_str_sizen(dict, "transport.socket.keepalive", "off");
if (ret)
goto out;
if (frame_timeout > 0) {
- ret = dict_set_int32(dict, "frame-timeout", frame_timeout);
+ ret = dict_set_int32_sizen(dict, "frame-timeout", frame_timeout);
if (ret)
goto out;
}
-
- *options = dict;
out:
- if (ret && dict) {
- dict_unref(dict);
- }
return ret;
}
int
-rpc_transport_inet_options_build(dict_t **options, const char *hostname,
- int port)
+rpc_transport_inet_options_build(dict_t *dict, const char *hostname, int port,
+ char *af)
{
- dict_t *dict = NULL;
char *host = NULL;
int ret = -1;
#ifdef IPV6_DEFAULT
- char *addr_family = "inet6";
+ static char *addr_family = "inet6";
#else
- char *addr_family = "inet";
+ static char *addr_family = "inet";
#endif
- GF_ASSERT(options);
GF_ASSERT(hostname);
GF_ASSERT(port >= 1024);
-
- dict = dict_new();
- if (!dict)
- goto out;
+ GF_VALIDATE_OR_GOTO("rpc-transport", dict, out);
host = gf_strdup((char *)hostname);
if (!host) {
@@ -657,7 +638,7 @@ rpc_transport_inet_options_build(dict_t **options, const char *hostname,
goto out;
}
- ret = dict_set_dynstr(dict, "remote-host", host);
+ ret = dict_set_dynstr_sizen(dict, "remote-host", host);
if (ret) {
gf_log(THIS->name, GF_LOG_WARNING, "failed to set remote-host with %s",
host);
@@ -665,32 +646,27 @@ rpc_transport_inet_options_build(dict_t **options, const char *hostname,
goto out;
}
- ret = dict_set_int32(dict, "remote-port", port);
+ ret = dict_set_int32_sizen(dict, "remote-port", port);
if (ret) {
gf_log(THIS->name, GF_LOG_WARNING, "failed to set remote-port with %d",
port);
goto out;
}
- ret = dict_set_str(dict, "address-family", addr_family);
+ ret = dict_set_str_sizen(dict, "address-family",
+ (af != NULL ? af : addr_family));
if (ret) {
gf_log(THIS->name, GF_LOG_WARNING, "failed to set address-family to %s",
addr_family);
goto out;
}
- ret = dict_set_str(dict, "transport-type", "socket");
+ ret = dict_set_str_sizen(dict, "transport-type", "socket");
if (ret) {
gf_log(THIS->name, GF_LOG_WARNING,
"failed to set trans-type with socket");
goto out;
}
-
- *options = dict;
out:
- if (ret && dict) {
- dict_unref(dict);
- }
-
return ret;
}
diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h
index c238501b5c7..c499f0bb955 100644
--- a/rpc/rpc-lib/src/rpc-transport.h
+++ b/rpc/rpc-lib/src/rpc-transport.h
@@ -56,17 +56,18 @@
struct rpc_transport_ops;
typedef struct rpc_transport rpc_transport_t;
-#include "dict.h"
-#include "compat.h"
+#include <glusterfs/dict.h>
+#include <glusterfs/compat.h>
+#include <glusterfs/async.h>
#include "rpcsvc-common.h"
struct peer_info {
- struct sockaddr_storage sockaddr;
- socklen_t sockaddr_len;
- char identifier[UNIX_PATH_MAX];
// OP-VERSION of clients
uint32_t max_op_version;
uint32_t min_op_version;
+ struct sockaddr_storage sockaddr;
+ socklen_t sockaddr_len;
+ char identifier[UNIX_PATH_MAX];
// Volume mounted by client
char volname[NAME_MAX];
};
@@ -97,47 +98,48 @@ typedef enum {
RPC_TRANSPORT_MSG_RECEIVED, /* Complete rpc msg has been read */
RPC_TRANSPORT_CONNECT, /* client is connected to server */
RPC_TRANSPORT_MSG_SENT,
+ RPC_TRANSPORT_EVENT_THREAD_DIED /* event-thread has died */
} rpc_transport_event_t;
struct rpc_transport_msg {
struct iovec *rpchdr;
- int rpchdrcount;
struct iovec *proghdr;
+ int rpchdrcount;
int proghdrcount;
struct iovec *progpayload;
- int progpayloadcount;
struct iobref *iobref;
+ int progpayloadcount;
};
typedef struct rpc_transport_msg rpc_transport_msg_t;
struct rpc_transport_rsp {
struct iovec *rsphdr;
- int rsphdr_count;
struct iovec *rsp_payload;
+ int rsphdr_count;
int rsp_payload_count;
struct iobref *rsp_iobref;
};
typedef struct rpc_transport_rsp rpc_transport_rsp_t;
struct rpc_transport_req {
+ struct rpc_req *rpc_req;
rpc_transport_msg_t msg;
rpc_transport_rsp_t rsp;
- struct rpc_req *rpc_req;
};
typedef struct rpc_transport_req rpc_transport_req_t;
struct rpc_transport_reply {
- rpc_transport_msg_t msg;
void *private;
+ rpc_transport_msg_t msg;
};
typedef struct rpc_transport_reply rpc_transport_reply_t;
struct rpc_transport_data {
- char is_request;
union {
rpc_transport_req_t req;
rpc_transport_reply_t reply;
} data;
+ char is_request;
};
typedef struct rpc_transport_data rpc_transport_data_t;
@@ -145,25 +147,15 @@ typedef struct rpc_transport_data rpc_transport_data_t;
* rpc_request, hence these should be removed from request_info
*/
struct rpc_request_info {
- uint32_t xid;
int prognum;
int progver;
- int procnum;
void *rpc_req; /* struct rpc_req */
rpc_transport_rsp_t rsp;
+ int procnum;
+ uint32_t xid;
};
typedef struct rpc_request_info rpc_request_info_t;
-struct rpc_transport_pollin {
- struct iovec vector[MAX_IOVEC];
- int count;
- char vectored;
- void *private;
- struct iobref *iobref;
- char is_reply;
-};
-typedef struct rpc_transport_pollin rpc_transport_pollin_t;
-
typedef int (*rpc_transport_notify_t)(rpc_transport_t *, void *mydata,
rpc_transport_event_t, void *data, ...);
@@ -181,9 +173,6 @@ struct rpc_transport {
void *mydata;
pthread_mutex_t lock;
gf_atomic_t refcount;
-
- int32_t outstanding_rpc_count;
-
glusterfs_ctx_t *ctx;
dict_t *options;
char *name;
@@ -201,20 +190,36 @@ struct rpc_transport {
uint64_t total_bytes_read;
uint64_t total_bytes_write;
uint32_t xid; /* RPC/XID used for callbacks */
+ int32_t outstanding_rpc_count;
struct list_head list;
- int bind_insecure;
void *dl_handle; /* handle of dlopen() */
char *ssl_name;
dict_t *clnt_options; /* store options received from
* client */
+ gf_atomic_t disconnect_progress;
+ int bind_insecure;
/* connect_failed: saves the connect() syscall status as socket_t
* member holding connect() status can't be accessed by higher gfapi
* layer or in client management notification handler functions
*/
gf_boolean_t connect_failed;
+ char notify_poller_death;
+ char poller_death_accept;
};
+struct rpc_transport_pollin {
+ struct rpc_transport *trans;
+ void *private;
+ struct iobref *iobref;
+ struct iovec vector[MAX_IOVEC];
+ gf_async_t async;
+ int count;
+ char is_reply;
+ char vectored;
+};
+typedef struct rpc_transport_pollin rpc_transport_pollin_t;
+
struct rpc_transport_ops {
/* no need of receive op, msg will be delivered through an event
* notification
@@ -247,9 +252,6 @@ int32_t
rpc_transport_disconnect(rpc_transport_t *this, gf_boolean_t wait);
int32_t
-rpc_transport_destroy(rpc_transport_t *this);
-
-int32_t
rpc_transport_notify(rpc_transport_t *this, rpc_transport_event_t event,
void *data, ...);
@@ -280,9 +282,6 @@ rpc_transport_get_peeraddr(rpc_transport_t *this, char *peeraddr, int addrlen,
struct sockaddr_storage *sa, size_t salen);
int32_t
-rpc_transport_get_myname(rpc_transport_t *this, char *hostname, int hostlen);
-
-int32_t
rpc_transport_get_myaddr(rpc_transport_t *this, char *peeraddr, int addrlen,
struct sockaddr_storage *sa, size_t salen);
@@ -301,10 +300,13 @@ rpc_transport_keepalive_options_set(dict_t *options, int32_t interval,
int32_t time, int32_t timeout);
int
-rpc_transport_unix_options_build(dict_t **options, char *filepath,
+rpc_transport_unix_options_build(dict_t *options, char *filepath,
int frame_timeout);
int
-rpc_transport_inet_options_build(dict_t **options, const char *hostname,
- int port);
+rpc_transport_inet_options_build(dict_t *options, const char *hostname,
+ int port, char *af);
+
+void
+rpc_transport_cleanup(rpc_transport_t *);
#endif /* __RPC_TRANSPORT_H__ */
diff --git a/rpc/rpc-lib/src/rpcsvc-auth.c b/rpc/rpc-lib/src/rpcsvc-auth.c
index da260ade0c0..8e76b4188bb 100644
--- a/rpc/rpc-lib/src/rpcsvc-auth.c
+++ b/rpc/rpc-lib/src/rpcsvc-auth.c
@@ -9,8 +9,7 @@
*/
#include "rpcsvc.h"
-#include "logging.h"
-#include "dict.h"
+#include <glusterfs/dict.h>
extern rpcsvc_auth_t *
rpcsvc_auth_null_init(rpcsvc_t *svc, dict_t *options);
@@ -274,6 +273,44 @@ rpcsvc_set_root_squash(rpcsvc_t *svc, dict_t *options)
}
int
+rpcsvc_set_all_squash(rpcsvc_t *svc, dict_t *options)
+{
+ int ret = -1;
+
+ uid_t anonuid = -1;
+ gid_t anongid = -1;
+
+ GF_ASSERT(svc);
+ GF_ASSERT(options);
+
+ ret = dict_get_str_boolean(options, "all-squash", 0);
+ if (ret != -1)
+ svc->all_squash = ret;
+ else
+ svc->all_squash = _gf_false;
+
+ ret = dict_get_uint32(options, "anonuid", &anonuid);
+ if (!ret)
+ svc->anonuid = anonuid;
+ else
+ svc->anonuid = RPC_NOBODY_UID;
+
+ ret = dict_get_uint32(options, "anongid", &anongid);
+ if (!ret)
+ svc->anongid = anongid;
+ else
+ svc->anongid = RPC_NOBODY_GID;
+
+ if (svc->all_squash)
+ gf_log(GF_RPCSVC, GF_LOG_DEBUG,
+ "all squashing enabled "
+ "(uid=%d, gid=%d)",
+ svc->anonuid, svc->anongid);
+
+ return 0;
+}
+
+int
rpcsvc_auth_init(rpcsvc_t *svc, dict_t *options)
{
int ret = -1;
@@ -283,6 +320,7 @@ rpcsvc_auth_init(rpcsvc_t *svc, dict_t *options)
(void)rpcsvc_set_allow_insecure(svc, options);
(void)rpcsvc_set_root_squash(svc, options);
+ (void)rpcsvc_set_all_squash(svc, options);
(void)rpcsvc_set_addr_namelookup(svc, options);
ret = rpcsvc_auth_add_initers(svc);
if (ret == -1) {
@@ -316,6 +354,10 @@ rpcsvc_auth_reconf(rpcsvc_t *svc, dict_t *options)
if (ret)
return (-1);
+ ret = rpcsvc_set_all_squash(svc, options);
+ if (ret)
+ return (-1);
+
return rpcsvc_set_addr_namelookup(svc, options);
}
diff --git a/rpc/rpc-lib/src/rpcsvc-common.h b/rpc/rpc-lib/src/rpcsvc-common.h
index 6a36b1f1b9b..6c4ec49a6ef 100644
--- a/rpc/rpc-lib/src/rpcsvc-common.h
+++ b/rpc/rpc-lib/src/rpcsvc-common.h
@@ -12,11 +12,8 @@
#define _RPCSVC_COMMON_H
#include <pthread.h>
-#include "list.h"
-#include "compat.h"
-#include "glusterfs.h"
-#include "dict.h"
-#include "xlator.h"
+#include <glusterfs/compat.h>
+#include <glusterfs/dict.h>
typedef enum {
RPCSVC_EVENT_ACCEPT,
@@ -42,18 +39,12 @@ typedef struct rpcsvc_state {
pthread_rwlock_t rpclock;
- unsigned int memfactor;
-
/* List of the authentication schemes available. */
struct list_head authschemes;
/* Reference to the options */
dict_t *options;
- /* Allow insecure ports. */
- gf_boolean_t allow_insecure;
- gf_boolean_t register_portmap;
- gf_boolean_t root_squash;
uid_t anonuid;
gid_t anongid;
glusterfs_ctx_t *ctx;
@@ -68,6 +59,8 @@ typedef struct rpcsvc_state {
struct list_head notify;
int notify_count;
+ unsigned int memfactor;
+
xlator_t *xl; /* xlator */
void *mydata;
rpcsvc_notify_t notifyfn;
@@ -79,6 +72,11 @@ typedef struct rpcsvc_state {
gf_boolean_t addr_namelookup;
/* determine whether throttling is needed, by default OFF */
gf_boolean_t throttle;
+ /* Allow insecure ports. */
+ gf_boolean_t allow_insecure;
+ gf_boolean_t register_portmap;
+ gf_boolean_t root_squash;
+ gf_boolean_t all_squash;
} rpcsvc_t;
/* DRC START */
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index c6545193a11..39910d481bf 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -10,19 +10,16 @@
#include "rpcsvc.h"
#include "rpc-transport.h"
-#include "dict.h"
-#include "logging.h"
-#include "byte-order.h"
-#include "common-utils.h"
-#include "compat-errno.h"
-#include "list.h"
+#include <glusterfs/dict.h>
+#include <glusterfs/byte-order.h>
+#include <glusterfs/compat-errno.h>
+#include <glusterfs/statedump.h>
#include "xdr-rpc.h"
-#include "iobuf.h"
-#include "globals.h"
+#include <glusterfs/iobuf.h>
#include "xdr-common.h"
#include "xdr-generic.h"
#include "rpc-common-xdr.h"
-#include "syncop.h"
+#include <glusterfs/syncop.h>
#include "rpc-drc.h"
#include "protocol-common.h"
@@ -36,15 +33,20 @@
#include <fnmatch.h>
#include <stdarg.h>
#include <stdio.h>
+#include <dlfcn.h>
#ifdef IPV6_DEFAULT
#include <netconfig.h>
#endif
#include "xdr-rpcclnt.h"
-#include "glusterfs-acl.h"
+#include <glusterfs/glusterfs-acl.h>
-struct rpcsvc_program gluster_dump_prog;
+#ifndef PTHREAD_MUTEX_ADAPTIVE_NP
+#define PTHREAD_MUTEX_ADAPTIVE_NP PTHREAD_MUTEX_DEFAULT
+#endif
+
+static struct rpcsvc_program gluster_dump_prog;
#define rpcsvc_alloc_request(svc, request) \
do { \
@@ -63,10 +65,41 @@ rpcsvc_get_listener(rpcsvc_t *svc, uint16_t port, rpc_transport_t *trans);
int
rpcsvc_notify(rpc_transport_t *trans, void *mydata, rpc_transport_event_t event,
void *data, ...);
+void *
+rpcsvc_request_handler(void *arg);
static int
rpcsvc_match_subnet_v4(const char *addrtok, const char *ipaddr);
+static void
+rpcsvc_toggle_queue_status(rpcsvc_program_t *prog,
+ rpcsvc_request_queue_t *queue,
+ unsigned long status[])
+{
+ unsigned queue_index = queue - prog->request_queue;
+
+ status[queue_index / __BITS_PER_LONG] ^= (1UL << (queue_index %
+ __BITS_PER_LONG));
+}
+
+int
+rpcsvc_get_free_queue_index(rpcsvc_program_t *prog)
+{
+ unsigned i, j = 0;
+
+ for (i = 0; i < EVENT_MAX_THREADS / __BITS_PER_LONG; i++)
+ if (prog->request_queue_status[i] != ULONG_MAX) {
+ j = __builtin_ctzl(~prog->request_queue_status[i]);
+ break;
+ }
+
+ if (i == EVENT_MAX_THREADS / __BITS_PER_LONG)
+ return -1;
+
+ prog->request_queue_status[i] |= (1UL << j);
+ return i * __BITS_PER_LONG + j;
+}
+
rpcsvc_notify_wrapper_t *
rpcsvc_notify_wrapper_alloc(void)
{
@@ -309,6 +342,10 @@ rpcsvc_program_actor(rpcsvc_request_t *req)
goto err;
}
+ if (svc->xl->ctx->measure_latency) {
+ timespec_now(&req->begin);
+ }
+
req->ownthread = program->ownthread;
req->synctask = program->synctask;
@@ -575,6 +612,73 @@ rpcsvc_check_and_reply_error(int ret, call_frame_t *frame, void *opaque)
return 0;
}
+void
+rpcsvc_queue_event_thread_death(rpcsvc_t *svc, rpcsvc_program_t *prog, int gen)
+{
+ rpcsvc_request_queue_t *queue = NULL;
+ int num = 0;
+ void *value = NULL;
+ rpcsvc_request_t *req = NULL;
+ char empty = 0;
+
+ value = pthread_getspecific(prog->req_queue_key);
+ if (value == NULL) {
+ return;
+ }
+
+ num = ((unsigned long)value) - 1;
+
+ queue = &prog->request_queue[num];
+
+ if (queue->gen == gen) {
+ /* duplicate event */
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "not queuing duplicate event thread death. "
+ "queue %d program %s",
+ num, prog->progname);
+ return;
+ }
+
+ rpcsvc_alloc_request(svc, req);
+ req->prognum = RPCSVC_INFRA_PROGRAM;
+ req->procnum = RPCSVC_PROC_EVENT_THREAD_DEATH;
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "queuing event thread death request to queue %d of program %s", num,
+ prog->progname);
+
+ pthread_mutex_lock(&queue->queue_lock);
+ {
+ empty = list_empty(&queue->request_queue);
+
+ list_add_tail(&req->request_list, &queue->request_queue);
+ queue->gen = gen;
+
+ if (empty && queue->waiting)
+ pthread_cond_signal(&queue->queue_cond);
+ }
+ pthread_mutex_unlock(&queue->queue_lock);
+
+ return;
+}
+
+int
+rpcsvc_handle_event_thread_death(rpcsvc_t *svc, rpc_transport_t *trans, int gen)
+{
+ rpcsvc_program_t *prog = NULL;
+
+ pthread_rwlock_rdlock(&svc->rpclock);
+ {
+ list_for_each_entry(prog, &svc->programs, program)
+ {
+ if (prog->ownthread)
+ rpcsvc_queue_event_thread_death(svc, prog, gen);
+ }
+ }
+ pthread_rwlock_unlock(&svc->rpclock);
+
+ return 0;
+}
+
int
rpcsvc_handle_rpc_call(rpcsvc_t *svc, rpc_transport_t *trans,
rpc_transport_pollin_t *msg)
@@ -585,9 +689,12 @@ rpcsvc_handle_rpc_call(rpcsvc_t *svc, rpc_transport_t *trans,
int ret = -1;
uint16_t port = 0;
gf_boolean_t is_unix = _gf_false, empty = _gf_false;
- gf_boolean_t unprivileged = _gf_false;
+ gf_boolean_t unprivileged = _gf_false, spawn_request_handler = 0;
drc_cached_op_t *reply = NULL;
rpcsvc_drc_globals_t *drc = NULL;
+ rpcsvc_request_queue_t *queue = NULL;
+ long num = 0;
+ void *value = NULL;
if (!trans || !svc)
return -1;
@@ -700,19 +807,81 @@ rpcsvc_handle_rpc_call(rpcsvc_t *svc, rpc_transport_t *trans,
ret = synctask_new(THIS->ctx->env, (synctask_fn_t)actor_fn,
rpcsvc_check_and_reply_error, NULL, req);
} else if (req->ownthread) {
- pthread_mutex_lock(&req->prog->queue_lock);
+ value = pthread_getspecific(req->prog->req_queue_key);
+ if (value == NULL) {
+ pthread_mutex_lock(&req->prog->thr_lock);
+ {
+ num = rpcsvc_get_free_queue_index(req->prog);
+ if (num != -1) {
+ num++;
+ value = (void *)num;
+ ret = pthread_setspecific(req->prog->req_queue_key,
+ value);
+ if (ret < 0) {
+ gf_log(GF_RPCSVC, GF_LOG_WARNING,
+ "setting request queue in TLS failed");
+ rpcsvc_toggle_queue_status(
+ req->prog, &req->prog->request_queue[num - 1],
+ req->prog->request_queue_status);
+ num = -1;
+ } else {
+ spawn_request_handler = 1;
+ }
+ }
+ }
+ pthread_mutex_unlock(&req->prog->thr_lock);
+ }
+
+ if (num == -1)
+ goto noqueue;
+
+ num = ((unsigned long)value) - 1;
+
+ queue = &req->prog->request_queue[num];
+
+ if (spawn_request_handler) {
+ ret = gf_thread_create(&queue->thread, NULL,
+ rpcsvc_request_handler, queue,
+ "rpcrqhnd");
+ if (!ret) {
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "spawned a request handler thread for queue %d",
+ (int)num);
+
+ req->prog->threadcount++;
+ } else {
+ gf_log(
+ GF_RPCSVC, GF_LOG_INFO,
+ "spawning a request handler thread for queue %d failed",
+ (int)num);
+ ret = pthread_setspecific(req->prog->req_queue_key, 0);
+ if (ret < 0) {
+ gf_log(GF_RPCSVC, GF_LOG_WARNING,
+ "resetting request queue in TLS failed");
+ }
+
+ rpcsvc_toggle_queue_status(
+ req->prog, &req->prog->request_queue[num - 1],
+ req->prog->request_queue_status);
+
+ goto noqueue;
+ }
+ }
+
+ pthread_mutex_lock(&queue->queue_lock);
{
- empty = list_empty(&req->prog->request_queue);
+ empty = list_empty(&queue->request_queue);
- list_add_tail(&req->request_list, &req->prog->request_queue);
+ list_add_tail(&req->request_list, &queue->request_queue);
- if (empty)
- pthread_cond_signal(&req->prog->queue_cond);
+ if (empty && queue->waiting)
+ pthread_cond_signal(&queue->queue_cond);
}
- pthread_mutex_unlock(&req->prog->queue_lock);
+ pthread_mutex_unlock(&queue->queue_lock);
ret = 0;
} else {
+ noqueue:
ret = actor_fn(req);
}
}
@@ -839,6 +1008,12 @@ rpcsvc_notify(rpc_transport_t *trans, void *mydata, rpc_transport_event_t event,
"got MAP_XID event, which should have not come");
ret = 0;
break;
+
+ case RPC_TRANSPORT_EVENT_THREAD_DIED:
+ rpcsvc_handle_event_thread_death(svc, trans,
+ (int)(unsigned long)data);
+ ret = 0;
+ break;
}
out:
@@ -1164,9 +1339,9 @@ rpcsvc_transport_submit(rpc_transport_t *trans, struct iovec *rpchdr,
int progpayloadcount, struct iobref *iobref, void *priv)
{
int ret = -1;
- rpc_transport_reply_t reply = {{
+ rpc_transport_reply_t reply = {
0,
- }};
+ };
if ((!trans) || (!rpchdr) || (!rpchdr->iov_base)) {
goto out;
@@ -1320,10 +1495,18 @@ rpcsvc_submit_generic(rpcsvc_request_t *req, struct iovec *proghdr,
size_t hdrlen = 0;
char new_iobref = 0;
rpcsvc_drc_globals_t *drc = NULL;
+ gf_latency_t *lat = NULL;
if ((!req) || (!req->trans))
return -1;
+ if (req->prog && req->begin.tv_sec) {
+ if ((req->procnum >= 0) && (req->procnum < req->prog->numactors)) {
+ timespec_now(&req->end);
+ lat = &req->prog->latencies[req->procnum];
+ gf_latency_update(lat, &req->begin, &req->end);
+ }
+ }
trans = req->trans;
for (i = 0; i < hdrcount; i++) {
@@ -1654,6 +1837,15 @@ rpcsvc_submit_message(rpcsvc_request_t *req, struct iovec *proghdr,
iobref);
}
+void
+rpcsvc_program_destroy(rpcsvc_program_t *program)
+{
+ if (program) {
+ GF_FREE(program->latencies);
+ GF_FREE(program);
+ }
+}
+
int
rpcsvc_program_unregister(rpcsvc_t *svc, rpcsvc_program_t *program)
{
@@ -1663,6 +1855,18 @@ rpcsvc_program_unregister(rpcsvc_t *svc, rpcsvc_program_t *program)
goto out;
}
+ pthread_rwlock_rdlock(&svc->rpclock);
+ {
+ list_for_each_entry(prog, &svc->programs, program)
+ {
+ if ((prog->prognum == program->prognum) &&
+ (prog->progver == program->progver)) {
+ break;
+ }
+ }
+ }
+ pthread_rwlock_unlock(&svc->rpclock);
+
ret = rpcsvc_program_unregister_portmap(program);
if (ret == -1) {
gf_log(GF_RPCSVC, GF_LOG_ERROR,
@@ -1679,17 +1883,6 @@ rpcsvc_program_unregister(rpcsvc_t *svc, rpcsvc_program_t *program)
goto out;
}
#endif
- pthread_rwlock_rdlock(&svc->rpclock);
- {
- list_for_each_entry(prog, &svc->programs, program)
- {
- if ((prog->prognum == program->prognum) &&
- (prog->progver == program->progver)) {
- break;
- }
- }
- }
- pthread_rwlock_unlock(&svc->rpclock);
gf_log(GF_RPCSVC, GF_LOG_DEBUG,
"Program unregistered: %s, Num: %d,"
@@ -1710,6 +1903,8 @@ rpcsvc_program_unregister(rpcsvc_t *svc, rpcsvc_program_t *program)
ret = 0;
out:
+ rpcsvc_program_destroy(prog);
+
if (ret == -1) {
if (program) {
gf_log(GF_RPCSVC, GF_LOG_ERROR,
@@ -1804,6 +1999,7 @@ rpcsvc_create_listener(rpcsvc_t *svc, dict_t *options, char *name)
listener = rpcsvc_listener_alloc(svc, trans);
if (listener == NULL) {
+ ret = -1;
goto out;
}
@@ -1811,6 +2007,7 @@ rpcsvc_create_listener(rpcsvc_t *svc, dict_t *options, char *name)
out:
if (!listener && trans) {
rpc_transport_disconnect(trans, _gf_true);
+ rpc_transport_cleanup(trans);
}
return ret;
@@ -1877,6 +2074,7 @@ rpcsvc_create_listeners(rpcsvc_t *svc, dict_t *options, char *name)
goto out;
}
+ dict_del(options, "notify-poller-death");
GF_FREE(transport_name);
transport_name = NULL;
count++;
@@ -1961,55 +2159,84 @@ out:
void *
rpcsvc_request_handler(void *arg)
{
- rpcsvc_program_t *program = arg;
- rpcsvc_request_t *req = NULL;
+ rpcsvc_request_queue_t *queue = NULL;
+ rpcsvc_program_t *program = NULL;
+ rpcsvc_request_t *req = NULL, *tmp_req = NULL;
rpcsvc_actor_t *actor = NULL;
gf_boolean_t done = _gf_false;
int ret = 0;
+ struct list_head tmp_list;
+
+ queue = arg;
+ program = queue->program;
+
+ INIT_LIST_HEAD(&tmp_list);
if (!program)
return NULL;
while (1) {
- pthread_mutex_lock(&program->queue_lock);
+ pthread_mutex_lock(&queue->queue_lock);
{
- if (!program->alive && list_empty(&program->request_queue)) {
+ if (!program->alive && list_empty(&queue->request_queue)) {
done = 1;
goto unlock;
}
- while (list_empty(&program->request_queue) &&
- (program->threadcount <= program->eventthreadcount)) {
- pthread_cond_wait(&program->queue_cond, &program->queue_lock);
+ while (list_empty(&queue->request_queue)) {
+ queue->waiting = _gf_true;
+ pthread_cond_wait(&queue->queue_cond, &queue->queue_lock);
}
- if (program->threadcount > program->eventthreadcount) {
- done = 1;
- program->threadcount--;
-
- gf_log(GF_RPCSVC, GF_LOG_INFO,
- "program '%s' thread terminated; "
- "total count:%d",
- program->progname, program->threadcount);
- } else if (!list_empty(&program->request_queue)) {
- req = list_entry(program->request_queue.next, typeof(*req),
- request_list);
+ queue->waiting = _gf_false;
- list_del_init(&req->request_list);
+ if (!list_empty(&queue->request_queue)) {
+ INIT_LIST_HEAD(&tmp_list);
+ list_splice_init(&queue->request_queue, &tmp_list);
}
}
unlock:
- pthread_mutex_unlock(&program->queue_lock);
+ pthread_mutex_unlock(&queue->queue_lock);
- if (req) {
- THIS = req->svc->xl;
- actor = rpcsvc_program_actor(req);
- ret = actor->actor(req);
+ list_for_each_entry_safe(req, tmp_req, &tmp_list, request_list)
+ {
+ if (req) {
+ list_del_init(&req->request_list);
- if (ret != 0) {
- rpcsvc_check_and_reply_error(ret, NULL, req);
+ if (req->prognum == RPCSVC_INFRA_PROGRAM) {
+ switch (req->procnum) {
+ case RPCSVC_PROC_EVENT_THREAD_DEATH:
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "event thread died, exiting request handler "
+ "thread for queue %d of program %s",
+ (int)(queue - &program->request_queue[0]),
+ program->progname);
+ done = 1;
+ pthread_mutex_lock(&program->thr_lock);
+ {
+ rpcsvc_toggle_queue_status(
+ program, queue,
+ program->request_queue_status);
+ program->threadcount--;
+ }
+ pthread_mutex_unlock(&program->thr_lock);
+ rpcsvc_request_destroy(req);
+ break;
+
+ default:
+ break;
+ }
+ } else {
+ THIS = req->svc->xl;
+ actor = rpcsvc_program_actor(req);
+ ret = actor->actor(req);
+
+ if (ret != 0) {
+ rpcsvc_check_and_reply_error(ret, NULL, req);
+ }
+ req = NULL;
+ }
}
- req = NULL;
}
if (done)
@@ -2020,61 +2247,14 @@ rpcsvc_request_handler(void *arg)
}
int
-rpcsvc_spawn_threads(rpcsvc_t *svc, rpcsvc_program_t *program)
-{
- int ret = 0, delta = 0, creates = 0;
-
- if (!program || !svc)
- goto out;
-
- pthread_mutex_lock(&program->queue_lock);
- {
- delta = program->eventthreadcount - program->threadcount;
-
- if (delta >= 0) {
- while (delta--) {
- ret = gf_thread_create(&program->thread, NULL,
- rpcsvc_request_handler, program,
- "rpcrqhnd");
- if (!ret) {
- program->threadcount++;
- creates++;
- }
- }
-
- if (creates) {
- gf_log(GF_RPCSVC, GF_LOG_INFO,
- "spawned %d threads for program '%s'; "
- "total count:%d",
- creates, program->progname, program->threadcount);
- }
- } else {
- gf_log(GF_RPCSVC, GF_LOG_INFO,
- "terminating %d threads for program '%s'", -delta,
- program->progname);
-
- /* this signal is to just wake up the threads so they
- * test for the change in eventthreadcount and kill
- * themselves until the program thread count becomes
- * equal to the event thread count
- */
- pthread_cond_broadcast(&program->queue_cond);
- }
- }
- pthread_mutex_unlock(&program->queue_lock);
-
-out:
- return creates;
-}
-
-int
rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program,
gf_boolean_t add_to_head)
{
- int ret = -1;
- int creates = -1;
+ int ret = -1, i = 0;
rpcsvc_program_t *newprog = NULL;
char already_registered = 0;
+ pthread_mutexattr_t attr[EVENT_MAX_THREADS];
+ pthread_mutexattr_t thr_attr;
if (!svc) {
goto out;
@@ -2108,25 +2288,45 @@ rpcsvc_program_register(rpcsvc_t *svc, rpcsvc_program_t *program,
}
memcpy(newprog, program, sizeof(*program));
+ newprog->latencies = gf_latency_new(program->numactors);
+ if (!newprog->latencies) {
+ rpcsvc_program_destroy(newprog);
+ goto out;
+ }
INIT_LIST_HEAD(&newprog->program);
- INIT_LIST_HEAD(&newprog->request_queue);
- pthread_mutex_init(&newprog->queue_lock, NULL);
- pthread_cond_init(&newprog->queue_cond, NULL);
+ pthread_mutexattr_init(&thr_attr);
+ pthread_mutexattr_settype(&thr_attr, PTHREAD_MUTEX_ADAPTIVE_NP);
+
+ for (i = 0; i < EVENT_MAX_THREADS; i++) {
+ pthread_mutexattr_init(&attr[i]);
+ pthread_mutexattr_settype(&attr[i], PTHREAD_MUTEX_ADAPTIVE_NP);
+ INIT_LIST_HEAD(&newprog->request_queue[i].request_queue);
+ pthread_mutex_init(&newprog->request_queue[i].queue_lock, &attr[i]);
+ pthread_cond_init(&newprog->request_queue[i].queue_cond, NULL);
+ newprog->request_queue[i].program = newprog;
+ }
+
+ pthread_mutex_init(&newprog->thr_lock, &thr_attr);
+ pthread_cond_init(&newprog->thr_cond, NULL);
newprog->alive = _gf_true;
+ if (gf_async_ctrl.enabled) {
+ newprog->ownthread = _gf_false;
+ newprog->synctask = _gf_false;
+ }
+
/* make sure synctask gets priority over ownthread */
if (newprog->synctask)
newprog->ownthread = _gf_false;
if (newprog->ownthread) {
- newprog->eventthreadcount = 1;
- creates = rpcsvc_spawn_threads(svc, newprog);
+ struct event_pool *ep = svc->ctx->event_pool;
+ newprog->eventthreadcount = ep->eventthreadcount;
- if (creates < 1) {
- goto out;
- }
+ pthread_key_create(&newprog->req_queue_key, NULL);
+ newprog->thr_queue = 1;
}
pthread_rwlock_wrlock(&svc->rpclock);
@@ -2366,7 +2566,7 @@ rpcsvc_reconfigure_options(rpcsvc_t *svc, dict_t *options)
*/
dict_del(svc->options, srchkey);
if (!dict_get_str(options, srchkey, &keyval)) {
- ret = dict_set_str(svc->options, srchkey, keyval);
+ ret = dict_set_dynstr_with_alloc(svc->options, srchkey, keyval);
if (ret < 0) {
gf_log(GF_RPCSVC, GF_LOG_ERROR, "dict_set_str error");
GF_FREE(srchkey);
@@ -2398,7 +2598,7 @@ rpcsvc_reconfigure_options(rpcsvc_t *svc, dict_t *options)
*/
dict_del(svc->options, srchkey);
if (!dict_get_str(options, srchkey, &keyval)) {
- ret = dict_set_str(svc->options, srchkey, keyval);
+ ret = dict_set_dynstr_with_alloc(svc->options, srchkey, keyval);
if (ret < 0) {
gf_log(GF_RPCSVC, GF_LOG_ERROR, "dict_set_str error");
GF_FREE(srchkey);
@@ -2418,18 +2618,13 @@ rpcsvc_reconfigure_options(rpcsvc_t *svc, dict_t *options)
}
int
-rpcsvc_transport_unix_options_build(dict_t **options, char *filepath)
+rpcsvc_transport_unix_options_build(dict_t *dict, char *filepath)
{
- dict_t *dict = NULL;
char *fpath = NULL;
int ret = -1;
GF_ASSERT(filepath);
- GF_ASSERT(options);
-
- dict = dict_new();
- if (!dict)
- goto out;
+ GF_VALIDATE_OR_GOTO("rpcsvc", dict, out);
fpath = gf_strdup(filepath);
if (!fpath) {
@@ -2452,13 +2647,9 @@ rpcsvc_transport_unix_options_build(dict_t **options, char *filepath)
ret = dict_set_str(dict, "transport-type", "socket");
if (ret)
goto out;
-
- *options = dict;
out:
if (ret) {
GF_FREE(fpath);
- if (dict)
- dict_unref(dict);
}
return ret;
}
@@ -2553,6 +2744,43 @@ rpcsvc_get_throttle(rpcsvc_t *svc)
return svc->throttle;
}
+/* Clean up the resources held by svc and free svc itself.
+ */
+int
+rpcsvc_destroy(rpcsvc_t *svc)
+{
+ struct rpcsvc_auth_list *auth = NULL;
+ struct rpcsvc_auth_list *tmp = NULL;
+ rpcsvc_listener_t *listener = NULL;
+ rpcsvc_listener_t *next = NULL;
+ int ret = 0;
+
+ if (!svc)
+ return ret;
+
+ list_for_each_entry_safe(listener, next, &svc->listeners, list)
+ {
+ rpcsvc_listener_destroy(listener);
+ }
+
+ list_for_each_entry_safe(auth, tmp, &svc->authschemes, authlist)
+ {
+ list_del_init(&auth->authlist);
+ GF_FREE(auth);
+ }
+
+ rpcsvc_program_unregister(svc, &gluster_dump_prog);
+ if (svc->rxpool) {
+ mem_pool_destroy(svc->rxpool);
+ svc->rxpool = NULL;
+ }
+
+ pthread_rwlock_destroy(&svc->rpclock);
+ GF_FREE(svc);
+
+ return ret;
+}
+
/* The global RPC service initializer.
*/
rpcsvc_t *
@@ -2649,6 +2877,10 @@ rpcsvc_transport_peer_check_search(dict_t *options, char *pattern, char *ip,
}
dup_addrstr = gf_strdup(addrstr);
+ if (dup_addrstr == NULL) {
+ ret = -1;
+ goto err;
+ }
addrtok = strtok_r(dup_addrstr, ",", &svptr);
while (addrtok) {
/* CASEFOLD not present on Solaris */
@@ -2972,10 +3204,6 @@ rpcsvc_match_subnet_v4(const char *addrtok, const char *ipaddr)
if (inet_pton(AF_INET, ipaddr, &sin1.sin_addr) == 0)
goto out;
- /* Find the network socket addr of subnet pattern */
- if (inet_pton(AF_INET, netaddr, &sin2.sin_addr) == 0)
- goto out;
-
slash = strchr(netaddr, '/');
if (slash) {
*slash = '\0';
@@ -2988,9 +3216,16 @@ rpcsvc_match_subnet_v4(const char *addrtok, const char *ipaddr)
if (prefixlen > 31)
goto out;
} else {
+ /* if there is no '/', then this function wouldn't be called */
goto out;
}
+ /* Do this only after stripping '/', since inet_pton() expects a bare IP
+ * address as its second argument. With sin2 set, we compare like with like.
+ */
+ if (inet_pton(AF_INET, netaddr, &sin2.sin_addr) == 0)
+ goto out;
+
shift = IPv4_ADDR_SIZE - prefixlen;
mask.sin_addr.s_addr = htonl((uint32_t)~0 << shift);
@@ -3003,45 +3238,55 @@ out:
return ret;
}
-/* During reconfigure, Make sure to call this function after event-threads are
- * reconfigured as programs' threadcount will be made equal to event threads.
- */
-
-int
-rpcsvc_ownthread_reconf(rpcsvc_t *svc, int new_eventthreadcount)
+void
+rpcsvc_program_dump(rpcsvc_program_t *prog)
{
- int ret = -1;
- rpcsvc_program_t *program = NULL;
+ char key_prefix[GF_DUMP_MAX_BUF_LEN];
+ char key[GF_DUMP_MAX_BUF_LEN];
+ int i;
- if (!svc) {
- ret = 0;
- goto out;
+ snprintf(key_prefix, GF_DUMP_MAX_BUF_LEN, "%s", prog->progname);
+ gf_proc_dump_add_section("%s", key_prefix);
+
+ gf_proc_dump_build_key(key, key_prefix, "program-number");
+ gf_proc_dump_write(key, "%d", prog->prognum);
+
+ gf_proc_dump_build_key(key, key_prefix, "program-version");
+ gf_proc_dump_write(key, "%d", prog->progver);
+
+ strncat(key_prefix, ".latency",
+ sizeof(key_prefix) - strlen(key_prefix) - 1);
+
+ for (i = 0; i < prog->numactors; i++) {
+ gf_proc_dump_build_key(key, key_prefix, "%s", prog->actors[i].procname);
+ gf_latency_statedump_and_reset(key, &prog->latencies[i]);
}
+}
- pthread_rwlock_wrlock(&svc->rpclock);
+void
+rpcsvc_statedump(rpcsvc_t *svc)
+{
+ rpcsvc_program_t *prog = NULL;
+ int ret = 0;
+ ret = pthread_rwlock_tryrdlock(&svc->rpclock);
+ if (ret)
+ return;
{
- list_for_each_entry(program, &svc->programs, program)
+ list_for_each_entry(prog, &svc->programs, program)
{
- if (program->ownthread) {
- program->eventthreadcount = new_eventthreadcount;
- rpcsvc_spawn_threads(svc, program);
- }
+ rpcsvc_program_dump(prog);
}
}
pthread_rwlock_unlock(&svc->rpclock);
-
- ret = 0;
-out:
- return ret;
}
-rpcsvc_actor_t gluster_dump_actors[GF_DUMP_MAXVALUE] = {
- [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0, DRC_NA},
- [GF_DUMP_DUMP] = {"DUMP", GF_DUMP_DUMP, rpcsvc_dump, NULL, 0, DRC_NA},
- [GF_DUMP_PING] = {"PING", GF_DUMP_PING, rpcsvc_ping, NULL, 0, DRC_NA},
+static rpcsvc_actor_t gluster_dump_actors[GF_DUMP_MAXVALUE] = {
+ [GF_DUMP_NULL] = {"NULL", NULL, NULL, GF_DUMP_NULL, DRC_NA, 0},
+ [GF_DUMP_DUMP] = {"DUMP", rpcsvc_dump, NULL, GF_DUMP_DUMP, DRC_NA, 0},
+ [GF_DUMP_PING] = {"PING", rpcsvc_ping, NULL, GF_DUMP_PING, DRC_NA, 0},
};
-struct rpcsvc_program gluster_dump_prog = {
+static struct rpcsvc_program gluster_dump_prog = {
.progname = "GF-DUMP",
.prognum = GLUSTER_DUMP_PROGRAM,
.progver = GLUSTER_DUMP_VERSION,
diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h
index ebb836fba3f..7b3030926c8 100644
--- a/rpc/rpc-lib/src/rpcsvc.h
+++ b/rpc/rpc-lib/src/rpcsvc.h
@@ -11,28 +11,33 @@
#ifndef _RPCSVC_H
#define _RPCSVC_H
-#include "gf-event.h"
+#include <glusterfs/gf-event.h>
#include "rpc-transport.h"
-#include "logging.h"
-#include "dict.h"
-#include "mem-pool.h"
-#include "list.h"
-#include "iobuf.h"
+#include <glusterfs/dict.h>
#include "xdr-rpc.h"
-#include "glusterfs.h"
-#include "xlator.h"
#include "rpcsvc-common.h"
#include <pthread.h>
#include <sys/uio.h>
#include <inttypes.h>
#include <rpc/rpc_msg.h>
-#include "compat.h"
+#include <glusterfs/compat.h>
+#include <glusterfs/client_t.h>
#ifndef MAX_IOVEC
#define MAX_IOVEC 16
#endif
+/* TODO: we should store prognums at a centralized location to avoid conflict
+ or use a robust random number generator to avoid conflicts
+*/
+
+#define RPCSVC_INFRA_PROGRAM 7712846 /* random number */
+
+typedef enum {
+ RPCSVC_PROC_EVENT_THREAD_DEATH = 0,
+} rpcsvc_infra_procnum_t;
+
#define RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT \
64 /* Default for protocol/server */
#define RPCSVC_DEF_NFS_OUTSTANDING_RPC_LIMIT 16 /* Default for nfs/server */
@@ -138,12 +143,6 @@ struct rpcsvc_config {
int max_block_size;
};
-typedef struct rpcsvc_auth_data {
- int flavour;
- int datalen;
- char authdata[GF_MAX_AUTH_BYTES];
-} rpcsvc_auth_data_t;
-
#define rpcsvc_auth_flavour(au) ((au).flavour)
typedef struct drc_client drc_client_t;
@@ -161,11 +160,6 @@ struct rpcsvc_request {
rpcsvc_program_t *prog;
- /* The identifier for the call from client.
- * Needed to pair the reply with the call.
- */
- uint32_t xid;
-
int prognum;
int progver;
@@ -196,24 +190,11 @@ struct rpcsvc_request {
* by the program actors. This is the buffer that will need to
* be de-xdred by the actor.
*/
- struct iovec msg[MAX_IOVEC];
int count;
+ struct iovec msg[MAX_IOVEC];
struct iobref *iobref;
- /* Status of the RPC call, whether it was accepted or denied. */
- int rpc_status;
-
- /* In case, the call was denied, the RPC error is stored here
- * till the reply is sent.
- */
- int rpc_err;
-
- /* In case the failure happened because of an authentication problem
- * , this value needs to be assigned the correct auth error number.
- */
- int auth_err;
-
/* There can be cases of RPC requests where the reply needs to
* be built from multiple sources. E.g. where even the NFS reply
* can contain a payload, as in the NFSv3 read reply. Here the RPC header
@@ -229,19 +210,14 @@ struct rpcsvc_request {
size_t payloadsize;
/* The credentials extracted from the rpc request */
- rpcsvc_auth_data_t cred;
+ client_auth_data_t cred;
/* The verifier extracted from the rpc request. In request side
* processing this contains the verifier sent by the client; on reply
* side processing, it is filled with the verifier that will be
* sent to the client.
*/
- rpcsvc_auth_data_t verf;
-
- /* Execute this request's actor function in ownthread of program?*/
- gf_boolean_t ownthread;
-
- gf_boolean_t synctask;
+ client_auth_data_t verf;
/* Container for a RPC program wanting to store a temp
* request-specific item.
*/
@@ -256,6 +232,19 @@ struct rpcsvc_request {
/* request queue in rpcsvc */
struct list_head request_list;
+ /* Status of the RPC call, whether it was accepted or denied. */
+ int rpc_status;
+
+ /* In case, the call was denied, the RPC error is stored here
+ * till the reply is sent.
+ */
+ int rpc_err;
+
+ /* In case the failure happened because of an authentication problem,
+ * this value needs to be assigned the correct auth error number.
+ */
+ int auth_err;
+
/* Things passed to rpc layer from client */
/* @flags: Can be used for binary data passed in xdata to be
@@ -265,6 +254,18 @@ struct rpcsvc_request {
/* ctime: origin of time on the client side, ideally this is
the one we should consider for time */
struct timespec ctime;
+
+ /* The identifier for the call from client.
+ * Needed to pair the reply with the call.
+ */
+ uint32_t xid;
+
+ /* Execute this request's actor function in ownthread of program?*/
+ gf_boolean_t ownthread;
+
+ gf_boolean_t synctask;
+ struct timespec begin; /*req handling start time*/
+ struct timespec end; /*req handling end time*/
};
#define rpcsvc_request_program(req) ((rpcsvc_program_t *)((req)->prog))
@@ -306,6 +307,20 @@ struct rpcsvc_request {
} \
} while (0);
+#define RPC_AUTH_ALL_SQUASH(req) \
+ do { \
+ int gidcount = 0; \
+ if (req->svc->all_squash) { \
+ req->uid = req->svc->anonuid; \
+ req->gid = req->svc->anongid; \
+ \
+ for (gidcount = 0; gidcount < req->auxgidcount; ++gidcount) { \
+ if (!req->auxgids[gidcount]) \
+ req->auxgids[gidcount] = req->svc->anongid; \
+ } \
+ } \
+ } while (0);
+
#define RPCSVC_ACTOR_SUCCESS 0
#define RPCSVC_ACTOR_ERROR (-1)
#define RPCSVC_ACTOR_IGNORE (-2)
@@ -344,7 +359,6 @@ typedef void (*rpcsvc_deallocate_reply)(void *msg);
*/
typedef struct rpcsvc_actor_desc {
char procname[RPCSVC_NAME_MAX];
- int procnum;
rpcsvc_actor actor;
/* Handler for cases where the RPC requests fragments are large enough
@@ -357,11 +371,23 @@ typedef struct rpcsvc_actor_desc {
*/
rpcsvc_vector_sizer vector_sizer;
+ int procnum;
+
/* Can the actor be run on behalf of an unprivileged requestor? */
- gf_boolean_t unprivileged;
drc_op_type_t op_type;
+ gf_boolean_t unprivileged;
} rpcsvc_actor_t;
+typedef struct rpcsvc_request_queue {
+ struct list_head request_queue;
+ pthread_mutex_t queue_lock;
+ pthread_cond_t queue_cond;
+ pthread_t thread;
+ struct rpcsvc_program *program;
+ int gen;
+ gf_boolean_t waiting;
+} rpcsvc_request_queue_t;
+
/* Describes a program and its version along with the function pointers
* required to handle the procedures/actors of each program/version.
* Never changed ever by any thread so no need for a lock.
@@ -371,21 +397,20 @@ struct rpcsvc_program {
int prognum;
int progver;
/* FIXME */
- dict_t *options; /* An opaque dictionary
- * populated by the program
- * (probably from xl->options)
- * which contain enough
- * information for transport to
- * initialize. As a part of
- * cleanup, the members of
- * options which are of interest
- * to transport should be put
- * into a structure for better
- * readability and structure
- * should replace options member
- * here.
- */
- uint16_t progport; /* Registered with portmap */
+ dict_t *options; /* An opaque dictionary
+ * populated by the program
+ * (probably from xl->options)
+ * which contain enough
+ * information for transport to
+ * initialize. As a part of
+ * cleanup, the members of
+ * options which are of interest
+ * to transport should be put
+ * into a structure for better
+ * readability and structure
+ * should replace options member
+ * here.
+ */
#if 0
int progaddrfamily; /* AF_INET or AF_INET6 */
char *proghost; /* Bind host, can be NULL */
@@ -394,10 +419,9 @@ struct rpcsvc_program {
int numactors; /* Num actors in actor array */
int proghighvers; /* Highest ver for program
supported by the system. */
- int proglowvers; /* Lowest ver */
-
/* Program specific state handed to actors */
void *private;
+ gf_latency_t *latencies; /*Tracks latency statistics for the rpc call*/
/* This upcall is provided by the program during registration.
* It is used to notify the program about events like connection being
@@ -407,30 +431,36 @@ struct rpcsvc_program {
*/
rpcsvc_notify_t notify;
+ int proglowvers; /* Lowest ver */
+
/* An integer that identifies the min auth strength that is required
* by this protocol, for eg. MOUNT3 needs AUTH_UNIX at least.
* See RFC 1813, Section 5.2.1.
*/
int min_auth;
- /* Execute actor function in program's own thread? This will reduce */
- /* the workload on poller threads */
- gf_boolean_t ownthread;
- gf_boolean_t alive;
-
- gf_boolean_t synctask;
/* list member to link to list of registered services with rpcsvc */
struct list_head program;
- struct list_head request_queue;
- pthread_mutex_t queue_lock;
- pthread_cond_t queue_cond;
- pthread_t thread;
+ rpcsvc_request_queue_t request_queue[EVENT_MAX_THREADS];
+ pthread_mutex_t thr_lock;
+ pthread_cond_t thr_cond;
int threadcount;
+ int thr_queue;
+ pthread_key_t req_queue_key;
+
/* eventthreadcount is just a readonly copy of the actual value
* owned by the event sub-system
* It is used to control the scaling of rpcsvc_request_handler threads
*/
int eventthreadcount;
+ uint16_t progport; /* Registered with portmap */
+ /* Execute actor function in program's own thread? This will reduce */
+ /* the workload on poller threads */
+ gf_boolean_t ownthread;
+ gf_boolean_t alive;
+
+ gf_boolean_t synctask;
+ unsigned long request_queue_status[EVENT_MAX_THREADS / __BITS_PER_LONG];
};
typedef struct rpcsvc_cbk_program {
@@ -562,9 +592,9 @@ typedef struct rpcsvc_auth_ops {
typedef struct rpcsvc_auth_flavour_desc {
char authname[RPCSVC_NAME_MAX];
- int authnum;
rpcsvc_auth_ops_t *authops;
void *authprivate;
+ int authnum;
} rpcsvc_auth_t;
typedef void *(*rpcsvc_auth_initer_t)(rpcsvc_t *svc, dict_t *options);
@@ -628,7 +658,7 @@ rpcsvc_actor_t *
rpcsvc_program_actor(rpcsvc_request_t *req);
int
-rpcsvc_transport_unix_options_build(dict_t **options, char *filepath);
+rpcsvc_transport_unix_options_build(dict_t *options, char *filepath);
int
rpcsvc_set_allow_insecure(rpcsvc_t *svc, dict_t *options);
int
@@ -636,6 +666,8 @@ rpcsvc_set_addr_namelookup(rpcsvc_t *svc, dict_t *options);
int
rpcsvc_set_root_squash(rpcsvc_t *svc, dict_t *options);
int
+rpcsvc_set_all_squash(rpcsvc_t *svc, dict_t *options);
+int
rpcsvc_set_outstanding_rpc_limit(rpcsvc_t *svc, dict_t *options, int defvalue);
int
@@ -652,9 +684,11 @@ rpcsvc_auth_array(rpcsvc_t *svc, char *volname, int *autharr, int arrlen);
rpcsvc_vector_sizer
rpcsvc_get_program_vector_sizer(rpcsvc_t *svc, uint32_t prognum,
uint32_t progver, int procnum);
-extern int
-rpcsvc_ownthread_reconf(rpcsvc_t *svc, int new_eventthreadcount);
-
void
rpcsvc_autoscale_threads(glusterfs_ctx_t *ctx, rpcsvc_t *rpc, int incr);
+
+extern int
+rpcsvc_destroy(rpcsvc_t *svc);
+void
+rpcsvc_statedump(rpcsvc_t *svc);
#endif
diff --git a/rpc/rpc-lib/src/xdr-common.h b/rpc/rpc-lib/src/xdr-common.h
index 7b0bc36ec64..752736b3d4d 100644
--- a/rpc/rpc-lib/src/xdr-common.h
+++ b/rpc/rpc-lib/src/xdr-common.h
@@ -66,11 +66,9 @@ enum gf_dump_procnum {
#ifdef GF_LINUX_HOST_OS
#define xdr_u_int32_t xdr_uint32_t
#define xdr_u_int64_t xdr_uint64_t
-#ifdef IPV6_DEFAULT
unsigned long
xdr_sizeof(xdrproc_t func, void *data);
#endif
-#endif
#ifdef GF_DARWIN_HOST_OS
#define xdr_u_quad_t xdr_u_int64_t
diff --git a/rpc/rpc-lib/src/xdr-rpc.c b/rpc/rpc-lib/src/xdr-rpc.c
index 36fd9db1a97..4992dc5a7ce 100644
--- a/rpc/rpc-lib/src/xdr-rpc.c
+++ b/rpc/rpc-lib/src/xdr-rpc.c
@@ -9,17 +9,13 @@
*/
#include <rpc/rpc.h>
-#include <rpc/pmap_clnt.h>
-#include <arpa/inet.h>
#include <rpc/xdr.h>
#include <sys/uio.h>
#include <rpc/auth_unix.h>
-#include "mem-pool.h"
#include "xdr-rpc.h"
#include "xdr-common.h"
-#include "logging.h"
-#include "common-utils.h"
+#include <glusterfs/common-utils.h>
/* Decodes the XDR format in msgbuf into rpc_msg.
* The remaining payload is returned into payload.
diff --git a/rpc/rpc-lib/src/xdr-rpc.h b/rpc/rpc-lib/src/xdr-rpc.h
index a57cd9430be..7baed273846 100644
--- a/rpc/rpc-lib/src/xdr-rpc.h
+++ b/rpc/rpc-lib/src/xdr-rpc.h
@@ -20,7 +20,6 @@
#include <rpc/auth_sys.h>
#endif
-//#include <rpc/pmap_clnt.h>
#include <arpa/inet.h>
#include <rpc/xdr.h>
#include <sys/uio.h>
diff --git a/rpc/rpc-lib/src/xdr-rpcclnt.c b/rpc/rpc-lib/src/xdr-rpcclnt.c
index 9e60d19e7a2..8dcdcfeda83 100644
--- a/rpc/rpc-lib/src/xdr-rpcclnt.c
+++ b/rpc/rpc-lib/src/xdr-rpcclnt.c
@@ -9,18 +9,14 @@
*/
#include <rpc/rpc.h>
-#include <rpc/pmap_clnt.h>
-#include <arpa/inet.h>
#include <rpc/xdr.h>
#include <sys/uio.h>
#include <rpc/auth_unix.h>
#include <errno.h>
-#include "mem-pool.h"
#include "xdr-rpc.h"
#include "xdr-common.h"
-#include "logging.h"
-#include "common-utils.h"
+#include <glusterfs/common-utils.h>
/* Decodes the XDR format in msgbuf into rpc_msg.
* The remaining payload is returned into payload.
diff --git a/rpc/rpc-lib/src/xdr-rpcclnt.h b/rpc/rpc-lib/src/xdr-rpcclnt.h
index 4d6e38d429c..58eda4892a9 100644
--- a/rpc/rpc-lib/src/xdr-rpcclnt.h
+++ b/rpc/rpc-lib/src/xdr-rpcclnt.h
@@ -11,8 +11,6 @@
#ifndef _XDR_RPCCLNT_H
#define _XDR_RPCCLNT_H
-//#include <rpc/rpc.h>
-//#include <rpc/pmap_clnt.h>
#include <arpa/inet.h>
#include <rpc/xdr.h>
#include <sys/uio.h>