Manage complex application logic using in-memory pipelines.

Composed of three objects:

  1. Pipeline: define a series of functions in order to accomplish a job
  2. Runner: run Pipeline against a series of data
  3. Scheduler: run Runner[s] with start/execution/sharing policies


You can use any of the objects independently from the others. See the examples folder for complex examples.

Complete example

let PipeRunner = require('piperunner')
let scheduler = new PipeRunner.Scheduler()

*	Define a scheduler pipeline named
*	'processing_video', and a pipeline
*	*step*.
.step('first step', (pipeline, job) => {
    console.log('acquiring video ->', job)

*	Add another step to the pipeline
.step('second step', (pipeline, job) => {
    console.log('> publish video ->', job, pipe.env.videoFrameRate)

*	Configure the scheduler in order
*	to run the pipeline at the *start.processing.video*
*	event and every 1000 milliseconds
    name: 'processing_video',
    run: {
        onEvent: 'start.processing.video',
        everyMs: 1000

*	Assign data to the pipelines runner (internal
*	to the scheduler)
    name: 'processing_video', 
    data: [{name: 'video1'}, {name: 'video2'}, {name: 'video3'}] 


/** Output
* Emitting start.processing.video
* Running pipeline processing_video with 1 running process
* acquiring video -> { name: 'video1' }
* > publish video -> { name: 'video1' }
* acquiring video -> { name: 'video2' }
* > publish video -> { name: 'video2' }
* acquiring video -> { name: 'video3' }
* > publish video -> { name: 'video3' }

Scheduler example options

    name: 'processing_video',
    run: {
        // will run when this event is emitted
        onEvent: 'start.processing.video',
        // will run when these events are emitted 
        onEvents: ['event1', 'event2'],
        // run repeatedly at this interval, in milliseconds
        everyMs: 1000
    on: {
        end: {
            // emit this series of event when the pipeline ends
            emit: ['end.event'],
            // exec these functions when the pipeline ends
            exec: [
                // Pass to another pipeline the data
                (scheduler, pipeline) => { 
                    scheduler.assignData('processing_audio', 'audio_frames', pipeline.data().processedAudio)

How To

Stop a pipeline

scheduler.stop({name: 'processing_video'})

Override the internal EventEmitter

scheduler.emitter(new EventEmitter)

End a job before reaching the pipe end

let pipe = scheduler.pipeline('processing_video')

pipe.step('maybe', (pipe, job) => {
    if (job.name == 'api1-step1') {
        pipe.end() // Ending 
    } else {

End the entire runner early (the remaining jobs are not processed)

let pipe = scheduler.pipeline('processing_video')

pipe.step('maybe', (pipe, job) => {
    if (job.name == 'api1-step1') {
        pipe.endRunner() // Exit the runner 
    } else {

Pass data between pipe functions

let pipe = scheduler.pipeline('processing_video')

pipe.step('compute data', (pipe, job) => {
    pipe.next('Pass data')

pipe.step('need data', (pipe, job, incomingData) => {
    pipe.next('Received data ->', incomingData)

Store internal data

let pipe = scheduler.pipeline('processing_video')

pipe.step('compute data', (pipe, job) => {
    pipe.data['customdata'] = 'Yeah'

pipe.step('need data', (pipe, job,) => {
    pipe.next('Received data ->', pipe.data['customdata'])

Set the end callback

let runner = new Runner(jobs, pipe, () => {
    console.log('All Jobs finished')