Examples/GroupedPerformance/grouped_performance_data_algebra.md

import sqlite3

import timeit

import pandas

from data_algebra.data_ops import *
import data_algebra.db_model
import data_algebra.SQLite
import data_algebra.test_util

Load the example data set

# Load the example data (1,000,000 rows; 'g' is the grouping column).
d = pandas.read_csv('d.csv.gz')
# All non-group columns are the value columns to aggregate.
# NOTE(review): 'vars' shadows the builtin vars() for the rest of the notebook.
vars = [c for c in d.columns if not c == 'g']

d.head()
g v_00001 v_00002 v_00003 v_00004 v_00005 v_00006 v_00007 v_00008 v_00009 v_00010 0 level_000746732 0.501391 -0.042046 0.907852 1.103653 -0.933225 1.991693 -0.154361 -0.991363 2.328447 0.222225 1 level_000424470 0.512520 -0.536119 -0.827670 -1.587808 -0.047809 0.437973 -0.388076 -0.227378 0.454036 0.340655 2 level_000463169 -1.248967 -1.006886 0.352715 0.847306 1.417280 -1.852341 0.526444 0.051136 -0.053652 1.893562 3 level_000025764 1.093854 -0.975188 0.358386 0.381903 0.513759 0.710493 0.100913 -0.588443 -0.159640 -0.923627 4 level_000630470 0.319834 -0.073545 1.457324 -1.507512 -0.670575 -0.870075 -0.131663 -0.452909 1.415066 -2.134600
d.shape
(1000000, 11)

Set timing reps

# Number of repetitions for each timeit measurement below.
reps = 5

data_algebra pandas solution

# Pipeline: per-group max of every value column (window over 'g'),
# then order rows deterministically for comparison.
ops = (
    describe_table(d, table_name='d')
        .extend({f'max_{v}': f'{v}.max()' for v in vars},
                partition_by=['g'])
        .order_rows(['g'] + vars)
)

ops    
TableDescription(
 table_name='d',
 column_names=[
   'g', 'v_00001', 'v_00002', 'v_00003', 'v_00004', 'v_00005', 'v_00006',  
 'v_00007', 'v_00008', 'v_00009', 'v_00010']) .\
   extend({
    'max_v_00001': 'v_00001.max()',
    'max_v_00002': 'v_00002.max()',
    'max_v_00003': 'v_00003.max()',
    'max_v_00004': 'v_00004.max()',
    'max_v_00005': 'v_00005.max()',
    'max_v_00006': 'v_00006.max()',
    'max_v_00007': 'v_00007.max()',
    'max_v_00008': 'v_00008.max()',
    'max_v_00009': 'v_00009.max()',
    'max_v_00010': 'v_00010.max()'},
   partition_by=['g']) .\
   order_rows(['g', 'v_00001', 'v_00002', 'v_00003', 'v_00004', 'v_00005', 'v_00006', 'v_00007', 'v_00008', 'v_00009', 'v_00010'])
res = ops.transform(d)

res.head()
g v_00001 v_00002 v_00003 v_00004 v_00005 v_00006 v_00007 v_00008 v_00009 ... max_v_00001 max_v_00002 max_v_00003 max_v_00004 max_v_00005 max_v_00006 max_v_00007 max_v_00008 max_v_00009 max_v_00010 0 level_000000002 0.480053 0.556812 -0.295186 1.069603 -1.287380 -0.343787 -0.555874 0.481993 -0.085779 ... 0.480053 0.556812 -0.295186 1.069603 -1.287380 -0.343787 -0.555874 0.481993 -0.085779 -1.203414 1 level_000000003 -0.052534 0.983563 0.145466 1.153262 -0.102269 0.593555 -0.437793 -0.052661 1.365170 ... -0.052534 0.983563 0.145466 1.153262 -0.102269 0.593555 -0.437793 -0.052661 1.365170 1.840541 2 level_000000004 0.114769 -0.228287 -0.739238 0.681996 -0.476465 -0.815794 0.426362 0.308667 -0.685185 ... 1.302818 -0.020408 -0.591229 0.681996 0.031225 0.518879 0.426362 0.522919 0.031270 0.647587 3 level_000000004 1.302818 -0.020408 -0.591229 -0.453501 0.031225 0.518879 -0.724670 0.522919 0.031270 ... 1.302818 -0.020408 -0.591229 0.681996 0.031225 0.518879 0.426362 0.522919 0.031270 0.647587 4 level_000000005 0.209939 0.568525 -0.657119 1.791830 1.800427 -0.123661 0.084579 0.057838 1.047468 ... 1.017089 0.568525 -0.022681 1.791830 1.800427 0.519874 0.084579 1.805242 1.047468 2.604739

5 rows × 21 columns

expect = pandas.read_csv('res.csv.gz')

