R/sse_aggregator.R

Defines functions map_anthropic_chunk map_openai_chunk is_empty_args_internal

Documented in map_anthropic_chunk map_openai_chunk

#' @title SSE Stream Aggregator
#' @description
#' Universal chunk aggregator for Server-Sent Events (SSE) streaming.
#' Manages all chunk-level state: text accumulation, reasoning/thinking
#' transitions, tool call assembly, usage tracking, and result finalization.
#'
#' This is a pure aggregator; it does not know about HTTP, SSE parsing,
#' or provider-specific event types. Provider event mappers call its methods.
#'
#' @name sse_aggregator
#' @keywords internal
NULL

#' @title SSEAggregator R6 Class
#' @description
#' Accumulates streaming chunks into a final GenerateResult.
#'
#' Handles two tool call formats:
#' - **OpenAI format**: Chunked deltas with index, id, function.name, function.arguments
#' - **Anthropic format**: content_block_start with id+name, then input_json_delta chunks
#'
#' @keywords internal
SSEAggregator <- R6::R6Class(
    "SSEAggregator",
    public = list(
        #' @description Initialize the aggregator.
        #' @param callback User callback function: callback(text, done).
        initialize = function(callback) {
            private$callback <- callback
            private$full_text <- ""
            private$full_reasoning <- ""
            private$is_reasoning <- FALSE
            private$inline_thinking_buffer <- ""
            private$inline_thinking_active <- FALSE
            private$tool_calls_acc <- list()
            private$tool_args_acc <- list()
            private$finish_reason <- NULL
            private$full_usage <- NULL
            private$last_response <- NULL
        },

        # ------------------------------------------------------------------
        # Content methods
        # ------------------------------------------------------------------

        #' @description Handle a text content delta.
        #' @param text The text chunk.
        on_text_delta = function(text) {
            if (is.null(text) || nchar(text) == 0) {
                return(invisible())
            }
            private$process_inline_thinking(text, done = FALSE)
        },

        #' @description Handle a reasoning/thinking content delta.
        #' @param text The reasoning text chunk.
        on_reasoning_delta = function(text) {
            if (is.null(text) || nchar(text) == 0) {
                return(invisible())
            }
            private$emit_reasoning_delta(text)
        },

        #' @description Signal the start of a reasoning block (Anthropic thinking).
        on_reasoning_start = function() {
            if (!private$is_reasoning) {
                private$callback("<think>\n", done = FALSE)
                private$is_reasoning <- TRUE
            }
        },

        #' @description Signal content block stop (closes reasoning if open).
        on_block_stop = function() {
            if (private$is_reasoning) {
                private$callback("\n</think>\n\n", done = FALSE)
                private$is_reasoning <- FALSE
            }
        },

        # ------------------------------------------------------------------
        # Tool call methods — OpenAI format (chunked deltas)
        # ------------------------------------------------------------------

        #' @description Handle OpenAI-format tool call deltas.
        #' @param tool_calls List of tool call delta objects from the choices delta.
        on_tool_call_delta = function(tool_calls) {
            if (is.null(tool_calls)) {
                return(invisible())
            }
            for (tc in tool_calls) {
                idx <- if (!is.null(tc$index)) tc$index + 1 else length(private$tool_calls_acc) + 1
                private$tool_calls_acc <- private$ensure_index(
                    private$tool_calls_acc, idx,
                    list(id = "", name = "", arguments = "", arguments_is_list = FALSE)
                )

                if (!is.null(tc$id)) {
                    private$tool_calls_acc[[idx]]$id <- paste0(private$tool_calls_acc[[idx]]$id, tc$id)
                }

                # Extract name from multiple possible locations
                name_val <- NULL
                if (!is.null(tc$`function`$name)) name_val <- tc$`function`$name
                if (is.null(name_val) && !is.null(tc$name)) name_val <- tc$name
                if (is.null(name_val) && !is.null(tc$tool_name)) name_val <- tc$tool_name
                if (!is.null(name_val)) {
                    private$tool_calls_acc[[idx]]$name <- paste0(private$tool_calls_acc[[idx]]$name, name_val)
                }

                # Extract arguments from multiple possible locations
                args_val <- NULL
                if (!is.null(tc$`function`$arguments)) args_val <- tc$`function`$arguments
                if (is.null(args_val) && !is.null(tc$arguments)) args_val <- tc$arguments
                if (!is.null(args_val)) {
                    if (is.list(args_val) && !is.character(args_val)) {
                        private$tool_calls_acc[[idx]]$arguments <- args_val
                        private$tool_calls_acc[[idx]]$arguments_is_list <- TRUE
                    } else {
                        private$tool_calls_acc[[idx]]$arguments <- paste0(
                            private$tool_calls_acc[[idx]]$arguments, args_val
                        )
                    }
                }
            }
        },

        # ------------------------------------------------------------------
        # Tool call methods — Anthropic format (block start + input deltas)
        # ------------------------------------------------------------------

        #' @description Handle Anthropic-format tool use block start.
        #' @param index Block index (0-based from API, converted to 1-based internally).
        #' @param id Tool call ID.
        #' @param name Tool name.
        #' @param input Initial input (usually NULL or empty).
        on_tool_start = function(index, id, name, input = NULL) {
            idx <- index + 1
            private$tool_calls_acc <- private$ensure_index(private$tool_calls_acc, idx, NULL)
            private$tool_calls_acc[[idx]] <- list(
                id = id %||% "",
                name = name %||% "",
                arguments = input
            )
            private$tool_args_acc <- private$ensure_index(private$tool_args_acc, idx, "")
        },

        #' @description Handle Anthropic-format input_json_delta.
        #' @param index Block index (0-based from API).
        #' @param partial_json Partial JSON string.
        on_tool_input_delta = function(index, partial_json) {
            idx <- index + 1
            private$tool_args_acc <- private$ensure_index(private$tool_args_acc, idx, "")
            private$tool_args_acc[[idx]] <- paste0(private$tool_args_acc[[idx]], partial_json)
        },

        # ------------------------------------------------------------------
        # Metadata methods
        # ------------------------------------------------------------------

        #' @description Store finish reason.
        #' @param reason The finish reason string.
        on_finish_reason = function(reason) {
            if (!is.null(reason)) {
                private$finish_reason <- reason
            }
        },

        #' @description Store usage information.
        #' @param usage Usage list.
        on_usage = function(usage) {
            if (!is.null(usage)) {
                private$full_usage <- usage
            }
        },

        #' @description Store last raw response for diagnostics.
        #' @param response The raw response data.
        on_raw_response = function(response) {
            private$last_response <- response
        },

        #' @description Signal stream completion.
        on_done = function() {
            private$process_inline_thinking("", done = TRUE)
            if (private$is_reasoning) {
                private$callback("\n</think>\n\n", done = FALSE)
                private$is_reasoning <- FALSE
            }
            private$callback(NULL, done = TRUE)
        },

        # ------------------------------------------------------------------
        # Finalization
        # ------------------------------------------------------------------

        #' @description Finalize accumulated state into a GenerateResult.
        #' @return A GenerateResult object.
        build_result = function() {
            final_tool_calls <- private$finalize_tool_calls()

            GenerateResult$new(
                text = private$full_text,
                usage = private$full_usage,
                finish_reason = private$finish_reason,
                raw_response = private$last_response,
                tool_calls = final_tool_calls,
                reasoning = private$full_reasoning
            )
        }
    ),
    private = list(
        callback = NULL,
        full_text = "",
        full_reasoning = "",
        is_reasoning = FALSE,
        inline_thinking_buffer = "",
        inline_thinking_active = FALSE,
        tool_calls_acc = list(),
        tool_args_acc = list(),
        finish_reason = NULL,
        full_usage = NULL,
        last_response = NULL,
        emit_text_delta = function(text) {
            if (is.null(text) || nchar(text) == 0) {
                return(invisible())
            }
            # Close reasoning block if transitioning
            if (private$is_reasoning) {
                private$callback("\n</think>\n\n", done = FALSE)
                private$is_reasoning <- FALSE
            }
            private$full_text <- paste0(private$full_text, text)
            private$callback(text, done = FALSE)
        },
        emit_reasoning_delta = function(text) {
            if (is.null(text) || nchar(text) == 0) {
                return(invisible())
            }
            if (!private$is_reasoning) {
                private$callback("<think>\n", done = FALSE)
                private$is_reasoning <- TRUE
            }
            private$full_reasoning <- paste0(private$full_reasoning, text)
            private$callback(text, done = FALSE)
        },
        emit_reasoning_close = function() {
            if (private$is_reasoning) {
                private$callback("\n</think>\n\n", done = FALSE)
                private$is_reasoning <- FALSE
            }
        },
        find_inline_thinking_tag = function(text, closing = FALSE) {
            if (!nzchar(text %||% "")) {
                return(NULL)
            }
            pattern <- if (isTRUE(closing)) {
                "</(think|thinking|reasoning)\\s*>"
            } else {
                "<(think|thinking|reasoning)\\b[^>]*>"
            }
            match <- regexpr(pattern, text, perl = TRUE, ignore.case = TRUE)
            pos <- as.integer(match[[1]])
            if (pos < 0L) {
                return(NULL)
            }
            list(
                start = pos,
                end = pos + attr(match, "match.length")[[1]] - 1L
            )
        },
        pending_inline_thinking_suffix = function(text, closing = FALSE) {
            if (!nzchar(text %||% "")) {
                return("")
            }

            lower <- tolower(text)
            incomplete <- if (isTRUE(closing)) {
                regexpr("</(think|thinking|reasoning)\\b[^>]*$", lower, perl = TRUE)
            } else {
                regexpr("<(think|thinking|reasoning)\\b[^>]*$", lower, perl = TRUE)
            }
            pos <- as.integer(incomplete[[1]])
            if (pos > 0L) {
                return(substr(text, pos, nchar(text)))
            }

            prefixes <- if (isTRUE(closing)) {
                c("</think>", "</thinking>", "</reasoning>")
            } else {
                c("<think>", "<thinking>", "<reasoning>")
            }
            best <- ""
            for (pattern in prefixes) {
                max_len <- min(nchar(pattern) - 1L, nchar(text))
                if (max_len <= 0L) {
                    next
                }
                for (len in seq.int(max_len, 1L)) {
                    suffix <- substr(lower, nchar(lower) - len + 1L, nchar(lower))
                    prefix <- substr(pattern, 1L, len)
                    if (identical(suffix, prefix) && len > nchar(best)) {
                        best <- substr(text, nchar(text) - len + 1L, nchar(text))
                    }
                }
            }
            best
        },
        process_inline_thinking = function(text, done = FALSE) {
            private$inline_thinking_buffer <- paste0(private$inline_thinking_buffer, text %||% "")
            if (!nzchar(private$inline_thinking_buffer)) {
                return(invisible())
            }

            repeat {
                buffer <- private$inline_thinking_buffer
                if (!nzchar(buffer)) {
                    break
                }

                if (isTRUE(private$inline_thinking_active)) {
                    close_tag <- private$find_inline_thinking_tag(buffer, closing = TRUE)
                    if (!is.null(close_tag)) {
                        if (close_tag$start > 1L) {
                            private$emit_reasoning_delta(substr(buffer, 1L, close_tag$start - 1L))
                        }
                        private$emit_reasoning_close()
                        private$inline_thinking_active <- FALSE
                        private$inline_thinking_buffer <- substr(buffer, close_tag$end + 1L, nchar(buffer))
                        next
                    }

                    pending <- if (isTRUE(done)) "" else private$pending_inline_thinking_suffix(buffer, closing = TRUE)
                    emit_len <- nchar(buffer) - nchar(pending)
                    if (emit_len > 0L) {
                        private$emit_reasoning_delta(substr(buffer, 1L, emit_len))
                    }
                    private$inline_thinking_buffer <- pending
                    break
                }

                open_tag <- private$find_inline_thinking_tag(buffer, closing = FALSE)
                if (!is.null(open_tag)) {
                    if (open_tag$start > 1L) {
                        private$emit_text_delta(substr(buffer, 1L, open_tag$start - 1L))
                    }
                    private$inline_thinking_active <- TRUE
                    private$inline_thinking_buffer <- substr(buffer, open_tag$end + 1L, nchar(buffer))
                    next
                }

                pending <- if (isTRUE(done)) "" else private$pending_inline_thinking_suffix(buffer, closing = FALSE)
                emit_len <- nchar(buffer) - nchar(pending)
                if (emit_len > 0L) {
                    private$emit_text_delta(substr(buffer, 1L, emit_len))
                }
                private$inline_thinking_buffer <- pending
                break
            }

            if (isTRUE(done) && nzchar(private$inline_thinking_buffer)) {
                if (isTRUE(private$inline_thinking_active)) {
                    private$emit_reasoning_delta(private$inline_thinking_buffer)
                } else {
                    private$emit_text_delta(private$inline_thinking_buffer)
                }
                private$inline_thinking_buffer <- ""
            }

            invisible()
        },
        ensure_index = function(lst, idx, default) {
            if (length(lst) < idx) {
                for (i in seq(from = length(lst) + 1, to = idx)) {
                    lst[[i]] <- default
                }
            }
            lst
        },
        finalize_tool_calls = function() {
            # Determine which format was used
            has_openai_format <- length(private$tool_calls_acc) > 0 &&
                !is.null(private$tool_calls_acc[[1]]) &&
                !is.null(private$tool_calls_acc[[1]]$arguments_is_list)

            has_anthropic_format <- length(private$tool_args_acc) > 0 &&
                any(nzchar(unlist(private$tool_args_acc)))

            if (has_openai_format) {
                return(private$finalize_openai_tool_calls())
            } else if (has_anthropic_format || length(private$tool_calls_acc) > 0) {
                return(private$finalize_anthropic_tool_calls())
            }

            NULL
        },
        finalize_openai_tool_calls = function() {
            if (length(private$tool_calls_acc) == 0) {
                return(NULL)
            }

            final <- lapply(private$tool_calls_acc, function(tc) {
                args_val <- if (isTRUE(tc$arguments_is_list)) {
                    tc$arguments
                } else {
                    parse_tool_arguments(tc$arguments, tool_name = tc$name)
                }
                list(id = tc$id, name = tc$name, arguments = args_val)
            })
            final <- Filter(function(tc) nzchar(tc$name %||% ""), final)
            if (length(final) == 0) NULL else final
        },
        finalize_anthropic_tool_calls = function() {
            all_indices <- union(seq_along(private$tool_calls_acc), seq_along(private$tool_args_acc))
            if (length(all_indices) == 0) {
                return(NULL)
            }

            final <- lapply(all_indices, function(idx) {
                tc <- if (idx <= length(private$tool_calls_acc)) private$tool_calls_acc[[idx]] else NULL
                raw_args <- if (idx <= length(private$tool_args_acc)) private$tool_args_acc[[idx]] else ""

                args_val <- if (!is.null(raw_args) && nzchar(raw_args)) {
                    parse_tool_arguments(raw_args, tool_name = tc$name %||% "unknown")
                } else if (!is.null(tc) && !is_empty_args_internal(tc$arguments)) {
                    tc$arguments
                } else {
                    list()
                }

                list(
                    id = if (!is.null(tc) && !is.null(tc$id)) tc$id else "",
                    name = if (!is.null(tc) && !is.null(tc$name)) tc$name else "",
                    arguments = args_val
                )
            })
            final <- Filter(function(tc) nzchar(tc$name %||% ""), final)
            if (length(final) == 0) NULL else final
        }
    )
)

