Skip to content

Commit

Permalink
Merge #129571
Browse files Browse the repository at this point in the history
129571: sql: support disabling read committed and enabling repeatable read r=nvanbenschoten a=nvanbenschoten

This commit fixes the isolation level upgrade logic to upgrade `READ UNCOMMITTED` and `READ COMMITTED` to `REPEATABLE READ` (i.e. `SNAPSHOT`) when read committed isolation is disabled but repeatable read is enabled. Previously, this combination would upgrade the isolation level all the way to `SERIALIZABLE` isolation.

Epic: None
Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Aug 28, 2024
2 parents c01bec3 + 9f15f64 commit 7512f81
Show file tree
Hide file tree
Showing 10 changed files with 226 additions and 163 deletions.
59 changes: 9 additions & 50 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3529,9 +3529,7 @@ var allowReadCommittedIsolation = settings.RegisterBoolSetting(
settings.WithPublic,
)

// TODO(nvanbenschoten): rename this variable and update uses of it to favor
// "repeatable read" over "snapshot".
var allowSnapshotIsolation = settings.RegisterBoolSetting(
var allowRepeatableReadIsolation = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"sql.txn.snapshot_isolation.enabled",
"set to true to allow transactions to use the REPEATABLE READ isolation "+
Expand Down Expand Up @@ -3566,56 +3564,17 @@ func (ex *connExecutor) txnIsolationLevelToKV(
if level == tree.UnspecifiedIsolation {
level = tree.IsolationLevel(ex.sessionData().DefaultTxnIsolationLevel)
}
upgraded := false
upgradedDueToLicense := false
originalLevel := level
allowReadCommitted := allowReadCommittedIsolation.Get(&ex.server.cfg.Settings.SV)
allowRepeatableRead := allowRepeatableReadIsolation.Get(&ex.server.cfg.Settings.SV)
hasLicense := base.CCLDistributionAndEnterpriseEnabled(ex.server.cfg.Settings)
ret := isolation.Serializable
switch level {
case tree.ReadUncommittedIsolation:
// READ UNCOMMITTED is mapped to READ COMMITTED. PostgreSQL also does
// this: https://www.postgresql.org/docs/current/transaction-iso.html.
upgraded = true
fallthrough
case tree.ReadCommittedIsolation:
// READ COMMITTED is only allowed if the cluster setting is enabled and
// the cluster has a license. Otherwise it is mapped to SERIALIZABLE.
allowReadCommitted := allowReadCommittedIsolation.Get(&ex.server.cfg.Settings.SV)
if allowReadCommitted && hasLicense {
ret = isolation.ReadCommitted
} else {
upgraded = true
ret = isolation.Serializable
if allowReadCommitted && !hasLicense {
upgradedDueToLicense = true
}
}
case tree.RepeatableReadIsolation:
// REPEATABLE READ is mapped to SNAPSHOT.
upgraded = true
fallthrough
case tree.SnapshotIsolation:
// SNAPSHOT is only allowed if the cluster setting is enabled and the
// cluster has a license. Otherwise it is mapped to SERIALIZABLE.
allowSnapshot := allowSnapshotIsolation.Get(&ex.server.cfg.Settings.SV)
if allowSnapshot && hasLicense {
ret = isolation.Snapshot
} else {
upgraded = true
ret = isolation.Serializable
if allowSnapshot && !hasLicense {
upgradedDueToLicense = true
}
}
case tree.SerializableIsolation:
ret = isolation.Serializable
default:
log.Fatalf(context.Background(), "unknown isolation level: %s", level)
}

level, upgraded, upgradedDueToLicense := level.UpgradeToEnabledLevel(
allowReadCommitted, allowRepeatableRead, hasLicense)
if f := ex.dataMutatorIterator.upgradedIsolationLevel; upgraded && f != nil {
f(ctx, level, upgradedDueToLicense)
f(ctx, originalLevel, upgradedDueToLicense)
}

ret := level.ToKVIsoLevel()
if ret != isolation.Serializable {
telemetry.Inc(sqltelemetry.IsolationLevelCounter(ctx, ret))
}
Expand Down Expand Up @@ -4257,7 +4216,7 @@ func (ex *connExecutor) serialize() serverpb.Session {
Priority: ex.state.mu.priority.String(),
QualityOfService: sessiondatapb.ToQoSLevelString(txn.AdmissionHeader().Priority),
LastAutoRetryReason: autoRetryReasonStr,
IsolationLevel: tree.IsolationLevelFromKVTxnIsolationLevel(ex.state.mu.isolationLevel).String(),
IsolationLevel: tree.FromKVIsoLevel(ex.state.mu.isolationLevel).String(),
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -8079,7 +8079,7 @@ func genClusterLocksGenerator(
if curLock.LockHolder != nil {
txnIDDatum = tree.NewDUuid(tree.DUuid{UUID: curLock.LockHolder.ID})
tsDatum = eval.TimestampToInexactDTimestamp(curLock.LockHolder.WriteTimestamp)
isolationLevelDatum = tree.NewDString(tree.IsolationLevelFromKVTxnIsolationLevel(curLock.LockHolder.IsoLevel).String())
isolationLevelDatum = tree.NewDString(tree.FromKVIsoLevel(curLock.LockHolder.IsoLevel).String())
strengthDatum = tree.NewDString(curLock.LockStrength.String())
durationDatum = tree.NewDInterval(
duration.MakeDuration(curLock.HoldDuration.Nanoseconds(), 0 /* days */, 0 /* months */),
Expand All @@ -8092,7 +8092,7 @@ func genClusterLocksGenerator(
if waiter.WaitingTxn != nil {
txnIDDatum = tree.NewDUuid(tree.DUuid{UUID: waiter.WaitingTxn.ID})
tsDatum = eval.TimestampToInexactDTimestamp(waiter.WaitingTxn.WriteTimestamp)
isolationLevelDatum = tree.NewDString(tree.IsolationLevelFromKVTxnIsolationLevel(waiter.WaitingTxn.IsoLevel).String())
isolationLevelDatum = tree.NewDString(tree.FromKVIsoLevel(waiter.WaitingTxn.IsoLevel).String())
}
strengthDatum = tree.NewDString(waiter.Strength.String())
durationDatum = tree.NewDInterval(
Expand Down
39 changes: 36 additions & 3 deletions pkg/sql/logictest/testdata/logic_test/txn
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,9 @@ serializable
statement ok
COMMIT

# Since read_committed_isolation.enabled is false, setting isolation level
# to READ COMMITTED should map to SERIALIZABLE.
# Since read_committed_isolation.enabled and repeatable_read_isolation.enabled
# are both false, setting isolation level to READ COMMITTED should map to
# SERIALIZABLE.
statement ok
SET default_transaction_isolation = 'read committed'

Expand All @@ -345,10 +346,42 @@ SHOW DEFAULT_TRANSACTION_ISOLATION
----
serializable

# REPEATABLE READ can be used with a hidden cluster setting.
statement ok
SET CLUSTER SETTING sql.txn.repeatable_read_isolation.enabled = true

# Since read_committed_isolation.enabled is false but repeatable_read_isolation.enabled
# is true, setting isolation level to READ COMMITTED should map to REPEATABLE READ.
statement ok
SET default_transaction_isolation = 'read committed'

onlyif config enterprise-configs
query T
SHOW default_transaction_isolation
----
snapshot

skipif config enterprise-configs
query T
SHOW default_transaction_isolation
----
serializable

statement ok
SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED

onlyif config enterprise-configs
query T
SHOW DEFAULT_TRANSACTION_ISOLATION
----
snapshot

skipif config enterprise-configs
query T
SHOW DEFAULT_TRANSACTION_ISOLATION
----
serializable

# Since repeatable_read_isolation.enabled is true, REPEATABLE READ can be used.
statement ok
BEGIN

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/sem/eval/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,8 +608,8 @@ func (ec *Context) GetClusterTimestamp() (*tree.DDecimal, error) {
// multiple timestamps. Prevent this with a gate at the SQL level and return
// a pgerror until we decide how this will officially behave. See #103245.
if ec.TxnIsoLevel.ToleratesWriteSkew() {
treeIso := tree.IsolationLevelFromKVTxnIsolationLevel(ec.TxnIsoLevel)
return nil, pgerror.Newf(pgcode.FeatureNotSupported, "unsupported in %s isolation", treeIso.String())
return nil, pgerror.Newf(pgcode.FeatureNotSupported,
"unsupported in %s isolation", tree.FromKVIsoLevel(ec.TxnIsoLevel).String())
}

ts, err := ec.Txn.CommitTimestamp()
Expand Down
74 changes: 63 additions & 11 deletions pkg/sql/sem/tree/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,74 @@ func (i IsolationLevel) String() string {
return isolationLevelNames[i]
}

// IsolationLevelFromKVTxnIsolationLevel converts a kv level isolation.Level to
// its SQL semantic equivalent.
func IsolationLevelFromKVTxnIsolationLevel(level isolation.Level) IsolationLevel {
var ret IsolationLevel
// ToKVIsoLevel converts an IsolationLevel to its isolation.Level equivalent.
func (i IsolationLevel) ToKVIsoLevel() isolation.Level {
switch i {
case ReadUncommittedIsolation, ReadCommittedIsolation:
return isolation.ReadCommitted
case RepeatableReadIsolation, SnapshotIsolation:
return isolation.Snapshot
case SerializableIsolation:
return isolation.Serializable
default:
panic(fmt.Sprintf("unknown isolation level: %s", i))
}
}

// FromKVIsoLevel converts an isolation.Level to its SQL semantic equivalent.
func FromKVIsoLevel(level isolation.Level) IsolationLevel {
switch level {
case isolation.Serializable:
ret = SerializableIsolation
case isolation.ReadCommitted:
ret = ReadCommittedIsolation
return ReadCommittedIsolation
case isolation.Snapshot:
ret = SnapshotIsolation
return SnapshotIsolation
case isolation.Serializable:
return SerializableIsolation
default:
panic(fmt.Sprintf("unknown isolation level: %s", level))
}
}

// UpgradeToEnabledLevel upgrades the isolation level to the weakest enabled
// isolation level that is stronger than or equal to the input level.
func (i IsolationLevel) UpgradeToEnabledLevel(
allowReadCommitted, allowRepeatableRead, hasLicense bool,
) (_ IsolationLevel, upgraded, upgradedDueToLicense bool) {
switch i {
case ReadUncommittedIsolation:
// READ UNCOMMITTED is mapped to READ COMMITTED. PostgreSQL also does
// this: https://www.postgresql.org/docs/current/transaction-iso.html.
upgraded = true
fallthrough
case ReadCommittedIsolation:
// READ COMMITTED is only allowed if the cluster setting is enabled and the
// cluster has a license. Otherwise, it is mapped to a stronger isolation
// level (REPEATABLE READ if enabled, SERIALIZABLE otherwise).
if allowReadCommitted && hasLicense {
return ReadCommittedIsolation, upgraded, upgradedDueToLicense
}
upgraded = true
if allowReadCommitted && !hasLicense {
upgradedDueToLicense = true
}
fallthrough
case RepeatableReadIsolation, SnapshotIsolation:
// REPEATABLE READ and SNAPSHOT are considered aliases. The isolation levels
// are only allowed if the cluster setting is enabled and the cluster has a
// license. Otherwise, they are mapped to SERIALIZABLE.
if allowRepeatableRead && hasLicense {
return SnapshotIsolation, upgraded, upgradedDueToLicense
}
upgraded = true
if allowRepeatableRead && !hasLicense {
upgradedDueToLicense = true
}
fallthrough
case SerializableIsolation:
return SerializableIsolation, upgraded, upgradedDueToLicense
default:
panic("What to do here? Log is a banned import")
// log.Fatalf(context.Background(), "unknown isolation level: %s", level)
panic(fmt.Sprintf("unknown isolation level: %s", i))
}
return ret
}

// UserPriority holds the user priority for a transaction.
Expand Down
117 changes: 101 additions & 16 deletions pkg/sql/sem/tree/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,113 @@ import (
"github.com/stretchr/testify/require"
)

func TestIsolationLevelFromKVTxnIsolationLevel(t *testing.T) {
func TestToKVIsoLevel(t *testing.T) {
defer leaktest.AfterTest(t)()

testCases := []struct {
In isolation.Level
Out tree.IsolationLevel
in tree.IsolationLevel
out isolation.Level
}{
{
In: isolation.Serializable,
Out: tree.SerializableIsolation,
},
{
In: isolation.ReadCommitted,
Out: tree.ReadCommittedIsolation,
},
{
In: isolation.Snapshot,
Out: tree.SnapshotIsolation,
},
{tree.ReadUncommittedIsolation, isolation.ReadCommitted},
{tree.ReadCommittedIsolation, isolation.ReadCommitted},
{tree.RepeatableReadIsolation, isolation.Snapshot},
{tree.SnapshotIsolation, isolation.Snapshot},
{tree.SerializableIsolation, isolation.Serializable},
}

for _, tc := range testCases {
require.Equal(t, tc.Out, tree.IsolationLevelFromKVTxnIsolationLevel(tc.In))
t.Run("", func(t *testing.T) {
require.Equal(t, tc.out, tc.in.ToKVIsoLevel())
})
}
}

func TestFromKVIsoLevel(t *testing.T) {
defer leaktest.AfterTest(t)()

testCases := []struct {
in isolation.Level
out tree.IsolationLevel
}{
{isolation.ReadCommitted, tree.ReadCommittedIsolation},
{isolation.Snapshot, tree.SnapshotIsolation},
{isolation.Serializable, tree.SerializableIsolation},
}

for _, tc := range testCases {
t.Run("", func(t *testing.T) {
require.Equal(t, tc.out, tree.FromKVIsoLevel(tc.in))
})
}
}

func TestUpgradeToEnabledLevel(t *testing.T) {
defer leaktest.AfterTest(t)()

const RU = tree.ReadUncommittedIsolation
const RC = tree.ReadCommittedIsolation
const RR = tree.RepeatableReadIsolation
const SI = tree.SnapshotIsolation
const SER = tree.SerializableIsolation

testCases := []struct {
in tree.IsolationLevel
allowRC bool
allowRR bool
license bool
expOut tree.IsolationLevel
expUpgraded bool
expUpgradedDueToLicense bool
}{
{in: RU, allowRC: false, allowRR: false, license: false, expOut: SER, expUpgraded: true, expUpgradedDueToLicense: false},
{in: RU, allowRC: true, allowRR: false, license: false, expOut: SER, expUpgraded: true, expUpgradedDueToLicense: true},
{in: RU, allowRC: false, allowRR: true, license: false, expOut: SER, expUpgraded: true, expUpgradedDueToLicense: true},
{in: RU, allowRC: true, allowRR: true, license: false, expOut: SER, expUpgraded: true, expUpgradedDueToLicense: true},
{in: RU, allowRC: false, allowRR: false, license: true, expOut: SER, expUpgraded: true, expUpgradedDueToLicense: false},
{in: RU, allowRC: true, allowRR: false, license: true, expOut: RC, expUpgraded: true, expUpgradedDueToLicense: false},
{in: RU, allowRC: false, allowRR: true, license: true, expOut: SI, expUpgraded: true, expUpgradedDueToLicense: false},
{in: RU, allowRC: true, allowRR: true, license: true, expOut: RC, expUpgraded: true, expUpgradedDueToLicense: false},
{in: RC, allowRC: false, allowRR: false, license: false, expOut: SER, expUpgraded: true, expUpgradedDueToLicense: false},
{in: RC, allowRC: true, allowRR: false, license: false, expOut: SER, expUpgraded: true, expUpgradedDueToLicense: true},
{in: RC, allowRC: false, allowRR: true, license: false, expOut: SER, expUpgraded: true, expUpgradedDueToLicense: true},
{in: RC, allowRC: true, allowRR: true, license: false, expOut: SER, expUpgraded: true, expUpgradedDueToLicense: true},
{in: RC, allowRC: false, allowRR: false, license: true, expOut: SER, expUpgraded: true, expUpgradedDueToLicense: false},
{in: RC, allowRC: true, allowRR: false, license: true, expOut: RC, expUpgraded: false, expUpgradedDueToLicense: false},
{in: RC, allowRC: false, allowRR: true, license: true, expOut: SI, expUpgraded: true, expUpgradedDueToLicense: false},
{in: RC, allowRC: true, allowRR: true, license: true, expOut: RC, expUpgraded: false, expUpgradedDueToLicense: false},
{in: RR, allowRC: false, allowRR: false, license: false, expOut: SER, expUpgraded: true, expUpgradedDueToLicense: false},
{in: RR, allowRC: true, allowRR: false, license: false, expOut: SER, expUpgraded: true, expUpgradedDueToLicense: false},
{in: RR, allowRC: false, allowRR: true, license: false, expOut: SER, expUpgraded: true, expUpgradedDueToLicense: true},
{in: RR, allowRC: true, allowRR: true, license: false, expOut: SER, expUpgraded: true, expUpgradedDueToLicense: true},
{in: RR, allowRC: false, allowRR: false, license: true, expOut: SER, expUpgraded: true, expUpgradedDueToLicense: false},
{in: RR, allowRC: true, allowRR: false, license: true, expOut: SER, expUpgraded: true, expUpgradedDueToLicense: false},
{in: RR, allowRC: false, allowRR: true, license: true, expOut: SI, expUpgraded: false, expUpgradedDueToLicense: false},
{in: RR, allowRC: true, allowRR: true, license: true, expOut: SI, expUpgraded: false, expUpgradedDueToLicense: false},
{in: SI, allowRC: false, allowRR: false, license: false, expOut: SER, expUpgraded: true, expUpgradedDueToLicense: false},
{in: SI, allowRC: true, allowRR: false, license: false, expOut: SER, expUpgraded: true, expUpgradedDueToLicense: false},
{in: SI, allowRC: false, allowRR: true, license: false, expOut: SER, expUpgraded: true, expUpgradedDueToLicense: true},
{in: SI, allowRC: true, allowRR: true, license: false, expOut: SER, expUpgraded: true, expUpgradedDueToLicense: true},
{in: SI, allowRC: false, allowRR: false, license: true, expOut: SER, expUpgraded: true, expUpgradedDueToLicense: false},
{in: SI, allowRC: true, allowRR: false, license: true, expOut: SER, expUpgraded: true, expUpgradedDueToLicense: false},
{in: SI, allowRC: false, allowRR: true, license: true, expOut: SI, expUpgraded: false, expUpgradedDueToLicense: false},
{in: SI, allowRC: true, allowRR: true, license: true, expOut: SI, expUpgraded: false, expUpgradedDueToLicense: false},
{in: SER, allowRC: false, allowRR: false, license: false, expOut: SER, expUpgraded: false, expUpgradedDueToLicense: false},
{in: SER, allowRC: true, allowRR: false, license: false, expOut: SER, expUpgraded: false, expUpgradedDueToLicense: false},
{in: SER, allowRC: false, allowRR: true, license: false, expOut: SER, expUpgraded: false, expUpgradedDueToLicense: false},
{in: SER, allowRC: true, allowRR: true, license: false, expOut: SER, expUpgraded: false, expUpgradedDueToLicense: false},
{in: SER, allowRC: false, allowRR: false, license: true, expOut: SER, expUpgraded: false, expUpgradedDueToLicense: false},
{in: SER, allowRC: true, allowRR: false, license: true, expOut: SER, expUpgraded: false, expUpgradedDueToLicense: false},
{in: SER, allowRC: false, allowRR: true, license: true, expOut: SER, expUpgraded: false, expUpgradedDueToLicense: false},
{in: SER, allowRC: true, allowRR: true, license: true, expOut: SER, expUpgraded: false, expUpgradedDueToLicense: false},
}

for _, tc := range testCases {
t.Run("", func(t *testing.T) {
res, upgraded, upgradedDueToLicense := tc.in.UpgradeToEnabledLevel(
tc.allowRC, tc.allowRR, tc.license)
require.Equal(t, tc.expOut, res)
require.Equal(t, tc.expUpgraded, upgraded)
require.Equal(t, tc.expUpgradedDueToLicense, upgradedDueToLicense)
})
}
}
Loading

0 comments on commit 7512f81

Please sign in to comment.