Skip to content

Commit

Permalink
Add maxIdleTime to Http2Pool (#2257)
Browse files Browse the repository at this point in the history
Related to #2151 and #2262
  • Loading branch information
violetagg committed Jun 8, 2022
1 parent f5e79b7 commit d9ae62c
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,10 @@ PoolBuilder<T, PoolConfig<T>> newPoolInternal(
return poolBuilder;
}

public long maxIdleTime() {
return this.maxIdleTime;
}

public long maxLifeTime() {
return maxLifeTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ static final class PooledConnectionAllocator {
this.remoteAddress = remoteAddress;
this.resolver = resolver;
this.pool = poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
poolConFig -> new Http2Pool(poolConFig, poolFactory.maxLifeTime()));
poolConFig -> new Http2Pool(poolConFig, poolFactory.maxIdleTime(), poolFactory.maxLifeTime()));
}

Publisher<Connection> connectChannel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
* <ul>
* <li>The connection is closed.</li>
* <li>The connection has reached its life time and there are no active streams.</li>
* <li>The connection has reached its idle time and there are no active streams.</li>
* <li>When the client is in one of the two modes: 1) H2 and HTTP/1.1 or 2) H2C and HTTP/1.1,
* and the negotiated protocol is HTTP/1.1.</li>
* </ul>
Expand Down Expand Up @@ -148,18 +149,20 @@ final class Http2Pool implements InstrumentedPool<Connection>, InstrumentedPool.
AtomicIntegerFieldUpdater.newUpdater(Http2Pool.class, "wip");

final Clock clock;
final long maxIdleTime;
final long maxLifeTime;
final PoolConfig<Connection> poolConfig;

long lastInteractionTimestamp;

Http2Pool(PoolConfig<Connection> poolConfig, long maxLifeTime) {
Http2Pool(PoolConfig<Connection> poolConfig, long maxIdleTime, long maxLifeTime) {
if (poolConfig.allocationStrategy().getPermits(0) != 0) {
throw new IllegalArgumentException("No support for configuring minimum number of connections");
}
this.clock = poolConfig.clock();
this.connections = new ConcurrentLinkedQueue<>();
this.lastInteractionTimestamp = clock.millis();
this.maxIdleTime = maxIdleTime;
this.maxLifeTime = maxLifeTime;
this.pending = new ConcurrentLinkedDeque<>();
this.poolConfig = poolConfig;
Expand Down Expand Up @@ -461,7 +464,18 @@ Slot findConnection(ConcurrentLinkedQueue<Slot> resources) {
continue;
}

// check that the connection's max lifetime has not been reached
// check whether the connection's idle time has been reached
if (maxIdleTime != -1 && slot.idleTime() >= maxIdleTime) {
if (log.isDebugEnabled()) {
log.debug(format(slot.connection.channel(), "Idle time is reached, remove from pool"));
}
//"FutureReturnValueIgnored" this is deliberate
slot.connection.channel().close();
slot.invalidate();
continue;
}

// check whether the connection's max lifetime has been reached
if (maxLifeReached(slot)) {
if (slot.concurrency() > 0) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -798,6 +812,8 @@ static final class Slot extends AtomicBoolean {
final Http2Pool pool;
final String applicationProtocol;

long idleTimestamp;

volatile ChannelHandlerContext http2FrameCodecCtx;
volatile ChannelHandlerContext http2MultiplexHandlerCtx;
volatile ChannelHandlerContext h2cUpgradeHandlerCtx;
Expand Down Expand Up @@ -840,7 +856,17 @@ void deactivate() {
}

int decrementConcurrencyAndGet() {
return CONCURRENCY.decrementAndGet(this);
int concurrency = CONCURRENCY.decrementAndGet(this);
idleTimestamp = pool.clock.millis();
return concurrency;
}

long idleTime() {
if (concurrency() > 0) {
return 0L;
}
long idleTime = idleTimestamp != 0 ? idleTimestamp : creationTimestamp;
return pool.clock.millis() - idleTime;
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void acquireInvalidate() {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1));
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1));

try {
List<PooledRef<Connection>> acquired = new ArrayList<>();
Expand Down Expand Up @@ -94,7 +94,7 @@ void acquireRelease() {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1));
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1));

try {
List<PooledRef<Connection>> acquired = new ArrayList<>();
Expand Down Expand Up @@ -138,7 +138,7 @@ void evictClosedConnection() throws Exception {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1));
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1));

Connection connection = null;
try {
Expand Down Expand Up @@ -211,7 +211,7 @@ private void evictClosedConnectionMaxConnectionsNotReached(boolean closeSecond)
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 2);
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1));
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1));

Connection connection = null;
try {
Expand Down Expand Up @@ -293,7 +293,7 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1));
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1));

