Skip to content

Commit

Permalink
feat: map stream
Browse files Browse the repository at this point in the history
  • Loading branch information
ilteoood committed Nov 26, 2023
1 parent cdb8cf3 commit b06b1fa
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 36 deletions.
22 changes: 14 additions & 8 deletions src/fromArray.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
export const fromArray = <T>(sourceArray: T[], strategy?: QueuingStrategy): ReadableStream<T> => {
export const fromArray = <T>(
sourceArray: T[],
strategy?: QueuingStrategy,
): ReadableStream<T> => {
const arrayLength = sourceArray.length;
return new ReadableStream<T>({
start(controller) {
for (let i = 0; i < arrayLength; i++) {
controller.enqueue(sourceArray[i]);
}
controller.close();
return new ReadableStream<T>(
{
start(controller) {
for (let i = 0; i < arrayLength; i++) {
controller.enqueue(sourceArray[i]);
}
controller.close();
},
},
}, strategy);
strategy,
);
};
10 changes: 10 additions & 0 deletions src/map.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
type CallbackFn<T, U> = (value: T, index: number) => U;

export const map = <T, U>(callbackfn: CallbackFn<T, U>) => {
let index = 0;
return new TransformStream<T, U>({
transform(chunk, controller) {
controller.enqueue(callbackfn(chunk, index++));
},
});
};
10 changes: 5 additions & 5 deletions src/toArray.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
export const toArray = <T>(destinationArray: T[]): WritableStream<T> => {
return new WritableStream<T>({
write(chunk) {
destinationArray.push(chunk);
},
})
return new WritableStream<T>({
write(chunk) {
destinationArray.push(chunk);
},
});
};
42 changes: 42 additions & 0 deletions test/map.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { describe, expect, it } from "vitest";
import { fromArray } from "../src/fromArray";
import { map } from "../src/map";
import { toArray } from "../src/toArray";

describe("map", () => {
it("should map a stream", async () => {
const sourceArray = [1, 2, 3];
const destinationArray = [];

const mapStream = map<number, number>((value) => value * 2);

await fromArray(sourceArray)
.pipeThrough(mapStream)
.pipeTo(toArray(destinationArray));

expect(destinationArray).toEqual([2, 4, 6]);
});

it("should not break for an empty array", async () => {
const sourceArray = [];
const destinationArray = [];

const mapStream = map<number, number>((value) => value);
await fromArray(sourceArray)
.pipeThrough(mapStream)
.pipeTo(toArray(destinationArray));

expect(destinationArray).toEqual([]);
});

it("should throw if parameter is undefined", async () => {
// @ts-expect-error undefined parameter for test
const mapStream = map<number, number>(undefined);

expect(
fromArray([1])
.pipeThrough(mapStream)
.pipeTo(toArray([] as number[])),
).rejects.toThrow("callbackfn is not a function");
});
});
46 changes: 23 additions & 23 deletions test/toArray.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,33 @@ import { fromArray } from "../src/fromArray";
import { toArray } from "../src/toArray";

describe("toArray", () => {
it('should accumulate a stream into an array', async () => {
const sourceArray = [1, 2, 3];
const destinationArray = [];
it("should accumulate a stream into an array", async () => {
const sourceArray = [1, 2, 3];
const destinationArray = [];

await fromArray(sourceArray).pipeTo(toArray(destinationArray));
await fromArray(sourceArray).pipeTo(toArray(destinationArray));

expect(destinationArray).toEqual(sourceArray);
})
expect(destinationArray).toEqual(sourceArray);
});

it('should not break for an empty array', async () => {
const sourceArray = [];
const destinationArray = [];
it("should not break for an empty array", async () => {
const sourceArray = [];
const destinationArray = [];

await fromArray(sourceArray).pipeTo(toArray(destinationArray));
await fromArray(sourceArray).pipeTo(toArray(destinationArray));

expect(destinationArray).toEqual(sourceArray);
})
expect(destinationArray).toEqual(sourceArray);
});

it('should throw if parameter is undefined', async () => {
// @ts-expect-error undefined parameter for test
expect(fromArray([1]).pipeTo(toArray(undefined))).rejects.toThrow(
"Cannot read properties of undefined (reading 'push')",
);
})
it("should throw if parameter is undefined", async () => {
// @ts-expect-error undefined parameter for test
expect(fromArray([1]).pipeTo(toArray(undefined))).rejects.toThrow(
"Cannot read properties of undefined (reading 'push')",
);
});

it('should not throw if source stream is empty', async () => {
// @ts-expect-error undefined parameter for test
expect(fromArray([]).pipeTo(toArray(undefined))).resolves.toBeUndefined()
})
})
it("should not throw if source stream is empty", async () => {
// @ts-expect-error undefined parameter for test
expect(fromArray([]).pipeTo(toArray(undefined))).resolves.toBeUndefined();
});
});

0 comments on commit b06b1fa

Please sign in to comment.