n.
The hardened shell of a beetle.1- A handy interface for Kafka in Node.
Compatible with Kafka 0.8.x.x and higher.
Elytron relies on the kafkacat
C library. See kafkacat for instructions on how to install it.
Elytron can be installed by running npm i -s elytron
in the terminal.
Elytron's API exposes three things:
import { produce, consume, starve } from 'elytron';
You'll need to ensure the KAFKA_BROKERS
environment variable is set to the comma-separated list of brokers you want to use, e.g.:
export KAFKA_BROKERS="broker1,broker2,broker3"
# For local testing, such as with a kafka docker cluster:
KAFKA_BROKERS="localhost:9092,localhost:9093,localhost:9094" npm start
A message can be produced and sent to Kafka on a topic by calling the produce
function and passing it the name of a topic as a string. Example:
produce('an_interesting_topic');
Every message produced by elytron includes a timestamp and a unique identifier with the original message. If no message is provided, as in the example above, a "registration" message is created; its value is set to the timestamp of when it is called.
A message can be included like so:
produce('an_interesting_topic', a_relevant_message);
The message provided must be JSON-serializable.
produce
returns a hash containing the status of the produced message. An example hash might look like:
let message = { presses: 'stop' };
let status = produce('news', message);
console.log(status);
//{
// payload: {
// timestamp: 1484272028549,
// id: '1befd1ad-351e-47fe-bb1a-eb5019cbfbd9',
// value: {
// // If no message is provided, this would be:
// // registration: 1484272028549
// presses: 'stop'
// }
// }
//}
The produce
method accepts an optional callback as a third argument:
function work (response) {
// Work based on the response message happens here
}
produce('an_interesting_topic', a_relevant_message, some_work_to_do);
//{
// payload: {
// timestamp: 1484272028549,
// id: '1befd1ad-351e-47fe-bb1a-eb5019cbfbd9',
// value: {
// bar: 'bar'
// },
// response_topic: "response.an_interesting_topic.1befd1ad-351e-47fe-bb1a-eb5019cbfbd9"
// }
//}
If a callback is provided, elytron will create a "private" topic using a UUID, automatically create a consumer for it, and include the name of the response_topic
in its initial message payload. This allows for a consumer listening on the initial topic to provide a response message, which is in turn passed to the callback as a response. Example:
consume('an_interesting_topic', (msg) => {
// Do some work with what you consume
return 'a_reply_message'
});
produce('an_interesting_topic', a_relevant_message, (response) => {
const { payload: { value } } = JSON.parse(response);
console.log(value); // 'a_reply_message'
});
There are multiple ways to consume topics with the consume
function.
// Consume a single topic, do some work for each message
consume('news', (msg) => { /* Do work */ });
// Consume multiple topics, doing work for messages on any of them
consume(['media', 'entertainment'], (msg) => {
// Returned values from a consumer's callback get produced on the
// response_topic, if it's present
return msg.split('').reverse().join('');
});
// Consume as a High-Level Consumer, part of a balanced Consumer Group
consume(topic, work, { group: 'engineering' });
// Consume a topic starting at offset 5 (e.g. consume from the 6th on), and
// continue through all subsequent messages
consume(topic, work, { group: false, offset: 5 });
// Same as above, but exit when the last message has been consumed
consume(topic, work, { group: false, offset: 5, exit: true });
// Consume on all available topics
consume('*', (msg) => {});
Elytron can stop consuming from topics via the starve
function, like so:
// Stop all consumers of sugar
starve('sugar');
// Stop a particular consumer from television
starve('television', "1befd1ad-351e-47fe-bb1a-eb5019cbfbd9");
To run tests for elytron, within the repo execute either npm test
to run the suite, or npm run watch
to execute the suite and watch for changes.