Skip to content

Latest commit

 

History

History

stream

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 
 
 
 
 

@motorcycle/stream -- 3.0.0

Functional and reactive event streams for Motorcycle.ts

Get it

yarn add @motorcycle/stream
# or
npm install --save @motorcycle/stream

API Documentation

All functions are curried!

ap<A, B>(fns: Stream<(value: A) => B>, values: Stream<A>): Stream<B>

Applies a stream of functions to the latest from a stream of values.

See an example
import { ap, now, periodic, scan, skip, observe } from '@motorcycle/stream'

const count$ = scan(x => x + 1, 0, skip(1, periodic(100)))

const fn$ = now(x => x * x)

const stream = ap(fn$, count$)

observe(console.log, stream)
// 0
// 1
// 4
// 9
// ...
See the code
export { ap } from '@most/core'

at<A>(time: Time, value: A): Stream<A>

Creates a stream that emits a value after a given amount of time.

See an example
See the code
export { at } from '@most/core'

at<A>(time: number, value: A): Stream<A>

Create a stream containing a single event at a specific time.

See an example
import { at, observe } from '@motorcycle/stream'

observe(console.log, at(1000, 'Hello'))
// After 1 second
// logs 'Hello'
See the code
export { at } from '@most/core'

awaitPromises<A>(stream: Stream<Promise<A>>): Stream<A>

Turn a stream of promises into a stream containing the promises' values. Note that order is always preserved, regardless of promise fulfillment order.

See an example
import { awaitPromises, mergeArray, fromPromise, at, now, observe } from '@motorcycle/stream'

// ----1------->
const a = new Promise(resolve => setTimeout(resolve, 100, 1))
// ---------2-->
const b = new Promise(resolve => setTimeout(resolve, 200, 2))
// --3--------->
const c = new Promise(resolve => setTimeout(resolve, 50, 3))

// bc---a------->
const source = mergeArray([ at(100, a), now(b), now(c) ])

// ---------231->
const stream = awaitPromises(source)
See the code
export { awaitPromises } from '@most/core'

chain<A, B>(f: (value: A) => Stream<B>, stream: Stream<A>): Stream<B>

Creates a new stream by applying a stream-returning function to every event value and merging them into the resulting stream.

See an example
import { chain, now, observe } from '@motorcycle/stream'

const stream = chain(x => now(x * 2), now(1000))

observe(console.log, stream) 
// 2000
See the code
export { chain } from '@most/core'

combine<A, B, C>(f: (a: A, b: B) => C, a$: Stream<A>, b$: Stream<B>): Stream<C>

Apply a function to the most recent event from each stream when a new event arrives on any stream.

See an example
import { combine, at, merge, observe } from '@motorcycle/stream'

const a$ = merge(at(100, 100), at(200, 200))
const b$ = merge(at(200, 3000), at(250, 100))

const stream = combine(add, a$, b$)

observe(console.log, stream)
// 3200 -- at time 200 as a result of add(200, 3000)
// 300 -- at time 250 as a result of add(200, 100)
See the code
export { combine } from '@most/core'

combineArray<A, B, C>(f: (a: A, b: B) => C, streams: [ Stream<A>, Stream<B> ]): Stream<C>

Applies a function to the most recent event from all streams when a new event arrives on any stream.

See an example
import { combineArray, now, merge, at, observe } from '@motorcycle/stream'

const a$ = now(1000)
const b$ = now(2000)
const c$ = merge(at(100, 1), at(200, 2))

const sum = (x, y, z) => x + y + z

const stream = combineArray(sum, [ a$, b$, c$ ])

observe(console.log, stream)
// 3001 -- at time 100 as result of sum(1000, 2000, 1)
// 3002 -- at time 200 as result of sum(1000, 2000, 2)
See the code
export { combineArray } from '@most/core'

combineObj<Obj extends object>(obj: { [K in keyof Obj]: Stream<Obj[K]> }): Stream<Obj>

Takes an object of streams and returns a Stream of an object.

See an example
import { combineObj, now } from '@motorcycle/stream'

const obj = { a: now(1), b: now(2), c: now(3) }

const stream: Stream<{ a: number, b: number, c: number }> = combineObj(obj)
See the code
/**
 * Turns an object whose properties are streams into a single stream of plain
 * objects with the same keys.
 *
 * The key list is captured once, via `Object.keys`, when `combineObj` is
 * called. The streams are handed to `combineArray` in that same key order, so
 * the i-th value passed to the combining function belongs to the i-th key.
 *
 * @param object Object whose every property is a stream.
 * @returns The stream produced by `combineArray`, whose values are freshly
 *   built objects mapping each key to the value supplied for its stream.
 */
export function combineObj<Obj extends object>(
  object: { readonly [K in keyof Obj]: Stream<Obj[K]> }
): Stream<Obj> {
  const keys = Object.keys(object) as Array<keyof Obj>
  const streams = map(key => object[key], keys)

  // Rebuilds an object from the positional values supplied by `combineArray`.
  const toObject = (...latest: Array<Obj[keyof Obj]>) => {
    const result = {} as Obj
    let index = 0

    while (index < length(latest)) {
      result[keys[index]] = latest[index]
      ++index
    }

    return result
  }

  return combineArray(toObject, streams)
}

