import sqlite3
import timeit
import pandas
from data_algebra.data_ops import *
import data_algebra.db_model
import data_algebra.SQLite
import data_algebra.test_util
Load the example data.
# Load the example data set from a compressed CSV (1,000,000 rows per the
# shape shown below: grouping column 'g' plus ten value columns).
d = pandas.read_csv('d.csv.gz')
# Every column except the grouping column 'g' is treated as a value column.
vars = [c for c in d.columns if not c == 'g']
d.head()
d.shape
(1000000, 11)
Set the number of timing repetitions used by `timeit`.
reps = 5
data_algebra pandas solution
# Build the data_algebra pipeline: add a per-group ('g') maximum of each
# value column as a window calculation, then order rows deterministically.
# (No comments inside the expression: backslash continuations forbid them.)
ops = describe_table(d, table_name='d'). \
    extend({'max_' + v: v + '.max()' for v in vars},
           partition_by=['g']). \
    order_rows(['g'] + vars)
# Display the operator pipeline (output reproduced below).
ops
TableDescription(
table_name='d',
column_names=[
'g', 'v_00001', 'v_00002', 'v_00003', 'v_00004', 'v_00005', 'v_00006',
'v_00007', 'v_00008', 'v_00009', 'v_00010']) .\
extend({
'max_v_00001': 'v_00001.max()',
'max_v_00002': 'v_00002.max()',
'max_v_00003': 'v_00003.max()',
'max_v_00004': 'v_00004.max()',
'max_v_00005': 'v_00005.max()',
'max_v_00006': 'v_00006.max()',
'max_v_00007': 'v_00007.max()',
'max_v_00008': 'v_00008.max()',
'max_v_00009': 'v_00009.max()',
'max_v_00010': 'v_00010.max()'},
partition_by=['g']) .\
order_rows(['g', 'v_00001', 'v_00002', 'v_00003', 'v_00004', 'v_00005', 'v_00006', 'v_00007', 'v_00008', 'v_00009', 'v_00010'])
# Execute the pipeline in Pandas and peek at the result.
res = ops.transform(d)
res.head()
5 rows × 21 columns
# Load the pre-computed expected result used to check each implementation.
expect = pandas.read_csv('res.csv.gz')
expect.head()
5 rows × 21 columns
assert data_algebra.test_util.equivalent_frames(res, expect)
def f():
    """Apply the pipeline `ops` to the frame `d` once (timeit target).

    Only reads the module-level names, so no `global` declarations needed.
    """
    return ops.transform(d)
# Time the Pandas execution: total seconds over `reps` runs.
time_pandas = timeit.timeit(f, number=reps)
time_pandas
121.25399759000001
time_pandas/reps
24.250799518
data_algebra modin[ray] solution
# Set up the modin data model on the 'ray' engine and copy the example
# frame into a modin DataFrame.
import importlib
from data_algebra.modin_model import ModinModel
modin_pandas = importlib.import_module("modin.pandas")
data_model = ModinModel(modin_engine='ray')
d_modin = modin_pandas.DataFrame(d)
# Map logical table names to concrete (modin) tables for evaluation.
data_map = {'d': d_modin}
UserWarning: Distributing <class 'pandas.core.frame.DataFrame'> object. This may take some time.
Note: modin may not be in parallel mode for many of the steps.
%%capture
def f_modin():
    """Evaluate `ops` through the modin data model (timeit target).

    First removes any temporary result entries a previous call left in
    `data_map` (everything but the source table 'd'), so repeated timed
    runs start from the same state.
    """
    stale = [name for name in data_map if name != 'd']
    for name in stale:
        del data_map[name]
    return data_model.eval(ops, data_map=data_map)
