# R/query.R
#
# Defines functions: convert_results_to_dataframe, cloudos.query, cloudos.query_results, cloudos.query_status, cloudos.query_submit_async, print.cloudos_tables, cloudos.cohort_tables, cloudos.sql_validate
#
# Documented in: cloudos.cohort_tables, cloudos.query, cloudos.query_results, cloudos.query_status, cloudos.query_submit_async, cloudos.sql_validate, convert_results_to_dataframe, print.cloudos_tables

#' Validate SQL Query
#'
#' Validates SQL syntax and references before execution.
#'
#' @param profilename Character. Name of the configured profile to use. If empty or NULL, uses the default profile.
#' @param sql Character. SQL query to validate.
#'
#' @return List with validation results including isValid, tableReferences, and columnReferences.
#' @export
#'
#' @examples
#' \dontrun{
#'   # Validate a SQL query
#'   validation <- cloudos.sql_validate(
#'     profilename = "production",
#'     sql = "SELECT person_id FROM person WHERE year_of_birth >= 1960"
#'   )
#'   
#'   if (validation$isValid) {
#'     cat("SQL is valid\n")
#'     cat("Tables:", paste(validation$tableReferences, collapse = ", "), "\n")
#'   } else {
#'     cat("SQL is invalid:", validation$message, "\n")
#'   }
#' }
cloudos.sql_validate <- function(profilename = "", sql = "") {
  # Reject a missing query up front, before any profile or network work.
  validate_required_string(sql, "sql")

  # profilename is handed to load_profile() untouched (empty means default,
  # per this function's documentation).
  active_profile <- load_profile(profilename)

  # The SQL text travels in the request body; the workspace is sent as the
  # teamId query parameter.
  validate_path <- "/api/v2-cli/cohort-browser/sql-query/validate"
  payload <- list(sql = sql)
  team_filter <- list(teamId = active_profile$workspace_id)

  # Any error raised by the request is re-thrown with a prefix that names
  # the operation, without the call being attached to the message.
  report_failure <- function(err) {
    stop(
      sprintf("Failed to validate SQL query:\n  Error: %s", err$message),
      call. = FALSE
    )
  }

  validation <- tryCatch(
    http_post(active_profile, validate_path, payload, team_filter),
    error = report_failure
  )

  # The server response is returned as-is.
  validation
}


#' List Cohort Tables
#'
#' Retrieves the list of available database schemas and tables for a cohort.
#'
#' @param profilename Character. Name of the configured profile to use. If empty or NULL, uses the default profile.
#' @param cohort_id Character. ID of the cohort to query schemas for.
#'
#' @return List with schema information including databases, tables, and columns.
#' @export
#'
#' @examples
#' \dontrun{
#'   # Get available schemas for a cohort
#'   schemas <- cloudos.cohort_tables(
#'     profilename = "production",
#'     cohort_id = "your-cohort-id"
#'   )
#'   
#'   # Display databases
#'   cat("Available databases:\n")
#'   for (db in schemas) {
#'     cat("  -", db$database, "\n")
#'   }
#' }
cloudos.cohort_tables <- function(profilename = "", cohort_id = "") {
  # A cohort ID is mandatory; fail before loading the profile.
  validate_required_string(cohort_id, "cohort_id")

  active_profile <- load_profile(profilename)

  # Both the cohort and the workspace are passed as query parameters.
  schemas_path <- "/api/v2-cli/cohort-browser/schemas"
  lookup <- list(
    cohortId = cohort_id,
    teamId = active_profile$workspace_id
  )

  # Request errors are re-thrown with a prefix naming the operation.
  report_failure <- function(err) {
    stop(
      sprintf("Failed to fetch cohort schemas:\n  Error: %s", err$message),
      call. = FALSE
    )
  }

  schemas <- tryCatch(
    http_get(active_profile, schemas_path, lookup),
    error = report_failure
  )

  # Prepend the S3 class so print() dispatches to print.cloudos_tables, and
  # remember which cohort the listing belongs to for that print method.
  class(schemas) <- c("cloudos_tables", class(schemas))
  attr(schemas, "cohort_id") <- cohort_id

  schemas
}


