summaryrefslogtreecommitdiffstats
path: root/xlators
diff options
context:
space:
mode:
authorJoseph Fernandes <josferna@redhat.com>2015-09-18 19:57:54 +0530
committerDan Lambright <dlambrig@redhat.com>2015-10-09 16:58:53 -0700
commit489f02879afd940d201d092720dbf13b2922b134 (patch)
tree1d29e56c9307b0f7722c757bc6d807aeed289d4d /xlators
parentd777b2feaf55347177d6786657d68cc0547f7f73 (diff)
tier/ctr: Solution for db locks for tier migrator and ctr using sqlite version less than 3.7 i.e rhel 6.7
Problem: On RHEL 6.7, we have sqlite version 3.6.2 which doesnt support WAL journaling mode, as this journaling mode is only available in sqlite 3.7 and above. As a result we cannot have to progreses concurrently accessing sqlite, without running into db locks! Well WAL is also need for performace on CTR side. Solution: This solution is to use CTR db connection for doing queries when WAL mode is absent. i,e tier migrator will send sync_op ipc calls to CTR, which in turn will do the query and create/update the query file suggested by tier migrator. Pending: Well this solution will stop the db locks but the performance is still an issue for CTR. We are developing an in-Memory Transaction Log (iMeTaL) which will help boost the CTR performance by doing in memory udpates on the IO path and later flush the updates to the db in a batch/segment flush. Master patch: http://review.gluster.org/#/c/12191 >> Change-Id: Ie3149643ded159234b5cc6aa6cf93b9022c2f124 >> BUG: 1240577 >> Signed-off-by: Joseph Fernandes <josferna@redhat.com> >> Signed-off-by: Dan Lambright <dlambrig@redhat.com> >> Signed-off-by: Joseph Fernandes <josferna@redhat.com> >> Reviewed-on: http://review.gluster.org/12191 >> Tested-by: Gluster Build System <jenkins@build.gluster.com> >> Reviewed-by: Luis Pabon <lpabon@redhat.com> Signed-off-by: Joseph Fernandes <josferna@redhat.com> Change-Id: Ie8c7a7e9566244c104531b579126bb57fbc6e32b BUG: 1270123 Reviewed-on: http://review.gluster.org/12325 Tested-by: NetBSD Build System <jenkins@build.gluster.org> Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Dan Lambright <dlambrig@redhat.com> Tested-by: Dan Lambright <dlambrig@redhat.com>
Diffstat (limited to 'xlators')
-rw-r--r--xlators/cluster/dht/src/dht-mem-types.h1
-rw-r--r--xlators/cluster/dht/src/dht-messages.h11
-rw-r--r--xlators/cluster/dht/src/tier.c305
-rw-r--r--xlators/features/changetimerecorder/src/changetimerecorder.c332
-rw-r--r--xlators/features/changetimerecorder/src/ctr-helper.h7
5 files changed, 600 insertions, 56 deletions
diff --git a/xlators/cluster/dht/src/dht-mem-types.h b/xlators/cluster/dht/src/dht-mem-types.h
index e3a38ed7e03..a90b5710745 100644
--- a/xlators/cluster/dht/src/dht-mem-types.h
+++ b/xlators/cluster/dht/src/dht-mem-types.h
@@ -35,6 +35,7 @@ enum gf_dht_mem_types_ {
gf_dht_mt_octx_t,
gf_dht_mt_miginfo_t,
gf_tier_mt_bricklist_t,
+ gf_tier_mt_ipc_ctr_params_t,
gf_dht_mt_end
};
#endif
diff --git a/xlators/cluster/dht/src/dht-messages.h b/xlators/cluster/dht/src/dht-messages.h
index d6300d3741f..c491600394c 100644
--- a/xlators/cluster/dht/src/dht-messages.h
+++ b/xlators/cluster/dht/src/dht-messages.h
@@ -45,7 +45,7 @@
*/
#define GLFS_DHT_BASE GLFS_MSGID_COMP_DHT
-#define GLFS_DHT_NUM_MESSAGES 106
+#define GLFS_DHT_NUM_MESSAGES 107
#define GLFS_MSGID_END (GLFS_DHT_BASE + GLFS_DHT_NUM_MESSAGES + 1)
/* Messages with message IDs */
@@ -998,5 +998,14 @@
*/
#define DHT_MSG_HAS_MIGINFO (GLFS_DHT_BASE + 106)
+
+/*
+ * @messageid 109107
+ * @diagnosis
+ * @recommendedaction None
+ */
+
+#define DHT_MSG_LOG_IPC_TIER_ERROR (GLFS_DHT_BASE + 107)
+
#define glfs_msg_end_x GLFS_MSGID_END, "Invalid: End of messages"
#endif /* _DHT_MESSAGES_H_ */
diff --git a/xlators/cluster/dht/src/tier.c b/xlators/cluster/dht/src/tier.c
index 327fcebbaba..c93281bc785 100644
--- a/xlators/cluster/dht/src/tier.c
+++ b/xlators/cluster/dht/src/tier.c
@@ -434,17 +434,20 @@ out:
return ret;
}
-/*This is the call back function for each brick from hot/cold bricklist
- * It picks up each bricks db and queries for eligible files for migration.
- * The list of eligible files are populated in appropriate query files*/
+
+
+
+/*Create query file in tier process*/
static int
-tier_process_brick_cbk (brick_list_t *local_brick, void *args) {
+tier_process_self_query (brick_list_t *local_brick, void *args)
+{
int ret = -1;
char *db_path = NULL;
- query_cbk_args_t *query_cbk_args = NULL;
+ query_cbk_args_t *query_cbk_args = NULL;
xlator_t *this = NULL;
- gfdb_conn_node_t *conn_node = NULL;
+ gfdb_conn_node_t *conn_node = NULL;
dict_t *params_dict = NULL;
+ dict_t *ctr_ipc_dict = NULL;
_gfdb_brick_dict_info_t *gfdb_brick_dict_info = args;
/*Init of all the essentials*/
@@ -462,6 +465,7 @@ tier_process_brick_cbk (brick_list_t *local_brick, void *args) {
GF_VALIDATE_OR_GOTO (this->name, local_brick->xlator, out);
GF_VALIDATE_OR_GOTO (this->name, local_brick->brick_db_path, out);
+
db_path = local_brick->brick_db_path;
/*Preparing DB parameters before init_db i.e getting db connection*/
@@ -469,10 +473,10 @@ tier_process_brick_cbk (brick_list_t *local_brick, void *args) {
if (!params_dict) {
gf_msg (this->name, GF_LOG_ERROR, 0,
DHT_MSG_LOG_TIER_ERROR,
- "DB Params cannot initialized!");
+ "DB Params cannot initialized");
goto out;
}
- SET_DB_PARAM_TO_DICT(this->name, params_dict, (char *) gfdb_methods.get_db_path(),
+ SET_DB_PARAM_TO_DICT(this->name, params_dict, (char *) gfdb_methods.get_db_path_key(),
db_path, ret, out);
/*Get the db connection*/
@@ -543,17 +547,41 @@ tier_process_brick_cbk (brick_list_t *local_brick, void *args) {
}
/*Clear the heat on the DB entries*/
- ret = syncop_ipc (local_brick->xlator, GF_IPC_TARGET_CTR, NULL, NULL);
+ /*Preparing ctr_ipc_dict*/
+ ctr_ipc_dict = dict_new ();
+ if (!ctr_ipc_dict) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "ctr_ipc_dict cannot initialized");
+ goto out;
+ }
+
+ SET_DB_PARAM_TO_DICT(this->name, ctr_ipc_dict,
+ GFDB_IPC_CTR_KEY, GFDB_IPC_CTR_CLEAR_OPS,
+ ret, out);
+
+ ret = syncop_ipc (local_brick->xlator, GF_IPC_TARGET_CTR, ctr_ipc_dict,
+ NULL);
if (ret) {
gf_msg (this->name, GF_LOG_ERROR, 0,
DHT_MSG_LOG_TIER_ERROR, "Failed clearing the heat "
- "on db %s", local_brick->brick_db_path);
+ "on db %s error %d", local_brick->brick_db_path, ret);
goto out;
}
ret = 0;
out:
- if (query_cbk_args->queryFILE) {
+ if (params_dict) {
+ dict_unref (params_dict);
+ params_dict = NULL;
+ }
+
+ if (ctr_ipc_dict) {
+ dict_unref (ctr_ipc_dict);
+ ctr_ipc_dict = NULL;
+ }
+
+ if (query_cbk_args && query_cbk_args->queryFILE) {
fclose (query_cbk_args->queryFILE);
query_cbk_args->queryFILE = NULL;
}
@@ -561,7 +589,244 @@ out:
return ret;
}
-static inline int
+
+
+/*Ask CTR to create the query file*/
+static int
+tier_process_ctr_query (brick_list_t *local_brick, void *args)
+{
+ int ret = -1;
+ query_cbk_args_t *query_cbk_args = NULL;
+ xlator_t *this = NULL;
+ dict_t *ctr_ipc_in_dict = NULL;
+ dict_t *ctr_ipc_out_dict = NULL;
+ _gfdb_brick_dict_info_t *gfdb_brick_dict_info = args;
+ gfdb_ipc_ctr_params_t *ipc_ctr_params = NULL;
+ int count = 0;
+
+ /*Init of all the essentials*/
+ GF_VALIDATE_OR_GOTO ("tier", gfdb_brick_dict_info , out);
+ query_cbk_args = gfdb_brick_dict_info->_query_cbk_args;
+
+ GF_VALIDATE_OR_GOTO ("tier", query_cbk_args->this, out);
+ this = query_cbk_args->this;
+
+ GF_VALIDATE_OR_GOTO (this->name,
+ gfdb_brick_dict_info->_query_cbk_args, out);
+
+ GF_VALIDATE_OR_GOTO (this->name, local_brick, out);
+
+ GF_VALIDATE_OR_GOTO (this->name, local_brick->xlator, out);
+
+ GF_VALIDATE_OR_GOTO (this->name, local_brick->brick_db_path, out);
+
+
+ /*Preparing ctr_ipc_in_dict*/
+ ctr_ipc_in_dict = dict_new ();
+ if (!ctr_ipc_in_dict) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "ctr_ipc_in_dict cannot initialized");
+ goto out;
+ }
+
+ ipc_ctr_params = GF_CALLOC (1, sizeof (gfdb_ipc_ctr_params_t),
+ gf_tier_mt_ipc_ctr_params_t);
+ if (!ipc_ctr_params) {
+ goto out;
+ }
+
+ /* set all the query params*/
+ ipc_ctr_params->is_promote = gfdb_brick_dict_info->_gfdb_promote;
+ ipc_ctr_params->write_freq_threshold = query_cbk_args->
+ defrag->write_freq_threshold;
+ ipc_ctr_params->read_freq_threshold = query_cbk_args->
+ defrag->read_freq_threshold;
+ memcpy (&ipc_ctr_params->time_stamp,
+ gfdb_brick_dict_info->time_stamp,
+ sizeof (gfdb_time_t));
+
+ SET_DB_PARAM_TO_DICT(this->name, ctr_ipc_in_dict,
+ GFDB_IPC_CTR_KEY, GFDB_IPC_CTR_QUERY_OPS,
+ ret, out);
+
+ SET_DB_PARAM_TO_DICT(this->name, ctr_ipc_in_dict,
+ GFDB_IPC_CTR_GET_QFILE_PATH,
+ GET_QFILE_PATH(ipc_ctr_params->is_promote),
+ ret, out);
+
+ ret = dict_set_bin (ctr_ipc_in_dict, GFDB_IPC_CTR_GET_QUERY_PARAMS,
+ ipc_ctr_params, sizeof (*ipc_ctr_params));
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0, LG_MSG_SET_PARAM_FAILED,
+ "Failed setting %s to params dictionary",
+ GFDB_IPC_CTR_GET_QUERY_PARAMS);
+ goto out;
+ }
+
+ ret = syncop_ipc (local_brick->xlator, GF_IPC_TARGET_CTR,
+ ctr_ipc_in_dict, &ctr_ipc_out_dict);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_IPC_TIER_ERROR, "Failed query on %s ret %d",
+ local_brick->brick_db_path, ret);
+ goto out;
+ }
+
+ ret = dict_get_int32(ctr_ipc_out_dict, GFDB_IPC_CTR_RET_QUERY_COUNT,
+ &count);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR, "Failed getting count "
+ "of records on %s",
+ local_brick->brick_db_path);
+ goto out;
+ }
+
+ if (count < 0) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR, "Failed query on %s",
+ local_brick->brick_db_path);
+ ret = -1;
+ goto out;
+ }
+
+ pthread_mutex_lock (&dm_stat_mutex);
+ query_cbk_args->defrag->num_files_lookedup = count;
+ pthread_mutex_unlock (&dm_stat_mutex);
+
+ ret = 0;
+out:
+
+ if (ctr_ipc_in_dict) {
+ dict_unref(ctr_ipc_in_dict);
+ ctr_ipc_in_dict = NULL;
+ }
+
+ if (ctr_ipc_out_dict) {
+ dict_unref(ctr_ipc_out_dict);
+ ctr_ipc_out_dict = NULL;
+ ipc_ctr_params = NULL;
+ }
+
+ GF_FREE (ipc_ctr_params);
+
+ return ret;
+}
+
+
+
+
+/*This is the call back function for each brick from hot/cold bricklist
+ * It picks up each bricks db and queries for eligible files for migration.
+ * The list of eligible files are populated in appropriate query files*/
+static int
+tier_process_brick (brick_list_t *local_brick, void *args) {
+ int ret = -1;
+ dict_t *ctr_ipc_in_dict = NULL;
+ dict_t *ctr_ipc_out_dict = NULL;
+ char *strval = NULL;
+
+ GF_VALIDATE_OR_GOTO ("tier", local_brick, out);
+
+ GF_VALIDATE_OR_GOTO ("tier", local_brick->xlator, out);
+
+ if (dht_tier_db_type == GFDB_SQLITE3) {
+
+ /*Preparing ctr_ipc_in_dict*/
+ ctr_ipc_in_dict = dict_new ();
+ if (!ctr_ipc_in_dict) {
+ gf_msg ("tier", GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR,
+ "ctr_ipc_in_dict cannot initialized");
+ goto out;
+ }
+
+ ret = dict_set_str (ctr_ipc_in_dict, GFDB_IPC_CTR_KEY,
+ GFDB_IPC_CTR_GET_DB_PARAM_OPS);
+ if (ret) {
+ gf_msg ("tier", GF_LOG_ERROR, 0,\
+ LG_MSG_SET_PARAM_FAILED, "Failed setting %s "\
+ "to params dictionary", GFDB_IPC_CTR_KEY);\
+ goto out;
+ }
+
+ ret = dict_set_str (ctr_ipc_in_dict,
+ GFDB_IPC_CTR_GET_DB_PARAM_OPS, "");
+ if (ret) {
+ gf_msg ("tier", GF_LOG_ERROR, 0,\
+ LG_MSG_SET_PARAM_FAILED, "Failed setting %s "\
+ "to params dictionary",
+ GFDB_IPC_CTR_GET_DB_PARAM_OPS);\
+ goto out;
+ }
+
+ ret = dict_set_str (ctr_ipc_in_dict,
+ GFDB_IPC_CTR_GET_DB_KEY, "journal_mode");
+ if (ret) {
+ gf_msg ("tier", GF_LOG_ERROR, 0,\
+ LG_MSG_SET_PARAM_FAILED, "Failed setting %s "\
+ "to params dictionary",
+ GFDB_IPC_CTR_GET_DB_KEY);\
+ goto out;
+ }
+
+
+
+ ret = syncop_ipc (local_brick->xlator, GF_IPC_TARGET_CTR,
+ ctr_ipc_in_dict, &ctr_ipc_out_dict);
+ if (ret || ctr_ipc_out_dict == NULL) {
+ gf_msg ("tier", GF_LOG_ERROR, 0,
+ DHT_MSG_LOG_TIER_ERROR, "Failed getting"
+ "journal_mode of sql db %s",
+ local_brick->brick_db_path);
+ goto out;
+ }
+
+ ret = dict_get_str (ctr_ipc_out_dict, "journal_mode", &strval);
+ if (ret) {
+ gf_msg ("tier", GF_LOG_ERROR, 0,\
+ LG_MSG_GET_PARAM_FAILED, "Failed getting %s "\
+ "to params dictionary",
+ "journal_mode");\
+ goto out;
+ }
+
+ if (strval && (strncmp(strval, "wal", strlen ("wal")) == 0)) {
+ ret = tier_process_self_query (local_brick, args);
+ if (ret) {
+ goto out;
+ }
+ } else {
+ ret = tier_process_ctr_query (local_brick, args);
+ if (ret) {
+ goto out;
+ }
+ }
+ ret = 0;
+
+ } else {
+ ret = tier_process_self_query (local_brick, args);
+ if (ret) {
+ goto out;
+ }
+ }
+
+ ret = 0;
+out:
+ if (ctr_ipc_in_dict)
+ dict_unref (ctr_ipc_in_dict);
+
+ if (ctr_ipc_out_dict)
+ dict_unref (ctr_ipc_out_dict);
+
+ return ret;
+}
+
+
+
+
+static int
tier_build_migration_qfile (demotion_args_t *args,
query_cbk_args_t *query_cbk_args,
gf_boolean_t is_promotion)
@@ -603,8 +868,8 @@ tier_build_migration_qfile (demotion_args_t *args,
gfdb_brick_dict_info._query_cbk_args = query_cbk_args;
list_for_each_entry (local_brick, args->brick_list, list) {
- ret = tier_process_brick_cbk (local_brick,
- &gfdb_brick_dict_info);
+ ret = tier_process_brick (local_brick,
+ &gfdb_brick_dict_info);
if (ret) {
gf_msg (args->this->name, GF_LOG_ERROR, 0,
DHT_MSG_BRICK_QUERY_FAILED,
@@ -658,6 +923,8 @@ tier_demote (void *args)
GF_VALIDATE_OR_GOTO (demotion_args->this->name,
demotion_args->defrag, out);
+ THIS = demotion_args->this;
+
query_cbk_args.this = demotion_args->this;
query_cbk_args.defrag = demotion_args->defrag;
query_cbk_args.is_promotion = 0;
@@ -694,6 +961,8 @@ static void
GF_VALIDATE_OR_GOTO (promotion_args->this->name,
promotion_args->defrag, out);
+ THIS = promotion_args->this;
+
query_cbk_args.this = promotion_args->this;
query_cbk_args.defrag = promotion_args->defrag;
query_cbk_args.is_promotion = 1;
@@ -923,7 +1192,7 @@ tier_start (xlator_t *this, gf_defrag_info_t *defrag)
gf_msg (this->name, GF_LOG_ERROR, 0,
DHT_MSG_LOG_TIER_ERROR,
"Failed starting Demotion "
- "thread!");
+ "thread");
}
}
@@ -939,7 +1208,7 @@ tier_start (xlator_t *this, gf_defrag_info_t *defrag)
gf_msg (this->name, GF_LOG_ERROR, 0,
DHT_MSG_LOG_TIER_ERROR,
"Failed starting Promotion "
- "thread!");
+ "thread");
}
}
@@ -948,7 +1217,7 @@ tier_start (xlator_t *this, gf_defrag_info_t *defrag)
if (demotion_args.return_value) {
gf_msg (this->name, GF_LOG_ERROR, 0,
DHT_MSG_LOG_TIER_ERROR,
- "Demotion failed!");
+ "Demotion failed");
}
ret_demotion = demotion_args.return_value;
}
@@ -958,7 +1227,7 @@ tier_start (xlator_t *this, gf_defrag_info_t *defrag)
if (promotion_args.return_value) {
gf_msg (this->name, GF_LOG_ERROR, 0,
DHT_MSG_LOG_TIER_ERROR,
- "Promotion failed!");
+ "Promotion failed");
}
ret_promotion = promotion_args.return_value;
}
diff --git a/xlators/features/changetimerecorder/src/changetimerecorder.c b/xlators/features/changetimerecorder/src/changetimerecorder.c
index 8acd811a0cb..258b56ba541 100644
--- a/xlators/features/changetimerecorder/src/changetimerecorder.c
+++ b/xlators/features/changetimerecorder/src/changetimerecorder.c
@@ -1382,29 +1382,287 @@ out:
/*******************************ctr_ipc****************************************/
+/*This is the call back function per record/file from data base*/
+static int
+ctr_db_query_callback (gfdb_query_record_t *gfdb_query_record,
+ void *args) {
+ int ret = -1;
+ char gfid_str[UUID_CANONICAL_FORM_LEN+1] = "";
+ ctr_query_cbk_args_t *query_cbk_args = args;
+
+ GF_VALIDATE_OR_GOTO ("ctr", query_cbk_args, out);
+
+ gf_uuid_unparse (gfdb_query_record->gfid, gfid_str);
+ fprintf (query_cbk_args->queryFILE, "%s|%s|%ld\n", gfid_str,
+ gfdb_query_record->_link_info_str,
+ gfdb_query_record->link_info_size);
+
+ query_cbk_args->count++;
+
+ ret = 0;
+out:
+ return ret;
+}
+
+/* This function does all the db queries related to tiering and
+ * generates/populates new/existing query file
+ * inputs:
+ * xlator_t *this : CTR Translator
+ * void *conn_node : Database connection
+ * char *query_file: the query file that needs to be updated
+ * gfdb_ipc_ctr_params_t *ipc_ctr_params: the query parameters
+ * Return:
+ * On success 0
+ * On failure -1
+ * */
+int
+ctr_db_query (xlator_t *this,
+ void *conn_node,
+ char *query_file,
+ gfdb_ipc_ctr_params_t *ipc_ctr_params)
+{
+ int ret = -1;
+ ctr_query_cbk_args_t query_cbk_args = {0};
+
+ GF_VALIDATE_OR_GOTO ("ctr", this, out);
+ GF_VALIDATE_OR_GOTO (this->name, conn_node, out);
+ GF_VALIDATE_OR_GOTO (this->name, query_file, out);
+ GF_VALIDATE_OR_GOTO (this->name, ipc_ctr_params, out);
+
+ /*Query for eligible files from db*/
+ query_cbk_args.queryFILE = fopen(query_file, "a+");
+ if (!query_cbk_args.queryFILE) {
+ gf_msg (this->name, GF_LOG_ERROR, errno,
+ CTR_MSG_FATAL_ERROR,
+ "Failed to open query file %s", query_file);
+ goto out;
+ }
+ if (!ipc_ctr_params->is_promote) {
+ if (ipc_ctr_params->write_freq_threshold == 0 &&
+ ipc_ctr_params->read_freq_threshold == 0) {
+ ret = find_unchanged_for_time (
+ conn_node,
+ ctr_db_query_callback,
+ (void *)&query_cbk_args,
+ &ipc_ctr_params->time_stamp);
+ } else {
+ ret = find_unchanged_for_time_freq (
+ conn_node,
+ ctr_db_query_callback,
+ (void *)&query_cbk_args,
+ &ipc_ctr_params->time_stamp,
+ ipc_ctr_params->write_freq_threshold,
+ ipc_ctr_params->read_freq_threshold,
+ _gf_false);
+ }
+ } else {
+ if (ipc_ctr_params->write_freq_threshold == 0 &&
+ ipc_ctr_params->read_freq_threshold == 0) {
+ ret = find_recently_changed_files (
+ conn_node,
+ ctr_db_query_callback,
+ (void *)&query_cbk_args,
+ &ipc_ctr_params->time_stamp);
+ } else {
+ ret = find_recently_changed_files_freq (
+ conn_node,
+ ctr_db_query_callback,
+ (void *)&query_cbk_args,
+ &ipc_ctr_params->time_stamp,
+ ipc_ctr_params->write_freq_threshold,
+ ipc_ctr_params->read_freq_threshold,
+ _gf_false);
+ }
+ }
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ CTR_MSG_FATAL_ERROR,
+ "FATAL: query from db failed");
+ goto out;
+ }
+
+ ret = clear_files_heat (conn_node);
+ if (ret) {
+ gf_msg (this->name, GF_LOG_ERROR, 0,
+ CTR_MSG_FATAL_ERROR,
+ "FATAL: Failed to clear db entries");
+ goto out;
+ }
+
+ ret = 0;
+out:
+
+ if (!ret)
+ ret = query_cbk_args.count;
+
+ if (query_cbk_args.queryFILE) {
+ fclose (query_cbk_args.queryFILE);
+ query_cbk_args.queryFILE = NULL;
+ }
+
+ return ret;
+}
+
+
+int
+ctr_ipc_helper (xlator_t *this, dict_t *in_dict,
+ dict_t *out_dict)
+{
+ int ret = -1;
+ char *ctr_ipc_ops = NULL;
+ gf_ctr_private_t *priv = NULL;
+ char *db_version = NULL;
+ char *db_param_key = NULL;
+ char *db_param = NULL;
+ char *query_file = NULL;
+ gfdb_ipc_ctr_params_t *ipc_ctr_params = NULL;
+
+
+ GF_VALIDATE_OR_GOTO ("ctr", this, out);
+ GF_VALIDATE_OR_GOTO (this->name, this->private, out);
+ priv = this->private;
+ GF_VALIDATE_OR_GOTO (this->name, priv->_db_conn, out);
+ GF_VALIDATE_OR_GOTO (this->name, in_dict, out);
+ GF_VALIDATE_OR_GOTO (this->name, out_dict, out);
+
+ GET_DB_PARAM_FROM_DICT(this->name, in_dict, GFDB_IPC_CTR_KEY,
+ ctr_ipc_ops, out);
+
+ /*if its a db clear operation */
+ if (strncmp (ctr_ipc_ops, GFDB_IPC_CTR_CLEAR_OPS,
+ strlen (GFDB_IPC_CTR_CLEAR_OPS)) == 0) {
+
+ ret = clear_files_heat (priv->_db_conn);
+ if (ret)
+ goto out;
+
+ } /* if its a query operation, in which case its query + clear db*/
+ else if (strncmp (ctr_ipc_ops, GFDB_IPC_CTR_QUERY_OPS,
+ strlen (GFDB_IPC_CTR_QUERY_OPS)) == 0) {
+
+ ret = dict_get_str (in_dict, GFDB_IPC_CTR_GET_QFILE_PATH,
+ &query_file);
+ if (ret) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, CTR_MSG_SET,
+ "Failed extracting query file path");
+ goto out;
+ }
+
+ ret = dict_get_bin (in_dict, GFDB_IPC_CTR_GET_QUERY_PARAMS,
+ (void *)&ipc_ctr_params);
+ if (ret) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, CTR_MSG_SET,
+ "Failed extracting query parameters");
+ goto out;
+ }
+
+ ret = ctr_db_query (this, priv->_db_conn, query_file,
+ ipc_ctr_params);
+
+ ret = dict_set_int32 (out_dict,
+ GFDB_IPC_CTR_RET_QUERY_COUNT, ret);
+ if (ret) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, CTR_MSG_SET,
+ "Failed setting query reply");
+ goto out;
+ }
+
+ } /* if its a query for db version */
+ else if (strncmp (ctr_ipc_ops, GFDB_IPC_CTR_GET_DB_VERSION_OPS,
+ strlen (GFDB_IPC_CTR_GET_DB_VERSION_OPS)) == 0) {
+
+ ret = get_db_version (priv->_db_conn, &db_version);
+ if (ret == -1 || !db_version) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, CTR_MSG_SET,
+ "Failed extracting db version ");
+ goto out;
+ }
+
+ SET_DB_PARAM_TO_DICT(this->name, out_dict,
+ GFDB_IPC_CTR_RET_DB_VERSION,
+ db_version, ret, error);
+
+ } /* if its a query for a db setting */
+ else if (strncmp (ctr_ipc_ops, GFDB_IPC_CTR_GET_DB_PARAM_OPS,
+ strlen (GFDB_IPC_CTR_GET_DB_PARAM_OPS)) == 0) {
+
+ ret = dict_get_str (in_dict, GFDB_IPC_CTR_GET_DB_KEY,
+ &db_param_key);
+ if (ret) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, CTR_MSG_SET,
+ "Failed extracting db param key");
+ goto out;
+ }
+
+ ret = get_db_setting (priv->_db_conn, db_param_key, &db_param);
+ if (ret == -1 || !db_param) {
+ goto out;
+ }
+
+ SET_DB_PARAM_TO_DICT(this->name, out_dict,
+ db_param_key,
+ db_param, ret, error);
+ } /* default case */
+ else {
+ goto out;
+ }
+
+
+ ret = 0;
+ goto out;
+error:
+ GF_FREE (db_param_key);
+ GF_FREE (db_param);
+ GF_FREE (db_version);
+out:
+ return ret;
+}
+
+
/* IPC Call from tier migrator to clear the heat on the DB */
int32_t
-ctr_ipc (call_frame_t *frame, xlator_t *this, int32_t op, dict_t *xdata)
+ctr_ipc (call_frame_t *frame, xlator_t *this, int32_t op,
+ dict_t *in_dict)
{
int ret = -1;
- gf_ctr_private_t *_priv = NULL;
+ gf_ctr_private_t *priv = NULL;
+ dict_t *out_dict = NULL;
GF_ASSERT(this);
- _priv = this->private;
- GF_ASSERT (_priv);
- GF_ASSERT(_priv->_db_conn);
+ priv = this->private;
+ GF_ASSERT (priv);
+ GF_ASSERT(priv->_db_conn);
+ GF_VALIDATE_OR_GOTO (this->name, in_dict, wind);
+
if (op != GF_IPC_TARGET_CTR)
goto wind;
- ret = clear_files_heat (_priv->_db_conn);
+ out_dict = dict_new();
+ if (!out_dict) {
+ goto out;
+ }
+
+ ret = ctr_ipc_helper (this, in_dict, out_dict);
+ if (ret) {
+ gf_msg(this->name, GF_LOG_ERROR, 0, CTR_MSG_SET,
+ "Failed in ctr_ipc_helper");
+ }
+out:
+
+ STACK_UNWIND_STRICT (ipc, frame, ret, 0, out_dict);
+
+ if (out_dict)
+ dict_unref(out_dict);
- STACK_UNWIND_STRICT (ipc, frame, ret, 0, NULL);
return 0;
wind:
STACK_WIND (frame, default_ipc_cbk, FIRST_CHILD (this),
- FIRST_CHILD (this)->fops->ipc, op, xdata);
+ FIRST_CHILD (this)->fops->ipc, op, in_dict);
+
+
+
return 0;
}
@@ -1416,35 +1674,35 @@ reconfigure (xlator_t *this, dict_t *options)
{
char *temp_str = NULL;
int ret = 0;
- gf_ctr_private_t *_priv = NULL;
+ gf_ctr_private_t *priv = NULL;
- _priv = this->private;
+ priv = this->private;
if (dict_get_str(options, "changetimerecorder.frequency",
&temp_str)) {
gf_msg(this->name, GF_LOG_INFO, 0, CTR_MSG_SET, "set!");
}
- GF_OPTION_RECONF ("ctr-enabled", _priv->enabled, options,
+ GF_OPTION_RECONF ("ctr-enabled", priv->enabled, options,
bool, out);
- GF_OPTION_RECONF ("record-counters", _priv->ctr_record_counter, options,
+ GF_OPTION_RECONF ("record-counters", priv->ctr_record_counter, options,
bool, out);
- GF_OPTION_RECONF ("ctr_link_consistency", _priv->ctr_link_consistency,
+ GF_OPTION_RECONF ("ctr_link_consistency", priv->ctr_link_consistency,
options, bool, out);
GF_OPTION_RECONF ("ctr_inode_heal_expire_period",
- _priv->ctr_inode_heal_expire_period,
+ priv->ctr_inode_heal_expire_period,
options, uint64, out);
GF_OPTION_RECONF ("ctr_hardlink_heal_expire_period",
- _priv->ctr_hardlink_heal_expire_period,
+ priv->ctr_hardlink_heal_expire_period,
options, uint64, out);
- GF_OPTION_RECONF ("record-exit", _priv->ctr_record_unwind, options,
+ GF_OPTION_RECONF ("record-exit", priv->ctr_record_unwind, options,
bool, out);
- GF_OPTION_RECONF ("record-entry", _priv->ctr_record_wind, options,
+ GF_OPTION_RECONF ("record-entry", priv->ctr_record_wind, options,
bool, out);
out:
@@ -1457,7 +1715,7 @@ out:
int32_t
init (xlator_t *this)
{
- gf_ctr_private_t *_priv = NULL;
+ gf_ctr_private_t *priv = NULL;
int ret_db = -1;
dict_t *params_dict = NULL;
@@ -1476,8 +1734,8 @@ init (xlator_t *this)
"dangling volume. check volfile ");
}
- _priv = GF_CALLOC (1, sizeof (*_priv), gf_ctr_mt_private_t);
- if (!_priv) {
+ priv = GF_CALLOC (1, sizeof (*priv), gf_ctr_mt_private_t);
+ if (!priv) {
gf_msg (this->name, GF_LOG_ERROR, ENOMEM,
CTR_MSG_CALLOC_FAILED,
"Calloc didnt work!!!");
@@ -1485,20 +1743,20 @@ init (xlator_t *this)
}
/*Default values for the translator*/
- _priv->ctr_record_wind = _gf_true;
- _priv->ctr_record_unwind = _gf_false;
- _priv->ctr_hot_brick = _gf_false;
- _priv->gfdb_db_type = GFDB_SQLITE3;
- _priv->gfdb_sync_type = GFDB_DB_SYNC;
- _priv->enabled = _gf_true;
- _priv->_db_conn = NULL;
- _priv->ctr_hardlink_heal_expire_period =
+ priv->ctr_record_wind = _gf_true;
+ priv->ctr_record_unwind = _gf_false;
+ priv->ctr_hot_brick = _gf_false;
+ priv->gfdb_db_type = GFDB_SQLITE3;
+ priv->gfdb_sync_type = GFDB_DB_SYNC;
+ priv->enabled = _gf_true;
+ priv->_db_conn = NULL;
+ priv->ctr_hardlink_heal_expire_period =
CTR_DEFAULT_HARDLINK_EXP_PERIOD;
- _priv->ctr_inode_heal_expire_period =
+ priv->ctr_inode_heal_expire_period =
CTR_DEFAULT_INODE_EXP_PERIOD;
/*Extract ctr xlator options*/
- ret_db = extract_ctr_options (this, _priv);
+ ret_db = extract_ctr_options (this, priv);
if (ret_db) {
gf_msg (this->name, GF_LOG_ERROR, 0,
CTR_MSG_EXTRACT_CTR_XLATOR_OPTIONS_FAILED,
@@ -1515,7 +1773,7 @@ init (xlator_t *this)
}
/*Extract db params options*/
- ret_db = extract_db_params(this, params_dict, _priv->gfdb_db_type);
+ ret_db = extract_db_params(this, params_dict, priv->gfdb_db_type);
if (ret_db) {
gf_msg (this->name, GF_LOG_ERROR, 0,
CTR_MSG_EXTRACT_DB_PARAM_OPTIONS_FAILED,
@@ -1533,8 +1791,8 @@ init (xlator_t *this)
}
/*Initialize Database Connection*/
- _priv->_db_conn = init_db(params_dict, _priv->gfdb_db_type);
- if (!_priv->_db_conn) {
+ priv->_db_conn = init_db(params_dict, priv->gfdb_db_type);
+ if (!priv->_db_conn) {
gf_msg (this->name, GF_LOG_ERROR, 0,
CTR_MSG_FATAL_ERROR,
"FATAL: Failed initializing data base");
@@ -1550,10 +1808,10 @@ error:
if (this)
mem_pool_destroy (this->local_pool);
- if (_priv) {
- GF_FREE (_priv->ctr_db_path);
+ if (priv) {
+ GF_FREE (priv->ctr_db_path);
}
- GF_FREE (_priv);
+ GF_FREE (priv);
if (params_dict)
dict_unref (params_dict);
@@ -1565,7 +1823,7 @@ out:
if (params_dict)
dict_unref (params_dict);
- this->private = (void *)_priv;
+ this->private = (void *)priv;
return 0;
}
diff --git a/xlators/features/changetimerecorder/src/ctr-helper.h b/xlators/features/changetimerecorder/src/ctr-helper.h
index d6521d26b3d..244427230b4 100644
--- a/xlators/features/changetimerecorder/src/ctr-helper.h
+++ b/xlators/features/changetimerecorder/src/ctr-helper.h
@@ -35,6 +35,13 @@
#define CTR_DEFAULT_HARDLINK_EXP_PERIOD 300 /* Five mins */
#define CTR_DEFAULT_INODE_EXP_PERIOD 300 /* Five mins */
+
+typedef struct ctr_query_cbk_args {
+ FILE *queryFILE;
+ int count;
+} ctr_query_cbk_args_t;
+
+
/*CTR Xlator Private structure*/
typedef struct gf_ctr_private {
gf_boolean_t enabled;