R/particle_cloud.R

Defines functions make_particle_cloud_request

# functions to interact with the particle cloud variables --------

# helper function to make a particle cloud request
# @param endpoint the url endpoint for the request, typically devices/<device_id>/variable or devices/<device_id>/function
# @param arg request argument --> required for function calls! even if just \code{character(0)}
# @param timeout how long to wait for curl request
# @param nr which request this is (purely for info messages)
# @param total total number of requests (purely for info messages)
# @note consider implementing an asynchronious version with curl_fetch_multi (if possible to integrate well into shiny app.)
make_particle_cloud_request <- function(endpoint, arg = NULL, nr = NULL, total = NULL, timeout = default(request_timeout), access_token = default(access_token), quiet = default(quiet)) {

  # safety checks
  if (nchar(access_token) == 0) stop("missing access token", call. = FALSE)

  # request
  handle <- new_handle(timeout = timeout)
  if (is.null(arg)) {
    request <- sprintf("https://api.particle.io/v1/%s?access_token=%s", endpoint, access_token)
  } else if (is.character(arg) && length(arg) <= 1) {
    request <- sprintf("https://api.particle.io/v1/%s", endpoint)
    post <- sprintf("access_token=%s&arg=%s", access_token, utils::URLencode(arg))
  } else {
    stop("something amiss with arg: ", arg, call. = FALSE)
  }

  if (!quiet) {
    glue("\nInfo: making cloud request ",
         if(!is.null(nr) && !is.null(total)) "{nr}/{total} " else "",
         "('{endpoint}'",
         if(!is.null(arg)) " with arg '{arg}'" else "",
         ")... ") %>%
      message(appendLF = FALSE)
  }

  # generate curl handle
  result <-
    tryCatch(
      handle %>%
        {
          # POST?
          if (!is.null(arg)) handle_setopt(., copypostfields = post)
          else  .
        } %>%
        # make request
        curl_fetch_memory(request, handle = .) %>%
        { rawToChar(.$content) } %>%
        fromJSON(),
      error = function(e) {
        if (str_detect(e$message, ":")) {
          return(list(error =  str_extract(e$message, "^[^:]*"), error_details = str_extract(e$message, "[^:\\s][^:]*$")))
        } else {
          return(list(error = e$message))
        }
      }
    )

  if (!is.null(result$error)) {
    if (!quiet) glue("failed.") %>% message()
    glue("Warning: encountered the following error: {result$error}") %>%
      message()
  } else if (!quiet) {
    glue("successful.") %>% message()
  }

  return(result)
}

#' Get general device information
#'
#' Get information from the particle cloud about devices.
#'
#' @param devices data frame with devices for which to get cloud info, by default all devices associated with the group
#' @param include_unregistered whether to provide cloud info for devices that are not registered in the database (no by default)
#' @param particle_id the ID(s) of the particle device(s)
#' @param access_token the access token for the accout
#' @return nested data frame (converted from JSON)
# @ note: consider making a function to udpate particle ids in the DB from here (overkill? since state/data logs cause update too)
#' @export
ll_get_devices_cloud_info <- function(devices = ll_get_devices(group_id = group_id, con = con), include_unregistered = FALSE, group_id = default(group_id), con = default(con), access_token = default(access_token), convert_to_TZ = Sys.timezone(), quiet = default(quiet)) {

  # safety checks
  if (!is.data.frame(devices) | !all(c("particle_id", "device_name") %in% names(devices)))
    stop("devices needs to be supplied as a data frame with columns (at the least) 'particle_id' and 'device_name'", call. = FALSE)

  # request general info
  info <- make_particle_cloud_request(
    endpoint = "devices",
    access_token = access_token,
    quiet = quiet
  )

  if (is.data.frame(info) && nrow(info) > 0) {
    if (include_unregistered)
      devices <- devices %>% full_join(info, by = c("device_name" = "name"))
    else
      devices <- devices %>% left_join(info, by = c("device_name" = "name"))

    # check for missing particle IDs
    probs <- filter(devices, !is.na(device_id), !is.na(id), is.na(particle_id) | particle_id != id)
    if (nrow(probs) > 0) {
     glue::glue(
       "some registered devices ({paste(probs$device_name, collapse = ', ')}) ",
       "particle ids are not yet updated in the database and will be updated now...") %>%
        warning(immediate. = TRUE, call. = FALSE)
      # NOTE: this will NOT automatically update the devices in the data manager which
      # requires a "Refresh", but particle_id updates should happen rarely enough that we're
      # just not going to worry about it, even if this means that this update may run
      # multiple times (unnecessarily) until devices are refreshed
      devices <- update_device_particle_id(devices, con = con, quiet = quiet)
    }

    # check for registered devices not listed in the cloud
    probs <- filter(devices, !is.na(device_id), is.na(id))
    if (nrow(probs) > 0) {
      glue::glue(
        "some registered devices ({paste(probs$device_name, collapse = ', ')}) ",
        "do not seem to exist in the cloud") %>%
        warning(immediate. = TRUE, call. = FALSE)
    }

    devices <- devices %>% select(-id) %>%
      mutate(registered = !is.na(device_id))

    # time zone
    devices <- devices %>% mutate(last_heard = ymd_hms(last_heard)) %>%
    {
      if (!is.null(convert_to_TZ)) mutate(., last_heard = with_tz(last_heard, convert_to_TZ))
      else .
    }
  } else {
    warning("no information retrieved from cloud", immediate. = TRUE, call. = FALSE)
  }

  return(devices)
}