concatMap<A, B>(f: (value: A) => Stream<B>, stream: Stream<A>): Stream<B>

Creates a new stream by lazily applying a stream-returning function to each event value of a stream, concatenating each returned stream's values onto the resulting stream.

See an example
import { concatMap, now, observe } from '@motorcycle/stream'

const source = // --104--101--108--108--111|

const f = (x: number) => now(String.fromCharCode(x))

const stream = concatMap(f, source)

observe(console.log, stream)
// h
// e
// l
// l
// o
See the code
export { concatMap } from '@most/core'

constant<A>(value: A, stream: Stream<any>): Stream<A>

Replace each event on a stream with a given value.

See an example
import { constant, periodic, observe } from '@motorcycle/stream'

const stream = constant(100, periodic(1000))

observe(console.log, stream) // every 1 second logs 100
See the code
export { constant } from '@most/core'

continueWith<A>(f: () => Stream<A>, stream: Stream<A>): Stream<A>

Replace the end signal with a new stream returned by f. Note that f must return a stream.

See an example
import { continueWith, at } from '@motorcycle/stream'

// ----1------>
const a = at(100, 1)
// ----2------>
const b = at(100, 2)

// ----1----2->
const stream = continueWith(() => b, a)
See the code
export { continueWith } from '@most/core'

debounce<A>(ms: number, stream: Stream<A>): Stream<A>

Wait for a burst of events to subside and keep only the last event in the burst.

See an example
import { debounce } from '@motorcycle/stream'

const source = // abcd----abcd--->
//                -----d-------d->
const stream = debounce(2, source)
See the code
export { debounce } from '@most/core'

delay<A>(ms: number, stream: Stream<A>): Stream<A>

Timeshift a stream by a number of milliseconds.

See an example
import { delay } from '@motorcycle/stream'

const source = // -1--2--3--4--5---->
//                ----1--2--3--4--5->
const stream = delay(3, source)
See the code
export { delay } from '@most/core'

drain<A>(stream: Stream<A>): Promise<void>

Activates a stream using a default scheduler instance from @most/scheduler, returning a promise of completion.

See an example
import { drain } from '@motorcycle/stream'

drain(stream)
 .then(() => console.log('complete'))
See the code
/**
 * Activates `stream` by passing it to `runEffects` together with the
 * module-level `scheduler`.
 *
 * @param stream The stream to run.
 * @returns The promise returned by `runEffects`.
 */
export const drain = <A>(stream: Stream<A>): Promise<void> => {
  return runEffects(stream, scheduler)
}

during<A>(signal: Stream<Stream<any>>, stream: Stream<A>): Stream<A>

Keep events that occur during a time window defined by a higher-order stream.

See an example
import { during } from '@motorcycle/stream'

const source = // -1-2-3-4-5-6-7-8->
const signal = // ------s---------->
const s      = //       --------x-->
//                -------4-5-6-7|
const stream = during(signal, source)
See the code
export { during } from '@most/core'

empty<A>(): Stream<A>

Create a stream containing no events, which ends immediately.

See an example
import { empty, drain } from '@motorcycle/stream'

const stream = empty()

drain(stream)
 .then(() => console.log('complete'))
See the code
export { empty } from '@most/core'

filter<A>(predicate: (value: A) => boolean, stream: Stream<A>): Stream<A>

Retain only events for which a predicate is truthy.

See an example
import { filter, observe } from '@motorcycle/stream'

const source = // ---true---false---true---|

// resulting stream only contains truthy values
const stream = filter(Boolean, source)

observe(console.log, stream)
// true
// true
See the code
export { filter } from '@most/core'

fromPromise<A>(promise: Promise<A>): Stream<A>

Create a stream containing a promise's value.

See an example
import { fromPromise, observe } from '@motorcycle/stream'

const a = fromPromise(Promise.resolve(1))
const b = fromPromise(Promise.reject(new Error('failure')))

observe(console.log, a)
 .then(() => console.log('done'))
 .catch(err => console.error(err.message))
// 1
// done

observe(console.log, b)
 .then(() => console.log('done'))
 .catch(err => console.error(err.message))
// 'failure'
See the code
export { fromPromise } from '@most/core'

hold<A>(stream: Stream<A>): Stream<A>

Deliver the most recently seen event to each new observer the instant it begins observing. A held stream is always multicast.

Given an input stream:

stream:    -a---b---c---d->

observers which begin observing at different times will see:

observer1: -a---b---c---d->
observer2:    a-b---c---d->
observer3:           c--d->

See an example
import { createDocumentDomSource, events } from '@motorcycle/dom'
import { drain, hold, map, now } from '@motorcycle/stream'

const doc = createDocumentDomSource(now(document))

// start holding on first subscription
const click$ = hold(map(e => ({ x: e.clientX, y: e.clientY }), events('click', doc)))

// hold the latest event even before the first subscription
drain(click$)
See the code
/**
 * Wraps `stream` in a `Hold`, which remembers the most recent event value and
 * replays it to sinks that are added later.
 *
 * @param stream The stream to hold.
 * @returns A new `Hold` instance built around `stream`.
 */
export function hold<A>(stream: Stream<A>): Stream<A> {
  return new Hold<A>(stream)
}

