diff --git a/nbs/core.ipynb b/nbs/core.ipynb index 7f43b13bc..e55bceb08 100644 --- a/nbs/core.ipynb +++ b/nbs/core.ipynb @@ -64,10 +64,19 @@ "from copy import deepcopy\n", "from itertools import chain\n", "from os.path import isfile, join\n", - "from typing import Any, List, Optional\n", + "from typing import Any, Dict, List, Optional\n", "\n", "import numpy as np\n", "import pandas as pd\n", + "import torch\n", + "from utilsforecast.grouped_array import GroupedArray\n", + "from utilsforecast.target_transforms import (\n", + " BaseTargetTransform,\n", + " LocalBoxCox,\n", + " LocalMinMaxScaler, \n", + " LocalRobustScaler, \n", + " LocalStandardScaler,\n", + ")\n", "\n", "from neuralforecast.tsdataset import TimeSeriesDataset\n", "from neuralforecast.models import (\n", @@ -247,6 +256,23 @@ " }" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "4c621a39-5658-4850-95c4-050eee97403d", + "metadata": {}, + "outputs": [], + "source": [ + "#| exporti\n", + "_type2scaler = {\n", + " 'standard': LocalStandardScaler,\n", + " 'robust': lambda: LocalRobustScaler(scale='mad'),\n", + " 'robust-iqr': lambda: LocalRobustScaler(scale='iqr'),\n", + " 'minmax': LocalMinMaxScaler,\n", + " 'boxcox': LocalBoxCox,\n", + "}" + ] + }, { "cell_type": "code", "execution_count": null, @@ -290,17 +316,52 @@ " self.models_init = models\n", " self.models = [deepcopy(model) for model in self.models_init]\n", " self.freq = pd.tseries.frequencies.to_offset(freq)\n", + " if local_scaler_type is not None and local_scaler_type not in _type2scaler:\n", + " raise ValueError(f'scaler_type must be one of {_type2scaler.keys()}')\n", " self.local_scaler_type = local_scaler_type\n", + " self.scalers_: Dict[str, BaseTargetTransform]\n", "\n", " # Flags and attributes\n", " self._fitted = False\n", "\n", - " def _prepare_fit(self, df, static_df, sort_df, scaler_type):\n", + " def _scalers_fit_transform(self, dataset: TimeSeriesDataset) -> None:\n", + " self.scalers_ = {} \n", + " if self.local_scaler_type is None:\n", + " return None\n", + " for i, col in enumerate(dataset.temporal_cols):\n", + " if col == 'available_mask':\n", + " continue\n", + " self.scalers_[col] = _type2scaler[self.local_scaler_type]() \n", + " ga = GroupedArray(dataset.temporal[:, i].numpy(), dataset.indptr)\n", + " dataset.temporal[:, i] = torch.from_numpy(self.scalers_[col].fit_transform(ga))\n", + " \n", + " def _scalers_transform(self, dataset: TimeSeriesDataset) -> None:\n", + " if not self.scalers_:\n", + " return None\n", + " for i, col in enumerate(dataset.temporal_cols):\n", + " scaler = self.scalers_.get(col, None)\n", + " if scaler is None:\n", + " continue\n", + " ga = GroupedArray(dataset.temporal[:, i].numpy(), dataset.indptr)\n", + " dataset.temporal[:, i] = torch.from_numpy(scaler.transform(ga))\n", + "\n", + " def _scalers_target_inverse_transform(self, data: np.ndarray, indptr: np.ndarray) -> np.ndarray:\n", + " if not self.scalers_:\n", + " return data\n", + " for i in range(data.shape[1]):\n", + " ga = GroupedArray(data[:, i], indptr)\n", + " data[:, i] = self.scalers_['y'].inverse_transform(ga)\n", + " return data\n", + "\n", + " def _prepare_fit(self, df, static_df, sort_df, predict_only):\n", " #TODO: uids, last_dates and ds should be properties of the dataset class. See github issue.\n", " dataset, uids, last_dates, ds = TimeSeriesDataset.from_df(df=df,\n", " static_df=static_df,\n", - " sort_df=sort_df,\n", - " scaler_type=scaler_type)\n", + " sort_df=sort_df)\n", + " if predict_only:\n", + " self._scalers_transform(dataset)\n", + " else:\n", + " self._scalers_fit_transform(dataset)\n", " return dataset, uids, last_dates, ds\n", "\n", " def fit(self,\n", @@ -347,7 +408,7 @@ " # Process and save new dataset (in self)\n", " if df is not None:\n", " self.dataset, self.uids, self.last_dates, self.ds \\\n", - " = self._prepare_fit(df=df, static_df=static_df, sort_df=sort_df, scaler_type=self.local_scaler_type)\n", + " = self._prepare_fit(df=df, static_df=static_df, sort_df=sort_df, predict_only=False)\n", " self.sort_df = sort_df\n", " else:\n", " if verbose: print('Using stored dataset.')\n", @@ -420,9 +481,9 @@ "\n", " # Process new dataset but does not store it.\n", " if df is not None:\n", - " dataset, uids, last_dates, _ = self._prepare_fit(df=df, static_df=static_df, sort_df=sort_df, scaler_type=None)\n", - " dataset.scalers_ = self.dataset.scalers_\n", - " dataset._transform_temporal()\n", + " dataset, uids, last_dates, _ = self._prepare_fit(\n", + " df=df, static_df=static_df, sort_df=sort_df, predict_only=True\n", + " )\n", " else:\n", " dataset = self.dataset\n", " uids = self.uids\n", @@ -454,10 +515,12 @@ " f'Dropped {dropped_rows:,} unused rows from `futr_df`. ' + base_err_msg\n", " )\n", " if any(futr_df[col].isnull().any() for col in needed_futr_exog):\n", - " raise ValueError('Found null values in `futr_df`') \n", - " dataset = TimeSeriesDataset.update_dataset(dataset=dataset, future_df=futr_df)\n", + " raise ValueError('Found null values in `futr_df`')\n", + " futr_dataset = dataset.align(futr_df)\n", " else:\n", - " dataset = TimeSeriesDataset.update_dataset(dataset=dataset, future_df=fcsts_df.reset_index())\n", + " futr_dataset = dataset.align(fcsts_df.reset_index())\n", + " self._scalers_transform(futr_dataset)\n", + " dataset = dataset.append(futr_dataset)\n", "\n", " col_idx = 0\n", " fcsts = np.full((self.h * len(uids), len(cols)), fill_value=np.nan)\n", @@ -470,9 +533,9 @@ " fcsts[:, col_idx : col_idx + output_length] = model_fcsts\n", " col_idx += output_length\n", " model.set_test_size(old_test_size) # Set back to original value\n", - " if self.dataset.scalers_ is not None:\n", + " if self.scalers_:\n", " indptr = np.append(0, np.full(len(uids), self.h).cumsum())\n", - " fcsts = self.dataset._invert_target_transform(fcsts, indptr)\n", + " fcsts = self._scalers_target_inverse_transform(fcsts, indptr)\n", "\n", " # Declare predictions pd.DataFrame\n", " fcsts = pd.DataFrame.from_records(fcsts, columns=cols, \n", @@ -533,7 +596,7 @@ " # Process and save new dataset (in self)\n", " if df is not None:\n", " self.dataset, self.uids, self.last_dates, self.ds = self._prepare_fit(\n", - " df=df, static_df=static_df, sort_df=sort_df, scaler_type=self.local_scaler_type\n", + " df=df, static_df=static_df, sort_df=sort_df, predict_only=False\n", " )\n", " self.sort_df = sort_df\n", " else:\n", @@ -589,9 +652,9 @@ " output_length = len(model.loss.output_names)\n", " fcsts[:,col_idx:(col_idx + output_length)] = model_fcsts\n", " col_idx += output_length\n", - " if self.dataset.scalers_ is not None:\n", + " if self.scalers_: \n", " indptr = np.append(0, np.full(self.dataset.n_groups, self.h * n_windows).cumsum())\n", - " fcsts = self.dataset._invert_target_transform(fcsts, indptr)\n", + " fcsts = self._scalers_target_inverse_transform(fcsts, indptr)\n", "\n", " self._fitted = True \n", "\n", @@ -686,11 +749,11 @@ " columns=['y'], index=self.ds)\n", " Y_df = Y_df.reset_index(drop=False)\n", " fcsts_df = fcsts_df.merge(Y_df, how='left', on=['unique_id', 'ds'])\n", - " if self.dataset.scalers_ is not None:\n", + " if self.scalers_:\n", " sizes = fcsts_df.groupby('unique_id', observed=True).size().values\n", " indptr = np.append(0, sizes.cumsum())\n", " invert_cols = cols + ['y']\n", - " fcsts_df[invert_cols] = self.dataset._invert_target_transform(fcsts_df[invert_cols].values, indptr)\n", + " fcsts_df[invert_cols] = self._scalers_target_inverse_transform(fcsts_df[invert_cols].values, indptr)\n", "\n", " return fcsts_df\n", " \n", @@ -751,13 +814,17 @@ " set `save_dataset=False` to skip saving dataset.')\n", "\n", " # Save configuration and parameters\n", - " config_dict = {'h': self.h,\n", - " 'freq': self.freq,\n", - " 'uids': self.uids,\n", - " 'last_dates': self.last_dates,\n", - " 'ds': self.ds,\n", - " 'sort_df': self.sort_df,\n", - " '_fitted': self._fitted}\n", + " config_dict = {\n", + " \"h\": self.h,\n", + " \"freq\": self.freq,\n", + " \"uids\": self.uids,\n", + " \"last_dates\": self.last_dates,\n", + " \"ds\": self.ds,\n", + " \"sort_df\": self.sort_df,\n", + " \"_fitted\": self._fitted,\n", + " \"local_scaler_type\": self.local_scaler_type,\n", + " \"scalers_\": self.scalers_,\n", + " }\n", "\n", " with open(f\"{path}/configuration.pkl\", \"wb\") as f:\n", " pickle.dump(config_dict, f)\n", @@ -815,7 +882,11 @@ " raise Exception('No configuration found in directory.')\n", "\n", " # Create NeuralForecast object\n", - " neuralforecast = NeuralForecast(models=models, freq=config_dict['freq'])\n", + " neuralforecast = NeuralForecast(\n", + " models=models,\n", + " freq=config_dict['freq'],\n", + " local_scaler_type=config_dict['local_scaler_type'],\n", + " )\n", "\n", " # Dataset\n", " if dataset is not None:\n", @@ -828,6 +899,8 @@ " # Fitted flag\n", " neuralforecast._fitted = config_dict['_fitted']\n", "\n", + " neuralforecast.scalers_ = config_dict['scalers_']\n", + "\n", " return neuralforecast" ] }, diff --git a/nbs/tsdataset.ipynb b/nbs/tsdataset.ipynb index 8a0c0da0a..54d526845 100644 --- a/nbs/tsdataset.ipynb +++ b/nbs/tsdataset.ipynb @@ -54,17 +54,13 @@ "#| export\n", "import warnings\n", "from collections.abc import Mapping\n", - "from typing import Dict, Optional, TYPE_CHECKING\n", - "if TYPE_CHECKING:\n", - " from utilsforecast.target_transforms import BaseTargetTransform\n", "\n", "import numpy as np\n", "import pandas as pd\n", "import pytorch_lightning as pl\n", "import torch\n", "from torch.utils.data import Dataset, DataLoader\n", - "from utilsforecast.grouped_array import GroupedArray\n", - "from utilsforecast.processing import DataFrameProcessor" + "from utilsforecast.processing import process_df" ] }, { @@ -153,38 +149,8 @@ " min_size: int,\n", " static=None,\n", " static_cols=None,\n", - " sorted=False,\n", - " scaler_type=None):\n", + " sorted=False):\n", " super().__init__()\n", - "\n", - " if scaler_type is None:\n", - " self.scalers_: Optional[Dict[str, 'BaseTargetTransform']] = None\n", - " else:\n", - " # delay the import because these require numba, which isn't a requirement\n", - " from utilsforecast.target_transforms import (\n", - " LocalBoxCox,\n", - " LocalMinMaxScaler, \n", - " LocalRobustScaler, \n", - " LocalStandardScaler,\n", - " )\n", - "\n", - " type2scaler = {\n", - " 'standard': LocalStandardScaler,\n", - " 'robust': lambda: LocalRobustScaler(scale='mad'),\n", - " 'robust-iqr': lambda: LocalRobustScaler(scale='iqr'),\n", - " 'minmax': LocalMinMaxScaler,\n", - " 'boxcox': LocalBoxCox,\n", - " }\n", - " if scaler_type not in type2scaler:\n", - " raise ValueError(f'scaler_type must be one of {type2scaler.keys()}')\n", - " self.scalers_ = {}\n", - " for i, col in enumerate(temporal_cols):\n", - " if col == 'available_mask':\n", - " continue\n", - " ga = GroupedArray(temporal[:, i], indptr)\n", - " self.scalers_[col] = type2scaler[scaler_type]() \n", - " temporal[:, i] = self.scalers_[col].fit_transform(ga)\n", - "\n", " self.temporal = torch.tensor(temporal, dtype=torch.float)\n", " self.temporal_cols = pd.Index(list(temporal_cols))\n", "\n", @@ -233,78 +199,56 @@ " return False\n", " return np.allclose(self.data, other.data) and np.array_equal(self.indptr, other.indptr)\n", "\n", - " def _invert_target_transform(self, data: np.ndarray, indptr: np.ndarray) -> np.ndarray:\n", - " if self.scalers_ is None:\n", - " return data\n", - " for i in range(data.shape[1]):\n", - " ga = GroupedArray(data[:, i], indptr)\n", - " data[:, i] = self.scalers_['y'].inverse_transform(ga)\n", - " return data\n", - "\n", - " def _transform_temporal(self) -> None:\n", - " if self.scalers_ is None:\n", - " return\n", - " for i, col in enumerate(self.temporal_cols):\n", - " if col == 'available_mask':\n", - " continue\n", - " scaler = self.scalers_.get(col, None)\n", - " if scaler is None:\n", - " continue\n", - " ga = GroupedArray(self.temporal[:, i].numpy(), self.indptr)\n", - " self.temporal[:, i] = torch.from_numpy(scaler.transform(ga))\n", "\n", - " @staticmethod\n", - " def update_dataset(dataset, future_df):\n", - " \"\"\"Add future observations to the dataset.\n", - " \"\"\" \n", - " \n", + "\n", + " def align(self, df: pd.DataFrame) -> 'TimeSeriesDataset':\n", " # Protect consistency\n", - " future_df = future_df.copy()\n", + " df = df.copy()\n", "\n", " # Add Nones to missing columns (without available_mask)\n", - " temporal_cols = dataset.temporal_cols.copy()\n", + " temporal_cols = self.temporal_cols.copy()\n", " for col in temporal_cols:\n", - " if col not in future_df.columns:\n", - " future_df[col] = np.nan\n", + " if col not in df.columns:\n", + " df[col] = np.nan\n", " if col == 'available_mask':\n", - " future_df[col] = 1\n", + " df[col] = 1\n", " \n", " # Sort columns to match self.temporal_cols (without available_mask)\n", - " future_df = future_df[ ['unique_id','ds'] + temporal_cols.tolist() ]\n", + " df = df[ ['unique_id','ds'] + temporal_cols.tolist() ]\n", "\n", " # Process future_df\n", - " futr_dataset, *_ = dataset.from_df(df=future_df, sort_df=dataset.sorted)\n", - " futr_dataset.scalers_ = dataset.scalers_\n", - " futr_dataset._transform_temporal()\n", + " dataset, *_ = TimeSeriesDataset.from_df(df=df, sort_df=self.sorted)\n", + " return dataset\n", "\n", + " def append(self, futr_dataset: 'TimeSeriesDataset') -> 'TimeSeriesDataset':\n", + " \"\"\"Add future observations to the dataset. Returns a copy\"\"\"\n", + " if self.indptr.size != futr_dataset.indptr.size:\n", + " raise ValueError('Cannot append `futr_dataset` with different number of groups.')\n", " # Define and fill new temporal with updated information\n", - " len_temporal, col_temporal = dataset.temporal.shape\n", - " new_temporal = torch.zeros(size=(len_temporal+len(future_df), col_temporal))\n", - " new_indptr = [0]\n", - " new_max_size = 0\n", - "\n", - " acum = 0\n", - " for i in range(dataset.n_groups):\n", - " series_length = dataset.indptr[i + 1] - dataset.indptr[i]\n", - " new_length = series_length + futr_dataset.indptr[i + 1] - futr_dataset.indptr[i]\n", - " new_temporal[acum:(acum+series_length), :] = dataset.temporal[dataset.indptr[i] : dataset.indptr[i + 1], :]\n", - " new_temporal[(acum+series_length):(acum+new_length), :] = \\\n", - " futr_dataset.temporal[futr_dataset.indptr[i] : futr_dataset.indptr[i + 1], :]\n", - " \n", - " acum += new_length\n", - " new_indptr.append(acum)\n", - " if new_length > new_max_size:\n", - " new_max_size = new_length\n", + " len_temporal, col_temporal = self.temporal.shape\n", + " len_futr = futr_dataset.temporal.shape[0]\n", + " new_temporal = torch.empty(size=(len_temporal + len_futr, col_temporal))\n", + " new_sizes = np.diff(self.indptr) + np.diff(futr_dataset.indptr)\n", + " new_indptr = np.append(0, new_sizes.cumsum()).astype(np.int32)\n", + " new_max_size = np.max(new_sizes)\n", + "\n", + " for i in range(self.n_groups):\n", + " curr_slice = slice(self.indptr[i], self.indptr[i + 1])\n", + " curr_size = curr_slice.stop - curr_slice.start\n", + " futr_slice = slice(futr_dataset.indptr[i], futr_dataset.indptr[i + 1])\n", + " new_slice = slice(new_indptr[i], new_indptr[i + 1])\n", + " new_temporal[new_indptr[i] : new_indptr[i] + curr_size] = self.temporal[curr_slice]\n", + " new_temporal[new_indptr[i] + curr_size : new_indptr[i + 1]] = futr_dataset.temporal[futr_slice]\n", " \n", " # Define new dataset\n", " updated_dataset = TimeSeriesDataset(temporal=new_temporal,\n", - " temporal_cols=dataset.temporal_cols.copy(),\n", - " indptr=np.array(new_indptr).astype(np.int32),\n", + " temporal_cols=self.temporal_cols.copy(),\n", + " indptr=new_indptr,\n", " max_size=new_max_size,\n", - " min_size=dataset.min_size,\n", - " static=dataset.static,\n", - " static_cols=dataset.static_cols,\n", - " sorted=dataset.sorted)\n", + " min_size=self.min_size,\n", + " static=self.static,\n", + " static_cols=self.static_cols,\n", + " sorted=self.sorted)\n", "\n", " return updated_dataset\n", " \n", @@ -349,7 +293,7 @@ " return updated_dataset\n", "\n", " @staticmethod\n", - " def from_df(df, static_df=None, sort_df=False, scaler_type=None):\n", + " def from_df(df, static_df=None, sort_df=False):\n", " # TODO: protect on equality of static_df + df indexes\n", " if df.index.name == 'unique_id':\n", " warnings.warn(\n", @@ -369,8 +313,7 @@ " if sort_df:\n", " static_df = static_df.sort_index()\n", "\n", - " proc = DataFrameProcessor('unique_id', 'ds', 'y')\n", - " ids, times, data, indptr, sort_idxs = proc.process(df)\n", + " ids, times, data, indptr, sort_idxs = process_df(df, 'unique_id', 'ds', 'y')\n", " # processor sets y as the first column\n", " temporal_cols = pd.Index(['y'] + df.columns.drop(['unique_id', 'ds', 'y']).tolist())\n", " temporal = data.astype(np.float32, copy=False)\n", @@ -403,7 +346,6 @@ " max_size=max_size,\n", " min_size=min_size,\n", " sorted=sort_df,\n", - " scaler_type=scaler_type,\n", " )\n", " ds = pd.MultiIndex.from_frame(df[['unique_id', 'ds']])\n", " if sort_idxs is not None:\n", @@ -444,37 +386,6 @@ "test_eq(dates, temporal_df.groupby('unique_id')['ds'].max().values)" ] }, - { - "cell_type": "code", - "execution_count": null, - "id": "aca22c35-2806-4a84-b6f1-34104a63acee", - "metadata": {}, - "outputs": [], - "source": [ - "#| hide\n", - "# scaler_type\n", - "temporal_df_with_features = generate_series(n_series=10, n_temporal_features=2, equal_ends=False)\n", - "dataset, *_ = TimeSeriesDataset.from_df(df=temporal_df_with_features, scaler_type='standard')\n", - "grouped_y = temporal_df_with_features.groupby('unique_id')['y']\n", - "\n", - "# y was scaled\n", - "np.testing.assert_allclose(\n", - " temporal_df_with_features['y'].sub(grouped_y.transform('mean')).div(grouped_y.transform('std', ddof=0)).values,\n", - " dataset.temporal[:, 0].numpy(),\n", - " atol=1e-5,\n", - ")\n", - "\n", - "# mask is all 1s\n", - "assert dataset.temporal[:, -1].eq(1).all().item()\n", - "\n", - "# inverse tfm\n", - "np.testing.assert_allclose(\n", - " dataset.scalers_['y'].inverse_transform(GroupedArray(dataset.temporal[:, 0].numpy(), dataset.indptr)),\n", - " temporal_df_with_features['y'].values.astype('float32'),\n", - " atol=1e-5,\n", - ")" - ] - }, { "cell_type": "code", "execution_count": null, @@ -682,7 +593,8 @@ "# SPLIT_1 DATASET\n", "dataset_1, indices_1, dates_1, ds_1 = TimeSeriesDataset.from_df(df=split1_df,\n", " sort_df=False)\n", - "dataset_1 = dataset_1.update_dataset(dataset_1, split2_df)" + "futr_dataset = dataset_1.align(split2_df)\n", + "dataset_1 = dataset_1.append(futr_dataset)" ] }, { diff --git a/neuralforecast/_modidx.py b/neuralforecast/_modidx.py index fe61cb1ae..03a1a0929 100644 --- a/neuralforecast/_modidx.py +++ b/neuralforecast/_modidx.py @@ -62,6 +62,12 @@ 'neuralforecast/core.py'), 'neuralforecast.core.NeuralForecast._prepare_fit': ( 'core.html#neuralforecast._prepare_fit', 'neuralforecast/core.py'), + 'neuralforecast.core.NeuralForecast._scalers_fit_transform': ( 'core.html#neuralforecast._scalers_fit_transform', + 'neuralforecast/core.py'), + 'neuralforecast.core.NeuralForecast._scalers_target_inverse_transform': ( 'core.html#neuralforecast._scalers_target_inverse_transform', + 'neuralforecast/core.py'), + 'neuralforecast.core.NeuralForecast._scalers_transform': ( 'core.html#neuralforecast._scalers_transform', + 'neuralforecast/core.py'), 'neuralforecast.core.NeuralForecast.cross_validation': ( 'core.html#neuralforecast.cross_validation', 'neuralforecast/core.py'), 'neuralforecast.core.NeuralForecast.fit': ('core.html#neuralforecast.fit', 'neuralforecast/core.py'), @@ -915,16 +921,14 @@ 'neuralforecast/tsdataset.py'), 'neuralforecast.tsdataset.TimeSeriesDataset.__repr__': ( 'tsdataset.html#timeseriesdataset.__repr__', 'neuralforecast/tsdataset.py'), - 'neuralforecast.tsdataset.TimeSeriesDataset._invert_target_transform': ( 'tsdataset.html#timeseriesdataset._invert_target_transform', - 'neuralforecast/tsdataset.py'), - 'neuralforecast.tsdataset.TimeSeriesDataset._transform_temporal': ( 'tsdataset.html#timeseriesdataset._transform_temporal', - 'neuralforecast/tsdataset.py'), + 'neuralforecast.tsdataset.TimeSeriesDataset.align': ( 'tsdataset.html#timeseriesdataset.align', + 'neuralforecast/tsdataset.py'), + 'neuralforecast.tsdataset.TimeSeriesDataset.append': ( 'tsdataset.html#timeseriesdataset.append', + 'neuralforecast/tsdataset.py'), 'neuralforecast.tsdataset.TimeSeriesDataset.from_df': ( 'tsdataset.html#timeseriesdataset.from_df', 'neuralforecast/tsdataset.py'), 'neuralforecast.tsdataset.TimeSeriesDataset.trim_dataset': ( 'tsdataset.html#timeseriesdataset.trim_dataset', 'neuralforecast/tsdataset.py'), - 'neuralforecast.tsdataset.TimeSeriesDataset.update_dataset': ( 'tsdataset.html#timeseriesdataset.update_dataset', - 'neuralforecast/tsdataset.py'), 'neuralforecast.tsdataset.TimeSeriesLoader': ( 'tsdataset.html#timeseriesloader', 'neuralforecast/tsdataset.py'), 'neuralforecast.tsdataset.TimeSeriesLoader.__init__': ( 'tsdataset.html#timeseriesloader.__init__', diff --git a/neuralforecast/core.py b/neuralforecast/core.py index 777b4d235..06d0716a4 100644 --- a/neuralforecast/core.py +++ b/neuralforecast/core.py @@ -10,10 +10,19 @@ from copy import deepcopy from itertools import chain from os.path import isfile, join -from typing import Any, List, Optional +from typing import Any, Dict, List, Optional import numpy as np import pandas as pd +import torch +from utilsforecast.grouped_array import GroupedArray +from utilsforecast.target_transforms import ( + BaseTargetTransform, + LocalBoxCox, + LocalMinMaxScaler, + LocalRobustScaler, + LocalStandardScaler, +) from .tsdataset import TimeSeriesDataset from neuralforecast.models import ( @@ -154,6 +163,15 @@ def _future_dates(dataset, uids, last_dates, freq, h): } # %% ../nbs/core.ipynb 12 +_type2scaler = { + "standard": LocalStandardScaler, + "robust": lambda: LocalRobustScaler(scale="mad"), + "robust-iqr": lambda: LocalRobustScaler(scale="iqr"), + "minmax": LocalMinMaxScaler, + "boxcox": LocalBoxCox, +} + +# %% ../nbs/core.ipynb 13 class NeuralForecast: def __init__( self, models: List[Any], freq: str, local_scaler_type: Optional[str] = None @@ -189,16 +207,56 @@ def __init__( self.models_init = models self.models = [deepcopy(model) for model in self.models_init] self.freq = pd.tseries.frequencies.to_offset(freq) + if local_scaler_type is not None and local_scaler_type not in _type2scaler: + raise ValueError(f"scaler_type must be one of {_type2scaler.keys()}") self.local_scaler_type = local_scaler_type + self.scalers_: Dict[str, BaseTargetTransform] # Flags and attributes self._fitted = False - def _prepare_fit(self, df, static_df, sort_df, scaler_type): + def _scalers_fit_transform(self, dataset: TimeSeriesDataset) -> None: + self.scalers_ = {} + if self.local_scaler_type is None: + return None + for i, col in enumerate(dataset.temporal_cols): + if col == "available_mask": + continue + self.scalers_[col] = _type2scaler[self.local_scaler_type]() + ga = GroupedArray(dataset.temporal[:, i].numpy(), dataset.indptr) + dataset.temporal[:, i] = torch.from_numpy( + self.scalers_[col].fit_transform(ga) + ) + + def _scalers_transform(self, dataset: TimeSeriesDataset) -> None: + if not self.scalers_: + return None + for i, col in enumerate(dataset.temporal_cols): + scaler = self.scalers_.get(col, None) + if scaler is None: + continue + ga = GroupedArray(dataset.temporal[:, i].numpy(), dataset.indptr) + dataset.temporal[:, i] = torch.from_numpy(scaler.transform(ga)) + + def _scalers_target_inverse_transform( + self, data: np.ndarray, indptr: np.ndarray + ) -> np.ndarray: + if not self.scalers_: + return data + for i in range(data.shape[1]): + ga = GroupedArray(data[:, i], indptr) + data[:, i] = self.scalers_["y"].inverse_transform(ga) + return data + + def _prepare_fit(self, df, static_df, sort_df, predict_only): # TODO: uids, last_dates and ds should be properties of the dataset class. See github issue. dataset, uids, last_dates, ds = TimeSeriesDataset.from_df( - df=df, static_df=static_df, sort_df=sort_df, scaler_type=scaler_type + df=df, static_df=static_df, sort_df=sort_df ) + if predict_only: + self._scalers_transform(dataset) + else: + self._scalers_fit_transform(dataset) return dataset, uids, last_dates, ds def fit( @@ -248,10 +306,7 @@ def fit( # Process and save new dataset (in self) if df is not None: self.dataset, self.uids, self.last_dates, self.ds = self._prepare_fit( - df=df, - static_df=static_df, - sort_df=sort_df, - scaler_type=self.local_scaler_type, + df=df, static_df=static_df, sort_df=sort_df, predict_only=False ) self.sort_df = sort_df else: @@ -335,10 +390,8 @@ def predict( # Process new dataset but does not store it. if df is not None: dataset, uids, last_dates, _ = self._prepare_fit( - df=df, static_df=static_df, sort_df=sort_df, scaler_type=None + df=df, static_df=static_df, sort_df=sort_df, predict_only=True ) - dataset.scalers_ = self.dataset.scalers_ - dataset._transform_temporal() else: dataset = self.dataset uids = self.uids @@ -375,13 +428,11 @@ def predict( ) if any(futr_df[col].isnull().any() for col in needed_futr_exog): raise ValueError("Found null values in `futr_df`") - dataset = TimeSeriesDataset.update_dataset( - dataset=dataset, future_df=futr_df - ) + futr_dataset = dataset.align(futr_df) else: - dataset = TimeSeriesDataset.update_dataset( - dataset=dataset, future_df=fcsts_df.reset_index() - ) + futr_dataset = dataset.align(fcsts_df.reset_index()) + self._scalers_transform(futr_dataset) + dataset = dataset.append(futr_dataset) col_idx = 0 fcsts = np.full((self.h * len(uids), len(cols)), fill_value=np.nan) @@ -394,9 +445,9 @@ def predict( fcsts[:, col_idx : col_idx + output_length] = model_fcsts col_idx += output_length model.set_test_size(old_test_size) # Set back to original value - if self.dataset.scalers_ is not None: + if self.scalers_: indptr = np.append(0, np.full(len(uids), self.h).cumsum()) - fcsts = self.dataset._invert_target_transform(fcsts, indptr) + fcsts = self._scalers_target_inverse_transform(fcsts, indptr) # Declare predictions pd.DataFrame fcsts = pd.DataFrame.from_records(fcsts, columns=cols, index=fcsts_df.index) @@ -458,10 +509,7 @@ def cross_validation( # Process and save new dataset (in self) if df is not None: self.dataset, self.uids, self.last_dates, self.ds = self._prepare_fit( - df=df, - static_df=static_df, - sort_df=sort_df, - scaler_type=self.local_scaler_type, + df=df, static_df=static_df, sort_df=sort_df, predict_only=False ) self.sort_df = sort_df else: @@ -526,11 +574,11 @@ def cross_validation( output_length = len(model.loss.output_names) fcsts[:, col_idx : (col_idx + output_length)] = model_fcsts col_idx += output_length - if self.dataset.scalers_ is not None: + if self.scalers_: indptr = np.append( 0, np.full(self.dataset.n_groups, self.h * n_windows).cumsum() ) - fcsts = self.dataset._invert_target_transform(fcsts, indptr) + fcsts = self._scalers_target_inverse_transform(fcsts, indptr) self._fitted = True @@ -633,11 +681,11 @@ def predict_insample(self, step_size: int = 1): ) Y_df = Y_df.reset_index(drop=False) fcsts_df = fcsts_df.merge(Y_df, how="left", on=["unique_id", "ds"]) - if self.dataset.scalers_ is not None: + if self.scalers_: sizes = fcsts_df.groupby("unique_id", observed=True).size().values indptr = np.append(0, sizes.cumsum()) invert_cols = cols + ["y"] - fcsts_df[invert_cols] = self.dataset._invert_target_transform( + fcsts_df[invert_cols] = self._scalers_target_inverse_transform( fcsts_df[invert_cols].values, indptr ) @@ -718,6 +766,8 @@ def save( "ds": self.ds, "sort_df": self.sort_df, "_fitted": self._fitted, + "local_scaler_type": self.local_scaler_type, + "scalers_": self.scalers_, } with open(f"{path}/configuration.pkl", "wb") as f: @@ -787,7 +837,11 @@ def load(path, verbose=False, **kwargs): raise Exception("No configuration found in directory.") # Create NeuralForecast object - neuralforecast = NeuralForecast(models=models, freq=config_dict["freq"]) + neuralforecast = NeuralForecast( + models=models, + freq=config_dict["freq"], + local_scaler_type=config_dict["local_scaler_type"], + ) # Dataset if dataset is not None: @@ -800,4 +854,6 @@ def load(path, verbose=False, **kwargs): # Fitted flag neuralforecast._fitted = config_dict["_fitted"] + neuralforecast.scalers_ = config_dict["scalers_"] + return neuralforecast diff --git a/neuralforecast/tsdataset.py b/neuralforecast/tsdataset.py index df0f83d07..43d1b0691 100644 --- a/neuralforecast/tsdataset.py +++ b/neuralforecast/tsdataset.py @@ -6,18 +6,13 @@ # %% ../nbs/tsdataset.ipynb 4 import warnings from collections.abc import Mapping -from typing import Dict, Optional, TYPE_CHECKING - -if TYPE_CHECKING: - from utilsforecast.target_transforms import BaseTargetTransform import numpy as np import pandas as pd import pytorch_lightning as pl import torch from torch.utils.data import Dataset, DataLoader -from utilsforecast.grouped_array import GroupedArray -from utilsforecast.processing import DataFrameProcessor +from utilsforecast.processing import process_df # %% ../nbs/tsdataset.ipynb 5 class TimeSeriesLoader(DataLoader): @@ -86,38 +81,8 @@ def __init__( static=None, static_cols=None, sorted=False, - scaler_type=None, ): super().__init__() - - if scaler_type is None: - self.scalers_: Optional[Dict[str, "BaseTargetTransform"]] = None - else: - # delay the import because these require numba, which isn't a requirement - from utilsforecast.target_transforms import ( - LocalBoxCox, - LocalMinMaxScaler, - LocalRobustScaler, - LocalStandardScaler, - ) - - type2scaler = { - "standard": LocalStandardScaler, - "robust": lambda: LocalRobustScaler(scale="mad"), - "robust-iqr": lambda: LocalRobustScaler(scale="iqr"), - "minmax": LocalMinMaxScaler, - "boxcox": LocalBoxCox, - } - if scaler_type not in type2scaler: - raise ValueError(f"scaler_type must be one of {type2scaler.keys()}") - self.scalers_ = {} - for i, col in enumerate(temporal_cols): - if col == "available_mask": - continue - ga = GroupedArray(temporal[:, i], indptr) - self.scalers_[col] = type2scaler[scaler_type]() - temporal[:, i] = self.scalers_[col].fit_transform(ga) - self.temporal = torch.tensor(temporal, dtype=torch.float) self.temporal_cols = pd.Index(list(temporal_cols)) @@ -172,87 +137,61 @@ def __eq__(self, other): self.indptr, other.indptr ) - def _invert_target_transform( - self, data: np.ndarray, indptr: np.ndarray - ) -> np.ndarray: - if self.scalers_ is None: - return data - for i in range(data.shape[1]): - ga = GroupedArray(data[:, i], indptr) - data[:, i] = self.scalers_["y"].inverse_transform(ga) - return data - - def _transform_temporal(self) -> None: - if self.scalers_ is None: - return - for i, col in enumerate(self.temporal_cols): - if col == "available_mask": - continue - scaler = self.scalers_.get(col, None) - if scaler is None: - continue - ga = GroupedArray(self.temporal[:, i].numpy(), self.indptr) - self.temporal[:, i] = torch.from_numpy(scaler.transform(ga)) - - @staticmethod - def update_dataset(dataset, future_df): - """Add future observations to the dataset.""" - + def align(self, df: pd.DataFrame) -> "TimeSeriesDataset": # Protect consistency - future_df = future_df.copy() + df = df.copy() # Add Nones to missing columns (without available_mask) - temporal_cols = dataset.temporal_cols.copy() + temporal_cols = self.temporal_cols.copy() for col in temporal_cols: - if col not in future_df.columns: - future_df[col] = np.nan + if col not in df.columns: + df[col] = np.nan if col == "available_mask": - future_df[col] = 1 + df[col] = 1 # Sort columns to match self.temporal_cols (without available_mask) - future_df = future_df[["unique_id", "ds"] + temporal_cols.tolist()] + df = df[["unique_id", "ds"] + temporal_cols.tolist()] # Process future_df - futr_dataset, *_ = dataset.from_df(df=future_df, sort_df=dataset.sorted) - futr_dataset.scalers_ = dataset.scalers_ - futr_dataset._transform_temporal() - - # Define and fill new temporal with updated information - len_temporal, col_temporal = dataset.temporal.shape - new_temporal = torch.zeros(size=(len_temporal + len(future_df), col_temporal)) - new_indptr = [0] - new_max_size = 0 - - acum = 0 - for i in range(dataset.n_groups): - series_length = dataset.indptr[i + 1] - dataset.indptr[i] - new_length = ( - series_length + futr_dataset.indptr[i + 1] - futr_dataset.indptr[i] + dataset, *_ = TimeSeriesDataset.from_df(df=df, sort_df=self.sorted) + return dataset + + def append(self, futr_dataset: "TimeSeriesDataset") -> "TimeSeriesDataset": + """Add future observations to the dataset. Returns a copy""" + if self.indptr.size != futr_dataset.indptr.size: + raise ValueError( + "Cannot append `futr_dataset` with different number of groups." ) - new_temporal[acum : (acum + series_length), :] = dataset.temporal[ - dataset.indptr[i] : dataset.indptr[i + 1], : + # Define and fill new temporal with updated information + len_temporal, col_temporal = self.temporal.shape + len_futr = futr_dataset.temporal.shape[0] + new_temporal = torch.empty(size=(len_temporal + len_futr, col_temporal)) + new_sizes = np.diff(self.indptr) + np.diff(futr_dataset.indptr) + new_indptr = np.append(0, new_sizes.cumsum()).astype(np.int32) + new_max_size = np.max(new_sizes) + + for i in range(self.n_groups): + curr_slice = slice(self.indptr[i], self.indptr[i + 1]) + curr_size = curr_slice.stop - curr_slice.start + futr_slice = slice(futr_dataset.indptr[i], futr_dataset.indptr[i + 1]) + new_slice = slice(new_indptr[i], new_indptr[i + 1]) + new_temporal[new_indptr[i] : new_indptr[i] + curr_size] = self.temporal[ + curr_slice ] new_temporal[ - (acum + series_length) : (acum + new_length), : - ] = futr_dataset.temporal[ - futr_dataset.indptr[i] : futr_dataset.indptr[i + 1], : - ] - - acum += new_length - new_indptr.append(acum) - if new_length > new_max_size: - new_max_size = new_length + new_indptr[i] + curr_size : new_indptr[i + 1] + ] = futr_dataset.temporal[futr_slice] # Define new dataset updated_dataset = TimeSeriesDataset( temporal=new_temporal, - temporal_cols=dataset.temporal_cols.copy(), - indptr=np.array(new_indptr).astype(np.int32), + temporal_cols=self.temporal_cols.copy(), + indptr=new_indptr, max_size=new_max_size, - min_size=dataset.min_size, - static=dataset.static, - static_cols=dataset.static_cols, - sorted=dataset.sorted, + min_size=self.min_size, + static=self.static, + static_cols=self.static_cols, + sorted=self.sorted, ) return updated_dataset @@ -303,7 +242,7 @@ def trim_dataset(dataset, left_trim: int = 0, right_trim: int = 0): return updated_dataset @staticmethod - def from_df(df, static_df=None, sort_df=False, scaler_type=None): + def from_df(df, static_df=None, sort_df=False): # TODO: protect on equality of static_df + df indexes if df.index.name == "unique_id": warnings.warn( @@ -323,8 +262,7 @@ def from_df(df, static_df=None, sort_df=False, scaler_type=None): if sort_df: static_df = static_df.sort_index() - proc = DataFrameProcessor("unique_id", "ds", "y") - ids, times, data, indptr, sort_idxs = proc.process(df) + ids, times, data, indptr, sort_idxs = process_df(df, "unique_id", "ds", "y") # processor sets y as the first column temporal_cols = pd.Index( ["y"] + df.columns.drop(["unique_id", "ds", "y"]).tolist() @@ -359,14 +297,13 @@ def from_df(df, static_df=None, sort_df=False, scaler_type=None): max_size=max_size, min_size=min_size, sorted=sort_df, - scaler_type=scaler_type, ) ds = pd.MultiIndex.from_frame(df[["unique_id", "ds"]]) if sort_idxs is not None: ds = ds[sort_idxs] return dataset, indices, dates, ds -# %% ../nbs/tsdataset.ipynb 11 +# %% ../nbs/tsdataset.ipynb 10 class TimeSeriesDataModule(pl.LightningDataModule): def __init__( self,