manifold.stream

Methods for creating, transforming, and interacting with asynchronous streams of values.

->BufferedStream

(->BufferedStream buf limit metric description buffer-size last-put buf+ handle mta)

Positional factory function for class manifold.stream.BufferedStream.

->sink

(->sink x)(->sink x default-val)

Converts, if possible, the object to a Manifold sink, or default-val if it cannot. If no default value is given, an IllegalArgumentException is thrown.

->source

(->source x)(->source x default-val)

Converts, if possible, the object to a Manifold source, or default-val if it cannot. If no default value is given, an IllegalArgumentException is thrown.

batch

(batch batch-size s)(batch max-size max-latency s)(batch metric max-size max-latency s)

Batches messages, either into groups of fixed size, or according to upper bounds on size and latency, in milliseconds. By default, each message is of size 1, but a custom metric function that returns the size of each message may be defined.

buffer

(buffer limit s)(buffer metric limit s)

Takes a stream, and returns a stream which is a buffered view of that stream. The buffer size may either be measured in messages, or if a metric is defined, by the sum of metric mapped over all messages currently buffered.

buffered-stream

(buffered-stream buffer-size)(buffered-stream metric limit)(buffered-stream metric limit description)

A stream which will buffer at most limit data, where the size of each message is defined by (metric message).

close!

(close! sink)

Closes a source or sink, so that it can’t emit or accept any more messages.

closed?

(closed? sink)

Returns true if the event sink is closed.

concat

(concat s)

Takes a stream of streams, and flattens it into a single stream.

connect

(connect source sink)(connect source sink {:keys [upstream? downstream? timeout description], :or {upstream? false, downstream? true}})

Connects a source to a sink, propagating all messages from the former into the latter.

Optionally takes a map of parameters:

upstream? if closing the sink should always close the source, even if there are other sinks downstream of the source. Defaults to false. Note that if the sink is the only thing downstream of the source, the source will always be closed, unless it is permanent.
downstream? if closing the source will close the sink. Defaults to true.
timeout if defined, the maximum time, in milliseconds, that will be spent trying to put a message into the sink before closing it. Useful when there are multiple sinks downstream of a source, and you want to avoid a single backed up sink from blocking all the others.
description describes the connection, useful for traversing the stream topology via downstream.

connect-via

(connect-via src callback dst)(connect-via src callback dst options)

Feeds all messages from src into callback, with the understanding that they will eventually be propagated into dst in some form. The return value of callback should be a deferred yielding either true or false. When false, the downstream sink is assumed to be closed, and the connection is severed.

Returns a deferred which yields true when src is exhausted or callback yields false.

consume

(consume callback source)

Feeds all messages from source into callback.

Messages will be processed as quickly as the callback can be executed. Returns a deferred which yields true when source is exhausted.

consume-async

(consume-async callback source)

Feeds all messages from source into callback, which must return a deferred yielding true or false. If the returned value yields false, the consumption will be cancelled.

Messages will be processed only as quickly as the deferred values are realized. Returns a deferred which yields true when source is exhausted or callback yields false.

description

(description x)

Returns a description of the stream.

downstream

(downstream x)

Returns all sinks downstream of the given source as a sequence of 2-tuples, with the first element containing the connection’s description, and the second element containing the sink.

drain-into

(drain-into src dst)

Takes all messages from src and puts them into dst, and returns a deferred that yields true once src is drained or dst is closed. If src is closed or drained, dst will not be closed.

drained?

(drained? source)

Returns true if the event source is drained.

filter

(filter pred s)

Equivalent to Clojure’s filter, but for streams instead of sequences.

lazily-partition-by

(lazily-partition-by f s)

Equivalent to Clojure’s partition-by, but returns a stream of streams. This means that if a sub-stream is not completely consumed, the next sub-stream will never be emitted.

Use with caution. If you’re not totally sure you want a stream of streams, use (transform (partition-by f)) instead.

map

(map f s)(map f s & rest)

Equivalent to Clojure’s map, but for streams instead of sequences.

mapcat

(mapcat f s)(mapcat f s & rest)

Equivalent to Clojure’s mapcat, but for streams instead of sequences.

on-closed

(on-closed sink callback)

Registers a no-arg callback which is invoked when the sink is closed.

on-drained

(on-drained source callback)

Registers a no-arg callback which is invoked when the source is drained.

onto

(onto executor s)

Returns an identical stream whose deferred callbacks will be executed on executor.