#' Print method for cloudos_tables
#'
#' @param x A cloudos_tables object
#' @param ... Additional arguments (unused)
#' @return The \code{cloudos_tables} object \code{x}, returned invisibly.
#'   Called primarily for its side effect of printing a formatted list of
#'   schemas and tables to the console.
#' @export
print.cloudos_tables <- function(x, ...) {
  schemas <- x$schemas

  # A missing (NULL) or empty schema list both have length zero: report it
  # and hand the object back invisibly, as print methods conventionally do.
  if (length(schemas) == 0) {
    cat("No schemas found\n")
    return(invisible(x))
  }

  # cat() separates its arguments with a space, so the header reads
  # "Cohort <id> :".
  cat("Cohort", attr(x, "cohort_id"), ":\n")

  # One "schema.table" line per table, followed by an indented line per
  # column. Looping over a NULL or empty tables/columns entry is a no-op,
  # so schemas without tables simply print nothing.
  for (schema in schemas) {
    for (tbl in schema$tables) {
      cat("  - ", schema$name, ".", tbl$name, "\n", sep = "")
      for (column in tbl$columns) {
        cat("      - ", column$name, " (", column$dataType, ")\n", sep = "")
      }
    }
  }

  # Summary footer: number of schemas and total tables across all of them.
  table_total <- sum(
    vapply(schemas, function(schema) length(schema$tables), integer(1))
  )
  cat(
    "\nTotal: ", length(schemas), " database(s), ",
    table_total, " table(s)\n",
    sep = ""
  )
  cat("Use str() to see the full structure, or access x$schemas for the raw data\n")

  invisible(x)
}