Connection connection = null;
try {
Expand Down Expand Up @@ -335,6 +335,120 @@ void evictClosedConnectionMaxConnectionsReached() throws Exception {
}
}

@Test
void maxIdleTime() throws Exception {
PoolBuilder<Connection, PoolConfig<Connection>> poolBuilder =
PoolBuilder.from(Mono.fromSupplier(() -> {
Channel channel = new EmbeddedChannel(
new TestChannelId(),
Http2FrameCodecBuilder.forClient().build());
return Connection.from(channel);
}))
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10, -1));

Connection connection1 = null;
Connection connection2 = null;
try {
PooledRef<Connection> acquired1 = http2Pool.acquire().block();

assertThat(acquired1).isNotNull();
assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);

connection1 = acquired1.poolable();
ChannelId id1 = connection1.channel().id();

acquired1.invalidate().block();

Thread.sleep(15);

PooledRef<Connection> acquired2 = http2Pool.acquire().block();

assertThat(acquired2).isNotNull();
assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);

connection2 = acquired2.poolable();
ChannelId id2 = connection2.channel().id();

assertThat(id1).isNotEqualTo(id2);

acquired2.invalidate().block();

assertThat(http2Pool.activeStreams()).isEqualTo(0);
assertThat(http2Pool.connections.size()).isEqualTo(1);
}
finally {
if (connection1 != null) {
((EmbeddedChannel) connection1.channel()).finishAndReleaseAll();
connection1.dispose();
}
if (connection2 != null) {
((EmbeddedChannel) connection2.channel()).finishAndReleaseAll();
connection2.dispose();
}
}
}

@Test
void maxIdleTimeActiveStreams() throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(new TestChannelId(),
Http2FrameCodecBuilder.forClient().build(), new Http2MultiplexHandler(new ChannelHandlerAdapter() {}));
PoolBuilder<Connection, PoolConfig<Connection>> poolBuilder =
PoolBuilder.from(Mono.just(Connection.from(channel)))
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10, -1));

Connection connection1 = null;
Connection connection2 = null;
try {
List<PooledRef<Connection>> acquired = new ArrayList<>();
http2Pool.acquire().subscribe(acquired::add);
http2Pool.acquire().subscribe(acquired::add);

channel.runPendingTasks();

assertThat(acquired).hasSize(2);
assertThat(http2Pool.activeStreams()).isEqualTo(2);
assertThat(http2Pool.connections.size()).isEqualTo(1);

connection1 = acquired.get(0).poolable();
ChannelId id1 = connection1.channel().id();

acquired.get(0).invalidate().block();

Thread.sleep(15);

assertThat(http2Pool.activeStreams()).isEqualTo(1);
assertThat(http2Pool.connections.size()).isEqualTo(1);

connection2 = acquired.get(1).poolable();
ChannelId id2 = connection2.channel().id();

assertThat(id1).isEqualTo(id2);

acquired.get(1).invalidate().block();

assertThat(http2Pool.activeStreams()).isEqualTo(0);
assertThat(http2Pool.connections.size()).isEqualTo(1);
}
finally {
if (connection1 != null) {
((EmbeddedChannel) connection1.channel()).finishAndReleaseAll();
connection1.dispose();
}
if (connection2 != null) {
((EmbeddedChannel) connection2.channel()).finishAndReleaseAll();
connection2.dispose();
}
}
}

@Test
void maxLifeTime() throws Exception {
PoolBuilder<Connection, PoolConfig<Connection>> poolBuilder =
Expand All @@ -347,7 +461,7 @@ void maxLifeTime() throws Exception {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10));
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 10));

Connection connection1 = null;
Connection connection2 = null;
Expand Down Expand Up @@ -411,7 +525,7 @@ void maxLifeTimeMaxConnectionsNotReached() throws Exception {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 2);
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 50));
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 50));

Connection connection1 = null;
Connection connection2 = null;
Expand Down Expand Up @@ -471,7 +585,7 @@ void maxLifeTimeMaxConnectionsReached() throws Exception {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, 10));
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, 10));

Connection connection = null;
try {
Expand Down Expand Up @@ -514,7 +628,7 @@ void minConnectionsConfigNotSupported() {
PoolBuilder<Connection, PoolConfig<Connection>> poolBuilder =
PoolBuilder.from(Mono.<Connection>empty()).sizeBetween(1, 2);
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> poolBuilder.build(config -> new Http2Pool(config, -1)));
.isThrownBy(() -> poolBuilder.build(config -> new Http2Pool(config, -1, -1)));
}

@Test
Expand All @@ -525,7 +639,7 @@ void nonHttp2ConnectionEmittedOnce() {
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 1);
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1));
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, -1, -1));

try {
PooledRef<Connection> acquired = http2Pool.acquire().block(Duration.ofSeconds(1));
Expand Down

0 comments on commit d9ae62c

Please sign in to comment.