@transformation/core

Create transformation pipelines

Usage no npm install needed!

<script type="module">
  import transformationCore from 'https://cdn.skypack.dev/@transformation/core';
</script>

README

@transformation/core

accumulate

Produces the next item based on the current and the previous item:

const { accumulate } = require("@transformation/core");
await expect(
  pipeline(
    emitItems(0, 1, 2, 3, 4, 5),
    accumulate((n, previous) => ({ n, total: previous.total + n }), {
      total: 0,
    })
  ),
  "to yield items",
  [
    { n: 0, total: 0 },
    { n: 1, total: 1 },
    { n: 2, total: 3 },
    { n: 3, total: 6 },
    { n: 4, total: 10 },
    { n: 5, total: 15 },
  ]
);

appendItems

Appends the given items after all items in the pipeline.

const { appendItems } = require("@transformation/core");
await expect(
  pipeline(emitItems(0, 1, 2), appendItems(3, 4, 5), appendItems(6, 7, 8)),
  "to yield items",
  [0, 1, 2, 3, 4, 5, 6, 7, 8]
);

buffer

Adds a buffer of a given size into the pipeline.

const { buffer } = require("@transformation/core");

fixed buffer (default)

A fixed size buffer of n slots. It will wait when there is no more space awailable.

await expect(
  pipeline(emitItems(0, 1, 2, 3, 4, 5), buffer(3), delay(1)),
  "to yield items",
  [0, 1, 2, 3, 4, 5]
);

dropping buffer

A fixed size buffer of n slots. It will drop incoming items when there is no more space awailable.

await expect(
  pipeline(emitItems(0, 1, 2, 3, 4, 5), buffer(3, "dropping"), delay(1)),
  "to yield items",
  [0, 1, 2, 3]
);

Notice that because delay buffers one item, 0 is making it through without affecting the dropping.

sliding buffer

A fixed size buffer of n slots. It will drop outgoing items when there is no more space awailable.

await expect(
  pipeline(emitItems(0, 1, 2, 3, 4, 5), buffer(3, "sliding"), delay(1)),
  "to yield items",
  [0, 3, 4, 5]
);

Notice that because delay buffers one item, 0 is making it through without affecting the sliding.

chose

Choses a pipeline based on a given selector.

const { chose } = require("@transformation/core");

The selector is a function that returns a string deciding the pipeline to use.

If the selector is just a string, that pipeline will always be chosen.

await expect(
  pipeline(
    emitItems(0, 1, 2, 3, 4, 5, 6, 7, 8, 9),
    chose((n) => (n % 2 === 0 ? "even" : "odd"), {
      even: map((n) => n * 2),
      odd: map((n) => n * -2),
    })
  ),
  "to yield items",
  [0, -2, 4, -6, 8, -10, 12, -14, 16, -18]
);

Notice: because we need to keep the ordering, the pipeline in the individual cases will only process one item a time, so if you call toArray you will get an array of that item. But if you hard-code the choice as a string, it doesn't have this limitation.

cleanup

Executes a side-effect when the step is closing down.

import { cleanup } from "@transformation/core";

Notice that this step will run even when an exception happens in the pipeline.

This is useful for cleaning up resources after a pipeline it completed.

The below code snippet shows the execution order.

const items = [];

await program(
  emitItems(0, 1, 2, 3),
  forEach((item) => items.push(item)),
  cleanup(() => items.push(4)),
  cleanup(() => items.push(5))
);

expect(items, "to equal", [0, 1, 2, 3, 4, 5]);

debounce

Debounces the items passing this step to only emit an item when the given amount of milliseconds have passed. Other items will be skipped.

const { debounce } = require("transformation/core");

Here we generate items that is delayed by their amount. Then we debounce the input to only emit items when no input has been received for 40ms.

await expect(
  pipeline(
    emitItems(0, 1, 2, 50, 3, 4, 50, 5, 6),
    (ms) => pipeline(emitItems(ms), delay(ms)),
    debounce(40)
  ),
  "to yield items",
  [2, 4, 6]
);

deduplicate

Filters out items that is consecutive duplicates.

const { deduplicate } = require("@transformation/core");
await expect(
  pipeline(
    emitItems(0, 0, 4, 1, 1, 1, 1, 1, 2, 3, 0, 4, 1, 5, 7, 6, 7, 8, 9, 9),
    deduplicate()
  ),
  "to yield items",
  [0, 4, 1, 2, 3, 0, 4, 1, 5, 7, 6, 7, 8, 9]
);

deduplicateBy

Filters out items that is consecutive duplicates by a selected value.

const { deduplicateBy } = require("@transformation/core");

When given a string, it uses that field to determine if an item is different from the previous item.

await expect(
  pipeline(
    emitItems(
      { id: 0, name: "foo", count: 0 },
      { id: 0, name: "foo", count: 1 },
      { id: 1, name: "bar", count: 2 },
      { id: 2, name: "baz", count: 3 },
      { id: 2, name: "baz", count: 4 },
      { id: 3, name: "qux", count: 5 },
      { id: 2, name: "baz", count: 6 },
      { id: 0, name: "foo", count: 7 }
    ),
    deduplicateBy("id")
  ),
  "to yield items",
  [
    { id: 0, name: "foo", count: 1 },
    { id: 1, name: "bar", count: 2 },
    { id: 2, name: "baz", count: 3 },
    { id: 3, name: "qux", count: 5 },
    { id: 2, name: "baz", count: 6 },
    { id: 0, name: "foo", count: 7 },
  ]
);

