extra-promise

Utilities for JavaScript Promise and AsyncFunction.

Usage no npm install needed!

<script type="module">
  import extraPromise from 'https://cdn.skypack.dev/extra-promise';
</script>

README

extra-promise npm GitHub license

Utilities for JavaScript Promise and AsyncFunction.

Install

npm install --save extra-promise
# or
yarn add extra-promise

API

functions

isPromise

function isPromise<T>(val: any): val is Promise<T>

Check if the val is a Promise instance.

isPromiseLike

function isPromiseLike<T>(val: any): val is PromiseLike<T>

Check if the val has a then method.

delay

function delay(timeout: number): Promise<void>

A simple wrapper for setTimeout.

timeout

function timeout(ms: number): Promise<never>

It throws a TimeoutError after ms milliseconds.

try {
  result = await Promise.race([
    fetchData()
  , timeout(5000)
  ])
} catch (e) {
  if (e instanceof TimeoutError) ...
}

pad

function pad<T>(ms: number, fn: () => T | PromiseLike<T>): Promise<T>

Run a function, but wait at least ms milliseconds before returning.

parallel

function parallel(
  tasks: Iterable<() => unknown | PromiseLike<unknown>>
, concurrency: number = Infinity
): Promise<void>

Perform tasks in parallel.

The value range of concurrency is [1, Infinity]. Invalid values will throw Error.

parallelAsync

function parallelAsync(
  tasks: AsyncIterable<() => unknown | PromiseLike<unknown>>
, /**
   * concurrency must be finite number
   */
  concurrency: number
): Promise<void>

Same as parallel, but tasks is an AsyncIterable.

series

function series(
  tasks: Iterable<() => unknown | PromiseLike<unknown>>
       | AsyncIterable<() => unknown | PromiseLike<unknown>>
): Promise<void>

Perform tasks in order. Equivalent to parallel(tasks, 1).

waterfall

function waterfall<T>(
  tasks: Iterable<(result: unknown) => unknown | PromiseLike<unknown>>
       | AsyncIterable<(result: unknown) => unknown | PromiseLike<unknown>>
): Promise<T | undefined>

Perform tasks in order, the return value of the previous task will become the parameter of the next task. If tasks is empty, return Promise<undefined>.

each

function each(
  iterable: Iterable<T>
, fn: (element: T, i: number) => unknown | PromiseLike<unknown>
, concurrency: number = Infinity
): Promise<void>

The async each operator for Iterable.

The value range of concurrency is [1, Infinity]. Invalid values will throw Error.

eachAsync

function eachAsync<T>(
  iterable: AsyncIterable<T>
, fn: (element: T, i: number) => unknown | PromiseLike<unknown>
, /**
   * concurrency must be finite number
   */
  concurrency: number
): Promise<void>

Same as each, but iterable is an AsyncIterable.

map

function map<T, U>(
  iterable: Iterable<T>
, fn: (element: T, i: number) => U | PromiseLike<U>
, concurrency: number = Infinity
): Promise<U[]>

The async map operator for Iterable.

The value range of concurrency is [1, Infinity]. Invalid values will throw Error.

mapAsync

export function mapAsync<T, U>(
  iterable: AsyncIterable<T>
, fn: (element: T, i: number) => U | PromiseLike<U>
, /**
   * concurrency must be finite number
   */
  concurrency: number
): Promise<U[]>

Same as map, but iterable is an AsyncIterable.

filter

function filter<T, U = T>(
  iterable: Iterable<T>
, fn: (element: T, i: number) => boolean | PromiseLike<boolean>
, concurrency: number = Infinity
): Promise<U[]>

The async filter operator for Iterable.

The value range of concurrency is [1, Infinity]. Invalid values will throw Error.

filterAsync

function filterAsync<T, U = T>(
  iterable: AsyncIterable<T>
, fn: (element: T, i: number) => boolean | PromiseLike<boolean>
, /**
   * concurrency must be finite number
   */
  concurrency: number
): Promise<U[]>

Same as filter, but iterable is an AsyncIterable.

all

