Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug fix: rmt can not exit when sync redis data to rdb files #27

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/rmt.c
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,12 @@ init_context(struct instance *rmti)
destroy_context(rmt_ctx);
return NULL;
}

rmt_ctx->dir = sdsdup(cf->dir);
}
if (cf->rdb_prefix != CONF_UNSET_PTR && sdslen(cf->rdb_prefix) > 0) {
rmt_ctx->rdb_prefix = sdsdup(cf->rdb_prefix);
}

rmt_ctx->loop = aeCreateEventLoop(1000);
if (rmt_ctx->loop == NULL) {
Expand Down Expand Up @@ -271,6 +274,10 @@ void destroy_context(rmtContext *rmt_ctx)
sdsfree(rmt_ctx->dir);
}

if (rmt_ctx->rdb_prefix != NULL) {
sdsfree(rmt_ctx->rdb_prefix);
}

while(listLength(&rmt_ctx->clients) > 0) {
lnode = listFirst(&rmt_ctx->clients);
c = listNodeValue(lnode);
Expand Down
12 changes: 11 additions & 1 deletion src/rmt_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ static conf_option conf_common_options[] = {
{ (char*)"dir",
conf_set_string,
offsetof(rmt_conf, dir) },
{ (char*)"rdb_prefix",
conf_set_string,
offsetof(rmt_conf, rdb_prefix) },
{ (char*)"max_clients",
conf_set_num,
offsetof(rmt_conf, max_clients) },
Expand Down Expand Up @@ -694,6 +697,7 @@ static int conf_init(rmt_conf *cf)
cf->rdb_diskless = CONF_UNSET_NUM;
cf->source_safe = CONF_UNSET_NUM;
cf->dir = CONF_UNSET_PTR;
cf->rdb_prefix = CONF_UNSET_PTR;

cf->max_clients = CONF_UNSET_NUM;

Expand Down Expand Up @@ -735,7 +739,12 @@ static void conf_deinit(rmt_conf *cf)
sdsfree(cf->dir);
cf->dir = CONF_UNSET_PTR;
}


if(cf->rdb_prefix != NULL){
sdsfree(cf->rdb_prefix);
cf->rdb_prefix = CONF_UNSET_PTR;
}

cf->maxmemory = CONF_UNSET_NUM;
cf->threads = CONF_UNSET_NUM;
cf->step = CONF_UNSET_NUM;
Expand Down Expand Up @@ -792,6 +801,7 @@ conf_dump(rmt_conf *cf)
log_debug(log_level, " rdb_diskless: %d", cf->rdb_diskless);
log_debug(log_level, " source_safe: %d", cf->source_safe);
log_debug(log_level, " dir: %s", cf->dir);
log_debug(log_level, " rdb_prefix: %s", cf->rdb_prefix);
log_debug(log_level, " max_clients: %d", cf->max_clients);
log_debug(log_level, " filter: %s", cf->filter);
log_debug(log_level, "");
Expand Down
1 change: 1 addition & 0 deletions src/rmt_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ typedef struct rmt_conf {
int rdb_diskless;
int source_safe;
sds dir;
sds rdb_prefix;

int max_clients;
sds filter;
Expand Down
27 changes: 25 additions & 2 deletions src/rmt_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -1182,6 +1182,16 @@ static void *read_thread_run(void *args)
redis_node *srnode;
listNode *lnode;
listIter *it;
static int read_num;

read_num = listLength(nodes);
log_notice("init read event num:%u", read_num);
it = listGetIterator(nodes, AL_START_HEAD);
while((lnode = listNext(it)) != NULL){
srnode = listNodeValue(lnode);
srnode->event_num = &read_num;
}
listReleaseIterator(it);

it = listGetIterator(nodes, AL_START_HEAD);
while((lnode = listNext(it)) != NULL){
Expand Down Expand Up @@ -1217,6 +1227,7 @@ static void *read_thread_run(void *args)
listReleaseIterator(it);

aeMain(rdata->loop);
log_notice("read thread over");

return 0;
}
Expand Down Expand Up @@ -1275,6 +1286,7 @@ static void *write_thread_run(void *args)
rmt_write(srnode->sockpairfds[1], " ", 1);

aeMain(wdata->loop);
log_notice("write thread over");

return 0;
}
Expand Down Expand Up @@ -2550,19 +2562,30 @@ void redis_migrate(rmtContext *ctx, int type)

log_notice("migrate job is running...");

aeMain(ctx->loop);
// aeMain(ctx->loop);

//wait for the read job finish
for(i = 0; i < read_threads_count; i ++){
rdata = array_get(read_datas, (uint32_t)i);
pthread_join(rdata->thread_id, NULL);
}
log_notice("migrate read job over");

//wait for the write job finish
for(i = 0; i < write_threads_count; i ++){
wdata = array_get(write_datas, (uint32_t)i);
wdata = array_get(write_datas, (uint32_t)i);
list *nodes = wdata->nodes; //type : source redis_node
redis_group *srgroup;
redis_node *srnode = listFirstValue(nodes);
if(srnode != NULL){
srgroup = srnode->owner;
if(srgroup->kind != GROUP_TYPE_RDBFILE){
aeStop(wdata->loop);
}
}
pthread_join(wdata->thread_id, NULL);
}
log_notice("migrate write job over");

done:

Expand Down
1 change: 1 addition & 0 deletions src/rmt_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ typedef struct rmtContext {
int source_safe;

sds dir;
sds rdb_prefix;

struct redis_group *srgroup;

Expand Down
12 changes: 11 additions & 1 deletion src/rmt_redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -1651,6 +1651,11 @@ static void rmtReceiveRdb(aeEventLoop *el, int fd, void *privdata, int mask)
log_notice("Rdb file received, disconnect from the node[%s]",
srnode->addr);
notice_write_thread(srnode); /* Let the next node begin replication */
int event_num = __sync_fetch_and_sub(srnode->event_num, 1);
log_notice("current read event number:%u", event_num - 1);
if (event_num == 1) {
aeStop(el);
}
return;
}

Expand Down Expand Up @@ -1902,9 +1907,14 @@ static void rmtSyncRedisMaster(aeEventLoop *el, int fd, void *privdata, int mask
if (ctx->dir != NULL) {
rdb->fname = sdscatsds(rdb->fname, ctx->dir);
rdb->fname = sdscat(rdb->fname, "/");
if (ctx->rdb_prefix != NULL && sdslen(ctx->rdb_prefix)) {
rdb->fname = sdscatsds(rdb->fname, ctx->rdb_prefix);
} else {
rdb->fname = sdscat(rdb->fname, "node");
}
}
rdb->fname = sdscatfmt(rdb->fname,
"node%s-%I-%i.rdb",
"%s-%I-%i.rdb",
srnode->addr==NULL?"unknow":srnode->addr,
rmt_usec_now(),
(long int)getpid());
Expand Down
1 change: 1 addition & 0 deletions src/rmt_redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ typedef struct redis_node{
long long timestamp;

int sk_event; /* used to run some task */
unsigned int *event_num;

struct redis_node *next; /* next redis_node to begin replication */
}redis_node;
Expand Down