bus.io-exchange

Integrates Redis as a queue and a pubsub by providing an easy mechanism for publishing and handling messages

Usage no npm install needed!

<script type="module">
  import busIoExchange from 'https://cdn.skypack.dev/bus.io-exchange';
</script>

README

Build Status NPM version David DM

Bus.IO

A bus.io dependency.

The message exchange provides an iterface for publishing a message to a queue, handling that message, and potentially propagating that message to its destination.

Installation and Environment Setup

Install node.js (See download and install instructions here: http://nodejs.org/).

Install redis (See download and install instructions http://redis.io/topics/quickstart)

Install coffee-script

> npm install coffee-script -g

Clone this repository

> git clone git@github.com:turbonetix/bus.io-exchange.git

cd into the directory and install the dependencies

> cd bus.io-exchange
> npm install && npm shrinkwrap --dev

API

Exchange

This is wheere we publish, handle, and propagate messages.

Exchange#()


var exchange = require('bus.io-exchange')();

Exchange#(queue:Queue, pubsub:Pubsub, handler:EventEmitter)


var Exchange = require('bus.io-exchange');
var exchange = Exchange(Exchange.Queue(), Exchange.PubSub());

Exchange#make(queue:Queue, pubsub:Pubsub, handler:EventEmitter)


var Exchange = require('bus.io-exchange');

var queue = Exchange.Queue();
var pubsub = Exchange.PubSub();
var handler = new EventEmitter();

var exchange = Exchange(queue, pubsub, handler);

Exchange#publish(message:Object)

Puts the message onto the Queue if the message has not already been published to the Queue. If the message has already been published to the Queue it will be published onto the PubSub.


var Message = require('bus.io-common').Message;

exchange.publish( Message() );

Exchange#publish(message:Object, channel:String)

Puts the message onto the PubSub with the channel being "everyone".


var message = Message();

exchange.publish( message, 'everyone' );

Exchange#subscribe(channel:String, listener:Function, cb:Function)

Subscribes a listener to the channel and invokes the callback when the channel as been subscribed.


exchange.subscribe('some channel', function listener (message) { 

//this gets called when we receive a message on the channel

}, function callback (err, channel) { 

//this gets called when we subscribed to the channel

});

Exchange#unsubscribe(channel:String, listener:Function, cb:Function)

Unsubscribes the listener from the channel and invokes the callback when the listener as been unsubscribed.


var listener = function (message) { };

exchange.unsubscribe('some channel', listener, function callback (err, channel) { 

//this gets called when we unsubscribed from the channel

});

Exchange#queue()

Gets the Queue instance.


var queue = Exchange.queue();
queue.send(Message());

Exchange#queue(queue:Queue)

Sets the Queue instance.


var kue = require('kue');
var queue = messageExchange.Queue.make(kue.createClient());

exchange.queue(queue);

Exchange#pubsub()

Gets the pubsub instance.


var pubsub = exchange.pubsub();
pubsub.send(message, 'everyone');

Exchange#pubsub(pubsub:PubSub)

Sets the pubsub instance.


var redis = require('redis');

var pub = redis.createClient();
var sub = redis.createClient();

var pubsub = messageExchange.PubSub.make(pub, sub);

exchange.pubsub(pubsub);

Exchange#handler()

Gets the handler which is an EventEmitter.


var handler = exchange.handler();
handler.on('some message', function (message, exchange) {
  // do something
  exchange.channel(message.target).publish(message);
});

Exchange#handler(handler:EventEmitter)

Sets the handler.


var events = require('events');

var handler = new events.EventEmitter;
handler.on('some message', function (message, exchange) {
  // do something
  exchange.channel(message.target).publish(message);
});

exchange.handler(handler);

Queue

The queue is a lightweight wrapper around an object that supports a method process(name, fn). Where name is a String and fn is a Function. It must also support the method create(name, data) where name is a String and data is an Object. The return value of the create method must expose a function done(). In our case we used the Kue library. It is a really nice library for handling jobs.

Queue#()


var queue = Exchange.Queue();

Queue#(q:Object)


var kue = require('kue');
var queue = Exchange.Queue(kue.createQueue());

Queue#send(mesage:Message)


queue.send(Message());

PubSub

The pubusb is a lightweight wrapper around the redis module. You could pass in another object insead of the redis object. By making sure it supports these methods subscribe(name,cb), unsubscribe(name,cb), publish(channel, data).

PubSub#()


var pubsub = Exchange.PubSub();

PubSub#(pub:Object, sub:Object)


var pub = redis.createClient()
  , sub = redis.createClient();

var pubsub = Exchange.PubSub(pub, sub);

PubSub#send(message:Message)


var message = Message();

pubsub.send(message, message.target());

PubSub#subscribe(channel:String, cb:Function)


pubsub.subscribe('channel', function (err, channel) {
  if (err) throw err;
  console.log('channel subscribed');
});

PubSub#unsubscribe(channel:String, cb:Function)


pubsub.unsubscribe('channel', function (err, channel) {
  if (err) throw err;
  console.log('channel unsubscribed');
});

Running Tests

Unit Tests

Tests are run using grunt. You must first globally install the grunt-cli with npm.

> sudo npm install -g grunt-cli

To run the tests, just run grunt

> grunt

TODO

  • Support different queues
  • Support different pubsubs