The future package's default behavior is no parallelism (futures are evaluated sequentially in the calling R process), hence we need an explicit call to future::plan().
The future package's multicore plan, plan(multicore), is more or less like Python's ProcessPoolExecutor. There is no analog of ThreadPoolExecutor for R, and our understanding is that there never will be, given the many issues with threads and R.
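A minimal sketch of opting in to forked workers, assuming a Unix-like system: the multicore plan is not supported on Windows, and by default future falls back to sequential evaluation there and in RStudio. Each future asks its own process for its PID, which makes the fork visible.

```r
library(future)

# The default plan is sequential, so opt in to forked worker processes.
plan(multicore, workers = 2)

# Each future runs in its own forked child (when forking is available),
# much like submitting a callable to Python's ProcessPoolExecutor.
f1 <- future(Sys.getpid())
f2 <- future(Sys.getpid())

pids <- c(value(f1), value(f2))
# TRUE for each future that ran in a forked child rather than this process.
print(pids != Sys.getpid())
```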
Say you have a 2-node DAG where node B depends on node A. There is an R process for the call to compute(B). Inside that is dag$poll(), which invokes node$poll() for both the A and B nodes. There will be a forked R process (managed by the future package) for the call to A's future, and another forked R process for the call to B's future.
Both of those forked R processes, in turn, are just sitting in a synchronous call to the REST server to execute the UDF/SQL/etc. So, in any non-trivial DAG, the parent/polling process spends most of its time awaiting futures which are running in the child/forked processes, and those are in turn spending most of their time awaiting the results of HTTP requests to the REST server.
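To make the parent/child picture concrete, here is a small sketch of a parent polling two futures for the A-then-B case, with a sleep standing in for the synchronous REST call. fake_rest_call() is hypothetical, and the loop only illustrates the polling pattern; it is not the package's dag$poll().

```r
library(future)
plan(multicore, workers = 2)

# Hypothetical stand-in for the child's synchronous REST call: the child
# just blocks, the way it would while awaiting an HTTP response.
fake_rest_call <- function(x, seconds = 0.5) {
  Sys.sleep(seconds)
  x
}

fa <- future(fake_rest_call(9))   # node A starts right away
fb <- NULL                        # node B waits for A
polls <- 0L
repeat {
  polls <- polls + 1L
  # resolved() is non-blocking: the parent only checks, it never waits here.
  if (is.null(fb) && resolved(fa)) {
    a_val <- value(fa)                        # A's output feeds B
    fb <- future(fake_rest_call(10 * a_val))
  }
  if (!is.null(fb) && resolved(fb)) break
  Sys.sleep(0.1)                              # parent idles between polls
}
b_val <- value(fb)
print(b_val)
print(polls)
```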
The parent process detects that those futures have resolved through its calls to resolved(). The only communication between the child processes and the parent process is via (a) the return value from the future, and (b) stdout prints by the child. Crucially, prints to stderr are completely lost, and prints to stdout do not go directly to stdout: they are only obtainable once the future is resolved, at which point the parent process gets all of the child process's stdout lines in one go via self$future_result$stdout.
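A minimal sketch of the stdout behavior using future's result(), which returns a FutureResult object whose stdout field holds the child's captured output (presumably the same kind of object that ends up in self$future_result):

```r
library(future)
plan(multicore, workers = 2)

f <- future({
  cat("hello from the child\n")   # buffered in the child, not shown yet
  42
})

# result() waits for the future and returns a FutureResult; nothing the
# child printed has reached the parent's console until we emit it here.
r <- result(f)
cat(r$stdout)   # all of the child's stdout, delivered in one go
print(r$value)
```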
a <- delayed(function() { 9 }, name='a')
b <- delayed(function(x) { 10*x }, args=list(a), name='b')
o <- compute(b, namespace='namespace_to_charge')
The terminal node's compute() function instantiates a DAG object and stashes it as a slot within that node, for easy reference later. Nodes and edges are already in existence; the DAG's purpose is to check for cycles, identify initial nodes, etc.
The DAG is poll-driven: its poll() method calls every node's poll() method until computation is complete, or has failed at a node. In the latter case a stop() aborts the DAG poll and returns control to the user.
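A rough, self-contained sketch of the poll-driven loop. The node representation (an environment with a status, its dependencies, and a poll() closure) and the names make_node() and dag_poll() are hypothetical; real nodes launch futures rather than counting down steps.

```r
# Hypothetical node: an environment holding a status, its upstream
# dependencies, and a poll() closure that advances it by one step.
make_node <- function(name, steps_needed, deps = list(), fail = FALSE) {
  node <- new.env()
  node$name <- name
  node$deps <- deps
  node$status <- "pending"
  node$steps_left <- steps_needed
  node$poll <- function() {
    # Terminal states are sticky.
    if (node$status %in% c("completed", "failed")) return(invisible(NULL))
    # Do nothing until every upstream node has completed.
    deps_done <- all(vapply(node$deps, function(d) d$status == "completed",
                            logical(1)))
    if (!deps_done) return(invisible(NULL))
    if (fail) {
      node$status <- "failed"
    } else {
      node$steps_left <- node$steps_left - 1
      node$status <- if (node$steps_left <= 0) "completed" else "running"
    }
    invisible(NULL)
  }
  node
}

# Poll every node until all have completed; stop() as soon as one fails.
dag_poll <- function(nodes, max_polls = 100L) {
  for (i in seq_len(max_polls)) {
    for (node in nodes) node$poll()
    statuses <- vapply(nodes, function(n) n$status, character(1))
    if (any(statuses == "failed")) {
      stop("DAG failed at node(s): ",
           paste(names(statuses)[statuses == "failed"], collapse = ", "))
    }
    if (all(statuses == "completed")) return(invisible(statuses))
  }
  stop("DAG did not finish within ", max_polls, " polls")
}

a <- make_node("a", steps_needed = 1)
b <- make_node("b", steps_needed = 2, deps = list(a))
print(dag_poll(list(a = a, b = b)))
```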
Calling compute() again on the terminal node will leave computed nodes computed and running nodes running, but will retry the failed nodes.
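The retry rule amounts to resetting only the failed nodes. A hypothetical sketch over a named vector of node statuses (the status strings are illustrative, not necessarily the package's own):

```r
# Hypothetical: the DAG's node statuses as a named character vector.
retry_failed <- function(statuses) {
  # Reset only failed nodes so the next poll loop relaunches them;
  # completed results are kept and running nodes are left alone.
  statuses[statuses == "failed"] <- "pending"
  statuses
}

before <- c(a = "completed", b = "running", c = "failed")
after <- retry_failed(before)
print(after)
```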
For light debugging, just have R do print, cat, etc. within the node functions; their stdout will appear once the future completes.
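For example, with a hypothetical node function instrumented with cat(), the output shows up in the parent only when value() collects the future:

```r
library(future)
plan(multicore, workers = 2)

# Hypothetical node function with a debug print.
node_fn <- function(x) {
  cat("node_fn: got x =", x, "\n")
  10 * x
}

f <- future(node_fn(9))
# The cat() output is buffered in the child; value() waits for the future
# and then replays that stdout in the parent before returning.
v <- value(f)
print(v)
```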
For harder debugging, where you need information from child processes that are still running: I've used R's browser() to debug the parent process, but I don't know how to make that work for debugging child processes. The best experience I can suggest is writing to a disk file within the child process and then running tail on that file from a separate terminal.
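A sketch of the write-to-file approach; debug_log() and the log path are hypothetical. Opening, appending, and closing the file on every call means each line is on disk right away, so tail -f on the file shows progress while the child is still running.

```r
library(future)
plan(multicore, workers = 2)

# Hypothetical helper: append one timestamped, PID-tagged line per call.
# cat() with a file name opens, writes, and closes the file each time.
debug_log <- function(path, ...) {
  line <- paste0(format(Sys.time(), "%H:%M:%OS3"),
                 " [pid ", Sys.getpid(), "] ", ...)
  cat(line, "\n", file = path, append = TRUE, sep = "")
}

log_path <- file.path(tempdir(), "dag-debug.log")
print(log_path)   # in another terminal: tail -f <this path>

f <- future({
  debug_log(log_path, "starting slow step")
  Sys.sleep(0.5)
  debug_log(log_path, "finished slow step")
  "done"
})

v <- value(f)
```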
DAG invocation: more detail is in the user-level vignette, but the essentials are:
a <- delayed(function() { 9 }, name='a')
b <- delayed(function(x) { 10*x }, args=list(a), name='b')
o <- compute(b, namespace='namespace_to_charge')
show(b$dag_for_terminal)
str(b$dag_for_terminal)
o <- compute(b, namespace='namespace_to_charge') # retry failed nodes
o <- compute(b, namespace='namespace_to_charge', verbose=TRUE)