Skip to content

Commit

Permalink
feat: use streams rather than a buffer (#10)
Browse files Browse the repository at this point in the history
Reworks the main output method to use streams directly rather than
accumulating chunks through events. This should mean we don't hold any
output until the user awaits the result.
  • Loading branch information
43081j authored Jun 16, 2024
1 parent 8dc684e commit 6de6266
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 70 deletions.
73 changes: 54 additions & 19 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {type Readable} from 'node:stream';
import {normalize as normalizePath} from 'node:path';
import {cwd as getCwd} from 'node:process';
import {computeEnv} from './env.js';
import {readStreamAsString, combineStreams} from './stream.js';
import {combineStreams} from './stream.js';
import readline from 'node:readline';

export interface Output {
Expand Down Expand Up @@ -156,31 +156,37 @@ export class ExecProcess implements Result {

async *[Symbol.asyncIterator](): AsyncIterator<string> {
const proc = this._process;

if (!proc) {
return;
}

if (this._thrownError) {
throw this._thrownError;
}
const streams: Readable[] = [];

const sources: Readable[] = [];
if (proc.stderr) {
sources.push(proc.stderr);
if (this._streamErr) {
streams.push(this._streamErr);
}
if (proc.stdout) {
sources.push(proc.stdout);
if (this._streamOut) {
streams.push(this._streamOut);
}
const combined = combineStreams(sources);

const streamCombined = combineStreams(streams);

const rl = readline.createInterface({
input: combined
input: streamCombined
});

for await (const chunk of rl) {
yield chunk.toString();
}

await this._processClosed;

proc.removeAllListeners();

if (this._thrownError) {
throw this._thrownError;
}
}

protected async _waitForOutput(): Promise<Output> {
Expand All @@ -194,6 +200,21 @@ export class ExecProcess implements Result {
throw new Error('No process was started');
}

let stderr = '';
let stdout = '';

if (this._streamErr) {
for await (const chunk of this._streamErr) {
stderr += chunk.toString();
}
}

if (this._streamOut) {
for await (const chunk of this._streamOut) {
stdout += chunk.toString();
}
}

await this._processClosed;

proc.removeAllListeners();
Expand All @@ -202,14 +223,9 @@ export class ExecProcess implements Result {
throw this._thrownError;
}

const [stderr, stdout] = await Promise.all([
proc.stderr && readStreamAsString(proc.stderr),
proc.stdout && readStreamAsString(proc.stdout)
]);

const result: Output = {
stderr: stderr ?? '',
stdout: stdout ?? ''
stderr,
stdout
};

return result;
Expand All @@ -222,16 +238,20 @@ export class ExecProcess implements Result {
return this._waitForOutput().then(onfulfilled, onrejected);
}

protected _streamOut?: Readable;
protected _streamErr?: Readable;

public spawn(): void {
const cwd = getCwd();
const options = this._options;
const nodeOptions = {
...defaultNodeOptions,
...options.nodeOptions
};

const signals: AbortSignal[] = [];

this._resetState();

if (options.timeout !== undefined) {
signals.push(AbortSignal.timeout(options.timeout));
}
Expand All @@ -255,6 +275,13 @@ export class ExecProcess implements Result {

const handle = spawn(normalisedCommand, normalisedArgs, nodeOptions);

if (handle.stderr) {
this._streamErr = handle.stderr;
}
if (handle.stdout) {
this._streamOut = handle.stdout;
}

this._process = handle;
handle.once('error', this._onError);
handle.once('close', this._onClose);
Expand All @@ -267,6 +294,14 @@ export class ExecProcess implements Result {
}
}

protected _resetState(): void {
this._aborted = false;
this._processClosed = new Promise<void>((resolve) => {
this._resolveClose = resolve;
});
this._thrownError = undefined;
}

protected _onError = (err: Error): void => {
if (
err.name === 'AbortError' &&
Expand Down
20 changes: 0 additions & 20 deletions src/stream.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,6 @@
import {type EventEmitter} from 'node:events';
import {type Readable, PassThrough} from 'node:stream';

export const readStreamAsString = (stream: Readable): Promise<string> => {
return new Promise<string>((resolve, reject) => {
let result = '';
const onDataReceived = (chunk: Buffer | string): void => {
result += chunk.toString();
};

stream.once('error', (err) => {
reject(err);
});

stream.on('data', onDataReceived);

stream.once('end', () => {
stream.removeListener('data', onDataReceived);
resolve(result);
});
});
};

export const waitForEvent = (
emitter: EventEmitter,
name: string
Expand Down
98 changes: 98 additions & 0 deletions src/test/main_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import {x} from '../main.js';
import * as assert from 'node:assert/strict';
import {test} from 'node:test';

test('exec', async (t) => {
await t.test('resolves to stdout', async () => {
const result = await x('echo', ['foo']);
assert.equal(result.stdout, 'foo\n');
assert.equal(result.stderr, '');
});

await t.test('times out after defined timeout (ms)', async () => {
const proc = x('sleep', ['0.2s'], {timeout: 100});
await assert.rejects(async () => {
await proc;
});
assert.equal(proc.killed, true);
assert.equal(proc.process!.signalCode, 'SIGTERM');
});

await t.test('throws spawn errors', async () => {
const proc = x('definitelyNonExistent');
await assert.rejects(async () => {
await proc;
}, 'spawn definitelyNonExistent NOENT');
});

await t.test('captures stderr', async () => {
const result = await x('cat', ['nonexistentforsure']);
assert.equal(
result.stderr,
'cat: nonexistentforsure: No such file or directory\n'
);
assert.equal(result.stdout, '');
});

await t.test('pid is number', async () => {
const proc = x('echo', ['foo']);
await proc;
assert.ok(typeof proc.pid === 'number');
});

await t.test('exitCode is set correctly', async () => {
const proc = x('echo', ['foo']);
assert.equal(proc.exitCode, undefined);
await proc;
assert.equal(proc.exitCode, 0);
});

await t.test('kill terminates the process', async () => {
const proc = x('sleep', ['5s']);
const result = proc.kill();
assert.ok(result);
assert.ok(proc.killed);
assert.equal(proc.aborted, false);
});

await t.test('pipe correctly pipes output', async () => {
const echoProc = x('echo', ['foo\nbar']);
const grepProc = echoProc.pipe('grep', ['foo']);
const result = await grepProc;

assert.equal(result.stderr, '');
assert.equal(result.stdout, 'foo\n');
assert.equal(echoProc.exitCode, 0);
assert.equal(grepProc.exitCode, 0);
});

await t.test('async iterator gets correct output', async () => {
const proc = x('echo', ['foo\nbar']);
const lines = [];
for await (const line of proc) {
lines.push(line);
}

assert.deepEqual(lines, ['foo', 'bar']);
});

await t.test('async iterator receives errors', async () => {
const proc = x('nonexistentforsure');
await assert.rejects(async () => {
for await (const line of proc) {
line;
}
});
});

await t.test('signal can be used to abort execution', async () => {
const controller = new AbortController();
const proc = x('sleep', ['4s'], {signal: controller.signal});
controller.abort();
const result = await proc;
assert.ok(proc.aborted);
assert.ok(proc.killed);
assert.equal(result.stderr, '');
assert.equal(result.stdout, '');
});
});
32 changes: 1 addition & 31 deletions src/test/stream_test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {combineStreams, waitForEvent, readStreamAsString} from '../stream.js';
import {combineStreams, waitForEvent} from '../stream.js';
import * as assert from 'node:assert/strict';
import {test} from 'node:test';
import {EventEmitter} from 'node:events';
Expand All @@ -13,36 +13,6 @@ test('waitForEvent', async (t) => {
});
});

test('readStreamAsString', async (t) => {
await t.test('rejects on error', async () => {
const streamError = new Error('fudge');
const stream = new Readable({
read() {
this.destroy(streamError);
}
});
await assert.rejects(readStreamAsString(stream), streamError);
});

await t.test('resolves to concatenated data', async () => {
const stream = Readable.from(['foo', 'bar']);
const result = await readStreamAsString(stream);
assert.equal(result, 'foobar');
});

await t.test('handles buffer data', async () => {
const stream = new Readable({
read() {
this.push(Buffer.from('foo'));
this.push(Buffer.from('bar'));
this.push(null);
}
});
const result = await readStreamAsString(stream);
assert.equal(result, 'foobar');
});
});

test('combineStreams', async (t) => {
await t.test('works with a single stream', async () => {
const stream = Readable.from(['foo', 'bar']);
Expand Down

0 comments on commit 6de6266

Please sign in to comment.