#' Submit Async SQL Query
#'
#' Starts async SQL execution for a cohort and returns a task ID for tracking.
#'
#' @param profilename Character. Name of the configured profile to use. If empty or NULL, uses the default profile.
#' @param cohort_id Character. ID of the cohort to query.
#' @param sql Character. SQL query to execute.
#' @param pagination List (optional). Pagination settings with pageNumber and pageSize.
#'   Example: list(pageNumber = 0, pageSize = 100). If NULL, API returns default page.
#'   pageNumber must be a single non-negative whole number and pageSize a single
#'   positive whole number; missing (NA), non-finite, fractional or vector values
#'   are rejected before any request is made.
#'
#' @return List with task metadata: task_id, status, query, type,
#'   sync_execution_timeout, and the unmodified server response in full_response.
#' @export
#'
#' @examples
#' \dontrun{
#'   # Submit query without pagination
#'   task <- cloudos.query_submit_async(
#'     profilename = "production",
#'     cohort_id = "699edb4380a6867895f0c9e1",
#'     sql = "SELECT person_id FROM person LIMIT 100"
#'   )
#'   
#'   # Submit query with pagination for page 2 with 50 rows per page
#'   task <- cloudos.query_submit_async(
#'     profilename = "production",
#'     cohort_id = "699edb4380a6867895f0c9e1",
#'     sql = "SELECT person_id FROM person",
#'     pagination = list(pageNumber = 2, pageSize = 50)
#'   )
#'   print(task$task_id)
#' }
cloudos.query_submit_async <- function(profilename = "", cohort_id = "", sql = "", pagination = NULL) {
  
  # Validate inputs
  validate_required_string(cohort_id, "cohort_id")
  validate_required_string(sql, "sql")
  
  # Validate pagination if provided
  if (!is.null(pagination)) {
    if (!is.list(pagination)) {
      stop("Error: pagination must be a list with pageNumber and pageSize.", call. = FALSE)
    }
    
    # `[[` matches names exactly; `$` on a list would also accept an element
    # whose name merely starts with "pageNumber"/"pageSize".
    page_number <- pagination[["pageNumber"]]
    page_size <- pagination[["pageSize"]]
    
    if (is.null(page_number) || is.null(page_size)) {
      stop("Error: pagination must contain both pageNumber and pageSize.", call. = FALSE)
    }
    
    # TRUE only for one finite number without a fractional part. Checking
    # length and finiteness first keeps the range comparisons below from
    # failing with "missing value where TRUE/FALSE needed" on NA or vectors.
    is_whole_number <- function(value) {
      is.numeric(value) && length(value) == 1L && is.finite(value) &&
        value == floor(value)
    }
    
    if (!is_whole_number(page_number) || page_number < 0) {
      stop("Error: pagination$pageNumber must be a non-negative integer.", call. = FALSE)
    }
    if (!is_whole_number(page_size) || page_size < 1) {
      stop("Error: pagination$pageSize must be a positive integer.", call. = FALSE)
    }
  }
  
  # Load profile (uses default if profilename is empty)
  profile <- load_profile(profilename)
  
  # Build endpoint (the cohort ID is part of the path)
  endpoint <- sprintf("/api/v2-cli/cohort-browser/cohort/%s/query-results/async", cohort_id)
  
  # Build request body
  body <- list(
    query = sql
  )
  
  # Add pagination if provided; the caller's list is forwarded unchanged
  if (!is.null(pagination)) {
    body$pagination <- pagination
  }
  
  # Build query params
  query_params <- list(
    cohortId = cohort_id,
    teamId = profile$workspace_id
  )
  
  # Make request; failures are re-thrown with the cohort ID for context
  response <- tryCatch({
    http_post(profile, endpoint, body, query_params)
  }, error = function(e) {
    stop(sprintf(
      "Failed to submit query:\n  Cohort ID: %s\n  Error: %s",
      cohort_id,
      e$message
    ), call. = FALSE)
  })
  
  # A response without task$`_id` cannot be tracked, so treat it as an error
  if (is.null(response$task) || is.null(response$task$`_id`)) {
    stop("Invalid response from server: missing task ID", call. = FALSE)
  }
  
  # Return task metadata; absent fields fall back to the defaults shown here
  result <- list(
    task_id = response$task$`_id`,
    status = response$task$status %||% "unknown",
    query = response$task$query %||% sql,
    type = response$task$type %||% "unknown",
    sync_execution_timeout = response$syncExecutionTimeout %||% 5000,
    full_response = response
  )
  
  message(sprintf("Query submitted successfully. Task ID: %s", result$task_id))
  
  return(result)
}


#' Check Async Query Status
#'
#' Returns the current status and metadata for a submitted async SQL task.
#'
#' @param profilename Character. Name of the configured profile to use. If empty or NULL, uses the default profile.
#' @param task_id Character. Task ID returned from cloudos.query_submit_async().
#'
#' @return List with task status, count of results, and other metadata.
#' @export
#'
#' @examples
#' \dontrun{
#'   status <- cloudos.query_status(
#'     profilename = "production",
#'     task_id = "69a5c58d626fe626da0025ce"
#'   )
#'   print(status$status)
#'   print(status$count_of_results)
#' }
cloudos.query_status <- function(profilename = "", task_id = "") {
  # A task ID is mandatory; fail before touching the profile or the network.
  validate_required_string(task_id, "task_id")

  active_profile <- load_profile(profilename)

  # The task ID is interpolated into the path; the workspace is sent as the
  # teamId query parameter.
  status_path <- sprintf("/api/v2-cli/cohort-browser/async-tasks/%s", task_id)
  team_filter <- list(teamId = active_profile$workspace_id)

  # Request errors are re-thrown with the task ID attached for context.
  report_failure <- function(err) {
    stop(
      sprintf(
        "Failed to check task status:\n  Task ID: %s\n  Error: %s",
        task_id,
        err$message
      ),
      call. = FALSE
    )
  }

  task <- tryCatch(
    http_get(active_profile, status_path, team_filter),
    error = report_failure
  )

  # Map the camelCase response fields to snake_case entries. Every field
  # has a fallback so callers can read them without NULL checks; the raw
  # response is kept alongside in full_response.
  list(
    task_id = task$`_id` %||% task_id,
    status = task$status %||% "unknown",
    type = task$type %||% "unknown",
    count_of_results = task$countOfResults %||% 0,
    query = task$query %||% "",
    created_at = task$createdAt %||% "",
    started_at = task$startedAt %||% "",
    ended_at = task$endedAt %||% "",
    user = task$user %||% "",
    full_response = task
  )
}


