From 5cbeb97c37c5e6b0263b09b308f755ab6c582d38 Mon Sep 17 00:00:00 2001 From: botbw Date: Thu, 19 Sep 2024 14:43:40 +0800 Subject: [PATCH 01/13] [backend] init pthread pool --- csrc/threadpool.c | 305 +++++++++++++++++++++++++++++++++++++++++++ include/threadpool.h | 101 ++++++++++++++ 2 files changed, 406 insertions(+) create mode 100644 csrc/threadpool.c create mode 100644 include/threadpool.h diff --git a/csrc/threadpool.c b/csrc/threadpool.c new file mode 100644 index 0000000..935e7e0 --- /dev/null +++ b/csrc/threadpool.c @@ -0,0 +1,305 @@ +/* + * Copyright (c) 2016, Mathias Brossard . + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ */ + +/** + * @file threadpool.c + * @brief Threadpool implementation file + */ + +#include +#include +#include + +#include "threadpool.h" + +typedef enum { + immediate_shutdown = 1, + graceful_shutdown = 2 +} threadpool_shutdown_t; + +/** + * @struct threadpool_task + * @brief the work struct + * + * @var function Pointer to the function that will perform the task. + * @var argument Argument to be passed to the function. + */ + +typedef struct { + void (*function)(void *); + void *argument; +} threadpool_task_t; + +/** + * @struct threadpool + * @brief The threadpool struct + * + * @var notify Condition variable to notify worker threads. + * @var threads Array containing worker threads ID. + * @var thread_count Number of threads + * @var queue Array containing the task queue. + * @var queue_size Size of the task queue. + * @var head Index of the first element. + * @var tail Index of the next element. + * @var count Number of pending tasks + * @var shutdown Flag indicating if the pool is shutting down + * @var started Number of started threads + */ +struct threadpool_t { + pthread_mutex_t lock; + pthread_cond_t notify; + pthread_t *threads; + threadpool_task_t *queue; + int thread_count; + int queue_size; + int head; + int tail; + int count; + int shutdown; + int started; +}; + +/** + * @function void *threadpool_thread(void *threadpool) + * @brief the worker thread + * @param threadpool the pool which own the thread + */ +static void *threadpool_thread(void *threadpool); + +int threadpool_free(threadpool_t *pool); + +threadpool_t *threadpool_create(int thread_count, int queue_size, int flags) +{ + threadpool_t *pool; + int i; + (void) flags; + + if(thread_count <= 0 || thread_count > MAX_THREADS || queue_size <= 0 || queue_size > MAX_QUEUE) { + return NULL; + } + + if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) { + goto err; + } + + /* Initialize */ + pool->thread_count = 0; + pool->queue_size = queue_size; + pool->head = pool->tail = 
pool->count = 0; + pool->shutdown = pool->started = 0; + + /* Allocate thread and task queue */ + pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count); + pool->queue = (threadpool_task_t *)malloc + (sizeof(threadpool_task_t) * queue_size); + + /* Initialize mutex and conditional variable first */ + if((pthread_mutex_init(&(pool->lock), NULL) != 0) || + (pthread_cond_init(&(pool->notify), NULL) != 0) || + (pool->threads == NULL) || + (pool->queue == NULL)) { + goto err; + } + + /* Start worker threads */ + for(i = 0; i < thread_count; i++) { + if(pthread_create(&(pool->threads[i]), NULL, + threadpool_thread, (void*)pool) != 0) { + threadpool_destroy(pool, 0); + return NULL; + } + pool->thread_count++; + pool->started++; + } + + return pool; + + err: + if(pool) { + threadpool_free(pool); + } + return NULL; +} + +int threadpool_add(threadpool_t *pool, void (*function)(void *), + void *argument, int flags) +{ + int err = 0; + int next; + (void) flags; + + if(pool == NULL || function == NULL) { + return threadpool_invalid; + } + + if(pthread_mutex_lock(&(pool->lock)) != 0) { + return threadpool_lock_failure; + } + + next = (pool->tail + 1) % pool->queue_size; + + do { + /* Are we full ? */ + if(pool->count == pool->queue_size) { + err = threadpool_queue_full; + break; + } + + /* Are we shutting down ? 
*/ + if(pool->shutdown) { + err = threadpool_shutdown; + break; + } + + /* Add task to queue */ + pool->queue[pool->tail].function = function; + pool->queue[pool->tail].argument = argument; + pool->tail = next; + pool->count += 1; + + /* pthread_cond_broadcast */ + if(pthread_cond_signal(&(pool->notify)) != 0) { + err = threadpool_lock_failure; + break; + } + } while(0); + + if(pthread_mutex_unlock(&pool->lock) != 0) { + err = threadpool_lock_failure; + } + + return err; +} + +int threadpool_destroy(threadpool_t *pool, int flags) +{ + int i, err = 0; + + if(pool == NULL) { + return threadpool_invalid; + } + + if(pthread_mutex_lock(&(pool->lock)) != 0) { + return threadpool_lock_failure; + } + + do { + /* Already shutting down */ + if(pool->shutdown) { + err = threadpool_shutdown; + break; + } + + pool->shutdown = (flags & threadpool_graceful) ? + graceful_shutdown : immediate_shutdown; + + /* Wake up all worker threads */ + if((pthread_cond_broadcast(&(pool->notify)) != 0) || + (pthread_mutex_unlock(&(pool->lock)) != 0)) { + err = threadpool_lock_failure; + break; + } + + /* Join all worker thread */ + for(i = 0; i < pool->thread_count; i++) { + if(pthread_join(pool->threads[i], NULL) != 0) { + err = threadpool_thread_failure; + } + } + } while(0); + + /* Only if everything went well do we deallocate the pool */ + if(!err) { + threadpool_free(pool); + } + return err; +} + +int threadpool_free(threadpool_t *pool) +{ + if(pool == NULL || pool->started > 0) { + return -1; + } + + /* Did we manage to allocate ? */ + if(pool->threads) { + free(pool->threads); + free(pool->queue); + + /* Because we allocate pool->threads after initializing the + mutex and condition variable, we're sure they're + initialized. Let's lock the mutex just in case. 
*/ + pthread_mutex_lock(&(pool->lock)); + pthread_mutex_destroy(&(pool->lock)); + pthread_cond_destroy(&(pool->notify)); + } + free(pool); + return 0; +} + + +static void *threadpool_thread(void *threadpool) +{ + threadpool_t *pool = (threadpool_t *)threadpool; + threadpool_task_t task; + + for(;;) { + /* Lock must be taken to wait on conditional variable */ + pthread_mutex_lock(&(pool->lock)); + + /* Wait on condition variable, check for spurious wakeups. + When returning from pthread_cond_wait(), we own the lock. */ + while((pool->count == 0) && (!pool->shutdown)) { + pthread_cond_wait(&(pool->notify), &(pool->lock)); + } + + if((pool->shutdown == immediate_shutdown) || + ((pool->shutdown == graceful_shutdown) && + (pool->count == 0))) { + break; + } + + /* Grab our task */ + task.function = pool->queue[pool->head].function; + task.argument = pool->queue[pool->head].argument; + pool->head = (pool->head + 1) % pool->queue_size; + pool->count -= 1; + + /* Unlock */ + pthread_mutex_unlock(&(pool->lock)); + + /* Get to work */ + (*(task.function))(task.argument); + } + + pool->started--; + + pthread_mutex_unlock(&(pool->lock)); + pthread_exit(NULL); + return(NULL); +} diff --git a/include/threadpool.h b/include/threadpool.h new file mode 100644 index 0000000..0850832 --- /dev/null +++ b/include/threadpool.h @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2016, Mathias Brossard . + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. 
+ * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef _THREADPOOL_H_ +#define _THREADPOOL_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * @file threadpool.h + * @brief Threadpool Header File + */ + + /** + * Increase this constants at your own risk + * Large values might slow down your system + */ +#define MAX_THREADS 128 +#define MAX_QUEUE 65536 + +typedef struct threadpool_t threadpool_t; + +typedef enum { + threadpool_invalid = -1, + threadpool_lock_failure = -2, + threadpool_queue_full = -3, + threadpool_shutdown = -4, + threadpool_thread_failure = -5 +} threadpool_error_t; + +typedef enum { + threadpool_graceful = 1 +} threadpool_destroy_flags_t; + +/** + * @function threadpool_create + * @brief Creates a threadpool_t object. + * @param thread_count Number of worker threads. + * @param queue_size Size of the queue. + * @param flags Unused parameter. + * @return a newly created thread pool or NULL + */ +threadpool_t *threadpool_create(int thread_count, int queue_size, int flags); + +/** + * @function threadpool_add + * @brief add a new task in the queue of a thread pool + * @param pool Thread pool to which add the task. + * @param function Pointer to the function that will perform the task. 
+ * @param argument Argument to be passed to the function. + * @param flags Unused parameter. + * @return 0 if all goes well, negative values in case of error (@see + * threadpool_error_t for codes). + */ +int threadpool_add(threadpool_t *pool, void (*routine)(void *), + void *arg, int flags); + +/** + * @function threadpool_destroy + * @brief Stops and destroys a thread pool. + * @param pool Thread pool to destroy. + * @param flags Flags for shutdown + * + * Known values for flags are 0 (default) and threadpool_graceful in + * which case the thread pool doesn't accept any new tasks but + * processes all pending tasks before shutdown. + */ +int threadpool_destroy(threadpool_t *pool, int flags); + +#ifdef __cplusplus +} +#endif + +#endif /* _THREADPOOL_H_ */ From 7a9e4afd26047fa8746e8cadf4b31b1659f8ac44 Mon Sep 17 00:00:00 2001 From: botbw Date: Thu, 19 Sep 2024 17:54:34 +0800 Subject: [PATCH 02/13] [pthread] make it compilable --- csrc/backend.cpp | 25 ++++- csrc/pthread_backend.cpp | 130 ++++++++++++++++++++++++++ csrc/{threadpool.c => threadpool.cpp} | 3 +- include/offload.h | 2 + include/pthread_backend.h | 123 ++++++++++++++++++++++++ include/threadpool.h | 8 -- setup.py | 16 +++- 7 files changed, 292 insertions(+), 15 deletions(-) create mode 100644 csrc/pthread_backend.cpp rename csrc/{threadpool.c => threadpool.cpp} (99%) create mode 100644 include/pthread_backend.h diff --git a/csrc/backend.cpp b/csrc/backend.cpp index 137a903..b6a4e14 100644 --- a/csrc/backend.cpp +++ b/csrc/backend.cpp @@ -11,6 +11,9 @@ #ifndef DISABLE_AIO #include "aio.h" #endif +#ifndef DISABLE_PTHREAD +#include "pthread_backend.h" +#endif std::unordered_set get_backends() { @@ -20,6 +23,9 @@ std::unordered_set get_backends() #endif #ifndef DISABLE_AIO backends.insert("aio"); +#endif +#ifndef DISABLE_PTHREAD + backends.insert("pthread"); #endif return backends; } @@ -35,18 +41,25 @@ void probe_asyncio(const std::string &backend) try { std::unique_ptr aio; - if (backend == "uring") + if 
(backend == "uring") { #ifndef DISABLE_URING aio.reset(new UringAsyncIO(2)); #else - throw std::runtime_error("backend is not installed\n"); + throw std::runtime_error("backend uring is not installed\n"); #endif - else + } else if (backend == "aio") { #ifndef DISABLE_AIO aio.reset(new AIOAsyncIO(2)); #else - throw std::runtime_error("backend is not installed\n"); + throw std::runtime_error("backend aio is not installed\n"); +#endif + } else { +#ifndef DISABLE_PTHREAD + aio.reset(new PthreadAsyncIO(2)); +#else + throw std::runtime_error("backend pthread is not installed\n"); #endif + } int fd = fileno(fp); const int n_loop = 5, n_len = 18; @@ -120,6 +133,10 @@ AsyncIO *create_asyncio(unsigned int n_entries, const std::string &backend) #ifndef DISABLE_AIO if (backend == "aio") return new AIOAsyncIO(n_entries); +#endif +#ifndef DISABLE_PTHREAD + if (backend == "pthread") + throw std::runtime_error("not implemented: " + backend); #endif throw std::runtime_error("Unsupported backend: " + backend); } \ No newline at end of file diff --git a/csrc/pthread_backend.cpp b/csrc/pthread_backend.cpp new file mode 100644 index 0000000..b7ded38 --- /dev/null +++ b/csrc/pthread_backend.cpp @@ -0,0 +1,130 @@ +#include +#include +#include +#include + +#include "asyncio.h" +#include "threadpool.h" +#include "pthread_backend.h" + + +void AIOContext::worker(AIOOperation &op) { + + if (op.opcode == THAIO_NOOP) { + return; + } + + int fileno = op.fileno; + off_t offset = op.offset; + int buf_size = op.buf_size; + void* buf = op.buf; + const iovec* iov = op.iov; + + int result; + + switch (op.opcode) { + case THAIO_WRITE: + result = pwrite(fileno, buf, buf_size, offset); + break; + case THAIO_WRITEV: + result = pwritev(fileno, iov, buf_size, offset); + break; + case THAIO_FSYNC: + result = fsync(fileno); + break; + case THAIO_FDSYNC: + result = fdatasync(fileno); + break; + case THAIO_READ: + result = pread(fileno, buf, buf_size, offset); + break; + case THAIO_READV: + result = 
preadv(fileno, iov, buf_size, offset); + break; + } + + op.result = result; + + if (result < 0) op.error = errno; + +} + +void AIOContext::submit(AIOOperation &op) { + op.in_progress = true; + int result = threadpool_add( + this->pool, + reinterpret_cast(AIOContext::worker), + nullptr, // reinterpret_cast(&op), // TODO @botbw OP RAII? + 0 + ); + if (result < 0) { + throw std::runtime_error("error when submitting job"); + } +} + +void PthreadAsyncIO::write(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback) { + AIOOperation op( + THAIO_WRITE, + fd, + offset, + n_bytes, + buffer, + nullptr + ); + this->ctx.submit(op); +} + +void PthreadAsyncIO::read(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback) { + AIOOperation op( + THAIO_READ, + fd, + offset, + n_bytes, + buffer, + nullptr + ); + this->ctx.submit(op); +} + + +void PthreadAsyncIO::writev(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback) { + AIOOperation op( + THAIO_WRITEV, + fd, + offset, + static_cast(iovcnt), + nullptr, + iov + ); + this->ctx.submit(op); +} + +void PthreadAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback) { + AIOOperation op( + THAIO_READV, + fd, + offset, + static_cast(iovcnt), + nullptr, + iov + ); + this->ctx.submit(op); +} + +void PthreadAsyncIO::get_event(WaitType wt) { + throw std::runtime_error("not implemented"); +} + +void PthreadAsyncIO::sync_write_events() { + throw std::runtime_error("not implemented"); +} + +void PthreadAsyncIO::sync_read_events() { + throw std::runtime_error("not implemented"); +} + +void PthreadAsyncIO::synchronize() { + throw std::runtime_error("not implemented"); +} + +void PthreadAsyncIO::register_file(int fd) {} \ No newline at end of file diff --git a/csrc/threadpool.c b/csrc/threadpool.cpp similarity index 99% rename from csrc/threadpool.c rename to csrc/threadpool.cpp index 
935e7e0..60c3ff8 100644 --- a/csrc/threadpool.c +++ b/csrc/threadpool.cpp @@ -31,6 +31,7 @@ * @brief Threadpool implementation file */ + #include #include #include @@ -302,4 +303,4 @@ static void *threadpool_thread(void *threadpool) pthread_mutex_unlock(&(pool->lock)); pthread_exit(NULL); return(NULL); -} +} \ No newline at end of file diff --git a/include/offload.h b/include/offload.h index 501ea84..7fc72df 100644 --- a/include/offload.h +++ b/include/offload.h @@ -1,6 +1,8 @@ #pragma once +#include "asyncio.h" #include + #include "space_mgr.h" #ifndef DISABLE_URING #include "uring.h" diff --git a/include/pthread_backend.h b/include/pthread_backend.h new file mode 100644 index 0000000..6789eaf --- /dev/null +++ b/include/pthread_backend.h @@ -0,0 +1,123 @@ +#pragma once + +#include +#include +#include "asyncio.h" +#include "threadpool.h" + + +enum thaio_op_code_t { + THAIO_READ, + THAIO_READV, + THAIO_WRITE, + THAIO_WRITEV, + THAIO_FSYNC, + THAIO_FDSYNC, + THAIO_NOOP, +}; + +static const unsigned int CTX_POOL_SIZE_DEFAULT = 8; +static const unsigned int CTX_MAX_REQUESTS_DEFAULT = 512; + +class AIOOperation; + +class AIOContext { +public: + AIOContext( + unsigned int max_requests_, + unsigned int pool_size_ + ) + : pool(nullptr) + , max_requests(max_requests_) + , pool_size(pool_size_) + { + this->pool = threadpool_create(this->pool_size, this->max_requests, 0); + if (this->pool == nullptr) { + throw std::runtime_error("failed to allocate thread pool"); + } + } + + ~AIOContext() { + if (this->pool != nullptr) { + threadpool_t* pool = this->pool; + this->pool = nullptr; + threadpool_destroy(pool, 0); + } + } + + + void submit(AIOOperation &op); + + static void worker(AIOOperation &op); + +private: + threadpool_t *pool; + unsigned int max_requests; + unsigned int pool_size; + +}; + +// data class +class AIOOperation { + friend class AIOContext; + +public: + AIOOperation( + const thaio_op_code_t opcode_, + const int fileno_, + const unsigned long long offset_, + const 
unsigned long long buf_size_, + void* buf_, + const iovec *iov_ + ) + : opcode(opcode_) + , fileno(fileno_) + , offset(offset_) + , result(-1) + , error(0) + , in_progress(false) + , buf_size(buf_size_) + , buf(buf_) + , iov(iov_) + {} + + ~AIOOperation() = default; +private: + const thaio_op_code_t opcode; + const int fileno; + const unsigned long long offset; + int result; + int error; + bool in_progress; + const unsigned long long buf_size; + void* buf; + const iovec *iov; +}; + + +class PthreadAsyncIO : public AsyncIO +{ +private: + AIOContext ctx; + + +public: + PthreadAsyncIO( + unsigned int max_requests = CTX_MAX_REQUESTS_DEFAULT, + unsigned int pool_size = CTX_POOL_SIZE_DEFAULT + ): ctx(max_requests, pool_size) {} + + ~PthreadAsyncIO() = default; + + void write(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback); + void read(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback); + void writev(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback); + void readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback); + + void get_event(WaitType wt); + void sync_write_events(); + void sync_read_events(); + void synchronize(); + + void register_file(int fd); +}; \ No newline at end of file diff --git a/include/threadpool.h b/include/threadpool.h index 0850832..b206c13 100644 --- a/include/threadpool.h +++ b/include/threadpool.h @@ -29,10 +29,6 @@ #ifndef _THREADPOOL_H_ #define _THREADPOOL_H_ -#ifdef __cplusplus -extern "C" { -#endif - /** * @file threadpool.h * @brief Threadpool Header File @@ -94,8 +90,4 @@ int threadpool_add(threadpool_t *pool, void (*routine)(void *), */ int threadpool_destroy(threadpool_t *pool, int flags); -#ifdef __cplusplus -} -#endif - #endif /* _THREADPOOL_H_ */ diff --git a/setup.py b/setup.py index f4bb739..99cbfce 100644 --- a/setup.py +++ b/setup.py @@ -20,17 +20,27 @@ def 
check_uring_compatibility(): kernel_version = version.parse(uname_info.release.split("-")[0]) return kernel_version >= version.parse("5.10") +def check_pthread_compatibility(): + uname_info = uname() + if uname_info.system != "Linux": + raise RuntimeError("Only Linux is supported") + return True + this_dir = os.path.dirname(os.path.abspath(__file__)) backend_install_dir = os.path.join(os.path.expanduser("~"), ".tensornvme") enable_uring = True enable_aio = True +enable_pthread = True if os.environ.get("DISABLE_URING") == "1" or not check_uring_compatibility(): enable_uring = False if os.environ.get("DISABLE_AIO") == "1": enable_aio = False -assert enable_aio or enable_uring +if os.environ.get("DISABLE_PTHREAD") == "1" or not check_pthread_compatibility(): + enable_pthread=False + +assert enable_aio or enable_uring or enable_pthread if os.environ.get("WITH_ROOT") == "1": backend_install_dir = "/usr" if not os.access(backend_install_dir, os.W_OK): @@ -45,6 +55,8 @@ def check_uring_compatibility(): "csrc/backend.cpp", "csrc/async_file_io.cpp", "csrc/py_api.cpp", + "csrc/pthread_backend.cpp", + "csrc/threadpool.cpp" ] extra_objects = [] define_macros = [] @@ -77,7 +89,7 @@ def cpp_ext_helper(name, sources, **kwargs): def find_static_lib(lib_name: str, lib_paths: List[str] = []) -> str: static_lib_name = f"lib{lib_name}.a" - lib_paths.extend(["/usr/lib", "/usr/lib64"]) + lib_paths.extend(["/usr/lib", "/usr/lib64", "/usr/lib/x86_64-linux-gnu/"]) if os.environ.get("LIBRARY_PATH", None) is not None: lib_paths.extend(os.environ["LIBRARY_PATH"].split(":")) for lib_dir in lib_paths: From 71a50741cc0b65fdc6f21dff9792bc75f5a406e0 Mon Sep 17 00:00:00 2001 From: botbw Date: Fri, 20 Sep 2024 18:00:09 +0800 Subject: [PATCH 03/13] [pthread] pass callback f ptr --- csrc/pthread_backend.cpp | 17 +++++++++++++---- include/pthread_backend.h | 5 ++++- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/csrc/pthread_backend.cpp b/csrc/pthread_backend.cpp index b7ded38..4adc71d 
100644 --- a/csrc/pthread_backend.cpp +++ b/csrc/pthread_backend.cpp @@ -19,6 +19,7 @@ void AIOContext::worker(AIOOperation &op) { int buf_size = op.buf_size; void* buf = op.buf; const iovec* iov = op.iov; + const callback_t cb = op.callback; int result; @@ -45,6 +46,10 @@ void AIOContext::worker(AIOOperation &op) { op.result = result; + if (cb != nullptr) { + cb(); + } + if (result < 0) op.error = errno; } @@ -69,7 +74,8 @@ void PthreadAsyncIO::write(int fd, void *buffer, size_t n_bytes, unsigned long l offset, n_bytes, buffer, - nullptr + nullptr, + callback ); this->ctx.submit(op); } @@ -81,7 +87,8 @@ void PthreadAsyncIO::read(int fd, void *buffer, size_t n_bytes, unsigned long lo offset, n_bytes, buffer, - nullptr + nullptr, + callback ); this->ctx.submit(op); } @@ -94,7 +101,8 @@ void PthreadAsyncIO::writev(int fd, const iovec *iov, unsigned int iovcnt, unsig offset, static_cast(iovcnt), nullptr, - iov + iov, + callback ); this->ctx.submit(op); } @@ -106,7 +114,8 @@ void PthreadAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsign offset, static_cast(iovcnt), nullptr, - iov + iov, + callback ); this->ctx.submit(op); } diff --git a/include/pthread_backend.h b/include/pthread_backend.h index 6789eaf..e09c179 100644 --- a/include/pthread_backend.h +++ b/include/pthread_backend.h @@ -68,7 +68,8 @@ class AIOOperation { const unsigned long long offset_, const unsigned long long buf_size_, void* buf_, - const iovec *iov_ + const iovec *iov_, + const callback_t callback_ ) : opcode(opcode_) , fileno(fileno_) @@ -79,6 +80,7 @@ class AIOOperation { , buf_size(buf_size_) , buf(buf_) , iov(iov_) + , callback(callback_) {} ~AIOOperation() = default; @@ -92,6 +94,7 @@ class AIOOperation { const unsigned long long buf_size; void* buf; const iovec *iov; + const callback_t callback; }; From 489b3b0ff875cb846d9dcb6c1f3e3bd71c0bb6ca Mon Sep 17 00:00:00 2001 From: botbw Date: Fri, 20 Sep 2024 18:23:48 +0800 Subject: [PATCH 04/13] [pthread] sync api with other 
backends --- csrc/pthread_backend.cpp | 64 +++++++++++++++++---------------------- include/asyncio.h | 4 ++- include/pthread_backend.h | 33 ++++++-------------- 3 files changed, 40 insertions(+), 61 deletions(-) diff --git a/csrc/pthread_backend.cpp b/csrc/pthread_backend.cpp index 4adc71d..5ef235d 100644 --- a/csrc/pthread_backend.cpp +++ b/csrc/pthread_backend.cpp @@ -8,58 +8,49 @@ #include "pthread_backend.h" -void AIOContext::worker(AIOOperation &op) { +void AIOContext::worker(void *op_) { + PthradIOData *op = reinterpret_cast(op_); - if (op.opcode == THAIO_NOOP) { - return; - } - - int fileno = op.fileno; - off_t offset = op.offset; - int buf_size = op.buf_size; - void* buf = op.buf; - const iovec* iov = op.iov; - const callback_t cb = op.callback; + int fileno = op->fileno; + off_t offset = op->offset; + int buf_size = op->buf_size; + void* buf = op->buf; + const iovec* iov = op->iov; + const callback_t cb = op->callback; int result; - switch (op.opcode) { - case THAIO_WRITE: + switch (op->type) { + case WRITE: result = pwrite(fileno, buf, buf_size, offset); break; - case THAIO_WRITEV: + case WRITEV: result = pwritev(fileno, iov, buf_size, offset); break; - case THAIO_FSYNC: - result = fsync(fileno); - break; - case THAIO_FDSYNC: - result = fdatasync(fileno); - break; - case THAIO_READ: + case READ: result = pread(fileno, buf, buf_size, offset); break; - case THAIO_READV: + case READV: result = preadv(fileno, iov, buf_size, offset); break; } - op.result = result; + op->result = result; if (cb != nullptr) { cb(); } - if (result < 0) op.error = errno; + if (result < 0) op->error = errno; } -void AIOContext::submit(AIOOperation &op) { - op.in_progress = true; +void AIOContext::submit(PthradIOData *op) { + op->in_progress = true; int result = threadpool_add( this->pool, - reinterpret_cast(AIOContext::worker), - nullptr, // reinterpret_cast(&op), // TODO @botbw OP RAII? 
+ AIOContext::worker, + static_cast(op), 0 ); if (result < 0) { @@ -68,8 +59,8 @@ void AIOContext::submit(AIOOperation &op) { } void PthreadAsyncIO::write(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback) { - AIOOperation op( - THAIO_WRITE, + PthradIOData *op = new PthradIOData( + WRITE, fd, offset, n_bytes, @@ -81,8 +72,8 @@ void PthreadAsyncIO::write(int fd, void *buffer, size_t n_bytes, unsigned long l } void PthreadAsyncIO::read(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback) { - AIOOperation op( - THAIO_READ, + PthradIOData *op = new PthradIOData( + READ, fd, offset, n_bytes, @@ -95,8 +86,8 @@ void PthreadAsyncIO::read(int fd, void *buffer, size_t n_bytes, unsigned long lo void PthreadAsyncIO::writev(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback) { - AIOOperation op( - THAIO_WRITEV, + PthradIOData *op = new PthradIOData( + WRITEV, fd, offset, static_cast(iovcnt), @@ -108,8 +99,8 @@ void PthreadAsyncIO::writev(int fd, const iovec *iov, unsigned int iovcnt, unsig } void PthreadAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback) { - AIOOperation op( - THAIO_READV, + PthradIOData *op = new PthradIOData( + READV, fd, offset, static_cast(iovcnt), @@ -122,6 +113,7 @@ void PthreadAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsign void PthreadAsyncIO::get_event(WaitType wt) { throw std::runtime_error("not implemented"); + // TODO @botbw: release PthreadIOData here } void PthreadAsyncIO::sync_write_events() { diff --git a/include/asyncio.h b/include/asyncio.h index d479123..8f27222 100644 --- a/include/asyncio.h +++ b/include/asyncio.h @@ -8,7 +8,9 @@ using callback_t = std::function; enum IOType { WRITE, - READ + WRITEV, + READ, + READV }; enum WaitType diff --git a/include/pthread_backend.h b/include/pthread_backend.h index e09c179..1a7886b 100644 --- 
a/include/pthread_backend.h +++ b/include/pthread_backend.h @@ -6,20 +6,10 @@ #include "threadpool.h" -enum thaio_op_code_t { - THAIO_READ, - THAIO_READV, - THAIO_WRITE, - THAIO_WRITEV, - THAIO_FSYNC, - THAIO_FDSYNC, - THAIO_NOOP, -}; - static const unsigned int CTX_POOL_SIZE_DEFAULT = 8; static const unsigned int CTX_MAX_REQUESTS_DEFAULT = 512; -class AIOOperation; +class PthradIOData; class AIOContext { public: @@ -46,9 +36,9 @@ class AIOContext { } - void submit(AIOOperation &op); + void submit(PthradIOData *op); - static void worker(AIOOperation &op); + static void worker(void *op); private: threadpool_t *pool; @@ -58,12 +48,12 @@ class AIOContext { }; // data class -class AIOOperation { +class PthradIOData: IOData { friend class AIOContext; public: - AIOOperation( - const thaio_op_code_t opcode_, + PthradIOData( + const IOType type_, const int fileno_, const unsigned long long offset_, const unsigned long long buf_size_, @@ -71,7 +61,7 @@ class AIOOperation { const iovec *iov_, const callback_t callback_ ) - : opcode(opcode_) + : IOData(type_, callback_, iov_) , fileno(fileno_) , offset(offset_) , result(-1) @@ -79,13 +69,10 @@ class AIOOperation { , in_progress(false) , buf_size(buf_size_) , buf(buf_) - , iov(iov_) - , callback(callback_) {} - ~AIOOperation() = default; + ~PthradIOData() = default; // release iov_ by calling parent destructor private: - const thaio_op_code_t opcode; const int fileno; const unsigned long long offset; int result; @@ -93,8 +80,6 @@ class AIOOperation { bool in_progress; const unsigned long long buf_size; void* buf; - const iovec *iov; - const callback_t callback; }; @@ -121,6 +106,6 @@ class PthreadAsyncIO : public AsyncIO void sync_write_events(); void sync_read_events(); void synchronize(); - + void register_file(int fd); }; \ No newline at end of file From 08215cdde6188f2de9a3cf055081958196d25ba6 Mon Sep 17 00:00:00 2001 From: botbw Date: Sun, 22 Sep 2024 14:44:08 +0800 Subject: [PATCH 05/13] [pthread] fetch environ --- 
csrc/backend.cpp | 2 +- include/pthread_backend.h | 38 +++++++++++++++++++++++++++++--------- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/csrc/backend.cpp b/csrc/backend.cpp index b6a4e14..ee29c4e 100644 --- a/csrc/backend.cpp +++ b/csrc/backend.cpp @@ -136,7 +136,7 @@ AsyncIO *create_asyncio(unsigned int n_entries, const std::string &backend) #endif #ifndef DISABLE_PTHREAD if (backend == "pthread") - throw std::runtime_error("not implemented: " + backend); + return new PthreadAsyncIO(n_entries); #endif throw std::runtime_error("Unsupported backend: " + backend); } \ No newline at end of file diff --git a/include/pthread_backend.h b/include/pthread_backend.h index 1a7886b..5947144 100644 --- a/include/pthread_backend.h +++ b/include/pthread_backend.h @@ -1,13 +1,16 @@ #pragma once +#include #include #include +#include +#include #include "asyncio.h" #include "threadpool.h" -static const unsigned int CTX_POOL_SIZE_DEFAULT = 8; -static const unsigned int CTX_MAX_REQUESTS_DEFAULT = 512; +static const unsigned int PTHREAD_POOL_SIZE_DEFAULT = 8; +static const char* PTHREAD_POOL_SIZE_ENVIRON_NAME = "PTHREAD_POOL_SIZE"; class PthradIOData; @@ -31,11 +34,10 @@ class AIOContext { if (this->pool != nullptr) { threadpool_t* pool = this->pool; this->pool = nullptr; - threadpool_destroy(pool, 0); + threadpool_destroy(pool, 0); // wait all threads } } - void submit(PthradIOData *op); static void worker(void *op); @@ -87,13 +89,31 @@ class PthreadAsyncIO : public AsyncIO { private: AIOContext ctx; - + + static int getEnvValue(const char* varName, int defaultValue) { + const char* envVar = std::getenv(varName); + if (envVar != nullptr) { + std::stringstream ss(envVar); + int value; + // Try converting to an integer + if (ss >> value) { + return value; + } else { + throw std::runtime_error("Failed to parse integer environ"); + } + } + return defaultValue; + } public: - PthreadAsyncIO( - unsigned int max_requests = CTX_MAX_REQUESTS_DEFAULT, - unsigned int 
pool_size = CTX_POOL_SIZE_DEFAULT - ): ctx(max_requests, pool_size) {} + PthreadAsyncIO(unsigned int n_entries) + : ctx( + n_entries, + getEnvValue( + PTHREAD_POOL_SIZE_ENVIRON_NAME, + PTHREAD_POOL_SIZE_DEFAULT + ) + ) {} ~PthreadAsyncIO() = default; From c97d01942f528d3b17733fb56012eb9b89ec0eeb Mon Sep 17 00:00:00 2001 From: botbw Date: Sun, 22 Sep 2024 15:49:05 +0800 Subject: [PATCH 06/13] [pthread] implement sync --- csrc/pthread_backend.cpp | 60 +++++++++++++++++++++++++++++---------- include/pthread_backend.h | 48 +++++++++++-------------------- 2 files changed, 61 insertions(+), 47 deletions(-) diff --git a/csrc/pthread_backend.cpp b/csrc/pthread_backend.cpp index 5ef235d..b0928f9 100644 --- a/csrc/pthread_backend.cpp +++ b/csrc/pthread_backend.cpp @@ -17,6 +17,7 @@ void AIOContext::worker(void *op_) { void* buf = op->buf; const iovec* iov = op->iov; const callback_t cb = op->callback; + std::atomic *p_cnt = op->p_cnt; int result; @@ -33,20 +34,22 @@ void AIOContext::worker(void *op_) { case READV: result = preadv(fileno, iov, buf_size, offset); break; + default: + throw std::runtime_error("Unkown task"); } - op->result = result; - if (cb != nullptr) { cb(); } - if (result < 0) op->error = errno; + if (result < 0) { + throw std::runtime_error("Error when executing tasks"); + } + p_cnt->fetch_sub(1); } void AIOContext::submit(PthradIOData *op) { - op->in_progress = true; int result = threadpool_add( this->pool, AIOContext::worker, @@ -58,7 +61,23 @@ void AIOContext::submit(PthradIOData *op) { } } +int PthreadAsyncIO::getEnvValue(const char* varName, int defaultValue) { + const char* envVar = std::getenv(varName); + if (envVar != nullptr) { + std::stringstream ss(envVar); + int value; + // Try converting to an integer + if (ss >> value) { + return value; + } else { + throw std::runtime_error("Failed to parse integer environ"); + } + } + return defaultValue; +} + void PthreadAsyncIO::write(int fd, void *buffer, size_t n_bytes, unsigned long long offset, 
callback_t callback) { + this->n_write_events.fetch_add(1); PthradIOData *op = new PthradIOData( WRITE, fd, @@ -66,12 +85,14 @@ void PthreadAsyncIO::write(int fd, void *buffer, size_t n_bytes, unsigned long l n_bytes, buffer, nullptr, - callback + callback, + &this->n_write_events ); this->ctx.submit(op); } void PthreadAsyncIO::read(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback) { + this->n_read_events.fetch_add(1); PthradIOData *op = new PthradIOData( READ, fd, @@ -79,53 +100,62 @@ void PthreadAsyncIO::read(int fd, void *buffer, size_t n_bytes, unsigned long lo n_bytes, buffer, nullptr, - callback + callback, + &this->n_read_events ); this->ctx.submit(op); } void PthreadAsyncIO::writev(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback) { - PthradIOData *op = new PthradIOData( + this->n_write_events.fetch_add(1); + PthradIOData *op = new PthradIOData( WRITEV, fd, offset, static_cast(iovcnt), nullptr, iov, - callback + callback, + &this->n_write_events ); this->ctx.submit(op); } void PthreadAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback) { - PthradIOData *op = new PthradIOData( + this->n_read_events.fetch_add(1); + PthradIOData *op = new PthradIOData( READV, fd, offset, static_cast(iovcnt), nullptr, iov, - callback + callback, + &this->n_read_events ); this->ctx.submit(op); } void PthreadAsyncIO::get_event(WaitType wt) { - throw std::runtime_error("not implemented"); - // TODO @botbw: release PthreadIOData here + if (wt == NOWAIT) return; + // busy waiting + while ( + this->n_write_events.load() != 0 + || this->n_read_events.load() != 0 + ) {} } void PthreadAsyncIO::sync_write_events() { - throw std::runtime_error("not implemented"); + while (this->n_write_events.load() != 0) {} } void PthreadAsyncIO::sync_read_events() { - throw std::runtime_error("not implemented"); + while (this->n_read_events.load() != 0) {} } void 
PthreadAsyncIO::synchronize() { - throw std::runtime_error("not implemented"); + this->get_event(WAIT); } void PthreadAsyncIO::register_file(int fd) {} \ No newline at end of file diff --git a/include/pthread_backend.h b/include/pthread_backend.h index 5947144..3050092 100644 --- a/include/pthread_backend.h +++ b/include/pthread_backend.h @@ -5,6 +5,7 @@ #include #include #include +#include #include "asyncio.h" #include "threadpool.h" @@ -14,17 +15,15 @@ static const char* PTHREAD_POOL_SIZE_ENVIRON_NAME = "PTHREAD_POOL_SIZE"; class PthradIOData; +// thread pool wrapper class AIOContext { public: AIOContext( - unsigned int max_requests_, - unsigned int pool_size_ + unsigned int max_requests, + unsigned int pool_size ) - : pool(nullptr) - , max_requests(max_requests_) - , pool_size(pool_size_) + : pool(threadpool_create(max_requests, pool_size, 0)) { - this->pool = threadpool_create(this->pool_size, this->max_requests, 0); if (this->pool == nullptr) { throw std::runtime_error("failed to allocate thread pool"); } @@ -44,9 +43,6 @@ class AIOContext { private: threadpool_t *pool; - unsigned int max_requests; - unsigned int pool_size; - }; // data class @@ -61,27 +57,25 @@ class PthradIOData: IOData { const unsigned long long buf_size_, void* buf_, const iovec *iov_, - const callback_t callback_ + const callback_t callback_, + std::atomic *p_cnt_ ) : IOData(type_, callback_, iov_) , fileno(fileno_) , offset(offset_) - , result(-1) - , error(0) - , in_progress(false) , buf_size(buf_size_) , buf(buf_) + , p_cnt(p_cnt_) {} ~PthradIOData() = default; // release iov_ by calling parent destructor + private: const int fileno; const unsigned long long offset; - int result; - int error; - bool in_progress; const unsigned long long buf_size; void* buf; + std::atomic *p_cnt; }; @@ -89,22 +83,10 @@ class PthreadAsyncIO : public AsyncIO { private: AIOContext ctx; + std::atomic n_write_events; // atomic is needed here since this value could be updated by multiple threads + std::atomic 
n_read_events; // atomic is needed here since this value could be updated by multiple threads - static int getEnvValue(const char* varName, int defaultValue) { - const char* envVar = std::getenv(varName); - if (envVar != nullptr) { - std::stringstream ss(envVar); - int value; - // Try converting to an integer - if (ss >> value) { - return value; - } else { - throw std::runtime_error("Failed to parse integer environ"); - } - } - return defaultValue; - } - + static int getEnvValue(const char* varName, int defaultValue); public: PthreadAsyncIO(unsigned int n_entries) : ctx( @@ -113,7 +95,9 @@ class PthreadAsyncIO : public AsyncIO PTHREAD_POOL_SIZE_ENVIRON_NAME, PTHREAD_POOL_SIZE_DEFAULT ) - ) {} + ) + , n_write_events(0) + , n_read_events(0) {} ~PthreadAsyncIO() = default; From d099cd523ad334d1a4e3ea91c27f99f34e64f0cf Mon Sep 17 00:00:00 2001 From: botbw Date: Sun, 22 Sep 2024 18:16:52 +0800 Subject: [PATCH 07/13] [pthread] fix --- csrc/backend.cpp | 2 +- csrc/pthread_backend.cpp | 19 ++++++++++++------- csrc/py_api.cpp | 2 ++ include/pthread_backend.h | 12 ++++++------ 4 files changed, 21 insertions(+), 14 deletions(-) diff --git a/csrc/backend.cpp b/csrc/backend.cpp index ee29c4e..e00495d 100644 --- a/csrc/backend.cpp +++ b/csrc/backend.cpp @@ -88,7 +88,7 @@ void probe_asyncio(const std::string &backend) for (int i = 0; i < n_loop; i++) { for (int j = 0; j < n_len; j++) - { + { assert(text[i][j] == new_text[i][j]); } } diff --git a/csrc/pthread_backend.cpp b/csrc/pthread_backend.cpp index b0928f9..4f00afc 100644 --- a/csrc/pthread_backend.cpp +++ b/csrc/pthread_backend.cpp @@ -19,7 +19,7 @@ void AIOContext::worker(void *op_) { const callback_t cb = op->callback; std::atomic *p_cnt = op->p_cnt; - int result; + int result = -1; switch (op->type) { case WRITE: @@ -47,15 +47,20 @@ void AIOContext::worker(void *op_) { } p_cnt->fetch_sub(1); + delete op; } void AIOContext::submit(PthradIOData *op) { - int result = threadpool_add( - this->pool, - AIOContext::worker, - 
static_cast(op), - 0 - ); + int result; + do { + result = threadpool_add( + this->pool, + AIOContext::worker, + static_cast(op), + 0 + ); + } while (result == threadpool_queue_full); + if (result < 0) { throw std::runtime_error("error when submitting job"); } diff --git a/csrc/py_api.cpp b/csrc/py_api.cpp index 5584524..4d95a9f 100644 --- a/csrc/py_api.cpp +++ b/csrc/py_api.cpp @@ -1,5 +1,7 @@ #include #include +#include +#include #include "offload.h" #include "async_file_io.h" #include "backend.h" diff --git a/include/pthread_backend.h b/include/pthread_backend.h index 3050092..857adc9 100644 --- a/include/pthread_backend.h +++ b/include/pthread_backend.h @@ -19,10 +19,10 @@ class PthradIOData; class AIOContext { public: AIOContext( - unsigned int max_requests, - unsigned int pool_size + unsigned int pool_size, + unsigned int n_entries ) - : pool(threadpool_create(max_requests, pool_size, 0)) + : pool(threadpool_create(pool_size, n_entries, 0)) { if (this->pool == nullptr) { throw std::runtime_error("failed to allocate thread pool"); @@ -33,7 +33,7 @@ class AIOContext { if (this->pool != nullptr) { threadpool_t* pool = this->pool; this->pool = nullptr; - threadpool_destroy(pool, 0); // wait all threads + threadpool_destroy(pool, 1); // wait all threads } } @@ -90,11 +90,11 @@ class PthreadAsyncIO : public AsyncIO public: PthreadAsyncIO(unsigned int n_entries) : ctx( - n_entries, getEnvValue( PTHREAD_POOL_SIZE_ENVIRON_NAME, PTHREAD_POOL_SIZE_DEFAULT - ) + ), + n_entries ) , n_write_events(0) , n_read_events(0) {} From 6acab39c9ca57c74aa1e745e9dfa5a84122f0e8e Mon Sep 17 00:00:00 2001 From: botbw Date: Mon, 23 Sep 2024 11:49:27 +0800 Subject: [PATCH 08/13] [pthread] debug hang --- csrc/pthread_backend.cpp | 52 ++++++++++++++++++++++++++++++++++------ setup.py | 3 +++ 2 files changed, 48 insertions(+), 7 deletions(-) diff --git a/csrc/pthread_backend.cpp b/csrc/pthread_backend.cpp index 4f00afc..3ff97b8 100644 --- a/csrc/pthread_backend.cpp +++ 
b/csrc/pthread_backend.cpp @@ -7,6 +7,17 @@ #include "threadpool.h" #include "pthread_backend.h" +#ifdef DEBUG +#include +#include +std::mutex cout_mutex; + +template +void thread_safe_cout(Args&&... args) { + std::lock_guard guard(cout_mutex); + (std::cout << ... << args) << std::endl; +} +#endif void AIOContext::worker(void *op_) { PthradIOData *op = reinterpret_cast(op_); @@ -47,6 +58,9 @@ void AIOContext::worker(void *op_) { } p_cnt->fetch_sub(1); +#ifdef DEBUG + thread_safe_cout("worker_end:", op->type, ":", p_cnt->load()); +#endif delete op; } @@ -82,7 +96,6 @@ int PthreadAsyncIO::getEnvValue(const char* varName, int defaultValue) { } void PthreadAsyncIO::write(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback) { - this->n_write_events.fetch_add(1); PthradIOData *op = new PthradIOData( WRITE, fd, @@ -93,11 +106,14 @@ void PthreadAsyncIO::write(int fd, void *buffer, size_t n_bytes, unsigned long l callback, &this->n_write_events ); + this->n_write_events.fetch_add(1); this->ctx.submit(op); +#ifdef DEBUG + thread_safe_cout("write:",this->n_write_events.load()); +#endif } void PthreadAsyncIO::read(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback) { - this->n_read_events.fetch_add(1); PthradIOData *op = new PthradIOData( READ, fd, @@ -108,12 +124,15 @@ void PthreadAsyncIO::read(int fd, void *buffer, size_t n_bytes, unsigned long lo callback, &this->n_read_events ); + this->n_read_events.fetch_add(1); this->ctx.submit(op); +#ifdef DEBUG + thread_safe_cout("read:", this->n_read_events.load()); +#endif } void PthreadAsyncIO::writev(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback) { - this->n_write_events.fetch_add(1); PthradIOData *op = new PthradIOData( WRITEV, fd, @@ -124,11 +143,14 @@ void PthreadAsyncIO::writev(int fd, const iovec *iov, unsigned int iovcnt, unsig callback, &this->n_write_events ); + this->n_write_events.fetch_add(1); 
this->ctx.submit(op); +#ifdef DEBUG + thread_safe_cout("writev:", this->n_write_events.load()); +#endif } void PthreadAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback) { - this->n_read_events.fetch_add(1); PthradIOData *op = new PthradIOData( READV, fd, @@ -139,7 +161,11 @@ void PthreadAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsign callback, &this->n_read_events ); + this->n_read_events.fetch_add(1); this->ctx.submit(op); +#ifdef DEBUG + thread_safe_cout("readv", this->n_read_events.load()); +#endif } void PthreadAsyncIO::get_event(WaitType wt) { @@ -148,15 +174,27 @@ void PthreadAsyncIO::get_event(WaitType wt) { while ( this->n_write_events.load() != 0 || this->n_read_events.load() != 0 - ) {} + ) { +#ifdef DEBUG + thread_safe_cout("get_event:", this->n_write_events.load(), ":", this->n_read_events.load()); +#endif + } } void PthreadAsyncIO::sync_write_events() { - while (this->n_write_events.load() != 0) {} + while (this->n_write_events.load() != 0) { +#ifdef DEBUG + thread_safe_cout("sync_write_events:", this->n_write_events.load()); +#endif + } } void PthreadAsyncIO::sync_read_events() { - while (this->n_read_events.load() != 0) {} + while (this->n_read_events.load() != 0) { +#ifdef DEBUG + thread_safe_cout("n_read_events:", this->n_read_events.load()); +#endif + } } void PthreadAsyncIO::synchronize() { diff --git a/setup.py b/setup.py index 99cbfce..257be99 100644 --- a/setup.py +++ b/setup.py @@ -30,6 +30,7 @@ def check_pthread_compatibility(): this_dir = os.path.dirname(os.path.abspath(__file__)) backend_install_dir = os.path.join(os.path.expanduser("~"), ".tensornvme") +debug = os.environ.get("DEBUG", "0") == "1" enable_uring = True enable_aio = True enable_pthread = True @@ -113,6 +114,8 @@ def setup_bachrc(): def setup_dependencies(): build_dir = os.path.join(this_dir, "cmake-build") + if debug: + define_macros.append(("DEBUG", None)) if not enable_uring: 
define_macros.append(("DISABLE_URING", None)) sources.remove("csrc/uring.cpp") From 337fda1d21258705a28f52c5c97e9ab56691987f Mon Sep 17 00:00:00 2001 From: botbw Date: Mon, 23 Sep 2024 13:29:38 +0800 Subject: [PATCH 09/13] [pthread] use c++ 3rd lib thread pool --- csrc/pthread_backend.cpp | 215 ++----- csrc/threadpool.cpp | 306 ---------- include/pthread_backend.h | 95 +-- include/threadpool.h | 93 --- include/threadpool.hpp | 1158 +++++++++++++++++++++++++++++++++++++ setup.py | 1 - 6 files changed, 1217 insertions(+), 651 deletions(-) delete mode 100644 csrc/threadpool.cpp delete mode 100644 include/threadpool.h create mode 100644 include/threadpool.hpp diff --git a/csrc/pthread_backend.cpp b/csrc/pthread_backend.cpp index 3ff97b8..2f2db9e 100644 --- a/csrc/pthread_backend.cpp +++ b/csrc/pthread_backend.cpp @@ -4,196 +4,83 @@ #include #include "asyncio.h" -#include "threadpool.h" #include "pthread_backend.h" -#ifdef DEBUG #include -#include -std::mutex cout_mutex; - -template -void thread_safe_cout(Args&&... args) { - std::lock_guard guard(cout_mutex); - (std::cout << ... 
<< args) << std::endl; -} -#endif - -void AIOContext::worker(void *op_) { - PthradIOData *op = reinterpret_cast(op_); - - int fileno = op->fileno; - off_t offset = op->offset; - int buf_size = op->buf_size; - void* buf = op->buf; - const iovec* iov = op->iov; - const callback_t cb = op->callback; - std::atomic *p_cnt = op->p_cnt; - - int result = -1; - - switch (op->type) { - case WRITE: - result = pwrite(fileno, buf, buf_size, offset); - break; - case WRITEV: - result = pwritev(fileno, iov, buf_size, offset); - break; - case READ: - result = pread(fileno, buf, buf_size, offset); - break; - case READV: - result = preadv(fileno, iov, buf_size, offset); - break; - default: - throw std::runtime_error("Unkown task"); - } - - if (cb != nullptr) { - cb(); - } - - if (result < 0) { - throw std::runtime_error("Error when executing tasks"); - } - - p_cnt->fetch_sub(1); -#ifdef DEBUG - thread_safe_cout("worker_end:", op->type, ":", p_cnt->load()); -#endif - delete op; -} - -void AIOContext::submit(PthradIOData *op) { - int result; - do { - result = threadpool_add( - this->pool, - AIOContext::worker, - static_cast(op), - 0 - ); - } while (result == threadpool_queue_full); - - if (result < 0) { - throw std::runtime_error("error when submitting job"); - } -} - -int PthreadAsyncIO::getEnvValue(const char* varName, int defaultValue) { - const char* envVar = std::getenv(varName); - if (envVar != nullptr) { - std::stringstream ss(envVar); - int value; - // Try converting to an integer - if (ss >> value) { - return value; - } else { - throw std::runtime_error("Failed to parse integer environ"); - } - } - return defaultValue; -} - void PthreadAsyncIO::write(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback) { - PthradIOData *op = new PthradIOData( - WRITE, - fd, - offset, - n_bytes, - buffer, - nullptr, - callback, - &this->n_write_events + auto fut = this->pool.submit_task( + [fd, buffer, n_bytes, offset, callback] { + int result = pwrite(fd, 
buffer, n_bytes, offset); + if (callback != nullptr) { + callback(); + } + return result; + } ); - this->n_write_events.fetch_add(1); - this->ctx.submit(op); -#ifdef DEBUG - thread_safe_cout("write:",this->n_write_events.load()); -#endif + this->write_fut.push_back(std::move(fut)); } void PthreadAsyncIO::read(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback) { - PthradIOData *op = new PthradIOData( - READ, - fd, - offset, - n_bytes, - buffer, - nullptr, - callback, - &this->n_read_events + auto fut = this->pool.submit_task( + [fd, buffer, n_bytes, offset, callback] { + int result = pread(fd, buffer, n_bytes, offset); + if (callback != nullptr) { + callback(); + } + return result; + } ); - this->n_read_events.fetch_add(1); - this->ctx.submit(op); -#ifdef DEBUG - thread_safe_cout("read:", this->n_read_events.load()); -#endif + this->read_fut.push_back(std::move(fut)); } - -void PthreadAsyncIO::writev(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback) { - PthradIOData *op = new PthradIOData( - WRITEV, - fd, - offset, - static_cast(iovcnt), - nullptr, - iov, - callback, - &this->n_write_events +void PthreadAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback) { + auto fut = this->pool.submit_task( + [fd, iov, iovcnt, offset, callback] { + int result = preadv(fd, iov, iovcnt, offset); + if (callback != nullptr) { + callback(); + } + return result; + } ); - this->n_write_events.fetch_add(1); - this->ctx.submit(op); -#ifdef DEBUG - thread_safe_cout("writev:", this->n_write_events.load()); -#endif + this->read_fut.push_back(std::move(fut)); } -void PthreadAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback) { - PthradIOData *op = new PthradIOData( - READV, - fd, - offset, - static_cast(iovcnt), - nullptr, - iov, - callback, - &this->n_read_events +void PthreadAsyncIO::writev(int fd, 
const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback) { + auto fut = this->pool.submit_task( + [fd, iov, iovcnt, offset, callback] { + int result = pwritev(fd, iov, iovcnt, offset); + if (callback != nullptr) { + callback(); + } + return result; + } ); - this->n_read_events.fetch_add(1); - this->ctx.submit(op); -#ifdef DEBUG - thread_safe_cout("readv", this->n_read_events.load()); -#endif + this->write_fut.push_back(std::move(fut)); } void PthreadAsyncIO::get_event(WaitType wt) { if (wt == NOWAIT) return; - // busy waiting - while ( - this->n_write_events.load() != 0 - || this->n_read_events.load() != 0 - ) { -#ifdef DEBUG - thread_safe_cout("get_event:", this->n_write_events.load(), ":", this->n_read_events.load()); -#endif - } + this->sync_write_events(); + this->sync_read_events(); + } void PthreadAsyncIO::sync_write_events() { - while (this->n_write_events.load() != 0) { -#ifdef DEBUG - thread_safe_cout("sync_write_events:", this->n_write_events.load()); -#endif + while (this->write_fut.size() > 0) { + std::cout << "hi:" << this->write_fut.size() << std::endl; + auto front = std::move(this->write_fut.front()); + this->write_fut.pop_front(); + front.wait(); } } void PthreadAsyncIO::sync_read_events() { - while (this->n_read_events.load() != 0) { -#ifdef DEBUG - thread_safe_cout("n_read_events:", this->n_read_events.load()); -#endif + while (this->read_fut.size() > 0) { + std::cout << "ho:" << this->read_fut.size() << std::endl; + auto front = std::move(this->read_fut.front()); + this->read_fut.pop_front(); + front.wait(); } } diff --git a/csrc/threadpool.cpp b/csrc/threadpool.cpp deleted file mode 100644 index 60c3ff8..0000000 --- a/csrc/threadpool.cpp +++ /dev/null @@ -1,306 +0,0 @@ -/* - * Copyright (c) 2016, Mathias Brossard . - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * 1. 
Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -/** - * @file threadpool.c - * @brief Threadpool implementation file - */ - - -#include -#include -#include - -#include "threadpool.h" - -typedef enum { - immediate_shutdown = 1, - graceful_shutdown = 2 -} threadpool_shutdown_t; - -/** - * @struct threadpool_task - * @brief the work struct - * - * @var function Pointer to the function that will perform the task. - * @var argument Argument to be passed to the function. - */ - -typedef struct { - void (*function)(void *); - void *argument; -} threadpool_task_t; - -/** - * @struct threadpool - * @brief The threadpool struct - * - * @var notify Condition variable to notify worker threads. - * @var threads Array containing worker threads ID. - * @var thread_count Number of threads - * @var queue Array containing the task queue. - * @var queue_size Size of the task queue. 
- * @var head Index of the first element. - * @var tail Index of the next element. - * @var count Number of pending tasks - * @var shutdown Flag indicating if the pool is shutting down - * @var started Number of started threads - */ -struct threadpool_t { - pthread_mutex_t lock; - pthread_cond_t notify; - pthread_t *threads; - threadpool_task_t *queue; - int thread_count; - int queue_size; - int head; - int tail; - int count; - int shutdown; - int started; -}; - -/** - * @function void *threadpool_thread(void *threadpool) - * @brief the worker thread - * @param threadpool the pool which own the thread - */ -static void *threadpool_thread(void *threadpool); - -int threadpool_free(threadpool_t *pool); - -threadpool_t *threadpool_create(int thread_count, int queue_size, int flags) -{ - threadpool_t *pool; - int i; - (void) flags; - - if(thread_count <= 0 || thread_count > MAX_THREADS || queue_size <= 0 || queue_size > MAX_QUEUE) { - return NULL; - } - - if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) { - goto err; - } - - /* Initialize */ - pool->thread_count = 0; - pool->queue_size = queue_size; - pool->head = pool->tail = pool->count = 0; - pool->shutdown = pool->started = 0; - - /* Allocate thread and task queue */ - pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count); - pool->queue = (threadpool_task_t *)malloc - (sizeof(threadpool_task_t) * queue_size); - - /* Initialize mutex and conditional variable first */ - if((pthread_mutex_init(&(pool->lock), NULL) != 0) || - (pthread_cond_init(&(pool->notify), NULL) != 0) || - (pool->threads == NULL) || - (pool->queue == NULL)) { - goto err; - } - - /* Start worker threads */ - for(i = 0; i < thread_count; i++) { - if(pthread_create(&(pool->threads[i]), NULL, - threadpool_thread, (void*)pool) != 0) { - threadpool_destroy(pool, 0); - return NULL; - } - pool->thread_count++; - pool->started++; - } - - return pool; - - err: - if(pool) { - threadpool_free(pool); - } - return NULL; -} - -int 
threadpool_add(threadpool_t *pool, void (*function)(void *), - void *argument, int flags) -{ - int err = 0; - int next; - (void) flags; - - if(pool == NULL || function == NULL) { - return threadpool_invalid; - } - - if(pthread_mutex_lock(&(pool->lock)) != 0) { - return threadpool_lock_failure; - } - - next = (pool->tail + 1) % pool->queue_size; - - do { - /* Are we full ? */ - if(pool->count == pool->queue_size) { - err = threadpool_queue_full; - break; - } - - /* Are we shutting down ? */ - if(pool->shutdown) { - err = threadpool_shutdown; - break; - } - - /* Add task to queue */ - pool->queue[pool->tail].function = function; - pool->queue[pool->tail].argument = argument; - pool->tail = next; - pool->count += 1; - - /* pthread_cond_broadcast */ - if(pthread_cond_signal(&(pool->notify)) != 0) { - err = threadpool_lock_failure; - break; - } - } while(0); - - if(pthread_mutex_unlock(&pool->lock) != 0) { - err = threadpool_lock_failure; - } - - return err; -} - -int threadpool_destroy(threadpool_t *pool, int flags) -{ - int i, err = 0; - - if(pool == NULL) { - return threadpool_invalid; - } - - if(pthread_mutex_lock(&(pool->lock)) != 0) { - return threadpool_lock_failure; - } - - do { - /* Already shutting down */ - if(pool->shutdown) { - err = threadpool_shutdown; - break; - } - - pool->shutdown = (flags & threadpool_graceful) ? 
- graceful_shutdown : immediate_shutdown; - - /* Wake up all worker threads */ - if((pthread_cond_broadcast(&(pool->notify)) != 0) || - (pthread_mutex_unlock(&(pool->lock)) != 0)) { - err = threadpool_lock_failure; - break; - } - - /* Join all worker thread */ - for(i = 0; i < pool->thread_count; i++) { - if(pthread_join(pool->threads[i], NULL) != 0) { - err = threadpool_thread_failure; - } - } - } while(0); - - /* Only if everything went well do we deallocate the pool */ - if(!err) { - threadpool_free(pool); - } - return err; -} - -int threadpool_free(threadpool_t *pool) -{ - if(pool == NULL || pool->started > 0) { - return -1; - } - - /* Did we manage to allocate ? */ - if(pool->threads) { - free(pool->threads); - free(pool->queue); - - /* Because we allocate pool->threads after initializing the - mutex and condition variable, we're sure they're - initialized. Let's lock the mutex just in case. */ - pthread_mutex_lock(&(pool->lock)); - pthread_mutex_destroy(&(pool->lock)); - pthread_cond_destroy(&(pool->notify)); - } - free(pool); - return 0; -} - - -static void *threadpool_thread(void *threadpool) -{ - threadpool_t *pool = (threadpool_t *)threadpool; - threadpool_task_t task; - - for(;;) { - /* Lock must be taken to wait on conditional variable */ - pthread_mutex_lock(&(pool->lock)); - - /* Wait on condition variable, check for spurious wakeups. - When returning from pthread_cond_wait(), we own the lock. 
*/ - while((pool->count == 0) && (!pool->shutdown)) { - pthread_cond_wait(&(pool->notify), &(pool->lock)); - } - - if((pool->shutdown == immediate_shutdown) || - ((pool->shutdown == graceful_shutdown) && - (pool->count == 0))) { - break; - } - - /* Grab our task */ - task.function = pool->queue[pool->head].function; - task.argument = pool->queue[pool->head].argument; - pool->head = (pool->head + 1) % pool->queue_size; - pool->count -= 1; - - /* Unlock */ - pthread_mutex_unlock(&(pool->lock)); - - /* Get to work */ - (*(task.function))(task.argument); - } - - pool->started--; - - pthread_mutex_unlock(&(pool->lock)); - pthread_exit(NULL); - return(NULL); -} \ No newline at end of file diff --git a/include/pthread_backend.h b/include/pthread_backend.h index 857adc9..2133ad6 100644 --- a/include/pthread_backend.h +++ b/include/pthread_backend.h @@ -1,105 +1,26 @@ #pragma once -#include #include #include #include -#include -#include +#include +#include #include "asyncio.h" -#include "threadpool.h" - - -static const unsigned int PTHREAD_POOL_SIZE_DEFAULT = 8; -static const char* PTHREAD_POOL_SIZE_ENVIRON_NAME = "PTHREAD_POOL_SIZE"; - -class PthradIOData; - -// thread pool wrapper -class AIOContext { -public: - AIOContext( - unsigned int pool_size, - unsigned int n_entries - ) - : pool(threadpool_create(pool_size, n_entries, 0)) - { - if (this->pool == nullptr) { - throw std::runtime_error("failed to allocate thread pool"); - } - } - - ~AIOContext() { - if (this->pool != nullptr) { - threadpool_t* pool = this->pool; - this->pool = nullptr; - threadpool_destroy(pool, 1); // wait all threads - } - } - - void submit(PthradIOData *op); - - static void worker(void *op); - -private: - threadpool_t *pool; -}; - -// data class -class PthradIOData: IOData { - friend class AIOContext; - -public: - PthradIOData( - const IOType type_, - const int fileno_, - const unsigned long long offset_, - const unsigned long long buf_size_, - void* buf_, - const iovec *iov_, - const callback_t 
callback_, - std::atomic *p_cnt_ - ) - : IOData(type_, callback_, iov_) - , fileno(fileno_) - , offset(offset_) - , buf_size(buf_size_) - , buf(buf_) - , p_cnt(p_cnt_) - {} - - ~PthradIOData() = default; // release iov_ by calling parent destructor - -private: - const int fileno; - const unsigned long long offset; - const unsigned long long buf_size; - void* buf; - std::atomic *p_cnt; -}; +#include "threadpool.hpp" class PthreadAsyncIO : public AsyncIO { private: - AIOContext ctx; - std::atomic n_write_events; // atomic is needed here since this value could be updated by multiple threads - std::atomic n_read_events; // atomic is needed here since this value could be updated by multiple threads + BS::thread_pool pool; + std::deque> write_fut; + std::deque> read_fut; - static int getEnvValue(const char* varName, int defaultValue); public: PthreadAsyncIO(unsigned int n_entries) - : ctx( - getEnvValue( - PTHREAD_POOL_SIZE_ENVIRON_NAME, - PTHREAD_POOL_SIZE_DEFAULT - ), - n_entries - ) - , n_write_events(0) - , n_read_events(0) {} + : pool(n_entries) {} - ~PthreadAsyncIO() = default; + ~PthreadAsyncIO() {} void write(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback); void read(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback); diff --git a/include/threadpool.h b/include/threadpool.h deleted file mode 100644 index b206c13..0000000 --- a/include/threadpool.h +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright (c) 2016, Mathias Brossard . - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * 1. Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * - * 2. 
Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -#ifndef _THREADPOOL_H_ -#define _THREADPOOL_H_ - -/** - * @file threadpool.h - * @brief Threadpool Header File - */ - - /** - * Increase this constants at your own risk - * Large values might slow down your system - */ -#define MAX_THREADS 128 -#define MAX_QUEUE 65536 - -typedef struct threadpool_t threadpool_t; - -typedef enum { - threadpool_invalid = -1, - threadpool_lock_failure = -2, - threadpool_queue_full = -3, - threadpool_shutdown = -4, - threadpool_thread_failure = -5 -} threadpool_error_t; - -typedef enum { - threadpool_graceful = 1 -} threadpool_destroy_flags_t; - -/** - * @function threadpool_create - * @brief Creates a threadpool_t object. - * @param thread_count Number of worker threads. - * @param queue_size Size of the queue. - * @param flags Unused parameter. 
- * @return a newly created thread pool or NULL - */ -threadpool_t *threadpool_create(int thread_count, int queue_size, int flags); - -/** - * @function threadpool_add - * @brief add a new task in the queue of a thread pool - * @param pool Thread pool to which add the task. - * @param function Pointer to the function that will perform the task. - * @param argument Argument to be passed to the function. - * @param flags Unused parameter. - * @return 0 if all goes well, negative values in case of error (@see - * threadpool_error_t for codes). - */ -int threadpool_add(threadpool_t *pool, void (*routine)(void *), - void *arg, int flags); - -/** - * @function threadpool_destroy - * @brief Stops and destroys a thread pool. - * @param pool Thread pool to destroy. - * @param flags Flags for shutdown - * - * Known values for flags are 0 (default) and threadpool_graceful in - * which case the thread pool doesn't accept any new tasks but - * processes all pending tasks before shutdown. - */ -int threadpool_destroy(threadpool_t *pool, int flags); - -#endif /* _THREADPOOL_H_ */ diff --git a/include/threadpool.hpp b/include/threadpool.hpp new file mode 100644 index 0000000..14e764d --- /dev/null +++ b/include/threadpool.hpp @@ -0,0 +1,1158 @@ +// Copied from https://github.com/bshoshany/thread-pool/blob/097aa718f25d44315cadb80b407144ad455ee4f9/include/BS_thread_pool.hpp + +#ifndef BS_THREAD_POOL_HPP +#define BS_THREAD_POOL_HPP +/** + * @file BS_thread_pool.hpp + * @author Barak Shoshany (baraksh@gmail.com) (https://baraksh.com) + * @version 4.1.0 + * @date 2024-03-22 + * @copyright Copyright (c) 2024 Barak Shoshany. Licensed under the MIT license. If you found this project useful, please consider starring it on GitHub! If you use this library in software of any kind, please provide a link to the GitHub repository https://github.com/bshoshany/thread-pool in the source code and documentation. 
If you use this library in published research, please cite it as follows: Barak Shoshany, "A C++17 Thread Pool for High-Performance Scientific Computing", doi:10.1016/j.softx.2024.101687, SoftwareX 26 (2024) 101687, arXiv:2105.00613 + * + * @brief BS::thread_pool: a fast, lightweight, and easy-to-use C++17 thread pool library. This header file contains the main thread pool class and some additional classes and definitions. No other files are needed in order to use the thread pool itself. + */ + +#ifndef __cpp_exceptions + #define BS_THREAD_POOL_DISABLE_EXCEPTION_HANDLING + #undef BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK +#endif + +#include // std::chrono +#include // std::condition_variable +#include // std::size_t +#ifdef BS_THREAD_POOL_ENABLE_PRIORITY + #include // std::int_least16_t +#endif +#ifndef BS_THREAD_POOL_DISABLE_EXCEPTION_HANDLING + #include // std::current_exception +#endif +#include // std::function +#include // std::future, std::future_status, std::promise +#include // std::make_shared, std::make_unique, std::shared_ptr, std::unique_ptr +#include // std::mutex, std::scoped_lock, std::unique_lock +#include // std::nullopt, std::optional +#include // std::priority_queue (if priority enabled), std::queue +#ifdef BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK + #include // std::runtime_error +#endif +#include // std::thread +#include // std::conditional_t, std::decay_t, std::invoke_result_t, std::is_void_v, std::remove_const_t (if priority enabled) +#include // std::forward, std::move +#include // std::vector + +/** + * @brief A namespace used by Barak Shoshany's projects. + */ +namespace BS { +// Macros indicating the version of the thread pool library. +#define BS_THREAD_POOL_VERSION_MAJOR 4 +#define BS_THREAD_POOL_VERSION_MINOR 1 +#define BS_THREAD_POOL_VERSION_PATCH 0 + +class thread_pool; + +/** + * @brief A type to represent the size of things. 
+ */ +using size_t = std::size_t; + +/** + * @brief A convenient shorthand for the type of `std::thread::hardware_concurrency()`. Should evaluate to unsigned int. + */ +using concurrency_t = std::invoke_result_t; + +#ifdef BS_THREAD_POOL_ENABLE_PRIORITY +/** + * @brief A type used to indicate the priority of a task. Defined to be an integer with a width of (at least) 16 bits. + */ +using priority_t = std::int_least16_t; + +/** + * @brief A namespace containing some pre-defined priorities for convenience. + */ +namespace pr { + constexpr priority_t highest = 32767; + constexpr priority_t high = 16383; + constexpr priority_t normal = 0; + constexpr priority_t low = -16384; + constexpr priority_t lowest = -32768; +} // namespace pr + + // Macros used internally to enable or disable the priority arguments in the relevant functions. + #define BS_THREAD_POOL_PRIORITY_INPUT , const priority_t priority = 0 + #define BS_THREAD_POOL_PRIORITY_OUTPUT , priority +#else + #define BS_THREAD_POOL_PRIORITY_INPUT + #define BS_THREAD_POOL_PRIORITY_OUTPUT +#endif + +/** + * @brief A namespace used to obtain information about the current thread. + */ +namespace this_thread { + /** + * @brief A type returned by `BS::this_thread::get_index()` which can optionally contain the index of a thread, if that thread belongs to a `BS::thread_pool`. Otherwise, it will contain no value. + */ + using optional_index = std::optional; + + /** + * @brief A type returned by `BS::this_thread::get_pool()` which can optionally contain the pointer to the pool that owns a thread, if that thread belongs to a `BS::thread_pool`. Otherwise, it will contain no value. + */ + using optional_pool = std::optional; + + /** + * @brief A helper class to store information about the index of the current thread. + */ + class [[nodiscard]] thread_info_index + { + friend class BS::thread_pool; + + public: + /** + * @brief Get the index of the current thread. 
If this thread belongs to a `BS::thread_pool` object, it will have an index from 0 to `BS::thread_pool::get_thread_count() - 1`. Otherwise, for example if this thread is the main thread or an independent `std::thread`, `std::nullopt` will be returned. + * + * @return An `std::optional` object, optionally containing a thread index. Unless you are 100% sure this thread is in a pool, first use `std::optional::has_value()` to check if it contains a value, and if so, use `std::optional::value()` to obtain that value. + */ + [[nodiscard]] optional_index operator()() const + { + return index; + } + + private: + /** + * @brief The index of the current thread. + */ + optional_index index = std::nullopt; + }; // class thread_info_index + + /** + * @brief A helper class to store information about the thread pool that owns the current thread. + */ + class [[nodiscard]] thread_info_pool + { + friend class BS::thread_pool; + + public: + /** + * @brief Get the pointer to the thread pool that owns the current thread. If this thread belongs to a `BS::thread_pool` object, a pointer to that object will be returned. Otherwise, for example if this thread is the main thread or an independent `std::thread`, `std::nullopt` will be returned. + * + * @return An `std::optional` object, optionally containing a pointer to a thread pool. Unless you are 100% sure this thread is in a pool, first use `std::optional::has_value()` to check if it contains a value, and if so, use `std::optional::value()` to obtain that value. + */ + [[nodiscard]] optional_pool operator()() const + { + return pool; + } + + private: + /** + * @brief A pointer to the thread pool that owns the current thread. + */ + optional_pool pool = std::nullopt; + }; // class thread_info_pool + + /** + * @brief A `thread_local` object used to obtain information about the index of the current thread. 
+ */ + inline thread_local thread_info_index get_index; + + /** + * @brief A `thread_local` object used to obtain information about the thread pool that owns the current thread. + */ + inline thread_local thread_info_pool get_pool; +} // namespace this_thread + +/** + * @brief A helper class to facilitate waiting for and/or getting the results of multiple futures at once. + * + * @tparam T The return type of the futures. + */ +template +class [[nodiscard]] multi_future : public std::vector> +{ +public: + // Inherit all constructors from the base class `std::vector`. + using std::vector>::vector; + + // The copy constructor and copy assignment operator are deleted. The elements stored in a `multi_future` are futures, which cannot be copied. + multi_future(const multi_future&) = delete; + multi_future& operator=(const multi_future&) = delete; + + // The move constructor and move assignment operator are defaulted. + multi_future(multi_future&&) = default; + multi_future& operator=(multi_future&&) = default; + + /** + * @brief Get the results from all the futures stored in this `multi_future`, rethrowing any stored exceptions. + * + * @return If the futures return `void`, this function returns `void` as well. Otherwise, it returns a vector containing the results. + */ + [[nodiscard]] std::conditional_t, void, std::vector> get() + { + if constexpr (std::is_void_v) + { + for (std::future& future : *this) + future.get(); + return; + } + else + { + std::vector results; + results.reserve(this->size()); + for (std::future& future : *this) + results.push_back(future.get()); + return results; + } + } + + /** + * @brief Check how many of the futures stored in this `multi_future` are ready. + * + * @return The number of ready futures. 
+ */ + [[nodiscard]] size_t ready_count() const + { + size_t count = 0; + for (const std::future& future : *this) + { + if (future.wait_for(std::chrono::duration::zero()) == std::future_status::ready) + ++count; + } + return count; + } + + /** + * @brief Check if all the futures stored in this `multi_future` are valid. + * + * @return `true` if all futures are valid, `false` if at least one of the futures is not valid. + */ + [[nodiscard]] bool valid() const + { + bool is_valid = true; + for (const std::future& future : *this) + is_valid = is_valid && future.valid(); + return is_valid; + } + + /** + * @brief Wait for all the futures stored in this `multi_future`. + */ + void wait() const + { + for (const std::future& future : *this) + future.wait(); + } + + /** + * @brief Wait for all the futures stored in this `multi_future`, but stop waiting after the specified duration has passed. This function first waits for the first future for the desired duration. If that future is ready before the duration expires, this function waits for the second future for whatever remains of the duration. It continues similarly until the duration expires. + * + * @tparam R An arithmetic type representing the number of ticks to wait. + * @tparam P An `std::ratio` representing the length of each tick in seconds. + * @param duration The amount of time to wait. + * @return `true` if all futures have been waited for before the duration expired, `false` otherwise. + */ + template + bool wait_for(const std::chrono::duration& duration) const + { + const std::chrono::time_point start_time = std::chrono::steady_clock::now(); + for (const std::future& future : *this) + { + future.wait_for(duration - (std::chrono::steady_clock::now() - start_time)); + if (duration < std::chrono::steady_clock::now() - start_time) + return false; + } + return true; + } + + /** + * @brief Wait for all the futures stored in this `multi_future`, but stop waiting after the specified time point has been reached. 
This function first waits for the first future until the desired time point. If that future is ready before the time point is reached, this function waits for the second future until the desired time point. It continues similarly until the time point is reached. + * + * @tparam C The type of the clock used to measure time. The deadline check reads the current time from this same clock (`C::now()`), so `timeout_time` may be expressed in any clock, not only `std::chrono::steady_clock`. + * @tparam D An `std::chrono::duration` type used to indicate the time point. + * @param timeout_time The time point at which to stop waiting. + * @return `true` if all futures have been waited for before the time point was reached, `false` otherwise. + */ + template <typename C, typename D> + bool wait_until(const std::chrono::time_point<C, D>& timeout_time) const + { + for (const std::future<T>& future : *this) + { + future.wait_until(timeout_time); + if (timeout_time < C::now()) + return false; + } + return true; + } +}; // class multi_future + +/** + * @brief A fast, lightweight, and easy-to-use C++17 thread pool class. + */ +class [[nodiscard]] thread_pool +{ +public: + // ============================ + // Constructors and destructors + // ============================ + + /** + * @brief Construct a new thread pool. The number of threads will be the total number of hardware threads available, as reported by the implementation. This is usually determined by the number of cores in the CPU. If a core is hyperthreaded, it will count as two threads. + */ + thread_pool() : thread_pool(0, [] {}) {} + + /** + * @brief Construct a new thread pool with the specified number of threads. + * + * @param num_threads The number of threads to use. + */ + explicit thread_pool(const concurrency_t num_threads) : thread_pool(num_threads, [] {}) {} + + /** + * @brief Construct a new thread pool with the specified initialization function. + * + * @param init_task An initialization function to run in each thread before it starts to execute any submitted tasks. The function must take no arguments and have no return value. 
It will only be executed exactly once, when the thread is first constructed. + */ + explicit thread_pool(const std::function& init_task) : thread_pool(0, init_task) {} + + /** + * @brief Construct a new thread pool with the specified number of threads and initialization function. + * + * @param num_threads The number of threads to use. + * @param init_task An initialization function to run in each thread before it starts to execute any submitted tasks. The function must take no arguments and have no return value. It will only be executed exactly once, when the thread is first constructed. + */ + thread_pool(const concurrency_t num_threads, const std::function& init_task) : thread_count(determine_thread_count(num_threads)), threads(std::make_unique(determine_thread_count(num_threads))) + { + create_threads(init_task); + } + + // The copy and move constructors and assignment operators are deleted. The thread pool uses a mutex, which cannot be copied or moved. + thread_pool(const thread_pool&) = delete; + thread_pool(thread_pool&&) = delete; + thread_pool& operator=(const thread_pool&) = delete; + thread_pool& operator=(thread_pool&&) = delete; + + /** + * @brief Destruct the thread pool. Waits for all tasks to complete, then destroys all threads. Note that if the pool is paused, then any tasks still in the queue will never be executed. + */ + ~thread_pool() + { + wait(); + destroy_threads(); + } + + // ======================= + // Public member functions + // ======================= + +#ifdef BS_THREAD_POOL_ENABLE_NATIVE_HANDLES + /** + * @brief Get a vector containing the underlying implementation-defined thread handles for each of the pool's threads, as obtained by `std::thread::native_handle()`. Only enabled if `BS_THREAD_POOL_ENABLE_NATIVE_HANDLES` is defined. + * + * @return The native thread handles. 
+ */ + [[nodiscard]] std::vector get_native_handles() const + { + std::vector native_handles(thread_count); + for (concurrency_t i = 0; i < thread_count; ++i) + { + native_handles[i] = threads[i].native_handle(); + } + return native_handles; + } +#endif + + /** + * @brief Get the number of tasks currently waiting in the queue to be executed by the threads. + * + * @return The number of queued tasks. + */ + [[nodiscard]] size_t get_tasks_queued() const + { + const std::scoped_lock tasks_lock(tasks_mutex); + return tasks.size(); + } + + /** + * @brief Get the number of tasks currently being executed by the threads. + * + * @return The number of running tasks. + */ + [[nodiscard]] size_t get_tasks_running() const + { + const std::scoped_lock tasks_lock(tasks_mutex); + return tasks_running; + } + + /** + * @brief Get the total number of unfinished tasks: either still waiting in the queue, or running in a thread. Note that `get_tasks_total() == get_tasks_queued() + get_tasks_running()`. + * + * @return The total number of tasks. + */ + [[nodiscard]] size_t get_tasks_total() const + { + const std::scoped_lock tasks_lock(tasks_mutex); + return tasks_running + tasks.size(); + } + + /** + * @brief Get the number of threads in the pool. + * + * @return The number of threads. + */ + [[nodiscard]] concurrency_t get_thread_count() const + { + return thread_count; + } + + /** + * @brief Get a vector containing the unique identifiers for each of the pool's threads, as obtained by `std::thread::get_id()`. + * + * @return The unique thread identifiers. + */ + [[nodiscard]] std::vector get_thread_ids() const + { + std::vector thread_ids(thread_count); + for (concurrency_t i = 0; i < thread_count; ++i) + { + thread_ids[i] = threads[i].get_id(); + } + return thread_ids; + } + +#ifdef BS_THREAD_POOL_ENABLE_PAUSE + /** + * @brief Check whether the pool is currently paused. Only enabled if `BS_THREAD_POOL_ENABLE_PAUSE` is defined. 
+ * + * @return `true` if the pool is paused, `false` if it is not paused. + */ + [[nodiscard]] bool is_paused() const + { + const std::scoped_lock tasks_lock(tasks_mutex); + return paused; + } + + /** + * @brief Pause the pool. The workers will temporarily stop retrieving new tasks out of the queue, although any tasks already executing will keep running until they are finished. Only enabled if `BS_THREAD_POOL_ENABLE_PAUSE` is defined. + */ + void pause() + { + const std::scoped_lock tasks_lock(tasks_mutex); + paused = true; + } +#endif + + /** + * @brief Purge all the tasks waiting in the queue. Tasks that are currently running will not be affected, but any tasks still waiting in the queue will be discarded, and will never be executed by the threads. Please note that there is no way to restore the purged tasks. + */ + void purge() + { + const std::scoped_lock tasks_lock(tasks_mutex); + while (!tasks.empty()) + tasks.pop(); + } + + /** + * @brief Submit a function with no arguments and no return value into the task queue, with the specified priority. To push a function with arguments, enclose it in a lambda expression. Does not return a future, so the user must use `wait()` or some other method to ensure that the task finishes executing, otherwise bad things will happen. + * + * @tparam F The type of the function. + * @param task The function to push. + * @param priority The priority of the task. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if `BS_THREAD_POOL_ENABLE_PRIORITY` is defined. + */ + template + void detach_task(F&& task BS_THREAD_POOL_PRIORITY_INPUT) + { + { + const std::scoped_lock tasks_lock(tasks_mutex); + tasks.emplace(std::forward(task) BS_THREAD_POOL_PRIORITY_OUTPUT); + } + task_available_cv.notify_one(); + } + + /** + * @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue, with the specified priority. 
The block function takes two arguments, the start and end of the block, so that it is called only once per block, but it is up to the user to make sure the block function correctly deals with all the indices in each block. Does not return a `multi_future`, so the user must use `wait()` or some other method to ensure that the loop finishes executing, otherwise bad things will happen. + * + * @tparam T The type of the indices. Should be a signed or unsigned integer. + * @tparam F The type of the function to loop through. + * @param first_index The first index in the loop. + * @param index_after_last The index after the last index in the loop. The loop will iterate from `first_index` to `(index_after_last - 1)` inclusive. In other words, it will be equivalent to `for (T i = first_index; i < index_after_last; ++i)`. Note that if `index_after_last <= first_index`, no blocks will be submitted. + * @param block A function that will be called once per block. Should take exactly two arguments: the first index in the block and the index after the last index in the block. `block(start, end)` should typically involve a loop of the form `for (T i = start; i < end; ++i)`. + * @param num_blocks The maximum number of blocks to split the loop into. The default is 0, which means the number of blocks will be equal to the number of threads in the pool. + * @param priority The priority of the tasks. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if `BS_THREAD_POOL_ENABLE_PRIORITY` is defined. + */ + template + void detach_blocks(const T first_index, const T index_after_last, F&& block, const size_t num_blocks = 0 BS_THREAD_POOL_PRIORITY_INPUT) + { + if (index_after_last > first_index) + { + const blocks blks(first_index, index_after_last, num_blocks ? 
num_blocks : thread_count); + for (size_t blk = 0; blk < blks.get_num_blocks(); ++blk) + detach_task( + [block = std::forward(block), start = blks.start(blk), end = blks.end(blk)] + { + block(start, end); + } BS_THREAD_POOL_PRIORITY_OUTPUT); + } + } + + /** + * @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue, with the specified priority. The loop function takes one argument, the loop index, so that it is called many times per block. Does not return a `multi_future`, so the user must use `wait()` or some other method to ensure that the loop finishes executing, otherwise bad things will happen. + * + * @tparam T The type of the indices. Should be a signed or unsigned integer. + * @tparam F The type of the function to loop through. + * @param first_index The first index in the loop. + * @param index_after_last The index after the last index in the loop. The loop will iterate from `first_index` to `(index_after_last - 1)` inclusive. In other words, it will be equivalent to `for (T i = first_index; i < index_after_last; ++i)`. Note that if `index_after_last <= first_index`, no blocks will be submitted. + * @param loop The function to loop through. Will be called once per index, many times per block. Should take exactly one argument: the loop index. + * @param num_blocks The maximum number of blocks to split the loop into. The default is 0, which means the number of blocks will be equal to the number of threads in the pool. + * @param priority The priority of the tasks. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if `BS_THREAD_POOL_ENABLE_PRIORITY` is defined. + */ + template + void detach_loop(const T first_index, const T index_after_last, F&& loop, const size_t num_blocks = 0 BS_THREAD_POOL_PRIORITY_INPUT) + { + if (index_after_last > first_index) + { + const blocks blks(first_index, index_after_last, num_blocks ? 
num_blocks : thread_count); + for (size_t blk = 0; blk < blks.get_num_blocks(); ++blk) + detach_task( + [loop = std::forward(loop), start = blks.start(blk), end = blks.end(blk)] + { + for (T i = start; i < end; ++i) + loop(i); + } BS_THREAD_POOL_PRIORITY_OUTPUT); + } + } + + /** + * @brief Submit a sequence of tasks enumerated by indices to the queue, with the specified priority. Does not return a `multi_future`, so the user must use `wait()` or some other method to ensure that the sequence finishes executing, otherwise bad things will happen. + * + * @tparam T The type of the indices. Should be a signed or unsigned integer. + * @tparam F The type of the function used to define the sequence. + * @param first_index The first index in the sequence. + * @param index_after_last The index after the last index in the sequence. The sequence will iterate from `first_index` to `(index_after_last - 1)` inclusive. In other words, it will be equivalent to `for (T i = first_index; i < index_after_last; ++i)`. Note that if `index_after_last <= first_index`, no tasks will be submitted. + * @param sequence The function used to define the sequence. Will be called once per index. Should take exactly one argument, the index. + * @param priority The priority of the tasks. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if `BS_THREAD_POOL_ENABLE_PRIORITY` is defined. + */ + template + void detach_sequence(const T first_index, const T index_after_last, F&& sequence BS_THREAD_POOL_PRIORITY_INPUT) + { + for (T i = first_index; i < index_after_last; ++i) + detach_task( + [sequence = std::forward(sequence), i] + { + sequence(i); + } BS_THREAD_POOL_PRIORITY_OUTPUT); + } + + /** + * @brief Reset the pool with the total number of hardware threads available, as reported by the implementation. Waits for all currently running tasks to be completed, then destroys all threads in the pool and creates a new thread pool with the new number of threads. 
Any tasks that were waiting in the queue before the pool was reset will then be executed by the new threads. If the pool was paused before resetting it, the new pool will be paused as well. + */ + void reset() + { + reset(0, [] {}); + } + + /** + * @brief Reset the pool with a new number of threads. Waits for all currently running tasks to be completed, then destroys all threads in the pool and creates a new thread pool with the new number of threads. Any tasks that were waiting in the queue before the pool was reset will then be executed by the new threads. If the pool was paused before resetting it, the new pool will be paused as well. + * + * @param num_threads The number of threads to use. + */ + void reset(const concurrency_t num_threads) + { + reset(num_threads, [] {}); + } + + /** + * @brief Reset the pool with the total number of hardware threads available, as reported by the implementation, and a new initialization function. Waits for all currently running tasks to be completed, then destroys all threads in the pool and creates a new thread pool with the new number of threads and initialization function. Any tasks that were waiting in the queue before the pool was reset will then be executed by the new threads. If the pool was paused before resetting it, the new pool will be paused as well. + * + * @param init_task An initialization function to run in each thread before it starts to execute any submitted tasks. The function must take no arguments and have no return value. It will only be executed exactly once, when the thread is first constructed. + */ + void reset(const std::function& init_task) + { + reset(0, init_task); + } + + /** + * @brief Reset the pool with a new number of threads and a new initialization function. Waits for all currently running tasks to be completed, then destroys all threads in the pool and creates a new thread pool with the new number of threads and initialization function. 
Any tasks that were waiting in the queue before the pool was reset will then be executed by the new threads. If the pool was paused before resetting it, the new pool will be paused as well. + * + * @param num_threads The number of threads to use. + * @param init_task An initialization function to run in each thread before it starts to execute any submitted tasks. The function must take no arguments and have no return value. It will only be executed exactly once, when the thread is first constructed. + */ + void reset(const concurrency_t num_threads, const std::function& init_task) + { +#ifdef BS_THREAD_POOL_ENABLE_PAUSE + std::unique_lock tasks_lock(tasks_mutex); + const bool was_paused = paused; + paused = true; + tasks_lock.unlock(); +#endif + wait(); + destroy_threads(); + thread_count = determine_thread_count(num_threads); + threads = std::make_unique(thread_count); + create_threads(init_task); +#ifdef BS_THREAD_POOL_ENABLE_PAUSE + tasks_lock.lock(); + paused = was_paused; +#endif + } + + /** + * @brief Submit a function with no arguments into the task queue, with the specified priority. To submit a function with arguments, enclose it in a lambda expression. If the function has a return value, get a future for the eventual returned value. If the function has no return value, get an `std::future` which can be used to wait until the task finishes. + * + * @tparam F The type of the function. + * @tparam R The return type of the function (can be `void`). + * @param task The function to submit. + * @param priority The priority of the task. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if `BS_THREAD_POOL_ENABLE_PRIORITY` is defined. + * @return A future to be used later to wait for the function to finish executing and/or obtain its returned value if it has one. 
+ */ + template >> + [[nodiscard]] std::future submit_task(F&& task BS_THREAD_POOL_PRIORITY_INPUT) + { + const std::shared_ptr> task_promise = std::make_shared>(); + detach_task( + [task = std::forward(task), task_promise] + { +#ifndef BS_THREAD_POOL_DISABLE_EXCEPTION_HANDLING + try + { +#endif + if constexpr (std::is_void_v) + { + task(); + task_promise->set_value(); + } + else + { + task_promise->set_value(task()); + } +#ifndef BS_THREAD_POOL_DISABLE_EXCEPTION_HANDLING + } + catch (...) + { + try + { + task_promise->set_exception(std::current_exception()); + } + catch (...) + { + } + } +#endif + } BS_THREAD_POOL_PRIORITY_OUTPUT); + return task_promise->get_future(); + } + + /** + * @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue, with the specified priority. The block function takes two arguments, the start and end of the block, so that it is called only once per block, but it is up to the user to make sure the block function correctly deals with all the indices in each block. Returns a `multi_future` that contains the futures for all of the blocks. + * + * @tparam T The type of the indices. Should be a signed or unsigned integer. + * @tparam F The type of the function to loop through. + * @tparam R The return type of the function to loop through (can be `void`). + * @param first_index The first index in the loop. + * @param index_after_last The index after the last index in the loop. The loop will iterate from `first_index` to `(index_after_last - 1)` inclusive. In other words, it will be equivalent to `for (T i = first_index; i < index_after_last; ++i)`. Note that if `index_after_last <= first_index`, no blocks will be submitted, and an empty `multi_future` will be returned. + * @param block A function that will be called once per block. Should take exactly two arguments: the first index in the block and the index after the last index in the block. 
`block(start, end)` should typically involve a loop of the form `for (T i = start; i < end; ++i)`. + * @param num_blocks The maximum number of blocks to split the loop into. The default is 0, which means the number of blocks will be equal to the number of threads in the pool. + * @param priority The priority of the tasks. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if `BS_THREAD_POOL_ENABLE_PRIORITY` is defined. + * @return A `multi_future` that can be used to wait for all the blocks to finish. If the block function returns a value, the `multi_future` can also be used to obtain the values returned by each block. + */ + template , T, T>> + [[nodiscard]] multi_future submit_blocks(const T first_index, const T index_after_last, F&& block, const size_t num_blocks = 0 BS_THREAD_POOL_PRIORITY_INPUT) + { + if (index_after_last > first_index) + { + const blocks blks(first_index, index_after_last, num_blocks ? num_blocks : thread_count); + multi_future future; + future.reserve(blks.get_num_blocks()); + for (size_t blk = 0; blk < blks.get_num_blocks(); ++blk) + future.push_back(submit_task( + [block = std::forward(block), start = blks.start(blk), end = blks.end(blk)] + { + return block(start, end); + } BS_THREAD_POOL_PRIORITY_OUTPUT)); + return future; + } + return {}; + } + + /** + * @brief Parallelize a loop by automatically splitting it into blocks and submitting each block separately to the queue, with the specified priority. The loop function takes one argument, the loop index, so that it is called many times per block. It must have no return value. Returns a `multi_future` that contains the futures for all of the blocks. + * + * @tparam T The type of the indices. Should be a signed or unsigned integer. + * @tparam F The type of the function to loop through. + * @param first_index The first index in the loop. + * @param index_after_last The index after the last index in the loop. 
The loop will iterate from `first_index` to `(index_after_last - 1)` inclusive. In other words, it will be equivalent to `for (T i = first_index; i < index_after_last; ++i)`. Note that if `index_after_last <= first_index`, no tasks will be submitted, and an empty `multi_future` will be returned. + * @param loop The function to loop through. Will be called once per index, many times per block. Should take exactly one argument: the loop index. It cannot have a return value. + * @param num_blocks The maximum number of blocks to split the loop into. The default is 0, which means the number of blocks will be equal to the number of threads in the pool. + * @param priority The priority of the tasks. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if `BS_THREAD_POOL_ENABLE_PRIORITY` is defined. + * @return A `multi_future` that can be used to wait for all the blocks to finish. + */ + template + [[nodiscard]] multi_future submit_loop(const T first_index, const T index_after_last, F&& loop, const size_t num_blocks = 0 BS_THREAD_POOL_PRIORITY_INPUT) + { + if (index_after_last > first_index) + { + const blocks blks(first_index, index_after_last, num_blocks ? num_blocks : thread_count); + multi_future future; + future.reserve(blks.get_num_blocks()); + for (size_t blk = 0; blk < blks.get_num_blocks(); ++blk) + future.push_back(submit_task( + [loop = std::forward(loop), start = blks.start(blk), end = blks.end(blk)] + { + for (T i = start; i < end; ++i) + loop(i); + } BS_THREAD_POOL_PRIORITY_OUTPUT)); + return future; + } + return {}; + } + + /** + * @brief Submit a sequence of tasks enumerated by indices to the queue, with the specified priority. Returns a `multi_future` that contains the futures for all of the tasks. + * + * @tparam T The type of the indices. Should be a signed or unsigned integer. + * @tparam F The type of the function used to define the sequence. 
+ * @tparam R The return type of the function used to define the sequence (can be `void`). + * @param first_index The first index in the sequence. + * @param index_after_last The index after the last index in the sequence. The sequence will iterate from `first_index` to `(index_after_last - 1)` inclusive. In other words, it will be equivalent to `for (T i = first_index; i < index_after_last; ++i)`. Note that if `index_after_last <= first_index`, no tasks will be submitted, and an empty `multi_future` will be returned. + * @param sequence The function used to define the sequence. Will be called once per index. Should take exactly one argument, the index. + * @param priority The priority of the tasks. Should be between -32,768 and 32,767 (a signed 16-bit integer). The default is 0. Only enabled if `BS_THREAD_POOL_ENABLE_PRIORITY` is defined. + * @return A `multi_future` that can be used to wait for all the tasks to finish. If the sequence function returns a value, the `multi_future` can also be used to obtain the values returned by each task. + */ + template , T>> + [[nodiscard]] multi_future submit_sequence(const T first_index, const T index_after_last, F&& sequence BS_THREAD_POOL_PRIORITY_INPUT) + { + if (index_after_last > first_index) + { + multi_future future; + future.reserve(static_cast(index_after_last - first_index)); + for (T i = first_index; i < index_after_last; ++i) + future.push_back(submit_task( + [sequence = std::forward(sequence), i] + { + return sequence(i); + } BS_THREAD_POOL_PRIORITY_OUTPUT)); + return future; + } + return {}; + } + +#ifdef BS_THREAD_POOL_ENABLE_PAUSE + /** + * @brief Unpause the pool. The workers will resume retrieving new tasks out of the queue. Only enabled if `BS_THREAD_POOL_ENABLE_PAUSE` is defined. 
+ */ + void unpause() + { + { + const std::scoped_lock tasks_lock(tasks_mutex); + paused = false; + } + task_available_cv.notify_all(); + } +#endif + +// Macros used internally to enable or disable pausing in the waiting and worker functions. +#ifdef BS_THREAD_POOL_ENABLE_PAUSE + #define BS_THREAD_POOL_PAUSED_OR_EMPTY (paused || tasks.empty()) +#else + #define BS_THREAD_POOL_PAUSED_OR_EMPTY tasks.empty() +#endif + + /** + * @brief Wait for tasks to be completed. Normally, this function waits for all tasks, both those that are currently running in the threads and those that are still waiting in the queue. However, if the pool is paused, this function only waits for the currently running tasks (otherwise it would wait forever). Note: To wait for just one specific task, use `submit_task()` instead, and call the `wait()` member function of the generated future. + * + * @throws `wait_deadlock` if called from within a thread of the same pool, which would result in a deadlock. Only enabled if `BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK` is defined. + */ + void wait() + { +#ifdef BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK + if (this_thread::get_pool() == this) + throw wait_deadlock(); +#endif + std::unique_lock tasks_lock(tasks_mutex); + waiting = true; + tasks_done_cv.wait(tasks_lock, + [this] + { + return (tasks_running == 0) && BS_THREAD_POOL_PAUSED_OR_EMPTY; + }); + waiting = false; + } + + /** + * @brief Wait for tasks to be completed, but stop waiting after the specified duration has passed. + * + * @tparam R An arithmetic type representing the number of ticks to wait. + * @tparam P An `std::ratio` representing the length of each tick in seconds. + * @param duration The amount of time to wait. + * @return `true` if all tasks finished running, `false` if the duration expired but some tasks are still running. + * + * @throws `wait_deadlock` if called from within a thread of the same pool, which would result in a deadlock. 
Only enabled if `BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK` is defined. + */ + template + bool wait_for(const std::chrono::duration& duration) + { +#ifdef BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK + if (this_thread::get_pool() == this) + throw wait_deadlock(); +#endif + std::unique_lock tasks_lock(tasks_mutex); + waiting = true; + const bool status = tasks_done_cv.wait_for(tasks_lock, duration, + [this] + { + return (tasks_running == 0) && BS_THREAD_POOL_PAUSED_OR_EMPTY; + }); + waiting = false; + return status; + } + + /** + * @brief Wait for tasks to be completed, but stop waiting after the specified time point has been reached. + * + * @tparam C The type of the clock used to measure time. + * @tparam D An `std::chrono::duration` type used to indicate the time point. + * @param timeout_time The time point at which to stop waiting. + * @return `true` if all tasks finished running, `false` if the time point was reached but some tasks are still running. + * + * @throws `wait_deadlock` if called from within a thread of the same pool, which would result in a deadlock. Only enabled if `BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK` is defined. + */ + template + bool wait_until(const std::chrono::time_point& timeout_time) + { +#ifdef BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK + if (this_thread::get_pool() == this) + throw wait_deadlock(); +#endif + std::unique_lock tasks_lock(tasks_mutex); + waiting = true; + const bool status = tasks_done_cv.wait_until(tasks_lock, timeout_time, + [this] + { + return (tasks_running == 0) && BS_THREAD_POOL_PAUSED_OR_EMPTY; + }); + waiting = false; + return status; + } + +#ifdef BS_THREAD_POOL_ENABLE_WAIT_DEADLOCK_CHECK + // ============== + // Public classes + // ============== + + /** + * @brief An exception that will be thrown by `wait()`, `wait_for()`, and `wait_until()` if the user tries to call them from within a thread of the same pool, which would result in a deadlock. 
+ */ + struct wait_deadlock : public std::runtime_error + { + wait_deadlock() : std::runtime_error("BS::thread_pool::wait_deadlock"){}; + }; +#endif + +private: + // ======================== + // Private member functions + // ======================== + + /** + * @brief Create the threads in the pool and assign a worker to each thread. + * + * @param init_task An initialization function to run in each thread before it starts to execute any submitted tasks. + */ + void create_threads(const std::function& init_task) + { + { + const std::scoped_lock tasks_lock(tasks_mutex); + tasks_running = thread_count; + workers_running = true; + } + for (concurrency_t i = 0; i < thread_count; ++i) + { + threads[i] = std::thread(&thread_pool::worker, this, i, init_task); + } + } + + /** + * @brief Destroy the threads in the pool. + */ + void destroy_threads() + { + { + const std::scoped_lock tasks_lock(tasks_mutex); + workers_running = false; + } + task_available_cv.notify_all(); + for (concurrency_t i = 0; i < thread_count; ++i) + { + threads[i].join(); + } + } + + /** + * @brief Determine how many threads the pool should have, based on the parameter passed to the constructor or reset(). + * + * @param num_threads The parameter passed to the constructor or `reset()`. If the parameter is a positive number, then the pool will be created with this number of threads. If the parameter is non-positive, or a parameter was not supplied (in which case it will have the default value of 0), then the pool will be created with the total number of hardware threads available, as obtained from `std::thread::hardware_concurrency()`. If the latter returns zero for some reason, then the pool will be created with just one thread. + * @return The number of threads to use for constructing the pool. 
+ */ + [[nodiscard]] static concurrency_t determine_thread_count(const concurrency_t num_threads) + { + if (num_threads > 0) + return num_threads; + if (std::thread::hardware_concurrency() > 0) + return std::thread::hardware_concurrency(); + return 1; + } + + /** + * @brief A worker function to be assigned to each thread in the pool. Waits until it is notified by `detach_task()` that a task is available, and then retrieves the task from the queue and executes it. Once the task finishes, the worker notifies `wait()` in case it is waiting. + * + * @param idx The index of this thread. + * @param init_task An initialization function to run in this thread before it starts to execute any submitted tasks. + */ + void worker(const concurrency_t idx, const std::function& init_task) + { + this_thread::get_index.index = idx; + this_thread::get_pool.pool = this; + init_task(); + std::unique_lock tasks_lock(tasks_mutex); + while (true) + { + --tasks_running; + tasks_lock.unlock(); + if (waiting && (tasks_running == 0) && BS_THREAD_POOL_PAUSED_OR_EMPTY) + tasks_done_cv.notify_all(); + tasks_lock.lock(); + task_available_cv.wait(tasks_lock, + [this] + { + return !BS_THREAD_POOL_PAUSED_OR_EMPTY || !workers_running; + }); + if (!workers_running) + break; + { +#ifdef BS_THREAD_POOL_ENABLE_PRIORITY + const std::function task = std::move(std::remove_const_t(tasks.top()).task); + tasks.pop(); +#else + const std::function task = std::move(tasks.front()); + tasks.pop(); +#endif + ++tasks_running; + tasks_lock.unlock(); + task(); + } + tasks_lock.lock(); + } + this_thread::get_index.index = std::nullopt; + this_thread::get_pool.pool = std::nullopt; + } + + // =============== + // Private classes + // =============== + + /** + * @brief A helper class to divide a range into blocks. Used by `detach_blocks()`, `submit_blocks()`, `detach_loop()`, and `submit_loop()`. + * + * @tparam T The type of the indices. Should be a signed or unsigned integer. 
+ */ + template + class [[nodiscard]] blocks + { + public: + /** + * @brief Construct a `blocks` object with the given specifications. + * + * @param first_index_ The first index in the range. + * @param index_after_last_ The index after the last index in the range. + * @param num_blocks_ The desired number of blocks to divide the range into. + */ + blocks(const T first_index_, const T index_after_last_, const size_t num_blocks_) : first_index(first_index_), index_after_last(index_after_last_), num_blocks(num_blocks_) + { + if (index_after_last > first_index) + { + const size_t total_size = static_cast(index_after_last - first_index); + if (num_blocks > total_size) + num_blocks = total_size; + block_size = total_size / num_blocks; + remainder = total_size % num_blocks; + if (block_size == 0) + { + block_size = 1; + num_blocks = (total_size > 1) ? total_size : 1; + } + } + else + { + num_blocks = 0; + } + } + + /** + * @brief Get the first index of a block. + * + * @param block The block number. + * @return The first index. + */ + [[nodiscard]] T start(const size_t block) const + { + return first_index + static_cast(block * block_size) + static_cast(block < remainder ? block : remainder); + } + + /** + * @brief Get the index after the last index of a block. + * + * @param block The block number. + * @return The index after the last index. + */ + [[nodiscard]] T end(const size_t block) const + { + return (block == num_blocks - 1) ? index_after_last : start(block + 1); + } + + /** + * @brief Get the number of blocks. Note that this may be different than the desired number of blocks that was passed to the constructor. + * + * @return The number of blocks. + */ + [[nodiscard]] size_t get_num_blocks() const + { + return num_blocks; + } + + private: + /** + * @brief The size of each block (except possibly the last block). + */ + size_t block_size = 0; + + /** + * @brief The first index in the range. 
+ */ + T first_index = 0; + + /** + * @brief The index after the last index in the range. + */ + T index_after_last = 0; + + /** + * @brief The number of blocks. + */ + size_t num_blocks = 0; + + /** + * @brief The remainder obtained after dividing the total size by the number of blocks. + */ + size_t remainder = 0; + }; // class blocks + +#ifdef BS_THREAD_POOL_ENABLE_PRIORITY + /** + * @brief A helper class to store a task with an assigned priority. + */ + class [[nodiscard]] pr_task + { + friend class thread_pool; + + public: + /** + * @brief Construct a new task with an assigned priority by copying the task. + * + * @param task_ The task. + * @param priority_ The desired priority. + */ + explicit pr_task(const std::function& task_, const priority_t priority_ = 0) : task(task_), priority(priority_) {} + + /** + * @brief Construct a new task with an assigned priority by moving the task. + * + * @param task_ The task. + * @param priority_ The desired priority. + */ + explicit pr_task(std::function&& task_, const priority_t priority_ = 0) : task(std::move(task_)), priority(priority_) {} + + /** + * @brief Compare the priority of two tasks. + * + * @param lhs The first task. + * @param rhs The second task. + * @return `true` if the first task has a lower priority than the second task, `false` otherwise. + */ + [[nodiscard]] friend bool operator<(const pr_task& lhs, const pr_task& rhs) + { + return lhs.priority < rhs.priority; + } + + private: + /** + * @brief The task. + */ + std::function task = {}; + + /** + * @brief The priority of the task. + */ + priority_t priority = 0; + }; // class pr_task +#endif + + // ============ + // Private data + // ============ + +#ifdef BS_THREAD_POOL_ENABLE_PAUSE + /** + * @brief A flag indicating whether the workers should pause. When set to `true`, the workers temporarily stop retrieving new tasks out of the queue, although any tasks already executed will keep running until they are finished. 
When set to `false` again, the workers resume retrieving tasks. + */ + bool paused = false; +#endif + + /** + * @brief A condition variable to notify `worker()` that a new task has become available. + */ + std::condition_variable task_available_cv = {}; + + /** + * @brief A condition variable to notify `wait()` that the tasks are done. + */ + std::condition_variable tasks_done_cv = {}; + + /** + * @brief A queue of tasks to be executed by the threads. + */ +#ifdef BS_THREAD_POOL_ENABLE_PRIORITY + std::priority_queue tasks = {}; +#else + std::queue> tasks = {}; +#endif + + /** + * @brief A counter for the total number of currently running tasks. + */ + size_t tasks_running = 0; + + /** + * @brief A mutex to synchronize access to the task queue by different threads. + */ + mutable std::mutex tasks_mutex = {}; + + /** + * @brief The number of threads in the pool. + */ + concurrency_t thread_count = 0; + + /** + * @brief A smart pointer to manage the memory allocated for the threads. + */ + std::unique_ptr threads = nullptr; + + /** + * @brief A flag indicating that `wait()` is active and expects to be notified whenever a task is done. + */ + bool waiting = false; + + /** + * @brief A flag indicating to the workers to keep running. When set to `false`, the workers terminate permanently. 
+ */ + bool workers_running = false; +}; // class thread_pool +} // namespace BS +#endif \ No newline at end of file diff --git a/setup.py b/setup.py index 257be99..3c23286 100644 --- a/setup.py +++ b/setup.py @@ -57,7 +57,6 @@ def check_pthread_compatibility(): "csrc/async_file_io.cpp", "csrc/py_api.cpp", "csrc/pthread_backend.cpp", - "csrc/threadpool.cpp" ] extra_objects = [] define_macros = [] From 75cde33459006e251911e9b0fcf787f50eb3849f Mon Sep 17 00:00:00 2001 From: botbw Date: Mon, 23 Sep 2024 14:15:07 +0800 Subject: [PATCH 10/13] [pthread] pass test --- csrc/pthread_backend.cpp | 78 +++++++++++++++++---------------------- include/pthread_backend.h | 9 ++++- tests/test_adam.py | 6 +++ 3 files changed, 47 insertions(+), 46 deletions(-) diff --git a/csrc/pthread_backend.cpp b/csrc/pthread_backend.cpp index 2f2db9e..3ddb429 100644 --- a/csrc/pthread_backend.cpp +++ b/csrc/pthread_backend.cpp @@ -1,64 +1,42 @@ -#include -#include -#include -#include - -#include "asyncio.h" #include "pthread_backend.h" -#include void PthreadAsyncIO::write(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback) { auto fut = this->pool.submit_task( - [fd, buffer, n_bytes, offset, callback] { - int result = pwrite(fd, buffer, n_bytes, offset); - if (callback != nullptr) { - callback(); - } - return result; + [fd, buffer, n_bytes, offset] { + return pwrite(fd, buffer, n_bytes, offset); } ); - this->write_fut.push_back(std::move(fut)); + this->write_fut.push_back(std::make_tuple(std::move(fut), callback)); } -void PthreadAsyncIO::read(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback) { +void PthreadAsyncIO::writev(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback) { auto fut = this->pool.submit_task( - [fd, buffer, n_bytes, offset, callback] { - int result = pread(fd, buffer, n_bytes, offset); - if (callback != nullptr) { - callback(); - } - return result; + [fd, iov, iovcnt, 
offset] { + return pwritev(fd, iov, iovcnt, offset); } ); - this->read_fut.push_back(std::move(fut)); + this->write_fut.push_back(std::make_tuple(std::move(fut), callback)); } -void PthreadAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback) { - auto fut = this->pool.submit_task( - [fd, iov, iovcnt, offset, callback] { - int result = preadv(fd, iov, iovcnt, offset); - if (callback != nullptr) { - callback(); - } - return result; +void PthreadAsyncIO::read(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback) { + auto fut = this->pool.submit_task( + [fd, buffer, n_bytes, offset] { + return pread(fd, buffer, n_bytes, offset); } ); - this->read_fut.push_back(std::move(fut)); + this->read_fut.push_back(std::make_tuple(std::move(fut), callback)); } -void PthreadAsyncIO::writev(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback) { - auto fut = this->pool.submit_task( - [fd, iov, iovcnt, offset, callback] { - int result = pwritev(fd, iov, iovcnt, offset); - if (callback != nullptr) { - callback(); - } - return result; +void PthreadAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned long long offset, callback_t callback) { + auto fut = this->pool.submit_task( + [fd, iov, iovcnt, offset] { + return preadv(fd, iov, iovcnt, offset); } ); - this->write_fut.push_back(std::move(fut)); + this->read_fut.push_back(std::make_tuple(std::move(fut), callback)); } + void PthreadAsyncIO::get_event(WaitType wt) { if (wt == NOWAIT) return; this->sync_write_events(); @@ -68,19 +46,31 @@ void PthreadAsyncIO::get_event(WaitType wt) { void PthreadAsyncIO::sync_write_events() { while (this->write_fut.size() > 0) { - std::cout << "hi:" << this->write_fut.size() << std::endl; auto front = std::move(this->write_fut.front()); this->write_fut.pop_front(); - front.wait(); + + std::future fut(std::move(std::get<0>(front))); + fut.wait(); + + callback_t 
callback = std::get<1>(front); + if (callback != nullptr) { + callback(); + } } } void PthreadAsyncIO::sync_read_events() { while (this->read_fut.size() > 0) { - std::cout << "ho:" << this->read_fut.size() << std::endl; auto front = std::move(this->read_fut.front()); this->read_fut.pop_front(); - front.wait(); + + std::future fut(std::move(std::get<0>(front))); + fut.wait(); + + callback_t callback = std::get<1>(front); + if (callback != nullptr) { + callback(); + } } } diff --git a/include/pthread_backend.h b/include/pthread_backend.h index 2133ad6..b41d443 100644 --- a/include/pthread_backend.h +++ b/include/pthread_backend.h @@ -2,9 +2,14 @@ #include #include +#include +#include #include #include #include +#include +#include + #include "asyncio.h" #include "threadpool.hpp" @@ -13,8 +18,8 @@ class PthreadAsyncIO : public AsyncIO { private: BS::thread_pool pool; - std::deque> write_fut; - std::deque> read_fut; + std::deque, callback_t>> write_fut; + std::deque, callback_t>> read_fut; public: PthreadAsyncIO(unsigned int n_entries) diff --git a/tests/test_adam.py b/tests/test_adam.py index 59f1363..a4cf83d 100644 --- a/tests/test_adam.py +++ b/tests/test_adam.py @@ -208,6 +208,12 @@ def test_adam(): {'n_entries': 1, 'backend': 'aio', 'prefetch': 0, 'vecio': True}, {'n_entries': 8, 'backend': 'aio', 'prefetch': 2, 'vecio': True}, + + {'n_entries': 1, 'backend': 'pthread', 'prefetch': 0, 'vecio': False}, + {'n_entries': 8, 'backend': 'pthread', 'prefetch': 2, 'vecio': False}, + + {'n_entries': 1, 'backend': 'pthread', 'prefetch': 0, 'vecio': True}, + {'n_entries': 8, 'backend': 'pthread', 'prefetch': 2, 'vecio': True}, ] for i, cfg in enumerate(test_config): From 2703e932e5376555651b563d623e1e398b61d31c Mon Sep 17 00:00:00 2001 From: botbw Date: Mon, 23 Sep 2024 14:22:22 +0800 Subject: [PATCH 11/13] [chore] remove unintended commit --- include/asyncio.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/include/asyncio.h b/include/asyncio.h index 8f27222..d7f6886 
100644 --- a/include/asyncio.h +++ b/include/asyncio.h @@ -8,9 +8,7 @@ using callback_t = std::function; enum IOType { WRITE, - WRITEV, READ, - READV }; enum WaitType From 186e13930dcd638c345b9bcf890d7de2ebc89ab7 Mon Sep 17 00:00:00 2001 From: botbw Date: Mon, 23 Sep 2024 14:38:09 +0800 Subject: [PATCH 12/13] [chore] refine --- csrc/backend.cpp | 6 ++++-- csrc/pthread_backend.cpp | 10 ++++------ include/asyncio.h | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/csrc/backend.cpp b/csrc/backend.cpp index e00495d..98e40ff 100644 --- a/csrc/backend.cpp +++ b/csrc/backend.cpp @@ -53,12 +53,14 @@ void probe_asyncio(const std::string &backend) #else throw std::runtime_error("backend aio is not installed\n"); #endif - } else { + } else if (backend == "pthread") { #ifndef DISABLE_PTHREAD aio.reset(new PthreadAsyncIO(2)); #else throw std::runtime_error("backend pthread is not installed\n"); #endif + } else { + throw std::runtime_error("unknown backend"); } int fd = fileno(fp); @@ -88,7 +90,7 @@ void probe_asyncio(const std::string &backend) for (int i = 0; i < n_loop; i++) { for (int j = 0; j < n_len; j++) - { + { assert(text[i][j] == new_text[i][j]); } } diff --git a/csrc/pthread_backend.cpp b/csrc/pthread_backend.cpp index 3ddb429..601fc33 100644 --- a/csrc/pthread_backend.cpp +++ b/csrc/pthread_backend.cpp @@ -36,12 +36,10 @@ void PthreadAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsign this->read_fut.push_back(std::make_tuple(std::move(fut), callback)); } - void PthreadAsyncIO::get_event(WaitType wt) { if (wt == NOWAIT) return; this->sync_write_events(); this->sync_read_events(); - } void PthreadAsyncIO::sync_write_events() { @@ -49,10 +47,10 @@ void PthreadAsyncIO::sync_write_events() { auto front = std::move(this->write_fut.front()); this->write_fut.pop_front(); - std::future fut(std::move(std::get<0>(front))); + auto fut(std::move(std::get<0>(front))); fut.wait(); - callback_t callback = std::get<1>(front); + auto callback = 
std::get<1>(front); if (callback != nullptr) { callback(); } @@ -64,10 +62,10 @@ void PthreadAsyncIO::sync_read_events() { auto front = std::move(this->read_fut.front()); this->read_fut.pop_front(); - std::future fut(std::move(std::get<0>(front))); + auto fut(std::move(std::get<0>(front))); fut.wait(); - callback_t callback = std::get<1>(front); + auto callback = std::get<1>(front); if (callback != nullptr) { callback(); } diff --git a/include/asyncio.h b/include/asyncio.h index d7f6886..d479123 100644 --- a/include/asyncio.h +++ b/include/asyncio.h @@ -8,7 +8,7 @@ using callback_t = std::function; enum IOType { WRITE, - READ, + READ }; enum WaitType From 409958f310e38895d90201044fbb7f2bc2942125 Mon Sep 17 00:00:00 2001 From: botbw Date: Mon, 23 Sep 2024 14:59:30 +0800 Subject: [PATCH 13/13] [chore] license --- include/threadpool.hpp | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/include/threadpool.hpp b/include/threadpool.hpp index 14e764d..b4365bf 100644 --- a/include/threadpool.hpp +++ b/include/threadpool.hpp @@ -1,5 +1,29 @@ // Copied from https://github.com/bshoshany/thread-pool/blob/097aa718f25d44315cadb80b407144ad455ee4f9/include/BS_thread_pool.hpp +/* Original license + + MIT License + + Copyright (c) 2024 Barak Shoshany + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. 
+ + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. +*/ #ifndef BS_THREAD_POOL_HPP #define BS_THREAD_POOL_HPP /**