Skip to content
This repository has been archived by the owner on May 9, 2024. It is now read-only.

H2O: checksum computation is 100 times slower for HDK vs Pandas (or polars) #703

Open
Egor-Krivov opened this issue Oct 12, 2023 · 0 comments

Comments

@Egor-Krivov
Copy link

Currently HDK is very slow at checksum computation for the H2O benchmark. It is significantly slower than pandas at checksum computation. It is also significantly slower than polars when query execution and checksum computation are combined.

Here are my results

| Query | HDK execution time, s | HDK checksum time, s | Pandas checksum time, s |
|---|---|---|---|
| Groupby Q10 | 9.7 | 22.5 | 0.2 |
| Join Q1 | 1.1 | 9.0 | 0.2 |
| Join Q2 | 1.0 | 9.6 | 0.1 |
| Join Q3 | 0.7 | 2.9 | 0.3 |
| Join Q4 | 1.0 | 10.3 | 0.2 |
| Join Q5 | 6.5 | 10.8 | 0.1 |

I will attach csv files with raw results in H2O format to this issue.
results_groupby.csv
results_join.csv

Might be related to #696

Groupby source code (mainly provided by HDK team)
#!/usr/bin/env python

# Groupby benchmark script for pyhdk.
# The banner below is printed before the imports, so it appears on stdout
# even if one of the imports fails.
print("# groupby-pyhdk.py", flush=True)

import os
import gc
import sys
import timeit
import pyhdk

# Extend the import path with the "_helpers" directory located one level above
# this file's directory; presumably that is where the `helpers` module
# imported on the next line lives (TODO confirm).
sys.path.append(os.path.join(os.path.dirname(__file__), "..", "_helpers"))
from helpers import memory_usage, write_log, make_chk

# Disabled alternative to the import above: execute the helpers file in place.
# exec(open("./_helpers/helpers.py").read())

# Metadata passed to every write_log() call further down.
# Both the version and the git field carry the pyhdk package version.
ver = git = pyhdk.__version__
task, solution, fun = "groupby", "pyhdk_nonlazy", ".groupby"
cache, on_disk = "TRUE", "FALSE"

# The dataset name is taken from the environment; its CSV is looked up
# under the relative "data" directory.
data_name = os.environ["SRC_DATANAME"]
src_grp = os.path.join("data", f"{data_name}.csv")
print(f"loading dataset {data_name}", flush=True)

# Alternative init calls left commented out by the author:
#   hdk = pyhdk.init(enable_cpu_groupby_multifrag_kernels=False)
#   hdk = pyhdk.init(enable_non_lazy_data_import=True)
# NOTE(review): debug logging and the debug timer are both switched on for
# this benchmark run — confirm that this is intended when comparing timings.
pyhdk.initLogger(debug_logs=True)
hdk = pyhdk.init(enable_debug_timer=True)

# Column types for the groupby CSV, grouped by type; key order matches the
# original schema literal.
groupby_schema = {
    "id1": "dict", "id2": "dict", "id3": "dict",
    "id4": "int32", "id5": "int32", "id6": "int32",
    "v1": "int32", "v2": "int32",
    "v3": "fp64",
}
x = hdk.import_csv(src_grp, schema=groupby_schema)
# TODO: use warm-up SQL query if using SQL in bench

# Wall-clock origin for the "grouping finished" summary at the end.
task_init = timeit.default_timer()
print("grouping...", flush=True)

