nxus-worker-queue

Nxus worker queue for background tasks

Usage no npm install needed!

<script type="module">
  import nxusWorkerQueue from 'https://cdn.skypack.dev/nxus-worker-queue';
</script>

README

nxus-worker-queue

Table of Contents

Worker Queue Module

Build Status

Using Redis for pub/sub background tasks

Installation

    > npm install nxus-worker-queue --save

Configuration Options

    "worker_queue": {
      "redis_url": "redis://localhost:6379",
      "cleanInterval": 3600000
    }

It's conventional to use a configuration variable to set the Redis URL in the production environment. For example:

    let config = {}
    if (process.env.REDIS_URL)
      config.worker_queue = { redis_url: process.env.REDIS_URL }
    application.start(config)

Usage

For each task, you need to define a unique task name.

Register a worker handler

    import {workerQueue} from 'nxus-worker-queue'
    workerQueue.worker('myBackgroundTask', ({data}) => {
      this.log.debug("Hello", data.hi)
    })

Request task processing

    import {workerQueue} from 'nxus-worker-queue'
    let job = workerQueue.task('myBackgroundTask', {hi: world})

The job object and notification of completed tasks

The worker queue module interacts with Redis through the intermediary Bull package. This "fastest, most reliable, Redis-based queue for Node" is "carefully written for rock solid stability and atomicity". For documentation, a good place to start is the Reference page.

The task() method returns a Bull Job object that allows you to interact with the background task.

In particular, the Job object exposes a finished() method that, when invoked, returns a promise that resolves when the job finishes. The value of the promise corresponds to the value of the promise returned by the task handler.

    let job = workerQueue.task('myBackgroundTask', {hi: world})
    job.finished().then((rslt) = { console.log('background task finished: ', rslt) })

API


WorkerQueue

Extends NxusModule

Worker Queue module for background tasks

worker

Provide a task handler

Parameters

  • taskName string Name of the task (channel) to listen for
  • handler function Handler for processing task requests; should return a promise that resolves on completion
  • opts (optional, default {})

Examples

workerQueue.worker('backgroundJob', (msg) -> {})

task

Request handling of a background task

Parameters

  • taskName string Name of the task (channel) to publish to
  • message object Options for the task worker; must be JSON serializable
  • opts (optional, default {})

Examples

workerQueue.task('backgroundJob', {hi: 'world'})

Returns object Bull job object

clean

Cleans the current queue for the given taskName.

Parameters

  • taskName string The queue/task name to clean.
  • type String The type of message to clean. Defaults to 'completed'. (optional, default 'completed')
  • delay Number The grace period. Messages older than this will be cleaned. Defaults to 1 hour. (optional, default 3600000)

cleanAll

Cleans all queues for the specified message type.

Parameters

  • type String The type of message to clean. Defaults to 'completed'. (optional, default 'completed')
  • delay Number The grace period. Messages older than this will be cleaned. Defaults to 1 hour. (optional, default 3600000)

empty

Emptys the current queue for the given taskName.

Parameters

  • taskName string The name of the queue to empty. If not provided, all queues are emptied.

emptyAll

Emptys the all queues.

Parameters

  • taskName string The name of the queue to empty. If not provided, all queues are emptied.