Skip to content

Commit

Permalink
rename compose into chainStream
Browse files Browse the repository at this point in the history
  • Loading branch information
bguerout committed Jan 3, 2025
1 parent f19c8b2 commit 9e3c474
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 49 deletions.
25 changes: 12 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import { ... } from "oleoduc";
- Easily transform, filter and write data flowing through the stream
- Catch stream errors
- Compose and merge streams together
- Chain and merge streams together
- Read a stream as if it were a promise
### Quick tour
Expand Down Expand Up @@ -67,11 +67,11 @@ app.get("/documents", async (req, res) => {
Create a stream to parse CSV and iterate over it
```js
const { compose, transformData } = require("oleoduc");
const { chainStream, transformData } = require("oleoduc");
const { createReadStream } = require("fs");
const { parse } = require("csv-parse");
const csvStream = compose(
const csvStream = chainStream(
createReadStream("/path/to/file.csv"),
parse(),
)
Expand All @@ -84,7 +84,7 @@ for await (const data of csvStream) {
# API
* [accumulateData](#accumulatedatacallback-options)
* [compose](#composestreams-options)
* [chainStream](#chainstreamstreams-options)
* [concatStreams](#concatstreamsstreams-options)
* [filterData](#filterdatacallback-options)
* [flattenArray](#flattenarrayoptions)
Expand Down Expand Up @@ -167,7 +167,7 @@ oleoduc(
]
```
## compose(...streams, [options])
## chainStream(...streams, [options])
Same as oleoduc but without promise stuff and stream composition capability
Expand All @@ -178,14 +178,14 @@ Same as oleoduc but without promise stuff and stream composition capability
- `*`: The rest of the options is passed
to [stream.Transform](https://nodejs.org/api/stream.html#stream_class_stream_transform)
Compose streams
Chain streams
```js
const { compose, transformData, writeData } = require("oleoduc");
const { chainStream, transformData, writeData } = require("oleoduc");
async function getCursor() {
const cursor = await getDataFromDB();
return compose(
return chainStream(
cursor,
transformData((data) => data.value * 10),
)
Expand All @@ -198,13 +198,12 @@ await oleoduc(
);
```
Iterate over a composed readable stream
Iterate over a chained readable stream
```js
const { compose, transformData } = require("oleoduc");
const { chainStream, transformData } = require("oleoduc");
const stream
compose(
const stream = chainStream(
source,
transformData((data) => data.trim()),
);
Expand All @@ -220,7 +219,7 @@ Handle errors in single event listener
```js
const { oleoduc, writeData } = require("oleoduc");
const stream = compose(
const stream = chainStream(
source,
writeData((obj) => throw new Error())
);
Expand Down
14 changes: 7 additions & 7 deletions src/compose.ts → src/chainStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ import {pipeStreamsTogether} from "./utils/pipeStreamsTogether";
import {AnyStream, PipeableStreams} from "./types";
import {Readable, TransformOptions} from "stream";

export type ComposeOptions = TransformOptions;
export type ChainStreamOptions = TransformOptions;

// eslint-disable-next-line @typescript-eslint/no-unused-vars
type ComposeReturn<TLast extends AnyStream> = TLast extends Readable
type ChainStreamReturn<TLast extends AnyStream> = TLast extends Readable
? NodeJS.ReadWriteStream & Readable
: NodeJS.ReadWriteStream;

export function compose<TLast extends NodeJS.ReadWriteStream | NodeJS.WritableStream>(
...args: PipeableStreams<NodeJS.ReadableStream | NodeJS.ReadWriteStream, TLast, ComposeOptions>
): ComposeReturn<TLast> {
const {params: streams, options} = parseArgs<AnyStream, ComposeOptions>(args);
export function chainStream<TLast extends NodeJS.ReadWriteStream | NodeJS.WritableStream>(
...args: PipeableStreams<NodeJS.ReadableStream | NodeJS.ReadWriteStream, TLast, ChainStreamOptions>
): ChainStreamReturn<TLast> {
const {params: streams, options} = parseArgs<AnyStream, ChainStreamOptions>(args);

const {first, last, wrapper} = wrapStreams(streams, options);

Expand All @@ -31,5 +31,5 @@ export function compose<TLast extends NodeJS.ReadWriteStream | NodeJS.WritableSt

pipeStreamsTogether(streams, wrapper);

return wrapper as ComposeReturn<TLast>;
return wrapper as ChainStreamReturn<TLast>;
}
6 changes: 5 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {accumulateData} from "./accumulateData";
import {compose} from "./compose";
import {chainStream} from "./chainStream";
import {concatStreams} from "./concatStreams";
import {filterData} from "./filterData";
import {flattenArray} from "./flattenArray";
Expand All @@ -14,8 +14,12 @@ import {transformStream} from "./transformStream";
import {writeData} from "./writeData";
import {writeToStdout} from "./writeToStdout";

//Legacy from 0.8.x
const compose = chainStream;

export {
accumulateData,
chainStream,
compose,
concatStreams,
filterData,
Expand Down
4 changes: 2 additions & 2 deletions src/readLineByLine.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import {accumulateData} from "./accumulateData";
import {flattenArray} from "./flattenArray";
import {compose} from "./compose";
import {chainStream} from "./chainStream";

export function readLineByLine() {
return compose(
return chainStream(
accumulateData<string, string[]>(
(acc, data: string, flush) => {
const lines = data.toString().split(/\r?\n/);
Expand Down
38 changes: 19 additions & 19 deletions test/compose-test.ts → test/chainStream-test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import {deepStrictEqual, fail} from "assert";
import {createSlowStream, createStream} from "./testUtils";
import {compose, flattenArray, transformData, writeData} from "../src";
import {chainStream, flattenArray, transformData, writeData} from "../src";

describe("compose", () => {
it("can compose streams", (done) => {
describe("chainStream", () => {
it("can chain streams", (done) => {
const chunks: string[] = [];
const source = createStream();
source.push("first");
source.push(null);

compose(
chainStream(
source,
transformData((data: string) => data.substring(0, 1)),
writeData((data: string) => {
Expand All @@ -26,15 +26,15 @@ describe("compose", () => {
});
});

it("can iterate over a composed stream", async () => {
it("can iterate over a chained stream", async () => {
const chunks: string[] = [];
const source = createStream();
source.push("andré");
source.push("bruno");
source.push("robert");
source.push(null);

const stream = compose(
const stream = chainStream(
source,
transformData((data: string) => data.substring(0, 1)),
);
Expand All @@ -46,15 +46,15 @@ describe("compose", () => {
deepStrictEqual(chunks, ["a", "b", "r"]);
});

it("can pipe a compose stream", (done) => {
it("can pipe a chain stream", (done) => {
const chunks: string[] = [];
const source = createStream();
source.push("andré");
source.push("bruno");
source.push("robert");
source.push(null);

compose(source)
chainStream(source)
.pipe(transformData((data: string) => data.substring(0, 1)))
.pipe(
writeData((data: string) => {
Expand All @@ -67,7 +67,7 @@ describe("compose", () => {
});
});

it("can build compose with first writeable and last readable (duplex)", (done) => {
it("can build chain with first writeable and last readable (duplex)", (done) => {
const chunks: string[] = [];
const source = createStream();
source.push("andré");
Expand All @@ -77,7 +77,7 @@ describe("compose", () => {

source
.pipe(
compose(
chainStream(
transformData((data: string) => data.substring(0, 1)),
transformData((data) => "_" + data),
),
Expand All @@ -93,18 +93,18 @@ describe("compose", () => {
});
});

it("can compose inside compose", (done) => {
it("can chain inside a chain", (done) => {
const chunks: string[] = [];
const source = createStream();
const nested = compose(
const nested = chainStream(
source,
transformData((d: string) => d.substring(0, 1)),
);

source.push("first");
source.push(null);

compose(
chainStream(
nested,
writeData((data: string) => {
chunks.push(data);
Expand All @@ -126,7 +126,7 @@ describe("compose", () => {
source.push(["andré", "bruno", "robert"]);
source.push(null);

compose(
chainStream(
source,
flattenArray({highWaterMark: 1}),
createSlowStream({maxWriteInterval: 10}),
Expand All @@ -139,15 +139,15 @@ describe("compose", () => {
});
});

it("can handle back pressure with nested compose", (done) => {
it("can handle back pressure with nested chain", (done) => {
let result = "";
const source = createStream();
source.push(["andré", "bruno", "robert"]);
source.push(null);

compose(
chainStream(
source,
compose(flattenArray({highWaterMark: 1}), createSlowStream({maxWriteInterval: 10})),
chainStream(flattenArray({highWaterMark: 1}), createSlowStream({maxWriteInterval: 10})),
writeData((data: string) => {
result += data;
}),
Expand All @@ -160,7 +160,7 @@ describe("compose", () => {
it("should propagate emitted error", (done) => {
const source = createStream();

compose(
chainStream(
source,
writeData(() => {}),
)
Expand All @@ -182,7 +182,7 @@ describe("compose", () => {
source.push("first");
source.push(null);

compose(
chainStream(
source,
writeData(() => {
throw new Error("write error");
Expand Down
10 changes: 5 additions & 5 deletions test/oleoduc-test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {deepStrictEqual, fail} from "assert";
import {assertErrorMessage, createStream, delay} from "./testUtils";
import {compose, oleoduc, transformData, writeData} from "../src";
import {chainStream, oleoduc, transformData, writeData} from "../src";

describe("oleoduc", () => {
it("can create oleoduc", async () => {
Expand Down Expand Up @@ -35,7 +35,7 @@ describe("oleoduc", () => {
try {
await oleoduc(
source,
compose(
chainStream(
transformData((data: string) => data.substring(0, 1)),
transformData((data: string) => "_" + data),
),
Expand All @@ -47,10 +47,10 @@ describe("oleoduc", () => {
}
});

it("can use compose inside oleoduc", async () => {
it("can use chainStream inside oleoduc", async () => {
const chunks: string[] = [];
const source = createStream();
const composed = compose(
const chained = chainStream(
source,
transformData((d: string) => d.substring(0, 1)),
);
Expand All @@ -59,7 +59,7 @@ describe("oleoduc", () => {
source.push(null);

await oleoduc(
composed,
chained,
writeData((data: string) => chunks.push(data)),
)
.then(() => {
Expand Down
4 changes: 2 additions & 2 deletions test/toAsyncIterator-test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {deepStrictEqual, fail} from "assert";
import {assertErrorMessage, createStream} from "./testUtils";
import {toAsyncIterator} from "../src/utils/toAsyncIterator";
import {compose, transformData} from "../src";
import {chainStream, transformData} from "../src";

describe("toAsyncIterator", () => {
it("can convert a readable stream into an iterator", async () => {
Expand All @@ -21,7 +21,7 @@ describe("toAsyncIterator", () => {

it("iterator should honor error", async () => {
const readable = createStream();
const failingStream = compose(
const failingStream = chainStream(
readable,
transformData(() => {
throw new Error("This is a stream error");
Expand Down

0 comments on commit 9e3c474

Please sign in to comment.