Adding the XCCL DPU team and DPU daemon #106

Open · wants to merge 2 commits into master
8 changes: 8 additions & 0 deletions configure.ac
@@ -117,6 +117,13 @@ AC_ARG_WITH([cuda],
AM_CONDITIONAL([HAVE_CUDA], [test "x$cuda_happy" != xno])
AC_MSG_RESULT([CUDA support: $cuda_happy; $CUDA_CPPFLAGS $CUDA_LDFLAGS])

AC_ARG_WITH([dpu],
            AC_HELP_STRING([--with-dpu=yes/no], [Enable/Disable dpu team]),
            [AS_IF([test "x$with_dpu" != "xno"], dpu_happy="yes", dpu_happy="no")],
            [dpu_happy="no"])
AM_CONDITIONAL([HAVE_DPU], [test "x$dpu_happy" != xno])
AC_MSG_RESULT([DPU support: $dpu_happy])

AM_CONDITIONAL([HAVE_NCCL], [false])
if test "x$cuda_happy" != xno; then
m4_include([m4/nccl.m4])
@@ -136,6 +143,7 @@ AC_CONFIG_FILES([
src/team_lib/hier/Makefile
src/team_lib/multirail/Makefile
src/team_lib/nccl/Makefile
src/team_lib/dpu/Makefile
src/utils/cuda/Makefile
src/utils/cuda/kernels/Makefile
test/Makefile
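With the new switch in place, DPU support would be enabled at configure time roughly like so (a sketch; per the default above, the team is off unless requested):

    ./configure --with-dpu=yes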
20 changes: 20 additions & 0 deletions contrib/dpu_daemon/Makefile
@@ -0,0 +1,20 @@
#
# Copyright (c) 2020 Mellanox Technologies. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#

CFLAGS  = -I$(XCCL_PATH)/include -I$(UCX_PATH)/include
LDFLAGS = -L$(XCCL_PATH)/lib $(XCCL_PATH)/lib/libxccl.so $(UCX_PATH)/lib/libucs.so $(UCX_PATH)/lib/libucp.so \
          -Wl,-rpath -Wl,$(XCCL_PATH)/lib -Wl,-rpath -Wl,$(UCX_PATH)/lib

rel:
	mpicc -O3 -DNDEBUG -std=c11 $(CFLAGS) -o dpu_server dpu_server.c host_channel.c server_xccl.c $(LDFLAGS)

dbg:
	mpicc -O0 -g -std=c11 $(CFLAGS) -o dpu_server dpu_server.c host_channel.c server_xccl.c $(LDFLAGS)

clean:
	rm -f dpu_server
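With the XCCL and UCX install prefixes passed on the command line (the paths here are illustrative), the daemon builds with, e.g.:

    make rel XCCL_PATH=/opt/xccl UCX_PATH=/opt/ucx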
171 changes: 171 additions & 0 deletions contrib/dpu_daemon/dpu_server.c
@@ -0,0 +1,171 @@
/*
* Copyright (C) Mellanox Technologies Ltd. 2020. ALL RIGHTS RESERVED.
* See file LICENSE for terms.
*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

#include "server_xccl.h"
#include "host_channel.h"

#define MAX_THREADS 128

typedef struct {
    pthread_t       id;
    int             idx, nthreads;
    dpu_xccl_comm_t comm;
    dpu_hc_t        *hc;
    unsigned int    itt;
} thread_ctx_t;

/* thread-accessible data - split reader/writer */
typedef struct {
    volatile unsigned long g_itt;   /* first cache line */
    volatile unsigned long pad[3];  /* pad to 64 bytes */
    volatile unsigned long l_itt;   /* second cache line */
    volatile unsigned long pad2[3]; /* pad to 64 bytes */
} thread_sync_t;
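/*
 * g_itt is advanced by thread 0 and polled by the owning worker, while
 * l_itt is advanced by the owner and polled by thread 0; the padding
 * keeps the two directions apart to limit false sharing.
 */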

static thread_sync_t *thread_sync = NULL;

void *dpu_worker(void *arg)
{
    int i = 0;
    thread_ctx_t *ctx = (thread_ctx_t*)arg;
    xccl_coll_req_h request;

    while (1) {
        ctx->itt++;
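        /*
         * Rendezvous with the host: thread 0 waits on the host channel for
         * this iteration, then releases the workers by bumping every g_itt;
         * the other threads spin until their own g_itt catches up.
         */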
        if (ctx->idx > 0) {
            while (thread_sync[ctx->idx].g_itt < ctx->itt) {
                /* busy wait */
            }
        } else {
            dpu_hc_wait(ctx->hc, ctx->itt);
            for (i = 0; i < ctx->nthreads; i++) {
                thread_sync[i].g_itt++;
            }
        }

        int offset, block;
        int count = dpu_hc_get_count(ctx->hc);
        int ready = 0;

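        /*
         * Split count as evenly as possible: the first (count % nthreads)
         * threads take one extra element. Illustrative example: count = 10,
         * nthreads = 4 gives blocks {3, 3, 2, 2} at offsets {0, 3, 6, 8}.
         */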
        block = count / ctx->nthreads;
        offset = block * ctx->idx;
        if (ctx->idx < (count % ctx->nthreads)) {
            offset += ctx->idx;
            block++;
        } else {
            offset += (count % ctx->nthreads);
        }

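        /*
         * Post an allreduce over this thread's slice: input arrives in the
         * host channel's 'put' segment and the result is staged in the
         * 'get' segment for the host to read back.
         */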
        xccl_coll_op_args_t coll = {
            .field_mask  = 0,
            .coll_type   = XCCL_ALLREDUCE,
            .buffer_info = {
                .src_buffer = ctx->hc->mem_segs.put.base +
                              offset * xccl_dt_size(dpu_hc_get_dtype(ctx->hc)),
                .dst_buffer = ctx->hc->mem_segs.get.base +
                              offset * xccl_dt_size(dpu_hc_get_dtype(ctx->hc)),
                .len        = block * xccl_dt_size(dpu_hc_get_dtype(ctx->hc)),
            },
            .reduce_info = {
                .dt    = dpu_hc_get_dtype(ctx->hc),
                .op    = dpu_hc_get_op(ctx->hc),
                .count = block,
            },
            .alg.set_by_user = 0,
            .tag = 123, /* TODO */
        };

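        /*
         * The host signals shutdown by publishing XCCL_OP_UNSUPPORTED as
         * the reduction op; on seeing it, the worker leaves the loop.
         */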
        if (coll.reduce_info.op == XCCL_OP_UNSUPPORTED) {
            break;
        }

        XCCL_CHECK(xccl_collective_init(&coll, &request, ctx->comm.team));
        XCCL_CHECK(xccl_collective_post(request));
        while (XCCL_OK != xccl_collective_test(request)) {
            xccl_context_progress(ctx->comm.ctx);
        }
        XCCL_CHECK(xccl_collective_finalize(request));

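        /*
         * Publish completion: each thread bumps its own l_itt; thread 0
         * waits until every worker has finished this iteration, then
         * replies to the host.
         */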
        thread_sync[ctx->idx].l_itt++;

        if (ctx->idx == 0) {
            while (ready != ctx->nthreads) {
                ready = 0;
                for (i = 0; i < ctx->nthreads; i++) {
                    if (thread_sync[i].l_itt == ctx->itt) {
                        ready++;
                    } else {
                        break;
                    }
                }
            }

            dpu_hc_reply(ctx->hc, ctx->itt);
        }
    }

    return NULL;
}

int main(int argc, char **argv)
{
    int nthreads = 0, i;
    thread_ctx_t *tctx_pool = NULL;
    dpu_xccl_global_t xccl_glob;
    dpu_hc_t hc_b, *hc = &hc_b;

    if (argc < 2) {
        printf("Need thread # as an argument\n");
        return 1;
    }
    nthreads = atoi(argv[1]);
    if (MAX_THREADS < nthreads || 0 >= nthreads) {
        printf("ERROR: bad thread #: %d\n", nthreads);
        return 1;
    }
    printf("DPU daemon: Running with %d threads\n", nthreads);
    tctx_pool = calloc(nthreads, sizeof(*tctx_pool));
    XCCL_CHECK(dpu_xccl_init(argc, argv, &xccl_glob));

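    /* One sync element per thread, 64-byte aligned and zeroed so that
       neighbouring threads' counters do not share a cache line. */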
    thread_sync = aligned_alloc(64, nthreads * sizeof(*thread_sync));
    memset(thread_sync, 0, nthreads * sizeof(*thread_sync));

    dpu_hc_init(hc);
    dpu_hc_accept(hc);

    for (i = 0; i < nthreads; i++) {
        XCCL_CHECK(dpu_xccl_alloc_team(&xccl_glob, &tctx_pool[i].comm));

        tctx_pool[i].idx = i;
        tctx_pool[i].nthreads = nthreads;
        tctx_pool[i].hc = hc;
        tctx_pool[i].itt = 0;

        if (i < nthreads - 1) {
            pthread_create(&tctx_pool[i].id, NULL, dpu_worker,
                           (void*)&tctx_pool[i]);
        }
    }

    /* The final DPU worker is executed in this (main thread's) context */
    dpu_worker((void*)&tctx_pool[i-1]);

    for (i = 0; i < nthreads; i++) {
        if (i < nthreads - 1) {
            pthread_join(tctx_pool[i].id, NULL);
        }
        dpu_xccl_free_team(&xccl_glob, &tctx_pool[i].comm);
    }

    dpu_xccl_finalize(&xccl_glob);
    return 0;
}
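As a usage sketch (the thread count below is illustrative), the daemon takes the number of worker threads as its only argument and, since it is built with mpicc, would typically be launched on the DPU through the MPI launcher:

    mpirun -np 1 ./dpu_server 8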