Skip to content

Commit

Permalink
fix: add RateLimiterFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
thji committed Sep 26, 2024
1 parent 38e1b22 commit dde5ae4
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.arextest.schedule.common;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import lombok.AllArgsConstructor;
import org.apache.commons.lang3.StringUtils;

/**
* the rate limiter factory
* @author thji
* @date 2024/8/27
* @since 1.0.0
*/
@AllArgsConstructor
public class RateLimiterFactory {

private final int singleCaseTasks;
private final double errorBreakRatio;
private final int continuousFailThreshold;
private final int replaySendMaxQps;

private Map<String, SendSemaphoreLimiter> rateLimiterMap = new HashMap<>();

public RateLimiterFactory(int singleCaseTasks, double errorBreakRatio,
int continuousFailThreshold, int replaySendMaxQps) {
this.singleCaseTasks = singleCaseTasks;
this.errorBreakRatio = errorBreakRatio;
this.continuousFailThreshold = continuousFailThreshold;
this.replaySendMaxQps = replaySendMaxQps;
}

public SendSemaphoreLimiter getRateLimiter(String host) {
if (StringUtils.isEmpty(host)) {
return null;
}
return rateLimiterMap.computeIfAbsent(host, k -> {
SendSemaphoreLimiter sendSemaphoreLimiter = new SendSemaphoreLimiter(
replaySendMaxQps, 1);
sendSemaphoreLimiter.setTotalTasks(singleCaseTasks);
sendSemaphoreLimiter.setErrorBreakRatio(errorBreakRatio);
sendSemaphoreLimiter.setContinuousFailThreshold(continuousFailThreshold);
sendSemaphoreLimiter.setHost(host);
return sendSemaphoreLimiter;
}
);
}

public Collection<SendSemaphoreLimiter> getAll() {
return Collections.unmodifiableCollection(rateLimiterMap.values());
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.arextest.schedule.model;

import com.arextest.schedule.common.RateLimiterFactory;
import com.arextest.schedule.model.bizlog.BizLog;
import com.arextest.schedule.model.dao.mongodb.ReplayPlanCollection;
import com.arextest.schedule.model.plan.BuildReplayPlanType;
Expand Down Expand Up @@ -107,6 +108,8 @@ public class ReplayPlan {
@JsonIgnore
private Map<String, String> caseTags;
private boolean initReportItem;
@JsonIgnore
private RateLimiterFactory rateLimiterFactory;

public void enqueueBizLog(BizLog log) {
this.bizLogs.add(log);
Expand All @@ -115,6 +118,6 @@ public void enqueueBizLog(BizLog log) {
public void buildActionItemMap() {
this.getReplayActionItemList().forEach(
replayActionItem -> this.actionItemMap.put(replayActionItem.getId(), replayActionItem));
LOGGER.info("buildActionItemMap, planId:{}, keySet:{}", getId(), actionItemMap.keySet());
LOGGER.info("buildActionItemMap, planId:{}, keySet:{}", getId(), actionItemMap.values());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ public class ServiceInstance {
private List<ServiceInstanceOperation> operationList;
private Env metadata;

@JsonIgnore
@Getter
@Setter
private SendSemaphoreLimiter sendSemaphoreLimiter;

public String subEnv() {
return metadata == null ? StringUtils.EMPTY : metadata.subEnv;
}
Expand All @@ -61,5 +56,4 @@ public static class Env {

private String subEnv;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.arextest.model.replay.CaseSendScene;
import com.arextest.schedule.bizlog.BizLogger;
import com.arextest.schedule.common.CommonConstant;
import com.arextest.schedule.common.RateLimiterFactory;
import com.arextest.schedule.common.SendSemaphoreLimiter;
import com.arextest.schedule.comparer.CompareConfigService;
import com.arextest.schedule.dao.mongodb.ReplayActionCaseItemRepository;
Expand All @@ -24,6 +25,7 @@
import com.arextest.schedule.utils.ReplayParentBinder;
import com.arextest.schedule.utils.ServiceUrlUtils;
import com.arextest.schedule.utils.StageUtils;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -245,7 +247,8 @@ private static void setCurrentServiceInstances(PlanExecutionContext executionCon
.map(ServiceInstance::getIp)
.collect(Collectors.joining(",")) : null;
BizLogger.recordCurrentServerInstances(executionContext.getPlan(), targetHosts, sourceHosts);
LOGGER.info("set service instances for the current context," +
LOGGER.info(
"[[title=setCurrentServiceInstances]] set service instances for the current context," +
"targetInstances: {}, sourceInstances: {}",
targetInstances, sourceInstances);
}
Expand Down Expand Up @@ -447,7 +450,7 @@ private boolean initCaseCount() {
private void initReplayPlan() {
compareConfigService.preload(replayPlan);
// init rate limiter for each target instance
List<SendSemaphoreLimiter> sendSemaphoreLimiterList = this.initRateLimiters(replayPlan);
Collection<SendSemaphoreLimiter> sendSemaphoreLimiterList = this.initRateLimiters(replayPlan);
replayPlan.setPlanStatus(ExecutionStatus.buildNormal(sendSemaphoreLimiterList));
replayPlan.buildActionItemMap();
}
Expand All @@ -457,35 +460,37 @@ private void initReplayPlan() {
*
* @param replayPlan the replay plan
*/
private List<SendSemaphoreLimiter> initRateLimiters(ReplayPlan replayPlan) {
private Collection<SendSemaphoreLimiter> initRateLimiters(ReplayPlan replayPlan) {
if (replayPlan == null) {
return Collections.emptyList();
}
Optional<ReplayActionItem> optionalReplayActionItem = replayPlan.getReplayActionItemList()
.stream()
.filter(obj -> CollectionUtils.isNotEmpty(obj.getTargetInstance())).findFirst();
if (!optionalReplayActionItem.isPresent()) {
List<ReplayActionItem> replayActionItems = replayPlan.getReplayActionItemList();
if (CollectionUtils.isEmpty(replayActionItems)) {
return Collections.emptyList();
}
List<ServiceInstance> targetInstances = optionalReplayActionItem.get().getTargetInstance();
if (CollectionUtils.isEmpty(targetInstances)) {
// distinct target instances
List<ServiceInstance> distinctTargetInstances = replayActionItems
.stream()
.map(ReplayActionItem::getTargetInstance)
.flatMap(Collection::stream)
.distinct()
.collect(Collectors.toList());

if (CollectionUtils.isEmpty(distinctTargetInstances)) {
return Collections.emptyList();
}
int singleTasks = replayPlan.getCaseTotalCount() / targetInstances.size();

for (ServiceInstance targetInstance : targetInstances) {
SendSemaphoreLimiter sendSemaphoreLimiter = new SendSemaphoreLimiter(
replayPlan.getReplaySendMaxQps(), 1);
sendSemaphoreLimiter.setTotalTasks(singleTasks);
sendSemaphoreLimiter.setErrorBreakRatio(errorBreakRatio);
sendSemaphoreLimiter.setContinuousFailThreshold(continuousFailThreshold);
sendSemaphoreLimiter.setHost(targetInstance.getIp());
targetInstance.setSendSemaphoreLimiter(sendSemaphoreLimiter);
int singleTasks = replayPlan.getCaseTotalCount() / distinctTargetInstances.size();
RateLimiterFactory rateLimiterFactory = new RateLimiterFactory(singleTasks, errorBreakRatio,
continuousFailThreshold, replayPlan.getReplaySendMaxQps());
for (ServiceInstance targetInstance : distinctTargetInstances) {
rateLimiterFactory.getRateLimiter(targetInstance.getIp());
LOGGER.info("[[title=RateLimiterFactory]] create sendSemaphoreLimiter,ip [{}]",
targetInstance.getIp());
}
replayPlan.setRateLimiterFactory(rateLimiterFactory);
LOGGER.info("[[title=RateLimiterFactory]] init success for [{}]", replayPlan.getPlanName());

return targetInstances.stream().map(ServiceInstance::getSendSemaphoreLimiter)
.collect(Collectors.toList());
return rateLimiterFactory.getAll();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,11 @@ private void doDistribute(ArrayBlockingQueue<ReplayActionCaseItem> caseItemArray
ServiceInstance targetServiceInstance,
CountDownLatch groupSentLatch,
PlanExecutionContext<?> planExecutionContext) {
SendSemaphoreLimiter currentLimiter = targetServiceInstance.getSendSemaphoreLimiter();

SendSemaphoreLimiter currentLimiter = planExecutionContext
.getPlan()
.getRateLimiterFactory()
.getRateLimiter(targetServiceInstance.getIp());
if (currentLimiter == null) {
LOGGER.warn("The current service instance - [{}],has no rate limiter,skip send.",
targetServiceInstance.getIp());
Expand All @@ -217,10 +221,6 @@ private void doDistribute(ArrayBlockingQueue<ReplayActionCaseItem> caseItemArray
break;
}
ReplayActionCaseItem caseItem = caseItemArrayBlockingQueue.poll();
if (caseItem == null) {
LOGGER.warn("The case item is empty,skip send.");
continue;
}

this.doExecute(caseItem, targetServiceInstance, groupSentLatch, planExecutionContext,
currentLimiter);
Expand Down Expand Up @@ -252,6 +252,11 @@ private void doExecute(ReplayActionCaseItem replayActionCaseItem,
PlanExecutionContext<?> executionContext,
SendSemaphoreLimiter sendSemaphoreLimiter) {

if (replayActionCaseItem == null) {
LOGGER.warn("The current case item is null,skip send.");
return;
}

ReplayActionItem actionItem = replayActionCaseItem.getParent();
MDCTracer.addDetailId(replayActionCaseItem.getId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.arextest.common.config.DefaultApplicationConfig;
import com.arextest.schedule.common.RateLimiterFactory;
import com.arextest.schedule.common.SendSemaphoreLimiter;
import com.arextest.schedule.comparer.ComparisonWriter;
import com.arextest.schedule.dao.mongodb.ReplayActionCaseItemRepository;
Expand Down Expand Up @@ -172,14 +173,16 @@ void testDoDistribute() {
targetServiceInstance.setIp("127.0.0.1");
actionItem.setTargetInstance(Lists.list(targetServiceInstance));

SendSemaphoreLimiter sendSemaphoreLimiter = new SendSemaphoreLimiter(20, 1);
targetServiceInstance.setSendSemaphoreLimiter(sendSemaphoreLimiter);
RateLimiterFactory rateLimiterFactory = new RateLimiterFactory(100, 0.1,
20, 20);
rateLimiterFactory.getRateLimiter(targetServiceInstance.getIp());

ReplayPlan replayPlan = new ReplayPlan();
replayPlan.setAppId("appId");
replayPlan.setReplaySendMaxQps(1);
replayPlan.setCaseTotalCount(100);
replayPlan.setReplayActionItemList(Lists.list(actionItem));
replayPlan.setRateLimiterFactory(rateLimiterFactory);

PlanExecutionContext<?> planExecutionContext = new PlanExecutionContext<>();
planExecutionContext.setPlan(replayPlan);
Expand Down

0 comments on commit dde5ae4

Please sign in to comment.