function all<T extends { [key: string]: PromiseLike<unknown> }>(
  obj: T
): Promise<{ [Key in keyof T]: UnpackedPromiseLike<T[Key]> }>

It is similar to Promise.all, but the first parameter is an object.

Example:

const { task1, task2 } = await all({
  task1: invokeTask1()
, task2: invokeTask2()
})

promisify

type Callback<T> =(err: any, result?: T) => void

function promisify<Result, Args extends any[] = unknown[]>(
  fn: (...args: [...args: Args, callback: Callback<Result>]) => unknown
): (...args: Args) => Promise<Result>

The well-known promisify function.

callbackify

type Callback<T> = (err: any, result?: T) => void

function callbackify<Result, Args extends any[] = unknown[]>(
  fn: (...args: Args) => PromiseLike<Result>
): (...args: [...args: Args, callback: Callback<Result>]) => void

The callbackify function, as opposed to promisify.

asyncify

function asyncify<T extends any[], U>(
  fn: (...args: T) => U | PromiseLike<U>
): (...args: Promisify<T>) => Promise<U>

Turn sync functions into async functions.

const a = 1
const b = Promise.resolve(2)

const add = (a: number, b: number) => a + b

// BAD
add(a, await b) // 3

// GOOD
const addAsync = asyncify(add) // (a: number | PromiseLike<number>, b: number | PromiseLike<number>) => Promise<number>
await addAsync(a, b) // Promise<3>

cascadify

function cascadify<T extends object>(target: T): Cascadify<T>

Use the decorator Cascadable to mark the cascadable methods (the return value is PromiseLike<this>), transform the instance into a cascadify instance, and end with the non-cascadable member.

class Adder {
  value: number

  constructor(initialValue: number) {
    this.value = initialValue
  }

  get() {
    return this.value
  }

  async getAsync() {
    return this.value
  }

  @Cascadable
  async add(value: number) {
    this.value += value
    return this
  }
}

await cascadify(adder)
  .add(10)
  .get()

await cascadify(adder)
  .add(10)
  .getAsync()

await cascadify(adder)
  .add(10)
  .value

toExtraPromise

function toExtraPromise<T>(promise: PromiseLike<T>): ExtraPromise<T>

spawn

function spawn(num: number, task: (id: number) => Promise<void>): Promise<void>

A sugar for running the same task in parallel.

The parameter id is from 1 to num.

queueConcurrency

function queueConcurrency<T, Args extends any[]>(
  concurrency: number
, fn: (...args: Args) => PromiseLike<T>
): (...args: Args) => Promise<T>

Limit the number of concurrency, calls that exceed the number of concurrency will be delayed in order.

throttleConcurrency

function throttleConcurrency<T, Args extends any[]>(
  concurrency: number
, fn: (...args: Args) => PromiseLike<T>
): (...args: Args) => Promise<T> | undefined

Limit the number of concurrency, calls that exceed the number of concurrency will not occur.

throttleUntilDone

function throttleUntilDone<T>(fn: () => PromiseLike<T>): () => Promise<T>

Limit the number of concurrent to 1, calls that exceed the number of concurrency will return the same Promise of the currently executing call.

Classes

ExtraPromise

class ExtraPromise<T> extends Promise<T> {
  get pending(): boolean
  get fulfilled(): boolean
  get rejected(): boolean

  constructor(executor: (resolve: (value: T) => void, reject: (reason: any) => void) => void)
}

A subclass of Promise.

ExtraPromise has 3 readonly properties: pending, fulfilled, and rejected. So the state of the Promise can be known without calling the then method.

Channel

class Channel<T> {
  send(value: T): Promise<void>
  receive(): AsyncIterable<T>
  close: () => void
}

Implement MPMC(multi-producer, multi-consumer) FIFO queue communication with Promise and AsyncIterable.

  • send Send value to the channel, block until data is taken out by the consumer.
  • receive Receive value from the channel.
  • close Close the channel, no more values can be sent.

If the channel closed, send will throw ChannelClosedError.

const chan = new Channel<string>()
queueMicrotask(() => {
  await chan.send('hello')
  await chan.send('world')
  chan.close()
})
for await (const value of chan.receive()) {
  console.log(value)
}

