Skip to content

Commit

Permalink
rename chainStreams into pipeStreams
Browse files Browse the repository at this point in the history
  • Loading branch information
bguerout committed Jan 5, 2025
1 parent adba5cd commit 8be83b8
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 49 deletions.
22 changes: 11 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import { ... } from "oleoduc";
- Easily transform, filter and write data flowing through the stream
- Catch stream errors
- Chain and merge streams together
- Pipe and merge streams together
- Read a stream as if it were a promise
### Quick tour
Expand Down Expand Up @@ -67,11 +67,11 @@ app.get("/documents", async (req, res) => {
Create a stream to parse CSV and iterate over it
```js
const { chainStreams, transformData } = require("oleoduc");
const { pipeStreams, transformData } = require("oleoduc");
const { createReadStream } = require("fs");
const { parse } = require("csv-parse");
const csvStream = chainStreams(
const csvStream = pipeStreams(
createReadStream("/path/to/file.csv"),
parse(),
)
Expand All @@ -84,7 +84,7 @@ for await (const data of csvStream) {
# API
* [accumulateData](#accumulatedatacallback-options)
* [chainStreams](#chainstreamstreams-options)
* [pipeStreams](#pipestreamstreams-options)
* [concatStreams](#concatstreamsstreams-options)
* [filterData](#filterdatacallback-options)
* [flattenArray](#flattenarrayoptions)
Expand Down Expand Up @@ -167,7 +167,7 @@ oleoduc(
]
```
## chainStreams(...streams, [options])
## pipeStreams(...streams, [options])
Same as oleoduc but without promise stuff and stream composition capability
Expand All @@ -178,14 +178,14 @@ Same as oleoduc but without promise stuff and stream composition capability
- `*`: The rest of the options is passed
to [stream.Transform](https://nodejs.org/api/stream.html#stream_class_stream_transform)
Chain streams
Pipe streams
```js
const { chainStreams, transformData, writeData } = require("oleoduc");
const { pipeStreams, transformData, writeData } = require("oleoduc");
async function getCursor() {
const cursor = await getDataFromDB();
return chainStreams(
return pipeStreams(
cursor,
transformData((data) => data.value * 10),
)
Expand All @@ -201,9 +201,9 @@ await oleoduc(
Iterate over a chained readable stream
```js
const { chainStreams, transformData } = require("oleoduc");
const { pipeStreams, transformData } = require("oleoduc");
const stream = chainStreams(
const stream = pipeStreams(
source,
transformData((data) => data.trim()),
);
Expand All @@ -219,7 +219,7 @@ Handle errors in single event listener
```js
const { oleoduc, writeData } = require("oleoduc");
const stream = chainStreams(
const stream = pipeStreams(
source,
writeData((obj) => throw new Error())
);
Expand Down
6 changes: 3 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {accumulateData} from "./accumulateData.ts";
import {chainStreams} from "./chainStreams.ts";
import {pipeStreams} from "./pipeStreams.ts";
import {concatStreams} from "./concatStreams.ts";
import {filterData} from "./filterData.ts";
import {flattenArray} from "./flattenArray.ts";
Expand All @@ -15,11 +15,11 @@ import {writeData} from "./writeData.ts";
import {writeToStdout} from "./writeToStdout.ts";

//Legacy from 0.8.x
const compose = chainStreams;
const compose = pipeStreams;

export {
accumulateData,
chainStreams,
pipeStreams,
compose,
concatStreams,
filterData,
Expand Down
14 changes: 7 additions & 7 deletions src/chainStreams.ts → src/pipeStreams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ import {decorateWithAsyncIterator} from "./utils/decorateWithAsyncIterator.ts";
import {pipeStreamsTogether} from "./utils/pipeStreamsTogether.ts";
import {AnyStream, PipeableStreams} from "./types.ts";

export type ChainStreamsOptions = TransformOptions;
export type PipeStreamsOptions = TransformOptions;

// eslint-disable-next-line @typescript-eslint/no-unused-vars
type ChainStreamsReturn<TLast extends AnyStream> = TLast extends Readable
type PipeStreamsReturn<TLast extends AnyStream> = TLast extends Readable
? NodeJS.ReadWriteStream & Readable
: NodeJS.ReadWriteStream;

export function chainStreams<TLast extends NodeJS.ReadWriteStream | NodeJS.WritableStream>(
...args: PipeableStreams<NodeJS.ReadableStream | NodeJS.ReadWriteStream, TLast, ChainStreamsOptions>
): ChainStreamsReturn<TLast> {
const {params: streams, options} = parseArgs<AnyStream, ChainStreamsOptions>(args);
export function pipeStreams<TLast extends NodeJS.ReadWriteStream | NodeJS.WritableStream>(
...args: PipeableStreams<NodeJS.ReadableStream | NodeJS.ReadWriteStream, TLast, PipeStreamsOptions>
): PipeStreamsReturn<TLast> {
const {params: streams, options} = parseArgs<AnyStream, PipeStreamsOptions>(args);

const {first, last, wrapper} = wrapStreams(streams, options);

Expand All @@ -31,5 +31,5 @@ export function chainStreams<TLast extends NodeJS.ReadWriteStream | NodeJS.Writa

pipeStreamsTogether(streams, wrapper);

return wrapper as ChainStreamsReturn<TLast>;
return wrapper as PipeStreamsReturn<TLast>;
}
4 changes: 2 additions & 2 deletions src/readLineByLine.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import {accumulateData} from "./accumulateData.ts";
import {flattenArray} from "./flattenArray.ts";
import {chainStreams} from "./chainStreams.ts";
import {pipeStreams} from "./pipeStreams.ts";

export function readLineByLine() {
return chainStreams(
return pipeStreams(
accumulateData<string, string[]>(
(acc, data: string, flush) => {
const lines = data.toString().split(/\r?\n/);
Expand Down
10 changes: 5 additions & 5 deletions test/oleoduc-test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {deepStrictEqual, fail} from "assert";
import {assertErrorMessage, createStream, delay} from "./testUtils.ts";
import {chainStreams, oleoduc, transformData, writeData} from "../src/index.ts";
import {pipeStreams, oleoduc, transformData, writeData} from "../src/index.ts";

describe("oleoduc", () => {
it("can create oleoduc", async () => {
Expand Down Expand Up @@ -35,7 +35,7 @@ describe("oleoduc", () => {
try {
await oleoduc(
source,
chainStreams(
pipeStreams(
transformData((data: string) => data.substring(0, 1)),
transformData((data: string) => "_" + data),
),
Expand All @@ -47,10 +47,10 @@ describe("oleoduc", () => {
}
});

it("can use chainStream inside oleoduc", async () => {
it("can use pipeStream inside oleoduc", async () => {
const chunks: string[] = [];
const source = createStream();
const chained = chainStreams(
const piped = pipeStreams(
source,
transformData((d: string) => d.substring(0, 1)),
);
Expand All @@ -59,7 +59,7 @@ describe("oleoduc", () => {
source.push(null);

await oleoduc(
chained,
piped,
writeData((data: string) => chunks.push(data)),
)
.then(() => {
Expand Down
38 changes: 19 additions & 19 deletions test/chainStreams-test.ts → test/pipeStreams-test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import {deepStrictEqual, fail} from "assert";
import {createSlowStream, createStream} from "./testUtils.ts";
import {chainStreams, flattenArray, transformData, writeData} from "../src/index.ts";
import {pipeStreams, flattenArray, transformData, writeData} from "../src/index.ts";

describe("chainStreams", () => {
it("can chain streams", (done) => {
describe("pipeStreams", () => {
it("can pipe streams", (done) => {
const chunks: string[] = [];
const source = createStream();
source.push("first");
source.push(null);

chainStreams(
pipeStreams(
source,
transformData((data: string) => data.substring(0, 1)),
writeData((data: string) => {
Expand All @@ -26,15 +26,15 @@ describe("chainStreams", () => {
});
});

it("can iterate over a chained stream", async () => {
it("can iterate over a piped stream", async () => {
const chunks: string[] = [];
const source = createStream();
source.push("andré");
source.push("bruno");
source.push("robert");
source.push(null);

const stream = chainStreams(
const stream = pipeStreams(
source,
transformData((data: string) => data.substring(0, 1)),
);
Expand All @@ -46,15 +46,15 @@ describe("chainStreams", () => {
deepStrictEqual(chunks, ["a", "b", "r"]);
});

it("can pipe a chain stream", (done) => {
it("can pipe a pipe stream", (done) => {
const chunks: string[] = [];
const source = createStream();
source.push("andré");
source.push("bruno");
source.push("robert");
source.push(null);

chainStreams(source)
pipeStreams(source)
.pipe(transformData((data: string) => data.substring(0, 1)))
.pipe(
writeData((data: string) => {
Expand All @@ -67,7 +67,7 @@ describe("chainStreams", () => {
});
});

it("can build chain with first writeable and last readable (duplex)", (done) => {
it("can build pipe with first writeable and last readable (duplex)", (done) => {
const chunks: string[] = [];
const source = createStream();
source.push("andré");
Expand All @@ -77,7 +77,7 @@ describe("chainStreams", () => {

source
.pipe(
chainStreams(
pipeStreams(
transformData((data: string) => data.substring(0, 1)),
transformData((data) => "_" + data),
),
Expand All @@ -93,18 +93,18 @@ describe("chainStreams", () => {
});
});

it("can chain inside a chain", (done) => {
it("can pipe inside a pipe", (done) => {
const chunks: string[] = [];
const source = createStream();
const nested = chainStreams(
const nested = pipeStreams(
source,
transformData((d: string) => d.substring(0, 1)),
);

source.push("first");
source.push(null);

chainStreams(
pipeStreams(
nested,
writeData((data: string) => {
chunks.push(data);
Expand All @@ -126,7 +126,7 @@ describe("chainStreams", () => {
source.push(["andré", "bruno", "robert"]);
source.push(null);

chainStreams(
pipeStreams(
source,
flattenArray({highWaterMark: 1}),
createSlowStream({maxWriteInterval: 10}),
Expand All @@ -139,15 +139,15 @@ describe("chainStreams", () => {
});
});

it("can handle back pressure with nested chain", (done) => {
it("can handle back pressure with nested pipe", (done) => {
let result = "";
const source = createStream();
source.push(["andré", "bruno", "robert"]);
source.push(null);

chainStreams(
pipeStreams(
source,
chainStreams(flattenArray({highWaterMark: 1}), createSlowStream({maxWriteInterval: 10})),
pipeStreams(flattenArray({highWaterMark: 1}), createSlowStream({maxWriteInterval: 10})),
writeData((data: string) => {
result += data;
}),
Expand All @@ -160,7 +160,7 @@ describe("chainStreams", () => {
it("should propagate emitted error", (done) => {
const source = createStream();

chainStreams(
pipeStreams(
source,
writeData(() => {}),
)
Expand All @@ -182,7 +182,7 @@ describe("chainStreams", () => {
source.push("first");
source.push(null);

chainStreams(
pipeStreams(
source,
writeData(() => {
throw new Error("write error");
Expand Down
4 changes: 2 additions & 2 deletions test/toAsyncIterator-test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {deepStrictEqual, fail} from "assert";
import {assertErrorMessage, createStream} from "./testUtils.ts";
import {toAsyncIterator} from "../src/utils/toAsyncIterator.ts";
import {chainStreams, transformData} from "../src/index.ts";
import {pipeStreams, transformData} from "../src/index.ts";

describe("toAsyncIterator", () => {
it("can convert a readable stream into an iterator", async () => {
Expand All @@ -21,7 +21,7 @@ describe("toAsyncIterator", () => {

it("iterator should honor error", async () => {
const readable = createStream();
const failingStream = chainStreams(
const failingStream = pipeStreams(
readable,
transformData(() => {
throw new Error("This is a stream error");
Expand Down

0 comments on commit 8be83b8

Please sign in to comment.