conveyor

Feed multiple node.js streams sequentially into one stream

Usage (no npm install needed):

<script type="module">
  import conveyor from 'https://cdn.skypack.dev/conveyor';
</script>

README

Description

Feed multiple node.js streams sequentially into one (Writable or Duplex) stream.

Requirements

Install

npm install conveyor

Examples

  • Pass HTTP requests to an echo stream:
// Echo example: every request body is fed, one request at a time, through a
// single Transform stream, and the result is piped back to that request's
// own response.
const http = require('http');
const { Transform } = require('stream');

const Conveyor = require('conveyor');

const REQUEST_COUNT = 10;

// Pass-through transform: forwards each chunk unchanged.
const echo = new Transform();
echo._transform = function(chunk, encoding, done) {
  this.push(chunk);
  done();
};

const conveyor = new Conveyor(echo);
let handled = 0;

// Collects a response body as UTF-8 text and prints it once the response ends.
function printBody(res) {
  let body = '';
  res.setEncoding('utf8');
  res.on('data', function(piece) {
    body += piece;
  });
  res.on('end', function() {
    console.log(body);
  });
}

const server = http.createServer(function(req, res) {
  handled += 1;
  // Close the server once the last expected request has arrived.
  if (handled === REQUEST_COUNT) {
    server.close();
  }
  conveyor.push(req, res);
});

server.listen(8080, function() {
  // Fire all requests as soon as the server is listening.
  for (let n = 1; n <= REQUEST_COUNT; ++n) {
    const options = { host: '127.0.0.1', port: 8080, method: 'POST' };
    http.request(options, printBody).end('Hello from request #' + n);
  }
});

// output:
// Hello from request #1
// Hello from request #2
// Hello from request #3
// Hello from request #4
// Hello from request #5
// Hello from request #6
// Hello from request #7
// Hello from request #8
// Hello from request #9
// Hello from request #10
  • Pass HTTP requests to a Writable stream:
// Writable example: request bodies are written, one request at a time, to a
// single Writable stream that logs every chunk it receives.
const http = require('http');
const { Writable } = require('stream');

const Conveyor = require('conveyor');

const REQUEST_COUNT = 10;

// Sink that prints each chunk as a string.
const sink = new Writable();
sink._write = function(chunk, encoding, done) {
  console.log(chunk.toString());
  done();
};

const conveyor = new Conveyor(sink);
let handled = 0;

// Consumes and discards a response body.
function discardBody(res) {
  res.resume();
}

const server = http.createServer(function(req, res) {
  handled += 1;
  // Close the server once the last expected request has arrived.
  if (handled === REQUEST_COUNT) {
    server.close();
  }
  conveyor.push(req, function() {
    // This request stream has finished, so its response can be completed.
    res.end();
  });
});

server.listen(8080, function() {
  // Fire all requests as soon as the server is listening.
  for (let n = 1; n <= REQUEST_COUNT; ++n) {
    const options = { host: '127.0.0.1', port: 8080, method: 'POST' };
    http.request(options, discardBody).end('Hello from request #' + n);
  }
});

// output (assuming 1-chunk requests):
// Hello from request #1
// Hello from request #2
// Hello from request #3
// Hello from request #4
// Hello from request #5
// Hello from request #6
// Hello from request #7
// Hello from request #8
// Hello from request #9
// Hello from request #10

API

Conveyor is an EventEmitter

Conveyor events

  • end() - Emitted after end() is called and all streams have been processed.

Conveyor methods

  • (constructor)(< Writable >dest, < object >config) - Creates and returns a new Conveyor instance with the following valid config settings:

    • max - integer - This is the max queue size.
  • push(< Readable >stream[, < Writable >pipeStream][, < object >pipeStreamOpts][, < function >callback]) - boolean - Pushes (appends) stream to the queue. If pipeStream is set, data (from dest passed to the constructor) will be piped to this stream with optional pipeStreamOpts pipe settings. callback is called once stream has ended and dest is drained. The return value is false if stream could not be enqueued due to the queue being full.

  • unshift(< Readable >stream[, < Writable >pipeStream][, < object >pipeStreamOpts][, < function >callback]) - boolean - Identical to push() except it unshifts (prepends) stream to the queue.