/**
 * `MulticastSource` subclass that records the latest event value and the
 * scheduler it was run with, and replays that value when a sink is added.
 */
class Hold<A> extends MulticastSource<A> implements Stream<A> {
  // Set to true by `event`; has no initializer, so it is falsy until then.
  private has: boolean
  // Value passed to the most recent `event` call.
  private value: A
  // Scheduler passed to the most recent `run` call; read by `add`.
  private scheduler: Scheduler

  constructor(stream: Stream<A>) {
    super(stream)
  }

  // Remembers the scheduler, then delegates to `MulticastSource#run`.
  public run(sink: Sink<A>, scheduler: Scheduler) {
    this.scheduler = scheduler

    return super.run(sink, scheduler)
  }

  // When a value has already been seen, delivers it to the incoming sink,
  // timestamped with the scheduler's current time, BEFORE registering the sink
  // through `MulticastSource#add`.
  // NOTE(review): relies on `run` having stored `this.scheduler` first —
  // presumably `MulticastSource#run` is what calls `add`; confirm.
  public add(sink: Sink<A>) {
    if (this.has) sink.event(this.scheduler.currentTime(), this.value)

    return super.add(sink)
  }

  // Stores the value as the latest one, then forwards the event to
  // `MulticastSource#event`.
  public event(time: Time, value: A) {
    this.has = true
    this.value = value

    return super.event(time, value)
  }
}

last<A>(stream: Stream<A>): Stream<A>

Returns a stream that will only emit its last value right before ending. If the stream does not end, then no events will ever occur. If the stream ends before emitting a value, no value will be emitted.

See an example
See the code
/**
 * Creates a stream that stays silent while `stream` is producing events and,
 * once `stream` ends, emits the most recent value it saw (if any) immediately
 * before ending itself.
 *
 * @param stream The source stream.
 * @returns A stream carrying at most one event: the source's final value.
 */
export function last<A>(stream: Stream<A>): Stream<A> {
  return new Last(stream)
}

/** Stream that runs its source through a `LastSink` wrapped around the given sink. */
class Last<A> implements Stream<A> {
  constructor(private source: Stream<A>) {}

  public run(sink: Sink<A>, scheduler: Scheduler): Disposable {
    const lastSink = new LastSink(sink)

    return this.source.run(lastSink, scheduler)
  }
}

/**
 * Sink that swallows every event, remembering only the latest value, and
 * releases that value to the wrapped sink when the source ends.
 */
class LastSink<A> implements Sink<A> {
  // Whether `latest` currently holds a value that has not been released.
  private hasLatest: boolean = false
  private latest: A

  constructor(private downstream: Sink<A>) {}

  // Events are never forwarded directly; only the newest value is kept.
  public event(_: Time, value: A) {
    this.latest = value
    this.hasLatest = true
  }

  // An error discards the remembered value and is passed straight through.
  public error(time: Time, error: Error) {
    this.hasLatest = false
    this.downstream.error(time, error)
  }

  // On end, emits the remembered value (stamped with the end time) if there
  // is one, clears the flag, then forwards the end signal.
  public end(time: Time) {
    const { downstream } = this

    if (this.hasLatest) {
      downstream.event(time, this.latest)
    }

    this.hasLatest = false

    downstream.end(time)
  }
}

loop<A, B, C>(f: (accumulator: B, value: A) => { seed: B, value: C }, initial: B, stream: Stream<A>): Stream<C>

Accumulate results using a feedback loop that emits one value and feeds back another to be used in the next iteration.

It allows you to maintain and update a "state" while emitting a different value. In contrast, scan feeds back and produces the same value.

See an example
import { loop, periodic, filter, observe } from '@motorcycle/stream'

function pairwiseInterval (acc: number): { seed: number, value: [number, number] } {
  const seed = acc + 1
  const value =  [ acc, seed ]

  return { seed, value }
}

const stream = loop(pairwiseInterval, 0, periodic(100))

observe(console.log, stream)
// [ 0, 1 ]
// [ 1, 2 ]
// [ 2, 3 ]
// ....
See the code
export { loop } from '@most/core'

map<A, B>(f: (value: A) => B, stream: Stream<A>): Stream<B>

Apply a function to each event value of a stream, returning a new stream containing the returned values.

See an example
import { map, now, observe } from '@motorcycle/stream'

const stream = map(x => x + 1, now(100))

observe(console.log, stream) // 101
See the code
export { map } from '@most/core'

mapList<A, B>(f: (value: A, index: number) => B, sinksList$: Stream<ArrayLike<A>>): Stream<ReadonlyArray<B>>

Applies a function to all Sinks in a list of Sinks.

See an example
import { mapList } from '@motorcycle/stream'

function Component(sources) {
  const { listOfData$ } = sources

  const sinksList$: Stream<ReadonlyArray<Sinks>> = mapList(
    data => ChildComponent({ ...sources, data$: now(data) }),
    listOfData$,
  )

  const childViews$: Stream<ReadonlyArray<Stream<VNode>>> =
    mapList(({ view$ }) => view$, sinksList$)

  ...
}
See the code
/**
 * Call signatures of `mapList`: either both arguments at once, or the mapping
 * function alone, which yields a function awaiting the stream of lists.
 */