# helper function for cloud variable request
get_devices_cloud_variable <- function(devices, variable, access_token = default(access_token), quiet = default(quiet)) {
  # safety checks
  if (!is.data.frame(devices) | !all(c("particle_id", "device_name") %in% names(devices)))
    stop("devices needs to be supplied as a data frame with columns (at the least) 'particle_id' and 'device_name'", call. = FALSE)

  # request state
  ..device_variable.. <- variable
  devices %>%
    mutate(
      lists = map2(
        particle_id, dplyr::row_number(),
        ~make_particle_cloud_request(
          endpoint = sprintf("devices/%s/%s", .x, ..device_variable..),
          nr = .y,
          total = nrow(devices),
          access_token = access_token,
          quiet = quiet
        )
      )
    ) %>%
    unpack_lists_tibble(unnest_single_values = TRUE, unpack_sub_lists = TRUE, nest_into_data_frame = FALSE)
}

# helper function to unpack cloud variable result
unpack_cloud_variable_result <- function(var_data, data_column, renames = c(), convert_to_TZ = Sys.timezone(), spread_function = NULL) {

  var_data <- mutate(var_data, ..rowid.. = dplyr::row_number())
  data_column_quo <- enquo(data_column)

  # unpack state data
  if (nrow(var_data) > 0 && "result" %in% names(var_data)) {
    var_data_unpacked <-
      var_data %>%
      select(..rowid.., result) %>%
      mutate(result = map(result, ~if(!is.na(.x)) {
        tryCatch(fromJSON (fix_truncated_JSON(.x)), error = function(e) { warning("problems parsing JSON - ", e$message, immediate. = TRUE, call. = FALSE); list() })
      } else list())) %>%
      unpack_lists_tibble(result)

    # check if there is any data
    if (quo_text(data_column_quo) %in% names(var_data_unpacked)) {
      var_data_unpacked <- var_data_unpacked %>% unnest(!!data_column_quo)

      # only rename what exists
      renames <- renames[unname(renames) %in% names(var_data_unpacked)]

      if (nrow(var_data_unpacked) > 0) {
        var_data_unpacked <-
          var_data_unpacked %>%
          rename(!!!renames) %>%
          mutate(datetime = ymd_hms(datetime)) %>%
          {
            if (!is.null(convert_to_TZ)) mutate(., datetime = with_tz(datetime, convert_to_TZ))
            else .
          }

        if (!is.null(spread_function)) {
          var_data_unpacked <- spread_function(var_data_unpacked)
        }

    }

      var_data <- left_join(var_data %>% select(-result), var_data_unpacked, by = "..rowid..")
    }
  }

  return (select(var_data, -..rowid..))
}

