R/utils.R

Defines functions get_vpc_config get_or_create_security_group get_or_create_subnets list_active_clusters list_task_arns get_task_arn store_task_arn get_task_registry get_or_create_task_definition get_task_role_arn get_execution_role_arn ensure_log_group build_initial_environment build_environment_image ensure_base_image get_base_image_source build_base_image get_instance_specs get_architecture_from_instance_type get_base_image_uri check_ecr_image_exists ensure_environment compute_env_hash cleanup_s3_files stop_running_tasks calculate_total_cost calculate_task_cost get_instance_vcpus get_ec2_instance_price estimate_cost poll_for_result result_exists extract_region_from_key serialize_and_upload create_task `%||%` get_starburst_security_groups get_starburst_subnets get_starburst_bucket get_service_quotas_client check_ecr_image_age create_ecr_lifecycle_policy get_ec2_client get_ecr_client get_ecs_client get_s3_client get_aws_account_id check_aws_credentials safe_system

Documented in build_base_image build_environment_image build_initial_environment calculate_task_cost calculate_total_cost check_aws_credentials check_ecr_image_age check_ecr_image_exists cleanup_s3_files compute_env_hash create_ecr_lifecycle_policy create_task ensure_base_image ensure_environment ensure_log_group estimate_cost extract_region_from_key get_architecture_from_instance_type get_aws_account_id get_base_image_source get_base_image_uri get_ec2_client get_ec2_instance_price get_ecr_client get_ecs_client get_execution_role_arn get_instance_specs get_instance_vcpus get_or_create_security_group get_or_create_subnets get_or_create_task_definition get_s3_client get_service_quotas_client get_starburst_bucket get_starburst_security_groups get_starburst_subnets get_task_arn get_task_registry get_task_role_arn get_vpc_config list_active_clusters list_task_arns poll_for_result result_exists safe_system serialize_and_upload stop_running_tasks store_task_arn

#' Utility functions for staRburst
#'
#' @name utils
#' @keywords internal
NULL

#' Execute system command safely (no shell injection)
#'
#' @param command Command to execute (must be in whitelist)
#' @param args Character vector of arguments
#' @param allowed_commands Commands allowed to be executed
#' @param stdin Optional input to pass to stdin
#' @param ... Additional arguments passed to processx::run()
#' @return Result from processx::run()
#' @keywords internal
safe_system <- function(command,
                       args = character(),
                       allowed_commands = c("docker", "aws", "uname", "sysctl", "cat", "nproc"),
                       stdin = NULL,
                       stdout = "|",
                       stderr = "|",
                       ...) {
  # Validate command whitelist
  if (!command %in% allowed_commands) {
    stop(sprintf("Command not in whitelist: %s. Allowed: %s",
                 command, paste(allowed_commands, collapse = ", ")))
  }

  # Convert boolean stdout/stderr to processx format (TRUE = capture, FALSE = discard)
  if (isTRUE(stdout)) stdout <- "|"
  if (isFALSE(stdout)) stdout <- ""
  if (isTRUE(stderr)) stderr <- "|"
  if (isFALSE(stderr)) stderr <- ""

  # If stdin is a string that isn't a file path, write it to a temp file
  # processx::run() treats stdin as a file path, not file content
  stdin_file <- NULL
  if (!is.null(stdin) && stdin != "|" && !file.exists(stdin)) {
    stdin_file <- tempfile()
    writeLines(stdin, stdin_file)
    on.exit(unlink(stdin_file), add = TRUE)
    stdin <- stdin_file
  }

  # Use processx - automatically escapes, no shell
  result <- processx::run(
    command = command,
    args = args,
    stdin = stdin,
    stdout = stdout,
    stderr = stderr,
    error_on_status = FALSE,
    ...
  )

  if (result$status != 0) {
    stop(sprintf("Command '%s %s' failed (exit code %d):\n%s",
                 command, paste(args, collapse = " "),
                 result$status, result$stderr))
  }

  result
}

#' Check AWS credentials
#'
#' @return Logical indicating if credentials are valid
#' @keywords internal
check_aws_credentials <- function() {
  tryCatch({
    sts <- paws.security.identity::sts()
    identity <- sts$get_caller_identity()
    return(TRUE)
  }, error = function(e) {
    return(FALSE)
  })
}

#' Get AWS account ID
#'
#' @return AWS account ID
#' @keywords internal
get_aws_account_id <- function() {
  sts <- paws.security.identity::sts()
  identity <- sts$get_caller_identity()
  identity$Account
}

#' Get S3 client
#'
#' @param region AWS region
#' @return S3 client
#' @keywords internal
get_s3_client <- function(region) {
  paws.storage::s3(config = list(region = region))
}

#' Get ECS client
#'
#' @param region AWS region
#' @return ECS client
#' @keywords internal
get_ecs_client <- function(region) {
  paws.compute::ecs(config = list(region = region))
}

#' Get ECR client
#'
#' @param region AWS region
#' @return ECR client
#' @keywords internal
get_ecr_client <- function(region) {
  paws.compute::ecr(config = list(region = region))
}

#' Get EC2 client
#'
#' @param region AWS region
#' @return EC2 client
#' @keywords internal
get_ec2_client <- function(region) {
  paws.compute::ec2(config = list(region = region))
}

#' Create ECR lifecycle policy to auto-delete old images
#'
#' @param region AWS region
#' @param repository_name ECR repository name
#' @param ttl_days Number of days to keep images (NULL = no auto-delete)
#' @keywords internal
create_ecr_lifecycle_policy <- function(region, repository_name, ttl_days = NULL) {
  if (is.null(ttl_days)) {
    return(invisible(NULL))
  }

  ecr <- get_ecr_client(region)

  # Create lifecycle policy that deletes images older than ttl_days
  # This runs automatically in AWS - no starburst needed
  policy <- list(
    rules = list(
      list(
        rulePriority = 1L,
        description = sprintf("Auto-delete starburst images after %d days of no use", ttl_days),
        selection = list(
          tagStatus = "any",
          countType = "sinceImagePushed",
          countUnit = "days",
          countNumber = as.integer(ttl_days)
        ),
        action = list(
          type = "expire"
        )
      )
    )
  )

  tryCatch({
    ecr$put_lifecycle_policy(
      repositoryName = repository_name,
      lifecyclePolicyText = jsonlite::toJSON(policy, auto_unbox = TRUE)
    )
    cat_success(sprintf("[OK] ECR auto-cleanup enabled: Images deleted after %d days\n", ttl_days))
  }, error = function(e) {
    cat_warn(sprintf("[WARNING] Failed to set ECR lifecycle policy: %s\n", e$message))
  })
}

