diff options
Diffstat (limited to 'xlators/features/changelog/lib/src/gf-changelog.c')
-rw-r--r-- | xlators/features/changelog/lib/src/gf-changelog.c | 862 |
1 files changed, 399 insertions, 463 deletions
diff --git a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c index fb2d9037ffb..8f33eb01013 100644 --- a/xlators/features/changelog/lib/src/gf-changelog.c +++ b/xlators/features/changelog/lib/src/gf-changelog.c @@ -14,6 +14,8 @@ #include <sys/types.h> #include <sys/socket.h> #include <sys/un.h> +#include <sys/time.h> +#include <sys/resource.h> #ifndef _GNU_SOURCE #define _GNU_SOURCE @@ -24,589 +26,523 @@ #include "glusterfs.h" #include "logging.h" #include "defaults.h" +#include "syncop.h" +#include "gf-changelog-rpc.h" #include "gf-changelog-helpers.h" /* from the changelog translator */ #include "changelog-misc.h" #include "changelog-mem-types.h" -int byebye = 0; +/** + * Global singleton xlator pointer for the library, initialized + * during library load. This should probably be hidden inside + * an initialized object which is an handle for the consumer. + * + * TODO: do away with the global.. + */ +xlator_t *master = NULL; -static void -gf_changelog_cleanup (gf_changelog_t *gfc) +static inline +gf_private_t *gf_changelog_alloc_priv () { - /* socket */ - if (gfc->gfc_sockfd != -1) - close (gfc->gfc_sockfd); - /* tracker fd */ - if (gfc->gfc_fd != -1) - close (gfc->gfc_fd); - /* processing dir */ - if (gfc->gfc_dir) - closedir (gfc->gfc_dir); - - if (gfc->gfc_working_dir) - free (gfc->gfc_working_dir); /* allocated by realpath */ -} + int ret = 0; + gf_private_t *priv = NULL; -void -__attribute__ ((constructor)) gf_changelog_ctor (void) -{ - glusterfs_ctx_t *ctx = NULL; + priv = calloc (1, sizeof (gf_private_t)); + if (!priv) + goto error_return; + INIT_LIST_HEAD (&priv->connections); - ctx = glusterfs_ctx_new (); - if (!ctx) - return; + ret = LOCK_INIT (&priv->lock); + if (ret != 0) + goto free_priv; + priv->api = NULL; - if (glusterfs_globals_init (ctx)) { - free (ctx); - ctx = NULL; - return; - } + return priv; - THIS->ctx = ctx; - if (xlator_mem_acct_init (THIS, gf_changelog_mt_end)) - return; + free_priv: + free (priv); + error_return: + return NULL; } -void -__attribute__ ((destructor)) gf_changelog_dtor (void) -{ - xlator_t *this = NULL; - glusterfs_ctx_t *ctx = NULL; - gf_changelog_t *gfc = NULL; +#define GF_CHANGELOG_EVENT_POOL_SIZE 16384 +#define GF_CHANGELOG_EVENT_THREAD_COUNT 4 - this = THIS; - if (!this) - return; - - ctx = this->ctx; - gfc = this->private; - - if (gfc) { - if (gfc->hist_gfc) { - gf_changelog_cleanup(gfc->hist_gfc); - GF_FREE (gfc->hist_gfc); - } - gf_changelog_cleanup (gfc); - GF_FREE (gfc); +static int +gf_changelog_ctx_defaults_init (glusterfs_ctx_t *ctx) +{ + cmd_args_t *cmd_args = NULL; + struct rlimit lim = {0, }; + call_pool_t *pool = NULL; + int ret = -1; + + ret = xlator_mem_acct_init (THIS, gf_changelog_mt_end); + if (ret != 0) { + return ret; } - if (ctx) { - pthread_mutex_destroy (&ctx->lock); - free (ctx); - ctx = NULL; - } -} + ctx->process_uuid = generate_glusterfs_ctx_id (); + if (!ctx->process_uuid) + return -1; + ctx->page_size = 128 * GF_UNIT_KB; -static int -gf_changelog_open_dirs (gf_changelog_t *gfc) -{ - int ret = -1; - DIR *dir = NULL; - int tracker_fd = 0; - char tracker_path[PATH_MAX] = {0,}; - xlator_t *this = NULL; + ctx->iobuf_pool = iobuf_pool_new (); + if (!ctx->iobuf_pool) + return -1; - this = THIS; - GF_ASSERT (this); + ctx->event_pool = event_pool_new (GF_CHANGELOG_EVENT_POOL_SIZE, + GF_CHANGELOG_EVENT_THREAD_COUNT); + if (!ctx->event_pool) + return -1; - (void) snprintf (gfc->gfc_current_dir, PATH_MAX, - "%s/"GF_CHANGELOG_CURRENT_DIR"/", - gfc->gfc_working_dir); + pool = GF_CALLOC (1, sizeof (call_pool_t), + gf_changelog_mt_libgfchangelog_call_pool_t); + if (!pool) + return -1; - ret = recursive_rmdir (gfc->gfc_current_dir); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "Failed to rmdir: %s, err: %s", - gfc->gfc_current_dir, strerror (errno)); - goto out; - } + /* frame_mem_pool size 112 * 64 */ + pool->frame_mem_pool = mem_pool_new (call_frame_t, 32); + if (!pool->frame_mem_pool) + return -1; - ret = mkdir_p (gfc->gfc_current_dir, 0600, _gf_false); - if (ret) - goto out; + /* stack_mem_pool size 256 * 128 */ + pool->stack_mem_pool = mem_pool_new (call_stack_t, 16); - (void) snprintf (gfc->gfc_processed_dir, PATH_MAX, - "%s/"GF_CHANGELOG_PROCESSED_DIR"/", - gfc->gfc_working_dir); + if (!pool->stack_mem_pool) + return -1; - ret = mkdir_p (gfc->gfc_processed_dir, 0600, _gf_false); - if (ret) - goto out; + ctx->stub_mem_pool = mem_pool_new (call_stub_t, 16); + if (!ctx->stub_mem_pool) + return -1; - (void) snprintf (gfc->gfc_processing_dir, PATH_MAX, - "%s/"GF_CHANGELOG_PROCESSING_DIR"/", - gfc->gfc_working_dir); + ctx->dict_pool = mem_pool_new (dict_t, 32); + if (!ctx->dict_pool) + return -1; - ret = recursive_rmdir (gfc->gfc_processing_dir); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "Failed to rmdir: %s, err: %s", - gfc->gfc_processing_dir, strerror (errno)); - goto out; - } + ctx->dict_pair_pool = mem_pool_new (data_pair_t, 512); + if (!ctx->dict_pair_pool) + return -1; - ret = mkdir_p (gfc->gfc_processing_dir, 0600, _gf_false); - if (ret) - goto out; + ctx->dict_data_pool = mem_pool_new (data_t, 512); + if (!ctx->dict_data_pool) + return -1; - dir = opendir (gfc->gfc_processing_dir); - if (!dir) { - gf_log ("", GF_LOG_ERROR, - "opendir() error [reason: %s]", strerror (errno)); - goto out; - } + INIT_LIST_HEAD (&pool->all_frames); + LOCK_INIT (&pool->lock); + ctx->pool = pool; - gfc->gfc_dir = dir; + pthread_mutex_init (&(ctx->lock), NULL); - (void) snprintf (tracker_path, PATH_MAX, - "%s/"GF_CHANGELOG_TRACKER, gfc->gfc_working_dir); + cmd_args = &ctx->cmd_args; - tracker_fd = open (tracker_path, O_CREAT | O_APPEND | O_RDWR, - S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); - if (tracker_fd < 0) { - closedir (gfc->gfc_dir); - ret = -1; - goto out; - } + INIT_LIST_HEAD (&cmd_args->xlator_options); + + lim.rlim_cur = RLIM_INFINITY; + lim.rlim_max = RLIM_INFINITY; + setrlimit (RLIMIT_CORE, &lim); - gfc->gfc_fd = tracker_fd; - ret = 0; - out: - return ret; + return 0; } -int -gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc) +/* TODO: cleanup ctx defaults */ +static void +gf_changelog_cleanup_this (xlator_t *this) { - int ret = 0; - int len = 0; - int tries = 0; - int sockfd = 0; - struct sockaddr_un remote; - - this = gfc->this; + glusterfs_ctx_t *ctx = NULL; - if (gfc->gfc_sockfd != -1) { - gf_log (this->name, GF_LOG_INFO, - "Reconnecting..."); - close (gfc->gfc_sockfd); - } + if (!this) + return; - sockfd = socket (AF_UNIX, SOCK_STREAM, 0); - if (sockfd < 0) { - ret = -1; - goto out; - } + ctx = this->ctx; + syncenv_destroy (ctx->env); + free (ctx); - CHANGELOG_MAKE_SOCKET_PATH (gfc->gfc_brickpath, - gfc->gfc_sockpath, UNIX_PATH_MAX); - gf_log (this->name, GF_LOG_INFO, - "connecting to changelog socket: %s (brick: %s)", - gfc->gfc_sockpath, gfc->gfc_brickpath); + if (this->private) + free (this->private); - remote.sun_family = AF_UNIX; - strcpy (remote.sun_path, gfc->gfc_sockpath); + this->private = NULL; + this->ctx = NULL; +} - len = strlen (remote.sun_path) + sizeof (remote.sun_family); +static int +gf_changelog_init_this () +{ + glusterfs_ctx_t *ctx = NULL; - while (tries < gfc->gfc_connretries) { - gf_log (this->name, GF_LOG_WARNING, - "connection attempt %d/%d...", - tries + 1, gfc->gfc_connretries); + ctx = glusterfs_ctx_new (); + if (!ctx) + goto error_return; - /* initiate a connect */ - if (connect (sockfd, (struct sockaddr *) &remote, len) == 0) { - gfc->gfc_sockfd = sockfd; - break; - } + if (glusterfs_globals_init (ctx)) + goto free_ctx; - tries++; - sleep (2); - } + THIS->ctx = ctx; + if (gf_changelog_ctx_defaults_init (ctx)) + goto free_ctx; - if (tries == gfc->gfc_connretries) { - gf_log (this->name, GF_LOG_ERROR, - "could not connect to changelog socket!" - " bailing out..."); - close (sockfd); - ret = -1; - } else - gf_log (this->name, GF_LOG_INFO, - "connection successful"); + ctx->env = syncenv_new (0, 0, 0); + if (!ctx->env) + goto free_ctx; + return 0; - out: - return ret; + free_ctx: + free (ctx); + THIS->ctx = NULL; + error_return: + return -1; } -int -gf_changelog_done (char *file) +static int +gf_changelog_init_master () { - int ret = -1; - char *buffer = NULL; - xlator_t *this = NULL; - gf_changelog_t *gfc = NULL; - char to_path[PATH_MAX] = {0,}; - - errno = EINVAL; - - this = THIS; - if (!this) - goto out; - - gfc = (gf_changelog_t *) this->private; - if (!gfc) - goto out; + int ret = 0; + gf_private_t *priv = NULL; + glusterfs_ctx_t *ctx = NULL; - if (!file || !strlen (file)) - goto out; + ret = gf_changelog_init_this (); + if (ret != 0) + goto error_return; + master = THIS; + + priv = gf_changelog_alloc_priv (); + if (!priv) + goto cleanup_master; + master->private = priv; + + /* poller thread */ + ret = pthread_create (&priv->poller, + NULL, changelog_rpc_poller, master); + if (ret != 0) { + gf_log (master->name, GF_LOG_ERROR, + "failed to spawn poller thread"); + goto cleanup_master; + } - /* make sure 'file' is inside ->gfc_working_dir */ - buffer = realpath (file, NULL); - if (!buffer) - goto out; + return 0; - if (strncmp (gfc->gfc_working_dir, - buffer, strlen (gfc->gfc_working_dir))) - goto out; + cleanup_master: + master->private = NULL; + gf_changelog_cleanup_this (master); + error_return: + return -1; +} - (void) snprintf (to_path, PATH_MAX, "%s%s", - gfc->gfc_processed_dir, basename (buffer)); - gf_log (this->name, GF_LOG_DEBUG, - "moving %s to processed directory", file); - ret = rename (buffer, to_path); - if (ret) { - gf_log (this->name, GF_LOG_ERROR, - "cannot move %s to %s (reason: %s)", - file, to_path, strerror (errno)); - goto out; - } +/* ctor/dtor */ - ret = 0; +void +__attribute__ ((constructor)) gf_changelog_ctor (void) +{ + (void) gf_changelog_init_master (); +} - out: - if (buffer) - free (buffer); /* allocated by realpath() */ - return ret; +void +__attribute__ ((destructor)) gf_changelog_dtor (void) +{ + gf_changelog_cleanup_this (master); } -/** - * @API - * for a set of changelogs, start from the beginning - */ +/* TODO: cleanup clnt/svc on failure */ int -gf_changelog_start_fresh () +gf_changelog_setup_rpc (xlator_t *this, + gf_changelog_t *entry, int proc) { - xlator_t *this = NULL; - gf_changelog_t *gfc = NULL; - - this = THIS; - if (!this) - goto out; + int ret = 0; + rpcsvc_t *svc = NULL; + struct rpc_clnt *rpc = NULL; - errno = EINVAL; + /** + * Initialize a connect back socket. A probe() RPC call to the server + * triggers a reverse connect. + */ + svc = gf_changelog_reborp_init_rpc_listner (this, entry->brick, + RPC_SOCK (entry), entry); + if (!svc) + goto error_return; + RPC_REBORP (entry) = svc; + + /* Initialize an RPC client */ + rpc = gf_changelog_rpc_init (this, entry); + if (!rpc) + goto error_return; + RPC_PROBER (entry) = rpc; - gfc = (gf_changelog_t *) this->private; - if (!gfc) - goto out; + /** + * @FIXME + * till we have connection state machine, let's delay the RPC call + * for now.. + */ + sleep (2); - if (gf_ftruncate (gfc->gfc_fd, 0)) - goto out; + /** + * Probe changelog translator for reverse connection. After a successful + * call, there's less use of the client and can be disconnected, but + * let's leave the connection active for any future RPC calls. + */ + ret = gf_changelog_invoke_rpc (this, entry, proc); + if (ret) { + gf_log (this->name, GF_LOG_ERROR, + "Could not initiate probe RPC, bailing out!!!"); + goto error_return; + } return 0; - out: + error_return: return -1; } -/** - * @API - * return the next changelog file entry. zero means all chanelogs - * consumed. - */ -ssize_t -gf_changelog_next_change (char *bufptr, size_t maxlen) +static void +gf_cleanup_event (gf_changelog_t *entry) { - ssize_t size = -1; - int tracker_fd = 0; - xlator_t *this = NULL; - gf_changelog_t *gfc = NULL; - char buffer[PATH_MAX] = {0,}; - - if (maxlen > PATH_MAX) { - errno = ENAMETOOLONG; - goto out; - } + xlator_t *this = NULL; + struct gf_event_list *ev = NULL; - errno = EINVAL; + this = entry->this; + ev = &entry->event; - this = THIS; - if (!this) - goto out; + (void) gf_thread_cleanup (this, ev->invoker); - gfc = (gf_changelog_t *) this->private; - if (!gfc) - goto out; + (void) pthread_mutex_destroy (&ev->lock); + (void) pthread_cond_destroy (&ev->cond); - tracker_fd = gfc->gfc_fd; + ev->entry = NULL; +} - size = gf_readline (tracker_fd, buffer, maxlen); - if (size < 0) { - size = -1; - goto out; +static int +gf_init_event (gf_changelog_t *entry) +{ + int ret = 0; + struct gf_event_list *ev = NULL; + + ev = &entry->event; + ev->entry = entry; + + ret = pthread_mutex_init (&ev->lock, NULL); + if (ret != 0) + goto error_return; + ret = pthread_cond_init (&ev->cond, NULL); + if (ret != 0) + goto cleanup_mutex; + INIT_LIST_HEAD (&ev->events); + + ev->next_seq = 0; /* bootstrap sequencing */ + + if (entry->ordered) { + ret = pthread_create (&ev->invoker, NULL, + gf_changelog_callback_invoker, ev); + if (ret != 0) + goto cleanup_cond; } - if (size == 0) - goto out; - - memcpy (bufptr, buffer, size - 1); - bufptr[size - 1] = '\0'; + return 0; -out: - return size; + cleanup_cond: + (void) pthread_cond_destroy (&ev->cond); + cleanup_mutex: + (void) pthread_mutex_destroy (&ev->lock); + error_return: + return -1; } /** - * @API - * gf_changelog_scan() - scan and generate a list of change entries - * - * calling this api multiple times (without calling gf_changlog_done()) - * would result new changelogs(s) being refreshed in the tracker file. - * This call also acts as a cancellation point for the consumer. + * TODO: + * - cleanup invoker thread (if ordered mode) + * - cleanup event list + * - destroy rpc{-clnt, svc} */ -ssize_t -gf_changelog_scan () +int +gf_cleanup_brick_connection (xlator_t *this, gf_changelog_t *entry) { - int ret = 0; - int tracker_fd = 0; - size_t len = 0; - size_t off = 0; - xlator_t *this = NULL; - size_t nr_entries = 0; - gf_changelog_t *gfc = NULL; - struct dirent *entryp = NULL; - struct dirent *result = NULL; - char buffer[PATH_MAX] = {0,}; - - this = THIS; - if (!this) - goto out; + return 0; +} - gfc = (gf_changelog_t *) this->private; - if (!gfc) - goto out; +int +gf_cleanup_connections (xlator_t *this) +{ + return 0; +} - /** - * do we need to protect 'byebye' with locks? worst, the - * consumer would get notified during next scan(). - */ - if (byebye) { - errno = ECONNREFUSED; - goto out; - } +static int +gf_setup_brick_connection (xlator_t *this, + struct gf_brick_spec *brick, + gf_boolean_t ordered, void *xl) +{ + int ret = 0; + gf_private_t *priv = NULL; + gf_changelog_t *entry = NULL; - errno = EINVAL; + priv = this->private; - tracker_fd = gfc->gfc_fd; + if (!brick->callback || !brick->init || !brick->fini) + goto error_return; - if (gf_ftruncate (tracker_fd, 0)) - goto out; + entry = GF_CALLOC (1, sizeof (*entry), + gf_changelog_mt_libgfchangelog_t); + if (!entry) + goto error_return; + INIT_LIST_HEAD (&entry->list); - len = offsetof(struct dirent, d_name) - + pathconf(gfc->gfc_processing_dir, _PC_NAME_MAX) + 1; - entryp = GF_CALLOC (1, len, - gf_changelog_mt_libgfchangelog_dirent_t); - if (!entryp) - goto out; + entry->notify = brick->filter; + (void) strncpy (entry->brick, brick->brick_path, PATH_MAX); - rewinddir (gfc->gfc_dir); - while (1) { - ret = readdir_r (gfc->gfc_dir, entryp, &result); - if (ret || !result) - break; + entry->this = this; + entry->invokerxl = xl; - if ( !strcmp (basename (entryp->d_name), ".") - || !strcmp (basename (entryp->d_name), "..") ) - continue; + entry->ordered = ordered; + if (ordered) { + ret = gf_init_event (entry); + if (ret) + goto free_entry; + } - nr_entries++; + entry->fini = brick->fini; + entry->callback = brick->callback; + entry->connected = brick->connected; + entry->disconnected = brick->disconnected; - GF_CHANGELOG_FILL_BUFFER (gfc->gfc_processing_dir, - buffer, off, - strlen (gfc->gfc_processing_dir)); - GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer, - off, strlen (entryp->d_name)); - GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1); + entry->ptr = brick->init (this, brick); + if (!entry->ptr) + goto cleanup_event; + priv->api = entry->ptr; /* pointer to API, if required */ - if (gf_changelog_write (tracker_fd, buffer, off) != off) { - gf_log (this->name, GF_LOG_ERROR, - "error writing changelog filename" - " to tracker file"); - break; - } - off = 0; + LOCK (&priv->lock); + { + list_add_tail (&entry->list, &priv->connections); } + UNLOCK (&priv->lock); - GF_FREE (entryp); + ret = gf_changelog_setup_rpc (this, entry, CHANGELOG_RPC_PROBE_FILTER); + if (ret) + goto cleanup_event; + return 0; - if (!result) { - if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1) - return nr_entries; - } - out: + cleanup_event: + if (ordered) + gf_cleanup_event (entry); + free_entry: + list_del (&entry->list); /* FIXME: kludge for now */ + GF_FREE (entry); + error_return: return -1; } -/** - * @API - * gf_changelog_register() - register a client for updates. - */ int -gf_changelog_register (char *brick_path, char *scratch_dir, - char *log_file, int log_level, int max_reconnects) +gf_changelog_register_brick (xlator_t *this, + struct gf_brick_spec *brick, + gf_boolean_t ordered, void *xl) { - int i = 0; - int ret = -1; - int errn = 0; - xlator_t *this = NULL; - gf_changelog_t *gfc = NULL; - char hist_scratch_dir[PATH_MAX] = {0,}; - struct stat buf = {0,}; - - this = THIS; - if (!this->ctx) - goto out; - - errno = ENOMEM; - - gfc = GF_CALLOC (1, sizeof (*gfc), - gf_changelog_mt_libgfchangelog_t); - if (!gfc) - goto out; - - gfc->this = this; - - gfc->gfc_dir = NULL; - gfc->gfc_fd = gfc->gfc_sockfd = -1; - - if (stat (scratch_dir, &buf) && errno == ENOENT) { - ret = mkdir_p (scratch_dir, 0600, _gf_true); - if (ret) { - errn = errno; - goto cleanup; - } - } - - gfc->gfc_working_dir = realpath (scratch_dir, NULL); - if (!gfc->gfc_working_dir) { - errn = errno; - goto cleanup; - } + return gf_setup_brick_connection (this, brick, ordered, xl); +} - /* Begin: Changes for History API */ - gfc->hist_gfc = NULL; +static int +gf_changelog_setup_logging (xlator_t *this, char *logfile, int loglevel) +{ + /* passing ident as NULL means to use default ident for syslog */ + if (gf_log_init (this->ctx, logfile, NULL)) + return -1; - gfc->hist_gfc = GF_CALLOC (1, sizeof (*gfc), - gf_changelog_mt_libgfchangelog_t); - if (!gfc->hist_gfc) - goto cleanup; + gf_log_set_loglevel ((loglevel == -1) ? GF_LOG_INFO : + loglevel); + return 0; +} - gfc->hist_gfc->gfc_dir = NULL; - gfc->hist_gfc->gfc_fd = gfc->hist_gfc->gfc_sockfd = -1; - gfc->hist_gfc->this = NULL; +int +gf_changelog_register_generic (struct gf_brick_spec *bricks, int count, + int ordered, char *logfile, int lvl, void *xl) +{ + int ret = 0; + xlator_t *this = NULL; + xlator_t *old_this = NULL; + struct gf_brick_spec *brick = NULL; + gf_boolean_t need_order = _gf_false; - (void) strncpy (hist_scratch_dir, scratch_dir, PATH_MAX); - (void) snprintf (hist_scratch_dir, PATH_MAX, - "%s/"GF_CHANGELOG_HISTORY_DIR"/", - gfc->gfc_working_dir); + SAVE_THIS (xl); - ret = mkdir_p (hist_scratch_dir, 0600, _gf_false); - if (ret) { - errn = errno; - goto cleanup; - } + this = THIS; + if (!this) + goto error_return; - gfc->hist_gfc->gfc_working_dir = realpath (hist_scratch_dir, NULL); - if (!gfc->hist_gfc->gfc_working_dir) { - errn = errno; - goto cleanup; - } + ret = gf_changelog_setup_logging (this, logfile, lvl); + if (ret) + goto error_return; - ret = gf_changelog_open_dirs (gfc->hist_gfc); - if (ret) { - errn = errno; - gf_log (this->name, GF_LOG_ERROR, - "could not create entries in history scratch dir"); - goto cleanup; - } + need_order = (ordered) ? _gf_true : _gf_false; - (void) strncpy (gfc->hist_gfc->gfc_brickpath, brick_path, PATH_MAX); + brick = bricks; + while (count--) { + gf_log (this->name, GF_LOG_INFO, + "Registering brick: %s [notify filter: %d]", + brick->brick_path, brick->filter); - for (i=0; i < 256; i++) { - gfc->hist_gfc->rfc3986[i] = - (isalnum(i) || i == '~' || - i == '-' || i == '.' || i == '_') ? i : 0; - } - /* End: Changes for History API*/ + ret = gf_changelog_register_brick (this, brick, need_order, xl); + if (ret != 0) { + gf_log (this->name, GF_LOG_ERROR, + "Error registering with changelog xlator"); + break; + } - ret = gf_changelog_open_dirs (gfc); - if (ret) { - errn = errno; - gf_log (this->name, GF_LOG_ERROR, - "could not create entries in scratch dir"); - goto cleanup; + brick++; } - /* passing ident as NULL means to use default ident for syslog */ - if (gf_log_init (this->ctx, log_file, NULL)) - goto cleanup; - - gf_log_set_loglevel ((log_level == -1) ? GF_LOG_INFO : - log_level); + if (ret != 0) + goto cleanup_inited_bricks; - gfc->gfc_connretries = (max_reconnects <= 0) ? 1 : max_reconnects; - (void) strncpy (gfc->gfc_brickpath, brick_path, PATH_MAX); + RESTORE_THIS(); + return 0; - ret = gf_changelog_notification_init (this, gfc); - if (ret) { - errn = errno; - goto cleanup; - } + cleanup_inited_bricks: + gf_cleanup_connections (this); + error_return: + RESTORE_THIS(); + return -1; +} - ret = gf_thread_create (&gfc->gfc_changelog_processor, - NULL, gf_changelog_process, gfc); - if (ret) { - errn = errno; - gf_log (this->name, GF_LOG_ERROR, - "error creating changelog processor thread" - " new changes won't be recorded!!!"); - goto cleanup; - } +/** + * @API + * gf_changelog_register() + * + * This is _NOT_ a generic register API. It's a special API to handle + * updates at a journal granulality. This is used by consumers wanting + * to process persistent journal such as geo-replication via a set of + * APIs. All of this is required to maintain backward compatibility. + * Owner specific private data is stored in ->api (in gf_private_t), + * which is used by APIs to access it's private data. This limits + * the library access to a single brick, but that's how it used to + * be anyway. Furthermore, this API solely _owns_ "this", therefore + * callers already having a notion of "this" are expected to use the + * newer API. + * + * Newer applications wanting to use this library need not face this + * limitation and reply of the much more feature rich generic register + * API, which is purely callback based. + * + * NOTE: @max_reconnects is not used but required for backward compat. + * + * For generic API, refer gf_changelog_register_generic(). + */ +int +gf_changelog_register (char *brick_path, char *scratch_dir, + char *log_file, int log_level, int max_reconnects) +{ + struct gf_brick_spec brick = {0,}; - for (i=0; i < 256; i++) { - gfc->rfc3986[i] = - (isalnum(i) || i == '~' || - i == '-' || i == '.' || i == '_') ? i : 0; - } + THIS = master; - ret = 0; - this->private = gfc; + brick.brick_path = brick_path; + brick.filter = CHANGELOG_OP_TYPE_JOURNAL; - goto out; + brick.init = gf_changelog_journal_init; + brick.fini = gf_changelog_journal_fini; + brick.callback = gf_changelog_handle_journal; + brick.connected = gf_changelog_journal_connect; + brick.disconnected = gf_changelog_journal_disconnect; - cleanup: - if (gfc->hist_gfc) { - gf_changelog_cleanup (gfc->hist_gfc); - GF_FREE (gfc->hist_gfc); - } - gf_changelog_cleanup (gfc); - GF_FREE (gfc); - this->private = NULL; - errno = errn; + brick.ptr = scratch_dir; - out: - return ret; + return gf_changelog_register_generic (&brick, 1, 1, + log_file, log_level, NULL); } |