@hapiness/rabbitmq

Hapiness module for rabbitmq


Hapiness

RabbitMQ Module

RabbitMQ module for the Hapiness framework.

RabbitMQ is a server that implements the AMQP 0-9-1 protocol.

Getting started with AMQP concepts

The module uses amqp.node to connect to RabbitMQ and is built around the channel API it provides.

How this module works

Prototyping your AMQP usage

With this module you can configure your AMQP stack easily, in the way you prefer.

We provide three decorators, @Exchange, @Queue and @Message, to help you get started quickly.

Configuration

Key          Type     Infos
connection   object   Connection object

Connection object

Key                      Type     Default     Infos
uri                      string   undefined   Other values are ignored if set
host                     string   localhost   -
port                     number   5672        -
login                    string   undefined   -
password                 string   undefined   -
params                   object   undefined   Parameters to include in the query string, like: { heartBeat: 30 }
retry.delay              number   5000        Delay in ms to wait after trying to reconnect
retry.maximum_attempts   number   -1          Maximum reconnection attempts, -1 for Infinity
default_prefetch         number   10          Default prefetch used when creating new channels
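
To make the precedence between these keys concrete, here is a minimal sketch of how such a connection object could be turned into an AMQP URI. ConnectionConfig and buildAmqpUri are hypothetical names used for illustration only. This is not the module's internal code, and the exact URI the module builds may differ.

```typescript
// Hypothetical shape mirroring the connection keys documented above.
interface ConnectionConfig {
    uri?: string;
    host?: string;
    port?: number;
    login?: string;
    password?: string;
    params?: { [key: string]: string | number | boolean };
}

// Illustrative helper, NOT part of @hapiness/rabbitmq.
function buildAmqpUri(config: ConnectionConfig): string {
    // A full uri wins: every other value is ignored when it is set.
    if (config.uri) {
        return config.uri;
    }
    // Fall back to the documented defaults.
    const host = config.host || 'localhost';
    const port = config.port || 5672;
    const credentials = config.login
        ? `${encodeURIComponent(config.login)}:${encodeURIComponent(config.password || '')}@`
        : '';
    // params are appended as a query string, e.g. { heartBeat: 30 } -> ?heartBeat=30
    const params = config.params || {};
    const query = Object.keys(params)
        .map(key => `${encodeURIComponent(key)}=${encodeURIComponent(String(params[key]))}`)
        .join('&');
    return `amqp://${credentials}${host}:${port}${query ? `?${query}` : ''}`;
}
```

With an empty object the sketch falls back to the defaults and yields amqp://localhost:5672.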

Connection & initialization

This module supports only one connection at a time.

By default the module tries to reconnect after a connection error. This behaviour is configurable through retry.delay and retry.maximum_attempts.

When the connection is ready, the extension finds all decorated classes and sets up the corresponding exchanges, queues, bindings and message routing.
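
The retry behaviour described above can be pictured with the following sketch. It is an assumption-based illustration of a delay plus maximum-attempts loop, not the module's actual reconnection code. connectWithRetry, RetryOptions and wait are hypothetical names, and the error message only borrows the RETRY_LIMIT_EXCEEDED name from the change history.

```typescript
// Hypothetical options mirroring retry.delay and retry.maximum_attempts.
interface RetryOptions {
    delay: number;            // ms to wait between attempts
    maximum_attempts: number; // -1 means retry forever
}

const wait = (ms: number): Promise<void> =>
    new Promise<void>(resolve => setTimeout(resolve, ms));

// Illustrative helper, NOT part of @hapiness/rabbitmq.
async function connectWithRetry<T>(connect: () => Promise<T>, retry: RetryOptions): Promise<T> {
    let retries = 0;
    for (;;) {
        try {
            return await connect();
        } catch {
            // Give up once the allowed number of reconnection attempts is used.
            if (retry.maximum_attempts !== -1 && retries >= retry.maximum_attempts) {
                throw new Error('RETRY_LIMIT_EXCEEDED');
            }
            retries++;
            await wait(retry.delay);
        }
    }
}
```

In this sketch, maximum_attempts counts reconnection attempts only, so a value of 2 allows the initial attempt plus two retries.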

Channels

Each connection can open several channels. Every operation on RabbitMQ occurs through channels.

You can create them easily with the ChannelService.

Exchanges

An exchange needs a name and a type.

Decorator parameters:

  • name: string
  • type: ExchangeType (ExchangeType.Direct, ExchangeType.Topic, ExchangeType.Fanout)
  • options: object, optional (see the amqp.node exchange assert options)

Queues

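A queue only requires a name.

Decorator parameters (as used in the example under "Using decorators"):

  • name: string
  • options: object, optional (see the amqp.node queue assert options)
  • binds: array, optional; each bind references an exchange class and, optionally, a pattern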

Message and routing

Each message sent through RabbitMQ ends up in a queue, from which it is consumed.

You can receive all messages in your queue's onMessage method. That's a good option if only one type of message arrives on it. You can also call your own dispatcher there.

It's also possible to receive many different messages on the same queue. In that case, creating one class to handle each message is a better choice.

This module allows you to link a RabbitMessage to your custom message class. We provide a message router that loads the matching message-decorated class when a new message arrives. If no message class is found, the onMessage method on your queue is used as a fallback. If you did not provide this method, an error is thrown.

Decorator parameters:

  • queue: the queue class where the message is consumed
  • exchange: the exchange class
  • routingKey: string or regex to match the routingKey of the message
  • filter: a simple one level object with keys and values. Keys are the path on the RabbitMQ message and values could be a string, number, boolean or RegExp.
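
The matching rules listed above can be sketched as follows. This is a simplified illustration under stated assumptions, not the module's MessageRouter: MessageRule, IncomingMessage, getPath, matchesValue, matchesRule and route are hypothetical names, the message shape is reduced to a routingKey and a content object, and the first matching rule wins (the real router may select differently).

```typescript
type FilterValue = string | number | boolean | RegExp;

// Hypothetical, simplified stand-ins for a message class and a RabbitMQ message.
interface MessageRule {
    name: string;
    routingKey?: string | RegExp;
    filter?: { [path: string]: FilterValue };
}

interface IncomingMessage {
    routingKey: string;
    content: { [key: string]: unknown };
}

// Resolve a dotted path such as 'content.role' on the message.
function getPath(source: unknown, path: string): unknown {
    return path.split('.').reduce<unknown>(
        (value, key) =>
            value !== null && typeof value === 'object'
                ? (value as Record<string, unknown>)[key]
                : undefined,
        source
    );
}

// A RegExp is tested against string values; anything else must be strictly equal.
function matchesValue(expected: FilterValue, actual: unknown): boolean {
    return expected instanceof RegExp
        ? typeof actual === 'string' && expected.test(actual)
        : expected === actual;
}

function matchesRule(rule: MessageRule, message: IncomingMessage): boolean {
    if (rule.routingKey !== undefined && !matchesValue(rule.routingKey, message.routingKey)) {
        return false;
    }
    const filter = rule.filter || {};
    return Object.keys(filter).every(path => matchesValue(filter[path], getPath(message, path)));
}

// Returns the name of the handler that would receive the message.
function route(rules: MessageRule[], message: IncomingMessage, queueHasOnMessage: boolean): string {
    for (const rule of rules) {
        if (matchesRule(rule, message)) {
            return rule.name;
        }
    }
    if (queueHasOnMessage) {
        return 'queue.onMessage'; // fallback when no message class matches
    }
    throw new Error('No message class matched and the queue has no onMessage method');
}
```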

Using the module inside a Hapiness application

Install it with npm or yarn:

$ npm install --save @hapiness/core @hapiness/rabbitmq rxjs

or

$ yarn add @hapiness/core @hapiness/rabbitmq rxjs

Your package.json should then contain:

"dependencies": {
    "@hapiness/core": "^1.3.0",
    "@hapiness/rabbitmq": "^1.2.3",
    "rxjs": "^5.5.6",
    //...
}
//...

Importing RabbitMQModule from the library

This module provides a Hapiness extension for RabbitMQ. To use it, register it during the bootstrap step of your project and provide the RabbitMQExt with its config.

import { Hapiness, HapinessModule, OnStart } from '@hapiness/core';
import { RabbitMQExt, RabbitMQModule } from '@hapiness/rabbitmq';

@HapinessModule({
    version: '1.0.0',
    providers: [],
    declarations: [],
    imports: [RabbitMQModule]
})
class MyApp implements OnStart {
    constructor() {}
    onStart() {}
}

Hapiness
    .bootstrap(
        MyApp,
        [
            /* ... */
            RabbitMQExt.setConfig(
                {
                    connection: {
                        host: 'localhost',
                        port: 5672,
                        vhost: 'my_vhost',
                        login: 'xxx',
                        password: 'xxxx'
                    }
                }
            )
        ]
    )
    .catch(err => {
        /* ... */
    });

Using RabbitMQ inside your application

Using decorators

@Exchange({
    name: 'user.exchange',
    type: ExchangeType.Topic,
    // See options available: http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertExchange
    options: {
        durable: true,
        autoDelete: false
    }
})
export class UserExchange implements ExchangeInterface {}

@Queue({
    name: 'user.queue',
    // See options available: http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue
    options: {
        durable: true
    },
    binds: [{
        exchange: UserExchange,
        pattern: 'user.*'
    }]
})
export class UserQueue implements QueueInterface {

    // Inject your services
    constructor(private _myService: MyService) {}

    // Optional
    // Do some action when the queue is asserted
    onAsserted() {
        this._myService.foo();
    }

    // When a message is consumed it arrives here if no message class has been found
    // by the router
    onMessage(message: RabbitMessage, ch: ChannelInterface) {
        return Observable.of({ ack: true });
    }

}

@Message({
    queue: UserQueue,
    exchange: UserExchange,
    routingKey: 'user.edited'
})
export class UserCreatedMessage implements MessageInterface {

    // No super() call: this class implements an interface and extends nothing
    constructor(private _myService: MyService) {}

    // Executed when a message is consumed and dispatched here
    onMessage(message: RabbitMessage, ch: ChannelInterface): Observable<MessageResult> {
        this._myService.foo();
        // You can return an object to let the consumer know what to do with your message:
        // acknowledging it, rejecting it or doing nothing
        return Observable.of({ ack: true });
    }

}

This configuration will create:

  • One exchange of type topic named user.exchange.
  • One durable queue named user.queue
    • It will bind this queue to the previously created exchange with the routingKey user.*
  • Messages sent to the exchange with the routingKey user.edited and consumed by this queue are dispatched to the message class we created.
  • All other messages sent to the exchange with a routingKey matching the pattern user.* or sent directly to the queue will be consumed by the onMessage() method defined in the queue.
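
The binding pattern user.* follows the AMQP topic rules, which RabbitMQ evaluates on the broker: words are separated by dots, * matches exactly one word and # matches zero or more words. The sketch below reproduces that rule locally so you can check which routing keys a pattern would accept. topicMatches is a hypothetical helper for illustration and is not part of this module.

```typescript
// Illustrative helper, NOT part of @hapiness/rabbitmq.
// Checks a routing key against an AMQP topic binding pattern.
function topicMatches(pattern: string, routingKey: string): boolean {
    const match = (p: string[], k: string[]): boolean => {
        if (p.length === 0) {
            return k.length === 0;
        }
        const [head, ...rest] = p;
        if (head === '#') {
            // '#' absorbs zero or more words: try every possible split.
            for (let i = 0; i <= k.length; i++) {
                if (match(rest, k.slice(i))) {
                    return true;
                }
            }
            return false;
        }
        if (k.length === 0) {
            return false;
        }
        // '*' matches exactly one word; anything else must be equal.
        return (head === '*' || head === k[0]) && match(rest, k.slice(1));
    };
    return match(pattern.split('.'), routingKey.split('.'));
}
```

For the configuration above, topicMatches('user.*', 'user.edited') is true, while a key with a third word such as user.edited.name is not matched by user.*.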

Integration in your Hapiness application

Module

You need to include RabbitMQModule in imports and all your decorated classes in declarations.

@HapinessModule({
    version: '1.0.0',
    declarations: [
        MyQueue,
        MyExchange,
        MyMessage,
        ...
    ],
    providers: [
        MyService
    ],
    exports: [],
    imports: [RabbitMQModule]
})
class RabbitMQModuleTest {}

Bootstrap

You need to inject the extension in bootstrap using setConfig to instantiate the module.

Hapiness.bootstrap(RabbitMQModuleTest, [
    RabbitMQExt.setConfig({
        connection: {
            host: '....',
            login: '....',
            password: '....'
        }
    })
]).catch(err => done(err));

Using the services

Once the extension is loaded and RabbitMQ is connected you can use the services in your app.

We provide three services:

ConnectionService, ChannelService, MessageService

To send messages you can also use the sendMessage() utility provided.


class FooProvider {

    // Holds the manager returned by upsert() in bar()
    private _myChannelManager: ChannelManager;

    constructor(private _channelService: ChannelService, private _messageService: MessageService) {}

    bar(): void {
        // Upsert a channel by specifying a key to identify it
        // (one key per channel).
        // The function returns an Observable of a ChannelManager instance
        this._channelService.upsert('publish')
            .subscribe(channelManager => {
                this._myChannelManager = channelManager;
            });
    }


    foo() {
        // Use the created channel
        // Use the manager to retrieve the channel instance
        const ch = this._myChannelManager.getChannel();

        // ... or, instead, retrieve it with the getChannel shortcut and your key:
        // const ch = this._channelService.getChannel('publish');

        // Use any function from amqp.node
        ch.sendToQueue(...);

        this.sendToQueue(ch, { foo: 'bar' }, UserQueue);
        this.publish(ch, { foo: 'bar' }, UserExchange, { routingKey: 'foo.bar' });
    }

}
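
The "one key per channel" idea used by upsert above can be pictured with a small keyed registry. This is only a sketch under the assumption that upserting an existing key returns the instance already stored. KeyedRegistry is a hypothetical class, is synchronous for brevity, and is not the ChannelService implementation (which returns an Observable).

```typescript
// Illustrative class, NOT part of @hapiness/rabbitmq.
class KeyedRegistry<T> {
    private _items = new Map<string, T>();

    // factory is called once per new key to create the stored value
    constructor(private _factory: (key: string) => T) {}

    // Return the value stored under key, creating it on first use.
    upsert(key: string): T {
        const existing = this._items.get(key);
        if (existing !== undefined) {
            return existing;
        }
        const created = this._factory(key);
        this._items.set(key, created);
        return created;
    }

    // Shortcut lookup that never creates anything.
    get(key: string): T | undefined {
        return this._items.get(key);
    }
}
```

Calling upsert('publish') twice on such a registry returns the same object, which is why a key like 'publish' can be shared by every provider that needs the publishing channel.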


Contributing

To set up your development environment:

  1. clone the repo to your workspace,
  2. in the shell cd to the main folder,
  3. run npm install or yarn install,
  4. run npm run test or yarn run test.
    • It will lint the code and execute all tests.
    • The test coverage report can be viewed from ./coverage/lcov-report/index.html.


Change History

  • v1.7.2 (2019-12-16)
    • Handle all errors when sending a message
    • Fix scope of "this" when sending message
  • v1.7.1 (2019-12-13)
    • Handle channel closed error when sending a message to add a custom code on the thrown error
  • v1.7.0 (2019-02-27)
    • Add method to cancel consuming queue
    • Refactor consume queue to allow easier consume/cancel
    • Add a QueueStore to fetch all the queues manager instances
  • v1.6.2 (2018-11-22)
    • Create DI with providers for queues and exchanges
  • v1.6.1 (2018-11-14)
    • force_json_decode is now true by default
  • v1.6.0 (2018-10-31)
    • Add assert option in Exchange and Queue decorators to allow disabling assert during bootstrap
    • Add check option in Exchange and Queue decorators to verify existence during bootstrap
  • v1.5.1 (2018-09-24)
    • Fix reconnection error: use once instead of on and rebind event correctly
  • v1.5.0 (2018-08-24)
    • Add possibility to provide a custom MessageRouter
  • v1.4.3 (2018-08-20)
    • Emit RETRY_LIMIT_EXCEEDED error on ConnectionManager
  • v1.4.2 (2018-06-11)
    • Do not retry to connect if closing server
  • v1.4.1 (2018-05-31)
    • Fix channel creation after reconnection
  • v1.4.0 (2018-04-24)
    • Refactor channel management to handle connection errors
  • v1.3.0 (2018-03-27)
    • Add shutdown (SIGTERM/SIGINT) support
  • v1.2.3 (2018-02-05)
    • Latest packages' versions.
    • Fix typings
    • Documentation.
  • v1.2.2 (2017-12-20)
    • Latest packages' versions.
    • Fix queue dispatching in routing messages
    • Documentation.
  • v1.2.1 (2017-11-23)
    • Latest packages' versions.
    • Fix routing messages
    • Documentation.
  • v1.2.0 (2017-11-20)
    • Latest packages' versions.
    • Update Module + Tests related to latest core version.
    • Documentation.
    • Change packaging process.
  • v1.1.2 (2017-11-02)
    • Fix decorators prefetch
  • v1.1.1 (2017-10-31)
    • Fix queue binding
  • v1.1.0 (2017-10-27)
    • Allow to define queue binds without pattern
    • Allow to define queue bind pattern as array
    • Add default prefetch that is used for each channel creation if not specified in create() method first argument
    • Rename decodeContent to decodeJSONContent and change logic to not throw if content is not JSON, add force argument to try to decode if headers.json boolean is missing
    • Add force_json_decode option in queue decorator to force JSON decoding of all messages consumed
    • Rework dispatcher logic (1)
    • Add channel option for queue to allow using different channel for each queue with a different prefetch
    • Export a global event object for connection and queueManager events
    • Correct logic behind message routing
    • Add checks and throw if messages do not have all required properties
    • If the message has a filter property and it does not match discard the class from the selection
    • Update tests
    • Update documentation
  • v1.0.0 (2017-10-23)
    • Publish all features of the module
    • Tests
    • Documentation


Maintainers

tadaweb
Julien Fauville, Antoine Gomez, Sébastien Ritz, Nicolas Jessel


License

Copyright (c) 2017 Hapiness. Licensed under the MIT license.