#' Fetch Async Query Results
#'
#' Fetches results from a completed async SQL task and returns as a dataframe.
#' 
#' Note: Pagination is controlled when submitting the query via cloudos.query_submit_async(),
#' not when fetching results. This function returns whatever page the task was configured for.
#'
#' @param profilename Character. Name of the configured profile to use. If empty or NULL, uses the default profile.
#' @param task_id Character. Task ID returned from cloudos.query_submit_async().
#'
#' @return Data frame with query results from the page configured at submission time.
#' @export
#'
#' @examples
#' \dontrun{
#'   # Fetch results for a task (returns the page configured when query was submitted)
#'   results <- cloudos.query_results(
#'     profilename = "production",
#'     task_id = "69a5c58d626fe626da0025ce"
#'   )
#' }
cloudos.query_results <- function(profilename = "", task_id = "") {
  
  # Validate inputs
  validate_required_string(task_id, "task_id")
  
  # Load profile (uses default if profilename is empty)
  profile <- load_profile(profilename)
  
  # Build endpoint (the task ID is part of the path)
  endpoint <- sprintf("/api/v2-cli/cohort-browser/async-tasks/%s/results", task_id)
  
  # Build query params
  query_params <- list(
    teamId = profile$workspace_id
  )
  
  # Fetch results; failures are re-thrown with the task ID for context
  response <- tryCatch({
    http_get(profile, endpoint, query_params)
  }, error = function(e) {
    stop(sprintf(
      "Failed to fetch results:\n  Task ID: %s\n  Error: %s",
      task_id,
      e$message
    ), call. = FALSE)
  })
  
  # Extract metadata
  # Note: API response structure is:
  #   - total: total number of rows across all pages
  #   - pageSize: total number of PAGES (not rows per page!)
  #   - pageNumber: current page index
  #   - data: array of rows for this page
  total_rows <- as.numeric(response$total %||% 0)
  total_pages <- as.numeric(response$pageSize %||% 1)  # This IS the number of pages
  current_page <- as.numeric(response$pageNumber %||% 0)
  rows_in_page <- length(response$data)
  
  # Extract column names. vapply() always yields a character vector, even
  # when the response has no columns; sapply() would return an empty list
  # in that case and hand a non-character value to the converter.
  column_names <- vapply(
    response$columns,
    function(col) as.character(col$name %||% "unknown"),
    character(1),
    USE.NAMES = FALSE
  )
  
  # Convert data to dataframe
  df <- convert_results_to_dataframe(response$data, column_names)
  
  # Add metadata as attributes
  attr(df, "total_rows") <- total_rows
  attr(df, "page") <- current_page
  attr(df, "page_size") <- rows_in_page  # Actual rows in this page
  attr(df, "total_pages") <- total_pages
  
  return(df)
}


