R/send.R

Defines functions send_file receive_file send_big_objects send_big_object receive_big_object receive_big_objects execute_remote

# Upload a local file to every node of cluster `cl`, writing it to
# `remote_path` on each node.
#
# cl               - a cluster object from the `parallel` package.
# file_path        - local file to upload; it must exist.
# remote_path      - destination path on the nodes; missing parent
#                    directories are created.
# flag_check_first - when TRUE, compare the MD5 of the remote file (as seen
#                    by the FIRST node only) with the local MD5 and skip the
#                    upload if they match.
#
# Returns NULL when the upload is skipped, otherwise the list returned by
# parallel::clusterEvalQ (one `1` per node). Stops if the local file is
# missing or if the payload received by the first node differs in
# object.size() from the one sent.
send_file <- function(cl, file_path, remote_path, flag_check_first = TRUE) {
  if (!file.exists(file_path)) {
    stop(paste0("Cannot find ", file_path))
  }

  if (flag_check_first) {
    remote_hash <- eval(substitute(
      parallel::clusterEvalQ(cl, if (file.exists(remote_path)) tools::md5sum(remote_path) else ''),
      list(remote_path = remote_path)))[[1]]
    # md5sum() yields NA for unreadable files; isTRUE() keeps that from
    # erroring inside `if` and simply falls through to a fresh upload.
    if (isTRUE(nzchar(remote_hash)) &&
        isTRUE(unname(tools::md5sum(file_path)) == unname(remote_hash))) {
      return()
    }
  }

  # Given a path, readBin() opens and closes the file itself, so no local
  # connection can leak if reading fails.
  payload <- readBin(file_path, raw(), n = file.size(file_path))
  e <- new.env()
  assign('a', payload, envir = e)
  parallel::clusterExport(cl, 'a', envir = e)
  os <- object.size(payload)
  rm(payload, e)
  gc()

  remote_os <- parallel::clusterEvalQ(cl, object.size(a))[[1]]
  if (remote_os != os) {
    stop("Sent object has different size")
  }

  # The destination path is spliced into the remote expression as a literal,
  # so no `remote_path` global has to be exported to (and removed from) the
  # workers.
  eval(substitute(parallel::clusterEvalQ(cl, {
    if (!dir.exists(dirname(remote_path))) {
      dir.create(dirname(remote_path), recursive = TRUE)
    }
    f <- file(remote_path, 'wb')
    # Close the remote connection even when writing fails.
    tryCatch(writeBin(a, f, useBytes = TRUE), finally = close(f))
    rm(list = c('a', 'f'))
    1
  }), list(remote_path = remote_path)))
}

# Download `remote_path` from the first node of cluster `cl` into
# `local_path`.
#
# cl               - a cluster object from the `parallel` package.
# local_path       - local destination; missing parent directories are
#                    created.
# remote_path      - file to fetch on the nodes.
# flag_check_first - when TRUE and `local_path` exists, the transfer is
#                    skipped if the remote MD5 equals the local MD5.
#
# Returns `local_path`. Stops with a "file ... not found on <host>" message
# when the remote file does not exist.
receive_file <- function(cl, local_path, remote_path, flag_check_first = TRUE) {
  if (!file.exists(local_path)) {
    flag_check_first <- FALSE
  }
  our_hash <- ""
  if (flag_check_first) {
    our_hash <- unname(tools::md5sum(local_path))
    # An unreadable local file hashes to NA; treat it as absent.
    if (is.na(our_hash)) {
      our_hash <- ""
    }
  }

  # local() keeps the worker's global environment free of temporaries.
  answers <- eval(substitute(parallel::clusterEvalQ(cl, local({
    if (file.exists(remote_path)) {
      hash <- if (our_hash != "") unname(tools::md5sum(remote_path)) else "!"
      if (!isTRUE(hash == our_hash)) {
        # readBin() on a path opens and closes the file itself.
        readBin(remote_path, raw(), n = file.size(remote_path))
      } else {
        "file already present"
      }
    } else {
      paste0("file ", remote_path, " not found on ", system("hostname", intern = TRUE))
    }
  })), list(remote_path = remote_path, our_hash = our_hash)))

  # clusterEvalQ returns one result per node; only the first is used.
  payload <- answers[[1]]
  if (is.character(payload)) {
    if (identical(payload, "file already present")) {
      return(local_path)
    }
    stop(payload)
  }

  if (!dir.exists(dirname(local_path))) {
    dir.create(dirname(local_path), recursive = TRUE)
  }
  f <- file(local_path, 'wb')
  on.exit(close(f), add = TRUE)
  writeBin(payload, f, useBytes = TRUE)
  local_path
}