question = "sum v1 by id1"  # q1
# The question is executed twice; each run logs its own query time, memory
# usage and checksum time through write_log().
for run_no in (1, 2):
    gc.collect()
    t_start = timeit.default_timer()
    ans = x.agg("id1", v1="sum(v1)").run()
    # Report the shape inside the timed region, exactly like every other
    # question in this script, so that q1 timings are measured the same way.
    print(ans.shape, flush=True)
    t = timeit.default_timer() - t_start
    m = memory_usage()
    # Checksum: a global sum over the aggregated column, timed separately.
    t_start = timeit.default_timer()
    chk = ans.agg([], v1="sum(v1)").run().row(0)
    chkt = timeit.default_timer() - t_start
    write_log(
        task=task, data=data_name, in_rows=x.shape[0], question=question,
        out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution,
        version=ver, git=git, fun=fun, run=run_no, time_sec=t, mem_gb=m,
        cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk,
    )
    if run_no == 2:
        # The result preview is printed only after the second run.
        print(ans.head(3), flush=True)
        print(ans.tail(3), flush=True)
    # Drop the result before the next run / question starts.
    del ans

question = "sum v1 by id1:id2"  # q2
# Two identical runs; each one is timed and logged on its own.
for run_no in (1, 2):
    gc.collect()
    t_start = timeit.default_timer()
    ans = x.agg(["id1", "id2"], v1="sum(v1)").run()
    print(ans.shape, flush=True)
    t = timeit.default_timer() - t_start
    m = memory_usage()
    # Checksum: global sum of the aggregated column, timed separately.
    t_start = timeit.default_timer()
    chk = ans.agg([], v1="sum(v1)").run().row(0)
    chkt = timeit.default_timer() - t_start
    write_log(
        task=task, data=data_name, in_rows=x.shape[0], question=question,
        out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution,
        version=ver, git=git, fun=fun, run=run_no, time_sec=t, mem_gb=m,
        cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk,
    )
    if run_no == 2:
        # The result preview is printed only after the second run.
        print(ans.head(3), flush=True)
        print(ans.tail(3), flush=True)
    del ans

question = "sum v1 mean v3 by id3"  # q3
# Two identical runs; each one is timed and logged on its own.
for run_no in (1, 2):
    gc.collect()
    t_start = timeit.default_timer()
    ans = x.agg("id3", v1="sum(v1)", v3="avg(v3)").run()
    print(ans.shape, flush=True)
    t = timeit.default_timer() - t_start
    m = memory_usage()
    # Checksum: global sums of both result columns, timed separately.
    t_start = timeit.default_timer()
    chk = ans.agg([], v1="sum(v1)", v3="sum(v3)").run().row(0)
    chkt = timeit.default_timer() - t_start
    write_log(
        task=task, data=data_name, in_rows=x.shape[0], question=question,
        out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution,
        version=ver, git=git, fun=fun, run=run_no, time_sec=t, mem_gb=m,
        cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk,
    )
    if run_no == 2:
        # The result preview is printed only after the second run.
        print(ans.head(3), flush=True)
        print(ans.tail(3), flush=True)
    del ans

question = "mean v1:v3 by id4"  # q4
# Two identical runs; each one is timed and logged on its own.
for run_no in (1, 2):
    gc.collect()
    t_start = timeit.default_timer()
    ans = x.agg("id4", v1="avg(v1)", v2="avg(v2)", v3="avg(v3)").run()
    print(ans.shape, flush=True)
    t = timeit.default_timer() - t_start
    m = memory_usage()
    # Checksum: global sums of the three averaged columns, timed separately.
    t_start = timeit.default_timer()
    chk = ans.agg([], v1="sum(v1)", v2="sum(v2)", v3="sum(v3)").run().row(0)
    chkt = timeit.default_timer() - t_start
    write_log(
        task=task, data=data_name, in_rows=x.shape[0], question=question,
        out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution,
        version=ver, git=git, fun=fun, run=run_no, time_sec=t, mem_gb=m,
        cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk,
    )
    if run_no == 2:
        # The result preview is printed only after the second run.
        print(ans.head(3), flush=True)
        print(ans.tail(3), flush=True)
    del ans