export type MapList = {
  <A, B>(f: (value: A, index: number) => B, list$: Stream<ArrayLike<A>>): Stream<ReadonlyArray<B>>
  <A, B>(f: (value: A, index: number) => B): (
    list$: Stream<ArrayLike<A>>
  ) => Stream<ReadonlyArray<B>>
}

/**
 * Uncurried implementation behind `mapList`: maps every list emitted by
 * `list$` through `mapArray(f)`.
 */
function __mapList<A, B>(
  f: (value: A) => B,
  list$: Stream<ArrayLike<A>>
): Stream<ReadonlyArray<B>> {
  const mapWholeList: (list: ArrayLike<A>) => ReadonlyArray<B> = mapArray(f)

  return map<ArrayLike<A>, ReadonlyArray<B>>(mapWholeList, list$)
}

/** Curried (via `curry2`) form of `__mapList`. */
export const mapList: MapList = curry2(__mapList)

merge<A>(a: Stream<A>, b: Stream<A>): Stream<A>

Creates a new Stream containing events from both streams.

See an example
import { merge, at, observe } from '@motorcycle/stream'

const stream = merge(at(1000, 'World'), at(100, 'Hello'))

observe(console.log, stream)
// Hello -- at time 100
// World -- at time 1000
See the code
export { merge } from '@most/core'

mergeArray<A>(stream: Array<Stream<A>>): Stream<A>

Creates a new stream containing all events of underlying streams.

See an example
import { at, mergeArray, observe } from '@motorcycle/stream'

const stream = mergeArray([
  at(100, 'foo'),
  at(300, 'baz'),
  at(200, 'bar'),
])

observe(console.log, stream)
// foo -- at time 100
// bar -- at time 200
// baz -- at time 300
See the code
export { mergeArray } from '@most/core'

multicast<A>(stream: Stream<A>): Stream<A>

Returns a stream equivalent to the original, but which can be shared more efficiently among multiple consumers.

See an example
import { multicast, observe } from '@motorcycle/stream'

// --1--2--3--4--5--6--7--8-->
const source = // ...

// --1--2--3--4--5--6--7--8-->
observe(console.log, source)

setTimeout(() => {
// --------------1--2--3--4--5--6--7--8-->
  observe(console.log, source)
}, 5)

const stream = multicast(source)

// --1--2--3--4--5--6--7--8-->
observe(console.log, stream)

setTimeout(() => {
// --------------5--6--7--8-->
  observe(console.log, stream)
}, 5)
See the code
export { multicast } from '@most/core'

never<A>(): Stream<A>

Create a stream containing no events, which never ends.

See an example
import { never, drain } from '@motorcycle/stream'

const stream = never()

drain(stream) // Returns a promise that never fulfills.
See the code
export { never } from '@most/core'

now<A>(value: A): Stream<A>

Create a stream containing a single event at time 0

See an example
import { now, observe } from '@motorcycle/stream'

const stream = now(1)

observe(console.log, stream)
// 1
See the code
export { now } from '@most/core'

observe<A>(f: (value: A) => any, stream: Stream<A>): Promise<void>

Activates a stream, calling a function f with each event value, and returns a Promise of completion.

See an example
See the code
/**
 * Call signatures of `observe`: either both arguments at once, or the
 * callback alone, which yields a function awaiting the stream.
 */
export interface Observe {
  <A>(f: (value: A) => any, stream: Stream<A>): Promise<void>
  <A>(f: (value: A) => any): (stream: Stream<A>) => Promise<void>
}

/**
 * Curried (via `curry2`) function that attaches `f` to `stream` with `tap`
 * and then activates the result with `drain`, returning `drain`'s promise.
 */
export const observe: Observe = curry2(
  <A>(f: (value: A) => any, stream: Stream<A>): Promise<void> => {
    const tapped = tap(f, stream)

    return drain(tapped)
  }
)

periodic(ms: number): Stream<void>

Creates a stream that emits at time 0 and every n milliseconds after.

See an example
import { periodic } from '@motorcycle/stream'

// void----void----void----void---->
const stream = periodic(5)
See the code
export { periodic } from '@most/core'

recoverWith<A>(f: (err: Error) => Stream<A>, stream: Stream<A>): Stream<A>

Recover from a stream failure by calling a function to create a new stream.

See an example
import { recoverWith } from '@motorcycle/stream'

// -1-2-3X------->
const a = // ...
// -4-5-6-------->
const b = // ...

// -1-2-3-4-5-6-->
const stream = recoverWith(() => b, a)
See the code
export { recoverWith } from '@most/core'

runEffects<A>(stream: Stream<A>, scheduler: Scheduler): Promise<void>

Activate an event stream, and consume all its events.

See an example
import { runEffects, tap } from '@motorcycle/stream'
import { newDefaultScheduler } from '@most/scheduler'

const logStream = tap(console.log, stream)

runEffects(logStream, newDefaultScheduler())
 .then(() => console.log('complete'))
 .catch(err => console.error(err))
See the code
export { runEffects } from '@most/core'

sample<A, B, C>(f: (a: A, b: B) => C, sampler: Stream<A>, stream: Stream<B>): Stream<C>

