diff --git a/notebooks/02_ids_properties.ipynb b/notebooks/02_ids_properties.ipynb index 8cd52f2..53a878a 100644 --- a/notebooks/02_ids_properties.ipynb +++ b/notebooks/02_ids_properties.ipynb @@ -63,7 +63,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Duration" + "## Pipelines" ] }, { @@ -85,19 +85,7 @@ "outputs": [], "source": [ "# | export\n", - "def calc_candidate_duration(candidate, data, **kwargs):\n", - " candidate_data = get_candidate_data(candidate, data)\n", - " return calc_duration(candidate_data, **kwargs)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# | export\n", - "def calc_events_features(\n", + "def calc_events_tr_features(\n", " df: pl.DataFrame, data, tr_cols=[\"tstart\", \"tstop\"], func=None, **kwargs\n", "):\n", " tranges = df.select(tr_cols).to_numpy()\n", @@ -113,20 +101,15 @@ "\n", "\n", "def calc_events_duration(df, data, tr_cols=[\"tstart\", \"tstop\"], **kwargs):\n", - " return calc_events_features(\n", + " return calc_events_tr_features(\n", " df, data, tr_cols, func=calc_duration, **kwargs\n", " ).drop_nulls()\n", "\n", "\n", "def calc_events_mva_features(df, data, tr_cols=[\"t.d_start\", \"t.d_end\"], **kwargs):\n", - " return calc_events_features(df, data, tr_cols, func=calc_mva_features_all, **kwargs)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Normal direction" + " return calc_events_tr_features(\n", + " df, data, tr_cols, func=calc_mva_features_all, **kwargs\n", + " )" ] }, { @@ -136,7 +119,7 @@ "outputs": [], "source": [ "# | export\n", - "def calc_normal_direction(v1, v2, normalize=True) -> np.ndarray:\n", + "def calc_normal_direction(v1, v2):\n", " \"\"\"\n", " Computes the normal direction of two vectors.\n", "\n", @@ -158,47 +141,43 @@ "outputs": [], "source": [ "# | export\n", - "def calc_events_normal_direction(events: pl.DataFrame, data: xr.DataArray, name=\"k\"):\n", + "def calc_events_normal_direction(\n", + " df: pl.DataFrame, data: xr.DataArray, name=\"k\", tr_cols=[\"t.d_start\", \"t.d_end\"]\n", + "):\n", " \"\"\"\n", " Computes the normal directions(s) at two different time steps.\n", " \"\"\"\n", - " tstart = events[\"t.d_start\"].to_numpy()\n", - " tstop = events[\"t.d_end\"].to_numpy()\n", + " tstart, tstop = df.select(tr_cols).to_numpy()\n", "\n", " vecs_before = get_data_at_times(data, tstart)\n", " vecs_after = get_data_at_times(data, tstop)\n", "\n", " normal_directions = calc_normal_direction(vecs_before, vecs_after)\n", " # need to convert to list first, as only 1D array is supported\n", - " return events.with_columns(pl.Series(name, normal_directions))" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ + " return df.with_columns(pl.Series(name, normal_directions))\n", + "\n", + "\n", "# | export\n", - "def calc_events_vec_change(events: pl.DataFrame, data: xr.DataArray, name=\"dB\"):\n", + "def calc_events_vec_change(\n", + " df: pl.DataFrame, data: xr.DataArray, name=\"dB\", tr_cols=[\"t.d_start\", \"t.d_end\"]\n", + "):\n", " \"\"\"\n", " Utils function to calculate features related to the change of the magnetic field\n", " \"\"\"\n", - " tstart = events[\"t.d_start\"].to_numpy()\n", - " tstop = events[\"t.d_end\"].to_numpy()\n", + " tstart, tstop = df.select(tr_cols).to_numpy()\n", "\n", " vecs_before = get_data_at_times(data, tstart)\n", " vecs_after = get_data_at_times(data, tstop)\n", " dvecs = vecs_after - vecs_before\n", "\n", - " return events.with_columns(pl.Series(name, dvecs))" + " return df.with_columns(pl.Series(name, dvecs))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Pipelines" + "## Data processing" ] }, { @@ -232,143 +211,6 @@ " .pipe(calc_events_normal_direction, data=data, name=\"k\")\n", " ).with_columns(duration=duration_expr)" ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# | hide\n", - "from nbdev import nbdev_export\n", - "\n", - "nbdev_export()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "## Test" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Test parallelization\n", - "\n", - "\n", - "Generally `mapply` and `modin` are the fastest. `xorbits` is expected to be the fastest but it is not and it is the slowest one.\n", - "\n", - "```python\n", - "#| notest\n", - "sat = 'jno'\n", - "coord = 'se'\n", - "cols = [\"BX\", \"BY\", \"BZ\"]\n", - "tau = timedelta(seconds=60)\n", - "data_resolution = timedelta(seconds=1)\n", - "\n", - "if True:\n", - " year = 2012\n", - " files = f'../data/{sat}_data_{year}.parquet'\n", - " output = f'../data/{sat}_candidates_{year}_tau_{tau.seconds}.parquet'\n", - "\n", - " data = pl.scan_parquet(files).set_sorted('time').collect()\n", - "\n", - " indices = compute_indices(data, tau)\n", - " # filter condition\n", - " sparse_num = tau / data_resolution // 3\n", - " filter_condition = filter_indices(sparse_num = sparse_num)\n", - "\n", - " candidates = indices.filter(filter_condition).with_columns(pl_format_time(tau)).sort('time')\n", - " \n", - " data_c = compress_data_by_events(data, candidates, tau)\n", - " sat_fgm = df2ts(data_c, cols, attrs={\"units\": \"nT\"})\n", - "```" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# | code-summary: Test different libraries to parallelize the computation\n", - "# | notest\n", - "def test_parallelization(candidates, sat_fgm):\n", - " # process_events(candidates_modin, sat_fgm, sat_state, data_resolution)\n", - "\n", - " # ---\n", - " # successful cases\n", - " # ---\n", - " # candidates_pd.mapply(lambda candidate: calc_candidate_duration(candidate, sat_fgm), axis=1) # this works, 4.2 secs\n", - " # candidates_pd.mapply(calc_candidate_duration, axis=1, data=sat_fgm) # this works, but a little bit slower, 6.7 secs\n", - "\n", - " # candidates_pd.apply(calc_candidate_duration, axis=1, data=sat_fgm) # Standard case: 24+s secs\n", - " # candidates_pd.swifter.apply(calc_candidate_duration, axis=1, data=sat_fgm) # this works with dask, 80 secs\n", - " # candidates_pd.swifter.set_dask_scheduler(scheduler=\"threads\").apply(calc_candidate_duration, axis=1, data=sat_fgm) # this works with dask, 60 secs\n", - " # candidates_modin.apply(lambda candidate: calc_candidate_duration(candidate, sat_fgm), axis=1) # this works with ray, 6 secs # NOTE: can not work with dask\n", - " # candidates_x.apply(calc_candidate_duration, axis=1, data=sat_fgm) # 30 seconds\n", - " # ---\n", - " # failed cases\n", - " # ---\n", - " # candidates_modin.apply(calc_candidate_duration, axis=1, data=sat_fgm) # AttributeError: 'DataFrame' object has no attribute 'sel'\n", - " pass" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import timeit\n", - "from functools import partial" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "def benchmark(task_dict, number=1):\n", - " results = {}\n", - " for name, (data, task) in task_dict.items():\n", - " try:\n", - " time_taken = timeit.timeit(lambda: task(data), number=number)\n", - " results[name] = time_taken / number\n", - " except Exception as e:\n", - " results[name] = str(e)\n", - " return results" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# | notest\n", - "def benchmark_results(candidates, sat_fgm):\n", - " import modin.pandas as mpd\n", - "\n", - " candidates_pd = candidates.to_pandas()\n", - " candidates_modin = mpd.DataFrame(candidates_pd)\n", - " # candidates_x = xpd.DataFrame(candidates_pd)\n", - "\n", - " func = partial(calc_candidate_duration, data=sat_fgm)\n", - " task_dict = {\n", - " \"pandas\": (candidates_pd, lambda _: _.apply(func, axis=1)),\n", - " \"pandas-mapply\": (candidates_pd, lambda _: _.mapply(func, axis=1)),\n", - " \"modin\": (candidates_modin, lambda _: _.apply(func, axis=1)),\n", - " # 'xorbits': (candidates_x, lambda _: _.apply(func, axis=1)),\n", - " }\n", - "\n", - " results = benchmark(task_dict)\n", - " return results" - ] } ], "metadata": { diff --git a/src/discontinuitypy/_modidx.py b/src/discontinuitypy/_modidx.py index bc7e473..2ec67ae 100644 --- a/src/discontinuitypy/_modidx.py +++ b/src/discontinuitypy/_modidx.py @@ -39,16 +39,14 @@ 'discontinuitypy/core/pipeline.py'), 'discontinuitypy.core.pipeline.ids_finder': ( 'ids_finder.html#ids_finder', 'discontinuitypy/core/pipeline.py')}, - 'discontinuitypy.core.propeties': { 'discontinuitypy.core.propeties.calc_candidate_duration': ( 'ids_properties.html#calc_candidate_duration', - 'discontinuitypy/core/propeties.py'), - 'discontinuitypy.core.propeties.calc_events_duration': ( 'ids_properties.html#calc_events_duration', - 'discontinuitypy/core/propeties.py'), - 'discontinuitypy.core.propeties.calc_events_features': ( 'ids_properties.html#calc_events_features', + 'discontinuitypy.core.propeties': { 'discontinuitypy.core.propeties.calc_events_duration': ( 'ids_properties.html#calc_events_duration', 'discontinuitypy/core/propeties.py'), 'discontinuitypy.core.propeties.calc_events_mva_features': ( 'ids_properties.html#calc_events_mva_features', 'discontinuitypy/core/propeties.py'), 'discontinuitypy.core.propeties.calc_events_normal_direction': ( 'ids_properties.html#calc_events_normal_direction', 'discontinuitypy/core/propeties.py'), + 'discontinuitypy.core.propeties.calc_events_tr_features': ( 'ids_properties.html#calc_events_tr_features', + 'discontinuitypy/core/propeties.py'), 'discontinuitypy.core.propeties.calc_events_vec_change': ( 'ids_properties.html#calc_events_vec_change', 'discontinuitypy/core/propeties.py'), 'discontinuitypy.core.propeties.calc_normal_direction': ( 'ids_properties.html#calc_normal_direction', diff --git a/src/discontinuitypy/core/propeties.py b/src/discontinuitypy/core/propeties.py index 0d43766..2e57480 100644 --- a/src/discontinuitypy/core/propeties.py +++ b/src/discontinuitypy/core/propeties.py @@ -1,9 +1,9 @@ # AUTOGENERATED! DO NOT EDIT! File to edit: ../../../notebooks/02_ids_properties.ipynb. # %% auto 0 -__all__ = ['get_data_at_times', 'select_data_by_timerange', 'get_candidate_data', 'calc_candidate_duration', - 'calc_events_features', 'calc_events_duration', 'calc_events_mva_features', 'calc_normal_direction', - 'calc_events_normal_direction', 'calc_events_vec_change', 'process_events'] +__all__ = ['get_data_at_times', 'select_data_by_timerange', 'get_candidate_data', 'calc_events_tr_features', + 'calc_events_duration', 'calc_events_mva_features', 'calc_normal_direction', 'calc_events_normal_direction', + 'calc_events_vec_change', 'process_events'] # %% ../../../notebooks/02_ids_properties.ipynb 1 # | code-summary: "Import all the packages needed for the project" @@ -45,12 +45,7 @@ def ld2dl(listdict: list[dict], func=np.array): return {key: func([result[key] for result in listdict]) for key in listdict[0]} # %% ../../../notebooks/02_ids_properties.ipynb 5 -def calc_candidate_duration(candidate, data, **kwargs): - candidate_data = get_candidate_data(candidate, data) - return calc_duration(candidate_data, **kwargs) - -# %% ../../../notebooks/02_ids_properties.ipynb 6 -def calc_events_features( +def calc_events_tr_features( df: pl.DataFrame, data, tr_cols=["tstart", "tstop"], func=None, **kwargs ): tranges = df.select(tr_cols).to_numpy() @@ -66,16 +61,18 @@ def remote(tr, **kwargs): def calc_events_duration(df, data, tr_cols=["tstart", "tstop"], **kwargs): - return calc_events_features( + return calc_events_tr_features( df, data, tr_cols, func=calc_duration, **kwargs ).drop_nulls() def calc_events_mva_features(df, data, tr_cols=["t.d_start", "t.d_end"], **kwargs): - return calc_events_features(df, data, tr_cols, func=calc_mva_features_all, **kwargs) + return calc_events_tr_features( + df, data, tr_cols, func=calc_mva_features_all, **kwargs + ) -# %% ../../../notebooks/02_ids_properties.ipynb 8 -def calc_normal_direction(v1, v2, normalize=True) -> np.ndarray: +# %% ../../../notebooks/02_ids_properties.ipynb 6 +def calc_normal_direction(v1, v2): """ Computes the normal direction of two vectors. @@ -89,36 +86,39 @@ def calc_normal_direction(v1, v2, normalize=True) -> np.ndarray: c = np.cross(v1, v2) return c / np.linalg.norm(c, axis=-1, keepdims=True) -# %% ../../../notebooks/02_ids_properties.ipynb 9 -def calc_events_normal_direction(events: pl.DataFrame, data: xr.DataArray, name="k"): +# %% ../../../notebooks/02_ids_properties.ipynb 7 +def calc_events_normal_direction( + df: pl.DataFrame, data: xr.DataArray, name="k", tr_cols=["t.d_start", "t.d_end"] +): """ Computes the normal directions(s) at two different time steps. """ - tstart = events["t.d_start"].to_numpy() - tstop = events["t.d_end"].to_numpy() + tstart, tstop = df.select(tr_cols).to_numpy() vecs_before = get_data_at_times(data, tstart) vecs_after = get_data_at_times(data, tstop) normal_directions = calc_normal_direction(vecs_before, vecs_after) # need to convert to list first, as only 1D array is supported - return events.with_columns(pl.Series(name, normal_directions)) + return df.with_columns(pl.Series(name, normal_directions)) + -# %% ../../../notebooks/02_ids_properties.ipynb 10 -def calc_events_vec_change(events: pl.DataFrame, data: xr.DataArray, name="dB"): +# | export +def calc_events_vec_change( + df: pl.DataFrame, data: xr.DataArray, name="dB", tr_cols=["t.d_start", "t.d_end"] +): """ Utils function to calculate features related to the change of the magnetic field """ - tstart = events["t.d_start"].to_numpy() - tstop = events["t.d_end"].to_numpy() + tstart, tstop = df.select(tr_cols).to_numpy() vecs_before = get_data_at_times(data, tstart) vecs_after = get_data_at_times(data, tstop) dvecs = vecs_after - vecs_before - return events.with_columns(pl.Series(name, dvecs)) + return df.with_columns(pl.Series(name, dvecs)) -# %% ../../../notebooks/02_ids_properties.ipynb 12 +# %% ../../../notebooks/02_ids_properties.ipynb 9 def process_events( events: pl.DataFrame, # potential candidates DataFrame data: xr.DataArray,