#' Check ECR image age and suggest/force rebuild
#'
#' @param region AWS region
#' @param image_tag Image tag to check
#' @param ttl_days TTL setting (NULL = no check)
#' @param force_rebuild Force rebuild if past TTL
#' @return TRUE if image is fresh or doesn't exist, FALSE if stale
#' @keywords internal
check_ecr_image_age <- function(region, image_tag, ttl_days = NULL, force_rebuild = FALSE) {
  if (is.null(ttl_days)) {
    return(TRUE)  # No TTL, always consider fresh
  }

  ecr <- get_ecr_client(region)
  config <- get_starburst_config()
  repo_name <- "starburst-worker"

  # Get image details
  image_exists <- tryCatch({
    result <- ecr$describe_images(
      repositoryName = repo_name,
      imageIds = list(list(imageTag = image_tag))
    )
    length(result$imageDetails) > 0
  }, error = function(e) {
    FALSE
  })

  if (!image_exists) {
    return(TRUE)  # Image doesn't exist, will be built
  }

  # Get image push timestamp
  image_details <- ecr$describe_images(
    repositoryName = repo_name,
    imageIds = list(list(imageTag = image_tag))
  )$imageDetails[[1]]

  push_time <- image_details$imagePushedAt
  age_days <- as.numeric(difftime(Sys.time(), push_time, units = "days"))

  # Check if image is stale
  if (age_days > ttl_days) {
    if (force_rebuild) {
      cat_warn(sprintf("[WARNING] Image is %.0f days old (TTL: %d days), rebuilding...\n",
                         age_days, ttl_days))
      return(FALSE)  # Signal rebuild needed
    } else {
      cat_warn(sprintf("[WARNING] Image is %.0f days old (TTL: %d days)\n", age_days, ttl_days))
      cat_info("  AWS will auto-delete soon. Consider running a job to refresh.\n")
      return(TRUE)  # Use existing but warn
    }
  } else {
    days_remaining <- ttl_days - age_days
    cat_info(sprintf("[OK] Image age: %.0f days (%.0f days until auto-delete)\n",
                    age_days, days_remaining))
    return(TRUE)
  }
}

#' Get Service Quotas client
#'
#' @param region AWS region
#' @return Service Quotas client
#' @keywords internal
get_service_quotas_client <- function(region) {
  paws.management::servicequotas(config = list(region = region))
}

#' Get starburst bucket name
#'
#' @return S3 bucket name
#' @keywords internal
get_starburst_bucket <- function() {
  config <- get_starburst_config()
  config$bucket
}

#' Get starburst subnets
#'
#' @param region AWS region
#' @return Vector of subnet IDs
#' @keywords internal
get_starburst_subnets <- function(region) {
  config <- get_starburst_config()
  config$subnets
}

#' Get starburst security groups
#'
#' @param region AWS region
#' @return Vector of security group IDs
#' @keywords internal
get_starburst_security_groups <- function(region) {
  config <- get_starburst_config()
  config$security_groups
}

#' Infix null-coalesce operator
#'
#' @keywords internal
#' @noRd
`%||%` <- function(a, b) {
  if (is.null(a)) b else a
}

#' Create task object
#'
#' @keywords internal
create_task <- function(expr, globals, packages, plan) {
  list(
    expr = expr,
    globals = globals,
    packages = packages,
    plan_info = list(
      cluster_id = plan$cluster_id,
      cpu = plan$cpu,
      memory = plan$memory,
      region = plan$region
    )
  )
}

#' Serialize and upload to S3
#'
#' @keywords internal
serialize_and_upload <- function(obj, bucket, key) {
  temp_file <- tempfile(fileext = ".qs")
  on.exit(unlink(temp_file))

  qs2::qs_save(obj, temp_file)

  s3 <- get_s3_client(extract_region_from_key(key))
  s3$put_object(
    Bucket = bucket,
    Key = key,
    Body = temp_file
  )

  invisible(NULL)
}

#' Extract region from S3 key
#'
#' @keywords internal
extract_region_from_key <- function(key) {
  config <- get_starburst_config()
  config$region
}

#' Check if result exists
#'
#' @keywords internal
result_exists <- function(task_id, region) {
  bucket <- get_starburst_bucket()
  key <- sprintf("results/%s.qs", task_id)

  s3 <- get_s3_client(region)

  tryCatch({
    s3$head_object(Bucket = bucket, Key = key)
    return(TRUE)
  }, error = function(e) {
    return(FALSE)
  })
}

#' Poll for result
#'
#' @keywords internal
poll_for_result <- function(future, timeout = 3600) {
  bucket <- get_starburst_bucket()
  key <- sprintf("results/%s/%s.qs", future$plan$cluster_id, future$task_id)
  region <- future$plan$region

  s3 <- get_s3_client(region)

  start_time <- Sys.time()

  while (TRUE) {
    # Check if result exists
    if (result_exists(future$task_id, region)) {
      # Download and deserialize
      temp_file <- tempfile(fileext = ".qs")
      on.exit(unlink(temp_file))

      s3$download_file(
        Bucket = bucket,
        Key = key,
        Filename = temp_file
      )

      result <- qs2::qs_read(temp_file)
      return(result)
    }

    # Check timeout
    elapsed <- as.numeric(difftime(Sys.time(), start_time, units = "secs"))
    if (elapsed > timeout) {
      stop(sprintf("Task timeout after %d seconds", timeout))
    }

    # Wait before next poll
    Sys.sleep(2)
  }
}

#' Estimate cost
#'
#' @keywords internal
estimate_cost <- function(workers, cpu, memory, estimated_runtime_hours = 1,
                         launch_type = "FARGATE", instance_type = NULL, use_spot = FALSE) {

  if (launch_type == "FARGATE") {
    # Fargate pricing (us-east-1, 2026)
    vcpu_price_per_hour <- 0.04048
    gb_price_per_hour <- 0.004445

    memory_gb <- parse_memory(memory)

    cost_per_worker_per_hour <-
      (cpu * vcpu_price_per_hour) +
      (memory_gb * gb_price_per_hour)

    list(
      per_worker = cost_per_worker_per_hour,
      per_hour = cost_per_worker_per_hour * workers,
      total_estimated = cost_per_worker_per_hour * workers * estimated_runtime_hours
    )
  } else {
    # EC2 pricing
    instance_price <- get_ec2_instance_price(instance_type, use_spot)
    instance_vcpus <- get_instance_vcpus(instance_type)

    # Calculate number of instances needed
    total_vcpus_needed <- workers * cpu
    instances_needed <- ceiling(total_vcpus_needed / instance_vcpus)

    total_cost_per_hour <- instances_needed * instance_price

    list(
      per_instance = instance_price,
      instances_needed = instances_needed,
      total_per_hour = total_cost_per_hour,
      total_estimated = total_cost_per_hour * estimated_runtime_hours,
      spot_discount = if (use_spot) "~70%" else "N/A",
      launch_type = "EC2"
    )
  }
}

