# R/future-starburst.R

# Defines functions check_and_submit_wave submit_task result.StarburstFuture resolved.StarburstFuture run.StarburstFuture StarburstFuture future.starburst

# Documented in check_and_submit_wave future.starburst resolved.StarburstFuture result.StarburstFuture run.StarburstFuture StarburstFuture submit_task

#' Create a Future using Starburst Backend
#'
#' This is the entry point called by the Future package when a plan(starburst)
#' is active. It captures the expression (when `substitute = TRUE`) and hands
#' everything to `StarburstFuture()`.
#'
#' @param expr Expression to evaluate
#' @param envir Environment for evaluation
#' @param substitute Whether to substitute the expression. The substitution is
#'   performed in this function; the constructor is always called with
#'   `substitute = FALSE`.
#' @param globals Globals to export (TRUE for auto-detection, list for manual)
#' @param packages Packages to load
#' @param lazy Whether to lazily evaluate (always FALSE for remote)
#' @param seed Random seed
#' @param stdout Whether to capture stdout (TRUE, FALSE, or NA)
#' @param conditions Character vector of condition classes to capture
#' @param label Optional label for the future
#' @param ... Additional arguments, forwarded to `StarburstFuture()`
#'
#' @return A StarburstFuture object
#' @importFrom future future
#' @method future starburst
#' @export
future.starburst <- function(expr, envir = parent.frame(), substitute = TRUE,
                             lazy = FALSE, seed = FALSE, globals = TRUE,
                             packages = NULL, stdout = TRUE, conditions = "condition",
                             label = NULL, ...) {
  # The expression has to be captured at this level. If `expr = expr` were
  # forwarded with substitute = TRUE, the constructor's substitute(expr) would
  # return the bare symbol `expr` rather than the code the caller wrote.
  if (substitute) {
    expr <- substitute(expr)
  }

  StarburstFuture(
    expr = expr,
    envir = envir,
    substitute = FALSE,
    globals = globals,
    packages = packages,
    lazy = lazy,
    seed = seed,
    stdout = stdout,
    conditions = conditions,
    label = label,
    ...
  )
}

#' StarburstFuture Constructor
#'
#' Creates a Future object for evaluation on AWS Fargate. The object is an
#' environment (so later state changes are visible to every holder) with
#' class `c("StarburstFuture", "Future", "environment")`.
#'
#' @param expr Expression to evaluate
#' @param envir Environment for evaluation
#' @param substitute Whether to substitute the expression
#' @param globals Globals to export (TRUE for auto-detection, list for manual)
#' @param packages Packages to load; when NULL and `globals` is TRUE, the
#'   auto-detected packages are used
#' @param lazy Whether to lazily evaluate (always FALSE for remote); not used
#'   by this constructor
#' @param seed Random seed
#' @param stdout,conditions,label Accepted so the signature lines up with
#'   `future.starburst()`; not used by this constructor
#' @param ... Additional arguments, stored as extra fields on the future
#'
#' @return A StarburstFuture object
#' @keywords internal
#' @export
StarburstFuture <- function(expr, envir = parent.frame(), substitute = TRUE,
                            globals = TRUE, packages = NULL, lazy = FALSE,
                            seed = FALSE, stdout = TRUE, conditions = "condition",
                            label = NULL, ...) {

  # Capture the unevaluated expression when asked to
  if (substitute) {
    expr <- substitute(expr)
  }

  # Backend lookup: attribute on the active plan first, session option second
  backend <- attr(future::plan("next"), "backend")
  if (is.null(backend)) {
    backend <- getOption("starburst.current_backend")
  }
  if (is.null(backend)) {
    stop("No starburst backend found. Call plan(starburst, ...) first.")
  }

  # Tag the backend with the expected classes if it does not carry them yet
  if (!inherits(backend, "StarburstBackend")) {
    class(backend) <- c("StarburstBackend", "FutureBackend", class(backend))
  }

  # Resolve globals: auto-detect, or wrap a plain list as a Globals object
  if (isTRUE(globals)) {
    detected <- future::getGlobalsAndPackages(expr, envir = envir, globals = TRUE)
    globals <- detected$globals
    if (is.null(packages)) {
      packages <- detected$packages
    }
  } else if (is.list(globals) && !inherits(globals, "Globals")) {
    globals <- globals::as.Globals(globals)
  }

  # Task identifier: "task-" followed by a UUID with the dashes stripped
  task_id <- paste0("task-", gsub("-", "", uuid::UUIDgenerate(), fixed = TRUE))

  # Assemble every field in a list first; list() keeps the NULL placeholders
  fields <- list(
    expr = expr,
    envir = envir,
    globals = globals,
    packages = packages,
    seed = seed,
    task_id = task_id,
    backend = backend,
    state = "created",
    result_value = NULL,
    task_arn = NULL,
    submitted_at = NULL,
    ...
  )

  # An environment gives the future reference semantics
  f <- list2env(fields, parent = emptyenv())
  class(f) <- c("StarburstFuture", "Future", "environment")

  f
}

