Skip to content

Commit

Permalink
Merge #3459 into 1.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Oct 17, 2024
2 parents dbaacac + e00891b commit 52326af
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,17 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.util.ReferenceCounted;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
Expand Down Expand Up @@ -123,9 +131,9 @@ else if (recorder instanceof ContextAwareChannelMetricsRecorder) {
.as(ChannelOperations.class);
}

final Connection connection;
Connection connection;
final FluxReceive inbound;
final ConnectionObserver listener;
ConnectionObserver listener;
final Sinks.Empty<Void> onTerminate;

volatile Subscription outboundSubscription;
Expand Down Expand Up @@ -206,7 +214,9 @@ public void dispose() {
if (!inbound.isDisposed()) {
discard();
}
connection.dispose();
if (!connection.isDisposed()) {
connection.dispose();
}
}

@Override
Expand Down Expand Up @@ -502,6 +512,8 @@ protected final void terminate() {
// and it is guarded by rebind(connection), so tryEmitEmpty() should happen just once
onTerminate.tryEmitEmpty();
listener.onStateChange(this, ConnectionObserver.State.DISCONNECTING);
connection = new DisposedConnection(channel());
listener = ConnectionObserver.emptyListener();
}
}

Expand Down Expand Up @@ -681,4 +693,111 @@ static OnSetup empty() {
Subscription.class,
"outboundSubscription");

static final class DisposedChannel extends AbstractChannel {

final DefaultChannelConfig config;
final SocketAddress localAddress;
final ChannelMetadata metadata;
final SocketAddress remoteAddress;

DisposedChannel(Channel actual) {
super(null);
this.metadata = actual.metadata();
this.config = new DefaultChannelConfig(this);
this.localAddress = actual.localAddress();
this.remoteAddress = actual.remoteAddress();
}

@Override
public ChannelFuture closeFuture() {
return newSucceededFuture();
}

@Override
public ChannelConfig config() {
return config;
}

@Override
protected void doBeginRead() {
throw new UnsupportedOperationException();
}

@Override
protected void doBind(SocketAddress socketAddress) {
throw new UnsupportedOperationException();
}

@Override
protected void doClose() {
throw new UnsupportedOperationException();
}

@Override
protected void doDisconnect() {
throw new UnsupportedOperationException();
}

@Override
protected void doWrite(ChannelOutboundBuffer channelOutboundBuffer) {
throw new UnsupportedOperationException();
}

@Override
public boolean isActive() {
return false;
}

@Override
protected boolean isCompatible(EventLoop eventLoop) {
return false;
}

@Override
public boolean isOpen() {
return false;
}

@Override
protected SocketAddress localAddress0() {
return localAddress;
}

@Override
public ChannelMetadata metadata() {
return metadata;
}

@Override
protected AbstractUnsafe newUnsafe() {
return new DisposedChannelUnsafe();
}

@Override
protected SocketAddress remoteAddress0() {
return remoteAddress;
}

final class DisposedChannelUnsafe extends AbstractUnsafe {

@Override
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
promise.setFailure(new UnsupportedOperationException());
}
}
}

