diff --git a/pkg/ccl/backupccl/backup_processor_planning.go b/pkg/ccl/backupccl/backup_processor_planning.go index 57927dc0690c..b1b779fe3b5d 100644 --- a/pkg/ccl/backupccl/backup_processor_planning.go +++ b/pkg/ccl/backupccl/backup_processor_planning.go @@ -54,13 +54,13 @@ func distBackupPlanSpecs( var introducedSpanPartitions []sql.SpanPartition var err error if len(spans) > 0 { - spanPartitions, err = dsp.PartitionSpans(ctx, planCtx, spans) + spanPartitions, err = dsp.PartitionSpans(ctx, planCtx, spans, sql.PartitionSpansBoundDefault) if err != nil { return nil, err } } if len(introducedSpans) > 0 { - introducedSpanPartitions, err = dsp.PartitionSpans(ctx, planCtx, introducedSpans) + introducedSpanPartitions, err = dsp.PartitionSpans(ctx, planCtx, introducedSpans, sql.PartitionSpansBoundDefault) if err != nil { return nil, err } diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index b7d6ad479021..774fecb1a569 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -405,7 +405,7 @@ func makePlan( } planCtx := dsp.NewPlanningCtxWithOracle(ctx, execCtx.ExtendedEvalContext(), nil, /* planner */ blankTxn, sql.DistributionType(distMode), oracle, locFilter) - spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, trackedSpans) + spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, trackedSpans, sql.PartitionSpansBoundDefault) if err != nil { return nil, nil, err } diff --git a/pkg/ccl/crosscluster/producer/stream_lifetime.go b/pkg/ccl/crosscluster/producer/stream_lifetime.go index 2252c85937c5..63a0a75c36c5 100644 --- a/pkg/ccl/crosscluster/producer/stream_lifetime.go +++ b/pkg/ccl/crosscluster/producer/stream_lifetime.go @@ -305,7 +305,7 @@ func buildReplicationStreamSpec( ctx, jobExecCtx.ExtendedEvalContext(), nil /* planner */, nil /* txn */, sql.FullDistribution, oracle, noLoc, ) - spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, targetSpans) + spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, targetSpans, sql.PartitionSpansBoundDefault) if err != nil { return nil, err } diff --git a/pkg/ccl/revertccl/revert.go b/pkg/ccl/revertccl/revert.go index 8166b3192789..3253e1ef73aa 100644 --- a/pkg/ccl/revertccl/revert.go +++ b/pkg/ccl/revertccl/revert.go @@ -60,7 +60,7 @@ func RevertSpansFanout( return err } - spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, spans) + spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, spans, sql.PartitionSpansBoundDefault) if err != nil { return err } diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 7f0ccf565c3e..4ddf751f8dd6 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -823,7 +823,7 @@ const ( type spanPartitionState struct { // partitionSpanDecisions is a mapping from a SpanPartitionReason to the number of // times we have picked an instance for that reason. - partitionSpanDecisions [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int + partitionSpanDecisions [SpanPartitionReasonMax]int // partitionSpans is a mapping from a SQLInstanceID to the number of // partition spans that have been assigned to that node. @@ -1187,6 +1187,8 @@ const ( // eligible but overloaded with other partitions. In this case we pick a // random instance apart from the gateway. SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + // SpanPartitionReasonMax tracks the number of SpanPartitionReason objects. 
+ SpanPartitionReasonMax ) func (r SpanPartitionReason) String() string { @@ -1331,6 +1333,23 @@ func (dsp *DistSQLPlanner) checkInstanceHealthAndVersionSystem( return status } +// PartitionSpansBoundGranularity indicates how "granular" boundaries between +// spans passed to PartitionSpans are. +type PartitionSpansBoundGranularity int + +const ( + // PartitionSpansBoundDefault indicates that consecutive spans passed to + // PartitionSpans can always be processed on different instances. This is + // the case when the span boundaries are at SQL row level or higher (index, + // table, tenant, etc.). + PartitionSpansBoundDefault PartitionSpansBoundGranularity = iota + // PartitionSpansBoundCFWithinRow indicates that consecutive spans passed to + // PartitionSpans _might_ be part of the same SQL row (when they correspond + // to different column families within a single row), and in that case those + // spans need to be assigned the same instance. + PartitionSpansBoundCFWithinRow +) + // PartitionSpans finds out which nodes are owners for ranges touching the // given spans, and splits the spans according to owning nodes. The result is a // set of SpanPartitions (guaranteed one for each relevant node), which form a @@ -1338,12 +1357,14 @@ func (dsp *DistSQLPlanner) checkInstanceHealthAndVersionSystem( // exactly the original set of spans). // // PartitionSpans does its best to not assign ranges on nodes that are known to -// either be unhealthy or running an incompatible version. The ranges owned by -// such nodes are assigned to the gateway. +// be unhealthy. The ranges owned by such nodes are assigned to the gateway. func (dsp *DistSQLPlanner) PartitionSpans( - ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans, + ctx context.Context, + planCtx *PlanningCtx, + spans roachpb.Spans, + bound PartitionSpansBoundGranularity, ) ([]SpanPartition, error) { - partitions, _, err := dsp.partitionSpansEx(ctx, planCtx, spans) + partitions, _, err := dsp.partitionSpansEx(ctx, planCtx, spans, bound) return partitions, err } @@ -1351,7 +1372,10 @@ func (dsp *DistSQLPlanner) PartitionSpans( // boolean indicating whether the misplanned ranges metadata should not be // generated. func (dsp *DistSQLPlanner) partitionSpansEx( - ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans, + ctx context.Context, + planCtx *PlanningCtx, + spans roachpb.Spans, + bound PartitionSpansBoundGranularity, ) (_ []SpanPartition, ignoreMisplannedRanges bool, _ error) { if len(spans) == 0 { return nil, false, errors.AssertionFailedf("no spans") } @@ -1367,7 +1391,7 @@ func (dsp *DistSQLPlanner) partitionSpansEx( if dsp.useGossipPlanning(ctx, planCtx) { return dsp.deprecatedPartitionSpansSystem(ctx, planCtx, spans) } - return dsp.partitionSpans(ctx, planCtx, spans) + return dsp.partitionSpans(ctx, planCtx, spans, bound) } // partitionSpan takes a single span and splits it up according to the owning @@ -1516,7 +1540,10 @@ func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem( // available SQL instances if the locality info is available on at least some of // the instances, and it falls back to naive round-robin assignment if not. 
func (dsp *DistSQLPlanner) partitionSpans( - ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans, + ctx context.Context, + planCtx *PlanningCtx, + spans roachpb.Spans, + bound PartitionSpansBoundGranularity, ) (partitions []SpanPartition, ignoreMisplannedRanges bool, _ error) { resolver, err := dsp.makeInstanceResolver(ctx, planCtx) if err != nil { @@ -1526,19 +1553,33 @@ func (dsp *DistSQLPlanner) partitionSpans( var lastKey roachpb.Key var lastPartitionIdx int for _, span := range spans { - // Rows with column families may have been split into different spans. - // These spans should be assigned the same pod so that the pod can - // stitch together the rows correctly. Split rows are in adjacent spans. - if safeKey, err := keys.EnsureSafeSplitKey(span.Key); err == nil && len(safeKey) > 0 { - if safeKey.Equal(lastKey) { - if log.V(1) { - log.Infof(ctx, "partitioning span %s", span) + if bound == PartitionSpansBoundCFWithinRow { + // Rows with column families may have been split into different + // spans. These spans should be assigned the same pod so that the + // pod can stitch together the rows correctly. Split rows are in + // adjacent spans. + // TODO(112702): here we can stitch together spans in more cases + // than necessary because we currently can't distinguish some valid + // keys corresponding to a full SQL row (i.e. single CF case) vs + // those that are part of a SQL row (i.e. multiple CFs case). If we + // address #112702, we'll be able to make the check here precise. + if safeKey, err := keys.EnsureSafeSplitKey(span.Key); err == nil && len(safeKey) > 0 { + if safeKey.Equal(lastKey) { + if log.V(1) { + log.Infof(ctx, "stitching span %s into the previous span partition", span) + } + // TODO(yuzefovich): we're not updating + // SpanPartition.numRanges or spanPartitionState + // here. At the moment, that info is only used with + // PartitionSpansBoundDefault, which makes this code + // unreachable. If that ever changes, we'll need to be + // smarter here. + partition := &partitions[lastPartitionIdx] + partition.Spans = append(partition.Spans, span) + continue } - partition := &partitions[lastPartitionIdx] - partition.Spans = append(partition.Spans, span) - continue + lastKey = safeKey } - lastKey = safeKey } partitions, lastPartitionIdx, err = dsp.partitionSpan( ctx, planCtx, span, partitions, nodeMap, resolver, &ignoreMisplannedRanges, @@ -1559,9 +1600,8 @@ func (dsp *DistSQLPlanner) deprecatedHealthySQLInstanceIDForKVNodeIDSystem( ) (base.SQLInstanceID, SpanPartitionReason) { sqlInstanceID := base.SQLInstanceID(nodeID) status := dsp.checkInstanceHealthAndVersionSystem(ctx, planCtx, sqlInstanceID) - // If the node is unhealthy or its DistSQL version is incompatible, use the - // gateway to process this span instead of the unhealthy host. An empty - // address indicates an unhealthy host. + // If the node is unhealthy, use the gateway to process this span instead of + // the unhealthy host. An empty address indicates an unhealthy host. reason := SpanPartitionReason_GOSSIP_TARGET_HEALTHY if status != NodeOK { log.VEventf(ctx, 2, "not planning on node %d: %s", sqlInstanceID, status) @@ -1922,8 +1962,8 @@ func ClosestInstances[instance InstanceLocalityGetter]( // getInstanceIDForScan retrieves the SQL Instance ID where the single table // reader should reside for a limited scan. Ideally this is the lease holder for -// the first range in the specified spans. But if that node is unhealthy or -// incompatible, we use the gateway node instead. 
+// the first range in the specified spans. But if that node is unhealthy, we use +// the gateway node instead. func (dsp *DistSQLPlanner) getInstanceIDForScan( ctx context.Context, planCtx *PlanningCtx, spans []roachpb.Span, reverse bool, ) (base.SQLInstanceID, error) { @@ -1950,7 +1990,7 @@ func (dsp *DistSQLPlanner) getInstanceIDForScan( return 0, err } - if dsp.useGossipPlanning(ctx, planCtx) && planCtx.localityFilter.Empty() { + if dsp.useGossipPlanning(ctx, planCtx) { sqlInstanceID, _ := dsp.deprecatedHealthySQLInstanceIDForKVNodeIDSystem(ctx, planCtx, replDesc.NodeID) return sqlInstanceID, nil } @@ -1963,9 +2003,21 @@ func (dsp *DistSQLPlanner) getInstanceIDForScan( return sqlInstanceID, nil } +// TODO(yuzefovich): retire this setting altogether in 25.3 release. +var useGossipPlanning = settings.RegisterBoolSetting( + settings.ApplicationLevel, + "sql.distsql_planning.use_gossip.enabled", + "if enabled, the DistSQL physical planner falls back to gossip-based planning", + false, +) + func (dsp *DistSQLPlanner) useGossipPlanning(_ context.Context, planCtx *PlanningCtx) bool { - // TODO(dt): enable this by default, e.g. // && !dsp.distSQLSrv.Settings.Version.IsActive(ctx, clusterversion.V23_1) - return dsp.codec.ForSystemTenant() && planCtx.localityFilter.Empty() + var gossipPlanningEnabled bool + // Some of the planCtx fields can be left unset in tests. + if planCtx.ExtendedEvalCtx != nil && planCtx.ExtendedEvalCtx.Settings != nil { + gossipPlanningEnabled = useGossipPlanning.Get(&planCtx.ExtendedEvalCtx.Settings.SV) + } + return dsp.codec.ForSystemTenant() && planCtx.localityFilter.Empty() && gossipPlanningEnabled } // convertOrdering maps the columns in props.ordering to the output columns of a @@ -2122,8 +2174,12 @@ func (dsp *DistSQLPlanner) maybeParallelizeLocalScans( // Temporarily unset isLocal so that PartitionSpans divides all spans // according to the respective leaseholders. planCtx.isLocal = false + bound := PartitionSpansBoundDefault + if info.desc.NumFamilies() > 1 { + bound = PartitionSpansBoundCFWithinRow + } var err error - spanPartitions, err = dsp.PartitionSpans(ctx, planCtx, info.spans) + spanPartitions, err = dsp.PartitionSpans(ctx, planCtx, info.spans, bound) planCtx.isLocal = true if err != nil { // For some reason we couldn't partition the spans - fallback to @@ -2213,7 +2269,11 @@ func (dsp *DistSQLPlanner) planTableReaders( // still read too eagerly in the soft limit case. To prevent this we'll // need a new mechanism on the execution side to modulate table reads. // TODO(yuzefovich): add that mechanism. 
- spanPartitions, ignoreMisplannedRanges, err = dsp.partitionSpansEx(ctx, planCtx, info.spans) + bound := PartitionSpansBoundDefault + if info.desc.NumFamilies() > 1 { + bound = PartitionSpansBoundCFWithinRow + } + spanPartitions, ignoreMisplannedRanges, err = dsp.partitionSpansEx(ctx, planCtx, info.spans, bound) if err != nil { return err } diff --git a/pkg/sql/distsql_physical_planner_test.go b/pkg/sql/distsql_physical_planner_test.go index 83030fe4043a..7b0267c09bf5 100644 --- a/pkg/sql/distsql_physical_planner_test.go +++ b/pkg/sql/distsql_physical_planner_test.go @@ -937,6 +937,7 @@ func TestPartitionSpans(t *testing.T) { partitionStates []string partitionState spanPartitionState }{ + // 0 { ranges: []testSpanResolverRange{{"A", 1}, {"B", 2}, {"C", 1}, {"D", 3}}, gatewayNode: 1, @@ -950,10 +951,10 @@ func TestPartitionSpans(t *testing.T) { }, partitionStates: []string{ - "partition span: {A1-B}, instance ID: 1, reason: gossip-target-healthy", - "partition span: {B-C}, instance ID: 2, reason: gossip-target-healthy", - "partition span: C{-1}, instance ID: 1, reason: gossip-target-healthy", - "partition span: {D1-X}, instance ID: 3, reason: gossip-target-healthy", + "partition span: {A1-B}, instance ID: 1, reason: target-healthy", + "partition span: {B-C}, instance ID: 2, reason: target-healthy", + "partition span: C{-1}, instance ID: 1, reason: target-healthy", + "partition span: {D1-X}, instance ID: 3, reason: target-healthy", }, partitionState: spanPartitionState{ @@ -962,13 +963,14 @@ func TestPartitionSpans(t *testing.T) { 2: 1, 3: 1, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ - SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 4, + partitionSpanDecisions: [SpanPartitionReasonMax]int{ + SpanPartitionReason_TARGET_HEALTHY: 4, }, totalPartitionSpans: 4, }, }, + // 1 { ranges: []testSpanResolverRange{{"A", 1}, {"B", 2}, {"C", 1}, {"D", 3}}, deadNodes: []int{1}, // The health status of the gateway node shouldn't matter. 
@@ -983,10 +985,10 @@ func TestPartitionSpans(t *testing.T) { }, partitionStates: []string{ - "partition span: {A1-B}, instance ID: 1, reason: gossip-target-healthy", - "partition span: {B-C}, instance ID: 2, reason: gossip-target-healthy", - "partition span: C{-1}, instance ID: 1, reason: gossip-target-healthy", - "partition span: {D1-X}, instance ID: 3, reason: gossip-target-healthy", + "partition span: {A1-B}, instance ID: 1, reason: target-healthy", + "partition span: {B-C}, instance ID: 2, reason: target-healthy", + "partition span: C{-1}, instance ID: 1, reason: target-healthy", + "partition span: {D1-X}, instance ID: 3, reason: target-healthy", }, partitionState: spanPartitionState{ @@ -995,13 +997,14 @@ func TestPartitionSpans(t *testing.T) { 2: 1, 3: 1, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ - SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 4, + partitionSpanDecisions: [SpanPartitionReasonMax]int{ + SpanPartitionReason_TARGET_HEALTHY: 4, }, totalPartitionSpans: 4, }, }, + // 2 { ranges: []testSpanResolverRange{{"A", 1}, {"B", 2}, {"C", 1}, {"D", 3}}, deadNodes: []int{2}, @@ -1015,10 +1018,10 @@ func TestPartitionSpans(t *testing.T) { }, partitionStates: []string{ - "partition span: {A1-B}, instance ID: 1, reason: gossip-target-healthy", - "partition span: {B-C}, instance ID: 1, reason: gossip-gateway-target-unhealthy", - "partition span: C{-1}, instance ID: 1, reason: gossip-target-healthy", - "partition span: {D1-X}, instance ID: 3, reason: gossip-target-healthy", + "partition span: {A1-B}, instance ID: 1, reason: target-healthy", + "partition span: {B-C}, instance ID: 1, reason: gateway-target-unhealthy", + "partition span: C{-1}, instance ID: 1, reason: target-healthy", + "partition span: {D1-X}, instance ID: 3, reason: target-healthy", }, partitionState: spanPartitionState{ @@ -1026,14 +1029,15 @@ func TestPartitionSpans(t *testing.T) { 1: 3, 3: 1, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ - SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 3, - SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY: 1, + partitionSpanDecisions: [SpanPartitionReasonMax]int{ + SpanPartitionReason_TARGET_HEALTHY: 3, + SpanPartitionReason_GATEWAY_TARGET_UNHEALTHY: 1, }, totalPartitionSpans: 4, }, }, + // 3 { ranges: []testSpanResolverRange{{"A", 1}, {"B", 2}, {"C", 1}, {"D", 3}}, deadNodes: []int{3}, @@ -1047,10 +1051,10 @@ func TestPartitionSpans(t *testing.T) { }, partitionStates: []string{ - "partition span: {A1-B}, instance ID: 1, reason: gossip-target-healthy", - "partition span: {B-C}, instance ID: 2, reason: gossip-target-healthy", - "partition span: C{-1}, instance ID: 1, reason: gossip-target-healthy", - "partition span: {D1-X}, instance ID: 1, reason: gossip-gateway-target-unhealthy", + "partition span: {A1-B}, instance ID: 1, reason: target-healthy", + "partition span: {B-C}, instance ID: 2, reason: target-healthy", + "partition span: C{-1}, instance ID: 1, reason: target-healthy", + "partition span: {D1-X}, instance ID: 1, reason: gateway-target-unhealthy", }, partitionState: spanPartitionState{ @@ -1058,14 +1062,15 @@ func TestPartitionSpans(t *testing.T) { 1: 3, 2: 1, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ - SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 3, - SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY: 1, + partitionSpanDecisions: [SpanPartitionReasonMax]int{ + SpanPartitionReason_TARGET_HEALTHY: 3, + 
SpanPartitionReason_GATEWAY_TARGET_UNHEALTHY: 1, }, totalPartitionSpans: 4, }, }, + // 4 { ranges: []testSpanResolverRange{{"A", 1}, {"B", 2}, {"C", 1}, {"D", 3}}, deadNodes: []int{1}, @@ -1079,10 +1084,10 @@ func TestPartitionSpans(t *testing.T) { }, partitionStates: []string{ - "partition span: {A1-B}, instance ID: 2, reason: gossip-gateway-target-unhealthy", - "partition span: {B-C}, instance ID: 2, reason: gossip-target-healthy", - "partition span: C{-1}, instance ID: 2, reason: gossip-gateway-target-unhealthy", - "partition span: {D1-X}, instance ID: 3, reason: gossip-target-healthy", + "partition span: {A1-B}, instance ID: 2, reason: gateway-target-unhealthy", + "partition span: {B-C}, instance ID: 2, reason: target-healthy", + "partition span: C{-1}, instance ID: 2, reason: gateway-target-unhealthy", + "partition span: {D1-X}, instance ID: 3, reason: target-healthy", }, partitionState: spanPartitionState{ @@ -1090,14 +1095,15 @@ func TestPartitionSpans(t *testing.T) { 2: 3, 3: 1, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ - SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 2, - SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY: 2, + partitionSpanDecisions: [SpanPartitionReasonMax]int{ + SpanPartitionReason_TARGET_HEALTHY: 2, + SpanPartitionReason_GATEWAY_TARGET_UNHEALTHY: 2, }, totalPartitionSpans: 4, }, }, + // 5 { ranges: []testSpanResolverRange{{"A", 1}, {"B", 2}, {"C", 1}, {"D", 3}}, deadNodes: []int{1}, @@ -1111,10 +1117,10 @@ func TestPartitionSpans(t *testing.T) { }, partitionStates: []string{ - "partition span: {A1-B}, instance ID: 3, reason: gossip-gateway-target-unhealthy", - "partition span: {B-C}, instance ID: 2, reason: gossip-target-healthy", - "partition span: C{-1}, instance ID: 3, reason: gossip-gateway-target-unhealthy", - "partition span: {D1-X}, instance ID: 3, reason: gossip-target-healthy", + "partition span: {A1-B}, instance ID: 3, reason: gateway-target-unhealthy", + "partition span: {B-C}, instance ID: 2, reason: target-healthy", + "partition span: C{-1}, instance ID: 3, reason: gateway-target-unhealthy", + "partition span: {D1-X}, instance ID: 3, reason: target-healthy", }, partitionState: spanPartitionState{ @@ -1122,15 +1128,15 @@ func TestPartitionSpans(t *testing.T) { 2: 1, 3: 3, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ - SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 2, - SpanPartitionReason_GOSSIP_GATEWAY_TARGET_UNHEALTHY: 2, + partitionSpanDecisions: [SpanPartitionReasonMax]int{ + SpanPartitionReason_TARGET_HEALTHY: 2, + SpanPartitionReason_GATEWAY_TARGET_UNHEALTHY: 2, }, totalPartitionSpans: 4, }, }, - // Test point lookups in isolation. + // 6: Test point lookups in isolation. 
{ ranges: []testSpanResolverRange{{"A", 1}, {"B", 2}}, gatewayNode: 1, @@ -1143,9 +1149,9 @@ func TestPartitionSpans(t *testing.T) { }, partitionStates: []string{ - "partition span: A2, instance ID: 1, reason: gossip-target-healthy", - "partition span: A1, instance ID: 1, reason: gossip-target-healthy", - "partition span: B1, instance ID: 2, reason: gossip-target-healthy", + "partition span: A2, instance ID: 1, reason: target-healthy", + "partition span: A1, instance ID: 1, reason: target-healthy", + "partition span: B1, instance ID: 2, reason: target-healthy", }, partitionState: spanPartitionState{ @@ -1153,14 +1159,14 @@ func TestPartitionSpans(t *testing.T) { 1: 2, 2: 1, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ - SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 3, + partitionSpanDecisions: [SpanPartitionReasonMax]int{ + SpanPartitionReason_TARGET_HEALTHY: 3, }, totalPartitionSpans: 3, }, }, - // Test point lookups intertwined with span scans. + // 7: Test point lookups intertwined with span scans. { ranges: []testSpanResolverRange{{"A", 1}, {"B", 1}, {"C", 2}}, gatewayNode: 1, @@ -1173,16 +1179,16 @@ func TestPartitionSpans(t *testing.T) { }, partitionStates: []string{ - "partition span: A1, instance ID: 1, reason: gossip-target-healthy", - "partition span: A{1-2}, instance ID: 1, reason: gossip-target-healthy", - "partition span: A2, instance ID: 1, reason: gossip-target-healthy", - "partition span: {A2-B}, instance ID: 1, reason: gossip-target-healthy", - "partition span: {B-C}, instance ID: 1, reason: gossip-target-healthy", - "partition span: C{-2}, instance ID: 2, reason: gossip-target-healthy", - "partition span: B1, instance ID: 1, reason: gossip-target-healthy", - "partition span: {A3-B}, instance ID: 1, reason: gossip-target-healthy", - "partition span: B{-3}, instance ID: 1, reason: gossip-target-healthy", - "partition span: B2, instance ID: 1, reason: gossip-target-healthy", + "partition span: A1, instance ID: 1, reason: target-healthy", + "partition span: A{1-2}, instance ID: 1, reason: target-healthy", + "partition span: A2, instance ID: 1, reason: target-healthy", + "partition span: {A2-B}, instance ID: 1, reason: target-healthy", + "partition span: {B-C}, instance ID: 1, reason: target-healthy", + "partition span: C{-2}, instance ID: 2, reason: target-healthy", + "partition span: B1, instance ID: 1, reason: target-healthy", + "partition span: {A3-B}, instance ID: 1, reason: target-healthy", + "partition span: B{-3}, instance ID: 1, reason: target-healthy", + "partition span: B2, instance ID: 1, reason: target-healthy", }, partitionState: spanPartitionState{ @@ -1190,14 +1196,14 @@ func TestPartitionSpans(t *testing.T) { 1: 9, 2: 1, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ - SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 10, + partitionSpanDecisions: [SpanPartitionReasonMax]int{ + SpanPartitionReason_TARGET_HEALTHY: 10, }, totalPartitionSpans: 10, }, }, - // A single span touching multiple ranges but on the same node results + // 8: A single span touching multiple ranges but on the same node results // in a single partitioned span. 
{ ranges: []testSpanResolverRange{{"A", 1}, {"A1", 1}, {"B", 2}}, @@ -1210,21 +1216,22 @@ func TestPartitionSpans(t *testing.T) { }, partitionStates: []string{ - "partition span: A{-1}, instance ID: 1, reason: gossip-target-healthy", - "partition span: {A1-B}, instance ID: 1, reason: gossip-target-healthy", + "partition span: A{-1}, instance ID: 1, reason: target-healthy", + "partition span: {A1-B}, instance ID: 1, reason: target-healthy", }, partitionState: spanPartitionState{ partitionSpans: map[base.SQLInstanceID]int{ 1: 2, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ - SpanPartitionReason_GOSSIP_TARGET_HEALTHY: 2, + partitionSpanDecisions: [SpanPartitionReasonMax]int{ + SpanPartitionReason_TARGET_HEALTHY: 2, }, totalPartitionSpans: 2, }, }, - // Test some locality-filtered planning too. + + // 9: Test some locality-filtered planning too. // // Since this test is run on a system tenant but there is a locality filter, // the spans are resolved in a mixed process mode. As a result, the @@ -1253,7 +1260,7 @@ func TestPartitionSpans(t *testing.T) { 1: 2, 2: 2, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ + partitionSpanDecisions: [SpanPartitionReasonMax]int{ SpanPartitionReason_TARGET_HEALTHY: 3, SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED: 1, }, @@ -1263,6 +1270,8 @@ func TestPartitionSpans(t *testing.T) { totalPartitionSpans: 4, }, }, + + // 10 { ranges: []testSpanResolverRange{{"A", 1}, {"B", 2}, {"C", 1}, {"D", 3}}, gatewayNode: 1, @@ -1286,13 +1295,15 @@ func TestPartitionSpans(t *testing.T) { 2: 3, 4: 1, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ + partitionSpanDecisions: [SpanPartitionReasonMax]int{ SpanPartitionReason_TARGET_HEALTHY: 1, SpanPartitionReason_CLOSEST_LOCALITY_MATCH: 3, }, totalPartitionSpans: 4, }, }, + + // 11 { ranges: []testSpanResolverRange{{"A", 1}, {"B", 2}, {"C", 1}, {"D", 3}}, gatewayNode: 7, @@ -1316,7 +1327,7 @@ func TestPartitionSpans(t *testing.T) { 6: 2, 7: 2, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ + partitionSpanDecisions: [SpanPartitionReasonMax]int{ SpanPartitionReason_GATEWAY_NO_LOCALITY_MATCH: 2, SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED: 2, }, @@ -1326,6 +1337,8 @@ func TestPartitionSpans(t *testing.T) { }, }, }, + + // 12 { ranges: []testSpanResolverRange{{"A", 1}, {"B", 2}, {"C", 1}, {"D", 3}}, gatewayNode: 1, @@ -1347,7 +1360,7 @@ func TestPartitionSpans(t *testing.T) { partitionSpans: map[base.SQLInstanceID]int{ 7: 4, }, - partitionSpanDecisions: [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int{ + partitionSpanDecisions: [SpanPartitionReasonMax]int{ SpanPartitionReason_LOCALITY_FILTERED_RANDOM: 4, }, totalPartitionSpans: 4, @@ -1471,7 +1484,7 @@ func TestPartitionSpans(t *testing.T) { spans = append(spans, roachpb.Span{Key: roachpb.Key(s[0]), EndKey: roachpb.Key(s[1])}) } - partitions, err := dsp.PartitionSpans(ctx, planCtx, spans) + partitions, err := dsp.PartitionSpans(ctx, planCtx, spans, PartitionSpansBoundDefault) if err != nil { t.Fatal(err) } @@ -1523,7 +1536,6 @@ func TestPartitionSpans(t *testing.T) { } recording := getRecAndFinish() - t.Logf("recording is %s", recording) for _, expectedMsg := range tc.partitionStates { require.NotEqual(t, -1, tracing.FindMsgInRecording(recording, expectedMsg)) } @@ -1784,9 +1796,10 @@ func 
TestPartitionSpansSkipsNodesNotInGossip(t *testing.T) { ranges: ranges, } + st := cluster.MakeTestingClusterSettings() gw := gossip.MakeOptionalGossip(mockGossip) dsp := DistSQLPlanner{ - st: cluster.MakeTestingClusterSettings(), + st: st, gatewaySQLInstanceID: base.SQLInstanceID(tsp.nodes[gatewayNode-1].NodeID), stopper: stopper, spanResolver: tsp, @@ -1805,11 +1818,13 @@ func TestPartitionSpansSkipsNodesNotInGossip(t *testing.T) { } ctx := context.Background() + // This test is specific to gossip-based planning. + useGossipPlanning.Override(ctx, &st.SV, true) planCtx := dsp.NewPlanningCtx( - ctx, &extendedEvalContext{Context: eval.Context{Codec: keys.SystemSQLCodec}}, + ctx, &extendedEvalContext{Context: eval.Context{Codec: keys.SystemSQLCodec, Settings: st}}, nil /* planner */, nil /* txn */, FullDistribution, ) - partitions, err := dsp.PartitionSpans(ctx, planCtx, roachpb.Spans{span}) + partitions, err := dsp.PartitionSpans(ctx, planCtx, roachpb.Spans{span}, PartitionSpansBoundDefault) if err != nil { t.Fatal(err) } diff --git a/pkg/sql/distsql_plan_backfill.go b/pkg/sql/distsql_plan_backfill.go index 590f5de4f299..73616616727a 100644 --- a/pkg/sql/distsql_plan_backfill.go +++ b/pkg/sql/distsql_plan_backfill.go @@ -76,7 +76,7 @@ func initIndexBackfillMergerSpec( func (dsp *DistSQLPlanner) createBackfillerPhysicalPlan( ctx context.Context, planCtx *PlanningCtx, spec execinfrapb.BackfillerSpec, spans []roachpb.Span, ) (*PhysicalPlan, error) { - spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, spans) + spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, spans, PartitionSpansBoundDefault) if err != nil { return nil, err } @@ -145,7 +145,7 @@ func (dsp *DistSQLPlanner) createIndexBackfillerMergePhysicalPlan( return idx } - spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, indexSpans) + spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, indexSpans, PartitionSpansBoundDefault) if err != nil { return nil, err } diff --git a/pkg/sql/fingerprint_span.go b/pkg/sql/fingerprint_span.go index 9a2a24688e59..f56a0df7b31d 100644 --- a/pkg/sql/fingerprint_span.go +++ b/pkg/sql/fingerprint_span.go @@ -118,7 +118,7 @@ func (p *planner) fingerprintSpanFanout( return 0, nil, err } - spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, []roachpb.Span{span}) + spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, []roachpb.Span{span}, PartitionSpansBoundDefault) if err != nil { return 0, nil, err } diff --git a/pkg/sql/ttl/ttljob/ttljob.go b/pkg/sql/ttl/ttljob/ttljob.go index d1311217ff74..9a598877c7e4 100644 --- a/pkg/sql/ttl/ttljob/ttljob.go +++ b/pkg/sql/ttl/ttljob/ttljob.go @@ -166,7 +166,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) (re if err != nil { return err } - spanPartitions, err := distSQLPlanner.PartitionSpans(ctx, planCtx, []roachpb.Span{entirePKSpan}) + spanPartitions, err := distSQLPlanner.PartitionSpans(ctx, planCtx, []roachpb.Span{entirePKSpan}, sql.PartitionSpansBoundDefault) if err != nil { return err }
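
Illustrative usage note (not part of the patch). The sketch below mirrors the calling convention the table-reader planners above adopt: a table with more than one column family requests the within-row bound so that adjacent spans belonging to the same SQL row are stitched onto a single instance, while every other caller passes PartitionSpansBoundDefault (callers outside pkg/sql qualify it as sql.PartitionSpansBoundDefault). The helper name partitionForTable and its exact signature are hypothetical.

func partitionForTable(
	ctx context.Context,
	dsp *DistSQLPlanner,
	planCtx *PlanningCtx,
	desc catalog.TableDescriptor,
	spans roachpb.Spans,
) ([]SpanPartition, error) {
	// Hypothetical helper for illustration only. Span boundaries at SQL row
	// level or higher can be assigned to different instances freely.
	bound := PartitionSpansBoundDefault
	if desc.NumFamilies() > 1 {
		// With multiple column families, consecutive spans may cover pieces of
		// the same SQL row and must land on the same instance.
		bound = PartitionSpansBoundCFWithinRow
	}
	return dsp.PartitionSpans(ctx, planCtx, spans, bound)
}

The new sql.distsql_planning.use_gossip.enabled setting is off by default; presumably gossip-based planning for the system tenant can be restored with SET CLUSTER SETTING sql.distsql_planning.use_gossip.enabled = true, which is what the test above does in-process via useGossipPlanning.Override.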