anytv-kue

kue wrapper for any.tv

Usage: no npm install needed!

<script type="module">
  import anytvKue from 'https://cdn.skypack.dev/anytv-kue';
</script>

README

anytv-kue

Kue Helper for setup and cleanup of Kue

Changes to be made to old codes

  1. Replace all require('kue'); with require('anytv-kue')();

  2. NOTE: Define a namespace (prefix) when calling createQueue to ensure proper scheduling across multiple applications. Example:

        const kue = require('anytv-kue')();
        const queue = kue.createQueue({
            prefix: 'namespace'
        });
    

Added features

  • Setup kue

    // server.js (before)
    const kue = require('anytv-kue')();
    const queue = kue.createQueue();
    
    function start () {
        /* server code */
    }
    
    queue.on('error', err => {
        winston.log('error', 'QUEUE ERROR:', err);
    });
    
    process.once('SIGTERM', sig => {
        winston.log('SIGTERM', sig);
        queue.shutdown(5000, err => {
            winston.log('error', 'Kue shutdown:', err );
            process.exit(0);
        });
    });
    
    queue.active((err, ids) => {
        ids.forEach(id => {
            kue.Job.get(id, (_err, job) => {
                job.inactive();
            });
        });
    });
    
    queue.inactive((err, ids) => {
        ids.forEach(id => {
            kue.Job.get(id, (_err, job) => {
                job.inactive();
            });
        });
    });
    
    start();
    
    
    // server.js (now)
    const kue = require('anytv-kue')();
    const queue = kue.createQueue();
    
    function start () {
        /* server code */
    }
    
    kue.setup(queue);
    
    start();
    
    
  • Activate UI

        const kue = require('anytv-kue')();
        const queue = kue.createQueue({remove_on_complete:false});
        const express = require('express');
        const app = express();
    
        //activates UI without auth in `/kue`
        kue.activateUI(app)();
        //activates UI in route `/kueapp` without auth
        kue.activateUI(app)('/kueapp');
        //activates UI with basic auth in `/kue`
        kue.activateUI(app, 'username', 'password')();
        //activates UI with basic auth in `/kueapp`
        kue.activateUI(app, 'username', 'password')('/kueapp');
        //activates UI with custom middleware in `/kue`
        kue.activateUI(app, middleWare)();
        //activates UI with custom middleware in `/kueapp`
        kue.activateUI(app, middleWare)('/kueapp');
    
  • Default Title

        const kue = require('anytv-kue')();
        const queue = kue.createQueue();
    
        queue.create('jobtitle', { test: 123 })
            .save(); //default title will be "{ test: 123 }"
    
        queue.create('jobtitle', { test: 123, title: '123'})
            .save(); //title will be "123"
    
        queue.create('jobtitle')
            .save(); //title will be "undefined"
    
  • Cleanup jobs

        kue.cleanup(job_type, status);
    
  • Remove jobs on complete

      // before
      const kue = require('kue');
      const queue = kue.createQueue();
    
      queue.create('name', {})
        .removeOnComplete(true)
        .save();
    
      queue.create('name2', {})
        .removeOnComplete(true)
        .save();
    
      //now
      const kue = require('anytv-kue')({shutdownTimer: 10000});
      const queue = kue.createQueue({remove_on_complete: true});
    
      queue.create('name', {})
        .save();
    
      queue.create('name2', {})
        .save();
    
  • fixed_doubling and delay_doubling custom backoffs

    • fixed_doubling - starts at 2 minutes, then 4, then 8, etc
    • delay_doubling - starts at initial_delay * 2 (2 seconds if no initial delay)
        const kue = require('anytv-kue')();
        const queue = kue.createQueue();
    
        queue.createJob('jobtitle', { test: 123 })
            .backoff('fixed_doubling')
            .save(); // next delay will be 2 minutes
          
        queue.createJob('jobtitle', { test: 123 })
            .delay(1000 * 10) // 10 seconds
            .backoff('delay_doubling')
            .save(); // next delay will be 20 seconds
          
        queue.createJob('jobtitle', { test: 123 })
            .backoff('delay_doubling')
            .save(); // next delay will be 2 seconds
    

Available configurations

constructor

  • shutdownTimer - time allotted for graceful shutdown

options

setup

kue.setup(queue, callbacks);
  • queue - the queue we want to listen to (usually the return of kue.createQueue)
  • callbacks - there are 5 callbacks available
    {
      //called when there is an error in redis queue
      error: function (err) {
    
      },
    
      //called when a job failed altogether after the allotted attempts
      failed: function (jobid) {
    
      },
    
      //called when a process SIGTERM occurs
      sigterm: function (sig) {
    
      },
    
      //called upon setup, control what you want to do with active jobs
      active: function (err, ids) {
    
      },
    
      //called upon setup, control what you want to do with inactive jobs
      inactive: function (err, ids) {
    
      }
    }
    

How To's

Update Delayed jobs

    'use strict';

    const kue = require('kue');
    const queue = kue.createQueue();

    queue.delayed(function (err, selectedJobs) {
        selectedJobs.forEach(function (id) {
            kue.Job.get(id, function (err, job) {
                if (!job.removeOnComplete()) {
                    job.removeOnComplete(true).update();
                }
            });
        });
    });

Calling .update() on a Job object will update it. This is an undocumented feature of Kue.

Contributing Guidelines

Please read CONTRIBUTING.md