summaryrefslogtreecommitdiffstats
path: root/xlators/features/quota/src/quota.c
diff options
context:
space:
mode:
Diffstat (limited to 'xlators/features/quota/src/quota.c')
-rw-r--r--xlators/features/quota/src/quota.c325
1 files changed, 271 insertions, 54 deletions
diff --git a/xlators/features/quota/src/quota.c b/xlators/features/quota/src/quota.c
index 3c1b8e09c5c..f903b4e57b7 100644
--- a/xlators/features/quota/src/quota.c
+++ b/xlators/features/quota/src/quota.c
@@ -14,17 +14,6 @@
#include "defaults.h"
#include "statedump.h"
-void
-quota_get_limit_dir (call_frame_t *frame, inode_t *cur_inode, xlator_t *this);
-
-int32_t
-quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
- char *name, uuid_t par);
-
-int
-quota_fill_inodectx (xlator_t *this, inode_t *inode, dict_t *dict,
- loc_t *loc, struct iatt *buf, int32_t *op_errno);
-
struct volume_options options[];
static int32_t
@@ -251,6 +240,164 @@ out:
return;
}
+static inline inode_t*
+__quota_inode_parent (inode_t *inode, uuid_t pargfid, const char *name)
+{
+ inode_t *parent = NULL;
+
+ parent = inode_parent (inode, pargfid, name);
+ inode_unref (inode);
+ return parent;
+}
+
+static inline inode_t*
+quota_inode_parent (inode_t *inode, uuid_t pargfid, const char *name)
+{
+ inode_t *parent = NULL;
+
+ parent = __quota_inode_parent (inode, pargfid, name);
+ if (!parent)
+ gf_log_callingfn (THIS->name, GF_LOG_ERROR, "Failed to find "
+ "ancestor for inode (%s)",
+ uuid_utoa(inode->gfid));
+
+ return parent;
+}
+
+int32_t
+quota_inode_depth (inode_t *inode)
+{
+ int depth = 0;
+ inode_t *cur_inode = NULL;
+
+ cur_inode = inode_ref (inode);
+ while (cur_inode && !__is_root_gfid (cur_inode->gfid)) {
+ depth++;
+ cur_inode = quota_inode_parent (cur_inode, 0 , NULL);
+ if (!cur_inode)
+ depth = -1;
+ }
+
+ if (cur_inode)
+ inode_unref (cur_inode);
+
+ return depth;
+}
+
+int32_t quota_find_common_ancestor (inode_t *inode1, inode_t *inode2,
+ uuid_t *common_ancestor)
+{
+ int32_t depth1 = 0;
+ int32_t depth2 = 0;
+ int32_t ret = -1;
+ inode_t *cur_inode1 = NULL;
+ inode_t *cur_inode2 = NULL;
+
+ depth1 = quota_inode_depth (inode1);
+ if (depth1 < 0)
+ goto out;
+
+ depth2 = quota_inode_depth (inode2);
+ if (depth2 < 0)
+ goto out;
+
+ cur_inode1 = inode_ref (inode1);
+ cur_inode2 = inode_ref (inode2);
+
+ while (cur_inode1 && depth1 > depth2) {
+ cur_inode1 = quota_inode_parent (cur_inode1, 0 , NULL);
+ depth1--;
+ }
+
+ while (cur_inode2 && depth2 > depth1) {
+ cur_inode2 = quota_inode_parent (cur_inode2, 0 , NULL);
+ depth2--;
+ }
+
+ while (depth1 && cur_inode1 && cur_inode2 && cur_inode1 != cur_inode2) {
+ cur_inode1 = quota_inode_parent (cur_inode1, 0 , NULL);
+ cur_inode2 = quota_inode_parent (cur_inode2, 0 , NULL);
+ depth1--;
+ }
+
+ if (cur_inode1 && cur_inode2) {
+ uuid_copy (*common_ancestor, cur_inode1->gfid);
+ ret = 0;
+ }
+out:
+ if (cur_inode1)
+ inode_unref (cur_inode1);
+
+ if (cur_inode2)
+ inode_unref (cur_inode2);
+
+ return ret;
+ }
+
+void
+check_ancestory_continue (struct list_head *parents, inode_t *inode,
+ int32_t op_ret, int32_t op_errno, void *data)
+{
+ call_frame_t *frame = NULL;
+ quota_local_t *local = NULL;
+ uint32_t link_count = 0;
+
+ frame = data;
+ local = frame->local;
+
+ if (op_ret < 0 || (parents && list_empty (parents))) {
+ if (op_ret >= 0) {
+ gf_log (THIS->name, GF_LOG_WARNING,
+ "Couldn't build ancestry for inode (gfid:%s). "
+ "Without knowing ancestors till root, quota "
+ "cannot be enforced. "
+ "Hence, failing fop with EIO",
+ uuid_utoa (inode->gfid));
+ op_errno = EIO;
+ op_ret = -1;
+ }
+ }
+
+ LOCK (&local->lock);
+ {
+ link_count = --local->link_count;
+ if (op_ret < 0) {
+ local->op_ret = op_ret;
+ local->op_errno = op_errno;
+ }
+ }
+ UNLOCK (&local->lock);
+
+ if (link_count == 0)
+ local->fop_continue_cbk (frame);
+}
+
+void
+check_ancestory (call_frame_t *frame, inode_t *inode)
+{
+ inode_t *cur_inode = NULL;
+ inode_t *parent = NULL;
+
+ cur_inode = inode_ref (inode);
+ while (cur_inode && !__is_root_gfid (cur_inode->gfid)) {
+ parent = inode_parent (cur_inode, 0, NULL);
+ if (!parent) {
+ quota_build_ancestry (cur_inode,
+ check_ancestory_continue, frame);
+ return;
+ }
+ inode_unref (cur_inode);
+ cur_inode = parent;
+ }
+
+ if (cur_inode) {
+ inode_unref (cur_inode);
+ check_ancestory_continue (NULL, NULL, 0, 0, frame);
+ } else {
+ check_ancestory_continue (NULL, NULL, -1, ESTALE, frame);
+ }
+}
+
static inline void
quota_link_count_decrement (quota_local_t *local)
{
@@ -827,6 +974,14 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
}
do {
+ /* In a rename operation, enforce should be stopped at common
+ ancestor */
+ if (!uuid_is_null (local->common_ancestor) &&
+ !uuid_compare (_inode->gfid, local->common_ancestor)) {
+ quota_link_count_decrement (local);
+ break;
+ }
+
if (ctx != NULL && (ctx->hard_lim > 0 || ctx->soft_lim > 0)) {
wouldbe_size = ctx->size + delta;
@@ -2046,63 +2201,51 @@ out:
return 0;
}
-int32_t
-quota_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
- loc_t *newloc, dict_t *xdata)
+void
+quota_rename_continue (call_frame_t *frame)
{
- quota_priv_t *priv = NULL;
- int32_t ret = -1, op_errno = ENOMEM;
- quota_local_t *local = NULL;
- quota_inode_ctx_t *ctx = NULL;
- call_stub_t *stub = NULL;
-
- priv = this->private;
-
- WIND_IF_QUOTAOFF (priv->is_quota_on, off);
-
- local = quota_local_new ();
- if (local == NULL) {
- goto err;
- }
-
- frame->local = local;
+ int32_t ret = -1;
+ int32_t op_errno = EIO;
+ quota_local_t *local = NULL;
+ uuid_t common_ancestor = {0};
+ xlator_t *this = NULL;
+ quota_inode_ctx_t *ctx = NULL;
- ret = loc_copy (&local->oldloc, oldloc);
- if (ret < 0) {
- gf_log (this->name, GF_LOG_WARNING, "loc_copy failed");
- goto err;
- }
+ local = frame->local;
+ this = THIS;
- ret = loc_copy (&local->newloc, newloc);
- if (ret < 0) {
- gf_log (this->name, GF_LOG_WARNING, "loc_copy failed");
+ if (local->op_ret < 0) {
+ op_errno = local->op_errno;
goto err;
}
- stub = fop_rename_stub (frame, quota_rename_helper, oldloc, newloc,
- xdata);
- if (stub == NULL) {
+ ret = quota_find_common_ancestor (local->oldloc.parent,
+ local->newloc.parent,
+ &common_ancestor);
+ if (ret < 0 || uuid_is_null(common_ancestor)) {
+ gf_log (this->name, GF_LOG_ERROR, "failed to get "
+ "common_ancestor for %s and %s",
+ local->oldloc.path, local->newloc.path);
+ op_errno = ESTALE;
goto err;
}
LOCK (&local->lock);
{
local->link_count = 1;
- local->stub = stub;
+ uuid_copy (local->common_ancestor, common_ancestor);
}
UNLOCK (&local->lock);
- if (QUOTA_REG_OR_LNK_FILE (oldloc->inode->ia_type)) {
- ret = quota_inode_ctx_get (oldloc->inode, this, &ctx, 0);
+ if (QUOTA_REG_OR_LNK_FILE (local->oldloc.inode->ia_type)) {
+ ret = quota_inode_ctx_get (local->oldloc.inode, this, &ctx, 0);
if (ctx == NULL) {
gf_log (this->name, GF_LOG_WARNING,
"quota context not set in inode (gfid:%s), "
"considering file size as zero while enforcing "
"quota on new ancestry",
- oldloc->inode ? uuid_utoa (oldloc->inode->gfid)
- : "0");
+ uuid_utoa (local->oldloc.inode->gfid));
local->delta = 0;
-
} else {
/* FIXME: We need to account for the size occupied by this
@@ -2112,25 +2255,99 @@ quota_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
* directory inode*/
/* FIXME: The following code assumes that regular files and
- *linkfiles are present, in their entirety, in a single
- brick. This *assumption is invalid in the case of
- stripe.*/
+ * linkfiles are present, in their entirety, in a single
+ * brick. This *assumption is invalid in the case of
+ * stripe.*/
local->delta = ctx->buf.ia_blocks * 512;
}
- } else if (IA_ISDIR (oldloc->inode->ia_type)) {
- ret = quota_validate (frame, oldloc->inode, this,
+ } else if (IA_ISDIR (local->oldloc.inode->ia_type)) {
+ ret = quota_validate (frame, local->oldloc.inode, this,
quota_rename_get_size_cbk);
if (ret){
op_errno = -ret;
goto err;
}
- return 0;
+ return;
}
- quota_check_limit (frame, newloc->parent, this, NULL, NULL);
+ quota_check_limit (frame, local->newloc.parent, this, NULL, NULL);
+ return;
+
+err:
+ if (local && local->stub)
+ call_stub_destroy (local->stub);
+
+ QUOTA_STACK_UNWIND (rename, frame, -1, op_errno, NULL,
+ NULL, NULL, NULL, NULL, NULL);
+ return;
+
+}
+
+int32_t
+quota_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
+ loc_t *newloc, dict_t *xdata)
+{
+ quota_priv_t *priv = NULL;
+ int32_t ret = -1;
+ int32_t op_errno = ENOMEM;
+ quota_local_t *local = NULL;
+ call_stub_t *stub = NULL;
+ uuid_t common_ancestor = {0};
+
+ priv = this->private;
+
+ WIND_IF_QUOTAOFF (priv->is_quota_on, off);
+
+ /* No need to check quota limit if src and dst parents are same */
+ if (oldloc->parent && newloc->parent &&
+ !uuid_compare(oldloc->parent->gfid, newloc->parent->gfid)) {
+ gf_log (this->name, GF_LOG_DEBUG, "rename %s -> %s are "
+ "in the same directory, so skip check limit",
+ oldloc->path, newloc->path);
+ goto off;
+ }
+
+ local = quota_local_new ();
+ if (local == NULL) {
+ goto err;
+ }
+
+ frame->local = local;
+
+ ret = loc_copy (&local->oldloc, oldloc);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_WARNING, "loc_copy failed");
+ goto err;
+ }
+
+ ret = loc_copy (&local->newloc, newloc);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_WARNING, "loc_copy failed");
+ goto err;
+ }
+
+ stub = fop_rename_stub (frame, quota_rename_helper, oldloc, newloc,
+ xdata);
+ if (stub == NULL) {
+ goto err;
+ }
+
+ LOCK (&local->lock);
+ {
+ /* link_count here tell how many check_ancestory should be done
+ * before continuing the FOP
+ */
+ local->link_count = 2;
+ local->stub = stub;
+ local->fop_continue_cbk = quota_rename_continue;
+ }
+ UNLOCK (&local->lock);
+
+ check_ancestory (frame, newloc->parent);
+ check_ancestory (frame, oldloc->parent);
return 0;
err: