inst/doc/managing-jobs.R

## ----include = FALSE----------------------------------------------------------
knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>",
  eval = FALSE
)
library(brickster)

## ----include = FALSE----------------------------------------------------------
#  # # create a notebook for job examples
#  # # make dir on databricks workspace
#  # db_workspace_mkdirs("/brickster")
#  # files <- db_workspace_list("/brickster/")
#  # files <- sapply(files, function(x) x$path)
#  #
#  # if (!"/brickster/simple-notebook" %in% files) {
#  #   # temp dir to create notebook locally
#  #   temp_dir <- tempdir()
#  #
#  #   # make notebook locally
#  #   path <- file.path(temp_dir, "simple-notebook.r")
#  #   writeLines(text = "print('hello world')", con = path)
#  #
#  #   # import to workspace
#  #   db_workspace_import(
#  #     path = "/brickster/simple-notebook",
#  #     file = path,
#  #     format = "SOURCE",
#  #     language = "R"
#  #   )
#  # }

## -----------------------------------------------------------------------------
#  # list all jobs within Databricks workspace
#  # can control limit, offset, and if cluster/jobs details are returned
#  jobs <- db_jobs_list(limit = 10)
#  
#  # list all runs within a specific job
#  job_runs <- db_jobs_runs_list(job_id = jobs[[1]]$job_id)

## -----------------------------------------------------------------------------
#  # details of a specific job
#  job_details <- db_jobs_get(job_id = jobs[[1]]$job_id)

## -----------------------------------------------------------------------------
#  # define a job task
#  simple_task <- job_task(
#    task_key = "simple_task",
#    description = "a simple task that runs a notebook",
#    # specify a cluster for the job
#    new_cluster = new_cluster(
#      spark_version = "9.1.x-scala2.12",
#      driver_node_type_id = "m5a.large",
#      node_type_id = "m5a.large",
#      num_workers = 2,
#      cloud_attr = aws_attributes(ebs_volume_size = 32)
#    ),
#    # this task will be a notebook
#    task = notebook_task(notebook_path = "/brickster/simple-notebook")
#  )

## -----------------------------------------------------------------------------
#  # create job with simple task
#  simple_task_job <- db_jobs_create(
#    name = "brickster example: simple",
#    tasks = job_tasks(simple_task),
#    # 9am every day, paused currently
#    schedule = cron_schedule(
#      quartz_cron_expression = "0 0 9 * * ?",
#      pause_status = "PAUSED"
#    )
#  )

## -----------------------------------------------------------------------------
#  # one cluster definition, repeatedly use for each task
#  multitask_cluster <- new_cluster(
#    spark_version = "9.1.x-scala2.12",
#    driver_node_type_id = "m5a.large",
#    node_type_id = "m5a.large",
#    num_workers = 2,
#    cloud_attr = aws_attributes(ebs_volume_size = 32)
#  )
#  
#  # each task will run the same notebook (just for this example)
#  multitask_task <- notebook_task(notebook_path = "/brickster/simple-notebook")
#  
#  # create three simple tasks that will depend on each other
#  # task_a -> task_b -> task_c
#  task_a <- job_task(
#    task_key = "task_a",
#    description = "First task in the sequence",
#    new_cluster = multitask_cluster,
#    task = multitask_task
#  )
#  
#  task_b <- job_task(
#    task_key = "task_b",
#    description = "Second task in the sequence",
#    new_cluster = multitask_cluster,
#    task = multitask_task,
#    depends_on = "task_a"
#  )
#  
#  task_c <- job_task(
#    task_key = "task_c",
#    description = "Third task in the sequence",
#    new_cluster = multitask_cluster,
#    task = multitask_task,
#    depends_on = "task_b"
#  )

## -----------------------------------------------------------------------------
#  # create job with multiple tasks
#  multitask_job <- db_jobs_create(
#    name = "brickster example: multi-task",
#    tasks = job_tasks(task_a, task_b, task_c),
#    # 9am every day, paused currently
#    schedule = cron_schedule(
#      quartz_cron_expression = "0 0 9 * * ?",
#      pause_status = "PAUSED"
#    )
#  )

