Skip to content

Commit

Permalink
fix: peer renewal connection drop & stream management (#2145)
Browse files Browse the repository at this point in the history
* fix: peer renewal connection drop

* fix stream manager

* fix over iteration during stream creation

* remove timeout and use only open peers

* add logs

* refactor code, add tests

* debug test

* up debug

* remove debug, supress check for timestamps

* remove only

* add more debug

* remove debug

* remove check for timestamps
  • Loading branch information
weboko authored Oct 1, 2024
1 parent 3e82159 commit b93134a
Show file tree
Hide file tree
Showing 9 changed files with 344 additions and 90 deletions.
1 change: 1 addition & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
}
],
"@typescript-eslint/explicit-member-accessibility": "error",
"@typescript-eslint/no-unused-vars": ["warn", { "argsIgnorePattern": "^_" }],
"prettier/prettier": [
"error",
{
Expand Down
3 changes: 2 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
"@types/uuid": "^9.0.8",
"@waku/build-utils": "*",
"chai": "^4.3.10",
"sinon": "^18.0.0",
"cspell": "^8.6.1",
"fast-check": "^3.19.0",
"ignore-loader": "^0.1.2",
Expand Down
161 changes: 161 additions & 0 deletions packages/core/src/lib/stream_manager/stream_manager.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import { Connection, Peer, PeerId, Stream } from "@libp2p/interface";
import { expect } from "chai";
import sinon from "sinon";

import { StreamManager } from "./stream_manager.js";

const MULTICODEC = "/test";

describe("StreamManager", () => {
let eventTarget: EventTarget;
let streamManager: StreamManager;

const mockPeer: Peer = {
id: {
toString() {
return "1";
}
}
} as unknown as Peer;

beforeEach(() => {
eventTarget = new EventTarget();
streamManager = new StreamManager(
MULTICODEC,
() => [],
eventTarget.addEventListener.bind(eventTarget)
);
});

it("should return usable stream attached to connection", async () => {
for (const writeStatus of ["ready", "writing"]) {
const con1 = createMockConnection();
con1.streams = [
createMockStream({ id: "1", protocol: MULTICODEC, writeStatus })
];

streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1];

const stream = await streamManager.getStream(mockPeer);

expect(stream).not.to.be.undefined;
expect(stream?.id).to.be.eq("1");
}
});

it("should throw if no connection provided", async () => {
streamManager["getConnections"] = (_peerId: PeerId | undefined) => [];

let error: Error | undefined;
try {
await streamManager.getStream(mockPeer);
} catch (e) {
error = e as Error;
}

expect(error).not.to.be.undefined;
expect(error?.message).to.include(mockPeer.id.toString());
expect(error?.message).to.include(MULTICODEC);
});

it("should create a new stream if no existing for protocol found", async () => {
for (const writeStatus of ["done", "closed", "closing"]) {
const con1 = createMockConnection();
con1.streams = [
createMockStream({ id: "1", protocol: MULTICODEC, writeStatus })
];

const newStreamSpy = sinon.spy(async (_protocol, _options) =>
createMockStream({
id: "2",
protocol: MULTICODEC,
writeStatus: "writable"
})
);

con1.newStream = newStreamSpy;
streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1];

const stream = await streamManager.getStream(mockPeer);

expect(stream).not.to.be.undefined;
expect(stream?.id).to.be.eq("2");

expect(newStreamSpy.calledOnce).to.be.true;
expect(newStreamSpy.calledWith(MULTICODEC)).to.be.true;
}
});

it("peer:update - should do nothing if another protocol hit", async () => {
const scheduleNewStreamSpy = sinon.spy();
streamManager["scheduleNewStream"] = scheduleNewStreamSpy;
eventTarget.dispatchEvent(
new CustomEvent("peer:update", { detail: { peer: { protocols: [] } } })
);

expect(scheduleNewStreamSpy.calledOnce).to.be.false;
});

it("peer:update - should schedule stream creation IF protocol hit AND no stream found on connection", async () => {
const scheduleNewStreamSpy = sinon.spy();
streamManager["scheduleNewStream"] = scheduleNewStreamSpy;
eventTarget.dispatchEvent(
new CustomEvent("peer:update", {
detail: { peer: { protocols: [MULTICODEC] } }
})
);

expect(scheduleNewStreamSpy.calledOnce).to.be.true;
});

it("peer:update - should not schedule stream creation IF protocol hit AND stream found on connection", async () => {
const con1 = createMockConnection();
con1.streams = [
createMockStream({
id: "1",
protocol: MULTICODEC,
writeStatus: "writable"
})
];
streamManager["getConnections"] = (_id) => [con1];

const scheduleNewStreamSpy = sinon.spy();
streamManager["scheduleNewStream"] = scheduleNewStreamSpy;

eventTarget.dispatchEvent(
new CustomEvent("peer:update", {
detail: { peer: { protocols: [MULTICODEC] } }
})
);

expect(scheduleNewStreamSpy.calledOnce).to.be.false;
});
});

type MockConnectionOptions = {
status?: string;
open?: number;
};

function createMockConnection(options: MockConnectionOptions = {}): Connection {
return {
status: options.status || "open",
timeline: {
open: options.open || 1
}
} as Connection;
}

type MockStreamOptions = {
id?: string;
protocol?: string;
writeStatus?: string;
};

function createMockStream(options: MockStreamOptions): Stream {
return {
id: options.id,
protocol: options.protocol,
writeStatus: options.writeStatus || "ready"
} as Stream;
}
Loading

0 comments on commit b93134a

Please sign in to comment.