R/agent_runtime.R

Defines functions run_agent_runtime agent_runtime_deliver_final_text agent_runtime_make_blocked_result agent_runtime_append_provider_messages agent_runtime_generate_final_answer agent_runtime_build_final_answer agent_runtime_policy_decide agent_runtime_parse_policy_decision agent_runtime_policy_fallback agent_runtime_tool_results_have_error agent_runtime_tool_results_have_success agent_runtime_anthropic_content_without_text new_agent_runtime_thinking_markup_filter agent_runtime_pending_tag_suffix agent_runtime_emit_stream_event agent_runtime_has_tool_calls agent_runtime_tool_signature task_state_add_tool_results agent_runtime_tool_observation agent_runtime_goal_from_messages agent_runtime_text task_state_set_decision task_state_set_status new_agent_decision run_trace_add new_run_trace new_task_state

# Agent Runtime
#
# Internal task-state driven runtime for tool-using text generation.

aisdk_task_state_statuses <- c(
  "running",
  "continuing",
  "finalizing",
  "completed",
  "waiting_user",
  "blocked",
  "aborted_safety",
  "cancelled",
  "error"
)

aisdk_agent_decisions <- c(
  "continue",
  "finalize",
  "ask_user",
  "blocked",
  "abort_for_safety"
)

#' @keywords internal
new_task_state <- function(goal = NULL,
                           status = "running",
                           phase = "running",
                           observations = list(),
                           artifacts = list(),
                           failures = list(),
                           open_questions = character(0),
                           risk_gate = list(pending = FALSE, reason = NULL),
                           can_finalize = FALSE,
                           decision = NULL,
                           blocker = NULL,
                           last_tool_results = list(),
                           budget = list(),
                           run_id = NULL,
                           details = list()) {
  if (!status %in% aisdk_task_state_statuses) {
    rlang::abort(sprintf("Unknown task state status: %s", status))
  }

  structure(
    list(
      run_id = run_id %||% paste0("run_", generate_stable_id("task", Sys.time(), stats::runif(1))),
      status = status,
      phase = phase %||% status,
      goal = goal %||% "",
      observations = observations %||% list(),
      artifacts = artifacts %||% list(),
      failures = failures %||% list(),
      open_questions = open_questions %||% character(0),
      risk_gate = risk_gate %||% list(pending = FALSE, reason = NULL),
      can_finalize = isTRUE(can_finalize),
      decision = decision,
      blocker = blocker,
      last_tool_results = last_tool_results %||% list(),
      budget = budget %||% list(),
      details = details %||% list(),
      created_at = format(Sys.time(), "%Y-%m-%dT%H:%M:%OS3%z"),
      updated_at = format(Sys.time(), "%Y-%m-%dT%H:%M:%OS3%z")
    ),
    class = c("aisdk_task_state", "aisdk_run_state")
  )
}

#' @keywords internal
new_run_trace <- function(run_id = NULL) {
  structure(
    list(
      run_id = run_id %||% paste0("run_", generate_stable_id("trace", Sys.time(), stats::runif(1))),
      events = list()
    ),
    class = "aisdk_run_trace"
  )
}

#' @keywords internal
run_trace_add <- function(trace, type, payload = list()) {
  trace <- trace %||% new_run_trace()
  trace$events[[length(trace$events) + 1L]] <- list(
    event_id = paste0("evt_", generate_stable_id(type, Sys.time(), stats::runif(1))),
    timestamp = format(Sys.time(), "%Y-%m-%dT%H:%M:%OS3%z"),
    type = type,
    payload = payload %||% list()
  )
  trace
}

#' @keywords internal
new_agent_decision <- function(decision = "continue",
                               reason = NULL,
                               next_instruction = NULL,
                               needs_user_question = NULL,
                               final_answer_hint = NULL) {
  if (!decision %in% aisdk_agent_decisions) {
    rlang::abort(sprintf("Unknown agent decision: %s", decision))
  }
  structure(
    list(
      decision = decision,
      reason = reason %||% decision,
      next_instruction = next_instruction,
      needs_user_question = needs_user_question,
      final_answer_hint = final_answer_hint,
      timestamp = format(Sys.time(), "%Y-%m-%dT%H:%M:%OS3%z")
    ),
    class = "aisdk_agent_decision"
  )
}

#' @keywords internal
task_state_set_status <- function(task_state, status, phase = NULL, blocker = NULL) {
  if (!status %in% aisdk_task_state_statuses) {
    rlang::abort(sprintf("Unknown task state status: %s", status))
  }
  task_state$status <- status
  task_state$phase <- phase %||% status
  if (!is.null(blocker)) {
    task_state$blocker <- blocker
  }
  task_state$updated_at <- format(Sys.time(), "%Y-%m-%dT%H:%M:%OS3%z")
  task_state
}

#' @keywords internal
task_state_set_decision <- function(task_state, decision) {
  task_state$decision <- decision
  task_state$updated_at <- format(Sys.time(), "%Y-%m-%dT%H:%M:%OS3%z")
  task_state
}

#' @keywords internal
agent_runtime_text <- function(value, width = 800) {
  text <- tryCatch(
    {
      if (is.null(value)) {
        ""
      } else if (is.character(value)) {
        paste(value, collapse = "\n")
      } else {
        safe_to_json(value, auto_unbox = TRUE)
      }
    },
    error = function(e) as.character(value)[[1]] %||% ""
  )
  compact_text_preview(text, width = width)
}