For each event in a sampler stream, apply a function to combine it with the most recent event in another stream. The resulting stream will contain the same number of events as the sampler stream.

See an example
s1:                       -1--2--3--4--5->
sampler:                  -1-----2-----3->
sample(sum, sampler, s1): -2-----5-----8->

s1:                       -1-----2-----3->
sampler:                  -1--2--3--4--5->
sample(sum, sampler, s1): -2--3--5--6--8->
See the code
export { sample } from '@most/core'

sampleWith<A>(sampler: Stream<any>, stream: Stream<A>): Stream<A>

For each event occurrence on a sampler stream, takes the latest value from the given stream.

See an example
import { sampleWith } from '@motorcycle/stream'

function submit(dom: DomSource): Stream<string> {
  const button = query('button', dom)
  const input = query('input', dom)

  const click$ = events('click', button)
  const value$ = map(ev => ev.target.value, events('input', input))

  return sampleWith(click$, value$)
}
See the code
/** Combining function for `sample` that discards its first argument and returns its second. */
function takeRight<A>(_: any, value: A): A {
  return value
}

/**
 * Call signatures of `sampleWith`: both arguments at once, or the sampler
 * alone, which yields a function awaiting the stream to read values from.
 */
export interface SampleWith {
  <A>(sampler: Stream<any>, stream: Stream<A>): Stream<A>
  <A>(sampler: Stream<any>): (stream: Stream<A>) => Stream<A>
  (sampler: Stream<any>): <A>(stream: Stream<A>) => Stream<A>
}

/** `sample` partially applied with `takeRight`, typed as `SampleWith`. */
export const sampleWith = sample(takeRight) as SampleWith

scan<A, B>(f: (seed: B, value: A) => B, initial: B, stream: Stream<A>): Stream<B>

Incrementally accumulate results, starting with the provided initial value.

See an example
import { scan, periodic, observe } from '@motorcycle/stream'

// creates a stream that increments by 1 every 1000ms
const count$ = scan(x => x + 1, 0, periodic(1000))

observe(console.log, count$)
See the code
export { scan } from '@most/core'

scheduler (Scheduler)

A shared instance of the default scheduler from @most/scheduler

See an example
import { scheduler, now } from '@motorcycle/stream'

const stream = now(1)

const sink = {
  event(time: number, value: number) { ... },
  error(time: number, err: Error) { ... },
  end(time: number) { ... }
}

const disposable = stream.run(sink, scheduler)

// later 
disposable.dispose()
See the code
/**
 * Module-level scheduler instance, created once by calling
 * `newDefaultScheduler()` when this module is evaluated.
 */
export const scheduler: Scheduler = newDefaultScheduler()

since<A>(startSignal: Stream<any>, stream: Stream<A>): Stream<A>

Discard all events in one stream until the first event occurs in another.

See an example
import { since } from '@motorcycle/stream'

const source = // -1-2-3-4-5-6-7-8->
const start =  // --------x-------->
//                ---------5-6-7-8->
const stream = since(start, source)
See the code
export { since } from '@most/core'

skip<A>(quantity: number, stream: Stream<A>): Stream<A>

Skip the first n number of events.

See an example
import { skip } from '@motorcycle/stream'

const source = // -1-2-3-4-5-6-7-8-9-10->
//                -----------6-7-8-9-10->
const stream = skip(5, source)
See the code
export { skip } from '@most/core'

skipAfter<A>(predicate: (value: A) => boolean, stream: Stream<A>): Stream<A>

Discard all events after the first event for which predicate returns true.

See an example
import { skipAfter } from '@motorcycle/stream'

const source = // --1-2-3-4-5-6-7-8->
//                --1-2|
const stream = skipAfter(even, source)
See the code
export { skipAfter } from '@most/core'

skipRepeats<A>(stream: Stream<A>): Stream<A>

Remove adjacent events that are equal in terms of value equality.

See an example
const a = { a: 1 }
const b = Object.assign({}, a)
const c = { c: 2 }

const source = // --a--b--a--c-->
//                --a--------c-->
const stream = skipRepeats(source)

observe(console.log, stream)
// { a: 1 }
// { c: 2 }
See the code
/** Signature of `skipRepeats`: takes a stream and returns a stream of the same element type. */
export type SkipRepeats = <A>(stream: Stream<A>) => Stream<A>

/** `skipRepeatsWith` partially applied with `equals` as the comparison function. */
export const skipRepeats: SkipRepeats = skipRepeatsWith(equals)

skipRepeatsWith<A>(predicate: (a: A, b: A) => boolean, stream: Stream<A>): Stream<A>

Remove adjacent repeated events, using the provided equality function to compare adjacent events.

See an example
import { skipRepeatsWith, observe } from '@motorcycle/stream'
 
const source = // --a-b-B-c-D-d-e->

const equalsIgnoreCase = (a: string, b: string) =>
 a.toLowerCase() === b.toLowerCase()

const stream = skipRepeatsWith(equalsIgnoreCase, source)

observe(console.log, stream)
// a
// b
// c
// D
// e
See the code
export { skipRepeatsWith } from '@most/core'

skipWhile<A>(predicate: (value: A) => boolean, stream: Stream<A>): Stream<A>

Discard all events until predicate returns false, and keep the rest.