#' Run a StarburstFuture
#'
#' Submits the future task to AWS Fargate for execution. The task payload
#' (expression, globals, packages, seed) is serialized and uploaded to
#' `tasks/<task_id>.qs` in the backend bucket. The task is then either
#' submitted right away or placed on the backend's wave queue when the
#' backend is quota limited.
#'
#' Calling this on a future that is already queued, running or finished is a
#' no-op, so a repeated call cannot upload or enqueue the same task twice.
#'
#' @param future A StarburstFuture object
#' @param ... Additional arguments
#'
#' @return The future object (invisibly)
#' @importFrom future run resolved result
#' @export
run.StarburstFuture <- function(future, ...) {

  # Skip if already handed off or resolved. "queued" matters here: a queued
  # future has been uploaded and sits in the pending list, so running it again
  # would put a duplicate entry on the wave queue.
  if (future$state %in% c("queued", "running", "finished")) {
    return(invisible(future))
  }

  # Get backend/plan
  backend <- future$backend

  # Serialize task data
  task_data <- list(
    expr = future$expr,
    globals = future$globals,
    packages = future$packages,
    seed = future$seed
  )

  # Upload to S3
  s3 <- get_s3_client(backend$region)
  task_key <- sprintf("tasks/%s.qs", future$task_id)
  temp_file <- tempfile(fileext = ".qs")

  tryCatch({
    qs2::qs_save(task_data, temp_file)
    s3$put_object(
      Bucket = backend$bucket,
      Key = task_key,
      Body = temp_file
    )
  }, finally = {
    # Remove the local copy whether or not the upload succeeded
    unlink(temp_file)
  })

  # Check if we should queue or submit immediately
  if (backend$quota_limited) {
    # Add to wave queue
    future$state <- "queued"
    backend$wave_queue$pending <- append(
      backend$wave_queue$pending,
      list(future)
    )
    # Try to submit next wave
    check_and_submit_wave(backend)
  } else {
    # Submit immediately
    submit_task(future, backend)
  }

  invisible(future)
}

#' Check if StarburstFuture is Resolved
#'
#' Reports whether the future's task has completed. A finished future
#' answers TRUE straight away. A queued future nudges the wave queue (when
#' the backend is quota limited) and answers FALSE. A running future is
#' considered done once `results/<task_id>.qs` can be found in the backend
#' bucket, at which point its state is set to "finished".
#'
#' @param x A StarburstFuture object
#' @param ... Additional arguments
#'
#' @return Logical indicating if the future is resolved
#' @export
resolved.StarburstFuture <- function(x, ...) {
  status <- x$state

  # Nothing left to check once the future is finished
  if (status == "finished") {
    return(TRUE)
  }

  # Queued: give the wave scheduler a chance to advance, but report
  # unresolved for this call regardless of what it does
  if (status == "queued") {
    queue_owner <- x$backend
    if (queue_owner$quota_limited) {
      check_and_submit_wave(queue_owner)
    }
    return(FALSE)
  }

  # Any other state that is not "running" cannot have a result yet
  if (status != "running") {
    return(FALSE)
  }

  # Running: probe S3 for the result object
  backend <- x$backend
  client <- get_s3_client(backend$region)
  key <- sprintf("results/%s.qs", x$task_id)

  # Any error raised by the HEAD request is read as "result not there"
  found <- tryCatch(
    {
      client$head_object(Bucket = backend$bucket, Key = key)
      TRUE
    },
    error = function(e) FALSE
  )

  if (found) {
    x$state <- "finished"
  }

  # Task failure is not detected here (no ECS status query is made); a
  # running task without a result object simply reports FALSE
  found
}

