Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perlmutter scheduling #775

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion alg/teca_bayesian_ar_detect.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ void teca_bayesian_ar_detect::set_modified()
void teca_bayesian_ar_detect::set_thread_pool_size(int n)
{
this->internals->queue = new_teca_data_request_queue(
this->get_communicator(), n, -1, true, this->get_verbose());
this->get_communicator(), n, -1, -1, true, this->get_verbose());
}

// --------------------------------------------------------------------------
Expand Down
2 changes: 0 additions & 2 deletions alg/teca_descriptive_statistics.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,6 @@ const_p_teca_dataset teca_descriptive_statistics::execute(
atrs.set("step", (teca_metadata)step_atts);

table->get_metadata().set("attributes", atrs);

atrs.to_stream(std::cerr);
}

// for each variable
Expand Down
2 changes: 1 addition & 1 deletion alg/teca_pytorch_algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def initialize(self):
n_threads = self.n_threads

n_threads, affinity, device_ids = \
thread_util.thread_parameters(comm, n_threads, 1, -1,
thread_util.thread_parameters(comm, n_threads, 1, -1, -1,
0 if self.verbose < 2 else 1)

# let the user request a bound on the number of threads
Expand Down
9 changes: 6 additions & 3 deletions core/teca_cpu_thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ void teca_cpu_thread_pool<task_t, data_t>::create_threads(MPI_Comm comm,
std::vector<int> device_ids;

if (teca_thread_util::thread_parameters(comm, -1,
n_requested, bind, -1, verbose, n_threads, core_ids,
n_requested, -1, -1, bind, verbose, n_threads, core_ids,
device_ids))
{
TECA_WARNING("Failed to detetermine thread parameters."
Expand Down Expand Up @@ -229,9 +229,12 @@ int teca_cpu_thread_pool<task_t, data_t>::wait_some(long n_to_wait,
}
}

// if we have not accumulated the requested number of datasets
// we have the requested number of datasets
if (data.size() >= static_cast<unsigned int>(n_to_wait))
break;

// wait for the user supplied duration before re-scanning
if (thread_valid && (data.size() < static_cast<unsigned int>(n_to_wait)))
if (thread_valid)
std::this_thread::sleep_for(std::chrono::nanoseconds(poll_interval));
}

Expand Down
40 changes: 25 additions & 15 deletions core/teca_cuda_thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,21 @@ class TECA_EXPORT teca_cuda_thread_pool
* ranks running on the same node are taken into
* account, resulting in 1 thread per core node wide.
*
* @param[in] n_threads_per_device number of threads to assign to each CUDA
* device. If 0 no threads will service
* CUDA devices, this is CPU only mode.
* If -1 the default of 8 threads per
* device will be used.
* @param[in] threads_per_device number of threads to assign to each
* GPU/device. If 0 only CPUs will be used.
* If -1 the default of 8 threads per device
* will be used.
*
* @param[in] ranks_per_device the number of MPI ranks to allow access
*                             to each device/GPU.
*
* @param[in] bind bind each thread to a specific core.
*
* @param[in] verbose print a report of the thread to core bindings
*/
teca_cuda_thread_pool(MPI_Comm comm, int n_threads,
int n_threads_per_device, bool bind, bool verbose);
int threads_per_device, int ranks_per_device, bool bind,
bool verbose);

// get rid of copy and assignment
TECA_ALGORITHM_DELETE_COPY_ASSIGN(teca_cuda_thread_pool)
Expand Down Expand Up @@ -114,7 +117,8 @@ class TECA_EXPORT teca_cuda_thread_pool
private:
/// create n threads for the pool
void create_threads(MPI_Comm comm, int n_threads,
int n_threads_per_device, bool bind, bool verbose);
int threads_per_device, int ranks_per_device, bool bind,
bool verbose);

private:
long m_num_futures;
Expand All @@ -128,15 +132,18 @@ class TECA_EXPORT teca_cuda_thread_pool
// --------------------------------------------------------------------------
template <typename task_t, typename data_t>
teca_cuda_thread_pool<task_t, data_t>::teca_cuda_thread_pool(MPI_Comm comm,
int n_threads, int n_threads_per_device, bool bind, bool verbose) : m_live(true)
int n_threads, int threads_per_device, int ranks_per_device, bool bind,
bool verbose) : m_live(true)
{
this->create_threads(comm, n_threads, n_threads_per_device, bind, verbose);
this->create_threads(comm, n_threads, threads_per_device,
ranks_per_device, bind, verbose);
}

