wikimedia-node-rdkafka

Node.js bindings for librdkafka

node-rdkafka - Node.js wrapper for Kafka C/C++ library

Copyright (c) 2016 Blizzard Entertainment.

https://github.com/blizzard/node-rdkafka

Overview

The node-rdkafka library is a high-performance NodeJS client for Apache Kafka that wraps the native librdkafka library. All the complexity of balancing writes across partitions and managing (possibly ever-changing) brokers should be encapsulated in the library.

Reference Docs

To view the reference docs for the current version, go here

Contributing

For guidelines on contributing please see CONTRIBUTING.md

Code of Conduct

Play nice; Play fair.

Requirements

  • Apache Kafka >=0.9
  • Node.js >=4
  • Linux/Mac (Sorry Windows :()

Tests

This project includes two types of tests:

  • end-to-end integration tests
  • unit tests

You can run both types of tests using the Makefile. Doing so calls mocha from your locally installed node_modules directory.

  • Before you run the tests, be sure to init and update the submodules:
    1. git submodule init
    2. git submodule update
  • To run the unit tests, you can run make lint or make test.
  • To run the integration tests, you must have a running Kafka installation available. By default, the test tries to connect to localhost:9092; however, you can supply the KAFKA_HOST environment variable to override this default behavior.
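As a rough illustration of that override, test setup code might resolve the broker address as sketched below. The function name resolveKafkaHost is hypothetical and not part of this project; only the KAFKA_HOST variable and the localhost:9092 default come from the description above.

```javascript
// Hypothetical helper (not part of node-rdkafka): picks the broker address
// for the integration tests from an environment-like object.
function resolveKafkaHost(env) {
  // Fall back to the documented default when KAFKA_HOST is unset or empty
  return env.KAFKA_HOST || 'localhost:9092';
}

console.log(resolveKafkaHost(process.env));
```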

Usage

You can install the node-rdkafka module like any other module:

npm install node-rdkafka

To use the module, you must require it.

var Kafka = require('node-rdkafka');

Configuration

You can pass many configuration options to librdkafka. A full list can be found at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.

Configuration keys that have the suffix _cb are designated as callbacks. Some of these keys are informational and you can choose to opt-in (for example, dr_cb). Others are callbacks designed to return a value, such as partitioner_cb.

Not all of these options are supported. The library will throw an error if the value you send in is invalid.

The library currently supports the following callbacks:

  • partitioner_cb
  • dr_cb
  • event_cb
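Because only these callbacks are supported, it can help to check a configuration object before handing it to the library. The following is a minimal sketch of such a check; findUnsupportedCallbacks and SUPPORTED_CALLBACKS are hypothetical names for this illustration, not part of the node-rdkafka API.

```javascript
// Callback keys listed above as currently supported
var SUPPORTED_CALLBACKS = ['partitioner_cb', 'dr_cb', 'event_cb'];

// Hypothetical helper: returns the keys ending in _cb that are not in the
// supported list, so they can be reported before the config is used.
function findUnsupportedCallbacks(config) {
  return Object.keys(config).filter(function (key) {
    var isCallbackKey = key.slice(-3) === '_cb';
    return isCallbackKey && SUPPORTED_CALLBACKS.indexOf(key) === -1;
  });
}

console.log(findUnsupportedCallbacks({
  'metadata.broker.list': 'localhost:9092',
  'dr_cb': true,
  'stats_cb': true
}));
```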

Sending Messages

A Producer sends messages to Kafka. The Producer constructor takes a configuration object, as shown in the following example:

var producer = new Kafka.Producer({
  'metadata.broker.list': 'kafka-host1:9092,kafka-host2:9092'
});

A Producer requires only metadata.broker.list (the Kafka brokers) to be created. The values in this list are separated by commas. For other configuration options, see the CONFIGURATION.md file linked in the Configuration section.
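If your broker addresses live in an array (for example, loaded from application settings), they need to be joined into that comma-separated form first. A minimal sketch, assuming a hypothetical helper named toBrokerList:

```javascript
// Hypothetical helper: joins host:port entries into the comma-separated
// string that metadata.broker.list expects. Blank entries are dropped.
function toBrokerList(brokers) {
  return brokers
    .map(function (broker) { return broker.trim(); })
    .filter(Boolean)
    .join(',');
}

var config = {
  'metadata.broker.list': toBrokerList(['kafka-host1:9092', 'kafka-host2:9092'])
};

console.log(config['metadata.broker.list']);
```

The resulting config object can then be passed to the Producer constructor as in the example above.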

The following example shows a configuration object with several librdkafka options set.

var producer = new Kafka.Producer({
  'client.id': 'kafka',
  'metadata.broker.list': 'localhost:9092',
  'compression.codec': 'gzip',
  'retry.backoff.ms': 200,
  'message.send.max.retries': 10,
  'socket.keepalive.enable': true,
  'queue.buffering.max.messages': 100000,
  'queue.buffering.max.ms': 1000,
  'batch.num.messages': 1000000,
  'dr_cb': true
});

Stream API

You can easily use the Producer as a writable stream immediately after creation (as shown in the following example):

// Our producer with its Kafka brokers
var producer = new Kafka.Producer({
  'metadata.broker.list': 'kafka-host1:9092,kafka-host2:9092'
});

// This call returns a new writable stream to our topic 'topic-name'
var stream = producer.getWriteStream('topic-name');

// Writes a message to the stream
var queuedSuccess = stream.write(new Buffer('Awesome message'));

if (queuedSuccess) {
  console.log('We queued our message!');
} else {
  // Note that this only tells us if the stream's queue is full,
  // it does NOT tell us if the message got to Kafka!  See below...
  console.log('Too many messages in our queue already');
}

stream.on('error', function (err) {
  // Here's where we'll know if something went wrong sending to Kafka
  console.error('Error in our kafka stream');
  console.error(err);
});

Note that getWriteStream will create a new stream on every call. You should try to cache the returned stream for a topic after the first call.
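One way to do that caching is a small wrapper that remembers the stream for each topic. This is only a sketch: createStreamCache is a hypothetical name, and producer is assumed to be a Kafka.Producer exposing getWriteStream as described above.

```javascript
// Hypothetical helper: wraps a producer so that getWriteStream is called
// at most once per topic. `producer` is assumed to be a Kafka.Producer.
function createStreamCache(producer) {
  // Map of topic name -> cached stream (no prototype, so any name is safe)
  var streams = Object.create(null);

  return function getStream(topic) {
    if (!streams[topic]) {
      streams[topic] = producer.getWriteStream(topic);
    }
    return streams[topic];
  };
}
```

With this in place, var getStream = createStreamCache(producer) can be created once, and getStream('topic-name') returns the same stream on every later call.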

Standard API

The Standard API is more performant, particularly when handling high volumes of messages. However, it requires more manual setup to use. The following example illustrates its use:

var producer = new Kafka.Producer({
  'metadata.broker.list': 'localhost:9092',
  'dr_cb': true
});

// Connect to the broker manually
producer.connect();

// Wait for the ready event before proceeding
producer.on('ready', function() {
  // Create a Topic object with any options our Producer
  // should use when writing to that topic.
  var topic = producer.Topic('topic', {
    // Make the Kafka broker acknowledge our message (optional)
    'request.required.acks': 1
  });

  producer.produce({
    // Message to send. If a string is supplied, it will be
    // converted to a Buffer automatically, but we're being
    // explicit here for the sake of example.
    message: new Buffer('Awesome message'),

    // The topic object we created above
    topic: topic
  }, function(err) {
    // Called after the message is queued
    if (err) {
      console.error('A problem occurred when sending our message');
      console.error(err);
    } else {
      console.log('Message produced successfully!');
    }
  });
});

// Any errors we encounter, including connection errors
producer.on('error', function(err) {
  console.error('Error from producer');
  console.error(err);
});

To see the configuration options available to you, see the Configuration section.
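The connect-then-wait-for-ready pattern above can be folded into a single callback. The sketch below uses a hypothetical helper named connectClient; it assumes the client is a Node.js EventEmitter (so that once and removeListener are available), which the README implies but does not state outright.

```javascript
// Hypothetical helper: connects a client and reports the outcome through
// one Node-style callback. Assumes `client` is an EventEmitter with a
// connect() method that later emits 'ready' or 'error'.
function connectClient(client, callback) {
  function onReady() {
    client.removeListener('error', onError);
    callback(null, client);
  }

  function onError(err) {
    client.removeListener('ready', onReady);
    callback(err);
  }

  // Register listeners before connecting so no event is missed
  client.once('ready', onReady);
  client.once('error', onError);
  client.connect();
}
```

This helper only covers errors raised while connecting. A long-lived error handler, such as the producer.on('error', ...) call in the example above, is still needed for errors that occur later.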

Methods

| Method | Description |
| --- | --- |
| producer.connect() | Connects to the broker. The connect() method emits the ready event when it connects successfully, or an error event when it does not. |
| producer.disconnect() | Disconnects from the broker. The disconnect() method emits the disconnected event when it has disconnected, or an error event if something went wrong. |
| producer.poll() | Polls the producer for delivery reports or other events to be transmitted via the emitter. This happens automatically on transactions such as produce. |
| producer.produce(msg) | Sends a message. The produce() method takes an object in the format shown above. |

Events

Some configuration properties that end in _cb indicate that an event should be generated for that option. You can either:

  • provide a value of true and react to the event
  • provide a callback function directly

The following example illustrates an event:

var producer = new Kafka.Producer({
  'client.id': 'my-client', // Specifies an identifier to use to help trace activity in Kafka
  'metadata.broker.list': 'localhost:9092', // Connect to a Kafka instance on localhost
  'dr_cb': true // Specifies that we want a delivery-report event to be generated
});

producer.on('delivery-report', function(report) {
  // Report of delivery statistics here:
  //
  console.log(report);
});

The following table describes types of events.

| Event | Description |
| --- | --- |
| error | Error reporting is handled through this pipeline. Most errors will have a value for code, message, and origin. The origin value is local or kafka, indicating where the error happened. |
| disconnected | The disconnected event is emitted when the broker has disconnected. This event is emitted only when .disconnect is called. The wrapper will always try to reconnect otherwise. |
| ready | The ready event is emitted when the Producer is ready to send messages. |
| event | The event event is emitted when librdkafka reports an event (if you opted in via the event_cb option). |
| event.log | The event.log event is emitted when logging events come in (if you opted into logging via the event_cb option). You must set a value for debug to receive log information. |
| event.status | The event.status event is emitted when librdkafka reports stats (if you opted in). |
| event.throttle | The event.throttle event is emitted when librdkafka reports throttling. |
| delivery-report | The delivery-report event is emitted when a delivery report has been found via polling. To use this event, you must set request.required.acks to 1 or -1 in topic configuration and dr_cb to true in the Producer constructor options. |
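The two preconditions for delivery reports are easy to get wrong because they live in different configuration objects. The following sketch checks both; canReceiveDeliveryReports is a hypothetical helper, and treating a callback function as an accepted dr_cb value is an assumption based on the opt-in options listed earlier in this section.

```javascript
// Hypothetical helper: checks the documented preconditions for receiving
// delivery-report events.
function canReceiveDeliveryReports(producerConfig, topicConfig) {
  var dr = producerConfig['dr_cb'];
  // Assumption: either `true` or a callback function enables reports
  var drEnabled = dr === true || typeof dr === 'function';

  // request.required.acks must be 1 or -1 in the topic configuration
  var acks = Number(topicConfig['request.required.acks']);
  var acksEnabled = acks === 1 || acks === -1;

  return drEnabled && acksEnabled;
}

console.log(canReceiveDeliveryReports(
  { 'metadata.broker.list': 'localhost:9092', 'dr_cb': true },
  { 'request.required.acks': 1 }
));
```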

Kafka.KafkaConsumer

To read messages from Kafka, you use a KafkaConsumer. You instantiate a KafkaConsumer object as follows:

var consumer = new Kafka.KafkaConsumer({
  'group.id': 'kafka',
  'metadata.broker.list': 'localhost:9092',
}, {});

The first parameter is the global config, while the second parameter is the topic config that gets applied to all subscribed topics. To view a list of all supported configuration properties, see the CONFIGURATION.md file described previously. Look for the properties marked C (consumer) or * (both producer and consumer).

The group.id and metadata.broker.list properties are required for a consumer.

Message Structure

Messages that are returned by the KafkaConsumer have the following structure.

{
  message: new Buffer('hi'), // message contents as a Buffer
  size: 2, // size of the message, in bytes
  topic: 'librdtesting-01', // topic the message comes from
  offset: 1337, // offset the message was read from
  partition: 1 // partition the message was on
}
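To show how these fields fit together, here is a small sketch that turns a message into a one-line summary for logging. The function name describeMessage is hypothetical; it relies only on the fields listed above.

```javascript
// Hypothetical helper: formats a consumed message as a single log line
// using the documented fields (topic, partition, offset, size, message).
function describeMessage(data) {
  return data.topic + '[' + data.partition + ']@' + data.offset +
    ' (' + data.size + ' bytes): ' + data.message.toString();
}

// Possible usage inside a data handler:
//   consumer.on('data', function (data) {
//     console.log(describeMessage(data));
//   });
```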

Stream API

The stream API is the easiest way to consume messages. The following example illustrates the use of the stream API:

// Read from the librdtesting-01 topic... note that this creates a new stream on each call!
var stream = consumer.getReadStream('librdtesting-01');

stream.on('data', function(data) {
  console.log('Got message');
  console.log(data.message.toString());
});

Standard API

You can also use the Standard API and manage callbacks and events yourself. You can choose different modes for consuming messages:

  • Flowing mode. This mode flows all of the messages it can read by maintaining an infinite loop in the event loop. It only stops when it detects the consumer has issued the unsubscribe or disconnect method.
  • Non-flowing mode. This mode reads a single message from Kafka at a time manually.

The following example illustrates flowing mode:

// Flowing mode
consumer.connect();

consumer
  .on('ready', function() {
    // Consume from the librdtesting-01 topic. This is what determines
    // the mode we are running in. By consuming an entire topic,
    // we will get messages from that topic as soon as they are available
    consumer.consume('librdtesting-01');
  })
  .on('data', function(data) {
    // Output the actual message contents
    console.log(data.message.toString());
  });

The following example illustrates non-flowing mode:

// Non-flowing mode
consumer.connect();

consumer
  .on('ready', function() {
    // Subscribe to the librdtesting-01 topic
    // This makes subsequent consumes read from that topic.
    consumer.subscribe('librdtesting-01');

    // Read one message every 1000 milliseconds (one second)
    setInterval(function() {
      consumer.consume();
    }, 1000);
  })
  .on('data', function(data) {
    console.log('Message found!  Contents below.');
    console.log(data.message.toString());
  });

The following table lists important methods for this API.

| Method | Description |
| --- | --- |
| consumer.connect() | Connects to the broker. The connect() method emits the ready event when it has successfully connected, or an error event when it has not. |
| consumer.disconnect() | Disconnects from the broker. The disconnect() method emits the disconnected event when it has disconnected, or an error event if something went wrong. |
| consumer.subscribe(topics, callback) | Subscribes to one or more topics. topics can be either an array or a string for a single topic. |
| consumer.unsubscribe() | Unsubscribes from the currently subscribed topics. You cannot subscribe to different topics without calling the unsubscribe() method first. |
| consumer.consume(cb) | Gets a message from the existing subscription. |
| consumer.consume(topics, cb) | Creates a subscription and gets messages as they become available. The consume() method keeps a background thread running to do the work. |
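Since topics may be given as a single string or as an array, application code that builds subscriptions dynamically may want to normalize and validate the value first. The sketch below is one way to do that; normalizeTopics is a hypothetical helper and does not describe how the library handles the argument internally.

```javascript
// Hypothetical helper: accepts a topic name or an array of topic names
// and always returns an array, rejecting anything that is not a
// non-empty string.
function normalizeTopics(topics) {
  var list = Array.isArray(topics) ? topics : [topics];

  list.forEach(function (topic) {
    if (typeof topic !== 'string' || topic.length === 0) {
      throw new TypeError('Topic names must be non-empty strings');
    }
  });

  return list;
}

console.log(normalizeTopics('librdtesting-01'));
```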

The following table lists events for this API.

| Event | Description |
| --- | --- |
| error | Error reporting is handled through this pipeline. Most errors will have a code, message, and origin value. The origin value is local or remote, indicating where the error happened. |
| disconnected | The disconnected event is emitted when the broker disconnects. This event is only emitted when .disconnect is called. The wrapper will always try to reconnect otherwise. |
| ready | The ready event is emitted when the KafkaConsumer is ready to read messages. |
| event | The event event is emitted when librdkafka reports an event (if you opted in via the event_cb option). |
| event.log | The event.log event is emitted when logging events occur (if you opted in for logging via the event_cb option). You must set a value for debug to receive log information. |
| event.status | The event.status event is emitted when librdkafka reports stats (if you opted in). |
| event.throttle | The event.throttle event is emitted when librdkafka reports throttling. |
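Because most errors carry code, message, and origin, an error handler can log them in a consistent shape. The following is a minimal sketch using a hypothetical formatKafkaError helper; the fallback strings for missing fields are choices made for this illustration.

```javascript
// Hypothetical helper: builds one log line from the documented error
// fields, with fallbacks for errors that lack code or origin.
function formatKafkaError(err) {
  var origin = err.origin || 'unknown origin';
  var code = err.code !== undefined ? err.code : 'no code';
  return '[' + origin + '] ' + code + ': ' + err.message;
}

// Possible usage:
//   consumer.on('error', function (err) {
//     console.error(formatKafkaError(err));
//   });
```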

Metadata

Both Kafka.Producer and Kafka.KafkaConsumer include a getMetadata method to retrieve metadata from Kafka.

Getting metadata on any connection returns the following data structure:

{
  orig_broker_id: 1,
  orig_broker_name: "broker_name",
  brokers: [
    {
      id: 1,
      host: 'localhost',
      port: 40
    }
  ],
  topics: [
    {
      name: 'awesome-topic',
      partitions: [
        {
          id: 1,
          leader: 20,
          replicas: [1, 2],
          isrs: [1, 2]
        }
      ]
    }
  ]
}

The following example illustrates how to use the getMetadata method.

var opts = {
  topic: 'librdtesting-01',
  timeout: 10000
};

producer.getMetadata(opts, function(err, metadata) {
  if (err) {
    console.error('Error getting metadata');
    console.error(err);
  } else {
    console.log('Got metadata');
    console.log(metadata);
  }
});
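Once the metadata object is available, it can be searched like any other plain data structure. The sketch below looks up the broker that leads a given partition; findPartitionLeader is a hypothetical helper that relies only on the fields shown in the data structure above.

```javascript
// Hypothetical helper: returns the broker entry that leads the given
// partition, or null if the topic, partition, or broker is not present.
function findPartitionLeader(metadata, topicName, partitionId) {
  var topic = metadata.topics.filter(function (t) {
    return t.name === topicName;
  })[0];
  if (!topic) {
    return null;
  }

  var partition = topic.partitions.filter(function (p) {
    return p.id === partitionId;
  })[0];
  if (!partition) {
    return null;
  }

  // Match the partition's leader id against the broker list
  var broker = metadata.brokers.filter(function (b) {
    return b.id === partition.leader;
  })[0];
  return broker || null;
}
```

Inside the getMetadata callback, a call such as findPartitionLeader(metadata, 'librdtesting-01', 0) would return the matching entry from brokers, or null when no match exists.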