Skip to content

Commit

Permalink
[2425] Begin move to configurable RetrySpec for request retry..
Browse files Browse the repository at this point in the history
as opposed to some homegrown class.

 reactor#2425
  • Loading branch information
crankydillo committed Aug 27, 2022
1 parent 6bc44e6 commit 35713bf
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import reactor.netty.transport.ClientTransport;
import reactor.util.Metrics;
import reactor.util.annotation.Nullable;
import reactor.util.retry.RetrySpec;

/**
* An HttpClient allows building in a safe immutable way an http client that is
Expand Down Expand Up @@ -637,49 +638,39 @@ public final RequestSender delete() {
}

/**
* Option to disable {@code retry once} support for the outgoing requests that fail with
* {@link reactor.netty.channel.AbortedException#isConnectionReset(Throwable)}.
* <p>By default this is set to false in which case {@code retry once} is enabled.
* Option to disable {@code request retry} established by {@link #requestRetry(RetrySpec)}.
* See its doc for any default retry behavior!
* <p>By default this is set to false.
*
* @param disableRetry true to disable {@code retry once}, false to enable it
* @param disableRetry true to disable {@code request retry}, false to enable it
*
* @return a new {@link HttpClient}
* @since 0.9.6
*/
public final HttpClient disableRetry(boolean disableRetry) {
if (RequestRetryConfig.DISABLED == configuration().retryConfig) { // yes instance comparison..
if (disableRetry == configuration().retryDisabled) {
return this;
}
HttpClient dup = duplicate();
dup.configuration().retryConfig = RequestRetryConfig.DISABLED;
dup.configuration().retryDisabled = disableRetry;
return dup;
}

public final HttpClient retryConfig(final RequestRetryConfig retryConfig) {
Objects.requireNonNull(retryConfig, "retryConfig");
/**
* Option to customize {@code request retry} behavior. If any HTTP request data
* (headers, body, etc.), the request will not be resubmitted regardless of this
* configuration. This can be disabled via {@link #disableRetry(boolean)}.
*
* <p>This defaults to {@code retry once} for outgoing requests that fail with
* {@link reactor.netty.channel.AbortedException#isConnectionReset(Throwable)}.
*/
public final HttpClient requestRetry(final RetrySpec requestRetry) {
Objects.requireNonNull(requestRetry, "requestRetry");
HttpClient dup = duplicate();
dup.configuration().retryConfig = retryConfig;
dup.configuration().requestRetrySpec = requestRetry;
return dup;
}

public static class RequestRetryConfig {

public final static RequestRetryConfig DEFAULT = new RequestRetryConfig(1, AbortedException::isConnectionReset);
public final static RequestRetryConfig DISABLED = new RequestRetryConfig(0, anyException -> false);

final int maxRetries;
final Predicate<IOException> isRetriable;

public RequestRetryConfig(final int maxRetries, final Predicate<IOException> isRetriable) {
this.maxRetries = maxRetries;
this.isRetriable = isRetriable;
}

boolean isRetrieable(final IOException ioe) {
return isRetriable.test(ioe);
}
}

/**
* Setup a callback called when {@link HttpClientRequest} has been sent
* and {@link HttpClientState#REQUEST_SENT} has been emitted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import reactor.netty.NettyOutbound;
import reactor.netty.NettyPipeline;
import reactor.netty.ReactorNetty;
import reactor.netty.channel.AbortedException;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.http.Http2SettingsSpec;
Expand All @@ -84,6 +85,7 @@
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
import reactor.util.retry.Retry;
import reactor.util.retry.RetrySpec;

import static reactor.netty.ReactorNetty.format;
import static reactor.netty.http.client.Http2ConnectionProvider.OWNER;
Expand Down Expand Up @@ -316,9 +318,8 @@ public WebsocketClientSpec websocketClientSpec() {
Consumer<HttpClientRequest> redirectRequestConsumer;
Duration responseTimeout;

HttpClient.RequestRetryConfig retryConfig;
RetrySpec requestRetrySpec;

// TODO consolidate this with config concept
boolean retryDisabled;

SslProvider sslProvider;
Expand All @@ -338,7 +339,7 @@ public WebsocketClientSpec websocketClientSpec() {
this.method = HttpMethod.GET;
this.protocols = new HttpProtocol[]{HttpProtocol.HTTP11};
this._protocols = h11;
this.retryConfig = HttpClient.RequestRetryConfig.DEFAULT;
this.requestRetrySpec = Retry.max(1).filter(AbortedException::isConnectionReset);
}

HttpClientConfig(HttpClientConfig parent) {
Expand Down Expand Up @@ -367,7 +368,7 @@ public WebsocketClientSpec websocketClientSpec() {
this.redirectRequestBiConsumer = parent.redirectRequestBiConsumer;
this.redirectRequestConsumer = parent.redirectRequestConsumer;
this.responseTimeout = parent.responseTimeout;
this.retryConfig = parent.retryConfig;
this.requestRetrySpec = parent.requestRetrySpec;
this.retryDisabled = parent.retryDisabled;
this.sslProvider = parent.sslProvider;
this.uri = parent.uri;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package reactor.netty.http.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
Expand Down Expand Up @@ -67,6 +66,7 @@
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
import reactor.util.retry.Retry;
import reactor.util.retry.RetrySpec;

import static reactor.netty.ReactorNetty.format;
import static reactor.netty.http.client.HttpClientState.STREAM_CONFIGURED;
Expand Down Expand Up @@ -208,7 +208,7 @@ static final class MonoHttpConnect extends Mono<Connection> {
public void subscribe(CoreSubscriber<? super Connection> actual) {
HttpClientHandler handler = new HttpClientHandler(config);

Mono.<Connection>create(sink -> {
final Mono<Connection> baseMono = Mono.<Connection>create(sink -> {
HttpClientConfig _config = config;

//append secure handler if needed
Expand Down Expand Up @@ -269,7 +269,6 @@ public void subscribe(CoreSubscriber<? super Connection> actual) {
.acquire(_config, observer, handler, resolver)
.subscribe(new ClientTransportSubscriber(sink));

// TODO definitely not happy about spreading the retry logic even more
}).retryWhen(Retry.indefinitely().filter(err -> {
if (err instanceof RedirectClientException) {
RedirectClientException re = (RedirectClientException)err;
Expand All @@ -280,8 +279,13 @@ public void subscribe(CoreSubscriber<? super Connection> actual) {
return true;
}
return false;
})).retryWhen(Retry.max(config.retryConfig.maxRetries).filter(handler))
.subscribe(actual);
}));

// If request retry is enabled, the handler should guarantee no request data sent
(config.retryDisabled
? baseMono
: baseMono.retryWhen(config.requestRetrySpec.modifyErrorFilter(handler::and))
).subscribe(actual);
}

private void removeIncompatibleProtocol(HttpClientConfig config, HttpProtocol protocol) {
Expand Down Expand Up @@ -358,7 +362,7 @@ public void onUncaughtException(Connection connection, Throwable error) {
handler.previousRequestHeaders = ops.requestHeaders;
}
}
else if (handler.canRetry(error)) {
else if (handler.requestRetrySpec.errorFilter.test(error)) {
HttpClientOperations ops = connection.as(HttpClientOperations.class);
if (ops != null && ops.hasSentHeaders()) {
// In some cases the channel close event may be delayed and thus the connection to be
Expand All @@ -369,8 +373,8 @@ else if (handler.canRetry(error)) {
// Mark the connection as non-persistent here so that it is never returned to the pool and leave
// the channel close event to invalidate it.
ops.markPersistent(false);
// Disable retry if the headers or/and the body were sent
handler.shouldRetry = false;
// Signal to retry that headers or/and the body were sent
handler.requestDataSent = true;
if (log.isWarnEnabled()) {
log.warn(format(connection.channel(),
"The connection observed an error, the request cannot be " +
Expand Down Expand Up @@ -480,10 +484,14 @@ static final class HttpClientHandler extends SocketAddress
volatile UriEndpoint fromURI;
volatile Supplier<String>[] redirectedFrom;

final RequestRetryConfig retryConfig;
/**
* A {@link RetrySpec} that is tied to request submission. The implementation
* that leverages this is supposed to guarantee that no retry will happen if
* any HTTP request data is sent over the wire.
*/
final RetrySpec requestRetrySpec;

// TODO not happy with name as it collides with config concept..
volatile boolean shouldRetry = true;
volatile boolean requestDataSent;

volatile HttpHeaders previousRequestHeaders;

Expand All @@ -504,7 +512,7 @@ static final class HttpClientHandler extends SocketAddress
new UriEndpointFactory(configuration.remoteAddress(), configuration.isSecure(), URI_ADDRESS_MAPPER);

this.websocketClientSpec = configuration.websocketClientSpec;
this.retryConfig = configuration.retryConfig;
this.requestRetrySpec = configuration.requestRetrySpec;
this.handler = configuration.body;

if (configuration.uri == null) {
Expand Down Expand Up @@ -690,22 +698,13 @@ void channel(HttpClientOperations ops) {

@Override
public boolean test(Throwable throwable) {
if (shouldRetry && canRetry(throwable)) {
if (!requestDataSent) {
redirect(toURI.toString());
return true;
}
return false;
}

/**
* Signals that the request <i>can</i> be retried.
*/
boolean canRetry(final Throwable err) {
return shouldRetry &&
err instanceof IOException &&
retryConfig.isRetrieable((IOException)err);
}

@Override
public String toString() {
return "{" + "uri=" + toURI + ", method=" + method + '}';
Expand Down

0 comments on commit 35713bf

Please sign in to comment.