Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
129747: raft: export log mark type r=sumeerbhola a=pav-kv

The `LogMark` type will be used for log writes processing in the client code of `raft` package, instead of ad-hoc term/index pairs. The type documents the meaning of "leader term" and underlines the difference with the "entry term". Some raft APIs will be converted to expose this type too.

Epic: none
Release note: none

129749: crosscluster/physical: defensively check errCh in span config event stream r=dt a=msbutler

Select does not choose which channel to read from based on some order, but the span config event stream must return as soon as there is an error. For this reason, this patch rechecks the errCh after select chooses to read from the data channel.

Informs #128865

Release note: none

Co-authored-by: Pavel Kalinnikov <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
  • Loading branch information
3 people committed Aug 27, 2024
3 parents 18795ed + baeb4ec + 8d6de3b commit f53a4ca
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 74 deletions.
7 changes: 6 additions & 1 deletion pkg/ccl/crosscluster/producer/span_config_event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,12 @@ func (s *spanConfigEventStream) Next(ctx context.Context) (bool, error) {
case err := <-s.errCh:
return false, err
case s.data = <-s.streamCh:
return true, nil
select {
case err := <-s.errCh:
return false, err
default:
return true, nil
}
}
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,19 +349,19 @@ func (l *raftLog) lastIndex() uint64 {

// commitTo bumps the commit index to the given value if it is higher than the
// current commit index.
func (l *raftLog) commitTo(mark logMark) {
func (l *raftLog) commitTo(mark LogMark) {
// TODO(pav-kv): it is only safe to update the commit index if our log is
// consistent with the mark.term leader. If the mark.term leader sees the
// mark.index entry as committed, all future leaders have it in the log. It is
// thus safe to bump the commit index to min(mark.index, lastIndex) if our
// accTerm >= mark.term. Do this once raftLog/unstable tracks the accTerm.

// never decrease commit
if l.committed < mark.index {
if l.lastIndex() < mark.index {
l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", mark.index, l.lastIndex())
if l.committed < mark.Index {
if l.lastIndex() < mark.Index {
l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", mark.Index, l.lastIndex())
}
l.committed = mark.index
l.committed = mark.Index
}
}

Expand Down Expand Up @@ -400,7 +400,7 @@ func (l *raftLog) acceptApplying(i uint64, size entryEncodingSize, allowUnstable
i < l.maxAppliableIndex(allowUnstable)
}

func (l *raftLog) stableTo(mark logMark) { l.unstable.stableTo(mark) }
func (l *raftLog) stableTo(mark LogMark) { l.unstable.stableTo(mark) }

func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }

Expand Down
64 changes: 32 additions & 32 deletions pkg/raft/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,8 @@ func TestHasNextCommittedEnts(t *testing.T) {
require.True(t, raftLog.append(init))
require.NoError(t, storage.Append(init.entries[:1]))

raftLog.stableTo(logMark{term: init.term, index: 4})
raftLog.commitTo(logMark{term: init.term, index: 5})
raftLog.stableTo(LogMark{Term: init.term, Index: 4})
raftLog.commitTo(LogMark{Term: init.term, Index: 5})
raftLog.appliedTo(tt.applied, 0 /* size */)
raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable)
raftLog.applyingEntsPaused = tt.paused
Expand Down Expand Up @@ -390,8 +390,8 @@ func TestNextCommittedEnts(t *testing.T) {
require.True(t, raftLog.append(init))
require.NoError(t, storage.Append(init.entries[:1]))

raftLog.stableTo(logMark{term: init.term, index: 4})
raftLog.commitTo(logMark{term: init.term, index: 5})
raftLog.stableTo(LogMark{Term: init.term, Index: 4})
raftLog.commitTo(LogMark{Term: init.term, Index: 5})
raftLog.appliedTo(tt.applied, 0 /* size */)
raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable)
raftLog.applyingEntsPaused = tt.paused
Expand Down Expand Up @@ -444,8 +444,8 @@ func TestAcceptApplying(t *testing.T) {
require.True(t, raftLog.append(init))
require.NoError(t, storage.Append(init.entries[:1]))

raftLog.stableTo(logMark{term: init.term, index: 4})
raftLog.commitTo(logMark{term: init.term, index: 5})
raftLog.stableTo(LogMark{Term: init.term, Index: 4})
raftLog.commitTo(LogMark{Term: init.term, Index: 5})
raftLog.appliedTo(3, 0 /* size */)

raftLog.acceptApplying(tt.index, tt.size, tt.allowUnstable)
Expand Down Expand Up @@ -488,8 +488,8 @@ func TestAppliedTo(t *testing.T) {
require.True(t, raftLog.append(init))
require.NoError(t, storage.Append(init.entries[:1]))

raftLog.stableTo(logMark{term: init.term, index: 4})
raftLog.commitTo(logMark{term: init.term, index: 5})
raftLog.stableTo(LogMark{Term: init.term, Index: 4})
raftLog.commitTo(LogMark{Term: init.term, Index: 5})
raftLog.appliedTo(3, 0 /* size */)
raftLog.acceptApplying(5, maxSize+overshoot, false /* allowUnstable */)

Expand Down Expand Up @@ -536,13 +536,13 @@ func TestCommitTo(t *testing.T) {
init := entryID{}.append(1, 2, 3)
commit := uint64(2)
for _, tt := range []struct {
commit logMark
commit LogMark
want uint64
panic bool
}{
{commit: logMark{term: 3, index: 3}, want: 3},
{commit: logMark{term: 3, index: 2}, want: 2}, // commit does not regress
{commit: logMark{term: 3, index: 4}, panic: true}, // commit out of range -> panic
{commit: LogMark{Term: 3, Index: 3}, want: 3},
{commit: LogMark{Term: 3, Index: 2}, want: 2}, // commit does not regress
{commit: LogMark{Term: 3, Index: 4}, panic: true}, // commit out of range -> panic
// TODO(pav-kv): add commit marks with a different term.
} {
t.Run("", func(t *testing.T) {
Expand All @@ -563,18 +563,18 @@ func TestCommitTo(t *testing.T) {
func TestStableTo(t *testing.T) {
init := entryID{}.append(1, 2)
for _, tt := range []struct {
mark logMark
mark LogMark
want uint64 // prev.index
}{
// out of bounds
{mark: logMark{term: 2, index: 0}, want: 0},
{mark: logMark{term: 2, index: 3}, want: 0},
{mark: LogMark{Term: 2, Index: 0}, want: 0},
{mark: LogMark{Term: 2, Index: 3}, want: 0},
// outdated accepted term
{mark: logMark{term: 1, index: 1}, want: 0},
{mark: logMark{term: 1, index: 2}, want: 0},
{mark: LogMark{Term: 1, Index: 1}, want: 0},
{mark: LogMark{Term: 1, Index: 2}, want: 0},
// successful acknowledgements
{mark: logMark{term: 2, index: 1}, want: 1},
{mark: logMark{term: 2, index: 2}, want: 2},
{mark: LogMark{Term: 2, Index: 1}, want: 1},
{mark: LogMark{Term: 2, Index: 2}, want: 2},
} {
t.Run("", func(t *testing.T) {
raftLog := newLog(NewMemoryStorage(), discardLogger)
Expand All @@ -590,24 +590,24 @@ func TestStableToWithSnap(t *testing.T) {
snap := pb.Snapshot{Metadata: pb.SnapshotMetadata{Term: snapID.term, Index: snapID.index}}
for _, tt := range []struct {
sl logSlice
to logMark
to LogMark
want uint64 // prev.index
}{
// out of bounds
{sl: snapID.append(), to: logMark{term: 1, index: 2}, want: 5},
{sl: snapID.append(), to: logMark{term: 2, index: 6}, want: 5},
{sl: snapID.append(), to: logMark{term: 2, index: 7}, want: 5},
{sl: snapID.append(6, 6, 8), to: logMark{term: 2, index: 4}, want: 5},
{sl: snapID.append(6, 6, 8), to: logMark{term: 2, index: 10}, want: 5},
{sl: snapID.append(), to: LogMark{Term: 1, Index: 2}, want: 5},
{sl: snapID.append(), to: LogMark{Term: 2, Index: 6}, want: 5},
{sl: snapID.append(), to: LogMark{Term: 2, Index: 7}, want: 5},
{sl: snapID.append(6, 6, 8), to: LogMark{Term: 2, Index: 4}, want: 5},
{sl: snapID.append(6, 6, 8), to: LogMark{Term: 2, Index: 10}, want: 5},
// successful acknowledgements
{sl: snapID.append(6, 6, 8), to: logMark{term: 8, index: 5}, want: 5},
{sl: snapID.append(6, 6, 8), to: logMark{term: 8, index: 6}, want: 6},
{sl: snapID.append(6, 6, 8), to: logMark{term: 8, index: 7}, want: 7},
{sl: snapID.append(6, 6, 8), to: logMark{term: 8, index: 8}, want: 8},
{sl: snapID.append(6, 6, 8), to: LogMark{Term: 8, Index: 5}, want: 5},
{sl: snapID.append(6, 6, 8), to: LogMark{Term: 8, Index: 6}, want: 6},
{sl: snapID.append(6, 6, 8), to: LogMark{Term: 8, Index: 7}, want: 7},
{sl: snapID.append(6, 6, 8), to: LogMark{Term: 8, Index: 8}, want: 8},
// mismatching accepted term
{sl: snapID.append(6, 6, 8), to: logMark{term: 3, index: 6}, want: 5},
{sl: snapID.append(6, 6, 8), to: logMark{term: 3, index: 7}, want: 5},
{sl: snapID.append(6, 6, 8), to: logMark{term: 3, index: 8}, want: 5},
{sl: snapID.append(6, 6, 8), to: LogMark{Term: 3, Index: 6}, want: 5},
{sl: snapID.append(6, 6, 8), to: LogMark{Term: 3, Index: 7}, want: 5},
{sl: snapID.append(6, 6, 8), to: LogMark{Term: 3, Index: 8}, want: 5},
} {
t.Run("", func(t *testing.T) {
s := NewMemoryStorage()
Expand Down
18 changes: 9 additions & 9 deletions pkg/raft/log_unstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,34 +172,34 @@ func (u *unstable) acceptInProgress() {
//
// The method makes sure the entries can not be overwritten by an in-progress
// log append. See the related comment in newStorageAppendRespMsg.
func (u *unstable) stableTo(mark logMark) {
if mark.term != u.term {
func (u *unstable) stableTo(mark LogMark) {
if mark.Term != u.term {
// The last accepted term has changed. Ignore. This is possible if part or
// all of the unstable log was replaced between that time that a set of
// entries started to be written to stable storage and when they finished.
u.logger.Infof("mark (term,index)=(%d,%d) mismatched the last accepted "+
"term %d in unstable log; ignoring ", mark.term, mark.index, u.term)
"term %d in unstable log; ignoring ", mark.Term, mark.Index, u.term)
return
}
if u.snapshot != nil && mark.index == u.snapshot.Metadata.Index {
if u.snapshot != nil && mark.Index == u.snapshot.Metadata.Index {
// Index matched unstable snapshot, not unstable entry. Ignore.
u.logger.Infof("entry at index %d matched unstable snapshot; ignoring", mark.index)
u.logger.Infof("entry at index %d matched unstable snapshot; ignoring", mark.Index)
return
}
if mark.index <= u.prev.index || mark.index > u.lastIndex() {
if mark.Index <= u.prev.index || mark.Index > u.lastIndex() {
// Unstable entry missing. Ignore.
u.logger.Infof("entry at index %d missing from unstable log; ignoring", mark.index)
u.logger.Infof("entry at index %d missing from unstable log; ignoring", mark.Index)
return
}
if u.snapshot != nil {
u.logger.Panicf("mark %+v acked earlier than the snapshot(in-progress=%t): %s",
mark, u.snapshotInProgress, DescribeSnapshot(*u.snapshot))
}
u.logSlice = u.forward(mark.index)
u.logSlice = u.forward(mark.Index)
// TODO(pav-kv): why can mark.index overtake u.entryInProgress? Probably bugs
// in tests using the log writes incorrectly, e.g. TestLeaderStartReplication
// takes nextUnstableEnts() without acceptInProgress().
u.entryInProgress = max(u.entryInProgress, mark.index)
u.entryInProgress = max(u.entryInProgress, mark.Index)
u.shrinkEntriesArray()
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/raft/log_unstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func TestUnstableStableTo(t *testing.T) {
u.stableSnapTo(u.snapshot.Metadata.Index)
}
u.checkInvariants(t)
u.stableTo(logMark{term: tt.term, index: tt.index})
u.stableTo(LogMark{Term: tt.term, Index: tt.index})
u.checkInvariants(t)
require.Equal(t, tt.wprev, u.prev.index)
require.Equal(t, tt.wentryInProgress, u.entryInProgress)
Expand Down
10 changes: 5 additions & 5 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,7 @@ func (r *raft) maybeCommit() bool {
if !r.raftLog.matchTerm(entryID{term: r.Term, index: index}) {
return false
}
r.raftLog.commitTo(logMark{term: r.Term, index: index})
r.raftLog.commitTo(LogMark{Term: r.Term, Index: index})
return true
}

Expand Down Expand Up @@ -1278,7 +1278,7 @@ func (r *raft) Step(m pb.Message) error {
r.appliedSnap(m.Snapshot)
}
if m.Index != 0 {
r.raftLog.stableTo(logMark{term: m.LogTerm, index: m.Index})
r.raftLog.stableTo(LogMark{Term: m.LogTerm, Index: m.Index})
}

case pb.MsgStorageApplyResp:
Expand Down Expand Up @@ -1881,7 +1881,7 @@ func (r *raft) handleAppendEntries(m pb.Message) {
// committed entries at m.Term (by raft invariants), so it is safe to bump
// the commit index even if the MsgApp is stale.
lastIndex := a.lastIndex()
r.raftLog.commitTo(logMark{term: m.Term, index: min(m.Commit, lastIndex)})
r.raftLog.commitTo(LogMark{Term: m.Term, Index: min(m.Commit, lastIndex)})
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: lastIndex,
Commit: r.raftLog.committed})
return
Expand Down Expand Up @@ -1959,8 +1959,8 @@ func (r *raft) handleHeartbeat(m pb.Message) {
// commit index if accTerm >= m.Term.
// TODO(pav-kv): move this logic to raftLog.commitTo, once the accTerm has
// migrated to raftLog/unstable.
mark := logMark{term: m.Term, index: min(m.Commit, r.raftLog.lastIndex())}
if mark.term == r.raftLog.accTerm() {
mark := LogMark{Term: m.Term, Index: min(m.Commit, r.raftLog.lastIndex())}
if mark.Term == r.raftLog.accTerm() {
r.raftLog.commitTo(mark)
}
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp})
Expand Down
6 changes: 3 additions & 3 deletions pkg/raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1158,7 +1158,7 @@ func TestHandleHeartbeat(t *testing.T) {
require.NoError(t, storage.Append(init.entries))
sm := newTestRaft(1, 5, 1, storage)
sm.becomeFollower(init.term, 2)
sm.raftLog.commitTo(logMark{term: init.term, index: commit})
sm.raftLog.commitTo(LogMark{Term: init.term, Index: commit})
sm.handleHeartbeat(tt.m)
assert.Equal(t, tt.wCommit, sm.raftLog.committed, "#%d", i)
m := sm.readMessages()
Expand All @@ -1175,7 +1175,7 @@ func TestHandleHeartbeatResp(t *testing.T) {
sm := newTestRaft(1, 5, 1, storage)
sm.becomeCandidate()
sm.becomeLeader()
sm.raftLog.commitTo(logMark{term: 3, index: sm.raftLog.lastIndex()})
sm.raftLog.commitTo(LogMark{Term: 3, Index: sm.raftLog.lastIndex()})

// A heartbeat response from a node that is behind; re-send MsgApp
sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
Expand Down Expand Up @@ -2359,7 +2359,7 @@ func TestRestoreIgnoreSnapshot(t *testing.T) {
storage := newTestMemoryStorage(withPeers(1, 2))
sm := newTestRaft(1, 10, 1, storage)
require.True(t, sm.raftLog.append(init))
sm.raftLog.commitTo(logMark{term: init.term, index: commit})
sm.raftLog.commitTo(LogMark{Term: init.term, Index: commit})

s := snapshot{
term: 1,
Expand Down
30 changes: 15 additions & 15 deletions pkg/raft/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,21 @@ func pbEntryID(entry *pb.Entry) entryID {
return entryID{term: entry.Term, index: entry.Index}
}

// logMark is a position in a log consistent with the leader at a specific term.
// LogMark is a position in a log consistent with the leader at a specific term.
//
// This is different from entryID. The entryID ties an entry to the term of the
// leader who proposed it, while the logMark identifies an entry in a particular
// leader who proposed it, while the LogMark identifies an entry in a particular
// leader's coordinate system. Different leaders can have different entries at a
// particular index.
//
// Generally, all entries in raft form a tree (branching when a new leader
// starts proposing entries at its term). A logMark identifies a position in a
// starts proposing entries at its term). A LogMark identifies a position in a
// particular branch of this tree.
type logMark struct {
// term is the term of the leader whose log is considered.
term uint64
// index is the position in this leader's log.
index uint64
type LogMark struct {
// Term is the term of the leader whose log is considered.
Term uint64
// Index is the position in this leader's log.
Index uint64
}

// logSlice describes a correct slice of a raft log.
Expand Down Expand Up @@ -108,9 +108,9 @@ func (s logSlice) lastEntryID() entryID {
return s.prev
}

// mark returns the logMark identifying the end of this logSlice.
func (s logSlice) mark() logMark {
return logMark{term: s.term, index: s.lastIndex()}
// mark returns the LogMark identifying the end of this logSlice.
func (s logSlice) mark() LogMark {
return LogMark{Term: s.term, Index: s.lastIndex()}
}

// termAt returns the term of the entry at the given index.
Expand Down Expand Up @@ -153,7 +153,7 @@ func (s logSlice) valid() error {
// observed this committed state.
//
// Semantically, from the log perspective, this type is equivalent to a logSlice
// from 0 to lastEntryID(), plus a commit logMark. All leader logs at terms >=
// from 0 to lastEntryID(), plus a commit LogMark. All leader logs at terms >=
// snapshot.term contain all entries up to the lastEntryID(). At earlier terms,
// logs may or may not be consistent with this snapshot, depending on whether
// they contain the lastEntryID().
Expand Down Expand Up @@ -186,10 +186,10 @@ func (s snapshot) lastEntryID() entryID {
return entryID{term: s.snap.Metadata.Term, index: s.snap.Metadata.Index}
}

// mark returns committed logMark of this snapshot, in the coordinate system of
// mark returns committed LogMark of this snapshot, in the coordinate system of
// the leader who observes this committed state.
func (s snapshot) mark() logMark {
return logMark{term: s.term, index: s.snap.Metadata.Index}
func (s snapshot) mark() LogMark {
return LogMark{Term: s.term, Index: s.snap.Metadata.Index}
}

// valid returns nil iff the snapshot is well-formed.
Expand Down
4 changes: 2 additions & 2 deletions pkg/raft/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestLogSlice(t *testing.T) {
last := s.lastEntryID()
require.Equal(t, tt.last, last)
require.Equal(t, last.index, s.lastIndex())
require.Equal(t, logMark{term: tt.term, index: last.index}, s.mark())
require.Equal(t, LogMark{Term: tt.term, Index: last.index}, s.mark())

require.Equal(t, tt.prev.term, s.termAt(tt.prev.index))
for _, e := range tt.entries {
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestSnapshot(t *testing.T) {
last := s.lastEntryID()
require.Equal(t, tt.last, last)
require.Equal(t, last.index, s.lastIndex())
require.Equal(t, logMark{term: tt.term, index: last.index}, s.mark())
require.Equal(t, LogMark{Term: tt.term, Index: last.index}, s.mark())
})
}
}

0 comments on commit f53a4ca

Please sign in to comment.