#
# size price
# insert Y Y
# upadte Y N
# delete N N
#
# fill size of delete to 0
#
id2price = function(id, symbol='XBTUSD') {
if (symbol=='XBTUSD') symbolIdx = 88
return(((100000000 * symbolIdx) - id) / 100)
}
#' @import dplyr
#' @export
read_jsonlines = function(filename) {
ob.lines = readr::read_lines(filename)
lapply(ob.lines, function(row) {
# jsonlite::fromJSON(row, simplifyVector=FALSE)
rjson::fromJSON(row)
})
}
#' Read msgs from file
#' @import dplyr
#' @export
read_msgs = function(filename) {
ob.lines = readr::read_lines(filename)
lapply(ob.lines, function(row) {
msg = rjson::fromJSON(row)
msg$data = as_orderBookL2.list(msg$data)
msg = as_msg(msg)
msg
})
}
#' @import dplyr
#' @export
to_json = function(ob) {
ob %>% as.list() %>% unname() %>% rjson::toJSON()
}
#' @export
as_json = function(x, ...) {
UseMethod('as_json')
}
#' @import dplyr
#' @export
as_json.orderBookL2 = function(ob) {
to_json(ob)
}
#' #' @import dplyr
#' #' @export
#' to_scd = function(ob, timestamp=NULL) {
#' scd = ob %>%
#' as_tibble() %>%
#' mutate(key=paste(symbol, id, side, sep='_'))
#' if(!is.null(timestamp))
#' scd$start_date = timestamp
#' else
#' scd$start_date = lubridate::now()
#' scd$end_date = NA
#' return(scd)
#' }
#' Convert plain json object to msg object
#' @param msg msg object
#' @description msg object is list of websocket message
#' @export
as_msg = function(msg) {
if (!inherits(msg, 'msg')) {
class(msg) = c('msg', class(msg))
}
msg
}
#' @export
print.msg = function(x) {
cat(paste("BitMEX Websocket Message of", x[['table']]))
cat('\n')
cat(paste('Action:', x[['action']]))
cat('\n')
cat(paste('Timestamp:', x[['timestamp']]))
cat('\n')
print(x['data'])
}
#' @export
as_scd = function(msg) {
UseMethod('as_scd')
}
#' @import tidyr dplyr
#' @export
as_scd.msg = function(msg) {
scd = as_tibble(msg$data) %>%
mutate(
action = msg[['action']],
start_time = msg[['timestamp']],
end_time = NA,
)
if (!inherits(scd, 'scd'))
class(scd) = c('scd', class(scd))
return(scd)
}
#' @import tidyr dplyr
#' @export
as_scd.orderBookL2 = function(ob, action, timestamp) {
scd = as_tibble(ob) %>%
mutate(
action = action,
start_time = timestamp,
end_time = NA
)
if (!inherits(scd, 'scd'))
class(scd) = c('scd', class(scd))
return(scd)
}
#' @import tidyr dplyr
#' @export
as_scd.data.frame = function(scd) {
if (!inherits(scd, 'scd'))
class(scd) = c('scd', class(scd))
return(scd)
}
#' @export
merge_scd = function(x, y) {
UseMethod('merge_scd')
}
#' @import tidyr dplyr
#' @export
merge_scd.scd = function(scd.origin, scd.increment) {
if (nrow(scd.origin)==0) {
return(list(
to_update = scd.origin,
to_insert = scd.increment
))
}
keys = syms(c('symbol', 'id', 'side'))
scd.origin = scd.origin %>% mutate(tag='origin')
scd.increment = scd.increment %>% mutate(tag='increment')
dfr = scd.origin %>%
bind_rows(scd.increment) %>%
group_by(!!!keys) %>%
mutate(end_time = lead(start_time)) %>%
ungroup()
scd.update = dfr %>%
filter(tag=='origin', !is.na(end_time)) %>%
select(-tag)
scd.insert = dfr %>%
filter(tag == 'increment') %>%
select(-tag)
list(
to_update = scd.update,
to_insert = scd.insert
)
}
#' @import tidyr dplyr
#' @export
merge_scd.list = function(msgs) {
keys = syms(c('symbol', 'id', 'side'))
lapply(msgs, as_scd) %>%
bind_rows() %>%
group_by(!!!keys) %>%
mutate(
end_time = lead(start_time)
) %>%
ungroup() %>%
mutate(
id = as.character(id)
)
}
#' @export
new_orderBookL2 = function() {
ob = new.env()
class(ob) = c('orderBookL2', class(ob))
return(ob)
}
#' @export
as_orderBookL2 = function(x, ...) {
UseMethod('as_orderBookL2')
}
#' @export
as_orderBookL2.list = function(x, na.fill=TRUE) {
if (inherits(x, 'orderBookL2')) return(x)
if (na.fill)
x = lapply(x, function(order) {
if (is.null(order[['price']]))
order[['price']] = id2price(order[['id']])
if (is.null(order[['size']]))
order[['size']] = 0
order
})
keys = c('symbol', 'id', 'side')
names(x) = sapply(x, function(order) paste(order[keys], collapse='_'))
x = as.environment(x)
class(x) = c('orderBookL2', class(x))
x
}
#' @export
as_orderBookL2.environment = function(x) {
if (inherits(x, 'orderBookL2')) return(x)
class(x) = c('orderBookL2', class(x))
x
}
#' @export
as_orderBookL2.default = as_orderBookL2.list
#' @export
print.orderBookL2 = function(x) {
x.tbl = as_tibble(x)
print(paste(length(x), 'orders in orderBookL2'))
}
#' @export
as_tibble.orderBookL2 = function(ob) {
do.call(bind_rows, as.list(ob))
}
#' @export
as.list.orderBookL2 = function(ob) {
ob %>%
as.list.environment() %>%
unname()
}
#' @export
copy = function(x, ...) {
UseMethod('copy')
}
#' @import dplyr
#' @export
copy.orderBookL2 = function(ob) {
as.list.orderBookL2(ob) %>%
as_orderBookL2()
}
#' @export
modify = function(x, ...) {
UseMethod('modify')
}
#' Update orderBookL2
#' @details
#' modify by reference!!
modify.orderBookL2 = function(ob, inc, action='update', strict=FALSE, copy=FALSE) {
keys = c('symbol', 'id', 'side')
if (copy) {
ob = copy.orderBookL2(ob)
}
if (strict) {
invisible(lapply(inc, function(order) {
key = paste(order[keys], collapse='_')
if (action=='delete') {
if (!is.null(ob[[key]])) ob[[key]] = NULL
} else if (action=='partial') {
ob[[key]] = order
} else {
if (!is.null(ob[[key]])) ob[[key]] = modifyList(ob[[key]], order) # modify if not NULL
}
}))
} else {
if (action=='partial') {
ob = new_orderBookL2()
}
invisible(lapply(inc, function(order) {
key = paste(order[keys], collapse='_')
if (action=='delete') order[['size']] = 0
if (is.null(ob[[key]])) ob[[key]] = list() # fill NULL with empty list
ob[[key]] = modifyList(ob[[key]], order)
}))
}
if (!inherits(ob, 'orderBookL2')) {
class(ob) = c('orderBookL2', class(ob))
}
return(ob)
}
#
# merge_tibble = function(ob, inc, action='update') {
#
# }
#' @import dplyr
#' @export
reduce_msgs = function(msgs, limit=0, strict=FALSE) {
if (limit==0) limit = length(msgs)
msgs = msgs[1:limit]
data = lapply(msgs, function(msg) msg$data)
action = lapply(msgs, function(msg) msg$action)
f = purrr::partial(modify.orderBookL2, strict=strict)
ob = purrr::reduce2(data, action, f, .init=new_orderBookL2())
new_msg = msgs[[length(msgs)]] # the last msg
new_msg$data = ob
if (strict) new_msg$action = 'partial' else new_msg$action = 'update'
return(new_msg)
}
#' @import dplyr
#' @export
accumulate_msgs = function(msgs, limit=0, strict=FALSE) {
if (limit==0) limit = length(msgs)
msgs = msgs[1:limit]
data = lapply(msgs, function(msg) msg$data)
action = lapply(msgs, function(msg) msg$action)
f = purrr::partial(modify.orderBookL2, strict=strict, copy=TRUE)
obs = purrr::accumulate2(data, action, f, .init=new_orderBookL2())
mapply(function(msg, ob) {
new_msg = msg
new_msg$data = ob
new_msg$action = 'partial'
return(new_msg)
}, msgs, obs[2:length(obs)], SIMPLIFY=FALSE)
}
#' @import dplyr
#' @export
map_msgs = function(msgs, unit='seconds') {
idx = lapply(msgs, function(row) {
tibble::tibble(
timestamp = row$timestamp,
action = row$action,
table = row$table
)
}) %>%
bind_rows() %>%
mutate(timestamp = readr::parse_datetime(timestamp)) %>%
mutate(dt = lubridate::floor_date(timestamp, unit=unit)) %>%
group_by(dt)
split(msgs, group_indices(idx))
}
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.