From 9e3c474e807876f3b155ed90b4f76cae107594c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Gu=C3=A9rout?= Date: Sat, 4 Jan 2025 00:32:29 +0100 Subject: [PATCH] rename compose into chainStream --- README.md | 25 ++++++------ src/{compose.ts => chainStream.ts} | 14 +++---- src/index.ts | 6 ++- src/readLineByLine.ts | 4 +- test/{compose-test.ts => chainStream-test.ts} | 38 +++++++++---------- test/oleoduc-test.ts | 10 ++--- test/toAsyncIterator-test.ts | 4 +- 7 files changed, 52 insertions(+), 49 deletions(-) rename src/{compose.ts => chainStream.ts} (70%) rename test/{compose-test.ts => chainStream-test.ts} (85%) diff --git a/README.md b/README.md index ca4ccb6..e4fb106 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ import { ... } from "oleoduc"; - Easily transform, filter and write data flowing through the stream - Catch stream errors -- Compose and merge streams together +- Chain and merge streams together - Read a stream as if it were a promise ### Quick tour @@ -67,11 +67,11 @@ app.get("/documents", async (req, res) => { Create a stream to parse CSV and iterate over it ```js -const { compose, transformData } = require("oleoduc"); +const { chainStream, transformData } = require("oleoduc"); const { createReadStream } = require("fs"); const { parse } = require("csv-parse"); -const csvStream = compose( +const csvStream = chainStream( createReadStream("/path/to/file.csv"), parse(), ) @@ -84,7 +84,7 @@ for await (const data of csvStream) { # API * [accumulateData](#accumulatedatacallback-options) -* [compose](#composestreams-options) +* [chainStream](#chainstreamstreams-options) * [concatStreams](#concatstreamsstreams-options) * [filterData](#filterdatacallback-options) * [flattenArray](#flattenarrayoptions) @@ -167,7 +167,7 @@ oleoduc( ] ``` -## compose(...streams, [options]) +## chainStream(...streams, [options]) Same as oleoduc but without promise stuff and stream composition capability @@ -178,14 +178,14 @@ Same as oleoduc but without promise stuff and stream composition capability - `*`: The rest of the options is passed to [stream.Transform](https://nodejs.org/api/stream.html#stream_class_stream_transform) -Compose streams +Chain streams ```js -const { compose, transformData, writeData } = require("oleoduc"); +const { chainStream, transformData, writeData } = require("oleoduc"); async function getCursor() { const cursor = await getDataFromDB(); - return compose( + return chainStream( cursor, transformData((data) => data.value * 10), ) @@ -198,13 +198,12 @@ await oleoduc( ); ``` -Iterate over a composed readable stream +Iterate over a chained readable stream ```js -const { compose, transformData } = require("oleoduc"); +const { chainStream, transformData } = require("oleoduc"); -const stream -compose( +const stream = chainStream( source, transformData((data) => data.trim()), ); @@ -220,7 +219,7 @@ Handle errors in single event listener ```js const { oleoduc, writeData } = require("oleoduc"); -const stream = compose( +const stream = chainStream( source, writeData((obj) => throw new Error()) ); diff --git a/src/compose.ts b/src/chainStream.ts similarity index 70% rename from src/compose.ts rename to src/chainStream.ts index 5e5b4b6..077e622 100644 --- a/src/compose.ts +++ b/src/chainStream.ts @@ -6,17 +6,17 @@ import {pipeStreamsTogether} from "./utils/pipeStreamsTogether"; import {AnyStream, PipeableStreams} from "./types"; import {Readable, TransformOptions} from "stream"; -export type ComposeOptions = TransformOptions; +export type ChainStreamOptions = TransformOptions; // eslint-disable-next-line @typescript-eslint/no-unused-vars -type ComposeReturn = TLast extends Readable +type ChainStreamReturn = TLast extends Readable ? NodeJS.ReadWriteStream & Readable : NodeJS.ReadWriteStream; -export function compose( - ...args: PipeableStreams -): ComposeReturn { - const {params: streams, options} = parseArgs(args); +export function chainStream( + ...args: PipeableStreams +): ChainStreamReturn { + const {params: streams, options} = parseArgs(args); const {first, last, wrapper} = wrapStreams(streams, options); @@ -31,5 +31,5 @@ export function compose; + return wrapper as ChainStreamReturn; } diff --git a/src/index.ts b/src/index.ts index 571f925..1b6203d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,5 @@ import {accumulateData} from "./accumulateData"; -import {compose} from "./compose"; +import {chainStream} from "./chainStream"; import {concatStreams} from "./concatStreams"; import {filterData} from "./filterData"; import {flattenArray} from "./flattenArray"; @@ -14,8 +14,12 @@ import {transformStream} from "./transformStream"; import {writeData} from "./writeData"; import {writeToStdout} from "./writeToStdout"; +//Legacy from 0.8.x +const compose = chainStream; + export { accumulateData, + chainStream, compose, concatStreams, filterData, diff --git a/src/readLineByLine.ts b/src/readLineByLine.ts index d2ce443..8034d4c 100644 --- a/src/readLineByLine.ts +++ b/src/readLineByLine.ts @@ -1,9 +1,9 @@ import {accumulateData} from "./accumulateData"; import {flattenArray} from "./flattenArray"; -import {compose} from "./compose"; +import {chainStream} from "./chainStream"; export function readLineByLine() { - return compose( + return chainStream( accumulateData( (acc, data: string, flush) => { const lines = data.toString().split(/\r?\n/); diff --git a/test/compose-test.ts b/test/chainStream-test.ts similarity index 85% rename from test/compose-test.ts rename to test/chainStream-test.ts index fc8bfc9..33175a2 100644 --- a/test/compose-test.ts +++ b/test/chainStream-test.ts @@ -1,15 +1,15 @@ import {deepStrictEqual, fail} from "assert"; import {createSlowStream, createStream} from "./testUtils"; -import {compose, flattenArray, transformData, writeData} from "../src"; +import {chainStream, flattenArray, transformData, writeData} from "../src"; -describe("compose", () => { - it("can compose streams", (done) => { +describe("chainStream", () => { + it("can chain streams", (done) => { const chunks: string[] = []; const source = createStream(); source.push("first"); source.push(null); - compose( + chainStream( source, transformData((data: string) => data.substring(0, 1)), writeData((data: string) => { @@ -26,7 +26,7 @@ describe("compose", () => { }); }); - it("can iterate over a composed stream", async () => { + it("can iterate over a chained stream", async () => { const chunks: string[] = []; const source = createStream(); source.push("andré"); @@ -34,7 +34,7 @@ describe("compose", () => { source.push("robert"); source.push(null); - const stream = compose( + const stream = chainStream( source, transformData((data: string) => data.substring(0, 1)), ); @@ -46,7 +46,7 @@ describe("compose", () => { deepStrictEqual(chunks, ["a", "b", "r"]); }); - it("can pipe a compose stream", (done) => { + it("can pipe a chain stream", (done) => { const chunks: string[] = []; const source = createStream(); source.push("andré"); @@ -54,7 +54,7 @@ describe("compose", () => { source.push("robert"); source.push(null); - compose(source) + chainStream(source) .pipe(transformData((data: string) => data.substring(0, 1))) .pipe( writeData((data: string) => { @@ -67,7 +67,7 @@ describe("compose", () => { }); }); - it("can build compose with first writeable and last readable (duplex)", (done) => { + it("can build chain with first writeable and last readable (duplex)", (done) => { const chunks: string[] = []; const source = createStream(); source.push("andré"); @@ -77,7 +77,7 @@ describe("compose", () => { source .pipe( - compose( + chainStream( transformData((data: string) => data.substring(0, 1)), transformData((data) => "_" + data), ), @@ -93,10 +93,10 @@ describe("compose", () => { }); }); - it("can compose inside compose", (done) => { + it("can chain inside a chain", (done) => { const chunks: string[] = []; const source = createStream(); - const nested = compose( + const nested = chainStream( source, transformData((d: string) => d.substring(0, 1)), ); @@ -104,7 +104,7 @@ describe("compose", () => { source.push("first"); source.push(null); - compose( + chainStream( nested, writeData((data: string) => { chunks.push(data); @@ -126,7 +126,7 @@ describe("compose", () => { source.push(["andré", "bruno", "robert"]); source.push(null); - compose( + chainStream( source, flattenArray({highWaterMark: 1}), createSlowStream({maxWriteInterval: 10}), @@ -139,15 +139,15 @@ describe("compose", () => { }); }); - it("can handle back pressure with nested compose", (done) => { + it("can handle back pressure with nested chain", (done) => { let result = ""; const source = createStream(); source.push(["andré", "bruno", "robert"]); source.push(null); - compose( + chainStream( source, - compose(flattenArray({highWaterMark: 1}), createSlowStream({maxWriteInterval: 10})), + chainStream(flattenArray({highWaterMark: 1}), createSlowStream({maxWriteInterval: 10})), writeData((data: string) => { result += data; }), @@ -160,7 +160,7 @@ describe("compose", () => { it("should propagate emitted error", (done) => { const source = createStream(); - compose( + chainStream( source, writeData(() => {}), ) @@ -182,7 +182,7 @@ describe("compose", () => { source.push("first"); source.push(null); - compose( + chainStream( source, writeData(() => { throw new Error("write error"); diff --git a/test/oleoduc-test.ts b/test/oleoduc-test.ts index 9d9165a..ee0d5cd 100644 --- a/test/oleoduc-test.ts +++ b/test/oleoduc-test.ts @@ -1,6 +1,6 @@ import {deepStrictEqual, fail} from "assert"; import {assertErrorMessage, createStream, delay} from "./testUtils"; -import {compose, oleoduc, transformData, writeData} from "../src"; +import {chainStream, oleoduc, transformData, writeData} from "../src"; describe("oleoduc", () => { it("can create oleoduc", async () => { @@ -35,7 +35,7 @@ describe("oleoduc", () => { try { await oleoduc( source, - compose( + chainStream( transformData((data: string) => data.substring(0, 1)), transformData((data: string) => "_" + data), ), @@ -47,10 +47,10 @@ describe("oleoduc", () => { } }); - it("can use compose inside oleoduc", async () => { + it("can use chainStream inside oleoduc", async () => { const chunks: string[] = []; const source = createStream(); - const composed = compose( + const chained = chainStream( source, transformData((d: string) => d.substring(0, 1)), ); @@ -59,7 +59,7 @@ describe("oleoduc", () => { source.push(null); await oleoduc( - composed, + chained, writeData((data: string) => chunks.push(data)), ) .then(() => { diff --git a/test/toAsyncIterator-test.ts b/test/toAsyncIterator-test.ts index 76b1861..77604fb 100644 --- a/test/toAsyncIterator-test.ts +++ b/test/toAsyncIterator-test.ts @@ -1,7 +1,7 @@ import {deepStrictEqual, fail} from "assert"; import {assertErrorMessage, createStream} from "./testUtils"; import {toAsyncIterator} from "../src/utils/toAsyncIterator"; -import {compose, transformData} from "../src"; +import {chainStream, transformData} from "../src"; describe("toAsyncIterator", () => { it("can convert a readable stream into an iterator", async () => { @@ -21,7 +21,7 @@ describe("toAsyncIterator", () => { it("iterator should honor error", async () => { const readable = createStream(); - const failingStream = compose( + const failingStream = chainStream( readable, transformData(() => { throw new Error("This is a stream error");