BufferedChannel

class BufferedChannel {
  send(value: T): Promise<void>
  receive(): AsyncIterable<T>
  close: () => void
}

Implement MPMC(multi-producer, multi-consumer) FIFO queue communication with Promise and AsyncIterable. When the amount of data sent exceeds bufferSize, send will block until data in buffer is taken out by the consumer.

  • send Send value to the channel. If the buffer is full, block.
  • receive Receive value from the channel.
  • close Close channel, no more values can be sent.

If the channel closed, send will throw ChannelClosedError.

const chan = new BufferedChannel<string>(1)

queueMicrotask(() => {
  await chan.send('hello')
  await chan.send('world')
  chan.close()
})

for await (const value of chan.receive()) {
  console.log(value)
}

UnlimitedChannel

class UnlimitedChannel {
  send(value: T): void
  receive(): AsyncIterable<T>
  close: () => void
}

Implement MPMC(multi-producer, multi-consumer) FIFO queue communication with Promise and AsyncIterable.

UnlimitedChannel return a tuple includes three channel functions:

  • send Send value to the channel. There is no size limit on the buffer, all sending will return immediately.
  • receive Receive value from the channel.
  • close close the channel, no more values can be sent.

If the channel closed, send will throw ChannelClosedError.

const chan = new UnlimitedChannel<string>()

queueMicrotask(() => {
  chan.send('hello')
  chan.send('world')
  chan.close()
})

for await (const value of chan.receive()) {
  console.log(value)
}

Deferred

class Deferred<T> implements PromiseLike<T> {
  then: PromiseLike<T>['then']

  resolve(value: T): void
  reject(reason: unknown): void
}

Deferred is a Promise that separates resolve() and reject() from the constructor.

ReusableDeferred

class ReusableDeferred<T> implements PromiseLike<T> {
  then: PromiseLike<T>['then']

  resolve(value: T): void
  reject(reason: unknown): void
}

ReusableDeferred is similar to Deferred, but its resolve() and reject() can be called multiple times to change the value.

LazyPromise

class LazyPromise<T> implements PromiseLike<T> {
  then: PromiseLike<T>['then']

  constructor(executor: (resolve: (value: T) => void, reject: (reason: any) => void) => void)
}

LazyPromise constructor is the same as Promise.

The difference with Promise is that LazyPromise only performs executor after then method is called.

Signal

class Signal implements PromiseLike<void> {
  then: PromiseLike<void>['then']

  emit(): void
  discard(): void
}

The emit() make the internal Promise resolve.

The discard() make the internal Promise reject SignalDiscarded.

SignalGroup

class SignalGroup {
  add(signal: Signal): void
  remove(signal: Signal): void

  emitAll(): void
  discardAll(): void
}

Semaphore

type Release = () => void

class Semaphore {
  constructor(count: number)

  acquire(): Promise<Release>
  acquire<T>(handler: () => T | PromiseLike<T>): Promise<T>
}

Mutex

type Release = () => void

class Mutex extends Semaphore {
  acquire(): Promise<Release>
  acquire<T>(handler: () => T | PromiseLike<T>): Promise<T>
}

DebounceMicrotask

class DebounceMicrotask {
  queue(fn: () => void): void
  cancel(fn: () => void): boolean
}

queue can create microtasks, if the microtask is not executed, multiple calls will only queue it once.

cancel can cancel microtasks before it is executed.

TaskRunner

type Task<T> = () => PromiseLike<T>

class TaskRunner<T> extends EventEmitter {
  constructor(concurrency: number = Infinity)

  setConcurrency(concurrency: number): void
  push(...tasks: Task<T>[]): void
  clear(): void

  pause(): void
  resume(): void
}

A task runner, it will automatically execute tasks in FIFO order.

TaskRunner provides theses events:

  • started: It will be triggered after a task is started, provide the parameter task.
  • resolved: It will be triggered after a task is resolved, provide the paramter task and result.
  • rejected: It will be triggered after a task is rejected, provide the paramters task and reason. At the same time, TaskRunner will pause.