knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>",
  eval = FALSE
)
library(brickster)

# # create a notebook for job examples
# # make dir on databricks workspace
# db_workspace_mkdirs("/brickster")
# files <- db_workspace_list("/brickster/")
# files <- sapply(files, function(x) x$path)
#
# if (!"/brickster/simple-notebook" %in% files) {
#   # temp dir to create notebook locally
#   temp_dir <- tempdir()
#
#   # make notebook locally
#   path <- file.path(temp_dir, "simple-notebook.r")
#   writeLines(text = "print('hello world')", con = path)
#
#   # import to workspace
#   db_workspace_import(
#     path = "/brickster/simple-notebook",
#     file = path,
#     format = "SOURCE",
#     language = "R"
#   )
# }
{brickster}
provides full coverage of the Jobs REST API (specifically version 2.1), allowing multi-task jobs to be created from R code.
When listing jobs, the default behaviour is to return the first 25 jobs. To control how many jobs are returned, and where the listing starts, specify limit
and offset
.
The expand_tasks
parameter, when TRUE, also returns task and cluster details for each job (default FALSE)
.
# list all jobs within Databricks workspace
# can control limit, offset, and if cluster/jobs details are returned
jobs <- db_jobs_list(limit = 10)

# list all runs within a specific job
job_runs <- db_jobs_runs_list(job_id = jobs[[1]]$job_id)
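To go beyond a single page of results, limit and offset can be combined in a loop. The following is a minimal sketch of that pattern in base R. `list_all()` and `fetch_page` are hypothetical names introduced here, with `fetch_page` standing in for a call such as `function(limit, offset) db_jobs_list(limit = limit, offset = offset)`; a fake fetcher is used so the sketch runs without a workspace.

```r
# Page through a list endpoint using limit/offset.
# `fetch_page` is a hypothetical stand-in for the real listing call.
list_all <- function(fetch_page, limit = 25) {
  results <- list()
  offset <- 0
  repeat {
    page <- fetch_page(limit = limit, offset = offset)
    results <- c(results, page)
    # a short (or empty) page means there is nothing left to fetch
    if (length(page) < limit) break
    offset <- offset + limit
  }
  results
}

# fake fetcher over 60 pretend jobs (illustration only)
fake_jobs <- lapply(1:60, function(i) list(job_id = i))
fake_fetch <- function(limit, offset) {
  idx <- seq_along(fake_jobs)
  fake_jobs[idx > offset & idx <= offset + limit]
}

all_jobs <- list_all(fake_fetch, limit = 25)
length(all_jobs)
```

The loop stops on the first page that is shorter than the requested limit, which also covers the case where the total is an exact multiple of the page size.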
To return the details of a specific job you can use db_jobs_get()
. This requires knowing the job_id
which can be found via the user interface in Databricks or using db_jobs_list()
.
# details of a specific job
job_details <- db_jobs_get(job_id = jobs[[1]]$job_id)

Each job has one or more tasks, which may depend on each other. Executing a job flows through these tasks; this is known as a "run" of the job. Jobs can start a run on a defined schedule, or via a direct trigger such as db_jobs_run_now()
.
There are a number of functions specific to job run execution and metadata:
| Purpose | Function |
|------------------------------------------:|:---------------------------:|
| Trigger new run of an existing job | db_jobs_run_now() |
| List runs of a job | db_jobs_runs_list() |
| Retrieve metadata for a specific job run | db_jobs_runs_get() |
| Retrieve metadata for a specific task run | db_jobs_runs_get_output() |
| Export code and/or dashboard for task run | db_jobs_runs_export() |
| Delete record of a job run | db_jobs_runs_delete() |
| Cancel run of a job | db_jobs_runs_cancel() |
When using functions such as db_jobs_runs_get_output()
and db_jobs_runs_export()
the required run_id
can refer to the id of a given task. Each task within a given "run" will also have a run_id
that can be referenced.
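As an illustration of the difference between the run-level and task-level run_id, the sketch below pulls the task-level ids out of run metadata. The metadata here is a hand-written mock; the field names (`tasks`, `task_key`, `run_id`) are assumed to mirror what db_jobs_runs_get() returns and should be checked against a real response.

```r
# hand-written mock of run metadata (assumed shape, not real output)
run_metadata <- list(
  run_id = 1001,
  tasks = list(
    list(task_key = "task_a", run_id = 2001),
    list(task_key = "task_b", run_id = 2002)
  )
)

# build a named vector: task_key -> task-level run_id
task_run_ids <- vapply(
  run_metadata$tasks,
  function(task) task$run_id,
  numeric(1)
)
names(task_run_ids) <- vapply(
  run_metadata$tasks,
  function(task) task$task_key,
  character(1)
)

# the id to pass when asking for the output of task_b
task_run_ids[["task_b"]]
```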
{brickster}
enables creation of jobs from R. These can be simple single-task jobs or complex multi-task jobs with dependencies that share and reuse clusters.
A single-task job can be created with the following components:
- job_task(), which defines a single task
- new_cluster(), required by job_task(), which defines the compute specification
- *_task(), one of the task functions, e.g. notebook_task()
Below is the creation of a simple job that runs a notebook and has a paused schedule:
# define a job task
simple_task <- job_task(
  task_key = "simple_task",
  description = "a simple task that runs a notebook",
  # specify a cluster for the job
  new_cluster = new_cluster(
    spark_version = "9.1.x-scala2.12",
    driver_node_type_id = "m5a.large",
    node_type_id = "m5a.large",
    num_workers = 2,
    cloud_attr = aws_attributes(ebs_volume_size = 32)
  ),
  # this task will be a notebook
  task = notebook_task(notebook_path = "/brickster/simple-notebook")
)

# create job with simple task
simple_task_job <- db_jobs_create(
  name = "brickster example: simple",
  tasks = job_tasks(simple_task),
  # 9am every day, paused currently
  schedule = cron_schedule(
    quartz_cron_expression = "0 0 9 * * ?",
    pause_status = "PAUSED"
  )
)
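The schedule string follows the Quartz cron layout: seconds, minutes, hours, day of month, month, day of week. As a small sketch of how such expressions can be generated rather than typed by hand, `daily_cron()` below is a hypothetical helper (not part of {brickster}) that builds an "every day at hour:minute" expression.

```r
# build a Quartz cron expression for "every day at hour:minute"
# fields: seconds minutes hours day-of-month month day-of-week
daily_cron <- function(hour, minute = 0) {
  stopifnot(hour >= 0, hour <= 23, minute >= 0, minute <= 59)
  sprintf("0 %d %d * * ?", as.integer(minute), as.integer(hour))
}

nine_am <- daily_cron(9)        # "0 0 9 * * ?", as used in the example above
half_five <- daily_cron(17, 30) # "0 30 17 * * ?"
```

The result could then be passed as quartz_cron_expression in cron_schedule().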
Jobs can be extended beyond a single task. This next example extends the simple job to three tasks. Each subsequent task depends on the prior task's completion before it can proceed, and each task defines its own new_cluster
.
Although this job runs the same notebook each time for the sake of example, it demonstrates how to build dependencies.

# one cluster definition, repeatedly use for each task
multitask_cluster <- new_cluster(
  spark_version = "9.1.x-scala2.12",
  driver_node_type_id = "m5a.large",
  node_type_id = "m5a.large",
  num_workers = 2,
  cloud_attr = aws_attributes(ebs_volume_size = 32)
)

# each task will run the same notebook (just for this example)
multitask_task <- notebook_task(notebook_path = "/brickster/simple-notebook")

# create three simple tasks that will depend on each other
# task_a -> task_b -> task_c
task_a <- job_task(
  task_key = "task_a",
  description = "First task in the sequence",
  new_cluster = multitask_cluster,
  task = multitask_task
)

task_b <- job_task(
  task_key = "task_b",
  description = "Second task in the sequence",
  new_cluster = multitask_cluster,
  task = multitask_task,
  depends_on = "task_a"
)

task_c <- job_task(
  task_key = "task_c",
  description = "Third task in the sequence",
  new_cluster = multitask_cluster,
  task = multitask_task,
  depends_on = "task_b"
)

# create job with multiple tasks
multitask_job <- db_jobs_create(
  name = "brickster example: multi-task",
  tasks = job_tasks(task_a, task_b, task_c),
  # 9am every day, paused currently
  schedule = cron_schedule(
    quartz_cron_expression = "0 0 9 * * ?",
    pause_status = "PAUSED"
  )
)
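To make the effect of depends_on concrete, the sketch below works out the order in which tasks become runnable. It operates on plain lists that mirror the three tasks above rather than on job_task() objects, and `execution_order()` is a hypothetical helper written for this illustration; it is not how Databricks schedules tasks internally.

```r
# plain-list mirror of the three tasks, deliberately out of order
tasks <- list(
  list(task_key = "task_c", depends_on = "task_b"),
  list(task_key = "task_a", depends_on = character(0)),
  list(task_key = "task_b", depends_on = "task_a")
)

# repeatedly pick the tasks whose dependencies have all completed
execution_order <- function(tasks) {
  keys <- vapply(tasks, function(task) task$task_key, character(1))
  deps <- lapply(tasks, function(task) task$depends_on)
  done <- character(0)
  while (length(done) < length(keys)) {
    deps_met <- vapply(deps, function(d) all(d %in% done), logical(1))
    ready <- keys[!(keys %in% done) & deps_met]
    if (length(ready) == 0) stop("cyclic or unknown dependency")
    done <- c(done, ready)
  }
  done
}

task_order <- execution_order(tasks)
task_order
```

A check like this can catch a misspelled task_key in depends_on before the job is submitted.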
The multi-task example has one shortcoming: clusters are not reused between tasks, so each task has to wait for a new cluster to start before its computations can begin. This typically adds a few minutes to each task.
It can be advantageous to reuse clusters between job tasks to avoid overhead for short lived tasks, or to share resources for tasks which are not computationally demanding.
This example is the same as the previous multi-task job, except that all tasks now share a single cluster.
new_cluster
is replaced with job_cluster_key
in each task:
# create three simple tasks that will depend on each other
# this time we will use a shared cluster to reduce startup overhead
# task_a -> task_b -> task_c
task_a <- job_task(
  task_key = "task_a",
  description = "First task in the sequence",
  job_cluster_key = "shared_cluster",
  task = multitask_task
)

task_b <- job_task(
  task_key = "task_b",
  description = "Second task in the sequence",
  job_cluster_key = "shared_cluster",
  task = multitask_task,
  depends_on = "task_a"
)

task_c <- job_task(
  task_key = "task_c",
  description = "Third task in the sequence",
  job_cluster_key = "shared_cluster",
  task = multitask_task,
  depends_on = "task_b"
)
job_clusters
is now defined in the db_jobs_create()
call, where it accepts a named list of new_cluster()
definitions. We will reuse the cluster definition from earlier:
# define job_clusters as a named list of new_cluster()
multitask_job_with_reuse <- db_jobs_create(
  name = "brickster example: multi-task with reuse",
  job_clusters = list("shared_cluster" = multitask_cluster),
  tasks = job_tasks(task_a, task_b, task_c),
  # 9am every day, paused currently
  schedule = cron_schedule(
    quartz_cron_expression = "0 0 9 * * ?",
    pause_status = "PAUSED"
  )
)

There can be multiple shared clusters, and it may be beneficial to give a demanding task a larger cluster of its own (or one shared with other demanding tasks).
It's possible to create one-off jobs that do not have a schedule and only appear under the "job runs" tab, not "jobs". db_jobs_runs_submit()
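With more than one shared cluster, each job_cluster_key used by a task has to match a name in job_clusters. The following sketch checks that locally using plain stand-in values (the cluster specs are placeholder strings, not new_cluster() objects, and "large_cluster" is a made-up key for illustration).

```r
# stand-ins: names are what matter here, the values are placeholders
job_clusters <- list(
  shared_cluster = "small cluster spec",
  large_cluster = "large cluster spec"
)

# which cluster key each task refers to (hypothetical assignment)
task_cluster_keys <- c(
  task_a = "shared_cluster",
  task_b = "shared_cluster",
  task_c = "large_cluster"
)

# every key referenced by a task must be defined in job_clusters
missing_keys <- setdiff(unique(task_cluster_keys), names(job_clusters))
if (length(missing_keys) > 0) {
  stop("undefined job_cluster_key: ", paste(missing_keys, collapse = ", "))
}

# how many tasks share each cluster
cluster_usage <- table(task_cluster_keys)
cluster_usage
```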
permits the use of an idempotency_token
which can avoid re-submission of a run.
# submit a one off job run
# reuse the simple task from first example
# idempotency_token guarantees no additional triggers
oneoff_job <- db_jobs_runs_submit(
  tasks = job_tasks(simple_task),
  run_name = "brickster example: one-off job",
  idempotency_token = "my_job_run_token"
)

Instead of using notebooks contained within the Databricks workspace, a job can reference notebooks in an external git repository at a specific branch, tag, or commit.
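An idempotency token only helps if the same logical run always produces the same token. One possible approach, sketched below, derives it from the run name and a logical run date. `make_token()` and its naming scheme are illustrative assumptions, not a {brickster} convention; the 64-character cap reflects the limit documented for the Jobs API at the time of writing and should be verified.

```r
# derive a repeatable token from the run name and a logical run date
make_token <- function(run_name, run_date) {
  # lower-case and collapse anything non-alphanumeric into single dashes
  slug <- gsub("[^A-Za-z0-9]+", "-", tolower(run_name))
  token <- paste0(slug, "-", format(as.Date(run_date), "%Y%m%d"))
  # keep within the assumed 64-character limit
  substr(token, 1, 64)
}

token_first <- make_token("brickster example: one-off job", "2024-01-15")
token_retry <- make_token("brickster example: one-off job", "2024-01-15")

# a retry for the same date yields the same token
identical(token_first, token_retry)
```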
Let's revisit the simple job example from before and adjust to use git_source()
.
In this example the repo will have a folder example
which contains simple-notebook.py
.
# define a job task
# this time, the notebook_path is relative to the git root directory
# omit file extensions like .py or .r
simple_task <- job_task(
  task_key = "simple_task",
  description = "a simple task that runs a notebook",
  # specify a cluster for the job
  new_cluster = new_cluster(
    spark_version = "9.1.x-scala2.12",
    driver_node_type_id = "m5a.large",
    node_type_id = "m5a.large",
    num_workers = 2,
    cloud_attr = aws_attributes(ebs_volume_size = 32)
  ),
  # this task will be a notebook
  task = notebook_task(notebook_path = "example/simple-notebook")
)
Within the call to db_jobs_create()
use git_source()
to point to the repo containing the notebook.
# create job with simple task
simple_task_job <- db_jobs_create(
  name = "brickster example: simple",
  tasks = job_tasks(simple_task),
  # git source points to repo
  git_source = git_source(
    git_url = "www.github.com/<user>/<repo>",
    git_provider = "github",
    reference = "main",
    type = "branch"
  ),
  # 9am every day, paused currently
  schedule = cron_schedule(
    quartz_cron_expression = "0 0 9 * * ?",
    pause_status = "PAUSED"
  )
)
When specifying git_source()
all notebook_task()
paths must be repo paths; it's not possible to mix repo-based and workspace-based notebooks. Additionally, this behaviour only works for notebook_task()
, other tasks such as spark_python_task()
are unsupported.
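Before creating a git-backed job it can help to check the notebook paths locally. The sketch below flags paths that look like workspace paths or that carry a file extension. It rests on two assumptions drawn from the examples in this document: workspace paths are absolute (start with "/"), and repo paths are relative with the .py or .r extension omitted.

```r
# candidate notebook paths (the second and third are deliberately wrong)
notebook_paths <- c(
  "example/simple-notebook",
  "/brickster/simple-notebook",
  "example/simple-notebook.py"
)

# assumption: a leading "/" marks a workspace path
is_workspace_path <- startsWith(notebook_paths, "/")

# assumption: repo notebook paths should omit .py / .r extensions
has_extension <- grepl("\\.(py|r)$", notebook_paths, ignore.case = TRUE)

# paths that would need fixing before use with git_source()
problem_paths <- notebook_paths[is_workspace_path | has_extension]
problem_paths
```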
Jobs can be partially updated, for example to rename an existing job:
# only change the job name
db_jobs_update(
  job_id = multitask_job_with_reuse$job_id,
  name = "brickster example: renamed job"
)
Adding a timeout and adjusting maximum concurrent runs allowed:
# adding timeout and increasing max concurrent runs
db_jobs_update(
  job_id = multitask_job_with_reuse$job_id,
  timeout_seconds = 60 * 5,
  max_concurrent_runs = 2
)
You can add new tasks but it may involve some extra steps, either:
- Keeping the job's existing job_task() definitions available in R and adding new tasks to them
- Specifying all job_task()s again
- Using db_jobs_get() and parsing the returned metadata into job_task()s
A complete overwrite of a job can be done with db_jobs_reset()
- this allows the job_id
to remain constant while maintaining the historical run information with prior settings. db_jobs_reset()
is the same as db_jobs_create()
except it takes one additional parameter (job_id
).
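The difference between a partial update and a reset can be pictured with plain R lists. This is only a local analogy for the semantics described above; it makes no API calls, and the settings list is a simplified stand-in for real job settings.

```r
# simplified stand-in for a job's current settings
current_settings <- list(
  name = "brickster example: multi-task with reuse",
  timeout_seconds = 300,
  max_concurrent_runs = 2
)

new_settings <- list(name = "brickster example: renamed job")

# update-like behaviour: only the supplied fields change
after_update <- modifyList(current_settings, new_settings)

# reset-like behaviour: the supplied settings replace everything
after_reset <- new_settings

after_update$timeout_seconds # still 300
after_reset$timeout_seconds  # NULL, the setting was not carried over
```

This is why a reset needs the full job definition, while an update only needs the fields being changed.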
db_jobs_list()
and db_jobs_runs_list()
were covered under job basics above.
Jobs can be triggered by:
- Their defined schedule (see: cron_schedule())
- Using db_jobs_run_now() to trigger a run outside the regular schedule or on a paused job
- Creating a one-off job run via db_jobs_runs_submit()
The only example yet to be covered is db_jobs_run_now()
, which accepts parameters as named lists:

# invoke simple job example
triggered_run <- db_jobs_run_now(job_id = simple_task_job$job_id)
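Once a run has been triggered, its progress can be followed by polling the run metadata until it reaches a terminal state. The sketch below shows the polling pattern only. `wait_for_run()` and `get_state` are hypothetical; `get_state` stands in for something like reading the life cycle state from db_jobs_runs_get(), and the set of terminal state names is an assumption based on the Jobs API that should be checked against a real response.

```r
# poll a state-returning function until the run reaches a terminal state
wait_for_run <- function(get_state, poll_seconds = 5, max_polls = 120) {
  # assumed terminal life cycle states
  terminal <- c("TERMINATED", "SKIPPED", "INTERNAL_ERROR")
  for (attempt in seq_len(max_polls)) {
    state <- get_state()
    if (state %in% terminal) return(state)
    Sys.sleep(poll_seconds)
  }
  stop("run did not finish within the polling budget")
}

# fake state source so the sketch runs without a workspace
fake_states <- c("PENDING", "RUNNING", "RUNNING", "TERMINATED")
calls <- 0
fake_get_state <- function() {
  calls <<- calls + 1
  fake_states[[calls]]
}

final_state <- wait_for_run(fake_get_state, poll_seconds = 0)
final_state
```

Capping the number of polls keeps a stuck run from blocking the R session indefinitely.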
Runs can also be cancelled:
# cancel run whilst it is in progress
db_jobs_runs_cancel(run_id = triggered_run$run_id)
Jobs and runs can be deleted with db_jobs_delete()
and db_jobs_runs_delete()
respectively.
Cleaning up all jobs created in this documentation:
db_jobs_delete(job_id = simple_task_job$job_id)
db_jobs_delete(job_id = multitask_job$job_id)
db_jobs_delete(job_id = multitask_job_with_reuse$job_id)
# db_workspace_delete("/brickster/simple-notebook")