diff options
Diffstat (limited to 'xlators/features/changelog/src/changelog.c')
-rw-r--r-- | xlators/features/changelog/src/changelog.c | 183 |
1 files changed, 114 insertions, 69 deletions
diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c index 0a03cfa3673..6a6e5af859e 100644 --- a/xlators/features/changelog/src/changelog.c +++ b/xlators/features/changelog/src/changelog.c @@ -34,6 +34,12 @@ static struct changelog_bootstrap cb_bootstrap[] = { }, }; +static int +changelog_init_rpc(xlator_t *this, changelog_priv_t *priv); + +static int +changelog_init(xlator_t *this, changelog_priv_t *priv); + /* Entry operations - TYPE III */ /** @@ -1997,6 +2003,11 @@ notify(xlator_t *this, int event, void *data, ...) uint64_t clntcnt = 0; changelog_clnt_t *conn = NULL; gf_boolean_t cleanup_notify = _gf_false; + char sockfile[UNIX_PATH_MAX] = { + 0, + }; + rpcsvc_listener_t *listener = NULL; + rpcsvc_listener_t *next = NULL; INIT_LIST_HEAD(&queue); @@ -2010,23 +2021,40 @@ notify(xlator_t *this, int event, void *data, ...) "cleanup changelog rpc connection of brick %s", priv->victim->name); - this->cleanup_starting = 1; - changelog_destroy_rpc_listner(this, priv); - conn = &priv->connections; - if (conn) - changelog_ev_cleanup_connections(this, conn); - xprtcnt = GF_ATOMIC_GET(priv->xprtcnt); - clntcnt = GF_ATOMIC_GET(priv->clntcnt); - - if (!xprtcnt && !clntcnt) { - LOCK(&priv->lock); - { - cleanup_notify = priv->notify_down; - priv->notify_down = _gf_true; + if (priv->rpc_active) { + this->cleanup_starting = 1; + changelog_destroy_rpc_listner(this, priv); + conn = &priv->connections; + if (conn) + changelog_ev_cleanup_connections(this, conn); + xprtcnt = GF_ATOMIC_GET(priv->xprtcnt); + clntcnt = GF_ATOMIC_GET(priv->clntcnt); + if (!xprtcnt && !clntcnt) { + LOCK(&priv->lock); + { + cleanup_notify = priv->notify_down; + priv->notify_down = _gf_true; + } + UNLOCK(&priv->lock); + if (priv->rpc) { + list_for_each_entry_safe(listener, next, + &priv->rpc->listeners, list) + { + if (listener->trans) { + rpc_transport_unref(listener->trans); + } + } + rpcsvc_destroy(priv->rpc); + priv->rpc = NULL; + } + CHANGELOG_MAKE_SOCKET_PATH(priv->changelog_brick, sockfile, + UNIX_PATH_MAX); + sys_unlink(sockfile); + if (!cleanup_notify) + default_notify(this, GF_EVENT_PARENT_DOWN, data); } - UNLOCK(&priv->lock); - if (!cleanup_notify) - default_notify(this, GF_EVENT_PARENT_DOWN, data); + } else { + default_notify(this, GF_EVENT_PARENT_DOWN, data); } goto out; } @@ -2224,23 +2252,11 @@ static int changelog_init(xlator_t *this, changelog_priv_t *priv) { int i = 0; - int ret = -1; - struct timeval tv = { - 0, - }; + int ret = 0; changelog_log_data_t cld = { 0, }; - ret = gettimeofday(&tv, NULL); - if (ret) { - gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_GET_TIME_FAILURE, - NULL); - goto out; - } - - priv->slice.tv_start = tv; - priv->maps[CHANGELOG_TYPE_DATA] = "D "; priv->maps[CHANGELOG_TYPE_METADATA] = "M "; priv->maps[CHANGELOG_TYPE_METADATA_XATTR] = "M "; @@ -2259,9 +2275,7 @@ changelog_init(xlator_t *this, changelog_priv_t *priv) * in case there was an encoding change. so... things are kept * simple here. */ - ret = changelog_fill_rollover_data(&cld, _gf_false); - if (ret) - goto out; + changelog_fill_rollover_data(&cld, _gf_false); ret = htime_open(this, priv, cld.cld_roll_time); /* call htime open with cld's rollover_time */ @@ -2405,6 +2419,22 @@ changelog_barrier_pthread_destroy(changelog_priv_t *priv) LOCK_DESTROY(&priv->bflags.lock); } +static void +changelog_cleanup_rpc(xlator_t *this, changelog_priv_t *priv) +{ + /* terminate rpc server */ + if (!this->cleanup_starting) + changelog_destroy_rpc_listner(this, priv); + + (void)changelog_cleanup_rpc_threads(this, priv); + /* cleanup rot buffs */ + rbuf_dtor(priv->rbuf); + + /* cleanup poller thread */ + if (priv->poller) + (void)changelog_thread_cleanup(this, priv->poller); +} + int reconfigure(xlator_t *this, dict_t *options) { @@ -2413,6 +2443,9 @@ reconfigure(xlator_t *this, dict_t *options) changelog_priv_t *priv = NULL; gf_boolean_t active_earlier = _gf_true; gf_boolean_t active_now = _gf_true; + gf_boolean_t rpc_active_earlier = _gf_true; + gf_boolean_t rpc_active_now = _gf_true; + gf_boolean_t iniate_rpc = _gf_false; changelog_time_slice_t *slice = NULL; changelog_log_data_t cld = { 0, @@ -2423,9 +2456,6 @@ reconfigure(xlator_t *this, dict_t *options) char csnap_dir[PATH_MAX] = { 0, }; - struct timeval tv = { - 0, - }; uint32_t timeout = 0; priv = this->private; @@ -2434,6 +2464,7 @@ reconfigure(xlator_t *this, dict_t *options) ret = -1; active_earlier = priv->active; + rpc_active_earlier = priv->rpc_active; /* first stop the rollover and the fsync thread */ changelog_cleanup_helper_threads(this, priv); @@ -2467,6 +2498,29 @@ reconfigure(xlator_t *this, dict_t *options) goto out; GF_OPTION_RECONF("changelog", active_now, options, bool, out); + GF_OPTION_RECONF("changelog-notification", rpc_active_now, options, bool, + out); + + /* If journalling is enabled, enable rpc notifications */ + if (active_now && !active_earlier) { + if (!rpc_active_earlier) + iniate_rpc = _gf_true; + } + + if (rpc_active_now && !rpc_active_earlier) { + iniate_rpc = _gf_true; + } + + /* TODO: Disable of changelog-notifications is not supported for now + * as there is no clean way of cleaning up of rpc resources + */ + + if (iniate_rpc) { + ret = changelog_init_rpc(this, priv); + if (ret) + goto out; + priv->rpc_active = _gf_true; + } /** * changelog_handle_change() handles changes that could possibly @@ -2493,9 +2547,7 @@ reconfigure(xlator_t *this, dict_t *options) out); if (active_now || active_earlier) { - ret = changelog_fill_rollover_data(&cld, !active_now); - if (ret) - goto out; + changelog_fill_rollover_data(&cld, !active_now); slice = &priv->slice; @@ -2514,13 +2566,7 @@ reconfigure(xlator_t *this, dict_t *options) if (!active_earlier) { gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_RECONFIGURE, NULL); - if (gettimeofday(&tv, NULL)) { - gf_smsg(this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_HTIME_FETCH_FAILED, NULL); - ret = -1; - goto out; - } - htime_create(this, priv, tv.tv_sec); + htime_create(this, priv, gf_time()); } ret = changelog_spawn_helper_threads(this, priv); } @@ -2597,6 +2643,7 @@ changelog_init_options(xlator_t *this, changelog_priv_t *priv) goto dealloc_2; GF_OPTION_INIT("changelog", priv->active, bool, dealloc_2); + GF_OPTION_INIT("changelog-notification", priv->rpc_active, bool, dealloc_2); GF_OPTION_INIT("capture-del-path", priv->capture_del_path, bool, dealloc_2); GF_OPTION_INIT("op-mode", tmp, str, dealloc_2); @@ -2635,22 +2682,6 @@ error_return: return -1; } -static void -changelog_cleanup_rpc(xlator_t *this, changelog_priv_t *priv) -{ - /* terminate rpc server */ - if (!this->cleanup_starting) - changelog_destroy_rpc_listner(this, priv); - - (void)changelog_cleanup_rpc_threads(this, priv); - /* cleanup rot buffs */ - rbuf_dtor(priv->rbuf); - - /* cleanup poller thread */ - if (priv->poller) - (void)changelog_thread_cleanup(this, priv->poller); -} - static int changelog_init_rpc(xlator_t *this, changelog_priv_t *priv) { @@ -2747,10 +2778,13 @@ init(xlator_t *this) INIT_LIST_HEAD(&priv->queue); priv->barrier_enabled = _gf_false; - /* RPC ball rolling.. */ - ret = changelog_init_rpc(this, priv); - if (ret) - goto cleanup_barrier; + if (priv->rpc_active || priv->active) { + /* RPC ball rolling.. */ + ret = changelog_init_rpc(this, priv); + if (ret) + goto cleanup_barrier; + priv->rpc_active = _gf_true; + } ret = changelog_init(this, priv); if (ret) @@ -2762,7 +2796,9 @@ init(xlator_t *this) return 0; cleanup_rpc: - changelog_cleanup_rpc(this, priv); + if (priv->rpc_active) { + changelog_cleanup_rpc(this, priv); + } cleanup_barrier: changelog_barrier_pthread_destroy(priv); cleanup_options: @@ -2788,9 +2824,11 @@ fini(xlator_t *this) priv = this->private; if (priv) { - /* terminate RPC server/threads */ - changelog_cleanup_rpc(this, priv); - + if (priv->active || priv->rpc_active) { + /* terminate RPC server/threads */ + changelog_cleanup_rpc(this, priv); + GF_FREE(priv->ev_dispatcher); + } /* call barrier_disable to cancel timer */ if (priv->barrier_enabled) __chlog_barrier_disable(this, &queue); @@ -2859,6 +2897,13 @@ struct volume_options options[] = { .flags = OPT_FLAG_SETTABLE, .level = OPT_STATUS_BASIC, .tags = {"journal", "georep", "glusterfind"}}, + {.key = {"changelog-notification"}, + .type = GF_OPTION_TYPE_BOOL, + .default_value = "off", + .description = "enable/disable changelog live notification", + .op_version = {3}, + .level = OPT_STATUS_BASIC, + .tags = {"bitrot", "georep"}}, {.key = {"changelog-brick"}, .type = GF_OPTION_TYPE_PATH, .description = "brick path to generate unique socket file name." |