summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/dht/src
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/dht/src')
-rw-r--r--xlators/cluster/dht/src/Makefile.am11
-rw-r--r--xlators/cluster/dht/src/dht-common.c100
-rw-r--r--xlators/cluster/dht/src/dht-common.h29
-rw-r--r--xlators/cluster/dht/src/dht-helper.c22
-rw-r--r--xlators/cluster/dht/src/dht-inode-read.c1
-rw-r--r--xlators/cluster/dht/src/dht-messages.h19
-rw-r--r--xlators/cluster/dht/src/dht-rebalance.c28
-rw-r--r--xlators/cluster/dht/src/dht-shared.c31
-rw-r--r--xlators/cluster/dht/src/dht.c7
-rw-r--r--xlators/cluster/dht/src/nufa.c5
-rw-r--r--xlators/cluster/dht/src/tier.c1007
-rw-r--r--xlators/cluster/dht/src/tier.h71
12 files changed, 1300 insertions, 31 deletions
diff --git a/xlators/cluster/dht/src/Makefile.am b/xlators/cluster/dht/src/Makefile.am
index 8d02749f4d9..46dc4bb840f 100644
--- a/xlators/cluster/dht/src/Makefile.am
+++ b/xlators/cluster/dht/src/Makefile.am
@@ -1,4 +1,4 @@
-xlator_LTLIBRARIES = dht.la nufa.la switch.la
+xlator_LTLIBRARIES = dht.la nufa.la switch.la tier.la
xlatordir = $(libdir)/glusterfs/$(PACKAGE_VERSION)/xlator/cluster
dht_common_source = dht-layout.c dht-helper.c dht-linkfile.c dht-rebalance.c \
@@ -10,6 +10,7 @@ dht_la_SOURCES = $(dht_common_source) dht.c
nufa_la_SOURCES = $(dht_common_source) nufa.c
switch_la_SOURCES = $(dht_common_source) switch.c
+tier_la_SOURCES = $(dht_common_source) tier.c
dht_la_LDFLAGS = -module -avoid-version
dht_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
@@ -20,10 +21,16 @@ nufa_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
switch_la_LDFLAGS = -module -avoid-version
switch_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la
-noinst_HEADERS = dht-common.h dht-mem-types.h dht-messages.h dht-helper.h \
+tier_la_LDFLAGS = -module -avoid-version
+tier_la_LIBADD = $(top_builddir)/libglusterfs/src/libglusterfs.la\
+ $(top_builddir)/libglusterfs/src/gfdb/libgfdb.la
+AM_CFLAGS = -Wall $(GF_CFLAGS) $(SQLITE_CFLAGS)
+
+noinst_HEADERS = dht-common.h dht-mem-types.h dht-messages.h dht-helper.h tier.h\
$(top_builddir)/xlators/lib/src/libxlator.h
AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \
+ -I$(top_srcdir)/libglusterfs/src/gfdb \
-I$(top_srcdir)/xlators/lib/src
AM_CFLAGS = -Wall $(GF_CFLAGS)
diff --git a/xlators/cluster/dht/src/dht-common.c b/xlators/cluster/dht/src/dht-common.c
index 9fda4aa07d6..3a196e07be9 100644
--- a/xlators/cluster/dht/src/dht-common.c
+++ b/xlators/cluster/dht/src/dht-common.c
@@ -3175,6 +3175,7 @@ dht_setxattr (call_frame_t *frame, xlator_t *this,
xlator_t *subvol = NULL;
dht_local_t *local = NULL;
dht_conf_t *conf = NULL;
+ dht_methods_t *methods = NULL;
dht_layout_t *layout = NULL;
int i = 0;
int op_errno = EINVAL;
@@ -3191,6 +3192,10 @@ dht_setxattr (call_frame_t *frame, xlator_t *this,
VALIDATE_OR_GOTO (loc->inode, err);
conf = this->private;
+ GF_VALIDATE_OR_GOTO (this->name, conf, err);
+
+ methods = conf->methods;
+ GF_VALIDATE_OR_GOTO (this->name, conf->methods, err);
GF_IF_INTERNAL_XATTR_GOTO (conf->wild_xattr_name, xattr,
op_errno, err);
@@ -3255,8 +3260,8 @@ dht_setxattr (call_frame_t *frame, xlator_t *this,
goto err;
}
- local->rebalance.target_node =
- dht_subvol_get_hashed (this, &local->loc);
+ methods->migration_get_dst_subvol(this, local);
+
if (!local->rebalance.target_node) {
gf_msg (this->name, GF_LOG_ERROR, 0,
DHT_MSG_HASHED_SUBVOL_GET_FAILED,
@@ -3719,7 +3724,6 @@ dht_statfs (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
VALIDATE_OR_GOTO (frame, err);
VALIDATE_OR_GOTO (this, err);
VALIDATE_OR_GOTO (loc, err);
- VALIDATE_OR_GOTO (loc->inode, err);
VALIDATE_OR_GOTO (this->private, err);
conf = this->private;
@@ -3730,7 +3734,7 @@ dht_statfs (call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
goto err;
}
- if (IA_ISDIR (loc->inode->ia_type)) {
+ if (!loc->inode || IA_ISDIR (loc->inode->ia_type)) {
local->call_cnt = conf->subvolume_cnt;
for (i = 0; i < conf->subvolume_cnt; i++) {
@@ -3820,6 +3824,7 @@ dht_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,
int count = 0;
dht_layout_t *layout = 0;
dht_conf_t *conf = NULL;
+ dht_methods_t *methods = NULL;
xlator_t *subvol = 0;
xlator_t *hashed_subvol = 0;
int ret = 0;
@@ -3828,7 +3833,12 @@ dht_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,
INIT_LIST_HEAD (&entries.list);
prev = cookie;
local = frame->local;
+
conf = this->private;
+ GF_VALIDATE_OR_GOTO(this->name, conf, unwind);
+
+ methods = conf->methods;
+ GF_VALIDATE_OR_GOTO(this->name, conf->methods, done);
if (op_ret < 0)
goto done;
@@ -3867,8 +3877,8 @@ dht_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,
}
- hashed_subvol = dht_layout_search (this, layout, \
- orig_entry->d_name);
+ hashed_subvol = methods->layout_search (this, layout,
+ orig_entry->d_name);
if (prev->this == hashed_subvol)
goto list;
@@ -3894,8 +3904,8 @@ list:
/* Do this if conf->search_unhashed is set to "auto" */
if (conf->search_unhashed == GF_DHT_LOOKUP_UNHASHED_AUTO) {
- subvol = dht_layout_search (this, layout,
- orig_entry->d_name);
+ subvol = methods->layout_search (this, layout,
+ orig_entry->d_name);
if (!subvol || (subvol != prev->this)) {
/* TODO: Count the number of entries which need
linkfile to prove its existence in fs */
@@ -4008,11 +4018,19 @@ dht_readdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int count = 0;
dht_layout_t *layout = 0;
xlator_t *subvol = 0;
+ dht_conf_t *conf = NULL;
+ dht_methods_t *methods = NULL;
INIT_LIST_HEAD (&entries.list);
prev = cookie;
local = frame->local;
+ conf = this->private;
+ GF_VALIDATE_OR_GOTO (this->name, conf, done);
+
+ methods = conf->methods;
+ GF_VALIDATE_OR_GOTO (this->name, conf->methods, done);
+
if (op_ret < 0)
goto done;
@@ -4024,7 +4042,8 @@ dht_readdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
list_for_each_entry (orig_entry, (&orig_entries->list), list) {
next_offset = orig_entry->d_off;
- subvol = dht_layout_search (this, layout, orig_entry->d_name);
+ subvol = methods->layout_search (this, layout,
+ orig_entry->d_name);
if (!subvol || (subvol == prev->this)) {
entry = gf_dirent_for_name (orig_entry->d_name);
@@ -6073,11 +6092,13 @@ dht_notify (xlator_t *this, int event, void *data, ...)
gf_defrag_type cmd = 0;
dict_t *output = NULL;
va_list ap;
-
+ dht_methods_t *methods = NULL;
conf = this->private;
- if (!conf)
- return ret;
+ GF_VALIDATE_OR_GOTO (this->name, conf, out);
+
+ methods = conf->methods;
+ GF_VALIDATE_OR_GOTO (this->name, methods, out);
/* had all subvolumes reported status once till now? */
had_heard_from_all = 1;
@@ -6271,12 +6292,18 @@ unlock:
* not need to handle CHILD_DOWN event here.
*/
if (conf->defrag) {
- ret = gf_thread_create (&conf->defrag->th, NULL,
- gf_defrag_start, this);
- if (ret) {
- conf->defrag = NULL;
+ if (methods->migration_needed(this)) {
+ ret = gf_thread_create(&conf->defrag->th,
+ NULL,
+ gf_defrag_start, this);
+ if (ret) {
+ GF_FREE (conf->defrag);
+ conf->defrag = NULL;
+ kill (getpid(), SIGTERM);
+ }
+ } else {
GF_FREE (conf->defrag);
- kill (getpid(), SIGTERM);
+ conf->defrag = NULL;
}
}
}
@@ -6284,7 +6311,7 @@ unlock:
ret = 0;
if (propagate)
ret = default_notify (this, event, data);
-
+out:
return ret;
}
@@ -6412,3 +6439,40 @@ dht_log_new_layout_for_dir_selfheal (xlator_t *this, loc_t *loc,
err:
GF_FREE (output_string);
}
+
+int32_t dht_migration_get_dst_subvol(xlator_t *this, dht_local_t *local)
+{
+ int ret = -1;
+
+ if (!local)
+ goto out;
+
+ local->rebalance.target_node =
+ dht_subvol_get_hashed (this, &local->loc);
+
+ if (local->rebalance.target_node)
+ ret = 0;
+
+out:
+ return ret;
+}
+
+int32_t dht_migration_needed(xlator_t *this)
+{
+ gf_defrag_info_t *defrag = NULL;
+ dht_conf_t *conf = NULL;
+ int ret = 0;
+
+ conf = this->private;
+
+ GF_VALIDATE_OR_GOTO ("dht", conf, out);
+ GF_VALIDATE_OR_GOTO ("dht", conf->defrag, out);
+
+ defrag = conf->defrag;
+
+ if (defrag->cmd != GF_DEFRAG_CMD_START_TIER)
+ ret = 1;
+
+out:
+ return ret;
+}
diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h
index 67e693146af..9145f336d7c 100644
--- a/xlators/cluster/dht/src/dht-common.h
+++ b/xlators/cluster/dht/src/dht-common.h
@@ -267,6 +267,8 @@ enum gf_defrag_type {
GF_DEFRAG_CMD_STATUS = 1 + 2,
GF_DEFRAG_CMD_START_LAYOUT_FIX = 1 + 3,
GF_DEFRAG_CMD_START_FORCE = 1 + 4,
+ GF_DEFRAG_CMD_START_TIER = 1 + 5,
+ GF_DEFRAG_CMD_STATUS_TIER = 1 + 6,
};
typedef enum gf_defrag_type gf_defrag_type;
@@ -310,10 +312,31 @@ struct gf_defrag_info_ {
struct timeval start_time;
gf_boolean_t stats;
gf_defrag_pattern_list_t *defrag_pattern;
+ int tier_promote_frequency;
+ int tier_demote_frequency;
+
+ /*Data Tiering params for scanner*/
+ uint64_t total_files_promoted;
+ uint64_t total_files_demoted;
+ int write_freq_threshold;
+ int read_freq_threshold;
};
typedef struct gf_defrag_info_ gf_defrag_info_t;
+struct dht_methods_s {
+ int32_t (*migration_get_dst_subvol)(xlator_t *this,
+ dht_local_t *local);
+ int32_t (*migration_other)(xlator_t *this,
+ gf_defrag_info_t *defrag);
+ int32_t (*migration_needed)(xlator_t *this);
+ xlator_t* (*layout_search)(xlator_t *this,
+ dht_layout_t *layout,
+ const char *name);
+};
+
+typedef struct dht_methods_s dht_methods_t;
+
struct dht_conf {
gf_lock_t subvolume_lock;
int subvolume_cnt;
@@ -371,6 +394,8 @@ struct dht_conf {
gf_boolean_t do_weighting;
gf_boolean_t randomize_by_gfid;
+ dht_methods_t *methods;
+
struct mem_pool *lock_pool;
};
typedef struct dht_conf dht_conf_t;
@@ -477,6 +502,10 @@ dht_layout_t *dht_layout_get (xlator_t *this, inode_t
dht_layout_t *dht_layout_for_subvol (xlator_t *this, xlator_t *subvol);
xlator_t *dht_layout_search (xlator_t *this, dht_layout_t *layout,
const char *name);
+int32_t
+dht_migration_get_dst_subvol(xlator_t *this, dht_local_t *local);
+int32_t
+dht_migration_needed(xlator_t *this);
int dht_layout_normalize (xlator_t *this, loc_t *loc, dht_layout_t *layout);
int dht_layout_anomalies (xlator_t *this, loc_t *loc, dht_layout_t *layout,
uint32_t *holes_p, uint32_t *overlaps_p,
diff --git a/xlators/cluster/dht/src/dht-helper.c b/xlators/cluster/dht/src/dht-helper.c
index f4e5305d791..346d19bec88 100644
--- a/xlators/cluster/dht/src/dht-helper.c
+++ b/xlators/cluster/dht/src/dht-helper.c
@@ -502,10 +502,18 @@ dht_subvol_get_hashed (xlator_t *this, loc_t *loc)
{
dht_layout_t *layout = NULL;
xlator_t *subvol = NULL;
+ dht_conf_t *conf = NULL;
+ dht_methods_t *methods = NULL;
GF_VALIDATE_OR_GOTO ("dht", this, out);
GF_VALIDATE_OR_GOTO (this->name, loc, out);
+ conf = this->private;
+ GF_VALIDATE_OR_GOTO (this->name, conf, out);
+
+ methods = conf->methods;
+ GF_VALIDATE_OR_GOTO (this->name, conf->methods, out);
+
if (__is_root_gfid (loc->gfid)) {
subvol = dht_first_up_subvol (this);
goto out;
@@ -523,7 +531,7 @@ dht_subvol_get_hashed (xlator_t *this, loc_t *loc)
goto out;
}
- subvol = dht_layout_search (this, layout, loc->name);
+ subvol = methods->layout_search (this, layout, loc->name);
if (!subvol) {
gf_msg_debug (this->name, 0,
@@ -846,6 +854,18 @@ dht_migration_complete_check_task (void *data)
SYNCTASK_SETID (frame->root->uid, frame->root->gid);
}
+ /*
+ * temporary check related to tier promoting/demoting the file;
+ * the lower level DHT detects the migration (due to sticky
+ * bits) when it is the responsibility of the tier translator
+ * to complete the rebalance transaction. It will be corrected
+ * when rebalance and tier migration are fixed to work together.
+ */
+ if (strcmp(this->parents->xlator->type, "cluster/tier") == 0) {
+ ret = 0;
+ goto out;
+ }
+
if (!ret)
dst_node = dht_linkfile_subvol (this, NULL, NULL, dict);
diff --git a/xlators/cluster/dht/src/dht-inode-read.c b/xlators/cluster/dht/src/dht-inode-read.c
index d78dd2ea0ef..78e3ef4233b 100644
--- a/xlators/cluster/dht/src/dht-inode-read.c
+++ b/xlators/cluster/dht/src/dht-inode-read.c
@@ -1064,7 +1064,6 @@ dht_inodelk (call_frame_t *frame, xlator_t *this, const char *volume,
VALIDATE_OR_GOTO (this, err);
VALIDATE_OR_GOTO (loc, err);
VALIDATE_OR_GOTO (loc->inode, err);
- VALIDATE_OR_GOTO (loc->path, err);
local = dht_local_init (frame, loc, NULL, GF_FOP_INODELK);
if (!local) {
diff --git a/xlators/cluster/dht/src/dht-messages.h b/xlators/cluster/dht/src/dht-messages.h
index f4096ae8f1e..eb4c356e3c2 100644
--- a/xlators/cluster/dht/src/dht-messages.h
+++ b/xlators/cluster/dht/src/dht-messages.h
@@ -441,12 +441,25 @@
#define DHT_MSG_LOG_FIXED_LAYOUT (GLFS_DHT_BASE + 36)
-/*------------*/
-#define glfs_msg_end_x GLFS_MSGID_END, "Invalid: End of messages"
+/*
+ * @messageid 109037
+ * @diagnosis Informational message regarding error in tier operation
+ * @recommendedaction None
+ */
+#define DHT_MSG_LOG_TIER_ERROR (GLFS_DHT_BASE + 37)
-#endif /* _DHT_MESSAGES_H_ */
+/*
+ * @messageid 109038
+ * @diagnosis Informational message regarding tier operation
+ * @recommendedaction None
+ */
+#define DHT_MSG_LOG_TIER_STATUS (GLFS_DHT_BASE + 38)
+/*------------*/
+#define glfs_msg_end_x GLFS_MSGID_END, "Invalid: End of messages"
+
+#endif /* _DHT_MESSAGES_H_ */
diff --git a/xlators/cluster/dht/src/dht-rebalance.c b/xlators/cluster/dht/src/dht-rebalance.c
index 3531872dd31..b838ecec4b7 100644
--- a/xlators/cluster/dht/src/dht-rebalance.c
+++ b/xlators/cluster/dht/src/dht-rebalance.c
@@ -19,6 +19,7 @@
#include <signal.h>
#include <fnmatch.h>
#include <signal.h>
+#include "tier.h"
#define GF_DISK_SECTOR_SIZE 512
#define DHT_REBALANCE_PID 4242 /* Change it if required */
@@ -919,6 +920,7 @@ dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to,
tmp_loc.inode = inode_ref (loc->inode);
uuid_copy (tmp_loc.gfid, loc->gfid);
+ tmp_loc.path = gf_strdup(loc->path);
ret = syncop_inodelk (from, DHT_FILE_MIGRATE_DOMAIN, &tmp_loc, F_SETLKW,
&flock, NULL, NULL);
@@ -984,6 +986,7 @@ dht_migrate_file (xlator_t *this, loc_t *loc, xlator_t *from, xlator_t *to,
goto out;
ret = __dht_check_free_space (to, from, loc, &stbuf, flag);
+
if (ret) {
goto out;
}
@@ -1713,7 +1716,8 @@ gf_defrag_fix_layout (xlator_t *this, gf_defrag_info_t *defrag, loc_t *loc,
goto out;
}
- if (defrag->cmd != GF_DEFRAG_CMD_START_LAYOUT_FIX) {
+ if ((defrag->cmd != GF_DEFRAG_CMD_START_TIER) &&
+ (defrag->cmd != GF_DEFRAG_CMD_START_LAYOUT_FIX)) {
ret = gf_defrag_migrate_data (this, defrag, loc, migrate_data);
if (ret)
goto out;
@@ -1863,7 +1867,6 @@ out:
}
-
int
gf_defrag_start_crawl (void *data)
{
@@ -1878,6 +1881,7 @@ gf_defrag_start_crawl (void *data)
dict_t *migrate_data = NULL;
dict_t *status = NULL;
glusterfs_ctx_t *ctx = NULL;
+ dht_methods_t *methods = NULL;
this = data;
if (!this)
@@ -1942,7 +1946,8 @@ gf_defrag_start_crawl (void *data)
goto out;
}
- if (defrag->cmd != GF_DEFRAG_CMD_START_LAYOUT_FIX) {
+ if ((defrag->cmd != GF_DEFRAG_CMD_START_TIER) &&
+ (defrag->cmd != GF_DEFRAG_CMD_START_LAYOUT_FIX)) {
migrate_data = dict_new ();
if (!migrate_data) {
ret = -1;
@@ -1959,15 +1964,28 @@ gf_defrag_start_crawl (void *data)
if (ret)
goto out;
}
+
ret = gf_defrag_fix_layout (this, defrag, &loc, fix_layout,
migrate_data);
+
+ if (defrag->cmd == GF_DEFRAG_CMD_START_TIER) {
+ methods = conf->methods;
+ if (!methods) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "Methods invalid for translator.");
+ defrag->defrag_status = GF_DEFRAG_STATUS_FAILED;
+ ret = -1;
+ goto out;
+ }
+ methods->migration_other(this, defrag);
+ }
+
if ((defrag->defrag_status != GF_DEFRAG_STATUS_STOPPED) &&
(defrag->defrag_status != GF_DEFRAG_STATUS_FAILED)) {
defrag->defrag_status = GF_DEFRAG_STATUS_COMPLETE;
}
-
-
out:
LOCK (&defrag->lock);
{
diff --git a/xlators/cluster/dht/src/dht-shared.c b/xlators/cluster/dht/src/dht-shared.c
index 860f3e716f0..1e666bd8140 100644
--- a/xlators/cluster/dht/src/dht-shared.c
+++ b/xlators/cluster/dht/src/dht-shared.c
@@ -29,6 +29,8 @@
*/
struct volume_options options[];
+extern dht_methods_t dht_methods;
+
void
dht_layout_dump (dht_layout_t *layout, const char *prefix)
{
@@ -701,6 +703,8 @@ dht_init (xlator_t *this)
if (dht_set_subvol_range(this))
goto err;
+ conf->methods = &dht_methods;
+
return 0;
err:
@@ -847,6 +851,33 @@ struct volume_options options[] = {
.type = GF_OPTION_TYPE_XLATOR
},
+ /* tier options */
+ { .key = {"tier-promote-frequency"},
+ .type = GF_OPTION_TYPE_INT,
+ .default_value = "120",
+ .description = "Frequency to promote files to fast tier"
+ },
+
+ { .key = {"tier-demote-frequency"},
+ .type = GF_OPTION_TYPE_INT,
+ .default_value = "120",
+ .description = "Frequency to demote files to slow tier"
+ },
+
+ { .key = {"write-freq-thresold"},
+ .type = GF_OPTION_TYPE_INT,
+ .default_value = "0",
+ .description = "Defines the write fequency "
+ "that would be considered hot"
+ },
+
+ { .key = {"read-freq-thresold"},
+ .type = GF_OPTION_TYPE_INT,
+ .default_value = "0",
+ .description = "Defines the read fequency "
+ "that would be considered hot"
+ },
+
/* switch option */
{ .key = {"pattern.switch.case"},
.type = GF_OPTION_TYPE_ANY
diff --git a/xlators/cluster/dht/src/dht.c b/xlators/cluster/dht/src/dht.c
index fc0ca2f7735..3934df5ec64 100644
--- a/xlators/cluster/dht/src/dht.c
+++ b/xlators/cluster/dht/src/dht.c
@@ -17,6 +17,12 @@
#include "statedump.h"
#include "dht-common.h"
+dht_methods_t dht_methods = {
+ .migration_get_dst_subvol = dht_migration_get_dst_subvol,
+ .migration_needed = dht_migration_needed,
+ .layout_search = dht_layout_search,
+};
+
class_methods_t class_methods = {
.init = dht_init,
.fini = dht_fini,
@@ -86,4 +92,3 @@ struct xlator_cbks cbks = {
// .releasedir = dht_releasedir,
.forget = dht_forget
};
-;
diff --git a/xlators/cluster/dht/src/nufa.c b/xlators/cluster/dht/src/nufa.c
index f188a5479f4..72d6d9c10e5 100644
--- a/xlators/cluster/dht/src/nufa.c
+++ b/xlators/cluster/dht/src/nufa.c
@@ -621,6 +621,11 @@ nufa_init (xlator_t *this)
return 0;
}
+dht_methods_t dht_methods = {
+ .migration_get_dst_subvol = dht_migration_get_dst_subvol,
+ .migration_needed = dht_migration_needed,
+ .layout_search = dht_layout_search,
+};
class_methods_t class_methods = {
.init = nufa_init,
diff --git a/xlators/cluster/dht/src/tier.c b/xlators/cluster/dht/src/tier.c
new file mode 100644
index 00000000000..028a42f7a1a
--- /dev/null
+++ b/xlators/cluster/dht/src/tier.c
@@ -0,0 +1,1007 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "dht-common.h"
+#include "tier.h"
+
+/*Hard coded DB info*/
+static gfdb_db_type_t dht_tier_db_type = GFDB_SQLITE3;
+/*Hard coded DB info*/
+
+/*Mutex for updating the data movement stats*/
+static pthread_mutex_t dm_stat_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+#define DB_QUERY_RECORD_SIZE 4096
+
+static int
+tier_parse_query_str (char *query_record_str,
+ char *gfid, char *link_buffer, ssize_t *link_size)
+{
+ char *token_str = NULL;
+ char *delimiter = "|";
+ char *saveptr = NULL;
+ int ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("tier", query_record_str, out);
+ GF_VALIDATE_OR_GOTO ("tier", gfid, out);
+ GF_VALIDATE_OR_GOTO ("tier", link_buffer, out);
+ GF_VALIDATE_OR_GOTO ("tier", link_size, out);
+
+ token_str = strtok_r (query_record_str, delimiter, &saveptr);
+ if (!token_str)
+ goto out;
+
+ strcpy (gfid, token_str);
+
+
+ token_str = strtok_r (NULL, delimiter, &saveptr);
+ if (!token_str)
+ goto out;
+
+ strcpy (link_buffer, token_str);
+
+ token_str = strtok_r (NULL, delimiter, &saveptr);
+ if (!token_str)
+ goto out;
+
+ *link_size = atoi (token_str);
+
+ ret = 0;
+out:
+ return ret;
+}
+
+static int
+tier_migrate_using_query_file (void *_args)
+{
+ int ret = -1;
+ char gfid_str[UUID_CANONICAL_FORM_LEN+1] = "";
+ char query_record_str[4096] = "";
+ query_cbk_args_t *query_cbk_args = (query_cbk_args_t *) _args;
+ xlator_t *this = NULL;
+ gf_defrag_info_t *defrag = NULL;
+ char *token_str = NULL;
+ char *delimiter = "::";
+ char *link_buffer = NULL;
+ gfdb_query_record_t *query_record = NULL;
+ gfdb_link_info_t *link_info = NULL;
+ struct iatt par_stbuf = {0,};
+ struct iatt current = {0,};
+ loc_t p_loc = {0,};
+ loc_t loc = {0,};
+ dict_t *migrate_data = NULL;
+ inode_t *linked_inode = NULL;
+ int per_file_status = 0;
+ int per_link_status = 0;
+ int total_status = 0;
+ FILE *queryFILE = NULL;
+ char *link_str = NULL;
+
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args, out);
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->this, out);
+ this = query_cbk_args->this;
+ GF_VALIDATE_OR_GOTO (this->name, query_cbk_args->defrag, out);
+ GF_VALIDATE_OR_GOTO (this->name, query_cbk_args->queryFILE, out);
+
+ defrag = query_cbk_args->defrag;
+
+ queryFILE = query_cbk_args->queryFILE;
+
+ query_record = gfdb_query_record_init();
+ if (!query_record) {
+ goto out;
+ }
+
+ query_record->_link_info_str = calloc (DB_QUERY_RECORD_SIZE, 1);
+ if (!query_record->_link_info_str) {
+ goto out;
+ }
+ link_buffer = query_record->_link_info_str;
+
+ link_info = gfdb_link_info_init ();
+
+ migrate_data = dict_new ();
+ if (!migrate_data)
+ goto out;
+
+ /* Per file */
+ while (fscanf (queryFILE, "%s", query_record_str) != EOF) {
+
+ per_file_status = 0;
+ per_link_status = 0;
+
+ memset (gfid_str, 0, UUID_CANONICAL_FORM_LEN+1);
+ memset (query_record->_link_info_str, 0, DB_QUERY_RECORD_SIZE);
+
+ if (tier_parse_query_str (query_record_str, gfid_str,
+ link_buffer,
+ &query_record->link_info_size)) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "failed parsing %s\n", query_record_str);
+ continue;
+ }
+
+ uuid_parse (gfid_str, query_record->gfid);
+
+ if (dict_get(migrate_data, GF_XATTR_FILE_MIGRATE_KEY))
+ dict_del(migrate_data, GF_XATTR_FILE_MIGRATE_KEY);
+
+ if (dict_get(migrate_data, "from.migrator"))
+ dict_del(migrate_data, "from.migrator");
+
+ token_str = strtok (link_buffer, delimiter);
+ if (token_str != NULL) {
+ per_file_status =
+ dict_set_str (migrate_data,
+ GF_XATTR_FILE_MIGRATE_KEY,
+ "force");
+ if (per_file_status) {
+ goto per_file_out;
+ }
+
+ /* Flag to suggest the xattr call is from migrator */
+ per_file_status = dict_set_str (migrate_data,
+ "from.migrator", "yes");
+ if (per_file_status) {
+ goto per_file_out;
+ }
+ }
+ per_link_status = 0;
+ /* Per link of file */
+ while (token_str != NULL) {
+
+ link_str = gf_strdup (token_str);
+
+ if (!link_info) {
+ per_link_status = -1;
+ goto per_file_out;
+ }
+
+ memset (link_info, 0, sizeof(gfdb_link_info_t));
+
+ ret = str_to_link_info (link_str, link_info);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "failed parsing %s\n", link_str);
+ per_link_status = -1;
+ goto error;
+ }
+
+ uuid_copy (p_loc.gfid, link_info->pargfid);
+
+ p_loc.inode = inode_new (defrag->root_inode->table);
+ if (!p_loc.inode) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "failed parsing %s\n", link_str);
+ per_link_status = -1;
+ goto error;
+ }
+
+ ret = syncop_lookup (this, &p_loc, NULL, &par_stbuf,
+ NULL, NULL);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ " ERROR in parent lookup\n");
+ per_link_status = -1;
+ goto error;
+ }
+ linked_inode = inode_link (p_loc.inode, NULL, NULL,
+ &par_stbuf);
+ inode_unref (p_loc.inode);
+ p_loc.inode = linked_inode;
+
+ uuid_copy (loc.gfid, query_record->gfid);
+ loc.inode = inode_new (defrag->root_inode->table);
+ uuid_copy (loc.pargfid, link_info->pargfid);
+ loc.parent = inode_ref(p_loc.inode);
+
+ loc.name = gf_strdup (link_info->file_name);
+ if (!loc.name) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR, "ERROR in "
+ "memory allocation\n");
+ per_link_status = -1;
+ goto error;
+ }
+
+ loc.path = gf_strdup (link_info->file_path);
+ if (!loc.path) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR, "ERROR in "
+ "memory allocation\n");
+ per_link_status = -1;
+ goto error;
+ }
+
+ uuid_copy (loc.parent->gfid, link_info->pargfid);
+
+ ret = syncop_lookup (this, &loc, NULL, &current,
+ NULL, NULL);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR, "ERROR in "
+ "current lookup\n");
+ per_link_status = -1;
+ goto error;
+ }
+ linked_inode = inode_link (loc.inode, NULL, NULL,
+ &current);
+ inode_unref (loc.inode);
+ loc.inode = linked_inode;
+
+ gf_msg (this->name, GF_LOG_INFO, 0,
+ DHT_MSG_LOG_TIER_STATUS, "Tier migrate file %s",
+ loc.name);
+
+ ret = syncop_setxattr (this, &loc, migrate_data, 0);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR, "ERROR %d in "
+ "current migration %s %s\n", ret,
+ loc.name,
+ loc.path);
+ per_link_status = -1;
+ goto error;
+ }
+ inode_unref (loc.inode);
+ inode_unref (loc.parent);
+ inode_unref (p_loc.inode);
+error:
+ if (loc.name) {
+ GF_FREE ((char *) loc.name);
+ }
+ if (loc.path) {
+ GF_FREE ((char *) loc.path);
+ }
+ token_str = NULL;
+ token_str = strtok (NULL, delimiter);
+ GF_FREE (link_str);
+ }
+ per_file_status = per_link_status;
+per_file_out:
+ if (per_file_status) {
+ pthread_mutex_lock (&dm_stat_mutex);
+ defrag->total_failures++;
+ pthread_mutex_unlock (&dm_stat_mutex);
+ } else {
+ pthread_mutex_lock (&dm_stat_mutex);
+ defrag->total_files++;
+ pthread_mutex_unlock (&dm_stat_mutex);
+ }
+ total_status = total_status + per_file_status;
+ per_link_status = 0;
+ per_file_status = 0;
+ query_record_str[0] = '\0';
+ }
+
+out:
+ if (link_buffer)
+ free (link_buffer);
+ gfdb_link_info_fini (&link_info);
+ if (migrate_data)
+ dict_unref (migrate_data);
+ gfdb_query_record_fini (&query_record);
+ return total_status;
+}
+
+
+/*This is the call back function per record/file from data base*/
+static int
+tier_gf_query_callback (gfdb_query_record_t *gfdb_query_record,
+ void *_args) {
+ int ret = -1;
+ char gfid_str[UUID_CANONICAL_FORM_LEN] = "";
+ query_cbk_args_t *query_cbk_args = _args;
+
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args, out);
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->defrag, out);
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->queryFILE, out);
+
+ uuid_unparse (gfdb_query_record->gfid, gfid_str);
+ fprintf (query_cbk_args->queryFILE, "%s|%s|%ld\n", gfid_str,
+ gfdb_query_record->_link_info_str,
+ gfdb_query_record->link_info_size);
+
+ pthread_mutex_lock (&dm_stat_mutex);
+ query_cbk_args->defrag->num_files_lookedup++;
+ pthread_mutex_unlock (&dm_stat_mutex);
+
+ ret = 0;
+out:
+ return ret;
+}
+
+/*This is the call back function for each brick from hot/cold bricklist
+ * It picks up each bricks db and queries for eligible files for migration.
+ * The list of eligible files are populated in appropriate query files*/
+static int
+tier_process_brick_cbk (dict_t *brick_dict, char *key, data_t *value,
+ void *args) {
+ int ret = -1;
+ char *db_path = NULL;
+ query_cbk_args_t *query_cbk_args = NULL;
+ xlator_t *this = NULL;
+ gfdb_conn_node_t *conn_node = NULL;
+ dict_t *params_dict = NULL;
+ _gfdb_brick_dict_info_t *gfdb_brick_dict_info = args;
+
+ /*Init of all the essentials*/
+ GF_VALIDATE_OR_GOTO ("tier", gfdb_brick_dict_info , out);
+ query_cbk_args = gfdb_brick_dict_info->_query_cbk_args;
+
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->this, out);
+ this = query_cbk_args->this;
+
+ GF_VALIDATE_OR_GOTO (this->name,
+ gfdb_brick_dict_info->_query_cbk_args, out);
+
+ GF_VALIDATE_OR_GOTO (this->name, value, out);
+ db_path = data_to_str(value);
+
+ /*Preparing DB parameters before init_db i.e getting db connection*/
+ params_dict = dict_new ();
+ if (!params_dict) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "DB Params cannot initialized!");
+ goto out;
+ }
+ SET_DB_PARAM_TO_DICT(this->name, params_dict, GFDB_SQL_PARAM_DBPATH,
+ db_path, ret, out);
+
+ /*Get the db connection*/
+ conn_node = init_db((void *)params_dict, dht_tier_db_type);
+ if (!conn_node) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "FATAL: Failed initializing db operations");
+ goto out;
+ }
+
+ /*Query for eligible files from db*/
+ query_cbk_args->queryFILE = fopen(GET_QFILE_PATH
+ (gfdb_brick_dict_info->_gfdb_promote), "a+");
+ if (!query_cbk_args->queryFILE) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
+ "Failed to open query file %s:%s",
+ GET_QFILE_PATH
+ (gfdb_brick_dict_info->_gfdb_promote),
+ strerror(errno));
+ goto out;
+ }
+ if (!gfdb_brick_dict_info->_gfdb_promote) {
+ if (query_cbk_args->defrag->write_freq_threshold == 0 &&
+ query_cbk_args->defrag->read_freq_threshold == 0) {
+ ret = find_unchanged_for_time(conn_node,
+ tier_gf_query_callback,
+ (void *)query_cbk_args,
+ gfdb_brick_dict_info->time_stamp);
+ } else {
+ ret = find_unchanged_for_time_freq(conn_node,
+ tier_gf_query_callback,
+ (void *)query_cbk_args,
+ gfdb_brick_dict_info->time_stamp,
+ query_cbk_args->defrag->
+ write_freq_threshold,
+ query_cbk_args->defrag->
+ read_freq_threshold,
+ _gf_false);
+ }
+ } else {
+ if (query_cbk_args->defrag->write_freq_threshold == 0 &&
+ query_cbk_args->defrag->read_freq_threshold == 0) {
+ ret = find_recently_changed_files(conn_node,
+ tier_gf_query_callback,
+ (void *)query_cbk_args,
+ gfdb_brick_dict_info->time_stamp);
+ } else {
+ ret = find_recently_changed_files_freq(conn_node,
+ tier_gf_query_callback,
+ (void *)query_cbk_args,
+ gfdb_brick_dict_info->time_stamp,
+ query_cbk_args->defrag->
+ write_freq_threshold,
+ query_cbk_args->defrag->
+ read_freq_threshold,
+ _gf_false);
+ }
+ }
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "FATAL: query from db failed");
+ goto out;
+ }
+ ret = 0;
+out:
+ if (query_cbk_args->queryFILE) {
+ fclose (query_cbk_args->queryFILE);
+ query_cbk_args->queryFILE = NULL;
+ }
+ fini_db (conn_node);
+ return ret;
+}
+
+inline int
+tier_build_migration_qfile (demotion_args_t *args,
+ query_cbk_args_t *query_cbk_args,
+ gf_boolean_t is_promotion)
+{
+ gfdb_time_t current_time;
+ _gfdb_brick_dict_info_t gfdb_brick_dict_info;
+ gfdb_time_t time_in_past;
+ int ret = -1;
+
+ remove (GET_QFILE_PATH (is_promotion));
+ time_in_past.tv_sec = args->freq_time;
+ time_in_past.tv_usec = 0;
+ if (gettimeofday (&current_time, NULL) == -1) {
+ gf_log (args->this->name, GF_LOG_ERROR,
+ "Failed to get current timen");
+ goto out;
+ }
+ time_in_past.tv_sec = current_time.tv_sec - time_in_past.tv_sec;
+ time_in_past.tv_usec = current_time.tv_usec - time_in_past.tv_usec;
+ gfdb_brick_dict_info.time_stamp = &time_in_past;
+ gfdb_brick_dict_info._gfdb_promote = is_promotion;
+ gfdb_brick_dict_info._query_cbk_args = query_cbk_args;
+ ret = dict_foreach (args->brick_list, tier_process_brick_cbk,
+ &gfdb_brick_dict_info);
+ if (ret) {
+ gf_log (args->this->name, GF_LOG_ERROR,
+ "Brick query failedn");
+ goto out;
+ }
+out:
+ return ret;
+}
+
+inline int
+tier_migrate_files_using_qfile (demotion_args_t *comp,
+ query_cbk_args_t *query_cbk_args,
+ char *qfile)
+{
+ char renamed_file[PATH_MAX] = "";
+ int ret = -1;
+
+ query_cbk_args->queryFILE = fopen (qfile, "r");
+ if (!query_cbk_args->queryFILE) {
+ gf_log ("tier", GF_LOG_ERROR,
+ "Failed opening %s for migration", qfile);
+ goto out;
+ }
+ ret = tier_migrate_using_query_file ((void *)query_cbk_args);
+ fclose (query_cbk_args->queryFILE);
+ query_cbk_args->queryFILE = NULL;
+ if (ret) {
+ sprintf (renamed_file, "%s.err", qfile);
+ rename (qfile, renamed_file);
+ }
+out:
+ return ret;
+}
+
+/*Demotion Thread*/
+static void *
+tier_demote (void *args)
+{
+ int ret = -1;
+ query_cbk_args_t query_cbk_args;
+ demotion_args_t *demotion_args = args;
+
+ GF_VALIDATE_OR_GOTO ("tier", demotion_args, out);
+ GF_VALIDATE_OR_GOTO ("tier", demotion_args->this, out);
+ GF_VALIDATE_OR_GOTO (demotion_args->this->name,
+ demotion_args->brick_list, out);
+ GF_VALIDATE_OR_GOTO (demotion_args->this->name,
+ demotion_args->defrag, out);
+
+ query_cbk_args.this = demotion_args->this;
+ query_cbk_args.defrag = demotion_args->defrag;
+
+ /*Build the query file using bricklist*/
+ ret = tier_build_migration_qfile(demotion_args, &query_cbk_args,
+ _gf_false);
+ if (ret)
+ goto out;
+
+ /* Migrate files using the query file */
+ ret = tier_migrate_files_using_qfile (args,
+ &query_cbk_args, DEMOTION_QFILE);
+ if (ret)
+ goto out;
+
+out:
+ demotion_args->return_value = ret;
+ return NULL;
+}
+
+
+/*Promotion Thread*/
+static void
+*tier_promote (void *args)
+{
+ int ret = -1;
+ query_cbk_args_t query_cbk_args;
+ promotion_args_t *promotion_args = args;
+
+ GF_VALIDATE_OR_GOTO ("tier", promotion_args->this, out);
+ GF_VALIDATE_OR_GOTO (promotion_args->this->name,
+ promotion_args->brick_list, out);
+ GF_VALIDATE_OR_GOTO (promotion_args->this->name,
+ promotion_args->defrag, out);
+
+ query_cbk_args.this = promotion_args->this;
+ query_cbk_args.defrag = promotion_args->defrag;
+
+ /*Build the query file using bricklist*/
+ ret = tier_build_migration_qfile(promotion_args, &query_cbk_args,
+ _gf_true);
+ if (ret)
+ goto out;
+
+ /* Migrate files using the query file */
+ ret = tier_migrate_files_using_qfile (args,
+ &query_cbk_args,
+ PROMOTION_QFILE);
+ if (ret)
+ goto out;
+
+out:
+ promotion_args->return_value = ret;
+ return NULL;
+}
+
+static void
+tier_get_bricklist (xlator_t *xl, dict_t *bricklist)
+{
+ xlator_list_t *child = NULL;
+ char *rv = NULL;
+ char *rh = NULL;
+ char localhost[256] = {0};
+ char *db_path = "";
+ char *brickname = NULL;
+ char db_name[PATH_MAX] = "";
+ int ret = 0;
+
+ GF_VALIDATE_OR_GOTO ("tier", xl, out);
+ GF_VALIDATE_OR_GOTO ("tier", bricklist, out);
+
+ gethostname (localhost, sizeof (localhost));
+
+ /*
+ * This function obtains remote subvolumes and filters out only
+ * those running on the same node as the tier daemon.
+ */
+ if (strcmp(xl->type, "protocol/client") == 0) {
+ ret = dict_get_str(xl->options, "remote-host", &rh);
+ if (ret < 0)
+ goto out;
+
+ if (gf_is_local_addr (rh)) {
+
+ ret = dict_get_str(xl->options, "remote-subvolume",
+ &rv);
+ if (ret < 0)
+ goto out;
+ brickname = strrchr(rv, '/') + 1;
+ snprintf(db_name, sizeof(db_name), "%s.db",
+ brickname);
+ db_path = GF_CALLOC (PATH_MAX, 1, gf_common_mt_char);
+ if (!db_path) {
+ gf_msg ("tier", GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_STATUS,
+ "Failed to allocate memory for bricklist");
+ goto out;
+ }
+
+ sprintf(db_path, "%s/.glusterfs/%s", rv, db_name);
+ if (dict_add_dynstr_with_alloc(bricklist, "brick",
+ db_path))
+ goto out;
+ }
+ }
+
+ for (child = xl->children; child; child = child->next) {
+ tier_get_bricklist(child->xlator, bricklist);
+ }
+out:
+ return;
+}
+
+int
+tier_start (xlator_t *this, gf_defrag_info_t *defrag)
+{
+ dict_t *bricklist_cold = NULL;
+ dict_t *bricklist_hot = NULL;
+ dht_conf_t *conf = NULL;
+ int tick = 0;
+ int next_demote = 0;
+ int next_promote = 0;
+ int freq_promote = 0;
+ int freq_demote = 0;
+ promotion_args_t promotion_args = { 0 };
+ demotion_args_t demotion_args = { 0 };
+ int ret_promotion = 0;
+ int ret_demotion = 0;
+ int ret = 0;
+ pthread_t promote_thread;
+ pthread_t demote_thread;
+
+ conf = this->private;
+
+ bricklist_cold = dict_new();
+ if (!bricklist_cold)
+ return -1;
+
+ bricklist_hot = dict_new();
+ if (!bricklist_hot)
+ return -1;
+
+ tier_get_bricklist (conf->subvolumes[0], bricklist_cold);
+ tier_get_bricklist (conf->subvolumes[1], bricklist_hot);
+
+ freq_promote = defrag->tier_promote_frequency;
+ freq_demote = defrag->tier_demote_frequency;
+
+ next_promote = defrag->tier_promote_frequency % TIMER_SECS;
+ next_demote = defrag->tier_demote_frequency % TIMER_SECS;
+
+
+ gf_msg (this->name, GF_LOG_INFO, 0,
+ DHT_MSG_LOG_TIER_STATUS, "Begin run tier promote %d demote %d",
+ next_promote, next_demote);
+
+ defrag->defrag_status = GF_DEFRAG_STATUS_STARTED;
+
+ while (1) {
+
+ sleep(1);
+
+ ret_promotion = -1;
+ ret_demotion = -1;
+
+ if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
+ ret = 1;
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "defrag->defrag_status != "
+ "GF_DEFRAG_STATUS_STARTED");
+ goto out;
+ }
+
+ tick = (tick + 1) % TIMER_SECS;
+ if ((next_demote != tick) && (next_promote != tick))
+ continue;
+
+ if (next_demote >= tick) {
+ demotion_args.this = this;
+ demotion_args.brick_list = bricklist_hot;
+ demotion_args.defrag = defrag;
+ demotion_args.freq_time = freq_demote;
+ ret_demotion = pthread_create (&demote_thread, NULL,
+ &tier_demote, &demotion_args);
+ if (ret_demotion) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "Failed starting Demotion thread!");
+ }
+ freq_demote = defrag->tier_demote_frequency;
+ next_demote = (tick + freq_demote) % TIMER_SECS;
+ }
+
+ if (next_promote >= tick) {
+ promotion_args.this = this;
+ promotion_args.brick_list = bricklist_cold;
+ promotion_args.defrag = defrag;
+ promotion_args.freq_time = freq_promote;
+ ret_promotion = pthread_create (&promote_thread, NULL,
+ &tier_promote, &promotion_args);
+ if (ret_promotion) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "Failed starting Promotion thread!");
+ }
+ freq_promote = defrag->tier_promote_frequency;
+ next_promote = (tick + freq_promote) % TIMER_SECS;
+ }
+
+ if (ret_demotion == 0) {
+ pthread_join (demote_thread, NULL);
+ if (demotion_args.return_value) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "Demotion failed!");
+ }
+ ret_demotion = demotion_args.return_value;
+ }
+
+ if (ret_promotion == 0) {
+ pthread_join (promote_thread, NULL);
+ if (promotion_args.return_value) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "Promotion failed!");
+ }
+ ret_promotion = promotion_args.return_value;
+ }
+
+ /*Collect previous and current cummulative status */
+ ret = ret | ret_demotion | ret_promotion;
+
+ /*reseting promotion and demotion arguments for next iteration*/
+ memset (&demotion_args, 0, sizeof(demotion_args_t));
+ memset (&promotion_args, 0, sizeof(promotion_args_t));
+
+ }
+
+ ret = 0;
+out:
+
+ dict_unref(bricklist_cold);
+ dict_unref(bricklist_hot);
+
+ return ret;
+}
+
+int32_t
+tier_migration_needed (xlator_t *this)
+{
+ gf_defrag_info_t *defrag = NULL;
+ dht_conf_t *conf = NULL;
+ int ret = 0;
+
+ conf = this->private;
+
+ GF_VALIDATE_OR_GOTO (this->name, conf, out);
+ GF_VALIDATE_OR_GOTO (this->name, conf->defrag, out);
+
+ defrag = conf->defrag;
+
+ if (defrag->cmd == GF_DEFRAG_CMD_START_TIER)
+ ret = 1;
+out:
+ return ret;
+}
+
+int32_t
+tier_migration_get_dst (xlator_t *this, dht_local_t *local)
+{
+ dht_conf_t *conf = NULL;
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO("tier", this, out);
+ GF_VALIDATE_OR_GOTO(this->name, this->private, out);
+
+ conf = this->private;
+ if (!conf)
+ goto out;
+
+ if (conf->subvolumes[0] == local->cached_subvol)
+ local->rebalance.target_node =
+ conf->subvolumes[1];
+ else
+ local->rebalance.target_node =
+ conf->subvolumes[0];
+
+ if (local->rebalance.target_node)
+ ret = 0;
+
+out:
+ return ret;
+}
+
+xlator_t *
+tier_search (xlator_t *this, dht_layout_t *layout, const char *name)
+{
+ xlator_t *subvol = NULL;
+ void *value;
+ int search_first_subvol = 0;
+
+ GF_VALIDATE_OR_GOTO("tier", this, out);
+ GF_VALIDATE_OR_GOTO(this->name, layout, out);
+ GF_VALIDATE_OR_GOTO(this->name, name, out);
+
+ if (!dict_get_ptr (this->options, "rule", &value) &&
+ !strcmp(layout->list[0].xlator->name, value)) {
+ search_first_subvol = 1;
+ }
+
+ if (search_first_subvol)
+ subvol = layout->list[0].xlator;
+ else
+ subvol = layout->list[1].xlator;
+
+out:
+ return subvol;
+}
+
+
+dht_methods_t tier_methods = {
+ .migration_get_dst_subvol = tier_migration_get_dst,
+ .migration_other = tier_start,
+ .migration_needed = tier_migration_needed,
+ .layout_search = tier_search,
+};
+
+
+int
+tier_init (xlator_t *this)
+{
+ int ret = -1;
+ int freq = 0;
+ dht_conf_t *conf = NULL;
+ gf_defrag_info_t *defrag = NULL;
+
+ ret = dht_init(this);
+ if (ret) {
+ gf_msg(this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "dht_init failed");
+ goto out;
+ }
+
+ conf = this->private;
+
+ conf->methods = &tier_methods;
+
+ if (conf->subvolume_cnt != 2) {
+ gf_msg(this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "Invalid number of subvolumes %d", conf->subvolume_cnt);
+ goto out;
+ }
+
+ /* if instatiated from client side initialization is complete. */
+ if (!conf->defrag) {
+ ret = 0;
+ goto out;
+ }
+
+ defrag = conf->defrag;
+
+ GF_OPTION_INIT ("tier-promote-frequency",
+ defrag->tier_promote_frequency,
+ int32, out);
+
+ ret = dict_get_int32 (this->options,
+ "tier-promote-frequency", &freq);
+ if (ret) {
+ freq = DEFAULT_PROMOTE_FREQ_SEC;
+ }
+
+ defrag->tier_promote_frequency = freq;
+
+ GF_OPTION_INIT ("tier-demote-frequency",
+ defrag->tier_demote_frequency,
+ int32, out);
+
+ ret = dict_get_int32 (this->options,
+ "tier-demote-frequency", &freq);
+ if (ret) {
+ freq = DEFAULT_DEMOTE_FREQ_SEC;
+ }
+
+ defrag->tier_demote_frequency = freq;
+
+ GF_OPTION_INIT ("write-freq-threshold",
+ defrag->write_freq_threshold,
+ int32, out);
+
+ GF_OPTION_INIT ("read-freq-threshold",
+ defrag->read_freq_threshold,
+ int32, out);
+
+ gf_msg(this->name, GF_LOG_INFO, 0,
+ DHT_MSG_LOG_TIER_STATUS,
+ "Promote frequency set to %d demote set to %d",
+ defrag->tier_promote_frequency,
+ defrag->tier_demote_frequency);
+
+ ret = 0;
+
+out:
+ return ret;
+}
+
+
+int
+tier_reconfigure (xlator_t *this, dict_t *options)
+{
+ dht_conf_t *conf = NULL;
+ gf_defrag_info_t *defrag = NULL;
+
+ conf = this->private;
+
+ if (conf->defrag) {
+ defrag = conf->defrag;
+ GF_OPTION_RECONF ("tier-promote-frequency",
+ defrag->tier_promote_frequency, options,
+ int32, out);
+
+ GF_OPTION_RECONF ("tier-demote-frequency",
+ defrag->tier_demote_frequency, options,
+ int32, out);
+
+ GF_OPTION_RECONF ("write-freq-threshold",
+ defrag->write_freq_threshold, options,
+ int32, out);
+
+ GF_OPTION_RECONF ("read-freq-threshold",
+ defrag->read_freq_threshold, options,
+ int32, out);
+ }
+
+out:
+ return dht_reconfigure (this, options);
+}
+
+class_methods_t class_methods = {
+ .init = tier_init,
+ .fini = dht_fini,
+ .reconfigure = tier_reconfigure,
+ .notify = dht_notify
+};
+
+
+struct xlator_fops fops = {
+ .lookup = dht_lookup,
+ .create = dht_create,
+ .mknod = dht_mknod,
+
+ .stat = dht_stat,
+ .fstat = dht_fstat,
+ .truncate = dht_truncate,
+ .ftruncate = dht_ftruncate,
+ .access = dht_access,
+ .readlink = dht_readlink,
+ .setxattr = dht_setxattr,
+ .getxattr = dht_getxattr,
+ .removexattr = dht_removexattr,
+ .open = dht_open,
+ .readv = dht_readv,
+ .writev = dht_writev,
+ .flush = dht_flush,
+ .fsync = dht_fsync,
+ .statfs = dht_statfs,
+ .lk = dht_lk,
+ .opendir = dht_opendir,
+ .readdir = dht_readdir,
+ .readdirp = dht_readdirp,
+ .fsyncdir = dht_fsyncdir,
+ .symlink = dht_symlink,
+ .unlink = dht_unlink,
+ .link = dht_link,
+ .mkdir = dht_mkdir,
+ .rmdir = dht_rmdir,
+ .rename = dht_rename,
+ .inodelk = dht_inodelk,
+ .finodelk = dht_finodelk,
+ .entrylk = dht_entrylk,
+ .fentrylk = dht_fentrylk,
+ .xattrop = dht_xattrop,
+ .fxattrop = dht_fxattrop,
+ .setattr = dht_setattr,
+};
+
+
+struct xlator_cbks cbks = {
+ .forget = dht_forget
+};
+
diff --git a/xlators/cluster/dht/src/tier.h b/xlators/cluster/dht/src/tier.h
new file mode 100644
index 00000000000..73266050a5c
--- /dev/null
+++ b/xlators/cluster/dht/src/tier.h
@@ -0,0 +1,71 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _TIER_H_
+#define _TIER_H_
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+
+/******************************************************************************/
+/* This is from dht-rebalancer.c as we dont have dht-rebalancer.h */
+#include "dht-common.h"
+#include "xlator.h"
+#include <signal.h>
+#include <fnmatch.h>
+#include <signal.h>
+
+#define DEFAULT_PROMOTE_FREQ_SEC 120
+#define DEFAULT_DEMOTE_FREQ_SEC 120
+
+/*
+ * Size of timer wheel. We would not promote or demote lesd
+ * frequently than this number.
+ */
+#define TIMER_SECS 3600
+
+#include "gfdb_data_store.h"
+#include <ctype.h>
+#include <sys/xattr.h>
+#include <sys/stat.h>
+
+#define DEMOTION_QFILE "/var/run/gluster/demotequeryfile"
+#define PROMOTION_QFILE "/var/run/gluster/promotequeryfile"
+
+#define GET_QFILE_PATH(is_promotion)\
+ (is_promotion) ? PROMOTION_QFILE : DEMOTION_QFILE
+
+typedef struct _query_cbk_args {
+ xlator_t *this;
+ gf_defrag_info_t *defrag;
+ FILE *queryFILE;
+} query_cbk_args_t;
+
+int
+gf_run_tier(xlator_t *this, gf_defrag_info_t *defrag);
+
+typedef struct _gfdb_brick_dict_info {
+ gfdb_time_t *time_stamp;
+ gf_boolean_t _gfdb_promote;
+ query_cbk_args_t *_query_cbk_args;
+} _gfdb_brick_dict_info_t;
+
+typedef struct _dm_thread_args {
+ xlator_t *this;
+ gf_defrag_info_t *defrag;
+ dict_t *brick_list;
+ int freq_time;
+ int return_value;
+} promotion_args_t, demotion_args_t;
+
+#endif