parallel-universe

A set of async flow control structures and promise utils.

Usage no npm install needed!

<script type="module">
  import parallelUniverse from 'https://cdn.skypack.dev/parallel-universe';
</script>

npm install --save-prod parallel-universe

🚀 API documentation is available here.

Usage

AsyncQueue

Asynchronous queue decouples value providers and value consumers.

const queue = new AsyncQueue();

// Provider adds a value
queue.add('Mars');

// Consumer takes a value
queue.take(); // → Promise<"Mars">

add appends a value to the queue, while take removes a value from the queue as soon as one is available. If the queue is empty when take is called, the returned Promise is resolved after the next add call.

const queue = new AsyncQueue();

// The returned Promise would be resolved after the add call
queue.take(); // → Promise<"Mars">

queue.add("Mars");

Consumers receive values from the queue in the same order they were added by providers:

const queue = new AsyncQueue();

queue.add('Mars');
queue.add('Venus');

queue.take(); // → Promise<"Mars">
queue.take(); // → Promise<"Venus">
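To make the add and take semantics above concrete, here is a minimal sketch of how such a queue could work. MiniAsyncQueue and its fields are hypothetical names used for illustration only; this is not the parallel-universe implementation and it omits acknowledgements.

```javascript
// Hypothetical minimal queue (illustration only, not the library's code).
class MiniAsyncQueue {
  constructor() {
    this.values = [];  // values waiting for a consumer
    this.waiters = []; // resolve callbacks of consumers waiting for a value
  }

  add(value) {
    const resolve = this.waiters.shift();
    if (resolve) {
      resolve(value); // a consumer is already waiting: hand the value over
    } else {
      this.values.push(value); // otherwise keep it for a later take
    }
    return this;
  }

  take() {
    if (this.values.length > 0) {
      return Promise.resolve(this.values.shift());
    }
    // No values yet: resolve after a later add call
    return new Promise((resolve) => this.waiters.push(resolve));
  }
}

const queue = new MiniAsyncQueue();
const early = queue.take(); // queue is empty, so this waits for the next add

queue.add('Mars');
queue.add('Venus');

early.then((value) => console.log(value));        // logs "Mars"
queue.take().then((value) => console.log(value)); // logs "Venus"
```

Keeping two arrays, one for stored values and one for waiting consumers, is what lets providers and consumers run in either order while preserving FIFO delivery.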

Acknowledgements

In some cases removing the value from the queue isn't the desired behavior, since the consumer may not be able to process the taken value. Use takeAck to examine the available value and acknowledge that it can be processed.

queue.takeAck().then(([value, ack]) => {
  if (doSomeChecks()) {
    ack();
    doSomething(value);
  }
});

takeAck returns a Promise for an AckProtocol, a tuple of the available value and the acknowledgement callback. The consumer should call ack to tell the queue whether to remove the value or to retain it.

To acknowledge that the consumer can process the value and that the value must be removed from the queue, use:

ack(); // or ack(true)

To acknowledge that the value should be retained by the queue use:

ack(false);

The value that was retained in the queue becomes available for the subsequent consumer.

const queue = new AsyncQueue();

queue.add('Pluto');

queue.takeAck().then(([value, ack]) => {
  ack(false); // Tells queue to retain the value
});

queue.take(); // → Promise<"Pluto">
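The remove-or-retain decision can be illustrated with a deliberately simplified sketch. MiniAckQueue is a hypothetical class, not the library's AsyncQueue: it assumes the queue is non-empty and that only one consumer inspects the head at a time, and it does not revoke unused acknowledgements.

```javascript
// Hypothetical sketch of the acknowledgement idea (not the library's code).
// Assumes a non-empty queue and a single consumer at a time.
class MiniAckQueue {
  constructor() {
    this.values = [];
  }

  add(value) {
    this.values.push(value);
  }

  // Resolves with [value, ack] for the head of the queue without removing it
  takeAck() {
    const value = this.values[0];
    let settled = false;

    const ack = (ok = true) => {
      if (settled) return; // repeated calls are a no-op
      settled = true;
      if (ok) this.values.shift(); // remove only when the consumer accepts
    };

    return Promise.resolve([value, ack]);
  }
}

const queue = new MiniAckQueue();
queue.add('Pluto');

queue.takeAck()
  .then(([, ack]) => {
    ack(false); // retain the value for the next consumer
    return queue.takeAck();
  })
  .then(([value, ack]) => {
    ack(); // accept the value, removing it from the queue
    console.log(value, queue.values.length); // logs "Pluto 0"
  });
```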

Blocking vs non-blocking acknowledgements

By default, if you don't call ack, the acknowledgement is automatically revoked on the next tick after the Promise returned by takeAck is resolved, and the value remains in the queue.

If the acknowledgement was revoked, calling ack throws an error:

queue.takeAck()

    .then((protocol) => protocol) // Extra tick

    .then(([value, ack]) => {
      ack(); // → throws an Error 
    });

To prevent the acknowledgement from being revoked, request a blocking acknowledgement:

queue.takeAck(true) // Request blocking ack

    .then((protocol) => protocol) // Extra tick

    .then(([value, ack]) => {
      ack(); // Works just fine!
      doSomething(value);
    });

Blocking acknowledgement is required if the consumer has to perform asynchronous actions before processing the value.

To guarantee that consumers receive values in the same order as they were provided, blocking acknowledgements prevent subsequent consumers from being resolved until ack is called. Be sure to call ack to prevent the queue from being stuck indefinitely.

async function blockingConsumer() {

  const [value, ack] = await queue.takeAck(true);

  try {
    if (await doSomeChecks()) {
      ack(true);
      doSomething(value);
    }
  } finally {
    // It's safe to call ack again: after the first call it's a no-op
    ack(false);
  }
}

WorkPool

A callback execution pool that runs a limited number of callbacks in parallel while other submitted callbacks wait in the queue.

// The pool that processes 5 callbacks in parallel at maximum
const pool = new WorkPool(5);

pool.submit(async (signal) => doSomething());
// → Promise<ReturnType<typeof doSomething>>

You can change how many callbacks the pool can process in parallel:

pool.resize(2); // → Promise<void>

resize returns a Promise that is resolved once the number of callbacks being processed in parallel no longer exceeds the new size.

If you resize the pool down, pending callbacks that exceed the new size limit are notified via signal that they must be aborted.

To abort all callbacks that are being processed by the pool and wait for their completion use:

// Resolved when all pending callbacks are fulfilled
pool.resize(0); // → Promise<void>
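The core idea of the pool, running at most N callbacks at once while the rest wait, can be sketched as follows. MiniWorkPool is a hypothetical class for illustration; it leaves out resize and abort signals and is not how parallel-universe implements WorkPool.

```javascript
// Hypothetical minimal pool (illustration only): no resize, no abort signals.
class MiniWorkPool {
  constructor(size) {
    this.size = size; // maximum number of callbacks running at once
    this.active = 0;  // callbacks currently running
    this.jobs = [];   // submitted callbacks waiting for a free slot
  }

  submit(cb) {
    return new Promise((resolve, reject) => {
      this.jobs.push({ cb, resolve, reject });
      this.drain();
    });
  }

  drain() {
    while (this.active < this.size && this.jobs.length > 0) {
      const { cb, resolve, reject } = this.jobs.shift();
      this.active++;
      // Starting from a resolved promise turns synchronous throws into rejections
      Promise.resolve()
        .then(cb)
        .then(resolve, reject)
        .finally(() => {
          this.active--;
          this.drain(); // a slot is free: start the next waiting job
        });
    }
  }
}

const pool = new MiniWorkPool(2);
let running = 0;
let peak = 0;

const job = async () => {
  running++;
  peak = Math.max(peak, running);
  await new Promise((resolve) => setTimeout(resolve, 10));
  running--;
};

Promise.all([pool.submit(job), pool.submit(job), pool.submit(job), pool.submit(job)])
  .then(() => console.log(peak)); // logs 2
```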

Executor

Manages the execution of an async callback and provides ways to access execution results, abort or replace an execution, and subscribe to state changes.

const executor = new Executor();

executor.execute(async (signal) => doSomething());
// → Promise<void>

executor.pending;
// → true

// Aborts pending execution
executor.abort();

Lock

Promise-based lock implementation.

When someone tries to acquire a Lock, they receive a Promise for a release callback. That Promise is resolved as soon as the previous lock owner invokes their release callback.

const lock = new Lock();

async function doSomething() {
  const release = await lock.acquire();
  try {
    // Long process starts here
  } finally {
    release();
  }
}

// Long process would be executed three times sequentially
doSomething();
doSomething();
doSomething();
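One way to get this behavior is to chain every acquire call onto the release of the previous owner. The sketch below uses a hypothetical MiniLock class to show the idea; it is an illustration under that assumption, not the library's Lock.

```javascript
// Hypothetical minimal lock (illustration only, not the library's code).
class MiniLock {
  constructor() {
    this.tail = Promise.resolve(); // resolves when the latest owner releases
  }

  acquire() {
    let release;
    const released = new Promise((resolve) => {
      release = resolve;
    });

    // Ready as soon as every earlier owner has released
    const ready = this.tail.then(() => release);

    // Later callers must also wait for this owner to release
    this.tail = this.tail.then(() => released);

    return ready;
  }
}

const lock = new MiniLock();
const log = [];

async function task(name) {
  const release = await lock.acquire();
  try {
    log.push(`${name} start`);
    await new Promise((resolve) => setTimeout(resolve, 5));
    log.push(`${name} end`);
  } finally {
    release();
  }
}

Promise.all([task('a'), task('b')])
  .then(() => console.log(log.join(', ')));
// logs "a start, a end, b start, b end"
```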

Blocker

Provides a mechanism for blocking async processes and unblocking them from an external context.

const blocker = new Blocker();

async function doSomething() {
  const value = await blocker.block();
  // → "Mars"
}

doSomething();

blocker.unblock('Mars');
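A blocker of this kind can be thought of as a Promise whose resolve function is kept around for later use. The following sketch uses a hypothetical MiniBlocker class to show that idea; the real Blocker may differ in details.

```javascript
// Hypothetical minimal blocker (illustration only, not the library's code).
class MiniBlocker {
  constructor() {
    this.promise = null; // shared by everyone who is currently blocked
    this.resolve = null; // resolves the shared promise
  }

  block() {
    if (this.promise === null) {
      this.promise = new Promise((resolve) => {
        this.resolve = resolve;
      });
    }
    return this.promise;
  }

  unblock(value) {
    if (this.resolve !== null) {
      const resolve = this.resolve;
      this.promise = null;
      this.resolve = null;
      resolve(value); // wakes up every pending block() caller
    }
  }
}

const blocker = new MiniBlocker();

blocker.block().then((value) => console.log(value)); // logs "Mars"

blocker.unblock('Mars');
```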

repeatUntil

Invokes a callback periodically with the given delay between resolutions of the returned Promise.

repeatUntil(
    // The callback that is invoked repeatedly
    async (signal) => doSomething(),

    // The until clause must return true to stop the loop
    (asyncResult) => asyncResult.rejected,

    // Optional delay between callback invocations
    100,
    // or
    // (asyncResult) => 100,

    // Optional signal that can abort the loop from the outside
    abortController.signal,
);
// → Promise<ReturnType<typeof doSomething>>
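The loop can be pictured roughly as below. repeatUntilSketch is a hypothetical helper, and the shape of the result object (fulfilled, rejected, value, reason) as well as the choice to rethrow a rejection once the loop stops are assumptions made for this illustration; abort signal support is omitted.

```javascript
// Hypothetical sketch of a repeat-until loop (not the library's code).
async function repeatUntilSketch(cb, until, ms = 0) {
  for (;;) {
    let result;
    try {
      result = { fulfilled: true, rejected: false, value: await cb() };
    } catch (reason) {
      result = { fulfilled: false, rejected: true, reason };
    }

    if (until(result)) {
      // Assumption: the outcome of the last invocation becomes the outcome of the loop
      if (result.rejected) throw result.reason;
      return result.value;
    }

    // Wait before the next invocation
    await new Promise((resolve) => setTimeout(resolve, ms));
  }
}

let attempts = 0;

repeatUntilSketch(
  async () => ++attempts,
  (result) => result.fulfilled && result.value >= 3,
  10,
).then((value) => console.log(value)); // logs 3
```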

sleep

Returns a promise that resolves after a timeout. If aborted via the passed signal, the promise is rejected with an AbortError.

sleep(100, abortController.signal);
// → Promise<undefined>
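For readers unfamiliar with abortable timers, this is roughly how a sleep that honors an AbortSignal can be built on setTimeout. sleepSketch and abortError are hypothetical helpers; a plain Error named AbortError stands in for whatever error type the library actually uses.

```javascript
// Hypothetical helper: a plain Error named "AbortError" (an assumption for this sketch)
function abortError() {
  const error = new Error('Aborted');
  error.name = 'AbortError';
  return error;
}

// Hypothetical abortable sleep (illustration only, not the library's code)
function sleepSketch(ms, signal) {
  return new Promise((resolve, reject) => {
    if (signal?.aborted) {
      reject(abortError());
      return;
    }

    const onAbort = () => {
      clearTimeout(timer); // the timer must not fire after an abort
      reject(abortError());
    };

    const timer = setTimeout(() => {
      signal?.removeEventListener('abort', onAbort);
      resolve();
    }, ms);

    signal?.addEventListener('abort', onAbort, { once: true });
  });
}

const abortController = new AbortController();

sleepSketch(1000, abortController.signal)
  .catch((error) => console.log(error.name)); // logs "AbortError"

abortController.abort();
```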

timeout

Returns a promise that is rejected with a TimeoutError if execution time exceeds the timeout. If aborted via the passed signal, the promise is rejected with an AbortError.

timeout(
    async (signal) => doSomething(),
    // or
    // doSomething()

    // Execution timeout
    100,

    // Optional signal that can abort the execution from the outside
    abortController.signal,
);
// → Promise<ReturnType<typeof doSomething>>
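A common way to implement this kind of timeout is to race the callback against a timer. The sketch below shows that technique with a hypothetical timeoutSketch function; the external abort signal is omitted, and a plain Error named TimeoutError stands in for the library's error type.

```javascript
// Hypothetical sketch of a timeout built on Promise.race (not the library's code)
function timeoutSketch(cb, ms) {
  const controller = new AbortController();
  let timer;

  const expired = new Promise((_, reject) => {
    timer = setTimeout(() => {
      const error = new Error('Timed out');
      error.name = 'TimeoutError';
      controller.abort(); // tell the callback that its result is no longer needed
      reject(error);
    }, ms);
  });

  // Starting from a resolved promise turns synchronous throws into rejections
  const work = Promise.resolve().then(() => cb(controller.signal));

  return Promise.race([work, expired]).finally(() => clearTimeout(timer));
}

timeoutSketch(
  () => new Promise((resolve) => setTimeout(resolve, 200, 'late')),
  20,
).catch((error) => console.log(error.name)); // logs "TimeoutError"
```

Clearing the timer in finally keeps a fast callback from leaving a stray timer behind.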