README
node-rdkafka-commit-manager
A simple helper for controlling when an offset is ready to be committed via node-rdkafka.
Goals
This package is intended to help you implement at-least-once processing without making a network call for every message you process.
Under frequent use, the commit manager commits offsets at most once per configurable commit interval.
Under infrequent use, the commit manager commits immediately whenever it has seen a period of inactivity longer than that commit interval.
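The two behaviors above can be sketched as a single time check. This is a minimal illustration, not the package's actual implementation: `createCommitThrottle`, its `commit` callback, and the injectable `now` clock are hypothetical names introduced here, and the sketch leaves a skipped offset pending until the next call arrives.

```javascript
// Hypothetical sketch of interval-based commit throttling.
// Not the package's real code; every name here is an assumption.
function createCommitThrottle(commit, commitIntervalMs = 5000, now = Date.now) {
  let lastCommitAt = -Infinity; // no commit has happened yet
  const pending = new Map();    // "topic:partition" -> newest finished message

  return function readyToCommit(data) {
    // Remember only the newest finished message for each partition.
    pending.set(`${data.topic}:${data.partition}`, data);

    const current = now();
    if (current - lastCommitAt < commitIntervalMs) {
      return false; // too soon since the last commit, so keep it pending
    }

    // The interval has elapsed (or nothing was committed yet): commit everything pending.
    commit([...pending.values()]);
    pending.clear();
    lastCommitAt = current;
    return true;
  };
}
```

With a 5000 ms interval, a burst of messages triggers one commit per interval, while a message arriving after a long quiet period is committed on the spot.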
useCommitManager
The commit manager is exposed via a hook-style function (you don't need to be using React to use it).
const { readyToCommit, onRebalance } = useCommitManager(consumer, commitIntervalMs);
Arguments
- consumer: this must be a KafkaConsumer object from node-rdkafka. It must be configured with auto-commit turned off.
- commitIntervalMs: the number of milliseconds the commit manager should wait between commits.
- optional: defaults to 5000
Returned Functions
- readyToCommit(data)
- data: This must be an object containing the properties "topic", "partition", and "offset" from the Kafka message.
- Call this whenever you are finished with a Kafka message.
- onRebalance()
- Call this whenever your KafkaConsumer has its partitions revoked.
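If you hand messages to another layer and only want to carry the commit coordinates along, the data argument can be reduced to the three documented properties. The helper below is a hypothetical sketch: `toCommitData` is not part of this package, and its integer checks are an assumption about the message shape.

```javascript
// Hypothetical helper (not part of the package): keeps only the
// properties that readyToCommit is documented to require.
function toCommitData(message) {
  const { topic, partition, offset } = message;

  // Assumption: topic is a string; partition and offset are integers.
  if (
    typeof topic !== "string" ||
    !Number.isInteger(partition) ||
    !Number.isInteger(offset)
  ) {
    throw new TypeError("expected topic (string), partition and offset (integers)");
  }

  return { topic, partition, offset };
}

// Example: the payload and any other properties are dropped.
const commitData = toCommitData({
  topic: "sample.test.topic",
  partition: 0,
  offset: 42,
  value: Buffer.from("payload"),
});
```

The resulting object could then be passed as `readyToCommit(commitData)` once processing has finished.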
Usage Example
This package can be used from JavaScript or TypeScript.
The examples below illustrate a few key points about using this commit manager:
Disable the auto-commit feature on your KafkaConsumer.
- Otherwise, the commit manager will be competing with node-rdkafka's auto-commit behavior.
Use non-flowing mode on your KafkaConsumer.
- Otherwise, node-rdkafka will provide batches of messages, which may be handled out of order.
- Non-flowing mode delivers only one message at a time, which significantly slows the rate at which a single consumer can process messages, so additional consumers (horizontal scaling) may be needed to compensate.
Implement a rebalance callback function which calls the commit manager's onRebalance function any time your KafkaConsumer's partitions are revoked.
- Otherwise, the commit manager may later try to commit offsets for partitions which are no longer assigned to it.
- The rebalance callback in the examples below covers the minimum responsibilities of such a function. See the node-rdkafka and/or librdkafka documentation for more details.
Call the commit manager's readyToCommit function for each Kafka message you process.
- Only call readyToCommit when you have finished processing the message.
- The data objects provided by node-rdkafka's KafkaConsumer already have all of the necessary properties, so you can pass them to readyToCommit directly.
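When processing is asynchronous, the "only call readyToCommit when you have finished" rule can be enforced with a small wrapper. This is a sketch under assumptions: `commitAfter` and `handler` are hypothetical names, not part of this package, and what to do with a failed message (retry, skip, stop) is left to the caller.

```javascript
// Hypothetical wrapper (not part of the package): readyToCommit runs
// only after the handler has finished without throwing or rejecting.
function commitAfter(handler, readyToCommit) {
  return async function onData(data) {
    await handler(data);  // a thrown error or rejection skips the next line
    readyToCommit(data);  // reached only when processing succeeded
  };
}
```

In the examples below, the body of the "data" listener could be replaced by a call to the function returned from `commitAfter(yourHandler, readyToCommit)`, with a `catch` that decides how to treat failures. A message whose handler fails is never marked as ready, which is what at-least-once processing needs.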
TypeScript
import { useCommitManager } from "node-rdkafka-commit-manager";
import { CODES, KafkaConsumer } from "node-rdkafka";

const CONSUME_TIMEOUT_MS = 1000;

// Ask for one message now, then ask again every consumeTimeout milliseconds.
const consumeNonFlowing = (consumer: KafkaConsumer, consumeTimeout: number) => {
  consumer.consume(1);
  return setInterval(function() {
    consumer.consume(1);
  }, consumeTimeout);
};

const stopConsuming = (consumeInterval?: NodeJS.Timeout) => {
  if (consumeInterval) {
    clearInterval(consumeInterval);
  }
};

// Assignment and revocation are both signalled through err.code.
const rebalanceCallback = (err: any, assignments: any) => {
  if (err.code === CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
    consumer.assign(assignments);
  } else if (err.code === CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
    consumer.unassign();
    // Tell the commit manager that its partitions were revoked.
    onRebalance();
  } else {
    console.error(`Kafka rebalance error : ${err}`);
  }
};

const consumer = new KafkaConsumer(
  {
    "enable.auto.commit": false,
    rebalance_cb: rebalanceCallback,
    // <Your global config here (ex: authentication, consumer group, etc.)>
  },
  {
    // <Your topic config here>
  }
);

const { readyToCommit, onRebalance } = useCommitManager(consumer);

let consumeInterval: NodeJS.Timeout;

consumer
  .on("ready", function() {
    consumer.subscribe(["sample.test.topic"]);
    consumer.setDefaultConsumeTimeout(CONSUME_TIMEOUT_MS);
    consumeInterval = consumeNonFlowing(consumer, CONSUME_TIMEOUT_MS);
  })
  .on("data", function(data: any) {
    // Pause polling while this message is being processed.
    stopConsuming(consumeInterval);
    // <Process the Kafka message here.>
    readyToCommit(data);
    consumeInterval = consumeNonFlowing(consumer, CONSUME_TIMEOUT_MS);
  })
  .connect();
JavaScript
const { useCommitManager } = require("node-rdkafka-commit-manager");
const { CODES, KafkaConsumer } = require("node-rdkafka");

const CONSUME_TIMEOUT_MS = 1000;

// Ask for one message now, then ask again every consumeTimeout milliseconds.
const consumeNonFlowing = (consumer, consumeTimeout) => {
  consumer.consume(1);
  return setInterval(function() {
    consumer.consume(1);
  }, consumeTimeout);
};

const stopConsuming = (consumeInterval) => {
  if (consumeInterval) {
    clearInterval(consumeInterval);
  }
};

// Assignment and revocation are both signalled through err.code.
const rebalanceCallback = (err, assignments) => {
  if (err.code === CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
    consumer.assign(assignments);
  } else if (err.code === CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
    consumer.unassign();
    // Tell the commit manager that its partitions were revoked.
    onRebalance();
  } else {
    console.error(`Kafka rebalance error : ${err}`);
  }
};

const consumer = new KafkaConsumer(
  {
    "enable.auto.commit": false,
    rebalance_cb: rebalanceCallback,
    // <Your global config here (ex: authentication, consumer group, etc.)>
  },
  {
    // <Your topic config here>
  }
);

const { readyToCommit, onRebalance } = useCommitManager(consumer);

let consumeInterval;

consumer
  .on("ready", function() {
    consumer.subscribe(["sample.test.topic"]);
    consumer.setDefaultConsumeTimeout(CONSUME_TIMEOUT_MS);
    consumeInterval = consumeNonFlowing(consumer, CONSUME_TIMEOUT_MS);
  })
  .on("data", function(data) {
    // Pause polling while this message is being processed.
    stopConsuming(consumeInterval);
    // <Process the Kafka message here.>
    readyToCommit(data);
    consumeInterval = consumeNonFlowing(consumer, CONSUME_TIMEOUT_MS);
  })
  .connect();