kafka-pipe
A functional/fluent utility for kafka, built on top of kafka-node.
Install
npm install @specialblend/kafka-pipe
Classes
- PipeConsumer
Callable kafka Consumer with pipe and error helper methods
- PipeProducer
Callable kafka Producer. When the instance is called directly, it acts like PipeProducer.send
- PipeSender
Callable kafka PipeProducer which allows presetting a destination topic and options
- PipeTransformer
Consumer/producer mixin that pipes messages from sourceTopic into the transformer function, and sends the result to destinationTopic, or to deadLetterTopic on error
- Client
Kafka Client
Constants
- createConsumer ⇒ PipeConsumer
Curried factory of PipeConsumer
- createProducer ⇒ PipeProducer
Curried factory of PipeProducer
- createSender ⇒ PipeSender
Curried factory of PipeSender
- createTransformer ⇒ PipeTransformer
Curried factory of PipeTransformer
PipeConsumer
Callable kafka Consumer with pipe and error helper methods
Kind: global class
pipeConsumer.pipe(handler) ⇒ PipeConsumer
Pipe incoming messages to the provided handler
Kind: instance method of PipeConsumer
Returns: PipeConsumer - self
Param | Type | Description |
---|---|---|
handler | function | message handler function |
pipeConsumer.error(handler) ⇒ PipeConsumer
Alias for this.on('error')
Kind: instance method of PipeConsumer
Returns: PipeConsumer - self
Param | Type | Description |
---|---|---|
handler | function | error handler function |
pipeConsumer.__call__(handler) ⇒ PipeConsumer
Makes the instance callable, as an alias of this.pipe
Kind: instance method of PipeConsumer
Returns: PipeConsumer - self
Param | Type | Description |
---|---|---|
handler | function | message handler function |
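The chaining behaviour described above (pipe, error, and the callable alias all return the instance) can be illustrated with a minimal, self-contained sketch. This is not the library's implementation: `createCallableConsumer` and its `emit` helper are hypothetical stand-ins built on Node's `EventEmitter`, and no kafka connection is involved.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for a PipeConsumer: a plain function that carries
// pipe/error helpers. NOT the library's code, only the calling pattern.
function createCallableConsumer() {
    const emitter = new EventEmitter();
    // Calling the instance directly behaves like pipe(handler).
    const consumer = handler => consumer.pipe(handler);
    consumer.pipe = handler => {
        emitter.on('message', handler);
        return consumer; // return self so calls can be chained
    };
    consumer.error = handler => {
        emitter.on('error', handler);
        return consumer;
    };
    // Assumed helper that stands in for events arriving from kafka.
    consumer.emit = (event, value) => emitter.emit(event, value);
    return consumer;
}

const received = [];
const errors = [];
const consumer = createCallableConsumer();

// Call the instance directly, then chain the error helper.
consumer(message => received.push(message.value))
    .error(err => errors.push(err.message));

consumer.emit('message', { topic: 'example.topic', value: 'hello' });
consumer.emit('error', new Error('broker unavailable'));

console.log(received); // [ 'hello' ]
console.log(errors);   // [ 'broker unavailable' ]
```

Returning the instance from every helper is what allows handlers to be registered in one fluent expression.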
PipeProducer
Callable kafka Producer. When the instance is called directly, it acts like PipeProducer.send
Kind: global class
- PipeProducer
- new PipeProducer(client, options)
- .send(payload) ⇒ Promise.<*>
- .__call__(payload) ⇒ Promise.<*>
new PipeProducer(client, options)
Create a PipeProducer
Param | Type | Description |
---|---|---|
client | Client | kafka client |
options | Object | options |
pipeProducer.send(payload) ⇒ Promise.<*>
Send a payload
Kind: instance method of PipeProducer
Returns: Promise.<*> - result
Param | Type | Description |
---|---|---|
payload | Array.<String> | payload |
pipeProducer.__call__(payload) ⇒ Promise.<*>
Makes the instance callable, as an alias of this.send
Kind: instance method of PipeProducer
Returns: Promise.<*> - result
Param | Type | Description |
---|---|---|
payload | Array.<String> | payload |
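As a rough illustration of a callable, Promise-returning send, the sketch below wraps a callback-style producer. Everything here is an assumption made for illustration: `fakeProducer` is a hypothetical stub shaped like kafka-node's `send(payloads, callback)`, the `{ topic, messages }` payload shape is modelled on kafka-node rather than taken from this README, and `createCallableProducer` is not part of this package.

```javascript
// Hypothetical stub shaped like a kafka-node producer: results are
// reported through a node-style callback. No broker is contacted.
const fakeProducer = {
    send(payloads, callback) {
        const total = payloads.reduce((n, p) => n + p.messages.length, 0);
        callback(null, { sent: total });
    },
};

// Wrap the callback API in a Promise and make the result callable, so that
// producer(payloads) behaves like producer.send(payloads).
function createCallableProducer(producer) {
    const send = payloads =>
        new Promise((resolve, reject) => {
            producer.send(payloads, (err, result) => (err ? reject(err) : resolve(result)));
        });
    const callable = payloads => send(payloads);
    callable.send = send;
    return callable;
}

const producer = createCallableProducer(fakeProducer);

// Assumed payload shape, modelled on kafka-node produce requests.
const pending = producer([{ topic: 'example.topic', messages: ['a', 'b'] }]);
pending.then(result => console.log(result)); // { sent: 2 }
```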
PipeSender
Callable kafka PipeProducer which allows presetting a destination topic and options
Kind: global class
- PipeSender
- new PipeSender(client, topic, payloadOptions, producerOptions)
- .send(messages) ⇒ Promise.<*>
- .__call__(payload) ⇒ Promise.<*>
new PipeSender(client, topic, payloadOptions, producerOptions)
Curry topic and payload options
Param | Type | Description |
---|---|---|
client | Client | kafka client |
topic | String | kafka topic name |
payloadOptions | Object | options to include with outgoing payloads |
producerOptions | Object | producer options |
pipeSender.send(messages) ⇒ Promise.<*>
Send messages to the preset topic, with the preset options
Kind: instance method of PipeSender
Returns: Promise.<*> - returned Promise
Param | Type | Description |
---|---|---|
messages | Array.<String> | an array of messages to send |
pipeSender.__call__(payload) ⇒ Promise.<*>
Makes the instance callable, as an alias of this.send
Kind: instance method of PipeSender
Returns: Promise.<*> - result
Param | Type | Description |
---|---|---|
payload | Array.<String> | payload |
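Presetting a topic and payload options amounts to closing over them and merging them into each outgoing payload. The sketch below shows that idea only. `createPresetSender` and `recordingSend` are hypothetical names, and the way the real PipeSender merges `payloadOptions` is an assumption.

```javascript
// Hypothetical sketch: close over topic and payload options so callers
// only supply the messages. Not the library's implementation.
function createPresetSender(send, topic, payloadOptions = {}) {
    const sender = messages => send([{ ...payloadOptions, topic, messages }]);
    sender.send = sender; // sender(messages) and sender.send(messages) are equivalent
    return sender;
}

// Stand-in for a producer's send: records payloads instead of contacting kafka.
const sentPayloads = [];
const recordingSend = payloads => {
    sentPayloads.push(...payloads);
    return Promise.resolve(payloads.length);
};

const sendToAudit = createPresetSender(recordingSend, 'audit.events', { partition: 0 });
sendToAudit(['user-created', 'user-deleted']);

// Each recorded payload carries the preset topic and options.
console.log(sentPayloads);
```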
PipeTransformer
Consumer/producer mixin that pipes messages from sourceTopic into the transformer function, and sends the result to destinationTopic, or to deadLetterTopic on error
new PipeTransformer(transformer, client, sourceTopic, destinationTopic, deadLetterTopic)
Create a PipeTransformer
Param | Type | Description |
---|---|---|
transformer | function | the transformer function |
client | Client | kafka Client |
sourceTopic | String | name of topic to read from |
destinationTopic | String | name of topic to send to |
deadLetterTopic | String | name of topic to send failed payloads |
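The routing described above (transform each message, forward the result, divert failures to a dead letter topic) could look roughly like the following. This is a sketch under stated assumptions, not the library's code: `createTransformStep` and the in-memory `send` are hypothetical, and forwarding the original message (rather than the error) to the dead letter topic is a guess.

```javascript
// Hypothetical sketch of the transform-and-route step.
function createTransformStep(transformer, send, destinationTopic, deadLetterTopic) {
    return async message => {
        try {
            const result = await transformer(message);
            // Not awaited inside the try: a failure of send itself
            // propagates to the caller and is not rerouted.
            return send(destinationTopic, result);
        } catch (err) {
            // Assumption: the original message is what gets dead-lettered.
            return send(deadLetterTopic, message);
        }
    };
}

// In-memory stand-in for a producer, so the sketch runs without kafka.
const outbox = [];
const send = async (topic, value) => {
    outbox.push({ topic, value });
};

const step = createTransformStep(
    message => JSON.parse(message).name.toUpperCase(),
    send,
    'names.upper',
    'names.failed'
);

const done = (async () => {
    await step('{"name":"ada"}'); // transforms cleanly
    await step('not json');       // JSON.parse throws, so it is dead-lettered
    return outbox;
})();

done.then(result => console.log(result));
```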
Client
Kafka Client
new Client(kafkaHost, options)
Create a kafka Client
Param | Type | Description |
---|---|---|
kafkaHost | String | kafka host |
options | Object | options |
createConsumer ⇒ PipeConsumer
Curried factory of PipeConsumer
createProducer ⇒ PipeProducer
Curried factory of PipeProducer
createSender ⇒ PipeSender
Curried factory of PipeSender
createTransformer ⇒ PipeTransformer
Curried factory of PipeTransformer
Kind: global constant
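For readers unfamiliar with the term, a curried factory accepts its arguments either all at once or a few at a time. The sketch below demonstrates the general technique with a hand-written `curry` helper. The argument order and the `describeSender` factory are hypothetical, since this README does not document the factories' parameters.

```javascript
// Minimal curry helper: keep collecting arguments until the wrapped
// function's declared arity is reached, then call it.
const curry = fn => {
    const collect = (...args) =>
        args.length >= fn.length ? fn(...args) : (...more) => collect(...args, ...more);
    return collect;
};

// Hypothetical factory with an assumed argument order. It returns a plain
// description object instead of a real PipeSender.
const describeSender = curry((client, topic, payloadOptions) => ({
    client,
    topic,
    payloadOptions,
}));

// Partially apply the client once, then reuse it for several topics.
const forLocalClient = describeSender('localhost:9092');
const auditSender = forLocalClient('audit.events')({ partition: 0 });

// Supplying every argument at once gives an equivalent result.
const sameSender = describeSender('localhost:9092', 'audit.events', { partition: 0 });

console.log(auditSender);
```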