o-stream

Intuitive node streams in typescript.

Usage (no npm install needed):

<script type="module">
  import oStream from 'https://cdn.skypack.dev/o-stream';
</script>


O-Stream

Provides an intuitive interface for Node streams, written in TypeScript.

Useful for creating streams and gulp plugins (see the gulp plugin example below).

Install

npm install --save o-stream

Or, as a dev dependency:
npm install --save-dev o-stream

Basic Usage

Simple transform stream:

import ObjectStream, { EnteredArgs } from "o-stream"

let incrementStream = ObjectStream.transform<number, number>({
    onEntered: (args: EnteredArgs<number, number>) => {
        args.output.push(args.object + 1);
    }
});

incrementStream.write(4);

let actual = incrementStream.read(); // 5
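
For comparison, here is a rough sketch of the same increment stream written directly against Node's built-in stream.Transform in object mode. It is not a description of how o-stream works internally; it only illustrates the boilerplate that onEntered and args.output.push stand in for. The names plainIncrementStream and plainResult are made up for this sketch.

```typescript
import { Transform, TransformCallback } from "stream";

// Plain Node counterpart of the increment stream (illustrative only).
const plainIncrementStream = new Transform({
    objectMode: true, // chunks are arbitrary values, not Buffers
    transform(chunk: number, _encoding: BufferEncoding, callback: TransformCallback) {
        // Passing a value as the second argument pushes it to the readable side.
        callback(null, chunk + 1);
    }
});

plainIncrementStream.write(4);

const plainResult: number = plainIncrementStream.read(); // 5
```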

Output on stream end:

import ObjectStream, { EnteredArgs, EndedArgs } from "o-stream"

let sum = 0;
let sumStream = ObjectStream.transform<number, number>({
    onEntered: (args: EnteredArgs<number, number>) => {
        sum += args.object;
    },
    onEnded: (args: EndedArgs<number>) => {
        args.output.push(sum);
    }
});

sumStream.write(100);
sumStream.write(5);
sumStream.write(-10);
sumStream.end();

let actualSum = sumStream.read(); // 95
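
As a point of reference, the closest built-in counterpart to onEnded is the flush hook of Node's stream.Transform, which runs once all input has been consumed. The sketch below is written under that assumption and uses only Node's stream module; createPlainSumStream and collect are hypothetical helper names, not part of o-stream.

```typescript
import { Transform, TransformCallback } from "stream";

// Hypothetical helper: sums numbers and emits the total once the input ends.
function createPlainSumStream(): Transform {
    let total = 0;
    return new Transform({
        objectMode: true,
        transform(chunk: number, _encoding: BufferEncoding, callback: TransformCallback) {
            total += chunk; // accumulate, emit nothing yet
            callback();
        },
        flush(callback: TransformCallback) {
            callback(null, total); // emit the single result at end of input
        }
    });
}

// Hypothetical helper: gathers everything a stream emits until it ends.
function collect<T>(stream: NodeJS.ReadableStream): Promise<T[]> {
    return new Promise<T[]>((resolve, reject) => {
        const items: T[] = [];
        stream.on("data", (item: T) => items.push(item));
        stream.on("end", () => resolve(items));
        stream.on("error", reject);
    });
}

const plainSumStream = createPlainSumStream();
const plainSumItems = collect<number>(plainSumStream);

plainSumStream.write(100);
plainSumStream.write(5);
plainSumStream.write(-10);
plainSumStream.end();

// plainSumItems resolves to [95] once the stream has ended.
```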

Async:

import ObjectStream, { EnteredAsyncArgs, EndedAsyncArgs } from "o-stream"

let sum = 0;
let sumStream = ObjectStream.transform<number, number>({
    onEnteredAsync: (args: EnteredAsyncArgs<number, number>) => {
        sum += args.object;
        args.done();
    },
    onEndedAsync: (args: EndedAsyncArgs<number>) => {
        args.output.push(sum);
        args.done();
    }
});

let num1 = 4;
let num2 = 6;
let num3 = -1;
let expected = num1 + num2 + num3;

sumStream.write(num1);
sumStream.write(num2);
sumStream.write(num3);
sumStream.end();

let actual = sumStream.read();

expect(actual).toBe(expected);

Create a stream from an array:

import ObjectStream from "o-stream"

let stream = ObjectStream.fromArray([1, 2, 3, 5, 8, 13]);

Gulp plugin example

This example creates a single file that lists the names of the input files:

import ObjectStream, { EnteredArgs, EndedArgs, Transform } from "o-stream";
import * as gutil from "gulp-util"; // npm install --save-dev @types/gulp-util

export default function plugin(outFilePath: string): Transform {

    let myPaths: string[] = [];

    return ObjectStream.transform({
        onEntered: (args: EnteredArgs<gutil.File, gutil.File>) => {
            myPaths.push(args.object.relative);
        },
        onEnded: (args: EndedArgs<gutil.File>) => {
            let file = new gutil.File({
                cwd: "",
                base: "",
                path: outFilePath,
                contents: Buffer.from(myPaths.join("\n"))
            });

            args.output.push(file);
        }
    });
}

Combine streams: create a single stream that passes data through multiple underlying streams.

import ObjectStream, { EnteredArgs, EndedArgs } from "o-stream"

// Just appends some string to the input string.
function createAppendStream(append: string): NodeJS.ReadWriteStream {
    return ObjectStream.transform<string, string>({
        onEntered: args => { args.output.push(args.object + append); }
    });
}

let combined = new CombinedStream([
    createAppendStream("b"),
    createAppendStream("c"),
    createAppendStream("d"),
    createAppendStream("e")
]);

combined.write("a");

let actual = combined.read();

expect(actual).to.equal("abcde");
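
To show what a combined stream saves you, here is a sketch of the same chain built with plain pipe() calls on Node's built-in Transform. With pipes you have to keep two references, the head of the chain to write to and the tail to read from, whereas the combined stream above exposes both ends as one object. createPlainAppendStream, head, tail and chained are illustrative names only.

```typescript
import { Transform, TransformCallback } from "stream";

// Same idea as createAppendStream above, using Node's built-in Transform.
function createPlainAppendStream(append: string): Transform {
    return new Transform({
        objectMode: true, // keep strings as strings instead of Buffers
        transform(chunk: string, _encoding: BufferEncoding, callback: TransformCallback) {
            callback(null, chunk + append);
        }
    });
}

const head = createPlainAppendStream("b");
const tail = head
    .pipe(createPlainAppendStream("c"))
    .pipe(createPlainAppendStream("d"))
    .pipe(createPlainAppendStream("e"));

// Piped data arrives asynchronously, so wait for the first "data" event.
const chained = new Promise<string>(resolve => tail.once("data", resolve));

head.write("a");

// chained resolves to "abcde".
```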

Error handling

Plain Node streams do not propagate errors: pipe() does not forward an error from the source stream to the destination stream.
ObjectStreams (streams created with this library) propagate errors by default.
This means that errors emitted by a source stream are re-emitted by the current stream.
You can override the default error handling:

import ObjectStream, { EnteredArgs } from "o-stream"

let myStream = ObjectStream.transform<number, number>({
    onSourceStreamError: args => args.emitError(args.error + "b")
});
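
The difference from plain Node streams can be seen in a small sketch that uses only Node's stream module. The first part shows that an error emitted on a piped source never reaches the destination; the second part forwards it by hand, which is roughly what "propagating" means here. The errors are emitted manually for illustration, and all variable names are made up for this sketch.

```typescript
import { PassThrough } from "stream";

const source = new PassThrough({ objectMode: true });
const target = new PassThrough({ objectMode: true });
source.pipe(target);

const seenBySource: string[] = [];
const seenByTarget: string[] = [];
source.on("error", (err: Error) => seenBySource.push(err.message));
target.on("error", (err: Error) => seenByTarget.push(err.message));

// 1. Plain pipe: the error stays on the source stream.
source.emit("error", new Error("first"));
// seenBySource: ["first"], seenByTarget: []

// 2. Hand-written forwarding: re-emit source errors on the target.
source.on("error", (err: Error) => target.emit("error", err));
source.emit("error", new Error("second"));
// seenBySource: ["first", "second"], seenByTarget: ["second"]
```

Without the forwarding listener, every stream in a pipe chain needs its own error handler, which is the situation the default propagation is meant to avoid.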

Require Usage

let ostream = require('o-stream').default;