# Source file: data-raw/f_data_munging_R.R

# data munging ----------------------------------------------------------------

# Internal helpers for data_munging() -----------------------------------------

# Build the 1-minute UTC time grid that spans the analysis window.
.dm_minute_grid <- function(start, end) {
  data.frame(
    Date = seq(as.POSIXct(start, tz = "UTC"), as.POSIXct(end, tz = "UTC"),
               by = 60)
  )
}

# Keep only the rows whose Date lies inside the analysis window (inclusive).
.dm_clip_to_window <- function(obs, start, end) {
  lower <- as.POSIXct(start, format = "%Y-%m-%d %H:%M", tz = "UTC")
  upper <- as.POSIXct(end, format = "%Y-%m-%d %H:%M", tz = "UTC")
  # which() drops rows with an NA Date, exactly like subset() does.
  obs[which(obs$Date >= lower & obs$Date <= upper), , drop = FALSE]
}

# Left-join per-sensor frames (Date + one value column) onto the time grid.
# The value column of frame i is renamed to Sensor_i *before* joining so that
# repeated column names can never collide through the join suffixes.
.dm_join_on_grid <- function(grid, frames) {
  frames <- Map(function(frame, i) {
    names(frame)[2] <- paste0("Sensor_", i)
    frame
  }, frames, seq_along(frames))
  Reduce(function(acc, nxt) dplyr::left_join(acc, nxt, by = "Date"),
         frames, grid)
}

# Read every file of one rain gauge station folder and return the records of
# sensor 0001 as a data frame with columns Date (POSIXct, UTC) and Sensor_1.
.dm_read_rg_station <- function(station_dir, station_no) {
  paths <- file.path(station_dir, list.files(station_dir))
  if (length(paths) == 0) {
    stop("no rain gauge files found in ", station_dir, call. = FALSE)
  }
  raw <- data.table::rbindlist(lapply(paths, read.table))
  lines <- as.character(raw$V1)
  header <- paste0("<STATION>ADLISWIL0", station_no,
                   "</STATION><SENSOR>0001</SENSOR><DATEFORMAT>YYYYMMDD</DATEFORMAT>")
  starts <- grep("<STATION>", lines)
  # A block runs up to the line before the next header; the last block runs to
  # the end of the data (the original indexed one past the last header -> NA).
  ends <- c(starts[-1L] - 1L, length(lines))
  wanted <- which(lines[starts] == header)
  rows <- unlist(lapply(wanted, function(k) {
    # A header directly followed by another header carries no records.
    if (ends[k] > starts[k]) lines[(starts[k] + 1L):ends[k]] else character(0)
  }))
  if (length(rows) == 0) {
    stop("no sensor 0001 records found in ", station_dir, call. = FALSE)
  }
  # Each record is "date;time;value".
  fields <- matrix(unlist(strsplit(rows, ";", fixed = TRUE)),
                   ncol = 3, byrow = TRUE)
  data.frame(
    Date = as.POSIXct(paste(fields[, 1], fields[, 2]),
                      format = "%Y%m%d %H%M%S", tz = "UTC"),
    Sensor_1 = as.numeric(fields[, 3])
  )
}

# Read every file of one CML link folder and return a data frame with columns
# Date (POSIXct, UTC) and Sensor_1. Time stamps in the files are parsed as CET.
.dm_read_cml_link <- function(link_dir) {
  paths <- file.path(link_dir, list.files(link_dir))
  if (length(paths) == 0) {
    stop("no CML files found in ", link_dir, call. = FALSE)
  }
  stamp_format <- "%Y-%m-%d %H:%M:%S"
  parts <- lapply(paths, function(path) {
    raw <- read.table(path, na.strings = "NA", fill = TRUE, header = FALSE,
                      blank.lines.skip = TRUE)
    # Files without any value have only the date and time columns.
    if (ncol(raw) == 2) {
      raw[, 3] <- NA
    }
    local_time <- as.POSIXct(paste(raw[, 1], raw[, 2]),
                             format = stamp_format, tz = "CET")
    # CET to UTC. The format is spelled out because the default format drops
    # the time of day when every stamp is at midnight.
    list(date = format(local_time, format = stamp_format, tz = "UTC"),
         rain = raw[, 3])
  })
  data.frame(
    Date = as.POSIXct(unlist(lapply(parts, `[[`, "date")),
                      format = stamp_format, tz = "UTC"),
    Sensor_1 = unlist(lapply(parts, `[[`, "rain"))
  )
}

