summaryrefslogtreecommitdiffstats
path: root/xlators/features/marker/src/marker.c
diff options
context:
space:
mode:
authorRaghavendra G <raghavendra@gluster.com>2011-06-14 23:54:16 +0000
committerAnand Avati <avati@gluster.com>2011-06-16 22:00:51 -0700
commita87555181d47522e985325c67b7d17c49dbd38de (patch)
tree16e7848551e64c1aa8b2038fabe2da4c3bd98dd7 /xlators/features/marker/src/marker.c
parent01d67311c83ae272f3ee3632c1e8f13ccebaca81 (diff)
features/marker-quota: fixes in rename path.
- remove xattrs from newpath after rename is complete. - hold inodelk on both parents (if they are different) before doing rename and gather contribution values of oldpath and newpath to their parents while still holding the locks. Use these contribution values to reduce parent sizes. Signed-off-by: Raghavendra G <raghavendra@gluster.com> Signed-off-by: Anand Avati <avati@gluster.com> BUG: 2697 (Quota: add-brick creates the size go awkward, though it was perfect earlier) URL: http://bugs.gluster.com/cgi-bin/bugzilla3/show_bug.cgi?id=2697
Diffstat (limited to 'xlators/features/marker/src/marker.c')
-rw-r--r--xlators/features/marker/src/marker.c403
1 files changed, 340 insertions, 63 deletions
diff --git a/xlators/features/marker/src/marker.c b/xlators/features/marker/src/marker.c
index f3c9e333c..b00d0a3d2 100644
--- a/xlators/features/marker/src/marker.c
+++ b/xlators/features/marker/src/marker.c
@@ -27,6 +27,7 @@
#include "marker.h"
#include "marker-mem-types.h"
#include "marker-quota.h"
+#include "marker-quota-helper.h"
#include "marker-common.h"
void
@@ -178,8 +179,8 @@ marker_local_unref (marker_local_t *local)
loc_wipe (&local->loc);
if (local->oplocal) {
- loc_wipe (&local->oplocal->loc);
- GF_FREE (local->oplocal);
+ marker_local_unref (local->oplocal);
+ local->oplocal = NULL;
}
GF_FREE (local);
out:
@@ -679,7 +680,7 @@ marker_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
priv = this->private;
if (priv->feature_enabled & GF_QUOTA)
- reduce_parent_size (this, &local->loc);
+ reduce_parent_size (this, &local->loc, -1);
if (priv->feature_enabled & GF_XTIME)
marker_xtime_update_marks (this, local);
@@ -746,7 +747,7 @@ marker_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
priv = this->private;
if ((priv->feature_enabled & GF_QUOTA) && (local->ia_nlink == 1))
- reduce_parent_size (this, &local->loc);
+ reduce_parent_size (this, &local->loc, -1);
if (priv->feature_enabled & GF_XTIME)
marker_xtime_update_marks (this, local);
@@ -889,60 +890,63 @@ err:
int32_t
-marker_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno, struct iatt *buf,
- struct iatt *preoldparent, struct iatt *postoldparent,
- struct iatt *prenewparent, struct iatt *postnewparent)
+marker_rename_done (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno)
{
- marker_conf_t *priv = NULL;
- marker_local_t *local = NULL;
- marker_local_t *oplocal = NULL;
- loc_t newloc = {0, };
+ marker_local_t *local = NULL, *oplocal = NULL;
+ loc_t newloc = {0, };
+ marker_conf_t *priv = NULL;
- if (op_ret == -1) {
- gf_log (this->name, GF_LOG_TRACE, "%s occured while "
- "renaming a file ", strerror (op_errno));
- }
+ local = frame->local;
+ oplocal = local->oplocal;
- local = (marker_local_t *) frame->local;
+ priv = this->private;
frame->local = NULL;
- STACK_UNWIND_STRICT (rename, frame, op_ret, op_errno, buf, preoldparent,
- postoldparent, prenewparent, postnewparent);
-
- if (op_ret == -1 || local == NULL)
- goto out;
+ if (op_ret < 0) {
+ if (local->err == 0) {
+ local->err = op_errno;
+ }
- oplocal = local->oplocal;
- local->oplocal = NULL;
+ gf_log (this->name, GF_LOG_WARNING,
+ "inodelk (UNLOCK) failed on path:%s, inode (ino:%"PRId64
+ ", gfid:%s)(%s)", local->parent_loc.path,
+ local->parent_loc.inode->ino,
+ uuid_utoa (local->parent_loc.inode->gfid),
+ strerror (op_errno));
+ }
- priv = this->private;
+ if (local->stub != NULL) {
+ call_resume (local->stub);
+ local->stub = NULL;
+ } else if (local->err != 0) {
+ STACK_UNWIND_STRICT (rename, frame, -1, local->err, NULL, NULL,
+ NULL, NULL, NULL);
+ }
- if (priv->feature_enabled & GF_QUOTA) {
- reduce_parent_size (this, &oplocal->loc);
+ reduce_parent_size (this, &oplocal->loc, oplocal->contribution);
- if (local->loc.inode != NULL) {
- reduce_parent_size (this, &local->loc);
- }
+ if (local->loc.inode != NULL) {
+ reduce_parent_size (this, &local->loc, local->contribution);
+ }
- newloc.inode = inode_ref (oplocal->loc.inode);
- newloc.path = gf_strdup (local->loc.path);
- newloc.name = gf_strdup (local->loc.name);
- newloc.parent = inode_ref (local->loc.parent);
- newloc.ino = oplocal->loc.inode->ino;
+ newloc.inode = inode_ref (oplocal->loc.inode);
+ newloc.path = gf_strdup (local->loc.path);
+ newloc.name = gf_strdup (local->loc.name);
+ newloc.parent = inode_ref (local->loc.parent);
+ newloc.ino = oplocal->loc.inode->ino;
- quota_rename_update_newpath (this, &newloc);
+ quota_rename_update_newpath (this, &newloc);
- loc_wipe (&newloc);
- }
+ loc_wipe (&newloc);
if (priv->feature_enabled & GF_XTIME) {
//update marks on oldpath
marker_xtime_update_marks (this, oplocal);
marker_xtime_update_marks (this, local);
}
-out:
+
marker_local_unref (local);
marker_local_unref (oplocal);
return 0;
@@ -950,35 +954,276 @@ out:
int32_t
-marker_quota_removexattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int32_t op_ret, int32_t op_errno)
+marker_rename_release_newp_lock (call_frame_t *frame, void *cookie,
+ xlator_t *this, int32_t op_ret,
+ int32_t op_errno)
+{
+ marker_local_t *local = NULL, *oplocal = NULL;
+ struct gf_flock lock = {0, };
+
+ local = frame->local;
+ oplocal = local->oplocal;
+
+ if (op_ret < 0) {
+ if (local->err == 0) {
+ local->err = op_errno;
+ }
+
+ gf_log (this->name, GF_LOG_WARNING,
+ "inodelk (UNLOCK) failed on path:%s, inode (ino:%"PRId64
+ ", gfid:%s)(%s)", oplocal->parent_loc.path,
+ oplocal->parent_loc.inode->ino,
+ uuid_utoa (oplocal->parent_loc.inode->gfid),
+ strerror (op_errno));
+ }
+
+ if (local->next_lock_on == NULL) {
+ marker_rename_done (frame, NULL, this, 0, 0);
+ goto out;
+ }
+
+ lock.l_type = F_UNLCK;
+ lock.l_whence = SEEK_SET;
+ lock.l_start = 0;
+ lock.l_len = 0;
+ lock.l_pid = 0;
+
+ STACK_WIND (frame,
+ marker_rename_done,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->inodelk,
+ this->name, &local->parent_loc, F_SETLKW, &lock);
+
+out:
+ return 0;
+}
+
+
+int32_t
+marker_rename_release_oldp_lock (call_frame_t *frame, void *cookie,
+ xlator_t *this, int32_t op_ret,
+ int32_t op_errno)
{
- marker_local_t *local = NULL, *oplocal = NULL;
+ marker_local_t *local = NULL, *oplocal = NULL;
+ struct gf_flock lock = {0, };
+
+ local = frame->local;
+ oplocal = local->oplocal;
if ((op_ret < 0) && (op_errno != ENOATTR)) {
- goto unwind;
+ local->err = op_errno;
+ }
+
+ lock.l_type = F_UNLCK;
+ lock.l_whence = SEEK_SET;
+ lock.l_start = 0;
+ lock.l_len = 0;
+ lock.l_pid = 0;
+
+ STACK_WIND (frame,
+ marker_rename_release_newp_lock,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->inodelk,
+ this->name, &oplocal->parent_loc, F_SETLKW, &lock);
+ return 0;
+}
+
+
+int32_t
+marker_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iatt *buf,
+ struct iatt *preoldparent, struct iatt *postoldparent,
+ struct iatt *prenewparent, struct iatt *postnewparent)
+{
+ marker_conf_t *priv = NULL;
+ marker_local_t *local = NULL;
+ marker_local_t *oplocal = NULL;
+ call_stub_t *stub = NULL;
+ int32_t ret = 0;
+ char contri_key [512] = {0, };
+
+ local = (marker_local_t *) frame->local;
+
+ if (local != NULL) {
+ oplocal = local->oplocal;
+ }
+
+ priv = this->private;
+
+ if (op_ret < 0) {
+ if (local != NULL) {
+ local->err = op_errno;
+ }
+
+ gf_log (this->name, GF_LOG_TRACE, "%s occured while "
+ "renaming a file ", strerror (op_errno));
+ }
+
+ if (priv->feature_enabled & GF_QUOTA) {
+ if ((op_ret < 0) || (local == NULL)) {
+ goto quota_err;
+ }
+
+ stub = fop_rename_cbk_stub (frame, default_rename_cbk, op_ret,
+ op_errno, buf, preoldparent,
+ postoldparent, prenewparent,
+ postnewparent);
+ if (stub == NULL) {
+ local->err = ENOMEM;
+ goto quota_err;
+ }
+
+ local->stub = stub;
+
+ GET_CONTRI_KEY (contri_key, oplocal->loc.parent->gfid, ret);
+ if (ret < 0) {
+ local->err = ENOMEM;
+ goto quota_err;
+ }
+
+ STACK_WIND (frame, marker_rename_release_oldp_lock,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->removexattr, &local->loc,
+ contri_key);
+ } else {
+ frame->local = NULL;
+
+ STACK_UNWIND_STRICT (rename, frame, op_ret, op_errno, buf,
+ preoldparent, postoldparent, prenewparent,
+ postnewparent);
+
+ if ((op_ret < 0) || (local == NULL)) {
+ goto out;
+ }
+
+ if (priv->feature_enabled & GF_XTIME) {
+ //update marks on oldpath
+ marker_xtime_update_marks (this, oplocal);
+ marker_xtime_update_marks (this, local);
+ }
+ }
+
+out:
+ if (!(priv->feature_enabled & GF_QUOTA)) {
+ marker_local_unref (local);
+ marker_local_unref (oplocal);
+ }
+
+ return 0;
+
+quota_err:
+ marker_rename_release_oldp_lock (frame, NULL, this, 0, 0);
+ return 0;
+}
+
+
+int32_t
+marker_do_rename (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno)
+{
+ marker_local_t *local = NULL, *oplocal = NULL;
+ inode_contribution_t *contribution = NULL;
+
+ local = frame->local;
+ oplocal = local->oplocal;
+
+ if (op_ret < 0) {
+ local->err = op_errno;
+ gf_log (this->name, GF_LOG_WARNING,
+ "cannot hold inodelk on %s (ino:%"PRId64", gfid:%s)"
+ "(%s)",
+ local->next_lock_on->path,
+ local->next_lock_on->inode->ino,
+ uuid_utoa (local->next_lock_on->inode->gfid),
+ strerror (op_errno));
+ goto lock_err;
}
local = frame->local;
oplocal = local->oplocal;
+ if (local->loc.inode != NULL) {
+ contribution = get_contribution_from_loc (this, &local->loc);
+ if (contribution == NULL) {
+ local->contribution = 0;
+ } else {
+ local->contribution = contribution->contribution;
+ }
+ }
+
+ contribution = get_contribution_from_loc (this, &oplocal->loc);
+ if (contribution == NULL) {
+ oplocal->contribution = 0;
+ } else {
+ oplocal->contribution = contribution->contribution;
+ }
+
STACK_WIND (frame, marker_rename_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->rename, &oplocal->loc,
&local->loc);
+
return 0;
-unwind:
- STACK_UNWIND_STRICT (rename, frame, -1, ENOMEM, NULL,
- NULL, NULL, NULL, NULL);
- if (local) {
- local->oplocal = NULL;
- marker_local_unref (local);
- GF_FREE (local);
+lock_err:
+ if ((local->next_lock_on == NULL)
+ || (local->next_lock_on == &local->parent_loc)) {
+ local->next_lock_on = NULL;
+ marker_rename_release_oldp_lock (frame, NULL, this, 0, 0);
+ } else {
+ marker_rename_release_newp_lock (frame, NULL, this, 0, 0);
}
- if (oplocal) {
- marker_local_unref (oplocal);
- GF_FREE (oplocal);
+
+ return 0;
+}
+
+
+int32_t
+marker_rename_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno)
+{
+ marker_local_t *local = NULL, *oplocal = NULL;
+ loc_t *loc = NULL;
+ struct gf_flock lock = {0, };
+
+ local = frame->local;
+ oplocal = local->oplocal;
+
+ if (op_ret < 0) {
+ if (local->next_lock_on != &oplocal->parent_loc) {
+ loc = &oplocal->parent_loc;
+ } else {
+ loc = &local->parent_loc;
+ }
+
+ local->err = op_errno;
+ gf_log (this->name, GF_LOG_WARNING,
+ "cannot hold inodelk on %s (ino:%"PRId64", gfid:%s)"
+ "(%s)", loc->path, loc->inode->ino,
+ uuid_utoa (loc->inode->gfid),
+ strerror (op_errno));
+ goto err;
}
+
+ if (local->next_lock_on != NULL) {
+ lock.l_len = 0;
+ lock.l_start = 0;
+ lock.l_type = F_WRLCK;
+ lock.l_whence = SEEK_SET;
+
+ STACK_WIND (frame,
+ marker_do_rename,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->inodelk,
+ this->name, local->next_lock_on,
+ F_SETLKW, &lock);
+ } else {
+ marker_do_rename (frame, 0, this, 0, 0);
+ }
+
+ return 0;
+
+err:
+ marker_rename_done (frame, NULL, this, 0, 0);
return 0;
}
@@ -991,17 +1236,14 @@ marker_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
marker_local_t *local = NULL;
marker_local_t *oplocal = NULL;
marker_conf_t *priv = NULL;
- char contri_key[512] = {0,};
+ struct gf_flock lock = {0, };
+ loc_t *lock_on = NULL;
priv = this->private;
if (priv->feature_enabled == 0)
goto rename_wind;
- GET_CONTRI_KEY (contri_key, oldloc->parent->gfid, ret);
- if (ret < 0)
- goto err;
-
ALLOCATE_OR_GOTO (local, marker_local_t, err);
MARKER_INIT_LOCAL (frame, local);
@@ -1012,18 +1254,53 @@ marker_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
frame->local = local;
- local->oplocal = oplocal;
+ local->oplocal = marker_local_ref (oplocal);
ret = loc_copy (&local->loc, newloc);
- if (ret == -1)
+ if (ret < 0)
goto err;
ret = loc_copy (&oplocal->loc, oldloc);
- if (ret == -1)
+ if (ret < 0)
goto err;
- STACK_WIND (frame, marker_quota_removexattr_cbk, FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->removexattr, oldloc, contri_key);
+ if (!(priv->feature_enabled & GF_QUOTA)) {
+ goto rename_wind;
+ }
+
+ ret = quota_inode_loc_fill (NULL, newloc->parent, &local->parent_loc);
+ if (ret < 0)
+ goto err;
+
+ ret = quota_inode_loc_fill (NULL, oldloc->parent, &oplocal->parent_loc);
+ if (ret < 0)
+ goto err;
+
+ if ((newloc->inode != NULL) && (newloc->parent != oldloc->parent)
+ && (uuid_compare (newloc->parent->gfid,
+ oldloc->parent->gfid) < 0)) {
+ lock_on = &local->parent_loc;
+ local->next_lock_on = &oplocal->parent_loc;
+ } else {
+ lock_on = &oplocal->parent_loc;
+ if ((newloc->inode != NULL) && (newloc->parent
+ != oldloc->parent)) {
+ local->next_lock_on = &local->parent_loc;
+ }
+ }
+
+ lock.l_len = 0;
+ lock.l_start = 0;
+ lock.l_type = F_WRLCK;
+ lock.l_whence = SEEK_SET;
+
+ STACK_WIND (frame,
+ marker_rename_inodelk_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->inodelk,
+ this->name, lock_on,
+ F_SETLKW, &lock);
+
return 0;
rename_wind: