Merge #128400
128400: sqlstats/insights: fix mem leaks on session close r=xinhaoz a=xinhaoz

insights: ensure releasing to Insight pool clears slice

Ensure that when we return an object to the Insight pool, we also
release the statements held in its slice. Previously, the logic that
returned this object to the pool did not nil out the entries of the
`Statements` slice, making it possible for these slices to grow in
capacity over the node's lifetime while holding onto garbage
Statement objects.
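
For context, here is a minimal Go sketch of the pattern (simplified
types and names, not the actual CockroachDB code): re-slicing to zero
length keeps the backing array alive, so each element must be nil'd
out before the object is returned to the pool.

    // Sketch only: why pooled objects must nil out slice elements.
    package example

    import "sync"

    type Statement struct{ /* fields omitted */ }

    type Insight struct {
        Statements []*Statement
    }

    var insightPool = sync.Pool{
        New: func() interface{} { return new(Insight) },
    }

    func releaseInsight(insight *Insight) {
        // Truncating with [:0] alone keeps the backing array and its
        // *Statement pointers reachable, pinning garbage Statements.
        for i := range insight.Statements {
            insight.Statements[i] = nil // drop references so GC can reclaim them
        }
        insight.Statements = insight.Statements[:0] // keep capacity for reuse
        *insight = Insight{Statements: insight.Statements}
        insightPool.Put(insight)
    }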

This is a lead-up to #128199, which will likely remove
this pool and reuse the existing statementBuf pool.

Epic: none
Release note: None

insights: add testing knobs

Add insights-specific testing knobs. We'll add some
knobs in later commits.

Epic: none
Release note: None

sqlstats/insights: free memory allocated per session on session close

The insights locking registry buffers statement insight objects
by session ID until it receives a transaction insight, at which
point the buffer is emptied. These buffers can leak if the session
is closed midway through a transaction, since the registry will
never receive a transaction event for that session. This commit
ensures we free any memory allocated in insights for a session by
sending an event that clears the registry's session entry, if it
exists, on session close.
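
As a rough sketch of that buffering behavior (hypothetical names,
heavily simplified from the real registry): statements accumulate in
a per-session map until a transaction event flushes them, so a
session that disappears mid-transaction needs an explicit clear.

    // Sketch only: illustrates the per-session buffer and the leak.
    package example

    type sessionID string

    type statementInsight struct{ /* fields omitted */ }
    type transactionInsight struct{ /* fields omitted */ }

    type registry struct {
        statements map[sessionID][]*statementInsight
    }

    func (r *registry) observeStatement(id sessionID, s *statementInsight) {
        // Buffered until the session's transaction insight arrives.
        r.statements[id] = append(r.statements[id], s)
    }

    func (r *registry) observeTransaction(id sessionID, _ *transactionInsight) {
        // Normal path: buffered statements are turned into insights,
        // then the session's entry is removed.
        _ = r.statements[id] // processing elided in this sketch
        delete(r.statements, id)
    }

    // clearSession covers the abnormal path: if the session closes
    // mid-transaction, observeTransaction never runs, so the entry
    // would otherwise live for the lifetime of the node.
    func (r *registry) clearSession(id sessionID) {
        delete(r.statements, id)
    }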

A testing knob, OnSessionClear, was added; it is called when the
locking registry clears a session.

Epic: none
Fixes: #128213

Release note (bug fix): Fixes a memory leak in which statement
insight objects could be retained if a session was closed before
its transaction finished.

insights: move insights testing knobs from sqlstats

Move some insights testing knobs from the sqlstats testing knobs
struct to the insights package.

Epic: none

Release note: None

Co-authored-by: Xin Hao Zhang <[email protected]>
craig[bot] and xinhaoz committed Aug 17, 2024
2 parents 446bc36 + 2da6863 commit 1993fc0
Showing 21 changed files with 221 additions and 68 deletions.
1 change: 1 addition & 0 deletions pkg/base/testing_knobs.go
@@ -59,4 +59,5 @@ type TestingKnobs struct {
KeyVisualizer ModuleTestingKnobs
TenantCapabilitiesTestingKnobs ModuleTestingKnobs
TableStatsKnobs ModuleTestingKnobs
Insights ModuleTestingKnobs
}
6 changes: 6 additions & 0 deletions pkg/server/server_sql.go
@@ -108,6 +108,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slprovider"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/insights"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilegecache"
@@ -1137,6 +1138,11 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
if externalConnKnobs := cfg.TestingKnobs.ExternalConnection; externalConnKnobs != nil {
execCfg.ExternalConnectionTestingKnobs = externalConnKnobs.(*externalconn.TestingKnobs)
}

if insightsKnobs := cfg.TestingKnobs.Insights; insightsKnobs != nil {
execCfg.InsightsTestingKnobs = insightsKnobs.(*insights.TestingKnobs)

}
var tableStatsTestingKnobs *stats.TableStatsTestingKnobs
if tableStatsKnobs := cfg.TestingKnobs.TableStatsKnobs; tableStatsKnobs != nil {
tableStatsTestingKnobs = tableStatsKnobs.(*stats.TableStatsTestingKnobs)
6 changes: 3 additions & 3 deletions pkg/sql/conn_executor.go
@@ -415,7 +415,7 @@ type ServerMetrics struct {
func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
metrics := makeMetrics(false /* internal */)
serverMetrics := makeServerMetrics(cfg)
insightsProvider := insights.New(cfg.Settings, serverMetrics.InsightsMetrics)
insightsProvider := insights.New(cfg.Settings, serverMetrics.InsightsMetrics, cfg.InsightsTestingKnobs)
// TODO(117690): Unify StmtStatsEnable and TxnStatsEnable into a single cluster setting.
sqlstats.TxnStatsEnable.SetOnChange(&cfg.Settings.SV, func(_ context.Context) {
if !sqlstats.TxnStatsEnable.Get(&cfg.Settings.SV) {
@@ -645,7 +645,7 @@ func (s *Server) GetIndexUsageStatsController() *idxusage.Controller {
return s.indexUsageStatsController
}

// GetInsightsReader returns the insights.Reader for the current sql.Server's
// GetInsightsReader returns the insights store for the current sql.Server's
// detected execution insights.
func (s *Server) GetInsightsReader() *insights.LockingStore {
return s.insights.Store()
@@ -1267,7 +1267,7 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {
}

// Free any memory used by the stats collector.
ex.statsCollector.Free(ctx)
ex.statsCollector.Close(ctx, ex.planner.extendedEvalCtx.SessionID)

var payloadErr error
if closeType == normalClose {
2 changes: 2 additions & 0 deletions pkg/sql/exec_util.go
@@ -103,6 +103,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessionphase"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/insights"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilegecache"
@@ -1303,6 +1304,7 @@ type ExecutorConfig struct {
UnusedIndexRecommendationsKnobs *idxusage.UnusedIndexRecommendationTestingKnobs
ExternalConnectionTestingKnobs *externalconn.TestingKnobs
EventLogTestingKnobs *EventLogTestingKnobs
InsightsTestingKnobs *insights.TestingKnobs

// HistogramWindowInterval is (server.Config).HistogramWindowInterval.
HistogramWindowInterval time.Duration
1 change: 0 additions & 1 deletion pkg/sql/sqlstats/BUILD.bazel
@@ -21,7 +21,6 @@ go_library(
"//pkg/sql/execstats",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlstats/insights",
"//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil",
"//pkg/util/log",
"//pkg/util/metric",
1 change: 1 addition & 0 deletions pkg/sql/sqlstats/insights/BUILD.bazel
@@ -13,6 +13,7 @@ go_library(
"provider.go",
"registry.go",
"store.go",
"test_utils.go",
],
embed = [":insights_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/insights",
27 changes: 26 additions & 1 deletion pkg/sql/sqlstats/insights/ingester.go
@@ -40,7 +40,8 @@ type ConcurrentBufferIngester struct {
registry *lockingRegistry
clearRegistry uint32

closeCh chan struct{}
closeCh chan struct{}
testingKnobs *TestingKnobs
}

type eventBufChPayload struct {
@@ -145,6 +146,8 @@ func (i *ConcurrentBufferIngester) ingest(events *eventBuffer) {
i.registry.ObserveStatement(e.sessionID, e.statement)
} else if e.transaction != nil {
i.registry.ObserveTransaction(e.sessionID, e.transaction)
} else if e.sessionID != (clusterunique.ID{}) {
i.registry.clearSession(e.sessionID)
}
events[idx] = event{}
}
@@ -156,6 +159,12 @@ func (i *ConcurrentBufferIngester) ObserveStatement(
if !i.registry.enabled() {
return
}

if i.testingKnobs != nil && i.testingKnobs.InsightsWriterStmtInterceptor != nil {
i.testingKnobs.InsightsWriterStmtInterceptor(sessionID, statement)
return
}

i.guard.AtomicWrite(func(writerIdx int64) {
i.guard.eventBuffer[writerIdx] = event{
sessionID: sessionID,
@@ -170,6 +179,12 @@ func (i *ConcurrentBufferIngester) ObserveTransaction(
if !i.registry.enabled() {
return
}

if i.testingKnobs != nil && i.testingKnobs.InsightsWriterTxnInterceptor != nil {
i.testingKnobs.InsightsWriterTxnInterceptor(sessionID, transaction)
return
}

i.guard.AtomicWrite(func(writerIdx int64) {
i.guard.eventBuffer[writerIdx] = event{
sessionID: sessionID,
Expand All @@ -178,6 +193,16 @@ func (i *ConcurrentBufferIngester) ObserveTransaction(
})
}

// ClearSession sends a signal to the underlying registry to clear any cached
// data associated with the given sessionID. This is an async operation.
func (i *ConcurrentBufferIngester) ClearSession(sessionID clusterunique.ID) {
i.guard.AtomicWrite(func(writerIdx int64) {
i.guard.eventBuffer[writerIdx] = event{
sessionID: sessionID,
}
})
}

func newConcurrentBufferIngester(registry *lockingRegistry) *ConcurrentBufferIngester {
i := &ConcurrentBufferIngester{
// A channel size of 1 is sufficient to avoid unnecessarily
10 changes: 5 additions & 5 deletions pkg/sql/sqlstats/insights/ingester_test.go
@@ -79,7 +79,7 @@ func TestIngester(t *testing.T) {
newRegistry(st, &fakeDetector{
stubEnabled: true,
stubIsSlow: true,
}, store),
}, store, nil),
)

ingester.Start(ctx, stopper)
@@ -134,7 +134,7 @@ func TestIngester_Clear(t *testing.T) {
newRegistry(settings, &fakeDetector{
stubEnabled: true,
stubIsSlow: true,
}, store))
}, store, nil))

// Fill the ingester's buffer with some data. This sets us up to
// call Clear() with guaranteed data in the buffer, so we can assert
@@ -181,7 +181,7 @@ func TestIngester_Disabled(t *testing.T) {
// the underlying registry is currently disabled.
st := cluster.MakeTestingClusterSettings()

ingester := newConcurrentBufferIngester(newRegistry(st, &fakeDetector{}, newStore(st)))
ingester := newConcurrentBufferIngester(newRegistry(st, &fakeDetector{}, newStore(st), nil))
ingester.ObserveStatement(clusterunique.ID{}, &Statement{})
ingester.ObserveTransaction(clusterunique.ID{}, &Transaction{})
require.Equal(t, event{}, ingester.guard.eventBuffer[0])
@@ -200,7 +200,7 @@ func TestIngester_DoesNotBlockWhenReceivingManyObservationsAfterShutdown(t *test
defer stopper.Stop(ctx)

st := cluster.MakeTestingClusterSettings()
registry := newRegistry(st, &fakeDetector{stubEnabled: true}, newStore(st))
registry := newRegistry(st, &fakeDetector{stubEnabled: true}, newStore(st), nil)
ingester := newConcurrentBufferIngester(registry)
ingester.Start(ctx, stopper)

@@ -259,7 +259,7 @@ func TestIngesterBlockedForceSync(t *testing.T) {
defer stopper.Stop(ctx)

st := cluster.MakeTestingClusterSettings()
registry := newRegistry(st, &fakeDetector{stubEnabled: true}, newStore(st))
registry := newRegistry(st, &fakeDetector{stubEnabled: true}, newStore(st), nil)
ingester := newConcurrentBufferIngester(registry)

// We queue up a bunch of sync operations because it's unclear how
4 changes: 2 additions & 2 deletions pkg/sql/sqlstats/insights/insights.go
@@ -138,7 +138,7 @@ type PercentileValues struct {
}

// New builds a new Provider.
func New(st *cluster.Settings, metrics Metrics) *Provider {
func New(st *cluster.Settings, metrics Metrics, knobs *TestingKnobs) *Provider {
store := newStore(st)
anomalyDetector := newAnomalyDetector(st, metrics)

@@ -148,7 +148,7 @@ func New(st *cluster.Settings, metrics Metrics) *Provider {
newRegistry(st, &compositeDetector{detectors: []detector{
&latencyThresholdDetector{st: st},
anomalyDetector,
}}, store),
}}, store, knobs),
),
anomalyDetector: anomalyDetector,
}
2 changes: 1 addition & 1 deletion pkg/sql/sqlstats/insights/insights_test.go
@@ -45,7 +45,7 @@ func BenchmarkInsights(b *testing.B) {
// down, guiding us as we tune buffer sizes, etc.
for _, numSessions := range []int{1, 10, 100, 1000, 10000} {
b.Run(fmt.Sprintf("numSessions=%d", numSessions), func(b *testing.B) {
provider := insights.New(settings, insights.NewMetrics())
provider := insights.New(settings, insights.NewMetrics(), nil)
provider.Start(ctx, stopper)

// Spread the b.N work across the simulated SQL sessions, so that we
1 change: 1 addition & 0 deletions pkg/sql/sqlstats/insights/integration/BUILD.bazel
@@ -12,6 +12,7 @@ go_test(
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/appstatspb",
"//pkg/sql/clusterunique",
"//pkg/sql/contention",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlstats/insights",
40 changes: 40 additions & 0 deletions pkg/sql/sqlstats/insights/integration/insights_test.go
@@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
"github.com/cockroachdb/cockroach/pkg/sql/contention"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/insights"
@@ -957,3 +958,42 @@ func TestInsightsIndexRecommendationIntegration(t *testing.T) {
return nil
}, 1*time.Second)
}

// TestInsightsClearsPerSessionMemory ensures that memory allocated
// for a session is freed when that session is closed.
func TestInsightsClearsPerSessionMemory(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
sessionClosedCh := make(chan struct{})
clearedSessionID := clusterunique.ID{}
ts := serverutils.StartServerOnly(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Insights: &insights.TestingKnobs{
OnSessionClear: func(sessionID clusterunique.ID) {
defer close(sessionClosedCh)
clearedSessionID = sessionID
},
},
},
})
defer ts.Stopper().Stop(ctx)
s := ts.ApplicationLayer()
conn1 := sqlutils.MakeSQLRunner(s.SQLConn(t))
conn2 := sqlutils.MakeSQLRunner(s.SQLConn(t))

var sessionID1 string
conn1.QueryRow(t, "SHOW session_id").Scan(&sessionID1)

// Start a transaction and cancel the session - ensure that the memory is freed.
conn1.Exec(t, "BEGIN")
for i := 0; i < 5; i++ {
conn1.Exec(t, "SELECT 1")
}

conn2.Exec(t, "CANCEL SESSION $1", sessionID1)

<-sessionClosedCh
require.Equal(t, clearedSessionID.String(), sessionID1)
}
5 changes: 5 additions & 0 deletions pkg/sql/sqlstats/insights/pool.go
@@ -16,6 +16,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
)

// TODO (xinhaoz): Remove this pool (#128199). The insights object
// can use the existing statementBuf pool for the statements slice.
var insightPool = sync.Pool{
New: func() interface{} {
return new(Insight)
@@ -32,6 +34,9 @@ func makeInsight(sessionID clusterunique.ID, transaction *Transaction) *Insight
}

func releaseInsight(insight *Insight) {
for i := range insight.Statements {
insight.Statements[i] = nil
}
insight.Statements = insight.Statements[:0]
*insight = Insight{Statements: insight.Statements}
insightPool.Put(insight)
34 changes: 25 additions & 9 deletions pkg/sql/sqlstats/insights/registry.go
@@ -23,10 +23,11 @@ import (
// statement execution to determine which statements are outliers and
// writes insights into the provided sink.
type lockingRegistry struct {
statements map[clusterunique.ID]*statementBuf
detector detector
causes *causes
store *LockingStore
statements map[clusterunique.ID]*statementBuf
detector detector
causes *causes
store *LockingStore
testingKnobs *TestingKnobs
}

func (r *lockingRegistry) Clear() {
@@ -185,6 +186,18 @@ func (r *lockingRegistry) ObserveTransaction(sessionID clusterunique.ID, transac
r.store.addInsight(insight)
}

// clearSession removes the session from the registry and releases the
// associated statement buffer.
func (r *lockingRegistry) clearSession(sessionID clusterunique.ID) {
if b, ok := r.statements[sessionID]; ok {
delete(r.statements, sessionID)
b.release()
if r.testingKnobs != nil && r.testingKnobs.OnSessionClear != nil {
r.testingKnobs.OnSessionClear(sessionID)
}
}
}

// TODO(todd):
//
// Once we can handle sufficient throughput to live on the hot
@@ -195,11 +208,14 @@ func (r *lockingRegistry) enabled() bool {
return r.detector.enabled()
}

func newRegistry(st *cluster.Settings, detector detector, store *LockingStore) *lockingRegistry {
func newRegistry(
st *cluster.Settings, detector detector, store *LockingStore, knobs *TestingKnobs,
) *lockingRegistry {
return &lockingRegistry{
statements: make(map[clusterunique.ID]*statementBuf),
detector: detector,
causes: &causes{st: st},
store: store,
statements: make(map[clusterunique.ID]*statementBuf),
detector: detector,
causes: &causes{st: st},
store: store,
testingKnobs: knobs,
}
}
