From 550706700fec344e4d4030c67bb08d5bfb751d18 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Tue, 12 Nov 2024 14:16:35 -0800
Subject: [PATCH] sql: disable gossip-based physical planning by default

Now that we have improved the handling of the draining procedure in
virtual clusters in the 24.3 timeframe, I believe it's time to begin
fully deprecating gossip-based physical planning. As a first step, this
commit introduces a cluster setting that controls which method is used
in single-tenant deployments, with instance-based planning being the
default. The setting is meant as an escape hatch in case we find
problems when rolling out this change; I plan to remove it altogether
after 25.2.

One difference between the two planning methods is slightly adjusted in
this commit. In particular, in instance-based planning we stitch
together consecutive spans whenever `EnsureSafeSplitKey` returns the
same prefix for their keys. The idea is that spans corresponding to
different column families of the same SQL row must be placed on the
same instance. However, the condition on the length of the keys
returned by `EnsureSafeSplitKey` is too broad and captures more cases
than needed (i.e. two keys that cannot be part of the same SQL row can
end up in the same `SpanPartition`). To partially alleviate this
difference, this commit introduces a "boundary granularity" knob that
indicates whether consecutive spans _might_ be part of the same SQL
row, and the stitching logic is now used only in that case. All callers
of `PartitionSpans` have been audited, and the stitching logic is
needed only in the two call sites that correspond to planning of table
readers. Longer term, if we change the encoding so that column family
IDs are encoded in a special way, we'll be able to tell definitively
whether stitching is needed based on the key itself, and this
additional logic can be removed.
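For illustration, the stitching exists because keys that differ only in
their column family suffix map to the same prefix under
`EnsureSafeSplitKey`. The self-contained sketch below demonstrates that;
the table ID (104), index ID (1), and PK value (42) are made up for the
example and are not taken from this change:

    package main

    import (
    	"fmt"

    	"github.com/cockroachdb/cockroach/pkg/keys"
    	"github.com/cockroachdb/cockroach/pkg/util/encoding"
    )

    func main() {
    	// Hypothetical table 104, primary index 1, single PK column value 42.
    	rowPrefix := keys.SystemSQLCodec.IndexPrefix(104, 1)
    	rowPrefix = encoding.EncodeVarintAscending(rowPrefix, 42)

    	// Keys for two different column families of the same SQL row.
    	cf0 := keys.MakeFamilyKey(append([]byte(nil), rowPrefix...), 0)
    	cf1 := keys.MakeFamilyKey(append([]byte(nil), rowPrefix...), 1)

    	// EnsureSafeSplitKey strips the family suffix, so both keys map to
    	// the same prefix -- which is why consecutive spans starting at such
    	// keys must be assigned to the same instance.
    	safe0, _ := keys.EnsureSafeSplitKey(cf0)
    	safe1, _ := keys.EnsureSafeSplitKey(cf1)
    	fmt.Println(safe0.Equal(safe1)) // true
    }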
Epic: None Release note: None --- .../backupccl/backup_processor_planning.go | 4 +- pkg/ccl/changefeedccl/changefeed_dist.go | 2 +- .../crosscluster/producer/stream_lifetime.go | 2 +- pkg/ccl/revertccl/revert.go | 2 +- pkg/sql/distsql_physical_planner.go | 118 +++++++++---- pkg/sql/distsql_physical_planner_test.go | 163 ++++++++++-------- pkg/sql/distsql_plan_backfill.go | 4 +- pkg/sql/fingerprint_span.go | 2 +- pkg/sql/ttl/ttljob/ttljob.go | 2 +- 9 files changed, 187 insertions(+), 112 deletions(-) diff --git a/pkg/ccl/backupccl/backup_processor_planning.go b/pkg/ccl/backupccl/backup_processor_planning.go index 57927dc0690c..b1b779fe3b5d 100644 --- a/pkg/ccl/backupccl/backup_processor_planning.go +++ b/pkg/ccl/backupccl/backup_processor_planning.go @@ -54,13 +54,13 @@ func distBackupPlanSpecs( var introducedSpanPartitions []sql.SpanPartition var err error if len(spans) > 0 { - spanPartitions, err = dsp.PartitionSpans(ctx, planCtx, spans) + spanPartitions, err = dsp.PartitionSpans(ctx, planCtx, spans, sql.PartitionSpansBoundDefault) if err != nil { return nil, err } } if len(introducedSpans) > 0 { - introducedSpanPartitions, err = dsp.PartitionSpans(ctx, planCtx, introducedSpans) + introducedSpanPartitions, err = dsp.PartitionSpans(ctx, planCtx, introducedSpans, sql.PartitionSpansBoundDefault) if err != nil { return nil, err } diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index b7d6ad479021..774fecb1a569 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -405,7 +405,7 @@ func makePlan( } planCtx := dsp.NewPlanningCtxWithOracle(ctx, execCtx.ExtendedEvalContext(), nil, /* planner */ blankTxn, sql.DistributionType(distMode), oracle, locFilter) - spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, trackedSpans) + spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, trackedSpans, sql.PartitionSpansBoundDefault) if err != nil { return nil, nil, err } diff --git a/pkg/ccl/crosscluster/producer/stream_lifetime.go b/pkg/ccl/crosscluster/producer/stream_lifetime.go index 2252c85937c5..63a0a75c36c5 100644 --- a/pkg/ccl/crosscluster/producer/stream_lifetime.go +++ b/pkg/ccl/crosscluster/producer/stream_lifetime.go @@ -305,7 +305,7 @@ func buildReplicationStreamSpec( ctx, jobExecCtx.ExtendedEvalContext(), nil /* planner */, nil /* txn */, sql.FullDistribution, oracle, noLoc, ) - spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, targetSpans) + spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, targetSpans, sql.PartitionSpansBoundDefault) if err != nil { return nil, err } diff --git a/pkg/ccl/revertccl/revert.go b/pkg/ccl/revertccl/revert.go index 8166b3192789..3253e1ef73aa 100644 --- a/pkg/ccl/revertccl/revert.go +++ b/pkg/ccl/revertccl/revert.go @@ -60,7 +60,7 @@ func RevertSpansFanout( return err } - spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, spans) + spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, spans, sql.PartitionSpansBoundDefault) if err != nil { return err } diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 7f0ccf565c3e..4ddf751f8dd6 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -823,7 +823,7 @@ const ( type spanPartitionState struct { // partitionSpanDecisions is a mapping from a SpanPartitionReason to the number of // times we have picked an instance for that reason. 
- partitionSpanDecisions [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int + partitionSpanDecisions [SpanPartitionReasonMax]int // partitionSpans is a mapping from a SQLInstanceID to the number of // partition spans that have been assigned to that node. @@ -1187,6 +1187,8 @@ const ( // eligible but overloaded with other partitions. In this case we pick a // random instance apart from the gateway. SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + // SpanPartitionReasonMax tracks the number of SpanPartitionReason objects. + SpanPartitionReasonMax ) func (r SpanPartitionReason) String() string { @@ -1331,6 +1333,23 @@ func (dsp *DistSQLPlanner) checkInstanceHealthAndVersionSystem( return status } +// PartitionSpansBoundGranularity indicates how "granular" boundaries between +// spans passed to PartitionSpans are. +type PartitionSpansBoundGranularity int + +const ( + // PartitionSpansBoundDefault indicates that consecutive spans passed to + // PartitionSpans can always be processed on different instances. This is + // the case when the spans boundaries are at SQL row level or higher (index, + // table, tenant, etc). + PartitionSpansBoundDefault PartitionSpansBoundGranularity = iota + // PartitionSpansBoundCFWithinRow indicates that consecutive spans passed to + // PartitionSpans _might_ be part of the same SQL row (when they correspond + // to different column families within a single row), and in that case those + // spans need to be assigned the same instance. + PartitionSpansBoundCFWithinRow +) + // PartitionSpans finds out which nodes are owners for ranges touching the // given spans, and splits the spans according to owning nodes. The result is a // set of SpanPartitions (guaranteed one for each relevant node), which form a @@ -1338,12 +1357,14 @@ func (dsp *DistSQLPlanner) checkInstanceHealthAndVersionSystem( // exactly the original set of spans). // // PartitionSpans does its best to not assign ranges on nodes that are known to -// either be unhealthy or running an incompatible version. The ranges owned by -// such nodes are assigned to the gateway. +// be unhealthy. The ranges owned by such nodes are assigned to the gateway. func (dsp *DistSQLPlanner) PartitionSpans( - ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans, + ctx context.Context, + planCtx *PlanningCtx, + spans roachpb.Spans, + bound PartitionSpansBoundGranularity, ) ([]SpanPartition, error) { - partitions, _, err := dsp.partitionSpansEx(ctx, planCtx, spans) + partitions, _, err := dsp.partitionSpansEx(ctx, planCtx, spans, bound) return partitions, err } @@ -1351,7 +1372,10 @@ func (dsp *DistSQLPlanner) PartitionSpans( // boolean indicating whether the misplanned ranges metadata should not be // generated. 
func (dsp *DistSQLPlanner) partitionSpansEx( - ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans, + ctx context.Context, + planCtx *PlanningCtx, + spans roachpb.Spans, + bound PartitionSpansBoundGranularity, ) (_ []SpanPartition, ignoreMisplannedRanges bool, _ error) { if len(spans) == 0 { return nil, false, errors.AssertionFailedf("no spans") @@ -1367,7 +1391,7 @@ func (dsp *DistSQLPlanner) partitionSpansEx( if dsp.useGossipPlanning(ctx, planCtx) { return dsp.deprecatedPartitionSpansSystem(ctx, planCtx, spans) } - return dsp.partitionSpans(ctx, planCtx, spans) + return dsp.partitionSpans(ctx, planCtx, spans, bound) } // partitionSpan takes a single span and splits it up according to the owning @@ -1516,7 +1540,10 @@ func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem( // available SQL instances if the locality info is available on at least some of // the instances, and it falls back to naive round-robin assignment if not. func (dsp *DistSQLPlanner) partitionSpans( - ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans, + ctx context.Context, + planCtx *PlanningCtx, + spans roachpb.Spans, + bound PartitionSpansBoundGranularity, ) (partitions []SpanPartition, ignoreMisplannedRanges bool, _ error) { resolver, err := dsp.makeInstanceResolver(ctx, planCtx) if err != nil { @@ -1526,19 +1553,33 @@ func (dsp *DistSQLPlanner) partitionSpans( var lastKey roachpb.Key var lastPartitionIdx int for _, span := range spans { - // Rows with column families may have been split into different spans. - // These spans should be assigned the same pod so that the pod can - // stitch together the rows correctly. Split rows are in adjacent spans. - if safeKey, err := keys.EnsureSafeSplitKey(span.Key); err == nil && len(safeKey) > 0 { - if safeKey.Equal(lastKey) { - if log.V(1) { - log.Infof(ctx, "partitioning span %s", span) + if bound == PartitionSpansBoundCFWithinRow { + // Rows with column families may have been split into different + // spans. These spans should be assigned the same pod so that the + // pod can stitch together the rows correctly. Split rows are in + // adjacent spans. + // TODO(112702): here we can stitch together spans in more cases + // then necessary because we currently can't distinguish some valid + // keys corresponding to a full SQL row (i.e. single CF case) vs + // those that are part of a SQL row (i.e. multiple CFs case). If we + // address #112702, we'll be able to make the check here precise. + if safeKey, err := keys.EnsureSafeSplitKey(span.Key); err == nil && len(safeKey) > 0 { + if safeKey.Equal(lastKey) { + if log.V(1) { + log.Infof(ctx, "stitching span %s into the previous span partition", span) + } + // TODO(yuzefovich): we're not updating + // SpanPartition.numRanges as well as spanPartitionState + // here. At the moment, that info is only used with + // PartitionSpansBoundDefault, which makes this code + // unreachable. If that ever changes, we'll need to be + // smarter here. 
+ partition := &partitions[lastPartitionIdx] + partition.Spans = append(partition.Spans, span) + continue } - partition := &partitions[lastPartitionIdx] - partition.Spans = append(partition.Spans, span) - continue + lastKey = safeKey } - lastKey = safeKey } partitions, lastPartitionIdx, err = dsp.partitionSpan( ctx, planCtx, span, partitions, nodeMap, resolver, &ignoreMisplannedRanges, @@ -1559,9 +1600,8 @@ func (dsp *DistSQLPlanner) deprecatedHealthySQLInstanceIDForKVNodeIDSystem( ) (base.SQLInstanceID, SpanPartitionReason) { sqlInstanceID := base.SQLInstanceID(nodeID) status := dsp.checkInstanceHealthAndVersionSystem(ctx, planCtx, sqlInstanceID) - // If the node is unhealthy or its DistSQL version is incompatible, use the - // gateway to process this span instead of the unhealthy host. An empty - // address indicates an unhealthy host. + // If the node is unhealthy, use the gateway to process this span instead of + // the unhealthy host. An empty address indicates an unhealthy host. reason := SpanPartitionReason_GOSSIP_TARGET_HEALTHY if status != NodeOK { log.VEventf(ctx, 2, "not planning on node %d: %s", sqlInstanceID, status) @@ -1922,8 +1962,8 @@ func ClosestInstances[instance InstanceLocalityGetter]( // getInstanceIDForScan retrieves the SQL Instance ID where the single table // reader should reside for a limited scan. Ideally this is the lease holder for -// the first range in the specified spans. But if that node is unhealthy or -// incompatible, we use the gateway node instead. +// the first range in the specified spans. But if that node is unhealthy, we use +// the gateway node instead. func (dsp *DistSQLPlanner) getInstanceIDForScan( ctx context.Context, planCtx *PlanningCtx, spans []roachpb.Span, reverse bool, ) (base.SQLInstanceID, error) { @@ -1950,7 +1990,7 @@ func (dsp *DistSQLPlanner) getInstanceIDForScan( return 0, err } - if dsp.useGossipPlanning(ctx, planCtx) && planCtx.localityFilter.Empty() { + if dsp.useGossipPlanning(ctx, planCtx) { sqlInstanceID, _ := dsp.deprecatedHealthySQLInstanceIDForKVNodeIDSystem(ctx, planCtx, replDesc.NodeID) return sqlInstanceID, nil } @@ -1963,9 +2003,21 @@ func (dsp *DistSQLPlanner) getInstanceIDForScan( return sqlInstanceID, nil } +// TODO(yuzefovich): retire this setting altogether in 25.3 release. +var useGossipPlanning = settings.RegisterBoolSetting( + settings.ApplicationLevel, + "sql.distsql_planning.use_gossip.enabled", + "if enabled, the DistSQL physical planner falls back to gossip-based planning", + false, +) + func (dsp *DistSQLPlanner) useGossipPlanning(_ context.Context, planCtx *PlanningCtx) bool { - // TODO(dt): enable this by default, e.g. // && !dsp.distSQLSrv.Settings.Version.IsActive(ctx, clusterversion.V23_1) - return dsp.codec.ForSystemTenant() && planCtx.localityFilter.Empty() + var gossipPlanningEnabled bool + // Some of the planCtx fields can be left unset in tests. + if planCtx.ExtendedEvalCtx != nil && planCtx.ExtendedEvalCtx.Settings != nil { + gossipPlanningEnabled = useGossipPlanning.Get(&planCtx.ExtendedEvalCtx.Settings.SV) + } + return dsp.codec.ForSystemTenant() && planCtx.localityFilter.Empty() && gossipPlanningEnabled } // convertOrdering maps the columns in props.ordering to the output columns of a @@ -2122,8 +2174,12 @@ func (dsp *DistSQLPlanner) maybeParallelizeLocalScans( // Temporarily unset isLocal so that PartitionSpans divides all spans // according to the respective leaseholders. 
planCtx.isLocal = false + bound := PartitionSpansBoundDefault + if info.desc.NumFamilies() > 1 { + bound = PartitionSpansBoundCFWithinRow + } var err error - spanPartitions, err = dsp.PartitionSpans(ctx, planCtx, info.spans) + spanPartitions, err = dsp.PartitionSpans(ctx, planCtx, info.spans, bound) planCtx.isLocal = true if err != nil { // For some reason we couldn't partition the spans - fallback to @@ -2213,7 +2269,11 @@ func (dsp *DistSQLPlanner) planTableReaders( // still read too eagerly in the soft limit case. To prevent this we'll // need a new mechanism on the execution side to modulate table reads. // TODO(yuzefovich): add that mechanism. - spanPartitions, ignoreMisplannedRanges, err = dsp.partitionSpansEx(ctx, planCtx, info.spans) + bound := PartitionSpansBoundDefault + if info.desc.NumFamilies() > 1 { + bound = PartitionSpansBoundCFWithinRow + } + spanPartitions, ignoreMisplannedRanges, err = dsp.partitionSpansEx(ctx, planCtx, info.spans, bound) if err != nil { return err } diff --git a/pkg/sql/distsql_physical_planner_test.go b/pkg/sql/distsql_physical_planner_test.go index 83030fe4043a..7b0267c09bf5 100644 --- a/pkg/sql/distsql_physical_planner_test.go +++ b/pkg/sql/distsql_physical_planner_test.go @@ -937,6 +937,7 @@ func TestPartitionSpans(t *testing.T) { partitionStates []string partitionState spanPartitionState }{ + // 0 { ranges: []testSpanResolverRange{{"A", 1}, {"B", 2}, {"C", 1}, {"D", 3}}, gatewayNode: 1, @@ -950,10 +951,10 @@ func TestPartitionSpans(t *testing.T) { }, partitionStates: []string{ - "partition span: {A1-B}, instance ID: 1, reason: gossip-target-healthy", - "partition span: {B-C}, instance ID: 2, reason: gossip-target-healthy", - "partition span: C{-1}, instance ID: 1, reason: gossip-target-healthy", - "partition span: {D1-X}, instance ID: 3, reason: gossip-target-healthy", + "partition span: {A1-B}, instance ID: 1, reason: target-healthy", + "partition span: {B-C}, instance ID: 2, reason: target-healthy", + "partition span: C{-1}, instance ID: 1, reason: target-healthy", + "partition span: {D1-X}, instance ID: 3, reason: target-healthy", }, partitionState: spanPartitionState{ @@ -962,13 +963,14 @@ func TestPartitionSpans(t *testing.T) { 2: 1, 3: 1, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ - SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 4, + partitionSpanDecisions: [SpanPartitionReasonMax]int{ + SpanPartitionReason_TARGET_HEALTHY: 4, }, totalPartitionSpans: 4, }, }, + // 1 { ranges: []testSpanResolverRange{{"A", 1}, {"B", 2}, {"C", 1}, {"D", 3}}, deadNodes: []int{1}, // The health status of the gateway node shouldn't matter. 
@@ -983,10 +985,10 @@ func TestPartitionSpans(t *testing.T) { }, partitionStates: []string{ - "partition span: {A1-B}, instance ID: 1, reason: gossip-target-healthy", - "partition span: {B-C}, instance ID: 2, reason: gossip-target-healthy", - "partition span: C{-1}, instance ID: 1, reason: gossip-target-healthy", - "partition span: {D1-X}, instance ID: 3, reason: gossip-target-healthy", + "partition span: {A1-B}, instance ID: 1, reason: target-healthy", + "partition span: {B-C}, instance ID: 2, reason: target-healthy", + "partition span: C{-1}, instance ID: 1, reason: target-healthy", + "partition span: {D1-X}, instance ID: 3, reason: target-healthy", }, partitionState: spanPartitionState{ @@ -995,13 +997,14 @@ func TestPartitionSpans(t *testing.T) { 2: 1, 3: 1, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ - SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 4, + partitionSpanDecisions: [SpanPartitionReasonMax]int{ + SpanPartitionReason_TARGET_HEALTHY: 4, }, totalPartitionSpans: 4, }, }, + // 2 { ranges: []testSpanResolverRange{{"A", 1}, {"B", 2}, {"C", 1}, {"D", 3}}, deadNodes: []int{2}, @@ -1015,10 +1018,10 @@ func TestPartitionSpans(t *testing.T) { }, partitionStates: []string{ - "partition span: {A1-B}, instance ID: 1, reason: gossip-target-healthy", - "partition span: {B-C}, instance ID: 1, reason: gossip-gateway-target-unhealthy", - "partition span: C{-1}, instance ID: 1, reason: gossip-target-healthy", - "partition span: {D1-X}, instance ID: 3, reason: gossip-target-healthy", + "partition span: {A1-B}, instance ID: 1, reason: target-healthy", + "partition span: {B-C}, instance ID: 1, reason: gateway-target-unhealthy", + "partition span: C{-1}, instance ID: 1, reason: target-healthy", + "partition span: {D1-X}, instance ID: 3, reason: target-healthy", }, partitionState: spanPartitionState{ @@ -1026,14 +1029,15 @@ func TestPartitionSpans(t *testing.T) { 1: 3, 3: 1, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ - SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 3, - SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY: 1, + partitionSpanDecisions: [SpanPartitionReasonMax]int{ + SpanPartitionReason_TARGET_HEALTHY: 3, + SpanPartitionReason_GATEWAY_TARGET_UNHEALTHY: 1, }, totalPartitionSpans: 4, }, }, + // 3 { ranges: []testSpanResolverRange{{"A", 1}, {"B", 2}, {"C", 1}, {"D", 3}}, deadNodes: []int{3}, @@ -1047,10 +1051,10 @@ func TestPartitionSpans(t *testing.T) { }, partitionStates: []string{ - "partition span: {A1-B}, instance ID: 1, reason: gossip-target-healthy", - "partition span: {B-C}, instance ID: 2, reason: gossip-target-healthy", - "partition span: C{-1}, instance ID: 1, reason: gossip-target-healthy", - "partition span: {D1-X}, instance ID: 1, reason: gossip-gateway-target-unhealthy", + "partition span: {A1-B}, instance ID: 1, reason: target-healthy", + "partition span: {B-C}, instance ID: 2, reason: target-healthy", + "partition span: C{-1}, instance ID: 1, reason: target-healthy", + "partition span: {D1-X}, instance ID: 1, reason: gateway-target-unhealthy", }, partitionState: spanPartitionState{ @@ -1058,14 +1062,15 @@ func TestPartitionSpans(t *testing.T) { 1: 3, 2: 1, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ - SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 3, - SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY: 1, + partitionSpanDecisions: [SpanPartitionReasonMax]int{ + SpanPartitionReason_TARGET_HEALTHY: 3, + 
SpanPartitionReason_GATEWAY_TARGET_UNHEALTHY: 1, }, totalPartitionSpans: 4, }, }, + // 4 { ranges: []testSpanResolverRange{{"A", 1}, {"B", 2}, {"C", 1}, {"D", 3}}, deadNodes: []int{1}, @@ -1079,10 +1084,10 @@ func TestPartitionSpans(t *testing.T) { }, partitionStates: []string{ - "partition span: {A1-B}, instance ID: 2, reason: gossip-gateway-target-unhealthy", - "partition span: {B-C}, instance ID: 2, reason: gossip-target-healthy", - "partition span: C{-1}, instance ID: 2, reason: gossip-gateway-target-unhealthy", - "partition span: {D1-X}, instance ID: 3, reason: gossip-target-healthy", + "partition span: {A1-B}, instance ID: 2, reason: gateway-target-unhealthy", + "partition span: {B-C}, instance ID: 2, reason: target-healthy", + "partition span: C{-1}, instance ID: 2, reason: gateway-target-unhealthy", + "partition span: {D1-X}, instance ID: 3, reason: target-healthy", }, partitionState: spanPartitionState{ @@ -1090,14 +1095,15 @@ func TestPartitionSpans(t *testing.T) { 2: 3, 3: 1, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ - SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 2, - SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY: 2, + partitionSpanDecisions: [SpanPartitionReasonMax]int{ + SpanPartitionReason_TARGET_HEALTHY: 2, + SpanPartitionReason_GATEWAY_TARGET_UNHEALTHY: 2, }, totalPartitionSpans: 4, }, }, + // 5 { ranges: []testSpanResolverRange{{"A", 1}, {"B", 2}, {"C", 1}, {"D", 3}}, deadNodes: []int{1}, @@ -1111,10 +1117,10 @@ func TestPartitionSpans(t *testing.T) { }, partitionStates: []string{ - "partition span: {A1-B}, instance ID: 3, reason: gossip-gateway-target-unhealthy", - "partition span: {B-C}, instance ID: 2, reason: gossip-target-healthy", - "partition span: C{-1}, instance ID: 3, reason: gossip-gateway-target-unhealthy", - "partition span: {D1-X}, instance ID: 3, reason: gossip-target-healthy", + "partition span: {A1-B}, instance ID: 3, reason: gateway-target-unhealthy", + "partition span: {B-C}, instance ID: 2, reason: target-healthy", + "partition span: C{-1}, instance ID: 3, reason: gateway-target-unhealthy", + "partition span: {D1-X}, instance ID: 3, reason: target-healthy", }, partitionState: spanPartitionState{ @@ -1122,15 +1128,15 @@ func TestPartitionSpans(t *testing.T) { 2: 1, 3: 3, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ - SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 2, - SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY: 2, + partitionSpanDecisions: [SpanPartitionReasonMax]int{ + SpanPartitionReason_TARGET_HEALTHY: 2, + SpanPartitionReason_GATEWAY_TARGET_UNHEALTHY: 2, }, totalPartitionSpans: 4, }, }, - // Test point lookups in isolation. + // 6: Test point lookups in isolation. 
{ ranges: []testSpanResolverRange{{"A", 1}, {"B", 2}}, gatewayNode: 1, @@ -1143,9 +1149,9 @@ func TestPartitionSpans(t *testing.T) { }, partitionStates: []string{ - "partition span: A2, instance ID: 1, reason: gossip-target-healthy", - "partition span: A1, instance ID: 1, reason: gossip-target-healthy", - "partition span: B1, instance ID: 2, reason: gossip-target-healthy", + "partition span: A2, instance ID: 1, reason: target-healthy", + "partition span: A1, instance ID: 1, reason: target-healthy", + "partition span: B1, instance ID: 2, reason: target-healthy", }, partitionState: spanPartitionState{ @@ -1153,14 +1159,14 @@ func TestPartitionSpans(t *testing.T) { 1: 2, 2: 1, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ - SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 3, + partitionSpanDecisions: [SpanPartitionReasonMax]int{ + SpanPartitionReason_TARGET_HEALTHY: 3, }, totalPartitionSpans: 3, }, }, - // Test point lookups intertwined with span scans. + // 7: Test point lookups intertwined with span scans. { ranges: []testSpanResolverRange{{"A", 1}, {"B", 1}, {"C", 2}}, gatewayNode: 1, @@ -1173,16 +1179,16 @@ func TestPartitionSpans(t *testing.T) { }, partitionStates: []string{ - "partition span: A1, instance ID: 1, reason: gossip-target-healthy", - "partition span: A{1-2}, instance ID: 1, reason: gossip-target-healthy", - "partition span: A2, instance ID: 1, reason: gossip-target-healthy", - "partition span: {A2-B}, instance ID: 1, reason: gossip-target-healthy", - "partition span: {B-C}, instance ID: 1, reason: gossip-target-healthy", - "partition span: C{-2}, instance ID: 2, reason: gossip-target-healthy", - "partition span: B1, instance ID: 1, reason: gossip-target-healthy", - "partition span: {A3-B}, instance ID: 1, reason: gossip-target-healthy", - "partition span: B{-3}, instance ID: 1, reason: gossip-target-healthy", - "partition span: B2, instance ID: 1, reason: gossip-target-healthy", + "partition span: A1, instance ID: 1, reason: target-healthy", + "partition span: A{1-2}, instance ID: 1, reason: target-healthy", + "partition span: A2, instance ID: 1, reason: target-healthy", + "partition span: {A2-B}, instance ID: 1, reason: target-healthy", + "partition span: {B-C}, instance ID: 1, reason: target-healthy", + "partition span: C{-2}, instance ID: 2, reason: target-healthy", + "partition span: B1, instance ID: 1, reason: target-healthy", + "partition span: {A3-B}, instance ID: 1, reason: target-healthy", + "partition span: B{-3}, instance ID: 1, reason: target-healthy", + "partition span: B2, instance ID: 1, reason: target-healthy", }, partitionState: spanPartitionState{ @@ -1190,14 +1196,14 @@ func TestPartitionSpans(t *testing.T) { 1: 9, 2: 1, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ - SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 10, + partitionSpanDecisions: [SpanPartitionReasonMax]int{ + SpanPartitionReason_TARGET_HEALTHY: 10, }, totalPartitionSpans: 10, }, }, - // A single span touching multiple ranges but on the same node results + // 8: A single span touching multiple ranges but on the same node results // in a single partitioned span. 
{ ranges: []testSpanResolverRange{{"A", 1}, {"A1", 1}, {"B", 2}}, @@ -1210,21 +1216,22 @@ func TestPartitionSpans(t *testing.T) { }, partitionStates: []string{ - "partition span: A{-1}, instance ID: 1, reason: gossip-target-healthy", - "partition span: {A1-B}, instance ID: 1, reason: gossip-target-healthy", + "partition span: A{-1}, instance ID: 1, reason: target-healthy", + "partition span: {A1-B}, instance ID: 1, reason: target-healthy", }, partitionState: spanPartitionState{ partitionSpans: map[base.SQLInstanceID]int{ 1: 2, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ - SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 2, + partitionSpanDecisions: [SpanPartitionReasonMax]int{ + SpanPartitionReason_TARGET_HEALTHY: 2, }, totalPartitionSpans: 2, }, }, - // Test some locality-filtered planning too. + + // 9: Test some locality-filtered planning too. // // Since this test is run on a system tenant but there is a locality filter, // the spans are resolved in a mixed process mode. As a result, the @@ -1253,7 +1260,7 @@ func TestPartitionSpans(t *testing.T) { 1: 2, 2: 2, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ + partitionSpanDecisions: [SpanPartitionReasonMax]int{ SpanPartitionReason_TARGET_HEALTHY: 3, SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED: 1, }, @@ -1263,6 +1270,8 @@ func TestPartitionSpans(t *testing.T) { totalPartitionSpans: 4, }, }, + + // 10 { ranges: []testSpanResolverRange{{"A", 1}, {"B", 2}, {"C", 1}, {"D", 3}}, gatewayNode: 1, @@ -1286,13 +1295,15 @@ func TestPartitionSpans(t *testing.T) { 2: 3, 4: 1, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ + partitionSpanDecisions: [SpanPartitionReasonMax]int{ SpanPartitionReason_TARGET_HEALTHY: 1, SpanPartitionReason_CLOSEST_LOCALITY_MATCH: 3, }, totalPartitionSpans: 4, }, }, + + // 11 { ranges: []testSpanResolverRange{{"A", 1}, {"B", 2}, {"C", 1}, {"D", 3}}, gatewayNode: 7, @@ -1316,7 +1327,7 @@ func TestPartitionSpans(t *testing.T) { 6: 2, 7: 2, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ + partitionSpanDecisions: [SpanPartitionReasonMax]int{ SpanPartitionReason_GATEWAY_NO_LOCALITY_MATCH: 2, SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED: 2, }, @@ -1326,6 +1337,8 @@ func TestPartitionSpans(t *testing.T) { }, }, }, + + // 12 { ranges: []testSpanResolverRange{{"A", 1}, {"B", 2}, {"C", 1}, {"D", 3}}, gatewayNode: 1, @@ -1347,7 +1360,7 @@ func TestPartitionSpans(t *testing.T) { partitionSpans: map[base.SQLInstanceID]int{ 7: 4, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ + partitionSpanDecisions: [SpanPartitionReasonMax]int{ SpanPartitionReason_LOCALITY_FILTERED_RANDOM: 4, }, totalPartitionSpans: 4, @@ -1471,7 +1484,7 @@ func TestPartitionSpans(t *testing.T) { spans = append(spans, roachpb.Span{Key: roachpb.Key(s[0]), EndKey: roachpb.Key(s[1])}) } - partitions, err := dsp.PartitionSpans(ctx, planCtx, spans) + partitions, err := dsp.PartitionSpans(ctx, planCtx, spans, PartitionSpansBoundDefault) if err != nil { t.Fatal(err) } @@ -1523,7 +1536,6 @@ func TestPartitionSpans(t *testing.T) { } recording := getRecAndFinish() - t.Logf("recording is %s", recording) for _, expectedMsg := range tc.partitionStates { require.NotEqual(t, -1, tracing.FindMsgInRecording(recording, expectedMsg)) } @@ -1784,9 +1796,10 @@ func 
TestPartitionSpansSkipsNodesNotInGossip(t *testing.T) { ranges: ranges, } + st := cluster.MakeTestingClusterSettings() gw := gossip.MakeOptionalGossip(mockGossip) dsp := DistSQLPlanner{ - st: cluster.MakeTestingClusterSettings(), + st: st, gatewaySQLInstanceID: base.SQLInstanceID(tsp.nodes[gatewayNode-1].NodeID), stopper: stopper, spanResolver: tsp, @@ -1805,11 +1818,13 @@ func TestPartitionSpansSkipsNodesNotInGossip(t *testing.T) { } ctx := context.Background() + // This test is specific to gossip-based planning. + useGossipPlanning.Override(ctx, &st.SV, true) planCtx := dsp.NewPlanningCtx( - ctx, &extendedEvalContext{Context: eval.Context{Codec: keys.SystemSQLCodec}}, + ctx, &extendedEvalContext{Context: eval.Context{Codec: keys.SystemSQLCodec, Settings: st}}, nil /* planner */, nil /* txn */, FullDistribution, ) - partitions, err := dsp.PartitionSpans(ctx, planCtx, roachpb.Spans{span}) + partitions, err := dsp.PartitionSpans(ctx, planCtx, roachpb.Spans{span}, PartitionSpansBoundDefault) if err != nil { t.Fatal(err) } diff --git a/pkg/sql/distsql_plan_backfill.go b/pkg/sql/distsql_plan_backfill.go index 590f5de4f299..73616616727a 100644 --- a/pkg/sql/distsql_plan_backfill.go +++ b/pkg/sql/distsql_plan_backfill.go @@ -76,7 +76,7 @@ func initIndexBackfillMergerSpec( func (dsp *DistSQLPlanner) createBackfillerPhysicalPlan( ctx context.Context, planCtx *PlanningCtx, spec execinfrapb.BackfillerSpec, spans []roachpb.Span, ) (*PhysicalPlan, error) { - spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, spans) + spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, spans, PartitionSpansBoundDefault) if err != nil { return nil, err } @@ -145,7 +145,7 @@ func (dsp *DistSQLPlanner) createIndexBackfillerMergePhysicalPlan( return idx } - spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, indexSpans) + spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, indexSpans, PartitionSpansBoundDefault) if err != nil { return nil, err } diff --git a/pkg/sql/fingerprint_span.go b/pkg/sql/fingerprint_span.go index 9a2a24688e59..f56a0df7b31d 100644 --- a/pkg/sql/fingerprint_span.go +++ b/pkg/sql/fingerprint_span.go @@ -118,7 +118,7 @@ func (p *planner) fingerprintSpanFanout( return 0, nil, err } - spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, []roachpb.Span{span}) + spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, []roachpb.Span{span}, PartitionSpansBoundDefault) if err != nil { return 0, nil, err } diff --git a/pkg/sql/ttl/ttljob/ttljob.go b/pkg/sql/ttl/ttljob/ttljob.go index d1311217ff74..9a598877c7e4 100644 --- a/pkg/sql/ttl/ttljob/ttljob.go +++ b/pkg/sql/ttl/ttljob/ttljob.go @@ -166,7 +166,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) (re if err != nil { return err } - spanPartitions, err := distSQLPlanner.PartitionSpans(ctx, planCtx, []roachpb.Span{entirePKSpan}) + spanPartitions, err := distSQLPlanner.PartitionSpans(ctx, planCtx, []roachpb.Span{entirePKSpan}, sql.PartitionSpansBoundDefault) if err != nil { return err }
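Usage note (an illustrative sketch, not part of the diff above): with
instance-based planning now the default, the deprecated gossip path is
reachable only through the escape-hatch setting. The fragment below
mirrors the `TestPartitionSpansSkipsNodesNotInGossip` change in this
patch and assumes a `dsp` (DistSQLPlanner) and a `span` already set up
as in that test; it is not a standalone program.

    ctx := context.Background()
    st := cluster.MakeTestingClusterSettings()
    // Gossip-based planning is now opt-in, so code that targets the deprecated
    // path must enable it explicitly. Operators would use the SQL form:
    //   SET CLUSTER SETTING sql.distsql_planning.use_gossip.enabled = true;
    useGossipPlanning.Override(ctx, &st.SV, true)

    // The planning context must carry the same settings so that
    // DistSQLPlanner.useGossipPlanning consults the overridden value.
    planCtx := dsp.NewPlanningCtx(
    	ctx, &extendedEvalContext{Context: eval.Context{Codec: keys.SystemSQLCodec, Settings: st}},
    	nil /* planner */, nil /* txn */, FullDistribution,
    )
    partitions, err := dsp.PartitionSpans(ctx, planCtx, roachpb.Spans{span}, PartitionSpansBoundDefault)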