Skip to content

Commit

Permalink
Merge pull request #177 from chaisencs123/delayed_bettter
Browse files Browse the repository at this point in the history
DelayQueue重试优化-完善监控和日志
  • Loading branch information
keliwang authored Dec 1, 2023
2 parents 4942bb4 + 5c0bfaf commit 5abc92f
Show file tree
Hide file tree
Showing 32 changed files with 332 additions and 99 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.qunar.qmq</groupId>
<artifactId>qmq-parent</artifactId>
<version>1.1.43</version>
<version>1.1.44-SNAPSHOT</version>
<packaging>pom</packaging>

<name>qmq</name>
Expand Down
2 changes: 1 addition & 1 deletion qmq-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>qmq-parent</artifactId>
<groupId>com.qunar.qmq</groupId>
<version>1.1.43</version>
<version>1.1.44-SNAPSHOT</version>
</parent>

<artifactId>qmq-api</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion qmq-backup/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.qunar.qmq</groupId>
<artifactId>qmq-parent</artifactId>
<version>1.1.43</version>
<version>1.1.44-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion qmq-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>qmq-parent</artifactId>
<groupId>com.qunar.qmq</groupId>
<version>1.1.43</version>
<version>1.1.44-SNAPSHOT</version>
</parent>

<artifactId>qmq-client</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion qmq-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>qmq-parent</artifactId>
<groupId>com.qunar.qmq</groupId>
<version>1.1.43</version>
<version>1.1.44-SNAPSHOT</version>
</parent>

<artifactId>qmq-common</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion qmq-delay-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.qunar.qmq</groupId>
<artifactId>qmq-parent</artifactId>
<version>1.1.43</version>
<version>1.1.44-SNAPSHOT</version>
</parent>

