@specialblend/kafka-pipe

A functional/fluent utility for kafka, built on top of kafka-node.

Usage no npm install needed!

<script type="module">
  import specialblendKafkaPipe from 'https://cdn.skypack.dev/@specialblend/kafka-pipe';
</script>

README

kafka-pipe

A functional/fluent utility for kafka, built on top of kafka-node.

Install

npm install @specialblend/kafka-pipe

Classes

PipeConsumer

Callable kafka Consumer with pipe and error helper methods

PipeProducer

Callable kafka Producer when instance is called directly, acts like PipeProducer.send

PipeSender

Callable kafka PipeProducer which allows presetting a destination topic and options

PipeTransformer

Consumer/producer mixin that pipes messages from sourceTopic into transformer function, and sends result to destinationTopic, or deadLetterTopic on error

Client

Kafka Client

Constants

createConsumerPipeConsumer

Curried factory of PipeConsumer

createProducerPipeProducer

Curried factory of PipeProducer

createSenderPipeSender

Curried factory of PipeProducer

createTransformerPipeTransformer

Curried factory of PipeTransformer

PipeConsumer

Callable kafka Consumer with pipe and error helper methods

Kind: global class

pipeConsumer.pipe(handler) ⇒ PipeConsumer

Pipe incoming messages to provided handler

Kind: instance method of PipeConsumer
Returns: PipeConsumer - self

Param Type Description
handler function message handler function

pipeConsumer.error(handler) ⇒ PipeConsumer

Alias for this.on('error')

Kind: instance method of PipeConsumer
Returns: PipeConsumer - self

Param Type Description
handler function error handler function

pipeConsumer.__call__(handler) ⇒ PipeConsumer

Make instance callable alias of this.pipe

Kind: instance method of PipeConsumer
Returns: PipeConsumer - self

Param Type Description
handler function message handler function

PipeProducer

Callable kafka Producer when instance is called directly, acts like PipeProducer.send

Kind: global class

new PipeProducer(client, options)

Create

Param Type Description
client Client kafka client
options Object opyions

pipeProducer.send(payload) ⇒ Promise.<*>

Send a payload

Kind: instance method of PipeProducer
Returns: Promise.<*> - result

Param Type Description
payload Array.<String> payload

pipeProducer.__call__(payload) ⇒ Promise.<*>

Make instance callable alias of this.send

Kind: instance method of PipeProducer
Returns: Promise.<*> - result

Param Type Description
payload Array.<String> payload

PipeSender

Callable kafka PipeProducer which allows presetting a destination topic and options

Kind: global class

new PipeSender(client, topic, payloadOptions, producerOptions)

Curry topic and payload options

Param Type Description
client Client kafka client
topic String kafka topic name
payloadOptions Object options to include with outgoing payloads
producerOptions Object producer options

pipeSender.send(messages) ⇒ Promise.<*>

Send messages to preset topic, with preset options

Kind: instance method of PipeSender
Returns: Promise.<*> - returned Promise

Param Type Description
messages Array.<String> an array of messages to send

pipeSender.__call__(payload) ⇒ Promise.<*>

Make instance callable alias of this.send

Kind: instance method of PipeSender
Returns: Promise.<*> - result

Param Type Description
payload Array.<String> payload

PipeTransformer

Consumer/producer mixin that pipes messages from sourceTopic into transformer function, and sends result to destinationTopic, or deadLetterTopic on error

Kind: global class

new PipeTransformer(transformer, client, sourceTopic, destinationTopic, deadLetterTopic)

create a PipeTransformer

Param Type Description
transformer function the transformer function
client Client kafka Client
sourceTopic String name of topic to read from
destinationTopic String name of topic to send to
deadLetterTopic String name of topic to send failed payloads

Client

Kafka Client

Kind: global class

new Client(kafkaHost, options)

Create a kafka Client

Param Type Description
kafkaHost String kafka host
options Object options

createConsumer ⇒ PipeConsumer

Curried factory of PipeConsumer

Kind: global constant

createProducer ⇒ PipeProducer

Curried factory of PipeProducer

Kind: global constant

createSender ⇒ PipeSender

Curried factory of PipeProducer

Kind: global constant

createTransformer ⇒ PipeTransformer

Curried factory of PipeTransformer

Kind: global constant