#' Get EC2 instance pricing
#'
#' @param instance_type EC2 instance type (e.g., "c7g.xlarge")
#' @param use_spot Whether to use spot pricing
#' @return Price per hour in USD
#' @keywords internal
get_ec2_instance_price <- function(instance_type, use_spot = FALSE) {
  # Simplified pricing table for common instance types (us-east-1, 2026)
  # In production, this could query AWS Pricing API
  pricing <- list(
    # Graviton3 (ARM64) - 7th gen
    "c7g.large" = 0.0725,
    "c7g.xlarge" = 0.145,
    "c7g.2xlarge" = 0.29,
    "c7g.4xlarge" = 0.58,
    "c7g.8xlarge" = 1.16,
    "c7g.12xlarge" = 1.74,
    "c7g.16xlarge" = 2.32,
    "r7g.large" = 0.1008,
    "r7g.xlarge" = 0.2016,
    "r7g.2xlarge" = 0.4032,
    "r7g.4xlarge" = 0.8064,
    "m7g.large" = 0.0816,
    "m7g.xlarge" = 0.1632,
    "m7g.2xlarge" = 0.3264,
    "t4g.small" = 0.0168,
    "t4g.medium" = 0.0336,
    "t4g.large" = 0.0672,

    # AMD 8th gen (x86_64) - BEST OVERALL PRICE/PERFORMANCE (Feb 2026)
    "c8a.large" = 0.072,
    "c8a.xlarge" = 0.144,
    "c8a.2xlarge" = 0.288,
    "c8a.4xlarge" = 0.576,
    "c8a.8xlarge" = 1.152,
    "c8a.12xlarge" = 1.728,
    "c8a.16xlarge" = 2.304,
    "c8a.24xlarge" = 3.456,
    "c8a.32xlarge" = 4.608,
    "r8a.large" = 0.1008,
    "r8a.xlarge" = 0.2016,
    "r8a.2xlarge" = 0.4032,
    "r8a.4xlarge" = 0.8064,
    "m8a.large" = 0.0816,
    "m8a.xlarge" = 0.1632,
    "m8a.2xlarge" = 0.3264,

    # Graviton4 (ARM64) - 8th gen - Second best price/performance
    "c8g.large" = 0.076,
    "c8g.xlarge" = 0.152,
    "c8g.2xlarge" = 0.304,
    "c8g.4xlarge" = 0.608,
    "c8g.8xlarge" = 1.216,
    "c8g.12xlarge" = 1.824,
    "c8g.16xlarge" = 2.432,
    "r8g.large" = 0.1058,
    "r8g.xlarge" = 0.2116,
    "r8g.2xlarge" = 0.4232,
    "r8g.4xlarge" = 0.8464,
    "m8g.large" = 0.0856,
    "m8g.xlarge" = 0.1712,
    "m8g.2xlarge" = 0.3424,

    # Intel 7th gen (x86_64) - Ice Lake
    "c7i.large" = 0.0893,
    "c7i.xlarge" = 0.1785,
    "c7i.2xlarge" = 0.357,
    "c7i.4xlarge" = 0.714,
    "c7i.8xlarge" = 1.428,
    "c7i.12xlarge" = 2.142,
    "c7i.16xlarge" = 2.856,
    "c6i.large" = 0.085,
    "c6i.xlarge" = 0.17,
    "c6i.2xlarge" = 0.34,
    "c6i.4xlarge" = 0.68,
    "r6i.large" = 0.126,
    "r6i.xlarge" = 0.252,
    "r6i.2xlarge" = 0.504,
    "r6i.4xlarge" = 1.008,
    "m6i.large" = 0.096,
    "m6i.xlarge" = 0.192,
    "m6i.2xlarge" = 0.384,

    # Intel 8th gen (x86_64) - Sapphire Rapids - Highest single-thread performance
    "c8i.large" = 0.0935,
    "c8i.xlarge" = 0.187,
    "c8i.2xlarge" = 0.374,
    "c8i.4xlarge" = 0.748,
    "c8i.8xlarge" = 1.496,
    "c8i.12xlarge" = 2.244,
    "c8i.16xlarge" = 2.992,

    # AMD 7th gen (x86_64) - Best price/performance for x86
    "c7a.large" = 0.0765,
    "c7a.xlarge" = 0.153,
    "c7a.2xlarge" = 0.306,
    "c7a.4xlarge" = 0.612,
    "c7a.8xlarge" = 1.224,
    "c7a.12xlarge" = 1.836,
    "c7a.16xlarge" = 2.448,
    "r7a.large" = 0.1134,
    "r7a.xlarge" = 0.2268,
    "r7a.2xlarge" = 0.4536,
    "r7a.4xlarge" = 0.9072,
    "m7a.large" = 0.0864,
    "m7a.xlarge" = 0.1728,
    "m7a.2xlarge" = 0.3456,

    # AMD 6th gen (x86_64) - Good budget option
    "c6a.large" = 0.0765,
    "c6a.xlarge" = 0.153,
    "c6a.2xlarge" = 0.306,
    "c6a.4xlarge" = 0.612,
    "r6a.large" = 0.1134,
    "r6a.xlarge" = 0.2268,
    "r6a.2xlarge" = 0.4536,
    "m6a.large" = 0.0864,
    "m6a.xlarge" = 0.1728,
    "m6a.2xlarge" = 0.3456
  )

  on_demand_price <- pricing[[instance_type]]

  if (is.null(on_demand_price)) {
    # Default estimate if instance type not in table
    cat_warn(sprintf("Warning: No pricing data for %s, using estimate\n", instance_type))
    on_demand_price <- 0.15  # Conservative estimate
  }

  if (use_spot) {
    # Spot instances typically 70% cheaper
    return(on_demand_price * 0.3)
  } else {
    return(on_demand_price)
  }
}

#' Get vCPU count for instance type
#'
#' @param instance_type EC2 instance type
#' @return Number of vCPUs
#' @keywords internal
get_instance_vcpus <- function(instance_type) {
  # Parse instance size to determine vCPUs
  # Format: family + generation + size (e.g., c7g.xlarge)
  size_mapping <- list(
    "nano" = 0.25,
    "micro" = 0.5,
    "small" = 1,
    "medium" = 2,
    "large" = 2,
    "xlarge" = 4,
    "2xlarge" = 8,
    "4xlarge" = 16,
    "8xlarge" = 32,
    "12xlarge" = 48,
    "16xlarge" = 64,
    "24xlarge" = 96,
    "32xlarge" = 128
  )

  # Extract size from instance type
  parts <- strsplit(instance_type, "\\.")[[1]]
  if (length(parts) < 2) {
    stop(sprintf("Invalid instance type format: %s", instance_type))
  }

  size <- parts[2]
  vcpus <- size_mapping[[size]]

  if (is.null(vcpus)) {
    stop(sprintf("Unknown instance size: %s", size))
  }

  return(vcpus)
}

#' Calculate task cost
#'
#' @keywords internal
calculate_task_cost <- function(future) {
  # Get actual runtime
  if (is.null(future$completed_at)) {
    return(0)
  }

  runtime_hours <- as.numeric(
    difftime(future$completed_at, future$submitted_at, units = "hours")
  )

  # Calculate cost
  cost_est <- estimate_cost(1, future$plan$cpu, future$plan$memory, runtime_hours)
  cost_est$total_estimated
}

