diff options
Diffstat (limited to 'xlators/features/changelog/src/changelog.c')
-rw-r--r-- | xlators/features/changelog/src/changelog.c | 149 |
1 files changed, 109 insertions, 40 deletions
diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c index 0a03cfa3673..75d5ae267fd 100644 --- a/xlators/features/changelog/src/changelog.c +++ b/xlators/features/changelog/src/changelog.c @@ -34,6 +34,12 @@ static struct changelog_bootstrap cb_bootstrap[] = { }, }; +static int +changelog_init_rpc(xlator_t *this, changelog_priv_t *priv); + +static int +changelog_init(xlator_t *this, changelog_priv_t *priv); + /* Entry operations - TYPE III */ /** @@ -1997,6 +2003,11 @@ notify(xlator_t *this, int event, void *data, ...) uint64_t clntcnt = 0; changelog_clnt_t *conn = NULL; gf_boolean_t cleanup_notify = _gf_false; + char sockfile[UNIX_PATH_MAX] = { + 0, + }; + rpcsvc_listener_t *listener = NULL; + rpcsvc_listener_t *next = NULL; INIT_LIST_HEAD(&queue); @@ -2010,23 +2021,40 @@ notify(xlator_t *this, int event, void *data, ...) "cleanup changelog rpc connection of brick %s", priv->victim->name); - this->cleanup_starting = 1; - changelog_destroy_rpc_listner(this, priv); - conn = &priv->connections; - if (conn) - changelog_ev_cleanup_connections(this, conn); - xprtcnt = GF_ATOMIC_GET(priv->xprtcnt); - clntcnt = GF_ATOMIC_GET(priv->clntcnt); - - if (!xprtcnt && !clntcnt) { - LOCK(&priv->lock); - { - cleanup_notify = priv->notify_down; - priv->notify_down = _gf_true; + if (priv->rpc_active) { + this->cleanup_starting = 1; + changelog_destroy_rpc_listner(this, priv); + conn = &priv->connections; + if (conn) + changelog_ev_cleanup_connections(this, conn); + xprtcnt = GF_ATOMIC_GET(priv->xprtcnt); + clntcnt = GF_ATOMIC_GET(priv->clntcnt); + if (!xprtcnt && !clntcnt) { + LOCK(&priv->lock); + { + cleanup_notify = priv->notify_down; + priv->notify_down = _gf_true; + } + UNLOCK(&priv->lock); + list_for_each_entry_safe(listener, next, &priv->rpc->listeners, + list) + { + if (listener->trans) { + rpc_transport_unref(listener->trans); + } + } + CHANGELOG_MAKE_SOCKET_PATH(priv->changelog_brick, sockfile, + UNIX_PATH_MAX); + sys_unlink(sockfile); + if (priv->rpc) { + rpcsvc_destroy(priv->rpc); + priv->rpc = NULL; + } + if (!cleanup_notify) + default_notify(this, GF_EVENT_PARENT_DOWN, data); } - UNLOCK(&priv->lock); - if (!cleanup_notify) - default_notify(this, GF_EVENT_PARENT_DOWN, data); + } else { + default_notify(this, GF_EVENT_PARENT_DOWN, data); } goto out; } @@ -2405,6 +2433,22 @@ changelog_barrier_pthread_destroy(changelog_priv_t *priv) LOCK_DESTROY(&priv->bflags.lock); } +static void +changelog_cleanup_rpc(xlator_t *this, changelog_priv_t *priv) +{ + /* terminate rpc server */ + if (!this->cleanup_starting) + changelog_destroy_rpc_listner(this, priv); + + (void)changelog_cleanup_rpc_threads(this, priv); + /* cleanup rot buffs */ + rbuf_dtor(priv->rbuf); + + /* cleanup poller thread */ + if (priv->poller) + (void)changelog_thread_cleanup(this, priv->poller); +} + int reconfigure(xlator_t *this, dict_t *options) { @@ -2413,6 +2457,9 @@ reconfigure(xlator_t *this, dict_t *options) changelog_priv_t *priv = NULL; gf_boolean_t active_earlier = _gf_true; gf_boolean_t active_now = _gf_true; + gf_boolean_t rpc_active_earlier = _gf_true; + gf_boolean_t rpc_active_now = _gf_true; + gf_boolean_t iniate_rpc = _gf_false; changelog_time_slice_t *slice = NULL; changelog_log_data_t cld = { 0, @@ -2434,6 +2481,7 @@ reconfigure(xlator_t *this, dict_t *options) ret = -1; active_earlier = priv->active; + rpc_active_earlier = priv->rpc_active; /* first stop the rollover and the fsync thread */ changelog_cleanup_helper_threads(this, priv); @@ -2467,6 +2515,29 @@ reconfigure(xlator_t *this, dict_t *options) goto out; GF_OPTION_RECONF("changelog", active_now, options, bool, out); + GF_OPTION_RECONF("changelog-notification", rpc_active_now, options, bool, + out); + + /* If journalling is enabled, enable rpc notifications */ + if (active_now && !active_earlier) { + if (!rpc_active_earlier) + iniate_rpc = _gf_true; + } + + if (rpc_active_now && !rpc_active_earlier) { + iniate_rpc = _gf_true; + } + + /* TODO: Disable of changelog-notifications is not supported for now + * as there is no clean way of cleaning up of rpc resources + */ + + if (iniate_rpc) { + ret = changelog_init_rpc(this, priv); + if (ret) + goto out; + priv->rpc_active = _gf_true; + } /** * changelog_handle_change() handles changes that could possibly @@ -2597,6 +2668,7 @@ changelog_init_options(xlator_t *this, changelog_priv_t *priv) goto dealloc_2; GF_OPTION_INIT("changelog", priv->active, bool, dealloc_2); + GF_OPTION_INIT("changelog-notification", priv->rpc_active, bool, dealloc_2); GF_OPTION_INIT("capture-del-path", priv->capture_del_path, bool, dealloc_2); GF_OPTION_INIT("op-mode", tmp, str, dealloc_2); @@ -2635,22 +2707,6 @@ error_return: return -1; } -static void -changelog_cleanup_rpc(xlator_t *this, changelog_priv_t *priv) -{ - /* terminate rpc server */ - if (!this->cleanup_starting) - changelog_destroy_rpc_listner(this, priv); - - (void)changelog_cleanup_rpc_threads(this, priv); - /* cleanup rot buffs */ - rbuf_dtor(priv->rbuf); - - /* cleanup poller thread */ - if (priv->poller) - (void)changelog_thread_cleanup(this, priv->poller); -} - static int changelog_init_rpc(xlator_t *this, changelog_priv_t *priv) { @@ -2747,10 +2803,13 @@ init(xlator_t *this) INIT_LIST_HEAD(&priv->queue); priv->barrier_enabled = _gf_false; - /* RPC ball rolling.. */ - ret = changelog_init_rpc(this, priv); - if (ret) - goto cleanup_barrier; + if (priv->rpc_active || priv->active) { + /* RPC ball rolling.. */ + ret = changelog_init_rpc(this, priv); + if (ret) + goto cleanup_barrier; + priv->rpc_active = _gf_true; + } ret = changelog_init(this, priv); if (ret) @@ -2762,7 +2821,9 @@ init(xlator_t *this) return 0; cleanup_rpc: - changelog_cleanup_rpc(this, priv); + if (priv->rpc_active) { + changelog_cleanup_rpc(this, priv); + } cleanup_barrier: changelog_barrier_pthread_destroy(priv); cleanup_options: @@ -2788,9 +2849,10 @@ fini(xlator_t *this) priv = this->private; if (priv) { - /* terminate RPC server/threads */ - changelog_cleanup_rpc(this, priv); - + if (priv->active || priv->rpc_active) { + /* terminate RPC server/threads */ + changelog_cleanup_rpc(this, priv); + } /* call barrier_disable to cancel timer */ if (priv->barrier_enabled) __chlog_barrier_disable(this, &queue); @@ -2859,6 +2921,13 @@ struct volume_options options[] = { .flags = OPT_FLAG_SETTABLE, .level = OPT_STATUS_BASIC, .tags = {"journal", "georep", "glusterfind"}}, + {.key = {"changelog-notification"}, + .type = GF_OPTION_TYPE_BOOL, + .default_value = "off", + .description = "enable/disable changelog live notification", + .op_version = {3}, + .level = OPT_STATUS_BASIC, + .tags = {"bitrot", "georep"}}, {.key = {"changelog-brick"}, .type = GF_OPTION_TYPE_PATH, .description = "brick path to generate unique socket file name." |