Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EC/CUDA: enable memops for executor #691

Merged
merged 2 commits into from
Jan 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions src/components/ec/base/ucc_ec_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ typedef struct ucc_ec_attr {
} ucc_ec_attr_t;

typedef struct ucc_ec_ops {
ucc_status_t (*task_post)(void *ee_context, void **ee_req);
ucc_status_t (*task_query)(void *ee_req);
ucc_status_t (*task_end)(void *ee_req);
ucc_status_t (*create_event)(void **event);
ucc_status_t (*destroy_event)(void *event);
ucc_status_t (*event_post)(void *ee_context, void *event);
Expand All @@ -59,7 +56,8 @@ typedef struct ucc_ee_executor {
} ucc_ee_executor_t;

enum ucc_ee_executor_params_field {
UCC_EE_EXECUTOR_PARAM_FIELD_TYPE = UCC_BIT(0),
UCC_EE_EXECUTOR_PARAM_FIELD_TYPE = UCC_BIT(0),
UCC_EE_EXECUTOR_PARAM_FIELD_TASK_TYPES = UCC_BIT(1),
};

typedef enum ucc_ee_executor_task_type {
Expand All @@ -74,6 +72,7 @@ typedef enum ucc_ee_executor_task_type {
/* Parameter block for an ee executor. How it is consumed is not visible in
 * this section of the file. */
typedef struct ucc_ee_executor_params {
/* presumably a bitwise OR of ucc_ee_executor_params_field flags telling
 * which of the fields below are valid -- TODO confirm against the users */
uint64_t mask;
/* execution engine type; presumably guarded by
 * UCC_EE_EXECUTOR_PARAM_FIELD_TYPE in mask -- TODO confirm */
ucc_ee_type_t ee_type;
/* presumably a bitmask built from ucc_ee_executor_task_type values and
 * guarded by UCC_EE_EXECUTOR_PARAM_FIELD_TASK_TYPES -- TODO confirm */
uint64_t task_types;
} ucc_ee_executor_params_t;

#define UCC_EE_EXECUTOR_NUM_BUFS 9
Expand Down
3 changes: 0 additions & 3 deletions src/components/ec/cpu/ec_cpu.c
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,6 @@ ucc_ec_cpu_t ucc_ec_cpu = {
.table = ucc_ec_cpu_config_table,
.size = sizeof(ucc_ec_cpu_config_t),
},
.super.ops.task_post = NULL,
.super.ops.task_query = NULL,
.super.ops.task_end = NULL,
.super.ops.create_event = NULL,
.super.ops.destroy_event = NULL,
.super.ops.event_post = NULL,
Expand Down
13 changes: 7 additions & 6 deletions src/components/ec/cuda/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ if HAVE_CUDA
SUBDIRS = kernel

sources = \
ec_cuda.h \
ec_cuda.c \
ec_cuda_executor.h \
ec_cuda_executor.c \
ec_cuda_executor_interruptible.c \
ec_cuda_executor_persistent.c
ec_cuda.h \
ec_cuda.c \
ec_cuda_executor.h \
ec_cuda_executor.c \
ec_cuda_executor_interruptible.c \
ec_cuda_executor_persistent.c \
ec_cuda_executor_persistent_wait.c

module_LTLIBRARIES = libucc_ec_cuda.la
libucc_ec_cuda_la_SOURCES = $(sources)
Expand Down
180 changes: 0 additions & 180 deletions src/components/ec/cuda/ec_cuda.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,6 @@ static const char *stream_task_modes[] = {
[UCC_EC_CUDA_TASK_LAST] = NULL
};

/* Printable names of the task stream types, indexed by the
 * UCC_EC_CUDA_*_STREAM enumerators. The table is handed to
 * UCC_CONFIG_TYPE_ENUM for the TASK_STREAM config entry and is also used to
 * name the stream in the "stream task posted" log message.
 * The NULL at the _LAST index presumably terminates the list for the config
 * parser -- TODO confirm. */
static const char *task_stream_types[] = {
[UCC_EC_CUDA_USER_STREAM] = "user",
[UCC_EC_CUDA_INTERNAL_STREAM] = "ucc",
[UCC_EC_CUDA_TASK_STREAM_LAST] = NULL
};

static ucc_config_field_t ucc_ec_cuda_config_table[] = {
{"", "", NULL, ucc_offsetof(ucc_ec_cuda_config_t, super),
UCC_CONFIG_TYPE_TABLE(ucc_ec_config_table)},
Expand All @@ -36,18 +30,6 @@ static ucc_config_field_t ucc_ec_cuda_config_table[] = {
ucc_offsetof(ucc_ec_cuda_config_t, strm_task_mode),
UCC_CONFIG_TYPE_ENUM(stream_task_modes)},

{"TASK_STREAM", "user",
"Stream for cuda task\n"
"user - user stream provided in execution engine context\n"
"ucc - ucc library internal stream",
ucc_offsetof(ucc_ec_cuda_config_t, task_strm_type),
UCC_CONFIG_TYPE_ENUM(task_stream_types)},

{"STREAM_BLOCKING_WAIT", "1",
"Stream is blocked until collective operation is done",
ucc_offsetof(ucc_ec_cuda_config_t, stream_blocking_wait),
UCC_CONFIG_TYPE_UINT},

{"EXEC_NUM_WORKERS", "1",
"Number of thread blocks to use for cuda persistent executor",
ucc_offsetof(ucc_ec_cuda_config_t, exec_num_workers),
Expand Down Expand Up @@ -143,38 +125,6 @@ static ucc_mpool_ops_t ucc_ec_cuda_ee_executor_mpool_ops = {
.obj_cleanup = ucc_ec_cuda_executor_chunk_cleanup,
};

/*
 * Chunk allocation callback of the stream-request memory pool.
 *
 * Requests *size_p bytes of mapped host memory from cudaHostAlloc and stores
 * the resulting address in *chunk_p.
 *
 * Returns the status that CUDA_FUNC produces for the allocation call.
 */
static ucc_status_t
ucc_ec_cuda_stream_req_mpool_chunk_malloc(ucc_mpool_t *mp, //NOLINT: mp is unused
                                          size_t *size_p, void **chunk_p)
{
    return CUDA_FUNC(cudaHostAlloc((void**)chunk_p, *size_p,
                                   cudaHostAllocMapped));
}

/*
 * Chunk release callback of the stream-request memory pool.
 *
 * Hands the chunk back through cudaFreeHost. The result of that call is
 * deliberately not inspected, as the callback has no way to report it.
 */
static void
ucc_ec_cuda_stream_req_mpool_chunk_free(ucc_mpool_t *mp, //NOLINT: mp is unused
                                        void *chunk)
{
    (void)cudaFreeHost(chunk);
}

/*
 * Per-object init callback of the stream-request memory pool.
 *
 * Treats obj as a ucc_ec_cuda_stream_request_t and asks CUDA for the device
 * pointer that corresponds to the object's host `status` field, storing it in
 * `dev_status`.
 *
 * The status yielded by CUDA_FUNC is discarded, so a failed lookup is not
 * reported from here. `chunk` is not used either.
 */
static void ucc_ec_cuda_stream_req_init(ucc_mpool_t *mp, void *obj, void *chunk) //NOLINT: mp is unused
{
ucc_ec_cuda_stream_request_t *req = (ucc_ec_cuda_stream_request_t*) obj;

CUDA_FUNC(cudaHostGetDevicePointer(
(void**)(&req->dev_status), (void *)&req->status, 0));
}

/* Callback table for the stream-request memory pool: chunks are allocated and
 * released with the cudaHostAlloc/cudaFreeHost based callbacks, each object
 * gets its device status pointer resolved on init, and no per-object cleanup
 * is registered. */
static ucc_mpool_ops_t ucc_ec_cuda_stream_req_mpool_ops = {
.chunk_alloc = ucc_ec_cuda_stream_req_mpool_chunk_malloc,
.chunk_release = ucc_ec_cuda_stream_req_mpool_chunk_free,
.obj_init = ucc_ec_cuda_stream_req_init,
.obj_cleanup = NULL
};

static void ucc_ec_cuda_event_init(ucc_mpool_t *mp, void *obj, void *chunk) //NOLINT: mp is unused
{
ucc_ec_cuda_event_t *base = (ucc_ec_cuda_event_t *) obj;
Expand All @@ -196,28 +146,6 @@ static ucc_mpool_ops_t ucc_ec_cuda_event_mpool_ops = {
.obj_cleanup = ucc_ec_cuda_event_cleanup,
};

/* Kernel-based stream task poster. Only declared here; its definition is not
 * in this section of the file. It is installed as ucc_ec_cuda.post_strm_task
 * when the stream task mode is UCC_EC_CUDA_TASK_KERNEL. Same signature as
 * the driver-based poster. */
ucc_status_t ucc_ec_cuda_post_kernel_stream_task(uint32_t *status,
int blocking_wait,
cudaStream_t stream);

/*
 * Post a stream task using CUDA driver stream memory operations.
 *
 * status        - address of the request status word as seen by the device
 *                 (callers pass the request's dev_status pointer)
 * blocking_wait - when non-zero, the stream first writes
 *                 UCC_EC_CUDA_TASK_STARTED to the status word and then waits
 *                 until the word equals UCC_EC_CUDA_TASK_COMPLETED
 * stream        - stream on which the memory operations are enqueued
 *
 * In all cases the last operation enqueued writes
 * UCC_EC_CUDA_TASK_COMPLETED_ACK to the status word.
 *
 * Returns UCC_OK when every operation was enqueued, otherwise the status of
 * the first operation that failed. Nothing further is enqueued after a
 * failure, so the caller does not see success for a half-posted task.
 *
 * NOTE(review): relies on CUDADRV_FUNC evaluating to a ucc_status_t, the same
 * way CUDA_FUNC is used elsewhere in this file -- confirm against its
 * definition.
 */
static ucc_status_t ucc_ec_cuda_post_driver_stream_task(uint32_t *status,
                                                        int blocking_wait,
                                                        cudaStream_t stream)
{
    CUdeviceptr  status_ptr = (CUdeviceptr)status;
    ucc_status_t st;

    if (blocking_wait) {
        /* tell the host the stream has reached the task ... */
        st = CUDADRV_FUNC(cuStreamWriteValue32(stream, status_ptr,
                                               UCC_EC_CUDA_TASK_STARTED, 0));
        if (ucc_unlikely(st != UCC_OK)) {
            return st;
        }
        /* ... and stall the stream until the host writes COMPLETED */
        st = CUDADRV_FUNC(cuStreamWaitValue32(stream, status_ptr,
                                              UCC_EC_CUDA_TASK_COMPLETED,
                                              CU_STREAM_WAIT_VALUE_EQ));
        if (ucc_unlikely(st != UCC_OK)) {
            return st;
        }
    }

    /* acknowledge, which also marks the task as passed in non-blocking mode */
    return CUDADRV_FUNC(cuStreamWriteValue32(stream, status_ptr,
                                             UCC_EC_CUDA_TASK_COMPLETED_ACK, 0));
}

static inline void ucc_ec_cuda_set_threads_nbr(int *nt, int maxThreadsPerBlock)
{
if (*nt != UCC_ULUNITS_AUTO) {
Expand Down Expand Up @@ -303,16 +231,6 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params)
return status;
}

/* create request pool */
status = ucc_mpool_init(
&ucc_ec_cuda.strm_reqs, 0, sizeof(ucc_ec_cuda_stream_request_t), 0,
UCC_CACHE_LINE_SIZE, 16, UINT_MAX, &ucc_ec_cuda_stream_req_mpool_ops,
UCC_THREAD_MULTIPLE, "CUDA Event Objects");
if (status != UCC_OK) {
ec_error(&ucc_ec_cuda.super, "failed to create event pool");
return status;
}

status = ucc_mpool_init(
&ucc_ec_cuda.executors, 0, sizeof(ucc_ec_cuda_executor_t), 0,
UCC_CACHE_LINE_SIZE, 16, UINT_MAX, &ucc_ec_cuda_ee_executor_mpool_ops,
Expand Down Expand Up @@ -344,10 +262,8 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params)

if (cfg->strm_task_mode == UCC_EC_CUDA_TASK_KERNEL) {
ucc_ec_cuda.strm_task_mode = UCC_EC_CUDA_TASK_KERNEL;
ucc_ec_cuda.post_strm_task = ucc_ec_cuda_post_kernel_stream_task;
} else {
ucc_ec_cuda.strm_task_mode = UCC_EC_CUDA_TASK_MEM_OPS;
ucc_ec_cuda.post_strm_task = ucc_ec_cuda_post_driver_stream_task;
#if CUDA_VERSION < 12000
CUresult cu_st;
CUdevice cu_dev;
Expand All @@ -370,7 +286,6 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params)
ec_info(&ucc_ec_cuda.super,
"CUDA MEM OPS are not supported or disabled");
ucc_ec_cuda.strm_task_mode = UCC_EC_CUDA_TASK_KERNEL;
ucc_ec_cuda.post_strm_task = ucc_ec_cuda_post_kernel_stream_task;
}
} else if (attr == 0) {
ec_error(&ucc_ec_cuda.super,
Expand All @@ -391,7 +306,6 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params)
}
}

ucc_ec_cuda.task_strm_type = cfg->task_strm_type;
ucc_spinlock_init(&ucc_ec_cuda.init_spinlock, 0);
return UCC_OK;
}
Expand All @@ -404,96 +318,6 @@ static ucc_status_t ucc_ec_cuda_get_attr(ucc_ec_attr_t *ec_attr)
return UCC_OK;
}

/*
 * Post a stream task on behalf of the execution engine stream ee_stream.
 *
 * A request object is taken from the strm_reqs pool, marked POSTED and bound
 * to ee_stream. Depending on ucc_ec_cuda.task_strm_type the task is posted
 * either directly on the user stream, or on ucc_ec_cuda.stream with a CUDA
 * event used to order the two streams before and after the task.
 *
 * On success *ee_req receives the request and UCC_OK is returned.
 * UCC_ERR_NO_MEMORY is returned when a pool is exhausted, and the status of
 * post_strm_task is returned when posting fails; on those paths the objects
 * taken from the pools are put back.
 */
ucc_status_t ucc_ec_cuda_task_post(void *ee_stream, void **ee_req)
{
ucc_ec_cuda_config_t *cfg = EC_CUDA_CONFIG;
ucc_ec_cuda_stream_request_t *req;
ucc_ec_cuda_event_t *cuda_event;
ucc_status_t status;

/* NOTE(review): macro body is not visible here; presumably lazily creates
 * ucc_ec_cuda.stream and may return early on failure -- confirm */
UCC_EC_CUDA_INIT_STREAM();
req = ucc_mpool_get(&ucc_ec_cuda.strm_reqs);
if (ucc_unlikely(!req)) {
ec_error(&ucc_ec_cuda.super, "failed to get stream request from mpool");
return UCC_ERR_NO_MEMORY;
}
req->status = UCC_EC_CUDA_TASK_POSTED;
req->stream = (cudaStream_t)ee_stream;

if (ucc_ec_cuda.task_strm_type == UCC_EC_CUDA_USER_STREAM) {
/* post straight onto the stream supplied by the caller */
status = ucc_ec_cuda.post_strm_task(req->dev_status,
cfg->stream_blocking_wait,
req->stream);
if (ucc_unlikely(status != UCC_OK)) {
goto free_req;
}
} else {
cuda_event = ucc_mpool_get(&ucc_ec_cuda.events);
if (ucc_unlikely(!cuda_event)) {
ec_error(&ucc_ec_cuda.super, "failed to get event from mpool");
status = UCC_ERR_NO_MEMORY;
goto free_req;
}

/* make the internal stream wait for work already queued on the user stream */
/* NOTE(review): if CUDA_CHECK returns from this function on failure (its
 * definition is not visible here), req and cuda_event are not put back to
 * their pools on any of the four CUDA_CHECK paths below -- confirm */
CUDA_CHECK(cudaEventRecord(cuda_event->event, req->stream));
CUDA_CHECK(cudaStreamWaitEvent(ucc_ec_cuda.stream, cuda_event->event, 0));
status = ucc_ec_cuda.post_strm_task(req->dev_status,
cfg->stream_blocking_wait,
ucc_ec_cuda.stream);
if (ucc_unlikely(status != UCC_OK)) {
goto free_event;
}
/* and make the user stream wait for the task on the internal stream;
 * the same event object is re-recorded for this second dependency */
CUDA_CHECK(cudaEventRecord(cuda_event->event, ucc_ec_cuda.stream));
CUDA_CHECK(cudaStreamWaitEvent(req->stream, cuda_event->event, 0));
ucc_mpool_put(cuda_event);
}

*ee_req = (void *) req;

ec_info(&ucc_ec_cuda.super, "stream task posted on \"%s\" stream. req:%p",
task_stream_types[ucc_ec_cuda.task_strm_type], req);

return UCC_OK;

/* error unwinding: free_event falls through into free_req */
free_event:
ucc_mpool_put(cuda_event);
free_req:
ucc_mpool_put(req);
return status;
}

/*
 * Report whether the stream task behind ee_req has started.
 *
 * Returns UCC_INPROGRESS while the request is still in the POSTED state and
 * UCC_OK for any other state.
 */
ucc_status_t ucc_ec_cuda_task_query(void *ee_req)
{
    ucc_ec_cuda_stream_request_t *req = ee_req;

    /* A queried task can only be POSTED, STARTED or COMPLETED_ACK: the
       COMPLETED state is written solely by ucc_ec_cuda_task_end when it asks
       the stream to unblock. */
    ucc_assert(req->status != UCC_EC_CUDA_TASK_COMPLETED);

    if (req->status != UCC_EC_CUDA_TASK_POSTED) {
        ec_info(&ucc_ec_cuda.super, "stream task started. req:%p", req);
        return UCC_OK;
    }

    return UCC_INPROGRESS;
}

/*
 * Finish a stream task and return its request object to the pool.
 *
 * If the status word reads STARTED, the host writes COMPLETED to it and then
 * busy-waits until it reads COMPLETED_ACK. With the driver-based poster in
 * this file, the stream waits for COMPLETED and then writes COMPLETED_ACK;
 * the kernel-based poster is not visible here and is presumed to follow the
 * same protocol.
 * If the status word already reads COMPLETED_ACK nothing is waited for.
 *
 * Always returns UCC_OK.
 */
ucc_status_t ucc_ec_cuda_task_end(void *ee_req)
{
ucc_ec_cuda_stream_request_t *req = ee_req;
/* volatile access so that every check below re-reads the status word */
volatile ucc_ec_task_status_t *st = &req->status;

/* can be safely ended only if it's in STARTED or COMPLETED_ACK state */
ucc_assert((*st != UCC_EC_CUDA_TASK_POSTED) &&
(*st != UCC_EC_CUDA_TASK_COMPLETED));
if (*st == UCC_EC_CUDA_TASK_STARTED) {
/* request the unblock, then spin (no timeout) until it is acknowledged */
*st = UCC_EC_CUDA_TASK_COMPLETED;
while(*st != UCC_EC_CUDA_TASK_COMPLETED_ACK) { }
}
ucc_mpool_put(req);
/* req is only printed as an address below; it is not dereferenced after the put */
ec_info(&ucc_ec_cuda.super, "stream task done. req:%p", req);
return UCC_OK;
}

ucc_status_t ucc_ec_cuda_event_create(void **event)
{
ucc_ec_cuda_event_t *cuda_event;
Expand Down Expand Up @@ -556,7 +380,6 @@ static ucc_status_t ucc_ec_cuda_finalize()
}

ucc_mpool_cleanup(&ucc_ec_cuda.events, 1);
ucc_mpool_cleanup(&ucc_ec_cuda.strm_reqs, 1);
ucc_mpool_cleanup(&ucc_ec_cuda.executors, 1);
ucc_mpool_cleanup(&ucc_ec_cuda.executor_interruptible_tasks, 1);
ucc_mpool_cleanup(&ucc_ec_cuda.executor_persistent_tasks, 1);
Expand All @@ -579,9 +402,6 @@ ucc_ec_cuda_t ucc_ec_cuda = {
.table = ucc_ec_cuda_config_table,
.size = sizeof(ucc_ec_cuda_config_t),
},
.super.ops.task_post = ucc_ec_cuda_task_post,
.super.ops.task_query = ucc_ec_cuda_task_query,
.super.ops.task_end = ucc_ec_cuda_task_end,
.super.ops.create_event = ucc_ec_cuda_event_create,
.super.ops.destroy_event = ucc_ec_cuda_event_destroy,
.super.ops.event_post = ucc_ec_cuda_event_post,
Expand Down
19 changes: 1 addition & 18 deletions src/components/ec/cuda/ec_cuda.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,6 @@ typedef enum ucc_ec_cuda_strm_task_mode {
UCC_EC_CUDA_TASK_LAST,
} ucc_ec_cuda_strm_task_mode_t;

typedef enum ucc_ec_cuda_task_stream_type {
UCC_EC_CUDA_USER_STREAM,
UCC_EC_CUDA_INTERNAL_STREAM,
UCC_EC_CUDA_TASK_STREAM_LAST
} ucc_ec_cuda_task_stream_type_t;

/* States of the status word shared between the host and the stream for a
 * stream task. */
typedef enum ucc_ec_task_status {
/* written by the host in ucc_ec_cuda_task_end to ask a waiting stream to
 * unblock; first enumerator, so its value is 0 */
UCC_EC_CUDA_TASK_COMPLETED,
/* set by the host when the request is created in ucc_ec_cuda_task_post */
UCC_EC_CUDA_TASK_POSTED,
/* written from the stream in blocking-wait mode before it starts waiting */
UCC_EC_CUDA_TASK_STARTED,
/* written from the stream as the final step of the task */
UCC_EC_CUDA_TASK_COMPLETED_ACK
} ucc_ec_task_status_t;

typedef enum ucc_ec_cuda_executor_state {
UCC_EC_CUDA_EXECUTOR_INITIALIZED,
UCC_EC_CUDA_EXECUTOR_POSTED,
Expand All @@ -54,8 +41,6 @@ typedef ucc_status_t (*ucc_ec_cuda_task_post_fn) (uint32_t *dev_status,
typedef struct ucc_ec_cuda_config {
ucc_ec_config_t super;
ucc_ec_cuda_strm_task_mode_t strm_task_mode;
ucc_ec_cuda_task_stream_type_t task_strm_type;
int stream_blocking_wait;
unsigned long exec_num_workers;
unsigned long exec_num_threads;
unsigned long exec_max_tasks;
Expand All @@ -72,14 +57,11 @@ typedef struct ucc_ec_cuda {
int exec_streams_initialized;
cudaStream_t *exec_streams;
ucc_mpool_t events;
ucc_mpool_t strm_reqs;
ucc_mpool_t executors;
ucc_mpool_t executor_interruptible_tasks;
ucc_mpool_t executor_persistent_tasks;
ucc_thread_mode_t thread_mode;
ucc_ec_cuda_strm_task_mode_t strm_task_mode;
ucc_ec_cuda_task_stream_type_t task_strm_type;
ucc_ec_cuda_task_post_fn post_strm_task;
ucc_spinlock_t init_spinlock;
} ucc_ec_cuda_t;

Expand Down Expand Up @@ -116,6 +98,7 @@ typedef struct ucc_ec_cuda_executor_task_ops {
typedef struct ucc_ec_cuda_executor {
ucc_ee_executor_t super;
ucc_ec_cuda_executor_mode_t mode;
uint64_t requested_ops;
ucc_ec_cuda_executor_task_ops_t ops;
ucc_spinlock_t tasks_lock;
ucc_ec_cuda_executor_state_t state;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to this PR, but shouldn't state be allocated with cudaHostAlloc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is allocated with cudaHostAlloc; see ucc_ec_cuda_ee_executor_mpool_ops.

Expand Down
Loading