From 8be83b89d7ecc9e60959b3c0fd72db9c64525005 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20Gu=C3=A9rout?= Date: Sun, 5 Jan 2025 13:58:24 +0100 Subject: [PATCH] rename chainStreams into pipeStreams --- README.md | 22 +++++------ src/index.ts | 6 +-- src/{chainStreams.ts => pipeStreams.ts} | 14 +++---- src/readLineByLine.ts | 4 +- test/oleoduc-test.ts | 10 ++--- ...ainStreams-test.ts => pipeStreams-test.ts} | 38 +++++++++---------- test/toAsyncIterator-test.ts | 4 +- 7 files changed, 49 insertions(+), 49 deletions(-) rename src/{chainStreams.ts => pipeStreams.ts} (70%) rename test/{chainStreams-test.ts => pipeStreams-test.ts} (85%) diff --git a/README.md b/README.md index 959c16f..8f7ff3b 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ import { ... } from "oleoduc"; - Easily transform, filter and write data flowing through the stream - Catch stream errors -- Chain and merge streams together +- Pipe and merge streams together - Read a stream as if it were a promise ### Quick tour @@ -67,11 +67,11 @@ app.get("/documents", async (req, res) => { Create a stream to parse CSV and iterate over it ```js -const { chainStreams, transformData } = require("oleoduc"); +const { pipeStreams, transformData } = require("oleoduc"); const { createReadStream } = require("fs"); const { parse } = require("csv-parse"); -const csvStream = chainStreams( +const csvStream = pipeStreams( createReadStream("/path/to/file.csv"), parse(), ) @@ -84,7 +84,7 @@ for await (const data of csvStream) { # API * [accumulateData](#accumulatedatacallback-options) -* [chainStreams](#chainstreamstreams-options) +* [pipeStreams](#pipestreamstreams-options) * [concatStreams](#concatstreamsstreams-options) * [filterData](#filterdatacallback-options) * [flattenArray](#flattenarrayoptions) @@ -167,7 +167,7 @@ oleoduc( ] ``` -## chainStreams(...streams, [options]) +## pipeStreams(...streams, [options]) Same as oleoduc but without promise stuff and stream composition capability @@ -178,14 +178,14 @@ Same as oleoduc but without promise stuff and stream composition capability - `*`: The rest of the options is passed to [stream.Transform](https://nodejs.org/api/stream.html#stream_class_stream_transform) -Chain streams +Pipe streams ```js -const { chainStreams, transformData, writeData } = require("oleoduc"); +const { pipeStreams, transformData, writeData } = require("oleoduc"); async function getCursor() { const cursor = await getDataFromDB(); - return chainStreams( + return pipeStreams( cursor, transformData((data) => data.value * 10), ) @@ -201,9 +201,9 @@ await oleoduc( Iterate over a chained readable stream ```js -const { chainStreams, transformData } = require("oleoduc"); +const { pipeStreams, transformData } = require("oleoduc"); -const stream = chainStreams( +const stream = pipeStreams( source, transformData((data) => data.trim()), ); @@ -219,7 +219,7 @@ Handle errors in single event listener ```js const { oleoduc, writeData } = require("oleoduc"); -const stream = chainStreams( +const stream = pipeStreams( source, writeData((obj) => throw new Error()) ); diff --git a/src/index.ts b/src/index.ts index 6e05e78..671f3bd 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,5 @@ import {accumulateData} from "./accumulateData.ts"; -import {chainStreams} from "./chainStreams.ts"; +import {pipeStreams} from "./pipeStreams.ts"; import {concatStreams} from "./concatStreams.ts"; import {filterData} from "./filterData.ts"; import {flattenArray} from "./flattenArray.ts"; @@ -15,11 +15,11 @@ import {writeData} from "./writeData.ts"; import {writeToStdout} from "./writeToStdout.ts"; //Legacy from 0.8.x -const compose = chainStreams; +const compose = pipeStreams; export { accumulateData, - chainStreams, + pipeStreams, compose, concatStreams, filterData, diff --git a/src/chainStreams.ts b/src/pipeStreams.ts similarity index 70% rename from src/chainStreams.ts rename to src/pipeStreams.ts index a7bdf5d..a7f145b 100644 --- a/src/chainStreams.ts +++ b/src/pipeStreams.ts @@ -6,17 +6,17 @@ import {decorateWithAsyncIterator} from "./utils/decorateWithAsyncIterator.ts"; import {pipeStreamsTogether} from "./utils/pipeStreamsTogether.ts"; import {AnyStream, PipeableStreams} from "./types.ts"; -export type ChainStreamsOptions = TransformOptions; +export type PipeStreamsOptions = TransformOptions; // eslint-disable-next-line @typescript-eslint/no-unused-vars -type ChainStreamsReturn = TLast extends Readable +type PipeStreamsReturn = TLast extends Readable ? NodeJS.ReadWriteStream & Readable : NodeJS.ReadWriteStream; -export function chainStreams( - ...args: PipeableStreams -): ChainStreamsReturn { - const {params: streams, options} = parseArgs(args); +export function pipeStreams( + ...args: PipeableStreams +): PipeStreamsReturn { + const {params: streams, options} = parseArgs(args); const {first, last, wrapper} = wrapStreams(streams, options); @@ -31,5 +31,5 @@ export function chainStreams; + return wrapper as PipeStreamsReturn; } diff --git a/src/readLineByLine.ts b/src/readLineByLine.ts index 9fd6051..9ddd031 100644 --- a/src/readLineByLine.ts +++ b/src/readLineByLine.ts @@ -1,9 +1,9 @@ import {accumulateData} from "./accumulateData.ts"; import {flattenArray} from "./flattenArray.ts"; -import {chainStreams} from "./chainStreams.ts"; +import {pipeStreams} from "./pipeStreams.ts"; export function readLineByLine() { - return chainStreams( + return pipeStreams( accumulateData( (acc, data: string, flush) => { const lines = data.toString().split(/\r?\n/); diff --git a/test/oleoduc-test.ts b/test/oleoduc-test.ts index 5cce0a4..6bfa114 100644 --- a/test/oleoduc-test.ts +++ b/test/oleoduc-test.ts @@ -1,6 +1,6 @@ import {deepStrictEqual, fail} from "assert"; import {assertErrorMessage, createStream, delay} from "./testUtils.ts"; -import {chainStreams, oleoduc, transformData, writeData} from "../src/index.ts"; +import {pipeStreams, oleoduc, transformData, writeData} from "../src/index.ts"; describe("oleoduc", () => { it("can create oleoduc", async () => { @@ -35,7 +35,7 @@ describe("oleoduc", () => { try { await oleoduc( source, - chainStreams( + pipeStreams( transformData((data: string) => data.substring(0, 1)), transformData((data: string) => "_" + data), ), @@ -47,10 +47,10 @@ describe("oleoduc", () => { } }); - it("can use chainStream inside oleoduc", async () => { + it("can use pipeStream inside oleoduc", async () => { const chunks: string[] = []; const source = createStream(); - const chained = chainStreams( + const piped = pipeStreams( source, transformData((d: string) => d.substring(0, 1)), ); @@ -59,7 +59,7 @@ describe("oleoduc", () => { source.push(null); await oleoduc( - chained, + piped, writeData((data: string) => chunks.push(data)), ) .then(() => { diff --git a/test/chainStreams-test.ts b/test/pipeStreams-test.ts similarity index 85% rename from test/chainStreams-test.ts rename to test/pipeStreams-test.ts index 837e7ae..352ec16 100644 --- a/test/chainStreams-test.ts +++ b/test/pipeStreams-test.ts @@ -1,15 +1,15 @@ import {deepStrictEqual, fail} from "assert"; import {createSlowStream, createStream} from "./testUtils.ts"; -import {chainStreams, flattenArray, transformData, writeData} from "../src/index.ts"; +import {pipeStreams, flattenArray, transformData, writeData} from "../src/index.ts"; -describe("chainStreams", () => { - it("can chain streams", (done) => { +describe("pipeStreams", () => { + it("can pipe streams", (done) => { const chunks: string[] = []; const source = createStream(); source.push("first"); source.push(null); - chainStreams( + pipeStreams( source, transformData((data: string) => data.substring(0, 1)), writeData((data: string) => { @@ -26,7 +26,7 @@ describe("chainStreams", () => { }); }); - it("can iterate over a chained stream", async () => { + it("can iterate over a piped stream", async () => { const chunks: string[] = []; const source = createStream(); source.push("andré"); @@ -34,7 +34,7 @@ describe("chainStreams", () => { source.push("robert"); source.push(null); - const stream = chainStreams( + const stream = pipeStreams( source, transformData((data: string) => data.substring(0, 1)), ); @@ -46,7 +46,7 @@ describe("chainStreams", () => { deepStrictEqual(chunks, ["a", "b", "r"]); }); - it("can pipe a chain stream", (done) => { + it("can pipe a pipe stream", (done) => { const chunks: string[] = []; const source = createStream(); source.push("andré"); @@ -54,7 +54,7 @@ describe("chainStreams", () => { source.push("robert"); source.push(null); - chainStreams(source) + pipeStreams(source) .pipe(transformData((data: string) => data.substring(0, 1))) .pipe( writeData((data: string) => { @@ -67,7 +67,7 @@ describe("chainStreams", () => { }); }); - it("can build chain with first writeable and last readable (duplex)", (done) => { + it("can build pipe with first writeable and last readable (duplex)", (done) => { const chunks: string[] = []; const source = createStream(); source.push("andré"); @@ -77,7 +77,7 @@ describe("chainStreams", () => { source .pipe( - chainStreams( + pipeStreams( transformData((data: string) => data.substring(0, 1)), transformData((data) => "_" + data), ), @@ -93,10 +93,10 @@ describe("chainStreams", () => { }); }); - it("can chain inside a chain", (done) => { + it("can pipe inside a pipe", (done) => { const chunks: string[] = []; const source = createStream(); - const nested = chainStreams( + const nested = pipeStreams( source, transformData((d: string) => d.substring(0, 1)), ); @@ -104,7 +104,7 @@ describe("chainStreams", () => { source.push("first"); source.push(null); - chainStreams( + pipeStreams( nested, writeData((data: string) => { chunks.push(data); @@ -126,7 +126,7 @@ describe("chainStreams", () => { source.push(["andré", "bruno", "robert"]); source.push(null); - chainStreams( + pipeStreams( source, flattenArray({highWaterMark: 1}), createSlowStream({maxWriteInterval: 10}), @@ -139,15 +139,15 @@ describe("chainStreams", () => { }); }); - it("can handle back pressure with nested chain", (done) => { + it("can handle back pressure with nested pipe", (done) => { let result = ""; const source = createStream(); source.push(["andré", "bruno", "robert"]); source.push(null); - chainStreams( + pipeStreams( source, - chainStreams(flattenArray({highWaterMark: 1}), createSlowStream({maxWriteInterval: 10})), + pipeStreams(flattenArray({highWaterMark: 1}), createSlowStream({maxWriteInterval: 10})), writeData((data: string) => { result += data; }), @@ -160,7 +160,7 @@ describe("chainStreams", () => { it("should propagate emitted error", (done) => { const source = createStream(); - chainStreams( + pipeStreams( source, writeData(() => {}), ) @@ -182,7 +182,7 @@ describe("chainStreams", () => { source.push("first"); source.push(null); - chainStreams( + pipeStreams( source, writeData(() => { throw new Error("write error"); diff --git a/test/toAsyncIterator-test.ts b/test/toAsyncIterator-test.ts index 232ca16..a3990a8 100644 --- a/test/toAsyncIterator-test.ts +++ b/test/toAsyncIterator-test.ts @@ -1,7 +1,7 @@ import {deepStrictEqual, fail} from "assert"; import {assertErrorMessage, createStream} from "./testUtils.ts"; import {toAsyncIterator} from "../src/utils/toAsyncIterator.ts"; -import {chainStreams, transformData} from "../src/index.ts"; +import {pipeStreams, transformData} from "../src/index.ts"; describe("toAsyncIterator", () => { it("can convert a readable stream into an iterator", async () => { @@ -21,7 +21,7 @@ describe("toAsyncIterator", () => { it("iterator should honor error", async () => { const readable = createStream(); - const failingStream = chainStreams( + const failingStream = pipeStreams( readable, transformData(() => { throw new Error("This is a stream error");