bhb-amqp-connection-manager

Auto-reconnect and round robin support for amqplib.

Usage no npm install needed!

<script type="module">
  import bhbAmqpConnectionManager from 'https://cdn.skypack.dev/bhb-amqp-connection-manager';
</script>

README

bhb-amqp-connection-manager

安装

npm install --save amqplib bhb-amqp-connection-manager@0.0.7
注意:该组件在不断完善中,可能引入比较多不兼容的内容,请锁死版本,按需升级

简介

大头兄弟amqp链接管理 基于amqp-connection-manager 包的功能,做部分业务扩展

需要了解的使用教程 包含

发送信息示例

const rabbitMQ = require('bhb-amqp-connection-manager');

const connection = rabbitMQ.connect(process.env.APP_MQ_URL)
    .on('connect', () => console.log('Connected!'))
    .on('disconnect', params => console.log('Disconnected.', params.err.stack));

const channel = connection.createChannel()
    .addSetup(async function(channel) {
        // `channel` here is a regular amqplib `ConfirmChannel`.
        const prefetch = parseInt(process.env.prefetch) || 1;

        await channel.assertQueue('hello', {durable: false});

        await channel.prefetch(prefetch);
    });
(async function() {
    await channel.sendToQueue('hello', {num: 1});
    await channel.publish('', 'hello', {num: 10001});
    await connection.close();
})();

消费消息示例

if (require('cluster').isMaster) {
    return require('../util/master');
}
const bluebird = require('bluebird');
const commonUtil = require('../util/common');

const rabbitMQ = require('bhb-amqp-connection-manager');

const connection = rabbitMQ.connect(process.env.APP_MQ_URL);

const channel = connection.createChannel()
    .addSetup(async function(channel) {
        // `channel` here is a regular amqplib `ConfirmChannel`.
        const prefetch = parseInt(process.env.prefetch) || 5;

        await channel.assertQueue('hello', {durable: false});

        await channel.prefetch(prefetch);

        await channel.consumerQueue('hello', async function({num}) {
            console.log(`start ${num}`);
            await bluebird.delay(2000);
            console.log(`end ${num}`);
        });
    });

commonUtil.bindGraceExit(async function() {
    await channel.cancelConsumers();
    await channel.waitMessageEmpty();
    await connection.close();
});

版本变更

  • v0.0.3 支持node 4.0版本(babel编译)
  • v0.0.4 移除babel编译,只支持node 6>0
  • v0.0.5 修复json解析错误未捕获的bug。默认忽略推入数据json格式有误的数据,并通过日志输出提醒.
  • v0.0.6 修复当连接后未执行完setup的情况下,channel已经失去连接导致忽略处理,并返回null的bug。改为直接抛出一个错误,中断后续没必要的执行。 引入一个不兼容的默认值处理:当queue声明为durable的时候,如果没设置x-queue-mode的情况下,将默认设置x-queue-mode=lazy, 如果需要自定义,需要明确声明该配置,否则assertQueue时导致配置不一致会抛出错误。
  • v0.0.7 添加 consumerQueueUseRetry 便捷方法 相比 consumerQueue增加一个option参数 option.count (可选) 设置重试次数 option.failureQueue (可选) 设置失败后推入的队列,如果没设置,则自动创建一个队列,名字为:failure.${queueName} ,持久缓存 option.delay (可选) 设置重试频率的函数 ,设置该值时 count 不生效。 当count和delay都不设置的时候,使用amqplib-retry 的默认行为
       (attempts) => {
         const delay = Math.pow(2, attempts)
         if (delay > 60 * 60 * 24) {
           // the delay for the message is longer than 24 hours.  Fail the message and never retry again.
           return -1
         }
         return delay * 1000
       }
  • v0.0.8 改写mq默认的失败重试机制

    新增setConfig方法,目前可配置项:

        {
            DINGDING_HOST:'',//失败钉钉通知
            DEAD_LETTER_TTLs:[4,20,100]//失败重试间隔 单位 秒
        }