stream {async} | R Documentation |
(Experimental as of async 0.3) stream(...) constructs a channel object, i.e. an asynchronous iterator, which will compute and return values according to sequential code written in expr. A stream is a coroutine wearing a channel interface in the same way that async is a coroutine wearing a promise interface, and a gen is a coroutine sitting behind an iteror interface.
stream(
  expr,
  ...,
  split_pipes = TRUE,
  lazy = TRUE,
  compileLevel = getOption("async.compileLevel"),
  debugR = FALSE,
  debugInternal = FALSE,
  trace = getOption("async.verbose")
)
expr | A coroutine expression, using some combination of yield, await, awaitNext, and standard control flow. |
... | Undocumented. |
split_pipes | See description under async; defaults to TRUE. |
lazy | If TRUE, start paused, and pause after yield. |
compileLevel | Compilation level. |
debugR | Set TRUE to single-step debug at R level. |
debugInternal | Set TRUE to single-step debug at coroutine implementation level. |
trace | An optional tracing function. |
In a stream expression, you can call yield() to emit a value, and await() to wait for a value from a promise. To have your stream wait for values from another stream or channel, call awaitNext(); you can also use awaitNext when you are writing an async. You can also use a simple for loop to consume all future values from a stream or channel.
The lower-level way to consume values from a stream is nextThen, from the channel interface.
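As a rough illustration of consuming a stream from inside an async, the sketch below pulls two values with awaitNext() and adds them. The names numbers, first_two and result are made up for this example, and the explicit later::run_now() loop is only an assumption about how to drive the event loop in a non-interactive script; in an interactive session this is normally not needed.

```r
library(async)

# A small lazy stream that emits three values on request.
numbers <- stream({
  for (i in 1:3) yield(i)
})

# Inside an async, awaitNext() suspends until the stream emits its next value.
first_two <- async({
  a <- awaitNext(numbers)
  b <- awaitNext(numbers)
  a + b
})

# first_two is a promise; record its value once it resolves.
result <- NULL
promises::then(first_two, function(v) result <<- v)

# In a non-interactive script nothing pumps the event loop for us,
# so run pending callbacks until none remain.
while (!later::loop_empty()) later::run_now(timeoutSecs = 1)
print(result)
```

Only two of the three values are requested here, so with the default lazy = TRUE the stream should simply stay paused after its second yield.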
Streams come in both "lazy" and "eager" varieties. If lazy=TRUE, a stream starts idle, and does not process anything until it is woken up by a call to its channel's nextThen. It will pause after reaching yield if there are no more outstanding requests. If lazy=FALSE, a stream will begin executing immediately, not pausing on yield, possibly queuing up emitted values until it needs to await something.
(For comparison, in this package, gen objects are lazy in that they do not start executing until a call to nextOr and pause immediately after yield, while async blocks are eager, starting at construction and running until they hit an await.)
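The difference between the two varieties can be observed by logging when each stream body starts. This is a minimal sketch, not taken from the package: events and note are hypothetical helpers, the nextThen argument names (onNext, onError, onClose) are written as I understand the channel interface and should be checked against ?nextThen, and the later::run_now() loop is again an assumption for non-interactive use.

```r
library(async)

events <- character(0)
note <- function(msg) events <<- c(events, msg)

# lazy = TRUE is the default: the body does not run until a value is requested.
lazy_st <- stream({
  note("lazy body started")
  yield("a")
})

# lazy = FALSE: the body starts running without waiting for a request.
eager_st <- stream({
  note("eager body started")
  yield("b")
}, lazy = FALSE)

# Let the event loop settle, then see which bodies have run so far.
while (!later::loop_empty()) later::run_now(timeoutSecs = 1)
print(events)

# Requesting a value through the channel interface wakes the lazy stream.
nextThen(lazy_st,
  onNext  = function(val) note(paste("lazy emitted", val)),
  onError = function(err) note("lazy failed"),
  onClose = function() note("lazy closed"))
while (!later::loop_empty()) later::run_now(timeoutSecs = 1)
print(events)
```

If the description above holds, the first print should show only the eager stream's message, and the lazy stream's messages should appear only after the nextThen call.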
Like its coroutine counterparts, if stream is given a function expression, like stream(function(...) ...), it will return a "stream function", i.e. a function that constructs a stream object.
Returns an object with (at least) classes "stream", "channel", "coroutine", "iteror", "iter".
Peter Meilstrup
# emit values _no more than_ once per second
count_to <- stream(function(n, interval=1) {
  for (i in 1:n) {
    await(delay(interval))
    yield(i)
  }
})

# emit a running total of the values received from stream st
accumulate <- stream(function(st, sum=0) {
  for (i in st) {
    sum <- sum + i
    yield(sum)
  }
})

# print each value as it arrives
print_each <- async(function(st) for (i in st) print(i))

count_to(10) |> accumulate() |> print_each()