simple-amqplib-rpc

simple amqp rpc interface

Usage no npm install needed!

<script type="module">
  import simpleAmqplibRpc from 'https://cdn.skypack.dev/simple-amqplib-rpc';
</script>

README

Simple amqplib RPC Build Status

Simple RPC interface for amqplib

Installation

This module is installed via npm:

$ npm install simple-amqplib-rpc

Example Usage

Client side:

const amqplib = require('amqplib');
const { request } = require('simple-amqplib-rpc');
const config = {
  url: 'amqp://guest:guest@127.0.0.1:5672//',
  exchange: 'exchange',
  routingKey: 'sum'
};

const connection = await amqplib.connect(config.url);
try {
  const content = [ 1, 2, 3 ];
  const opts = { exchange: config.exchange, timeout: 5000 };
  const resp = await request(connection, config.routingKey, content, opts);
  // resp = 6
} catch (err) {
  switch (err.name) {
    case 'ChannelClosedError': // the connection was closed unexpectedly.
    case 'NoRouteError': // the specified routing key goes nowhere (server needs to bindQueue).
    case 'ResponseError': // the request returned an error.
    case 'TimeoutError': // the request took more than 5 seconds.
  }
}

Server side:

const amqplib = require('amqplib');
const { checkReplyQueue, error, reply } = require('simple-amqplib-rpc');
const config = {
  url: 'amqp://guest:guest@127.0.0.1:5672//',
  exchange: 'exchange',
  queue: 'sum',
  routingKey: 'sum'
};

const connection = await amqplib.connect(config.url);
const consumeChannel = await connection.createChannel();
await consumeChannel.assertQueue(config.queue);
await consumeChannel.bindQueue(config.queue, config.exchange, config.routingKey);
const publishChannel = await connection.createChannel();
consumeChannel.consume(config.queue, async message => {
  if (!await checkReplyQueue(connection, message)) { // the consumer doesn't exist anymore
    return consumeChannel.reject(message, false); // reject and don't requeue
  }
  try {
    const numbers = JSON.parse(message.content);
    const sum = numbers.reduce((acc, n) => acc + n, 0);
    reply(consumeChannel, message, sum, publishChannel);
  } catch (err) {
    // if something went wrong, return an error to the client
    error(consumeChannel, message, err, publishChannel);
  }
});

API

checkReplyQueue(connection, message)boolean

Check if a "reply to" queue exists or not. Will create a separate channel so that it doesn't kill an existing one if the queue check fails.

error(channel, message, error, publisherChannel)

Reply to an rpc request with an error. Will automatically nack and not requeue the message after the error response has been sent.

reply(channel, message, content, publisherChannel)

Reply to an rpc request. Will automatically ack the message after the response has been sent.

request(connection, key, content)*

Make an rpc request. Each request will have its own channel.

checkReplyQueue(connection, message) ⇒ boolean

Check if a "reply to" queue exists or not. Will create a separate channel so that it doesn't kill an existing one if the queue check fails.

Kind: global function
Returns: boolean - whether the reply queue exists or not

Param Type Description
connection amqplibConnection amqplib connection
message object incomming message

error(channel, message, error, publisherChannel)

Reply to an rpc request with an error. Will automatically nack and not requeue the message after the error response has been sent.

Kind: global function

Param Type Description
channel AmqplibChannel the amqplib channel on which the message was received
message object incomming message
error Error an error object. { message, code } will be returned to the client.
publisherChannel AmqplibChannel optional separate channel to publish response on

reply(channel, message, content, publisherChannel)

Reply to an rpc request. Will automatically ack the message after the response has been sent.

Kind: global function

Param Type Description
channel AmqplibChannel on which the message was received
message object incomming message
content * response message
publisherChannel AmqplibChannel optional separate channel to publish response on

request(connection, key, content, opts) ⇒ *

Make an rpc request. Each request will have its own channel.

Kind: global function Returns: * - json decoded response Throws:

  • ChannelClosedError when the channel is closed
  • NoRouteError if a published message has nowhere to go
  • ResponseError if the request returned an error
  • TimeoutError after the specified timeout period
Param Type Description
connection amqplibConnection amqplib connection
key string the routing key for the rpc service
content * must be json serialisable
opts object
opts.exchange string the amqp exchange to publish to (defaults to '')
opts.timeout number optional max time to wait for a response

Note: To regenerate this section from the jsdoc run npm run docs and paste the output above.

License

The BSD License

Copyright (c) 2019, Andrew Harris

All rights reserved.

Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:

  • Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.

  • Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.

  • Neither the name of the Andrew Harris nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.