# Internal helper — check if arguments are "empty" (NULL or empty list)
# Avoids dependency on provider_anthropic.R's is_empty_args
is_empty_args_internal <- function(args) {
    if (is.null(args)) {
        return(TRUE)
    }
    if (is.list(args) && length(args) == 0) {
        return(TRUE)
    }
    FALSE
}


# ============================================================================
# Event Mappers
# ============================================================================

#' @title Map OpenAI SSE chunk to aggregator calls
#' @description
#' Translates an OpenAI Chat Completions SSE data chunk into
#' the appropriate SSEAggregator method calls.
#' @param data Parsed JSON data from SSE event (or NULL if done).
#' @param done Logical, TRUE if stream is complete.
#' @param agg An SSEAggregator instance.
#' @keywords internal
map_openai_chunk <- function(data, done, agg) {
    if (done) {
        agg$on_done()
        return(invisible())
    }

    if (!is.null(data$choices) && length(data$choices) > 0) {
        agg$on_raw_response(data)
        delta <- data$choices[[1]]$delta
        choice <- data$choices[[1]]

        # Finish reason
        if (!is.null(choice$finish_reason)) {
            agg$on_finish_reason(choice$finish_reason)
        }

        # Reasoning content (DeepSeek / Doubao etc.)
        if (!is.null(delta$reasoning_content) && nchar(delta$reasoning_content) > 0) {
            agg$on_reasoning_delta(delta$reasoning_content)
        }

        # Text content
        if (!is.null(delta$content) && nchar(delta$content) > 0) {
            agg$on_text_delta(delta$content)
        }

        # Tool calls
        if (!is.null(delta$tool_calls)) {
            agg$on_tool_call_delta(delta$tool_calls)
        }
    }

    # Usage (may appear at top level, outside choices)
    if (!is.null(data$usage)) {
        agg$on_usage(data$usage)
    }
}


