mongo-qd

Priority Job Queue with MongoDB. A qd implementation.

Reactive, and easily extensible.

Introduction

First, we need a Db instance from MongoDB's native driver. Let's assume we already have one, assigned to db.

Creating jobs

  var MongoQd = require('mongo-qd').MongoQd;
     
  var qd = MongoQd(db);
  
  var q = qd.queue('myFirstQueue');
   
  q.job('my first job', {'_this': 'is', my: 'payload'})
    .priority('high')
    .attempts(3)
    .on('complete', function(result){console.log('Result', result)});

Pulling Jobs

Meanwhile on some other machine...

var MongoQd = require('mongo-qd').MongoQd;

var qd = MongoQd(db);

var q = qd.queue('myFirstQueue');

q.pull(function (err, job) {
  // Surface pull errors instead of silently dropping them
  if (err) return console.error(err);
  // No job is waiting right now
  if (job == null) return;

  // Use whatever async function you want
  someAsyncTodo(job.payload, function (err, result) {
    if (err) return job.fail(err);

    job.complete(result);
  });
});

Features

  • Abstract and easily extensible with other modules
  • Delayed Jobs
  • Job event and progress pubsub
  • Optional retries with backoff

Usage

  var MongoQd = require('mongo-qd').MongoQd;

var qd = MongoQd(db, opts)

Create an instance of MongoQd.

Args:

  • db - An instance of Db() from Mongo's native driver.
  • opts - Object with options.

Options:

  • ns - (Default: 'qd'). The prefix for the collection names generated by this module.
  • separator - (Default: ':'). The separator to use for collection names.
  • priorities - (Default: the default priority map listed under newJob.priority(priority).) JSON object that defines the available priorities.
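
As a rough illustration of the options above, an opts object might look like the following. Only the keys ns, separator and priorities come from this README; the values 'jobs' and '.', and the trimmed priority map, are made up for the example.

```javascript
// Hypothetical option values; only the keys are documented above.
var opts = {
  ns: 'jobs',        // collection name prefix instead of the default 'qd'
  separator: '.',    // separator instead of the default ':'
  priorities: {      // custom priority map: name -> number
    low: 10,
    normal: 0,
    high: -10
  }
};

// With a real Db instance this would be passed as: MongoQd(db, opts)
```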

Events:

  • error - All errors are redirected to the main client.

var queue = qd.queue(name)

Get a reference to a certain queue.

Returns a Queue instance.

Args:

  • name - The name of the queue you're referencing.

Events:

  • newJob - A new job has just been created. The first arg is a reference to the new job.
  • pulledJob - A job has just been pulled. The first arg is a reference to the pulled job.

var newJob = queue.job(name, payload)

Create a new job. The job is persisted on the next tick, so you can still configure it with NewJob's methods before it is saved.

Returns a NewJob instance.

Args:

  • name - A string that represents the name of the job.
  • payload - Any structure that represents the parameters of the job.

Events:

  • complete - Fired when the job is complete. First arg is the result.
  • failed - Fired when the job fails. First arg is the reason for failure.
  • failedAttempt - Fired when the job fails, but has more attempts. First arg is the reason for failure.
  • progress - Fired when a worker reports progress. First arg is the amount of work completed, second arg is the total amount of work.
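
To see why chained calls such as .priority() and .attempts() still take effect after queue.job() returns, here is a simplified model of the deferred-persistence pattern. It is not mongo-qd's actual code: createJob, the persist callback and the shape of the job object are hypothetical, and the sketch assumes a Node.js environment where process.nextTick is available.

```javascript
// Simplified model of "persist on the next tick" (not the library's code).
// persist is a hypothetical callback standing in for the database write.
function createJob(name, payload, persist) {
  var job = { name: name, payload: payload };

  var builder = {
    priority: function (p) { job.priority = p; return builder; },
    attempts: function (n) { job.attempts = n; return builder; }
  };

  // The write is deferred, so synchronous chained calls run first.
  process.nextTick(function () { persist(job); });

  return builder;
}

// Usage: both settings are applied before persist() sees the job.
createJob('my first job', { my: 'payload' }, function (job) {
  console.log(job.priority, job.attempts); // prints: high 3
}).priority('high').attempts(3);
```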

newJob.priority(priority)

Define the priority of the job.

Returns itself.

priority is a string that is mapped to a number using a priority map.

A custom priority map can be defined with the priorities option of the MongoQd instance.

Here is the default priority map:

{
  low: 10,
  normal: 0,
  medium: -5,
  high: -10,
  critical: -15
}
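
The lookup described above can be sketched as follows. resolvePriority and sortByPriority are hypothetical helpers, not part of mongo-qd, and the ordering rule (smaller numbers are served first) is an assumption inferred from the default map, where critical has the lowest value; the README does not state it explicitly.

```javascript
// Default priority map copied from the README.
var defaultPriorities = { low: 10, normal: 0, medium: -5, high: -10, critical: -15 };

// Hypothetical helper: resolve a priority name to its number.
function resolvePriority(name, priorities) {
  var map = priorities || defaultPriorities;
  if (!Object.prototype.hasOwnProperty.call(map, name)) {
    throw new Error('Unknown priority: ' + name);
  }
  return map[name];
}

// Assumption: jobs with smaller numbers are served first.
function sortByPriority(jobs, priorities) {
  return jobs.slice().sort(function (a, b) {
    return resolvePriority(a.priority, priorities) -
           resolvePriority(b.priority, priorities);
  });
}

var ordered = sortByPriority([
  { name: 'cleanup', priority: 'low' },
  { name: 'alert', priority: 'critical' },
  { name: 'email', priority: 'normal' }
]);
console.log(ordered.map(function (j) { return j.name; }).join(', '));
// prints: alert, email, cleanup
```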

newJob.delay(delay)

Delay the processing of the job.

Returns itself.

delay is defined in ms.

newJob.attempts(attempts)

Define how many times this job can be restarted after a failure.

Returns itself.

newJob.backoff(backoff)

Back off for a while before retrying if the job fails.

Returns itself.

backoff can be one of the following values:

  • true - The job is rescheduled after waiting delay ms, where delay is the value that was set using NewJob#delay.
  • { type: 'fixed', delay: X } - The job is rescheduled after X ms.
  • { type: 'exponential', delay: X } - The delay between attempts grows exponentially the more the job keeps failing, with X as the base of the exponential delay. X is optional: if you set NewJob#delay, you may omit X.
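
A minimal sketch of how the three backoff values could translate into a retry delay. backoffDelay is a hypothetical helper, and the growth formula (base multiplied by 2 for each additional failure) is an assumption: the README only says the delay grows exponentially and does not give the formula. Falling back to the job's own delay when a 'fixed' backoff omits delay is also an assumption.

```javascript
// Hypothetical helper, not part of mongo-qd.
// backoff:        the value passed to NewJob#backoff
// jobDelay:       the value passed to NewJob#delay (ms)
// failedAttempts: how many times the job has failed so far (1, 2, 3, ...)
function backoffDelay(backoff, jobDelay, failedAttempts) {
  if (!backoff) return 0;                  // no backoff configured
  if (backoff === true) return jobDelay;   // reuse the job's own delay

  // Fall back to the job's delay when X is omitted (assumption).
  var base = backoff.delay != null ? backoff.delay : jobDelay;

  if (backoff.type === 'fixed') return base;
  if (backoff.type === 'exponential') {
    // Assumed formula: base, 2 * base, 4 * base, ...
    return base * Math.pow(2, failedAttempts - 1);
  }
  throw new Error('Unknown backoff type: ' + backoff.type);
}

console.log(backoffDelay({ type: 'exponential', delay: 100 }, 0, 3)); // prints: 400
```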

queue.pull(function (err, pulledJob) {...})

Pull a job from the queue.

pulledJob is an instance of PulledJob.

Note: If there are no waiting jobs to pull, pulledJob is null. The pull function could easily be made to wait until a job becomes available, because a Pub/Sub notification for new jobs already exists. However, the decision was to keep this module as simple as possible. Such functionality can be added by an external module.
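
One way such an extension might look is a small polling wrapper around queue.pull. This is only a sketch: pullWhenAvailable and intervalMs are hypothetical names, and it uses plain polling with setTimeout rather than the Pub/Sub notification mentioned above. It relies only on the documented pull(callback) signature.

```javascript
// Hypothetical wrapper: keeps calling queue.pull until a job is returned.
function pullWhenAvailable(queue, intervalMs, callback) {
  queue.pull(function (err, job) {
    if (err) return callback(err);

    if (job == null) {
      // Nothing is waiting: try again after intervalMs.
      return setTimeout(function () {
        pullWhenAvailable(queue, intervalMs, callback);
      }, intervalMs);
    }

    callback(null, job);
  });
}
```

With a real queue this could be called as pullWhenAvailable(q, 1000, function (err, job) {...}), where the callback fires only once a job has actually been pulled.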

pulledJob.complete(result)

Finish processing a job, and mark it as complete.

result is optional and can be any structure.

pulledJob.fail(error)

Mark the pulled job as failed. If it has attempts left, it is marked as failedAttempt instead.

pulledJob.progress(completed, total)

Report progress on the job.

  • completed - Some number that indicates how many units have been completed.
  • total - The total amount of units to be completed.
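
As an illustration of how a worker might combine progress, complete and fail, here is a small sketch. processItems and handleItem are hypothetical names; the job argument is assumed to expose only the three PulledJob methods documented above, so the example can be exercised with a stub object.

```javascript
// Hypothetical worker helper: handles items one by one and reports progress.
function processItems(job, items, handleItem) {
  var results = [];

  for (var i = 0; i < items.length; i++) {
    try {
      results.push(handleItem(items[i]));
    } catch (err) {
      // Stop at the first failing item and mark the job as failed.
      return job.fail(err);
    }
    // completed = items handled so far, total = all items
    job.progress(i + 1, items.length);
  }

  job.complete(results);
}

// Usage with a stub standing in for a PulledJob:
var stubJob = {
  progress: function (completed, total) { console.log(completed + '/' + total); },
  complete: function (result) { console.log('done', result.length); },
  fail: function (err) { console.log('failed', err.message); }
};
processItems(stubJob, ['a', 'b'], function (s) { return s.toUpperCase(); });
```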

Install

With npm do:

npm install mongo-qd

License

MIT