Fix delay queue issue
sen.chai committed Nov 30, 2023
1 parent 121d8f5 commit 5c0bfaf
Showing 5 changed files with 138 additions and 17 deletions.
@@ -151,4 +151,16 @@ public static void sendProcessorPartlyFailed() {
public static void appendScheduleLogTime(String subject, long time) {
Metrics.timer("appendScheduleLogTime", SUBJECT_ARRAY, new String[]{subject}).update(time, TimeUnit.MILLISECONDS);
}

public static void retryNettySend() {
Metrics.counter("retryNettySend", EMPTY, EMPTY).inc();
}

public static void retryNettySendError() {
Metrics.counter("retryNettySendError", EMPTY, EMPTY).inc();
}

public static void syncSend(String subject) {
Metrics.counter("sync_send_count", SUBJECT_ARRAY, new String[] {subject}).inc();
}
}
@@ -73,7 +73,6 @@ ListenableFuture<Datagram> receive(List<RawMessageExtend> messages, RemotingComm
for (RawMessageExtend message : messages) {
final MessageHeader header = message.getHeader();
monitorMessageReceived(header.getCreateTime(), header.getSubject());

final ReceivedDelayMessage receivedDelayMessage = new ReceivedDelayMessage(message, cmd.getReceiveTime());
futures.add(receivedDelayMessage.getPromise());
invoker.invoke(receivedDelayMessage);
@@ -46,12 +46,14 @@ class SenderExecutor implements Disposable {
private final Sender sender;
private final DelayLogFacade store;
private final int sendThreads;
private DynamicConfig sendConfig;

SenderExecutor(final Sender sender, DelayLogFacade store, DynamicConfig sendConfig) {
this.sender = sender;
this.store = store;
this.brokerLoadBalance = PollBrokerLoadBalance.getInstance();
this.sendThreads = sendConfig.getInt("delay.send.threads", DEFAULT_SEND_THREAD);
this.sendConfig = sendConfig;
}

void execute(final List<ScheduleIndex> indexList, final SenderGroup.ResultHandler handler, final BrokerService brokerService) {
@@ -61,6 +63,17 @@ void execute(final List<ScheduleIndex> indexList, final SenderGroup.ResultHandle
}
}

public void syncExecute(final List<ScheduleIndex> indexList, final SenderGroup.ResultHandler handler, final BrokerService brokerService) {
Map<SenderGroup, List<ScheduleIndex>> groups = groupByBroker(indexList, brokerService);
for (Map.Entry<SenderGroup, List<ScheduleIndex>> entry : groups.entrySet()) {
doSyncExecute(entry.getKey(), entry.getValue(), handler);
}
}

private void doSyncExecute(final SenderGroup group, final List<ScheduleIndex> list, final SenderGroup.ResultHandler handler) {
group.sendSync(list, sender, handler);
}

private void doExecute(final SenderGroup group, final List<ScheduleIndex> list, final SenderGroup.ResultHandler handler) {
group.send(list, sender, handler);
}
@@ -88,7 +101,7 @@ private SenderGroup getGroup(BrokerGroupInfo groupInfo, int sendThreads) {
String groupName = groupInfo.getGroupName();
SenderGroup senderGroup = groupSenders.get(groupName);
if (null == senderGroup) {
senderGroup = new SenderGroup(groupInfo, sendThreads, store);
senderGroup = new SenderGroup(groupInfo, sendThreads, store, sendConfig);
SenderGroup currentSenderGroup = groupSenders.putIfAbsent(groupName, senderGroup);
senderGroup = null != currentSenderGroup ? currentSenderGroup : senderGroup;
} else {
@@ -23,6 +23,7 @@
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.broker.BrokerGroupInfo;
import qunar.tc.qmq.common.Disposable;
import qunar.tc.qmq.configuration.DynamicConfig;
import qunar.tc.qmq.delay.DelayLogFacade;
import qunar.tc.qmq.delay.ScheduleIndex;
import qunar.tc.qmq.delay.monitor.QMon;
@@ -36,10 +37,7 @@
import qunar.tc.qmq.protocol.producer.SendResult;

import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import static qunar.tc.qmq.delay.monitor.QMon.delayBrokerSendMsgCount;
@@ -52,12 +50,16 @@
public class SenderGroup implements Disposable {
private static final Logger LOGGER = LoggerFactory.getLogger(SenderGroup.class);
private static final int MAX_SEND_BATCH_SIZE = 50;
private static final int RETRY_FALL_BACK = 1; // fall back to a refresh, then retry
private static final int RETRY_WITHOUT_FALL_BACK = 2; // no further refresh; retry directly on the netty side
private final AtomicReference<BrokerGroupInfo> groupInfo;
private final DelayLogFacade store;
private DynamicConfig sendConfig;
private final ThreadPoolExecutor executorService;
private final RateLimiter LOG_LIMITER = RateLimiter.create(2);

SenderGroup(final BrokerGroupInfo groupInfo, int sendThreads, DelayLogFacade store) {
SenderGroup(final BrokerGroupInfo groupInfo, int sendThreads, DelayLogFacade store, DynamicConfig sendConfig) {
this.sendConfig = sendConfig;
this.groupInfo = new AtomicReference<>(groupInfo);
this.store = store;
final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(1);
@@ -79,27 +81,49 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
});
}

/**
* Execute directly, bypassing the thread pool.
* @param records
* @param sender
* @param handler
*/
public void sendSync(final List<ScheduleIndex> records, final Sender sender, final ResultHandler handler) {
monitorSyncSend(records);
doSend(records, sender, handler, RETRY_WITHOUT_FALL_BACK);
}

private static void monitorSyncSend(List<ScheduleIndex> records) {
try {
for (int i = 0; i < records.size(); i++) {
ScheduleIndex scheduleIndex = records.get(i);
QMon.syncSend(scheduleIndex.getSubject());
}
} catch (Throwable throwable) {
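// swallow monitoring failures so they can never affect the send path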

}
}

public void send(final List<ScheduleIndex> records, final Sender sender, final ResultHandler handler) {
executorService.execute(() -> doSend(records, sender, handler));
executorService.execute(() -> doSend(records, sender, handler, RETRY_FALL_BACK));
}

private void doSend(final List<ScheduleIndex> batch, final Sender sender, final ResultHandler handler) {
private void doSend(final List<ScheduleIndex> batch, final Sender sender, final ResultHandler handler, int retryStrategy) {
BrokerGroupInfo groupInfo = this.groupInfo.get();
String groupName = groupInfo.getGroupName();
List<List<ScheduleIndex>> partitions = Lists.partition(batch, MAX_SEND_BATCH_SIZE);

for (List<ScheduleIndex> partition : partitions) {
send(sender, handler, groupInfo, groupName, partition);
send(sender, handler, groupInfo, groupName, partition, retryStrategy);
}
}

private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInfo, String groupName, List<ScheduleIndex> list) {
private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInfo, String groupName, List<ScheduleIndex> list, int retryStrategy) {
try {
long start = System.currentTimeMillis();
List<ScheduleSetRecord> records = store.recoverLogRecord(list);
QMon.loadMsgTime(System.currentTimeMillis() - start);

Datagram response = sendMessages(records, sender);
Datagram response = sendMessages(records, sender, retryStrategy);
release(records);
monitor(list, groupName);
if (response == null) {
@@ -183,21 +207,77 @@ private Map<String, SendResult> getSendResult(Datagram response) {
}
}

private Datagram sendMessages(final List<ScheduleSetRecord> records, final Sender sender) {
private Datagram sendMessages(final List<ScheduleSetRecord> records, final Sender sender, int retryStrategy) {
long start = System.currentTimeMillis();
try {
return sender.send(records, this);
//return sender.send(records, this);
return doSendDatagram(records, sender);
} catch (ClientSendException e) {
ClientSendException.SendErrorCode errorCode = e.getSendErrorCode();
LOGGER.error("SenderGroup sendMessages error, client send exception, broker group={}, errorCode={}", groupInfo.get(), errorCode, e);
monitorSendError(records, groupInfo.get(), errorCode.ordinal(), e);
} catch (Exception e) {
} catch (Throwable e) {
LOGGER.error("SenderGroup sendMessages error, broker group={}", groupInfo.get(), e);
monitorSendError(records, groupInfo.get(), -1, e);
} finally {
QMon.sendMsgTime(groupInfo.get().getGroupName(), System.currentTimeMillis() - start);
}
return null;
return retrySendMessages(records, sender, retryStrategy);
}

private Datagram doSendDatagram(List<ScheduleSetRecord> records, Sender sender) throws Exception {
return sender.send(records, this);
}

/**
* For a more elegant implementation, consider guava-retry.
* @param records
* @param sender
* @param retryStrategy
* @return
*/
private Datagram retrySendMessages(List<ScheduleSetRecord> records, Sender sender, int retryStrategy) {
Datagram result = null;
int maxTimes = sendConfig.getInt("netty.send.message.retry.max.times", 20);
/**
* If we got here via a refresh retry, keep retrying netty in place:
* when the delay machine itself is broken, a refresh would not help anyway,
* so we stay blocked here.
*/
if (retryStrategy == RETRY_WITHOUT_FALL_BACK) {
maxTimes = sendConfig.getInt("netty.send.message.retry.without.fallback.max.times", 36000);
}
for (int count = 0; count < maxTimes; count++) {
try {
QMon.retryNettySend();
result = doSendDatagram(records, sender);
if (result != null) {
break;
}
} catch (Throwable e) {
if (LOG_LIMITER.tryAcquire()) {
for (ScheduleSetRecord record : records) {
LOGGER.error("retry senderGroup sendMessages error, retry_count={}, subject={}, brokerGroup={}", count, record.getSubject(), groupInfo.get(), e);
}
}
QMon.retryNettySendError();
retryRandomSleep(retryStrategy);
}
}
return result;
}

private void retryRandomSleep(int retryStrategy) {
long randomMinSleepMillis = sendConfig.getLong("netty.send.message.retry.min.sleep.millis", 1000L);
long randomMaxSleepMillis = sendConfig.getLong("netty.send.message.retry.max.sleep.millis", 10000L);
if (retryStrategy == RETRY_WITHOUT_FALL_BACK) {
randomMaxSleepMillis = sendConfig.getLong("netty.send.message.retry.without.fallback.max.sleep.millis", 30000L);
}
final ThreadLocalRandom random = ThreadLocalRandom.current();
try {
Thread.sleep(random.nextLong(randomMinSleepMillis, randomMaxSleepMillis));
} catch (InterruptedException ex) {
Thread.currentThread().interrupt(); // restore the interrupt flag rather than swallowing it
}
}

private void monitorSendFail(List<ScheduleIndex> indexList, String groupName) {
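The retry loop above is hand-rolled; as its Javadoc suggests, guava-retry can express the same policy more declaratively. A minimal sketch for the RETRY_FALL_BACK case, assuming the com.github.rholder:guava-retrying dependency (not part of this commit) and a hypothetical helper name retrySendWithGuavaRetry:

import com.github.rholder.retry.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

private Datagram retrySendWithGuavaRetry(List<ScheduleSetRecord> records, Sender sender) {
    // Same policy as retrySendMessages(..., RETRY_FALL_BACK): retry on a null
    // result or any exception, sleep a random 1-10s between attempts, and
    // give up after 20 attempts.
    Retryer<Datagram> retryer = RetryerBuilder.<Datagram>newBuilder()
            .retryIfResult(result -> result == null)
            .retryIfException()
            .withWaitStrategy(WaitStrategies.randomWait(1000, TimeUnit.MILLISECONDS, 10000, TimeUnit.MILLISECONDS))
            .withStopStrategy(StopStrategies.stopAfterAttempt(20))
            .build();
    try {
        return retryer.call(() -> doSendDatagram(records, sender));
    } catch (ExecutionException | RetryException e) {
        return null; // attempts exhausted: same contract as the hand-rolled loop
    }
}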
@@ -16,6 +16,7 @@

package qunar.tc.qmq.delay.sender;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,6 +76,13 @@ public void init() {
this.batchExecutor.init();
}

private void sendSync(ScheduleIndex index) {
if (!BrokerRoleManager.isDelayMaster()) {
return;
}
syncProcess(Lists.newArrayList(index));
}

@Override
public void send(ScheduleIndex index) {
if (!BrokerRoleManager.isDelayMaster()) {
@@ -112,6 +120,15 @@ public void process(List<ScheduleIndex> indexList) {
}
}

private void syncProcess(List<ScheduleIndex> indexList) {
try {
senderExecutor.syncExecute(indexList, this, brokerService);
} catch (Exception e) {
LOGGER.error("send message failed,messageSize:{} will retry", indexList.size(), e);
retry(indexList);
}
}

private void success(ScheduleSetRecord record) {
facade.appendDispatchLog(new DispatchLogRecord(record.getSubject(), record.getMessageId(), record.getScheduleTime(), record.getSequence()));
}
@@ -138,7 +155,7 @@ private void retry(List<ScheduleIndex> indexList) {
final Set<String> refreshSubject = Sets.newHashSet();
for (ScheduleIndex index : indexList) {
refresh(index, refreshSubject);
send(index);
sendSync(index);
}
}

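Net effect of the change: on a failed batch the sender's retry() no longer re-enters the asynchronous send() path; it refreshes the broker group once and then hands each index to the new sendSync(), which bypasses the SenderGroup thread pool (its work queue holds a single batch) and keeps retrying on the netty side under RETRY_WITHOUT_FALL_BACK. The retry behavior is driven by DynamicConfig; these are the keys this commit reads, with their in-code defaults, rendered here in properties form for illustration:

# attempts per batch on the normal async path (RETRY_FALL_BACK)
netty.send.message.retry.max.times=20
# attempts on the sync path (RETRY_WITHOUT_FALL_BACK)
netty.send.message.retry.without.fallback.max.times=36000
# random sleep between attempts, in milliseconds
netty.send.message.retry.min.sleep.millis=1000
netty.send.message.retry.max.sleep.millis=10000
# upper sleep bound used on the sync path
netty.send.message.retry.without.fallback.max.sleep.millis=30000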