// --------------------------------------------------------------------------
template <typename task_t, typename data_t>
void teca_cuda_thread_pool<task_t, data_t>::create_threads(MPI_Comm comm,
int n_requested, int n_per_device, bool bind, bool verbose)
int n_requested, int threads_per_device, int ranks_per_device, bool bind,
bool verbose)
{
m_num_futures = 0;

Expand All @@ -146,8 +153,8 @@ void teca_cuda_thread_pool<task_t, data_t>::create_threads(MPI_Comm comm,
std::deque<int> core_ids;
std::vector<int> device_ids;

if (teca_thread_util::thread_parameters(comm, -1,
n_requested, n_per_device, bind, verbose, n_threads,
if (teca_thread_util::thread_parameters(comm, -1, n_requested,
threads_per_device, ranks_per_device, bind, verbose, n_threads,
core_ids, device_ids))
{
TECA_WARNING("Failed to detetermine thread parameters."
Expand Down Expand Up @@ -228,7 +235,7 @@ int teca_cuda_thread_pool<task_t, data_t>::wait_some(long n_to_wait,

// gather the requested number of datasets
size_t thread_valid = 1;
while (thread_valid)
while (thread_valid && ((data.size() < static_cast<unsigned int>(n_to_wait))))
{
{
thread_valid = 0;
Expand All @@ -252,9 +259,12 @@ int teca_cuda_thread_pool<task_t, data_t>::wait_some(long n_to_wait,
}
}

// if we have not accumulated the requested number of datasets
// we have the requested number of datasets
if (data.size() >= static_cast<unsigned int>(n_to_wait))
break;

// wait for the user supplied duration before re-scanning
if (thread_valid && (data.size() < static_cast<unsigned int>(n_to_wait)))
if (thread_valid)
std::this_thread::sleep_for(std::chrono::nanoseconds(poll_interval));
}

Expand Down
53 changes: 23 additions & 30 deletions core/teca_cuda_util.cu
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ int synchronize()
int get_local_cuda_devices(MPI_Comm comm, int &ranks_per_device,
std::vector<int> &local_dev)
{
// if ranks per device is zero this is a CPU only run
if (ranks_per_device == 0)
return 0;

cudaError_t ierr = cudaSuccess;

// get the number of CUDA GPU's available on this node
Expand Down Expand Up @@ -55,54 +59,43 @@ int get_local_cuda_devices(MPI_Comm comm, int &ranks_per_device,
MPI_Comm_size(node_comm, &n_node_ranks);
MPI_Comm_rank(node_comm, &node_rank);

// adjust the number of devices such that multiple ranks may share a
// device.
if (ranks_per_device < 0)
{
ranks_per_device = n_node_ranks / n_node_dev +
(n_node_ranks % n_node_dev ? 1 : 0);

// limit to at most 8 MPI ranks per GPU
ranks_per_device = std::min(ranks_per_device, 8);
}

int n_node_dev_use = n_node_dev * ranks_per_device;

if (n_node_dev_use >= n_node_ranks)
if (n_node_ranks < n_node_dev)
{
// more devices than ranks,
// assign devices evenly between ranks
int max_dev = n_node_dev_use - 1;
int n_per_rank = std::max(n_node_dev_use / n_node_ranks, 1);
int n_larger = n_node_dev_use % n_node_ranks;
int n_per_rank = n_node_dev / n_node_ranks;
int n_larger = n_node_dev % n_node_ranks;
int n_local = n_per_rank + (node_rank < n_larger ? 1 : 0);

int first_dev = n_per_rank * node_rank
+ (node_rank < n_larger ? node_rank : n_larger);

first_dev = std::min(max_dev, first_dev);

int last_dev = first_dev + n_per_rank - 1
+ (node_rank < n_larger ? 1 : 0);

last_dev = std::min(max_dev, last_dev);

for (int i = first_dev; i <= last_dev; ++i)
local_dev.push_back( i % n_node_dev );
for (int i = 0; i < n_local; ++i)
local_dev.push_back(first_dev + i);
}
else
{
// assign at most one MPI rank per GPU
if (node_rank < n_node_dev_use)
// TODO -- automatic settings
if (ranks_per_device < 0)
ranks_per_device *= -1;

// more ranks than devices. round robin assignment such that at
// most each device has ranks_per_device. the remaining ranks will
// be CPU only.
if (node_rank < ranks_per_device * n_node_dev)
{
local_dev.push_back( node_rank % n_node_dev );
}
}

MPI_Comm_free(&node_comm);

return 0;
}
else
#endif
// without MPI this process can use all CUDA devices
if (ranks_per_device != 0)
{
// without MPI this process can use all CUDA devices
for (int i = 0; i < n_node_dev; ++i)
local_dev.push_back(i);
}
Expand Down
15 changes: 7 additions & 8 deletions core/teca_cuda_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,17 @@ namespace teca_cuda_util
{
/** Query the system for the locally available(on this rank) CUDA device count.
* this is an MPI collective call which returns a set of device ids that can be
* used locally. If there are as many (or more than) devices on the node than
* the number of MPI ranks assigned to the node the list of device ids will be
* unique across MPI ranks on the node. Otherwise devices are assigned round
* robin fashion.
* used locally. Node wide coordination assures that one can put a limit on the
* number of ranks per node.
*
* @param[in] comm MPI communicator defining a set of nodes on which need
* access to the available GPUS
* @param[in,out] ranks_per_device The number of MPI ranks to use per CUDA
* device. Passing -1 will assign all MPI
* ranks a GPU up to a maximum of 8 ranks
* per GPU. The number of ranks per GPU
* used is returned through this argument.
* device. When set to 0 no GPUs are used. When
* set to -1 all ranks are assigned a GPU but
* multiple ranks will share a GPU when there
* are more ranks than devices.
*
* @param[out] local_dev a list of device ids that can be used by the calling
* MPI rank.
* @returns non-zero on error.
Expand Down
32 changes: 19 additions & 13 deletions core/teca_thread_util.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -339,11 +339,12 @@ int generate_report(MPI_Comm comm, int local_proc, int base_id,

// **************************************************************************
int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested,
int n_per_device, bool bind, bool verbose, int &n_threads,
std::deque<int> &affinity, std::vector<int> &device_ids)
int threads_per_device, int ranks_per_device, bool bind, bool verbose,
int &n_threads, std::deque<int> &affinity, std::vector<int> &device_ids)
{
#if !defined(TECA_HAS_CUDA)
(void) n_per_device;
(void) threads_per_device;
(void) ranks_per_device;
#endif

// this rank is excluded from computations
Expand Down Expand Up @@ -379,7 +380,6 @@ int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested,

#if defined(TECA_HAS_CUDA)
// check for an override to the default number of MPI ranks per device
int ranks_per_device = 1;
int ranks_per_device_set = teca_system_util::get_environment_variable
("TECA_RANKS_PER_DEVICE", ranks_per_device);

Expand All @@ -404,14 +404,14 @@ int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested,
int default_per_device = 8;

if (!teca_system_util::get_environment_variable
("TECA_THREADS_PER_DEVICE", n_per_device) &&
("TECA_THREADS_PER_DEVICE", threads_per_device) &&
verbose && (rank == 0))
{
TECA_STATUS("TECA_THREADS_PER_DEVICE = " << n_per_device)
TECA_STATUS("TECA_THREADS_PER_DEVICE = " << threads_per_device)
}

int n_device_threads = n_cuda_devices *
(n_per_device < 0 ? default_per_device : n_per_device);
(threads_per_device < 0 ? default_per_device : threads_per_device);

#endif

Expand Down Expand Up @@ -528,16 +528,22 @@ int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested,

// thread pool size is based on core and process count
int nlg = 0;
// map threads to physical cores
nlg = cores_per_node % n_procs;
n_threads = cores_per_node / n_procs + (proc_id < nlg ? 1 : 0);
if (n_requested > 0)
{
// user specified override
// use exactly this many
n_threads = n_requested;
}
else if (n_requested < -1)
{
// use at most this many
n_threads = std::min(-n_requested, n_threads);
n_requested = n_threads;
}
else
{
// map threads to physical cores
nlg = cores_per_node % n_procs;
n_threads = cores_per_node/n_procs + (proc_id < nlg ? 1 : 0);
}

// if the user runs more MPI ranks than cores some of the ranks
Expand All @@ -560,7 +566,7 @@ int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested,

// assign each thread to a CUDA device or CPU core
#if defined(TECA_HAS_CUDA)
if (verbose && (n_threads < n_cuda_devices))
if (verbose && (n_ranks < 2) && (n_threads < n_cuda_devices))
{
TECA_WARNING(<< n_threads
<< " threads is insufficient to service " << n_cuda_devices
Expand Down Expand Up @@ -590,7 +596,7 @@ int thread_parameters(MPI_Comm comm, int base_core_id, int n_requested,
if (!bind)
{
if (verbose)
TECA_STATUS("thread to core binding disabled")
TECA_STATUS("thread to core binding disabled. n_threads=" << n_threads)

return 0;
}
Expand Down
Loading