abstract-message-queue

AbstractMessageQueue

The AbstractMessageQueue attempts to define a common interface for interacting with message queues. By depending on this interface instead of any particular queue implementation, application code can remain flexible for different environments with different requirements.

I have tried to keep the interface to a minimum so as to make it easy to implement. Any extra features can be added at an implementor's discretion, or by wrapping a queue instance in another library.

Inspired by abstract-blob-store.

In this package

This package contains:

- A class for helping to create your implementation of a MessageQueue
- A function for testing your implementation against the spec

You do not have to use either of these in order to be compliant - they are just (hopefully) useful tools.

Using a MessageQueue

The MessageQueue is designed to be used in a for await loop. Each loop must process its messages in series, but you may have multiple loops running in parallel.

for await (const message of queueInstance) {
    const success = processMessage(message);

    // On successful processing, remove message from queue
    if(success) {
        queueInstance.delete();
    }
}
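
The loop above assumes processMessage returns its result synchronously. As a minimal sketch, assuming a handler that may instead return a promise, the same pattern can be wrapped in a helper that accepts any object following the spec. The names consume and handler are hypothetical and are not part of this package.

```javascript
// Hypothetical helper, not part of abstract-message-queue.
// Works with any queue that implements [Symbol.asyncIterator] and delete.
async function consume(queue, handler) {
    let deleted = 0;

    for await (const message of queue) {
        // Awaiting also works when handler returns a plain (non-promise) value
        const success = await handler(message);

        // Only flag the message for deletion once it has been handled
        if (success) {
            queue.delete();
            deleted++;
        }
    }

    // Only reached if the queue's iterator finishes
    return deleted;
}
```

Messages for which handler resolves to a falsy value are not flagged, so the queue is free to retry them later.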

Abstract Spec

A message queue instance only needs to have three methods:

- `[Symbol.asyncIterator]`
- `delete`
- `send`

[Symbol.asyncIterator]

A method that returns an instance of an async iterator. The iterator must have a next method, and should have return and throw methods.

The easiest way to do this is with an async generator, but it does not have to be one. When run, the iterator must do the following:

  1. Fetch and remove the next message from the queue (this removal may be temporary and time-based)
  2. Yield the message
  3. If the delete method has been called, permanently remove the message from the queue
  4. Otherwise, return the message to the queue (the message does not have to be immediately available again)

Steps 3 and 4 should happen regardless of whether an error is thrown at the yield stage.

An illustration of how to achieve this with an async generator:

class MyQueue {
    delete(){
        this.deleteCalled = true;
    }

    async * [Symbol.asyncIterator](){
        while(true) {
            this.deleteCalled = false;
            const { id, message } = await somehowGetNextMessage();

            try {
                yield message;
            } finally {
                // Await the cleanup in case these (placeholder) helpers are asynchronous
                if(this.deleteCalled) {
                    await deleteMessage(id);
                } else {
                    await retryMessage(id);
                }
            }
        }
    }
}

This method may also take other actions such as opening and closing connections to external resources.
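
Because an async generator is not required, the four steps listed earlier can also be written as a hand-rolled iterator object. The following is only a sketch: ManualQueue and the backend object with its fetch, remove and requeue methods are hypothetical names standing in for a real queue client, send is omitted for brevity, and the iterator is assumed to finish when fetch resolves to undefined.

```javascript
// Hypothetical class, not part of abstract-message-queue.
class ManualQueue {
    constructor(backend) {
        // backend is assumed to expose async fetch(), remove(id) and requeue(id)
        this.backend = backend;
        this.deleteCalled = false;
    }

    delete() {
        this.deleteCalled = true;
    }

    [Symbol.asyncIterator]() {
        const queue = this;
        let currentId = null;   // id of the message handed out by the last next()
        let inFlight = false;   // true while a message is with the consumer

        // Steps 3 and 4: delete or requeue the message from the previous next()
        const settle = async () => {
            if (!inFlight) return;
            inFlight = false;
            if (queue.deleteCalled) {
                await queue.backend.remove(currentId);
            } else {
                await queue.backend.requeue(currentId);
            }
        };

        return {
            async next() {
                await settle();
                queue.deleteCalled = false;
                const item = await queue.backend.fetch();      // step 1
                if (item === undefined) {
                    return { done: true, value: undefined };
                }
                currentId = item.id;
                inFlight = true;
                return { done: false, value: item.message };   // step 2
            },
            // Called when the consumer leaves a for await loop early,
            // whether through break, return or a thrown error
            async return(value) {
                await settle();
                return { done: true, value };
            },
            // Called when an error is thrown into the iterator,
            // for example through yield* delegation
            async throw(error) {
                await settle();
                throw error;
            }
        };
    }
}
```

Compared with the generator version, the bookkeeping that try and finally provide for free has to be repeated in next, return and throw, which is the main reason the generator form is usually easier.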

delete

This is a simple, synchronous method that sets a flag for the message to be deleted from the queue when the iterator is resumed or finalized.

send

When called with a message as the first argument, add that message to the queue.
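
Putting the three methods together, a minimal in-memory sketch might look like the following. InMemoryQueue is a hypothetical class for illustration only. It assumes messages can be held in a plain array and, unlike a queue backed by an external service, it ends iteration once the array is empty.

```javascript
// Hypothetical class, not part of abstract-message-queue.
class InMemoryQueue {
    constructor() {
        this.messages = [];         // pending messages, oldest first
        this.deleteCalled = false;
    }

    // Add a message to the back of the queue
    send(message) {
        this.messages.push(message);
    }

    // Flag the message currently being processed for deletion
    delete() {
        this.deleteCalled = true;
    }

    async * [Symbol.asyncIterator]() {
        // Assumption of this sketch: stop once there is nothing left to process
        while (this.messages.length > 0) {
            this.deleteCalled = false;
            const message = this.messages.shift();

            try {
                yield message;
            } finally {
                if (!this.deleteCalled) {
                    // Not deleted: return the message to the back of the queue
                    this.messages.push(message);
                }
            }
        }
    }
}
```

Note that in this sketch a message that is never deleted is retried indefinitely, so a real implementation would probably want a retry limit or a delay before the message becomes available again.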

Implementing a MessageQueue

You can use this package to create your message queue class by extending the provided MessageQueue class:

import MessageQueue from 'abstract-message-queue';

class MyQueue extends MessageQueue {
    constructor(){
        // Call the superconstructor with config methods:
        super({
            // The next method fetches the next message
            async next(){
                return {
                    id: 'optional - some value used to identify a message internally',
                    message: 'the message - may be of any type, including objects'
                }
            },
            async delete(id){
                // Remove the message from the queue entirely
            },
            async retry(id){
                // Re-add the message to the queue to be tried again at a later date
            }
        })
    }

    send(message){
        // Add the new message to the queue
    }

    async * [Symbol.asyncIterator](){
        // You may optionally extend the iterator method to open and close connections
        this.connection = openConnection();

        try {
            yield* super[Symbol.asyncIterator]();
        } finally {
            this.connection.close();
        }
    }
}

Dependencies

None

abstract-message-queue

MessageQueue

See: default

module.exports ⏏

Kind: Exported class

new module.exports(options)

Construct an instance of the message queue

Parameters:

- `options` (Object): Protected methods for the queue
- `options.next` (function): Function that gets the next message from the queue. Its return value must be an object with a "message" property and an optional "id"
- `options.retry` (function): Function that re-queues the last item returned by next. The item ID will be passed as the first argument.
- `options.delete` (function): Function that deletes the last item returned by next. The item ID will be passed as the first argument.

module.exports.delete()

Flags the last message returned by the iterator for deletion from the queue

Kind: instance method of module.exports

module.exports.default

Utility class to help implement the AbstractMessageQueue

Kind: static property of module.exports