tream

Lightweight lazy streams in TypeScript

Usage no npm install needed!

<script type="module">
  import tream from 'https://cdn.skypack.dev/tream';
</script>

README

Lightweight lazy streams in TypeScript

License: MIT npm version npm downloads Build Status

Data types

The first abstraction of lazy streams representation called Pull. It's a function which must be called by consumer side to get a next value from the stream.

In response to get value request the producer side must send value or terminate stream. The Pull function has two arguments: push and done, which can be used in this purposes.

Note: Implementation requires that the push and done both must be deferred.

Also the consumer side can cancel getting value using function which returned from Pull.

You cannot operate with streams when pull request is started but you can either wait for push response or cancel request immediatelly by undo operation. In both cases you get the pull to make next requests.

The done response means the end of stream. The ended (or terminated) stream cannot be used to get next values.

Pull<Type>

The Pull function defined as:

interface Pull<Type> {
    (push: Push<Type>, done?: Done): Undo<Type>;
}

In common words Pull type represents source or stream.

Push<Type>

When stream as active (i.e. not ended) it can send values in response of Pull calls by calling Push function. The Push function defined as:

interface Push<Type> {
    (val: Type, pull: Pull<Type>): void;
}

It sends to consumer the value and the next Pull which can be used by consumer to get the next value and so on.

Done

In case of ended (or terminated) stream it must send end-of-stream signal by calling Done function. The Done defined as:

interface Done {
    (): void;
}

Undo<Type>

The Undo function purposed to cancel pull request and defined as:

interface Undo<Type> {
    (): Pull<Type>;
}

The returned Pull function can be used to getting the next values from stream.

None

The value none of type None is used instead of special values like null or undefined in order to represent the lack of values. This technique allows work with special values (null and undefined) in streams like with any other ordinary values without exceptions.

Maybe<Type>

The type Maybe is intended to represent values which may be lack. The Maybe type defined as:

type Maybe<Type> = Type | None;

Simple usage example:

import { Maybe, none, some } from 'tream/stream';

let str: Maybe<string> = none;

if (some(str)) {
    // do something with 'str'
    console.log(str.length);
}

Creating streams

The first thing which we would like to do in order to operate with streams is a creation of streams. Actually there is a much ways to do this.

Special streams

  • empty: The stream which has no values and ends immediately.
  • never: The stream which not sends values and never ends.
  • once(value): The stream which sends single value and ends.
  • repeat(value): The stream which sends same values and never ends.

Generators

Generator is a function which returns some value each time when it called. Also generator may return none to end the stream.

The particular case of generator is an iterator which gets an array and sends it values from first to last.

import { generate, iterate } from 'tream/stream';

const n = generate(1, s => [s * (s + 1), s]); // => 1, 2, 6, 42, 1806, ...

const v = iterate([1, 2, 3, 4, 5]); // => 1, 2, 3, 4, 5

Channels

Channel is a pair of sink and stream which co-exists independently. In other words the sink can be used to send values and the stream will get all of it which is sended through channel. So the producer side doesn't need awaiting get value requests from consumer side.

import { channel, collect } from 'tream/stream';

const [[send, end], src] = channel<number>();

send(1);
send(2);

setTimeout(() => {
  send(3);
  end();
}, 150);

collect(src)(val => {
  console.log(val); // => [1, 2, 3]
});

Forks

Same stream cannot be used multiple times directly but a Fork abstraction makes it possible. The Fork is a function which might been called to clone stream. This function is a result of applying operation fork to the source stream.

import {iterate, fork, collect} from 'tream/stream';

const src = fork(iterate([1, 2, 3]));

const a = src();
const b = src();
const c = src();

collect(a)(val => { console.log(val); }); // => [1, 2, 3]
collect(b)(val => { console.log(val); }); // => [1, 2, 3]
collect(c)(val => { console.log(val); }); // => [1, 2, 3]

Timers

TODO...

Request


Streaming algebra

To operate with streams in monadic style developed so called streaming algebra.

The streaming algebra includes the set of operations which takes a streams and creates new streams as result. Besides streams it also take functions and values which help control behavior of operations.

