
The rworker package provides the interface for running asynchronous tasks sent by Celery. It listens for new task execution requests (TERs) coming from the message broker and hands them to background worker processes for execution.

The rworker function

This is the main function of the package. It creates a new instance of the Rworker object, which is responsible for managing the TERs coming from the message broker.

library(rworker)

url <- 'redis://localhost:6379'
rwork <- rworker(qname='celery', queue=url, backend=url, workers=2)

The qname argument defines the name of the queue to listen on for new TERs. By default, the Celery queue name is 'celery'. The queue and backend arguments follow the provider://address:port format and define the provider and address of the message queue and of the task results backend, respectively. The last argument, workers, defines the number of background processes responsible for executing the incoming TERs.
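To make the provider://address:port format concrete, here is a minimal Python sketch (Python being the Celery side of this setup) that splits such a URL with the standard library. The describe_endpoint helper is a hypothetical name used only for illustration; it is not part of rworker or Celery.

```python
from urllib.parse import urlsplit


def describe_endpoint(url):
    """Split a provider://address:port URL into its three parts.

    Hypothetical helper for illustration only.
    """
    parts = urlsplit(url)
    return {
        "provider": parts.scheme,    # e.g. 'redis'
        "address": parts.hostname,   # e.g. 'localhost'
        "port": parts.port,          # e.g. 6379 (as an integer)
    }


endpoint = describe_endpoint("redis://localhost:6379")
print(endpoint)
```

The same URL is passed to both queue and backend in the example above, so a single Redis instance plays both roles.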

Tasks

Before you start receiving TERs, you have to define your tasks. These tasks are simply the functions you want to execute remotely.

Registering tasks

All tasks that you may want to execute from Celery need to be registered in the Rworker instance.

library(magrittr)

(function(){
  # Simulating long running function
  Sys.sleep(10)
}) %>% rwork$task(name='long_running_task')

(function(){
  # Another dummy function
  print('Hello world')
}) %>% rwork$task(name='hello_world')

The name argument must be unique, since it's used to identify the correct task to be executed.

Task execution progress

Sometimes it is nice to know how far along your task currently is. You can report this using the task_progress function.

(function(){
  Sys.sleep(5)
  task_progress(50) # 50% progress
  Sys.sleep(5)
  task_progress(100) # 100% progress
}) %>% rwork$task(name='task_with_progress')

On the Celery side, the progress information is stored inside the .result['progress'] attribute of the AsyncResult object.
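On the Python side, the progress entry only exists after the task has reported something, so it can help to read it defensively. The sketch below assumes the progress arrives as a dictionary with a 'progress' key, as described above. It reads the info attribute, which Celery documents as an alias of result. Both read_progress and FakeResult are hypothetical names for illustration; FakeResult merely stands in for a real AsyncResult so the snippet runs without a broker.

```python
def read_progress(async_result):
    """Return the reported progress, or None if nothing was reported yet."""
    info = async_result.info
    # Before any progress report, info may be None; after the task
    # finishes it may hold the return value instead of a dictionary.
    if isinstance(info, dict):
        return info.get("progress")
    return None


class FakeResult:
    """Hypothetical stand-in for AsyncResult, used only for this demo."""

    def __init__(self, info):
        self.info = info


print(read_progress(FakeResult(None)))              # nothing reported yet
print(read_progress(FakeResult({"progress": 50})))  # task reported 50
```

With a real task, you would pass the object returned by send_task instead of FakeResult.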

Consuming TERs

Now that the desired tasks are registered, we just need to listen for new task execution requests:

rwork$consume()

Now, every time you send a new task from Python using Celery, the rwork$consume() method will receive it and execute it in the background.


Wrapping up

Now let's review the whole idea. We have three players in the game: Celery (Python), rworker (R) and the message broker (Redis, in this case).

  1. Celery sends a task execution request to the message broker
  2. rworker consumes the task execution request from the message broker
  3. rworker executes the task and stores the task state back on the message broker
  4. Celery gets the task state from the message broker
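The four steps above can be sketched as a toy, in-memory model. This is not how rworker or Celery are implemented: the deque and the dictionary merely stand in for the Redis queue and results backend, and the names send_task, consume_one and get_state are hypothetical. Only the state names (PENDING, SUCCESS, FAILURE) are borrowed from Celery.

```python
from collections import deque

broker = deque()   # stands in for the message queue
backend = {}       # stands in for the task results backend

# Hypothetical registry, mirroring the tasks registered on the R side
tasks = {"hello_world": lambda: print("Hello world")}


def send_task(name, task_id):
    """Step 1: the client puts a task execution request on the queue."""
    broker.append({"id": task_id, "task": name})


def consume_one():
    """Steps 2 and 3: the worker takes a request, runs it, stores the state."""
    request = broker.popleft()
    try:
        tasks[request["task"]]()
        backend[request["id"]] = "SUCCESS"
    except Exception:
        # Covers both a failing task and an unregistered task name
        backend[request["id"]] = "FAILURE"


def get_state(task_id):
    """Step 4: the client reads the task state back."""
    return backend.get(task_id, "PENDING")


send_task("hello_world", task_id="abc")
state_before = get_state("abc")   # request queued, not yet executed
consume_one()
state_after = get_state("abc")    # worker has stored the outcome
print(state_before, "->", state_after)
```

The point of the model is that the client and the worker never talk to each other directly; everything goes through the queue and the backend.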

R code

library(rworker)
library(magrittr)

url <- 'redis://localhost:6379'
rwork <- rworker(qname='celery', queue=url, backend=url, workers=2)

(function(){
  # Simulating long running function
  Sys.sleep(10)
}) %>% rwork$task(name='long_running_task')

(function(){
  # Another dummy function
  print('Hello world')
}) %>% rwork$task(name='hello_world')

(function(){
  Sys.sleep(5)
  task_progress(50) # 50% progress
  Sys.sleep(5)
  task_progress(100) # 100% progress
}) %>% rwork$task(name='task_with_progress')

rwork$consume()

Python code

from celery import Celery

url = 'redis://localhost:6379/0'
worker = Celery('app', broker=url, backend=url)
async_result = worker.send_task('task_with_progress')

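# Check task progress (only available once the task has reported progress;
# before that, info may not be a dictionary yet)
async_result.info['progress']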

# Check task state
async_result.state
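Since the task runs in the background, the state checked right after send_task will usually not be final. A minimal polling sketch follows, assuming only the ready() method and state attribute of Celery's AsyncResult. The wait_until_ready helper and the FakeResult class are hypothetical; FakeResult exists only so the snippet runs without a broker.

```python
import time


def wait_until_ready(async_result, poll_interval=1.0, timeout=30.0):
    """Poll until the task finishes or the timeout expires.

    Returns the last observed state either way.
    """
    deadline = time.monotonic() + timeout
    while not async_result.ready() and time.monotonic() < deadline:
        time.sleep(poll_interval)
    return async_result.state


class FakeResult:
    """Hypothetical stand-in for AsyncResult: ready after a few polls."""

    def __init__(self, polls_needed):
        self._polls_left = polls_needed
        self.state = "PENDING"

    def ready(self):
        if self._polls_left == 0:
            self.state = "SUCCESS"
            return True
        self._polls_left -= 1
        return False


final_state = wait_until_ready(FakeResult(polls_needed=2), poll_interval=0.01)
print(final_state)
```

With a real task, you would call wait_until_ready(async_result) on the object returned by send_task; for task_with_progress, a timeout above ten seconds is needed.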


lecardozo/rworker documentation built on May 12, 2021, 5:37 p.m.