See an example
import { skipWhile } from '@motorcycle/stream'

const source = // -2-4-5-6-8->
//                 ----5-6-8->
const stream = skipWhile(even, source)
See the code
export { skipWhile } from '@most/core'

slice<A>(start: number, end: number, stream: Stream<A>): Stream<A>

Keep only events in a range, where start <= index < end, and index is the ordinal index of an event in stream.

See an example
import { slice } from '@motorcycle/stream'

const source = // --1--2--3--4--5--6--7--8--9--10-->
//                --------3--4--5|
const stream = slice(2, 5, source)
See the code
export { slice } from '@most/core'

startWith<A>(initialValue: A, stream: Stream<A>): Stream<A>

Prepends an event to a stream at time 0.

See an example
import { startWith, at, observe } from '@motorcycle/stream'

const stream = startWith('Hello', at(1000, 'world'))

observe(console.log, stream)
// At time 0 logs 'Hello'
// At time 1000 logs 'world'
See the code
export { startWith } from '@most/core'

state<A, B>(f: (acc: A, value: B) => A, seed$: Stream<A>, values$: Stream<B>): Stream<A>

Especially useful when keeping local state that also needs to be updated from a source.

See an example
import { Stream } from '@motorcycle/types'
import { query, dragOverEvent, dragStartEvent, dropEvent } from '@motorcycle/dom'
import { sample, map, state, mapList } from '@motorcycle/stream'
import { move } from '@typed/prelude'

export function ReorderableList(sources) {
  const { list$, dom } = sources
  const listItemSource = query(listItemCssSelector, dom)
  const dragOver$ = dragOverEvent(listItemSource)
  const dragStart$ = dragStartEvent(listItemSource)
  const drop$ = dropEvent(listItemSource)
  const reducer$: Stream<(list: Array<string>) => Array<string>> =
    sample((to, from) => move(from, to), map(elementDataKey, drop$), map(elementDataKey, dragStart$))
  const reorderedList$ = state((x, f) => f(x), list$, reducer$)
  // Create all list items.
  const listItemViews$ = mapList(listItem, reorderedList$)
  // Pass the list items to the view
  const view$ = map(view, listItemViews$)

  return {
    view$,
    preventDefault$: dragOver$,
  }
}
See the code
/**
 * Call signatures of `state`: all three arguments at once, or supplied
 * progressively (reducer first, then seed stream, then value stream).
 */
export interface State {
  <A, B>(f: (accumulator: A, value: B) => A, seed$: Stream<A>, values$: Stream<B>): Stream<A>
  <A, B>(f: (accumulator: A, value: B) => A, seed$: Stream<A>): (values$: Stream<B>) => Stream<A>
  <A, B>(f: (accumulator: A, value: B) => A): {
    (seed$: Stream<A>, values$: Stream<B>): Stream<A>
    (seed$: Stream<A>): (values$: Stream<B>) => Stream<A>
  }
}

/**
 * Uncurried implementation behind `state`.
 *
 * For every seed emitted by `seed$`, a `scan` over `values$` is started from
 * that seed. `switchMap` selects among those scans, `skipRepeats` filters the
 * result, and `hold` wraps the final stream.
 */
function __state<A, B>(
  f: (accumulator: B, value: A) => B,
  seed$: Stream<B>,
  values$: Stream<A>
): Stream<B> {
  const scanFromSeed = (seed: B) => scan(f, seed, values$)
  const accumulated$ = switchMap(scanFromSeed, seed$)
  const withoutRepeats$ = skipRepeats(accumulated$)

  return hold(withoutRepeats$)
}

/** Curried (via `curry3`) form of `__state`. */
export const state: State = curry3(__state)

switchCombine<A>(streamList$: Stream<Array<Stream<A>>>): Stream<ReadonlyArray<A>>

Flattens an array of streams into an array of values. Particularly useful when dealing with a list of children components.

See an example
import { switchCombine, mapSinks, map, now } from '@motorcycle/stream'

function Component(sources) {
  const { listOfData$ } = sources

  const childSinks$ = map(
    listOfData => listOfData.map(data => ChildComponent({ ...sources, data$: now(data) })),
    listOfData$
  )

  const childViews$: Stream<ReadonlyArray<VNode>> = 
    switchCombine(mapSinks(sinks => sinks.view$, childSinks$))

  const view$ = map(view, childViews$)

  return { view$ }
}

function view(childViews: ReadonlyArray<VNode>): VNode {
  // ...
}
See the code
/**
 * Flattens a stream of stream-lists into a stream of value-lists.
 *
 * Each emitted list is turned into one stream: `now([])` for an empty list,
 * otherwise `combineArray` collecting the streams' values into an array. The
 * resulting stream of streams is flattened with `switchLatest`.
 *
 * @param streamList$ Stream whose events are arrays of streams.
 * @returns Stream of arrays of values, one slot per stream in the current list.
 */
export function switchCombine<A>(streamList$: Stream<Array<Stream<A>>>): Stream<ReadonlyArray<A>> {
  const combineList = (streams: Array<Stream<A>>) => {
    // An empty list gets an explicit empty-array event instead of going
    // through `combineArray`.
    if (streams.length === 0) return now([])

    return combineArray((...items) => items, streams)
  }

  return switchLatest(map(combineList, streamList$))
}