#' @keywords internal
agent_runtime_goal_from_messages <- function(messages) {
  messages <- messages %||% list()
  user_messages <- Filter(function(msg) identical(msg$role %||% "", "user"), messages)
  if (length(user_messages) == 0) {
    return("")
  }
  content <- user_messages[[length(user_messages)]]$content %||% ""
  agent_runtime_text(content, width = 1000)
}

#' @keywords internal
agent_runtime_tool_observation <- function(tool_result) {
  result <- tool_result$result %||% tool_result$raw_result %||% NULL
  list(
    id = tool_result$id %||% NULL,
    name = tool_result$name %||% "unknown",
    status = if (isTRUE(tool_result$is_error)) "error" else "ok",
    is_error = isTRUE(tool_result$is_error),
    result = agent_runtime_text(result, width = 800)
  )
}

#' @keywords internal
task_state_add_tool_results <- function(task_state, tool_results) {
  tool_results <- tool_results %||% list()
  if (length(tool_results) == 0) {
    return(task_state)
  }

  observations <- lapply(tool_results, agent_runtime_tool_observation)
  task_state$observations <- c(task_state$observations %||% list(), observations)
  task_state$last_tool_results <- run_state_tool_results_tail(tool_results)

  failures <- Filter(function(obs) isTRUE(obs$is_error), observations)
  if (length(failures) > 0) {
    task_state$failures <- c(task_state$failures %||% list(), failures)
  }
  if (any(vapply(observations, function(obs) !isTRUE(obs$is_error), logical(1)))) {
    task_state$can_finalize <- TRUE
  }
  task_state$updated_at <- format(Sys.time(), "%Y-%m-%dT%H:%M:%OS3%z")
  task_state
}

#' @keywords internal
agent_runtime_tool_signature <- function(tool_calls) {
  if (is.null(tool_calls) || length(tool_calls) == 0) {
    return(NULL)
  }
  paste(
    vapply(tool_calls, function(tc) {
      paste0(tc$name %||% "unknown", ":", safe_to_json(tc$arguments %||% list(), auto_unbox = TRUE))
    }, character(1)),
    collapse = "|"
  )
}

#' @keywords internal
agent_runtime_has_tool_calls <- function(result) {
  length(result$tool_calls %||% list()) > 0
}

#' @keywords internal
agent_runtime_emit_stream_event <- function(callback,
                                            type,
                                            text = NULL,
                                            done = FALSE,
                                            step = NULL,
                                            metadata = list()) {
  if (!is.function(callback)) {
    return(invisible(NULL))
  }
  event <- c(
    list(
      type = type,
      text = text,
      done = isTRUE(done),
      step = step
    ),
    metadata %||% list()
  )
  callback(event)
  invisible(event)
}

#' @keywords internal
agent_runtime_pending_tag_suffix <- function(text, patterns) {
  if (!nzchar(text %||% "")) {
    return("")
  }

  best <- ""
  for (pattern in patterns) {
    max_len <- min(nchar(pattern) - 1L, nchar(text))
    if (max_len <= 0L) {
      next
    }
    for (len in seq.int(max_len, 1L)) {
      suffix <- substr(text, nchar(text) - len + 1L, nchar(text))
      prefix <- substr(pattern, 1L, len)
      if (identical(suffix, prefix) && len > nchar(best)) {
        best <- suffix
      }
    }
  }
  best
}

#' @keywords internal
new_agent_runtime_thinking_markup_filter <- function() {
  state <- new.env(parent = emptyenv())
  state$in_thinking <- FALSE
  state$pending <- ""

  process <- function(text, done = FALSE) {
    text <- paste0(state$pending, text %||% "")
    state$pending <- ""
    if (!nzchar(text)) {
      return(list(visible = "", thinking = ""))
    }

    if (!isTRUE(done)) {
      patterns <- c("<think>", "</think>")
      pending <- agent_runtime_pending_tag_suffix(text, patterns)
      if (nzchar(pending)) {
        state$pending <- pending
        text <- substr(text, 1L, nchar(text) - nchar(pending))
      }
    }

    visible <- character()
    thinking <- character()
    while (nzchar(text)) {
      if (isTRUE(state$in_thinking)) {
        close_pos <- regexpr("</think>", text, fixed = TRUE)[[1]]
        if (close_pos > 0L) {
          close_end <- close_pos + nchar("</think>") - 1L
          thinking <- c(thinking, substr(text, 1L, close_end))
          text <- substr(text, close_end + 1L, nchar(text))
          state$in_thinking <- FALSE
        } else {
          thinking <- c(thinking, text)
          text <- ""
        }
      } else {
        open_pos <- regexpr("<think>", text, fixed = TRUE)[[1]]
        if (open_pos > 0L) {
          if (open_pos > 1L) {
            visible <- c(visible, substr(text, 1L, open_pos - 1L))
          }
          text <- substr(text, open_pos, nchar(text))
          state$in_thinking <- TRUE
        } else {
          visible <- c(visible, text)
          text <- ""
        }
      }
    }

    list(
      visible = paste0(visible, collapse = ""),
      thinking = paste0(thinking, collapse = "")
    )
  }

  list(process = process)
}

