Kafka in Node.js

Use the installation instructions below if you're installing Helios in a Node.js service consuming or producing messages to Kafka.

Consuming messages with eachBatch

When consuming messages from Kafka with the eachBatch handler, special attention should be given to prevent Trace data cut off.

In Kafka, the context is injected into each message and so should also be extracted from each message so that the flow will be intact.

Instead of using the for (let message of batch.messages) form, move to an iterative batch.messages.forEach((message) => {...}) form. This should assure proper context propagation (map and filter array operations are also supported out of the box).

Alternatively, for any custom flow, you can also use the following method to run any handling inside the context of any message:

const { withKafkaMessageContext } = require('@heliosphere/opentelemetry-instrumentation-kafkajs');

for (let message of batch.messages) {
    await withKafkaMessageContext(message, async () => {
	await this.handleEntityCreated(message);
    });
}
import { withKafkaMessageContext } from '@heliosphere/opentelemetry-instrumentation-kafkajs';

for (let message of batch.messages) {
    await withKafkaMessageContext(message, async () => {
	await this.handleEntityCreated(messages);
    });
}