#' Calculate total cost
#'
#' @keywords internal
calculate_total_cost <- function(plan) {
  ecs <- get_ecs_client(plan$region)

  # Fargate pricing (us-east-1, adjust for other regions)
  # Reference: https://aws.amazon.com/fargate/pricing/
  vcpu_hour_cost <- 0.04048
  gb_hour_cost <- 0.004445

  total_cost <- 0

  tryCatch({
    # Get all task ARNs for this cluster
    task_arns <- list()
    stored_tasks <- list_task_arns()

    for (task_id in names(stored_tasks)) {
      task_arns <- append(task_arns, stored_tasks[[task_id]]$task_arn)
    }

    if (length(task_arns) == 0) {
      return(0)
    }

    # Describe tasks to get runtime information
    # Process in batches of 100 (ECS limit)
    batch_size <- 100
    for (i in seq(1, length(task_arns), by = batch_size)) {
      end_idx <- min(i + batch_size - 1, length(task_arns))
      batch <- task_arns[i:end_idx]

      tasks <- ecs$describe_tasks(
        cluster = "starburst-cluster",
        tasks = batch
      )

      for (task in tasks$tasks) {
        # Calculate runtime
        if (!is.null(task$startedAt)) {
          started <- as.POSIXct(task$startedAt, origin = "1970-01-01")

          stopped <- if (!is.null(task$stoppedAt)) {
            as.POSIXct(task$stoppedAt, origin = "1970-01-01")
          } else {
            Sys.time()  # Still running
          }

          runtime_hours <- as.numeric(difftime(stopped, started, units = "hours"))

          # Calculate cost based on CPU and memory
          vcpu_cost <- runtime_hours * plan$worker_cpu * vcpu_hour_cost
          memory_cost <- runtime_hours * plan$worker_memory * gb_hour_cost

          total_cost <- total_cost + vcpu_cost + memory_cost
        }
      }
    }

    return(total_cost)

  }, error = function(e) {
    # If we can't get actual costs, return tracked estimate
    if (!is.null(plan$total_cost)) {
      return(plan$total_cost)
    }
    return(0)
  })
}

#' Stop running tasks
#'
#' @keywords internal
stop_running_tasks <- function(plan) {
  ecs <- get_ecs_client(plan$region)

  tryCatch({
    # List running tasks
    tasks <- ecs$list_tasks(
      cluster = "starburst-cluster",
      family = "starburst-worker"
    )

    # Stop each task
    for (task_arn in tasks$taskArns) {
      ecs$stop_task(
        cluster = "starburst-cluster",
        task = task_arn,
        reason = "Cluster cleanup"
      )
    }
  }, error = function(e) {
    warning(sprintf("Error stopping tasks: %s", e$message))
  })

  invisible(NULL)
}

#' Cleanup S3 files
#'
#' @keywords internal
cleanup_s3_files <- function(plan) {
  bucket <- get_starburst_bucket()
  prefix <- sprintf("tasks/%s/", plan$cluster_id)

  s3 <- get_s3_client(plan$region)

  tryCatch({
    # List and delete task files
    objects <- s3$list_objects_v2(Bucket = bucket, Prefix = prefix)

    if (length(objects$Contents) > 0) {
      delete_objects <- lapply(objects$Contents, function(obj) {
        list(Key = obj$Key)
      })

      s3$delete_objects(
        Bucket = bucket,
        Delete = list(Objects = delete_objects)
      )
    }
  }, error = function(e) {
    warning(sprintf("Error cleaning S3 files: %s", e$message))
  })

  invisible(NULL)
}

#' Compute environment image hash
#'
#' Computes the hash used to tag environment Docker images, combining the
#' renv.lock file contents with the starburst package version. This ensures
#' new images are built when either the R package environment or the starburst
#' worker script changes.
#'
#' @param lock_file Path to renv.lock file
#' @return MD5 hash string
#' @keywords internal
compute_env_hash <- function(lock_file) {
  # Read version from DESCRIPTION to handle dev vs installed discrepancies
  desc_path <- system.file("DESCRIPTION", package = "starburst")
  if (nzchar(desc_path)) {
    pkg_version <- read.dcf(desc_path, fields = "Version")[1, "Version"]
  } else {
    pkg_version <- as.character(utils::packageVersion("starburst"))
  }
  hash_input <- paste0(readLines(lock_file, warn = FALSE), collapse = "\n", pkg_version)
  digest::digest(hash_input, algo = "md5")
}

#' Ensure environment is ready
#'
#' @keywords internal
ensure_environment <- function(region) {
  # Get renv lock file hash
  lock_file <- renv::paths$lockfile()

  # Walk up directory tree to find the canonical package root renv.lock.
  # This is needed because testthat sets CWD to tests/testthat/ during tests,
  # causing renv::paths$lockfile() to return tests/testthat/renv.lock instead
  # of the package root renv.lock. We prefer the renv.lock closest to an R/
  # directory (indicating an R package root).
  search_dir <- dirname(lock_file)
  found_root_lock <- FALSE
  for (i in seq_len(10)) {
    parent <- dirname(search_dir)
    if (parent == search_dir) break  # reached filesystem root
    candidate <- file.path(parent, "renv.lock")
    if (file.exists(candidate) && dir.exists(file.path(parent, "R"))) {
      # Found a renv.lock next to an R/ directory - this is the package root
      lock_file <- candidate
      found_root_lock <- TRUE
      break
    }
    search_dir <- parent
  }

  if (!file.exists(lock_file)) {
    # No lockfile found anywhere - create a new snapshot as fallback
    # force = TRUE allows locally installed packages like starburst itself
    renv::snapshot(prompt = FALSE, force = TRUE)
    lock_file <- renv::paths$lockfile()
  }

  # Calculate hash using shared helper
  env_hash <- compute_env_hash(lock_file)

  # Get configuration for ECR URI
  config <- get_starburst_config()
  account_id <- config$aws_account_id
  ecr_uri <- sprintf("%s.dkr.ecr.%s.amazonaws.com/starburst-worker", account_id, region)
  image_uri <- sprintf("%s:%s", ecr_uri, env_hash)

  # Check if image exists in ECR
  image_exists <- check_ecr_image_exists(env_hash, region)

  if (!image_exists) {
    cat_info("[Setup] Building Docker image for R environment (this may take 5-10 minutes)...\n")
    build_environment_image(env_hash, region)
  }

  # Get cluster name from config
  cluster <- config$cluster %||% "starburst-cluster"

  # Return environment info
  list(
    hash = env_hash,
    image_uri = image_uri,
    cluster = cluster
  )
}

#' Check if ECR image exists
#'
#' @keywords internal
check_ecr_image_exists <- function(tag, region) {
  config <- get_starburst_config()
  ecr <- get_ecr_client(region)

  tryCatch({
    images <- ecr$describe_images(
      repositoryName = "starburst-worker",
      imageIds = list(list(imageTag = tag))
    )

    length(images$imageDetails) > 0
  }, error = function(e) {
    return(FALSE)
  })
}

#' Get base image URI
#'
#' @keywords internal
get_base_image_uri <- function(region) {
  config <- get_starburst_config()
  account_id <- config$aws_account_id
  r_version <- paste0(R.version$major, ".", R.version$minor)

  # Use base image tag based on R version
  base_tag <- sprintf("base-%s", r_version)

  sprintf("%s.dkr.ecr.%s.amazonaws.com/starburst-worker:%s",
          account_id, region, base_tag)
}

#' Get CPU architecture from instance type
#'
#' @param instance_type EC2 instance type (e.g., "c7g.xlarge", "c7i.xlarge", "c7a.xlarge")
#' @return CPU architecture ("ARM64" or "X86_64")
#' @keywords internal
get_architecture_from_instance_type <- function(instance_type) {
  # Graviton instances end with 'g' in the instance family
  # Examples: c7g/c8g (Graviton3/4), t4g, r7g/r8g, m7g/m8g
  #
  # Intel/AMD instances use other suffixes:
  # - 'i' = Intel (c7i, c8i)
  # - 'a' = AMD (c8a, c7a, c6a - 8th/7th/6th gen)
  # - 'n' = Network optimized
  #
  # Best price/performance (Feb 2026): c8a > c8g > c7a
  if (grepl("^[cmrt][0-9]+g\\.", instance_type)) {
    return("ARM64")
  } else {
    return("X86_64")
  }
}

