react-callbag-streams

apply streaming operators (like debounce) on your component state / props


React Callbag Streams



Use callbag operators on normal JS variables in React components. Easily debounce, flatten, throttle, maintain order of async operations (like data fetching), etc.

npm i react-callbag-streams
import { useStream } from 'react-callbag-streams'
import { debounce, filter, flatten, map, fromPromise } from 'callbag-common'


function App() {
  const [q, setQ] = useState('')
  const [info, loading] =
    useStream(q,                          // 👉 take the stream of values of q
      debounce(200),                      // 👉 debounce by 200 ms
      map(q => fromPromise(pokeInfo(q))), // 👉 get the pokemon info (which is an async function)
      flatten,                            // 👉 flatten requests (only keep one request in-flight)
    )

  return (
    <>
      <input type='text'
        placeholder='pokemon name ...'
        onInput={e => setQ((e.target as any).value)}/>
      <br/>
      <pre>
        { loading ? 'loading ...' : JSON.stringify(info, undefined, 2) }
      </pre>
    </>
  )
}

► Playground



Why?

Some extremely basic reactive programming tasks are weirdly difficult and migraine-inducing in React, while they are pretty trivial with any reactive programming library.

react-callbag-streams provides hooks that allow you to treat variables as streams, use these super-convenient operators on the stream, and then treat the whole thing as a plain variable again (with a loading indicator as a bonus).


Example: Request Ordering

Imagine you fetch data like this:

function App() {
  const [q, setQ] = useState('')
  const [info, setInfo] = useState({})
  
  useEffect(() => {
    // effect callbacks must not be async, so the promise is handled inside
    pokeInfo(q).then(setInfo)
  }, [q])
  
  ...
}

Imagine you want to fetch info for "charizard" and then for "snorlax", but the responses to these requests come out of order. Now your query is for "snorlax" but you are displaying information for "charizard".

Fixing this issue is trivial with callbag-flatten:

import { flatten, map, fromPromise } from 'callbag-common'
import { useStream } from 'react-callbag-streams'

function App() {
  const [q, setQ] = useState('')
  const [info] = useStream(
    q,
    map(q => fromPromise(pokeInfo(q))),
    flatten
  )
  
  ...
}
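
To make the ordering guarantee concrete, here is a minimal, dependency-free sketch of the "latest request wins" idea that flattening provides in this example. It is not how callbag-flatten is implemented, and `latestOnly` and `track` are hypothetical names used only for illustration:

```typescript
// Hypothetical helper (not part of react-callbag-streams or callbag-flatten):
// only the most recently started request may deliver its result.
function latestOnly<T>(onResult: (value: T) => void): () => (value: T) => void {
  let latestId = 0;

  // Call `track()` when a request starts; call the returned function when it resolves.
  return function track() {
    const id = ++latestId;
    return (value: T) => {
      if (id === latestId) onResult(value); // stale responses are silently dropped
    };
  };
}

// Usage: responses arrive out of order, yet only the latest query is displayed.
const displayed: string[] = [];
const track = latestOnly<string>(info => displayed.push(info));

const resolveCharizard = track(); // request for "charizard" starts first
const resolveSnorlax = track();   // request for "snorlax" starts second

resolveSnorlax('snorlax info');     // newest request resolves first
resolveCharizard('charizard info'); // stale response is ignored

console.log(displayed);
```

In this sketch the late "charizard" response never reaches `displayed`, which is the behavior the `flatten` step gives you in the component above.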

Example: Debouncing

Debouncing, throttling, etc. become extremely easy operations when you treat your data as streams:

import { flatten, map, fromPromise, debounce } from 'callbag-common'
import { useStream } from 'react-callbag-streams'

function App() {
  const [q, setQ] = useState('')
  const [info] = useStream(
    q,
    debounce(200),
    map(q => fromPromise(pokeInfo(q))),
    flatten
  )
  
  ...
}
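
As a rough illustration of what a 200 ms debounce lets through, the following dependency-free sketch simulates it over timestamped updates instead of using real timers. `Timed` and `simulateDebounce` are hypothetical names, and the sketch assumes that an update arriving exactly `ms` after the previous one does not cancel it:

```typescript
interface Timed<T> { at: number; value: T } // `at` is a timestamp in milliseconds

// Hypothetical simulation: given updates sorted by time, returns the values a
// debounce of `ms` would emit. A value gets through only if no newer update
// arrives less than `ms` after it, and it is emitted `ms` after it arrived.
function simulateDebounce<T>(updates: Timed<T>[], ms: number): Timed<T>[] {
  const emitted: Timed<T>[] = [];
  updates.forEach((update, i) => {
    const next = updates[i + 1];
    if (next === undefined || next.at - update.at >= ms) {
      emitted.push({ at: update.at + ms, value: update.value });
    }
  });
  return emitted;
}

// Usage: a burst of keystrokes, a pause, then another burst.
const keystrokes: Timed<string>[] = [
  { at: 0, value: 'p' },
  { at: 80, value: 'pi' },
  { at: 150, value: 'pik' },
  { at: 600, value: 'pika' },
  { at: 650, value: 'pikac' },
];

console.log(simulateDebounce(keystrokes, 200));
```

Only the last value of each burst survives, so `pokeInfo()` would be called twice here instead of five times.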

Why Callbags?

  • They are extremely lightweight. For example, callbag-merge is under 350B, while the merge() factory in RxJS is about 4.3KB in size.
  • They are pretty simple (for example compare callbag-map with RxJS's map()).
  • Callbags is a specification, not a library. This, combined with the simplicity, allows for fully decentralized and community-driven development of utilities and libraries, while for something like RxJS most of the utilities come from the library itself.
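
To give a feel for how small the specification is, here is a hedged sketch of a source, an operator and a sink written against the callbag protocol (type 0 for the handshake, 1 for data, 2 for termination). It is a simplified illustration that only handles synchronous, push-style sources; `fromArray` and `collect` are hypothetical helpers and `map` only resembles callbag-map, none of them being the published packages:

```typescript
// A callbag is a plain function `(type, payload) => void`, where type is
// 0 (start / handshake), 1 (data) or 2 (end).
type Callbag = (type: 0 | 1 | 2, payload?: any) => void;

// Source: on start, hands the sink a talkback, pushes every item, then ends.
const fromArray = <T>(items: T[]): Callbag => (type, sink) => {
  if (type !== 0) return;
  let stopped = false;
  sink(0, (t: 0 | 1 | 2) => { if (t === 2) stopped = true; });
  for (const item of items) {
    if (stopped) return;
    sink(1, item);
  }
  if (!stopped) sink(2);
};

// Operator: forwards handshake and end untouched, transforms data.
const map = <A, B>(f: (a: A) => B) => (source: Callbag): Callbag => (type, sink) => {
  if (type !== 0) return;
  source(0, (t: 0 | 1 | 2, d?: any) => sink(t, t === 1 ? f(d) : d));
};

// Sink: collects all data into an array (works because fromArray is synchronous).
const collect = <T>(source: Callbag): T[] => {
  const values: T[] = [];
  source(0, (t: 0 | 1 | 2, d?: any) => { if (t === 1) values.push(d); });
  return values;
};

const doubled = collect<number>(map((x: number) => x * 2)(fromArray([1, 2, 3])));
console.log(doubled);
```

Because every piece is just a function following the same convention, operators written by different authors compose without sharing a base library.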



Usage

The whole point of this library is that it allows using callbag operators on variables inside React components. You can find a collection of useful and commonly used callbag operators here or here, find a list of available community operators here or here, or even easily create your own operators.


👉 useStream() allows you to treat a variable as a stream:

import { useStream } from 'react-callbag-streams'
import { filter } from 'callbag-common'

function MyComp({ prop }) {
  //
  // 👉 even will only have even values of prop, and won't be updated
  //    for its odd values.
  //
  const [even] = useStream(prop, filter(x => x % 2 === 0))
  
  ...
}

import { useStream } from 'react-callbag-streams'
import { debounce } from 'callbag-common'

function MyComp({ prop }) {
  //
  // 👉 debounced will be the latest value of prop, 200ms after its last change.
  //    it will not be updated while prop is changing at intervals shorter than 200ms.
  //
  const [debounced] = useStream(prop, debounce(200))
  
  ...
}

import { useStream } from 'react-callbag-streams'
import { map, fromPromise, flatten } from 'callbag-common'

function MyComp({ prop }) {
  //
  // 👉 fetched will be the result of asyncFetch() for latest value of prop,
  //    even if values for asyncFetch come out of order.
  //
  const [fetched] = useStream(prop, map(p => fromPromise(asyncFetch(p))), flatten)
  
  ...
}

👉 useStream() also provides a loading indicator. The loading indicator is true from the time the source variable changes until the next emission of the stream.

const [fetched, loading] = useStream(prop, map(p => fromPromise(asyncFetch(p))), flatten)
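
The rule for the loading indicator can be modeled in a few lines. This is only a sketch of the behavior described above, not the hook's implementation; `StreamEvent` and `loadingTimeline` are hypothetical names:

```typescript
type StreamEvent = 'change' | 'emit';

// Hypothetical model of the stated rule: loading turns true when the source
// variable changes and false again on the stream's next emission.
function loadingTimeline(events: StreamEvent[]): boolean[] {
  let loading = false;
  return events.map(event => {
    loading = event === 'change';
    return loading;
  });
}

// Three quick changes that a debounce collapses into a single emission:
const timeline = loadingTimeline(['change', 'change', 'change', 'emit']);
console.log(timeline);
```

Under this model the indicator stays true for as long as operators such as debounce hold values back, and flips to false once a result finally comes out of the stream.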

⚡⚡ Check out this real-life example.


Working with Multiple Streams

👉 useMergedStream() allows you to treat multiple variables as one stream. Whenever any of the variables has a new value, the stream will emit that value.

import { useMergedStream } from 'react-callbag-streams'
import { debounce } from 'callbag-common'

function MyComp() {
  const [a] = useState()
  const [b] = useState()
  
  //
  // 👉 merged will be the latest value of either a or b (based on which changed later),
  //    200ms after the latest change to either.
  //
  const [merged] = useMergedStream([a, b], debounce(200))
  
  ...
}

👉 useCombinedStream() is similar to useMergedStream(), except that it emits an array of latest values of all provided variables, every time any of them changes:

import { useCombinedStream } from 'react-callbag-streams'
import { debounce, filter, flatten, fromPromise, map } from 'callbag-common'

//
// this app finds repositories on github based on a query and language
//
function App() {
  const [q, setQ] = useState('')
  const [l, setL] = useState('javascript')

  const [repos, loading] = useCombinedStream(
    [q, l],                                             // 👉 a combined stream of query and language
    filter(([q]) => q.length >= 2),                     // 👉 filter out when query is too short
    debounce(1000),                                     // 👉 debounce the combo by a second (github API will block us o.w.)
    map(([q, l]) => fromPromise(search(q, l))),         // 👉 search in github api using query and language
    flatten,                                            // 👉 flatten the stream (preserve order of responses)
    map(res =>                                          // 👉 format the incoming result ...
      res.items.map(item =>                             // .. take each repository ...
        ({ name: item.name, url: item.html_url })       // .. get its name and its url
      )
    ),
  )

  return <>
    <input type='text'
      placeholder='keywords ....'
      value={q}
      onInput={e => setQ((e.target as any).value)}/>
    <input type='text'
      placeholder='language'
      value={l}
      onInput={e => setL((e.target as any).value)}/>
    <br/>
    { q.length >= 2 ?
      (
        loading ? 'loading ...' : (
          <ul>
            { repos?.map(repo => (
              <li key={repo.url}><a href={repo.url}>{repo.name}</a></li>
            ))}
          </ul>
        )
      ) : ''
    }
  </>
}

► Playground
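
The difference between merging and combining can be sketched without React or callbags. The functions below are a hypothetical model of the described emissions (a merged stream emits each new value, a combined stream emits the latest values of all variables); `Update`, `mergedValues` and `combinedValues` are illustrative names, not library APIs:

```typescript
interface Update<T> { index: number; value: T } // which variable changed, and to what

// Merged: every new value of any variable is emitted on its own.
function mergedValues<T>(updates: Update<T>[]): T[] {
  return updates.map(update => update.value);
}

// Combined: every change emits a snapshot of the latest values of all variables.
function combinedValues<T>(initial: T[], updates: Update<T>[]): T[][] {
  const latest = [...initial];
  return updates.map(({ index, value }) => {
    latest[index] = value;
    return [...latest];
  });
}

// Usage: variable 0 is the query, variable 1 is the language.
const updates: Update<string>[] = [
  { index: 0, value: 're' },
  { index: 0, value: 'react' },
  { index: 1, value: 'typescript' },
];

console.log(mergedValues(updates));
console.log(combinedValues(['', 'javascript'], updates));
```

The combined form is what the search example needs, since each request depends on both the query and the language at the same time.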


Working with Callbags Directly

👉 useSource() provides access to the underlying callbags created from variables / parameters. It is useful for situations where advanced stream combination is needed:

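import { useStream, useSource } from 'react-callbag-streams'
// assuming callbag-common also exports all of these utilities
import { combine, flatten, fromPromise, map, of, pipe, scan, tap } from 'callbag-common'

import fetch from './fetch'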

//
// in this example we have a list of entries that is fetched in a paginated manner,
// and is based on a given query.
//
// - when the user changes the query, we want the pagination to reset and result set to be refreshed
// - when the user loads more content, we want them to be added to current data
//
function App() {
  const [query, setQuery] = useState('')
  const [page, setPage] = useState(0)

  const page$ = useSource(page)                            // 👉 direct access to a stream from values of page
  const [entries, loading] = useStream(
    query,                                                 // 👉 for each query
    tap(() => setPage(0)),                                 // .. reset the page counter
    map(q =>                                               // .. and map the query to a query-specific sub-stream
      pipe(
        combine(of(q), page$),                             // 👉 combine query value and values from page stream
        map(([q, p]) => fromPromise(fetch(q, p))),         // 👉 for each new pair, fetch data
        flatten,
        scan((all, page) => [...all, ...page], []),        // 👉 accumulate results (this is query-specific stream)
      )
    ),
    flatten                                                // 👉 flatten the query-specific substream
  )

  return <>
    <input
      type='text'
      placeholder='type something ...'
      onInput={e => setQuery((e.target as any).value)}
    />
    { entries?.map(entry => <div>{entry}</div>) }
    <br />
    <button onClick={() => setPage(page + 1)}>Load More</button>
  </>
}

► Playground
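
The accumulate-and-reset behavior of the nested stream can be modeled as a plain reducer. This is a simplified sketch of the intent described in the comments above (new query: start over, new page: append), with hypothetical names `PagingEvent` and `accumulateEntries`:

```typescript
type PagingEvent =
  | { kind: 'query'; query: string }     // the user typed a new query
  | { kind: 'page'; entries: string[] }; // a page of results arrived

// Returns the list of entries shown after each page arrives.
function accumulateEntries(events: PagingEvent[]): string[][] {
  let all: string[] = [];
  const snapshots: string[][] = [];
  for (const event of events) {
    if (event.kind === 'query') {
      all = []; // a new query starts a fresh accumulator, like a new sub-stream
    } else {
      all = [...all, ...event.entries]; // same accumulation as the scan() step
      snapshots.push(all);
    }
  }
  return snapshots;
}

const shownEntries = accumulateEntries([
  { kind: 'query', query: 'a' },
  { kind: 'page', entries: ['a1', 'a2'] },
  { kind: 'page', entries: ['a3'] },
  { kind: 'query', query: 'b' },
  { kind: 'page', entries: ['b1'] },
]);

console.log(shownEntries);
```

In the stream version the reset comes for free: each query gets its own sub-stream with its own `scan()`, so there is no shared accumulator to clear by hand.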


👉 useSignal() provides a callbag source you can utilize in your streams. In the pagination example provided above, instead of a state for the page, we can have a loadMoreSignal and calculate the page in the stream accordingly:

import { useStream, useSignal } from 'react-callbag-streams'

// ...

function App() {

  // ...
  
  const [loadMoreSignal, loadMore] = useSignal()
  
  const [entries, loading] = useStream(
    query,
    map(q =>
      pipe(
        combine(
          of(q),
          pipe(                                      
            loadMoreSignal,                          // 👉 so for each signal
            scan(p => ++p, 0),                       // .. we increase the page number
            startWith(0)                             // .. starting with 0
          )
        ),
        map(([q, p]) => fromPromise(fetch(q, p))),
        flatten,
        scan((all, page) => [...all, ...page], []),
      )
    ),
    flatten
  )
  
  return (
  
    // ...
    
    <button onClick={() => loadMore()}>Load More</button>
  )
}

► Playground
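
To show how a bare signal turns into page numbers, here is a small sketch that mimics the `startWith(0)` plus counting `scan()` combination with plain callbacks. `createSignal` and `pageNumbers` are hypothetical stand-ins written for this illustration, not the hook or the callbag operators themselves:

```typescript
type Listener = () => void;
type Subscribe = (listener: Listener) => () => void;

// Hypothetical stand-in for a signal: a subscribe function plus an emitter.
function createSignal(): [Subscribe, () => void] {
  let listeners: Listener[] = [];
  const subscribe: Subscribe = listener => {
    listeners.push(listener);
    return () => { listeners = listeners.filter(l => l !== listener); };
  };
  const emit = () => listeners.forEach(listener => listener());
  return [subscribe, emit];
}

// Mirrors startWith(0) followed by a counting scan: report page 0 immediately,
// then the next page number on every signal.
function pageNumbers(signal: Subscribe, onPage: (page: number) => void): () => void {
  let page = 0;
  onPage(page);
  return signal(() => onPage(++page));
}

const pages: number[] = [];
const [signal, loadMore] = createSignal();
const stop = pageNumbers(signal, page => pages.push(page));

loadMore();
loadMore();
stop();     // after unsubscribing, further signals are ignored
loadMore();

console.log(pages);
```

The page number lives entirely inside the stream here, which is why the component no longer needs a separate `page` state.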


⚠️ Note that useSource() and useSignal() return callbags (basically raw streams) and not plain values, so you cannot use their results as plain values directly. To turn them into plain values, you can use the useCallbag() hook:

import useCallbag from 'use-callbag'

// ...

const [signal, loadMore] = useSignal()
const page = useCallbag(
  0,
  (initial) =>
    pipe(signal, scan(p => ++p, initial))
)

// ...

return (
  // ...
  <label>Page: {page}</label>
  // ...
)

Essentially, useStream() is equivalent to a useSource() followed by a useCallbag():

const [streamed] = useStream(prop, ...)

// -- OR --

const stream = useSource(prop)
const streamed = useCallbag(undefined, () => pipe(stream, ...))



Contribution

Be nice and respectful and, more importantly, super open and welcoming to all.

👉 Useful commands for working on this repo:

git clone https://github.com/loreanvictor/react-callbag-streams.git
npm i              # --> install dependencies
npm start          # --> serves `samples/index.tsx` on localhost:3000
npm test           # --> run all tests
npm run cov:view   # --> view code coverage