Skip to content

Commit

Permalink
rename compose into chainStream
Browse files Browse the repository at this point in the history
  • Loading branch information
bguerout committed Jan 5, 2025
1 parent 7175a9d commit f75776b
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 49 deletions.
25 changes: 12 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import { ... } from "oleoduc";
- Easily transform, filter and write data flowing through the stream
- Catch stream errors
- Compose and merge streams together
- Chain and merge streams together
- Read a stream as if it were a promise
### Quick tour
Expand Down Expand Up @@ -67,11 +67,11 @@ app.get("/documents", async (req, res) => {
Create a stream to parse CSV and iterate over it
```js
const { compose, transformData } = require("oleoduc");
const { chainStreams, transformData } = require("oleoduc");
const { createReadStream } = require("fs");
const { parse } = require("csv-parse");
const csvStream = compose(
const csvStream = chainStreams(
createReadStream("/path/to/file.csv"),
parse(),
)
Expand All @@ -84,7 +84,7 @@ for await (const data of csvStream) {
# API
* [accumulateData](#accumulatedatacallback-options)
* [compose](#composestreams-options)
* [chainStreams](#chainstreamstreams-options)
* [concatStreams](#concatstreamsstreams-options)
* [filterData](#filterdatacallback-options)
* [flattenArray](#flattenarrayoptions)
Expand Down Expand Up @@ -167,7 +167,7 @@ oleoduc(
]
```
## compose(...streams, [options])
## chainStreams(...streams, [options])
Same as oleoduc but without promise stuff and stream composition capability
Expand All @@ -178,14 +178,14 @@ Same as oleoduc but without promise stuff and stream composition capability
- `*`: The rest of the options is passed
to [stream.Transform](https://nodejs.org/api/stream.html#stream_class_stream_transform)
Compose streams
Chain streams
```js
const { compose, transformData, writeData } = require("oleoduc");
const { chainStreams, transformData, writeData } = require("oleoduc");
async function getCursor() {
const cursor = await getDataFromDB();
return compose(
return chainStreams(
cursor,
transformData((data) => data.value * 10),
)
Expand All @@ -198,13 +198,12 @@ await oleoduc(
);
```
Iterate over a composed readable stream
Iterate over a chained readable stream
```js
const { compose, transformData } = require("oleoduc");
const { chainStreams, transformData } = require("oleoduc");
const stream
compose(
const stream = chainStreams(
source,
transformData((data) => data.trim()),
);
Expand All @@ -220,7 +219,7 @@ Handle errors in single event listener
```js
const { oleoduc, writeData } = require("oleoduc");
const stream = compose(
const stream = chainStreams(
source,
writeData((obj) => throw new Error())
);
Expand Down
14 changes: 7 additions & 7 deletions src/compose.ts → src/chainStreams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ import {decorateWithAsyncIterator} from "./utils/decorateWithAsyncIterator.ts";
import {pipeStreamsTogether} from "./utils/pipeStreamsTogether.ts";
import {AnyStream, PipeableStreams} from "./types.ts";

export type ComposeOptions = TransformOptions;
export type ChainStreamsOptions = TransformOptions;

// eslint-disable-next-line @typescript-eslint/no-unused-vars
type ComposeReturn<TLast extends AnyStream> = TLast extends Readable
type ChainStreamsReturn<TLast extends AnyStream> = TLast extends Readable
? NodeJS.ReadWriteStream & Readable
: NodeJS.ReadWriteStream;

export function compose<TLast extends NodeJS.ReadWriteStream | NodeJS.WritableStream>(
...args: PipeableStreams<NodeJS.ReadableStream | NodeJS.ReadWriteStream, TLast, ComposeOptions>
): ComposeReturn<TLast> {
const {params: streams, options} = parseArgs<AnyStream, ComposeOptions>(args);
export function chainStreams<TLast extends NodeJS.ReadWriteStream | NodeJS.WritableStream>(
...args: PipeableStreams<NodeJS.ReadableStream | NodeJS.ReadWriteStream, TLast, ChainStreamsOptions>
): ChainStreamsReturn<TLast> {
const {params: streams, options} = parseArgs<AnyStream, ChainStreamsOptions>(args);

const {first, last, wrapper} = wrapStreams(streams, options);

Expand All @@ -31,5 +31,5 @@ export function compose<TLast extends NodeJS.ReadWriteStream | NodeJS.WritableSt

pipeStreamsTogether(streams, wrapper);

return wrapper as ComposeReturn<TLast>;
return wrapper as ChainStreamsReturn<TLast>;
}
6 changes: 5 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {accumulateData} from "./accumulateData.ts";
import {compose} from "./compose.ts";
import {chainStreams} from "./chainStreams.ts";
import {concatStreams} from "./concatStreams.ts";
import {filterData} from "./filterData.ts";
import {flattenArray} from "./flattenArray.ts";
Expand All @@ -14,8 +14,12 @@ import {transformStream} from "./transformStream.ts";
import {writeData} from "./writeData.ts";
import {writeToStdout} from "./writeToStdout.ts";

//Legacy from 0.8.x
const compose = chainStreams;

export {
accumulateData,
chainStreams,
compose,
concatStreams,
filterData,
Expand Down
4 changes: 2 additions & 2 deletions src/readLineByLine.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import {accumulateData} from "./accumulateData.ts";
import {flattenArray} from "./flattenArray.ts";
import {compose} from "./compose.ts";
import {chainStreams} from "./chainStreams.ts";

export function readLineByLine() {
return compose(
return chainStreams(
accumulateData<string, string[]>(
(acc, data: string, flush) => {
const lines = data.toString().split(/\r?\n/);
Expand Down
38 changes: 19 additions & 19 deletions test/compose-test.ts → test/chainStreams-test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import {deepStrictEqual, fail} from "assert";
import {createSlowStream, createStream} from "./testUtils";

Check failure on line 2 in test/chainStreams-test.ts

View workflow job for this annotation

GitHub Actions / test

"./testUtils" is not found
import {compose, flattenArray, transformData, writeData} from "../src";
import {chainStreams, flattenArray, transformData, writeData} from "../src";

Check failure on line 3 in test/chainStreams-test.ts

View workflow job for this annotation

GitHub Actions / test

"../src" is not found

describe("compose", () => {
it("can compose streams", (done) => {
describe("chainStreams", () => {
it("can chain streams", (done) => {
const chunks: string[] = [];
const source = createStream();
source.push("first");
source.push(null);

compose(
chainStreams(
source,
transformData((data: string) => data.substring(0, 1)),
writeData((data: string) => {
Expand All @@ -26,15 +26,15 @@ describe("compose", () => {
});
});

it("can iterate over a composed stream", async () => {
it("can iterate over a chained stream", async () => {
const chunks: string[] = [];
const source = createStream();
source.push("andré");
source.push("bruno");
source.push("robert");
source.push(null);

const stream = compose(
const stream = chainStreams(
source,
transformData((data: string) => data.substring(0, 1)),
);
Expand All @@ -46,15 +46,15 @@ describe("compose", () => {
deepStrictEqual(chunks, ["a", "b", "r"]);
});

it("can pipe a compose stream", (done) => {
it("can pipe a chain stream", (done) => {
const chunks: string[] = [];
const source = createStream();
source.push("andré");
source.push("bruno");
source.push("robert");
source.push(null);

compose(source)
chainStreams(source)
.pipe(transformData((data: string) => data.substring(0, 1)))
.pipe(
writeData((data: string) => {
Expand All @@ -67,7 +67,7 @@ describe("compose", () => {
});
});

it("can build compose with first writeable and last readable (duplex)", (done) => {
it("can build chain with first writeable and last readable (duplex)", (done) => {
const chunks: string[] = [];
const source = createStream();
source.push("andré");
Expand All @@ -77,7 +77,7 @@ describe("compose", () => {

source
.pipe(
compose(
chainStreams(
transformData((data: string) => data.substring(0, 1)),
transformData((data) => "_" + data),
),
Expand All @@ -93,18 +93,18 @@ describe("compose", () => {
});
});

it("can compose inside compose", (done) => {
it("can chain inside a chain", (done) => {
const chunks: string[] = [];
const source = createStream();
const nested = compose(
const nested = chainStreams(
source,
transformData((d: string) => d.substring(0, 1)),
);

source.push("first");
source.push(null);

compose(
chainStreams(
nested,
writeData((data: string) => {
chunks.push(data);
Expand All @@ -126,7 +126,7 @@ describe("compose", () => {
source.push(["andré", "bruno", "robert"]);
source.push(null);

compose(
chainStreams(
source,
flattenArray({highWaterMark: 1}),
createSlowStream({maxWriteInterval: 10}),
Expand All @@ -139,15 +139,15 @@ describe("compose", () => {
});
});

it("can handle back pressure with nested compose", (done) => {
it("can handle back pressure with nested chain", (done) => {
let result = "";
const source = createStream();
source.push(["andré", "bruno", "robert"]);
source.push(null);

compose(
chainStreams(
source,
compose(flattenArray({highWaterMark: 1}), createSlowStream({maxWriteInterval: 10})),
chainStreams(flattenArray({highWaterMark: 1}), createSlowStream({maxWriteInterval: 10})),
writeData((data: string) => {
result += data;
}),
Expand All @@ -160,7 +160,7 @@ describe("compose", () => {
it("should propagate emitted error", (done) => {
const source = createStream();

compose(
chainStreams(
source,
writeData(() => {}),
)
Expand All @@ -182,7 +182,7 @@ describe("compose", () => {
source.push("first");
source.push(null);

compose(
chainStreams(
source,
writeData(() => {
throw new Error("write error");
Expand Down
10 changes: 5 additions & 5 deletions test/oleoduc-test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {deepStrictEqual, fail} from "assert";
import {assertErrorMessage, createStream, delay} from "./testUtils";
import {compose, oleoduc, transformData, writeData} from "../src";
import {chainStreams, oleoduc, transformData, writeData} from "../src";

describe("oleoduc", () => {
it("can create oleoduc", async () => {
Expand Down Expand Up @@ -35,7 +35,7 @@ describe("oleoduc", () => {
try {
await oleoduc(
source,
compose(
chainStreams(
transformData((data: string) => data.substring(0, 1)),
transformData((data: string) => "_" + data),
),
Expand All @@ -47,10 +47,10 @@ describe("oleoduc", () => {
}
});

it("can use compose inside oleoduc", async () => {
it("can use chainStream inside oleoduc", async () => {
const chunks: string[] = [];
const source = createStream();
const composed = compose(
const chained = chainStreams(
source,
transformData((d: string) => d.substring(0, 1)),
);
Expand All @@ -59,7 +59,7 @@ describe("oleoduc", () => {
source.push(null);

await oleoduc(
composed,
chained,
writeData((data: string) => chunks.push(data)),
)
.then(() => {
Expand Down
4 changes: 2 additions & 2 deletions test/toAsyncIterator-test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {deepStrictEqual, fail} from "assert";
import {assertErrorMessage, createStream} from "./testUtils";
import {toAsyncIterator} from "../src/utils/toAsyncIterator";
import {compose, transformData} from "../src";
import {chainStreams, transformData} from "../src";

describe("toAsyncIterator", () => {
it("can convert a readable stream into an iterator", async () => {
Expand All @@ -21,7 +21,7 @@ describe("toAsyncIterator", () => {

it("iterator should honor error", async () => {
const readable = createStream();
const failingStream = compose(
const failingStream = chainStreams(
readable,
transformData(() => {
throw new Error("This is a stream error");
Expand Down

0 comments on commit f75776b

Please sign in to comment.