#' @keywords internal
agent_runtime_anthropic_content_without_text <- function(content) {
  if (!is.list(content)) {
    return(content)
  }
  kept <- Filter(function(block) {
    !identical(block$type %||% NULL, "text")
  }, content)
  if (length(kept) == 0) {
    return(content)
  }
  kept
}

#' @keywords internal
agent_runtime_tool_results_have_success <- function(tool_results) {
  any(vapply(tool_results %||% list(), function(tr) !isTRUE(tr$is_error), logical(1)))
}

#' @keywords internal
agent_runtime_tool_results_have_error <- function(tool_results) {
  any(vapply(tool_results %||% list(), function(tr) isTRUE(tr$is_error), logical(1)))
}

#' @keywords internal
agent_runtime_policy_fallback <- function(reason,
                                          task_state,
                                          all_tool_results = list(),
                                          budget = list()) {
  if (isTRUE(task_state$risk_gate$pending)) {
    return(new_agent_decision(
      "ask_user",
      reason = task_state$risk_gate$reason %||% "risk_gate_pending",
      needs_user_question = task_state$risk_gate$question %||% "Please confirm how to proceed."
    ))
  }

  if (reason %in% c("repeated_identical_tool_calls", "network_error", "no_progress_budget_exhausted")) {
    return(new_agent_decision(
      "blocked",
      reason = reason,
      final_answer_hint = "Explain what was attempted and why the task cannot safely continue without a changed approach."
    ))
  }

  if (reason %in% c("empty_after_tools", "protocol_exhausted", "tool_result_budget_exhausted")) {
    if (length(all_tool_results %||% list()) > 0) {
      return(new_agent_decision(
        "finalize",
        reason = reason,
        final_answer_hint = "Summarize the tool evidence for the user."
      ))
    }
  }

  if (reason %in% c("window_boundary", "tool_result_errors", "empty_no_progress")) {
    total_steps <- budget$total_steps %||% 0L
    max_total_steps <- budget$max_total_steps %||% Inf
    if (is.finite(max_total_steps) && total_steps >= max_total_steps) {
      if (length(all_tool_results %||% list()) > 0) {
        return(new_agent_decision(
          "finalize",
          reason = "execution_budget_exhausted",
          final_answer_hint = "Give the best answer possible from the observations."
        ))
      }
      return(new_agent_decision("blocked", reason = "execution_budget_exhausted"))
    }
    return(new_agent_decision(
      "continue",
      reason = reason,
      next_instruction = "Open another execution window and continue the task."
    ))
  }

  if (length(all_tool_results %||% list()) > 0) {
    return(new_agent_decision(
      "finalize",
      reason = reason %||% "tool_evidence_available",
      final_answer_hint = "Use the available tool observations to answer."
    ))
  }

  new_agent_decision("continue", reason = reason %||% "default_continue")
}

#' @keywords internal
agent_runtime_parse_policy_decision <- function(text) {
  parsed <- safe_parse_json(text %||% "")
  if (is.null(parsed) || !is.list(parsed)) {
    return(NULL)
  }
  decision <- parsed$decision %||% NULL
  if (is.null(decision) || !decision %in% aisdk_agent_decisions) {
    return(NULL)
  }
  new_agent_decision(
    decision = decision,
    reason = parsed$reason %||% "llm_policy",
    next_instruction = parsed$next_instruction %||% NULL,
    needs_user_question = parsed$needs_user_question %||% NULL,
    final_answer_hint = parsed$final_answer_hint %||% NULL
  )
}

#' @keywords internal
agent_runtime_policy_decide <- function(reason,
                                        task_state,
                                        trace,
                                        model = NULL,
                                        messages = NULL,
                                        base_params = list(),
                                        all_tool_results = list(),
                                        budget = list(),
                                        policy_config = list()) {
  use_llm_policy <- isTRUE(policy_config$use_llm) ||
    isTRUE(getOption("aisdk.agent_runtime.llm_policy", FALSE))

  if (isTRUE(use_llm_policy) && !is.null(model)) {
    policy_messages <- list(
      list(
        role = "system",
        content = paste(
          "You are the hidden policy controller for an agent runtime.",
          "Return only JSON with keys: decision, reason, next_instruction, needs_user_question, final_answer_hint.",
          "decision must be one of: continue, finalize, ask_user, blocked, abort_for_safety.",
          sep = "\n"
        )
      ),
      list(
        role = "user",
        content = safe_to_json(list(
          reason = reason,
          task_state = task_state,
          recent_observations = utils::tail(task_state$observations %||% list(), 6),
          budget = budget
        ), auto_unbox = TRUE)
      )
    )
    policy_params <- base_params
    policy_params$tools <- NULL
    policy_params$messages <- policy_messages
    policy_result <- tryCatch(
      model$do_generate(policy_params),
      error = function(e) NULL
    )
    parsed <- agent_runtime_parse_policy_decision(policy_result$text %||% "")
    if (!is.null(parsed)) {
      return(parsed)
    }
    trace <- run_trace_add(trace, "policy_parse_failure", list(
      reason = reason,
      text = agent_runtime_text(policy_result$text %||% "", width = 500)
    ))
  }

  agent_runtime_policy_fallback(
    reason = reason,
    task_state = task_state,
    all_tool_results = all_tool_results,
    budget = budget
  )
}

