diff options
Diffstat (limited to 'xlators/features/changelog/src')
-rw-r--r-- | xlators/features/changelog/src/changelog-helpers.c | 52 | ||||
-rw-r--r-- | xlators/features/changelog/src/changelog-helpers.h | 25 | ||||
-rw-r--r-- | xlators/features/changelog/src/changelog-messages.h | 8 | ||||
-rw-r--r-- | xlators/features/changelog/src/changelog-rpc-common.c | 3 | ||||
-rw-r--r-- | xlators/features/changelog/src/changelog.c | 183 |
5 files changed, 147 insertions, 124 deletions
diff --git a/xlators/features/changelog/src/changelog-helpers.c b/xlators/features/changelog/src/changelog-helpers.c index 71fe1f032a0..e561997d858 100644 --- a/xlators/features/changelog/src/changelog-helpers.c +++ b/xlators/features/changelog/src/changelog-helpers.c @@ -242,8 +242,7 @@ changelog_write(int fd, char *buffer, size_t len) } int -htime_update(xlator_t *this, changelog_priv_t *priv, unsigned long ts, - char *buffer) +htime_update(xlator_t *this, changelog_priv_t *priv, time_t ts, char *buffer) { char changelog_path[PATH_MAX + 1] = { 0, @@ -273,7 +272,7 @@ htime_update(xlator_t *this, changelog_priv_t *priv, unsigned long ts, goto out; } - len = snprintf(x_value, sizeof(x_value), "%lu:%d", ts, + len = snprintf(x_value, sizeof(x_value), "%ld:%d", ts, priv->rollover_count); if (len >= sizeof(x_value)) { ret = -1; @@ -382,8 +381,7 @@ out: } static int -changelog_rollover_changelog(xlator_t *this, changelog_priv_t *priv, - unsigned long ts) +changelog_rollover_changelog(xlator_t *this, changelog_priv_t *priv, time_t ts) { int ret = -1; int notify = 0; @@ -421,16 +419,14 @@ changelog_rollover_changelog(xlator_t *this, changelog_priv_t *priv, priv->changelog_fd = -1; } - time_t time = (time_t)ts; - - /* Get GMT time */ - gmt = gmtime(&time); + /* Get GMT time. */ + gmt = gmtime(&ts); strftime(yyyymmdd, sizeof(yyyymmdd), "%Y/%m/%d", gmt); (void)snprintf(ofile, PATH_MAX, "%s/" CHANGELOG_FILE_NAME, priv->changelog_dir); - (void)snprintf(nfile, PATH_MAX, "%s/%s/" CHANGELOG_FILE_NAME ".%lu", + (void)snprintf(nfile, PATH_MAX, "%s/%s/" CHANGELOG_FILE_NAME ".%ld", priv->changelog_dir, yyyymmdd, ts); (void)snprintf(nfile_dir, PATH_MAX, "%s/%s", priv->changelog_dir, yyyymmdd); @@ -593,7 +589,7 @@ out: * returns -1 on failure or error */ int -htime_open(xlator_t *this, changelog_priv_t *priv, unsigned long ts) +htime_open(xlator_t *this, changelog_priv_t *priv, time_t ts) { int ht_file_fd = -1; int ht_dir_fd = -1; @@ -723,7 +719,7 @@ out: * returns -1 on failure or error */ int -htime_create(xlator_t *this, changelog_priv_t *priv, unsigned long ts) +htime_create(xlator_t *this, changelog_priv_t *priv, time_t ts) { int ht_file_fd = -1; int ht_dir_fd = -1; @@ -741,12 +737,12 @@ htime_create(xlator_t *this, changelog_priv_t *priv, unsigned long ts) int32_t len = 0; gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_NEW_HTIME_FILE, - "name=%lu", ts, NULL); + "name=%ld", ts, NULL); CHANGELOG_FILL_HTIME_DIR(priv->changelog_dir, ht_dir_path); /* get the htime file name in ht_file_path */ - len = snprintf(ht_file_path, PATH_MAX, "%s/%s.%lu", ht_dir_path, + len = snprintf(ht_file_path, PATH_MAX, "%s/%s.%ld", ht_dir_path, HTIME_FILE_NAME, ts); if ((len < 0) || (len >= PATH_MAX)) { ret = -1; @@ -792,7 +788,7 @@ htime_create(xlator_t *this, changelog_priv_t *priv, unsigned long ts) goto out; } - (void)snprintf(ht_file_bname, sizeof(ht_file_bname), "%s.%lu", + (void)snprintf(ht_file_bname, sizeof(ht_file_bname), "%s.%ld", HTIME_FILE_NAME, ts); if (sys_fsetxattr(ht_dir_fd, HTIME_CURRENT, ht_file_bname, strlen(ht_file_bname), 0)) { @@ -963,8 +959,8 @@ out: } int -changelog_start_next_change(xlator_t *this, changelog_priv_t *priv, - unsigned long ts, gf_boolean_t finale) +changelog_start_next_change(xlator_t *this, changelog_priv_t *priv, time_t ts, + gf_boolean_t finale) { int ret = -1; @@ -985,21 +981,12 @@ changelog_entry_length() return sizeof(changelog_log_data_t); } -int +void changelog_fill_rollover_data(changelog_log_data_t *cld, gf_boolean_t is_last) { - struct timeval tv = { - 0, - }; - cld->cld_type = CHANGELOG_TYPE_ROLLOVER; - - if (gettimeofday(&tv, NULL)) - return -1; - - cld->cld_roll_time = (unsigned long)tv.tv_sec; + cld->cld_roll_time = gf_time(); cld->cld_finale = is_last; - return 0; } int @@ -1274,7 +1261,7 @@ changelog_rollover(void *data) while (1) { (void)pthread_testcancel(); - tv.tv_sec = time(NULL) + priv->rollover_time; + tv.tv_sec = gf_time() + priv->rollover_time; tv.tv_nsec = 0; ret = 0; /* Reset ret to zero */ @@ -1355,12 +1342,7 @@ changelog_rollover(void *data) if (priv->explicit_rollover == _gf_true) sleep(1); - ret = changelog_fill_rollover_data(&cld, _gf_false); - if (ret) { - gf_smsg(this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_ROLLOVER_DATA_FILL_FAILED, NULL); - continue; - } + changelog_fill_rollover_data(&cld, _gf_false); _mask_cancellation(); diff --git a/xlators/features/changelog/src/changelog-helpers.h b/xlators/features/changelog/src/changelog-helpers.h index 0906b164a78..38fa7590c32 100644 --- a/xlators/features/changelog/src/changelog-helpers.h +++ b/xlators/features/changelog/src/changelog-helpers.h @@ -31,7 +31,7 @@ */ typedef struct changelog_log_data { /* rollover related */ - unsigned long cld_roll_time; + time_t cld_roll_time; /* reopen changelog? */ gf_boolean_t cld_finale; @@ -97,12 +97,6 @@ struct changelog_encoder { typedef struct changelog_time_slice { /** - * just in case we need nanosecond granularity some day. - * field is unused as of now (maybe we'd need it later). - */ - struct timeval tv_start; - - /** * version of changelog file, incremented each time changes * rollover. */ @@ -190,8 +184,12 @@ typedef struct changelog_ev_selector { /* changelog's private structure */ struct changelog_priv { + /* changelog journalling */ gf_boolean_t active; + /* changelog live notifications */ + gf_boolean_t rpc_active; + /* to generate unique socket file per brick */ char *changelog_brick; @@ -419,11 +417,11 @@ changelog_local_t * changelog_local_init(xlator_t *this, inode_t *inode, uuid_t gfid, int xtra_records, gf_boolean_t update_flag); int -changelog_start_next_change(xlator_t *this, changelog_priv_t *priv, - unsigned long ts, gf_boolean_t finale); +changelog_start_next_change(xlator_t *this, changelog_priv_t *priv, time_t ts, + gf_boolean_t finale); int changelog_open_journal(xlator_t *this, changelog_priv_t *priv); -int +void changelog_fill_rollover_data(changelog_log_data_t *cld, gf_boolean_t is_last); int changelog_inject_single_event(xlator_t *this, changelog_priv_t *priv, @@ -447,12 +445,11 @@ changelog_fsync_thread(void *data); int changelog_forget(xlator_t *this, inode_t *inode); int -htime_update(xlator_t *this, changelog_priv_t *priv, unsigned long ts, - char *buffer); +htime_update(xlator_t *this, changelog_priv_t *priv, time_t ts, char *buffer); int -htime_open(xlator_t *this, changelog_priv_t *priv, unsigned long ts); +htime_open(xlator_t *this, changelog_priv_t *priv, time_t ts); int -htime_create(xlator_t *this, changelog_priv_t *priv, unsigned long ts); +htime_create(xlator_t *this, changelog_priv_t *priv, time_t ts); /* Geo-Rep snapshot dependency changes */ void diff --git a/xlators/features/changelog/src/changelog-messages.h b/xlators/features/changelog/src/changelog-messages.h index 4dd56b8ee97..cb0e16c85d8 100644 --- a/xlators/features/changelog/src/changelog-messages.h +++ b/xlators/features/changelog/src/changelog-messages.h @@ -59,12 +59,12 @@ GLFS_MSGID( CHANGELOG_MSG_NO_HTIME_CURRENT, CHANGELOG_MSG_HTIME_CURRENT, CHANGELOG_MSG_NEW_HTIME_FILE, CHANGELOG_MSG_MKDIR_ERROR, CHANGELOG_MSG_PATH_NOT_FOUND, CHANGELOG_MSG_XATTR_INIT_FAILED, - CHANGELOG_MSG_WROTE_TO_CSNAP, CHANGELOG_MSG_ROLLOVER_DATA_FILL_FAILED, + CHANGELOG_MSG_WROTE_TO_CSNAP, CHANGELOG_MSG_UNUSED_0, CHANGELOG_MSG_GET_BUFFER_FAILED, CHANGELOG_MSG_BARRIER_STATE_NOTIFY, CHANGELOG_MSG_BARRIER_DISABLED, CHANGELOG_MSG_BARRIER_ALREADY_DISABLED, CHANGELOG_MSG_BARRIER_ON_ERROR, CHANGELOG_MSG_BARRIER_ENABLE, CHANGELOG_MSG_BARRIER_KEY_NOT_FOUND, CHANGELOG_MSG_ERROR_IN_DICT_GET, - CHANGELOG_MSG_GET_TIME_FAILURE, CHANGELOG_MSG_HTIME_FETCH_FAILED, + CHANGELOG_MSG_UNUSED_1, CHANGELOG_MSG_UNUSED_2, CHANGELOG_MSG_DEQUEUING_BARRIER_FOPS, CHANGELOG_MSG_DEQUEUING_BARRIER_FOPS_FINISHED, CHANGELOG_MSG_BARRIER_TIMEOUT, CHANGELOG_MSG_TIMEOUT_ADD_FAILED, @@ -123,8 +123,6 @@ GLFS_MSGID( #define CHANGELOG_MSG_GET_TIME_OP_FAILED_STR "Problem rolling over changelog(s)" #define CHANGELOG_MSG_BARRIER_INFO_STR "Explicit wakeup on barrier notify" #define CHANGELOG_MSG_SELECT_FAILED_STR "pthread_cond_timedwait failed" -#define CHANGELOG_MSG_ROLLOVER_DATA_FILL_FAILED_STR \ - "failed to fill rollover data" #define CHANGELOG_MSG_INJECT_FSYNC_FAILED_STR "failed to inject fsync event" #define CHANGELOG_MSG_LOCAL_INIT_FAILED_STR \ "changelog local initialization failed" @@ -144,9 +142,7 @@ GLFS_MSGID( #define CHANGELOG_MSG_BARRIER_KEY_NOT_FOUND_STR "barrier key not found" #define CHANGELOG_MSG_ERROR_IN_DICT_GET_STR \ "Something went wrong in dict_get_str_boolean" -#define CHANGELOG_MSG_GET_TIME_FAILURE_STR "gettimeofday() failure" #define CHANGELOG_MSG_DIR_OPTIONS_NOT_SET_STR "changelog-dir option is not set" -#define CHANGELOG_MSG_HTIME_FETCH_FAILED_STR "unable to fetch htime" #define CHANGELOG_MSG_FREEUP_FAILED_STR "could not cleanup bootstrapper" #define CHANGELOG_MSG_CHILD_MISCONFIGURED_STR \ "translator needs a single subvolume" diff --git a/xlators/features/changelog/src/changelog-rpc-common.c b/xlators/features/changelog/src/changelog-rpc-common.c index afcc3a873c8..125246a17e1 100644 --- a/xlators/features/changelog/src/changelog-rpc-common.c +++ b/xlators/features/changelog/src/changelog-rpc-common.c @@ -262,6 +262,9 @@ changelog_rpc_server_destroy(xlator_t *this, rpcsvc_t *rpc, char *sockfile, struct rpcsvc_program *prog = NULL; rpc_transport_t *trans = NULL; + if (!rpc) + return; + while (*progs) { prog = *progs; (void)rpcsvc_program_unregister(rpc, prog); diff --git a/xlators/features/changelog/src/changelog.c b/xlators/features/changelog/src/changelog.c index 0a03cfa3673..6a6e5af859e 100644 --- a/xlators/features/changelog/src/changelog.c +++ b/xlators/features/changelog/src/changelog.c @@ -34,6 +34,12 @@ static struct changelog_bootstrap cb_bootstrap[] = { }, }; +static int +changelog_init_rpc(xlator_t *this, changelog_priv_t *priv); + +static int +changelog_init(xlator_t *this, changelog_priv_t *priv); + /* Entry operations - TYPE III */ /** @@ -1997,6 +2003,11 @@ notify(xlator_t *this, int event, void *data, ...) uint64_t clntcnt = 0; changelog_clnt_t *conn = NULL; gf_boolean_t cleanup_notify = _gf_false; + char sockfile[UNIX_PATH_MAX] = { + 0, + }; + rpcsvc_listener_t *listener = NULL; + rpcsvc_listener_t *next = NULL; INIT_LIST_HEAD(&queue); @@ -2010,23 +2021,40 @@ notify(xlator_t *this, int event, void *data, ...) "cleanup changelog rpc connection of brick %s", priv->victim->name); - this->cleanup_starting = 1; - changelog_destroy_rpc_listner(this, priv); - conn = &priv->connections; - if (conn) - changelog_ev_cleanup_connections(this, conn); - xprtcnt = GF_ATOMIC_GET(priv->xprtcnt); - clntcnt = GF_ATOMIC_GET(priv->clntcnt); - - if (!xprtcnt && !clntcnt) { - LOCK(&priv->lock); - { - cleanup_notify = priv->notify_down; - priv->notify_down = _gf_true; + if (priv->rpc_active) { + this->cleanup_starting = 1; + changelog_destroy_rpc_listner(this, priv); + conn = &priv->connections; + if (conn) + changelog_ev_cleanup_connections(this, conn); + xprtcnt = GF_ATOMIC_GET(priv->xprtcnt); + clntcnt = GF_ATOMIC_GET(priv->clntcnt); + if (!xprtcnt && !clntcnt) { + LOCK(&priv->lock); + { + cleanup_notify = priv->notify_down; + priv->notify_down = _gf_true; + } + UNLOCK(&priv->lock); + if (priv->rpc) { + list_for_each_entry_safe(listener, next, + &priv->rpc->listeners, list) + { + if (listener->trans) { + rpc_transport_unref(listener->trans); + } + } + rpcsvc_destroy(priv->rpc); + priv->rpc = NULL; + } + CHANGELOG_MAKE_SOCKET_PATH(priv->changelog_brick, sockfile, + UNIX_PATH_MAX); + sys_unlink(sockfile); + if (!cleanup_notify) + default_notify(this, GF_EVENT_PARENT_DOWN, data); } - UNLOCK(&priv->lock); - if (!cleanup_notify) - default_notify(this, GF_EVENT_PARENT_DOWN, data); + } else { + default_notify(this, GF_EVENT_PARENT_DOWN, data); } goto out; } @@ -2224,23 +2252,11 @@ static int changelog_init(xlator_t *this, changelog_priv_t *priv) { int i = 0; - int ret = -1; - struct timeval tv = { - 0, - }; + int ret = 0; changelog_log_data_t cld = { 0, }; - ret = gettimeofday(&tv, NULL); - if (ret) { - gf_smsg(this->name, GF_LOG_ERROR, errno, CHANGELOG_MSG_GET_TIME_FAILURE, - NULL); - goto out; - } - - priv->slice.tv_start = tv; - priv->maps[CHANGELOG_TYPE_DATA] = "D "; priv->maps[CHANGELOG_TYPE_METADATA] = "M "; priv->maps[CHANGELOG_TYPE_METADATA_XATTR] = "M "; @@ -2259,9 +2275,7 @@ changelog_init(xlator_t *this, changelog_priv_t *priv) * in case there was an encoding change. so... things are kept * simple here. */ - ret = changelog_fill_rollover_data(&cld, _gf_false); - if (ret) - goto out; + changelog_fill_rollover_data(&cld, _gf_false); ret = htime_open(this, priv, cld.cld_roll_time); /* call htime open with cld's rollover_time */ @@ -2405,6 +2419,22 @@ changelog_barrier_pthread_destroy(changelog_priv_t *priv) LOCK_DESTROY(&priv->bflags.lock); } +static void +changelog_cleanup_rpc(xlator_t *this, changelog_priv_t *priv) +{ + /* terminate rpc server */ + if (!this->cleanup_starting) + changelog_destroy_rpc_listner(this, priv); + + (void)changelog_cleanup_rpc_threads(this, priv); + /* cleanup rot buffs */ + rbuf_dtor(priv->rbuf); + + /* cleanup poller thread */ + if (priv->poller) + (void)changelog_thread_cleanup(this, priv->poller); +} + int reconfigure(xlator_t *this, dict_t *options) { @@ -2413,6 +2443,9 @@ reconfigure(xlator_t *this, dict_t *options) changelog_priv_t *priv = NULL; gf_boolean_t active_earlier = _gf_true; gf_boolean_t active_now = _gf_true; + gf_boolean_t rpc_active_earlier = _gf_true; + gf_boolean_t rpc_active_now = _gf_true; + gf_boolean_t iniate_rpc = _gf_false; changelog_time_slice_t *slice = NULL; changelog_log_data_t cld = { 0, @@ -2423,9 +2456,6 @@ reconfigure(xlator_t *this, dict_t *options) char csnap_dir[PATH_MAX] = { 0, }; - struct timeval tv = { - 0, - }; uint32_t timeout = 0; priv = this->private; @@ -2434,6 +2464,7 @@ reconfigure(xlator_t *this, dict_t *options) ret = -1; active_earlier = priv->active; + rpc_active_earlier = priv->rpc_active; /* first stop the rollover and the fsync thread */ changelog_cleanup_helper_threads(this, priv); @@ -2467,6 +2498,29 @@ reconfigure(xlator_t *this, dict_t *options) goto out; GF_OPTION_RECONF("changelog", active_now, options, bool, out); + GF_OPTION_RECONF("changelog-notification", rpc_active_now, options, bool, + out); + + /* If journalling is enabled, enable rpc notifications */ + if (active_now && !active_earlier) { + if (!rpc_active_earlier) + iniate_rpc = _gf_true; + } + + if (rpc_active_now && !rpc_active_earlier) { + iniate_rpc = _gf_true; + } + + /* TODO: Disable of changelog-notifications is not supported for now + * as there is no clean way of cleaning up of rpc resources + */ + + if (iniate_rpc) { + ret = changelog_init_rpc(this, priv); + if (ret) + goto out; + priv->rpc_active = _gf_true; + } /** * changelog_handle_change() handles changes that could possibly @@ -2493,9 +2547,7 @@ reconfigure(xlator_t *this, dict_t *options) out); if (active_now || active_earlier) { - ret = changelog_fill_rollover_data(&cld, !active_now); - if (ret) - goto out; + changelog_fill_rollover_data(&cld, !active_now); slice = &priv->slice; @@ -2514,13 +2566,7 @@ reconfigure(xlator_t *this, dict_t *options) if (!active_earlier) { gf_smsg(this->name, GF_LOG_INFO, 0, CHANGELOG_MSG_RECONFIGURE, NULL); - if (gettimeofday(&tv, NULL)) { - gf_smsg(this->name, GF_LOG_ERROR, 0, - CHANGELOG_MSG_HTIME_FETCH_FAILED, NULL); - ret = -1; - goto out; - } - htime_create(this, priv, tv.tv_sec); + htime_create(this, priv, gf_time()); } ret = changelog_spawn_helper_threads(this, priv); } @@ -2597,6 +2643,7 @@ changelog_init_options(xlator_t *this, changelog_priv_t *priv) goto dealloc_2; GF_OPTION_INIT("changelog", priv->active, bool, dealloc_2); + GF_OPTION_INIT("changelog-notification", priv->rpc_active, bool, dealloc_2); GF_OPTION_INIT("capture-del-path", priv->capture_del_path, bool, dealloc_2); GF_OPTION_INIT("op-mode", tmp, str, dealloc_2); @@ -2635,22 +2682,6 @@ error_return: return -1; } -static void -changelog_cleanup_rpc(xlator_t *this, changelog_priv_t *priv) -{ - /* terminate rpc server */ - if (!this->cleanup_starting) - changelog_destroy_rpc_listner(this, priv); - - (void)changelog_cleanup_rpc_threads(this, priv); - /* cleanup rot buffs */ - rbuf_dtor(priv->rbuf); - - /* cleanup poller thread */ - if (priv->poller) - (void)changelog_thread_cleanup(this, priv->poller); -} - static int changelog_init_rpc(xlator_t *this, changelog_priv_t *priv) { @@ -2747,10 +2778,13 @@ init(xlator_t *this) INIT_LIST_HEAD(&priv->queue); priv->barrier_enabled = _gf_false; - /* RPC ball rolling.. */ - ret = changelog_init_rpc(this, priv); - if (ret) - goto cleanup_barrier; + if (priv->rpc_active || priv->active) { + /* RPC ball rolling.. */ + ret = changelog_init_rpc(this, priv); + if (ret) + goto cleanup_barrier; + priv->rpc_active = _gf_true; + } ret = changelog_init(this, priv); if (ret) @@ -2762,7 +2796,9 @@ init(xlator_t *this) return 0; cleanup_rpc: - changelog_cleanup_rpc(this, priv); + if (priv->rpc_active) { + changelog_cleanup_rpc(this, priv); + } cleanup_barrier: changelog_barrier_pthread_destroy(priv); cleanup_options: @@ -2788,9 +2824,11 @@ fini(xlator_t *this) priv = this->private; if (priv) { - /* terminate RPC server/threads */ - changelog_cleanup_rpc(this, priv); - + if (priv->active || priv->rpc_active) { + /* terminate RPC server/threads */ + changelog_cleanup_rpc(this, priv); + GF_FREE(priv->ev_dispatcher); + } /* call barrier_disable to cancel timer */ if (priv->barrier_enabled) __chlog_barrier_disable(this, &queue); @@ -2859,6 +2897,13 @@ struct volume_options options[] = { .flags = OPT_FLAG_SETTABLE, .level = OPT_STATUS_BASIC, .tags = {"journal", "georep", "glusterfind"}}, + {.key = {"changelog-notification"}, + .type = GF_OPTION_TYPE_BOOL, + .default_value = "off", + .description = "enable/disable changelog live notification", + .op_version = {3}, + .level = OPT_STATUS_BASIC, + .tags = {"bitrot", "georep"}}, {.key = {"changelog-brick"}, .type = GF_OPTION_TYPE_PATH, .description = "brick path to generate unique socket file name." |