sql: disable gossip-based physical planning by default #135034

Merged 1 commit on Nov 25, 2024
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_processor_planning.go
@@ -54,13 +54,13 @@ func distBackupPlanSpecs(
var introducedSpanPartitions []sql.SpanPartition
var err error
if len(spans) > 0 {
spanPartitions, err = dsp.PartitionSpans(ctx, planCtx, spans)
spanPartitions, err = dsp.PartitionSpans(ctx, planCtx, spans, sql.PartitionSpansBoundDefault)
if err != nil {
return nil, err
}
}
if len(introducedSpans) > 0 {
introducedSpanPartitions, err = dsp.PartitionSpans(ctx, planCtx, introducedSpans)
introducedSpanPartitions, err = dsp.PartitionSpans(ctx, planCtx, introducedSpans, sql.PartitionSpansBoundDefault)
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_dist.go
@@ -405,7 +405,7 @@ func makePlan(
}
planCtx := dsp.NewPlanningCtxWithOracle(ctx, execCtx.ExtendedEvalContext(), nil, /* planner */
blankTxn, sql.DistributionType(distMode), oracle, locFilter)
spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, trackedSpans)
spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, trackedSpans, sql.PartitionSpansBoundDefault)
if err != nil {
return nil, nil, err
}
2 changes: 1 addition & 1 deletion pkg/ccl/crosscluster/producer/stream_lifetime.go
@@ -305,7 +305,7 @@ func buildReplicationStreamSpec(
ctx, jobExecCtx.ExtendedEvalContext(), nil /* planner */, nil /* txn */, sql.FullDistribution, oracle, noLoc,
)

spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, targetSpans)
spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, targetSpans, sql.PartitionSpansBoundDefault)
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion pkg/ccl/revertccl/revert.go
@@ -60,7 +60,7 @@ func RevertSpansFanout(
return err
}

spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, spans)
spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, spans, sql.PartitionSpansBoundDefault)
if err != nil {
return err
}
118 changes: 89 additions & 29 deletions pkg/sql/distsql_physical_planner.go
@@ -823,7 +823,7 @@ const (
type spanPartitionState struct {
// partitionSpanDecisions is a mapping from a SpanPartitionReason to the number of
// times we have picked an instance for that reason.
partitionSpanDecisions [SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED + 1]int
partitionSpanDecisions [SpanPartitionReasonMax]int

// partitionSpans is a mapping from a SQLInstanceID to the number of
// partition spans that have been assigned to that node.
@@ -1187,6 +1187,8 @@ const (
// eligible but overloaded with other partitions. In this case we pick a
// random instance apart from the gateway.
SpanPartitionReason_LOCALITY_FILTERED_RANDOM_GATEWAY_OVERLOADED
// SpanPartitionReasonMax tracks the number of SpanPartitionReason objects.
SpanPartitionReasonMax
)

func (r SpanPartitionReason) String() string {
@@ -1331,27 +1333,49 @@ func (dsp *DistSQLPlanner) checkInstanceHealthAndVersionSystem(
return status
}

// PartitionSpansBoundGranularity indicates how "granular" boundaries between
// spans passed to PartitionSpans are.
type PartitionSpansBoundGranularity int

const (
// PartitionSpansBoundDefault indicates that consecutive spans passed to
// PartitionSpans can always be processed on different instances. This is
// the case when the span boundaries are at SQL row level or higher (index,
// table, tenant, etc).
PartitionSpansBoundDefault PartitionSpansBoundGranularity = iota
// PartitionSpansBoundCFWithinRow indicates that consecutive spans passed to
// PartitionSpans _might_ be part of the same SQL row (when they correspond
// to different column families within a single row), and in that case those
// spans need to be assigned the same instance.
PartitionSpansBoundCFWithinRow
)

// PartitionSpans finds out which nodes are owners for ranges touching the
// given spans, and splits the spans according to owning nodes. The result is a
// set of SpanPartitions (guaranteed one for each relevant node), which form a
// partitioning of the spans (i.e. they are non-overlapping and their union is
// exactly the original set of spans).
//
// PartitionSpans does its best to not assign ranges on nodes that are known to
// either be unhealthy or running an incompatible version. The ranges owned by
// such nodes are assigned to the gateway.
// be unhealthy. The ranges owned by such nodes are assigned to the gateway.
func (dsp *DistSQLPlanner) PartitionSpans(
ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans,
ctx context.Context,
planCtx *PlanningCtx,
spans roachpb.Spans,
bound PartitionSpansBoundGranularity,
) ([]SpanPartition, error) {
partitions, _, err := dsp.partitionSpansEx(ctx, planCtx, spans)
partitions, _, err := dsp.partitionSpansEx(ctx, planCtx, spans, bound)
return partitions, err
}

// partitionSpansEx is the same as PartitionSpans but additionally returns a
// boolean indicating whether the misplanned ranges metadata should not be
// generated.
func (dsp *DistSQLPlanner) partitionSpansEx(
ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans,
ctx context.Context,
planCtx *PlanningCtx,
spans roachpb.Spans,
bound PartitionSpansBoundGranularity,
) (_ []SpanPartition, ignoreMisplannedRanges bool, _ error) {
if len(spans) == 0 {
return nil, false, errors.AssertionFailedf("no spans")
@@ -1367,7 +1391,7 @@ func (dsp *DistSQLPlanner) partitionSpansEx(
if dsp.useGossipPlanning(ctx, planCtx) {
return dsp.deprecatedPartitionSpansSystem(ctx, planCtx, spans)
}
return dsp.partitionSpans(ctx, planCtx, spans)
return dsp.partitionSpans(ctx, planCtx, spans, bound)
}

// partitionSpan takes a single span and splits it up according to the owning
@@ -1516,7 +1540,10 @@ func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem(
// available SQL instances if the locality info is available on at least some of
// the instances, and it falls back to naive round-robin assignment if not.
func (dsp *DistSQLPlanner) partitionSpans(
ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans,
ctx context.Context,
planCtx *PlanningCtx,
spans roachpb.Spans,
bound PartitionSpansBoundGranularity,
) (partitions []SpanPartition, ignoreMisplannedRanges bool, _ error) {
resolver, err := dsp.makeInstanceResolver(ctx, planCtx)
if err != nil {
@@ -1526,19 +1553,33 @@ func (dsp *DistSQLPlanner) partitionSpans(
var lastKey roachpb.Key
var lastPartitionIdx int
for _, span := range spans {
// Rows with column families may have been split into different spans.
// These spans should be assigned the same pod so that the pod can
// stitch together the rows correctly. Split rows are in adjacent spans.
if safeKey, err := keys.EnsureSafeSplitKey(span.Key); err == nil && len(safeKey) > 0 {
if safeKey.Equal(lastKey) {
if log.V(1) {
log.Infof(ctx, "partitioning span %s", span)
if bound == PartitionSpansBoundCFWithinRow {
// Rows with column families may have been split into different
// spans. These spans should be assigned the same pod so that the
// pod can stitch together the rows correctly. Split rows are in
// adjacent spans.
// TODO(112702): here we can stitch together spans in more cases
// than necessary because we currently can't distinguish some valid
// keys corresponding to a full SQL row (i.e. single CF case) vs
// those that are part of a SQL row (i.e. multiple CFs case). If we
// address #112702, we'll be able to make the check here precise.
if safeKey, err := keys.EnsureSafeSplitKey(span.Key); err == nil && len(safeKey) > 0 {
if safeKey.Equal(lastKey) {
if log.V(1) {
log.Infof(ctx, "stitching span %s into the previous span partition", span)
}
// TODO(yuzefovich): we're not updating
// SpanPartition.numRanges as well as spanPartitionState
// here. At the moment, that info is only used with
// PartitionSpansBoundDefault, which makes this code
// unreachable. If that ever changes, we'll need to be
// smarter here.
partition := &partitions[lastPartitionIdx]
partition.Spans = append(partition.Spans, span)
continue
}
partition := &partitions[lastPartitionIdx]
partition.Spans = append(partition.Spans, span)
continue
lastKey = safeKey
}
lastKey = safeKey
}
partitions, lastPartitionIdx, err = dsp.partitionSpan(
ctx, planCtx, span, partitions, nodeMap, resolver, &ignoreMisplannedRanges,
Expand All @@ -1559,9 +1600,8 @@ func (dsp *DistSQLPlanner) deprecatedHealthySQLInstanceIDForKVNodeIDSystem(
) (base.SQLInstanceID, SpanPartitionReason) {
sqlInstanceID := base.SQLInstanceID(nodeID)
status := dsp.checkInstanceHealthAndVersionSystem(ctx, planCtx, sqlInstanceID)
// If the node is unhealthy or its DistSQL version is incompatible, use the
// gateway to process this span instead of the unhealthy host. An empty
// address indicates an unhealthy host.
// If the node is unhealthy, use the gateway to process this span instead of
// the unhealthy host. An empty address indicates an unhealthy host.
reason := SpanPartitionReason_GOSSIP_TARGET_HEALTHY
if status != NodeOK {
log.VEventf(ctx, 2, "not planning on node %d: %s", sqlInstanceID, status)
@@ -1922,8 +1962,8 @@ func ClosestInstances[instance InstanceLocalityGetter](

// getInstanceIDForScan retrieves the SQL Instance ID where the single table
// reader should reside for a limited scan. Ideally this is the lease holder for
// the first range in the specified spans. But if that node is unhealthy or
// incompatible, we use the gateway node instead.
// the first range in the specified spans. But if that node is unhealthy, we use
// the gateway node instead.
func (dsp *DistSQLPlanner) getInstanceIDForScan(
ctx context.Context, planCtx *PlanningCtx, spans []roachpb.Span, reverse bool,
) (base.SQLInstanceID, error) {
Expand All @@ -1950,7 +1990,7 @@ func (dsp *DistSQLPlanner) getInstanceIDForScan(
return 0, err
}

if dsp.useGossipPlanning(ctx, planCtx) && planCtx.localityFilter.Empty() {
if dsp.useGossipPlanning(ctx, planCtx) {
sqlInstanceID, _ := dsp.deprecatedHealthySQLInstanceIDForKVNodeIDSystem(ctx, planCtx, replDesc.NodeID)
return sqlInstanceID, nil
}
@@ -1963,9 +2003,21 @@
return sqlInstanceID, nil
}

// TODO(yuzefovich): retire this setting altogether in 25.3 release.
var useGossipPlanning = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"sql.distsql_planning.use_gossip.enabled",
"if enabled, the DistSQL physical planner falls back to gossip-based planning",
false,
)

func (dsp *DistSQLPlanner) useGossipPlanning(_ context.Context, planCtx *PlanningCtx) bool {
// TODO(dt): enable this by default, e.g. // && !dsp.distSQLSrv.Settings.Version.IsActive(ctx, clusterversion.V23_1)
return dsp.codec.ForSystemTenant() && planCtx.localityFilter.Empty()
var gossipPlanningEnabled bool
// Some of the planCtx fields can be left unset in tests.
if planCtx.ExtendedEvalCtx != nil && planCtx.ExtendedEvalCtx.Settings != nil {
gossipPlanningEnabled = useGossipPlanning.Get(&planCtx.ExtendedEvalCtx.Settings.SV)
}
return dsp.codec.ForSystemTenant() && planCtx.localityFilter.Empty() && gossipPlanningEnabled
}

// convertOrdering maps the columns in props.ordering to the output columns of a
@@ -2122,8 +2174,12 @@ func (dsp *DistSQLPlanner) maybeParallelizeLocalScans(
// Temporarily unset isLocal so that PartitionSpans divides all spans
// according to the respective leaseholders.
planCtx.isLocal = false
bound := PartitionSpansBoundDefault
if info.desc.NumFamilies() > 1 {
bound = PartitionSpansBoundCFWithinRow
}
var err error
spanPartitions, err = dsp.PartitionSpans(ctx, planCtx, info.spans)
spanPartitions, err = dsp.PartitionSpans(ctx, planCtx, info.spans, bound)
planCtx.isLocal = true
if err != nil {
// For some reason we couldn't partition the spans - fallback to
@@ -2213,7 +2269,11 @@ func (dsp *DistSQLPlanner) planTableReaders(
// still read too eagerly in the soft limit case. To prevent this we'll
// need a new mechanism on the execution side to modulate table reads.
// TODO(yuzefovich): add that mechanism.
spanPartitions, ignoreMisplannedRanges, err = dsp.partitionSpansEx(ctx, planCtx, info.spans)
bound := PartitionSpansBoundDefault
if info.desc.NumFamilies() > 1 {
bound = PartitionSpansBoundCFWithinRow
}
spanPartitions, ignoreMisplannedRanges, err = dsp.partitionSpansEx(ctx, planCtx, info.spans, bound)
if err != nil {
return err
}
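Note: with this change, gossip-based physical planning is disabled by default and is gated behind the new sql.distsql_planning.use_gossip.enabled cluster setting registered above (which the TODO marks for retirement in 25.3); per useGossipPlanning, it only takes effect on the system tenant with an empty locality filter. A minimal sketch of how an operator could temporarily restore the old behavior and then return to the new default; these statements are illustrative and not part of this diff:

    SET CLUSTER SETTING sql.distsql_planning.use_gossip.enabled = true;
    -- return to the new default (instance-based planning)
    RESET CLUSTER SETTING sql.distsql_planning.use_gossip.enabled;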