From 5c0bfaf810b7e6af342ed2a44d2a2230061953d9 Mon Sep 17 00:00:00 2001 From: "sen.chai" Date: Thu, 30 Nov 2023 16:52:12 +0800 Subject: [PATCH] =?UTF-8?q?delay=20queue=E9=97=AE=E9=A2=98=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/qunar/tc/qmq/delay/monitor/QMon.java | 12 ++ .../qunar/tc/qmq/delay/receiver/Receiver.java | 1 - .../tc/qmq/delay/sender/SenderExecutor.java | 15 ++- .../tc/qmq/delay/sender/SenderGroup.java | 108 +++++++++++++++--- .../tc/qmq/delay/sender/SenderProcessor.java | 19 ++- 5 files changed, 138 insertions(+), 17 deletions(-) diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java index f8c44fa0..cc63a7ad 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/monitor/QMon.java @@ -151,4 +151,16 @@ public static void sendProcessorPartlyFailed() { public static void appendScheduleLogTime(String subject, long time) { Metrics.timer("appendScheduleLogTime", SUBJECT_ARRAY, new String[]{subject}).update(time, TimeUnit.MILLISECONDS); } + + public static void retryNettySend() { + Metrics.counter("retryNettySend", EMPTY, EMPTY).inc(); + } + + public static void retryNettySendError() { + Metrics.counter("retryNettySendError", EMPTY, EMPTY).inc(); + } + + public static void syncSend(String subject) { + Metrics.counter("sync_send_count", SUBJECT_ARRAY, new String[] {subject}).inc(); + } } diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/receiver/Receiver.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/receiver/Receiver.java index f54ed56d..44de6982 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/receiver/Receiver.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/receiver/Receiver.java @@ -73,7 +73,6 @@ ListenableFuture receive(List messages, RemotingComm for (RawMessageExtend message : messages) { final MessageHeader header = message.getHeader(); monitorMessageReceived(header.getCreateTime(), header.getSubject()); - final ReceivedDelayMessage receivedDelayMessage = new ReceivedDelayMessage(message, cmd.getReceiveTime()); futures.add(receivedDelayMessage.getPromise()); invoker.invoke(receivedDelayMessage); diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderExecutor.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderExecutor.java index 30732c10..642b5708 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderExecutor.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderExecutor.java @@ -46,12 +46,14 @@ class SenderExecutor implements Disposable { private final Sender sender; private final DelayLogFacade store; private final int sendThreads; + private DynamicConfig sendConfig; SenderExecutor(final Sender sender, DelayLogFacade store, DynamicConfig sendConfig) { this.sender = sender; this.store = store; this.brokerLoadBalance = PollBrokerLoadBalance.getInstance(); this.sendThreads = sendConfig.getInt("delay.send.threads", DEFAULT_SEND_THREAD); + this.sendConfig = sendConfig; } void execute(final List indexList, final SenderGroup.ResultHandler handler, final BrokerService brokerService) { @@ -61,6 +63,17 @@ void execute(final List indexList, final SenderGroup.ResultHandle } } + public void syncExecute(final List indexList, final SenderGroup.ResultHandler handler, final BrokerService brokerService) { + Map> groups = groupByBroker(indexList, brokerService); + for (Map.Entry> entry : groups.entrySet()) { + doSyncExecute(entry.getKey(), entry.getValue(), handler); + } + } + + private void doSyncExecute(final SenderGroup group, final List list, final SenderGroup.ResultHandler handler) { + group.sendSync(list, sender, handler); + } + private void doExecute(final SenderGroup group, final List list, final SenderGroup.ResultHandler handler) { group.send(list, sender, handler); } @@ -88,7 +101,7 @@ private SenderGroup getGroup(BrokerGroupInfo groupInfo, int sendThreads) { String groupName = groupInfo.getGroupName(); SenderGroup senderGroup = groupSenders.get(groupName); if (null == senderGroup) { - senderGroup = new SenderGroup(groupInfo, sendThreads, store); + senderGroup = new SenderGroup(groupInfo, sendThreads, store, sendConfig); SenderGroup currentSenderGroup = groupSenders.putIfAbsent(groupName, senderGroup); senderGroup = null != currentSenderGroup ? currentSenderGroup : senderGroup; } else { diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderGroup.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderGroup.java index 56a7cd9a..90e841e5 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderGroup.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderGroup.java @@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory; import qunar.tc.qmq.broker.BrokerGroupInfo; import qunar.tc.qmq.common.Disposable; +import qunar.tc.qmq.configuration.DynamicConfig; import qunar.tc.qmq.delay.DelayLogFacade; import qunar.tc.qmq.delay.ScheduleIndex; import qunar.tc.qmq.delay.monitor.QMon; @@ -36,10 +37,7 @@ import qunar.tc.qmq.protocol.producer.SendResult; import java.util.*; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; import static qunar.tc.qmq.delay.monitor.QMon.delayBrokerSendMsgCount; @@ -52,12 +50,16 @@ public class SenderGroup implements Disposable { private static final Logger LOGGER = LoggerFactory.getLogger(SenderGroup.class); private static final int MAX_SEND_BATCH_SIZE = 50; + private static final int RETRY_FALL_BACK = 1;//refresh再来重试 + private static final int RETRY_WITHOUT_FALL_BACK = 2;//不再refresh,直接在netty端重试 private final AtomicReference groupInfo; private final DelayLogFacade store; + private DynamicConfig sendConfig; private final ThreadPoolExecutor executorService; private final RateLimiter LOG_LIMITER = RateLimiter.create(2); - SenderGroup(final BrokerGroupInfo groupInfo, int sendThreads, DelayLogFacade store) { + SenderGroup(final BrokerGroupInfo groupInfo, int sendThreads, DelayLogFacade store, DynamicConfig sendConfig) { + this.sendConfig = sendConfig; this.groupInfo = new AtomicReference<>(groupInfo); this.store = store; final LinkedBlockingQueue workQueue = new LinkedBlockingQueue<>(1); @@ -79,27 +81,49 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { }); } + /** + * 绕过线程池直接执行 + * @param records + * @param sender + * @param handler + */ + public void sendSync(final List records, final Sender sender, final ResultHandler handler) { + monitorSyncSend(records); + doSend(records, sender, handler, RETRY_WITHOUT_FALL_BACK); + } + + private static void monitorSyncSend(List records) { + try { + for (int i = 0; i < records.size(); i++) { + ScheduleIndex scheduleIndex = records.get(i); + QMon.syncSend(scheduleIndex.getSubject()); + } + } catch (Throwable throwable) { + + } + } + public void send(final List records, final Sender sender, final ResultHandler handler) { - executorService.execute(() -> doSend(records, sender, handler)); + executorService.execute(() -> doSend(records, sender, handler, RETRY_FALL_BACK)); } - private void doSend(final List batch, final Sender sender, final ResultHandler handler) { + private void doSend(final List batch, final Sender sender, final ResultHandler handler, int retryStrategy) { BrokerGroupInfo groupInfo = this.groupInfo.get(); String groupName = groupInfo.getGroupName(); List> partitions = Lists.partition(batch, MAX_SEND_BATCH_SIZE); for (List partition : partitions) { - send(sender, handler, groupInfo, groupName, partition); + send(sender, handler, groupInfo, groupName, partition, retryStrategy); } } - private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInfo, String groupName, List list) { + private void send(Sender sender, ResultHandler handler, BrokerGroupInfo groupInfo, String groupName, List list, int retryStrategy) { try { long start = System.currentTimeMillis(); List records = store.recoverLogRecord(list); QMon.loadMsgTime(System.currentTimeMillis() - start); - Datagram response = sendMessages(records, sender); + Datagram response = sendMessages(records, sender, retryStrategy); release(records); monitor(list, groupName); if (response == null) { @@ -183,21 +207,77 @@ private Map getSendResult(Datagram response) { } } - private Datagram sendMessages(final List records, final Sender sender) { + private Datagram sendMessages(final List records, final Sender sender, int retryStrategy) { long start = System.currentTimeMillis(); try { - return sender.send(records, this); + //return sender.send(records, this); + return doSendDatagram(records, sender); } catch (ClientSendException e) { ClientSendException.SendErrorCode errorCode = e.getSendErrorCode(); LOGGER.error("SenderGroup sendMessages error, client send exception, broker group={}, errorCode={}", groupInfo.get(), errorCode, e); monitorSendError(records, groupInfo.get(), errorCode.ordinal(), e); - } catch (Exception e) { + } catch (Throwable e) { LOGGER.error("SenderGroup sendMessages error, broker group={}", groupInfo.get(), e); monitorSendError(records, groupInfo.get(), -1, e); } finally { QMon.sendMsgTime(groupInfo.get().getGroupName(), System.currentTimeMillis() - start); } - return null; + return retrySendMessages(records, sender, retryStrategy); + } + + private Datagram doSendDatagram(List records, Sender sender) throws Exception { + return sender.send(records, this); + } + + /** + * 如果想更优雅,可以参考guava-retry + * @param records + * @param sender + * @param retryStrategy + * @return + */ + private Datagram retrySendMessages(List records, Sender sender, int retryStrategy) { + Datagram result = null; + int maxTimes = sendConfig.getInt("netty.send.message.retry.max.times", 20); + /** + * 如果是Refresh重试过来的,那就在这里一直重试netty + * 如果delay机器本身出问题,refresh也没用,就在这里卡住 + */ + if (retryStrategy == RETRY_WITHOUT_FALL_BACK) { + maxTimes = sendConfig.getInt("netty.send.message.retry.without.fallback.max.times", 36000); + } + for (int count = 0; count < maxTimes; count++) { + try { + QMon.retryNettySend(); + result = doSendDatagram(records, sender); + if (result != null) { + break; + } + } catch (Throwable e) { + if (LOG_LIMITER.tryAcquire()) { + for (ScheduleSetRecord record : records) { + LOGGER.error("retry senderGroup sendMessages error, retry_count={}, subject={}, brokerGroup={}", count, record.getSubject(), groupInfo.get(), e); + } + } + QMon.retryNettySendError(); + retryRandomSleep(retryStrategy); + } + } + return result; + } + + private void retryRandomSleep(int retryStrategy) { + long randomMinSleepMillis = sendConfig.getLong("netty.send.message.retry.min.sleep.millis", 1000L); + long randomMaxSleepMillis = sendConfig.getLong("netty.send.message.retry.max.sleep.millis", 10000L); + if (retryStrategy == RETRY_WITHOUT_FALL_BACK) { + randomMaxSleepMillis = sendConfig.getLong("netty.send.message.retry.without.fallback.max.sleep.millis", 30000L); + } + final ThreadLocalRandom random = ThreadLocalRandom.current(); + try { + Thread.sleep(random.nextLong(randomMinSleepMillis, randomMaxSleepMillis)); + } catch (InterruptedException ex) { + + } } private void monitorSendFail(List indexList, String groupName) { diff --git a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderProcessor.java b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderProcessor.java index 90f90893..a281dc78 100644 --- a/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderProcessor.java +++ b/qmq-delay-server/src/main/java/qunar/tc/qmq/delay/sender/SenderProcessor.java @@ -16,6 +16,7 @@ package qunar.tc.qmq.delay.sender; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,6 +76,13 @@ public void init() { this.batchExecutor.init(); } + private void sendSync(ScheduleIndex index) { + if (!BrokerRoleManager.isDelayMaster()) { + return; + } + syncProcess(Lists.newArrayList(index)); + } + @Override public void send(ScheduleIndex index) { if (!BrokerRoleManager.isDelayMaster()) { @@ -112,6 +120,15 @@ public void process(List indexList) { } } + private void syncProcess(List indexList) { + try { + senderExecutor.syncExecute(indexList, this, brokerService); + } catch (Exception e) { + LOGGER.error("send message failed,messageSize:{} will retry", indexList.size(), e); + retry(indexList); + } + } + private void success(ScheduleSetRecord record) { facade.appendDispatchLog(new DispatchLogRecord(record.getSubject(), record.getMessageId(), record.getScheduleTime(), record.getSequence())); } @@ -138,7 +155,7 @@ private void retry(List indexList) { final Set refreshSubject = Sets.newHashSet(); for (ScheduleIndex index : indexList) { refresh(index, refreshSubject); - send(index); + sendSync(index); } }