Skip to content

Commit

Permalink
Add first types
Browse files Browse the repository at this point in the history
  • Loading branch information
bguerout committed Nov 26, 2023
1 parent efe8d34 commit 918dbde
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 107 deletions.
5 changes: 1 addition & 4 deletions .mocharc.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
{
"extension": ["ts"],
"node-option": [
"experimental-specifier-resolution=node",
"loader=ts-node/esm"
]
"node-option": ["loader=ts-node/esm"]
}
6 changes: 0 additions & 6 deletions .tools/git-hooks/pre-commit

This file was deleted.

19 changes: 19 additions & 0 deletions .tools/hooks.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/usr/bin/env bash
set -euo pipefail

readonly PROJECT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)/.."
readonly PRE_COMMIT_HOOK="${PROJECT_DIR}/.git/hooks/pre-commit"

cat <<'EOF' >"${PRE_COMMIT_HOOK}"
#!/usr/bin/env bash
set -euo pipefail
# Do not edit. This file has been generated by oleoduc
npm test
npm run lint
npm run build
EOF

chmod +x "${PRE_COMMIT_HOOK}"
echo "pre-push hooks installed in ${PRE_COMMIT_HOOK}"
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
},
"scripts": {
"build": "bash .tools/build.sh",
"hooks": "git config core.hooksPath .tools/git-hooks && chmod +x .tools/git-hooks/*",
"test": "mocha test/",
"hooks": "bash .tools/hooks.sh",
"test": "TS_NODE_PROJECT=tsconfig.test.json mocha test/",
"coverage": "nyc --temp-dir .coverage/.nyc_output --report-dir .coverage --reporter=lcov --reporter=html npm test",
"lint": "eslint src/index.ts test/",
"release": "bash .tools/release.sh master",
Expand Down
8 changes: 6 additions & 2 deletions src/accumulateData.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { Transform } from "stream";
import { Transform, TransformOptions } from "stream";

export function accumulateData(accumulate, options: any = {}) {
type AccumulateDateOptions = {
accumulator?: unknown;
} & TransformOptions;

export function accumulateData(accumulate, options: AccumulateDateOptions = {}) {
const { accumulator, ...rest } = options;
let acc = accumulator === undefined ? null : accumulator;
let flushed = false;
Expand Down
6 changes: 2 additions & 4 deletions src/utils/decorateWithPromise.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
export function decorateWithPromise(stream, promise) {
const descriptors = ["then", "catch", "finally"].map((property) => {
return [property, Reflect.getOwnPropertyDescriptor(Promise.prototype, property)];
return { property, descriptor: Reflect.getOwnPropertyDescriptor(Promise.prototype, property) };
});

for (const [property, descriptor] of descriptors) {
// @ts-ignore
for (const { property, descriptor } of descriptors) {
const value = (...args) => Reflect.apply(descriptor.value, promise, args);
// @ts-ignore
Reflect.defineProperty(stream, property, { ...descriptor, value });
}
return stream;
Expand Down
11 changes: 8 additions & 3 deletions src/utils/toAsyncIterator.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import { Readable } from "stream";

type ToAsyncIteratorOptions = {
chunkSize?: number;
};

/**
* Adapted from
* - https://iximiuz.com/en/posts/nodejs-readable-streams-distilled/
Expand All @@ -8,11 +12,12 @@ import { Readable } from "stream";
* @param options
* @returns {AsyncGenerator<*, void, *>}
*/
export async function* toAsyncIterator(stream, options: any = {}) {
export async function* toAsyncIterator(stream, options: ToAsyncIteratorOptions = {}) {
const chunkSize = options.chunkSize || 1;

if (typeof stream.read !== "function") {
// @ts-ignore
stream = Readable.wrap(stream, { objectMode: true });
//FIXME seems not used
stream = new Readable().wrap(stream);
}

let ended = false;
Expand Down
176 changes: 92 additions & 84 deletions src/writeData.ts
Original file line number Diff line number Diff line change
@@ -1,103 +1,111 @@
import { Writable } from "stream";
import cyclist from "cyclist";
import { inherits } from "util";

const ParallelWrite = function (maxParallel, opts, onWrite) {
if (!(this instanceof ParallelWrite)) {
// @ts-ignore
return new ParallelWrite(maxParallel, opts, onWrite);
}
type ParallelWriteOptions = {
parallel?: number;
};

if (typeof maxParallel === "function") {
onWrite = maxParallel;
opts = null;
maxParallel = 1;
}
if (typeof opts === "function") {
onWrite = opts;
opts = null;
}
class ParallelWrite<T> extends Writable {
private _maxParallel: number;
private _onWrite: (chunk: T, callback: (e: Error, data: T) => void) => void;
private _destroyed: boolean;
private _flushed: boolean;
private _ordered: boolean;
private _buffer: cyclist | [];
private _top: number;
private _bottom: number;
private _ondrain: () => void;

constructor(maxParallel, opts, onWrite) {
super();
if (typeof maxParallel === "function") {
onWrite = maxParallel;
opts = null;
maxParallel = 1;
}
if (typeof opts === "function") {
onWrite = opts;
opts = null;
}

if (!opts) opts = {};
if (!opts.highWaterMark) opts.highWaterMark = Math.max(maxParallel, 16);
if (opts.objectMode !== false) opts.objectMode = true;

Writable.call(this, opts);

this._maxParallel = maxParallel;
this._onWrite = onWrite;
this._destroyed = false;
this._flushed = false;
this._ordered = opts.ordered !== false;
this._buffer = this._ordered ? cyclist(maxParallel) : [];
this._top = 0;
this._bottom = 0;
this._ondrain = null;
};
if (!opts) opts = {};
if (!opts.highWaterMark) opts.highWaterMark = Math.max(maxParallel, 16);
if (opts.objectMode !== false) opts.objectMode = true;

Writable.call(this, opts);

this._maxParallel = maxParallel;
this._onWrite = onWrite;
this._destroyed = false;
this._flushed = false;
this._ordered = opts.ordered !== false;
this._buffer = this._ordered ? cyclist(maxParallel) : [];
this._top = 0;
this._bottom = 0;
this._ondrain = null;
}

inherits(ParallelWrite, Writable);
_destroy() {
if (this._destroyed) return;
this._destroyed = true;
this.emit("close");
}

ParallelWrite.prototype._destroy = function () {
if (this._destroyed) return;
this._destroyed = true;
this.emit("close");
};
_write(chunk, enc, callback) {
const pos = this._top++;

this._onWrite(chunk, (err, data) => {
if (this._destroyed) return;
if (err) {
this.emit("error", err);
this.destroy();
return;
}
if (this._ordered) {
this._buffer.put(pos, data === undefined || data === null ? null : data);
} else {
this._buffer.push(data);
}
this._drain();
});

if (this._top - this._bottom < this._maxParallel) return callback();
this._ondrain = callback;
}

ParallelWrite.prototype._write = function (chunk, enc, callback) {
const self = this;
const pos = this._top++;
_final(callback) {
this._flushed = true;
this._ondrain = callback;
this._drain();
}

this._onWrite(chunk, function (err, data) {
if (self._destroyed) return;
if (err) {
self.emit("error", err);
self.destroy();
return;
}
if (self._ordered) {
self._buffer.put(pos, data === undefined || data === null ? null : data);
_drain() {
if (this._ordered) {
while (this._buffer.get(this._bottom) !== undefined) {
this._buffer.del(this._bottom++);
}
} else {
self._buffer.push(data);
while (this._buffer.length > 0) {
this._buffer.pop();
this._bottom++;
}
}
self._drain();
});

if (this._top - this._bottom < this._maxParallel) return callback();
this._ondrain = callback;
};

ParallelWrite.prototype._final = function (callback) {
this._flushed = true;
this._ondrain = callback;
this._drain();
};
if (!this._drained() || !this._ondrain) return;

ParallelWrite.prototype._drain = function () {
if (this._ordered) {
while (this._buffer.get(this._bottom) !== undefined) {
this._buffer.del(this._bottom++);
}
} else {
while (this._buffer.length > 0) {
this._buffer.pop();
this._bottom++;
}
const ondrain = this._ondrain;
this._ondrain = null;
ondrain();
}

if (!this._drained() || !this._ondrain) return;

const ondrain = this._ondrain;
this._ondrain = null;
ondrain();
};

ParallelWrite.prototype._drained = function () {
const diff = this._top - this._bottom;
return this._flushed ? !diff : diff < this._maxParallel;
};
_drained() {
const diff = this._top - this._bottom;
return this._flushed ? !diff : diff < this._maxParallel;
}
}

export function writeData(handleChunk, options: any = {}) {
return ParallelWrite(options.parallel || 1, { objectMode: true }, async (chunk, callback) => {
export function writeData(handleChunk, options: ParallelWriteOptions = {}) {
return new ParallelWrite(options.parallel || 1, { objectMode: true }, async (chunk, callback) => {
try {
const res = await handleChunk(chunk);
callback(null, res);
Expand Down
2 changes: 1 addition & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"rootDirs": ["src"],
"target": "esnext",
"lib": ["esnext"],
"types": ["node", "mocha"]
"types": ["node"]
},
"exclude": ["node_modules", "dist"],
"include": ["src"]
Expand Down
5 changes: 4 additions & 1 deletion tsconfig.test.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@
"rootDirs": ["./test"],
"types": ["node", "mocha"]
},
"include": ["src", "test"]
"include": ["src", "test"],
"ts-node": {
"transpileOnly": true
}
}

0 comments on commit 918dbde

Please sign in to comment.