Skip to content

Commit

Permalink
docs: creating steps, context.error()
Browse files Browse the repository at this point in the history
  • Loading branch information
tpluscode committed Jan 8, 2024
1 parent 1cccf5f commit 239db46
Show file tree
Hide file tree
Showing 2 changed files with 237 additions and 0 deletions.
71 changes: 71 additions & 0 deletions docs/workflows/how-to/fail-pipeline-gently.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Fail pipeline without stopping it immediately

The easiest way to stop a pipeline is to throw an exception. However, this will stop the pipeline immediately.
Consider the simple step implementation below.

```js
export default function streamFailingRandomly() {
return async function * (source) {
let total = 0;

for await (const chunk of source) {
if (Math.random() > 0.5) {
throw new Error('Random error');
} else {
total += chunk.length;
yield chunk;
}
}

this.logger.info(`Total processed: ${total}`);
}
}
```

When the error is thrown, the pipeline will stop immediately. No further chunks will be processed and the code following
the async loop will not be reached.
It is quite clear from a generator-style operation implementation but can be a surprise when implementing steps using low
level streams. In the latter case, the flush callback will also not be called.

What's more, an exception thrown in one step affects other steps in the pipeline, preventing their respective flush
callbacks from being called.

:::note
`try..catch` block will not help here. When an error is thrown inside the async generator, it will be caught by the
pipeline and the processing is stopped immediately.
:::

## `this.error()` to the rescue

The solution is to use the `this.error()` method. It will instruct barnard that an error occurred but will not stop
processing. It is the responsibility of the step implementor how to handle the rest of the stream.

For the example above, the implementation could be changed to break the loop and call `this.error` instead of throwing.

```js
export default function streamFailingRandomly() {
return async function * (source) {
let error = null;
let total = 0;

for await (const chunk of source) {
if (Math.random() > 0.5) {
this.error(new Error('Random error'));
break;
} else {
total += chunk.length;
yield chunk;
}
}

this.logger.info(`Total processed: ${total}`);
}.bind(this)
}
```

:::caution
When `this.error` is called, even when the implementor breaks the loop, the pipeline will still continue to the end.
All chunks which were already processed will be passed to the subsequent steps and the pipeline itself will not break.
This is important for example when writing to a HTTP endpoint. Unlike when throwing an exception, the HTTP request will
be sent, albeit possibly incomplete.
:::
166 changes: 166 additions & 0 deletions docs/workflows/how-to/implement-steps.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
---
title: Implement steps
sidebar_position: 1
---

# Implementing steps

As explained on the [Pipeline Concepts](../explanations/pipeline.md#step) page, steps are linked from a pipeline using
`code:link` property. Out of the box barnard59 supports JavaScript steps. The implementation of a step is a factory
function which returns a stream or an async generator.

This page presents three common ways to implement JavaScript steps.

## Async generators

The recommended way to implement a step is to use an async generator. Simply return an async generator from the factory.
It must have a single argument which is the stream itself as an iterable. The generator can then yield values to the stream.

```js
import nanoid from 'nanoid'

/**
* @this {import('barnard59-core').Context}
*/
export default function step() {
// 1. prepare the step
const id = nanoid()
const ex = this.rdf.namespace('http://example.org/')

return async function* (stream) {
// 2. before first chunk
let total = 0
yield this.rdf.quad(rdf.blankNode(id), rdf.ns.rdf.type, ex.StepSummary)

for await (const quad of stream) {
// 3. push chunks down the pipeline
total++
yield quad;
}

// 4. after last chunk
yield this.rdf.quad(rdf.blankNode(id), ex.totalQuads, total)
}.bind(this)
}
```

Commented are the four phases of a step:

1. The first phase is the preparation of the step. This happens before the pipeline
starts processing data. Can be `async`.
2. The second phase is right before the first chunk is processed.
3. The third phase is the processing of the stream. Implementors `yield` values to the stream. Individual chunks can be transformed, or skipped by continuing the loop without yielding.
4. The fourth phase is after the last chunk has been processed. This is the place to clean up resources or push additional data.

:::caution
The operations implemented using async generators always create [Duplex streams](https://nodejs.org/api/stream.html#stream_class_stream_duplex), which means that they will be used as both readable and writable streams. As a consequence,
they will not be able to be used as the first step in a pipeline, as the first step must be a readable stream.
:::

## through2

If, for some reason, you cannot use async generators, you can use the [through2](https://npm.im/through2) module as an
easy alternative to using the NodeJS streams API directly.

```js
import through2 from 'through2'
import nanoid from 'nanoid'

/**
* @this {import('barnard59-core').Context}
*/
export default function step() {
const { rdf } = this

// 1. prepare the step
const id = nanoid()
const ex = rdf.namespace('http://example.org/')
let total = 0

return through2.obj(function (chunk, encoding, callback) {
// 3. push chunks down the pipeline
total++
this.push(chunk)
callback()
}, function (callback) {
// 4. after last chunk
this.push(rdf.quad(rdf.blankNode(id), rdf.ns.rdf.type, ex.StepSummary))
this.push(rdf.quad(rdf.blankNode(id), ex.totalQuads, total))
callback()
})
}
```

Note that there are some important differences between the through2 step and async generators:

1. When using through2, it is not possible to capture a specific `before` stage. Any additional data must be pushed in the `flush` callback.
- Alternatively, a library like [onetime](https://npm.im/onetime) can be used to create `before` stage is only executed once.
2. The stream transform and flush functions are not bound to the context. This means that the context must be captured in a closure, or they must be implemented as arrow functions.

:::caution
Similarly, a through2 step will always create a Duplex stream, and cannot be used as the first step in a pipeline.
:::

## NodeJS streams

If you need more control over the stream, you can implement the step using the NodeJS streams API directly. This is the
most verbose and powerful way.

Below is the above example of a step that uses the NodeJS streams API directly. Same principles as with through2 apply here.

```js
import { Transform } from 'stream'
import nanoid from 'nanoid'

/**
* @this {import('barnard59-core').Context}
*/
export default function step() {
const { rdf } = this

// 1. prepare the step
const id = nanoid()
const ex = rdf.namespace('http://example.org/')
let total = 0

return new Transform({
objectMode: true,
transform (chunk, encoding, callback) {
// 3. push chunks down the pipeline
total++
this.push(chunk)
callback()
},
flush (callback) {
// 4. after last chunk
this.push(rdf.quad(rdf.blankNode(id), rdf.ns.rdf.type, ex.StepSummary))
this.push(rdf.quad(rdf.blankNode(id), ex.totalQuads, total))
callback()
}
})
}
```

The major difference between this and methods above is the possibility to implement streams which are only `Readable` or
only `Writable`. This means that this method can be used to implement the first and last step in a pipeline.

For example, the following step will create a stream which emits a single quad and then ends.

```js
import { Readable } from 'stream'

/**
* @this {import('barnard59-core').Context}
*/
export default function step() {
const { rdf } = this

return new Readable({
objectMode: true,
read () {
this.push(rdf.quad(rdf.blankNode(), rdf.ns.rdf.type, rdf.ns.rdfs.Resource))
this.push(null)
}
})
}
```

0 comments on commit 239db46

Please sign in to comment.