@vonage/kafka-glue

kafka consumers and producers integrated with AWS Glue schema registry with RXJS for streaming

Usage no npm install needed!

<script type="module">
  import vonageKafkaGlue from 'https://cdn.skypack.dev/@vonage/kafka-glue';
</script>

README

kafka-glue

Kafka Glue is a collection of libraries for kafka consumers and producers integrated with AWS Glue schema registry with RXJS to expose streaming of the kafka messages.

Usage Examples

Producer

Create the instance

const producer = new Producer<{type: string}, string>({
    schema: {
      region: 'us-east-1',
      valueParserProtocol: 'avro',
      keyParserProtocol: 'string',
      valueSchemaConfig: {
        SchemaId: {
          RegistryName: '<name>',
          SchemaName: '<name>'
        },
        SchemaVersionNumber: {
          LatestVersion: true
        }
      },
      keySchemaConfig: {
        encoding: 'utf-8'
      }
    },
    kafka: {
      topic: '<name>',
      pullInterval: 300,
      topicConfig: {},
      globalConfig: {
        // 'debug': 'producer,cgrp,topic,fetch',
        'log_level': 3,
        'security.protocol': 'ssl',
        'metadata.broker.list': '<list>'
      }
    }
  });

(Optional) Set callback for the on ready event

producer.onReady = (info, metadata) => {
    // console.log(info, metadata);
  };

Initialize the instance (make sure to use await as this is an async function)

  await producer.init();

Add subscribers for errors, logs, and delivery reports

producer.logs$.subscribe(log => {
    console.warn({ log });
  });
  producer.errors$.subscribe(err => {
    console.error(err);
  });
  producer._deliveryReport.subscribe(report => {
    console.log({ report });
  });
  producer._offsetReport.subscribe(offset => {
    console.log(`Offset: ${offset}`);
  });

Produce messages! 🥳

producer.produce({type: 'test'}, 'test', Date.now());
producer.produce({type: 'test2'}, 'test', Date.now());

please refer Node JS Producer Example for the full example.


Consumer

Create the instance

const consumer = new Consumer({
    schema: {
      region: 'us-east-1',
      valueParserProtocol: 'avro',
      keyParserProtocol: 'string',
      valueSchemaConfig: {
        SchemaId: {
          RegistryName: '----',
          SchemaName: '----'
        },
        SchemaVersionNumber: {
          LatestVersion: true
        }
      },
      keySchemaConfig: {
        encoding: 'utf-8'
      }
    },
    kafka: {
      topics: ['test'],
      topicConfig: {
        'auto.offset.reset': 'earliest'
      },
      globalConfig: {
        'enable.auto.offset.store': false,
        'enable.auto.commit': false,
        'group.id': '<id>',
        'security.protocol': 'ssl',
        'metadata.broker.list': '<list>'
      }
    }
  });

(Optional) Set callback for the on ready event

// set callback that will be fired once kafkaClient is ready to subscribe
  consumer.onReady = (info, metadata) => {
    consumer.kafkaClient.assign([{ topic: 'test', partition: 0, offset: 1 }]);
  };

Initialize the instance (make sure to use await as this is an async function)

  await consumer.init();

Add subscribers for errors, logs, and incoming messages

consumer.logs$.subscribe(log => {
    // console.log(log.message);
  });
  consumer.errors$.subscribe(err => {
    console.error(err);
  });
  consumer.messages$.subscribe(msg => {
    console.log(msg);
  });

Start the consumer 🥳

consumer.consume();

please refer Node JS Consumer Example for the full example.