stream_in, stream_out | R Documentation
Description

The stream_in and stream_out functions implement line-by-line processing of JSON data over a connection, such as a socket, URL, file, or pipe. JSON streaming requires the ndjson format, which differs slightly from the format used by fromJSON() and toJSON(); see Details.

Usage
stream_in(con, handler = NULL, pagesize = 500, verbose = TRUE, ...)
stream_out(x, con = stdout(), pagesize = 500, verbose = TRUE, prefix = "", ...)
Arguments

con: a connection object. If the connection is not open, stream_in and stream_out will automatically open and later close the connection.

handler: a custom function that is called on each page of JSON data. If not specified, the default handler stores all pages and binds them into a single data frame that will be returned by stream_in.

pagesize: number of lines to read/write from/to the connection per iteration.

verbose: print some information on what is going on.

...: arguments for fromJSON() and toJSON() that control JSON formatting and parsing where applicable.

x: object to be streamed out. Currently only data frames are supported.

prefix: string to write before each line (for example "\u001e" to write record-separator-delimited JSON).
Details

Because parsing huge JSON strings is difficult and inefficient, JSON streaming is done using lines of minified JSON records, a.k.a. ndjson. This is pretty standard: JSON databases such as MongoDB use the same format to import/export datasets. Note that this means the total stream combined is not valid JSON itself; only the individual lines are. Also note that because line breaks are used as separators, prettified JSON is not permitted: the JSON lines must be minified. In this respect the format is a bit different from fromJSON() and toJSON(), where all lines are part of a single JSON structure with optional line breaks.
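As a minimal sketch of the difference, the snippet below serializes a small data frame both ways; the output shown in the comments is indicative:

library(jsonlite)
x <- data.frame(id = 1:2, name = c("a", "b"))

# toJSON(): a single JSON structure (one array containing both records)
toJSON(x)
# [{"id":1,"name":"a"},{"id":2,"name":"b"}]

# stream_out(): ndjson, one self-contained minified record per line
stream_out(x)
# {"id":1,"name":"a"}
# {"id":2,"name":"b"}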
The handler is a callback function which is called for each page (batch) of JSON data with exactly one argument (usually a data frame with pagesize rows). If handler is missing or NULL, a default handler is used which stores all intermediate pages of data and at the very end binds all pages together into one single data frame that is returned by stream_in. When a custom handler function is specified, stream_in does not store any intermediate results and always returns NULL. It is then up to the handler to process or store the data pages. A handler function that does not store intermediate results in memory (for example by writing output to another connection) results in a pipeline that can process an unlimited amount of data; see the sketch below and the Examples section.
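As a minimal sketch, the handler below only keeps a running row count, so no page is retained in memory (the temporary ndjson file is created first just to make the snippet self-contained):

library(jsonlite)

# write a small ndjson file so the sketch is self-contained
tmp <- tempfile()
stream_out(mtcars, file(tmp), verbose = FALSE)

# the handler processes each page and discards it; only a counter is kept
total <- 0
stream_in(file(tmp), handler = function(df) {
  total <<- total + nrow(df)
}, pagesize = 10, verbose = FALSE)
total    # 32
unlink(tmp)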
Note that a vector of JSON strings already in R can be parsed with stream_in by creating a connection to it with textConnection().
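For example, a small sketch of that approach (output indicative):

library(jsonlite)
json <- c('{"x": 1, "y": "a"}', '{"x": 2, "y": "b"}')
stream_in(textConnection(json))
#   x y
# 1 1 a
# 2 2 b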
If a connection is not opened yet, stream_in and stream_out will automatically open and later close the connection. Because R destroys connections when they are closed, they cannot be reused. To use a single connection for multiple calls to stream_in or stream_out, it needs to be opened beforehand, as sketched below and in the Examples section.
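A minimal sketch of reusing one explicitly opened connection across several calls:

library(jsonlite)

tmp <- tempfile()
con <- file(tmp, open = "wb")                 # open beforehand so it survives multiple calls
stream_out(iris[1:3, ], con, verbose = FALSE)
stream_out(iris[4:6, ], con, verbose = FALSE) # appends to the same open stream
close(con)

nrow(stream_in(file(tmp), verbose = FALSE))   # 6
unlink(tmp)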
Value

The stream_out function always returns NULL. When no custom handler is specified, stream_in returns a data frame of all pages bound together. When a custom handler function is specified, stream_in always returns NULL.
References

MongoDB export format: https://docs.mongodb.com/manual/reference/program/mongoexport/

Documentation for the JSON Lines text file format: https://jsonlines.org/
See Also

fromJSON(), read_json()
Examples

# compare formats
x <- iris[1:3,]
toJSON(x)
stream_out(x)
# Trivial example
mydata <- stream_in(url("https://jeroen.github.io/data/iris.json"))
## Not run:
# stream large dataset to file and back
library(nycflights13)
stream_out(flights, file(tmp <- tempfile()))
flights2 <- stream_in(file(tmp))
unlink(tmp)
all.equal(flights2, as.data.frame(flights))
# stream over HTTP
diamonds2 <- stream_in(url("https://jeroen.github.io/data/diamonds.json"))
# stream over HTTP with gzip compression
flights3 <- stream_in(gzcon(url("https://jeroen.github.io/data/nycflights13.json.gz")))
all.equal(flights3, as.data.frame(flights))
# stream over HTTPS (HTTP+SSL) via curl
library(curl)
flights4 <- stream_in(gzcon(curl("https://jeroen.github.io/data/nycflights13.json.gz")))
all.equal(flights4, as.data.frame(flights))
# or alternatively:
flights5 <- stream_in(gzcon(pipe("curl https://jeroen.github.io/data/nycflights13.json.gz")))
all.equal(flights5, as.data.frame(flights))
# Full JSON IO stream from URL to file connection.
# Calculate delays for flights over 1000 miles in batches of 5k
library(dplyr)
con_in <- gzcon(url("https://jeroen.github.io/data/nycflights13.json.gz"))
con_out <- file(tmp <- tempfile(), open = "wb")
stream_in(con_in, handler = function(df){
df <- dplyr::filter(df, distance > 1000)
df <- dplyr::mutate(df, delta = dep_delay - arr_delay)
stream_out(df, con_out, pagesize = 1000)
}, pagesize = 5000)
close(con_out)
# stream it back in
mydata <- stream_in(file(tmp))
nrow(mydata)
unlink(tmp)
# Data from http://openweathermap.org/current#bulk
# Each row contains a nested data frame.
daily14 <- stream_in(gzcon(url("http://78.46.48.103/sample/daily_14.json.gz")), pagesize=50)
subset(daily14, city$name == "Berlin")$data[[1]]
# Or with dplyr:
library(dplyr)
daily14f <- flatten(daily14)
filter(daily14f, city.name == "Berlin")$data[[1]]
# Stream import large data from zip file
tmp <- tempfile()
download.file("http://jsonstudio.com/wp-content/uploads/2014/02/companies.zip", tmp)
companies <- stream_in(unz(tmp, "companies.json"))
## End(Not run)