#' Get instance specifications (vCPUs, memory)
#'
#' @keywords internal
get_instance_specs <- function(instance_type) {
  # Common instance type specs
  # Format: "family.size" -> c(vcpus, memory_gb)
  specs_map <- list(
    # C6a (AMD, 3rd gen EPYC)
    "c6a.large" = c(2, 4),
    "c6a.xlarge" = c(4, 8),
    "c6a.2xlarge" = c(8, 16),
    "c6a.4xlarge" = c(16, 32),
    # C7a (AMD, 4th gen EPYC)
    "c7a.large" = c(2, 4),
    "c7a.xlarge" = c(4, 8),
    "c7a.2xlarge" = c(8, 16),
    "c7a.4xlarge" = c(16, 32),
    # C7g (Graviton3)
    "c7g.large" = c(2, 4),
    "c7g.xlarge" = c(4, 8),
    "c7g.2xlarge" = c(8, 16),
    "c7g.4xlarge" = c(16, 32),
    # C7i (Intel)
    "c7i.large" = c(2, 4),
    "c7i.xlarge" = c(4, 8),
    "c7i.2xlarge" = c(8, 16),
    "c7i.4xlarge" = c(16, 32)
  )

  if (!instance_type %in% names(specs_map)) {
    stop(sprintf("Unknown instance type: %s. Supported types: %s",
                 instance_type, paste(names(specs_map), collapse = ", ")))
  }

  specs <- specs_map[[instance_type]]
  list(
    vcpus = specs[1],
    memory_gb = specs[2]
  )
}

#' Build base Docker image with common dependencies
#'
#' @keywords internal
build_base_image <- function(region) {
  cat_info("[Docker] Building staRburst base image...\n")

  # Validate Docker is installed
  tryCatch({
    safe_system("docker", c("--version"), stdout = TRUE, stderr = TRUE)
  }, error = function(e) {
    stop("Docker is not installed or not accessible. Please install Docker: https://docs.docker.com/get-docker/")
  })

  # Get configuration
  config <- get_starburst_config()
  account_id <- config$aws_account_id
  r_version <- paste0(R.version$major, ".", R.version$minor)
  base_tag <- sprintf("base-%s", r_version)

  # Check if base image already exists
  if (check_ecr_image_exists(base_tag, region)) {
    base_uri <- get_base_image_uri(region)

    # Check image age if TTL is configured
    ttl_days <- config$ecr_image_ttl_days
    if (!is.null(ttl_days)) {
      image_fresh <- check_ecr_image_age(region, base_tag, ttl_days, force_rebuild = FALSE)
      if (!image_fresh) {
        cat_info("   * Image expired, rebuilding...\n")
        # Continue to rebuild below
      } else {
        cat_success(sprintf("[OK] Base image already exists: %s\n", base_uri))
        return(base_uri)
      }
    } else {
      cat_success(sprintf("[OK] Base image already exists: %s\n", base_uri))
      return(base_uri)
    }
  }

  # Create temporary build directory
  build_dir <- tempfile(pattern = "starburst_base_build_")
  dir.create(build_dir, recursive = TRUE)
  on.exit(unlink(build_dir, recursive = TRUE), add = TRUE)

  tryCatch({
    # Process Dockerfile.base template
    dockerfile_template <- system.file("templates", "Dockerfile.base", package = "starburst")
    if (!file.exists(dockerfile_template)) {
      stop("Dockerfile.base template not found")
    }

    template_content <- readLines(dockerfile_template)
    dockerfile_content <- gsub("\\{\\{R_VERSION\\}\\}", r_version, template_content)
    writeLines(dockerfile_content, file.path(build_dir, "Dockerfile"))

    cat_info(sprintf("   * Build directory: %s\n", build_dir))
    cat_info(sprintf("   * R version: %s\n", r_version))
    cat_info("   * This includes system deps + renv + future/globals/qs/paws\n")
    cat_info("   * This is a one-time build (3-5 min), reused by all projects\n")

    # Authenticate with ECR
    cat_info("   * Authenticating with ECR...\n")
    ecr <- get_ecr_client(region)
    auth_token <- ecr$get_authorization_token()

    if (length(auth_token$authorizationData) == 0) {
      stop("Failed to get ECR authorization token")
    }

    token_data <- auth_token$authorizationData[[1]]
    decoded_token <- rawToChar(base64enc::base64decode(token_data$authorizationToken))
    token_parts <- strsplit(decoded_token, ":")[[1]]
    password <- token_parts[2]

    # Docker login - pass password via stdin (secure, no shell exposure)
    login_result <- tryCatch({
      safe_system(
        "docker",
        c("login", "--username", "AWS", "--password-stdin", token_data$proxyEndpoint),
        stdin = password
      )
      TRUE
    }, error = function(e) {
      stop(sprintf("Failed to authenticate with ECR for account %s in region %s: %s",
                  account_id, region, e$message))
    })

    # Build multi-platform base image
    ecr_uri <- sprintf("%s.dkr.ecr.%s.amazonaws.com/starburst-worker", account_id, region)
    image_tag <- sprintf("%s:%s", ecr_uri, base_tag)
    cat_info(sprintf("   * Building multi-platform base image: %s\n", image_tag))
    cat_info("   * Platforms: linux/amd64, linux/arm64\n")

    # Ensure buildx builder exists with docker-container driver (required for multi-platform)
    # Per Docker docs: use --bootstrap flag and set as default with --use
    # Try to create builder, if it exists, use it
    tryCatch({
      safe_system(
        "docker",
        c("buildx", "create", "--name", "starburst-builder",
          "--driver", "docker-container", "--bootstrap", "--use"),
        stdout = FALSE, stderr = FALSE
      )
    }, error = function(e) {
      # Builder might already exist, try to use it
      tryCatch({
        safe_system("docker", c("buildx", "use", "starburst-builder"),
                   stdout = FALSE, stderr = FALSE)
      }, error = function(e2) {
        cat_warn("Warning: Failed to setup buildx builder, will try default\n")
      })
    })

    # Build and push multi-platform image (no cache for clean multi-platform build)
    safe_system(
      "docker",
      c("buildx", "build",
        "--builder", "starburst-builder",
        "--platform", "linux/amd64,linux/arm64",
        "--no-cache",
        "-t", image_tag,
        "--push",
        build_dir)
    )

    cat_success(sprintf("[OK] Base image built and pushed: %s\n", image_tag))
    cat_success("[OK] This base image will be reused by all future projects\n")

    return(image_tag)

  }, error = function(e) {
    cat_error(sprintf("[ERROR] Base image build failed: %s\n", e$message))
    stop(e)
  })
}

