diff options
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.c | 104 |
| -rw-r--r-- | rpc/rpc-lib/src/rpc-clnt.h | 1 |
| -rw-r--r-- | xlators/features/changelog/src/changelog-ev-handle.c | 12 |
| -rw-r--r-- | xlators/features/changelog/src/changelog-rpc.c | 5 |
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-rebalance.c | 1 |
| -rw-r--r-- | xlators/mgmt/glusterd/src/glusterd-utils.c | 1 |
6 files changed, 117 insertions, 7 deletions
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c index cf261fcc58b..76d187931b0 100644 --- a/rpc/rpc-lib/src/rpc-clnt.c +++ b/rpc/rpc-lib/src/rpc-clnt.c @@ -126,6 +126,7 @@ call_bail (void *data)          struct timespec        timeout = {0,};          struct iovec           iov = {0,};          char                   peerid[UNIX_PATH_MAX] = {0}; +        gf_boolean_t           need_unref = _gf_false;          GF_VALIDATE_OR_GOTO ("client", data, out); @@ -158,6 +159,8 @@ call_bail (void *data)                          timeout.tv_sec = 10;                          timeout.tv_nsec = 0; +                        /* Ref rpc as it's added to timer event queue */ +                        rpc_clnt_ref (clnt);                          gf_timer_call_cancel (clnt->ctx, conn->timer);                          conn->timer = gf_timer_call_after (clnt->ctx,                                                             timeout, @@ -168,6 +171,7 @@ call_bail (void *data)                                  gf_log (conn->name, GF_LOG_WARNING,                                          "Cannot create bailout timer for %s",                                          peerid); +                                need_unref = _gf_true;                          }                  } @@ -210,6 +214,9 @@ call_bail (void *data)                  mem_put (trav);          }  out: +        rpc_clnt_unref (clnt); +        if (need_unref) +                rpc_clnt_unref (clnt);          return;  } @@ -235,6 +242,7 @@ __save_frame (struct rpc_clnt *rpc_clnt, call_frame_t *frame,          if (conn->timer == NULL) {                  timeout.tv_sec  = 10;                  timeout.tv_nsec = 0; +                rpc_clnt_ref (rpc_clnt);                  conn->timer = gf_timer_call_after (rpc_clnt->ctx,                                                     timeout,                                                     call_bail, @@ -394,6 +402,7 @@ rpc_clnt_reconnect (void *conn_ptr)          struct 
timespec          ts    = {0, 0};          int32_t                  ret   = 0;          struct rpc_clnt         *clnt  = NULL; +        gf_boolean_t             need_unref = _gf_false;          conn  = conn_ptr;          clnt = conn->rpc_clnt; @@ -418,10 +427,16 @@ rpc_clnt_reconnect (void *conn_ptr)                                  "attempting reconnect");                          ret = rpc_transport_connect (trans,                                                       conn->config.remote_port); +                        rpc_clnt_ref (clnt);                          conn->reconnect =                                  gf_timer_call_after (clnt->ctx, ts,                                                       rpc_clnt_reconnect,                                                       conn); +                        if (!conn->reconnect) { +                                need_unref = _gf_true; +                                gf_log (conn->name, GF_LOG_ERROR, +                                        "Error adding to timer event queue"); +                        }                  } else {                          gf_log (conn->name, GF_LOG_TRACE,                                  "breaking reconnect chain"); @@ -429,6 +444,9 @@ rpc_clnt_reconnect (void *conn_ptr)          }          pthread_mutex_unlock (&conn->lock); +        rpc_clnt_unref (clnt); +        if (need_unref) +                rpc_clnt_unref (clnt);          return;  } @@ -468,6 +486,8 @@ int  rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn)  {          struct rpc_clnt         *clnt  = NULL; +        int                      ret   = 0; +        gf_boolean_t             reconnect_unref = _gf_false;          if (!conn) {                  goto out; @@ -479,13 +499,18 @@ rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn)          {                  if (conn->reconnect) { -                        gf_timer_call_cancel (clnt->ctx, conn->reconnect); +                        ret = gf_timer_call_cancel 
(clnt->ctx, conn->reconnect); +                        if (!ret) +                                reconnect_unref = _gf_true;                          conn->reconnect = NULL;                  }          }          pthread_mutex_unlock (&conn->lock); +        if (reconnect_unref) +                rpc_clnt_unref (clnt); +  out:          return 0;  } @@ -501,6 +526,8 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn)          struct saved_frames    *saved_frames = NULL;          struct rpc_clnt         *clnt  = NULL;          int                     unref = 0; +        int                     ret   = 0; +        gf_boolean_t            timer_unref = _gf_false;          if (!conn) {                  goto out; @@ -516,7 +543,9 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn)                  /* bailout logic cleanup */                  if (conn->timer) { -                        gf_timer_call_cancel (clnt->ctx, conn->timer); +                        ret = gf_timer_call_cancel (clnt->ctx, conn->timer); +                        if (!ret) +                                timer_unref = _gf_true;                          conn->timer = NULL;                  } @@ -533,6 +562,9 @@ rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn)          if (unref)                  rpc_clnt_unref (clnt); +        if (timer_unref) +                rpc_clnt_unref (clnt); +  out:          return 0;  } @@ -856,6 +888,7 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,          rpc_transport_pollin_t *pollin      = NULL;          struct timespec         ts          = {0, };          void                   *clnt_mydata = NULL; +        gf_boolean_t            unref_clnt  = _gf_false;          DECLARE_OLD_THIS;          conn = mydata; @@ -880,10 +913,16 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,                                  ts.tv_sec = 10;                                  ts.tv_nsec = 0; +                                rpc_clnt_ref (clnt);    
                              conn->reconnect =                                          gf_timer_call_after (clnt->ctx, ts,                                                               rpc_clnt_reconnect,                                                               conn); +                                if (conn->reconnect == NULL) { +                                        gf_log (conn->name, GF_LOG_WARNING, +                                                "Cannot create rpc_clnt_reconnect timer"); +                                        unref_clnt = _gf_true; +                                }                          }                  }                  pthread_mutex_unlock (&conn->lock); @@ -891,6 +930,9 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,                  if (clnt->notifyfn)                          ret = clnt->notifyfn (clnt, clnt->mydata,                                                RPC_CLNT_DISCONNECT, NULL); +                if (unref_clnt) +                        rpc_clnt_ref (clnt); +                  break;          } @@ -1148,6 +1190,10 @@ rpc_clnt_start (struct rpc_clnt *rpc)                  rpc->disabled = 0;          }          pthread_mutex_unlock (&conn->lock); +        /* Corresponding unref will be either on successful timer cancel or last +         * rpc_clnt_reconnect fire event. 
+         */ +        rpc_clnt_ref (rpc);          rpc_clnt_reconnect (conn);          return 0; @@ -1525,6 +1571,7 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,          int                    proglen     = 0;          char                   new_iobref  = 0;          uint64_t               callid      = 0; +        gf_boolean_t           need_unref  = _gf_false;          if (!rpc || !prog || !frame) {                  goto out; @@ -1609,6 +1656,14 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,                  if ((ret >= 0) && frame) {                          /* Save the frame in queue */                          __save_frame (rpc, frame, rpcreq); + +                        /* A ref on rpc-clnt object is taken while registering +                         * call_bail to timer in __save_frame. If it fails to +                         * register, it needs an unref and should happen outside +                         * conn->lock which otherwise leads to deadlocks */ +                        if (conn->timer == NULL) +                                need_unref = _gf_true; +                          conn->msgcnt++;                          gf_log ("rpc-clnt", GF_LOG_TRACE, "submitted request " @@ -1620,6 +1675,9 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,          }          pthread_mutex_unlock (&conn->lock); +        if (need_unref) +                rpc_clnt_unref (rpc); +          if (ret == -1) {                  goto out;          } @@ -1752,6 +1810,9 @@ rpc_clnt_disable (struct rpc_clnt *rpc)          rpc_clnt_connection_t *conn = NULL;          rpc_transport_t       *trans = NULL;          int                    unref = 0; +        int                    ret   = 0; +        gf_boolean_t           timer_unref = _gf_false; +        gf_boolean_t           reconnect_unref = _gf_false;          if (!rpc) {                  goto out; @@ -1764,12 +1825,20 @@ rpc_clnt_disable (struct rpc_clnt *rpc)                 
 rpc->disabled = 1;                  if (conn->timer) { -                        gf_timer_call_cancel (rpc->ctx, conn->timer); +                        ret = gf_timer_call_cancel (rpc->ctx, conn->timer); +                        /* If the event is not fired and it actually cancelled +                         * the timer, do the unref else registered call back +                         * function will take care of it. +                         */ +                        if (!ret) +                                timer_unref = _gf_true;                          conn->timer = NULL;                  }                  if (conn->reconnect) { -                        gf_timer_call_cancel (rpc->ctx, conn->reconnect); +                        ret = gf_timer_call_cancel (rpc->ctx, conn->reconnect); +                        if (!ret) +                                reconnect_unref = _gf_true;                          conn->reconnect = NULL;                  }                  conn->connected = 0; @@ -1787,6 +1856,12 @@ rpc_clnt_disable (struct rpc_clnt *rpc)          if (unref)                  rpc_clnt_unref (rpc); +        if (timer_unref) +                rpc_clnt_unref (rpc); + +        if (reconnect_unref) +                rpc_clnt_unref (rpc); +  out:          return;  } @@ -1797,6 +1872,9 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc)          rpc_clnt_connection_t *conn  = NULL;          rpc_transport_t       *trans = NULL;          int                    unref = 0; +        int                    ret   = 0; +        gf_boolean_t           timer_unref = _gf_false; +        gf_boolean_t           reconnect_unref = _gf_false;          if (!rpc)                  goto out; @@ -1807,12 +1885,20 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc)          {                  rpc->disabled = 1;                  if (conn->timer) { -                        gf_timer_call_cancel (rpc->ctx, conn->timer); +                        ret = gf_timer_call_cancel (rpc->ctx, conn->timer); +    
                    /* If the event is not fired and it actually cancelled +                         * the timer, do the unref else registered call back +                         * function will take care of unref. +                         */ +                        if (!ret) +                                timer_unref = _gf_true;                          conn->timer = NULL;                  }                  if (conn->reconnect) { -                        gf_timer_call_cancel (rpc->ctx, conn->reconnect); +                        ret = gf_timer_call_cancel (rpc->ctx, conn->reconnect); +                        if (!ret) +                                reconnect_unref = _gf_true;                          conn->reconnect = NULL;                  }                  conn->connected = 0; @@ -1828,6 +1914,12 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc)          if (unref)                  rpc_clnt_unref (rpc); +        if (timer_unref) +                rpc_clnt_unref (rpc); + +        if (reconnect_unref) +                rpc_clnt_unref (rpc); +  out:          return;  } diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h index 01caeb814c0..f84b4cbf806 100644 --- a/rpc/rpc-lib/src/rpc-clnt.h +++ b/rpc/rpc-lib/src/rpc-clnt.h @@ -231,6 +231,7 @@ struct rpc_clnt *  rpc_clnt_unref (struct rpc_clnt *rpc);  int rpc_clnt_connection_cleanup (rpc_clnt_connection_t *conn); +int rpc_clnt_reconnect_cleanup (rpc_clnt_connection_t *conn);  void rpc_clnt_set_connected (rpc_clnt_connection_t *conn); diff --git a/xlators/features/changelog/src/changelog-ev-handle.c b/xlators/features/changelog/src/changelog-ev-handle.c index bcce28ebd7e..8097d636178 100644 --- a/xlators/features/changelog/src/changelog-ev-handle.c +++ b/xlators/features/changelog/src/changelog-ev-handle.c @@ -157,6 +157,13 @@ changelog_rpc_notify (struct rpc_clnt *rpc,                  break;          case RPC_CLNT_DISCONNECT:                  rpc_clnt_disable (crpc->rpc); + +                /* 
rpc_clnt_disable doesn't unref the rpc. It just marks +                 * the rpc as disabled and cancels reconnection timer. +                 * Hence unref the rpc object to free it. +                 */ +                rpc_clnt_unref (crpc->rpc); +                  selection = &priv->ev_selection;                  LOCK (&crpc->lock); @@ -170,6 +177,8 @@ changelog_rpc_notify (struct rpc_clnt *rpc,                  break;          case RPC_CLNT_MSG:          case RPC_CLNT_DESTROY: +                /* Free up mydata */ +                changelog_rpc_clnt_unref (crpc);                  break;          } @@ -251,7 +260,9 @@ get_client (changelog_clnt_t *c_clnt, struct list_head **next)                  if (*next == &c_clnt->active)                          goto unblock;                  crpc = list_entry (*next, changelog_rpc_clnt_t, list); +                /* ref rpc as DISCONNECT might unref the rpc asynchronously */                  changelog_rpc_clnt_ref (crpc); +                rpc_clnt_ref (crpc->rpc);                  *next = (*next)->next;          }   unblock: @@ -265,6 +276,7 @@ put_client (changelog_clnt_t *c_clnt, changelog_rpc_clnt_t *crpc)  {          LOCK (&c_clnt->active_lock);          { +                rpc_clnt_unref (crpc->rpc);                  changelog_rpc_clnt_unref (crpc);          }          UNLOCK (&c_clnt->active_lock); diff --git a/xlators/features/changelog/src/changelog-rpc.c b/xlators/features/changelog/src/changelog-rpc.c index 005ac15c3c7..69a0db193ed 100644 --- a/xlators/features/changelog/src/changelog-rpc.c +++ b/xlators/features/changelog/src/changelog-rpc.c @@ -199,7 +199,10 @@ changelog_rpc_clnt_init (xlator_t *this,                  goto error_return;          INIT_LIST_HEAD (&crpc->list); -        crpc->ref = 0; +        /* Take a ref, the last unref will be on RPC_CLNT_DESTROY +         * which comes as a result of last rpc_clnt_unref. 
+         */ +        crpc->ref = 1;          changelog_set_disconnect_flag (crpc, _gf_false);          crpc->filter = rpc_req->filter; diff --git a/xlators/mgmt/glusterd/src/glusterd-rebalance.c b/xlators/mgmt/glusterd/src/glusterd-rebalance.c index 2781b5d9f90..dc2ac372723 100644 --- a/xlators/mgmt/glusterd/src/glusterd-rebalance.c +++ b/xlators/mgmt/glusterd/src/glusterd-rebalance.c @@ -149,6 +149,7 @@ __glusterd_defrag_notify (struct rpc_clnt *rpc, void *mydata,                  glusterd_store_perform_node_state_store (volinfo); +                rpc_clnt_reconnect_cleanup (&defrag->rpc->conn);                  glusterd_defrag_rpc_put (defrag);                  if (defrag->cbk_fn)                          defrag->cbk_fn (volinfo, diff --git a/xlators/mgmt/glusterd/src/glusterd-utils.c b/xlators/mgmt/glusterd/src/glusterd-utils.c index 655cc0446eb..b9145b58408 100644 --- a/xlators/mgmt/glusterd/src/glusterd-utils.c +++ b/xlators/mgmt/glusterd/src/glusterd-utils.c @@ -10491,6 +10491,7 @@ glusterd_rpc_clnt_unref (glusterd_conf_t *conf, rpc_clnt_t *rpc)          GF_ASSERT (conf);          GF_ASSERT (rpc);          synclock_unlock (&conf->big_lock); +        (void) rpc_clnt_reconnect_cleanup (&rpc->conn);          ret = rpc_clnt_unref (rpc);          synclock_lock (&conf->big_lock);  | 