#' Execute SQL Query (Orchestrator)
#'
#' High-level function that orchestrates the full query lifecycle:
#' submit -> poll status -> fetch results as dataframe.
#' 
#' IMPORTANT: Pagination works by submitting separate tasks for each page. When all_pages=TRUE,
#' this function submits multiple async tasks (one per page), waits for all to complete,
#' and combines the results. This may take longer for large result sets.
#'
#' @param profilename Character. Name of the configured profile to use. If empty or NULL, uses the default profile.
#' @param cohort_id Character. ID of the cohort to query.
#' @param sql Character. SQL query to execute.
#' @param poll_interval Numeric. Seconds between status checks (default: 2).
#'   Must be a single value of at least 1.
#' @param max_wait Numeric. Maximum seconds to wait for a task to complete
#'   (default: 600). Must be a single value of at least 1; fractional values
#'   are accepted. The timer restarts for every submitted task, so with
#'   all_pages=TRUE the limit applies to each page separately.
#' @param page_size Integer. Number of rows per page (default: 1000).
#'   Must be a single positive whole number.
#' @param all_pages Logical. Fetch all result pages automatically (default: TRUE).
#'   When TRUE, submits multiple async tasks (one per page) to fetch complete results.
#'
#' @return Data frame with query results. When several pages were combined, the
#'   attributes total_rows, page, page_size, total_pages and all_pages_fetched
#'   are set on the combined data frame; otherwise the data frame is returned
#'   exactly as produced by cloudos.query_results().
#' @export
#'
#' @examples
#' \dontrun{
#'   # Fetch all results
#'   results <- cloudos.query(
#'     profilename = "production",
#'     cohort_id = "699edb4380a6867895f0c9e1",
#'     sql = "SELECT person_id, gender_concept_id FROM person LIMIT 500"
#'   )
#'   
#'   # Fetch only first page
#'   results_page1 <- cloudos.query(
#'     profilename = "production",
#'     cohort_id = "699edb4380a6867895f0c9e1",
#'     sql = "SELECT person_id FROM person",
#'     all_pages = FALSE,
#'     page_size = 100
#'   )
#' }
cloudos.query <- function(profilename = "",
                         cohort_id = "",
                         sql = "",
                         poll_interval = 2,
                         max_wait = 600,
                         page_size = 1000,
                         all_pages = TRUE) {
  
  # Validate inputs
  validate_required_string(cohort_id, "cohort_id")
  validate_required_string(sql, "sql")
  
  # TRUE only for a single, non-missing number that is at least 1. Checking
  # length and NA first keeps the comparison from failing with "missing value
  # where TRUE/FALSE needed" instead of the messages below.
  is_number_at_least_one <- function(value) {
    is.numeric(value) && length(value) == 1L && !is.na(value) && value >= 1
  }
  
  if (!is_number_at_least_one(poll_interval)) {
    stop("Error: poll_interval must be at least 1 second.", call. = FALSE)
  }
  
  if (!is_number_at_least_one(max_wait)) {
    stop("Error: max_wait must be at least 1 second.", call. = FALSE)
  }
  
  if (!is_number_at_least_one(page_size) || !is.finite(page_size) ||
      page_size != floor(page_size)) {
    stop("Error: page_size must be a positive integer.", call. = FALSE)
  }
  
  # sprintf("%d") raises an error for doubles with a fractional part, so a
  # value such as max_wait = 1.5 must be rendered as text before it is put
  # into a message. Whole numbers still print without decimals ("600").
  max_wait_label <- format(max_wait, scientific = FALSE, trim = TRUE)
  
  # Helper function to poll a task until completion.
  # Returns the status list once the task reports "completed"; stops on
  # "failed" or when max_wait seconds have elapsed for this task. Any other
  # status keeps polling (only "pending"/"running" produce a progress message).
  poll_task <- function(task_id, task_description = "") {
    start_time <- Sys.time()
    
    while (TRUE) {
      elapsed <- as.numeric(difftime(Sys.time(), start_time, units = "secs"))
      
      if (elapsed >= max_wait) {
        stop(sprintf(
          "Task did not complete within %s seconds.\nTask ID: %s\nUse cloudos.query_status() to check progress.",
          max_wait_label,
          task_id
        ), call. = FALSE)
      }
      
      status_info <- tryCatch({
        cloudos.query_status(profilename, task_id)
      }, error = function(e) {
        stop(sprintf("Status check failed for %s: %s", task_description, e$message), call. = FALSE)
      })
      
      # Compare case-insensitively and ignore surrounding whitespace
      current_status <- tolower(trimws(status_info$status))
      
      if (current_status == "completed") {
        return(status_info)
      } else if (current_status == "failed") {
        stop(sprintf(
          "Query execution failed for %s.\nTask ID: %s\nCheck task status for details.",
          task_description,
          task_id
        ), call. = FALSE)
      } else if (current_status %in% c("pending", "running")) {
        message(sprintf("  %s: %s (%.1fs elapsed)...", task_description, trimws(status_info$status), elapsed))
      }
      
      Sys.sleep(poll_interval)
    }
  }
  
  # Step 1: Submit first query (page 0)
  message(sprintf("Submitting query%s...", if(all_pages) " for page 0" else ""))
  
  pagination <- list(pageNumber = 0, pageSize = page_size)
  
  task <- tryCatch({
    cloudos.query_submit_async(profilename, cohort_id, sql, pagination)
  }, error = function(e) {
    stop(sprintf("Query submission failed: %s", e$message), call. = FALSE)
  })
  
  task_id <- task$task_id
  
  # Step 2: Poll for completion
  message(sprintf("Polling for completion (max wait: %s seconds)...", max_wait_label))
  status_info <- poll_task(task_id, if(all_pages) "Page 0" else "Query")
  
  message(sprintf(
    "Page 0 completed successfully (%s results)",
    status_info$count_of_results
  ))
  
  # Step 3: Fetch first page results
  message("Fetching page 0 results...")
  first_page <- tryCatch({
    cloudos.query_results(profilename, task_id)
  }, error = function(e) {
    stop(sprintf("Failed to fetch results: %s", e$message), call. = FALSE)
  })
  
  # The first page carries the totals needed to decide whether more pages exist
  total_rows <- attr(first_page, "total_rows")
  total_pages <- attr(first_page, "total_pages")
  
  # If not fetching all pages, return first page
  if (!all_pages) {
    if (total_pages > 1) {
      message(sprintf(
        "\nNote: Query has %d total rows across %d pages. Only page 0 (%d rows) returned.",
        total_rows,
        total_pages,
        nrow(first_page)
      ))
      message("Use all_pages=TRUE to fetch all results.")
    }
    
    message("Query complete!")
    return(first_page)
  }
  
  # Single-page result: nothing more to fetch
  if (total_pages <= 1) {
    message(sprintf("All results fetched: %d rows", nrow(first_page)))
    message("Query complete!")
    return(first_page)
  }
  
  # Step 4: Submit tasks for remaining pages
  message(sprintf(
    "\nFetching remaining pages (1 to %d) - total: %d rows across %d pages...",
    total_pages - 1,
    total_rows,
    total_pages
  ))
  
  remaining_pages <- seq_len(total_pages - 1)
  remaining_tasks <- vector("list", length(remaining_pages))
  
  for (page_num in remaining_pages) {
    pagination <- list(pageNumber = page_num, pageSize = page_size)
    
    task <- tryCatch({
      cloudos.query_submit_async(profilename, cohort_id, sql, pagination)
    }, error = function(e) {
      stop(sprintf("Failed to submit query for page %d: %s", page_num, e$message), call. = FALSE)
    })
    
    remaining_tasks[[page_num]] <- list(
      task_id = task$task_id,
      page_num = page_num
    )
    
    message(sprintf("  Submitted page %d (task ID: %s)", page_num, task$task_id))
    
    # Small delay to avoid overwhelming the server
    Sys.sleep(0.2)
  }
  
  # Step 5: Poll all remaining tasks in page order and collect their results
  message("\nWaiting for all pages to complete...")
  
  # Slot 1 holds page 0; page k is stored in slot k + 1
  page_results <- vector("list", length(remaining_pages) + 1L)
  page_results[[1]] <- first_page
  
  for (task_info in remaining_tasks) {
    page_num <- task_info$page_num
    task_id <- task_info$task_id
    
    message(sprintf("Polling page %d...", page_num))
    status_info <- poll_task(task_id, sprintf("Page %d", page_num))
    
    message(sprintf("Page %d completed, fetching results...", page_num))
    
    page_df <- tryCatch({
      cloudos.query_results(profilename, task_id)
    }, error = function(e) {
      stop(sprintf("Failed to fetch page %d results: %s", page_num, e$message), call. = FALSE)
    })
    
    page_results[[page_num + 1]] <- page_df
  }
  
  # Step 6: Combine all pages
  message("\nCombining all pages...")
  combined_df <- do.call(rbind, page_results)
  
  # Update metadata: the combined frame is presented as one page holding
  # every row
  attr(combined_df, "total_rows") <- total_rows
  attr(combined_df, "page") <- 0
  attr(combined_df, "page_size") <- total_rows
  attr(combined_df, "total_pages") <- 1
  attr(combined_df, "all_pages_fetched") <- TRUE
  
  message(sprintf("All pages fetched: %d rows", nrow(combined_df)))
  message("Query complete!")
  
  return(combined_df)
}