static final class DisposedConnection implements Connection {

final Channel channel;

DisposedConnection(Channel actual) {
this.channel = new DisposedChannel(actual);
}

@Override
public Channel channel() {
return channel;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2011-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,7 +46,6 @@ final class FluxReceive extends Flux<Object> implements Subscription, Disposable

static final int QUEUE_LOW_LIMIT = 32;

final Channel channel;
final ChannelOperations<?, ?> parent;
final EventLoop eventLoop;

Expand Down Expand Up @@ -78,9 +77,8 @@ final class FluxReceive extends Flux<Object> implements Subscription, Disposable
//reset channel to manual read if re-used

this.parent = parent;
this.channel = parent.channel();
this.eventLoop = channel.eventLoop();
channel.config()
this.eventLoop = parent.channel().eventLoop();
parent.channel().config()
.setAutoRead(false);
CANCEL.lazySet(this, (state) -> {
if (eventLoop.inEventLoop()) {
Expand Down Expand Up @@ -155,7 +153,7 @@ final void startReceiver(CoreSubscriber<? super Object> s) {
if (!subscribedOnce) {
subscribedOnce = true;
if (log.isDebugEnabled()) {
log.debug(format(channel, "{}: subscribing inbound receiver"), this);
log.debug(format(parent.channel(), "{}: subscribing inbound receiver"), this);
}
if ((inboundDone && getPending() == 0) || isCancelled()) {
if (inboundError != null) {
Expand All @@ -182,7 +180,7 @@ final void startReceiver(CoreSubscriber<? super Object> s) {
}
else {
if (log.isDebugEnabled()) {
log.debug(format(channel, "{}: Rejecting additional inbound receiver."), this);
log.debug(format(parent.channel(), "{}: Rejecting additional inbound receiver."), this);
}

String msg = "Rejecting additional inbound receiver. State=" + toString(false);
Expand Down Expand Up @@ -218,7 +216,7 @@ final void cleanQueue(@Nullable Queue<Object> q) {
Object o;
while ((o = q.poll()) != null) {
if (log.isDebugEnabled()) {
log.debug(format(channel, "{}: dropping frame {}"), this, parent.asDebugLogMessage(o));
log.debug(format(parent.channel(), "{}: dropping frame {}"), this, parent.asDebugLogMessage(o));
}
ReferenceCountUtil.release(o);
}
Expand Down Expand Up @@ -283,11 +281,11 @@ final void drainReceiver() {
try {
if (logLeakDetection.isDebugEnabled()) {
if (v instanceof ByteBuf) {
((ByteBuf) v).touch(format(channel, "Receiver " + a.getClass().getName() +
((ByteBuf) v).touch(format(parent.channel(), "Receiver " + a.getClass().getName() +
" will handle the message from this point"));
}
else if (v instanceof ByteBufHolder) {
((ByteBufHolder) v).touch(format(channel, "Receiver " + a.getClass().getName() +
((ByteBufHolder) v).touch(format(parent.channel(), "Receiver " + a.getClass().getName() +
" will handle the message from this point"));
}
}
Expand Down Expand Up @@ -322,7 +320,7 @@ else if (v instanceof ByteBufHolder) {
receiverFastpath = true;
if (needRead) {
needRead = false;
channel.config()
parent.channel().config()
.setAutoRead(true);
}
//CHECKSTYLE:OFF
Expand All @@ -336,13 +334,13 @@ else if (v instanceof ByteBufHolder) {
if ((receiverDemand -= e) > 0L || (e > 0L && q.size() < QUEUE_LOW_LIMIT)) {
if (needRead) {
needRead = false;
channel.config()
parent.channel().config()
.setAutoRead(true);
}
}
else if (!needRead) {
needRead = true;
channel.config()
parent.channel().config()
.setAutoRead(false);
}

Expand All @@ -358,7 +356,7 @@ else if (!needRead) {
final void onInboundNext(Object msg) {
if (inboundDone || isCancelled()) {
if (log.isDebugEnabled()) {
log.debug(format(channel, "{}: dropping frame {}"), this, parent.asDebugLogMessage(msg));
log.debug(format(parent.channel(), "{}: dropping frame {}"), this, parent.asDebugLogMessage(msg));
}
ReferenceCountUtil.release(msg);
return;
Expand All @@ -368,11 +366,11 @@ final void onInboundNext(Object msg) {
try {
if (logLeakDetection.isDebugEnabled()) {
if (msg instanceof ByteBuf) {
((ByteBuf) msg).touch(format(channel, "Receiver " + receiver.getClass().getName() +
((ByteBuf) msg).touch(format(parent.channel(), "Receiver " + receiver.getClass().getName() +
" will handle the message from this point"));
}
else if (msg instanceof ByteBufHolder) {
((ByteBufHolder) msg).touch(format(channel, "Receiver " + receiver.getClass().getName() +
((ByteBufHolder) msg).touch(format(parent.channel(), "Receiver " + receiver.getClass().getName() +
" will handle the message from this point"));
}
}
Expand All @@ -393,10 +391,10 @@ else if (msg instanceof ByteBufHolder) {
}
if (logLeakDetection.isDebugEnabled()) {
if (msg instanceof ByteBuf) {
((ByteBuf) msg).touch(format(channel, "Buffered ByteBuf in the inbound buffer queue"));
((ByteBuf) msg).touch(format(parent.channel(), "Buffered ByteBuf in the inbound buffer queue"));
}
else if (msg instanceof ByteBufHolder) {
((ByteBufHolder) msg).touch(format(channel, "Buffered ByteBufHolder in the inbound buffer queue"));
((ByteBufHolder) msg).touch(format(parent.channel(), "Buffered ByteBufHolder in the inbound buffer queue"));
}
}
q.offer(msg);
Expand All @@ -423,20 +421,20 @@ final void onInboundError(Throwable err) {
if (isCancelled() || inboundDone) {
if (log.isDebugEnabled()) {
if (AbortedException.isConnectionReset(err)) {
log.debug(format(channel, "Connection reset has been observed post termination"), err);
log.debug(format(parent.channel(), "Connection reset has been observed post termination"), err);
}
else {
log.warn(format(channel, "An exception has been observed post termination"), err);
log.warn(format(parent.channel(), "An exception has been observed post termination"), err);
}
}
else if (log.isWarnEnabled() && !AbortedException.isConnectionReset(err)) {
log.warn(format(channel, "An exception has been observed post termination, use DEBUG level to see the full stack: {}"), err.toString());
log.warn(format(parent.channel(), "An exception has been observed post termination, use DEBUG level to see the full stack: {}"), err.toString());
}
return;
}
CoreSubscriber<?> receiver = this.receiver;
this.inboundDone = true;
if (channel.isActive()) {
if (parent.channel().isActive()) {
parent.markPersistent(false);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -266,27 +266,27 @@ static final class TracingMapHandle implements BiFunction<Mono<Void>, Connection
public Mono<Void> apply(Mono<Void> voidMono, Connection connection) {
HttpServerRequest braveRequest = connection.channel().attr(REQUEST_ATTR_KEY).get();
Span span = connection.channel().attr(SPAN_ATTR_KEY).get();
connection.onTerminate()
.subscribe(
null,
t -> cleanup(connection.channel()),
() -> cleanup(connection.channel()));
// At the point of doFinally the connection might be disposed and there might be no event loop
// associated with the disposed connection
EventLoop eventLoop = connection.channel().eventLoop();
return voidMono.doFinally(sig -> {
if (braveRequest.unwrap() instanceof reactor.netty.http.server.HttpServerResponse) {
reactor.netty.http.server.HttpServerResponse response =
(reactor.netty.http.server.HttpServerResponse) braveRequest.unwrap();
Span localSpan = sig == SignalType.CANCEL ? span.annotate("cancel") : span;
HttpServerResponse braveResponse =
new DelegatingHttpResponse(response, braveRequest, throwable);
response.withConnection(conn -> {
conn.onTerminate()
.subscribe(
null,
t -> cleanup(connection.channel()),
() -> cleanup(connection.channel()));
EventLoop eventLoop = conn.channel().eventLoop();
if (eventLoop.inEventLoop()) {
handler.handleSend(braveResponse, localSpan);
}
else {
eventLoop.execute(() -> handler.handleSend(braveResponse, localSpan));
}
});
if (eventLoop.inEventLoop()) {
handler.handleSend(braveResponse, localSpan);
}
else {
eventLoop.execute(() -> handler.handleSend(braveResponse, localSpan));
}
}
})
.doOnError(this::throwable)
Expand Down
Loading

0 comments on commit 52326af

Please sign in to comment.