summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/dht/src
diff options
context:
space:
mode:
authorDan Lambright <dlambrig@redhat.com>2015-02-22 11:05:58 -0500
committerVijay Bellur <vbellur@redhat.com>2015-03-21 09:50:29 -0700
commit20355992e8eed7d3ed78a23bc7e922d6ae94860d (patch)
treec7af71245b57d867ef83edf91dbfe69e622a478d /xlators/cluster/dht/src
parent8aa13c8e285ad496ed7c8511ae0b735eed73ebd4 (diff)
cluster/dht: Add tier translator.
The tier translator shares most of DHT's code. It differs in how subvolumes are chosen for I/Os, and how file migration (cache promotion and demotion) is managed. That different functionality is split to either DHT or tier logic according to the "tier_methods" structure. A cache promotion and demotion thread is created in a manner similar to the rebalance daemon. The thread operates a timing wheel which periodically checks for promotion and demotion candidates (files). Candidates are queued and then migrated. Candidates must exist on the same node as the daemon and meet other critera per caching policies. This patch has two authors (Dan Lambright and Joseph Fernandes). Dan did the DHT changes and Joe wrote the cache policies. The fix depends on DHT readidr changes and the database library which have been submitted separately. Header files in libglusterfs/src/gfdb should be reviewed in patch 9683. For more background and design see the feature page [1]. [1] http://www.gluster.org/community/documentation/index.php/Features/data-classification Change-Id: Icc26c517ccecf5c42aef039f5b9c6f7afe83e46c BUG: 1194753 Signed-off-by: Dan Lambright <dlambrig@redhat.com> Reviewed-on: http://review.gluster.org/9724 Reviewed-by: Vijay Bellur <vbellur@redhat.com> Tested-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'xlators/cluster/dht/src')
-rw-r--r--xlators/cluster/dht/src/Makefile.am11
-rw-r--r--xlators/cluster/dht/src/dht-common.c100
-rw-r--r--xlators/cluster/dht/src/dht-common.h29
-rw-r--r--xlators/cluster/dht/src/dht-helper.c22
-rw-r--r--xlators/cluster/dht/src/dht-inode-read.c1
-rw-r--r--xlators/cluster/dht/src/dht-messages.h19
-rw-r--r--xlators/cluster/dht/src/dht-rebalance.c28
-rw-r--r--xlators/cluster/dht/src/dht-shared.c31
-rw-r--r--xlators/cluster/dht/src/dht.c7
-rw-r--r--xlators/cluster/dht/src/nufa.c5
-rw-r--r--xlators/cluster/dht/src/tier.c1007
-rw-r--r--xlators/cluster/dht/src/tier.h71
12 files changed, 1300 insertions, 31 deletions
diff --git a/xlators/cluster/dht/src/Makefile.am b/xlators/cluster/dht/src/Makefile.am
index 8d02749f4d9..46dc4bb840f 100644
--- a/xlators/cluster/dht/src/Makefile.am
+++ b/xlators/cluster/dht/src/Makefile.am
@@ -1,4 +1,4 @@
-xlator_LTLIBRARIES = dht.la nufa.la switch.la
+xlator_LTLIBRARIES = dht.la nufa.la switch.la tier.la
xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/cluster
dht_common_source = dht-layout.c dht-helper.c dht-linkfile.c dht-rebalance.c \
@@ -10,6 +10,7 @@ dht_la_SOURCES = $(dht_common_source) dht.c
nufa_la_SOURCES = $(dht_common_source) nufa.c
switch_la_SOURCES = $(dht_common_source) switch.c
+tier_la_SOURCES = $(dht_common_source) tier.c
dht_la_LDFLAGS = -module -avoid-version
dht_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
@@ -20,10 +21,16 @@ nufa_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
switch_la_LDFLAGS = -module -avoid-version
switch_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
-noinst_HEADERS = dht-common.h dht-mem-types.h dht-messages.h dht-helper.h \
+tier_la_LDFLAGS = -module -avoid-version
+tier_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la\
+ $(top_builddir)/libglusterfs/src/gfdb/libgfdb.la
+AM_CFLAGS = -Wall $(GF_CFLAGS) $(SQLITE_CFLAGS)
+
+noinst_HEADERS = dht-common.h dht-mem-types.h dht-messages.h dht-helper.h tier.h\
$(top_builddir)/xlators/lib/src/libxlator.h
AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \
+ -I$(top_srcdir)/libglusterfs/src/gfdb \
-I$(top_srcdir)/xlators/lib/src
AM_CFLAGS = -Wall $(GF_CFLAGS)
diff --git a/xlators/cluster/dht/src/dht-common.c b/xlators/cluster/dht/src/dht-common.c
index 9fda4aa07d6..3a196e07be9 100644
--- a/xlators/cluster/dht/src/dht-common.c
+++ b/xlators/cluster/dht/src/dht-common.c
@@ -3175,6 +3175,7 @@ dht_setxattr (call_frame_t *frame, xlator_t *this,
xlator_t *subvol = NULL;
dht_local_t *local = NULL;
dht_conf_t *conf = NULL;
+ dht_methods_t *methods = NULL;
dht_layout_t *layout = NULL;
int i = 0;
int op_errno = EINVAL;
@@ -3191,6 +3192,10 @@ dht_setxattr (call_frame_t *frame, xlator_t *this,
VALIDATE_OR_GOTO (loc->inode, err);
conf = this->private;
+ GF_VALIDATE_OR_GOTO (this->name, conf, err);
+
+ methods = conf->methods;
+ GF_VALIDATE_OR_GOTO (this->name, conf->methods, err);
GF_IF_INTERNAL_XATTR_GOTO (conf->wild_xattr_name, xattr,
op_errno, err);
@@ -3255,8 +3260,8 @@ dht_setxattr (call_frame_t *frame, xlator_t *this,
goto err;
}
- local->rebalance.target_node =
- dht_subvol_get_hashed (this, &local->loc);
+ methods->migration_get_dst_subvol(this, local);
+
if (!local->rebalance.target_node) {
gf_msg (this->name, GF_LOG_ERROR, 0,
DHT_MSG_HASHED_SUBVOL_GET_FAILED,
@@ -3719,7 +3724,6 @@ dht_statfs (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
VALIDATE_OR_GOTO (frame, err);
VALIDATE_OR_GOTO (this, err);
VALIDATE_OR_GOTO (loc, err);
- VALIDATE_OR_GOTO (loc->inode, err);
VALIDATE_OR_GOTO (this->private, err);
conf = this->private;
@@ -3730,7 +3734,7 @@ dht_statfs (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
goto err;
}
- if (IA_ISDIR (loc->inode->ia_type)) {
+ if (!loc->inode || IA_ISDIR (loc->inode->ia_type)) {
local->call_cnt = conf->subvolume_cnt;
for (i = 0; i < conf->subvolume_cnt; i++) {
@@ -3820,6 +3824,7 @@ dht_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,
int count = 0;
dht_layout_t *layout = 0;
dht_conf_t *conf = NULL;
+ dht_methods_t *methods = NULL;
xlator_t *subvol = 0;
xlator_t *hashed_subvol = 0;
int ret = 0;
@@ -3828,7 +3833,12 @@ dht_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,
INIT_LIST_HEAD (&entries.list);
prev = cookie;
local = frame->local;
+
conf = this->private;
+ GF_VALIDATE_OR_GOTO(this->name, conf, unwind);
+
+ methods = conf->methods;
+ GF_VALIDATE_OR_GOTO(this->name, conf->methods, done);
if (op_ret < 0)
goto done;
@@ -3867,8 +3877,8 @@ dht_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,
}
- hashed_subvol = dht_layout_search (this, layout, \
- orig_entry->d_name);
+ hashed_subvol = methods->layout_search (this, layout,
+ orig_entry->d_name);
if (prev->this == hashed_subvol)
goto list;
@@ -3894,8 +3904,8 @@ list:
/* Do this if conf->search_unhashed is set to "auto" */
if (conf->search_unhashed == GF_DHT_LOOKUP_UNHASHED_AUTO) {
- subvol = dht_layout_search (this, layout,
- orig_entry->d_name);
+ subvol = methods->layout_search (this, layout,
+ orig_entry->d_name);
if (!subvol || (subvol != prev->this)) {
/* TODO: Count the number of entries which need
linkfile to prove its existence in fs */
@@ -4008,11 +4018,19 @@ dht_readdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int count = 0;
dht_layout_t *layout = 0;
xlator_t *subvol = 0;
+ dht_conf_t *conf = NULL;
+ dht_methods_t *methods = NULL;
INIT_LIST_HEAD (&entries.list);
prev = cookie;
local = frame->local;
+ conf = this->private;
+ GF_VALIDATE_OR_GOTO (this->name, conf, done);
+
+ methods = conf->methods;
+ GF_VALIDATE_OR_GOTO (this->name, conf->methods, done);
+
if (op_ret < 0)
goto done;
@@ -4024,7 +4042,8 @@ dht_readdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
list_for_each_entry (orig_entry, (&orig_entries->list), list) {
next_offset = orig_entry->d_off;
- subvol = dht_layout_search (this, layout, orig_entry->d_name);
+ subvol = methods->layout_search (this, layout,
+ orig_entry->d_name);
if (!subvol || (subvol == prev->this)) {
entry = gf_dirent_for_name (orig_entry->d_name);
@@ -6073,11 +6092,13 @@ dht_notify (xlator_t *this, int event, void *data, ...)
gf_defrag_type cmd = 0;
dict_t *output = NULL;
va_list ap;
-
+ dht_methods_t *methods = NULL;
conf = this->private;
- if (!conf)
- return ret;
+ GF_VALIDATE_OR_GOTO (this->name, conf, out);
+
+ methods = conf->methods;
+ GF_VALIDATE_OR_GOTO (this->name, methods, out);
/* had all subvolumes reported status once till now? */
had_heard_from_all = 1;
@@ -6271,12 +6292,18 @@ unlock:
* not need to handle CHILD_DOWN event here.
*/
if (conf->defrag) {
- ret = gf_thread_create (&conf->defrag->th, NULL,
- gf_defrag_start, this);
- if (ret) {
- conf->defrag = NULL;
+ if (methods->migration_needed(this)) {
+ ret = gf_thread_create(&conf->defrag->th,
+ NULL,
+ gf_defrag_start, this);
+ if (ret) {
+ GF_FREE (conf->defrag);
+ conf->defrag = NULL;
+ kill (getpid(), SIGTERM);
+ }
+ } else {
GF_FREE (conf->defrag);
- kill (getpid(), SIGTERM);
+ conf->defrag = NULL;
}
}
}
@@ -6284,7 +6311,7 @@ unlock:
ret = 0;
if (propagate)
ret = default_notify (this, event, data);
-
+out:
return ret;
}
@@ -6412,3 +6439,40 @@ dht_log_new_layout_for_dir_selfheal (xlator_t *this, loc_t *loc,
err:
GF_FREE (output_string);
}
+
+int32_t dht_migration_get_dst_subvol(xlator_t *this, dht_local_t *local)
+{
+ int ret = -1;
+
+ if (!local)
+ goto out;
+
+ local->rebalance.target_node =
+ dht_subvol_get_hashed (this, &local->loc);
+
+ if (local->rebalance.target_node)
+ ret = 0;
+
+out:
+ return ret;
+}
+
+int32_t dht_migration_needed(xlator_t *this)
+{
+ gf_defrag_info_t *defrag = NULL;
+ dht_conf_t *conf = NULL;
+ int ret = 0;
+
+ conf = this->private;
+
+ GF_VALIDATE_OR_GOTO ("dht", conf, out);
+ GF_VALIDATE_OR_GOTO ("dht", conf->defrag, out);
+
+ defrag = conf->defrag;
+
+ if (defrag->cmd != GF_DEFRAG_CMD_START_TIER)
+ ret = 1;
+
+out:
+ return ret;
+}
diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h
index 67e693146af..9145f336d7c 100644
--- a/xlators/cluster/dht/src/dht-common.h
+++ b/xlators/cluster/dht/src/dht-common.h
@@ -267,6 +267,8 @@ enum gf_defrag_type {
GF_DEFRAG_CMD_STATUS = 1 + 2,
GF_DEFRAG_CMD_START_LAYOUT_FIX = 1 + 3,
GF_DEFRAG_CMD_START_FORCE = 1 + 4,
+ GF_DEFRAG_CMD_START_TIER = 1 + 5,
+ GF_DEFRAG_CMD_STATUS_TIER = 1 + 6,
};
typedef enum gf_defrag_type gf_defrag_type;
@@ -310,10 +312,31 @@ struct gf_defrag_info_ {
struct timeval start_time;
gf_boolean_t stats;
gf_defrag_pattern_list_t *defrag_pattern;
+ int tier_promote_frequency;
+ int tier_demote_frequency;
+
+ /*Data Tiering params for scanner*/
+ uint64_t total_files_promoted;
+ uint64_t total_files_demoted;
+ int write_freq_threshold;
+ int read_freq_threshold;
};
typedef struct gf_defrag_info_ gf_defrag_info_t;
+struct dht_methods_s {
+ int32_t (*migration_get_dst_subvol)(xlator_t *this,
+ dht_local_t *local);
+ int32_t (*migration_other)(xlator_t *this,
+ gf_defrag_info_t *defrag);
+ int32_t (*migration_needed)(xlator_t *this);
+ xlator_t* (*layout_search)(xlator_t *this,
+ dht_layout_t *layout,
+ const char *name);
+};
+
+typedef struct dht_methods_s dht_methods_t;
+
struct dht_conf {
gf_lock_t subvolume_lock;
int subvolume_cnt;
@@ -371,6 +394,8 @@ struct dht_conf {
gf_boolean_t do_weighting;
gf_boolean_t randomize_by_gfid;
+ dht_methods_t *methods;
+
struct mem_pool *lock_pool;
};
typedef struct dht_conf dht_conf_t;
@@ -477,6 +502,10 @@ dht_layout_t *dht_layout_get (xlator_t *this, inode_t
dht_layout_t *dht_layout_for_subvol (xlator_t *this, xlator_t *subvol);
xlator_t *dht_layout_search (xlator_t *this, dht_layout_t *layout,
const char *name);
+int32_t
+dht_migration_get_dst_subvol(xlator_t *this, dht_local_t *local);
+int32_t
+dht_migration_needed(xlator_t *this);
int dht_layout_normalize (xlator_t *this, loc_t *loc, dht_layout_t *layout);
int dht_layout_anomalies (xlator_t *this, loc_t *loc, dht_layout_t *layout,
uint32_t *holes_p, uint32_t *overlaps_p,
diff --git a/xlators/cluster/dht/src/dht-helper.c b/xlators/cluster/dht/src/dht-helper.c
index f4e5305d791..346d19bec88 100644
--- a/xlators/cluster/dht/src/dht-helper.c
+++ b/xlators/cluster/dht/src/dht-helper.c
@@ -502,10 +502,18 @@ dht_subvol_get_hashed (xlator_t *this, loc_t *loc)
{
dht_layout_t *layout = NULL;
xlator_t *subvol = NULL;
+ dht_conf_t *conf = NULL;
+ dht_methods_t *methods = NULL;
GF_VALIDATE_OR_GOTO ("dht", this, out);
GF_VALIDATE_OR_GOTO (this->name, loc, out);
+ conf = this->private;
+ GF_VALIDATE_OR_GOTO (this->name, conf, out);
+
+ methods = conf->methods;
+ GF_VALIDATE_OR_GOTO (this->name, conf->methods, out);
+
if (__is_root_gfid (loc->gfid)) {
subvol = dht_first_up_subvol (this);
goto out;
@@ -523,7 +531,7 @@ dht_subvol_get_hashed (xlator_t *this, loc_t *loc)
goto out;
}
- subvol = dht_layout_search (this, layout, loc->name);
+ subvol = methods->layout_search (this, layout, loc->name);
if (!subvol) {
gf_msg_debug (this->name, 0,
@@ -846,6 +854,18 @@ dht_migration_complete_check_task (void *data)
SYNCTASK_SETID (frame->root->uid, frame->root->gid);
}
+ /*
+ * temporary check related to tier promoting/demoting the file;
+ * the lower level DHT detects the migration (due to sticky
+ * bits) when it is the responsibility of the tier translator
+ * to complete the rebalance transaction. It will be corrected
+ * when rebalance and tier migration are fixed to work together.
+ */
+ if (strcmp(this->parents->xlator->type, "cluster/tier") == 0) {
+ ret = 0;
+ goto out;
+ }
+
if (!ret)
dst_node = dht_linkfile_subvol (this, NULL, NULL, dict);
diff --git a/xlators/cluster/dht/src/dht-inode-read.c b/xlators/cluster/dht/src/dht-inode-read.c
index d78dd2ea0ef..78e3ef4233b 100644
--- a/xlators/cluster/dht/src/dht-inode-read.c
+++ b/xlators/cluster/dht/src/dht-inode-read.c
@@ -1064,7 +1064,6 @@ dht_inodelk (call_frame_t *frame, xlator_t *this, const char *volume,
VALIDATE_OR_GOTO (this, err);
VALIDATE_OR_GOTO (loc, err);
VALIDATE_OR_GOTO (loc->inode, err);
- VALIDATE_OR_GOTO (loc->path, err);
local = dht_local_init (frame, loc, NULL, GF_FOP_INODELK);
if (!local) {
diff --git a/xlators/cluster/dht/src/dht-messages.h b/xlators/cluster/dht/src/dht-messages.h
index f4096ae8f1e..eb4c356e3c2 100644
--- a/xlators/cluster/dht/src/dht-messages.h
+++ b/xlators/cluster/dht/src/dht-messages.h
@@ -441,12 +441,25 @@
#define DHT_MSG_LOG_FIXED_LAYOUT (GLFS_DHT_BASE + 36)
-/*------------*/
-#define glfs_msg_end_x GLFS_MSGID_END, "Invalid: End of messages"
+/*
+ * @messageid 109037
+ * @diagnosis Informational message regarding error in tier operation
+ * @recommendedaction None
+ */
+#define DHT_MSG_LOG_TIER_ERROR (GLFS_DHT_BASE + 37)
-#endif /* _DHT_MESSAGES_H_ */
+/*
+ * @messageid 109038
+ * @diagnosis Informational message regarding tier operation
+ * @recommendedaction None
+ */
+#define DHT_MSG_LOG_TIER_STATUS (GLFS_DHT_BASE + 38)
+/*------------*/
+#define glfs_msg_end_x GLFS_MSGID_END, "Invalid: End of messages"
+
+#endif /* _DHT_MESSAGES_H_ */
diff --git a/xlators/cluster/dht/src/dht-rebalance.c b/xlators/cluster/dht/src/dht-rebalance.c
index 3531872dd31..b838ecec4b7 100644
--- a/xlators/cluster/dht/src/dht-rebalance.c
+++ b/xlators/cluster/dht/src/dht-rebalance.c
@@ -19,6 +19,7 @@
#include <signal.h>
#include <fnmatch.h>
#include <signal.h>
+#include "tier.h"
#define GF_DISK_SECTOR_SIZE 512
#define DHT_REBALANCE_PID 4242 /* Change it if required */
@@ -919,6 +920,7 @@ dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to,
tmp_loc.inode = inode_ref (loc->inode);
uuid_copy (tmp_loc.gfid, loc->gfid);
+ tmp_loc.path = gf_strdup(loc->path);
ret = syncop_inodelk (from, DHT_FILE_MIGRATE_DOMAIN, &tmp_loc, F_SETLKW,
&flock, NULL, NULL);
@@ -984,6 +986,7 @@ dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to,
goto out;
ret = __dht_check_free_space (to, from, loc, &stbuf, flag);
+
if (ret) {
goto out;
}
@@ -1713,7 +1716,8 @@ gf_defrag_fix_layout (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
goto out;
}
- if (defrag->cmd != GF_DEFRAG_CMD_START_LAYOUT_FIX) {
+ if ((defrag->cmd != GF_DEFRAG_CMD_START_TIER) &&
+ (defrag->cmd != GF_DEFRAG_CMD_START_LAYOUT_FIX)) {
ret = gf_defrag_migrate_data (this, defrag, loc, migrate_data);
if (ret)
goto out;
@@ -1863,7 +1867,6 @@ out:
}
-
int
gf_defrag_start_crawl (void *data)
{
@@ -1878,6 +1881,7 @@ gf_defrag_start_crawl (void *data)
dict_t *migrate_data = NULL;
dict_t *status = NULL;
glusterfs_ctx_t *ctx = NULL;
+ dht_methods_t *methods = NULL;
this = data;
if (!this)
@@ -1942,7 +1946,8 @@ gf_defrag_start_crawl (void *data)
goto out;
}
- if (defrag->cmd != GF_DEFRAG_CMD_START_LAYOUT_FIX) {
+ if ((defrag->cmd != GF_DEFRAG_CMD_START_TIER) &&
+ (defrag->cmd != GF_DEFRAG_CMD_START_LAYOUT_FIX)) {
migrate_data = dict_new ();
if (!migrate_data) {
ret = -1;
@@ -1959,15 +1964,28 @@ gf_defrag_start_crawl (void *data)
if (ret)
goto out;
}
+
ret = gf_defrag_fix_layout (this, defrag, &loc, fix_layout,
migrate_data);
+
+ if (defrag->cmd == GF_DEFRAG_CMD_START_TIER) {
+ methods = conf->methods;
+ if (!methods) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "Methods invalid for translator.");
+ defrag->defrag_status = GF_DEFRAG_STATUS_FAILED;
+ ret = -1;
+ goto out;
+ }
+ methods->migration_other(this, defrag);
+ }
+
if ((defrag->defrag_status != GF_DEFRAG_STATUS_STOPPED) &&
(defrag->defrag_status != GF_DEFRAG_STATUS_FAILED)) {
defrag->defrag_status = GF_DEFRAG_STATUS_COMPLETE;
}
-
-
out:
LOCK (&defrag->lock);
{
diff --git a/xlators/cluster/dht/src/dht-shared.c b/xlators/cluster/dht/src/dht-shared.c
index 860f3e716f0..1e666bd8140 100644
--- a/xlators/cluster/dht/src/dht-shared.c
+++ b/xlators/cluster/dht/src/dht-shared.c
@@ -29,6 +29,8 @@
*/
struct volume_options options[];
+extern dht_methods_t dht_methods;
+
void
dht_layout_dump (dht_layout_t *layout, const char *prefix)
{
@@ -701,6 +703,8 @@ dht_init (xlator_t *this)
if (dht_set_subvol_range(this))
goto err;
+ conf->methods = &dht_methods;
+
return 0;
err:
@@ -847,6 +851,33 @@ struct volume_options options[] = {
.type = GF_OPTION_TYPE_XLATOR
},
+ /* tier options */
+ { .key = {"tier-promote-frequency"},
+ .type = GF_OPTION_TYPE_INT,
+ .default_value = "120",
+ .description = "Frequency to promote files to fast tier"
+ },
+
+ { .key = {"tier-demote-frequency"},
+ .type = GF_OPTION_TYPE_INT,
+ .default_value = "120",
+ .description = "Frequency to demote files to slow tier"
+ },
+
+ { .key = {"write-freq-thresold"},
+ .type = GF_OPTION_TYPE_INT,
+ .default_value = "0",
+ .description = "Defines the write fequency "
+ "that would be considered hot"
+ },
+
+ { .key = {"read-freq-thresold"},
+ .type = GF_OPTION_TYPE_INT,
+ .default_value = "0",
+ .description = "Defines the read fequency "
+ "that would be considered hot"
+ },
+
/* switch option */
{ .key = {"pattern.switch.case"},
.type = GF_OPTION_TYPE_ANY
diff --git a/xlators/cluster/dht/src/dht.c b/xlators/cluster/dht/src/dht.c
index fc0ca2f7735..3934df5ec64 100644
--- a/xlators/cluster/dht/src/dht.c
+++ b/xlators/cluster/dht/src/dht.c
@@ -17,6 +17,12 @@
#include "statedump.h"
#include "dht-common.h"
+dht_methods_t dht_methods = {
+ .migration_get_dst_subvol = dht_migration_get_dst_subvol,
+ .migration_needed = dht_migration_needed,
+ .layout_search = dht_layout_search,
+};
+
class_methods_t class_methods = {
.init = dht_init,
.fini = dht_fini,
@@ -86,4 +92,3 @@ struct xlator_cbks cbks = {
// .releasedir = dht_releasedir,
.forget = dht_forget
};
-;
diff --git a/xlators/cluster/dht/src/nufa.c b/xlators/cluster/dht/src/nufa.c
index f188a5479f4..72d6d9c10e5 100644
--- a/xlators/cluster/dht/src/nufa.c
+++ b/xlators/cluster/dht/src/nufa.c
@@ -621,6 +621,11 @@ nufa_init (xlator_t *this)
return 0;
}
+dht_methods_t dht_methods = {
+ .migration_get_dst_subvol = dht_migration_get_dst_subvol,
+ .migration_needed = dht_migration_needed,
+ .layout_search = dht_layout_search,
+};
class_methods_t class_methods = {
.init = nufa_init,
diff --git a/xlators/cluster/dht/src/tier.c b/xlators/cluster/dht/src/tier.c
new file mode 100644
index 00000000000..028a42f7a1a
--- /dev/null
+++ b/xlators/cluster/dht/src/tier.c
@@ -0,0 +1,1007 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "dht-common.h"
+#include "tier.h"
+
+/*Hard coded DB info*/
+static gfdb_db_type_t dht_tier_db_type = GFDB_SQLITE3;
+/*Hard coded DB info*/
+
+/*Mutex for updating the data movement stats*/
+static pthread_mutex_t dm_stat_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+#define DB_QUERY_RECORD_SIZE 4096
+
+static int
+tier_parse_query_str (char *query_record_str,
+ char *gfid, char *link_buffer, ssize_t *link_size)
+{
+ char *token_str = NULL;
+ char *delimiter = "|";
+ char *saveptr = NULL;
+ int ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("tier", query_record_str, out);
+ GF_VALIDATE_OR_GOTO ("tier", gfid, out);
+ GF_VALIDATE_OR_GOTO ("tier", link_buffer, out);
+ GF_VALIDATE_OR_GOTO ("tier", link_size, out);
+
+ token_str = strtok_r (query_record_str, delimiter, &saveptr);
+ if (!token_str)
+ goto out;
+
+ strcpy (gfid, token_str);
+
+
+ token_str = strtok_r (NULL, delimiter, &saveptr);
+ if (!token_str)
+ goto out;
+
+ strcpy (link_buffer, token_str);
+
+ token_str = strtok_r (NULL, delimiter, &saveptr);
+ if (!token_str)
+ goto out;
+
+ *link_size = atoi (token_str);
+
+ ret = 0;
+out:
+ return ret;
+}
+
+static int
+tier_migrate_using_query_file (void *_args)
+{
+ int ret = -1;
+ char gfid_str[UUID_CANONICAL_FORM_LEN+1] = "";
+ char query_record_str[4096] = "";
+ query_cbk_args_t *query_cbk_args = (query_cbk_args_t *) _args;
+ xlator_t *this = NULL;
+ gf_defrag_info_t *defrag = NULL;
+ char *token_str = NULL;
+ char *delimiter = "::";
+ char *link_buffer = NULL;
+ gfdb_query_record_t *query_record = NULL;
+ gfdb_link_info_t *link_info = NULL;
+ struct iatt par_stbuf = {0,};
+ struct iatt current = {0,};
+ loc_t p_loc = {0,};
+ loc_t loc = {0,};
+ dict_t *migrate_data = NULL;
+ inode_t *linked_inode = NULL;
+ int per_file_status = 0;
+ int per_link_status = 0;
+ int total_status = 0;
+ FILE *queryFILE = NULL;
+ char *link_str = NULL;
+
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args, out);
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->this, out);
+ this = query_cbk_args->this;
+ GF_VALIDATE_OR_GOTO (this->name, query_cbk_args->defrag, out);
+ GF_VALIDATE_OR_GOTO (this->name, query_cbk_args->queryFILE, out);
+
+ defrag = query_cbk_args->defrag;
+
+ queryFILE = query_cbk_args->queryFILE;
+
+ query_record = gfdb_query_record_init();
+ if (!query_record) {
+ goto out;
+ }
+
+ query_record->_link_info_str = calloc (DB_QUERY_RECORD_SIZE, 1);
+ if (!query_record->_link_info_str) {
+ goto out;
+ }
+ link_buffer = query_record->_link_info_str;
+
+ link_info = gfdb_link_info_init ();
+
+ migrate_data = dict_new ();
+ if (!migrate_data)
+ goto out;
+
+ /* Per file */
+ while (fscanf (queryFILE, "%s", query_record_str) != EOF) {
+
+ per_file_status = 0;
+ per_link_status = 0;
+
+ memset (gfid_str, 0, UUID_CANONICAL_FORM_LEN+1);
+ memset (query_record->_link_info_str, 0, DB_QUERY_RECORD_SIZE);
+
+ if (tier_parse_query_str (query_record_str, gfid_str,
+ link_buffer,
+ &query_record->link_info_size)) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "failed parsing %s\n", query_record_str);
+ continue;
+ }
+
+ uuid_parse (gfid_str, query_record->gfid);
+
+ if (dict_get(migrate_data, GF_XATTR_FILE_MIGRATE_KEY))
+ dict_del(migrate_data, GF_XATTR_FILE_MIGRATE_KEY);
+
+ if (dict_get(migrate_data, "from.migrator"))
+ dict_del(migrate_data, "from.migrator");
+
+ token_str = strtok (link_buffer, delimiter);
+ if (token_str != NULL) {
+ per_file_status =
+ dict_set_str (migrate_data,
+ GF_XATTR_FILE_MIGRATE_KEY,
+ "force");
+ if (per_file_status) {
+ goto per_file_out;
+ }
+
+ /* Flag to suggest the xattr call is from migrator */
+ per_file_status = dict_set_str (migrate_data,
+ "from.migrator", "yes");
+ if (per_file_status) {
+ goto per_file_out;
+ }
+ }
+ per_link_status = 0;
+ /* Per link of file */
+ while (token_str != NULL) {
+
+ link_str = gf_strdup (token_str);
+
+ if (!link_info) {
+ per_link_status = -1;
+ goto per_file_out;
+ }
+
+ memset (link_info, 0, sizeof(gfdb_link_info_t));
+
+ ret = str_to_link_info (link_str, link_info);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "failed parsing %s\n", link_str);
+ per_link_status = -1;
+ goto error;
+ }
+
+ uuid_copy (p_loc.gfid, link_info->pargfid);
+
+ p_loc.inode = inode_new (defrag->root_inode->table);
+ if (!p_loc.inode) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "failed parsing %s\n", link_str);
+ per_link_status = -1;
+ goto error;
+ }
+
+ ret = syncop_lookup (this, &p_loc, NULL, &par_stbuf,
+ NULL, NULL);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ " ERROR in parent lookup\n");
+ per_link_status = -1;
+ goto error;
+ }
+ linked_inode = inode_link (p_loc.inode, NULL, NULL,
+ &par_stbuf);
+ inode_unref (p_loc.inode);
+ p_loc.inode = linked_inode;
+
+ uuid_copy (loc.gfid, query_record->gfid);
+ loc.inode = inode_new (defrag->root_inode->table);
+ uuid_copy (loc.pargfid, link_info->pargfid);
+ loc.parent = inode_ref(p_loc.inode);
+
+ loc.name = gf_strdup (link_info->file_name);
+ if (!loc.name) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR, "ERROR in "
+ "memory allocation\n");
+ per_link_status = -1;
+ goto error;
+ }
+
+ loc.path = gf_strdup (link_info->file_path);
+ if (!loc.path) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR, "ERROR in "
+ "memory allocation\n");
+ per_link_status = -1;
+ goto error;
+ }
+
+ uuid_copy (loc.parent->gfid, link_info->pargfid);
+
+ ret = syncop_lookup (this, &loc, NULL, &current,
+ NULL, NULL);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR, "ERROR in "
+ "current lookup\n");
+ per_link_status = -1;
+ goto error;
+ }
+ linked_inode = inode_link (loc.inode, NULL, NULL,
+ &current);
+ inode_unref (loc.inode);
+ loc.inode = linked_inode;
+
+ gf_msg (this->name, GF_LOG_INFO, 0,
+ DHT_MSG_LOG_TIER_STATUS, "Tier migrate file %s",
+ loc.name);
+
+ ret = syncop_setxattr (this, &loc, migrate_data, 0);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR, "ERROR %d in "
+ "current migration %s %s\n", ret,
+ loc.name,
+ loc.path);
+ per_link_status = -1;
+ goto error;
+ }
+ inode_unref (loc.inode);
+ inode_unref (loc.parent);
+ inode_unref (p_loc.inode);
+error:
+ if (loc.name) {
+ GF_FREE ((char *) loc.name);
+ }
+ if (loc.path) {
+ GF_FREE ((char *) loc.path);
+ }
+ token_str = NULL;
+ token_str = strtok (NULL, delimiter);
+ GF_FREE (link_str);
+ }
+ per_file_status = per_link_status;
+per_file_out:
+ if (per_file_status) {
+ pthread_mutex_lock (&dm_stat_mutex);
+ defrag->total_failures++;
+ pthread_mutex_unlock (&dm_stat_mutex);
+ } else {
+ pthread_mutex_lock (&dm_stat_mutex);
+ defrag->total_files++;
+ pthread_mutex_unlock (&dm_stat_mutex);
+ }
+ total_status = total_status + per_file_status;
+ per_link_status = 0;
+ per_file_status = 0;
+ query_record_str[0] = '\0';
+ }
+
+out:
+ if (link_buffer)
+ free (link_buffer);
+ gfdb_link_info_fini (&link_info);
+ if (migrate_data)
+ dict_unref (migrate_data);
+ gfdb_query_record_fini (&query_record);
+ return total_status;
+}
+
+
+/*This is the call back function per record/file from data base*/
+static int
+tier_gf_query_callback (gfdb_query_record_t *gfdb_query_record,
+ void *_args) {
+ int ret = -1;
+ char gfid_str[UUID_CANONICAL_FORM_LEN] = "";
+ query_cbk_args_t *query_cbk_args = _args;
+
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args, out);
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->defrag, out);
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->queryFILE, out);
+
+ uuid_unparse (gfdb_query_record->gfid, gfid_str);
+ fprintf (query_cbk_args->queryFILE, "%s|%s|%ld\n", gfid_str,
+ gfdb_query_record->_link_info_str,
+ gfdb_query_record->link_info_size);
+
+ pthread_mutex_lock (&dm_stat_mutex);
+ query_cbk_args->defrag->num_files_lookedup++;
+ pthread_mutex_unlock (&dm_stat_mutex);
+
+ ret = 0;
+out:
+ return ret;
+}
+
+/*This is the call back function for each brick from hot/cold bricklist
+ * It picks up each bricks db and queries for eligible files for migration.
+ * The list of eligible files are populated in appropriate query files*/
+static int
+tier_process_brick_cbk (dict_t *brick_dict, char *key, data_t *value,
+ void *args) {
+ int ret = -1;
+ char *db_path = NULL;
+ query_cbk_args_t *query_cbk_args = NULL;
+ xlator_t *this = NULL;
+ gfdb_conn_node_t *conn_node = NULL;
+ dict_t *params_dict = NULL;
+ _gfdb_brick_dict_info_t *gfdb_brick_dict_info = args;
+
+ /*Init of all the essentials*/
+ GF_VALIDATE_OR_GOTO ("tier", gfdb_brick_dict_info , out);
+ query_cbk_args = gfdb_brick_dict_info->_query_cbk_args;
+
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->this, out);
+ this = query_cbk_args->this;
+
+ GF_VALIDATE_OR_GOTO (this->name,
+ gfdb_brick_dict_info->_query_cbk_args, out);
+
+ GF_VALIDATE_OR_GOTO (this->name, value, out);
+ db_path = data_to_str(value);
+
+ /*Preparing DB parameters before init_db i.e getting db connection*/
+ params_dict = dict_new ();
+ if (!params_dict) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "DB Params cannot initialized!");
+ goto out;
+ }
+ SET_DB_PARAM_TO_DICT(this->name, params_dict, GFDB_SQL_PARAM_DBPATH,
+ db_path, ret, out);
+
+ /*Get the db connection*/
+ conn_node = init_db((void *)params_dict, dht_tier_db_type);
+ if (!conn_node) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "FATAL: Failed initializing db operations");
+ goto out;
+ }
+
+ /*Query for eligible files from db*/
+ query_cbk_args->queryFILE = fopen(GET_QFILE_PATH
+ (gfdb_brick_dict_info->_gfdb_promote), "a+");
+ if (!query_cbk_args->queryFILE) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
+ "Failed to open query file %s:%s",
+ GET_QFILE_PATH
+ (gfdb_brick_dict_info->_gfdb_promote),
+ strerror(errno));
+ goto out;
+ }
+ if (!gfdb_brick_dict_info->_gfdb_promote) {
+ if (query_cbk_args->defrag->write_freq_threshold == 0 &&
+ query_cbk_args->defrag->read_freq_threshold == 0) {
+ ret = find_unchanged_for_time(conn_node,
+ tier_gf_query_callback,
+ (void *)query_cbk_args,
+ gfdb_brick_dict_info->time_stamp);
+ } else {
+ ret = find_unchanged_for_time_freq(conn_node,
+ tier_gf_query_callback,
+ (void *)query_cbk_args,
+ gfdb_brick_dict_info->time_stamp,
+ query_cbk_args->defrag->
+ write_freq_threshold,
+ query_cbk_args->defrag->
+ read_freq_threshold,
+ _gf_false);
+ }
+ } else {
+ if (query_cbk_args->defrag->write_freq_threshold == 0 &&
+ query_cbk_args->defrag->read_freq_threshold == 0) {
+ ret = find_recently_changed_files(conn_node,
+ tier_gf_query_callback,
+ (void *)query_cbk_args,
+ gfdb_brick_dict_info->time_stamp);
+ } else {
+ ret = find_recently_changed_files_freq(conn_node,
+ tier_gf_query_callback,
+ (void *)query_cbk_args,
+ gfdb_brick_dict_info->time_stamp,
+ query_cbk_args->defrag->
+ write_freq_threshold,
+ query_cbk_args->defrag->
+ read_freq_threshold,
+ _gf_false);
+ }
+ }
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "FATAL: query from db failed");
+ goto out;
+ }
+ ret = 0;
+out:
+ if (query_cbk_args->queryFILE) {
+ fclose (query_cbk_args->queryFILE);
+ query_cbk_args->queryFILE = NULL;
+ }
+ fini_db (conn_node);
+ return ret;
+}
+
+inline int
+tier_build_migration_qfile (demotion_args_t *args,
+ query_cbk_args_t *query_cbk_args,
+ gf_boolean_t is_promotion)
+{
+ gfdb_time_t current_time;
+ _gfdb_brick_dict_info_t gfdb_brick_dict_info;
+ gfdb_time_t time_in_past;
+ int ret = -1;
+
+ remove (GET_QFILE_PATH (is_promotion));
+ time_in_past.tv_sec = args->freq_time;
+ time_in_past.tv_usec = 0;
+ if (gettimeofday (&current_time, NULL) == -1) {
+ gf_log (args->this->name, GF_LOG_ERROR,
+ "Failed to get current timen");
+ goto out;
+ }
+ time_in_past.tv_sec = current_time.tv_sec - time_in_past.tv_sec;
+ time_in_past.tv_usec = current_time.tv_usec - time_in_past.tv_usec;
+ gfdb_brick_dict_info.time_stamp = &time_in_past;
+ gfdb_brick_dict_info._gfdb_promote = is_promotion;
+ gfdb_brick_dict_info._query_cbk_args = query_cbk_args;
+ ret = dict_foreach (args->brick_list, tier_process_brick_cbk,
+ &gfdb_brick_dict_info);
+ if (ret) {
+ gf_log (args->this->name, GF_LOG_ERROR,
+ "Brick query failedn");
+ goto out;
+ }
+out:
+ return ret;
+}
+
+inline int
+tier_migrate_files_using_qfile (demotion_args_t *comp,
+ query_cbk_args_t *query_cbk_args,
+ char *qfile)
+{
+ char renamed_file[PATH_MAX] = "";
+ int ret = -1;
+
+ query_cbk_args->queryFILE = fopen (qfile, "r");
+ if (!query_cbk_args->queryFILE) {
+ gf_log ("tier", GF_LOG_ERROR,
+ "Failed opening %s for migration", qfile);
+ goto out;
+ }
+ ret = tier_migrate_using_query_file ((void *)query_cbk_args);
+ fclose (query_cbk_args->queryFILE);
+ query_cbk_args->queryFILE = NULL;
+ if (ret) {
+ sprintf (renamed_file, "%s.err", qfile);
+ rename (qfile, renamed_file);
+ }
+out:
+ return ret;
+}
+
+/*Demotion Thread*/
+static void *
+tier_demote (void *args)
+{
+ int ret = -1;
+ query_cbk_args_t query_cbk_args;
+ demotion_args_t *demotion_args = args;
+
+ GF_VALIDATE_OR_GOTO ("tier", demotion_args, out);
+ GF_VALIDATE_OR_GOTO ("tier", demotion_args->this, out);
+ GF_VALIDATE_OR_GOTO (demotion_args->this->name,
+ demotion_args->brick_list, out);
+ GF_VALIDATE_OR_GOTO (demotion_args->this->name,
+ demotion_args->defrag, out);
+
+ query_cbk_args.this = demotion_args->this;
+ query_cbk_args.defrag = demotion_args->defrag;
+
+ /*Build the query file using bricklist*/
+ ret = tier_build_migration_qfile(demotion_args, &query_cbk_args,
+ _gf_false);
+ if (ret)
+ goto out;
+
+ /* Migrate files using the query file */
+ ret = tier_migrate_files_using_qfile (args,
+ &query_cbk_args, DEMOTION_QFILE);
+ if (ret)
+ goto out;
+
+out:
+ demotion_args->return_value = ret;
+ return NULL;
+}
+
+
+/*Promotion Thread*/
+static void
+*tier_promote (void *args)
+{
+ int ret = -1;
+ query_cbk_args_t query_cbk_args;
+ promotion_args_t *promotion_args = args;
+
+ GF_VALIDATE_OR_GOTO ("tier", promotion_args->this, out);
+ GF_VALIDATE_OR_GOTO (promotion_args->this->name,
+ promotion_args->brick_list, out);
+ GF_VALIDATE_OR_GOTO (promotion_args->this->name,
+ promotion_args->defrag, out);
+
+ query_cbk_args.this = promotion_args->this;
+ query_cbk_args.defrag = promotion_args->defrag;
+
+ /*Build the query file using bricklist*/
+ ret = tier_build_migration_qfile(promotion_args, &query_cbk_args,
+ _gf_true);
+ if (ret)
+ goto out;
+
+ /* Migrate files using the query file */
+ ret = tier_migrate_files_using_qfile (args,
+ &query_cbk_args,
+ PROMOTION_QFILE);
+ if (ret)
+ goto out;
+
+out:
+ promotion_args->return_value = ret;
+ return NULL;
+}
+
+static void
+tier_get_bricklist (xlator_t *xl, dict_t *bricklist)
+{
+ xlator_list_t *child = NULL;
+ char *rv = NULL;
+ char *rh = NULL;
+ char localhost[256] = {0};
+ char *db_path = "";
+ char *brickname = NULL;
+ char db_name[PATH_MAX] = "";
+ int ret = 0;
+
+ GF_VALIDATE_OR_GOTO ("tier", xl, out);
+ GF_VALIDATE_OR_GOTO ("tier", bricklist, out);
+
+ gethostname (localhost, sizeof (localhost));
+
+ /*
+ * This function obtains remote subvolumes and filters out only
+ * those running on the same node as the tier daemon.
+ */
+ if (strcmp(xl->type, "protocol/client") == 0) {
+ ret = dict_get_str(xl->options, "remote-host", &rh);
+ if (ret < 0)
+ goto out;
+
+ if (gf_is_local_addr (rh)) {
+
+ ret = dict_get_str(xl->options, "remote-subvolume",
+ &rv);
+ if (ret < 0)
+ goto out;
+ brickname = strrchr(rv, '/') + 1;
+ snprintf(db_name, sizeof(db_name), "%s.db",
+ brickname);
+ db_path = GF_CALLOC (PATH_MAX, 1, gf_common_mt_char);
+ if (!db_path) {
+ gf_msg ("tier", GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_STATUS,
+ "Failed to allocate memory for bricklist");
+ goto out;
+ }
+
+ sprintf(db_path, "%s/.glusterfs/%s", rv, db_name);
+ if (dict_add_dynstr_with_alloc(bricklist, "brick",
+ db_path))
+ goto out;
+ }
+ }
+
+ for (child = xl->children; child; child = child->next) {
+ tier_get_bricklist(child->xlator, bricklist);
+ }
+out:
+ return;
+}
+
+int
+tier_start (xlator_t *this, gf_defrag_info_t *defrag)
+{
+ dict_t *bricklist_cold = NULL;
+ dict_t *bricklist_hot = NULL;
+ dht_conf_t *conf = NULL;
+ int tick = 0;
+ int next_demote = 0;
+ int next_promote = 0;
+ int freq_promote = 0;
+ int freq_demote = 0;
+ promotion_args_t promotion_args = { 0 };
+ demotion_args_t demotion_args = { 0 };
+ int ret_promotion = 0;
+ int ret_demotion = 0;
+ int ret = 0;
+ pthread_t promote_thread;
+ pthread_t demote_thread;
+
+ conf = this->private;
+
+ bricklist_cold = dict_new();
+ if (!bricklist_cold)
+ return -1;
+
+ bricklist_hot = dict_new();
+ if (!bricklist_hot)
+ return -1;
+
+ tier_get_bricklist (conf->subvolumes[0], bricklist_cold);
+ tier_get_bricklist (conf->subvolumes[1], bricklist_hot);
+
+ freq_promote = defrag->tier_promote_frequency;
+ freq_demote = defrag->tier_demote_frequency;
+
+ next_promote = defrag->tier_promote_frequency % TIMER_SECS;
+ next_demote = defrag->tier_demote_frequency % TIMER_SECS;
+
+
+ gf_msg (this->name, GF_LOG_INFO, 0,
+ DHT_MSG_LOG_TIER_STATUS, "Begin run tier promote %d demote %d",
+ next_promote, next_demote);
+
+ defrag->defrag_status = GF_DEFRAG_STATUS_STARTED;
+
+ while (1) {
+
+ sleep(1);
+
+ ret_promotion = -1;
+ ret_demotion = -1;
+
+ if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
+ ret = 1;
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "defrag->defrag_status != "
+ "GF_DEFRAG_STATUS_STARTED");
+ goto out;
+ }
+
+ tick = (tick + 1) % TIMER_SECS;
+ if ((next_demote != tick) && (next_promote != tick))
+ continue;
+
+ if (next_demote >= tick) {
+ demotion_args.this = this;
+ demotion_args.brick_list = bricklist_hot;
+ demotion_args.defrag = defrag;
+ demotion_args.freq_time = freq_demote;
+ ret_demotion = pthread_create (&demote_thread, NULL,
+ &tier_demote, &demotion_args);
+ if (ret_demotion) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "Failed starting Demotion thread!");
+ }
+ freq_demote = defrag->tier_demote_frequency;
+ next_demote = (tick + freq_demote) % TIMER_SECS;
+ }
+
+ if (next_promote >= tick) {
+ promotion_args.this = this;
+ promotion_args.brick_list = bricklist_cold;
+ promotion_args.defrag = defrag;
+ promotion_args.freq_time = freq_promote;
+ ret_promotion = pthread_create (&promote_thread, NULL,
+ &tier_promote, &promotion_args);
+ if (ret_promotion) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "Failed starting Promotion thread!");
+ }
+ freq_promote = defrag->tier_promote_frequency;
+ next_promote = (tick + freq_promote) % TIMER_SECS;
+ }
+
+ if (ret_demotion == 0) {
+ pthread_join (demote_thread, NULL);
+ if (demotion_args.return_value) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "Demotion failed!");
+ }
+ ret_demotion = demotion_args.return_value;
+ }
+
+ if (ret_promotion == 0) {
+ pthread_join (promote_thread, NULL);
+ if (promotion_args.return_value) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "Promotion failed!");
+ }
+ ret_promotion = promotion_args.return_value;
+ }
+
+ /*Collect previous and current cummulative status */
+ ret = ret | ret_demotion | ret_promotion;
+
+ /*reseting promotion and demotion arguments for next iteration*/
+ memset (&demotion_args, 0, sizeof(demotion_args_t));
+ memset (&promotion_args, 0, sizeof(promotion_args_t));
+
+ }
+
+ ret = 0;
+out:
+
+ dict_unref(bricklist_cold);
+ dict_unref(bricklist_hot);
+
+ return ret;
+}
+
+int32_t
+tier_migration_needed (xlator_t *this)
+{
+ gf_defrag_info_t *defrag = NULL;
+ dht_conf_t *conf = NULL;
+ int ret = 0;
+
+ conf = this->private;
+
+ GF_VALIDATE_OR_GOTO (this->name, conf, out);
+ GF_VALIDATE_OR_GOTO (this->name, conf->defrag, out);
+
+ defrag = conf->defrag;
+
+ if (defrag->cmd == GF_DEFRAG_CMD_START_TIER)
+ ret = 1;
+out:
+ return ret;
+}
+
+int32_t
+tier_migration_get_dst (xlator_t *this, dht_local_t *local)
+{
+ dht_conf_t *conf = NULL;
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO("tier", this, out);
+ GF_VALIDATE_OR_GOTO(this->name, this->private, out);
+
+ conf = this->private;
+ if (!conf)
+ goto out;
+
+ if (conf->subvolumes[0] == local->cached_subvol)
+ local->rebalance.target_node =
+ conf->subvolumes[1];
+ else
+ local->rebalance.target_node =
+ conf->subvolumes[0];
+
+ if (local->rebalance.target_node)
+ ret = 0;
+
+out:
+ return ret;
+}
+
+xlator_t *
+tier_search (xlator_t *this, dht_layout_t *layout, const char *name)
+{
+ xlator_t *subvol = NULL;
+ void *value;
+ int search_first_subvol = 0;
+
+ GF_VALIDATE_OR_GOTO("tier", this, out);
+ GF_VALIDATE_OR_GOTO(this->name, layout, out);
+ GF_VALIDATE_OR_GOTO(this->name, name, out);
+
+ if (!dict_get_ptr (this->options, "rule", &value) &&
+ !strcmp(layout->list[0].xlator->name, value)) {
+ search_first_subvol = 1;
+ }
+
+ if (search_first_subvol)
+ subvol = layout->list[0].xlator;
+ else
+ subvol = layout->list[1].xlator;
+
+out:
+ return subvol;
+}
+
+
+dht_methods_t tier_methods = {
+ .migration_get_dst_subvol = tier_migration_get_dst,
+ .migration_other = tier_start,
+ .migration_needed = tier_migration_needed,
+ .layout_search = tier_search,
+};
+
+
+int
+tier_init (xlator_t *this)
+{
+ int ret = -1;
+ int freq = 0;
+ dht_conf_t *conf = NULL;
+ gf_defrag_info_t *defrag = NULL;
+
+ ret = dht_init(this);
+ if (ret) {
+ gf_msg(this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "dht_init failed");
+ goto out;
+ }
+
+ conf = this->private;
+
+ conf->methods = &tier_methods;
+
+ if (conf->subvolume_cnt != 2) {
+ gf_msg(this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "Invalid number of subvolumes %d", conf->subvolume_cnt);
+ goto out;
+ }
+
+ /* if instatiated from client side initialization is complete. */
+ if (!conf->defrag) {
+ ret = 0;
+ goto out;
+ }
+
+ defrag = conf->defrag;
+
+ GF_OPTION_INIT ("tier-promote-frequency",
+ defrag->tier_promote_frequency,
+ int32, out);
+
+ ret = dict_get_int32 (this->options,
+ "tier-promote-frequency", &freq);
+ if (ret) {
+ freq = DEFAULT_PROMOTE_FREQ_SEC;
+ }
+
+ defrag->tier_promote_frequency = freq;
+
+ GF_OPTION_INIT ("tier-demote-frequency",
+ defrag->tier_demote_frequency,
+ int32, out);
+
+ ret = dict_get_int32 (this->options,
+ "tier-demote-frequency", &freq);
+ if (ret) {
+ freq = DEFAULT_DEMOTE_FREQ_SEC;
+ }
+
+ defrag->tier_demote_frequency = freq;
+
+ GF_OPTION_INIT ("write-freq-threshold",
+ defrag->write_freq_threshold,
+ int32, out);
+
+ GF_OPTION_INIT ("read-freq-threshold",
+ defrag->read_freq_threshold,
+ int32, out);
+
+ gf_msg(this->name, GF_LOG_INFO, 0,
+ DHT_MSG_LOG_TIER_STATUS,
+ "Promote frequency set to %d demote set to %d",
+ defrag->tier_promote_frequency,
+ defrag->tier_demote_frequency);
+
+ ret = 0;
+
+out:
+ return ret;
+}
+
+
+int
+tier_reconfigure (xlator_t *this, dict_t *options)
+{
+ dht_conf_t *conf = NULL;
+ gf_defrag_info_t *defrag = NULL;
+
+ conf = this->private;
+
+ if (conf->defrag) {
+ defrag = conf->defrag;
+ GF_OPTION_RECONF ("tier-promote-frequency",
+ defrag->tier_promote_frequency, options,
+ int32, out);
+
+ GF_OPTION_RECONF ("tier-demote-frequency",
+ defrag->tier_demote_frequency, options,
+ int32, out);
+
+ GF_OPTION_RECONF ("write-freq-threshold",
+ defrag->write_freq_threshold, options,
+ int32, out);
+
+ GF_OPTION_RECONF ("read-freq-threshold",
+ defrag->read_freq_threshold, options,
+ int32, out);
+ }
+
+out:
+ return dht_reconfigure (this, options);
+}
+
+class_methods_t class_methods = {
+ .init = tier_init,
+ .fini = dht_fini,
+ .reconfigure = tier_reconfigure,
+ .notify = dht_notify
+};
+
+
+struct xlator_fops fops = {
+ .lookup = dht_lookup,
+ .create = dht_create,
+ .mknod = dht_mknod,
+
+ .stat = dht_stat,
+ .fstat = dht_fstat,
+ .truncate = dht_truncate,
+ .ftruncate = dht_ftruncate,
+ .access = dht_access,
+ .readlink = dht_readlink,
+ .setxattr = dht_setxattr,
+ .getxattr = dht_getxattr,
+ .removexattr = dht_removexattr,
+ .open = dht_open,
+ .readv = dht_readv,
+ .writev = dht_writev,
+ .flush = dht_flush,
+ .fsync = dht_fsync,
+ .statfs = dht_statfs,
+ .lk = dht_lk,
+ .opendir = dht_opendir,
+ .readdir = dht_readdir,
+ .readdirp = dht_readdirp,
+ .fsyncdir = dht_fsyncdir,
+ .symlink = dht_symlink,
+ .unlink = dht_unlink,
+ .link = dht_link,
+ .mkdir = dht_mkdir,
+ .rmdir = dht_rmdir,
+ .rename = dht_rename,
+ .inodelk = dht_inodelk,
+ .finodelk = dht_finodelk,
+ .entrylk = dht_entrylk,
+ .fentrylk = dht_fentrylk,
+ .xattrop = dht_xattrop,
+ .fxattrop = dht_fxattrop,
+ .setattr = dht_setattr,
+};
+
+
+struct xlator_cbks cbks = {
+ .forget = dht_forget
+};
+
diff --git a/xlators/cluster/dht/src/tier.h b/xlators/cluster/dht/src/tier.h
new file mode 100644
index 00000000000..73266050a5c
--- /dev/null
+++ b/xlators/cluster/dht/src/tier.h
@@ -0,0 +1,71 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _TIER_H_
+#define _TIER_H_
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+
+/******************************************************************************/
+/* This is from dht-rebalancer.c as we dont have dht-rebalancer.h */
+#include "dht-common.h"
+#include "xlator.h"
+#include <signal.h>
+#include <fnmatch.h>
+#include <signal.h>
+
+#define DEFAULT_PROMOTE_FREQ_SEC 120
+#define DEFAULT_DEMOTE_FREQ_SEC 120
+
+/*
+ * Size of timer wheel. We would not promote or demote lesd
+ * frequently than this number.
+ */
+#define TIMER_SECS 3600
+
+#include "gfdb_data_store.h"
+#include <ctype.h>
+#include <sys/xattr.h>
+#include <sys/stat.h>
+
+#define DEMOTION_QFILE "/var/run/gluster/demotequeryfile"
+#define PROMOTION_QFILE "/var/run/gluster/promotequeryfile"
+
+#define GET_QFILE_PATH(is_promotion)\
+ (is_promotion) ? PROMOTION_QFILE : DEMOTION_QFILE
+
+typedef struct _query_cbk_args {
+ xlator_t *this;
+ gf_defrag_info_t *defrag;
+ FILE *queryFILE;
+} query_cbk_args_t;
+
+int
+gf_run_tier(xlator_t *this, gf_defrag_info_t *defrag);
+
+typedef struct _gfdb_brick_dict_info {
+ gfdb_time_t *time_stamp;
+ gf_boolean_t _gfdb_promote;
+ query_cbk_args_t *_query_cbk_args;
+} _gfdb_brick_dict_info_t;
+
+typedef struct _dm_thread_args {
+ xlator_t *this;
+ gf_defrag_info_t *defrag;
+ dict_t *brick_list;
+ int freq_time;
+ int return_value;
+} promotion_args_t, demotion_args_t;
+
+#endif