switchLatest<A>(stream: Stream<Stream<A>>): Stream<A>

Given a higher-order stream, return a new stream that adopts the behavior of (ie emits the events of) the most recent inner stream.

See an example
import { switchLatest, now } from '@motorcycle/stream'

const A = // -1--2--3----->
const B = // -4--5--6----->
const C = // -7--8--9----->

// --A-----B-----C-------->
const source = // ...

// ---1--2--4--5--7--8--9->
const stream = switchLatest(source)
See the code
export { switchLatest } from '@most/core'

switchMap<A, B = A>(f: (a: A) => Stream<B>, s: Stream<A>): Stream<B>

Applies a function, which returns a higher-order stream, to each event value of a stream and returns a new stream that adopts the behavior of (i.e., emits the events of) the most recent inner stream.

See an example
import { now, scan, switchMap, observe, skip } from '@motorcycle/stream'

const a$ = now(1)
const b$ = now(2)
const f = (a: number) => scan((x, y) => x + y, a, b$)
const s = skip(1, switchMap(f, a$))

observe(console.log, s) // 3
See the code
/** Result of supplying only the mapping function to `switchMap`: a function awaiting the source stream. */
export interface SwitchMapArity1<A, B = A> {
  (s: Stream<A>): Stream<B>
}

/** Call signatures of `switchMap`: both arguments at once, or the mapping function alone. */
export interface SwitchMapArity2 {
  <A, B = A>(f: (a: A) => Stream<B>, s: Stream<A>): Stream<B>

  <A, B = A>(f: (a: A) => Stream<B>): SwitchMapArity1<A, B>
}

/**
 * Curried (via `curry2`) function that maps every event of `s` to a stream
 * with `f` and flattens the resulting stream of streams with `switchLatest`.
 */
export const switchMap: SwitchMapArity2 = curry2(function switchMap<A, B = A>(
  f: (a: A) => Stream<B>,
  s: Stream<A>
): Stream<B> {
  const inner$: Stream<Stream<B>> = map(f, s)

  return switchLatest(inner$)
})

switchMerge<A>(streams$: Stream<Array<Stream<A>>>): Stream<A>

Merges a list of streams into a single stream containing events from all of the streams. Particularly useful when dealing with a list of child components.

See an example
import { switchMerge, mapSinks, now } from '@motorcycle/stream'

function Component(sources) {
  const { listOfData$ } = sources

  const childSinks$ = map(
    listOfData => listOfData.map(data => ChildComponent({ ...sources, data$: now(data) })),
    listOfData$
  )

  const foo$ = switchMerge(mapSinks(sinks => sinks.foo$, childSinks$))

  return { foo$ } 
}
See the code
/**
 * Flattens a stream of stream-lists into one stream: every emitted list is
 * merged with `mergeArray`, and the resulting stream of merged streams is
 * flattened with `switchLatest`.
 *
 * @param streams$ Stream whose events are arrays of streams.
 * @returns The stream returned by `switchLatest`.
 */
export function switchMerge<A>(streams$: Stream<Array<Stream<A>>>): Stream<A> {
  const merged$: Stream<Stream<A>> = map(mergeArray, streams$)

  return switchLatest(merged$)
}

switchSinkOr<Sinks, K extends keyof Sinks>(or$: Sinks[K], sinkName: K, sinks$: Stream<Sinks>): Sinks[K]

Flattens a stream of sinks into a single sink.

See an example
import { switchSinkOr, map, now, never } from '@motorcycle/stream'

const switchSinkOrNever = switchSinkOr(never())

function Component(sources) {
  const { listOfItems$ } = sources

 const sinks$ = map(items => SubComponent({ ...sources, items$: now(items) }), listOfItems$)

 const history$ = switchSinkOrNever('history$', sinks$)

 return { history$ } 
}
See the code
/**
 * Call signatures of `switchSinkOr`: all three arguments at once, or supplied
 * progressively (fallback stream first, then sink name, then the stream of
 * sinks objects).
 */
export interface SwitchSinkOr {
  <Sinks extends { readonly [key: string]: Stream<any> }, K extends keyof Sinks = keyof Sinks>(
    or$: Sinks[K],
    sinkName: K,
    sinks$: Stream<Sinks>
  ): Sinks[K]

  <Sinks extends { readonly [key: string]: Stream<any> }, K extends keyof Sinks = keyof Sinks>(
    or$: Sinks[K],
    sinkName: K
  ): (sinks$: Stream<Sinks>) => Sinks[K]

  <Sinks extends { readonly [key: string]: Stream<any> }, K extends keyof Sinks = keyof Sinks>(
    or$: Sinks[K]
  ): (sinkName: K, sinks$: Stream<Sinks>) => Sinks[K]

  <Sinks extends { readonly [key: string]: Stream<any> }, K extends keyof Sinks = keyof Sinks>(
    or$: Sinks[K]
  ): (sinkName: K) => (sinks$: Stream<Sinks>) => Sinks[K]
}

/**
 * Curried (via `curry3`) function that, for every sinks object emitted by
 * `sinks$`, selects the sink stored under `sinkName` — or `or$` when that
 * entry is falsy — and flattens the selected streams with `switchLatest`.
 */