#' Get Result from StarburstFuture
#'
#' Retrieves the result of a future, blocking until it is resolved. The
#' first successful call downloads `results/<task_id>.qs` from the backend
#' bucket, wraps it in a FutureResult, caches that on the future and bumps
#' the backend's completed-task counter. Later calls return the cached
#' object.
#'
#' @param future A StarburstFuture object
#' @param ... Additional arguments
#'
#' @return A FutureResult object
#' @export
result.StarburstFuture <- function(future, ...) {
  # Serve from the cache when a result was already fetched
  cached <- future$result_value
  if (!is.null(cached)) {
    return(cached)
  }

  # Poll once per second until the future reports resolved
  repeat {
    if (resolved(future)) {
      break
    }
    Sys.sleep(1)
  }

  # Fetch and deserialize the result payload from S3
  backend <- future$backend
  s3 <- get_s3_client(backend$region)
  result_key <- sprintf("results/%s.qs", future$task_id)
  temp_file <- tempfile(fileext = ".qs")

  result_data <- tryCatch(
    {
      s3$download_file(
        Bucket = backend$bucket,
        Key = result_key,
        Filename = temp_file
      )
      qs2::qs_read(temp_file)
    },
    finally = unlink(temp_file)
  )

  # A payload flags failure through a truthy `error` field
  failed <- !is.null(result_data$error) && result_data$error

  # NOTE(review): the failure is recorded as a bare simpleError inside
  # `conditions`. Confirm this is the element shape the future package
  # expects when it relays conditions from a FutureResult.
  captured <- if (failed) {
    list(simpleError(result_data$message %||% "Task failed"))
  } else {
    list()
  }

  result_obj <- structure(
    list(
      value = if (failed) NULL else result_data$value,
      visible = TRUE,
      stdout = result_data$stdout %||% "",
      conditions = captured,
      version = "1.8"
    ),
    class = "FutureResult"
  )

  # Remember the result so the download happens only once
  future$result_value <- result_obj

  # Update backend stats
  backend$completed_tasks <- backend$completed_tasks + 1

  result_obj
}

#' Submit Task to ECS
#'
#' Internal function to submit a task to ECS (Fargate or EC2). On success
#' the future records the task ARN and submission time and moves to state
#' "running". When ECS returns no task, an error is raised that carries the
#' first reported failure's reason and detail (whichever are present).
#'
#' @param future StarburstFuture object
#' @param backend Backend/plan object
#' @return NULL (invisibly); called for its side effects on `future` and
#'   `backend`
#' @keywords internal
submit_task <- function(future, backend) {

  # Ensure task definition exists
  if (is.null(backend$task_definition_arn)) {
    backend$task_definition_arn <- get_or_create_task_definition(backend)
  }

  # Handle EC2 pool warmup if needed
  if (backend$launch_type == "EC2" && is.null(backend$pool_started_at)) {
    cat_info("[Setup] Starting warm EC2 pool (~2 min first time)...\n")
    start_warm_pool(backend, backend$workers)
    backend$pool_started_at <- Sys.time()
  }

  # Get ECS client
  ecs <- get_ecs_client(backend$region)

  # Get network configuration
  vpc_config <- get_vpc_config(backend$region)

  # Build base run_task parameters
  run_task_params <- list(
    cluster = backend$cluster,
    taskDefinition = backend$task_definition_arn,
    networkConfiguration = list(
      awsvpcConfiguration = list(
        subnets = vpc_config$subnets,
        securityGroups = vpc_config$security_groups,
        assignPublicIp = "ENABLED"
      )
    ),
    overrides = list(
      containerOverrides = list(
        list(
          name = "starburst-worker",
          # The task and its storage location are passed in as environment
          # variables on the container
          environment = list(
            list(name = "TASK_ID", value = future$task_id),
            list(name = "S3_BUCKET", value = backend$bucket),
            list(name = "AWS_DEFAULT_REGION", value = backend$region),
            list(name = "CLUSTER_ID", value = backend$cluster_id)
          )
        )
      )
    )
  )

  # Add launch-type specific parameters
  if (backend$launch_type == "EC2") {
    run_task_params$capacityProviderStrategy <- list(
      list(
        capacityProvider = backend$capacity_provider_name,
        weight = 1
      )
    )
    run_task_params$placementConstraints <- list(
      list(
        type = "memberOf",
        expression = sprintf("attribute:ecs.instance-type == %s",
                           backend$instance_type)
      )
    )
  } else {
    run_task_params$launchType <- "FARGATE"
  }

  # Submit task
  response <- do.call(ecs$run_task, run_task_params)

  if (length(response$tasks) > 0) {
    future$task_arn <- response$tasks[[1]]$taskArn
    future$submitted_at <- Sys.time()
    future$state <- "running"

    # Store task ARN for monitoring
    store_task_arn(future$task_id, future$task_arn)
  } else {
    # Task submission failed. Build the message from whichever of
    # reason/detail is actually present: feeding a NULL or zero-length field
    # to sprintf() yields character(0), which makes stop() raise an empty
    # message.
    failure_msg <- "Unknown reason"
    if (length(response$failures) > 0) {
      first_failure <- response$failures[[1]]
      parts <- c(first_failure$reason, first_failure$detail)
      parts <- parts[!is.na(parts) & nzchar(parts)]
      if (length(parts) > 0) {
        failure_msg <- paste(parts, collapse = ": ")
      }
    }
    stop(sprintf("Failed to submit task: %s", failure_msg))
  }

  invisible(NULL)
}