#' Convert Query Results to Dataframe
#'
#' Internal function to convert API response data to a dataframe.
#'
#' The data frame is assembled column by column: for every entry of
#' \code{column_names} the value stored under that name is read from each row,
#' with absent (NULL) values becoming \code{NA}. A column whose values are all
#' single atomic values becomes an ordinary vector; a column containing any
#' other value (for example a nested list) is kept as a list column.
#' Because values are looked up by name, a name that occurs more than once in
#' \code{column_names} yields one column per occurrence with the same values.
#'
#' @param data List. Data rows from API response; each row is expected to be a
#'   list keyed by column name.
#' @param column_names Character vector. Column names.
#'
#' @return Data frame with one row per element of \code{data} and one column per
#'   element of \code{column_names}. With no rows, a zero-row data frame with
#'   the requested columns is returned.
#' @keywords internal
convert_results_to_dataframe <- function(data, column_names) {
  
  # Accept a list of names as well as a character vector
  column_names <- as.character(unlist(column_names, use.names = FALSE))
  
  # Handle empty results (NULL also has length zero)
  if (length(data) == 0) {
    # Create empty dataframe with correct columns
    df <- as.data.frame(matrix(ncol = length(column_names), nrow = 0))
    colnames(df) <- column_names
    return(df)
  }
  
  # TRUE for a single atomic value, i.e. something that fits in one cell of
  # an ordinary vector column
  is_scalar <- function(cell) {
    is.atomic(cell) && length(cell) == 1L
  }
  
  # Build one column: pull the named value out of every row, NULL -> NA
  build_column <- function(col_name) {
    cells <- lapply(data, function(row) {
      # API returns rows as objects with column names as keys
      value <- row[[col_name]]
      if (is.null(value)) NA else value
    })
    
    if (all(vapply(cells, is_scalar, logical(1)))) {
      # unlist() applies the usual coercion, so an NA placeholder takes on
      # the type of the other values in the column
      unlist(cells, use.names = FALSE)
    } else {
      # Non-scalar values cannot be flattened without changing the row
      # count; keep them intact as a list column
      I(unname(cells))
    }
  }
  
  columns <- lapply(column_names, build_column)
  names(columns) <- column_names
  
  # Assemble the data frame directly from the column list. This keeps the
  # names exactly as given (including duplicates) and avoids creating and
  # row-binding one data frame per row.
  structure(
    columns,
    class = "data.frame",
    row.names = seq_along(data)
  )
}

# Try the cloudosR package in your browser
#
# Any scripts or data that you put into this service are public.
#
# cloudosR documentation built on June 1, 2026, 5:07 p.m.