You can also use a function to select the discriminating value.

await expect(
  pipeline(
    emitItems(
      { id: 0, name: "foo", count: 0 },
      { id: 0, name: "foo", count: 1 },
      { id: 1, name: "bar", count: 2 },
      { id: 2, name: "baz", count: 3 },
      { id: 2, name: "baz", count: 4 },
      { id: 3, name: "qux", count: 5 },
      { id: 2, name: "baz", count: 6 },
      { id: 0, name: "foo", count: 7 }
    ),
    deduplicateBy(({ name }) => name)
  ),
  "to yield items",
  [
    { id: 0, name: "foo", count: 1 },
    { id: 1, name: "bar", count: 2 },
    { id: 2, name: "baz", count: 3 },
    { id: 3, name: "qux", count: 5 },
    { id: 2, name: "baz", count: 6 },
    { id: 0, name: "foo", count: 7 },
  ]
);

defaults

Provide default values for any object.

const { defaults } = require("@transformation/core");
await expect(
  pipeline(
    emitItems(null, { value: "one" }, 2, { stuff: "three" }, 4),
    defaults({
      type: "object",
      metadata: {},
    })
  ),
  "to yield items",
  [
    null,
    { type: "object", metadata: {}, value: "one" },
    2,
    { type: "object", metadata: {}, stuff: "three" },
    4,
  ]
);

delay

Waits the given amount of milliseconds before emitting each item.

const { delay } = require("@transformation/core");
await expect(
  pipeline(emitItems(0, 1, 2, 3, 4, 5), delay(1)),
  "to yield items",
  [0, 1, 2, 3, 4, 5]
);

emitAll

Emits all the items in the given iterator into the pipeline.

Notice this step won't take any input, it only outputs the given items.

const { emitItems } = require("@transformation/core");

Emitting items from an array.

await expect(emitAll([0, 1, 2, 3, 4, 5]), "to yield items", [0, 1, 2, 3, 4, 5]);

Emitting items from an iterable.

function* iterable() {
  for (let i = 0; i < 6; i++) {
    yield i;
  }
}

await expect(emitAll(iterable()), "to yield items", [0, 1, 2, 3, 4, 5]);

Emitting items from an async iterable.

async function* asyncIterable() {
  for (let i = 0; i < 6; i++) {
    await sleep(1);
    yield i;
  }
}

await expect(emitAll(asyncIterable()), "to yield items", [0, 1, 2, 3, 4, 5]);

You can emit items from multiple iterable in the order they are given.

async function* asyncIterable() {
  for (let i = 0; i < 3; i++) {
    await sleep(1);
    yield i;
  }
}

await expect(
  emitAll(asyncIterable(), [3, 4, 5]),
  "to yield items",
  [0, 1, 2, 3, 4, 5]
);

emitItems

Emits the given items into the pipeline.

Notice this step wont take any input from the pipeline, it only outputs the given items.

const { emitItems } = require("@transformation/core");
await expect(emitItems(0, 1, 2, 3, 4, 5), "to yield items", [0, 1, 2, 3, 4, 5]);

emitRange

Emits the given range into the pipeline.

The range goes from start up to but not including end. You can specify a step that will decide delta between the values.

Notice this step wont take any input from the pipeline, it only outputs the given range.

When only given a positive number, it emits values from zero up to, but not including, that number.

const { emitRange } = require("@transformation/core");
await expect(emitRange(5), "to yield items", [0, 1, 2, 3, 4]);

When given a negative number, it emits values from zero down to, but not including, that number. numbers.

await expect(emitRange(-5), "to yield items", [0, -1, -2, -3, -4]);

When given a start and an end, where start is less than or equal to end, it emits values from start up to, but not including, end.

await expect(emitRange(2, 7), "to yield items", [2, 3, 4, 5, 6]);

When given a start and an end, where start is greater than end, it emits values from start down to, but not including, end.

await expect(emitRange(2, 7), "to yield items", [7, 6, 5, 4, 3]);

Finally you can also provide the step value.

await expect(emitRange(-5, 5, 3), "to yield items", [-5, -2, 1, 4]);

You can also provide a negative step.

await expect(emitRange(5, -5, -3), "to yield items", [5, 2, -1, -4]);

emitRepeat

Cycles the items the specified number of times.

import { emitRepeat } from "@transformation/core";
await expect(emitRepeat(["hi", "hey", "hello"], 5), "to yield items", [
  "hi",
  "hey",
  "hello",
  "hi",
  "hey",
]);

When only given a single item, it will be repeated the specified number of times.

await expect(emitRepeat("hi", 5), "to yield items", [
  "hi",
  "hi",
  "hi",
  "hi",
  "hi",
]);

If you don't specify the number items you want, it will keep emitting the forever. This can be useful together with delay to build polling as an example.

await expect(
  pipeline(emitRepeat(["hi", "hey", "hello"]), take(5)),
  "to yield items",
  ["hi", "hey", "hello", "hi", "hey"]
);

extend

It extends all items that are objects with the given description.