#' @title Map native Anthropic SSE event to aggregator calls
#' @description
#' Translates a native Anthropic Messages API SSE event into
#' the appropriate SSEAggregator method calls.
#' @param event_type SSE event type string (e.g. "content_block_delta").
#' @param event_data Parsed JSON data from SSE event.
#' @param agg An SSEAggregator instance.
#' @return Logical TRUE if the stream should break (message_stop received).
#' @keywords internal
map_anthropic_chunk <- function(event_type, event_data, agg) {
    agg$on_raw_response(event_data)

    if (event_type == "content_block_delta") {
        delta <- event_data$delta
        if (!is.null(delta)) {
            if (delta$type == "text_delta" && !is.null(delta$text)) {
                agg$on_text_delta(delta$text)
            } else if (delta$type == "thinking_delta" && !is.null(delta$thinking)) {
                agg$on_reasoning_delta(delta$thinking)
            } else if (delta$type == "input_json_delta" && !is.null(delta$partial_json)) {
                idx <- if (is.null(event_data$index)) 0 else event_data$index
                agg$on_tool_input_delta(idx, delta$partial_json)
            }
        }
    } else if (event_type == "content_block_start") {
        cb <- event_data$content_block
        idx <- if (is.null(event_data$index)) 0 else event_data$index

        if (!is.null(cb) && !is.null(cb$type)) {
            if (cb$type == "tool_use") {
                agg$on_tool_start(idx, cb$id, cb$name, cb$input)
            } else if (cb$type == "thinking") {
                agg$on_reasoning_start()
            }
            # "text" block start — no action needed
        }
    } else if (event_type == "content_block_stop") {
        agg$on_block_stop()
    } else if (event_type == "message_delta") {
        if (!is.null(event_data$delta$stop_reason)) {
            agg$on_finish_reason(event_data$delta$stop_reason)
        }
        if (!is.null(event_data$usage)) {
            agg$on_usage(list(
                prompt_tokens = 0,
                completion_tokens = event_data$usage$output_tokens,
                total_tokens = event_data$usage$output_tokens
            ))
        }
    } else if (event_type == "message_stop") {
        agg$on_done()
        return(TRUE) # Signal to break the loop
    }

    FALSE # Continue processing
}

Try the aisdk package in your browser

Any scripts or data that you put into this service are public.

aisdk documentation built on May 29, 2026, 9:07 a.m.