easy-rmq

RabbitMQ (amqp) for no-brainer.

Easy RabbitMQ

Want your software to work through a series of jobs, but don't want to set up an extensive and complicated queueing system? Easy RabbitMQ provides a simple, easy way to use RabbitMQ.

Project site: [https://github.com/huferry/easy-rmq]

Quick Start

First, install the package:

npm i -S easy-rmq

Connecting to the queue is as simple as:

const conn = await require('easy-rmq').connect({
    user: 'guest',
    password: 'secret',
    host: '172.134.1.25'
})

const queue = await conn.queue('my-queue')

and publish your message just like this:

queue.publish({
    from: 'Somebody',
    msg: 'Hello, World!'
})

or, if you're on the consuming side:

queue.subscribe(payload => {
    console.log(payload)
    // this will print:
    // { "from": "Somebody", "msg": "Hello, World!" }
})

But if you're outside an async function:

require('easy-rmq').connect({
    user: 'guest',
    password: 'secret',
    host: '172.134.1.25'
})
.then(conn => conn.queue('my-queue'))
.then(queue => {
    // publish message
    queue.publish({msg: 'Hello, World!'})

    // consume message
    queue.subscribe(payload => {
        console.log(payload)
    })
})

Documentation

  1. Connecting to Server
  2. Access to The Queue
  3. Publish A Message
     3.1. Simple Message
     3.2. Propagate to Multiple Messages
  4. Subscribe for Messages
     4.1. The handler Function
     4.2. Error Handling

1. Connecting to Server

The module exports a single async function, connect, which takes an object containing the properties needed to connect to the AMQP server.

require('easy-rmq').connect({
    user: 'guest',      // mandatory
    password: 'guest',  // mandatory
    host: '127.0.0.1',  // mandatory
    port: '5672'        // optional, 5672 by default
})

Note that this function is asynchronous: it returns a Promise that resolves to a connection object with a single function, queue, for accessing a queue.
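
As one way to keep credentials out of source code, the options object could be assembled from environment variables before calling connect. This is a minimal sketch: the variable names RMQ_USER, RMQ_PASSWORD, RMQ_HOST and RMQ_PORT and the helper connectionOptionsFromEnv are hypothetical and not part of easy-rmq. Only the user, password, host and port properties come from the description above.

```javascript
// Hypothetical helper: builds the options object expected by connect()
// from environment variables. The variable names are assumptions.
function connectionOptionsFromEnv(env = process.env) {
    const options = {
        user: env.RMQ_USER,
        password: env.RMQ_PASSWORD,
        host: env.RMQ_HOST
    }

    // user, password and host are mandatory according to the section above
    for (const [key, value] of Object.entries(options)) {
        if (!value) {
            throw new Error(`missing connection property: ${key}`)
        }
    }

    // port is optional; leave it out so the library applies its default (5672)
    if (env.RMQ_PORT) {
        options.port = env.RMQ_PORT
    }
    return options
}
```

The result would then be passed along as require('easy-rmq').connect(connectionOptionsFromEnv()).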

2. Access to The Queue

A queue is accessed through the queue(queueName) function.

require('easy-rmq')
.connect({ /* fill in with your connection properties */ })
.then(conn => conn.queue('my-queue'))

Note that you don't have to create the queue on the server. We are basically doing a code-first queueing process. In the example above, the queue 'my-queue' will be created automatically on the server.

The queue function returns a Promise that resolves to an object containing functions to publish a message to the queue and to subscribe to messages from it.
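
Inside an async function, connecting and opening a queue can be wrapped in a small helper. This is a sketch rather than part of easy-rmq: openQueue is a hypothetical name, and the module is passed in as a parameter (in real use, require('easy-rmq')) so the helper can also be exercised with a stand-in object.

```javascript
// Hypothetical helper: connects and resolves to the named queue object.
// `rmq` is expected to be the module returned by require('easy-rmq').
async function openQueue(rmq, options, queueName) {
    const conn = await rmq.connect(options)
    // per the note above, the queue is created on the server if needed
    return conn.queue(queueName)
}
```

In an async function this would read: const queue = await openQueue(require('easy-rmq'), options, 'my-queue').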

3. Publish A Message

3.1. Simple Message

The publish function takes the payload of the queue message, which can be any JSON serializable object. The payload is carried in the queue message and delivered to the subscribing function.

require('easy-rmq')
.connect({ /* fill in with your connection properties */ })
.then(conn => conn.queue('my-queue'))
.then(queue => {
    // payload can be any JSON serializable object
    const payload = {
        time: new Date(),
        text: 'please send the goods in 2 weeks',
        importance: 5
    }
    queue.publish(payload)
})
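
Since the payload has to be JSON serializable, values without a JSON representation change shape on the way through the queue. The sketch below assumes the library serializes payloads with JSON.stringify (an assumption, not confirmed by this README) and shows what a subscriber would then receive. roundTrip is a hypothetical helper used only for illustration.

```javascript
// Hypothetical helper: simulates what a JSON-based transport would deliver.
// Assumes serialization with JSON.stringify / JSON.parse.
function roundTrip(payload) {
    return JSON.parse(JSON.stringify(payload))
}

const sent = {
    time: new Date('2021-03-01T12:00:00.000Z'),
    text: 'please send the goods in 2 weeks',
    importance: 5,
    callback: () => {}  // functions have no JSON representation
}

const received = roundTrip(sent)

console.log(typeof received.time)   // string: the Date arrives as an ISO 8601 string
console.log(received.time)          // 2021-03-01T12:00:00.000Z
console.log('callback' in received) // false: the function property is dropped
```

A subscriber that needs a Date object can rebuild it with new Date(payload.time).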

3.2. Propagate to Multiple Messages

Normally the payload argument corresponds to a single object. If the payload is an array, it will be split into multiple messages: every object in the array becomes a payload of its own, resulting in multiple queue entries.

Example:

require('easy-rmq')
.connect({ /* fill in with your connection properties */ })
.then(conn => conn.queue('my-queue'))
.then(queue => {
    // a payload given as an array will be propagated
    // as individual messages
    queue.publish([
        { id: 'xxx' },
        { id: 'yyy' }
    ])

    // first message payload (added by program):
    // [ { "id": "xxx" }, { "id": "yyy" }]

    // second message payload (automatically added):
    // { "id": "xxx" }

    // third message payload (automatically added):
    // { "id": "yyy" }
})
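
The comments in the example suggest that the original array may reach subscribers as a message of its own, next to the individual entries. If that is the case, a handler written for single objects might want to skip array payloads. The wrapper below is a sketch under that assumption. ignoreArrays is a hypothetical helper, not part of easy-rmq.

```javascript
// Hypothetical wrapper: calls `handler` only for non-array payloads.
function ignoreArrays(handler) {
    return payload => {
        if (Array.isArray(payload)) {
            return // the individual entries are expected as separate messages
        }
        return handler(payload)
    }
}
```

It could be used as queue.subscribe(ignoreArrays(payload => console.log(payload.id))).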

4. Subscribe for Messages

The last part of this module is subscribing to the queue messages (and then doing some processing). From the queue object, use the subscribe function. It takes two arguments: the handler function and the onError function.

require('easy-rmq')
.connect({ /* fill in with your connection properties */ })
.then(conn => conn.queue('my-queue'))
.then(queue => {
    queue.subscribe(handler, onError)
})

4.1. The handler Function

The handler function is given the payload object as argument (see example).

Example:

require('easy-rmq')
.connect({ /* fill in with your connection properties */ })
.then(conn => conn.queue('my-queue'))
.then(queue => {
    queue.subscribe(
        payload => handler(payload), // the handler function
        (error, requeue) => handleError(error, requeue)) // the onError function
})

function handler(payload) {
    // Do any processing needed here!
    // Any error thrown within this processing,
    // will be forwarded to the error handling function.
    console.log(payload)
}

function handleError(error, requeue) {
    // error is an instance of the JavaScript Error class.
    // Depending on the type/kind of error you can
    // decide to requeue the message.
    // for example:
    if (error.message === 'server is still starting') {
        const timeoutInMs = 5000
        // timeout is optional, it is 1000 milliseconds by default.
        requeue(timeoutInMs)
    }
}

4.2. Error Handling

Any error thrown during processing triggers the error handler (see the example in the previous section). The error handling function, which is provided by the user, is given two arguments: the error and a requeue function. If the user decides to retry the processing, they can invoke the requeue function, optionally passing a delay in milliseconds. If no delay is provided, it defaults to 1 second (1000 milliseconds).
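
The requeue decision can be captured in a small factory. This is a sketch, not part of easy-rmq: makeErrorHandler, isTransient, delayMs and log are hypothetical names. The only library behavior relied on is that the error handler receives the error and a requeue function that accepts an optional delay in milliseconds.

```javascript
// Hypothetical factory: builds an onError function for queue.subscribe().
// isTransient decides whether an error is worth retrying.
function makeErrorHandler({ isTransient, delayMs, log = console.error }) {
    return (error, requeue) => {
        if (isTransient(error)) {
            // retry later; without a delay the library default (1 second) applies
            if (delayMs === undefined) {
                requeue()
            } else {
                requeue(delayMs)
            }
            return
        }
        // permanent failure: report it and do not requeue the message
        log(error)
    }
}

const onError = makeErrorHandler({
    isTransient: error => error.message === 'server is still starting',
    delayMs: 5000
})
```

The resulting function would be passed as the second argument: queue.subscribe(handler, onError).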