diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpAttemptEvent.java similarity index 52% rename from eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java rename to eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpAttemptEvent.java index 4b229f9839..8163852f8f 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpRetryEvent.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/HttpAttemptEvent.java @@ -17,41 +17,82 @@ package org.apache.eventmesh.connector.http.sink.data; -import lombok.Data; +import java.util.concurrent.atomic.AtomicInteger; /** - * Single HTTP retry event + * Single HTTP attempt event */ -@Data -public class HttpRetryEvent { +public class HttpAttemptEvent { - public static final String PREFIX = "http-retry-event-"; + public static final String PREFIX = "http-attempt-event-"; - private String parentId; + private final int maxAttempts; - private int maxRetries; - - private int currentRetries; + private final AtomicInteger attempts; private Throwable lastException; + + public HttpAttemptEvent(int maxAttempts) { + this.maxAttempts = maxAttempts; + this.attempts = new AtomicInteger(0); + } + + /** + * Increment the attempts + */ + public void incrementAttempts() { + attempts.incrementAndGet(); + } + /** - * Increase the current retries by 1 + * Update the event, incrementing the attempts and setting the last exception + * + * @param exception the exception to update, can be null */ - public void increaseCurrentRetries() { - this.currentRetries++; + public void updateEvent(Throwable exception) { + // increment the attempts + incrementAttempts(); + + // update the last exception + lastException = exception; } /** - * Check if the current retries is greater than or equal to the max retries - * @return true if the current retries is greater than or equal to the max retries + * Check if the attempts are less than the maximum attempts + * + * @return true if the attempts are less than the maximum attempts, false otherwise */ - public boolean isMaxRetriesReached() { - return this.currentRetries >= this.maxRetries; + public boolean canAttempt() { + return attempts.get() < maxAttempts; + } + + public boolean isComplete() { + if (attempts.get() == 0) { + // No start yet + return false; + } + + // If no attempt can be made or the last exception is null, the event completed + return !canAttempt() || lastException == null; + } + + + public int getMaxAttempts() { + return maxAttempts; + } + + public int getAttempts() { + return attempts.get(); + } + + public Throwable getLastException() { + return lastException; } /** * Get the limited exception message with the default limit of 256 + * * @return the limited exception message */ public String getLimitedExceptionMessage() { @@ -60,6 +101,7 @@ public String getLimitedExceptionMessage() { /** * Get the limited exception message with the specified limit + * * @param maxLimit the maximum limit of the exception message * @return the limited exception message */ diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/MultiHttpRequestContext.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/MultiHttpRequestContext.java index 67ab943818..66f5d0e7ec 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/MultiHttpRequestContext.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/data/MultiHttpRequestContext.java @@ -34,10 +34,9 @@ public class MultiHttpRequestContext { /** * The last failed event. - * If there are no retries or retries are not enabled, it will be null. * If retries occur but still fail, it will be logged, and only the last one will be retained. */ - private HttpRetryEvent lastFailedEvent; + private HttpAttemptEvent lastFailedEvent; public MultiHttpRequestContext(int remainingEvents) { this.remainingRequests = new AtomicInteger(remainingEvents); @@ -50,15 +49,24 @@ public void decrementRemainingRequests() { remainingRequests.decrementAndGet(); } + /** + * Check if all requests have been processed. + * + * @return true if all requests have been processed, false otherwise. + */ + public boolean isAllRequestsProcessed() { + return remainingRequests.get() == 0; + } + public int getRemainingRequests() { return remainingRequests.get(); } - public HttpRetryEvent getLastFailedEvent() { + public HttpAttemptEvent getLastFailedEvent() { return lastFailedEvent; } - public void setLastFailedEvent(HttpRetryEvent lastFailedEvent) { + public void setLastFailedEvent(HttpAttemptEvent lastFailedEvent) { this.lastFailedEvent = lastFailedEvent; } } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java index 5c868f4aa9..28ba791127 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/AbstractHttpSinkHandler.java @@ -18,8 +18,8 @@ package org.apache.eventmesh.connector.http.sink.handler; import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig; +import org.apache.eventmesh.connector.http.sink.data.HttpAttemptEvent; import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; -import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent; import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; @@ -75,10 +75,9 @@ public void handle(ConnectRecord record) { this.sinkConnectorConfig.getWebhookConfig().isActivate() ? "webhook" : "common"); HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type); - // add retry event to attributes - HttpRetryEvent retryEvent = new HttpRetryEvent(); - retryEvent.setMaxRetries(sinkConnectorConfig.getRetryConfig().getMaxRetries()); - attributes.put(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId(), retryEvent); + // add AttemptEvent to the attributes + HttpAttemptEvent attemptEvent = new HttpAttemptEvent(this.sinkConnectorConfig.getRetryConfig().getMaxRetries() + 1); + attributes.put(HttpAttemptEvent.PREFIX + httpConnectRecord.getHttpRecordId(), attemptEvent); // deliver the record deliver(url, httpConnectRecord, attributes, record); diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java index e88707482f..61bdc9f310 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/CommonHttpSinkHandler.java @@ -19,8 +19,8 @@ import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig; import org.apache.eventmesh.common.utils.JsonUtils; +import org.apache.eventmesh.connector.http.sink.data.HttpAttemptEvent; import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; -import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent; import org.apache.eventmesh.connector.http.sink.data.MultiHttpRequestContext; import org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler; import org.apache.eventmesh.connector.http.util.HttpUtils; @@ -176,13 +176,14 @@ public Future> deliver(URI url, HttpConnectRecord httpConne * @param attributes additional attributes to be used in processing */ private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, Map attributes, ConnectRecord record) { - // get the retry event - HttpRetryEvent retryEvent = getAndUpdateRetryEvent(attributes, httpConnectRecord, e); + // get and update the attempt event + HttpAttemptEvent attemptEvent = (HttpAttemptEvent) attributes.get(HttpAttemptEvent.PREFIX + httpConnectRecord.getHttpRecordId()); + attemptEvent.updateEvent(e); - // get the multi http request context - MultiHttpRequestContext multiHttpRequestContext = getAndUpdateMultiHttpRequestContext(attributes, retryEvent); + // get and update the multiHttpRequestContext + MultiHttpRequestContext multiHttpRequestContext = getAndUpdateMultiHttpRequestContext(attributes, attemptEvent); - if (multiHttpRequestContext.getRemainingRequests() == 0) { + if (multiHttpRequestContext.isAllRequestsProcessed()) { // do callback if (record.getCallback() == null) { if (log.isDebugEnabled()) { @@ -193,7 +194,8 @@ private void tryCallback(HttpConnectRecord httpConnectRecord, Throwable e, Map attributes, HttpConnectRecord httpConnectRecord, Throwable e) { - // get the retry event - HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId()); - // update the retry event - retryEvent.setLastException(e); - return retryEvent; - } - /** * Gets and updates the multi http request context based on the provided attributes and HttpConnectRecord. * - * @param attributes the attributes to use - * @param retryEvent the retry event to use + * @param attributes the attributes to use + * @param attemptEvent the HttpAttemptEvent to use * @return the updated multi http request context */ - private MultiHttpRequestContext getAndUpdateMultiHttpRequestContext(Map attributes, HttpRetryEvent retryEvent) { + private MultiHttpRequestContext getAndUpdateMultiHttpRequestContext(Map attributes, HttpAttemptEvent attemptEvent) { // get the multi http request context MultiHttpRequestContext multiHttpRequestContext = (MultiHttpRequestContext) attributes.get(MultiHttpRequestContext.NAME); - if (retryEvent.getLastException() == null || retryEvent.isMaxRetriesReached()) { + // Check if the current attempted event has completed + if (attemptEvent.isComplete()) { // decrement the counter multiHttpRequestContext.decrementRemainingRequests(); - // try set failed event - if (retryEvent.getLastException() != null) { - multiHttpRequestContext.setLastFailedEvent(retryEvent); + if (attemptEvent.getLastException() != null) { + // if all attempts are exhausted, set the last failed event + multiHttpRequestContext.setLastFailedEvent(attemptEvent); } } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java index 820b46296a..050839451a 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/HttpSinkHandlerRetryWrapper.java @@ -20,7 +20,6 @@ import org.apache.eventmesh.common.config.connector.http.HttpRetryConfig; import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig; import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; -import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent; import org.apache.eventmesh.connector.http.sink.handler.AbstractHttpSinkHandler; import org.apache.eventmesh.connector.http.sink.handler.HttpSinkHandler; import org.apache.eventmesh.connector.http.util.HttpUtils; @@ -51,10 +50,38 @@ public class HttpSinkHandlerRetryWrapper extends AbstractHttpSinkHandler { private final HttpSinkHandler sinkHandler; + private final RetryPolicy> retryPolicy; + public HttpSinkHandlerRetryWrapper(SinkConnectorConfig sinkConnectorConfig, HttpSinkHandler sinkHandler) { super(sinkConnectorConfig); this.sinkHandler = sinkHandler; this.httpRetryConfig = getSinkConnectorConfig().getRetryConfig(); + this.retryPolicy = buildRetryPolicy(); + } + + private RetryPolicy> buildRetryPolicy() { + return RetryPolicy.>builder() + .handleIf(e -> e instanceof ConnectException) + .handleResultIf(response -> httpRetryConfig.isRetryOnNonSuccess() && !HttpUtils.is2xxSuccessful(response.statusCode())) + .withMaxRetries(httpRetryConfig.getMaxRetries()) + .withDelay(Duration.ofMillis(httpRetryConfig.getInterval())) + .onRetry(event -> { + if (log.isDebugEnabled()) { + log.warn("Failed to deliver message after {} attempts. Retrying in {} ms. Error: {}", + event.getAttemptCount(), httpRetryConfig.getInterval(), event.getLastException()); + } else { + log.warn("Failed to deliver message after {} attempts. Retrying in {} ms.", + event.getAttemptCount(), httpRetryConfig.getInterval()); + } + }).onFailure(event -> { + if (log.isDebugEnabled()) { + log.error("Failed to deliver message after {} attempts. Error: {}", + event.getAttemptCount(), event.getException()); + } else { + log.error("Failed to deliver message after {} attempts.", + event.getAttemptCount()); + } + }).build(); } /** @@ -78,36 +105,8 @@ public void start() { @Override public Future> deliver(URI url, HttpConnectRecord httpConnectRecord, Map attributes, ConnectRecord connectRecord) { - - // Build the retry policy - RetryPolicy> retryPolicy = RetryPolicy.>builder() - .handleIf(e -> e instanceof ConnectException) - .handleResultIf(response -> httpRetryConfig.isRetryOnNonSuccess() && !HttpUtils.is2xxSuccessful(response.statusCode())) - .withMaxRetries(httpRetryConfig.getMaxRetries()) - .withDelay(Duration.ofMillis(httpRetryConfig.getInterval())) - .onRetry(event -> { - if (log.isDebugEnabled()) { - log.warn("Retrying the request to {} for the {} time. {}", url, event.getAttemptCount(), httpConnectRecord); - } else { - log.warn("Retrying the request to {} for the {} time.", url, event.getAttemptCount()); - } - // update the retry event - HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId()); - retryEvent.increaseCurrentRetries(); - }) - .onFailure(event -> { - if (log.isDebugEnabled()) { - log.error("Failed to send the request to {} after {} attempts. {}", url, event.getAttemptCount(), - httpConnectRecord, event.getException()); - } else { - log.error("Failed to send the request to {} after {} attempts.", url, event.getAttemptCount(), event.getException()); - } - }).build(); - - // Handle the ConnectRecord with retry policy Failsafe.with(retryPolicy) .getStageAsync(() -> sinkHandler.deliver(url, httpConnectRecord, attributes, connectRecord).toCompletionStage()); - return null; } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java index 7edd84a967..0751918ee7 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/handler/impl/WebhookHttpSinkHandler.java @@ -21,11 +21,11 @@ import org.apache.eventmesh.common.config.connector.http.SinkConnectorConfig; import org.apache.eventmesh.common.exception.EventMeshException; import org.apache.eventmesh.connector.http.common.SynchronizedCircularFifoQueue; +import org.apache.eventmesh.connector.http.sink.data.HttpAttemptEvent; import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord; import org.apache.eventmesh.connector.http.sink.data.HttpExportMetadata; import org.apache.eventmesh.connector.http.sink.data.HttpExportRecord; import org.apache.eventmesh.connector.http.sink.data.HttpExportRecordPage; -import org.apache.eventmesh.connector.http.sink.data.HttpRetryEvent; import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord; import org.apache.commons.lang3.StringUtils; @@ -216,18 +216,14 @@ public Future> deliver(URI url, HttpConnectRecord httpConne Future> responseFuture = super.deliver(url, httpConnectRecord, attributes, connectRecord); // store the received data return responseFuture.onComplete(arr -> { - // get tryEvent from attributes - HttpRetryEvent retryEvent = (HttpRetryEvent) attributes.get(HttpRetryEvent.PREFIX + httpConnectRecord.getHttpRecordId()); + // get HttpAttemptEvent + HttpAttemptEvent attemptEvent = (HttpAttemptEvent) attributes.get(HttpAttemptEvent.PREFIX + httpConnectRecord.getHttpRecordId()); - HttpResponse response = null; - if (arr.succeeded()) { - response = arr.result(); - } else { - retryEvent.setLastException(arr.cause()); - } + // get the response + HttpResponse response = arr.succeeded() ? arr.result() : null; // create ExportMetadata - HttpExportMetadata httpExportMetadata = buildHttpExportMetadata(url, response, httpConnectRecord, retryEvent); + HttpExportMetadata httpExportMetadata = buildHttpExportMetadata(url, response, httpConnectRecord, attemptEvent); // create ExportRecord HttpExportRecord exportRecord = new HttpExportRecord(httpExportMetadata, arr.succeeded() ? arr.result().bodyAsString() : null); @@ -242,17 +238,16 @@ public Future> deliver(URI url, HttpConnectRecord httpConne * @param url the URI to which the HttpConnectRecord was sent * @param response the response received from the URI * @param httpConnectRecord the HttpConnectRecord that was sent - * @param retryEvent the SingleHttpRetryEvent that was used for retries + * @param attemptEvent the HttpAttemptEvent that was used to send the HttpConnectRecord * @return the HttpExportMetadata object */ private HttpExportMetadata buildHttpExportMetadata(URI url, HttpResponse response, HttpConnectRecord httpConnectRecord, - HttpRetryEvent retryEvent) { + HttpAttemptEvent attemptEvent) { String msg = null; // order of precedence: lastException > response > null - if (retryEvent.getLastException() != null) { - msg = retryEvent.getLimitedExceptionMessage(); - retryEvent.setLastException(null); + if (attemptEvent.getLastException() != null) { + msg = attemptEvent.getLimitedExceptionMessage(); } else if (response != null) { msg = response.statusMessage(); } @@ -263,7 +258,7 @@ private HttpExportMetadata buildHttpExportMetadata(URI url, HttpResponse .message(msg) .receivedTime(LocalDateTime.now()) .recordId(httpConnectRecord.getHttpRecordId()) - .retryNum(retryEvent.getCurrentRetries()) + .retryNum(attemptEvent.getAttempts() - 1) .build(); } diff --git a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java index 5f65f0749f..be2b52e737 100644 --- a/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java +++ b/eventmesh-connectors/eventmesh-connector-http/src/test/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnectorTest.java @@ -83,14 +83,12 @@ void before() throws Exception { httpRequest -> { // Increase the number of requests received counter.incrementAndGet(); - JSONObject requestBody = JSON.parseObject(httpRequest.getBodyAsString()); return HttpResponse.response() .withContentType(MediaType.APPLICATION_JSON) .withStatusCode(HttpStatus.SC_OK) .withBody(new JSONObject() .fluentPut("code", 0) .fluentPut("message", "success") - .fluentPut("data", requestBody.getJSONObject("data").get("data")) .toJSONString() ); // .withDelay(TimeUnit.SECONDS, 10); }