| stream | R Documentation |
(Experimental as of async 0.3) stream(...) constructs a channel
object, i.e. an asynchronous iterator, which will compute and
return values according to sequential code written in expr. A
stream is a coroutine wearing a channel interface in the same
way that async is a coroutine wearing a promise interface, and a
gen is a coroutine sitting behind an iteror interface.
stream(
  expr,
  ...,
  split_pipes = TRUE,
  lazy = TRUE,
  compileLevel = getOption("async.compileLevel"),
  debugR = FALSE,
  debugInternal = FALSE,
  trace = getOption("async.verbose")
)
| expr | A coroutine expression, using some combination of yield(), await(), awaitNext(), and ordinary control flow such as for loops. |
| ... | Undocumented. |
| split_pipes | See description under async; defaults to TRUE. |
| lazy | If TRUE, start paused, and pause after each yield when no requests are outstanding. |
| compileLevel | Compilation level. |
| debugR | Set TRUE to single-step debug at R level. Use ... |
| debugInternal | Set TRUE to single-step debug at coroutine implementation level. |
| trace | An optional tracing function. |
In a stream expression, you can call yield() to emit a value and
await() to wait for a value from a promise. To have your stream
wait for values from another stream or channel, call
awaitNext(); awaitNext() also works inside an async. A simple
for loop consumes all future values from a stream or channel.
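As a rough illustration of the two consumption styles just described, the sketch below pulls values one at a time with awaitNext() and then drains a second stream with a for loop. The names count_up, first_two, and total are hypothetical and exist only for this example; promises::then() is used simply to show the results once the event loop has run.

```r
library(async)

# Hypothetical source: a stream function whose streams yield 1..n.
count_up <- stream(function(n) {
  for (i in 1:n) yield(i)
})

# awaitNext() takes a single value from the stream per call.
first_two <- async({
  src <- count_up(5)
  a <- awaitNext(src)
  b <- awaitNext(src)
  a + b
})

# A for loop keeps consuming until the stream finishes.
total <- async({
  src <- count_up(5)
  acc <- 0
  for (x in src) acc <- acc + x
  acc
})

# Both asyncs are promises; print their values when they resolve.
promises::then(first_two, function(v) print(v))
promises::then(total, function(v) print(v))
```

Each async builds its own stream here so that the two consumers do not compete for the same values.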
The lower-level way to consume values from a stream is to call nextThen, which is part of the channel interface.
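A minimal sketch of the callback style follows, assuming the nextThen(x, onNext, onError, onClose) argument order from the package's channel documentation. The stream s and the messages printed are illustrative only, and the callbacks are expected to fire once the event loop gets a chance to run.

```r
library(async)

# A lazy stream that yields three values and then finishes.
s <- stream({
  for (i in 1:3) yield(i)
})

# Request one value. Exactly one of the callbacks should be invoked
# in response, depending on how the stream answers the request.
nextThen(s,
  onNext  = function(val) cat("got", val, "\n"),
  onError = function(err) cat("stream stopped with an error\n"),
  onClose = function(...) cat("stream finished\n"))
```

To read further values this way, call nextThen again for each one; the for loop and awaitNext() forms do that bookkeeping for you.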
Streams come in both "lazy" and "eager" varieties. If lazy=TRUE,
a stream starts idle, and does not process anything
until it is woken up by a call to its channel's nextThen. It will
pause after reaching yield if there are no more outstanding
requests. If lazy=FALSE, a stream will begin executing
immediately, not pausing on yield, possibly queuing up emitted
values until it needs to await something.
(For comparison, in this package, gens are lazy in that they do
not start executing until a call to nextOr and pause
immediately after yield, while async blocks are eager,
starting at construction and running until they hit an await.)
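One way to observe the difference is to give two otherwise identical streams a visible side effect and construct one of each kind. This is only a sketch: lazy_s and eager_s are hypothetical names, and the exact moment each message appears depends on how the package schedules work.

```r
library(async)

# Same body, different scheduling. Per the description above, the lazy
# stream should stay idle until a value is requested, while the eager
# stream should start running without waiting for a request.
lazy_s <- stream({
  message("lazy body started")
  for (i in 1:3) yield(i)
}, lazy = TRUE)

eager_s <- stream({
  message("eager body started")
  for (i in 1:3) yield(i)
}, lazy = FALSE)

# Requesting a value is what wakes the lazy stream.
nextThen(lazy_s,
  onNext  = function(val) message("lazy stream gave ", val),
  onError = function(err) message("lazy stream failed"),
  onClose = function(...) message("lazy stream finished"))
```

An eager stream suits producers that should make progress on their own, such as timers or polling loops; a lazy stream avoids doing work whose results nobody has asked for.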
Like its coroutine counterparts, if stream is given a function
expression, like stream(function(...) ...), it will return a
"stream function" i.e. a function that constructs a stream object.
An object with (at least) classes "stream", "channel", "coroutine", "iteror", "iter".
Peter Meilstrup
library(async)

# emit values _no more than_ once per second
count_to <- stream(function(n, interval=1) {
  for (i in 1:n) {
    await(delay(interval))
    yield(i)
  }
})

# emit the running total of values received from another stream
accumulate <- stream(function(st, sum=0) {
  for (i in st) {sum <- sum + i; yield(sum)}
})

# print each value as it arrives
print_each <- async(function(st) for (i in st) print(i))

count_to(10) |> accumulate() |> print_each()