const { extend } = require("@transformation/core");
await expect(
  pipeline(
    emitItems(
      { firstName: "Jane", lastName: "Doe" },
      { firstName: "John", lastName: "Doe" }
    ),
    extend({
      type: "person",
      fullName: map(({ firstName, lastName }) => `${firstName} ${lastName}`),
      details: {
        nationality: "Danish",
        initials: ({ firstName, lastName }) => `${firstName[0]}${lastName[0]}`,
      },
    })
  ),
  "to yield items",
  [
    {
      type: "person",
      firstName: "Jane",
      lastName: "Doe",
      fullName: "Jane Doe",
      details: { nationality: "Danish", initials: "JD" },
    },
    {
      type: "person",
      firstName: "John",
      lastName: "Doe",
      fullName: "John Doe",
      details: { nationality: "Danish", initials: "JD" },
    },
  ]
);

frequencies

Counts frequencies of items in the pipeline.

const { frequencies } from '@transformation/core'

When given no arguments, it counts the items by identity.

await expect(
  pipeline(
    emitItems("foo", "bar", "baz", "qux", "qux", "baz", "qux", "foo"),
    frequencies()
  ),
  "to yield items",
  [{ foo: 2, bar: 1, baz: 2, qux: 3 }]
);

When given a field, it counts items by that field.

await expect(
  pipeline(
    emitItems(
      { id: 0, name: "foo" },
      { id: 1, name: "bar" },
      { id: 2, name: "baz" },
      { id: 3, name: "qux" },
      { id: 4, name: "qux" },
      { id: 5, name: "baz" },
      { id: 6, name: "qux" },
      { id: 7, name: "foo" }
    ),
    frequencies("name")
  ),
  "to yield items",
  [{ foo: 2, bar: 1, baz: 2, qux: 3 }]
);

When given a function, it counts items by the returned value.

await expect(
  pipeline(
    emitItems(
      { id: 0, name: "foo" },
      { id: 1, name: "bar" },
      { id: 2, name: "baz" },
      { id: 3, name: "qux" },
      { id: 4, name: "qux" },
      { id: 5, name: "baz" },
      { id: 6, name: "qux" },
      { id: 7, name: "foo" }
    ),
    frequencies(({ name }) => name[0])
  ),
  "to yield items",
  [{ f: 2, b: 3, q: 3 }]
);

parallel

Run the given step with the specified concurrency. If no concurrency is specified, it will default to 2 times the number of CPU's available.

Notice that we make sure to preserve the output order, so you can count on the output not changing order by using this step.

const { parallel } = require("@transformation/core");
await expect(
  pipeline(
    emitItems(5, 4, 3, 2, 1, 0),
    parallel(
      map(async (n) => {
        await sleep(n);
        return n + 1;
      }),
      4
    )
  ),
  "to yield items",
  [6, 5, 4, 3, 2, 1]
);

prependItems

Prepend the given items before all items in the pipeline.

const { prependItems } = require("@transformation/core");
await expect(
  pipeline(emitItems(6, 7, 8), prependItems(3, 4, 5), prependItems(0, 1, 2)),
  "to yield items",
  [0, 1, 2, 3, 4, 5, 6, 7, 8]
);

filter

Filter items with the given predicate.

const { filter } = require("@transformation/core");
await expect(
  pipeline(
    emitItems(0, 1, 2, 3, 4, 5),
    filter((n) => n % 2 === 0)
  ),
  "to yield items",
  [0, 2, 4]
);

flatMap

Maps each item with the given mapper, if a returned item is an array it emits the items individually.

const { flatMap } = require("@transformation/core");
await expect(
  pipeline(
    emitItems(0, 1, 2, 3, 4, 5),
    flatMap((n) => (n % 2 === 0 ? [n, n] : n))
  ),
  "to yield items",
  [0, 0, 1, 2, 2, 3, 4, 4, 5]
);

forEach

Performs a side-effect for each item.

const { forEach } = require("@transformation/core");
const items = [];

await program(
  emitItems(0, 1, 2, 3, 4, 5),
  forEach((item) => items.push(item))
);

expect(items, "to equal", [0, 1, 2, 3, 4, 5]);

fork

Forks the pipeline into two.

const { fork } = require("@transformation/core");
const forkedOutput = [];

await expect(
  pipeline(
    emitItems(0, 1, 2, 3, 4, 5),
    fork(
      map((n) => n * n),
      delay(10),
      forEach((n) => {
        forkedOutput.push(n);
      })
    ),
    filter((n) => n % 2 === 0)
  ),
  "to yield items",
  [0, 2, 4]
);

expect(forkedOutput, "to equal", [0, 1, 4, 9, 16, 25]);

fromJSON

Parses every items in the pipeline as JSON.

import { fromJSON } from "@transformation/core";
await expect(
  pipeline(
    emitItems('{ "foo": "bar", "year": 2000 }', "1", "{}", "true"),
    fromJSON()
  ),
  "to yield items",
  [{ foo: "bar", year: 2000 }, 1, {}, true]
);

groupBy

Groups all the items in the pipeline by a key.

Notice that this step will consume all items in the pipeline before emiting the groups.

const { groupBy } = require("@transformation/core");
await expect(
  pipeline(
    emitItems(0, 1, 2, 3, 4, 5, 6),
    groupBy((value) => (value % 2 === 0 ? "even" : "odd"))
  ),
  "to yield items",
  [
    { key: "even", items: [0, 2, 4, 6] },
    { key: "odd", items: [1, 3, 5] },
  ]
);

