diff options
Diffstat (limited to 'xlators/storage')
| -rw-r--r-- | xlators/storage/bdb/src/bctx.c | 31 | ||||
| -rw-r--r-- | xlators/storage/bdb/src/bdb-ll.c | 1295 | ||||
| -rw-r--r-- | xlators/storage/bdb/src/bdb.c | 2108 | ||||
| -rw-r--r-- | xlators/storage/bdb/src/bdb.h | 108 | 
4 files changed, 2019 insertions, 1523 deletions
diff --git a/xlators/storage/bdb/src/bctx.c b/xlators/storage/bdb/src/bctx.c index fce78e95f..18f563fb3 100644 --- a/xlators/storage/bdb/src/bctx.c +++ b/xlators/storage/bdb/src/bctx.c @@ -75,16 +75,31 @@ bctx_table_prune (bctx_table_t *table)          list_for_each_entry_safe (del, tmp, &purge, list) {                  list_del_init (&del->list); -                if (del->dbp) { -                        ret = del->dbp->close (del->dbp, 0); +                if (del->primary) { +                        ret = del->primary->close (del->primary, 0);                          if (ret != 0) { -                                gf_log (table->this->name, GF_LOG_ERROR, -                                        "failed to close db on path (%s): %s", +                                gf_log (table->this->name, GF_LOG_DEBUG, +                                        "_BCTX_TABLE_PRUNE %s: %s " +                                        "(failed to close primary database)",                                          del->directory, db_strerror (ret));                          } else { -                                gf_log (table->this->name, GF_LOG_WARNING, -                                        "close db for path %s; " -                                        "table->lru_count = %d", +                                gf_log (table->this->name, GF_LOG_DEBUG, +                                        "_BCTX_TABLE_PRUNE %s (lru=%d)" +                                        "(closed primary database)", +                                        del->directory, table->lru_size); +                        } +                } +                if (del->secondary) { +                        ret = del->secondary->close (del->secondary, 0); +                        if (ret != 0) { +                                gf_log (table->this->name, GF_LOG_DEBUG, +                                        "_BCTX_TABLE_PRUNE %s: %s " +                                        "(failed to close secondary database)", +                                        del->directory, db_strerror (ret)); +                        } else { +                                gf_log (table->this->name, GF_LOG_DEBUG, +                                        "_BCTX_TABLE_PRUNE %s (lru=%d)" +                                        "(closed secondary database)",                                          del->directory, table->lru_size);                          }                  } @@ -130,7 +145,7 @@ __hash_bctx (bctx_t *bctx)  static inline bctx_t *  __bctx_passivate (bctx_t *bctx)  { -        if (bctx->dbp) { +        if (bctx->primary) {                  list_move_tail (&bctx->list, &(bctx->table->b_lru));                  bctx->table->lru_size++;          } else { diff --git a/xlators/storage/bdb/src/bdb-ll.c b/xlators/storage/bdb/src/bdb-ll.c index cd2d1ac49..59d431d82 100644 --- a/xlators/storage/bdb/src/bdb-ll.c +++ b/xlators/storage/bdb/src/bdb-ll.c @@ -20,6 +20,7 @@  #include <libgen.h>  #include "bdb.h"  #include <list.h> +#include "hashfn.h"  /*   * implement the procedures to interact with bdb */ @@ -31,22 +32,41 @@  ino_t  bdb_inode_transform (ino_t parent, -                     bctx_t *bctx) +                     const char *name, +                     size_t namelen)  { -        struct bdb_private *private = NULL;          ino_t               ino = -1; +        uint64_t            hash = 0; -        GF_VALIDATE_OR_GOTO ("bdb-ll", bctx, out); +        hash = gf_dm_hashfn (name, namelen); -        private = bctx->table->this->private; +        ino = (((parent << 32) | 0x00000000ffffffff) +               & (hash | 0xffffffff00000000)); -        LOCK (&private->ino_lock); -        ino = ++private->next_ino; -        UNLOCK (&private->ino_lock); -out:          return ino;  } +static int +bdb_generate_secondary_hash (DB *secondary, +                             const DBT *pkey, +                             const DBT *data, +                             DBT *skey) +{ +        char *primary = NULL; +        uint32_t *hash = NULL; + +        primary = pkey->data; + +        hash = calloc (1, sizeof (uint32_t)); + +        *hash = gf_dm_hashfn (primary, pkey->size); + +        skey->data = hash; +        skey->size = sizeof (hash); +        skey->flags = DB_DBT_APPMALLOC; + +        return 0; +}  /***********************************************************   * @@ -63,13 +83,13 @@ out:   *      if (no-empty-slots), then prune open dbs and close as many as possible   *      if (empty-slot-available), tika muchkonDu db open maaDu   * - * NOTE: illi baro munche lock hiDkobEku   */ -static DB * +static int  bdb_db_open (bctx_t *bctx)  { -        DB *storage_dbp = NULL; -        int32_t op_ret = -1; +        DB *primary   = NULL; +        DB *secondary = NULL; +        int32_t ret = -1;          bctx_table_t *table = NULL;          GF_VALIDATE_OR_GOTO ("bdb-ll", bctx, out); @@ -78,51 +98,94 @@ bdb_db_open (bctx_t *bctx)          GF_VALIDATE_OR_GOTO ("bdb-ll", table, out);          /* we have to do the following, we can't deny someone of db_open ;) */ -        op_ret = db_create (&storage_dbp, table->dbenv, 0); -        if (op_ret != 0) { -                gf_log ("bdb-ll", GF_LOG_ERROR, -                        "failed to do db_create for directory %s (%s)", -                        bctx->directory, db_strerror (op_ret)); -                storage_dbp = NULL; +        ret = db_create (&primary, table->dbenv, 0); +        if (ret < 0) { +                gf_log ("bdb-ll", GF_LOG_DEBUG, +                        "_BDB_DB_OPEN %s: %s (failed to create database object" +                        " for primary database)", +                        bctx->directory, db_strerror (ret)); +                ret = -ENOMEM;                  goto out;          }          if (table->page_size) { -                op_ret = storage_dbp->set_pagesize (storage_dbp, -                                                    table->page_size); -                if (op_ret != 0) { -                        gf_log ("bdb-ll", GF_LOG_ERROR, -                                "failed to set the page_size (%"PRIu64") for " -                                "directory %s (%s)", -                                table->page_size, bctx->directory, -                                db_strerror (op_ret)); -                } else { +                ret = primary->set_pagesize (primary, +                                             table->page_size); +                if (ret < 0) {                          gf_log ("bdb-ll", GF_LOG_DEBUG, -                                "page-size (%"PRIu64") set on DB", +                                "_BDB_DB_OPEN %s: %s (failed to set page-size " +                                "to %"PRIu64")", +                                bctx->directory, db_strerror (ret),                                  table->page_size); +                } else { +                        gf_log ("bdb-ll", GF_LOG_DEBUG, +                                "_BDB_DB_OPEN %s: page-size set to %"PRIu64, +                                bctx->directory, table->page_size);                  }          } -        op_ret = storage_dbp->open (storage_dbp, -                                    NULL, -                                    bctx->db_path, -                                    NULL, -                                    table->access_mode, -                                    table->dbflags, -                                    0); -        if (op_ret != 0 ) { -                gf_log ("bdb-ll", -                        GF_LOG_ERROR, -                        "failed to open storage-db for directory %s (%s)", -                        bctx->db_path, db_strerror (op_ret)); -                storage_dbp = NULL; +        ret = primary->open (primary, NULL, bctx->db_path, "primary", +                             table->access_mode, table->dbflags, 0); +        if (ret < 0) { +                gf_log ("bdb-ll", GF_LOG_ERROR, +                        "_BDB_DB_OPEN %s: %s " +                        "(failed to open primary database)", +                        bctx->directory, db_strerror (ret)); +                ret = -1; +                goto cleanup; +        } + +        ret = db_create (&secondary, table->dbenv, 0); +        if (ret < 0) { +                gf_log ("bdb-ll", GF_LOG_DEBUG, +                        "_BDB_DB_OPEN %s: %s (failed to create database object" +                        " for secondary database)", +                        bctx->directory, db_strerror (ret)); +                ret = -ENOMEM; +                goto cleanup; +        } + +        ret = secondary->open (secondary, NULL, bctx->db_path, "secondary", +                               table->access_mode, table->dbflags, 0); +        if (ret != 0 ) { +                gf_log ("bdb-ll", GF_LOG_ERROR, +                        "_BDB_DB_OPEN %s: %s " +                        "(failed to open secondary database)", +                        bctx->directory, db_strerror (ret)); +                ret = -1; +                goto cleanup; +        } + +        ret = primary->associate (primary, NULL, secondary, +                                  bdb_generate_secondary_hash, +#ifdef DB_IMMUTABLE_KEY +                                  DB_IMMUTABLE_KEY); +#else +                                  0); +#endif +        if (ret != 0 ) { +                gf_log ("bdb-ll", GF_LOG_ERROR, +                        "_BDB_DB_OPEN %s: %s " +                        "(failed to associate primary database with " +                        "secondary database)", +                        bctx->directory, db_strerror (ret)); +                ret = -1; +                goto cleanup;          }  out: -        return storage_dbp; -} +        bctx->primary = primary; +        bctx->secondary = secondary; +        return ret; +cleanup: +        if (primary) +                primary->close (primary, 0); +        if (secondary) +                secondary->close (secondary, 0); +        return ret; +}  int32_t  bdb_cursor_close (bctx_t *bctx, @@ -140,10 +203,10 @@ bdb_cursor_close (bctx_t *bctx,  #else                  ret = cursorp->c_close (cursorp);  #endif -                if ((ret != 0)) { -                        gf_log ("bdb-ll", GF_LOG_ERROR, -                                "failed to close db cursor for directory " -                                "%s (%s)", +                if (ret < 0) { +                        gf_log ("bdb-ll", GF_LOG_DEBUG, +                                "_BDB_CURSOR_CLOSE %s: %s " +                                "(failed to close database cursor)",                                  bctx->directory, db_strerror (ret));                  }          } @@ -165,27 +228,30 @@ bdb_cursor_open (bctx_t *bctx,          LOCK (&bctx->lock);          { -                if (bctx->dbp) { +                if (bctx->secondary) {                          /* do nothing, just continue */                          ret = 0;                  } else { -                        bctx->dbp = bdb_db_open (bctx); -                        if (!bctx->dbp) { -                                gf_log ("bdb-ll", GF_LOG_ERROR, -                                        "failed to open storage db for %s", +                        ret = bdb_db_open (bctx); +                        if (ret < 0) { +                                gf_log ("bdb-ll", GF_LOG_DEBUG, +                                        "_BDB_CURSOR_OPEN %s: ENOMEM " +                                        "(failed to open secondary database)",                                          bctx->directory); -                                ret = -1; +                                ret = -ENOMEM;                          } else {                                  ret = 0;                          }                  }                  if (ret == 0) { -                        /* all set, lets open cursor */ -                        ret = bctx->dbp->cursor (bctx->dbp, NULL, cursorpp, 0); -                        if (ret != 0) { -                                gf_log ("bdb-ll", GF_LOG_ERROR, -                                        "failed to create a cursor for %s (%s)", +                        /* all set, open cursor */ +                        ret = bctx->secondary->cursor (bctx->secondary, +                                                       NULL, cursorpp, 0); +                        if (ret < 0) { +                                gf_log ("bdb-ll", GF_LOG_DEBUG, +                                        "_BDB_CURSOR_OPEN %s: %s " +                                        "(failed to open a cursor to database)",                                          bctx->directory, db_strerror (ret));                          }                  } @@ -245,27 +311,37 @@ bdb_cache_insert (bctx_t *bctx,                          /* FIXME: ugly, not supposed to disect any of the                           * 'struct list_head' directly */                          if (!list_empty (&bctx->c_list)) { -                                bcache = list_entry (bctx->c_list.prev, bdb_cache_t, c_list); +                                bcache = list_entry (bctx->c_list.prev, +                                                     bdb_cache_t, c_list);                                  list_del_init (&bcache->c_list);                          }                          if (bcache->key) {                                  free (bcache->key); -                                bcache->key = strdup ((char *)key->data); -                                GF_VALIDATE_OR_GOTO ("bdb-ll", bcache->key, unlock); +                                bcache->key = calloc (key->size + 1, +                                                      sizeof (char)); +                                GF_VALIDATE_OR_GOTO ("bdb-ll", +                                                     bcache->key, unlock); +                                memcpy (bcache->key, (char *)key->data, +                                        key->size);                          } else {                                  /* should never come here */ -                                gf_log ("bdb-ll", GF_LOG_CRITICAL, -                                        "bcache->key (null)"); +                                gf_log ("bdb-ll", GF_LOG_DEBUG, +                                        "_BDB_CACHE_INSERT %s (%s) " +                                        "(found a cache entry with empty key)", +                                        bctx->directory, (char *)key->data);                          } /* if(bcache->key)...else */                          if (bcache->data) {                                  free (bcache->data);                                  bcache->data = memdup (data->data, data->size); -                                GF_VALIDATE_OR_GOTO ("bdb-ll", bcache->data, unlock); +                                GF_VALIDATE_OR_GOTO ("bdb-ll", bcache->data, +                                                     unlock);                                  bcache->size = data->size;                          } else {                                  /* should never come here */                                  gf_log ("bdb-ll", GF_LOG_CRITICAL, -                                        "bcache->data (null)"); +                                        "_BDB_CACHE_INSERT %s (%s) " +                                        "(found a cache entry with no data)", +                                        bctx->directory, (char *)key->data);                          } /* if(bcache->data)...else */                          list_add (&bcache->c_list, &bctx->c_list);                          ret = 0; @@ -273,10 +349,14 @@ bdb_cache_insert (bctx_t *bctx,                          /* we will be entering here very rarely */                          bcache = CALLOC (1, sizeof (*bcache));                          GF_VALIDATE_OR_GOTO ("bdb-ll", bcache, unlock); -                        bcache->key = strdup ((char *)(key->data)); + +                        bcache->key = calloc (key->size + 1, sizeof (char));                          GF_VALIDATE_OR_GOTO ("bdb-ll", bcache->key, unlock); +                        memcpy (bcache->key, key->data, key->size); +                          bcache->data = memdup (data->data, data->size);                          GF_VALIDATE_OR_GOTO ("bdb-ll", bcache->data, unlock); +                          bcache->size = data->size;                          list_add (&bcache->c_list, &bctx->c_list);                          bctx->c_count++; @@ -291,7 +371,7 @@ out:  static int32_t  bdb_cache_delete (bctx_t *bctx, -                  char *key) +                  const char *key)  {          bdb_cache_t *bcache = NULL;          bdb_cache_t *trav   = NULL; @@ -333,12 +413,12 @@ bdb_db_stat (bctx_t *bctx,          LOCK (&bctx->lock);          { -                if (bctx->dbp == NULL) { -                        bctx->dbp = bdb_db_open (bctx); -                        storage = bctx->dbp; +                if (bctx->primary == NULL) { +                        ret = bdb_db_open (bctx); +                        storage = bctx->primary;                  } else {                          /* we are just fine, lets continue */ -                        storage = bctx->dbp; +                        storage = bctx->primary;                  } /* if(bctx->dbp==NULL)...else */          }          UNLOCK (&bctx->lock); @@ -347,46 +427,48 @@ bdb_db_stat (bctx_t *bctx,          ret = storage->stat (storage, txnid, &stat, flags); -        if (ret != 0) { -                gf_log ("bdb-ll", GF_LOG_ERROR, -                        "failed to do DB->stat() on db file %s: %s", -                        bctx->db_path, db_strerror (ret)); -        } else { +        if (ret < 0) {                  gf_log ("bdb-ll", GF_LOG_DEBUG, -                        "successfully called DB->stat() on db file %s", -                        bctx->db_path); +                        "_BDB_DB_STAT %s: %s " +                        "(failed to do stat database)", +                        bctx->directory, db_strerror (ret));          }  out:          return stat;  } -/* bdb_storage_get - retrieve a key/value pair corresponding to @path from the corresponding - *                   db file. +/* bdb_storage_get - retrieve a key/value pair corresponding to @path from the + *  corresponding db file.   * - * @bctx: bctx_t * corresponding to the parent directory of @path. (should always be a valid - *        bctx).  bdb_storage_get should never be called if @bctx = NULL. - * @txnid: NULL if bdb_storage_get is not embedded in an explicit transaction or a valid - *         DB_TXN *, when embedded in an explicit transaction. - * @path: path of the file to read from (translated to a database key using MAKE_KEY_FROM_PATH) - * @buf: char ** - pointer to a pointer to char. a read buffer is created in this procedure - *       and pointer to the buffer is passed through @buf to the caller. + * @bctx: bctx_t * corresponding to the parent directory of @path. (should + *  always be a valid bctx).  bdb_storage_get should never be called if + *  @bctx = NULL. + * @txnid: NULL if bdb_storage_get is not embedded in an explicit transaction + *  or a valid DB_TXN *, when embedded in an explicit transaction. + * @path: path of the file to read from (translated to a database key using + *  MAKE_KEY_FROM_PATH) + * @buf: char ** - pointer to a pointer to char. a read buffer is created in + *  this procedure and pointer to the buffer is passed through @buf to the + *  caller.   * @size: size of the file content to be read.   * @offset: offset from which the file content to be read.   * - * NOTE: bdb_storage_get tries to open DB, if @bctx->dbp == NULL (@bctx->dbp == NULL, - *      nobody has opened DB till now or DB was closed by bdb_table_prune()). + * NOTE: bdb_storage_get tries to open DB, if @bctx->dbp == NULL + *  (@bctx->dbp == NULL, nobody has opened DB till now or DB was closed by + *  bdb_table_prune()).   * - * NOTE: if private->cache is set (bdb xlator's internal caching enabled), then bdb_storage_get - *      first looks up the cache for key/value pair. if bdb_lookup_cache fails, then only - *      DB->get() is called. also,  inserts a newly read key/value pair to cache through - *      bdb_insert_to_cache. + * NOTE: if private->cache is set (bdb xlator's internal caching enabled), then + *  bdb_storage_get first looks up the cache for key/value pair. if + *  bdb_lookup_cache fails, then only DB->get() is called. also,  inserts a + *  newly read key/value pair to cache through bdb_insert_to_cache.   *   * return: 'number of bytes read' on success or -1 on error.   * - * also see: bdb_lookup_cache, bdb_insert_to_cache for details about bdb xlator's internal cache. + * also see: bdb_lookup_cache, bdb_insert_to_cache for details about bdb + *  xlator's internal cache.   */ -int32_t +static int32_t  bdb_db_get (bctx_t *bctx,              DB_TXN *txnid,              const char *path, @@ -420,12 +502,12 @@ bdb_db_get (bctx_t *bctx,          } else {                  LOCK (&bctx->lock);                  { -                        if (bctx->dbp == NULL) { -                                bctx->dbp = bdb_db_open (bctx); -                                storage = bctx->dbp; +                        if (bctx->primary == NULL) { +                                ret = bdb_db_open (bctx); +                                storage = bctx->primary;                          } else {                                  /* we are just fine, lets continue */ -                                storage = bctx->dbp; +                                storage = bctx->primary;                          } /* if(bctx->dbp==NULL)...else */                  }                  UNLOCK (&bctx->lock); @@ -457,22 +539,25 @@ bdb_db_get (bctx_t *bctx,                          if (ret == DB_NOTFOUND) {                                  gf_log ("bdb-ll", GF_LOG_DEBUG, -                                        "failed to do DB->get() for key: %s." -                                        " key not found in storage DB", -                                        key_string); +                                        "_BDB_DB_GET %s - %s: ENOENT" +                                        "(specified key not found in database)", +                                        bctx->directory, key_string);                                  ret = -1;                                  need_break = 1;                          } else if (ret == DB_LOCK_DEADLOCK) {                                  retries++; -                                gf_log ("bdb-ll", GF_LOG_ERROR, -                                        "deadlock detected in DB->put. retrying" -                                        " DB->put (%d)", retries); -                        }else if (ret == 0) { +                                gf_log ("bdb-ll", GF_LOG_DEBUG, +                                        "_BDB_DB_GET %s - %s" +                                        "(deadlock detected, retrying for %d " +                                        "time)", +                                        bctx->directory, key_string, retries); +                        } else if (ret == 0) {                                  /* successfully read data, lets set everything                                   * in place and return */                                  if (buf) {                                          *buf = CALLOC (1, value.size); -                                        ERR_ABORT (*buf); +                                        GF_VALIDATE_OR_GOTO ("bdb-ll", +                                                             *buf, out);                                          memcpy (*buf, value.data, value.size);                                  }                                  ret = value.size; @@ -481,10 +566,12 @@ bdb_db_get (bctx_t *bctx,                                  free (value.data);                                  need_break = 1;                          } else { -                                gf_log ("bdb-ll", -                                        GF_LOG_ERROR, -                                        "failed to do DB->get() for key %s: %s", -                                        key_string, db_strerror (ret)); +                                gf_log ("bdb-ll", GF_LOG_DEBUG, +                                        "_BDB_DB_GET %s - %s: %s" +                                        "(failed to retrieve specified key from" +                                        " database)", +                                        bctx->directory, key_string, +                                        db_strerror (ret));                                  ret = -1;                                  need_break = 1;                          } @@ -494,6 +581,19 @@ out:          return ret;  }/* bdb_db_get */ +/* TODO: handle errors here and log. propogate only the errno to caller */ +int32_t +bdb_db_fread (struct bdb_fd *bfd, char **buf, size_t size, off_t offset) +{ +        return bdb_db_get (bfd->ctx, NULL, bfd->key, buf, size, offset); +} + +int32_t +bdb_db_iread (struct bdb_ctx *bctx, const char *key, char **buf) +{ +        return bdb_db_get (bctx, NULL, key, buf, 0, 0); +} +  /* bdb_storage_put - insert a key/value specified to the corresponding DB.   *   * @bctx: bctx_t * corresponding to the parent directory of @path. @@ -519,7 +619,7 @@ out:   * also see: bdb_cache_delete for details on how a cached key/value pair is   * removed.   */ -int32_t +static int32_t  bdb_db_put (bctx_t *bctx,              DB_TXN *txnid,              const char *key_string, @@ -537,12 +637,12 @@ bdb_db_put (bctx_t *bctx,          LOCK (&bctx->lock);          { -                if (bctx->dbp == NULL) { -                        bctx->dbp = bdb_db_open (bctx); -                        storage = bctx->dbp; +                if (bctx->primary == NULL) { +                        ret = bdb_db_open (bctx); +                        storage = bctx->primary;                  } else {                          /* we are just fine, lets continue */ -                        storage = bctx->dbp; +                        storage = bctx->primary;                  }          }          UNLOCK (&bctx->lock); @@ -582,15 +682,16 @@ bdb_db_put (bctx_t *bctx,                  ret = storage->put (storage, txnid, &key, &value, db_flags);                  if (ret == DB_LOCK_DEADLOCK) {                          retries++; -                        gf_log ("bdb-ll", GF_LOG_ERROR, -                                "deadlock detected in DB->put. " -                                "retrying DB->put (%d)", -                                retries); +                        gf_log ("bdb-ll", GF_LOG_DEBUG, +                                "_BDB_DB_PUT %s - %s" +                                "(deadlock detected, retying for %d time)", +                                bctx->directory, key_string, retries);                  } else if (ret) {                          /* write failed */ -                        gf_log ("bdb-ll", GF_LOG_ERROR, -                                "failed to do DB->put() for key %s: %s", -                                key_string, db_strerror (ret)); +                        gf_log ("bdb-ll", GF_LOG_DEBUG, +                                "_BDB_DB_PUT %s - %s: %s" +                                "(failed to put specified entry into database)", +                                bctx->directory, key_string, db_strerror (ret));                          need_break = 1;                  } else {                          /* successfully wrote */ @@ -602,44 +703,68 @@ out:          return ret;  }/* bdb_db_put */ +int32_t +bdb_db_icreate (struct bdb_ctx *bctx, const char *key) +{ +        return bdb_db_put (bctx, NULL, key, NULL, 0, 0, 0); +} + +/* TODO: handle errors here and log. propogate only the errno to caller */ +int32_t +bdb_db_fwrite (struct bdb_fd *bfd, char *buf, size_t size, off_t offset) +{ +        return bdb_db_put (bfd->ctx, NULL, bfd->key, buf, size, offset, 0); +} + +/* TODO: handle errors here and log. propogate only the errno to caller */ +int32_t +bdb_db_iwrite (struct bdb_ctx *bctx, const char *key, char *buf, size_t size) +{ +        return bdb_db_put (bctx, NULL, key, buf, size, 0, 0); +} + +int32_t +bdb_db_itruncate (struct bdb_ctx *bctx, const char *key) +{ +        return bdb_db_put (bctx, NULL, key, NULL, 0, 1, 0); +} -/* bdb_storage_del - delete a key/value pair corresponding to @path from corresponding db file. +/* bdb_storage_del - delete a key/value pair corresponding to @path from + *  corresponding db file.   *   * @bctx: bctx_t * corresponding to the parent directory of @path.   *       (should always be a valid bctx). bdb_storage_del should never be called   *       if @bctx = NULL. - * @txnid: NULL if bdb_storage_del is not embedded in an explicit transaction or a - *         valid DB_TXN *, when embedded in an explicit transaction. + * @txnid: NULL if bdb_storage_del is not embedded in an explicit transaction + *   or a valid DB_TXN *, when embedded in an explicit transaction.   * @path: path to the file, whose key/value pair has to be deleted.   * - * NOTE: bdb_storage_del tries to open DB, if @bctx->dbp == NULL (@bctx->dbp == NULL, - *      nobody has opened DB till now or DB was closed by bdb_table_prune()). + * NOTE: bdb_storage_del tries to open DB, if @bctx->dbp == NULL + *  (@bctx->dbp == NULL, nobody has opened DB till now or DB was closed by + *  bdb_table_prune()).   *   * return: 0 on success or -1 on error.   */ -int32_t +static int32_t  bdb_db_del (bctx_t *bctx,              DB_TXN *txnid, -            const char *path) +            const char *key_string)  {          DB     *storage    = NULL;          DBT     key        = {0,}; -        char   *key_string = NULL;          int32_t ret        = -1;          int32_t db_flags   = 0;          uint8_t need_break = 0;          int32_t retries    = 1; -        MAKE_KEY_FROM_PATH (key_string, path); -          LOCK (&bctx->lock);          { -                if (bctx->dbp == NULL) { -                        bctx->dbp = bdb_db_open (bctx); -                        storage = bctx->dbp; +                if (bctx->primary == NULL) { +                        ret = bdb_db_open (bctx); +                        storage = bctx->primary;                  } else {                          /* we are just fine, lets continue */ -                        storage = bctx->dbp; +                        storage = bctx->primary;                  }          }          UNLOCK (&bctx->lock); @@ -649,7 +774,7 @@ bdb_db_del (bctx_t *bctx,          ret = bdb_cache_delete (bctx, key_string);          GF_VALIDATE_OR_GOTO ("bdb-ll", (ret == 0), out); -        key.data = key_string; +        key.data = (char *)key_string;          key.size = strlen (key_string);          key.flags = DB_DBT_USERMEM; @@ -658,26 +783,30 @@ bdb_db_del (bctx_t *bctx,                  if (ret == DB_NOTFOUND) {                          gf_log ("bdb-ll", GF_LOG_DEBUG, -                                "failed to delete %s from storage db, " -                                "doesn't exist in storage DB", -                                path); +                                "_BDB_DB_DEL %s - %s: ENOENT" +                                "(failed to delete entry, could not be " +                                "found in the database)", +                                bctx->directory, key_string);                          need_break = 1;                  } else if (ret == DB_LOCK_DEADLOCK) {                          retries++; -                        gf_log ("bdb-ll", GF_LOG_ERROR, -                                "deadlock detected in DB->put. " -                                "retrying DB->put (%d)", -                                retries); -                }else if (ret == 0) { +                        gf_log ("bdb-ll", GF_LOG_DEBUG, +                                "_BDB_DB_DEL %s - %s" +                                "(deadlock detected, retying for %d time)", +                                bctx->directory, key_string, retries); +                } else if (ret == 0) {                          /* successfully deleted the entry */                          gf_log ("bdb-ll", GF_LOG_DEBUG, -                                "deleted %s from storage db", path); +                                "_BDB_DB_DEL %s - %s" +                                "(successfully deleted entry from database)", +                                bctx->directory, key_string);                          ret = 0;                          need_break = 1;                  } else { -                        gf_log ("bdb-ll", GF_LOG_ERROR, -                                "failed to delete %s from storage db: %s", -                                path, db_strerror (ret)); +                        gf_log ("bdb-ll", GF_LOG_DEBUG, +                                "_BDB_DB_DEL %s - %s: %s" +                                "(failed to delete entry from database)", +                                bctx->directory, key_string, db_strerror (ret));                          ret = -1;                          need_break = 1;                  } @@ -686,11 +815,18 @@ out:          return ret;  } +int32_t +bdb_db_iremove (bctx_t *bctx, +                const char *key) +{ +        return bdb_db_del (bctx, NULL, key); +} +  /* NOTE: bdb version compatibility wrapper */  int32_t  bdb_cursor_get (DBC *cursorp, -                DBT *key, -                DBT *value, +                DBT *sec, DBT *pri, +                DBT *val,                  int32_t flags)  {          int32_t ret = -1; @@ -698,21 +834,21 @@ bdb_cursor_get (DBC *cursorp,          GF_VALIDATE_OR_GOTO ("bdb-ll", cursorp, out);  #ifdef HAVE_BDB_CURSOR_GET -        ret = cursorp->get (cursorp, key, value, flags); +        ret = cursorp->pget (cursorp, sec, pri, val, flags);  #else -        ret = cursorp->c_get (cursorp, key, value, flags); +        ret = cursorp->c_pget (cursorp, sec, pri, val, flags);  #endif          if ((ret != 0)  && (ret != DB_NOTFOUND)) { -                gf_log ("bdb-ll", GF_LOG_ERROR, -                        "failed to CURSOR->get() for key %s (%s)", -                        (char *)key->data, db_strerror (ret)); +                gf_log ("bdb-ll", GF_LOG_DEBUG, +                        "_BDB_CURSOR_GET: %s" +                        "(failed to retrieve entry from database cursor)", +                        db_strerror (ret));          }  out:          return ret;  }/* bdb_cursor_get */ -  int32_t  bdb_dirent_size (DBT *key)  { @@ -720,29 +856,6 @@ bdb_dirent_size (DBT *key)  } -/* bdb_extract_bfd - translate a fd_t to a bfd (either a 'struct bdb_bfd' or 'struct bdb_dir') - * - * @fd->ctx is with bdb specific file handle during a successful bdb_open (also bdb_create) - *  or bdb_opendir. - * - * return: 'struct bdb_bfd *' or 'struct bdb_dir *' on success, or NULL on failure. - */ -inline void * -bdb_extract_bfd (fd_t *fd, -                 xlator_t *this) -{ -        uint64_t tmp_bfd = 0; -        void    *bfd     = NULL; - -        GF_VALIDATE_OR_GOTO ("bdb-ll", fd, out); -        GF_VALIDATE_OR_GOTO ("bdb-ll", this, out); - -        fd_ctx_get (fd, this, &tmp_bfd); -        bfd = (void *)(long)bfd; - -out: -        return bfd; -}  /* bdb_dbenv_init - initialize DB_ENV   * @@ -751,10 +864,10 @@ out:   *      NOTE: see private->envflags for flags used.   *   2. DB_ENV->set_lg_dir - set log directory to be used for storing log files   *     (log files are the files in which transaction logs are written by db). - *   3. DB_ENV->set_flags (DB_LOG_AUTOREMOVE) - set DB_ENV to automatically clear - *      the unwanted log files (flushed at each checkpoint). - *   4. DB_ENV->set_errfile - set errfile to be used by db to report detailed error logs. - *     used only for debbuging purpose. + *   3. DB_ENV->set_flags (DB_LOG_AUTOREMOVE) - set DB_ENV to automatically + *      clear the unwanted log files (flushed at each checkpoint). + *   4. DB_ENV->set_errfile - set errfile to be used by db to report detailed + *      error logs. used only for debbuging purpose.   *   * return: returns a valid DB_ENV * on success or NULL on error.   * @@ -769,55 +882,49 @@ bdb_dbenv_init (xlator_t *this,          bdb_private_t *private     = NULL;          int32_t        fatal_flags = 0; -        VALIDATE_OR_GOTO (this, out); -        VALIDATE_OR_GOTO (directory, out); +        VALIDATE_OR_GOTO (this, err); +        VALIDATE_OR_GOTO (directory, err);          private = this->private; -        VALIDATE_OR_GOTO (private, out); +        VALIDATE_OR_GOTO (private, err);          ret = db_env_create (&dbenv, 0); -        VALIDATE_OR_GOTO ((ret == 0), out); +        VALIDATE_OR_GOTO ((ret == 0), err);          /* NOTE: set_errpfx returns 'void' */          dbenv->set_errpfx(dbenv, this->name);          ret = dbenv->set_lk_detect (dbenv, DB_LOCK_DEFAULT); -        VALIDATE_OR_GOTO ((ret == 0), out); +        VALIDATE_OR_GOTO ((ret == 0), err);          ret = dbenv->open(dbenv, directory,                            private->envflags,                            S_IRUSR | S_IWUSR);          if ((ret != 0) && (ret != DB_RUNRECOVERY)) {                  gf_log (this->name, GF_LOG_CRITICAL, -                        "failed to open DB environment (%s)", -                        db_strerror (ret)); +                        "failed to join Berkeley DB environment at %s: %s." +                        "please run manual recovery and retry running " +                        "glusterfs", +                        directory, db_strerror (ret));                  dbenv = NULL; -                goto out; +                goto err;          } else if (ret == DB_RUNRECOVERY) {                  fatal_flags = ((private->envflags & (~DB_RECOVER))                                 | DB_RECOVER_FATAL);                  ret = dbenv->open(dbenv, directory, fatal_flags,                                    S_IRUSR | S_IWUSR);                  if (ret != 0) { -                        gf_log (this->name, GF_LOG_ERROR, -                                "failed to open DB environment (%s) with " -                                "DB_REOVER_FATAL", -                                db_strerror (ret)); +                        gf_log (this->name, GF_LOG_CRITICAL, +                                "failed to join Berkeley DB environment in " +                                "recovery mode at %s: %s. please run manual " +                                "recovery and retry running glusterfs", +                                directory, db_strerror (ret));                          dbenv = NULL; -                        goto out; -                } else { -                        gf_log (this->name, GF_LOG_WARNING, -                                "opened DB environment after DB_RECOVER_FATAL:" -                                " %s", db_strerror (ret)); +                        goto err;                  } -        } else { -                gf_log (this->name, GF_LOG_DEBUG, -                        "DB environment successfull opened: %s", -                        db_strerror (ret));          } - - +        ret = 0;  #if (DB_VERSION_MAJOR == 4 &&                   \       DB_VERSION_MINOR == 7)          if (private->log_auto_remove) { @@ -832,41 +939,42 @@ bdb_dbenv_init (xlator_t *this,                  ret = dbenv->set_flags (dbenv, DB_LOG_AUTOREMOVE, 0);          }  #endif -        if (ret != 0) { -                gf_log ("bctx", GF_LOG_ERROR, -                        "failed to set DB_LOG_AUTOREMOVE on dbenv: %s", +        if (ret < 0) { +                gf_log ("bdb-ll", GF_LOG_ERROR, +                        "autoremoval of transactional log files could not be " +                        "configured (%s). you may have to do a manual " +                        "monitoring of transactional log files and remove " +                        "periodically.",                          db_strerror (ret)); -        } else { -                gf_log ("bctx", GF_LOG_DEBUG, -                        "DB_LOG_AUTOREMOVE set on dbenv"); +                goto err;          }          if (private->transaction) {                  ret = dbenv->set_flags(dbenv, DB_AUTO_COMMIT, 1);                  if (ret != 0) { -                        gf_log ("bctx", GF_LOG_ERROR, -                                "failed to set DB_AUTO_COMMIT on dbenv: %s", +                        gf_log ("bdb-ll", GF_LOG_DEBUG, +                                "configuration of auto-commit failed for " +                                "database environment at %s. none of the " +                                "operations will be embedded in transaction " +                                "unless explicitly done so.",                                  db_strerror (ret)); -                } else { -                        gf_log ("bctx", GF_LOG_DEBUG, -                                "DB_AUTO_COMMIT set on dbenv"); +                        goto err;                  }                  if (private->txn_timeout) { -                        ret = dbenv->set_timeout (dbenv, -                                                  private->txn_timeout, +                        ret = dbenv->set_timeout (dbenv, private->txn_timeout,                                                    DB_SET_TXN_TIMEOUT);                          if (ret != 0) { -                                gf_log ("bctx", GF_LOG_ERROR, -                                        "failed to set TXN_TIMEOUT to %d " -                                        "milliseconds on dbenv: %s", +                                gf_log ("bdb-ll", GF_LOG_ERROR, +                                        "could not configure Berkeley DB " +                                        "transaction timeout to %d (%s). please" +                                        " review 'option transaction-timeout %d" +                                        "' option.",                                          private->txn_timeout, -                                        db_strerror (ret)); -                        } else { -                                gf_log ("bctx", GF_LOG_DEBUG, -                                        "TXN_TIMEOUT set to %d milliseconds", +                                        db_strerror (ret),                                          private->txn_timeout); +                                goto err;                          }                  } @@ -874,32 +982,28 @@ bdb_dbenv_init (xlator_t *this,                          ret = dbenv->set_timeout(dbenv,                                                   private->txn_timeout,                                                   DB_SET_LOCK_TIMEOUT); - -                        if (ret != 0) { -                                gf_log ("bctx", GF_LOG_ERROR, -                                        "failed to set LOCK_TIMEOUT to %d " -                                        "milliseconds on dbenv: %s", +                        if (ret < 0) { +                                gf_log ("bdb-ll", GF_LOG_ERROR, +                                        "could not configure Berkeley DB " +                                        "lock timeout to %d (%s). please" +                                        " review 'option lock-timeout %d" +                                        "' option.",                                          private->lock_timeout, -                                        db_strerror (ret)); -                        } else { -                                gf_log ("bctx", GF_LOG_DEBUG, -                                        "LOCK_TIMEOUT set to %d milliseconds", +                                        db_strerror (ret),                                          private->lock_timeout); +                                goto err;                          }                  }                  ret = dbenv->set_lg_dir (dbenv, private->logdir); - -                if (ret != 0) { -                        gf_log ("bctx", GF_LOG_ERROR, -                                "failed to set log directory for dbenv: %s", -                                db_strerror (ret)); -                } else { -                        gf_log ("bctx", GF_LOG_DEBUG, -                                "set dbenv log dir to %s", -                                private->logdir); +                if (ret < 0) { +                        gf_log ("bdb-ll", GF_LOG_ERROR, +                                "failed to configure libdb transaction log " +                                "directory at %s. please review the " +                                "'option logdir %s' option.", +                                db_strerror (ret), private->logdir); +                        goto err;                  } -          }          if (private->errfile) { @@ -907,41 +1011,52 @@ bdb_dbenv_init (xlator_t *this,                  if (private->errfp) {                          dbenv->set_errfile (dbenv, private->errfp);                  } else { -                        gf_log ("bctx", GF_LOG_ERROR, -                                "failed to open errfile: %s", -                                strerror (errno)); +                        gf_log ("bdb-ll", GF_LOG_ERROR, +                                "failed to open error logging file for " +                                "libdb (Berkeley DB) internal logging (%s)." +                                "please review the 'option errfile %s' option.", +                                strerror (errno), private->errfile); +                        goto err;                  }          } -out:          return dbenv; +err: +        if (dbenv) { +                dbenv->close (dbenv, 0); +        } + +        return NULL;  }  #define BDB_ENV(this) ((((struct bdb_private *)this->private)->b_table)->dbenv) -/* bdb_checkpoint - during transactional usage, db does not directly write the data to db - *                  files, instead db writes a 'log' (similar to a journal entry) into a - *                  log file. db normally clears the log files during opening of an - *                  environment. since we expect a filesystem server to run for a pretty - *                  long duration and flushing 'log's during dbenv->open would prove very - *                  costly, if we accumulate the log entries for one complete run of - *                  glusterfs server. to flush the logs frequently, db provides a mechanism - *                  called 'checkpointing'. when we do a checkpoint, db flushes the logs to - *                  disk (writes changes to db files) and we can also clear the accumulated - *                  log files after checkpointing. NOTE: removing unwanted log files is not - *                  part of dbenv->txn_checkpoint() call. +/* bdb_checkpoint - during transactional usage, db does not directly write the + *  data to db files, instead db writes a 'log' (similar to a journal entry) + *  into a log file. db normally clears the log files during opening of an + *  environment. since we expect a filesystem server to run for a pretty long + *  duration and flushing 'log's during dbenv->open would prove very costly, if + *  we accumulate the log entries for one complete run of glusterfs server. to + *  flush the logs frequently, db provides a mechanism called 'checkpointing'. + *  when we do a checkpoint, db flushes the logs to disk (writes changes to db + *  files) and we can also clear the accumulated log files after checkpointing. + *  NOTE: removing unwanted log files is not part of dbenv->txn_checkpoint() + *  call.   *   * @data: xlator_t of the current instance of bdb xlator.   * - *  bdb_checkpoint is called in a different thread from the main glusterfs thread. bdb - *  xlator creates the checkpoint thread after successfully opening the db environment. - *  NOTE: bdb_checkpoint thread shares the DB_ENV handle with the filesystem thread. + *  bdb_checkpoint is called in a different thread from the main glusterfs + *  thread. bdb xlator creates the checkpoint thread after successfully opening + *  the db environment. + *  NOTE: bdb_checkpoint thread shares the DB_ENV handle with the filesystem + *  thread.   *   *  db environment checkpointing frequency is controlled by   *  'option checkpoint-timeout <time-in-seconds>' in volfile.   * - * NOTE: checkpointing thread is started only if 'option transaction on' specified in - *      volfile. checkpointing is not valid for non-transactional environments. + * NOTE: checkpointing thread is started only if 'option transaction on' + *      specified in volfile. checkpointing is not valid for non-transactional + *      environments.   *   */  static void * @@ -965,23 +1080,29 @@ bdb_checkpoint (void *data)                  if (active) {                          ret = dbenv->txn_checkpoint (dbenv, 1024, 0, 0);                          if (ret) { -                                gf_log ("bctx", GF_LOG_ERROR, -                                        "failed to checkpoint environment: %s", +                                gf_log ("bdb-ll", GF_LOG_DEBUG, +                                        "_BDB_CHECKPOINT: %s" +                                        "(failed to checkpoint environment)",                                          db_strerror (ret));                          } else { -                                gf_log ("bctx", GF_LOG_DEBUG, -                                        "checkpointing successful"); +                                gf_log ("bdb-ll", GF_LOG_DEBUG, +                                        "_BDB_CHECKPOINT: successfully " +                                        "checkpointed");                          }                  } else {                          ret = dbenv->txn_checkpoint (dbenv, 1024, 0, 0);                          if (ret) { -                                gf_log ("bctx", GF_LOG_ERROR, -                                        "failed to do final checkpoint " -                                        "environment: %s", +                                gf_log ("bdb-ll", GF_LOG_ERROR, +                                        "_BDB_CHECKPOINT: %s" +                                        "(final checkpointing failed. might " +                                        "need to run recovery tool manually on " +                                        "next usage of this database " +                                        "environment)",                                          db_strerror (ret));                          } else { -                                gf_log ("bctx", GF_LOG_DEBUG, -                                        "final checkpointing successful"); +                                gf_log ("bdb-ll", GF_LOG_DEBUG, +                                        "_BDB_CHECKPOINT: final successfully " +                                        "checkpointed");                          }                          break;                  } @@ -990,449 +1111,321 @@ bdb_checkpoint (void *data)          return NULL;  } -static inline void -bdb_cache_init (xlator_t *this, -                dict_t *options, -                struct bdb_private *private) -{ -        /* cache is always on */ -        private->cache = ON; -} - -static inline void -bdb_log_remove_init (xlator_t *this, -                     dict_t *options, -                     struct bdb_private *private) -{ -        private->log_auto_remove = 1; -        gf_log (this->name, GF_LOG_DEBUG, -                "DB_ENV will use DB_LOG_AUTO_REMOVE"); -} -static inline void -bdb_errfile_init (xlator_t *this, -                  dict_t *options, -                  struct bdb_private *private) -{ -        int ret = -1; -        char *errfile = NULL; - -        ret = dict_get_str (options, "errfile", &errfile); -        if (ret == 0) { -                private->errfile = strdup (errfile); -                gf_log (this->name, GF_LOG_DEBUG, -                        "using errfile: %s", private->errfile); -        } -} - -static inline void -bdb_table_init (xlator_t *this, -                dict_t *options, -                struct bdb_private *private) +/* bdb_db_init - initialize bdb xlator + * + * reads the options from @options dictionary and sets appropriate values in + * @this->private. also initializes DB_ENV. + * + * return: 0 on success or -1 on error + * (with logging the error through gf_log()). + */ +int +bdb_db_init (xlator_t *this, +             dict_t *options)  { -        bctx_table_t *table = NULL; -        int32_t       idx   = 0; - -        int ret = -1; -        char *lru_limit_str = NULL; -        char *page_size_str = NULL; - -        table = CALLOC (1, sizeof (*table)); -        if (table) { -                INIT_LIST_HEAD(&(table->b_lru)); -                INIT_LIST_HEAD(&(table->active)); -                INIT_LIST_HEAD(&(table->purge)); - -                LOCK_INIT (&table->lock); -                LOCK_INIT (&table->checkpoint_lock); - -                table->transaction = private->transaction; -                table->access_mode = private->access_mode; -                table->dbflags = private->dbflags; -                table->this    = this; - -                { -                        ret = dict_get_str (options, "lru-limit", -                                            &lru_limit_str); - -                        /* TODO: set max lockers and max txns to accomodate -                         * for more than lru_limit */ -                        if (ret == 0) { -                                ret = gf_string2uint32 (lru_limit_str, -                                                        &table->lru_limit); -                                gf_log ("bdb-ll", GF_LOG_DEBUG, -                                        "setting bctx lru limit to %d", -                                        table->lru_limit); -                        } else { -                                table->lru_limit = BDB_DEFAULT_LRU_LIMIT; -                        } -                } - -                { -                        ret = dict_get_str (options, "page-size", -                                            &page_size_str); - -                        if (ret == 0) { -                                ret = gf_string2bytesize (page_size_str, -                                                          &table->page_size); -                                if (ret != 0) { -                                        gf_log ("bdb-ll", GF_LOG_ERROR, -                                                "invalid number format \"%s\"" -                                                " of \"option page-size\"", -                                                page_size_str); -                                } +        /* create a db entry for root */ +        int32_t        op_ret  = 0; +        bdb_private_t *private = NULL; +        bctx_table_t  *table = NULL; -                                if (!PAGE_SIZE_IN_RANGE(table->page_size)) { -                                        gf_log ("bdb-ll", GF_LOG_ERROR, -                                                "pagesize %s is out of range." -                                                "Allowed pagesize is between " -                                                "%d and %d", -                                                page_size_str, -                                                BDB_LL_PAGE_SIZE_MIN, -                                                BDB_LL_PAGE_SIZE_MAX); -                                } -                        } -                        else { -                                table->page_size = BDB_LL_PAGE_SIZE_DEFAULT; -                        } -                        gf_log ("bdb-ll", -                                GF_LOG_DEBUG, "using page-size %"PRIu64, -                                table->page_size); -                } +        char *checkpoint_interval_str = NULL; +        char *page_size_str           = NULL; +        char *lru_limit_str           = NULL; +        char *timeout_str             = NULL; +        char *access_mode             = NULL; +        char *endptr    = NULL; +        char *errfile   = NULL; +        char *directory = NULL; +        char *logdir    = NULL; +        char *mode      = NULL; +        char *mode_str  = NULL; +        int   ret = -1; +        int   idx = 0; +        struct stat stbuf = {0,}; -                table->hash_size = BDB_DEFAULT_HASH_SIZE; -                table->b_hash = CALLOC (BDB_DEFAULT_HASH_SIZE, -                                        sizeof (struct list_head)); +        private = this->private; -                for (idx = 0; idx < table->hash_size; idx++) -                        INIT_LIST_HEAD(&(table->b_hash[idx])); +        /* cache is always on */ +        private->cache = ON; -                private->b_table = table; +        ret = dict_get_str (options, "access-mode", &access_mode); +        if ((ret == 0) +            && (!strcmp (access_mode, "btree"))) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "using BTREE access mode to access libdb " +                        "(Berkeley DB)"); +                private->access_mode = DB_BTREE;          } else { -                gf_log ("bdb-ll", GF_LOG_CRITICAL, -                        "failed to allocate bctx table: out of memory"); +                gf_log (this->name, GF_LOG_DEBUG, +                        "using HASH access mode to access libdb (Berkeley DB)"); +                private->access_mode = DB_HASH;          } -} - -static inline void -bdb_directory_init (xlator_t *this, -                    dict_t *options, -                    struct bdb_private *private) -{ -        int ret = -1; -        char *directory = NULL; -        char *logdir = NULL; -        int32_t op_ret = -1; -        struct stat stbuf = {0}; -        ret = dict_get_str (options, "directory", &directory); +        ret = dict_get_str (options, "mode", &mode); +        if ((ret == 0) +            && (!strcmp (mode, "cache"))) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "cache data mode selected for 'storage/bdb'. filesystem" +                        " operations are not transactionally protected and " +                        "system crash does not guarantee recoverability of " +                        "data"); +                private->envflags = DB_CREATE | DB_INIT_LOG | +                        DB_INIT_MPOOL | DB_THREAD; +                private->dbflags = DB_CREATE | DB_THREAD; +                private->transaction = OFF; +        } else { +                gf_log (this->name, GF_LOG_DEBUG, +                        "persistent data mode selected for 'storage/bdb'. each" +                        "filesystem operation is guaranteed to be Berkeley DB " +                        "transaction protected."); +                private->transaction = ON; +                private->envflags = DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG | +                        DB_INIT_MPOOL | DB_INIT_TXN | DB_RECOVER | DB_THREAD; +                private->dbflags = DB_CREATE | DB_THREAD; -        if (ret == 0) { -                ret = dict_get_str (options, "logdir", &logdir); -                if (ret != 0) { -                        gf_log ("bdb-ll", GF_LOG_DEBUG, -                                "using default logdir as database home"); -                        private->logdir = strdup (directory); +                ret = dict_get_str (options, "lock-timeout", &timeout_str); -                } else { -                        private->logdir = strdup (logdir); -                        gf_log ("bdb-ll", GF_LOG_DEBUG, -                                "using logdir: %s", -                                private->logdir); -                        umask (000); -                        if (mkdir (private->logdir, 0777) == 0) { -                                gf_log ("bdb-ll", GF_LOG_WARNING, -                                        "logdir specified (%s) not exists, " -                                        "created", -                                        private->logdir); -                        } - -                        op_ret = stat (private->logdir, &stbuf); -                        if ((op_ret != 0) -                            || (!S_ISDIR (stbuf.st_mode))) { -                                gf_log ("bdb-ll", GF_LOG_ERROR, -                                        "specified logdir doesn't exist, " -                                        "using default " -                                        "(environment home directory: %s)", -                                        directory); -                                private->logdir = strdup (directory); +                if (ret == 0) { +                        ret = gf_string2time (timeout_str, +                                              &private->lock_timeout); + +                        if (private->lock_timeout > 4260000) { +                                /* db allows us to DB_SET_LOCK_TIMEOUT to be +                                 * set to a maximum of 71 mins +                                 * (4260000 milliseconds) */ +                                gf_log (this->name, GF_LOG_DEBUG, +                                        "Berkeley DB lock-timeout parameter " +                                        "(%d) is out of range. please specify" +                                        " a valid timeout value for " +                                        "lock-timeout and retry.", +                                        private->lock_timeout); +                                goto err;                          }                  } - -                private->b_table->dbenv = bdb_dbenv_init (this, directory); - -                if (!private->b_table->dbenv) { -                        gf_log ("bdb-ll", GF_LOG_ERROR, -                                "failed to initialize db environment"); -                        FREE (private); -                        op_ret = -1; -                } else { -                        if (private->transaction) { -                                /* all well, start the checkpointing thread */ -                                LOCK_INIT (&private->active_lock); - -                                LOCK (&private->active_lock); -                                { -                                        private->active = 1; -                                } -                                UNLOCK (&private->active_lock); -                                pthread_create (&private->checkpoint_thread, -                                                NULL, bdb_checkpoint, this); +                ret = dict_get_str (options, "transaction-timeout", +                                    &timeout_str); +                if (ret == 0) { +                        ret = gf_string2time (timeout_str, +                                              &private->txn_timeout); + +                        if (private->txn_timeout > 4260000) { +                                /* db allows us to DB_SET_TXN_TIMEOUT to be set +                                 * to a maximum of 71 mins +                                 * (4260000 milliseconds) */ +                                gf_log (this->name, GF_LOG_DEBUG, +                                        "Berkeley DB lock-timeout parameter " +                                        "(%d) is out of range. please specify" +                                        " a valid timeout value for " +                                        "lock-timeout and retry.", +                                        private->lock_timeout); +                                goto err;                          }                  } -        } -} - -static inline void -bdb_dir_mode_init (xlator_t *this, -                   dict_t *options, -                   struct bdb_private *private) -{ -        int ret = -1; -        char *mode_str = NULL; -        char *endptr = NULL; -        ret = dict_get_str (options, "dir-mode", &mode_str); - -        if (ret == 0) { -                private->dir_mode = strtol (mode_str, &endptr, 8); -                if ((*endptr) || -                    (!IS_VALID_FILE_MODE(private->dir_mode))) { -                        gf_log (this->name, GF_LOG_DEBUG, -                                "invalid dir-mode %o. setting to default %o", -                                private->dir_mode, -                                DEFAULT_DIR_MODE); -                        private->dir_mode = DEFAULT_DIR_MODE; -                } else { -                        gf_log (this->name, GF_LOG_DEBUG, -                                "setting dir-mode to %o", -                                private->dir_mode); +                private->checkpoint_interval = BDB_DEFAULT_CHECKPOINT_INTERVAL; +                ret = dict_get_str (options, "checkpoint-interval", +                                    &checkpoint_interval_str); +                if (ret == 0) { +                        ret = gf_string2time (checkpoint_interval_str, +                                              &private->checkpoint_interval); + +                        if (ret < 0) { +                                gf_log (this->name, GF_LOG_DEBUG, +                                        "'%"PRIu32"' is not a valid parameter " +                                        "for checkpoint-interval option. " +                                        "please specify a valid " +                                        "checkpoint-interval and retry", +                                        private->checkpoint_interval); +                                goto err; +                        }                  } -        } else { -                private->dir_mode = DEFAULT_DIR_MODE;          } -        private->dir_mode = private->dir_mode | S_IFDIR; -} - -static inline void -bdb_file_mode_init (xlator_t *this, -                    dict_t *options, -                    struct bdb_private *private) -{ -        int ret = -1; -        char *mode_str = NULL; -        char *endptr = NULL; -          ret = dict_get_str (options, "file-mode", &mode_str); -          if (ret == 0) {                  private->file_mode = strtol (mode_str, &endptr, 8);                  if ((*endptr) ||                      (!IS_VALID_FILE_MODE(private->file_mode))) {                          gf_log (this->name, GF_LOG_DEBUG, -                                "invalid file-mode %o. setting to default %o", -                                private->file_mode, DEFAULT_FILE_MODE); -                        private->file_mode = DEFAULT_FILE_MODE; -                } else { -                        gf_log (this->name, GF_LOG_DEBUG, -                                "setting file-mode to %o", +                                "'%o' is not a valid parameter for file-mode " +                                "option. please specify a valid parameter for " +                                "file-mode and retry.",                                  private->file_mode); -                        private->file_mode = private->file_mode; +                        goto err;                  }          } else {                  private->file_mode = DEFAULT_FILE_MODE;          } -          private->symlink_mode = private->file_mode | S_IFLNK;          private->file_mode = private->file_mode | S_IFREG; -} - -static inline void -bdb_checkpoint_interval_init (xlator_t *this, -                              dict_t *options, -                              struct bdb_private *private) -{ -        int   ret = -1; -        char *checkpoint_interval_str = NULL; - -        private->checkpoint_interval = BDB_DEFAULT_CHECKPOINT_INTERVAL; - -        ret = dict_get_str (options, "checkpoint-interval", -                            &checkpoint_interval_str); +        ret = dict_get_str (options, "dir-mode", &mode_str);          if (ret == 0) { -                ret = gf_string2time (checkpoint_interval_str, -                                      &private->checkpoint_interval); - -                if (ret == 0) { +                private->dir_mode = strtol (mode_str, &endptr, 8); +                if ((*endptr) || +                    (!IS_VALID_FILE_MODE(private->dir_mode))) {                          gf_log (this->name, GF_LOG_DEBUG, -                                "setting checkpoint-interval to %"PRIu32" seconds", -                                private->checkpoint_interval); +                                "'%o' is not a valid parameter for dir-mode " +                                "option. please specify a valid parameter for " +                                "dir-mode and retry.", +                                private->dir_mode); +                        goto err;                  }          } else { -                gf_log (this->name, GF_LOG_DEBUG, -                        "setting checkpoint-interval to default: %"PRIu32" seconds", -                        private->checkpoint_interval); +                private->dir_mode = DEFAULT_DIR_MODE;          } -} -static inline void -bdb_lock_timeout_init (xlator_t *this, -                       dict_t *options, -                       struct bdb_private *private) -{ -        int   ret = -1; -        char *timeout_str = NULL; +        private->dir_mode = private->dir_mode | S_IFDIR; -        ret = dict_get_str (options, "lock-timeout", &timeout_str); +        table = CALLOC (1, sizeof (*table)); +        if (table == NULL) { +                gf_log ("bdb-ll", GF_LOG_CRITICAL, +                        "memory allocation for 'storage/bdb' internal " +                        "context table failed."); +                goto err; +        } -        if (ret == 0) { -                ret = gf_string2time (timeout_str, &private->lock_timeout); +        INIT_LIST_HEAD(&(table->b_lru)); +        INIT_LIST_HEAD(&(table->active)); +        INIT_LIST_HEAD(&(table->purge)); -                if (private->lock_timeout > 4260000) { -                        /* db allows us to DB_SET_LOCK_TIMEOUT to be set to a -                         * maximum of 71 mins (4260000 milliseconds) */ -                        gf_log (this->name, GF_LOG_DEBUG, -                                "lock-timeout %d, out of range", -                                private->lock_timeout); -                        private->lock_timeout = 0; -                } else { -                        gf_log (this->name, GF_LOG_DEBUG, -                                "setting lock-timeout to %d milliseconds", -                                private->lock_timeout); -                } -        } -} +        LOCK_INIT (&table->lock); +        LOCK_INIT (&table->checkpoint_lock); -static inline void -bdb_transaction_timeout_init (xlator_t *this, -                              dict_t *options, -                              struct bdb_private *private) -{ -        int   ret = -1; -        char *timeout_str = NULL; +        table->transaction = private->transaction; +        table->access_mode = private->access_mode; +        table->dbflags = private->dbflags; +        table->this    = this; -        ret = dict_get_str (options, "transaction-timeout", &timeout_str); +        ret = dict_get_str (options, "lru-limit", +                            &lru_limit_str); +        /* TODO: set max lockers and max txns to accomodate +         * for more than lru_limit */          if (ret == 0) { -                ret = gf_string2time (timeout_str, &private->txn_timeout); - -                if (private->txn_timeout > 4260000) { -                        /* db allows us to DB_SET_TXN_TIMEOUT to be set to -                         * a maximum of 71 mins (4260000 milliseconds) */ -                        gf_log (this->name, GF_LOG_DEBUG, -                                "transaction-timeout %d, out of range", -                                private->txn_timeout); -                        private->txn_timeout = 0; -                } else { -                        gf_log (this->name, GF_LOG_DEBUG, -                                "setting transaction-timeout to %d " -                                "milliseconds", -                                private->txn_timeout); -                } +                ret = gf_string2uint32 (lru_limit_str, +                                        &table->lru_limit); +                gf_log ("bdb-ll", GF_LOG_DEBUG, +                        "setting lru limit of 'storage/bdb' internal context" +                        "table to %d. maximum of %d unused databases can be " +                        "open at any given point of time.", +                        table->lru_limit, table->lru_limit); +        } else { +                table->lru_limit = BDB_DEFAULT_LRU_LIMIT;          } -} -static inline void -bdb_transaction_init (xlator_t *this, -                      dict_t *options, -                      struct bdb_private *private) -{ -        int   ret = -1; -        char *mode = NULL; +        ret = dict_get_str (options, "page-size", +                            &page_size_str); -        ret = dict_get_str (options, "mode", &mode); +        if (ret == 0) { +                ret = gf_string2bytesize (page_size_str, +                                          &table->page_size); +                if (ret < 0) { +                        gf_log ("bdb-ll", GF_LOG_ERROR, +                                "\"%s\" is an invalid parameter to " +                                "\"option page-size\". please specify a valid " +                                "size and retry.", +                                page_size_str); +                        goto err; +                } -        if ((ret == 0) -            && (!strcmp (mode, "cache"))) { -                gf_log (this->name, GF_LOG_DEBUG, -                        "cache mode selected"); -                private->envflags = DB_CREATE | DB_INIT_LOG | -                        DB_INIT_MPOOL | DB_THREAD; -                private->dbflags = DB_CREATE | DB_THREAD; -                private->transaction = OFF; +                if (!PAGE_SIZE_IN_RANGE(table->page_size)) { +                        gf_log ("bdb-ll", GF_LOG_ERROR, +                                "\"%s\" is out of range for Berkeley DB " +                                "page-size. allowed page-size range is %d to " +                                "%d. please specify a page-size value in the " +                                "range and retry.", +                                page_size_str, BDB_LL_PAGE_SIZE_MIN, +                                BDB_LL_PAGE_SIZE_MAX); +                        goto err; +                }          } else { -                gf_log (this->name, GF_LOG_DEBUG, -                        "persistant mode selected"); -                private->transaction = ON; -                private->envflags = DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG | -                        DB_INIT_MPOOL | DB_INIT_TXN | DB_RECOVER | DB_THREAD; -                private->dbflags = DB_CREATE | DB_THREAD; - -                bdb_lock_timeout_init (this, options, private); - -                bdb_transaction_timeout_init (this, options, private); - -                bdb_log_remove_init (this, options, private); - -                bdb_checkpoint_interval_init (this, options, private); +                table->page_size = BDB_LL_PAGE_SIZE_DEFAULT;          } -} -static inline void -bdb_access_mode_init (xlator_t *this, -                      dict_t *options, -                      struct bdb_private *private) -{ -        int   ret = -1; -        char *access_mode = NULL; +        table->hash_size = BDB_DEFAULT_HASH_SIZE; +        table->b_hash = CALLOC (BDB_DEFAULT_HASH_SIZE, +                                sizeof (struct list_head)); -        ret = dict_get_str (options, "access-mode", &access_mode); +        for (idx = 0; idx < table->hash_size; idx++) +                INIT_LIST_HEAD(&(table->b_hash[idx])); -        if ((ret == 0) -            && (!strcmp (access_mode, "btree"))) { -                gf_log (this->name, GF_LOG_DEBUG, -                        "using access mode BTREE"); -                private->access_mode = DB_BTREE; -        } else { +        private->b_table = table; + +        ret = dict_get_str (options, "errfile", &errfile); +        if (ret == 0) { +                private->errfile = strdup (errfile);                  gf_log (this->name, GF_LOG_DEBUG, -                        "using access mode HASH"); -                private->access_mode = DB_HASH; +                        "using %s as error logging file for libdb (Berkeley DB " +                        "library) internal logging.", private->errfile);          } -} +        ret = dict_get_str (options, "directory", &directory); -/* bdb_db_init - initialize bdb xlator - * - * reads the options from @options dictionary and sets appropriate values in - * @this->private. also initializes DB_ENV. - * - * return: 0 on success or -1 on error - * (with logging the error through gf_log()). - */ -int -bdb_db_init (xlator_t *this, -             dict_t *options) -{ -        /* create a db entry for root */ -        int32_t        op_ret             = 0; -        bdb_private_t *private            = NULL; +        if (ret == 0) { +                ret = dict_get_str (options, "logdir", &logdir); -        private = this->private; +                if (ret < 0) { +                        gf_log ("bdb-ll", GF_LOG_DEBUG, +                                "using the database environment home " +                                "directory (%s) itself as transaction log " +                                "directory", directory); +                        private->logdir = strdup (directory); -        bdb_cache_init (this, options, private); +                } else { +                        private->logdir = strdup (logdir); -        bdb_access_mode_init (this, options, private); +                        op_ret = stat (private->logdir, &stbuf); +                        if ((op_ret != 0) +                            || (!S_ISDIR (stbuf.st_mode))) { +                                gf_log ("bdb-ll", GF_LOG_ERROR, +                                        "specified logdir %s does not exist. " +                                        "please provide a valid existing " +                                        "directory as parameter to 'option " +                                        "logdir'", +                                        private->logdir); +                                goto err; +                        } +                } -        bdb_transaction_init (this, options, private); +                private->b_table->dbenv = bdb_dbenv_init (this, directory); +                if (private->b_table->dbenv == NULL) { +                        gf_log ("bdb-ll", GF_LOG_ERROR, +                                "initialization of database environment " +                                "failed"); +                        goto err; +                } else { +                        if (private->transaction) { +                                /* all well, start the checkpointing thread */ +                                LOCK_INIT (&private->active_lock); -        { -                LOCK_INIT (&private->ino_lock); -                private->next_ino = 2; +                                LOCK (&private->active_lock); +                                { +                                        private->active = 1; +                                } +                                UNLOCK (&private->active_lock); +                                pthread_create (&private->checkpoint_thread, +                                                NULL, bdb_checkpoint, this); +                        } +                }          } -        bdb_file_mode_init (this, options, private); - -        bdb_dir_mode_init (this, options, private); - -        bdb_table_init (this, options, private); - -        bdb_errfile_init (this, options, private); +        return op_ret; +err: +        if (table) { +                FREE (table->b_hash); +                FREE (table); +        } +        if (private) { +                if (private->errfile) +                        FREE (private->errfile); -        bdb_directory_init (this, options, private); +                if (private->logdir) +                        FREE (private->logdir); +        } -        return op_ret; +        return -1;  } diff --git a/xlators/storage/bdb/src/bdb.c b/xlators/storage/bdb/src/bdb.c index a3c6c44ea..85f08ea9a 100644 --- a/xlators/storage/bdb/src/bdb.c +++ b/xlators/storage/bdb/src/bdb.c @@ -82,49 +82,57 @@ bdb_mknod (call_frame_t *frame,          if (!S_ISREG(mode)) {                  gf_log (this->name, GF_LOG_DEBUG, -                        "mknod for non-regular file"); +                        "MKNOD %"PRId64"/%s (%s): EPERM" +                        "(mknod supported only for regular files. " +                        "file mode '%o' not supported)", +                        loc->parent->ino, loc->name, loc->path, mode);                  op_ret = -1;                  op_errno = EPERM;                  goto out;          } /* if(!S_ISREG(mode)) */          bctx = bctx_parent (B_TABLE(this), loc->path); -          if (bctx == NULL) { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to get bctx for path: %s", -                        loc->path); -                op_ret = -1; -                op_errno = ENOENT; +                gf_log (this->name, GF_LOG_DEBUG, +                        "MKNOD %"PRId64"/%s (%s): ENOMEM" +                        "(failed to lookup database handle)", +                        loc->parent->ino, loc->name, loc->path); +                op_ret   = -1; +                op_errno = ENOMEM;                  goto out; -        } /* if(bctx == NULL) */ +        }          MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this, bctx->directory);          op_ret = lstat (db_path, &stbuf); -        op_errno = errno;          if (op_ret != 0) { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to lstat on %s (%s)", -                        db_path, strerror (op_errno)); +                op_errno = EINVAL; +                gf_log (this->name, GF_LOG_DEBUG, +                        "MKNOD %"PRId64"/%s (%s): EINVAL" +                        "(failed to lookup database handle)", +                        loc->parent->ino, loc->name, loc->path);                  goto out;          }          MAKE_KEY_FROM_PATH (key_string, loc->path); -        op_ret = bdb_db_put (bctx, NULL, key_string, NULL, 0, 0, 0); +        op_ret = bdb_db_icreate (bctx, key_string);          if (op_ret > 0) {                  /* create successful */ -                stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx); +                stbuf.st_ino = bdb_inode_transform (loc->parent->ino, +                                                    key_string, +                                                    strlen (key_string));                  stbuf.st_mode  = mode;                  stbuf.st_size = 0;                  stbuf.st_blocks = BDB_COUNT_BLOCKS (stbuf.st_size, \                                                      stbuf.st_blksize);          } else { -                gf_log (this->name, GF_LOG_ERROR, -                        "bdb_db_get() failed for path: %s", -                        loc->path); -                op_ret = -1; -                op_errno = ENOENT; +                gf_log (this->name, GF_LOG_DEBUG, +                        "MKNOD %"PRId64"/%s (%s): ENOMEM" +                        "(failed to create database entry)", +                        loc->parent->ino, loc->name, loc->path); +                op_ret   = -1; +                op_errno = EINVAL; /* TODO: errno sari illa */ +                goto out;          }/* if (!op_ret)...else */  out: @@ -156,11 +164,7 @@ is_dir_empty (xlator_t *this,          bctx = bctx_lookup (B_TABLE(this), loc->path);          if (bctx == NULL) { -                gf_log (this->name, GF_LOG_DEBUG, -                        "failed to get bctx from inode for dir: %s," -                        "assuming empty directory", -                        loc->path); -                ret = 1; +                ret = -ENOMEM;                  goto out;          } @@ -180,33 +184,24 @@ is_dir_empty (xlator_t *this,                          break;                  case DB_UNKNOWN:                          gf_log (this->name, GF_LOG_CRITICAL, -                                "unknown access-mode set for db"); +                                "unknown access-mode set for database");                          ret = 0;                  }          } else { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to get db stat for db at path: %s", -                        loc->path); -                ret = 1; +                ret = -EBUSY;                  goto out;          }          MAKE_REAL_PATH (real_path, this, loc->path);          dir = opendir (real_path);          if (dir == NULL) { -                gf_log (this->name, GF_LOG_DEBUG, -                        "failed to opendir(%s)", -                        loc->path); -                ret = 0; +                ret = -errno;                  goto out;          }          while ((entry = readdir (dir))) {                  if ((!IS_BDB_PRIVATE_FILE(entry->d_name)) &&                      (!IS_DOT_DOTDOT(entry->d_name))) { -                        gf_log (this->name, GF_LOG_DEBUG, -                                "directory (%s) not empty, has a non-db entry", -                                loc->path);                          ret = 0;                          break;                  }/* if(!IS_BDB_PRIVATE_FILE()) */ @@ -256,26 +251,19 @@ is_space_left (xlator_t *this,          ret = statvfs (private->export_path, &stbuf);          if (ret != 0) { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to do statvfs on %s", -                        private->export_path); -                return 0; +                ret = 0;          } else {                  req_blocks = (size / stbuf.f_frsize) + 1;                  usable_blocks = (stbuf.f_bfree - BDB_ENOSPC_THRESHOLD); -                gf_log (this->name, GF_LOG_DEBUG, -                        "requested size: %"GF_PRI_SIZET"\n" -                        "free blocks: %"PRIu64"\n" -                        "block size: %lu\nfrag size: %lu", -                        size, stbuf.f_bfree, stbuf.f_bsize, stbuf.f_frsize); -                  if (req_blocks < usable_blocks) -                        return 1; +                        ret = 1;                  else -                        return 0; +                        ret = 0;          } + +        return ret;  }  int32_t @@ -303,40 +291,68 @@ bdb_create (call_frame_t *frame,          private = this->private;          bctx = bctx_parent (B_TABLE(this), loc->path); -        op_errno = ENOENT; -        GF_VALIDATE_OR_GOTO (this->name, bctx, out); +        if (bctx == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "CREATE %"PRId64"/%s (%s): ENOMEM" +                        "(failed to lookup database handle)", +                        loc->parent->ino, loc->name, loc->path); +                op_ret   = -1; +                op_errno = ENOMEM; +                goto out; +        }          MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this, bctx->directory);          op_ret = lstat (db_path, &stbuf); -        op_errno = errno;          if (op_ret != 0) { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to lstat on %s (%s)", -                        db_path, strerror (op_errno)); +                op_errno = EINVAL; +                gf_log (this->name, GF_LOG_DEBUG, +                        "CREATE %"PRId64"/%s (%s): EINVAL" +                        "(database file missing)", +                        loc->parent->ino, loc->name, loc->path);                  goto out;          }          MAKE_KEY_FROM_PATH (key_string, loc->path); -        op_ret = bdb_db_put (bctx, NULL, key_string, NULL, 0, 0, 0); -        op_errno = EINVAL; -        GF_VALIDATE_OR_GOTO (this->name, (op_ret == 0), out); +        op_ret = bdb_db_icreate (bctx, key_string); +        if (op_ret < 0) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "CREATE %"PRId64"/%s (%s): ENOMEM" +                        "(failed to create database entry)", +                        loc->parent->ino, loc->name, loc->path); +                op_errno = EINVAL; /* TODO: errno sari illa */ +                goto out; +        }          /* create successful */          bfd = CALLOC (1, sizeof (*bfd)); -        op_ret = -1; -        op_errno = ENOMEM; -        GF_VALIDATE_OR_GOTO (this->name, bfd, out); +        if (bfd == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "CREATE %"PRId64"/%s (%s): ENOMEM" +                        "(failed to allocate memory for internal fd context)", +                        loc->parent->ino, loc->name, loc->path); +                op_ret   = -1; +                op_errno = ENOMEM; +                goto out; +        }          /* NOTE: bdb_get_bctx_from () returns bctx with a ref */          bfd->ctx = bctx;          bfd->key = strdup (key_string); -        op_ret = -1; -        op_errno = ENOMEM; -        GF_VALIDATE_OR_GOTO (this->name, bfd->key, out); +        if (bfd->key == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "CREATE %"PRId64" (%s): ENOMEM" +                        "(failed to allocate memory for internal fd->key)", +                        loc->ino, loc->path); +                op_ret   = -1; +                op_errno = ENOMEM; +                goto out; +        } -        BDB_SET_BFD (this, fd, bfd); +        BDB_FCTX_SET (fd, this, bfd); -        stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx); +        stbuf.st_ino = bdb_inode_transform (loc->parent->ino, +                                            key_string, +                                            strlen (key_string));          stbuf.st_mode = private->file_mode;          stbuf.st_size = 0;          stbuf.st_nlink = 1; @@ -377,23 +393,43 @@ bdb_open (call_frame_t *frame,          GF_VALIDATE_OR_GOTO (this->name, fd, out);          bctx = bctx_parent (B_TABLE(this), loc->path); -        op_errno = EBADFD; -        GF_VALIDATE_OR_GOTO (this->name, bctx, out); +        if (bctx == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "OPEN %"PRId64" (%s): ENOMEM" +                        "(failed to lookup database handle)", +                        loc->ino, loc->path); +                op_ret   = -1; +                op_errno = ENOMEM; +                goto out; +        }          bfd = CALLOC (1, sizeof (*bfd)); -        op_errno = ENOMEM; -        GF_VALIDATE_OR_GOTO (this->name, bfd, out); +        if (bfd == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "OPEN %"PRId64" (%s): ENOMEM" +                        "(failed to allocate memory for internal fd context)", +                        loc->ino, loc->path); +                op_ret   = -1; +                op_errno = ENOMEM; +                goto out; +        }          /* NOTE: bctx_parent () returns bctx with a ref */          bfd->ctx = bctx;          MAKE_KEY_FROM_PATH (key_string, loc->path);          bfd->key = strdup (key_string); -        op_ret = -1; -        op_errno = ENOMEM; -        GF_VALIDATE_OR_GOTO (this->name, bfd->key, out); +        if (bfd->key == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "OPEN %"PRId64" (%s): ENOMEM" +                        "(failed to allocate memory for internal fd->key)", +                        loc->ino, loc->path); +                op_ret   = -1; +                op_errno = ENOMEM; +                goto out; +        } -        BDB_SET_BFD (this, fd, bfd); +        BDB_FCTX_SET (fd, this, bfd);          op_ret = 0;  out:          frame->root->rsp_refs = NULL; @@ -416,7 +452,6 @@ bdb_readv (call_frame_t *frame,          struct bdb_fd *bfd        = NULL;          dict_t        *reply_dict = NULL;          char          *buf        = NULL; -        data_t        *buf_data   = NULL;          char          *db_path    = NULL;          int32_t        read_size  = 0; @@ -424,29 +459,37 @@ bdb_readv (call_frame_t *frame,          GF_VALIDATE_OR_GOTO ("bdb", this, out);          GF_VALIDATE_OR_GOTO (this->name, fd, out); -        bfd = bdb_extract_bfd (fd, this); -        op_errno = EBADFD; -        GF_VALIDATE_OR_GOTO (this->name, bfd, out); +        BDB_FCTX_GET (fd, this, &bfd); +        if (bfd == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "READV %"PRId64" - %"PRId32",%"PRId64": EBADFD" +                        "(internal fd not found through fd)", +                        fd->inode->ino, size, offset); +                op_errno = EBADFD; +                op_ret = -1; +                goto out; +        }          MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this, bfd->ctx->directory);          op_ret = lstat (db_path, &stbuf); -        op_errno = errno;          if (op_ret != 0) { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to lstat on %s (%s)", -                        db_path, strerror (op_errno)); +                op_errno = errno; +                gf_log (this->name, GF_LOG_DEBUG, +                        "READV %"PRId64" - %"PRId32",%"PRId64": EINVAL" +                        "(database file missing)", +                        fd->inode->ino, size, offset);                  goto out;          }          /* we are ready to go */ -        op_ret = bdb_db_get (bfd->ctx, NULL, -                             bfd->key, &buf, -                             size, offset); +        op_ret = bdb_db_fread (bfd, &buf, size, offset);          read_size = op_ret;          if (op_ret == -1) { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to do db_storage_get()"); -                op_ret = -1; +                gf_log (this->name, GF_LOG_DEBUG, +                        "READV %"PRId64" - %"PRId32",%"PRId64": EBADFD" +                        "(failed to find entry in database)", +                        fd->inode->ino, size, offset); +                op_ret   = -1;                  op_errno = ENOENT;                  goto out;          } else if (op_ret == 0) { @@ -454,17 +497,21 @@ bdb_readv (call_frame_t *frame,          }          reply_dict = dict_new (); -        op_ret = -1; -        op_errno = ENOMEM; -        GF_VALIDATE_OR_GOTO (this->name, reply_dict, out); +        if (reply_dict == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "READV %"PRId64" - %"PRId32",%"PRId64": EBADFD" +                        "(failed to allocate memory for reply dictionary)", +                        fd->inode->ino, size, offset); +                op_ret = -1; +                op_errno = ENOMEM; +                goto out; +        }          if (size < read_size) {                  op_ret = size;                  read_size = size;          } -        buf_data->len       = op_ret; -          op_ret = dict_set_dynptr (reply_dict, NULL, buf, op_ret);          if (op_ret < 0) {                  op_ret = -1; @@ -513,44 +560,51 @@ bdb_writev (call_frame_t *frame,          GF_VALIDATE_OR_GOTO (this->name, fd, out);          GF_VALIDATE_OR_GOTO (this->name, vector, out); -        bfd = bdb_extract_bfd (fd, this); -        op_errno = EBADFD; -        GF_VALIDATE_OR_GOTO (this->name, bfd, out); +        BDB_FCTX_GET (fd, this, &bfd); +        if (bfd == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "WRITEV %"PRId64" - %"PRId32",%"PRId64": EBADFD" +                        "(internal fd not found through fd)", +                        fd->inode->ino, count, offset); +                op_ret = -1; +                op_errno = EBADFD; +                goto out; +        }          MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this, bfd->ctx->directory);          op_ret = lstat (db_path, &stbuf); -        op_errno = errno;          if (op_ret != 0) { +                op_errno = errno;                  gf_log (this->name, GF_LOG_ERROR, -                        "failed to lstat on %s (%s)", -                        db_path, strerror (op_errno)); +                        "WRITEV %"PRId64" - %"PRId32",%"PRId64": EINVAL" +                        "(database file missing)", +                        fd->inode->ino, count, offset);                  goto out;          } -          for (idx = 0; idx < count; idx++)                  total_size += vector[idx].iov_len;          if (!is_space_left (this, total_size)) {                  gf_log (this->name, GF_LOG_ERROR, -                        "requested storage for %"GF_PRI_SIZET", ENOSPC", -                        total_size); +                        "WRITEV %"PRId64" - %"PRId32" (%"PRId32"),%"PRId64": " +                        "ENOSPC " +                        "(not enough space after internal measurement)", +                        fd->inode->ino, count, total_size, offset);                  op_ret = -1;                  op_errno = ENOSPC;                  goto out;          } -          /* we are ready to go */          for (idx = 0; idx < count; idx++) { -                c_ret = bdb_db_put (bfd->ctx, NULL, -                                    bfd->key, vector[idx].iov_base, -                                    vector[idx].iov_len, c_off, 0); -                if (c_ret != 0) { +                c_ret = bdb_db_fwrite (bfd, vector[idx].iov_base, +                                       vector[idx].iov_len, c_off); +                if (c_ret < 0) {                          gf_log (this->name, GF_LOG_ERROR, -                                "failed to do bdb_db_put at offset: " -                                "%"PRIu64" for file: %s", -                                c_off, bfd->key); +                                "WRITEV %"PRId64" - %"PRId32",%"PRId64": EINVAL" +                                "(database write at %"PRId64" failed)", +                                fd->inode->ino, count, offset, c_off);                          break;                  } else {                          c_off += vector[idx].iov_len; @@ -559,16 +613,15 @@ bdb_writev (call_frame_t *frame,          } /* for(idx=0;...)... */          if (c_ret) { -                /* write failed */ -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to do bdb_db_put(): %s", -                        db_strerror (op_ret)); -                op_ret = -1; -                op_errno = EBADFD; /* TODO: search for a meaningful errno */ +                /* write failed after a point, not an error */ +                stbuf.st_size   = bdb_db_fread (bfd, NULL, 0, 0); +                stbuf.st_blocks = BDB_COUNT_BLOCKS (stbuf.st_size, +                                                    stbuf.st_blksize);                  goto out;          } +          /* NOTE: we want to increment stbuf->st_size, as stored in db */ -        stbuf.st_size = op_ret; +        stbuf.st_size   = op_ret;          stbuf.st_blocks = BDB_COUNT_BLOCKS (stbuf.st_size, stbuf.st_blksize);          op_errno = 0; @@ -591,9 +644,16 @@ bdb_flush (call_frame_t *frame,          GF_VALIDATE_OR_GOTO ("bdb", this, out);          GF_VALIDATE_OR_GOTO (this->name, fd, out); -        bfd = bdb_extract_bfd (fd, this); -        op_errno = EBADFD; -        GF_VALIDATE_OR_GOTO (this->name, bfd, out); +        BDB_FCTX_GET (fd, this, &bfd); +        if (bfd == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "FLUSH %"PRId64": EBADFD" +                        "(internal fd not found through fd)", +                        fd->inode->ino); +                op_ret = -1; +                op_errno = EBADFD; +                goto out; +        }          /* do nothing */          op_ret = 0; @@ -613,23 +673,27 @@ bdb_release (xlator_t *this,          int32_t op_errno = EBADFD;          struct bdb_fd *bfd = NULL; -        if ((bfd = bdb_extract_bfd (fd, this)) == NULL){ -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to extract %s specific information from fd:%p", -                        this->name, fd); +        BDB_FCTX_GET (fd, this, &bfd); +        if (bfd == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "RELEASE %"PRId64": EBADFD" +                        "(internal fd not found through fd)", +                        fd->inode->ino);                  op_ret = -1;                  op_errno = EBADFD; -        } else { -                bctx_unref (bfd->ctx); -                bfd->ctx = NULL; +                goto out; +        } -                if (bfd->key) -                        free (bfd->key); /* we did strdup() in bdb_open() */ -                free (bfd); -                op_ret = 0; -                op_errno = 0; -        } /* if((fd->ctx == NULL)...)...else */ +        bctx_unref (bfd->ctx); +        bfd->ctx = NULL; +        if (bfd->key) +                FREE (bfd->key); /* we did strdup() in bdb_open() */ +        FREE (bfd); +        op_ret = 0; +        op_errno = 0; + +out:          return 0;  }/* bdb_release */ @@ -656,15 +720,16 @@ bdb_lk (call_frame_t *frame,  {          struct flock nullock = {0, }; -        gf_bdb_lk_log++; -        if (!(gf_bdb_lk_log % GF_UNIVERSAL_ANSWER)) { -                gf_log (this->name, GF_LOG_ERROR, -                        "\"features/posix-locks\" translator is not loaded, " -                        "you need to use it"); +        if (BDB_TIMED_LOG (ENOTSUP, gf_bdb_lk_log)) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "LK %"PRId64": ENOTSUP " +                        "(load \"features/locks\" translator to enable " +                        "lock support)", +                        fd->inode->ino);          }          frame->root->rsp_refs = NULL; -        STACK_UNWIND (frame, -1, ENOSYS, &nullock); +        STACK_UNWIND (frame, -1, ENOTSUP, &nullock);          return 0;  }/* bdb_lk */ @@ -678,8 +743,8 @@ bdb_lk (call_frame_t *frame,   * case 1 and 2 are handled by doing lstat() on the @loc. if the file is a   * directory or symlink, lstat() succeeds. lookup continues to check if the   * @loc belongs to case-3 only if lstat() fails. - * to check for case 3, bdb_lookup does a bdb_db_get() for the given @loc. - * (see description of bdb_db_get() for more details on how @loc is transformed + * to check for case 3, bdb_lookup does a bdb_db_iread() for the given @loc. + * (see description of bdb_db_iread() for more details on how @loc is transformed   * into db handle and key). if check for case 1, 2 and 3 fail, we proceed to   * conclude that file doesn't exist (case 4).   * @@ -741,20 +806,26 @@ bdb_lookup (call_frame_t *frame,          if (!strcmp (directory, loc->path)) {                  /* SPECIAL CASE: looking up root */                  op_ret = lstat (real_path, &stbuf); -                op_errno = errno;                  if (op_ret != 0) { -                        gf_log (this->name, GF_LOG_ERROR, -                                "failed to lstat on %s (%s)", -                                real_path, strerror (op_errno)); +                        op_errno = errno; +                        gf_log (this->name, GF_LOG_DEBUG, +                                "LOOKUP %"PRId64" (%s): %s", +                                loc->ino, loc->path, strerror (op_errno));                          goto out;                  }                  /* bctx_lookup() returns NULL only when its time to wind up,                   * we should shutdown functioning */                  bctx = bctx_lookup (B_TABLE(this), (char *)loc->path); -                op_ret = -1; -                op_errno = EINVAL; -                GF_VALIDATE_OR_GOTO (this->name, bctx, out); +                if (bctx == NULL) { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "LOOKUP %"PRId64" (%s): ENOMEM" +                                "(failed to lookup database handle)", +                                loc->ino, loc->path); +                        op_ret   = -1; +                        op_errno = ENOMEM; +                        goto out; +                }                  stbuf.st_ino = 1;                  stbuf.st_mode = private->dir_mode; @@ -767,80 +838,99 @@ bdb_lookup (call_frame_t *frame,          op_ret = lstat (real_path, &stbuf);          if ((op_ret == 0) && (S_ISDIR (stbuf.st_mode))){                  bctx = bctx_lookup (B_TABLE(this), (char *)loc->path); -                op_ret = -1; -                op_errno = ENOMEM; -                GF_VALIDATE_OR_GOTO (this->name, bctx, out); +                if (bctx == NULL) { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "LOOKUP %"PRId64"/%s (%s): ENOMEM" +                                "(failed to lookup database handle)", +                                loc->parent->ino, loc->name, loc->path); +                        op_ret   = -1; +                        op_errno = ENOMEM; +                        goto out; +                }                  if (loc->ino) {                          /* revalidating directory inode */ -                        gf_log (this->name, GF_LOG_DEBUG, -                                "revalidating directory %s", -                                (char *)loc->path);                          stbuf.st_ino = loc->ino;                  } else { -                        stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx); +                        stbuf.st_ino = bdb_inode_transform (loc->parent->ino, +                                                            key_string, +                                                            strlen (key_string));                  }                  stbuf.st_mode = private->dir_mode; +                  op_ret = 0; -                op_errno = 0;                  goto out; +          } else if (op_ret == 0) {                  /* a symlink */ -                gf_log (this->name, GF_LOG_DEBUG, -                        "lookup called for symlink: %s", -                        loc->path);                  bctx = bctx_parent (B_TABLE(this), loc->path); -                op_ret = -1; -                op_errno = ENOMEM; -                GF_VALIDATE_OR_GOTO (this->name, bctx, out); +                if (bctx == NULL) { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "LOOKUP %"PRId64"/%s (%s): ENOMEM" +                                "(failed to lookup database handle)", +                                loc->parent->ino, loc->name, loc->path); +                        op_ret   = -1; +                        op_errno = ENOMEM; +                        goto out; +                }                  if (loc->ino) {                          stbuf.st_ino = loc->ino;                  } else { -                        stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx); +                        stbuf.st_ino = bdb_inode_transform (loc->parent->ino, +                                                            key_string, +                                                            strlen (key_string));                  } +                  stbuf.st_mode = private->symlink_mode; +                  op_ret = 0; -                op_errno = 0;                  goto out; +          }          /* for regular files */          bctx = bctx_parent (B_TABLE(this), loc->path); -        op_ret = -1; -        op_errno = ENOENT; -        GF_VALIDATE_OR_GOTO (this->name, bctx, out); +        if (bctx == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "LOOKUP %"PRId64"/%s (%s): ENOMEM" +                        "(failed to lookup database handle for parent)", +                        loc->parent->ino, loc->name, loc->path); +                op_ret   = -1; +                op_errno = ENOMEM; +                goto out; +        }          if (GF_FILE_CONTENT_REQUESTED(xattr_req, &need_xattr)) { -                entry_size = bdb_db_get (bctx, NULL, -                                         loc->path, &file_content, -                                         0, 0); +                entry_size = bdb_db_iread (bctx, key_string, &file_content);          } else { -                entry_size = bdb_db_get (bctx, NULL, loc->path, NULL, -                                         0, 0); +                entry_size = bdb_db_iread (bctx, key_string, NULL);          }          op_ret = entry_size; -        op_errno = ENOENT;          if (op_ret == -1) {                  gf_log (this->name, GF_LOG_DEBUG, -                        "returning ENOENT for %s", -                        loc->path); +                        "LOOKUP %"PRId64"/%s (%s): ENOENT" +                        "(database entry not found)", +                        loc->parent->ino, loc->name, loc->path); +                op_errno = ENOENT;                  goto out;          }          MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this, bctx->directory);          op_ret = lstat (db_path, &stbuf); -        op_errno = errno;          if (op_ret != 0) { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to lstat on %s (%s)", -                        db_path, strerror (op_errno)); +                op_errno = errno; +                gf_log (this->name, GF_LOG_DEBUG, +                        "LOOKUP %"PRId64"/%s (%s): %s", +                        loc->parent->ino, loc->name, loc->path, +                        strerror (op_errno));                  goto out;          } -        if ((need_xattr >= entry_size) -            && (entry_size) && (file_content)) { +        if (entry_size +            && (need_xattr >= entry_size) +            && (file_content)) {                  xattr = dict_new ();                  op_ret = dict_set_dynptr (xattr, "glusterfs.content",                                            file_content, entry_size); @@ -861,7 +951,9 @@ bdb_lookup (call_frame_t *frame,                                                      stbuf.st_blksize);          } else {                  /* fresh lookup, create an inode number */ -                stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx); +                stbuf.st_ino = bdb_inode_transform (loc->parent->ino, +                                                    key_string, +                                                    strlen (key_string));                  stbuf.st_size = entry_size;                  stbuf.st_blocks = BDB_COUNT_BLOCKS (stbuf.st_size,                                                      stbuf.st_blksize); @@ -930,21 +1022,28 @@ bdb_stat (call_frame_t *frame,          }          bctx = bctx_parent (B_TABLE(this), loc->path); -        op_ret = -1; -        op_errno = ENOENT; -        GF_VALIDATE_OR_GOTO (this->name, bctx, out); +        if (bctx == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "STAT %"PRId64" (%s): ENOMEM" +                        "(no database handle for parent)", +                        loc->ino, loc->path); +                op_ret = -1; +                op_errno = ENOMEM; +                goto out; +        }          MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this, bctx->directory);          op_ret = lstat (db_path, &stbuf); -        op_errno = errno; -        if (op_ret != 0) { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to lstat on %s (%s)", -                        db_path, strerror (op_errno)); +        if (op_ret < 0) { +                op_errno = errno; +                gf_log (this->name, GF_LOG_DEBUG, +                        "STAT %"PRId64" (%s): %s" +                        "(failed to stat on database file)", +                        loc->ino, loc->path, strerror (op_errno));                  goto out;          } -        stbuf.st_size = bdb_db_get (bctx, NULL, loc->path, NULL, 0, 0); +        stbuf.st_size = bdb_db_iread (bctx, loc->path, NULL);          stbuf.st_blocks = BDB_COUNT_BLOCKS (stbuf.st_size, stbuf.st_blksize);          stbuf.st_ino = loc->inode->ino; @@ -999,34 +1098,70 @@ bdb_opendir (call_frame_t *frame,          MAKE_REAL_PATH (real_path, this, loc->path);          bctx = bctx_lookup (B_TABLE(this), (char *)loc->path); -        op_errno = EBADFD; -        GF_VALIDATE_OR_GOTO (this->name, bctx, out); +        if (bctx == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "OPENDIR %"PRId64" (%s): ENOMEM" +                        "(no database handle for directory)", +                        loc->ino, loc->path); +                op_ret = -1; +                op_errno = ENOMEM; +                goto out; +        }          bfd = CALLOC (1, sizeof (*bfd)); -        op_errno = ENOMEM; -        GF_VALIDATE_OR_GOTO (this->name, bfd, out); +        if (bfd == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "OPENDIR %"PRId64" (%s): ENOMEM" +                        "(failed to allocate memory for internal fd)", +                        loc->ino, loc->path); +                op_ret = -1; +                op_errno = ENOMEM; +                goto err; +        }          bfd->dir = opendir (real_path); -        op_errno = errno; -        GF_VALIDATE_OR_GOTO (this->name, bfd->dir, out); +        if (bfd->dir == NULL) { +                op_ret   = -1; +                op_errno = errno; +                gf_log (this->name, GF_LOG_DEBUG, +                        "OPENDIR %"PRId64" (%s): %s", +                        loc->ino, loc->path, strerror (op_errno)); +                goto err; +        }          /* NOTE: bctx_lookup() return bctx with ref */          bfd->ctx = bctx;          bfd->path = strdup (real_path); -        op_errno = ENOMEM; -        GF_VALIDATE_OR_GOTO (this->name, bfd->path, out); +        if (bfd == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "OPENDIR %"PRId64" (%s): ENOMEM" +                        "(failed to allocate memory for internal fd->path)", +                        loc->ino, loc->path); +                op_ret = -1; +                op_errno = ENOMEM; +                goto err; +        } -        BDB_SET_BFD (this, fd, bfd); +        BDB_FCTX_SET (fd, this, bfd);          op_ret = 0;  out:          frame->root->rsp_refs = NULL;          STACK_UNWIND (frame, op_ret, op_errno, fd); +        return 0; +err: +        if (bctx) +                bctx_unref (bctx); +        if (bfd) { +                if (bfd->dir) +                        closedir (bfd->dir); + +                FREE (bfd); +        }          return 0;  }/* bdb_opendir */ -  int32_t  bdb_getdents (call_frame_t *frame,                xlator_t     *this, @@ -1035,192 +1170,281 @@ bdb_getdents (call_frame_t *frame,                off_t         off,                int32_t       flag)  { -        int32_t         op_ret         = -1; -        int32_t         op_errno       = EINVAL; +        struct bdb_dir *bfd        = NULL; +        int32_t         op_ret     = -1; +        int32_t         op_errno   = EINVAL; +        size_t          filled     = 0; +        dir_entry_t     entries    = {0, }; +        dir_entry_t    *this_entry = NULL; +        char           *entry_path     = NULL; +        struct dirent  *dirent         = NULL; +        off_t           in_case    = 0; +        int32_t         this_size  = 0; +        DBC            *cursorp    = NULL;          int32_t         ret            = -1;          int32_t         real_path_len  = 0;          int32_t         entry_path_len = 0;          int32_t         count          = 0; -        char           *real_path      = NULL; -        char           *entry_path     = NULL; -        char           *db_path        = NULL; -        dir_entry_t     entries        = {0, }; -        dir_entry_t    *tmp            = NULL; -        DIR            *dir            = NULL; -        struct dirent  *dirent         = NULL; -        struct bdb_dir *bfd            = NULL; +        off_t   offset = 0; +        size_t          tmp_name_len   = 0;          struct stat     db_stbuf       = {0,};          struct stat     buf            = {0,}; -        DBC            *cursorp        = NULL; -        size_t          tmp_name_len   = 0;          GF_VALIDATE_OR_GOTO ("bdb", frame, out);          GF_VALIDATE_OR_GOTO ("bdb", this, out);          GF_VALIDATE_OR_GOTO (this->name, fd, out); -        bfd = bdb_extract_bfd (fd, this); -        op_errno = EBADFD; -        GF_VALIDATE_OR_GOTO (this->name, bfd, out); +        BDB_FCTX_GET (fd, this, &bfd); +        if (bfd == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "GETDENTS %"PRId64" - %"PRId32",%"PRId64" %o: EBADFD " +                        "(failed to find internal context in fd)", +                        fd->inode->ino, size, off, flag); +                op_errno = EBADFD; +                op_ret   = -1; +                goto out; +        } -        MAKE_REAL_PATH (real_path, this, bfd->path); -        dir = bfd->dir; +        op_ret = bdb_cursor_open (bfd->ctx, &cursorp); +        if (op_ret < 0) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "GETDENTS %"PRId64" - %"PRId32",%"PRId64": EBADFD " +                        "(failed to open cursor to database handle)", +                        fd->inode->ino, size, off); +                op_errno = EBADFD; +                goto out; +        } -        while ((dirent = readdir (dir))) { -                if (!dirent) +        if (off) { +                DBT sec = {0,}, pri = {0,}, val = {0,}; +                sec.data = &(off); +                sec.size = sizeof (off); +                sec.flags = DB_DBT_USERMEM; +                val.dlen = 0; +                val.doff = 0; +                val.flags = DB_DBT_PARTIAL; + +                op_ret = bdb_cursor_get (cursorp, &sec, &pri, &val, DB_SET); +                if (op_ret == DB_NOTFOUND) { +                        offset = off; +                        goto dir_read; +                } +        } + +        while (filled <= size) { +                DBT sec = {0,}, pri = {0,}, val = {0,}; + +                this_entry = NULL; + +                sec.flags = DB_DBT_MALLOC; +                pri.flags = DB_DBT_MALLOC; +                val.dlen = 0; +                val.doff = 0; +                val.flags = DB_DBT_PARTIAL; +                op_ret = bdb_cursor_get (cursorp, &sec, &pri, &val, DB_NEXT); + +                if (op_ret == DB_NOTFOUND) { +                        /* we reached end of the directory */ +                        op_ret = 0; +                        op_errno = 0; +                        break; +                } else if (op_ret < 0) { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "GETDENTS %"PRId64" - %"PRId32",%"PRId64":" +                                "(failed to read the next entry from database)", +                                fd->inode->ino, size, off); +                        op_errno = ENOENT;                          break; +                } /* if (op_ret == DB_NOTFOUND)...else if...else */ -                if (IS_BDB_PRIVATE_FILE(dirent->d_name)) { +                if (pri.data == NULL) { +                        /* NOTE: currently ignore when we get key.data == NULL. +                         * FIXME: we should not get key.data = NULL */ +                        gf_log (this->name, GF_LOG_DEBUG, +                                "GETDENTS %"PRId64" - %"PRId32",%"PRId64":" +                                "(null key read for entry from database)", +                                fd->inode->ino, size, off);                          continue; +                }/* if(key.data)...else */ + +                this_entry = CALLOC (1, sizeof (*this_entry)); +                if (this_entry == NULL) { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "GETDENTS %"PRId64" - %"PRId32",%"PRId64" - %s:" +                                "(failed to allocate memory for an entry)", +                                fd->inode->ino, size, off, strerror (errno)); +                        op_errno = ENOMEM; +                        op_ret   = -1; +                        goto out;                  } +                this_entry->name = CALLOC (pri.size + 1, sizeof (char)); +                if (this_entry->name == NULL) { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "GETDENTS %"PRId64" - %"PRId32",%"PRId64" - %s:" +                                "(failed to allocate memory for an " +                                "entry->name)", +                                fd->inode->ino, size, off, strerror (errno)); +                        op_errno = ENOMEM; +                        op_ret   = -1; +                        goto out; +                } + +                memcpy (this_entry->name, pri.data, pri.size); +                this_entry->buf = db_stbuf; +                this_entry->buf.st_size = bdb_db_iread (bfd->ctx, +                                                        this_entry->name, NULL); +                this_entry->buf.st_blocks = BDB_COUNT_BLOCKS ( +                        this_entry->buf.st_size, +                        this_entry->buf.st_blksize); + +                this_entry->buf.st_ino = bdb_inode_transform (fd->inode->ino, +                                                              pri.data, +                                                              pri.size); +                count++; + +                this_entry->next = entries.next; +                this_entry->link = ""; +                entries.next = this_entry; +                /* if size is 0, count can never be = size, +                 * so entire dir is read */ +                if (sec.data) +                        FREE (sec.data); + +                if (pri.data) +                        FREE (pri.data); + +                if (count == size) +                        break; +        }/* while */ +        bdb_cursor_close (bfd->ctx, cursorp); +        op_ret = count; +        op_errno = 0; +        if (count >= size) +                goto out; +dir_read: +        /* hungry kyaa? */ +        if (!offset) { +                rewinddir (bfd->dir); +        } else { +                seekdir (bfd->dir, offset); +        } + +        while (filled <= size) { +                this_entry = NULL; +                this_size  = 0; + +                in_case = telldir (bfd->dir); +                dirent = readdir (bfd->dir); +                if (!dirent) +                        break; + +                if (IS_BDB_PRIVATE_FILE(dirent->d_name)) +                        continue; +                  tmp_name_len = strlen (dirent->d_name);                  if (entry_path_len < (real_path_len + 1 + (tmp_name_len) + 1)) {                          entry_path_len = real_path_len + tmp_name_len + 1024;                          entry_path = realloc (entry_path, entry_path_len); -                        op_errno = ENOMEM; -                        GF_VALIDATE_OR_GOTO (this->name, entry_path, out); +                        if (entry_path == NULL) { +                                gf_log (this->name, GF_LOG_DEBUG, +                                        "GETDENTS %"PRId64" - %"PRId32"," +                                        "%"PRId64" - %s: (failed to allocate " +                                        "memory for an entry_path)", +                                        fd->inode->ino, size, off, +                                        strerror (errno)); +                                op_errno = ENOMEM; +                                op_ret   = -1; +                                goto out; +                        }                  }                  strncpy (&entry_path[real_path_len+1], dirent->d_name,                           tmp_name_len);                  op_ret = stat (entry_path, &buf); -                op_errno = errno; -                if (op_ret != 0) { -                        gf_log (this->name, GF_LOG_ERROR, -                                "failed to lstat on %s (%s)", -                                entry_path, strerror (op_errno)); -                        goto out; +                if (op_ret < 0) { +                        op_errno = errno; +                        gf_log (this->name, GF_LOG_DEBUG, +                                "GETDENTS %"PRId64" - %"PRId32",%"PRId64" - %s:" +                                " (failed to stat on an entry '%s')", +                                fd->inode->ino, size, off, +                                strerror (errno), entry_path); +                        goto out; /* FIXME: shouldn't we continue here */                  }                  if ((flag == GF_GET_DIR_ONLY) && -                    (ret != -1 && !S_ISDIR(buf.st_mode))) { +                    ((ret != -1) && (!S_ISDIR(buf.st_mode)))) {                          continue;                  } -                tmp = CALLOC (1, sizeof (*tmp)); -                op_errno = ENOMEM; -                GF_VALIDATE_OR_GOTO (this->name, tmp, out); +                this_entry = CALLOC (1, sizeof (*this_entry)); +                if (this_entry == NULL) { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "GETDENTS %"PRId64" - %"PRId32",%"PRId64" - %s:" +                                "(failed to allocate memory for an entry)", +                                fd->inode->ino, size, off, strerror (errno)); +                        op_errno = ENOMEM; +                        op_ret   = -1; +                        goto out; +                } -                tmp->name = strdup (dirent->d_name); -                op_errno = ENOMEM; -                GF_VALIDATE_OR_GOTO (this->name, dirent->d_name, out); +                this_entry->name = strdup (dirent->d_name); +                if (this_entry->name == NULL) { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "GETDENTS %"PRId64" - %"PRId32",%"PRId64" - %s:" +                                "(failed to allocate memory for an " +                                "entry->name)", +                                fd->inode->ino, size, off, strerror (errno)); +                        op_errno = ENOMEM; +                        op_ret   = -1; +                        goto out; +                } -                memcpy (&tmp->buf, &buf, sizeof  (buf)); +                this_entry->buf = buf; -                tmp->buf.st_ino = -1; -                if (S_ISLNK(tmp->buf.st_mode)) { +                this_entry->buf.st_ino = -1; +                if (S_ISLNK(this_entry->buf.st_mode)) {                          char linkpath[ZR_PATH_MAX] = {0,};                          ret = readlink (entry_path, linkpath, ZR_PATH_MAX);                          if (ret != -1) {                                  linkpath[ret] = '\0'; -                                tmp->link = strdup (linkpath); +                                this_entry->link = strdup (linkpath);                          }                  } else { -                        tmp->link = ""; +                        this_entry->link = "";                  }                  count++; -                tmp->next = entries.next; -                entries.next = tmp; -                /* if size is 0, count can never be = size, -                   so entire dir is read */ +                this_entry->next = entries.next; +                entries.next = this_entry; +                /* if size is 0, count can never be = size, +                 * so entire dir is read */                  if (count == size)                          break;          } - -        if ((flag != GF_GET_DIR_ONLY) && (count < size)) { -                /* read from db */ -                op_ret = bdb_cursor_open (bfd->ctx, &cursorp); -                op_errno = EINVAL; -                GF_VALIDATE_OR_GOTO (this->name, (op_ret == 0), out); - -                MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this, -                                              bfd->ctx->directory); -                op_ret = lstat (db_path, &db_stbuf); -                op_errno = errno; -                if (op_ret != 0) { -                        gf_log (this->name, GF_LOG_ERROR, -                                "failed to lstat on %s (%s)", -                                db_path, strerror (op_errno)); -                        goto out; -                } - -                /* read all the entries in database, one after the other and -                 * put into dictionary */ -                while (1) { -                        DBT key = {0,}, value = {0,}; - -                        key.flags = DB_DBT_MALLOC; -                        value.flags = DB_DBT_MALLOC; -                        op_ret = bdb_cursor_get (cursorp, &key, &value, -                                                 DB_NEXT); - -                        if (op_ret == DB_NOTFOUND) { -                                gf_log (this->name, GF_LOG_DEBUG, -                                        "end of list of key/value pair in db" -                                        " for directory: %s", -                                        bfd->ctx->directory); -                                op_ret = 0; -                                op_errno = 0; -                                break; -                        } else if (op_ret != 0){ -                                gf_log (this->name, GF_LOG_ERROR, -                                        "failed to do cursor get for " -                                        "directory %s: %s", -                                        bfd->ctx->directory, -                                        db_strerror (op_ret)); -                                op_ret = -1; -                                op_errno = ENOENT; -                                break; -                        } -                        /* successfully read */ -                        tmp = CALLOC (1, sizeof (*tmp)); -                        op_errno = ENOMEM; -                        GF_VALIDATE_OR_GOTO (this->name, tmp, out); - -                        tmp->name = CALLOC (1, key.size + 1); -                        op_errno = ENOMEM; -                        GF_VALIDATE_OR_GOTO (this->name, tmp->name, out); - -                        memcpy (tmp->name, key.data, key.size); -                        tmp->buf = db_stbuf; -                        tmp->buf.st_size = bdb_db_get (bfd->ctx, NULL, -                                                       tmp->name, NULL, -                                                       0, 0); -                        tmp->buf.st_blocks = BDB_COUNT_BLOCKS (tmp->buf.st_size, \ -                                                               tmp->buf.st_blksize); -                        /* FIXME: wat will be the effect of this? */ -                        tmp->buf.st_ino = -1; -                        count++; - -                        tmp->next = entries.next; -                        tmp->link = ""; -                        entries.next = tmp; -                        /* if size is 0, count can never be = size, so entire dir is read */ -                        if (count == size) -                                break; - -                        free (key.data); -                } /* while(1){ } */ -                bdb_cursor_close (bfd->ctx, cursorp); -        } else { -                /* do nothing */ -        } -        FREE (entry_path); -        op_ret = 0; +        op_ret = filled; +        op_errno = 0;  out:          frame->root->rsp_refs = NULL; -        STACK_UNWIND (frame, op_ret, op_errno, &entries, count); + +        gf_log (this->name, GF_LOG_DEBUG, +                "GETDENTS %"PRId64" - %"PRId32" (%"PRId32")/%"PRId32"," +                "%"PRId64":" +                "(failed to read the next entry from database)", +                fd->inode->ino, filled, count, size, off); + +        STACK_UNWIND (frame, count, op_errno, &entries);          while (entries.next) { -                tmp = entries.next; +                this_entry = entries.next;                  entries.next = entries.next->next; -                FREE (tmp->name); -                FREE (tmp); +                FREE (this_entry->name); +                FREE (this_entry);          } +          return 0;  }/* bdb_getdents */ @@ -1233,34 +1457,43 @@ bdb_releasedir (xlator_t *this,          int32_t op_errno = 0;          struct bdb_dir *bfd = NULL; -        if ((bfd = bdb_extract_bfd (fd, this)) == NULL) { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to extract fd data from fd=%p", fd); -                op_ret = -1; -                op_errno = EBADF; +        BDB_FCTX_GET (fd, this, &bfd); +        if (bfd == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "RELEASEDIR %"PRId64": EBADFD", +                        fd->inode->ino); +                op_errno = EBADFD; +                op_ret   = -1; +                goto out; +        } + +        if (bfd->path) { +                free (bfd->path);          } else { -                if (bfd->path) { -                        free (bfd->path); -                } else { -                        gf_log (this->name, GF_LOG_ERROR, "bfd->path was NULL. fd=%p bfd=%p", -                                fd, bfd); -                } +                gf_log (this->name, GF_LOG_DEBUG, +                        "RELEASEDIR %"PRId64": (bfd->path is NULL)", +                        fd->inode->ino); +        } -                if (bfd->dir) { -                        closedir (bfd->dir); -                } else { -                        gf_log (this->name, GF_LOG_ERROR, -                                "bfd->dir is NULL."); -                } -                if (bfd->ctx) { -                        bctx_unref (bfd->ctx); -                } else { -                        gf_log (this->name, GF_LOG_ERROR, -                                "bfd->ctx is NULL"); -                } -                free (bfd); +        if (bfd->dir) { +                closedir (bfd->dir); +        } else { +                gf_log (this->name, GF_LOG_DEBUG, +                        "RELEASEDIR %"PRId64": (bfd->dir is NULL)", +                        fd->inode->ino);          } +        if (bfd->ctx) { +                bctx_unref (bfd->ctx); +        } else { +                gf_log (this->name, GF_LOG_DEBUG, +                        "RELEASEDIR %"PRId64": (bfd->ctx is NULL)", +                        fd->inode->ino); +        } + +        free (bfd); + +out:          return 0;  }/* bdb_releasedir */ @@ -1290,12 +1523,11 @@ bdb_readlink (call_frame_t *frame,          if (op_ret > 0)                  dest[op_ret] = 0; -        op_errno = errno; -          if (op_ret == -1) { +                op_errno = errno;                  gf_log (this->name, GF_LOG_DEBUG, -                        "readlink failed on %s: %s", -                        loc->path, strerror (op_errno)); +                        "READLINK %"PRId64" (%s): %s", +                        loc->ino, loc->path, strerror (op_errno));          }  out:          frame->root->rsp_refs = NULL; @@ -1317,57 +1549,69 @@ bdb_mkdir (call_frame_t *frame,          char *real_path = NULL;          struct stat stbuf = {0, };          bctx_t *bctx = NULL; +        char *key_string = NULL;          GF_VALIDATE_OR_GOTO ("bdb", frame, out);          GF_VALIDATE_OR_GOTO ("bdb", this, out);          GF_VALIDATE_OR_GOTO (this->name, loc, out); +        MAKE_KEY_FROM_PATH (key_string, loc->path);          MAKE_REAL_PATH (real_path, this, loc->path);          op_ret = mkdir (real_path, mode); -        op_errno = errno; -        if (op_ret != 0) { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to mkdir %s (%s)", -                        real_path, strerror (op_errno)); +        if (op_ret < 0) { +                op_errno = errno; +                gf_log (this->name, GF_LOG_DEBUG, +                        "MKDIR %"PRId64" (%s): %s", +                        loc->ino, loc->path, strerror (op_errno));                  goto out;          }          op_ret = chown (real_path, frame->root->uid, frame->root->gid); -        op_errno = errno; -        if (op_ret != 0) { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to chmod on %s (%s)", -                        real_path, strerror (op_errno)); +        if (op_ret < 0) { +                op_errno = errno; +                gf_log (this->name, GF_LOG_DEBUG, +                        "MKDIR %"PRId64" (%s): %s " +                        "(failed to do chmod)", +                        loc->ino, loc->path, strerror (op_errno));                  goto err;          }          op_ret = lstat (real_path, &stbuf); -        op_errno = errno; -        if (op_ret != 0) { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to lstat on %s (%s)", -                        real_path, strerror (op_errno)); +        if (op_ret < 0) { +                op_errno = errno; +                gf_log (this->name, GF_LOG_DEBUG, +                        "MKDIR %"PRId64" (%s): %s " +                        "(failed to do lstat)", +                        loc->ino, loc->path, strerror (op_errno));                  goto err;          }          bctx = bctx_lookup (B_TABLE(this), (char *)loc->path); -        op_errno = ENOMEM; -        GF_VALIDATE_OR_GOTO (this->name, bctx, err); +        if (bctx == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "MKDIR %"PRId64" (%s): ENOMEM" +                        "(no database handle for parent)", +                        loc->ino, loc->path); +                op_ret = -1; +                op_errno = ENOMEM; +                goto err; +        } -        stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx); +        stbuf.st_ino = bdb_inode_transform (loc->parent->ino, key_string, +                                            strlen (key_string));          goto out;  err:          ret = rmdir (real_path); -        if (ret != 0) { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to rmdir the directory created (%s)", -                        strerror (errno)); +        if (ret < 0) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "MKDIR %"PRId64" (%s): %s" +                        "(failed to do rmdir)", +                        loc->ino, loc->path, strerror (errno));          } -  out:          if (bctx) {                  /* NOTE: bctx_unref always returns success, @@ -1391,27 +1635,36 @@ bdb_unlink (call_frame_t *frame,          int32_t op_errno  = EINVAL;          bctx_t *bctx      = NULL;          char   *real_path = NULL; +        char   *key_string = NULL;          GF_VALIDATE_OR_GOTO ("bdb", frame, out);          GF_VALIDATE_OR_GOTO ("bdb", this, out);          GF_VALIDATE_OR_GOTO (this->name, loc, out);          bctx = bctx_parent (B_TABLE(this), loc->path); -        op_errno = ENOENT; -        GF_VALIDATE_OR_GOTO (this->name, bctx, out); +        if (bctx == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "UNLINK %"PRId64" (%s): ENOMEM" +                        "(no database handle for parent)", +                        loc->ino, loc->path); +                op_ret = -1; +                op_errno = ENOMEM; +                goto out; +        } -        op_ret = bdb_db_del (bctx, NULL, loc->path); +        MAKE_KEY_FROM_PATH (key_string, loc->path); +        op_ret = bdb_db_iremove (bctx, key_string);          if (op_ret == DB_NOTFOUND) {                  MAKE_REAL_PATH (real_path, this, loc->path);                  op_ret = unlink (real_path); -                op_errno = errno;                  if (op_ret != 0) { -                        gf_log (this->name, GF_LOG_ERROR, -                                "failed to unlink on %s (%s)", -                                real_path, strerror (op_errno)); +                        op_errno = errno; +                        gf_log (this->name, GF_LOG_DEBUG, +                                "UNLINK %"PRId64" (%s): %s" +                                "(symlink unlink failed)", +                                loc->ino, loc->path, strerror (op_errno));                          goto out;                  } -          } else if (op_ret == 0) {                  op_errno = 0;          } @@ -1430,7 +1683,7 @@ out: -int32_t +static int32_t  bdb_do_rmdir (xlator_t *this,                loc_t *loc)  { @@ -1448,38 +1701,46 @@ bdb_do_rmdir (xlator_t *this,          MAKE_REAL_PATH (real_path, this, loc->path);          bctx = bctx_lookup (B_TABLE(this), loc->path); -        GF_VALIDATE_OR_GOTO (this->name, bctx, out); +        if (bctx == NULL) { +                ret = -ENOMEM; +                goto out; +        }          LOCK(&bctx->lock);          { -                if (bctx->dbp == NULL) { +                if ((bctx->primary == NULL) +                    || (bctx->secondary == NULL)) {                          goto unlock;                  } -                ret = bctx->dbp->close (bctx->dbp, 0); -                GF_VALIDATE_OR_GOTO (this->name, (ret == 0), unlock); +                ret = bctx->primary->close (bctx->primary, 0); +                if (ret < 0) { +                        ret = -EINVAL; +                } -                bctx->dbp = NULL; +                ret = bctx->secondary->close (bctx->secondary, 0); +                if (ret < 0) { +                        ret = -EINVAL; +                } -                ret = dbenv->dbremove (dbenv, NULL, bctx->db_path, NULL, 0); +                ret = dbenv->dbremove (dbenv, NULL, bctx->db_path, +                                       "primary", 0); +                if (ret < 0) { +                        ret = -EBUSY; +                } + +                ret = dbenv->dbremove (dbenv, NULL, bctx->db_path, +                                       "secondary", 0);                  if (ret != 0) { -                        gf_log (this->name, GF_LOG_ERROR, -                                "failed to DB_ENV->dbremove() on path %s: %s", -                                loc->path, db_strerror (ret)); +                        ret = -EBUSY;                  }          }  unlock:          UNLOCK(&bctx->lock);          if (ret) { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to remove db %s: %s", -                        bctx->db_path, db_strerror (ret)); -                ret = -1;                  goto out;          } -        gf_log (this->name, GF_LOG_DEBUG, -                "removed db %s", bctx->db_path);          ret = rmdir (real_path);  out: @@ -1498,22 +1759,31 @@ bdb_rmdir (call_frame_t *frame,             loc_t *loc)  {          int32_t op_ret   = -1; -        int32_t op_errno = ENOTEMPTY; +        int32_t op_errno = 0; -        if (!is_dir_empty (this, loc)) { +        op_ret = is_dir_empty (this, loc); +        if (op_ret < 0) { +                op_errno = -op_ret;                  gf_log (this->name, GF_LOG_DEBUG, -                        "rmdir: directory %s not empty", -                        loc->path); +                        "RMDIR %"PRId64" (%s): %s" +                        "(internal rmdir routine returned error)", +                        loc->ino, loc->path, strerror (op_errno)); +        } else if (op_ret == 0) { +                op_ret   = -1;                  op_errno = ENOTEMPTY; -                op_ret = -1; +                gf_log (this->name, GF_LOG_DEBUG, +                        "RMDIR %"PRId64" (%s): ENOTEMPTY", +                        loc->ino, loc->path);                  goto out;          }          op_ret = bdb_do_rmdir (this, loc); -        if (op_ret != 0) { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to bdb_do_rmdir on %s", -                        loc->path); +        if (op_ret < 0) { +                op_errno = -op_ret; +                gf_log (this->name, GF_LOG_DEBUG, +                        "RMDIR %"PRId64" (%s): %s" +                        "(internal rmdir routine returned error)", +                        loc->ino, loc->path, strerror (op_errno));                  goto out;          } @@ -1536,6 +1806,7 @@ bdb_symlink (call_frame_t *frame,          struct stat         stbuf     = {0,};          struct bdb_private *private   = NULL;          bctx_t             *bctx      = NULL; +        char               *key_string = NULL;          GF_VALIDATE_OR_GOTO ("bdb", frame, out);          GF_VALIDATE_OR_GOTO ("bdb", this, out); @@ -1545,23 +1816,35 @@ bdb_symlink (call_frame_t *frame,          private = this->private;          GF_VALIDATE_OR_GOTO (this->name, private, out); +        MAKE_KEY_FROM_PATH (key_string, loc->path); +          MAKE_REAL_PATH (real_path, this, loc->path);          op_ret = symlink (linkname, real_path);          op_errno = errno;          if (op_ret == 0) {                  op_ret = lstat (real_path, &stbuf); -                op_errno = errno;                  if (op_ret != 0) { -                        gf_log (this->name, GF_LOG_ERROR, -                                "failed to lstat on %s (%s)", -                                real_path, strerror (op_errno)); +                        op_errno = errno; +                        gf_log (this->name, GF_LOG_DEBUG, +                                "SYMLINK %"PRId64" (%s): %s", +                                loc->ino, loc->path, strerror (op_errno));                          goto err;                  }                  bctx = bctx_parent (B_TABLE(this), loc->path); -                GF_VALIDATE_OR_GOTO (this->name, bctx, err); +                if (bctx == NULL) { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "SYMLINK %"PRId64" (%s): ENOMEM" +                                "(no database handle for parent)", +                                loc->ino, loc->path); +                        op_ret = -1; +                        op_errno = ENOMEM; +                        goto err; +                } -                stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx); +                stbuf.st_ino = bdb_inode_transform (loc->parent->ino, +                                                    key_string, +                                                    strlen (key_string));                  stbuf.st_mode = private->symlink_mode;                  goto out; @@ -1570,9 +1853,10 @@ err:          op_ret = unlink (real_path);          op_errno = errno;          if (op_ret != 0) { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to unlink the previously created symlink (%s)", -                        strerror (op_errno)); +               gf_log (this->name, GF_LOG_DEBUG, +                       "SYMLINK %"PRId64" (%s): %s" +                       "(failed to unlink the created symlink)", +                       loc->ino, loc->path, strerror (op_errno));          }          op_ret = -1;          op_errno = ENOENT; @@ -1608,9 +1892,14 @@ bdb_chmod (call_frame_t *frame,          op_ret = lstat (real_path, &stbuf);          op_errno = errno;          if (op_ret != 0) { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to lstat on %s (%s)", -                        real_path, strerror (op_errno)); +                if (op_errno == ENOENT) { +                        op_errno = EPERM; +                } else { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "CHMOD %"PRId64" (%s): %s" +                                "(lstat failed)", +                                loc->ino, loc->path, strerror (op_errno)); +                }                  goto out;          } @@ -1644,11 +1933,16 @@ bdb_chown (call_frame_t *frame,          MAKE_REAL_PATH (real_path, this, loc->path);          op_ret = lstat (real_path, &stbuf); -        op_errno = errno;          if (op_ret != 0) { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to lstat on %s (%s)", -                        real_path, strerror (op_errno)); +                op_errno = errno; +                if (op_errno == ENOENT) { +                        op_errno = EPERM; +                } else { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "CHOWN %"PRId64" (%s): %s" +                                "(lstat failed)", +                                loc->ino, loc->path, strerror (op_errno)); +                }                  goto out;          } @@ -1682,8 +1976,15 @@ bdb_truncate (call_frame_t *frame,          GF_VALIDATE_OR_GOTO (this->name, loc, out);          bctx = bctx_parent (B_TABLE(this), loc->path); -        op_errno = ENOENT; -        GF_VALIDATE_OR_GOTO (this->name, bctx, out); +        if (bctx == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "TRUNCATE %"PRId64" (%s): ENOMEM" +                        "(no database handle for parent)", +                        loc->ino, loc->path); +                op_ret = -1; +                op_errno = ENOMEM; +                goto out; +        }          MAKE_REAL_PATH (real_path, this, loc->path);          MAKE_KEY_FROM_PATH (key_string, loc->path); @@ -1691,26 +1992,29 @@ bdb_truncate (call_frame_t *frame,          /* now truncate */          MAKE_REAL_PATH_TO_STORAGE_DB (db_path, this, bctx->directory);          op_ret = lstat (db_path, &stbuf); -        op_errno = errno;          if (op_ret != 0) { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to lstat on %s (%s)", -                        db_path, strerror (op_errno)); +                op_errno = errno; +                gf_log (this->name, GF_LOG_DEBUG, +                        "TRUNCATE %"PRId64" (%s): %s" +                        "(lstat on database file failed)", +                        loc->ino, loc->path, strerror (op_errno));                  goto out;          }          if (loc->inode->ino) {                  stbuf.st_ino = loc->inode->ino;          }else { -                stbuf.st_ino = bdb_inode_transform (stbuf.st_ino, bctx); +                stbuf.st_ino = bdb_inode_transform (loc->parent->ino, +                                                    key_string, +                                                    strlen (key_string));          } -        op_ret = bdb_db_put (bctx, NULL, key_string, NULL, 0, 1, 0); -        if (op_ret == -1) { +        op_ret = bdb_db_itruncate (bctx, key_string); +        if (op_ret < 0) {                  gf_log (this->name, GF_LOG_DEBUG, -                        "failed to do bdb_db_put: %s", -                        db_strerror (op_ret)); -                op_ret = -1; +                        "TRUNCATE %"PRId64" (%s): EINVAL" +                        "(truncating entry in  database failed - %s)", +                        loc->ino, loc->path, db_strerror (op_ret));                  op_errno = EINVAL; /* TODO: better errno */          } @@ -1745,40 +2049,44 @@ bdb_utimens (call_frame_t *frame,          GF_VALIDATE_OR_GOTO (this->name, loc, out);          MAKE_REAL_PATH (real_path, this, loc->path); -        op_ret = lstat (real_path, &stbuf); -        op_errno = errno; +        op_ret = sys_lstat (real_path, &stbuf);          if (op_ret != 0) { -                op_errno = EPERM; -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to lstat on %s (%s)", -                        real_path, strerror (op_errno)); +                op_errno = errno; +                if (op_errno == ENOENT) { +                        op_errno = EPERM; +                } else { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "UTIMENS %"PRId64" (%s): %s", +                                loc->ino, loc->path, strerror (op_errno)); +                }                  goto out;          }          /* directory or symlink */ -        tv[0].tv_sec = ts[0].tv_sec; +        tv[0].tv_sec  = ts[0].tv_sec;          tv[0].tv_usec = ts[0].tv_nsec / 1000; -        tv[1].tv_sec = ts[1].tv_sec; +        tv[1].tv_sec  = ts[1].tv_sec;          tv[1].tv_usec = ts[1].tv_nsec / 1000;          op_ret = lutimes (real_path, tv); -        if (op_ret == -1 && errno == ENOSYS) { -                op_ret = utimes (real_path, tv); +        if ((op_ret == -1) && (errno == ENOSYS)) { +                op_ret = sys_utimes (real_path, tv);          } -        op_errno = errno; +          if (op_ret == -1) { -                gf_log (this->name, GF_LOG_WARNING, -                        "utimes on %s failed: %s", -                        loc->path, strerror (op_errno)); +                op_errno = errno; +                gf_log (this->name, GF_LOG_DEBUG, +                        "UTIMENS %"PRId64" (%s): %s", +                        loc->ino, loc->path, strerror (op_errno));                  goto out;          } -        op_ret = lstat (real_path, &stbuf); -        op_errno = errno; +        op_ret = sys_lstat (real_path, &stbuf);          if (op_ret != 0) { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to lstat on %s (%s)", -                        real_path, strerror (op_errno)); +                op_errno = errno; +                gf_log (this->name, GF_LOG_DEBUG, +                        "UTIMENS %"PRId64" (%s): %s", +                        loc->ino, loc->path, strerror (op_errno));                  goto out;          } @@ -1858,52 +2166,54 @@ bdb_setxattr (call_frame_t *frame,          MAKE_REAL_PATH (real_path, this, loc->path);          if (!S_ISDIR (loc->inode->st_mode)) {                  op_ret   = -1; -                op_errno = EPERM; +                op_errno = ENOATTR;                  goto out;          }          while (trav) { -                if (ZR_FILE_CONTENT_REQUEST(trav->key) ) { -                        bctx = bctx_lookup (B_TABLE(this), loc->path); -                        op_errno = EINVAL; -                        GF_VALIDATE_OR_GOTO (this->name, bctx, out); +                if (GF_FILE_CONTENT_REQUEST(trav->key) ) { +                        key = BDB_KEY_FROM_FREQUEST_KEY(trav->key); -                        key = &(trav->key[15]); +                        bctx = bctx_lookup (B_TABLE(this), loc->path); +                        if (bctx == NULL) { +                                gf_log (this->name, GF_LOG_DEBUG, +                                        "SETXATTR %"PRId64" (%s) - %s: ENOMEM" +                                        "(no database handle for directory)", +                                        loc->ino, loc->path, key); +                                op_ret = -1; +                                op_errno = ENOMEM; +                                goto out; +                        }                          if (flags & XATTR_REPLACE) { -                                /* replace only if previously exists, otherwise -                                 * error out */ -                                op_ret = bdb_db_get (bctx, NULL, key, -                                                     NULL, 0, 0); +                                op_ret = bdb_db_itruncate (bctx, key);                                  if (op_ret == -1) {                                          /* key doesn't exist in database */                                          gf_log (this->name, GF_LOG_DEBUG, -                                                "cannot XATTR_REPLACE, xattr %s" -                                                " doesn't exist on path %s", -                                                key, loc->path); +                                                "SETXATTR %"PRId64" (%s) - %s:" +                                                " (entry not present in " +                                                "database)", +                                                loc->ino, loc->path, key);                                          op_ret = -1; -                                        op_errno = ENOENT; +                                        op_errno = ENOATTR;                                          break;                                  } -                                op_ret = bdb_db_put (bctx, NULL, -                                                     key, trav->value->data, -                                                     trav->value->len, -                                                     op_ret, -                                                     BDB_TRUNCATE_RECORD); +                                op_ret = bdb_db_iwrite (bctx, key, +                                                        trav->value->data, +                                                        trav->value->len);                                  if (op_ret != 0) {                                          op_ret   = -1; -                                        op_errno = EINVAL; +                                        op_errno = ENOATTR;                                          break;                                  }                          } else {                                  /* fresh create */ -                                op_ret = bdb_db_put (bctx, NULL, key, -                                                     trav->value->data, -                                                     trav->value->len, -                                                     0, 0); +                                op_ret = bdb_db_iwrite (bctx, key, +                                                        trav->value->data, +                                                        trav->value->len);                                  if (op_ret != 0) {                                          op_ret   = -1; -                                        op_errno = EINVAL; +                                        op_errno = EEXIST;                                          break;                                  } else {                                          op_ret = 0; @@ -1918,25 +2228,26 @@ bdb_setxattr (call_frame_t *frame,                  } else {                          /* do plain setxattr */                          op_ret = lsetxattr (real_path, -                                            trav->key, -                                            trav->value->data, +                                            trav->key, trav->value->data,                                              trav->value->len,                                              flags);                          op_errno = errno; -                        if ((op_ret == -1) && (op_errno != ENOENT)) { -                                if (op_errno == ENOTSUP) { -                                        gf_bdb_xattr_log++; -                                        if (!(gf_bdb_xattr_log % GF_UNIVERSAL_ANSWER)) { -                                                gf_log (this->name, GF_LOG_WARNING, -                                                        "Extended Attributes support not present."\ -                                                        "Please check"); -                                        } -                                } else { -                                        gf_log (this->name, GF_LOG_DEBUG, -                                                "setxattr failed on %s (%s)", -                                                loc->path, strerror (op_errno)); -                                } + +                        if ((op_errno == ENOATTR) || (op_errno == EEXIST)) { +                                /* don't log, normal behaviour */ +                                ; +                        } else if (BDB_TIMED_LOG (op_errno, gf_bdb_xattr_log)) { +                                gf_log (this->name, GF_LOG_DEBUG, +                                        "SETXATTR %"PRId64" (%s) - %s: %s", +                                        loc->ino, loc->path, trav->key, +                                        strerror (op_errno)); +                                /* do not continue, break out */                                  break; +                        } else { +                                gf_log (this->name, GF_LOG_DEBUG, +                                        "SETXATTR %"PRId64" (%s) - %s: %s", +                                        loc->ino, loc->path, trav->key, +                                        strerror (op_errno));                          }                  } /* if(ZR_FILE_CONTENT_REQUEST())...else */                  trav = trav->next; @@ -1988,109 +2299,131 @@ bdb_getxattr (call_frame_t *frame,          GF_VALIDATE_OR_GOTO (this->name, loc, out);          GF_VALIDATE_OR_GOTO (this->name, name, out); -        dict = get_new_dict (); +        dict = dict_new ();          GF_VALIDATE_OR_GOTO (this->name, dict, out);          if (!S_ISDIR (loc->inode->st_mode)) {                  gf_log (this->name, GF_LOG_DEBUG, -                        "operation not permitted on a non-directory file: %s", -                        loc->path); -                op_ret   = -1; -                op_errno = ENODATA; +                        "GETXATTR %"PRId64" (%s) - %s: ENOATTR " +                        "(not a directory)", +                        loc->ino, loc->path, name); +                op_ret = -1; +                op_errno = ENOATTR;                  goto out;          } -        if (name && ZR_FILE_CONTENT_REQUEST(name)) { +        if (name && GF_FILE_CONTENT_REQUEST(name)) {                  bctx = bctx_lookup (B_TABLE(this), loc->path); -                op_errno = EINVAL; -                GF_VALIDATE_OR_GOTO (this->name, bctx, out); +                if (bctx == NULL) { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "GETXATTR %"PRId64" (%s) - %s: ENOMEM" +                                "(no database handle for directory)", +                                loc->ino, loc->path, name); +                        op_ret = -1; +                        op_errno = ENOMEM; +                        goto out; +                } -                key_string = (char *)&(name[15]); +                key_string = BDB_KEY_FROM_FREQUEST_KEY(name); -                op_ret = bdb_db_get (bctx, NULL, key_string, &buf, 0, 0); +                op_ret = bdb_db_iread (bctx, key_string, &buf);                  if (op_ret == -1) {                          gf_log (this->name, GF_LOG_DEBUG, -                                "failed to db get on directory: %s for key: %s", -                                bctx->directory, name); -                        op_ret   = -1; -                        op_errno = ENODATA; +                                "GETXATTR %"PRId64" (%s) - %s: ENOATTR" +                                "(attribute not present in database)", +                                loc->ino, loc->path, name); +                        op_errno = ENOATTR;                          goto out;                  }                  op_ret = dict_set_dynptr (dict, (char *)name, buf, op_ret);                  if (op_ret < 0) {                          gf_log (this->name, GF_LOG_DEBUG, -                                "failed to set to dictionary"); -                        op_ret = -1; +                                "GETXATTR %"PRId64" (%s) - %s: ENOATTR" +                                "(attribute present in database, " +                                "dict set failed)", +                                loc->ino, loc->path, name);                          op_errno = ENODATA;                  } -        } else { -                MAKE_REAL_PATH (real_path, this, loc->path); -                size = llistxattr (real_path, NULL, 0); -                op_errno = errno; -                if (size <= 0) { -                        /* There are no extended attributes, send an empty -                         * dictionary */ -                        if (size == -1 && op_errno != ENODATA) { -                                if (op_errno == ENOTSUP) { -                                        gf_bdb_xattr_log++; -                                        if (!(gf_bdb_xattr_log % GF_UNIVERSAL_ANSWER)) -                                                gf_log (this->name, GF_LOG_WARNING, -                                                        "Extended Attributes support not present."\ -                                                        "Please check"); -                                } else { -                                        gf_log (this->name, GF_LOG_WARNING, -                                                "llistxattr failed on %s (%s)", -                                                loc->path, strerror (op_errno)); -                                } -                        } -                        op_ret = -1; -                        op_errno = ENODATA; + +                goto out; +        } + +        MAKE_REAL_PATH (real_path, this, loc->path); +        size = sys_llistxattr (real_path, NULL, 0); +        op_errno = errno; +        if (size < 0) { +                if (BDB_TIMED_LOG (op_errno, gf_bdb_xattr_log)) { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "GETXATTR %"PRId64" (%s) - %s: %s", +                                loc->ino, loc->path, name, strerror (op_errno));                  } else { -                        list = alloca (size + 1); -                        op_errno = ENOMEM; -                        GF_VALIDATE_OR_GOTO (this->name, list, out); +                        gf_log (this->name, GF_LOG_DEBUG, +                                "GETXATTR %"PRId64" (%s) - %s: %s", +                                loc->ino, loc->path, name, strerror (op_errno)); +                } +                op_ret = -1; +                op_errno = ENOATTR; -                        size = llistxattr (real_path, list, size); -                        op_ret = size; -                        op_errno = errno; -                        if (size == -1) { -                                gf_log (this->name, GF_LOG_ERROR, -                                        "llistxattr failed on %s (%s)", -                                        loc->path, strerror (errno)); -                                goto out; -                        } -                        remaining_size = size; -                        list_offset = 0; -                        while (remaining_size > 0) { -                                if(*(list+list_offset) == '\0') -                                        break; -                                strcpy (key, list + list_offset); -                                op_ret = lgetxattr (real_path, key, NULL, 0); -                                if (op_ret == -1) -                                        break; -                                value = CALLOC (op_ret + 1, sizeof(char)); -                                GF_VALIDATE_OR_GOTO (this->name, value, out); +                goto out; +        } -                                op_ret = lgetxattr (real_path, key, value, -                                                    op_ret); -                                if (op_ret == -1) -                                        break; -                                value [op_ret] = '\0'; -                                op_ret = dict_set_dynptr (dict, key, -                                                          value, op_ret); -                                if (op_ret < 0) { -                                        FREE (value); -                                        gf_log (this->name, GF_LOG_DEBUG, -                                                "skipping key %s", key); -                                        continue; -                                } -                                remaining_size -= strlen (key) + 1; -                                list_offset += strlen (key) + 1; -                        } /* while(remaining_size>0) */ -                } /* if(size <= 0)...else */ -        } /* if(name...)...else */ +        if (size == 0) +                goto done; + +        list = alloca (size + 1); +        if (list == NULL) { +                op_ret   = -1; +                op_errno = errno; +                gf_log (this->name, GF_LOG_DEBUG, +                        "GETXATTR %"PRId64" (%s) - %s: %s", +                        loc->ino, loc->path, name, strerror (op_errno)); +        } + +        size = sys_llistxattr (real_path, list, size); +        op_ret   = size; +        if (op_ret == -1) { +                op_errno = errno; +                gf_log (this->name, GF_LOG_DEBUG, +                        "GETXATTR %"PRId64" (%s) - %s: %s", +                        loc->ino, loc->path, name, strerror (op_errno)); +                goto out; +        } +        remaining_size = size; +        list_offset = 0; +        while (remaining_size > 0) { +                if(*(list+list_offset) == '\0') +                        break; + +                strcpy (key, list + list_offset); + +                op_ret = sys_lgetxattr (real_path, key, NULL, 0); +                if (op_ret == -1) +                        break; + +                value = CALLOC (op_ret + 1, sizeof(char)); +                GF_VALIDATE_OR_GOTO (this->name, value, out); + +                op_ret = sys_lgetxattr (real_path, key, value, +                                        op_ret); +                if (op_ret == -1) +                        break; +                value [op_ret] = '\0'; +                op_ret = dict_set_dynptr (dict, key, +                                          value, op_ret); +                if (op_ret < 0) { +                        FREE (value); +                        gf_log (this->name, GF_LOG_DEBUG, +                                "GETXATTR %"PRId64" (%s) - %s: " +                                "(skipping key %s)", +                                loc->ino, loc->path, name, key); +                        continue; +                } +                remaining_size -= strlen (key) + 1; +                list_offset += strlen (key) + 1; +        } /* while(remaining_size>0) */ +done:  out:          if(bctx) {                  /* NOTE: bctx_unref always returns success, @@ -2098,9 +2431,6 @@ out:                  bctx_unref (bctx);          } -        if (dict) -                dict_ref (dict); -          STACK_UNWIND (frame, op_ret, op_errno, dict);          if (dict) @@ -2127,45 +2457,52 @@ bdb_removexattr (call_frame_t *frame,          GF_VALIDATE_OR_GOTO (this->name, name, out);          if (!S_ISDIR(loc->inode->st_mode)) { -                gf_log (this->name, GF_LOG_WARNING, -                        "operation not permitted on non-directory files"); +                gf_log (this->name, GF_LOG_DEBUG, +                        "REMOVEXATTR %"PRId64" (%s) - %s: ENOATTR " +                        "(not a directory)", +                        loc->ino, loc->path, name);                  op_ret = -1; -                op_errno = EPERM; +                op_errno = ENOATTR;                  goto out;          } -        if (ZR_FILE_CONTENT_REQUEST(name)) { +        if (GF_FILE_CONTENT_REQUEST(name)) {                  bctx = bctx_lookup (B_TABLE(this), loc->path); -                op_errno = EINVAL; -                GF_VALIDATE_OR_GOTO (this->name, bctx, out); - -                op_ret = bdb_db_del (bctx, NULL, name); -                if (op_ret == -1) { -                        gf_log (this->name, GF_LOG_ERROR, -                                "failed to delete %s from db of %s directory", -                                name, loc->path); -                        op_errno = EINVAL; /* TODO: errno */ +                if (bctx == NULL) { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "REMOVEXATTR %"PRId64" (%s) - %s: ENOATTR" +                                "(no database handle for directory)", +                                loc->ino, loc->path, name); +                        op_ret = -1; +                        op_errno = ENOATTR;                          goto out;                  } -        } else { -                MAKE_REAL_PATH(real_path, this, loc->path); -                op_ret = lremovexattr (real_path, name); -                op_errno = errno; + +                op_ret = bdb_db_iremove (bctx, name);                  if (op_ret == -1) { -                        if (op_errno == ENOTSUP) { -                                gf_bdb_xattr_log++; -                                if (!(gf_bdb_xattr_log % GF_UNIVERSAL_ANSWER)) -                                        gf_log (this->name, GF_LOG_WARNING, -                                                "Extended Attributes support not present." -                                                "Please check"); -                        } else { -                                gf_log (this->name, GF_LOG_WARNING, -                                        "%s: %s", -                                        loc->path, strerror (op_errno)); -                        } -                } /* if(op_ret == -1) */ -        } /* if (ZR_FILE_CONTENT_REQUEST(name))...else */ +                        gf_log (this->name, GF_LOG_DEBUG, +                                "REMOVEXATTR %"PRId64" (%s) - %s: ENOATTR" +                                "(no such attribute in database)", +                                loc->ino, loc->path, name); +                        op_errno = ENOATTR; +                } +                goto out; +        } +        MAKE_REAL_PATH(real_path, this, loc->path); +        op_ret = lremovexattr (real_path, name); +        op_errno = errno; +        if (op_ret == -1) { +                if (BDB_TIMED_LOG (op_errno, gf_bdb_xattr_log)) { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "REMOVEXATTR %"PRId64" (%s) - %s: %s", +                                loc->ino, loc->path, name, strerror (op_errno)); +                } else { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "REMOVEXATTR %"PRId64" (%s) - %s: %s", +                                loc->ino, loc->path, name, strerror (op_errno)); +                } +        } /* if(op_ret == -1) */  out:          if (bctx) {                  /* NOTE: bctx_unref always returns success, @@ -2195,9 +2532,15 @@ bdb_fsyncdir (call_frame_t *frame,          frame->root->rsp_refs = NULL; -        bfd = bdb_extract_bfd (fd, this); -        op_errno = EBADFD; -        GF_VALIDATE_OR_GOTO (this->name, bfd, out); +        BDB_FCTX_GET (fd, this, &bfd); +        if (bfd == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "FSYNCDIR %"PRId64": EBADFD" +                        "(failed to find internal context from fd)", +                        fd->inode->ino); +                op_errno = EBADFD; +                op_ret   = -1; +        }  out:          STACK_UNWIND (frame, op_ret, op_errno); @@ -2321,9 +2664,15 @@ bdb_setdents (call_frame_t *frame,          frame->root->rsp_refs = NULL; -        bfd = bdb_extract_bfd (fd, this); -        op_errno = EBADFD; -        GF_VALIDATE_OR_GOTO (this->name, bfd, out); +        BDB_FCTX_GET (fd, this, &bfd); +        if (bfd == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "SETDENTS %"PRId64": EBADFD", +                        fd->inode->ino); +                op_errno = EBADFD; +                op_ret   = -1; +                goto out; +        }          real_path_len = strlen (bfd->path);          entry_path_len = real_path_len + 256; @@ -2346,60 +2695,68 @@ bdb_setdents (call_frame_t *frame,                           */                          ret = mkdir (pathname, trav->buf.st_mode);                          if ((ret == -1) && (errno != EEXIST)) { -                                gf_log (this->name, GF_LOG_ERROR, -                                        "failed to created directory %s: %s", -                                        pathname, strerror(errno)); +                                op_errno = errno; +                                op_ret   = ret; +                                gf_log (this->name, GF_LOG_DEBUG, +                                        "SETDENTS %"PRId64" - %s: %s " +                                        "(mkdir failed)", +                                        fd->inode->ino, pathname, +                                        strerror (op_errno));                                  goto loop;                          } -                        gf_log (this->name, GF_LOG_DEBUG, -                                "Creating directory %s with mode (0%o)", -                                pathname, -                                trav->buf.st_mode);                          /* Change the mode                           * NOTE: setdents tries its best to restore the state                           *       of storage. if chmod and chown fail, they can                           *       be ignored now */                          ret = chmod (pathname, trav->buf.st_mode); -                        if (ret != 0) { -                                op_ret = -1; +                        if (ret < 0) { +                                op_ret   = -1;                                  op_errno = errno; -                                gf_log (this->name, GF_LOG_ERROR, -                                        "chmod failed on %s (%s)", -                                        pathname, strerror (errno)); +                                gf_log (this->name, GF_LOG_DEBUG, +                                        "SETDENTS %"PRId64" - %s: %s " +                                        "(chmod failed)", +                                        fd->inode->ino, pathname, +                                        strerror (op_errno));                                  goto loop;                          }                          /* change the ownership */                          ret = chown (pathname, trav->buf.st_uid,                                       trav->buf.st_gid);                          if (ret != 0) { -                                op_ret = -1; +                                op_ret   = -1;                                  op_errno = errno; -                                gf_log (this->name, GF_LOG_ERROR, -                                        "chown failed on %s (%s)", -                                        pathname, strerror (errno)); +                                gf_log (this->name, GF_LOG_DEBUG, +                                        "SETDENTS %"PRId64" - %s: %s " +                                        "(chown failed)", +                                        fd->inode->ino, pathname, +                                        strerror (op_errno));                                  goto loop;                          }                  } else if ((flags == GF_SET_IF_NOT_PRESENT) ||                             (flags != GF_SET_DIR_ONLY)) {                          /* Create a 0 byte file here */                          if (S_ISREG (trav->buf.st_mode)) { -                                op_ret = bdb_db_put (bfd->ctx, NULL, -                                                     trav->name, NULL, 0, 0, 0); -                                if (op_ret != 0) { -                                        /* create successful */ -                                        gf_log (this->name, GF_LOG_ERROR, -                                                "failed to create file %s", -                                                pathname); -                                } /* if (!op_ret)...else */ +                                op_ret = bdb_db_icreate (bfd->ctx, +                                                         trav->name); +                                if (op_ret < 0) { +                                        gf_log (this->name, GF_LOG_DEBUG, +                                                "SETDENTS %"PRId64" (%s) - %s: " +                                                "%s (database entry creation" +                                                " failed)", +                                                fd->inode->ino, +                                                bfd->ctx->directory, trav->name, +                                                strerror (op_errno)); +                                }                          } else if (S_ISLNK (trav->buf.st_mode)) {                                  /* TODO: impelement */;                          } else { -                                gf_log (this->name, GF_LOG_ERROR, -                                        "storage/bdb allows to create regular" -                                        " files only file %s (mode = %d) cannot" -                                        " be created", -                                        pathname, trav->buf.st_mode); +                                gf_log (this->name, GF_LOG_DEBUG, +                                        "SETDENTS %"PRId64" (%s) - %s mode=%o: " +                                        "(unsupported file type)", +                                        fd->inode->ino, +                                        bfd->ctx->directory, trav->name, +                                        trav->buf.st_mode);                          } /* if(S_ISREG())...else */                  } /* if(S_ISDIR())...else if */          loop: @@ -2431,9 +2788,16 @@ bdb_fstat (call_frame_t *frame,          GF_VALIDATE_OR_GOTO ("bdb", this, out);          GF_VALIDATE_OR_GOTO (this->name, fd, out); -        bfd      = bdb_extract_bfd (fd, this); -        op_errno = EBADFD; -        GF_VALIDATE_OR_GOTO (this->name, bfd, out); +        BDB_FCTX_GET (fd, this, &bfd); +        if (bfd == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "FSTAT %"PRId64": EBADFD " +                        "(failed to find internal context in fd)", +                        fd->inode->ino); +                op_errno = EBADFD; +                op_ret   = -1; +                goto out; +        }          bctx = bfd->ctx; @@ -2441,14 +2805,15 @@ bdb_fstat (call_frame_t *frame,          op_ret = lstat (db_path, &stbuf);          op_errno = errno;          if (op_ret != 0) { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to lstat on %s (%s)", -                        db_path, strerror (op_errno)); +                gf_log (this->name, GF_LOG_DEBUG, +                        "FSTAT %"PRId64": %s" +                        "(failed to stat database file %s)", +                        fd->inode->ino, strerror (op_errno), db_path);                  goto out;          }          stbuf.st_ino = fd->inode->ino; -        stbuf.st_size = bdb_db_get (bctx, NULL, bfd->key, NULL, 0, 0); +        stbuf.st_size = bdb_db_fread (bfd, NULL, 0, 0);          stbuf.st_blocks = BDB_COUNT_BLOCKS (stbuf.st_size, stbuf.st_blksize);  out: @@ -2458,6 +2823,20 @@ out:          return 0;  } +gf_dirent_t * +gf_dirent_for_namen (const char *name, +                     size_t len) +{ +        char *tmp_name = NULL; + +        tmp_name = alloca (len + 1); + +        memcpy (tmp_name, name, len); + +        tmp_name[len] = 0; + +        return gf_dirent_for_name (tmp_name); +}  int32_t  bdb_readdir (call_frame_t *frame, @@ -2477,6 +2856,7 @@ bdb_readdir (call_frame_t *frame,          int32_t         this_size  = 0;          DBC            *cursorp    = NULL;          int32_t count = 0; +        off_t   offset = 0;          GF_VALIDATE_OR_GOTO ("bdb", frame, out);          GF_VALIDATE_OR_GOTO ("bdb", this, out); @@ -2484,137 +2864,164 @@ bdb_readdir (call_frame_t *frame,          INIT_LIST_HEAD (&entries.list); -        bfd = bdb_extract_bfd (fd, this); -        op_errno = EBADFD; -        GF_VALIDATE_OR_GOTO (this->name, bfd, out); - -        op_errno = ENOMEM; - -        while (filled <= size) { -                this_entry = NULL; -                entry      = NULL; -                in_case    = 0; -                this_size  = 0; - -                in_case = telldir (bfd->dir); -                entry = readdir (bfd->dir); -                if (!entry) -                        break; - -                if (IS_BDB_PRIVATE_FILE(entry->d_name)) -                        continue; - -                this_size = dirent_size (entry); - -                if (this_size + filled > size) { -                        seekdir (bfd->dir, in_case); -                        break; -                } - -                count++; - -                this_entry = gf_dirent_for_name (entry->d_name); -                this_entry->d_ino = entry->d_ino; - -                this_entry->d_off = -1; - -                this_entry->d_type = entry->d_type; -                this_entry->d_len = entry->d_reclen; - - -                list_add (&this_entry->list, &entries.list); - -                filled += this_size; -        } -        op_ret = filled; -        op_errno = 0; -        if (filled >= size) { +        BDB_FCTX_GET (fd, this, &bfd); +        if (bfd == NULL) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "READDIR %"PRId64" - %"PRId32",%"PRId64": EBADFD " +                        "(failed to find internal context in fd)", +                        fd->inode->ino, size, off); +                op_errno = EBADFD; +                op_ret   = -1;                  goto out;          } -        /* hungry kyaa? */          op_ret = bdb_cursor_open (bfd->ctx, &cursorp); -        op_errno = EBADFD; -        GF_VALIDATE_OR_GOTO (this->name, (op_ret == 0), out); - -        /* TODO: fix d_off, don't use bfd->offset. wrong method */ -        if (strlen (bfd->offset)) { -                DBT key = {0,}, value = {0,}; -                key.data = bfd->offset; -                key.size = strlen (bfd->offset); -                key.flags = DB_DBT_USERMEM; -                value.dlen = 0; -                value.doff = 0; -                value.flags = DB_DBT_PARTIAL; - -                op_ret = bdb_cursor_get (cursorp, &key, &value, DB_SET); +        if (op_ret < 0) { +                gf_log (this->name, GF_LOG_DEBUG, +                        "READDIR %"PRId64" - %"PRId32",%"PRId64": EBADFD " +                        "(failed to open cursor to database handle)", +                        fd->inode->ino, size, off);                  op_errno = EBADFD; -                GF_VALIDATE_OR_GOTO (this->name, (op_ret == 0), out); +                goto out; +        } -        } else { -                /* first time or last time, do nothing */ +        if (off) { +                DBT sec = {0,}, pri = {0,}, val = {0,}; +                sec.data = &(off); +                sec.size = sizeof (off); +                sec.flags = DB_DBT_USERMEM; +                val.dlen = 0; +                val.doff = 0; +                val.flags = DB_DBT_PARTIAL; + +                op_ret = bdb_cursor_get (cursorp, &sec, &pri, &val, DB_SET); +                if (op_ret == DB_NOTFOUND) { +                        offset = off; +                        goto dir_read; +                }          }          while (filled <= size) { -                DBT key = {0,}, value = {0,}; +                DBT sec = {0,}, pri = {0,}, val = {0,}; +                  this_entry = NULL; -                key.flags = DB_DBT_MALLOC; -                value.dlen = 0; -                value.doff = 0; -                value.flags = DB_DBT_PARTIAL; -                op_ret = bdb_cursor_get (cursorp, &key, &value, DB_NEXT); +                sec.flags = DB_DBT_MALLOC; +                pri.flags = DB_DBT_MALLOC; +                val.dlen = 0; +                val.doff = 0; +                val.flags = DB_DBT_PARTIAL; +                op_ret = bdb_cursor_get (cursorp, &sec, &pri, &val, DB_NEXT);                  if (op_ret == DB_NOTFOUND) {                          /* we reached end of the directory */                          op_ret = 0;                          op_errno = 0;                          break; -                } else if (op_ret != 0) { +                } else if (op_ret < 0) {                          gf_log (this->name, GF_LOG_DEBUG, -                                "database error during readdir"); -                        op_ret = -1; +                                "READDIR %"PRId64" - %"PRId32",%"PRId64":" +                                "(failed to read the next entry from database)", +                                fd->inode->ino, size, off);                          op_errno = ENOENT;                          break;                  } /* if (op_ret == DB_NOTFOUND)...else if...else */ -                if (key.data == NULL) { +                if (pri.data == NULL) {                          /* NOTE: currently ignore when we get key.data == NULL.                           * TODO: we should not get key.data = NULL */                          gf_log (this->name, GF_LOG_DEBUG, -                                "null key read from db"); +                                "READDIR %"PRId64" - %"PRId32",%"PRId64":" +                                "(null key read for entry from database)", +                                fd->inode->ino, size, off);                          continue;                  }/* if(key.data)...else */                  count++; -                this_size = bdb_dirent_size (&key); +                this_size = bdb_dirent_size (&pri);                  if (this_size + filled > size)                          break;                  /* TODO - consider endianness here */ -                this_entry = gf_dirent_for_name ((const char *)key.data); -                /* FIXME: bug, if someone is going to use ->d_ino */ -                this_entry->d_ino = -1; -                this_entry->d_off = 0; +                this_entry = gf_dirent_for_namen ((const char *)pri.data, +                                                  pri.size); + +                this_entry->d_ino = bdb_inode_transform (fd->inode->ino, +                                                         pri.data, +                                                         pri.size); +                this_entry->d_off = *(uint32_t *)sec.data;                  this_entry->d_type = 0; -                this_entry->d_len = key.size; +                this_entry->d_len = pri.size + 1; -                if (key.data) { -                        strncpy (bfd->offset, key.data, key.size); -                        bfd->offset [key.size] = '\0'; -                        free (key.data); +                if (sec.data) { +                        FREE (sec.data);                  } -                list_add (&this_entry->list, &entries.list); +                if (pri.data) +                        FREE (pri.data); + +                list_add_tail (&this_entry->list, &entries.list);                  filled += this_size;          }/* while */          bdb_cursor_close (bfd->ctx, cursorp);          op_ret = filled;          op_errno = 0; +        if (filled >= size) { +                goto out; +        } +dir_read: +        /* hungry kyaa? */ +        if (!offset) { +                rewinddir (bfd->dir); +        } else { +                seekdir (bfd->dir, offset); +        } + +        while (filled <= size) { +                this_entry = NULL; +                entry      = NULL; +                this_size  = 0; + +                in_case = telldir (bfd->dir); +                entry = readdir (bfd->dir); +                if (!entry) +                        break; + +                if (IS_BDB_PRIVATE_FILE(entry->d_name)) +                        continue; + +                this_size = dirent_size (entry); + +                if (this_size + filled > size) { +                        seekdir (bfd->dir, in_case); +                        break; +                } + +                count++; + +                this_entry = gf_dirent_for_name (entry->d_name); +                this_entry->d_ino = entry->d_ino; + +                this_entry->d_off = entry->d_off; + +                this_entry->d_type = entry->d_type; +                this_entry->d_len = entry->d_reclen; + + +                list_add_tail (&this_entry->list, &entries.list); + +                filled += this_size; +        } +        op_ret = filled; +        op_errno = 0; +  out:          frame->root->rsp_refs = NULL; +          gf_log (this->name, GF_LOG_DEBUG, -                "read %"GF_PRI_SIZET" bytes for %d entries", -                filled, count); +                "READDIR %"PRId64" - %"PRId32" (%"PRId32")/%"PRId32",%"PRId64":" +                "(failed to read the next entry from database)", +                fd->inode->ino, filled, count, size, off); +          STACK_UNWIND (frame, count, op_errno, &entries);          gf_dirent_free (&entries); @@ -2629,11 +3036,11 @@ bdb_stats (call_frame_t *frame,             int32_t flags)  { -        int32_t op_ret = 0; +        int32_t op_ret   = 0;          int32_t op_errno = 0;          struct xlator_stats xlstats = {0, }, *stats = NULL; -        struct statvfs buf; +        struct statvfs buf = {0,};          struct timeval tv;          struct bdb_private *private = NULL;          int64_t avg_read = 0; @@ -2647,10 +3054,10 @@ bdb_stats (call_frame_t *frame,          stats = &xlstats;          op_ret = statvfs (private->export_path, &buf); -        op_errno = errno;          if (op_ret != 0) { -                gf_log (this->name, GF_LOG_ERROR, -                        "failed to statvfs on %s (%s)", +                op_errno = errno; +                gf_log (this->name, GF_LOG_DEBUG, +                        "STATS %s: %s",                          private->export_path, strerror (op_errno));                  goto out;          } @@ -2661,9 +3068,9 @@ bdb_stats (call_frame_t *frame,          stats->nr_clients = private->stats.nr_clients;          /* Number of Free block in the filesystem. */ -        stats->free_disk = buf.f_bfree * buf.f_bsize; +        stats->free_disk       = buf.f_bfree * buf.f_bsize;          stats->total_disk_size = buf.f_blocks * buf.f_bsize; /* */ -        stats->disk_usage = (buf.f_blocks - buf.f_bavail) * buf.f_bsize; +        stats->disk_usage      = (buf.f_blocks - buf.f_bavail) * buf.f_bsize;          /* Calculate read and write usage */          gettimeofday (&tv, NULL); @@ -2672,7 +3079,7 @@ bdb_stats (call_frame_t *frame,          _time_ms = (tv.tv_sec - private->init_time.tv_sec) * 1000 +                  ((tv.tv_usec - private->init_time.tv_usec) / 1000); -        avg_read = (_time_ms) ? (private->read_value / _time_ms) : 0; /* KBps */ +        avg_read  = (_time_ms) ? (private->read_value / _time_ms) : 0;/* KBps */          avg_write = (_time_ms) ? (private->write_value / _time_ms) : 0;          _time_ms = (tv.tv_sec - private->prev_fetch_time.tv_sec) * 1000 + @@ -2706,9 +3113,10 @@ bdb_inodelk (call_frame_t *frame, xlator_t *this,  {          frame->root->rsp_refs = NULL; -        gf_log (this->name, GF_LOG_CRITICAL, -                "\"features/posix-locks\" translator is not loaded. " -                "You need to use it for proper functioning of GlusterFS"); +        gf_log (this->name, GF_LOG_ERROR, +                "glusterfs internal locking request. please load " +                "'features/locks' translator to enable glusterfs " +                "support");          STACK_UNWIND (frame, -1, ENOSYS);          return 0; @@ -2721,9 +3129,10 @@ bdb_finodelk (call_frame_t *frame, xlator_t *this,  {          frame->root->rsp_refs = NULL; -        gf_log (this->name, GF_LOG_CRITICAL, -                "\"features/posix-locks\" translator is not loaded. " -                "You need to use it for proper functioning of GlusterFS"); +        gf_log (this->name, GF_LOG_ERROR, +                "glusterfs internal locking request. please load " +                "'features/locks' translator to enable glusterfs " +                "support");          STACK_UNWIND (frame, -1, ENOSYS);          return 0; @@ -2737,9 +3146,10 @@ bdb_entrylk (call_frame_t *frame, xlator_t *this,  {          frame->root->rsp_refs = NULL; -        gf_log (this->name, GF_LOG_CRITICAL, -                "\"features/posix-locks\" translator is not loaded. " -                "You need to use it for proper functioning of GlusterFS"); +        gf_log (this->name, GF_LOG_ERROR, +                "glusterfs internal locking request. please load " +                "'features/locks' translator to enable glusterfs " +                "support");          STACK_UNWIND (frame, -1, ENOSYS);          return 0; @@ -2753,15 +3163,15 @@ bdb_fentrylk (call_frame_t *frame, xlator_t *this,  {          frame->root->rsp_refs = NULL; -        gf_log (this->name, GF_LOG_CRITICAL, -                "\"features/posix-locks\" translator is not loaded. " -                "You need to use it for proper functioning of GlusterFS"); +        gf_log (this->name, GF_LOG_ERROR, +                "glusterfs internal locking request. please load " +                "'features/locks' translator to enable glusterfs " +                "support");          STACK_UNWIND (frame, -1, ENOSYS);          return 0;  } -  int32_t  bdb_checksum (call_frame_t *frame,                xlator_t *this, @@ -2775,10 +3185,11 @@ bdb_checksum (call_frame_t *frame,          uint8_t        dir_checksum[ZR_FILENAME_MAX]  = {0,};          int32_t        op_ret   = -1;          int32_t        op_errno = EINVAL; -        int32_t        i = 0, length = 0; +        int32_t        idx = 0, length = 0;          bctx_t        *bctx    = NULL;          DBC           *cursorp = NULL;          char          *data    = NULL; +        uint8_t        no_break = 1;          GF_VALIDATE_OR_GOTO ("bdb", frame, out);          GF_VALIDATE_OR_GOTO ("bdb", this, out); @@ -2798,55 +3209,66 @@ bdb_checksum (call_frame_t *frame,                                  continue;                          length = strlen (dirent->d_name); -                        for (i = 0; i < length; i++) -                                dir_checksum[i] ^= dirent->d_name[i]; +                        for (idx = 0; idx < length; idx++) +                                dir_checksum[idx] ^= dirent->d_name[idx];                  } /* while((dirent...)) */                  closedir (dir);          }          {                  bctx = bctx_lookup (B_TABLE(this), (char *)loc->path); -                op_errno = EINVAL; -                GF_VALIDATE_OR_GOTO (this->name, bctx, out); +                if (bctx == NULL) { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "CHECKSUM %"PRId64" (%s): ENOMEM" +                                "(failed to lookup database handle)", +                                loc->inode->ino, loc->path); +                        op_ret   = -1; +                        op_errno = ENOMEM; +                        goto out; +                }                  op_ret = bdb_cursor_open (bctx, &cursorp); -                op_errno = EINVAL; -                GF_VALIDATE_OR_GOTO (this->name, (op_ret == 0), out); +                if (op_ret < 0) { +                        gf_log (this->name, GF_LOG_DEBUG, +                                "CHECKSUM %"PRId64" (%s): EBADFD" +                                "(failed to open cursor to database handle)", +                                loc->inode->ino, loc->path); +                        op_ret   = -1; +                        op_errno = EBADFD; +                        goto out; +                } -                while (1) { -                        DBT key = {0,}, value = {0,}; + +                do { +                        DBT key = {0,}, value = {0,}, sec = {0,};                          key.flags = DB_DBT_MALLOC;                          value.doff = 0;                          value.dlen = 0; -                        op_ret = bdb_cursor_get (cursorp, &key, &value, -                                                 DB_NEXT); +                        op_ret = bdb_cursor_get (cursorp, &sec, &key, +                                                 &value, DB_NEXT);                          if (op_ret == DB_NOTFOUND) { -                                gf_log (this->name, GF_LOG_DEBUG, -                                        "end of list of key/value pair in db" -                                        " for directory: %s", bctx->directory);                                  op_ret = 0;                                  op_errno = 0; -                                break; +                                no_break = 0;                          } else if (op_ret == 0){                                  /* successfully read */                                  data = key.data;                                  length = key.size; -                                for (i = 0; i < length; i++) -                                        file_checksum[i] ^= data[i]; +                                for (idx = 0; idx < length; idx++) +                                        file_checksum[idx] ^= data[idx]; -                                free (key.data); +                                FREE (key.data);                          } else { -                                gf_log (this->name, GF_LOG_ERROR, -                                        "failed to do cursor get for directory" -                                        " %s: %s", -                                        bctx->directory, db_strerror (op_ret)); +                                gf_log (this->name, GF_LOG_DEBUG, +                                        "CHECKSUM %"PRId64" (%s)", +                                        loc->inode->ino, loc->path);                                  op_ret = -1; -                                op_errno = ENOENT; -                                break; +                                op_errno = ENOENT; /* TODO: watch errno */ +                                no_break = 0;                          }/* if(op_ret == DB_NOTFOUND)...else if...else */ -                } /* while(1) */ +                } while (no_break);                  bdb_cursor_close (bctx, cursorp);          }  out: @@ -2904,44 +3326,77 @@ init (xlator_t *this)          GF_VALIDATE_OR_GOTO ("bdb", this, out); -        _private = CALLOC (1, sizeof (*_private)); -        GF_VALIDATE_OR_GOTO (this->name, _private, out); -          if (this->children) {                  gf_log (this->name, GF_LOG_ERROR, -                        "FATAL: storage/bdb cannot have subvolumes"); -                FREE (_private); -                goto out;; +                        "'storage/bdb' translator should be used as leaf node " +                        "in translator tree. please remove the subvolumes" +                        " specified and retry."); +                goto err;          }          if (!this->parents) { -                gf_log (this->name, GF_LOG_WARNING, -                        "dangling volume. check volfile "); +                gf_log (this->name, GF_LOG_ERROR, +                        "'storage/bdb' translator needs at least one among " +                        "'protocol/server' or 'mount/fuse' translator as " +                        "parent. please add 'protocol/server' or 'mount/fuse' " +                        "as parent of 'storage/bdb' and retry. or you can also" +                        " try specifying mount-point on command-line."); +                goto err;          } +        _private = CALLOC (1, sizeof (*_private)); +        if (_private == NULL) { +                gf_log (this->name, GF_LOG_ERROR, +                        "could not allocate memory for 'storage/bdb' " +                        "configuration data-structure. cannot continue from " +                        "here"); +                goto err; +        } + +          ret = dict_get_str (this->options, "directory", &directory);          if (ret < 0) {                  gf_log (this->name, GF_LOG_ERROR, -                        "export directory not specified in volfile"); -                FREE (_private); -                goto out; +                        "'storage/bdb' needs at least " +                        "'option directory <path-to-export-directory>' as " +                        "minimal configuration option. please specify an " +                        "export directory using " +                        "'option directory <path-to-export-directory>' and " +                        "retry."); +                goto err;          } +          umask (000); /* umask `masking' is done at the client side */          /* Check whether the specified directory exists, if not create it. */          ret = stat (directory, &buf); -        if ((ret != 0) || !S_ISDIR (buf.st_mode)) { +        if (ret < 0) { +                gf_log (this->name, GF_LOG_ERROR, +                        "specified export path '%s' does not exist. " +                        "please create the export path '%s' and retry.", +                        directory, directory); +                goto err; +        } else if (!S_ISDIR (buf.st_mode)) {                  gf_log (this->name, GF_LOG_ERROR, -                        "specified directory '%s' doesn't exists, Exiting", +                        "specified export path '%s' is not a directory. " +                        "please specify a valid and existing directory as " +                        "export directory and retry.",                          directory); -                FREE (_private); -                goto out; +                goto err;          } else {                  ret = 0;          }          _private->export_path = strdup (directory); +        if (_private->export_path == NULL) { +                gf_log (this->name, GF_LOG_ERROR, +                        "could not allocate memory for 'storage/bdb' " +                        "configuration data-structure. cannot continue from " +                        "here"); +                goto err; +        } +          _private->export_path_length = strlen (_private->export_path);          { @@ -2953,27 +3408,40 @@ init (xlator_t *this)          }          this->private = (void *)_private; +          {                  ret = bdb_db_init (this, this->options); -                if (ret == -1){ -                        gf_log (this->name, GF_LOG_DEBUG, -                                "failed to initialize database"); -                        goto out; +                if (ret < 0){ +                        gf_log (this->name, GF_LOG_ERROR, +                                "database environment initialisation failed. " +                                "manually run database recovery tool and " +                                "retry to run glusterfs"); +                        goto err;                  } else {                          bctx = bctx_lookup (_private->b_table, "/");                          /* NOTE: we are not doing bctx_unref() for root bctx,                           *      let it remain in active list forever */ -                        if (!bctx) { +                        if (bctx == NULL) {                                  gf_log (this->name, GF_LOG_ERROR, -                                        "failed to allocate memory for root (/)" -                                        " bctx: out of memory"); -                                goto out; +                                        "could not allocate memory for " +                                        "'storage/bdb' configuration data-" +                                        "structure. cannot continue from " +                                        "here"); +                                goto err;                          } else {                                  ret = 0; +                                goto out;                          }                  }          } +err: +        if (_private) { +                if (_private->export_path) +                        FREE (_private->export_path); + +                FREE (_private); +        }  out:          return ret;  } @@ -2984,12 +3452,17 @@ bctx_cleanup (struct list_head *head)          bctx_t *trav    = NULL;          bctx_t *tmp     = NULL;          DB     *storage = NULL; +        DB     *secondary = NULL;          list_for_each_entry_safe (trav, tmp, head, list) {                  LOCK (&trav->lock);                  { -                        storage = trav->dbp; -                        trav->dbp = NULL; +                        storage = trav->primary; +                        trav->primary = NULL; + +                        secondary = trav->secondary; +                        trav->secondary = NULL; +                          list_del_init (&trav->list);                  }                  UNLOCK (&trav->lock); @@ -2998,6 +3471,11 @@ bctx_cleanup (struct list_head *head)                          storage->close (storage, 0);                          storage = NULL;                  } + +                if (secondary) { +                        secondary->close (secondary, 0); +                        secondary = NULL; +                }          }          return;  } @@ -3025,7 +3503,11 @@ fini (xlator_t *this)                          ret = pthread_join (private->checkpoint_thread, NULL);                          if (ret != 0) {                                  gf_log (this->name, GF_LOG_CRITICAL, -                                        "failed to join checkpoint thread"); +                                        "could not complete checkpointing " +                                        "database environment. this might " +                                        "result in inconsistencies in few" +                                        " recent data and meta-data " +                                        "operations");                          }                          BDB_ENV(this)->close (BDB_ENV(this), 0); diff --git a/xlators/storage/bdb/src/bdb.h b/xlators/storage/bdb/src/bdb.h index c9db02c10..e25978cc6 100644 --- a/xlators/storage/bdb/src/bdb.h +++ b/xlators/storage/bdb/src/bdb.h @@ -54,6 +54,8 @@  #include "inode.h"  #include "compat.h"  #include "compat-errno.h" +#include "fd.h" +#include "syscall.h"  #define BDB_STORAGE    "/glusterfs_storage.db" @@ -73,6 +75,8 @@  #define BDB_EXPORT_PATH_LEN(_private) \          (((struct bdb_private *)_private)->export_path_length) +#define BDB_KEY_FROM_FREQUEST_KEY(_key) (&(key[15])) +  #define BDB_EXPORT_PATH(_private) \          (((struct bdb_private *)_private)->export_path)  /* MAKE_REAL_PATH(var,this,path) @@ -89,6 +93,12 @@                  strcpy (&var[base_len], path);                          \          } while (0) + +#define BDB_TIMED_LOG(_errno,_counter)  \ +        ((_errno == ENOTSUP) && (((++_counter) % GF_UNIVERSAL_ANSWER) == 1)) + +#define GF_FILE_CONTENT_REQUEST ZR_FILE_CONTENT_REQUEST +  /* MAKE_REAL_PATH_TO_STORAGE_DB(var,this,path)   * make the real path to the storage-database file on file-system   * @@ -119,21 +129,6 @@                  key = basename (tmp);                   \          }while (0); -/* BDB_DO_LSTAT(path,stbuf,dirent) - * construct real-path to a dirent and do lstat on the real-path - * - * @path:   path to the directory whose readdir is currently in progress - * @stbuf:  a 'struct stat *' - * @dirent: a 'struct dirent *' - */ -#define BDB_DO_LSTAT(path, stbuf, dirent) do {          \ -                char tmp_real_path[GF_PATH_MAX];        \ -                strcpy(tmp_real_path, path);            \ -                strcat (tmp_real_path, "/");            \ -                strcat(tmp_real_path, dirent->d_name);  \ -                ret = lstat (tmp_real_path, stbuf);     \ -        } while(0); -  /* IS_BDB_PRIVATE_FILE(name)   * check if a given 'name' is bdb xlator's internal file name   * @@ -152,8 +147,7 @@  #define IS_DOT_DOTDOT(name) \          ((!strncmp(name,".", 1)) || (!strncmp(name,"..", 2))) -/* BDB_SET_BCTX(this,inode,bctx) - * put a stamp on inode. d00d, you are using bdb.. huhaha. +/* BDB_ICTX_SET(this,inode,bctx)   * pointer to 'struct bdb_ctx' is stored in inode's ctx of all directories.   * this will happen either in lookup() or mkdir().   * @@ -161,29 +155,35 @@   * @inode: inode where 'struct bdb_ctx *' has to be stored.   * @bctx:  a 'struct bdb_ctx *'   */ -#define BDB_SET_BCTX(this,inode,bctx) do{                               \ -                inode_ctx_put(inode, this, (uint64_t)(long)bctx);       \ +#define BDB_ICTX_SET(_inode,_this,_bctx) do{                            \ +                inode_ctx_put(_inode, _this, (uint64_t)(long)_bctx);    \ +        }while (0); + +#define BDB_ICTX_GET(_inode,_this,_bctxp) do {                  \ +                uint64_t tmp_bctx = 0;                          \ +                inode_ctx_get (_inode, _this, &tmp_bctx);       \ +                *_bctxp = tmp_bctx;                             \          }while (0); -/* MAKE_BCTX_FROM_INODE(this,bctx,inode) - * extract bdb xlator's 'struct bdb_ctx *' from an inode's ctx. - * valid only if done for directory inodes, otherwise bctx = NULL. +/* BDB_FCTX_SET(this,fd,bctx) + * pointer to 'struct bdb_ctx' is stored in inode's ctx of all directories. + * this will happen either in lookup() or mkdir().   *   * @this:  pointer xlator_t of bdb xlator. + * @inode: inode where 'struct bdb_ctx *' has to be stored.   * @bctx:  a 'struct bdb_ctx *' - * @inode: inode from where 'struct bdb_ctx *' has to be extracted.   */ -#define MAKE_BCTX_FROM_INODE(this,bctx,inode) do{       \ -                uint64_t tmp_bctx = 0;                  \ -                inode_ctx_get (inode, this, &tmp_bctx); \ -                if (ret == 0)                           \ -                        bctx = (void *)(long)tmp_bctx;  \ +#define BDB_FCTX_SET(_fd,_this,_bfd) do{                        \ +                fd_ctx_set(_fd, _this, (uint64_t)(long)_bfd);   \          }while (0); -#define BDB_SET_BFD(this,fd,bfd) do{                            \ -                fd_ctx_set (fd, this, (uint64_t)(long)bfd);     \ +#define BDB_FCTX_GET(_fd,_this,_bfdp) do {              \ +                uint64_t tmp_bfd = 0;                   \ +                fd_ctx_get (_fd, _this, &tmp_bfd);      \ +                *_bfdp = (void *)(long)tmp_bfd;         \          }while (0); +  /* maximum number of open dbs that bdb xlator will ever have */  #define BDB_MAX_OPEN_DBS 100 @@ -270,7 +270,8 @@ struct bdb_ctx {          char              *directory;   /* directory path */          /* pointer to open database, that resides inside this directory */ -        DB                *dbp; +        DB                *primary; +        DB                *secondary;          uint32_t           cache;       /* cache ON or OFF */          /* per directory cache, bdb xlator's internal cache */ @@ -298,8 +299,6 @@ struct bdb_dir {          /* open directory pointer, as returned by opendir() */          DIR            *dir; -        /* FIXME: readdir offset, too crude. must go  */ -        char            offset[NAME_MAX];          char           *path;             /* path to this directory */  }; @@ -386,12 +385,6 @@ struct bdb_private {           * (option checkpoint-interval <time-in-seconds>) */          uint32_t             checkpoint_interval; -        /* inode number allocation counter */ -        ino_t               next_ino; - -        /* lock to protect 'next_ino' */ -        gf_lock_t           ino_lock; -          /* environment log directory (option logdir <directory>) */          char               *logdir; @@ -436,26 +429,28 @@ bdb_txn_commit (DB_TXN *txnid)          return txnid->commit (txnid, 0);  } -inline void * -bdb_extract_bfd (fd_t *fd, xlator_t *this); - -  void *  bdb_db_stat (bctx_t *bctx,               DB_TXN *txnid,               uint32_t flags); -int32_t +/*int32_t  bdb_db_get(struct bdb_ctx *bctx,             DB_TXN *txnid,             const char *key_string,             char **buf,             size_t size,             off_t offset); +*/ +int32_t +bdb_db_fread (struct bdb_fd *bfd, char **bufp, size_t size, off_t offset); + +int32_t +bdb_db_iread (struct bdb_ctx *bctx, const char *key, char **bufp);  #define BDB_TRUNCATE_RECORD 0xcafebabe -int32_t +/*int32_t  bdb_db_put (struct bdb_ctx *bctx,              DB_TXN *txnid,              const char *key_string, @@ -463,16 +458,27 @@ bdb_db_put (struct bdb_ctx *bctx,              size_t size,              off_t offset,              int32_t flags); +*/ +int32_t +bdb_db_icreate (struct bdb_ctx *bctx, const char *key);  int32_t -bdb_db_del (struct bdb_ctx *bctx, -            DB_TXN *txnid, -            const char *path); +bdb_db_fwrite (struct bdb_fd *bfd, char *buf, size_t size, off_t offset); + +int32_t +bdb_db_iwrite (struct bdb_ctx *bctx, const char *key, char *buf, size_t size); + +int32_t +bdb_db_itruncate (struct bdb_ctx *bctx, const char *key); + +int32_t +bdb_db_iremove (struct bdb_ctx *bctx, +                const char *key);  ino_t  bdb_inode_transform (ino_t parent, -                     struct bdb_ctx *bctx); - +                     const char *name, +                     size_t namelen);  int32_t  bdb_cursor_open (struct bdb_ctx *bctx, @@ -480,7 +486,7 @@ bdb_cursor_open (struct bdb_ctx *bctx,  int32_t  bdb_cursor_get (DBC *cursorp, -                DBT *key, +                DBT *sec, DBT *pri,                  DBT *value,                  int32_t flags);  | 