# helper function for truncated JSON
# closes unclosed \", ] and } as well as removes terminal ,
# note
fix_truncated_JSON <- function(json) {

  # close quotes if it's an odd number of quotes
  if (stringr::str_count(json, "\\\"") %% 2 > 0)
    json <- paste0(json, "\"")
  # make sure it doesn't end on a comma that doesn't have any follow-up
  else if (stringr::str_sub(json, -1) == ",")
    json <- stringr::str_sub(json, 1, -2)
  # make sure doesn't end on a key without a value
  else if (stringr::str_detect(json, ",?\"[^\"]+\":?$"))
    json <- stringr::str_replace(json, ",?\"[^\"]+\":?$", "")

  # close missing parentheses
  open_brackets <- stringr::str_extract_all(json, "[\\{\\[]")[[1]]
  close_brackets <- stringr::str_extract_all(json, "[\\}\\]]")[[1]]
  if (length(open_brackets) > length(close_brackets)) {
    missing_brackets <- open_brackets[1:(length(open_brackets) - length(close_brackets))]
    matching_brackets <- c("{" = "}", "[" = "]")
    json <- paste0(json, paste(matching_brackets[rev(missing_brackets)], collapse = ""))
  }
  test <<- json
  return(json)
}

#' Get device state
#'
#' Get state from the particle cloud for devices.
#' @inheritParams ll_get_devices_cloud_info
#' @inheritParams ll_get_device_state_logs
#' @param spread whether to convert the state data into wide format (note that this combines value and units columns!)
#' @return nested data frame (converted from JSON)
ll_get_devices_cloud_state <-
  function(devices = ll_get_devices(group_id = group_id, con = con),
           group_id = default(group_id),
           con = default(con),
           access_token = default(access_token),
           convert_to_TZ = Sys.timezone(),
           spread = FALSE,
           quiet = default(quiet)) {

    if (nrow(devices) == 0) return(tibble())

    devices %>%
      # request state info
      get_devices_cloud_variable(
        variable = "state",
        access_token = access_token,
        quiet = quiet
      ) %>%
      # unpack state data
      unpack_cloud_variable_result(
        data_column = s,
        renames = c(datetime = "dt", key = "k", value = "v", units = "u"),
        convert_to_TZ = convert_to_TZ,
        spread_function = if (spread) spread_state_columns else NULL
      )
  }

#' Get device data
#'
#' Get latest data from the particle cloud for devices.
#' @inheritParams ll_get_devices_cloud_info
#' @inheritParams ll_get_device_state_logs
#' @param spread whether to convert the state data into wide format (note that this combines key and index, as well as, value and units columns!)
#' @return nested data frame (converted from JSON)
ll_get_devices_cloud_data <-
  function(devices = ll_get_devices(group_id = group_id, con = con),
           group_id = default(group_id),
           con = default(con),
           access_token = default(access_token),
           convert_to_TZ = Sys.timezone(),
           spread = FALSE,
           quiet = default(quiet)) {

    if (nrow(devices) == 0) return(tibble())

    devices %>%
      # request live data info
      get_devices_cloud_variable(
        variable = "data",
        access_token = access_token,
        quiet = quiet
      ) %>%
      # unpack live data
      unpack_cloud_variable_result(
        data_column = d,
        renames = c(datetime = "dt", idx = "i", key = "k", value = "v", units = "u"),
        convert_to_TZ = convert_to_TZ,
        spread_function = if (spread) spread_data_columns else NULL
      ) %>%
      # add  missing error
      { if (!"error" %in% names(.)) mutate(., error = NA_character_) else . } %>%
      # add missing datetime
      { if (!"datetime" %in% names(.)) mutate(., datetime = NA_real_) else . } %>%
      # rename raw data
      { if ("r" %in% names(.)) rename(., raw_serial = r) else mutate(., raw_serial = NA_character_) } %>%
      { if ("e" %in% names(.)) rename(., raw_serial_errors = e) else mutate(., raw_serial_errors = NA_character_) } %>%
      # add missing data fields
      { if (!spread && !"idx" %in% names(.)) mutate(., idx = NA_integer_) else . } %>%
      { if (!spread && !"key" %in% names(.)) mutate(., key = NA_character_) else . } %>%
      { if (!spread && !"value" %in% names(.)) mutate(., value = NA_real_) else . } %>%
      { if (!spread && !"units" %in% names(.)) mutate(., units = NA_character_) else . } %>%
      # arrange
      { if (spread) arrange(., device_name) else arrange(., device_name, idx) }
  }

