Skip to content

Commit

Permalink
make PeerManager use only ConnectionManager, move getPeers to Connect…
Browse files Browse the repository at this point in the history
…ionManager, remove not needed code
  • Loading branch information
weboko committed Oct 20, 2024
1 parent e897d5c commit e1813bc
Show file tree
Hide file tree
Showing 20 changed files with 73 additions and 683 deletions.
3 changes: 1 addition & 2 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ export { LightPushCodec, LightPushCore } from "./lib/light_push/index.js";
export * as waku_store from "./lib/store/index.js";
export { StoreCore, StoreCodec } from "./lib/store/index.js";

export { ConnectionManager } from "./lib/connection_manager.js";
export { ConnectionManager } from "./lib/connection_manager/index.js";

export { getHealthManager } from "./lib/health_manager.js";

export { KeepAliveManager } from "./lib/keep_alive_manager.js";
export { StreamManager } from "./lib/stream_manager/index.js";

export { MetadataCodec, wakuMetadata } from "./lib/metadata/index.js";
55 changes: 1 addition & 54 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ import type {
Libp2pComponents,
PubsubTopic
} from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { getPeersForProtocol, sortPeersByLatency } from "@waku/utils/libp2p";
import { getPeersForProtocol } from "@waku/utils/libp2p";

import { filterPeersByDiscovery } from "./filterPeers.js";
import { StreamManager } from "./stream_manager/index.js";

/**
Expand All @@ -23,7 +21,6 @@ export class BaseProtocol implements IBaseProtocolCore {
protected constructor(
public multicodec: string,
protected components: Libp2pComponents,
private log: Logger,
public readonly pubsubTopics: PubsubTopic[]
) {
this.addLibp2pEventListener = components.events.addEventListener.bind(
Expand Down Expand Up @@ -64,54 +61,4 @@ export class BaseProtocol implements IBaseProtocolCore {
return connections.length > 0;
});
}

/**
* Retrieves a list of connected peers that support the protocol. The list is sorted by latency.
*
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned.
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.
* @returns A list of peers that support the protocol sorted by latency. By default, returns all peers available, including bootstrap.
*/
public async getPeers(
{
numPeers,
maxBootstrapPeers
}: {
numPeers: number;
maxBootstrapPeers: number;
} = {
maxBootstrapPeers: 0,
numPeers: 0
}
): Promise<Peer[]> {
// Retrieve all connected peers that support the protocol & shard (if configured)
const allAvailableConnectedPeers = await this.connectedPeers();

// Filter the peers based on discovery & number of peers requested
const filteredPeers = filterPeersByDiscovery(
allAvailableConnectedPeers,
numPeers,
maxBootstrapPeers
);

// Sort the peers by latency
const sortedFilteredPeers = await sortPeersByLatency(
this.components.peerStore,
filteredPeers
);

if (sortedFilteredPeers.length === 0) {
this.log.warn(
"No peers found. Ensure you have a connection to the network."
);
}

if (sortedFilteredPeers.length < numPeers) {
this.log.warn(
`Only ${sortedFilteredPeers.length} peers found. Requested ${numPeers}.`
);
}

return sortedFilteredPeers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { decodeRelayShard, shardInfoToPubsubTopics } from "@waku/utils";
import { Logger } from "@waku/utils";

import { KeepAliveManager } from "./keep_alive_manager.js";
import { getPeerPing } from "./utils.js";

const log = new Logger("connection-manager");

Expand Down Expand Up @@ -180,6 +181,29 @@ export class ConnectionManager
);
}

public async getConnectedPeers(codec?: string): Promise<Peer[]> {
const peerIDs = this.libp2p.getPeers();

if (peerIDs.length === 0) {
return [];
}

const peers = await Promise.all(
peerIDs.map(async (id) => {
try {
return await this.libp2p.peerStore.get(id);
} catch (e) {
return null;
}
})
);

return peers
.filter((p) => !!p)
.filter((p) => (codec ? (p as Peer).protocols.includes(codec) : true))
.sort((left, right) => getPeerPing(left) - getPeerPing(right)) as Peer[];
}

private async dialPeerStorePeers(): Promise<void> {
const peerInfos = await this.libp2p.peerStore.all();
const dialPromises = [];
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/lib/connection_manager/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { ConnectionManager } from "./connection_manager.js";
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { IRelay, Libp2p, PeerIdStr } from "@waku/interfaces";
import { Logger, pubsubTopicToSingleShardInfo } from "@waku/utils";
import { utf8ToBytes } from "@waku/utils/bytes";

import { createEncoder } from "./message/version_0.js";
import { createEncoder } from "../message/version_0.js";

const RelayPingContentTopic = "/relay-ping/1/ping/null";
const log = new Logger("keep-alive");
Expand Down
25 changes: 25 additions & 0 deletions packages/core/src/lib/connection_manager/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import type { Peer } from "@libp2p/interface";
import { bytesToUtf8 } from "@waku/utils/bytes";

/**
* Reads peer's metadata and retrieves ping value.
* @param peer Peer or null
* @returns -1 if no ping attached, otherwise returns ping value
*/
export const getPeerPing = (peer: Peer | null): number => {
if (!peer) {
return -1;
}

try {
const bytes = peer.metadata.get("ping");

if (!bytes) {
return -1;
}

return Number(bytesToUtf8(bytes));
} catch (e) {
return -1;
}
};
2 changes: 1 addition & 1 deletion packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p
) {
super(FilterCodecs.SUBSCRIBE, libp2p.components, log, pubsubTopics);
super(FilterCodecs.SUBSCRIBE, libp2p.components, pubsubTopics);

libp2p
.handle(FilterCodecs.PUSH, this.onRequest.bind(this), {
Expand Down
144 changes: 0 additions & 144 deletions packages/core/src/lib/filterPeers.spec.ts

This file was deleted.

51 changes: 0 additions & 51 deletions packages/core/src/lib/filterPeers.ts

This file was deleted.

2 changes: 1 addition & 1 deletion packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p
) {
super(LightPushCodec, libp2p.components, log, pubsubTopics);
super(LightPushCodec, libp2p.components, pubsubTopics);
}

private async preparePushMessage(
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/lib/metadata/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Metadata extends BaseProtocol implements IMetadata {
public pubsubTopics: PubsubTopic[],
libp2p: Libp2pComponents
) {
super(MetadataCodec, libp2p.components, log, pubsubTopics);
super(MetadataCodec, libp2p.components, pubsubTopics);
this.libp2pComponents = libp2p;
void libp2p.registrar.handle(MetadataCodec, (streamData) => {
void this.onRequest(streamData);
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export class StoreCore extends BaseProtocol implements IStoreCore {
public readonly pubsubTopics: PubsubTopic[],
libp2p: Libp2p
) {
super(StoreCodec, libp2p.components, log, pubsubTopics);
super(StoreCodec, libp2p.components, pubsubTopics);
}

public async *queryPerPage<T extends IDecodedMessage>(
Expand Down
Loading

0 comments on commit e1813bc

Please sign in to comment.