You can also give the groupBy a field to group objects by.

await expect(
  pipeline(
    emitItems(
      { symbol: "GOOG", price: 1349 },
      { symbol: "AAPL", price: 274 },
      { symbol: "AAPL", price: 275 },
      { symbol: "GOOG", price: 1351 },
      { symbol: "AAPL", price: 279 }
    ),
    groupBy("symbol")
  ),
  "to yield items",
  [
    {
      key: "GOOG",
      items: [
        { symbol: "GOOG", price: 1349 },
        { symbol: "GOOG", price: 1351 },
      ],
    },
    {
      key: "AAPL",
      items: [
        { symbol: "AAPL", price: 274 },
        { symbol: "AAPL", price: 275 },
        { symbol: "AAPL", price: 279 },
      ],
    },
  ]
);

You can transform the items of a group with withGroup.

interleave

Interleaves the given separators between the items in the pipeline.

const { interleave } = require("@transformation/core");
await expect(
  pipeline(emitItems("0", "1", "2", "3", "4", "5"), interleave(",")),
  "to yield items",
  ["0", ",", "1", ",", "2", ",", "3", ",", "4", ",", "5"]
);

When given multiple separators, they are cycled.

await expect(
  pipeline(emitItems("0", "1", "2", "3", "4", "5"), interleave(",", "-", "|")),
  "to yield items",
  ["0", ",", "1", "-", "2", "|", "3", ",", "4", "-", "5"]
);

join

Joins all the items in the pipeline into a string with a given separator.

const { join } = require("@transformation/core");
await expect(
  pipeline(emitItems(0, 1, 2, 3, 4, 5), join(" - ")),
  "to yield items",
  ["0 - 1 - 2 - 3 - 4 - 5"]
);

If you don't specify the separator it defaults to comma.

await expect(pipeline(emitItems(0, 1, 2, 3, 4, 5), join()), "to yield items", [
  "0,1,2,3,4,5",
]);

keyBy

Indexes each item into to an object by the selected keys.

const { keyBy } = require("@transformation/core");

When given a field, it indexes the items in the pipeline keyed by the given field.

await expect(
  pipeline(
    emitItems(
      { id: 0, name: "foo" },
      { id: 1, name: "bar" },
      { id: 2, name: "baz" },
      { id: 3, name: "qux" }
    ),
    keyBy("id")
  ),
  "to yield items",
  [
    {
      0: { id: 0, name: "foo" },
      1: { id: 1, name: "bar" },
      2: { id: 2, name: "baz" },
      3: { id: 3, name: "qux" },
    },
  ]
);

You can also provide a function the will be used to select the key to index by.

await expect(
  pipeline(
    emitItems(
      { id: 0, name: "foo" },
      { id: 1, name: "bar" },
      { id: 2, name: "baz" },
      { id: 3, name: "qux" }
    ),
    keyBy(({ name }) => name)
  ),
  "to yield items",
  [
    {
      foo: { id: 0, name: "foo" },
      bar: { id: 1, name: "bar" },
      baz: { id: 2, name: "baz" },
      qux: { id: 3, name: "qux" },
    },
  ]
);

map

Maps each item with the given mapper.

const { map } = require("@transformation/core");
await expect(
  pipeline(
    emitItems(0, 1, 2, 3, 4, 5),
    map((n) => n * n)
  ),
  "to yield items",
  [0, 1, 4, 9, 16, 25]
);

You can also get the index of the item being mapped.

await expect(
  pipeline(
    emitItems("zero", "one", "two", "three"),
    map((n, i) => `${i}: ${n}`)
  ),
  "to yield items",
  ["0: zero", "1: one", "2: two", "3: three"]
);

Finally in some situations it can be useful to map items into a step that emits new items.

await expect(
  pipeline(
    emitItems(0, 1, 2, 3, 4),
    map((n) => pipeline(emitRange(n), toArray()))
  ),
  "to yield items",
  [[], [0], [0, 1], [0, 1, 2], [0, 1, 2, 3]]
);

Notice that when you return a step from the mapper function, it will get no input, so it is only useful to emit new items into the pipeline.

memorize

Memorizes the given step.

const { memorize } = require("@transformation/core");
let i = 0;

await expect(
  pipeline(
    emitItems(0, 1, 2, 0, 1, 2, 0, 1, 2),
    memorize(map((v) => `${v}: ${i++}`))
  ),
  "to yield items",
  ["0: 0", "1: 1", "2: 2", "0: 0", "1: 1", "2: 2", "0: 0", "1: 1", "2: 2"]
);

You can specify the size of the LRU cache, by default it is unbounded.

let i = 0;

await expect(
  pipeline(
    emitItems(0, 1, 2, 2, 1, 0, 0, 1, 2),
    memorize(
      map((v) => `${v}: ${i++}`),
      { maxSize: 2 }
    )
  ),
  "to yield items",
  ["0: 0", "1: 1", "2: 2", "2: 2", "1: 1", "0: 3", "0: 3", "1: 1", "2: 4"]
);

You can specify a field to use for caching. By default it uses the identity function for computing the cache key.

let i = 0;

