README
live-set
This class is basically a Set with a subscribe() method that calls your callback after changes happen to the set of values.
This class represents a set of values which may change over time or have transformations applied to it, resulting in a new LiveSet. After modifications are made, notifications will be delivered asynchronously, like Javascript Promises or MutationObservers do. This library is inspired by the Kefir library and the Observable proposal, but represents a changing set rather than a stream of a single updating value.
Like Kefir Observables, LiveSets have active and inactive states and start out in the inactive state. When a LiveSet first gets a subscriber, the LiveSet becomes active and calls the listen function provided to it. When a LiveSet no longer has any subscribers, it becomes deactivated. Unlike Kefir Observables, events are delivered asynchronously, and a LiveSet instance may have its current values queried at any time.
Here's an example of a liveset being made to represent the direct children of an HTMLElement, using a MutationObserver to keep the liveset updated:
import LiveSet from 'live-set';
function createElementChildLiveSet(element) {
return new LiveSet({
// Function to determine the LiveSet's values if they're read while the
// LiveSet is inactive.
read() {
return new Set(Array.from(element.children));
},
// Function to call when the LiveSet is activated.
listen(setValues, controller) {
// setValues must called before the listen function returns to set the
// initial values of the liveset. (We pass the initial values to a
// function like this rather than returning them from the listen function
// in order to support recursive livesets.)
setValues(this.read());
// The controller object has add and remove methods which may modify the
// set of values after setValues has been called. It also has an end
// method which may be called to signify that no further changes will be
// made, allowing the LiveSet to discard its subscribers.
function changesHandler(mutations) {
mutations.forEach(mutation => {
Array.prototype.forEach.call(mutation.addedNodes, child => {
if (child.nodeType === 1) controller.add(child);
});
Array.prototype.forEach.call(mutation.removedNodes, child => {
if (child.nodeType === 1) controller.remove(child);
});
});
}
const observer = new MutationObserver(changesHandler);
observer.observe(element, {childList: true});
// The listen function may return an unsubscribe callback, or an object
// containing an unsubscribe callback and a pullChanges callback.
return {
unsubscribe() {
// When the LiveSet has no more subscribers, disconnect the
// MutationObserver instance so that it doesn't continue to call
// `changesHandler` wastefully.
observer.disconnect();
},
pullChanges() {
// LiveSet instances generally deliver their change notifications
// asynchronously, but they can be forced to flush all queued up
// notifications synchronously. If a LiveSet is based on a source
// with the same behavior, such as a MutationObserver or another
// LiveSet,
changesHandler(observer.takeRecords());
}
};
}
});
}
const bodyChildren = createElementChildLiveSet(document.body);
console.log(bodyChildren.values());
// Example output:
// Set { <header>, <article>, <div>, <div>, <footer> }
const subscription = bodyChildren.subscribe(changes => {
console.log(changes);
// If an element is added directly to the page body, this would log output like this:
// [{type: 'add', value: <div> }]
});
// We can later unsubscribe the above callback when we're no longer interested:
subscription.unsubscribe();
The LiveSet instance could then be passed around, read, and subscribed to. New LiveSets can be created by transforming existing LiveSets. Here's an example building off of the above example:
import filter from 'live-set/filter';
const bodyChildrenNoDivs = filter(bodyChildren, el => el.nodeName !== 'DIV');
console.log(bodyChildrenNoDivs.values());
// Example output:
// Set { <header>, <article>, <footer> }
// bodyChildrenNoDivs can be subscribed to too.
import flatMap from 'live-set/flatMap';
// This liveset will contain all of the children of the elements in the
// bodyChildrenNoDivs liveset, and it will stay up-to-date!
const bodyChildrenNoDivsChildren = flatMap(bodyChildrenNoDivs, el => createElementChildLiveSet(el));
// Example output:
// Set { <nav>, <h1>, <p>, <p>, <p>, <div>, <div> }
Like Kefir Observables, the activation model means that intermediate LiveSets can be created and consumed without needing to be explicitly cleaned up after the output liveset is unsubscribed from. Consider the following code in which several livesets are created from other livesets:
import LiveSet from 'live-set';
import map from 'live-set/map';
import merge from 'live-set/merge';
// i1 is a liveset which starts out containing the number 5, and every second
// while active the subsequent number is added to it.
const i1 = new LiveSet({
// Don't bother making `i1` support having its values read while inactive.
read() {
throw new Error('not implemented');
},
// This will be called when `i1` becomes active.
listen(setValues, controller) {
setValues(new Set([5]));
let i = 6;
const t = setInterval(() => {
controller.add(i);
i++;
}, 1000);
// This will be called when `i1` becomes inactive. We can return a function
// here rather than an object if we only have a unsubscribe callback.
return () => {
clearInterval(t);
};
}
});
// i2 is a liveset which always contains the same values as i1 but multiplied
// by 10.
const i2 = map(i1, x => x*10);
// final is a liveset which always contains all of the values of i2 and the
// value 1.
const final = merge([
i2,
LiveSet.constant(new Set([1]))
]);
const subscription = final.subscribe({
start() {
// Note that whenever we are subscribing and reading the values of a
// liveset, we call the .values() method on the liveset only after we have
// subscribed to it, so that it's activated first. If we call .values()
// before `final` is active, then its values will be computed twice, once
// on the .values() call and then again when we subscribe to `final`.
//
// Additionally, the input liveset `i1` above was not constructed with a
// working read() function, so calling .values() on final before it's
// activated would fail.
console.log('All values', Array.from(final.values()));
},
next(changes) {
console.log('changes', changes);
}
});
setTimeout(() => {
// This will end our subscription to `final`, and `final` will become
// inactive since it has no more subscriptions. `final` will end its
// subscription to `i2`, which will then become inactive and end its
// subscription to `i1`, which will become inactive and call `clearInterval`,
// preventing the interval timer from being left running forever uselessly.
subscription.unsubscribe();
}, 3500);
/* console output:
All values [ 50, 1 ]
changes [ { type: 'add', value: 60 } ]
changes [ { type: 'add', value: 70 } ]
changes [ { type: 'add', value: 80 } ]
*/
The ability to read the values of an inactive LiveSet is provided for convenience, but in some situations it has surprising results if transformations are not pure (including the case where they instantiate objects and their identities are depended on). Consider the following code:
import LiveSet from 'live-set';
import map from 'live-set/map';
const input = LiveSet.constant(new Set([5, 6]));
const mapped = map(input, x => ({value: x}));
const firstValue1 = Array.from(mapped.values())[0];
console.log(firstValue1); // {value: 5}
const firstValue2 = Array.from(mapped.values())[0];
console.log(firstValue2); // {value: 5}
console.log(firstValue1 === firstValue2); // false
The mapped
LiveSet while inactive does not keep a cache of the results of the
transformed input values. It could only know to remove them if it subscribed
to the input liveset, but that could cause input
to keep resources open. The
mapped
LiveSet will only become active and trigger a subscribtion to input
if it is subscribed to first. Here we subscribe to it with an empty observer to
demonstrate the difference:
import LiveSet from 'live-set';
import map from 'live-set/map';
const input = LiveSet.constant(new Set([5, 6]));
const mapped = map(input, x => ({value: x}));
mapped.subscribe({});
const firstValue1 = Array.from(mapped.values())[0];
const firstValue2 = Array.from(mapped.values())[0];
console.log(firstValue1 === firstValue2); // true
API
Core
LiveSet::constructor
LiveSet<T>::constructor({scheduler?, read, listen})
The constructor must be passed an object containing read
and listen
functions, and optionally a scheduler
property of the Scheduler type.
The read
function is called if the values() method is called on the LiveSet
instance while it is inactive but not yet ended. The read
function is
expected to return a Set object containing the LiveSet's current values. If a
LiveSet is not intended to be read while inactive, then you should give a
function which throws an error.
The listen
function is called whenever the LiveSet is activated. Activation
occurs whenever the LiveSet goes from zero to one subscribers. Activation may
happen multiple times for a LiveSet if it is unsubscribed from and resubscribed
to. The listen
function is passed two parameters, setValues
and
controller
.
setValues
is a function that must be called with the initial values as a Set
before the passed listen
function ends and before any new subscriptions are
added to the LiveSet being activated.
controller
is an object with three methods, add(value)
, remove(value)
,
error(error: any)
and end()
. These are to be used to modify the LiveSet's
values. Do not modify the Set originally passed to setValues
to manipulate the
LiveSet. end()
may be called to signify that the LiveSet will have no more
changes; the LiveSet will become frozen with its current values at that point.
References to subscribers will be released when a LiveSet is ended. The error
function ends the LiveSet and delivers an error value to any current
subscribers.
The listen
function may return a function to call upon deactivation, or an
object with an unsubscribe
method (to call upon deactivation) and optionally
a pullChanges
method. The pullChanges method will be called to flush any
changes from the source when the values()
method is called on the LiveSet, or
the pullChanges
method is called on a LiveSetSubscription. If the listen
function subscribes to a LiveSet, then it may be useful to have the listen
function return the LiveSetSubscription, which has unsubscribe and pullChanges
methods.
LiveSet.constant
LiveSet.constant<T>(values: Set<T>, options?: Object): LiveSet<T>
This creates a LiveSet with a set of values that will never change. The LiveSet will start in the ended state, and therefore will never deliver change notifications or keep references to subscribers.
The optional options
parameter may have a scheduler
property specifying the
Scheduler instance to use.
LiveSet.active
LiveSet.active<T>(initialValues?: Set<T>, options?: Object): {liveSet: LiveSet<T>, controller: LiveSetController<T>}
This is a convenience method to create a LiveSet that starts out in the
activated state and never exits the activated state. The new LiveSet and its
controller (the same type as passed to the listen
callback passed to the
constructor) are returned.
Be warned that this function eschews the normal activation/deactivation lifecycle of LiveSets. If the LiveSet requires some resource to be held open to keep it populated, then you will not be able to auto-close the resource when the LiveSet loses its subscribers. You will have to provide your own mechanism to close the resource manually if necessary.
The optional options
parameter may have a scheduler
property specifying the
Scheduler instance to use.
This function is inspired by the nonstandard "Promise.defer()" function that some Promise libraries have implemented.
LiveSet.defaultScheduler
LiveSet.defaultScheduler: Scheduler
This is the Scheduler object used by default for new LiveSets.
LiveSet::isEnded
LiveSet<T>::isEnded(): boolean
This returns whether the LiveSet is in the ended state. LiveSets in the ended state will never have their values change, deliver any change notifications, or keep references to their subscribers.
LiveSet::getScheduler
LiveSet<T>::getScheduler(): Scheduler
Retrieve the Scheduler object that a LiveSet was instantiated with. This may be used so that a new LiveSet can be instantiated with the same Scheduler.
LiveSet::values
LiveSet<T>::values(): Set<T>
This returns a Set containing all of the LiveSet's current values at the time of the method call. If the LiveSet is modified, then previously-returned Set objects will not include the modifications. The Set object return by the values() method must not be modified.
If the LiveSet is currently inactive, then this will trigger the read
function passed to the constructor to be called. If the LiveSet is currently
active, then this will trigger the pullChanges
function returned by the
constructor's listen
function if present.
LiveSet::subscribe
LiveSet<T>::subscribe(observer): LiveSetSubscription
This function is used to subscribe to change notifications from the LiveSet.
The observer parameter must either be an Observer object with optional start
,
next
, error
, and complete
functions, or a function which is treated as an
Observer object with that function as the next
method. The subscribe method
returns a LiveSetSubscription object.
The start
function is called when the subscription first starts, before the
subscribe call has returned, and it is passed a reference to the
LiveSetSubscription object which will be returned. During the start
function,
the LiveSet being subscribed to is guaranteed to be active, so it's a good time
to read the current values of the LiveSet with the values() method.
The next
function is called after any changes have been made to the LiveSet's
set of values. These changes notifications are delivered either asynchronously,
or whenever change notifications are flushed early due to a LiveSet::values()
or LiveSetSubscription::pullChanges()
call. The parameter passed to the
next
function is an array of LiveSetChangeRecord objects.
The error
function is called if the LiveSet is ended by a call to
controller.error
, and it's passed the value passed to the controller.error
method.
The complete
function is called if the LiveSet is ended by a call to
controller.end
.
If either the error
or complete
function is called, then there will be no
more calls to any of the observer's functions after that.
This function is intended to be compatible with the Observable subscribe method of the Observable proposal.
LiveSetSubscription::closed
LiveSetSubscription::closed: boolean
This is true if the LiveSet has ended, or the subscription has been unsubscribed from.
LiveSetSubscription::unsubscribe
LiveSetSubscription::unsubscribe(): void
This immediately unsubscribes the subscription. None of the observer functions will be called after unsubscription.
LiveSetSubscription::pullChanges
LiveSetSubscription::pullChanges(): void
This will cause any queued change notifications to be immediately flushed to
this subscription's observer's next
function. This will not affect other
subscriptions to the LiveSet.
This is a simple object with a type
property and an optional value
property. An array of LiveSetChangeRecord objects is passed to the next
callback of a LiveSet::subscribe call.
{type: 'add', value: T} |
{type: 'remove', value: T} |
{type: 'end'};
Transformations
The following functions usually take a pre-existing LiveSet instance as input, and return a new LiveSet instance. These functions are implemented in separate modules rather than as methods of LiveSet in part so that only the functions used have to be included in a javascript bundle built for browsers.
For all functions below which produce a LiveSet, the LiveSet uses the same scheduler as the input LiveSet or the same as the first input LiveSet if multiple are given.
live-set/filter
filter<T>(liveSet: LiveSet<T>, cb: (value: T) => any): LiveSet<T>
This creates a LiveSet that contains only the values of the input liveSet
for which they given callback function returns a truthy value for.
live-set/map
map<T,U>(liveSet: LiveSet<T>, cb: (value: T) => U): LiveSet<U>
This creates a LiveSet that contains the result of cb(value)
for each value
in the input liveSet
instead of the original values. The callback will only
be called for the initial values and when values are added; the callback will
not be called when a value is removed.
The behavior is undefined if the callback returns the same value for distinct
input values present in the input liveSet
at the same time.
live-set/transduce
transduce(liveSet: LiveSet<any>, transducer: Function): LiveSet<any>
This creates a new LiveSet based on a transformation implemented by the given transducer function. It supports any transducers implementation that follows the transducer protocol, for example jlongster/transducers.js or cognitect-labs/transducers-js. To learn more about transducers please visit those libraries' pages.
Transducers are recommended to be used to replace any sequence of multiple map or filter function calls. The use of transducers removes the need for intermediate LiveSets to be created.
Note that each input value from the liveSet
passed to the transducer is
expected to immediately map to zero or more values. This mapping is remembered
so that if the input value is later removed from the input liveSet
, then the
associated output values are all removed from the output LiveSet. This is fine
for any combination of common transducers such as map(cb)
, filter(cb)
, and
take(n)
, but transducers which produce a many-to-one relationship between
values such as partition(n)
will not function in a sensible manner.
The behavior is undefined if the transducer outputs equal values to be present in the output LiveSet at the same time.
live-set/merge
merge<T>(liveSets: Array<LiveSet<T>>): LiveSet<T>
This function takes an array of LiveSets and returns a single LiveSet containing all of their values.
The behavior is undefined if multiple input LiveSets contain the same value at the same time.
live-set/flatMap
flatMap<T,U>(liveSet: LiveSet<T>, cb: (value: T) => LiveSet<U>): LiveSet<U>
This function calls the given callback function for each value in the input
liveSet
, and merges the values of all returned LiveSets into one LiveSet.
When a new value is added to the input liveSet
, then the callback will be
called a new LiveSet's values will be merged in. When a value is removed from
the input liveSet
, then the values from the LiveSet created for that value
will be removed from the output LiveSet.
The behavior is undefined if any of the LiveSets returned by the callback contain equal values at the same time.
live-set/flatMapR
flatMapR<T,U>(liveSet: LiveSet<T>, cb: (value: T) => LiveSet<U>): LiveSet<U>
This function is the same as flatMap
, but it should be used for fully correct
LiveSetSubscription::pullChanges
behavior in recursive situations where it is
expected that a new item being emitted to the output set will cause a new item
to be added to the input set (and therefore potentially more items to the
output set again).
live-set/mapWithRemoval
mapWithRemoval<T,U>(input: LiveSet<T>, cb: (value: T, removal: Promise<void>) => U): LiveSet<U>
This is similar to the live-set/map function, but the callback is also passed a
promise that will resolve when the value is removed from the input liveSet
.
The LiveSet returned by this function may not have values()
called on it
while it is inactive.
The behavior is undefined if the callback returns the same value for distinct
input values present in the input liveSet
at the same time.
live-set/toValueObservable
toValueObservable<T>(liveSet: LiveSet<T>): Observable<{value: T, removal: Promise<void>}>
This will return an Observable
instance which upon subscription will emit a {value, removal}
object for
every value
currently in the input liveSet
where removal
is a Promise
which will resolve after the value
is removed from the input liveSet
.
live-set/Scheduler
Scheduler::constructor()
A scheduler object has a schedule(callback)
method which schedules a callback
to run asynchronously, and a flush()
method to run all currently scheduled
callbacks early.
Bundling Note
To use this module in browsers, a CommonJS bundler such as Browserify or Webpack should be used.
LiveSet's code adds additional checks in some places if process.env.NODE_ENV
is not set to "production". If you're using Browserify, then setting the
NODE_ENV environment variable to "production" during build is enough to disable
these checks. Instructions for other bundlers can be found in React's
documentation,
which uses the same convention.
The additional checks make sure that the Set passed to setValues
and the Set
returned from the values()
method are not modified.
Browser support for Map
, Set
, and Promise
is required. Load a polyfill
for these (such as @babel/polyfill)
if you need to support browsers that don't natively support these.
Types
Both TypeScript and Flow type definitions for this module are included! The type definitions won't require any configuration to use.