expect.head()
g v_00001 v_00002 v_00003 v_00004 v_00005 v_00006 v_00007 v_00008 v_00009 ... max_v_00001 max_v_00002 max_v_00003 max_v_00004 max_v_00005 max_v_00006 max_v_00007 max_v_00008 max_v_00009 max_v_00010 0 level_000000002 0.480053 0.556812 -0.295186 1.069603 -1.287380 -0.343787 -0.555874 0.481993 -0.085779 ... 0.480053 0.556812 -0.295186 1.069603 -1.287380 -0.343787 -0.555874 0.481993 -0.085779 -1.203414 1 level_000000003 -0.052534 0.983563 0.145466 1.153262 -0.102269 0.593555 -0.437793 -0.052661 1.365170 ... -0.052534 0.983563 0.145466 1.153262 -0.102269 0.593555 -0.437793 -0.052661 1.365170 1.840541 2 level_000000004 0.114769 -0.228287 -0.739238 0.681996 -0.476465 -0.815794 0.426362 0.308667 -0.685185 ... 1.302818 -0.020408 -0.591229 0.681996 0.031225 0.518879 0.426362 0.522919 0.031270 0.647587 3 level_000000004 1.302818 -0.020408 -0.591229 -0.453501 0.031225 0.518879 -0.724670 0.522919 0.031270 ... 1.302818 -0.020408 -0.591229 0.681996 0.031225 0.518879 0.426362 0.522919 0.031270 0.647587 4 level_000000005 0.209939 0.568525 -0.657119 1.791830 1.800427 -0.123661 0.084579 0.057838 1.047468 ... 1.017089 0.568525 -0.022681 1.791830 1.800427 0.519874 0.084579 1.805242 1.047468 2.604739

5 rows × 21 columns

assert data_algebra.test_util.equivalent_frames(res, expect)
def f():
    """Timing target: apply the data_algebra pipeline `ops` to the pandas frame `d`.

    Returns the transformed DataFrame. Reading module-level names needs no
    `global` declaration (it is only required for assignment), so the
    original `global` statements were dropped.
    """
    return ops.transform(d)

time_pandas = timeit.timeit(f, number=reps)
time_pandas
121.25399759000001
time_pandas/reps
24.250799518

data_algebra modin[ray] solution

import importlib

from data_algebra.modin_model import ModinModel
modin_pandas = importlib.import_module("modin.pandas")
data_model = ModinModel(modin_engine='ray')
d_modin = modin_pandas.DataFrame(d)
data_map = {'d':  d_modin}
UserWarning: Distributing <class 'pandas.core.frame.DataFrame'> object. This may take some time.

Note: modin may not be in parallel mode for many of the steps.

%%capture
def f_modin():
    """Timing target: evaluate `ops` on the modin frame via `data_model`.

    Before evaluating, drops every entry in `data_map` except the source
    table 'd', so repeated timing runs do not accumulate intermediate
    result tables. Returns the key of the result table registered in
    `data_map` by `data_model.eval`.
    """
    # cleanup: forget stale intermediate results from a previous run
    for k in [k for k in data_map if k != 'd']:
        del data_map[k]
    # execute
    return data_model.eval(ops, data_map=data_map)

res_modin = f_modin()
res_modin
'TMP_0000000_T'
res_pandas = data_model.to_pandas(res_modin, data_map=data_map)
assert data_algebra.test_util.equivalent_frames(res_pandas, expect)
%%capture
time_modin = timeit.timeit(f_modin, number=reps)
time_modin
605.750196339
time_modin/reps
121.1500392678

data_algebra SQL solution with copy in/out time

# SQLite dialect adapter; show the SQL the pipeline translates to.
db_model = data_algebra.SQLite.SQLiteModel()
print(ops.to_sql(db_model, pretty=True))
SELECT "g",
       "v_00001",
       "v_00002",
       "v_00003",
       "v_00004",
       "v_00005",
       "v_00006",
       "v_00007",
       "v_00008",
       "v_00009",
       "v_00010",
       "max_v_00001",
       "max_v_00002",
       "max_v_00003",
       "max_v_00004",
       "max_v_00005",
       "max_v_00006",
       "max_v_00007",
       "max_v_00008",
       "max_v_00009",
       "max_v_00010"
