From 1bbb36689ba6f1bea16e9efc5181d8ece1700657 Mon Sep 17 00:00:00 2001 From: Matteo Pietro Dazzi Date: Fri, 1 Dec 2023 23:16:53 +0100 Subject: [PATCH] feat: merge --- src/merge.ts | 32 ++++++++++++++++++++++++++++++++ test/merge.test.ts | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+) create mode 100644 src/merge.ts create mode 100644 test/merge.test.ts diff --git a/src/merge.ts b/src/merge.ts new file mode 100644 index 0000000..c90eb17 --- /dev/null +++ b/src/merge.ts @@ -0,0 +1,32 @@ +const recursivePromiseBuilder = ( + reader: ReadableStreamDefaultReader, + controller: ReadableStreamDefaultController, +) => { + const recursivePromise = async () => { + const readResult = await reader.read(); + + if (!readResult.done) { + controller.enqueue(readResult.value); + + return recursivePromise(); + } + }; + + return recursivePromise(); +}; + +export const merge = (...readableStreams: ReadableStream[]) => { + const fallbackedStreams = readableStreams ?? []; + + return new ReadableStream({ + async pull(controller) { + await Promise.all( + fallbackedStreams.map((stream) => + recursivePromiseBuilder(stream.getReader(), controller), + ), + ); + + controller.close(); + }, + }); +}; diff --git a/test/merge.test.ts b/test/merge.test.ts new file mode 100644 index 0000000..64790f3 --- /dev/null +++ b/test/merge.test.ts @@ -0,0 +1,33 @@ +import { describe, expect, test } from "vitest"; + +import { fromIterable } from "../src/fromIterable"; +import { merge } from "../src/merge"; +import { pipeline } from "../src/pipeline"; +import { toArray } from "../src/toArray"; + +describe("merge", () => { + test("should merge streams", async () => { + const destinationArray = []; + + await pipeline( + merge(fromIterable([1]), fromIterable([2]), fromIterable([3])), + toArray(destinationArray), + ); + + expect(destinationArray).toContain(1); + expect(destinationArray).toContain(2); + expect(destinationArray).toContain(3); + }); + + test("should merge streams even if first stream is empty", async () => { + const destinationArray = []; + + await pipeline( + merge(fromIterable([]), fromIterable([2]), fromIterable([3])), + toArray(destinationArray), + ); + + expect(destinationArray).toContain(2); + expect(destinationArray).toContain(3); + }); +});