/* Copyright (c) 2006-2010 Gluster, Inc. This file is part of GlusterFS. GlusterFS is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. GlusterFS is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ #ifndef _CONFIG_H #define _CONFIG_H #include "config.h" #endif #include "glusterfs.h" #include "logging.h" #include "dict.h" #include "xlator.h" #include "read-ahead.h" #include ra_page_t * ra_page_get (ra_file_t *file, off_t offset) { ra_page_t *page = NULL; off_t rounded_offset = 0; page = file->pages.next; rounded_offset = floor (offset, file->page_size); while (page != &file->pages && page->offset < rounded_offset) page = page->next; if (page == &file->pages || page->offset != rounded_offset) page = NULL; return page; } ra_page_t * ra_page_create (ra_file_t *file, off_t offset) { ra_page_t *page = NULL; off_t rounded_offset = 0; ra_page_t *newpage = NULL; page = file->pages.next; rounded_offset = floor (offset, file->page_size); while (page != &file->pages && page->offset < rounded_offset) page = page->next; if (page == &file->pages || page->offset != rounded_offset) { newpage = GF_CALLOC (1, sizeof (*newpage), gf_ra_mt_ra_page_t); if (!newpage) return NULL; newpage->offset = rounded_offset; newpage->prev = page->prev; newpage->next = page; newpage->file = file; page->prev->next = newpage; page->prev = newpage; page = newpage; } return page; } void ra_wait_on_page (ra_page_t *page, call_frame_t *frame) { ra_waitq_t *waitq = NULL; ra_local_t *local = NULL; local = frame->local; waitq = GF_CALLOC (1, sizeof (*waitq), gf_ra_mt_ra_waitq_t); if (!waitq) { gf_log (frame->this->name, GF_LOG_ERROR, "out of memory"); local->op_ret = -1; local->op_errno = ENOMEM; goto out; } waitq->data = frame; waitq->next = page->waitq; page->waitq = waitq; ra_local_lock (local); { local->wait_count++; } ra_local_unlock (local); out: return; } void ra_waitq_return (ra_waitq_t *waitq) { ra_waitq_t *trav = NULL; ra_waitq_t *next = NULL; call_frame_t *frame = NULL; for (trav = waitq; trav; trav = next) { next = trav->next; frame = trav->data; ra_frame_return (frame); GF_FREE (trav); } } int ra_fault_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret, int32_t op_errno, struct iovec *vector, int32_t count, struct iatt *stbuf, struct iobref *iobref) { ra_local_t *local = NULL; off_t pending_offset = 0; ra_file_t *file = NULL; ra_page_t *page = NULL; ra_waitq_t *waitq = NULL; fd_t *fd = NULL; uint64_t tmp_file = 0; local = frame->local; fd = local->fd; fd_ctx_get (fd, this, &tmp_file); file = (ra_file_t *)(long)tmp_file; pending_offset = local->pending_offset; ra_file_lock (file); { if (op_ret >= 0) file->stbuf = *stbuf; if (op_ret < 0) { page = ra_page_get (file, pending_offset); if (page) waitq = ra_page_error (page, op_ret, op_errno); goto unlock; } page = ra_page_get (file, pending_offset); if (!page) { gf_log (this->name, GF_LOG_DEBUG, "wasted copy: %"PRId64"[+%"PRId64"] file=%p", pending_offset, file->page_size, file); goto unlock; } if (page->vector) { iobref_unref (page->iobref); GF_FREE (page->vector); } page->vector = iov_dup (vector, count); if (page->vector == NULL) { waitq = ra_page_error (page, -1, ENOMEM); goto unlock; } page->count = count; page->iobref = iobref_ref (iobref); page->ready = 1; page->size = iov_length (vector, count); waitq = ra_page_wakeup (page); } unlock: ra_file_unlock (file); ra_waitq_return (waitq); fd_unref (local->fd); GF_FREE (frame->local); frame->local = NULL; STACK_DESTROY (frame->root); return 0; } void ra_page_fault (ra_file_t *file, call_frame_t *frame, off_t offset) { call_frame_t *fault_frame = NULL; ra_local_t *fault_local = NULL; ra_page_t *page = NULL; ra_waitq_t *waitq = NULL; int32_t op_ret = -1, op_errno = -1; fault_frame = copy_frame (frame); if (fault_frame == NULL) { op_ret = -1; op_errno = ENOMEM; goto err; } fault_local = GF_CALLOC (1, sizeof (ra_local_t), gf_ra_mt_ra_local_t); if (fault_local == NULL) { STACK_DESTROY (fault_frame->root); op_ret = -1; op_errno = ENOMEM; goto err; } fault_frame->local = fault_local; fault_local->pending_offset = offset; fault_local->pending_size = file->page_size; fault_local->fd = fd_ref (file->fd); STACK_WIND (fault_frame, ra_fault_cbk, FIRST_CHILD (fault_frame->this), FIRST_CHILD (fault_frame->this)->fops->readv, file->fd, file->page_size, offset); return; err: ra_file_lock (file); { page = ra_page_get (file, offset); if (page) waitq = ra_page_error (page, op_ret, op_errno); } ra_file_unlock (file); if (waitq != NULL) { ra_waitq_return (waitq); } } void ra_frame_fill (ra_page_t *page, call_frame_t *frame) { ra_local_t *local = NULL; ra_fill_t *fill = NULL; off_t src_offset = 0; off_t dst_offset = 0; ssize_t copy_size = 0; ra_fill_t *new = NULL; local = frame->local; fill = &local->fill; if (local->op_ret != -1 && page->size) { if (local->offset > page->offset) src_offset = local->offset - page->offset; else dst_offset = page->offset - local->offset; copy_size = min (page->size - src_offset, local->size - dst_offset); if (copy_size < 0) { /* if page contains fewer bytes and the required offset is beyond the page size in the page */ copy_size = src_offset = 0; } fill = fill->next; while (fill != &local->fill) { if (fill->offset > page->offset) { break; } fill = fill->next; } new = GF_CALLOC (1, sizeof (*new), gf_ra_mt_ra_fill_t); if (new == NULL) { local->op_ret = -1; local->op_errno = ENOMEM; goto out; } new->offset = page->offset; new->size = copy_size; new->iobref = iobref_ref (page->iobref); new->count = iov_subset (page->vector, page->count, src_offset, src_offset+copy_size, NULL); new->vector = GF_CALLOC (new->count, sizeof (struct iovec), gf_ra_mt_iovec); if (new->vector == NULL) { local->op_ret = -1; local->op_errno = ENOMEM; GF_FREE (new); goto out; } new->count = iov_subset (page->vector, page->count, src_offset, src_offset+copy_size, new->vector); new->next = fill; new->prev = new->next->prev; new->next->prev = new; new->prev->next = new; local->op_ret += copy_size; } out: return; } void ra_frame_unwind (call_frame_t *frame) { ra_local_t *local = NULL; ra_fill_t *fill = NULL; int32_t count = 0; struct iovec *vector; int32_t copied = 0; struct iobref *iobref = NULL; ra_fill_t *next = NULL; fd_t *fd = NULL; ra_file_t *file = NULL; uint64_t tmp_file = 0; local = frame->local; fill = local->fill.next; iobref = iobref_new (); if (iobref == NULL) { local->op_ret = -1; local->op_errno = ENOMEM; } frame->local = NULL; while (fill != &local->fill) { count += fill->count; fill = fill->next; } vector = GF_CALLOC (count, sizeof (*vector), gf_ra_mt_iovec); if (vector == NULL) { local->op_ret = -1; local->op_errno = ENOMEM; iobref_unref (iobref); iobref = NULL; } fill = local->fill.next; while (fill != &local->fill) { next = fill->next; if ((vector != NULL) && (iobref != NULL)) { memcpy (((char *)vector) + copied, fill->vector, fill->count * sizeof (*vector)); copied += (fill->count * sizeof (*vector)); iobref_merge (iobref, fill->iobref); } fill->next->prev = fill->prev; fill->prev->next = fill->prev; iobref_unref (fill->iobref); GF_FREE (fill->vector); GF_FREE (fill); fill = next; } fd = local->fd; fd_ctx_get (fd, frame->this, &tmp_file); file = (ra_file_t *)(long)tmp_file; STACK_UNWIND_STRICT (readv, frame, local->op_ret, local->op_errno, vector, count, &file->stbuf, iobref); iobref_unref (iobref); pthread_mutex_destroy (&local->local_lock); GF_FREE (local); GF_FREE (vector); return; } /* * ra_frame_return - * @frame: * */ void ra_frame_return (call_frame_t *frame) { ra_local_t *local = NULL; int32_t wait_count = 0; local = frame->local; GF_ASSERT (local->wait_count > 0); ra_local_lock (local); { wait_count = --local->wait_count; } ra_local_unlock (local); if (!wait_count) ra_frame_unwind (frame); return; } /* * ra_page_wakeup - * @page: * */ ra_waitq_t * ra_page_wakeup (ra_page_t *page) { ra_waitq_t *waitq = NULL, *trav = NULL; call_frame_t *frame; waitq = page->waitq; page->waitq = NULL; for (trav = waitq; trav; trav = trav->next) { frame = trav->data; ra_frame_fill (page, frame); } return waitq; } /* * ra_page_purge - * @page: * */ void ra_page_purge (ra_page_t *page) { page->prev->next = page->next; page->next->prev = page->prev; if (page->iobref) { iobref_unref (page->iobref); } GF_FREE (page->vector); GF_FREE (page); } /* * ra_page_error - * @page: * @op_ret: * @op_errno: * */ ra_waitq_t * ra_page_error (ra_page_t *page, int32_t op_ret, int32_t op_errno) { ra_waitq_t *waitq = NULL; ra_waitq_t *trav = NULL; call_frame_t *frame = NULL; ra_local_t *local = NULL; waitq = page->waitq; page->waitq = NULL; for (trav = waitq; trav; trav = trav->next) { frame = trav->data; local = frame->local; if (local->op_ret != -1) { local->op_ret = op_ret; local->op_errno = op_errno; } } ra_page_purge (page); return waitq; } /* * ra_file_destroy - * @file: * */ void ra_file_destroy (ra_file_t *file) { ra_conf_t *conf = NULL; ra_page_t *trav = NULL; conf = file->conf; ra_conf_lock (conf); { file->prev->next = file->next; file->next->prev = file->prev; } ra_conf_unlock (conf); trav = file->pages.next; while (trav != &file->pages) { ra_page_error (trav, -1, EINVAL); trav = file->pages.next; } pthread_mutex_destroy (&file->file_lock); GF_FREE (file); }