# tests/testthat/test-vpc.R

# Skip unless `localstack_available()` reports TRUE.
skip_if_not(
  condition = localstack_available(),
  message = "LocalStack Not Available"
)

# Use the "localstack" AWS profile for the tests below; the variable is unset
# again at the end of this file.
Sys.setenv(AWS_PROFILE = "localstack")
# Delete every security group returned by `aws_vpc_security_groups()` so a
# test can start without leftovers from earlier tests.
#
# Extracts each group's `GroupId` and sends one `delete_security_group`
# request per id through `con_ec2()`. Returns the list produced by `map()`
# (one element per delete request); the tests in this file ignore it.
purge_sec_grps <- function() {
  sec_groups <- aws_vpc_security_groups()$SecurityGroups
  group_ids <- purrr::map_vec(sec_groups, "GroupId")
  delete_group <- function(id) {
    con_ec2()$delete_security_group(GroupId = id)
  }
  map(group_ids, delete_group)
}

# `aws_vpcs()` should return a list with `NextToken` and `Vpcs`; the test
# environment is expected to contain exactly one VPC with default tenancy.
test_that("aws_vpcs", {
  vpcs <- aws_vpcs()

  expect_type(vpcs, "list")
  expect_named(vpcs, c("NextToken", "Vpcs"))
  # expect_length() reports the actual length on failure and is the length
  # check used by the security group tests in this file
  expect_length(vpcs$Vpcs, 1)
  expect_equal(vpcs$Vpcs[[1]]$InstanceTenancy, "default")
})

# `aws_vpc()` looks up a single VPC by id; the id is taken from the first
# entry returned by `aws_vpcs()`.
test_that("aws_vpc", {
  first_vpc_id <- aws_vpcs()$Vpcs[[1]]$VpcId
  vpc <- aws_vpc(first_vpc_id)

  expect_type(vpc, "list")
  expect_named(vpc, c("NextToken", "Vpcs"))
  # expect_length() reports the actual length on failure and is the length
  # check used by the security group tests in this file
  expect_length(vpc$Vpcs, 1)
  expect_equal(vpc$Vpcs[[1]]$InstanceTenancy, "default")
})

# After purging, listing security groups should still return a list, with no
# groups in it.
test_that("aws_vpc_security_groups", {
  purge_sec_grps()
  remaining <- aws_vpc_security_groups()

  expect_type(remaining, "list")
  expect_length(remaining$SecurityGroups, 0)
})

# A freshly created security group can be fetched by its id and carries the
# name it was created with.
test_that("aws_vpc_security_group", {
  purge_sec_grps()
  group_name <- "testing2"
  created <- aws_vpc_security_group_create(
    name = group_name,
    description = "Testing security group creation"
  )
  fetched <- aws_vpc_security_group(created$GroupId)

  expect_type(fetched, "list")
  expect_length(fetched$SecurityGroups, 1)
  expect_equal(fetched$SecurityGroups[[1]]$GroupName, group_name)
})

# Creating a group with only a name returns its id, (empty) tags and ARN, and
# the stored description falls back to one mentioning the default engine.
test_that("aws_vpc_security_group_create", {
  purge_sec_grps()
  new_group <- aws_vpc_security_group_create(name = "atest")
  expect_type(new_group, "list")
  expect_named(new_group, c("GroupId", "Tags", "SecurityGroupArn"))
  expect_match(new_group$GroupId, "sg-")
  expect_length(new_group$Tags, 0)

  fetched <- aws_vpc_security_group(new_group$GroupId)
  expect_type(fetched, "list")

  # mariadb is the default `engine` value; it is used in the description
  # when `description` is NULL
  first_group <- fetched$SecurityGroups[[1]]
  expect_match(first_group$Description, "mariadb")
})

# Tags supplied at creation time are returned by the create call and are
# also present when the group is fetched again.
test_that("aws_vpc_security_group_create - with tags", {
  purge_sec_grps()
  sky_tag <- list(Key = "sky", Value = "blue")
  tag_spec <- list(ResourceType = "security-group", Tags = list(sky_tag))
  group <- aws_vpc_security_group_create(
    name = "testing4",
    tags = list(tag_spec)
  )

  # tags as reported by the create call
  expect_type(group, "list")
  expect_length(group$Tags, 1)
  expect_named(group$Tags[[1]], c("Key", "Value"))

  # tags as reported when the group is looked up by id
  fetched <- aws_vpc_security_group(group$GroupId)
  fetched_tags <- fetched$SecurityGroups[[1]]$Tags

  expect_type(fetched, "list")
  expect_length(fetched_tags, 1)
  first_tag <- fetched_tags[[1]]
  expect_named(first_tag, c("Key", "Value"))
  expect_equal(first_tag$Key, "sky")
  expect_equal(first_tag$Value, "blue")
})

