manifold.stream
Methods for creating, transforming, and interacting with asynchronous streams of values.
->sink
(->sink x)
(->sink x default-val)
Converts, if possible, the object to a Manifold sink, or default-val
if it cannot. If no default value is given, an IllegalArgumentException
is thrown.
->source
(->source x)
(->source x default-val)
Converts, if possible, the object to a Manifold source, or default-val
if it cannot. If no default value is given, an IllegalArgumentException
is thrown.
batch
(batch batch-size s)
(batch max-size max-latency s)
(batch metric max-size max-latency s)
Batches messages, either into groups of fixed size, or according to upper bounds on size and latency, in milliseconds. By default, each message is of size 1
, but a custom metric
function that returns the size of each message may be defined.
buffer
(buffer limit s)
(buffer metric limit s)
Takes a stream, and returns a stream which is a buffered view of that stream. The buffer size may either be measured in messages, or if a metric
is defined, by the sum of metric
mapped over all messages currently buffered.
buffered-stream
(buffered-stream buffer-size)
(buffered-stream metric limit)
(buffered-stream metric limit description)
A stream which will buffer at most limit
data, where the size of each message is defined by (metric message)
.
close!
(close! sink)
Closes a source or sink, so that it can’t emit or accept any more messages.
connect
(connect source sink)
(connect source sink {:keys [upstream? downstream? timeout description], :or {upstream? false, downstream? true}})
Connects a source to a sink, propagating all messages from the former into the latter.
Optionally takes a map of parameters:
upstream? |
if closing the sink should always close the source, even if there are other sinks downstream of the source. Defaults to false . Note that if the sink is the only thing downstream of the source, the source will always be closed, unless it is permanent. |
downstream? |
if closing the source will close the sink. Defaults to true . |
timeout |
if defined, the maximum time, in milliseconds, that will be spent trying to put a message into the sink before closing it. Useful when there are multiple sinks downstream of a source, and you want to avoid a single backed up sink from blocking all the others. |
description |
describes the connection, useful for traversing the stream topology via downstream . |
connect-via
(connect-via src callback dst)
(connect-via src callback dst options)
Feeds all messages from src
into callback
, with the understanding that they will eventually be propagated into dst
in some form. The return value of callback
should be a deferred yielding either true
or false
. When false
, the downstream sink is assumed to be closed, and the connection is severed.
Returns a deferred which yields true
when src
is exhausted or callback
yields false
.
consume
(consume callback source)
Feeds all messages from source
into callback
.
Messages will be processed as quickly as the callback can be executed. Returns a deferred which yields true
when source
is exhausted.
consume-async
(consume-async callback source)
Feeds all messages from source
into callback
, which must return a deferred yielding true
or false
. If the returned value yields false
, the consumption will be cancelled.
Messages will be processed only as quickly as the deferred values are realized. Returns a deferred which yields true
when source
is exhausted or callback
yields false
.
downstream
(downstream x)
Returns all sinks downstream of the given source as a sequence of 2-tuples, with the first element containing the connection’s description, and the second element containing the sink.
drain-into
(drain-into src dst)
Takes all messages from src
and puts them into dst
, and returns a deferred that yields true
once src
is drained or dst
is closed. If src
is closed or drained, dst
will not be closed.
filter
(filter pred s)
Equivalent to Clojure’s filter
, but for streams instead of sequences.
lazily-partition-by
(lazily-partition-by f s)
Equivalent to Clojure’s partition-by
, but returns a stream of streams. This means that if a sub-stream is not completely consumed, the next sub-stream will never be emitted.
Use with caution. If you’re not totally sure you want a stream of streams, use (transform (partition-by f))
instead.
map
(map f s)
(map f s & rest)
Equivalent to Clojure’s map
, but for streams instead of sequences.
mapcat
(mapcat f s)
(mapcat f s & rest)
Equivalent to Clojure’s mapcat
, but for streams instead of sequences.
on-closed
(on-closed sink callback)
Registers a no-arg callback which is invoked when the sink is closed.
on-drained
(on-drained source callback)
Registers a no-arg callback which is invoked when the source is drained.
onto
(onto executor s)
Returns an identical stream whose deferred callbacks will be executed on executor
.
periodically
(periodically period initial-delay f)
(periodically period f)
Creates a stream which emits the result of invoking (f)
every period
milliseconds.
put!
(put! sink x)
Puts a value into a sink, returning a deferred that yields true
if it succeeds, and false
if it fails. Guaranteed to be non-blocking.
put-all!
(put-all! sink msgs)
Puts all values into the sink, returning a deferred that yields true
if all puts are successful, or false
otherwise. If the sink provides backpressure, will pause. Guaranteed to be non-blocking.
realize-each
(realize-each s)
Takes a stream of potentially deferred values, and returns a stream of realized values.
reduce
(reduce f s)
(reduce f initial-value s)
Equivalent to Clojure’s reduce
, but returns a deferred representing the return value.
The deferred will be realized once the stream is closed or if the accumulator functions returns a reduced
value.
reductions
(reductions f s)
(reductions f initial-value s)
Equivalent to Clojure’s reductions
, but for streams instead of sequences.
splice
(splice sink source)
Splices together two halves of a stream, such that all messages enqueued via put!
go into sink
, and all messages dequeued via take!
come from source
.
stream
(stream)
(stream buffer-size)
(stream buffer-size xform)
(stream buffer-size xform executor)
Returns a Manifold stream with a configurable buffer-size
. If a capacity is specified, put!
will yield true
when the message is in the buffer. Otherwise it will only yield true
once it has been consumed.
xform
is an optional transducer, which will transform all messages that are enqueued via put!
before they are dequeued via take!
.
executor
, if defined, specifies which java.util.concurrent.Executor will be used to handle the deferreds returned by put!
and take!
.
stream*
(stream* {:keys [permanent? buffer-size description executor xform]})
An alternate way to build a stream, via a map of parameters.
permanent? |
if true , the channel cannot be closed |
buffer-size |
the number of messages that can accumulate in the channel before backpressure is applied |
description |
the description of the channel, which is a single arg function that takes the base properties and returns an enriched map. |
executor |
the java.util.concurrent.Executor that will execute all callbacks registered on the deferreds returns by put! and take! |
xform |
a transducer which will transform all messages that are enqueued via put! before they are dequeued via take! . |
stream->seq
(stream->seq s)
(stream->seq s timeout-interval)
Transforms a stream into a lazy sequence. If a timeout-interval
is defined, the sequence will terminate if timeout-interval
milliseconds elapses without a new event.
synchronous?
(synchronous? x)
Returns true if the underlying abstraction behaves synchronously, using thread blocking to provide backpressure.
take!
(take! source)
(take! source default-val)
Takes a value from a stream, returning a deferred that yields the value when it is available, or nil
if the take fails. Guaranteed to be non-blocking.
A special default-val
may be specified, if it is important to differentiate between actual nil
values and failures.
throttle
(throttle max-rate s)
(throttle max-rate max-backlog s)
Limits the max-rate
that messages are emitted, per second.
The max-backlog
dictates how much “memory” the throttling mechanism has, or how many messages it will emit immediately after a long interval without any messages. By default, this is set to one second’s worth.
transform
(transform xform s)
(transform xform buffer-size s)
Takes a transducer xform
and returns a source which applies it to source s
. A buffer-size may optionally be defined for the output source.
try-put!
(try-put! sink x timeout)
(try-put! sink x timeout timeout-val)
Puts a value into a stream if the put can successfully be completed in timeout
milliseconds. Returns a promise that yields true
if it succeeds, and false
if it fails or times out. Guaranteed to be non-blocking.
A special timeout-val
may be specified, if it is important to differentiate between failure due to timeout and other failures.
try-take!
(try-take! source timeout)
(try-take! source default-val timeout timeout-val)
Takes a value from a stream, returning a deferred that yields the value if it is available within timeout
milliseconds, or nil
if it fails or times out. Guaranteed to be non-blocking.
Special timeout-val
and default-val
values may be specified, if it is important to differentiate between actual nil
values and failures.
weak-handle
(weak-handle x)
Returns a weak reference that can be used to construct topologies of streams.
zip
(zip a)
(zip a & rest)
Takes n-many streams, and returns a single stream which will emit n-tuples representing a message from each stream.