Skip to content

Commit

Permalink
Refactor Synchronized Blocks to Use ReentrantLock. (#412)
Browse files Browse the repository at this point in the history
This commit replaces instances of 'synchronized' keyword with the explicit use of 'ReentrantLock' to provide more advanced synchronization mechanisms.
  • Loading branch information
arturobernalg authored Aug 11, 2023
1 parent 9663aa1 commit 4d6d9f5
Show file tree
Hide file tree
Showing 21 changed files with 641 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
Expand All @@ -58,14 +59,15 @@ final class ReactiveDataConsumer implements AsyncDataConsumer, Publisher<ByteBuf

private final BlockingQueue<ByteBuffer> buffers = new LinkedBlockingQueue<>();
private final AtomicBoolean flushInProgress = new AtomicBoolean(false);
private final Object flushLock = new Object();
private final AtomicInteger windowScalingIncrement = new AtomicInteger(0);
private volatile boolean cancelled;
private volatile boolean completed;
private volatile Exception exception;
private volatile CapacityChannel capacityChannel;
private volatile Subscriber<? super ByteBuffer> subscriber;

private final ReentrantLock lock = new ReentrantLock();

public void failed(final Exception cause) {
if (!completed) {
exception = cause;
Expand Down Expand Up @@ -119,7 +121,8 @@ public void releaseResources() {
}

private void flushToSubscriber() {
synchronized (flushLock) {
lock.lock();
try {
final Subscriber<? super ByteBuffer> s = subscriber;
if (flushInProgress.getAndSet(true)) {
return;
Expand Down Expand Up @@ -157,6 +160,8 @@ private void flushToSubscriber() {
} finally {
flushInProgress.set(false);
}
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
Expand Down Expand Up @@ -59,8 +60,11 @@ final class ReactiveDataProducer implements AsyncDataProducer, Subscriber<ByteBu
private final AtomicReference<Subscription> subscription = new AtomicReference<>();
private final ArrayDeque<ByteBuffer> buffers = new ArrayDeque<>(); // This field requires synchronization

private final ReentrantLock lock;

public ReactiveDataProducer(final Publisher<ByteBuffer> publisher) {
this.publisher = Args.notNull(publisher, "publisher");
this.lock = new ReentrantLock();
}

void setChannel(final DataStreamChannel channel) {
Expand All @@ -80,8 +84,11 @@ public void onSubscribe(final Subscription subscription) {
public void onNext(final ByteBuffer byteBuffer) {
final byte[] copy = new byte[byteBuffer.remaining()];
byteBuffer.get(copy);
synchronized (buffers) {
lock.lock();
try {
buffers.add(ByteBuffer.wrap(copy));
} finally {
lock.unlock();
}
signalReadiness();
}
Expand Down Expand Up @@ -113,12 +120,15 @@ public int available() {
if (exception.get() != null || complete.get()) {
return 1;
} else {
synchronized (buffers) {
lock.lock();
try {
int sum = 0;
for (final ByteBuffer buffer : buffers) {
sum += buffer.remaining();
}
return sum;
} finally {
lock.unlock();
}
}
}
Expand All @@ -134,7 +144,8 @@ public void produce(final DataStreamChannel channel) throws IOException {
final Subscription s = subscription.get();
int buffersToReplenish = 0;
try {
synchronized (buffers) {
lock.lock();
try {
if (t != null) {
throw new HttpStreamResetException(t.getMessage(), t);
} else if (this.complete.get() && buffers.isEmpty()) {
Expand All @@ -152,6 +163,8 @@ public void produce(final DataStreamChannel channel) throws IOException {
}
}
}
} finally {
lock.unlock();
}
} finally {
if (s != null && buffersToReplenish > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hc.core5.net.InetAddressUtils;
import org.apache.hc.core5.util.TimeValue;
Expand Down Expand Up @@ -209,43 +210,51 @@ public void shutdown() {
private final List<SocksProxyHandler> handlers = new ArrayList<>();
private ServerSocket server;
private Thread serverThread;
private final ReentrantLock lock;

public SocksProxy() {
this(0);
}

public SocksProxy(final int port) {
this.port = port;
this.lock = new ReentrantLock();
}

public synchronized void start() throws IOException {
if (this.server == null) {
this.server = new ServerSocket(this.port);
this.serverThread = new Thread(() -> {
try {
while (true) {
final Socket socket = server.accept();
startSocksProxyHandler(socket);
}
} catch (final IOException e) {
} finally {
if (server != null) {
try {
server.close();
} catch (final IOException e) {
public void start() throws IOException {
lock.lock();
try {
if (this.server == null) {
this.server = new ServerSocket(this.port);
this.serverThread = new Thread(() -> {
try {
while (true) {
final Socket socket = server.accept();
startSocksProxyHandler(socket);
}
} catch (final IOException e) {
} finally {
if (server != null) {
try {
server.close();
} catch (final IOException e) {
}
server = null;
}
server = null;
}
}
});
this.serverThread.start();
});
this.serverThread.start();
}
} finally {
lock.unlock();
}
}

public void shutdown(final TimeValue timeout) throws InterruptedException {
final long waitUntil = System.currentTimeMillis() + timeout.toMilliseconds();
Thread t = null;
synchronized (this) {
lock.lock();
try {
if (this.server != null) {
try {
this.server.close();
Expand All @@ -265,6 +274,8 @@ public void shutdown(final TimeValue timeout) throws InterruptedException {
wait(waitTime);
}
}
} finally {
lock.unlock();
}
if (t != null) {
final long waitTime = waitUntil - System.currentTimeMillis();
Expand All @@ -276,14 +287,22 @@ public void shutdown(final TimeValue timeout) throws InterruptedException {

protected void startSocksProxyHandler(final Socket socket) {
final SocksProxyHandler handler = new SocksProxyHandler(this, socket);
synchronized (this) {
lock.lock();
try {
this.handlers.add(handler);
} finally {
lock.unlock();
}
handler.start();
}

protected synchronized void cleanupSocksProxyHandler(final SocksProxyHandler handler) {
this.handlers.remove(handler);
protected void cleanupSocksProxyHandler(final SocksProxyHandler handler) {
lock.lock();
try {
this.handlers.remove(handler);
} finally {
lock.unlock();
}
}

public SocketAddress getProxyAddress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.hc.core5.http.ConnectionReuseStrategy;
import org.apache.hc.core5.http.ContentLengthStrategy;
Expand Down Expand Up @@ -141,6 +142,8 @@ public abstract class Http1IntegrationTest {

private final URIScheme scheme;

private final ReentrantLock lock = new ReentrantLock();

@RegisterExtension
private final Http1TestResources resources;

Expand Down Expand Up @@ -910,8 +913,11 @@ public void handleRequest(
responseChannel.sendInformation(new BasicHttpResponse(HttpStatus.SC_CONTINUE), context);
}
final HttpResponse response = new BasicHttpResponse(200);
synchronized (entityProducer) {
lock.lock();
try {
responseChannel.sendResponse(response, entityProducer, context);
} finally {
lock.unlock();
}
}
} catch (final Exception ignore) {
Expand All @@ -936,15 +942,21 @@ public void streamEnd(final List<? extends Header> trailers) throws HttpExceptio

@Override
public int available() {
synchronized (entityProducer) {
lock.lock();
try {
return entityProducer.available();
} finally {
lock.unlock();
}
}

@Override
public void produce(final DataStreamChannel channel) throws IOException {
synchronized (entityProducer) {
lock.lock();
try {
entityProducer.produce(channel);
} finally {
lock.unlock();
}
}

Expand Down
Loading

0 comments on commit 4d6d9f5

Please sign in to comment.