summaryrefslogtreecommitdiffstats
path: root/rpc/rpc-lib
diff options
context:
space:
mode:
Diffstat (limited to 'rpc/rpc-lib')
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.c104
-rw-r--r--rpc/rpc-lib/src/rpc-clnt.h1
2 files changed, 99 insertions, 6 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index 27e394093cf..a9e43eb42f1 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -121,6 +121,7 @@ call_bail (void *data)
struct timespec timeout = {0,};
struct iovec iov = {0,};
char peerid[UNIX_PATH_MAX] = {0};
+ gf_boolean_t need_unref = _gf_false;
GF_VALIDATE_OR_GOTO ("client", data, out);
@@ -153,6 +154,8 @@ call_bail (void *data)
timeout.tv_sec = 10;
timeout.tv_nsec = 0;
+ /* Ref rpc as it's added to timer event queue */
+ rpc_clnt_ref (clnt);
gf_timer_call_cancel (clnt->ctx, conn->timer);
conn->timer = gf_timer_call_after (clnt->ctx,
timeout,
@@ -163,6 +166,7 @@ call_bail (void *data)
gf_log (conn->name, GF_LOG_WARNING,
"Cannot create bailout timer for %s",
peerid);
+ need_unref = _gf_true;
}
}
@@ -205,6 +209,9 @@ call_bail (void *data)
mem_put (trav);
}
out:
+ rpc_clnt_unref (clnt);
+ if (need_unref)
+ rpc_clnt_unref (clnt);
return;
}
@@ -230,6 +237,7 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame,
if (conn->timer == NULL) {
timeout.tv_sec = 10;
timeout.tv_nsec = 0;
+ rpc_clnt_ref (rpc_clnt);
conn->timer = gf_timer_call_after (rpc_clnt->ctx,
timeout,
call_bail,
@@ -389,6 +397,7 @@ rpc_clnt_reconnect (void *conn_ptr)
struct timespec ts = {0, 0};
int32_t ret = 0;
struct rpc_clnt *clnt = NULL;
+ gf_boolean_t need_unref = _gf_false;
conn = conn_ptr;
clnt = conn->rpc_clnt;
@@ -413,10 +422,16 @@ rpc_clnt_reconnect (void *conn_ptr)
"attempting reconnect");
ret = rpc_transport_connect (trans,
conn->config.remote_port);
+ rpc_clnt_ref (clnt);
conn->reconnect =
gf_timer_call_after (clnt->ctx, ts,
rpc_clnt_reconnect,
conn);
+ if (!conn->reconnect) {
+ need_unref = _gf_true;
+ gf_log (conn->name, GF_LOG_ERROR,
+ "Error adding to timer event queue");
+ }
} else {
gf_log (conn->name, GF_LOG_TRACE,
"breaking reconnect chain");
@@ -424,6 +439,9 @@ rpc_clnt_reconnect (void *conn_ptr)
}
pthread_mutex_unlock (&conn->lock);
+ rpc_clnt_unref (clnt);
+ if (need_unref)
+ rpc_clnt_unref (clnt);
return;
}
@@ -463,6 +481,8 @@ int
rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn)
{
struct rpc_clnt *clnt = NULL;
+ int ret = 0;
+ gf_boolean_t reconnect_unref = _gf_false;
if (!conn) {
goto out;
@@ -474,13 +494,18 @@ rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn)
{
if (conn->reconnect) {
- gf_timer_call_cancel (clnt->ctx, conn->reconnect);
+ ret = gf_timer_call_cancel (clnt->ctx, conn->reconnect);
+ if (!ret)
+ reconnect_unref = _gf_true;
conn->reconnect = NULL;
}
}
pthread_mutex_unlock (&conn->lock);
+ if (reconnect_unref)
+ rpc_clnt_unref (clnt);
+
out:
return 0;
}
@@ -496,6 +521,8 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn)
struct saved_frames *saved_frames = NULL;
struct rpc_clnt *clnt = NULL;
int unref = 0;
+ int ret = 0;
+ gf_boolean_t timer_unref = _gf_false;
if (!conn) {
goto out;
@@ -511,7 +538,9 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn)
/* bailout logic cleanup */
if (conn->timer) {
- gf_timer_call_cancel (clnt->ctx, conn->timer);
+ ret = gf_timer_call_cancel (clnt->ctx, conn->timer);
+ if (!ret)
+ timer_unref = _gf_true;
conn->timer = NULL;
}
@@ -528,6 +557,9 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn)
if (unref)
rpc_clnt_unref (clnt);
+ if (timer_unref)
+ rpc_clnt_unref (clnt);
+
out:
return 0;
}
@@ -851,6 +883,7 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
rpc_transport_pollin_t *pollin = NULL;
struct timespec ts = {0, };
void *clnt_mydata = NULL;
+ gf_boolean_t unref_clnt = _gf_false;
DECLARE_OLD_THIS;
conn = mydata;
@@ -875,10 +908,16 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
ts.tv_sec = 10;
ts.tv_nsec = 0;
+ rpc_clnt_ref (clnt);
conn->reconnect =
gf_timer_call_after (clnt->ctx, ts,
rpc_clnt_reconnect,
conn);
+ if (conn->reconnect == NULL) {
+ gf_log (conn->name, GF_LOG_WARNING,
+ "Cannot create rpc_clnt_reconnect timer");
+ unref_clnt = _gf_true;
+ }
}
}
pthread_mutex_unlock (&conn->lock);
@@ -886,6 +925,9 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
if (clnt->notifyfn)
ret = clnt->notifyfn (clnt, clnt->mydata,
RPC_CLNT_DISCONNECT, NULL);
+ if (unref_clnt)
+ rpc_clnt_ref (clnt);
+
break;
}
@@ -1135,6 +1177,10 @@ rpc_clnt_start (struct rpc_clnt *rpc)
rpc->disabled = 0;
}
pthread_mutex_unlock (&conn->lock);
+ /* Corresponding unref will be either on successful timer cancel or last
+ * rpc_clnt_reconnect fire event.
+ */
+ rpc_clnt_ref (rpc);
rpc_clnt_reconnect (conn);
return 0;
@@ -1512,6 +1558,7 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
int proglen = 0;
char new_iobref = 0;
uint64_t callid = 0;
+ gf_boolean_t need_unref = _gf_false;
if (!rpc || !prog || !frame) {
goto out;
@@ -1596,6 +1643,14 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
if ((ret >= 0) && frame) {
/* Save the frame in queue */
__save_frame (rpc, frame, rpcreq);
+
+ /* A ref on rpc-clnt object is taken while registering
+ * call_bail to timer in __save_frame. If it fails to
+ * register, it needs an unref and should happen outside
+ * conn->lock which otherwise leads to deadlocks */
+ if (conn->timer == NULL)
+ need_unref = _gf_true;
+
conn->msgcnt++;
gf_log ("rpc-clnt", GF_LOG_TRACE, "submitted request "
@@ -1607,6 +1662,9 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
}
pthread_mutex_unlock (&conn->lock);
+ if (need_unref)
+ rpc_clnt_unref (rpc);
+
if (ret == -1) {
goto out;
}
@@ -1739,6 +1797,9 @@ rpc_clnt_disable (struct rpc_clnt *rpc)
rpc_clnt_connection_t *conn = NULL;
rpc_transport_t *trans = NULL;
int unref = 0;
+ int ret = 0;
+ gf_boolean_t timer_unref = _gf_false;
+ gf_boolean_t reconnect_unref = _gf_false;
if (!rpc) {
goto out;
@@ -1751,12 +1812,20 @@ rpc_clnt_disable (struct rpc_clnt *rpc)
rpc->disabled = 1;
if (conn->timer) {
- gf_timer_call_cancel (rpc->ctx, conn->timer);
+ ret = gf_timer_call_cancel (rpc->ctx, conn->timer);
+ /* If the event is not fired and it actually cancelled
+ * the timer, do the unref else registered call back
+ * function will take care of it.
+ */
+ if (!ret)
+ timer_unref = _gf_true;
conn->timer = NULL;
}
if (conn->reconnect) {
- gf_timer_call_cancel (rpc->ctx, conn->reconnect);
+ ret = gf_timer_call_cancel (rpc->ctx, conn->reconnect);
+ if (!ret)
+ reconnect_unref = _gf_true;
conn->reconnect = NULL;
}
conn->connected = 0;
@@ -1774,6 +1843,12 @@ rpc_clnt_disable (struct rpc_clnt *rpc)
if (unref)
rpc_clnt_unref (rpc);
+ if (timer_unref)
+ rpc_clnt_unref (rpc);
+
+ if (reconnect_unref)
+ rpc_clnt_unref (rpc);
+
out:
return;
}
@@ -1784,6 +1859,9 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc)
rpc_clnt_connection_t *conn = NULL;
rpc_transport_t *trans = NULL;
int unref = 0;
+ int ret = 0;
+ gf_boolean_t timer_unref = _gf_false;
+ gf_boolean_t reconnect_unref = _gf_false;
if (!rpc)
goto out;
@@ -1794,12 +1872,20 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc)
{
rpc->disabled = 1;
if (conn->timer) {
- gf_timer_call_cancel (rpc->ctx, conn->timer);
+ ret = gf_timer_call_cancel (rpc->ctx, conn->timer);
+ /* If the event is not fired and it actually cancelled
+ * the timer, do the unref else registered call back
+ * function will take care of unref.
+ */
+ if (!ret)
+ timer_unref = _gf_true;
conn->timer = NULL;
}
if (conn->reconnect) {
- gf_timer_call_cancel (rpc->ctx, conn->reconnect);
+ ret = gf_timer_call_cancel (rpc->ctx, conn->reconnect);
+ if (!ret)
+ reconnect_unref = _gf_true;
conn->reconnect = NULL;
}
conn->connected = 0;
@@ -1815,6 +1901,12 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc)
if (unref)
rpc_clnt_unref (rpc);
+ if (timer_unref)
+ rpc_clnt_unref (rpc);
+
+ if (reconnect_unref)
+ rpc_clnt_unref (rpc);
+
out:
return;
}
diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h
index 01caeb814c0..f84b4cbf806 100644
--- a/rpc/rpc-lib/src/rpc-clnt.h
+++ b/rpc/rpc-lib/src/rpc-clnt.h
@@ -231,6 +231,7 @@ struct rpc_clnt *
rpc_clnt_unref (struct rpc_clnt *rpc);
int rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn);
+int rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn);
void rpc_clnt_set_connected (rpc_clnt_connection_t *conn);