# Adding an ingress rule to a new security group returns `Return = TRUE` and
# the created rule, whose ids carry the expected prefixes.
test_that("aws_vpc_security_group_ingress", {
  # The fixture `ipperms.rda` provides the `ipperms` object used below; it
  # was generated once with the following code:
  # nolint start
  # ipperms <- ip_permissions_generator("mariadb")
  # ipperms$IpRanges[[1]]$CidrIp <- "123.156.222.198/32"
  # ipperms$IpRanges[[1]]$Description <- "Access for someuser from sixtyfour"
  # save(ipperms, file = "tests/testthat/ipperms.rda", version = 2)
  # nolint end
  load("ipperms.rda")

  # calling with no arguments is an error
  expect_error(aws_vpc_security_group_ingress())

  purge_sec_grps()
  group <- aws_vpc_security_group_create(name = "testing423")
  out <- aws_vpc_security_group_ingress(
    id = group$GroupId,
    ip_permissions = ipperms
  )

  expect_type(out, "list")
  expect_named(out, c("Return", "SecurityGroupRules"))
  expect_true(out$Return)

  first_rule <- out$SecurityGroupRules[[1]]
  # security group rule id starts with "sgr-"; the pattern is anchored so a
  # match later in the string cannot satisfy the check
  expect_match(first_rule$SecurityGroupRuleId, "^sgr-")
  # security group id starts with just "sg-"
  expect_match(first_rule$GroupId, "^sg-")
})

# `aws_vpc_sg_with_ingress()` needs an engine, returns a security group id,
# and the group it creates has the engine name in its group name.
test_that("aws_vpc_sg_with_ingress", {
  # calling with no arguments is an error
  expect_error(aws_vpc_sg_with_ingress())

  mariadb_id <- aws_vpc_sg_with_ingress("mariadb")
  expect_type(mariadb_id, "character")
  expect_match(mariadb_id, "sg-")

  mariadb_grp <- aws_vpc_security_group(mariadb_id)
  expect_type(mariadb_grp, "list")
  expect_length(mariadb_grp$SecurityGroups, 1)
  expect_match(mariadb_grp$SecurityGroups[[1]]$GroupName, "mariadb-")

  # same check for a second engine
  mysql_grp <- aws_vpc_security_group(aws_vpc_sg_with_ingress("mysql"))
  expect_match(mysql_grp$SecurityGroups[[1]]$GroupName, "mysql-")
})

# Covers the argument handling of `security_group_handler()`.
test_that("security_group_handler", {
  # no `id` argument at all: error
  expect_error(security_group_handler())

  # a non-`NULL` `id` is handed back unchanged
  sample_id <- 123
  expect_equal(security_group_handler(sample_id), sample_id)

  # `id` is `NULL` and no `engine` supplied: error
  expect_error(security_group_handler(NULL))

  # `id` is `NULL` and the `engine` value is not a supported one: error
  expect_error(security_group_handler(NULL, engine = "asdff"))
})

# With `.ip_address()` mocked to an IPv6 address, the generated permissions
# carry an IPv6 CIDR and can be used to create an ingress rule.
test_that("aws_vpc_sec_group_rules_mod works for ipv6 address", {
  # mock `.ip_address()` for this test so it returns a sampled IPv6 address
  local_mocked_bindings(
    .ip_address = \() as.character(ipaddress::sample_ipv6(1))
  )

  sec_group <- aws_vpc_security_group_create(
    name = random_string("vpcsecgroup")
  )

  ip_perms <- ip_permissions_generator("mariadb")

  # the generated CIDR must parse as an IPv6 network
  cidr <- ipaddress::as_ip_network(ip_perms$Ipv6Ranges[[1]]$CidrIpv6)
  expect_true(ipaddress::is_ipv6(cidr))

  ingress <- aws_vpc_security_group_ingress(
    id = sec_group$GroupId,
    ip_permissions = ip_perms
  )

  expect_type(ingress, "list")
  expect_named(ingress, c("Return", "SecurityGroupRules"))
  expect_true(ingress$Return)
})

# With `.ip_address()` mocked to an IPv4 address, the generated permissions
# carry an IPv4 CIDR and can be used to create an ingress rule.
test_that("aws_vpc_sec_group_rules_mod works for ipv4 address", {
  # mock `.ip_address()` for this test so it returns a sampled IPv4 address
  local_mocked_bindings(
    .ip_address = \() as.character(ipaddress::sample_ipv4(1))
  )

  sec_group <- aws_vpc_security_group_create(
    name = random_string("vpcsecgroup")
  )

  ip_perms <- ip_permissions_generator("mariadb")

  # the generated CIDR must parse as an IPv4 network
  cidr <- ipaddress::as_ip_network(ip_perms$IpRanges[[1]]$CidrIp)
  expect_true(ipaddress::is_ipv4(cidr))

  ingress <- aws_vpc_security_group_ingress(
    id = sec_group$GroupId,
    ip_permissions = ip_perms
  )

  expect_type(ingress, "list")
  expect_named(ingress, c("Return", "SecurityGroupRules"))
  expect_true(ingress$Return)
})

# cleanup: remove the AWS_PROFILE environment variable that was set at the
# top of this file so the "localstack" profile does not stay set afterwards
Sys.unsetenv("AWS_PROFILE")

# Try the sixtyfour package in your browser
#
# Any scripts or data that you put into this service are public.
#
# sixtyfour documentation built on April 3, 2025, 8:22 p.m.