Merge pull request #14 from NCAR/deep

Created parallel DQuantileScaler using crick library.

djgagne authored Mar 29, 2024
2 parents 3f9a682 + 521cceb commit c93666e
Showing 11 changed files with 462 additions and 32 deletions.
2 changes: 1 addition & 1 deletion bridgescaler/VERSION
@@ -1 +1 @@
0.5.1
0.6.0
30 changes: 26 additions & 4 deletions bridgescaler/backend.py
@@ -2,10 +2,14 @@
SplineTransformer, PowerTransformer)
from bridgescaler.group import GroupStandardScaler, GroupRobustScaler, GroupMinMaxScaler
from bridgescaler.deep import DeepStandardScaler, DeepMinMaxScaler, DeepQuantileTransformer
from bridgescaler.distributed import DStandardScaler, DMinMaxScaler, DQuantileTransformer
from bridgescaler.distributed import DStandardScaler, DMinMaxScaler, DQuantileTransformer, DQuantileScaler
import numpy as np
import json
import pandas as pd
from numpy.lib.format import descr_to_dtype, dtype_to_descr
from base64 import b64decode, b64encode
from typing import Any


scaler_objs = {"StandardScaler": StandardScaler,
"MinMaxScaler": MinMaxScaler,
@@ -22,6 +26,7 @@
"DeepQuantileTransformer": DeepQuantileTransformer,
"DStandardScaler": DStandardScaler,
"DMinMaxScaler": DMinMaxScaler,
"DQuantileScaler": DQuantileScaler,
"DQuantileTransformer": DQuantileTransformer}


@@ -55,6 +60,14 @@ def print_scaler(scaler):
return json.dumps(scaler_params, indent=4, sort_keys=True, cls=NumpyEncoder)


def object_hook(dct: dict[Any, Any]) -> dict[Any, Any] | np.ndarray | np.generic:
if "__numpy__" in dct:
np_obj = np.frombuffer(
b64decode(dct["__numpy__"]), descr_to_dtype(dct["dtype"])
)
return np_obj.reshape(shape) if (shape := dct["shape"]) else np_obj[0]
return dct

def read_scaler(scaler_str):
"""
Initialize scikit-learn or bridgescaler scaler from json str.
@@ -65,11 +78,11 @@ def read_scaler(scaler_str):
Returns:
scaler object.
"""
scaler_params = json.loads(scaler_str)
scaler_params = json.loads(scaler_str, object_hook=object_hook)
scaler = scaler_objs[scaler_params["type"]]()
del scaler_params["type"]
for k, v in scaler_params.items():
if type(v) is dict:
if isinstance(v, dict) and v.get("object") == "ndarray":
setattr(scaler, k, np.array(v['data'], dtype=v['dtype']).reshape(v['shape']))
else:
setattr(scaler, k, v)
@@ -107,10 +120,19 @@ def default(self, obj):
elif isinstance(obj, (np.complex_, np.complex64, np.complex128)):
return {'real': obj.real, 'imag': obj.imag}

elif isinstance(obj, (np.ndarray,)):
elif isinstance(obj, (np.ndarray,)) and obj.dtype == "|O":
return {'object': 'ndarray', 'dtype': obj.dtype.str, 'shape': list(obj.shape),
'data': obj.ravel().tolist()}

elif isinstance(obj, (np.ndarray, np.generic)):
return {
"__numpy__": b64encode(
obj.data if obj.flags.c_contiguous else obj.tobytes()
).decode(),
"dtype": dtype_to_descr(obj.dtype),
"shape": obj.shape,
}

elif isinstance(obj, (np.bool_)):
return bool(obj)

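A minimal round-trip sketch of the new serialization path (not part of the diff): NumpyEncoder base64-encodes an ndarray's buffer alongside its dtype and shape, and object_hook rebuilds the array on load. The dict key "scale_" is arbitrary.

import json
import numpy as np
from bridgescaler.backend import NumpyEncoder, object_hook

arr = np.arange(6, dtype=np.float32).reshape(2, 3)
payload = json.dumps({"scale_": arr}, cls=NumpyEncoder)    # buffer -> base64, plus dtype and shape
restored = json.loads(payload, object_hook=object_hook)    # dict -> ndarray
assert np.array_equal(restored["scale_"], arr)
assert restored["scale_"].dtype == arr.dtype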
227 changes: 219 additions & 8 deletions bridgescaler/distributed.py
@@ -1,8 +1,16 @@
import numpy as np
from numpy.lib.recfunctions import structured_to_unstructured, unstructured_to_structured
from copy import deepcopy
from crick import TDigest as CTDigest
from numba_stats import norm
import pandas as pd
import xarray as xr
from pytdigest import TDigest
from scipy.stats import norm, logistic
from functools import partial
from scipy.stats import logistic
from warnings import warn

CENTROID_DTYPE = np.dtype([('mean', np.float64), ('weight', np.float64)])

class DBaseScaler(object):
"""
@@ -87,11 +95,9 @@ def package_transformed_x(x_transformed, x):
"""
if hasattr(x, "columns"):
x_packaged = deepcopy(x)
x_packaged.loc[:, :] = x_transformed
x_packaged = pd.DataFrame(x_transformed, index=x.index, columns=x.columns)
elif hasattr(x, "coords"):
x_packaged = deepcopy(x)
x_packaged[:] = x_transformed
x_packaged = xr.DataArray(x_transformed, coords=x.coords, dims=x.dims, attrs=x.attrs, name=x.name)
else:
x_packaged = x_transformed
return x_packaged
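# Illustrative behavior (not part of the commit): wrapping is type-preserving,
# so a pandas input keeps its index and columns, e.g.
#     df = pd.DataFrame(np.random.rand(4, 2), columns=["a", "b"])
#     out = package_transformed_x(df.to_numpy() * 2.0, df)
#     assert list(out.columns) == ["a", "b"] and out.index.equals(df.index)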
@@ -337,6 +343,208 @@ def __add__(self, other):
return current


def fit_variable(var_index, xv_shared=None, compression=None, channels_last=None):
xv = xv_shared
td_obj = CTDigest(compression=compression)
if channels_last:
td_obj.update(xv[..., var_index].ravel())
else:
td_obj.update(xv[:, var_index].ravel())
return td_obj


def transform_variable(td_obj, xv,
min_val=0.000001, max_val=0.9999999, distribution="normal"):
x_transformed = td_obj.cdf(xv)
x_transformed[:] = np.minimum(x_transformed, max_val)
x_transformed[:] = np.maximum(x_transformed, min_val)
if distribution == "normal":
x_transformed[:] = norm.ppf(x_transformed, loc=0, scale=1)
elif distribution == "logistic":
x_transformed[:] = logistic.ppf(x_transformed)
return x_transformed


def inv_transform_variable(td_obj, xv,
distribution="normal"):
x_transformed = np.copy(xv)  # pass-through for the "uniform" case
if distribution == "normal":
x_transformed = norm.cdf(xv, loc=0, scale=1)
elif distribution == "logistic":
x_transformed = logistic.cdf(xv)
x_transformed[:] = td_obj.quantile(x_transformed)
return x_transformed
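# Illustrative usage of the three helpers above (not part of the commit),
# assuming x holds samples x channels with channels last:
#     x = np.random.normal(size=(1000, 3))
#     td = fit_variable(0, xv_shared=x, compression=250, channels_last=True)
#     z = transform_variable(td, x[:, 0], distribution="normal")   # to ~N(0, 1)
#     x0 = inv_transform_variable(td, z, distribution="normal")    # approx. x[:, 0]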


class DQuantileScaler(DBaseScaler):
"""
Distributed quantile scaler that uses the crick TDigest Cython library to compute quantiles across multiple
datasets. Fitting, transforms, and inverse transforms can run across variables in parallel through the
multiprocessing library, with multidimensional arrays stored in shared memory across processes to minimize
inter-process communication.
Attributes:
compression: Recommended number of centroids to use.
distribution: "uniform", "normal", or "logistic".
min_val: Minimum quantile value, used to prevent -inf results when distribution is normal or logistic.
max_val: Maximum quantile value, used to prevent inf results when distribution is normal or logistic.
channels_last: Whether the last dim (True) or the second dim (False) is the channel/variable dimension.
"""
def __init__(self, compression=250, distribution="uniform", min_val=0.0000001, max_val=0.9999999, channels_last=True):
self.compression = compression
self.distribution = distribution
self.min_val = min_val
self.max_val = max_val
self.centroids_ = None
self.size_ = None
self.min_ = None
self.max_ = None

super().__init__(channels_last=channels_last)

def td_objs_to_attributes(self, td_objs):
self.centroids_ = [structured_to_unstructured(td_obj.centroids()) for td_obj in td_objs]
self.size_ = np.array([td_obj.size() for td_obj in td_objs])
self.min_ = np.array([td_obj.min() for td_obj in td_objs])
self.max_ = np.array([td_obj.max() for td_obj in td_objs])
return

def attributes_to_td_objs(self):
td_objs = []
if self.is_fit():
for i in range(self.max_.size):
td_objs.append(CTDigest(self.compression))
td_objs[-1].__setstate__((unstructured_to_structured(self.centroids_[i], CENTROID_DTYPE),
self.size_[i],
self.min_[i],
self.max_[i]))
return td_objs
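# Illustrative note (not part of the commit): the two methods above are inverses,
# flattening digest state into plain numpy attributes and rebuilding it on demand.
# Keeping centroids_, size_, min_, and max_ as arrays is what lets the scaler
# round-trip through print_scaler/read_scaler in backend.py.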

def fit(self, x, weight=None):
x_columns, is_array = self.extract_x_columns(x, channels_last=self.channels_last)
xv = self.extract_array(x)
channel_dim = self.set_channel_dim()
if not self._fit:
self.x_columns_ = x_columns
self.is_array_ = is_array
fit_var_func = partial(fit_variable,
xv_shared=xv,
compression=self.compression,
channels_last=self.channels_last)
td_objs = [fit_var_func(i) for i in np.arange(xv.shape[channel_dim])]
self.td_objs_to_attributes(td_objs)
else:
assert x.shape[channel_dim] == self.x_columns_.shape[0], "New data has a different number of columns"
if is_array:
x_col_order = np.arange(x.shape[channel_dim])
else:
x_col_order = self.get_column_order(x_columns)
td_objs = self.attributes_to_td_objs()
fit_var_func = partial(fit_variable,
xv_shared=xv,
compression=self.compression,
channels_last=self.channels_last)
new_td_objs = [fit_var_func(i) for i in np.arange(xv.shape[channel_dim])]
for i, o in enumerate(x_col_order):
td_objs[o].merge(new_td_objs[i])
self.td_objs_to_attributes(td_objs)
self._fit = True
return
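# Illustrative note (not part of the commit): refitting merges rather than resets,
# so digests accumulate across calls, e.g.
#     dqs.fit(x_day1)
#     dqs.fit(x_day2)   # digests now summarize both datasets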

def transform(self, x, channels_last=None, pool=None):
xv, x_transformed, channels_last, channel_dim, x_col_order = self.process_x_for_transform(x, channels_last)
td_objs = self.attributes_to_td_objs()
td_i_objs = [(i, td_objs[o]) for i, o in enumerate(x_col_order)]

trans_var_func = partial(transform_variable,
min_val=self.min_val, max_val=self.max_val,
distribution=self.distribution)
if channels_last:
if pool is not None:
split_indices = np.round(np.linspace(0, xv[..., 0].size, pool._processes)).astype(int)
xt_shape = x_transformed[..., 0].shape
outputs = []
for td_obj in td_i_objs:
for s, s_ind in enumerate(split_indices[1:]):
outputs.append(pool.apply_async(trans_var_func, (td_obj[1],
xv[..., td_obj[0]].ravel()[split_indices[s]:s_ind])))
x_transformed[..., td_obj[0]] = np.reshape(np.concatenate([o.get() for o in outputs]), xt_shape)
del outputs[:]
else:
for td_obj in td_i_objs:
x_transformed[..., td_obj[0]] = trans_var_func(td_obj[1], xv[..., td_obj[0]])
else:
if pool is not None:
split_indices = np.round(np.linspace(0, xv[:, 0].size, pool._processes)).astype(int)
xt_shape = x_transformed[:, 0].shape
outputs = []
for td_obj in td_i_objs:
for s, s_ind in enumerate(split_indices[1:]):
outputs.append(pool.apply_async(trans_var_func, (td_obj[1],
xv[:, td_obj[0]].ravel()[split_indices[s]:s_ind])))
x_transformed[:, td_obj[0]] = np.reshape(np.concatenate([o.get() for o in outputs]), xt_shape)
del outputs[:]
else:
for td_obj in td_i_objs:
x_transformed[:, td_obj[0]] = trans_var_func(td_obj[1], xv[:, td_obj[0]])
x_transformed_final = self.package_transformed_x(x_transformed, x)
return x_transformed_final

def fit_transform(self, x, channels_last=None, weight=None, pool=None):
self.fit(x, weight=weight)
return self.transform(x, channels_last=channels_last, pool=pool)

def inverse_transform(self, x, channels_last=None, pool=None):
xv, x_transformed, channels_last, channel_dim, x_col_order = self.process_x_for_transform(x, channels_last)
td_objs = self.attributes_to_td_objs()
td_i_objs = [(i, td_objs[o]) for i, o in enumerate(x_col_order)]
inv_trans_var_func = partial(inv_transform_variable,
distribution=self.distribution)
if channels_last:
if pool is not None:
split_indices = np.round(np.linspace(0, xv[..., 0].size, pool._processes)).astype(int)
xt_shape = x_transformed[..., 0].shape
outputs = []
for td_obj in td_i_objs:
for s, s_ind in enumerate(split_indices[1:]):
outputs.append(pool.apply_async(inv_trans_var_func, (td_obj[1],
xv[..., td_obj[0]].ravel()[split_indices[s]:s_ind])))
x_transformed[..., td_obj[0]] = np.reshape(np.concatenate([o.get() for o in outputs]), xt_shape)
del outputs[:]
else:
for td_obj in td_i_objs:
x_transformed[..., td_obj[0]] = inv_trans_var_func(td_obj[1], xv[..., td_obj[0]])
else:
if pool is not None:
split_indices = np.round(np.linspace(0, xv[:, 0].size, pool._processes)).astype(int)
xt_shape = x_transformed[:, 0].shape
outputs = []
for td_obj in td_i_objs:
for s, s_ind in enumerate(split_indices[1:]):
outputs.append(pool.apply_async(inv_trans_var_func, (td_obj[1],
xv[:, td_obj[0]].ravel()[split_indices[s]:s_ind])))
x_transformed[:, td_obj[0]] = np.reshape(np.concatenate([o.get() for o in outputs]), xt_shape)
del outputs[:]
else:
for td_obj in td_i_objs:
x_transformed[:, td_obj[0]] = inv_trans_var_func(td_obj[1], xv[:, td_obj[0]])
x_transformed_final = self.package_transformed_x(x_transformed, x)
return x_transformed_final

def __add__(self, other):
current = deepcopy(self)
td_objs = current.attributes_to_td_objs()
other_td_objs = other.attributes_to_td_objs()
assert type(other) is DQuantileScaler, "Adding mismatched scaler types."
assert current.is_fit() and other.is_fit(), "At least one scaler is not fit."
x_col_order = current.get_column_order(other.x_columns_)
assert x_col_order.size > 0, "No matching columns in other DQuantileScaler"
for i, o in enumerate(x_col_order):
td_objs[o].merge(other_td_objs[i])
current.td_objs_to_attributes(td_objs)
return current
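# Illustrative end-to-end sketch (not part of the commit): transforms can run in a
# multiprocessing pool, and scalers fit on separate shards merge with "+":
#     from multiprocessing import Pool
#     x = np.random.gamma(2.0, size=(10000, 5))
#     dqs = DQuantileScaler(distribution="normal")
#     with Pool(4) as pool:
#         z = dqs.fit_transform(x, pool=pool)
#     x_back = dqs.inverse_transform(z)                  # approx. x
#     dqs_a, dqs_b = DQuantileScaler(), DQuantileScaler()
#     dqs_a.fit(x[:5000])
#     dqs_b.fit(x[5000:])
#     combined = dqs_a + dqs_b                           # per-column digest merge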


class DQuantileTransformer(DBaseScaler):
"""
Distributed quantile transformer that uses the T-Digest algorithm to transform the input data into approximate
@@ -354,6 +562,9 @@ class DQuantileTransformer(DBaseScaler):
or the second dimension (False).
"""
def __init__(self, max_merged_centroids=1000, distribution="uniform", channels_last=True):
warn(f'{self.__class__.__name__} will be deprecated in the next version.',
DeprecationWarning, stacklevel=2)

self.max_merged_centroids = max_merged_centroids
self.distribution = distribution
self.centroids_ = None
@@ -369,7 +580,7 @@ def fit(self, x, weight=None):
self.is_array_ = is_array
# The number of centroids may vary depending on the distribution of each input variable.
# The extra spots at the end are padded with nans.
self.centroids_ = np.ones((xv.shape[-1], self.max_merged_centroids, 2)) * np.nan
self.centroids_ = np.ones((xv.shape[channel_dim], self.max_merged_centroids, 2)) * np.nan
if self.channels_last:
for i in range(xv.shape[channel_dim]):
td_obj = TDigest.compute(xv[..., i].ravel(), w=weight, compression=self.max_merged_centroids)
@@ -444,7 +655,7 @@ def transform(self, x, channels_last=None):
xd = xv[:, i].ravel().astype("float64")
x_transformed[:, i] = np.reshape(td_objs[o].cdf(xd), xv[:, i].shape)
if self.distribution == "normal":
x_transformed = norm.ppf(x_transformed)
x_transformed = norm.ppf(x_transformed, loc=0, scale=1)
elif self.distribution == "logistic":
x_transformed = logistic.ppf(x_transformed)
x_transformed = self.package_transformed_x(x_transformed, x)
@@ -454,7 +665,7 @@ def inverse_transform(self, x, channels_last=None):
xv, x_transformed, channels_last, channel_dim, x_col_order = self.process_x_for_transform(x, channels_last)
td_objs = self.to_digests()
if self.distribution == "normal":
x_transformed = norm.cdf(xv)
x_transformed = norm.cdf(xv, loc=0, scale=1)
elif self.distribution == "logistic":
x_transformed = logistic.cdf(xv)
if channels_last:
5 changes: 3 additions & 2 deletions bridgescaler/tests/backend_test.py
@@ -3,7 +3,7 @@
import numpy as np
import os
from sklearn.preprocessing import StandardScaler, RobustScaler, MinMaxScaler, QuantileTransformer
from bridgescaler.distributed import DStandardScaler, DMinMaxScaler, DQuantileTransformer
from bridgescaler.distributed import DStandardScaler, DMinMaxScaler, DQuantileTransformer, DQuantileScaler
from pandas import DataFrame
from os.path import exists

@@ -14,7 +14,8 @@
"QuantileTransformer": QuantileTransformer,
"DStandardScaler": DStandardScaler,
"DMinMaxScaler": DMinMaxScaler,
"DQuantileTransformer": DQuantileTransformer}
"DQuantileTransformer": DQuantileTransformer,
"DQuantileScaler": DQuantileScaler}


def test_scaler_io():