
#' R6 worker object for use as a worker with master objects generated by [QueryCountMaster()]
#' @description `QueryCountWorker` objects are worker objects at each site of
#' a distributed QueryCount model computation
#' @seealso [QueryCountMaster()] which goes hand-in-hand with this object
#' @importFrom R6 R6Class
#' @export
QueryCountWorker <- R6::R6Class(

    private = list(
        ## the computation definition
        defn = NA,

        ## the local data
        data = NA,

        ## the stateful flag
        stateful = FALSE
  , public = list(

        #' @description
        #' Create a new `QueryCountWorker` object.
        #' @param defn the computation definition
        #' @param data the local data
        #' @param stateful the statefulness flag, default `FALSE`
        #' @return a new `QueryCountWorker` object
        initialize = function(defn, data, stateful = FALSE) {
            private$defn  <- defn
            private$data  <- data

        #' @description
        #' Retrieve the value of the `stateful` field
        getStateful = function() {

        #' @description
        #' Check if inputs and state of object are sane. For future use
        #' @return `TRUE` or `FALSE`
        kosher = function() {

        #' @description
        #' Return the query count on the local data
        #' @import rlang
        #' @importFrom dplyr filter
        #' @importFrom magrittr %>%
        queryCount = function() {
            data  <- private$data
            filter_expr  <- eval(parse(text = paste("rlang::expr(", private$defn$filterCondition, ")")))
            data %>%
                dplyr::filter(!! filter_expr) %>%

#' Create a master object to control worker objects generated by [QueryCountWorker()]
#' @description `QueryCountMaster` objects instantiate and run a distributed query count computation
#' @docType class
#' @seealso [QueryCountWorker()] which goes hand-in-hand with this object
#' @importFrom R6 R6Class
#' @importFrom httr POST add_headers
#' @importFrom jsonlite toJSON
#' @export
QueryCountMaster <- R6::R6Class(
    private = list(
        defn = NA,
        dry_run = FALSE,
        sites = list(),
        mapFn = function(site) {
            payload <- list(objectId = site$instanceId,
                            method = "queryCount")
            q <- httr::POST(.makeOpencpuURL(urlPrefix=site$url, fn="executeMethod"),
                            body = jsonlite::toJSON(payload),
                            httr::add_headers("Content-Type" = "application/json"),
                            config = getConfig()$sslConfig
            ## Should really examine result here.
        result_cache = list(),
        debug = FALSE

    public = list(

        #' @description
        #' Create a new `QueryCountMaster` object.
        #' @param defn the computation definition
        #' @param debug a flag for debugging, default `FALSE`
        #' @return a new `QueryCountMaster` object
        initialize = function(defn, debug = FALSE) {
            'Initialize the object with a dataset'
            private$defn <- defn
            private$debug <- debug

        #' @description
        #' Check if inputs and state of object are sane. For future use
        #' @return `TRUE` or `FALSE`
        kosher = function() {
            ' Check for appropriateness'

        #' @description
        #' Run the distributed query count and return the result
        #' @return the count
        queryCount = function() {
            'Compute the count'
            sites <- private$sites
            if (private$dry_run) {
                mapFn <- function(x) x$worker$queryCount()
            } else {
                mapFn <- private$mapFn
            results <- Map(mapFn, sites)
            value <- Reduce(f = sum, results)
            if (private$debug) {

        #' @description
        #' Retrieve the value of the private `sites` field
        getSites = function() {

        #' @description
        #' Add a url or worker object for a site for participating in the distributed computation. The worker object can be used to avoid complications in debugging remote calls during prototyping.
        #' @param name of the site
        #' @param url web url of the site; exactly one of `url` or `worker` should be specified
        #' @param worker worker object for the site; exactly one of `url` or `worker` should be specified
        addSite = function(name, url = NULL, worker = NULL) {
            ## critical section start
            ## This is the time to cache "p" and check it
            ## against all added sites
            ## Only one of url/worker should be non-null
            stopifnot(is.null(url) || is.null(worker))
            n <- length(private$sites)
            if (is.null(url)) {
                private$dry_run <- private$dry_run || TRUE
                private$sites[[n+1]] <- list(name = name, worker = worker)
            } else {
                localhost <- (grepl("^http://localhost", url) ||
                              grepl("^", url))
                private$sites[[n+1]] <- list(name = name, url = url,
                                             localhost = localhost,
                                             dataFileName = if (localhost) paste0(name, ".rds") else NULL)
            ## critical section end

        #' @description
        #' Run the distributed query count
        #' @return the count

        run = function() {
            'Run Computation'
            dry_run <- private$dry_run
            defn <- private$defn
            debug <- private$debug
            n <- length(sites)
            if (debug) {
                print("run(): checking worker object creation")
            if (dry_run) {
                ## Workers have already been created and passed
                sites <- private$sites
            } else {
                ## Create an instance Id
                ## Make remote call to instantiate workers
                instanceId <- generateId(object=list(Sys.time(), self))
                ## Augment each site with object instance ids
                private$sites <- sites <- lapply(private$sites,
                                                 function(x) list(name = x$name,
                                                                  url = x$url,
                                                                  localhost = x$localhost,
                                                                  dataFileName = x$dataFileName,
                                                                  instanceId = if (x$localhost) x$name else instanceId))
                sitesOK <- sapply(sites,
                                  function(x) {
                                      payload <- if (is.null(x$dataFileName)) {
                                                     list(defnId = defn$id, instanceId = x$instanceId)
                                                 } else {
                                                     list(defnId = defn$id, instanceId = x$instanceId,
                                                          dataFileName = x$dataFileName)
                                      q <- httr::POST(url = .makeOpencpuURL(urlPrefix=x$url, fn="createWorkerInstance"),
                                                      body = jsonlite::toJSON(payload),
                                                      httr::add_headers("Content-Type" = "application/json"),
                                                      config = getConfig()$sslConfig

                ## Stop on error
                if (!all(sitesOK)) {
                    stop("run():  Some sites did not respond successfully!")
                    sites <- sites[which(sitesOK)]  ## Only use sites that created objects successfully.
            result  <- self$queryCount()

            if (!dry_run) {
                if (debug) {
                    print("run(): checking worker object cleanup")
                sitesOK <- sapply(sites,
                                  function(x) {
                                      payload <- list(instanceId = x$instanceId)
                                      q <- POST(url = .makeOpencpuURL(urlPrefix=x$url, fn="destroyInstanceObject"),
                                                body = jsonlite::toJSON(payload),
                                                add_headers("Content-Type" = "application/json"),
                if (!all(sitesOK)) {
                    warning("run():  Some sites did not clean up successfully!")

#' Create a homomorphic computation query count worker object for use with master objects generated by [HEQueryCountMaster()]
#' @description `HEQueryCountWorker` objects are worker objects at each site of
#' a distributed query count model computation using homomorphic encryption
#' @seealso [HEQueryCountMaster()] which goes hand-in-hand with this object
#' @importFrom R6 R6Class
#' @export
HEQueryCountWorker <- R6Class(
    inherit = QueryCountWorker,

    private = list(
        ## the result cache for saving parts of answers to NCP1 and NCP2
        result_cache = list()
    public = list(

        #' @field pubkey the master's public key visible to everyone
        pubkey = NA,

        #' @field den the denominator for rational arithmetic
        den = NA,

        #' @description
        #' Create a new `HEQueryMaster` object.
        #' @param defn the computation definition
        #' @param data the data which is usually the list of sites
        #' @param pubkey_bits the number of bits in public key
        #' @param pubkey_n the `n` for the public key
        #' @param den_bits the number of bits in the denominator (power of 2) used in rational approximations
        #' @importFrom homomorpheR PaillierKeyPair
        #' @return a new `HEQueryMaster` object
        initialize = function(defn, data, pubkey_bits = NULL, pubkey_n = NULL, den_bits = NULL) {
            private$defn  <- defn
            private$data  <- data
            private$stateful <- TRUE
            if (!is.null(pubkey_bits)) {
                self$pubkey  <- homomorpheR::PaillierPublicKey$new(pubkey_bits, pubkey_n)
                self$den <- gmp::as.bigq(2)^(den_bits)  #Our denominator for rational approximations

        ## For debugging only
        ## setResultsCache = function(value) {
        ##     private$results_cache  <- value
        ## },
        ## getResultsCache = function() {
        ##     private$result_cache
        ## },

        #' @description
        #' Set some parameters for homomorphic computations
        #' @param pubkey_bits the number of bits in public key
        #' @param pubkey_n the `n` for the public key
        #' @param den_bits the number of bits in the denominator (power of 2) used in rational approximations
        setParams = function(pubkey_bits, pubkey_n, den_bits) {
            if(is.character(pubkey_n)) { ## we serialize stuff to character over the wire...
                pubkey_n  <- gmp::as.bigz(pubkey_n)
            if (!is.null(pubkey_bits)) {
                self$pubkey  <- homomorpheR::PaillierPublicKey$new(pubkey_bits, pubkey_n)
                self$den <- gmp::as.bigq(2)^(den_bits)  #Our denominator for rational approximations

        #' @description
        #' Run the query count on local data and return the appropriate encrypted result to the party
        #' @param partyNumber the NCP party number (1 or 2)
        #' @param token a token to use for identifying parts of the same computation for NCP1 and NCP2
        #' @return the count as a list of encrypted items with components `int` and `frac`
        #' @importFrom homomorpheR random.bigz
        queryCount = function(partyNumber, token) {
            result  <- private$result_cache[[token]]
            if (is.null(result)) {
                ## we have to compute result for token
                result.int  <- super$queryCount()
                ## Add random quantity and encrypt
                pubkey <- self$pubkey
                ## Generate random offset for int and frac parts
                ## making nBits 256 ensures that x-r and x+r are both positive if x is 1024 bits
                offset <- homomorpheR::random.bigz(nBits = 256)
                ## For debugging set offset to zero
                ##offset  <- 0
                zero  <- pubkey$encrypt(0)
                ncp1Result  <- list(int = pubkey$encrypt(result.int - offset), frac = zero)
                ncp2Result  <- list(int = pubkey$encrypt(result.int + offset), frac = zero)
                result  <- list(ncp1 = ncp1Result, ncp2 = ncp2Result)
                private$result_cache[[token]] <- result
            if (partyNumber == 1) result$ncp1 else result$ncp2

#' Create a homomorphic computation query count master object to employ worker objects generated by [HEQueryCountWorker()]
#' @description `HEQueryCountMaster` objects instantiate and run a distributed homomorphic query count computation; they're instantiated by non-cooperating parties (NCPs)
#' @seealso [HEQueryCountWorker()] which goes hand-in-hand with this object
#' @importFrom R6 R6Class
#' @importFrom httr POST add_headers
#' @importFrom jsonlite toJSON
#' @export
HEQueryCountMaster <- R6Class(
    inherit = QueryCountMaster,
    private = list(
        ## the party number of the NCP this object belongs to
        partyNumber = NA,
        ## the computation definition
        defn = NA,
        ## the mapping function that runs the HE query count method on each site
        mapFn = function(site, token) {
            payload <- list(objectId = site$instanceId,
                            method = "queryCount", partyNumber = private$partyNumber,
                            token = token)
            q <- httr::POST(.makeOpencpuURL(urlPrefix=site$url, fn="executeHEMethod"),
                            body = jsonlite::toJSON(payload),
                            httr::add_headers("Content-Type" = "application/json"),
                            config = getConfig()$sslConfig
            ## Should really examine result here.
        ## A flag for debugging
        debug = FALSE
    public = list(

        #' @field pubkey the master's public key visible to everyone
        pubkey = NA,

        #' @field pubkey_bits the number of bits in the public key (used for reconstructing public key remotely by serializing to character)
        pubkey_bits = NA,

        #' @field pubkey_n the `n` for the public key used for reconstructing public key remotely
        pubkey_n = NA,

        #' @field den the denominator for rational arithmetic
        den = NA,

        #' @field den_bits the number of bits in the denominator used for reconstructing denominator remotely
        den_bits = NA,

        #' @description
        #' Create a new `HEQueryCountMaster` object.
        #' @param defn the computation definition
        #' @param partyNumber the party number of the NCP that this object belongs to (1 or 2)
        #' @param debug a flag for debugging, default `FALSE`
        #' @return a new `HEQueryCountMaster` object
        initialize = function(defn, partyNumber, debug = FALSE) {
            'Initialize the object with a dataset'
            private$defn <- defn
            private$partyNumber <- partyNumber
            private$debug <- debug

        #' @description
        #' Set some parameters of the `HEQueryCountMaster` object for homomorphic computations
        #' @param pubkey_bits the number of bits in public key
        #' @param pubkey_n the `n` for the public key
        #' @param den_bits the number of bits in the denominator (power of 2) used in rational approximations
        setParams = function(pubkey_bits, pubkey_n, den_bits) {
            self$pubkey_bits  <- pubkey_bits
            if(is.character(pubkey_n)) { ## we serialize stuff to character over the wire...
                pubkey_n  <- gmp::as.bigz(pubkey_n)
            self$pubkey_n  <- pubkey_n
            self$den_bits  <- den_bits
            self$pubkey  <- homomorpheR::PaillierPublicKey$new(pubkey_bits, pubkey_n)
            self$den <- gmp::as.bigq(2)^(den_bits)  #Our denominator for rational approximations

        #' @description
        #' Check if inputs and state of object are sane. For future use
        #' @return `TRUE` or `FALSE`
        kosher = function() {
            ' Check for appropriateness'

        #' @description
        #' Run the distributed query count, associate it with a token, and return the result
        #' @param token a token to use as key
        #' @return the partial result as a list of encrypted items with components `int` and `frac`
        #' @importFrom gmp as.bigz
        queryCount = function(token) {
            'Compute the query count from all sites'
            sites <- private$sites
            if (private$dry_run) {
                mapFn <- function(site, token) site$worker$queryCount(private$partyNumber, token)
            } else {
                mapFn <- private$mapFn
            results <- Map(mapFn, sites, rep(token, length(sites)))
            pubkey  <- self$pubkey
            zero  <- pubkey$encrypt(0)
            ## The results could arrive as strings over the wire, so convert
            intResults  <- lapply(results, function(x) gmp::as.bigz(x$int))
            fracResults  <- lapply(results, function(x) gmp::as.bigz(x$frac))
            ## ADD manually for testing
            ## intSum  <- pubkey$add(intResults[[1L]], intResults[[2L]])
            ## intSum  <- pubkey$add(intSum, intResults[[3L]])
            ## fracSum  <- pubkey$add(intResults[[1L]], intResults[[2L]])
            ## fracSum  <- pubkey$add(fracSum, intResults[[3L]])
            intSum  <- Reduce(f = pubkey$add, x = intResults, init = zero)
            fracSum  <- Reduce(f = pubkey$add, x = fracResults, init = zero)
            list(int = intSum, frac = fracSum)

        #' @description
        #' Cleanup the instance objects
        #' @importFrom httr POST add_headers
        #' @importFrom jsonlite toJSON
        cleanup = function() {
            'Send cleanup message to sites'
            if (!private$dry_run) {
                sites  <- private$sites
                ## Sites have already been augmented with instanceId in run method, after which
                ## this should be called
                sitesOK <- sapply(sites,
                                  function(x) {
                                      payload <- list(instanceId = x$instanceId)
                                      q <- httr::POST(url = .makeOpencpuURL(urlPrefix=x$url, fn="destroyInstanceObject"),
                                                      body = jsonlite::toJSON(payload),
                                                      httr::add_headers("Content-Type" = "application/json"),
                if (!all(sitesOK)) {
                    warning("run():  Some sites did not clean up successfully!")

        #' @description
        #' Run the homomorphic encrypted distributed query count computation
        #' @param token a token to use as key
        #' @return the partial result as a list of encrypted items with components `int` and `frac`
        #' @importFrom httr POST add_headers
        #' @importFrom jsonlite toJSON
        run = function(token) {
            'Run Computation'
            dry_run <- private$dry_run
            debug  <- private$debug
            defn <- private$defn
            sites  <- private$sites
            if (dry_run) {
                ## Workers have already been created and passed
                workers  <- lapply(sites, function(x) x$worker)
                ## We just need to set params on the sites
                for (worker in workers) {
                    worker$setParams(pubkey_bits = self$pubkey_bits, pubkey_n = self$pubkey_n, den_bits = self$den_bits)
            } else {
                ## Create an instance Id
                ## Make remote call to instantiate workers
                ## instanceId <- generateId(object=list(Sys.time(), self))
                ## Instancd ID for HE method is just the token!
                instanceId  <- token
                ## Augment each site with object instance ids
                private$sites <- sites <- lapply(private$sites,
                                                 function(x) list(name = x$name,
                                                                  url = x$url,
                                                                  localhost = x$localhost,
                                                                  dataFileName = x$dataFileName,
                                                                  instanceId = if (x$localhost) x$name else instanceId))
                sitesOK <- sapply(sites,
                                  function(x) {
                                      ### FIXED NOW. BUG HERE. This payload of pubkey stuff needs to made character!!
                                      payload <- if (is.null(x$dataFileName)) {
                                                     list(defnId = defn$id, instanceId = x$instanceId,
                                                          pubkey_bits = self$pubkey_bits, pubkey_n = as.character(self$pubkey_n), den_bits = self$den_bits)
                                                 } else {
                                                     list(defnId = defn$id, instanceId = x$instanceId,
                                                          dataFileName = x$dataFileName,
                                                          pubkey_bits = self$pubkey_bits, pubkey_n = as.character(self$pubkey_n), den_bits = self$den_bits)
                                      q <- httr::POST(url = .makeOpencpuURL(urlPrefix=x$url, fn="createHEWorkerInstance"),
                                                      body = jsonlite::toJSON(payload),
                                                      httr::add_headers("Content-Type" = "application/json"),
                                                      config = getConfig()$sslConfig

                ## Stop on error
                if (!all(sitesOK)) {
                    stop("run():  Some sites did not respond successfully!")
                    sites <- sites[which(sitesOK)]  ## Only use sites that created objects successfully.

Try the distcomp package in your browser

Any scripts or data that you put into this service are public.

distcomp documentation built on Sept. 2, 2022, 1:07 a.m.