summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--xlators/performance/io-threads/src/io-threads.c277
1 files changed, 61 insertions, 216 deletions
diff --git a/xlators/performance/io-threads/src/io-threads.c b/xlators/performance/io-threads/src/io-threads.c
index 6a30184aa..5848a44e8 100644
--- a/xlators/performance/io-threads/src/io-threads.c
+++ b/xlators/performance/io-threads/src/io-threads.c
@@ -36,21 +36,18 @@ iot_queue (iot_worker_t *worker,
static call_stub_t *
iot_dequeue (iot_worker_t *worker);
-static iot_worker_t *
+static void
iot_schedule (iot_conf_t *conf,
- iot_file_t *file,
- ino_t ino)
+ inode_t *inode,
+ call_stub_t *stub)
{
int32_t idx = 0;
iot_worker_t *selected_worker = NULL;
- idx = (ino % conf->thread_count);
+ idx = (inode->ino % conf->thread_count);
selected_worker = conf->workers[idx];
- if (file)
- file->worker = selected_worker;
-
- return selected_worker;
+ iot_queue (selected_worker, stub);
}
int32_t
@@ -61,28 +58,22 @@ iot_open_cbk (call_frame_t *frame,
int32_t op_errno,
fd_t *fd)
{
- iot_conf_t *conf = this->private;
-
- if (op_ret >= 0) {
- iot_file_t *file = CALLOC (1, sizeof (*file));
- ERR_ABORT (file);
-
- iot_schedule (conf, file, fd->inode->ino);
- file->fd = fd;
-
- fd_ctx_set (fd, this, (uint64_t)(long)file);
-
- pthread_mutex_lock (&conf->files_lock);
- file->next = &conf->files;
- file->prev = file->next->prev;
- file->next->prev = file;
- file->prev->next = file;
- pthread_mutex_unlock (&conf->files_lock);
- }
STACK_UNWIND (frame, op_ret, op_errno, fd);
return 0;
}
+static int32_t
+iot_open_wrapper (call_frame_t * frame,
+ xlator_t * this,
+ loc_t *loc,
+ int32_t flags,
+ fd_t * fd)
+{
+ STACK_WIND (frame, iot_open_cbk, FIRST_CHILD (this),
+ FIRST_CHILD (this)->fops->open, loc, flags, fd);
+ return 0;
+}
+
int32_t
iot_open (call_frame_t *frame,
xlator_t *this,
@@ -90,13 +81,16 @@ iot_open (call_frame_t *frame,
int32_t flags,
fd_t *fd)
{
- STACK_WIND (frame,
- iot_open_cbk,
- FIRST_CHILD(this),
- FIRST_CHILD(this)->fops->open,
- loc,
- flags,
- fd);
+ call_stub_t *stub = NULL;
+
+ stub = fop_open_stub (frame, iot_open_wrapper, loc, flags, fd);
+ if (!stub) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "cannot get open call stub");
+ STACK_UNWIND (frame, -1, ENOMEM, NULL, 0);
+ }
+ iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+
return 0;
}
@@ -111,30 +105,12 @@ iot_create_cbk (call_frame_t *frame,
inode_t *inode,
struct stat *stbuf)
{
- iot_conf_t *conf = this->private;
-
- if (op_ret >= 0) {
- iot_file_t *file = CALLOC (1, sizeof (*file));
- ERR_ABORT (file);
-
- iot_schedule (conf, file, fd->inode->ino);
- file->fd = fd;
-
- fd_ctx_set (fd, this, (uint64_t)(long)file);
-
- pthread_mutex_lock (&conf->files_lock);
- file->next = &conf->files;
- file->prev = file->next->prev;
- file->next->prev = file;
- file->prev->next = file;
- pthread_mutex_unlock (&conf->files_lock);
- }
STACK_UNWIND (frame, op_ret, op_errno, fd, inode, stbuf);
return 0;
}
int32_t
-iot_create (call_frame_t *frame,
+iot_create_wrapper (call_frame_t *frame,
xlator_t *this,
loc_t *loc,
int32_t flags,
@@ -152,7 +128,26 @@ iot_create (call_frame_t *frame,
return 0;
}
-
+int32_t
+iot_create (call_frame_t *frame,
+ xlator_t *this,
+ loc_t *loc,
+ int32_t flags,
+ mode_t mode,
+ fd_t *fd)
+{
+ call_stub_t *stub = NULL;
+
+ stub = fop_create_stub (frame, iot_create_wrapper, loc, flags, mode,
+ fd);
+ if (!stub) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "cannot get create call stub");
+ STACK_UNWIND (frame, -1, ENOMEM, NULL, 0);
+ }
+ iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
+ return 0;
+}
int32_t
iot_readv_cbk (call_frame_t *frame,
@@ -199,21 +194,8 @@ iot_readv (call_frame_t *frame,
{
call_stub_t *stub;
iot_local_t *local = NULL;
- iot_file_t *file = NULL;
- iot_worker_t *worker = NULL;
- uint64_t tmp_file = 0;
-
- if (fd_ctx_get (fd, this, &tmp_file)) {
- gf_log (this->name, GF_LOG_ERROR,
- "fd context is NULL, returning EBADFD");
- STACK_UNWIND (frame, -1, EBADFD);
- return 0;
- }
-
- file = (iot_file_t *)(long)tmp_file;
- worker = file->worker;
- local = CALLOC (1, sizeof (*local));
+ local = CALLOC (1, sizeof (*local));
ERR_ABORT (local);
frame->local = local;
@@ -229,8 +211,7 @@ iot_readv (call_frame_t *frame,
return 0;
}
- iot_queue (worker, stub);
-
+ iot_schedule ((iot_conf_t *)this->private, fd->inode, stub);
return 0;
}
@@ -265,19 +246,6 @@ iot_flush (call_frame_t *frame,
{
call_stub_t *stub;
iot_local_t *local = NULL;
- iot_file_t *file = NULL;
- iot_worker_t *worker = NULL;
- uint64_t tmp_file = 0;
-
- if (fd_ctx_get (fd, this, &tmp_file)) {
- gf_log (this->name, GF_LOG_ERROR,
- "fd context is NULL, returning EBADFD");
- STACK_UNWIND (frame, -1, EBADFD);
- return 0;
- }
-
- file = (iot_file_t *)(long)tmp_file;
- worker = file->worker;
local = CALLOC (1, sizeof (*local));
ERR_ABORT (local);
@@ -292,8 +260,8 @@ iot_flush (call_frame_t *frame,
STACK_UNWIND (frame, -1, ENOMEM);
return 0;
}
- iot_queue (worker, stub);
+ iot_schedule ((iot_conf_t *)this->private, fd->inode, stub);
return 0;
}
@@ -331,19 +299,6 @@ iot_fsync (call_frame_t *frame,
{
call_stub_t *stub;
iot_local_t *local = NULL;
- iot_file_t *file = NULL;
- iot_worker_t *worker = NULL;
- uint64_t tmp_file = 0;
-
- if (fd_ctx_get (fd, this, &tmp_file)) {
- gf_log (this->name, GF_LOG_ERROR,
- "fd context is NULL, returning EBADFD");
- STACK_UNWIND (frame, -1, EBADFD);
- return 0;
- }
-
- file = (iot_file_t *)(long)tmp_file;
- worker = file->worker;
local = CALLOC (1, sizeof (*local));
ERR_ABORT (local);
@@ -359,8 +314,8 @@ iot_fsync (call_frame_t *frame,
STACK_UNWIND (frame, -1, ENOMEM);
return 0;
}
- iot_queue (worker, stub);
+ iot_schedule ((iot_conf_t *)this->private, fd->inode, stub);
return 0;
}
@@ -409,19 +364,6 @@ iot_writev (call_frame_t *frame,
{
call_stub_t *stub;
iot_local_t *local = NULL;
- iot_file_t *file = NULL;
- iot_worker_t *worker = NULL;
- uint64_t tmp_file = 0;
-
- if (fd_ctx_get (fd, this, &tmp_file)) {
- gf_log (this->name, GF_LOG_ERROR,
- "fd context is NULL, returning EBADFD");
- STACK_UNWIND (frame, -1, EBADFD);
- return 0;
- }
-
- file = (iot_file_t *)(long)tmp_file;
- worker = file->worker;
local = CALLOC (1, sizeof (*local));
ERR_ABORT (local);
@@ -441,7 +383,7 @@ iot_writev (call_frame_t *frame,
return 0;
}
- iot_queue (worker, stub);
+ iot_schedule ((iot_conf_t *)this->private, fd->inode, stub);
return 0;
}
@@ -487,19 +429,6 @@ iot_lk (call_frame_t *frame,
{
call_stub_t *stub;
iot_local_t *local = NULL;
- iot_file_t *file = NULL;
- iot_worker_t *worker = NULL;
- uint64_t tmp_file = 0;
-
- if (fd_ctx_get (fd, this, &tmp_file)) {
- gf_log (this->name, GF_LOG_ERROR,
- "fd context is NULL, returning EBADFD");
- STACK_UNWIND (frame, -1, EBADFD);
- return 0;
- }
-
- file = (iot_file_t *)(long)tmp_file;
- worker = file->worker;
local = CALLOC (1, sizeof (*local));
ERR_ABORT (local);
@@ -513,9 +442,8 @@ iot_lk (call_frame_t *frame,
STACK_UNWIND (frame, -1, ENOMEM, NULL);
return 0;
}
-
- iot_queue (worker, stub);
+ iot_schedule ((iot_conf_t *)this->private, fd->inode, stub);
return 0;
}
@@ -553,12 +481,8 @@ iot_stat (call_frame_t *frame,
{
call_stub_t *stub;
iot_local_t *local = NULL;
- iot_worker_t *worker = NULL;
- iot_conf_t *conf;
fd_t *fd = NULL;
- conf = this->private;
-
local = CALLOC (1, sizeof (*local));
ERR_ABORT (local);
frame->local = local;
@@ -576,8 +500,6 @@ iot_stat (call_frame_t *frame,
fd_unref (fd);
- worker = iot_schedule (conf, NULL, loc->inode->ino);
-
stub = fop_stat_stub (frame,
iot_stat_wrapper,
loc);
@@ -586,7 +508,7 @@ iot_stat (call_frame_t *frame,
STACK_UNWIND (frame, -1, ENOMEM, NULL);
return 0;
}
- iot_queue (worker, stub);
+ iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
return 0;
}
@@ -624,19 +546,6 @@ iot_fstat (call_frame_t *frame,
{
call_stub_t *stub;
iot_local_t *local = NULL;
- iot_file_t *file = NULL;
- iot_worker_t *worker = NULL;
- uint64_t tmp_file = 0;
-
- if (fd_ctx_get (fd, this, &tmp_file)) {
- gf_log (this->name, GF_LOG_ERROR,
- "fd context is NULL, returning EBADFD");
- STACK_UNWIND (frame, -1, EBADFD);
- return 0;
- }
-
- file = (iot_file_t *)(long)tmp_file;
- worker = file->worker;
local = CALLOC (1, sizeof (*local));
ERR_ABORT (local);
@@ -650,7 +559,7 @@ iot_fstat (call_frame_t *frame,
return 0;
}
- iot_queue (worker, stub);
+ iot_schedule ((iot_conf_t *)this->private, fd->inode, stub);
return 0;
}
@@ -690,11 +599,8 @@ iot_truncate (call_frame_t *frame,
{
call_stub_t *stub;
iot_local_t *local = NULL;
- iot_worker_t *worker = NULL;
- iot_conf_t *conf;
fd_t *fd = NULL;
- conf = this->private;
local = CALLOC (1, sizeof (*local));
ERR_ABORT (local);
frame->local = local;
@@ -713,8 +619,6 @@ iot_truncate (call_frame_t *frame,
fd_unref (fd);
- worker = iot_schedule (conf, NULL, loc->inode->ino);
-
stub = fop_truncate_stub (frame,
iot_truncate_wrapper,
loc,
@@ -724,7 +628,7 @@ iot_truncate (call_frame_t *frame,
STACK_UNWIND (frame, -1, ENOMEM, NULL);
return 0;
}
- iot_queue (worker, stub);
+ iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
return 0;
}
@@ -764,19 +668,6 @@ iot_ftruncate (call_frame_t *frame,
{
call_stub_t *stub;
iot_local_t *local = NULL;
- iot_file_t *file = NULL;
- iot_worker_t *worker = NULL;
- uint64_t tmp_file = 0;
-
- if (fd_ctx_get (fd, this, &tmp_file)) {
- gf_log (this->name, GF_LOG_ERROR,
- "fd context is NULL, returning EBADFD");
- STACK_UNWIND (frame, -1, EBADFD);
- return 0;
- }
-
- file = (iot_file_t *)(long)tmp_file;
- worker = file->worker;
local = CALLOC (1, sizeof (*local));
ERR_ABORT (local);
@@ -791,7 +682,7 @@ iot_ftruncate (call_frame_t *frame,
STACK_UNWIND (frame, -1, ENOMEM, NULL);
return 0;
}
- iot_queue (worker, stub);
+ iot_schedule ((iot_conf_t *)this->private, fd->inode, stub);
return 0;
}
@@ -832,12 +723,8 @@ iot_utimens (call_frame_t *frame,
{
call_stub_t *stub;
iot_local_t *local = NULL;
- iot_worker_t *worker = NULL;
- iot_conf_t *conf;
fd_t *fd = NULL;
- conf = this->private;
-
local = CALLOC (1, sizeof (*local));
ERR_ABORT (local);
frame->local = local;
@@ -856,8 +743,6 @@ iot_utimens (call_frame_t *frame,
fd_unref (fd);
- worker = iot_schedule (conf, NULL, loc->inode->ino);
-
stub = fop_utimens_stub (frame,
iot_utimens_wrapper,
loc,
@@ -867,7 +752,7 @@ iot_utimens (call_frame_t *frame,
STACK_UNWIND (frame, -1, ENOMEM, NULL);
return 0;
}
- iot_queue (worker, stub);
+ iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
return 0;
}
@@ -910,16 +795,10 @@ iot_checksum (call_frame_t *frame,
{
call_stub_t *stub = NULL;
iot_local_t *local = NULL;
- iot_worker_t *worker = NULL;
- iot_conf_t *conf = NULL;
- conf = this->private;
-
local = CALLOC (1, sizeof (*local));
frame->local = local;
- worker = iot_schedule (conf, NULL, conf->misc_thread_index++);
-
stub = fop_checksum_stub (frame,
iot_checksum_wrapper,
loc,
@@ -929,7 +808,7 @@ iot_checksum (call_frame_t *frame,
STACK_UNWIND (frame, -1, ENOMEM, NULL, NULL);
return 0;
}
- iot_queue (worker, stub);
+ iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
return 0;
}
@@ -967,51 +846,18 @@ iot_unlink (call_frame_t *frame,
{
call_stub_t *stub = NULL;
iot_local_t *local = NULL;
- iot_worker_t *worker = NULL;
- iot_conf_t *conf = NULL;
-
- conf = this->private;
local = CALLOC (1, sizeof (*local));
frame->local = local;
- worker = iot_schedule (conf, NULL, conf->misc_thread_index++);
-
stub = fop_unlink_stub (frame, iot_unlink_wrapper, loc);
if (!stub) {
gf_log (this->name, GF_LOG_ERROR, "cannot get fop_unlink call stub");
STACK_UNWIND (frame, -1, ENOMEM);
return 0;
}
- iot_queue (worker, stub);
-
- return 0;
-}
-
-int32_t
-iot_release (xlator_t *this,
- fd_t *fd)
-{
- iot_file_t *file = NULL;
- iot_conf_t *conf = NULL;
- uint64_t tmp_file = 0;
- int ret = 0;
-
- conf = this->private;
- ret = fd_ctx_del (fd, this, &tmp_file);
- if (ret)
- return 0;
-
- file = (iot_file_t *)(long)tmp_file;
-
- pthread_mutex_lock (&conf->files_lock);
- {
- (file->prev)->next = file->next;
- (file->next)->prev = file->prev;
- }
- pthread_mutex_unlock (&conf->files_lock);
+ iot_schedule ((iot_conf_t *)this->private, loc->inode, stub);
- FREE (file);
return 0;
}
@@ -1203,7 +1049,6 @@ struct xlator_mops mops = {
};
struct xlator_cbks cbks = {
- .release = iot_release,
};
struct volume_options options[] = {