This repository has been archived by the owner on May 9, 2024. It is now read-only.

Commit

use HDK class in examples + some new examples
akroviakov authored and kurapov-peter committed Dec 4, 2023
1 parent 1ca3597 commit 3d5595e
Showing 9 changed files with 881 additions and 398 deletions.
209 changes: 209 additions & 0 deletions examples/basic_example_pyHDK.ipynb
@@ -0,0 +1,209 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import pyarrow as pa\n",
"import pyhdk\n",
"import time\n",
"\n",
"hdk = pyhdk.hdk.HDK(\n",
" debug_logs=\"INFO\"\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Init data\n",
"col2_tbl1 = np.array(['red', 'orange', 'yellow', 'green', 'blue'])\n",
"col1_tbl1 = np.arange(len(col2_tbl1))\n",
"\n",
"table1 = pa.Table.from_arrays(\n",
" [pa.array(col1_tbl1, pa.int64()), pa.array(col2_tbl1, pa.string())], \n",
" schema=pa.schema([('ID', pa.int64()), ('color', pa.string())])\n",
")\n",
"\n",
"table2_nrows = 10_000_000 # with more data, we expect GPU to beat CPU\n",
"col1_table2 = np.random.randint(1, 100, size=table2_nrows)\n",
"col2_table2 = np.random.randint(1, 100, size=table2_nrows)\n",
"col3_table2 = np.random.randint(len(col2_tbl1), size=table2_nrows)\n",
"\n",
"table2 = pa.Table.from_arrays(\n",
" [pa.array(col1_table2, pa.int64()), pa.array(col2_table2, pa.int64()), pa.array(col3_table2, pa.int64())], \n",
" schema=pa.schema([(\"price\", pa.int64()), ('Region', pa.int64()), ('color_ID', pa.int64())])\n",
" )\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fragment_count = 8\n",
"hdk_tbl1 = hdk.import_arrow(table1, \"ht1\", int(np.ceil(table1.num_rows/fragment_count)))\n",
"hdk_tbl2 = hdk.import_arrow(table2, \"ht2\", int(np.ceil(table2.num_rows/fragment_count)))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that cold run may not indicate significant speedups, because HDK\n",
"potentially needs to materialize/build some info about the table and/or the individual columns.\n",
"That info, however, will be preserved and subsequent runs should be faster.\n",
"\n",
"To get \"fair\" results, do not run of optimized versions back-to-back as this will try to reuse results of previous compilations. \n",
"\n",
"You could run the unoptimized version before an optimized one to \n",
"wipe the cached plan and get a time that includes compilation.\n"
]
},
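{
"cell_type": "markdown",
"metadata": {},
"source": [
"Below is a minimal warm-up sketch of the caching effect described above. It is an illustration only: it assumes the `hdk_tbl2` table imported earlier and that a table node exposes the same `agg(...).run()` builder interface used on query nodes in the cells below; absolute timings will vary by machine.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch (assumes hdk_tbl2 and the agg/run builder API shown in later cells)\n",
"cold_start = time.perf_counter()\n",
"hdk_tbl2.agg(\"color_ID\", \"avg(price)\").run()  # cold run: includes compilation\n",
"print(f\"Cold run: {(time.perf_counter() - cold_start):.3f}s\")\n",
"\n",
"warm_start = time.perf_counter()\n",
"hdk_tbl2.agg(\"color_ID\", \"avg(price)\").run()  # warm run: reuses the cached plan\n",
"print(f\"Warm run: {(time.perf_counter() - warm_start):.3f}s\")"
]
},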
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Can also refragment original tables\n",
"# hdk_tbl3 = hdk_tbl1.refragmented_view(500_000)\n",
"# hdk_tbl4 = hdk.refragmented_view(\"ht2\", 500_000)\n",
"# OR\n",
"# hdk_tbl4 = hdk.refragmented_view(hdk_tbl2, 500_000)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# To see \"fair\" results, you can at first execute all cells and then click \"execute this cell and below\" \n",
"# Independent ops on CPU (dataframe-like, naive and suboptimal)\n",
"join_start = time.perf_counter()\n",
"join_res = hdk_tbl2.join(hdk_tbl1, lhs_cols=\"color_ID\", rhs_cols=\"ID\").run()\n",
"print(f\"Join time: {(time.perf_counter() - join_start):.3f}s\")\n",
"\n",
"sort_start = time.perf_counter()\n",
"sort_res = join_res.sort(fields={\"price\" : \"desc\"}).run()\n",
"print(f\"Sort time: {(time.perf_counter() - sort_start):.3f}s\")\n",
"\n",
"agg_start = time.perf_counter()\n",
"agg_res = sort_res.agg(\"color_ID\", \"avg(price)\").run()\n",
"print(f\"Agg time: {(time.perf_counter() - agg_start):.3f}s\")\n",
"unopt_query_t = time.perf_counter() - join_start\n",
"print(f\"Total time (unopt): {(unopt_query_t):.3f}s\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Combined plan on CPU \n",
"# Giving the compiler more overview of what we want to achieve allows for more optimizations\n",
"\n",
"q_start = time.perf_counter()\n",
"join_res = hdk_tbl2.join(hdk_tbl1, lhs_cols=\"color_ID\", rhs_cols=\"ID\")\n",
"sort_res = join_res.sort(fields={\"price\" : \"desc\"})\n",
"agg_res = sort_res.agg(\"color_ID\", \"avg(price)\").run()\n",
"opt_query_t = time.perf_counter() - q_start\n",
"print(f\"Total time (Combined plan on CPU): {(opt_query_t):.3f}s\")\n",
"print(f\"Speedup: {(unopt_query_t/opt_query_t):.2f}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Combined plan on GPU\n",
"q_start = time.perf_counter()\n",
"join_res = hdk_tbl2.join(hdk_tbl1, lhs_cols=\"color_ID\", rhs_cols=\"ID\")\n",
"sort_res = join_res.sort(fields={\"price\" : \"desc\"})\n",
"agg_res = join_res.agg(\"color_ID\", \"avg(price)\").run(device_type=\"GPU\")\n",
"opt_query_t = time.perf_counter() - q_start\n",
"print(f\"Total time (Combined plan on GPU): {(opt_query_t):.3f}s\")\n",
"print(f\"Speedup: {(unopt_query_t/opt_query_t):.2f}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Indep ops: Join on CPU, Sort and Agg on GPU\n",
"# Q: Why could it be so much slower than fully on either of the devices? \n",
"# A: GPU must *fetch intermediate results* of the join each run, whereas in\n",
"# the full-GPU mode GPU can retain columns of the table for the next run\n",
"# via BufferManager and thus only needs to transfer the aggregate back to CPU.\n",
"\n",
"join_start = time.perf_counter()\n",
"join_res = hdk_tbl2.join(hdk_tbl1, lhs_cols=\"color_ID\", rhs_cols=\"ID\").run()\n",
"print(f\"Join time: {(time.perf_counter() - join_start):.3f}s\")\n",
"\n",
"sort_res = join_res.sort(fields={\"price\" : \"desc\"})\n",
"agg_start = time.perf_counter()\n",
"agg_res = sort_res.agg(\"color_ID\", \"avg(price)\").run(device_type=\"GPU\")\n",
"print(f\"Sort+Agg time: {(time.perf_counter() - agg_start):.3f}s\")\n",
"opt_query_t = time.perf_counter() - join_start\n",
"print(f\"Speedup (Join on CPU, Agg on GPU): {(unopt_query_t/opt_query_t):.2f}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# SQL example\n",
"q_start = time.perf_counter()\n",
"hdk.sql(\"SELECT color_ID, AVG(price) \\\n",
" FROM ht2 \\\n",
" JOIN ht1 ON ht1.ID = ht2.color_ID \\\n",
" GROUP BY color_ID \\\n",
" ORDER BY AVG(price) DESC\")\n",
"print(f\"SQL time: {(time.perf_counter() - q_start):.3f}s\")"
]
},
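{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a sketch of consuming a SQL result: the object returned by `hdk.sql(...)` can be converted to Arrow and then to pandas, using the same `to_arrow()`/`to_pandas()` chain as in `basic_python_usage.ipynb`.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch: materialize a SQL result as a pandas DataFrame\n",
"res = hdk.sql(\"SELECT color_ID, AVG(price) FROM ht2 GROUP BY color_ID\")\n",
"print(res.to_arrow().to_pandas().head())"
]
},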
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "omnisci-dev",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.16"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
51 changes: 11 additions & 40 deletions examples/basic_python_usage.ipynb
@@ -9,16 +9,12 @@
    "source": [
     "# Initialization\n",
     "import pyhdk \n",
+    "import pandas\n",
+    "import pyarrow as pa\n",
     "\n",
-    "# Uses DBID 1\n",
-    "pyhdk.initLogger()\n",
-    "config = pyhdk.buildConfig()\n",
-    "storage = pyhdk.storage.ArrowStorage(1)\n",
-    "data_mgr = pyhdk.storage.DataMgr(config)\n",
-    "data_mgr.registerDataProvider(storage)\n",
-    "\n",
-    "calcite = pyhdk.sql.Calcite(storage, config)\n",
-    "executor = pyhdk.Executor(data_mgr, config)"
+    "hdk = pyhdk.hdk.HDK(\n",
+    "    debug_logs=\"INFO\", # generates log file, DEBUG2 for more verbosity \n",
+    ")"
    ]
   },
   {
@@ -28,15 +24,8 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "# Helper Functions\n",
-    "def get_rel_alg(sql):\n",
-    "    return calcite.process(sql)\n",
-    "\n",
-    "def run_query(sql):\n",
-    "    ra = get_rel_alg(sql)\n",
-    "    # One RelAlgExecutor per query\n",
-    "    rel_alg_executor = pyhdk.sql.RelAlgExecutor(executor, storage, data_mgr, ra)\n",
-    "    return rel_alg_executor.execute()"
+    "tbl = pa.Table.from_pandas(pandas.DataFrame({\"a\": [1, 2, 3], \"b\": [10, 20, 30]}))\n",
+    "hdk_tbl = hdk.import_arrow(tbl, \"test\")"
    ]
   },
   {
@@ -46,32 +35,14 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "## Examples \n",
-    "\n",
-    "# Load some data\n",
-    "import pandas\n",
-    "import pyarrow as pa\n",
-    "\n",
-    "tbl = pa.Table.from_pandas(pandas.DataFrame({\"a\": [1, 2, 3], \"b\": [10, 20, 30]}))\n",
-    "opt = pyhdk.storage.TableOptions(2)\n",
-    "storage.importArrowTable(tbl, \"test\", opt)\n",
-    "\n",
     "# Basic query\n",
-    "print(run_query(\"SELECT * FROM test;\").to_arrow().to_pandas())\n",
+    "print(hdk.sql(\"SELECT * FROM test;\").to_arrow().to_pandas())\n",
     "\n",
-    "print(run_query(\"SELECT a, count(*), sum(b) FROM test GROUP BY a;\").to_arrow().to_pandas())\n",
+    "print(hdk.sql(\"SELECT a, count(*), sum(b) FROM test GROUP BY a;\").to_arrow().to_pandas())\n",
     "\n",
     "# Cleanup\n",
-    "storage.dropTable(\"test\")"
+    "hdk.drop_table(\"test\")"
    ]
   },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "24e70d3f",
-   "metadata": {},
-   "outputs": [],
-   "source": []
-  }
  ],
  "metadata": {
Expand All @@ -90,7 +61,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.13"
"version": "3.9.16"
}
},
"nbformat": 4,