# Assign every element of the named list `objects` as a global variable on
# all nodes of cluster `cl`.
#
# cl       - a cluster object from the `parallel` package.
# objects  - a named list; each element becomes a worker global of that name.
# compress - memCompress() type for the transfer, or 'raw' for a plain
#            clusterExport. When NULL, 'raw' is used below 100000 bytes
#            (object.size of the whole list), otherwise 'bzip2'.
#
# Returns 1.
send_big_objects <- function(cl, objects, compress = NULL) {
  if (!'list' %in% class(objects)) {
    stop("objects must be a named list of objects to upload")
  }
  obj_names <- names(objects)
  if (length(objects) > 0 &&
      (is.null(obj_names) || any(is.na(obj_names) | !nzchar(obj_names)))) {
    stop("objects must be a named list of objects to upload")
  }

  if (is.null(compress)) {
    compress <- if (as.numeric(object.size(objects)) < 100000) 'raw' else 'bzip2'
  }

  if (compress == 'raw') {
    parallel::clusterExport(cl, names(objects), envir = as.environment(objects))
    return(1)
  }

  e <- new.env()
  assign('raw', memCompress(serialize(objects, connection = NULL), type = compress), envir = e)
  parallel::clusterExport(cl, 'raw', envir = e)
  eval(substitute(parallel::clusterEvalQ(cl, {
    list2env(unserialize(memDecompress(from = raw, type = compress)), envir = .GlobalEnv)
    # Free the compressed buffer on the worker, unless the caller shipped an
    # object that is itself named `raw` (it has just been assigned above).
    if (!keep_raw) {
      rm(list = 'raw', envir = .GlobalEnv)
    }
    1
  }), list(compress = compress, keep_raw = 'raw' %in% obj_names)))
  1
}

# Assign `object` to the global variable `remote_name` on every node of
# cluster `cl`.
#
# cl          - a cluster object from the `parallel` package.
# object      - any serializable R value.
# remote_name - name of the global variable to create on the workers.
# compress    - memCompress() type for the transfer, or 'raw' for a plain
#               clusterExport. When NULL, 'raw' is used below 100000 bytes
#               (object.size), otherwise 'bzip2'.
#
# Returns the clusterEvalQ result (one `1` per node) for compressed
# transfers, and the clusterExport result for uncompressed ones.
send_big_object <- function(cl, object, remote_name, compress = NULL) {
  if (is.null(compress)) {
    compress <- if (as.numeric(object.size(object)) < 100000) 'raw' else 'bzip2'
  }

  e <- new.env()
  if (compress == 'raw') {
    # The object must live in `e` under the name that clusterExport ships.
    assign(remote_name, object, envir = e)
    return(parallel::clusterExport(cl, remote_name, envir = e))
  }

  assign('raw', memCompress(serialize(object, connection = NULL), type = compress), envir = e)
  parallel::clusterExport(cl, 'raw', envir = e)
  eval(substitute(parallel::clusterEvalQ(cl, {
    assign(remote_name, unserialize(memDecompress(from = raw, type = compress)), .GlobalEnv)
    # Free the compressed buffer unless it was just replaced by the target.
    if (remote_name != 'raw') {
      rm(list = 'raw', envir = .GlobalEnv)
    }
    1
  }), list(remote_name = remote_name, compress = compress)))
}

# Fetch the value of `object_name` from the first node of cluster `cl`.
#
# cl          - a cluster object from the `parallel` package.
# object_name - R source text that is parsed and evaluated in each worker's
#               global environment (typically a variable name).
# compress    - memCompress() type for the transfer, or 'raw' (or '') for an
#               uncompressed transfer. When NULL, 'raw' is used if the first
#               node reports an object.size() below 100000 bytes, otherwise
#               'bzip2'.
#
# Returns the value computed on the first node. Errors raised while probing
# the size are re-raised as
# "Getting the remote object returned an error: <message>".
receive_big_object <- function(cl, object_name, compress = NULL) {
  if (is.null(compress)) {
    obj_size <- tryCatch(
      eval(substitute(
        parallel::clusterEvalQ(cl, object.size(eval(parse(text = object_name)))),
        list(object_name = object_name)))[[1]],
      error = function(e) e
    )
    if (inherits(obj_size, 'error')) {
      stop(paste0("Getting the remote object returned an error: ", conditionMessage(obj_size)))
    }
    compress <- if (as.numeric(obj_size) < 100000) 'raw' else 'bzip2'
  }

  if (compress %in% c('raw', '')) {
    # Uncompressed: keep only the first node's value, matching the
    # compressed branch below.
    return(eval(substitute(
      parallel::clusterEvalQ(cl, eval(parse(text = object_name))),
      list(object_name = object_name)))[[1]])
  }

  packed <- eval(substitute(
    parallel::clusterEvalQ(cl, memCompress(
      serialize(eval(parse(text = object_name)), connection = NULL),
      type = compress)),
    list(object_name = object_name, compress = compress)))[[1]]
  unserialize(memDecompress(from = packed, type = compress))
}

