summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libglusterfs/src/xlator.c2
-rw-r--r--rpc/rpc-transport/socket/src/socket.c6
-rw-r--r--tests/00-geo-rep/bug-1600145.t109
-rw-r--r--tests/bugs/ec/bug-1236065.t1
-rw-r--r--xlators/features/changelog/src/changelog-ev-handle.c14
-rw-r--r--xlators/features/changelog/src/changelog-ev-handle.h2
-rw-r--r--xlators/features/changelog/src/changelog-helpers.h18
-rw-r--r--xlators/features/changelog/src/changelog-messages.h3
-rw-r--r--xlators/features/changelog/src/changelog-rpc-common.c18
-rw-r--r--xlators/features/changelog/src/changelog-rpc.c157
-rw-r--r--xlators/features/changelog/src/changelog.c39
11 files changed, 328 insertions, 41 deletions
diff --git a/libglusterfs/src/xlator.c b/libglusterfs/src/xlator.c
index b50848b3476..13213d55994 100644
--- a/libglusterfs/src/xlator.c
+++ b/libglusterfs/src/xlator.c
@@ -1036,7 +1036,7 @@ xlator_mem_free(xlator_t *xl)
static void
xlator_call_fini(xlator_t *this)
{
- if (!this || this->cleanup_starting)
+ if (!this || this->call_cleanup)
return;
this->cleanup_starting = 1;
this->call_cleanup = 1;
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index 38e52081b3c..b3f90079c3c 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -3009,6 +3009,12 @@ socket_server_event_handler(int fd, int idx, int gen, void *data, int poll_in,
* thread context while we are using it here.
*/
priv->idx = idx;
+ priv->gen = gen;
+
+ if (poll_err) {
+ socket_event_poll_err(this, gen, idx);
+ goto out;
+ }
if (poll_in) {
int aflags = 0;
diff --git a/tests/00-geo-rep/bug-1600145.t b/tests/00-geo-rep/bug-1600145.t
new file mode 100644
index 00000000000..1d38bf92682
--- /dev/null
+++ b/tests/00-geo-rep/bug-1600145.t
@@ -0,0 +1,109 @@
+#!/bin/bash
+
+. $(dirname $0)/../include.rc
+. $(dirname $0)/../volume.rc
+. $(dirname $0)/../geo-rep.rc
+. $(dirname $0)/../env.rc
+
+### Basic Tests with Distribute Replicate volumes
+
+##Cleanup and start glusterd
+cleanup;
+SCRIPT_TIMEOUT=600
+TEST glusterd;
+TEST pidof glusterd
+
+##Variables
+GEOREP_CLI="$CLI volume geo-replication"
+master=$GMV0
+SH0="127.0.0.1"
+slave=${SH0}::${GSV0}
+num_active=2
+num_passive=2
+master_mnt=$M0
+slave_mnt=$M1
+
+############################################################
+#SETUP VOLUMES AND GEO-REPLICATION
+############################################################
+
+##create_and_start_master_volume
+TEST $CLI volume create $GMV0 replica 2 $H0:$B0/${GMV0}{1,2};
+gluster v set all cluster.brick-multiplex on
+TEST $CLI volume start $GMV0
+
+##create_and_start_slave_volume
+TEST $CLI volume create $GSV0 replica 2 $H0:$B0/${GSV0}{1,2};
+TEST $CLI volume start $GSV0
+
+##Create, start and mount meta_volume
+TEST $CLI volume create $META_VOL replica 3 $H0:$B0/${META_VOL}{1,2,3};
+TEST $CLI volume start $META_VOL
+TEST mkdir -p $META_MNT
+TEST glusterfs -s $H0 --volfile-id $META_VOL $META_MNT
+
+############################################################
+#BASIC GEO-REPLICATION TESTS
+############################################################
+
+#Create geo-rep session
+TEST create_georep_session $master $slave
+
+#Config gluster-command-dir
+TEST $GEOREP_CLI $master $slave config gluster-command-dir ${GLUSTER_CMD_DIR}
+
+#Config gluster-command-dir
+TEST $GEOREP_CLI $master $slave config slave-gluster-command-dir ${GLUSTER_CMD_DIR}
+
+#Enable_metavolume
+TEST $GEOREP_CLI $master $slave config use_meta_volume true
+
+#Wait for common secret pem file to be created
+EXPECT_WITHIN $GEO_REP_TIMEOUT 0 check_common_secret_file
+
+#Verify the keys are distributed
+
+EXPECT_WITHIN $GEO_REP_TIMEOUT 0 check_keys_distributed
+
+#Count no. of changelog socket
+brick_pid=`ps -aef | grep glusterfsd | grep -v "shared_storage" | grep -v grep | awk -F " " '{print $2}'`
+n=$(grep -Fc "changelog" /proc/$brick_pid/net/unix)
+
+#Start_georep
+TEST $GEOREP_CLI $master $slave start
+
+EXPECT_WITHIN $GEO_REP_TIMEOUT 1 check_status_num_rows "Active"
+EXPECT_WITHIN $GEO_REP_TIMEOUT 1 check_status_num_rows "Passive"
+
+#Count no. of changelog socket
+brick_pid=`ps -aef | grep glusterfsd | grep -v "shared_storage" | grep -v grep | awk -F " " '{print $2}'`
+c=$(grep -Fc "changelog" /proc/$brick_pid/net/unix)
+let expected=n+2
+TEST [ "$c" -eq "$expected" ]
+
+#Kill the "Active" brick
+brick=$($GEOREP_CLI $master $slave status | grep -F "Active" | awk {'print $3'})
+cat /proc/$brick_pid/net/unix | grep "changelog"
+TEST kill_brick $GMV0 $H0 $brick
+#Expect geo-rep status to be "Faulty"
+EXPECT_WITHIN $GEO_REP_TIMEOUT 1 check_status_num_rows "Faulty"
+EXPECT_WITHIN $GEO_REP_TIMEOUT 1 check_status_num_rows "Active"
+
+#Count no. of changelog socket
+brick_pid=`ps -aef | grep glusterfsd | grep -v "shared_storage" | grep -v grep | awk -F " " '{print $2}'`
+cat /proc/$brick_pid/net/unix | grep "changelog"
+ls -lrth /proc/$brick_pid/fd | grep "socket"
+c=$(grep -Fc "changelog" /proc/$brick_pid/net/unix)
+TEST [ "$c" -eq "$n" ]
+
+#Stop Geo-rep
+TEST $GEOREP_CLI $master $slave stop
+
+#Delete Geo-rep
+TEST $GEOREP_CLI $master $slave delete
+
+#Cleanup authorized keys
+sed -i '/^command=.*SSH_ORIGINAL_COMMAND#.*/d' ~/.ssh/authorized_keys
+sed -i '/^command=.*gsyncd.*/d' ~/.ssh/authorized_keys
+
+cleanup;
diff --git a/tests/bugs/ec/bug-1236065.t b/tests/bugs/ec/bug-1236065.t
index 9395aa33e8c..76d25d739fa 100644
--- a/tests/bugs/ec/bug-1236065.t
+++ b/tests/bugs/ec/bug-1236065.t
@@ -2,6 +2,7 @@
. $(dirname $0)/../../include.rc
. $(dirname $0)/../../volume.rc
+SCRIPT_TIMEOUT=400
cleanup
diff --git a/xlators/features/changelog/src/changelog-ev-handle.c b/xlators/features/changelog/src/changelog-ev-handle.c
index 3ed6ff821d9..f48dd63870a 100644
--- a/xlators/features/changelog/src/changelog-ev-handle.c
+++ b/xlators/features/changelog/src/changelog-ev-handle.c
@@ -134,6 +134,8 @@ changelog_rpc_notify(struct rpc_clnt *rpc, void *mydata, rpc_clnt_event_t event,
changelog_clnt_t *c_clnt = NULL;
changelog_priv_t *priv = NULL;
changelog_ev_selector_t *selection = NULL;
+ uint64_t clntcnt = 0;
+ uint64_t xprtcnt = 0;
crpc = mydata;
this = crpc->this;
@@ -144,6 +146,7 @@ changelog_rpc_notify(struct rpc_clnt *rpc, void *mydata, rpc_clnt_event_t event,
switch (event) {
case RPC_CLNT_CONNECT:
selection = &priv->ev_selection;
+ GF_ATOMIC_INC(priv->clntcnt);
LOCK(&c_clnt->wait_lock);
{
@@ -176,12 +179,23 @@ changelog_rpc_notify(struct rpc_clnt *rpc, void *mydata, rpc_clnt_event_t event,
changelog_set_disconnect_flag(crpc, _gf_true);
}
UNLOCK(&crpc->lock);
+ LOCK(&c_clnt->active_lock);
+ {
+ list_del_init(&crpc->list);
+ }
+ UNLOCK(&c_clnt->active_lock);
break;
case RPC_CLNT_MSG:
case RPC_CLNT_DESTROY:
/* Free up mydata */
changelog_rpc_clnt_unref(crpc);
+ clntcnt = GF_ATOMIC_DEC(priv->clntcnt);
+ xprtcnt = GF_ATOMIC_GET(priv->xprtcnt);
+ if (this->cleanup_starting) {
+ if (!clntcnt && !xprtcnt)
+ changelog_process_cleanup_event(this);
+ }
break;
case RPC_CLNT_PING:
break;
diff --git a/xlators/features/changelog/src/changelog-ev-handle.h b/xlators/features/changelog/src/changelog-ev-handle.h
index 53119c5e209..cc1af58a276 100644
--- a/xlators/features/changelog/src/changelog-ev-handle.h
+++ b/xlators/features/changelog/src/changelog-ev-handle.h
@@ -131,4 +131,6 @@ changelog_ev_queue_connection(changelog_clnt_t *, changelog_rpc_clnt_t *);
void
changelog_ev_cleanup_connections(xlator_t *, changelog_clnt_t *);
+void
+changelog_process_cleanup_event(xlator_t *);
#endif
diff --git a/xlators/features/changelog/src/changelog-helpers.h b/xlators/features/changelog/src/changelog-helpers.h
index 1b8fcb8675a..517c4dc4883 100644
--- a/xlators/features/changelog/src/changelog-helpers.h
+++ b/xlators/features/changelog/src/changelog-helpers.h
@@ -307,6 +307,24 @@ struct changelog_priv {
/* glusterfind dependency to capture paths on deleted entries*/
gf_boolean_t capture_del_path;
+
+ /* Save total no. of listners */
+ gf_atomic_t listnercnt;
+
+ /* Save total no. of xprt are associated with listner */
+ gf_atomic_t xprtcnt;
+
+ /* Save xprt list */
+ struct list_head xprt_list;
+
+ /* Save total no. of client connection */
+ gf_atomic_t clntcnt;
+
+ /* Save cleanup brick in victim */
+ xlator_t *victim;
+
+ /* Status to save cleanup notify status */
+ gf_boolean_t notify_down;
};
struct changelog_local {
diff --git a/xlators/features/changelog/src/changelog-messages.h b/xlators/features/changelog/src/changelog-messages.h
index 9edd9ae60ec..ca50ccb149e 100644
--- a/xlators/features/changelog/src/changelog-messages.h
+++ b/xlators/features/changelog/src/changelog-messages.h
@@ -52,6 +52,7 @@ GLFS_MSGID(
CHANGELOG_MSG_FSTAT_OP_FAILED, CHANGELOG_MSG_LSEEK_OP_FAILED,
CHANGELOG_MSG_STRSTR_OP_FAILED, CHANGELOG_MSG_UNLINK_OP_FAILED,
CHANGELOG_MSG_DETECT_EMPTY_CHANGELOG_FAILED,
- CHANGELOG_MSG_READLINK_OP_FAILED, CHANGELOG_MSG_EXPLICIT_ROLLOVER_FAILED);
+ CHANGELOG_MSG_READLINK_OP_FAILED, CHANGELOG_MSG_EXPLICIT_ROLLOVER_FAILED,
+ CHANGELOG_MSG_RPCSVC_NOTIFY_FAILED);
#endif /* !_CHANGELOG_MESSAGES_H_ */
diff --git a/xlators/features/changelog/src/changelog-rpc-common.c b/xlators/features/changelog/src/changelog-rpc-common.c
index 89a5ab30a68..cf35175c3bc 100644
--- a/xlators/features/changelog/src/changelog-rpc-common.c
+++ b/xlators/features/changelog/src/changelog-rpc-common.c
@@ -260,6 +260,7 @@ changelog_rpc_server_destroy(xlator_t *this, rpcsvc_t *rpc, char *sockfile,
rpcsvc_listener_t *listener = NULL;
rpcsvc_listener_t *next = NULL;
struct rpcsvc_program *prog = NULL;
+ rpc_transport_t *trans = NULL;
while (*progs) {
prog = *progs;
@@ -269,22 +270,25 @@ changelog_rpc_server_destroy(xlator_t *this, rpcsvc_t *rpc, char *sockfile,
list_for_each_entry_safe(listener, next, &rpc->listeners, list)
{
- rpcsvc_listener_destroy(listener);
+ if (listener->trans) {
+ trans = listener->trans;
+ rpc_transport_disconnect(trans, _gf_false);
+ }
}
(void)rpcsvc_unregister_notify(rpc, fn, this);
- sys_unlink(sockfile);
- if (rpc->rxpool) {
- mem_pool_destroy(rpc->rxpool);
- rpc->rxpool = NULL;
- }
/* TODO Avoid freeing rpc object in case of brick multiplex
after freeing rpc object svc->rpclock corrupted and it takes
more time to detach a brick
*/
- if (!this->cleanup_starting)
+ if (!this->cleanup_starting) {
+ if (rpc->rxpool) {
+ mem_pool_destroy(rpc->rxpool);
+ rpc->rxpool = NULL;
+ }
GF_FREE(rpc);
+ }
}
rpcsvc_t *
diff --git a/xlators/features/changelog/src/changelog-rpc.c b/xlators/features/changelog/src/changelog-rpc.c
index 394fae44e3e..28974fe0999 100644
--- a/xlators/features/changelog/src/changelog-rpc.c
+++ b/xlators/features/changelog/src/changelog-rpc.c
@@ -43,9 +43,6 @@ changelog_cleanup_rpc_threads(xlator_t *this, changelog_priv_t *priv)
/** terminate dispatcher thread(s) */
changelog_cleanup_dispatchers(this, priv, priv->nr_dispatchers);
- /* TODO: what about pending and waiting connections? */
- changelog_ev_cleanup_connections(this, conn);
-
/* destroy locks */
ret = pthread_mutex_destroy(&conn->pending_lock);
if (ret != 0)
@@ -147,48 +144,146 @@ int
changelog_rpcsvc_notify(rpcsvc_t *rpc, void *xl, rpcsvc_event_t event,
void *data)
{
+ xlator_t *this = NULL;
+ rpc_transport_t *trans = NULL;
+ rpc_transport_t *xprt = NULL;
+ rpc_transport_t *xp_next = NULL;
+ changelog_priv_t *priv = NULL;
+ uint64_t listnercnt = 0;
+ uint64_t xprtcnt = 0;
+ uint64_t clntcnt = 0;
+ rpcsvc_listener_t *listener = NULL;
+ rpcsvc_listener_t *next = NULL;
+ gf_boolean_t listner_found = _gf_false;
+ socket_private_t *sockpriv = NULL;
+
+ if (!xl || !data || !rpc) {
+ gf_msg_callingfn("changelog", GF_LOG_WARNING, 0,
+ CHANGELOG_MSG_RPCSVC_NOTIFY_FAILED,
+ "Calling rpc_notify without initializing");
+ goto out;
+ }
+
+ this = xl;
+ trans = data;
+ priv = this->private;
+
+ if (!priv) {
+ gf_msg_callingfn("changelog", GF_LOG_WARNING, 0,
+ CHANGELOG_MSG_RPCSVC_NOTIFY_FAILED,
+ "Calling rpc_notify without priv initializing");
+ goto out;
+ }
+
+ if (event == RPCSVC_EVENT_ACCEPT) {
+ GF_ATOMIC_INC(priv->xprtcnt);
+ LOCK(&priv->lock);
+ {
+ list_add_tail(&trans->list, &priv->xprt_list);
+ }
+ UNLOCK(&priv->lock);
+ goto out;
+ }
+
+ if (event == RPCSVC_EVENT_DISCONNECT) {
+ list_for_each_entry_safe(listener, next, &rpc->listeners, list)
+ {
+ if (listener && listener->trans) {
+ if (listener->trans == trans) {
+ listnercnt = GF_ATOMIC_DEC(priv->listnercnt);
+ listner_found = _gf_true;
+ rpcsvc_listener_destroy(listener);
+ }
+ }
+ }
+
+ if (listnercnt > 0) {
+ goto out;
+ }
+ if (listner_found) {
+ LOCK(&priv->lock);
+ list_for_each_entry_safe(xprt, xp_next, &priv->xprt_list, list)
+ {
+ sockpriv = (socket_private_t *)(xprt->private);
+ gf_log("changelog", GF_LOG_INFO,
+ "Send disconnect"
+ " on socket %d",
+ sockpriv->sock);
+ rpc_transport_disconnect(xprt, _gf_false);
+ }
+ UNLOCK(&priv->lock);
+ goto out;
+ }
+ LOCK(&priv->lock);
+ {
+ list_del_init(&trans->list);
+ }
+ UNLOCK(&priv->lock);
+
+ xprtcnt = GF_ATOMIC_DEC(priv->xprtcnt);
+ clntcnt = GF_ATOMIC_GET(priv->clntcnt);
+ if (!xprtcnt && !clntcnt) {
+ changelog_process_cleanup_event(this);
+ }
+ }
+
+out:
return 0;
}
void
+changelog_process_cleanup_event(xlator_t *this)
+{
+ gf_boolean_t cleanup_notify = _gf_false;
+ changelog_priv_t *priv = NULL;
+ char sockfile[UNIX_PATH_MAX] = {
+ 0,
+ };
+
+ if (!this)
+ return;
+ priv = this->private;
+ if (!priv)
+ return;
+
+ LOCK(&priv->lock);
+ {
+ cleanup_notify = priv->notify_down;
+ priv->notify_down = _gf_true;
+ }
+ UNLOCK(&priv->lock);
+
+ if (priv->victim && !cleanup_notify) {
+ default_notify(this, GF_EVENT_PARENT_DOWN, priv->victim);
+
+ if (priv->rpc) {
+ /* sockfile path could have been saved to avoid this */
+ CHANGELOG_MAKE_SOCKET_PATH(priv->changelog_brick, sockfile,
+ UNIX_PATH_MAX);
+ sys_unlink(sockfile);
+ (void)rpcsvc_unregister_notify(priv->rpc, changelog_rpcsvc_notify,
+ this);
+ if (priv->rpc->rxpool) {
+ mem_pool_destroy(priv->rpc->rxpool);
+ priv->rpc->rxpool = NULL;
+ }
+ GF_FREE(priv->rpc);
+ priv->rpc = NULL;
+ }
+ }
+}
+
+void
changelog_destroy_rpc_listner(xlator_t *this, changelog_priv_t *priv)
{
char sockfile[UNIX_PATH_MAX] = {
0,
};
- changelog_clnt_t *c_clnt = &priv->connections;
- changelog_rpc_clnt_t *crpc = NULL;
- int nofconn = 0;
/* sockfile path could have been saved to avoid this */
CHANGELOG_MAKE_SOCKET_PATH(priv->changelog_brick, sockfile, UNIX_PATH_MAX);
changelog_rpc_server_destroy(this, priv->rpc, sockfile,
changelog_rpcsvc_notify, changelog_programs);
-
- /* TODO Below approach is not perfect to wait for cleanup
- all active connections without this code brick process
- can be crash in case of brick multiplexing if any in-progress
- request process on rpc by changelog xlator after
- cleanup resources
- */
-
- if (c_clnt) {
- do {
- nofconn = 0;
- LOCK(&c_clnt->active_lock);
- list_for_each_entry(crpc, &c_clnt->active, list) { nofconn++; }
- UNLOCK(&c_clnt->active_lock);
- LOCK(&c_clnt->wait_lock);
- list_for_each_entry(crpc, &c_clnt->waitq, list) { nofconn++; }
- UNLOCK(&c_clnt->wait_lock);
- pthread_mutex_lock(&c_clnt->pending_lock);
- list_for_each_entry(crpc, &c_clnt->pending, list) { nofconn++; }
- pthread_mutex_unlock(&c_clnt->pending_lock);
-
- } while (nofconn); /* Wait for all connection cleanup */
- }
-
- (void)changelog_cleanup_rpc_threads(this, priv);
}
rpcsvc_t *
diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c
index 1f42dbe178e..1f22a97a6e9 100644
--- a/xlators/features/changelog/src/changelog.c
+++ b/xlators/features/changelog/src/changelog.c
@@ -2004,6 +2004,10 @@ notify(xlator_t *this, int event, void *data, ...)
struct list_head queue = {
0,
};
+ uint64_t xprtcnt = 0;
+ uint64_t clntcnt = 0;
+ changelog_clnt_t *conn = NULL;
+ gf_boolean_t cleanup_notify = _gf_false;
INIT_LIST_HEAD(&queue);
@@ -2011,6 +2015,33 @@ notify(xlator_t *this, int event, void *data, ...)
if (!priv)
goto out;
+ if (event == GF_EVENT_PARENT_DOWN) {
+ priv->victim = data;
+ gf_log(this->name, GF_LOG_INFO,
+ "cleanup changelog rpc connection of brick %s",
+ priv->victim->name);
+
+ this->cleanup_starting = 1;
+ changelog_destroy_rpc_listner(this, priv);
+ conn = &priv->connections;
+ if (conn)
+ changelog_ev_cleanup_connections(this, conn);
+ xprtcnt = GF_ATOMIC_GET(priv->xprtcnt);
+ clntcnt = GF_ATOMIC_GET(priv->clntcnt);
+
+ if (!xprtcnt && !clntcnt) {
+ LOCK(&priv->lock);
+ {
+ cleanup_notify = priv->notify_down;
+ priv->notify_down = _gf_true;
+ }
+ UNLOCK(&priv->lock);
+ if (!cleanup_notify)
+ default_notify(this, GF_EVENT_PARENT_DOWN, data);
+ }
+ goto out;
+ }
+
if (event == GF_EVENT_TRANSLATOR_OP) {
dict = data;
@@ -2629,8 +2660,10 @@ static void
changelog_cleanup_rpc(xlator_t *this, changelog_priv_t *priv)
{
/* terminate rpc server */
- changelog_destroy_rpc_listner(this, priv);
+ if (!this->cleanup_starting)
+ changelog_destroy_rpc_listner(this, priv);
+ (void)changelog_cleanup_rpc_threads(this, priv);
/* cleanup rot buffs */
rbuf_dtor(priv->rbuf);
@@ -2703,6 +2736,10 @@ init(xlator_t *this)
LOCK_INIT(&priv->lock);
LOCK_INIT(&priv->c_snap_lock);
+ GF_ATOMIC_INIT(priv->listnercnt, 0);
+ GF_ATOMIC_INIT(priv->clntcnt, 0);
+ GF_ATOMIC_INIT(priv->xprtcnt, 0);
+ INIT_LIST_HEAD(&priv->xprt_list);
ret = changelog_init_options(this, priv);
if (ret)