README
@uwatch/amqp
Promise-based wrapper for amqplib
Install
$ yarn add @uwatch/amqp
Example
import AMQP from '@uwatch/amqp';
const amqp = new AMQP({
connectionString: 'amqp://localhost'
});
(async () => {
try {
// Connect and create channel
await amqp.init();
// Setup needed queues, exchanges...
await amqp.setup(async () => {
// Create queue
await amqp.createQueue('test_queue');
});
// Get queue from instance
const queue = amqp.queue('test_queue');
// Consume message from queue
queue.consume(message => {
console.log(message);
});
} catch (e) {
console.error(e);
}
});
API
AMQP class
AMQP constructor. Create an instance to work with amqplib.
Example
const amqp = new AMQP();
.connect()
Connect to RabbitMQ server.
Example
await amqp.connect();
.close(forceClose = true)
Close connection to RabbitMQ server.
Params
forceClose
{boolean}: Force connection close, reconnection disabled.
Example
await amqp.close();
.createChannel()
Create channel.
Example
await amqp.createChannel();
.setup(func)
Add setup function.
Params
func
{function}: Setup function
Example
await amqp.setup(async () => {
console.log('Setup function called!');
});
.clearSetups()
Remove all setups.
Example
amqp.clearSetups();
.init()
Initializing: creating connection, creating channel, calling all setup functions.
Example
await amqp.init();
.createExchange(name, type = 'direct', options = {})
Create exchange.
Params
name
{string}: Exchange nametype
{string}: Exchange typeoptions
{object}: Exchange options
Example
const exchange = await amqp.createExchange('test_exchange');
.createQueue(name, options = {})
Create queue.
Params
name
{string}: Queue nameoptions
{object}: Queue options
Example
const queue = await amqp.createQueue('test_queue');
.exchange(name)
Get exchange from instance by name.
Params
name
{string}: Exchange name
Example
const exchange = amqp.exchange('test_exchange');
.deleteExchange(exchange)
Remove exchange from instance by name or object.
Params
exchange
{(string|object)}: Exchange name or object
Example
await amqp.deleteExchange('test_exchange');
.queue(name)
Get queue from instance by name.
Params
name
{string}: Queue name
Example
const queue = amqp.queue('test_queue');
.deleteQueue(queue)
Remove queue from instance by name or object.
Params
queue
{(string|object)}: Queue name or object
Example
await amqp.deleteQueue('test_queue');
.send(queueName, data = null, options = {})
Send data to queue.
Params
queueName
{string}: Queue namedata
{any}: Data to sendoptions
{object}: Send optionsoptions.delay
{number}: Send delay in ms
Example
await amqp.send('test_queue', 'Hello World!');
.publish(exchangeName, data = null, routingKey = '', options = {})
Publish data to exchange.
Params
exchangeName
{string}: Exchange namedata
{any}: Data to sendroutingKey
{string}: Routing keyoptions
{object}: Publish options
Example
await amqp.publish('test_exchange', 'Hello World!');
.reconnect()
Reconnect.
Example
await amqp.reconnect();
Queue class
.bind(exchangeName, pattern = '')
Bind queue to exchange.
Params
exchangeName
{string}: Exchange namepattern
{string}: Bind pattern
Example
await queue.bind('test_exchange');
.consume(callback, options = {})
Add consumer function.
Params
callback
{function}: Consumer callbackoptions
{object}: Consumer options
Example
queue.consume(message => {
console.log(message);
});
.send(queueName, data = null, options = {})
Send data to queue.
Params
queueName
{string}: Queue namedata
{any}: Data to sendoptions
{object}: Send options
Example
await queue.send('test_queue', 'Hello World!');
.event(queueName, name, data = null, options = {})
Send event-type message to queue.
Params
queueName
{string}: Queue namename
{string}: Event namedata
{any}: Event dataoptions
{object}: Send options
Example
await queue.event('test_queue', 'test_event', 'Hello World!');
.on(eventName, callback)
Listen event-type message from queue.
Params
eventName
{string}: Event namecallback
{function}: Event callback
Example
queue.on('test_event', (data, message) => {
console.log(data, message);
});
.method(name, callback)
Add RPC method.
Params
name
{string}: RPC method namecallback
{function}: RPC method callback
Example
queue.method('test_method', (data, message) => {
console.log(data, message);
});
.deleteMethod(name)
Delete RPC method.
Params
name
{string}: RPC method name
Example
queue.deleteMethod('test_method');
.call(queueName, method, data = null, progress, options = {})
Call RPC method.
Params
queueName
{string}: Queue namemethod
{string}: Method namedata
{any}: Data to sendoptions
{object}: Send optionsoptions.progress
{function}: Progress callback(optional)options.timeout
{number}: RPC timeout(defaults to 20000ms)options.doNotWait
{boolean}: If true, promise will resolve to undefined and RPC will not wait for method to complete
Example
const result = await queue.call('test_queue', 'test_method', {
message: 'Hello World!'
}, { timeout: 60000, progress: progress => console.log(progress) });
Will throw TimeoutError on when timeout reached
On successful call
{
success: true,
result: <data returned from method>
}
On failed call
{
success: false,
error: <error message from call>
}
Exchange class
.publish(data = null, routingKey = '', options = {})
Publish data to exchange.
Params
data
{any}: Data to publishroutingKey
{string}: Routing key for RabbitMQoptions
{object}: Publish options
Example
await exchange.publish('Hello World!');
.event(name, data = null, options = {})
Publish event-type message.
Params
name
{string}: Event namedata
{any}: Event data
Example
await exchange.event('test_event', 'Hello World!');
Message class
.reply(data = null)
Reply to RPC call.
Params
data
{any}: Reply data
Example
await message.reply('Reply from RPC call!');
.progress(data = null)
Send RPC progress.
Params
data
{any}: Progress data
Example
await message.progress('RPC progress 50%!');