#' Check and Submit Wave if Ready
#'
#' Checks wave queue and submits next wave if current wave is complete.
#' Futures of the current wave that have resolved are dropped from
#' `wave_queue$wave_futures` and counted in `wave_queue$completed`. When no
#' future of the current wave is left and futures are pending, up to
#' `workers_per_wave` of them are submitted as the next wave.
#'
#' A pending future is removed from the queue only after its submission has
#' succeeded. If `submit_task()` raises, the error propagates and the future
#' stays at the head of `wave_queue$pending`, so a later call can retry it.
#'
#' @param backend Backend environment
#' @return NULL (invisibly)
#' @keywords internal
check_and_submit_wave <- function(backend) {

  if (!backend$quota_limited) {
    return(invisible(NULL))
  }

  # Drop resolved futures from the current wave and count them as completed
  in_flight <- backend$wave_queue$wave_futures
  if (length(in_flight) > 0) {
    is_done <- vapply(in_flight, function(f) resolved(f), logical(1))
    backend$wave_queue$completed <- backend$wave_queue$completed + sum(is_done)
    backend$wave_queue$wave_futures <- in_flight[!is_done]
  }
  running_count <- length(backend$wave_queue$wave_futures)

  # If current wave is empty and there are pending futures, start new wave
  pending_count <- length(backend$wave_queue$pending)
  if (running_count == 0 && pending_count > 0) {
    # Calculate how many tasks to submit in this wave
    tasks_to_submit <- min(backend$workers_per_wave, pending_count)

    cat_info(sprintf(
      "[Status] Starting wave %d: submitting %d tasks (%d pending, %d completed)\n",
      backend$wave_queue$current_wave,
      tasks_to_submit,
      pending_count,
      backend$wave_queue$completed
    ))

    # Submit futures one at a time from the head of the pending queue
    for (i in seq_len(tasks_to_submit)) {
      next_future <- backend$wave_queue$pending[[1]]

      # Submit first, dequeue second: a failed submission must not make the
      # future vanish from both the pending list and the wave tracking list
      submit_task(next_future, backend)

      backend$wave_queue$pending <- backend$wave_queue$pending[-1]

      # Add to wave_futures for tracking
      backend$wave_queue$wave_futures <- append(
        backend$wave_queue$wave_futures,
        list(next_future)
      )
    }

    # Increment wave counter
    backend$wave_queue$current_wave <- backend$wave_queue$current_wave + 1
  }

  invisible(NULL)
}

# Note: %||% operator defined in R/utils.R

# Try the starburst package in your browser

# Any scripts or data that you put into this service are public.

# starburst documentation built on March 19, 2026, 5:08 p.m.