From ae0b40651d4bc8f52f3653e5365fbff312b66c6e Mon Sep 17 00:00:00 2001
From: "djgagne@ou.edu"
Date: Fri, 12 Apr 2024 18:02:51 -0600
Subject: [PATCH 01/10] Bridgescaler memory issues

---
 bridgescaler/distributed.py | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/bridgescaler/distributed.py b/bridgescaler/distributed.py
index 3bdef4e..5c31df4 100644
--- a/bridgescaler/distributed.py
+++ b/bridgescaler/distributed.py
@@ -9,6 +9,7 @@
 from functools import partial
 from scipy.stats import logistic
 from warnings import warn
+from memory_profiler import profile
 
 CENTROID_DTYPE = np.dtype([('mean', np.float64),
                            ('weight', np.float64)])
@@ -356,12 +357,12 @@ def fit_variable(var_index, xv_shared=None, compression=None, channels_last=None
 def transform_variable(td_obj, xv, min_val=0.000001,
                        max_val=0.9999999, distribution="normal"):
     x_transformed = td_obj.cdf(xv)
-    x_transformed[:] = np.minimum(x_transformed, max_val)
-    x_transformed[:] = np.maximum(x_transformed, min_val)
+    x_transformed = np.minimum(x_transformed, max_val)
+    x_transformed = np.maximum(x_transformed, min_val)
     if distribution == "normal":
-        x_transformed[:] = norm.ppf(x_transformed, loc=0, scale=1)
+        x_transformed = norm.ppf(x_transformed, loc=0, scale=1)
     elif distribution == "logistic":
-        x_transformed[:] = logistic.ppf(x_transformed)
+        x_transformed = logistic.ppf(x_transformed)
     return x_transformed
 
 
@@ -372,7 +373,7 @@ def inv_transform_variable(td_obj, xv,
         x_transformed = norm.cdf(xv, loc=0, scale=1)
     elif distribution == "logistic":
         x_transformed = logistic.cdf(xv)
-    x_transformed[:] = td_obj.quantile(x_transformed)
+    x_transformed = td_obj.quantile(x_transformed)
     return x_transformed
 
 
@@ -451,6 +452,7 @@ def fit(self, x, weight=None):
         self._fit = True
         return
 
+    @profile
     def transform(self, x, channels_last=None, pool=None):
         xv, x_transformed, channels_last, channel_dim, x_col_order = self.process_x_for_transform(x, channels_last)
         td_objs = self.attributes_to_td_objs()
@@ -486,7 +488,7 @@ def transform(self, x, channels_last=None, pool=None):
             del outputs[:]
         else:
             for td_obj in td_i_objs:
-                x_transformed[:, td_obj[0]] = trans_var_func(td_obj[1], xv[:, td_obj[0]])
+                x_transformed[:, td_obj[0]] = trans_var_func(td_obj[1], xv[:, td_obj[0]]).copy()
         x_transformed_final = self.package_transformed_x(x_transformed, x)
         return x_transformed_final

From fade1ee548077b3aa00cb17a97928f6ab8f3ed74 Mon Sep 17 00:00:00 2001
From: David John Gagne
Date: Fri, 12 Apr 2024 18:32:15 -0600
Subject: [PATCH 02/10] Updates to distributed to reduce memory usage

---
 bridgescaler/distributed.py | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/bridgescaler/distributed.py b/bridgescaler/distributed.py
index 5c31df4..d1b1397 100644
--- a/bridgescaler/distributed.py
+++ b/bridgescaler/distributed.py
@@ -2,14 +2,13 @@
 from numpy.lib.recfunctions import structured_to_unstructured, unstructured_to_structured
 from copy import deepcopy
 from crick import TDigest as CTDigest
-from numba_stats import norm
+#from numba_stats import norm
 import pandas as pd
 import xarray as xr
 from pytdigest import TDigest
 from functools import partial
-from scipy.stats import logistic
+from scipy.stats import norm, logistic
 from warnings import warn
-from memory_profiler import profile
 
 CENTROID_DTYPE = np.dtype([('mean', np.float64),
                            ('weight', np.float64)])
@@ -356,13 +356,15 @@ def fit_variable(var_index, xv_shared=None, compression=None, channels_last=None
 def transform_variable(td_obj, xv, min_val=0.000001,
                        max_val=0.9999999, distribution="normal"):
     x_transformed = td_obj.cdf(xv)
-    x_transformed = np.minimum(x_transformed, max_val)
-    x_transformed = np.maximum(x_transformed, min_val)
+    np.minimum(x_transformed, max_val, out=x_transformed)
+    np.maximum(x_transformed, min_val, out=x_transformed)
     if distribution == "normal":
-        x_transformed = norm.ppf(x_transformed, loc=0, scale=1)
+        x_transformed_final = norm.ppf(x_transformed, loc=0, scale=1)
     elif distribution == "logistic":
-        x_transformed = logistic.ppf(x_transformed)
-    return x_transformed
+        x_transformed_final = logistic.ppf(x_transformed)
+    else:
+        x_transformed_final = x_transformed
+    return x_transformed_final
 
 
 def inv_transform_variable(td_obj, xv,
@@ -452,7 +453,6 @@ def fit(self, x, weight=None):
         self._fit = True
         return
 
-    @profile
     def transform(self, x, channels_last=None, pool=None):
         xv, x_transformed, channels_last, channel_dim, x_col_order = self.process_x_for_transform(x, channels_last)
         td_objs = self.attributes_to_td_objs()

From 09d4899411d203d15361346c311605b0083c315e Mon Sep 17 00:00:00 2001
From: David John Gagne
Date: Sat, 13 Apr 2024 14:13:46 -0600
Subject: [PATCH 03/10] Added numpy mem example

---
 scripts/numpy_mem.py | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)
 create mode 100644 scripts/numpy_mem.py

diff --git a/scripts/numpy_mem.py b/scripts/numpy_mem.py
new file mode 100644
index 0000000..3baf3dd
--- /dev/null
+++ b/scripts/numpy_mem.py
@@ -0,0 +1,18 @@
+import numpy as np
+import matplotlib.pyplot as plt
+import psutil
+import xarray as xr
+mem = []
+mem.append(psutil.virtual_memory()[3])
+def get_data():
+    return np.zeros((1000, 50, 50), dtype=np.float32)
+data = get_data()
+mem.append(psutil.virtual_memory()[3])
+for i in range(data.shape[0]):
+    data[i] = np.random.random((50, 50))
+    mem.append(psutil.virtual_memory()[3])
+mem.append(psutil.virtual_memory()[3])
+xd = xr.DataArray(data)
+mem.append(psutil.virtual_memory()[3])
+plt.plot(mem)
+plt.show()

From d8371071be73cc7e914f6b553aa92456df4d8a0d Mon Sep 17 00:00:00 2001
From: "djgagne@ou.edu"
Date: Sat, 13 Apr 2024 14:28:22 -0600
Subject: [PATCH 04/10] Updated memory

---
 scripts/numpy_mem.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/scripts/numpy_mem.py b/scripts/numpy_mem.py
index 3baf3dd..3ccdff2 100644
--- a/scripts/numpy_mem.py
+++ b/scripts/numpy_mem.py
@@ -1,3 +1,5 @@
+import matplotlib
+matplotlib.use('agg')
 import numpy as np
 import matplotlib.pyplot as plt
 import psutil
@@ -15,4 +17,4 @@ def get_data():
 xd = xr.DataArray(data)
 mem.append(psutil.virtual_memory()[3])
 plt.plot(mem)
-plt.show()
+plt.savefig("mem_profile.png", dpi=200, bbox_inches="tight")

From 488d209c120899be0fc0a6b22f88fbad33ce4b33 Mon Sep 17 00:00:00 2001
From: David John Gagne
Date: Sat, 13 Apr 2024 14:32:43 -0600
Subject: [PATCH 05/10] Hmm

---
 scripts/numpy_mem.py | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/scripts/numpy_mem.py b/scripts/numpy_mem.py
index 3ccdff2..09117d1 100644
--- a/scripts/numpy_mem.py
+++ b/scripts/numpy_mem.py
@@ -5,16 +5,14 @@
 import psutil
 import xarray as xr
 mem = []
-mem.append(psutil.virtual_memory()[3])
 def get_data():
     return np.zeros((1000, 50, 50), dtype=np.float32)
 data = get_data()
-mem.append(psutil.virtual_memory()[3])
 for i in range(data.shape[0]):
     data[i] = np.random.random((50, 50))
-    mem.append(psutil.virtual_memory()[3])
-mem.append(psutil.virtual_memory()[3])
+    mem.append(psutil.virtual_memory()[1])
+mem.append(psutil.virtual_memory()[1])
 xd = xr.DataArray(data)
-mem.append(psutil.virtual_memory()[3])
+mem.append(psutil.virtual_memory()[1])
 plt.plot(mem)
 plt.savefig("mem_profile.png", dpi=200, bbox_inches="tight")

From efec81ac1e3288367c0df1728d114c116c80dbdb Mon Sep 17 00:00:00 2001
From: David John Gagne
Date: Mon, 15 Apr 2024 08:40:05 -0600
Subject: [PATCH 06/10] Trying to figure out issues

---
 bridgescaler/distributed.py  | 24 +++++++++++++++++-----
 scripts/eval_scaler.py       | 40 ++++++++++++++++++++++++++++++++----
 scripts/scipy_ppf_example.py | 38 ++++++++++++++++++++++++++++++++++
 3 files changed, 93 insertions(+), 9 deletions(-)
 create mode 100644 scripts/scipy_ppf_example.py

diff --git a/bridgescaler/distributed.py b/bridgescaler/distributed.py
index d1b1397..d7fef19 100644
--- a/bridgescaler/distributed.py
+++ b/bridgescaler/distributed.py
@@ -2,15 +2,17 @@
 from numpy.lib.recfunctions import structured_to_unstructured, unstructured_to_structured
 from copy import deepcopy
 from crick import TDigest as CTDigest
-#from numba_stats import norm
+from numba_stats import norm
 import pandas as pd
 import xarray as xr
 from pytdigest import TDigest
 from functools import partial
-from scipy.stats import norm, logistic
+from scipy.stats import logistic
 from warnings import warn
-
+import psutil
+from memory_profiler import profile
 CENTROID_DTYPE = np.dtype([('mean', np.float64),
                            ('weight', np.float64)])
+import gc
 class DBaseScaler(object):
     """
@@ -352,10 +354,21 @@ def fit_variable(var_index, xv_shared=None, compression=None, channels_last=None
     td_obj.update(xv[:, var_index].ravel())
     return td_obj
 
-
+@profile
 def transform_variable(td_obj, xv, min_val=0.000001,
                        max_val=0.9999999, distribution="normal"):
+    process = psutil.Process()
+
+    # Record initial memory usage
+    stats_before = gc.get_stats()
+    #x_transformed = np.zeros(xv.shape, dtype=xv.dtype)
+    initial_memory = process.memory_info().rss
     x_transformed = td_obj.cdf(xv)
+    final_memory = process.memory_info().rss
+    stats_after = gc.get_stats()
+    print("Memory used:", (final_memory - initial_memory) / 1e6)
+    print(stats_before)
+    print(stats_after)
     np.minimum(x_transformed, max_val, out=x_transformed)
     np.maximum(x_transformed, min_val, out=x_transformed)
     if distribution == "normal":
@@ -453,6 +466,7 @@ def fit(self, x, weight=None):
         self._fit = True
         return
 
+    @profile
     def transform(self, x, channels_last=None, pool=None):
         xv, x_transformed, channels_last, channel_dim, x_col_order = self.process_x_for_transform(x, channels_last)
         td_objs = self.attributes_to_td_objs()
@@ -488,7 +502,7 @@ def transform(self, x, channels_last=None, pool=None):
             del outputs[:]
         else:
             for td_obj in td_i_objs:
-                x_transformed[:, td_obj[0]] = trans_var_func(td_obj[1], xv[:, td_obj[0]]).copy()
+                x_transformed[:, td_obj[0]] = trans_var_func(td_obj[1], xv[:, td_obj[0]])
         x_transformed_final = self.package_transformed_x(x_transformed, x)
         return x_transformed_final

diff --git a/scripts/eval_scaler.py b/scripts/eval_scaler.py
index eee906a..e8b3e95 100644
--- a/scripts/eval_scaler.py
+++ b/scripts/eval_scaler.py
@@ -5,7 +5,10 @@
 import xarray as xr
 import os
 from multiprocessing import Pool
-from multiprocessing.pool import ThreadPool
+import psutil
+from scipy.special import ndtri
+from scipy.stats import norm
+from memory_profiler import profile
 
 def make_test_data():
     np.random.seed(34325)
@@ -13,7 +16,7 @@ def make_test_data():
     col_names = ["a", "b", "c", "d", "e"]
     test_data["means"] = np.array([0, 5.3, -2.421, 21456.3, 1.e-5])
     test_data["sds"] = np.array([5, 352.2, 1e-4, 20000.3, 5.3e-2])
-    test_data["n_examples"] = np.array([1000000, 500, 88])
+    test_data["n_examples"] = np.array([100000, 500, 88])
     test_data["numpy_2d"] = []
     test_data["numpy_4d"] = []
     test_data["pandas"] = []
@@ -98,11 +101,40 @@ def eval_dquantile_scaler(test_data):
     pool.join()
     return
 
+@profile
+def small_eval(test_data):
+    process = psutil.Process()
+
+    # Record initial memory usage
+
+    test_data_c_first = test_data["xarray"][0].transpose("batch", "variable", "y", "x").astype("float32")
+    xr_dss_f = DQuantileScaler(distribution="normal", channels_last=False)
+    xr_dss_f.fit(test_data_c_first)
+    bt_memory = process.memory_info().rss
+    initial_memory = process.memory_info().rss
+    print(initial_memory/1e6)
+    xr_dss_f.distribution = None
+    test_data_c_first = xr_dss_f.transform(test_data_c_first)
+    test_data_c_sec = ndtri(test_data_c_first)
+    output_arr = np.full((1000, 50, 50), 0.5)
+    output_arr = norm.ppf(output_arr)
+    output_arr = np.full((1000, 50, 50), 0.5)
+    output_arr = ndtri(output_arr)
+    at_memory = process.memory_info().rss
+    print("final mem:", at_memory / 1e6)
+
+    print("mem diff:", (at_memory - bt_memory) / 1e6)
+    return test_data_c_first
+
+
 if __name__ == "__main__":
-    from time import perf_counter, time
+    from time import time
     start = time()
     test_data = make_test_data()
-    eval_dquantile_scaler(test_data)
+    test_data_c_first = test_data["xarray"][0].transpose("batch", "variable", "y", "x").astype("float32")
+    print(test_data["xarray"][0])
+    test_data_c_first[:] = small_eval(test_data)
+    #eval_dquantile_scaler(test_data)
     stop = time()
     print(stop - start)

diff --git a/scripts/scipy_ppf_example.py b/scripts/scipy_ppf_example.py
new file mode 100644
index 0000000..7dd650a
--- /dev/null
+++ b/scripts/scipy_ppf_example.py
@@ -0,0 +1,38 @@
+from scipy.stats import norm
+from scipy.special import ndtri
+import numpy as np
+import matplotlib.pyplot as plt
+import psutil
+import gc
+
+process = psutil.Process()
+n_elements = 301
+mem_vals = np.zeros(n_elements)
+mem_vals[0] = process.memory_info().rss / 1e6
+for i in range(1, n_elements):
+    x = np.random.random(size=(100, 50, 50))
+    ppf_val = ndtri(x)
+    mem_vals[i] = process.memory_info().rss / 1e6
+    gc.collect()
+plt.plot(mem_vals[1:] - mem_vals[0], label="ndtri")
+mem_vals = np.zeros(n_elements)
+mem_vals[0] = process.memory_info().rss / 1e6
+for i in range(1, n_elements):
+    x = np.random.random(size=(100, 50, 50))
+
+    ppf_val = norm.ppf(x)
+    mem_vals[i] = process.memory_info().rss / 1e6
+    gc.collect()
+plt.plot(mem_vals[1:] - mem_vals[0], label="norm.ppf")
+mem_vals = np.zeros(n_elements)
+mem_vals[0] = process.memory_info().rss / 1e6
+for i in range(1, n_elements):
+    x = np.random.random(size=(100, 50, 50))
+
+    mem_vals[i] = process.memory_info().rss / 1e6
+    gc.collect()
+plt.plot(mem_vals[1:] - mem_vals[0], label="control")
+plt.xlabel("Iterations")
+plt.ylabel("Memory usage (MB)")
+plt.legend()
+plt.savefig("norm_usage_tracking.png", dpi=200, bbox_inches="tight")

From 1de227168af9d8aecaca98bc3986522032f416c0 Mon Sep 17 00:00:00 2001
From: "djgagne@ou.edu"
Date: Mon, 15 Apr 2024 08:40:35 -0600
Subject: [PATCH 07/10] Test

---
 bridgescaler/distributed.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/bridgescaler/distributed.py b/bridgescaler/distributed.py
index d1b1397..d2ac858 100644
--- a/bridgescaler/distributed.py
+++ b/bridgescaler/distributed.py
@@ -9,7 +9,7 @@
 from functools import partial
 from scipy.stats import norm, logistic
 from warnings import warn
-
+from memory_profiler import profile
 CENTROID_DTYPE = np.dtype([('mean', np.float64),
                            ('weight', np.float64)])
 class DBaseScaler(object):
@@ -352,7 +352,7 @@ def fit_variable(var_index, xv_shared=None, compression=None, channels_last=None
     td_obj.update(xv[:, var_index].ravel())
     return td_obj
 
-
+@profile
 def transform_variable(td_obj, xv, min_val=0.000001,
                        max_val=0.9999999, distribution="normal"):
     x_transformed = td_obj.cdf(xv)
@@ -453,6 +453,7 @@ def fit(self, x, weight=None):
         self._fit = True
         return
 
+    @profile
     def transform(self, x, channels_last=None, pool=None):
         xv, x_transformed, channels_last, channel_dim, x_col_order = self.process_x_for_transform(x, channels_last)
         td_objs = self.attributes_to_td_objs()

From d400f80ecf83d1ae2b16f250971f0b10c4f43987 Mon Sep 17 00:00:00 2001
From: David John Gagne
Date: Mon, 15 Apr 2024 17:29:30 -0600
Subject: [PATCH 08/10] Added numba-vectorized tdigest cdf.

---
 bridgescaler/distributed.py  | 63 ++++++++++++++++++++++++++++++++++--
 scripts/eval_scaler.py       |  1 -
 scripts/scipy_ppf_example.py |  3 +-
 3 files changed, 61 insertions(+), 6 deletions(-)

diff --git a/bridgescaler/distributed.py b/bridgescaler/distributed.py
index d7fef19..0419129 100644
--- a/bridgescaler/distributed.py
+++ b/bridgescaler/distributed.py
@@ -3,6 +3,7 @@
 from copy import deepcopy
 from crick import TDigest as CTDigest
 from numba_stats import norm
+from scipy.special import ndtri
 import pandas as pd
 import xarray as xr
 from pytdigest import TDigest
@@ -13,6 +14,7 @@
 from memory_profiler import profile
 CENTROID_DTYPE = np.dtype([('mean', np.float64),
                            ('weight', np.float64)])
 import gc
+from numba import vectorize, guvectorize, float32, float64, void
 class DBaseScaler(object):
     """
@@ -354,6 +356,7 @@ def fit_variable(var_index, xv_shared=None, compression=None, channels_last=None
     td_obj.update(xv[:, var_index].ravel())
     return td_obj
 
+
 @profile
 def transform_variable(td_obj, xv, min_val=0.000001,
                        max_val=0.9999999, distribution="normal"):
@@ -363,7 +366,13 @@ def transform_variable(td_obj, xv,
     stats_before = gc.get_stats()
     #x_transformed = np.zeros(xv.shape, dtype=xv.dtype)
     initial_memory = process.memory_info().rss
-    x_transformed = td_obj.cdf(xv)
+    #x_transformed = td_obj.cdf(xv)
+    td_centroids = td_obj.centroids()
+    print(xv.min(), xv.max(), xv.shape)
+    x_transformed = np.zeros(xv.shape, xv.dtype)
+    tdigest_cdf(xv, td_centroids["mean"], td_centroids["weight"],
+                td_obj.min(), td_obj.max(), x_transformed)
+    print(x_transformed.min(), x_transformed.max(), x_transformed.shape)
     final_memory = process.memory_info().rss
     stats_after = gc.get_stats()
     print("Memory used:", (final_memory - initial_memory) / 1e6)
@@ -372,7 +381,7 @@ def transform_variable(td_obj, xv,
     np.minimum(x_transformed, max_val, out=x_transformed)
     np.maximum(x_transformed, min_val, out=x_transformed)
     if distribution == "normal":
-        x_transformed_final = norm.ppf(x_transformed, loc=0, scale=1)
+        x_transformed_final = ndtri(x_transformed)
     elif distribution == "logistic":
         x_transformed_final = logistic.ppf(x_transformed)
     else:
@@ -380,6 +389,55 @@ def transform_variable(td_obj, xv,
     return x_transformed_final
 
 
+@guvectorize([void(float64[:], float64[:], float64[:], float64, float64, float64[:]),
+              void(float32[:], float64[:], float64[:], float64, float64, float32[:])], "(m),(n),(n),(),()->(m)")
+def tdigest_cdf(xv, centroids_mean, centroids_weight, min_val, max_val, out):
+    for i, x in enumerate(xv):
+        if centroids_mean.size == 0:
+            out[i] = np.nan
+
+        # Single centroid
+        if centroids_mean.size == 1:
+            if x < min_val:
+                out[i] = 0.0
+            elif x > max_val:
+                out[i] = 1.0
+            elif max_val - min_val < np.finfo(np.float64).eps:
+                out[i] = 0.5
+            else:
+                out[i] = (x - min_val) / (max_val - min_val)
+        # Equality checks only apply if > 1 centroid
+        if x >= max_val:
+            out[i] = 1.0
+        elif x <= min_val:
+            out[i] = 0.0
+
+        # i_l = bisect_left_mean(T->merge_centroids, x, 0, T->ncentroids);
+        i_l = np.searchsorted(centroids_mean, x, side="left")
+
+        if x < centroids_mean[0]:
+            # min < x < first centroid
+            x0 = min_val
+            x1 = centroids_mean[0]
+            dw = centroids_weight[0] / 2.0
+            out[i] = dw * (x - x0) / (x1 - x0) / centroids_weight.sum()
+        elif i_l == centroids_mean.size:
+            # last centroid < x < max
+            x0 = centroids_mean[i_l - 1]
+            x1 = max_val
+            dw = centroids_weight[i_l - 1] / 2.0
+            out[i] = 1 - dw * (x1 - x) / (x1 - x0) / centroids_weight.sum()
+        elif centroids_mean[i_l] == x:
+            # x is equal to one or more centroids
+            i_r = np.searchsorted(centroids_mean, x, side="right")
+            out[i] = centroids_weight[i_r] / centroids_weight.sum()
+        else:
+            assert centroids_mean[i_l] > x
+            x0 = centroids_mean[i_l - 1]
+            x1 = centroids_mean[i_l]
+            dw = 0.5 * (centroids_weight[i_l - 1] + centroids_weight[i_l])
+            out[i] = (centroids_weight[i_l - 1] + dw * (x - x0) / (x1 - x0)) / centroids_weight.sum()
+
+
 def inv_transform_variable(td_obj, xv,
                            distribution="normal"):
     x_transformed = np.zeros(xv.shape, dtype=xv.dtype)
@@ -466,7 +524,6 @@ def fit(self, x, weight=None):
         self._fit = True
         return
 
-    @profile
     def transform(self, x, channels_last=None, pool=None):
         xv, x_transformed, channels_last, channel_dim, x_col_order = self.process_x_for_transform(x, channels_last)
         td_objs = self.attributes_to_td_objs()

diff --git a/scripts/eval_scaler.py b/scripts/eval_scaler.py
index e8b3e95..07c6712 100644
--- a/scripts/eval_scaler.py
+++ b/scripts/eval_scaler.py
@@ -101,7 +101,6 @@ def eval_dquantile_scaler(test_data):
     pool.join()
     return
 
-@profile
 def small_eval(test_data):
     process = psutil.Process()
 
diff --git a/scripts/scipy_ppf_example.py b/scripts/scipy_ppf_example.py
index 7dd650a..a326c39 100644
--- a/scripts/scipy_ppf_example.py
+++ b/scripts/scipy_ppf_example.py
@@ -17,9 +17,9 @@
 plt.plot(mem_vals[1:] - mem_vals[0], label="ndtri")
 mem_vals = np.zeros(n_elements)
 mem_vals[0] = process.memory_info().rss / 1e6
+
 for i in range(1, n_elements):
     x = np.random.random(size=(100, 50, 50))
-
     ppf_val = norm.ppf(x)
     mem_vals[i] = process.memory_info().rss / 1e6
     gc.collect()
@@ -26,9 +26,8 @@
 plt.plot(mem_vals[1:] - mem_vals[0], label="norm.ppf")
 mem_vals = np.zeros(n_elements)
 mem_vals[0] = process.memory_info().rss / 1e6
 for i in range(1, n_elements):
     x = np.random.random(size=(100, 50, 50))
-
     mem_vals[i] = process.memory_info().rss / 1e6
     gc.collect()
 plt.plot(mem_vals[1:] - mem_vals[0], label="control")

From 8cce4378fb67bc81739760966284203fc91b485e Mon Sep 17 00:00:00 2001
From: David John Gagne
Date: Mon, 15 Apr 2024 17:36:51 -0600
Subject: [PATCH 09/10] Removed profile and print statements

---
 bridgescaler/distributed.py | 27 ++++++++++-----------------
 1 file changed, 10 insertions(+), 17 deletions(-)

diff --git a/bridgescaler/distributed.py b/bridgescaler/distributed.py
index 0419129..28b8068 100644
--- a/bridgescaler/distributed.py
+++ b/bridgescaler/distributed.py
@@ -10,11 +10,8 @@
 from functools import partial
 from scipy.stats import logistic
 from warnings import warn
-import psutil
-from memory_profiler import profile
+from numba import guvectorize, float32, float64, void
 CENTROID_DTYPE = np.dtype([('mean', np.float64),
                            ('weight', np.float64)])
-import gc
-from numba import vectorize, guvectorize, float32, float64, void
 class DBaseScaler(object):
     """
@@ -357,27 +354,23 @@ def fit_variable(var_index, xv_shared=None, compression=None, channels_last=None
     return td_obj
 
 
-@profile
 def transform_variable(td_obj, xv, min_val=0.000001,
                        max_val=0.9999999, distribution="normal"):
-    process = psutil.Process()
+    # process = psutil.Process()
 
     # Record initial memory usage
-    stats_before = gc.get_stats()
-    #x_transformed = np.zeros(xv.shape, dtype=xv.dtype)
-    initial_memory = process.memory_info().rss
-    #x_transformed = td_obj.cdf(xv)
+    # initial_memory = process.memory_info().rss
     td_centroids = td_obj.centroids()
-    print(xv.min(), xv.max(), xv.shape)
+    # print(xv.min(), xv.max(), xv.shape)
     x_transformed = np.zeros(xv.shape, xv.dtype)
     tdigest_cdf(xv, td_centroids["mean"], td_centroids["weight"],
                 td_obj.min(), td_obj.max(), x_transformed)
-    print(x_transformed.min(), x_transformed.max(), x_transformed.shape)
-    final_memory = process.memory_info().rss
-    stats_after = gc.get_stats()
-    print("Memory used:", (final_memory - initial_memory) / 1e6)
-    print(stats_before)
-    print(stats_after)
+    # print(x_transformed.min(), x_transformed.max(), x_transformed.shape)
+    # final_memory = process.memory_info().rss
+    # stats_after = gc.get_stats()
+    #print("Memory used:", (final_memory - initial_memory) / 1e6)
+    # print(stats_before)
+    # print(stats_after)
     np.minimum(x_transformed, max_val, out=x_transformed)
     np.maximum(x_transformed, min_val, out=x_transformed)
     if distribution == "normal":

From 55f4bb157d7fe11643df8245c4449ad556e7a8bb Mon Sep 17 00:00:00 2001
From: David John Gagne
Date: Tue, 16 Apr 2024 21:37:56 -0600
Subject: [PATCH 10/10] Fixed bugs in numba tdigest code and added tdigest_quantile.

---
 bridgescaler/distributed.py | 157 ++++++++++++++++++++++--------
 1 file changed, 98 insertions(+), 59 deletions(-)

diff --git a/bridgescaler/distributed.py b/bridgescaler/distributed.py
index 28b8068..61b2d14 100644
--- a/bridgescaler/distributed.py
+++ b/bridgescaler/distributed.py
@@ -2,8 +2,8 @@
 from numpy.lib.recfunctions import structured_to_unstructured, unstructured_to_structured
 from copy import deepcopy
 from crick import TDigest as CTDigest
+from scipy.special import ndtr, ndtri
 from numba_stats import norm
-from scipy.special import ndtri
 import pandas as pd
 import xarray as xr
 from pytdigest import TDigest
@@ -356,90 +356,129 @@ def fit_variable(var_index, xv_shared=None, compression=None, channels_last=None
 def transform_variable(td_obj, xv, min_val=0.000001,
                        max_val=0.9999999, distribution="normal"):
-    # process = psutil.Process()
-
-    # Record initial memory usage
-    # initial_memory = process.memory_info().rss
     td_centroids = td_obj.centroids()
-    # print(xv.min(), xv.max(), xv.shape)
-    x_transformed = np.zeros(xv.shape, xv.dtype)
+    x_transformed = np.zeros_like(xv)
     tdigest_cdf(xv, td_centroids["mean"], td_centroids["weight"],
                 td_obj.min(), td_obj.max(), x_transformed)
-    # print(x_transformed.min(), x_transformed.max(), x_transformed.shape)
-    # final_memory = process.memory_info().rss
-    # stats_after = gc.get_stats()
-    #print("Memory used:", (final_memory - initial_memory) / 1e6)
-    # print(stats_before)
-    # print(stats_after)
-    np.minimum(x_transformed, max_val, out=x_transformed)
-    np.maximum(x_transformed, min_val, out=x_transformed)
+    x_transformed = np.minimum(x_transformed, max_val)
+    x_transformed = np.maximum(x_transformed, min_val)
     if distribution == "normal":
-        x_transformed_final = ndtri(x_transformed)
+        x_transformed = ndtri(x_transformed)
     elif distribution == "logistic":
-        x_transformed_final = logistic.ppf(x_transformed)
-    else:
-        x_transformed_final = x_transformed
-    return x_transformed_final
+        x_transformed = logistic.ppf(x_transformed)
+    return x_transformed
+
+
+def inv_transform_variable(td_obj, xv,
+                           distribution="normal"):
+    td_centroids = td_obj.centroids()
+    x_transformed = np.zeros_like(xv)
+    if distribution == "normal":
+        x_transformed = ndtr(xv)
+    elif distribution == "logistic":
+        x_transformed = logistic.cdf(xv)
+    tdigest_quantile(xv, td_centroids["mean"], td_centroids["weight"],
+                     td_obj.min(), td_obj.max(), x_transformed)
+    return x_transformed
 
 
 @guvectorize([void(float64[:], float64[:], float64[:], float64, float64, float64[:]),
               void(float32[:], float64[:], float64[:], float64, float64, float32[:])], "(m),(n),(n),(),()->(m)")
-def tdigest_cdf(xv, centroids_mean, centroids_weight, min_val, max_val, out):
+def tdigest_cdf(xv, cent_mean, cent_weight, t_min, t_max, out):
+    cent_merged_weight = np.zeros_like(cent_weight)
+    cumulative_weight = 0
+    for i in range(cent_weight.size):
+        cent_merged_weight[i] = cumulative_weight + cent_weight[i] / 2.0
+        cumulative_weight += cent_weight[i]
+    total_weight = cent_weight.sum()
     for i, x in enumerate(xv):
-        if centroids_mean.size == 0:
+        if cent_mean.size == 0:
             out[i] = np.nan
-
+            continue
         # Single centroid
-        if centroids_mean.size == 1:
-            if x < min_val:
+        if cent_mean.size == 1:
+            if x < t_min:
                 out[i] = 0.0
-            elif x > max_val:
+            elif x > t_max:
                 out[i] = 1.0
-            elif max_val - min_val < np.finfo(np.float64).eps:
+            elif t_max - t_min < np.finfo(np.float64).eps:
                 out[i] = 0.5
             else:
-                out[i] = (x - min_val) / (max_val - min_val)
+                out[i] = (x - t_min) / (t_max - t_min)
+            continue
         # Equality checks only apply if > 1 centroid
-        if x >= max_val:
+        if x >= t_max:
             out[i] = 1.0
+            continue
-        elif x <= min_val:
+        elif x <= t_min:
             out[i] = 0.0
+            continue
 
         # i_l = bisect_left_mean(T->merge_centroids, x, 0, T->ncentroids);
-        i_l = np.searchsorted(centroids_mean, x, side="left")
-
-        if x < centroids_mean[0]:
+        i_l = np.searchsorted(cent_mean, x, side="left")
+        if x < cent_mean[0]:
             # min < x < first centroid
-            x0 = min_val
-            x1 = centroids_mean[0]
-            dw = centroids_weight[0] / 2.0
-            out[i] = dw * (x - x0) / (x1 - x0) / centroids_weight.sum()
-        elif i_l == centroids_mean.size:
+            x0 = t_min
+            x1 = cent_mean[0]
+            dw = cent_merged_weight[0] / 2.0
+            out[i] = dw * (x - x0) / (x1 - x0) / total_weight
+        elif i_l == cent_mean.size:
             # last centroid < x < max
-            x0 = centroids_mean[i_l - 1]
-            x1 = max_val
-            dw = centroids_weight[i_l - 1] / 2.0
-            out[i] = 1 - dw * (x1 - x) / (x1 - x0) / centroids_weight.sum()
-        elif centroids_mean[i_l] == x:
+            x0 = cent_mean[i_l - 1]
+            x1 = t_max
+            dw = cent_weight[i_l - 1] / 2.0
+            out[i] = 1.0 - dw * (x1 - x) / (x1 - x0) / total_weight
+        elif cent_mean[i_l] == x:
             # x is equal to one or more centroids
-            i_r = np.searchsorted(centroids_mean, x, side="right")
-            out[i] = centroids_weight[i_r] / centroids_weight.sum()
+            i_r = np.searchsorted(cent_mean, x, side="right")
+            out[i] = cent_merged_weight[i_r] / total_weight
         else:
-            assert centroids_mean[i_l] > x
-            x0 = centroids_mean[i_l - 1]
-            x1 = centroids_mean[i_l]
-            dw = 0.5 * (centroids_weight[i_l - 1] + centroids_weight[i_l])
-            out[i] = (centroids_weight[i_l - 1] + dw * (x - x0) / (x1 - x0)) / centroids_weight.sum()
+            assert cent_mean[i_l] > x
+            x0 = cent_mean[i_l - 1]
+            x1 = cent_mean[i_l]
+            dw = 0.5 * (cent_weight[i_l - 1] + cent_weight[i_l])
+            out[i] = (cent_merged_weight[i_l - 1] + dw * (x - x0) / (x1 - x0)) / total_weight
 
 
-def inv_transform_variable(td_obj, xv,
-                           distribution="normal"):
-    x_transformed = np.zeros(xv.shape, dtype=xv.dtype)
-    if distribution == "normal":
-        x_transformed = norm.cdf(xv, loc=0, scale=1)
-    elif distribution == "logistic":
-        x_transformed = logistic.cdf(xv)
-    x_transformed = td_obj.quantile(x_transformed)
-    return x_transformed
+
+@guvectorize([void(float64[:], float64[:], float64[:], float64, float64, float64[:]),
+              void(float32[:], float64[:], float64[:], float64, float64, float32[:])], "(m),(n),(n),(),()->(m)")
+def tdigest_quantile(qv, cent_mean, cent_weight, t_min, t_max, out):
+    cent_merged_weight = np.zeros_like(cent_weight)
+    cumulative_weight = 0
+    for i in range(cent_weight.size):
+        cent_merged_weight[i] = cumulative_weight + cent_weight[i] / 2.0
+        cumulative_weight += cent_weight[i]
+    total_weight = cent_weight.sum()
+    for i, q in enumerate(qv):
+        if total_weight == 0:
+            out[i] = np.nan
+            continue
+        if q <= 0:
+            out[i] = t_min
+            continue
+        if q >= 1:
+            out[i] = t_max
+            continue
+        if cent_mean.size == 1:
+            out[i] = cent_mean[0]
+            continue
+
+        index = q * total_weight
+        b = np.searchsorted(cent_merged_weight, index, side="left")
+        if b == 0:
+            x0 = 0
+            y0 = t_min
+        else:
+            x0 = cent_merged_weight[b - 1]
+            y0 = cent_mean[b - 1]
+
+        if b == cent_mean.size:
+            x1 = total_weight
+            y1 = t_max
+        else:
+            x1 = cent_merged_weight[b]
+            y1 = cent_mean[b]
+        out[i] = y0 + (index - x0) * (y1 - y0) / (x1 - x0)
 
 
 class DQuantileScaler(DBaseScaler):