step-function-worker

Easy AWS step function activity worker in node.js



Easily create a Node.js AWS Step Functions activity worker/pooler :-)

Install

npm install step-function-worker

Example usage

Basic example

const StepFunctionWorker = require('step-function-worker');

// fn receives the task input, a Node-style callback and a heartbeat function
const fn = function(input, cb, heartbeat){
  // do something (doSomething is a placeholder for your own logic)
  doSomething(input)

  // call heartbeat to avoid timeout
  heartbeat()

  // call callback in the end
  cb(null, {"foo" : "bar"}); // output must be compatible with JSON.stringify
};

const worker = new StepFunctionWorker({
  activityArn : '<activity-ARN>',
  workerName : 'workerName',
  fn : fn,
  taskConcurrency : 22, // default is null = Infinity
  poolConcurrency : 2 // default is 1
});
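
If the work is promise-based, a small adapter can translate it into the (input, cb, heartbeat) signature shown above. The sketch below is only an illustration and is not part of step-function-worker: fromAsync and its 30-second default interval are hypothetical, and it assumes that calling cb with an error as first argument reports the task as failed, as the Node-style callback suggests.

```javascript
// Hypothetical adapter, not part of step-function-worker.
// Wraps a promise-returning function into the (input, cb, heartbeat) shape
// and sends a heartbeat at a fixed interval while the promise is pending.
function fromAsync(asyncFn, heartbeatEveryMs = 30000) {
  return function (input, cb, heartbeat) {
    const timer = setInterval(heartbeat, heartbeatEveryMs);

    Promise.resolve()
      .then(() => asyncFn(input))
      .then(
        output => {
          clearInterval(timer);
          cb(null, output); // output must be compatible with JSON.stringify
        },
        err => {
          clearInterval(timer);
          cb(err); // assumption: an error as first argument fails the task
        }
      );
  };
}
```

The result could then be passed as the fn option, for example fn : fromAsync(async input => ({ received : input })).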

Concurrency management

Since version 3.0, the concurrency option has been replaced by poolConcurrency and taskConcurrency.

For more information, see https://github.com/piercus/step-function-worker/issues/16#issuecomment-486971866

  • poolConcurrency is the maximum number of parallel getActivityTask HTTP requests (see getActivityTask in the AWS SDK) (default: 1). Increase it to make the worker more responsive; decrease it to use fewer HTTP connections.

  • taskConcurrency is the maximum number of tasks the worker processes in parallel (default: equal to poolConcurrency); null means no limit.

In any case, you should always keep poolConcurrency <= taskConcurrency.
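
The rule above can be checked before the worker is created. This is a minimal sketch under stated assumptions: checkConcurrency is a hypothetical helper, not part of step-function-worker, and it expects both values to be given explicitly, with null for taskConcurrency meaning no limit.

```javascript
// Hypothetical helper, not part of step-function-worker.
// Checks the rule poolConcurrency <= taskConcurrency before building a worker.
function checkConcurrency(poolConcurrency, taskConcurrency) {
  // null means "no limit" for taskConcurrency
  const maxTasks = taskConcurrency === null ? Infinity : taskConcurrency;

  if (!Number.isInteger(poolConcurrency) || poolConcurrency < 1) {
    throw new RangeError('poolConcurrency must be a positive integer');
  }
  if (maxTasks !== Infinity && (!Number.isInteger(maxTasks) || maxTasks < 1)) {
    throw new RangeError('taskConcurrency must be a positive integer or null');
  }
  if (poolConcurrency > maxTasks) {
    throw new RangeError('poolConcurrency must be <= taskConcurrency');
  }
  return { poolConcurrency, taskConcurrency };
}

// Merge the checked values into the options passed to the worker
const options = Object.assign(
  { activityArn: '<activity-ARN>', workerName: 'workerName' },
  checkConcurrency(2, 22)
);
```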

Set the Region

This package is built on top of aws-sdk, so by default you set your AWS Region through the AWS_REGION environment variable.

To set it directly in JS code instead, use awsConfig (see https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Config.html for all available options), for example:

const worker = new StepFunctionWorker({
  activityArn : '<activity-ARN>',
  workerName : 'workerName',
  fn : fn,
  awsConfig: {
    region: '<your-region>'
  }
});

Close the worker

// when you are finished, close the worker with a callback
// closing may take up to 60 seconds per concurrent worker, so that all connections close smoothly without losing any task
worker.close(function(){
  process.exit();
})
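
A common place to call close is a signal handler, so that the process drains before it exits. The following is a sketch, not part of step-function-worker: installGracefulShutdown is a hypothetical helper, and the only worker method it relies on is close(callback) as shown above.

```javascript
// Hypothetical helper, not part of step-function-worker.
// Closes the worker once on SIGINT or SIGTERM, then exits.
// The exit parameter exists only so the behaviour can be tested.
function installGracefulShutdown(worker, exit = code => process.exit(code)) {
  let closing = false;

  const shutdown = function () {
    if (closing) {
      return; // a second signal must not call close() again
    }
    closing = true;
    worker.close(function () {
      exit(0);
    });
  };

  process.once('SIGINT', shutdown);
  process.once('SIGTERM', shutdown);
  return shutdown;
}
```

Because closing can take a while, the process should not be killed forcefully before the callback has run.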

Get info on current worker

// A worker has multiple poolers and multiple running tasks
// You can get information about them with
const {poolers, tasks} = worker.report();

// poolers is an array of {
//   startTime: <Date>,
//   workerName: <String>,
//   status: <String>
// }
//
// tasks is an array of {
//  taskToken: <String>,
//  input: <Object>,
//  startTime: <Date>
// }
//
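
The report can be used for simple monitoring, for example to spot tasks that have been running for a long time. This is a sketch that relies only on the tasks fields listed above (taskToken, startTime); findSlowTasks and its threshold are hypothetical and not part of step-function-worker.

```javascript
// Hypothetical helper, not part of step-function-worker.
// Returns the tasks of a report that started more than maxAgeMs ago.
function findSlowTasks(report, maxAgeMs, now = new Date()) {
  return report.tasks
    .filter(task => now - task.startTime > maxAgeMs)
    .map(task => ({
      taskToken: task.taskToken,
      ageMs: now - task.startTime // subtracting two Dates gives milliseconds
    }));
}

// Usage with a real worker would look like:
// const slow = findSlowTasks(worker.report(), 60 * 1000);
```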

Custom logging with winston

You can customize logging by using a winston logger (or winston-like logger) as input

const winston = require('winston');

const logger = winston.createLogger({
  level: 'debug',
  format: winston.format.json(),
  defaultMeta: { service: 'user-service' },
  transports: [
    //
    // - Write all logs with level `debug` and below to `combined.log`
    // - Write all logs with level `error` and below to `error.log`.
    //
    new winston.transports.File({ filename: 'error.log', level: 'error' }),
    new winston.transports.File({ filename: 'combined.log' })
  ]
});

const worker = new StepFunctionWorker({
  activityArn : '<activity-ARN>',
  workerName : 'workerName',
  fn : fn,
  logger
});

Alternatively, you can use any winston-like logger, such as console

const logger = console;

const worker = new StepFunctionWorker({
  activityArn : '<activity-ARN>',
  workerName : 'workerName',
  fn : fn,
  logger
});
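
A winston-like logger can also be a small object of your own. The sketch below assumes the worker only calls the level methods that console and winston have in common (error, warn, info, debug); that list is an assumption, not something this README states, and prefixedLogger is a hypothetical helper.

```javascript
// Hypothetical helper, not part of step-function-worker.
// Builds a logger that prepends a prefix and forwards to another logger.
// Assumption: only error, warn, info and debug are called on the logger.
function prefixedLogger(prefix, target = console) {
  const logger = {};
  for (const level of ['error', 'warn', 'info', 'debug']) {
    logger[level] = (...args) => target[level](prefix, ...args);
  }
  return logger;
}

// Example: tag every log line with the worker name
const taggedLogger = prefixedLogger('[workerName]');
```

taggedLogger could then be passed as the logger option in place of console.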

Events

// when a task starts
worker.on('task', function(task){
  // task.taskToken
  // task.input
  console.log("task ", task.input)
});

// when a task fails
worker.on('failure', function(failure){
  // failure.error
  // failure.taskToken
  console.log("Failure :",failure.error)
});

// when a heartbeat signal is sent
worker.on('heartbeat', function(beat){
  // beat.taskToken
  console.log("Heartbeat");
});

// when a task succeeds
worker.on('success', function(out){
  // out.output
  // out.taskToken
  console.log("Success :",out.output)
});

// when an error happens
worker.on('error', function(err){
  console.log("error ", err)
});

// when the worker has no more tasks to process
worker.on('empty', function(){
  console.log("empty")
});

// when the worker reaches taskConcurrency tasks
worker.on('full', function(){
  console.log("full")
});
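
These events can feed simple metrics. The sketch below counts how often each event fires; countEvents is a hypothetical helper that only relies on the on(name, listener) method used above, and a plain Node.js EventEmitter stands in for the worker so that the example runs on its own.

```javascript
const EventEmitter = require('events');

// Hypothetical helper, not part of step-function-worker.
// Attaches one counting listener per event name and returns the live counters.
function countEvents(emitter, names = ['task', 'success', 'failure', 'heartbeat', 'error']) {
  const counts = {};
  for (const name of names) {
    counts[name] = 0;
    emitter.on(name, () => {
      counts[name] += 1;
    });
  }
  return counts;
}

// Demo: a plain EventEmitter stands in for a real worker
const fakeWorker = new EventEmitter();
const counts = countEvents(fakeWorker);

fakeWorker.emit('task', { taskToken: 'token-1', input: {} });
fakeWorker.emit('heartbeat', { taskToken: 'token-1' });
fakeWorker.emit('success', { taskToken: 'token-1', output: { foo: 'bar' } });

console.log(counts);
```

With a real worker, countEvents(worker) would be called once, right after the worker is created.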

Documentation

See JSDoc in the code.