```{r}
#| echo: false
knitr::opts_chunk$set(
  collapse = FALSE,
  comment = "",
  out.width = "100%",
  cache = FALSE,
  asciicast_knitr_output = "html"
)

asciicast::init_knitr_engine(
  echo = TRUE,
  echo_input = FALSE,
  same_process = TRUE,
  startup = quote({
    library(maestro)
    set.seed(1)
  })
)

options(asciicast_theme = "pkgdown")
```
A common task in data engineering is to automate, schedule, and monitor multiple data processing pipelines. This is called orchestration. `maestro` is an R package that helps orchestrate data pipelines.
A fully realized `maestro` project involves the following components and actions:

- A collection of pipelines (R functions to be orchestrated, such as batch ETL jobs)
- An orchestrator: an R script or Quarto document that orchestrates the pipelines and monitors them
- A process external to R that schedules the orchestrator (e.g., cron, Posit Connect)
```r
library(maestro)
```

Create a `maestro` project in an existing project or a new project using `create_maestro()` or the New Project wizard in RStudio. This creates the orchestrator script and the folder of pipelines with one sample pipeline. Your project should look something like this:
```r
dir.create("pipelines")
writeLines(
  "
#' my_pipe maestro pipeline
#'
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-05-24
#' @maestroTz UTC
#' @maestroLogLevel INFO
my_pipe <- function() {

  # Pipeline code
}",
  con = "pipelines/my_pipe.R"
)
```
```
maestro_project
├── maestro_project.Rproj
├── orchestrator.R
└── pipelines
    ├── my_pipe.R
    └── another_pipe.R
```
Pipelines are the jobs you want to automate, schedule, and monitor. For the most part, they're regular R functions with a special sprinkling of comments.
A pipeline is simply an R function with decorators called maestro tags. Maestro tags are special code comments that communicate the scheduling and configuration of a pipeline to the orchestrator. Let's take a quick look at the sample `my_pipe.R`:
```r
#' my_pipe maestro pipeline
#'
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-05-24
#' @maestroTz UTC
#' @maestroLogLevel INFO
my_pipe <- function() {

  # Pipeline code
}
```
`my_pipe` is a function with an empty body, so right now it won't do anything. The comments above are interpreted by `maestro` as "this function is scheduled to run every day starting at 2024-05-24 (00:00:00) UTC time".
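To make that reading concrete, the sketch below lists the first few run times implied by a start time of 2024-05-24 UTC and a frequency of 1 day. It uses only base R and does not call `maestro`; the variable names are illustrative.

```r
# First few run times implied by the sample tags
# (base R only; this does not call maestro).
start_time <- as.POSIXct("2024-05-24 00:00:00", tz = "UTC")

# seq() on date-times accepts a "number unit" step such as "1 day" or "3 hours"
run_times <- seq(from = start_time, by = "1 day", length.out = 3)

format(run_times, "%Y-%m-%d %H:%M:%S", tz = "UTC")
```

The same "number unit" style of step is what the frequency tag expresses, which is why a value like `3 hours` reads naturally in both places.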
`maestroFrequency` and `maestroStartTime` are the most important tags for scheduling. Frequency is how often you want the pipeline to run. It can be a single word such as `hourly`, `daily`, `weekly`, or `biweekly`, or a number and a unit (e.g., `1 day`, `3 hours`).
Note that you don't need to provide all these tags. A single maestro tag is enough to mark a function as a pipeline. Pipelines missing tags will use consistent defaults (e.g., if `maestroFrequency` is missing, the default is 1 day/daily).
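As an illustration of how tags in comments can carry configuration, here is a minimal base R sketch that extracts `@maestro*` tags from the lines of a pipeline file and falls back to the 1 day default when the frequency tag is absent. The helper `read_maestro_tags()` is hypothetical and is not how `maestro` itself parses tags.

```r
# Hypothetical helper (not part of maestro): collect `@maestro*` tags
# from roxygen-style comment lines into a named character vector.
read_maestro_tags <- function(lines) {
  pattern <- "^[[:space:]]*#'[[:space:]]*@(maestro[A-Za-z]+)[[:space:]]*(.*)$"
  tag_lines <- grep(pattern, lines, value = TRUE)
  values <- trimws(sub(pattern, "\\2", tag_lines))
  names(values) <- sub(pattern, "\\1", tag_lines)
  values
}

pipeline_source <- c(
  "#' my_pipe maestro pipeline",
  "#'",
  "#' @maestroStartTime 2024-05-24",
  "#' @maestroTz UTC",
  "my_pipe <- function() {",
  "  # Pipeline code",
  "}"
)

tags <- read_maestro_tags(pipeline_source)

# No frequency tag was given, so use the documented default of 1 day
frequency <- if ("maestroFrequency" %in% names(tags)) {
  tags[["maestroFrequency"]]
} else {
  "1 day"
}
```

Here `tags` holds only the start time and time zone, and `frequency` ends up as the default.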
In most use cases, the code inside `my_pipe` would run an ETL job (extract data from a source, transform it, and load it into a file system or database). In technical terms, it's the side effect of the code, not its return value, that is important.
Here's a more realistic, albeit impractical, example:
```r
#' my_pipe maestro pipeline
#'
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-05-24
#' @maestroTz UTC
#' @maestroLogLevel INFO
my_pipe <- function() {

  random_data <- data.frame(
    letters = sample(letters, 10),
    numbers = sample.int(10)
  )

  write.csv(random_data, file = tempfile())
}
```
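For a shape closer to extract, transform, and load, the sketch below separates the three steps. The pipeline name `air_quality_pipe`, the output file name, and the use of the built-in `airquality` dataset as a stand-in source are all assumptions made for illustration.

```r
#' air_quality_pipe maestro pipeline
#'
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-05-24
#' @maestroTz UTC
#' @maestroLogLevel INFO
air_quality_pipe <- function() {

  # Extract: the built-in airquality dataset stands in for a real source
  raw <- datasets::airquality

  # Transform: drop incomplete rows and add temperature in Celsius
  clean <- raw[complete.cases(raw), ]
  clean$TempC <- round((clean$Temp - 32) * 5 / 9, 1)

  # Load: write the result to a file; this side effect is the point
  out_file <- file.path(tempdir(), "air_quality_clean.csv")
  write.csv(clean, file = out_file, row.names = FALSE)

  invisible(out_file)
}

# Run once by hand to check the side effect
# (in a maestro project the orchestrator would make this call)
out_file <- air_quality_pipe()
```

The return value is only a convenience for checking the result by hand; the written file is what matters.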
A project with a single pipeline is fine, but `maestro` is more useful when you have multiple jobs to run. You can add more pipelines to your pipelines directory manually or use `create_pipeline()`:
```{r}
#| eval: false
create_pipeline(
  pipe_name = "another_pipeline",
  pipeline_dir = "pipelines",
  frequency = "1 hour",
  start_time = "2024-05-17 15:00:00",
  tz = "America/Halifax",
  log_level = "ERROR"
)
```
The orchestrator is the process that schedules and monitors the pipelines.
The orchestrator can be an R script or a Quarto/R Markdown document, but here we'll use a regular R script. This is where you'll run `maestro` functions. The two main functions are `build_schedule()` and `run_schedule()`.
```r
library(maestro)

schedule <- build_schedule()

output <- run_schedule(
  schedule,
  orch_frequency = "1 hour"
)
```
Building the schedule gets `maestro` to look through the pipelines in the pipelines folder and create a schedule object. You then pass that object to `run_schedule()` along with how often the orchestrator is supposed to run. It is important to tell `maestro` how often it will be checking the pipelines using the `orch_frequency` parameter. Here, we're informing it that the orchestrator runs every 1 hour.
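One way to picture why the orchestrator frequency matters: each orchestrator run covers a window of time, and a pipeline should run if one of its scheduled times falls inside that window. The base R sketch below illustrates that idea only. The helper `is_due()` is hypothetical, and the windowing rule is an assumption, not a description of what `run_schedule()` does internally.

```r
# Hypothetical helper (not part of maestro): does any scheduled time of a
# pipeline fall within the window covered by this orchestrator run?
is_due <- function(start_time, frequency, check_time, orch_frequency) {
  # Nothing is due before the pipeline's start time
  if (check_time < start_time) {
    return(FALSE)
  }

  # Every scheduled time from the start up to the check time
  scheduled <- seq(from = start_time, to = check_time, by = frequency)

  # Start of the window: one orchestrator interval before the check time
  window_start <- seq(
    from = check_time,
    by = paste0("-", orch_frequency),
    length.out = 2
  )[2]

  any(scheduled > window_start & scheduled <= check_time)
}

start_time <- as.POSIXct("2024-05-24 00:00:00", tz = "UTC")

# Orchestrator run shortly after midnight: the daily pipeline is in the window
due_after_midnight <- is_due(
  start_time, "1 day",
  check_time = as.POSIXct("2024-05-26 00:30:00", tz = "UTC"),
  orch_frequency = "1 hour"
)

# Orchestrator run mid-morning: no scheduled time in the last hour
due_mid_morning <- is_due(
  start_time, "1 day",
  check_time = as.POSIXct("2024-05-26 05:30:00", tz = "UTC"),
  orch_frequency = "1 hour"
)
```

Under this picture, an `orch_frequency` that does not match how often the script really runs would produce windows that overlap or leave gaps, which is why the value should reflect the real deployment schedule.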
Importantly, it isn't `maestro`'s job to actually run the orchestrator this often. It's your job to make sure it runs at that frequency (e.g., by deploying it via cron or some cloud environment where code can be scheduled).[^1]
[^1]: How often to run the orchestrator depends on the frequencies of the pipelines in the project. The simplest guideline is to take the frequency of your most frequently recurring pipeline and halve it. For example, if the most frequent pipeline runs every 1 day, the orchestrator should run every 12 hours.
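The halving guideline is simple arithmetic, sketched here in base R with `difftime` values. The variable names are illustrative, and the 1 day interval is the example from the guideline.

```r
# Shortest pipeline interval in the project (1 day in the example)
shortest_interval <- as.difftime(1, units = "days")

# Guideline: run the orchestrator at half that interval
orch_interval <- shortest_interval / 2
units(orch_interval) <- "hours"

# A string in the "number unit" form used for frequencies
orch_frequency <- paste(as.numeric(orch_interval), "hours")
```

With a 1 day pipeline this gives an orchestrator interval of 12 hours.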
```r
unlink("pipelines", recursive = TRUE)
```