## -----------------------------------------------------------------------------
#  # create three simple tasks that will depend on each other
#  # this time we will use a shared cluster to reduce startup overhead
#  # task_a -> task_b -> task_c
#  task_a <- job_task(
#    task_key = "task_a",
#    description = "First task in the sequence",
#    job_cluster_key = "shared_cluster",
#    task = multitask_task
#  )
#  
#  task_b <- job_task(
#    task_key = "task_b",
#    description = "Second task in the sequence",
#    job_cluster_key = "shared_cluster",
#    task = multitask_task,
#    depends_on = "task_a"
#  )
#  
#  task_c <- job_task(
#    task_key = "task_c",
#    description = "Third task in the sequence",
#    job_cluster_key = "shared_cluster",
#    task = multitask_task,
#    depends_on = "task_b"
#  )

## -----------------------------------------------------------------------------
#  # define job_clusters as a named list of new_cluster()
#  multitask_job_with_reuse <- db_jobs_create(
#    name = "brickster example: multi-task with reuse",
#    job_clusters = list("shared_cluster" = multitask_cluster),
#    tasks = job_tasks(task_a, task_b, task_c),
#    # 9am every day, paused currently
#    schedule = cron_schedule(
#      quartz_cron_expression = "0 0 9 * * ?",
#      pause_status = "PAUSED"
#    )
#  )

## -----------------------------------------------------------------------------
#  # submit a one off job run
#  # reuse the simple task from first example
#  # idempotency_token guarentees no additional triggers
#  oneoff_job <- db_jobs_runs_submit(
#    tasks = job_tasks(simple_task),
#    run_name = "brickster example: one-off job",
#    idempotency_token = "my_job_run_token"
#  )

## -----------------------------------------------------------------------------
#  # define a job task
#  # this time, the notebook_path is relative to the git root directory
#  # omit file extensions like .py or .r
#  simple_task <- job_task(
#    task_key = "simple_task",
#    description = "a simple task that runs a notebook",
#    # specify a cluster for the job
#    new_cluster = new_cluster(
#      spark_version = "9.1.x-scala2.12",
#      driver_node_type_id = "m5a.large",
#      node_type_id = "m5a.large",
#      num_workers = 2,
#      cloud_attr = aws_attributes(ebs_volume_size = 32)
#    ),
#    # this task will be a notebook
#    task = notebook_task(notebook_path = "example/simple-notebook")
#  )

## -----------------------------------------------------------------------------
#  # create job with simple task
#  simple_task_job <- db_jobs_create(
#    name = "brickster example: simple",
#    tasks = job_tasks(simple_task),
#    # git source points to repo
#    git_source = git_source(
#      git_url = "www.github.com/<user>/<repo>",
#      git_provider = "github",
#      reference = "main",
#      type = "branch"
#    ),
#    # 9am every day, paused currently
#    schedule = cron_schedule(
#      quartz_cron_expression = "0 0 9 * * ?",
#      pause_status = "PAUSED"
#    )
#  )

## ----results = FALSE----------------------------------------------------------
#  # only change the job name
#  db_jobs_update(
#    job_id = multitask_job_with_reuse$job_id,
#    name = "brickster example: renamed job"
#  )

## ----results = FALSE----------------------------------------------------------
#  # adding timeout and increasing max concurrent runs
#  db_jobs_update(
#    job_id = multitask_job_with_reuse$job_id,
#    timeout_seconds = 60 * 5,
#    max_concurrent_runs = 2
#  )

## -----------------------------------------------------------------------------
#  # invoke simple job example
#  triggered_run <- db_jobs_run_now(job_id = simple_task_job$job_id)

## ----results = FALSE----------------------------------------------------------
#  # cancel run whilst it is in progress
#  db_jobs_runs_cancel(run_id = triggered_run$run_id)

## ----results = FALSE----------------------------------------------------------
#  db_jobs_delete(job_id = simple_task_job$job_id)
#  db_jobs_delete(job_id = multitask_job$job_id)
#  db_jobs_delete(job_id = multitask_job_with_reuse$job_id)

## ----cleanup, include = FALSE-------------------------------------------------
#  # db_workspace_delete("/brickster/simple-notebook")

Try the brickster package in your browser

Any scripts or data that you put into this service are public.

brickster documentation built on April 12, 2025, 1:21 a.m.