@uwatch/amqp

Promise-based wrapper for amqplib

Install

$ yarn add @uwatch/amqp

Example

import AMQP from '@uwatch/amqp';

const amqp = new AMQP({
  connectionString: 'amqp://localhost'
});

(async () => {
  try {
    // Connect and create channel
    await amqp.init();

    // Setup needed queues, exchanges...
    await amqp.setup(async () => {
      // Create queue
      await amqp.createQueue('test_queue');
    });

    // Get queue from instance
    const queue = amqp.queue('test_queue');

    // Consume message from queue
    queue.consume(message => {
      console.log(message);
    });
  } catch (e) {
    console.error(e);
  }
})();

API

AMQP class

AMQP constructor. Create an instance to work with amqplib.

Example

const amqp = new AMQP();

.connect()

Connect to RabbitMQ server.

Example

await amqp.connect();

.close(forceClose = true)

Close connection to RabbitMQ server.

Params

  • forceClose {boolean}: If true, force the connection closed and disable reconnection.

Example

await amqp.close();

.createChannel()

Create channel.

Example

await amqp.createChannel();

.setup(func)

Add setup function.

Params

  • func {function}: Setup function

Example

await amqp.setup(async () => {
  console.log('Setup function called!');
});

.clearSetups()

Remove all setups.

Example

amqp.clearSetups();

.init()

Initialize the instance: create the connection, create the channel, then call all setup functions.

Example

await amqp.init();

.createExchange(name, type = 'direct', options = {})

Create exchange.

Params

  • name {string}: Exchange name
  • type {string}: Exchange type
  • options {object}: Exchange options

Example

const exchange = await amqp.createExchange('test_exchange');

.createQueue(name, options = {})

Create queue.

Params

  • name {string}: Queue name
  • options {object}: Queue options

Example

const queue = await amqp.createQueue('test_queue');

.exchange(name)

Get exchange from instance by name.

Params

  • name {string}: Exchange name

Example

const exchange = amqp.exchange('test_exchange');

.deleteExchange(exchange)

Remove exchange from instance by name or object.

Params

  • exchange {(string|object)}: Exchange name or object

Example

await amqp.deleteExchange('test_exchange');

.queue(name)

Get queue from instance by name.

Params

  • name {string}: Queue name

Example

const queue = amqp.queue('test_queue');

.deleteQueue(queue)

Remove queue from instance by name or object.

Params

  • queue {(string|object)}: Queue name or object

Example

await amqp.deleteQueue('test_queue');

.send(queueName, data = null, options = {})

Send data to queue.

Params

  • queueName {string}: Queue name
  • data {any}: Data to send
  • options {object}: Send options
  • options.delay {number}: Send delay in ms

Example

await amqp.send('test_queue', 'Hello World!');
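
The `options.delay` parameter listed above can be used to schedule a message. A minimal sketch, assuming `amqp` is an initialized instance and the target queue already exists; the helper name `sendLater` and its validation are hypothetical, not part of the library:

```javascript
// Hypothetical helper: send `data` to `queueName` after `delayMs` milliseconds.
// `amqp` is expected to be an initialized AMQP instance.
async function sendLater(amqp, queueName, data, delayMs) {
  if (!Number.isFinite(delayMs) || delayMs < 0) {
    throw new RangeError('delayMs must be a non-negative number');
  }
  // options.delay is documented above as the send delay in ms
  return amqp.send(queueName, data, { delay: delayMs });
}
```

For example, `await sendLater(amqp, 'test_queue', 'Hello World!', 5000)` would ask for delivery roughly five seconds later.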

.publish(exchangeName, data = null, routingKey = '', options = {})

Publish data to exchange.

Params

  • exchangeName {string}: Exchange name
  • data {any}: Data to send
  • routingKey {string}: Routing key
  • options {object}: Publish options

Example

await amqp.publish('test_exchange', 'Hello World!');

.reconnect()

Reconnect.

Example

await amqp.reconnect();

Queue class

.bind(exchangeName, pattern = '')

Bind queue to exchange.

Params

  • exchangeName {string}: Exchange name
  • pattern {string}: Bind pattern

Example

await queue.bind('test_exchange');
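
Binding with a pattern is mostly useful with non-direct exchanges. The sketch below combines `createExchange`, `createQueue` and `bind` for a topic exchange. It assumes `amqp` is an initialized instance; the helper name `bindQueueToTopic` and all exchange and queue names are illustrative only:

```javascript
// Sketch: declare a topic exchange and a queue, then bind them with a pattern.
// `amqp` is expected to be an initialized AMQP instance.
async function bindQueueToTopic(amqp, exchangeName, queueName, pattern) {
  // 'topic' is a standard RabbitMQ exchange type (the default here is 'direct')
  await amqp.createExchange(exchangeName, 'topic');
  const queue = await amqp.createQueue(queueName);
  // Only messages whose routing key matches `pattern` are routed to the queue
  await queue.bind(exchangeName, pattern);
  return queue;
}
```

After `await bindQueueToTopic(amqp, 'logs', 'error_logs', '*.error')`, a call such as `await amqp.publish('logs', 'Disk full', 'storage.error')` should be routed to `error_logs`, since `*` matches exactly one word in a RabbitMQ topic pattern.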

.consume(callback, options = {})

Add consumer function.

Params

  • callback {function}: Consumer callback
  • options {object}: Consumer options

Example

queue.consume(message => {
  console.log(message);
});

.send(queueName, data = null, options = {})

Send data to queue.

Params

  • queueName {string}: Queue name
  • data {any}: Data to send
  • options {object}: Send options

Example

await queue.send('test_queue', 'Hello World!');

.event(queueName, name, data = null, options = {})

Send event-type message to queue.

Params

  • queueName {string}: Queue name
  • name {string}: Event name
  • data {any}: Event data
  • options {object}: Send options

Example

await queue.event('test_queue', 'test_event', 'Hello World!');

.on(eventName, callback)

Listen for event-type messages from the queue.

Params

  • eventName {string}: Event name
  • callback {function}: Event callback

Example

queue.on('test_event', (data, message) => {
  console.log(data, message);
});
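
When a queue carries several event types, the listeners can be registered from a single map. This is a sketch only: `registerEventHandlers` is a hypothetical helper built on the `.on()` call shown above, and the error handling reflects one possible design choice, not library behavior:

```javascript
// Hypothetical helper: register one listener per entry of `handlers`,
// where `handlers` maps event names to (data, message) callbacks.
function registerEventHandlers(queue, handlers) {
  for (const [eventName, handler] of Object.entries(handlers)) {
    queue.on(eventName, async (data, message) => {
      try {
        await handler(data, message);
      } catch (err) {
        // Log and continue so one failing handler does not reject unhandled
        console.error(`Handler for "${eventName}" failed:`, err);
      }
    });
  }
}
```

Usage could look like `registerEventHandlers(queue, { user_created: onUserCreated, user_deleted: onUserDeleted })`, where both callbacks are defined by the application.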

.method(name, callback)

Add RPC method.

Params

  • name {string}: RPC method name
  • callback {function}: RPC method callback

Example

queue.method('test_method', (data, message) => {
  console.log(data, message);
});

.deleteMethod(name)

Delete RPC method.

Params

  • name {string}: RPC method name

Example

queue.deleteMethod('test_method');

.call(queueName, method, data = null, options = {})

Call RPC method.

Params

  • queueName {string}: Queue name
  • method {string}: Method name
  • data {any}: Data to send
  • options {object}: Send options
  • options.progress {function}: Progress callback (optional)
  • options.timeout {number}: RPC timeout in ms (defaults to 20000)
  • options.doNotWait {boolean}: If true, the promise resolves to undefined and the call does not wait for the method to complete

Example

const result = await queue.call('test_queue', 'test_method', {
  message: 'Hello World!'
}, { timeout: 60000, progress: progress => console.log(progress) });

Throws TimeoutError when the timeout is reached.

On successful call

{
  success: true,
  result: <data returned from method>
}

On failed call

{
  success: false,
  error: <error message from call>
}
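
Callers often want the plain result or an exception rather than the wrapper object. The sketch below assumes the promise returned by `.call()` resolves to the two shapes shown above; `callOrThrow` is a hypothetical helper, not part of the library:

```javascript
// Hypothetical helper: call an RPC method and unwrap the documented
// { success, result } / { success, error } response shape.
async function callOrThrow(queue, queueName, method, data = null, options = {}) {
  // A TimeoutError thrown by queue.call() propagates to the caller unchanged
  const response = await queue.call(queueName, method, data, options);
  // With options.doNotWait the documented resolution value is undefined
  if (response === undefined) return undefined;
  if (!response.success) {
    throw new Error(`RPC "${method}" failed: ${response.error}`);
  }
  return response.result;
}
```

With this helper, `const result = await callOrThrow(queue, 'test_queue', 'test_method', { message: 'Hello World!' })` either yields the method's return data or throws.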

Exchange class

.publish(data = null, routingKey = '', options = {})

Publish data to exchange.

Params

  • data {any}: Data to publish
  • routingKey {string}: Routing key for RabbitMQ
  • options {object}: Publish options

Example

await exchange.publish('Hello World!');

.event(name, data = null, options = {})

Publish event-type message.

Params

  • name {string}: Event name
  • data {any}: Event data
  • options {object}: Publish options

Example

await exchange.event('test_event', 'Hello World!');

Message class

.reply(data = null)

Reply to RPC call.

Params

  • data {any}: Reply data

Example

await message.reply('Reply from RPC call!');

.progress(data = null)

Send RPC progress.

Params

  • data {any}: Progress data

Example

await message.progress('RPC progress 50%!');
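
Putting `.method()`, `.progress()` and `.reply()` together, an RPC handler that reports progress before replying might look like the sketch below. The method name `resize_images`, the `data.files` payload shape and the helper `registerResizeMethod` are made up for illustration:

```javascript
// Sketch: register an RPC method that reports progress and then replies.
// `queue` is expected to be a Queue instance obtained from amqp.queue().
function registerResizeMethod(queue) {
  queue.method('resize_images', async (data, message) => {
    const files = data.files;
    const done = [];
    for (let i = 0; i < files.length; i++) {
      done.push(`${files[i]}.thumb`); // stand-in for the real work
      // Received by the caller's options.progress callback, if one was passed
      await message.progress({ completed: i + 1, total: files.length });
    }
    // Sent back as the result of the caller's queue.call()
    await message.reply(done);
  });
}
```

A caller could then pass `{ progress: p => console.log(p) }` as the options of `queue.call()` to observe each step.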