From 6dde162706f13362f5f7634480c4f072392ba0d6 Mon Sep 17 00:00:00 2001 From: Ivan Shvedunov Date: Wed, 20 Nov 2024 18:53:22 +0400 Subject: [PATCH 1/5] fetch: smarter peer selection Add the ability to select the best peers by their supported protocols, which is used by syncv2. When doing hash requests, only query peers that support at least one of the hs/1 and as/1 protocols. This avoids querying peers that haven't finished their initialization and thus haven't started their fetch servers yet. --- fetch/fetch.go | 16 ++++++- fetch/fetch_test.go | 10 +++-- fetch/mesh_data_test.go | 9 +++- fetch/peers/peers.go | 38 +++++++++++++++- fetch/peers/peers_test.go | 71 +++++++++++++++++++++++++++++- sync2/multipeer/multipeer.go | 7 ++- sync2/multipeer/multipeer_test.go | 3 +- sync2/multipeer/split_sync_test.go | 3 +- sync2/p2p_test.go | 6 ++- 9 files changed, 149 insertions(+), 14 deletions(-) diff --git a/fetch/fetch.go b/fetch/fetch.go index 7517dcc01a..de5afcebc5 100644 --- a/fetch/fetch.go +++ b/fetch/fetch.go @@ -12,6 +12,7 @@ import ( "time" "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/protocol" "go.uber.org/zap" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" @@ -297,7 +298,16 @@ func NewFetch( // there is one test that covers this part. 
if host != nil { connectedf := func(peer p2p.Peer) { - if f.peers.Add(peer) { + protocols := func() []protocol.ID { + ps, err := host.Peerstore().GetProtocols(peer) + if err != nil { + f.logger.Debug("failed to get protocols for peer", + zap.Stringer("id", peer), zap.Error(err)) + return nil + } + return ps + } + if f.peers.Add(peer, protocols) { f.logger.Debug("adding peer", zap.Stringer("id", peer)) } } @@ -703,7 +713,9 @@ func (f *Fetch) organizeRequests(requests []RequestMessage) map[p2p.Peer][]*batc rng := rand.New(rand.NewChaCha8(seed)) peer2requests := make(map[p2p.Peer][]RequestMessage) - best := f.peers.SelectBest(RedundantPeers) + // When selecting peers, provide protocol IDs so that peers that aren't yet fully + // initialized are not picked for the request, avoiding unnecessary errors. + best := f.peers.SelectBestWithProtocols(RedundantPeers, []protocol.ID{hashProtocol, activeSetProtocol}) if len(best) == 0 { f.logger.Warn("cannot send batch: no peers found") f.mu.Lock() diff --git a/fetch/fetch_test.go b/fetch/fetch_test.go index 50e81d9fe7..84cf24125b 100644 --- a/fetch/fetch_test.go +++ b/fetch/fetch_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" @@ -187,7 +188,9 @@ func TestFetch_RequestHashBatchFromPeers(t *testing.T) { f := createFetch(t) f.cfg.MaxRetriesForRequest = 0 peer := p2p.Peer("buddy") - f.peers.Add(peer) + f.peers.Add(peer, func() []protocol.ID { + return []protocol.ID{hashProtocol, activeSetProtocol} + }) hsh0 := types.RandomHash() res0 := ResponseMessage{ @@ -259,8 +262,9 @@ func TestFetch_Loop_BatchRequestMax(t *testing.T) { f.cfg.BatchTimeout = 1 f.cfg.BatchSize = 2 peer := p2p.Peer("buddy") - f.peers.Add(peer) - + f.peers.Add(peer, func() []protocol.ID { + return []protocol.ID{hashProtocol, activeSetProtocol} + }) h1 := types.RandomHash() h2 := 
types.RandomHash() h3 := types.RandomHash() diff --git a/fetch/mesh_data_test.go b/fetch/mesh_data_test.go index 713c96a7b7..918503a5c6 100644 --- a/fetch/mesh_data_test.go +++ b/fetch/mesh_data_test.go @@ -8,6 +8,7 @@ import ( "testing" p2phost "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/protocol" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -137,7 +138,9 @@ func TestFetch_getHashes(t *testing.T) { f.Start() tb.Cleanup(f.Stop) for _, peer := range peers { - f.peers.Add(peer) + f.peers.Add(peer, func() []protocol.ID { + return []protocol.ID{hashProtocol, activeSetProtocol} + }) } f.mh.EXPECT().ID().Return("self").AnyTimes() f.RegisterPeerHashes(peers[0], hashes[:2]) @@ -249,7 +252,9 @@ func TestFetch_getHashesStreaming(t *testing.T) { f.Start() tb.Cleanup(f.Stop) for _, peer := range peers { - f.peers.Add(peer) + f.peers.Add(peer, func() []protocol.ID { + return []protocol.ID{hashProtocol, activeSetProtocol} + }) } f.mh.EXPECT().ID().Return("self").AnyTimes() f.RegisterPeerHashes(peers[0], hashes[:2]) diff --git a/fetch/peers/peers.go b/fetch/peers/peers.go index 91d581c474..b13fa20cef 100644 --- a/fetch/peers/peers.go +++ b/fetch/peers/peers.go @@ -6,6 +6,7 @@ import ( "time" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" "go.uber.org/zap/zapcore" "github.com/spacemeshos/go-spacemesh/p2p" @@ -16,6 +17,7 @@ type data struct { success, failures int failRate float64 averageLatency float64 + protocols func() []protocol.ID } func (d *data) latency(global float64) float64 { @@ -61,14 +63,14 @@ func (p *Peers) Contains(id peer.ID) bool { return exist } -func (p *Peers) Add(id peer.ID) bool { +func (p *Peers) Add(id peer.ID, protocols func() []protocol.ID) bool { p.mu.Lock() defer p.mu.Unlock() _, exist := p.peers[id] if exist { return false } - p.peers[id] = &data{id: id} + p.peers[id] = &data{id: id, protocols: 
protocols} return true } @@ -151,12 +153,44 @@ func (p *Peers) SelectBestFrom(peers []peer.ID) peer.ID { func (p *Peers) SelectBest(n int) []peer.ID { p.mu.Lock() defer p.mu.Unlock() + return p.selectBest(n, nil) +} + +// SelectBestWithProtocols is similar to SelectBest but filters peers by supported protocols. +// If protocols is empty, it returns the best peers regardless of the protocol. +// If protocols is not empty, it returns the best peers that support at least one of the protocols. +func (p *Peers) SelectBestWithProtocols(n int, protocols []protocol.ID) []peer.ID { + p.mu.Lock() + defer p.mu.Unlock() + return p.selectBest(n, protocols) +} + +func (p *Peers) selectBest(n int, protocols []protocol.ID) []peer.ID { + var protoMap map[protocol.ID]struct{} + if len(protocols) > 0 { + protoMap = make(map[protocol.ID]struct{}, len(protocols)) + for _, proto := range protocols { + protoMap[proto] = struct{}{} + } + } lth := min(len(p.peers), n) if lth == 0 { return nil } best := make([]*data, 0, lth) for _, peer := range p.peers { + if protoMap != nil { + found := false + for _, proto := range peer.protocols() { + if _, exist := protoMap[proto]; exist { + found = true + break + } + } + if !found { + continue + } + } for i := range best { if peer.less(best[i], p.globalLatency) { best[i], peer = peer, best[i] diff --git a/fetch/peers/peers_test.go b/fetch/peers/peers_test.go index 611a79106f..db77721432 100644 --- a/fetch/peers/peers_test.go +++ b/fetch/peers/peers_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" "github.com/stretchr/testify/require" ) @@ -22,6 +23,7 @@ type event struct { success int failure int latency time.Duration + protocols []protocol.ID } func withEvents(events []event) *Peers { @@ -30,7 +32,7 @@ func withEvents(events []event) *Peers { if ev.delete { tracker.Delete(ev.id) } else if ev.add { - tracker.Add(ev.id) + tracker.Add(ev.id, func() []protocol.ID { return 
ev.protocols }) } for i := 0; i < ev.failure; i++ { tracker.OnFailure(ev.id, 0, ev.latency) @@ -210,6 +212,73 @@ func TestSelect(t *testing.T) { } } +func TestSelectBestWithProtocols(t *testing.T) { + for _, tc := range []struct { + desc string + events []event + + n int + protocols []protocol.ID + expect []peer.ID + }{ + { + desc: "no protocols required and no peer protocols", + events: []event{ + {id: "a", success: 1, latency: 8, add: true}, + {id: "b", success: 1, latency: 9, add: true}, + {id: "c", success: 3, latency: 14, add: true}, + }, + n: 2, + expect: []peer.ID{"a", "b"}, + protocols: nil, + }, + { + desc: "no protocols required, peers have protocols", + events: []event{ + {id: "a", success: 1, latency: 8, add: true, protocols: []protocol.ID{"a", "b"}}, + {id: "b", success: 1, latency: 9, add: true, protocols: []protocol.ID{"b", "c"}}, + {id: "c", success: 3, latency: 14, add: true, protocols: []protocol.ID{"c", "d"}}, + }, + n: 2, + expect: []peer.ID{"a", "b"}, + protocols: nil, + }, + { + desc: "single protocol required, peers have protocols", + events: []event{ + {id: "a", success: 1, latency: 8, add: true, protocols: []protocol.ID{"a", "b"}}, + {id: "b", success: 1, latency: 9, add: true, protocols: []protocol.ID{"b", "c"}}, + {id: "c", success: 3, latency: 14, add: true, protocols: []protocol.ID{"c", "d"}}, + }, + n: 2, + expect: []peer.ID{"b", "c"}, + protocols: []protocol.ID{"c"}, + }, + { + desc: "multiple protocols required, peers have protocols", + events: []event{ + {id: "a", success: 1, latency: 8, add: true, protocols: []protocol.ID{"a", "b"}}, + {id: "b", success: 1, latency: 9, add: true, protocols: []protocol.ID{"b", "c"}}, + {id: "c", success: 3, latency: 14, add: true, protocols: []protocol.ID{"c", "d"}}, + {id: "d", success: 3, latency: 12, add: true, protocols: []protocol.ID{"a", "e"}}, + }, + n: 3, + expect: []peer.ID{"a", "b", "c"}, + protocols: []protocol.ID{"b", "c"}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + 
require.Equal( + t, + tc.expect, + withEvents(tc.events).SelectBestWithProtocols(tc.n, tc.protocols), + "select best %d", + tc.n, + ) + }) + } +} + func TestTotal(t *testing.T) { const total = 100 events := []event{} diff --git a/sync2/multipeer/multipeer.go b/sync2/multipeer/multipeer.go index 3b35a057ee..8f5fd33adc 100644 --- a/sync2/multipeer/multipeer.go +++ b/sync2/multipeer/multipeer.go @@ -7,6 +7,7 @@ import ( "time" "github.com/jonboulle/clockwork" + "github.com/libp2p/go-libp2p/core/protocol" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -15,6 +16,10 @@ import ( "github.com/spacemeshos/go-spacemesh/sync2/rangesync" ) +const ( + Protocol = "sync/2" +) + type syncability struct { // peers that were probed successfully syncable []p2p.Peer @@ -283,7 +288,7 @@ func (mpr *MultiPeerReconciler) fullSync(ctx context.Context, syncPeers []p2p.Pe func (mpr *MultiPeerReconciler) syncOnce(ctx context.Context, lastWasSplit bool) (full bool, err error) { var s syncability for { - syncPeers := mpr.peers.SelectBest(mpr.cfg.SyncPeerCount) + syncPeers := mpr.peers.SelectBestWithProtocols(mpr.cfg.SyncPeerCount, []protocol.ID{Protocol}) mpr.logger.Debug("selected best peers for sync", zap.Int("syncPeerCount", mpr.cfg.SyncPeerCount), zap.Int("totalPeers", mpr.peers.Total()), diff --git a/sync2/multipeer/multipeer_test.go b/sync2/multipeer/multipeer_test.go index ab0a516839..d74ff95807 100644 --- a/sync2/multipeer/multipeer_test.go +++ b/sync2/multipeer/multipeer_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/jonboulle/clockwork" + "github.com/libp2p/go-libp2p/core/protocol" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" @@ -101,7 +102,7 @@ func (mt *multiPeerSyncTester) addPeers(n int) []p2p.Peer { r := make([]p2p.Peer, n) for i := 0; i < n; i++ { p := p2p.Peer(fmt.Sprintf("peer%d", i+1)) - mt.peers.Add(p) + mt.peers.Add(p, func() []protocol.ID { return []protocol.ID{multipeer.Protocol} }) r[i] = p } return r diff --git 
a/sync2/multipeer/split_sync_test.go b/sync2/multipeer/split_sync_test.go index 7a8e584009..24ba0a9cbd 100644 --- a/sync2/multipeer/split_sync_test.go +++ b/sync2/multipeer/split_sync_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/jonboulle/clockwork" + "github.com/libp2p/go-libp2p/core/protocol" "github.com/stretchr/testify/require" gomock "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" @@ -108,7 +109,7 @@ func newTestSplitSync(t testing.TB) *splitSyncTester { AnyTimes() } for _, p := range tst.syncPeers { - tst.peers.Add(p) + tst.peers.Add(p, func() []protocol.ID { return []protocol.ID{multipeer.Protocol} }) } tst.splitSync = multipeer.NewSplitSync( zaptest.NewLogger(t), diff --git a/sync2/p2p_test.go b/sync2/p2p_test.go index a36e4ebf27..d3f0c56be8 100644 --- a/sync2/p2p_test.go +++ b/sync2/p2p_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/libp2p/go-libp2p/core/protocol" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" @@ -16,6 +17,7 @@ import ( "github.com/spacemeshos/go-spacemesh/p2p" "github.com/spacemeshos/go-spacemesh/p2p/server" "github.com/spacemeshos/go-spacemesh/sync2" + "github.com/spacemeshos/go-spacemesh/sync2/multipeer" "github.com/spacemeshos/go-spacemesh/sync2/rangesync" ) @@ -88,7 +90,9 @@ func TestP2P(t *testing.T) { ps := peers.New() for m := 0; m < numNodes; m++ { if m != n { - ps.Add(mesh.Hosts()[m].ID()) + ps.Add(mesh.Hosts()[m].ID(), func() []protocol.ID { + return []protocol.ID{multipeer.Protocol} + }) } } cfg := sync2.DefaultConfig() From f36854207a3d4cc59465c221d15972a2111f1a24 Mon Sep 17 00:00:00 2001 From: Ivan Shvedunov Date: Thu, 21 Nov 2024 14:28:29 +0400 Subject: [PATCH 2/5] Update fetch/peers/peers.go Co-authored-by: Matthias Fasching <5011972+fasmat@users.noreply.github.com> --- fetch/peers/peers.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/fetch/peers/peers.go b/fetch/peers/peers.go index 
b13fa20cef..6e1aebb520 100644 --- a/fetch/peers/peers.go +++ b/fetch/peers/peers.go @@ -166,13 +166,8 @@ func (p *Peers) SelectBestWithProtocols(n int, protocols []protocol.ID) []peer.I } func (p *Peers) selectBest(n int, protocols []protocol.ID) []peer.ID { - var protoMap map[protocol.ID]struct{} - if len(protocols) > 0 { - protoMap = make(map[protocol.ID]struct{}, len(protocols)) - for _, proto := range protocols { - protoMap[proto] = struct{}{} - } - } + slices.Sort(protocols) + slices.Compact(protocols) lth := min(len(p.peers), n) if lth == 0 { return nil From 8ad95eef063592ad2427a2d9fa0d128d3334d4b1 Mon Sep 17 00:00:00 2001 From: Ivan Shvedunov Date: Thu, 21 Nov 2024 14:28:40 +0400 Subject: [PATCH 3/5] Update fetch/peers/peers.go Co-authored-by: Matthias Fasching <5011972+fasmat@users.noreply.github.com> --- fetch/peers/peers.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fetch/peers/peers.go b/fetch/peers/peers.go index 6e1aebb520..1c15651d04 100644 --- a/fetch/peers/peers.go +++ b/fetch/peers/peers.go @@ -174,10 +174,10 @@ func (p *Peers) selectBest(n int, protocols []protocol.ID) []peer.ID { } best := make([]*data, 0, lth) for _, peer := range p.peers { - if protoMap != nil { + if len(protocols) > 0 { found := false for _, proto := range peer.protocols() { - if _, exist := protoMap[proto]; exist { + if slices.Contains(protocols, proto) { found = true break } From fad6531fe21aa06cfc03f06a5ab98e356878cf0d Mon Sep 17 00:00:00 2001 From: Ivan Shvedunov Date: Thu, 21 Nov 2024 14:31:08 +0400 Subject: [PATCH 4/5] fetch: fix missing import --- fetch/peers/peers.go | 1 + 1 file changed, 1 insertion(+) diff --git a/fetch/peers/peers.go b/fetch/peers/peers.go index 1c15651d04..04b1a9a589 100644 --- a/fetch/peers/peers.go +++ b/fetch/peers/peers.go @@ -1,6 +1,7 @@ package peers import ( + "slices" "strings" "sync" "time" From 8b3ffa97e7cd4a06e6b3ec93fa34b5c0482a1666 Mon Sep 17 00:00:00 2001 From: Ivan Shvedunov Date: Thu, 21 Nov 2024 14:54:54 
+0400 Subject: [PATCH 5/5] fetch: fix slices.Compact usage --- fetch/peers/peers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fetch/peers/peers.go b/fetch/peers/peers.go index 04b1a9a589..4d25f1d9e9 100644 --- a/fetch/peers/peers.go +++ b/fetch/peers/peers.go @@ -168,7 +168,7 @@ func (p *Peers) SelectBestWithProtocols(n int, protocols []protocol.ID) []peer.I func (p *Peers) selectBest(n int, protocols []protocol.ID) []peer.ID { slices.Sort(protocols) - slices.Compact(protocols) + protocols = slices.Compact(protocols) lth := min(len(p.peers), n) if lth == 0 { return nil