-rw-r--r--  libglusterfs/src/defaults-tmpl.c                  |   7
-rw-r--r--  libglusterfs/src/xlator.c                         |   2
-rw-r--r--  libglusterfs/src/xlator.h                         |   9
-rw-r--r--  xlators/features/index/src/index.c                |  52
-rw-r--r--  xlators/features/index/src/index.h                |   2
-rw-r--r--  xlators/performance/io-threads/src/io-threads.c   |  51
-rw-r--r--  xlators/performance/io-threads/src/io-threads.h   |   2
-rw-r--r--  xlators/protocol/server/src/server-handshake.c    |  25
-rw-r--r--  xlators/protocol/server/src/server-helpers.c      |  76
-rw-r--r--  xlators/protocol/server/src/server-helpers.h      |   4
-rw-r--r--  xlators/protocol/server/src/server.c              | 178
-rw-r--r--  xlators/protocol/server/src/server.h              |   9
-rw-r--r--  xlators/storage/posix/src/posix-common.c          |  10
13 files changed, 346 insertions(+), 81 deletions(-)
diff --git a/libglusterfs/src/defaults-tmpl.c b/libglusterfs/src/defaults-tmpl.c
index 2ce5ec7c685..bdb30d4872c 100644
--- a/libglusterfs/src/defaults-tmpl.c
+++ b/libglusterfs/src/defaults-tmpl.c
@@ -124,13 +124,18 @@ int
 default_notify(xlator_t *this, int32_t event, void *data, ...)
 {
     GF_UNUSED int ret = 0;
+    xlator_t *victim = data;
+
     switch (event) {
         case GF_EVENT_PARENT_UP:
         case GF_EVENT_PARENT_DOWN: {
             xlator_list_t *list = this->children;
 
             while (list) {
-                xlator_notify(list->xlator, event, this);
+                if (victim && victim->cleanup_starting)
+                    xlator_notify(list->xlator, event, victim);
+                else
+                    xlator_notify(list->xlator, event, this);
                 list = list->next;
             }
         } break;
diff --git a/libglusterfs/src/xlator.c b/libglusterfs/src/xlator.c
index 6975fbbd15f..01cb27896ee 100644
--- a/libglusterfs/src/xlator.c
+++ b/libglusterfs/src/xlator.c
@@ -700,6 +700,8 @@ xlator_init(xlator_t *xl)
         xl->mem_acct_init(xl);
 
     xl->instance_name = NULL;
+    GF_ATOMIC_INIT(xl->xprtrefcnt, 0);
+    GF_ATOMIC_INIT(xl->fd_cnt, 0);
     if (!xl->init) {
         gf_msg(xl->name, GF_LOG_WARNING, 0, LG_MSG_INIT_FAILED,
                "No init() found");
diff --git a/libglusterfs/src/xlator.h b/libglusterfs/src/xlator.h
index 12078bb72e3..9bdbd7f0e26 100644
--- a/libglusterfs/src/xlator.h
+++ b/libglusterfs/src/xlator.h
@@ -842,6 +842,15 @@ struct _xlator {
 
     /* Flag to understand how this xlator is categorized */
     gf_category_t category;
+
+    /* Variable to save fd_count for detach brick */
+    gf_atomic_t fd_cnt;
+
+    /* Variable to save xprt associated for detach brick */
+    gf_atomic_t xprtrefcnt;
+
+    /* Flag to notify got CHILD_DOWN event for detach brick */
+    uint32_t notify_down;
 };
 
 typedef struct {
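Note on the counters introduced above: fd_cnt and xprtrefcnt live on struct _xlator and are updated from flush callbacks and transport-destroy events that share no common lock, so they are gf_atomic_t. The decrement side depends on the atomic decrement returning the new value, so exactly one caller observes zero and triggers the final teardown. A rough stand-alone analogue using C11 atomics (illustrative only; GF_ATOMIC_* are libglusterfs macros and are not reproduced here):

    #include <stdatomic.h>
    #include <stdint.h>

    /* Illustrative stand-in for the gf_atomic_t counters added to
     * struct _xlator.  The dec-and-test shape is what lets the last
     * flush callback detect "no fds left" without holding a lock. */
    typedef _Atomic uint64_t counter_t;

    static inline void counter_add(counter_t *c, uint64_t n)
    {
        atomic_fetch_add(c, n);
    }

    static inline uint64_t counter_dec(counter_t *c)
    {
        /* atomic_fetch_sub returns the old value; subtracting one
         * yields the post-decrement value callers test against zero */
        return atomic_fetch_sub(c, 1) - 1;
    }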
               "idxwrker");      if (ret) { @@ -2427,7 +2437,7 @@ init(xlator_t *this)                 "Failed to create worker thread, aborting");          goto out;      } - +    priv->curr_count++;      ret = 0;  out:      GF_FREE(tmp); @@ -2545,6 +2555,11 @@ notify(xlator_t *this, int event, void *data, ...)  {      int ret = 0;      index_priv_t *priv = NULL; +    uint64_t stub_cnt = 0; +    xlator_t *victim = data; +    struct timespec sleep_till = { +        0, +    };      if (!this)          return 0; @@ -2553,6 +2568,39 @@ notify(xlator_t *this, int event, void *data, ...)      if (!priv)          return 0; +    if ((event == GF_EVENT_PARENT_DOWN) && victim->cleanup_starting) { +        stub_cnt = GF_ATOMIC_GET(priv->stub_cnt); +        clock_gettime(CLOCK_REALTIME, &sleep_till); +        sleep_till.tv_sec += 1; + +        /* Wait for draining stub from queue before notify PARENT_DOWN */ +        pthread_mutex_lock(&priv->mutex); +        { +            while (stub_cnt) { +                (void)pthread_cond_timedwait(&priv->cond, &priv->mutex, +                                             &sleep_till); +                stub_cnt = GF_ATOMIC_GET(priv->stub_cnt); +            } +        } +        pthread_mutex_unlock(&priv->mutex); +        gf_log(this->name, GF_LOG_INFO, +               "Notify GF_EVENT_PARENT_DOWN for brick %s", victim->name); +    } + +    if ((event == GF_EVENT_CHILD_DOWN) && victim->cleanup_starting) { +        pthread_mutex_lock(&priv->mutex); +        { +            priv->down = _gf_true; +            pthread_cond_broadcast(&priv->cond); +            while (priv->curr_count) +                pthread_cond_wait(&priv->cond, &priv->mutex); +        } +        pthread_mutex_unlock(&priv->mutex); + +        gf_log(this->name, GF_LOG_INFO, +               "Notify GF_EVENT_CHILD_DOWN for brick %s", victim->name); +    } +      ret = default_notify(this, event, data);      return ret;  } diff --git a/xlators/features/index/src/index.h b/xlators/features/index/src/index.h index f1e0293b794..149cfd415b3 100644 --- a/xlators/features/index/src/index.h +++ b/xlators/features/index/src/index.h @@ -58,6 +58,8 @@ typedef struct index_priv {      int64_t pending_count;      pthread_t thread;      gf_boolean_t down; +    gf_atomic_t stub_cnt; +    int32_t curr_count;  } index_priv_t;  typedef struct index_local { diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c index 78678adb859..eac834a6c33 100644 --- a/xlators/performance/io-threads/src/io-threads.c +++ b/xlators/performance/io-threads/src/io-threads.c @@ -123,7 +123,7 @@ __iot_dequeue(iot_conf_t *conf, int *pri)      if (!stub)          return NULL; -    conf->queue_size--; +    GF_ATOMIC_DEC(conf->queue_size);      conf->queue_sizes[*pri]--;      return stub; @@ -155,7 +155,7 @@ __iot_enqueue(iot_conf_t *conf, call_stub_t *stub, int pri)      }      list_add_tail(&stub->list, &ctx->reqs); -    conf->queue_size++; +    GF_ATOMIC_INC(conf->queue_size);      conf->queue_sizes[pri]++;  } @@ -183,7 +183,7 @@ iot_worker(void *data)                  conf->ac_iot_count[pri]--;                  pri = -1;              } -            while (conf->queue_size == 0) { +            while (GF_ATOMIC_GET(conf->queue_size) == 0) {                  if (conf->down) {                      bye = _gf_true; /*Avoid sleep*/                      break; @@ -837,9 +837,9 @@ __iot_workers_scale(iot_conf_t *conf)                                 thread_name);          if (ret == 0) {        
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c
index 78678adb859..eac834a6c33 100644
--- a/xlators/performance/io-threads/src/io-threads.c
+++ b/xlators/performance/io-threads/src/io-threads.c
@@ -123,7 +123,7 @@ __iot_dequeue(iot_conf_t *conf, int *pri)
     if (!stub)
         return NULL;
 
-    conf->queue_size--;
+    GF_ATOMIC_DEC(conf->queue_size);
     conf->queue_sizes[*pri]--;
 
     return stub;
@@ -155,7 +155,7 @@ __iot_enqueue(iot_conf_t *conf, call_stub_t *stub, int pri)
     }
 
     list_add_tail(&stub->list, &ctx->reqs);
-    conf->queue_size++;
+    GF_ATOMIC_INC(conf->queue_size);
     conf->queue_sizes[pri]++;
 }
 
@@ -183,7 +183,7 @@ iot_worker(void *data)
                 conf->ac_iot_count[pri]--;
                 pri = -1;
             }
-            while (conf->queue_size == 0) {
+            while (GF_ATOMIC_GET(conf->queue_size) == 0) {
                 if (conf->down) {
                     bye = _gf_true; /*Avoid sleep*/
                     break;
@@ -837,9 +837,9 @@ __iot_workers_scale(iot_conf_t *conf)
                                thread_name);
         if (ret == 0) {
             conf->curr_count++;
-            gf_msg_debug(conf->this->name, 0,
-                         "scaled threads to %d (queue_size=%d/%d)",
-                         conf->curr_count, conf->queue_size, scale);
+            gf_msg_debug(
+                conf->this->name, 0, "scaled threads to %d (queue_size=%ld/%d)",
+                conf->curr_count, GF_ATOMIC_GET(conf->queue_size), scale);
         } else {
             break;
         }
@@ -1231,6 +1231,7 @@ init(xlator_t *this)
     GF_OPTION_INIT("pass-through", this->pass_through, bool, out);
 
     conf->this = this;
+    GF_ATOMIC_INIT(conf->queue_size, 0);
 
     for (i = 0; i < GF_FOP_PRI_MAX; i++) {
         INIT_LIST_HEAD(&conf->clients[i]);
@@ -1280,9 +1281,43 @@ int
 notify(xlator_t *this, int32_t event, void *data, ...)
 {
     iot_conf_t *conf = this->private;
+    xlator_t *victim = data;
+    uint64_t queue_size = 0;
+    struct timespec sleep_till = {
+        0,
+    };
 
-    if (GF_EVENT_PARENT_DOWN == event)
-        iot_exit_threads(conf);
+    if (GF_EVENT_PARENT_DOWN == event) {
+        if (victim->cleanup_starting) {
+            clock_gettime(CLOCK_REALTIME, &sleep_till);
+            sleep_till.tv_sec += 1;
+            /* Wait for draining stub from queue before notify PARENT_DOWN */
+            queue_size = GF_ATOMIC_GET(conf->queue_size);
+
+            pthread_mutex_lock(&conf->mutex);
+            {
+                while (queue_size) {
+                    (void)pthread_cond_timedwait(&conf->cond, &conf->mutex,
+                                                 &sleep_till);
+                    queue_size = GF_ATOMIC_GET(conf->queue_size);
+                }
+            }
+            pthread_mutex_unlock(&conf->mutex);
+
+            gf_log(this->name, GF_LOG_INFO,
+                   "Notify GF_EVENT_PARENT_DOWN for brick %s", victim->name);
+        } else {
+            iot_exit_threads(conf);
+        }
+    }
+
+    if (GF_EVENT_CHILD_DOWN == event) {
+        if (victim->cleanup_starting) {
+            iot_exit_threads(conf);
+            gf_log(this->name, GF_LOG_INFO,
+                   "Notify GF_EVENT_CHILD_DOWN for brick %s", victim->name);
+        }
+    }
 
     default_notify(this, event, data);
diff --git a/xlators/performance/io-threads/src/io-threads.h b/xlators/performance/io-threads/src/io-threads.h
index 949d1d1b1e0..f80fd41c45d 100644
--- a/xlators/performance/io-threads/src/io-threads.h
+++ b/xlators/performance/io-threads/src/io-threads.h
@@ -63,7 +63,7 @@ struct iot_conf {
     int32_t ac_iot_limit[GF_FOP_PRI_MAX];
     int32_t ac_iot_count[GF_FOP_PRI_MAX];
     int queue_sizes[GF_FOP_PRI_MAX];
-    int queue_size;
+    gf_atomic_t queue_size;
     pthread_attr_t w_attr;
     gf_boolean_t least_priority; /*Enable/Disable least-priority */
diff --git a/xlators/protocol/server/src/server-handshake.c b/xlators/protocol/server/src/server-handshake.c
index 30e7e4b0478..9ba6f0b0a95 100644
--- a/xlators/protocol/server/src/server-handshake.c
+++ b/xlators/protocol/server/src/server-handshake.c
@@ -592,6 +592,7 @@ server_setvolume(rpcsvc_request_t *req)
         goto fail;
     }
 
+    pthread_mutex_lock(&conf->mutex);
     list_for_each_entry(tmp, &conf->child_status->status_list, status_list)
     {
         if (strcmp(tmp->name, name) == 0)
@@ -599,10 +600,8 @@ server_setvolume(rpcsvc_request_t *req)
     }
 
     if (!tmp->name) {
-        gf_msg(this->name, GF_LOG_ERROR, 0, PS_MSG_CHILD_STATUS_FAILED,
-               "No xlator %s is found in "
-               "child status list",
-               name);
+        gf_msg(this->name, GF_LOG_INFO, 0, PS_MSG_CHILD_STATUS_FAILED,
+               "No xlator %s is found in child status list", name);
     } else {
         ret = dict_set_int32(reply, "child_up", tmp->child_up);
         if (ret < 0)
@@ -610,7 +609,21 @@ server_setvolume(rpcsvc_request_t *req)
                    "Failed to set 'child_up' for xlator %s "
                    "in the reply dict",
                    tmp->name);
+        if (!tmp->child_up) {
+            ret = dict_set_str(reply, "ERROR",
+                               "Not received child_up for this xlator");
+            if (ret < 0)
+                gf_msg_debug(this->name, 0, "failed to set error msg");
+
+            gf_msg(this->name, GF_LOG_ERROR, 0, PS_MSG_CHILD_STATUS_FAILED,
+                   "Not received child_up for this xlator %s", name);
+            op_ret = -1;
+            op_errno = EAGAIN;
+            pthread_mutex_unlock(&conf->mutex);
+            goto fail;
+        }
     }
+    pthread_mutex_unlock(&conf->mutex);
 
     ret = dict_get_str(params, "process-uuid", &client_uid);
     if (ret < 0) {
@@ -798,8 +811,8 @@ server_setvolume(rpcsvc_request_t *req)
         req->trans->clnt_options = dict_ref(params);
 
         gf_msg(this->name, GF_LOG_INFO, 0, PS_MSG_CLIENT_ACCEPTED,
-               "accepted client from %s (version: %s)", client->client_uid,
-               (clnt_version) ? clnt_version : "old");
+               "accepted client from %s (version: %s) with subvol %s",
+               client->client_uid, (clnt_version) ? clnt_version : "old", name);
 
         gf_event(EVENT_CLIENT_CONNECT,
                  "client_uid=%s;"
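The handshake change above is the enforcement point: a brick that has not yet delivered CHILD_UP now rejects SETVOLUME with op_errno = EAGAIN instead of accepting a client it cannot serve. EAGAIN is the right errno because the client protocol treats the failed handshake as transient and reconnects. Purely illustrative shape of such a retry (the helper below is hypothetical; the real retry logic lives in the client protocol/RPC layers, not in this patch):

    #include <errno.h>
    #include <unistd.h>

    /* hypothetical helper: try_setvolume returns 0 or an errno value */
    int connect_with_retry(int (*try_setvolume)(void), unsigned attempts)
    {
        while (attempts--) {
            int err = try_setvolume();
            if (err == 0)
                return 0;     /* handshake accepted */
            if (err != EAGAIN)
                return err;   /* hard failure: give up */
            sleep(1);         /* brick not ready (no CHILD_UP yet) */
        }
        return EAGAIN;
    }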
diff --git a/xlators/protocol/server/src/server-helpers.c b/xlators/protocol/server/src/server-helpers.c
index cb2a73c7168..3e27998897e 100644
--- a/xlators/protocol/server/src/server-helpers.c
+++ b/xlators/protocol/server/src/server-helpers.c
@@ -233,16 +233,51 @@ server_connection_cleanup_flush_cbk(call_frame_t *frame, void *cookie,
     int32_t ret = -1;
     fd_t *fd = NULL;
     client_t *client = NULL;
+    uint64_t fd_cnt = 0;
+    xlator_t *victim = NULL;
+    server_conf_t *conf = NULL;
+    xlator_t *serv_xl = NULL;
+    rpc_transport_t *xprt = NULL;
+    rpc_transport_t *xp_next = NULL;
+    int32_t detach = (long)cookie;
+    gf_boolean_t xprt_found = _gf_false;
 
     GF_VALIDATE_OR_GOTO("server", this, out);
     GF_VALIDATE_OR_GOTO("server", frame, out);
 
     fd = frame->local;
     client = frame->root->client;
+    serv_xl = frame->this;
+    conf = serv_xl->private;
 
     fd_unref(fd);
     frame->local = NULL;
 
+    if (client)
+        victim = client->bound_xl;
+
+    if (victim) {
+        fd_cnt = GF_ATOMIC_DEC(victim->fd_cnt);
+        if (!fd_cnt && conf && detach) {
+            pthread_mutex_lock(&conf->mutex);
+            {
+                list_for_each_entry_safe(xprt, xp_next, &conf->xprt_list, list)
+                {
+                    if (!xprt->xl_private)
+                        continue;
+                    if (xprt->xl_private == client) {
+                        xprt_found = _gf_true;
+                        break;
+                    }
+                }
+            }
+            pthread_mutex_unlock(&conf->mutex);
+            if (xprt_found) {
+                rpc_transport_unref(xprt);
+            }
+        }
+    }
+
     gf_client_unref(client);
 
     STACK_DESTROY(frame->root);
@@ -253,7 +288,7 @@ out:
 
 static int
 do_fd_cleanup(xlator_t *this, client_t *client, fdentry_t *fdentries,
-              int fd_count)
+              int fd_count, int32_t detach)
 {
     fd_t *fd = NULL;
     int i = 0, ret = -1;
@@ -265,6 +300,7 @@ do_fd_cleanup(xlator_t *this, client_t *client, fdentry_t *fdentries,
     GF_VALIDATE_OR_GOTO("server", fdentries, out);
 
     bound_xl = client->bound_xl;
+
     for (i = 0; i < fd_count; i++) {
         fd = fdentries[i].fd;
 
@@ -294,8 +330,9 @@ do_fd_cleanup(xlator_t *this, client_t *client, fdentry_t *fdentries,
             tmp_frame->root->client = client;
             memset(&tmp_frame->root->lk_owner, 0, sizeof(gf_lkowner_t));
 
-            STACK_WIND(tmp_frame, server_connection_cleanup_flush_cbk, bound_xl,
-                       bound_xl->fops->flush, fd, NULL);
+            STACK_WIND_COOKIE(tmp_frame, server_connection_cleanup_flush_cbk,
+                              (void *)(long)detach, bound_xl,
+                              bound_xl->fops->flush, fd, NULL);
         }
     }
 
@@ -307,13 +344,19 @@ out:
 }
 
 int
-server_connection_cleanup(xlator_t *this, client_t *client, int32_t flags)
+server_connection_cleanup(xlator_t *this, client_t *client, int32_t flags,
+                          gf_boolean_t *fd_exist)
 {
     server_ctx_t *serv_ctx = NULL;
     fdentry_t *fdentries = NULL;
     uint32_t fd_count = 0;
     int cd_ret = 0;
     int ret = 0;
+    xlator_t *bound_xl = NULL;
+    int i = 0;
+    fd_t *fd = NULL;
+    uint64_t fd_cnt = 0;
+    int32_t detach = 0;
 
     GF_VALIDATE_OR_GOTO("server", this, out);
     GF_VALIDATE_OR_GOTO(this->name, client, out);
@@ -343,11 +386,34 @@ server_connection_cleanup(xlator_t *this, client_t *client, int32_t flags)
     }
 
     if (fdentries != NULL) {
+        /* Loop to configure fd_count on victim brick */
+        bound_xl = client->bound_xl;
+        if (bound_xl) {
+            for (i = 0; i < fd_count; i++) {
+                fd = fdentries[i].fd;
+                if (!fd)
+                    continue;
+                fd_cnt++;
+            }
+            if (fd_cnt) {
+                if (fd_exist)
+                    (*fd_exist) = _gf_true;
+                GF_ATOMIC_ADD(bound_xl->fd_cnt, fd_cnt);
+            }
+        }
+
+        /* If fd_exist is not NULL it means function is invoke
+           by server_rpc_notify at the time of getting DISCONNECT
+           notification
+        */
+        if (fd_exist)
+            detach = 1;
+
         gf_msg_debug(this->name, 0,
                      "Performing cleanup on %d "
                      "fdentries",
                      fd_count);
-        ret = do_fd_cleanup(this, client, fdentries, fd_count);
+        ret = do_fd_cleanup(this, client, fdentries, fd_count, detach);
     } else
         gf_msg(this->name, GF_LOG_INFO, 0, PS_MSG_FDENTRY_NULL,
                "no fdentries to clean");
diff --git a/xlators/protocol/server/src/server-helpers.h b/xlators/protocol/server/src/server-helpers.h
index 20b8d901bd2..9f2e1546831 100644
--- a/xlators/protocol/server/src/server-helpers.h
+++ b/xlators/protocol/server/src/server-helpers.h
@@ -43,8 +43,8 @@ call_frame_t *
 get_frame_from_request(rpcsvc_request_t *req);
 
 int
-server_connection_cleanup(xlator_t *this, struct _client *client,
-                          int32_t flags);
+server_connection_cleanup(xlator_t *this, struct _client *client, int32_t flags,
+                          gf_boolean_t *fd_exist);
 
 int
 server_build_config(xlator_t *this, server_conf_t *conf);
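do_fd_cleanup() hands the detach flag to the flush callback by packing it into the cookie argument of STACK_WIND_COOKIE: the integer travels through the void * itself, so no per-call context needs to be allocated, and the callback recovers the value (never an address) with the mirror cast. The idiom in isolation (names here are illustrative, not the server xlator's API):

    #include <stdio.h>

    typedef void (*cbk_fn_t)(void *cookie);

    void flush_cbk(void *cookie)
    {
        int detach = (int)(long)cookie; /* unpack: void * -> long -> int */
        printf("detach=%d\n", detach);
    }

    void wind(cbk_fn_t cbk, int detach)
    {
        cbk((void *)(long)detach); /* pack: int -> long -> void * */
    }

    int main(void)
    {
        wind(flush_cbk, 1);
        return 0;
    }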
diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c
index b9879cdf148..e1ec5512510 100644
--- a/xlators/protocol/server/src/server.c
+++ b/xlators/protocol/server/src/server.c
@@ -133,7 +133,7 @@ server_submit_reply(call_frame_t *frame, rpcsvc_request_t *req, void *arg,
                          "Reply submission failed");
         if (frame && client) {
             server_connection_cleanup(frame->this, client,
-                                      INTERNAL_LOCKS | POSIX_LOCKS);
+                                      INTERNAL_LOCKS | POSIX_LOCKS, NULL);
         } else {
             gf_msg_callingfn("", GF_LOG_ERROR, 0, PS_MSG_REPLY_SUBMIT_FAILED,
                              "Reply submission failed");
@@ -392,6 +392,36 @@ out:
     return error;
 }
 
+void
+server_call_xlator_mem_cleanup(xlator_t *this, char *victim_name)
+{
+    pthread_t th_id = {
+        0,
+    };
+    int th_ret = -1;
+    server_cleanup_xprt_arg_t *arg = NULL;
+
+    if (!victim_name)
+        return;
+
+    gf_log(this->name, GF_LOG_INFO, "Create graph janitor thread for brick %s",
+           victim_name);
+
+    arg = calloc(1, sizeof(*arg));
+    arg->this = this;
+    arg->victim_name = gf_strdup(victim_name);
+    th_ret = gf_thread_create_detached(&th_id, server_graph_janitor_threads,
+                                       arg, "graphjanitor");
+    if (th_ret) {
+        gf_log(this->name, GF_LOG_ERROR,
+               "graph janitor Thread"
+               " creation is failed for brick %s",
+               victim_name);
+        GF_FREE(arg->victim_name);
+        free(arg);
+    }
+}
+
 int
 server_rpc_notify(rpcsvc_t *rpc, void *xl, rpcsvc_event_t event, void *data)
 {
@@ -402,14 +432,9 @@ server_rpc_notify(rpcsvc_t *rpc, void *xl, rpcsvc_event_t event, void *data)
     client_t *client = NULL;
     char *auth_path = NULL;
     int ret = -1;
-    gf_boolean_t victim_found = _gf_false;
     char *xlator_name = NULL;
-    glusterfs_ctx_t *ctx = NULL;
-    xlator_t *top = NULL;
-    xlator_list_t **trav_p = NULL;
-    xlator_t *travxl = NULL;
     uint64_t xprtrefcount = 0;
-    struct _child_status *tmp = NULL;
+    gf_boolean_t fd_exist = _gf_false;
 
     if (!xl || !data) {
         gf_msg_callingfn("server", GF_LOG_WARNING, 0, PS_MSG_RPC_NOTIFY_ERROR,
@@ -420,7 +445,6 @@ server_rpc_notify(rpcsvc_t *rpc, void *xl, rpcsvc_event_t event, void *data)
     this = xl;
     trans = data;
     conf = this->private;
-    ctx = this->ctx;
 
     switch (event) {
         case RPCSVC_EVENT_ACCEPT: {
@@ -457,7 +481,8 @@ server_rpc_notify(rpcsvc_t *rpc, void *xl, rpcsvc_event_t event, void *data)
              */
             pthread_mutex_lock(&conf->mutex);
             client = trans->xl_private;
-            list_del_init(&trans->list);
+            if (!client)
+                list_del_init(&trans->list);
             pthread_mutex_unlock(&conf->mutex);
 
             if (!client)
@@ -478,8 +503,8 @@ server_rpc_notify(rpcsvc_t *rpc, void *xl, rpcsvc_event_t event, void *data)
             gf_client_ref(client);
             gf_client_put(client, &detached);
             if (detached) {
-                server_connection_cleanup(this, client,
-                                          INTERNAL_LOCKS | POSIX_LOCKS);
+                server_connection_cleanup(
+                    this, client, INTERNAL_LOCKS | POSIX_LOCKS, &fd_exist);
                 gf_event(EVENT_CLIENT_DISCONNECT,
                          "client_uid=%s;"
                          "client_identifier=%s;server_identifier=%s;"
@@ -496,53 +521,36 @@ server_rpc_notify(rpcsvc_t *rpc, void *xl, rpcsvc_event_t event, void *data)
 
         unref_transport:
             /* rpc_transport_unref() causes a RPCSVC_EVENT_TRANSPORT_DESTROY
              * to be called in blocking manner
-             * So no code should ideally be after this unref
+             * So no code should ideally be after this unref, Call
+             * rpc_transport_unref only while cleanup_starting flag is not set
+             * otherwise transport_unref will be call by either
+             * server_connection_cleanup_flush_cbk or server_submit_reply at the
+             * time of freeing state
              */
-            rpc_transport_unref(trans);
+            if (!client || !detached || !fd_exist)
+                rpc_transport_unref(trans);
 
             break;
 
         case RPCSVC_EVENT_TRANSPORT_DESTROY:
+            pthread_mutex_lock(&conf->mutex);
             client = trans->xl_private;
+            list_del_init(&trans->list);
+            pthread_mutex_unlock(&conf->mutex);
             if (!client)
                 break;
 
-            pthread_mutex_lock(&conf->mutex);
-            list_for_each_entry(tmp, &conf->child_status->status_list,
-                                status_list)
-            {
-                if (tmp->name && client->bound_xl &&
-                    client->bound_xl->cleanup_starting &&
-                    !strcmp(tmp->name, client->bound_xl->name)) {
-                    xprtrefcount = GF_ATOMIC_GET(tmp->xprtrefcnt);
-                    if (xprtrefcount > 0) {
-                        xprtrefcount = GF_ATOMIC_DEC(tmp->xprtrefcnt);
-                        if (xprtrefcount == 0)
-                            xlator_name = gf_strdup(client->bound_xl->name);
-                    }
-                    break;
+
+            if (client->bound_xl && client->bound_xl->cleanup_starting) {
+                xprtrefcount = GF_ATOMIC_GET(client->bound_xl->xprtrefcnt);
+                if (xprtrefcount > 0) {
+                    xprtrefcount = GF_ATOMIC_DEC(client->bound_xl->xprtrefcnt);
+                    if (xprtrefcount == 0)
+                        xlator_name = gf_strdup(client->bound_xl->name);
                 }
             }
-            pthread_mutex_unlock(&conf->mutex);
 
             gf_client_unref(client);
 
             if (xlator_name) {
-                if (this->ctx->active) {
-                    top = this->ctx->active->first;
-                    LOCK(&ctx->volfile_lock);
-                    for (trav_p = &top->children; *trav_p;
-                         trav_p = &(*trav_p)->next) {
-                        travxl = (*trav_p)->xlator;
-                        if (!travxl->call_cleanup &&
-                            strcmp(travxl->name, xlator_name) == 0) {
-                            victim_found = _gf_true;
-                            break;
-                        }
-                    }
-                    UNLOCK(&ctx->volfile_lock);
-                    if (victim_found) {
-                        xlator_mem_cleanup(travxl);
-                        rpcsvc_autoscale_threads(ctx, conf->rpc, -1);
-                    }
-                }
+                server_call_xlator_mem_cleanup(this, xlator_name);
                 GF_FREE(xlator_name);
             }
@@ -556,6 +564,67 @@ out:
     return 0;
 }
 
+void *
+server_graph_janitor_threads(void *data)
+{
+    xlator_t *victim = NULL;
+    xlator_t *this = NULL;
+    server_conf_t *conf = NULL;
+    glusterfs_ctx_t *ctx = NULL;
+    char *victim_name = NULL;
+    server_cleanup_xprt_arg_t *arg = NULL;
+    gf_boolean_t victim_found = _gf_false;
+    xlator_list_t **trav_p = NULL;
+    xlator_t *top = NULL;
+
+    GF_ASSERT(data);
+
+    arg = data;
+    this = arg->this;
+    victim_name = arg->victim_name;
+    THIS = arg->this;
+    conf = this->private;
+
+    ctx = THIS->ctx;
+    GF_VALIDATE_OR_GOTO(this->name, ctx, out);
+
+    top = this->ctx->active->first;
+    LOCK(&ctx->volfile_lock);
+    for (trav_p = &top->children; *trav_p; trav_p = &(*trav_p)->next) {
+        victim = (*trav_p)->xlator;
+        if (victim->cleanup_starting &&
+            strcmp(victim->name, victim_name) == 0) {
+            victim_found = _gf_true;
+            break;
+        }
+    }
+    if (victim_found)
+        glusterfs_delete_volfile_checksum(ctx, victim->volfile_id);
+    UNLOCK(&ctx->volfile_lock);
+    if (!victim_found) {
+        gf_log(this->name, GF_LOG_ERROR,
+               "victim brick %s is not"
+               " found in graph",
+               victim_name);
+        goto out;
+    }
+
+    default_notify(victim, GF_EVENT_PARENT_DOWN, victim);
+    if (victim->notify_down) {
+        gf_log(THIS->name, GF_LOG_INFO,
+               "Start call fini for brick"
+               " %s stack",
+               victim->name);
+        xlator_mem_cleanup(victim);
+        rpcsvc_autoscale_threads(ctx, conf->rpc, -1);
+    }
+
+out:
+    GF_FREE(arg->victim_name);
+    free(arg);
+    return NULL;
+}
+
 int32_t
 server_mem_acct_init(xlator_t *this)
 {
@@ -972,13 +1041,7 @@ server_init(xlator_t *this)
     conf->child_status = GF_CALLOC(1, sizeof(struct _child_status),
                                    gf_server_mt_child_status);
     INIT_LIST_HEAD(&conf->child_status->status_list);
-    GF_ATOMIC_INIT(conf->child_status->xprtrefcnt, 0);
 
-    /*ret = dict_get_str (this->options, "statedump-path", &statedump_path);
-    if (!ret) {
-            gf_path_strip_trailing_slashes (statedump_path);
-            this->ctx->statedump_path = statedump_path;
-    }*/
     GF_OPTION_INIT("statedump-path", statedump_path, path, out);
     if (statedump_path) {
         gf_path_strip_trailing_slashes(statedump_path);
@@ -1484,6 +1547,12 @@ server_notify(xlator_t *this, int32_t event, void *data, ...)
         }
 
         case GF_EVENT_CHILD_DOWN: {
+            if (victim->cleanup_starting) {
+                victim->notify_down = 1;
+                gf_log(this->name, GF_LOG_INFO,
+                       "Getting CHILD_DOWN event for brick %s", victim->name);
+            }
+
             ret = server_process_child_event(this, event, data,
                                              GF_CBK_CHILD_DOWN);
             if (ret) {
@@ -1517,7 +1586,7 @@ server_notify(xlator_t *this, int32_t event, void *data, ...)
             {
                 if (strcmp(tmp->name, victim->name) == 0) {
                     tmp->child_up = _gf_false;
-                    GF_ATOMIC_INIT(tmp->xprtrefcnt, totxprt);
+                    GF_ATOMIC_INIT(victim->xprtrefcnt, totxprt);
                     break;
                 }
             }
@@ -1561,8 +1630,7 @@ server_notify(xlator_t *this, int32_t event, void *data, ...)
                 rpc_clnt_mgmt_pmap_signout(ctx, victim->name);
 
                 if (!xprt_found && victim_found) {
-                    xlator_mem_cleanup(victim);
-                    rpcsvc_autoscale_threads(ctx, conf->rpc, -1);
+                    server_call_xlator_mem_cleanup(this, victim->name);
                 }
             }
             break;
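server_call_xlator_mem_cleanup() moves the final brick teardown off the RPC event path into a detached "graphjanitor" thread that owns its argument block and frees it on every exit path. Note the deliberate asymmetry in the patch: arg is allocated with plain calloc() and released with free(), while victim_name goes through gf_strdup()/GF_FREE(), so each allocation is released by its matching allocator. The hand-off pattern with plain pthreads (gf_thread_create_detached is the glusterfs wrapper around the same idea; this is a minimal sketch, not the server's code):

    #include <pthread.h>
    #include <stdlib.h>
    #include <string.h>

    struct cleanup_arg {
        char *victim_name;
    };

    static void *janitor(void *data)
    {
        struct cleanup_arg *arg = data;
        /* ... tear down state keyed by arg->victim_name ... */
        free(arg->victim_name); /* thread owns arg once created */
        free(arg);
        return NULL;
    }

    int spawn_janitor(const char *victim_name)
    {
        pthread_t tid;
        pthread_attr_t attr;
        struct cleanup_arg *arg = calloc(1, sizeof(*arg));

        if (!arg || !(arg->victim_name = strdup(victim_name)))
            goto err;

        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
        if (pthread_create(&tid, &attr, janitor, arg) != 0) {
            pthread_attr_destroy(&attr);
            goto err; /* creation failed: spawner still owns arg */
        }
        pthread_attr_destroy(&attr);
        return 0;

    err:
        if (arg)
            free(arg->victim_name);
        free(arg);
        return -1;
    }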
diff --git a/xlators/protocol/server/src/server.h b/xlators/protocol/server/src/server.h
index e6064af076e..a0e0a960c7c 100644
--- a/xlators/protocol/server/src/server.h
+++ b/xlators/protocol/server/src/server.h
@@ -100,7 +100,6 @@ struct _child_status {
     struct list_head status_list;
     char *name;
     gf_boolean_t child_up;
-    gf_atomic_t xprtrefcnt;
 };
 struct server_conf {
     rpcsvc_t *rpc;
@@ -241,6 +240,11 @@ typedef struct _server_ctx {
     fdtable_t *fdtable;
 } server_ctx_t;
 
+typedef struct server_cleanup_xprt_arg {
+    xlator_t *this;
+    char *victim_name;
+} server_cleanup_xprt_arg_t;
+
 int
 server_submit_reply(call_frame_t *frame, rpcsvc_request_t *req, void *arg,
                     struct iovec *payload, int payloadcount,
@@ -254,6 +258,9 @@ gf_server_check_getxattr_cmd(call_frame_t *frame, const char *name);
 void
 forget_inode_if_no_dentry(inode_t *inode);
 
+void *
+server_graph_janitor_threads(void *);
+
 server_ctx_t *
 server_ctx_get(client_t *client, xlator_t *xlator);
 #endif /* !_SERVER_H */
diff --git a/xlators/storage/posix/src/posix-common.c b/xlators/storage/posix/src/posix-common.c
index 9c9d52e3609..03cb532e2c5 100644
--- a/xlators/storage/posix/src/posix-common.c
+++ b/xlators/storage/posix/src/posix-common.c
@@ -143,11 +143,21 @@ posix_inode(xlator_t *this)
 int32_t
 posix_notify(xlator_t *this, int32_t event, void *data, ...)
 {
+    xlator_t *victim = data;
+
     switch (event) {
         case GF_EVENT_PARENT_UP: {
             /* Tell the parent that posix xlator is up */
             default_notify(this, GF_EVENT_CHILD_UP, data);
         } break;
+
+        case GF_EVENT_PARENT_DOWN: {
+            if (!victim->cleanup_starting)
+                break;
+            gf_log(this->name, GF_LOG_INFO, "Sending CHILD_DOWN for brick %s",
+                   victim->name);
+            default_notify(this->parents->xlator, GF_EVENT_CHILD_DOWN, data);
+        } break;
         default:
             /* */
            break;
