Skip to content

Commit

Permalink
refactor: session find and append/update logic
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Nov 20, 2024
1 parent 216fd70 commit 74f48d4
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 110 deletions.
170 changes: 93 additions & 77 deletions go/vt/vtgate/safe_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,73 +347,76 @@ func (session *SafeSession) InTransaction() bool {
return session.Session.InTransaction
}

// FindAndChangeSessionIfInSingleTxMode returns the transactionId and tabletAlias, if any, for a session
// modifies the shard session in a specific case for single mode transaction.
func (session *SafeSession) FindAndChangeSessionIfInSingleTxMode(keyspace, shard string, tabletType topodatapb.TabletType, txMode vtgatepb.TransactionMode) (int64, int64, *topodatapb.TabletAlias, error) {
// FindAndChangeSessionIfInSingleTxMode retrieves the ShardSession matching the given keyspace, shard, and tablet type.
// It performs additional checks and may modify the ShardSession in specific cases for single-mode transactions.
//
// Key behavior:
// 1. Retrieves the appropriate list of sessions (PreSessions, PostSessions, or default ShardSessions) based on the commit order.
// 2. Identifies a matching session by keyspace, shard, and tablet type.
// 3. If the session meets specific conditions (e.g., non-vindex-only, single transaction mode), it updates the session state:
// - Converts a vindex-only session to a standard session if required by the transaction type.
// - If a multi-shard transaction is detected in Single mode, marks the session for rollback and returns an error.
//
// Parameters:
// - keyspace: The keyspace of the target shard.
// - shard: The shard name of the target.
// - tabletType: The type of the tablet for the shard session.
// - txMode: The transaction mode (e.g., Single, Multi).
//
// Returns:
// - The matching ShardSession, if found and valid for the operation.
// - An error if a Single-mode transaction attempts to span multiple shards.
func (session *SafeSession) FindAndChangeSessionIfInSingleTxMode(keyspace, shard string, tabletType topodatapb.TabletType, txMode vtgatepb.TransactionMode) (*vtgatepb.Session_ShardSession, error) {
session.mu.Lock()
defer session.mu.Unlock()
sessions := session.ShardSessions

shardSession := session.findSessionLocked(keyspace, shard, tabletType)
if !shardSession.VindexOnly {
return shardSession, nil
}

if err := session.singleModeErrorOnCrossShard(txMode, 0); err != nil {
return nil, err
}

// the shard session is now used by non-vindex query as well,
// so it is not an exclusive vindex only shard session anymore.
shardSession.VindexOnly = false
return shardSession, nil
}

func (session *SafeSession) findSessionLocked(keyspace, shard string, tabletType topodatapb.TabletType) *vtgatepb.Session_ShardSession {
// Select the appropriate session list based on the commit order.
var sessions []*vtgatepb.Session_ShardSession
switch session.commitOrder {
case vtgatepb.CommitOrder_PRE:
sessions = session.PreSessions
case vtgatepb.CommitOrder_POST:
sessions = session.PostSessions
default:
sessions = session.ShardSessions
}

// Find and return the matching shard session.
for _, shardSession := range sessions {
if keyspace == shardSession.Target.Keyspace && tabletType == shardSession.Target.TabletType && shard == shardSession.Target.Shard {
if txMode != vtgatepb.TransactionMode_SINGLE || !shardSession.VindexOnly || session.queryFromVindex {
return shardSession.TransactionId, shardSession.ReservedId, shardSession.TabletAlias, nil
}
count := actualNoOfShardSession(session.ShardSessions)
// If the count of shard session which are non vindex only is greater than 0, then it is a
if count > 0 {
session.mustRollback = true
return 0, 0, nil, vterrors.Errorf(vtrpcpb.Code_ABORTED, "multi-db transaction attempted: %v", session.ShardSessions)
}
// the shard session is now used by non-vindex query as well,
// so it is not an exclusive vindex only shard session anymore.
shardSession.VindexOnly = false
return shardSession.TransactionId, shardSession.ReservedId, shardSession.TabletAlias, nil
}
}
return 0, 0, nil, nil
}

func addOrUpdate(shardSession *vtgatepb.Session_ShardSession, sessions []*vtgatepb.Session_ShardSession) ([]*vtgatepb.Session_ShardSession, error) {
appendSession := true
for i, sess := range sessions {
targetedAtSameTablet := sess.Target.Keyspace == shardSession.Target.Keyspace &&
sess.Target.TabletType == shardSession.Target.TabletType &&
sess.Target.Shard == shardSession.Target.Shard
if targetedAtSameTablet {
if !proto.Equal(sess.TabletAlias, shardSession.TabletAlias) {
errorDetails := fmt.Sprintf("got non-matching aliases (%v vs %v) for the same target (keyspace: %v, tabletType: %v, shard: %v)",
sess.TabletAlias, shardSession.TabletAlias,
sess.Target.Keyspace, sess.Target.TabletType, sess.Target.Shard)
return nil, vterrors.New(vtrpcpb.Code_FAILED_PRECONDITION, errorDetails)
}
// replace the old info with the new one
sessions[i] = shardSession
appendSession = false
break
if shardSession.Target.Keyspace == keyspace &&
shardSession.Target.Shard == shard &&
shardSession.Target.TabletType == tabletType {
return shardSession
}
}
if appendSession {
sessions = append(sessions, shardSession)
}

return sessions, nil
return nil
}

// AppendOrUpdate adds a new ShardSession, or updates an existing one if one already exists for the given shard session
func (session *SafeSession) AppendOrUpdate(shardSession *vtgatepb.Session_ShardSession, txMode vtgatepb.TransactionMode) error {
func (session *SafeSession) AppendOrUpdate(target *querypb.Target, info *shardActionInfo, existingSession *vtgatepb.Session_ShardSession, txMode vtgatepb.TransactionMode) error {
session.mu.Lock()
defer session.mu.Unlock()

// additional check of transaction id is required
// as now in autocommit mode there can be session due to reserved connection
// that needs to be stored as shard session.
if session.autocommitState == autocommitted && shardSession.TransactionId != 0 {
if session.autocommitState == autocommitted && info.transactionID != 0 {
// Should be unreachable
return vterrors.VT13001("unexpected 'autocommitted' state in transaction")
}
Expand All @@ -423,45 +426,58 @@ func (session *SafeSession) AppendOrUpdate(shardSession *vtgatepb.Session_ShardS
}
session.autocommitState = notAutocommittable

// Always append, in order for rollback to succeed.
switch session.commitOrder {
case vtgatepb.CommitOrder_NORMAL:
if session.queryFromVindex {
shardSession.VindexOnly = true
if existingSession != nil {
existingSession.TransactionId = info.transactionID
existingSession.ReservedId = info.reservedID
if existingSession.VindexOnly {
existingSession.VindexOnly = session.queryFromVindex
}
newSessions, err := addOrUpdate(shardSession, session.ShardSessions)
if err != nil {
if err := session.singleModeErrorOnCrossShard(txMode, 1); err != nil {
return err
}
session.ShardSessions = newSessions
return nil
}
newSession := &vtgatepb.Session_ShardSession{
Target: target,
TabletAlias: info.alias,
TransactionId: info.transactionID,
ReservedId: info.reservedID,
VindexOnly: session.queryFromVindex,
}

if session.queryFromVindex {
break
}
// isSingle is enforced only for normal commit order operations.
if session.isSingleDB(txMode) && len(session.ShardSessions) > 1 {
count := actualNoOfShardSession(session.ShardSessions)
if count <= 1 {
break
}
session.mustRollback = true
return vterrors.Errorf(vtrpcpb.Code_ABORTED, "multi-db transaction attempted: %v", session.ShardSessions)
}
case vtgatepb.CommitOrder_PRE:
newSessions, err := addOrUpdate(shardSession, session.PreSessions)
if err != nil {
// Always append, in order for rollback to succeed.
switch session.commitOrder {
case vtgatepb.CommitOrder_NORMAL:
session.ShardSessions = append(session.ShardSessions, newSession)
if err := session.singleModeErrorOnCrossShard(txMode, 1); err != nil {
return err
}
session.PreSessions = newSessions
case vtgatepb.CommitOrder_PRE:
session.PreSessions = append(session.PreSessions, newSession)
case vtgatepb.CommitOrder_POST:
newSessions, err := addOrUpdate(shardSession, session.PostSessions)
if err != nil {
return err
}
session.PostSessions = newSessions
session.PostSessions = append(session.PostSessions, newSession)
default:
// Should be unreachable
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] SafeSession.AppendOrUpdate: unexpected commitOrder")
return vterrors.VT13001(fmt.Sprintf("unexpected commitOrder to append shard session: %v", session.commitOrder))
}

return nil
}

// singleModeErrorOnCrossShard checks if a transaction violates the Single mode constraint by spanning multiple shards.
func (session *SafeSession) singleModeErrorOnCrossShard(txMode vtgatepb.TransactionMode, exceedsCrossShard int) error {
// Skip the check if:
// 1. The query comes from a lookup vindex.
// 2. The transaction mode is not Single.
// 3. The transaction is not in the normal shard session.
if session.queryFromVindex || session.commitOrder != vtgatepb.CommitOrder_NORMAL || !session.isSingleDB(txMode) {
return nil
}

// If the transaction spans multiple shards, abort it.
if actualNoOfShardSession(session.ShardSessions) > exceedsCrossShard {
session.mustRollback = true // Mark the session for rollback.
return vterrors.Errorf(vtrpcpb.Code_ABORTED, "multi-db transaction attempted: %v", session.ShardSessions)
}

return nil
Expand Down
100 changes: 88 additions & 12 deletions go/vt/vtgate/safe_session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,101 @@ import (
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
)

// TestFailToMultiShardWhenSetToSingleDb tests that single db transactions fails on going multi shard.
func TestFailToMultiShardWhenSetToSingleDb(t *testing.T) {
session := NewSafeSession(&vtgatepb.Session{
InTransaction: true, TransactionMode: vtgatepb.TransactionMode_SINGLE,
})

sess0 := &vtgatepb.Session_ShardSession{
Target: &querypb.Target{Keyspace: "keyspace", Shard: "0"},
TabletAlias: &topodatapb.TabletAlias{Cell: "cell", Uid: 0},
TransactionId: 1,
}
sess1 := &vtgatepb.Session_ShardSession{
Target: &querypb.Target{Keyspace: "keyspace", Shard: "1"},
TabletAlias: &topodatapb.TabletAlias{Cell: "cell", Uid: 1},
TransactionId: 1,
}
err := session.AppendOrUpdate(
&querypb.Target{Keyspace: "keyspace", Shard: "0"},
&shardActionInfo{transactionID: 1, alias: &topodatapb.TabletAlias{Cell: "cell", Uid: 0}},
nil,
vtgatepb.TransactionMode_SINGLE)
require.NoError(t, err)
err = session.AppendOrUpdate(
&querypb.Target{Keyspace: "keyspace", Shard: "1"},
&shardActionInfo{transactionID: 1, alias: &topodatapb.TabletAlias{Cell: "cell", Uid: 1}},
nil,
vtgatepb.TransactionMode_SINGLE)
require.Error(t, err)
}

// TestSingleDbUpdateToMultiShard tests that a single db transaction cannot be updated to multi shard.
func TestSingleDbUpdateToMultiShard(t *testing.T) {
session := NewSafeSession(&vtgatepb.Session{
InTransaction: true, TransactionMode: vtgatepb.TransactionMode_SINGLE,
})

// shard session s0 due to a vindex query
session.queryFromVindex = true
err := session.AppendOrUpdate(
&querypb.Target{Keyspace: "keyspace", Shard: "0"},
&shardActionInfo{transactionID: 1, alias: &topodatapb.TabletAlias{Cell: "cell", Uid: 0}},
nil,
vtgatepb.TransactionMode_SINGLE)
require.NoError(t, err)
session.queryFromVindex = false

// shard session s1
err = session.AppendOrUpdate(
&querypb.Target{Keyspace: "keyspace", Shard: "1"},
&shardActionInfo{transactionID: 1, alias: &topodatapb.TabletAlias{Cell: "cell", Uid: 1}},
nil,
vtgatepb.TransactionMode_SINGLE)
require.NoError(t, err)

// shard session s0 with normal query
err = session.AppendOrUpdate(
&querypb.Target{Keyspace: "keyspace", Shard: "0"},
&shardActionInfo{transactionID: 1, alias: &topodatapb.TabletAlias{Cell: "cell", Uid: 1}},
session.ShardSessions[0],
vtgatepb.TransactionMode_SINGLE)
require.Error(t, err)
}

// TestSingleDbPreFailOnFind tests that finding a shard session fails
// if already shard session exists on another shard and the query is not from vindex.
func TestSingleDbPreFailOnFind(t *testing.T) {
session := NewSafeSession(&vtgatepb.Session{
InTransaction: true, TransactionMode: vtgatepb.TransactionMode_SINGLE,
})

// shard session s0 due to a vindex query
session.queryFromVindex = true
err := session.AppendOrUpdate(
&querypb.Target{Keyspace: "keyspace", Shard: "0"},
&shardActionInfo{transactionID: 1, alias: &topodatapb.TabletAlias{Cell: "cell", Uid: 0}},
nil,
vtgatepb.TransactionMode_SINGLE)
require.NoError(t, err)
session.queryFromVindex = false

// shard session s1
err = session.AppendOrUpdate(
&querypb.Target{Keyspace: "keyspace", Shard: "1"},
&shardActionInfo{transactionID: 1, alias: &topodatapb.TabletAlias{Cell: "cell", Uid: 1}},
nil,
vtgatepb.TransactionMode_SINGLE)
require.NoError(t, err)

err := session.AppendOrUpdate(sess0, vtgatepb.TransactionMode_SINGLE)
// shard session s1 for normal query again - should not fail as already part of the session.
ss, err := session.FindAndChangeSessionIfInSingleTxMode(
"keyspace",
"1",
topodatapb.TabletType_UNKNOWN,
vtgatepb.TransactionMode_SINGLE)
require.NoError(t, err)
err = session.AppendOrUpdate(sess1, vtgatepb.TransactionMode_SINGLE)
require.NotNil(t, ss)
require.False(t, ss.VindexOnly)
require.EqualValues(t, 1, ss.TabletAlias.Uid)

// shard session s0 for normal query
_, err = session.FindAndChangeSessionIfInSingleTxMode(
"keyspace",
"0",
topodatapb.TabletType_UNKNOWN,
vtgatepb.TransactionMode_SINGLE)
require.Error(t, err)
}

Expand Down
Loading

0 comments on commit 74f48d4

Please sign in to comment.