#' Test which values one gets for a set of experiment devices
#' @param experiment_device_links experiment_device_links records, see \link{ll_get_experiment_device_links}
ll_test_experiment_device_links <- function(experiment_device_links, spread = FALSE, access_token = default(access_token), quiet = default(quiet)) {

  if (!"particle_id" %in% names(experiment_device_links))
    stop("particle_id is a required column in experiment_device_links data frame", call. = FALSE)
  if (!"device_name" %in% names(experiment_device_links))
    stop("device_name is a required column in experiment_device_links data frame", call. = FALSE)
  if (!"data_idx" %in% names(experiment_device_links))
    stop("data_idx is a required column in experiment_device_links data frame", call. = FALSE)

  data <- ll_get_devices_cloud_data(devices = experiment_device_links %>% select(particle_id, device_name) %>% unique(), spread = FALSE)
  if (nrow(data) > 0) {
    data <- select(data, particle_id, device_name, datetime, raw_serial, raw_serial_errors, idx, key, value, units)
    experiment_device_links %>%
      rename(idx = data_idx) %>%
      left_join(data, by = c("particle_id", "device_name", "idx")) %>%
      { if (spread) spread_data_columns(.) else . }
  } else {
    experiment_device_links
  }
}

#' Cloud data / experiment links summary
#'
#' Utility function to combine experimental device links with devices cloud data. Will join by device_name, device_id or both, depending on which of these are in both the cloud_data and experiment_device_links tables.
#'
#' @param experiment_device_links the links between devices and experiments, see \link{ll_get_experiment_device_links}
#' @param linked whether to include linked data
#' @param unlinked whether to include unlinked data
#' @export
ll_summarize_cloud_data_experiment_links <- function(
  cloud_data = tibble(),
  experiment_device_links = tibble(),
  linked = TRUE, unlinked = TRUE) {

  # remove exp_device_data_ids because they interfere with summarizing, and particle_id if it exists because we want it from the cloud data instead
  experiment_device_links <- experiment_device_links[!names(experiment_device_links) %in% c("exp_device_data_id", "particle_id")]

  # make sure empty cloud data or device links have the necessary columns
  if (nrow(experiment_device_links) == 0) {
    experiment_device_links <- tibble(
      exp_id = character(), recording = logical(),
      device_id = integer(), device_name = character(),
      data_idx = integer(), active = logical())
  }
  experiment_device_links <- rename(experiment_device_links, idx = data_idx)

  if (nrow(cloud_data) == 0) {
    cloud_data <- tibble(
      particle_id = character(), device_id = integer(), device_name = character(),
      datetime = as.POSIXct(numeric(), origin = "1960-01-01"),
      error = character(),
      raw_serial = character(), raw_serial_errors = character(),
      idx = integer(), key = character(), value = character(), units = character()
    )
  }

  # figure out join by
  join_by <- c()
  if ("device_id" %in% names(experiment_device_links) && "device_id" %in% names(cloud_data))
    join_by <- c(join_by, "device_id")
  if ("device_name" %in% names(experiment_device_links) && "device_name" %in% names(cloud_data))
    join_by <- c(join_by, "device_name")

  if (length(join_by) == 0) stop("neither device_id nor device_name", call. = TRUE)

  cloud_data <- cloud_data %>%
    select(particle_id, !!join_by, device_name, datetime, error, raw_serial, raw_serial_errors, idx, key, value, units)

  full_join(
    cloud_data %>% filter(is.na(error)) %>% select(-error),
    experiment_device_links %>% filter(active),
    by = c(join_by, "idx")) %>%
    # add error info from cloud data to the existing links
    left_join(cloud_data %>% filter(!is.na(error)) %>% select(!!join_by, error), by = join_by) %>%
    # add info from cloud data for devices that have no existing links at all
    {
      bind_rows(., dplyr::anti_join(filter(cloud_data, !is.na(error)), ., by = join_by))
    } %>%
    filter(linked & !is.na(exp_id) | unlinked & is.na(exp_id)) %>%
    nest(..exp_data.. = c(exp_id, recording)) %>%
    mutate(
      recording_exp_ids = map_chr(..exp_data.., ~filter(.x, recording)$exp_id %>% { if(length(.) > 0) glue::glue_collapse(., sep = ", ") else NA_character_ }),
      non_recording_exp_ids = map_chr(..exp_data.., ~filter(.x, !recording)$exp_id %>% { if(length(.) > 0) glue::glue_collapse(., sep = ", ") else NA_character_ })
    ) %>%
    select(-..exp_data..) %>%
    select(particle_id, !!join_by, datetime, error, everything())
}

