Nothing
#' Validate SQL Query
#'
#' Sends a SQL statement to the CloudOS cohort-browser validation endpoint so
#' its syntax and references can be checked before the query is executed.
#'
#' @param profilename Character. Name of the configured profile to use. If empty or NULL, uses the default profile.
#' @param sql Character. SQL query to validate (required; checked by
#'   \code{validate_required_string()}).
#'
#' @return The value returned by \code{http_post()}, unchanged. Per the API
#'   this is a list with validation results including isValid,
#'   tableReferences, and columnReferences.
#' @export
#'
#' @examples
#' \dontrun{
#' # Validate a SQL query
#' validation <- cloudos.sql_validate(
#'   profilename = "production",
#'   sql = "SELECT person_id FROM person WHERE year_of_birth >= 1960"
#' )
#'
#' if (validation$isValid) {
#'   cat("SQL is valid\n")
#'   cat("Tables:", paste(validation$tableReferences, collapse = ", "), "\n")
#' } else {
#'   cat("SQL is invalid:", validation$message, "\n")
#' }
#' }
cloudos.sql_validate <- function(profilename = "", sql = "") {
  validate_required_string(sql, "sql")
  prof <- load_profile(profilename)

  # The SQL travels in the request body; the workspace goes in the query string.
  tryCatch(
    http_post(
      prof,
      "/api/v2-cli/cohort-browser/sql-query/validate",
      list(sql = sql),
      list(teamId = prof$workspace_id)
    ),
    error = function(err) {
      # Re-raise with context and without exposing the internal call.
      stop(sprintf(
        "Failed to validate SQL query:\n Error: %s",
        err$message
      ), call. = FALSE)
    }
  )
}
#' List Cohort Tables
#'
#' Retrieves the list of available database schemas and tables for a cohort.
#'
#' @param profilename Character. Name of the configured profile to use. If empty or NULL, uses the default profile.
#' @param cohort_id Character. ID of the cohort to query schemas for.
#'
#' @return The server response with class \code{cloudos_tables} prepended to
#'   its existing classes and a \code{cohort_id} attribute attached. Schema
#'   information lives in the \code{schemas} element; as read by
#'   \code{print.cloudos_tables()}, each schema has a \code{name} and
#'   \code{tables}, each table a \code{name} and \code{columns}, and each column
#'   a \code{name} and \code{dataType}. A NULL response is returned as an empty
#'   list so it can still be classed and printed.
#' @export
#'
#' @examples
#' \dontrun{
#' # Get available schemas for a cohort
#' schemas <- cloudos.cohort_tables(
#'   profilename = "production",
#'   cohort_id = "your-cohort-id"
#' )
#'
#' # Pretty-print schemas, tables and columns
#' print(schemas)
#'
#' # Or walk the raw structure
#' cat("Available databases:\n")
#' for (db in schemas$schemas) {
#'   cat(" -", db$name, "\n")
#' }
#' }
cloudos.cohort_tables <- function(profilename = "", cohort_id = "") {
  # Validate inputs
  validate_required_string(cohort_id, "cohort_id")
  # Load profile (uses default if profilename is empty)
  profile <- load_profile(profilename)

  endpoint <- "/api/v2-cli/cohort-browser/schemas"
  query_params <- list(
    cohortId = cohort_id,
    teamId = profile$workspace_id
  )

  response <- tryCatch({
    http_get(profile, endpoint, query_params)
  }, error = function(e) {
    stop(sprintf(
      "Failed to fetch cohort schemas:\n Error: %s",
      e$message
    ), call. = FALSE)
  })

  # `class<-` / `attr<-` fail on NULL ("attempt to set an attribute on NULL"),
  # so an empty response becomes an empty list; print() then reports that no
  # schemas were found instead of this function erroring.
  if (is.null(response)) {
    response <- list()
  }

  # Add class for custom printing and remember which cohort was queried
  class(response) <- c("cloudos_tables", class(response))
  attr(response, "cohort_id") <- cohort_id
  response
}
#' Print method for cloudos_tables
#'
#' Writes one line per table (\code{schema.table}) followed by one line per
#' column (\code{name (dataType)}), then a summary of how many schemas and
#' tables were listed.
#'
#' @param x A cloudos_tables object
#' @param ... Additional arguments (unused)
#' @return The \code{cloudos_tables} object \code{x}, returned invisibly.
#' Called primarily for its side effect of printing a formatted list of
#' schemas and tables to the console.
#' @export
print.cloudos_tables <- function(x, ...) {
  schemas <- x$schemas

  # length(NULL) is 0, so this also covers a missing `schemas` element
  if (length(schemas) == 0) {
    cat("No schemas found\n")
    return(invisible(x))
  }

  cat("Cohort", attr(x, "cohort_id"), ":\n")

  # for-loops over NULL or empty lists simply do nothing, so schemas without
  # tables and tables without columns are skipped naturally
  for (schema in schemas) {
    for (tbl in schema$tables) {
      cat(" - ", schema$name, ".", tbl$name, "\n", sep = "")
      for (column in tbl$columns) {
        cat(" - ", column$name, " (", column$dataType, ")\n", sep = "")
      }
    }
  }

  tables_per_schema <- vapply(schemas, function(s) length(s$tables), integer(1))
  cat("\nTotal: ", length(schemas), " database(s), ",
      sum(tables_per_schema), " table(s)\n", sep = "")
  cat("Use str() to see the full structure, or access x$schemas for the raw data\n")
  invisible(x)
}
#' Submit Async SQL Query
#'
#' Starts async SQL execution for a cohort and returns a task ID for tracking.
#'
#' @param profilename Character. Name of the configured profile to use. If empty or NULL, uses the default profile.
#' @param cohort_id Character. ID of the cohort to query.
#' @param sql Character. SQL query to execute.
#' @param pagination List (optional). Pagination settings with pageNumber and pageSize.
#'   Example: list(pageNumber = 0, pageSize = 100). pageNumber is 0-based and
#'   must be a whole number >= 0; pageSize must be a whole number >= 1.
#'   If NULL, API returns default page.
#'
#' @return List with task metadata: \code{task_id}, \code{status},
#'   \code{query}, \code{type}, \code{sync_execution_timeout} (placeholders
#'   are used when the server omits a field) and \code{full_response}.
#'   Stops if the response carries no task ID.
#' @export
#'
#' @examples
#' \dontrun{
#' # Submit query without pagination
#' task <- cloudos.query_submit_async(
#'   profilename = "production",
#'   cohort_id = "699edb4380a6867895f0c9e1",
#'   sql = "SELECT person_id FROM person LIMIT 100"
#' )
#'
#' # Submit query with pagination for page index 2 (the third page, since
#' # pages are 0-based) with 50 rows per page
#' task <- cloudos.query_submit_async(
#'   profilename = "production",
#'   cohort_id = "699edb4380a6867895f0c9e1",
#'   sql = "SELECT person_id FROM person",
#'   pagination = list(pageNumber = 2, pageSize = 50)
#' )
#' print(task$task_id)
#' }
cloudos.query_submit_async <- function(profilename = "", cohort_id = "", sql = "", pagination = NULL) {
  # Validate inputs
  validate_required_string(cohort_id, "cohort_id")
  validate_required_string(sql, "sql")

  # Validate pagination if provided
  if (!is.null(pagination)) {
    if (!is.list(pagination)) {
      stop("Error: pagination must be a list with pageNumber and pageSize.", call. = FALSE)
    }
    # `[[` matches names exactly; `$` would partially match on lists.
    page_number <- pagination[["pageNumber"]]
    page_size <- pagination[["pageSize"]]
    if (is.null(page_number) || is.null(page_size)) {
      stop("Error: pagination must contain both pageNumber and pageSize.", call. = FALSE)
    }
    # TRUE only for a single, finite, whole number >= `lower`. Length and
    # finiteness are checked first so `&&` never sees a vector or NA (which
    # used to produce cryptic errors) and fractional/Inf values are rejected.
    is_whole_at_least <- function(value, lower) {
      is.numeric(value) && length(value) == 1L && is.finite(value) &&
        value >= lower && value == round(value)
    }
    if (!is_whole_at_least(page_number, 0)) {
      stop("Error: pagination$pageNumber must be a non-negative integer.", call. = FALSE)
    }
    if (!is_whole_at_least(page_size, 1)) {
      stop("Error: pagination$pageSize must be a positive integer.", call. = FALSE)
    }
  }

  # Load profile (uses default if profilename is empty)
  profile <- load_profile(profilename)

  endpoint <- sprintf("/api/v2-cli/cohort-browser/cohort/%s/query-results/async", cohort_id)

  # Request body: the SQL text plus optional pagination, sent as given
  body <- list(
    query = sql
  )
  if (!is.null(pagination)) {
    body$pagination <- pagination
  }

  query_params <- list(
    cohortId = cohort_id,
    teamId = profile$workspace_id
  )

  response <- tryCatch({
    http_post(profile, endpoint, body, query_params)
  }, error = function(e) {
    stop(sprintf(
      "Failed to submit query:\n Cohort ID: %s\n Error: %s",
      cohort_id,
      e$message
    ), call. = FALSE)
  })

  # Without a task ID the caller has nothing to poll, so fail loudly
  if (is.null(response$task) || is.null(response$task$`_id`)) {
    stop("Invalid response from server: missing task ID", call. = FALSE)
  }

  result <- list(
    task_id = response$task$`_id`,
    status = response$task$status %||% "unknown",
    query = response$task$query %||% sql,
    type = response$task$type %||% "unknown",
    sync_execution_timeout = response$syncExecutionTimeout %||% 5000,
    full_response = response
  )
  message(sprintf("Query submitted successfully. Task ID: %s", result$task_id))
  result
}
#' Check Async Query Status
#'
#' Looks up a submitted async SQL task and returns its current status together
#' with the other task metadata reported by the server.
#'
#' @param profilename Character. Name of the configured profile to use. If empty or NULL, uses the default profile.
#' @param task_id Character. Task ID returned from cloudos.query_submit_async().
#'
#' @return List with \code{task_id}, \code{status}, \code{type},
#'   \code{count_of_results}, \code{query}, \code{created_at},
#'   \code{started_at}, \code{ended_at}, \code{user} (each falling back to a
#'   placeholder when the server omits it) and the raw \code{full_response}.
#' @export
#'
#' @examples
#' \dontrun{
#' status <- cloudos.query_status(
#'   profilename = "production",
#'   task_id = "69a5c58d626fe626da0025ce"
#' )
#' print(status$status)
#' print(status$count_of_results)
#' }
cloudos.query_status <- function(profilename = "", task_id = "") {
  validate_required_string(task_id, "task_id")
  prof <- load_profile(profilename)
  status_url <- sprintf("/api/v2-cli/cohort-browser/async-tasks/%s", task_id)

  task <- tryCatch(
    http_get(prof, status_url, list(teamId = prof$workspace_id)),
    error = function(err) {
      stop(sprintf(
        "Failed to check task status:\n Task ID: %s\n Error: %s",
        task_id,
        err$message
      ), call. = FALSE)
    }
  )

  # Flatten the commonly used fields; the untouched response is kept as well.
  list(
    task_id = task$`_id` %||% task_id,
    status = task$status %||% "unknown",
    type = task$type %||% "unknown",
    count_of_results = task$countOfResults %||% 0,
    query = task$query %||% "",
    created_at = task$createdAt %||% "",
    started_at = task$startedAt %||% "",
    ended_at = task$endedAt %||% "",
    user = task$user %||% "",
    full_response = task
  )
}
#' Fetch Async Query Results
#'
#' Fetches results from a completed async SQL task and returns as a dataframe.
#'
#' Note: Pagination is controlled when submitting the query via cloudos.query_submit_async(),
#' not when fetching results. This function returns whatever page the task was configured for.
#'
#' @param profilename Character. Name of the configured profile to use. If empty or NULL, uses the default profile.
#' @param task_id Character. Task ID returned from cloudos.query_submit_async().
#'
#' @return Data frame with query results from the page configured at submission time,
#'   with attributes \code{total_rows}, \code{page}, \code{page_size} (rows in
#'   this page) and \code{total_pages}. Column names come from the response's
#'   \code{columns} metadata, or from the keys of the first row when that
#'   metadata is absent.
#' @export
#'
#' @examples
#' \dontrun{
#' # Fetch results for a task (returns the page configured when query was submitted)
#' results <- cloudos.query_results(
#'   profilename = "production",
#'   task_id = "69a5c58d626fe626da0025ce"
#' )
#' }
cloudos.query_results <- function(profilename = "", task_id = "") {
  # Validate inputs
  validate_required_string(task_id, "task_id")
  # Load profile (uses default if profilename is empty)
  profile <- load_profile(profilename)

  endpoint <- sprintf("/api/v2-cli/cohort-browser/async-tasks/%s/results", task_id)
  query_params <- list(
    teamId = profile$workspace_id
  )

  response <- tryCatch({
    http_get(profile, endpoint, query_params)
  }, error = function(e) {
    stop(sprintf(
      "Failed to fetch results:\n Task ID: %s\n Error: %s",
      task_id,
      e$message
    ), call. = FALSE)
  })

  # Extract metadata
  # Note: API response structure is:
  # - total: total number of rows across all pages
  # - pageSize: total number of PAGES (not rows per page!)
  # - pageNumber: current page index
  # - data: array of rows for this page
  total_rows <- as.numeric(response$total %||% 0)
  total_pages <- as.numeric(response$pageSize %||% 1) # This IS the number of pages
  current_page <- as.numeric(response$pageNumber %||% 0)
  rows_in_page <- length(response$data)

  # Column names from the `columns` metadata. vapply() always yields a
  # character vector (character(0) when there are no columns); sapply()
  # returned list() in that case.
  column_names <- vapply(response$columns, function(col) {
    name <- col$name
    if (is.null(name) || length(name) == 0) "unknown" else as.character(name[[1]])
  }, character(1))

  # Without column metadata every row would be silently dropped, so fall back
  # to the keys of the first row (rows are looked up by column name).
  if (length(column_names) == 0 && rows_in_page > 0) {
    column_names <- names(response$data[[1]]) %||% character(0)
  }

  df <- convert_results_to_dataframe(response$data, column_names)

  # Add metadata as attributes
  attr(df, "total_rows") <- total_rows
  attr(df, "page") <- current_page
  attr(df, "page_size") <- rows_in_page # Actual rows in this page
  attr(df, "total_pages") <- total_pages
  df
}
#' Execute SQL Query (Orchestrator)
#'
#' High-level function that orchestrates the full query lifecycle:
#' submit -> poll status -> fetch results as dataframe.
#'
#' IMPORTANT: Pagination works by submitting separate tasks for each page. When all_pages=TRUE,
#' this function submits multiple async tasks (one per page), waits for all to complete,
#' and combines the results. This may take longer for large result sets.
#' Each task is polled with its own \code{max_wait} budget.
#'
#' @param profilename Character. Name of the configured profile to use. If empty or NULL, uses the default profile.
#' @param cohort_id Character. ID of the cohort to query.
#' @param sql Character. SQL query to execute.
#' @param poll_interval Numeric. Seconds between status checks, at least 1 (default: 2).
#' @param max_wait Numeric. Maximum seconds to wait for each task, at least 1 (default: 600).
#' @param page_size Integer. Number of rows per page, a whole number >= 1 (default: 1000).
#' @param all_pages Logical. Fetch all result pages automatically (default: TRUE).
#'   When TRUE, submits multiple async tasks (one per page) to fetch complete results.
#'
#' @return Data frame with query results, carrying the attributes
#'   \code{total_rows}, \code{page}, \code{page_size} and \code{total_pages}.
#'   When several pages were combined, \code{all_pages_fetched} is also
#'   \code{TRUE}, \code{page} is 0 and \code{total_pages} is 1.
#' @export
#'
#' @examples
#' \dontrun{
#' # Fetch all results
#' results <- cloudos.query(
#'   profilename = "production",
#'   cohort_id = "699edb4380a6867895f0c9e1",
#'   sql = "SELECT person_id, gender_concept_id FROM person LIMIT 500"
#' )
#'
#' # Fetch only first page
#' results_page1 <- cloudos.query(
#'   profilename = "production",
#'   cohort_id = "699edb4380a6867895f0c9e1",
#'   sql = "SELECT person_id FROM person",
#'   all_pages = FALSE,
#'   page_size = 100
#' )
#' }
cloudos.query <- function(profilename = "",
                          cohort_id = "",
                          sql = "",
                          poll_interval = 2,
                          max_wait = 600,
                          page_size = 1000,
                          all_pages = TRUE) {
  # Validate inputs
  validate_required_string(cohort_id, "cohort_id")
  validate_required_string(sql, "sql")

  # Single non-NA number? Checked before any comparison so the `||`
  # conditions below stay scalar (R >= 4.3 errors on length > 1) and never
  # hand NA to `if`.
  is_scalar_number <- function(value) {
    is.numeric(value) && length(value) == 1L && !is.na(value)
  }
  if (!is_scalar_number(poll_interval) || poll_interval < 1) {
    stop("Error: poll_interval must be at least 1 second.", call. = FALSE)
  }
  if (!is_scalar_number(max_wait) || max_wait < 1) {
    stop("Error: max_wait must be at least 1 second.", call. = FALSE)
  }
  if (!is_scalar_number(page_size) || !is.finite(page_size) ||
      page_size < 1 || page_size != round(page_size)) {
    stop("Error: page_size must be a positive integer.", call. = FALSE)
  }

  # Render numbers for messages. sprintf("%d") errors on non-whole doubles
  # (e.g. max_wait = 90.5, which the validation above allows).
  fmt_num <- function(value) format(value, scientific = FALSE)

  # Poll one task until it reports "completed"; stops on "failed" or once
  # max_wait seconds have elapsed for this task.
  poll_task <- function(task_id, task_description = "") {
    start_time <- Sys.time()
    repeat {
      elapsed <- as.numeric(difftime(Sys.time(), start_time, units = "secs"))
      if (elapsed >= max_wait) {
        stop(sprintf(
          "Task did not complete within %s seconds.\nTask ID: %s\nUse cloudos.query_status() to check progress.",
          fmt_num(max_wait),
          task_id
        ), call. = FALSE)
      }
      status_info <- tryCatch({
        cloudos.query_status(profilename, task_id)
      }, error = function(e) {
        stop(sprintf("Status check failed for %s: %s", task_description, e$message), call. = FALSE)
      })
      current_status <- tolower(trimws(status_info$status))
      # identical() tolerates an NA or empty status (keeps polling) where
      # `==` inside `if` would error.
      if (identical(current_status, "completed")) {
        return(status_info)
      } else if (identical(current_status, "failed")) {
        stop(sprintf(
          "Query execution failed for %s.\nTask ID: %s\nCheck task status for details.",
          task_description,
          task_id
        ), call. = FALSE)
      } else if (any(current_status %in% c("pending", "running"))) {
        message(sprintf(" %s: %s (%.1fs elapsed)...", task_description, trimws(status_info$status), elapsed))
      }
      Sys.sleep(poll_interval)
    }
  }

  # Step 1: Submit first query (page 0)
  message(sprintf("Submitting query%s...", if (all_pages) " for page 0" else ""))
  pagination <- list(pageNumber = 0, pageSize = page_size)
  task <- tryCatch({
    cloudos.query_submit_async(profilename, cohort_id, sql, pagination)
  }, error = function(e) {
    stop(sprintf("Query submission failed: %s", e$message), call. = FALSE)
  })
  task_id <- task$task_id

  # Step 2: Poll for completion
  message(sprintf("Polling for completion (max wait: %s seconds)...", fmt_num(max_wait)))
  status_info <- poll_task(task_id, if (all_pages) "Page 0" else "Query")
  message(sprintf(
    "Page 0 completed successfully (%s results)",
    status_info$count_of_results
  ))

  # Step 3: Fetch first page results
  message("Fetching page 0 results...")
  first_page <- tryCatch({
    cloudos.query_results(profilename, task_id)
  }, error = function(e) {
    stop(sprintf("Failed to fetch results: %s", e$message), call. = FALSE)
  })

  total_rows <- attr(first_page, "total_rows")
  total_pages <- attr(first_page, "total_pages")
  # isTRUE() treats a missing/NA page count as a single page instead of erroring
  has_more_pages <- isTRUE(total_pages > 1)

  # If not fetching all pages, return first page
  if (!all_pages) {
    if (has_more_pages) {
      message(sprintf(
        "\nNote: Query has %s total rows across %s pages. Only page 0 (%d rows) returned.",
        fmt_num(total_rows),
        fmt_num(total_pages),
        nrow(first_page)
      ))
      message("Use all_pages=TRUE to fetch all results.")
    }
    message("Query complete!")
    return(first_page)
  }

  if (!has_more_pages) {
    message(sprintf("All results fetched: %d rows", nrow(first_page)))
    message("Query complete!")
    return(first_page)
  }

  # Step 4: Submit tasks for remaining pages
  page_nums <- seq_len(total_pages - 1)
  message(sprintf(
    "\nFetching remaining pages (1 to %s) - total: %s rows across %s pages...",
    fmt_num(total_pages - 1),
    fmt_num(total_rows),
    fmt_num(total_pages)
  ))
  remaining_tasks <- vector("list", length(page_nums))
  for (page_num in page_nums) {
    pagination <- list(pageNumber = page_num, pageSize = page_size)
    task <- tryCatch({
      cloudos.query_submit_async(profilename, cohort_id, sql, pagination)
    }, error = function(e) {
      stop(sprintf("Failed to submit query for page %d: %s", page_num, e$message), call. = FALSE)
    })
    remaining_tasks[[page_num]] <- list(
      task_id = task$task_id,
      page_num = page_num
    )
    message(sprintf(" Submitted page %d (task ID: %s)", page_num, task$task_id))
    # Small delay to avoid overwhelming the server
    Sys.sleep(0.2)
  }

  # Step 5: Poll all remaining tasks (sequentially) and fetch their results
  message("\nWaiting for all pages to complete...")
  page_results <- vector("list", length(page_nums) + 1L)
  page_results[[1]] <- first_page
  for (task_info in remaining_tasks) {
    page_num <- task_info$page_num
    task_id <- task_info$task_id
    message(sprintf("Polling page %d...", page_num))
    poll_task(task_id, sprintf("Page %d", page_num))
    message(sprintf("Page %d completed, fetching results...", page_num))
    page_results[[page_num + 1]] <- tryCatch({
      cloudos.query_results(profilename, task_id)
    }, error = function(e) {
      stop(sprintf("Failed to fetch page %d results: %s", page_num, e$message), call. = FALSE)
    })
  }

  # Step 6: Combine all pages (in page order) and describe the result as one page
  message("\nCombining all pages...")
  combined_df <- do.call(rbind, page_results)
  attr(combined_df, "total_rows") <- total_rows
  attr(combined_df, "page") <- 0
  attr(combined_df, "page_size") <- total_rows
  attr(combined_df, "total_pages") <- 1
  attr(combined_df, "all_pages_fetched") <- TRUE
  message(sprintf("All pages fetched: %d rows", nrow(combined_df)))
  message("Query complete!")
  combined_df
}
#' Convert Query Results to Dataframe
#'
#' Internal function to convert API response data to a dataframe.
#'
#' Each row is looked up by column name with \code{row[[name]]}
#' (NOTE(review): assumes rows arrive as named lists keyed by column name;
#' confirm against the JSON parsing in \code{http_get()}). NULL and empty
#' values become \code{NA}. A column whose cells are all scalars is simplified
#' to an atomic vector with R's usual coercion (logical < integer < double <
#' character). Otherwise it is kept as a list column, so nested JSON values are
#' not spread across extra columns.
#'
#' @param data List. Data rows from API response.
#' @param column_names Character vector. Column names, in output order.
#'   Duplicates are allowed. Each produces its own column, filled from the
#'   first matching key in each row.
#'
#' @return Data frame with one row per element of \code{data} and one column
#'   per element of \code{column_names}, named verbatim (not made syntactic).
#'   When \code{data} is empty, a zero-row data frame of logical columns.
#' @keywords internal
convert_results_to_dataframe <- function(data, column_names) {
  # Handle empty results: zero rows, but keep the expected column names
  if (is.null(data) || length(data) == 0) {
    df <- as.data.frame(matrix(ncol = length(column_names), nrow = 0))
    colnames(df) <- column_names
    return(df)
  }

  n_rows <- length(data)

  # Build the frame column by column. Columns are positional, so duplicate
  # names (e.g. `SELECT a.id, b.id`) no longer collapse into one entry, and
  # there is no per-row as.data.frame() + rbind() overhead.
  columns <- lapply(column_names, function(col_name) {
    cells <- lapply(data, function(row) {
      value <- row[[col_name]]
      # JSON null / empty arrays become NA
      if (is.null(value) || length(value) == 0) NA else value
    })
    all_scalar <- all(vapply(
      cells,
      function(v) is.atomic(v) && length(v) == 1L,
      logical(1)
    ))
    if (all_scalar) unlist(cells, use.names = FALSE) else cells
  })

  # Assemble directly so names are used verbatim (no check.names mangling)
  # and list columns are preserved; c(NA, -n) is R's compact 1:n row names.
  structure(
    columns,
    names = as.character(column_names),
    row.names = c(NA_integer_, -n_rows),
    class = "data.frame"
  )
}
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.