<artifactId>qmq-delay-server</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,40 @@ public static void appendFailedByMessageIllegal(String subject) {
public static void processTime(String subject, long time) {
Metrics.timer("processTime", SUBJECT_ARRAY, new String[]{subject}).update(time, TimeUnit.MILLISECONDS);
}

public static void sendBatchExecutorAddFailed(String subject) {
countInc("sendBatchExecutorAddFailed", subject);
}

public static void hashedWheelTimerExpireError() {
Metrics.counter("hashed_wheel_timer_expire_error", EMPTY, EMPTY).inc();
}

public static void addWheelFailed(String subject) {
countInc("addWheelFailed", subject);
}

public static void sendProcessorFailed() {
Metrics.counter("sendProcessorFailed", EMPTY, EMPTY).inc();
}

public static void sendProcessorPartlyFailed() {
Metrics.counter("sendProcessorPartlyFailed", EMPTY, EMPTY).inc();
}

public static void appendScheduleLogTime(String subject, long time) {
Metrics.timer("appendScheduleLogTime", SUBJECT_ARRAY, new String[]{subject}).update(time, TimeUnit.MILLISECONDS);
}

public static void retryNettySend() {
Metrics.counter("retryNettySend", EMPTY, EMPTY).inc();
}

public static void retryNettySendError() {
Metrics.counter("retryNettySendError", EMPTY, EMPTY).inc();
}

public static void syncSend(String subject) {
Metrics.counter("sync_send_count", SUBJECT_ARRAY, new String[] {subject}).inc();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ ListenableFuture<Datagram> receive(List<RawMessageExtend> messages, RemotingComm
for (RawMessageExtend message : messages) {
final MessageHeader header = message.getHeader();
monitorMessageReceived(header.getCreateTime(), header.getSubject());

final ReceivedDelayMessage receivedDelayMessage = new ReceivedDelayMessage(message, cmd.getReceiveTime());
futures.add(receivedDelayMessage.getPromise());
invoker.invoke(receivedDelayMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ class SenderExecutor implements Disposable {
private final Sender sender;
private final DelayLogFacade store;
private final int sendThreads;
private DynamicConfig sendConfig;

SenderExecutor(final Sender sender, DelayLogFacade store, DynamicConfig sendConfig) {
this.sender = sender;
this.store = store;
this.brokerLoadBalance = PollBrokerLoadBalance.getInstance();
this.sendThreads = sendConfig.getInt("delay.send.threads", DEFAULT_SEND_THREAD);
this.sendConfig = sendConfig;
}

void execute(final List<ScheduleIndex> indexList, final SenderGroup.ResultHandler handler, final BrokerService brokerService) {
Expand All @@ -61,6 +63,17 @@ void execute(final List<ScheduleIndex> indexList, final SenderGroup.ResultHandle
}
}

public void syncExecute(final List<ScheduleIndex> indexList, final SenderGroup.ResultHandler handler, final BrokerService brokerService) {
Map<SenderGroup, List<ScheduleIndex>> groups = groupByBroker(indexList, brokerService);
for (Map.Entry<SenderGroup, List<ScheduleIndex>> entry : groups.entrySet()) {
doSyncExecute(entry.getKey(), entry.getValue(), handler);
}
}

private void doSyncExecute(final SenderGroup group, final List<ScheduleIndex> list, final SenderGroup.ResultHandler handler) {
group.sendSync(list, sender, handler);
}

private void doExecute(final SenderGroup group, final List<ScheduleIndex> list, final SenderGroup.ResultHandler handler) {
group.send(list, sender, handler);
}
Expand Down Expand Up @@ -88,7 +101,7 @@ private SenderGroup getGroup(BrokerGroupInfo groupInfo, int sendThreads) {
String groupName = groupInfo.getGroupName();
SenderGroup senderGroup = groupSenders.get(groupName);
if (null == senderGroup) {
senderGroup = new SenderGroup(groupInfo, sendThreads, store);
senderGroup = new SenderGroup(groupInfo, sendThreads, store, sendConfig);
SenderGroup currentSenderGroup = groupSenders.putIfAbsent(groupName, senderGroup);
senderGroup = null != currentSenderGroup ? currentSenderGroup : senderGroup;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.broker.BrokerGroupInfo;
import qunar.tc.qmq.common.Disposable;
import qunar.tc.qmq.configuration.DynamicConfig;
import qunar.tc.qmq.delay.DelayLogFacade;
import qunar.tc.qmq.delay.ScheduleIndex;
import qunar.tc.qmq.delay.monitor.QMon;
Expand All @@ -36,10 +37,7 @@
import qunar.tc.qmq.protocol.producer.SendResult;

import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import static qunar.tc.qmq.delay.monitor.QMon.delayBrokerSendMsgCount;
Expand All @@ -52,12 +50,16 @@
public class SenderGroup implements Disposable {
private static final Logger LOGGER = LoggerFactory.getLogger(SenderGroup.class);
private static final int MAX_SEND_BATCH_SIZE = 50;
private static final int RETRY_FALL_BACK = 1;//refresh再来重试
private static final int RETRY_WITHOUT_FALL_BACK = 2;//不再refresh,直接在netty端重试
private final AtomicReference<BrokerGroupInfo> groupInfo;
private final DelayLogFacade store;
private DynamicConfig sendConfig;
private final ThreadPoolExecutor executorService;
private final RateLimiter LOG_LIMITER = RateLimiter.create(2);

SenderGroup(final BrokerGroupInfo groupInfo, int sendThreads, DelayLogFacade store) {
SenderGroup(final BrokerGroupInfo groupInfo, int sendThreads, DelayLogFacade store, DynamicConfig sendConfig) {
this.sendConfig = sendConfig;
this.groupInfo = new AtomicReference<>(groupInfo);
this.store = store;
final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(1);
Expand All @@ -69,7 +71,8 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
boolean success = false;
while (!success) {
try {
success = workQueue.add(r);
success = workQueue.offer(r);
//success = workQueue.add(r); 这里会频繁抛异常,微秒级别性能会有问题。
} catch (Throwable ignore) {

}
Expand All @@ -78,27 +81,49 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
});
}

/**
* 绕过线程池直接执行
* @param records
* @param sender
* @param handler
*/
public void sendSync(final List<ScheduleIndex> records, final Sender sender, final ResultHandler handler) {
monitorSyncSend(records);
doSend(records, sender, handler, RETRY_WITHOUT_FALL_BACK);
}

private static void monitorSyncSend(List<ScheduleIndex> records) {
try {
for (int i = 0; i < records.size(); i++) {
ScheduleIndex scheduleIndex = records.get(i);
QMon.syncSend(scheduleIndex.getSubject());
}
} catch (Throwable throwable) {

}
}

public void send(final List<ScheduleIndex> records, final Sender sender, final ResultHandler handler) {
executorService.execute(() -> doSend(records, sender, handler));
executorService.execute(() -> doSend(records, sender, handler, RETRY_FALL_BACK));
}

private void doSend(final List<ScheduleIndex> batch, final Sender sender, final ResultHandler handler) {
private void doSend(final List<ScheduleIndex> batch, final Sender sender, final ResultHandler handler, int retryStrategy) {
BrokerGroupInfo groupInfo = this.groupInfo.get();
String groupName = groupInfo.getGroupName();
List<List<ScheduleIndex>> partitions = Lists.partition(batch, MAX_SEND_BATCH_SIZE);

for (List<ScheduleIndex> partition : partitions) {
send(sender, handler, groupInfo, groupName, partition);
send(sender, handler, groupInfo, groupName, partition, retryStrategy);
}
}

private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInfo, String groupName, List<ScheduleIndex> list) {
private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInfo, String groupName, List<ScheduleIndex> list, int retryStrategy) {
try {
long start = System.currentTimeMillis();
List<ScheduleSetRecord> records = store.recoverLogRecord(list);
QMon.loadMsgTime(System.currentTimeMillis() - start);

Datagram response = sendMessages(records, sender);
Datagram response = sendMessages(records, sender, retryStrategy);
release(records);
monitor(list, groupName);
if (response == null) {
Expand Down Expand Up @@ -182,21 +207,77 @@ private Map<String, SendResult> getSendResult(Datagram response) {
}
}

private Datagram sendMessages(final List<ScheduleSetRecord> records, final Sender sender) {
private Datagram sendMessages(final List<ScheduleSetRecord> records, final Sender sender, int retryStrategy) {
long start = System.currentTimeMillis();

try {
return sender.send(records, this);
//return sender.send(records, this);
return doSendDatagram(records, sender);
} catch (ClientSendException e) {
ClientSendException.SendErrorCode errorCode = e.getSendErrorCode();
monitorSendError(records, groupInfo.get(), errorCode.ordinal());
} catch (Exception e) {
monitorSendError(records, groupInfo.get(), -1);
LOGGER.error("SenderGroup sendMessages error, client send exception, broker group={}, errorCode={}", groupInfo.get(), errorCode, e);
monitorSendError(records, groupInfo.get(), errorCode.ordinal(), e);
} catch (Throwable e) {
LOGGER.error("SenderGroup sendMessages error, broker group={}", groupInfo.get(), e);
monitorSendError(records, groupInfo.get(), -1, e);
} finally {
QMon.sendMsgTime(groupInfo.get().getGroupName(), System.currentTimeMillis() - start);
}
return retrySendMessages(records, sender, retryStrategy);
}

private Datagram doSendDatagram(List<ScheduleSetRecord> records, Sender sender) throws Exception {
return sender.send(records, this);
}

/**
* 如果想更优雅,可以参考guava-retry
* @param records
* @param sender
* @param retryStrategy
* @return
*/
private Datagram retrySendMessages(List<ScheduleSetRecord> records, Sender sender, int retryStrategy) {
Datagram result = null;
int maxTimes = sendConfig.getInt("netty.send.message.retry.max.times", 20);
/**
* 如果是Refresh重试过来的,那就在这里一直重试netty
* 如果delay机器本身出问题,refresh也没用,就在这里卡住
*/
if (retryStrategy == RETRY_WITHOUT_FALL_BACK) {
maxTimes = sendConfig.getInt("netty.send.message.retry.without.fallback.max.times", 36000);
}
for (int count = 0; count < maxTimes; count++) {
try {
QMon.retryNettySend();
result = doSendDatagram(records, sender);
if (result != null) {
break;
}
} catch (Throwable e) {
if (LOG_LIMITER.tryAcquire()) {
for (ScheduleSetRecord record : records) {
LOGGER.error("retry senderGroup sendMessages error, retry_count={}, subject={}, brokerGroup={}", count, record.getSubject(), groupInfo.get(), e);
}
}
QMon.retryNettySendError();
retryRandomSleep(retryStrategy);
}
}
return result;
}

private void retryRandomSleep(int retryStrategy) {
long randomMinSleepMillis = sendConfig.getLong("netty.send.message.retry.min.sleep.millis", 1000L);
long randomMaxSleepMillis = sendConfig.getLong("netty.send.message.retry.max.sleep.millis", 10000L);
if (retryStrategy == RETRY_WITHOUT_FALL_BACK) {
randomMaxSleepMillis = sendConfig.getLong("netty.send.message.retry.without.fallback.max.sleep.millis", 30000L);
}
final ThreadLocalRandom random = ThreadLocalRandom.current();
try {
Thread.sleep(random.nextLong(randomMinSleepMillis, randomMaxSleepMillis));
} catch (InterruptedException ex) {

return null;
}
}

private void monitorSendFail(List<ScheduleIndex> indexList, String groupName) {
Expand All @@ -210,13 +291,15 @@ private void monitorSendFail(String subject, String groupName) {
QMon.nettySendMessageFailCount(subject, groupName);
}

private void monitorSendError(List<ScheduleSetRecord> records, BrokerGroupInfo group, int errorCode) {
records.parallelStream().forEach(record -> monitorSendError(record.getSubject(), group, errorCode));
private void monitorSendError(List<ScheduleSetRecord> records, BrokerGroupInfo group, int errorCode, Throwable throwable) {
for (ScheduleSetRecord record : records) {
monitorSendError(record.getSubject(), group, errorCode, throwable);
}
}

private void monitorSendError(String subject, BrokerGroupInfo group, int errorCode) {
private void monitorSendError(String subject, BrokerGroupInfo group, int errorCode, Throwable throwable) {
if (LOG_LIMITER.tryAcquire()) {
LOGGER.error("netty delay sender send error,subject:{},group:{},code:{}", subject, group, errorCode);
LOGGER.error("netty delay sender send error,subject:{},group:{},code:{}", subject, group, errorCode, throwable);
}
QMon.nettySendMessageFailCount(subject, group.getGroupName());
}
Expand Down
Loading

0 comments on commit 5abc92f

Please sign in to comment.