#' Get base image source URI
#'
#' @param use_public Logical, use public ECR base image (default TRUE)
#' @keywords internal
get_base_image_source <- function(use_public = TRUE) {
  r_version <- paste0(R.version$major, ".", R.version$minor)

  if (use_public) {
    # Public ECR (no auth needed, instant pull)
    # NOTE: This will be available when public images are published
    return(sprintf("public.ecr.aws/starburst/base:r%s", r_version))
  } else {
    # Private ECR (build if missing)
    config <- get_starburst_config()
    account_id <- config$aws_account_id
    region <- config$region
    return(sprintf("%s.dkr.ecr.%s.amazonaws.com/starburst-worker:base-%s",
                   account_id, region, r_version))
  }
}

#' Ensure base image exists
#'
#' @param region AWS region
#' @param use_public Logical, use public ECR base image (default TRUE)
#' @keywords internal
ensure_base_image <- function(region, use_public = NULL) {
  # Get preference from config or default to FALSE (safer default)
  if (is.null(use_public)) {
    config <- get_starburst_config()
    use_public <- config$use_public_base %||% FALSE
  }

  r_version <- paste0(R.version$major, ".", R.version$minor)
  base_tag <- sprintf("base-%s", r_version)

  if (use_public) {
    # Try public base image first, fall back to private if not available
    base_uri <- get_base_image_source(use_public = TRUE)
    cat_info(sprintf("Trying public base image: %s\n", base_uri))

    # For now, public images aren't published yet, so fall back
    cat_warn("   Public base images not yet available\n")
    cat_info("   Falling back to private base image build...\n")
    use_public <- FALSE
  }

  if (!use_public) {
    # Use private base image (build if needed)
    if (!check_ecr_image_exists(base_tag, region)) {
      cat_info("[Package] Base image not found in private ECR, building it now...\n")
      cat_info("   (This will take 3-5 minutes, but only needed once per R version)\n")
      build_base_image(region)
    } else {
      base_uri <- get_base_image_uri(region)
      cat_info(sprintf("[OK] Using existing private base image: %s\n", base_uri))
    }

    return(get_base_image_uri(region))
  }
}

#' Build environment image
#'
#' @param tag Image tag
#' @param region AWS region
#' @param use_public Logical, use public ECR base image (default NULL = from config)
#' @keywords internal
build_environment_image <- function(tag, region, use_public = NULL) {
  cat_info("[Docker] Building project Docker image...\n")

  # Validate Docker is installed
  tryCatch({
    safe_system("docker", c("--version"), stdout = TRUE, stderr = TRUE)
  }, error = function(e) {
    stop("Docker is not installed or not accessible. Please install Docker: https://docs.docker.com/get-docker/")
  })

  # Ensure base image exists (will build if needed, or use public)
  base_image_uri <- ensure_base_image(region, use_public = use_public)

  # Get configuration
  config <- get_starburst_config()
  account_id <- config$aws_account_id

  # Get ECR repository URI
  ecr_uri <- sprintf("%s.dkr.ecr.%s.amazonaws.com/starburst-worker", account_id, region)

  # Create temporary build directory
  build_dir <- tempfile(pattern = "starburst_build_")
  dir.create(build_dir, recursive = TRUE)
  on.exit(unlink(build_dir, recursive = TRUE), add = TRUE)

  tryCatch({
    # Copy renv.lock from project, excluding staRburst itself
    if (!file.exists("renv.lock")) {
      stop("renv.lock not found in current directory. Initialize renv first with: renv::init()")
    }

    # Read and filter lock file to exclude starburst package
    lock_data <- jsonlite::fromJSON("renv.lock", simplifyVector = FALSE)
    if (!is.null(lock_data$Packages$starburst)) {
      lock_data$Packages$starburst <- NULL
    }
    jsonlite::write_json(lock_data, file.path(build_dir, "renv.lock"),
                        pretty = TRUE, auto_unbox = TRUE)

    # Copy worker script
    worker_script <- system.file("templates", "worker.R", package = "starburst")
    if (!file.exists(worker_script)) {
      stop("Worker script template not found")
    }
    file.copy(worker_script, file.path(build_dir, "worker.R"))

    # Process Dockerfile template
    dockerfile_template <- system.file("templates", "Dockerfile.template", package = "starburst")
    if (!file.exists(dockerfile_template)) {
      stop("Dockerfile template not found")
    }

    template_content <- readLines(dockerfile_template)
    dockerfile_content <- gsub("\\{\\{BASE_IMAGE\\}\\}", base_image_uri, template_content)
    writeLines(dockerfile_content, file.path(build_dir, "Dockerfile"))

    cat_info(sprintf("   * Build directory: %s\n", build_dir))
    cat_info(sprintf("   * Base image: %s\n", base_image_uri))
    cat_info("   * Building only project-specific packages...\n")

    # Authenticate with ECR
    cat_info("   * Authenticating with ECR...\n")
    ecr <- get_ecr_client(region)
    auth_token <- ecr$get_authorization_token()

    if (length(auth_token$authorizationData) == 0) {
      stop("Failed to get ECR authorization token")
    }

    token_data <- auth_token$authorizationData[[1]]
    decoded_token <- rawToChar(base64enc::base64decode(token_data$authorizationToken))
    token_parts <- strsplit(decoded_token, ":")[[1]]
    password <- token_parts[2]

    # Docker login - pass password via stdin (secure, no shell exposure)
    login_result <- tryCatch({
      safe_system(
        "docker",
        c("login", "--username", "AWS", "--password-stdin", token_data$proxyEndpoint),
        stdin = password
      )
      TRUE
    }, error = function(e) {
      stop(sprintf("Failed to authenticate with ECR for account %s in region %s: %s",
                  account_id, region, e$message))
    })

    # Build multi-platform image
    image_tag <- sprintf("%s:%s", ecr_uri, tag)
    cat_info(sprintf("   * Building multi-platform image: %s\n", image_tag))
    cat_info("   * Platforms: linux/amd64, linux/arm64\n")

    # Ensure buildx builder exists with docker-container driver (required for multi-platform)
    # Per Docker docs: use --bootstrap flag and set as default with --use
    # Try to create builder, if it exists, use it
    tryCatch({
      safe_system(
        "docker",
        c("buildx", "create", "--name", "starburst-builder",
          "--driver", "docker-container", "--bootstrap", "--use"),
        stdout = FALSE, stderr = FALSE
      )
    }, error = function(e) {
      # Builder might already exist, try to use it
      tryCatch({
        safe_system("docker", c("buildx", "use", "starburst-builder"),
                   stdout = FALSE, stderr = FALSE)
      }, error = function(e2) {
        cat_warn("Warning: Failed to setup buildx builder, will try default\n")
      })
    })

    # Build and push multi-platform image (no cache for clean multi-platform build)
    safe_system(
      "docker",
      c("buildx", "build",
        "--builder", "starburst-builder",
        "--platform", "linux/amd64,linux/arm64",
        "--no-cache",
        "-t", image_tag,
        "--push",
        build_dir)
    )

    cat_success(sprintf("[OK] Image built and pushed: %s\n", image_tag))

    return(image_tag)

  }, error = function(e) {
    cat_error(sprintf("[ERROR] Image build failed: %s\n", e$message))
    stop(e)
  })
}

#' Build initial environment
#'
#' @keywords internal
build_initial_environment <- function(region) {
  ensure_environment(region)
}