# functions to interact with particle cloud commands =====

# helper function for cloud function calls
call_devices_cloud_function <- function(devices, func = "device", arg = "", access_token = default(access_token), quiet = default(quiet)) {
  # safety checks
  if (!is.data.frame(devices) | !all(c("particle_id", "device_name") %in% names(devices)))
    stop("devices needs to be supplied as a data frame with columns (at the least) 'particle_id' and 'device_name'", call. = FALSE)

  # request state
  devices %>%
    mutate(
      lists = map2(
        particle_id, dplyr::row_number(),
        ~make_particle_cloud_request(
          endpoint = sprintf("devices/%s/%s", .x, !!func),
          arg = !!arg,
          nr = .y,
          total = nrow(devices),
          access_token = access_token,
          quiet = quiet
        )
      )
    ) %>%
    unpack_lists_tibble(unnest_single_values = TRUE, unpack_sub_lists = TRUE, nest_into_data_frame = FALSE)
}

#' Send device commands to the cloud
#'
#' Send commands to one or more devices.
#'
#' @param devices data frame with devices for which to issue commands
#' @param command to send
#' @param message message to add to command
#' @param access_token the access token for the accout
#' @return nested data frame (converted from JSON)
# @ note: consider making a function to udpate particle ids in the DB from here (overkill? since state/data logs cause update too)
#' @export
ll_send_devices_command <- function(devices, command, message = "", access_token = default(access_token), quiet = default(quiet)) {

  if (nchar(message) > 0) command <- paste(command, message)

  # return codes
  return_codes <- get_device_command_return_values()

  # call cloud function
  devices %>%
    # add command
    mutate(
      command = !!command,
    ) %>%
    call_devices_cloud_function(
      func = "device", arg = command,
      access_token = access_token, quiet = quiet
    ) %>%
    # add  missing error
    { if (!"error" %in% names(.)) mutate(., error = NA_character_) else . } %>%
    # add  missing return value
    { if (!"return_value" %in% names(.)) mutate(., return_value = NA_integer_) else . } %>%
    # add return message
    mutate(
      return_message = case_when(
        is.na(return_value) ~ error,
        as.character(return_value) %in% names(return_codes) ~ return_codes[as.character(return_value)],
        return_value < 0 ~ "Unknown error",
        return_value > 0 ~ "Unknown warning",
        TRUE ~ "Undefined behaviour",
      ) %>% {
        ifelse(!is.na(return_value), str_c(., " (code ", return_value, ")"), .)
      }
    )
}

# helper function for device return values
get_device_command_return_values <- function() {
  # details at https://github.com/KopfLab/labware_photon/blob/master/DeviceCommands.h
  c(
     `0` = "Success",
    `-1` = "Undefined error",
    `-2` = "Device locked",
    `-3` = "Invalid command",
    `-4` = "Invalid command value",
    `-5` = "Invalid command units",
     `1` = "State already as requested"
  )
}
KopfLab/lablogger documentation built on July 6, 2023, 12:03 a.m.