@ceramicnetwork/transport-subject

Message-based communication transport as a RxJS Subject

Usage no npm install needed!

<script type="module">
  import ceramicnetworkTransportSubject from 'https://cdn.skypack.dev/@ceramicnetwork/transport-subject';
</script>

README

TransportSubject

Message-based communication transport as a RxJS Subject

Installation

npm install @ceramicnetwork/transport-subject

Usage

import { TransportSubject } from '@ceramicnetwork/transport-subject'
import { Subscriber, interval } from 'rxjs'
import { map } from 'rxjs'

type Message = { type: string }

class MyTransport extends TransportSubject<Message> {
  constructor(time = 1000) {
    const source = interval(time).map(() => ({ type: 'ping' }))
    const sink = new Subscriber((message) => {
      console.log('send message', message)
    })
    super(source, sink)
  }
}

const transport = new MyTransport()
transport.subscribe((message) => {
  console.log('received message', message)
})
transport.next({ type: 'pong' })

Types

Wrapped

type Wrapped<Message, Namespace extends string> = { __tw: true; msg: Message; ns: Namespace }

Wrapper

type Wrapper<MsgIn, MsgOut, WrappedOut> = {
  wrap: (msg: MsgOut) => WrappedOut
  unwrap: (input: any) => MsgIn
}

UnwrapOperatorOptions

type UnwrapOperatorOptions = {
  onInvalidInput?: (input: unknown, error: Error) => void
  throwWhenInvalid?: boolean
}

API

TransportSubject class

Extends RxJS Subject class

Type parameters

  1. MsgIn: the type of the messages coming in from the source
  2. MsgOut = MsgIn: the type of the messages going out to the sink

new TransportSubject()

Arguments

  1. source: Observable<MsgIn>
  2. sink: Observer<MsgOut>

.next()

Arguments

  1. message: MsgOut

Returns void

createWrap()

Type parameters

  1. MsgOut
  2. Namespace extends string = string

Arguments

  1. namespace: Namespace

Returns (msg: MsgOut) => Wrapped<MsgOut, Namespace>, see Wrapped type

createUnwrap()

Type parameters

  1. MsgIn
  2. Namespace extends string = string

Arguments

  1. namespace: Namespace

Returns (input: any) => MsgIn

createWrapper()

Combines createWrap() and createUnwrap()

Type parameters

  1. MsgIn: the type of the messages coming in from the returned transport
  2. MsgOut = MsgIn: the type of the messages pushed to the returned transport
  3. Namespace extends string = string

Arguments

  1. namespace: Namespace

Returns Wrapper<MsgIn, MsgOut, Wrapped<MsgOut, Namespace>>, see Wrapper and Wrapped types

createUnwrapOperator()

Type parameters

  1. WrappedIn: the type of the messages coming in from the input source
  2. MsgIn: the type of the messages coming in from the returned observable

Arguments

  1. unwrap: (input: any) => MsgIn
  2. options?: UnwrapOperatorOptions = {}

Returns OperatorFunction<WrappedIn, MsgIn>

createWrapObserver()

Type parameters

  1. MsgOut: the type of the messages pushed to the returned observer
  2. WrappedOut: the type of the messages going out to the input sink

Arguments

  1. sink: Observer<WrappedOut>
  2. wrap: (msg: MsgOut) => WrappedOut

Returns Observer<MsgOut>

createWrappedTransport()

Combines createUnwrapObservable() and createWrapObserver() in a TransportSubject

Type parameters

  1. MsgIn: the type of the messages coming in from the returned transport
  2. MsgOut: the type of the messages pushed to the returned transport
  3. WrappedIn: the type of the messages coming in from the input transport source
  4. WrappedOut = WrappedIn: the type of the messages going out to the input transport sink

Arguments

  1. transport: TransportSubject<WrappedIn, WrappedOut>
  2. wrapper: MessageWrapper<MsgIn, MsgOut, WrappedOut>
  3. options?: UnwrapObservableOptions = {}

Returns TransportSubject<MsgIn, MsgOut>

createNamepacedTransport()

Combines createWrappedTransport() and createWrapper()

Type parameters

  1. MsgIn: the type of the messages coming in from the returned transport
  2. MsgOut = MsgIn: the type of the messages pushed to the returned transport
  3. Namespace extends string = string

Arguments

  1. transport TransportSubject<Wrapped<MsgIn, Namespace>, Wrapped<MsgOut, Namespace>>>
  2. namespace: Namespace
  3. options?: UnwrapOperatorOptions = {}

Returns TransportSubject<MsgIn, MsgOut>

License

Apache-2.0 OR MIT