Map

The map operation is purposed to convert values using some function which take a value and returns new.

import {map, iterate} from 'tream/stream';

map(v => v * v,
  iterate([1, 2, 3])); // => [1, 4, 9]

map(v => `a=${v}`,
  iterate([1, 2, 3])); // => ["a=1", "a=2", "a=3"]

Filter

The operation filter was designed to filter values in stream using some function which take a value and returns boolean.

import {filter, iterate} from 'tream/stream';

filter(v => v > 0,
  iterate([-1, 0, 2, 0.1])); // => [2, 0.1]

Filter Map

The operator filter_map combines filtering and mapping.

import {filter_map, iterate, none} from 'tream/stream';

filter_map(v => v > 0 ? v * 2 : none,
  iterate([-1, 0, 2, 0.1])); // => [4, 0.2]

To remove values from stream the function must return none.

Scan

The operator scan is a form of filter_map with state.

import {scan, iterate, none} from 'tream/stream';

scan(1, (s /* previous state */, v) =>
  [s + 1 /* next state */, v > 0 ? v * s : none],
  iterate([-1, 0, 2, 0.1])); // => [6, 0.4]

Forward

import {repeat, iterate, forward} from 'tream/stream';

forward(() => repeat(4), iterate([1, 2, 3]));
// => [1, 2, 3, 4, 4, 4, 4, ...]

Then

import {repeat, iterate, taken, then} from 'tream/stream';

then(v => taken(v, repeat(`${v}`)), iterate([1, 2, 3]));
// => ["1", "2", "2", "3", "3", "3"]

Each

import {collect, id, map, taken, each} from 'tream/stream';
import {interval} from 'tream/timer';

collect(each(val => map(id(val), interval(15)),
  count(taken(4, interval(50)))));
// => [1, 1, 1, 2, 2, 2, 3, 3, 3]

Take

// TODO

Head

// TODO

Skip

// TODO

TakeN

import {iterate, taken, collect} from 'tream/stream';

collect(taken(3, iterate([1, 2, 3, 4, 5])))
  (val => { console.log(val); }); // => [1, 2, 3]

SkipN

import {iterate, skipn, collect} from 'tream/stream';

collect(skipn(2, iterate([1, 2, 3, 4, 5])))
  (val => { console.log(val); }); // => [3, 4, 5]

Fold

import {iterate, fold, collect} from 'tream/stream';

collect(fold(0, (pre, val) => pre + val,
  iterate([1, 2, 3, 4, 5])))
  (val => { console.log(val); }); // => 15

Collect

import {iterate, collect} from 'tream/stream';

collect(iterate([1, 2, 3]))
  (val => { console.log(val); }); // => [1, 2, 3]

Last

import {iterate, collect} from 'tream/stream';

last(iterate([1, 2, 3]))(val => { console.log(val); });
// => 3

Select

import {id, map, join, taken, collect} from 'tream/stream';
import {interval} from 'tream/timer';

collect(taken(5, join([
  map(id(100), interval(100)),
  map(id(30), interval(30))
])))(val => { console.log(val); });
// => [30, 30, 30, 100, 30]

Combine

import {id, map, combine, taken, collect} from 'tream/stream';
import {interval} from 'tream/timer';

collect(taken(5, combine([
  map(id(100), interval(100)),
  map(id(30), interval(30))
])))(val => { console.log(val); });
// => [[none, 30], [none, 30], [none, 30], [100, 30], [100, 30]]

Join

Combine

import {id, map, join, taken, collect} from 'tream/stream';
import {interval} from 'tream/timer';

collect(taken(5, join([
  map(id(100), interval(100)),
  map(id(30), interval(30))
])))(val => { console.log(val); });
// => [[100, 30], [100, 30]]

Chain

The operator chain concatenates streams from the first to the last. When the first stream is ended, the second begins and so on.

import {once, empty, repeat, iterate, taken, chain, collect} from 'tream/stream';

collect(chain([
    once(1),
    iterate([2, 3]),
    empty,
    taken(3, repeat(4))
]))(val => { console.log(val); }); // => [1, 2, 3, 4, 4, 4]