diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java index 85604e0484..77ff45fbb0 100644 --- a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java +++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.ThreadingBehavior; @@ -58,7 +59,6 @@ final class ReactiveDataConsumer implements AsyncDataConsumer, Publisher buffers = new LinkedBlockingQueue<>(); private final AtomicBoolean flushInProgress = new AtomicBoolean(false); - private final Object flushLock = new Object(); private final AtomicInteger windowScalingIncrement = new AtomicInteger(0); private volatile boolean cancelled; private volatile boolean completed; @@ -66,6 +66,8 @@ final class ReactiveDataConsumer implements AsyncDataConsumer, Publisher subscriber; + private final ReentrantLock lock = new ReentrantLock(); + public void failed(final Exception cause) { if (!completed) { exception = cause; @@ -119,7 +121,8 @@ public void releaseResources() { } private void flushToSubscriber() { - synchronized (flushLock) { + lock.lock(); + try { final Subscriber s = subscriber; if (flushInProgress.getAndSet(true)) { return; @@ -157,6 +160,8 @@ private void flushToSubscriber() { } finally { flushInProgress.set(false); } + } finally { + lock.unlock(); } } diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java index 61813caeba..036881c2fe 100644 --- a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java +++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataProducer.java @@ -31,6 +31,7 @@ import java.util.ArrayDeque; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.ThreadingBehavior; @@ -59,8 +60,11 @@ final class ReactiveDataProducer implements AsyncDataProducer, Subscriber subscription = new AtomicReference<>(); private final ArrayDeque buffers = new ArrayDeque<>(); // This field requires synchronization + private final ReentrantLock lock; + public ReactiveDataProducer(final Publisher publisher) { this.publisher = Args.notNull(publisher, "publisher"); + this.lock = new ReentrantLock(); } void setChannel(final DataStreamChannel channel) { @@ -80,8 +84,11 @@ public void onSubscribe(final Subscription subscription) { public void onNext(final ByteBuffer byteBuffer) { final byte[] copy = new byte[byteBuffer.remaining()]; byteBuffer.get(copy); - synchronized (buffers) { + lock.lock(); + try { buffers.add(ByteBuffer.wrap(copy)); + } finally { + lock.unlock(); } signalReadiness(); } @@ -113,12 +120,15 @@ public int available() { if (exception.get() != null || complete.get()) { return 1; } else { - synchronized (buffers) { + lock.lock(); + try { int sum = 0; for (final ByteBuffer buffer : buffers) { sum += buffer.remaining(); } return sum; + } finally { + lock.unlock(); } } } @@ -134,7 +144,8 @@ public void produce(final DataStreamChannel channel) throws IOException { final Subscription s = subscription.get(); int buffersToReplenish = 0; try { - synchronized (buffers) { + lock.lock(); + try { if (t != null) { throw new HttpStreamResetException(t.getMessage(), t); } else if (this.complete.get() && buffers.isEmpty()) { @@ -152,6 +163,8 @@ public void produce(final DataStreamChannel channel) throws IOException { } } } + } finally { + lock.unlock(); } } finally { if (s != null && buffersToReplenish > 0) { diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/SocksProxy.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/SocksProxy.java index 7d8234d8e6..f4a5609eb1 100644 --- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/SocksProxy.java +++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/SocksProxy.java @@ -37,6 +37,7 @@ import java.net.SocketAddress; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hc.core5.net.InetAddressUtils; import org.apache.hc.core5.util.TimeValue; @@ -209,6 +210,7 @@ public void shutdown() { private final List handlers = new ArrayList<>(); private ServerSocket server; private Thread serverThread; + private final ReentrantLock lock; public SocksProxy() { this(0); @@ -216,36 +218,43 @@ public SocksProxy() { public SocksProxy(final int port) { this.port = port; + this.lock = new ReentrantLock(); } - public synchronized void start() throws IOException { - if (this.server == null) { - this.server = new ServerSocket(this.port); - this.serverThread = new Thread(() -> { - try { - while (true) { - final Socket socket = server.accept(); - startSocksProxyHandler(socket); - } - } catch (final IOException e) { - } finally { - if (server != null) { - try { - server.close(); - } catch (final IOException e) { + public void start() throws IOException { + lock.lock(); + try { + if (this.server == null) { + this.server = new ServerSocket(this.port); + this.serverThread = new Thread(() -> { + try { + while (true) { + final Socket socket = server.accept(); + startSocksProxyHandler(socket); + } + } catch (final IOException e) { + } finally { + if (server != null) { + try { + server.close(); + } catch (final IOException e) { + } + server = null; } - server = null; } - } - }); - this.serverThread.start(); + }); + this.serverThread.start(); + } + } finally { + lock.unlock(); } } public void shutdown(final TimeValue timeout) throws InterruptedException { final long waitUntil = System.currentTimeMillis() + timeout.toMilliseconds(); Thread t = null; - synchronized (this) { + lock.lock(); + try { if (this.server != null) { try { this.server.close(); @@ -265,6 +274,8 @@ public void shutdown(final TimeValue timeout) throws InterruptedException { wait(waitTime); } } + } finally { + lock.unlock(); } if (t != null) { final long waitTime = waitUntil - System.currentTimeMillis(); @@ -276,14 +287,22 @@ public void shutdown(final TimeValue timeout) throws InterruptedException { protected void startSocksProxyHandler(final Socket socket) { final SocksProxyHandler handler = new SocksProxyHandler(this, socket); - synchronized (this) { + lock.lock(); + try { this.handlers.add(handler); + } finally { + lock.unlock(); } handler.start(); } - protected synchronized void cleanupSocksProxyHandler(final SocksProxyHandler handler) { - this.handlers.remove(handler); + protected void cleanupSocksProxyHandler(final SocksProxyHandler handler) { + lock.lock(); + try { + this.handlers.remove(handler); + } finally { + lock.unlock(); + } } public SocketAddress getProxyAddress() { diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java index 0103df00cf..46951ffd1e 100644 --- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/Http1IntegrationTest.java @@ -56,6 +56,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hc.core5.http.ConnectionReuseStrategy; import org.apache.hc.core5.http.ContentLengthStrategy; @@ -141,6 +142,8 @@ public abstract class Http1IntegrationTest { private final URIScheme scheme; + private final ReentrantLock lock = new ReentrantLock(); + @RegisterExtension private final Http1TestResources resources; @@ -910,8 +913,11 @@ public void handleRequest( responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), context); } final HttpResponse response = new BasicHttpResponse(200); - synchronized (entityProducer) { + lock.lock(); + try { responseChannel.sendResponse(response, entityProducer, context); + } finally { + lock.unlock(); } } } catch (final Exception ignore) { @@ -936,15 +942,21 @@ public void streamEnd(final List trailers) throws HttpExceptio @Override public int available() { - synchronized (entityProducer) { + lock.lock(); + try { return entityProducer.available(); + } finally { + lock.unlock(); } } @Override public void produce(final DataStreamChannel channel) throws IOException { - synchronized (entityProducer) { + lock.lock(); + try { entityProducer.produce(channel); + } finally { + lock.unlock(); } } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/concurrent/BasicFuture.java b/httpcore5/src/main/java/org/apache/hc/core5/concurrent/BasicFuture.java index 447d2266e7..3cfe544173 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/concurrent/BasicFuture.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/concurrent/BasicFuture.java @@ -31,6 +31,8 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hc.core5.util.Args; import org.apache.hc.core5.util.TimeoutValueException; @@ -52,9 +54,14 @@ public class BasicFuture implements Future, Cancellable { private volatile T result; private volatile Exception ex; + private final ReentrantLock lock; + private final Condition condition; + public BasicFuture(final FutureCallback callback) { super(); this.callback = callback; + this.lock = new ReentrantLock(); + this.condition = lock.newCondition(); } @Override @@ -78,46 +85,59 @@ private T getResult() throws ExecutionException { } @Override - public synchronized T get() throws InterruptedException, ExecutionException { - while (!this.completed) { - wait(); + public T get() throws InterruptedException, ExecutionException { + lock.lock(); + try { + while (!this.completed) { + condition.await(); + } + return getResult(); + } finally { + lock.unlock(); } - return getResult(); } @Override - public synchronized T get(final long timeout, final TimeUnit unit) + public T get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { Args.notNull(unit, "Time unit"); final long msecs = unit.toMillis(timeout); final long startTime = (msecs <= 0) ? 0 : System.currentTimeMillis(); long waitTime = msecs; - if (this.completed) { - return getResult(); - } else if (waitTime <= 0) { - throw TimeoutValueException.fromMilliseconds(msecs, msecs + Math.abs(waitTime)); - } else { - for (;;) { - wait(waitTime); - if (this.completed) { - return getResult(); - } - waitTime = msecs - (System.currentTimeMillis() - startTime); - if (waitTime <= 0) { - throw TimeoutValueException.fromMilliseconds(msecs, msecs + Math.abs(waitTime)); + try { + lock.lock(); + if (this.completed) { + return getResult(); + } else if (waitTime <= 0) { + throw TimeoutValueException.fromMilliseconds(msecs, msecs + Math.abs(waitTime)); + } else { + for (; ; ) { + condition.await(waitTime, TimeUnit.MILLISECONDS); + if (this.completed) { + return getResult(); + } + waitTime = msecs - (System.currentTimeMillis() - startTime); + if (waitTime <= 0) { + throw TimeoutValueException.fromMilliseconds(msecs, msecs + Math.abs(waitTime)); + } } } + } finally { + lock.unlock(); } } public boolean completed(final T result) { - synchronized(this) { + lock.lock(); + try { if (this.completed) { return false; } this.completed = true; this.result = result; - notifyAll(); + condition.signalAll(); + } finally { + lock.unlock(); } if (this.callback != null) { this.callback.completed(result); @@ -126,13 +146,16 @@ public boolean completed(final T result) { } public boolean failed(final Exception exception) { - synchronized(this) { + lock.lock(); + try { if (this.completed) { return false; } this.completed = true; this.ex = exception; - notifyAll(); + condition.signalAll(); + } finally { + lock.unlock(); } if (this.callback != null) { this.callback.failed(exception); @@ -142,13 +165,16 @@ public boolean failed(final Exception exception) { @Override public boolean cancel(final boolean mayInterruptIfRunning) { - synchronized(this) { + lock.lock(); + try { if (this.completed) { return false; } this.completed = true; this.cancelled = true; - notifyAll(); + condition.signalAll(); + } finally { + lock.unlock(); } if (this.callback != null) { this.callback.cancelled(); diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java index b60adf2dcc..8ebf192e19 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/AbstractHttp1StreamDuplexer.java @@ -36,6 +36,7 @@ import java.nio.channels.WritableByteChannel; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import javax.net.ssl.SSLSession; @@ -611,19 +612,20 @@ void appendState(final StringBuilder buf) { static class CapacityWindow implements CapacityChannel { private final IOSession ioSession; - private final Object lock; + private final ReentrantLock lock; private int window; private boolean closed; CapacityWindow(final int window, final IOSession ioSession) { this.window = window; this.ioSession = ioSession; - this.lock = new Object(); + this.lock = new ReentrantLock(); } @Override public void update(final int increment) throws IOException { - synchronized (lock) { + lock.lock(); + try { if (closed) { return; } @@ -631,6 +633,8 @@ public void update(final int increment) throws IOException { updateWindow(increment); ioSession.setEvent(SelectionKey.OP_READ); } + } finally { + lock.unlock(); } } @@ -639,12 +643,15 @@ public void update(final int increment) throws IOException { * if this channel is closed in it. */ int removeCapacity(final int delta) { - synchronized (lock) { + lock.lock(); + try { updateWindow(-delta); if (window <= 0) { ioSession.clearEvent(SelectionKey.OP_READ); } return window; + } finally { + lock.unlock(); } } @@ -662,8 +669,11 @@ private void updateWindow(final int delta) { * read events outside of the context of the request the channel was created for */ void close() { - synchronized (lock) { + lock.lock(); + try { closed = true; + } finally { + lock.unlock(); } } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexer.java index fafd72fc62..4a886d7d56 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexer.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamDuplexer.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.http.ConnectionClosedException; @@ -477,6 +478,7 @@ private static class DelayedOutputChannel implements Http1StreamChannel channel) { this.channel = channel; @@ -492,13 +494,16 @@ public void submit( final HttpResponse response, final boolean endStream, final FlushMode flushMode) throws HttpException, IOException { - synchronized (this) { + lock.lock(); + try { if (direct) { channel.submit(response, endStream, flushMode); } else { delayedResponse = response; completed = endStream; } + } finally { + lock.unlock(); } } @@ -524,48 +529,63 @@ public void setSocketTimeout(final Timeout timeout) { @Override public int write(final ByteBuffer src) throws IOException { - synchronized (this) { + lock.lock(); + try { return direct ? channel.write(src) : 0; + } finally { + lock.unlock(); } } @Override public void complete(final List trailers) throws IOException { - synchronized (this) { + lock.lock(); + try { if (direct) { channel.complete(trailers); } else { completed = true; } + } finally { + lock.unlock(); } } @Override public boolean abortGracefully() throws IOException { - synchronized (this) { + lock.lock(); + try { if (direct) { return channel.abortGracefully(); } completed = true; return true; + } finally { + lock.unlock(); } } @Override public boolean isCompleted() { - synchronized (this) { + lock.lock(); + try { return direct ? channel.isCompleted() : completed; + } finally { + lock.unlock(); } } @Override public void activate() throws IOException, HttpException { - synchronized (this) { + lock.lock(); + try { direct = true; if (delayedResponse != null) { channel.submit(delayedResponse, completed, completed ? FlushMode.IMMEDIATE : FlushMode.BUFFER); delayedResponse = null; } + } finally { + lock.unlock(); } } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractBinAsyncEntityProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractBinAsyncEntityProducer.java index 4570daabb2..648685837f 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractBinAsyncEntityProducer.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractBinAsyncEntityProducer.java @@ -30,6 +30,7 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.ThreadingBehavior; @@ -53,6 +54,8 @@ enum State { ACTIVE, FLUSHING, END_STREAM } private final ByteBuffer byteBuffer; private final ContentType contentType; + private final ReentrantLock lock; + private volatile State state; public AbstractBinAsyncEntityProducer(final int fragmentSizeHint, final ContentType contentType) { @@ -60,6 +63,7 @@ public AbstractBinAsyncEntityProducer(final int fragmentSizeHint, final ContentT this.byteBuffer = ByteBuffer.allocate(this.fragmentSizeHint); this.contentType = contentType; this.state = State.ACTIVE; + this.lock = new ReentrantLock(); } private void flush(final StreamChannel channel) throws IOException { @@ -164,30 +168,40 @@ public final int available() { if (state == State.ACTIVE) { return availableData(); } else { - synchronized (byteBuffer) { + lock.lock(); + try { return byteBuffer.position(); + } finally { + lock.unlock(); } } } @Override public final void produce(final DataStreamChannel channel) throws IOException { - synchronized (byteBuffer) { + lock.lock(); + try { if (state == State.ACTIVE) { produceData(new StreamChannel() { @Override public int write(final ByteBuffer src) throws IOException { Args.notNull(src, "Buffer"); - synchronized (byteBuffer) { + lock.lock(); + try { return writeData(channel, src); + } finally { + lock.unlock(); } } @Override public void endStream() throws IOException { - synchronized (byteBuffer) { + lock.lock(); + try { streamEnd(channel); + } finally { + lock.unlock(); } } @@ -200,6 +214,8 @@ public void endStream() throws IOException { channel.endStream(); } } + } finally { + lock.unlock(); } } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractCharAsyncEntityProducer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractCharAsyncEntityProducer.java index c308fb7948..56f72bc0c4 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractCharAsyncEntityProducer.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/nio/entity/AbstractCharAsyncEntityProducer.java @@ -34,6 +34,7 @@ import java.nio.charset.CoderResult; import java.nio.charset.StandardCharsets; import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.ThreadingBehavior; @@ -59,6 +60,7 @@ enum State { ACTIVE, FLUSHING, END_STREAM } private final int fragmentSizeHint; private final ContentType contentType; private final CharsetEncoder charsetEncoder; + private final ReentrantLock lock; private volatile State state; @@ -73,6 +75,7 @@ public AbstractCharAsyncEntityProducer( final Charset charset = ContentType.getCharset(contentType, StandardCharsets.UTF_8); this.charsetEncoder = charset.newEncoder(); this.state = State.ACTIVE; + this.lock = new ReentrantLock(); } private void flush(final StreamChannel channel) throws IOException { @@ -178,30 +181,40 @@ public final int available() { if (state == State.ACTIVE) { return availableData(); } else { - synchronized (bytebuf) { + lock.lock(); + try { return bytebuf.position(); + } finally { + lock.unlock(); } } } @Override public final void produce(final DataStreamChannel channel) throws IOException { - synchronized (bytebuf) { + lock.lock(); + try { if (state == State.ACTIVE) { produceData(new StreamChannel() { @Override public int write(final CharBuffer src) throws IOException { Args.notNull(src, "Buffer"); - synchronized (bytebuf) { + lock.lock(); + try { return writeData(channel, src); + } finally { + lock.unlock(); } } @Override public void endStream() throws IOException { - synchronized (bytebuf) { + lock.lock(); + try { streamEnd(channel); + } finally { + lock.unlock(); } } @@ -223,6 +236,8 @@ public void endStream() throws IOException { } + } finally { + lock.unlock(); } } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/protocol/HttpDateGenerator.java b/httpcore5/src/main/java/org/apache/hc/core5/http/protocol/HttpDateGenerator.java index 8bfd090469..385a8ba01b 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/protocol/HttpDateGenerator.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/protocol/HttpDateGenerator.java @@ -32,6 +32,7 @@ import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; import java.util.TimeZone; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.ThreadingBehavior; @@ -66,6 +67,8 @@ public class HttpDateGenerator { private String dateAsText; private ZoneId zoneId; + private final ReentrantLock lock; + HttpDateGenerator() { dateTimeFormatter =new DateTimeFormatterBuilder() .parseLenient() @@ -73,6 +76,7 @@ public class HttpDateGenerator { .appendPattern(PATTERN_RFC1123) .toFormatter(); zoneId = GMT_ID; + this.lock = new ReentrantLock(); } @@ -83,16 +87,22 @@ private HttpDateGenerator(final String pattern, final ZoneId zoneId) { .appendPattern(pattern) .toFormatter(); this.zoneId = zoneId; + this.lock = new ReentrantLock(); } - public synchronized String getCurrentDate() { - final long now = System.currentTimeMillis(); - if (now - this.dateAsMillis > GRANULARITY_MILLIS) { - // Generate new date string - dateAsText = dateTimeFormatter.format(Instant.now().atZone(zoneId)); - dateAsMillis = now; + public String getCurrentDate() { + lock.lock(); + try { + final long now = System.currentTimeMillis(); + if (now - this.dateAsMillis > GRANULARITY_MILLIS) { + // Generate new date string + dateAsText = dateTimeFormatter.format(Instant.now().atZone(zoneId)); + dateAsMillis = now; + } + return dateAsText; + } finally { + lock.unlock(); } - return dateAsText; } } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/protocol/UriPatternMatcher.java b/httpcore5/src/main/java/org/apache/hc/core5/http/protocol/UriPatternMatcher.java index 97d8c8f925..fec7c9b46c 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/protocol/UriPatternMatcher.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/protocol/UriPatternMatcher.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.ThreadingBehavior; @@ -60,9 +61,12 @@ public class UriPatternMatcher implements LookupRegistry { private final Map map; + private final ReentrantLock lock; + public UriPatternMatcher() { super(); this.map = new LinkedHashMap<>(); + this.lock = new ReentrantLock(); } /** @@ -73,8 +77,13 @@ public UriPatternMatcher() { * @see Map#entrySet() * @since 4.4.9 */ - public synchronized Set> entrySet() { - return new HashSet<>(map.entrySet()); + public Set> entrySet() { + lock.lock(); + try { + return new HashSet<>(map.entrySet()); + } finally { + lock.unlock(); + } } /** @@ -86,9 +95,14 @@ public synchronized Set> entrySet() { * the object. */ @Override - public synchronized void register(final String pattern, final T obj) { - Args.notNull(pattern, "URI request pattern"); - this.map.put(pattern, obj); + public void register(final String pattern, final T obj) { + lock.lock(); + try { + Args.notNull(pattern, "URI request pattern"); + this.map.put(pattern, obj); + } finally { + lock.unlock(); + } } /** @@ -98,11 +112,16 @@ public synchronized void register(final String pattern, final T obj) { * the pattern to unregister. */ @Override - public synchronized void unregister(final String pattern) { - if (pattern == null) { - return; + public void unregister(final String pattern) { + lock.lock(); + try { + if (pattern == null) { + return; + } + this.map.remove(pattern); + } finally { + lock.unlock(); } - this.map.remove(pattern); } /** @@ -113,25 +132,30 @@ public synchronized void unregister(final String pattern) { * @return object or {@code null} if no match is found. */ @Override - public synchronized T lookup(final String path) { - Args.notNull(path, "Request path"); - // direct match? - T obj = this.map.get(path); - if (obj == null) { - // pattern match? - String bestMatch = null; - for (final String pattern : this.map.keySet()) { - if (matchUriRequestPattern(pattern, path)) { - // we have a match. is it any better? - if (bestMatch == null || (bestMatch.length() < pattern.length()) - || (bestMatch.length() == pattern.length() && pattern.endsWith("*"))) { - obj = this.map.get(pattern); - bestMatch = pattern; + public T lookup(final String path) { + lock.lock(); + try { + Args.notNull(path, "Request path"); + // direct match? + T obj = this.map.get(path); + if (obj == null) { + // pattern match? + String bestMatch = null; + for (final String pattern : this.map.keySet()) { + if (matchUriRequestPattern(pattern, path)) { + // we have a match. is it any better? + if (bestMatch == null || (bestMatch.length() < pattern.length()) + || (bestMatch.length() == pattern.length() && pattern.endsWith("*"))) { + obj = this.map.get(pattern); + bestMatch = pattern; + } } } } + return obj; + } finally { + lock.unlock(); } - return obj; } /** diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/protocol/UriPatternOrderedMatcher.java b/httpcore5/src/main/java/org/apache/hc/core5/http/protocol/UriPatternOrderedMatcher.java index 15fb5cde73..2eca498fbb 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/protocol/UriPatternOrderedMatcher.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/protocol/UriPatternOrderedMatcher.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.ThreadingBehavior; @@ -60,6 +61,8 @@ public class UriPatternOrderedMatcher implements LookupRegistry { private final Map map; + private final ReentrantLock lock = new ReentrantLock(); + public UriPatternOrderedMatcher() { super(); this.map = new LinkedHashMap<>(); @@ -73,8 +76,13 @@ public UriPatternOrderedMatcher() { * @see Map#entrySet() * @since 4.4.9 */ - public synchronized Set> entrySet() { - return new HashSet<>(map.entrySet()); + public Set> entrySet() { + lock.lock(); + try { + return new HashSet<>(map.entrySet()); + } finally { + lock.unlock(); + } } /** @@ -84,9 +92,14 @@ public synchronized Set> entrySet() { * @param obj the object. */ @Override - public synchronized void register(final String pattern, final T obj) { - Args.notNull(pattern, "URI request pattern"); - this.map.put(pattern, obj); + public void register(final String pattern, final T obj) { + lock.lock(); + try { + Args.notNull(pattern, "URI request pattern"); + this.map.put(pattern, obj); + } finally { + lock.unlock(); + } } /** @@ -95,11 +108,16 @@ public synchronized void register(final String pattern, final T obj) { * @param pattern the pattern to unregister. */ @Override - public synchronized void unregister(final String pattern) { - if (pattern == null) { - return; + public void unregister(final String pattern) { + lock.lock(); + try { + if (pattern == null) { + return; + } + this.map.remove(pattern); + } finally { + lock.unlock(); } - this.map.remove(pattern); } /** @@ -109,18 +127,23 @@ public synchronized void unregister(final String pattern) { * @return object or {@code null} if no match is found. */ @Override - public synchronized T lookup(final String path) { - Args.notNull(path, "Request path"); - for (final Entry entry : this.map.entrySet()) { - final String pattern = entry.getKey(); - if (path.equals(pattern)) { - return entry.getValue(); - } - if (matchUriRequestPattern(pattern, path)) { - return this.map.get(pattern); + public T lookup(final String path) { + lock.lock(); + try { + Args.notNull(path, "Request path"); + for (final Entry entry : this.map.entrySet()) { + final String pattern = entry.getKey(); + if (path.equals(pattern)) { + return entry.getValue(); + } + if (matchUriRequestPattern(pattern, path)) { + return this.map.get(pattern); + } } + return null; + } finally { + lock.unlock(); } - return null; } /** diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/protocol/UriRegexMatcher.java b/httpcore5/src/main/java/org/apache/hc/core5/http/protocol/UriRegexMatcher.java index 92f9beaed8..35e3ab60a3 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/protocol/UriRegexMatcher.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/protocol/UriRegexMatcher.java @@ -30,6 +30,7 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; import org.apache.hc.core5.annotation.Contract; @@ -53,10 +54,13 @@ public class UriRegexMatcher implements LookupRegistry { private final Map objectMap; private final Map patternMap; + private final ReentrantLock lock; + public UriRegexMatcher() { super(); this.objectMap = new LinkedHashMap<>(); this.patternMap = new LinkedHashMap<>(); + this.lock = new ReentrantLock(); } /** @@ -68,10 +72,15 @@ public UriRegexMatcher() { * the object. */ @Override - public synchronized void register(final String regex, final T obj) { - Args.notNull(regex, "URI request regex"); - this.objectMap.put(regex, obj); - this.patternMap.put(regex, Pattern.compile(regex)); + public void register(final String regex, final T obj) { + lock.lock(); + try { + Args.notNull(regex, "URI request regex"); + this.objectMap.put(regex, obj); + this.patternMap.put(regex, Pattern.compile(regex)); + } finally { + lock.unlock(); + } } /** @@ -81,12 +90,17 @@ public synchronized void register(final String regex, final T obj) { * the regex to unregister. */ @Override - public synchronized void unregister(final String regex) { - if (regex == null) { - return; + public void unregister(final String regex) { + lock.lock(); + try { + if (regex == null) { + return; + } + this.objectMap.remove(regex); + this.patternMap.remove(regex); + } finally { + lock.unlock(); } - this.objectMap.remove(regex); - this.patternMap.remove(regex); } /** @@ -97,19 +111,24 @@ public synchronized void unregister(final String regex) { * @return object or {@code null} if no match is found. */ @Override - public synchronized T lookup(final String path) { - Args.notNull(path, "Request path"); - // direct match? - final T obj = this.objectMap.get(path); - if (obj == null) { - // regex match? - for (final Entry entry : this.patternMap.entrySet()) { - if (entry.getValue().matcher(path).matches()) { - return objectMap.get(entry.getKey()); + public T lookup(final String path) { + lock.lock(); + try { + Args.notNull(path, "Request path"); + // direct match? + final T obj = this.objectMap.get(path); + if (obj == null) { + // regex match? + for (final Entry entry : this.patternMap.entrySet()) { + if (entry.getValue().matcher(path).matches()) { + return objectMap.get(entry.getKey()); + } } } + return obj; + } finally { + lock.unlock(); } - return obj; } @Override diff --git a/httpcore5/src/main/java/org/apache/hc/core5/pool/LaxConnPool.java b/httpcore5/src/main/java/org/apache/hc/core5/pool/LaxConnPool.java index 4a40470eec..6479ba7c10 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/pool/LaxConnPool.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/pool/LaxConnPool.java @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicMarkableReference; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.Experimental; @@ -362,6 +363,8 @@ private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL } private final AtomicInteger allocated; private final AtomicLong releaseSeqNum; + private final ReentrantLock lock; + private volatile int max; PerRoutePool( @@ -386,6 +389,7 @@ private enum RequestServiceStrategy { FIRST_SUCCESSFUL, ALL } this.allocated = new AtomicInteger(0); this.releaseSeqNum = new AtomicLong(0); this.max = max; + this.lock = new ReentrantLock(); } public void shutdown(final CloseMode closeMode) { @@ -462,13 +466,16 @@ public Future> lease( final BasicFuture> future = new BasicFuture>(callback) { @Override - public synchronized PoolEntry get( + public PoolEntry get( final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + lock.lock(); try { return super.get(timeout, unit); } catch (final TimeoutException ex) { cancel(); throw ex; + } finally { + lock.unlock(); } } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/pool/StrictConnPool.java b/httpcore5/src/main/java/org/apache/hc/core5/pool/StrictConnPool.java index 9801cf82e5..e6b2ee7568 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/pool/StrictConnPool.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/pool/StrictConnPool.java @@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.hc.core5.annotation.Contract; @@ -78,7 +77,7 @@ public class StrictConnPool implements ManagedConnP private final LinkedList> available; private final ConcurrentLinkedQueue> completedRequests; private final Map maxPerRoute; - private final Lock lock; + private final ReentrantLock lock; private final AtomicBoolean isShutDown; private volatile int defaultMaxPerRoute; @@ -178,13 +177,16 @@ public Future> lease( final BasicFuture> future = new BasicFuture>(callback) { @Override - public synchronized PoolEntry get( + public PoolEntry get( final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + lock.lock(); try { return super.get(timeout, unit); } catch (final TimeoutException ex) { cancel(); throw ex; + } finally { + lock.unlock(); } } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOSessionPool.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOSessionPool.java index efe855d959..9f0edb7f97 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOSessionPool.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOSessionPool.java @@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.ThreadingBehavior; @@ -58,10 +59,13 @@ public abstract class AbstractIOSessionPool implements ModalCloseable { private final ConcurrentMap sessionPool; private final AtomicBoolean closed; + private final ReentrantLock lock; + public AbstractIOSessionPool() { super(); this.sessionPool = new ConcurrentHashMap<>(); this.closed = new AtomicBoolean(false); + this.lock = new ReentrantLock(); } protected abstract Future connectSession( @@ -81,7 +85,8 @@ protected abstract void closeSession( public final void close(final CloseMode closeMode) { if (closed.compareAndSet(false, true)) { for (final PoolEntry poolEntry : sessionPool.values()) { - synchronized (poolEntry) { + lock.lock(); + try { if (poolEntry.session != null) { closeSession(poolEntry.session, closeMode); poolEntry.session = null; @@ -98,6 +103,8 @@ public final void close(final CloseMode closeMode) { break; } } + } finally { + lock.unlock(); } } sessionPool.clear(); @@ -170,7 +177,8 @@ private void getSessionInternal( final T namedEndpoint, final Timeout connectTimeout, final FutureCallback callback) { - synchronized (poolEntry) { + poolEntry.lock.lock(); + try { if (poolEntry.session != null && requestNew) { closeSession(poolEntry.session, CloseMode.GRACEFUL); poolEntry.session = null; @@ -193,7 +201,8 @@ private void getSessionInternal( @Override public void completed(final IOSession result) { - synchronized (poolEntry) { + poolEntry.lock.lock(); + try { poolEntry.session = result; for (;;) { final FutureCallback callback = poolEntry.requestQueue.poll(); @@ -203,12 +212,15 @@ public void completed(final IOSession result) { break; } } + } finally { + poolEntry.lock.unlock(); } } @Override public void failed(final Exception ex) { - synchronized (poolEntry) { + poolEntry.lock.lock(); + try { poolEntry.session = null; for (;;) { final FutureCallback callback = poolEntry.requestQueue.poll(); @@ -218,6 +230,8 @@ public void failed(final Exception ex) { break; } } + } finally { + poolEntry.lock.unlock(); } } @@ -229,19 +243,24 @@ public void cancelled() { }); } } + } finally { + poolEntry.lock.unlock(); } } public final void enumAvailable(final Callback callback) { for (final PoolEntry poolEntry: sessionPool.values()) { if (poolEntry.session != null) { - synchronized (poolEntry) { + lock.lock(); + try { if (poolEntry.session != null) { callback.execute(poolEntry.session); if (!poolEntry.session.isOpen()) { poolEntry.session = null; } } + } finally { + lock.unlock(); } } } @@ -251,11 +270,14 @@ public final void closeIdle(final TimeValue idleTime) { final long deadline = System.currentTimeMillis() - (TimeValue.isPositive(idleTime) ? idleTime.toMilliseconds() : 0); for (final PoolEntry poolEntry: sessionPool.values()) { if (poolEntry.session != null) { - synchronized (poolEntry) { + lock.lock(); + try { if (poolEntry.session != null && poolEntry.session.getLastReadTime() <= deadline) { closeSession(poolEntry.session, CloseMode.GRACEFUL); poolEntry.session = null; } + } finally { + lock.unlock(); } } } @@ -278,9 +300,11 @@ static class PoolEntry { final Queue> requestQueue; volatile Future sessionFuture; volatile IOSession session; + final ReentrantLock lock; // Added PoolEntry() { this.requestQueue = new ArrayDeque<>(); + this.lock = new ReentrantLock(); } } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractSingleCoreIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractSingleCoreIOReactor.java index b2834d3ed6..fc3de36f1f 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractSingleCoreIOReactor.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractSingleCoreIOReactor.java @@ -33,8 +33,11 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hc.core5.function.Callback; import org.apache.hc.core5.io.CloseMode; @@ -48,14 +51,15 @@ abstract class AbstractSingleCoreIOReactor implements IOReactor { private final Callback exceptionCallback; private final AtomicReference status; private final AtomicBoolean terminated; - private final Object shutdownMutex; + private final Condition condition; + + private final ReentrantLock lock; final Selector selector; AbstractSingleCoreIOReactor(final Callback exceptionCallback) { super(); this.exceptionCallback = exceptionCallback; - this.shutdownMutex = new Object(); this.status = new AtomicReference<>(IOReactorStatus.INACTIVE); this.terminated = new AtomicBoolean(); try { @@ -63,6 +67,8 @@ abstract class AbstractSingleCoreIOReactor implements IOReactor { } catch (final IOException ex) { throw new IllegalStateException("Unexpected failure opening I/O selector", ex); } + this.lock = new ReentrantLock(); + this.condition = lock.newCondition(); } @Override @@ -105,22 +111,28 @@ public final void awaitShutdown(final TimeValue waitTime) throws InterruptedExce Args.notNull(waitTime, "Wait time"); final long deadline = System.currentTimeMillis() + waitTime.toMilliseconds(); long remaining = waitTime.toMilliseconds(); - synchronized (this.shutdownMutex) { + lock.lock(); + try { while (this.status.get().compareTo(IOReactorStatus.SHUT_DOWN) < 0) { - this.shutdownMutex.wait(remaining); + condition.await(remaining, TimeUnit.MILLISECONDS); remaining = deadline - System.currentTimeMillis(); if (remaining <= 0) { return; } } + } finally { + lock.unlock(); } } @Override public final void initiateShutdown() { if (this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.SHUT_DOWN)) { - synchronized (this.shutdownMutex) { - this.shutdownMutex.notifyAll(); + lock.lock(); + try { + condition.signalAll(); + } finally { + lock.unlock(); } } else if (this.status.compareAndSet(IOReactorStatus.ACTIVE, IOReactorStatus.SHUTTING_DOWN)) { this.selector.wakeup(); @@ -169,8 +181,11 @@ public void close(final CloseMode closeMode, final Timeout timeout) { logException(ex); } } - synchronized (this.shutdownMutex) { - this.shutdownMutex.notifyAll(); + lock.lock(); + try { + condition.signalAll(); + } finally { + lock.unlock(); } } diff --git a/httpcore5/src/test/java/org/apache/hc/core5/concurrent/TestBasicFuture.java b/httpcore5/src/test/java/org/apache/hc/core5/concurrent/TestBasicFuture.java index a95e3a809d..df9bfec0fb 100644 --- a/httpcore5/src/test/java/org/apache/hc/core5/concurrent/TestBasicFuture.java +++ b/httpcore5/src/test/java/org/apache/hc/core5/concurrent/TestBasicFuture.java @@ -26,8 +26,17 @@ */ package org.apache.hc.core5.concurrent; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; +import static org.junit.jupiter.api.Assertions.fail; + +import java.time.Duration; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -113,7 +122,7 @@ public void testCancelled() throws Exception { Mockito.verify(callback, Mockito.never()).failed(Mockito.any()); Mockito.verify(callback).cancelled(); - Assertions.assertThrows(CancellationException.class, future::get); + assertThrows(CancellationException.class, future::get); Assertions.assertTrue(future.isDone()); Assertions.assertTrue(future.isCancelled()); } @@ -173,7 +182,7 @@ public void testAsyncCancelled() throws Exception { }); t.setDaemon(true); t.start(); - Assertions.assertThrows(CancellationException.class, () -> + assertThrows(CancellationException.class, () -> future.get(60, TimeUnit.SECONDS)); } @@ -191,15 +200,94 @@ public void testAsyncTimeout() throws Exception { }); t.setDaemon(true); t.start(); - Assertions.assertThrows(TimeoutException.class, () -> + assertThrows(TimeoutException.class, () -> future.get(1, TimeUnit.MILLISECONDS)); } @Test public void testAsyncNegativeTimeout() throws Exception { final BasicFuture future = new BasicFuture<>(null); - Assertions.assertThrows(TimeoutValueException.class, () -> + assertThrows(TimeoutValueException.class, () -> future.get(-1, TimeUnit.MILLISECONDS)); } + @Test + public void testConcurrentOperations() throws InterruptedException, ExecutionException { + final FutureCallback callback = new FutureCallback() { + public void completed(final Object result) { + } + + public void failed(final Exception ex) { + } + + public void cancelled() { + } + }; + + final ExecutorService executor = Executors.newFixedThreadPool(3); + final BasicFuture future = new BasicFuture<>(callback); + final Object expectedResult = new Object(); + + // Run 3 tasks concurrently: complete, fail, and cancel the future. + final Future future1 = executor.submit(() -> future.completed(expectedResult)); + final Future future2 = executor.submit(() -> future.failed(new Exception("Test Exception"))); + final Future future3 = executor.submit(() -> future.cancel()); + + // Wait for the tasks to finish. + future1.get(); + future2.get(); + future3.get(); + + // Verify that the first operation won and the other two failed. + if (future.isDone()) { + assertEquals(expectedResult, future.get()); + } else if (future.isCancelled()) { + assertThrows(CancellationException.class, future::get); + } else { + assertThrows(ExecutionException.class, future::get); + } + + // Shutdown the executor. + executor.shutdown(); + } + + @Test + void testGetWithTimeout() { + final FutureCallback callback = new FutureCallback() { + @Override + public void completed(final String result) { + // Nothing to do + } + + @Override + public void failed(final Exception ex) { + // Nothing to do + } + + @Override + public void cancelled() { + // Nothing to do + } + }; + + final BasicFuture future = new BasicFuture<>(callback); + + new Thread(() -> { + try { + Thread.sleep(100); // This simulates the delay in completing the future. + future.completed("test"); + } catch (final InterruptedException e) { + future.failed(e); + } + }).start(); + + assertTimeoutPreemptively(Duration.ofMillis(200), () -> { + try { + assertEquals("test", future.get(1, TimeUnit.SECONDS)); + } catch (final ExecutionException | TimeoutException e) { + fail("Test failed due to exception: " + e.getMessage()); + } + }); + } + } diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/examples/AsyncReverseProxyExample.java b/httpcore5/src/test/java/org/apache/hc/core5/http/examples/AsyncReverseProxyExample.java index 6a96b5d973..c1576b1713 100644 --- a/httpcore5/src/test/java/org/apache/hc/core5/http/examples/AsyncReverseProxyExample.java +++ b/httpcore5/src/test/java/org/apache/hc/core5/http/examples/AsyncReverseProxyExample.java @@ -40,6 +40,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.http.ConnectionClosedException; @@ -253,12 +254,14 @@ private static class IncomingExchangeHandler implements AsyncServerExchangeHandl private final HttpHost targetHost; private final HttpAsyncRequester requester; private final ProxyExchangeState exchangeState; + private final ReentrantLock lock; IncomingExchangeHandler(final HttpHost targetHost, final HttpAsyncRequester requester) { super(); this.targetHost = targetHost; this.requester = requester; this.exchangeState = new ProxyExchangeState(); + this.lock = new ReentrantLock(); } @Override @@ -268,7 +271,8 @@ public void handleRequest( final ResponseChannel responseChannel, final HttpContext httpContext) throws HttpException, IOException { - synchronized (exchangeState) { + lock.lock(); + try { println("[client->proxy] " + exchangeState.id + " " + incomingRequest.getMethod() + " " + incomingRequest.getRequestUri()); exchangeState.request = incomingRequest; @@ -282,6 +286,8 @@ public void handleRequest( responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), httpContext); } } + } finally { + lock.unlock(); } println("[proxy->origin] " + exchangeState.id + " request connection to " + targetHost); @@ -291,8 +297,11 @@ public void handleRequest( @Override public void completed(final AsyncClientEndpoint clientEndpoint) { println("[proxy->origin] " + exchangeState.id + " connection leased"); - synchronized (exchangeState) { + lock.lock(); + try { exchangeState.clientEndpoint = clientEndpoint; + } finally { + lock.unlock(); } clientEndpoint.execute( new OutgoingExchangeHandler(targetHost, clientEndpoint, exchangeState), @@ -306,12 +315,15 @@ public void failed(final Exception cause) { final ByteBuffer msg = StandardCharsets.US_ASCII.encode(CharBuffer.wrap(cause.getMessage())); final EntityDetails exEntityDetails = new BasicEntityDetails(msg.remaining(), ContentType.TEXT_PLAIN); - synchronized (exchangeState) { + lock.lock(); + try { exchangeState.response = outgoingResponse; exchangeState.responseEntityDetails = exEntityDetails; exchangeState.outBuf = new ProxyBuffer(1024); exchangeState.outBuf.put(msg); exchangeState.outputEnd = true; + } finally { + lock.unlock(); } println("[client<-proxy] " + exchangeState.id + " status " + outgoingResponse.getCode()); @@ -333,19 +345,23 @@ public void cancelled() { @Override public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { - synchronized (exchangeState) { + lock.lock(); + try { exchangeState.requestCapacityChannel = capacityChannel; final int capacity = exchangeState.inBuf != null ? exchangeState.inBuf.capacity() : INIT_BUFFER_SIZE; if (capacity > 0) { println("[client<-proxy] " + exchangeState.id + " input capacity: " + capacity); capacityChannel.update(capacity); } + } finally { + lock.unlock(); } } @Override public void consume(final ByteBuffer src) throws IOException { - synchronized (exchangeState) { + lock.lock(); + try { println("[client->proxy] " + exchangeState.id + " " + src.remaining() + " bytes received"); final DataStreamChannel dataChannel = exchangeState.requestDataChannel; if (dataChannel != null && exchangeState.inBuf != null) { @@ -369,12 +385,15 @@ public void consume(final ByteBuffer src) throws IOException { if (dataChannel != null) { dataChannel.requestOutput(); } + } finally { + lock.unlock(); } } @Override public void streamEnd(final List trailers) throws HttpException, IOException { - synchronized (exchangeState) { + lock.lock(); + try { println("[client->proxy] " + exchangeState.id + " end of input"); exchangeState.inputEnd = true; final DataStreamChannel dataChannel = exchangeState.requestDataChannel; @@ -382,21 +401,27 @@ public void streamEnd(final List trailers) throws HttpExceptio println("[proxy->origin] " + exchangeState.id + " end of output"); dataChannel.endStream(); } + } finally { + lock.unlock(); } } @Override public int available() { - synchronized (exchangeState) { + lock.lock(); + try { final int available = exchangeState.outBuf != null ? exchangeState.outBuf.length() : 0; println("[client<-proxy] " + exchangeState.id + " output available: " + available); return available; + } finally { + lock.unlock(); } } @Override public void produce(final DataStreamChannel channel) throws IOException { - synchronized (exchangeState) { + lock.lock(); + try { println("[client<-proxy] " + exchangeState.id + " produce output"); exchangeState.responseDataChannel = channel; @@ -420,6 +445,8 @@ public void produce(final DataStreamChannel channel) throws IOException { } } } + } finally { + lock.unlock(); } } @@ -429,19 +456,25 @@ public void failed(final Exception cause) { if (!(cause instanceof ConnectionClosedException)) { cause.printStackTrace(System.out); } - synchronized (exchangeState) { + lock.lock(); + try { if (exchangeState.clientEndpoint != null) { exchangeState.clientEndpoint.releaseAndDiscard(); } + } finally { + lock.unlock(); } } @Override public void releaseResources() { - synchronized (exchangeState) { + lock.lock(); + try { exchangeState.responseMessageChannel = null; exchangeState.responseDataChannel = null; exchangeState.requestCapacityChannel = null; + } finally { + lock.unlock(); } } @@ -463,6 +496,7 @@ private static class OutgoingExchangeHandler implements AsyncClientExchangeHandl private final HttpHost targetHost; private final AsyncClientEndpoint clientEndpoint; private final ProxyExchangeState exchangeState; + private final ReentrantLock lock; OutgoingExchangeHandler( final HttpHost targetHost, @@ -471,12 +505,14 @@ private static class OutgoingExchangeHandler implements AsyncClientExchangeHandl this.targetHost = targetHost; this.clientEndpoint = clientEndpoint; this.exchangeState = exchangeState; + this.lock = new ReentrantLock(); } @Override public void produceRequest( final RequestChannel channel, final HttpContext httpContext) throws HttpException, IOException { - synchronized (exchangeState) { + lock.lock(); + try { final HttpRequest incomingRequest = exchangeState.request; final EntityDetails entityDetails = exchangeState.requestEntityDetails; final HttpRequest outgoingRequest = new BasicHttpRequest( @@ -494,21 +530,27 @@ public void produceRequest( outgoingRequest.getMethod() + " " + outgoingRequest.getRequestUri()); channel.sendRequest(outgoingRequest, entityDetails, httpContext); + } finally { + lock.unlock(); } } @Override public int available() { - synchronized (exchangeState) { + lock.lock(); + try { final int available = exchangeState.inBuf != null ? exchangeState.inBuf.length() : 0; println("[proxy->origin] " + exchangeState.id + " output available: " + available); return available; + } finally { + lock.unlock(); } } @Override public void produce(final DataStreamChannel channel) throws IOException { - synchronized (exchangeState) { + lock.lock(); + try { println("[proxy->origin] " + exchangeState.id + " produce output"); exchangeState.requestDataChannel = channel; if (exchangeState.inBuf != null) { @@ -531,6 +573,8 @@ public void produce(final DataStreamChannel channel) throws IOException { } } } + } finally { + lock.unlock(); } } @@ -543,7 +587,8 @@ public void consumeInformation(final HttpResponse response, final HttpContext ht public void consumeResponse( final HttpResponse incomingResponse, final EntityDetails entityDetails, final HttpContext httpContext) throws HttpException, IOException { - synchronized (exchangeState) { + lock.lock(); + try { println("[proxy<-origin] " + exchangeState.id + " status " + incomingResponse.getCode()); if (entityDetails == null) { println("[proxy<-origin] " + exchangeState.id + " end of input"); @@ -572,24 +617,30 @@ public void consumeResponse( println("[client<-proxy] " + exchangeState.id + " end of output"); clientEndpoint.releaseAndReuse(); } + } finally { + lock.unlock(); } } @Override public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { - synchronized (exchangeState) { + lock.lock(); + try { exchangeState.responseCapacityChannel = capacityChannel; final int capacity = exchangeState.outBuf != null ? exchangeState.outBuf.capacity() : INIT_BUFFER_SIZE; if (capacity > 0) { println("[proxy->origin] " + exchangeState.id + " input capacity: " + capacity); capacityChannel.update(capacity); } + } finally { + lock.unlock(); } } @Override public void consume(final ByteBuffer src) throws IOException { - synchronized (exchangeState) { + lock.lock(); + try { println("[proxy<-origin] " + exchangeState.id + " " + src.remaining() + " bytes received"); final DataStreamChannel dataChannel = exchangeState.responseDataChannel; if (dataChannel != null && exchangeState.outBuf != null) { @@ -613,12 +664,15 @@ public void consume(final ByteBuffer src) throws IOException { if (dataChannel != null) { dataChannel.requestOutput(); } + } finally { + lock.unlock(); } } @Override public void streamEnd(final List trailers) throws HttpException, IOException { - synchronized (exchangeState) { + lock.lock(); + try { println("[proxy<-origin] " + exchangeState.id + " end of input"); exchangeState.outputEnd = true; final DataStreamChannel dataChannel = exchangeState.responseDataChannel; @@ -627,6 +681,8 @@ public void streamEnd(final List trailers) throws HttpExceptio dataChannel.endStream(); clientEndpoint.releaseAndReuse(); } + } finally { + lock.unlock(); } } @@ -641,7 +697,8 @@ public void failed(final Exception cause) { if (!(cause instanceof ConnectionClosedException)) { cause.printStackTrace(System.out); } - synchronized (exchangeState) { + lock.lock(); + try { if (exchangeState.response == null) { final int status = cause instanceof IOException ? HttpStatus.SC_SERVICE_UNAVAILABLE : HttpStatus.SC_INTERNAL_SERVER_ERROR; final HttpResponse outgoingResponse = new BasicHttpResponse(status); @@ -666,15 +723,20 @@ public void failed(final Exception cause) { exchangeState.outputEnd = true; } clientEndpoint.releaseAndDiscard(); + } finally { + lock.unlock(); } } @Override public void releaseResources() { - synchronized (exchangeState) { + lock.lock(); + try { exchangeState.requestDataChannel = null; exchangeState.responseCapacityChannel = null; clientEndpoint.releaseAndDiscard(); + } finally { + lock.unlock(); } } diff --git a/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedOutputBuffer.java b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedOutputBuffer.java index 35e8d55dba..479b50be9d 100644 --- a/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedOutputBuffer.java +++ b/httpcore5/src/test/java/org/apache/hc/core5/http/nio/support/classic/TestSharedOutputBuffer.java @@ -37,6 +37,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.WritableByteChannelMock; @@ -54,24 +56,44 @@ static class DataStreamChannelMock implements DataStreamChannel { private final WritableByteChannelMock channel; + private final ReentrantLock lock; + private final Condition condition; + DataStreamChannelMock(final WritableByteChannelMock channel) { this.channel = channel; + this.lock = new ReentrantLock(); + this.condition = lock.newCondition(); } @Override - public synchronized int write(final ByteBuffer src) throws IOException { - return channel.write(src); + public int write(final ByteBuffer src) throws IOException { + lock.lock(); + try { + return channel.write(src); + } finally { + lock.unlock(); + } } @Override - public synchronized void requestOutput() { - notifyAll(); + public void requestOutput() { + lock.lock(); + try { + condition.signalAll(); + } finally { + lock.unlock(); + } } @Override - public synchronized void endStream(final List trailers) throws IOException { - channel.close(); - notifyAll(); + public void endStream(final List trailers) throws IOException { + lock.lock(); + try { + channel.close(); + condition.signalAll(); + } finally { + lock.unlock(); + } } @Override @@ -79,8 +101,13 @@ public void endStream() throws IOException { endStream(null); } - public synchronized void awaitOutputRequest() throws InterruptedException { - wait(); + public void awaitOutputRequest() throws InterruptedException { + lock.lock(); + try { + condition.await(); + } finally { + lock.unlock(); + } } } diff --git a/httpcore5/src/test/java/org/apache/hc/core5/ssl/DummyProvider.java b/httpcore5/src/test/java/org/apache/hc/core5/ssl/DummyProvider.java index 6d82a48ab8..a19625fcdd 100644 --- a/httpcore5/src/test/java/org/apache/hc/core5/ssl/DummyProvider.java +++ b/httpcore5/src/test/java/org/apache/hc/core5/ssl/DummyProvider.java @@ -31,6 +31,7 @@ import java.security.Security; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; public class DummyProvider extends Provider { @@ -40,8 +41,11 @@ public class DummyProvider extends Provider { private final Set requestedTypes = new HashSet<>(); + private final ReentrantLock lock; + public DummyProvider() { super(NAME, 1.1, "http core fake provider 1.1"); + this.lock = new ReentrantLock(); } public boolean hasBeenRequested(final String what) { @@ -58,7 +62,12 @@ public Service getService(final String type, final String algorithm) { } @Override - public synchronized Set getServices() { - return realJSSEProvider.getServices(); + public Set getServices() { + lock.lock(); + try { + return realJSSEProvider.getServices(); + } finally { + lock.unlock(); + } } }