Skip to content

Commit

Permalink
[ISSUE #5105] Fix the retry mechanism of the HttpSinkConnector (#5106)
Browse files Browse the repository at this point in the history
  • Loading branch information
cnzakii authored Oct 28, 2024
1 parent 77063b3 commit fe3d56b
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,82 @@

package org.apache.eventmesh.connector.http.sink.data;

import lombok.Data;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Single HTTP retry event
* Single HTTP attempt event
*/
@Data
public class HttpRetryEvent {
public class HttpAttemptEvent {

public static final String PREFIX = "http-retry-event-";
public static final String PREFIX = "http-attempt-event-";

private String parentId;
private final int maxAttempts;

private int maxRetries;

private int currentRetries;
private final AtomicInteger attempts;

private Throwable lastException;


public HttpAttemptEvent(int maxAttempts) {
this.maxAttempts = maxAttempts;
this.attempts = new AtomicInteger(0);
}

/**
* Increment the attempts
*/
public void incrementAttempts() {
attempts.incrementAndGet();
}

/**
* Increase the current retries by 1
* Update the event, incrementing the attempts and setting the last exception
*
* @param exception the exception to update, can be null
*/
public void increaseCurrentRetries() {
this.currentRetries++;
public void updateEvent(Throwable exception) {
// increment the attempts
incrementAttempts();

// update the last exception
lastException = exception;
}

/**
* Check if the current retries is greater than or equal to the max retries
* @return true if the current retries is greater than or equal to the max retries
* Check if the attempts are less than the maximum attempts
*
* @return true if the attempts are less than the maximum attempts, false otherwise
*/
public boolean isMaxRetriesReached() {
return this.currentRetries >= this.maxRetries;
public boolean canAttempt() {
return attempts.get() < maxAttempts;
}

public boolean isComplete() {
if (attempts.get() == 0) {
// No start yet
return false;
}

// If no attempt can be made or the last exception is null, the event completed
return !canAttempt() || lastException == null;
}


public int getMaxAttempts() {
return maxAttempts;
}

public int getAttempts() {
return attempts.get();
}

public Throwable getLastException() {
return lastException;
}

/**
* Get the limited exception message with the default limit of 256
*
* @return the limited exception message
*/
public String getLimitedExceptionMessage() {
Expand All @@ -60,6 +101,7 @@ public String getLimitedExceptionMessage() {

/**
* Get the limited exception message with the specified limit
*
* @param maxLimit the maximum limit of the exception message
* @return the limited exception message
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ public class MultiHttpRequestContext {

/**
* The last failed event.
* If there are no retries or retries are not enabled, it will be null.
* If retries occur but still fail, it will be logged, and only the last one will be retained.
*/
private HttpRetryEvent lastFailedEvent;
private HttpAttemptEvent lastFailedEvent;

public MultiHttpRequestContext(int remainingEvents) {
this.remainingRequests = new AtomicInteger(remainingEvents);
Expand All @@ -50,15 +49,24 @@ public void decrementRemainingRequests() {
remainingRequests.decrementAndGet();
}

/**
* Check if all requests have been processed.
*
* @return true if all requests have been processed, false otherwise.
*/
public boolean isAllRequestsProcessed() {
return remainingRequests.get() == 0;
}

public int getRemainingRequests() {
return remainingRequests.get();
}

public HttpRetryEvent getLastFailedEvent() {
public HttpAttemptEvent getLastFailedEvent() {
return lastFailedEvent;
}

public void setLastFailedEvent(HttpRetryEvent lastFailedEvent) {
public void setLastFailedEvent(HttpAttemptEvent lastFailedEvent) {
this.lastFailedEvent = lastFailedEvent;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.eventmesh.connector.http.sink.handler;

import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
import org.apache.eventmesh.connector.http.sink.data.HttpAttemptEvent;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

Expand Down Expand Up @@ -75,10 +75,9 @@ public void handle(ConnectRecord record) {
this.sinkConnectorConfig.getWebhookConfig().isActivate() ? "webhook" : "common");
HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type);

// add retry event to attributes
HttpRetryEvent retryEvent = new HttpRetryEvent();
retryEvent.setMaxRetries(sinkConnectorConfig.getRetryConfig().getMaxRetries());
attributes.put(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId(), retryEvent);
// add AttemptEvent to the attributes
HttpAttemptEvent attemptEvent = new HttpAttemptEvent(this.sinkConnectorConfig.getRetryConfig().getMaxRetries() + 1);
attributes.put(HttpAttemptEvent.PREFIX + httpConnectRecord.getHttpRecordId(), attemptEvent);

// deliver the record
deliver(url, httpConnectRecord, attributes, record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.connector.http.sink.data.HttpAttemptEvent;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext;
import org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler;
import org.apache.eventmesh.connector.http.util.HttpUtils;
Expand Down Expand Up @@ -176,13 +176,14 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne
* @param attributes additional attributes to be used in processing
*/
private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, Map<String, Object> attributes, ConnectRecord record) {
// get the retry event
HttpRetryEvent retryEvent = getAndUpdateRetryEvent(attributes, httpConnectRecord, e);
// get and update the attempt event
HttpAttemptEvent attemptEvent = (HttpAttemptEvent) attributes.get(HttpAttemptEvent.PREFIX + httpConnectRecord.getHttpRecordId());
attemptEvent.updateEvent(e);

// get the multi http request context
MultiHttpRequestContext multiHttpRequestContext = getAndUpdateMultiHttpRequestContext(attributes, retryEvent);
// get and update the multiHttpRequestContext
MultiHttpRequestContext multiHttpRequestContext = getAndUpdateMultiHttpRequestContext(attributes, attemptEvent);

if (multiHttpRequestContext.getRemainingRequests() == 0) {
if (multiHttpRequestContext.isAllRequestsProcessed()) {
// do callback
if (record.getCallback() == null) {
if (log.isDebugEnabled()) {
Expand All @@ -193,7 +194,8 @@ private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, Map<S
return;
}

HttpRetryEvent lastFailedEvent = multiHttpRequestContext.getLastFailedEvent();
// get the last failed event
HttpAttemptEvent lastFailedEvent = multiHttpRequestContext.getLastFailedEvent();
if (lastFailedEvent == null) {
// success
record.getCallback().onSuccess(convertToSendResult(record));
Expand All @@ -204,41 +206,26 @@ private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, Map<S
}
}

/**
* Gets and updates the retry event based on the provided attributes and HttpConnectRecord.
*
* @param attributes the attributes to use
* @param httpConnectRecord the HttpConnectRecord to use
* @param e the exception thrown during the request, may be null
* @return the updated retry event
*/
private HttpRetryEvent getAndUpdateRetryEvent(Map<String, Object> attributes, HttpConnectRecord httpConnectRecord, Throwable e) {
// get the retry event
HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
// update the retry event
retryEvent.setLastException(e);
return retryEvent;
}


/**
* Gets and updates the multi http request context based on the provided attributes and HttpConnectRecord.
*
* @param attributes the attributes to use
* @param retryEvent the retry event to use
* @param attributes the attributes to use
* @param attemptEvent the HttpAttemptEvent to use
* @return the updated multi http request context
*/
private MultiHttpRequestContext getAndUpdateMultiHttpRequestContext(Map<String, Object> attributes, HttpRetryEvent retryEvent) {
private MultiHttpRequestContext getAndUpdateMultiHttpRequestContext(Map<String, Object> attributes, HttpAttemptEvent attemptEvent) {
// get the multi http request context
MultiHttpRequestContext multiHttpRequestContext = (MultiHttpRequestContext) attributes.get(MultiHttpRequestContext.NAME);

if (retryEvent.getLastException() == null || retryEvent.isMaxRetriesReached()) {
// Check if the current attempted event has completed
if (attemptEvent.isComplete()) {
// decrement the counter
multiHttpRequestContext.decrementRemainingRequests();

// try set failed event
if (retryEvent.getLastException() != null) {
multiHttpRequestContext.setLastFailedEvent(retryEvent);
if (attemptEvent.getLastException() != null) {
// if all attempts are exhausted, set the last failed event
multiHttpRequestContext.setLastFailedEvent(attemptEvent);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.eventmesh.common.config.connector.http.HttpRetryConfig;
import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent;
import org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler;
import org.apache.eventmesh.connector.http.sink.handler.HttpSinkHandler;
import org.apache.eventmesh.connector.http.util.HttpUtils;
Expand Down Expand Up @@ -51,10 +50,38 @@ public class HttpSinkHandlerRetryWrapper extends AbstractHttpSinkHandler {

private final HttpSinkHandler sinkHandler;

private final RetryPolicy<HttpResponse<Buffer>> retryPolicy;

public HttpSinkHandlerRetryWrapper(SinkConnectorConfig sinkConnectorConfig, HttpSinkHandler sinkHandler) {
super(sinkConnectorConfig);
this.sinkHandler = sinkHandler;
this.httpRetryConfig = getSinkConnectorConfig().getRetryConfig();
this.retryPolicy = buildRetryPolicy();
}

private RetryPolicy<HttpResponse<Buffer>> buildRetryPolicy() {
return RetryPolicy.<HttpResponse<Buffer>>builder()
.handleIf(e -> e instanceof ConnectException)
.handleResultIf(response -> httpRetryConfig.isRetryOnNonSuccess() && !HttpUtils.is2xxSuccessful(response.statusCode()))
.withMaxRetries(httpRetryConfig.getMaxRetries())
.withDelay(Duration.ofMillis(httpRetryConfig.getInterval()))
.onRetry(event -> {
if (log.isDebugEnabled()) {
log.warn("Failed to deliver message after {} attempts. Retrying in {} ms. Error: {}",
event.getAttemptCount(), httpRetryConfig.getInterval(), event.getLastException());
} else {
log.warn("Failed to deliver message after {} attempts. Retrying in {} ms.",
event.getAttemptCount(), httpRetryConfig.getInterval());
}
}).onFailure(event -> {
if (log.isDebugEnabled()) {
log.error("Failed to deliver message after {} attempts. Error: {}",
event.getAttemptCount(), event.getException());
} else {
log.error("Failed to deliver message after {} attempts.",
event.getAttemptCount());
}
}).build();
}

/**
Expand All @@ -78,36 +105,8 @@ public void start() {
@Override
public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConnectRecord, Map<String, Object> attributes,
ConnectRecord connectRecord) {

// Build the retry policy
RetryPolicy<HttpResponse<Buffer>> retryPolicy = RetryPolicy.<HttpResponse<Buffer>>builder()
.handleIf(e -> e instanceof ConnectException)
.handleResultIf(response -> httpRetryConfig.isRetryOnNonSuccess() && !HttpUtils.is2xxSuccessful(response.statusCode()))
.withMaxRetries(httpRetryConfig.getMaxRetries())
.withDelay(Duration.ofMillis(httpRetryConfig.getInterval()))
.onRetry(event -> {
if (log.isDebugEnabled()) {
log.warn("Retrying the request to {} for the {} time. {}", url, event.getAttemptCount(), httpConnectRecord);
} else {
log.warn("Retrying the request to {} for the {} time.", url, event.getAttemptCount());
}
// update the retry event
HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId());
retryEvent.increaseCurrentRetries();
})
.onFailure(event -> {
if (log.isDebugEnabled()) {
log.error("Failed to send the request to {} after {} attempts. {}", url, event.getAttemptCount(),
httpConnectRecord, event.getException());
} else {
log.error("Failed to send the request to {} after {} attempts.", url, event.getAttemptCount(), event.getException());
}
}).build();

// Handle the ConnectRecord with retry policy
Failsafe.with(retryPolicy)
.getStageAsync(() -> sinkHandler.deliver(url, httpConnectRecord, attributes, connectRecord).toCompletionStage());

return null;
}

Expand Down
Loading

0 comments on commit fe3d56b

Please sign in to comment.