@darkbyte/aqueue

an asynchronous queue for node

Usage (no npm install needed)

<script type="module">
  import darkbyteAqueue from 'https://cdn.skypack.dev/@darkbyte/aqueue';
</script>

Asynchronous locking queue for node.

Examples

Using the queue

A Queue<T> returns items of type Consumable<T>: items that can be either consumed or rejected. Calling consume() or reject() on a queue item marks it as completed, and no further action can be taken on it. A rejected item is put back at the head of the queue, ready to be extracted by a subsequent call to the queue's pop() method.

const { Queue } = require('@darkbyte/aqueue');

const queue = new Queue();

queue.pop()
    .then((consumable) => {
        console.debug(`Got item: ${consumable.getItem()}`);
        consumable.consume(); // mark item as consumed
    }); // the promise resolves later, once an item is pushed

console.debug('Waiting a bit');
console.debug('...');

// Produce a new number in 3 seconds
setTimeout(() => {
    console.debug('Pushing a new item');
    queue.push(42);
}, 3000);

/* Expected output:
Waiting a bit
...

(after 3 seconds)
Pushing a new item
Got item: 42
*/
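
To make the consume/reject semantics concrete, here is a minimal, self-contained model of the behaviour described above. It is only a sketch: SketchQueue and SketchConsumable are hypothetical names, not the classes shipped by this package, and the model assumes that a second consume() or reject() on the same item throws. It also leaves out any locking the real queue performs.

```javascript
// Illustrative model only: NOT the @darkbyte/aqueue implementation.
class SketchConsumable {
    constructor(item, onReject) {
        this.item = item;
        this.onReject = onReject;
        this.completed = false;
    }

    getItem() {
        return this.item;
    }

    // Marks the item as completed; it leaves the queue for good.
    consume() {
        this.complete();
    }

    // Marks the item as completed and hands it back to the queue.
    reject() {
        this.complete();
        this.onReject(this.item);
    }

    // Assumption: acting twice on the same item is an error.
    complete() {
        if (this.completed) {
            throw new Error('Item already consumed or rejected');
        }
        this.completed = true;
    }
}

class SketchQueue {
    constructor() {
        this.items = [];   // items waiting to be popped
        this.waiters = []; // resolvers of pending pop() calls
    }

    push(item) {
        this.items.push(item);
        this.dispatch();
    }

    // Resolves as soon as an item is available.
    pop() {
        return new Promise((resolve) => {
            this.waiters.push(resolve);
            this.dispatch();
        });
    }

    // Pairs waiting pop() calls with available items, in order.
    dispatch() {
        while (this.items.length > 0 && this.waiters.length > 0) {
            const resolve = this.waiters.shift();
            const item = this.items.shift();
            resolve(new SketchConsumable(item, (rejected) => {
                this.items.unshift(rejected); // back at the head
                this.dispatch();
            }));
        }
    }
}

async function demo() {
    const queue = new SketchQueue();
    queue.push('a');
    queue.push('b');

    const first = await queue.pop();
    first.reject();  // 'a' goes back to the head of the queue

    const again = await queue.pop();
    again.consume(); // 'a' is now gone for good

    const next = await queue.pop();
    next.consume();

    return [first.getItem(), again.getItem(), next.getItem()];
}

demo().then((order) => console.log(order)); // [ 'a', 'a', 'b' ]
```

The rejected item is seen again before 'b', which is the point of putting it back at the head rather than at the tail.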

Rejecting an item

In the following example, a queue of messages must be delivered through a connection. If the connection is closed, you don't want to effectively consume the items from the queue. Rather, you put them back to be processed whenever the connection becomes available again:

const queue = new Queue<Message>();
const connection = ...;

queue.pop()
    .then((item: Consumable<Message>) => {
        if (!connection.isOpen()) {
            item.reject(); // puts the message back at the head of the queue
        } else {
            connection.send(item.getContent()) // extracts the Message from the queue item and passes it to the connection
                .then(() => item.consume()) // marks the item as completed
                .catch(() => item.reject()); // on error, the item is put back at the head of the queue
        }
    });

Consuming the queue automagically

A generic QueueConsumer class is provided to make it even simpler to consume a queue.

const { Queue, QueueConsumer } = require('@darkbyte/aqueue');

const queue = new Queue<number>(); // queue is empty
const consumer = new QueueConsumer(queue);

// The consumer will automatically start popping the items from
// the queue, one by one, and will invoke your callback:
consumer.startConsuming((item: number) => {
    console.debug(`Got item: ${item}`);
});

// We push an item every second
let number = 0;
setInterval(() => {
    queue.push(number++); // The waiting consumer is unblocked by this push
}, 1000);

/* Expected output:
   @t0:     Got item: 0
   @t0+1s:  Got item: 1
   @t0+2s:  Got item: 2
   ...
*/

If an exception is thrown by the callback, the QueueConsumer will automatically reject the item extracted from the queue.
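
The reject-on-exception behaviour can be pictured as a pop / callback / consume-or-reject step. The sketch below is not the QueueConsumer source: consumeOnce and makeStubQueue are hypothetical helpers, the stub queue is a plain array stand-in, and awaiting the callback is an assumption about how asynchronous callbacks might be handled.

```javascript
// Illustrative sketch only: NOT the QueueConsumer implementation.
// `queue` is any object whose pop() resolves to an item exposing
// getItem(), consume() and reject().
async function consumeOnce(queue, callback) {
    const consumable = await queue.pop();
    try {
        await callback(consumable.getItem());
        consumable.consume(); // callback succeeded
        return true;
    } catch (error) {
        consumable.reject();  // callback threw: put the item back
        return false;
    }
}

// Tiny array-backed stand-in for a queue. Unlike a real queue it
// never waits, so only pop while items remain.
function makeStubQueue(items) {
    return {
        async pop() {
            const item = items.shift();
            return {
                getItem: () => item,
                consume: () => {},
                reject: () => items.unshift(item), // back at the head
            };
        },
    };
}

async function demo() {
    const items = [1, 2];
    const queue = makeStubQueue(items);
    const seen = [];
    let failNext = true;

    // Fails on the first call only, then handles every item.
    const callback = (item) => {
        seen.push(item);
        if (failNext) {
            failNext = false;
            throw new Error('cannot handle this item yet');
        }
    };

    while (items.length > 0) {
        await consumeOnce(queue, callback);
    }
    return seen;
}

demo().then((seen) => console.log(seen)); // [ 1, 1, 2 ]
```

Item 1 is delivered twice because the first attempt threw and the item was rejected, so the next pop returned it again before item 2.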

Pausing the consumer

A consumer can be paused and resumed as needed.

const queue: Queue<Message> = new Queue();
const consumer = new QueueConsumer(queue);

consumer.startConsuming((item: Message) => {
    // use the extracted item, throw an exception if you can't handle it
});

// later on...
consumer.pause(); // from now on, the callback is no longer invoked

// later on...
consumer.resume(); // from now on the callback gets invoked again

How to install

npm install @darkbyte/aqueue

License

Copyright © 2019 Antonio Seprano <antonio.seprano@gmail.com>

This work is free. You can redistribute it and/or modify it under the terms of the MIT License. See LICENSE for full details.