summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/lib/src/gf-changelog-helpers.h
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/changelog/lib/src/gf-changelog-helpers.h')
-rw-r--r--xlators/features/changelog/lib/src/gf-changelog-helpers.h194
1 files changed, 134 insertions, 60 deletions
diff --git a/xlators/features/changelog/lib/src/gf-changelog-helpers.h b/xlators/features/changelog/lib/src/gf-changelog-helpers.h
index 9b875d45dcc..17b8862a89b 100644
--- a/xlators/features/changelog/lib/src/gf-changelog-helpers.h
+++ b/xlators/features/changelog/lib/src/gf-changelog-helpers.h
@@ -18,6 +18,11 @@
#include <xlator.h>
+#include "changelog.h"
+
+#include "changelog-rpc-common.h"
+#include "gf-changelog-journal.h"
+
#define GF_CHANGELOG_TRACKER "tracker"
#define GF_CHANGELOG_CURRENT_DIR ".current"
@@ -41,79 +46,143 @@ typedef struct read_line {
char rl_buf[MAXLINE];
} read_line_t;
+struct gf_changelog;
+
+/**
+ * Event list for ordered event notification
+ *
+ * ->next_seq holds the next _expected_ sequence number.
+ */
+struct gf_event_list {
+ pthread_mutex_t lock; /* protects this structure */
+ pthread_cond_t cond;
+
+ pthread_t invoker;
+
+ unsigned long next_seq; /* next sequence number expected:
+ zero during bootstrap */
+
+ struct gf_changelog *entry; /* backpointer to it's brick
+ encapsulator (entry) */
+ struct list_head events; /* list of events (ordered) */
+};
+
+/**
+ * include a refcount if it's of use by additional layers
+ */
+struct gf_event {
+ int count;
+
+ unsigned long seq;
+
+ struct list_head list;
+
+ struct iovec iov[0];
+};
+#define GF_EVENT_CALLOC_SIZE(cnt, len) \
+ (sizeof (struct gf_event) + (cnt * sizeof (struct iovec)) + len)
+
+/**
+ * assign the base address of the IO vector to the correct memory
+ * area and set it's addressable length.
+ */
+#define GF_EVENT_ASSIGN_IOVEC(vec, event, len, pos) \
+ do { \
+ vec->iov_base = ((char *)event) + \
+ sizeof (struct gf_event) + \
+ (event->count * sizeof (struct iovec)) + pos; \
+ vec->iov_len = len; \
+ pos += len; \
+ } while (0)
+
+/**
+ * An instance of this structure is allocated for each brick for which
+ * notifications are streamed.
+ */
typedef struct gf_changelog {
xlator_t *this;
- /* 'processing' directory stream */
- DIR *gfc_dir;
-
- /* fd to the tracker file */
- int gfc_fd;
-
- /* connection retries */
- int gfc_connretries;
+ struct list_head list; /* list of instances */
- char gfc_sockpath[UNIX_PATH_MAX];
+ char brick[PATH_MAX]; /* brick path for this end-point */
- char gfc_brickpath[PATH_MAX];
+ changelog_rpc_t grpc; /* rpc{-clnt,svc} for this brick */
+#define RPC_PROBER(ent) ent->grpc.rpc
+#define RPC_REBORP(ent) ent->grpc.svc
+#define RPC_SOCK(ent) ent->grpc.sock
- /* socket for receiving notifications */
- int gfc_sockfd;
+ unsigned int notify; /* notification flag(s) */
- char *gfc_working_dir;
+ FINI *fini; /* destructor callback */
+ CALLBACK *callback; /* event callback dispatcher */
+ CONNECT *connected; /* connect callback */
+ DISCONNECT *disconnected; /* disconnection callback */
- /* RFC 3986 string encoding */
- char rfc3986[256];
+ void *ptr; /* owner specific private data */
+ xlator_t *invokerxl; /* consumers _this_, if valid,
+ assigned to THIS before cbk is
+ invoked */
- char gfc_current_dir[PATH_MAX];
- char gfc_processed_dir[PATH_MAX];
- char gfc_processing_dir[PATH_MAX];
+ gf_boolean_t ordered;
- pthread_t gfc_changelog_processor;
-
- /* Holds gfc for History API */
- struct gf_changelog *hist_gfc;
-
- /* holds 0 done scanning, 1 keep scanning and -1 error */
- int hist_done;
+ struct gf_event_list event;
} gf_changelog_t;
-typedef struct gf_changelog_history_data {
- int len;
-
- int htime_fd;
-
- /* parallelism count */
- int n_parallel;
-
- /* history from, to indexes */
- unsigned long from;
- unsigned long to;
-} gf_changelog_history_data_t;
-
-typedef struct gf_changelog_consume_data {
- /** set of inputs */
-
- /* fd to read from */
- int fd;
-
- /* from @offset */
- off_t offset;
-
- xlator_t *this;
- gf_changelog_t *gfc;
-
- /** set of outputs */
+static inline int
+gf_changelog_filter_check (gf_changelog_t *entry, changelog_event_t *event)
+{
+ if (event->ev_type & entry->notify)
+ return 1;
+ return 0;
+}
+
+#define GF_NEED_ORDERED_EVENTS(ent) (ent->ordered == _gf_true)
+
+/** private structure */
+typedef struct gf_private {
+ gf_lock_t lock; /* protects ->connections */
+
+ void *api; /* pointer for API access */
+
+ pthread_t poller; /* event poller thread */
+
+ struct list_head connections; /* list of connections */
+} gf_private_t;
+
+#define GF_CHANGELOG_GET_API_PTR(this) (((gf_private_t *) this->private)->api)
+
+/**
+ * upcall: invoke callback with _correct_ THIS
+ */
+#define GF_CHANGELOG_INVOKE_CBK(this, cbk, brick, args ...) \
+ do { \
+ xlator_t *old_this = NULL; \
+ xlator_t *invokerxl = NULL; \
+ \
+ invokerxl = entry->invokerxl; \
+ old_this = this; \
+ \
+ if (invokerxl) { \
+ THIS = invokerxl; \
+ } \
+ \
+ cbk (invokerxl, brick, args); \
+ THIS = old_this; \
+ \
+ } while (0)
- /* return value */
- int retval;
+#define SAVE_THIS(xl) \
+ do { \
+ old_this = xl; \
+ THIS = master; \
+ } while (0)
- /* journal processed */
- char changelog[PATH_MAX];
-} gf_changelog_consume_data_t;
+#define RESTORE_THIS() \
+ do { \
+ THIS = old_this; \
+ } while (0)
-int
-gf_changelog_notification_init (xlator_t *this, gf_changelog_t *gfc);
+/** APIs and the rest */
void *
gf_changelog_process (void *data);
@@ -138,9 +207,14 @@ gf_lseek (int fd, off_t offset, int whence);
int
gf_changelog_consume (xlator_t *this,
- gf_changelog_t *gfc,
+ gf_changelog_journal_t *jnl,
char *from_path, gf_boolean_t no_publish);
int
-gf_changelog_publish (xlator_t *this, gf_changelog_t *gfc, char *from_path);
+gf_changelog_publish (xlator_t *this,
+ gf_changelog_journal_t *jnl, char *from_path);
+int
+gf_thread_cleanup (xlator_t *this, pthread_t thread);
+void *
+gf_changelog_callback_invoker (void *arg);
#endif