.addTask <- function(jobId, taskId, rCommand, ...) {
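# Builds and submits a single Azure Batch task: serializes the task
# arguments to blob storage, sets up SAS tokens and output-file upload
# rules, and runs the R command inside the task's Docker container.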
storageCredentials <- rAzureBatch::getStorageCredentials()
args <- list(...)
.doAzureBatchGlobals <- args$envir
argsList <- args$args
dependsOn <- args$dependsOn
cloudCombine <- args$cloudCombine
userOutputFiles <- args$outputFiles
containerImage <- args$containerImage
resultFile <- paste0(taskId, "-result", ".rds")
accountName <- storageCredentials$name
if (!is.null(argsList)) {
assign("argsList", argsList, .doAzureBatchGlobals)
}
# Add the blobxfer download command only when cloudCombine is enabled;
# otherwise the command list stays empty.
commands <- c()
if (!is.null(cloudCombine)) {
assign("cloudCombine", cloudCombine, .doAzureBatchGlobals)
copyCommand <- sprintf(
"%s %s %s --download --saskey $BLOBXFER_SASKEY --remoteresource . --include result/*.rds",
accountName,
jobId,
"$AZ_BATCH_TASK_WORKING_DIR"
)
downloadCommand <-
dockerRunCommand("alfpark/blobxfer:0.12.1", copyCommand, "blobxfer", FALSE)
commands <- c(downloadCommand)
}
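# Serialize the task arguments to an RDS file and upload it to the job's
# blob container so the node can pull it down as a resource file.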
envFile <- paste0(taskId, ".rds")
saveRDS(argsList, file = envFile)
rAzureBatch::uploadBlob(jobId, paste0(getwd(), "/", envFile))
file.remove(envFile)
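# Read token for downloading the environment file from the job container;
# write token for uploading the task's output files back to it.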
sasToken <- rAzureBatch::createSasToken("r", "c", jobId)
writeToken <- rAzureBatch::createSasToken("w", "c", jobId)
envFileUrl <-
rAzureBatch::createBlobUrl(storageCredentials$name, jobId, envFile, sasToken)
resourceFiles <-
list(rAzureBatch::createResourceFile(url = envFileUrl, fileName = envFile))
exitConditions <- NULL
if (!is.null(args$dependsOn)) {
dependsOn <- list(taskIds = dependsOn)
} else {
exitConditions <- list(default = list(dependencyAction = "satisfy"))
}
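# Write-access URL for the job's blob container; used as the destination
# for all task output file uploads below.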
containerUrl <-
rAzureBatch::createBlobUrl(
storageAccount = storageCredentials$name,
containerName = jobId,
sasToken = writeToken
)
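# Upload rules: the task result (.rds), the task log, stdout and stderr are
# copied to the job container when the task completes.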
outputFiles <- list(
list(
filePattern = resultFile,
destination = list(container = list(
path = paste0("result/", resultFile),
containerUrl = containerUrl
)),
uploadOptions = list(uploadCondition = "taskCompletion")
),
list(
filePattern = paste0(taskId, ".txt"),
destination = list(container = list(
path = paste0("logs/", taskId, ".txt"),
containerUrl = containerUrl
)),
uploadOptions = list(uploadCondition = "taskCompletion")
),
list(
filePattern = "../stdout.txt",
destination = list(container = list(
path = paste0("stdout/", taskId, "-stdout.txt"),
containerUrl = containerUrl
)),
uploadOptions = list(uploadCondition = "taskCompletion")
),
list(
filePattern = "../stderr.txt",
destination = list(container = list(
path = paste0("stderr/", taskId, "-stderr.txt"),
containerUrl = containerUrl
)),
uploadOptions = list(uploadCondition = "taskCompletion")
)
)
outputFiles <- append(outputFiles, userOutputFiles)
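# Append the main R command, wrapped in a Docker run command, and collapse
# everything into a single shell command line for the Batch task.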
commands <-
c(commands,
dockerRunCommand(containerImage, rCommand, taskId))
commands <- linuxWrapCommands(commands)
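# Create a read/write/create/list SAS token for the job container and
# flatten it into a query string; blobxfer reads it from the
# BLOBXFER_SASKEY environment variable set below.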
sasToken <- rAzureBatch::createSasToken("rwcl", "c", jobId)
queryParameterUrl <- "?"
for (query in names(sasToken)) {
queryParameterUrl <-
paste0(queryParameterUrl,
query,
"=",
RCurl::curlEscape(sasToken[[query]]),
"&")
}
queryParameterUrl <-
substr(queryParameterUrl, 1, nchar(queryParameterUrl) - 1)
setting <- list(name = "BLOBXFER_SASKEY",
value = queryParameterUrl)
containerEnv <- list(name = "CONTAINER_NAME",
value = jobId)
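# Submit the task with its resource files, environment settings,
# dependencies, output upload rules and exit conditions.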
rAzureBatch::addTask(
jobId,
taskId,
environmentSettings = list(setting, containerEnv),
resourceFiles = resourceFiles,
commandLine = commands,
dependsOn = dependsOn,
outputFiles = outputFiles,
exitConditions = exitConditions
)
}
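# Creates an Azure Batch job on the given pool. The job preparation task
# installs any requested CRAN, GitHub and Bioconductor packages inside the
# container image before the job's tasks are scheduled.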
.addJob <- function(jobId,
poolId,
resourceFiles,
metadata,
...) {
args <- list(...)
packages <- args$packages
github <- args$github
bioconductor <- args$bioconductor
containerImage <- args$containerImage
poolInfo <- list("poolId" = poolId)
# Default command for job preparation task
commands <- c("ls")
if (!is.null(packages)) {
jobPackages <-
dockerRunCommand(containerImage,
getJobPackageInstallationCommand("cran", packages),
jobId)
commands <- c(commands, jobPackages)
}
if (!is.null(github) && length(github) > 0) {
jobPackages <-
dockerRunCommand(containerImage,
getJobPackageInstallationCommand("github", github),
jobId)
commands <- c(commands, jobPackages)
}
if (!is.null(bioconductor) && length(bioconductor) > 0) {
jobPackages <-
dockerRunCommand(containerImage,
getJobPackageInstallationCommand("bioconductor", bioconductor),
jobId)
commands <- c(commands, jobPackages)
}
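# The preparation task runs on each node as a pool-scoped admin user and
# must succeed (with up to two retries) before tasks are scheduled there.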
jobPreparationTask <- list(
commandLine = linuxWrapCommands(commands),
userIdentity = list(autoUser = list(
scope = "pool",
elevationLevel = "admin"
)),
waitForSuccess = TRUE,
resourceFiles = resourceFiles,
constraints = list(maxTaskRetryCount = 2)
)
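# Task dependencies must be enabled when the job is created so that tasks
# added later can declare dependsOn relationships.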
usesTaskDependencies <- TRUE
response <- rAzureBatch::addJob(
jobId,
poolInfo = poolInfo,
jobPreparationTask = jobPreparationTask,
usesTaskDependencies = usesTaskDependencies,
content = "response",
metadata = metadata
)
return(response)
}
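# Creates an autoscaling Azure Batch pool of Ubuntu 16.04 nodes, running an
# optional caller-supplied command line as an admin start task on each node.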
.addPool <-
function(pool,
packages,
environmentSettings,
resourceFiles,
...) {
args <- list(...)
commands <- c()
if (!is.null(args$commandLine)) {
commands <- c(commands, args$commandLine)
}
startTask <- list(
commandLine = linuxWrapCommands(commands),
userIdentity = list(autoUser = list(
scope = "pool",
elevationLevel = "admin"
)),
waitForSuccess = TRUE
)
if (!is.null(environmentSettings)) {
startTask$environmentSettings <- environmentSettings
}
if (length(resourceFiles) > 0) {
startTask$resourceFiles <- resourceFiles
}
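# All nodes use the Canonical Ubuntu Server 16.04 LTS marketplace image.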
virtualMachineConfiguration <- list(
imageReference = list(
publisher = "Canonical",
offer = "UbuntuServer",
sku = "16.04-LTS",
version = "latest"
),
nodeAgentSKUId = "batch.node.ubuntu 16.04"
)
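# Create the pool with autoscaling enabled; the autoscale formula is
# re-evaluated every five minutes (PT5M).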
response <- rAzureBatch::addPool(
pool$name,
pool$vmSize,
startTask = startTask,
virtualMachineConfiguration = virtualMachineConfiguration,
enableAutoScale = TRUE,
autoscaleFormula = getAutoscaleFormula(
pool$poolSize$autoscaleFormula,
pool$poolSize$dedicatedNodes$min,
pool$poolSize$dedicatedNodes$max,
pool$poolSize$lowPriorityNodes$min,
pool$poolSize$lowPriorityNodes$max,
maxTasksPerNode = pool$maxTasksPerNode
),
autoScaleEvaluationInterval = "PT5M",
maxTasksPerNode = pool$maxTasksPerNode,
content = "text"
)
return(response)
}
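# Illustrative sketch only (not part of the exported API): these internal
# helpers are normally driven by the doAzureParallel foreach backend. The
# argument values below are assumptions chosen for illustration, not values
# produced by the package.
# .addJob(jobId = "job-example",
#         poolId = "pool-example",
#         resourceFiles = list(),
#         metadata = list(),
#         packages = c("data.table"),
#         containerImage = "rocker/tidyverse:latest")
# .addTask(jobId = "job-example",
#          taskId = "task1",
#          rCommand = "Rscript --vanilla worker.R",
#          envir = new.env(),
#          args = list(list(x = 1)),
#          containerImage = "rocker/tidyverse:latest")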