summaryrefslogtreecommitdiffstats
path: root/xlators/cluster/dht/src/tier.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/cluster/dht/src/tier.c')
-rw-r--r--xlators/cluster/dht/src/tier.c1007
1 files changed, 1007 insertions, 0 deletions
diff --git a/xlators/cluster/dht/src/tier.c b/xlators/cluster/dht/src/tier.c
new file mode 100644
index 00000000000..028a42f7a1a
--- /dev/null
+++ b/xlators/cluster/dht/src/tier.c
@@ -0,0 +1,1007 @@
+/*
+ Copyright (c) 2015 Red Hat, Inc. <http://www.redhat.com>
+ This file is part of GlusterFS.
+
+ This file is licensed to you under your choice of the GNU Lesser
+ General Public License, version 3 or any later version (LGPLv3 or
+ later), or the GNU General Public License, version 2 (GPLv2), in all
+ cases as published by the Free Software Foundation.
+*/
+
+#ifndef _CONFIG_H
+#define _CONFIG_H
+#include "config.h"
+#endif
+
+#include "dht-common.h"
+#include "tier.h"
+
+/*Hard coded DB info*/
+static gfdb_db_type_t dht_tier_db_type = GFDB_SQLITE3;
+/*Hard coded DB info*/
+
+/*Mutex for updating the data movement stats*/
+static pthread_mutex_t dm_stat_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+#define DB_QUERY_RECORD_SIZE 4096
+
+static int
+tier_parse_query_str (char *query_record_str,
+ char *gfid, char *link_buffer, ssize_t *link_size)
+{
+ char *token_str = NULL;
+ char *delimiter = "|";
+ char *saveptr = NULL;
+ int ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("tier", query_record_str, out);
+ GF_VALIDATE_OR_GOTO ("tier", gfid, out);
+ GF_VALIDATE_OR_GOTO ("tier", link_buffer, out);
+ GF_VALIDATE_OR_GOTO ("tier", link_size, out);
+
+ token_str = strtok_r (query_record_str, delimiter, &saveptr);
+ if (!token_str)
+ goto out;
+
+ strcpy (gfid, token_str);
+
+
+ token_str = strtok_r (NULL, delimiter, &saveptr);
+ if (!token_str)
+ goto out;
+
+ strcpy (link_buffer, token_str);
+
+ token_str = strtok_r (NULL, delimiter, &saveptr);
+ if (!token_str)
+ goto out;
+
+ *link_size = atoi (token_str);
+
+ ret = 0;
+out:
+ return ret;
+}
+
+static int
+tier_migrate_using_query_file (void *_args)
+{
+ int ret = -1;
+ char gfid_str[UUID_CANONICAL_FORM_LEN+1] = "";
+ char query_record_str[4096] = "";
+ query_cbk_args_t *query_cbk_args = (query_cbk_args_t *) _args;
+ xlator_t *this = NULL;
+ gf_defrag_info_t *defrag = NULL;
+ char *token_str = NULL;
+ char *delimiter = "::";
+ char *link_buffer = NULL;
+ gfdb_query_record_t *query_record = NULL;
+ gfdb_link_info_t *link_info = NULL;
+ struct iatt par_stbuf = {0,};
+ struct iatt current = {0,};
+ loc_t p_loc = {0,};
+ loc_t loc = {0,};
+ dict_t *migrate_data = NULL;
+ inode_t *linked_inode = NULL;
+ int per_file_status = 0;
+ int per_link_status = 0;
+ int total_status = 0;
+ FILE *queryFILE = NULL;
+ char *link_str = NULL;
+
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args, out);
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->this, out);
+ this = query_cbk_args->this;
+ GF_VALIDATE_OR_GOTO (this->name, query_cbk_args->defrag, out);
+ GF_VALIDATE_OR_GOTO (this->name, query_cbk_args->queryFILE, out);
+
+ defrag = query_cbk_args->defrag;
+
+ queryFILE = query_cbk_args->queryFILE;
+
+ query_record = gfdb_query_record_init();
+ if (!query_record) {
+ goto out;
+ }
+
+ query_record->_link_info_str = calloc (DB_QUERY_RECORD_SIZE, 1);
+ if (!query_record->_link_info_str) {
+ goto out;
+ }
+ link_buffer = query_record->_link_info_str;
+
+ link_info = gfdb_link_info_init ();
+
+ migrate_data = dict_new ();
+ if (!migrate_data)
+ goto out;
+
+ /* Per file */
+ while (fscanf (queryFILE, "%s", query_record_str) != EOF) {
+
+ per_file_status = 0;
+ per_link_status = 0;
+
+ memset (gfid_str, 0, UUID_CANONICAL_FORM_LEN+1);
+ memset (query_record->_link_info_str, 0, DB_QUERY_RECORD_SIZE);
+
+ if (tier_parse_query_str (query_record_str, gfid_str,
+ link_buffer,
+ &query_record->link_info_size)) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "failed parsing %s\n", query_record_str);
+ continue;
+ }
+
+ uuid_parse (gfid_str, query_record->gfid);
+
+ if (dict_get(migrate_data, GF_XATTR_FILE_MIGRATE_KEY))
+ dict_del(migrate_data, GF_XATTR_FILE_MIGRATE_KEY);
+
+ if (dict_get(migrate_data, "from.migrator"))
+ dict_del(migrate_data, "from.migrator");
+
+ token_str = strtok (link_buffer, delimiter);
+ if (token_str != NULL) {
+ per_file_status =
+ dict_set_str (migrate_data,
+ GF_XATTR_FILE_MIGRATE_KEY,
+ "force");
+ if (per_file_status) {
+ goto per_file_out;
+ }
+
+ /* Flag to suggest the xattr call is from migrator */
+ per_file_status = dict_set_str (migrate_data,
+ "from.migrator", "yes");
+ if (per_file_status) {
+ goto per_file_out;
+ }
+ }
+ per_link_status = 0;
+ /* Per link of file */
+ while (token_str != NULL) {
+
+ link_str = gf_strdup (token_str);
+
+ if (!link_info) {
+ per_link_status = -1;
+ goto per_file_out;
+ }
+
+ memset (link_info, 0, sizeof(gfdb_link_info_t));
+
+ ret = str_to_link_info (link_str, link_info);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "failed parsing %s\n", link_str);
+ per_link_status = -1;
+ goto error;
+ }
+
+ uuid_copy (p_loc.gfid, link_info->pargfid);
+
+ p_loc.inode = inode_new (defrag->root_inode->table);
+ if (!p_loc.inode) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "failed parsing %s\n", link_str);
+ per_link_status = -1;
+ goto error;
+ }
+
+ ret = syncop_lookup (this, &p_loc, NULL, &par_stbuf,
+ NULL, NULL);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ " ERROR in parent lookup\n");
+ per_link_status = -1;
+ goto error;
+ }
+ linked_inode = inode_link (p_loc.inode, NULL, NULL,
+ &par_stbuf);
+ inode_unref (p_loc.inode);
+ p_loc.inode = linked_inode;
+
+ uuid_copy (loc.gfid, query_record->gfid);
+ loc.inode = inode_new (defrag->root_inode->table);
+ uuid_copy (loc.pargfid, link_info->pargfid);
+ loc.parent = inode_ref(p_loc.inode);
+
+ loc.name = gf_strdup (link_info->file_name);
+ if (!loc.name) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR, "ERROR in "
+ "memory allocation\n");
+ per_link_status = -1;
+ goto error;
+ }
+
+ loc.path = gf_strdup (link_info->file_path);
+ if (!loc.path) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR, "ERROR in "
+ "memory allocation\n");
+ per_link_status = -1;
+ goto error;
+ }
+
+ uuid_copy (loc.parent->gfid, link_info->pargfid);
+
+ ret = syncop_lookup (this, &loc, NULL, &current,
+ NULL, NULL);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR, "ERROR in "
+ "current lookup\n");
+ per_link_status = -1;
+ goto error;
+ }
+ linked_inode = inode_link (loc.inode, NULL, NULL,
+ &current);
+ inode_unref (loc.inode);
+ loc.inode = linked_inode;
+
+ gf_msg (this->name, GF_LOG_INFO, 0,
+ DHT_MSG_LOG_TIER_STATUS, "Tier migrate file %s",
+ loc.name);
+
+ ret = syncop_setxattr (this, &loc, migrate_data, 0);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR, "ERROR %d in "
+ "current migration %s %s\n", ret,
+ loc.name,
+ loc.path);
+ per_link_status = -1;
+ goto error;
+ }
+ inode_unref (loc.inode);
+ inode_unref (loc.parent);
+ inode_unref (p_loc.inode);
+error:
+ if (loc.name) {
+ GF_FREE ((char *) loc.name);
+ }
+ if (loc.path) {
+ GF_FREE ((char *) loc.path);
+ }
+ token_str = NULL;
+ token_str = strtok (NULL, delimiter);
+ GF_FREE (link_str);
+ }
+ per_file_status = per_link_status;
+per_file_out:
+ if (per_file_status) {
+ pthread_mutex_lock (&dm_stat_mutex);
+ defrag->total_failures++;
+ pthread_mutex_unlock (&dm_stat_mutex);
+ } else {
+ pthread_mutex_lock (&dm_stat_mutex);
+ defrag->total_files++;
+ pthread_mutex_unlock (&dm_stat_mutex);
+ }
+ total_status = total_status + per_file_status;
+ per_link_status = 0;
+ per_file_status = 0;
+ query_record_str[0] = '\0';
+ }
+
+out:
+ if (link_buffer)
+ free (link_buffer);
+ gfdb_link_info_fini (&link_info);
+ if (migrate_data)
+ dict_unref (migrate_data);
+ gfdb_query_record_fini (&query_record);
+ return total_status;
+}
+
+
+/*This is the call back function per record/file from data base*/
+static int
+tier_gf_query_callback (gfdb_query_record_t *gfdb_query_record,
+ void *_args) {
+ int ret = -1;
+ char gfid_str[UUID_CANONICAL_FORM_LEN] = "";
+ query_cbk_args_t *query_cbk_args = _args;
+
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args, out);
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->defrag, out);
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->queryFILE, out);
+
+ uuid_unparse (gfdb_query_record->gfid, gfid_str);
+ fprintf (query_cbk_args->queryFILE, "%s|%s|%ld\n", gfid_str,
+ gfdb_query_record->_link_info_str,
+ gfdb_query_record->link_info_size);
+
+ pthread_mutex_lock (&dm_stat_mutex);
+ query_cbk_args->defrag->num_files_lookedup++;
+ pthread_mutex_unlock (&dm_stat_mutex);
+
+ ret = 0;
+out:
+ return ret;
+}
+
+/*This is the call back function for each brick from hot/cold bricklist
+ * It picks up each bricks db and queries for eligible files for migration.
+ * The list of eligible files are populated in appropriate query files*/
+static int
+tier_process_brick_cbk (dict_t *brick_dict, char *key, data_t *value,
+ void *args) {
+ int ret = -1;
+ char *db_path = NULL;
+ query_cbk_args_t *query_cbk_args = NULL;
+ xlator_t *this = NULL;
+ gfdb_conn_node_t *conn_node = NULL;
+ dict_t *params_dict = NULL;
+ _gfdb_brick_dict_info_t *gfdb_brick_dict_info = args;
+
+ /*Init of all the essentials*/
+ GF_VALIDATE_OR_GOTO ("tier", gfdb_brick_dict_info , out);
+ query_cbk_args = gfdb_brick_dict_info->_query_cbk_args;
+
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->this, out);
+ this = query_cbk_args->this;
+
+ GF_VALIDATE_OR_GOTO (this->name,
+ gfdb_brick_dict_info->_query_cbk_args, out);
+
+ GF_VALIDATE_OR_GOTO (this->name, value, out);
+ db_path = data_to_str(value);
+
+ /*Preparing DB parameters before init_db i.e getting db connection*/
+ params_dict = dict_new ();
+ if (!params_dict) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "DB Params cannot initialized!");
+ goto out;
+ }
+ SET_DB_PARAM_TO_DICT(this->name, params_dict, GFDB_SQL_PARAM_DBPATH,
+ db_path, ret, out);
+
+ /*Get the db connection*/
+ conn_node = init_db((void *)params_dict, dht_tier_db_type);
+ if (!conn_node) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "FATAL: Failed initializing db operations");
+ goto out;
+ }
+
+ /*Query for eligible files from db*/
+ query_cbk_args->queryFILE = fopen(GET_QFILE_PATH
+ (gfdb_brick_dict_info->_gfdb_promote), "a+");
+ if (!query_cbk_args->queryFILE) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
+ "Failed to open query file %s:%s",
+ GET_QFILE_PATH
+ (gfdb_brick_dict_info->_gfdb_promote),
+ strerror(errno));
+ goto out;
+ }
+ if (!gfdb_brick_dict_info->_gfdb_promote) {
+ if (query_cbk_args->defrag->write_freq_threshold == 0 &&
+ query_cbk_args->defrag->read_freq_threshold == 0) {
+ ret = find_unchanged_for_time(conn_node,
+ tier_gf_query_callback,
+ (void *)query_cbk_args,
+ gfdb_brick_dict_info->time_stamp);
+ } else {
+ ret = find_unchanged_for_time_freq(conn_node,
+ tier_gf_query_callback,
+ (void *)query_cbk_args,
+ gfdb_brick_dict_info->time_stamp,
+ query_cbk_args->defrag->
+ write_freq_threshold,
+ query_cbk_args->defrag->
+ read_freq_threshold,
+ _gf_false);
+ }
+ } else {
+ if (query_cbk_args->defrag->write_freq_threshold == 0 &&
+ query_cbk_args->defrag->read_freq_threshold == 0) {
+ ret = find_recently_changed_files(conn_node,
+ tier_gf_query_callback,
+ (void *)query_cbk_args,
+ gfdb_brick_dict_info->time_stamp);
+ } else {
+ ret = find_recently_changed_files_freq(conn_node,
+ tier_gf_query_callback,
+ (void *)query_cbk_args,
+ gfdb_brick_dict_info->time_stamp,
+ query_cbk_args->defrag->
+ write_freq_threshold,
+ query_cbk_args->defrag->
+ read_freq_threshold,
+ _gf_false);
+ }
+ }
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "FATAL: query from db failed");
+ goto out;
+ }
+ ret = 0;
+out:
+ if (query_cbk_args->queryFILE) {
+ fclose (query_cbk_args->queryFILE);
+ query_cbk_args->queryFILE = NULL;
+ }
+ fini_db (conn_node);
+ return ret;
+}
+
+inline int
+tier_build_migration_qfile (demotion_args_t *args,
+ query_cbk_args_t *query_cbk_args,
+ gf_boolean_t is_promotion)
+{
+ gfdb_time_t current_time;
+ _gfdb_brick_dict_info_t gfdb_brick_dict_info;
+ gfdb_time_t time_in_past;
+ int ret = -1;
+
+ remove (GET_QFILE_PATH (is_promotion));
+ time_in_past.tv_sec = args->freq_time;
+ time_in_past.tv_usec = 0;
+ if (gettimeofday (&current_time, NULL) == -1) {
+ gf_log (args->this->name, GF_LOG_ERROR,
+ "Failed to get current timen");
+ goto out;
+ }
+ time_in_past.tv_sec = current_time.tv_sec - time_in_past.tv_sec;
+ time_in_past.tv_usec = current_time.tv_usec - time_in_past.tv_usec;
+ gfdb_brick_dict_info.time_stamp = &time_in_past;
+ gfdb_brick_dict_info._gfdb_promote = is_promotion;
+ gfdb_brick_dict_info._query_cbk_args = query_cbk_args;
+ ret = dict_foreach (args->brick_list, tier_process_brick_cbk,
+ &gfdb_brick_dict_info);
+ if (ret) {
+ gf_log (args->this->name, GF_LOG_ERROR,
+ "Brick query failedn");
+ goto out;
+ }
+out:
+ return ret;
+}
+
+inline int
+tier_migrate_files_using_qfile (demotion_args_t *comp,
+ query_cbk_args_t *query_cbk_args,
+ char *qfile)
+{
+ char renamed_file[PATH_MAX] = "";
+ int ret = -1;
+
+ query_cbk_args->queryFILE = fopen (qfile, "r");
+ if (!query_cbk_args->queryFILE) {
+ gf_log ("tier", GF_LOG_ERROR,
+ "Failed opening %s for migration", qfile);
+ goto out;
+ }
+ ret = tier_migrate_using_query_file ((void *)query_cbk_args);
+ fclose (query_cbk_args->queryFILE);
+ query_cbk_args->queryFILE = NULL;
+ if (ret) {
+ sprintf (renamed_file, "%s.err", qfile);
+ rename (qfile, renamed_file);
+ }
+out:
+ return ret;
+}
+
+/*Demotion Thread*/
+static void *
+tier_demote (void *args)
+{
+ int ret = -1;
+ query_cbk_args_t query_cbk_args;
+ demotion_args_t *demotion_args = args;
+
+ GF_VALIDATE_OR_GOTO ("tier", demotion_args, out);
+ GF_VALIDATE_OR_GOTO ("tier", demotion_args->this, out);
+ GF_VALIDATE_OR_GOTO (demotion_args->this->name,
+ demotion_args->brick_list, out);
+ GF_VALIDATE_OR_GOTO (demotion_args->this->name,
+ demotion_args->defrag, out);
+
+ query_cbk_args.this = demotion_args->this;
+ query_cbk_args.defrag = demotion_args->defrag;
+
+ /*Build the query file using bricklist*/
+ ret = tier_build_migration_qfile(demotion_args, &query_cbk_args,
+ _gf_false);
+ if (ret)
+ goto out;
+
+ /* Migrate files using the query file */
+ ret = tier_migrate_files_using_qfile (args,
+ &query_cbk_args, DEMOTION_QFILE);
+ if (ret)
+ goto out;
+
+out:
+ demotion_args->return_value = ret;
+ return NULL;
+}
+
+
+/*Promotion Thread*/
+static void
+*tier_promote (void *args)
+{
+ int ret = -1;
+ query_cbk_args_t query_cbk_args;
+ promotion_args_t *promotion_args = args;
+
+ GF_VALIDATE_OR_GOTO ("tier", promotion_args->this, out);
+ GF_VALIDATE_OR_GOTO (promotion_args->this->name,
+ promotion_args->brick_list, out);
+ GF_VALIDATE_OR_GOTO (promotion_args->this->name,
+ promotion_args->defrag, out);
+
+ query_cbk_args.this = promotion_args->this;
+ query_cbk_args.defrag = promotion_args->defrag;
+
+ /*Build the query file using bricklist*/
+ ret = tier_build_migration_qfile(promotion_args, &query_cbk_args,
+ _gf_true);
+ if (ret)
+ goto out;
+
+ /* Migrate files using the query file */
+ ret = tier_migrate_files_using_qfile (args,
+ &query_cbk_args,
+ PROMOTION_QFILE);
+ if (ret)
+ goto out;
+
+out:
+ promotion_args->return_value = ret;
+ return NULL;
+}
+
+static void
+tier_get_bricklist (xlator_t *xl, dict_t *bricklist)
+{
+ xlator_list_t *child = NULL;
+ char *rv = NULL;
+ char *rh = NULL;
+ char localhost[256] = {0};
+ char *db_path = "";
+ char *brickname = NULL;
+ char db_name[PATH_MAX] = "";
+ int ret = 0;
+
+ GF_VALIDATE_OR_GOTO ("tier", xl, out);
+ GF_VALIDATE_OR_GOTO ("tier", bricklist, out);
+
+ gethostname (localhost, sizeof (localhost));
+
+ /*
+ * This function obtains remote subvolumes and filters out only
+ * those running on the same node as the tier daemon.
+ */
+ if (strcmp(xl->type, "protocol/client") == 0) {
+ ret = dict_get_str(xl->options, "remote-host", &rh);
+ if (ret < 0)
+ goto out;
+
+ if (gf_is_local_addr (rh)) {
+
+ ret = dict_get_str(xl->options, "remote-subvolume",
+ &rv);
+ if (ret < 0)
+ goto out;
+ brickname = strrchr(rv, '/') + 1;
+ snprintf(db_name, sizeof(db_name), "%s.db",
+ brickname);
+ db_path = GF_CALLOC (PATH_MAX, 1, gf_common_mt_char);
+ if (!db_path) {
+ gf_msg ("tier", GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_STATUS,
+ "Failed to allocate memory for bricklist");
+ goto out;
+ }
+
+ sprintf(db_path, "%s/.glusterfs/%s", rv, db_name);
+ if (dict_add_dynstr_with_alloc(bricklist, "brick",
+ db_path))
+ goto out;
+ }
+ }
+
+ for (child = xl->children; child; child = child->next) {
+ tier_get_bricklist(child->xlator, bricklist);
+ }
+out:
+ return;
+}
+
+int
+tier_start (xlator_t *this, gf_defrag_info_t *defrag)
+{
+ dict_t *bricklist_cold = NULL;
+ dict_t *bricklist_hot = NULL;
+ dht_conf_t *conf = NULL;
+ int tick = 0;
+ int next_demote = 0;
+ int next_promote = 0;
+ int freq_promote = 0;
+ int freq_demote = 0;
+ promotion_args_t promotion_args = { 0 };
+ demotion_args_t demotion_args = { 0 };
+ int ret_promotion = 0;
+ int ret_demotion = 0;
+ int ret = 0;
+ pthread_t promote_thread;
+ pthread_t demote_thread;
+
+ conf = this->private;
+
+ bricklist_cold = dict_new();
+ if (!bricklist_cold)
+ return -1;
+
+ bricklist_hot = dict_new();
+ if (!bricklist_hot)
+ return -1;
+
+ tier_get_bricklist (conf->subvolumes[0], bricklist_cold);
+ tier_get_bricklist (conf->subvolumes[1], bricklist_hot);
+
+ freq_promote = defrag->tier_promote_frequency;
+ freq_demote = defrag->tier_demote_frequency;
+
+ next_promote = defrag->tier_promote_frequency % TIMER_SECS;
+ next_demote = defrag->tier_demote_frequency % TIMER_SECS;
+
+
+ gf_msg (this->name, GF_LOG_INFO, 0,
+ DHT_MSG_LOG_TIER_STATUS, "Begin run tier promote %d demote %d",
+ next_promote, next_demote);
+
+ defrag->defrag_status = GF_DEFRAG_STATUS_STARTED;
+
+ while (1) {
+
+ sleep(1);
+
+ ret_promotion = -1;
+ ret_demotion = -1;
+
+ if (defrag->defrag_status != GF_DEFRAG_STATUS_STARTED) {
+ ret = 1;
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "defrag->defrag_status != "
+ "GF_DEFRAG_STATUS_STARTED");
+ goto out;
+ }
+
+ tick = (tick + 1) % TIMER_SECS;
+ if ((next_demote != tick) && (next_promote != tick))
+ continue;
+
+ if (next_demote >= tick) {
+ demotion_args.this = this;
+ demotion_args.brick_list = bricklist_hot;
+ demotion_args.defrag = defrag;
+ demotion_args.freq_time = freq_demote;
+ ret_demotion = pthread_create (&demote_thread, NULL,
+ &tier_demote, &demotion_args);
+ if (ret_demotion) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "Failed starting Demotion thread!");
+ }
+ freq_demote = defrag->tier_demote_frequency;
+ next_demote = (tick + freq_demote) % TIMER_SECS;
+ }
+
+ if (next_promote >= tick) {
+ promotion_args.this = this;
+ promotion_args.brick_list = bricklist_cold;
+ promotion_args.defrag = defrag;
+ promotion_args.freq_time = freq_promote;
+ ret_promotion = pthread_create (&promote_thread, NULL,
+ &tier_promote, &promotion_args);
+ if (ret_promotion) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "Failed starting Promotion thread!");
+ }
+ freq_promote = defrag->tier_promote_frequency;
+ next_promote = (tick + freq_promote) % TIMER_SECS;
+ }
+
+ if (ret_demotion == 0) {
+ pthread_join (demote_thread, NULL);
+ if (demotion_args.return_value) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "Demotion failed!");
+ }
+ ret_demotion = demotion_args.return_value;
+ }
+
+ if (ret_promotion == 0) {
+ pthread_join (promote_thread, NULL);
+ if (promotion_args.return_value) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "Promotion failed!");
+ }
+ ret_promotion = promotion_args.return_value;
+ }
+
+ /*Collect previous and current cummulative status */
+ ret = ret | ret_demotion | ret_promotion;
+
+ /*reseting promotion and demotion arguments for next iteration*/
+ memset (&demotion_args, 0, sizeof(demotion_args_t));
+ memset (&promotion_args, 0, sizeof(promotion_args_t));
+
+ }
+
+ ret = 0;
+out:
+
+ dict_unref(bricklist_cold);
+ dict_unref(bricklist_hot);
+
+ return ret;
+}
+
+int32_t
+tier_migration_needed (xlator_t *this)
+{
+ gf_defrag_info_t *defrag = NULL;
+ dht_conf_t *conf = NULL;
+ int ret = 0;
+
+ conf = this->private;
+
+ GF_VALIDATE_OR_GOTO (this->name, conf, out);
+ GF_VALIDATE_OR_GOTO (this->name, conf->defrag, out);
+
+ defrag = conf->defrag;
+
+ if (defrag->cmd == GF_DEFRAG_CMD_START_TIER)
+ ret = 1;
+out:
+ return ret;
+}
+
+int32_t
+tier_migration_get_dst (xlator_t *this, dht_local_t *local)
+{
+ dht_conf_t *conf = NULL;
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO("tier", this, out);
+ GF_VALIDATE_OR_GOTO(this->name, this->private, out);
+
+ conf = this->private;
+ if (!conf)
+ goto out;
+
+ if (conf->subvolumes[0] == local->cached_subvol)
+ local->rebalance.target_node =
+ conf->subvolumes[1];
+ else
+ local->rebalance.target_node =
+ conf->subvolumes[0];
+
+ if (local->rebalance.target_node)
+ ret = 0;
+
+out:
+ return ret;
+}
+
+xlator_t *
+tier_search (xlator_t *this, dht_layout_t *layout, const char *name)
+{
+ xlator_t *subvol = NULL;
+ void *value;
+ int search_first_subvol = 0;
+
+ GF_VALIDATE_OR_GOTO("tier", this, out);
+ GF_VALIDATE_OR_GOTO(this->name, layout, out);
+ GF_VALIDATE_OR_GOTO(this->name, name, out);
+
+ if (!dict_get_ptr (this->options, "rule", &value) &&
+ !strcmp(layout->list[0].xlator->name, value)) {
+ search_first_subvol = 1;
+ }
+
+ if (search_first_subvol)
+ subvol = layout->list[0].xlator;
+ else
+ subvol = layout->list[1].xlator;
+
+out:
+ return subvol;
+}
+
+
+dht_methods_t tier_methods = {
+ .migration_get_dst_subvol = tier_migration_get_dst,
+ .migration_other = tier_start,
+ .migration_needed = tier_migration_needed,
+ .layout_search = tier_search,
+};
+
+
+int
+tier_init (xlator_t *this)
+{
+ int ret = -1;
+ int freq = 0;
+ dht_conf_t *conf = NULL;
+ gf_defrag_info_t *defrag = NULL;
+
+ ret = dht_init(this);
+ if (ret) {
+ gf_msg(this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "dht_init failed");
+ goto out;
+ }
+
+ conf = this->private;
+
+ conf->methods = &tier_methods;
+
+ if (conf->subvolume_cnt != 2) {
+ gf_msg(this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "Invalid number of subvolumes %d", conf->subvolume_cnt);
+ goto out;
+ }
+
+ /* if instatiated from client side initialization is complete. */
+ if (!conf->defrag) {
+ ret = 0;
+ goto out;
+ }
+
+ defrag = conf->defrag;
+
+ GF_OPTION_INIT ("tier-promote-frequency",
+ defrag->tier_promote_frequency,
+ int32, out);
+
+ ret = dict_get_int32 (this->options,
+ "tier-promote-frequency", &freq);
+ if (ret) {
+ freq = DEFAULT_PROMOTE_FREQ_SEC;
+ }
+
+ defrag->tier_promote_frequency = freq;
+
+ GF_OPTION_INIT ("tier-demote-frequency",
+ defrag->tier_demote_frequency,
+ int32, out);
+
+ ret = dict_get_int32 (this->options,
+ "tier-demote-frequency", &freq);
+ if (ret) {
+ freq = DEFAULT_DEMOTE_FREQ_SEC;
+ }
+
+ defrag->tier_demote_frequency = freq;
+
+ GF_OPTION_INIT ("write-freq-threshold",
+ defrag->write_freq_threshold,
+ int32, out);
+
+ GF_OPTION_INIT ("read-freq-threshold",
+ defrag->read_freq_threshold,
+ int32, out);
+
+ gf_msg(this->name, GF_LOG_INFO, 0,
+ DHT_MSG_LOG_TIER_STATUS,
+ "Promote frequency set to %d demote set to %d",
+ defrag->tier_promote_frequency,
+ defrag->tier_demote_frequency);
+
+ ret = 0;
+
+out:
+ return ret;
+}
+
+
+int
+tier_reconfigure (xlator_t *this, dict_t *options)
+{
+ dht_conf_t *conf = NULL;
+ gf_defrag_info_t *defrag = NULL;
+
+ conf = this->private;
+
+ if (conf->defrag) {
+ defrag = conf->defrag;
+ GF_OPTION_RECONF ("tier-promote-frequency",
+ defrag->tier_promote_frequency, options,
+ int32, out);
+
+ GF_OPTION_RECONF ("tier-demote-frequency",
+ defrag->tier_demote_frequency, options,
+ int32, out);
+
+ GF_OPTION_RECONF ("write-freq-threshold",
+ defrag->write_freq_threshold, options,
+ int32, out);
+
+ GF_OPTION_RECONF ("read-freq-threshold",
+ defrag->read_freq_threshold, options,
+ int32, out);
+ }
+
+out:
+ return dht_reconfigure (this, options);
+}
+
+class_methods_t class_methods = {
+ .init = tier_init,
+ .fini = dht_fini,
+ .reconfigure = tier_reconfigure,
+ .notify = dht_notify
+};
+
+
+struct xlator_fops fops = {
+ .lookup = dht_lookup,
+ .create = dht_create,
+ .mknod = dht_mknod,
+
+ .stat = dht_stat,
+ .fstat = dht_fstat,
+ .truncate = dht_truncate,
+ .ftruncate = dht_ftruncate,
+ .access = dht_access,
+ .readlink = dht_readlink,
+ .setxattr = dht_setxattr,
+ .getxattr = dht_getxattr,
+ .removexattr = dht_removexattr,
+ .open = dht_open,
+ .readv = dht_readv,
+ .writev = dht_writev,
+ .flush = dht_flush,
+ .fsync = dht_fsync,
+ .statfs = dht_statfs,
+ .lk = dht_lk,
+ .opendir = dht_opendir,
+ .readdir = dht_readdir,
+ .readdirp = dht_readdirp,
+ .fsyncdir = dht_fsyncdir,
+ .symlink = dht_symlink,
+ .unlink = dht_unlink,
+ .link = dht_link,
+ .mkdir = dht_mkdir,
+ .rmdir = dht_rmdir,
+ .rename = dht_rename,
+ .inodelk = dht_inodelk,
+ .finodelk = dht_finodelk,
+ .entrylk = dht_entrylk,
+ .fentrylk = dht_fentrylk,
+ .xattrop = dht_xattrop,
+ .fxattrop = dht_fxattrop,
+ .setattr = dht_setattr,
+};
+
+
+struct xlator_cbks cbks = {
+ .forget = dht_forget
+};
+