Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - fetch: smarter peer selection #6477

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"time"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
Expand Down Expand Up @@ -297,7 +298,16 @@
// there is one test that covers this part.
if host != nil {
connectedf := func(peer p2p.Peer) {
if f.peers.Add(peer) {
protocols := func() []protocol.ID {
ps, err := host.Peerstore().GetProtocols(peer)
if err != nil {
f.logger.Debug("failed to get protocols for peer",
zap.Stringer("id", peer), zap.Error(err))
return nil
}

Check warning on line 307 in fetch/fetch.go

View check run for this annotation

Codecov / codecov/patch

fetch/fetch.go#L304-L307

Added lines #L304 - L307 were not covered by tests
return ps
}
if f.peers.Add(peer, protocols) {
f.logger.Debug("adding peer", zap.Stringer("id", peer))
}
}
Expand Down Expand Up @@ -703,7 +713,9 @@
rng := rand.New(rand.NewChaCha8(seed))
peer2requests := make(map[p2p.Peer][]RequestMessage)

best := f.peers.SelectBest(RedundantPeers)
// When selecting peers, provide protocol IDs so that peers that aren't yet fully
// initialized are not picked for the request, avoiding unnecessary errors.
best := f.peers.SelectBestWithProtocols(RedundantPeers, []protocol.ID{hashProtocol, activeSetProtocol})
if len(best) == 0 {
f.logger.Warn("cannot send batch: no peers found")
f.mu.Lock()
Expand Down
10 changes: 7 additions & 3 deletions fetch/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
Expand Down Expand Up @@ -187,7 +188,9 @@ func TestFetch_RequestHashBatchFromPeers(t *testing.T) {
f := createFetch(t)
f.cfg.MaxRetriesForRequest = 0
peer := p2p.Peer("buddy")
f.peers.Add(peer)
f.peers.Add(peer, func() []protocol.ID {
return []protocol.ID{hashProtocol, activeSetProtocol}
})

hsh0 := types.RandomHash()
res0 := ResponseMessage{
Expand Down Expand Up @@ -259,8 +262,9 @@ func TestFetch_Loop_BatchRequestMax(t *testing.T) {
f.cfg.BatchTimeout = 1
f.cfg.BatchSize = 2
peer := p2p.Peer("buddy")
f.peers.Add(peer)

f.peers.Add(peer, func() []protocol.ID {
return []protocol.ID{hashProtocol, activeSetProtocol}
})
h1 := types.RandomHash()
h2 := types.RandomHash()
h3 := types.RandomHash()
Expand Down
9 changes: 7 additions & 2 deletions fetch/mesh_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"

p2phost "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/protocol"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -137,7 +138,9 @@ func TestFetch_getHashes(t *testing.T) {
f.Start()
tb.Cleanup(f.Stop)
for _, peer := range peers {
f.peers.Add(peer)
f.peers.Add(peer, func() []protocol.ID {
return []protocol.ID{hashProtocol, activeSetProtocol}
})
}
f.mh.EXPECT().ID().Return("self").AnyTimes()
f.RegisterPeerHashes(peers[0], hashes[:2])
Expand Down Expand Up @@ -249,7 +252,9 @@ func TestFetch_getHashesStreaming(t *testing.T) {
f.Start()
tb.Cleanup(f.Stop)
for _, peer := range peers {
f.peers.Add(peer)
f.peers.Add(peer, func() []protocol.ID {
return []protocol.ID{hashProtocol, activeSetProtocol}
})
}
f.mh.EXPECT().ID().Return("self").AnyTimes()
f.RegisterPeerHashes(peers[0], hashes[:2])
Expand Down
34 changes: 32 additions & 2 deletions fetch/peers/peers.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package peers

import (
"slices"
fasmat marked this conversation as resolved.
Show resolved Hide resolved
"strings"
"sync"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"go.uber.org/zap/zapcore"

"github.com/spacemeshos/go-spacemesh/p2p"
Expand All @@ -16,6 +18,7 @@ type data struct {
success, failures int
failRate float64
averageLatency float64
protocols func() []protocol.ID
}

func (d *data) latency(global float64) float64 {
Expand Down Expand Up @@ -61,14 +64,14 @@ func (p *Peers) Contains(id peer.ID) bool {
return exist
}

func (p *Peers) Add(id peer.ID) bool {
func (p *Peers) Add(id peer.ID, protocols func() []protocol.ID) bool {
p.mu.Lock()
defer p.mu.Unlock()
_, exist := p.peers[id]
if exist {
return false
}
p.peers[id] = &data{id: id}
p.peers[id] = &data{id: id, protocols: protocols}
return true
}

Expand Down Expand Up @@ -151,12 +154,39 @@ func (p *Peers) SelectBestFrom(peers []peer.ID) peer.ID {
func (p *Peers) SelectBest(n int) []peer.ID {
p.mu.Lock()
defer p.mu.Unlock()
return p.selectBest(n, nil)
}

// SelectBestWithProtocols is similar to SelectBest but filters peers by supported protocols.
// If protocols is empty, it returns the best peers regardless of the protocol.
// If protocols is not empty, it returns the best peers that support at least one of the protocols.
func (p *Peers) SelectBestWithProtocols(n int, protocols []protocol.ID) []peer.ID {
Comment on lines +160 to +163
Copy link
Member

@fasmat fasmat Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not fully understanding this - why only at least one protocol needs to be supported? The only place outside of tests where this is used activeSetProtocol and hashProtocol are requested, aren't both needed?

Copy link
Contributor Author

@ivan4th ivan4th Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is that there may be multiple versions of syncv2 protocol deployed at some point in time: sync/2, sync/2.1 and so on. When choosing peers for multipeer sync, any of such nodes will do, it's just that different protocols will be in use, later versions being more traffic-efficient etc.

For the purpose of hash fetching, the requirement may be just one of activeSetProtocol / hashProtocol or all of them, depending on which IDs are being fetched. It maybe could make some sense to invent a more complex logic for paritioning the set of hashes to download by the protocol etc., but in practice the servers for activeSetProtocol and hashProtocol are enabled almost at the same instant at the end of the node initialization, and in a very unlikely case of a race the worst thing that may happen is current go-spacemesh behavior, that is, failed fetch which will be retried later, with failed peer getting lower priority when doing peer selection. So, in case of hash fetching, from practical standpoint it doesn't really matter whether we pass activeSetProtocol, hashProtocol or both to this function.

p.mu.Lock()
defer p.mu.Unlock()
return p.selectBest(n, protocols)
}

func (p *Peers) selectBest(n int, protocols []protocol.ID) []peer.ID {
slices.Sort(protocols)
protocols = slices.Compact(protocols)
lth := min(len(p.peers), n)
if lth == 0 {
return nil
}
best := make([]*data, 0, lth)
for _, peer := range p.peers {
if len(protocols) > 0 {
found := false
for _, proto := range peer.protocols() {
if slices.Contains(protocols, proto) {
found = true
break
}
}
if !found {
continue
}
}
for i := range best {
if peer.less(best[i], p.globalLatency) {
best[i], peer = peer, best[i]
Expand Down
71 changes: 70 additions & 1 deletion fetch/peers/peers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/stretchr/testify/require"
)

Expand All @@ -22,6 +23,7 @@ type event struct {
success int
failure int
latency time.Duration
protocols []protocol.ID
}

func withEvents(events []event) *Peers {
Expand All @@ -30,7 +32,7 @@ func withEvents(events []event) *Peers {
if ev.delete {
tracker.Delete(ev.id)
} else if ev.add {
tracker.Add(ev.id)
tracker.Add(ev.id, func() []protocol.ID { return ev.protocols })
}
for i := 0; i < ev.failure; i++ {
tracker.OnFailure(ev.id, 0, ev.latency)
Expand Down Expand Up @@ -210,6 +212,73 @@ func TestSelect(t *testing.T) {
}
}

func TestSelectBestWithProtocols(t *testing.T) {
for _, tc := range []struct {
desc string
events []event

n int
protocols []protocol.ID
expect []peer.ID
}{
{
desc: "no protocols required and no peer protocols",
events: []event{
{id: "a", success: 1, latency: 8, add: true},
{id: "b", success: 1, latency: 9, add: true},
{id: "c", success: 3, latency: 14, add: true},
},
n: 2,
expect: []peer.ID{"a", "b"},
protocols: nil,
},
{
desc: "no protocols required, peers have protocols",
events: []event{
{id: "a", success: 1, latency: 8, add: true, protocols: []protocol.ID{"a", "b"}},
{id: "b", success: 1, latency: 9, add: true, protocols: []protocol.ID{"b", "c"}},
{id: "c", success: 3, latency: 14, add: true, protocols: []protocol.ID{"c", "d"}},
},
n: 2,
expect: []peer.ID{"a", "b"},
protocols: nil,
},
{
desc: "single protocol required, peers have protocols",
events: []event{
{id: "a", success: 1, latency: 8, add: true, protocols: []protocol.ID{"a", "b"}},
{id: "b", success: 1, latency: 9, add: true, protocols: []protocol.ID{"b", "c"}},
{id: "c", success: 3, latency: 14, add: true, protocols: []protocol.ID{"c", "d"}},
},
n: 2,
expect: []peer.ID{"b", "c"},
protocols: []protocol.ID{"c"},
},
{
desc: "multiple protocols required, peers have protocols",
events: []event{
{id: "a", success: 1, latency: 8, add: true, protocols: []protocol.ID{"a", "b"}},
{id: "b", success: 1, latency: 9, add: true, protocols: []protocol.ID{"b", "c"}},
{id: "c", success: 3, latency: 14, add: true, protocols: []protocol.ID{"c", "d"}},
{id: "d", success: 3, latency: 12, add: true, protocols: []protocol.ID{"a", "e"}},
},
n: 3,
expect: []peer.ID{"a", "b", "c"},
protocols: []protocol.ID{"b", "c"},
},
} {
t.Run(tc.desc, func(t *testing.T) {
require.Equal(
t,
tc.expect,
withEvents(tc.events).SelectBestWithProtocols(tc.n, tc.protocols),
"select best %d",
tc.n,
)
})
}
}

func TestTotal(t *testing.T) {
const total = 100
events := []event{}
Expand Down
7 changes: 6 additions & 1 deletion sync2/multipeer/multipeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/jonboulle/clockwork"
"github.com/libp2p/go-libp2p/core/protocol"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

Expand All @@ -15,6 +16,10 @@ import (
"github.com/spacemeshos/go-spacemesh/sync2/rangesync"
)

const (
Protocol = "sync/2"
)

type syncability struct {
// peers that were probed successfully
syncable []p2p.Peer
Expand Down Expand Up @@ -283,7 +288,7 @@ func (mpr *MultiPeerReconciler) fullSync(ctx context.Context, syncPeers []p2p.Pe
func (mpr *MultiPeerReconciler) syncOnce(ctx context.Context, lastWasSplit bool) (full bool, err error) {
var s syncability
for {
syncPeers := mpr.peers.SelectBest(mpr.cfg.SyncPeerCount)
syncPeers := mpr.peers.SelectBestWithProtocols(mpr.cfg.SyncPeerCount, []protocol.ID{Protocol})
mpr.logger.Debug("selected best peers for sync",
zap.Int("syncPeerCount", mpr.cfg.SyncPeerCount),
zap.Int("totalPeers", mpr.peers.Total()),
Expand Down
3 changes: 2 additions & 1 deletion sync2/multipeer/multipeer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/jonboulle/clockwork"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"go.uber.org/zap/zaptest"
Expand Down Expand Up @@ -101,7 +102,7 @@ func (mt *multiPeerSyncTester) addPeers(n int) []p2p.Peer {
r := make([]p2p.Peer, n)
for i := 0; i < n; i++ {
p := p2p.Peer(fmt.Sprintf("peer%d", i+1))
mt.peers.Add(p)
mt.peers.Add(p, func() []protocol.ID { return []protocol.ID{multipeer.Protocol} })
r[i] = p
}
return r
Expand Down
3 changes: 2 additions & 1 deletion sync2/multipeer/split_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/jonboulle/clockwork"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/stretchr/testify/require"
gomock "go.uber.org/mock/gomock"
"go.uber.org/zap/zaptest"
Expand Down Expand Up @@ -108,7 +109,7 @@ func newTestSplitSync(t testing.TB) *splitSyncTester {
AnyTimes()
}
for _, p := range tst.syncPeers {
tst.peers.Add(p)
tst.peers.Add(p, func() []protocol.ID { return []protocol.ID{multipeer.Protocol} })
}
tst.splitSync = multipeer.NewSplitSync(
zaptest.NewLogger(t),
Expand Down
6 changes: 5 additions & 1 deletion sync2/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/libp2p/go-libp2p/core/protocol"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/p2p/server"
"github.com/spacemeshos/go-spacemesh/sync2"
"github.com/spacemeshos/go-spacemesh/sync2/multipeer"
"github.com/spacemeshos/go-spacemesh/sync2/rangesync"
)

Expand Down Expand Up @@ -88,7 +90,9 @@ func TestP2P(t *testing.T) {
ps := peers.New()
for m := 0; m < numNodes; m++ {
if m != n {
ps.Add(mesh.Hosts()[m].ID())
ps.Add(mesh.Hosts()[m].ID(), func() []protocol.ID {
return []protocol.ID{multipeer.Protocol}
})
}
}
cfg := sync2.DefaultConfig()
Expand Down
Loading