# Read the "rain" variable of one radar NetCDF file inside the bounding box
# and return a data frame: Date (POSIXct, UTC) plus one column per pixel
# (sensor_1, sensor_2, ...; x is the outer index, y the inner index).
.dm_read_radar_file <- function(path, xlo, xhi, ylo, yhi) {
  nc <- ncdf4::nc_open(path)
  # Close the handle even when one of the reads below fails.
  on.exit(ncdf4::nc_close(nc), add = TRUE)
  xst <- min(which(nc$dim$chx$vals > xlo))
  yst <- min(which(nc$dim$chy$vals < yhi))
  # Whole number of cells in the box; the divisor 1000 presumably is the cell
  # size in coordinate units -- TODO confirm against the radar product.
  xct <- floor((xhi - xlo) / 1000)
  yct <- floor((yhi - ylo) / 1000)
  nt <- length(nc$dim$time$vals)
  rain <- ncdf4::ncvar_get(nc, "rain", start = c(xst, yst, 1),
                           count = c(xct, yct, nt))
  dim(rain) <- c(xct, yct, nt)
  secs <- as.vector(ncdf4::ncvar_get(nc, "time", start = 1, count = nt))
  # The time variable is interpreted as seconds since 1970-01-01 UTC. Whole
  # seconds are kept, which is what the former character round trip produced,
  # without losing midnight stamps to as.character().
  stamp <- as.POSIXct(secs, origin = "1970-01-01", tz = "UTC")
  stamp <- as.POSIXct(trunc(stamp, units = "secs"))
  # Reorder to (t, y, x) so that flattening gives one row per time step and
  # the columns in x-major / y-minor order.
  pixels <- matrix(aperm(rain, c(3, 2, 1)), nrow = nt)
  obs <- data.frame(Date = stamp, as.data.frame(pixels))
  colnames(obs) <- c("Date", paste0("sensor_", seq_len(ncol(pixels))))
  obs
}

