From e1813bc47b9f70124e7b4ec76cccb4596bbfd1c1 Mon Sep 17 00:00:00 2001 From: Sasha Date: Sun, 20 Oct 2024 22:54:36 +0200 Subject: [PATCH] make PeerManager use only ConnectionManager, move getPeers to ConnectionManager, remove not needed code --- packages/core/src/index.ts | 3 +- packages/core/src/lib/base_protocol.ts | 55 +-- .../connection_manager.ts | 24 ++ .../core/src/lib/connection_manager/index.ts | 1 + .../keep_alive_manager.ts | 2 +- .../core/src/lib/connection_manager/utils.ts | 25 ++ packages/core/src/lib/filter/index.ts | 2 +- packages/core/src/lib/filterPeers.spec.ts | 144 -------- packages/core/src/lib/filterPeers.ts | 51 --- packages/core/src/lib/light_push/index.ts | 2 +- packages/core/src/lib/metadata/index.ts | 2 +- packages/core/src/lib/store/index.ts | 2 +- .../src/peer-exchange/waku_peer_exchange.ts | 2 +- packages/interfaces/src/connection_manager.ts | 1 + packages/sdk/src/protocols/base_protocol.ts | 2 +- .../src/protocols/light_push/light_push.ts | 27 +- packages/sdk/src/protocols/peer_manager.ts | 23 +- packages/sdk/src/protocols/store/index.ts | 11 +- packages/tests/tests/getPeers.spec.ts | 339 ------------------ packages/utils/src/libp2p/index.ts | 38 -- 20 files changed, 73 insertions(+), 683 deletions(-) rename packages/core/src/lib/{ => connection_manager}/connection_manager.ts (96%) create mode 100644 packages/core/src/lib/connection_manager/index.ts rename packages/core/src/lib/{ => connection_manager}/keep_alive_manager.ts (98%) create mode 100644 packages/core/src/lib/connection_manager/utils.ts delete mode 100644 packages/core/src/lib/filterPeers.spec.ts delete mode 100644 packages/core/src/lib/filterPeers.ts delete mode 100644 packages/tests/tests/getPeers.spec.ts diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 9b4acf2eae..cd9f894730 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -15,11 +15,10 @@ export { LightPushCodec, LightPushCore } from "./lib/light_push/index.js"; export * as waku_store from "./lib/store/index.js"; export { StoreCore, StoreCodec } from "./lib/store/index.js"; -export { ConnectionManager } from "./lib/connection_manager.js"; +export { ConnectionManager } from "./lib/connection_manager/index.js"; export { getHealthManager } from "./lib/health_manager.js"; -export { KeepAliveManager } from "./lib/keep_alive_manager.js"; export { StreamManager } from "./lib/stream_manager/index.js"; export { MetadataCodec, wakuMetadata } from "./lib/metadata/index.js"; diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index 740298ae0a..b8ed44e8d0 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -5,10 +5,8 @@ import type { Libp2pComponents, PubsubTopic } from "@waku/interfaces"; -import { Logger } from "@waku/utils"; -import { getPeersForProtocol, sortPeersByLatency } from "@waku/utils/libp2p"; +import { getPeersForProtocol } from "@waku/utils/libp2p"; -import { filterPeersByDiscovery } from "./filterPeers.js"; import { StreamManager } from "./stream_manager/index.js"; /** @@ -23,7 +21,6 @@ export class BaseProtocol implements IBaseProtocolCore { protected constructor( public multicodec: string, protected components: Libp2pComponents, - private log: Logger, public readonly pubsubTopics: PubsubTopic[] ) { this.addLibp2pEventListener = components.events.addEventListener.bind( @@ -64,54 +61,4 @@ export class BaseProtocol implements IBaseProtocolCore { return connections.length > 0; }); } - - /** - * Retrieves a list of connected peers that support the protocol. The list is sorted by latency. - * - * @param numPeers - The total number of peers to retrieve. If 0, all peers are returned. - * @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve. - * @returns A list of peers that support the protocol sorted by latency. By default, returns all peers available, including bootstrap. - */ - public async getPeers( - { - numPeers, - maxBootstrapPeers - }: { - numPeers: number; - maxBootstrapPeers: number; - } = { - maxBootstrapPeers: 0, - numPeers: 0 - } - ): Promise { - // Retrieve all connected peers that support the protocol & shard (if configured) - const allAvailableConnectedPeers = await this.connectedPeers(); - - // Filter the peers based on discovery & number of peers requested - const filteredPeers = filterPeersByDiscovery( - allAvailableConnectedPeers, - numPeers, - maxBootstrapPeers - ); - - // Sort the peers by latency - const sortedFilteredPeers = await sortPeersByLatency( - this.components.peerStore, - filteredPeers - ); - - if (sortedFilteredPeers.length === 0) { - this.log.warn( - "No peers found. Ensure you have a connection to the network." - ); - } - - if (sortedFilteredPeers.length < numPeers) { - this.log.warn( - `Only ${sortedFilteredPeers.length} peers found. Requested ${numPeers}.` - ); - } - - return sortedFilteredPeers; - } } diff --git a/packages/core/src/lib/connection_manager.ts b/packages/core/src/lib/connection_manager/connection_manager.ts similarity index 96% rename from packages/core/src/lib/connection_manager.ts rename to packages/core/src/lib/connection_manager/connection_manager.ts index d8697784f9..4f482d4822 100644 --- a/packages/core/src/lib/connection_manager.ts +++ b/packages/core/src/lib/connection_manager/connection_manager.ts @@ -19,6 +19,7 @@ import { decodeRelayShard, shardInfoToPubsubTopics } from "@waku/utils"; import { Logger } from "@waku/utils"; import { KeepAliveManager } from "./keep_alive_manager.js"; +import { getPeerPing } from "./utils.js"; const log = new Logger("connection-manager"); @@ -180,6 +181,29 @@ export class ConnectionManager ); } + public async getConnectedPeers(codec?: string): Promise { + const peerIDs = this.libp2p.getPeers(); + + if (peerIDs.length === 0) { + return []; + } + + const peers = await Promise.all( + peerIDs.map(async (id) => { + try { + return await this.libp2p.peerStore.get(id); + } catch (e) { + return null; + } + }) + ); + + return peers + .filter((p) => !!p) + .filter((p) => (codec ? (p as Peer).protocols.includes(codec) : true)) + .sort((left, right) => getPeerPing(left) - getPeerPing(right)) as Peer[]; + } + private async dialPeerStorePeers(): Promise { const peerInfos = await this.libp2p.peerStore.all(); const dialPromises = []; diff --git a/packages/core/src/lib/connection_manager/index.ts b/packages/core/src/lib/connection_manager/index.ts new file mode 100644 index 0000000000..b5e0c20d00 --- /dev/null +++ b/packages/core/src/lib/connection_manager/index.ts @@ -0,0 +1 @@ +export { ConnectionManager } from "./connection_manager.js"; diff --git a/packages/core/src/lib/keep_alive_manager.ts b/packages/core/src/lib/connection_manager/keep_alive_manager.ts similarity index 98% rename from packages/core/src/lib/keep_alive_manager.ts rename to packages/core/src/lib/connection_manager/keep_alive_manager.ts index 3f606acd9b..266a94da39 100644 --- a/packages/core/src/lib/keep_alive_manager.ts +++ b/packages/core/src/lib/connection_manager/keep_alive_manager.ts @@ -3,7 +3,7 @@ import type { IRelay, Libp2p, PeerIdStr } from "@waku/interfaces"; import { Logger, pubsubTopicToSingleShardInfo } from "@waku/utils"; import { utf8ToBytes } from "@waku/utils/bytes"; -import { createEncoder } from "./message/version_0.js"; +import { createEncoder } from "../message/version_0.js"; const RelayPingContentTopic = "/relay-ping/1/ping/null"; const log = new Logger("keep-alive"); diff --git a/packages/core/src/lib/connection_manager/utils.ts b/packages/core/src/lib/connection_manager/utils.ts new file mode 100644 index 0000000000..b994e1df7c --- /dev/null +++ b/packages/core/src/lib/connection_manager/utils.ts @@ -0,0 +1,25 @@ +import type { Peer } from "@libp2p/interface"; +import { bytesToUtf8 } from "@waku/utils/bytes"; + +/** + * Reads peer's metadata and retrieves ping value. + * @param peer Peer or null + * @returns -1 if no ping attached, otherwise returns ping value + */ +export const getPeerPing = (peer: Peer | null): number => { + if (!peer) { + return -1; + } + + try { + const bytes = peer.metadata.get("ping"); + + if (!bytes) { + return -1; + } + + return Number(bytesToUtf8(bytes)); + } catch (e) { + return -1; + } +}; diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 06e57cfe1c..09bbef9e62 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -40,7 +40,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { public readonly pubsubTopics: PubsubTopic[], libp2p: Libp2p ) { - super(FilterCodecs.SUBSCRIBE, libp2p.components, log, pubsubTopics); + super(FilterCodecs.SUBSCRIBE, libp2p.components, pubsubTopics); libp2p .handle(FilterCodecs.PUSH, this.onRequest.bind(this), { diff --git a/packages/core/src/lib/filterPeers.spec.ts b/packages/core/src/lib/filterPeers.spec.ts deleted file mode 100644 index 8ae77c3bd2..0000000000 --- a/packages/core/src/lib/filterPeers.spec.ts +++ /dev/null @@ -1,144 +0,0 @@ -import { Peer } from "@libp2p/interface"; -import type { Tag } from "@libp2p/interface"; -import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; -import { Tags } from "@waku/interfaces"; -import { expect } from "chai"; - -import { filterPeersByDiscovery } from "./filterPeers.js"; - -describe("filterPeersByDiscovery function", function () { - it("should return all peers when numPeers is 0", async function () { - const peer1 = await createSecp256k1PeerId(); - const peer2 = await createSecp256k1PeerId(); - const peer3 = await createSecp256k1PeerId(); - - const mockPeers = [ - { - id: peer1, - tags: new Map([[Tags.BOOTSTRAP, { value: 100 }]]) - }, - { - id: peer2, - tags: new Map([[Tags.BOOTSTRAP, { value: 100 }]]) - }, - { - id: peer3, - tags: new Map([[Tags.PEER_EXCHANGE, { value: 100 }]]) - } - ] as unknown as Peer[]; - - const result = filterPeersByDiscovery(mockPeers, 0, 10); - expect(result.length).to.deep.equal(mockPeers.length); - }); - - it("should return all non-bootstrap peers and no bootstrap peer when numPeers is 0 and maxBootstrapPeers is 0", async function () { - const peer1 = await createSecp256k1PeerId(); - const peer2 = await createSecp256k1PeerId(); - const peer3 = await createSecp256k1PeerId(); - const peer4 = await createSecp256k1PeerId(); - - const mockPeers = [ - { - id: peer1, - tags: new Map([[Tags.BOOTSTRAP, { value: 100 }]]) - }, - { - id: peer2, - tags: new Map([[Tags.BOOTSTRAP, { value: 100 }]]) - }, - { - id: peer3, - tags: new Map([[Tags.PEER_EXCHANGE, { value: 100 }]]) - }, - { - id: peer4, - tags: new Map([[Tags.PEER_EXCHANGE, { value: 100 }]]) - } - ] as unknown as Peer[]; - - const result = filterPeersByDiscovery(mockPeers, 0, 0); - - // result should have no bootstrap peers, and a total of 2 peers - expect(result.length).to.equal(2); - expect( - result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length - ).to.equal(0); - }); - - it("should return one bootstrap peer, and all non-boostrap peers, when numPeers is 0 & maxBootstrap is 1", async function () { - const peer1 = await createSecp256k1PeerId(); - const peer2 = await createSecp256k1PeerId(); - const peer3 = await createSecp256k1PeerId(); - const peer4 = await createSecp256k1PeerId(); - const peer5 = await createSecp256k1PeerId(); - - const mockPeers = [ - { - id: peer1, - tags: new Map([[Tags.BOOTSTRAP, { value: 100 }]]) - }, - { - id: peer2, - tags: new Map([[Tags.BOOTSTRAP, { value: 100 }]]) - }, - { - id: peer3, - tags: new Map([[Tags.PEER_EXCHANGE, { value: 100 }]]) - }, - { - id: peer4, - tags: new Map([[Tags.PEER_EXCHANGE, { value: 100 }]]) - }, - { - id: peer5, - tags: new Map([[Tags.PEER_EXCHANGE, { value: 100 }]]) - } - ] as unknown as Peer[]; - - const result = filterPeersByDiscovery(mockPeers, 0, 1); - - // result should have 1 bootstrap peers, and a total of 4 peers - expect(result.length).to.equal(4); - expect( - result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length - ).to.equal(1); - }); - - it("should return only bootstrap peers up to maxBootstrapPeers", async function () { - const peer1 = await createSecp256k1PeerId(); - const peer2 = await createSecp256k1PeerId(); - const peer3 = await createSecp256k1PeerId(); - const peer4 = await createSecp256k1PeerId(); - const peer5 = await createSecp256k1PeerId(); - - const mockPeers = [ - { - id: peer1, - tags: new Map([[Tags.BOOTSTRAP, { value: 100 }]]) - }, - { - id: peer2, - tags: new Map([[Tags.BOOTSTRAP, { value: 100 }]]) - }, - { - id: peer3, - tags: new Map([[Tags.BOOTSTRAP, { value: 100 }]]) - }, - { - id: peer4, - tags: new Map([[Tags.PEER_EXCHANGE, { value: 100 }]]) - }, - { - id: peer5, - tags: new Map([[Tags.PEER_EXCHANGE, { value: 100 }]]) - } - ] as unknown as Peer[]; - - const result = filterPeersByDiscovery(mockPeers, 5, 2); - - // check that result has at least 2 bootstrap peers and no more than 5 peers - expect(result.length).to.be.at.least(2); - expect(result.length).to.be.at.most(5); - expect(result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length); - }); -}); diff --git a/packages/core/src/lib/filterPeers.ts b/packages/core/src/lib/filterPeers.ts deleted file mode 100644 index 816c3bd5b5..0000000000 --- a/packages/core/src/lib/filterPeers.ts +++ /dev/null @@ -1,51 +0,0 @@ -import { Peer } from "@libp2p/interface"; -import { Tags } from "@waku/interfaces"; - -/** - * Retrieves a list of peers based on the specified criteria: - * 1. If numPeers is 0, return all peers - * 2. Bootstrap peers are prioritized - * 3. Non-bootstrap peers are randomly selected to fill up to numPeers - * - * @param peers - The list of peers to filter from. - * @param numPeers - The total number of peers to retrieve. If 0, all peers are returned, irrespective of `maxBootstrapPeers`. - * @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve. - * @returns An array of peers based on the specified criteria. - */ -export function filterPeersByDiscovery( - peers: Peer[], - numPeers: number, - maxBootstrapPeers: number -): Peer[] { - // Collect the bootstrap peers up to the specified maximum - let bootstrapPeers = peers - .filter((peer) => peer.tags.has(Tags.BOOTSTRAP)) - .slice(0, maxBootstrapPeers); - - // If numPeers is less than the number of bootstrap peers, adjust the bootstrapPeers array - if (numPeers > 0 && numPeers < bootstrapPeers.length) { - bootstrapPeers = bootstrapPeers.slice(0, numPeers); - } - - // Collect non-bootstrap peers - const nonBootstrapPeers = peers.filter( - (peer) => !peer.tags.has(Tags.BOOTSTRAP) - ); - - // If numPeers is 0, return all peers - if (numPeers === 0) { - return [...bootstrapPeers, ...nonBootstrapPeers]; - } - - // Initialize the list of selected peers with the bootstrap peers - const selectedPeers: Peer[] = [...bootstrapPeers]; - - // Fill up to numPeers with remaining random peers if needed - while (selectedPeers.length < numPeers && nonBootstrapPeers.length > 0) { - const randomIndex = Math.floor(Math.random() * nonBootstrapPeers.length); - const randomPeer = nonBootstrapPeers.splice(randomIndex, 1)[0]; - selectedPeers.push(randomPeer); - } - - return selectedPeers; -} diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index e807bee5ad..440201464f 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -37,7 +37,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore { public readonly pubsubTopics: PubsubTopic[], libp2p: Libp2p ) { - super(LightPushCodec, libp2p.components, log, pubsubTopics); + super(LightPushCodec, libp2p.components, pubsubTopics); } private async preparePushMessage( diff --git a/packages/core/src/lib/metadata/index.ts b/packages/core/src/lib/metadata/index.ts index 1744450c7b..2b3bfe2532 100644 --- a/packages/core/src/lib/metadata/index.ts +++ b/packages/core/src/lib/metadata/index.ts @@ -30,7 +30,7 @@ class Metadata extends BaseProtocol implements IMetadata { public pubsubTopics: PubsubTopic[], libp2p: Libp2pComponents ) { - super(MetadataCodec, libp2p.components, log, pubsubTopics); + super(MetadataCodec, libp2p.components, pubsubTopics); this.libp2pComponents = libp2p; void libp2p.registrar.handle(MetadataCodec, (streamData) => { void this.onRequest(streamData); diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index a4b8b9c2e7..830a7b5100 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -32,7 +32,7 @@ export class StoreCore extends BaseProtocol implements IStoreCore { public readonly pubsubTopics: PubsubTopic[], libp2p: Libp2p ) { - super(StoreCodec, libp2p.components, log, pubsubTopics); + super(StoreCodec, libp2p.components, pubsubTopics); } public async *queryPerPage( diff --git a/packages/discovery/src/peer-exchange/waku_peer_exchange.ts b/packages/discovery/src/peer-exchange/waku_peer_exchange.ts index 6030b86f76..43520e9bde 100644 --- a/packages/discovery/src/peer-exchange/waku_peer_exchange.ts +++ b/packages/discovery/src/peer-exchange/waku_peer_exchange.ts @@ -32,7 +32,7 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange { components: Libp2pComponents, pubsubTopics: PubsubTopic[] ) { - super(PeerExchangeCodec, components, log, pubsubTopics); + super(PeerExchangeCodec, components, pubsubTopics); } /** diff --git a/packages/interfaces/src/connection_manager.ts b/packages/interfaces/src/connection_manager.ts index f0810e4736..8610acab95 100644 --- a/packages/interfaces/src/connection_manager.ts +++ b/packages/interfaces/src/connection_manager.ts @@ -86,6 +86,7 @@ export interface IConnectionStateEvents { export interface IConnectionManager extends TypedEventEmitter { pubsubTopics: PubsubTopic[]; + getConnectedPeers(codec?: string): Promise; dropConnection(peerId: PeerId): Promise; getPeersByDiscovery(): Promise; stop(): void; diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 72295b2747..3a7abc1b36 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -33,7 +33,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { const maintainPeersInterval = options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL; - this.peerManager = new PeerManager(connectionManager, core); + this.peerManager = new PeerManager(connectionManager); this.log.info( `Initializing BaseProtocolSDK with numPeersToUse: ${this.numPeersToUse}, maintainPeersInterval: ${maintainPeersInterval}ms` diff --git a/packages/sdk/src/protocols/light_push/light_push.ts b/packages/sdk/src/protocols/light_push/light_push.ts index 5919822656..478e683b7d 100644 --- a/packages/sdk/src/protocols/light_push/light_push.ts +++ b/packages/sdk/src/protocols/light_push/light_push.ts @@ -36,8 +36,8 @@ export class LightPush implements ILightPush { public readonly protocol: LightPushCore; public constructor( - connectionManager: ConnectionManager, - private libp2p: Libp2p, + private connectionManager: ConnectionManager, + libp2p: Libp2p, options?: ProtocolCreateOptions ) { this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE; @@ -147,26 +147,9 @@ export class LightPush implements ILightPush { } private async getConnectedPeers(): Promise { - const peerIDs = this.libp2p.getPeers(); - - if (peerIDs.length === 0) { - return []; - } - - const peers = await Promise.all( - peerIDs.map(async (id) => { - try { - return await this.libp2p.peerStore.get(id); - } catch (e) { - return null; - } - }) - ); - - return peers - .filter((p) => !!p) - .filter((p) => (p as Peer).protocols.includes(LightPushCodec)) - .slice(0, this.numPeersToUse) as Peer[]; + const peers = + await this.connectionManager.getConnectedPeers(LightPushCodec); + return peers.slice(0, this.numPeersToUse); } } diff --git a/packages/sdk/src/protocols/peer_manager.ts b/packages/sdk/src/protocols/peer_manager.ts index 422f508e15..d8bf25f0bc 100644 --- a/packages/sdk/src/protocols/peer_manager.ts +++ b/packages/sdk/src/protocols/peer_manager.ts @@ -1,7 +1,5 @@ import { Peer, PeerId } from "@libp2p/interface"; -import { ConnectionManager, getHealthManager } from "@waku/core"; -import { BaseProtocol } from "@waku/core/lib/base_protocol"; -import { IHealthManager } from "@waku/interfaces"; +import { ConnectionManager } from "@waku/core"; import { Logger } from "@waku/utils"; import { Mutex } from "async-mutex"; @@ -9,19 +7,12 @@ const log = new Logger("peer-manager"); export class PeerManager { private peers: Map = new Map(); - private healthManager: IHealthManager; private readMutex = new Mutex(); private writeMutex = new Mutex(); private writeLockHolder: string | null = null; - public constructor( - private readonly connectionManager: ConnectionManager, - private readonly core: BaseProtocol - ) { - this.healthManager = getHealthManager(); - this.healthManager.updateProtocolHealth(this.core.multicodec, 0); - } + public constructor(private readonly connectionManager: ConnectionManager) {} public getWriteLockHolder(): string | null { return this.writeLockHolder; @@ -37,10 +28,6 @@ export class PeerManager { await this.connectionManager.attemptDial(peer.id); this.peers.set(peer.id.toString(), peer); log.info(`Added and dialed peer: ${peer.id.toString()}`); - this.healthManager.updateProtocolHealth( - this.core.multicodec, - this.peers.size - ); this.writeLockHolder = null; }); } @@ -50,10 +37,6 @@ export class PeerManager { this.writeLockHolder = `removePeer: ${peerId.toString()}`; this.peers.delete(peerId.toString()); log.info(`Removed peer: ${peerId.toString()}`); - this.healthManager.updateProtocolHealth( - this.core.multicodec, - this.peers.size - ); this.writeLockHolder = null; }); } @@ -92,7 +75,7 @@ export class PeerManager { * @param numPeers The number of peers to find. */ public async findPeers(numPeers: number): Promise { - const connectedPeers = await this.core.getPeers(); + const connectedPeers = await this.connectionManager.getConnectedPeers(); return this.readMutex.runExclusive(async () => { const newPeers = connectedPeers diff --git a/packages/sdk/src/protocols/store/index.ts b/packages/sdk/src/protocols/store/index.ts index 322e34f6d3..efd58033f8 100644 --- a/packages/sdk/src/protocols/store/index.ts +++ b/packages/sdk/src/protocols/store/index.ts @@ -58,12 +58,11 @@ export class Store extends BaseProtocolSDK implements IStore { ...options }; - const peer = ( - await this.protocol.getPeers({ - numPeers: this.numPeersToUse, - maxBootstrapPeers: 1 - }) - )[0]; + const peers = await this.connectionManager.getConnectedPeers( + this.core.multicodec + ); + const peer = peers[0]; + if (!peer) { log.error("No peers available to query"); throw new Error("No peers available to query"); diff --git a/packages/tests/tests/getPeers.spec.ts b/packages/tests/tests/getPeers.spec.ts deleted file mode 100644 index dd0ba3ad5c..0000000000 --- a/packages/tests/tests/getPeers.spec.ts +++ /dev/null @@ -1,339 +0,0 @@ -import type { Connection, Peer, PeerStore } from "@libp2p/interface"; -import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; -import { - createLightNode, - Libp2pComponents, - type LightNode, - Tags, - utf8ToBytes -} from "@waku/sdk"; -import { encodeRelayShard } from "@waku/utils"; -import { expect } from "chai"; -import fc from "fast-check"; -import Sinon from "sinon"; - -import { - afterEachCustom, - beforeEachCustom, - DefaultTestShardInfo -} from "../src/index.js"; - -describe("getPeers", function () { - let peerStore: PeerStore; - let connectionManager: Libp2pComponents["connectionManager"]; - let waku: LightNode; - const lowPingBytes = utf8ToBytes("50"); - const midPingBytes = utf8ToBytes("100"); - const highPingBytes = utf8ToBytes("200"); - - let lowPingBootstrapPeer: Peer, - lowPingNonBootstrapPeer: Peer, - midPingBootstrapPeer: Peer, - midPingNonBootstrapPeer: Peer, - highPingBootstrapPeer: Peer, - highPingNonBootstrapPeer: Peer, - differentCodecPeer: Peer, - anotherDifferentCodecPeer: Peer; - - let bootstrapPeers: Peer[]; - let nonBootstrapPeers: Peer[]; - let allPeers: Peer[]; - - beforeEachCustom(this, async () => { - waku = await createLightNode({ networkConfig: DefaultTestShardInfo }); - peerStore = waku.libp2p.peerStore; - connectionManager = waku.libp2p.components.connectionManager; - - const [ - lowPingBootstrapPeerId, - lowPingNonBootstrapPeerId, - midPingBootstrapPeerId, - midPingNonBootstrapPeerId, - highPingBootstrapPeerId, - highPingNonBootstrapPeerId, - differentCodecPeerId, - anotherDifferentCodecPeerId - ] = await Promise.all([ - createSecp256k1PeerId(), - createSecp256k1PeerId(), - createSecp256k1PeerId(), - createSecp256k1PeerId(), - createSecp256k1PeerId(), - createSecp256k1PeerId(), - createSecp256k1PeerId(), - createSecp256k1PeerId() - ]); - - lowPingBootstrapPeer = { - id: lowPingBootstrapPeerId, - protocols: [waku.lightPush.protocol.multicodec], - metadata: new Map().set("ping", lowPingBytes), - tags: new Map().set(Tags.BOOTSTRAP, {}) - } as Peer; - lowPingNonBootstrapPeer = { - id: lowPingNonBootstrapPeerId, - protocols: [waku.lightPush.protocol.multicodec], - metadata: new Map().set("ping", lowPingBytes), - tags: new Map().set(Tags.PEER_EXCHANGE, {}) - } as Peer; - midPingBootstrapPeer = { - id: midPingBootstrapPeerId, - protocols: [waku.lightPush.protocol.multicodec], - metadata: new Map().set("ping", midPingBytes), - tags: new Map().set(Tags.BOOTSTRAP, {}) - } as Peer; - midPingNonBootstrapPeer = { - id: midPingNonBootstrapPeerId, - protocols: [waku.lightPush.protocol.multicodec], - metadata: new Map().set("ping", midPingBytes), - tags: new Map().set(Tags.PEER_EXCHANGE, {}) - } as Peer; - highPingBootstrapPeer = { - id: highPingBootstrapPeerId, - protocols: [waku.lightPush.protocol.multicodec], - metadata: new Map().set("ping", highPingBytes), - tags: new Map().set(Tags.BOOTSTRAP, {}) - } as Peer; - highPingNonBootstrapPeer = { - id: highPingNonBootstrapPeerId, - protocols: [waku.lightPush.protocol.multicodec], - metadata: new Map().set("ping", highPingBytes), - tags: new Map().set(Tags.PEER_EXCHANGE, {}) - } as Peer; - differentCodecPeer = { - id: differentCodecPeerId, - protocols: ["different/1"], - metadata: new Map().set("ping", lowPingBytes), - tags: new Map().set(Tags.BOOTSTRAP, {}) - } as Peer; - anotherDifferentCodecPeer = { - id: anotherDifferentCodecPeerId, - protocols: ["different/2"], - metadata: new Map().set("ping", lowPingBytes), - tags: new Map().set(Tags.BOOTSTRAP, {}) - } as Peer; - - bootstrapPeers = [ - lowPingBootstrapPeer, - midPingBootstrapPeer, - highPingBootstrapPeer - ]; - - nonBootstrapPeers = [ - lowPingNonBootstrapPeer, - midPingNonBootstrapPeer, - highPingNonBootstrapPeer - ]; - - allPeers = [ - ...bootstrapPeers, - ...nonBootstrapPeers, - differentCodecPeer, - anotherDifferentCodecPeer - ]; - - allPeers.forEach((peer) => { - peer.metadata.set("shardInfo", encodeRelayShard(DefaultTestShardInfo)); - }); - - Sinon.stub(peerStore, "get").callsFake(async (peerId) => { - return allPeers.find((peer) => peer.id.equals(peerId))!; - }); - - Sinon.stub(peerStore, "forEach").callsFake(async (callback) => { - for (const peer of allPeers) { - callback(peer); - } - }); - - // assume all peers have an opened connection - Sinon.stub(connectionManager, "getConnections").callsFake(() => { - const connections: Connection[] = []; - for (const peer of allPeers) { - connections.push({ - status: "open", - remotePeer: peer.id, - streams: [{ protocol: waku.lightPush.protocol.multicodec }] - } as unknown as Connection); - } - return connections; - }); - }); - - afterEachCustom(this, async () => { - Sinon.restore(); - }); - - describe("getPeers with varying maxBootstrapPeers", function () { - const maxBootstrapPeersValues = [1, 2, 3, 4, 5, 6, 7]; - - maxBootstrapPeersValues.forEach((maxBootstrapPeers) => { - describe(`maxBootstrapPeers=${maxBootstrapPeers}`, function () { - it(`numPeers=1 -- returns one bootstrap peer `, async function () { - const result = (await (waku.lightPush.protocol as any).getPeers({ - numPeers: 1, - maxBootstrapPeers - })) as Peer[]; - - // Should only have 1 peer - expect(result).to.have.lengthOf(1); - - // The peer should be a bootstrap peer - expect(result[0].tags.has(Tags.BOOTSTRAP)).to.be.true; - - // Peer should be of the same protocol - expect( - result[0].protocols.includes(waku.lightPush.protocol.multicodec) - ).to.be.true; - - // Peer should have the lowest ping - expect(result[0].metadata.get("ping")).to.equal(lowPingBytes); - }); - - it(`numPeers=2 -- returns total 2 peers, with max ${maxBootstrapPeers} bootstrap peers`, async function () { - const result = (await (waku.lightPush.protocol as any).getPeers({ - numPeers: 2, - maxBootstrapPeers - })) as Peer[]; - - // Should only have 2 peers - expect(result).to.have.lengthOf(2); - - // Should only have ${maxBootstrapPeers} bootstrap peers - expect( - result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length - ).to.be.lessThanOrEqual(maxBootstrapPeers); - - // Should return peers with the same protocol - expect( - result.every((peer: Peer) => - peer.protocols.includes(waku.lightPush.protocol.multicodec) - ) - ).to.be.true; - - // All peers should be sorted by latency - // 0th index should be the lowest ping of all peers returned - expect(result[0].metadata.get("ping")).to.equal(lowPingBytes); - }); - - it(`numPeers=3 -- returns total 3 peers, with max ${maxBootstrapPeers} bootstrap peers`, async function () { - const result = (await (waku.lightPush.protocol as any).getPeers({ - numPeers: 3, - maxBootstrapPeers - })) as Peer[]; - - // Should only have 3 peers - expect(result).to.have.lengthOf(3); - - // Should only have ${maxBootstrapPeers} bootstrap peers - expect( - result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length - ).to.be.lessThanOrEqual(maxBootstrapPeers); - - // Should return peers with the same protocol - expect( - result.every((peer: Peer) => - peer.protocols.includes(waku.lightPush.protocol.multicodec) - ) - ).to.be.true; - - // All peers should be sorted by latency - // 0th index should be the lowest ping of all peers returned - expect(result[0].metadata.get("ping")).to.equal(lowPingBytes); - }); - - it(`numPeers=4 -- returns total 4 peers, with max ${maxBootstrapPeers} bootstrap peers`, async function () { - const result = (await (waku.lightPush.protocol as any).getPeers({ - numPeers: 4, - maxBootstrapPeers - })) as Peer[]; - - // Should only have 4 peers - expect(result).to.have.lengthOf(4); - - // Should only have ${maxBootstrapPeers} bootstrap peers - expect( - result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length - ).to.be.lessThanOrEqual(maxBootstrapPeers); - - // Should return peers with the same protocol - expect( - result.every((peer: Peer) => - peer.protocols.includes(waku.lightPush.protocol.multicodec) - ) - ).to.be.true; - - // All peers should be sorted by latency - // 0th index should be the lowest ping of all peers returned - expect(result[0].metadata.get("ping")).to.equal(lowPingBytes); - }); - - it(`numPeers=0 -- returns all peers including all non-bootstrap with maxBootstrapPeers: ${maxBootstrapPeers}`, async function () { - const result = (await (waku.lightPush.protocol as any).getPeers({ - numPeers: 0, - maxBootstrapPeers - })) as Peer[]; - - // Should have all non-bootstrap peers + ${maxBootstrapPeers} bootstrap peers - // Unless bootstrapPeers.length < maxBootstrapPeers - // Then it should be all non-bootstrap peers + bootstrapPeers.length - if (maxBootstrapPeers > bootstrapPeers.length) { - expect(result).to.have.lengthOf( - nonBootstrapPeers.length + bootstrapPeers.length - ); - } else { - expect(result).to.have.lengthOf( - nonBootstrapPeers.length + maxBootstrapPeers - ); - } - - // All peers should be bootstrap peers - expect( - result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length - ).to.be.lessThanOrEqual(maxBootstrapPeers); - - // Peers should be of the same protocol - expect( - result.every((peer: Peer) => - peer.protocols.includes(waku.lightPush.protocol.multicodec) - ) - ).to.be.true; - - // All peers returned should be sorted by latency - // 0th index should be the lowest ping of all peers returned - expect(result[0].metadata.get("ping")).to.equal(lowPingBytes); - }); - }); - }); - }); - - describe("getPeers property-based tests", function () { - it("should return the correct number of peers based on numPeers and maxBootstrapPeers", async function () { - await fc.assert( - fc.asyncProperty( - //max bootstrap peers - fc.integer({ min: 1, max: 100 }), - //numPeers - fc.integer({ min: 0, max: 100 }), - async (maxBootstrapPeers, numPeers) => { - const result = (await (waku.lightPush.protocol as any).getPeers({ - numPeers, - maxBootstrapPeers - })) as Peer[]; - - if (numPeers === 0) { - // Expect all peers when numPeers is 0 - expect(result.length).to.be.greaterThanOrEqual(1); - } else { - // Expect up to numPeers peers - expect(result.length).to.be.lessThanOrEqual(numPeers); - } - } - ), - { - verbose: true - } - ); - }); - }); -}); diff --git a/packages/utils/src/libp2p/index.ts b/packages/utils/src/libp2p/index.ts index 80feaec6b3..c24df61883 100644 --- a/packages/utils/src/libp2p/index.ts +++ b/packages/utils/src/libp2p/index.ts @@ -1,7 +1,5 @@ import type { Peer, PeerStore } from "@libp2p/interface"; -import { bytesToUtf8 } from "../bytes/index.js"; - /** * Returns a pseudo-random peer that supports the given protocol. * Useful for protocols such as store and light push @@ -13,42 +11,6 @@ export function selectRandomPeer(peers: Peer[]): Peer | undefined { return peers[index]; } -/** - * Function to sort peers by latency from lowest to highest - * @param peerStore - The Libp2p PeerStore - * @param peers - The list of peers to choose from - * @returns Sorted array of peers by latency - */ -export async function sortPeersByLatency( - peerStore: PeerStore, - peers: Peer[] -): Promise { - if (peers.length === 0) return []; - - const results = await Promise.all( - peers.map(async (peer) => { - try { - const pingBytes = (await peerStore.get(peer.id)).metadata.get("ping"); - if (!pingBytes) return { peer, ping: Infinity }; - - const ping = Number(bytesToUtf8(pingBytes)); - return { peer, ping }; - } catch (error) { - return { peer, ping: Infinity }; - } - }) - ); - - // filter out null values - const validResults = results.filter( - (result): result is { peer: Peer; ping: number } => result !== null - ); - - return validResults - .sort((a, b) => a.ping - b.ping) - .map((result) => result.peer); -} - /** * Returns the list of peers that supports the given protocol. */