FROM
  (SELECT "g",
          "v_00001",
          "v_00002",
          "v_00003",
          "v_00004",
          "v_00005",
          "v_00006",
          "v_00007",
          "v_00008",
          "v_00009",
          "v_00010",
          MAX("v_00001") OVER (PARTITION BY "g") AS "max_v_00001",
                              MAX("v_00002") OVER (PARTITION BY "g") AS "max_v_00002",
                                                  MAX("v_00003") OVER (PARTITION BY "g") AS "max_v_00003",
                                                                      MAX("v_00004") OVER (PARTITION BY "g") AS "max_v_00004",
                                                                                          MAX("v_00005") OVER (PARTITION BY "g") AS "max_v_00005",
                                                                                                              MAX("v_00006") OVER (PARTITION BY "g") AS "max_v_00006",
                                                                                                                                  MAX("v_00007") OVER (PARTITION BY "g") AS "max_v_00007",
                                                                                                                                                      MAX("v_00008") OVER (PARTITION BY "g") AS "max_v_00008",
                                                                                                                                                                          MAX("v_00009") OVER (PARTITION BY "g") AS "max_v_00009",
                                                                                                                                                                                              MAX("v_00010") OVER (PARTITION BY "g") AS "max_v_00010"
   FROM "d") "extend_1"
ORDER BY "g",
         "v_00001",
         "v_00002",
         "v_00003",
         "v_00004",
         "v_00005",
         "v_00006",
         "v_00007",
         "v_00008",
         "v_00009",
         "v_00010"
# In-memory SQLite database; prepare_connection installs any helper
# functions/settings the generated SQL needs.
conn = sqlite3.connect(':memory:')
db_model.prepare_connection(conn)

def f_db():
    """Timing target including copy in/out: load `d` into SQLite, run the
    translated query, and read the result back as a pandas DataFrame.

    `global` declarations removed — reads of module globals don't need them.
    """
    db_model.insert_table(conn, d, 'd', allow_overwrite=True)
    return db_model.read_query(conn, ops.to_sql(db_model))
res_db = f_db()

assert data_algebra.test_util.equivalent_frames(res_db, expect)
time_sql = timeit.timeit(f_db, number=reps)
time_sql
111.5820258849999
time_sql/reps
22.31640517699998

data_algebra SQL solution without copy in/out time

# Wrap model + connection in a handle and copy the source table in once,
# outside the timing loop, so f_db_eval measures query execution only.
db_handle = data_algebra.db_model.DBHandle(db_model, conn)
data_map = {'d': db_handle.insert_table(d, table_name='d', allow_overwrite=True)}

def f_db_eval():
    """Timing target excluding copy in/out: evaluate `ops` inside the database.

    Drops any temporary result tables left from a previous run (every
    `data_map` entry except the source table 'd') before evaluating.
    Returns the handle of the result table, also registered in `data_map`.

    Note: the original declared `global data_map`/`db_handle` but read
    `conn` without one — none are needed for reads/mutation, so all were
    dropped for consistency.
    """
    # cleanup: drop stale intermediates in the database and forget their handles
    for k in [k for k in data_map if k != 'd']:
        db_handle.db_model.execute(
            conn, "DROP TABLE " + db_handle.db_model.quote_table_name(k))
        del data_map[k]
    # execute
    return db_handle.eval(ops, data_map=data_map)

res_db = f_db_eval()
res_db_pandas = db_handle.to_pandas(res_db, data_map=data_map)
assert data_algebra.test_util.equivalent_frames(res_db_pandas, expect)
time_sql_only = timeit.timeit(f_db_eval, number=reps)
time_sql_only
50.91133955200007
time_sql_only/reps
10.182267910400014
# clean up
conn.close()

SQL only, PostgreSQL

import psycopg2  
import data_algebra.PostgreSQL
# Connect to a local PostgreSQL instance (local trust auth; empty password).
conn = psycopg2.connect(
    database="johnmount",
    user="johnmount",
    host="localhost",
    password=""
)
# autocommit so table creation/drops take effect without explicit commits.
conn.autocommit=True
db_model = data_algebra.PostgreSQL.PostgreSQLModel() 
db_model.prepare_connection(conn)  # define any user functions and settings we want/need
db_handle = data_algebra.db_model.DBHandle(db_model, conn)
# Copy the source table into the database once, outside the timing loop.
data_map = {'d': db_handle.insert_table(d, table_name='d', allow_overwrite=True)}
res_PostgreSQL = f_db_eval()
res_PostgreSQL_pandas = db_handle.to_pandas(res_PostgreSQL, data_map=data_map)
assert data_algebra.test_util.equivalent_frames(res_PostgreSQL_pandas, expect)
time_PostgreSQL_only = timeit.timeit(f_db_eval, number=reps)
time_PostgreSQL_only
45.7188854059998
time_PostgreSQL_only/reps
9.14377708119996
# clean up: drop every table we created (source and any result tables),
# then close the connection. Iterate a snapshot of the keys; a dict
# iterates its keys directly, so .keys() was redundant.
for k in list(data_map):
    db_handle.db_model.execute(conn, "DROP TABLE " + db_handle.db_model.quote_table_name(k))
conn.close()



WinVector/rqdatatable documentation built on Aug. 22, 2023, 3:25 p.m.