question = "sum v1:v3 by id6"  # q5
# Two identical runs; each one is timed and logged on its own.
for run_no in (1, 2):
    gc.collect()
    t_start = timeit.default_timer()
    ans = x.agg("id6", v1="sum(v1)", v2="sum(v2)", v3="sum(v3)").run()
    print(ans.shape, flush=True)
    t = timeit.default_timer() - t_start
    m = memory_usage()
    # Checksum: global sums of the three summed columns, timed separately.
    t_start = timeit.default_timer()
    chk = ans.agg([], v1="sum(v1)", v2="sum(v2)", v3="sum(v3)").run().row(0)
    chkt = timeit.default_timer() - t_start
    write_log(
        task=task, data=data_name, in_rows=x.shape[0], question=question,
        out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution,
        version=ver, git=git, fun=fun, run=run_no, time_sec=t, mem_gb=m,
        cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk,
    )
    if run_no == 2:
        # The result preview is printed only after the second run.
        print(ans.head(3), flush=True)
        print(ans.tail(3), flush=True)
    del ans

question = "median v3 sd v3 by id4 id5"  # q6
# Two identical runs; each one is timed and logged on its own.
for run_no in (1, 2):
    gc.collect()
    t_start = timeit.default_timer()
    # The median is requested as approx_quantile(v3, 0.5).
    ans = x.agg(
        ["id4", "id5"],
        v3_median="approx_quantile(v3, 0.5)",
        v3_stddev="stddev(v3)",
    ).run()
    print(ans.shape, flush=True)
    t = timeit.default_timer() - t_start
    m = memory_usage()
    # Checksum: global sums of the median and stddev columns, timed separately.
    t_start = timeit.default_timer()
    chk = ans.agg([], "sum(v3_median)", "sum(v3_stddev)").run().row(0)
    chkt = timeit.default_timer() - t_start
    write_log(
        task=task, data=data_name, in_rows=x.shape[0], question=question,
        out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution,
        version=ver, git=git, fun=fun, run=run_no, time_sec=t, mem_gb=m,
        cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk,
    )
    if run_no == 2:
        # The result preview is printed only after the second run.
        print(ans.head(3), flush=True)
        print(ans.tail(3), flush=True)
    del ans

question = "max v1 - min v2 by id3"  # q7
# Two identical runs; each one is timed and logged on its own.
for run_no in (1, 2):
    gc.collect()
    t_start = timeit.default_timer()
    # Aggregate first, then project the difference of the two aggregates.
    tmp = x.agg("id3", "max(v1)", "min(v2)")
    ans = tmp.proj("id3", range_v1_v2=tmp["v1_max"] - tmp["v2_min"]).run()
    print(ans.shape, flush=True)
    t = timeit.default_timer() - t_start
    m = memory_usage()
    # Checksum: global sum of the range column, timed separately.
    t_start = timeit.default_timer()
    chk = ans.agg([], range_v1_v2="sum(range_v1_v2)").run().row(0)
    chkt = timeit.default_timer() - t_start
    write_log(
        task=task, data=data_name, in_rows=x.shape[0], question=question,
        out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution,
        version=ver, git=git, fun=fun, run=run_no, time_sec=t, mem_gb=m,
        cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk,
    )
    if run_no == 2:
        # The result preview is printed only after the second run.
        print(ans.head(3), flush=True)
        print(ans.tail(3), flush=True)
    del ans

question = "largest two v3 by id6"  # q8
# Two identical runs; each one is timed and logged on its own.
for run_no in (1, 2):
    gc.collect()
    t_start = timeit.default_timer()
    # Number the rows within each id6 partition by descending v3 ...
    tmp = x.proj(
        "id6",
        "v3",
        row_no=hdk.row_number().over(x.ref("id6")).order_by((x.ref("v3"), "desc")),
    )
    # ... and keep only the rows numbered below 3.
    ans = tmp.filter(tmp.ref("row_no") < 3).proj("id6", "v3").run()
    print(ans.shape, flush=True)
    t = timeit.default_timer() - t_start
    m = memory_usage()
    # Checksum: global sum of v3, timed separately.
    t_start = timeit.default_timer()
    chk = ans.agg([], "sum(v3)").run().row(0)
    chkt = timeit.default_timer() - t_start
    write_log(
        task=task, data=data_name, in_rows=x.shape[0], question=question,
        out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution,
        version=ver, git=git, fun=fun, run=run_no, time_sec=t, mem_gb=m,
        cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk,
    )
    if run_no == 2:
        # The result preview is printed only after the second run.
        print(ans.head(3), flush=True)
        print(ans.tail(3), flush=True)
    del ans

