redis-streams-broker

This package is a broker to redis stream data type, This package provides guaranteed message delivery feature with acknowledgement.

Usage no npm install needed!

<script type="module">
  import redisStreamsBroker from 'https://cdn.skypack.dev/redis-streams-broker';
</script>

README

redis-streams-broker

This package is based on redis stream data type which provides you with following features

  1. Broker to redis stream which can be used as centralized que between microservices. (Using Redis)
  2. Support for injectable redis client (be it ioredis or redis)
  3. Guarantee of message delivery via consumer acknowledgements.
  4. Consumer Group functionality for scalability. (Just like Kafka)
  5. Option to drop a message when its acked, thus keeping memory footprint in check.

Getting Started

  1. Install using npm -i redis-streams-broker
  2. Require in your project. const brokerType = require('redis-streams-broker').StreamChannelBroker;
  3. Run redis on local docker if required. docker run --name streamz -p 6379:6379 -itd --rm redis:latest
  4. Instantiate with a redis client and name for the stream. const broker = new brokerType(redisClient, name);
  5. All done, Start using it!!.

Examples/Code snippets

  1. Please find example code for injectable ioredis client here
  2. Please find example code for injectable custom client here
  3. Please find multi threading examples here
  4. Please find async processing examples here
const Redis = require("ioredis");
const redisConnectionString = "redis://127.0.0.1:6379/";
const qName = "Queue";
const redisClient = new Redis(redisConnectionString);
const brokerType = require('redis-streams-broker').StreamChannelBroker;
const broker = new brokerType(redisClient, qName);

//Used to publish a paylod on stream.
const payloadId = await broker.publish({ a: "Hello", b: "World" }); 

//Creates a consumer group to receive payload
const consumerGroup = await broker.joinConsumerGroup("MyGroup"); 

//Registers a new consumer with Name and Callback for message handlling.
const subscriptionHandle = await consumerGroup.subscribe("Consumer1", newMessageHandler); 

// Handler for arriving Payload
async function newMessageHandler(payloads) {
    for (let index = 0; index < payloads.length; index++) {
        try {
            const element = payloads[index];
            console.log("Payload Id:", element.id); //Payload Id
            console.log("Payload Received from :", element.channel); //Stream name
            console.log("Actual Payload:", element.payload); //Actual Payload
            await element.markAsRead(); //Payload is marked as delivered or Acked also optionaly the message can be dropped.
        }
        catch (exception) {
            console.error(exception);
        }
    }
}

//Provides summary of payloads which have delivered but not acked yet.
const summary = await consumerGroup.pendingSummary();

//Unsubscribes the consumer from the group.
const sucess = consumerGroup.unsubscribe(subscriptionHandle); 

//Amount of memory consumed by this stream in bytes.
const consumedMem = await broker.memoryFootprint();

Built with

  1. Authors :heart for Open Source.
  2. nanoid for auto generating subscribtion handles.
  3. redis-scripto2 for handling lua scripts.

Contributions

  1. New ideas/techniques are welcomed.
  2. Raise a Pull Request.

Current Version:

0.0.13[Beta]

License

This project is contrubution to public domain and completely free for use, view LICENSE.md file for details.

API

Class StreamChannelBroker

  1. constructor(redisClient: any, channelName: string)

    Creates a broker instance.

    redisClient: Injectable redis client which will be used to send commands to redis server.

    channelName: Name of the stream key, if this doesnot exists it will be created on first push or group subscription.

  2. publish(payload: any, maximumApproximateMessages?: number, failOnMaxMessageCount:boolean): Promise<string>;

    Publishes provided message into the stream and returns id generated by server.

    payload: A JS object containing properties which are passed as key values pairs.

    maximumApproximateMessages: Appropiate length of the stream it is equal to ~ MAXLENGTH option in redis. Defaulted to 100, If negative number is passed then it behaves as non capped stream.

    failOnMaxMessageCount: if maximumApproximateMessages is positive number and failOnMaxMessageCount is set to true then it will only publish messages untill it reaches the maximum count post that it will start failling by returning null as message id, default value is false.

  3. joinConsumerGroup(groupName: string, readFrom: string): Promise<ConsumerGroup>

    Creates a consumer group on the given redis stream with information provided, if the group exists does nothing returning a ConsumerGroup object.

    groupName: Name of the group to be created ot joined.

    readFrom: Id of the mesage to start reading from. defaulted to $ to only read new messages recevied on redis, check redis docs for more info.

  4. memoryFootprint(): Promise<number>

    Returns number of bytes consumed by the current stream.

  5. destroy(): Promise<boolean>;

    Starts to unsubscribe all the handles that were subscribed to this instance.

Class ConsumerGroup

  1. subscribe(consumerName: string, handler: (payload: Payload[]) => Promise<boolean>, pollSpan?: number, payloadsToFetch?: number, subscriptionHandle?: string, readPending?: boolean): Promise<string>

    Subscribes to stream to start receiving events when new payload arrives, this internally creates a polling system to check for new messages in stream. returns subscription name.

    consumerName: Name of the consumer who is subscribing via the consumer group object.

    handler: A callback function which will be invoked when new message a.k.a payload(s) arrive. Should be of signature (payload: Payload[]) => Promise<number> should be async & return from this function is number of messages to fetch from redis(expected +ve number; -ve or 0 will unsubscribe from the group stopping all further reads from stream,if NAN then defaults to number provided when subscribing), look at Payload class below for more details.

    pollSpan: Number of millisecond to wait after completion of handler to check for next available message in stream. Defaulted to 1000 milliseconds.

    payloadsToFetch: Maximum number of messages to fetch in one poll to server this is simillar to COUNT command in redis, this is optional and defaulted to 2.

    subscriptionHandle: Name for subscription handler this is what will be returned from the function, this is defaulted to unique shortid.

    readPending: If set to true will read all messages from start of the stream ie: Id = 0 which are in pending list of this consumer and group, once all pending are read it will automatically switch to latest messages from the stream. If set to false it will always look for new message from the stream, this is defaulted to false.

  2. unsubscribe(subscriptionHandle: string): Promise<boolean>

    Unsubscribes from the stream for the given subscriptionhandle, returns true for sucess and false for failure.

    subscriptionHandle: Name of the subscription handle which was returned by subscribe api.

  3. pendingSummary(): Promise<GroupSummary>

    Returns details of the pending items for the given group by exposing GroupSummary object.

Class Payload

  1. channel: string: Name of the stream key in redis.

  2. id: string: Id of the message being received.

  3. payload: any: Actual payload to processs.

  4. markAsRead(deleteMessage?: boolean): Promise<boolean>

    This function helps to ack the payload as read or processed, returns status of the operation via boolean return type true indicating success.

    deleteMessage: if set to true it will ack & delete the message from the stream if set to false will only ack the message defaulted to false.

Class GroupSummary

  1. total: number: This is the total number of messages in pending list.
  2. firstId: string: Id of the first message which is pending.
  3. lastId: string: Id of the last message which is pending.
  4. consumerStats: any: Extra information provided by XPENDING command.