Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: quic listener will manage the underlying socket by itself. #5749

Merged
merged 8 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 125 additions & 49 deletions listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,38 +470,90 @@ func ListenPacket(network, addr string) (net.PacketConn, error) {
// unixgram will be used; otherwise, udp will be used).
//
// NOTE: This API is EXPERIMENTAL and may be changed or removed.
//
func (na NetworkAddress) ListenQUIC(ctx context.Context, portOffset uint, config net.ListenConfig, tlsConf *tls.Config, activeRequests *int64) (http3.QUICEarlyListener, error) {
mholt marked this conversation as resolved.
Show resolved Hide resolved
lnKey := listenerKey("quic"+na.Network, na.JoinHostPort(portOffset))

sharedEarlyListener, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) {
lnAny, err := na.Listen(ctx, portOffset, config)
if err != nil {
return nil, err
}

ln := lnAny.(net.PacketConn)

h3ln := ln
for {
// retrieve the underlying socket, so quic-go can optimize.
if unwrapper, ok := h3ln.(interface{ Unwrap() net.PacketConn }); ok {
h3ln = unwrapper.Unwrap()
} else {
break
}
}

sqs := newSharedQUICState(tlsConf, activeRequests)
// http3.ConfigureTLSConfig only uses this field and tls App sets this field as well
//nolint:gosec
quicTlsConfig := &tls.Config{GetConfigForClient: sqs.getConfigForClient}
earlyLn, err := quic.ListenEarly(h3ln, http3.ConfigureTLSConfig(quicTlsConfig), &quic.Config{
Allow0RTT: true,
RequireAddressValidation: func(clientAddr net.Addr) bool {
// TODO: make tunable?
return sqs.getActiveRequests() > 1000
},
})
if err != nil {
return nil, err
}
// using the original net.PacketConn to close them properly
return &sharedQuicListener{EarlyListener: earlyLn, packetConn: ln, sqs: sqs, key: lnKey}, nil
})
if err != nil {
return nil, err
}

sql := sharedEarlyListener.(*sharedQuicListener)
// add current tls.Config to sqs, so GetConfigForClient will always return the latest tls.Config in case of context cancellation,
// and the request counter will reflect current http server
ctx, cancel := sql.sqs.addState(tlsConf, activeRequests)

return &fakeCloseQuicListener{
sharedQuicListener: sql,
context: ctx,
contextCancel: cancel,
}, nil
}

// DEPRECATED: Use NetworkAddress.ListenQUIC instead. This function will likely be changed or removed in the future.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is good -- thanks. Let's actually remove the original comment lines above this, like the other deprecated Listen* functions, since they shouldn't be used anymore anyway.

// TODO: See if we can find a more elegant solution closer to the new NetworkAddress.Listen API.
func ListenQUIC(ln net.PacketConn, tlsConf *tls.Config, activeRequests *int64) (http3.QUICEarlyListener, error) {
lnKey := listenerKey("quic+"+ln.LocalAddr().Network(), ln.LocalAddr().String())

sharedEarlyListener, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) {
sqtc := newSharedQUICTLSConfig(tlsConf)
sqs := newSharedQUICState(tlsConf, activeRequests)
// http3.ConfigureTLSConfig only uses this field and tls App sets this field as well
//nolint:gosec
quicTlsConfig := &tls.Config{GetConfigForClient: sqtc.getConfigForClient}
quicTlsConfig := &tls.Config{GetConfigForClient: sqs.getConfigForClient}
earlyLn, err := quic.ListenEarly(ln, http3.ConfigureTLSConfig(quicTlsConfig), &quic.Config{
Allow0RTT: true,
RequireAddressValidation: func(clientAddr net.Addr) bool {
var highLoad bool
if activeRequests != nil {
highLoad = atomic.LoadInt64(activeRequests) > 1000 // TODO: make tunable?
}
return highLoad
// TODO: make tunable?
return sqs.getActiveRequests() > 1000
},
})
if err != nil {
return nil, err
}
return &sharedQuicListener{EarlyListener: earlyLn, sqtc: sqtc, key: lnKey}, nil
return &sharedQuicListener{EarlyListener: earlyLn, sqs: sqs, key: lnKey}, nil
})
if err != nil {
return nil, err
}

sql := sharedEarlyListener.(*sharedQuicListener)
// add current tls.Config to sqtc, so GetConfigForClient will always return the latest tls.Config in case of context cancellation
ctx, cancel := sql.sqtc.addTLSConfig(tlsConf)
// add current tls.Config and request counter to sqs, so GetConfigForClient will always return the latest tls.Config in case of context cancellation,
// and the request counter will reflect current http server
ctx, cancel := sql.sqs.addState(tlsConf, activeRequests)

