pull-stream-util

Helpers for working with pull streams

Usage no npm install needed!

<script type="module">
  import pullStreamUtil from 'https://cdn.skypack.dev/pull-stream-util';
</script>

README

pull stream util

Helpers for working with pull streams


  • cat
  • chain
  • combine latest
  • join
  • many
  • notify
  • sample
  • scan

install

npm install pull-stream-util

examples

from event

Create a stream from a single node style event

var S = require('pull-stream')
var test = require('tape')
var EE = require('events').EventEmitter
var FromEvent = require('../from-event')

test('from event', function (assert) {
    assert.plan(4)
    var bus = new EE()
    var fooStream = FromEvent('foo', bus)

    // this event will be buffered and emitted when the stream is drained
    bus.emit('foo', 'hello')

    S(
        fooStream,
        S.through(console.log.bind(console)),
        S.collect(function (err, res) {
            assert.equal(err, null)
            assert.deepEqual(res, ['hello', 'world'])
        })
    )

    bus.emit('foo', 'world')
    bus.on('removeListener', function (fn) {
        assert.pass('should remove listener on stream end')
    })

    // end the stream and remove the listener from the event emitter
    fooStream.end()

    // ------ curry -------------
    var barStream = FromEvent('bar')
    S(
        barStream(bus),
        S.through(console.log.bind(console)),
        S.drain(function (ev) {
            assert.deepEqual(ev, 'bar data')
        })
    )
    bus.emit('bar', 'bar data')
})


from emitter

Take an event emitter and return a stream of it's events

var S = require('pull-stream')
var FromEmitter = require('../from-emitter')
var EE = require('events').EventEmitter

var bus = new EE()
var streams = FromEmitter(['foo', 'bar', 'baz'], bus)

S( streams.foo(), S.log() )

bus.emit('foo', 'this is the foos')

// all events namespaced by key
S( streams(), S.drain(console.log.bind(console, 'muxed events')) )

bus.emit('foo', 'foo event')
bus.emit('bar', 'bar event')
bus.emit('baz', 'baz event')

// ---------------------------------------------
// this is the foos
// foo event
// muxed events [ 'foo', 'foo event' ]
// muxed events [ 'bar', 'bar event' ]
// muxed events [ 'baz', 'baz event' ]

mux

Take an object of streams and return a single stream of their values, namespaced by key

var S = require('pull-stream')
var mux = require('../mux')

var streams = {
    foo: S.values(['foo1', 'foo2']),
    bar: S.values(['bar1', 'bar2'])
}

S(
    mux(streams),
    S.log()
)

// ---------------------
// [ 'foo', 'foo1' ]
// [ 'bar', 'bar1' ]
// [ 'foo', 'foo2' ]
// [ 'bar', 'bar2' ]


var pair = mux.namespace('type', 'data')
console.log(pair)  // [ 'type', 'data' ]
console.log(mux.namespace.type(pair))  // 'type'
console.log(mux.namespace.data(pair))  // 'data'

broadcast

Take a source stream and pipe it to multiple subscribers


var S = require('pull-stream')
var broadcast = require('../broadcast')
var assert = require('assert')

// source *must* be async, or else it will drain before we subscribe
var source = S(
    S.values([1,2,3]),
    S.asyncMap((n, cb) => process.nextTick(cb.bind(null, null, n)))
)

// take a normal source, return a function that returns a clone of
// the source
var _source = broadcast(source)

S( _source(), S.log() )
S( _source(), S.map(n => n + 'a'), S.log(function onEnd () {
    doObject()
}) )

// -------------------------------
// 1
// 1a
// 2
// 2a
// 3
// 3a

S( _source(), S.collect(function (err, res) {
    assert(!err)
    assert.deepEqual(res, [1,2,3])
}) )


