
Common operations for piped NodeJS streams. INCEPTIVE PACKAGE IN ALPHA (use Pipedreams v0.1.x instead)

Stream and Transform Construction



D2.create_throughstream is a copy of event-stream's through() method (which in turn is implemented with through), with one difference: the write() method of the returned stream works asynchronously when it is passed some data and a callback.

Here's an example of using an asynchronous write in concert with ES6 generators / yield, taking advantage of the simplified handling offered by coffeenode-suspend:

suspend = require 'coffeenode-suspend'
step    = suspend.step

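f = ->
  step ( resume ) =>
    # Keep a reference to the head of the pipeline; a `.pipe` chain evaluates to its last stream:
    input = D.create_throughstream()
    input
      .pipe HOLLERITH.$write db
      .pipe D.$on_end =>
        urge "test data written"
    for idx in [ 0 .. 100 ]
      probe = "entry-#{idx}"
      # Suspend until this write has been processed, then go on with the next entry:
      yield input.write probe, resume
    # Signal that no more data will follow, so that the `$on_end` handler gets called:
    input.end()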

The general advantage of asynchronous writes is that the JavaScript event loop gets an opportunity to process steps further down the line; in this example, you could imagine millions of records being sent into the pipeline. Without asynchronicity, that data would have to be buffered somewhere before end is called on the input stream. With asynchronicity, the processing steps are called after each single item.

Error Handling

Handling errors that occur in NodeJS streams can be tough. The best solution known to me is to use domains. Here's an example from the Hollerith tests:

@[ "invalid key not accepted (2)" ] = ( T, done ) ->
  domain  = ( require 'domain' ).create()
  domain.on 'error', ( error ) ->
    T.eq error[ 'message' ], "invalid SPO key, must be of length 3: [ 'foo' ]"
    # The expected error has arrived, so the test is complete:
    done()
  domain.run ->
    input   = D.create_throughstream()
    input.pipe HOLLERITH.$write db
    input.write [ 'foo', ]

To simplify the above, you may want to use PipeDreams' run method:

@run = ( method, handler ) ->
  domain  = ( require 'domain' ).create()
  domain.on 'error', ( error ) -> handler error
  domain.run -> method()
  return domain

run expects a method to execute and a handler that will be called if an error occurs while the method runs. Another example from the Hollerith tests:

@[ "catching errors (3)" ] = ( T, done ) ->
  D.run ->
    # Keep a reference to the head of the pipeline; a `.pipe` chain evaluates to its last stream:
    input   = D.create_throughstream()
    input
      .pipe HOLLERITH.$write db
      .pipe D.$on_end -> setImmediate done
    input.write [ 'foo', 'bar', 'baz', 'gnu', ]
  , ( error ) ->
    T.eq error[ 'message' ], "invalid SPO key, must be of length 3: [ 'foo', 'bar', 'baz', 'gnu' ]"


  • Nothing keeps you from calling run with arbitrary, non-streamy code as it is a fully generic method.

  • As for the style of the above example, one could frown upon the use of two consecutive anonymous functions; then again, it really looks like

    catch error

'Retroactive' Sub-Streams: $sub()

The PipeDreams $sub method lets you formulate pipes which 'talk back', as it were, to upstream transformers. This can be handy when a given transformer performs single steps of an iterative optimization process; with $sub, it becomes possible to re-submit a less-than-perfect value from a downstream transformer. Let's have a look at a simple example: we start with a stream of numbers, and our goal is to 'reduce' each number to a value that differs from 1 by no more than a given quality margin epsilon. We implement that with an 'optimizer' transform which takes the square root of each number and passes it on. The result will always be closer to 1 than the input (if the input was > 0), but not necessarily good enough. We check for that in the next step: if the received number differs from 1 by more than epsilon allows, it is re-written into the source stream to be 'optimized' again; otherwise, it is sent on as usual:

sub_demo = ->
  input   = D2.create_throughstream()
  epsilon = 0.1
  input
    # .pipe this # imagine any number of piping steps here
    # .pipe that
    # Let's start the substream:
    .pipe D2.$sub ( source, sink ) ->
      source
        # Optimizer: take the square root of each number:
        .pipe $ ( n, send ) ->
          send Math.sqrt n
        # Quality Control: if the distance to the target value 1 is too great, re-insert into the
        # sub-stream source; if OK, then pass on downstream:
        .pipe $ ( n, send ) ->
          whisper n
          if ( Math.abs n - 1 ) > epsilon then  source.write n # value sent 'up'
          else                                  send n         # value sent 'down'
        # Don't forget to pipe to the sink:
        .pipe sink
    # The substream is finished here, let's show the results:
    .pipe D2.$show()
  for n in [ 3, 4, 1e6, 0.1, ]
    input.write n

And here is the output of the above:

*  ▶  1.0710754830729146
*  ▶  1.0905077326652577
*  ▶  1.055449600878603
*  ▶  0.930572040929699
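
The feedback loop of sub_demo can also be traced without any streams. The following is a minimal sketch in plain JavaScript rather than PipeDreams code; the function name refine and the pass counter are assumptions made for illustration only. It repeats the 'optimizer' step until the 'quality control' step is satisfied, which is the effect that re-writing a value into source has:

```javascript
// Stream-free sketch of the optimizer / quality-control loop from sub_demo.
// `refine` is a hypothetical helper, not part of PipeDreams.
function refine(n, epsilon) {
  if (!(n > 0)) throw new RangeError('expected a number > 0'); // 0 would never converge
  let value = n;
  let passes = 0;
  do {
    value = Math.sqrt(value); // optimizer step
    passes += 1;
  } while (Math.abs(value - 1) > epsilon); // quality control: send the value 'up' again
  return { value, passes };
}

for (const n of [3, 4, 1e6, 0.1]) {
  const { value, passes } = refine(n, 0.1);
  console.log(`${n} -> ${value} after ${passes} passes`);
}
```

With epsilon = 0.1, the four inputs need 4, 4, 8 and 5 passes through the optimizer, respectively, before they are allowed to travel downstream.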

Important Caveat: Because sub-streams are intended for cases where values need to be re-sent to upstream consumers, the source stream will not receive an end event, as that would preclude re-sending of data for the last element in the stream (you can't write to a stream that has ended). Instead, a special attribute, source.ended, is set to true when the last data item comes down the stream. Once you know that you are through with sending more data, you can check for that condition and say, for example, if source.ended then source.end().

Be aware that some stream transformers (e.g. transformers that sort the entire stream) rely on the end event in the stream to be issued; such transformers must not be in the stream above the point where you explicitly call source.end().


$link accepts any number of stream transforms, either as single arguments or as list arguments; it returns a stream transform that represents the pipeline of the individual transforms. When called with no arguments or an empty list, it simply returns $create_throughstream() (i.e. a neutral stream transform). $link makes it possible to parametrize pipelines; as a side effect, it lets you omit the ever-repeating .pipe from source code. For example, instead of writing

input
  .pipe @_$break_lines()
  .pipe @_$disperse_texts()
  .pipe assemble_buffer()
  .pipe build_line()
  .pipe test test_line
  .pipe D.$collect ( collector ) -> urge collector
  .pipe D.$count ( count ) -> urge count
  .pipe D.$show()

you can now write

input.pipe D.$link [
  @_$break_lines()
  @_$disperse_texts()
  assemble_buffer()
  build_line()
  test test_line
  D.$collect ( collector ) -> urge collector
  D.$count ( count ) -> urge count
  D.$show()
]
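
To make the argument handling of $link more tangible, here is a stream-free sketch in plain JavaScript. It is not how PipeDreams implements $link: each 'transform' is modelled as a generator function that maps one iterable to another, and the names link, double and onlyEven are made up for this illustration. What it does show is the flattening of single and list arguments and the neutral result for an empty argument list:

```javascript
// Stream-free model: a 'transform' is a function from an iterable to an iterable.
// `link`, `double` and `onlyEven` are hypothetical names, not PipeDreams API.
function link(...args) {
  // Accept transforms as single arguments, as lists, or as a mix of both.
  const transforms = args.flat(Infinity);
  // With no transforms, reduce() hands back `items` unchanged (the neutral transform).
  return (items) => transforms.reduce((stream, transform) => transform(stream), items);
}

function* double(items) {
  for (const item of items) yield item * 2;
}

function* onlyEven(items) {
  for (const item of items) if (item % 2 === 0) yield item;
}

const pipeline = link(onlyEven, [double, double]);
console.log([...pipeline([1, 2, 3, 4])]); // [ 8, 16 ]
console.log([...link()([1, 2, 3])]);      // [ 1, 2, 3 ]
```

Because link returns something of the same shape as its inputs, a linked pipeline can itself be passed to another link call, which is what makes pipelines parametrizable.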



$aggregate = ( aggregator, on_end = null ) ->

A generic aggregator. This is how it is used in the PipeDreams source itself:

@$count = ( on_end = null ) ->
  count = 0
  return @$aggregate ( -> count += +1 ), on_end
@$collect = ( on_end = null ) ->
  collector = []
  aggregator = ( data ) ->
    collector.push data
    return collector
  return @$aggregate aggregator, on_end

To use $aggregate, you have to pass in an aggregator function and an optional on_end handler. The aggregator will be called once for each piece of data that comes down the stream and should return the current state of the aggregation (i.e. the intermediate result).

Aggregators display one of two behavioral patterns depending on whether on_end has been given. In case on_end has been given, each piece of data that arrives in the aggregator will be passed through the pipe and on_end will be called once with the last intermediate result. If on_end has not been given, the individual data events will not be passed on; instead, when the stream has ended, the aggregation result will be sent downstream.
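
The two patterns can be spelled out with a small stream-free model in plain JavaScript. This is only a sketch of the behavior described above, not the PipeDreams implementation; aggregate and makeCounter are hypothetical helpers, and the returned array stands in for whatever a downstream consumer would receive:

```javascript
// Stream-free model of the two $aggregate behaviors; `aggregate` is a
// hypothetical helper that returns the items a downstream consumer would see.
function aggregate(items, aggregator, onEnd = null) {
  const downstream = [];
  let state;
  for (const item of items) {
    state = aggregator(item);                  // intermediate result after this item
    if (onEnd !== null) downstream.push(item); // with on_end: data passes through
  }
  if (onEnd !== null) onEnd(state);            // with on_end: handler gets the last result
  else downstream.push(state);                 // without on_end: only the result is sent on
  return downstream;
}

// A counting aggregator in the spirit of $count.
function makeCounter() {
  let count = 0;
  return () => (count += 1);
}

let reported = null;
const passedThrough = aggregate(['a', 'b', 'c'], makeCounter(), (n) => { reported = n; });
const resultOnly = aggregate(['a', 'b', 'c'], makeCounter());
console.log(passedThrough, reported); // [ 'a', 'b', 'c' ] 3
console.log(resultOnly);              // [ 3 ]
```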

$collect(), $count()

Two standard aggregators; $collect() collects all data items into a list, and $count() counts how many data items have been encountered in a stream.





D.new_hyphenate = ( hyphenation = null, min_length = 2 ) ->

'Dense' Sorting



new_densort = ( key = 1, first_idx = 0, report_handler = null ) ->

The motivation for this function is the observation that in order to sort a stream of elements, it is in the general case necessary to buffer all elements before they can be sorted and sent on. This is because in the general case it is unknown prior to stream completion whether or not yet another element that will fit into any given position is pending; for example, if you queried a database for a list of words to be sorted alphabetically, it is, generally, not possible to decide whether between any two words—say, 'train' and 'trainspotter'—a third word is due, say, 'trains'. This is because the sorting criterion (i.e. the sequence of letters of each word) is 'sparse'.

I love trains, but I don't like the fact that I will always have to back up potentially large streams in memory before I can go on with processing.

Fortunately, there is an important class of cases that provide a 'dense' sorting criterion coupled with moderate disorder among the elements: Consider a stream that originates from a database query similar to SELECT INDEX(), word FROM words ORDER BY word ASC (where INDEX() is a function to add a zero-based row index to each record in the result set); we want to send each record to a consumer over a network connection*, one record at a time. We can then be reasonably sure that the order of items arriving at the consumer is somewhat correlated to their original order; at the same time, we may be justified in suspecting that some items might have swapped places. In other words, the INDEX() fields of the arriving records will form a sequence that is very similar, but not necessarily identical, to a monotonically growing series.

*) In fact, network connections—e.g. those using WebSockets—may indeed be order-preserving, but it's easy to imagine a transport protocol (like UDP) that isn't, or a result set that is assembled from asynchronous calls to a database with each call originating from one piece of data in the stream. There may also be cases where a proof of sequentiality is not obvious, and it would be nice to have a guaranteed ordering without incurring too much of an overhead in time and space.

This is where densort comes in: it assumes that records are offered in a 'dense' fashion, with some field of each record containing an integer index i; the indexes form a finite series with a definite lower bound i0 and a certain number of elements n, such that the index of the last element is i1 = n + i0 - 1 and each index i in the range i0 <= i <= i1 is associated with exactly one record.
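
Under these assumptions, a sorter only has to hold back the records that arrive too early. The following plain JavaScript sketch illustrates the idea; it is not the PipeDreams implementation. It assumes that records are arrays, that key is the position of the integer index within each record and that firstIdx is the lower bound i0; the report_handler argument is left out, and newDensort is a made-up name:

```javascript
// Sketch of a 'dense' sorter. Assumptions (not confirmed by the text above):
// records are arrays, `key` is the position of the integer index within each
// record, and `firstIdx` is the lower bound i0.
function newDensort(key = 1, firstIdx = 0) {
  const pending = new Map(); // records that arrived too early, by index
  let nextIdx = firstIdx;    // smallest index that has not been sent on yet
  return (record) => {
    const released = [];
    pending.set(record[key], record);
    // Release the longest gap-free run of records that starts at nextIdx.
    while (pending.has(nextIdx)) {
      released.push(pending.get(nextIdx));
      pending.delete(nextIdx);
      nextIdx += 1;
    }
    return released;
  };
}

const densort = newDensort();
const arrivals = [['a', 0], ['c', 2], ['b', 1], ['e', 4], ['d', 3]];
const sorted = arrivals.flatMap((record) => densort(record));
console.log(sorted.map(([word]) => word).join(' ')); // a b c d e
```

In this run the buffer never holds more than two records at a time; with moderate disorder, memory use depends on how far items stray from their place rather than on the length of the stream.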










