R/airflow.R

Defines functions get_csrf_token py_to_r.pendulum.date.Date py_to_r.datetime.timedelta py_to_r_reconvert get_af_dag_args

Documented in get_af_dag_args get_csrf_token

#' Retrieve the Airflow csrf token
#'
#' Requests the `/admin/queryview` page of an Airflow instance and reads the
#' `value` attribute of the first `input` element named `_csrf_token` in the
#' parsed response.
#' @param airflow_url base url for airflow instance; `/admin/queryview` is
#'   appended to it verbatim
#' @return csrf token string
#' @export
get_csrf_token <- function(airflow_url){
  queryview_url <- paste0(airflow_url, "/admin/queryview")
  page          <- httr::content(httr::GET(queryview_url))
  token_input   <- xml2::xml_find_first(page, './/input[@name="_csrf_token"]')
  xml2::xml_attr(token_input, 'value')
}


# date conversion: render the incoming object as text, then let lubridate
# parse that text into a date-time
py_to_r.pendulum.date.Date <- function(x) {
  date_text <- as.character(x)
  lubridate::as_datetime(date_text)
}

# timedelta conversion: the value is handed back as its character
# representation rather than as a duration object
py_to_r.datetime.timedelta <- function(x) {
  delta_text <- as.character(x)
  delta_text
}

# recursively (re)convert list of reticulate into R equivalents
#
# Every leaf that inherits from "python.builtin.object" is passed through
# reticulate::py_to_r(); any other leaf is returned untouched. The nesting and
# names of the list are kept (rapply with how = 'replace').
# A non-list input is treated as a single leaf, so a bare Python object or a
# plain R value can be passed directly instead of being rejected by rapply().
#
# Example: py_run_string("from datetime import timedelta; x = {'a' : 1, 'b' : timedelta(minutes=5)}")
#          py_to_r_reconvert(py$x)
py_to_r_reconvert <- function(x){
  # conversion applied to each individual leaf
  convert_leaf <- function(object) {
    if (inherits(object, "python.builtin.object")) {
      reticulate::py_to_r(object)
    } else {
      object
    }
  }

  # rapply() only accepts lists/expressions; handle anything else as one leaf
  if (!is.list(x) && !is.expression(x)) {
    return(convert_leaf(x))
  }

  rapply(x, convert_leaf, how = 'replace')
}


#' Get airflow dag args
#'
#' Collects the requested args (attributes) from all Airflow dags contained in
#' dag_dir, using the Python `dag` module imported from the installed laminar
#' package directory.
#' @param dag_dir airflow dag directory
#' @param args attributes to extract. NOTE: currently only start_date and
#'   schedule_interval end up in the result - needs generalization.
#' @return data frame with columns dag_id, schedule_interval and start_date
#' @export
get_af_dag_args <- function(dag_dir = config::get()$dag_dir,
                            args = c('start_date', 'schedule_interval')){

  # load the Python helper module and query it for the dags and their args
  dag_module  <- reticulate::import_from_path(
    'dag',
    path = system.file(package = 'laminar')
  )
  dags_found  <- dag_module$list_dir_dags(dag_dir)
  args_by_dag <- py_to_r_reconvert(dag_module$get_dag_args(dags_found, args))

  # one schedule_interval string per dag; NA where the dag has none
  intervals <- purrr::map_chr(args_by_dag, 'schedule_interval',
                              .null = NA_character_)

  # start dates are bound into a data frame and stacked into a single column
  # NOTE(review): a dag without start_date presumably yields no entry here,
  # which would make this column shorter than dag_id - confirm upstream
  # always supplies start_date.
  starts_wide <- purrr::map_df(args_by_dag, 'start_date')
  starts_long <- tidyr::gather(starts_wide, dag_id, start_date)

  data_frame(dag_id = names(args_by_dag),
             schedule_interval = intervals,
             start_date = starts_long$start_date)
}
misha-lisovich/laminar documentation built on May 23, 2019, 9:51 a.m.