summaryrefslogtreecommitdiffstats
path: root/xlators
diff options
context:
space:
mode:
authorJoseph Fernandes <josferna@redhat.com>2016-01-26 17:47:08 +0530
committerDan Lambright <dlambrig@redhat.com>2016-02-03 10:04:40 -0800
commit11202e6c726f79ddf0e461338d7dce158733122e (patch)
treedf1381c8783ba8f5507f8311db4b9febf9536d10 /xlators
parent545f4ed2c7195a21210e6a055c27c1b7a115e18c (diff)
tier/gfdb : Round-Robin read of query files
1. Each brick on a host will get a separate query file. 2. While reading query records from these query files we read them in a Round-Robin manner. 3. When an error occurs during migration we rename the query file with a timestamp and a .err extension for better debugging. Change-Id: I27c4285d24fd695d2d5cbd9fd7db3879d277ecc8 BUG: 1302772 Signed-off-by: Joseph Fernandes <josferna@redhat.com> Reviewed-on: http://review.gluster.org/13293 Smoke: Gluster Build System <jenkins@build.gluster.com> Tested-by: N Balachandran <nbalacha@redhat.com> NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org> Reviewed-by: Dan Lambright <dlambrig@redhat.com> CentOS-regression: Gluster Build System <jenkins@build.gluster.com>
Diffstat (limited to 'xlators')
-rw-r--r--xlators/cluster/dht/src/dht-mem-types.h1
-rw-r--r--xlators/cluster/dht/src/tier.c380
-rw-r--r--xlators/cluster/dht/src/tier.h26
3 files changed, 326 insertions, 81 deletions
diff --git a/xlators/cluster/dht/src/dht-mem-types.h b/xlators/cluster/dht/src/dht-mem-types.h
index 27ebf9dc501..5de5d1838ad 100644
--- a/xlators/cluster/dht/src/dht-mem-types.h
+++ b/xlators/cluster/dht/src/dht-mem-types.h
@@ -37,6 +37,7 @@ enum gf_dht_mem_types_ {
gf_tier_mt_bricklist_t,
gf_tier_mt_ipc_ctr_params_t,
gf_dht_mt_fd_ctx_t,
+ gf_tier_mt_qfile_array_t,
gf_dht_mt_end
};
#endif
diff --git a/xlators/cluster/dht/src/tier.c b/xlators/cluster/dht/src/tier.c
index 7be5c4a2b81..8a756b88c27 100644
--- a/xlators/cluster/dht/src/tier.c
+++ b/xlators/cluster/dht/src/tier.c
@@ -22,7 +22,9 @@ static gfdb_db_type_t dht_tier_db_type = GFDB_SQLITE3;
/*Mutex for updating the data movement stats*/
static pthread_mutex_t dm_stat_mutex = PTHREAD_MUTEX_INITIALIZER;
+/* Stores the path location of promotion query files */
static char *promotion_qfile;
+/* Stores the path location of demotion query files */
static char *demotion_qfile;
static void *libhandle;
@@ -30,6 +32,162 @@ static gfdb_methods_t gfdb_methods;
#define DB_QUERY_RECORD_SIZE 4096
+/*
+ * Tears down a query file array: closes every descriptor that is still
+ * open (any slot not equal to -1), then frees the descriptor table and
+ * finally the array object itself.
+ * When qfile_array is NULL only the final GF_FREE is reached.
+ * */
+static void
+qfile_array_free (tier_qfile_array_t *qfile_array)
+{
+        ssize_t  slot = 0;
+        int     *fds  = NULL;
+
+        if (!qfile_array)
+                goto free_array;
+
+        fds = qfile_array->fd_array;
+        for (slot = 0; fds && slot < qfile_array->array_size; slot++) {
+                /* -1 marks a slot with nothing to close */
+                if (fds[slot] == -1)
+                        continue;
+                sys_close (fds[slot]);
+        }
+        GF_FREE (qfile_array->fd_array);
+
+free_array:
+        GF_FREE (qfile_array);
+}
+
+
+/*
+ * Allocates a query file array with room for array_size descriptors.
+ * Every slot starts out as -1 (no file), next_index starts at 0 and
+ * exhausted_count is set to array_size, so a fresh array reports empty
+ * until the caller opens files and resets the count.
+ * Returns the new array, or NULL when the size validation or one of the
+ * allocations fails.
+ * */
+static tier_qfile_array_t *
+qfile_array_new (ssize_t array_size)
+{
+        tier_qfile_array_t *new_array = NULL;
+        ssize_t             slot      = 0;
+        int                 ret       = -1;
+
+        GF_VALIDATE_OR_GOTO ("tier", (array_size > 0), out);
+
+        new_array = GF_CALLOC (1, sizeof (tier_qfile_array_t),
+                               gf_tier_mt_qfile_array_t);
+        if (new_array == NULL) {
+                gf_msg ("tier", GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
+                        "Failed to allocate memory for tier_qfile_array_t");
+                goto out;
+        }
+
+        new_array->fd_array = GF_CALLOC (array_size, sizeof (int),
+                                         gf_dht_mt_int32_t);
+        if (new_array->fd_array == NULL) {
+                gf_msg ("tier", GF_LOG_ERROR, 0, DHT_MSG_LOG_TIER_ERROR,
+                        "Failed to allocate memory for "
+                        "tier_qfile_array_t->fd_array");
+                goto out;
+        }
+
+        new_array->array_size = array_size;
+        new_array->next_index = 0;
+        /* Nothing is open yet, so every slot counts as exhausted */
+        new_array->exhausted_count = array_size;
+
+        /* Mark every slot as "no file" */
+        for (slot = 0; slot < array_size; slot++)
+                new_array->fd_array[slot] = -1;
+
+        ret = 0;
+out:
+        if (ret != 0) {
+                /* Releases whatever was allocated before the failure */
+                qfile_array_free (new_array);
+                new_array = NULL;
+        }
+        return new_array;
+}
+
+
+/* Tells whether nothing is left to read: true when the exhausted count
+ * has reached the array size, which is also the state of a new array. */
+static gf_boolean_t
+is_qfile_array_empty (tier_qfile_array_t *qfile_array)
+{
+        if (qfile_array->exhausted_count != qfile_array->array_size) {
+                return _gf_false;
+        }
+        return _gf_true;
+}
+
+
+/* Moves next_index forward, wrapping from the last slot back to 0, until
+ * it rests on a slot whose fd is not -1 or array_size slots have been
+ * visited. Does nothing when the array is already empty. */
+static void
+shift_next_index (tier_qfile_array_t *qfile_array)
+{
+        int visited = 0;
+
+        if (is_qfile_array_empty (qfile_array)) {
+                return;
+        }
+
+        for (visited = 0; visited < qfile_array->array_size; visited++) {
+                /* step next_index in a rotational manner */
+                if (qfile_array->next_index ==
+                    (qfile_array->array_size - 1)) {
+                        qfile_array->next_index = 0;
+                } else {
+                        qfile_array->next_index++;
+                }
+
+                /* stop on the first slot that still holds an open fd */
+                if (qfile_array->fd_array[qfile_array->next_index] != -1) {
+                        break;
+                }
+        }
+}
+
+
+/*
+ * This is a non-thread safe function to read query records
+ * from a list of query files in a Round-Robin manner: next_index and
+ * exhausted_count of qfile_array are updated here without any locking.
+ *
+ * A query file whose read returns <= 0 (EOF or a read error) is closed,
+ * its slot is set to -1 and it is counted as exhausted; reading then
+ * carries on with the next file. A read error on one file therefore does
+ * not fail the call.
+ *
+ * A slot that is already -1 when it is reached (e.g. a query file that
+ * could not be opened and was already counted as exhausted by whoever
+ * filled the array) is skipped and NOT counted again. Counting it twice
+ * would make the array look empty while another file still has records.
+ *
+ * Returns:
+ * 0 if all the query records in all the query files of the list are
+ * exhausted.
+ * > 0 if a query record is successfully read. Indicates the size of the query
+ * record read.
+ * < 0 if qfile_array or qfile_array->fd_array fails validation.
+ * */
+static int
+read_query_record_list (tier_qfile_array_t *qfile_array,
+                        gfdb_query_record_t **query_record)
+{
+        int ret = -1;
+        int qfile_fd = -1;
+
+        GF_VALIDATE_OR_GOTO ("tier", qfile_array, out);
+        GF_VALIDATE_OR_GOTO ("tier", qfile_array->fd_array, out);
+
+        do {
+                if (is_qfile_array_empty (qfile_array)) {
+                        ret = 0;
+                        break;
+                }
+
+                qfile_fd = qfile_array->fd_array[qfile_array->next_index];
+                if (qfile_fd == -1) {
+                        /* Slot already closed and accounted for:
+                         * move on without touching exhausted_count */
+                        shift_next_index (qfile_array);
+                        if (qfile_array->fd_array[qfile_array->next_index]
+                            == -1) {
+                                /* A full rotation found no open fd, so
+                                 * there is nothing left to read */
+                                ret = 0;
+                                break;
+                        }
+                        continue;
+                }
+
+                ret = gfdb_methods.gfdb_read_query_record
+                                (qfile_fd, query_record);
+                if (ret <= 0) {
+                        /* The qfile_fd has reached EOF or
+                         * there was an error.
+                         * 1. Close the exhausted fd
+                         * 2. increment the exhausted count
+                         * 3. shift next_qfile to next qfile
+                         **/
+                        sys_close (qfile_fd);
+                        qfile_array->fd_array[qfile_array->next_index] = -1;
+                        qfile_array->exhausted_count++;
+                        /* shift next_qfile to next qfile */
+                        shift_next_index (qfile_array);
+                        continue;
+                } else {
+                        /* shift next_qfile to next qfile */
+                        shift_next_index (qfile_array);
+                        break;
+                }
+        } while (1);
+out:
+        return ret;
+}
+
+
/* Check and update the watermark every WM_INTERVAL seconds */
#define WM_INTERVAL 5
@@ -126,8 +284,10 @@ tier_check_watermark (xlator_t *this, loc_t *root_loc)
goto exit;
}
- /* Find how much free space is on the hot subvolume. Then see if that value */
- /* is less than or greater than user defined watermarks. Stash results in */
+ /* Find how much free space is on the hot subvolume.
+ * Then see if that value */
+ /* is less than or greater than user defined watermarks.
+ * Stash results in */
/* the tier_conf data structure. */
ret = syncop_statfs (conf->subvolumes[1], root_loc, &statfs,
@@ -255,7 +415,7 @@ static int
tier_migrate_using_query_file (void *_args)
{
int ret = -1;
- query_cbk_args_t *query_cbk_args = (query_cbk_args_t *) _args;
+ query_cbk_args_t *query_cbk_args = (query_cbk_args_t *) _args;
xlator_t *this = NULL;
gf_defrag_info_t *defrag = NULL;
gfdb_query_record_t *query_record = NULL;
@@ -278,7 +438,6 @@ tier_migrate_using_query_file (void *_args)
int per_file_status = 0;
int per_link_status = 0;
int total_status = 0;
- int query_fd = 0;
xlator_t *src_subvol = NULL;
dht_conf_t *conf = NULL;
uint64_t total_migrated_bytes = 0;
@@ -293,15 +452,13 @@ tier_migrate_using_query_file (void *_args)
GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->this, out);
this = query_cbk_args->this;
GF_VALIDATE_OR_GOTO (this->name, query_cbk_args->defrag, out);
- GF_VALIDATE_OR_GOTO (this->name, (query_cbk_args->query_fd > 0), out);
+ GF_VALIDATE_OR_GOTO (this->name, query_cbk_args->qfile_array, out);
GF_VALIDATE_OR_GOTO (this->name, this->private, out);
conf = this->private;
defrag = query_cbk_args->defrag;
- query_fd = query_cbk_args->query_fd;
-
migrate_data = dict_new ();
if (!migrate_data)
goto out;
@@ -333,8 +490,8 @@ tier_migrate_using_query_file (void *_args)
}
/* Per file */
- while ((ret = gfdb_methods.gfdb_read_query_record
- (query_fd, &query_record)) != 0) {
+ while ((ret = read_query_record_list (query_cbk_args->qfile_array,
+ &query_record)) != 0) {
if (ret < 0) {
gf_msg (this->name, GF_LOG_ERROR, 0,
@@ -773,16 +930,14 @@ tier_process_self_query (tier_brick_list_t *local_brick, void *args)
}
/* Query for eligible files from db */
- query_cbk_args->query_fd = open (GET_QFILE_PATH
- (gfdb_brick_info->_gfdb_promote),
+ query_cbk_args->query_fd = open (local_brick->qfile_path,
O_WRONLY | O_CREAT | O_APPEND,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
if (query_cbk_args->query_fd < 0) {
gf_msg (this->name, GF_LOG_ERROR, errno,
DHT_MSG_LOG_TIER_ERROR,
"Failed to open query file %s",
- GET_QFILE_PATH
- (gfdb_brick_info->_gfdb_promote));
+ local_brick->qfile_path);
goto out;
}
if (!gfdb_brick_info->_gfdb_promote) {
@@ -872,6 +1027,7 @@ out:
query_cbk_args->query_fd = -1;
}
gfdb_methods.fini_db (conn_node);
+
return ret;
}
@@ -938,9 +1094,10 @@ tier_process_ctr_query (tier_brick_list_t *local_brick, void *args)
GFDB_IPC_CTR_KEY, GFDB_IPC_CTR_QUERY_OPS,
ret, out);
+
SET_DB_PARAM_TO_DICT(this->name, ctr_ipc_in_dict,
GFDB_IPC_CTR_GET_QFILE_PATH,
- GET_QFILE_PATH(ipc_ctr_params->is_promote),
+ local_brick->qfile_path,
ret, out);
ret = dict_set_bin (ctr_ipc_in_dict, GFDB_IPC_CTR_GET_QUERY_PARAMS,
@@ -1003,7 +1160,7 @@ out:
-/*This is the call back function for each brick from hot/cold bricklist
+/* This is the call back function for each brick from hot/cold bricklist
* It picks up each bricks db and queries for eligible files for migration.
* The list of eligible files are populated in appropriate query files*/
static int
@@ -1122,20 +1279,7 @@ tier_build_migration_qfile (demotion_args_t *args,
gfdb_time_t time_in_past;
int ret = -1;
tier_brick_list_t *local_brick = NULL;
-
- /*
- * The first time this function is called, query file will
- * not exist on a given instance of running the migration daemon.
- * The remove call is optimistic and it is legal if it fails.
- */
-
- ret = remove (GET_QFILE_PATH (is_promotion));
- if (ret == -1) {
- gf_msg_trace (args->this->name, errno,
- "Failed to remove %s",
- GET_QFILE_PATH (is_promotion));
- }
-
+ int i = 0;
time_in_past.tv_sec = args->freq_time;
time_in_past.tv_usec = 0;
@@ -1160,6 +1304,21 @@ tier_build_migration_qfile (demotion_args_t *args,
gfdb_brick_info._query_cbk_args = query_cbk_args;
list_for_each_entry (local_brick, args->brick_list, list) {
+
+ /* Construct query file path for this brick
+ * i.e
+ * /var/run/gluster/xlator_name/
+ * {promote/demote}-brickname-indexinbricklist
+ * So that no two query files will have the same path even
+ * if bricks have the same name
+ * */
+ snprintf (local_brick->qfile_path, PATH_MAX , "%s-%s-%d",
+ GET_QFILE_PATH (gfdb_brick_info._gfdb_promote),
+ local_brick->brick_name, i);
+
+ /* Delete any old query files for this brick */
+ sys_unlink (local_brick->qfile_path);
+
ret = tier_process_brick (local_brick,
&gfdb_brick_info);
if (ret) {
@@ -1168,6 +1327,7 @@ tier_build_migration_qfile (demotion_args_t *args,
"Brick %s query failed\n",
local_brick->brick_db_path);
}
+ i++;
}
ret = 0;
out:
@@ -1176,30 +1336,80 @@ out:
static int
tier_migrate_files_using_qfile (demotion_args_t *comp,
- query_cbk_args_t *query_cbk_args,
- char *qfile)
+ query_cbk_args_t *query_cbk_args)
{
- char renamed_file[PATH_MAX] = "";
- int ret = -1;
-
- query_cbk_args->query_fd = open (qfile, O_RDONLY);
- if (query_cbk_args->query_fd < 0) {
- gf_msg ("tier", GF_LOG_ERROR, errno,
- DHT_MSG_FOPEN_FAILED,
- "Failed to open %s for migration", qfile);
+ int ret = -1;
+ tier_brick_list_t *local_brick = NULL;
+ tier_brick_list_t *temp = NULL;
+ char query_file_path_err[PATH_MAX] = "";
+ struct tm tm = {0};
+ gfdb_time_t current_time = {0};
+ char time_str[256] = {0};
+ char time_format[20] = "%Y-%m-%d-%H-%M-%S";
+ ssize_t qfile_array_size = 0;
+ int count = 0;
+ int temp_fd = 0;
+
+ /* Time format for error query files */
+ gettimeofday (&current_time, NULL);
+ gmtime_r (&current_time.tv_sec, &tm);
+ strftime (time_str, 256, time_format, &tm);
+
+ /* Build the qfile list */
+ list_for_each_entry_safe (local_brick, temp, comp->brick_list, list) {
+ qfile_array_size++;
+ }
+ query_cbk_args->qfile_array = qfile_array_new (qfile_array_size);
+ if (!query_cbk_args->qfile_array) {
+ gf_msg ("tier", GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR, "Failed to create new "
+ "qfile_array");
goto out;
}
+
+ /*Open all qfiles*/
+ count = 0;
+ query_cbk_args->qfile_array->exhausted_count = 0;
+ list_for_each_entry_safe (local_brick, temp, comp->brick_list, list) {
+ temp_fd = query_cbk_args->qfile_array->fd_array[count];
+ temp_fd = open (local_brick->qfile_path, O_RDONLY,
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+ if (temp_fd < 0) {
+ gf_msg ("tier", GF_LOG_ERROR, errno,
+ DHT_MSG_LOG_TIER_ERROR, "Failed to open "
+ "%s to the query file",
+ local_brick->qfile_path);
+ query_cbk_args->qfile_array->exhausted_count++;
+ }
+ query_cbk_args->qfile_array->fd_array[count] = temp_fd;
+ count++;
+ }
+
+ /* Migrate files using query file list */
ret = tier_migrate_using_query_file ((void *)query_cbk_args);
- sys_close (query_cbk_args->query_fd);
- query_cbk_args->query_fd = -1;
+out:
+ qfile_array_free (query_cbk_args->qfile_array);
+
+ /* If there is an error rename all the query files to .err files
+ * with a timestamp for better debugging */
if (ret) {
- snprintf (renamed_file, sizeof renamed_file, "%s.err", qfile);
- sys_rename (qfile, renamed_file);
+ list_for_each_entry_safe (local_brick, temp, comp->brick_list,
+ list) {
+ /* rename error qfile*/
+ snprintf (query_file_path_err, PATH_MAX, "%s-%s.err",
+ local_brick->qfile_path, time_str);
+ sys_rename (local_brick->qfile_path,
+ query_file_path_err);
+ }
}
-out:
+
+ query_cbk_args->qfile_array = NULL;
+
return ret;
}
+
+
/*Demotion Thread*/
static void *
tier_demote (void *args)
@@ -1229,7 +1439,7 @@ tier_demote (void *args)
/* Migrate files using the query file */
ret = tier_migrate_files_using_qfile (args,
- &query_cbk_args, demotion_qfile);
+ &query_cbk_args);
if (ret)
goto out;
@@ -1266,9 +1476,7 @@ static void
goto out;
/* Migrate files using the query file */
- ret = tier_migrate_files_using_qfile (args,
- &query_cbk_args,
- promotion_qfile);
+ ret = tier_migrate_files_using_qfile (args, &query_cbk_args);
if (ret)
goto out;
@@ -1280,14 +1488,14 @@ out:
static int
tier_get_bricklist (xlator_t *xl, struct list_head *local_bricklist_head)
{
- xlator_list_t *child = NULL;
- char *rv = NULL;
- char *rh = NULL;
- char localhost[256] = {0};
- char *brickname = NULL;
- char db_name[PATH_MAX] = "";
- int ret = 0;
- tier_brick_list_t *local_brick = NULL;
+ xlator_list_t *child = NULL;
+ char *rv = NULL;
+ char *rh = NULL;
+ char localhost[256] = {0};
+ char *brickname = NULL;
+ char db_name[PATH_MAX] = "";
+ int ret = 0;
+ tier_brick_list_t *local_brick = NULL;
GF_VALIDATE_OR_GOTO ("tier", xl, out);
GF_VALIDATE_OR_GOTO ("tier", local_bricklist_head, out);
@@ -1325,18 +1533,22 @@ tier_get_bricklist (xlator_t *xl, struct list_head *local_bricklist_head)
if (!local_brick->brick_db_path) {
gf_msg ("tier", GF_LOG_ERROR, 0,
DHT_MSG_LOG_TIER_STATUS,
- "Faile. to allocate memory for bricklist");
+ "Failed to allocate memory for"
+ " bricklist.");
goto out;
}
- snprintf(local_brick->brick_db_path, PATH_MAX, "%s/%s/%s", rv,
- GF_HIDDEN_PATH,
- db_name);
+ snprintf(local_brick->brick_db_path,
+ PATH_MAX, "%s/%s/%s", rv,
+ GF_HIDDEN_PATH, db_name);
local_brick->xlator = xl;
+ snprintf (local_brick->brick_name,
+ NAME_MAX, "%s", brickname);
+
list_add_tail (&(local_brick->list),
- local_bricklist_head);
+ local_bricklist_head);
ret = 0;
goto out;
@@ -1420,6 +1632,34 @@ clear_bricklist (struct list_head *brick_list)
}
+/*
+ * Fills in qfile_path for every brick on brick_list. The path has the
+ * form <prefix>-<brick_name>-<position in list>, where the prefix comes
+ * from GET_QFILE_PATH (is_cold): the promotion query file prefix when
+ * is_cold is true, the demotion one otherwise. Appending the position
+ * keeps the paths distinct even if two bricks share a name.
+ * Does nothing when brick_list fails validation.
+ * */
+static void
+set_brick_list_qpath (struct list_head *brick_list, gf_boolean_t is_cold)
+{
+        tier_brick_list_t *brick    = NULL;
+        int                position = 0;
+
+        GF_VALIDATE_OR_GOTO ("tier", brick_list, out);
+
+        list_for_each_entry (brick, brick_list, list) {
+                /* e.g. /var/run/gluster/xlator_name/
+                 * {promote/demote}-brickname-indexinbricklist */
+                snprintf (brick->qfile_path, PATH_MAX , "%s-%s-%d",
+                          GET_QFILE_PATH (is_cold),
+                          brick->brick_name, position);
+                position++;
+        }
+out:
+        return;
+}
+
+
+
+
int
tier_start (xlator_t *this, gf_defrag_info_t *defrag)
{
@@ -1453,7 +1693,9 @@ tier_start (xlator_t *this, gf_defrag_info_t *defrag)
INIT_LIST_HEAD ((&bricklist_cold));
tier_get_bricklist (conf->subvolumes[0], &bricklist_cold);
+ set_brick_list_qpath (&bricklist_cold, _gf_true);
tier_get_bricklist (conf->subvolumes[1], &bricklist_hot);
+ set_brick_list_qpath (&bricklist_hot, _gf_false);
is_hot_list_empty = list_empty(&bricklist_hot);
is_cold_list_empty = list_empty(&bricklist_cold);
@@ -1948,27 +2190,20 @@ tier_init (xlator_t *this)
GF_FREE(voldir);
- ret = gf_asprintf (&promotion_qfile, "%s/%s/%s-%s",
+ ret = gf_asprintf (&promotion_qfile, "%s/%s/promote",
DEFAULT_VAR_RUN_DIRECTORY,
- this->name,
- PROMOTION_QFILE,
this->name);
if (ret < 0)
goto out;
- ret = gf_asprintf (&demotion_qfile, "%s/%s/%s-%s",
+ ret = gf_asprintf (&demotion_qfile, "%s/%s/demote",
DEFAULT_VAR_RUN_DIRECTORY,
- this->name,
- DEMOTION_QFILE,
this->name);
if (ret < 0) {
GF_FREE (promotion_qfile);
goto out;
}
- sys_unlink(promotion_qfile);
- sys_unlink(demotion_qfile);
-
gf_msg (this->name, GF_LOG_INFO, 0,
DHT_MSG_LOG_TIER_STATUS,
"Promote/demote frequency %d/%d "
@@ -1978,11 +2213,6 @@ tier_init (xlator_t *this)
defrag->write_freq_threshold,
defrag->read_freq_threshold);
- gf_msg (this->name, GF_LOG_INFO, 0,
- DHT_MSG_LOG_TIER_STATUS,
- "Promote file %s demote file %s",
- promotion_qfile, demotion_qfile);
-
ret = 0;
out:
diff --git a/xlators/cluster/dht/src/tier.h b/xlators/cluster/dht/src/tier.h
index 92e2fda6e5c..41c5a318de4 100644
--- a/xlators/cluster/dht/src/tier.h
+++ b/xlators/cluster/dht/src/tier.h
@@ -39,25 +39,39 @@
#define GET_QFILE_PATH(is_promotion)\
(is_promotion) ? promotion_qfile : demotion_qfile
+typedef struct tier_qfile_array { /* Set of query file fds that are read in round-robin order */
+ int *fd_array; /* One fd per query file; -1 marks a slot with no open file */
+ ssize_t array_size; /* Number of slots in fd_array */
+ ssize_t next_index; /* Slot the next read is taken from */
+ /* Number of exhausted FDs; when it equals array_size the array
+  * is treated as empty */
+ ssize_t exhausted_count;
+} tier_qfile_array_t;
+
+
typedef struct _query_cbk_args {
- xlator_t *this;
- gf_defrag_info_t *defrag;
- int query_fd;
- int is_promotion;
+ xlator_t *this;
+ gf_defrag_info_t *defrag;
+ /* This is for write */
+ int query_fd;
+ int is_promotion;
+ /* This is for read */
+ tier_qfile_array_t *qfile_array;
} query_cbk_args_t;
int
gf_run_tier(xlator_t *this, gf_defrag_info_t *defrag);
typedef struct gfdb_brick_info {
- gfdb_time_t *time_stamp;
+ gfdb_time_t *time_stamp;
gf_boolean_t _gfdb_promote;
- query_cbk_args_t *_query_cbk_args;
+ query_cbk_args_t *_query_cbk_args;
} gfdb_brick_info_t;
typedef struct brick_list {
xlator_t *xlator;
char *brick_db_path;
+ char brick_name[NAME_MAX];
+ char qfile_path[PATH_MAX];
struct list_head list;
} tier_brick_list_t;