periodically

(periodically period initial-delay f)(periodically period f)

Creates a stream which emits the result of invoking (f) every period milliseconds.

put!

(put! sink x)

Puts a value into a sink, returning a deferred that yields true if it succeeds, and false if it fails. Guaranteed to be non-blocking.

put-all!

(put-all! sink msgs)

Puts all values into the sink, returning a deferred that yields true if all puts are successful, or false otherwise. If the sink provides backpressure, will pause. Guaranteed to be non-blocking.

realize-each

(realize-each s)

Takes a stream of potentially deferred values, and returns a stream of realized values.

reduce

(reduce f s)(reduce f initial-value s)

Equivalent to Clojure’s reduce, but returns a deferred representing the return value.

The deferred will be realized once the stream is closed or if the accumulator functions returns a reduced value.

reductions

(reductions f s)(reductions f initial-value s)

Equivalent to Clojure’s reductions, but for streams instead of sequences.

sink-only

(sink-only s)

Returns a view of the stream which is only a sink.

sink?

(sink? x)

Returns true if the object is a Manifold sink.

sinkable?

(sinkable? x)

source-only

(source-only s)

Returns a view of the stream which is only a source.

source?

(source? x)

Returns true if the object is a Manifold source.

sourceable?

(sourceable? x)

splice

(splice sink source)

Splices together two halves of a stream, such that all messages enqueued via put! go into sink, and all messages dequeued via take! come from source.

stream

(stream)(stream buffer-size)(stream buffer-size xform)(stream buffer-size xform executor)

Returns a Manifold stream with a configurable buffer-size. If a capacity is specified, put! will yield true when the message is in the buffer. Otherwise it will only yield true once it has been consumed.

xform is an optional transducer, which will transform all messages that are enqueued via put! before they are dequeued via take!.

executor, if defined, specifies which java.util.concurrent.Executor will be used to handle the deferreds returned by put! and take!.

stream*

(stream* {:keys [permanent? buffer-size description executor xform]})

An alternate way to build a stream, via a map of parameters.

permanent? if true, the channel cannot be closed
buffer-size the number of messages that can accumulate in the channel before backpressure is applied
description the description of the channel, which is a single arg function that takes the base properties and returns an enriched map.
executor the java.util.concurrent.Executor that will execute all callbacks registered on the deferreds returns by put! and take!
xform a transducer which will transform all messages that are enqueued via put! before they are dequeued via take!.

stream->seq

(stream->seq s)(stream->seq s timeout-interval)

Transforms a stream into a lazy sequence. If a timeout-interval is defined, the sequence will terminate if timeout-interval milliseconds elapses without a new event.

stream?

(stream? x)

Returns true if the object is a Manifold stream.

synchronous?

(synchronous? x)

Returns true if the underlying abstraction behaves synchronously, using thread blocking to provide backpressure.

take!

(take! source)(take! source default-val)

Takes a value from a stream, returning a deferred that yields the value when it is available, or nil if the take fails. Guaranteed to be non-blocking.

A special default-val may be specified, if it is important to differentiate between actual nil values and failures.

throttle

(throttle max-rate s)(throttle max-rate max-backlog s)

Limits the max-rate that messages are emitted, per second.

The max-backlog dictates how much “memory” the throttling mechanism has, or how many messages it will emit immediately after a long interval without any messages. By default, this is set to one second’s worth.

transform

(transform xform s)(transform xform buffer-size s)

Takes a transducer xform and returns a source which applies it to source s. A buffer-size may optionally be defined for the output source.

try-put!

(try-put! sink x timeout)(try-put! sink x timeout timeout-val)

Puts a value into a stream if the put can successfully be completed in timeout milliseconds. Returns a promiise that yields true if it succeeds, and false if it fails or times out. Guaranteed to be non-blocking.

A special timeout-val may be specified, if it is important to differentiate between failure due to timeout and other failures.

try-take!

(try-take! source timeout)(try-take! source default-val timeout timeout-val)

Takes a value from a stream, returning a deferred that yields the value if it is available within timeout milliseconds, or nil if it fails or times out. Guaranteed to be non-blocking.

Special timeout-val and default-val values may be specified, if it is important to differentiate between actual nil values and failures.

weak-handle

(weak-handle x)

Returns a weak reference that can be used to construct topologies of streams.

zip

(zip a)(zip a & rest)

Takes n-many streams, and returns a single stream which will emit n-tuples representing a message from each stream.