kafkalytics-node

This is the NodeJS plugin for Kafkalytics.



Releases

Kafkalytics and its plugins are still in beta and are not recommended for production use. APIs may change without notice or a grace period.

Releases for this plugin can be found on npm and on GitHub.

How to use

Installation

The plugin is available from npm. Its only dependency is the kafka-node package. Since you are already using Kafka, you will most likely not need any additional dependencies.

Install the plugin using npm i kafkalytics-node --save.

Usage

The plugin can easily be required or imported.

import Kafkalytics from 'kafkalytics-node';    // ES 6
var Kafkalytics = require('kafkalytics-node'); // ES 5

The next step is to initialize and configure the plugin. This happens in the same step, using a configuration object. This object has the following form:

var kafkalyticsOptions = {
  service: "Name of this service",                            // required, string
  zookeeper_address: "Zookeepers address",                    // required, string
  kafka_clientId: "This service client ID for Kafka",         // required, string
  zookeeper_clientOptions: {},                                // optional, object, zookeeper client options (default empty object)
  loggingTopic: "the topic kafkalytics server is running on", // required, string
  threshold: 100                                              // optional, integer, the number of events to batch before sending (default 25)
};

The complete configuration and initialization then looks like this:

var kafkalytics = Kafkalytics(kafkalyticsOptions);
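In a deployed service, the option values usually come from the environment rather than from literals. The following is a minimal sketch of one way to assemble the options object. The helper name buildKafkalyticsOptions and all environment variable names are hypothetical; only the option keys are taken from the form shown above. The validation is done by the helper itself, not by the plugin.

```javascript
// Sketch: build the Kafkalytics options object from environment variables.
// The helper and the variable names are hypothetical; only the option keys
// (service, zookeeper_address, ...) come from this README.
function buildKafkalyticsOptions(env) {
  var options = {
    service: env.KAFKALYTICS_SERVICE,
    zookeeper_address: env.ZOOKEEPER_ADDRESS,
    kafka_clientId: env.KAFKA_CLIENT_ID,
    loggingTopic: env.KAFKALYTICS_TOPIC,
    zookeeper_clientOptions: {}
  };

  // Fail early when a required string option is missing or empty.
  ['service', 'zookeeper_address', 'kafka_clientId', 'loggingTopic'].forEach(function (key) {
    if (typeof options[key] !== 'string' || options[key] === '') {
      throw new Error('Missing required Kafkalytics option: ' + key);
    }
  });

  // threshold is optional; leave it unset so the plugin default applies.
  if (env.KAFKALYTICS_THRESHOLD !== undefined) {
    var threshold = parseInt(env.KAFKALYTICS_THRESHOLD, 10);
    if (isNaN(threshold) || threshold < 1) {
      throw new Error('KAFKALYTICS_THRESHOLD must be a positive integer');
    }
    options.threshold = threshold;
  }

  return options;
}
```

The result can then be passed straight to the plugin, for example Kafkalytics(buildKafkalyticsOptions(process.env)).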

Notes on closing applications!

Since Kafkalytics sends events in batches (unless you set the threshold to 1), some events may not have been sent yet when your application closes. Therefore, in your shutdown hook (e.g. process.on('exit') or process.on('SIGINT')), make sure you call kafkalytics.close() to drain the queue.
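A shutdown hook along these lines could look as follows. This is a sketch, not the plugin's prescribed method: registerShutdown is a hypothetical helper, and it assumes close() finishes asynchronously and reports completion through its callback. Because Node.js only runs synchronous work in 'exit' listeners, the sketch hooks into the SIGINT and SIGTERM signals and exits once the callback fires.

```javascript
// Sketch: drain Kafkalytics before the process exits.
// `kafkalytics` is an initialized plugin instance. `proc` defaults to the
// global process and is a parameter only so a stand-in can be supplied.
function registerShutdown(kafkalytics, proc) {
  proc = proc || process;
  var closing = false;

  function shutdown() {
    if (closing) {
      return; // ignore repeated signals while the queue is draining
    }
    closing = true;
    kafkalytics.close(function () {
      proc.exit(0); // exit only after close() reports that it is done
    });
  }

  proc.on('SIGINT', shutdown);
  proc.on('SIGTERM', shutdown);
  return shutdown;
}
```

Call registerShutdown(kafkalytics) once, right after initializing the plugin.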

API

send(messageId, topic)

This sends a message to your Kafkalytics server to record the message as read. The messageId is located within your message, at message.id. See the Kafkalytics docs for more information about creating messageIds.
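As an illustration, send() could be called from the handler that processes each consumed message. The sketch below makes several assumptions that are not confirmed here: that the payload is JSON, that the identifier sits on its id field, and that the topic argument is the topic the message was consumed from. The helper name reportRead is hypothetical.

```javascript
// Sketch: report a consumed message to Kafkalytics as read.
// `kafkaMessage` is expected to look like { topic: '...', value: '...' }.
// Assumes a JSON payload that carries its identifier on `id`.
function reportRead(kafkalytics, kafkaMessage) {
  var payload;
  try {
    payload = JSON.parse(String(kafkaMessage.value));
  } catch (err) {
    return false; // payload is not JSON, so there is no id to report
  }

  if (payload === null || typeof payload !== 'object' || payload.id === undefined) {
    return false; // no message id present
  }

  kafkalytics.send(payload.id, kafkaMessage.topic);
  return true;
}
```

With a kafka-node consumer, this might be wired up as consumer.on('message', function (m) { reportRead(kafkalytics, m); }).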

options()

Returns the options object which holds the currently used configuration.

close(callback)

Closes the plugin for new messages (meaning that calls to send() will now fail). It then drains the queues, and shuts the producer and client for Kafka down. Once done, it calls the callback (or nothing if the callback is not supplied).

forceFlush()

Forces a flush to Kafkalytics. This method is also called by close(), and you generally don't need to call it yourself. However, in systems that consume very few messages, you might want to call it periodically so that events do not stay queued for too long.
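One way to flush periodically is a plain timer. The sketch below uses a hypothetical helper, startPeriodicFlush, and an interval you choose yourself; it assumes forceFlush() is safe to call when the queue is empty, which is not confirmed here.

```javascript
// Sketch: call forceFlush() on a fixed interval for low-traffic services.
// Returns a function that stops the timer again.
function startPeriodicFlush(kafkalytics, intervalMs) {
  var timer = setInterval(function () {
    kafkalytics.forceFlush();
  }, intervalMs);

  // Do not let this timer alone keep the Node.js process alive.
  if (typeof timer.unref === 'function') {
    timer.unref();
  }

  return function stop() {
    clearInterval(timer);
  };
}
```

For example, var stopFlushing = startPeriodicFlush(kafkalytics, 30000); flushes every 30 seconds. Call stopFlushing() before kafkalytics.close() during shutdown.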

ECMAScript 5

For compatibility reasons, and to avoid the need for transpiling, the plugin is written in ECMAScript 5.1. This allows both ES5 and ES6 projects to adopt the plugin without problems.