@escio/stream-through-stream

Transform object stream mapping each object into a sub-stream through a callback.

Installation

$ npm install @escio/stream-through-stream

Usage

The stream takes a callback that is called with every object passing through the stream. The callback is expected to return a readable stream which will be piped into the output of the transform stream. The transform stream will end once all the readable streams have ended.

const stream = require('stream');
const throughStream = require('@escio/stream-through-stream');

// Create a readable stream generating two strings 'a' and 'b'
// before ending:
const source = new stream.Readable({
    objectMode: true,
    read(size) {
        this.push('a');
        this.push('b');
        this.push(null);
    }
});

source.pipe(
    // For each object in the stream, generate a new stream
    // outputting the object twice:
    throughStream((obj) => new stream.Readable({
        objectMode: true,
        read(size) {
            this.push(obj);
            this.push(obj);
            this.push(null);
        }
    }))
).pipe(
    // Log all objects to the console. Should output:
    //
    // a
    // a
    // b
    // b
    //
    // Notice how the two original strings were duplicated.
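    new stream.Writable({
        // Object mode keeps the strings as-is; without it they would
        // arrive as Buffers and be logged as <Buffer 61> and so on.
        objectMode: true,
        write(object, encoding, done) {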
            console.log(object);
            done();
        }
    })
);
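
The contract described above can be approximated with Node's built-in stream.Transform. The following is a minimal sketch for illustration only, not the package's actual implementation: sequentialThrough is a hypothetical name, and the sketch handles one sub-stream at a time, ignores backpressure, and has no concurrency option.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical stand-in for the package (assumption, not its real code):
// calls the callback for each incoming object and forwards everything the
// returned readable emits before accepting the next object.
function sequentialThrough(callback) {
    return new Transform({
        objectMode: true,
        transform(obj, encoding, done) {
            const sub = callback(obj);
            // Forward every object from the sub-stream to the output.
            sub.on('data', (item) => this.push(item));
            // Fail the transform if the sub-stream fails.
            sub.once('error', done);
            // Only ask for the next source object once this one has ended.
            sub.once('end', () => done());
        }
    });
}

// Duplicate every object, as in the example above.
Readable.from(['a', 'b'])
    .pipe(sequentialThrough((obj) => Readable.from([obj, obj])))
    .on('data', (item) => console.log(item)); // logs a, a, b, b
```

Because done() is only called when the sub-stream ends, the Transform does not receive the next source object until then, which is what keeps the output in source order.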

By default, the through stream waits for the generated readable stream to end before requesting a stream for the next object in the source stream. It's possible to change this and run multiple readable streams in parallel by setting the concurrency option.

const stream = require('stream');
const throughStream = require('@escio/stream-through-stream');

...

source.pipe(
    throughStream((obj) => new stream.Readable({
        ...
    }), {
        concurrency: 4 // Run up to 4 concurrent streams
    })
).pipe(
    ...
);

Keep in mind that when using concurrency, there is no guaranteed ordering in the output from the through stream. Given the example above, the output could be any permutation of the objects a, a, b, b.
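
If the source order matters, one possible workaround is to tag each output object with the position of the source object that produced it and regroup afterwards. The sketch below is an assumption-laden illustration, not a feature of the package: withSourceIndex and restoreOrder are hypothetical helpers, the callback is assumed to be invoked in source order, and regrouping requires the whole output to be collected first, so it only suits finite streams.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical helper (not part of the package): wraps a callback so that
// every object a sub-stream emits is tagged with the position of the
// source object it came from. Assumes the callback runs in source order.
function withSourceIndex(callback) {
    let nextIndex = 0;
    return (obj) => {
        const index = nextIndex++;
        // Note: pipe() does not forward errors from the inner stream.
        return callback(obj).pipe(new Transform({
            objectMode: true,
            transform(value, encoding, done) {
                done(null, { index, value });
            }
        }));
    };
}

// Hypothetical helper: regroups fully collected output by source position.
// Array.prototype.sort is stable, so the order within each sub-stream
// is preserved.
function restoreOrder(tagged) {
    return [...tagged]
        .sort((x, y) => x.index - y.index)
        .map((item) => item.value);
}

// Intended use with the through stream, shown as a comment because it
// depends on the package being installed:
//
//     source.pipe(throughStream(withSourceIndex(callback), { concurrency: 4 }))
```

Collect the tagged objects downstream and pass the resulting array to restoreOrder once the stream has ended.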

License

ISC © Escio AS