Skip to content

Commit

Permalink
feat: pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
ilteoood committed Nov 26, 2023
1 parent d682ac8 commit b4ea70d
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 17 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"type": "module",
"scripts": {
"format": "biome format ./src ./test --write",
"sort": "biome check --apply-unsafe ./src ./test",
"lint": "biome lint ./src ./test",
"lint:fix": "pnpm run lint --fix",
"test": "pnpm run \"/^test:.*/\"",
Expand Down
20 changes: 10 additions & 10 deletions src/filter.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
type CallbackFn<T> = (value: T, index: number) => boolean | Promise<boolean>;

export const filter = <T>(callbackfn: CallbackFn<T>) => {
let index = 0;
return new TransformStream<T, T>({
async transform(chunk, controller) {
const result = await callbackfn(chunk, index++);
if (result) {
controller.enqueue(chunk);
}
},
});
}
let index = 0;
return new TransformStream<T, T>({
async transform(chunk, controller) {
const result = await callbackfn(chunk, index++);
if (result) {
controller.enqueue(chunk);
}
},
});
};
35 changes: 35 additions & 0 deletions src/pipeline.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
type MixedPipeline = (
source: ReadableStream,
...streams: (WritableStream | TransformStream)[]
) => ReadableStream | Promise<void>;

type TransformPipeline = (
source: ReadableStream,
...streams: TransformStream[]
) => ReadableStream;

type WritablePipeline = (
source: ReadableStream,
stream: WritableStream,
) => Promise<void>;

type PipelineType = TransformPipeline & WritablePipeline & MixedPipeline;

const pipelineReducerBuilder =
(lastPipelineItem: number) =>
(pipeline, stream: WritableStream | TransformStream, index: number) => {
if (index === lastPipelineItem && stream instanceof WritableStream) {
return pipeline.pipeTo(stream);
}

return pipeline.pipeThrough(stream as TransformStream);
};

export const pipeline: PipelineType = (
source: ReadableStream,
...streams: (TransformStream | WritableStream)[]
) => {
const lastPipelineItem = streams.length - 1;

return streams.reduce(pipelineReducerBuilder(lastPipelineItem), source);
};
6 changes: 3 additions & 3 deletions test/filter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ describe("filter", () => {
const destinationArray = [];

const filterStream = filter<number>(async (value) => {
await setTimeout(0)
return value % 2 === 0
await setTimeout(0);
return value % 2 === 0;
});
await fromIterable(sourceArray)
.pipeThrough(filterStream)
.pipeTo(toArray(destinationArray));

expect(destinationArray).toEqual([2]);
})
});
});
6 changes: 3 additions & 3 deletions test/map.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ describe("map", () => {
const destinationArray = [];

const mapStream = map<number, number>(async (value) => {
await setTimeout(0)
return value * 3
await setTimeout(0);
return value * 3;
});
await fromIterable(sourceArray)
.pipeThrough(mapStream)
.pipeTo(toArray(destinationArray));

expect(destinationArray).toEqual([3, 6, 9]);
})
});
});
73 changes: 73 additions & 0 deletions test/pipeline.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { setTimeout } from "timers/promises";
import { describe, expect, test } from "vitest";

import { fromIterable } from "../src/fromIterable";
import { map } from "../src/map";
import { pipeline } from "../src/pipeline";
import { toArray } from "../src/toArray";

describe("pipeline", () => {
test("should correctly handle mixed pipeline", async () => {
const sourceArray = [1, 2, 3];
const destinationArray = [];

await pipeline(
fromIterable(sourceArray),
map((value) => value * 2),
toArray(destinationArray),
);

expect(destinationArray).toEqual([2, 4, 6]);
});

test("should correctly handle transform and write pipelines separately", async () => {
const sourceArray = [1, 2, 3];
const destinationArray = [];

const mappingPipeline = pipeline(
fromIterable(sourceArray),
map((value) => value * 2),
);

await pipeline(mappingPipeline, toArray(destinationArray));

expect(destinationArray).toEqual([2, 4, 6]);
});

test("should correctly handle write pipeline", async () => {
const sourceArray = [1, 2, 3];
const destinationArray = [];

await pipeline(fromIterable(sourceArray), toArray(destinationArray));

expect(destinationArray).toEqual(sourceArray);
});

test("should correctly throw if there are 2 readable streams", async () => {
expect(() =>
pipeline(
fromIterable([1]),
// @ts-expect-error the >2 parameter should never be a readable stream
fromIterable([2]),
),
).toThrow(
'The "transform.readable" property must be an instance of ReadableStream. Received undefined',
);
});

test("should correctly handle promises", async () => {
const sourceArray = [1, 2, 3];
const destinationArray = [];

await pipeline(
fromIterable(sourceArray),
map(async (value) => {
await setTimeout(1);
return value * 2;
}),
toArray(destinationArray),
);

expect(destinationArray).toEqual([2, 4, 6]);
});
});
4 changes: 3 additions & 1 deletion test/toArray.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ describe("toArray", () => {

it("should not throw if source stream is empty", async () => {
// @ts-expect-error undefined parameter for test
expect(fromIterable([]).pipeTo(toArray(undefined))).resolves.toBeUndefined();
expect(
fromIterable([]).pipeTo(toArray(undefined)),
).resolves.toBeUndefined();
});
});

0 comments on commit b4ea70d

Please sign in to comment.