// TODO: to serve QUIC over a unix socket, currently we need to hold onto
// the underlying net.PacketConn (which we wrap as unixConn to keep count
Expand Down Expand Up @@ -534,76 +586,95 @@ type contextAndCancelFunc struct {
context.CancelFunc
}

// sharedQUICTLSConfig manages GetConfigForClient
// sharedQUICState manages GetConfigForClient and current number of active requests
// see issue: https://github.com/caddyserver/caddy/pull/4849
type sharedQUICTLSConfig struct {
rmu sync.RWMutex
tlsConfs map[*tls.Config]contextAndCancelFunc
activeTlsConf *tls.Config
}

// newSharedQUICTLSConfig creates a new sharedQUICTLSConfig
func newSharedQUICTLSConfig(tlsConfig *tls.Config) *sharedQUICTLSConfig {
sqtc := &sharedQUICTLSConfig{
tlsConfs: make(map[*tls.Config]contextAndCancelFunc),
activeTlsConf: tlsConfig,
}
sqtc.addTLSConfig(tlsConfig)
type sharedQUICState struct {
rmu sync.RWMutex
tlsConfs map[*tls.Config]contextAndCancelFunc
requestCounters map[*tls.Config]*int64
activeTlsConf *tls.Config
activeRequestsCounter *int64
}

// newSharedQUICState creates a new sharedQUICState
func newSharedQUICState(tlsConfig *tls.Config, activeRequests *int64) *sharedQUICState {
sqtc := &sharedQUICState{
tlsConfs: make(map[*tls.Config]contextAndCancelFunc),
requestCounters: make(map[*tls.Config]*int64),
activeTlsConf: tlsConfig,
activeRequestsCounter: activeRequests,
}
sqtc.addState(tlsConfig, activeRequests)
return sqtc
}

// getConfigForClient is used as tls.Config's GetConfigForClient field
func (sqtc *sharedQUICTLSConfig) getConfigForClient(ch *tls.ClientHelloInfo) (*tls.Config, error) {
sqtc.rmu.RLock()
defer sqtc.rmu.RUnlock()
return sqtc.activeTlsConf.GetConfigForClient(ch)
func (sqs *sharedQUICState) getConfigForClient(ch *tls.ClientHelloInfo) (*tls.Config, error) {
sqs.rmu.RLock()
defer sqs.rmu.RUnlock()
return sqs.activeTlsConf.GetConfigForClient(ch)
}

// getActiveRequests returns the number of active requests
func (sqs *sharedQUICState) getActiveRequests() int64 {
// Prevent a race when a context is cancelled and active request counter is being changed
sqs.rmu.RLock()
defer sqs.rmu.RUnlock()
return atomic.LoadInt64(sqs.activeRequestsCounter)
}

// addTLSConfig adds tls.Config to the map if not present and returns the corresponding context and its cancelFunc
// so that when cancelled, the active tls.Config will change
func (sqtc *sharedQUICTLSConfig) addTLSConfig(tlsConfig *tls.Config) (context.Context, context.CancelFunc) {
sqtc.rmu.Lock()
defer sqtc.rmu.Unlock()
// addState adds tls.Config and activeRequests to the map if not present and returns the corresponding context and its cancelFunc
// so that when cancelled, the active tls.Config and request counter will change
func (sqs *sharedQUICState) addState(tlsConfig *tls.Config, activeRequests *int64) (context.Context, context.CancelFunc) {
sqs.rmu.Lock()
defer sqs.rmu.Unlock()

if cacc, ok := sqtc.tlsConfs[tlsConfig]; ok {
if cacc, ok := sqs.tlsConfs[tlsConfig]; ok {
return cacc.Context, cacc.CancelFunc
}

ctx, cancel := context.WithCancel(context.Background())
wrappedCancel := func() {
cancel()

sqtc.rmu.Lock()
defer sqtc.rmu.Unlock()
sqs.rmu.Lock()
defer sqs.rmu.Unlock()

delete(sqtc.tlsConfs, tlsConfig)
if sqtc.activeTlsConf == tlsConfig {
// select another tls.Config, if there is none,
delete(sqs.tlsConfs, tlsConfig)
delete(sqs.requestCounters, tlsConfig)
if sqs.activeTlsConf == tlsConfig {
// select another tls.Config and request counter, if there is none,
// related sharedQuicListener will be destroyed anyway
for tc := range sqtc.tlsConfs {
sqtc.activeTlsConf = tc
for tc, counter := range sqs.requestCounters {
sqs.activeTlsConf = tc
sqs.activeRequestsCounter = counter
break
}
}
}
sqtc.tlsConfs[tlsConfig] = contextAndCancelFunc{ctx, wrappedCancel}
sqs.tlsConfs[tlsConfig] = contextAndCancelFunc{ctx, wrappedCancel}
sqs.requestCounters[tlsConfig] = activeRequests
// there should be at most 2 tls.Configs
if len(sqtc.tlsConfs) > 2 {
Log().Warn("quic listener tls configs are more than 2", zap.Int("number of configs", len(sqtc.tlsConfs)))
if len(sqs.tlsConfs) > 2 {
Log().Warn("quic listener tls configs are more than 2", zap.Int("number of configs", len(sqs.tlsConfs)))
}
return ctx, wrappedCancel
}

// sharedQuicListener is like sharedListener, but for quic.EarlyListeners.
type sharedQuicListener struct {
*quic.EarlyListener
sqtc *sharedQUICTLSConfig
key string
packetConn net.PacketConn // we have to hold these because quic-go won't close listeners it didn't create
sqs *sharedQUICState
key string
}

// Destruct closes the underlying QUIC listener.
// Destruct closes the underlying QUIC listener and its associated net.PacketConn.
func (sql *sharedQuicListener) Destruct() error {
return sql.EarlyListener.Close()
// close EarlyListener first to stop any operations being done to the net.PacketConn
_ = sql.EarlyListener.Close()
Comment on lines +674 to +675
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should at least do a log output in the rare event this does fail. Log().Error("closing QUIC early listener", zap.Error(err)) or something.

// then close the net.PacketConn
return sql.packetConn.Close()
}

// sharedPacketConn is like sharedListener, but for net.PacketConns.
Expand Down Expand Up @@ -652,6 +723,11 @@ var _ quic.OOBCapablePacketConn = (*fakeClosePacketConn)(nil)
// but doesn't actually use these methods, the only methods needed are `ReadMsgUDP` and `SyscallConn`.
var _ net.Conn = (*fakeClosePacketConn)(nil)

// Unwrap returns the underlying net.UDPConn for quic-go optimization
func (fcpc *fakeClosePacketConn) Unwrap() any {
return fcpc.UDPConn
}
WeidiDeng marked this conversation as resolved.
Show resolved Hide resolved

// Close won't close the underlying socket unless there is no more reference, then listenerPool will close it.
func (fcpc *fakeClosePacketConn) Close() error {
if atomic.CompareAndSwapInt32(&fcpc.closed, 0, 1) {
Expand Down
11 changes: 0 additions & 11 deletions modules/caddyhttp/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,17 +617,6 @@ func (app *App) Stop() error {
zap.Error(err),
zap.Strings("addresses", server.Listen))
}

// TODO: we have to manually close our listeners because quic-go won't
// close listeners it didn't create along with the server itself...
// see https://github.com/quic-go/quic-go/issues/3560
for _, el := range server.h3listeners {
if err := el.Close(); err != nil {
app.logger.Error("HTTP/3 listener close",
zap.Error(err),
zap.String("address", el.LocalAddr().String()))
}
}
}
stopH2Listener := func(server *Server) {
defer finishedShutdown.Done()
Expand Down
11 changes: 1 addition & 10 deletions modules/caddyhttp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ type Server struct {

server *http.Server
h3server *http3.Server
h3listeners []net.PacketConn // TODO: we have to hold these because quic-go won't close listeners it didn't create
h2listeners []*http2Listener
addresses []caddy.NetworkAddress

Expand Down Expand Up @@ -555,13 +554,7 @@ func (s *Server) findLastRouteWithHostMatcher() int {
// the listener, with Server s as the handler.
func (s *Server) serveHTTP3(addr caddy.NetworkAddress, tlsCfg *tls.Config) error {
addr.Network = getHTTP3Network(addr.Network)
lnAny, err := addr.Listen(s.ctx, 0, net.ListenConfig{})
if err != nil {
return err
}
ln := lnAny.(net.PacketConn)

h3ln, err := caddy.ListenQUIC(ln, tlsCfg, &s.activeRequests)
h3ln, err := addr.ListenQUIC(s.ctx, 0, net.ListenConfig{}, tlsCfg, &s.activeRequests)
if err != nil {
return fmt.Errorf("starting HTTP/3 QUIC listener: %v", err)
}
Expand All @@ -579,8 +572,6 @@ func (s *Server) serveHTTP3(addr caddy.NetworkAddress, tlsCfg *tls.Config) error
}
}

s.h3listeners = append(s.h3listeners, ln)

//nolint:errcheck
go s.h3server.ServeListener(h3ln)

Expand Down
Loading