Skip to content

Commit

Permalink
core: Apply SO_REUSEPORT to UDP sockets
Browse files Browse the repository at this point in the history
For some reason, 10 months ago when I implemented SO_REUSEPORT
for TCP, I didn't realize, or forgot, that it can be used for UDP too. It is a
much better solution than using deadline hacks to reuse a socket, at
least for TCP.

Then mholt/caddy-l4#132 was posted,
in which we see that UDP servers never actually stopped when the
L4 app was stopped. I verified this using this command:

    $ nc -u 127.0.0.1 55353

combined with POSTing configs to the /load admin endpoint (which
alternated between an echo server and a proxy server so I could tell
which config was being used).

I refactored the code to use SO_REUSEPORT for UDP, but of course
we still need graceful reloads on all platforms, not just Unix, so I
also implemented a deadline hack similar to what we used for
TCP before. That implementation for TCP was not perfect, possibly
having a logical (not data) race condition; but for UDP so far it
seems to be working. Verified the same way I verified that SO_REUSEPORT
works.

I think this code is slightly cleaner and I'm fairly confident this code
is effective.
  • Loading branch information
mholt committed Aug 9, 2023
1 parent fbb0ecf commit 06b24ed
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 42 deletions.
35 changes: 26 additions & 9 deletions listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,35 @@ func reuseUnixSocket(network, addr string) (any, error) {
return nil, nil
}

func listenTCPOrUnix(ctx context.Context, lnKey string, network, address string, config net.ListenConfig) (net.Listener, error) {
sharedLn, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) {
ln, err := config.Listen(ctx, network, address)
func listenReusable(ctx context.Context, lnKey string, network, address string, config net.ListenConfig) (any, error) {
switch network {
case "udp", "udp4", "udp6", "unixgram":
sharedPc, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) {
pc, err := config.ListenPacket(ctx, na.Network, address)
if err != nil {
return nil, err
}
return &sharedPacketConn{PacketConn: pc, key: lnKey}, nil
})
if err != nil {
return nil, err
}
spc := sharedPc.(*sharedPacketConn)
ln = &fakeClosePacketConn{spc: spc, UDPConn: spc.PacketConn.(*net.UDPConn)}

default:
sharedLn, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) {
ln, err := config.Listen(ctx, network, address)
if err != nil {
return nil, err
}
return &sharedListener{Listener: ln, key: lnKey}, nil
})
if err != nil {
return nil, err
}
return &sharedListener{Listener: ln, key: lnKey}, nil
})
if err != nil {
return nil, err
return &fakeCloseListener{sharedListener: sharedLn.(*sharedListener), keepAlivePeriod: config.KeepAlive}, nil
}
return &fakeCloseListener{sharedListener: sharedLn.(*sharedListener), keepAlivePeriod: config.KeepAlive}, nil
}

