author    Dan Lambright <dlambrig@redhat.com>    2015-02-22 11:05:58 -0500
committer Vijay Bellur <vbellur@redhat.com>      2015-03-21 09:50:29 -0700
commit 20355992e8eed7d3ed78a23bc7e922d6ae94860d (patch)
tree   c7af71245b57d867ef83edf91dbfe69e622a478d /xlators/cluster/dht/src/tier.c
parent 8aa13c8e285ad496ed7c8511ae0b735eed73ebd4 (diff)
cluster/dht: Add tier translator.
The tier translator shares most of DHT's code. It differs in how subvolumes are chosen for I/Os, and in how file migration (cache promotion and demotion) is managed. The divergent functionality is routed to either DHT or tier logic through the "tier_methods" structure. A cache promotion and demotion thread is created in a manner similar to the rebalance daemon. The thread operates a timing wheel which periodically checks for promotion and demotion candidates (files). Candidates are queued and then migrated. Candidates must exist on the same node as the daemon and meet other criteria set by the caching policies.

This patch has two authors (Dan Lambright and Joseph Fernandes). Dan did the DHT changes and Joe wrote the cache policies. The fix depends on DHT readdir changes and on the database library, which have been submitted separately. Header files in libglusterfs/src/gfdb should be reviewed in patch 9683. For more background and design, see the feature page [1].

[1] http://www.gluster.org/community/documentation/index.php/Features/data-classification

Change-Id: Icc26c517ccecf5c42aef039f5b9c6f7afe83e46c
BUG: 1194753
Signed-off-by: Dan Lambright <dlambrig@redhat.com>
Reviewed-on: http://review.gluster.org/9724
Reviewed-by: Vijay Bellur <vbellur@redhat.com>
Tested-by: Vijay Bellur <vbellur@redhat.com>
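
For orientation, the dispatch through "tier_methods" can be sketched as below. The dht_methods_t fields are the ones this patch registers at the end of tier.c; the surrounding function is a simplified illustration of a caller, not actual DHT source.

    /* Hypothetical caller sketch: DHT consults conf->methods, which
     * tier_init() points at tier_methods, so a tiered volume resolves
     * these calls to the tier_* implementations in this file. */
    static int
    migration_needed_sketch (xlator_t *this)
    {
            dht_conf_t *conf = this->private;

            /* resolves to tier_migration_needed() for tiered volumes */
            return conf->methods->migration_needed (this);
    }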
Diffstat (limited to 'xlators/cluster/dht/src/tier.c')
-rw-r--r--  xlators/cluster/dht/src/tier.c  1007
1 file changed, 1007 insertions(+), 0 deletions(-)
diff --git a/xlators/cluster/dht/src/tier.c b/xlators/cluster/dht/src/tier.c
new file mode 100644
index 00000000000..028a42f7a1a
--- /dev/null
+++ b/xlators/cluster/dht/src/tier.c
@@ -0,0 +1,1007 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "dht-common.h"
+#include "tier.h"
+
+/*Hard coded DB info*/
+static gfdb_db_type_t dht_tier_db_type = GFDB_SQLITE3;
+/*Hard coded DB info*/
+
+/*Mutex for updating the data movement stats*/
+static pthread_mutex_t dm_stat_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+#define DB_QUERY_RECORD_SIZE 4096
+
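+/*
+ * Each line of a query file has the form
+ * "<canonical gfid>|<link info>|<link info size>",
+ * matching what tier_gf_query_callback() writes further below.
+ */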
+static int
+tier_parse_query_str (char *query_record_str,
+ char *gfid, char *link_buffer, ssize_t *link_size)
+{
+ char *token_str = NULL;
+ char *delimiter = "|";
+ char *saveptr = NULL;
+ int ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("tier", query_record_str, out);
+ GF_VALIDATE_OR_GOTO ("tier", gfid, out);
+ GF_VALIDATE_OR_GOTO ("tier", link_buffer, out);
+ GF_VALIDATE_OR_GOTO ("tier", link_size, out);
+
+ token_str = strtok_r (query_record_str, delimiter, &saveptr);
+ if (!token_str)
+ goto out;
+
+ strcpy (gfid, token_str);
+
+
+ token_str = strtok_r (NULL, delimiter, &saveptr);
+ if (!token_str)
+ goto out;
+
+ strcpy (link_buffer, token_str);
+
+ token_str = strtok_r (NULL, delimiter, &saveptr);
+ if (!token_str)
+ goto out;
+
+ *link_size = atoi (token_str);
+
+ ret = 0;
+out:
+ return ret;
+}
+
+static int
+tier_migrate_using_query_file (void *_args)
+{
+ int ret = -1;
+ char gfid_str[UUID_CANONICAL_FORM_LEN+1] = "";
+ char query_record_str[4096] = "";
+ query_cbk_args_t *query_cbk_args = (query_cbk_args_t *) _args;
+ xlator_t *this = NULL;
+ gf_defrag_info_t *defrag = NULL;
+ char *token_str = NULL;
+ char *delimiter = "::";
+ char *link_buffer = NULL;
+ gfdb_query_record_t *query_record = NULL;
+ gfdb_link_info_t *link_info = NULL;
+ struct iatt par_stbuf = {0,};
+ struct iatt current = {0,};
+ loc_t p_loc = {0,};
+ loc_t loc = {0,};
+ dict_t *migrate_data = NULL;
+ inode_t *linked_inode = NULL;
+ int per_file_status = 0;
+ int per_link_status = 0;
+ int total_status = 0;
+ FILE *queryFILE = NULL;
+ char *link_str = NULL;
+
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args, out);
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->this, out);
+ this = query_cbk_args->this;
+ GF_VALIDATE_OR_GOTO (this->name, query_cbk_args->defrag, out);
+ GF_VALIDATE_OR_GOTO (this->name, query_cbk_args->queryFILE, out);
+
+ defrag = query_cbk_args->defrag;
+
+ queryFILE = query_cbk_args->queryFILE;
+
+ query_record = gfdb_query_record_init();
+ if (!query_record) {
+ goto out;
+ }
+
+ query_record->_link_info_str = calloc (DB_QUERY_RECORD_SIZE, 1);
+ if (!query_record->_link_info_str) {
+ goto out;
+ }
+ link_buffer = query_record->_link_info_str;
+
+ link_info = gfdb_link_info_init ();
+
+ migrate_data = dict_new ();
+ if (!migrate_data)
+ goto out;
+
+ /* Per file */
+ while (fscanf (queryFILE, "%s", query_record_str) != EOF) {
+
+ per_file_status = 0;
+ per_link_status = 0;
+
+ memset (gfid_str, 0, UUID_CANONICAL_FORM_LEN+1);
+ memset (query_record->_link_info_str, 0, DB_QUERY_RECORD_SIZE);
+
+ if (tier_parse_query_str (query_record_str, gfid_str,
+ link_buffer,
+ &query_record->link_info_size)) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "failed parsing %s\n", query_record_str);
+ continue;
+ }
+
+ uuid_parse (gfid_str, query_record->gfid);
+
+ if (dict_get(migrate_data, GF_XATTR_FILE_MIGRATE_KEY))
+ dict_del(migrate_data, GF_XATTR_FILE_MIGRATE_KEY);
+
+ if (dict_get(migrate_data, "from.migrator"))
+ dict_del(migrate_data, "from.migrator");
+
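+ /* link_buffer carries this file's hard-link records separated by
+ * "::"; each token is parsed into a gfdb_link_info_t in the loop
+ * below. */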
+ token_str = strtok (link_buffer, delimiter);
+ if (token_str != NULL) {
+ per_file_status =
+ dict_set_str (migrate_data,
+ GF_XATTR_FILE_MIGRATE_KEY,
+ "force");
+ if (per_file_status) {
+ goto per_file_out;
+ }
+
+ /* Flag to suggest the xattr call is from migrator */
+ per_file_status = dict_set_str (migrate_data,
+ "from.migrator", "yes");
+ if (per_file_status) {
+ goto per_file_out;
+ }
+ }
+ per_link_status = 0;
+ /* Per link of file */
+ while (token_str != NULL) {
+
+ link_str = gf_strdup (token_str);
+
+ if (!link_info) {
+ per_link_status = -1;
+ goto per_file_out;
+ }
+
+ memset (link_info, 0, sizeof(gfdb_link_info_t));
+
+ ret = str_to_link_info (link_str, link_info);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "failed parsing %s\n", link_str);
+ per_link_status = -1;
+ goto error;
+ }
+
+ uuid_copy (p_loc.gfid, link_info->pargfid);
+
+ p_loc.inode = inode_new (defrag->root_inode->table);
+ if (!p_loc.inode) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "failed parsing %s\n", link_str);
+ per_link_status = -1;
+ goto error;
+ }
+
+ ret = syncop_lookup (this, &p_loc, NULL, &par_stbuf,
+ NULL, NULL);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ " ERROR in parent lookup\n");
+ per_link_status = -1;
+ goto error;
+ }
+ linked_inode = inode_link (p_loc.inode, NULL, NULL,
+ &par_stbuf);
+ inode_unref (p_loc.inode);
+ p_loc.inode = linked_inode;
+
+ uuid_copy (loc.gfid, query_record->gfid);
+ loc.inode = inode_new (defrag->root_inode->table);
+ uuid_copy (loc.pargfid, link_info->pargfid);
+ loc.parent = inode_ref(p_loc.inode);
+
+ loc.name = gf_strdup (link_info->file_name);
+ if (!loc.name) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR, "ERROR in "
+ "memory allocation\n");
+ per_link_status = -1;
+ goto error;
+ }
+
+ loc.path = gf_strdup (link_info->file_path);
+ if (!loc.path) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR, "ERROR in "
+ "memory allocation\n");
+ per_link_status = -1;
+ goto error;
+ }
+
+ uuid_copy (loc.parent->gfid, link_info->pargfid);
+
+ ret = syncop_lookup (this, &loc, NULL, &current,
+ NULL, NULL);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR, "ERROR in "
+ "current lookup\n");
+ per_link_status = -1;
+ goto error;
+ }
+ linked_inode = inode_link (loc.inode, NULL, NULL,
+ &current);
+ inode_unref (loc.inode);
+ loc.inode = linked_inode;
+
+ gf_msg (this->name, GF_LOG_INFO, 0,
+ DHT_MSG_LOG_TIER_STATUS, "Tier migrate file %s",
+ loc.name);
+
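+ /* The migrate-file xattr set above ("force", plus the from.migrator
+ * hint) hands the actual data movement to DHT's migration code. */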
+ ret = syncop_setxattr (this, &loc, migrate_data, 0);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR, "ERROR %d in "
+ "current migration %s %s\n", ret,
+ loc.name,
+ loc.path);
+ per_link_status = -1;
+ goto error;
+ }
+ inode_unref (loc.inode);
+ inode_unref (loc.parent);
+ inode_unref (p_loc.inode);
+error:
+ if (loc.name) {
+ GF_FREE ((char *) loc.name);
+ }
+ if (loc.path) {
+ GF_FREE ((char *) loc.path);
+ }
+ token_str = NULL;
+ token_str = strtok (NULL, delimiter);
+ GF_FREE (link_str);
+ }
+ per_file_status = per_link_status;
+per_file_out:
+ if (per_file_status) {
+ pthread_mutex_lock (&dm_stat_mutex);
+ defrag->total_failures++;
+ pthread_mutex_unlock (&dm_stat_mutex);
+ } else {
+ pthread_mutex_lock (&dm_stat_mutex);
+ defrag->total_files++;
+ pthread_mutex_unlock (&dm_stat_mutex);
+ }
+ total_status = total_status + per_file_status;
+ per_link_status = 0;
+ per_file_status = 0;
+ query_record_str[0] = '\0';
+ }
+
+out:
+ if (link_buffer)
+ free (link_buffer);
+ gfdb_link_info_fini (&link_info);
+ if (migrate_data)
+ dict_unref (migrate_data);
+ gfdb_query_record_fini (&query_record);
+ return total_status;
+}
+
+
+/* This is the callback function invoked per record/file from the database */
+static int
+tier_gf_query_callback (gfdb_query_record_t *gfdb_query_record,
+ void *_args) {
+ int ret = -1;
+ char gfid_str[UUID_CANONICAL_FORM_LEN+1] = "";
+ query_cbk_args_t *query_cbk_args = _args;
+
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args, out);
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->defrag, out);
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->queryFILE, out);
+
+ uuid_unparse (gfdb_query_record->gfid, gfid_str);
+ fprintf (query_cbk_args->queryFILE, "%s|%s|%ld\n", gfid_str,
+ gfdb_query_record->_link_info_str,
+ gfdb_query_record->link_info_size);
+
+ pthread_mutex_lock (&dm_stat_mutex);
+ query_cbk_args->defrag->num_files_lookedup++;
+ pthread_mutex_unlock (&dm_stat_mutex);
+
+ ret = 0;
+out:
+ return ret;
+}
+
+/* This is the callback function for each brick from the hot/cold brick list.
+ * It picks up each brick's db and queries it for files eligible for migration.
+ * The list of eligible files is populated in the appropriate query file. */
+static int
+tier_process_brick_cbk (dict_t *brick_dict, char *key, data_t *value,
+ void *args) {
+ int ret = -1;
+ char *db_path = NULL;
+ query_cbk_args_t *query_cbk_args = NULL;
+ xlator_t *this = NULL;
+ gfdb_conn_node_t *conn_node = NULL;
+ dict_t *params_dict = NULL;
+ _gfdb_brick_dict_info_t *gfdb_brick_dict_info = args;
+
+ /*Init of all the essentials*/
+ GF_VALIDATE_OR_GOTO ("tier", gfdb_brick_dict_info , out);
+ query_cbk_args = gfdb_brick_dict_info->_query_cbk_args;
+
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->this, out);
+ this = query_cbk_args->this;
+
+ GF_VALIDATE_OR_GOTO (this->name,
+ gfdb_brick_dict_info->_query_cbk_args, out);
+
+ GF_VALIDATE_OR_GOTO (this->name, value, out);
+ db_path = data_to_str(value);
+
+ /* Prepare DB parameters before init_db, i.e. before getting the db connection */
+ params_dict = dict_new ();
+ if (!params_dict) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "DB Params cannot initialized!");
+ goto out;
+ }
+ SET_DB_PARAM_TO_DICT(this->name, params_dict, GFDB_SQL_PARAM_DBPATH,
+ db_path, ret, out);
+
+ /*Get the db connection*/
+ conn_node = init_db((void *)params_dict, dht_tier_db_type);
+ if (!conn_node) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "FATAL: Failed initializing db operations");
+ goto out;
+ }
+
+ /*Query for eligible files from db*/
+ query_cbk_args->queryFILE = fopen(GET_QFILE_PATH
+ (gfdb_brick_dict_info->_gfdb_promote), "a+");
+ if (!query_cbk_args->queryFILE) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
+ "Failed to open query file %s:%s",
+ GET_QFILE_PATH
+ (gfdb_brick_dict_info->_gfdb_promote),
+ strerror(errno));
+ goto out;
+ }
+ if (!gfdb_brick_dict_info->_gfdb_promote) {
+ if (query_cbk_args->defrag->write_freq_threshold == 0 &&
+ query_cbk_args->defrag->read_freq_threshold == 0) {
+ ret = find_unchanged_for_time(conn_node,
+ tier_gf_query_callback,
+ (void *)query_cbk_args,
+ gfdb_brick_dict_info->time_stamp);
+ } else {
+ ret = find_unchanged_for_time_freq(conn_node,
+ tier_gf_query_callback,
+ (void *)query_cbk_args,
+ gfdb_brick_dict_info->time_stamp,
+ query_cbk_args->defrag->
+ write_freq_threshold,
+ query_cbk_args->defrag->
+ read_freq_threshold,
+ _gf_false);
+ }
+ } else {
+ if (query_cbk_args->defrag->write_freq_threshold == 0 &&
+ query_cbk_args->defrag->read_freq_threshold == 0) {
+ ret = find_recently_changed_files(conn_node,
+ tier_gf_query_callback,
+ (void *)query_cbk_args,
+ gfdb_brick_dict_info->time_stamp);
+ } else {
+ ret = find_recently_changed_files_freq(conn_node,
+ tier_gf_query_callback,
+ (void *)query_cbk_args,
+ gfdb_brick_dict_info->time_stamp,
+ query_cbk_args->defrag->
+ write_freq_threshold,
+ query_cbk_args->defrag->
+ read_freq_threshold,
+ _gf_false);
+ }
+ }
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "FATAL: query from db failed");
+ goto out;
+ }
+ ret = 0;
+out:
+ if (query_cbk_args->queryFILE) {
+ fclose (query_cbk_args->queryFILE);
+ query_cbk_args->queryFILE = NULL;
+ }
+ fini_db (conn_node);
+ return ret;
+}
+
+inline int
+tier_build_migration_qfile (demotion_args_t *args,
+ query_cbk_args_t *query_cbk_args,
+ gf_boolean_t is_promotion)
+{
+ gfdb_time_t current_time;
+ _gfdb_brick_dict_info_t gfdb_brick_dict_info;
+ gfdb_time_t time_in_past;
+ int ret = -1;
+
+ remove (GET_QFILE_PATH (is_promotion));
+ time_in_past.tv_sec = args->freq_time;
+ time_in_past.tv_usec = 0;
+ if (gettimeofday (&current_time, NULL) == -1) {
+ gf_log (args->this->name, GF_LOG_ERROR,
+ "Failed to get current timen");
+ goto out;
+ }
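+ /* The query window starts at (now - freq_time): demotion selects
+ * files unchanged since then, promotion selects files changed
+ * since then. */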
+ time_in_past.tv_sec = current_time.tv_sec - time_in_past.tv_sec;
+ time_in_past.tv_usec = current_time.tv_usec - time_in_past.tv_usec;
+ gfdb_brick_dict_info.time_stamp = &time_in_past;
+ gfdb_brick_dict_info._gfdb_promote = is_promotion;
+ gfdb_brick_dict_info._query_cbk_args = query_cbk_args;
+ ret = dict_foreach (args->brick_list, tier_process_brick_cbk,
+ &gfdb_brick_dict_info);
+ if (ret) {
+ gf_log (args->this->name, GF_LOG_ERROR,
+ "Brick query failedn");
+ goto out;
+ }
+out:
+ return ret;
+}
+
+inline int
+tier_migrate_files_using_qfile (demotion_args_t *comp,
+ query_cbk_args_t *query_cbk_args,
+ char *qfile)
+{
+ char renamed_file[PATH_MAX] = "";
+ int ret = -1;
+
+ query_cbk_args->queryFILE = fopen (qfile, "r");
+ if (!query_cbk_args->queryFILE) {
+ gf_log ("tier", GF_LOG_ERROR,
+ "Failed opening %s for migration", qfile);
+ goto out;
+ }
+ ret = tier_migrate_using_query_file ((void *)query_cbk_args);
+ fclose (query_cbk_args->queryFILE);
+ query_cbk_args->queryFILE = NULL;
+ if (ret) {
+ sprintf (renamed_file, "%s.err", qfile);
+ rename (qfile, renamed_file);
+ }
+out:
+ return ret;
+}
+
+/*Demotion Thread*/
+static void *
+tier_demote (void *args)
+{
+ int ret = -1;
+ query_cbk_args_t query_cbk_args;
+ demotion_args_t *demotion_args = args;
+
+ GF_VALIDATE_OR_GOTO ("tier", demotion_args, out);
+ GF_VALIDATE_OR_GOTO ("tier", demotion_args->this, out);
+ GF_VALIDATE_OR_GOTO (demotion_args->this->name,
+ demotion_args->brick_list, out);
+ GF_VALIDATE_OR_GOTO (demotion_args->this->name,
+ demotion_args->defrag, out);
+
+ query_cbk_args.this = demotion_args->this;
+ query_cbk_args.defrag = demotion_args->defrag;
+
+ /*Build the query file using bricklist*/
+ ret = tier_build_migration_qfile(demotion_args, &query_cbk_args,
+ _gf_false);
+ if (ret)
+ goto out;
+
+ /* Migrate files using the query file */
+ ret = tier_migrate_files_using_qfile (args,
+ &query_cbk_args, DEMOTION_QFILE);
+ if (ret)
+ goto out;
+
+out:
+ demotion_args->return_value = ret;
+ return NULL;
+}
+
+
+/*Promotion Thread*/
+static void *
+tier_promote (void *args)
+{
+ int ret = -1;
+ query_cbk_args_t query_cbk_args;
+ promotion_args_t *promotion_args = args;
+
+ GF_VALIDATE_OR_GOTO ("tier", promotion_args->this, out);
+ GF_VALIDATE_OR_GOTO (promotion_args->this->name,
+ promotion_args->brick_list, out);
+ GF_VALIDATE_OR_GOTO (promotion_args->this->name,
+ promotion_args->defrag, out);
+
+ query_cbk_args.this = promotion_args->this;
+ query_cbk_args.defrag = promotion_args->defrag;
+
+ /*Build the query file using bricklist*/
+ ret = tier_build_migration_qfile(promotion_args, &query_cbk_args,
+ _gf_true);
+ if (ret)
+ goto out;
+
+ /* Migrate files using the query file */
+ ret = tier_migrate_files_using_qfile (args,
+ &query_cbk_args,
+ PROMOTION_QFILE);
+ if (ret)
+ goto out;
+
+out:
+ promotion_args->return_value = ret;
+ return NULL;
+}
+
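+/*
+ * Collect, for each brick hosted on this node, the path of its
+ * changetime database: <brick-path>/.glusterfs/<brick-basename>.db.
+ */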
+static void
+tier_get_bricklist (xlator_t *xl, dict_t *bricklist)
+{
+ xlator_list_t *child = NULL;
+ char *rv = NULL;
+ char *rh = NULL;
+ char localhost[256] = {0};
+ char *db_path = "";
+ char *brickname = NULL;
+ char db_name[PATH_MAX] = "";
+ int ret = 0;
+
+ GF_VALIDATE_OR_GOTO ("tier", xl, out);
+ GF_VALIDATE_OR_GOTO ("tier", bricklist, out);
+
+ gethostname (localhost, sizeof (localhost));
+
+ /*
+ * This function obtains remote subvolumes and filters out only
+ * those running on the same node as the tier daemon.
+ */
+ if (strcmp(xl->type, "protocol/client") == 0) {
+ ret = dict_get_str(xl->options, "remote-host", &rh);
+ if (ret < 0)
+ goto out;
+
+ if (gf_is_local_addr (rh)) {
+
+ ret = dict_get_str(xl->options, "remote-subvolume",
+ &rv);
+ if (ret < 0)
+ goto out;
+ brickname = strrchr(rv, '/') + 1;
+ snprintf(db_name, sizeof(db_name), "%s.db",
+ brickname);
+ db_path = GF_CALLOC (PATH_MAX, 1, gf_common_mt_char);
+ if (!db_path) {
+ gf_msg ("tier", GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_STATUS,
+ "Failed to allocate memory for bricklist");
+ goto out;
+ }
+
+ sprintf(db_path, "%s/.glusterfs/%s", rv, db_name);
+ if (dict_add_dynstr_with_alloc(bricklist, "brick",
+ db_path))
+ goto out;
+ }
+ }
+
+ for (child = xl->children; child; child = child->next) {
+ tier_get_bricklist(child->xlator, bricklist);
+ }
+out:
+ return;
+}
+
+int
+tier_start (xlator_t *this, gf_defrag_info_t *defrag)
+{
+ dict_t *bricklist_cold = NULL;
+ dict_t *bricklist_hot = NULL;
+ dht_conf_t *conf = NULL;
+ int tick = 0;
+ int next_demote = 0;
+ int next_promote = 0;
+ int freq_promote = 0;
+ int freq_demote = 0;
+ promotion_args_t promotion_args = { 0 };
+ demotion_args_t demotion_args = { 0 };
+ int ret_promotion = 0;
+ int ret_demotion = 0;
+ int ret = 0;
+ pthread_t promote_thread;
+ pthread_t demote_thread;
+
+ conf = this->private;
+
+ bricklist_cold = dict_new();
+ if (!bricklist_cold)
+ return -1;
+
+ bricklist_hot = dict_new();
+ if (!bricklist_hot)
+ return -1;
+
+ tier_get_bricklist (conf->subvolumes[0], bricklist_cold);
+ tier_get_bricklist (conf->subvolumes[1], bricklist_hot);
+
+ freq_promote = defrag->tier_promote_frequency;
+ freq_demote = defrag->tier_demote_frequency;
+
+ next_promote = defrag->tier_promote_frequency % TIMER_SECS;
+ next_demote = defrag->tier_demote_frequency % TIMER_SECS;
+
+
+ gf_msg (this->name, GF_LOG_INFO, 0,
+ DHT_MSG_LOG_TIER_STATUS, "Begin run tier promote %d demote %d",
+ next_promote, next_demote);
+
+ defrag->defrag_status = GF_DEFRAG_STATUS_STARTED;
+
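+ /*
+ * Timing wheel: tick advances once per second modulo TIMER_SECS;
+ * when it reaches next_promote or next_demote, the corresponding
+ * thread is spawned and that slot is rescheduled freq seconds ahead.
+ */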
+ while (1) {
+
+ sleep(1);
+
+ ret_promotion = -1;
+ ret_demotion = -1;
+
+ if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
+ ret = 1;
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "defrag->defrag_status != "
+ "GF_DEFRAG_STATUS_STARTED");
+ goto out;
+ }
+
+ tick = (tick + 1) % TIMER_SECS;
+ if ((next_demote != tick) && (next_promote != tick))
+ continue;
+
+ if (next_demote >= tick) {
+ demotion_args.this = this;
+ demotion_args.brick_list = bricklist_hot;
+ demotion_args.defrag = defrag;
+ demotion_args.freq_time = freq_demote;
+ ret_demotion = pthread_create (&demote_thread, NULL,
+ &tier_demote, &demotion_args);
+ if (ret_demotion) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "Failed starting Demotion thread!");
+ }
+ freq_demote = defrag->tier_demote_frequency;
+ next_demote = (tick + freq_demote) % TIMER_SECS;
+ }
+
+ if (next_promote >= tick) {
+ promotion_args.this = this;
+ promotion_args.brick_list = bricklist_cold;
+ promotion_args.defrag = defrag;
+ promotion_args.freq_time = freq_promote;
+ ret_promotion = pthread_create (&promote_thread, NULL,
+ &tier_promote, &promotion_args);
+ if (ret_promotion) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "Failed starting Promotion thread!");
+ }
+ freq_promote = defrag->tier_promote_frequency;
+ next_promote = (tick + freq_promote) % TIMER_SECS;
+ }
+
+ if (ret_demotion == 0) {
+ pthread_join (demote_thread, NULL);
+ if (demotion_args.return_value) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "Demotion failed!");
+ }
+ ret_demotion = demotion_args.return_value;
+ }
+
+ if (ret_promotion == 0) {
+ pthread_join (promote_thread, NULL);
+ if (promotion_args.return_value) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "Promotion failed!");
+ }
+ ret_promotion = promotion_args.return_value;
+ }
+
+ /* Collect previous and current cumulative status */
+ ret = ret | ret_demotion | ret_promotion;
+
+ /* Reset promotion and demotion arguments for the next iteration */
+ memset (&demotion_args, 0, sizeof(demotion_args_t));
+ memset (&promotion_args, 0, sizeof(promotion_args_t));
+
+ }
+
+ ret = 0;
+out:
+
+ dict_unref(bricklist_cold);
+ dict_unref(bricklist_hot);
+
+ return ret;
+}
+
+int32_t
+tier_migration_needed (xlator_t *this)
+{
+ gf_defrag_info_t *defrag = NULL;
+ dht_conf_t *conf = NULL;
+ int ret = 0;
+
+ conf = this->private;
+
+ GF_VALIDATE_OR_GOTO (this->name, conf, out);
+ GF_VALIDATE_OR_GOTO (this->name, conf->defrag, out);
+
+ defrag = conf->defrag;
+
+ if (defrag->cmd == GF_DEFRAG_CMD_START_TIER)
+ ret = 1;
+out:
+ return ret;
+}
+
+int32_t
+tier_migration_get_dst (xlator_t *this, dht_local_t *local)
+{
+ dht_conf_t *conf = NULL;
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO("tier", this, out);
+ GF_VALIDATE_OR_GOTO(this->name, this->private, out);
+
+ conf = this->private;
+ if (!conf)
+ goto out;
+
+ if (conf->subvolumes[0] == local->cached_subvol)
+ local->rebalance.target_node =
+ conf->subvolumes[1];
+ else
+ local->rebalance.target_node =
+ conf->subvolumes[0];
+
+ if (local->rebalance.target_node)
+ ret = 0;
+
+out:
+ return ret;
+}
+
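+/*
+ * Named-entry lookups go to the first subvolume of the layout only when
+ * the volfile's "rule" option names it; otherwise the second subvolume
+ * is used.
+ */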
+xlator_t *
+tier_search (xlator_t *this, dht_layout_t *layout, const char *name)
+{
+ xlator_t *subvol = NULL;
+ void *value;
+ int search_first_subvol = 0;
+
+ GF_VALIDATE_OR_GOTO("tier", this, out);
+ GF_VALIDATE_OR_GOTO(this->name, layout, out);
+ GF_VALIDATE_OR_GOTO(this->name, name, out);
+
+ if (!dict_get_ptr (this->options, "rule", &value) &&
+ !strcmp(layout->list[0].xlator->name, value)) {
+ search_first_subvol = 1;
+ }
+
+ if (search_first_subvol)
+ subvol = layout->list[0].xlator;
+ else
+ subvol = layout->list[1].xlator;
+
+out:
+ return subvol;
+}
+
+
+dht_methods_t tier_methods = {
+ .migration_get_dst_subvol = tier_migration_get_dst,
+ .migration_other = tier_start,
+ .migration_needed = tier_migration_needed,
+ .layout_search = tier_search,
+};
+
+
+int
+tier_init (xlator_t *this)
+{
+ int ret = -1;
+ int freq = 0;
+ dht_conf_t *conf = NULL;
+ gf_defrag_info_t *defrag = NULL;
+
+ ret = dht_init(this);
+ if (ret) {
+ gf_msg(this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "dht_init failed");
+ goto out;
+ }
+
+ conf = this->private;
+
+ conf->methods = &tier_methods;
+
+ if (conf->subvolume_cnt != 2) {
+ gf_msg(this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "Invalid number of subvolumes %d", conf->subvolume_cnt);
+ goto out;
+ }
+
+ /* If instantiated from the client side, initialization is complete. */
+ if (!conf->defrag) {
+ ret = 0;
+ goto out;
+ }
+
+ defrag = conf->defrag;
+
+ GF_OPTION_INIT ("tier-promote-frequency",
+ defrag->tier_promote_frequency,
+ int32, out);
+
+ ret = dict_get_int32 (this->options,
+ "tier-promote-frequency", &freq);
+ if (ret) {
+ freq = DEFAULT_PROMOTE_FREQ_SEC;
+ }
+
+ defrag->tier_promote_frequency = freq;
+
+ GF_OPTION_INIT ("tier-demote-frequency",
+ defrag->tier_demote_frequency,
+ int32, out);
+
+ ret = dict_get_int32 (this->options,
+ "tier-demote-frequency", &freq);
+ if (ret) {
+ freq = DEFAULT_DEMOTE_FREQ_SEC;
+ }
+
+ defrag->tier_demote_frequency = freq;
+
+ GF_OPTION_INIT ("write-freq-threshold",
+ defrag->write_freq_threshold,
+ int32, out);
+
+ GF_OPTION_INIT ("read-freq-threshold",
+ defrag->read_freq_threshold,
+ int32, out);
+
+ gf_msg(this->name, GF_LOG_INFO, 0,
+ DHT_MSG_LOG_TIER_STATUS,
+ "Promote frequency set to %d demote set to %d",
+ defrag->tier_promote_frequency,
+ defrag->tier_demote_frequency);
+
+ ret = 0;
+
+out:
+ return ret;
+}
+
+
+int
+tier_reconfigure (xlator_t *this, dict_t *options)
+{
+ dht_conf_t *conf = NULL;
+ gf_defrag_info_t *defrag = NULL;
+
+ conf = this->private;
+
+ if (conf->defrag) {
+ defrag = conf->defrag;
+ GF_OPTION_RECONF ("tier-promote-frequency",
+ defrag->tier_promote_frequency, options,
+ int32, out);
+
+ GF_OPTION_RECONF ("tier-demote-frequency",
+ defrag->tier_demote_frequency, options,
+ int32, out);
+
+ GF_OPTION_RECONF ("write-freq-threshold",
+ defrag->write_freq_threshold, options,
+ int32, out);
+
+ GF_OPTION_RECONF ("read-freq-threshold",
+ defrag->read_freq_threshold, options,
+ int32, out);
+ }
+
+out:
+ return dht_reconfigure (this, options);
+}
+
+class_methods_t class_methods = {
+ .init = tier_init,
+ .fini = dht_fini,
+ .reconfigure = tier_reconfigure,
+ .notify = dht_notify
+};
+
+
+struct xlator_fops fops = {
+ .lookup = dht_lookup,
+ .create = dht_create,
+ .mknod = dht_mknod,
+
+ .stat = dht_stat,
+ .fstat = dht_fstat,
+ .truncate = dht_truncate,
+ .ftruncate = dht_ftruncate,
+ .access = dht_access,
+ .readlink = dht_readlink,
+ .setxattr = dht_setxattr,
+ .getxattr = dht_getxattr,
+ .removexattr = dht_removexattr,
+ .open = dht_open,
+ .readv = dht_readv,
+ .writev = dht_writev,
+ .flush = dht_flush,
+ .fsync = dht_fsync,
+ .statfs = dht_statfs,
+ .lk = dht_lk,
+ .opendir = dht_opendir,
+ .readdir = dht_readdir,
+ .readdirp = dht_readdirp,
+ .fsyncdir = dht_fsyncdir,
+ .symlink = dht_symlink,
+ .unlink = dht_unlink,
+ .link = dht_link,
+ .mkdir = dht_mkdir,
+ .rmdir = dht_rmdir,
+ .rename = dht_rename,
+ .inodelk = dht_inodelk,
+ .finodelk = dht_finodelk,
+ .entrylk = dht_entrylk,
+ .fentrylk = dht_fentrylk,
+ .xattrop = dht_xattrop,
+ .fxattrop = dht_fxattrop,
+ .setattr = dht_setattr,
+};
+
+
+struct xlator_cbks cbks = {
+ .forget = dht_forget
+};
+
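
For context beyond the patch itself: a hypothetical volfile fragment loading this translator might look as follows. The option names match those read in tier_init()/tier_reconfigure() above; the translator type string, subvolume names, and values are illustrative assumptions, since volfile generation is handled by separate patches.

    volume tier-dht
        type cluster/tier                     # assumed type string, not defined in this patch
        option tier-promote-frequency 120     # seconds between promotion scans
        option tier-demote-frequency 3600     # seconds between demotion scans
        option write-freq-threshold 0         # 0 = ignore write frequency
        option read-freq-threshold 0          # 0 = ignore read frequency
        subvolumes cold-dht hot-dht           # cold tier first, hot tier second
    end-volume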