The rworker package provides an interface for running asynchronous tasks sent by Celery. It works by listening for new task execution requests (TERs) coming from the message broker and handing them off to background worker processes for execution.
The rworker function

This is the main function of the package. It creates a new instance of the Rworker object, which is responsible for managing TERs coming from the message broker.
```r
library(rworker)

url <- 'redis://localhost:6379'
rwork <- rworker(qname='celery', queue=url, backend=url, workers=2)
```
The qname argument defines the name of the queue to listen on for new TERs; by default, the Celery queue name is 'celery'. The queue and backend arguments follow the provider://address:port format and define the provider and address of the message queue and of the task results backend, respectively. The last argument, workers, defines the number of background processes responsible for executing the incoming TERs.
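As an aside, the provider://address:port convention is an ordinary URL, so its pieces can be inspected with any URL parser. A small illustrative sketch using Python's standard library (rworker itself handles the URL internally in R; this is only to show how the format decomposes):

```python
from urllib.parse import urlparse

# Decompose a broker URL written in the provider://address:port format
u = urlparse('redis://localhost:6379')
print(u.scheme)    # 'redis'     -> the provider
print(u.hostname)  # 'localhost' -> the address
print(u.port)      # 6379        -> the port
```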
Before you start receiving TERs, you have to define your tasks. Tasks are simply the functions you want to execute remotely. Every task you may want to trigger from Celery must be registered in the Rworker instance.
```r
library(magrittr)

(function(){
    # Simulating a long-running function
    Sys.sleep(10)
}) %>% rwork$task(name='long_running_task')

(function(){
    # Another dummy function
    print('Hello world')
}) %>% rwork$task(name='hello_world')
```
The name argument must be unique, since it is used to identify which registered task to execute.
Sometimes it is useful to know how far along your task currently is. You can report this with the task_progress function.
```r
(function(){
    Sys.sleep(5)
    task_progress(50)  # 50% progress
    Sys.sleep(5)
    task_progress(100) # 100% progress
}) %>% rwork$task(name='task_with_progress')
```
On the Celery side, the progress information is stored in the .result['progress'] attribute of the AsyncResult object.
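If you want to block on the Python side until a given progress value is reported, a minimal polling sketch might look like the following (wait_for_progress is a hypothetical helper, not part of Celery or rworker; it only assumes an AsyncResult-style .info attribute holding the progress dictionary described above):

```python
import time

def wait_for_progress(async_result, target=100, poll=0.5, timeout=60):
    """Poll an AsyncResult-like object until its reported progress
    reaches `target`, or raise TimeoutError after `timeout` seconds."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        info = async_result.info
        # `info` may be None or an exception before the task reports anything
        progress = info.get('progress', 0) if isinstance(info, dict) else 0
        if progress >= target:
            return progress
        time.sleep(poll)
    raise TimeoutError('task did not reach %d%% progress in time' % target)
```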
Now that the desired tasks are registered, we just need to listen for new task execution requests:
```r
rwork$consume()
```
From now on, every time you send a task from Python using Celery, the rwork$consume() loop will receive it and execute it in the background.
Now let's review the whole idea. We have three players in the game: Celery (Python), rworker (R), and the message broker (Redis, in this case).
```r
library(rworker)
library(magrittr)

url <- 'redis://localhost:6379'
rwork <- rworker(qname='celery', queue=url, backend=url, workers=2)

(function(){
    # Simulating a long-running function
    Sys.sleep(10)
}) %>% rwork$task(name='long_running_task')

(function(){
    # Another dummy function
    print('Hello world')
}) %>% rwork$task(name='hello_world')

(function(){
    Sys.sleep(5)
    task_progress(50)  # 50% progress
    Sys.sleep(5)
    task_progress(100) # 100% progress
}) %>% rwork$task(name='task_with_progress')

rwork$consume()
```
```python
from celery import Celery

url = 'redis://localhost:6379/0'
worker = Celery('app', broker=url, backend=url)

async_result = worker.send_task('task_with_progress')

# Check task progress
async_result.info['progress']

# Check task state
async_result.state
```