// fakeCloseListener is a private wrapper over a listener that
Expand Down Expand Up @@ -98,7 +115,7 @@ func (fcl *fakeCloseListener) Accept() (net.Conn, error) {
// so that it's clear in the code that side-effects are shared with other
// users of this listener, not just our own reference to it; we also don't
// do anything with the error because all we could do is log it, but we
// expliclty assign it to nothing so we don't forget it's there if needed
// explicitly assign it to nothing so we don't forget it's there if needed
_ = fcl.sharedListener.clearDeadline()

if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
Expand Down
34 changes: 31 additions & 3 deletions listen_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package caddy
import (
"context"
"errors"
"io"
"io/fs"
"net"
"sync/atomic"
Expand Down Expand Up @@ -87,7 +88,7 @@ func reuseUnixSocket(network, addr string) (any, error) {
return nil, nil
}

func listenTCPOrUnix(ctx context.Context, lnKey string, network, address string, config net.ListenConfig) (net.Listener, error) {
func listenReusable(ctx context.Context, lnKey string, network, address string, config net.ListenConfig) (any, error) {
// wrap any Control function set by the user so we can also add our reusePort control without clobbering theirs
oldControl := config.Control
config.Control = func(network, address string, c syscall.RawConn) error {
Expand All @@ -103,7 +104,14 @@ func listenTCPOrUnix(ctx context.Context, lnKey string, network, address string,
// we still put it in the listenerPool so we can count how many
// configs are using this socket; necessary to ensure we can know
// whether to enforce shutdown delays, for example (see #5393).
ln, err := config.Listen(ctx, network, address)
var ln io.Closer
var err error
switch network {
case "udp", "udp4", "udp6", "unixgram":
ln, err = config.ListenPacket(ctx, network, address)
default:
ln, err = config.Listen(ctx, network, address)
}
if err == nil {
listenerPool.LoadOrStore(lnKey, nil)
}
Expand All @@ -119,7 +127,15 @@ func listenTCPOrUnix(ctx context.Context, lnKey string, network, address string,

// lightly wrap the listener so that when it is closed,
// we can decrement the usage pool counter
return deleteListener{ln, lnKey}, err
switch specificLn := ln.(type) {
case net.Listener:
return deleteListener{specificLn, lnKey}, err
case net.PacketConn:
return deletePacketConn{specificLn, lnKey}, err
}

// other types, I guess we just return them directly
return ln, err
}

// reusePort sets SO_REUSEPORT. Ineffective for unix sockets.
Expand Down Expand Up @@ -171,3 +187,15 @@ func (dl deleteListener) Close() error {
_, _ = listenerPool.Delete(dl.lnKey)
return dl.Listener.Close()
}

// deletePacketConn is like deleteListener, but
// for net.PacketConns.
type deletePacketConn struct {
net.PacketConn
lnKey string
}

func (dl deletePacketConn) Close() error {
_, _ = listenerPool.Delete(dl.lnKey)
return dl.PacketConn.Close()
}
78 changes: 48 additions & 30 deletions listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,13 @@ func (na NetworkAddress) Listen(ctx context.Context, portOffset uint, config net
}

func (na NetworkAddress) listen(ctx context.Context, portOffset uint, config net.ListenConfig) (any, error) {
var ln any
var err error
var address string
var unixFileMode fs.FileMode
var isAbtractUnixSocket bool
var (
ln any
err error
address string
unixFileMode fs.FileMode
isAbtractUnixSocket bool
)

// split unix socket addr early so lnKey
// is independent of permissions bits
Expand Down Expand Up @@ -180,27 +182,10 @@ func (na NetworkAddress) listen(ctx context.Context, portOffset uint, config net

lnKey := listenerKey(na.Network, address)

switch na.Network {
case "tcp", "tcp4", "tcp6", "unix", "unixpacket":
ln, err = listenTCPOrUnix(ctx, lnKey, na.Network, address, config)
case "unixgram":
ln, err = config.ListenPacket(ctx, na.Network, address)
case "udp", "udp4", "udp6":
sharedPc, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) {
pc, err := config.ListenPacket(ctx, na.Network, address)
if err != nil {
return nil, err
}
return &sharedPacketConn{PacketConn: pc, key: lnKey}, nil
})
if err != nil {
return nil, err
}
spc := sharedPc.(*sharedPacketConn)
ln = &fakeClosePacketConn{spc: spc, UDPConn: spc.PacketConn.(*net.UDPConn)}
}
if strings.HasPrefix(na.Network, "ip") {
ln, err = config.ListenPacket(ctx, na.Network, address)
} else {
ln, err = listenReusable(ctx, lnKey, na.Network, address, config)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -643,17 +628,50 @@ type fakeClosePacketConn struct {
*net.UDPConn // embedded, so we also become a net.PacketConn and enable several other optimizations done by quic-go
}

// interface guard for extra optimizations
// needed by QUIC implementation: https://github.com/caddyserver/caddy/issues/3998, https://github.com/caddyserver/caddy/issues/5605
var _ quic.OOBCapablePacketConn = (*fakeClosePacketConn)(nil)
// Interface guards for extra optimizations
// needed by QUIC implementation:
// https://github.com/caddyserver/caddy/issues/3998
// https://github.com/caddyserver/caddy/issues/5605
var (
_ quic.OOBCapablePacketConn = (*fakeClosePacketConn)(nil)

// https://pkg.go.dev/golang.org/x/net/ipv4#NewPacketConn is used by quic-go and requires
// a net.PacketConn type assertable to a net.Conn, but doesn't actually use these methods;
// the only methods needed are `ReadMsgUDP` and `SyscallConn`.
_ net.Conn = (*fakeClosePacketConn)(nil)
)

// https://pkg.go.dev/golang.org/x/net/ipv4#NewPacketConn is used by quic-go and requires a net.PacketConn type assertable to a net.Conn,
// but doesn't actually use these methods, the only methods needed are `ReadMsgUDP` and `SyscallConn`.
var _ net.Conn = (*fakeClosePacketConn)(nil)
func (fcpc *fakeClosePacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
// if the listener is already "closed", return error
if atomic.LoadInt32(&fcpc.closed) == 1 {
return 0, nil, &net.OpError{
Op: "readfrom",
Net: fcpc.LocalAddr().Network(),
Addr: fcpc.LocalAddr(),
Err: errFakeClosed,
}
}

// call underlying readfrom
n, addr, err = fcpc.spc.ReadFrom(p)
if err != nil {
// this server was stopped, so clear the deadline and let
// any new server continue reading; but we will exit
if atomic.LoadInt32(&fcpc.closed) == 1 {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
fcpc.SetReadDeadline(time.Time{})

Check failure on line 662 in listeners.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

Error return value of `fcpc.SetReadDeadline` is not checked (errcheck)
}
}
return
}

return
}

// Close won't close the underlying socket unless there is no more reference, then listenerPool will close it.
func (fcpc *fakeClosePacketConn) Close() error {
if atomic.CompareAndSwapInt32(&fcpc.closed, 0, 1) {
fcpc.SetReadDeadline(time.Now()) // unblock ReadFrom() calls to kick old servers out of their loops

Check failure on line 674 in listeners.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

Error return value of `fcpc.SetReadDeadline` is not checked (errcheck)
_, _ = listenerPool.Delete(fcpc.spc.key)
}
return nil
Expand Down

0 comments on commit 06b24ed

Please sign in to comment.