#' @keywords internal
agent_runtime_build_final_answer <- function(task_state,
                                             all_tool_results = list(),
                                             blocked = FALSE) {
  observations <- task_state$observations %||% list()
  if (length(observations) == 0) {
    if (isTRUE(blocked)) {
      return("I could not complete the task because the run is blocked and no usable tool observations were produced.")
    }
    return("I completed the run, but there were no tool observations to summarize.")
  }

  tail_obs <- utils::tail(observations, 6)
  lines <- vapply(tail_obs, function(obs) {
    status <- if (isTRUE(obs$is_error)) "error" else "ok"
    sprintf("- %s [%s]: %s", obs$name %||% "unknown", status, obs$result %||% "")
  }, character(1))

  if (isTRUE(blocked)) {
    intro <- "I could not safely continue the task. Here is the latest evidence:"
  } else if (agent_runtime_tool_results_have_error(all_tool_results) &&
             !agent_runtime_tool_results_have_success(all_tool_results)) {
    intro <- "The tool work did not complete successfully. Here is the latest evidence:"
  } else {
    intro <- "I completed the tool work. Here is the result:"
  }

  paste(c(intro, lines), collapse = "\n")
}

#' @keywords internal
agent_runtime_generate_final_answer <- function(model,
                                                messages,
                                                base_params = list(),
                                                task_state,
                                                all_tool_results = list(),
                                                blocked = FALSE,
                                                reason = "finalize") {
  fallback <- agent_runtime_build_final_answer(
    task_state = task_state,
    all_tool_results = all_tool_results,
    blocked = blocked
  )

  if (is.null(model) || length(all_tool_results %||% list()) == 0) {
    return(fallback)
  }

  finalizer_params <- base_params %||% list()
  finalizer_params$tools <- NULL
  finalizer_params$messages <- c(
    messages %||% list(),
    list(list(
      role = "user",
      content = paste(
        "The agent executed tools but did not produce a visible final answer.",
        "Write the final user-visible response now.",
        "Do not call tools. Do not mention hidden policy or runtime internals.",
        "Use the tool observations as evidence. Mention concrete files, paths, results, errors, and next steps when relevant.",
        "If the task is blocked, explain what was attempted and what input or state change is required.",
        "",
        paste0("Blocked: ", if (isTRUE(blocked)) "true" else "false"),
        paste0("Reason: ", reason %||% "finalize"),
        paste0("User goal: ", task_state$goal %||% ""),
        paste0("Tool observations: ", safe_to_json(task_state$observations %||% list(), auto_unbox = TRUE)),
        sep = "\n"
      )
    ))
  )

  finalizer_result <- tryCatch(
    model$do_generate(finalizer_params),
    error = function(e) NULL
  )
  if (is.null(finalizer_result)) {
    return(fallback)
  }

  finalizer_result <- recover_text_final_answer(finalizer_result)
  text <- trimws(finalizer_result$text %||% "")
  if (!nzchar(text)) {
    return(fallback)
  }

  text
}

#' @keywords internal
agent_runtime_append_provider_messages <- function(messages,
                                                   model,
                                                   result,
                                                   tool_results,
                                                   require_post_tool_protocol = FALSE,
                                                   use_text_tool_fallback = FALSE) {
  if (isTRUE(use_text_tool_fallback)) {
    return(list(
      messages = append_text_tool_result_messages(messages, result, tool_results),
      awaiting_post_tool_protocol = TRUE
    ))
  }

  has_tool_calls <- agent_runtime_has_tool_calls(result)
  assistant_message <- list(
    role = "assistant",
    content = if (isTRUE(has_tool_calls)) "" else result$text %||% ""
  )
  history_format <- model$get_history_format()

  if (identical(history_format, "openai")) {
    assistant_message$tool_calls <- lapply(result$tool_calls, function(tc) {
      list(
        id = tc$id,
        type = "function",
        `function` = list(
          name = tc$name,
          arguments = safe_to_json(tc$arguments, auto_unbox = TRUE)
        )
      )
    })
    if (isTRUE(model$capabilities$preserve_reasoning_content) &&
        !is.null(result$reasoning) &&
        nzchar(result$reasoning)) {
      assistant_message$reasoning_content <- result$reasoning
    }
  } else if (identical(history_format, "anthropic")) {
    assistant_message$content <- if (isTRUE(has_tool_calls)) {
      agent_runtime_anthropic_content_without_text(result$raw_response$content)
    } else {
      result$raw_response$content
    }
  }

  messages <- c(messages, list(assistant_message))
  for (tr in tool_results) {
    messages <- c(messages, list(model$format_tool_result(tr$id, tr$name, tr$result)))
  }

  awaiting <- FALSE
  if (isTRUE(require_post_tool_protocol)) {
    messages <- append_post_tool_protocol_message(
      messages,
      use_text_tool_fallback = FALSE
    )
    awaiting <- TRUE
  }

  list(messages = messages, awaiting_post_tool_protocol = awaiting)
}

#' @keywords internal
agent_runtime_make_blocked_result <- function(message,
                                              reason = "blocked",
                                              run_id = NULL) {
  GenerateResult$new(
    text = message %||% "",
    finish_reason = "blocked",
    warnings = reason
  )
}

