ts-reactivekafka

Typesafe & reactive Kafka Subjects made with Typescript and RxJS

Usage no npm install needed!

<script type="module">
  import tsReactivekafka from 'https://cdn.skypack.dev/ts-reactivekafka';
</script>

README

ts-reactivekafka

Typesafe & reactive Kafka Subjects made with Typescript, KafkaJS and RxJS.

Quickstart

npm i ts-reactivekafka

single consumer

import rxkafka from "ts-reactivekafka";
import { tap } from "rxjs/operators";

(async () => {
    let kafkaConfig = {
        kafkaHost: process.env.KAFKA_HOST || "url:9092",
        serviceId: "test",
        logAllEvents: false,
        ssl: true,
        sasl: {
            mechanism: "plain",
            username: process.env.KAFKA_USER || "xxuser",
            password: process.env.KAFKA_PASS || "xxapitoken",
        },
    };

    // singleton is instantiated
    rxkafka({
        ...kafkaConfig,
        consumerConfig: {
            topics: ["topicA", "topicB", "topicC"],
            consumerId: "testConsumer",
        },
        producerConfig: {
            topics: ["target"],
        },
    });

    // NOTE: multiple invocations of rxkafka result in the original singleton
    let kafka = rxkafka();

    kafka.consumer
        .getSubject()
        .pipe(
            tap((event) => console.log(event.message)) // rxjs operators
        )
        .subscribe(kafka.producer.getSubject());
})();

consumer group with dynamic import

import { merge } from "rxjs";

(async () => {
    const kafkaConfig = {
        kafkaHost: process.env.KAFKA_HOST || "url:9092",
        serviceId: "test",
        logAllEvents: false,
        ssl: true,
        sasl: {
            mechanism: "plain",
            username: process.env.KAFKA_USER || "xxuser",
            password: process.env.KAFKA_PASS || "xxapitoken",
        },
    };

    const kafkaProducerSingleton = (await import("ts-reactivekafka"))({
        ...kafkaConfig,
        producerConfig: {
            topics: ["target"],
        },
    });

    const kafkaGroupConsumer1 = (await import("ts-reactivekafka"))({
        ...kafkaConfig,
        consumerConfig: {
            topics: ["topicA", "topicB", "topicC"],
            consumerGroupId: "consumerGroupX", // NOTE: consumerGroupId instead of consumerId
        },
    });

    const kafkaGroupConsumer2 = (await import("ts-reactivekafka"))({
        ...kafkaConfig,
        consumerConfig: {
            topics: ["topicA", "topicB", "topicC"],
            consumerGroupId: "consumerGroupX",
        },
    });

    const kafkaGroupConsumer3 = (await import("ts-reactivekafka"))({
        ...kafkaConfig,
        consumerConfig: {
            topics: ["topicA", "topicB", "topicC"],
            consumerGroupId: "consumerGroupX",
        },
    });

    const consumerGroup = merge(
        kafkaGroupConsumer1.consumer.getSubject(),
        kafkaGroupConsumer2.consumer.getSubject(),
        kafkaGroupConsumer3.consumer.getSubject()
    );

    consumerGroup.subscribe(kafkaProducerSingleton.producer.getSubject());
})();

The Code

The code follows a Singleton Pattern* in which an optional consumer and producer instance are made available given the respective consumer and producer configurations and given a general Kafka config. The consumer or producer instance is only instantiated when their respective configs are well-defined.

The only public methods for both instances are:

  • getSubject() which returns the RxJs Subject (a multicasted observable)
  • disconnect() which disconnects the Kafka consumer/producer and completes the subject

The subjects are typed as follows:

type Event = {
    key?: string | null
    value: any
    partition?: number
    headers?: IHeaders
    timestamp?: string
}

Subject<{topic: string, message: Event}>

Since, in Typescript, no basic JSON type exists yet the value is typed as any. Otherwise value could have been typed as JSON | string.

Reactive Operators

RxJs offers a vast range operators in which developers can construct fast & flexible functionally reactive programming constructs. A good overview of the operators can be found here:

Optimized module thanks to Vercel Ncc

Vercel's ncc provides NPM developers with the ultimate NPM module compilation toolkit:

  • Publish minimal packages to npm
  • Only ship relevant app code to serverless environments
  • Don't waste time configuring bundlers
  • Generally faster bootup time and less I/O overhead
  • Compiled language-like experience (e.g.: go)

Sponsored by Charp

This package is used in production and maintained by the developers of Charp.