summaryrefslogtreecommitdiffstats
path: root/xlators/features/marker
diff options
context:
space:
mode:
authorvmallika <vmallika@redhat.com>2015-03-17 20:05:19 +0530
committerVijay Bellur <vbellur@redhat.com>2015-03-17 21:07:04 -0700
commit7970183f4c730d4aca3cfaa106fde015a0fbc415 (patch)
treebe48e8390e3acc8468061f95785b943a14c43ced /xlators/features/marker
parent33bb32ce5866a15e7d5164c67f214c4797236066 (diff)
Quota/marker : Support for inode quota
Currently, the only way to retrieve the number of files/objects in a directory or volume is to do a crawl of the entire directory/volume. This is expensive and is not scalable. The new mechanism proposes to store count of objects/files as part of an extended attribute of a directory. Each directory's extended attribute value will indicate the number of files/objects present in a tree with the directory being considered as the root of the tree. Currently file usage is accounted in marker by doing multiple FOPs like setting and getting xattrs. Doing this with STACK WIND and UNWIND can be harder to debug as involves multiple callbacks. In this code we are replacing current mechanism with syncop approach as syncop code is much simpler to follow and help us implement inode quota in an organized way. Change-Id: Ibf366fbe07037284e89a241ddaff7750fc8771b4 BUG: 1188636 Signed-off-by: vmallika <vmallika@redhat.com> Signed-off-by: Sachin Pandit <spandit@redhat.com> Reviewed-on: http://review.gluster.org/9567 Reviewed-by: Vijay Bellur <vbellur@redhat.com> Tested-by: Vijay Bellur <vbellur@redhat.com>
Diffstat (limited to 'xlators/features/marker')
-rw-r--r--xlators/features/marker/src/marker-mem-types.h2
-rw-r--r--xlators/features/marker/src/marker-quota-helper.c50
-rw-r--r--xlators/features/marker/src/marker-quota-helper.h2
-rw-r--r--xlators/features/marker/src/marker-quota.c1723
-rw-r--r--xlators/features/marker/src/marker-quota.h62
-rw-r--r--xlators/features/marker/src/marker.c67
6 files changed, 1658 insertions, 248 deletions
diff --git a/xlators/features/marker/src/marker-mem-types.h b/xlators/features/marker/src/marker-mem-types.h
index 1f74d50..dc5ad16 100644
--- a/xlators/features/marker/src/marker-mem-types.h
+++ b/xlators/features/marker/src/marker-mem-types.h
@@ -20,6 +20,8 @@ enum gf_marker_mem_types_ {
gf_marker_mt_quota_inode_ctx_t,
gf_marker_mt_marker_inode_ctx_t,
gf_marker_mt_inode_contribution_t,
+ gf_marker_mt_quota_meta_t,
+ gf_marker_mt_quota_synctask_t,
gf_marker_mt_end
};
#endif
diff --git a/xlators/features/marker/src/marker-quota-helper.c b/xlators/features/marker/src/marker-quota-helper.c
index ec0d833..cdc2475 100644
--- a/xlators/features/marker/src/marker-quota-helper.c
+++ b/xlators/features/marker/src/marker-quota-helper.c
@@ -37,23 +37,27 @@ mq_loc_fill (loc_t *loc, inode_t *inode, inode_t *parent, char *path)
if (parent)
loc->parent = inode_ref (parent);
+ if (!uuid_is_null (inode->gfid))
+ uuid_copy (loc->gfid, inode->gfid);
+
loc->path = gf_strdup (path);
if (!loc->path) {
gf_log ("loc fill", GF_LOG_ERROR, "strdup failed");
- goto loc_wipe;
+ goto out;
}
loc->name = strrchr (loc->path, '/');
if (loc->name)
loc->name++;
else
- goto loc_wipe;
+ goto out;
ret = 0;
-loc_wipe:
+
+out:
if (ret < 0)
loc_wipe (loc);
-out:
+
return ret;
}
@@ -222,37 +226,39 @@ mq_add_new_contribution_node (xlator_t *this, quota_inode_ctx_t *ctx,
int32_t
-mq_dict_set_contribution (xlator_t *this, dict_t *dict,
- loc_t *loc)
+mq_dict_set_contribution (xlator_t *this, dict_t *dict, loc_t *loc,
+ uuid_t gfid, char *contri_key)
{
- int32_t ret = -1;
- char contri_key [512] = {0, };
+ int32_t ret = -1;
+ char key[CONTRI_KEY_MAX] = {0, };
GF_VALIDATE_OR_GOTO ("marker", this, out);
GF_VALIDATE_OR_GOTO ("marker", dict, out);
GF_VALIDATE_OR_GOTO ("marker", loc, out);
- if (loc->parent) {
- GET_CONTRI_KEY (contri_key, loc->parent->gfid, ret);
- if (ret < 0) {
- ret = -1;
- goto out;
- }
+ if (gfid && !uuid_is_null(gfid)) {
+ GET_CONTRI_KEY (key, gfid, ret);
+ } else if (loc->parent) {
+ GET_CONTRI_KEY (key, loc->parent->gfid, ret);
} else {
/* nameless lookup, fetch contributions to all parents */
- GET_CONTRI_KEY (contri_key, NULL, ret);
+ GET_CONTRI_KEY (key, NULL, ret);
}
- ret = dict_set_int64 (dict, contri_key, 0);
- if (ret < 0) {
- gf_log (this->name, GF_LOG_WARNING,
- "unable to set dict value on %s.",
- loc->path);
+ if (ret < 0)
goto out;
- }
- ret = 0;
+ ret = dict_set_int64 (dict, key, 0);
+ if (ret < 0)
+ goto out;
+
+ if (contri_key)
+ strncpy (contri_key, key, CONTRI_KEY_MAX);
+
out:
+ if (ret < 0)
+ gf_log_callingfn (this->name, GF_LOG_ERROR, "dict set failed");
+
return ret;
}
diff --git a/xlators/features/marker/src/marker-quota-helper.h b/xlators/features/marker/src/marker-quota-helper.h
index b200413..161413d 100644
--- a/xlators/features/marker/src/marker-quota-helper.h
+++ b/xlators/features/marker/src/marker-quota-helper.h
@@ -44,7 +44,7 @@ inode_contribution_t *
mq_add_new_contribution_node (xlator_t *, quota_inode_ctx_t *, loc_t *);
int32_t
-mq_dict_set_contribution (xlator_t *, dict_t *, loc_t *);
+mq_dict_set_contribution (xlator_t *, dict_t *, loc_t *, uuid_t, char *);
quota_inode_ctx_t *
mq_inode_ctx_new (inode_t *, xlator_t *);
diff --git a/xlators/features/marker/src/marker-quota.c b/xlators/features/marker/src/marker-quota.c
index b8d7a77..835108c 100644
--- a/xlators/features/marker/src/marker-quota.c
+++ b/xlators/features/marker/src/marker-quota.c
@@ -20,6 +20,7 @@
#include "byte-order.h"
#include "marker-quota.h"
#include "marker-quota-helper.h"
+#include "syncop.h"
int
mq_loc_copy (loc_t *dst, loc_t *src)
@@ -478,11 +479,11 @@ mq_get_child_contribution (call_frame_t *frame,
dict_t *dict,
struct iatt *postparent)
{
- int32_t ret = -1;
- int32_t val = 0;
- char contri_key [512] = {0, };
- int64_t *contri = NULL;
- quota_local_t *local = NULL;
+ int32_t ret = -1;
+ int32_t val = 0;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ int64_t *contri = NULL;
+ quota_local_t *local = NULL;
local = frame->local;
@@ -543,16 +544,16 @@ mq_readdir_cbk (call_frame_t *frame,
int32_t op_errno,
gf_dirent_t *entries, dict_t *xdata)
{
- char contri_key [512] = {0, };
- int32_t ret = 0;
- int32_t val = 0;
- off_t offset = 0;
- int32_t count = 0;
- dict_t *dict = NULL;
- quota_local_t *local = NULL;
- gf_dirent_t *entry = NULL;
- call_frame_t *newframe = NULL;
- loc_t loc = {0, };
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ int32_t ret = 0;
+ int32_t val = 0;
+ off_t offset = 0;
+ int32_t count = 0;
+ dict_t *dict = NULL;
+ quota_local_t *local = NULL;
+ gf_dirent_t *entry = NULL;
+ call_frame_t *newframe = NULL;
+ loc_t loc = {0, };
local = mq_local_ref (frame->local);
@@ -824,7 +825,10 @@ mq_get_dirty_xattr (call_frame_t *frame, void *cookie, xlator_t *this,
if (uuid_is_null (local->loc.gfid))
uuid_copy (local->loc.gfid, local->loc.inode->gfid);
- GF_UUID_ASSERT (local->loc.gfid);
+ if (uuid_is_null (local->loc.gfid)) {
+ ret = -1;
+ goto err;
+ }
STACK_WIND (frame,
mq_check_if_still_dirty,
@@ -850,14 +854,12 @@ err:
* 0 other wise
*/
int32_t
-mq_update_dirty_inode (xlator_t *this,
- loc_t *loc,
- quota_inode_ctx_t *ctx,
+mq_update_dirty_inode (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx,
inode_contribution_t *contribution)
{
int32_t ret = -1;
quota_local_t *local = NULL;
- gf_boolean_t status = _gf_false;
+ gf_boolean_t status = _gf_false;
struct gf_flock lock = {0, };
call_frame_t *frame = NULL;
@@ -1017,14 +1019,14 @@ err:
int32_t
mq_create_xattr (xlator_t *this, call_frame_t *frame)
{
- int32_t ret = 0;
- int64_t *value = NULL;
- int64_t *size = NULL;
- dict_t *dict = NULL;
- char key[512] = {0, };
- quota_local_t *local = NULL;
- quota_inode_ctx_t *ctx = NULL;
- inode_contribution_t *contri = NULL;
+ int32_t ret = 0;
+ int64_t *value = NULL;
+ int64_t *size = NULL;
+ dict_t *dict = NULL;
+ char key[CONTRI_KEY_MAX] = {0, };
+ quota_local_t *local = NULL;
+ quota_inode_ctx_t *ctx = NULL;
+ inode_contribution_t *contri = NULL;
if (frame == NULL || this == NULL)
return 0;
@@ -1070,7 +1072,10 @@ mq_create_xattr (xlator_t *this, call_frame_t *frame)
goto free_value;
}
- GF_UUID_ASSERT (local->loc.gfid);
+ if (uuid_is_null (local->loc.gfid)) {
+ ret = -1;
+ goto out;
+ }
STACK_WIND (frame, mq_create_dirty_xattr, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->xattrop, &local->loc,
@@ -1105,11 +1110,12 @@ mq_check_n_set_inode_xattr (call_frame_t *frame, void *cookie,
inode_t *inode, struct iatt *buf, dict_t *dict,
struct iatt *postparent)
{
- quota_local_t *local = NULL;
- int64_t *size = NULL, *contri = NULL;
- int8_t dirty = 0;
- int32_t ret = 0;
- char contri_key[512] = {0, };
+ quota_local_t *local = NULL;
+ int64_t *size = NULL;
+ int64_t *contri = NULL;
+ int8_t dirty = 0;
+ int32_t ret = 0;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
if (op_ret < 0) {
goto out;
@@ -1174,7 +1180,7 @@ mq_get_xattr (call_frame_t *frame, void *cookie, xlator_t *this,
goto err;
}
- ret = mq_req_xattr (this, &local->loc, xattr_req);
+ ret = mq_req_xattr (this, &local->loc, xattr_req, NULL);
if (ret < 0) {
gf_log (this->name, GF_LOG_WARNING, "cannot request xattr");
goto err;
@@ -1183,7 +1189,10 @@ mq_get_xattr (call_frame_t *frame, void *cookie, xlator_t *this,
if (uuid_is_null (local->loc.gfid))
uuid_copy (local->loc.gfid, local->loc.inode->gfid);
- GF_UUID_ASSERT (local->loc.gfid);
+ if (uuid_is_null (local->loc.gfid)) {
+ ret = -1;
+ goto err;
+ }
STACK_WIND (frame, mq_check_n_set_inode_xattr, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->lookup, &local->loc, xattr_req);
@@ -1634,15 +1643,17 @@ mq_update_inode_contribution (call_frame_t *frame, void *cookie,
struct iatt *buf, dict_t *dict,
struct iatt *postparent)
{
- int32_t ret = -1;
- int64_t *size = NULL, size_int = 0, contri_int = 0;
- int64_t *contri = NULL;
- int64_t *delta = NULL;
- char contri_key [512] = {0, };
- dict_t *newdict = NULL;
- quota_local_t *local = NULL;
- quota_inode_ctx_t *ctx = NULL;
- inode_contribution_t *contribution = NULL;
+ int32_t ret = -1;
+ int64_t *size = NULL;
+ int64_t size_int = 0;
+ int64_t contri_int = 0;
+ int64_t *contri = NULL;
+ int64_t *delta = NULL;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ dict_t *newdict = NULL;
+ quota_local_t *local = NULL;
+ quota_inode_ctx_t *ctx = NULL;
+ inode_contribution_t *contribution = NULL;
local = frame->local;
@@ -1760,11 +1771,11 @@ mq_fetch_child_size_and_contri (call_frame_t *frame, void *cookie,
xlator_t *this, int32_t op_ret,
int32_t op_errno, dict_t *xdata)
{
- int32_t ret = -1;
- char contri_key [512] = {0, };
- dict_t *newdict = NULL;
- quota_local_t *local = NULL;
- quota_inode_ctx_t *ctx = NULL;
+ int32_t ret = -1;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ dict_t *newdict = NULL;
+ quota_local_t *local = NULL;
+ quota_inode_ctx_t *ctx = NULL;
local = frame->local;
@@ -2024,19 +2035,1156 @@ err:
return -1;
}
+int32_t
+mq_dict_set_meta (dict_t *dict, char *key, const quota_meta_t *meta,
+ ia_type_t ia_type)
+{
+ int32_t ret = -1;
+ quota_meta_t *value = NULL;
+
+ QUOTA_ALLOC_OR_GOTO (value, quota_meta_t, ret, out);
+
+ value->size = hton64 (meta->size);
+ value->file_count = hton64 (meta->file_count);
+ value->dir_count = hton64 (meta->dir_count);
+
+ if (ia_type == IA_IFDIR) {
+ ret = dict_set_bin (dict, key, value, sizeof (*value));
+ } else {
+ /* For a file we don't need to store dir_count in the
+ * quota size xattr, so we set the len of the data in the dict
+ * as 128bits, so when the posix xattrop reads the dict, it only
+ * performs operations on size and file_count
+ */
+ ret = dict_set_bin (dict, key, value,
+ sizeof (*value) - sizeof (int64_t));
+ }
+
+ if (ret < 0) {
+ gf_log_callingfn ("marker", GF_LOG_ERROR, "dict set failed");
+ GF_FREE (value);
+ }
+
+out:
+ return ret;
+}
+
+int32_t
+mq_dict_get_meta (dict_t *dict, char *key, quota_meta_t *meta)
+{
+ int32_t ret = -1;
+ data_t *data = NULL;
+ quota_meta_t *value = NULL;
+
+ if (!dict || !key || !meta)
+ goto out;
+
+ data = dict_get (dict, key);
+ if (!data || !data->data)
+ goto out;
+
+ if (data->len > sizeof (int64_t)) {
+ value = (quota_meta_t *) data->data;
+ meta->size = ntoh64 (value->size);
+ meta->file_count = ntoh64 (value->file_count);
+ if (data->len > (sizeof (int64_t)) * 2)
+ meta->dir_count = ntoh64 (value->dir_count);
+ else
+ meta->dir_count = 0;
+ } else {
+ /* This can happen during software upgrade.
+ * Older version of glusterfs will not have inode count.
+ * Return failure, this will be healed as part of lookup
+ */
+ gf_log_callingfn ("marker", GF_LOG_DEBUG, "Object quota xattrs "
+ "missing: len = %d", data->len);
+ ret = -1;
+ goto out;
+ }
+
+ ret = 0;
+out:
+
+ return ret;
+}
+
+void
+mq_compute_delta (quota_meta_t *delta, const quota_meta_t *op1,
+ const quota_meta_t *op2)
+{
+ delta->size = op1->size - op2->size;
+ delta->file_count = op1->file_count - op2->file_count;
+ delta->dir_count = op1->dir_count - op2->dir_count;
+}
+
+void
+mq_add_meta (quota_meta_t *dst, const quota_meta_t *src)
+{
+ dst->size += src->size;
+ dst->file_count += src->file_count;
+ dst->dir_count += src->dir_count;
+}
+
+void
+mq_sub_meta (quota_meta_t *dst, const quota_meta_t *src)
+{
+ if (src == NULL) {
+ dst->size = -dst->size;
+ dst->file_count = -dst->file_count;
+ dst->dir_count = -dst->dir_count;
+ } else {
+ dst->size = src->size - dst->size;
+ dst->file_count = src->file_count - dst->file_count;
+ dst->dir_count = src->dir_count - dst->dir_count;
+ }
+}
+
+gf_boolean_t
+quota_meta_is_null (const quota_meta_t *meta)
+{
+ if (meta->size == 0 &&
+ meta->file_count == 0 &&
+ meta->dir_count == 0)
+ return _gf_true;
+
+ return _gf_false;
+}
+
+int32_t
+mq_are_xattrs_set (xlator_t *this, loc_t *loc, gf_boolean_t *result,
+ gf_boolean_t *objects)
+{
+ int32_t ret = -1;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ quota_meta_t meta = {0, };
+ struct iatt stbuf = {0,};
+ dict_t *dict = NULL;
+ dict_t *rsp_dict = NULL;
+
+ dict = dict_new ();
+ if (dict == NULL) {
+ gf_log (this->name, GF_LOG_ERROR, "dict_new failed");
+ goto out;
+ }
+
+ ret = mq_req_xattr (this, loc, dict, contri_key);
+ if (ret < 0)
+ goto out;
+
+ ret = syncop_lookup (FIRST_CHILD(this), loc, dict, &stbuf, &rsp_dict,
+ NULL);
+ if (ret < 0) {
+ gf_log (this->name, (-ret == ENOENT || -ret == ESTALE)
+ ? GF_LOG_DEBUG:GF_LOG_ERROR, "lookup failed "
+ "for %s: %s", loc->path, strerror (-ret));
+ goto out;
+ }
+
+ if (rsp_dict == NULL) {
+ *result = _gf_false;
+ goto out;
+ }
+
+ *result = _gf_true;
+ if (loc->inode->ia_type == IA_IFDIR) {
+ ret = mq_dict_get_meta (rsp_dict, QUOTA_SIZE_KEY, &meta);
+ if (ret < 0 || meta.dir_count == 0) {
+ ret = 0;
+ *result = _gf_false;
+ goto out;
+ }
+ *objects = _gf_true;
+ }
+
+ if (!loc_is_root(loc) && !dict_get (rsp_dict, contri_key)) {
+ *result = _gf_false;
+ goto out;
+ }
+
+out:
+ if (dict)
+ dict_unref (dict);
+
+ if (rsp_dict)
+ dict_unref (rsp_dict);
+
+ return ret;
+}
+
+int32_t
+mq_create_xattrs (xlator_t *this, loc_t *loc, gf_boolean_t objects)
+{
+ quota_meta_t size = {0, };
+ quota_meta_t contri = {0, };
+ int32_t ret = -1;
+ char key[CONTRI_KEY_MAX] = {0, };
+ dict_t *dict = NULL;
+ quota_inode_ctx_t *ctx = NULL;
+ inode_contribution_t *contribution = NULL;
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+
+ ret = mq_inode_ctx_get (loc->inode, this, &ctx);
+ if (ret < 0) {
+ ctx = mq_inode_ctx_new (loc->inode, this);
+ if (ctx == NULL) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "mq_inode_ctx_new failed");
+ ret = -1;
+ goto out;
+ }
+ }
+
+ dict = dict_new ();
+ if (!dict) {
+ gf_log (this->name, GF_LOG_ERROR, "dict_new failed");
+ ret = -1;
+ goto out;
+ }
+
+ if (loc->inode->ia_type == IA_IFDIR) {
+ if (objects == _gf_false) {
+ /* Initial object count of a directory is 1 */
+ size.dir_count = 1;
+ }
+ ret = mq_dict_set_meta (dict, QUOTA_SIZE_KEY, &size, IA_IFDIR);
+ if (ret < 0)
+ goto out;
+ }
+
+ if (!loc_is_root (loc)) {
+ contribution = mq_add_new_contribution_node (this, ctx, loc);
+ if (contribution == NULL) {
+ ret = -1;
+ goto out;
+ }
+
+ GET_CONTRI_KEY (key, contribution->gfid, ret);
+ ret = mq_dict_set_meta (dict, key, &contri,
+ loc->inode->ia_type);
+ if (ret < 0)
+ goto out;
+ }
+
+ ret = syncop_xattrop(FIRST_CHILD(this), loc, GF_XATTROP_ADD_ARRAY64,
+ dict, NULL);
+
+ if (ret < 0) {
+ gf_log (this->name, (-ret == ENOENT || -ret == ESTALE)
+ ? GF_LOG_DEBUG:GF_LOG_ERROR, "xattrop failed "
+ "for %s: %s", loc->path, strerror (-ret));
+ goto out;
+ }
+
+out:
+ if (dict)
+ dict_unref (dict);
+
+ return ret;
+}
+
+int32_t
+mq_lock (xlator_t *this, loc_t *loc, short l_type)
+{
+ struct gf_flock lock = {0, };
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+
+ gf_log (this->name, GF_LOG_DEBUG, "set lock type %d on %s",
+ l_type, loc->path);
+
+ lock.l_len = 0;
+ lock.l_start = 0;
+ lock.l_type = l_type;
+ lock.l_whence = SEEK_SET;
+
+ ret = syncop_inodelk (FIRST_CHILD(this), this->name, loc, F_SETLKW,
+ &lock, NULL, NULL);
+ if (ret < 0)
+ gf_log (this->name, (-ret == ENOENT || -ret == ESTALE)
+ ? GF_LOG_DEBUG:GF_LOG_ERROR, "inodelk failed "
+ "for %s: %s", loc->path, strerror (-ret));
+
+out:
+
+ return ret;
+}
+
+int32_t
+mq_get_dirty (xlator_t *this, loc_t *loc, int32_t *dirty)
+{
+ int32_t ret = -1;
+ int8_t value = 0;
+ dict_t *dict = NULL;
+ dict_t *rsp_dict = NULL;
+ struct iatt stbuf = {0,};
+
+ dict = dict_new ();
+ if (dict == NULL) {
+ gf_log (this->name, GF_LOG_ERROR, "dict_new failed");
+ goto out;
+ }
+
+ ret = dict_set_int64 (dict, QUOTA_DIRTY_KEY, 0);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_WARNING, "dict set failed");
+ goto out;
+ }
+
+ ret = syncop_lookup (FIRST_CHILD(this), loc, dict, &stbuf, &rsp_dict,
+ NULL);
+ if (ret < 0) {
+ gf_log (this->name, (-ret == ENOENT || -ret == ESTALE)
+ ? GF_LOG_DEBUG:GF_LOG_ERROR, "lookup failed "
+ "for %s: %s", loc->path, strerror (-ret));
+ goto out;
+ }
+
+ ret = dict_get_int8 (rsp_dict, QUOTA_DIRTY_KEY, &value);
+ if (ret < 0)
+ goto out;
+
+ *dirty = value;
+
+out:
+ if (dict)
+ dict_unref (dict);
+
+ if (rsp_dict)
+ dict_unref (rsp_dict);
+
+ return ret;
+}
+
+int32_t
+mq_mark_dirty (xlator_t *this, loc_t *loc, int32_t dirty)
+{
+ int32_t ret = -1;
+ dict_t *dict = NULL;
+ quota_inode_ctx_t *ctx = NULL;
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+
+ dict = dict_new ();
+ if (!dict) {
+ gf_log (this->name, GF_LOG_ERROR, "dict_new failed");
+ goto out;
+ }
+
+ ret = dict_set_int8 (dict, QUOTA_DIRTY_KEY, dirty);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "dict_set failed");
+ goto out;
+ }
+
+ ret = syncop_setxattr (FIRST_CHILD(this), loc, dict, 0);
+ if (ret < 0) {
+ gf_log (this->name, (-ret == ENOENT || -ret == ESTALE)
+ ? GF_LOG_DEBUG:GF_LOG_ERROR, "setxattr dirty = %d "
+ "failed for %s: %s", dirty, loc->path, strerror (-ret));
+ goto out;
+ }
+
+ ret = mq_inode_ctx_get (loc->inode, this, &ctx);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to get inode ctx for "
+ "%s", loc->path);
+ goto out;
+ }
+
+ LOCK (&ctx->lock);
+ {
+ ctx->dirty = dirty;
+ }
+ UNLOCK (&ctx->lock);
+
+out:
+ if (dict)
+ dict_unref (dict);
+
+ return ret;
+}
+
+int32_t
+_mq_get_metadata (xlator_t *this, loc_t *loc, quota_meta_t *contri,
+ quota_meta_t *size, uuid_t contri_gfid)
+{
+ int32_t ret = -1;
+ quota_meta_t meta = {0, };
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ dict_t *dict = NULL;
+ dict_t *rsp_dict = NULL;
+ struct iatt stbuf = {0,};
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+
+ if (size == NULL && contri == NULL)
+ goto out;
+
+ dict = dict_new ();
+ if (dict == NULL) {
+ gf_log (this->name, GF_LOG_ERROR, "dict_new failed");
+ goto out;
+ }
+
+ if (size && loc->inode->ia_type == IA_IFDIR) {
+ ret = dict_set_int64 (dict, QUOTA_SIZE_KEY, 0);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "dict_set failed.");
+ goto out;
+ }
+ }
+
+ if (contri && !loc_is_root(loc)) {
+ ret = mq_dict_set_contribution (this, dict, loc, contri_gfid,
+ contri_key);
+ if (ret < 0)
+ goto out;
+ }
+
+ ret = syncop_lookup (FIRST_CHILD(this), loc, dict, &stbuf, &rsp_dict,
+ NULL);
+ if (ret < 0) {
+ gf_log (this->name, (-ret == ENOENT || -ret == ESTALE)
+ ? GF_LOG_DEBUG:GF_LOG_ERROR, "lookup failed "
+ "for %s: %s", loc->path, strerror (-ret));
+ goto out;
+ }
+
+ if (size) {
+ if (loc->inode->ia_type == IA_IFDIR) {
+ ret = mq_dict_get_meta (rsp_dict, QUOTA_SIZE_KEY,
+ &meta);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "dict_get failed.");
+ goto out;
+ }
+
+ size->size = meta.size;
+ size->file_count = meta.file_count;
+ size->dir_count = meta.dir_count;
+ } else {
+ size->size = stbuf.ia_blocks * 512;
+ size->file_count = 1;
+ size->dir_count = 0;
+ }
+ }
+
+ if (contri && !loc_is_root(loc)) {
+ ret = mq_dict_get_meta (rsp_dict, contri_key, &meta);
+ if (ret < 0) {
+ contri->size = 0;
+ contri->file_count = 0;
+ contri->dir_count = 0;
+ } else {
+ contri->size = meta.size;
+ contri->file_count = meta.file_count;
+ contri->dir_count = meta.dir_count;
+ }
+ }
+
+ ret = 0;
+
+out:
+ if (dict)
+ dict_unref (dict);
+
+ if (rsp_dict)
+ dict_unref (rsp_dict);
+
+ return ret;
+}
+
+int32_t
+mq_get_metadata (xlator_t *this, loc_t *loc, quota_meta_t *contri,
+ quota_meta_t *size, quota_inode_ctx_t *ctx,
+ inode_contribution_t *contribution)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+ GF_VALIDATE_OR_GOTO ("marker", ctx, out);
+ GF_VALIDATE_OR_GOTO ("marker", contribution, out);
+
+ if (size == NULL && contri == NULL) {
+ ret = 0;
+ goto out;
+ }
+
+ ret = _mq_get_metadata (this, loc, contri, size, contribution->gfid);
+ if (ret < 0) {
+ gf_log_callingfn (this->name, GF_LOG_ERROR, "Failed to get "
+ "metadata for %s", loc->path);
+ goto out;
+ }
+
+ if (size) {
+ LOCK (&ctx->lock);
+ {
+ ctx->size = size->size;
+ ctx->file_count = size->file_count;
+ ctx->dir_count = size->dir_count;
+ }
+ UNLOCK (&ctx->lock);
+ }
+
+ if (contri) {
+ LOCK (&contribution->lock);
+ {
+ contribution->contribution = contri->size;
+ contribution->file_count = contri->file_count;
+ contribution->dir_count = contri->dir_count;
+ }
+ UNLOCK (&contribution->lock);
+ }
+
+out:
+ return ret;
+}
+
+int32_t
+mq_get_size (xlator_t *this, loc_t *loc, quota_meta_t *size)
+{
+ int32_t ret = -1;
+
+ ret = _mq_get_metadata (this, loc, NULL, size, 0);
+ if (ret < 0) {
+ gf_log_callingfn (this->name, GF_LOG_ERROR, "Failed to get "
+ "metadata for %s", loc->path);
+ goto out;
+ }
+
+out:
+ return ret;
+}
+
+int32_t
+mq_get_contri (xlator_t *this, loc_t *loc, quota_meta_t *contri,
+ uuid_t contri_gfid)
+{
+ int32_t ret = -1;
+
+ ret = _mq_get_metadata (this, loc, contri, NULL, contri_gfid);
+ if (ret < 0) {
+ gf_log_callingfn (this->name, GF_LOG_ERROR, "Failed to get "
+ "metadata for %s", loc->path);
+ goto out;
+ }
+
+out:
+ return ret;
+}
+
+int32_t
+mq_get_delta (xlator_t *this, loc_t *loc, quota_meta_t *delta,
+ quota_inode_ctx_t *ctx, inode_contribution_t *contribution)
+{
+ int32_t ret = -1;
+ quota_meta_t size = {0, };
+ quota_meta_t contri = {0, };
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+ GF_VALIDATE_OR_GOTO ("marker", ctx, out);
+ GF_VALIDATE_OR_GOTO ("marker", contribution, out);
+
+ ret = mq_get_metadata (this, loc, &contri, &size, ctx, contribution);
+ if (ret < 0)
+ goto out;
+
+ mq_compute_delta (delta, &size, &contri);
+
+out:
+ return ret;
+}
+
+int32_t
+mq_remove_contri (xlator_t *this, loc_t *loc, inode_contribution_t *contri)
+{
+ int32_t ret = -1;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+
+ GET_CONTRI_KEY (contri_key, contri->gfid, ret);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "get contri_key "
+ "failed for %s", uuid_utoa(contri->gfid));
+ goto out;
+ }
+
+ ret = syncop_removexattr (FIRST_CHILD(this), loc, contri_key, 0);
+ if (ret < 0) {
+ if (-ret == ENOENT || -ret == ESTALE) {
+ /* Remove contri in done when unlink operation is
+ * performed, so return success on ENOENT/ESTSLE
+ */
+ ret = 0;
+ } else {
+ gf_log (this->name, GF_LOG_ERROR, "removexattr %s "
+ "failed for %s: %s", contri_key, loc->path,
+ strerror (-ret));
+ goto out;
+ }
+ }
+
+ LOCK (&contri->lock);
+ {
+ contri->contribution = 0;
+ contri->file_count = 0;
+ contri->dir_count = 0;
+ }
+ UNLOCK (&contri->lock);
+
+ ret = 0;
+out:
+
+ return ret;
+}
+
+int32_t
+mq_update_contri (xlator_t *this, loc_t *loc, inode_contribution_t *contri,
+ quota_meta_t *delta)
+{
+ int32_t ret = -1;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ dict_t *dict = NULL;
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+ GF_VALIDATE_OR_GOTO ("marker", delta, out);
+ GF_VALIDATE_OR_GOTO ("marker", contri, out);
+
+ if (quota_meta_is_null (delta)) {
+ ret = 0;
+ goto out;
+ }
+
+ dict = dict_new ();
+ if (!dict) {
+ gf_log (this->name, GF_LOG_ERROR, "dict_new failed");
+ ret = -1;
+ goto out;
+ }
+
+ GET_CONTRI_KEY (contri_key, contri->gfid, ret);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "get contri_key "
+ "failed for %s", uuid_utoa(contri->gfid));
+ goto out;
+ }
+
+ ret = mq_dict_set_meta (dict, contri_key, delta, loc->inode->ia_type);
+ if (ret < 0)
+ goto out;
+
+ ret = syncop_xattrop(FIRST_CHILD(this), loc, GF_XATTROP_ADD_ARRAY64,
+ dict, NULL);
+ if (ret < 0) {
+ gf_log (this->name, (-ret == ENOENT || -ret == ESTALE)
+ ? GF_LOG_DEBUG:GF_LOG_ERROR, "xattrop failed "
+ "for %s: %s", loc->path, strerror (-ret));
+ goto out;
+ }
+
+ LOCK (&contri->lock);
+ {
+ contri->contribution += delta->size;
+ contri->file_count += delta->file_count;
+ contri->dir_count += delta->dir_count;
+ }
+ UNLOCK (&contri->lock);
+
+out:
+ if (dict)
+ dict_unref (dict);
+
+ return ret;
+}
+
+int32_t
+mq_update_size (xlator_t *this, loc_t *loc, quota_meta_t *delta)
+{
+ int32_t ret = -1;
+ quota_inode_ctx_t *ctx = NULL;
+ dict_t *dict = NULL;
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+ GF_VALIDATE_OR_GOTO ("marker", delta, out);
+
+ if (quota_meta_is_null (delta)) {
+ ret = 0;
+ goto out;
+ }
+
+ ret = mq_inode_ctx_get (loc->inode, this, &ctx);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to get inode ctx for "
+ "%s", loc->path);
+ goto out;
+ }
+
+ dict = dict_new ();
+ if (!dict) {
+ gf_log (this->name, GF_LOG_ERROR, "dict_new failed");
+ ret = -1;
+ goto out;
+ }
+
+ ret = mq_dict_set_meta (dict, QUOTA_SIZE_KEY, delta,
+ loc->inode->ia_type);
+ if (ret < 0)
+ goto out;
+
+ ret = syncop_xattrop(FIRST_CHILD(this), loc, GF_XATTROP_ADD_ARRAY64,
+ dict, NULL);
+ if (ret < 0) {
+ gf_log (this->name, (-ret == ENOENT || -ret == ESTALE)
+ ? GF_LOG_DEBUG:GF_LOG_ERROR, "xattrop failed "
+ "for %s: %s", loc->path, strerror (-ret));
+ goto out;
+ }
+
+ LOCK (&ctx->lock);
+ {
+ ctx->size += delta->size;
+ ctx->file_count += delta->file_count;
+ ctx->dir_count += delta->dir_count;
+ }
+ UNLOCK (&ctx->lock);
+
+out:
+ if (dict)
+ dict_unref (dict);
+
+ return ret;
+}
int
-mq_initiate_quota_txn (xlator_t *this, loc_t *loc)
+mq_synctask_cleanup (int ret, call_frame_t *frame, void *opaque)
{
- int32_t ret = -1;
- gf_boolean_t status = _gf_false;
- quota_inode_ctx_t *ctx = NULL;
- inode_contribution_t *contribution = NULL;
+ quota_synctask_t *args = NULL;
+
+ GF_ASSERT (opaque);
+
+ args = (quota_synctask_t *) opaque;
+ loc_wipe (&args->loc);
+ if (args->dict)
+ dict_unref (args->dict);
+
+ if (!args->is_static)
+ GF_FREE (args);
+
+ return 0;
+}
+
+int
+mq_synctask (xlator_t *this, synctask_fn_t task, gf_boolean_t spawn, loc_t *loc,
+ dict_t *dict, struct iatt *buf, int64_t contri)
+{
+ int32_t ret = -1;
+ quota_synctask_t *args = NULL;
+ quota_synctask_t static_args = {0, };
+
+ if (spawn) {
+ QUOTA_ALLOC_OR_GOTO (args, quota_synctask_t, ret, out);
+ args->is_static = _gf_false;
+ } else {
+ args = &static_args;
+ args->is_static = _gf_true;
+ }
+
+ args->this = this;
+ loc_copy (&args->loc, loc);
+ args->contri = contri;
+ if (dict)
+ args->dict = dict_ref (dict);
+ if (buf)
+ args->buf = *buf;
+
+
+ if (spawn) {
+ ret = synctask_new (this->ctx->env, task, mq_synctask_cleanup,
+ NULL, args);
+ if (ret) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to spawn "
+ "new synctask");
+ mq_synctask_cleanup (ret, NULL, args);
+ }
+ } else {
+ ret = task (args);
+ mq_synctask_cleanup (ret, NULL, args);
+ }
+
+out:
+ return ret;
+}
+
+int
+mq_start_quota_txn_v2 (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx,
+ inode_contribution_t *contri)
+{
+ int32_t ret = -1;
+ loc_t child_loc = {0,};
+ loc_t parent_loc = {0,};
+ gf_boolean_t locked = _gf_false;
+ gf_boolean_t dirty = _gf_false;
+ gf_boolean_t status = _gf_true;
+ quota_meta_t delta = {0, };
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+ GF_VALIDATE_OR_GOTO ("marker", ctx, out);
+ GF_VALIDATE_OR_GOTO ("marker", contri, out);
+
+ ret = mq_loc_copy (&child_loc, loc);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "loc copy failed");
+ goto out;
+ }
+
+ if (uuid_is_null (child_loc.gfid))
+ uuid_copy (child_loc.gfid, child_loc.inode->gfid);
+
+ if (uuid_is_null (child_loc.gfid)) {
+ ret = -1;
+ gf_log (this->name, GF_LOG_DEBUG, "UUID is null for %s",
+ child_loc.path);
+ goto out;
+ }
+
+ while (!__is_root_gfid (child_loc.gfid)) {
+ /* To improve performance, abort current transaction
+ * if one is already in progress for same inode
+ */
+ ret = mq_test_and_set_ctx_updation_status (ctx, &status);
+ if (ret < 0 || status == _gf_true)
+ goto out;
+
+ ret = mq_inode_loc_fill (NULL, child_loc.parent, &parent_loc);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "loc fill failed");
+ goto out;
+ }
+
+ ret = mq_lock (this, &parent_loc, F_WRLCK);
+ if (ret < 0)
+ goto out;
+ locked = _gf_true;
+
+ mq_set_ctx_updation_status (ctx, _gf_false);
+ status = _gf_true;
+
+ ret = mq_get_delta (this, &child_loc, &delta, ctx, contri);
+ if (ret < 0)
+ goto out;
+
+ if (quota_meta_is_null (&delta))
+ goto out;
+
+ ret = mq_mark_dirty (this, &parent_loc, 1);
+ if (ret < 0)
+ goto out;
+ dirty = _gf_true;
+
+ ret = mq_update_contri (this, &child_loc, contri, &delta);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "contri "
+ "update failed for %s", child_loc.path);
+ goto out;
+ }
+
+ ret = mq_update_size (this, &parent_loc, &delta);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_WARNING, "rollback "
+ "contri updation");
+ mq_sub_meta (&delta, NULL);
+ mq_update_contri (this, &child_loc, contri, &delta);
+ goto out;
+ }
+
+ ret = mq_mark_dirty (this, &parent_loc, 0);
+ dirty = _gf_false;
+
+ ret = mq_lock (this, &parent_loc, F_UNLCK);
+ locked = _gf_false;
+
+ if (__is_root_gfid (parent_loc.gfid))
+ break;
+
+ /* Repeate above steps upwards till the root */
+ loc_wipe (&child_loc);
+ ret = mq_loc_copy (&child_loc, &parent_loc);
+ if (ret < 0)
+ goto out;
+ loc_wipe (&parent_loc);
+
+ ret = mq_inode_ctx_get (child_loc.inode, this, &ctx);
+ if (ret < 0)
+ goto out;
+
+ if (list_empty (&ctx->contribution_head)) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "contribution node list is empty (%s)",
+ uuid_utoa(child_loc.inode->gfid));
+ ret = -1;
+ goto out;
+ }
+ contri = mq_get_contribution_node (child_loc.parent, ctx);
+ GF_ASSERT (contri != NULL);
+ }
+
+out:
+ if (ret >= 0 && dirty)
+ ret = mq_mark_dirty (this, &parent_loc, 0);
+
+ if (locked)
+ ret = mq_lock (this, &parent_loc, F_UNLCK);
+
+ if (status == _gf_false)
+ mq_set_ctx_updation_status (ctx, _gf_false);
+
+ loc_wipe (&child_loc);
+ loc_wipe (&parent_loc);
+
+ return ret;
+}
+
+int
+mq_create_xattrs_task (void *opaque)
+{
+ int32_t ret = -1;
+ gf_boolean_t locked = _gf_false;
+ gf_boolean_t xattrs_set = _gf_false;
+ gf_boolean_t objects = _gf_false;
+ gf_boolean_t need_txn = _gf_false;
+ quota_synctask_t *args = NULL;
+ xlator_t *this = NULL;
+ loc_t *loc = NULL;
+
+ GF_ASSERT (opaque);
+
+ args = (quota_synctask_t *) opaque;
+ loc = &args->loc;
+ this = args->this;
+ THIS = this;
+
+ if (uuid_is_null (loc->gfid))
+ uuid_copy (loc->gfid, loc->inode->gfid);
+
+ if (uuid_is_null (loc->gfid)) {
+ ret = -1;
+ gf_log (this->name, GF_LOG_DEBUG, "UUID is null for %s",
+ loc->path);
+ goto out;
+ }
+
+ ret = mq_lock (this, loc, F_WRLCK);
+ if (ret < 0)
+ goto out;
+ locked = _gf_true;
+
+ ret = mq_are_xattrs_set (this, loc, &xattrs_set, &objects);
+ if (ret < 0 || xattrs_set)
+ goto out;
+
+ ret = mq_create_xattrs (this, loc, objects);
+ if (ret < 0)
+ goto out;
+
+ need_txn = _gf_true;
+out:
+ if (locked)
+ ret = mq_lock (this, loc, F_UNLCK);
+
+ if (need_txn)
+ ret = mq_initiate_quota_blocking_txn (this, loc);
+
+ return ret;
+}
+
+int
+mq_create_xattrs_txn (xlator_t *this, loc_t *loc)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+
+ ret = mq_synctask (this, mq_create_xattrs_task, _gf_true, loc, NULL,
+ NULL, 0);
+out:
+ return ret;
+}
+
+int
+mq_create_xattrs_blocking_txn (xlator_t *this, loc_t *loc)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+
+ ret = mq_synctask (this, mq_create_xattrs_task, _gf_false, loc, NULL,
+ NULL, 0);
+out:
+ return ret;
+}
+
+int32_t
+mq_reduce_parent_size_task (void *opaque)
+{
+ int32_t ret = -1;
+ quota_inode_ctx_t *ctx = NULL;
+ inode_contribution_t *contribution = NULL;
+ quota_meta_t delta = {0, };
+ loc_t parent_loc = {0,};
+ gf_boolean_t locked = _gf_false;
+ gf_boolean_t dirty = _gf_false;
+ quota_synctask_t *args = NULL;
+ xlator_t *this = NULL;
+ loc_t *loc = NULL;
+ int64_t contri = 0;
+
+ GF_ASSERT (opaque);
+
+ args = (quota_synctask_t *) opaque;
+ loc = &args->loc;
+ contri = args->contri;
+ this = args->this;
+ THIS = this;
+
+ ret = mq_inode_ctx_get (loc->inode, this, &ctx);
+ if (ret < 0) {
+ gf_log_callingfn (this->name, GF_LOG_WARNING, "ctx for"
+ " the node %s is NULL", loc->path);
+ goto out;
+ }
+
+ contribution = mq_get_contribution_node (loc->parent, ctx);
+ if (contribution == NULL) {
+ ret = -1;
+ gf_log_callingfn (this->name, GF_LOG_WARNING,
+ "contribution for the node %s is NULL",
+ loc->path);
+ goto out;
+ }
+
+ if (contri >= 0) {
+ /* contri paramater is supplied only for rename operation */
+ delta.size = contri;
+ delta.file_count = 1;
+ delta.dir_count = 0;
+ }
+
+ ret = mq_inode_loc_fill (NULL, loc->parent, &parent_loc);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "loc fill failed");
+ goto out;
+ }
+
+ ret = mq_lock (this, &parent_loc, F_WRLCK);
+ if (ret < 0)
+ goto out;
+ locked = _gf_true;
+
+ if (contri < 0) {
+ LOCK (&contribution->lock);
+ {
+ delta.size = contribution->contribution;
+ delta.file_count = contribution->file_count;
+ delta.dir_count = contribution->dir_count;
+ }
+ UNLOCK (&contribution->lock);
+ }
+
+ /* TODO: Handle handlinks with better approach
+ Iterating dentry_list without a lock is not a good idea
+ if (loc->inode->ia_type != IA_IFDIR) {
+ list_for_each_entry (dentry, &inode->dentry_list, inode_list) {
+ if (loc->parent == dentry->parent) {
+ * If the file has another link within the same
+ * directory, we should not be reducing the size
+ * of parent
+ *
+ delta = 0;
+ idelta = 0;
+ break;
+ }
+ }
+ }
+ */
+
+ if (quota_meta_is_null (&delta))
+ goto out;
+
+ ret = mq_mark_dirty (this, &parent_loc, 1);
+ if (ret < 0)
+ goto out;
+ dirty = _gf_true;
+
+ ret = mq_remove_contri (this, loc, contribution);
+ if (ret < 0)
+ goto out;
+
+ mq_sub_meta (&delta, NULL);
+ ret = mq_update_size (this, &parent_loc, &delta);
+ if (ret < 0)
+ goto out;
+
+out:
+ if (dirty && ret >= 0)
+ ret = mq_mark_dirty (this, &parent_loc, 0);
+
+ if (locked)
+ ret = mq_lock (this, &parent_loc, F_UNLCK);
+
+ if (ret >= 0)
+ ret = mq_initiate_quota_blocking_txn (this, &parent_loc);
+
+ loc_wipe (&parent_loc);
+
+ return ret;
+}
+
+int32_t
+mq_reduce_parent_size_txn (xlator_t *this, loc_t *loc, int64_t contri)
+{
+ int32_t ret = -1;
GF_VALIDATE_OR_GOTO ("marker", this, out);
GF_VALIDATE_OR_GOTO ("marker", loc, out);
GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+ ret = mq_synctask (this, mq_reduce_parent_size_task, _gf_true, loc,
+ NULL, NULL, contri);
+out:
+ return ret;
+}
+
+int
+mq_initiate_quota_task (void *opaque)
+{
+ int32_t ret = -1;
+ quota_inode_ctx_t *ctx = NULL;
+ inode_contribution_t *contribution = NULL;
+ quota_synctask_t *args = NULL;
+ xlator_t *this = NULL;
+ loc_t *loc = NULL;
+
+ GF_ASSERT (opaque);
+
+ args = (quota_synctask_t *) opaque;
+ loc = &args->loc;
+ this = args->this;
+ THIS = this;
+
ret = mq_inode_ctx_get (loc->inode, this, &ctx);
if (ret == -1) {
gf_log (this->name, GF_LOG_WARNING,
@@ -2057,69 +3205,259 @@ mq_initiate_quota_txn (xlator_t *this, loc_t *loc)
*/
contribution = mq_get_contribution_node (loc->parent, ctx);
if (!contribution) {
- if ((loc->path && strcmp (loc->path, "/"))
- || (!uuid_is_null (loc->gfid)
- && !__is_root_gfid (loc->gfid))
- || (loc->inode && !uuid_is_null (loc->inode->gfid)
- && !__is_root_gfid (loc->inode->gfid)))
+ if (!loc_is_root(loc))
gf_log_callingfn (this->name, GF_LOG_TRACE,
"contribution node for the "
"path (%s) with parent (%s) "
"not found", loc->path,
- loc->parent?
- uuid_utoa (loc->parent->gfid):
+ loc->parent ?
+ uuid_utoa (loc->parent->gfid) :
NULL);
contribution = mq_add_new_contribution_node (this, ctx, loc);
if (!contribution) {
- if(loc->path && strcmp (loc->path, "/"))
+ if (!loc_is_root(loc))
gf_log_callingfn (this->name, GF_LOG_WARNING,
"could not allocate "
" contribution node for (%s) "
"parent: (%s)", loc->path,
- loc->parent?
- uuid_utoa (loc->parent->gfid):
+ loc->parent ?
+ uuid_utoa (loc->parent->gfid) :
NULL);
goto out;
}
}
- /* To improve performance, do not start another transaction
- * if one is already in progress for same inode
- */
- status = _gf_true;
+ mq_start_quota_txn_v2 (this, loc, ctx, contribution);
+
+ ret = 0;
+out:
+ return ret;
+}
+
+int
+mq_initiate_quota_txn (xlator_t *this, loc_t *loc)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("marker", this, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+
+ ret = mq_synctask (this, mq_initiate_quota_task, _gf_true, loc, NULL,
+ NULL, 0);
+out:
+ return ret;
+}
+
+int
+mq_initiate_quota_blocking_txn (xlator_t *this, loc_t *loc)
+{
+ int32_t ret = -1;
+
+ GF_VALIDATE_OR_GOTO ("marker", this, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+
+ ret = mq_synctask (this, mq_initiate_quota_task, _gf_false, loc, NULL,
+ NULL, 0);
+out:
+ return ret;
+}
+
+/* return 1 when dirty updation is performed
+ * return 0 other wise
+ */
+int32_t
+mq_update_dirty_inode_v2 (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx,
+ inode_contribution_t *contribution)
+{
+ int32_t ret = -1;
+ fd_t *fd = NULL;
+ off_t offset = 0;
+ loc_t child_loc = {0, };
+ gf_dirent_t entries;
+ gf_dirent_t *entry = NULL;
+ gf_boolean_t status = _gf_true;
+ gf_boolean_t locked = _gf_false;
+ gf_boolean_t free_entries = _gf_false;
+ gf_boolean_t updated = _gf_false;
+ int32_t dirty = 0;
+ quota_meta_t contri = {0, };
+ quota_meta_t size = {0, };
+ quota_meta_t contri_sum = {0, };
+ quota_meta_t delta = {0, };
+
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
+
+ ret = mq_get_ctx_updation_status (ctx, &status);
+ if (ret == -1 || status == _gf_true) {
+ ret = 0;
+ goto out;
+ }
+
+ if (uuid_is_null (loc->gfid))
+ uuid_copy (loc->gfid, loc->inode->gfid);
- ret = mq_test_and_set_ctx_updation_status (ctx, &status);
+ if (uuid_is_null (loc->gfid)) {
+ ret = -1;
+ gf_log (this->name, GF_LOG_DEBUG, "UUID is null for %s",
+ loc->path);
+ goto out;
+ }
+
+ ret = mq_lock (this, loc, F_WRLCK);
if (ret < 0)
goto out;
+ locked = _gf_true;
- if (status == _gf_false) {
- mq_start_quota_txn (this, loc, ctx, contribution);
+ ret = mq_get_dirty (this, loc, &dirty);
+ if (ret < 0 || dirty == 0) {
+ ret = 0;
+ goto out;
}
- ret = 0;
+ fd = fd_create (loc->inode, 0);
+ if (!fd) {
+ gf_log (this->name, GF_LOG_ERROR, "Failed to create fd");
+ ret = -1;
+ goto out;
+ }
+
+ ret = syncop_opendir (this, loc, fd);
+ if (ret < 0) {
+ gf_log (this->name, (-ret == ENOENT || -ret == ESTALE)
+ ? GF_LOG_DEBUG:GF_LOG_ERROR, "opendir failed "
+ "for %s: %s", loc->path, strerror (-ret));
+ goto out;
+ }
+
+ INIT_LIST_HEAD (&entries.list);
+ while ((ret = syncop_readdirp (this, fd, 131072, offset, NULL,
+ &entries)) != 0) {
+ if (ret < 0) {
+ gf_log (this->name, (-ret == ENOENT || -ret == ESTALE)
+ ? GF_LOG_DEBUG:GF_LOG_ERROR, "readdirp failed "
+ "for %s: %s", loc->path, strerror (-ret));
+ goto out;
+ }
+
+ if (list_empty (&entries.list))
+ break;
+
+ free_entries = _gf_true;
+ list_for_each_entry (entry, &entries.list, list) {
+ offset = entry->d_off;
+
+ if (!strcmp (entry->d_name, ".") ||
+ !strcmp (entry->d_name, ".."))
+ continue;
+
+ ret = loc_build_child (&child_loc, loc, entry->d_name);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "Couldn't build loc for %s/%s "
+ "returning from updation of dirty "
+ "inode", loc->path, entry->d_name);
+ goto out;
+ }
+
+ ret = mq_get_contri (this, &child_loc, &contri,
+ loc->gfid);
+ if (ret < 0)
+ goto out;
+
+ mq_add_meta (&contri_sum, &contri);
+ loc_wipe (&child_loc);
+ }
+
+ gf_dirent_free (&entries);
+ free_entries = _gf_false;
+ }
+ /* Inculde for self */
+ contri_sum.dir_count++;
+
+ ret = mq_get_size (this, loc, &size);
+ if (ret < 0)
+ goto out;
+
+ mq_compute_delta (&delta, &contri_sum, &size);
+
+ if (quota_meta_is_null (&delta))
+ goto out;
+
+ gf_log (this->name, GF_LOG_INFO, "calculated size = %"PRId64
+ ", original size = %"PRIu64 ", diff = %"PRIu64
+ ", path = %s ", contri_sum.size, size.size, delta.size,
+ loc->path);
+
+ gf_log (this->name, GF_LOG_INFO, "calculated f_count = %"PRId64
+ ", original f_count = %"PRIu64 ", diff = %"PRIu64
+ ", path = %s ", contri_sum.file_count, size.file_count,
+ delta.file_count, loc->path);
+
+ gf_log (this->name, GF_LOG_INFO, "calculated d_count = %"PRId64
+ ", original d_count = %"PRIu64 ", diff = %"PRIu64
+ ", path = %s ", contri_sum.dir_count, size.dir_count,
+ delta.dir_count, loc->path);
+
+
+ ret = mq_update_size (this, loc, &delta);
+ if (ret < 0)
+ goto out;
+
+ updated = _gf_true;
+
out:
- return ret;
-}
+ if (free_entries)
+ gf_dirent_free (&entries);
+ if (fd)
+ fd_unref (fd);
+
+ if (ret >= 0 && dirty)
+ mq_mark_dirty (this, loc, 0);
+
+ if (locked)
+ mq_lock (this, loc, F_UNLCK);
+ if (status == _gf_false)
+ mq_set_ctx_updation_status (ctx, _gf_false);
+ loc_wipe(&child_loc);
+ if (updated)
+ return 1;
+ else
+ return 0;
+}
int32_t
-mq_inspect_directory_xattr (xlator_t *this,
- loc_t *loc,
- dict_t *dict,
- struct iatt buf)
+mq_inspect_directory_xattr_task (void *opaque)
{
- int32_t ret = 0;
- int8_t dirty = -1;
- int64_t *size = NULL, size_int = 0;
- int64_t *contri = NULL, contri_int = 0;
- char contri_key [512] = {0, };
- gf_boolean_t not_root = _gf_false;
- quota_inode_ctx_t *ctx = NULL;
- inode_contribution_t *contribution = NULL;
+ int32_t ret = 0;
+ int8_t dirty = -1;
+ quota_meta_t size = {0, };
+ quota_meta_t contri = {0, };
+ quota_meta_t delta = {0, };
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ quota_inode_ctx_t *ctx = NULL;
+ inode_contribution_t *contribution = NULL;
+ quota_synctask_t *args = NULL;
+ xlator_t *this = NULL;
+ loc_t *loc = NULL;
+ dict_t *dict = NULL;
+ struct iatt buf = {0,};
+
+ GF_ASSERT (opaque);
+
+ args = (quota_synctask_t *) opaque;
+ loc = &args->loc;
+ dict = args->dict;
+ buf = args->buf;
+ this = args->this;
+ THIS = this;
ret = mq_inode_ctx_get (loc->inode, this, &ctx);
if (ret < 0) {
@@ -2132,7 +3470,7 @@ mq_inspect_directory_xattr (xlator_t *this,
}
}
- if (!loc->path || (loc->path && strcmp (loc->path, "/") != 0)) {
+ if (!loc_is_root(loc)) {
contribution = mq_add_new_contribution_node (this, ctx, loc);
if (contribution == NULL) {
if (!uuid_is_null (loc->inode->gfid))
@@ -2144,76 +3482,96 @@ mq_inspect_directory_xattr (xlator_t *this,
}
}
- ret = dict_get_bin (dict, QUOTA_SIZE_KEY, (void **) &size);
+ ret = dict_get_int8 (dict, QUOTA_DIRTY_KEY, &dirty);
if (ret < 0)
goto out;
- ret = dict_get_int8 (dict, QUOTA_DIRTY_KEY, &dirty);
+ ret = mq_dict_get_meta (dict, QUOTA_SIZE_KEY, &size);
if (ret < 0)
goto out;
- if ((loc->path && strcmp (loc->path, "/") != 0)
- || (!uuid_is_null (loc->gfid) && !__is_root_gfid (loc->gfid))
- || (loc->inode && !uuid_is_null (loc->inode->gfid) &&
- !__is_root_gfid (loc->inode->gfid))) {
- not_root = _gf_true;
-
+ if (!loc_is_root(loc)) {
GET_CONTRI_KEY (contri_key, contribution->gfid, ret);
if (ret < 0)
- goto out;
+ goto err;
- ret = dict_get_bin (dict, contri_key, (void **) &contri);
+ ret = mq_dict_get_meta (dict, contri_key, &contri);
if (ret < 0)
goto out;
LOCK (&contribution->lock);
{
- contribution->contribution = ntoh64 (*contri);
- contri_int = contribution->contribution;
+ contribution->contribution = contri.size;
+ contribution->file_count = contri.file_count;
+ contribution->dir_count = contri.dir_count;
}
UNLOCK (&contribution->lock);
}
LOCK (&ctx->lock);
{
- ctx->size = ntoh64 (*size);
+ ctx->size = size.size;
+ ctx->file_count = size.file_count;
+ ctx->dir_count = size.dir_count;
ctx->dirty = dirty;
- size_int = ctx->size;
}
UNLOCK (&ctx->lock);
- gf_log (this->name, GF_LOG_DEBUG, "size=%"PRId64
- " contri=%"PRId64, size_int, contri_int);
+ mq_compute_delta (&delta, &size, &contri);
- if (dirty) {
- ret = mq_update_dirty_inode (this, loc, ctx, contribution);
- }
+ if (dirty)
+ ret = mq_update_dirty_inode_v2 (this, loc, ctx, contribution);
- if ((!dirty || ret == 0) && (not_root == _gf_true) &&
- (size_int != contri_int)) {
- mq_initiate_quota_txn (this, loc);
- }
+ if ((!dirty || ret == 1) &&
+ !loc_is_root(loc) &&
+ !quota_meta_is_null (&delta))
+ mq_initiate_quota_blocking_txn (this, loc);
ret = 0;
out:
- if (ret)
- mq_set_inode_xattr (this, loc);
+ if (ret < 0)
+ ret = mq_create_xattrs_blocking_txn (this, loc);
+
err:
return ret;
}
int32_t
-mq_inspect_file_xattr (xlator_t *this,
- loc_t *loc,
- dict_t *dict,
- struct iatt buf)
+mq_inspect_directory_xattr_txn (xlator_t *this, loc_t *loc, dict_t *dict,
+ struct iatt buf)
{
- int32_t ret = -1;
- uint64_t contri_int = 0, size = 0;
- int64_t *contri_ptr = NULL;
- char contri_key [512] = {0, };
- quota_inode_ctx_t *ctx = NULL;
- inode_contribution_t *contribution = NULL;
+ int32_t ret = -1;
+
+ ret = mq_synctask (this, mq_inspect_directory_xattr_task, _gf_true,
+ loc, dict, &buf, 0);
+
+ return ret;
+}
+
+int32_t
+mq_inspect_file_xattr_task (void *opaque)
+{
+ int32_t ret = -1;
+ quota_meta_t size = {0, };
+ quota_meta_t contri = {0, };
+ quota_meta_t delta = {0, };
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ quota_inode_ctx_t *ctx = NULL;
+ inode_contribution_t *contribution = NULL;
+ quota_synctask_t *args = NULL;
+ xlator_t *this = NULL;
+ loc_t *loc = NULL;
+ dict_t *dict = NULL;
+ struct iatt buf = {0,};
+
+ GF_ASSERT (opaque);
+
+ args = (quota_synctask_t *) opaque;
+ loc = &args->loc;
+ dict = args->dict;
+ buf = args->buf;
+ this = args->this;
+ THIS = this;
ret = mq_inode_ctx_get (loc->inode, this, &ctx);
if (ret < 0) {
@@ -2230,103 +3588,111 @@ mq_inspect_file_xattr (xlator_t *this,
if (contribution == NULL) {
gf_log_callingfn (this->name, GF_LOG_DEBUG, "cannot allocate "
"contribution node (path:%s)", loc->path);
+ ret = -1;
goto out;
}
LOCK (&ctx->lock);
{
ctx->size = 512 * buf.ia_blocks;
- size = ctx->size;
+ ctx->file_count = 1;
+ ctx->dir_count = 0;
+
+ size.size = ctx->size;
+ size.file_count = ctx->file_count;
+ size.dir_count = ctx->dir_count;
}
UNLOCK (&ctx->lock);
list_for_each_entry (contribution, &ctx->contribution_head,
contri_list) {
+
GET_CONTRI_KEY (contri_key, contribution->gfid, ret);
if (ret < 0)
continue;
- ret = dict_get_bin (dict, contri_key, (void **) &contri_int);
- if (ret == 0) {
- contri_ptr = (int64_t *)(unsigned long)contri_int;
-
+ ret = mq_dict_get_meta (dict, contri_key, &contri);
+ if (ret < 0) {
+ ret = mq_create_xattrs_blocking_txn (this, loc);
+ } else {
LOCK (&contribution->lock);
{
- contribution->contribution = ntoh64 (*contri_ptr);
- contri_int = contribution->contribution;
+ contribution->contribution = contri.size;
+ contribution->file_count = contri.file_count;
+ contribution->dir_count = contri.dir_count;
}
UNLOCK (&contribution->lock);
- gf_log (this->name, GF_LOG_DEBUG,
- "size=%"PRId64 " contri=%"PRId64, size, contri_int);
-
- if (size != contri_int) {
- mq_initiate_quota_txn (this, loc);
+ mq_compute_delta (&delta, &size, &contri);
+ if (!quota_meta_is_null (&delta)) {
+ mq_initiate_quota_blocking_txn (this, loc);
+ /* TODO: revist this code when fixing hardlinks
+ */
+ break;
}
- } else {
- if (size)
- mq_initiate_quota_txn (this, loc);
- else
- mq_set_inode_xattr (this, loc);
}
+
+ /* TODO: loc->parent might need to be assigned to corresponding
+ * contribution inode. We need to handle hard links here
+ */
}
out:
+
return ret;
}
int32_t
-mq_xattr_state (xlator_t *this,
- loc_t *loc,
- dict_t *dict,
- struct iatt buf)
+mq_inspect_file_xattr_txn (xlator_t *this, loc_t *loc, dict_t *dict,
+ struct iatt buf)
+{
+ int32_t ret = -1;
+
+ ret = mq_synctask (this, mq_inspect_file_xattr_task, _gf_true,
+ loc, dict, &buf, 0);
+
+ return ret;
+}
+
+int32_t
+mq_xattr_state (xlator_t *this, loc_t *loc, dict_t *dict, struct iatt buf)
{
if (((buf.ia_type == IA_IFREG) && !dht_is_linkfile (&buf, dict))
|| (buf.ia_type == IA_IFLNK)) {
- mq_inspect_file_xattr (this, loc, dict, buf);
+ mq_inspect_file_xattr_txn (this, loc, dict, buf);
} else if (buf.ia_type == IA_IFDIR)
- mq_inspect_directory_xattr (this, loc, dict, buf);
+ mq_inspect_directory_xattr_txn (this, loc, dict, buf);
return 0;
}
int32_t
-mq_req_xattr (xlator_t *this,
- loc_t *loc,
- dict_t *dict)
+mq_req_xattr (xlator_t *this, loc_t *loc, dict_t *dict,
+ char *contri_key)
{
int32_t ret = -1;
GF_VALIDATE_OR_GOTO ("marker", this, out);
+ GF_VALIDATE_OR_GOTO ("marker", loc, out);
GF_VALIDATE_OR_GOTO ("marker", dict, out);
- if (!loc)
- goto set_size;
-
- //if not "/" then request contribution
- if (loc->path && strcmp (loc->path, "/") == 0)
- goto set_size;
-
- ret = mq_dict_set_contribution (this, dict, loc);
- if (ret == -1)
- goto out;
+ if (!loc_is_root(loc)) {
+ ret = mq_dict_set_contribution (this, dict, loc, NULL,
+ contri_key);
+ if (ret < 0)
+ goto out;
+ }
-set_size:
ret = dict_set_uint64 (dict, QUOTA_SIZE_KEY, 0);
- if (ret < 0) {
- ret = -1;
+ if (ret < 0)
goto out;
- }
ret = dict_set_int8 (dict, QUOTA_DIRTY_KEY, 0);
- if (ret < 0) {
- ret = -1;
- goto out;
- }
-
- ret = 0;
out:
+ if (ret < 0)
+ gf_log_callingfn (this->name, GF_LOG_ERROR, "dict set failed");
+
return ret;
}
@@ -2344,15 +3710,15 @@ int32_t
_mq_inode_remove_done (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
- int32_t ret = 0;
- char contri_key [512] = {0, };
- quota_local_t *local = NULL;
- inode_t *inode = NULL;
- dentry_t *tmp = NULL;
- gf_boolean_t last_dentry = _gf_true;
- loc_t loc = {0, };
- dentry_t *other_dentry = NULL;
- gf_boolean_t remove = _gf_false;
+ int32_t ret = 0;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ quota_local_t *local = NULL;
+ inode_t *inode = NULL;
+ dentry_t *tmp = NULL;
+ gf_boolean_t last_dentry = _gf_true;
+ loc_t loc = {0, };
+ dentry_t *other_dentry = NULL;
+ gf_boolean_t remove = _gf_false;
local = (quota_local_t *) frame->local;
@@ -2531,6 +3897,7 @@ mq_reduce_parent_size_xattr (call_frame_t *frame, void *cookie, xlator_t *this,
dict = dict_new ();
if (dict == NULL) {
+ gf_log (this->name, GF_LOG_ERROR, "dict_new failed");
ret = -1;
goto err;
}
diff --git a/xlators/features/marker/src/marker-quota.h b/xlators/features/marker/src/marker-quota.h
index 42def9d..fa132a8 100644
--- a/xlators/features/marker/src/marker-quota.h
+++ b/xlators/features/marker/src/marker-quota.h
@@ -21,7 +21,7 @@
#define QUOTA_XATTR_PREFIX "trusted.glusterfs"
#define QUOTA_DIRTY_KEY "trusted.glusterfs.quota.dirty"
-#define CONTRIBUTION "contri"
+#define CONTRIBUTION "contri"
#define CONTRI_KEY_MAX 512
#define READDIR_BUF 4096
@@ -59,21 +59,21 @@
ret = 0; \
} while (0);
-#define GET_CONTRI_KEY(var, _gfid, _ret) \
- do { \
- if (_gfid != NULL) { \
- char _gfid_unparsed[40]; \
- uuid_unparse (_gfid, _gfid_unparsed); \
- _ret = snprintf (var, CONTRI_KEY_MAX, \
- QUOTA_XATTR_PREFIX \
+#define GET_CONTRI_KEY(var, _gfid, _ret) \
+ do { \
+ if (_gfid != NULL) { \
+ char _gfid_unparsed[40]; \
+ uuid_unparse (_gfid, _gfid_unparsed); \
+ _ret = snprintf (var, CONTRI_KEY_MAX, \
+ QUOTA_XATTR_PREFIX \
".%s.%s." CONTRIBUTION, "quota", \
- _gfid_unparsed); \
- } else { \
- _ret = snprintf (var, CONTRI_KEY_MAX, \
- QUOTA_XATTR_PREFIX \
- ".%s.." CONTRIBUTION, "quota"); \
- } \
- } while (0);
+ _gfid_unparsed); \
+ } else { \
+ _ret = snprintf (var, CONTRI_KEY_MAX, \
+ QUOTA_XATTR_PREFIX \
+ ".%s.." CONTRIBUTION, "quota"); \
+ } \
+ } while (0)
#define QUOTA_SAFE_INCREMENT(lock, var) \
do { \
@@ -84,6 +84,8 @@
struct quota_inode_ctx {
int64_t size;
+ int64_t file_count;
+ int64_t dir_count;
int8_t dirty;
gf_boolean_t updation_status;
gf_lock_t lock;
@@ -91,9 +93,28 @@ struct quota_inode_ctx {
};
typedef struct quota_inode_ctx quota_inode_ctx_t;
+struct quota_meta {
+ int64_t size;
+ int64_t file_count;
+ int64_t dir_count;
+};
+typedef struct quota_meta quota_meta_t;
+
+struct quota_synctask {
+ xlator_t *this;
+ loc_t loc;
+ dict_t *dict;
+ struct iatt buf;
+ int64_t contri;
+ gf_boolean_t is_static;
+};
+typedef struct quota_synctask quota_synctask_t;
+
struct inode_contribution {
struct list_head contri_list;
int64_t contribution;
+ int64_t file_count;
+ int64_t dir_count;
uuid_t gfid;
gf_lock_t lock;
};
@@ -103,7 +124,7 @@ int32_t
mq_get_lock_on_parent (call_frame_t *, xlator_t *);
int32_t
-mq_req_xattr (xlator_t *, loc_t *, dict_t *);
+mq_req_xattr (xlator_t *, loc_t *, dict_t *, char *);
int32_t
init_quota_priv (xlator_t *);
@@ -117,6 +138,12 @@ mq_set_inode_xattr (xlator_t *, loc_t *);
int
mq_initiate_quota_txn (xlator_t *, loc_t *);
+int
+mq_initiate_quota_blocking_txn (xlator_t *, loc_t *);
+
+int
+mq_create_xattrs_txn (xlator_t *this, loc_t *loc);
+
int32_t
mq_dirty_inode_readdir (call_frame_t *, void *, xlator_t *,
int32_t, int32_t, fd_t *, dict_t *);
@@ -125,6 +152,9 @@ int32_t
mq_reduce_parent_size (xlator_t *, loc_t *, int64_t);
int32_t
+mq_reduce_parent_size_txn (xlator_t *, loc_t *, int64_t);
+
+int32_t
mq_rename_update_newpath (xlator_t *, loc_t *);
int32_t
diff --git a/xlators/features/marker/src/marker.c b/xlators/features/marker/src/marker.c
index ad3aabd..af7ec1f 100644
--- a/xlators/features/marker/src/marker.c
+++ b/xlators/features/marker/src/marker.c
@@ -607,7 +607,7 @@ marker_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
priv = this->private;
if (priv->feature_enabled & GF_QUOTA)
- mq_set_inode_xattr (this, &local->loc);
+ mq_create_xattrs_txn (this, &local->loc);
if (priv->feature_enabled & GF_XTIME)
marker_xtime_update_marks (this, local);
@@ -681,7 +681,7 @@ marker_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
priv = this->private;
if (priv->feature_enabled & GF_QUOTA)
- mq_set_inode_xattr (this, &local->loc);
+ mq_create_xattrs_txn (this, &local->loc);
if (priv->feature_enabled & GF_XTIME)
marker_xtime_update_marks (this, local);
@@ -827,7 +827,7 @@ marker_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
priv = this->private;
if (priv->feature_enabled & GF_QUOTA)
- mq_reduce_parent_size (this, &local->loc, -1);
+ mq_reduce_parent_size_txn (this, &local->loc, -1);
if (priv->feature_enabled & GF_XTIME)
marker_xtime_update_marks (this, local);
@@ -896,7 +896,7 @@ marker_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
if (priv->feature_enabled & GF_QUOTA) {
if (!local->skip_txn)
- mq_reduce_parent_size (this, &local->loc, -1);
+ mq_reduce_parent_size_txn (this, &local->loc, -1);
}
if (priv->feature_enabled & GF_XTIME)
@@ -977,7 +977,7 @@ marker_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
if (priv->feature_enabled & GF_QUOTA) {
if (!local->skip_txn)
- mq_set_inode_xattr (this, &local->loc);
+ mq_create_xattrs_txn (this, &local->loc);
}
@@ -1065,10 +1065,11 @@ marker_rename_done (call_frame_t *frame, void *cookie, xlator_t *this,
frame->root->unique);
}
- mq_reduce_parent_size (this, &oplocal->loc, oplocal->contribution);
+ mq_reduce_parent_size_txn (this, &oplocal->loc, oplocal->contribution);
if (local->loc.inode != NULL) {
- mq_reduce_parent_size (this, &local->loc, local->contribution);
+ mq_reduce_parent_size_txn (this, &local->loc,
+ local->contribution);
}
newloc.inode = inode_ref (oplocal->loc.inode);
@@ -1078,7 +1079,7 @@ marker_rename_done (call_frame_t *frame, void *cookie, xlator_t *this,
newloc.name++;
newloc.parent = inode_ref (local->loc.parent);
- mq_set_inode_xattr (this, &newloc);
+ mq_create_xattrs_txn (this, &newloc);
loc_wipe (&newloc);
@@ -1181,13 +1182,13 @@ marker_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
struct iatt *prenewparent, struct iatt *postnewparent,
dict_t *xdata)
{
- marker_conf_t *priv = NULL;
- marker_local_t *local = NULL;
- marker_local_t *oplocal = NULL;
- call_stub_t *stub = NULL;
- int32_t ret = 0;
- char contri_key [512] = {0, };
- loc_t newloc = {0, };
+ marker_conf_t *priv = NULL;
+ marker_local_t *local = NULL;
+ marker_local_t *oplocal = NULL;
+ call_stub_t *stub = NULL;
+ int32_t ret = 0;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ loc_t newloc = {0, };
local = (marker_local_t *) frame->local;
@@ -1284,10 +1285,11 @@ int32_t
marker_do_rename (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, dict_t *dict, dict_t *xdata)
{
- marker_local_t *local = NULL, *oplocal = NULL;
- char contri_key[512] = {0, };
- int32_t ret = 0;
- int64_t *contribution = 0;
+ marker_local_t *local = NULL;
+ marker_local_t *oplocal = NULL;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ int32_t ret = 0;
+ int64_t *contribution = 0;
local = frame->local;
oplocal = local->oplocal;
@@ -1336,10 +1338,11 @@ marker_get_newpath_contribution (call_frame_t *frame, void *cookie,
xlator_t *this, int32_t op_ret,
int32_t op_errno, dict_t *dict, dict_t *xdata)
{
- marker_local_t *local = NULL, *oplocal = NULL;
- char contri_key[512] = {0, };
- int32_t ret = 0;
- int64_t *contribution = 0;
+ marker_local_t *local = NULL;
+ marker_local_t *oplocal = NULL;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ int32_t ret = 0;
+ int64_t *contribution = 0;
local = frame->local;
oplocal = local->oplocal;
@@ -1403,9 +1406,10 @@ marker_get_oldpath_contribution (call_frame_t *frame, void *cookie,
xlator_t *this, int32_t op_ret,
int32_t op_errno, dict_t *xdata)
{
- marker_local_t *local = NULL, *oplocal = NULL;
- char contri_key[512] = {0, };
- int32_t ret = 0;
+ marker_local_t *local = NULL;
+ marker_local_t *oplocal = NULL;
+ char contri_key[CONTRI_KEY_MAX] = {0, };
+ int32_t ret = 0;
local = frame->local;
oplocal = local->oplocal;
@@ -1764,8 +1768,9 @@ marker_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
priv = this->private;
- if (priv->feature_enabled & GF_QUOTA)
- mq_set_inode_xattr (this, &local->loc);
+ if (priv->feature_enabled & GF_QUOTA) {
+ mq_create_xattrs_txn (this, &local->loc);
+ }
if (priv->feature_enabled & GF_XTIME)
marker_xtime_update_marks (this, local);
@@ -1838,7 +1843,7 @@ marker_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
priv = this->private;
if ((priv->feature_enabled & GF_QUOTA) && (S_ISREG (local->mode))) {
- mq_set_inode_xattr (this, &local->loc);
+ mq_create_xattrs_txn (this, &local->loc);
}
if (priv->feature_enabled & GF_XTIME)
@@ -2706,7 +2711,7 @@ marker_lookup (call_frame_t *frame, xlator_t *this,
goto err;
if ((priv->feature_enabled & GF_QUOTA) && xattr_req)
- mq_req_xattr (this, loc, xattr_req);
+ mq_req_xattr (this, loc, xattr_req, NULL);
wind:
STACK_WIND (frame, marker_lookup_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->lookup, loc, xattr_req);
@@ -2854,7 +2859,7 @@ marker_readdirp (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
loc.parent = local->loc.inode = inode_ref (fd->inode);
- mq_req_xattr (this, &loc, dict);
+ mq_req_xattr (this, &loc, dict, NULL);
}
STACK_WIND (frame, marker_readdirp_cbk,