From aea361261a29567574b2851eee4c73efb5863de0 Mon Sep 17 00:00:00 2001 From: Matteo Pietro Dazzi Date: Fri, 1 Dec 2023 23:16:53 +0100 Subject: [PATCH] feat: merge --- src/merge.ts | 29 +++++++++++++++++++++++++++++ test/merge.test.ts | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+) create mode 100644 src/merge.ts create mode 100644 test/merge.test.ts diff --git a/src/merge.ts b/src/merge.ts new file mode 100644 index 0000000..1d00be8 --- /dev/null +++ b/src/merge.ts @@ -0,0 +1,29 @@ +const recursivePromiseBuilder = (reader: ReadableStreamDefaultReader, controller: ReadableStreamDefaultController) => { + const recursivePromise = async () => { + const readResult = await reader.read(); + + if (!readResult.done) { + controller.enqueue(readResult.value); + + return recursivePromise(); + } + + } + + return recursivePromise(); +} + +export const merge = (...readableStreams: ReadableStream[]) => { + const fallbackedStreams = readableStreams ?? []; + + return new ReadableStream({ + async pull(controller) { + + await Promise.all( + fallbackedStreams.map(stream => recursivePromiseBuilder(stream.getReader(), controller)) + ) + + controller.close(); + }, + }); +}; diff --git a/test/merge.test.ts b/test/merge.test.ts new file mode 100644 index 0000000..64790f3 --- /dev/null +++ b/test/merge.test.ts @@ -0,0 +1,33 @@ +import { describe, expect, test } from "vitest"; + +import { fromIterable } from "../src/fromIterable"; +import { merge } from "../src/merge"; +import { pipeline } from "../src/pipeline"; +import { toArray } from "../src/toArray"; + +describe("merge", () => { + test("should merge streams", async () => { + const destinationArray = []; + + await pipeline( + merge(fromIterable([1]), fromIterable([2]), fromIterable([3])), + toArray(destinationArray), + ); + + expect(destinationArray).toContain(1); + expect(destinationArray).toContain(2); + expect(destinationArray).toContain(3); + }); + + test("should merge streams even if first stream is empty", async () => { + const destinationArray = []; + + await pipeline( + merge(fromIterable([]), fromIterable([2]), fromIterable([3])), + toArray(destinationArray), + ); + + expect(destinationArray).toContain(2); + expect(destinationArray).toContain(3); + }); +});