From b6ed008c292482af55e9c95bd35172b4e6c3907e Mon Sep 17 00:00:00 2001 From: Tomer Aberbach Date: Sun, 24 Nov 2024 20:14:33 -0500 Subject: [PATCH] feat(lib): add zip functions --- src/operations/splices.d.ts | 31 +++++ src/operations/splices.js | 113 ++++++++++++++++ test/helpers/fast-check/iterables.ts | 17 ++- test/operations/splices.ts | 184 +++++++++++++++++++++++++++ 4 files changed, 342 insertions(+), 3 deletions(-) diff --git a/src/operations/splices.d.ts b/src/operations/splices.d.ts index 4d69fff..ceb8766 100644 --- a/src/operations/splices.d.ts +++ b/src/operations/splices.d.ts @@ -1238,3 +1238,34 @@ export const concatConcur: ( | ConcurIterable )[] ) => ConcurIterable + +/** + * @category Splices + * @since v3.8.0 + */ +export const zip: ( + ...iterables: Readonly<{ [Key in keyof Values]: Iterable }> +) => Iterable + +/** + * @category Splices + * @since v3.8.0 + */ +export const zipAsync: ( + ...iterables: Readonly<{ + [Key in keyof Values]: Iterable | AsyncIterable + }> +) => AsyncIterable + +/** + * @category Splices + * @since v3.8.0 + */ +export const zipConcur: ( + ...iterables: Readonly<{ + [Key in keyof Values]: + | Iterable + | AsyncIterable + | ConcurIterable + }> +) => ConcurIterable diff --git a/src/operations/splices.js b/src/operations/splices.js index 1faa2a3..d478acb 100644 --- a/src/operations/splices.js +++ b/src/operations/splices.js @@ -18,6 +18,7 @@ import { pipe, } from './core.js' import { findLast, findLastAsync, findLastConcur } from './filters.js' +import { allConcur } from './predicates.js' import { flatten, flattenAsync, @@ -338,3 +339,115 @@ const createWindow = size => { export const concat = (...iterables) => flatten(iterables) export const concatAsync = (...iterables) => flattenAsync(asAsync(iterables)) export const concatConcur = (...iterables) => flattenConcur(asConcur(iterables)) + +export const zip = (...iterables) => + createIterable(function* () { + if (iterables.length === 0) { + return + } + + const iterators = 
iterables.map(iterable => iterable[Symbol.iterator]()) + const values = iterables.map(() => undefined) + while ( + iterators.every((iterator, index) => { + const result = iterator.next() + values[index] = result.value + return !result.done + }) + ) { + yield [...values] + } + }) + +export const zipAsync = (...iterables) => + createAsyncIterable(async function* () { + if (iterables.length === 0) { + return + } + + const asyncIterators = iterables.map(iterable => + asAsync(iterable)[Symbol.asyncIterator](), + ) + const values = iterables.map(() => undefined) + while ( + await allConcur(async ([index, asyncIterator]) => { + const result = await asyncIterator.next() + values[index] = result.value + return !result.done + }, asConcur(asyncIterators.entries())) + ) { + yield [...values] + } + }) + +export const zipConcur = + (...iterables) => + apply => + promiseWithEarlyResolve(async resolve => { + if (iterables.length === 0) { + return + } + + const valuesPerIterable = iterables.map(() => []) + const remainingValues = [] + let remainingBatches = Infinity + let resolved = false + + await Promise.all( + map(async ([index, iterable]) => { + const values = valuesPerIterable[index] + + await asConcur(iterable)(async value => { + if (resolved) { + return + } + + const valueIndex = values.length + if (valueIndex >= remainingBatches) { + // There's no point in processing this value because it's beyond + // the length of the shortest known iterable, so it's not going to + // be part of any yielded batch. + return + } + + // Queue this value for a future batch and track the remaining + // number of values we're waiting on to be able to yield that batch. + values.push(value) + if (valueIndex >= remainingValues.length) { + remainingValues.push(iterables.length) + } + remainingValues[valueIndex]-- + + const canYieldBatch = valueIndex === 0 && remainingValues[0] === 0 + if (!canYieldBatch) { + return + } + + // Dequeue and yield the next batch. 
+ remainingValues.shift() + const batch = valuesPerIterable.map(values => values.shift()) + remainingBatches-- + await apply(batch) + + if (!resolved && remainingBatches === 0) { + resolved = true + resolve() + } + }) + + if (values.length > 0) { + // The remaining number of batches can be no more than the remaining + // number of queued values for this resolved concur iterable. Track + // that number so we can resolve the zipped concur iterable as soon + // as possible (see above). + remainingBatches = Math.min(remainingBatches, values.length) + } else { + // We can resolve early because there aren't any queued values left + // for this concur iterable, so we'll never complete and yield + // another batch. + resolved = true + resolve() + } + }, iterables.entries()), + ) + }) diff --git a/test/helpers/fast-check/iterables.ts b/test/helpers/fast-check/iterables.ts index 19eab14..8d2db68 100644 --- a/test/helpers/fast-check/iterables.ts +++ b/test/helpers/fast-check/iterables.ts @@ -24,11 +24,13 @@ export const getIterableArb = ( getArrayArb(arb, constraints).map(values => ({ iterable: new IterableWithPrivateFields(values), values, + iterationOrder: values, })) export type GeneratedIterable = { iterable: Iterable values: Value[] + iterationOrder: Value[] } // Used to ensure we call `Symbol.iterator` with the right `this`. @@ -62,11 +64,13 @@ export const getAsyncIterableArb = ( getArrayArb(arb, constraints).map(values => ({ iterable: new AsyncIterableWithPrivateFields(values), values, + iterationOrder: values, })) export type GeneratedAsyncIterable = { iterable: AsyncIterable values: Value[] + iterationOrder: Value[] } // Used to ensure we call `Symbol.asyncIterator` with the right `this`. 
@@ -102,24 +106,31 @@ export const getConcurIterableArb = ( ): fc.Arbitrary> => getArrayArb(arb, constraints).map(values => { const index = getIterableIndex() + const iterationOrder: Value[] = [] return { iterable: Object.assign( async (apply: (value: Value) => MaybePromiseLike) => { await Promise.all( - values.map(async value => - apply(await getScheduler()!.schedule(value)), - ), + values.map(async value => { + const scheduledValue = await getScheduler()!.schedule(value) + iterationOrder.push(value) + await apply(scheduledValue) + }), ) }, { [fc.toStringMethod]: () => `ConcurIterable$${index}` }, ), values, + get iterationOrder() { + return iterationOrder + }, } }) export type GeneratedConcurIterable = { iterable: ConcurIterable values: Value[] + iterationOrder: Value[] } export const concurIterableArb = getConcurIterableArb(fc.anything()) diff --git a/test/operations/splices.ts b/test/operations/splices.ts index 1b5e960..a18b829 100644 --- a/test/operations/splices.ts +++ b/test/operations/splices.ts @@ -39,6 +39,8 @@ import { dropWhile, dropWhileAsync, dropWhileConcur, + each, + eachAsync, emptyAsync, emptyConcur, first, @@ -47,6 +49,8 @@ import { flatMapConcur, flatten, flattenAsync, + forEach, + forEachAsync, get, getAsync, getConcur, @@ -74,6 +78,9 @@ import { window, windowAsync, windowConcur, + zip, + zipAsync, + zipConcur, } from '~/index.js' import type { AsyncOptional, @@ -2522,3 +2529,180 @@ test.prop([ expect(elapsed).toBe((await scheduler.report()).max().elapsed) }, ) + +test.skip(`zip types are correct`, () => { + expectTypeOf(zip([1, 2, 3], [`a`, `b`, `c`], [1, 2, 3])).toMatchTypeOf< + Iterable<[number, string, number]> + >() +}) + +test.prop([fc.array(iterableArb)])(`zip returns a pure iterable`, iterables => { + const zippedIterable = zip(...iterables.map(({ iterable }) => iterable)) + + expect(zippedIterable).toBeIterable() +}) + +test.prop([fc.array(iterableArb, { minLength: 1 })])( + `zip returns an iterable zipped from the given non-empty 
iterables`, + iterables => { + const zippedIterable = zip(...iterables.map(({ iterable }) => iterable)) + + expect([...zippedIterable]).toStrictEqual( + Array.from( + { length: Math.min(...iterables.map(({ values }) => values.length)) }, + (_, index) => iterables.map(({ values }) => values[index]), + ), + ) + }, +) + +test(`zip returns an empty iterable for zero arguments`, () => { + const iterable = zip() + + expect([...iterable]).toBeEmpty() +}) + +test.prop([fc.array(iterableArb)])(`zip is lazy`, iterables => { + const counts = iterables.map(() => 0) + + const zippedIterable = zip( + ...iterables.map(({ iterable }, index) => + each(() => counts[index]!++, iterable), + ), + ) + + let expectedCount = 0 + expect(counts).toStrictEqual(iterables.map(() => expectedCount)) + forEach(() => { + expectedCount++ + expect(counts).toStrictEqual(iterables.map(() => expectedCount)) + }, zippedIterable) +}) + +test.skip(`zipAsync types are correct`, () => { + expectTypeOf(zipAsync([1, 2, 3], [`a`, `b`, `c`], [1, 2, 3])).toMatchTypeOf< + AsyncIterable<[number, string, number]> + >() + expectTypeOf( + zipAsync(asAsync([1, 2, 3]), [`a`, `b`, `c`], [1, 2, 3]), + ).toMatchTypeOf>() + expectTypeOf( + zipAsync(asAsync([1, 2, 3]), asAsync([`a`, `b`, `c`]), [1, 2, 3]), + ).toMatchTypeOf>() + expectTypeOf( + zipAsync(asAsync([1, 2, 3]), asAsync([`a`, `b`, `c`]), asAsync([1, 2, 3])), + ).toMatchTypeOf>() +}) + +test.prop([fc.array(fc.oneof(iterableArb, asyncIterableArb))])( + `zipAsync returns a pure async iterable`, + async iterables => { + const zippedIterable = zipAsync( + ...iterables.map(({ iterable }) => iterable), + ) + + await expect(zippedIterable).toBeAsyncIterable() + }, +) + +test.prop([ + fc.array(fc.oneof(iterableArb, asyncIterableArb), { minLength: 1 }), +])( + `zipAsync returns an async iterable zipped from the given non-empty iterables`, + async iterables => { + const zippedIterable = zipAsync( + ...iterables.map(({ iterable }) => iterable), + ) + + await 
expect(reduceAsync(toArray(), zippedIterable)).resolves.toStrictEqual( + Array.from( + { length: Math.min(...iterables.map(({ values }) => values.length)) }, + (_, index) => iterables.map(({ values }) => values[index]), + ), + ) + }, +) + +test(`zipAsync returns an empty async iterable for zero arguments`, async () => { + const asyncIterable = zipAsync() + + await expect(reduceAsync(toArray(), asyncIterable)).resolves.toBeEmpty() +}) + +test.prop([fc.array(fc.oneof(iterableArb, asyncIterableArb))])( + `zipAsync is lazy`, + async iterables => { + const counts = iterables.map(() => 0) + + const zippedIterable = zipAsync( + ...iterables.map(({ iterable }, index) => + eachAsync(() => counts[index]!++, asAsync(iterable)), + ), + ) + + let expectedCount = 0 + expect(counts).toStrictEqual(iterables.map(() => expectedCount)) + await forEachAsync(() => { + expectedCount++ + expect(counts).toStrictEqual(iterables.map(() => expectedCount)) + }, zippedIterable) + }, +) + +test.skip(`zipConcur types are correct`, () => { + expectTypeOf(zipConcur([1, 2, 3], [`a`, `b`, `c`], [1, 2, 3])).toMatchTypeOf< + ConcurIterable<[number, string, number]> + >() + expectTypeOf( + zipConcur([1, 2, 3], asAsync([`a`, `b`, `c`]), [1, 2, 3]), + ).toMatchTypeOf>() + expectTypeOf( + zipConcur([1, 2, 3], asAsync([`a`, `b`, `c`]), asConcur([1, 2, 3])), + ).toMatchTypeOf>() + expectTypeOf( + zipConcur([1, 2, 3], asConcur([`a`, `b`, `c`]), asConcur([1, 2, 3])), + ).toMatchTypeOf>() + expectTypeOf( + zipConcur( + asAsync([1, 2, 3]), + asConcur([`a`, `b`, `c`]), + asConcur([1, 2, 3]), + ), + ).toMatchTypeOf>() +}) + +test.prop([ + fc.array(fc.oneof(iterableArb, asyncIterableArb, concurIterableArb)), +])(`zipConcur returns a concur iterable`, async iterables => { + const zippedIterable = zipConcur(...iterables.map(({ iterable }) => iterable)) + + await expect(zippedIterable).toBeConcurIterable({ pure: false }) +}) + +test.prop([ + fc.array(fc.oneof(iterableArb, asyncIterableArb, concurIterableArb), { 
minLength: 1, + }), +])( + `zipConcur returns a concur iterable zipped from the given non-empty iterables`, + async iterables => { + const zippedIterable = zipConcur( + ...iterables.map(({ iterable }) => iterable), + ) + + // eslint-disable-next-line vitest/prefer-expect-resolves -- the expectation reads iterationOrder, which concur iterables only fill in while being iterated + expect(await reduceConcur(toArray(), zippedIterable)).toStrictEqual( + Array.from( + { length: Math.min(...iterables.map(({ values }) => values.length)) }, + (_, index) => + iterables.map(({ iterationOrder }) => iterationOrder[index]), + ), + ) + }, +) + +test(`zipConcur returns an empty async iterable for zero arguments`, async () => { + const concurIterable = zipConcur() + + await expect(reduceConcur(toArray(), concurIterable)).resolves.toBeEmpty() +})