#' Ensure CloudWatch log group exists
#'
#' @keywords internal
ensure_log_group <- function(log_group_name, region) {
  logs <- paws.management::cloudwatchlogs(
    config = list(
      credentials = list(profile = Sys.getenv("AWS_PROFILE", "default")),
      region = region
    )
  )

  # Check if log group exists
  log_group_exists <- FALSE
  tryCatch({
    result <- logs$describe_log_groups(logGroupNamePrefix = log_group_name)

    # If log group exists with exact name match, return
    if (length(result$logGroups) > 0) {
      for (lg in result$logGroups) {
        if (lg$logGroupName == log_group_name) {
          log_group_exists <- TRUE
          break
        }
      }
    }
  }, error = function(e) {
    # ResourceNotFoundException means log group doesn't exist
    if (grepl("ResourceNotFoundException", e$message)) {
      log_group_exists <<- FALSE
    } else {
      stop(e)
    }
  })

  if (log_group_exists) {
    return(invisible(NULL))
  }

  # Create log group if it doesn't exist
  tryCatch({
    logs$create_log_group(logGroupName = log_group_name)
    cat_info(sprintf("   * Created log group: %s\n", log_group_name))
  }, error = function(e) {
    # Ignore if already exists
    if (!grepl("ResourceAlreadyExistsException", e$message)) {
      stop(e)
    }
  })
}

#' Get IAM execution role ARN
#'
#' Returns the ARN for the ECS execution role (should be created during setup)
#'
#' @keywords internal
get_execution_role_arn <- function(region) {
  config <- get_starburst_config()

  # Use role from config if available
  if (!is.null(config$execution_role_arn)) {
    return(config$execution_role_arn)
  }

  # Return default role ARN
  aws_account_id <- config$aws_account_id
  role_name <- "starburstECSExecutionRole"

  sprintf("arn:aws:iam::%s:role/%s", aws_account_id, role_name)
}

#' Get IAM task role ARN
#'
#' Returns the ARN for the ECS task role (should be created during setup)
#'
#' @keywords internal
get_task_role_arn <- function(region) {
  config <- get_starburst_config()

  # Use role from config if available
  if (!is.null(config$task_role_arn)) {
    return(config$task_role_arn)
  }

  # Return default role ARN
  aws_account_id <- config$aws_account_id
  role_name <- "starburstECSTaskRole"

  sprintf("arn:aws:iam::%s:role/%s", aws_account_id, role_name)
}

#' Get or create task definition
#'
#' @keywords internal
get_or_create_task_definition <- function(plan) {
  cat_info("[Info] Preparing task definition...\n")

  ecs <- get_ecs_client(plan$region)
  config <- get_starburst_config()

  # Calculate CPU and memory in ECS units
  # CPU: 1 vCPU = 1024 units
  cpu <- plan$cpu %||% plan$worker_cpu
  cpu_units <- as.character(as.integer(cpu * 1024))

  # Memory in MB (parse from string like "8GB")
  memory <- plan$memory %||% plan$worker_memory
  if (is.character(memory)) {
    memory <- as.numeric(gsub("[^0-9.]", "", memory))
  }
  memory_mb <- as.character(as.integer(memory * 1024))

  # Ensure log group exists
  log_group_name <- "/aws/ecs/starburst-worker"
  ensure_log_group(log_group_name, plan$region)

  # Get IAM roles
  execution_role_arn <- get_execution_role_arn(plan$region)
  task_role_arn <- get_task_role_arn(plan$region)

  # Check for existing compatible task definition
  family_name <- "starburst-worker"

  tryCatch({
    task_defs <- ecs$list_task_definitions(
      familyPrefix = family_name,
      status = "ACTIVE",
      sort = "DESC",
      maxResults = 10
    )

    # Check if any existing task definition matches our requirements
    for (task_def_arn in task_defs$taskDefinitionArns) {
      task_def <- ecs$describe_task_definition(taskDefinition = task_def_arn)$taskDefinition

      # Check if CPU, memory, image, and launch type match
      if (task_def$cpu == cpu_units &&
          task_def$memory == memory_mb &&
          length(task_def$containerDefinitions) > 0) {

        # Check image
        container_def <- task_def$containerDefinitions[[1]]
        if (container_def$image != plan$image_uri) {
          next
        }

        # Check launch type compatibility
        launch_type <- plan$launch_type %||% "FARGATE"
        compatibilities <- task_def$requiresCompatibilities %||% list()

        if (!launch_type %in% compatibilities) {
          next
        }

        # For EC2, also check runtimePlatform matches
        if (launch_type == "EC2") {
          expected_arch <- plan$architecture %||% "X86_64"
          actual_arch <- task_def$runtimePlatform$cpuArchitecture %||% "X86_64"

          if (actual_arch != expected_arch) {
            next
          }
        }

        cat_success(sprintf("[OK] Using existing task definition: %s\n", task_def$taskDefinitionArn))
        return(task_def$taskDefinitionArn)
      }
    }
  }, error = function(e) {
    # Continue to create new task definition
  })

  # Create new task definition
  cat_info("   * Registering new task definition...\n")
  cat_info(sprintf("     CPU: %s units, Memory: %s MB\n", cpu_units, memory_mb))

  # For Fargate, container-level memory can be omitted if task-level memory is set
  # But we'll include it to ensure proper resource allocation
  container_memory <- as.integer(memory_mb)

  container_def <- list(
    name = "starburst-worker",
    image = plan$image_uri,
    cpu = 0,  # Not specifying container-level CPU for Fargate
    memory = container_memory,
    essential = TRUE,
    environment = list(),  # Empty environment variables
    logConfiguration = list(
      logDriver = "awslogs",
      options = list(
        "awslogs-group" = log_group_name,
        "awslogs-region" = plan$region,
        "awslogs-stream-prefix" = "starburst"
      )
    )
  )

  cat_info(sprintf("     Container memory: %d MB\n", container_memory))

  # Build task definition parameters
  task_def_params <- list(
    family = family_name,
    networkMode = "awsvpc",
    cpu = cpu_units,
    memory = memory_mb,
    executionRoleArn = execution_role_arn,
    taskRoleArn = task_role_arn,
    containerDefinitions = list(container_def)
  )

  # Add launch type specific parameters
  if (!is.null(plan$launch_type) && plan$launch_type == "EC2") {
    task_def_params$requiresCompatibilities <- list("EC2")
    task_def_params$runtimePlatform <- list(
      cpuArchitecture = plan$architecture,
      operatingSystemFamily = "LINUX"
    )
    cat_info(sprintf("     Launch type: EC2, Architecture: %s\n", plan$architecture))
  } else {
    task_def_params$requiresCompatibilities <- list("FARGATE")
    cat_info("     Launch type: FARGATE\n")
  }

  response <- do.call(ecs$register_task_definition, task_def_params)

  task_def_arn <- response$taskDefinition$taskDefinitionArn
  cat_success(sprintf("[OK] Task definition registered: %s\n", task_def_arn))

  return(task_def_arn)
}

#' Get task registry environment
#'
#' @keywords internal
get_task_registry <- function() {
  if (!exists(".starburst_task_registry", envir = .starburst_env)) {
    assign(".starburst_task_registry", new.env(parent = emptyenv()), envir = .starburst_env)
  }
  get(".starburst_task_registry", envir = .starburst_env)
}

