parallel-stream

Concurrent transform stream

Usage no npm install needed!

<script type="module">
  import parallelStream from 'https://cdn.skypack.dev/parallel-stream';
</script>

README

parallel-stream

build status

Transform and writable streams capable of processing chunks concurrently.

Usage

transform

A concurrent transform stream

Parameters

  • work function a function to process a single chunk. Function signature should be process(chunk, enc, callback). When finished processing, fire the provided callback.
  • options object options to pass to the transform stream. (optional, default undefined)
    • options.concurrency number number of chunks to process concurrently. (optional, default 1)

Examples

var parallel = require('parallel-stream');

var transform = parallel.transform(function(chunk, enc, callback) {
  processAsync(chunk)
    .on('done', function(processedData) {
      callback(null, processedData);
    });
}, { objectMode: true, concurrency: 15 });

readable.pipe(transform)
 .on('data', function(data) {
    console.log('got processed data: %j', data);
 })
 .on('end', function() {
   console.log('complete!');
});

Returns object a transform stream. Do not override the ._transform function.

writable

A concurrent writable stream

Parameters

  • work function a function to process a single chunk. Function signature should be process(chunk, enc, callback). When finished processing, fire the provided callback.
  • flush function a function to run once all chunks have been processed, but before the stream emits a finished event. Function signature should be flush(callback), fire the provided callback when complete. (optional, default undefined)
  • options object options to pass to the writable stream. (optional, default undefined)
    • options.concurrency number number of chunks to process concurrently. (optional, default 1)

Examples

var parallel = require('parallel-stream');

var writable = parallel.writable(function(chunk, enc, callback) {
  processAsync(chunk)
    .on('done', callback);
}, { objectMode: true, concurrency: 15 });

readable.pipe(writable)
 .on('finish', function() {
   console.log('complete!');
});

Returns object a writable stream. Do not override the ._write function.