R/queue.R

# simple FIFO queue that maintains a pushback queue
# for when data is prepended
Queue <- R6::R6Class(
    "Queue",
    public = list(
        initialize = function(init = 20) {
            private$.pushback_queue <- fastmap::fastqueue(init = ifelse(init > 1, init / 2, 1))
            private$.queue <- fastmap::fastqueue(init = init)
        },
        add = function(x, pushback = FALSE) {
            if (!pushback) {
                private$.queue$add(x)
            } else {
                private$.pushback_queue$add(x)
            }
        },
        remove = function(n = 1, missing = NULL) {
            pushback_len <- private$.pushback_queue$size()
            queue_len <- private$.queue$size()
            if (n == -1) {
                out1 <- NULL
                out2 <- NULL

                if (pushback_len > 0) {
                    out1 <- private$.pushback_queue$mremove(n = pushback_len, missing = missing)
                }
                if (queue_len > 0) {
                   out2 <- private$.queue$mremove(n = queue_len, missing = missing)
                }

                c(
                    unlist(out1, recursive = FALSE),
                    unlist(out2, recursive = FALSE)
                )
            } else if (private$.pushback_queue$size() > 0) {
                unlist(private$.pushback_queue$mremove(n = n, missing = missing), recursive = FALSE)
            } else if (private$.queue$size() > 0) {
                unlist(private$.queue$mremove(n = n, missing = missing), recursive = FALSE)
            } else {
                missing
            }
        },
        peek = function() {
            private$.queue$peek(missing = eos_signal())
        },
        reset = function() {
            private$.pushback_queue$reset()
            private$.queue$reset()
        },
        size = function() {
            private$.pushback_queue$size() + private$.queue$size()
        },
        as_list = function() {
            c(private$.pushback_queue$as_list(), private$.queue$as_list())
        }
    ),
    private = list(
        .pushback_queue = NULL,
        .queue = NULL
    ),
    class = FALSE,
    cloneable = FALSE
)
ElianHugh/emitters documentation built on Feb. 6, 2022, 4:55 a.m.