
Simple stream-based kafka listener based on node-rdkafka. Calculates metrics on lag and group consumption rate.

Exposes a single function that returns an object used for streaming messages and consuming.

const kafka = require("exp-kafka-listener");
const listener = kafka.listen(options, groupId, topics);
const readStream = listener.readStream;

See examples below for more info.


  • host: Comma-separated list of kafka hosts.
  • username: If set, SASL/PLAIN authentication will be used when connecting.
  • password: Password for SASL authentication.
  • autoCommit: Automatically commit messages every 5 seconds, default false.
  • fetchSize: Kafka fetch size, default 500.
  • fromOffset: Kafka start offset, default "latest".
  • statsInterval: The rate at which statistics are reported (in ms), default 30000.
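
As a rough illustration of how the documented defaults combine with caller-supplied options, here is a minimal sketch. The helper name `withDefaults` and the credentials are hypothetical and not part of exp-kafka-listener; the library presumably applies its defaults internally, so this only spells out the effective values.

```javascript
// Defaults as documented in the option list above.
const DEFAULTS = {
  autoCommit: false,
  fetchSize: 500,
  fromOffset: "latest",
  statsInterval: 30000
};

// Hypothetical helper: merges caller options over the documented defaults.
function withDefaults(options) {
  return Object.assign({}, DEFAULTS, options);
}

const kafkaOptions = withDefaults({
  host: "mykafkahost-1:9200,mykafkahost-2:9200",
  username: "my-user",     // placeholder; setting it enables SASL/PLAIN
  password: "my-password", // placeholder
  statsInterval: 10000     // report stats every 10 seconds instead of 30
});
```

The resulting object has the same shape as the options passed as the first argument to "listen".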


The object returned from "listen" is an event emitter that emits the following events:

  • 'ready': Emitted once the listener has successfully connected to the kafka cluster.
  • 'stats': Emitted on a regular interval, supplies an object with the following props
    • lag: Total lag for consumer group
    • messageRate: Message consumption rate for consumer group (will be negative if producers are faster than consumers)
    • error: Set if an error occurred when stats were calculated
    • time: Timestamp when stats were generated
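
The events above could be consumed roughly as follows. This is a sketch under assumptions: `watchListener` is a hypothetical helper, and a plain EventEmitter stands in for the object returned from "listen" so the snippet runs without a kafka cluster. The numeric `time` value is only an example; the actual timestamp format is not specified here.

```javascript
const { EventEmitter } = require("events");

// Hypothetical helper: attaches handlers for the documented events to any
// emitter shaped like the object returned from "listen".
function watchListener(listener, log) {
  listener.on("ready", () => log("connected to kafka"));
  listener.on("stats", (stats) => {
    if (stats.error) {
      log(`stats error: ${stats.error}`);
      return;
    }
    log(`lag=${stats.lag} rate=${stats.messageRate} at ${stats.time}`);
  });
}

// Stand-in for the real listener (assumption, for illustration only).
const fakeListener = new EventEmitter();
const lines = [];
watchListener(fakeListener, (line) => lines.push(line));

fakeListener.emit("ready");
fakeListener.emit("stats", { lag: 42, messageRate: -3, time: 1700000000000 });
```

With a real listener, `log` could be `console.log` or a metrics client, and a growing `lag` combined with a negative `messageRate` would indicate that consumers are falling behind.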


Manual commits and streams

Use this if you want to be sure that every message is processed before it is committed. Any in-flight messages will be re-sent after a process crash or restart. Back-pressure is handled by Node.js streams, so the fetch rate adjusts to the consumption rate.

const kafka = require("exp-kafka-listener");
const through = require("through2");
const {pipeline} = require("stream");

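const kafkaOptions = {
  host: "mykafkahost-1:9200,mykafkahost-2:9200",
  autoCommit: false
};

const listener = kafka.listen(kafkaOptions, "my-group-id", ["my-topic"]);

// Process each message, then pass it on to the commit step.
const msgHandler = through.obj((msg, _encoding, done) => {
  const payload = msg.value;
  someAsyncOperation(payload, (err) => {
    done(err, msg);
  });
});

// Commit only after the message has been fully processed.
// listener.commit(msg) is assumed here; check the package for the exact commit call.
const commitHandler = through.obj((msg, _encoding, done) => {
  listener.commit(msg);
  done();
});

pipeline(listener.readStream, msgHandler, commitHandler, (err) => {
  throw err || new Error("Stream ended"); // Stream should never end.
});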

Autocommit and streams

Use this if you don't care about losing a few in-flight messages during restarts. Messages will be automatically committed every five seconds. Back-pressure is handled by Node.js streams, so the fetch rate adjusts to the consumption rate. Therefore the number of in-flight messages is usually low.

const kafka = require("exp-kafka-listener");
const through = require("through2");
const {pipeline} = require("stream");

const kafkaOptions = {
  host: "mykafkahost-1:9200,mykafkahost-2:9200",
  autoCommit: true
};

const listener = kafka.listen(kafkaOptions, "my-group-id", ["my-topic"]);

// Last stage of the pipeline: signal completion without pushing the
// message on, so nothing accumulates on the readable side.
const msgHandler = through.obj((msg, _encoding, done) => {
  const payload = msg.value;
  someAsyncOperation(payload, (err) => {
    done(err);
  });
});

pipeline(listener.readStream, msgHandler, (err) => {
  throw err || new Error("Stream ended"); // Stream should never end.
});

Autocommit scenario ignoring backpressure

The simplest and fastest way of consuming messages. However, back-pressure is not dealt with, so if consumption is slow, many messages will be left hanging in-flight and will likely not be redelivered after a crash or restart.

const kafka = require("exp-kafka-listener");

const kafkaOptions = {
  host: "mykafkahost-1:9200,mykafkahost-2:9200",
  autoCommit: true
};

const listener = kafka.listen(kafkaOptions, "my-group-id", ["my-topic"]);
listener.readStream.on("data", (msg) => {
  // .. go to town
});
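
Because nothing slows the stream down in this mode, it can help to keep an eye on how many messages are being processed at once. The following is a minimal sketch of such a counter, not part of exp-kafka-listener: `trackInFlight` is a hypothetical helper, and a plain EventEmitter stands in for `listener.readStream` so the snippet runs on its own.

```javascript
const { EventEmitter } = require("events");

// Hypothetical helper: counts messages that have been handed to `handle`
// but whose `done` callback has not been called yet.
function trackInFlight(stream, handle) {
  const state = { inFlight: 0, maxInFlight: 0 };
  stream.on("data", (msg) => {
    state.inFlight += 1;
    state.maxInFlight = Math.max(state.maxInFlight, state.inFlight);
    handle(msg, () => {
      state.inFlight -= 1;
    });
  });
  return state;
}

// Stand-in for listener.readStream (assumption, for illustration only).
const fakeStream = new EventEmitter();
const pending = [];
// Simulate slow consumers by parking each completion callback.
const state = trackInFlight(fakeStream, (msg, done) => pending.push(done));

fakeStream.emit("data", { value: "a" });
fakeStream.emit("data", { value: "b" });
fakeStream.emit("data", { value: "c" });
const peak = state.inFlight; // three messages are in flight at this point

pending.forEach((done) => done()); // all handlers finish
```

If `maxInFlight` keeps growing in production, that suggests the handler cannot keep up and one of the stream-based setups above may be a better fit.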

Further reading

  • Node.js streams
  • node-rdkafka