exframe-mq

Messaging framework module

Usage no npm install needed!

<script type="module">
  import exframeMq from 'https://cdn.skypack.dev/exframe-mq';
</script>

README

exframe Messaging Framework Module

A simplified abstraction for RabbitMQ.

Features and Assumptions

  • Persistant and shared connections
  • Assumes JSON payload
  • Default connection settings

Available Topologies

  • Publish/Subscribe

Usage

const mq = require("exframe-mq");
const rabbitmq = mq.create({
  logger,

  url: 'amqp://localhost',
  heartbeat: 30,
  baseTimeout: 500,
  maxAttempts: 5,
  responseTimeout: 60 * 1000
});

Configuration Settings

  • logger - See winston logger for interface of the logger. A default implementation will write to the stdout
  • url - default: 'amqp://localhost' The url for the rabbitmq server
  • heartbeat - default: 30 Time in seconds for a heartbeat between the client code and the rabbitmq server. Basically, keeps the connection alive with inactivity particularly if the connection will be closed by external mechanisms like Amazon's ELB.
  • baseTimeout - default: 500 Time in milliseconds for waiting between connection failures
  • maxAttempts - default: 190 Max number of attempts to try connecting to the rabbitmq server.
  • responseTimeout - default: 60000 Time in milliseconds for re-establishing a connection with the rabbitmq server after inactivity.

Environment Variable Configuration Settings

  • MQ_MAX_ATTEMPTS - default: 190 Max number of attempts to try and connect to the rabbitmq server.
  • MQ_EXIT_ON_FAILURE - default: false Whether or not the application should fail on FIRST attempt at message queue connection.

Methods

getConnection

Gets the connection used to communicate to the rabbitmq server with.

const connection = await mq.getConnection();

client

Creates a client to the configured rabbitmq server. For subscribing to and publishing messages.

const client = mq.client(options);

Arguments:

  • options - optional object
    • eventing - optional object The configuration rpc publishing events to the eventing microservice.
      • exchangeName - string The exchange to publish events to the eventing microservice
      • routingKey - string The routing key to route events to the eventing microservice

Returns:

  • Client

Client

subscribe

Subscribes to a given exchange and routing key pattern. Will ensure the existence of the exchange and the queue matching on the routing key pattern with the given options.

The subscribe method will take a series of middleware for error handling and message processing. Middleware are expected to return promises that resolve with or without values.

client.subscribe(exchangeName, routingKey, async (context, message) => {
  // do something with the message
});

for recieving some extra params in callback use default params default param can be null or empty object or we can set any value

client.subscribe(exchangeName, routingKey, async (context, message, extraParams = {}) => {
  // do something with the message
});

Arguments:

  • exchangeName - string The exchange to publish the message to
  • routingKey - string The routing key for the exchange, governs which subscription to send the message to
  • message - object The message passed from the server. If a context field was part of the message, the contents of the object will have been merged into the context object and the field will be removed from the message.
  • ...middleware - One or more message or error handling functions. See middleware functions
  • options - optional, object
    • exchangeType - string, default: 'topic' The type of exchange the message will be published to see rabbitmq documentation. Valid values: 'topic', 'fanout', 'direct'
    • exchangeOptions - object options for the creation of the exchange see amqplib documentation
      • Default Options
        • durable - boolean, default: true
    • publishOptions - object options for the publishing of the message see amqplib documentation
      • Default Options
        • mandatory - boolean, default: true
    • exclusiveQueue - boolean, default: false If true, the created queue will be exclusive to that connection / channel. This is generally used for short lived temporary queues that need to handle an ephemeral set of messages. Closing the subscription will close this queue.

publish

Publishes a message to a given exchange and routing key. If the exchange does not exist, will create the exchange with the given options.

await client.publish(context, exchangeName, routingKey, message, options)

