Nothing
testthat::skip_on_cran()
testthat::skip_if_not(
condition = check_aif_py_modules(trace = FALSE),
message = "Necessary python modules not available"
)
# Start time
test_time_start <- Sys.time()
# Load python scripts
load_all_py_scripts()
# Prototyp Metric---------------------------------------------------------------
test_that("Prototype Metric", {
device <- ifelse(torch$cuda$is_available(), "cuda", "cpu")
layer <- py$layer_protonet_metric()$to(device)
samples <- matrix(
data = c(
0.59, 0.71, 0.51,
0.45, -1, -0.77,
0.27, 0.11, -0.99,
-0.15, -0.61, -0.89,
0.39, 0.57, -0.31,
0.57, 0.21, 0.46,
-0.59, 0.44, -0.06,
-0.85, 0.45, 0.94,
0.1, -0.36, 0.49
),
nrow = 9,
ncol = 3,
byrow = TRUE
)
samples <- reticulate::np_array(samples)
samples <- torch$from_numpy(
samples
)
prototypes <- matrix(
data = c(
0.436667, -0.060000, -0.416667,
0.270000, 0.056667, -0.246667,
-0.446667, 0.176667, 0.456667
),
nrow = 3,
ncol = 3,
byrow = TRUE
)
prototypes <- reticulate::np_array(prototypes)
prototypes <- torch$from_numpy(
prototypes
)
result_matrix <- matrix(
data = c(
1.214546097, 1.049661321, 1.167033276,
1.004301858, 1.192821119, 1.921787363,
0.620796979, 0.745244181, 1.615827961,
0.933124977, 1.017212968, 1.587566272,
0.640668054, 0.530963485, 1.201041215,
0.926942405, 0.782872205, 1.01721843,
1.196350933, 0.95989004, 0.597355282,
1.938080379, 1.678478147, 0.686294397,
1.012614877, 0.863243239, 0.766789845
),
nrow = 9,
ncol = 3,
byrow = TRUE
)
scaling_factor <- layer$get_scaling_factor()
distances <- layer(x = samples$to(device), prototypes = prototypes$to(device))
expect_equal(
object = tensor_to_numpy(distances),
expected = result_matrix,
tolerance = 1e-6
)
})
# Masking Layer-----------------------------------------------------------------
test_that("Masking Layer", {
device <- ifelse(torch$cuda$is_available(), "cuda", "cpu")
pad_value <- sample(x = seq(from = -200, to = -10, by = 10), size = 1)
times <- sample(x = seq(from = 3, to = 10, by = 1), size = 1)
features <- sample(x = seq(from = 3, to = 1024, by = 1), size = 1)
sequence_length <- sample(x = seq(from = 1, to = times, by = 1), size = 30, replace = TRUE)
example_tensor <- generate_tensors(
times = times,
features = features,
seq_len = sequence_length,
pad_value = pad_value
)
layer <- py$masking_layer(pad_value)$to(device)
y <- layer(example_tensor)
# Check if input is the same as the output
expect_equal(tensor_to_numpy(y[[1]]), tensor_to_numpy(example_tensor))
# Check sequence length
expect_equal(as.numeric(tensor_to_numpy(y[[2]])), sequence_length)
# Check Masking times
expect_equal(rowSums(tensor_to_numpy(y[[3]])), times - sequence_length)
})
# Identity Layer----------------------------------------------------------------
test_that("identity layer", {
device <- ifelse(torch$cuda$is_available(), "cuda", "cpu")
pad_value <- sample(x = seq(from = -200, to = -10, by = 10), size = 1)
times <- sample(x = seq(from = 3, to = 10, by = 1), size = 1)
features <- sample(x = seq(from = 3, to = 1024, by = 1), size = 1)
sequence_length <- sample(x = seq(from = 1, to = times, by = 1), size = 30, replace = TRUE)
example_tensor <- generate_tensors(
times = times,
features = features,
seq_len = sequence_length,
pad_value = pad_value
)$to(device)
masking_layer <- py$masking_layer(pad_value)$to(device)
values <- masking_layer(example_tensor)
layer <- py$identity_layer(apply_masking = FALSE)$to(device)
y <- layer(
x = values[[1]],
seq_len = values[[2]],
mask_times = values[[3]],
mask_features = values[[4]]
)
# Test that values are the same
expect_equal(tensor_to_numpy(y[[1]]), tensor_to_numpy(values[[1]]))
expect_equal(tensor_to_numpy(y[[2]]), tensor_to_numpy(values[[2]]))
expect_equal(tensor_to_numpy(y[[3]]), tensor_to_numpy(values[[3]]))
expect_equal(tensor_to_numpy(y[[4]]), tensor_to_numpy(values[[4]]))
# Test that padding is not destroyed
y_2 <- masking_layer(y[[1]])
expect_equal(tensor_to_numpy(y[[2]]), tensor_to_numpy(y_2[[2]]))
expect_equal(tensor_to_numpy(y[[3]]), tensor_to_numpy(y_2[[3]]))
expect_equal(tensor_to_numpy(y[[4]]), tensor_to_numpy(y_2[[4]]))
})
# Residual connection with Mask-----------------------------------------------------------
test_that("residual connection with Mask", {
device <- ifelse(torch$cuda$is_available(), "cuda", "cpu")
pad_value <- sample(x = seq(from = -200, to = -10, by = 10), size = 1)
times <- sample(x = seq(from = 3, to = 10, by = 1), size = 1)
features <- sample(x = seq(from = 3, to = 1024, by = 1), size = 1)
sequence_length <- sample(x = seq(from = 1, to = times, by = 1), size = 30, replace = TRUE)
example_tensor <- generate_tensors(
times = times,
features = features,
seq_len = sequence_length,
pad_value = pad_value
)$to(device)
types <- c("None", "Addition", "ResidualGate")
masking_layer <- py$masking_layer(pad_value)$to(device)
values <- masking_layer(example_tensor)
for (type in types) {
layer <- py$layer_residual_connection(
type = type,
pad_value = as.integer(pad_value)
)$to(device)
y <- layer(
x = values[[1]],
y = values[[1]],
seq_len = values[[2]],
mask_times = values[[3]],
mask_features = values[[4]]
)
layer$eval()
# Test that masking values are the same
expect_equal(tensor_to_numpy(y[[2]]), tensor_to_numpy(values[[2]]))
expect_equal(tensor_to_numpy(y[[3]]), tensor_to_numpy(values[[3]]))
expect_equal(tensor_to_numpy(y[[4]]), tensor_to_numpy(values[[4]]))
# Test that padding is not destroyed
y_2 <- masking_layer(y[[1]])
expect_equal(tensor_to_numpy(y[[2]]), tensor_to_numpy(y_2[[2]]))
expect_equal(tensor_to_numpy(y[[3]]), tensor_to_numpy(y_2[[3]]))
expect_equal(tensor_to_numpy(y[[4]]), tensor_to_numpy(y_2[[4]]))
# Test that values do not change at random for same input
y_1 <- layer(
x = values[[1]],
y = values[[1]],
seq_len = values[[2]],
mask_times = values[[3]],
mask_features = values[[4]]
)
y_2 <- layer(
x = values[[1]],
y = values[[1]],
seq_len = values[[2]],
mask_times = values[[3]],
mask_features = values[[4]]
)
expect_equal(tensor_to_numpy(y_1[[1]]), tensor_to_numpy(y_2[[1]]))
}
})
# LayerNorm with Mask-----------------------------------------------------------
test_that("LayerNorm with Mask", {
device <- ifelse(torch$cuda$is_available(), "cuda", "cpu")
pad_value <- sample(x = seq(from = -200, to = -10, by = 10), size = 1)
times <- sample(x = seq(from = 3, to = 10, by = 1), size = 1)
features <- sample(x = seq(from = 3, to = 1024, by = 1), size = 1)
sequence_length <- sample(x = seq(from = 1, to = times, by = 1), size = 30, replace = TRUE)
example_tensor <- generate_tensors(
times = times,
features = features,
seq_len = sequence_length,
pad_value = pad_value
)$to(device)
masking_layer <- py$masking_layer(pad_value)$to(device)
values <- masking_layer(example_tensor)
layer <- py$LayerNorm_with_Mask(
times = as.integer(times),
features = as.integer(features),
pad_value = as.integer(pad_value)
)$to(device)
y <- layer(
x = values[[1]],
seq_len = values[[2]],
mask_times = values[[3]],
mask_features = values[[4]]
)
# Test that masking values are the same
expect_equal(tensor_to_numpy(y[[2]]), tensor_to_numpy(values[[2]]))
expect_equal(tensor_to_numpy(y[[3]]), tensor_to_numpy(values[[3]]))
expect_equal(tensor_to_numpy(y[[4]]), tensor_to_numpy(values[[4]]))
# Test that padding is not destroyed
y_2 <- masking_layer(y[[1]])
expect_equal(tensor_to_numpy(y[[2]]), tensor_to_numpy(y_2[[2]]))
expect_equal(tensor_to_numpy(y[[3]]), tensor_to_numpy(y_2[[3]]))
expect_equal(tensor_to_numpy(y[[4]]), tensor_to_numpy(y_2[[4]]))
# Test that the computations are correct for sequences with full length
example_tensor <- generate_tensors(
times = times,
features = features,
seq_len = rep.int(times, times = 10),
pad_value = pad_value
)$to(device)
comparison_layer <- torch$nn$LayerNorm(
normalized_shape = example_tensor$size(2L),
eps = 1e-05,
elementwise_affine = TRUE,
bias = TRUE,
device = NULL,
dtype = example_tensor$dtype
)$to(device)
res_expected <- tensor_to_numpy(comparison_layer(example_tensor))
values <- masking_layer(example_tensor)
results <- tensor_to_numpy(layer(
x = values[[1]],
seq_len = values[[2]],
mask_times = values[[3]],
mask_features = values[[4]]
)[[1]])
expect_equal(results, res_expected, tolerance = 1e-5)
})
# Dense Layer with Mask-----------------------------------------------------------
test_that("DenseLayer with Mask", {
device <- ifelse(torch$cuda$is_available(), "cuda", "cpu")
normalization_types <- c("None", "LayerNorm")
residual_types <- c("None", "Addition", "ResidualGate")
connection_types <- c("Regular", "PairwiseOrthogonal")
pad_value <- sample(x = seq(from = -200, to = -10, by = 10), size = 1)
times <- sample(x = seq(from = 3, to = 10, by = 1), size = 1)
features <- sample(x = seq(from = 3, to = 1024, by = 1), size = 1)
sequence_length <- sample(x = seq(from = 1, to = times, by = 1), size = 30, replace = TRUE)
example_tensor <- generate_tensors(
times = times,
features = features,
seq_len = sequence_length,
pad_value = pad_value
)$to(device)
masking_layer <- py$masking_layer(pad_value)$to(device)
values <- masking_layer(example_tensor)
# Test for equal, more, and fewer features as input size
features_output <- c(
features,
sample(x = seq(from = 1, to = (features - 1)), size = 1),
sample(x = seq(from = (features + 1), to = 2 * features), size = 1)
)
for (connection_type in connection_types) {
for (norm_types in normalization_types) {
for (res_types in residual_types) {
for (target_features in features_output) {
# Create layer
layer <- py$dense_layer_with_mask(
input_size = as.integer(features),
output_size = as.integer(target_features),
times = as.integer(times),
pad_value = as.integer(pad_value),
connection_type = connection_type,
act_fct = "ELU",
dropout = 0.3,
bias = TRUE,
parametrizations = "None",
dtype = values[[1]]$dtype,
residual_type = "None",
normalization_type = norm_types
)$to(device)
layer$eval()
y <- layer(
x = values[[1]],
seq_len = values[[2]],
mask_times = values[[3]],
mask_features = values[[4]]
)
# Test that masking values are the same
expect_equal(tensor_to_numpy(y[[2]]), tensor_to_numpy(values[[2]]))
expect_equal(tensor_to_numpy(y[[3]]), tensor_to_numpy(values[[3]]))
# Test the correct size of the new masking on feature level
expect_equal(dim(tensor_to_numpy(y[[4]]))[3], target_features)
# Test that padding is not destroyed
y_2 <- masking_layer(y[[1]])
expect_equal(tensor_to_numpy(y[[2]]), tensor_to_numpy(y_2[[2]]))
expect_equal(tensor_to_numpy(y[[3]]), tensor_to_numpy(y_2[[3]]))
expect_equal(tensor_to_numpy(y[[4]]), tensor_to_numpy(y_2[[4]]))
# Test that values do not change at random for same input
y_1 <- layer(
x = values[[1]],
seq_len = values[[2]],
mask_times = values[[3]],
mask_features = values[[4]]
)
y_2 <- layer(
x = values[[1]],
seq_len = values[[2]],
mask_times = values[[3]],
mask_features = values[[4]]
)
expect_equal(tensor_to_numpy(y_1[[1]]), tensor_to_numpy(y_2[[1]]))
}
}
}
}
})
# layer_tf_encoder-------------------------------------------------------
test_that("layer_tf_encoder", {
device <- ifelse(torch$cuda$is_available(), "cuda", "cpu")
attention_types <- c("MultiHead", "Fourier")
normalization_types <- c("None", "LayerNorm")
normalization_positions <- c("Post", "Pre")
pad_value <- sample(x = seq(from = -200, to = -10, by = 10), size = 1)
times <- sample(x = seq(from = 3, to = 10, by = 1), size = 1)
features <- sample(x = seq(from = 4, to = 1024, by = 2), size = 1)
sequence_length <- sample(x = seq(from = 1, to = times, by = 1), size = 30, replace = TRUE)
example_tensor <- generate_tensors(
times = times,
features = features,
seq_len = sequence_length,
pad_value = pad_value
)$to(device)
masking_layer <- py$masking_layer(pad_value)$to(device)
values <- masking_layer(example_tensor)
# Test for equal, more, and fewer features as input size
features_output <- c(
features,
sample(x = seq(from = 1, to = (features - 1)), size = 1),
sample(x = seq(from = (features + 1), to = 2 * features), size = 1)
)
for (attention_type in attention_types) {
for (normalization_type in normalization_types) {
for (normalization_position in normalization_positions) {
# Create layer
layer <- py$layer_tf_encoder(
dense_dim = 38L,
times = as.integer(times),
pad_value = as.integer(pad_value),
attention_type = attention_type,
features = as.integer(features),
normalization_type = normalization_type,
normalization_position = normalization_position,
residual_type = "ResidualGate",
num_heads = as.integer(2),
act_fct = "ELU",
dropout_rate_1 = 0.3,
dropout_rate_2 = 0.3,
bias = TRUE,
parametrizations = "None",
dtype = values[[1]]$dtype,
)$to(device)
layer$eval()
y <- layer(
x = values[[1]],
seq_len = values[[2]],
mask_times = values[[3]],
mask_features = values[[4]]
)
# Test that masking values are the same
expect_equal(tensor_to_numpy(y[[2]]), tensor_to_numpy(values[[2]]))
expect_equal(tensor_to_numpy(y[[3]]), tensor_to_numpy(values[[3]]))
# Test that padding is not destroyed
y_2 <- masking_layer(y[[1]])
expect_equal(tensor_to_numpy(y[[2]]), tensor_to_numpy(y_2[[2]]))
expect_equal(tensor_to_numpy(y[[3]]), tensor_to_numpy(y_2[[3]]))
expect_equal(tensor_to_numpy(y[[4]]), tensor_to_numpy(y_2[[4]]))
# Test that values do not change at random for same input
y_1 <- layer(
x = values[[1]],
seq_len = values[[2]],
mask_times = values[[3]],
mask_features = values[[4]]
)
y_2 <- layer(
x = values[[1]],
seq_len = values[[2]],
mask_times = values[[3]],
mask_features = values[[4]]
)
expect_equal(tensor_to_numpy(y_1[[1]]), tensor_to_numpy(y_2[[1]]))
}
}
}
})
# exreme_pooling_over_time------------------------------------------------------------
test_that("exreme_pooling_over_time", {
device <- ifelse(torch$cuda$is_available(), "cuda", "cpu")
pad_value <- sample(x = seq(from = -200, to = -10, by = 10), size = 1)
times <- sample(x = seq(from = 3, to = 10, by = 1), size = 1)
features <- sample(x = seq(from = 3, to = 1024, by = 1), size = 1)
sequence_length <- sample(x = seq(from = 1, to = times, by = 1), size = 30, replace = TRUE)
example_tensor <- generate_tensors(
times = times,
features = features,
seq_len = sequence_length,
pad_value = pad_value
)$to(device)
masking_layer <- py$masking_layer(pad_value)$to(device)
values <- masking_layer(example_tensor)
for (pooling_type in c("Max", "Min", "MinMax")) {
layer <- py$exreme_pooling_over_time(
times = as.integer(times),
features = as.integer(features),
pad_value = as.integer(pad_value),
pooling_type = pooling_type
)$to(device)
y_1 <- layer(values[[1]], values[[4]])
if (pooling_type != "MinMax") {
expect_equal(dim(tensor_to_numpy(y_1)), c(length(sequence_length), features))
} else {
expect_equal(dim(tensor_to_numpy(y_1)), c(length(sequence_length), 2 * features))
}
}
})
# layer_adaptive_extreme_pooling_1d---------------------------------------------
test_that("layer_adaptive_extreme_pooling_1d", {
device <- ifelse(torch$cuda$is_available(), "cuda", "cpu")
tensor <- torch$rand(30L, 768L)$to(device)
output_size <- 10
for (pooling_type in c("Max", "Min", "MinMax")) {
layer <- py$layer_adaptive_extreme_pooling_1d(
output_size = as.integer(output_size),
pooling_type = pooling_type
)$to(device)
result <- layer(tensor)
# Check the corres output shape
expect_equal(ncol(tensor_to_numpy(result)), output_size)
# Check if the correct values are selected
result_matrix <- tensor_to_numpy(result)
tensor_matrix <- tensor_to_numpy(tensor)
for (i in seq_len(nrow(result_matrix))) {
if (pooling_type == "Max") {
ordered_values <- tensor_matrix[i, order(tensor_matrix[i, ], decreasing = TRUE)]
relevant_values <- ordered_values[seq(from = 1, to = output_size)]
expect_equal(sum(round(result_matrix[i, ], digits = 5) %in% round(relevant_values, digits = 5)), output_size)
} else if (pooling_type == "Min") {
ordered_values <- tensor_matrix[i, order(tensor_matrix[i, ], decreasing = FALSE)]
relevant_values <- ordered_values[seq(from = 1, to = output_size)]
expect_equal(sum(round(result_matrix[i, ], digits = 5) %in% round(relevant_values, digits = 5)), output_size)
} else {
n_max <- ceiling(output_size / 2)
n_min <- output_size - n_max
# Max
ordered_values_max <- tensor_matrix[i, order(tensor_matrix[i, ], decreasing = TRUE)]
relevant_values_max <- ordered_values_max[seq(from = 1, to = n_max)]
expect_equal(sum(round(result_matrix[i, ], digits = 5) %in% round(relevant_values_max, digits = 5)), n_max)
# Min
ordered_values_min <- tensor_matrix[i, order(tensor_matrix[i, ], decreasing = FALSE)]
relevant_values_min <- ordered_values_min[seq(from = 1, to = n_min)]
expect_equal(sum(round(result_matrix[i, ], digits = 5) %in% round(relevant_values_min, digits = 5)), n_min)
}
}
}
})
# Layer layer_n_gram_convolution--------------------------------------------------
test_that("layer_n_gram_convolution", {
device <- ifelse(torch$cuda$is_available(), "cuda", "cpu")
pad_value <- sample(x = seq(from = -200, to = -10, by = 10), size = 1)
times <- sample(x = seq(from = 3, to = 10, by = 1), size = 1)
features <- sample(x = seq(from = 3, to = 1024, by = 1), size = 1)
sequence_length <- sample(x = seq(from = 1, to = times, by = 1), size = 30, replace = TRUE)
example_tensor <- generate_tensors(
times = times,
features = features,
seq_len = sequence_length,
pad_value = pad_value
)$to(device)
masking_layer <- py$masking_layer(pad_value)$to(device)
values <- masking_layer(example_tensor)
n_filter <- sample(x = seq(from = 2, to = features, by = 1), size = 1)
layer <- py$layer_n_gram_convolution(
kernel_size_times = as.integer(2),
times = as.integer(times),
features = as.integer(features),
pad_value = as.integer(pad_value),
n_filter = as.integer(n_filter),
bias = TRUE,
parametrizations = "None",
dtype = values[[1]]$dtype,
device = device
)$to(device)
layer$eval()
y <- layer(
x = values[[1]],
seq_len = values[[2]],
mask_times = values[[3]],
mask_features = values[[4]]
)
# Test that masking values are the same
expect_equal(tensor_to_numpy(y[[2]]), tensor_to_numpy(values[[2]]))
expect_equal(tensor_to_numpy(y[[3]]), tensor_to_numpy(values[[3]]))
# Test the correct size of the new masking on feature level
expect_equal(dim(tensor_to_numpy(y[[4]]))[3], n_filter)
# Test that padding is not destroyed
y_2 <- masking_layer(y[[1]])
expect_equal(tensor_to_numpy(y[[2]]), tensor_to_numpy(y_2[[2]]))
expect_equal(tensor_to_numpy(y[[3]]), tensor_to_numpy(y_2[[3]]))
expect_equal(tensor_to_numpy(y[[4]]), tensor_to_numpy(y_2[[4]]))
# Test that values do not change at random for same input
y_1 <- layer(
x = values[[1]],
seq_len = values[[2]],
mask_times = values[[3]],
mask_features = values[[4]]
)
y_2 <- layer(
x = values[[1]],
seq_len = values[[2]],
mask_times = values[[3]],
mask_features = values[[4]]
)
expect_equal(tensor_to_numpy(y_1[[1]]), tensor_to_numpy(y_2[[1]]))
# Test method return shape
expect_equal(dim(tensor_to_numpy(y_1[[1]])), c(length(sequence_length), times, n_filter))
})
# Layer layer_mutiple_n_gram_convolution--------------------------------------------------
test_that("layer_mutiple_n_gram_convolution", {
device <- ifelse(torch$cuda$is_available(), "cuda", "cpu")
pad_value <- sample(x = seq(from = -200, to = -10, by = 10), size = 1)
times <- sample(x = seq(from = 3, to = 10, by = 1), size = 1)
features <- sample(x = seq(from = 3, to = 1024, by = 1), size = 1)
sequence_length <- sample(x = seq(from = 1, to = times, by = 1), size = 30, replace = TRUE)
example_tensor <- generate_tensors(
times = times,
features = features,
seq_len = sequence_length,
pad_value = pad_value
)$to(device)
masking_layer <- py$masking_layer(pad_value)$to(device)
values <- masking_layer(example_tensor)
for (max_n_gram in 3:times) {
layer <- py$layer_mutiple_n_gram_convolution(
ks_min = 2L,
ks_max = as.integer(max_n_gram),
times = as.integer(times),
features = as.integer(features),
pad_value = as.integer(pad_value),
bias = TRUE,
parametrizations = "None",
dtype = values[[1]]$dtype,
device = device,
act_fct_name = "ELU"
)$to(device)
layer$eval()
y <- layer(
x = values[[1]],
seq_len = values[[2]],
mask_times = values[[3]],
mask_features = values[[4]]
)
# Test that masking values are the same
expect_equal(tensor_to_numpy(y[[2]]), tensor_to_numpy(values[[2]]))
expect_equal(tensor_to_numpy(y[[3]]), tensor_to_numpy(values[[3]]))
# Test the correct size of the new masking on feature level
expect_equal(dim(tensor_to_numpy(y[[4]]))[3], features)
# Test that padding is not destroyed
y_2 <- masking_layer(y[[1]])
expect_equal(tensor_to_numpy(y[[2]]), tensor_to_numpy(y_2[[2]]))
expect_equal(tensor_to_numpy(y[[3]]), tensor_to_numpy(y_2[[3]]))
expect_equal(tensor_to_numpy(y[[4]]), tensor_to_numpy(y_2[[4]]))
# Test that values do not change at random for same input
y_1 <- layer(
x = values[[1]],
seq_len = values[[2]],
mask_times = values[[3]],
mask_features = values[[4]]
)
y_2 <- layer(
x = values[[1]],
seq_len = values[[2]],
mask_times = values[[3]],
mask_features = values[[4]]
)
expect_equal(tensor_to_numpy(y_1[[1]]), tensor_to_numpy(y_2[[1]]))
# Test method return shape
expect_equal(dim(tensor_to_numpy(y_1[[1]])), c(length(sequence_length), times, features))
}
})
# Layer merge leyer---------------------------------------
test_that("merge_layer", {
device <- ifelse(torch$cuda$is_available(), "cuda", "cpu")
pad_value <- sample(x = seq(from = -200, to = -10, by = 10), size = 1)
times <- sample(x = seq(from = 3, to = 10, by = 1), size = 1)
features <- sample(x = seq(from = 52, to = 1024, by = 1), size = 1)
sequence_length <- sample(x = seq(from = 1, to = times, by = 1), size = 30, replace = TRUE)
example_tensor <- generate_tensors(
times = times,
features = features,
seq_len = sequence_length,
pad_value = pad_value
)$to(device)
n_input_streams <- sample(seq(from = 2, to = 10, by = 1), size = 1)
n_extracted_features <- sample(seq(from = 2, to = features, by = 1), size = 1)
masking_layer <- py$masking_layer(pad_value)$to(device)
values <- masking_layer(example_tensor)
for (attention_type in c("MultiHead", "Fourier")) {
for (pooling_type in c("Max", "Min", "MinMax")) {
layer <- py$merge_layer(
times = as.integer(times),
features = as.integer(features),
n_extracted_features = as.integer(n_extracted_features),
n_input_streams = as.integer(n_input_streams),
pad_value = as.integer(pad_value),
pooling_type = pooling_type,
attention_type = attention_type,
num_heads = 1L,
dtype = values[[1]]$dtype,
device = device
)$to(device)
layer$eval()
y <- layer(
tensor_list = rep(values[1], times = n_input_streams),
seq_len = values[[2]],
mask_times = values[[3]],
mask_features = values[[4]]
)
# Test the correct shape
expect_equal(dim(tensor_to_numpy(y)), c(length(sequence_length), n_extracted_features))
}
}
})
# Layer rnn preparation Layer---------------------------------------
test_that("rnn_preparation", {
device <- ifelse(torch$cuda$is_available(), "cuda", "cpu")
pad_value <- sample(x = seq(from = -200, to = -10, by = 10), size = 1)
times <- sample(x = seq(from = 3, to = 10, by = 1), size = 1)
features <- sample(x = seq(from = 3, to = 1024, by = 1), size = 1)
sequence_length <- sample(x = seq(from = 1, to = times, by = 1), size = 30, replace = TRUE)
example_tensor <- generate_tensors(
times = times,
features = features,
seq_len = sequence_length,
pad_value = pad_value
)$to(device)
masking_layer <- py$masking_layer(pad_value)$to(device)
values <- masking_layer(example_tensor)
layer_do <- py$layer_pack_and_masking()$to(device)
layer_do$eval()
layer_undo <- py$layer_unpack_and_masking(
sequence_length = as.integer(times),
pad_value = pad_value
)$to(device)
layer_undo$eval()
y <- layer_do(
x = values[[1]],
seq_len = values[[2]],
mask_times = values[[3]],
mask_features = values[[4]]
)
y <- layer_undo(
x = y[[1]],
seq_len = y[[2]],
mask_times = y[[3]],
mask_features = y[[4]]
)
# Test that masking values are the same
expect_equal(tensor_to_numpy(y[[2]]), tensor_to_numpy(values[[2]]))
expect_equal(tensor_to_numpy(y[[3]]), tensor_to_numpy(values[[3]]))
# Test the correct size of the new masking on feature level
expect_equal(dim(tensor_to_numpy(y[[4]]))[3], features)
# Test that padding is not destroyed
y_2 <- masking_layer(y[[1]])
expect_equal(tensor_to_numpy(y[[2]]), tensor_to_numpy(y_2[[2]]))
expect_equal(tensor_to_numpy(y[[3]]), tensor_to_numpy(y_2[[3]]))
expect_equal(tensor_to_numpy(y[[4]]), tensor_to_numpy(y_2[[4]]))
})
test_that("layer_class_mean", {
device <- ifelse(torch$cuda$is_available(), "cuda", "cpu")
layer <- py$layer_class_mean()$to(device)
test_tensor <- matrix(
data = c(
0.59, 0.71, 0.51,
0.45, -1, -0.77,
0.27, 0.11, -0.99,
-0.15, -0.61, -0.89,
0.39, 0.57, -0.31,
0.57, 0.21, 0.46,
-0.59, 0.44, -0.06,
-0.85, 0.45, 0.94,
0.1, -0.36, 0.49
), nrow = 9, ncol = 3, byrow = TRUE
)
test_tensor <- reticulate::np_array(test_tensor)
test_tensor <- torch$from_numpy(
test_tensor
)
test_classes <- torch$from_numpy(reticulate::np_array(c(0, 0, 0, 1, 1, 1, 2, 2, 2))$copy())
num_classes <- 3
result_matrix <- matrix(
data = c(
0.436667, -0.060000, -0.416667,
0.270000, 0.056667, -0.246667,
-0.446667, 0.176667, 0.456667
),
nrow = 3, ncol = 3, byrow = TRUE
)
cls_means <- layer(
x = test_tensor$to(device),
classes = test_classes$to(device),
total_classes = 3L
)
expect_equal(
object = tensor_to_numpy(cls_means),
expected = result_matrix,
tolerance = 1e-5
)
})
# layer_global_average_pooling_1d------------------------------------------------
test_that("layer_global_average_pooling_1d", {
device <- ifelse(torch$cuda$is_available(), "cuda", "cpu")
pad_value <- sample(x = seq(from = -200, to = -10, by = 10), size = 1)
times <- sample(x = seq(from = 3, to = 10, by = 1), size = 1)
features <- sample(x = seq(from = 3, to = 1024, by = 1), size = 1)
sequence_length <- sample(x = seq(from = 1, to = times, by = 1), size = 30, replace = TRUE)
example_tensor <- generate_tensors(
times = times,
features = features,
seq_len = sequence_length,
pad_value = pad_value
)$to(device)
masking_layer <- py$masking_layer(pad_value)$to(device)
values <- masking_layer(example_tensor)
layer <- py$layer_global_average_pooling_1d(mask_type = "mask")$to(device)
results <- layer(x = values[[1]], mask = values[[3]])
true_mean_source <- tensor_to_numpy(example_tensor)
true_mean_source <- replace(x = true_mean_source, true_mean_source == pad_value, values = 0)
true_mean <- matrix(data = 0, nrow = 30, ncol = features)
for (b in seq(30)) {
for (t in 1:times) {
for (f in 1:features) {
true_mean[b, f] <- true_mean_source[b, t, f] + true_mean[b, f]
}
}
}
true_mean <- true_mean / sequence_length
expect_equal(
object = tensor_to_numpy(results),
expected = true_mean,
tolerance = 1e-7
)
})
# Monitor test time
monitor_test_time_on_CI(
start_time = test_time_start,
test_name = "02_02_pytorch_layers"
)
Any scripts or data that you put into this service are public.
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.