#' Store task ARN
#'
#' @keywords internal
store_task_arn <- function(task_id, task_arn) {
  registry <- get_task_registry()
  registry[[task_id]] <- list(
    task_arn = task_arn,
    submitted_at = Sys.time()
  )
  invisible(NULL)
}

#' Get task ARN
#'
#' @keywords internal
get_task_arn <- function(task_id) {
  registry <- get_task_registry()
  if (exists(task_id, envir = registry)) {
    registry[[task_id]]$task_arn
  } else {
    NULL
  }
}

#' List all stored task ARNs
#'
#' @keywords internal
list_task_arns <- function() {
  registry <- get_task_registry()
  task_ids <- ls(registry)
  if (length(task_ids) == 0) {
    return(list())
  }

  result <- list()
  for (task_id in task_ids) {
    result[[task_id]] <- registry[[task_id]]
  }
  result
}

#' List active clusters
#'
#' @keywords internal
list_active_clusters <- function(region) {
  ecs <- get_ecs_client(region)

  tryCatch({
    # List all running tasks in the starburst cluster
    task_list <- ecs$list_tasks(
      cluster = "starburst-cluster",
      desiredStatus = "RUNNING"
    )

    if (length(task_list$taskArns) == 0) {
      return(list())
    }

    # Describe tasks to get details
    tasks <- ecs$describe_tasks(
      cluster = "starburst-cluster",
      tasks = task_list$taskArns
    )

    # Group by cluster ID from environment variables
    clusters <- list()

    for (task in tasks$tasks) {
      # Extract CLUSTER_ID from environment variables
      cluster_id <- NULL
      if (!is.null(task$overrides) && !is.null(task$overrides$containerOverrides)) {
        for (container in task$overrides$containerOverrides) {
          if (!is.null(container$environment)) {
            for (env_var in container$environment) {
              if (env_var$name == "CLUSTER_ID") {
                cluster_id <- env_var$value
                break
              }
            }
          }
          if (!is.null(cluster_id)) break
        }
      }

      if (!is.null(cluster_id)) {
        if (is.null(clusters[[cluster_id]])) {
          clusters[[cluster_id]] <- list(
            cluster_id = cluster_id,
            task_count = 0,
            tasks = list()
          )
        }

        clusters[[cluster_id]]$task_count <- clusters[[cluster_id]]$task_count + 1
        clusters[[cluster_id]]$tasks <- append(
          clusters[[cluster_id]]$tasks,
          list(list(
            task_arn = task$taskArn,
            started_at = task$startedAt,
            status = task$lastStatus
          ))
        )
      }
    }

    return(clusters)

  }, error = function(e) {
    # Return empty list if cluster doesn't exist or other error
    return(list())
  })
}

#' Get or create subnets
#'
#' @keywords internal
get_or_create_subnets <- function(vpc_id, region) {
  ec2 <- get_ec2_client(region)

  # Check for existing subnets with starburst tag
  subnets <- ec2$describe_subnets(
    Filters = list(
      list(Name = "vpc-id", Values = list(vpc_id)),
      list(Name = "tag:ManagedBy", Values = list("starburst"))
    )
  )

  if (length(subnets$Subnets) > 0) {
    return(vapply(subnets$Subnets, function(s) s$SubnetId, FUN.VALUE = character(1)))
  }

  # Get existing subnets to check if VPC has any
  all_subnets <- ec2$describe_subnets(
    Filters = list(
      list(Name = "vpc-id", Values = list(vpc_id))
    )
  )

  if (length(all_subnets$Subnets) > 0) {
    # Use existing subnets (don't create new ones unnecessarily)
    return(vapply(all_subnets$Subnets, function(s) s$SubnetId, FUN.VALUE = character(1)))
  }

  # Create subnets in multiple availability zones
  cat_info("   * Creating subnets for VPC...\n")

  # Get available AZs
  azs <- ec2$describe_availability_zones(
    Filters = list(
      list(Name = "region-name", Values = list(region)),
      list(Name = "state", Values = list("available"))
    )
  )

  if (length(azs$AvailabilityZones) == 0) {
    stop("No availability zones found in region")
  }

  # Create subnets in first 2-3 AZs
  num_subnets <- min(3, length(azs$AvailabilityZones))
  subnet_ids <- character(0)

  for (i in 1:num_subnets) {
    az <- azs$AvailabilityZones[[i]]$ZoneName
    cidr_block <- sprintf("10.0.%d.0/24", i)

    tryCatch({
      subnet <- ec2$create_subnet(
        VpcId = vpc_id,
        CidrBlock = cidr_block,
        AvailabilityZone = az
      )

      subnet_id <- subnet$Subnet$SubnetId

      # Tag subnet
      ec2$create_tags(
        Resources = list(subnet_id),
        Tags = list(
          list(Key = "Name", Value = sprintf("starburst-subnet-%d", i)),
          list(Key = "ManagedBy", Value = "starburst")
        )
      )

      # Enable auto-assign public IP
      ec2$modify_subnet_attribute(
        SubnetId = subnet_id,
        MapPublicIpOnLaunch = list(Value = TRUE)
      )

      subnet_ids <- c(subnet_ids, subnet_id)
      cat_info(sprintf("   * Created subnet: %s in %s\n", subnet_id, az))

    }, error = function(e) {
      cat_warn(sprintf("   * Failed to create subnet in %s: %s\n", az, e$message))
    })
  }

  if (length(subnet_ids) == 0) {
    stop("Failed to create any subnets")
  }

  return(subnet_ids)
}

#' Get or create security group
#'
#' @keywords internal
get_or_create_security_group <- function(vpc_id, region) {
  ec2 <- get_ec2_client(region)

  # Look for existing starburst security group
  sgs <- ec2$describe_security_groups(
    Filters = list(
      list(Name = "vpc-id", Values = list(vpc_id)),
      list(Name = "group-name", Values = list("starburst-worker"))
    )
  )

  if (length(sgs$SecurityGroups) > 0) {
    return(sgs$SecurityGroups[[1]]$GroupId)
  }

  # Create security group
  sg <- ec2$create_security_group(
    GroupName = "starburst-worker",
    Description = "Security group for staRburst workers",
    VpcId = vpc_id
  )

  sg$GroupId
}

#' Get VPC configuration for ECS tasks
#'
#' @keywords internal
get_vpc_config <- function(region) {
  ec2 <- get_ec2_client(region)

  # Get default VPC
  vpcs <- ec2$describe_vpcs(
    Filters = list(
      list(Name = "isDefault", Values = list("true"))
    )
  )

  if (length(vpcs$Vpcs) == 0) {
    stop("No default VPC found. Please create a VPC in region: ", region)
  }

  vpc_id <- vpcs$Vpcs[[1]]$VpcId

  # Get or create subnets
  subnets <- get_or_create_subnets(vpc_id, region)

  if (length(subnets) == 0) {
    stop("Failed to create subnets in VPC: ", vpc_id)
  }

  # Get or create security group
  sg_id <- get_or_create_security_group(vpc_id, region)

  list(
    vpc_id = vpc_id,
    subnets = as.list(subnets),
    security_groups = list(sg_id)
  )
}

Try the starburst package in your browser

Any scripts or data that you put into this service are public.

starburst documentation built on March 19, 2026, 5:08 p.m.