summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/lib/src/gf-changelog.c
diff options
context:
space:
mode:
authorVenky Shankar <vshankar@redhat.com>2015-02-03 19:22:16 +0530
committerVijay Bellur <vbellur@redhat.com>2015-03-18 18:22:36 -0700
commit4737584fffcd25dbe35d17b076c95bf90a422cf2 (patch)
tree9f30e0e90c88c245787b78af3ca78d7ae05e30f2 /xlators/features/changelog/lib/src/gf-changelog.c
parent728fcd41eb39f66744d84b979dd8195fd47313ed (diff)
features/changelog: RPC'fy {libgf}changelog
This patch introduces RPC based communication between the changelog translator and libgfchangelog. It replaces the old pathetic stream based interaction that existed earlier (due to time constraints :-/). Changelog, upon initialization starts a RPC server (rpcsvc) allowing clients to invoke a probe API as a bootup mechanism to request for event notifications. During probe, clients can choose an event filter specifying the type(s) of events they are interested in. As of now there is no way to change the event notification set once the probe RPC call is made, but that is easier to implement. The actual event notifications is done on a separate RPC session. The client (libgfchangelog) itself starts and RPC server which the changelog translator "connects back" during probe. Notifications are dispatched by a bunch of threads from the server (translator) and the client optionally orders them if ordered notifications are requried. FOPs fill in their respective event details in a buffer (rot-buffs to be particular) and a bunch of threads (consumers) swap the buffers out of roatation and dispatch them via RPC. To avoid writer starvation, then number of dispatcher threads is one less than the number of buffer list in rot-buffs.x libgfchangelog becomes purely callback based -- upon event notification from the server (and re-ordering them if required) invoke a callback routine specified by consumer(s). A major part of the patch is also aimed at providing backward compatibility for geo-replication, which was one of the main consumer of the stream based API. Also, this patch does not\ "turn on" event notifications for all fops, just a bunch which is currently in requirement. Another pain point is that the server does not filter events before dispatching it to the clients. That load is taken up by the client itself (although it's done at the library layer rather than making it hard on the callback implementor). This needs improvement and care needs to be taken to not load the server up with expensive filtering mechanisms. Change-Id: Ibf60a432b68f2dfa60c6f9add2bcfd37a9c41395 BUG: 1170075 Signed-off-by: Venky Shankar <vshankar@redhat.com> Reviewed-on: http://review.gluster.org/9708 Reviewed-by: Jeff Darcy <jdarcy@redhat.com> Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'xlators/features/changelog/lib/src/gf-changelog.c')
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog.c862
1 files changed, 399 insertions, 463 deletions
diff --git a/xlators/features/changelog/lib/src/gf-changelog.c b/xlators/features/changelog/lib/src/gf-changelog.c
index fb2d9037ffb..8f33eb01013 100644
--- a/xlators/features/changelog/lib/src/gf-changelog.c
+++ b/xlators/features/changelog/lib/src/gf-changelog.c
@@ -14,6 +14,8 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
+#include <sys/time.h>
+#include <sys/resource.h>
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
@@ -24,589 +26,523 @@
#include "glusterfs.h"
#include "logging.h"
#include "defaults.h"
+#include "syncop.h"
+#include "gf-changelog-rpc.h"
#include "gf-changelog-helpers.h"
/* from the changelog translator */
#include "changelog-misc.h"
#include "changelog-mem-types.h"
-int byebye = 0;
+/**
+ * Global singleton xlator pointer for the library, initialized
+ * during library load. This should probably be hidden inside
+ * an initialized object which is an handle for the consumer.
+ *
+ * TODO: do away with the global..
+ */
+xlator_t *master = NULL;
-static void
-gf_changelog_cleanup (gf_changelog_t *gfc)
+static inline
+gf_private_t *gf_changelog_alloc_priv ()
{
- /* socket */
- if (gfc->gfc_sockfd != -1)
- close (gfc->gfc_sockfd);
- /* tracker fd */
- if (gfc->gfc_fd != -1)
- close (gfc->gfc_fd);
- /* processing dir */
- if (gfc->gfc_dir)
- closedir (gfc->gfc_dir);
-
- if (gfc->gfc_working_dir)
- free (gfc->gfc_working_dir); /* allocated by realpath */
-}
+ int ret = 0;
+ gf_private_t *priv = NULL;
-void
-__attribute__ ((constructor)) gf_changelog_ctor (void)
-{
- glusterfs_ctx_t *ctx = NULL;
+ priv = calloc (1, sizeof (gf_private_t));
+ if (!priv)
+ goto error_return;
+ INIT_LIST_HEAD (&priv->connections);
- ctx = glusterfs_ctx_new ();
- if (!ctx)
- return;
+ ret = LOCK_INIT (&priv->lock);
+ if (ret != 0)
+ goto free_priv;
+ priv->api = NULL;
- if (glusterfs_globals_init (ctx)) {
- free (ctx);
- ctx = NULL;
- return;
- }
+ return priv;
- THIS->ctx = ctx;
- if (xlator_mem_acct_init (THIS, gf_changelog_mt_end))
- return;
+ free_priv:
+ free (priv);
+ error_return:
+ return NULL;
}
-void
-__attribute__ ((destructor)) gf_changelog_dtor (void)
-{
- xlator_t *this = NULL;
- glusterfs_ctx_t *ctx = NULL;
- gf_changelog_t *gfc = NULL;
+#define GF_CHANGELOG_EVENT_POOL_SIZE 16384
+#define GF_CHANGELOG_EVENT_THREAD_COUNT 4
- this = THIS;
- if (!this)
- return;
-
- ctx = this->ctx;
- gfc = this->private;
-
- if (gfc) {
- if (gfc->hist_gfc) {
- gf_changelog_cleanup(gfc->hist_gfc);
- GF_FREE (gfc->hist_gfc);
- }
- gf_changelog_cleanup (gfc);
- GF_FREE (gfc);
+static int
+gf_changelog_ctx_defaults_init (glusterfs_ctx_t *ctx)
+{
+ cmd_args_t *cmd_args = NULL;
+ struct rlimit lim = {0, };
+ call_pool_t *pool = NULL;
+ int ret = -1;
+
+ ret = xlator_mem_acct_init (THIS, gf_changelog_mt_end);
+ if (ret != 0) {
+ return ret;
}
- if (ctx) {
- pthread_mutex_destroy (&ctx->lock);
- free (ctx);
- ctx = NULL;
- }
-}
+ ctx->process_uuid = generate_glusterfs_ctx_id ();
+ if (!ctx->process_uuid)
+ return -1;
+ ctx->page_size = 128 * GF_UNIT_KB;
-static int
-gf_changelog_open_dirs (gf_changelog_t *gfc)
-{
- int ret = -1;
- DIR *dir = NULL;
- int tracker_fd = 0;
- char tracker_path[PATH_MAX] = {0,};
- xlator_t *this = NULL;
+ ctx->iobuf_pool = iobuf_pool_new ();
+ if (!ctx->iobuf_pool)
+ return -1;
- this = THIS;
- GF_ASSERT (this);
+ ctx->event_pool = event_pool_new (GF_CHANGELOG_EVENT_POOL_SIZE,
+ GF_CHANGELOG_EVENT_THREAD_COUNT);
+ if (!ctx->event_pool)
+ return -1;
- (void) snprintf (gfc->gfc_current_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_CURRENT_DIR"/",
- gfc->gfc_working_dir);
+ pool = GF_CALLOC (1, sizeof (call_pool_t),
+ gf_changelog_mt_libgfchangelog_call_pool_t);
+ if (!pool)
+ return -1;
- ret = recursive_rmdir (gfc->gfc_current_dir);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "Failed to rmdir: %s, err: %s",
- gfc->gfc_current_dir, strerror (errno));
- goto out;
- }
+ /* frame_mem_pool size 112 * 64 */
+ pool->frame_mem_pool = mem_pool_new (call_frame_t, 32);
+ if (!pool->frame_mem_pool)
+ return -1;
- ret = mkdir_p (gfc->gfc_current_dir, 0600, _gf_false);
- if (ret)
- goto out;
+ /* stack_mem_pool size 256 * 128 */
+ pool->stack_mem_pool = mem_pool_new (call_stack_t, 16);
- (void) snprintf (gfc->gfc_processed_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_PROCESSED_DIR"/",
- gfc->gfc_working_dir);
+ if (!pool->stack_mem_pool)
+ return -1;
- ret = mkdir_p (gfc->gfc_processed_dir, 0600, _gf_false);
- if (ret)
- goto out;
+ ctx->stub_mem_pool = mem_pool_new (call_stub_t, 16);
+ if (!ctx->stub_mem_pool)
+ return -1;
- (void) snprintf (gfc->gfc_processing_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_PROCESSING_DIR"/",
- gfc->gfc_working_dir);
+ ctx->dict_pool = mem_pool_new (dict_t, 32);
+ if (!ctx->dict_pool)
+ return -1;
- ret = recursive_rmdir (gfc->gfc_processing_dir);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "Failed to rmdir: %s, err: %s",
- gfc->gfc_processing_dir, strerror (errno));
- goto out;
- }
+ ctx->dict_pair_pool = mem_pool_new (data_pair_t, 512);
+ if (!ctx->dict_pair_pool)
+ return -1;
- ret = mkdir_p (gfc->gfc_processing_dir, 0600, _gf_false);
- if (ret)
- goto out;
+ ctx->dict_data_pool = mem_pool_new (data_t, 512);
+ if (!ctx->dict_data_pool)
+ return -1;
- dir = opendir (gfc->gfc_processing_dir);
- if (!dir) {
- gf_log ("", GF_LOG_ERROR,
- "opendir() error [reason: %s]", strerror (errno));
- goto out;
- }
+ INIT_LIST_HEAD (&pool->all_frames);
+ LOCK_INIT (&pool->lock);
+ ctx->pool = pool;
- gfc->gfc_dir = dir;
+ pthread_mutex_init (&(ctx->lock), NULL);
- (void) snprintf (tracker_path, PATH_MAX,
- "%s/"GF_CHANGELOG_TRACKER, gfc->gfc_working_dir);
+ cmd_args = &ctx->cmd_args;
- tracker_fd = open (tracker_path, O_CREAT | O_APPEND | O_RDWR,
- S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
- if (tracker_fd < 0) {
- closedir (gfc->gfc_dir);
- ret = -1;
- goto out;
- }
+ INIT_LIST_HEAD (&cmd_args->xlator_options);
+
+ lim.rlim_cur = RLIM_INFINITY;
+ lim.rlim_max = RLIM_INFINITY;
+ setrlimit (RLIMIT_CORE, &lim);
- gfc->gfc_fd = tracker_fd;
- ret = 0;
- out:
- return ret;
+ return 0;
}
-int
-gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc)
+/* TODO: cleanup ctx defaults */
+static void
+gf_changelog_cleanup_this (xlator_t *this)
{
- int ret = 0;
- int len = 0;
- int tries = 0;
- int sockfd = 0;
- struct sockaddr_un remote;
-
- this = gfc->this;
+ glusterfs_ctx_t *ctx = NULL;
- if (gfc->gfc_sockfd != -1) {
- gf_log (this->name, GF_LOG_INFO,
- "Reconnecting...");
- close (gfc->gfc_sockfd);
- }
+ if (!this)
+ return;
- sockfd = socket (AF_UNIX, SOCK_STREAM, 0);
- if (sockfd < 0) {
- ret = -1;
- goto out;
- }
+ ctx = this->ctx;
+ syncenv_destroy (ctx->env);
+ free (ctx);
- CHANGELOG_MAKE_SOCKET_PATH (gfc->gfc_brickpath,
- gfc->gfc_sockpath, UNIX_PATH_MAX);
- gf_log (this->name, GF_LOG_INFO,
- "connecting to changelog socket: %s (brick: %s)",
- gfc->gfc_sockpath, gfc->gfc_brickpath);
+ if (this->private)
+ free (this->private);
- remote.sun_family = AF_UNIX;
- strcpy (remote.sun_path, gfc->gfc_sockpath);
+ this->private = NULL;
+ this->ctx = NULL;
+}
- len = strlen (remote.sun_path) + sizeof (remote.sun_family);
+static int
+gf_changelog_init_this ()
+{
+ glusterfs_ctx_t *ctx = NULL;
- while (tries < gfc->gfc_connretries) {
- gf_log (this->name, GF_LOG_WARNING,
- "connection attempt %d/%d...",
- tries + 1, gfc->gfc_connretries);
+ ctx = glusterfs_ctx_new ();
+ if (!ctx)
+ goto error_return;
- /* initiate a connect */
- if (connect (sockfd, (struct sockaddr *) &remote, len) == 0) {
- gfc->gfc_sockfd = sockfd;
- break;
- }
+ if (glusterfs_globals_init (ctx))
+ goto free_ctx;
- tries++;
- sleep (2);
- }
+ THIS->ctx = ctx;
+ if (gf_changelog_ctx_defaults_init (ctx))
+ goto free_ctx;
- if (tries == gfc->gfc_connretries) {
- gf_log (this->name, GF_LOG_ERROR,
- "could not connect to changelog socket!"
- " bailing out...");
- close (sockfd);
- ret = -1;
- } else
- gf_log (this->name, GF_LOG_INFO,
- "connection successful");
+ ctx->env = syncenv_new (0, 0, 0);
+ if (!ctx->env)
+ goto free_ctx;
+ return 0;
- out:
- return ret;
+ free_ctx:
+ free (ctx);
+ THIS->ctx = NULL;
+ error_return:
+ return -1;
}
-int
-gf_changelog_done (char *file)
+static int
+gf_changelog_init_master ()
{
- int ret = -1;
- char *buffer = NULL;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- char to_path[PATH_MAX] = {0,};
-
- errno = EINVAL;
-
- this = THIS;
- if (!this)
- goto out;
-
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+ int ret = 0;
+ gf_private_t *priv = NULL;
+ glusterfs_ctx_t *ctx = NULL;
- if (!file || !strlen (file))
- goto out;
+ ret = gf_changelog_init_this ();
+ if (ret != 0)
+ goto error_return;
+ master = THIS;
+
+ priv = gf_changelog_alloc_priv ();
+ if (!priv)
+ goto cleanup_master;
+ master->private = priv;
+
+ /* poller thread */
+ ret = pthread_create (&priv->poller,
+ NULL, changelog_rpc_poller, master);
+ if (ret != 0) {
+ gf_log (master->name, GF_LOG_ERROR,
+ "failed to spawn poller thread");
+ goto cleanup_master;
+ }
- /* make sure 'file' is inside ->gfc_working_dir */
- buffer = realpath (file, NULL);
- if (!buffer)
- goto out;
+ return 0;
- if (strncmp (gfc->gfc_working_dir,
- buffer, strlen (gfc->gfc_working_dir)))
- goto out;
+ cleanup_master:
+ master->private = NULL;
+ gf_changelog_cleanup_this (master);
+ error_return:
+ return -1;
+}
- (void) snprintf (to_path, PATH_MAX, "%s%s",
- gfc->gfc_processed_dir, basename (buffer));
- gf_log (this->name, GF_LOG_DEBUG,
- "moving %s to processed directory", file);
- ret = rename (buffer, to_path);
- if (ret) {
- gf_log (this->name, GF_LOG_ERROR,
- "cannot move %s to %s (reason: %s)",
- file, to_path, strerror (errno));
- goto out;
- }
+/* ctor/dtor */
- ret = 0;
+void
+__attribute__ ((constructor)) gf_changelog_ctor (void)
+{
+ (void) gf_changelog_init_master ();
+}
- out:
- if (buffer)
- free (buffer); /* allocated by realpath() */
- return ret;
+void
+__attribute__ ((destructor)) gf_changelog_dtor (void)
+{
+ gf_changelog_cleanup_this (master);
}
-/**
- * @API
- * for a set of changelogs, start from the beginning
- */
+/* TODO: cleanup clnt/svc on failure */
int
-gf_changelog_start_fresh ()
+gf_changelog_setup_rpc (xlator_t *this,
+ gf_changelog_t *entry, int proc)
{
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
-
- this = THIS;
- if (!this)
- goto out;
+ int ret = 0;
+ rpcsvc_t *svc = NULL;
+ struct rpc_clnt *rpc = NULL;
- errno = EINVAL;
+ /**
+ * Initialize a connect back socket. A probe() RPC call to the server
+ * triggers a reverse connect.
+ */
+ svc = gf_changelog_reborp_init_rpc_listner (this, entry->brick,
+ RPC_SOCK (entry), entry);
+ if (!svc)
+ goto error_return;
+ RPC_REBORP (entry) = svc;
+
+ /* Initialize an RPC client */
+ rpc = gf_changelog_rpc_init (this, entry);
+ if (!rpc)
+ goto error_return;
+ RPC_PROBER (entry) = rpc;
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+ /**
+ * @FIXME
+ * till we have connection state machine, let's delay the RPC call
+ * for now..
+ */
+ sleep (2);
- if (gf_ftruncate (gfc->gfc_fd, 0))
- goto out;
+ /**
+ * Probe changelog translator for reverse connection. After a successful
+ * call, there's less use of the client and can be disconnected, but
+ * let's leave the connection active for any future RPC calls.
+ */
+ ret = gf_changelog_invoke_rpc (this, entry, proc);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Could not initiate probe RPC, bailing out!!!");
+ goto error_return;
+ }
return 0;
- out:
+ error_return:
return -1;
}
-/**
- * @API
- * return the next changelog file entry. zero means all chanelogs
- * consumed.
- */
-ssize_t
-gf_changelog_next_change (char *bufptr, size_t maxlen)
+static void
+gf_cleanup_event (gf_changelog_t *entry)
{
- ssize_t size = -1;
- int tracker_fd = 0;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- char buffer[PATH_MAX] = {0,};
-
- if (maxlen > PATH_MAX) {
- errno = ENAMETOOLONG;
- goto out;
- }
+ xlator_t *this = NULL;
+ struct gf_event_list *ev = NULL;
- errno = EINVAL;
+ this = entry->this;
+ ev = &entry->event;
- this = THIS;
- if (!this)
- goto out;
+ (void) gf_thread_cleanup (this, ev->invoker);
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+ (void) pthread_mutex_destroy (&ev->lock);
+ (void) pthread_cond_destroy (&ev->cond);
- tracker_fd = gfc->gfc_fd;
+ ev->entry = NULL;
+}
- size = gf_readline (tracker_fd, buffer, maxlen);
- if (size < 0) {
- size = -1;
- goto out;
+static int
+gf_init_event (gf_changelog_t *entry)
+{
+ int ret = 0;
+ struct gf_event_list *ev = NULL;
+
+ ev = &entry->event;
+ ev->entry = entry;
+
+ ret = pthread_mutex_init (&ev->lock, NULL);
+ if (ret != 0)
+ goto error_return;
+ ret = pthread_cond_init (&ev->cond, NULL);
+ if (ret != 0)
+ goto cleanup_mutex;
+ INIT_LIST_HEAD (&ev->events);
+
+ ev->next_seq = 0; /* bootstrap sequencing */
+
+ if (entry->ordered) {
+ ret = pthread_create (&ev->invoker, NULL,
+ gf_changelog_callback_invoker, ev);
+ if (ret != 0)
+ goto cleanup_cond;
}
- if (size == 0)
- goto out;
-
- memcpy (bufptr, buffer, size - 1);
- bufptr[size - 1] = '\0';
+ return 0;
-out:
- return size;
+ cleanup_cond:
+ (void) pthread_cond_destroy (&ev->cond);
+ cleanup_mutex:
+ (void) pthread_mutex_destroy (&ev->lock);
+ error_return:
+ return -1;
}
/**
- * @API
- * gf_changelog_scan() - scan and generate a list of change entries
- *
- * calling this api multiple times (without calling gf_changlog_done())
- * would result new changelogs(s) being refreshed in the tracker file.
- * This call also acts as a cancellation point for the consumer.
+ * TODO:
+ * - cleanup invoker thread (if ordered mode)
+ * - cleanup event list
+ * - destroy rpc{-clnt, svc}
*/
-ssize_t
-gf_changelog_scan ()
+int
+gf_cleanup_brick_connection (xlator_t *this, gf_changelog_t *entry)
{
- int ret = 0;
- int tracker_fd = 0;
- size_t len = 0;
- size_t off = 0;
- xlator_t *this = NULL;
- size_t nr_entries = 0;
- gf_changelog_t *gfc = NULL;
- struct dirent *entryp = NULL;
- struct dirent *result = NULL;
- char buffer[PATH_MAX] = {0,};
-
- this = THIS;
- if (!this)
- goto out;
+ return 0;
+}
- gfc = (gf_changelog_t *) this->private;
- if (!gfc)
- goto out;
+int
+gf_cleanup_connections (xlator_t *this)
+{
+ return 0;
+}
- /**
- * do we need to protect 'byebye' with locks? worst, the
- * consumer would get notified during next scan().
- */
- if (byebye) {
- errno = ECONNREFUSED;
- goto out;
- }
+static int
+gf_setup_brick_connection (xlator_t *this,
+ struct gf_brick_spec *brick,
+ gf_boolean_t ordered, void *xl)
+{
+ int ret = 0;
+ gf_private_t *priv = NULL;
+ gf_changelog_t *entry = NULL;
- errno = EINVAL;
+ priv = this->private;
- tracker_fd = gfc->gfc_fd;
+ if (!brick->callback || !brick->init || !brick->fini)
+ goto error_return;
- if (gf_ftruncate (tracker_fd, 0))
- goto out;
+ entry = GF_CALLOC (1, sizeof (*entry),
+ gf_changelog_mt_libgfchangelog_t);
+ if (!entry)
+ goto error_return;
+ INIT_LIST_HEAD (&entry->list);
- len = offsetof(struct dirent, d_name)
- + pathconf(gfc->gfc_processing_dir, _PC_NAME_MAX) + 1;
- entryp = GF_CALLOC (1, len,
- gf_changelog_mt_libgfchangelog_dirent_t);
- if (!entryp)
- goto out;
+ entry->notify = brick->filter;
+ (void) strncpy (entry->brick, brick->brick_path, PATH_MAX);
- rewinddir (gfc->gfc_dir);
- while (1) {
- ret = readdir_r (gfc->gfc_dir, entryp, &result);
- if (ret || !result)
- break;
+ entry->this = this;
+ entry->invokerxl = xl;
- if ( !strcmp (basename (entryp->d_name), ".")
- || !strcmp (basename (entryp->d_name), "..") )
- continue;
+ entry->ordered = ordered;
+ if (ordered) {
+ ret = gf_init_event (entry);
+ if (ret)
+ goto free_entry;
+ }
- nr_entries++;
+ entry->fini = brick->fini;
+ entry->callback = brick->callback;
+ entry->connected = brick->connected;
+ entry->disconnected = brick->disconnected;
- GF_CHANGELOG_FILL_BUFFER (gfc->gfc_processing_dir,
- buffer, off,
- strlen (gfc->gfc_processing_dir));
- GF_CHANGELOG_FILL_BUFFER (entryp->d_name, buffer,
- off, strlen (entryp->d_name));
- GF_CHANGELOG_FILL_BUFFER ("\n", buffer, off, 1);
+ entry->ptr = brick->init (this, brick);
+ if (!entry->ptr)
+ goto cleanup_event;
+ priv->api = entry->ptr; /* pointer to API, if required */
- if (gf_changelog_write (tracker_fd, buffer, off) != off) {
- gf_log (this->name, GF_LOG_ERROR,
- "error writing changelog filename"
- " to tracker file");
- break;
- }
- off = 0;
+ LOCK (&priv->lock);
+ {
+ list_add_tail (&entry->list, &priv->connections);
}
+ UNLOCK (&priv->lock);
- GF_FREE (entryp);
+ ret = gf_changelog_setup_rpc (this, entry, CHANGELOG_RPC_PROBE_FILTER);
+ if (ret)
+ goto cleanup_event;
+ return 0;
- if (!result) {
- if (gf_lseek (tracker_fd, 0, SEEK_SET) != -1)
- return nr_entries;
- }
- out:
+ cleanup_event:
+ if (ordered)
+ gf_cleanup_event (entry);
+ free_entry:
+ list_del (&entry->list); /* FIXME: kludge for now */
+ GF_FREE (entry);
+ error_return:
return -1;
}
-/**
- * @API
- * gf_changelog_register() - register a client for updates.
- */
int
-gf_changelog_register (char *brick_path, char *scratch_dir,
- char *log_file, int log_level, int max_reconnects)
+gf_changelog_register_brick (xlator_t *this,
+ struct gf_brick_spec *brick,
+ gf_boolean_t ordered, void *xl)
{
- int i = 0;
- int ret = -1;
- int errn = 0;
- xlator_t *this = NULL;
- gf_changelog_t *gfc = NULL;
- char hist_scratch_dir[PATH_MAX] = {0,};
- struct stat buf = {0,};
-
- this = THIS;
- if (!this->ctx)
- goto out;
-
- errno = ENOMEM;
-
- gfc = GF_CALLOC (1, sizeof (*gfc),
- gf_changelog_mt_libgfchangelog_t);
- if (!gfc)
- goto out;
-
- gfc->this = this;
-
- gfc->gfc_dir = NULL;
- gfc->gfc_fd = gfc->gfc_sockfd = -1;
-
- if (stat (scratch_dir, &buf) && errno == ENOENT) {
- ret = mkdir_p (scratch_dir, 0600, _gf_true);
- if (ret) {
- errn = errno;
- goto cleanup;
- }
- }
-
- gfc->gfc_working_dir = realpath (scratch_dir, NULL);
- if (!gfc->gfc_working_dir) {
- errn = errno;
- goto cleanup;
- }
+ return gf_setup_brick_connection (this, brick, ordered, xl);
+}
- /* Begin: Changes for History API */
- gfc->hist_gfc = NULL;
+static int
+gf_changelog_setup_logging (xlator_t *this, char *logfile, int loglevel)
+{
+ /* passing ident as NULL means to use default ident for syslog */
+ if (gf_log_init (this->ctx, logfile, NULL))
+ return -1;
- gfc->hist_gfc = GF_CALLOC (1, sizeof (*gfc),
- gf_changelog_mt_libgfchangelog_t);
- if (!gfc->hist_gfc)
- goto cleanup;
+ gf_log_set_loglevel ((loglevel == -1) ? GF_LOG_INFO :
+ loglevel);
+ return 0;
+}
- gfc->hist_gfc->gfc_dir = NULL;
- gfc->hist_gfc->gfc_fd = gfc->hist_gfc->gfc_sockfd = -1;
- gfc->hist_gfc->this = NULL;
+int
+gf_changelog_register_generic (struct gf_brick_spec *bricks, int count,
+ int ordered, char *logfile, int lvl, void *xl)
+{
+ int ret = 0;
+ xlator_t *this = NULL;
+ xlator_t *old_this = NULL;
+ struct gf_brick_spec *brick = NULL;
+ gf_boolean_t need_order = _gf_false;
- (void) strncpy (hist_scratch_dir, scratch_dir, PATH_MAX);
- (void) snprintf (hist_scratch_dir, PATH_MAX,
- "%s/"GF_CHANGELOG_HISTORY_DIR"/",
- gfc->gfc_working_dir);
+ SAVE_THIS (xl);
- ret = mkdir_p (hist_scratch_dir, 0600, _gf_false);
- if (ret) {
- errn = errno;
- goto cleanup;
- }
+ this = THIS;
+ if (!this)
+ goto error_return;
- gfc->hist_gfc->gfc_working_dir = realpath (hist_scratch_dir, NULL);
- if (!gfc->hist_gfc->gfc_working_dir) {
- errn = errno;
- goto cleanup;
- }
+ ret = gf_changelog_setup_logging (this, logfile, lvl);
+ if (ret)
+ goto error_return;
- ret = gf_changelog_open_dirs (gfc->hist_gfc);
- if (ret) {
- errn = errno;
- gf_log (this->name, GF_LOG_ERROR,
- "could not create entries in history scratch dir");
- goto cleanup;
- }
+ need_order = (ordered) ? _gf_true : _gf_false;
- (void) strncpy (gfc->hist_gfc->gfc_brickpath, brick_path, PATH_MAX);
+ brick = bricks;
+ while (count--) {
+ gf_log (this->name, GF_LOG_INFO,
+ "Registering brick: %s [notify filter: %d]",
+ brick->brick_path, brick->filter);
- for (i=0; i < 256; i++) {
- gfc->hist_gfc->rfc3986[i] =
- (isalnum(i) || i == '~' ||
- i == '-' || i == '.' || i == '_') ? i : 0;
- }
- /* End: Changes for History API*/
+ ret = gf_changelog_register_brick (this, brick, need_order, xl);
+ if (ret != 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "Error registering with changelog xlator");
+ break;
+ }
- ret = gf_changelog_open_dirs (gfc);
- if (ret) {
- errn = errno;
- gf_log (this->name, GF_LOG_ERROR,
- "could not create entries in scratch dir");
- goto cleanup;
+ brick++;
}
- /* passing ident as NULL means to use default ident for syslog */
- if (gf_log_init (this->ctx, log_file, NULL))
- goto cleanup;
-
- gf_log_set_loglevel ((log_level == -1) ? GF_LOG_INFO :
- log_level);
+ if (ret != 0)
+ goto cleanup_inited_bricks;
- gfc->gfc_connretries = (max_reconnects <= 0) ? 1 : max_reconnects;
- (void) strncpy (gfc->gfc_brickpath, brick_path, PATH_MAX);
+ RESTORE_THIS();
+ return 0;
- ret = gf_changelog_notification_init (this, gfc);
- if (ret) {
- errn = errno;
- goto cleanup;
- }
+ cleanup_inited_bricks:
+ gf_cleanup_connections (this);
+ error_return:
+ RESTORE_THIS();
+ return -1;
+}
- ret = gf_thread_create (&gfc->gfc_changelog_processor,
- NULL, gf_changelog_process, gfc);
- if (ret) {
- errn = errno;
- gf_log (this->name, GF_LOG_ERROR,
- "error creating changelog processor thread"
- " new changes won't be recorded!!!");
- goto cleanup;
- }
+/**
+ * @API
+ * gf_changelog_register()
+ *
+ * This is _NOT_ a generic register API. It's a special API to handle
+ * updates at a journal granulality. This is used by consumers wanting
+ * to process persistent journal such as geo-replication via a set of
+ * APIs. All of this is required to maintain backward compatibility.
+ * Owner specific private data is stored in ->api (in gf_private_t),
+ * which is used by APIs to access it's private data. This limits
+ * the library access to a single brick, but that's how it used to
+ * be anyway. Furthermore, this API solely _owns_ "this", therefore
+ * callers already having a notion of "this" are expected to use the
+ * newer API.
+ *
+ * Newer applications wanting to use this library need not face this
+ * limitation and reply of the much more feature rich generic register
+ * API, which is purely callback based.
+ *
+ * NOTE: @max_reconnects is not used but required for backward compat.
+ *
+ * For generic API, refer gf_changelog_register_generic().
+ */
+int
+gf_changelog_register (char *brick_path, char *scratch_dir,
+ char *log_file, int log_level, int max_reconnects)
+{
+ struct gf_brick_spec brick = {0,};
- for (i=0; i < 256; i++) {
- gfc->rfc3986[i] =
- (isalnum(i) || i == '~' ||
- i == '-' || i == '.' || i == '_') ? i : 0;
- }
+ THIS = master;
- ret = 0;
- this->private = gfc;
+ brick.brick_path = brick_path;
+ brick.filter = CHANGELOG_OP_TYPE_JOURNAL;
- goto out;
+ brick.init = gf_changelog_journal_init;
+ brick.fini = gf_changelog_journal_fini;
+ brick.callback = gf_changelog_handle_journal;
+ brick.connected = gf_changelog_journal_connect;
+ brick.disconnected = gf_changelog_journal_disconnect;
- cleanup:
- if (gfc->hist_gfc) {
- gf_changelog_cleanup (gfc->hist_gfc);
- GF_FREE (gfc->hist_gfc);
- }
- gf_changelog_cleanup (gfc);
- GF_FREE (gfc);
- this->private = NULL;
- errno = errn;
+ brick.ptr = scratch_dir;
- out:
- return ret;
+ return gf_changelog_register_generic (&brick, 1, 1,
+ log_file, log_level, NULL);
}