diff options
| -rw-r--r-- | cli/src/cli-rpc-ops.c | 65 | ||||
| -rw-r--r-- | libglusterfs/src/glusterfs.h | 3 | ||||
| -rw-r--r-- | libglusterfs/src/syncop.c | 36 | ||||
| -rw-r--r-- | libglusterfs/src/syncop.h | 4 | ||||
| -rw-r--r-- | libglusterfs/src/xlator.c | 42 | ||||
| -rw-r--r-- | libglusterfs/src/xlator.h | 1 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker-mem-types.h | 2 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker-quota-helper.c | 50 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker-quota-helper.h | 2 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker-quota.c | 1723 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker-quota.h | 62 | ||||
| -rw-r--r-- | xlators/features/marker/src/marker.c | 67 | 
12 files changed, 1788 insertions, 269 deletions
diff --git a/cli/src/cli-rpc-ops.c b/cli/src/cli-rpc-ops.c index b2964b68ff6..8ea43f824bc 100644 --- a/cli/src/cli-rpc-ops.c +++ b/cli/src/cli-rpc-ops.c @@ -2413,23 +2413,29 @@ static int  print_quota_list_output (cli_local_t *local, char *mountdir,                           char *default_sl, char *path)  { -        int64_t used_space       = 0; -        int64_t avail            = 0; -        char    *used_str         = NULL; -        char    *avail_str        = NULL; -        int     ret               = -1; -        char    *sl_final         = NULL; -        char    *hl_str           = NULL; -        double  sl_num           = 0; -        gf_boolean_t sl          = _gf_false; -        gf_boolean_t hl          = _gf_false; -        char percent_str[20]     = {0}; +        int64_t         avail            = 0; +        char           *used_str         = NULL; +        char           *avail_str        = NULL; +        int             ret              = -1; +        char           *sl_final         = NULL; +        char           *hl_str           = NULL; +        double          sl_num           = 0; +        gf_boolean_t    sl               = _gf_false; +        gf_boolean_t    hl               = _gf_false; +        char            percent_str[20]  = {0}; +        ssize_t         xattr_size       = 0;          struct quota_limit {                  int64_t hl;                  int64_t sl;          } __attribute__ ((__packed__)) existing_limits; +        struct quota_meta { +                int64_t size; +                int64_t file_count; +                int64_t dir_count; +        } __attribute__ ((__packed__)) used_space; +          ret = sys_lgetxattr (mountdir, "trusted.glusterfs.quota.limit-set",                               (void *)&existing_limits,                               sizeof (existing_limits)); @@ -2490,10 +2496,26 @@ print_quota_list_output (cli_local_t *local, char *mountdir,                  sl_final = percent_str;          } -        ret = sys_lgetxattr (mountdir, "trusted.glusterfs.quota.size", -                             &used_space, sizeof (used_space)); +        used_space.size = used_space.file_count = used_space.dir_count = 0; +        xattr_size = sys_lgetxattr (mountdir, "trusted.glusterfs.quota.size", +                                    NULL, 0); +        if (xattr_size > sizeof (int64_t)) { +                ret = sys_lgetxattr (mountdir, "trusted.glusterfs.quota.size", +                                     &used_space, sizeof (used_space)); +        } else if (xattr_size > 0) { +                /* This is for compatibility. +                 * Older version had only file usage +                 */ +                ret = sys_lgetxattr (mountdir, "trusted.glusterfs.quota.size", +                             &(used_space.size), sizeof (used_space.size)); +        } else { +                ret = -1; +        }          if (ret < 0) { +                gf_log ("cli", GF_LOG_ERROR, "Failed to get quota size " +                        "on path %s: %s", mountdir, strerror (errno)); +                  if (global_state->mode & GLUSTER_MODE_XML) {                          ret = cli_quota_xml_output (local, path, hl_str,                                                      sl_final, "N/A", @@ -2510,14 +2532,16 @@ print_quota_list_output (cli_local_t *local, char *mountdir,                                   "N/A", "N/A", "N/A", "N/A");                  }          } else { -                used_space = ntoh64 (used_space); +                used_space.size = ntoh64 (used_space.size); +                used_space.file_count = ntoh64 (used_space.file_count); +                used_space.dir_count = ntoh64 (used_space.dir_count); -                used_str = gf_uint64_2human_readable (used_space); +                used_str = gf_uint64_2human_readable (used_space.size); -                if (existing_limits.hl > used_space) { -                        avail = existing_limits.hl - used_space; +                if (existing_limits.hl > used_space.size) { +                        avail = existing_limits.hl - used_space.size;                          hl = _gf_false; -                        if (used_space > sl_num) +                        if (used_space.size > sl_num)                                  sl = _gf_true;                          else                                  sl = _gf_false; @@ -2544,8 +2568,9 @@ print_quota_list_output (cli_local_t *local, char *mountdir,                  if (used_str == NULL) {                          cli_out ("%-40s %7s %9s %11"PRIu64                                   "%9"PRIu64" %15s %18s", path, hl_str, -                                  sl_final, used_space, avail, sl? "Yes" : "No", -                                  hl? "Yes" : "No"); +                                  sl_final, used_space.size, avail, +                                  sl ? "Yes" : "No", +                                  hl ? "Yes" : "No");                  } else {                          cli_out ("%-40s %7s %9s %11s %7s %15s %20s", path, hl_str,                                   sl_final, used_str, avail_str, sl? "Yes" : "No", diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h index 8d7659b5015..a810f3a81f0 100644 --- a/libglusterfs/src/glusterfs.h +++ b/libglusterfs/src/glusterfs.h @@ -128,11 +128,12 @@  #define GLUSTERFS_POSIXLK_COUNT "glusterfs.posixlk-count"  #define GLUSTERFS_PARENT_ENTRYLK "glusterfs.parent-entrylk"  #define GLUSTERFS_INODELK_DOM_COUNT "glusterfs.inodelk-dom-count" -#define QUOTA_SIZE_KEY "trusted.glusterfs.quota.size"  #define GFID_TO_PATH_KEY "glusterfs.gfid2path"  #define GF_XATTR_STIME_PATTERN "trusted.glusterfs.*.stime"  #define GF_XATTR_TRIGGER_SYNC "glusterfs.geo-rep.trigger-sync" +#define QUOTA_SIZE_KEY "trusted.glusterfs.quota.size" +  /* Index xlator related */  #define GF_XATTROP_INDEX_GFID "glusterfs.xattrop_index_gfid"  #define GF_XATTROP_INDEX_COUNT "glusterfs.xattrop_index_count" diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c index e3321cf6ddb..e241e2c1ee0 100644 --- a/libglusterfs/src/syncop.c +++ b/libglusterfs/src/syncop.c @@ -2537,3 +2537,39 @@ syncop_inodelk (xlator_t *subvol, const char *volume, loc_t *loc, int32_t cmd,          return args.op_ret;  } + +int32_t +syncop_xattrop_cbk (call_frame_t *frame, void *cookie, xlator_t *this, +                    int32_t op_ret, int32_t op_errno, dict_t *dict, +                    dict_t *xdata) +{ +        struct syncargs *args = NULL; + +        args = cookie; + +        args->op_ret   = op_ret; +        args->op_errno = op_errno; + +        if (xdata) +                args->xdata = dict_ref (xdata); + +        __wake (args); + +        return 0; + +} + +int +syncop_xattrop (xlator_t *subvol, loc_t *loc, gf_xattrop_flags_t flags, +                dict_t *dict, dict_t *xdata) +{ +        struct syncargs args = {0, }; + +        SYNCOP (subvol, (&args), syncop_xattrop_cbk, subvol->fops->xattrop, +                loc, flags, dict, xdata); + +        if (args.op_ret < 0) +                return -args.op_errno; + +        return args.op_ret; +} diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h index 7f8ec7345b0..a9244a51552 100644 --- a/libglusterfs/src/syncop.h +++ b/libglusterfs/src/syncop.h @@ -439,4 +439,8 @@ syncop_inodelk (xlator_t *subvol, const char *volume, loc_t *loc, int32_t cmd,  int  syncop_ipc (xlator_t *subvol, int op, dict_t *xdata_in, dict_t **xdata_out); +int +syncop_xattrop (xlator_t *subvol, loc_t *loc, gf_xattrop_flags_t flags, +                dict_t *dict, dict_t *xdata); +  #endif /* _SYNCOP_H */ diff --git a/libglusterfs/src/xlator.c b/libglusterfs/src/xlator.c index 49af7d2e0e6..cc4726e0ea5 100644 --- a/libglusterfs/src/xlator.c +++ b/libglusterfs/src/xlator.c @@ -860,9 +860,51 @@ loc_is_root (loc_t *loc)          } else if (loc && loc->inode && __is_root_gfid (loc->inode->gfid)) {                  return _gf_true;          } +          return _gf_false;  } +int32_t +loc_build_child (loc_t *child, loc_t *parent, char *name) +{ +        int32_t  ret = -1; + +        GF_VALIDATE_OR_GOTO ("xlator", child, out); +        GF_VALIDATE_OR_GOTO ("xlator", parent, out); +        GF_VALIDATE_OR_GOTO ("xlator", name, out); + +        loc_gfid (parent, child->pargfid); + +        if (strcmp (parent->path, "/") == 0) +                ret = gf_asprintf ((char **)&child->path, "/%s", name); +        else +                ret = gf_asprintf ((char **)&child->path, "%s/%s", parent->path, +                                   name); + +        if (ret < 0 || !child->path) { +                ret = -1; +                goto out; +        } + +        child->name = strrchr (child->path, '/') + 1; + +        child->parent = inode_ref (parent->inode); +        child->inode = inode_new (parent->inode->table); + +        if (!child->inode) { +                ret = -1; +                goto out; +        } + +        ret = 0; + +out: +        if ((ret < 0) && child) +                loc_wipe (child); + +        return ret; +} +  int  xlator_destroy (xlator_t *xl)  { diff --git a/libglusterfs/src/xlator.h b/libglusterfs/src/xlator.h index e953ec04372..733f6cf47ab 100644 --- a/libglusterfs/src/xlator.h +++ b/libglusterfs/src/xlator.h @@ -960,6 +960,7 @@ int loc_path (loc_t *loc, const char *bname);  void loc_gfid (loc_t *loc, uuid_t gfid);  char* loc_gfid_utoa (loc_t *loc);  gf_boolean_t loc_is_root (loc_t *loc); +int32_t loc_build_child (loc_t *child, loc_t *parent, char *name);  int xlator_mem_acct_init (xlator_t *xl, int num_types);  int is_gf_log_command (xlator_t *trans, const char *name, char *value);  int glusterd_check_log_level (const char *value); diff --git a/xlators/features/marker/src/marker-mem-types.h b/xlators/features/marker/src/marker-mem-types.h index 1f74d504897..dc5ad16ed76 100644 --- a/xlators/features/marker/src/marker-mem-types.h +++ b/xlators/features/marker/src/marker-mem-types.h @@ -20,6 +20,8 @@ enum gf_marker_mem_types_ {          gf_marker_mt_quota_inode_ctx_t,          gf_marker_mt_marker_inode_ctx_t,          gf_marker_mt_inode_contribution_t, +        gf_marker_mt_quota_meta_t, +        gf_marker_mt_quota_synctask_t,          gf_marker_mt_end  };  #endif diff --git a/xlators/features/marker/src/marker-quota-helper.c b/xlators/features/marker/src/marker-quota-helper.c index ec0d83316c7..cdc2475c3e8 100644 --- a/xlators/features/marker/src/marker-quota-helper.c +++ b/xlators/features/marker/src/marker-quota-helper.c @@ -37,23 +37,27 @@ mq_loc_fill (loc_t *loc, inode_t *inode, inode_t *parent, char *path)          if (parent)                  loc->parent = inode_ref (parent); +        if (!uuid_is_null (inode->gfid)) +                uuid_copy (loc->gfid, inode->gfid); +          loc->path = gf_strdup (path);          if (!loc->path) {                  gf_log ("loc fill", GF_LOG_ERROR, "strdup failed"); -                goto loc_wipe; +                goto out;          }          loc->name = strrchr (loc->path, '/');          if (loc->name)                  loc->name++;          else -                goto loc_wipe; +                goto out;          ret = 0; -loc_wipe: + +out:          if (ret < 0)                  loc_wipe (loc); -out: +          return ret;  } @@ -222,37 +226,39 @@ mq_add_new_contribution_node (xlator_t *this, quota_inode_ctx_t *ctx,  int32_t -mq_dict_set_contribution (xlator_t *this, dict_t *dict, -                          loc_t *loc) +mq_dict_set_contribution (xlator_t *this, dict_t *dict, loc_t *loc, +                          uuid_t gfid, char *contri_key)  { -        int32_t ret              = -1; -        char    contri_key [512] = {0, }; +        int32_t ret                  = -1; +        char    key[CONTRI_KEY_MAX]  = {0, };          GF_VALIDATE_OR_GOTO ("marker", this, out);          GF_VALIDATE_OR_GOTO ("marker", dict, out);          GF_VALIDATE_OR_GOTO ("marker", loc, out); -        if (loc->parent) { -                GET_CONTRI_KEY (contri_key, loc->parent->gfid, ret); -                if (ret < 0) { -                        ret = -1; -                        goto out; -                } +        if (gfid && !uuid_is_null(gfid)) { +                GET_CONTRI_KEY (key, gfid, ret); +        } else if (loc->parent) { +                GET_CONTRI_KEY (key, loc->parent->gfid, ret);          } else {                  /* nameless lookup, fetch contributions to all parents */ -                GET_CONTRI_KEY (contri_key, NULL, ret); +                GET_CONTRI_KEY (key, NULL, ret);          } -        ret = dict_set_int64 (dict, contri_key, 0); -        if (ret < 0) { -                gf_log (this->name, GF_LOG_WARNING, -                        "unable to set dict value on %s.", -                        loc->path); +        if (ret < 0)                  goto out; -        } -        ret = 0; +        ret = dict_set_int64 (dict, key, 0); +        if (ret < 0) +                goto out; + +        if (contri_key) +                strncpy (contri_key, key, CONTRI_KEY_MAX); +  out: +        if (ret < 0) +                gf_log_callingfn (this->name, GF_LOG_ERROR, "dict set failed"); +          return ret;  } diff --git a/xlators/features/marker/src/marker-quota-helper.h b/xlators/features/marker/src/marker-quota-helper.h index b200413b0ad..161413debfa 100644 --- a/xlators/features/marker/src/marker-quota-helper.h +++ b/xlators/features/marker/src/marker-quota-helper.h @@ -44,7 +44,7 @@ inode_contribution_t *  mq_add_new_contribution_node (xlator_t *, quota_inode_ctx_t *, loc_t *);  int32_t -mq_dict_set_contribution (xlator_t *, dict_t *, loc_t *); +mq_dict_set_contribution (xlator_t *, dict_t *, loc_t *, uuid_t, char *);  quota_inode_ctx_t *  mq_inode_ctx_new (inode_t *, xlator_t *); diff --git a/xlators/features/marker/src/marker-quota.c b/xlators/features/marker/src/marker-quota.c index b8d7a774377..835108cc403 100644 --- a/xlators/features/marker/src/marker-quota.c +++ b/xlators/features/marker/src/marker-quota.c @@ -20,6 +20,7 @@  #include "byte-order.h"  #include "marker-quota.h"  #include "marker-quota-helper.h" +#include "syncop.h"  int  mq_loc_copy (loc_t *dst, loc_t *src) @@ -478,11 +479,11 @@ mq_get_child_contribution (call_frame_t *frame,                             dict_t *dict,                             struct iatt *postparent)  { -        int32_t        ret                = -1; -        int32_t        val                = 0; -        char           contri_key [512]   = {0, }; -        int64_t       *contri             = NULL; -        quota_local_t *local              = NULL; +        int32_t        ret                           = -1; +        int32_t        val                           = 0; +        char           contri_key[CONTRI_KEY_MAX]    = {0, }; +        int64_t       *contri                        = NULL; +        quota_local_t *local                         = NULL;          local = frame->local; @@ -543,16 +544,16 @@ mq_readdir_cbk (call_frame_t *frame,                  int32_t op_errno,                  gf_dirent_t *entries, dict_t *xdata)  { -        char           contri_key [512]   = {0, }; -        int32_t        ret                = 0; -        int32_t        val                = 0; -        off_t          offset             = 0; -        int32_t        count              = 0; -        dict_t        *dict               = NULL; -        quota_local_t *local              = NULL; -        gf_dirent_t   *entry              = NULL; -        call_frame_t  *newframe           = NULL; -        loc_t          loc                = {0, }; +        char           contri_key[CONTRI_KEY_MAX]    = {0, }; +        int32_t        ret                           = 0; +        int32_t        val                           = 0; +        off_t          offset                        = 0; +        int32_t        count                         = 0; +        dict_t        *dict                          = NULL; +        quota_local_t *local                         = NULL; +        gf_dirent_t   *entry                         = NULL; +        call_frame_t  *newframe                      = NULL; +        loc_t          loc                           = {0, };          local = mq_local_ref (frame->local); @@ -824,7 +825,10 @@ mq_get_dirty_xattr (call_frame_t *frame, void *cookie, xlator_t *this,          if (uuid_is_null (local->loc.gfid))                  uuid_copy (local->loc.gfid, local->loc.inode->gfid); -        GF_UUID_ASSERT (local->loc.gfid); +        if (uuid_is_null (local->loc.gfid)) { +                ret = -1; +                goto err; +        }          STACK_WIND (frame,                      mq_check_if_still_dirty, @@ -850,14 +854,12 @@ err:   * 0 other wise   */  int32_t -mq_update_dirty_inode (xlator_t *this, -                       loc_t *loc, -                       quota_inode_ctx_t *ctx, +mq_update_dirty_inode (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx,                         inode_contribution_t *contribution)  {          int32_t          ret        = -1;          quota_local_t   *local      = NULL; -        gf_boolean_t    status     = _gf_false; +        gf_boolean_t     status     = _gf_false;          struct gf_flock  lock       = {0, };          call_frame_t    *frame      = NULL; @@ -1017,14 +1019,14 @@ err:  int32_t  mq_create_xattr (xlator_t *this, call_frame_t *frame)  { -        int32_t               ret       = 0; -        int64_t              *value     = NULL; -        int64_t              *size      = NULL; -        dict_t               *dict      = NULL; -        char                  key[512]  = {0, }; -        quota_local_t        *local     = NULL; -        quota_inode_ctx_t    *ctx       = NULL; -        inode_contribution_t *contri    = NULL; +        int32_t               ret                  = 0; +        int64_t              *value                = NULL; +        int64_t              *size                 = NULL; +        dict_t               *dict                 = NULL; +        char                  key[CONTRI_KEY_MAX]  = {0, }; +        quota_local_t        *local                = NULL; +        quota_inode_ctx_t    *ctx                  = NULL; +        inode_contribution_t *contri               = NULL;          if (frame == NULL || this == NULL)                  return 0; @@ -1070,7 +1072,10 @@ mq_create_xattr (xlator_t *this, call_frame_t *frame)                          goto free_value;          } -        GF_UUID_ASSERT (local->loc.gfid); +        if (uuid_is_null (local->loc.gfid)) { +                ret = -1; +                goto out; +        }          STACK_WIND (frame, mq_create_dirty_xattr, FIRST_CHILD(this),                      FIRST_CHILD(this)->fops->xattrop, &local->loc, @@ -1105,11 +1110,12 @@ mq_check_n_set_inode_xattr (call_frame_t *frame, void *cookie,                              inode_t *inode, struct iatt *buf, dict_t *dict,                              struct iatt *postparent)  { -        quota_local_t        *local           = NULL; -        int64_t              *size            = NULL, *contri = NULL; -        int8_t                dirty           = 0; -        int32_t               ret             = 0; -        char                  contri_key[512] = {0, }; +        quota_local_t        *local                      = NULL; +        int64_t              *size                       = NULL; +        int64_t              *contri                     = NULL; +        int8_t                dirty                      = 0; +        int32_t               ret                        = 0; +        char                  contri_key[CONTRI_KEY_MAX] = {0, };          if (op_ret < 0) {                  goto out; @@ -1174,7 +1180,7 @@ mq_get_xattr (call_frame_t *frame, void *cookie, xlator_t *this,                  goto err;          } -        ret = mq_req_xattr (this, &local->loc, xattr_req); +        ret = mq_req_xattr (this, &local->loc, xattr_req, NULL);          if (ret < 0) {                  gf_log (this->name, GF_LOG_WARNING, "cannot request xattr");                  goto err; @@ -1183,7 +1189,10 @@ mq_get_xattr (call_frame_t *frame, void *cookie, xlator_t *this,          if (uuid_is_null (local->loc.gfid))                  uuid_copy (local->loc.gfid, local->loc.inode->gfid); -        GF_UUID_ASSERT (local->loc.gfid); +        if (uuid_is_null (local->loc.gfid)) { +                ret = -1; +                goto err; +        }          STACK_WIND (frame, mq_check_n_set_inode_xattr, FIRST_CHILD(this),                      FIRST_CHILD(this)->fops->lookup, &local->loc, xattr_req); @@ -1634,15 +1643,17 @@ mq_update_inode_contribution (call_frame_t *frame, void *cookie,                                struct iatt *buf, dict_t *dict,                                struct iatt *postparent)  { -        int32_t               ret              = -1; -        int64_t              *size             = NULL, size_int = 0, contri_int = 0; -        int64_t              *contri           = NULL; -        int64_t              *delta            = NULL; -        char                  contri_key [512] = {0, }; -        dict_t               *newdict          = NULL; -        quota_local_t        *local            = NULL; -        quota_inode_ctx_t    *ctx              = NULL; -        inode_contribution_t *contribution     = NULL; +        int32_t               ret                         = -1; +        int64_t              *size                        = NULL; +        int64_t               size_int                    = 0; +        int64_t               contri_int                  = 0; +        int64_t              *contri                      = NULL; +        int64_t              *delta                       = NULL; +        char                  contri_key[CONTRI_KEY_MAX]  = {0, }; +        dict_t               *newdict                     = NULL; +        quota_local_t        *local                       = NULL; +        quota_inode_ctx_t    *ctx                         = NULL; +        inode_contribution_t *contribution                = NULL;          local = frame->local; @@ -1760,11 +1771,11 @@ mq_fetch_child_size_and_contri (call_frame_t *frame, void *cookie,                                  xlator_t *this, int32_t op_ret,                                  int32_t op_errno, dict_t *xdata)  { -        int32_t            ret              = -1; -        char               contri_key [512] = {0, }; -        dict_t            *newdict          = NULL; -        quota_local_t     *local            = NULL; -        quota_inode_ctx_t *ctx              = NULL; +        int32_t            ret                         = -1; +        char               contri_key[CONTRI_KEY_MAX]  = {0, }; +        dict_t            *newdict                     = NULL; +        quota_local_t     *local                       = NULL; +        quota_inode_ctx_t *ctx                         = NULL;          local = frame->local; @@ -2024,19 +2035,1156 @@ err:          return -1;  } +int32_t +mq_dict_set_meta (dict_t *dict, char *key, const quota_meta_t *meta, +                  ia_type_t ia_type) +{ +        int32_t         ret      = -1; +        quota_meta_t   *value    = NULL; + +        QUOTA_ALLOC_OR_GOTO (value, quota_meta_t, ret, out); + +        value->size = hton64 (meta->size); +        value->file_count = hton64 (meta->file_count); +        value->dir_count = hton64 (meta->dir_count); + +        if (ia_type == IA_IFDIR) { +                ret = dict_set_bin (dict, key, value, sizeof (*value)); +        } else { +                /* For a file we don't need to store dir_count in the +                 * quota size xattr, so we set the len of the data in the dict +                 * as 128bits, so when the posix xattrop reads the dict, it only +                 * performs operations on size and file_count +                 */ +                ret = dict_set_bin (dict, key, value, +                                    sizeof (*value) - sizeof (int64_t)); +        } + +        if (ret < 0) { +                gf_log_callingfn ("marker", GF_LOG_ERROR, "dict set failed"); +                GF_FREE (value); +        } + +out: +        return ret; +} + +int32_t +mq_dict_get_meta (dict_t *dict, char *key, quota_meta_t *meta) +{ +        int32_t        ret      = -1; +        data_t        *data     = NULL; +        quota_meta_t  *value    = NULL; + +        if (!dict || !key || !meta) +                goto out; + +        data = dict_get (dict, key); +        if (!data || !data->data) +                goto out; + +        if (data->len > sizeof (int64_t)) { +                value = (quota_meta_t *) data->data; +                meta->size = ntoh64 (value->size); +                meta->file_count = ntoh64 (value->file_count); +                if (data->len > (sizeof (int64_t)) * 2) +                        meta->dir_count  = ntoh64 (value->dir_count); +                else +                        meta->dir_count = 0; +        } else { +                /* This can happen during software upgrade. +                 * Older version of glusterfs will not have inode count. +                 * Return failure, this will be healed as part of lookup +                 */ +                gf_log_callingfn ("marker", GF_LOG_DEBUG, "Object quota xattrs " +                                  "missing: len = %d", data->len); +                ret = -1; +                goto out; +        } + +        ret = 0; +out: + +        return ret; +} + +void +mq_compute_delta (quota_meta_t *delta, const quota_meta_t *op1, +                  const quota_meta_t *op2) +{ +        delta->size       = op1->size - op2->size; +        delta->file_count = op1->file_count - op2->file_count; +        delta->dir_count  = op1->dir_count - op2->dir_count; +} + +void +mq_add_meta (quota_meta_t *dst, const quota_meta_t *src) +{ +        dst->size       += src->size; +        dst->file_count += src->file_count; +        dst->dir_count  += src->dir_count; +} + +void +mq_sub_meta (quota_meta_t *dst, const quota_meta_t *src) +{ +        if (src == NULL) { +                dst->size       = -dst->size; +                dst->file_count = -dst->file_count; +                dst->dir_count  = -dst->dir_count; +        } else { +                dst->size       = src->size - dst->size; +                dst->file_count = src->file_count - dst->file_count; +                dst->dir_count  = src->dir_count - dst->dir_count; +        } +} + +gf_boolean_t +quota_meta_is_null (const quota_meta_t *meta) +{ +        if (meta->size == 0 && +            meta->file_count == 0 && +            meta->dir_count == 0) +                return _gf_true; + +        return _gf_false; +} + +int32_t +mq_are_xattrs_set (xlator_t *this, loc_t *loc, gf_boolean_t *result, +                   gf_boolean_t *objects) +{ +        int32_t        ret                         = -1; +        char           contri_key[CONTRI_KEY_MAX]  = {0, }; +        quota_meta_t   meta                        = {0, }; +        struct iatt    stbuf                       = {0,}; +        dict_t        *dict                        = NULL; +        dict_t        *rsp_dict                    = NULL; + +        dict = dict_new (); +        if (dict == NULL) { +                gf_log (this->name, GF_LOG_ERROR, "dict_new failed"); +                goto out; +        } + +        ret = mq_req_xattr (this, loc, dict, contri_key); +        if (ret < 0) +                goto out; + +        ret = syncop_lookup (FIRST_CHILD(this), loc, dict, &stbuf, &rsp_dict, +                             NULL); +        if (ret < 0) { +                gf_log (this->name, (-ret == ENOENT || -ret == ESTALE) +                        ? GF_LOG_DEBUG:GF_LOG_ERROR, "lookup failed " +                        "for %s: %s", loc->path, strerror (-ret)); +                goto out; +        } + +        if (rsp_dict == NULL) { +                *result = _gf_false; +                goto out; +        } + +        *result = _gf_true; +        if (loc->inode->ia_type == IA_IFDIR) { +                ret = mq_dict_get_meta (rsp_dict, QUOTA_SIZE_KEY, &meta); +                if (ret < 0 || meta.dir_count == 0) { +                        ret = 0; +                        *result = _gf_false; +                        goto out; +                } +                *objects = _gf_true; +        } + +        if (!loc_is_root(loc) && !dict_get (rsp_dict, contri_key)) { +                *result = _gf_false; +                goto out; +        } + +out: +        if (dict) +                dict_unref (dict); + +        if (rsp_dict) +                dict_unref (rsp_dict); + +        return ret; +} + +int32_t +mq_create_xattrs (xlator_t *this, loc_t *loc, gf_boolean_t objects) +{ +        quota_meta_t           size                 = {0, }; +        quota_meta_t           contri               = {0, }; +        int32_t                ret                  = -1; +        char                   key[CONTRI_KEY_MAX]  = {0, }; +        dict_t                *dict                 = NULL; +        quota_inode_ctx_t     *ctx                  = NULL; +        inode_contribution_t  *contribution         = NULL; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); + +        ret = mq_inode_ctx_get (loc->inode, this, &ctx); +        if (ret < 0) { +                ctx = mq_inode_ctx_new (loc->inode, this); +                if (ctx == NULL) { +                        gf_log (this->name, GF_LOG_WARNING, +                                "mq_inode_ctx_new failed"); +                        ret = -1; +                        goto out; +                } +        } + +        dict = dict_new (); +        if (!dict) { +                gf_log (this->name, GF_LOG_ERROR, "dict_new failed"); +                ret = -1; +                goto out; +        } + +        if (loc->inode->ia_type == IA_IFDIR) { +                if (objects == _gf_false) { +                        /* Initial object count of a directory is 1 */ +                        size.dir_count = 1; +                } +                ret = mq_dict_set_meta (dict, QUOTA_SIZE_KEY, &size, IA_IFDIR); +                if (ret < 0) +                        goto out; +        } + +        if (!loc_is_root (loc)) { +                contribution = mq_add_new_contribution_node (this, ctx, loc); +                if (contribution == NULL) { +                        ret = -1; +                        goto out; +                } + +                GET_CONTRI_KEY (key, contribution->gfid, ret); +                ret = mq_dict_set_meta (dict, key, &contri, +                                        loc->inode->ia_type); +                if (ret < 0) +                        goto out; +        } + +        ret = syncop_xattrop(FIRST_CHILD(this), loc, GF_XATTROP_ADD_ARRAY64, +                             dict, NULL); + +        if (ret < 0) { +                gf_log (this->name, (-ret == ENOENT || -ret == ESTALE) +                        ? GF_LOG_DEBUG:GF_LOG_ERROR, "xattrop failed " +                        "for %s: %s", loc->path, strerror (-ret)); +                goto out; +        } + +out: +        if (dict) +                dict_unref (dict); + +        return ret; +} + +int32_t +mq_lock (xlator_t *this, loc_t *loc, short l_type) +{ +        struct gf_flock  lock  = {0, }; +        int32_t          ret   = -1; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); + +        gf_log (this->name, GF_LOG_DEBUG, "set lock type %d on %s", +                l_type, loc->path); + +        lock.l_len    = 0; +        lock.l_start  = 0; +        lock.l_type   = l_type; +        lock.l_whence = SEEK_SET; + +        ret = syncop_inodelk (FIRST_CHILD(this), this->name, loc, F_SETLKW, +                              &lock, NULL, NULL); +        if (ret < 0) +                gf_log (this->name, (-ret == ENOENT || -ret == ESTALE) +                        ? GF_LOG_DEBUG:GF_LOG_ERROR, "inodelk failed " +                        "for %s: %s", loc->path, strerror (-ret)); + +out: + +        return ret; +} + +int32_t +mq_get_dirty (xlator_t *this, loc_t *loc, int32_t *dirty) +{ +        int32_t        ret              = -1; +        int8_t         value            = 0; +        dict_t        *dict             = NULL; +        dict_t        *rsp_dict         = NULL; +        struct iatt    stbuf            = {0,}; + +        dict = dict_new (); +        if (dict == NULL) { +                gf_log (this->name, GF_LOG_ERROR, "dict_new failed"); +                goto out; +        } + +        ret = dict_set_int64 (dict, QUOTA_DIRTY_KEY, 0); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_WARNING, "dict set failed"); +                goto out; +        } + +        ret = syncop_lookup (FIRST_CHILD(this), loc, dict, &stbuf, &rsp_dict, +                             NULL); +        if (ret < 0) { +                gf_log (this->name, (-ret == ENOENT || -ret == ESTALE) +                        ? GF_LOG_DEBUG:GF_LOG_ERROR, "lookup failed " +                        "for %s: %s", loc->path, strerror (-ret)); +                goto out; +        } + +        ret = dict_get_int8 (rsp_dict, QUOTA_DIRTY_KEY, &value); +        if (ret < 0) +                goto out; + +        *dirty = value; + +out: +        if (dict) +                dict_unref (dict); + +        if (rsp_dict) +                dict_unref (rsp_dict); + +        return ret; +} + +int32_t +mq_mark_dirty (xlator_t *this, loc_t *loc, int32_t dirty) +{ +        int32_t            ret      = -1; +        dict_t            *dict     = NULL; +        quota_inode_ctx_t *ctx      = NULL; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); + +        dict = dict_new (); +        if (!dict) { +                gf_log (this->name, GF_LOG_ERROR, "dict_new failed"); +                goto out; +        } + +        ret = dict_set_int8 (dict, QUOTA_DIRTY_KEY, dirty); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_ERROR, "dict_set failed"); +                goto out; +        } + +        ret = syncop_setxattr (FIRST_CHILD(this), loc, dict, 0); +        if (ret < 0) { +                gf_log (this->name, (-ret == ENOENT || -ret == ESTALE) +                        ? GF_LOG_DEBUG:GF_LOG_ERROR, "setxattr dirty = %d " +                        "failed for %s: %s", dirty, loc->path, strerror (-ret)); +                goto out; +        } + +        ret = mq_inode_ctx_get (loc->inode, this, &ctx); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_ERROR, "failed to get inode ctx for " +                        "%s", loc->path); +                goto out; +        } + +        LOCK (&ctx->lock); +        { +                ctx->dirty = dirty; +        } +        UNLOCK (&ctx->lock); + +out: +        if (dict) +                dict_unref (dict); + +        return ret; +} + +int32_t +_mq_get_metadata (xlator_t *this, loc_t *loc, quota_meta_t *contri, +                  quota_meta_t *size, uuid_t contri_gfid) +{ +        int32_t            ret                         = -1; +        quota_meta_t       meta                        = {0, }; +        char               contri_key[CONTRI_KEY_MAX]  = {0, }; +        dict_t            *dict                        = NULL; +        dict_t            *rsp_dict                    = NULL; +        struct iatt        stbuf                       = {0,}; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); + +        if (size == NULL && contri == NULL) +                goto out; + +        dict = dict_new (); +        if (dict == NULL) { +                gf_log (this->name, GF_LOG_ERROR, "dict_new failed"); +                goto out; +        } + +        if (size && loc->inode->ia_type == IA_IFDIR) { +                ret = dict_set_int64 (dict, QUOTA_SIZE_KEY, 0); +                if (ret < 0) { +                        gf_log (this->name, GF_LOG_ERROR, "dict_set failed."); +                        goto out; +                } +        } + +        if (contri && !loc_is_root(loc)) { +                ret = mq_dict_set_contribution (this, dict, loc, contri_gfid, +                                                contri_key); +                if (ret < 0) +                        goto out; +        } + +        ret = syncop_lookup (FIRST_CHILD(this), loc, dict, &stbuf, &rsp_dict, +                             NULL); +        if (ret < 0) { +                gf_log (this->name, (-ret == ENOENT || -ret == ESTALE) +                        ? GF_LOG_DEBUG:GF_LOG_ERROR, "lookup failed " +                        "for %s: %s", loc->path, strerror (-ret)); +                goto out; +        } + +        if (size) { +                if (loc->inode->ia_type == IA_IFDIR) { +                        ret = mq_dict_get_meta (rsp_dict, QUOTA_SIZE_KEY, +                                                &meta); +                        if (ret < 0) { +                                gf_log (this->name, GF_LOG_ERROR, +                                        "dict_get failed."); +                                goto out; +                        } + +                        size->size = meta.size; +                        size->file_count = meta.file_count; +                        size->dir_count = meta.dir_count; +                } else { +                        size->size = stbuf.ia_blocks * 512; +                        size->file_count = 1; +                        size->dir_count = 0; +                } +        } + +        if (contri && !loc_is_root(loc)) { +                ret = mq_dict_get_meta (rsp_dict, contri_key, &meta); +                if (ret < 0) { +                        contri->size = 0; +                        contri->file_count = 0; +                        contri->dir_count = 0; +                } else { +                        contri->size = meta.size; +                        contri->file_count = meta.file_count; +                        contri->dir_count = meta.dir_count; +                } +        } + +        ret = 0; + +out: +        if (dict) +                dict_unref (dict); + +        if (rsp_dict) +                dict_unref (rsp_dict); + +        return ret; +} + +int32_t +mq_get_metadata (xlator_t *this, loc_t *loc, quota_meta_t *contri, +                 quota_meta_t *size, quota_inode_ctx_t *ctx, +                 inode_contribution_t *contribution) +{ +        int32_t         ret      = -1; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); +        GF_VALIDATE_OR_GOTO ("marker", ctx, out); +        GF_VALIDATE_OR_GOTO ("marker", contribution, out); + +        if (size == NULL && contri == NULL) { +                ret = 0; +                goto out; +        } + +        ret = _mq_get_metadata (this, loc, contri, size, contribution->gfid); +        if (ret < 0) { +                gf_log_callingfn (this->name, GF_LOG_ERROR, "Failed to get " +                                  "metadata for %s", loc->path); +                goto out; +        } + +        if (size) { +                LOCK (&ctx->lock); +                { +                        ctx->size = size->size; +                        ctx->file_count = size->file_count; +                        ctx->dir_count = size->dir_count; +                } +                UNLOCK  (&ctx->lock); +        } + +        if (contri) { +                LOCK (&contribution->lock); +                { +                        contribution->contribution = contri->size; +                        contribution->file_count = contri->file_count; +                        contribution->dir_count = contri->dir_count; +                } +                UNLOCK (&contribution->lock); +        } + +out: +        return ret; +} + +int32_t +mq_get_size (xlator_t *this, loc_t *loc, quota_meta_t *size) +{ +        int32_t         ret      = -1; + +        ret = _mq_get_metadata (this, loc, NULL, size, 0); +        if (ret < 0) { +                gf_log_callingfn (this->name, GF_LOG_ERROR, "Failed to get " +                                  "metadata for %s", loc->path); +                goto out; +        } + +out: +        return ret; +} + +int32_t +mq_get_contri (xlator_t *this, loc_t *loc, quota_meta_t *contri, +               uuid_t contri_gfid) +{ +        int32_t         ret      = -1; + +        ret = _mq_get_metadata (this, loc, contri, NULL, contri_gfid); +        if (ret < 0) { +                gf_log_callingfn (this->name, GF_LOG_ERROR, "Failed to get " +                                  "metadata for %s", loc->path); +                goto out; +        } + +out: +        return ret; +} + +int32_t +mq_get_delta (xlator_t *this, loc_t *loc, quota_meta_t *delta, +              quota_inode_ctx_t *ctx, inode_contribution_t *contribution) +{ +        int32_t         ret      = -1; +        quota_meta_t    size     = {0, }; +        quota_meta_t    contri   = {0, }; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); +        GF_VALIDATE_OR_GOTO ("marker", ctx, out); +        GF_VALIDATE_OR_GOTO ("marker", contribution, out); + +        ret = mq_get_metadata (this, loc, &contri, &size, ctx, contribution); +        if (ret < 0) +                goto out; + +        mq_compute_delta (delta, &size, &contri); + +out: +        return ret; +} + +int32_t +mq_remove_contri (xlator_t *this, loc_t *loc, inode_contribution_t *contri) +{ +        int32_t              ret                         = -1; +        char                 contri_key[CONTRI_KEY_MAX]  = {0, }; + +        GET_CONTRI_KEY (contri_key, contri->gfid, ret); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_ERROR, "get contri_key " +                        "failed for %s", uuid_utoa(contri->gfid)); +                goto out; +        } + +        ret = syncop_removexattr (FIRST_CHILD(this), loc, contri_key, 0); +        if (ret < 0) { +                if (-ret == ENOENT || -ret == ESTALE) { +                        /* Remove contri in done when unlink operation is +                         * performed, so return success on ENOENT/ESTSLE +                         */ +                        ret = 0; +                } else { +                        gf_log (this->name, GF_LOG_ERROR, "removexattr %s " +                                "failed for %s: %s", contri_key, loc->path, +                                strerror (-ret)); +                        goto out; +                } +        } + +        LOCK (&contri->lock); +        { +                contri->contribution = 0; +                contri->file_count = 0; +                contri->dir_count = 0; +        } +        UNLOCK (&contri->lock); + +        ret = 0; +out: + +        return ret; +} + +int32_t +mq_update_contri (xlator_t *this, loc_t *loc, inode_contribution_t *contri, +                  quota_meta_t *delta) +{ +        int32_t              ret                         = -1; +        char                 contri_key[CONTRI_KEY_MAX]  = {0, }; +        dict_t              *dict                        = NULL; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); +        GF_VALIDATE_OR_GOTO ("marker", delta, out); +        GF_VALIDATE_OR_GOTO ("marker", contri, out); + +        if (quota_meta_is_null (delta)) { +                ret = 0; +                goto out; +        } + +        dict = dict_new (); +        if (!dict) { +                gf_log (this->name, GF_LOG_ERROR, "dict_new failed"); +                ret = -1; +                goto out; +        } + +        GET_CONTRI_KEY (contri_key, contri->gfid, ret); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_ERROR, "get contri_key " +                        "failed for %s", uuid_utoa(contri->gfid)); +                goto out; +        } + +        ret = mq_dict_set_meta (dict, contri_key, delta, loc->inode->ia_type); +        if (ret < 0) +                goto out; + +        ret = syncop_xattrop(FIRST_CHILD(this), loc, GF_XATTROP_ADD_ARRAY64, +                             dict, NULL); +        if (ret < 0) { +                gf_log (this->name, (-ret == ENOENT || -ret == ESTALE) +                        ? GF_LOG_DEBUG:GF_LOG_ERROR, "xattrop failed " +                        "for %s: %s", loc->path, strerror (-ret)); +                goto out; +        } + +        LOCK (&contri->lock); +        { +                contri->contribution += delta->size; +                contri->file_count += delta->file_count; +                contri->dir_count += delta->dir_count; +        } +        UNLOCK (&contri->lock); + +out: +        if (dict) +                dict_unref (dict); + +        return ret; +} + +int32_t +mq_update_size (xlator_t *this, loc_t *loc, quota_meta_t *delta) +{ +        int32_t              ret              = -1; +        quota_inode_ctx_t   *ctx              = NULL; +        dict_t              *dict             = NULL; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); +        GF_VALIDATE_OR_GOTO ("marker", delta, out); + +        if (quota_meta_is_null (delta)) { +                ret = 0; +                goto out; +        } + +        ret = mq_inode_ctx_get (loc->inode, this, &ctx); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_ERROR, "failed to get inode ctx for " +                        "%s", loc->path); +                goto out; +        } + +        dict = dict_new (); +        if (!dict) { +                gf_log (this->name, GF_LOG_ERROR, "dict_new failed"); +                ret = -1; +                goto out; +        } + +        ret = mq_dict_set_meta (dict, QUOTA_SIZE_KEY, delta, +                                loc->inode->ia_type); +        if (ret < 0) +                goto out; + +        ret = syncop_xattrop(FIRST_CHILD(this), loc, GF_XATTROP_ADD_ARRAY64, +                             dict, NULL); +        if (ret < 0) { +                gf_log (this->name, (-ret == ENOENT || -ret == ESTALE) +                        ? GF_LOG_DEBUG:GF_LOG_ERROR, "xattrop failed " +                        "for %s: %s", loc->path, strerror (-ret)); +                goto out; +        } + +        LOCK (&ctx->lock); +        { +                ctx->size += delta->size; +                ctx->file_count += delta->file_count; +                ctx->dir_count += delta->dir_count; +        } +        UNLOCK (&ctx->lock); + +out: +        if (dict) +                dict_unref (dict); + +        return ret; +}  int -mq_initiate_quota_txn (xlator_t *this, loc_t *loc) +mq_synctask_cleanup (int ret, call_frame_t *frame, void *opaque)  { -        int32_t               ret          = -1; -        gf_boolean_t          status       = _gf_false; -        quota_inode_ctx_t    *ctx          = NULL; -        inode_contribution_t *contribution = NULL; +        quota_synctask_t       *args         = NULL; + +        GF_ASSERT (opaque); + +        args = (quota_synctask_t *) opaque; +        loc_wipe (&args->loc); +        if (args->dict) +                dict_unref (args->dict); + +        if (!args->is_static) +                GF_FREE (args); + +        return 0; +} + +int +mq_synctask (xlator_t *this, synctask_fn_t task, gf_boolean_t spawn, loc_t *loc, +             dict_t *dict, struct iatt *buf, int64_t contri) +{ +        int32_t              ret         = -1; +        quota_synctask_t    *args        = NULL; +        quota_synctask_t     static_args = {0, }; + +        if (spawn) { +                QUOTA_ALLOC_OR_GOTO (args, quota_synctask_t, ret, out); +                args->is_static = _gf_false; +        } else { +                args = &static_args; +                args->is_static = _gf_true; +        } + +        args->this = this; +        loc_copy (&args->loc, loc); +        args->contri = contri; +        if (dict) +                args->dict = dict_ref (dict); +        if (buf) +                args->buf = *buf; + + +        if (spawn) { +                ret = synctask_new (this->ctx->env, task, mq_synctask_cleanup, +                                    NULL, args); +                if (ret) { +                        gf_log (this->name, GF_LOG_ERROR, "Failed to spawn " +                                "new synctask"); +                        mq_synctask_cleanup (ret, NULL, args); +                } +        } else { +                ret = task (args); +                mq_synctask_cleanup (ret, NULL, args); +        } + +out: +        return ret; +} + +int +mq_start_quota_txn_v2 (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx, +                       inode_contribution_t *contri) +{ +        int32_t            ret        = -1; +        loc_t              child_loc  = {0,}; +        loc_t              parent_loc = {0,}; +        gf_boolean_t       locked     = _gf_false; +        gf_boolean_t       dirty      = _gf_false; +        gf_boolean_t       status     = _gf_true; +        quota_meta_t       delta      = {0, }; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); +        GF_VALIDATE_OR_GOTO ("marker", ctx, out); +        GF_VALIDATE_OR_GOTO ("marker", contri, out); + +        ret = mq_loc_copy (&child_loc, loc); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_ERROR, "loc copy failed"); +                goto out; +        } + +        if (uuid_is_null (child_loc.gfid)) +                uuid_copy (child_loc.gfid, child_loc.inode->gfid); + +        if (uuid_is_null (child_loc.gfid)) { +                ret = -1; +                gf_log (this->name, GF_LOG_DEBUG, "UUID is null for %s", +                        child_loc.path); +                goto out; +        } + +        while (!__is_root_gfid (child_loc.gfid)) { +                /* To improve performance, abort current transaction +                 * if one is already in progress for same inode +                 */ +                ret = mq_test_and_set_ctx_updation_status (ctx, &status); +                if (ret < 0 || status == _gf_true) +                        goto out; + +                ret = mq_inode_loc_fill (NULL, child_loc.parent, &parent_loc); +                if (ret < 0) { +                        gf_log (this->name, GF_LOG_ERROR, "loc fill failed"); +                        goto out; +                } + +                ret = mq_lock (this, &parent_loc, F_WRLCK); +                if (ret < 0) +                        goto out; +                locked = _gf_true; + +                mq_set_ctx_updation_status (ctx, _gf_false); +                status = _gf_true; + +                ret = mq_get_delta (this, &child_loc, &delta, ctx, contri); +                if (ret < 0) +                        goto out; + +                if (quota_meta_is_null (&delta)) +                        goto out; + +                ret = mq_mark_dirty (this, &parent_loc, 1); +                if (ret < 0) +                        goto out; +                dirty = _gf_true; + +                ret = mq_update_contri (this, &child_loc, contri, &delta); +                if (ret < 0) { +                        gf_log (this->name, GF_LOG_ERROR, "contri " +                                "update failed for %s", child_loc.path); +                        goto out; +                } + +                ret = mq_update_size (this, &parent_loc, &delta); +                if (ret < 0) { +                        gf_log (this->name, GF_LOG_WARNING, "rollback " +                                "contri updation"); +                        mq_sub_meta (&delta, NULL); +                        mq_update_contri (this, &child_loc, contri, &delta); +                        goto out; +                } + +                ret = mq_mark_dirty (this, &parent_loc, 0); +                dirty = _gf_false; + +                ret = mq_lock (this, &parent_loc, F_UNLCK); +                locked = _gf_false; + +                if (__is_root_gfid (parent_loc.gfid)) +                        break; + +                /* Repeate above steps upwards till the root */ +                loc_wipe (&child_loc); +                ret = mq_loc_copy (&child_loc, &parent_loc); +                if (ret < 0) +                        goto out; +                loc_wipe (&parent_loc); + +                ret = mq_inode_ctx_get (child_loc.inode, this, &ctx); +                if (ret < 0) +                        goto out; + +                if (list_empty (&ctx->contribution_head)) { +                        gf_log (this->name, GF_LOG_ERROR, +                                "contribution node list is empty (%s)", +                                uuid_utoa(child_loc.inode->gfid)); +                        ret = -1; +                        goto out; +                } +                contri = mq_get_contribution_node (child_loc.parent, ctx); +                GF_ASSERT (contri != NULL); +        } + +out: +        if (ret >= 0 && dirty) +                ret = mq_mark_dirty (this, &parent_loc, 0); + +        if (locked) +                ret = mq_lock (this, &parent_loc, F_UNLCK); + +        if (status == _gf_false) +                mq_set_ctx_updation_status (ctx, _gf_false); + +        loc_wipe (&child_loc); +        loc_wipe (&parent_loc); + +        return ret; +} + +int +mq_create_xattrs_task (void *opaque) +{ +        int32_t                  ret        = -1; +        gf_boolean_t             locked     = _gf_false; +        gf_boolean_t             xattrs_set = _gf_false; +        gf_boolean_t             objects    = _gf_false; +        gf_boolean_t             need_txn   = _gf_false; +        quota_synctask_t        *args       = NULL; +        xlator_t                *this       = NULL; +        loc_t                   *loc        = NULL; + +        GF_ASSERT (opaque); + +        args = (quota_synctask_t *) opaque; +        loc = &args->loc; +        this = args->this; +        THIS = this; + +        if (uuid_is_null (loc->gfid)) +                uuid_copy (loc->gfid, loc->inode->gfid); + +        if (uuid_is_null (loc->gfid)) { +                ret = -1; +                gf_log (this->name, GF_LOG_DEBUG, "UUID is null for %s", +                        loc->path); +                goto out; +        } + +        ret = mq_lock (this, loc, F_WRLCK); +        if (ret < 0) +                goto out; +        locked = _gf_true; + +        ret = mq_are_xattrs_set (this, loc, &xattrs_set, &objects); +        if (ret < 0 || xattrs_set) +                goto out; + +        ret = mq_create_xattrs (this, loc, objects); +        if (ret < 0) +                goto out; + +        need_txn = _gf_true; +out: +        if (locked) +                ret = mq_lock (this, loc, F_UNLCK); + +        if (need_txn) +                ret = mq_initiate_quota_blocking_txn (this, loc); + +        return ret; +} + +int +mq_create_xattrs_txn (xlator_t *this, loc_t *loc) +{ +        int32_t                  ret        = -1; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); + +        ret = mq_synctask (this, mq_create_xattrs_task, _gf_true, loc, NULL, +                           NULL, 0); +out: +        return ret; +} + +int +mq_create_xattrs_blocking_txn (xlator_t *this, loc_t *loc) +{ +        int32_t                  ret        = -1; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); + +        ret = mq_synctask (this, mq_create_xattrs_task, _gf_false, loc, NULL, +                           NULL, 0); +out: +        return ret; +} + +int32_t +mq_reduce_parent_size_task (void *opaque) +{ +        int32_t                  ret           = -1; +        quota_inode_ctx_t       *ctx           = NULL; +        inode_contribution_t    *contribution  = NULL; +        quota_meta_t             delta         = {0, }; +        loc_t                    parent_loc    = {0,}; +        gf_boolean_t             locked        = _gf_false; +        gf_boolean_t             dirty         = _gf_false; +        quota_synctask_t        *args          = NULL; +        xlator_t                *this          = NULL; +        loc_t                   *loc           = NULL; +        int64_t                  contri        = 0; + +        GF_ASSERT (opaque); + +        args = (quota_synctask_t *) opaque; +        loc = &args->loc; +        contri = args->contri; +        this = args->this; +        THIS = this; + +        ret = mq_inode_ctx_get (loc->inode, this, &ctx); +        if (ret < 0) { +                gf_log_callingfn (this->name, GF_LOG_WARNING, "ctx for" +                                  " the node %s is NULL", loc->path); +                goto out; +        } + +        contribution = mq_get_contribution_node (loc->parent, ctx); +        if (contribution == NULL) { +                ret = -1; +                gf_log_callingfn (this->name, GF_LOG_WARNING, +                                  "contribution for the node %s is NULL", +                                  loc->path); +                goto out; +        } + +        if (contri >= 0) { +                /* contri paramater is supplied only for rename operation */ +                delta.size = contri; +                delta.file_count = 1; +                delta.dir_count = 0; +        } + +        ret = mq_inode_loc_fill (NULL, loc->parent, &parent_loc); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_ERROR, "loc fill failed"); +                goto out; +        } + +        ret = mq_lock (this, &parent_loc, F_WRLCK); +        if (ret < 0) +                goto out; +        locked = _gf_true; + +        if (contri < 0) { +                LOCK (&contribution->lock); +                { +                        delta.size = contribution->contribution; +                        delta.file_count = contribution->file_count; +                        delta.dir_count = contribution->dir_count; +                } +                UNLOCK (&contribution->lock); +        } + +        /* TODO: Handle handlinks with better approach +           Iterating dentry_list without a lock is not a good idea +        if (loc->inode->ia_type != IA_IFDIR) { +                list_for_each_entry (dentry, &inode->dentry_list, inode_list) { +                        if (loc->parent == dentry->parent) { +                                * If the file has another link within the same +                                * directory, we should not be reducing the size +                                * of parent +                                * +                                delta = 0; +                                idelta = 0; +                                break; +                        } +                } +        } +        */ + +        if (quota_meta_is_null (&delta)) +                goto out; + +        ret = mq_mark_dirty (this, &parent_loc, 1); +        if (ret < 0) +                goto out; +        dirty = _gf_true; + +        ret = mq_remove_contri (this, loc, contribution); +        if (ret < 0) +                goto out; + +        mq_sub_meta (&delta, NULL); +        ret = mq_update_size (this, &parent_loc, &delta); +        if (ret < 0) +                goto out; + +out: +        if (dirty && ret >= 0) +                ret = mq_mark_dirty (this, &parent_loc, 0); + +        if (locked) +                ret = mq_lock (this, &parent_loc, F_UNLCK); + +        if (ret >= 0) +                ret = mq_initiate_quota_blocking_txn (this, &parent_loc); + +        loc_wipe (&parent_loc); + +        return ret; +} + +int32_t +mq_reduce_parent_size_txn (xlator_t *this, loc_t *loc, int64_t contri) +{ +        int32_t                  ret           = -1;          GF_VALIDATE_OR_GOTO ("marker", this, out);          GF_VALIDATE_OR_GOTO ("marker", loc, out);          GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); +        ret = mq_synctask (this, mq_reduce_parent_size_task, _gf_true, loc, +                           NULL, NULL, contri); +out: +        return ret; +} + +int +mq_initiate_quota_task (void *opaque) +{ +        int32_t                 ret          = -1; +        quota_inode_ctx_t      *ctx          = NULL; +        inode_contribution_t   *contribution = NULL; +        quota_synctask_t       *args         = NULL; +        xlator_t               *this         = NULL; +        loc_t                  *loc          = NULL; + +        GF_ASSERT (opaque); + +        args = (quota_synctask_t *) opaque; +        loc = &args->loc; +        this = args->this; +        THIS = this; +          ret = mq_inode_ctx_get (loc->inode, this, &ctx);          if (ret == -1) {                  gf_log (this->name, GF_LOG_WARNING, @@ -2057,69 +3205,259 @@ mq_initiate_quota_txn (xlator_t *this, loc_t *loc)          */          contribution = mq_get_contribution_node (loc->parent, ctx);          if (!contribution) { -                if ((loc->path && strcmp (loc->path, "/")) -                    || (!uuid_is_null (loc->gfid) -                        && !__is_root_gfid (loc->gfid)) -                    || (loc->inode && !uuid_is_null (loc->inode->gfid) -                        && !__is_root_gfid (loc->inode->gfid))) +                if (!loc_is_root(loc))                          gf_log_callingfn (this->name, GF_LOG_TRACE,                                            "contribution node for the "                                            "path (%s) with parent (%s) "                                            "not found", loc->path, -                                          loc->parent? -                                          uuid_utoa (loc->parent->gfid): +                                          loc->parent ? +                                          uuid_utoa (loc->parent->gfid) :                                            NULL);                  contribution = mq_add_new_contribution_node (this, ctx, loc);                  if (!contribution) { -                        if(loc->path && strcmp (loc->path, "/")) +                        if (!loc_is_root(loc))                                  gf_log_callingfn (this->name, GF_LOG_WARNING,                                                    "could not allocate "                                                    " contribution node for (%s) "                                                    "parent: (%s)", loc->path, -                                                  loc->parent? -                                                  uuid_utoa (loc->parent->gfid): +                                                  loc->parent ? +                                                 uuid_utoa (loc->parent->gfid) :                                                    NULL);                          goto out;                  }          } -        /* To improve performance, do not start another transaction -         * if one is already in progress for same inode -         */ -        status = _gf_true; +        mq_start_quota_txn_v2 (this, loc, ctx, contribution); + +        ret = 0; +out: +        return ret; +} + +int +mq_initiate_quota_txn (xlator_t *this, loc_t *loc) +{ +        int32_t                 ret          = -1; + +        GF_VALIDATE_OR_GOTO ("marker", this, out); +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); + +        ret = mq_synctask (this, mq_initiate_quota_task, _gf_true, loc, NULL, +                           NULL, 0); +out: +        return ret; +} + +int +mq_initiate_quota_blocking_txn (xlator_t *this, loc_t *loc) +{ +        int32_t                 ret          = -1; + +        GF_VALIDATE_OR_GOTO ("marker", this, out); +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); + +        ret = mq_synctask (this, mq_initiate_quota_task, _gf_false, loc, NULL, +                           NULL, 0); +out: +        return ret; +} + +/* return 1 when dirty updation is performed + * return 0 other wise + */ +int32_t +mq_update_dirty_inode_v2 (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx, +                          inode_contribution_t *contribution) +{ +        int32_t          ret            = -1; +        fd_t            *fd             = NULL; +        off_t            offset         = 0; +        loc_t            child_loc      = {0, }; +        gf_dirent_t      entries; +        gf_dirent_t     *entry          = NULL; +        gf_boolean_t     status         = _gf_true; +        gf_boolean_t     locked         = _gf_false; +        gf_boolean_t     free_entries   = _gf_false; +        gf_boolean_t     updated        = _gf_false; +        int32_t          dirty          = 0; +        quota_meta_t     contri         = {0, }; +        quota_meta_t     size           = {0, }; +        quota_meta_t     contri_sum     = {0, }; +        quota_meta_t     delta          = {0, }; + +        GF_VALIDATE_OR_GOTO ("marker", loc, out); +        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out); + +        ret = mq_get_ctx_updation_status (ctx, &status); +        if (ret == -1 || status == _gf_true) { +                ret = 0; +                goto out; +        } + +        if (uuid_is_null (loc->gfid)) +                uuid_copy (loc->gfid, loc->inode->gfid); -        ret = mq_test_and_set_ctx_updation_status (ctx, &status); +        if (uuid_is_null (loc->gfid)) { +                ret = -1; +                gf_log (this->name, GF_LOG_DEBUG, "UUID is null for %s", +                        loc->path); +                goto out; +        } + +        ret = mq_lock (this, loc, F_WRLCK);          if (ret < 0)                  goto out; +        locked = _gf_true; -        if (status == _gf_false) { -                mq_start_quota_txn (this, loc, ctx, contribution); +        ret = mq_get_dirty (this, loc, &dirty); +        if (ret < 0 || dirty == 0) { +                ret = 0; +                goto out;          } -        ret = 0; +        fd = fd_create (loc->inode, 0); +        if (!fd) { +                gf_log (this->name, GF_LOG_ERROR, "Failed to create fd"); +                ret = -1; +                goto out; +        } + +        ret = syncop_opendir (this, loc, fd); +        if (ret < 0) { +                gf_log (this->name, (-ret == ENOENT || -ret == ESTALE) +                        ? GF_LOG_DEBUG:GF_LOG_ERROR, "opendir failed " +                        "for %s: %s", loc->path, strerror (-ret)); +                goto out; +        } + +        INIT_LIST_HEAD (&entries.list); +        while ((ret = syncop_readdirp (this, fd, 131072, offset, NULL, +                                       &entries)) != 0) { +                if (ret < 0) { +                        gf_log (this->name, (-ret == ENOENT || -ret == ESTALE) +                                ? GF_LOG_DEBUG:GF_LOG_ERROR, "readdirp failed " +                                "for %s: %s", loc->path, strerror (-ret)); +                        goto out; +                } + +                if (list_empty (&entries.list)) +                        break; + +                free_entries = _gf_true; +                list_for_each_entry (entry, &entries.list, list) { +                        offset = entry->d_off; + +                        if (!strcmp (entry->d_name, ".") || +                            !strcmp (entry->d_name, "..")) +                                continue; + +                        ret = loc_build_child (&child_loc, loc, entry->d_name); +                        if (ret < 0) { +                                gf_log (this->name, GF_LOG_WARNING, +                                        "Couldn't build loc for %s/%s " +                                        "returning from updation of dirty " +                                        "inode", loc->path, entry->d_name); +                                goto out; +                        } + +                        ret = mq_get_contri (this, &child_loc, &contri, +                                             loc->gfid); +                        if (ret < 0) +                                goto out; + +                        mq_add_meta (&contri_sum, &contri); +                        loc_wipe (&child_loc); +                } + +                gf_dirent_free (&entries); +                free_entries = _gf_false; +        } +        /* Inculde for self */ +        contri_sum.dir_count++; + +        ret = mq_get_size (this, loc, &size); +        if (ret < 0) +                goto out; + +        mq_compute_delta (&delta, &contri_sum, &size); + +        if (quota_meta_is_null (&delta)) +                goto out; + +        gf_log (this->name, GF_LOG_INFO, "calculated size = %"PRId64 +                ", original size = %"PRIu64 ", diff = %"PRIu64 +                ", path = %s ", contri_sum.size, size.size, delta.size, +                loc->path); + +        gf_log (this->name, GF_LOG_INFO, "calculated f_count = %"PRId64 +                ", original f_count = %"PRIu64 ", diff = %"PRIu64 +                ", path = %s ", contri_sum.file_count, size.file_count, +                delta.file_count, loc->path); + +        gf_log (this->name, GF_LOG_INFO, "calculated d_count = %"PRId64 +                ", original d_count = %"PRIu64 ", diff = %"PRIu64 +                ", path = %s ", contri_sum.dir_count, size.dir_count, +                delta.dir_count, loc->path); + + +        ret = mq_update_size (this, loc, &delta); +        if (ret < 0) +                goto out; + +        updated = _gf_true; +  out: -        return ret; -} +        if (free_entries) +                gf_dirent_free (&entries); +        if (fd) +                fd_unref (fd); + +        if (ret >= 0 && dirty) +                mq_mark_dirty (this, loc, 0); + +        if (locked) +                mq_lock (this, loc, F_UNLCK); +        if (status == _gf_false) +                mq_set_ctx_updation_status (ctx, _gf_false); +        loc_wipe(&child_loc); +        if (updated) +                return 1; +        else +                return 0; +}  int32_t -mq_inspect_directory_xattr (xlator_t *this, -                            loc_t *loc, -                            dict_t *dict, -                            struct iatt buf) +mq_inspect_directory_xattr_task (void *opaque)  { -        int32_t               ret                 = 0; -        int8_t                dirty               = -1; -        int64_t              *size                = NULL, size_int = 0; -        int64_t              *contri              = NULL, contri_int = 0; -        char                  contri_key [512]    = {0, }; -        gf_boolean_t          not_root            = _gf_false; -        quota_inode_ctx_t    *ctx                 = NULL; -        inode_contribution_t *contribution        = NULL; +        int32_t               ret                          = 0; +        int8_t                dirty                        = -1; +        quota_meta_t          size                         = {0, }; +        quota_meta_t          contri                       = {0, }; +        quota_meta_t          delta                        = {0, }; +        char                  contri_key[CONTRI_KEY_MAX]   = {0, }; +        quota_inode_ctx_t    *ctx                          = NULL; +        inode_contribution_t *contribution                 = NULL; +        quota_synctask_t     *args                         = NULL; +        xlator_t             *this                         = NULL; +        loc_t                *loc                          = NULL; +        dict_t               *dict                         = NULL; +        struct iatt           buf                          = {0,}; + +        GF_ASSERT (opaque); + +        args = (quota_synctask_t *) opaque; +        loc = &args->loc; +        dict = args->dict; +        buf = args->buf; +        this = args->this; +        THIS = this;          ret = mq_inode_ctx_get (loc->inode, this, &ctx);          if (ret < 0) { @@ -2132,7 +3470,7 @@ mq_inspect_directory_xattr (xlator_t *this,                  }          } -        if (!loc->path || (loc->path && strcmp (loc->path, "/") != 0)) { +        if (!loc_is_root(loc)) {                  contribution = mq_add_new_contribution_node (this, ctx, loc);                  if (contribution == NULL) {                          if (!uuid_is_null (loc->inode->gfid)) @@ -2144,76 +3482,96 @@ mq_inspect_directory_xattr (xlator_t *this,                  }          } -        ret = dict_get_bin (dict, QUOTA_SIZE_KEY, (void **) &size); +        ret = dict_get_int8 (dict, QUOTA_DIRTY_KEY, &dirty);          if (ret < 0)                  goto out; -        ret = dict_get_int8 (dict, QUOTA_DIRTY_KEY, &dirty); +        ret = mq_dict_get_meta (dict, QUOTA_SIZE_KEY, &size);          if (ret < 0)                  goto out; -        if ((loc->path && strcmp (loc->path, "/") != 0) -            || (!uuid_is_null (loc->gfid) && !__is_root_gfid (loc->gfid)) -            || (loc->inode && !uuid_is_null (loc->inode->gfid) && -                !__is_root_gfid (loc->inode->gfid))) { -                not_root = _gf_true; - +        if (!loc_is_root(loc)) {                  GET_CONTRI_KEY (contri_key, contribution->gfid, ret);                  if (ret < 0) -                        goto out; +                        goto err; -                ret = dict_get_bin (dict, contri_key, (void **) &contri); +                ret = mq_dict_get_meta (dict, contri_key, &contri);                  if (ret < 0)                          goto out;                  LOCK (&contribution->lock);                  { -                        contribution->contribution = ntoh64 (*contri); -                        contri_int = contribution->contribution; +                        contribution->contribution = contri.size; +                        contribution->file_count = contri.file_count; +                        contribution->dir_count = contri.dir_count;                  }                  UNLOCK (&contribution->lock);          }          LOCK (&ctx->lock);          { -                ctx->size = ntoh64 (*size); +                ctx->size = size.size; +                ctx->file_count = size.file_count; +                ctx->dir_count = size.dir_count;                  ctx->dirty = dirty; -                size_int = ctx->size;          }          UNLOCK (&ctx->lock); -        gf_log (this->name, GF_LOG_DEBUG, "size=%"PRId64 -                " contri=%"PRId64, size_int, contri_int); +        mq_compute_delta (&delta, &size, &contri); -        if (dirty) { -                ret = mq_update_dirty_inode (this, loc, ctx, contribution); -        } +        if (dirty) +                ret = mq_update_dirty_inode_v2 (this, loc, ctx, contribution); -        if ((!dirty || ret == 0) && (not_root == _gf_true) && -            (size_int != contri_int)) { -                mq_initiate_quota_txn (this, loc); -        } +        if ((!dirty || ret == 1) && +            !loc_is_root(loc) && +            !quota_meta_is_null (&delta)) +                mq_initiate_quota_blocking_txn (this, loc);          ret = 0;  out: -        if (ret) -                mq_set_inode_xattr (this, loc); +        if (ret < 0) +                ret = mq_create_xattrs_blocking_txn (this, loc); +  err:          return ret;  }  int32_t -mq_inspect_file_xattr (xlator_t *this, -                       loc_t *loc, -                       dict_t *dict, -                       struct iatt buf) +mq_inspect_directory_xattr_txn (xlator_t *this, loc_t *loc, dict_t *dict, +                                struct iatt buf)  { -        int32_t               ret              = -1; -        uint64_t              contri_int       = 0, size = 0; -        int64_t              *contri_ptr       = NULL; -        char                  contri_key [512] = {0, }; -        quota_inode_ctx_t    *ctx              = NULL; -        inode_contribution_t *contribution     = NULL; +        int32_t   ret = -1; + +        ret = mq_synctask (this, mq_inspect_directory_xattr_task, _gf_true, +                           loc, dict, &buf, 0); + +        return ret; +} + +int32_t +mq_inspect_file_xattr_task (void *opaque) +{ +        int32_t               ret                          = -1; +        quota_meta_t          size                         = {0, }; +        quota_meta_t          contri                       = {0, }; +        quota_meta_t          delta                        = {0, }; +        char                  contri_key[CONTRI_KEY_MAX]   = {0, }; +        quota_inode_ctx_t    *ctx                          = NULL; +        inode_contribution_t *contribution                 = NULL; +        quota_synctask_t     *args                         = NULL; +        xlator_t             *this                         = NULL; +        loc_t                *loc                          = NULL; +        dict_t               *dict                         = NULL; +        struct iatt           buf                          = {0,}; + +        GF_ASSERT (opaque); + +        args = (quota_synctask_t *) opaque; +        loc = &args->loc; +        dict = args->dict; +        buf = args->buf; +        this = args->this; +        THIS = this;          ret = mq_inode_ctx_get (loc->inode, this, &ctx);          if (ret < 0) { @@ -2230,103 +3588,111 @@ mq_inspect_file_xattr (xlator_t *this,          if (contribution == NULL) {                  gf_log_callingfn (this->name, GF_LOG_DEBUG, "cannot allocate "                                    "contribution node (path:%s)", loc->path); +                ret = -1;                  goto out;          }          LOCK (&ctx->lock);          {                  ctx->size = 512 * buf.ia_blocks; -                size = ctx->size; +                ctx->file_count = 1; +                ctx->dir_count = 0; + +                size.size = ctx->size; +                size.file_count = ctx->file_count; +                size.dir_count = ctx->dir_count;          }          UNLOCK (&ctx->lock);          list_for_each_entry (contribution, &ctx->contribution_head,                               contri_list) { +                  GET_CONTRI_KEY (contri_key, contribution->gfid, ret);                  if (ret < 0)                          continue; -                ret = dict_get_bin (dict, contri_key, (void **) &contri_int); -                if (ret == 0) { -                        contri_ptr = (int64_t *)(unsigned long)contri_int; - +                ret = mq_dict_get_meta (dict, contri_key, &contri); +                if (ret < 0) { +                        ret = mq_create_xattrs_blocking_txn (this, loc); +                } else {                          LOCK (&contribution->lock);                          { -                                contribution->contribution = ntoh64 (*contri_ptr); -                                contri_int = contribution->contribution; +                                contribution->contribution = contri.size; +                                contribution->file_count = contri.file_count; +                                contribution->dir_count = contri.dir_count;                          }                          UNLOCK (&contribution->lock); -                        gf_log (this->name, GF_LOG_DEBUG, -                                "size=%"PRId64 " contri=%"PRId64, size, contri_int); - -                        if (size != contri_int) { -                                mq_initiate_quota_txn (this, loc); +                        mq_compute_delta (&delta, &size, &contri); +                        if (!quota_meta_is_null (&delta)) { +                                mq_initiate_quota_blocking_txn (this, loc); +                                /* TODO: revist this code when fixing hardlinks +                                 */ +                                break;                          } -                } else { -                        if (size) -                                mq_initiate_quota_txn (this, loc); -                        else -                                mq_set_inode_xattr (this, loc);                  } + +                /* TODO: loc->parent might need to be assigned to corresponding +                 * contribution inode. We need to handle hard links here +                */          }  out: +          return ret;  }  int32_t -mq_xattr_state (xlator_t *this, -                loc_t *loc, -                dict_t *dict, -                struct iatt buf) +mq_inspect_file_xattr_txn (xlator_t *this, loc_t *loc, dict_t *dict, +                           struct iatt buf) +{ +        int32_t   ret = -1; + +        ret = mq_synctask (this, mq_inspect_file_xattr_task, _gf_true, +                           loc, dict, &buf, 0); + +        return ret; +} + +int32_t +mq_xattr_state (xlator_t *this, loc_t *loc, dict_t *dict, struct iatt buf)  {          if (((buf.ia_type == IA_IFREG) && !dht_is_linkfile (&buf, dict))              || (buf.ia_type == IA_IFLNK))  { -                mq_inspect_file_xattr (this, loc, dict, buf); +                mq_inspect_file_xattr_txn (this, loc, dict, buf);          } else if (buf.ia_type == IA_IFDIR) -                mq_inspect_directory_xattr (this, loc, dict, buf); +                mq_inspect_directory_xattr_txn (this, loc, dict, buf);          return 0;  }  int32_t -mq_req_xattr (xlator_t *this, -              loc_t *loc, -              dict_t *dict) +mq_req_xattr (xlator_t *this, loc_t *loc, dict_t *dict, +              char *contri_key)  {          int32_t               ret       = -1;          GF_VALIDATE_OR_GOTO ("marker", this, out); +        GF_VALIDATE_OR_GOTO ("marker", loc, out);          GF_VALIDATE_OR_GOTO ("marker", dict, out); -        if (!loc) -                goto set_size; - -        //if not "/" then request contribution -        if (loc->path && strcmp (loc->path, "/") == 0) -                goto set_size; - -        ret = mq_dict_set_contribution (this, dict, loc); -        if (ret == -1) -                goto out; +        if (!loc_is_root(loc)) { +                ret = mq_dict_set_contribution (this, dict, loc, NULL, +                                                contri_key); +                if (ret < 0) +                        goto out; +        } -set_size:          ret = dict_set_uint64 (dict, QUOTA_SIZE_KEY, 0); -        if (ret < 0) { -                ret = -1; +        if (ret < 0)                  goto out; -        }          ret = dict_set_int8 (dict, QUOTA_DIRTY_KEY, 0); -        if (ret < 0) { -                ret = -1; -                goto out; -        } - -        ret = 0;  out: +        if (ret < 0) +                gf_log_callingfn (this->name, GF_LOG_ERROR, "dict set failed"); +          return ret;  } @@ -2344,15 +3710,15 @@ int32_t  _mq_inode_remove_done (call_frame_t *frame, void *cookie, xlator_t *this,                         int32_t op_ret, int32_t op_errno, dict_t *xdata)  { -        int32_t        ret                = 0; -        char           contri_key [512]   = {0, }; -        quota_local_t *local              = NULL; -        inode_t       *inode              = NULL; -        dentry_t      *tmp                = NULL; -        gf_boolean_t  last_dentry         = _gf_true; -        loc_t         loc                 = {0, }; -        dentry_t      *other_dentry       = NULL; -        gf_boolean_t  remove              = _gf_false; +        int32_t        ret                           = 0; +        char           contri_key[CONTRI_KEY_MAX]    = {0, }; +        quota_local_t *local                         = NULL; +        inode_t       *inode                         = NULL; +        dentry_t      *tmp                           = NULL; +        gf_boolean_t  last_dentry                    = _gf_true; +        loc_t         loc                            = {0, }; +        dentry_t      *other_dentry                  = NULL; +        gf_boolean_t  remove                         = _gf_false;          local = (quota_local_t *) frame->local; @@ -2531,6 +3897,7 @@ mq_reduce_parent_size_xattr (call_frame_t *frame, void *cookie, xlator_t *this,          dict = dict_new ();          if (dict == NULL) { +                gf_log (this->name, GF_LOG_ERROR, "dict_new failed");                  ret = -1;                  goto err;          } diff --git a/xlators/features/marker/src/marker-quota.h b/xlators/features/marker/src/marker-quota.h index 42def9d22dc..fa132a815b7 100644 --- a/xlators/features/marker/src/marker-quota.h +++ b/xlators/features/marker/src/marker-quota.h @@ -21,7 +21,7 @@  #define QUOTA_XATTR_PREFIX "trusted.glusterfs"  #define QUOTA_DIRTY_KEY "trusted.glusterfs.quota.dirty" -#define CONTRIBUTION "contri" +#define CONTRIBUTION  "contri"  #define CONTRI_KEY_MAX 512  #define READDIR_BUF 4096 @@ -59,21 +59,21 @@                  ret = 0;                                \          } while (0); -#define GET_CONTRI_KEY(var, _gfid, _ret)              \ -        do {                                          \ -                if (_gfid != NULL) {                  \ -                        char _gfid_unparsed[40];      \ -                        uuid_unparse (_gfid, _gfid_unparsed);           \ -                        _ret = snprintf (var, CONTRI_KEY_MAX,           \ -                                         QUOTA_XATTR_PREFIX             \ +#define GET_CONTRI_KEY(var, _gfid, _ret)                                  \ +        do {                                                              \ +                if (_gfid != NULL) {                                      \ +                        char _gfid_unparsed[40];                          \ +                        uuid_unparse (_gfid, _gfid_unparsed);             \ +                        _ret = snprintf (var, CONTRI_KEY_MAX,             \ +                                         QUOTA_XATTR_PREFIX               \                                           ".%s.%s." CONTRIBUTION, "quota", \ -                                         _gfid_unparsed);               \ -                } else {                                                \ -                        _ret = snprintf (var, CONTRI_KEY_MAX,           \ -                                         QUOTA_XATTR_PREFIX             \ -                                         ".%s.." CONTRIBUTION, "quota"); \ -                }                                                       \ -        } while (0); +                                         _gfid_unparsed);                 \ +                } else {                                                  \ +                        _ret = snprintf (var, CONTRI_KEY_MAX,             \ +                                         QUOTA_XATTR_PREFIX               \ +                                         ".%s.." CONTRIBUTION, "quota");  \ +                }                                                         \ +        } while (0)  #define QUOTA_SAFE_INCREMENT(lock, var)         \          do {                                    \ @@ -84,6 +84,8 @@  struct quota_inode_ctx {          int64_t                size; +        int64_t                file_count; +        int64_t                dir_count;          int8_t                 dirty;          gf_boolean_t           updation_status;          gf_lock_t              lock; @@ -91,9 +93,28 @@ struct quota_inode_ctx {  };  typedef struct quota_inode_ctx quota_inode_ctx_t; +struct quota_meta { +        int64_t    size; +        int64_t    file_count; +        int64_t    dir_count; +}; +typedef struct quota_meta quota_meta_t; + +struct quota_synctask { +        xlator_t      *this; +        loc_t          loc; +        dict_t        *dict; +        struct iatt    buf; +        int64_t        contri; +        gf_boolean_t   is_static; +}; +typedef struct quota_synctask quota_synctask_t; +  struct inode_contribution {          struct list_head contri_list;          int64_t          contribution; +        int64_t          file_count; +        int64_t          dir_count;          uuid_t           gfid;    gf_lock_t lock;  }; @@ -103,7 +124,7 @@ int32_t  mq_get_lock_on_parent (call_frame_t *, xlator_t *);  int32_t -mq_req_xattr (xlator_t *, loc_t *, dict_t *); +mq_req_xattr (xlator_t *, loc_t *, dict_t *, char *);  int32_t  init_quota_priv (xlator_t *); @@ -117,6 +138,12 @@ mq_set_inode_xattr (xlator_t *, loc_t *);  int  mq_initiate_quota_txn (xlator_t *, loc_t *); +int +mq_initiate_quota_blocking_txn (xlator_t *, loc_t *); + +int +mq_create_xattrs_txn (xlator_t *this, loc_t *loc); +  int32_t  mq_dirty_inode_readdir (call_frame_t *, void *, xlator_t *,                          int32_t, int32_t, fd_t *, dict_t *); @@ -125,6 +152,9 @@ int32_t  mq_reduce_parent_size (xlator_t *, loc_t *, int64_t);  int32_t +mq_reduce_parent_size_txn (xlator_t *, loc_t *, int64_t); + +int32_t  mq_rename_update_newpath (xlator_t *, loc_t *);  int32_t diff --git a/xlators/features/marker/src/marker.c b/xlators/features/marker/src/marker.c index ad3aabda9ba..af7ec1f907f 100644 --- a/xlators/features/marker/src/marker.c +++ b/xlators/features/marker/src/marker.c @@ -607,7 +607,7 @@ marker_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          priv = this->private;          if (priv->feature_enabled & GF_QUOTA) -                mq_set_inode_xattr (this, &local->loc); +                mq_create_xattrs_txn (this, &local->loc);          if (priv->feature_enabled & GF_XTIME)                  marker_xtime_update_marks (this, local); @@ -681,7 +681,7 @@ marker_create_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          priv = this->private;          if (priv->feature_enabled & GF_QUOTA) -                mq_set_inode_xattr (this, &local->loc); +                mq_create_xattrs_txn (this, &local->loc);          if (priv->feature_enabled & GF_XTIME)                  marker_xtime_update_marks (this, local); @@ -827,7 +827,7 @@ marker_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          priv = this->private;          if (priv->feature_enabled & GF_QUOTA) -                mq_reduce_parent_size (this, &local->loc, -1); +                mq_reduce_parent_size_txn (this, &local->loc, -1);          if (priv->feature_enabled & GF_XTIME)                  marker_xtime_update_marks (this, local); @@ -896,7 +896,7 @@ marker_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          if (priv->feature_enabled & GF_QUOTA) {                  if (!local->skip_txn) -                        mq_reduce_parent_size (this, &local->loc, -1); +                        mq_reduce_parent_size_txn (this, &local->loc, -1);          }          if (priv->feature_enabled & GF_XTIME) @@ -977,7 +977,7 @@ marker_link_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          if (priv->feature_enabled & GF_QUOTA) {                  if (!local->skip_txn) -                        mq_set_inode_xattr (this, &local->loc); +                        mq_create_xattrs_txn (this, &local->loc);          } @@ -1065,10 +1065,11 @@ marker_rename_done (call_frame_t *frame, void *cookie, xlator_t *this,                          frame->root->unique);          } -        mq_reduce_parent_size (this, &oplocal->loc, oplocal->contribution); +        mq_reduce_parent_size_txn (this, &oplocal->loc, oplocal->contribution);          if (local->loc.inode != NULL) { -                mq_reduce_parent_size (this, &local->loc, local->contribution); +                mq_reduce_parent_size_txn (this, &local->loc, +                                           local->contribution);          }          newloc.inode = inode_ref (oplocal->loc.inode); @@ -1078,7 +1079,7 @@ marker_rename_done (call_frame_t *frame, void *cookie, xlator_t *this,                  newloc.name++;          newloc.parent = inode_ref (local->loc.parent); -        mq_set_inode_xattr (this, &newloc); +        mq_create_xattrs_txn (this, &newloc);          loc_wipe (&newloc); @@ -1181,13 +1182,13 @@ marker_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                     struct iatt *prenewparent, struct iatt *postnewparent,                     dict_t *xdata)  { -        marker_conf_t  *priv                 = NULL; -        marker_local_t *local                = NULL; -        marker_local_t *oplocal              = NULL; -        call_stub_t    *stub                 = NULL; -        int32_t         ret                  = 0; -        char            contri_key [512]     = {0, }; -        loc_t           newloc               = {0, }; +        marker_conf_t  *priv                            = NULL; +        marker_local_t *local                           = NULL; +        marker_local_t *oplocal                         = NULL; +        call_stub_t    *stub                            = NULL; +        int32_t         ret                             = 0; +        char            contri_key[CONTRI_KEY_MAX]      = {0, }; +        loc_t           newloc                          = {0, };          local = (marker_local_t *) frame->local; @@ -1284,10 +1285,11 @@ int32_t  marker_do_rename (call_frame_t *frame, void *cookie, xlator_t *this,                    int32_t op_ret, int32_t op_errno, dict_t *dict, dict_t *xdata)  { -        marker_local_t       *local           = NULL, *oplocal = NULL; -        char                  contri_key[512] = {0, }; -        int32_t               ret             = 0; -        int64_t              *contribution    = 0; +        marker_local_t       *local                      = NULL; +        marker_local_t       *oplocal                    = NULL; +        char                  contri_key[CONTRI_KEY_MAX] = {0, }; +        int32_t               ret                        = 0; +        int64_t              *contribution               = 0;          local = frame->local;          oplocal = local->oplocal; @@ -1336,10 +1338,11 @@ marker_get_newpath_contribution (call_frame_t *frame, void *cookie,                                   xlator_t *this, int32_t op_ret,                                   int32_t op_errno, dict_t *dict, dict_t *xdata)  { -        marker_local_t *local           = NULL, *oplocal = NULL; -        char            contri_key[512] = {0, }; -        int32_t         ret             = 0; -        int64_t        *contribution    = 0; +        marker_local_t *local                      = NULL; +        marker_local_t *oplocal                    = NULL; +        char            contri_key[CONTRI_KEY_MAX] = {0, }; +        int32_t         ret                        = 0; +        int64_t        *contribution               = 0;          local = frame->local;          oplocal = local->oplocal; @@ -1403,9 +1406,10 @@ marker_get_oldpath_contribution (call_frame_t *frame, void *cookie,                                   xlator_t *this, int32_t op_ret,                                   int32_t op_errno, dict_t *xdata)  { -        marker_local_t *local           = NULL, *oplocal = NULL; -        char            contri_key[512] = {0, }; -        int32_t         ret             = 0; +        marker_local_t *local                      = NULL; +        marker_local_t *oplocal                    = NULL; +        char            contri_key[CONTRI_KEY_MAX] = {0, }; +        int32_t         ret                        = 0;          local = frame->local;          oplocal = local->oplocal; @@ -1764,8 +1768,9 @@ marker_symlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          priv = this->private; -        if (priv->feature_enabled & GF_QUOTA) -                mq_set_inode_xattr (this, &local->loc); +        if (priv->feature_enabled & GF_QUOTA) { +                mq_create_xattrs_txn (this, &local->loc); +        }          if (priv->feature_enabled & GF_XTIME)                  marker_xtime_update_marks (this, local); @@ -1838,7 +1843,7 @@ marker_mknod_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          priv = this->private;          if ((priv->feature_enabled & GF_QUOTA) && (S_ISREG (local->mode))) { -                mq_set_inode_xattr (this, &local->loc); +                mq_create_xattrs_txn (this, &local->loc);          }          if (priv->feature_enabled & GF_XTIME) @@ -2706,7 +2711,7 @@ marker_lookup (call_frame_t *frame, xlator_t *this,                  goto err;          if ((priv->feature_enabled & GF_QUOTA) && xattr_req) -                mq_req_xattr (this, loc, xattr_req); +                mq_req_xattr (this, loc, xattr_req, NULL);  wind:          STACK_WIND (frame, marker_lookup_cbk, FIRST_CHILD(this),                      FIRST_CHILD(this)->fops->lookup, loc, xattr_req); @@ -2854,7 +2859,7 @@ marker_readdirp (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,                          loc.parent = local->loc.inode = inode_ref (fd->inode); -                        mq_req_xattr (this, &loc, dict); +                        mq_req_xattr (this, &loc, dict, NULL);                  }                  STACK_WIND (frame, marker_readdirp_cbk,  | 