question = "regression v1 v2 by id2 id4"  # q9
# Two identical runs; each one is timed and logged on its own.
for run_no in (1, 2):
    gc.collect()
    t_start = timeit.default_timer()
    # r2 is obtained by squaring the per-group correlation of v1 and v2.
    tmp = x.agg(["id2", "id4"], r2="corr(v1, v2)")
    ans = tmp.proj(r2=tmp["r2"] * tmp["r2"]).run()
    print(ans.shape, flush=True)
    t = timeit.default_timer() - t_start
    m = memory_usage()
    # Checksum: global sum of r2, timed separately.
    t_start = timeit.default_timer()
    chk = ans.agg([], r2="sum(r2)").run().row(0)
    chkt = timeit.default_timer() - t_start
    write_log(
        task=task, data=data_name, in_rows=x.shape[0], question=question,
        out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution,
        version=ver, git=git, fun=fun, run=run_no, time_sec=t, mem_gb=m,
        cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk,
    )
    if run_no == 2:
        # The result preview is printed only after the second run.
        print(ans.head(3), flush=True)
        print(ans.tail(3), flush=True)
    del ans

question = "sum v3 count by id1:id6"  # q10
# Two identical runs; each one is timed and logged on its own.
for run_no in (1, 2):
    gc.collect()
    t_start = timeit.default_timer()
    # Group by all six id columns; the count is stored under the name v1.
    ans = x.agg(
        ["id1", "id2", "id3", "id4", "id5", "id6"], v3="sum(v3)", v1="count"
    ).run()
    print(ans.shape, flush=True)
    t = timeit.default_timer() - t_start
    m = memory_usage()
    # Checksum: global sums of the v3 and count columns, timed separately.
    t_start = timeit.default_timer()
    chk = ans.agg([], v3="sum(v3)", v1="sum(v1)").run().row(0)
    chkt = timeit.default_timer() - t_start
    write_log(
        task=task, data=data_name, in_rows=x.shape[0], question=question,
        out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution,
        version=ver, git=git, fun=fun, run=run_no, time_sec=t, mem_gb=m,
        cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk,
    )
    if run_no == 2:
        # The result preview is printed only after the second run.
        print(ans.head(3), flush=True)
        print(ans.tail(3), flush=True)
    del ans

# Report the total wall-clock time since task_init was taken.
print(
    "grouping finished, took %0.fs" % (timeit.default_timer() - task_init), flush=True
)

# Terminate with status 0 through sys.exit(): the bare exit() helper is
# installed by the `site` module for interactive sessions and is not
# guaranteed to exist in every interpreter configuration.
sys.exit(0)
Join source code (mainly provided by HDK team)
#!/usr/bin/env python

# Join benchmark script for pyhdk.
# The banner below is printed before the imports, so it appears on stdout
# even if one of the imports fails.
print("# join-pyhdk.py", flush=True)

import os
import gc
import timeit
import sys
import pyhdk

# Extend the import path with the "_helpers" directory located one level above
# this file's directory; presumably that is where the `helpers` module
# imported on the next line lives (TODO confirm).
sys.path.append(os.path.join(os.path.dirname(__file__), "..", "_helpers"))
from helpers import memory_usage, write_log, make_chk, join_to_tbls

# Disabled alternative to the import above: execute the helpers file in place.
# exec(open("./_helpers/helpers.py").read())

