Skip to content

Commit

Permalink
Adds generic UsagePoolOf, deprecates UsagePool
Browse files Browse the repository at this point in the history
  • Loading branch information
kkroo committed Aug 11, 2023
1 parent d813550 commit d31392e
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 56 deletions.
2 changes: 1 addition & 1 deletion listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (sl *sharedListener) setDeadline() error {
return err
}

// Destruct is called by the UsagePool when the listener is
// Destruct is called by UsagePoolOf when the listener is
// finally not being used anymore. It closes the socket.
func (sl *sharedListener) Destruct() error {
return sl.Listener.Close()
Expand Down
6 changes: 5 additions & 1 deletion listen_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func listenTCPOrUnix(ctx context.Context, lnKey string, network, address string,
// whether to enforce shutdown delays, for example (see #5393).
ln, err := config.Listen(ctx, network, address)
if err == nil {
listenerPool.LoadOrStore(lnKey, nil)
listenerPool.LoadOrStore(lnKey, deleteListener{})
}

// if new listener is a unix socket, make sure we can reuse it later
Expand Down Expand Up @@ -171,3 +171,7 @@ func (dl deleteListener) Close() error {
_, _ = listenerPool.Delete(dl.lnKey)
return dl.Listener.Close()
}

func (dl deleteListener) Destruct() error {
return nil
}
2 changes: 1 addition & 1 deletion listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ type ListenerWrapper interface {
}

// listenerPool stores and allows reuse of active listeners.
var listenerPool = NewUsagePool()
var listenerPool = NewUsagePoolOf[string, Destructor]()

const maxPortSpan = 65535

Expand Down
7 changes: 6 additions & 1 deletion logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,12 @@ var (
defaultLoggerMu sync.RWMutex
)

var writers = NewUsagePool()
type DestructableWriter interface {
Destructor
io.Writer
}

var writers = NewUsagePoolOf[string, DestructableWriter]()

const DefaultLoggerName = "default"

Expand Down
2 changes: 1 addition & 1 deletion modules/caddyhttp/ip_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func init() {
// so that it's ready to be used when requests start getting handled.
// A read lock should probably be used to get the cached value if the
// ranges can change at runtime (e.g. periodically refreshed).
// Using a `caddy.UsagePool` may be a good idea to avoid having refetch
// Using a `caddy.UsagePoolOf` may be a good idea to avoid having refetch
// the values when a config reload occurs, which would waste time.
//
// If the list of IP ranges cannot be sourced, then provisioning SHOULD
Expand Down
20 changes: 1 addition & 19 deletions modules/caddyhttp/reverseproxy/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,25 +76,7 @@ func (adminUpstreams) handleUpstreams(w http.ResponseWriter, r *http.Request) er

// Iterate over the upstream pool (needs to be fast)
var rangeErr error
hosts.Range(func(key, val any) bool {
address, ok := key.(string)
if !ok {
rangeErr = caddy.APIError{
HTTPStatus: http.StatusInternalServerError,
Err: fmt.Errorf("could not type assert upstream address"),
}
return false
}

upstream, ok := val.(*Host)
if !ok {
rangeErr = caddy.APIError{
HTTPStatus: http.StatusInternalServerError,
Err: fmt.Errorf("could not type assert upstream struct"),
}
return false
}

hosts.Range(func(address string, upstream *Host) bool {
results = append(results, upstreamStatus{
Address: address,
NumRequests: upstream.NumRequests(),
Expand Down
8 changes: 6 additions & 2 deletions modules/caddyhttp/reverseproxy/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (u *Upstream) fillHost() {
host := new(Host)
existingHost, loaded := hosts.LoadOrStore(u.String(), host)
if loaded {
host = existingHost.(*Host)
host = existingHost
}
u.Host = host
}
Expand All @@ -140,6 +140,10 @@ type Host struct {
fails int64
}

func (h *Host) Destruct() error {
return nil
}

// NumRequests returns the number of active requests to the upstream.
func (h *Host) NumRequests() int {
return int(atomic.LoadInt64(&h.numRequests))
Expand Down Expand Up @@ -229,7 +233,7 @@ func GetDialInfo(ctx context.Context) (DialInfo, bool) {
// currently in use by active configuration(s). This
// allows the state of remote hosts to be preserved
// through config reloads.
var hosts = caddy.NewUsagePool()
var hosts = caddy.NewUsagePoolOf[string, *Host]()

// dialInfoVarKey is the key used for the variable that holds
// the dial info for the upstream connection.
Expand Down
6 changes: 3 additions & 3 deletions modules/caddypki/acmeserver/acmeserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (ash Handler) openDatabase() (*db.AuthDB, error) {

err := os.MkdirAll(dbFolder, 0o755)
if err != nil {
return nil, fmt.Errorf("making folder for CA database: %v", err)
return databaseCloser{}, fmt.Errorf("making folder for CA database: %v", err)
}

dbConfig := &db.Config{
Expand All @@ -256,7 +256,7 @@ func (ash Handler) openDatabase() (*db.AuthDB, error) {
ash.logger.Debug("loaded preexisting CA database", zap.String("db_key", key))
}

return database.(databaseCloser).DB, err
return database.DB, err
}

// makeClient creates an ACME client which will use a custom
Expand Down Expand Up @@ -312,7 +312,7 @@ const defaultPathPrefix = "/acme/"

var (
keyCleaner = regexp.MustCompile(`[^\w.-_]`)
databasePool = caddy.NewUsagePool()
databasePool = caddy.NewUsagePoolOf[string, databaseCloser]()
)

type databaseCloser struct {
Expand Down
5 changes: 2 additions & 3 deletions modules/caddytls/connpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -324,7 +323,7 @@ func (p *ConnectionPolicy) buildStandardTLSConfig(ctx caddy.Context) error {
}
ctx.OnCancel(func() { _, _ = secretsLogPool.Delete(filename) })

cfg.KeyLogWriter = logFile.(io.Writer)
cfg.KeyLogWriter = logFile

tlsApp.logger.Warn("TLS SECURITY COMPROMISED: secrets logging is enabled!",
zap.String("log_filename", filename))
Expand Down Expand Up @@ -598,4 +597,4 @@ type destructableWriter struct{ *os.File }

func (d destructableWriter) Destruct() error { return d.Close() }

var secretsLogPool = caddy.NewUsagePool()
var secretsLogPool = caddy.NewUsagePoolOf[string, destructableWriter]()
60 changes: 36 additions & 24 deletions usagepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"sync/atomic"
)

// UsagePool is a thread-safe map that pools values
// UsagePoolOf is a thread-safe map that pools values
// based on usage (reference counting). Values are
// only inserted if they do not already exist. There
// are two ways to add values to the pool:
Expand All @@ -47,23 +47,24 @@ import (
// was stored. Deleting too many times will panic.
//
// The implementation does not use a sync.Pool because
// UsagePool needs additional atomicity to run the
// UsagePoolOf needs additional atomicity to run the
// constructor functions when creating a new value when
// LoadOrNew is used. (We could probably use sync.Pool
// but we'd still have to layer our own additional locks
// on top.)
//
// An empty UsagePool is NOT safe to use; always call
// NewUsagePool() to make a new one.
type UsagePool struct {
// An empty UsagePoolOf is NOT safe to use; always call
// NewUsagePoolOf() to make a new one.
type UsagePoolOf[K comparable, V any] struct {
sync.RWMutex
pool map[any]*usagePoolVal
pool map[K]*usagePoolVal[V]
}

// NewUsagePool returns a new usage pool that is ready to use.
func NewUsagePool() *UsagePool {
return &UsagePool{
pool: make(map[any]*usagePoolVal),
// NewUsagePoolOf returns a new usage pool with comparable key type K and Destructor
// interface type V that is ready to use.
func NewUsagePoolOf[K comparable, V any]() *UsagePoolOf[K, V] {
return &UsagePoolOf[K, V]{
pool: make(map[K]*usagePoolVal[V]),
}
}

Expand All @@ -74,8 +75,8 @@ func NewUsagePool() *UsagePool {
// or constructed value is returned. The loaded return value is true
// if the value already existed and was loaded, or false if it was
// newly constructed.
func (up *UsagePool) LoadOrNew(key any, construct Constructor) (value any, loaded bool, err error) {
var upv *usagePoolVal
func (up *UsagePoolOf[K, V]) LoadOrNew(key K, construct Constructor) (value V, loaded bool, err error) {
var upv *usagePoolVal[V]
up.Lock()
upv, loaded = up.pool[key]
if loaded {
Expand All @@ -86,11 +87,12 @@ func (up *UsagePool) LoadOrNew(key any, construct Constructor) (value any, loade
err = upv.err
upv.RUnlock()
} else {
upv = &usagePoolVal{refs: 1}
upv = &usagePoolVal[V]{refs: 1}
upv.Lock()
up.pool[key] = upv
up.Unlock()
value, err = construct()
destructable, err := construct()
value = destructable.(V)
if err == nil {
upv.value = value
} else {
Expand All @@ -113,8 +115,8 @@ func (up *UsagePool) LoadOrNew(key any, construct Constructor) (value any, loade
// already exists, or stores it if it does not exist. It returns the
// value that was either loaded or stored, and true if the value already
// existed and was
func (up *UsagePool) LoadOrStore(key, val any) (value any, loaded bool) {
var upv *usagePoolVal
func (up *UsagePoolOf[K, V]) LoadOrStore(key K, val V) (value V, loaded bool) {
var upv *usagePoolVal[V]
up.Lock()
upv, loaded = up.pool[key]
if loaded {
Expand All @@ -129,7 +131,7 @@ func (up *UsagePool) LoadOrStore(key, val any) (value any, loaded bool) {
}
upv.Unlock()
} else {
upv = &usagePoolVal{refs: 1, value: val}
upv = &usagePoolVal[V]{refs: 1, value: val}
up.pool[key] = upv
up.Unlock()
value = val
Expand All @@ -144,7 +146,7 @@ func (up *UsagePool) LoadOrStore(key, val any) (value any, loaded bool) {
// This method is somewhat naive and acquires a read lock on the
// entire pool during iteration, so do your best to make f() really
// fast, m'kay?
func (up *UsagePool) Range(f func(key, value any) bool) {
func (up *UsagePoolOf[K, V]) Range(f func(key K, value V) bool) {
up.RLock()
defer up.RUnlock()
for key, upv := range up.pool {
Expand All @@ -166,7 +168,7 @@ func (up *UsagePool) Range(f func(key, value any) bool) {
// true if the usage count reached 0 and the value was deleted.
// It panics if the usage count drops below 0; always call
// Delete precisely as many times as LoadOrStore.
func (up *UsagePool) Delete(key any) (deleted bool, err error) {
func (up *UsagePoolOf[K, V]) Delete(key K) (deleted bool, err error) {
up.Lock()
upv, ok := up.pool[key]
if !ok {
Expand All @@ -180,7 +182,7 @@ func (up *UsagePool) Delete(key any) (deleted bool, err error) {
upv.RLock()
val := upv.value
upv.RUnlock()
if destructor, ok := val.(Destructor); ok {
if destructor, ok := any(val).(Destructor); ok {
err = destructor.Destruct()
}
deleted = true
Expand All @@ -196,13 +198,13 @@ func (up *UsagePool) Delete(key any) (deleted bool, err error) {

// References returns the number of references (count of usages) to a
// key in the pool, and true if the key exists, or false otherwise.
func (up *UsagePool) References(key any) (int, bool) {
func (up *UsagePoolOf[K, V]) References(key K) (int, bool) {
up.RLock()
upv, loaded := up.pool[key]
up.RUnlock()
if loaded {
// I wonder if it'd be safer to read this value during
// our lock on the UsagePool... guess we'll see...
// our lock on the UsagePoolOf... guess we'll see...
refs := atomic.LoadInt32(&upv.refs)
return int(refs), true
}
Expand All @@ -219,9 +221,19 @@ type Destructor interface {
Destruct() error
}

type usagePoolVal struct {
type usagePoolVal[V any] struct {
refs int32 // accessed atomically; must be 64-bit aligned for 32-bit systems
value any
value V
err error
sync.RWMutex
}

// UsagePool is DEPRECATED: Use UsagePoolOf instead. This type will likely be changed or removed in the future.
type UsagePool = UsagePoolOf[any, any]

// NewUsagePool is DEPRECATED: Use NewUsagePoolOf instead. This type will likely be changed or removed in the future.
func NewUsagePool() *UsagePool {
return &UsagePool{
pool: make(map[any]*usagePoolVal[any]),
}
}

0 comments on commit d31392e

Please sign in to comment.