summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvmallika <vmallika@redhat.com>2015-03-17 20:05:19 +0530
committerVijay Bellur <vbellur@redhat.com>2015-03-17 21:07:04 -0700
commit7970183f4c730d4aca3cfaa106fde015a0fbc415 (patch)
treebe48e8390e3acc8468061f95785b943a14c43ced
parent33bb32ce5866a15e7d5164c67f214c4797236066 (diff)
Quota/marker : Support for inode quota
Currently, the only way to retrieve the number of files/objects in a directory or volume is to do a crawl of the entire directory/volume. This is expensive and is not scalable. The new mechanism proposes to store count of objects/files as part of an extended attribute of a directory. Each directory's extended attribute value will indicate the number of files/objects present in a tree with the directory being considered as the root of the tree. Currently file usage is accounted in marker by doing multiple FOPs like setting and getting xattrs. Doing this with STACK WIND and UNWIND can be harder to debug as involves multiple callbacks. In this code we are replacing current mechanism with syncop approach as syncop code is much simpler to follow and help us implement inode quota in an organized way. Change-Id: Ibf366fbe07037284e89a241ddaff7750fc8771b4 BUG: 1188636 Signed-off-by: vmallika <vmallika@redhat.com> Signed-off-by: Sachin Pandit <spandit@redhat.com> Reviewed-on: http://review.gluster.org/9567 Reviewed-by: Vijay Bellur <vbellur@redhat.com> Tested-by: Vijay Bellur <vbellur@redhat.com>
-rw-r--r--cli/src/cli-rpc-ops.c65
-rw-r--r--libglusterfs/src/glusterfs.h3
-rw-r--r--libglusterfs/src/syncop.c36
-rw-r--r--libglusterfs/src/syncop.h4
-rw-r--r--libglusterfs/src/xlator.c42
-rw-r--r--libglusterfs/src/xlator.h1
-rw-r--r--xlators/features/marker/src/marker-mem-types.h2
-rw-r--r--xlators/features/marker/src/marker-quota-helper.c50
-rw-r--r--xlators/features/marker/src/marker-quota-helper.h2
-rw-r--r--xlators/features/marker/src/marker-quota.c1723
-rw-r--r--xlators/features/marker/src/marker-quota.h62
-rw-r--r--xlators/features/marker/src/marker.c67
12 files changed, 1788 insertions, 269 deletions
diff --git a/cli/src/cli-rpc-ops.c b/cli/src/cli-rpc-ops.c
index b2964b68ff6..8ea43f824bc 100644
--- a/cli/src/cli-rpc-ops.c
+++ b/cli/src/cli-rpc-ops.c
@@ -2413,23 +2413,29 @@ static int
print_quota_list_output (cli_local_t *local, char *mountdir,
char *default_sl, char *path)
{
- int64_t used_space = 0;
- int64_t avail = 0;
- char *used_str = NULL;
- char *avail_str = NULL;
- int ret = -1;
- char *sl_final = NULL;
- char *hl_str = NULL;
- double sl_num = 0;
- gf_boolean_t sl = _gf_false;
- gf_boolean_t hl = _gf_false;
- char percent_str[20] = {0};
+ int64_t avail = 0;
+ char *used_str = NULL;
+ char *avail_str = NULL;
+ int ret = -1;
+ char *sl_final = NULL;
+ char *hl_str = NULL;
+ double sl_num = 0;
+ gf_boolean_t sl = _gf_false;
+ gf_boolean_t hl = _gf_false;
+ char percent_str[20] = {0};
+ ssize_t xattr_size = 0;
struct quota_limit {
int64_t hl;
int64_t sl;
} __attribute__ ((__packed__)) existing_limits;
+ struct quota_meta {
+ int64_t size;
+ int64_t file_count;
+ int64_t dir_count;
+ } __attribute__ ((__packed__)) used_space;
+
ret = sys_lgetxattr (mountdir, "trusted.glusterfs.quota.limit-set",
(void *)&existing_limits,
sizeof (existing_limits));
@@ -2490,10 +2496,26 @@ print_quota_list_output (cli_local_t *local, char *mountdir,
sl_final = percent_str;
}
- ret = sys_lgetxattr (mountdir, "trusted.glusterfs.quota.size",
- &used_space, sizeof (used_space));
+ used_space.size = used_space.file_count = used_space.dir_count = 0;
+ xattr_size = sys_lgetxattr (mountdir, "trusted.glusterfs.quota.size",
+ NULL, 0);
+ if (xattr_size > sizeof (int64_t)) {
+ ret = sys_lgetxattr (mountdir, "trusted.glusterfs.quota.size",
+ &used_space, sizeof (used_space));
+ } else if (xattr_size > 0) {
+ /* This is for compatibility.
+ * Older version had only file usage
+ */
+ ret = sys_lgetxattr (mountdir, "trusted.glusterfs.quota.size",
+ &(used_space.size), sizeof (used_space.size));
+ } else {
+ ret = -1;
+ }
if (ret < 0) {
+ gf_log ("cli", GF_LOG_ERROR, "Failed to get quota size "
+ "on path %s: %s", mountdir, strerror (errno));
+
if (global_state->mode & GLUSTER_MODE_XML) {
ret = cli_quota_xml_output (local, path, hl_str,
sl_final, "N/A",
@@ -2510,14 +2532,16 @@ print_quota_list_output (cli_local_t *local, char *mountdir,
"N/A", "N/A", "N/A", "N/A");
}
} else {
- used_space = ntoh64 (used_space);
+ used_space.size = ntoh64 (used_space.size);
+ used_space.file_count = ntoh64 (used_space.file_count);
+ used_space.dir_count = ntoh64 (used_space.dir_count);
- used_str = gf_uint64_2human_readable (used_space);
+ used_str = gf_uint64_2human_readable (used_space.size);
- if (existing_limits.hl > used_space) {
- avail = existing_limits.hl - used_space;
+ if (existing_limits.hl > used_space.size) {
+ avail = existing_limits.hl - used_space.size;
hl = _gf_false;
- if (used_space > sl_num)
+ if (used_space.size > sl_num)
sl = _gf_true;
else
sl = _gf_false;
@@ -2544,8 +2568,9 @@ print_quota_list_output (cli_local_t *local, char *mountdir,
if (used_str == NULL) {
cli_out ("%-40s %7s %9s %11"PRIu64
"%9"PRIu64" %15s %18s", path, hl_str,
- sl_final, used_space, avail, sl? "Yes" : "No",
- hl? "Yes" : "No");
+ sl_final, used_space.size, avail,
+ sl ? "Yes" : "No",
+ hl ? "Yes" : "No");
} else {
cli_out ("%-40s %7s %9s %11s %7s %15s %20s", path, hl_str,
sl_final, used_str, avail_str, sl? "Yes" : "No",
diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h
index 8d7659b5015..a810f3a81f0 100644
--- a/libglusterfs/src/glusterfs.h
+++ b/libglusterfs/src/glusterfs.h
@@ -128,11 +128,12 @@
#define GLUSTERFS_POSIXLK_COUNT "glusterfs.posixlk-count"
#define GLUSTERFS_PARENT_ENTRYLK "glusterfs.parent-entrylk"
#define GLUSTERFS_INODELK_DOM_COUNT "glusterfs.inodelk-dom-count"
-#define QUOTA_SIZE_KEY "trusted.glusterfs.quota.size"
#define GFID_TO_PATH_KEY "glusterfs.gfid2path"
#define GF_XATTR_STIME_PATTERN "trusted.glusterfs.*.stime"
#define GF_XATTR_TRIGGER_SYNC "glusterfs.geo-rep.trigger-sync"
+#define QUOTA_SIZE_KEY "trusted.glusterfs.quota.size"
+
/* Index xlator related */
#define GF_XATTROP_INDEX_GFID "glusterfs.xattrop_index_gfid"
#define GF_XATTROP_INDEX_COUNT "glusterfs.xattrop_index_count"
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c
index e3321cf6ddb..e241e2c1ee0 100644
--- a/libglusterfs/src/syncop.c
+++ b/libglusterfs/src/syncop.c
@@ -2537,3 +2537,39 @@ syncop_inodelk (xlator_t *subvol, const char *volume, loc_t *loc, int32_t cmd,
return args.op_ret;
}
+
+int32_t
+syncop_xattrop_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *dict,
+ dict_t *xdata)
+{
+ struct syncargs *args = NULL;
+
+ args = cookie;
+
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+
+ if (xdata)
+ args->xdata = dict_ref (xdata);
+
+ __wake (args);
+
+ return 0;
+
+}
+
+int
+syncop_xattrop (xlator_t *subvol, loc_t *loc, gf_xattrop_flags_t flags,
+ dict_t *dict, dict_t *xdata)
+{
+ struct syncargs args = {0, };
+
+ SYNCOP (subvol, (&args), syncop_xattrop_cbk, subvol->fops->xattrop,
+ loc, flags, dict, xdata);
+
+ if (args.op_ret < 0)
+ return -args.op_errno;
+
+ return args.op_ret;
+}
diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h
index 7f8ec7345b0..a9244a51552 100644
--- a/libglusterfs/src/syncop.h
+++ b/libglusterfs/src/syncop.h
@@ -439,4 +439,8 @@ syncop_inodelk (xlator_t *subvol, const char *volume, loc_t *loc, int32_t cmd,
int
syncop_ipc (xlator_t *subvol, int op, dict_t *xdata_in, dict_t **xdata_out);
+int
+syncop_xattrop (xlator_t *subvol, loc_t *loc, gf_xattrop_flags_t flags,
+ dict_t *dict, dict_t *xdata);
+
#endif /* _SYNCOP_H */
diff --git a/libglusterfs/src/xlator.c b/libglusterfs/src/xlator.c
index 49af7d2e0e6..cc4726e0ea5 100644
--- a/libglusterfs/src/xlator.c
+++ b/libglusterfs/src/xlator.c
@@ -860,9 +860,51 @@ loc_is_root (loc_t *loc)
} else if (loc && loc->inode && __is_root_gfid (loc->inode->gfid)) {
return _gf_true;
}
+
return _gf_false;
}
+int32_t
+loc_build_child (loc_t *child, loc_t *parent, char *name)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("xlator", child, out);
+ GF_VALIDATE_OR_GOTO ("xlator", parent, out);
+ GF_VALIDATE_OR_GOTO ("xlator", name, out);
+
+ loc_gfid (parent, child->pargfid);
+
+ if (strcmp (parent->path, "/") == 0)
+ ret = gf_asprintf ((char **)&child->path, "/%s", name);
+ else
+ ret = gf_asprintf ((char **)&child->path, "%s/%s", parent->path,
+ name);
+
+ if (ret < 0 || !child->path) {
+ ret = -1;
+ goto out;
+ }
+
+ child->name = strrchr (child->path, '/') + 1;
+
+ child->parent = inode_ref (parent->inode);
+ child->inode = inode_new (parent->inode->table);
+
+ if (!child->inode) {
+ ret = -1;
+ goto out;
+ }
+
+ ret = 0;
+
+out:
+ if ((ret < 0) && child)
+ loc_wipe (child);
+
+ return ret;
+}
+
int
xlator_destroy (xlator_t *xl)
{
diff --git a/libglusterfs/src/xlator.h b/libglusterfs/src/xlator.h
index e953ec04372..733f6cf47ab 100644
--- a/libglusterfs/src/xlator.h
+++ b/libglusterfs/src/xlator.h
@@ -960,6 +960,7 @@ int loc_path (loc_t *loc, const char *bname);
void loc_gfid (loc_t *loc, uuid_t gfid);
char* loc_gfid_utoa (loc_t *loc);
gf_boolean_t loc_is_root (loc_t *loc);
+int32_t loc_build_child (loc_t *child, loc_t *parent, char *name);
int xlator_mem_acct_init (xlator_t *xl, int num_types);
int is_gf_log_command (xlator_t *trans, const char *name, char *value);
int glusterd_check_log_level (const char *value);
diff --git a/xlators/features/marker/src/marker-mem-types.h b/xlators/features/marker/src/marker-mem-types.h
index 1f74d504897..dc5ad16ed76 100644
--- a/xlators/features/marker/src/marker-mem-types.h
+++ b/xlators/features/marker/src/marker-mem-types.h
@@ -20,6 +20,8 @@ enum gf_marker_mem_types_ {
gf_marker_mt_quota_inode_ctx_t,
gf_marker_mt_marker_inode_ctx_t,
gf_marker_mt_inode_contribution_t,
+ gf_marker_mt_quota_meta_t,
+ gf_marker_mt_quota_synctask_t,
gf_marker_mt_end
};
#endif
diff --git a/xlators/features/marker/src/marker-quota-helper.c b/xlators/features/marker/src/marker-quota-helper.c
index ec0d83316c7..cdc2475c3e8 100644
--- a/xlators/features/marker/src/marker-quota-helper.c
+++ b/xlators/features/marker/src/marker-quota-helper.c
@@ -37,23 +37,27 @@ mq_loc_fill (loc_t *loc, inode_t *inode, inode_t *parent, char *path)
if (parent)
loc->parent = inode_ref (parent);
+ if (!uuid_is_null (inode->gfid))
+ uuid_copy (loc->gfid, inode->gfid);
+
loc->path = gf_strdup (path);
if (!loc->path) {
gf_log ("loc fill", GF_LOG_ERROR, "strdup failed");
- goto loc_wipe;
+ goto out;
}
loc->name = strrchr (loc->path, '/');
if (loc->name)
loc->name++;
else
- goto loc_wipe;
+ goto out;
ret = 0;
-loc_wipe:
+
+out:
if (ret < 0)
loc_wipe (loc);
-out:
+
return ret;
}
@@ -222,37 +226,39 @@ mq_add_new_contribution_node (xlator_t *this, quota_inode_ctx_t *ctx,
int32_t
-mq_dict_set_contribution (xlator_t *this, dict_t *dict,
- loc_t *loc)
+mq_dict_set_contribution (xlator_t *this, dict_t *dict, loc_t *loc,
+ uuid_t gfid, char *contri_key)
{
- int32_t ret = -1;
- char contri_key [512] = {0, };
+ int32_t ret = -1;
+ char key[CONTRI_KEY_MAX] = {0, };
GF_VALIDATE_OR_GOTO ("marker", this, out);
GF_VALIDATE_OR_GOTO ("marker", dict, out);
GF_VALIDATE_OR_GOTO ("marker", loc, out);
- if (loc->parent) {
- GET_CONTRI_KEY (contri_key, loc->parent->gfid, ret);
- if (ret < 0) {
- ret = -1;
- goto out;
- }
+ if (gfid && !uuid_is_null(gfid)) {
+ GET_CONTRI_KEY (key, gfid, ret);
+ } else if (loc->parent) {
+ GET_CONTRI_KEY (key, loc->parent->gfid, ret);
} else {
/* nameless lookup, fetch contributions to all parents */
- GET_CONTRI_KEY (contri_key, NULL, ret);
+ GET_CONTRI_KEY (key, NULL, ret);
}
- ret = dict_set_int64 (dict, contri_key, 0);
- if (ret < 0) {
- gf_log (this->name, GF_LOG_WARNING,
- "unable to set dict value on %s.",
- loc->path);
+ if (ret < 0)
goto out;
- }
- ret = 0;
+ ret = dict_set_int64 (dict, key, 0);
+ if (ret < 0)
+ goto out;
+
+ if (contri_key)
+ strncpy (contri_key, key, CONTRI_KEY_MAX);
+
out:
+ if (ret < 0)
+ gf_log_callingfn (this->name, GF_LOG_ERROR, "dict set failed");
+
return ret;
}
diff --git a/xlators/features/marker/src/marker-quota-helper.h b/xlators/features/marker/src/marker-quota-helper.h
index b200413b0ad..161413debfa 100644
--- a/xlators/features/marker/src/marker-quota-helper.h
+++ b/xlators/features/marker/src/marker-quota-helper.h
@@ -44,7 +44,7 @@ inode_contribution_t *
mq_add_new_contribution_node (xlator_t *, quota_inode_ctx_t *, loc_t *);
int32_t
-mq_dict_set_contribution (xlator_t *, dict_t *, loc_t *);
+mq_dict_set_contribution (xlator_t *, dict_t *, loc_t *, uuid_t, char *);
quota_inode_ctx_t *
mq_inode_ctx_new (inode_t *, xlator_t *);
diff --git a/xlators/features/marker/src/marker-quota.c b/xlators/features/marker/src/marker-quota.c
index b8d7a774377..835108cc403 100644
--- a/xlators/features/marker/src/marker-quota.c
+++ b/xlators/features/marker/src/marker-quota.c
@@ -20,6 +20,7 @@
#include "byte-order.h"
#include "marker-quota.h"
#include "marker-quota-helper.h"
+#include "syncop.h"
int
mq_loc_copy (loc_t *dst, loc_t *src)
@@ -478,11 +479,11 @@ mq_get_child_contribution (call_frame_t *frame,
dict_t *dict,
struct iatt *postparent)
{
- int32_t ret = -1;
- int32_t val = 0;
- char contri_key [512] = {0, };
- int64_t *contri = NULL;
- quota_local_t *local = NULL;
+ int32_t ret = -1;
+ int32_t val = 0;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ int64_t *contri = NULL;
+ quota_local_t *local = NULL;
local = frame->local;
@@ -543,16 +544,16 @@ mq_readdir_cbk (call_frame_t *frame,
int32_t op_errno,
gf_dirent_t *entries, dict_t *xdata)
{
- char contri_key [512] = {0, };
- int32_t ret = 0;
- int32_t val = 0;
- off_t offset = 0;
- int32_t count = 0;
- dict_t *dict = NULL;
- quota_local_t *local = NULL;
- gf_dirent_t *entry = NULL;
- call_frame_t *newframe = NULL;
- loc_t loc = {0, };
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ int32_t ret = 0;
+ int32_t val = 0;
+ off_t offset = 0;
+ int32_t count = 0;
+ dict_t *dict = NULL;
+ quota_local_t *local = NULL;
+ gf_dirent_t *entry = NULL;
+ call_frame_t *newframe = NULL;
+ loc_t loc = {0, };
local = mq_local_ref (frame->local);
@@ -824,7 +825,10 @@ mq_get_dirty_xattr (call_frame_t *frame, void *cookie, xlator_t *this,
if (uuid_is_null (local->loc.gfid))
uuid_copy (local->loc.gfid, local->loc.inode->gfid);
- GF_UUID_ASSERT (local->loc.gfid);
+ if (uuid_is_null (local->loc.gfid)) {
+ ret = -1;
+ goto err;
+ }
STACK_WIND (frame,
mq_check_if_still_dirty,
@@ -850,14 +854,12 @@ err:
* 0 other wise
*/
int32_t
-mq_update_dirty_inode (xlator_t *this,
- loc_t *loc,
- quota_inode_ctx_t *ctx,
+mq_update_dirty_inode (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx,
inode_contribution_t *contribution)
{
int32_t ret = -1;
quota_local_t *local = NULL;
- gf_boolean_t status = _gf_false;
+ gf_boolean_t status = _gf_false;
struct gf_flock lock = {0, };
call_frame_t *frame = NULL;
@@ -1017,14 +1019,14 @@ err:
int32_t
mq_create_xattr (xlator_t *this, call_frame_t *frame)
{
- int32_t ret = 0;
- int64_t *value = NULL;
- int64_t *size = NULL;
- dict_t *dict = NULL;
- char key[512] = {0, };
- quota_local_t *local = NULL;
- quota_inode_ctx_t *ctx = NULL;
- inode_contribution_t *contri = NULL;
+ int32_t ret = 0;
+ int64_t *value = NULL;
+ int64_t *size = NULL;
+ dict_t *dict = NULL;
+ char key[CONTRI_KEY_MAX] = {0, };
+ quota_local_t *local = NULL;
+ quota_inode_ctx_t *ctx = NULL;
+ inode_contribution_t *contri = NULL;
if (frame == NULL || this == NULL)
return 0;
@@ -1070,7 +1072,10 @@ mq_create_xattr (xlator_t *this, call_frame_t *frame)
goto free_value;
}
- GF_UUID_ASSERT (local->loc.gfid);
+ if (uuid_is_null (local->loc.gfid)) {
+ ret = -1;
+ goto out;
+ }
STACK_WIND (frame, mq_create_dirty_xattr, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->xattrop, &local->loc,
@@ -1105,11 +1110,12 @@ mq_check_n_set_inode_xattr (call_frame_t *frame, void *cookie,
inode_t *inode, struct iatt *buf, dict_t *dict,
struct iatt *postparent)
{
- quota_local_t *local = NULL;
- int64_t *size = NULL, *contri = NULL;
- int8_t dirty = 0;
- int32_t ret = 0;
- char contri_key[512] = {0, };
+ quota_local_t *local = NULL;
+ int64_t *size = NULL;
+ int64_t *contri = NULL;
+ int8_t dirty = 0;
+ int32_t ret = 0;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
if (op_ret < 0) {
goto out;
@@ -1174,7 +1180,7 @@ mq_get_xattr (call_frame_t *frame, void *cookie, xlator_t *this,
goto err;
}
- ret = mq_req_xattr (this, &local->loc, xattr_req);
+ ret = mq_req_xattr (this, &local->loc, xattr_req, NULL);
if (ret < 0) {
gf_log (this->name, GF_LOG_WARNING, "cannot request xattr");
goto err;
@@ -1183,7 +1189,10 @@ mq_get_xattr (call_frame_t *frame, void *cookie, xlator_t *this,
if (uuid_is_null (local->loc.gfid))
uuid_copy (local->loc.gfid, local->loc.inode->gfid);
- GF_UUID_ASSERT (local->loc.gfid);
+ if (uuid_is_null (local->loc.gfid)) {
+ ret = -1;
+ goto err;
+ }
STACK_WIND (frame, mq_check_n_set_inode_xattr, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->lookup, &local->loc, xattr_req);
@@ -1634,15 +1643,17 @@ mq_update_inode_contribution (call_frame_t *frame, void *cookie,
struct iatt *buf, dict_t *dict,
struct iatt *postparent)
{
- int32_t ret = -1;
- int64_t *size = NULL, size_int = 0, contri_int = 0;
- int64_t *contri = NULL;
- int64_t *delta = NULL;
- char contri_key [512] = {0, };
- dict_t *newdict = NULL;
- quota_local_t *local = NULL;
- quota_inode_ctx_t *ctx = NULL;
- inode_contribution_t *contribution = NULL;
+ int32_t ret = -1;
+ int64_t *size = NULL;
+ int64_t size_int = 0;
+ int64_t contri_int = 0;
+ int64_t *contri = NULL;
+ int64_t *delta = NULL;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ dict_t *newdict = NULL;
+ quota_local_t *local = NULL;
+ quota_inode_ctx_t *ctx = NULL;
+ inode_contribution_t *contribution = NULL;
local = frame->local;
@@ -1760,11 +1771,11 @@ mq_fetch_child_size_and_contri (call_frame_t *frame, void *cookie,
xlator_t *this, int32_t op_ret,
int32_t op_errno, dict_t *xdata)
{
- int32_t ret = -1;
- char contri_key [512] = {0, };
- dict_t *newdict = NULL;
- quota_local_t *local = NULL;
- quota_inode_ctx_t *ctx = NULL;
+ int32_t ret = -1;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ dict_t *newdict = NULL;
+ quota_local_t *local = NULL;
+ quota_inode_ctx_t *ctx = NULL;
local = frame->local;
@@ -2024,19 +2035,1156 @@ err:
return -1;
}
+int32_t
+mq_dict_set_meta (dict_t *dict, char *key, const quota_meta_t *meta,
+ ia_type_t ia_type)
+{
+ int32_t ret = -1;
+ quota_meta_t *value = NULL;
+
+ QUOTA_ALLOC_OR_GOTO (value, quota_meta_t, ret, out);
+
+ value->size = hton64 (meta->size);
+ value->file_count = hton64 (meta->file_count);
+ value->dir_count = hton64 (meta->dir_count);
+
+ if (ia_type == IA_IFDIR) {
+ ret = dict_set_bin (dict, key, value, sizeof (*value));
+ } else {
+ /* For a file we don't need to store dir_count in the
+ * quota size xattr, so we set the len of the data in the dict
+ * as 128bits, so when the posix xattrop reads the dict, it only
+ * performs operations on size and file_count
+ */
+ ret = dict_set_bin (dict, key, value,
+ sizeof (*value) - sizeof (int64_t));
+ }
+
+ if (ret < 0) {
+ gf_log_callingfn ("marker", GF_LOG_ERROR, "dict set failed");
+ GF_FREE (value);
+ }
+
+out:
+ return ret;
+}
+
+int32_t
+mq_dict_get_meta (dict_t *dict, char *key, quota_meta_t *meta)
+{
+ int32_t ret = -1;
+ data_t *data = NULL;
+ quota_meta_t *value = NULL;
+
+ if (!dict || !key || !meta)
+ goto out;
+
+ data = dict_get (dict, key);
+ if (!data || !data->data)
+ goto out;
+
+ if (data->len > sizeof (int64_t)) {
+ value = (quota_meta_t *) data->data;
+ meta->size = ntoh64 (value->size);
+ meta->file_count = ntoh64 (value->file_count);
+ if (data->len > (sizeof (int64_t)) * 2)
+ meta->dir_count = ntoh64 (value->dir_count);
+ else
+ meta->dir_count = 0;
+ } else {
+ /* This can happen during software upgrade.
+ * Older version of glusterfs will not have inode count.
+ * Return failure, this will be healed as part of lookup
+ */
+ gf_log_callingfn ("marker", GF_LOG_DEBUG, "Object quota xattrs "
+ "missing: len = %d", data->len);
+ ret = -1;
+ goto out;
+ }
+
+ ret = 0;
+out:
+
+ return ret;
+}
+
+void
+mq_compute_delta (quota_meta_t *delta, const quota_meta_t *op1,
+ const quota_meta_t *op2)
+{
+ delta->size = op1->size - op2->size;
+ delta->file_count = op1->file_count - op2->file_count;
+ delta->dir_count = op1->dir_count - op2->dir_count;
+}
+
+void
+mq_add_meta (quota_meta_t *dst, const quota_meta_t *src)
+{
+ dst->size += src->size;
+ dst->file_count += src->file_count;
+ dst->dir_count += src->dir_count;
+}
+
+void
+mq_sub_meta (quota_meta_t *dst, const quota_meta_t *src)
+{
+ if (src == NULL) {
+ dst->size = -dst->size;
+ dst->file_count = -dst->file_count;
+ dst->dir_count = -dst->dir_count;
+ } else {
+ dst->size = src->size - dst->size;
+ dst->file_count = src->file_count - dst->file_count;
+ dst->dir_count = src->dir_count - dst->dir_count;
+ }
+}
+
+gf_boolean_t
+quota_meta_is_null (const quota_meta_t *meta)
+{
+ if (meta->size == 0 &&
+ meta->file_count == 0 &&
+ meta->dir_count == 0)
+ return _gf_true;
+
+ return _gf_false;
+}
+
+int32_t
+mq_are_xattrs_set (xlator_t *this, loc_t *loc, gf_boolean_t *result,
+ gf_boolean_t *objects)
+{
+ int32_t ret = -1;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ quota_meta_t meta = {0, };
+ struct iatt stbuf = {0,};
+ dict_t *dict = NULL;
+ dict_t *rsp_dict = NULL;
+
+ dict = dict_new ();
+ if (dict == NULL) {
+ gf_log (this->name, GF_LOG_ERROR, "dict_new failed");
+ goto out;
+ }
+
+ ret = mq_req_xattr (this, loc, dict, contri_key);
+ if (ret < 0)
+ goto out;
+
+ ret = syncop_lookup (FIRST_CHILD(this), loc, dict, &stbuf, &rsp_dict,
+ NULL);
+ if (ret < 0) {
+ gf_log (this->name, (-ret == ENOENT || -ret == ESTALE)
+ ? GF_LOG_DEBUG:GF_LOG_ERROR, "lookup failed "
+ "for %s: %s", loc->path, strerror (-ret));
+ goto out;
+ }
+
+ if (rsp_dict == NULL) {
+ *result = _gf_false;
+ goto out;
+ }
+
+ *result = _gf_true;
+ if (loc->inode->ia_type == IA_IFDIR) {
+ ret = mq_dict_get_meta (rsp_dict, QUOTA_SIZE_KEY, &meta);
+ if (ret < 0 || meta.dir_count == 0) {
+ ret = 0;
+ *result = _gf_false;
+ goto out;
+ }
+ *objects = _gf_true;
+ }
+
+ if (!loc_is_root(loc) && !dict_get (rsp_dict, contri_key)) {
+ *result = _gf_false;
+ goto out;
+ }
+
+out:
+ if (dict)
+ dict_unref (dict);
+
+ if (rsp_dict)
+ dict_unref (rsp_dict);
+
+ return ret;
+}
+
+int32_t
+mq_create_xattrs (xlator_t *this, loc_t *loc, gf_boolean_t objects)
+{
+ quota_meta_t size = {0, };
+ quota_meta_t contri = {0, };
+ int32_t ret = -1;
+ char key[CONTRI_KEY_MAX] = {0, };
+ dict_t *dict = NULL;
+ quota_inode_ctx_t *ctx = NULL;
+ inode_contribution_t *contribution = NULL;
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+
+ ret = mq_inode_ctx_get (loc->inode, this, &ctx);
+ if (ret < 0) {
+ ctx = mq_inode_ctx_new (loc->inode, this);
+ if (ctx == NULL) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "mq_inode_ctx_new failed");
+ ret = -1;
+ goto out;
+ }
+ }
+
+ dict = dict_new ();
+ if (!dict) {
+ gf_log (this->name, GF_LOG_ERROR, "dict_new failed");
+ ret = -1;
+ goto out;
+ }
+
+ if (loc->inode->ia_type == IA_IFDIR) {
+ if (objects == _gf_false) {
+ /* Initial object count of a directory is 1 */
+ size.dir_count = 1;
+ }
+ ret = mq_dict_set_meta (dict, QUOTA_SIZE_KEY, &size, IA_IFDIR);
+ if (ret < 0)
+ goto out;
+ }
+
+ if (!loc_is_root (loc)) {
+ contribution = mq_add_new_contribution_node (this, ctx, loc);
+ if (contribution == NULL) {
+ ret = -1;
+ goto out;
+ }
+
+ GET_CONTRI_KEY (key, contribution->gfid, ret);
+ ret = mq_dict_set_meta (dict, key, &contri,
+ loc->inode->ia_type);
+ if (ret < 0)
+ goto out;
+ }
+
+ ret = syncop_xattrop(FIRST_CHILD(this), loc, GF_XATTROP_ADD_ARRAY64,
+ dict, NULL);
+
+ if (ret < 0) {
+ gf_log (this->name, (-ret == ENOENT || -ret == ESTALE)
+ ? GF_LOG_DEBUG:GF_LOG_ERROR, "xattrop failed "
+ "for %s: %s", loc->path, strerror (-ret));
+ goto out;
+ }
+
+out:
+ if (dict)
+ dict_unref (dict);
+
+ return ret;
+}
+
+int32_t
+mq_lock (xlator_t *this, loc_t *loc, short l_type)
+{
+ struct gf_flock lock = {0, };
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+
+ gf_log (this->name, GF_LOG_DEBUG, "set lock type %d on %s",
+ l_type, loc->path);
+
+ lock.l_len = 0;
+ lock.l_start = 0;
+ lock.l_type = l_type;
+ lock.l_whence = SEEK_SET;
+
+ ret = syncop_inodelk (FIRST_CHILD(this), this->name, loc, F_SETLKW,
+ &lock, NULL, NULL);
+ if (ret < 0)
+ gf_log (this->name, (-ret == ENOENT || -ret == ESTALE)
+ ? GF_LOG_DEBUG:GF_LOG_ERROR, "inodelk failed "
+ "for %s: %s", loc->path, strerror (-ret));
+
+out:
+
+ return ret;
+}
+
+int32_t
+mq_get_dirty (xlator_t *this, loc_t *loc, int32_t *dirty)
+{
+ int32_t ret = -1;
+ int8_t value = 0;
+ dict_t *dict = NULL;
+ dict_t *rsp_dict = NULL;
+ struct iatt stbuf = {0,};
+
+ dict = dict_new ();
+ if (dict == NULL) {
+ gf_log (this->name, GF_LOG_ERROR, "dict_new failed");
+ goto out;
+ }
+
+ ret = dict_set_int64 (dict, QUOTA_DIRTY_KEY, 0);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_WARNING, "dict set failed");
+ goto out;
+ }
+
+ ret = syncop_lookup (FIRST_CHILD(this), loc, dict, &stbuf, &rsp_dict,
+ NULL);
+ if (ret < 0) {
+ gf_log (this->name, (-ret == ENOENT || -ret == ESTALE)
+ ? GF_LOG_DEBUG:GF_LOG_ERROR, "lookup failed "
+ "for %s: %s", loc->path, strerror (-ret));
+ goto out;
+ }
+
+ ret = dict_get_int8 (rsp_dict, QUOTA_DIRTY_KEY, &value);
+ if (ret < 0)
+ goto out;
+
+ *dirty = value;
+
+out:
+ if (dict)
+ dict_unref (dict);
+
+ if (rsp_dict)
+ dict_unref (rsp_dict);
+
+ return ret;
+}
+
+int32_t
+mq_mark_dirty (xlator_t *this, loc_t *loc, int32_t dirty)
+{
+ int32_t ret = -1;
+ dict_t *dict = NULL;
+ quota_inode_ctx_t *ctx = NULL;
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+
+ dict = dict_new ();
+ if (!dict) {
+ gf_log (this->name, GF_LOG_ERROR, "dict_new failed");
+ goto out;
+ }
+
+ ret = dict_set_int8 (dict, QUOTA_DIRTY_KEY, dirty);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "dict_set failed");
+ goto out;
+ }
+
+ ret = syncop_setxattr (FIRST_CHILD(this), loc, dict, 0);
+ if (ret < 0) {
+ gf_log (this->name, (-ret == ENOENT || -ret == ESTALE)
+ ? GF_LOG_DEBUG:GF_LOG_ERROR, "setxattr dirty = %d "
+ "failed for %s: %s", dirty, loc->path, strerror (-ret));
+ goto out;
+ }
+
+ ret = mq_inode_ctx_get (loc->inode, this, &ctx);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to get inode ctx for "
+ "%s", loc->path);
+ goto out;
+ }
+
+ LOCK (&ctx->lock);
+ {
+ ctx->dirty = dirty;
+ }
+ UNLOCK (&ctx->lock);
+
+out:
+ if (dict)
+ dict_unref (dict);
+
+ return ret;
+}
+
+int32_t
+_mq_get_metadata (xlator_t *this, loc_t *loc, quota_meta_t *contri,
+ quota_meta_t *size, uuid_t contri_gfid)
+{
+ int32_t ret = -1;
+ quota_meta_t meta = {0, };
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ dict_t *dict = NULL;
+ dict_t *rsp_dict = NULL;
+ struct iatt stbuf = {0,};
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+
+ if (size == NULL && contri == NULL)
+ goto out;
+
+ dict = dict_new ();
+ if (dict == NULL) {
+ gf_log (this->name, GF_LOG_ERROR, "dict_new failed");
+ goto out;
+ }
+
+ if (size && loc->inode->ia_type == IA_IFDIR) {
+ ret = dict_set_int64 (dict, QUOTA_SIZE_KEY, 0);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "dict_set failed.");
+ goto out;
+ }
+ }
+
+ if (contri && !loc_is_root(loc)) {
+ ret = mq_dict_set_contribution (this, dict, loc, contri_gfid,
+ contri_key);
+ if (ret < 0)
+ goto out;
+ }
+
+ ret = syncop_lookup (FIRST_CHILD(this), loc, dict, &stbuf, &rsp_dict,
+ NULL);
+ if (ret < 0) {
+ gf_log (this->name, (-ret == ENOENT || -ret == ESTALE)
+ ? GF_LOG_DEBUG:GF_LOG_ERROR, "lookup failed "
+ "for %s: %s", loc->path, strerror (-ret));
+ goto out;
+ }
+
+ if (size) {
+ if (loc->inode->ia_type == IA_IFDIR) {
+ ret = mq_dict_get_meta (rsp_dict, QUOTA_SIZE_KEY,
+ &meta);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "dict_get failed.");
+ goto out;
+ }
+
+ size->size = meta.size;
+ size->file_count = meta.file_count;
+ size->dir_count = meta.dir_count;
+ } else {
+ size->size = stbuf.ia_blocks * 512;
+ size->file_count = 1;
+ size->dir_count = 0;
+ }
+ }
+
+ if (contri && !loc_is_root(loc)) {
+ ret = mq_dict_get_meta (rsp_dict, contri_key, &meta);
+ if (ret < 0) {
+ contri->size = 0;
+ contri->file_count = 0;
+ contri->dir_count = 0;
+ } else {
+ contri->size = meta.size;
+ contri->file_count = meta.file_count;
+ contri->dir_count = meta.dir_count;
+ }
+ }
+
+ ret = 0;
+
+out:
+ if (dict)
+ dict_unref (dict);
+
+ if (rsp_dict)
+ dict_unref (rsp_dict);
+
+ return ret;
+}
+
+int32_t
+mq_get_metadata (xlator_t *this, loc_t *loc, quota_meta_t *contri,
+ quota_meta_t *size, quota_inode_ctx_t *ctx,
+ inode_contribution_t *contribution)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+ GF_VALIDATE_OR_GOTO ("marker", ctx, out);
+ GF_VALIDATE_OR_GOTO ("marker", contribution, out);
+
+ if (size == NULL && contri == NULL) {
+ ret = 0;
+ goto out;
+ }
+
+ ret = _mq_get_metadata (this, loc, contri, size, contribution->gfid);
+ if (ret < 0) {
+ gf_log_callingfn (this->name, GF_LOG_ERROR, "Failed to get "
+ "metadata for %s", loc->path);
+ goto out;
+ }
+
+ if (size) {
+ LOCK (&ctx->lock);
+ {
+ ctx->size = size->size;
+ ctx->file_count = size->file_count;
+ ctx->dir_count = size->dir_count;
+ }
+ UNLOCK (&ctx->lock);
+ }
+
+ if (contri) {
+ LOCK (&contribution->lock);
+ {
+ contribution->contribution = contri->size;
+ contribution->file_count = contri->file_count;
+ contribution->dir_count = contri->dir_count;
+ }
+ UNLOCK (&contribution->lock);
+ }
+
+out:
+ return ret;
+}
+
+int32_t
+mq_get_size (xlator_t *this, loc_t *loc, quota_meta_t *size)
+{
+ int32_t ret = -1;
+
+ ret = _mq_get_metadata (this, loc, NULL, size, 0);
+ if (ret < 0) {
+ gf_log_callingfn (this->name, GF_LOG_ERROR, "Failed to get "
+ "metadata for %s", loc->path);
+ goto out;
+ }
+
+out:
+ return ret;
+}
+
+int32_t
+mq_get_contri (xlator_t *this, loc_t *loc, quota_meta_t *contri,
+ uuid_t contri_gfid)
+{
+ int32_t ret = -1;
+
+ ret = _mq_get_metadata (this, loc, contri, NULL, contri_gfid);
+ if (ret < 0) {
+ gf_log_callingfn (this->name, GF_LOG_ERROR, "Failed to get "
+ "metadata for %s", loc->path);
+ goto out;
+ }
+
+out:
+ return ret;
+}
+
+int32_t
+mq_get_delta (xlator_t *this, loc_t *loc, quota_meta_t *delta,
+ quota_inode_ctx_t *ctx, inode_contribution_t *contribution)
+{
+ int32_t ret = -1;
+ quota_meta_t size = {0, };
+ quota_meta_t contri = {0, };
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+ GF_VALIDATE_OR_GOTO ("marker", ctx, out);
+ GF_VALIDATE_OR_GOTO ("marker", contribution, out);
+
+ ret = mq_get_metadata (this, loc, &contri, &size, ctx, contribution);
+ if (ret < 0)
+ goto out;
+
+ mq_compute_delta (delta, &size, &contri);
+
+out:
+ return ret;
+}
+
+int32_t
+mq_remove_contri (xlator_t *this, loc_t *loc, inode_contribution_t *contri)
+{
+ int32_t ret = -1;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+
+ GET_CONTRI_KEY (contri_key, contri->gfid, ret);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "get contri_key "
+ "failed for %s", uuid_utoa(contri->gfid));
+ goto out;
+ }
+
+ ret = syncop_removexattr (FIRST_CHILD(this), loc, contri_key, 0);
+ if (ret < 0) {
+ if (-ret == ENOENT || -ret == ESTALE) {
+ /* Remove contri in done when unlink operation is
+ * performed, so return success on ENOENT/ESTSLE
+ */
+ ret = 0;
+ } else {
+ gf_log (this->name, GF_LOG_ERROR, "removexattr %s "
+ "failed for %s: %s", contri_key, loc->path,
+ strerror (-ret));
+ goto out;
+ }
+ }
+
+ LOCK (&contri->lock);
+ {
+ contri->contribution = 0;
+ contri->file_count = 0;
+ contri->dir_count = 0;
+ }
+ UNLOCK (&contri->lock);
+
+ ret = 0;
+out:
+
+ return ret;
+}
+
+int32_t
+mq_update_contri (xlator_t *this, loc_t *loc, inode_contribution_t *contri,
+ quota_meta_t *delta)
+{
+ int32_t ret = -1;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ dict_t *dict = NULL;
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+ GF_VALIDATE_OR_GOTO ("marker", delta, out);
+ GF_VALIDATE_OR_GOTO ("marker", contri, out);
+
+ if (quota_meta_is_null (delta)) {
+ ret = 0;
+ goto out;
+ }
+
+ dict = dict_new ();
+ if (!dict) {
+ gf_log (this->name, GF_LOG_ERROR, "dict_new failed");
+ ret = -1;
+ goto out;
+ }
+
+ GET_CONTRI_KEY (contri_key, contri->gfid, ret);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "get contri_key "
+ "failed for %s", uuid_utoa(contri->gfid));
+ goto out;
+ }
+
+ ret = mq_dict_set_meta (dict, contri_key, delta, loc->inode->ia_type);
+ if (ret < 0)
+ goto out;
+
+ ret = syncop_xattrop(FIRST_CHILD(this), loc, GF_XATTROP_ADD_ARRAY64,
+ dict, NULL);
+ if (ret < 0) {
+ gf_log (this->name, (-ret == ENOENT || -ret == ESTALE)
+ ? GF_LOG_DEBUG:GF_LOG_ERROR, "xattrop failed "
+ "for %s: %s", loc->path, strerror (-ret));
+ goto out;
+ }
+
+ LOCK (&contri->lock);
+ {
+ contri->contribution += delta->size;
+ contri->file_count += delta->file_count;
+ contri->dir_count += delta->dir_count;
+ }
+ UNLOCK (&contri->lock);
+
+out:
+ if (dict)
+ dict_unref (dict);
+
+ return ret;
+}
+
+int32_t
+mq_update_size (xlator_t *this, loc_t *loc, quota_meta_t *delta)
+{
+ int32_t ret = -1;
+ quota_inode_ctx_t *ctx = NULL;
+ dict_t *dict = NULL;
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+ GF_VALIDATE_OR_GOTO ("marker", delta, out);
+
+ if (quota_meta_is_null (delta)) {
+ ret = 0;
+ goto out;
+ }
+
+ ret = mq_inode_ctx_get (loc->inode, this, &ctx);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to get inode ctx for "
+ "%s", loc->path);
+ goto out;
+ }
+
+ dict = dict_new ();
+ if (!dict) {
+ gf_log (this->name, GF_LOG_ERROR, "dict_new failed");
+ ret = -1;
+ goto out;
+ }
+
+ ret = mq_dict_set_meta (dict, QUOTA_SIZE_KEY, delta,
+ loc->inode->ia_type);
+ if (ret < 0)
+ goto out;
+
+ ret = syncop_xattrop(FIRST_CHILD(this), loc, GF_XATTROP_ADD_ARRAY64,
+ dict, NULL);
+ if (ret < 0) {
+ gf_log (this->name, (-ret == ENOENT || -ret == ESTALE)
+ ? GF_LOG_DEBUG:GF_LOG_ERROR, "xattrop failed "
+ "for %s: %s", loc->path, strerror (-ret));
+ goto out;
+ }
+
+ LOCK (&ctx->lock);
+ {
+ ctx->size += delta->size;
+ ctx->file_count += delta->file_count;
+ ctx->dir_count += delta->dir_count;
+ }
+ UNLOCK (&ctx->lock);
+
+out:
+ if (dict)
+ dict_unref (dict);
+
+ return ret;
+}
int
-mq_initiate_quota_txn (xlator_t *this, loc_t *loc)
+mq_synctask_cleanup (int ret, call_frame_t *frame, void *opaque)
{
- int32_t ret = -1;
- gf_boolean_t status = _gf_false;
- quota_inode_ctx_t *ctx = NULL;
- inode_contribution_t *contribution = NULL;
+ quota_synctask_t *args = NULL;
+
+ GF_ASSERT (opaque);
+
+ args = (quota_synctask_t *) opaque;
+ loc_wipe (&args->loc);
+ if (args->dict)
+ dict_unref (args->dict);
+
+ if (!args->is_static)
+ GF_FREE (args);
+
+ return 0;
+}
+
+int
+mq_synctask (xlator_t *this, synctask_fn_t task, gf_boolean_t spawn, loc_t *loc,
+ dict_t *dict, struct iatt *buf, int64_t contri)
+{
+ int32_t ret = -1;
+ quota_synctask_t *args = NULL;
+ quota_synctask_t static_args = {0, };
+
+ if (spawn) {
+ QUOTA_ALLOC_OR_GOTO (args, quota_synctask_t, ret, out);
+ args->is_static = _gf_false;
+ } else {
+ args = &static_args;
+ args->is_static = _gf_true;
+ }
+
+ args->this = this;
+ loc_copy (&args->loc, loc);
+ args->contri = contri;
+ if (dict)
+ args->dict = dict_ref (dict);
+ if (buf)
+ args->buf = *buf;
+
+
+ if (spawn) {
+ ret = synctask_new (this->ctx->env, task, mq_synctask_cleanup,
+ NULL, args);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to spawn "
+ "new synctask");
+ mq_synctask_cleanup (ret, NULL, args);
+ }
+ } else {
+ ret = task (args);
+ mq_synctask_cleanup (ret, NULL, args);
+ }
+
+out:
+ return ret;
+}
+
+int
+mq_start_quota_txn_v2 (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx,
+ inode_contribution_t *contri)
+{
+ int32_t ret = -1;
+ loc_t child_loc = {0,};
+ loc_t parent_loc = {0,};
+ gf_boolean_t locked = _gf_false;
+ gf_boolean_t dirty = _gf_false;
+ gf_boolean_t status = _gf_true;
+ quota_meta_t delta = {0, };
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+ GF_VALIDATE_OR_GOTO ("marker", ctx, out);
+ GF_VALIDATE_OR_GOTO ("marker", contri, out);
+
+ ret = mq_loc_copy (&child_loc, loc);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "loc copy failed");
+ goto out;
+ }
+
+ if (uuid_is_null (child_loc.gfid))
+ uuid_copy (child_loc.gfid, child_loc.inode->gfid);
+
+ if (uuid_is_null (child_loc.gfid)) {
+ ret = -1;
+ gf_log (this->name, GF_LOG_DEBUG, "UUID is null for %s",
+ child_loc.path);
+ goto out;
+ }
+
+ while (!__is_root_gfid (child_loc.gfid)) {
+ /* To improve performance, abort current transaction
+ * if one is already in progress for same inode
+ */
+ ret = mq_test_and_set_ctx_updation_status (ctx, &status);
+ if (ret < 0 || status == _gf_true)
+ goto out;
+
+ ret = mq_inode_loc_fill (NULL, child_loc.parent, &parent_loc);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "loc fill failed");
+ goto out;
+ }
+
+ ret = mq_lock (this, &parent_loc, F_WRLCK);
+ if (ret < 0)
+ goto out;
+ locked = _gf_true;
+
+ mq_set_ctx_updation_status (ctx, _gf_false);
+ status = _gf_true;
+
+ ret = mq_get_delta (this, &child_loc, &delta, ctx, contri);
+ if (ret < 0)
+ goto out;
+
+ if (quota_meta_is_null (&delta))
+ goto out;
+
+ ret = mq_mark_dirty (this, &parent_loc, 1);
+ if (ret < 0)
+ goto out;
+ dirty = _gf_true;
+
+ ret = mq_update_contri (this, &child_loc, contri, &delta);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "contri "
+ "update failed for %s", child_loc.path);
+ goto out;
+ }
+
+ ret = mq_update_size (this, &parent_loc, &delta);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_WARNING, "rollback "
+ "contri updation");
+ mq_sub_meta (&delta, NULL);
+ mq_update_contri (this, &child_loc, contri, &delta);
+ goto out;
+ }
+
+ ret = mq_mark_dirty (this, &parent_loc, 0);
+ dirty = _gf_false;
+
+ ret = mq_lock (this, &parent_loc, F_UNLCK);
+ locked = _gf_false;
+
+ if (__is_root_gfid (parent_loc.gfid))
+ break;
+
+ /* Repeate above steps upwards till the root */
+ loc_wipe (&child_loc);
+ ret = mq_loc_copy (&child_loc, &parent_loc);
+ if (ret < 0)
+ goto out;
+ loc_wipe (&parent_loc);
+
+ ret = mq_inode_ctx_get (child_loc.inode, this, &ctx);
+ if (ret < 0)
+ goto out;
+
+ if (list_empty (&ctx->contribution_head)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "contribution node list is empty (%s)",
+ uuid_utoa(child_loc.inode->gfid));
+ ret = -1;
+ goto out;
+ }
+ contri = mq_get_contribution_node (child_loc.parent, ctx);
+ GF_ASSERT (contri != NULL);
+ }
+
+out:
+ if (ret >= 0 && dirty)
+ ret = mq_mark_dirty (this, &parent_loc, 0);
+
+ if (locked)
+ ret = mq_lock (this, &parent_loc, F_UNLCK);
+
+ if (status == _gf_false)
+ mq_set_ctx_updation_status (ctx, _gf_false);
+
+ loc_wipe (&child_loc);
+ loc_wipe (&parent_loc);
+
+ return ret;
+}
+
+int
+mq_create_xattrs_task (void *opaque)
+{
+ int32_t ret = -1;
+ gf_boolean_t locked = _gf_false;
+ gf_boolean_t xattrs_set = _gf_false;
+ gf_boolean_t objects = _gf_false;
+ gf_boolean_t need_txn = _gf_false;
+ quota_synctask_t *args = NULL;
+ xlator_t *this = NULL;
+ loc_t *loc = NULL;
+
+ GF_ASSERT (opaque);
+
+ args = (quota_synctask_t *) opaque;
+ loc = &args->loc;
+ this = args->this;
+ THIS = this;
+
+ if (uuid_is_null (loc->gfid))
+ uuid_copy (loc->gfid, loc->inode->gfid);
+
+ if (uuid_is_null (loc->gfid)) {
+ ret = -1;
+ gf_log (this->name, GF_LOG_DEBUG, "UUID is null for %s",
+ loc->path);
+ goto out;
+ }
+
+ ret = mq_lock (this, loc, F_WRLCK);
+ if (ret < 0)
+ goto out;
+ locked = _gf_true;
+
+ ret = mq_are_xattrs_set (this, loc, &xattrs_set, &objects);
+ if (ret < 0 || xattrs_set)
+ goto out;
+
+ ret = mq_create_xattrs (this, loc, objects);
+ if (ret < 0)
+ goto out;
+
+ need_txn = _gf_true;
+out:
+ if (locked)
+ ret = mq_lock (this, loc, F_UNLCK);
+
+ if (need_txn)
+ ret = mq_initiate_quota_blocking_txn (this, loc);
+
+ return ret;
+}
+
+int
+mq_create_xattrs_txn (xlator_t *this, loc_t *loc)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+
+ ret = mq_synctask (this, mq_create_xattrs_task, _gf_true, loc, NULL,
+ NULL, 0);
+out:
+ return ret;
+}
+
+int
+mq_create_xattrs_blocking_txn (xlator_t *this, loc_t *loc)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+
+ ret = mq_synctask (this, mq_create_xattrs_task, _gf_false, loc, NULL,
+ NULL, 0);
+out:
+ return ret;
+}
+
+int32_t
+mq_reduce_parent_size_task (void *opaque)
+{
+ int32_t ret = -1;
+ quota_inode_ctx_t *ctx = NULL;
+ inode_contribution_t *contribution = NULL;
+ quota_meta_t delta = {0, };
+ loc_t parent_loc = {0,};
+ gf_boolean_t locked = _gf_false;
+ gf_boolean_t dirty = _gf_false;
+ quota_synctask_t *args = NULL;
+ xlator_t *this = NULL;
+ loc_t *loc = NULL;
+ int64_t contri = 0;
+
+ GF_ASSERT (opaque);
+
+ args = (quota_synctask_t *) opaque;
+ loc = &args->loc;
+ contri = args->contri;
+ this = args->this;
+ THIS = this;
+
+ ret = mq_inode_ctx_get (loc->inode, this, &ctx);
+ if (ret < 0) {
+ gf_log_callingfn (this->name, GF_LOG_WARNING, "ctx for"
+ " the node %s is NULL", loc->path);
+ goto out;
+ }
+
+ contribution = mq_get_contribution_node (loc->parent, ctx);
+ if (contribution == NULL) {
+ ret = -1;
+ gf_log_callingfn (this->name, GF_LOG_WARNING,
+ "contribution for the node %s is NULL",
+ loc->path);
+ goto out;
+ }
+
+ if (contri >= 0) {
+ /* contri paramater is supplied only for rename operation */
+ delta.size = contri;
+ delta.file_count = 1;
+ delta.dir_count = 0;
+ }
+
+ ret = mq_inode_loc_fill (NULL, loc->parent, &parent_loc);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "loc fill failed");
+ goto out;
+ }
+
+ ret = mq_lock (this, &parent_loc, F_WRLCK);
+ if (ret < 0)
+ goto out;
+ locked = _gf_true;
+
+ if (contri < 0) {
+ LOCK (&contribution->lock);
+ {
+ delta.size = contribution->contribution;
+ delta.file_count = contribution->file_count;
+ delta.dir_count = contribution->dir_count;
+ }
+ UNLOCK (&contribution->lock);
+ }
+
+ /* TODO: Handle handlinks with better approach
+ Iterating dentry_list without a lock is not a good idea
+ if (loc->inode->ia_type != IA_IFDIR) {
+ list_for_each_entry (dentry, &inode->dentry_list, inode_list) {
+ if (loc->parent == dentry->parent) {
+ * If the file has another link within the same
+ * directory, we should not be reducing the size
+ * of parent
+ *
+ delta = 0;
+ idelta = 0;
+ break;
+ }
+ }
+ }
+ */
+
+ if (quota_meta_is_null (&delta))
+ goto out;
+
+ ret = mq_mark_dirty (this, &parent_loc, 1);
+ if (ret < 0)
+ goto out;
+ dirty = _gf_true;
+
+ ret = mq_remove_contri (this, loc, contribution);
+ if (ret < 0)
+ goto out;
+
+ mq_sub_meta (&delta, NULL);
+ ret = mq_update_size (this, &parent_loc, &delta);
+ if (ret < 0)
+ goto out;
+
+out:
+ if (dirty && ret >= 0)
+ ret = mq_mark_dirty (this, &parent_loc, 0);
+
+ if (locked)
+ ret = mq_lock (this, &parent_loc, F_UNLCK);
+
+ if (ret >= 0)
+ ret = mq_initiate_quota_blocking_txn (this, &parent_loc);
+
+ loc_wipe (&parent_loc);
+
+ return ret;
+}
+
+int32_t
+mq_reduce_parent_size_txn (xlator_t *this, loc_t *loc, int64_t contri)
+{
+ int32_t ret = -1;
GF_VALIDATE_OR_GOTO ("marker", this, out);
GF_VALIDATE_OR_GOTO ("marker", loc, out);
GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+ ret = mq_synctask (this, mq_reduce_parent_size_task, _gf_true, loc,
+ NULL, NULL, contri);
+out:
+ return ret;
+}
+
+int
+mq_initiate_quota_task (void *opaque)
+{
+ int32_t ret = -1;
+ quota_inode_ctx_t *ctx = NULL;
+ inode_contribution_t *contribution = NULL;
+ quota_synctask_t *args = NULL;
+ xlator_t *this = NULL;
+ loc_t *loc = NULL;
+
+ GF_ASSERT (opaque);
+
+ args = (quota_synctask_t *) opaque;
+ loc = &args->loc;
+ this = args->this;
+ THIS = this;
+
ret = mq_inode_ctx_get (loc->inode, this, &ctx);
if (ret == -1) {
gf_log (this->name, GF_LOG_WARNING,
@@ -2057,69 +3205,259 @@ mq_initiate_quota_txn (xlator_t *this, loc_t *loc)
*/
contribution = mq_get_contribution_node (loc->parent, ctx);
if (!contribution) {
- if ((loc->path && strcmp (loc->path, "/"))
- || (!uuid_is_null (loc->gfid)
- && !__is_root_gfid (loc->gfid))
- || (loc->inode && !uuid_is_null (loc->inode->gfid)
- && !__is_root_gfid (loc->inode->gfid)))
+ if (!loc_is_root(loc))
gf_log_callingfn (this->name, GF_LOG_TRACE,
"contribution node for the "
"path (%s) with parent (%s) "
"not found", loc->path,
- loc->parent?
- uuid_utoa (loc->parent->gfid):
+ loc->parent ?
+ uuid_utoa (loc->parent->gfid) :
NULL);
contribution = mq_add_new_contribution_node (this, ctx, loc);
if (!contribution) {
- if(loc->path && strcmp (loc->path, "/"))
+ if (!loc_is_root(loc))
gf_log_callingfn (this->name, GF_LOG_WARNING,
"could not allocate "
" contribution node for (%s) "
"parent: (%s)", loc->path,
- loc->parent?
- uuid_utoa (loc->parent->gfid):
+ loc->parent ?
+ uuid_utoa (loc->parent->gfid) :
NULL);
goto out;
}
}
- /* To improve performance, do not start another transaction
- * if one is already in progress for same inode
- */
- status = _gf_true;
+ mq_start_quota_txn_v2 (this, loc, ctx, contribution);
+
+ ret = 0;
+out:
+ return ret;
+}
+
+int
+mq_initiate_quota_txn (xlator_t *this, loc_t *loc)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("marker", this, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+
+ ret = mq_synctask (this, mq_initiate_quota_task, _gf_true, loc, NULL,
+ NULL, 0);
+out:
+ return ret;
+}
+
+int
+mq_initiate_quota_blocking_txn (xlator_t *this, loc_t *loc)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("marker", this, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+
+ ret = mq_synctask (this, mq_initiate_quota_task, _gf_false, loc, NULL,
+ NULL, 0);
+out:
+ return ret;
+}
+
+/* return 1 when dirty updation is performed
+ * return 0 other wise
+ */
+int32_t
+mq_update_dirty_inode_v2 (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx,
+ inode_contribution_t *contribution)
+{
+ int32_t ret = -1;
+ fd_t *fd = NULL;
+ off_t offset = 0;
+ loc_t child_loc = {0, };
+ gf_dirent_t entries;
+ gf_dirent_t *entry = NULL;
+ gf_boolean_t status = _gf_true;
+ gf_boolean_t locked = _gf_false;
+ gf_boolean_t free_entries = _gf_false;
+ gf_boolean_t updated = _gf_false;
+ int32_t dirty = 0;
+ quota_meta_t contri = {0, };
+ quota_meta_t size = {0, };
+ quota_meta_t contri_sum = {0, };
+ quota_meta_t delta = {0, };
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+
+ ret = mq_get_ctx_updation_status (ctx, &status);
+ if (ret == -1 || status == _gf_true) {
+ ret = 0;
+ goto out;
+ }
+
+ if (uuid_is_null (loc->gfid))
+ uuid_copy (loc->gfid, loc->inode->gfid);
- ret = mq_test_and_set_ctx_updation_status (ctx, &status);
+ if (uuid_is_null (loc->gfid)) {
+ ret = -1;
+ gf_log (this->name, GF_LOG_DEBUG, "UUID is null for %s",
+ loc->path);
+ goto out;
+ }
+
+ ret = mq_lock (this, loc, F_WRLCK);
if (ret < 0)
goto out;
+ locked = _gf_true;
- if (status == _gf_false) {
- mq_start_quota_txn (this, loc, ctx, contribution);
+ ret = mq_get_dirty (this, loc, &dirty);
+ if (ret < 0 || dirty == 0) {
+ ret = 0;
+ goto out;
}
- ret = 0;
+ fd = fd_create (loc->inode, 0);
+ if (!fd) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to create fd");
+ ret = -1;
+ goto out;
+ }
+
+ ret = syncop_opendir (this, loc, fd);
+ if (ret < 0) {
+ gf_log (this->name, (-ret == ENOENT || -ret == ESTALE)
+ ? GF_LOG_DEBUG:GF_LOG_ERROR, "opendir failed "
+ "for %s: %s", loc->path, strerror (-ret));
+ goto out;
+ }
+
+ INIT_LIST_HEAD (&entries.list);
+ while ((ret = syncop_readdirp (this, fd, 131072, offset, NULL,
+ &entries)) != 0) {
+ if (ret < 0) {
+ gf_log (this->name, (-ret == ENOENT || -ret == ESTALE)
+ ? GF_LOG_DEBUG:GF_LOG_ERROR, "readdirp failed "
+ "for %s: %s", loc->path, strerror (-ret));
+ goto out;
+ }
+
+ if (list_empty (&entries.list))
+ break;
+
+ free_entries = _gf_true;
+ list_for_each_entry (entry, &entries.list, list) {
+ offset = entry->d_off;
+
+ if (!strcmp (entry->d_name, ".") ||
+ !strcmp (entry->d_name, ".."))
+ continue;
+
+ ret = loc_build_child (&child_loc, loc, entry->d_name);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "Couldn't build loc for %s/%s "
+ "returning from updation of dirty "
+ "inode", loc->path, entry->d_name);
+ goto out;
+ }
+
+ ret = mq_get_contri (this, &child_loc, &contri,
+ loc->gfid);
+ if (ret < 0)
+ goto out;
+
+ mq_add_meta (&contri_sum, &contri);
+ loc_wipe (&child_loc);
+ }
+
+ gf_dirent_free (&entries);
+ free_entries = _gf_false;
+ }
+ /* Inculde for self */
+ contri_sum.dir_count++;
+
+ ret = mq_get_size (this, loc, &size);
+ if (ret < 0)
+ goto out;
+
+ mq_compute_delta (&delta, &contri_sum, &size);
+
+ if (quota_meta_is_null (&delta))
+ goto out;
+
+ gf_log (this->name, GF_LOG_INFO, "calculated size = %"PRId64
+ ", original size = %"PRIu64 ", diff = %"PRIu64
+ ", path = %s ", contri_sum.size, size.size, delta.size,
+ loc->path);
+
+ gf_log (this->name, GF_LOG_INFO, "calculated f_count = %"PRId64
+ ", original f_count = %"PRIu64 ", diff = %"PRIu64
+ ", path = %s ", contri_sum.file_count, size.file_count,
+ delta.file_count, loc->path);
+
+ gf_log (this->name, GF_LOG_INFO, "calculated d_count = %"PRId64
+ ", original d_count = %"PRIu64 ", diff = %"PRIu64
+ ", path = %s ", contri_sum.dir_count, size.dir_count,
+ delta.dir_count, loc->path);
+
+
+ ret = mq_update_size (this, loc, &delta);
+ if (ret < 0)
+ goto out;
+
+ updated = _gf_true;
+
out:
- return ret;
-}
+ if (free_entries)
+ gf_dirent_free (&entries);
+ if (fd)
+ fd_unref (fd);
+
+ if (ret >= 0 && dirty)
+ mq_mark_dirty (this, loc, 0);
+
+ if (locked)
+ mq_lock (this, loc, F_UNLCK);
+ if (status == _gf_false)
+ mq_set_ctx_updation_status (ctx, _gf_false);
+ loc_wipe(&child_loc);
+ if (updated)
+ return 1;
+ else
+ return 0;
+}
int32_t
-mq_inspect_directory_xattr (xlator_t *this,
- loc_t *loc,
- dict_t *dict,
- struct iatt buf)
+mq_inspect_directory_xattr_task (void *opaque)
{
- int32_t ret = 0;
- int8_t dirty = -1;
- int64_t *size = NULL, size_int = 0;
- int64_t *contri = NULL, contri_int = 0;
- char contri_key [512] = {0, };
- gf_boolean_t not_root = _gf_false;
- quota_inode_ctx_t *ctx = NULL;
- inode_contribution_t *contribution = NULL;
+ int32_t ret = 0;
+ int8_t dirty = -1;
+ quota_meta_t size = {0, };
+ quota_meta_t contri = {0, };
+ quota_meta_t delta = {0, };
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ quota_inode_ctx_t *ctx = NULL;
+ inode_contribution_t *contribution = NULL;
+ quota_synctask_t *args = NULL;
+ xlator_t *this = NULL;
+ loc_t *loc = NULL;
+ dict_t *dict = NULL;
+ struct iatt buf = {0,};
+
+ GF_ASSERT (opaque);
+
+ args = (quota_synctask_t *) opaque;
+ loc = &args->loc;
+ dict = args->dict;
+ buf = args->buf;
+ this = args->this;
+ THIS = this;
ret = mq_inode_ctx_get (loc->inode, this, &ctx);
if (ret < 0) {
@@ -2132,7 +3470,7 @@ mq_inspect_directory_xattr (xlator_t *this,
}
}
- if (!loc->path || (loc->path && strcmp (loc->path, "/") != 0)) {
+ if (!loc_is_root(loc)) {
contribution = mq_add_new_contribution_node (this, ctx, loc);
if (contribution == NULL) {
if (!uuid_is_null (loc->inode->gfid))
@@ -2144,76 +3482,96 @@ mq_inspect_directory_xattr (xlator_t *this,
}
}
- ret = dict_get_bin (dict, QUOTA_SIZE_KEY, (void **) &size);
+ ret = dict_get_int8 (dict, QUOTA_DIRTY_KEY, &dirty);
if (ret < 0)
goto out;
- ret = dict_get_int8 (dict, QUOTA_DIRTY_KEY, &dirty);
+ ret = mq_dict_get_meta (dict, QUOTA_SIZE_KEY, &size);
if (ret < 0)
goto out;
- if ((loc->path && strcmp (loc->path, "/") != 0)
- || (!uuid_is_null (loc->gfid) && !__is_root_gfid (loc->gfid))
- || (loc->inode && !uuid_is_null (loc->inode->gfid) &&
- !__is_root_gfid (loc->inode->gfid))) {
- not_root = _gf_true;
-
+ if (!loc_is_root(loc)) {
GET_CONTRI_KEY (contri_key, contribution->gfid, ret);
if (ret < 0)
- goto out;
+ goto err;
- ret = dict_get_bin (dict, contri_key, (void **) &contri);
+ ret = mq_dict_get_meta (dict, contri_key, &contri);
if (ret < 0)
goto out;
LOCK (&contribution->lock);
{
- contribution->contribution = ntoh64 (*contri);
- contri_int = contribution->contribution;
+ contribution->contribution = contri.size;
+ contribution->file_count = contri.file_count;
+ contribution->dir_count = contri.dir_count;
}
UNLOCK (&contribution->lock);
}
LOCK (&ctx->lock);
{
- ctx->size = ntoh64 (*size);
+ ctx->size = size.size;
+ ctx->file_count = size.file_count;
+ ctx->dir_count = size.dir_count;
ctx->dirty = dirty;
- size_int = ctx->size;
}
UNLOCK (&ctx->lock);
- gf_log (this->name, GF_LOG_DEBUG, "size=%"PRId64
- " contri=%"PRId64, size_int, contri_int);
+ mq_compute_delta (&delta, &size, &contri);
- if (dirty) {
- ret = mq_update_dirty_inode (this, loc, ctx, contribution);
- }
+ if (dirty)
+ ret = mq_update_dirty_inode_v2 (this, loc, ctx, contribution);
- if ((!dirty || ret == 0) && (not_root == _gf_true) &&
- (size_int != contri_int)) {
- mq_initiate_quota_txn (this, loc);
- }
+ if ((!dirty || ret == 1) &&
+ !loc_is_root(loc) &&
+ !quota_meta_is_null (&delta))
+ mq_initiate_quota_blocking_txn (this, loc);
ret = 0;
out:
- if (ret)
- mq_set_inode_xattr (this, loc);
+ if (ret < 0)
+ ret = mq_create_xattrs_blocking_txn (this, loc);
+
err:
return ret;
}
int32_t
-mq_inspect_file_xattr (xlator_t *this,
- loc_t *loc,
- dict_t *dict,
- struct iatt buf)
+mq_inspect_directory_xattr_txn (xlator_t *this, loc_t *loc, dict_t *dict,
+ struct iatt buf)
{
- int32_t ret = -1;
- uint64_t contri_int = 0, size = 0;
- int64_t *contri_ptr = NULL;
- char contri_key [512] = {0, };
- quota_inode_ctx_t *ctx = NULL;
- inode_contribution_t *contribution = NULL;
+ int32_t ret = -1;
+
+ ret = mq_synctask (this, mq_inspect_directory_xattr_task, _gf_true,
+ loc, dict, &buf, 0);
+
+ return ret;
+}
+
+int32_t
+mq_inspect_file_xattr_task (void *opaque)
+{
+ int32_t ret = -1;
+ quota_meta_t size = {0, };
+ quota_meta_t contri = {0, };
+ quota_meta_t delta = {0, };
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ quota_inode_ctx_t *ctx = NULL;
+ inode_contribution_t *contribution = NULL;
+ quota_synctask_t *args = NULL;
+ xlator_t *this = NULL;
+ loc_t *loc = NULL;
+ dict_t *dict = NULL;
+ struct iatt buf = {0,};
+
+ GF_ASSERT (opaque);
+
+ args = (quota_synctask_t *) opaque;
+ loc = &args->loc;
+ dict = args->dict;
+ buf = args->buf;
+ this = args->this;
+ THIS = this;
ret = mq_inode_ctx_get (loc->inode, this, &ctx);
if (ret < 0) {
@@ -2230,103 +3588,111 @@ mq_inspect_file_xattr (xlator_t *this,
if (contribution == NULL) {
gf_log_callingfn (this->name, GF_LOG_DEBUG, "cannot allocate "
"contribution node (path:%s)", loc->path);
+ ret = -1;
goto out;
}
LOCK (&ctx->lock);
{
ctx->size = 512 * buf.ia_blocks;
- size = ctx->size;
+ ctx->file_count = 1;
+ ctx->dir_count = 0;
+
+ size.size = ctx->size;
+ size.file_count = ctx->file_count;
+ size.dir_count = ctx->dir_count;
}
UNLOCK (&ctx->lock);
list_for_each_entry (contribution, &ctx->contribution_head,
contri_list) {
+
GET_CONTRI_KEY (contri_key, contribution->gfid, ret);
if (ret < 0)
continue;
- ret = dict_get_bin (dict, contri_key, (void **) &contri_int);
- if (ret == 0) {
- contri_ptr = (int64_t *)(unsigned long)contri_int;
-
+ ret = mq_dict_get_meta (dict, contri_key, &contri);
+ if (ret < 0) {
+ ret = mq_create_xattrs_blocking_txn (this, loc);
+ } else {
LOCK (&contribution->lock);
{
- contribution->contribution = ntoh64 (*contri_ptr);
- contri_int = contribution->contribution;
+ contribution->contribution = contri.size;
+ contribution->file_count = contri.file_count;
+ contribution->dir_count = contri.dir_count;
}
UNLOCK (&contribution->lock);
- gf_log (this->name, GF_LOG_DEBUG,
- "size=%"PRId64 " contri=%"PRId64, size, contri_int);
-
- if (size != contri_int) {
- mq_initiate_quota_txn (this, loc);
+ mq_compute_delta (&delta, &size, &contri);
+ if (!quota_meta_is_null (&delta)) {
+ mq_initiate_quota_blocking_txn (this, loc);
+ /* TODO: revist this code when fixing hardlinks
+ */
+ break;
}
- } else {
- if (size)
- mq_initiate_quota_txn (this, loc);
- else
- mq_set_inode_xattr (this, loc);
}
+
+ /* TODO: loc->parent might need to be assigned to corresponding
+ * contribution inode. We need to handle hard links here
+ */
}
out:
+
return ret;
}
int32_t
-mq_xattr_state (xlator_t *this,
- loc_t *loc,
- dict_t *dict,
- struct iatt buf)
+mq_inspect_file_xattr_txn (xlator_t *this, loc_t *loc, dict_t *dict,
+ struct iatt buf)
+{
+ int32_t ret = -1;
+
+ ret = mq_synctask (this, mq_inspect_file_xattr_task, _gf_true,
+ loc, dict, &buf, 0);
+
+ return ret;
+}
+
+int32_t
+mq_xattr_state (xlator_t *this, loc_t *loc, dict_t *dict, struct iatt buf)
{
if (((buf.ia_type == IA_IFREG) && !dht_is_linkfile (&buf, dict))
|| (buf.ia_type == IA_IFLNK)) {
- mq_inspect_file_xattr (this, loc, dict, buf);
+ mq_inspect_file_xattr_txn (this, loc, dict, buf);
} else if (buf.ia_type == IA_IFDIR)
- mq_inspect_directory_xattr (this, loc, dict, buf);
+ mq_inspect_directory_xattr_txn (this, loc, dict, buf);
return 0;
}
int32_t
-mq_req_xattr (xlator_t *this,
- loc_t *loc,
- dict_t *dict)
+mq_req_xattr (xlator_t *this, loc_t *loc, dict_t *dict,
+ char *contri_key)
{
int32_t ret = -1;
GF_VALIDATE_OR_GOTO ("marker", this, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
GF_VALIDATE_OR_GOTO ("marker", dict, out);
- if (!loc)
- goto set_size;
-
- //if not "/" then request contribution
- if (loc->path && strcmp (loc->path, "/") == 0)
- goto set_size;
-
- ret = mq_dict_set_contribution (this, dict, loc);
- if (ret == -1)
- goto out;
+ if (!loc_is_root(loc)) {
+ ret = mq_dict_set_contribution (this, dict, loc, NULL,
+ contri_key);
+ if (ret < 0)
+ goto out;
+ }
-set_size:
ret = dict_set_uint64 (dict, QUOTA_SIZE_KEY, 0);
- if (ret < 0) {
- ret = -1;
+ if (ret < 0)
goto out;
- }
ret = dict_set_int8 (dict, QUOTA_DIRTY_KEY, 0);
- if (ret < 0) {
- ret = -1;
- goto out;
- }
-
- ret = 0;
out:
+ if (ret < 0)
+ gf_log_callingfn (this->name, GF_LOG_ERROR, "dict set failed");
+
return ret;
}
@@ -2344,15 +3710,15 @@ int32_t
_mq_inode_remove_done (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
- int32_t ret = 0;
- char contri_key [512] = {0, };
- quota_local_t *local = NULL;
- inode_t *inode = NULL;
- dentry_t *tmp = NULL;
- gf_boolean_t last_dentry = _gf_true;
- loc_t loc = {0, };
- dentry_t *other_dentry = NULL;
- gf_boolean_t remove = _gf_false;
+ int32_t ret = 0;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ quota_local_t *local = NULL;
+ inode_t *inode = NULL;
+ dentry_t *tmp = NULL;
+ gf_boolean_t last_dentry = _gf_true;
+ loc_t loc = {0, };
+ dentry_t *other_dentry = NULL;
+ gf_boolean_t remove = _gf_false;
local = (quota_local_t *) frame->local;
@@ -2531,6 +3897,7 @@ mq_reduce_parent_size_xattr (call_frame_t *frame, void *cookie, xlator_t *this,
dict = dict_new ();
if (dict == NULL) {
+ gf_log (this->name, GF_LOG_ERROR, "dict_new failed");
ret = -1;
goto err;
}
diff --git a/xlators/features/marker/src/marker-quota.h b/xlators/features/marker/src/marker-quota.h
index 42def9d22dc..fa132a815b7 100644
--- a/xlators/features/marker/src/marker-quota.h
+++ b/xlators/features/marker/src/marker-quota.h
@@ -21,7 +21,7 @@
#define QUOTA_XATTR_PREFIX "trusted.glusterfs"
#define QUOTA_DIRTY_KEY "trusted.glusterfs.quota.dirty"
-#define CONTRIBUTION "contri"
+#define CONTRIBUTION "contri"
#define CONTRI_KEY_MAX 512
#define READDIR_BUF 4096
@@ -59,21 +59,21 @@
ret = 0; \
} while (0);
-#define GET_CONTRI_KEY(var, _gfid, _ret) \
- do { \
- if (_gfid != NULL) { \
- char _gfid_unparsed[40]; \
- uuid_unparse (_gfid, _gfid_unparsed); \
- _ret = snprintf (var, CONTRI_KEY_MAX, \
- QUOTA_XATTR_PREFIX \
+#define GET_CONTRI_KEY(var, _gfid, _ret) \
+ do { \
+ if (_gfid != NULL) { \
+ char _gfid_unparsed[40]; \
+ uuid_unparse (_gfid, _gfid_unparsed); \
+ _ret = snprintf (var, CONTRI_KEY_MAX, \
+ QUOTA_XATTR_PREFIX \
".%s.%s." CONTRIBUTION, "quota", \
- _gfid_unparsed); \
- } else { \
- _ret = snprintf (var, CONTRI_KEY_MAX, \
- QUOTA_XATTR_PREFIX \
- ".%s.." CONTRIBUTION, "quota"); \
- } \
- } while (0);
+ _gfid_unparsed); \
+ } else { \
+ _ret = snprintf (var, CONTRI_KEY_MAX, \
+ QUOTA_XATTR_PREFIX \
+ ".%s.." CONTRIBUTION, "quota"); \
+ } \
+ } while (0)
#define QUOTA_SAFE_INCREMENT(lock, var) \
do { \
@@ -84,6 +84,8 @@
struct quota_inode_ctx {
int64_t size;
+ int64_t file_count;
+ int64_t dir_count;
int8_t dirty;
gf_boolean_t updation_status;
gf_lock_t lock;
@@ -91,9 +93,28 @@ struct quota_inode_ctx {
};
typedef struct quota_inode_ctx quota_inode_ctx_t;
+struct quota_meta {
+ int64_t size;
+ int64_t file_count;
+ int64_t dir_count;
+};
+typedef struct quota_meta quota_meta_t;
+
+struct quota_synctask {
+ xlator_t *this;
+ loc_t loc;
+ dict_t *dict;
+ struct iatt buf;
+ int64_t contri;
+ gf_boolean_t is_static;
+};
+typedef struct quota_synctask quota_synctask_t;
+
struct inode_contribution {
struct list_head contri_list;
int64_t contribution;
+ int64_t file_count;
+ int64_t dir_count;
uuid_t gfid;
gf_lock_t lock;
};
@@ -103,7 +124,7 @@ int32_t
mq_get_lock_on_parent (call_frame_t *, xlator_t *);
int32_t
-mq_req_xattr (xlator_t *, loc_t *, dict_t *);
+mq_req_xattr (xlator_t *, loc_t *, dict_t *, char *);
int32_t
init_quota_priv (xlator_t *);
@@ -117,6 +138,12 @@ mq_set_inode_xattr (xlator_t *, loc_t *);
int
mq_initiate_quota_txn (xlator_t *, loc_t *);
+int
+mq_initiate_quota_blocking_txn (xlator_t *, loc_t *);
+
+int
+mq_create_xattrs_txn (xlator_t *this, loc_t *loc);
+
int32_t
mq_dirty_inode_readdir (call_frame_t *, void *, xlator_t *,
int32_t, int32_t, fd_t *, dict_t *);
@@ -125,6 +152,9 @@ int32_t
mq_reduce_parent_size (xlator_t *, loc_t *, int64_t);
int32_t
+mq_reduce_parent_size_txn (xlator_t *, loc_t *, int64_t);
+
+int32_t
mq_rename_update_newpath (xlator_t *, loc_t *);
int32_t
diff --git a/xlators/features/marker/src/marker.c b/xlators/features/marker/src/marker.c
index ad3aabda9ba..af7ec1f907f 100644
--- a/xlators/features/marker/src/marker.c
+++ b/xlators/features/marker/src/marker.c
@@ -607,7 +607,7 @@ marker_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
priv = this->private;
if (priv->feature_enabled & GF_QUOTA)
- mq_set_inode_xattr (this, &local->loc);
+ mq_create_xattrs_txn (this, &local->loc);
if (priv->feature_enabled & GF_XTIME)
marker_xtime_update_marks (this, local);
@@ -681,7 +681,7 @@ marker_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
priv = this->private;
if (priv->feature_enabled & GF_QUOTA)
- mq_set_inode_xattr (this, &local->loc);
+ mq_create_xattrs_txn (this, &local->loc);
if (priv->feature_enabled & GF_XTIME)
marker_xtime_update_marks (this, local);
@@ -827,7 +827,7 @@ marker_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
priv = this->private;
if (priv->feature_enabled & GF_QUOTA)
- mq_reduce_parent_size (this, &local->loc, -1);
+ mq_reduce_parent_size_txn (this, &local->loc, -1);
if (priv->feature_enabled & GF_XTIME)
marker_xtime_update_marks (this, local);
@@ -896,7 +896,7 @@ marker_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
if (priv->feature_enabled & GF_QUOTA) {
if (!local->skip_txn)
- mq_reduce_parent_size (this, &local->loc, -1);
+ mq_reduce_parent_size_txn (this, &local->loc, -1);
}
if (priv->feature_enabled & GF_XTIME)
@@ -977,7 +977,7 @@ marker_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
if (priv->feature_enabled & GF_QUOTA) {
if (!local->skip_txn)
- mq_set_inode_xattr (this, &local->loc);
+ mq_create_xattrs_txn (this, &local->loc);
}
@@ -1065,10 +1065,11 @@ marker_rename_done (call_frame_t *frame, void *cookie, xlator_t *this,
frame->root->unique);
}
- mq_reduce_parent_size (this, &oplocal->loc, oplocal->contribution);
+ mq_reduce_parent_size_txn (this, &oplocal->loc, oplocal->contribution);
if (local->loc.inode != NULL) {
- mq_reduce_parent_size (this, &local->loc, local->contribution);
+ mq_reduce_parent_size_txn (this, &local->loc,
+ local->contribution);
}
newloc.inode = inode_ref (oplocal->loc.inode);
@@ -1078,7 +1079,7 @@ marker_rename_done (call_frame_t *frame, void *cookie, xlator_t *this,
newloc.name++;
newloc.parent = inode_ref (local->loc.parent);
- mq_set_inode_xattr (this, &newloc);
+ mq_create_xattrs_txn (this, &newloc);
loc_wipe (&newloc);
@@ -1181,13 +1182,13 @@ marker_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
struct iatt *prenewparent, struct iatt *postnewparent,
dict_t *xdata)
{
- marker_conf_t *priv = NULL;
- marker_local_t *local = NULL;
- marker_local_t *oplocal = NULL;
- call_stub_t *stub = NULL;
- int32_t ret = 0;
- char contri_key [512] = {0, };
- loc_t newloc = {0, };
+ marker_conf_t *priv = NULL;
+ marker_local_t *local = NULL;
+ marker_local_t *oplocal = NULL;
+ call_stub_t *stub = NULL;
+ int32_t ret = 0;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ loc_t newloc = {0, };
local = (marker_local_t *) frame->local;
@@ -1284,10 +1285,11 @@ int32_t
marker_do_rename (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, dict_t *dict, dict_t *xdata)
{
- marker_local_t *local = NULL, *oplocal = NULL;
- char contri_key[512] = {0, };
- int32_t ret = 0;
- int64_t *contribution = 0;
+ marker_local_t *local = NULL;
+ marker_local_t *oplocal = NULL;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ int32_t ret = 0;
+ int64_t *contribution = 0;
local = frame->local;
oplocal = local->oplocal;
@@ -1336,10 +1338,11 @@ marker_get_newpath_contribution (call_frame_t *frame, void *cookie,
xlator_t *this, int32_t op_ret,
int32_t op_errno, dict_t *dict, dict_t *xdata)
{
- marker_local_t *local = NULL, *oplocal = NULL;
- char contri_key[512] = {0, };
- int32_t ret = 0;
- int64_t *contribution = 0;
+ marker_local_t *local = NULL;
+ marker_local_t *oplocal = NULL;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ int32_t ret = 0;
+ int64_t *contribution = 0;
local = frame->local;
oplocal = local->oplocal;
@@ -1403,9 +1406,10 @@ marker_get_oldpath_contribution (call_frame_t *frame, void *cookie,
xlator_t *this, int32_t op_ret,
int32_t op_errno, dict_t *xdata)
{
- marker_local_t *local = NULL, *oplocal = NULL;
- char contri_key[512] = {0, };
- int32_t ret = 0;
+ marker_local_t *local = NULL;
+ marker_local_t *oplocal = NULL;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ int32_t ret = 0;
local = frame->local;
oplocal = local->oplocal;
@@ -1764,8 +1768,9 @@ marker_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
priv = this->private;
- if (priv->feature_enabled & GF_QUOTA)
- mq_set_inode_xattr (this, &local->loc);
+ if (priv->feature_enabled & GF_QUOTA) {
+ mq_create_xattrs_txn (this, &local->loc);
+ }
if (priv->feature_enabled & GF_XTIME)
marker_xtime_update_marks (this, local);
@@ -1838,7 +1843,7 @@ marker_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
priv = this->private;
if ((priv->feature_enabled & GF_QUOTA) && (S_ISREG (local->mode))) {
- mq_set_inode_xattr (this, &local->loc);
+ mq_create_xattrs_txn (this, &local->loc);
}
if (priv->feature_enabled & GF_XTIME)
@@ -2706,7 +2711,7 @@ marker_lookup (call_frame_t *frame, xlator_t *this,
goto err;
if ((priv->feature_enabled & GF_QUOTA) && xattr_req)
- mq_req_xattr (this, loc, xattr_req);
+ mq_req_xattr (this, loc, xattr_req, NULL);
wind:
STACK_WIND (frame, marker_lookup_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->lookup, loc, xattr_req);
@@ -2854,7 +2859,7 @@ marker_readdirp (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
loc.parent = local->loc.inode = inode_ref (fd->inode);
- mq_req_xattr (this, &loc, dict);
+ mq_req_xattr (this, &loc, dict, NULL);
}
STACK_WIND (frame, marker_readdirp_cbk,