R/insertIndicators.R

if (FALSE) {

  devtools::install_github("tmk-c/myrlib")
  library(myrlib)

  # Functions ------------------------------------------------------------------
  calcAndSaveIndicator <- function(data, symbol, dir, lens, ...) {

    # Symbols that already have a CSV in dir (anchor the extension; a bare
    # "." is a regex wildcard)
    cur_symbols <- stringr::str_replace(list.files(dir), "\\.csv$", "")

    if (!symbol %in% cur_symbols) {
      results <- do.call("rbind", lapply(lens, function(len) {
        result <- zoo::rollapply(data, len, ..., by.column = FALSE) %>%
          na.omit() %>%
          data.table::as.data.table()
        data.table::setnames(result, c("date", "value"))
        result[, len:= len]
        result
      }))

      # Arrange columns
      results[, ticker:= symbol]
      data.table::setcolorder(results, c("date", "ticker", "len", "value"))

      path <- paste0(dir, symbol, ".csv")
      write.csv(results, path, row.names = FALSE)

      return(TRUE)

    } else {
      return(FALSE)
    }
  }
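
  # Toy illustration of the helper above (hypothetical series and symbol;
  # real inputs come from getDailyOHLCV below). Each call writes one CSV
  # per symbol with columns date, ticker, len, value.
  # toy <- xts::xts(c(100, 101, 103, 102, 105),
  #                 order.by = as.Date("2018-01-01") + 0:4)
  # calcAndSaveIndicator(toy, "TOY", "~/tmp/sma/", lens = c(2, 3),
  #                      mean, na.rm = TRUE)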

  # All symbols from equity prices (10,033)
  con <- DBI::dbConnect(RSQLite::SQLite(), settings$sharadar_plus.sqli)
  src <- dplyr::tbl(con, "equity_prices")
  tickers <- src %>%
    dplyr::select(ticker) %>% dplyr::collect() %>% dplyr::pull() %>% unique()
  DBI::dbDisconnect(con)

  # Trading universe (8,871)
  universe <- getActiveSymbols(settings$sharadar_plus.sqli,
                               start.date = "2008-01-02",
                               end.date = "2018-11-28")
  # Intersect symbols (7,974)
  symbols <- intersect(tickers, universe)

  # Current symbols
  cur_sma <- stringr::str_replace(list.files("~/tmp/sma"), "\\.csv$", "")
  cur_sd  <- stringr::str_replace(list.files("~/tmp/sd"), "\\.csv$", "")
  cur_ato <- stringr::str_replace(list.files("~/tmp/ato"), "\\.csv$", "")
  cur_ogc <- stringr::str_replace(list.files("~/tmp/ogc"), "\\.csv$", "")
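  # These lists make the run resumable: symbols that already have all four
  # indicator files are skipped inside the loop below.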

  # Prepare parallel foreach
  library(foreach)
  library(doParallel)
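  # Worker packages to load; getDailyOHLCV (from myrlib) is exported
  # explicitly because myrlib is not attached on the workers, while locals
  # such as calcAndSaveIndicator are auto-exported by foreach.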
  pkgs <- c("DBI", "zoo", "xts", "data.table", "dbplyr",
            "magrittr")  # magrittr so %>% resolves on the workers
  expt <- c("getDailyOHLCV")
  cl   <- parallel::makeCluster(parallel::detectCores() - 2)
  doParallel::registerDoParallel(cl)

  # Create CSVs in parallel
  s <- proc.time()
  # foreach(symbol = symbols) %do% {
  rslt <- foreach(symbol = symbols, .packages = pkgs, .export = expt) %dopar% {

    # Skip symbols for which all four indicator files already exist
    if ((!symbol %in% cur_sma) | (!symbol %in% cur_sd) |
        (!symbol %in% cur_ato) | (!symbol %in% cur_ogc)) {

      # Prepare data
      data      <- getDailyOHLCV(settings$sharadar_plus.sqli, symbol)
      adj_close <- data[, c("date", "adj.close")] %>% as.xts.data.table()
      adj_open  <- data[, c("date", "adj.open")] %>% as.xts.data.table()

      # Window lengths must not exceed the number of available rows
      all_lens <- seq(10, 250, 10)
      lens <- all_lens[all_lens <= nrow(data)]

      if (length(lens) > 0) {

        # SMA
        calcAndSaveIndicator(adj_close, symbol, "~/tmp/sma/", lens,
                             mean, na.rm = TRUE)

        # SD
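        # diff.xts(log = TRUE) gives close-to-close log returns, so the
        # rolling sd below is a per-window volatility estimate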
        roc_pc2tc <- diff.xts(adj_close, log = TRUE)
        calcAndSaveIndicator(roc_pc2tc, symbol, "~/tmp/sd/", lens,
                             sd, na.rm = TRUE)

        # ATO
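        # ATO = average turnover: rolling mean of daily traded value
        # (adjusted close x adjusted volume)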
        tover <- data[, tover:= adj.close * adj.volume][, c("date", "tover")] %>%
          as.xts.data.table()
        calcAndSaveIndicator(tover, symbol, "~/tmp/ato/", lens,
                             mean, na.rm = TRUE)

        # OGC
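        # gap = overnight log return (previous close -> today's open);
        # ret = intraday log return (open -> close). calcOgc below fits a
        # rolling no-intercept regression ret ~ gap, whose slope measures
        # how the intraday move relates to the overnight gap.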
        roc_pc2to <- log(adj_open) - lag.xts(log(adj_close))
        roc_to2tc <- log(adj_close) - log(adj_open)
        ogc_data  <- merge.xts(roc_pc2to, roc_to2tc)
        colnames(ogc_data) <- c("gap", "ret")

        # Data cleaning for lm
        ogc_data$gap[is.infinite(ogc_data$gap)] <- NA
        ogc_data$gap[is.nan(ogc_data$gap)] <- NA
        ogc_data$ret[is.infinite(ogc_data$ret)] <- NA
        ogc_data$ret[is.nan(ogc_data$ret)] <- NA

        calcOgc <- function(data) coef(lm(ret ~ gap + 0, data = data))
        calcAndSaveIndicator(ogc_data, symbol, "~/tmp/ogc/", lens, calcOgc)
      }
    }
  }

  # Report elapsed time and stop the cluster
  e <- proc.time() - s
  print(paste0("Completed in ", e[3], " sec."))
  stopCluster(cl)

  # Check data
  # a <- fread("~/tmp/tmp/A.csv")
  # b <- fread("~/tmp/ogc/A.csv")
  # ((a$value == b$value) == FALSE) %>% sum()

  ### Check that each file's last date matches the source data -----------------
  info <- getSymbolInfo(settings$sharadar_plus.sqli)
  for (symbol in cur_sma) {

    data <- data.table::fread(paste0("~/tmp/sma/", symbol, ".csv"))

    if (nrow(data) > 0) {
      info_date <- info[ticker == symbol]$last_price_date
      data_date <- data[nrow(data)]$date
      ok <- info_date == data_date

      if (!ok) {
        print(paste(symbol, info_date, data_date, sep = " - "))
      }

    } else {
      print(paste0(symbol, " does not have data."))
    }
  }

  ### DB insert ----------------------------------------------------------------

  # Build data from CSV
  dir <- "~/tmp/ogc/"
  data <- do.call("rbind", lapply(cur_ogc, function(symbol) {
    data.table::fread(paste0(dir, symbol, ".csv"))
  }))

  # Check data for missing or non-finite values
  data$value %>% is.na() %>% sum()
  data$value %>% is.nan() %>% sum()
  data$value %>% is.infinite() %>% sum()

  # dcast to wide format: one column per window length
  p <- proc.time()
  ddata <- data %>% data.table::dcast(date + ticker ~ len, value.var = "value")
  proc.time() - p
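  # Resulting layout: one row per (date, ticker) pair and one value column
  # per window length, i.e. columns date, ticker, 10, 20, ..., 250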

  # Order data
  ddata <- ddata[order(date, ticker)]
  write.csv(ddata, "~/tmp/ogc_dcast.csv", row.names = FALSE)

  # Insert into DB
  # The table is large, so load it with sqlite3's CSV .import instead of
  # inserting from R
  # con <- DBI::dbConnect(RSQLite::SQLite(), settings$sharadar_plus.sqli)
  # src <- dbplyr::src_dbi(con)
  # dplyr::db_insert_into(con, table = "sma", values = data)
  # DBI::dbDisconnect(con)
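
  # A minimal sketch of the .import route (assumptions: the sqlite3 CLI is on
  # PATH, a table named "ogc" with matching columns already exists, and
  # sqlite3 >= 3.32 for ".import --skip"; on older versions write the CSV
  # without a header row instead):
  # csv_path <- path.expand("~/tmp/ogc_dcast.csv")
  # system2("sqlite3",
  #         args = c("-cmd", shQuote(".mode csv"),
  #                  shQuote(path.expand(settings$sharadar_plus.sqli)),
  #                  shQuote(paste(".import --skip 1", csv_path, "ogc"))))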

  # Check DB
  channel <- DBI::dbConnect(RSQLite::SQLite(), settings$sharadar_plus.sqli)
  sql <- "SELECT [date],[ticker],[10],[20],[30],[40] FROM [sma2]
          WHERE [date] BETWEEN '2017-11-28' AND '2018-11-28'"
  data <- DBI::dbGetQuery(channel, sql) %>% data.table::data.table()
  data$date <- as.POSIXct(data$date)
  DBI::dbDisconnect(channel)
}