From 3b277b3a2ef05ffb27a83ac5c315a9bebbc2e354 Mon Sep 17 00:00:00 2001 From: Burlen Loring Date: Thu, 24 Aug 2023 16:18:25 -0700 Subject: [PATCH 01/14] test temporal reduction steps_per_request command line argument --- test/CMakeLists.txt | 2 +- test/test_temporal_reduction.cpp | 22 ++++++++++++---------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index ea9d8d35f..94c942714 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -1025,4 +1025,4 @@ teca_add_test(test_cpp_temporal_reduction SOURCES test_temporal_reduction.cpp LIBS teca_core teca_data teca_io teca_alg ${teca_test_link} COMMAND test_temporal_reduction - 360 180 5 24 1 1 1 monthly average test_cpp_temporal_reduction_%t%.nc yearly 1 1 1 1) + 360 180 5 24 1 1 1 monthly average 1 test_cpp_temporal_reduction_%t%.nc yearly 1 1 1 1) diff --git a/test/test_temporal_reduction.cpp b/test/test_temporal_reduction.cpp index 12af44e9c..99609b153 100644 --- a/test/test_temporal_reduction.cpp +++ b/test/test_temporal_reduction.cpp @@ -145,7 +145,7 @@ generate(size_t nx, size_t ny, size_t nz, size_t nt, size_t nxyzt = nxyz*nt; double *pf_xyzt; - std::tie(f_xyzt, pf_xyzt) = ::New(nxyzt, 0.0, allocator::malloc); + std::tie(f_xyzt, pf_xyzt) = ::New(nxyzt, allocator::malloc); double pi = 3.14159235658979; @@ -365,12 +365,12 @@ int main(int argc, char **argv) teca_system_interface::set_stack_trace_on_error(); teca_system_interface::set_stack_trace_on_mpi_error(); - if (argc != 16) + if (argc != 17) { std::cerr << "test_temporal_reduction [nx in] [ny in] [nz in]" " [steps per day] [num years]" " [num reduction threads] [threads per device]" - " [reduction interval] [reduction operator]" + " [reduction interval] [reduction operator] [steps per request]" " [out file] [file layout] [num writer threads]" " [nx out] [ny out] [nz out]" << std::endl; return -1; @@ -389,14 +389,15 @@ int main(int argc, char **argv) int threads_per_dev = atoi(argv[7]); std::string red_int = argv[8]; std::string red_op = argv[9]; + int steps_per_req = atoi(argv[10]); - std::string ofile_name = argv[10]; - std::string layout = argv[11]; - int n_wri_threads = atoi(argv[12]); + std::string ofile_name = argv[11]; + std::string layout = argv[12]; + int n_wri_threads = atoi(argv[13]); - unsigned long nx_out = atoi(argv[13]); - unsigned long ny_out = atoi(argv[14]); - unsigned long nz_out = atoi(argv[15]); + unsigned long nx_out = atoi(argv[14]); + unsigned long ny_out = atoi(argv[15]); + unsigned long nz_out = atoi(argv[16]); double T0 = 360.0; // period of 1 year double T1 = 1.0; // period of 1 day @@ -413,6 +414,7 @@ int main(int argc, char **argv) << " nt=" << nt << " reduce_threads=" << n_red_threads << " threads_per_dev=" << threads_per_dev << " red_int=" << red_int << " red_op=" << red_op + << " steps_per_req=" << steps_per_req << " layout=" << layout << " writer_threads=" << n_wri_threads << " nx_out=" << nx_out << " ny_out=" << ny_out << " nz_out=" << nz_out << std::endl; @@ -439,7 +441,7 @@ int main(int argc, char **argv) reduc->set_interval(red_int); reduc->set_operation(red_op); reduc->set_point_arrays({"f_t"}); - + reduc->set_steps_per_request(steps_per_req); // low res output mesh auto md = reduc->update_metadata(); From abcc371f2ae135caf9ea686dc3c76a04d5190f11 Mon Sep 17 00:00:00 2001 From: Burlen Loring Date: Thu, 24 Aug 2023 16:23:27 -0700 Subject: [PATCH 02/14] rename temporal reduction in memory test --- test/CMakeLists.txt | 8 +++----- ...eduction.cpp => test_cpp_temporal_reduction_inmem.cpp} | 0 2 files changed, 3 insertions(+), 5 deletions(-) rename test/{test_temporal_reduction.cpp => test_cpp_temporal_reduction_inmem.cpp} (100%) diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 94c942714..ce3f12562 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -1019,10 +1019,8 @@ teca_add_test(test_ha4_connected_components_periodic COMMAND test_ha4_connected_components 256 256 14 1 1024 1 FEATURES ${TECA_HAS_CUDA}) - -teca_add_test(test_cpp_temporal_reduction - EXEC_NAME test_temporal_reduction - SOURCES test_temporal_reduction.cpp +teca_add_test(test_cpp_temporal_reduction_inmem + SOURCES test_cpp_temporal_reduction_inmem.cpp LIBS teca_core teca_data teca_io teca_alg ${teca_test_link} - COMMAND test_temporal_reduction + COMMAND test_cpp_temporal_reduction_inmem 360 180 5 24 1 1 1 monthly average 1 test_cpp_temporal_reduction_%t%.nc yearly 1 1 1 1) diff --git a/test/test_temporal_reduction.cpp b/test/test_cpp_temporal_reduction_inmem.cpp similarity index 100% rename from test/test_temporal_reduction.cpp rename to test/test_cpp_temporal_reduction_inmem.cpp From 81d4e2d00e8b65610d71dddce9867438f9aece9a Mon Sep 17 00:00:00 2001 From: Burlen Loring Date: Mon, 28 Aug 2023 09:17:12 -0700 Subject: [PATCH 03/14] threaded_algorithm expose ranks_per_device in API --- alg/teca_bayesian_ar_detect.cxx | 2 +- alg/teca_pytorch_algorithm.py | 2 +- core/teca_cpu_thread_pool.h | 2 +- core/teca_cuda_thread_pool.h | 31 ++++++++++------- core/teca_cuda_util.cu | 4 +-- core/teca_thread_util.cxx | 14 ++++---- core/teca_thread_util.h | 17 ++++++---- core/teca_threaded_algorithm.cxx | 41 +++++++++++++---------- core/teca_threaded_algorithm.h | 18 ++++++++-- python/teca_py_core.i | 8 ++--- test/python/test_thread_parameters.py | 2 +- test/python/test_thread_parameters_mpi.py | 8 ++--- 12 files changed, 89 insertions(+), 60 deletions(-) diff --git a/alg/teca_bayesian_ar_detect.cxx b/alg/teca_bayesian_ar_detect.cxx index a81bd2fee..30afad9a9 100644 --- a/alg/teca_bayesian_ar_detect.cxx +++ b/alg/teca_bayesian_ar_detect.cxx @@ -692,7 +692,7 @@ void teca_bayesian_ar_detect::set_modified() void teca_bayesian_ar_detect::set_thread_pool_size(int n) { this->internals->queue = new_teca_data_request_queue( - this->get_communicator(), n, -1, true, this->get_verbose()); + this->get_communicator(), n, -1, -1, true, this->get_verbose()); } // -------------------------------------------------------------------------- diff --git a/alg/teca_pytorch_algorithm.py b/alg/teca_pytorch_algorithm.py index c0f831b06..c8ae53b09 100644 --- a/alg/teca_pytorch_algorithm.py +++ b/alg/teca_pytorch_algorithm.py @@ -145,7 +145,7 @@ def initialize(self): n_threads = self.n_threads n_threads, affinity, device_ids = \ - thread_util.thread_parameters(comm, n_threads, 1, -1, + thread_util.thread_parameters(comm, n_threads, 1, -1, -1, 0 if self.verbose < 2 else 1) # let the user request a bound on the number of threads diff --git a/core/teca_cpu_thread_pool.h b/core/teca_cpu_thread_pool.h index 4041ffc3f..14b0e7523 100644 --- a/core/teca_cpu_thread_pool.h +++ b/core/teca_cpu_thread_pool.h @@ -123,7 +123,7 @@ void teca_cpu_thread_pool::create_threads(MPI_Comm comm, std::vector device_ids; if (teca_thread_util::thread_parameters(comm, -1, - n_requested, bind, -1, verbose, n_threads, core_ids, + n_requested, bind, -1, -1, verbose, n_threads, core_ids, device_ids)) { TECA_WARNING("Failed to detetermine thread parameters." diff --git a/core/teca_cuda_thread_pool.h b/core/teca_cuda_thread_pool.h index 1b702b906..874526e1d 100644 --- a/core/teca_cuda_thread_pool.h +++ b/core/teca_cuda_thread_pool.h @@ -67,18 +67,21 @@ class TECA_EXPORT teca_cuda_thread_pool * ranks running on the same node are taken into * account, resulting in 1 thread per core node wide. * - * @param[in] n_threads_per_device number of threads to assign to each CUDA - * device. If 0 no threads will service - * CUDA devices, this is CPU only mode. - * If -1 the default of 8 threads per - * device will be used. + * @param[in] threads_per_device number of threads to assign to each + * GPU/device. If 0 only CPUs will be used. + * If -1 the default of 8 threads per device + * will be used. + * + * @param[in] ranks_per_device the number of MPI ranks to allow access to + * to each device/GPU. * * @param[in] bind bind each thread to a specific core. * * @param[in] verbose print a report of the thread to core bindings */ teca_cuda_thread_pool(MPI_Comm comm, int n_threads, - int n_threads_per_device, bool bind, bool verbose); + int threads_per_device, int ranks_per_device, bool bind, + bool verbose); // get rid of copy and asignment TECA_ALGORITHM_DELETE_COPY_ASSIGN(teca_cuda_thread_pool) @@ -114,7 +117,8 @@ class TECA_EXPORT teca_cuda_thread_pool private: /// create n threads for the pool void create_threads(MPI_Comm comm, int n_threads, - int n_threads_per_device, bool bind, bool verbose); + int threads_per_device, int ranks_per_device, bool bind, + bool verbose); private: long m_num_futures; @@ -128,15 +132,18 @@ class TECA_EXPORT teca_cuda_thread_pool // -------------------------------------------------------------------------- template teca_cuda_thread_pool::teca_cuda_thread_pool(MPI_Comm comm, - int n_threads, int n_threads_per_device, bool bind, bool verbose) : m_live(true) + int n_threads, int threads_per_device, int ranks_per_device, bool bind, + bool verbose) : m_live(true) { - this->create_threads(comm, n_threads, n_threads_per_device, bind, verbose); + this->create_threads(comm, n_threads, threads_per_device, + ranks_per_device, bind, verbose); } // -------------------------------------------------------------------------- template void teca_cuda_thread_pool::create_threads(MPI_Comm comm, - int n_requested, int n_per_device, bool bind, bool verbose) + int n_requested, int threads_per_device, int ranks_per_device, bool bind, + bool verbose) { m_num_futures = 0; @@ -146,8 +153,8 @@ void teca_cuda_thread_pool::create_threads(MPI_Comm comm, std::deque core_ids; std::vector device_ids; - if (teca_thread_util::thread_parameters(comm, -1, - n_requested, n_per_device, bind, verbose, n_threads, + if (teca_thread_util::thread_parameters(comm, -1, n_requested, + threads_per_device, ranks_per_device, bind, verbose, n_threads, core_ids, device_ids)) { TECA_WARNING("Failed to detetermine thread parameters." diff --git a/core/teca_cuda_util.cu b/core/teca_cuda_util.cu index 434a1f0e2..1d5b4ee96 100644 --- a/core/teca_cuda_util.cu +++ b/core/teca_cuda_util.cu @@ -97,12 +97,12 @@ int get_local_cuda_devices(MPI_Comm comm, int &ranks_per_device, MPI_Comm_free(&node_comm); - return 0; } + else #endif - // without MPI this process can use all CUDA devices if (ranks_per_device != 0) { + // without MPI this process can use all CUDA devices for (int i = 0; i < n_node_dev; ++i) local_dev.push_back(i); } diff --git a/core/teca_thread_util.cxx b/core/teca_thread_util.cxx index 1ac210a11..f0eb9036b 100644 --- a/core/teca_thread_util.cxx +++ b/core/teca_thread_util.cxx @@ -339,11 +339,12 @@ int generate_report(MPI_Comm comm, int local_proc, int base_id, // ************************************************************************** int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested, - int n_per_device, bool bind, bool verbose, int &n_threads, - std::deque &affinity, std::vector &device_ids) + int threads_per_device, int ranks_per_device, bool bind, bool verbose, + int &n_threads, std::deque &affinity, std::vector &device_ids) { #if !defined(TECA_HAS_CUDA) - (void) n_per_device; + (void) threads_per_device; + (void) ranks_per_device; #endif // this rank is excluded from computations @@ -379,7 +380,6 @@ int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested, #if defined(TECA_HAS_CUDA) // check for an override to the default number of MPI ranks per device - int ranks_per_device = 1; int ranks_per_device_set = teca_system_util::get_environment_variable ("TECA_RANKS_PER_DEVICE", ranks_per_device); @@ -404,14 +404,14 @@ int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested, int default_per_device = 8; if (!teca_system_util::get_environment_variable - ("TECA_THREADS_PER_DEVICE", n_per_device) && + ("TECA_THREADS_PER_DEVICE", threads_per_device) && verbose && (rank == 0)) { - TECA_STATUS("TECA_THREADS_PER_DEVICE = " << n_per_device) + TECA_STATUS("TECA_THREADS_PER_DEVICE = " << threads_per_device) } int n_device_threads = n_cuda_devices * - (n_per_device < 0 ? default_per_device : n_per_device); + (threads_per_device < 0 ? default_per_device : threads_per_device); #endif diff --git a/core/teca_thread_util.h b/core/teca_thread_util.h index 52e4a9d79..33f0db276 100644 --- a/core/teca_thread_util.h +++ b/core/teca_thread_util.h @@ -41,13 +41,16 @@ namespace teca_thread_util * affinity map is also constructed. * * @param[in] n_threads_per_device the number of threads that should service - * GPUs. If 0 no threads will service GPUs the - * run will be CPU only. If -1 the default - * setting (8 threads per GPU) will be used. - * This can be overriden at runtime with the - * TECA_THREADS_PER_DEVICE environment + * GPUs. If 0 the run will be CPU only. If -1 + * the default setting (8 threads per GPU) will + * be used. This can be overriden at runtime + * with the TECA_THREADS_PER_DEVICE environment * variable. * + * @param[in] n_ranks_per_device the number of MPI ranks that should be allowed + * to access each GPU. MPI ranks not allowed to + * access a GPU will execute on the CPU. + * * @param[in] bind if true extra work is done to determine an affinity map such * that each thread can be bound to a unique core on the node. * @@ -83,8 +86,8 @@ namespace teca_thread_util */ TECA_EXPORT int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested, - int n_threads_per_device, bool bind, bool verbose, int &n_threads, - std::deque &affinity, std::vector &device_ids); + int n_threads_per_device, int n_ranks_per_device, bool bind, bool verbose, + int &n_threads, std::deque &affinity, std::vector &device_ids); }; #endif diff --git a/core/teca_threaded_algorithm.cxx b/core/teca_threaded_algorithm.cxx index b985afd32..c72c1dde4 100644 --- a/core/teca_threaded_algorithm.cxx +++ b/core/teca_threaded_algorithm.cxx @@ -23,13 +23,16 @@ // ************************************************************************** p_teca_data_request_queue new_teca_data_request_queue(MPI_Comm comm, - int n_threads, int n_threads_per_device, bool bind, bool verbose) + int n_threads, int threads_per_device, int ranks_per_device, bool bind, + bool verbose) { #if defined(TECA_HAS_CUDA) - return std::make_shared( - comm, n_threads, n_threads_per_device, bind, verbose); + return std::make_shared(comm, + n_threads, threads_per_device, ranks_per_device, bind, + verbose); #else - (void) n_threads_per_device; + (void) threads_per_device; + (void) ranks_per_device; return std::make_shared( comm, n_threads, bind, verbose); #endif @@ -69,7 +72,8 @@ class teca_threaded_algorithm_internals teca_threaded_algorithm_internals() {} void thread_pool_resize(MPI_Comm comm, int n_threads, - int n_threads_per_device, bool bind, bool verbose); + int threads_per_device, int ranks_per_device, bool bind, + bool verbose); unsigned int get_thread_pool_size() const noexcept { return this->thread_pool ? this->thread_pool->size() : 0; } @@ -80,10 +84,11 @@ class teca_threaded_algorithm_internals // -------------------------------------------------------------------------- void teca_threaded_algorithm_internals::thread_pool_resize(MPI_Comm comm, - int n_threads, int n_threads_per_device, bool bind, bool verbose) + int n_threads, int threads_per_device, int ranks_per_device, bool bind, + bool verbose) { - this->thread_pool = new_teca_data_request_queue( - comm, n_threads, n_threads_per_device, bind, verbose); + this->thread_pool = new_teca_data_request_queue(comm, + n_threads, threads_per_device, ranks_per_device, bind, verbose); } @@ -91,7 +96,7 @@ void teca_threaded_algorithm_internals::thread_pool_resize(MPI_Comm comm, // -------------------------------------------------------------------------- teca_threaded_algorithm::teca_threaded_algorithm() : bind_threads(1), stream_size(-1), poll_interval(1000000), - threads_per_device(-1), + threads_per_device(-1), ranks_per_device(1), internals(new teca_threaded_algorithm_internals) { } @@ -113,21 +118,21 @@ void teca_threaded_algorithm::get_properties_description( opts.add_options() TECA_POPTS_GET(int, prefix, bind_threads, "bind software threads to hardware cores") - TECA_POPTS_GET(int, prefix, verbose, - "print a run time report of settings") TECA_POPTS_GET(int, prefix, thread_pool_size, "number of threads in pool. When n == -1, 1 thread per core is " "created") TECA_POPTS_GET(int, prefix, stream_size, "number of datasests to pass per execute call. -1 means wait " "for all.") - TECA_POPTS_GET(long, prefix, poll_interval, + TECA_POPTS_GET(long long, prefix, poll_interval, "number of nanoseconds to wait between scans of the thread pool " "for completed tasks") TECA_POPTS_GET(int, prefix, threads_per_device, - "Sets the number of threads that service each CUDA GPU. If -1 the" - "default of 8 threads per CUDA GPU is used. If 0 no threads will" - "service GPUs (CPU only mode)") + "Sets the number of threads that service each CUDA GPU. If -1 the " + "default of 8 threads per CUDA GPU is used. If 0 only the CPU is used.") + TECA_POPTS_GET(int, prefix, ranks_per_device, + "Sets the number of threads that service each CUDA GPU. If -1 the " + "default of ranks allowed to access each GPU.") ; this->teca_algorithm::get_properties_description(prefix, opts); @@ -142,8 +147,9 @@ void teca_threaded_algorithm::set_properties(const std::string &prefix, this->teca_algorithm::set_properties(prefix, opts); TECA_POPTS_SET(opts, int, prefix, bind_threads) - TECA_POPTS_SET(opts, int, prefix, verbose) + TECA_POPTS_SET(opts, int, prefix, ranks_per_device) + // force update the the thread pool settings std::string opt_name = (prefix.empty()?"":prefix+"::") + "thread_pool_size"; if (opts.count(opt_name)) this->set_thread_pool_size(opts[opt_name].as()); @@ -155,7 +161,8 @@ void teca_threaded_algorithm::set_thread_pool_size(int n) { TECA_PROFILE_METHOD(128, this, "set_thread_pool_size", this->internals->thread_pool_resize(this->get_communicator(), - n, this->threads_per_device, this->bind_threads, this->verbose); + n, this->threads_per_device, this->ranks_per_device, + this->bind_threads, this->verbose); ) } diff --git a/core/teca_threaded_algorithm.h b/core/teca_threaded_algorithm.h index 86de18d8a..e18307c97 100644 --- a/core/teca_threaded_algorithm.h +++ b/core/teca_threaded_algorithm.h @@ -40,17 +40,20 @@ using teca_data_request_queue = using p_teca_data_request_queue = std::shared_ptr; /** Allocate and initialize a new thread pool. + * * @param comm[in] The communicator to allocate thread across * @param n_threads[in] The number of threads to create per MPI rank. Use -1 to * map one thread per physical core on each node. - * @param n_threads_per_device[in] The number of threads to assign to servicing - * each CUDA device. -1 for all threads. + * @param threads_per_device[in] The number of threads to assign to servicing + * each GPU/device. + * @param ranks_per_device[in] The number of ranks allowed to access each GPU/device. * @param bind[in] If set then thread will be bound to a specific core. * @param verbose[in] If set then the mapping is sent to the stderr */ TECA_EXPORT p_teca_data_request_queue new_teca_data_request_queue(MPI_Comm comm, - int n_threads, int n_threads_per_device, bool bind, bool verbose); + int n_threads, int threads_per_device, int ranks_per_device, bool bind, + bool verbose); /// This is the base class defining a threaded algorithm. /** The strategy employed is to parallelize over upstream data requests using a @@ -125,6 +128,14 @@ class TECA_EXPORT teca_threaded_algorithm : public teca_algorithm TECA_ALGORITHM_PROPERTY(int, threads_per_device) ///@} + /** @name ranks_per_device + * Set the number of ranks that have access to each GPU/device. Other ranks + * will use the CPU. + */ + ///@{ + TECA_ALGORITHM_PROPERTY(int, ranks_per_device) + ///@} + /// explicitly set the thread pool to submit requests to void set_data_request_queue(const p_teca_data_request_queue &queue); @@ -156,6 +167,7 @@ class TECA_EXPORT teca_threaded_algorithm : public teca_algorithm int stream_size; long long poll_interval; int threads_per_device; + int ranks_per_device; teca_threaded_algorithm_internals *internals; }; diff --git a/python/teca_py_core.i b/python/teca_py_core.i index d782c9ac6..dd87bea81 100644 --- a/python/teca_py_core.i +++ b/python/teca_py_core.i @@ -1105,8 +1105,8 @@ struct thread_util // each thread. static PyObject *thread_parameters(MPI_Comm comm, - int n_requested, int bind, int n_per_device, - int verbose) + int n_requested, int bind, int threads_per_device, + int ranks_per_device, int verbose) { teca_py_gil_state gil; @@ -1114,8 +1114,8 @@ PyObject *thread_parameters(MPI_Comm comm, std::vector device_ids; int n_threads = n_requested; if (teca_thread_util::thread_parameters(comm, -1, - n_requested, bind, n_per_device, verbose, n_threads, - affinity, device_ids)) + n_requested, bind, threads_per_device, ranks_per_device, + verbose, n_threads, affinity, device_ids)) { // caller requested automatic load balancing but this, failed. PyErr_Format(PyExc_RuntimeError, diff --git a/test/python/test_thread_parameters.py b/test/python/test_thread_parameters.py index b665830de..c2497f227 100644 --- a/test/python/test_thread_parameters.py +++ b/test/python/test_thread_parameters.py @@ -1,7 +1,7 @@ from teca import * from mpi4py.MPI import COMM_SELF -n,affin,devs = thread_util.thread_parameters(COMM_SELF, -1, 1, -1, 1) +n,affin,devs = thread_util.thread_parameters(COMM_SELF, -1, 1, -1, -1, 1) print('num_threads = %d'%(n)) print('affinity = %s'%(str(affin))) diff --git a/test/python/test_thread_parameters_mpi.py b/test/python/test_thread_parameters_mpi.py index 4e0adad84..10e5c37b3 100644 --- a/test/python/test_thread_parameters_mpi.py +++ b/test/python/test_thread_parameters_mpi.py @@ -22,7 +22,7 @@ sys.stderr.write('\n\nTesting automatic load balancing w/ affiniity map\n') try: n_threads, affinity, device_ids = \ - thread_util.thread_parameters(MPI.COMM_WORLD, -1, 1, -1, verbose) + thread_util.thread_parameters(MPI.COMM_WORLD, -1, 1, -1, -1, verbose) except(RuntimeError): sys.stderr.write('Failed to determine threading parameters\n') for i in range(n_ranks): @@ -39,7 +39,7 @@ sys.stderr.write('\n\nTesting automatic load balancing w/o affiniity map\n') try: n_threads, affinity, device_ids = \ - thread_util.thread_parameters(MPI.COMM_WORLD, -1, 0, -1, 0) + thread_util.thread_parameters(MPI.COMM_WORLD, -1, 0, -1, -1, 0) except(RuntimeError): sys.stderr.write('Failed to determine threading parameters\n') for i in range(n_ranks): @@ -56,7 +56,7 @@ sys.stderr.write('\n\nTesting explcit load balancing (%d threads per rank) w/ affiniity map\n'%(n_explicit)) try: n_threads, affinity, device_ids = \ - thread_util.thread_parameters(MPI.COMM_WORLD, n_explicit, 1, -1, verbose) + thread_util.thread_parameters(MPI.COMM_WORLD, n_explicit, 1, -1, -1, verbose) except(RuntimeError): sys.stderr.write('Failed to determine threading parameters\n') for i in range(n_ranks): @@ -73,7 +73,7 @@ sys.stderr.write('\n\nTesting explcit load balancing (%d threads per rank) w/o affiniity map\n'%(n_explicit)) try: n_threads, affinity, device_ids = \ - thread_util.thread_parameters(MPI.COMM_WORLD, n_explicit, 0, -1, 0) + thread_util.thread_parameters(MPI.COMM_WORLD, n_explicit, 0, -1, -1, 0) except(RuntimeError): sys.stderr.write('Failed to determine threading parameters\n') for i in range(n_ranks): From af1592a49c6412684ed1cfee24ddae0dde44ce5a Mon Sep 17 00:00:00 2001 From: Burlen Loring Date: Mon, 28 Aug 2023 09:21:01 -0700 Subject: [PATCH 04/14] threaded_algorithm propagate_device_assignment add an algorithm property that allows threads in the thread pool to inherit their device assignment from the down stream. This reduces inter device data movement when chaining thread pools. --- core/teca_threaded_algorithm.cxx | 24 ++++++++++++++++++------ core/teca_threaded_algorithm.h | 9 +++++++++ 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/core/teca_threaded_algorithm.cxx b/core/teca_threaded_algorithm.cxx index c72c1dde4..e159fbc29 100644 --- a/core/teca_threaded_algorithm.cxx +++ b/core/teca_threaded_algorithm.cxx @@ -45,12 +45,16 @@ class teca_data_request public: teca_data_request(const p_teca_algorithm &alg, const teca_algorithm_output_port up_port, - const teca_metadata &up_req) : m_alg(alg), - m_up_port(up_port), m_up_req(up_req) + const teca_metadata &up_req, int pass_dev) : m_alg(alg), + m_up_port(up_port), m_up_req(up_req), m_pass_dev(pass_dev) {} const_p_teca_dataset operator()(int device_id = -1) { + // use the device assigned downstream + if (m_pass_dev) + m_up_req.get("device_id", device_id); + // set the device on which to execute this pipeline m_up_req.set("device_id", device_id); @@ -62,6 +66,7 @@ class teca_data_request p_teca_algorithm m_alg; teca_algorithm_output_port m_up_port; teca_metadata m_up_req; + int m_pass_dev; }; @@ -96,7 +101,7 @@ void teca_threaded_algorithm_internals::thread_pool_resize(MPI_Comm comm, // -------------------------------------------------------------------------- teca_threaded_algorithm::teca_threaded_algorithm() : bind_threads(1), stream_size(-1), poll_interval(1000000), - threads_per_device(-1), ranks_per_device(1), + threads_per_device(-1), ranks_per_device(1), propagate_device_assignment(0), internals(new teca_threaded_algorithm_internals) { } @@ -133,6 +138,10 @@ void teca_threaded_algorithm::get_properties_description( TECA_POPTS_GET(int, prefix, ranks_per_device, "Sets the number of threads that service each CUDA GPU. If -1 the " "default of ranks allowed to access each GPU.") + TECA_POPTS_GET(int, prefix, propagate_device_assignment, + "When set device assignment is taken from the in coming request. " + "Otherwise the thread executing the upstream pipeline provides the " + "device assignment.") ; this->teca_algorithm::get_properties_description(prefix, opts); @@ -148,6 +157,7 @@ void teca_threaded_algorithm::set_properties(const std::string &prefix, TECA_POPTS_SET(opts, int, prefix, bind_threads) TECA_POPTS_SET(opts, int, prefix, ranks_per_device) + TECA_POPTS_SET(opts, int, prefix, propagate_device_assignment) // force update the the thread pool settings std::string opt_name = (prefix.empty()?"":prefix+"::") + "thread_pool_size"; @@ -246,10 +256,12 @@ const_p_teca_dataset teca_threaded_algorithm::request_data( if (!up_reqs[i].empty()) { teca_algorithm_output_port &up_port - = alg->get_input_connection(i%n_inputs); + = alg->get_input_connection( i % n_inputs ); - teca_data_request dreq(get_algorithm(up_port), up_port, up_reqs[i]); - teca_data_request_task task(dreq); + teca_data_request_task task( + teca_data_request(get_algorithm(up_port), up_port, + up_reqs[i], this->propagate_device_assignment) + ); this->internals->thread_pool->push_task(task); } diff --git a/core/teca_threaded_algorithm.h b/core/teca_threaded_algorithm.h index e18307c97..557c8b779 100644 --- a/core/teca_threaded_algorithm.h +++ b/core/teca_threaded_algorithm.h @@ -139,6 +139,14 @@ class TECA_EXPORT teca_threaded_algorithm : public teca_algorithm /// explicitly set the thread pool to submit requests to void set_data_request_queue(const p_teca_data_request_queue &queue); + /** @name propagate_device_assignment + * When set device assignment is taken from down stream request. + * Otherwise the thread executing the pipeline will provide the assignment. + */ + ///@{ + TECA_ALGORITHM_PROPERTY(int, propagate_device_assignment) + ///@} + protected: teca_threaded_algorithm(); @@ -168,6 +176,7 @@ class TECA_EXPORT teca_threaded_algorithm : public teca_algorithm long long poll_interval; int threads_per_device; int ranks_per_device; + int propagate_device_assignment; teca_threaded_algorithm_internals *internals; }; From 80820ef12b1031f067c61d4d5310134f3ae2368f Mon Sep 17 00:00:00 2001 From: Burlen Loring Date: Mon, 28 Aug 2023 11:38:05 -0700 Subject: [PATCH 05/14] threaded_algorithm fix set algorithm props from command line --- core/teca_threaded_algorithm.cxx | 3 +++ core/teca_threaded_algorithm.h | 12 +++--------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/core/teca_threaded_algorithm.cxx b/core/teca_threaded_algorithm.cxx index e159fbc29..743468ecc 100644 --- a/core/teca_threaded_algorithm.cxx +++ b/core/teca_threaded_algorithm.cxx @@ -156,6 +156,9 @@ void teca_threaded_algorithm::set_properties(const std::string &prefix, this->teca_algorithm::set_properties(prefix, opts); TECA_POPTS_SET(opts, int, prefix, bind_threads) + TECA_POPTS_SET(opts, int, prefix, stream_size) + TECA_POPTS_SET(opts, long long, prefix, poll_interval) + TECA_POPTS_SET(opts, int, prefix, threads_per_device) TECA_POPTS_SET(opts, int, prefix, ranks_per_device) TECA_POPTS_SET(opts, int, prefix, propagate_device_assignment) diff --git a/core/teca_threaded_algorithm.h b/core/teca_threaded_algorithm.h index 557c8b779..6c3048753 100644 --- a/core/teca_threaded_algorithm.h +++ b/core/teca_threaded_algorithm.h @@ -88,13 +88,6 @@ class TECA_EXPORT teca_threaded_algorithm : public teca_algorithm /// Get the number of threads in the pool. unsigned int get_thread_pool_size() const noexcept; - /** @name verbose - * set/get the verbosity level. - */ - ///@{ - TECA_ALGORITHM_PROPERTY(int, verbose) - ///@} - /** @name bind_threads * set/get thread affinity mode. When 0 threads are not bound CPU cores, * allowing for migration among all cores. This will likely degrade @@ -121,8 +114,9 @@ class TECA_EXPORT teca_threaded_algorithm : public teca_algorithm TECA_ALGORITHM_PROPERTY(long long, poll_interval) ///@} - /** @name threads_per_cuda_device - * set the number of threads to service each CUDA device. + /** @name threads_per_device + * Set the number of threads to service each GPU/device. Other threads will + * use the CPU. */ ///@{ TECA_ALGORITHM_PROPERTY(int, threads_per_device) From 80a01599ef12854b26f238582076857ea54c3f70 Mon Sep 17 00:00:00 2001 From: Burlen Loring Date: Mon, 28 Aug 2023 11:39:05 -0700 Subject: [PATCH 06/14] test add test for cpp_temporal_reduciton w. io --- test/CMakeLists.txt | 13 ++- test/test_cpp_temporal_reduction_io.cpp | 136 ++++++++++++++++++++++++ 2 files changed, 148 insertions(+), 1 deletion(-) create mode 100644 test/test_cpp_temporal_reduction_io.cpp diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index ce3f12562..4556d3dee 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -1023,4 +1023,15 @@ teca_add_test(test_cpp_temporal_reduction_inmem SOURCES test_cpp_temporal_reduction_inmem.cpp LIBS teca_core teca_data teca_io teca_alg ${teca_test_link} COMMAND test_cpp_temporal_reduction_inmem - 360 180 5 24 1 1 1 monthly average 1 test_cpp_temporal_reduction_%t%.nc yearly 1 1 1 1) + 360 180 5 24 1 ${TEST_CORES} ${TEST_CORES} monthly average 1 #TODO -- more than 1 step per req + test_cpp_temporal_reduction_inmem_%t%.nc yearly ${TEST_CORES} 1 1 1) + +teca_add_test(test_cpp_temporal_reduction_io + SOURCES test_cpp_temporal_reduction_io.cpp + LIBS teca_core teca_data teca_io teca_alg ${teca_test_link} + COMMAND test_cpp_temporal_reduction_io + ${TECA_DATA_ROOT}/prw_hus_day_MRI-CGCM3_historical_r1i1p1_19500101-19501231\.nc + lon lat . time prw 0 -1 ${TEST_CORES} ${TEST_CORES} ${TEST_CORES} 4 1 0 + monthly average 1 # TODO -- more than 1 step per req. + ./test_cpp_temporal_reduction_io_%t%.nc yearly + ${TEST_CORES} ${TEST_CORES} ${TEST_CORES} 1 1) diff --git a/test/test_cpp_temporal_reduction_io.cpp b/test/test_cpp_temporal_reduction_io.cpp new file mode 100644 index 000000000..9334c21d7 --- /dev/null +++ b/test/test_cpp_temporal_reduction_io.cpp @@ -0,0 +1,136 @@ +#include "teca_config.h" +#include "teca_metadata.h" +#include "teca_cf_reader.h" +#include "teca_temporal_reduction.h" +#include "teca_cf_writer.h" +#include "teca_mpi_manager.h" +#include "teca_system_interface.h" + +#include +#include +#include +#include +#include +#include + +using microseconds_t = std::chrono::duration; + +int main(int argc, char **argv) +{ + auto t0 = std::chrono::high_resolution_clock::now(); + + teca_mpi_manager mpi_man(argc, argv); + int rank = mpi_man.get_comm_rank(); + int n_ranks = mpi_man.get_comm_size(); + + teca_system_interface::set_stack_trace_on_error(); + teca_system_interface::set_stack_trace_on_mpi_error(); + + if (argc != 25) + { + if (rank == 0) + { + std::cerr << "test_temporal_reduction [files regex]" + " [x axis var] [y axis var] [z axis var] [t axis var]" + " [red var] [first step] [last step]" + " [red threads] [red threads per dev] [red ranks per dev] [red stream size] [red bind threads] [red prop dev]" + " [reduction interval] [reduction operator] [steps per request]" + " [out file] [file layout]" + " [wri threads] [wri threads per dev] [wri ranks per dev] [wri stream size] [wri bind threads]" + << std::endl; + } + return -1; + } + + std::string files_regex = argv[1]; + std::string x_axis_var = argv[2]; + std::string y_axis_var = argv[3]; + std::string z_axis_var = argv[4]; + std::string t_axis_var = argv[5]; + std::string red_var = argv[6]; + int first_step = atoi(argv[7]); + int last_step = atoi(argv[8]); + + int n_red_threads = atoi(argv[9]); + int red_threads_per_dev = atoi(argv[10]); + int red_ranks_per_dev = atoi(argv[11]); + int red_stream_size = atoi(argv[12]); + int red_bind_threads = atoi(argv[13]); + int red_prop_dev_id = atoi(argv[14]); + + std::string red_int = argv[15]; + std::string red_op = argv[16]; + int steps_per_req = atoi(argv[17]); + + std::string ofile_name = argv[18]; + std::string layout = argv[19]; + int n_wri_threads = atoi(argv[20]); + int wri_threads_per_dev = atoi(argv[21]); + int wri_ranks_per_dev = atoi(argv[22]); + int wri_stream_size = atoi(argv[23]); + int wri_bind_threads = atoi(argv[24]); + + + if (rank == 0) + { + std::cerr << "n_ranks=" << n_ranks + << " n_red_threads=" << n_red_threads << " red_threads_per_dev=" << red_threads_per_dev + << " red_ranks_per_dev=" << red_ranks_per_dev << "red_stream_size=" << red_stream_size + << " red_bind_threads=" << red_bind_threads << " red_pro_dev_id=" << red_prop_dev_id + << " red_int=" << red_int << " red_op=" << red_op << " steps_per_req=" << steps_per_req + << " layout=" << layout << " n_wri_threads=" << n_wri_threads + << " wri_threads_per_dev=" << wri_threads_per_dev + << " wri_ranks_per_dev=" << wri_ranks_per_dev << " wri_stream_size=" << wri_stream_size + << " wri_bind_threads=" << wri_bind_threads + << std::endl; + } + + // reader + auto cf_reader = teca_cf_reader::New(); + cf_reader->set_x_axis_variable(x_axis_var); + cf_reader->set_y_axis_variable(y_axis_var); + cf_reader->set_z_axis_variable(z_axis_var == "." ? std::string() : z_axis_var); + cf_reader->set_t_axis_variable(t_axis_var); + cf_reader->set_files_regex(files_regex); + + // temporal reduction + auto reduc = teca_cpp_temporal_reduction::New(); + reduc->set_input_connection(cf_reader->get_output_port()); + reduc->set_verbose(1); + reduc->set_threads_per_device(red_threads_per_dev); + reduc->set_ranks_per_device(red_ranks_per_dev); + reduc->set_bind_threads(red_bind_threads); + reduc->set_stream_size(red_stream_size); + reduc->set_propagate_device_assignment(red_prop_dev_id); + reduc->set_thread_pool_size(n_red_threads); + reduc->set_interval(red_int); + reduc->set_operation(red_op); + reduc->set_point_arrays({red_var}); + reduc->set_steps_per_request(steps_per_req); + + // writer + auto cfw = teca_cf_writer::New(); + cfw->set_input_connection(reduc->get_output_port()); + cfw->set_verbose(1); + cfw->set_threads_per_device(wri_threads_per_dev); + cfw->set_ranks_per_device(wri_ranks_per_dev); + cfw->set_stream_size(wri_stream_size); + cfw->set_bind_threads(wri_bind_threads); + cfw->set_thread_pool_size(n_wri_threads); + cfw->set_file_name(ofile_name); + cfw->set_layout(layout); + cfw->set_point_arrays({red_var}); + cfw->set_first_step(first_step); + cfw->set_last_step(last_step); + + cfw->update(); + + if (rank == 0) + { + auto t1 = std::chrono::high_resolution_clock::now(); + microseconds_t dt(t1 - t0); + std::cerr << "total runtime : " << (dt.count() / 1e6) << std::endl; + } + + return 0; +} From 3d5d4db29de28fc48276ec2b4912f3277831d5f5 Mon Sep 17 00:00:00 2001 From: Burlen Loring Date: Mon, 28 Aug 2023 11:58:36 -0700 Subject: [PATCH 07/14] cf_writer fix let threaded_algorithm process command line was forwarding to teca_algorithm which resulted in none of the threading related properties being picked up from the command line. --- io/teca_cf_writer.cxx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/teca_cf_writer.cxx b/io/teca_cf_writer.cxx index e5f3878b9..3141a0ba3 100644 --- a/io/teca_cf_writer.cxx +++ b/io/teca_cf_writer.cxx @@ -147,7 +147,7 @@ void teca_cf_writer::get_properties_description( "the list of non-geometric arrays to write") ; - this->teca_algorithm::get_properties_description(prefix, opts); + this->teca_threaded_algorithm::get_properties_description(prefix, opts); global_opts.add(opts); } From bd321844e7b351a6c8d8c89221f74bd8bd569b6e Mon Sep 17 00:00:00 2001 From: Burlen Loring Date: Wed, 30 Aug 2023 09:38:05 -0700 Subject: [PATCH 08/14] descriptive_statistics remove debuging code --- alg/teca_descriptive_statistics.cxx | 2 -- 1 file changed, 2 deletions(-) diff --git a/alg/teca_descriptive_statistics.cxx b/alg/teca_descriptive_statistics.cxx index a02b469c7..242c24782 100644 --- a/alg/teca_descriptive_statistics.cxx +++ b/alg/teca_descriptive_statistics.cxx @@ -306,8 +306,6 @@ const_p_teca_dataset teca_descriptive_statistics::execute( atrs.set("step", (teca_metadata)step_atts); table->get_metadata().set("attributes", atrs); - - atrs.to_stream(std::cerr); } // for each variable From bf99eff337d6658321e7322489117bb6c7c5d768 Mon Sep 17 00:00:00 2001 From: Burlen Loring Date: Wed, 30 Aug 2023 09:38:51 -0700 Subject: [PATCH 09/14] cpu/cuda_thread_pool fix streaming bug This fixes a bug introduced in 7120ecb83. There the early termination criteria was dropped from the loop that scans for completed work. Early termination is the basis for streaming and without it we were waiting for all work to complete before returning effictively disabling streaming. --- core/teca_cpu_thread_pool.h | 7 +++++-- core/teca_cuda_thread_pool.h | 9 ++++++--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/core/teca_cpu_thread_pool.h b/core/teca_cpu_thread_pool.h index 14b0e7523..d4daa603c 100644 --- a/core/teca_cpu_thread_pool.h +++ b/core/teca_cpu_thread_pool.h @@ -229,9 +229,12 @@ int teca_cpu_thread_pool::wait_some(long n_to_wait, } } - // if we have not accumulated the requested number of datasets + // we have the requested number of datasets + if (data.size() >= static_cast(n_to_wait)) + break; + // wait for the user supplied duration before re-scanning - if (thread_valid && (data.size() < static_cast(n_to_wait))) + if (thread_valid) std::this_thread::sleep_for(std::chrono::nanoseconds(poll_interval)); } diff --git a/core/teca_cuda_thread_pool.h b/core/teca_cuda_thread_pool.h index 874526e1d..618454291 100644 --- a/core/teca_cuda_thread_pool.h +++ b/core/teca_cuda_thread_pool.h @@ -235,7 +235,7 @@ int teca_cuda_thread_pool::wait_some(long n_to_wait, // gather the requested number of datasets size_t thread_valid = 1; - while (thread_valid) + while (thread_valid && ((data.size() < static_cast(n_to_wait)))) { { thread_valid = 0; @@ -259,9 +259,12 @@ int teca_cuda_thread_pool::wait_some(long n_to_wait, } } - // if we have not accumulated the requested number of datasets + // we have the requested number of datasets + if (data.size() >= static_cast(n_to_wait)) + break; + // wait for the user supplied duration before re-scanning - if (thread_valid && (data.size() < static_cast(n_to_wait))) + if (thread_valid) std::this_thread::sleep_for(std::chrono::nanoseconds(poll_interval)); } From 220587ad6452bcc38cfad7ff3c629ca12e6ea120 Mon Sep 17 00:00:00 2001 From: Burlen Loring Date: Wed, 30 Aug 2023 12:40:18 -0700 Subject: [PATCH 10/14] cpu_thread_pool fix bind argument position --- core/teca_cpu_thread_pool.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/teca_cpu_thread_pool.h b/core/teca_cpu_thread_pool.h index d4daa603c..396b31778 100644 --- a/core/teca_cpu_thread_pool.h +++ b/core/teca_cpu_thread_pool.h @@ -123,7 +123,7 @@ void teca_cpu_thread_pool::create_threads(MPI_Comm comm, std::vector device_ids; if (teca_thread_util::thread_parameters(comm, -1, - n_requested, bind, -1, -1, verbose, n_threads, core_ids, + n_requested, -1, -1, bind, verbose, n_threads, core_ids, device_ids)) { TECA_WARNING("Failed to detetermine thread parameters." From c9704448ebd89ac82fc25ef69caaa7e0875766b8 Mon Sep 17 00:00:00 2001 From: Burlen Loring Date: Wed, 30 Aug 2023 12:41:52 -0700 Subject: [PATCH 11/14] thread_util report num threads when not binding --- core/teca_thread_util.cxx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/teca_thread_util.cxx b/core/teca_thread_util.cxx index f0eb9036b..5c12dbf18 100644 --- a/core/teca_thread_util.cxx +++ b/core/teca_thread_util.cxx @@ -590,7 +590,7 @@ int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested, if (!bind) { if (verbose) - TECA_STATUS("thread to core binding disabled") + TECA_STATUS("thread to core binding disabled. n_threads=" << n_threads) return 0; } From 1d5f4158127487793e8236189e68fd6411b0fa17 Mon Sep 17 00:00:00 2001 From: Burlen Loring Date: Wed, 30 Aug 2023 16:49:01 -0700 Subject: [PATCH 12/14] thread_util clamp the number of threads when the requested the numebr of threads is less than -1, use at most this many threads. fewer may be used if there are insufficient cores on the node. --- core/teca_thread_util.cxx | 14 ++++++++++---- core/teca_thread_util.h | 15 +++++++++------ 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/core/teca_thread_util.cxx b/core/teca_thread_util.cxx index 5c12dbf18..40b87930e 100644 --- a/core/teca_thread_util.cxx +++ b/core/teca_thread_util.cxx @@ -528,16 +528,22 @@ int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested, // thread pool size is based on core and process count int nlg = 0; + // map threads to physical cores + nlg = cores_per_node % n_procs; + n_threads = cores_per_node / n_procs + (proc_id < nlg ? 1 : 0); if (n_requested > 0) { - // user specified override + // use exactly this many n_threads = n_requested; } + else if (n_requested < -1) + { + // use at most this many + n_threads = std::min(-n_requested, n_threads); + n_requested = n_threads; + } else { - // map threads to physical cores - nlg = cores_per_node % n_procs; - n_threads = cores_per_node/n_procs + (proc_id < nlg ? 1 : 0); } // if the user runs more MPI ranks than cores some of the ranks diff --git a/core/teca_thread_util.h b/core/teca_thread_util.h index 33f0db276..c420a8431 100644 --- a/core/teca_thread_util.h +++ b/core/teca_thread_util.h @@ -37,8 +37,10 @@ namespace teca_thread_util * be bound to to acheive this. Passing n_requested >= 1 * specifies a run time override. This indicates that * caller wants to use a specific number of threads, - * rather than one per physical core. In this case the - * affinity map is also constructed. + * rather than one per physical core. Passing + * n_requested < -1 specifies a maximum to use if + * sufficient cores are available. In all cases the + * affinity map is constructed. * * @param[in] n_threads_per_device the number of threads that should service * GPUs. If 0 the run will be CPU only. If -1 @@ -60,10 +62,11 @@ namespace teca_thread_util * of threads one can use such that there is one * thread per phycial core taking into account all * ranks running on the node. if n_requested is >= 1 - * n_threads will be set to n_requested. This allows a - * run time override for cases when the caller knows - * how she wants to schedule things. if an error - * occurs and n_requested is -1 this will be set to 1. + * n_threads will explicitly be set to n_requested. If + * n_requested < -1 at most -n_requested threads will + * be used. Fewer threads will be used if there are + * insufficient cores available. if an error occurs + * and n_requested is -1 this will be set to 1. * * @param[out] affinity an affinity map, describing for each of n_threads, * a core id that the thread can be bound to. if From 7549e888322a9c9e5ebba738a30d841db7904f88 Mon Sep 17 00:00:00 2001 From: Burlen Loring Date: Thu, 31 Aug 2023 11:34:43 -0700 Subject: [PATCH 13/14] cuda_util simplify device assignment --- core/teca_cuda_util.cu | 49 ++++++++++++++++++------------------------ core/teca_cuda_util.h | 15 ++++++------- 2 files changed, 28 insertions(+), 36 deletions(-) diff --git a/core/teca_cuda_util.cu b/core/teca_cuda_util.cu index 1d5b4ee96..300f36063 100644 --- a/core/teca_cuda_util.cu +++ b/core/teca_cuda_util.cu @@ -20,6 +20,10 @@ int synchronize() int get_local_cuda_devices(MPI_Comm comm, int &ranks_per_device, std::vector &local_dev) { + // if ranks per device is zero this is a CPU only run + if (ranks_per_device == 0) + return 0; + cudaError_t ierr = cudaSuccess; // get the number of CUDA GPU's available on this node @@ -55,44 +59,33 @@ int get_local_cuda_devices(MPI_Comm comm, int &ranks_per_device, MPI_Comm_size(node_comm, &n_node_ranks); MPI_Comm_rank(node_comm, &node_rank); - // adjust the number of devices such that multiple ranks may share a - // device. - if (ranks_per_device < 0) - { - ranks_per_device = n_node_ranks / n_node_dev + - (n_node_ranks % n_node_dev ? 1 : 0); - - // limit to at most 8 MPI ranks per GPU - ranks_per_device = std::min(ranks_per_device, 8); - } - - int n_node_dev_use = n_node_dev * ranks_per_device; - - if (n_node_dev_use >= n_node_ranks) + if (n_node_ranks < n_node_dev) { + // more devices than ranks, // assign devices evenly between ranks - int max_dev = n_node_dev_use - 1; - int n_per_rank = std::max(n_node_dev_use / n_node_ranks, 1); - int n_larger = n_node_dev_use % n_node_ranks; + int n_per_rank = n_node_dev / n_node_ranks; + int n_larger = n_node_dev % n_node_ranks; + int n_local = n_per_rank + (node_rank < n_larger ? 1 : 0); int first_dev = n_per_rank * node_rank + (node_rank < n_larger ? node_rank : n_larger); - first_dev = std::min(max_dev, first_dev); - - int last_dev = first_dev + n_per_rank - 1 - + (node_rank < n_larger ? 1 : 0); - - last_dev = std::min(max_dev, last_dev); - - for (int i = first_dev; i <= last_dev; ++i) - local_dev.push_back( i % n_node_dev ); + for (int i = 0; i < n_local; ++i) + local_dev.push_back(first_dev + i); } else { - // assign at most one MPI rank per GPU - if (node_rank < n_node_dev_use) + // TODO -- automatic settings + if (ranks_per_device < 0) + ranks_per_device *= -1; + + // more ranks than devices. round robin assignment such that at + // most each device has ranks_per_device. the remaining ranks will + // be CPU only. + if (node_rank < ranks_per_device * n_node_dev) + { local_dev.push_back( node_rank % n_node_dev ); + } } MPI_Comm_free(&node_comm); diff --git a/core/teca_cuda_util.h b/core/teca_cuda_util.h index e77152ff1..1d13a6b12 100644 --- a/core/teca_cuda_util.h +++ b/core/teca_cuda_util.h @@ -19,18 +19,17 @@ namespace teca_cuda_util { /** Query the system for the locally available(on this rank) CUDA device count. * this is an MPI collective call which returns a set of device ids that can be - * used locally. If there are as many (or more than) devices on the node than - * the number of MPI ranks assigned to the node the list of device ids will be - * unique across MPI ranks on the node. Otherwise devices are assigned round - * robin fashion. + * used locally. Node wide coordination assures that one can put a limit on the + * number of ranks per node. * * @param[in] comm MPI communicator defining a set of nodes on which need * access to the available GPUS * @param[in,out] ranks_per_device The number of MPI ranks to use per CUDA - * device. Passing -1 will assign all MPI - * ranks a GPU up to a maximum of 8 ranks - * per GPU. The number of ranks per GPU - * used is returned through this argument. + * device. When set to 0 no GPUs are used. When + * set to -1 all ranks are assigned a GPU but + * multiple ranks will share a GPU when there + * are more ranks than devices. + * * @param[out] local_dev a list of device ids that can be used by the calling * MPI rank. * @returns non-zero on error. From fa1c2099a6011e689c2f72982a7f1cae4b5ad2ef Mon Sep 17 00:00:00 2001 From: Burlen Loring Date: Thu, 31 Aug 2023 11:51:26 -0700 Subject: [PATCH 14/14] thread_util warn about too few threads wo MPI don't issue the warning when MPI is used --- core/teca_thread_util.cxx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/teca_thread_util.cxx b/core/teca_thread_util.cxx index 40b87930e..39975658d 100644 --- a/core/teca_thread_util.cxx +++ b/core/teca_thread_util.cxx @@ -566,7 +566,7 @@ int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested, // assign each thread to a CUDA device or CPU core #if defined(TECA_HAS_CUDA) - if (verbose && (n_threads < n_cuda_devices)) + if (verbose && (n_ranks < 2) && (n_threads < n_cuda_devices)) { TECA_WARNING(<< n_threads << " threads is insufficient to service " << n_cuda_devices