README
worky-mcworkflowface
A Javascript library for working with AWS's Simple Workflow Service.
Before using this, you should review:
Getting Started
- Register a SWF domain (if you need one).
- Write some workflows and activity tasks.
- Call
init()
with your configuration options. - Register your activity tasks and workflows.
- Start decision and activity task pollers.
Or, in JS:
const AWS = require('aws-sdk');
const { init } = require('worky-mcworkflowface');
const {
register,
startActivityTaskPoller,
startDecisionTaskPoller,
} = init({
swfClient: new AWS.SWF(),
domain: 'MyDomain',
taskList: 'MainTaskList',
workflowDefinitions: require('./workflows'),
activityTaskDefinitions: require('./activities'),
});
register().then(() => {
startActivityTaskPoller();
startDecisionTaskPoller();
});
Logging
Log output is done via debug
.
Namespaces are as follows:
swf:polling
- Related to long-polling operations (for decision and activity tasks).swf:registration
- Related to registering workflows and activities with AWS.swf:decider:<workflowId>:<runId>
- For decision task execution.swf:activity:<ActivityTaskName>:<workflowId>:<activityId>
- For activity task + execution.
You should run with your DEBUG
environment variable set to at least swf:*
so you see error messages.
Public API
Method | Description |
---|---|
init(Object) |
The main entry point. Returns a set of functions preconfigured for use in your environment. |
distillEventsIntoItems(Array) |
Takes an array of SWF events from the AWS API and distills them into easier-to-digest items. Provided for use in testing your workflows with real data. |
init(options)
This is the main API method. It accepts the following options:
Option | Type | Description |
---|---|---|
swfClient |
AWS SWF client instance to use (e.g. new AWS.SWF() ). |
|
activityTaskDefinitions |
Array |
Array of Activity Task Definitions to use. |
workflowDefinitions |
Array |
Array of Workflow Definitions to use. |
domain |
String |
SWF domain we are running in. |
identity |
String |
(Optional.) Identity of this worker passed to SWF. Defaults to os.hostname() . |
taskList |
String |
Name of the task list we are using. Any decisions made requiring a task list will use this. |
It returns an object with the following methods:
Method | Description |
---|---|
register() |
Registers your workflows and activity tasks with SWF. Returns a Promise that resolves when registration completes. |
resolveActivityTaskDefinition(name, version) |
Resolves a specific name + version combo of an activity task. |
resolveWorkflowDefinition(name, version) |
Resolves a specific name + version combo of a decision task. |
startActivityTaskPoller() |
Starts polling for and executing SWF Activity Tasks. |
startDecisionTaskPoller() |
Starts polling for and executing SWF Decision Tasks. |
Deciders
The core of each workflow is the Decider function. The decider is called repeatedly and receives a record of execution so far. Its job is to tell SWF the thing(s) to do next.
Here's an example:
function myDeciderFunction(workflowItems, availableDecisions) {
const { startTimer, completeWorkflow } = availableDecisions;
let timerItem = { started: false };
// Scan through the workflow and look for a timer
workflowItems.forEach((item) => {
if (item.type === 'timer' && item.timerId === 'testTimer') {
timerItem = item;
}
});
if (!timerItem.started || timerItem.error) {
// We have not started our timer or there was an error starting it.
// Attempt to start it.
return startTimer('testTimer', 30);
}
if (timerItem.fired) {
// Timer has fired! All done.
return completeWorkflow();
}
}
Things to note about Decider functions:
- They receive two arguments:
items
andavailableDecisions
- Can return one of the following:
- An object describing a Decision
- An array of either of the above
If your Decider throws an Error
the associated decision task will be marked as failed and retried. Returning Promises from Deciders is not supported--your decider should execute synchronously, using Activity Tasks and Child Workflows for async processing.
availableDecisions
The following decision functions are passed into the decider as the second arg:
cancelTimer(timerId)
cancelWorkflowExecution()
completeWorkflowExecution(result)
continueAsNewWorkflowExecution(input)
failWorkflowExecution(err)
requestCancelExternalWorkflowExecution(workflowId)
startActivity(name, input)
startChildWorkflowExecution(type, id, input)
startTimer(timerId, seconds)
These functions are convenient factories for the objects described in AWS's respondDecisionTaskCompleted()
docs.
WorkflowItems
SWF sends down a complete record of all events with each Decision task. This record can be difficult to parse, so we pre-process it. We merge associated events into single WorkflowItem
for ease of reference.
Supported Item Types
type |
Description |
---|---|
activity |
An attempt to execute an activity task. |
child_workflow |
Another workflow execution started via a startChildWorkflowExecution() decision. |
signal |
An external signal received by the workflow. |
timer |
An attempt to start a timer. |
workflow |
The current workflow execution. This will always be the first item. |
TODO: Support other kinds of events.
activity
, workflow
, and child_workflow
Items
Properties of Property | Type | Description |
---|---|---|
type |
String |
"activity" or "workflow" . |
name |
String |
Name of the activity or workflow. |
version |
String |
Version of the activity or workflow. |
activityId or workflowId |
String |
ID assigned to this activity or workflow execution. |
canceled |
Bool |
Whether this item's execution was canceled. |
cancelRequested |
Bool |
Whether we've requested cancellation of this item's execution. |
error |
Object |
If execution failed, this will be an object with code and message fields describing why. Otherwise it will be undefined . |
finishedAt |
Date |
Date/time execution stopped, whether due to successful completion, failure, or cancellation. |
inProgress |
Bool |
Whether execution of this item is currently happening (that is, it has not completed or been canceled). |
input |
Mixed |
Input to the activity or workflow. |
result |
Mixed |
If execution completed successfully, this will be the activity/workflow result. |
started |
Bool |
Whether execution has started. |
startedAt |
Date |
When execution started. |
success |
Bool |
Whether execution completed successfully. |
timer
Items
Properties of Property | Type | Description |
---|---|---|
type |
String |
"timer" . |
timerId |
String |
Timer id. Pass to cancelTimer() decision. |
canceled |
Bool |
Whether this timer has been canceled. |
cancelRequested |
Bool |
Whether we have requested to cancel this timer. |
error |
Object |
If there was an error starting this timer, it will be here. |
fired |
Bool |
Whether this timer has fired (without being canceled). |
firedAt |
Date |
Timestamp when this timer fired. If the timer has not fired, this will be undefined . |
inProgress |
Bool |
Whether this timer is currently running. |
started |
Bool |
Whether this timer has started ticking. |
startedAt |
Date |
When this timer started ticking. undefined if not started. |
signal
Items
Properties of Property | Type | Description |
---|---|---|
type |
String |
"signal" |
signalName |
String |
|
input |
String |
Representing Errors
Errors are represented in Workflow Items as objects with two fields: code
and message
. These fields map to the reason
and details
field used by the SWF API.
Workflow Definition
A Workflow Definition looks like this:
{
name: "MyWorkflow",
versions: {
"1.0": {
decider: function myOldDecider() { },
settings: {
/* various settings for workflow creation */
defaultChildPolicy: 'TERMINATE',
defaultTaskStartToCloseTimeout: '600',
}
},
"1.1": {
decider: function myNewDecider() { },
settings: {
/* etc. */
}
}
}
}
Things to note:
- Each version of a workflow must provide its own
decider
function andsettings
object. settings
are passed into the AWS SDK'sregisterWorkflowType()
method.- Calling the
register()
function returned byinit()
will attempt to register each workflow version.
You provide your workflow definitions as an array via the workflowDefinitions
option pass into init()
.
Activity Task Definition
An Activity Task Definition looks like this:
{
name: "MyActivity",
versions: {
"1.0": {
func: function myActivityFunction(input)
settings: {
/* settings for activity registration */
},
},
}
}
Activity Task Definitions look like Workflow Definitions
Writing Activity Functions
An Activity function:
- Receives a single argument,
input
input
can be any JSON-encodable type.
- Can return a value or the
Promise
of a value.- Return values will be automatically passed through
JSON.stringify()
. Avoid returning values that cannot be serialized to JSON.
- Return values will be automatically passed through
- Can throw
Error
s.
Returning a rejected Promise
or throwing an Error
will result in the Activity Task Execution failing.
Your decider will be passed the error details and can decide how to proceed.