#' Run data munging script.
#'
#' Reads the raw files of one sensor type, converts them to a data frame
#' \code{obs} with a \code{Date} column (POSIXct, UTC) and one
#' \code{Sensor_<i>} column per sensor, and saves that data frame with
#' \code{save()}.
#'
#' Supported combinations:
#' \itemize{
#'   \item \code{type = "RG", type_sub = ""}: all rain gauge stations joined
#'     onto a 1-minute grid, values multiplied by 60 (mm/min to mm/h).
#'   \item \code{type = "RG", type_sub = "_1"}: first station only, multiplied
#'     by 60, clipped to the analysis window and passed through
#'     \code{RG_smooth(x, 15)}.
#'   \item \code{type = "CML", type_sub = ""}: all links joined onto a
#'     1-minute grid, averaged per minute and passed column by column through
#'     \code{interpolate_values()}.
#'   \item \code{type = "CML", type_sub = "_1"}: first link only, linearly
#'     interpolated onto the 1-minute grid.
#'   \item \code{type = "Radar"} (any \code{type_sub}): NetCDF files cropped
#'     to a fixed bounding box; columns 28 to 30 are kept and interpolated to
#'     1 minute.
#'   \item \code{type = "flow"} (any \code{type_sub}): flow file completed to
#'     a 4-minute grid, gaps set to 0.
#' }
#' Any other combination does nothing.
#'
#' The function reads the variables \code{baseLoc}, \code{analysis_start} and
#' \code{analysis_end} (and \code{RG} for the radar branch) and calls
#' \code{RG_smooth()}, \code{interpolate_values()} and
#' \code{fill_missing_timestamp()}; none of them is defined here, so they must
#' be visible from the calling environment chain.
#'
#' Intermediate per-sensor results are kept in memory instead of being written
#' to and re-read from temporary \code{.RData} files.
#'
#' @export
#' @param type Sensor type
#' @param type_sub Number of sensors considered
#' @return \code{NULL}, invisibly. Called for its side effect of writing the
#'   preprocessed data file.
data_munging <- function(type, type_sub) {

  # Path builders; evaluated lazily so nothing is touched for unknown types.
  raw_path <- function(...) paste0(baseLoc, "../data-raw/", ...)
  out_path <- function(...) {
    paste0(baseLoc, "../data-preprocessed/", type, "/", ...)
  }

  if (type == "RG" && type_sub == "") {

    stations <- list.files(raw_path(type))
    if (length(stations) == 0) {
      stop("no station folders found in ", raw_path(type), call. = FALSE)
    }
    # Station i is expected to carry the header ADLISWIL0<i>.
    frames <- lapply(seq_along(stations), function(i) {
      .dm_read_rg_station(raw_path(type, "/", stations[i]), i)
    })
    obs <- .dm_join_on_grid(.dm_minute_grid(analysis_start, analysis_end),
                            frames)

    # mm/min to mm/h
    obs[-1] <- obs[-1] * 60
    obs <- .dm_clip_to_window(obs, analysis_start, analysis_end)

    save(obs, file = out_path(type, type_sub, ".RData"))

  } else if (type == "RG" && type_sub == "_1") {

    stations <- list.files(raw_path("RG"))
    obs <- .dm_read_rg_station(raw_path("RG/", stations[1]), 1)

    # mm/min to mm/h
    obs$Sensor_1 <- obs$Sensor_1 * 60
    obs <- .dm_clip_to_window(obs, analysis_start, analysis_end)
    # smooth values to 15 min averages
    obs[, 2] <- RG_smooth(obs[, 2], 15)

    save(obs, file = out_path(type, type_sub, ".RData"))

  } else if (type == "CML" && type_sub == "") {

    links <- list.files(raw_path(type))
    if (length(links) == 0) {
      stop("no link folders found in ", raw_path(type), call. = FALSE)
    }
    frames <- lapply(links, function(link) {
      .dm_read_cml_link(raw_path(type, "/", link))
    })
    obs <- .dm_join_on_grid(.dm_minute_grid(analysis_start, analysis_end),
                            frames)

    # Average per minute. The minute is computed numerically so that Date
    # stays POSIXct instead of going through factor labels and re-parsing.
    obs <- dplyr::summarise_all(
      dplyr::group_by(
        obs,
        Date = as.POSIXct(floor(as.numeric(Date) / 60) * 60,
                          origin = "1970-01-01", tz = "UTC")
      ),
      mean, na.rm = TRUE
    )

    # Fill gaps for every sensor column that is actually present.
    for (col in seq_len(ncol(obs) - 1) + 1) {
      obs[[col]] <- interpolate_values(
        data.frame(Date = obs$Date, Sensor_1 = obs[[col]])
      )
    }

    # NOTE(review): this branch writes "<type>.RDATA" while the RG and
    # CML "_1" branches write ".RData"; the name is kept for existing readers.
    save(obs, file = out_path(type, ".RDATA"))

  } else if (type == "CML" && type_sub == "_1") {

    links <- list.files(raw_path(type))
    obs <- .dm_read_cml_link(raw_path(type, "/", links[1]))
    obs <- .dm_clip_to_window(obs, analysis_start, analysis_end)

    # linear interpolation to 1 min data
    grid <- .dm_minute_grid(analysis_start, analysis_end)
    obs <- data.frame(
      Date = grid$Date,
      Sensor_1 = approx(obs$Date, obs$Sensor_1, as.numeric(grid$Date))$y
    )

    save(obs, file = out_path(type, type_sub, ".RData"))

  } else if (type == "Radar") {

    # NOTE(review): the radar branch uses "data-processed", not
    # "data-preprocessed" like the other branches; kept as is -- confirm.
    in_dir <- paste0(baseLoc, "/../data-raw/", type)
    files <- list.files(in_dir)
    if (length(files) == 0) {
      stop("no radar files found in ", in_dir, call. = FALSE)
    }

    # Bounding box of the study area in the chx / chy coordinates of the files.
    xlo <- 680346
    xhi <- 684711
    ylo <- 233741
    yhi <- 246199

    frames <- lapply(files, function(f) {
      fill_missing_timestamp(
        .dm_read_radar_file(paste0(in_dir, "/", f), xlo, xhi, ylo, yhi),
        "150 secs"
      )
    })
    obs <- do.call(rbind, frames)
    obs[obs < 0] <- 0
    obs[is.na(obs)] <- 0

    # Keep three pixels only (hard-coded column positions 28 to 30).
    obs <- obs[, c(1, 28:30)]
    sensors <- paste0("Sensor_", seq_len(ncol(obs) - 1))
    colnames(obs) <- c("Date", sensors)

    # linear interpolation to 1 min data
    grid <- .dm_minute_grid(analysis_start, analysis_end)
    minutes <- as.numeric(grid$Date)
    interpolated <- lapply(sensors, function(s) {
      approx(obs$Date, obs[[s]], minutes)$y
    })
    names(interpolated) <- sensors
    obs <- data.frame(Date = grid$Date, interpolated)

    # fix to RG raining periods; RG is read from outside this function and
    # presumably holds the preprocessed rain gauge data -- verify at caller.
    dry <- RG$Sensor_1 == 0
    for (s in sensors) {
      obs[[s]][dry] <- 0
      # remove high implausible peak (row positions are tied to the original
      # analysis window)
      obs[[s]][85235:85246] <- 0
    }

    save(obs, file = paste0(baseLoc, "/../data-processed/", type, "/", type,
                            type_sub, ".RDATA"))

  } else if (type == "flow") {

    raw <- read.table(
      file = paste0(baseLoc,
                    "../data-raw/flow/Site1_full_period_rtq_final.txt"),
      skip = 1
    )
    # Columns 1 and 2 hold date and time, column 12 the value that is kept.
    obs <- data.frame(
      Date = as.POSIXct(paste(as.character(raw[, 1]), as.character(raw[, 2])),
                        format = "%d.%m.%Y %H:%M", tz = "UTC"),
      Sensor_1 = raw[, 12]
    )
    # Insert the missing 4-minute steps, then set the gaps to 0.
    obs <- as.data.frame(
      tidyr::complete(obs,
                      Date = seq.POSIXt(min(Date), max(Date), by = "4 min"))
    )
    obs[is.na(obs)] <- 0
    obs <- .dm_clip_to_window(obs, analysis_start, analysis_end)

    save(obs, file = out_path(type, type_sub, ".RDATA"))
  }

  invisible(NULL)
}
# adisch87/COMCORDEcml documentation built on Dec. 22, 2017, 11:11 a.m.