# Evaluate once; the modin model returns the key of a temporary result table.
res_modin = f_modin()
res_modin
'TMP_0000000_T'
# Materialize the modin result back into Pandas and verify correctness.
res_pandas = data_model.to_pandas(res_modin, data_map=data_map)
assert data_algebra.test_util.equivalent_frames(res_pandas, expect)
%%capture
# Time the modin execution: total seconds over `reps` runs.
time_modin = timeit.timeit(f_modin, number=reps)
time_modin
605.750196339
time_modin/reps
121.1500392678
data_algebra SQL solution with copy in/out time
# Show the SQL the pipeline translates to for SQLite (output reproduced below).
db_model = data_algebra.SQLite.SQLiteModel()
print(ops.to_sql(db_model, pretty=True))
SELECT "g",
"v_00001",
"v_00002",
"v_00003",
"v_00004",
"v_00005",
"v_00006",
"v_00007",
"v_00008",
"v_00009",
"v_00010",
"max_v_00001",
"max_v_00002",
"max_v_00003",
"max_v_00004",
"max_v_00005",
"max_v_00006",
"max_v_00007",
"max_v_00008",
"max_v_00009",
"max_v_00010"
FROM
(SELECT "g",
"v_00001",
"v_00002",
"v_00003",
"v_00004",
"v_00005",
"v_00006",
"v_00007",
"v_00008",
"v_00009",
"v_00010",
MAX("v_00001") OVER (PARTITION BY "g") AS "max_v_00001",
MAX("v_00002") OVER (PARTITION BY "g") AS "max_v_00002",
MAX("v_00003") OVER (PARTITION BY "g") AS "max_v_00003",
MAX("v_00004") OVER (PARTITION BY "g") AS "max_v_00004",
MAX("v_00005") OVER (PARTITION BY "g") AS "max_v_00005",
MAX("v_00006") OVER (PARTITION BY "g") AS "max_v_00006",
MAX("v_00007") OVER (PARTITION BY "g") AS "max_v_00007",
MAX("v_00008") OVER (PARTITION BY "g") AS "max_v_00008",
MAX("v_00009") OVER (PARTITION BY "g") AS "max_v_00009",
MAX("v_00010") OVER (PARTITION BY "g") AS "max_v_00010"
FROM "d") "extend_1"
ORDER BY "g",
"v_00001",
"v_00002",
"v_00003",
"v_00004",
"v_00005",
"v_00006",
"v_00007",
"v_00008",
"v_00009",
"v_00010"
# Open an in-memory SQLite database and let the model configure the
# connection (e.g. register any functions it needs).
conn = sqlite3.connect(':memory:')
db_model.prepare_connection(conn)
def f_db():
    """Copy `d` into SQLite, run the pipeline as SQL, read the result back.

    This timeit target includes the copy-in and copy-out cost, so it
    measures the full round trip through the database.
    """
    db_model.insert_table(conn, d, 'd', allow_overwrite=True)
    query = ops.to_sql(db_model)
    return db_model.read_query(conn, query)
# Run once, verify correctness, then time the full round trip.
res_db = f_db()
assert data_algebra.test_util.equivalent_frames(res_db, expect)
time_sql = timeit.timeit(f_db, number=reps)
time_sql
111.5820258849999
time_sql/reps
22.31640517699998
data_algebra SQL solution without copy in/out time
# Wrap the model/connection pair in a handle and land the data in the
# database once, so the next timing excludes the copy-in cost.
db_handle = data_algebra.db_model.DBHandle(db_model, conn)
data_map = {'d': db_handle.insert_table(d, table_name='d', allow_overwrite=True)}
def f_db_eval():
    """Evaluate `ops` entirely inside the database (timeit target).

    Drops the temporary result tables a previous call left behind
    (every `data_map` entry except the source table 'd') so repeated
    timed runs start clean, then evaluates without copying data out.
    """
    model = db_handle.db_model
    stale = [name for name in data_map if name != 'd']
    for name in stale:
        model.execute(conn, "DROP TABLE " + model.quote_table_name(name))
        del data_map[name]
    return db_handle.eval(ops, data_map=data_map)
# Run once, pull the result out for verification, then time the
# in-database-only evaluation.
res_db = f_db_eval()
res_db_pandas = db_handle.to_pandas(res_db, data_map=data_map)
assert data_algebra.test_util.equivalent_frames(res_db_pandas, expect)
time_sql_only = timeit.timeit(f_db_eval, number=reps)
time_sql_only
50.91133955200007
time_sql_only/reps
10.182267910400014
# Clean up: close the in-memory SQLite connection (discards its tables).
conn.close()
SQL only, PostgreSQL
# Repeat the in-database timing against PostgreSQL.
import psycopg2
import data_algebra.PostgreSQL
# NOTE(review): local connection with an empty password — acceptable for a
# personal demo database, not for shared environments.
conn = psycopg2.connect(
    database="johnmount",
    user="johnmount",
    host="localhost",
    password=""
)
conn.autocommit=True
db_model = data_algebra.PostgreSQL.PostgreSQLModel()
db_model.prepare_connection(conn) # define any user functions and settings we want/need
db_handle = data_algebra.db_model.DBHandle(db_model, conn)
# Land the data once; the timing below excludes copy-in.
data_map = {'d': db_handle.insert_table(d, table_name='d', allow_overwrite=True)}
# Verify and time in-database evaluation on PostgreSQL (reuses f_db_eval,
# which now sees the PostgreSQL handle via the rebound module-level names).
res_PostgreSQL = f_db_eval()
res_PostgreSQL_pandas = db_handle.to_pandas(res_PostgreSQL, data_map=data_map)
assert data_algebra.test_util.equivalent_frames(res_PostgreSQL_pandas, expect)
time_PostgreSQL_only = timeit.timeit(f_db_eval, number=reps)
time_PostgreSQL_only
45.7188854059998
time_PostgreSQL_only/reps
9.14377708119996
# Clean up: drop every table recorded in data_map (the source table and any
# remaining temporary result), then close the PostgreSQL connection.
for k in data_map.keys():
    db_handle.db_model.execute(conn, "DROP TABLE " + db_handle.db_model.quote_table_name(k))
conn.close()
Add the following code to your website.
For more information on customizing the embed code, read Embedding Snippets.