# Metadata passed to every write_log() call further down.
# Both the version and the git field carry the pyhdk package version.
ver = pyhdk.__version__
git = pyhdk.__version__
task = "join"
solution = "pyhdk"
fun = ".join"
cache = "TRUE"
on_disk = "FALSE"

# The left-hand dataset name is taken from the environment; join_to_tbls()
# supplies the names of the right-hand tables used by the join questions.
data_name = os.environ["SRC_DATANAME"]
src_jn_x = os.path.join("data", data_name + ".csv")
y_data_name = join_to_tbls(data_name)
print("pyhdk data_name: ", data_name)

# Validate the helper's result BEFORE indexing into it. The previous check
# measured a list literal that always had three elements, so it could never
# fire; a short result surfaced as an IndexError and extra names were
# silently ignored.
if len(y_data_name) != 3:
    raise Exception("Something went wrong in preparing files used for join")
src_jn_y = [os.path.join("data", name + ".csv") for name in y_data_name]

print("loading datasets " + ", ".join([data_name, *y_data_name]), flush=True)
# Engine options forwarded to pyhdk.init().
pyhdk_init_args = {"enable_cpu_groupby_multifrag_kernels": False}
# Further switches left disabled by the author:
#   pyhdk_init_args["enable_debug_timer"] = True
#   pyhdk_init_args["enable-non-lazy-data-import"] = True
# NOTE(review): debug logging is switched on for this benchmark run —
# confirm that this is intended when comparing timings.
pyhdk.initLogger(debug_logs=True)

# The fragment size used for every imported table comes from the environment.
fragment_size = int(os.environ["FRAGMENT_SIZE"])
print(f"Using fragment size {fragment_size}")

hdk = pyhdk.init(**pyhdk_init_args)

# NOTE(review): the original TODO here asked for 32-bit integers to reduce
# memory use; the integer key columns below are already declared as int32.
x = hdk.import_csv(
    src_jn_x,
    schema={
        "id1": "int32", "id2": "int32", "id3": "int32",
        "id4": "dict", "id5": "dict", "id6": "dict",
        "v1": "fp64",
    },
    fragment_size=fragment_size,
)
small = hdk.import_csv(
    src_jn_y[0],
    schema={"id1": "int32", "id4": "dict", "v2": "fp64"},
    fragment_size=fragment_size,
)
medium = hdk.import_csv(
    src_jn_y[1],
    schema={
        "id1": "int32", "id2": "int32",
        "id4": "dict", "id5": "dict",
        "v2": "fp64",
    },
    fragment_size=fragment_size,
)
big = hdk.import_csv(
    src_jn_y[2],
    schema={
        "id1": "int32", "id2": "int32", "id3": "int32",
        "id4": "dict", "id5": "dict", "id6": "dict",
        "v2": "fp64",
    },
    fragment_size=fragment_size,
)
# Print the row count of each imported table, in import order.
for tbl in (x, small, medium, big):
    print(tbl.shape[0], flush=True)

# Wall-clock origin for the "joining finished" summary at the end.
task_init = timeit.default_timer()
print("joining...", flush=True)


question = "small inner on int"  # q1
# Two identical runs; each one is timed and logged on its own.
for run_no in (1, 2):
    gc.collect()
    t_start = timeit.default_timer()
    ans = x.join(small, "id1").run()
    print(ans.shape, flush=True)
    t = timeit.default_timer() - t_start
    m = memory_usage()
    # Checksum: global sums of v1 and v2, timed separately.
    t_start = timeit.default_timer()
    chk = ans.agg([], v1="sum(v1)", v2="sum(v2)").run().row(0)
    chkt = timeit.default_timer() - t_start
    write_log(
        task=task, data=data_name, in_rows=x.shape[0], question=question,
        out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution,
        version=ver, git=git, fun=fun, run=run_no, time_sec=t, mem_gb=m,
        cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk,
    )
    # The ans.head(3) / ans.tail(3) preview prints are disabled in this script.
    del ans

