move scalers to core
jmoralez committed Nov 7, 2023
1 parent 73684e5 commit 039e2a3
Showing 5 changed files with 274 additions and 292 deletions.
125 changes: 99 additions & 26 deletions nbs/core.ipynb
@@ -64,10 +64,19 @@
"from copy import deepcopy\n",
"from itertools import chain\n",
"from os.path import isfile, join\n",
"from typing import Any, List, Optional\n",
"from typing import Any, Dict, List, Optional\n",
"\n",
"import numpy as np\n",
"import pandas as pd\n",
"import torch\n",
"from utilsforecast.grouped_array import GroupedArray\n",
"from utilsforecast.target_transforms import (\n",
" BaseTargetTransform,\n",
" LocalBoxCox,\n",
" LocalMinMaxScaler, \n",
" LocalRobustScaler, \n",
" LocalStandardScaler,\n",
")\n",
"\n",
"from neuralforecast.tsdataset import TimeSeriesDataset\n",
"from neuralforecast.models import (\n",
@@ -247,6 +256,23 @@
" }"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4c621a39-5658-4850-95c4-050eee97403d",
"metadata": {},
"outputs": [],
"source": [
"#| exporti\n",
"_type2scaler = {\n",
" 'standard': LocalStandardScaler,\n",
" 'robust': lambda: LocalRobustScaler(scale='mad'),\n",
" 'robust-iqr': lambda: LocalRobustScaler(scale='iqr'),\n",
" 'minmax': LocalMinMaxScaler,\n",
" 'boxcox': LocalBoxCox,\n",
"}"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -290,17 +316,52 @@
" self.models_init = models\n",
" self.models = [deepcopy(model) for model in self.models_init]\n",
" self.freq = pd.tseries.frequencies.to_offset(freq)\n",
" if local_scaler_type is not None and local_scaler_type not in _type2scaler:\n",
" raise ValueError(f'scaler_type must be one of {_type2scaler.keys()}')\n",
" self.local_scaler_type = local_scaler_type\n",
" self.scalers_: Dict[str, BaseTargetTransform]\n",
"\n",
" # Flags and attributes\n",
" self._fitted = False\n",
"\n",
" def _prepare_fit(self, df, static_df, sort_df, scaler_type):\n",
" def _scalers_fit_transform(self, dataset: TimeSeriesDataset) -> None:\n",
" self.scalers_ = {} \n",
" if self.local_scaler_type is None:\n",
" return None\n",
" for i, col in enumerate(dataset.temporal_cols):\n",
" if col == 'available_mask':\n",
" continue\n",
" self.scalers_[col] = _type2scaler[self.local_scaler_type]() \n",
" ga = GroupedArray(dataset.temporal[:, i].numpy(), dataset.indptr)\n",
" dataset.temporal[:, i] = torch.from_numpy(self.scalers_[col].fit_transform(ga))\n",
" \n",
" def _scalers_transform(self, dataset: TimeSeriesDataset) -> None:\n",
" if not self.scalers_:\n",
" return None\n",
" for i, col in enumerate(dataset.temporal_cols):\n",
" scaler = self.scalers_.get(col, None)\n",
" if scaler is None:\n",
" continue\n",
" ga = GroupedArray(dataset.temporal[:, i].numpy(), dataset.indptr)\n",
" dataset.temporal[:, i] = torch.from_numpy(scaler.transform(ga))\n",
"\n",
" def _scalers_target_inverse_transform(self, data: np.ndarray, indptr: np.ndarray) -> np.ndarray:\n",
" if not self.scalers_:\n",
" return data\n",
" for i in range(data.shape[1]):\n",
" ga = GroupedArray(data[:, i], indptr)\n",
" data[:, i] = self.scalers_['y'].inverse_transform(ga)\n",
" return data\n",
"\n",
" def _prepare_fit(self, df, static_df, sort_df, predict_only):\n",
" #TODO: uids, last_dates and ds should be properties of the dataset class. See github issue.\n",
" dataset, uids, last_dates, ds = TimeSeriesDataset.from_df(df=df,\n",
" static_df=static_df,\n",
" sort_df=sort_df,\n",
" scaler_type=scaler_type)\n",
" sort_df=sort_df)\n",
" if predict_only:\n",
" self._scalers_transform(dataset)\n",
" else:\n",
" self._scalers_fit_transform(dataset)\n",
" return dataset, uids, last_dates, ds\n",
"\n",
" def fit(self,\n",
@@ -347,7 +408,7 @@
" # Process and save new dataset (in self)\n",
" if df is not None:\n",
" self.dataset, self.uids, self.last_dates, self.ds \\\n",
" = self._prepare_fit(df=df, static_df=static_df, sort_df=sort_df, scaler_type=self.local_scaler_type)\n",
" = self._prepare_fit(df=df, static_df=static_df, sort_df=sort_df, predict_only=False)\n",
" self.sort_df = sort_df\n",
" else:\n",
" if verbose: print('Using stored dataset.')\n",
@@ -420,9 +481,9 @@
"\n",
" # Process new dataset but does not store it.\n",
" if df is not None:\n",
" dataset, uids, last_dates, _ = self._prepare_fit(df=df, static_df=static_df, sort_df=sort_df, scaler_type=None)\n",
" dataset.scalers_ = self.dataset.scalers_\n",
" dataset._transform_temporal()\n",
" dataset, uids, last_dates, _ = self._prepare_fit(\n",
" df=df, static_df=static_df, sort_df=sort_df, predict_only=True\n",
" )\n",
" else:\n",
" dataset = self.dataset\n",
" uids = self.uids\n",
@@ -454,10 +515,12 @@
" f'Dropped {dropped_rows:,} unused rows from `futr_df`. ' + base_err_msg\n",
" )\n",
" if any(futr_df[col].isnull().any() for col in needed_futr_exog):\n",
" raise ValueError('Found null values in `futr_df`') \n",
" dataset = TimeSeriesDataset.update_dataset(dataset=dataset, future_df=futr_df)\n",
" raise ValueError('Found null values in `futr_df`')\n",
" futr_dataset = dataset.align(futr_df)\n",
" else:\n",
" dataset = TimeSeriesDataset.update_dataset(dataset=dataset, future_df=fcsts_df.reset_index())\n",
" futr_dataset = dataset.align(fcsts_df.reset_index())\n",
" self._scalers_transform(futr_dataset)\n",
" dataset = dataset.append(futr_dataset)\n",
"\n",
" col_idx = 0\n",
" fcsts = np.full((self.h * len(uids), len(cols)), fill_value=np.nan)\n",
@@ -470,9 +533,9 @@
" fcsts[:, col_idx : col_idx + output_length] = model_fcsts\n",
" col_idx += output_length\n",
" model.set_test_size(old_test_size) # Set back to original value\n",
" if self.dataset.scalers_ is not None:\n",
" if self.scalers_:\n",
" indptr = np.append(0, np.full(len(uids), self.h).cumsum())\n",
" fcsts = self.dataset._invert_target_transform(fcsts, indptr)\n",
" fcsts = self._scalers_target_inverse_transform(fcsts, indptr)\n",
"\n",
" # Declare predictions pd.DataFrame\n",
" fcsts = pd.DataFrame.from_records(fcsts, columns=cols, \n",
@@ -533,7 +596,7 @@
" # Process and save new dataset (in self)\n",
" if df is not None:\n",
" self.dataset, self.uids, self.last_dates, self.ds = self._prepare_fit(\n",
" df=df, static_df=static_df, sort_df=sort_df, scaler_type=self.local_scaler_type\n",
" df=df, static_df=static_df, sort_df=sort_df, predict_only=False\n",
" )\n",
" self.sort_df = sort_df\n",
" else:\n",
@@ -589,9 +652,9 @@
" output_length = len(model.loss.output_names)\n",
" fcsts[:,col_idx:(col_idx + output_length)] = model_fcsts\n",
" col_idx += output_length\n",
" if self.dataset.scalers_ is not None:\n",
" if self.scalers_: \n",
" indptr = np.append(0, np.full(self.dataset.n_groups, self.h * n_windows).cumsum())\n",
" fcsts = self.dataset._invert_target_transform(fcsts, indptr)\n",
" fcsts = self._scalers_target_inverse_transform(fcsts, indptr)\n",
"\n",
" self._fitted = True \n",
"\n",
@@ -686,11 +749,11 @@
" columns=['y'], index=self.ds)\n",
" Y_df = Y_df.reset_index(drop=False)\n",
" fcsts_df = fcsts_df.merge(Y_df, how='left', on=['unique_id', 'ds'])\n",
" if self.dataset.scalers_ is not None:\n",
" if self.scalers_:\n",
" sizes = fcsts_df.groupby('unique_id', observed=True).size().values\n",
" indptr = np.append(0, sizes.cumsum())\n",
" invert_cols = cols + ['y']\n",
" fcsts_df[invert_cols] = self.dataset._invert_target_transform(fcsts_df[invert_cols].values, indptr)\n",
" fcsts_df[invert_cols] = self._scalers_target_inverse_transform(fcsts_df[invert_cols].values, indptr)\n",
"\n",
" return fcsts_df\n",
" \n",
@@ -751,13 +814,17 @@
" set `save_dataset=False` to skip saving dataset.')\n",
"\n",
" # Save configuration and parameters\n",
" config_dict = {'h': self.h,\n",
" 'freq': self.freq,\n",
" 'uids': self.uids,\n",
" 'last_dates': self.last_dates,\n",
" 'ds': self.ds,\n",
" 'sort_df': self.sort_df,\n",
" '_fitted': self._fitted}\n",
" config_dict = {\n",
" \"h\": self.h,\n",
" \"freq\": self.freq,\n",
" \"uids\": self.uids,\n",
" \"last_dates\": self.last_dates,\n",
" \"ds\": self.ds,\n",
" \"sort_df\": self.sort_df,\n",
" \"_fitted\": self._fitted,\n",
" \"local_scaler_type\": self.local_scaler_type,\n",
" \"scalers_\": self.scalers_,\n",
" }\n",
"\n",
" with open(f\"{path}/configuration.pkl\", \"wb\") as f:\n",
" pickle.dump(config_dict, f)\n",
@@ -815,7 +882,11 @@
" raise Exception('No configuration found in directory.')\n",
"\n",
" # Create NeuralForecast object\n",
" neuralforecast = NeuralForecast(models=models, freq=config_dict['freq'])\n",
" neuralforecast = NeuralForecast(\n",
" models=models,\n",
" freq=config_dict['freq'],\n",
" local_scaler_type=config_dict['local_scaler_type'],\n",
" )\n",
"\n",
" # Dataset\n",
" if dataset is not None:\n",
Expand All @@ -828,6 +899,8 @@
" # Fitted flag\n",
" neuralforecast._fitted = config_dict['_fitted']\n",
"\n",
" neuralforecast.scalers_ = config_dict['scalers_']\n",
"\n",
" return neuralforecast"
]
},
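
Taken together, the commit moves per-series scaling out of `TimeSeriesDataset` and into `NeuralForecast` itself, and persists the fitted scalers through `save`/`load` via `configuration.pkl`. A hedged end-to-end sketch of the resulting API (model choice, hyperparameters, and data are illustrative, not part of this commit):

```python
import numpy as np
import pandas as pd
from neuralforecast import NeuralForecast
from neuralforecast.models import NHITS

# Toy dataset in the library's long format: unique_id, ds, y.
df = pd.DataFrame({
    'unique_id': 'a',
    'ds': pd.date_range('2023-01-01', periods=50, freq='D'),
    'y': np.arange(50, dtype=np.float32),
})

# Scaling is now configured on the core object: local_scaler_type must be
# a key of _type2scaler ('standard', 'robust', 'robust-iqr', 'minmax',
# 'boxcox') or None.
nf = NeuralForecast(
    models=[NHITS(h=7, input_size=14, max_steps=10)],
    freq='D',
    local_scaler_type='standard',
)
nf.fit(df=df)
preds = nf.predict()  # forecasts come back inverse-transformed to the original scale

# save/load now round-trip local_scaler_type and the fitted scalers_.
nf.save('ckpt', save_dataset=True, overwrite=True)
nf2 = NeuralForecast.load('ckpt')
```
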