# Fetch several globals from the first node of cluster `cl` and return them
# in one environment.
#
# cl           - a cluster object from the `parallel` package.
# object_names - character vector; each entry is R source text parsed and
#                evaluated in the worker's global environment.
# compress     - memCompress() type for the transfer, or 'raw' for an
#                uncompressed transfer. When NULL, 'raw' is used if the summed
#                object.size() on the first node is below 100000 bytes,
#                otherwise 'bzip2'.
#
# Returns a new environment (parent: the global environment) that binds each
# entry of `object_names` to its value from the first node.
receive_big_objects <- function(cl, object_names, compress = NULL) {
  if (is.null(compress)) {
    total_size <- tryCatch(
      eval(substitute(
        parallel::clusterEvalQ(cl, sum(vapply(
          object_names,
          function(nm) as.numeric(object.size(eval(parse(text = nm), envir = globalenv()))),
          numeric(1)))),
        list(object_names = object_names)))[[1]],
      error = function(e) e
    )
    if (inherits(total_size, 'error')) {
      stop(paste0("Getting the remote object returned an error: ", conditionMessage(total_size)))
    }
    compress <- if (as.numeric(total_size) < 100000) 'raw' else 'bzip2'
  }

  # Remote builder: evaluate every name in the worker's global environment
  # and bind the results under their own names. local() keeps the worker's
  # globals untouched.
  fetch <- substitute(local({
    values <- lapply(object_names, function(nm) eval(parse(text = nm), envir = globalenv()))
    names(values) <- object_names
    list2env(values, envir = new.env(parent = globalenv()))
  }), list(object_names = object_names))

  if (compress == 'raw') {
    # Only the first node's environment is returned, as in the compressed case.
    return(eval(substitute(parallel::clusterEvalQ(cl, FETCH), list(FETCH = fetch)))[[1]])
  }

  packed <- eval(substitute(
    parallel::clusterEvalQ(cl, memCompress(serialize(FETCH, connection = NULL), type = TYPE)),
    list(FETCH = fetch, TYPE = compress)))[[1]]
  unserialize(memDecompress(from = packed, type = compress))
}

# Start a BackgroundTask that runs `expr` on the cluster and records load
# statistics before and after.
#
# cl   - NOTE(review): currently unused; the task body reads
#        `private$cl_connection` instead. That only resolves if
#        BackgroundTask$run_task evaluates its argument somewhere `private`
#        is visible (e.g. inside an R6 object) — confirm against
#        BackgroundTask.
# expr - an expression, captured unevaluated and evaluated on the nodes.
#
# Returns the BackgroundTask object. Judging by the task body, its result is
# list(start_stats, ans, end_stats), where `ans` is the clusterEvalQ result
# (one entry per node).
execute_remote <- function(cl, expr) {
  # Capture the caller's expression. The previous code called
  # substitute(expression), which captured the symbol `expression` (not a
  # parameter here), so the nodes evaluated base::expression instead of
  # `expr`.
  expr <- substitute(expr)

  job <- BackgroundTask$new()

  # Splice the captured expression into the task body literally, so the
  # remote nodes receive the caller's code however run_task evaluates it.
  task <- substitute({
    stats <- get_current_load(private$cl_connection, private$remote_tmp_dir_, private$cl_id_)
    start_stats <- list(peak_mem_kb = stats$peak_mem_kb, cpu_time = stats$cpu_time, wall_time = stats$wall_time)

    ans <- parallel::clusterEvalQ(cl = private$cl_connection, REMOTE_EXPR)

    stats <- get_current_load(private$cl_connection, private$remote_tmp_dir_, private$cl_id_)
    end_stats <- list(peak_mem_kb = stats$peak_mem_kb, cpu_time = stats$cpu_time, wall_time = stats$wall_time)
    return(list(start_stats = start_stats, ans = ans, end_stats = end_stats))
  }, list(REMOTE_EXPR = expr))

  eval(substitute(job$run_task(TASK), list(TASK = task)))
  job
}

# srv_loc<-RemoteServer$new('localhost')
# debugonce(srv_loc$print)
# debugonce(srv_loc$pri)
# print(srv_loc)
adamryczkowski/clustertools documentation built on May 3, 2019, 2:55 p.m.