diff --git a/src/akkudoktoreos/class_load_corrector.py b/src/akkudoktoreos/class_load_corrector.py
index 2499c437..0bf98154 100644
--- a/src/akkudoktoreos/class_load_corrector.py
+++ b/src/akkudoktoreos/class_load_corrector.py
@@ -1,3 +1,5 @@
+from typing import Optional, Tuple
+
 import matplotlib.pyplot as plt
 import numpy as np
 import pandas as pd
@@ -5,29 +7,66 @@ class LoadPredictionAdjuster:
-    def __init__(self, measured_data, predicted_data, load_forecast):
-        self.measured_data = measured_data
-        self.predicted_data = predicted_data
-        self.load_forecast = load_forecast
-        self.merged_data = self._merge_data()
-        self.train_data = None
-        self.test_data = None
-        self.weekday_diff = None
-        self.weekend_diff = None
-
-    def _remove_outliers(self, data, threshold=2):
-        # Calculate the Z-Score of the 'Last' data
+    def __init__(
+        self,
+        measured_data: pd.DataFrame,
+        predicted_data: pd.DataFrame,
+        load_forecast: object,
+    ) -> None:
+        """
+        Initialize the LoadPredictionAdjuster with measured data, predicted data, and a load forecast object.
+        """
+        # Store the input dataframes
+        self.measured_data: pd.DataFrame = measured_data
+        self.predicted_data: pd.DataFrame = predicted_data
+        self.load_forecast: object = load_forecast
+
+        # Merge measured and predicted data
+        self.merged_data: pd.DataFrame = self._merge_data()
+
+        # Initialize placeholders for train/test data and differences
+        self.train_data: Optional[pd.DataFrame] = None
+        self.test_data: Optional[pd.DataFrame] = None
+        self.weekday_diff: Optional[pd.Series] = None
+        self.weekend_diff: Optional[pd.Series] = None
+
+    def _remove_outliers(self, data: pd.DataFrame, threshold: float = 2.0) -> pd.DataFrame:
+        """
+        Remove outliers based on the Z-score from the 'Last' column.
+
+        Args:
+            data (pd.DataFrame): The input data with 'Last' column.
+            threshold (float): The Z-score threshold for detecting outliers.
+
+        Returns:
+            pd.DataFrame: Filtered data without outliers.
+        """
+        # Calculate Z-score for 'Last' column and filter based on threshold
         data["Z-Score"] = np.abs((data["Last"] - data["Last"].mean()) / data["Last"].std())
-        # Filter the data based on the threshold
         filtered_data = data[data["Z-Score"] < threshold]
-        return filtered_data.drop(columns=["Z-Score"])
+        return filtered_data.drop(columns=["Z-Score"])  # Drop Z-score column after filtering
 
-    def _merge_data(self):
-        # Convert the time column in both DataFrames to datetime
+    def _merge_data(self) -> pd.DataFrame:
+        """
+        Merge the measured and predicted data on the 'time' column.
+
+        Returns:
+            pd.DataFrame: The merged dataset.
+        """
+        # Convert time columns to datetime in both datasets
         self.predicted_data["time"] = pd.to_datetime(self.predicted_data["time"])
         self.measured_data["time"] = pd.to_datetime(self.measured_data["time"])
 
-        # Ensure both time columns have the same timezone
+        # Localize time to UTC and then convert to Berlin time
         if self.measured_data["time"].dt.tz is None:
             self.measured_data["time"] = self.measured_data["time"].dt.tz_localize("UTC")
@@ -36,19 +75,32 @@ def _merge_data(self):
             )
         self.measured_data["time"] = self.measured_data["time"].dt.tz_convert("Europe/Berlin")
 
-        # Optionally: Remove timezone information if only working locally
+        # Remove timezone information (optional for local work)
         self.predicted_data["time"] = self.predicted_data["time"].dt.tz_localize(None)
         self.measured_data["time"] = self.measured_data["time"].dt.tz_localize(None)
 
-        # Now you can perform the merge
+        # Merge the measured and predicted dataframes on 'time'
         merged_data = pd.merge(self.measured_data, self.predicted_data, on="time", how="inner")
-        print(merged_data)
+
+        # Extract useful columns such as 'Hour' and 'DayOfWeek'
         merged_data["Hour"] = merged_data["time"].dt.hour
         merged_data["DayOfWeek"] = merged_data["time"].dt.dayofweek
         return merged_data
 
-    def calculate_weighted_mean(self, train_period_weeks=9, test_period_weeks=1):
+    def calculate_weighted_mean(
+        self, train_period_weeks: int = 9, test_period_weeks: int = 1
+    ) -> None:
+        """
+        Calculate the weighted mean difference between actual and predicted values for training and testing periods.
+
+        Args:
+            train_period_weeks (int): Number of weeks to use for training data.
+            test_period_weeks (int): Number of weeks to use for testing data.
+        """
+        # Remove outliers from the merged data
         self.merged_data = self._remove_outliers(self.merged_data)
+
+        # Define training and testing periods based on weeks
         train_end_date = self.merged_data["time"].max() - pd.Timedelta(weeks=test_period_weeks)
         train_start_date = train_end_date - pd.Timedelta(weeks=train_period_weeks)
 
@@ -57,21 +109,24 @@ def calculate_weighted_mean(self, train_period_weeks=9, test_period_weeks=1):
             test_start_date + pd.Timedelta(weeks=test_period_weeks) - pd.Timedelta(hours=1)
         )
 
+        # Split merged data into training and testing datasets
         self.train_data = self.merged_data[
             (self.merged_data["time"] >= train_start_date)
            & (self.merged_data["time"] <= train_end_date)
         ]
-
         self.test_data = self.merged_data[
             (self.merged_data["time"] >= test_start_date)
             & (self.merged_data["time"] <= test_end_date)
         ]
 
+        # Calculate the difference between actual ('Last') and predicted ('Last Pred')
         self.train_data["Difference"] = self.train_data["Last"] - self.train_data["Last Pred"]
 
+        # Separate training data into weekdays and weekends
         weekdays_train_data = self.train_data[self.train_data["DayOfWeek"] < 5]
         weekends_train_data = self.train_data[self.train_data["DayOfWeek"] >= 5]
 
+        # Calculate weighted mean differences for both weekdays and weekends
         self.weekday_diff = (
             weekdays_train_data.groupby("Hour").apply(self._weighted_mean_diff).dropna()
         )
@@ -79,27 +134,62 @@ def calculate_weighted_mean(self, train_period_weeks=9, test_period_weeks=1):
             weekends_train_data.groupby("Hour").apply(self._weighted_mean_diff).dropna()
         )
 
-    def _weighted_mean_diff(self, data):
+    def _weighted_mean_diff(self, data: pd.DataFrame) -> float:
+        """
+        Compute the weighted mean difference between actual and predicted values.
+
+        Args:
+            data (pd.DataFrame): Data for a specific hour.
+
+        Returns:
+            float: Weighted mean difference for that hour.
+        """
+        # Weigh recent data more by using days difference from the last date in the training set
         train_end_date = self.train_data["time"].max()
         weights = 1 / (train_end_date - data["time"]).dt.days.replace(0, np.nan)
         weighted_mean = (data["Difference"] * weights).sum() / weights.sum()
         return weighted_mean
 
-    def adjust_predictions(self):
+    def adjust_predictions(self) -> None:
+        """
+        Adjust predictions for both training and test data using the calculated weighted differences.
+        """
+        # Apply adjustments to both training and testing data
         self.train_data["Adjusted Pred"] = self.train_data.apply(self._adjust_row, axis=1)
         self.test_data["Adjusted Pred"] = self.test_data.apply(self._adjust_row, axis=1)
 
-    def _adjust_row(self, row):
+    def _adjust_row(self, row: pd.Series) -> float:
+        """
+        Adjust a single row's prediction based on the hour and day of the week.
+
+        Args:
+            row (pd.Series): A single row of data.
+
+        Returns:
+            float: Adjusted prediction.
+        """
+        # Adjust predictions based on whether it's a weekday or weekend
         if row["DayOfWeek"] < 5:
             return row["Last Pred"] + self.weekday_diff.get(row["Hour"], 0)
         else:
             return row["Last Pred"] + self.weekend_diff.get(row["Hour"], 0)
 
-    def plot_results(self):
+    def plot_results(self) -> None:
+        """
+        Plot the actual, predicted, and adjusted predicted values for both training and testing data.
+        """
+        # Plot results for training and testing data
         self._plot_data(self.train_data, "Training")
         self._plot_data(self.test_data, "Testing")
 
-    def _plot_data(self, data, data_type):
+    def _plot_data(self, data: pd.DataFrame, data_type: str) -> None:
+        """
+        Helper function to plot the data.
+
+        Args:
+            data (pd.DataFrame): Data to plot (training or testing).
+            data_type (str): Label to identify whether it's training or testing data.
+        """
         plt.figure(figsize=(14, 7))
         plt.plot(data["time"], data["Last"], label=f"Actual Last - {data_type}", color="blue")
         plt.plot(
@@ -123,76 +213,61 @@ def _plot_data(self, data, data_type):
         plt.grid(True)
         plt.show()
 
-    def evaluate_model(self):
+    def evaluate_model(self) -> Tuple[float, float]:
+        """
+        Evaluate the model performance using Mean Squared Error and R-squared metrics.
+
+        Returns:
+            Tuple[float, float]: The mean squared error (mse) and the R-squared score (r2)
+            of the adjusted predictions against the measured test data.
+        """
+        # Calculate Mean Squared Error and R-squared for the adjusted predictions
         mse = mean_squared_error(self.test_data["Last"], self.test_data["Adjusted Pred"])
         r2 = r2_score(self.test_data["Last"], self.test_data["Adjusted Pred"])
         print(f"Mean Squared Error: {mse}")
         print(f"R-squared: {r2}")
+        return mse, r2
+
+    def predict_next_hours(self, hours_ahead: int) -> pd.DataFrame:
+        """
+        Predict load for the next given number of hours.
 
-    def predict_next_hours(self, hours_ahead):
+        Args:
+            hours_ahead (int): Number of hours to predict.
+
+        Returns:
+            pd.DataFrame: DataFrame with future predicted and adjusted load.
+        """
+        # Get the latest time in the merged data
         last_date = self.merged_data["time"].max()
+
+        # Generate future timestamps for the next 'hours_ahead'
         future_dates = [last_date + pd.Timedelta(hours=i) for i in range(1, hours_ahead + 1)]
         future_df = pd.DataFrame({"time": future_dates})
+
+        # Extract hour and day of the week for the future predictions
         future_df["Hour"] = future_df["time"].dt.hour
         future_df["DayOfWeek"] = future_df["time"].dt.dayofweek
+
+        # Predict the load and apply adjustments for future predictions
         future_df["Last Pred"] = future_df["time"].apply(self._forecast_next_hours)
         future_df["Adjusted Pred"] = future_df.apply(self._adjust_row, axis=1)
+
         return future_df
 
-    def _forecast_next_hours(self, timestamp):
+    def _forecast_next_hours(self, timestamp: pd.Timestamp) -> float:
+        """
+        Helper function to forecast the load for the next hours using the load_forecast object.
+
+        Args:
+            timestamp (pd.Timestamp): The time for which to predict the load.
+
+        Returns:
+            float: Predicted load for the given time.
+        """
+        # Use the load_forecast object to get the hourly forecast for the given timestamp
         date_str = timestamp.strftime("%Y-%m-%d")
         hour = timestamp.hour
         daily_forecast = self.load_forecast.get_daily_stats(date_str)
-        return daily_forecast[0][hour] if hour < len(daily_forecast[0]) else np.nan
-
-
-# if __name__ == '__main__':
-#     estimator = LastEstimator()
-#     start_date = "2024-06-01"
-#     end_date = "2024-08-01"
-#     last_df = estimator.get_last(start_date, end_date)
-
-#     selected_columns = last_df[['timestamp', 'Last']]
-#     selected_columns['time'] = pd.to_datetime(selected_columns['timestamp']).dt.floor('H')
-#     selected_columns['Last'] = pd.to_numeric(selected_columns['Last'], errors='coerce')
-
-#     # Drop rows with NaN values
-#     cleaned_data = selected_columns.dropna()
-
-#     print(cleaned_data)
-#     # Create an instance of LoadForecast
-#     lf = LoadForecast(filepath=r'.\load_profiles.npz', year_energy=6000*1000)
-#     # Initialize an empty DataFrame to hold the forecast data
-#     forecast_list = []
-
-#     # Loop through each day in the date range
-#     for single_date in pd.date_range(cleaned_data['time'].min().date(), cleaned_data['time'].max().date()):
-#         date_str = single_date.strftime('%Y-%m-%d')
-#         daily_forecast = lf.get_daily_stats(date_str)
-#         mean_values = daily_forecast[0]  # Extract the mean values
-#         hours = [single_date + pd.Timedelta(hours=i) for i in range(24)]
-#         daily_forecast_df = pd.DataFrame({'time': hours, 'Last Pred': mean_values})
-#         forecast_list.append(daily_forecast_df)
-
-#     # Concatenate all daily forecasts into a single DataFrame
-#     forecast_df = pd.concat(forecast_list, ignore_index=True)
-
-#     # Create an instance of the LoadPredictionAdjuster class
-#     adjuster = LoadPredictionAdjuster(cleaned_data, forecast_df, lf)
-
-#     # Calculate the weighted mean differences
-#     adjuster.calculate_weighted_mean()
-
-#     # Adjust the predictions
-#     adjuster.adjust_predictions()
-
-#     # Plot the results
-#     adjuster.plot_results()
-
-#     # Evaluate the model
-#     adjuster.evaluate_model()
-
-#     # Predict the next x hours
-#     future_predictions = adjuster.predict_next_hours(48)
-#     print(future_predictions)
+        return daily_forecast[0][hour] if hour < len(daily_forecast[0]) else np.nan
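A note on the load_forecast dependency used above: both _forecast_next_hours and the removed example at the bottom of the module treat get_daily_stats(date_str) as returning a sequence whose first element holds the 24 hourly mean load values. A minimal stand-in that satisfies this contract could look like the sketch below; the class name StubLoadForecast is illustrative and not part of the package.

import numpy as np


class StubLoadForecast:
    """Illustrative stand-in for the real LoadForecast dependency."""

    def __init__(self, seed: int = 0) -> None:
        self._rng = np.random.default_rng(seed)

    def get_daily_stats(self, date_str: str) -> tuple:
        # First element: 24 hourly mean load values for the requested day,
        # because _forecast_next_hours indexes daily_forecast[0][hour].
        return (self._rng.random(24) * 100,)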
diff --git a/tests/test_load_corrector.py b/tests/test_load_corrector.py
new file mode 100644
index 00000000..39ee31d1
--- /dev/null
+++ b/tests/test_load_corrector.py
@@ -0,0 +1,184 @@
+from unittest.mock import MagicMock
+
+import numpy as np
+import pandas as pd
+import pytest
+
+from akkudoktoreos.class_load_corrector import LoadPredictionAdjuster
+
+
+@pytest.fixture
+def setup_data() -> tuple[pd.DataFrame, pd.DataFrame, MagicMock]:
+    """
+    Fixture to create mock measured_data, predicted_data, and a mock load_forecast.
+    These mocks are returned as a tuple for testing purposes.
+    """
+    # Create mock measured_data (real measured load data)
+    measured_data = pd.DataFrame(
+        {
+            "time": pd.date_range(start="2023-10-01", periods=24, freq="H"),
+            "Last": np.random.rand(24) * 100,  # Random measured load values
+        }
+    )
+
+    # Create mock predicted_data (forecasted load data)
+    predicted_data = pd.DataFrame(
+        {
+            "time": pd.date_range(start="2023-10-01", periods=24, freq="H"),
+            "Last Pred": np.random.rand(24) * 100,  # Random predicted load values
+        }
+    )
+
+    # Mock the load_forecast object
+    load_forecast = MagicMock()
+    load_forecast.get_daily_stats = MagicMock(
+        return_value=(np.random.rand(24) * 100,)  # First element: 24 hourly mean values
+    )
+
+    return measured_data, predicted_data, load_forecast
+
+
+def test_merge_data(setup_data: tuple[pd.DataFrame, pd.DataFrame, MagicMock]) -> None:
+    """
+    Test the _merge_data method to ensure it merges measured and predicted data correctly.
+    """
+    measured_data, predicted_data, load_forecast = setup_data
+    adjuster = LoadPredictionAdjuster(measured_data, predicted_data, load_forecast)
+
+    # Call the method to merge data
+    merged_data = adjuster._merge_data()
+
+    # Assert the merged data is a DataFrame
+    assert isinstance(merged_data, pd.DataFrame), "Merged data should be a DataFrame"
+    # Assert certain columns are present in the merged data
+    assert "Hour" in merged_data.columns, "Merged data should contain 'Hour' column"
+    assert "DayOfWeek" in merged_data.columns, "Merged data should contain 'DayOfWeek' column"
+    assert len(merged_data) > 0, "Merged data should not be empty"
+
+
+def test_remove_outliers(
+    setup_data: tuple[pd.DataFrame, pd.DataFrame, MagicMock],
+) -> None:
+    """
+    Test the _remove_outliers method to ensure it filters outliers from the data.
+    """
+    measured_data, predicted_data, load_forecast = setup_data
+    adjuster = LoadPredictionAdjuster(measured_data, predicted_data, load_forecast)
+
+    # Create data with explicit outliers for testing
+    normal_values = np.random.rand(98) * 100  # Normal load values
+    outliers = np.array([500, -500])  # Explicit extreme outlier values
+    data_with_outliers = np.concatenate([normal_values, outliers])
+
+    # Simulate the merged_data with outliers to test the _remove_outliers method
+    adjuster.merged_data = pd.DataFrame({"Last": data_with_outliers})
+
+    # Apply the _remove_outliers method with default threshold
+    filtered_data = adjuster._remove_outliers(adjuster.merged_data)
+
+    # Assert that the output is a DataFrame and that outliers were removed
+    assert isinstance(filtered_data, pd.DataFrame), "Filtered data should be a DataFrame"
+    assert len(filtered_data) < len(
+        adjuster.merged_data
+    ), "Filtered data should remove some outliers"
+    assert len(filtered_data) == 98, "Filtered data should have removed exactly 2 outliers"
+
+
+def test_calculate_weighted_mean(
+    setup_data: tuple[pd.DataFrame, pd.DataFrame, MagicMock],
+) -> None:
+    """
+    Test the calculate_weighted_mean method to ensure weighted means for weekday and weekend differences are calculated correctly.
+    """
+    measured_data, predicted_data, load_forecast = setup_data
+
+    # Create time range and new data for 14 days (2 weeks)
+    time_range = pd.date_range(start="2023-09-25", periods=24 * 14, freq="H")
+
+    # Create new measured_data and predicted_data matching the time range
+    measured_data = pd.DataFrame(
+        {
+            "time": time_range,
+            "Last": np.random.rand(len(time_range)) * 100,  # Random 'Last' values
+        }
+    )
+
+    predicted_data = pd.DataFrame(
+        {
+            "time": time_range,
+            "Last Pred": np.random.rand(len(time_range)) * 100,  # Random 'Last Pred' values
+        }
+    )
+
+    adjuster = LoadPredictionAdjuster(measured_data, predicted_data, load_forecast)
+    adjuster.merged_data = adjuster._merge_data()
+
+    # Calculate the weighted mean over training and testing periods
+    adjuster.calculate_weighted_mean(train_period_weeks=1, test_period_weeks=1)
+
+    # Assert that weekday and weekend differences are calculated and non-empty
+    assert adjuster.weekday_diff is not None, "Weekday differences should be calculated"
+    assert len(adjuster.weekend_diff) > 0, "Weekend differences should not be empty"
+
+
+def test_adjust_predictions(
+    setup_data: tuple[pd.DataFrame, pd.DataFrame, MagicMock],
+) -> None:
+    """
+    Test the adjust_predictions method to ensure it correctly adds the 'Adjusted Pred' column to train and test data.
+    """
+    measured_data, predicted_data, load_forecast = setup_data
+    adjuster = LoadPredictionAdjuster(measured_data, predicted_data, load_forecast)
+    adjuster.merged_data = adjuster._merge_data()
+
+    # Calculate the weighted mean and adjust predictions
+    adjuster.calculate_weighted_mean(train_period_weeks=1, test_period_weeks=1)
+    adjuster.adjust_predictions()
+
+    # Assert that the 'Adjusted Pred' column is present in both train and test data
+    assert (
+        "Adjusted Pred" in adjuster.train_data.columns
+    ), "Train data should have 'Adjusted Pred' column"
+    assert (
+        "Adjusted Pred" in adjuster.test_data.columns
+    ), "Test data should have 'Adjusted Pred' column"
+
+
+def test_evaluate_model(
+    setup_data: tuple[pd.DataFrame, pd.DataFrame, MagicMock],
+    capsys: pytest.CaptureFixture,
+) -> None:
+    """
+    Test the evaluate_model method to ensure it returns and prints evaluation metrics (MSE and R-squared).
+    """
+    measured_data, predicted_data, load_forecast = setup_data
+    adjuster = LoadPredictionAdjuster(measured_data, predicted_data, load_forecast)
+    adjuster.merged_data = adjuster._merge_data()
+
+    # Calculate weighted mean, adjust predictions, and evaluate the model
+    adjuster.calculate_weighted_mean(train_period_weeks=1, test_period_weeks=1)
+    adjuster.adjust_predictions()
+    mse, r2 = adjuster.evaluate_model()
+    assert not np.isnan(mse)
+    assert not np.isnan(r2)
+
+    # Capture printed output and assert that evaluation metrics are printed
+    captured = capsys.readouterr()
+    assert "Mean Squared Error" in captured.out, "Evaluation should print Mean Squared Error"
+    assert "R-squared" in captured.out, "Evaluation should print R-squared"
+
+
+def test_predict_next_hours(
+    setup_data: tuple[pd.DataFrame, pd.DataFrame, MagicMock],
+) -> None:
+    """
+    Test the predict_next_hours method to ensure future predictions are made and contain 'Adjusted Pred'.
+    """
+    measured_data, predicted_data, load_forecast = setup_data
+    adjuster = LoadPredictionAdjuster(measured_data, predicted_data, load_forecast)
+    adjuster.merged_data = adjuster._merge_data()
+
+    # Calculate weighted mean and predict the next 5 hours
+    adjuster.calculate_weighted_mean(train_period_weeks=1, test_period_weeks=1)
+    future_df = adjuster.predict_next_hours(5)
+
+    # Assert that the correct number of future hours are predicted and that 'Adjusted Pred' is present
+    assert len(future_df) == 5, "Should predict for 5 future hours"
+    assert "Adjusted Pred" in future_df.columns, "Future data should have 'Adjusted Pred' column"
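For reference, the usage example that was removed from the module can be condensed into the runnable sketch below. It feeds synthetic measured and predicted data into LoadPredictionAdjuster and, like the test fixture, uses a MagicMock in place of the real LoadForecast; the date range, random values, and variable names are illustrative only.

from unittest.mock import MagicMock

import numpy as np
import pandas as pd

from akkudoktoreos.class_load_corrector import LoadPredictionAdjuster

rng = np.random.default_rng(42)
times = pd.date_range(start="2023-09-25", periods=24 * 14, freq="H")  # two weeks, hourly

# Synthetic measured load ('Last') and forecasted load ('Last Pred')
measured_data = pd.DataFrame({"time": times, "Last": rng.random(len(times)) * 100})
predicted_data = pd.DataFrame({"time": times, "Last Pred": rng.random(len(times)) * 100})

# Stand-in for the real LoadForecast: get_daily_stats() returns the 24 hourly mean
# values as its first element, which is what _forecast_next_hours expects.
load_forecast = MagicMock()
load_forecast.get_daily_stats = MagicMock(return_value=(rng.random(24) * 100,))

adjuster = LoadPredictionAdjuster(measured_data, predicted_data, load_forecast)
adjuster.calculate_weighted_mean(train_period_weeks=1, test_period_weeks=1)
adjuster.adjust_predictions()
mse, r2 = adjuster.evaluate_model()
future = adjuster.predict_next_hours(48)
print(future[["time", "Last Pred", "Adjusted Pred"]].head())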