Painless concurrent pipelines for Node.js
Easily create concurrent pipelines for faster applications.
A pipeline has streams, and each stream progresses through a number of stages.
It all starts by creating a Pipeline:
const ppl = new Pipeline(5);
The argument passed to Pipeline controls the maximum number of streams that can exist concurrently. This makes back pressure easy to implement.
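To make the idea of a concurrency cap concrete, here is a minimal sketch of a counting semaphore that bounds how many tasks run at once. It is an illustration only, not the library's implementation: Semaphore and peakConcurrency are hypothetical names introduced for this example.

```javascript
// Hypothetical helper, NOT part of the library: a counting semaphore.
class Semaphore {
  constructor(limit) {
    this.limit = limit; // maximum number of concurrent holders
    this.active = 0;    // slots currently taken
    this.waiters = [];  // resolve callbacks of callers waiting for a slot
  }

  async acquire() {
    if (this.active < this.limit) {
      this.active++;
      return;
    }
    // No free slot: wait until release() hands one over.
    await new Promise((resolve) => this.waiters.push(resolve));
  }

  release() {
    const next = this.waiters.shift();
    if (next) {
      next();           // pass the slot straight to the next waiter
    } else {
      this.active--;    // nobody is waiting: free the slot
    }
  }
}

// Runs `count` simulated tasks and returns the highest concurrency observed.
async function peakConcurrency(limit, count) {
  const sem = new Semaphore(limit);
  let running = 0;
  let peak = 0;

  const task = async () => {
    await sem.acquire();
    try {
      running++;
      peak = Math.max(peak, running);
      await new Promise((resolve) => setTimeout(resolve, 5)); // simulated work
    } finally {
      running--;
      sem.release();
    }
  };

  await Promise.all(Array.from({ length: count }, task));
  return peak;
}

peakConcurrency(5, 20).then((peak) => console.log('peak concurrency:', peak));
```

Tasks beyond the limit wait inside acquire() until a slot frees up. That waiting is what produces back pressure: a producer that awaits before starting each new task cannot outrun the consumers.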
To start a new stream, pass your code to Pipeline.pipelined and await the function it returns:
await ppl.pipelined(async (stage, ...args) => {
  // ... your code here
})(/* ... your pipeline args */);
Inside the stream code, you can define stages:
await stage('process data', 3);
Dividing the work into stages lets you set a different concurrency limit for each stage. In this snippet, the limit for the 'process data' stage is 3.
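One way to picture per-stage limits is a separate gate for each stage name, where entering a new stage releases the slot held in the previous one. The sketch below is written under that assumption and is not the library's implementation. createStageLimiter, newStream, leave and runStreams are hypothetical names, and the 'fetch' stage with its limit of 4 is invented for the example.

```javascript
// Hypothetical sketch of per-stage limits; NOT the library's implementation.
function createStageLimiter() {
  const gates = new Map(); // stage name -> { limit, active, queue }

  // Returns the `stage` function for one stream, plus `leave` to exit its last stage.
  return function newStream() {
    let current = null; // gate this stream currently occupies, if any

    const leave = () => {
      if (!current) return;
      const gate = current;
      current = null;
      const next = gate.queue.shift();
      if (next) next();   // pass the slot straight to the next waiting stream
      else gate.active--; // nobody is waiting: free the slot
    };

    const stage = async (name, limit) => {
      leave(); // entering a new stage frees the slot held in the previous one
      let gate = gates.get(name);
      if (!gate) {
        gate = { limit, active: 0, queue: [] }; // the first caller fixes the limit
        gates.set(name, gate);
      }
      if (gate.active < gate.limit) gate.active++;
      else await new Promise((resolve) => gate.queue.push(resolve));
      current = gate;
    };

    return { stage, leave };
  };
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Runs `count` streams through two stages and records the peak occupancy of each.
async function runStreams(count) {
  const newStream = createStageLimiter();
  const inside = { fetch: 0, 'process data': 0 };
  const peak = { fetch: 0, 'process data': 0 };

  const work = async (name) => {
    inside[name]++;
    peak[name] = Math.max(peak[name], inside[name]);
    await sleep(5); // simulated work
    inside[name]--;
  };

  const stream = async () => {
    const { stage, leave } = newStream();
    try {
      await stage('fetch', 4);
      await work('fetch');
      await stage('process data', 3);
      await work('process data');
    } finally {
      leave(); // release the slot of the final stage
    }
  };

  await Promise.all(Array.from({ length: count }, stream));
  return peak;
}

runStreams(10).then((peak) => console.log(peak));
```

With ten streams, the recorded peaks stay within 4 for 'fetch' and 3 for 'process data', even though all ten streams start at once.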
Finally, to block execution until all the streams in the pipeline have finished:
await ppl.finish();
If any stream ends with an unhandled exception, calling finish() or pipelined() will throw an array of the unhandled exceptions, along with the corresponding stream ids. Note that in this case, pipelined() waits for all running streams to finish before throwing, so that no "dangling" streams are left behind.
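The "wait for everything, then throw all failures together" behavior can be approximated with Promise.allSettled. The sketch below is a stand-in for illustration, not the library's code: finishAll and demo are hypothetical names, and both the { id, error } shape and the use of array indexes as stream ids are assumptions, since the exact format of the thrown array is not specified here.

```javascript
// Hypothetical stand-in for the behavior described above; the { id, error }
// shape and the use of array indexes as stream ids are assumptions.
async function finishAll(streams) {
  // allSettled waits for every stream, so none is still running when we throw.
  const results = await Promise.allSettled(streams);
  const failures = [];
  results.forEach((result, id) => {
    if (result.status === 'rejected') failures.push({ id, error: result.reason });
  });
  if (failures.length > 0) throw failures;
}

// Caller side: a single catch block sees every failed stream.
async function demo() {
  const streams = [
    Promise.resolve('ok'),
    Promise.reject(new Error('bad input')),
    Promise.resolve('ok'),
  ];
  try {
    await finishAll(streams);
    return [];
  } catch (failures) {
    return failures.map(({ id, error }) => `stream ${id}: ${error.message}`);
  }
}

demo().then((report) => console.log(report));
```

Because the thrown value is an array rather than a single Error, a caller has to iterate over it instead of reading one message property.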
You may want to add a new stage for exception handling, depending on the logic of your application.
Some sample code can be found at demos.