diff options
Diffstat (limited to 'xlators/performance/io-cache/src/page.c')
| -rw-r--r-- | xlators/performance/io-cache/src/page.c | 1057 | 
1 files changed, 569 insertions, 488 deletions
diff --git a/xlators/performance/io-cache/src/page.c b/xlators/performance/io-cache/src/page.c index 47a8fbb66..728f03736 100644 --- a/xlators/performance/io-cache/src/page.c +++ b/xlators/performance/io-cache/src/page.c @@ -38,26 +38,44 @@ ioc_empty (struct ioc_cache *cache)  }  ioc_page_t * -ioc_page_get (ioc_inode_t *ioc_inode, off_t offset) +__ioc_page_get (ioc_inode_t *ioc_inode, off_t offset)  { -	ioc_page_t   *page           = NULL; -	ioc_table_t  *table          = NULL; -	off_t         rounded_offset = 0; +        ioc_page_t   *page           = NULL; +        ioc_table_t  *table          = NULL; +        off_t         rounded_offset = 0;          table = ioc_inode->table;          rounded_offset = floor (offset, table->page_size); -  +          page = rbthash_get (ioc_inode->cache.page_table, &rounded_offset,                              sizeof (rounded_offset));          if (page != NULL) { -		/* push the page to the end of the lru list */ -		list_move_tail (&page->page_lru, &ioc_inode->cache.page_lru); -	} +                /* push the page to the end of the lru list */ +                list_move_tail (&page->page_lru, &ioc_inode->cache.page_lru); +        } -	return page; +        return page;  } +ioc_page_t * +ioc_page_get (ioc_inode_t *ioc_inode, off_t offset) +{ +        ioc_page_t *page = NULL; + +        if (ioc_inode == NULL) { +                goto out; +        } + +        ioc_inode_lock (ioc_inode); +        { +                page = __ioc_page_get (ioc_inode, offset); +        } +        ioc_inode_unlock (ioc_inode); + +out: +        return page; +}  /*   * ioc_page_destroy - @@ -66,40 +84,98 @@ ioc_page_get (ioc_inode_t *ioc_inode, off_t offset)   *   */  int64_t -ioc_page_destroy (ioc_page_t *page) +__ioc_page_destroy (ioc_page_t *page)  { -	int64_t  page_size = 0; +        int64_t  page_size = 0; -	page_size = iobref_size (page->iobref); +        page_size = iobref_size (page->iobref); -	if (page->waitq) { -		/* frames waiting on this page, do not destroy this page */ -		page_size = -1; -	} else { +        if (page->waitq) { +                /* frames waiting on this page, do not destroy this page */ +                page_size = -1; +        } else {                  rbthash_remove (page->inode->cache.page_table, &page->offset,                                  sizeof (page->offset)); -		list_del (&page->page_lru); -     -		gf_log (page->inode->table->xl->name, GF_LOG_TRACE, -			"destroying page = %p, offset = %"PRId64" " -			"&& inode = %p", -			page, page->offset, page->inode); -     -		if (page->vector){ -			iobref_unref (page->iobref); -			GF_FREE (page->vector); -			page->vector = NULL; -		} -     -		page->inode = NULL; -	} - -	if (page_size != -1) { -		pthread_mutex_destroy (&page->page_lock); -		GF_FREE (page); -	} - -	return page_size; +                list_del (&page->page_lru); + +                gf_log (page->inode->table->xl->name, GF_LOG_TRACE, +                        "destroying page = %p, offset = %"PRId64" " +                        "&& inode = %p", +                        page, page->offset, page->inode); + +                if (page->vector){ +                        iobref_unref (page->iobref); +                        GF_FREE (page->vector); +                        page->vector = NULL; +                } + +                page->inode = NULL; +        } + +        if (page_size != -1) { +                pthread_mutex_destroy (&page->page_lock); +                GF_FREE (page); +        } + +        return page_size; +} + + +int64_t +ioc_page_destroy (ioc_page_t *page) +{ +        int64_t ret = 0; + +        if (page == NULL) { +                goto out; +        } + +        ioc_inode_lock (page->inode); +        { +                ret = __ioc_page_destroy (page); +        } +        ioc_inode_unlock (page->inode); + +out: +        return ret; +} + +int32_t +__ioc_inode_prune (ioc_inode_t *curr, uint64_t *size_pruned, +                   uint64_t size_to_prune, uint32_t index) +{ +        ioc_page_t  *page  = NULL, *next = NULL; +        int32_t      ret   = 0; +        ioc_table_t *table = NULL; + +        if (curr == NULL) { +                goto out; +        } + +        table = curr->table; + +        list_for_each_entry_safe (page, next, &curr->cache.page_lru, page_lru) { +                *size_pruned += page->size; +                ret = __ioc_page_destroy (page); + +                if (ret != -1) +                        table->cache_used -= ret; + +                gf_log (table->xl->name, GF_LOG_TRACE, +                        "index = %d && table->cache_used = %"PRIu64" && table->" +                        "cache_size = %"PRIu64, index, table->cache_used, +                        table->cache_size); + +                if ((*size_pruned) >= size_to_prune) +                        break; +        } + +        if (ioc_empty (&curr->cache)) { +                list_del_init (&curr->inode_lru); +        } + +out: +        return 0;  }  /* @@ -112,69 +188,41 @@ ioc_page_destroy (ioc_page_t *page)  int32_t  ioc_prune (ioc_table_t *table)  { -	ioc_inode_t *curr = NULL, *next_ioc_inode = NULL; -	ioc_page_t  *page = NULL, *next = NULL; -	int32_t     ret = -1; -	int32_t     index = 0; -	uint64_t    size_to_prune = 0; -	uint64_t    size_pruned = 0; - -	ioc_table_lock (table); -	{ -		size_to_prune = table->cache_used - table->cache_size; -		/* take out the least recently used inode */ -		for (index=0; index < table->max_pri; index++) { -			list_for_each_entry_safe (curr, next_ioc_inode,  -						  &table->inode_lru[index],  -						  inode_lru) { -				/* prune page-by-page for this inode, till  -				 * we reach the equilibrium */ -				ioc_inode_lock (curr); -				/* { */ - -				list_for_each_entry_safe (page, next,  -							  &curr->cache.page_lru, -                                                          page_lru) { -					/* done with all pages, and not  -					 * reached equilibrium yet?? -					 * continue with next inode in  -					 * lru_list */ -					size_pruned += page->size; -					ret = ioc_page_destroy (page); - -					if (ret != -1) -						table->cache_used -= ret; -	     -					gf_log (table->xl->name, -						GF_LOG_TRACE, -						"index = %d && table->cache_" -						"used = %"PRIu64" && table->" -						"cache_size = %"PRIu64,  -						index, table->cache_used,  -						table->cache_size); -	     -					if (size_pruned >= size_to_prune) -						break; -				} /* list_for_each_entry_safe(page...) */ -				if (ioc_empty (&curr->cache)) { -					list_del_init (&curr->inode_lru); -				} - -				/* } */  -				ioc_inode_unlock (curr); -	 -				if (size_pruned >= size_to_prune) -					break; -			} /* list_for_each_entry_safe (curr...) */ -       -			if (size_pruned >= size_to_prune) -				break; -		} /* for(index=0;...) */ - -	} /* ioc_inode_table locked region end */ -	ioc_table_unlock (table); - -	return 0; +        ioc_inode_t *curr = NULL, *next_ioc_inode = NULL; +        int32_t     index = 0; +        uint64_t    size_to_prune = 0; +        uint64_t    size_pruned = 0; + +        ioc_table_lock (table); +        { +                size_to_prune = table->cache_used - table->cache_size; +                /* take out the least recently used inode */ +                for (index=0; index < table->max_pri; index++) { +                        list_for_each_entry_safe (curr, next_ioc_inode, +                                                  &table->inode_lru[index], +                                                  inode_lru) { +                                /* prune page-by-page for this inode, till +                                 * we reach the equilibrium */ +                                ioc_inode_lock (curr); +                                { +                                        __ioc_inode_prune (curr, &size_pruned, +                                                           size_to_prune, +                                                           index); +                                } +                                ioc_inode_unlock (curr); + +                                if (size_pruned >= size_to_prune) +                                        break; +                        } /* list_for_each_entry_safe (curr...) */ + +                        if (size_pruned >= size_to_prune) +                                break; +                } /* for(index=0;...) */ + +        } /* ioc_inode_table locked region end */ +        ioc_table_unlock (table); + +        return 0;  }  /* @@ -185,12 +233,12 @@ ioc_prune (ioc_table_t *table)   *   */  ioc_page_t * -ioc_page_create (ioc_inode_t *ioc_inode, off_t offset) +__ioc_page_create (ioc_inode_t *ioc_inode, off_t offset)  { -	ioc_table_t *table          = NULL; -	ioc_page_t  *page           = NULL; -	off_t        rounded_offset = 0; -	ioc_page_t  *newpage        = NULL; +        ioc_table_t *table          = NULL; +        ioc_page_t  *page           = NULL; +        off_t        rounded_offset = 0; +        ioc_page_t  *newpage        = NULL;          table = ioc_inode->table;          rounded_offset = floor (offset, table->page_size); @@ -201,71 +249,71 @@ ioc_page_create (ioc_inode_t *ioc_inode, off_t offset)                  goto out;          } -	if (!ioc_inode) { +        if (!ioc_inode) {                  GF_FREE (newpage);                  newpage = NULL;                  goto out; -	} +        } -	newpage->offset = rounded_offset; -	newpage->inode = ioc_inode; -	pthread_mutex_init (&newpage->page_lock, NULL); +        newpage->offset = rounded_offset; +        newpage->inode = ioc_inode; +        pthread_mutex_init (&newpage->page_lock, NULL);          rbthash_insert (ioc_inode->cache.page_table, newpage, &rounded_offset,                          sizeof (rounded_offset)); -	list_add_tail (&newpage->page_lru, &ioc_inode->cache.page_lru); +        list_add_tail (&newpage->page_lru, &ioc_inode->cache.page_lru); -	page = newpage; +        page = newpage; -	gf_log ("io-cache", GF_LOG_TRACE, -		"returning new page %p", page); +        gf_log ("io-cache", GF_LOG_TRACE, +                "returning new page %p", page);  out: -	return page; +        return page;  } -/*  - * ioc_wait_on_page - pause a frame to wait till the arrival of a page.  - * here we need to handle the case when the frame who calls wait_on_page  - * himself has caused page_fault  +/* + * ioc_wait_on_page - pause a frame to wait till the arrival of a page. + * here we need to handle the case when the frame who calls wait_on_page + * himself has caused page_fault   *   * @page: page to wait on   * @frame: call frame who is waiting on page   *   */  void -ioc_wait_on_page (ioc_page_t *page, call_frame_t *frame, off_t offset, -		  size_t size) +__ioc_wait_on_page (ioc_page_t *page, call_frame_t *frame, off_t offset, +                    size_t size)  { -	ioc_waitq_t *waitq = NULL; -	ioc_local_t *local = frame->local; +        ioc_waitq_t *waitq = NULL; +        ioc_local_t *local = frame->local; -	waitq = GF_CALLOC (1, sizeof (*waitq), gf_ioc_mt_ioc_waitq_t); +        waitq = GF_CALLOC (1, sizeof (*waitq), gf_ioc_mt_ioc_waitq_t);          if (waitq == NULL) {                  local->op_ret = -1;                  local->op_errno = ENOMEM;                  gf_log (frame->this->name, GF_LOG_ERROR, "out of memory");                  goto out; -        }  - -	gf_log (frame->this->name, GF_LOG_TRACE, -		"frame(%p) waiting on page = %p, offset=%"PRId64", " -		"size=%"GF_PRI_SIZET"", -		frame, page, offset, size); - -	waitq->data = frame; -	waitq->next = page->waitq; -	waitq->pending_offset = offset; -	waitq->pending_size = size; -	page->waitq = waitq; -	/* one frame can wait only once on a given page,  -	 * local->wait_count is number of pages a frame is waiting on */ -	ioc_local_lock (local); -	{ -		local->wait_count++; -	} -	ioc_local_unlock (local); +        } + +        gf_log (frame->this->name, GF_LOG_TRACE, +                "frame(%p) waiting on page = %p, offset=%"PRId64", " +                "size=%"GF_PRI_SIZET"", +                frame, page, offset, size); + +        waitq->data = frame; +        waitq->next = page->waitq; +        waitq->pending_offset = offset; +        waitq->pending_size = size; +        page->waitq = waitq; +        /* one frame can wait only once on a given page, +         * local->wait_count is number of pages a frame is waiting on */ +        ioc_local_lock (local); +        { +                local->wait_count++; +        } +        ioc_local_unlock (local);  out:          return; @@ -273,7 +321,7 @@ out:  /* - * ioc_cache_still_valid - see if cached pages ioc_inode are still valid  + * ioc_cache_still_valid - see if cached pages ioc_inode are still valid   * against given stbuf   *   * @ioc_inode: @@ -284,62 +332,62 @@ out:  int8_t  ioc_cache_still_valid (ioc_inode_t *ioc_inode, struct iatt *stbuf)  { -	int8_t cache_still_valid = 1; +        int8_t cache_still_valid = 1;  #if 0 -	if (!stbuf || (stbuf->ia_mtime != ioc_inode->cache.mtime) ||  -	    (stbuf->st_mtim.tv_nsec != ioc_inode->stbuf.st_mtim.tv_nsec)) -		cache_still_valid = 0; +        if (!stbuf || (stbuf->ia_mtime != ioc_inode->cache.mtime) || +            (stbuf->st_mtim.tv_nsec != ioc_inode->stbuf.st_mtim.tv_nsec)) +                cache_still_valid = 0;  #else -	if (!stbuf || (stbuf->ia_mtime != ioc_inode->cache.mtime) +        if (!stbuf || (stbuf->ia_mtime != ioc_inode->cache.mtime)              || (stbuf->ia_mtime_nsec != ioc_inode->cache.mtime_nsec)) -		cache_still_valid = 0; +                cache_still_valid = 0;  #endif  #if 0 -	/* talk with avati@gluster.com to enable this section */ -	if (!ioc_inode->mtime && stbuf) { -		cache_still_valid = 1; -		ioc_inode->mtime = stbuf->ia_mtime; -	} +        /* talk with avati@gluster.com to enable this section */ +        if (!ioc_inode->mtime && stbuf) { +                cache_still_valid = 1; +                ioc_inode->mtime = stbuf->ia_mtime; +        }  #endif -	return cache_still_valid; +        return cache_still_valid;  }  void  ioc_waitq_return (ioc_waitq_t *waitq)  { -	ioc_waitq_t  *trav   = NULL; -	ioc_waitq_t  *next   = NULL; -	call_frame_t *frame = NULL; +        ioc_waitq_t  *trav   = NULL; +        ioc_waitq_t  *next   = NULL; +        call_frame_t *frame = NULL; -	for (trav = waitq; trav; trav = next) { -		next = trav->next; +        for (trav = waitq; trav; trav = next) { +                next = trav->next; -		frame = trav->data; -		ioc_frame_return (frame); -		GF_FREE (trav); -	} +                frame = trav->data; +                ioc_frame_return (frame); +                GF_FREE (trav); +        }  }  int  ioc_fault_cbk (call_frame_t *frame, void *cookie, xlator_t *this,                 int32_t op_ret, int32_t op_errno, struct iovec *vector, -	       int32_t count, struct iatt *stbuf, struct iobref *iobref) +               int32_t count, struct iatt *stbuf, struct iobref *iobref)  { -	ioc_local_t *local = NULL; -	off_t       offset = 0; -	ioc_inode_t *ioc_inode = NULL; -	ioc_table_t *table = NULL; -	ioc_page_t  *page = NULL; -	int32_t     destroy_size = 0; -	size_t      page_size = 0; -	ioc_waitq_t *waitq = NULL; +        ioc_local_t *local = NULL; +        off_t       offset = 0; +        ioc_inode_t *ioc_inode = NULL; +        ioc_table_t *table = NULL; +        ioc_page_t  *page = NULL; +        int32_t     destroy_size = 0; +        size_t      page_size = 0; +        ioc_waitq_t *waitq = NULL;          size_t      iobref_page_size = 0;          char        zero_filled = 0; @@ -351,127 +399,128 @@ ioc_fault_cbk (call_frame_t *frame, void *cookie, xlator_t *this,          zero_filled = ((op_ret >=0)                         && (stbuf->ia_mtime == 0)); -	ioc_inode_lock (ioc_inode); -	{ -		if (op_ret == -1 || -		    !(zero_filled || +        ioc_inode_lock (ioc_inode); +        { +                if (op_ret == -1 || +                    !(zero_filled ||                        ioc_cache_still_valid(ioc_inode, stbuf))) { -			gf_log (ioc_inode->table->xl->name, GF_LOG_TRACE, -				"cache for inode(%p) is invalid. flushing " -				"all pages", ioc_inode); -			destroy_size = __ioc_inode_flush (ioc_inode); -		} - -		if ((op_ret >= 0) && !zero_filled) { -			ioc_inode->cache.mtime = stbuf->ia_mtime; +                        gf_log (ioc_inode->table->xl->name, GF_LOG_TRACE, +                                "cache for inode(%p) is invalid. flushing " +                                "all pages", ioc_inode); +                        destroy_size = __ioc_inode_flush (ioc_inode); +                } + +                if ((op_ret >= 0) && !zero_filled) { +                        ioc_inode->cache.mtime = stbuf->ia_mtime;                          ioc_inode->cache.mtime_nsec = stbuf->ia_mtime_nsec;                  } -		gettimeofday (&ioc_inode->cache.tv, NULL); - -		if (op_ret < 0) { -			/* error, readv returned -1 */ -			page = ioc_page_get (ioc_inode, offset); -			if (page) -				waitq = ioc_page_error (page, op_ret,  -							op_errno); -		} else { -			gf_log (ioc_inode->table->xl->name, GF_LOG_TRACE, -				"op_ret = %d", op_ret); -			page = ioc_page_get (ioc_inode, offset); -			if (!page) { -				/* page was flushed */ -				/* some serious bug ? */ -				gf_log (this->name, GF_LOG_DEBUG, -					"wasted copy: %"PRId64"[+%"PRId64"] " -					"ioc_inode=%p", offset,  -					table->page_size, ioc_inode); -			} else { -				if (page->vector) { -					iobref_unref (page->iobref); -					GF_FREE (page->vector); -					page->vector = NULL; -				} - -				/* keep a copy of the page for our cache */ -				page->vector = iov_dup (vector, count); +                gettimeofday (&ioc_inode->cache.tv, NULL); + +                if (op_ret < 0) { +                        /* error, readv returned -1 */ +                        page = __ioc_page_get (ioc_inode, offset); +                        if (page) +                                waitq = __ioc_page_error (page, op_ret, +                                                          op_errno); +                } else { +                        gf_log (ioc_inode->table->xl->name, GF_LOG_TRACE, +                                "op_ret = %d", op_ret); +                        page = __ioc_page_get (ioc_inode, offset); +                        if (!page) { +                                /* page was flushed */ +                                /* some serious bug ? */ +                                gf_log (this->name, GF_LOG_DEBUG, +                                        "wasted copy: %"PRId64"[+%"PRId64"] " +                                        "ioc_inode=%p", offset, +                                        table->page_size, ioc_inode); +                        } else { +                                if (page->vector) { +                                        iobref_unref (page->iobref); +                                        GF_FREE (page->vector); +                                        page->vector = NULL; +                                } + +                                /* keep a copy of the page for our cache */ +                                page->vector = iov_dup (vector, count);                                  if (page->vector == NULL) { -                                        page = ioc_page_get (ioc_inode, offset); +                                        page = __ioc_page_get (ioc_inode, +                                                               offset);                                          if (page != NULL) -                                                waitq = ioc_page_error (page, -                                                                        -1,  -                                                                        ENOMEM); +                                                waitq = __ioc_page_error (page, +                                                                          -1, +                                                                          ENOMEM);                                          goto unlock;                                  } -				page->count = count; -				if (iobref) { -					page->iobref = iobref_ref (iobref); -				} else { -					/* TODO: we have got a response to  -					 * our request and no data */ -					gf_log (this->name, GF_LOG_CRITICAL, -						"frame>root>rsp_refs is null"); -				} /* if(frame->root->rsp_refs) */ - -				/* page->size should indicate exactly how  -				 * much the readv call to the child -				 * translator returned. earlier op_ret  -				 * from child translator was used, which  -				 * gave rise to a bug where reads from  -				 * io-cached volume were resulting in 0  -				 * byte replies */ -				page_size = iov_length(vector, count); -				page->size = page_size; +                                page->count = count; +                                if (iobref) { +                                        page->iobref = iobref_ref (iobref); +                                } else { +                                        /* TODO: we have got a response to +                                         * our request and no data */ +                                        gf_log (this->name, GF_LOG_CRITICAL, +                                                "frame>root>rsp_refs is null"); +                                } /* if(frame->root->rsp_refs) */ + +                                /* page->size should indicate exactly how +                                 * much the readv call to the child +                                 * translator returned. earlier op_ret +                                 * from child translator was used, which +                                 * gave rise to a bug where reads from +                                 * io-cached volume were resulting in 0 +                                 * byte replies */ +                                page_size = iov_length(vector, count); +                                page->size = page_size;                                  iobref_page_size = iobref_size (page->iobref); -				if (page->waitq) { -					/* wake up all the frames waiting on  -					 * this page, including  -					 * the frame which triggered fault */ -					waitq = ioc_page_wakeup (page); -				} /* if(page->waitq) */ -			} /* if(!page)...else */ -		} /* if(op_ret < 0)...else */ -	} /* ioc_inode locked region end */ +                                if (page->waitq) { +                                        /* wake up all the frames waiting on +                                         * this page, including +                                         * the frame which triggered fault */ +                                        waitq = __ioc_page_wakeup (page); +                                } /* if(page->waitq) */ +                        } /* if(!page)...else */ +                } /* if(op_ret < 0)...else */ +        } /* ioc_inode locked region end */  unlock: -	ioc_inode_unlock (ioc_inode); +        ioc_inode_unlock (ioc_inode); -	ioc_waitq_return (waitq); +        ioc_waitq_return (waitq); -	if (iobref_page_size) { -		ioc_table_lock (table); -		{ -			table->cache_used += iobref_page_size; -		} -		ioc_table_unlock (table); -	} +        if (iobref_page_size) { +                ioc_table_lock (table); +                { +                        table->cache_used += iobref_page_size; +                } +                ioc_table_unlock (table); +        } -	if (destroy_size) { -		ioc_table_lock (table); -		{ -			table->cache_used -= destroy_size; -		} -		ioc_table_unlock (table); -	} +        if (destroy_size) { +                ioc_table_lock (table); +                { +                        table->cache_used -= destroy_size; +                } +                ioc_table_unlock (table); +        } -	if (ioc_need_prune (ioc_inode->table)) { -		ioc_prune (ioc_inode->table); -	} +        if (ioc_need_prune (ioc_inode->table)) { +                ioc_prune (ioc_inode->table); +        } -	gf_log (this->name, GF_LOG_TRACE, "fault frame %p returned", frame); -	pthread_mutex_destroy (&local->local_lock); +        gf_log (this->name, GF_LOG_TRACE, "fault frame %p returned", frame); +        pthread_mutex_destroy (&local->local_lock); -	fd_unref (local->fd); +        fd_unref (local->fd); -	STACK_DESTROY (frame->root); -	return 0; +        STACK_DESTROY (frame->root); +        return 0;  }  /*   * ioc_page_fault - - *  + *   * @ioc_inode:   * @frame:   * @fd: @@ -480,14 +529,14 @@ unlock:   */  void  ioc_page_fault (ioc_inode_t *ioc_inode,	call_frame_t *frame, fd_t *fd, -		off_t offset) +                off_t offset)  { -	ioc_table_t  *table = NULL; -	call_frame_t *fault_frame = NULL; -	ioc_local_t  *fault_local = NULL; -        int32_t      op_ret = -1, op_errno = -1; -        ioc_waitq_t  *waitq = NULL; -        ioc_page_t   *page = NULL; +        ioc_table_t  *table       = NULL; +        call_frame_t *fault_frame = NULL; +        ioc_local_t  *fault_local = NULL; +        int32_t       op_ret      = -1, op_errno = -1; +        ioc_waitq_t  *waitq       = NULL; +        ioc_page_t   *page        = NULL;          table = ioc_inode->table;          fault_frame = copy_frame (frame); @@ -500,7 +549,7 @@ ioc_page_fault (ioc_inode_t *ioc_inode,	call_frame_t *frame, fd_t *fd,          }          fault_local = GF_CALLOC (1, sizeof (ioc_local_t), -                                gf_ioc_mt_ioc_local_t); +                                 gf_ioc_mt_ioc_local_t);          if (fault_local == NULL) {                  op_ret = -1;                  op_errno = ENOMEM; @@ -510,92 +559,97 @@ ioc_page_fault (ioc_inode_t *ioc_inode,	call_frame_t *frame, fd_t *fd,                  goto err;          } -	/* NOTE: copy_frame() means, the frame the fop whose fd_ref we  -	 * are using till now won't be valid till we get reply from server.  -	 * we unref this fd, in fault_cbk */ -	fault_local->fd = fd_ref (fd); +        /* NOTE: copy_frame() means, the frame the fop whose fd_ref we +         * are using till now won't be valid till we get reply from server. +         * we unref this fd, in fault_cbk */ +        fault_local->fd = fd_ref (fd); + +        fault_frame->local = fault_local; +        pthread_mutex_init (&fault_local->local_lock, NULL); -	fault_frame->local = fault_local; -	pthread_mutex_init (&fault_local->local_lock, NULL); +        INIT_LIST_HEAD (&fault_local->fill_list); +        fault_local->pending_offset = offset; +        fault_local->pending_size = table->page_size; +        fault_local->inode = ioc_inode; -	INIT_LIST_HEAD (&fault_local->fill_list); -	fault_local->pending_offset = offset; -	fault_local->pending_size = table->page_size; -	fault_local->inode = ioc_inode; +        gf_log (frame->this->name, GF_LOG_TRACE, +                "stack winding page fault for offset = %"PRId64" with " +                "frame %p", offset, fault_frame); -	gf_log (frame->this->name, GF_LOG_TRACE, -		"stack winding page fault for offset = %"PRId64" with " -		"frame %p", offset, fault_frame); -   -	STACK_WIND (fault_frame, ioc_fault_cbk, FIRST_CHILD(fault_frame->this), -		    FIRST_CHILD(fault_frame->this)->fops->readv, fd, +        STACK_WIND (fault_frame, ioc_fault_cbk, FIRST_CHILD(fault_frame->this), +                    FIRST_CHILD(fault_frame->this)->fops->readv, fd,                      table->page_size, offset); -	return; +        return;  err: -        page = ioc_page_get (ioc_inode, offset); -        if (page != NULL) { -                waitq = ioc_page_error (page, op_ret, op_errno); -                if (waitq != NULL) { -                        ioc_waitq_return (waitq); +        ioc_inode_lock (ioc_inode); +        { +                page = __ioc_page_get (ioc_inode, offset); +                if (page != NULL) { +                        waitq = __ioc_page_error (page, op_ret, op_errno);                  }          } +        ioc_inode_unlock (ioc_inode); + +        if (waitq != NULL) { +                ioc_waitq_return (waitq); +        }  }  int32_t  ioc_frame_fill (ioc_page_t *page, call_frame_t *frame, off_t offset,                  size_t size)  { -	ioc_local_t *local = NULL; -	ioc_fill_t  *fill = NULL; -	off_t       src_offset = 0; -	off_t       dst_offset = 0; -	ssize_t     copy_size = 0; -	ioc_inode_t *ioc_inode = NULL; -        ioc_fill_t  *new = NULL; -        int8_t      found = 0; -        int32_t     ret = 0; +        ioc_local_t *local      = NULL; +        ioc_fill_t  *fill       = NULL; +        off_t        src_offset = 0; +        off_t        dst_offset = 0; +        ssize_t      copy_size  = 0; +        ioc_inode_t *ioc_inode  = NULL; +        ioc_fill_t  *new        = NULL; +        int8_t       found      = 0; +        int32_t      ret        = 0;          local = frame->local;          ioc_inode = page->inode; -	gf_log (frame->this->name, GF_LOG_TRACE, -		"frame (%p) offset = %"PRId64" && size = %"GF_PRI_SIZET" " -		"&& page->size = %"GF_PRI_SIZET" && wait_count = %d", -		frame, offset, size, page->size, local->wait_count); - -	/* immediately move this page to the end of the page_lru list */ -	list_move_tail (&page->page_lru, &ioc_inode->cache.page_lru); -	/* fill local->pending_size bytes from local->pending_offset */ -	if (local->op_ret != -1 && page->size) { -		if (offset > page->offset) -			/* offset is offset in file, convert it to offset in -			 * page */ -			src_offset = offset - page->offset; -		/*FIXME: since offset is the offset within page is the -		 * else case valid? */ -		else -			/* local->pending_offset is in previous page. do not -			 * fill until we have filled all previous pages */ -			dst_offset = page->offset - offset; - -		/* we have to copy from offset to either end of this page -		 * or till the requested size */ -		copy_size = min (page->size - src_offset, -				 size - dst_offset); - -		if (copy_size < 0) { -			/* if page contains fewer bytes and the required offset -			   is beyond the page size in the page */ -			copy_size = src_offset = 0; -		} - -		gf_log (page->inode->table->xl->name, GF_LOG_TRACE, -			"copy_size = %"GF_PRI_SIZET" && src_offset = " -			"%"PRId64" && dst_offset = %"PRId64"", -			copy_size, src_offset, dst_offset); - -		{ +        gf_log (frame->this->name, GF_LOG_TRACE, +                "frame (%p) offset = %"PRId64" && size = %"GF_PRI_SIZET" " +                "&& page->size = %"GF_PRI_SIZET" && wait_count = %d", +                frame, offset, size, page->size, local->wait_count); + +        /* immediately move this page to the end of the page_lru list */ +        list_move_tail (&page->page_lru, &ioc_inode->cache.page_lru); +        /* fill local->pending_size bytes from local->pending_offset */ +        if (local->op_ret != -1 && page->size) { +                if (offset > page->offset) +                        /* offset is offset in file, convert it to offset in +                         * page */ +                        src_offset = offset - page->offset; +                /*FIXME: since offset is the offset within page is the +                 * else case valid? */ +                else +                        /* local->pending_offset is in previous page. do not +                         * fill until we have filled all previous pages */ +                        dst_offset = page->offset - offset; + +                /* we have to copy from offset to either end of this page +                 * or till the requested size */ +                copy_size = min (page->size - src_offset, +                                 size - dst_offset); + +                if (copy_size < 0) { +                        /* if page contains fewer bytes and the required offset +                           is beyond the page size in the page */ +                        copy_size = src_offset = 0; +                } + +                gf_log (page->inode->table->xl->name, GF_LOG_TRACE, +                        "copy_size = %"GF_PRI_SIZET" && src_offset = " +                        "%"PRId64" && dst_offset = %"PRId64"", +                        copy_size, src_offset, dst_offset); + +                {                          new = GF_CALLOC (1, sizeof (*new),                                           gf_ioc_mt_ioc_fill_t);                          if (new == NULL) { @@ -607,17 +661,17 @@ ioc_frame_fill (ioc_page_t *page, call_frame_t *frame, off_t offset,                                  goto out;                          } -			new->offset = page->offset; -			new->size = copy_size; -			new->iobref = iobref_ref (page->iobref); -			new->count = iov_subset (page->vector, -						 page->count, -						 src_offset, -						 src_offset + copy_size, -						 NULL); - -			new->vector = GF_CALLOC (new->count, -					         sizeof (struct iovec), +                        new->offset = page->offset; +                        new->size = copy_size; +                        new->iobref = iobref_ref (page->iobref); +                        new->count = iov_subset (page->vector, +                                                 page->count, +                                                 src_offset, +                                                 src_offset + copy_size, +                                                 NULL); + +                        new->vector = GF_CALLOC (new->count, +                                                 sizeof (struct iovec),                                                   gf_ioc_mt_iovec);                          if (new->vector == NULL) {                                  local->op_ret = -1; @@ -632,50 +686,50 @@ ioc_frame_fill (ioc_page_t *page, call_frame_t *frame, off_t offset,                                  goto out;                          } -			new->count = iov_subset (page->vector, -						 page->count, -						 src_offset, -						 src_offset + copy_size, -						 new->vector); +                        new->count = iov_subset (page->vector, +                                                 page->count, +                                                 src_offset, +                                                 src_offset + copy_size, +                                                 new->vector); -			/* add the ioc_fill to fill_list for this frame */ -			if (list_empty (&local->fill_list)) { -				/* if list is empty, then this is the first -				 * time we are filling frame, add the -				 * ioc_fill_t to the end of list */ -				list_add_tail (&new->list, &local->fill_list); -			} else { +                        /* add the ioc_fill to fill_list for this frame */ +                        if (list_empty (&local->fill_list)) { +                                /* if list is empty, then this is the first +                                 * time we are filling frame, add the +                                 * ioc_fill_t to the end of list */ +                                list_add_tail (&new->list, &local->fill_list); +                        } else {                                  found = 0; -				/* list is not empty, we need to look for -				 * where this offset fits in list */ -				list_for_each_entry (fill, &local->fill_list, -						     list) { -					if (fill->offset > new->offset) { -						found = 1; -						break; -					} -				} - -				if (found) { -					list_add_tail (&new->list, -						       &fill->list); -				} else { -					list_add_tail (&new->list, -						       &local->fill_list); -				} -			} -		} -		local->op_ret += copy_size; -	} +                                /* list is not empty, we need to look for +                                 * where this offset fits in list */ +                                list_for_each_entry (fill, &local->fill_list, +                                                     list) { +                                        if (fill->offset > new->offset) { +                                                found = 1; +                                                break; +                                        } +                                } + +                                if (found) { +                                        list_add_tail (&new->list, +                                                       &fill->list); +                                } else { +                                        list_add_tail (&new->list, +                                                       &local->fill_list); +                                } +                        } +                } +                local->op_ret += copy_size; +        }  out:          return ret;  }  /* - * ioc_frame_unwind - frame unwinds only from here  + * ioc_frame_unwind - frame unwinds only from here   *   * @frame: call frame to unwind   * @@ -686,43 +740,43 @@ out:  static void  ioc_frame_unwind (call_frame_t *frame)  { -	ioc_local_t   *local = NULL; -	ioc_fill_t    *fill = NULL, *next = NULL; -	int32_t       count = 0; -	struct iovec  *vector = NULL; -	int32_t       copied = 0; -	struct iobref *iobref = NULL; -	struct iatt   stbuf = {0,}; -	int32_t       op_ret = 0; +        ioc_local_t   *local  = NULL; +        ioc_fill_t    *fill   = NULL, *next = NULL; +        int32_t        count  = 0; +        struct iovec  *vector = NULL; +        int32_t        copied = 0; +        struct iobref *iobref = NULL; +        struct iatt    stbuf  = {0,}; +        int32_t        op_ret = 0;          local = frame->local; -	//  ioc_local_lock (local); -	frame->local = NULL; -	iobref = iobref_new (); +        //  ioc_local_lock (local); +        frame->local = NULL; +        iobref = iobref_new ();          if (iobref == NULL) {                  op_ret = -1;                  gf_log (frame->this->name, GF_LOG_ERROR, "out of memory");          } -	if (list_empty (&local->fill_list)) { -		gf_log (frame->this->name, GF_LOG_TRACE, -			"frame(%p) has 0 entries in local->fill_list " -			"(offset = %"PRId64" && size = %"GF_PRI_SIZET")", -			frame, local->offset, local->size); -	} +        if (list_empty (&local->fill_list)) { +                gf_log (frame->this->name, GF_LOG_TRACE, +                        "frame(%p) has 0 entries in local->fill_list " +                        "(offset = %"PRId64" && size = %"GF_PRI_SIZET")", +                        frame, local->offset, local->size); +        } -	list_for_each_entry (fill, &local->fill_list, list) { -		count += fill->count; -	} +        list_for_each_entry (fill, &local->fill_list, list) { +                count += fill->count; +        } -	vector = GF_CALLOC (count, sizeof (*vector), gf_ioc_mt_iovec); +        vector = GF_CALLOC (count, sizeof (*vector), gf_ioc_mt_iovec);          if (vector == NULL) {                  op_ret = -1;                  gf_log (frame->this->name, GF_LOG_ERROR, "out of memory");          } -	list_for_each_entry_safe (fill, next, &local->fill_list, list) { +        list_for_each_entry_safe (fill, next, &local->fill_list, list) {                  if ((vector != NULL) &&  (iobref != NULL)) {                          memcpy (((char *)vector) + copied,                                  fill->vector, @@ -733,22 +787,22 @@ ioc_frame_unwind (call_frame_t *frame)                          iobref_merge (iobref, fill->iobref);                  } -		list_del (&fill->list); -		iobref_unref (fill->iobref); -		GF_FREE (fill->vector); -		GF_FREE (fill); -	} +                list_del (&fill->list); +                iobref_unref (fill->iobref); +                GF_FREE (fill->vector); +                GF_FREE (fill); +        }          if (op_ret != -1) {                  op_ret = iov_length (vector, count);          } -	gf_log (frame->this->name, GF_LOG_TRACE, -		"frame(%p) unwinding with op_ret=%d", frame, op_ret); +        gf_log (frame->this->name, GF_LOG_TRACE, +                "frame(%p) unwinding with op_ret=%d", frame, op_ret); -	//  ioc_local_unlock (local); +        //  ioc_local_unlock (local); -	STACK_UNWIND_STRICT (readv, frame, op_ret, local->op_errno, vector, +        STACK_UNWIND_STRICT (readv, frame, op_ret, local->op_errno, vector,                               count, &stbuf, iobref);          if (iobref != NULL) { @@ -760,8 +814,8 @@ ioc_frame_unwind (call_frame_t *frame)                  vector = NULL;          } -	pthread_mutex_destroy (&local->local_lock); -	GF_FREE (local); +        pthread_mutex_destroy (&local->local_lock); +        GF_FREE (local);          return;  } @@ -775,56 +829,104 @@ ioc_frame_unwind (call_frame_t *frame)  void  ioc_frame_return (call_frame_t *frame)  { -	ioc_local_t *local = NULL; -	int32_t wait_count = 0; +        ioc_local_t *local = NULL; +        int32_t wait_count = 0;          local = frame->local;          GF_ASSERT (local->wait_count > 0); -	ioc_local_lock (local); -	{ -		wait_count = --local->wait_count; -	} -	ioc_local_unlock (local); +        ioc_local_lock (local); +        { +                wait_count = --local->wait_count; +        } +        ioc_local_unlock (local); -	if (!wait_count) { -		ioc_frame_unwind (frame); -	} +        if (!wait_count) { +                ioc_frame_unwind (frame); +        } -	return; +        return;  }  /* - * ioc_page_wakeup - + * __ioc_page_wakeup -   * @page:   *   * to be called only when a frame is waiting on an in-transit page   */  ioc_waitq_t * -ioc_page_wakeup (ioc_page_t *page) +__ioc_page_wakeup (ioc_page_t *page)  { -	ioc_waitq_t  *waitq = NULL, *trav = NULL; -	call_frame_t *frame = NULL; -        int32_t      ret = -1; +        ioc_waitq_t  *waitq = NULL, *trav = NULL; +        call_frame_t *frame = NULL; +        int32_t       ret   = -1; -	waitq = page->waitq; -	page->waitq = NULL; +        waitq = page->waitq; +        page->waitq = NULL; -	page->ready = 1; +        page->ready = 1; -	gf_log (page->inode->table->xl->name, GF_LOG_TRACE, -		"page is %p && waitq = %p", page, waitq); +        gf_log (page->inode->table->xl->name, GF_LOG_TRACE, +                "page is %p && waitq = %p", page, waitq); -	for (trav = waitq; trav; trav = trav->next) { -		frame = trav->data; -		ret = ioc_frame_fill (page, frame, trav->pending_offset, +        for (trav = waitq; trav; trav = trav->next) { +                frame = trav->data; +                ret = ioc_frame_fill (page, frame, trav->pending_offset,                                        trav->pending_size);                  if (ret == -1) {                          break;                  } -	} -	 -	return waitq; +        } + +        return waitq; +} + + +/* + * ioc_page_error - + * @page: + * @op_ret: + * @op_errno: + * + */ +ioc_waitq_t * +__ioc_page_error (ioc_page_t *page, int32_t op_ret, int32_t op_errno) +{ +        ioc_waitq_t  *waitq = NULL, *trav = NULL; +        call_frame_t *frame = NULL; +        int64_t       ret   = 0; +        ioc_table_t  *table = NULL; +        ioc_local_t  *local = NULL; + +        waitq = page->waitq; +        page->waitq = NULL; + +        gf_log (page->inode->table->xl->name, GF_LOG_DEBUG, +                "page error for page = %p & waitq = %p", page, waitq); + +        for (trav = waitq; trav; trav = trav->next) { + +                frame = trav->data; + +                local = frame->local; +                ioc_local_lock (local); +                { +                        if (local->op_ret != -1) { +                                local->op_ret = op_ret; +                                local->op_errno = op_errno; +                        } +                } +                ioc_local_unlock (local); +        } + +        table = page->inode->table; +        ret = ioc_page_destroy (page); + +        if (ret != -1) { +                table->cache_used -= ret; +        } + +        return waitq;  } @@ -838,39 +940,18 @@ ioc_page_wakeup (ioc_page_t *page)  ioc_waitq_t *  ioc_page_error (ioc_page_t *page, int32_t op_ret, int32_t op_errno)  { -	ioc_waitq_t  *waitq = NULL, *trav = NULL; -	call_frame_t *frame = NULL; -	int64_t      ret = 0; -	ioc_table_t  *table = NULL; -	ioc_local_t  *local = NULL; - -	waitq = page->waitq; -	page->waitq = NULL; -   -	gf_log (page->inode->table->xl->name, GF_LOG_DEBUG, -		"page error for page = %p & waitq = %p", page, waitq); - -	for (trav = waitq; trav; trav = trav->next) { - -		frame = trav->data; - -		local = frame->local; -		ioc_local_lock (local); -		{ -			if (local->op_ret != -1) { -				local->op_ret = op_ret; -				local->op_errno = op_errno; -			} -		} -		ioc_local_unlock (local); -	} - -	table = page->inode->table; -	ret = ioc_page_destroy (page); - -	if (ret != -1) { -		table->cache_used -= ret; -	} - -	return waitq; +        ioc_waitq_t  *waitq = NULL; + +        if (page == NULL) { +                goto out; +        } + +        ioc_inode_lock (page->inode); +        { +                waitq = __ioc_page_error (page, op_ret, op_errno); +        } +        ioc_inode_unlock (page->inode); + +out: +        return waitq;  }  | 