await expect(
  pipeline(
    emitItems(0, 1, 2, 0, 1, 2, 0, 1, 2),
    map((key) => ({ key, time: i++ })),
    memorize(
      map(({ key, time }) => `${key}: ${time}`),
      { key: "key" }
    )
  ),
  "to yield items",
  ["0: 0", "1: 1", "2: 2", "0: 0", "1: 1", "2: 2", "0: 0", "1: 1", "2: 2"]
);

Finally you can specify a function to compute the cache key.

let i = 0;

await expect(
  pipeline(
    emitItems(0, 1, 2, 0, 1, 2, 0, 1, 2),
    map((key) => ({ key, time: i++ })),
    memorize(
      map(({ key, time }) => `${key}: ${time}`),
      { key: (v) => v.key }
    )
  ),
  "to yield items",
  ["0: 0", "1: 1", "2: 2", "0: 0", "1: 1", "2: 2", "0: 0", "1: 1", "2: 2"]
);

partition

Partition items into groups of the given size.

const { partition } = require("@transformation/core");
await expect(
  pipeline(emitItems(0, 1, 2, 3, 4, 5, 6), partition(2)),
  "to yield items",
  [
    { key: "[0;1]", items: [0, 1] },
    { key: "[2;3]", items: [2, 3] },
    { key: "[4;5]", items: [4, 5] },
    { key: "[6;7]", items: [6] },
  ]
);

partitionBy

Partition items into groups by the given selector.

const { partitionBy } = require("@transformation/core");
await expect(
  pipeline(
    emitItems(
      { symbol: "GOOG", price: 1349 },
      { symbol: "AAPL", price: 274 },
      { symbol: "AAPL", price: 275 },
      { symbol: "GOOG", price: 1351 },
      { symbol: "AAPL", price: 279 }
    ),
    partitionBy("symbol")
  ),
  "to yield items",
  [
    {
      key: "GOOG",
      items: [{ symbol: "GOOG", price: 1349 }],
    },
    {
      key: "AAPL",
      items: [
        { symbol: "AAPL", price: 274 },
        { symbol: "AAPL", price: 275 },
      ],
    },
    {
      key: "GOOG",
      items: [{ symbol: "GOOG", price: 1351 }],
    },
    {
      key: "AAPL",
      items: [{ symbol: "AAPL", price: 279 }],
    },
  ]
);

You can also use a function to select the discriminating value.

await expect(
  pipeline(
    emitItems(
      { symbol: "GOOG", price: 1349 },
      { symbol: "AAPL", price: 274 },
      { symbol: "AAPL", price: 275 },
      { symbol: "GOOG", price: 1351 },
      { symbol: "AAPL", price: 279 }
    ),
    partitionBy(({ symbol }) => symbol)
  ),
  "to yield items",
  [
    {
      key: "GOOG",
      items: [{ symbol: "GOOG", price: 1349 }],
    },
    {
      key: "AAPL",
      items: [
        { symbol: "AAPL", price: 274 },
        { symbol: "AAPL", price: 275 },
      ],
    },
    {
      key: "GOOG",
      items: [{ symbol: "GOOG", price: 1351 }],
    },
    {
      key: "AAPL",
      items: [{ symbol: "AAPL", price: 279 }],
    },
  ]
);

pipeline

Turns multiple steps into a single step.

const { pipeline } = require("@transformation/core");
await expect(
  pipeline(
    emitItems(0, 1, 2, 3, 4, 5),
    pipeline(
      filter((n) => n % 2 === 0),
      false && map((n) => n * n)
    ),
    map((n) => `${n} elephants`)
  ),
  "to yield items",
  ["0 elephants", "2 elephants", "4 elephants"]
);

Plain functions will be interpreted as map.

await expect(
  pipeline(
    emitItems("  \nHere is some text\n  with multiple lines\n   "),
    (s) => s.trim(),
    (s) => s.split(/\n/),
    splitIterable(),
    (s) => s.trim(),
    (s, i) => s.replace(/^/, `${i + 1}) `)
  ),
  "to yield items",
  ["1) Here is some text", "2) with multiple lines"]
);

program

Runs all of the given steps until the output closes.

const { program } = require("@transformation/core");
const items = [];

await program(
  emitItems(0, 1, 2, 3, 4, 5),
  forEach((item) => items.push(item))
);

expect(items, "to equal", [0, 1, 2, 3, 4, 5]);

reduce

Reduces the given pipeline down to a single item using the given accumulator function and an initial value.

const { reduce } = require("@transformation/core");
await expect(
  pipeline(
    emitItems(0, 1, 2, 3, 4, 5),
    reduce((sum, n) => sum + n, 0)
  ),
  "to yield items",
  [15]
);

retry

Retries a series steps according to the given retry options.

Options

  • max (default 5) the maximum number of retries
  • delay (default 100) the delay basis for the retry
  • strategy (default exponential) can also be linear

The code snippet below will fetch todos with ids from 0 to 5. It a request fails it will retry at most 5 times where the delay of 100 is doubled for each retry.

const { retry } = require("@transformation/core");
await program(
  emitItems(0, 1, 2, 3, 4, 5),
  map(n => `https://jsonplaceholder.typicode.com/todos/${0}`),
  retry(
    map(url => fetch(url))
    map(res => res.json())
  ),
  tap()
)

You can override the defaults the following way:

await program(
  emitItems(0, 1, 2, 3, 4, 5),
  map(n => `https://jsonplaceholder.typicode.com/todos/${0}`),
  retry(
    { max: 3, delay: 300, strategy: 'linear' },
    map(url => fetch(url))
    map(res => res.json())
  ),
  tap()
)

