diff --git a/docs/workflows/how-to/fail-pipeline-gently.md b/docs/workflows/how-to/fail-pipeline-gently.md
new file mode 100644
index 0000000..b31fdb1
--- /dev/null
+++ b/docs/workflows/how-to/fail-pipeline-gently.md
@@ -0,0 +1,71 @@

# Fail a pipeline without stopping it immediately

The easiest way to stop a pipeline is to throw an exception. However, this stops the pipeline immediately.
Consider the simple step implementation below.

```js
export default function streamFailingRandomly() {
  return async function * (source) {
    let total = 0;

    for await (const chunk of source) {
      if (Math.random() > 0.5) {
        throw new Error('Random error');
      } else {
        total += chunk.length;
        yield chunk;
      }
    }

    this.logger.info(`Total processed: ${total}`);
  }.bind(this)
}
```

When the error is thrown, the pipeline stops immediately. No further chunks are processed and the code following
the async loop is never reached.
This is quite obvious in a generator-style implementation but can come as a surprise when implementing steps using
low-level streams. In the latter case, the flush callback will not be called either.

What's more, an exception thrown in one step affects the other steps in the pipeline, preventing their respective
flush callbacks from being called.

:::note
A `try..catch` block will not help here. When an error is thrown inside the async generator, it is caught by the
pipeline and processing stops immediately.
:::

## `this.error()` to the rescue

The solution is to use the `this.error()` method. It informs barnard59 that an error occurred but does not stop
processing. It is up to the step implementor to decide how to handle the rest of the stream.

For the example above, the implementation could be changed to break the loop and call `this.error` instead of throwing.
```js
export default function streamFailingRandomly() {
  return async function * (source) {
    let total = 0;

    for await (const chunk of source) {
      if (Math.random() > 0.5) {
        this.error(new Error('Random error'));
        break;
      } else {
        total += chunk.length;
        yield chunk;
      }
    }

    this.logger.info(`Total processed: ${total}`);
  }.bind(this)
}
```

:::caution
When `this.error` is called, the pipeline still runs to the end, even when the implementor breaks the loop.
All chunks which were already processed are passed to the subsequent steps and the pipeline itself does not break.
This matters, for example, when writing to an HTTP endpoint: unlike when throwing an exception, the HTTP request will
be sent, albeit possibly incomplete.
:::

diff --git a/docs/workflows/how-to/implement-steps.md b/docs/workflows/how-to/implement-steps.md
new file mode 100644
index 0000000..4578f42
--- /dev/null
+++ b/docs/workflows/how-to/implement-steps.md
@@ -0,0 +1,166 @@

---
title: Implement steps
sidebar_position: 1
---

# Implementing steps

As explained on the [Pipeline Concepts](../explanations/pipeline.md#step) page, steps are linked from a pipeline using
the `code:link` property. Out of the box, barnard59 supports JavaScript steps. The implementation of a step is a factory
function which returns a stream or an async generator.

This page presents three common ways to implement JavaScript steps.

## Async generators

The recommended way to implement a step is to use an async generator. Simply return an async generator function from the
factory. It takes a single argument: the incoming stream as an async iterable. The generator then yields values down the
pipeline.

```js
import { nanoid } from 'nanoid'

/**
 * @this {import('barnard59-core').Context}
 */
export default function step() {
  const { rdf } = this

  // 1. prepare the step
  const id = nanoid()
  const ex = rdf.namespace('http://example.org/')

  return async function* (stream) {
    // 2. before first chunk
    let total = 0
    yield rdf.quad(rdf.blankNode(id), rdf.ns.rdf.type, ex.StepSummary)

    for await (const quad of stream) {
      // 3. push chunks down the pipeline
      total++
      yield quad
    }

    // 4. after last chunk
    yield rdf.quad(rdf.blankNode(id), ex.totalQuads, rdf.literal(`${total}`))
  }
}
```

The comments mark the four phases of a step:

1. The first phase is the preparation of the step. It happens before the pipeline
starts processing data. The factory can be `async`.
2. The second phase runs right before the first chunk is processed.
3. The third phase is the processing of the stream. Implementors `yield` values down the pipeline. Individual chunks can be transformed, or skipped by continuing the loop without yielding.
4. The fourth phase runs after the last chunk has been processed. This is the place to clean up resources or push additional data.

:::caution
Steps implemented with async generators always create [Duplex streams](https://nodejs.org/api/stream.html#stream_class_stream_duplex), which means they act as both readable and writable streams. As a consequence,
they cannot be used as the first step in a pipeline, which must be a purely readable stream.
:::

## through2

If, for some reason, you cannot use async generators, the [through2](https://npm.im/through2) module is an easy
alternative to using the NodeJS streams API directly.

```js
import through2 from 'through2'
import { nanoid } from 'nanoid'

/**
 * @this {import('barnard59-core').Context}
 */
export default function step() {
  const { rdf } = this

  // 1. prepare the step
  const id = nanoid()
  const ex = rdf.namespace('http://example.org/')
  let total = 0

  return through2.obj(function (chunk, encoding, callback) {
    // 3. push chunks down the pipeline
    total++
    this.push(chunk)
    callback()
  }, function (callback) {
    // 4. after last chunk
    this.push(rdf.quad(rdf.blankNode(id), rdf.ns.rdf.type, ex.StepSummary))
    this.push(rdf.quad(rdf.blankNode(id), ex.totalQuads, rdf.literal(`${total}`)))
    callback()
  })
}
```

Note that there are some important differences between a through2 step and an async generator:

1. With through2, there is no dedicated `before` stage. Any additional data must be pushed in the `flush` callback.
   - Alternatively, a library like [onetime](https://npm.im/onetime) can be used to create a `before` stage that is only executed once.
2. The transform and flush functions are not bound to the barnard59 context. The context must therefore be captured in a closure (as done with `rdf` above), or the functions must be implemented as arrow functions (at the cost of losing the stream as `this`).

:::caution
Similarly, a through2 step always creates a Duplex stream and cannot be used as the first step in a pipeline.
:::

## NodeJS streams

If you need more control over the stream, you can implement the step using the NodeJS streams API directly. This is the
most verbose but also the most powerful way.

Below is the same example implemented with the NodeJS streams API directly. The same principles as with through2 apply.

```js
import { Transform } from 'stream'
import { nanoid } from 'nanoid'

/**
 * @this {import('barnard59-core').Context}
 */
export default function step() {
  const { rdf } = this

  // 1. prepare the step
  const id = nanoid()
  const ex = rdf.namespace('http://example.org/')
  let total = 0

  return new Transform({
    objectMode: true,
    transform (chunk, encoding, callback) {
      // 3. push chunks down the pipeline
      total++
      this.push(chunk)
      callback()
    },
    flush (callback) {
      // 4. after last chunk
      this.push(rdf.quad(rdf.blankNode(id), rdf.ns.rdf.type, ex.StepSummary))
      this.push(rdf.quad(rdf.blankNode(id), ex.totalQuads, rdf.literal(`${total}`)))
      callback()
    }
  })
}
```

The major difference from the methods above is the possibility to implement streams which are only `Readable` or
only `Writable`. This means that this method can be used to implement the first and the last step in a pipeline.

For example, the following step creates a stream which emits a single quad and then ends.

```js
import { Readable } from 'stream'

/**
 * @this {import('barnard59-core').Context}
 */
export default function step() {
  const { rdf } = this

  return new Readable({
    objectMode: true,
    read () {
      this.push(rdf.quad(rdf.blankNode(), rdf.ns.rdf.type, rdf.ns.rdfs.Resource))
      this.push(null)
    }
  })
}
```
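
Conversely, a step that ends a pipeline can return a purely `Writable` stream. The sketch below is illustrative (the step name `countChunks` and the logged message are made up for this example, not part of barnard59): it counts incoming chunks and logs the total once the upstream step has ended.

```javascript
import { Writable } from 'stream'

/**
 * @this {import('barnard59-core').Context}
 */
export default function countChunks() {
  const { logger } = this
  let total = 0

  return new Writable({
    objectMode: true,
    write (chunk, encoding, callback) {
      // consume each chunk without pushing anything downstream
      total++
      callback()
    },
    final (callback) {
      // called once, after the upstream step has ended
      logger.info(`Chunks received: ${total}`)
      callback()
    }
  })
}
```

The `final` callback plays the same role for a `Writable` as the `flush` callback does for a `Transform`: a place to run code after the last chunk.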