export const switchSinkOr: SwitchSinkOr = curry3<any, any, any, any>(function switchSinkOr<
  Sinks extends { readonly [key: string]: Stream<any> },
  K extends keyof Sinks = keyof Sinks
>(or$: Sinks[K], sinkName: K, sinks$: Stream<Sinks>): Sinks[K] {
  // `||` (not `??`): any falsy entry falls back to `or$`.
  const pickSink = (sinks: Sinks) => sinks[sinkName] || or$
  const picked$ = map(pickSink, sinks$)

  return switchLatest(picked$)
})

take<A>(quantity: number, stream: Stream<A>): Stream<A>

Take at most the first n events of a stream.

See an example
import { take } from '@motorcycle/stream'

const source = // -1-2-3-4-5-6-7-8-9-10->
//                -1-2-3|
const stream = take(3, source)
See the code
export { take } from '@most/core'

takeWhile<A>(predicate: (value: A) => boolean, stream: Stream<A>): Stream<A>

Keep all events until predicate returns false, and discard the rest.

See an example
import { takeWhile } from '@motorcycle/stream'

const source = // -2-4-5-6-8->
//                -2-4-|
const stream = takeWhile(even, source)
See the code
export { takeWhile } from '@most/core'

tap<A>(f: (value: A) => any, stream: Stream<A>): Stream<A>

Creates a new stream that upon each event performs a side-effect.

See an example
import { tap, drain } from '@motorcycle/stream'

const logStream = tap(console.log, stream)

drain(logStream)
See the code
export { tap } from '@most/core'

throttle<A>(ms: number, stream: Stream<A>): Stream<A>

Limit the rate of events to at most one per a number of milliseconds.

In contrast to debounce, throttle simply drops events that occur "too often", whereas debounce waits for a "quiet period".

See an example
import { throttle } from '@motorcycle/stream'

const source = // -abcd---abcd--->
//                -a-c----a-c---->
const stream = throttle(2, source)
See the code
export { throttle } from '@most/core'

throwError(err: Error): Stream<never>

Create a stream in the error state. This can be useful for functions that need to return a stream, but need to signal an error.

See an example
import { throwError, chain, now } from '@motorcycle/stream'

const f = (x: Maybe<number>): Stream<number> => isNothing(x)
 ? throwError(new Error('cannot be given Nothing'))
 : now(fromJust(x)) 

const stream = chain(f, maybe$)
See the code
export { throwError } from '@most/core'

until<A>(endSignal: Stream<any>, stream: Stream<A>): Stream<A>

Keep all events in one stream until the first event occurs in another.

See an example
import { until } from '@motorcycle/stream'

const source =     // --1-2-3-4-5-6-7-8->
const endSignal =  // ---------z-------->
//                    --1-2-3-4|
const stream = until(endSignal, source)
See the code
export { until } from '@most/core'

withArrayValues<A>(array: Array<A>, stream: Stream<any>): Stream<A>

Creates a new stream by associating event times with values from an array. The resulting stream will end when all array values have been used or when the underlying stream ends.

See an example
import { withArrayValues, periodic, observe } from '@motorcycle/stream'

const stream = withArrayValues([ 1, 2, 3 ], periodic(100))

observe(console.log, stream)
// 1 -- time 0
// 2 -- time 100
// 3 -- time 200
See the code
export { withArrayValues } from '@most/core'

zip<A, B, C>(f: (a: A, b: B) => C, a$: Stream<A>, b$: Stream<B>): Stream<C>

Applies a function to corresponding pairs of events from the input streams.

See an example
import { zip, observe } from '@motorcycle/stream'

const tuple = (x, y) => [x, y]

const a$ = // --1----3-------5------6----|
const b$ = // --------2--3--------4------|
//         // --------[1,2]--[3,3]--[5,4]|
const stream = zip(tuple, a$, b$)

observe(console.log, stream)
// [1, 2]
// [3, 3]
// [5, 4]
See the code
export { zip } from '@most/core'

zipArray<A, B, C>(f: (a: A, b: B) => C, streams: [Stream<A>, Stream<B>]): Stream<C>

Applies a function to corresponding pairs of events from the input streams.

See an example
import { zipArray, observe } from '@motorcycle/stream'

const tuple = (x, y) => [x, y]

const a$ = // --1----3-------5------6----|
const b$ = // --------2--3--------4------|
//         // --------[1,2]--[3,3]--[5,4]|
const stream = zipArray(tuple, [a$, b$])

observe(console.log, stream)
// [1, 2]
// [3, 3]
// [5, 4]
See the code
export { zipArray } from '@most/core'

zipArrayValues<A, B, C>(f: (arrayValue: A, streamValue: B) => C, array: Array<A>, stream: Stream<B>): Stream<C>

Creates a new stream by applying a function with a value at increasing index of an array and the latest event value from a stream. The resulting stream will end when all array values have been used or as soon as the underlying stream ends.

See an example
import { zipArrayValues, now, concat, observe } from '@motorcycle/stream'

const f = (x, y) => x + y

const array = [ 100, 200 ]
const stream = concat(now(1), now(2))

observe(console.log, zipArrayValues(f, array, stream))
// 101
// 202
See the code
export { zipArrayValues } from '@most/core'