reverse

Reverses all of the items in the pipeline and re-emits them one by one.

Notice that this step will consume all of the items in the pipeline.

const { reduce } = require("@transformation/core");
await expect(
  pipeline(emitItems(0, 1, 2, 3, 4, 5), reverse()),
  "to yield items",
  [5, 4, 3, 2, 1, 0]
);

skip

Skips the given number of items before it starts to emitting.

import { skip } from "@transformation/core";
await expect(
  pipeline(emitItems([0, 1, 2, 3, 4, 5]), skip(2)),
  "to yield items",
  [2, 3, 4, 5]
);

setup

Executes a side-effect when the step is initialize.

import { setup } from "@transformation/core";

This is useful for cases where you have a pipeline that will be initialized more than ones, but it needs some storage. Then you can initialize it in the setup step.

The below code snippet shows the execution order.

const items = [];

await program(
  setup(() => items.push(0)),
  setup(() => items.push(1))
  emitItems(2, 3, 4, 5),
  forEach(item => items.push(item)),
);

expect(items, "to equal", [0, 1, 2, 3, 4, 5]);

skipLast

Given a number n, it skips the last n items.

import { skipLast } from "@transformation/core";
await expect(
  pipeline(emitItems([0, 1, 2, 3, 4, 5]), skipLast(2)),
  "to yield items",
  [0, 1, 2, 3]
);

When given no argument, it skips the last item.

await expect(
  pipeline(emitItems([0, 1, 2, 3, 4, 5]), skipLast()),
  "to yield items",
  [0, 1, 2, 3, 4]
);

sort

Sorts all of the items in the pipeline and re-emits them one by one.

Notice that this step will consume all of the items in the pipeline.

const { sort } = require("@transformation/core");
await expect(
  pipeline(emitItems(0, 1, 2, 3, 5, 7, 8, 2, 3, 4, 5), sort()),
  "to yield items",
  [0, 1, 2, 2, 3, 3, 4, 5, 5, 7, 8]
);

If you give it a comparison function, it will use that to decide the sorting order.

await expect(
  pipeline(
    emitItems(0, 1, 2, 3, 5, 7, 8, 2, 3, 4, 5),
    sort((a, b) => b - a)
  ),
  "to yield items",
  [8, 7, 5, 5, 4, 3, 3, 2, 2, 1, 0]
);

sortBy

Sorts all of the items in the pipeline by the specified criteria and re-emits them one by one.

Notice that this step will consume all of the items in the pipeline.

const { sortBy } = require("@transformation/core");
await expect(
  pipeline(
    emitItems(
      { name: "hat", price: 10 },
      { name: "cat", price: 100 },
      { name: "chat", price: 0 }
    ),
    sortBy("price")
  ),
  "to yield items",
  [
    { name: "chat", price: 0 },
    { name: "hat", price: 10 },
    { name: "cat", price: 100 },
  ]
);

You can sort by multiple fields and control the direction of the sorted fields.

await expect(
  pipeline(
    emitItems(
      { name: "wat", price: 100 },
      { name: "hat", price: 10 },
      { name: "cat", price: 100 },
      { name: "chat", price: 0 },
      { name: "wat", price: 100 }
    ),
    sortBy("price:desc", "name:asc")
  ),
  "to yield items",
  [
    { name: "cat", price: 100 },
    { name: "wat", price: 100 },
    { name: "wat", price: 100 },
    { name: "hat", price: 10 },
    { name: "chat", price: 0 },
  ]
);

You can even use a comparison for full control.

await expect(
  pipeline(
    emitItems(
      { name: "twat", price: 100 },
      { name: "hat", price: 10 },
      { name: "cat", price: 100 },
      { name: "chat", price: 0 },
      { name: "wat", price: 100 }
    ),
    sortBy((a, b) => a.price - b.price, "name:asc")
  ),
  "to yield items",
  [
    { name: "chat", price: 0 },
    { name: "hat", price: 10 },
    { name: "cat", price: 100 },
    { name: "twat", price: 100 },
    { name: "wat", price: 100 },
  ]
);

splitIterable

Re-emits any array as individual items.

const { splitIterable } = require("@transformation/core");
await expect(
  pipeline(emitItems(0, [1, 2], [3, 4, 5]), splitIterable()),
  "to yield items",
  [0, 1, 2, 3, 4, 5]
);

take

Emits the given number of items from the pipeline.

import { take } from "@transformation/core";
await expect(
  pipeline(emitItems([0, 1, 2, 3, 4, 5]), take(3)),
  "to yield items",
  [0, 1, 2]
);

tap

Print items to the console.

const { tap } = require("@transformation/core");
await expect(
  pipeline(
    emitItems(
      { name: "twat", price: 100 },
      { name: "hat", price: 10 },
      { name: "cat", price: 100 },
      { name: "chat", price: 0 },
      { name: "wat", price: 100 }
    )
  ),
  tap(({ name, price }) => `${name}: ${price}`),
  sortBy("price"),
  "to yield items",
  [
    { name: "twat", price: 100 },
    { name: "hat", price: 10 },
    { name: "cat", price: 100 },
    { name: "chat", price: 0 },
    { name: "wat", price: 100 },
  ]
);
0
[1, 2]
[3, 4, 5]

