Skip to content

Commit

Permalink
chore: clean up code
Browse files Browse the repository at this point in the history
  • Loading branch information
Beforerr committed Nov 9, 2024
1 parent f0adc0b commit c18ec97
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 207 deletions.
198 changes: 20 additions & 178 deletions notebooks/02_ids_properties.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"## Duration"
"## Pipelines"
]
},
{
Expand All @@ -85,19 +85,7 @@
"outputs": [],
"source": [
"# | export\n",
"def calc_candidate_duration(candidate, data, **kwargs):\n",
" candidate_data = get_candidate_data(candidate, data)\n",
" return calc_duration(candidate_data, **kwargs)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# | export\n",
"def calc_events_features(\n",
"def calc_events_tr_features(\n",
" df: pl.DataFrame, data, tr_cols=[\"tstart\", \"tstop\"], func=None, **kwargs\n",
"):\n",
" tranges = df.select(tr_cols).to_numpy()\n",
Expand All @@ -113,20 +101,15 @@
"\n",
"\n",
"def calc_events_duration(df, data, tr_cols=[\"tstart\", \"tstop\"], **kwargs):\n",
" return calc_events_features(\n",
" return calc_events_tr_features(\n",
" df, data, tr_cols, func=calc_duration, **kwargs\n",
" ).drop_nulls()\n",
"\n",
"\n",
"def calc_events_mva_features(df, data, tr_cols=[\"t.d_start\", \"t.d_end\"], **kwargs):\n",
" return calc_events_features(df, data, tr_cols, func=calc_mva_features_all, **kwargs)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Normal direction"
" return calc_events_tr_features(\n",
" df, data, tr_cols, func=calc_mva_features_all, **kwargs\n",
" )"
]
},
{
Expand All @@ -136,7 +119,7 @@
"outputs": [],
"source": [
"# | export\n",
"def calc_normal_direction(v1, v2, normalize=True) -> np.ndarray:\n",
"def calc_normal_direction(v1, v2):\n",
" \"\"\"\n",
" Computes the normal direction of two vectors.\n",
"\n",
Expand All @@ -158,47 +141,43 @@
"outputs": [],
"source": [
"# | export\n",
"def calc_events_normal_direction(\n",
"    df: pl.DataFrame, data: xr.DataArray, name=\"k\", tr_cols=[\"t.d_start\", \"t.d_end\"]\n",
"):\n",
"    \"\"\"\n",
"    Compute the normal direction(s) from the field vectors at two time steps.\n",
"    Appends the result as column ``name`` to ``df``.\n",
"    \"\"\"\n",
"    # transpose: ``to_numpy()`` is (n_events, 2); without ``.T`` unpacking\n",
"    # would split off the first two ROWS instead of the two time COLUMNS\n",
"    tstart, tstop = df.select(tr_cols).to_numpy().T\n",
"\n",
"    vecs_before = get_data_at_times(data, tstart)\n",
"    vecs_after = get_data_at_times(data, tstop)\n",
"\n",
"    normal_directions = calc_normal_direction(vecs_before, vecs_after)\n",
"    # need to convert to list first, as only 1D array is supported\n",
"    return df.with_columns(pl.Series(name, normal_directions))\n",
"\n",
"\n",
"# | export\n",
"def calc_events_vec_change(\n",
"    df: pl.DataFrame, data: xr.DataArray, name=\"dB\", tr_cols=[\"t.d_start\", \"t.d_end\"]\n",
"):\n",
"    \"\"\"\n",
"    Compute the change of the field vector (e.g. magnetic field) across each\n",
"    event time range; appends the result as column ``name`` to ``df``.\n",
"    \"\"\"\n",
"    # transpose so the two time columns (not the first two rows) are unpacked\n",
"    tstart, tstop = df.select(tr_cols).to_numpy().T\n",
"\n",
"    vecs_before = get_data_at_times(data, tstart)\n",
"    vecs_after = get_data_at_times(data, tstop)\n",
"    dvecs = vecs_after - vecs_before\n",
"\n",
"    return df.with_columns(pl.Series(name, dvecs))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Pipelines"
"## Data processing"
]
},
{
Expand Down Expand Up @@ -232,143 +211,6 @@
" .pipe(calc_events_normal_direction, data=data, name=\"k\")\n",
" ).with_columns(duration=duration_expr)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# | hide\n",
"from nbdev import nbdev_export\n",
"\n",
"nbdev_export()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Test"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Test parallelization\n",
"\n",
"\n",
"Generally `mapply` and `modin` are the fastest. `xorbits` was expected to be the fastest, but in practice it is the slowest.\n",
"\n",
"```python\n",
"#| notest\n",
"sat = 'jno'\n",
"coord = 'se'\n",
"cols = [\"BX\", \"BY\", \"BZ\"]\n",
"tau = timedelta(seconds=60)\n",
"data_resolution = timedelta(seconds=1)\n",
"\n",
"if True:\n",
" year = 2012\n",
" files = f'../data/{sat}_data_{year}.parquet'\n",
" output = f'../data/{sat}_candidates_{year}_tau_{tau.seconds}.parquet'\n",
"\n",
" data = pl.scan_parquet(files).set_sorted('time').collect()\n",
"\n",
" indices = compute_indices(data, tau)\n",
" # filter condition\n",
" sparse_num = tau / data_resolution // 3\n",
" filter_condition = filter_indices(sparse_num = sparse_num)\n",
"\n",
" candidates = indices.filter(filter_condition).with_columns(pl_format_time(tau)).sort('time')\n",
" \n",
" data_c = compress_data_by_events(data, candidates, tau)\n",
" sat_fgm = df2ts(data_c, cols, attrs={\"units\": \"nT\"})\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# | code-summary: Test different libraries to parallelize the computation\n",
"# | notest\n",
"def test_parallelization(candidates, sat_fgm):\n",
" # process_events(candidates_modin, sat_fgm, sat_state, data_resolution)\n",
"\n",
" # ---\n",
" # successful cases\n",
" # ---\n",
" # candidates_pd.mapply(lambda candidate: calc_candidate_duration(candidate, sat_fgm), axis=1) # this works, 4.2 secs\n",
" # candidates_pd.mapply(calc_candidate_duration, axis=1, data=sat_fgm) # this works, but a little bit slower, 6.7 secs\n",
"\n",
" # candidates_pd.apply(calc_candidate_duration, axis=1, data=sat_fgm) # Standard case: 24+s secs\n",
" # candidates_pd.swifter.apply(calc_candidate_duration, axis=1, data=sat_fgm) # this works with dask, 80 secs\n",
" # candidates_pd.swifter.set_dask_scheduler(scheduler=\"threads\").apply(calc_candidate_duration, axis=1, data=sat_fgm) # this works with dask, 60 secs\n",
" # candidates_modin.apply(lambda candidate: calc_candidate_duration(candidate, sat_fgm), axis=1) # this works with ray, 6 secs # NOTE: can not work with dask\n",
" # candidates_x.apply(calc_candidate_duration, axis=1, data=sat_fgm) # 30 seconds\n",
" # ---\n",
" # failed cases\n",
" # ---\n",
" # candidates_modin.apply(calc_candidate_duration, axis=1, data=sat_fgm) # AttributeError: 'DataFrame' object has no attribute 'sel'\n",
" pass"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import timeit\n",
"from functools import partial"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def benchmark(task_dict, number=1):\n",
" results = {}\n",
" for name, (data, task) in task_dict.items():\n",
" try:\n",
" time_taken = timeit.timeit(lambda: task(data), number=number)\n",
" results[name] = time_taken / number\n",
" except Exception as e:\n",
" results[name] = str(e)\n",
" return results"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# | notest\n",
"def benchmark_results(candidates, sat_fgm):\n",
" import modin.pandas as mpd\n",
"\n",
" candidates_pd = candidates.to_pandas()\n",
" candidates_modin = mpd.DataFrame(candidates_pd)\n",
" # candidates_x = xpd.DataFrame(candidates_pd)\n",
"\n",
" func = partial(calc_candidate_duration, data=sat_fgm)\n",
" task_dict = {\n",
" \"pandas\": (candidates_pd, lambda _: _.apply(func, axis=1)),\n",
" \"pandas-mapply\": (candidates_pd, lambda _: _.mapply(func, axis=1)),\n",
" \"modin\": (candidates_modin, lambda _: _.apply(func, axis=1)),\n",
" # 'xorbits': (candidates_x, lambda _: _.apply(func, axis=1)),\n",
" }\n",
"\n",
" results = benchmark(task_dict)\n",
" return results"
]
}
],
"metadata": {
Expand Down
8 changes: 3 additions & 5 deletions src/discontinuitypy/_modidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,14 @@
'discontinuitypy/core/pipeline.py'),
'discontinuitypy.core.pipeline.ids_finder': ( 'ids_finder.html#ids_finder',
'discontinuitypy/core/pipeline.py')},
'discontinuitypy.core.propeties': { 'discontinuitypy.core.propeties.calc_candidate_duration': ( 'ids_properties.html#calc_candidate_duration',
'discontinuitypy/core/propeties.py'),
'discontinuitypy.core.propeties.calc_events_duration': ( 'ids_properties.html#calc_events_duration',
'discontinuitypy/core/propeties.py'),
'discontinuitypy.core.propeties.calc_events_features': ( 'ids_properties.html#calc_events_features',
'discontinuitypy.core.propeties': { 'discontinuitypy.core.propeties.calc_events_duration': ( 'ids_properties.html#calc_events_duration',
'discontinuitypy/core/propeties.py'),
'discontinuitypy.core.propeties.calc_events_mva_features': ( 'ids_properties.html#calc_events_mva_features',
'discontinuitypy/core/propeties.py'),
'discontinuitypy.core.propeties.calc_events_normal_direction': ( 'ids_properties.html#calc_events_normal_direction',
'discontinuitypy/core/propeties.py'),
'discontinuitypy.core.propeties.calc_events_tr_features': ( 'ids_properties.html#calc_events_tr_features',
'discontinuitypy/core/propeties.py'),
'discontinuitypy.core.propeties.calc_events_vec_change': ( 'ids_properties.html#calc_events_vec_change',
'discontinuitypy/core/propeties.py'),
'discontinuitypy.core.propeties.calc_normal_direction': ( 'ids_properties.html#calc_normal_direction',
Expand Down
48 changes: 24 additions & 24 deletions src/discontinuitypy/core/propeties.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# AUTOGENERATED! DO NOT EDIT! File to edit: ../../../notebooks/02_ids_properties.ipynb.

# %% auto 0
__all__ = ['get_data_at_times', 'select_data_by_timerange', 'get_candidate_data', 'calc_candidate_duration',
'calc_events_features', 'calc_events_duration', 'calc_events_mva_features', 'calc_normal_direction',
'calc_events_normal_direction', 'calc_events_vec_change', 'process_events']
__all__ = ['get_data_at_times', 'select_data_by_timerange', 'get_candidate_data', 'calc_events_tr_features',
'calc_events_duration', 'calc_events_mva_features', 'calc_normal_direction', 'calc_events_normal_direction',
'calc_events_vec_change', 'process_events']

# %% ../../../notebooks/02_ids_properties.ipynb 1
# | code-summary: "Import all the packages needed for the project"
Expand Down Expand Up @@ -45,12 +45,7 @@ def ld2dl(listdict: list[dict], func=np.array):
return {key: func([result[key] for result in listdict]) for key in listdict[0]}

# %% ../../../notebooks/02_ids_properties.ipynb 5
def calc_candidate_duration(candidate, data, **kwargs):
candidate_data = get_candidate_data(candidate, data)
return calc_duration(candidate_data, **kwargs)

# %% ../../../notebooks/02_ids_properties.ipynb 6
def calc_events_features(
def calc_events_tr_features(
df: pl.DataFrame, data, tr_cols=["tstart", "tstop"], func=None, **kwargs
):
tranges = df.select(tr_cols).to_numpy()
Expand All @@ -66,16 +61,18 @@ def remote(tr, **kwargs):


def calc_events_duration(df, data, tr_cols=["tstart", "tstop"], **kwargs):
    """
    Compute per-event duration features over the ``tr_cols`` time ranges of
    ``df`` (via ``calc_duration``); rows whose duration could not be
    determined come back null and are dropped.
    """
    return calc_events_tr_features(
        df, data, tr_cols, func=calc_duration, **kwargs
    ).drop_nulls()


def calc_events_mva_features(df, data, tr_cols=["t.d_start", "t.d_end"], **kwargs):
    """
    Compute minimum-variance-analysis features for each event in ``df``
    over the ``tr_cols`` time ranges (via ``calc_mva_features_all``).
    """
    return calc_events_tr_features(
        df, data, tr_cols, func=calc_mva_features_all, **kwargs
    )

# %% ../../../notebooks/02_ids_properties.ipynb 8
def calc_normal_direction(v1, v2, normalize=True) -> np.ndarray:
# %% ../../../notebooks/02_ids_properties.ipynb 6
def calc_normal_direction(v1, v2):
"""
Computes the normal direction of two vectors.
Expand All @@ -89,36 +86,39 @@ def calc_normal_direction(v1, v2, normalize=True) -> np.ndarray:
c = np.cross(v1, v2)
return c / np.linalg.norm(c, axis=-1, keepdims=True)

# %% ../../../notebooks/02_ids_properties.ipynb 9
def calc_events_normal_direction(events: pl.DataFrame, data: xr.DataArray, name="k"):
# %% ../../../notebooks/02_ids_properties.ipynb 7
def calc_events_normal_direction(
    df: pl.DataFrame, data: xr.DataArray, name="k", tr_cols=["t.d_start", "t.d_end"]
):
    """
    Compute the normal direction(s) from the field vectors at two time steps.

    Parameters
    ----------
    df : event DataFrame; one normal vector is computed per row.
    data : time-indexed vector data sampled at the event boundary times.
    name : name of the column appended to ``df``.
    tr_cols : the two columns holding each event's start/end times.
    """
    # BUG FIX: ``select(tr_cols).to_numpy()`` has shape (n_events, 2); plain
    # unpacking would split off the first two ROWS. Transpose so the two time
    # COLUMNS are unpacked (matches the old per-column ``.to_numpy()`` calls).
    tstart, tstop = df.select(tr_cols).to_numpy().T

    vecs_before = get_data_at_times(data, tstart)
    vecs_after = get_data_at_times(data, tstop)

    normal_directions = calc_normal_direction(vecs_before, vecs_after)
    # need to convert to list first, as only 1D array is supported
    return df.with_columns(pl.Series(name, normal_directions))


# %% ../../../notebooks/02_ids_properties.ipynb 10
def calc_events_vec_change(events: pl.DataFrame, data: xr.DataArray, name="dB"):
# | export
def calc_events_vec_change(
    df: pl.DataFrame, data: xr.DataArray, name="dB", tr_cols=["t.d_start", "t.d_end"]
):
    """
    Compute the change of the field vector (e.g. magnetic field) across each
    event time range.

    Parameters
    ----------
    df : event DataFrame; one difference vector is computed per row.
    data : time-indexed vector data sampled at the event boundary times.
    name : name of the column appended to ``df``.
    tr_cols : the two columns holding each event's start/end times.
    """
    # BUG FIX: transpose so the two time COLUMNS of the (n_events, 2) array
    # are unpacked, not its first two rows.
    tstart, tstop = df.select(tr_cols).to_numpy().T

    vecs_before = get_data_at_times(data, tstart)
    vecs_after = get_data_at_times(data, tstop)
    dvecs = vecs_after - vecs_before

    return df.with_columns(pl.Series(name, dvecs))

# %% ../../../notebooks/02_ids_properties.ipynb 12
# %% ../../../notebooks/02_ids_properties.ipynb 9
def process_events(
events: pl.DataFrame, # potential candidates DataFrame
data: xr.DataArray,
Expand Down

0 comments on commit c18ec97

Please sign in to comment.