diff --git a/package.json b/package.json index 42b3c06..90cc581 100644 --- a/package.json +++ b/package.json @@ -6,6 +6,7 @@ "type": "module", "scripts": { "format": "biome format ./src ./test --write", + "sort": "biome check --apply-unsafe ./src ./test", "lint": "biome lint ./src ./test", "lint:fix": "pnpm run lint --fix", "test": "pnpm run \"/^test:.*/\"", diff --git a/src/filter.ts b/src/filter.ts index df2aaa4..b7006bd 100644 --- a/src/filter.ts +++ b/src/filter.ts @@ -1,13 +1,13 @@ type CallbackFn = (value: T, index: number) => boolean | Promise; export const filter = (callbackfn: CallbackFn) => { - let index = 0; - return new TransformStream({ - async transform(chunk, controller) { - const result = await callbackfn(chunk, index++); - if (result) { - controller.enqueue(chunk); - } - }, - }); -} \ No newline at end of file + let index = 0; + return new TransformStream({ + async transform(chunk, controller) { + const result = await callbackfn(chunk, index++); + if (result) { + controller.enqueue(chunk); + } + }, + }); +}; diff --git a/src/pipeline.ts b/src/pipeline.ts new file mode 100644 index 0000000..3731618 --- /dev/null +++ b/src/pipeline.ts @@ -0,0 +1,35 @@ +type MixedPipeline = ( + source: ReadableStream, + ...streams: (WritableStream | TransformStream)[] +) => ReadableStream | Promise; + +type TransformPipeline = ( + source: ReadableStream, + ...streams: TransformStream[] +) => ReadableStream; + +type WritablePipeline = ( + source: ReadableStream, + stream: WritableStream, +) => Promise; + +type PipelineType = TransformPipeline & WritablePipeline & MixedPipeline; + +const pipelineReducerBuilder = + (lastPipelineItem: number) => + (pipeline, stream: WritableStream | TransformStream, index: number) => { + if (index === lastPipelineItem && stream instanceof WritableStream) { + return pipeline.pipeTo(stream); + } + + return pipeline.pipeThrough(stream as TransformStream); + }; + +export const pipeline: PipelineType = ( + source: ReadableStream, + ...streams: (TransformStream | WritableStream)[] +) => { + const lastPipelineItem = streams.length - 1; + + return streams.reduce(pipelineReducerBuilder(lastPipelineItem), source); +}; diff --git a/test/filter.test.ts b/test/filter.test.ts index 1d0098f..240a8e7 100644 --- a/test/filter.test.ts +++ b/test/filter.test.ts @@ -46,13 +46,13 @@ describe("filter", () => { const destinationArray = []; const filterStream = filter(async (value) => { - await setTimeout(0) - return value % 2 === 0 + await setTimeout(0); + return value % 2 === 0; }); await fromIterable(sourceArray) .pipeThrough(filterStream) .pipeTo(toArray(destinationArray)); expect(destinationArray).toEqual([2]); - }) + }); }); diff --git a/test/map.test.ts b/test/map.test.ts index b022c9b..7ecfbc4 100644 --- a/test/map.test.ts +++ b/test/map.test.ts @@ -46,13 +46,13 @@ describe("map", () => { const destinationArray = []; const mapStream = map(async (value) => { - await setTimeout(0) - return value * 3 + await setTimeout(0); + return value * 3; }); await fromIterable(sourceArray) .pipeThrough(mapStream) .pipeTo(toArray(destinationArray)); expect(destinationArray).toEqual([3, 6, 9]); - }) + }); }); diff --git a/test/pipeline.test.ts b/test/pipeline.test.ts new file mode 100644 index 0000000..a17a86a --- /dev/null +++ b/test/pipeline.test.ts @@ -0,0 +1,73 @@ +import { setTimeout } from "timers/promises"; +import { describe, expect, test } from "vitest"; + +import { fromIterable } from "../src/fromIterable"; +import { map } from "../src/map"; +import { pipeline } from "../src/pipeline"; +import { toArray } from "../src/toArray"; + +describe("pipeline", () => { + test("should correctly handle mixed pipeline", async () => { + const sourceArray = [1, 2, 3]; + const destinationArray = []; + + await pipeline( + fromIterable(sourceArray), + map((value) => value * 2), + toArray(destinationArray), + ); + + expect(destinationArray).toEqual([2, 4, 6]); + }); + + test("should correctly handle transform and write pipelines separately", async () => { + const sourceArray = [1, 2, 3]; + const destinationArray = []; + + const mappingPipeline = pipeline( + fromIterable(sourceArray), + map((value) => value * 2), + ); + + await pipeline(mappingPipeline, toArray(destinationArray)); + + expect(destinationArray).toEqual([2, 4, 6]); + }); + + test("should correctly handle write pipeline", async () => { + const sourceArray = [1, 2, 3]; + const destinationArray = []; + + await pipeline(fromIterable(sourceArray), toArray(destinationArray)); + + expect(destinationArray).toEqual(sourceArray); + }); + + test("should correctly throw if there are 2 readable streams", async () => { + expect(() => + pipeline( + fromIterable([1]), + // @ts-expect-error the >2 parameter should never be a readable stream + fromIterable([2]), + ), + ).toThrow( + 'The "transform.readable" property must be an instance of ReadableStream. Received undefined', + ); + }); + + test("should correctly handle promises", async () => { + const sourceArray = [1, 2, 3]; + const destinationArray = []; + + await pipeline( + fromIterable(sourceArray), + map(async (value) => { + await setTimeout(1); + return value * 2; + }), + toArray(destinationArray), + ); + + expect(destinationArray).toEqual([2, 4, 6]); + }); +}); diff --git a/test/toArray.test.ts b/test/toArray.test.ts index 2fd9a4e..7329819 100644 --- a/test/toArray.test.ts +++ b/test/toArray.test.ts @@ -30,6 +30,8 @@ describe("toArray", () => { it("should not throw if source stream is empty", async () => { // @ts-expect-error undefined parameter for test - expect(fromIterable([]).pipeTo(toArray(undefined))).resolves.toBeUndefined(); + expect( + fromIterable([]).pipeTo(toArray(undefined)), + ).resolves.toBeUndefined(); }); });