Skip to content

Commit

Permalink
[INLONG-11531][Manager] Fix bug in DolphinScheduler engine (#11532)
Browse files Browse the repository at this point in the history
  • Loading branch information
emptyOVO authored Nov 25, 2024
1 parent 82128a3 commit ace3362
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ public class DolphinScheduleConstants {
// DS public constants
public static final String DS_ID = "id";
public static final String DS_CODE = "code";
public static final String DS_SUCCESS = "success";
public static final String DS_TOKEN = "token";
public static final String DS_PAGE_SIZE = "pageSize";
public static final String DS_PAGE_NO = "pageNo";
public static final String DS_SEARCH_VAL = "searchVal";
public static final String DS_RESPONSE_DATA = "data";
public static final String DS_RESPONSE_NAME = "name";
public static final String DS_RESPONSE_TOTAL_LIST = "totalList";
public static final int DS_DEFAULT_RETRY_TIMES = 3;
public static final int DS_DEFAULT_WAIT_MILLS = 1000;
public static final String DS_DEFAULT_PAGE_SIZE = "10";
public static final String DS_DEFAULT_PAGE_NO = "1";
public static final String DS_DEFAULT_TIMEZONE_ID = "Asia/Shanghai";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public void start() {
@Override
@VisibleForTesting
public boolean handleRegister(ScheduleInfo scheduleInfo) {
start();
String processDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL;
String scheduleUrl = projectBaseUrl + "/" + projectCode + DS_SCHEDULE_URL;
String processName = scheduleInfo.getInlongGroupId() + DS_DEFAULT_PROCESS_NAME;
Expand Down Expand Up @@ -191,6 +192,7 @@ public boolean handleRegister(ScheduleInfo scheduleInfo) {
@Override
@VisibleForTesting
public boolean handleUnregister(String groupId) {
start();
String processName = groupId + DS_DEFAULT_PROCESS_NAME;
String processDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_CODE;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PAGE_NO;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PAGE_SIZE;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_RETRY_TIMES;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_SCHEDULE_TIME_FORMAT;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_DESC;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_GEN_NUM;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_NAME;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TIMEZONE_ID;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_WAIT_MILLS;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ID;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ONLINE_URL;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PAGE_NO;
Expand All @@ -78,6 +80,7 @@
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RESPONSE_TOTAL_LIST;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SCHEDULE_DEF;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SEARCH_VAL;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SUCCESS;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_DEFINITION;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_GEN_NUM;
import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_RELATION;
Expand All @@ -89,6 +92,7 @@
import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.JSON_PARSE_ERROR;
import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.NETWORK_ERROR;
import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_CREATION_FAILED;
import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_IN_USED_ERROR;
import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_QUERY_FAILED;
import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_RELEASE_FAILED;
import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROJECT_CREATION_FAILED;
Expand Down Expand Up @@ -489,20 +493,48 @@ public static void delete(String url, String token, long code) {
Map<String, String> header = buildHeader(token);

String requestUrl = url + "/" + code;
for (int retryTime = 1; retryTime <= DS_DEFAULT_RETRY_TIMES; retryTime++) {
JsonObject response = executeHttpRequest(requestUrl, DELETE, new HashMap<>(), header);
if (response.get(DS_CODE).getAsInt() == PROCESS_DEFINITION_IN_USED_ERROR) {

LOGGER.warn(
"Retrying for current retry time ={}, maximum retry count={}, code={}, url={}, after {} ms...",
retryTime, DS_DEFAULT_RETRY_TIMES, code, url, DS_DEFAULT_WAIT_MILLS);
Thread.sleep(DS_DEFAULT_WAIT_MILLS);

} else if (response.get(DS_SUCCESS).getAsBoolean()) {
LOGGER.info("Delete process or project success, response data: {}", response);
return;
} else {
LOGGER.warn("Delete process or project failed, response data: {}", response);
}

JsonObject response = executeHttpRequest(requestUrl, DELETE, new HashMap<>(), header);
LOGGER.info("delete process or project success, response data: {}", response);
}
LOGGER.error(
"Maximum retry attempts reached for deleting process or project. URL: {}, Code: {}",
url, code);
throw new DolphinScheduleException(
DELETION_FAILED,
String.format("Failed to delete after %d retries. Code: %d at URL: %s",
DS_DEFAULT_RETRY_TIMES, code, url));

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.error("Thread interrupted while retrying delete process or project: ", e);
throw new DolphinScheduleException(
DELETION_FAILED,
String.format("Thread interrupted while retrying delete for code: %d at URL: %s", code, url));
} catch (JsonParseException e) {
LOGGER.error("JsonParseException during deleting process or project", e);
throw new DolphinScheduleException(
JSON_PARSE_ERROR,
String.format("Error deleting process or project with code: %d at URL: %s", code, url), e);
String.format("Error deleting process or project with code: %d at URL: %s", code, url));

} catch (DolphinScheduleException e) {
LOGGER.error("Error deleting process or project for code={}, url={} ", code, url, e);
throw new DolphinScheduleException(
DELETION_FAILED,
String.format("Error deleting process or project with code: %d at URL: %s", code, url), e);
String.format("Error deleting process or project with code: %d at URL: %s", code, url));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class DolphinScheduleException extends RuntimeException {
public static final String GEN_TASK_CODE_FAILED = "GEN_TASK_CODE_FAILED";

// Process-related error codes
public static final int PROCESS_DEFINITION_IN_USED_ERROR = 10163;
public static final String PROCESS_DEFINITION_QUERY_FAILED = "PROCESS_DEFINITION_QUERY_FAILED";
public static final String PROCESS_DEFINITION_CREATION_FAILED = "PROCESS_DEFINITION_CREATION_FAILED";
public static final String PROCESS_DEFINITION_RELEASE_FAILED = "PROCESS_DEFINITION_RELEASE_FAILED";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public void beforeAll() {

String token = accessToken();
dolphinScheduleEngine.setToken(token);
dolphinScheduleEngine.start();
}

@AfterAll
Expand Down

0 comments on commit ace3362

Please sign in to comment.