pandakafka

The utils is Panda private

Usage no npm install needed!

<script type="module">
  import pandakafka from 'https://cdn.skypack.dev/pandakafka';
</script>

README

node-rdkafka的高级封装,在消息队列的基础上现实了类RPC远程调用.

特性

  • 消息队列
  • 远程调用
  • 推送回调.
  • 通用组件.
  • 拓展服务.
  • 中间件.

关于

该系统构建的目的在于解决Jessehealth公司的内部高并发集群架构问题.

用法

快速入门

const pandakafka = require("pandakafka")

// 消费
const consumer = new pandakafka({
  type: "consumer",
  configure: {
    name: "test",
    host: "localhost:9092",
    topic: "test"
  },
  callback: function (msg, callback) {
    console.log(msg.data.toString()) // "我的名字?"
    callback("panda")
  }
})

// 生产
const producer = new pandakafka({
  type: "producer",
  configure: {
    name: "test",
    host: "localhost:9092",
    topic: "test"
  }
})

// 生产测试消息并等待回调
producer.request("我的名字?", function (cb) {
  console.log(cb.data.toString()) // "panda"
})

配置

type: "consumer" // 模式 (consumer or producer)
configure: 
  name: "test" // 组ID
  host: "localhost:9092" // 服务器
  topic: "test" // 主题
  timeout: // 默认值为下面的示例
    loop: 600000 // 消息回调清扫的周期
    max: 1200000 // 消息回调的保留时间
callback: function // 处理需要回调的消息

异步处理请求并返回回调

// 消费
const consumer = new pandakafka({
  type: "consumer",
  configure: {
    name: "test",
    host: "localhost:9092",
    topic: "test"
  },
  callback: async function (msg, callback) {
    console.log(msg.data.toString()) // "我的名字?"
    return "panda"
  }
})

直接推送消息不要求回调

producer.send("这条消息不要求回调")

消息类型(send and request) 这里只举例send

producer.send("这条消息不要求回调")
producer.send({ text: "这条消息不要求回调" })
producer.send(Buffer.from("这条消息不要求回调"))

全局的API

.use(function) // 中间件
.data(callback function[massage]) // 接收直接推送的消息
.ready // true, false 获取是否已连接并准备完成
.undefinedBind(callback function[massage]) // 接收到的没有找到回调绑定函数的消息

许可

MIT

Copyright (c) 2018 Mr.Panda.