When given a field selector, prints that field to the console.

await expect(
  pipeline(
    emitItems(
      { name: "hat", price: 10 },
      { name: "cat", price: 100 },
      { name: "chat", price: 0 }
    ),
    tap("name"),
    sortBy("price")
  ),
  "to yield items",
  [
    { name: "chat", price: 0 },
    { name: "hat", price: 10 },
    { name: "cat", price: 100 },
  ]
);
hat
cat
chat

When given a function selector, prints the selected output to the console.

await expect(
  pipeline(
    emitItems(
      { name: "hat", price: 10 },
      { name: "cat", price: 100 },
      { name: "chat", price: 0 }
    ),
    tap(({ name, price }) => `${name}: ${price}`),
    sortBy("price")
  ),
  "to yield items",
  [
    { name: "chat", price: 0 },
    { name: "hat", price: 10 },
    { name: "cat", price: 100 },
  ]
);
hat: 10
cat: 100
chat: 0

throttle

Throttles the items passing this step to only emit an item at most once per every given milliseconds. Other items will be skipped.

const { throttle } = require("transformation/core");

Here we generate items that is delayed by their amount. Then we throttle the input to only emit items every 40ms.

await expect(
  pipeline(
    emitItems(0, 1, 2, 50, 3, 4, 50, 5, 6),
    (ms) => pipeline(emitItems(ms), delay(ms)),
    throttle(40)
  ),
  "to yield items",
  [0, 50, 50]
);

toArray

Accumulates all items into an array.

const { toArray } = require("transformation/core");
await expect(
  pipeline(emitItems(0, 1, 2, 3, 4, 5), toArray()),
  "to yield items",
  [[0, 1, 2, 3, 4, 5]]
);

toJSON

JSON stringify every item in the pipeline.

import { toJSON } from "@transformation/core";
await expect(
  pipeline(emitItems({ foo: "bar", year: 2000 }, 1, {}, true), toJSON()),
  "to yield items",
  ['{"foo":"bar","year":2000}', "1", "{}", "true"]
);

All arguments will be forwarded to JSON.stringify.

await expect(
  pipeline(emitItems({ foo: "bar", year: 2000 }, 1, {}, true), toJSON(null, 2)),
  "to yield items",
  ['{\n  "foo": "bar",\n  "year": 2000\n}', "1", "{}", "true"]
);

transform

Transforms object trees by running part of the tree though transformations.

await expect(
  pipeline(
    emitItems(
      { symbol: "goog", price: { value: 1349, currency: "USD" } },
      { symbol: "aapl", price: { value: 274, currency: "USD" } },
      { symbol: "aapl", price: { value: 275, currency: "USD" } },
      { symbol: "goog", price: { value: 1351, currency: "USD" } },
      { symbol: "aapl", price: { value: 279, currency: "USD" } }
    ),
    transform({
      symbol: map((symbol) => symbol.toUpperCase()),
      price: { value: map((price) => price * 2) },
    })
  ),
  "to yield items",
  [
    { symbol: "GOOG", price: { value: 2698, currency: "USD" } },
    { symbol: "AAPL", price: { value: 548, currency: "USD" } },
    { symbol: "AAPL", price: { value: 550, currency: "USD" } },
    { symbol: "GOOG", price: { value: 2702, currency: "USD" } },
    { symbol: "AAPL", price: { value: 558, currency: "USD" } },
  ]
);

Only matching parts of an object it transformed.

await expect(
  pipeline(
    emitItems(
      { symbol: "goog", currency: "USD" },
      { symbol: "aapl", price: 274, currency: "USD" },
      "this is not an object",
      null,
      {
        name: "no symbol",
        price: 666,
        currency: "USD",
        nesting: { supported: "yes" },
      },
      { symbol: "aapl", price: 275, currency: "USD" }
    ),
    transform({
      symbol: map((symbol) => symbol.toUpperCase()),
      price: map((price) => `${price}`),
      nesting: {
        supported: map((symbol) => symbol.toUpperCase()),
      },
    })
  ),
  "to yield items",
  [
    { symbol: "GOOG", currency: "USD" },
    { symbol: "AAPL", price: "$274", currency: "USD" },
    "this is not an object",
    null,
    {
      name: "no symbol",
      price: "$666",
      currency: "USD",
      nesting: { supported: "YES" },
    },
    { symbol: "AAPL", price: "$275", currency: "USD" },
  ]
);

uniq

Filters out items that is not unique.

const { uniq } = require("transformations/core");

It records items that is already seen and filters out the items that has already been emitted.

await expect(
  pipeline(emitItems(0, 4, 1, 2, 3, 0, 4, 5, 7, 6, 7, 8, 9, 9), uniq()),
  "to yield items",
  [0, 4, 1, 2, 3, 5, 7, 6, 8, 9]
);

uniqBy

Filters out items that is not unique by a selected value.

const { uniqBy } = require("transformations/core");

It records items that is already seen and filters out the items that has already been emitted.

await expect(
  pipeline(
    emitItems(
      { id: 0, name: "foo", count: 0 },
      { id: 1, name: "bar", count: 1 },
      { id: 2, name: "baz", count: 2 },
      { id: 0, name: "foo", count: 3 },
      { id: 3, name: "qux", count: 4 },
      { id: 2, name: "baz", count: 5 }
    ),
    uniqBy("id")
  ),
  "to yield items",
  [
    { id: 0, name: "foo", count: 0 },
    { id: 1, name: "bar", count: 1 },
    { id: 2, name: "baz", count: 2 },
    { id: 3, name: "qux", count: 4 },
  ]
);