question = "medium inner on int"  # q2
# Two identical runs; each one is timed and logged on its own.
for run_no in (1, 2):
    gc.collect()
    t_start = timeit.default_timer()
    ans = x.join(medium, "id2").run()
    print(ans.shape, flush=True)
    t = timeit.default_timer() - t_start
    m = memory_usage()
    # Checksum: global sums of v1 and v2, timed separately.
    t_start = timeit.default_timer()
    chk = ans.agg([], v1="sum(v1)", v2="sum(v2)").run().row(0)
    chkt = timeit.default_timer() - t_start
    write_log(
        task=task, data=data_name, in_rows=x.shape[0], question=question,
        out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution,
        version=ver, git=git, fun=fun, run=run_no, time_sec=t, mem_gb=m,
        cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk,
    )
    # The ans.head(3) / ans.tail(3) preview prints are disabled in this script.
    del ans

question = "medium outer on int"  # q3
# Two identical runs; each one is timed and logged on its own.
for run_no in (1, 2):
    gc.collect()
    t_start = timeit.default_timer()
    # The "outer" question is issued as a left join.
    ans = x.join(medium, "id2", how="left").run()
    print(ans.shape, flush=True)
    t = timeit.default_timer() - t_start
    m = memory_usage()
    # Checksum: global sums of v1 and v2, timed separately.
    t_start = timeit.default_timer()
    chk = ans.agg([], v1="sum(v1)", v2="sum(v2)").run().row(0)
    chkt = timeit.default_timer() - t_start
    write_log(
        task=task, data=data_name, in_rows=x.shape[0], question=question,
        out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution,
        version=ver, git=git, fun=fun, run=run_no, time_sec=t, mem_gb=m,
        cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk,
    )
    # The ans.head(3) / ans.tail(3) preview prints are disabled in this script.
    del ans

question = "medium inner on factor"  # q4
# Two identical runs; each one is timed and logged on its own.
for run_no in (1, 2):
    gc.collect()
    t_start = timeit.default_timer()
    # id5 is imported with the "dict" type, hence "factor" in the question.
    ans = x.join(medium, "id5").run()
    print(ans.shape, flush=True)
    t = timeit.default_timer() - t_start
    m = memory_usage()
    # Checksum: global sums of v1 and v2, timed separately.
    t_start = timeit.default_timer()
    chk = ans.agg([], v1="sum(v1)", v2="sum(v2)").run().row(0)
    chkt = timeit.default_timer() - t_start
    write_log(
        task=task, data=data_name, in_rows=x.shape[0], question=question,
        out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution,
        version=ver, git=git, fun=fun, run=run_no, time_sec=t, mem_gb=m,
        cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk,
    )
    # The ans.head(3) / ans.tail(3) preview prints are disabled in this script.
    del ans

question = "big inner on int"  # q5
# Two identical runs; each one is timed and logged on its own.
for run_no in (1, 2):
    gc.collect()
    t_start = timeit.default_timer()
    ans = x.join(big, "id3").run()
    print(ans.shape, flush=True)
    t = timeit.default_timer() - t_start
    m = memory_usage()
    # Checksum: global sums of v1 and v2, timed separately.
    t_start = timeit.default_timer()
    chk = ans.agg([], v1="sum(v1)", v2="sum(v2)").run().row(0)
    chkt = timeit.default_timer() - t_start
    write_log(
        task=task, data=data_name, in_rows=x.shape[0], question=question,
        out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution,
        version=ver, git=git, fun=fun, run=run_no, time_sec=t, mem_gb=m,
        cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk,
    )
    # The ans.head(3) / ans.tail(3) preview prints are disabled in this script.
    del ans

# Report the total wall-clock time since task_init was taken.
print("joining finished, took %0.fs" % (timeit.default_timer() - task_init), flush=True)

# Terminate with status 0 through sys.exit(): the bare exit() helper is
# installed by the `site` module for interactive sessions and is not
# guaranteed to exist in every interpreter configuration.
sys.exit(0)
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant