R/read_embrace_plus.R


#' Construct timestamp column for Embrace Plus data
#' @description Builds a timestamp column from the start time, sampling frequency, and signal length
#' @param start_time start time of the recording, in microseconds since the Unix epoch
#' @param sampling_freq sampling frequency of the recording in Hz
#' @param len_list number of samples in the signal, which determines the number of timestamps generated
#' @param tz timezone
#' @keywords internal
#' @noRd
get_timestamp_column <- function(start_time, sampling_freq, len_list, tz) {
  # Convert microseconds -> nanoseconds -> seconds since the Unix epoch
  start_time_ns <- start_time * 1000
  start_timestamp <- as.POSIXct(start_time_ns / 1e9, origin = "1970-01-01", tz = tz)
  
  # Calculate end_timestamp based on the length of the list and sampling frequency
  end_timestamp <- start_timestamp + as.difftime(len_list / sampling_freq, units = "secs")
  
  # Generate a range of timestamps from start to end with the given frequency
  timestamp_column <- seq(from = start_timestamp, to = end_timestamp, by = 1 / sampling_freq)
  timestamp_df <- data.frame(DateTime = timestamp_column)
  
  # Convert the DateTime column back to a Unix timestamp in seconds
  timestamp_df$unix_timestamp <- as.numeric(timestamp_df$DateTime)
  
  # seq() includes the end point, so drop the extra final row if present
  if (nrow(timestamp_df) > len_list) {
    timestamp_df <- timestamp_df[-nrow(timestamp_df), ]
  }
  
  return(timestamp_df)
}
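
# Illustrative example (not run): build a timestamp column for a 64 Hz
# signal of 128 samples. The start time is assumed to be in microseconds
# since the Unix epoch, matching how the function converts it
# (* 1000 to nanoseconds, / 1e9 to seconds):
#
# ts <- get_timestamp_column(
#   start_time = as.numeric(as.POSIXct("2024-01-01", tz = "UTC")) * 1e6,
#   sampling_freq = 64,
#   len_list = 128,
#   tz = "UTC"
# )
# nrow(ts)  # 128 rows with DateTime and unix_timestamp columns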




#' Create dataframe for physiological signals
#' @description Creates a dataframe for one type of physiological signal
#' @param data list of dataframes
#' @param type type of data to extract
#' @param file filename of the original data
#' @param vars variables to extract
#' @param var_names names to give to the variables
#' @param tz timezone
#' @param timestamp_start vector of 3 elements: the names of the columns in the
#'   data that hold the start time, the sampling frequency, and the signal
#'   whose length determines the number of timestamps.
#' @keywords internal
#' @noRd
create_dataframes <- function(data, type, file, vars = c("x", "y", "z"), 
                              var_names = c("x", "y", "z"),
                              timestamp_start = NULL, tz) {
  
  # Check type first: if it is missing, data[[type]] is NULL and the
  # vars check below would give a misleading error
  if (!type %in% names(data)) {
    stop(sprintf("type must be in the data, types found are: %s", 
                 paste0(names(data), collapse = ", ")))
  }
  
  if (!all(vars %in% names(data[[type]]))) {
    stop(sprintf("vars must be present in the data, available variables are: %s", 
                 paste0(names(data[[type]]), collapse = ", ")))
  }
  
  df <- data.frame()
  
  if (length(data[[type]][[vars[1]]]) == 0) {
    cli_alert_warning(sprintf("Empty %s data in %s", type, file))
  } else {
    # Build a dataframe with the requested variables, renamed to var_names
    for (var in vars) {
      new_name <- var_names[which(vars == var)]
      if (length(df) == 0) {
        df <- data.frame(data[[type]][[var]])
        names(df) <- new_name
      } else {
        df[[new_name]] <- data[[type]][[var]]
      }
    }
    
    # Handle imuParams
    if (!is.null(data[[type]]$imuParams)) {
      imu_params <- data[[type]]$imuParams
      # Flatten imuParams and add as columns
      for (param_name in names(imu_params)) {
        df[[paste0("imu_", param_name)]] <- imu_params[[param_name]]
      }
    }
    
    if (!is.null(timestamp_start)) {
      if (length(timestamp_start) != 3) {
        stop("timestamp_start must be a vector of length 3")
      }
      
      timestamp_df <- get_timestamp_column(data[[type]][[timestamp_start[1]]], 
                                           data[[type]][[timestamp_start[2]]], 
                                           length(data[[type]][[timestamp_start[3]]]),
                                           tz = tz)
      
      df <- cbind(df, timestamp_df)
    }
  }
  return(df)
}
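
# Illustrative example (not run): given a list `raw_data` shaped like the
# Avro rawData record (a hypothetical input here), extract the EDA values
# into a column named "EDA" with timestamp columns appended:
#
# eda <- create_dataframes(
#   raw_data,
#   type = "eda",
#   file = "participant.avro",
#   vars = "values",
#   var_names = "EDA",
#   timestamp_start = c("timestampStart", "samplingFrequency", "values"),
#   tz = "UTC"
# )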




#' Read Embrace Plus data
#' @description Reads in Embrace Plus data as a list of dataframes (e.g. EDA, TEMP, ACC, BVP), and prepends time columns
#' @details This function reads in a zipfile with data from the Embrace Plus device, or
#' a folder with the unzipped files. The unzipped files are avro files (raw data) or
#' csv files (aggregated data). Avro files are read in using `sparklyr`, which sets up
#' a local Spark cluster.
#'
#' The function returns a list with dataframes from the physiological signals, each with
#' prepended datetime columns. Aggregated data is returned with class "embraceplusdata".
#'
#' @param zipfile A zip file as exported by the instrument. Can be aggregated data, or raw data.
#' @param folder A folder with the unzipped files. If this is provided, the zipfile is not used.
#' @param type The type of data contained in the zip file or folder. Either "raw" or "aggregated".
#' @param tz The timezone used by the instrument (defaults to user timezone).
#' @examples
#' \dontrun{
#'  library(wearables)
#'  read_embrace_plus(zipfile = "yourpathtothezipfile.zip")
#'  read_embrace_plus(folder = "/path/to/folder/with/files", type = "aggregated")
#' }
#' @export
#' @import cli
#' @importFrom dplyr pull
read_embrace_plus <- function(zipfile = NULL,
                              folder = NULL,
                              type = "raw",
                              tz = Sys.timezone()) {
  
  # Check if zipfile or folder is provided
  if (is.null(zipfile) && is.null(folder)) {
    cli_abort("Either zipfile or folder must be provided")
  }
  
  # Check if file or folder exist
  if (!is.null(zipfile) && !file.exists(zipfile)) {
    cli_abort("File does not exist")
  }
  
  if (!is.null(folder) && !dir.exists(folder)) {
    cli_abort("Folder does not exist")
  }
  
  # Check type
  if (!type %in% c("raw", "aggregated")) {
    cli_abort("type must be either 'raw' or 'aggregated'")
  }
  
  if (type == "raw") {
    return(read_raw_embrace_plus(zipfile, folder, tz))
  }
  
  if (type == "aggregated") {
    return(read_aggregated_embrace_plus(zipfile, folder, tz))
  }
  
}




#' Extract csv files from data
#' @description Processes .csv files
#' @param zipfile path to zipfile
#' @param folder path to folder
#' @param tz timezone
#' @keywords internal
#' @import cli
#' @noRd
read_aggregated_embrace_plus <- function(zipfile = NULL, folder = NULL, tz) {
  
  if (!is.null(zipfile)) {
    csv_files <- unzip_files(zipfile, "csv")
  }
  
  if (!is.null(folder)) {
    csv_files <- list.files(folder, 
                            pattern = ".csv", 
                            full.names = TRUE,
                            recursive = TRUE)
  }
  
  # Only keep files that contain "digital_biomarkers" in the name -
  # prevents top-level files from being read
  csv_files <- csv_files[grep("digital_biomarkers", csv_files)]
  
  # Get the content before .csv and after the last _ (but include -)
  dataset_names <- gsub(".*?([A-Za-z0-9\\-]+)[.]csv", "\\1", csv_files)
  dataset_names <- toupper(dataset_names)
  dataset_names <- gsub("TEMPERATURE", "TEMP", dataset_names)
  dataset_names <- gsub("SLEEP-DETECTION", "SLEEP", dataset_names)
  dataset_names <- gsub("PULSE-RATE", "HR", dataset_names)
  dataset_names <- gsub("MOVEMENT-INTENSITY", "MOVE", dataset_names)
  dataset_names <- gsub("RESPIRATORY-RATE", "RR", dataset_names)
  dataset_names <- gsub("WEARING-DETECTION", "WEAR", dataset_names)
  csv_files <- setNames(csv_files, dataset_names)
  
  csv_list <- list()
  
  for (i in seq_along(csv_files)) {
    
    file <- csv_files[i]
    
    this_file <- read.csv(file, stringsAsFactors = FALSE)
    
    rename_cols <- list(c("timestamp_iso", "DateTime"),
                        c("timestamp_unix", "unix_timestamp"),
                        c("eda_scl_usiemens", "EDA"),
                        c("temperature_celsius", "TEMP"),
                        c("pulse_rate_bpm", "HR"))
    
    for (j in rename_cols) {
      if (j[[1]] %in% colnames(this_file)) {
        names(this_file)[names(this_file) == j[[1]]] <- j[[2]]
      }
    }
    
    # Rebuild DateTime from the unix timestamp (in milliseconds) in the requested timezone
    this_file$DateTime <- as.POSIXct(this_file$unix_timestamp / 1000, 
                                     origin = "1970-01-01", 
                                     tz = tz)
    
    # If names(file) already exists in csv_list, append to it
    if (names(file) %in% names(csv_list)) {
      csv_list[[names(file)]] <- rbind(csv_list[[names(file)]], this_file)
    } else {
      csv_list[[names(file)]] <- this_file
    }
  }
  
  return(    
    structure(csv_list, 
              class = "embraceplusdata",
              zipfile = tools::file_path_sans_ext(zipfile),
              tz = tz
    )
  )
  
}
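
# Illustrative example (not run): read an aggregated export (hypothetical
# path) and inspect the pulse-rate data. List names are derived from the
# digital_biomarkers filenames, e.g. EDA, TEMP, HR, SLEEP, MOVE, RR, WEAR:
#
# agg <- read_aggregated_embrace_plus(zipfile = "export.zip", tz = "UTC")
# head(agg$HR)  # DateTime, unix_timestamp, HR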



#' Extracts avro files from raw data
#' @description Processes .avro files
#' @param zipfile zip file
#' @param folder folder
#' @param tz timezone
#' @keywords internal
#' @import sparklyr
#' @import cli
#' @noRd
read_raw_embrace_plus <- function(zipfile = NULL, folder = NULL , tz) {
  
  # Check for already installed Spark versions;
  # if none is installed, install the latest available version
  if (nrow(spark_installed_versions()) == 0) {
    cli_alert_info("Installing Spark")
    spark_install(version = tail(spark_available_versions(), 1)$spark)
  }
  
  # Open a local Spark connection using the most recent installed version.
  # Attach the spark-avro package to be able to read in avro files
  cli_alert_info("Connecting to local Spark cluster")
  sc <- spark_connect(master = "local",
                      version = tail(spark_installed_versions(), 1)$spark,
                      packages = "org.apache.spark:spark-avro_2.12:3.5.0")
  cli_alert_success("Connected!")
  
  if (!is.null(zipfile)) {
    avro_files <- unzip_files(zipfile, type = "avro")
  }
  
  if (!is.null(folder)) {
    avro_files <- list.files(folder, 
                             pattern = ".avro", 
                             full.names = TRUE,
                             recursive = TRUE)
  }
  
  cli_alert_info("About to start processing {length(avro_files)} avro file{?s}")
  
  i <- 0
  avro_list <- list()
  cli_progress_step("Processed {i}/{length(avro_files)} {qty(i)}file{?s}",
                    msg_done = "Completed data reading and processing",
                    spinner = TRUE)
  
  for (file in avro_files) {
    
    # Read each avro file into a spark dataframe
    record <-
      spark_read_avro(
        sc,
        "embraceplus",
        file,
        repartition = FALSE,
        memory = FALSE,
        overwrite = TRUE
      )
    
    # Pull the data from the Spark cluster
    raw_data <- pull(record, rawData)
    raw_data <- raw_data[[1]]
    
    acc_data <- create_dataframes(raw_data, 
                                  type = "accelerometer", 
                                  file, 
                                  timestamp_start = c("timestampStart", "samplingFrequency", "x"),
                                  tz = tz)
    # For ACC, convert raw digital units to g using the IMU calibration
    # parameters, then add the vector magnitude of the acceleration
    delta_physical <- acc_data$imu_physicalMax - acc_data$imu_physicalMin
    delta_digital <- acc_data$imu_digitalMax - acc_data$imu_digitalMin
    acc_data$x_g <- acc_data$x * (delta_physical / delta_digital)
    acc_data$y_g <- acc_data$y * (delta_physical / delta_digital)
    acc_data$z_g <- acc_data$z * (delta_physical / delta_digital)
    acc_data$a <- sqrt(acc_data$x_g^2 + acc_data$y_g^2 + acc_data$z_g^2)
    
    gy_data <- create_dataframes(raw_data, 
                                 type = "gyroscope", 
                                 file, 
                                 timestamp_start = c("timestampStart", "samplingFrequency", "x"),
                                 tz = tz)
    
    eda_data <- create_dataframes(raw_data, 
                                  type = "eda",
                                  vars = "values",
                                  var_names = "EDA",
                                  file, 
                                  timestamp_start = c("timestampStart", "samplingFrequency", "values"),
                                  tz = tz)
    
    temp_data <- create_dataframes(raw_data, 
                                   type = "temperature",
                                   vars = "values",
                                   var_names = "TEMP",
                                   file, 
                                   timestamp_start = c("timestampStart", "samplingFrequency", "values"),
                                   tz = tz)
    
    bvp_data <- create_dataframes(raw_data,
                                  type = "bvp",
                                  vars = "values",
                                  var_names = "BVP",
                                  file, 
                                  timestamp_start = c("timestampStart", "samplingFrequency", "values"),
                                  tz = tz)
    
    steps_data <- create_dataframes(raw_data,
                                    type = "steps",
                                    vars = "values",
                                    var_names = "STEPS",
                                    file, 
                                    timestamp_start = c("timestampStart", "samplingFrequency", "values"),
                                    tz = tz)
    
    systolic_peaks_data <- create_dataframes(raw_data,
                                             type = "systolicPeaks",
                                             vars = "peaksTimeNanos",
                                             var_names = "PEAKS",
                                             file,
                                             tz = tz)
    
    
    this_avro_list <- list(
      ACC = acc_data,
      GY = gy_data,
      EDA = eda_data,
      TEMP = temp_data,
      BVP = bvp_data,
      steps = steps_data,
      systolic_peaks = systolic_peaks_data
    )
    
    if (exists("avro_list")) {
      avro_list <- append(avro_list, list(this_avro_list))
    } else {
      avro_list <- list()
      avro_list[[1]] <- this_avro_list
    }
    
    i <- i + 1
    cli_progress_update()
    
    if (i == length(avro_files)) {
      cli_progress_done()
    }
    
  }
  
  # Disconnect from the Spark cluster
  spark_disconnect(sc)
  
  return(avro_list)
  
}
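
# Illustrative example (not run): read raw avro data from an unzipped
# folder (hypothetical path); each list element holds the signals from
# one avro file:
#
# raw <- read_raw_embrace_plus(folder = "/path/to/avro/files", tz = "UTC")
# head(raw[[1]]$ACC)  # accelerometer data with x_g, y_g, z_g and magnitude a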