You can also use a function to select the discriminating value.

await expect(
  pipeline(
    emitItems(
      { id: 0, name: "foo", count: 0 },
      { id: 1, name: "bar", count: 1 },
      { id: 2, name: "baz", count: 2 },
      { id: 0, name: "foo", count: 3 },
      { id: 3, name: "qux", count: 4 },
      { id: 2, name: "baz", count: 5 }
    ),
    uniqBy(({ name }) => name)
  ),
  "to yield items",
  [
    { id: 0, name: "foo", count: 0 },
    { id: 1, name: "bar", count: 1 },
    { id: 2, name: "baz", count: 2 },
    { id: 3, name: "qux", count: 4 },
  ]
);

unless

Executes a sub pipeline when a given condition is not meet.

See when for the opposite computation.

const { unless } = require("transformation/core");

When given a predicate function, it executes the sub pipeline when the predicate is false.

await expect(
  pipeline(
    emitItems(0, 1, 2, 3, 4, 5, 6),
    unless(
      (n) => n % 2 === 0,
      map((n) => n * 2),
      map((n) => `${n} transformed`)
    )
  ),
  "to yield items",
  [0, "2 transformed", 2, "6 transformed", 4, "10 transformed"]
);

When given a boolean that is used to decide if the sub pipeline should be executed.

await expect(
  pipeline(
    emitItems(0, 1, 2, 3, 4, 5, 6),
    unless(
      true,
      map((n) => n * n)
    ),
    unless(
      false,
      map((n) => `${n} transformed`)
    )
  ),
  "to yield items",
  [
    "0 transformed",
    "1 transformed",
    "2 transformed",
    "3 transformed",
    "4 transformed",
    "5 transformed",
    "6 transformed",
  ]
);

when

Conditionally executes a sub pipeline.

See unless for the opposite computation.

const { when } = require("transformation/core");

When given a predicate function, it executes the sub pipeline when the predicate is true.

await expect(
  pipeline(
    emitItems(0, 1, 2, 3, 4, 5, 6),
    when(
      (n) => n % 2 === 0,
      map((n) => n * 2),
      map((n) => `${n} transformed`)
    )
  ),
  "to yield items",
  ["0 transformed", 1, "4 transformed", 3, "8 transformed", 5, "12 transformed"]
);

When given a boolean that is used to decide if the sub pipeline should be executed.

await expect(
  pipeline(
    emitItems(0, 1, 2, 3, 4, 5, 6),
    when(
      true,
      map((n) => n * n)
    ),
    when(
      false,
      map((n) => `${n} transformed`)
    )
  ),
  "to yield items",
  [0, 1, 4, 9, 16, 25, 36]
);

withGroup

Transform items in groups created by groupBy.

Notice that you can provide one or more transformation steps to withGroup.

const { withGroup } = require("transformation/core");

Here we attach labels to rows in stock groups.

await expect(
  pipeline(
    emitItems(
      { symbol: "GOOG", price: 1349 },
      { symbol: "AAPL", price: 274 },
      { symbol: "AAPL", price: 275 },
      { symbol: "GOOG", price: 1351 },
      { symbol: "AAPL", price: 279 }
    ),
    groupBy("symbol"),
    withGroup(extend({ label: ({ symbol, price }) => `${symbol}: ${price}` }))
  ),
  "to yield items",
  [
    {
      key: "GOOG",
      items: [
        { symbol: "GOOG", price: 1349, label: "GOOG: 1349" },
        { symbol: "GOOG", price: 1351, label: "GOOG: 1351" },
      ],
    },
    {
      key: "AAPL",
      items: [
        { symbol: "AAPL", price: 274, label: "AAPL: 274" },
        { symbol: "AAPL", price: 275, label: "AAPL: 275" },
        { symbol: "AAPL", price: 279, label: "AAPL: 279" },
      ],
    },
  ]
);

Utilities

takeAll

This function drains all items from a pipeline and returns them as an array.

const { takeAll } = require("transformation/core");
const items = await takeAll(
  emitItems(0, 1, 2, 3, 4, 5),
  map((x) => x * x)
);

expect(items, "to equal", [0, 1, 4, 9, 16, 25]);

Building new steps

Let's say we want to build a custom step that can't easily be built by composing the existing step. Then you can use the step function to create a custom step.

const { step } = require("transformation/core");

The step we will use for this example is one that duplicates all items.

const duplicate = () =>
  step(async ({ take, put, CLOSED }) => {
    while (true) {
      const value = await take();
      if (value === CLOSED) break;
      await put(value);
      await put(value);
    }
  });

await expect(
  pipeline(emitItems(0, 1, 2, 3, 4, 5), duplicate()),
  "to yield items",
  [0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
);

Notice that most custom steps can just be a composition of existing steps.

As an example let's make a step that averages numbers.

const average = () =>
  pipeline(
    toArray(),
    map((items) =>
      items.length === 0
        ? NaN
        : items.reduce((sum, n) => sum + n, 0) / items.length
    )
  );

await expect(
  pipeline(emitItems(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), average()),
  "to yield items",
  [4]
);