summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/lib/src/gf-changelog.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/changelog/lib/src/gf-changelog.c')
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog.c862
1 files changed, 399 insertions, 463 deletions
diff --git a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c
index fb2d9037ffb..8f33eb01013 100644
--- a/xlators/features/changelog/lib/src/gf-changelog.c
+++ b/xlators/features/changelog/lib/src/gf-changelog.c
@@ -14,6 +14,8 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
+#include <sys/time.h>
+#include <sys/resource.h>
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
@@ -24,589 +26,523 @@
#include "glusterfs.h"
#include "logging.h"
#include "defaults.h"
+#include "syncop.h"
+#include "gf-changelog-rpc.h"
#include "gf-changelog-helpers.h"
/* from the changelog translator */
#include "changelog-misc.h"
#include "changelog-mem-types.h"
-int byebye = 0;
+/**
+ * Global singleton xlator pointer for the library, initialized
+ * during library load. This should probably be hidden inside
+ * an initialized object which is an handle for the consumer.
+ *
+ * TODO: do away with the global..
+ */
+xlator_t *master = NULL;
-static void
-gf_changelog_cleanup (gf_changelog_t *gfc)
+static inline
+gf_private_t *gf_changelog_alloc_priv ()
{
- /* socket */
- if (gfc->gfc_sockfd != -1)
- close (gfc->gfc_sockfd);
- /* tracker fd */
- if (gfc->gfc_fd != -1)
- close (gfc->gfc_fd);
- /* processing dir */
- if (gfc->gfc_dir)
- closedir (gfc->gfc_dir);
-
- if (gfc->gfc_working_dir)
- free (gfc->gfc_working_dir); /* allocated by realpath */
-}
+ int ret = 0;
+ gf_private_t *priv = NULL;
-void
-__attribute__ ((constructor)) gf_changelog_ctor (void)
-{
- glusterfs_ctx_t *ctx = NULL;
+ priv = calloc (1, sizeof (gf_private_t));
+ if (!priv)
+ goto error_return;
+ INIT_LIST_HEAD (&priv->connections);
- ctx = glusterfs_ctx_new ();
- if (!ctx)
- return;
+ ret = LOCK_INIT (&priv->lock);
+ if (ret != 0)
+ goto free_priv;
+ priv->api = NULL;
- if (glusterfs_globals_init (ctx)) {
- free (ctx);
- ctx = NULL;
- return;
- }
+ return priv;
- THIS->ctx = ctx;
- if (xlator_mem_acct_init (THIS, gf_changelog_mt_end))
- return;
+ free_priv:
+ free (priv);
+ error_return:
+ return NULL;
}
-void
-__attribute__ ((destructor)) gf_changelog_dtor (void)
-{
- xlator_t *this = NULL;
- glusterfs_ctx_t *ctx = NULL;
- gf_changelog_t *gfc = NULL;
+#define GF_CHANGELOG_EVENT_POOL_SIZE 16384
+#define GF_CHANGELOG_EVENT_THREAD_COUNT 4
- this = THIS;
- if (!this)
- return;
-
- ctx = this->ctx;
- gfc = this->private;
-
- if (gfc) {
- if (gfc->hist_gfc) {
- gf_changelog_cleanup(gfc->hist_gfc);
- GF_FREE (gfc->hist_gfc);
- }
- gf_changelog_cleanup (gfc);
- GF_FREE (gfc);
+static int
+gf_changelog_ctx_defaults_init (glusterfs_ctx_t *ctx)
+{
+ cmd_args_t *cmd_args = NULL;
+ struct rlimit lim = {0, };
+ call_pool_t *pool = NULL;
+ int ret = -1;
+
+ ret = xlator_mem_acct_init (THIS, gf_changelog_mt_end);
+ if (ret != 0) {
+ return ret;
}
- if (ctx) {
- pthread_mutex_destroy (&ctx->lock);
- free (ctx);
- ctx = NULL;
- }
-}
+ ctx->process_uuid = generate_glusterfs_ctx_id ();
+ if (!ctx->process_uuid)
+ return -1;
+ ctx->page_size = 128 * GF_UNIT_KB;
-static int
-gf_changelog_open_dirs (gf_changelog_t *gfc)
-{
- int ret = -1;
- DIR *dir = NULL;
- int tracker_fd = 0;
- char tracker_path[PATH_MAX] = {0,};
- xlator_t *this = NULL;
+ ctx->iobuf_pool = iobuf_pool_new ();
+ if (!ctx->iobuf_pool)
+ return -1;
- this = THIS;
- GF_ASSERT (this);
+ ctx->event_pool = event_pool_new (GF_CHANGELOG_EVENT_POOL_SIZE,
+ GF_CHANGELOG_EVENT_THREAD_COUNT);
+ if (!ctx->event_pool)
+ return -1;
- (void) snprintf (gfc->gfc_current_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_CURRENT_DIR"/",
- gfc->gfc_working_dir);
+ pool = GF_CALLOC (1, sizeof (call_pool_t),
+ gf_changelog_mt_libgfchangelog_call_pool_t);
+ if (!pool)
+ return -1;
- ret = recursive_rmdir (gfc->gfc_current_dir);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "Failed to rmdir: %s, err: %s",
- gfc->gfc_current_dir, strerror (errno));
- goto out;
- }
+ /* frame_mem_pool size 112 * 64 */
+ pool->frame_mem_pool = mem_pool_new (call_frame_t, 32);
+ if (!pool->frame_mem_pool)
+ return -1;
- ret = mkdir_p (gfc->gfc_current_dir, 0600, _gf_false);
- if (ret)
- goto out;
+ /* stack_mem_pool size 256 * 128 */
+ pool->stack_mem_pool = mem_pool_new (call_stack_t, 16);
- (void) snprintf (gfc->gfc_processed_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_PROCESSED_DIR"/",
- gfc->gfc_working_dir);
+ if (!pool->stack_mem_pool)
+ return -1;
- ret = mkdir_p (gfc->gfc_processed_dir, 0600, _gf_false);
- if (ret)
- goto out;
+ ctx->stub_mem_pool = mem_pool_new (call_stub_t, 16);
+ if (!ctx->stub_mem_pool)
+ return -1;
- (void) snprintf (gfc->gfc_processing_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_PROCESSING_DIR"/",
- gfc->gfc_working_dir);
+ ctx->dict_pool = mem_pool_new (dict_t, 32);
+ if (!ctx->dict_pool)
+ return -1;
- ret = recursive_rmdir (gfc->gfc_processing_dir);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "Failed to rmdir: %s, err: %s",
- gfc->gfc_processing_dir, strerror (errno));
- goto out;
- }
+ ctx->dict_pair_pool = mem_pool_new (data_pair_t, 512);
+ if (!ctx->dict_pair_pool)
+ return -1;
- ret = mkdir_p (gfc->gfc_processing_dir, 0600, _gf_false);
- if (ret)
- goto out;
+ ctx->dict_data_pool = mem_pool_new (data_t, 512);
+ if (!ctx->dict_data_pool)
+ return -1;
- dir = opendir (gfc->gfc_processing_dir);
- if (!dir) {
- gf_log ("", GF_LOG_ERROR,
- "opendir() error [reason: %s]", strerror (errno));
- goto out;
- }
+ INIT_LIST_HEAD (&pool->all_frames);
+ LOCK_INIT (&pool->lock);
+ ctx->pool = pool;
- gfc->gfc_dir = dir;
+ pthread_mutex_init (&(ctx->lock), NULL);
- (void) snprintf (tracker_path, PATH_MAX,
- "%s/"GF_CHANGELOG_TRACKER, gfc->gfc_working_dir);
+ cmd_args = &ctx->cmd_args;
- tracker_fd = open (tracker_path, O_CREAT | O_APPEND | O_RDWR,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
- if (tracker_fd < 0) {
- closedir (gfc->gfc_dir);
- ret = -1;
- goto out;
- }
+ INIT_LIST_HEAD (&cmd_args->xlator_options);
+
+ lim.rlim_cur = RLIM_INFINITY;
+ lim.rlim_max = RLIM_INFINITY;
+ setrlimit (RLIMIT_CORE, &lim);
- gfc->gfc_fd = tracker_fd;
- ret = 0;
- out:
- return ret;
+ return 0;
}
-int
-gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc)
+/* TODO: cleanup ctx defaults */
+static void
+gf_changelog_cleanup_this (xlator_t *this)
{
- int ret = 0;
- int len = 0;
- int tries = 0;
- int sockfd = 0;
- struct sockaddr_un remote;
-
- this = gfc->this;
+ glusterfs_ctx_t *ctx = NULL;
- if (gfc->gfc_sockfd != -1) {
- gf_log (this->name, GF_LOG_INFO,
- "Reconnecting...");
- close (gfc->gfc_sockfd);
- }
+ if (!this)
+ return;
- sockfd = socket (AF_UNIX, SOCK_STREAM, 0);
- if (sockfd < 0) {
- ret = -1;
- goto out;
- }
+ ctx = this->ctx;
+ syncenv_destroy (ctx->env);
+ free (ctx);
- CHANGELOG_MAKE_SOCKET_PATH (gfc->gfc_brickpath,
- gfc->gfc_sockpath, UNIX_PATH_MAX);
- gf_log (this->name, GF_LOG_INFO,
- "connecting to changelog socket: %s (brick: %s)",
- gfc->gfc_sockpath, gfc->gfc_brickpath);
+ if (this->private)
+ free (this->private);
- remote.sun_family = AF_UNIX;
- strcpy (remote.sun_path, gfc->gfc_sockpath);
+ this->private = NULL;
+ this->ctx = NULL;
+}
- len = strlen (remote.sun_path) + sizeof (remote.sun_family);
+static int
+gf_changelog_init_this ()
+{
+ glusterfs_ctx_t *ctx = NULL;
- while (tries < gfc->gfc_connretries) {
- gf_log (this->name, GF_LOG_WARNING,
- "connection attempt %d/%d...",
- tries + 1, gfc->gfc_connretries);
+ ctx = glusterfs_ctx_new ();
+ if (!ctx)
+ goto error_return;
- /* initiate a connect */
- if (connect (sockfd, (struct sockaddr *) &remote, len) == 0) {
- gfc->gfc_sockfd = sockfd;
- break;
- }
+ if (glusterfs_globals_init (ctx))
+ goto free_ctx;
- tries++;
- sleep (2);
- }
+ THIS->ctx = ctx;
+ if (gf_changelog_ctx_defaults_init (ctx))
+ goto free_ctx;
- if (tries == gfc->gfc_connretries) {
- gf_log (this->name, GF_LOG_ERROR,
- "could not connect to changelog socket!"
- " bailing out...");
- close (sockfd);
- ret = -1;
- } else
- gf_log (this->name, GF_LOG_INFO,
- "connection successful");
+ ctx->env = syncenv_new (0, 0, 0);
+ if (!ctx->env)
+ goto free_ctx;
+ return 0;
- out:
- return ret;
+ free_ctx:
+ free (ctx);
+ THIS->ctx = NULL;
+ error_return:
+ return -1;
}
-int
-gf_changelog_done (char *file)
+static int
+gf_changelog_init_master ()
{
- int ret = -1;
- char *buffer = NULL;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- char to_path[PATH_MAX] = {0,};
-
- errno = EINVAL;
-
- this = THIS;
- if (!this)
- goto out;
-
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+ int ret = 0;
+ gf_private_t *priv = NULL;
+ glusterfs_ctx_t *ctx = NULL;
- if (!file || !strlen (file))
- goto out;
+ ret = gf_changelog_init_this ();
+ if (ret != 0)
+ goto error_return;
+ master = THIS;
+
+ priv = gf_changelog_alloc_priv ();
+ if (!priv)
+ goto cleanup_master;
+ master->private = priv;
+
+ /* poller thread */
+ ret = pthread_create (&priv->poller,
+ NULL, changelog_rpc_poller, master);
+ if (ret != 0) {
+ gf_log (master->name, GF_LOG_ERROR,
+ "failed to spawn poller thread");
+ goto cleanup_master;
+ }
- /* make sure 'file' is inside ->gfc_working_dir */
- buffer = realpath (file, NULL);
- if (!buffer)
- goto out;
+ return 0;
- if (strncmp (gfc->gfc_working_dir,
- buffer, strlen (gfc->gfc_working_dir)))
- goto out;
+ cleanup_master:
+ master->private = NULL;
+ gf_changelog_cleanup_this (master);
+ error_return:
+ return -1;
+}
- (void) snprintf (to_path, PATH_MAX, "%s%s",
- gfc->gfc_processed_dir, basename (buffer));
- gf_log (this->name, GF_LOG_DEBUG,
- "moving %s to processed directory", file);
- ret = rename (buffer, to_path);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "cannot move %s to %s (reason: %s)",
- file, to_path, strerror (errno));
- goto out;
- }
+/* ctor/dtor */
- ret = 0;
+void
+__attribute__ ((constructor)) gf_changelog_ctor (void)
+{
+ (void) gf_changelog_init_master ();
+}
- out:
- if (buffer)
- free (buffer); /* allocated by realpath() */
- return ret;
+void
+__attribute__ ((destructor)) gf_changelog_dtor (void)
+{
+ gf_changelog_cleanup_this (master);
}
-/**
- * @API
- * for a set of changelogs, start from the beginning
- */
+/* TODO: cleanup clnt/svc on failure */
int
-gf_changelog_start_fresh ()
+gf_changelog_setup_rpc (xlator_t *this,
+ gf_changelog_t *entry, int proc)
{
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
-
- this = THIS;
- if (!this)
- goto out;
+ int ret = 0;
+ rpcsvc_t *svc = NULL;
+ struct rpc_clnt *rpc = NULL;
- errno = EINVAL;
+ /**
+ * Initialize a connect back socket. A probe() RPC call to the server
+ * triggers a reverse connect.
+ */
+ svc = gf_changelog_reborp_init_rpc_listner (this, entry->brick,
+ RPC_SOCK (entry), entry);
+ if (!svc)
+ goto error_return;
+ RPC_REBORP (entry) = svc;
+
+ /* Initialize an RPC client */
+ rpc = gf_changelog_rpc_init (this, entry);
+ if (!rpc)
+ goto error_return;
+ RPC_PROBER (entry) = rpc;
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+ /**
+ * @FIXME
+ * till we have connection state machine, let's delay the RPC call
+ * for now..
+ */
+ sleep (2);
- if (gf_ftruncate (gfc->gfc_fd, 0))
- goto out;
+ /**
+ * Probe changelog translator for reverse connection. After a successful
+ * call, there's less use of the client and can be disconnected, but
+ * let's leave the connection active for any future RPC calls.
+ */
+ ret = gf_changelog_invoke_rpc (this, entry, proc);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Could not initiate probe RPC, bailing out!!!");
+ goto error_return;
+ }
return 0;
- out:
+ error_return:
return -1;
}
-/**
- * @API
- * return the next changelog file entry. zero means all chanelogs
- * consumed.
- */
-ssize_t
-gf_changelog_next_change (char *bufptr, size_t maxlen)
+static void
+gf_cleanup_event (gf_changelog_t *entry)
{
- ssize_t size = -1;
- int tracker_fd = 0;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- char buffer[PATH_MAX] = {0,};
-
- if (maxlen > PATH_MAX) {
- errno = ENAMETOOLONG;
- goto out;
- }
+ xlator_t *this = NULL;
+ struct gf_event_list *ev = NULL;
- errno = EINVAL;
+ this = entry->this;
+ ev = &entry->event;
- this = THIS;
- if (!this)
- goto out;
+ (void) gf_thread_cleanup (this, ev->invoker);
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+ (void) pthread_mutex_destroy (&ev->lock);
+ (void) pthread_cond_destroy (&ev->cond);
- tracker_fd = gfc->gfc_fd;
+ ev->entry = NULL;
+}
- size = gf_readline (tracker_fd, buffer, maxlen);
- if (size < 0) {
- size = -1;
- goto out;
+static int
+gf_init_event (gf_changelog_t *entry)
+{
+ int ret = 0;
+ struct gf_event_list *ev = NULL;
+
+ ev = &entry->event;
+ ev->entry = entry;
+
+ ret = pthread_mutex_init (&ev->lock, NULL);
+ if (ret != 0)
+ goto error_return;
+ ret = pthread_cond_init (&ev->cond, NULL);
+ if (ret != 0)
+ goto cleanup_mutex;
+ INIT_LIST_HEAD (&ev->events);
+
+ ev->next_seq = 0; /* bootstrap sequencing */
+
+ if (entry->ordered) {
+ ret = pthread_create (&ev->invoker, NULL,
+ gf_changelog_callback_invoker, ev);
+ if (ret != 0)
+ goto cleanup_cond;
}
- if (size == 0)
- goto out;
-
- memcpy (bufptr, buffer, size - 1);
- bufptr[size - 1] = '\0';
+ return 0;
-out:
- return size;
+ cleanup_cond:
+ (void) pthread_cond_destroy (&ev->cond);
+ cleanup_mutex:
+ (void) pthread_mutex_destroy (&ev->lock);
+ error_return:
+ return -1;
}
/**
- * @API
- * gf_changelog_scan() - scan and generate a list of change entries
- *
- * calling this api multiple times (without calling gf_changlog_done())
- * would result new changelogs(s) being refreshed in the tracker file.
- * This call also acts as a cancellation point for the consumer.
+ * TODO:
+ * - cleanup invoker thread (if ordered mode)
+ * - cleanup event list
+ * - destroy rpc{-clnt, svc}
*/
-ssize_t
-gf_changelog_scan ()
+int
+gf_cleanup_brick_connection (xlator_t *this, gf_changelog_t *entry)
{
- int ret = 0;
- int tracker_fd = 0;
- size_t len = 0;
- size_t off = 0;
- xlator_t *this = NULL;
- size_t nr_entries = 0;
- gf_changelog_t *gfc = NULL;
- struct dirent *entryp = NULL;
- struct dirent *result = NULL;
- char buffer[PATH_MAX] = {0,};
-
- this = THIS;
- if (!this)
- goto out;
+ return 0;
+}
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+int
+gf_cleanup_connections (xlator_t *this)
+{
+ return 0;
+}
- /**
- * do we need to protect 'byebye' with locks? worst, the
- * consumer would get notified during next scan().
- */
- if (byebye) {
- errno = ECONNREFUSED;
- goto out;
- }
+static int
+gf_setup_brick_connection (xlator_t *this,
+ struct gf_brick_spec *brick,
+ gf_boolean_t ordered, void *xl)
+{
+ int ret = 0;
+ gf_private_t *priv = NULL;
+ gf_changelog_t *entry = NULL;
- errno = EINVAL;
+ priv = this->private;
- tracker_fd = gfc->gfc_fd;
+ if (!brick->callback || !brick->init || !brick->fini)
+ goto error_return;
- if (gf_ftruncate (tracker_fd, 0))
- goto out;
+ entry = GF_CALLOC (1, sizeof (*entry),
+ gf_changelog_mt_libgfchangelog_t);
+ if (!entry)
+ goto error_return;
+ INIT_LIST_HEAD (&entry->list);
- len = offsetof(struct dirent, d_name)
- + pathconf(gfc->gfc_processing_dir, _PC_NAME_MAX) + 1;
- entryp = GF_CALLOC (1, len,
- gf_changelog_mt_libgfchangelog_dirent_t);
- if (!entryp)
- goto out;
+ entry->notify = brick->filter;
+ (void) strncpy (entry->brick, brick->brick_path, PATH_MAX);
- rewinddir (gfc->gfc_dir);
- while (1) {
- ret = readdir_r (gfc->gfc_dir, entryp, &result);
- if (ret || !result)
- break;
+ entry->this = this;
+ entry->invokerxl = xl;
- if ( !strcmp (basename (entryp->d_name), ".")
- || !strcmp (basename (entryp->d_name), "..") )
- continue;
+ entry->ordered = ordered;
+ if (ordered) {
+ ret = gf_init_event (entry);
+ if (ret)
+ goto free_entry;
+ }
- nr_entries++;
+ entry->fini = brick->fini;
+ entry->callback = brick->callback;
+ entry->connected = brick->connected;
+ entry->disconnected = brick->disconnected;
- GF_CHANGELOG_FILL_BUFFER (gfc->gfc_processing_dir,
- buffer, off,
- strlen (gfc->gfc_processing_dir));
- GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer,
- off, strlen (entryp->d_name));
- GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1);
+ entry->ptr = brick->init (this, brick);
+ if (!entry->ptr)
+ goto cleanup_event;
+ priv->api = entry->ptr; /* pointer to API, if required */
- if (gf_changelog_write (tracker_fd, buffer, off) != off) {
- gf_log (this->name, GF_LOG_ERROR,
- "error writing changelog filename"
- " to tracker file");
- break;
- }
- off = 0;
+ LOCK (&priv->lock);
+ {
+ list_add_tail (&entry->list, &priv->connections);
}
+ UNLOCK (&priv->lock);
- GF_FREE (entryp);
+ ret = gf_changelog_setup_rpc (this, entry, CHANGELOG_RPC_PROBE_FILTER);
+ if (ret)
+ goto cleanup_event;
+ return 0;
- if (!result) {
- if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1)
- return nr_entries;
- }
- out:
+ cleanup_event:
+ if (ordered)
+ gf_cleanup_event (entry);
+ free_entry:
+ list_del (&entry->list); /* FIXME: kludge for now */
+ GF_FREE (entry);
+ error_return:
return -1;
}
-/**
- * @API
- * gf_changelog_register() - register a client for updates.
- */
int
-gf_changelog_register (char *brick_path, char *scratch_dir,
- char *log_file, int log_level, int max_reconnects)
+gf_changelog_register_brick (xlator_t *this,
+ struct gf_brick_spec *brick,
+ gf_boolean_t ordered, void *xl)
{
- int i = 0;
- int ret = -1;
- int errn = 0;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- char hist_scratch_dir[PATH_MAX] = {0,};
- struct stat buf = {0,};
-
- this = THIS;
- if (!this->ctx)
- goto out;
-
- errno = ENOMEM;
-
- gfc = GF_CALLOC (1, sizeof (*gfc),
- gf_changelog_mt_libgfchangelog_t);
- if (!gfc)
- goto out;
-
- gfc->this = this;
-
- gfc->gfc_dir = NULL;
- gfc->gfc_fd = gfc->gfc_sockfd = -1;
-
- if (stat (scratch_dir, &buf) && errno == ENOENT) {
- ret = mkdir_p (scratch_dir, 0600, _gf_true);
- if (ret) {
- errn = errno;
- goto cleanup;
- }
- }
-
- gfc->gfc_working_dir = realpath (scratch_dir, NULL);
- if (!gfc->gfc_working_dir) {
- errn = errno;
- goto cleanup;
- }
+ return gf_setup_brick_connection (this, brick, ordered, xl);
+}
- /* Begin: Changes for History API */
- gfc->hist_gfc = NULL;
+static int
+gf_changelog_setup_logging (xlator_t *this, char *logfile, int loglevel)
+{
+ /* passing ident as NULL means to use default ident for syslog */
+ if (gf_log_init (this->ctx, logfile, NULL))
+ return -1;
- gfc->hist_gfc = GF_CALLOC (1, sizeof (*gfc),
- gf_changelog_mt_libgfchangelog_t);
- if (!gfc->hist_gfc)
- goto cleanup;
+ gf_log_set_loglevel ((loglevel == -1) ? GF_LOG_INFO :
+ loglevel);
+ return 0;
+}
- gfc->hist_gfc->gfc_dir = NULL;
- gfc->hist_gfc->gfc_fd = gfc->hist_gfc->gfc_sockfd = -1;
- gfc->hist_gfc->this = NULL;
+int
+gf_changelog_register_generic (struct gf_brick_spec *bricks, int count,
+ int ordered, char *logfile, int lvl, void *xl)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ xlator_t *old_this = NULL;
+ struct gf_brick_spec *brick = NULL;
+ gf_boolean_t need_order = _gf_false;
- (void) strncpy (hist_scratch_dir, scratch_dir, PATH_MAX);
- (void) snprintf (hist_scratch_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_HISTORY_DIR"/",
- gfc->gfc_working_dir);
+ SAVE_THIS (xl);
- ret = mkdir_p (hist_scratch_dir, 0600, _gf_false);
- if (ret) {
- errn = errno;
- goto cleanup;
- }
+ this = THIS;
+ if (!this)
+ goto error_return;
- gfc->hist_gfc->gfc_working_dir = realpath (hist_scratch_dir, NULL);
- if (!gfc->hist_gfc->gfc_working_dir) {
- errn = errno;
- goto cleanup;
- }
+ ret = gf_changelog_setup_logging (this, logfile, lvl);
+ if (ret)
+ goto error_return;
- ret = gf_changelog_open_dirs (gfc->hist_gfc);
- if (ret) {
- errn = errno;
- gf_log (this->name, GF_LOG_ERROR,
- "could not create entries in history scratch dir");
- goto cleanup;
- }
+ need_order = (ordered) ? _gf_true : _gf_false;
- (void) strncpy (gfc->hist_gfc->gfc_brickpath, brick_path, PATH_MAX);
+ brick = bricks;
+ while (count--) {
+ gf_log (this->name, GF_LOG_INFO,
+ "Registering brick: %s [notify filter: %d]",
+ brick->brick_path, brick->filter);
- for (i=0; i < 256; i++) {
- gfc->hist_gfc->rfc3986[i] =
- (isalnum(i) || i == '~' ||
- i == '-' || i == '.' || i == '_') ? i : 0;
- }
- /* End: Changes for History API*/
+ ret = gf_changelog_register_brick (this, brick, need_order, xl);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Error registering with changelog xlator");
+ break;
+ }
- ret = gf_changelog_open_dirs (gfc);
- if (ret) {
- errn = errno;
- gf_log (this->name, GF_LOG_ERROR,
- "could not create entries in scratch dir");
- goto cleanup;
+ brick++;
}
- /* passing ident as NULL means to use default ident for syslog */
- if (gf_log_init (this->ctx, log_file, NULL))
- goto cleanup;
-
- gf_log_set_loglevel ((log_level == -1) ? GF_LOG_INFO :
- log_level);
+ if (ret != 0)
+ goto cleanup_inited_bricks;
- gfc->gfc_connretries = (max_reconnects <= 0) ? 1 : max_reconnects;
- (void) strncpy (gfc->gfc_brickpath, brick_path, PATH_MAX);
+ RESTORE_THIS();
+ return 0;
- ret = gf_changelog_notification_init (this, gfc);
- if (ret) {
- errn = errno;
- goto cleanup;
- }
+ cleanup_inited_bricks:
+ gf_cleanup_connections (this);
+ error_return:
+ RESTORE_THIS();
+ return -1;
+}
- ret = gf_thread_create (&gfc->gfc_changelog_processor,
- NULL, gf_changelog_process, gfc);
- if (ret) {
- errn = errno;
- gf_log (this->name, GF_LOG_ERROR,
- "error creating changelog processor thread"
- " new changes won't be recorded!!!");
- goto cleanup;
- }
+/**
+ * @API
+ * gf_changelog_register()
+ *
+ * This is _NOT_ a generic register API. It's a special API to handle
+ * updates at a journal granulality. This is used by consumers wanting
+ * to process persistent journal such as geo-replication via a set of
+ * APIs. All of this is required to maintain backward compatibility.
+ * Owner specific private data is stored in ->api (in gf_private_t),
+ * which is used by APIs to access it's private data. This limits
+ * the library access to a single brick, but that's how it used to
+ * be anyway. Furthermore, this API solely _owns_ "this", therefore
+ * callers already having a notion of "this" are expected to use the
+ * newer API.
+ *
+ * Newer applications wanting to use this library need not face this
+ * limitation and reply of the much more feature rich generic register
+ * API, which is purely callback based.
+ *
+ * NOTE: @max_reconnects is not used but required for backward compat.
+ *
+ * For generic API, refer gf_changelog_register_generic().
+ */
+int
+gf_changelog_register (char *brick_path, char *scratch_dir,
+ char *log_file, int log_level, int max_reconnects)
+{
+ struct gf_brick_spec brick = {0,};
- for (i=0; i < 256; i++) {
- gfc->rfc3986[i] =
- (isalnum(i) || i == '~' ||
- i == '-' || i == '.' || i == '_') ? i : 0;
- }
+ THIS = master;
- ret = 0;
- this->private = gfc;
+ brick.brick_path = brick_path;
+ brick.filter = CHANGELOG_OP_TYPE_JOURNAL;
- goto out;
+ brick.init = gf_changelog_journal_init;
+ brick.fini = gf_changelog_journal_fini;
+ brick.callback = gf_changelog_handle_journal;
+ brick.connected = gf_changelog_journal_connect;
+ brick.disconnected = gf_changelog_journal_disconnect;
- cleanup:
- if (gfc->hist_gfc) {
- gf_changelog_cleanup (gfc->hist_gfc);
- GF_FREE (gfc->hist_gfc);
- }
- gf_changelog_cleanup (gfc);
- GF_FREE (gfc);
- this->private = NULL;
- errno = errn;
+ brick.ptr = scratch_dir;
- out:
- return ret;
+ return gf_changelog_register_generic (&brick, 1, 1,
+ log_file, log_level, NULL);
}