diff options
Diffstat (limited to 'rpc/rpc-lib/src/rpc-clnt.c')
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 104 | 
1 files changed, 98 insertions, 6 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index 27e394093cf..a9e43eb42f1 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -121,6 +121,7 @@ call_bail (void *data)          struct timespec        timeout = {0,};          struct iovec           iov = {0,};          char                   peerid[UNIX_PATH_MAX] = {0}; +        gf_boolean_t           need_unref = _gf_false;          GF_VALIDATE_OR_GOTO ("client", data, out); @@ -153,6 +154,8 @@ call_bail (void *data)                          timeout.tv_sec = 10;                          timeout.tv_nsec = 0; +                        /* Ref rpc as it's added to timer event queue */ +                        rpc_clnt_ref (clnt);                          gf_timer_call_cancel (clnt->ctx, conn->timer);                          conn->timer = gf_timer_call_after (clnt->ctx,                                                             timeout, @@ -163,6 +166,7 @@ call_bail (void *data)                                  gf_log (conn->name, GF_LOG_WARNING,                                          "Cannot create bailout timer for %s",                                          peerid); +                                need_unref = _gf_true;                          }                  } @@ -205,6 +209,9 @@ call_bail (void *data)                  mem_put (trav);          }  out: +        rpc_clnt_unref (clnt); +        if (need_unref) +                rpc_clnt_unref (clnt);          return;  } @@ -230,6 +237,7 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame,          if (conn->timer == NULL) {                  timeout.tv_sec  = 10;                  timeout.tv_nsec = 0; +                rpc_clnt_ref (rpc_clnt);                  conn->timer = gf_timer_call_after (rpc_clnt->ctx,                                                     timeout,                                                     call_bail, @@ -389,6 +397,7 @@ rpc_clnt_reconnect (void *conn_ptr)          struct timespec          ts    = {0, 0};          int32_t                  ret   = 0;          struct rpc_clnt         *clnt  = NULL; +        gf_boolean_t             need_unref = _gf_false;          conn  = conn_ptr;          clnt = conn->rpc_clnt; @@ -413,10 +422,16 @@ rpc_clnt_reconnect (void *conn_ptr)                                  "attempting reconnect");                          ret = rpc_transport_connect (trans,                                                       conn->config.remote_port); +                        rpc_clnt_ref (clnt);                          conn->reconnect =                                  gf_timer_call_after (clnt->ctx, ts,                                                       rpc_clnt_reconnect,                                                       conn); +                        if (!conn->reconnect) { +                                need_unref = _gf_true; +                                gf_log (conn->name, GF_LOG_ERROR, +                                        "Error adding to timer event queue"); +                        }                  } else {                          gf_log (conn->name, GF_LOG_TRACE,                                  "breaking reconnect chain"); @@ -424,6 +439,9 @@ rpc_clnt_reconnect (void *conn_ptr)          }          pthread_mutex_unlock (&conn->lock); +        rpc_clnt_unref (clnt); +        if (need_unref) +                rpc_clnt_unref (clnt);          return;  } @@ -463,6 +481,8 @@ int  rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn)  {          struct rpc_clnt         *clnt  = NULL; +        int                      ret   = 0; +        gf_boolean_t             reconnect_unref = _gf_false;          if (!conn) {                  goto out; @@ -474,13 +494,18 @@ rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn)          {                  if (conn->reconnect) { -                        gf_timer_call_cancel (clnt->ctx, conn->reconnect); +                        ret = gf_timer_call_cancel (clnt->ctx, conn->reconnect); +                        if (!ret) +                                reconnect_unref = _gf_true;                          conn->reconnect = NULL;                  }          }          pthread_mutex_unlock (&conn->lock); +        if (reconnect_unref) +                rpc_clnt_unref (clnt); +  out:          return 0;  } @@ -496,6 +521,8 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn)          struct saved_frames    *saved_frames = NULL;          struct rpc_clnt         *clnt  = NULL;          int                     unref = 0; +        int                     ret   = 0; +        gf_boolean_t            timer_unref = _gf_false;          if (!conn) {                  goto out; @@ -511,7 +538,9 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn)                  /* bailout logic cleanup */                  if (conn->timer) { -                        gf_timer_call_cancel (clnt->ctx, conn->timer); +                        ret = gf_timer_call_cancel (clnt->ctx, conn->timer); +                        if (!ret) +                                timer_unref = _gf_true;                          conn->timer = NULL;                  } @@ -528,6 +557,9 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn)          if (unref)                  rpc_clnt_unref (clnt); +        if (timer_unref) +                rpc_clnt_unref (clnt); +  out:          return 0;  } @@ -851,6 +883,7 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,          rpc_transport_pollin_t *pollin      = NULL;          struct timespec         ts          = {0, };          void                   *clnt_mydata = NULL; +        gf_boolean_t            unref_clnt  = _gf_false;          DECLARE_OLD_THIS;          conn = mydata; @@ -875,10 +908,16 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,                                  ts.tv_sec = 10;                                  ts.tv_nsec = 0; +                                rpc_clnt_ref (clnt);                                  conn->reconnect =                                          gf_timer_call_after (clnt->ctx, ts,                                                               rpc_clnt_reconnect,                                                               conn); +                                if (conn->reconnect == NULL) { +                                        gf_log (conn->name, GF_LOG_WARNING, +                                                "Cannot create rpc_clnt_reconnect timer"); +                                        unref_clnt = _gf_true; +                                }                          }                  }                  pthread_mutex_unlock (&conn->lock); @@ -886,6 +925,9 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,                  if (clnt->notifyfn)                          ret = clnt->notifyfn (clnt, clnt->mydata,                                                RPC_CLNT_DISCONNECT, NULL); +                if (unref_clnt) +                        rpc_clnt_ref (clnt); +                  break;          } @@ -1135,6 +1177,10 @@ rpc_clnt_start (struct rpc_clnt *rpc)                  rpc->disabled = 0;          }          pthread_mutex_unlock (&conn->lock); +        /* Corresponding unref will be either on successful timer cancel or last +         * rpc_clnt_reconnect fire event. +         */ +        rpc_clnt_ref (rpc);          rpc_clnt_reconnect (conn);          return 0; @@ -1512,6 +1558,7 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,          int                    proglen     = 0;          char                   new_iobref  = 0;          uint64_t               callid      = 0; +        gf_boolean_t           need_unref  = _gf_false;          if (!rpc || !prog || !frame) {                  goto out; @@ -1596,6 +1643,14 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,                  if ((ret >= 0) && frame) {                          /* Save the frame in queue */                          __save_frame (rpc, frame, rpcreq); + +                        /* A ref on rpc-clnt object is taken while registering +                         * call_bail to timer in __save_frame. If it fails to +                         * register, it needs an unref and should happen outside +                         * conn->lock which otherwise leads to deadlocks */ +                        if (conn->timer == NULL) +                                need_unref = _gf_true; +                          conn->msgcnt++;                          gf_log ("rpc-clnt", GF_LOG_TRACE, "submitted request " @@ -1607,6 +1662,9 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,          }          pthread_mutex_unlock (&conn->lock); +        if (need_unref) +                rpc_clnt_unref (rpc); +          if (ret == -1) {                  goto out;          } @@ -1739,6 +1797,9 @@ rpc_clnt_disable (struct rpc_clnt *rpc)          rpc_clnt_connection_t *conn = NULL;          rpc_transport_t       *trans = NULL;          int                    unref = 0; +        int                    ret   = 0; +        gf_boolean_t           timer_unref = _gf_false; +        gf_boolean_t           reconnect_unref = _gf_false;          if (!rpc) {                  goto out; @@ -1751,12 +1812,20 @@ rpc_clnt_disable (struct rpc_clnt *rpc)                  rpc->disabled = 1;                  if (conn->timer) { -                        gf_timer_call_cancel (rpc->ctx, conn->timer); +                        ret = gf_timer_call_cancel (rpc->ctx, conn->timer); +                        /* If the event is not fired and it actually cancelled +                         * the timer, do the unref else registered call back +                         * function will take care of it. +                         */ +                        if (!ret) +                                timer_unref = _gf_true;                          conn->timer = NULL;                  }                  if (conn->reconnect) { -                        gf_timer_call_cancel (rpc->ctx, conn->reconnect); +                        ret = gf_timer_call_cancel (rpc->ctx, conn->reconnect); +                        if (!ret) +                                reconnect_unref = _gf_true;                          conn->reconnect = NULL;                  }                  conn->connected = 0; @@ -1774,6 +1843,12 @@ rpc_clnt_disable (struct rpc_clnt *rpc)          if (unref)                  rpc_clnt_unref (rpc); +        if (timer_unref) +                rpc_clnt_unref (rpc); + +        if (reconnect_unref) +                rpc_clnt_unref (rpc); +  out:          return;  } @@ -1784,6 +1859,9 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc)          rpc_clnt_connection_t *conn  = NULL;          rpc_transport_t       *trans = NULL;          int                    unref = 0; +        int                    ret   = 0; +        gf_boolean_t           timer_unref = _gf_false; +        gf_boolean_t           reconnect_unref = _gf_false;          if (!rpc)                  goto out; @@ -1794,12 +1872,20 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc)          {                  rpc->disabled = 1;                  if (conn->timer) { -                        gf_timer_call_cancel (rpc->ctx, conn->timer); +                        ret = gf_timer_call_cancel (rpc->ctx, conn->timer); +                        /* If the event is not fired and it actually cancelled +                         * the timer, do the unref else registered call back +                         * function will take care of unref. +                         */ +                        if (!ret) +                                timer_unref = _gf_true;                          conn->timer = NULL;                  }                  if (conn->reconnect) { -                        gf_timer_call_cancel (rpc->ctx, conn->reconnect); +                        ret = gf_timer_call_cancel (rpc->ctx, conn->reconnect); +                        if (!ret) +                                reconnect_unref = _gf_true;                          conn->reconnect = NULL;                  }                  conn->connected = 0; @@ -1815,6 +1901,12 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc)          if (unref)                  rpc_clnt_unref (rpc); +        if (timer_unref) +                rpc_clnt_unref (rpc); + +        if (reconnect_unref) +                rpc_clnt_unref (rpc); +  out:          return;  }  | 
