@ontic-ai/nk-kafka-node

A wrapper that enforces workarounds for using Kafka from Node in a Dockerized environment

nk-kafka-node

Motivations

Working with Kafka in Node can be difficult. There are plenty of actively maintained libraries to choose from, but they fall short in a few places:

  • In containerized environments where containers come up asynchronously, a few checks need to happen before consumers connect to Kafka and before producers start producing messages.

  • When consumers are subscribed to a topic and a producer publishes to the same topic, the topic must be created ahead of time in case the consumer becomes ready before the producer.

  • kafka-node simplifies a lot of complexity, but has known issues around the zookeeper client.

  • Most clients leave you in callback chains as you proactively try to prevent faults.

To address these, this helper library has been created.

Usage

KafkaClient

Internally, nk-kafka-node uses KafkaClient from kafka-node. One client is created per producer or consumer, at the time that producer or consumer is created. You can configure clients with configureClient(clientConfig), which accepts the same options as KafkaClient.

The default configuration is:

const clientConfig = {
  kafkaHost: process.env.KAFKA_HOST || 'kafka:9092',
  connectRetryOptions: {
    retries: 20,
    factor: 3,
    minTimeout: 1000,
    maxTimeout: 30000
  }
};

If you only need to set kafkaHost, you can use the KAFKA_HOST environment variable instead.
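
To get a feel for what the default connectRetryOptions mean in practice, the sketch below computes the wait before each retry. It assumes the options are interpreted as in the retry module (delay = min(minTimeout * factor^attempt, maxTimeout)) with randomization turned off; retryDelays is a hypothetical helper for illustration and is not part of nk-kafka-node or kafka-node.

```javascript
// Hypothetical helper (not part of nk-kafka-node): list the delay before each retry,
// assuming exponential backoff capped at maxTimeout and no randomization.
function retryDelays({ retries, factor, minTimeout, maxTimeout }) {
  const delays = [];
  for (let attempt = 0; attempt < retries; attempt++) {
    const uncapped = Math.round(minTimeout * Math.pow(factor, attempt));
    delays.push(Math.min(uncapped, maxTimeout));
  }
  return delays;
}

// Values copied from the default configuration above.
const delays = retryDelays({ retries: 20, factor: 3, minTimeout: 1000, maxTimeout: 30000 });
const totalMs = delays.reduce((sum, ms) => sum + ms, 0);

console.log(delays.slice(0, 5)); // [ 1000, 3000, 9000, 27000, 30000 ]
console.log(totalMs);            // 520000
```

Under these assumptions the delay reaches the 30 second cap on the fifth retry, and the client keeps trying for a little under nine minutes in total, which leaves room for a slow Kafka container to come up.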

Creating Consumers

After setting your client config, you can create consumers with createConsumer(topic, [groupId], [optionsOverride]). groupId is optional but recommended; if it is omitted, the consumer is assigned a UUID. optionsOverride is merged with the following default options:

const consumerOptions = {
  autoCommit: true,
  fromOffset: false,
  encoding: 'utf8'
};
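
As an illustration of how an override might combine with these defaults, here is a minimal sketch. It assumes a shallow merge in which override keys win and a random UUID stands in for a missing groupId; resolveConsumerOptions is a hypothetical name, and the library's actual merge logic may differ. crypto.randomUUID requires Node 14.17 or later.

```javascript
const { randomUUID } = require('crypto');

// Defaults copied from the README.
const defaultConsumerOptions = { autoCommit: true, fromOffset: false, encoding: 'utf8' };

// Hypothetical helper (an assumption, not the library's code):
// shallow merge where overrides win, and groupId falls back to a UUID.
function resolveConsumerOptions(groupId, optionsOverride = {}) {
  return Object.assign({}, defaultConsumerOptions, optionsOverride, {
    groupId: groupId || randomUUID()
  });
}

const resolved = resolveConsumerOptions('app-my-topics-consumer', { fromOffset: true });
// fromOffset is overridden; autoCommit and encoding keep their defaults.
console.log(resolved);
```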

createConsumer extends kafka-node's consumer, so the same events, methods, and options are available. Before subscribing, createConsumer checks whether the requested topics exist and tries to create them with kafka-node's createTopics.

Note: To take advantage of proactive topic creation, set AUTO_CREATE_TOPICS_ENABLE (or the equivalent environment variable or setting) to true on your Kafka instance. For "ches/kafka" that is KAFKA_AUTO_CREATE_TOPICS_ENABLE=true.
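
The topic-creation step described above could look roughly like the following. This is a sketch under assumptions, not the library's source: ensureTopics and toTopicSpecs are hypothetical names, the partitions and replicationFactor values are placeholders, and a stub stands in for kafka-node's KafkaClient so the snippet runs without a broker. The only kafka-node call it relies on is client.createTopics(topics, callback).

```javascript
const { promisify } = require('util');

// Turn subscription payloads like [{ topic: 'a' }] into createTopics specs.
// partitions and replicationFactor are placeholder values for illustration.
function toTopicSpecs(subscriptions) {
  return subscriptions.map(({ topic }) => ({ topic, partitions: 1, replicationFactor: 1 }));
}

// Hypothetical helper: wrap the callback-style createTopics in a promise.
function ensureTopics(client, subscriptions) {
  const createTopics = promisify(client.createTopics.bind(client));
  return createTopics(toTopicSpecs(subscriptions));
}

// Stub standing in for a kafka-node KafkaClient; it only records topic names.
const created = [];
const stubClient = {
  createTopics(topics, callback) {
    created.push(...topics.map((t) => t.topic));
    callback(null, []); // no error, nothing failed
  }
};

ensureTopics(stubClient, [{ topic: 'my-topic1' }, { topic: 'my-topic12' }])
  .then(() => console.log('topics ready:', created));
```

Wrapping the callback in a promise keeps the "create topics, then subscribe" sequence flat instead of nesting callbacks.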

Example:

const Kafka = require('nk-kafka-node');

Kafka.createConsumer([{topic: 'my-topic1'}, {topic: 'my-topic12'}], 'app-my-topics-consumer')
  .then((consumer) => {
    consumer.on('message', (msg) => {
      // Handle message
    });
    consumer.on('error', (err) => {
      // Handle error from connected consumer
      consumer.close(); // Closing on error is a suggestion
    });
  })
  .catch(err => {
    // Handle error connecting consumer
  });
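
The "Handle message" step often starts with decoding the payload. A minimal sketch, assuming messages carry topic and value fields as kafka-node messages do, and that values may be either JSON or plain text; parseMessageValue is a hypothetical helper and not part of the library.

```javascript
// Hypothetical helper: decode a message value as JSON, falling back to the raw string.
function parseMessageValue(msg) {
  const text = Buffer.isBuffer(msg.value) ? msg.value.toString('utf8') : String(msg.value);
  try {
    return { topic: msg.topic, json: true, value: JSON.parse(text) };
  } catch (err) {
    // Not valid JSON: hand back the text unchanged.
    return { topic: msg.topic, json: false, value: text };
  }
}

const parsedJson = parseMessageValue({ topic: 'my-topic1', value: '{"body":{"user":"Jeff"}}' });
const parsedText = parseMessageValue({ topic: 'my-topic1', value: 'plain string' });

console.log(parsedJson.value.body.user); // Jeff
console.log(parsedText.json);            // false
```

Inside the consumer this would be called as consumer.on('message', (msg) => { const parsed = parseMessageValue(msg); ... }).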

Creating and Using Producers

After setting your client config, you can create producers with createProducer(). createProducer extends kafka-node's producer. It has the same events and methods available to it as well as an extra method.

The producer resolved by createProducer has an extra method called produce. produce ensures that a leader node has been elected before it attempts to publish a set of messages, and it also sets some options.

Example:

const Kafka = require('nk-kafka-node');

Kafka.createProducer()
  .then((producer) => {
    producer.produce('my-topic1', [
      JSON.stringify({
        body: {
          user: 'Jeff',
          rug: null
        }
      }),
      'plain string',
      Buffer.from('boat string')
    ]);
    producer.on('error', (err) => {
      // Handle connected producer error
    });
  })
  .catch((err) => {
    // Handle error creating/connecting producer
  });