R/stream-net.R

Defines functions .get_socket_port .get_socket_address create_netsocket

# ~~~~~~~~~~~~~~~ Exports ~~~~~~~~~~~~~~~~ #

#' @export
create_netsocket <- function(host, port, readable = TRUE, writeable = TRUE) {
    if (readable == FALSE && writeable == TRUE) {
        return(WriteableNetSocket$new(host, port))
    } else if (readable == TRUE && writeable == FALSE) {
        return(ReadableNetSocket$new(host, port))
    } else {
        return(DuplexNetSocket$new(host, port))
    }
}

#' @export
ReadableNetSocket <- R6::R6Class(
    inherit = ReadableStream,
    "NetSocket",
    public = list(
        initialize = function(host = "localhost", port = 8888, socket = NULL) {
            if (!is.null(socket)) {
                host <- ""
                port <- .get_socket_port(socket)
                super$initialize(
                    socket
                )
            } else {
                super$initialize(
                    socketConnection(
                        host = host,
                        port = port,
                        blocking = FALSE,
                        timeout = 30,
                        open = "r"
                    )
                )
            }
            self$emit("connect")

            private$.address <- host
            private$.port <- port
        },
        # Used to *reconnect* to a socket
        connect = function() {
            super$.connection_obj <- NULL
            super$.connection_obj <- socketConnection(
                host = private$.address,
                port = private$.port,
                blocking = FALSE,
                timeout = 30,
                open = "r"
            )
            self$emit("connect")
        }
    ),
    active = list(
        address = function() {
            list(
                address = .get_socket_address(private$.address),
                port = private$.port,
                family = private$.family # NYI
            )
        }
    ),
    private = list(
        .address = NULL,
        .port = NULL,
        .family = NULL
    )
)

#' @export
WriteableNetSocket <- R6::R6Class(
    inherit = WriteableStream,
    "NetSocket",
    public = list(
        initialize = function(host = "localhost", port = 8888, socket = NULL) {
            if (!is.null(socket)) {
                host <- ""
                port <- .get_socket_port(socket)
                super$initialize(
                    socket
                )
            } else {
                super$initialize(
                    socketConnection(
                        host = host,
                        port = port,
                        blocking = FALSE,
                        timeout = 30,
                        open = "a"
                    )
                )
            }
            self$emit("connect")
            private$.address <- host
            private$.port <- port
        },
        # Used to *reconnect* to a socket
        connect = function() {
            super$.connection_obj <- NULL
            super$.connection_obj <- socketConnection(
                host = private$.address,
                port = private$.port,
                blocking = FALSE,
                timeout = 30,
                open = "a"
            )
            self$emit("connect")
        }
    ),
    active = list(
        address = function() {
            list(
                address = .get_socket_address(private$.address),
                port = private$.port,
                family = private$.family # NYI
            )
        }
    ),
    private = list(
        .address = NULL,
        .port = NULL,
        .family = NULL
    )
)

#' @export
DuplexNetSocket <- R6::R6Class(
    inherit = DuplexStream,
    "NetSocket",
    public = list(
        initialize = function(host = "localhost", port = 8888, socket = NULL) {
                if (!is.null(socket)) {
                    host <- ""
                    port <- .get_socket_port(socket)
                    super$initialize(
                        socket
                    )
                } else {
                    super$initialize(
                        socketConnection(
                            host = host,
                            port = port,
                            blocking = FALSE,
                            timeout = 30,
                            open = "r"
                        )
                    )
                }
                private$.address <- host
                private$.port <- port
        }
    ),
    active = list(
        address = function() {
            list(
                address = private$.address,
                port = private$.port,
                family = private$.family # NYI
            )
        }
    ),
    private = list(
        .address = NULL,
        .port = NULL,
        .family = NULL
    )
)


# ~~~~~~~~~~~~~~~ Internals ~~~~~~~~~~~~~~~~ #

.get_socket_address <- function(x, ...) {
    if (x == "localhost") {
        return("127.0.0.1")
    } else {
        return(x)
    }
}

.get_socket_port <- function(x, ...) {
    sock_desc <- summary(x)$description
    return(regmatches(sock_desc, regexpr("(?<=:)[0-9]+", sock_desc, perl = TRUE)))
}
ElianHugh/emitters documentation built on Feb. 6, 2022, 4:55 a.m.