Arguments:

  • context - See context object
    • expects log
  • exchangeName - string The exchange to publish the message to
  • routingKey - string The routing key for the exchange, governs which subscription to send the message to
  • message - object The message passed from the server. If a context field was part of the message, the contents of the object will have been merged into the context object and the field will be removed from the message.
  • options - optional, object
    • exchangeType - string, default: 'topic' The type of exchange the message will be published to see rabbitmq documentation. Valid values: 'topic', 'fanout', 'direct'
    • exchangeOptions - object options for the creation of the exchange see amqplib documentation
      • Default Options
        • durable - boolean, default: true
    • publishOptions - object options for the publishing of the message see amqplib documentation
      • Default Options
        • mandatory - boolean, default: true

rpc

RPCs to a given exchange and routing key. If the exchange does not exist, will create the exchange with the given options. Will resolve or reject with the response from the subscribed client. If the subscribing service takes too long, rpc will reject with a timeout error.

const response = await client.rpc(context, exchangeName, routingKey, message, options)

Arguments:

  • context - See context object
    • expects log
  • exchangeName - string The exchange to publish the message to
  • routingKey - string The routing key for the exchange, governs which subscription to send the message to
  • message - object The message passed from the server. If a context field was part of the message, the contents of the object will have been merged into the context object and the field will be removed from the message.
  • options - optional, object
    • exchangeType - string, default: 'topic' The type of exchange the message will be published to see rabbitmq documentation. Valid values: 'topic', 'fanout', 'direct'
    • exchangeOptions - object options for the creation of the exchange see amqplib documentation
      • Default Options
        • durable - boolean, default: true
    • publishOptions - object options for the publishing of the message see amqplib documentation
      • Default Options
        • mandatory - boolean, default: true
    • timeout - integer, default: 60000 Time to wait for a response from the server.

Returns:

  • Promise<Response> - object The response from server

event

RPC an event to the eventing microservice, which adds the event to an event log and then publishes the event for consumption.

const response = await client.event(context, documentId, routingKey, message, options)

Arguments:

  • context - See context object
    • expects log
  • documentId - string The document id associated with the event
  • routingKey - string The routing key subscribers for the event to be routed to
  • message - object The event data to pass to the consumer of the event.
  • options - optional, object
    • exchangeType - string, default: 'topic' The type of exchange the message will be published to see rabbitmq documentation. Valid values: 'topic', 'fanout', 'direct'
    • exchangeOptions - object options for the creation of the exchange see amqplib documentation
      • Default Options
        • durable - boolean, default: true
    • publishOptions - object options for the publishing of the message see amqplib documentation
      • Default Options
        • mandatory - boolean, default: true
    • timeout - integer, default: 60000 Time to wait for a response from the server.

Returns:

  • Promise<Response> - object The response from server

use

Sets up global middleware for subscriptions. Middleware are expected to return promises that resolve with or without values.

client.use(async (context, message) => {
  context.user = await authorizeUser(message.token);
});
client.use(async (error, context, message) => {
  context.log.error('error occurred', { errorData: error });
});

Arguments:

Context Objects

Context objects are used to pass information from middleware to middleware. Just add additional fields to the context object as necessary. The context object is pre-populated upon the handling of a message or passed in by the calling code.

Subscription Generated Fields

  • requestId - string This is the request id associated with message. If a request id is not passed with the message context a request id will be generated, e.g., A1B2C3E4
  • log - object A logger that will take the interface of the winston logger. If a logger was not passed a default logger will be used that outputs to the stdout.

Middleware Functions

There are two types of middleware used by subscriptions: an error handling function that takes 3 arguments and a message processing function that takes 2.

To finish processing and the current middleware function is not the last in the chain of middleware. Set a done flag to the context object context.done = true and resolve the returning promise.

If the function resolves with an object, the object is returned to the calling client code if available.

Message Handling

  • context - See context object
  • message - object The message passed to the subscribing middleware
  • Returns Promise<Any>

Error Handling

  • error - The error that has occurred either on the subscription or during message processing
  • context - See context object
  • message - object The message passed to the subscribing middleware (If available, may be null/undefined)
  • Returns Promise<Any>