
[Optimize][Code]update_spotless (#2202)
* update_spotless

* update_spotless
zackyoungh authored Aug 11, 2023
1 parent e48dd5a commit d5b597c
Showing 511 changed files with 7,817 additions and 11,872 deletions.
16 changes: 4 additions & 12 deletions dinky-admin/src/main/java/org/dinky/aop/LogAspect.java
@@ -99,10 +99,7 @@ protected void handleLog(final JoinPoint joinPoint, final Exception e, Object js
// *======== Database log ========*//
OperateLog operLog = new OperateLog();
Result result = JSONUtil.toBean(JSONUtil.parseObj(jsonResult), Result.class);
-            operLog.setStatus(
-                    result.isSuccess()
-                            ? BusinessStatus.SUCCESS.ordinal()
-                            : BusinessStatus.FAIL.ordinal());
+            operLog.setStatus(result.isSuccess() ? BusinessStatus.SUCCESS.ordinal() : BusinessStatus.FAIL.ordinal());

// address of the request
String ip = IpUtils.getIpAddr(ServletUtils.getRequest());
@@ -150,8 +147,7 @@ protected void handleLog(final JoinPoint joinPoint, final Exception e, Object js
* @param operLog the operation log
* @throws Exception
*/
-    public void getControllerMethodDescription(JoinPoint joinPoint, Log log, OperateLog operLog)
-            throws Exception {
+    public void getControllerMethodDescription(JoinPoint joinPoint, Log log, OperateLog operLog) throws Exception {
// set the action type
operLog.setBusinessType(log.businessType().ordinal());
// set the title
@@ -177,9 +173,7 @@ private void setRequestValue(JoinPoint joinPoint, OperateLog operLog) throws Exc
operLog.setOperateParam(StringUtils.substring(params, 0, 2000));
} else {
Map<?, ?> paramsMap =
-                    (Map<?, ?>)
-                            ServletUtils.getRequest()
-                                    .getAttribute(HandlerMapping.URI_TEMPLATE_VARIABLES_ATTRIBUTE);
+                    (Map<?, ?>) ServletUtils.getRequest().getAttribute(HandlerMapping.URI_TEMPLATE_VARIABLES_ATTRIBUTE);
operLog.setOperateParam(StringUtils.substring(paramsMap.toString(), 0, 2000));
}
}
@@ -217,8 +211,6 @@ private String argsArrayToString(Object[] paramsArray) {
* @return true if the object should be filtered out; otherwise false.
*/
public boolean isFilterObject(final Object o) {
-        return o instanceof MultipartFile
-                || o instanceof HttpServletRequest
-                || o instanceof HttpServletResponse;
+        return o instanceof MultipartFile || o instanceof HttpServletRequest || o instanceof HttpServletResponse;
}
}
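
For context on how an aspect like this is driven, here is a minimal, hedged sketch of the usual Spring AOP wiring; the @Log annotation's package (org.dinky.data.annotation) and the advice method names are assumptions, not taken from this commit.

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;

@Aspect
@Component
public class LogAspectSketch {

    // Runs after an @Log-annotated method returns normally; jsonResult is its return value.
    @AfterReturning(pointcut = "@annotation(org.dinky.data.annotation.Log)", returning = "jsonResult")
    public void doAfterReturning(JoinPoint joinPoint, Object jsonResult) {
        handleLog(joinPoint, null, jsonResult);
    }

    // Runs when the annotated method throws; there is no result to record.
    @AfterThrowing(pointcut = "@annotation(org.dinky.data.annotation.Log)", throwing = "e")
    public void doAfterThrowing(JoinPoint joinPoint, Exception e) {
        handleLog(joinPoint, e, null);
    }

    protected void handleLog(JoinPoint joinPoint, Exception e, Object jsonResult) {
        // builds and persists an OperateLog, as in the diff above
    }
}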
27 changes: 11 additions & 16 deletions dinky-admin/src/main/java/org/dinky/aop/WebExceptionHandler.java
@@ -69,16 +69,15 @@ public Result<Void> busException(BusException e) {
return Result.failed(e.getMsg());
}

-    private static final Map<String, Status> ERR_CODE_MAPPING =
-            MapUtil.<String, Status>builder()
-                    .put(NotLoginException.NOT_TOKEN, Status.NOT_TOKEN)
-                    .put(NotLoginException.INVALID_TOKEN, Status.INVALID_TOKEN)
-                    .put(NotLoginException.TOKEN_TIMEOUT, Status.EXPIRED_TOKEN)
-                    .put(NotLoginException.BE_REPLACED, Status.BE_REPLACED)
-                    .put(NotLoginException.KICK_OUT, Status.KICK_OUT)
-                    .put(NotLoginException.TOKEN_FREEZE, Status.TOKEN_FREEZED)
-                    .put(NotLoginException.NO_PREFIX, Status.NO_PREFIX)
-                    .build();
+    private static final Map<String, Status> ERR_CODE_MAPPING = MapUtil.<String, Status>builder()
+            .put(NotLoginException.NOT_TOKEN, Status.NOT_TOKEN)
+            .put(NotLoginException.INVALID_TOKEN, Status.INVALID_TOKEN)
+            .put(NotLoginException.TOKEN_TIMEOUT, Status.EXPIRED_TOKEN)
+            .put(NotLoginException.BE_REPLACED, Status.BE_REPLACED)
+            .put(NotLoginException.KICK_OUT, Status.KICK_OUT)
+            .put(NotLoginException.TOKEN_FREEZE, Status.TOKEN_FREEZED)
+            .put(NotLoginException.NO_PREFIX, Status.NO_PREFIX)
+            .build();

@ExceptionHandler
public Result<Void> notLoginException(NotLoginException notLoginException) {
@@ -112,14 +111,10 @@ public Result<String> paramExceptionHandler(MethodArgumentNotValidException e) {
FieldError fieldError = (FieldError) errors.get(0);
if (StringUtils.isNotBlank(fieldError.getDefaultMessage())) {
return Result.failed(
-                        Status.GLOBAL_PARAMS_CHECK_ERROR,
-                        fieldError.getField(),
-                        fieldError.getDefaultMessage());
+                        Status.GLOBAL_PARAMS_CHECK_ERROR, fieldError.getField(), fieldError.getDefaultMessage());
}
return Result.failed(
-                        Status.GLOBAL_PARAMS_CHECK_ERROR_VALUE,
-                        fieldError.getField(),
-                        fieldError.getRejectedValue());
+                        Status.GLOBAL_PARAMS_CHECK_ERROR_VALUE, fieldError.getField(), fieldError.getRejectedValue());
}
}
return Result.failed(Status.REQUEST_PARAMS_ERROR);
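
The body of notLoginException is collapsed above; a plausible minimal sketch of how ERR_CODE_MAPPING is consumed follows. Sa-Token's NotLoginException.getType() returns the string code used as the map key; the Status.NOT_LOGIN fallback is an assumption, not confirmed by this diff.

    @ExceptionHandler
    public Result<Void> notLoginException(NotLoginException notLoginException) {
        // Look up the Sa-Token failure code; fall back to a generic not-logged-in status (assumed name).
        Status status = ERR_CODE_MAPPING.getOrDefault(notLoginException.getType(), Status.NOT_LOGIN);
        return Result.failed(status);
    }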
@@ -41,12 +41,9 @@ public class CacheConfig {
public RedisCacheConfiguration cacheConfiguration() {
return RedisCacheConfiguration.defaultCacheConfig()
// serialize values as JSON
-                .serializeValuesWith(
-                        RedisSerializationContext.SerializationPair.fromSerializer(
-                                RedisSerializer.json()))
+                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(RedisSerializer.json()))
.serializeKeysWith(
-                        RedisSerializationContext.SerializationPair.fromSerializer(
-                                new StringRedisSerializer()));
+                        RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));
}

// /**
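
A minimal sketch of how a RedisCacheConfiguration bean like the one above typically plugs into Spring's cache manager; the bean method and its parameter wiring are illustrative assumptions, not part of this commit.

import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;

public class CacheManagerSketch {

    @Bean
    public RedisCacheManager cacheManager(RedisConnectionFactory factory, RedisCacheConfiguration config) {
        // Every cache created by this manager inherits the JSON value serializer
        // and String key serializer configured above.
        return RedisCacheManager.builder(factory).cacheDefaults(config).build();
    }
}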
@@ -109,7 +109,6 @@ private boolean shouldRegisterLinksMapping(
WebEndpointProperties webEndpointProperties, Environment environment, String basePath) {
return webEndpointProperties.getDiscovery().isEnabled()
&& (StringUtils.hasText(basePath)
-                        || ManagementPortType.get(environment)
-                                .equals(ManagementPortType.DIFFERENT));
+                        || ManagementPortType.get(environment).equals(ManagementPortType.DIFFERENT));
}
}
@@ -58,27 +58,26 @@ public class MybatisPlusConfig {

private final MybatisPlusFillProperties autoFillProperties;

-    private static final Set<String> IGNORE_TABLE_NAMES =
-            ImmutableSet.of(
-                    "dinky_namespace",
-                    "dinky_alert_group",
-                    "dinky_alert_history",
-                    "dinky_alert_instance",
-                    "dinky_catalogue",
-                    "dinky_cluster",
-                    "dinky_cluster_configuration",
-                    "dinky_database",
-                    "dinky_fragment",
-                    "dinky_history",
-                    "dinky_jar",
-                    "dinky_job_history",
-                    "dinky_job_instance",
-                    "dinky_role",
-                    "dinky_savepoints",
-                    "dinky_task",
-                    "dinky_task_statement",
-                    "dinky_git_project",
-                    "dinky_task_version");
+    private static final Set<String> IGNORE_TABLE_NAMES = ImmutableSet.of(
+            "dinky_namespace",
+            "dinky_alert_group",
+            "dinky_alert_history",
+            "dinky_alert_instance",
+            "dinky_catalogue",
+            "dinky_cluster",
+            "dinky_cluster_configuration",
+            "dinky_database",
+            "dinky_fragment",
+            "dinky_history",
+            "dinky_jar",
+            "dinky_job_history",
+            "dinky_job_instance",
+            "dinky_role",
+            "dinky_savepoints",
+            "dinky_task",
+            "dinky_task_statement",
+            "dinky_git_project",
+            "dinky_task_version");

@Bean
// @ConditionalOnProperty(name = "spring.profiles.active", havingValue = "pgsql , jmx")
@@ -103,24 +102,22 @@ public PostgreSQLPrepareInterceptor postgreSQLPrepareInterceptor() {
public MybatisPlusInterceptor mybatisPlusInterceptor() {
log.info("mybatis plus interceptor execute");
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
-        interceptor.addInnerInterceptor(
-                new TenantLineInnerInterceptor(
-                        new TenantLineHandler() {
-
-                            @Override
-                            public Expression getTenantId() {
-                                Integer tenantId = (Integer) TenantContextHolder.get();
-                                if (tenantId == null) {
-                                    return new NullValue();
-                                }
-                                return new LongValue(tenantId);
-                            }
-
-                            @Override
-                            public boolean ignoreTable(String tableName) {
-                                return !IGNORE_TABLE_NAMES.contains(tableName);
-                            }
-                        }));
+        interceptor.addInnerInterceptor(new TenantLineInnerInterceptor(new TenantLineHandler() {
+
+            @Override
+            public Expression getTenantId() {
+                Integer tenantId = (Integer) TenantContextHolder.get();
+                if (tenantId == null) {
+                    return new NullValue();
+                }
+                return new LongValue(tenantId);
+            }
+
+            @Override
+            public boolean ignoreTable(String tableName) {
+                return !IGNORE_TABLE_NAMES.contains(tableName);
+            }
+        }));

return interceptor;
}
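
Note the double negative above: ignoreTable returns true for any table not in IGNORE_TABLE_NAMES, so the tenant predicate is applied only to the listed dinky_* tables. TenantContextHolder is Dinky's own class; the ThreadLocal sketch below only illustrates the contract the handler relies on and is an assumption, not the actual implementation.

public final class TenantContextHolderSketch {

    private static final ThreadLocal<Integer> TENANT = new ThreadLocal<>();

    // Typically called by an early filter/interceptor once the tenant is known.
    public static void set(Integer tenantId) {
        TENANT.set(tenantId);
    }

    // Read by TenantLineHandler.getTenantId() on every intercepted statement.
    public static Object get() {
        return TENANT.get();
    }

    // Must be called at request end to avoid leaking tenants across pooled threads.
    public static void clear() {
        TENANT.remove();
    }
}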
@@ -30,7 +30,8 @@
public abstract class BaseSchedule {
private static final HashMap<String, ScheduledFuture<?>> SCHEDULE_MAP = new HashMap<>();

-    @Resource private ThreadPoolTaskScheduler threadPoolTaskScheduler;
+    @Resource
+    private ThreadPoolTaskScheduler threadPoolTaskScheduler;

protected void addSchedule(String key, Runnable runnable, Trigger trigger) {
ScheduledFuture<?> schedule = threadPoolTaskScheduler.schedule(runnable, trigger);
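
A hedged usage sketch of the API above: a subclass registers a periodic job under a key, and the ScheduledFuture kept in SCHEDULE_MAP is what a later removeSchedule call would cancel. The key and five-second period are illustrative only.

import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.stereotype.Component;

@Component
public class DemoSchedule extends BaseSchedule {

    public void register() {
        // PeriodicTrigger takes the period in milliseconds in Spring 5.x.
        addSchedule("demo-job", () -> System.out.println("tick"), new PeriodicTrigger(5_000));
    }
}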
@@ -69,26 +69,20 @@ public class FlinkMetricsIndicator extends BaseSchedule {
private final HistoryService historyService;
private final JobInstanceService jobInstanceService;
private final MonitorService monitorService;
-    public AtomicReference<Integer> flinkMetricsRequestTimeout =
-            new AtomicReference<>(
-                    SystemConfiguration.getInstances()
-                            .getFlinkMetricsGatherTimeout()
-                            .getDefaultValue());
+    public AtomicReference<Integer> flinkMetricsRequestTimeout = new AtomicReference<>(
+            SystemConfiguration.getInstances().getFlinkMetricsGatherTimeout().getDefaultValue());

/** URLs available to a task */
-    private static final Map<Integer, FlinkMetrics> TASK_FLINK_METRICS_MAP =
-            new ConcurrentHashMap<>();
+    private static final Map<Integer, FlinkMetrics> TASK_FLINK_METRICS_MAP = new ConcurrentHashMap<>();

-    private static final Map<LocalDateTime, List<FlinkMetrics>> FLINK_METRICS_DATA_MAP =
-            new ConcurrentHashMap<>();
+    private static final Map<LocalDateTime, List<FlinkMetrics>> FLINK_METRICS_DATA_MAP = new ConcurrentHashMap<>();

public void writeFlinkMetrics() {
LocalDateTime now = LocalDateTime.now();
FLINK_METRICS_DATA_MAP.put(now, new CopyOnWriteArrayList<>());
-        CompletableFuture<?>[] array =
-                TASK_FLINK_METRICS_MAP.values().stream()
-                        .map((f) -> CompletableFuture.runAsync(() -> addFlinkMetrics(f, now)))
-                        .toArray(CompletableFuture[]::new);
+        CompletableFuture<?>[] array = TASK_FLINK_METRICS_MAP.values().stream()
+                .map((f) -> CompletableFuture.runAsync(() -> addFlinkMetrics(f, now)))
+                .toArray(CompletableFuture[]::new);
AsyncUtil.waitAll(array);
MetricsVO metricsVO = new MetricsVO();
metricsVO.setModel("flink");
@@ -109,11 +103,10 @@ public void init() {
Configuration<Integer> flinkMetricsGatherTiming =
SystemConfiguration.getInstances().getFlinkMetricsGatherTiming();
final String key = flinkMetricsGatherTiming.getKey();
-        flinkMetricsGatherTiming.addChangeEvent(
-                time -> {
-                    removeSchedule(key);
-                    addSchedule(key, this::writeFlinkMetrics, new PeriodicTrigger(time));
-                });
+        flinkMetricsGatherTiming.addChangeEvent(time -> {
+            removeSchedule(key);
+            addSchedule(key, this::writeFlinkMetrics, new PeriodicTrigger(time));
+        });
}

public void getAndCheckFlinkUrlAvailable() {
Expand All @@ -122,14 +115,10 @@ public void getAndCheckFlinkUrlAvailable() {
if (CollUtil.isEmpty(jobInstances)) {
return;
}
-        List<History> historyList =
-                historyService.listByIds(
-                        jobInstances.stream()
-                                .map(JobInstance::getHistoryId)
-                                .collect(Collectors.toList()));
+        List<History> historyList = historyService.listByIds(
+                jobInstances.stream().map(JobInstance::getHistoryId).collect(Collectors.toList()));
List<Metrics> metricsList = monitorService.list();
-        Set<Integer> taskIdSet =
-                metricsList.stream().map(Metrics::getTaskId).collect(Collectors.toSet());
+        Set<Integer> taskIdSet = metricsList.stream().map(Metrics::getTaskId).collect(Collectors.toSet());
for (JobInstance jobInstance : jobInstances) {
Integer taskId = jobInstance.getTaskId();
if (!taskIdSet.contains(taskId)) {
@@ -139,16 +128,11 @@
flinkMetrics.setTaskId(taskId);
flinkMetrics.setJobId(jobInstance.getJid());
TASK_FLINK_METRICS_MAP.put(taskId, flinkMetrics);
-            metricsList.stream()
-                    .filter(x -> x.getTaskId().equals(taskId))
-                    .forEach(
-                            m -> {
-                                Map<String, Map<String, String>> verticesAndMetricsMap =
-                                        flinkMetrics.getVerticesAndMetricsMap();
-                                verticesAndMetricsMap.putIfAbsent(
-                                        m.getVertices(), new ConcurrentHashMap<>());
-                                verticesAndMetricsMap.get(m.getVertices()).put(m.getMetrics(), "");
-                            });
+            metricsList.stream().filter(x -> x.getTaskId().equals(taskId)).forEach(m -> {
+                Map<String, Map<String, String>> verticesAndMetricsMap = flinkMetrics.getVerticesAndMetricsMap();
+                verticesAndMetricsMap.putIfAbsent(m.getVertices(), new ConcurrentHashMap<>());
+                verticesAndMetricsMap.get(m.getVertices()).put(m.getMetrics(), "");
+            });
for (History jobHistory : historyList) {
if (jobInstance.getHistoryId().equals(jobHistory.getId())) {
String hosts = jobHistory.getJobManagerAddress();
@@ -157,12 +141,10 @@ public void getAndCheckFlinkUrlAvailable() {
try {
HttpUtil.createGet(host + "/config")
.timeout(flinkMetricsRequestTimeout.get())
-                                .then(
-                                        resp ->
-                                                TASK_FLINK_METRICS_MAP
-                                                        .get(taskId)
-                                                        .getUrls()
-                                                        .add(host));
+                                .then(resp -> TASK_FLINK_METRICS_MAP
+                                        .get(taskId)
+                                        .getUrls()
+                                        .add(host));
} catch (Exception e) {
log.warn("host read Timeout:{}", host);
}
@@ -182,37 +164,33 @@ public void addFlinkMetrics(FlinkMetrics flinkMetrics, LocalDateTime now) {
}

// http://10.8.16.125:8282/jobs/06ccde3ff6e53bafe729e0e50fca72fd/vertices/cbc357ccb763df2852fee8c4fc7d55f2/metrics?get=0.buffers.inputExclusiveBuffersUsage,0.numRecordsInPerSecond
-        flinkMetrics
-                .getVerticesAndMetricsMap()
-                .forEach(
-                        (v, m) -> {
-                            if (CollUtil.isEmpty(urlList)) {
-                                return;
-                            }
-                            String metricsName = StrUtil.join(",", m.keySet());
-                            HttpUtils.asyncRequest(
-                                    flinkMetrics.getUrls(),
-                                    "/jobs/"
-                                            + flinkMetrics.getJobId()
-                                            + "/vertices/"
-                                            + v
-                                            + "/metrics?get="
-                                            + URLUtil.encode(metricsName),
-                                    flinkMetricsRequestTimeout.get(),
-                                    x -> {
-                                        JSONArray array = JSONUtil.parseArray(x.body());
-                                        if (CollUtil.isEmpty(array)) {
-                                            return;
-                                        }
-                                        array.forEach(
-                                                y -> {
-                                                    JSONObject jsonObject = JSONUtil.parseObj(y);
-                                                    String id = jsonObject.getStr("id");
-                                                    String value = jsonObject.getStr("value");
-                                                    m.put(id, value);
-                                                });
-                                    });
-                        });
+        flinkMetrics.getVerticesAndMetricsMap().forEach((v, m) -> {
+            if (CollUtil.isEmpty(urlList)) {
+                return;
+            }
+            String metricsName = StrUtil.join(",", m.keySet());
+            HttpUtils.asyncRequest(
+                    flinkMetrics.getUrls(),
+                    "/jobs/"
+                            + flinkMetrics.getJobId()
+                            + "/vertices/"
+                            + v
+                            + "/metrics?get="
+                            + URLUtil.encode(metricsName),
+                    flinkMetricsRequestTimeout.get(),
+                    x -> {
+                        JSONArray array = JSONUtil.parseArray(x.body());
+                        if (CollUtil.isEmpty(array)) {
+                            return;
+                        }
+                        array.forEach(y -> {
+                            JSONObject jsonObject = JSONUtil.parseObj(y);
+                            String id = jsonObject.getStr("id");
+                            String value = jsonObject.getStr("value");
+                            m.put(id, value);
+                        });
+                    });
+        });
FLINK_METRICS_DATA_MAP.get(now).add(flinkMetrics);
}

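
AsyncUtil.waitAll above is a Dinky helper; a plausible JDK-only equivalent is sketched below to show what the metric-gathering fan-out is waiting on.

import java.util.concurrent.CompletableFuture;

public final class AsyncUtilSketch {

    // allOf completes once every input future completes (normally or exceptionally);
    // join() then blocks the calling thread until that point.
    public static void waitAll(CompletableFuture<?>... futures) {
        CompletableFuture.allOf(futures).join();
    }
}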