@sinet/lapin

RPC library for AMQP protocol

Usage — no npm install needed!

<script type="module">
  import sinetLapin from 'https://cdn.skypack.dev/@sinet/lapin';
</script>

README

Lapin wrapper for RabbitMQ

Currently this project is using Rabbus and Wascally. This project aims to support several producer / consumer patterns. The following is a list of the planned patterns; the checked ones are currently implemented:

  • Send / Receive
  • Publish / Subscribe
  • Request / Response

The JSend specification is required to determine if an error has occurred in a response.

Installation and Usage

As lapin uses wascally you need to install it along with lapin:

npm install wascally
npm install lapin

Require lapin and wascally:

var rabbit = require( 'wascally' );
var lapin  = require( 'lapin' )( rabbit );

// or

var options = {
    'logger' : logger,
    'rabbit' : wascally
};

var lapin = require( 'lapin' )( options )

The following are simple usage examples:

Send / Receive

Sender Options

exchange, messageType, routingKey, autoDelete

Please refer to Rabbus options' info

Sender

options = 'v1.logs.log';
// or
options = {
    'messageType' : 'v1.logs.log',
    'exchange'    : 'logs'
}
lapin.send( options , message, function ( error, response ) {

    // handling the response is optional
    if ( !error ) {
        console.log( response );
    }

} );

Or use the promise style send

lapin.sendPromise( 'v1.logs.log', message )
    .then( function ( response ) {
        // Handle the response; return a value to continue the promise chain
        console.log( response );

    } )
    .catch( function ( error ) {
        // Handle error
    } );

Receiver Options

queue, exchange, messageType, autoDelete, limit, noBatch

Receiver

options = 'v1.logs.log';
// or
options = {
    'messageType' : 'v1.logs.log',
    'exchange'    : 'logs'
}

lapin.receive( options, function ( message, done ) {

    someDatabaseQuery( message, function ( err, body ) {

        if ( err ) {
            throw err;
        }

        done();

    } );

} );

Publish / Subscribe

Publisher Options

exchange, messageType, autoDelete

Publisher

options = 'v1.users.login';
// or
options = {
    'messageType' : 'v1.users.login',
    'exchange'    : 'users' // recommended not to prefix or suffix `exchange` lapin will do it for us
}
lapin.publish( options, message, function ( error, response ) {

        // handling the response is optional
    if ( !error ) {
        console.log( response );
    }

} );

Subscriber Options

queue, exchange, messageType, autoDelete, limit, noBatch

Subscriber

options = 'v1.users.login';
// or
options = {
    'messageType' : 'v1.users.login',
    'queue'       : 'users', // recommended not to put `queue` suffix or prefix, lapin will do it for you
    'exchange'    : 'users'
}
lapin.subscribe( options, function ( message, done ) {

    someDatabaseQuery( message, function ( err, body ) {

        if ( err ) {
            throw err;
        }

        done();

    } );

} );

Request / Response

Request Options

exchange, messageType, autoDelete, routingKey, forceAck

Requester

options = 'v1.users.findAll'
// or
options = {
    'messageType' : 'v1.users.findAll',
    'exchange'    : 'users'
}
lapin.request( options, message, function ( error, data ) {

    if ( error ) {
        return reply( error ).code( 500 );
    }

    return reply( data.data );
} );

Or use the promise style request

lapin.requestPromise( 'v1.users.findAll', message )
    .then( function ( data ) {
        // Handle data
        return reply( data.data );

    } )
    .catch( function ( error ) {
        // Handle error
    } );

Responder Options

exchange, queue, autoDelete, routingKey, limit, noBatch

Responder

options = 'v1.users.findAll';
// or
options = {
    'messageType' : 'v1.users.findAll',
    'limit'       : 1
}
lapin.respond( options, function ( message, respond ) {

    if ( message.invalid ) {
            return respond.fail( 'Invalid data' );
    }

    someDatabaseQuery().then( function ( result ) {

        // JSend success with data
        respond.success( result );

    } ).catch( function handleError ( error ) {

        // JSend error
        respond.error( 'Failed query', error, 500 );
        // or -- code is optional
        respond.error( 'Failed query', error );
        // or -- data is optional
        respond.error( 'Failed query' );

    } );

} );

Please refer to JSEND for standard reply attributes

Response with Validation using Joi

// Responder
lapin.respond( {
    'messageType' : 'v1.users.findAll',
    'validate'    : Joi.object().keys( {
        'username'     : Joi.string().alphanum().min( 3 ).max( 30 ).required(),
        'password'     : Joi.string().regex( /[a-zA-Z0-9]{3,30}/ ),
        'access_token' : [ Joi.string(), Joi.number() ],
        'birthyear'    : Joi.number().integer().min( 1900 ).max( 2013 ),
        'email'        : Joi.string().email()
    } ).with( 'username', 'birthyear' ).without( 'password', 'access_token' ),

    'validateOptions' : {} // <optional> see https://github.com/hapijs/joi for validation options

} , function ( message, respond ) {
    // consumer process
} );

If validation fails, lapin will bypass the respond callback and respond with a fail status, as shown below:

    respond( {
        'status' : 'fail',
        'data'   : <Validation error message>
    } );

Please refer to Joi Validation for validation examples, structure and validation options

To Consider

Make sure to use the same messageType, routingKey and exchange options. Whenever a String option is supplied instead of the Object option, lapin will automatically create the following:

  • exchange and messageType ( Producer )
  • exchange, messageType and queue ( Consumer )

Contributing

All pull requests must follow coding conventions and standards.

Additional Information

RPC over RabbitMQ

In general, doing RPC over RabbitMQ is easy. A client sends a request message and a server replies with a response message. In order to receive a response the client needs to send a 'callback' queue address with the request.

RabbitMQ RPC

  • When the client starts up, it creates an exclusive callback queue.
  • For an RPC request, the Client sends a message with two required properties: reply_to, which is set to the callback queue and correlation_id, which is set to a unique value for every request.
  • The request is sent to an rpc_queue queue.
  • The RPC worker (aka: server) is waiting for requests on that queue. When a message appears, it does the job and sends a message with the result back to the Client, using the queue from the reply_to field.
  • The client waits for data on the callback queue. When a message appears, it checks the correlation_id property. If it matches the value from the request it returns the response to the application.

Standards/Conventions

  • messageType: <version>.<resource>.<action>

  • exchange: <pattern>.<resource>-exchange

  • queue: <pattern>.-queue

Where

Patterns:

  • req-res

  • pub-sub

  • send-rec

Version:

  • v1

  • v2

  • and so on.