function doObject () {
    var streams = {
        a: S(
            S.values(['1aa','2aa','3aa']),
            S.asyncMap((n, cb) => process.nextTick(cb.bind(null, null, n)))
        ),
        b: S(
            S.values(['1bb','2bb','3bb']),
            S.asyncMap((n, cb) => process.nextTick(cb.bind(null, null, n)))
        )
    }

    // pass in an object of streams
    // return a function that creates an object of clones
    var createStreams = broadcast(streams)
    var _streams = createStreams()
    S( _streams.a, S.collect(function (err, res) {
        assert(!err)
        assert.deepEqual(res, [ '1aa', '2aa', '3aa' ])
    }) )
    S( _streams.b, S.collect(function (err, res) {
        assert(!err)
        assert.deepEqual(res, [ '1bb', '2bb', '3bb' ])
    }) )

    // --------------------------------
    // 1aa
    // 1bb
    // 2aa
    // 2bb
    // 3aa
    // 3bb
}

by key

Pipe an object of sources to an object of sinks

var S = require('pull-stream')
var SS = require('../by-key')

var sources = {
    foo: S.values(['foo1', 'foo2', 'foo3']),
    bar: S.values(['bar1', 'bar2', 'bar3'])
}

var sinks = {
    foo: S.log(),
    bar: S.log()
}

SS(sources, sinks)

// ------------------------
// foo1
// foo2
// foo3
// bar1
// bar2
// bar3

http

Create streams of http events from objects of request data

var S = require('pull-stream')
var createServer = require('http').createServer
var HTTPStream = require('../http')
var HTTPData = require('../http-data')
var assert = require('assert')

// xhr args
var RequestData = HTTPData({
    url: 'http://localhost:8000',
    headers: { foo: 'bar' },
    body: { test: 'test' },
    method: 'POST',
    json: true
})

// path and request body
var requestData = RequestData(['foo'], { bar: 'baz' })

var server = createServer(function onRequest (req, res) {
    assert.equal(req.headers.foo, 'bar')
    assert.equal(req.url, '/foo')

    var data = ''
    req.on('data', function (d) {
        data += d
    })
    req.on('end', function () {
        assert.deepEqual(JSON.parse(data), {
            test: 'test',
            bar: 'baz'
        })
    })

    res.end(JSON.stringify({ hello: 'world' }))
})

server.listen(8000, function () {
    S(
        HTTPStream(requestData),
        S.collect(function (err, res) {
            var resData = res.map(function (ev) {
                return {
                    type: ev.type,
                    reqBody: ev.req.body,
                    body: ev.body
                }
            })

            assert.deepEqual(resData, [
                {
                    type: 'start',
                    reqBody: { test: 'test', bar: 'baz' },
                    body: undefined
                },
                {
                    type: 'resolve',
                    reqBody: { test: 'test', bar: 'baz' },
                    body: { hello: 'world' }
                }
            ])

            console.log('http stream', err, resData)
            server.close()
        })
    )
})

http data

Helper for creating request data

var HttpRequest = require('../http-data')
var assert = require('assert')

// take this data and return a new function
var RequestData = HttpRequest({
    url: 'http://localhost:8000',
    method: 'POST',
    headers: { foo: 'bar' },
    body: { a: 'b' },
    json: true
})

// merge this argument with the request body from above, 
// return new request object
var data = RequestData({ test: 'test' })
console.log(data)
assert.deepEqual(data, {
    url: 'http://localhost:8000',
    method: 'POST',
    headers: { foo: 'bar' },
    body: { a: 'b', test: 'test' },
    json: true
})

// pass an array to add path segments to the `url` key,
// return a new function
var FooRequest = RequestData(['foo'])

// add body data, return request data
var fooData = FooRequest({ hello: 'world' })
console.log(fooData)
assert.deepEqual(fooData, {
    url: 'http://localhost:8000/foo',
    method: 'POST',
    headers: { foo: 'bar' },
    body: { a: 'b', hello: 'world' },
    json: true
})

from cb

Take an async (node style) function, and return a function that returns streams of http events, just like the http request stream in this module.

var assert = require('assert')
var fromCb = require('../http/from-cb')
var S = require('pull-stream')

function echo (data, cb) {
    process.nextTick(cb.bind(null, null, data))
}

S(
    fromCb(echo)('hello'),
    S.collect(function (err, res) {
        console.log(res)
        assert.deepEqual(res, [
            { type: 'start', req: 'hello' },
            { type: 'resolve', req: 'hello', res: 'hello', body: undefined }
        ])
    })
)