summaryrefslogtreecommitdiffstats
path: root/xlators/features/changelog/src/changelog-helpers.h
diff options
context:
space:
mode:
authorVenky Shankar <vshankar@redhat.com>2015-02-03 19:22:16 +0530
committerVijay Bellur <vbellur@redhat.com>2015-03-18 18:22:36 -0700
commit4737584fffcd25dbe35d17b076c95bf90a422cf2 (patch)
tree9f30e0e90c88c245787b78af3ca78d7ae05e30f2 /xlators/features/changelog/src/changelog-helpers.h
parent728fcd41eb39f66744d84b979dd8195fd47313ed (diff)
features/changelog: RPC'fy {libgf}changelog
This patch introduces RPC based communication between the changelog translator and libgfchangelog. It replaces the old pathetic stream based interaction that existed earlier (due to time constraints :-/). Changelog, upon initialization starts a RPC server (rpcsvc) allowing clients to invoke a probe API as a bootup mechanism to request for event notifications. During probe, clients can choose an event filter specifying the type(s) of events they are interested in. As of now there is no way to change the event notification set once the probe RPC call is made, but that is easier to implement. The actual event notifications is done on a separate RPC session. The client (libgfchangelog) itself starts and RPC server which the changelog translator "connects back" during probe. Notifications are dispatched by a bunch of threads from the server (translator) and the client optionally orders them if ordered notifications are requried. FOPs fill in their respective event details in a buffer (rot-buffs to be particular) and a bunch of threads (consumers) swap the buffers out of roatation and dispatch them via RPC. To avoid writer starvation, then number of dispatcher threads is one less than the number of buffer list in rot-buffs.x libgfchangelog becomes purely callback based -- upon event notification from the server (and re-ordering them if required) invoke a callback routine specified by consumer(s). A major part of the patch is also aimed at providing backward compatibility for geo-replication, which was one of the main consumer of the stream based API. Also, this patch does not\ "turn on" event notifications for all fops, just a bunch which is currently in requirement. Another pain point is that the server does not filter events before dispatching it to the clients. That load is taken up by the client itself (although it's done at the library layer rather than making it hard on the callback implementor). This needs improvement and care needs to be taken to not load the server up with expensive filtering mechanisms. Change-Id: Ibf60a432b68f2dfa60c6f9add2bcfd37a9c41395 BUG: 1170075 Signed-off-by: Venky Shankar <vshankar@redhat.com> Reviewed-on: http://review.gluster.org/9708 Reviewed-by: Jeff Darcy <jdarcy@redhat.com> Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'xlators/features/changelog/src/changelog-helpers.h')
-rw-r--r--xlators/features/changelog/src/changelog-helpers.h107
1 files changed, 70 insertions, 37 deletions
diff --git a/xlators/features/changelog/src/changelog-helpers.h b/xlators/features/changelog/src/changelog-helpers.h
index 03a795369d1..33a99ee4eed 100644
--- a/xlators/features/changelog/src/changelog-helpers.h
+++ b/xlators/features/changelog/src/changelog-helpers.h
@@ -15,10 +15,16 @@
#include "timer.h"
#include "pthread.h"
#include "iobuf.h"
+#include "rot-buffs.h"
#include "changelog-misc.h"
#include "call-stub.h"
+#include "rpcsvc.h"
+#include "changelog-ev-handle.h"
+
+#include "changelog.h"
+
/**
* the changelog entry
*/
@@ -120,29 +126,6 @@ typedef struct changelog_fsync {
xlator_t *this;
} changelog_fsync_t;
-# define CHANGELOG_MAX_CLIENTS 5
-typedef struct changelog_notify {
- /* reader end of the pipe */
- int rfd;
-
- /* notifier thread */
- pthread_t notify_th;
-
- /* unique socket path */
- char sockpath[UNIX_PATH_MAX];
-
- int socket_fd;
-
- /**
- * simple array of accept()'ed fds. Not scalable at all
- * for large number of clients, but it's okay as we have
- * a ahrd limit in this version (@CHANGELOG_MAX_CLIENTS).
- */
- int client_fd[CHANGELOG_MAX_CLIENTS];
-
- xlator_t *this;
-} changelog_notify_t;
-
/* Draining during changelog rollover (for geo-rep snapshot dependency):
* --------------------------------------------------------------------
* The introduction of draining of in-transit fops during changelog rollover
@@ -162,14 +145,14 @@ typedef struct changelog_notify {
typedef enum chlog_fop_color {
FOP_COLOR_BLACK,
FOP_COLOR_WHITE
-}chlog_fop_color_t;
+} chlog_fop_color_t;
/* Barrier notify variable */
typedef struct barrier_notify {
pthread_mutex_t bnotify_mutex;
pthread_cond_t bnotify_cond;
gf_boolean_t bnotify;
-}barrier_notify_t;
+} barrier_notify_t;
/* Two separate mutex and conditional variable set is used
* to drain white and black fops. */
@@ -185,15 +168,26 @@ typedef struct drain_mgmt {
unsigned long white_fop_cnt;
gf_boolean_t drain_wait_black;
gf_boolean_t drain_wait_white;
-}drain_mgmt_t;
+} drain_mgmt_t;
/* External barrier as a result of snap on/off indicating flag*/
typedef struct barrier_flags {
gf_lock_t lock;
gf_boolean_t barrier_ext;
-}barrier_flags_t;
+} barrier_flags_t;
+/* Event selection */
+typedef struct changelog_ev_selector {
+ gf_lock_t reflock;
+ /**
+ * Array of references for each selection bit.
+ */
+ unsigned int ref[CHANGELOG_EV_SELECTION_RANGE];
+} changelog_ev_selector_t;
+
+
+/* changelog's private structure */
struct changelog_priv {
gf_boolean_t active;
@@ -223,9 +217,6 @@ struct changelog_priv {
/* lock to synchronize CSNAP updation */
gf_lock_t c_snap_lock;
- /* writen end of the pipe */
- int wfd;
-
/* rollover time */
int32_t rollover_time;
@@ -247,9 +238,6 @@ struct changelog_priv {
/* context of fsync thread */
changelog_fsync_t cf;
- /* context of the notifier thread */
- changelog_notify_t cn;
-
/* operation mode */
changelog_mode_t op_mode;
@@ -262,7 +250,9 @@ struct changelog_priv {
/* encoder */
struct changelog_encoder *ce;
- /* snapshot dependency changes */
+ /**
+ * snapshot dependency changes
+ */
/* Draining of fops*/
drain_mgmt_t dm;
@@ -289,6 +279,30 @@ struct changelog_priv {
gf_timer_t *timer;
struct timespec timeout;
+ /**
+ * buffers, RPC, event selection, notifications and other
+ * beasts.
+ */
+
+ /* epoll pthread */
+ pthread_t poller;
+
+ /* rotational buffer */
+ rbuf_t *rbuf;
+
+ /* changelog RPC server */
+ rpcsvc_t *rpc;
+
+ /* event selection */
+ changelog_ev_selector_t ev_selection;
+
+ /* client handling (reverse connection) */
+ pthread_t connector;
+
+ int nr_dispatchers;
+ pthread_t *ev_dispatcher;
+
+ changelog_clnt_t connections;
};
struct changelog_local {
@@ -367,7 +381,7 @@ typedef struct {
* helpers routines
*/
-void
+int
changelog_thread_cleanup (xlator_t *this, pthread_t thr_id);
void *
@@ -386,7 +400,7 @@ changelog_start_next_change (xlator_t *this,
changelog_priv_t *priv,
unsigned long ts, gf_boolean_t finale);
int
-changelog_open (xlator_t *this, changelog_priv_t *priv);
+changelog_open_journal (xlator_t *this, changelog_priv_t *priv);
int
changelog_fill_rollover_data (changelog_log_data_t *cld, gf_boolean_t is_last);
int
@@ -449,6 +463,7 @@ changelog_snap_handle_ascii_change (xlator_t *this,
changelog_log_data_t *cld);
int
changelog_snap_write_change (changelog_priv_t *priv, char *buffer, size_t len);
+
/* Changelog barrier routines */
void __chlog_barrier_enqueue (xlator_t *this, call_stub_t *stub);
void __chlog_barrier_disable (xlator_t *this, struct list_head *queue);
@@ -460,6 +475,24 @@ int32_t
changelog_fill_entry_buf (call_frame_t *frame, xlator_t *this,
loc_t *loc, changelog_local_t **local);
+/* event selection routines */
+inline void changelog_select_event (xlator_t *,
+ changelog_ev_selector_t *, unsigned int);
+inline void changelog_deselect_event (xlator_t *,
+ changelog_ev_selector_t *, unsigned int);
+inline int changelog_init_event_selection (xlator_t *,
+ changelog_ev_selector_t *);
+inline int changelog_cleanup_event_selection (xlator_t *,
+ changelog_ev_selector_t *);
+inline int changelog_ev_selected (xlator_t *,
+ changelog_ev_selector_t *, unsigned int);
+inline void
+changelog_dispatch_event (xlator_t *, changelog_priv_t *, changelog_event_t *);
+
+changelog_inode_ctx_t *
+__changelog_inode_ctx_get (xlator_t *, inode_t *, unsigned long **,
+ unsigned long *, changelog_log_type);
+
/* macros */
#define CHANGELOG_STACK_UNWIND(fop, frame, params ...) do { \
@@ -471,10 +504,10 @@ changelog_fill_entry_buf (call_frame_t *frame, xlator_t *this,
frame->local = NULL; \
} \
STACK_UNWIND_STRICT (fop, frame, params); \
- changelog_local_cleanup (__xl, __local); \
if (__local && __local->prev_entry) \
changelog_local_cleanup (__xl, \
__local->prev_entry); \
+ changelog_local_cleanup (__xl, __local); \
} while (0)
#define CHANGELOG_IOBUF_REF(iobuf) do { \