#' @keywords internal
agent_runtime_deliver_final_text <- function(text,
                                             stream = FALSE,
                                             callback = NULL,
                                             renderer = NULL) {
  if (!isTRUE(stream) || !nzchar(text %||% "")) {
    return(invisible(NULL))
  }
  if (interactive() && !is.null(renderer) && is.null(callback)) {
    renderer$process_chunk(text, FALSE)
    renderer$process_chunk(NULL, TRUE)
  }
  if (!is.null(callback)) {
    callback(text, FALSE)
    callback(NULL, TRUE)
  }
  invisible(NULL)
}

#' @keywords internal
run_agent_runtime <- function(model,
                              messages,
                              base_params = list(),
                              tools = NULL,
                              session = NULL,
                              hooks = NULL,
                              stream = FALSE,
                              callback = NULL,
                              renderer = NULL,
                              run_id = NULL,
                              max_steps = 1,
                              max_tool_result_errors = 2,
                              require_post_tool_protocol = FALSE,
                              use_text_tool_fallback = FALSE,
                              initial_messages_len = length(messages),
                              stream_event_callback = NULL,
                              policy_config = list()) {
  run_id <- run_id %||% paste0("run_", generate_stable_id("agent_runtime", Sys.time(), stats::runif(1)))
  raw_window_size <- max_steps %||% 1L
  window_size <- if (is.numeric(raw_window_size) && length(raw_window_size) == 1L && is.finite(raw_window_size)) {
    max(1L, as.integer(raw_window_size))
  } else {
    20L
  }
  raw_max_total_steps <- policy_config$max_total_steps %||% max(20L, window_size * 8L)
  max_total_steps <- if (is.numeric(raw_max_total_steps) &&
                         length(raw_max_total_steps) == 1L &&
                         is.finite(raw_max_total_steps)) {
    max(1L, as.integer(raw_max_total_steps))
  } else {
    Inf
  }
  repeated_call_limit <- as.integer(policy_config$max_identical_tool_calls %||% 3L)

  task_state <- new_task_state(
    goal = agent_runtime_goal_from_messages(messages),
    status = "running",
    phase = "model_call",
    run_id = run_id,
    budget = list(
      window_steps = window_size,
      max_total_steps = max_total_steps,
      total_steps = 0L,
      execution_windows = 1L,
      max_tool_result_errors = max_tool_result_errors
    )
  )
  trace <- new_run_trace(run_id = run_id)
  decision <- new_agent_decision("continue", reason = "start")

  all_tool_calls <- list()
  all_tool_results <- list()
  stream_events <- list()
  result <- NULL
  step <- 0L
  window_step <- 0L
  execution_windows <- 1L
  awaiting_post_tool_protocol <- FALSE
  last_tool_signature <- NULL
  repeated_identical_calls <- 0L

  record_stream_event <- function(type,
                                  text = NULL,
                                  done = FALSE,
                                  metadata = list()) {
    event <- c(
      list(
        type = type,
        text = text,
        done = isTRUE(done),
        step = step
      ),
      metadata %||% list()
    )
    stream_events[[length(stream_events) + 1L]] <<- event
    agent_runtime_emit_stream_event(
      stream_event_callback,
      type = type,
      text = text,
      done = done,
      step = step,
      metadata = metadata
    )
    invisible(event)
  }

  finalize_result <- function(final_text, reason = "finalize", blocked = FALSE) {
    if (is.null(result)) {
      result <<- GenerateResult$new()
    }
    result$text <<- final_text %||% ""
    result$tool_calls <<- NULL
    result$finish_reason <<- if (isTRUE(blocked)) "blocked" else "stop"
    task_state <<- task_state_set_status(
      task_state,
      status = if (isTRUE(blocked)) "blocked" else "completed",
      phase = if (isTRUE(blocked)) "blocked" else "completed",
      blocker = if (isTRUE(blocked)) reason else NULL
    )
    task_state$can_finalize <<- TRUE
    trace <<- run_trace_add(trace, "finalizer_output", list(
      reason = reason,
      blocked = isTRUE(blocked),
      text = agent_runtime_text(final_text, width = 1000)
    ))
    if (isTRUE(stream) && is.function(stream_event_callback)) {
      record_stream_event(
        "final_text",
        text = final_text,
        metadata = list(reason = reason, blocked = isTRUE(blocked))
      )
      record_stream_event("done", done = TRUE)
    } else {
      agent_runtime_deliver_final_text(
        final_text,
        stream = stream,
        callback = callback,
        renderer = renderer
      )
    }
  }

  final_text_from_state <- function(blocked = FALSE, reason = "finalize") {
    finalizer_messages <- utils::head(messages %||% list(), initial_messages_len)
    agent_runtime_generate_final_answer(
      model = model,
      messages = finalizer_messages,
      base_params = base_params,
      task_state = task_state,
      all_tool_results = all_tool_results,
      blocked = blocked,
      reason = reason
    )
  }

  tryCatch(
    {
      repeat {
        if (window_step >= window_size) {
          budget <- list(
            total_steps = step,
            max_total_steps = max_total_steps,
            window_steps = window_size,
            execution_windows = execution_windows
          )
          decision <- agent_runtime_policy_decide(
            reason = "window_boundary",
            task_state = task_state,
            trace = trace,
            model = model,
            messages = messages,
            base_params = base_params,
            all_tool_results = all_tool_results,
            budget = budget,
            policy_config = policy_config
          )
          task_state <- task_state_set_decision(task_state, decision)
          trace <- run_trace_add(trace, "policy_decision", list(
            boundary = "window",
            decision = decision
          ))

          if (identical(decision$decision, "continue")) {
            execution_windows <- execution_windows + 1L
            window_step <- 0L
            task_state <- task_state_set_status(task_state, "continuing", phase = "execution_window")
          } else if (identical(decision$decision, "finalize")) {
            finalize_result(
              final_text_from_state(reason = decision$reason),
              reason = decision$reason
            )
            break
          } else if (identical(decision$decision, "ask_user")) {
            final_text <- decision$needs_user_question %||% "I need your input before I can safely continue."
            finalize_result(final_text, reason = decision$reason)
            task_state <- task_state_set_status(task_state, "waiting_user", phase = "waiting_user")
            result$finish_reason <- "waiting_user"
            break
          } else {
            finalize_result(
              final_text_from_state(blocked = TRUE, reason = decision$reason),
              reason = decision$reason,
              blocked = TRUE
            )
            break
          }
        }

        if (step >= max_total_steps) {
          reason <- if (length(all_tool_results) > 0) "tool_result_budget_exhausted" else "no_progress_budget_exhausted"
          decision <- agent_runtime_policy_decide(
            reason = reason,
            task_state = task_state,
            trace = trace,
            model = model,
            messages = messages,
            base_params = base_params,
            all_tool_results = all_tool_results,
            budget = list(total_steps = step, max_total_steps = max_total_steps),
            policy_config = policy_config
          )
          task_state <- task_state_set_decision(task_state, decision)
          trace <- run_trace_add(trace, "policy_decision", list(
            boundary = "total_budget",
            decision = decision
          ))
          blocked <- identical(decision$decision, "blocked")
          finalize_result(
            final_text_from_state(blocked = blocked, reason = decision$reason),
            reason = decision$reason,
            blocked = blocked
          )
          break
        }

        step <- step + 1L
        window_step <- window_step + 1L
        task_state$budget$total_steps <- step
        task_state$budget$execution_windows <- execution_windows
        task_state <- task_state_set_status(task_state, if (step == 1L) "running" else "continuing", phase = "model_call")

        params <- c(list(messages = messages), base_params)
        filter_protocol_output <- isTRUE(stream) &&
          isTRUE(require_post_tool_protocol) &&
          isTRUE(awaiting_post_tool_protocol)
        protocol_markup_filter <- if (isTRUE(filter_protocol_output)) {
          new_tool_protocol_markup_filter()
        } else {
          NULL
        }

        trace <- run_trace_add(trace, "model_call", list(step = step, window = execution_windows))
        if (isTRUE(stream)) {
          step_stream_chunks <- character()
          step_stream_has_visible_text <- FALSE
          thinking_markup_filter <- if (is.function(stream_event_callback)) {
            new_agent_runtime_thinking_markup_filter()
          } else {
            NULL
          }
          if (interactive() && !is.null(renderer)) {
            renderer$start_thinking()
          }
          result <- model$do_stream(params, function(chunk, done) {
            display_chunk <- chunk
            if (!is.null(protocol_markup_filter)) {
              display_chunk <- protocol_markup_filter$process(chunk, done)
            }

            if (is.function(stream_event_callback)) {
              split_chunk <- thinking_markup_filter$process(display_chunk, done = done)
              if (nzchar(split_chunk$thinking %||% "")) {
                record_stream_event(
                  "thinking_text",
                  text = split_chunk$thinking,
                  metadata = list(reason = "provider_reasoning")
                )
              }
              if (nzchar(split_chunk$visible %||% "")) {
                visible_has_content <- nzchar(trimws(split_chunk$visible))
                if (isTRUE(visible_has_content) || isTRUE(step_stream_has_visible_text)) {
                  step_stream_chunks <<- c(step_stream_chunks, split_chunk$visible)
                  if (isTRUE(visible_has_content)) {
                    step_stream_has_visible_text <<- TRUE
                  }
                  record_stream_event(
                    "text_delta",
                    text = split_chunk$visible,
                    metadata = list(reason = "assistant_text")
                  )
                }
              }
              if (interactive() && !is.null(renderer)) {
                renderer$stop_thinking()
              }
            } else if (interactive() && !is.null(renderer)) {
              if (!is.null(callback)) {
                renderer$stop_thinking()
              } else {
                renderer$process_chunk(display_chunk, done)
              }
            }
            if (!is.function(stream_event_callback) && !is.null(callback)) {
              callback(display_chunk, done)
            }
          })
          if (is.function(stream_event_callback) &&
              !nzchar(result$text %||% "") &&
              length(step_stream_chunks) > 0) {
            result$text <- paste(step_stream_chunks, collapse = "")
          }
        } else {
          result <- model$do_generate(params)
        }

        result <- recover_text_tool_calls(result)
        result <- recover_text_final_answer(result)
        trace <- run_trace_add(trace, "model_response", list(
          step = step,
          finish_reason = result$finish_reason %||% NULL,
          text = agent_runtime_text(result$text %||% "", width = 800),
          tool_call_count = length(result$tool_calls %||% list())
        ))

        if (text_tool_protocol_missing(result, awaiting_post_tool_protocol)) {
          if (isTRUE(stream) &&
              is.function(stream_event_callback) &&
              nzchar(result$text %||% "")) {
            record_stream_event(
              "intermediate_text",
              text = result$text,
              metadata = list(reason = "protocol_correction")
            )
          }
          messages <- c(messages, list(text_tool_protocol_correction_message(
            result,
            use_text_tool_fallback = use_text_tool_fallback
          )))
          trace <- run_trace_add(trace, "protocol_correction", list(step = step))
          if (interactive() && isTRUE(stream) && !is.null(renderer)) {
            renderer$reset_for_new_step()
          }
          next
        }

        awaiting_post_tool_protocol <- FALSE

        if (agent_runtime_has_tool_calls(result) && length(tools %||% list()) > 0) {
          if (isTRUE(stream) &&
              is.function(stream_event_callback) &&
              nzchar(result$text %||% "")) {
            record_stream_event(
              "intermediate_text",
              text = result$text,
              metadata = list(reason = "tool_call")
            )
          }
          all_tool_calls <- c(all_tool_calls, result$tool_calls)
          task_state <- task_state_set_status(task_state, "running", phase = "tool_execution")
          task_state$budget$tool_calls <- length(all_tool_calls)

          current_signature <- agent_runtime_tool_signature(result$tool_calls)
          if (identical(current_signature, last_tool_signature)) {
            repeated_identical_calls <- repeated_identical_calls + 1L
          } else {
            repeated_identical_calls <- 0L
            last_tool_signature <- current_signature
          }

          if (repeated_identical_calls >= repeated_call_limit) {
            decision <- agent_runtime_policy_decide(
              reason = "repeated_identical_tool_calls",
              task_state = task_state,
              trace = trace,
              model = model,
              messages = messages,
              base_params = base_params,
              all_tool_results = all_tool_results,
              budget = list(total_steps = step, max_total_steps = max_total_steps),
              policy_config = policy_config
            )
            task_state <- task_state_set_decision(task_state, decision)
            trace <- run_trace_add(trace, "policy_decision", list(
              boundary = "repeated_tool_call",
              decision = decision
            ))
            finalize_result(
              final_text_from_state(blocked = TRUE, reason = decision$reason),
              reason = decision$reason,
              blocked = TRUE
            )
            break
          }

          tool_envir <- if (!is.null(session)) session$get_envir() else NULL
          if (interactive()) {
            for (tc in result$tool_calls) {
              if (isTRUE(stream) && !is.null(renderer)) {
                renderer$render_tool_start(tc$name, tc$arguments)
              } else {
                print_tool_execution(tc$name, tc$arguments)
              }
            }
          }

          tool_results <- tryCatch(
            execute_tool_calls(result$tool_calls, tools, hooks, envir = tool_envir),
            error = function(e) {
              lapply(result$tool_calls, function(tc) {
                list(
                  id = tc$id,
                  name = tc$name,
                  result = paste0("Error executing tool: ", conditionMessage(e)),
                  raw_result = NULL,
                  is_error = TRUE
                )
              })
            }
          )
          all_tool_results <- c(all_tool_results, tool_results)
          task_state <- task_state_add_tool_results(task_state, tool_results)
          trace <- run_trace_add(trace, "tool_results", list(
            step = step,
            results = lapply(tool_results, agent_runtime_tool_observation)
          ))

          if (agent_runtime_tool_results_have_error(tool_results)) {
            decision <- agent_runtime_policy_decide(
              reason = "tool_result_errors",
              task_state = task_state,
              trace = trace,
              model = model,
              messages = messages,
              base_params = base_params,
              all_tool_results = all_tool_results,
              budget = list(total_steps = step, max_total_steps = max_total_steps),
              policy_config = policy_config
            )
            task_state <- task_state_set_decision(task_state, decision)
            trace <- run_trace_add(trace, "policy_decision", list(
              boundary = "tool_result_errors",
              decision = decision
            ))
            if (identical(decision$decision, "ask_user")) {
              finalize_result(
                decision$needs_user_question %||% agent_runtime_build_final_answer(task_state, all_tool_results),
                reason = decision$reason
              )
              task_state <- task_state_set_status(task_state, "waiting_user", phase = "waiting_user")
              result$finish_reason <- "waiting_user"
              break
            } else if (identical(decision$decision, "blocked") ||
                       identical(decision$decision, "abort_for_safety")) {
              finalize_result(
                final_text_from_state(blocked = TRUE, reason = decision$reason),
                reason = decision$reason,
                blocked = TRUE
              )
              if (identical(decision$decision, "abort_for_safety")) {
                task_state <- task_state_set_status(task_state, "aborted_safety", phase = "aborted_safety")
                result$finish_reason <- "aborted_safety"
              }
              break
            }
          }

          if (interactive()) {
            for (tr in tool_results) {
              if (isTRUE(stream) && !is.null(renderer)) {
                renderer$render_tool_result(
                  tr$name,
                  tr$result,
                  success = !isTRUE(tr$is_error),
                  raw_result = tr$raw_result %||% tr$result
                )
              } else {
                print_tool_result(
                  tr$name,
                  tr$result,
                  success = !isTRUE(tr$is_error),
                  raw_result = tr$raw_result %||% tr$result
                )
              }
            }
          }

          appended <- agent_runtime_append_provider_messages(
            messages = messages,
            model = model,
            result = result,
            tool_results = tool_results,
            require_post_tool_protocol = require_post_tool_protocol,
            use_text_tool_fallback = use_text_tool_fallback
          )
          messages <- appended$messages
          awaiting_post_tool_protocol <- appended$awaiting_post_tool_protocol

          if (interactive() && isTRUE(stream) && !is.null(renderer)) {
            renderer$reset_for_new_step()
          }
          next
        }

        text <- trimws(result$text %||% "")
        if (!nzchar(text)) {
          empty_reason <- if (length(all_tool_results) > 0) {
            "empty_after_tools"
          } else {
            "empty_no_progress"
          }

          if (identical(empty_reason, "empty_after_tools")) {
            decision <- new_agent_decision(
              "finalize",
              reason = empty_reason,
              final_answer_hint = "Summarize the tool evidence for the user."
            )
            task_state <- task_state_set_decision(task_state, decision)
            trace <- run_trace_add(trace, "policy_decision", list(
              boundary = empty_reason,
              decision = decision
            ))
            finalize_result(
              final_text_from_state(reason = decision$reason),
              reason = decision$reason
            )
            break
          }

          decision <- agent_runtime_policy_decide(
            reason = empty_reason,
            task_state = task_state,
            trace = trace,
            model = model,
            messages = messages,
            base_params = base_params,
            all_tool_results = all_tool_results,
            budget = list(total_steps = step, max_total_steps = max_total_steps),
            policy_config = policy_config
          )
          task_state <- task_state_set_decision(task_state, decision)
          trace <- run_trace_add(trace, "policy_decision", list(
            boundary = empty_reason,
            decision = decision
          ))

          if (identical(decision$decision, "continue")) {
            correction_text <- if (length(all_tool_results) > 0) {
              paste(
                "You executed tools but produced no visible answer.",
                "Continue with the next necessary tool call or provide the final answer now.",
                "Do not return an empty response.",
                sep = "\n"
              )
            } else {
              paste(
                "Your previous response produced no visible answer and made no tool call.",
                "Continue the user's task now: either use the appropriate tool or provide a visible answer.",
                "Do not return only reasoning or an empty response.",
                sep = "\n"
              )
            }
            messages <- c(messages, list(list(
              role = "user",
              content = correction_text
            )))
            if (interactive() && isTRUE(stream) && !is.null(renderer)) {
              renderer$reset_for_new_step()
            }
            next
          }

          blocked <- identical(decision$decision, "blocked")
          finalize_result(
            final_text_from_state(blocked = blocked, reason = decision$reason),
            reason = decision$reason,
            blocked = blocked
          )
          break
        }

        task_state <- task_state_set_status(task_state, "completed", phase = "completed")
        task_state$can_finalize <- TRUE
        decision <- new_agent_decision("finalize", reason = "completed")
        task_state <- task_state_set_decision(task_state, decision)
        if (isTRUE(stream) && is.function(stream_event_callback)) {
          final_already_streamed <- length(step_stream_chunks) > 0 &&
            identical(paste(step_stream_chunks, collapse = ""), result$text %||% "")
          record_stream_event(
            "final_text",
            text = result$text %||% "",
            metadata = list(
              reason = "completed",
              blocked = FALSE,
              already_streamed = isTRUE(final_already_streamed)
            )
          )
          record_stream_event("done", done = TRUE)
        }
        break
      }
    },
    error = function(e) {
      if (is_network_error_condition(e)) {
        handle_network_error(e, rethrow = FALSE)
        decision <<- new_agent_decision("blocked", reason = "network_error")
        task_state <<- task_state_set_decision(task_state, decision)
        task_state <<- task_state_set_status(
          task_state,
          "blocked",
          phase = "blocked",
          blocker = conditionMessage(e)
        )
        result <<- agent_runtime_make_blocked_result(
          message = "",
          reason = conditionMessage(e),
          run_id = run_id
        )
        trace <<- run_trace_add(trace, "network_error", list(message = conditionMessage(e)))
      } else {
        stop(e)
      }
    }
  )

  if (is.null(result)) {
    result <- GenerateResult$new(text = "", finish_reason = "error")
    task_state <- task_state_set_status(task_state, "error", phase = "error", blocker = "runtime_returned_no_result")
    decision <- new_agent_decision("blocked", reason = "runtime_returned_no_result")
    task_state <- task_state_set_decision(task_state, decision)
  }

  result$steps <- step
  result$all_tool_calls <- all_tool_calls
  result$all_tool_results <- all_tool_results
  result$stream_events <- stream_events
  result$messages_added <- build_messages_added(
    messages = messages,
    initial_len = initial_messages_len,
    final_text = result$text %||% NULL,
    final_reasoning = result$reasoning %||% NULL
  )

  task_state$budget$total_steps <- step
  task_state$budget$execution_windows <- execution_windows
  task_state$budget$tool_calls <- length(all_tool_calls)
  task_state$last_tool_results <- run_state_tool_results_tail(all_tool_results)
  task_state$updated_at <- format(Sys.time(), "%Y-%m-%dT%H:%M:%OS3%z")

  result$task_state <- task_state
  result$run_state <- task_state
  result$run_trace <- trace
  result$decision <- decision
  result
}

Try the aisdk package in your browser

Any scripts or data that you put into this service are public.

aisdk documentation built on May 29, 2026, 9:07 a.m.