diff --git a/Makefile b/Makefile
index 4f5b0b2..907c50f 100644
--- a/Makefile
+++ b/Makefile
@@ -2,7 +2,7 @@
 VERSION ?= $(shell git rev-parse --short HEAD)
 CONDA_ENV_NAME ?= vflow
 HATCH_ENV_NAME ?= test
-.PHONY: build_conda_env build_ipykernel test_% run_tests fix_styles
+.PHONY: build_conda_env build_ipykernel test_% run_tests fix_styles build_docs
 
 build_conda_env:
 	conda create -n $(CONDA_ENV_NAME) -y python==3.10 pip
@@ -20,3 +20,6 @@ run_tests:
 
 fix_styles:
 	hatch -v run style:fmt
+
+build_docs:
+	hatch -v run docs:build
diff --git a/README.md b/README.md
index 22b5df0..876de41 100644
--- a/README.md
+++ b/README.md
@@ -82,7 +82,10 @@
 Once we've written this pipeline, we can easily measure the stability of metrics (e.g. "Accuracy") to our choice of subsampling or model.
 
 See the [docs](https://yu-group.github.io/veridical-flow/) for reference on the API
 
-> **Notebook examples** (Note that some of these require more dependencies than just those required for vflow - to install all, use the `notebooks` dependencies in the `setup.py` file)
+> **Notebook examples**
+>
+> Note that some of these require more dependencies than just those required for
+> `vflow`. To install all, run `pip install vflow[nb]`.
 >
 > [Synthetic classification](https://yu-group.github.io/veridical-flow/notebooks/00_synthetic_classification.html)
 >
diff --git a/docs/build_docs.sh b/docs/build_docs.sh
deleted file mode 100755
index 5769070..0000000
--- a/docs/build_docs.sh
+++ /dev/null
@@ -1,11 +0,0 @@
-cd ../vflow
-pdoc --html . --output-dir ../docs --template-dir ../docs # need to have pip installed pdoc3
-cp -rf ../docs/vflow/* ../docs/
-rm -rf ../docs/vflow
-
-cd ../notebooks
-jupyter nbconvert --to html *.ipynb
-mv *.html ../docs/notebooks/
-
-cd ../docs
-python style_docs.py
diff --git a/docs/contributing.md b/docs/contributing.md
index 40b4a41..8590300 100644
--- a/docs/contributing.md
+++ b/docs/contributing.md
@@ -5,24 +5,38 @@
 Contributions are very welcome!
 
 ## Getting Started
 
-If you are looking to contribute to the *veridical-flow* codebase, the best place to start is the [GitHub "issues" tab](https://github.com/Yu-Group/veridical-flow/issues). This is also a great place for filing bug reports and suggesting improvement to the code and documentation.
+If you are looking to contribute to the `veridical-flow` codebase, the best
+place to start is the [GitHub "issues" tab](https://github.com/Yu-Group/veridical-flow/issues). This is also a great
+place for filing bug reports and suggesting improvements to the code and
+documentation.
 
 ## Filing Issues
 
-If you notice a bug in the code or documentation, or have suggestions for how we can improve either, feel free to create an issue on the [GitHub "issues" tab](https://github.com/Yu-Group/veridical-flow/issues) using [GitHub's "issue" form](https://github.com/Yu-Group/veridical-flow/issues/new). Please be as descriptive as possible we so can best address your issue.
+If you notice a bug in the code or documentation, or have suggestions for how we
+can improve either, feel free to create an issue on the [GitHub "issues" tab](https://github.com/Yu-Group/veridical-flow/issues) using [GitHub's "issue" form](https://github.com/Yu-Group/veridical-flow/issues/new). Please be as
+descriptive as possible so we can best address your issue.
 
 ## Contributing to the Codebase
 
-After forking the [repository](https://github.com/Yu-Group/veridical-flow), be sure to create a development environment that is separate from your existing Python environment so that you can make and test changes without compromising your own work environment. To develop locally, navigate to the repo root directory and run `python setup.py develop --user`.
-
-Before submitting your changes for review, please make sure to check that your changes do not break any tests.
-- [Tests](tests) are written with [pytest](https://docs.pytest.org/en/stable/), and can be run by running `pytest` in the repo root directory.
-- [Docs](https://csinva.io/imodels/docs/) are built using [pdoc3](https://pdoc3.github.io/pdoc/) (install with `pip install pdoc3`, and can be built by navigating to the `docs` directory and running `./build_docs.sh`. Note that we follow the [numpy docstrings](https://numpydoc.readthedocs.io/en/latest/format.html) convention.
-
-Once your changes are ready to be submitted, make sure to push your change to GitHub before creating a pull request. See [here](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-a-pull-request-from-a-fork) for instruction on how to create a pull request from a fork. We will review your changes, and you may be asked to make additional changes before it is finally ready to merge. Once it is ready, we will merge your pull request, and you have will successfully contributed to the codebase 🎉!
-
-
-
-
-
-
+After forking the [repository](https://github.com/Yu-Group/veridical-flow), be
+sure to create a development environment that is separate from your existing
+Python environment so that you can make and test changes without compromising
+your own work environment. To develop locally, navigate to the repo root
+directory and run `python setup.py develop --user`.
+
+Before submitting your changes for review, please make sure to check that your
+changes do not break any tests.
+
+- [Tests](tests) are written with [pytest](https://docs.pytest.org/en/stable/),
+  and can be run by running `pytest` in the repo root directory.
+- [Docs](https://csinva.io/imodels/docs/) can be built by running `hatch run
+  docs:build`. Note that we follow the [numpy
+  docstrings](https://numpydoc.readthedocs.io/en/latest/format.html) convention.
+
+Once your changes are ready to be submitted, make sure to push your changes to
+GitHub before creating a pull request. See
+[here](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-a-pull-request-from-a-fork)
+for instructions on how to create a pull request from a fork. We will review your
+changes, and you may be asked to make additional changes before it is finally
+ready to merge. Once it is ready, we will merge your pull request, and you will
+have successfully contributed to the codebase 🎉!
diff --git a/docs/helpers.html b/docs/helpers.html
index e9f12d0..6a582fc 100644
--- a/docs/helpers.html
+++ b/docs/helpers.html
@@ -26,6 +26,7 @@
"""User-facing helper functions included at import vflow
"""
+
from functools import partial
from itertools import product
from typing import Union
@@ -33,9 +34,9 @@
import mlflow
import numpy as np
-from vflow.utils import dict_to_df, dict_keys, dict_data
+from vflow.utils import dict_data, dict_keys, dict_to_df
from vflow.vfunc import Vfunc
-from vflow.vset import Vset, Subkey, PREV_KEY, FILTER_PREV_KEY
+from vflow.vset import FILTER_PREV_KEY, PREV_KEY, Subkey, Vset
def init_args(args_tuple: Union[tuple, list], names=None):
@@ -47,28 +48,40 @@
given names for each of the arguments in the tuple
"""
if names is None:
- names = ['start'] * len(args_tuple)
+ names = ["start"] * len(args_tuple)
else:
- assert len(names) == len(args_tuple), 'names should be same length as args_tuple'
+ assert len(names) == len(
+ args_tuple
+ ), "names should be same length as args_tuple"
output_dicts = []
for i, _ in enumerate(args_tuple):
- output_dicts.append({
- (Subkey(names[i], 'init'),): args_tuple[i],
- PREV_KEY: ('init',),
- })
+ output_dicts.append(
+ {
+ (Subkey(names[i], "init"),): args_tuple[i],
+ PREV_KEY: ("init",),
+ }
+ )
return output_dicts
-def build_vset(name: str, func, param_dict=None, reps: int = 1,
- is_async: bool = False, output_matching: bool = False,
- lazy: bool = False, cache_dir: str = None,
- tracking_dir: str = None, **kwargs) -> Vset:
+def build_vset(
+ name: str,
+ func,
+ param_dict=None,
+ reps: int = 1,
+ is_async: bool = False,
+ output_matching: bool = False,
+ lazy: bool = False,
+ cache_dir: str = None,
+ tracking_dir: str = None,
+ **kwargs,
+) -> Vset:
"""Builds a new Vset by currying or instantiating callable `func` with all
combinations of parameters in `param_dict` and optional additional `**kwargs`.
- If `func` and `param_dict` are lists, then the ith entry of `func` will be
- curried with ith entry of `param_dict`. If only one of `func` or `param_dict`
+ If `func` and `param_dict` are lists, then the ith entry of `func` will be
+    curried with the ith entry of `param_dict`. If only one of `func` or `param_dict`
is a list, the same `func`/`param_dict` will be curried for all entries in the
- list. Vfuncs are named with `param_dict` items as tuples of
+ list. Vfuncs are named with `param_dict` items as tuples of
str("param_name=param_val").
Parameters
@@ -79,7 +92,7 @@
A callable to use as the base for Vfuncs in the output Vset. Can also be
a class object, in which case the class is immediately instantiated with
the parameter combinations from `param_dict`. Can also be a list of
- callables, where the ith entry corresponds to `param_dict` or the ith
+ callables, where the ith entry corresponds to `param_dict` or the ith
entry of `param_dict` (if `param_dict` is a list).
param_dict : dict[str, list] or list[dict[str, list]], optional
A dict with string keys corresponding to argument names of `func` and
@@ -114,8 +127,9 @@
pd_list = []
if isinstance(func, list):
if isinstance(param_dict, list):
- assert len(param_dict) == len(func), \
- 'list of param_dicts must be same length as list of funcs'
+ assert len(param_dict) == len(
+ func
+ ), "list of param_dicts must be same length as list of funcs"
f_list.extend(func)
pd_list.extend(param_dict)
else:
@@ -127,29 +141,29 @@
else:
f_list.append(func)
pd_list.append(param_dict)
-
+
vfuncs = []
vkeys = []
for f, pd in zip(f_list, pd_list):
if pd is None:
pd = {}
- assert callable(f), 'func must be callable'
-
+ assert callable(f), "func must be callable"
+
kwargs_tuples = product(*list(pd.values()))
for tup in kwargs_tuples:
kwargs_dict = {}
- vkey_tup = (f'func={f.__name__}', )
+ vkey_tup = (f"func={f.__name__}",)
for param_name, param_val in zip(list(pd.keys()), tup):
kwargs_dict[param_name] = param_val
- vkey_tup += (f'{param_name}={param_val}', )
+ vkey_tup += (f"{param_name}={param_val}",)
# add additional fixed kwargs to kwargs_dict
for k, v in kwargs.items():
kwargs_dict[k] = v
for i in range(reps):
# add vfunc key to vkeys
if reps > 1:
- vkeys.append((f'rep={i}', ) + vkey_tup)
+ vkeys.append((f"rep={i}",) + vkey_tup)
else:
vkeys.append(vkey_tup)
# check if func is a class
@@ -158,18 +172,33 @@
vfuncs.append(Vfunc(vfunc=f(**kwargs_dict), name=str(vkey_tup)))
else:
# use partial to wrap func
- vfuncs.append(Vfunc(vfunc=partial(f, **kwargs_dict), name=str(vkey_tup)))
+ vfuncs.append(
+ Vfunc(vfunc=partial(f, **kwargs_dict), name=str(vkey_tup))
+ )
if all(pd is None for pd in pd_list) and reps == 1:
vkeys = None
-
- return Vset(name, vfuncs, is_async=is_async, vfunc_keys=vkeys,
- output_matching=output_matching, lazy=lazy,
- cache_dir=cache_dir, tracking_dir=tracking_dir)
+
+ return Vset(
+ name,
+ vfuncs,
+ is_async=is_async,
+ vfunc_keys=vkeys,
+ output_matching=output_matching,
+ lazy=lazy,
+ cache_dir=cache_dir,
+ tracking_dir=tracking_dir,
+ )
-def filter_vset_by_metric(metric_dict: dict, vset: Vset, *vsets: Vset, n_keep: int = 1,
- bigger_is_better: bool = True, filter_on=None,
- group: bool = False) -> Union[Vset, list]:
+def filter_vset_by_metric(
+ metric_dict: dict,
+ vset: Vset,
+ *vsets: Vset,
+ n_keep: int = 1,
+ bigger_is_better: bool = True,
+ filter_on=None,
+ group: bool = False,
+) -> Union[Vset, list]:
"""Returns a new Vset by filtering `vset.vfuncs` based on values in filter_dict.
Parameters
@@ -204,28 +233,45 @@
vset_names = []
for vset_i in vsets:
if vset_i.name not in df.columns:
- raise ValueError((f'{vset_i.name} should be one '
- 'of the columns of dict_to_df(metric_dict)'))
+ raise ValueError(
+ (
+ f"{vset_i.name} should be one "
+ "of the columns of dict_to_df(metric_dict)"
+ )
+ )
vset_names.append(vset_i.name)
if len(filter_on) > 0:
filter_col = list(metric_dict.keys())[0][-1].origin
df = df[df[filter_col].isin(filter_on)]
if group:
- df = df.groupby(by=vset_names, as_index=False).mean()
+ df = df.groupby(by=vset_names, as_index=False).mean(numeric_only=True)
if bigger_is_better:
- df = df.sort_values(by='out', ascending=False)
+ df = df.sort_values(by="out", ascending=False)
else:
- df = df.sort_values(by='out')
+ df = df.sort_values(by="out")
df = df.iloc[0:n_keep]
for i, vset_i in enumerate(vsets):
vfuncs = vset_i.vfuncs
vfunc_filter = [str(name) for name in df[vset_i.name].to_numpy()]
new_vfuncs = {k: v for k, v in vfuncs.items() if str(v.name) in vfunc_filter}
tracking_dir = None if vset_i._mlflow is None else mlflow.get_tracking_uri()
- new_vset = Vset('filtered_' + vset_i.name, new_vfuncs, is_async=vset_i._async,
- output_matching=vset_i._output_matching, lazy=vset_i._lazy,
- cache_dir=vset_i._cache_dir, tracking_dir=tracking_dir)
- setattr(new_vset, FILTER_PREV_KEY, (metric_dict[PREV_KEY], vset_i,))
+ new_vset = Vset(
+ "filtered_" + vset_i.name,
+ new_vfuncs,
+ is_async=vset_i._async,
+ output_matching=vset_i._output_matching,
+ lazy=vset_i._lazy,
+ cache_dir=vset_i._cache_dir,
+ tracking_dir=tracking_dir,
+ )
+ setattr(
+ new_vset,
+ FILTER_PREV_KEY,
+ (
+ metric_dict[PREV_KEY],
+ vset_i,
+ ),
+ )
setattr(new_vset, PREV_KEY, getattr(new_vset, FILTER_PREV_KEY))
vsets[i] = new_vset
if len(vsets) == 1:
@@ -247,22 +293,28 @@
TODO: generalize to multi-class classification
"""
- assert dict_keys(mean_preds) == dict_keys(std_preds), \
- "mean_preds and std_preds must share the same keys"
+ assert dict_keys(mean_preds) == dict_keys(
+ std_preds
+ ), "mean_preds and std_preds must share the same keys"
# match predictions on keys
- paired_preds = [[d[k] for d in (mean_preds, std_preds)] for k in dict_keys(mean_preds)]
- mean_preds, std_preds = (np.array(p)[:,:,1] for p in zip(*paired_preds))
+ paired_preds = [
+ [d[k] for d in (mean_preds, std_preds)] for k in dict_keys(mean_preds)
+ ]
+ mean_preds, std_preds = (np.array(p)[:, :, 1] for p in zip(*paired_preds))
if isinstance(true_labels, dict):
true_labels = dict_data(true_labels)
- assert len(true_labels) == 1, 'true_labels should have a single 1D vector entry'
+ assert len(true_labels) == 1, "true_labels should have a single 1D vector entry"
true_labels = true_labels[0]
n_obs = len(mean_preds[0])
- assert len(true_labels) == n_obs, \
- f'true_labels has {len(true_labels)} obs. but should have same as predictions ({n_obs})'
+ assert (
+ len(true_labels) == n_obs
+ ), f"true_labels has {len(true_labels)} obs. but should have same as predictions ({n_obs})"
sorted_idx = np.argsort(std_preds, axis=1)
- correct_labels = np.take_along_axis(np.around(mean_preds) - true_labels == 0, sorted_idx, 1)
+ correct_labels = np.take_along_axis(
+ np.around(mean_preds) - true_labels == 0, sorted_idx, 1
+ )
uncertainty = np.take_along_axis(std_preds, sorted_idx, 1)
- cum_acc = np.cumsum(correct_labels, axis=1) / range(1, n_obs+1)
+ cum_acc = np.cumsum(correct_labels, axis=1) / range(1, n_obs + 1)
return uncertainty, cum_acc, sorted_idx
@@ -327,16 +379,24 @@ def build_vset(name: str, func, param_dict=None, reps: int = 1,
- is_async: bool = False, output_matching: bool = False,
- lazy: bool = False, cache_dir: str = None,
- tracking_dir: str = None, **kwargs) -> Vset:
+def build_vset(
+ name: str,
+ func,
+ param_dict=None,
+ reps: int = 1,
+ is_async: bool = False,
+ output_matching: bool = False,
+ lazy: bool = False,
+ cache_dir: str = None,
+ tracking_dir: str = None,
+ **kwargs,
+) -> Vset:
"""Builds a new Vset by currying or instantiating callable `func` with all
combinations of parameters in `param_dict` and optional additional `**kwargs`.
- If `func` and `param_dict` are lists, then the ith entry of `func` will be
- curried with ith entry of `param_dict`. If only one of `func` or `param_dict`
+ If `func` and `param_dict` are lists, then the ith entry of `func` will be
+    curried with the ith entry of `param_dict`. If only one of `func` or `param_dict`
is a list, the same `func`/`param_dict` will be curried for all entries in the
- list. Vfuncs are named with `param_dict` items as tuples of
+ list. Vfuncs are named with `param_dict` items as tuples of
str("param_name=param_val").
Parameters
@@ -347,7 +407,7 @@ Returns
A callable to use as the base for Vfuncs in the output Vset. Can also be
a class object, in which case the class is immediately instantiated with
the parameter combinations from `param_dict`. Can also be a list of
- callables, where the ith entry corresponds to `param_dict` or the ith
+ callables, where the ith entry corresponds to `param_dict` or the ith
entry of `param_dict` (if `param_dict` is a list).
param_dict : dict[str, list] or list[dict[str, list]], optional
A dict with string keys corresponding to argument names of `func` and
@@ -382,8 +442,9 @@ Returns
pd_list = []
if isinstance(func, list):
if isinstance(param_dict, list):
- assert len(param_dict) == len(func), \
- 'list of param_dicts must be same length as list of funcs'
+ assert len(param_dict) == len(
+ func
+ ), "list of param_dicts must be same length as list of funcs"
f_list.extend(func)
pd_list.extend(param_dict)
else:
@@ -395,29 +456,29 @@ Returns
else:
f_list.append(func)
pd_list.append(param_dict)
-
+
vfuncs = []
vkeys = []
for f, pd in zip(f_list, pd_list):
if pd is None:
pd = {}
- assert callable(f), 'func must be callable'
-
+ assert callable(f), "func must be callable"
+
kwargs_tuples = product(*list(pd.values()))
for tup in kwargs_tuples:
kwargs_dict = {}
- vkey_tup = (f'func={f.__name__}', )
+ vkey_tup = (f"func={f.__name__}",)
for param_name, param_val in zip(list(pd.keys()), tup):
kwargs_dict[param_name] = param_val
- vkey_tup += (f'{param_name}={param_val}', )
+ vkey_tup += (f"{param_name}={param_val}",)
# add additional fixed kwargs to kwargs_dict
for k, v in kwargs.items():
kwargs_dict[k] = v
for i in range(reps):
# add vfunc key to vkeys
if reps > 1:
- vkeys.append((f'rep={i}', ) + vkey_tup)
+ vkeys.append((f"rep={i}",) + vkey_tup)
else:
vkeys.append(vkey_tup)
# check if func is a class
@@ -426,13 +487,22 @@ Returns
vfuncs.append(Vfunc(vfunc=f(**kwargs_dict), name=str(vkey_tup)))
else:
# use partial to wrap func
- vfuncs.append(Vfunc(vfunc=partial(f, **kwargs_dict), name=str(vkey_tup)))
+ vfuncs.append(
+ Vfunc(vfunc=partial(f, **kwargs_dict), name=str(vkey_tup))
+ )
if all(pd is None for pd in pd_list) and reps == 1:
vkeys = None
-
- return Vset(name, vfuncs, is_async=is_async, vfunc_keys=vkeys,
- output_matching=output_matching, lazy=lazy,
- cache_dir=cache_dir, tracking_dir=tracking_dir)
+
+ return Vset(
+ name,
+ vfuncs,
+ is_async=is_async,
+ vfunc_keys=vkeys,
+ output_matching=output_matching,
+ lazy=lazy,
+ cache_dir=cache_dir,
+ tracking_dir=tracking_dir,
+ )
@@ -466,22 +536,28 @@ Params
TODO: generalize to multi-class classification
"""
- assert dict_keys(mean_preds) == dict_keys(std_preds), \
- "mean_preds and std_preds must share the same keys"
+ assert dict_keys(mean_preds) == dict_keys(
+ std_preds
+ ), "mean_preds and std_preds must share the same keys"
# match predictions on keys
- paired_preds = [[d[k] for d in (mean_preds, std_preds)] for k in dict_keys(mean_preds)]
- mean_preds, std_preds = (np.array(p)[:,:,1] for p in zip(*paired_preds))
+ paired_preds = [
+ [d[k] for d in (mean_preds, std_preds)] for k in dict_keys(mean_preds)
+ ]
+ mean_preds, std_preds = (np.array(p)[:, :, 1] for p in zip(*paired_preds))
if isinstance(true_labels, dict):
true_labels = dict_data(true_labels)
- assert len(true_labels) == 1, 'true_labels should have a single 1D vector entry'
+ assert len(true_labels) == 1, "true_labels should have a single 1D vector entry"
true_labels = true_labels[0]
n_obs = len(mean_preds[0])
- assert len(true_labels) == n_obs, \
- f'true_labels has {len(true_labels)} obs. but should have same as predictions ({n_obs})'
+ assert (
+ len(true_labels) == n_obs
+ ), f"true_labels has {len(true_labels)} obs. but should have same as predictions ({n_obs})"
sorted_idx = np.argsort(std_preds, axis=1)
- correct_labels = np.take_along_axis(np.around(mean_preds) - true_labels == 0, sorted_idx, 1)
+ correct_labels = np.take_along_axis(
+ np.around(mean_preds) - true_labels == 0, sorted_idx, 1
+ )
uncertainty = np.take_along_axis(std_preds, sorted_idx, 1)
- cum_acc = np.cumsum(correct_labels, axis=1) / range(1, n_obs+1)
+ cum_acc = np.cumsum(correct_labels, axis=1) / range(1, n_obs + 1)
return uncertainty, cum_acc, sorted_idx
@@ -519,9 +595,15 @@ def filter_vset_by_metric(metric_dict: dict, vset: Vset, *vsets: Vset, n_keep: int = 1,
- bigger_is_better: bool = True, filter_on=None,
- group: bool = False) -> Union[Vset, list]:
+def filter_vset_by_metric(
+ metric_dict: dict,
+ vset: Vset,
+ *vsets: Vset,
+ n_keep: int = 1,
+ bigger_is_better: bool = True,
+ filter_on=None,
+ group: bool = False,
+) -> Union[Vset, list]:
"""Returns a new Vset by filtering `vset.vfuncs` based on values in filter_dict.
Parameters
@@ -556,28 +638,45 @@ Returns
vset_names = []
for vset_i in vsets:
if vset_i.name not in df.columns:
- raise ValueError((f'{vset_i.name} should be one '
- 'of the columns of dict_to_df(metric_dict)'))
+ raise ValueError(
+ (
+ f"{vset_i.name} should be one "
+ "of the columns of dict_to_df(metric_dict)"
+ )
+ )
vset_names.append(vset_i.name)
if len(filter_on) > 0:
filter_col = list(metric_dict.keys())[0][-1].origin
df = df[df[filter_col].isin(filter_on)]
if group:
- df = df.groupby(by=vset_names, as_index=False).mean()
+ df = df.groupby(by=vset_names, as_index=False).mean(numeric_only=True)
if bigger_is_better:
- df = df.sort_values(by='out', ascending=False)
+ df = df.sort_values(by="out", ascending=False)
else:
- df = df.sort_values(by='out')
+ df = df.sort_values(by="out")
df = df.iloc[0:n_keep]
for i, vset_i in enumerate(vsets):
vfuncs = vset_i.vfuncs
vfunc_filter = [str(name) for name in df[vset_i.name].to_numpy()]
new_vfuncs = {k: v for k, v in vfuncs.items() if str(v.name) in vfunc_filter}
tracking_dir = None if vset_i._mlflow is None else mlflow.get_tracking_uri()
- new_vset = Vset('filtered_' + vset_i.name, new_vfuncs, is_async=vset_i._async,
- output_matching=vset_i._output_matching, lazy=vset_i._lazy,
- cache_dir=vset_i._cache_dir, tracking_dir=tracking_dir)
- setattr(new_vset, FILTER_PREV_KEY, (metric_dict[PREV_KEY], vset_i,))
+ new_vset = Vset(
+ "filtered_" + vset_i.name,
+ new_vfuncs,
+ is_async=vset_i._async,
+ output_matching=vset_i._output_matching,
+ lazy=vset_i._lazy,
+ cache_dir=vset_i._cache_dir,
+ tracking_dir=tracking_dir,
+ )
+ setattr(
+ new_vset,
+ FILTER_PREV_KEY,
+ (
+ metric_dict[PREV_KEY],
+ vset_i,
+ ),
+ )
setattr(new_vset, PREV_KEY, getattr(new_vset, FILTER_PREV_KEY))
vsets[i] = new_vset
if len(vsets) == 1:
@@ -608,15 +707,19 @@ Parameters
given names for each of the arguments in the tuple
"""
if names is None:
- names = ['start'] * len(args_tuple)
+ names = ["start"] * len(args_tuple)
else:
- assert len(names) == len(args_tuple), 'names should be same length as args_tuple'
+ assert len(names) == len(
+ args_tuple
+ ), "names should be same length as args_tuple"
output_dicts = []
for i, _ in enumerate(args_tuple):
- output_dicts.append({
- (Subkey(names[i], 'init'),): args_tuple[i],
- PREV_KEY: ('init',),
- })
+ output_dicts.append(
+ {
+ (Subkey(names[i], "init"),): args_tuple[i],
+ PREV_KEY: ("init",),
+ }
+ )
return output_dicts
diff --git a/docs/index.html b/docs/index.html
index 984bcf8..b037398 100644
--- a/docs/index.html
+++ b/docs/index.html
@@ -68,47 +68,49 @@ Why use vflow
import sklearn
+from sklearn.datasets import make_classification
+from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, balanced_accuracy_score
-from vflow import init_args, Vset
+from sklearn.model_selection import train_test_split
+from sklearn.tree import DecisionTreeClassifier
+
+from vflow import Vset, init_args
# initialize data
-X, y = sklearn.datasets.make_classification()
+X, y = make_classification()
X_train, X_test, y_train, y_test = init_args(
- sklearn.model_selection.train_test_split(X, y),
- names=['X_train', 'X_test', 'y_train', 'y_test'] # optionally name the args
+ train_test_split(X, y),
+ names=["X_train", "X_test", "y_train", "y_test"], # optionally name the args
)
# subsample data
-subsampling_funcs = [
- sklearn.utils.resample for _ in range(3)
-]
-subsampling_set = Vset(name='subsampling',
- modules=subsampling_funcs,
- output_matching=True)
+subsampling_funcs = [sklearn.utils.resample for _ in range(3)]
+subsampling_set = Vset(
+ name="subsampling", vfuncs=subsampling_funcs, output_matching=True
+)
X_trains, y_trains = subsampling_set(X_train, y_train)
# fit models
-models = [
- sklearn.linear_model.LogisticRegression(),
- sklearn.tree.DecisionTreeClassifier()
-]
-modeling_set = Vset(name='modeling',
- modules=models,
- module_keys=["LR", "DT"])
+models = [LogisticRegression(), DecisionTreeClassifier()]
+modeling_set = Vset(name="modeling", vfuncs=models, vfunc_keys=["LR", "DT"])
modeling_set.fit(X_trains, y_trains)
preds_test = modeling_set.predict(X_test)
# get metrics
-binary_metrics_set = Vset(name='binary_metrics',
- modules=[accuracy_score, balanced_accuracy_score],
- module_keys=["Acc", "Bal_Acc"])
+binary_metrics_set = Vset(
+ name="binary_metrics",
+ vfuncs=[accuracy_score, balanced_accuracy_score],
+ vfunc_keys=["Acc", "Bal_Acc"],
+)
binary_metrics = binary_metrics_set.evaluate(preds_test, y_test)
Once we've written this pipeline, we can easily measure the stability of metrics (e.g. "Accuracy") to our choice of subsampling or model.
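As a concrete sketch of that stability check (assuming the quickstart variables above, and that dict_to_df names its columns after the Vsets plus an "out" column for the values):

```python
from vflow import dict_to_df

# one row per (subsampling rep, model, metric) combination
df = dict_to_df(binary_metrics)

# spread of each metric across the three subsampling reps, per model
print(df.groupby(["modeling", "binary_metrics"])["out"].agg(["mean", "std"]))
```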
Documentation
See the docs for reference on the API
-Notebook examples (Note that some of these require more dependencies than just those required for vflow - to install all, use the notebooks dependencies in the setup.py file)
+Notebook examples
+Note that some of these require more dependencies than just those required for
+vflow. To install all, run pip install vflow[nb].
@@ -117,7 +119,12 @@ Documentation
Installation
-Install with pip install vflow (see here for help). For dev version (unstable), clone the repo and run python setup.py develop from the repo directory.
+Stable version
+pip install vflow
+
+Development version (unstable)
+pip install vflow@git+https://github.com/Yu-Group/veridical-flow
+
References
- interface: easily build on scikit-learn and dvc (data version control)
@@ -141,11 +148,63 @@ References
"""
.. include:: ../README.md
"""
-from .vfunc import *
-from .vset import *
-from .pipeline import *
-from .utils import *
-from .helpers import *
+
+from .helpers import (
+ build_vset,
+ cum_acc_by_uncertainty,
+ filter_vset_by_metric,
+ init_args,
+)
+from .pipeline import PCSPipeline, build_graph
+from .subkey import Subkey
+from .utils import (
+ apply_vfuncs,
+ base_dict,
+ combine_dicts,
+ combine_keys,
+ dict_data,
+ dict_keys,
+ dict_to_df,
+ init_step,
+ perturbation_stats,
+ sep_dicts,
+ to_list,
+ to_tuple,
+)
+from .vfunc import AsyncVfunc, Vfunc, VfuncPromise
+from .vset import Vset
+
+__all__ = [
+ # vflow.helpers
+ "init_args",
+ "build_vset",
+ "filter_vset_by_metric",
+ "cum_acc_by_uncertainty",
+ # vflow.pipeline
+ "PCSPipeline",
+ "build_graph",
+ # vflow.subkey
+ "Subkey",
+ # vflow.utils
+ "apply_vfuncs",
+ "base_dict",
+ "combine_dicts",
+ "combine_keys",
+ "dict_data",
+ "dict_keys",
+ "dict_to_df",
+ "init_step",
+ "perturbation_stats",
+ "sep_dicts",
+ "to_list",
+ "to_tuple",
+ # vflow.vfunc
+ "Vfunc",
+ "AsyncVfunc",
+ "VfuncPromise",
+ # vflow.vset
+ "Vset",
+]
+def apply_vfuncs(vfuncs: dict, data_dict: dict, lazy: bool = False)
+
+Apply a dictionary of functions vfuncs to each item of data_dict, optionally
+returning a dictionary of VfuncPromise objects if lazy is True. Output keys
+are determined by applying combine_keys() to each pair of items from vfuncs
+and data_dict. This function is used by all Vsets to apply functions.
+
+def apply_vfuncs(vfuncs: dict, data_dict: dict, lazy: bool = False):
+ """Apply a dictionary of functions `vfuncs` to each item of `data_dict`,
+ optionally returning a dictionary of `vflow.vfunc.VfuncPromise` objects if `lazy` is True
+
+ Output keys are determined by applying `combine_keys` to each pair of items from
+ `vfuncs` and `data_dict`. This function is used by all Vsets to apply functions.
+
+ Parameters
+ ----------
+ vfuncs: dict
+ Dictionary of functions to apply to `data_dict`.
+ data_dict: dict
+ Dictionary of parameters to call each function in `vfuncs`.
+    lazy: bool (optional), default False
+ If True, `vfuncs` are applied lazily, returning `vflow.vfunc.VfuncPromise`
+        objects.
+
+ Returns
+ -------
+ out_dict: dict
+ Output dictionary of applying `vfuncs` to `data_dict`.
+ """
+ out_dict = {}
+ for vf_k in vfuncs:
+ if len(data_dict) == 0:
+ func = deepcopy(vfuncs[vf_k])
+ if lazy:
+ out_dict[vf_k] = VfuncPromise(func)
+ else:
+ out_dict[vf_k] = func()
+ for data_k in data_dict:
+ if PREV_KEY in (vf_k, data_k):
+ continue
+
+ combined_key = combine_keys(data_k, vf_k)
+
+ if not len(combined_key) > 0:
+ continue
+
+ func = deepcopy(vfuncs[vf_k])
+ if lazy:
+ # return a promise
+ out_dict[combined_key] = VfuncPromise(func, *data_dict[data_k])
+ else:
+ data_list = list(data_dict[data_k])
+ for i, data in enumerate(data_list):
+ if isinstance(data, VfuncPromise):
+ data_list[i] = data()
+ if isinstance(func, RayRemoteFun) and not isinstance(
+ data_list[i], ray.ObjectRef
+ ):
+ # send data to Ray's remote object store
+ data_list[i] = ray.put(data_list[i])
+ elif isinstance(data_list[i], ray.ObjectRef):
+ # this is not a remote function so get the data
+ data_list[i] = ray.get(data_list[i])
+ out_dict[combined_key] = func(*data_list)
+
+ return out_dict
+
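A minimal sketch of calling apply_vfuncs directly, outside a Vset. It assumes Subkey(value, origin) construction and that a bare Vfunc dispatches to its wrapped callable; both are inferred from the surrounding source, not guaranteed API:

```python
import numpy as np

from vflow.subkey import Subkey
from vflow.utils import apply_vfuncs
from vflow.vfunc import Vfunc

# data values are wrapped in tuples so they can be *-expanded into the vfunc
data_dict = {(Subkey("X", "init"),): (np.arange(3),)}
vfuncs = {(Subkey("double", "demo"),): Vfunc(vfunc=lambda x: 2 * x, name="double")}

out = apply_vfuncs(vfuncs, data_dict)                   # values computed eagerly
promises = apply_vfuncs(vfuncs, data_dict, lazy=True)   # values are VfuncPromises
```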
+def base_dict(d: dict)
+
+Remove PREV_KEY from dict d if present.
+
+def base_dict(d: dict):
+ """Remove PREV_KEY from dict d if present"""
+ return {k: v for k, v in d.items() if k != PREV_KEY}
+
+def build_graph(node, draw=True)
+
+Helper function that just calls build_graph_recur with an empty graph.
+Parameters: node (dict or Vset). Returns: G (nx.DiGraph).
+
+def build_graph(node, draw=True):
+ """Helper function that just calls build_graph_recur with an empty graph
+
+ Parameters
+ ----------
+ node: dict or Vset
+
+ Returns
+ -------
+ G: nx.Digraph()
+ """
+
+ def unnest_node(node):
+ """Unnest a node, if necessary (i.e., when node is a tuple)
+
+ Parameters
+ ----------
+ node: str, dict, Vset, or tuple
+
+ Returns
+ -------
+ unnested_node: str, Vset, or None
+ """
+ node_type = type(node)
+ if node_type is str or "Vset" in str(node_type):
+ return node
+ if node_type is tuple:
+ return unnest_node(node[0])
+ return None
+
+ def build_graph_recur(node, G):
+ """Builds a graph up using __prev__ and PREV_KEY pointers
+
+ Parameters
+ ----------
+ node: str, dict, Vset, or tuple
+ G: nx.Digraph()
+
+ Returns
+ -------
+ G: nx.Digraph()
+ """
+ # base case: reached starting node
+ if isinstance(node, str):
+ return G
+
+ # initial case: starting at dict
+ if isinstance(node, dict):
+ s_node = "End"
+ nodes_prev = node[PREV_KEY]
+ G.add_edge(nodes_prev[0], s_node)
+ for node_prev in nodes_prev[1:]:
+ G.add_edge(unnest_node(node_prev), nodes_prev[0])
+ G = build_graph_recur(node_prev, G)
+ return G
+
+ # main case: at a vfuncset
+ if "Vset" in str(type(node)):
+ if hasattr(node, PREV_KEY):
+ nodes_prev = getattr(node, PREV_KEY)
+ for node_prev in nodes_prev:
+ G.add_edge(unnest_node(node_prev), node)
+ G = build_graph_recur(node_prev, G)
+ return G
+
+ # nested prev key case
+ if isinstance(node, tuple):
+ func_node = unnest_node(node[0])
+ G = build_graph_recur(func_node, G)
+ for arg_node in node[1:]:
+ G.add_edge(unnest_node(arg_node), func_node)
+ G = build_graph_recur(arg_node, G)
+ return G
+
+ return G
+
+ G = nx.DiGraph()
+ G = build_graph_recur(node, G)
+ if draw:
+ nx.draw(G, with_labels=True, node_color="#CCCCCC")
+ return G
+
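For instance, continuing the quickstart above, the pipeline DAG can be recovered from any output dict (a sketch; draw=True renders via networkx and matplotlib):

```python
from vflow import build_graph

# walks the __prev__ pointers of binary_metrics back to its init nodes
G = build_graph(binary_metrics, draw=True)
print(list(G.edges))
```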
+def build_vset(name: str, func, param_dict=None, reps: int = 1, is_async: bool = False, output_matching: bool = False, lazy: bool = False, cache_dir: str = None, tracking_dir: str = None, **kwargs) ‑> Vset
+
+Builds a new Vset by currying or instantiating callable func with all
+combinations of parameters in param_dict and optional additional **kwargs.
+If func and param_dict are lists, then the ith entry of func will be
+curried with the ith entry of param_dict. If only one of func or param_dict
+is a list, the same func/param_dict will be curried for all entries in the
+list. Vfuncs are named with param_dict items as tuples of
+str("param_name=param_val").
+
+def build_vset(
+ name: str,
+ func,
+ param_dict=None,
+ reps: int = 1,
+ is_async: bool = False,
+ output_matching: bool = False,
+ lazy: bool = False,
+ cache_dir: str = None,
+ tracking_dir: str = None,
+ **kwargs,
+) -> Vset:
+ """Builds a new Vset by currying or instantiating callable `func` with all
+ combinations of parameters in `param_dict` and optional additional `**kwargs`.
+ If `func` and `param_dict` are lists, then the ith entry of `func` will be
+    curried with the ith entry of `param_dict`. If only one of `func` or `param_dict`
+ is a list, the same `func`/`param_dict` will be curried for all entries in the
+ list. Vfuncs are named with `param_dict` items as tuples of
+ str("param_name=param_val").
+
+ Parameters
+ ----------
+ name : str
+ A name for the output Vset.
+ func : callable or list[callable]
+ A callable to use as the base for Vfuncs in the output Vset. Can also be
+ a class object, in which case the class is immediately instantiated with
+ the parameter combinations from `param_dict`. Can also be a list of
+ callables, where the ith entry corresponds to `param_dict` or the ith
+ entry of `param_dict` (if `param_dict` is a list).
+ param_dict : dict[str, list] or list[dict[str, list]], optional
+ A dict with string keys corresponding to argument names of `func` and
+ entries which are lists of values to pass to `func` at run time (or when
+ instantiating `func` if it's a class object). Can also be a list of
+ dicts, where the ith dict entry corresponds to `func` or the ith entry
+ of `func` (if `func` is a list). If no parameters are required for the
+ ith function, the ith entry of `param_dict` can be `None`.
+ reps : int, optional
+ The number of times to repeat `func` in the output Vset's vfuncs for
+ each combination of the parameters in `param_dict`.
+ is_async : bool, optional
+ If True, vfuncs are computed asynchronously.
+ output_matching : bool, optional
+ If True, then output keys from Vset will be matched when used in other
+ Vsets.
+ cache_dir : str, optional
+ If provided, do caching and use `cache_dir` as the data store for
+ joblib.Memory.
+ tracking_dir : str, optional
+ If provided, use the mlflow.tracking API to log outputs as metrics with
+ parameters determined by input keys.
+ **kwargs
+ Additional fixed keyword arguments to pass to `func`.
+
+ Returns
+ -------
+ new_vset : vflow.vset.Vset
+
+ """
+ f_list = []
+ pd_list = []
+ if isinstance(func, list):
+ if isinstance(param_dict, list):
+ assert len(param_dict) == len(
+ func
+ ), "list of param_dicts must be same length as list of funcs"
+ f_list.extend(func)
+ pd_list.extend(param_dict)
+ else:
+ pd_list.extend([param_dict] * len(func))
+ f_list.extend(func)
+ elif isinstance(param_dict, list):
+ f_list.extend([func] * len(param_dict))
+ pd_list.extend(param_dict)
+ else:
+ f_list.append(func)
+ pd_list.append(param_dict)
+
+ vfuncs = []
+ vkeys = []
+
+ for f, pd in zip(f_list, pd_list):
+ if pd is None:
+ pd = {}
+ assert callable(f), "func must be callable"
+
+ kwargs_tuples = product(*list(pd.values()))
+ for tup in kwargs_tuples:
+ kwargs_dict = {}
+ vkey_tup = (f"func={f.__name__}",)
+ for param_name, param_val in zip(list(pd.keys()), tup):
+ kwargs_dict[param_name] = param_val
+ vkey_tup += (f"{param_name}={param_val}",)
+ # add additional fixed kwargs to kwargs_dict
+ for k, v in kwargs.items():
+ kwargs_dict[k] = v
+ for i in range(reps):
+ # add vfunc key to vkeys
+ if reps > 1:
+ vkeys.append((f"rep={i}",) + vkey_tup)
+ else:
+ vkeys.append(vkey_tup)
+ # check if func is a class
+ if isinstance(f, type):
+ # instantiate func
+ vfuncs.append(Vfunc(vfunc=f(**kwargs_dict), name=str(vkey_tup)))
+ else:
+ # use partial to wrap func
+ vfuncs.append(
+ Vfunc(vfunc=partial(f, **kwargs_dict), name=str(vkey_tup))
+ )
+ if all(pd is None for pd in pd_list) and reps == 1:
+ vkeys = None
+
+ return Vset(
+ name,
+ vfuncs,
+ is_async=is_async,
+ vfunc_keys=vkeys,
+ output_matching=output_matching,
+ lazy=lazy,
+ cache_dir=cache_dir,
+ tracking_dir=tracking_dir,
+ )
+
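A usage sketch: because func here is a class, each parameter combination is instantiated immediately, and the fixed random_state kwarg is forwarded to every instance:

```python
from sklearn.ensemble import RandomForestClassifier

from vflow import build_vset

# 2 x 2 parameter grid -> 4 Vfuncs named ('func=RandomForestClassifier', ...)
modeling_set = build_vset(
    "modeling",
    RandomForestClassifier,
    param_dict={"n_estimators": [50, 100], "max_depth": [3, None]},
    random_state=0,  # fixed kwarg passed to every instance
)
```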
+def combine_dicts(*args: dict, base_case=True)
+
+Combines any number of dictionaries into a single dictionary. Dictionaries
+are combined left to right matching all keys according to combine_keys().
+
+def combine_dicts(*args: dict, base_case=True):
+ """Combines any number of dictionaries into a single dictionary. Dictionaries
+ are combined left to right matching all keys according to `combine_keys`
+
+ Parameters
+ ----------
+ *args: dict
+ Dictionaries to recursively combine left to right.
+
+ Returns
+ -------
+ combined_dict: dict
+ Combined dictionary.
+ """
+ n_args = len(args)
+ combined_dict = {}
+ if n_args == 0:
+ return combined_dict
+ if n_args == 1:
+ for k in args[0]:
+ # wrap the dict values in tuples; this is helpful so that when we
+        # pass the values to a vfunc we can just use * expansion
+ if k != PREV_KEY:
+ combined_dict[k] = (args[0][k],)
+ else:
+ combined_dict[k] = args[0][k]
+ return combined_dict
+ if n_args == 2:
+ for k0 in args[0]:
+ for k1 in args[1]:
+ if PREV_KEY in (k0, k1):
+ continue
+
+ combined_key = combine_keys(k0, k1)
+
+ if len(combined_key) > 0:
+ if base_case:
+ combined_dict[combined_key] = (args[0][k0], args[1][k1])
+ else:
+ combined_dict[combined_key] = args[0][k0] + (args[1][k1],)
+
+ return combined_dict
+ # combine the first two dicts and call recursively with remaining args
+ return combine_dicts(combine_dicts(args[0], args[1]), *args[2:], base_case=False)
+
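A small sketch of the base case, pairing two init_args outputs (per the code above, matched values end up in tuples ready for * expansion; the subkey matching behavior is assumed, not shown here):

```python
from vflow import init_args
from vflow.utils import combine_dicts

x_dict, y_dict = init_args(([1, 2, 3], [0, 1, 0]), names=["x", "y"])

combined = combine_dicts(x_dict, y_dict)
# keys are concatenated Subkey tuples; values are ([1, 2, 3], [0, 1, 0])
```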
+def combine_keys(left_key, right_key)
+
+Combines left_key and right_key, attempting to match on any Subkey where
+Subkey.is_matching() is True. Returns an empty key on failed matches when
+Subkey.mismatches() is True. Always filters on right_key and returns
+combined_key with left_key prefix.
+
+def combine_keys(left_key, right_key):
+ """Combines `left_key` and `right_key`, attempting to match on any `Subkey` where
+ `vflow.subkey.Subkey.is_matching` is `True`.
+
+ Returns an empty key on failed matches when
+ `vflow.subkey.Subkey.mismatches` is `True`. Always filters on `right_key`
+ and returns `combined_key` with `left_key` prefix.
+
+ Parameters
+ ----------
+ left_key: tuple
+ Left tuple key to combine.
+ right_key: tuple
+ Right tuple key to combine.
+
+ Returns
+ -------
+ combined_key: tuple
+ Combined tuple key filtered according to `vflow.subkey.Subkey.matches` rules,
+ which is empty according to `vflow.subkey.Subkey.mismatches` rule.
+
+ """
+ if len(left_key) < len(right_key):
+ match_key = left_key
+ compare_key = right_key
+ else:
+ match_key = right_key
+ compare_key = left_key
+ match_subkeys = [subkey for subkey in match_key if subkey.is_matching()]
+ if len(match_subkeys) > 0:
+ matched_subkeys = []
+ for subkey in match_subkeys:
+ for c_subkey in compare_key:
+ if subkey.matches(c_subkey):
+ matched_subkeys.append(subkey)
+ break
+ if subkey.mismatches(c_subkey):
+ # subkeys with same origin but different values are rejected
+ return ()
+ if len(matched_subkeys) > 0:
+ # always filter on right key
+ filtered_key = tuple(
+ subkey for subkey in right_key if subkey not in matched_subkeys
+ )
+ combined_key = left_key + filtered_key
+ return combined_key
+ return left_key + right_key
+ return left_key + right_key
+
+def cum_acc_by_uncertainty(mean_preds, std_preds, true_labels)
+
+Returns uncertainty and cumulative accuracy for grouped class predictions,
+sorted in increasing order of uncertainty.
+
+def cum_acc_by_uncertainty(mean_preds, std_preds, true_labels):
+ """Returns uncertainty and cumulative accuracy for grouped class predictions,
+ sorted in increasing order of uncertainty
+
+ Params
+ ------
+ mean_preds: dict
+ mean predictions, output from Vset.predict_with_uncertainties
+ std_preds: dict
+ std predictions, output from Vset.predict_with_uncertainties
+ true_labels: dict or list-like
+
+ TODO: generalize to multi-class classification
+ """
+ assert dict_keys(mean_preds) == dict_keys(
+ std_preds
+ ), "mean_preds and std_preds must share the same keys"
+ # match predictions on keys
+ paired_preds = [
+ [d[k] for d in (mean_preds, std_preds)] for k in dict_keys(mean_preds)
+ ]
+ mean_preds, std_preds = (np.array(p)[:, :, 1] for p in zip(*paired_preds))
+ if isinstance(true_labels, dict):
+ true_labels = dict_data(true_labels)
+ assert len(true_labels) == 1, "true_labels should have a single 1D vector entry"
+ true_labels = true_labels[0]
+ n_obs = len(mean_preds[0])
+ assert (
+ len(true_labels) == n_obs
+ ), f"true_labels has {len(true_labels)} obs. but should have same as predictions ({n_obs})"
+ sorted_idx = np.argsort(std_preds, axis=1)
+ correct_labels = np.take_along_axis(
+ np.around(mean_preds) - true_labels == 0, sorted_idx, 1
+ )
+ uncertainty = np.take_along_axis(std_preds, sorted_idx, 1)
+ cum_acc = np.cumsum(correct_labels, axis=1) / range(1, n_obs + 1)
+ return uncertainty, cum_acc, sorted_idx
+
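A toy sketch with hand-built inputs; in practice mean_preds and std_preds come from Vset.predict_with_uncertainties, and each value is assumed to be an (n_obs, 2) array of class probabilities:

```python
import numpy as np

from vflow import cum_acc_by_uncertainty

mean_preds = {("m",): np.array([[0.1, 0.9], [0.8, 0.2], [0.4, 0.6], [0.3, 0.7]])}
std_preds = {("m",): np.array([[0.0, 0.05], [0.0, 0.20], [0.0, 0.10], [0.0, 0.15]])}
true_labels = [1, 0, 1, 0]

uncertainty, cum_acc, sorted_idx = cum_acc_by_uncertainty(
    mean_preds, std_preds, true_labels
)
```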
+def dict_data(d: dict)
+
+Returns a list containing all data in dict d.
+
+def dict_data(d: dict):
+ """Returns a list containing all data in dict d"""
+ return list(base_dict(d).values())
+
+def dict_keys(d: dict)
+
+Returns a list containing all keys in dict d.
+
+def dict_keys(d: dict):
+ """Returns a list containing all keys in dict d"""
+ return list(base_dict(d).keys())
+
+def dict_to_df(d: dict, param_key=None)
+
+Converts a dictionary with tuple keys into a pandas DataFrame, optionally
+separating parameters in param_key if not None.
+
+def dict_to_df(d: dict, param_key=None):
+ """Converts a dictionary with tuple keys
+    into a pandas DataFrame, optionally separating
+ parameters in `param_key` if not None
+
+ Parameters
+ ----------
+ d: dict
+ Output dictionary with tuple keys from a Vset.
+ param_key: str (optional), default None
+        Name of parameter to separate into multiple columns.
+
+ Returns
+ -------
+ df: pandas.DataFrame
+        A DataFrame with `d` tuple keys separated into columns.
+ """
+ d_copy = {tuple(sk.value for sk in k): d[k] for k in d if k != PREV_KEY}
+ df = pd.Series(d_copy).reset_index()
+ if len(d_copy.keys()) > 0:
+ key_list = list(d.keys())
+ subkey_list = key_list[0] if key_list[0] != PREV_KEY else key_list[1]
+ cols = [sk.origin for sk in subkey_list] + ["out"]
+ # set each init col to init-{next_vfunc_set}
+ cols = [
+ c if c != "init" else init_step(idx, cols) for idx, c in enumerate(cols)
+ ]
+ df = df.set_axis(cols, axis=1)
+ if param_key:
+ param_keys = df[
+ param_key
+ ].tolist() # pylint: disable=unsubscriptable-object
+ if param_key == "out" and hasattr(param_keys[0], "__iter__"):
+ param_df = pd.DataFrame(param_keys)
+ param_df.columns = [f"{param_key}-{col}" for col in param_df.columns]
+ df = df.join(param_df)
+ else:
+ param_loc = df.columns.get_loc(param_key)
+ param_key_cols = [
+ f"{p.split('=')[0]}-{param_key}" for p in param_keys[0]
+ ]
+ param_keys = [[s.split("=")[1] for s in t] for t in param_keys]
+ df = df.join(pd.DataFrame(param_keys)).drop(columns=param_key)
+ new_cols = df.columns[: len(cols) - 1].tolist() + param_key_cols
+ df = df.set_axis(new_cols, axis=1)
+ new_idx = list(range(len(new_cols)))
+ new_idx = (
+ new_idx[:param_loc]
+ + new_idx[len(cols) - 1 :]
+ + new_idx[param_loc : len(cols) - 1]
+ )
+ df = df.iloc[:, new_idx]
+ return df
+
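Continuing the quickstart, a brief sketch (column names are assumed to follow the Vset names):

```python
from vflow import dict_to_df

df = dict_to_df(binary_metrics)
# one column per pipeline step (named after each Vset) plus an "out"
# column holding the metric values

# for a Vset built with build_vset, whose keys look like "param=value",
# param_key can split them into one column per parameter, e.g.:
# dict_to_df(metrics, param_key="modeling")
```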
+def filter_vset_by_metric(metric_dict: dict, vset: Vset, *vsets: Vset, n_keep: int = 1, bigger_is_better: bool = True, filter_on=None, group: bool = False) ‑> Union[Vset, list]
+
+Returns a new Vset by filtering vset.vfuncs based on values in metric_dict.
+
+def filter_vset_by_metric(
+ metric_dict: dict,
+ vset: Vset,
+ *vsets: Vset,
+ n_keep: int = 1,
+ bigger_is_better: bool = True,
+ filter_on=None,
+ group: bool = False,
+) -> Union[Vset, list]:
+    """Returns a new Vset by filtering `vset.vfuncs` based on values in `metric_dict`.
+
+ Parameters
+ ----------
+ metric_dict: dict
+ output from a Vset, typically with metrics or other numeric values to use when
+ filtering `vset.vfuncs`
+ vset: Vset
+ a Vsets
+ *vsets: Vset
+ zero or more additional Vsets
+ n_keep: int (optional)
+ number of entries to keep from `vset.vfuncs`
+ bigger_is_better: bool (optional)
+ if True, then the top `n_keep` largest values are retained
+ filter_on: list[str] (optional)
+ if there are multiple metrics in `metric_dict`, you can specify a subset
+ to consider
+ group: bool (optional)
+ if True, average metrics after grouping values in `metric_dict` by the
+ input Vset names
+
+ Returns
+ -------
+ *new_vset : Vset
+ Copies of the input Vsets but with Vfuncs filtered based on metrics
+ """
+ if filter_on is None:
+ filter_on = []
+ df = dict_to_df(metric_dict)
+ vsets = [vset, *vsets]
+ vset_names = []
+ for vset_i in vsets:
+ if vset_i.name not in df.columns:
+ raise ValueError(
+ (
+ f"{vset_i.name} should be one "
+ "of the columns of dict_to_df(metric_dict)"
+ )
+ )
+ vset_names.append(vset_i.name)
+ if len(filter_on) > 0:
+ filter_col = list(metric_dict.keys())[0][-1].origin
+ df = df[df[filter_col].isin(filter_on)]
+ if group:
+ df = df.groupby(by=vset_names, as_index=False).mean(numeric_only=True)
+ if bigger_is_better:
+ df = df.sort_values(by="out", ascending=False)
+ else:
+ df = df.sort_values(by="out")
+ df = df.iloc[0:n_keep]
+ for i, vset_i in enumerate(vsets):
+ vfuncs = vset_i.vfuncs
+ vfunc_filter = [str(name) for name in df[vset_i.name].to_numpy()]
+ new_vfuncs = {k: v for k, v in vfuncs.items() if str(v.name) in vfunc_filter}
+ tracking_dir = None if vset_i._mlflow is None else mlflow.get_tracking_uri()
+ new_vset = Vset(
+ "filtered_" + vset_i.name,
+ new_vfuncs,
+ is_async=vset_i._async,
+ output_matching=vset_i._output_matching,
+ lazy=vset_i._lazy,
+ cache_dir=vset_i._cache_dir,
+ tracking_dir=tracking_dir,
+ )
+ setattr(
+ new_vset,
+ FILTER_PREV_KEY,
+ (
+ metric_dict[PREV_KEY],
+ vset_i,
+ ),
+ )
+ setattr(new_vset, PREV_KEY, getattr(new_vset, FILTER_PREV_KEY))
+ vsets[i] = new_vset
+ if len(vsets) == 1:
+ return vsets[0]
+ return vsets
+
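A sketch continuing the quickstart: keep only the best model by accuracy, averaging over the subsampling reps. The filter_on values are assumed to be the metric Vset's vfunc keys:

```python
from vflow import filter_vset_by_metric

best_modeling_set = filter_vset_by_metric(
    binary_metrics,
    modeling_set,
    n_keep=1,
    bigger_is_better=True,
    filter_on=["Acc"],  # restrict to the "Acc" metric
    group=True,         # average over subsampling reps before ranking
)
```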
+def init_args(args_tuple: Union[tuple, list], names=None)
+
+Converts tuple of arguments to a list of dicts.
+
+def init_args(args_tuple: Union[tuple, list], names=None):
+ """Converts tuple of arguments to a list of dicts
+
+ Parameters
+ ----------
+ names: list-like (optional), default None
+ given names for each of the arguments in the tuple
+ """
+ if names is None:
+ names = ["start"] * len(args_tuple)
+ else:
+ assert len(names) == len(
+ args_tuple
+ ), "names should be same length as args_tuple"
+ output_dicts = []
+ for i, _ in enumerate(args_tuple):
+ output_dicts.append(
+ {
+ (Subkey(names[i], "init"),): args_tuple[i],
+ PREV_KEY: ("init",),
+ }
+ )
+ return output_dicts
+
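A standalone sketch:

```python
import numpy as np

from vflow import init_args

X, y = np.random.randn(20, 4), np.random.randint(0, 2, 20)
X_dict, y_dict = init_args((X, y), names=["X", "y"])
# each output dict maps a one-element Subkey tuple to the raw argument
# and records ('init',) under the __prev__ key
```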
+def init_step(idx, cols)
+
+Helper function to find init suffix in a column.
+
+def init_step(idx, cols):
+ """Helper function to find init suffix
+ in a column
+
+ Parameters
+ ----------
+ idx: int
+ Index of 'init' column in cols.
+ cols: list[str]
+ List of column names.
+ """
+ for i in range(idx, len(cols)):
+ if cols[i] != "init":
+ return "init-" + cols[i]
+ return None
+
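For example, per the loop above, the suffix comes from the first non-'init' column at or after idx:

```python
from vflow.utils import init_step

cols = ["init", "init", "subsampling", "out"]
print(init_step(0, cols))  # init-subsampling
print(init_step(1, cols))  # init-subsampling
```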
+def perturbation_stats(data: Union[pandas.core.frame.DataFrame, dict], *group_by: str, wrt: str = 'out', func=None, prefix: str = None, split: bool = False)
+
+Compute statistics for wrt in data, conditional on group_by.
+
+def perturbation_stats(
+ data: Union[pd.DataFrame, dict],
+ *group_by: str,
+ wrt: str = "out",
+ func=None,
+ prefix: str = None,
+ split: bool = False,
+):
+ """Compute statistics for `wrt` in `data`, conditional on `group_by`
+
+ Parameters
+ ----------
+ data: Union[pandas.DataFrame, dict]
+ DataFrame, as from calling `dict_to_df` on an output dict from a Vset,
+ or the output dict itself.
+ *group_by: str
+ Vset names in `data` to group on. If none provided, treats everything as one big
+ group.
+ wrt: str (optional)
+ Column name in `data` or `dict_to_df(data)` on which to compute statistics.
+ Defaults to `'out'`, the values of the original Vset output dict.
+ func: function, str, list or dict (optional), default None
+ A list of functions or function names to use for computing
+ statistics, analogous to the parameter of the same name in
+ pandas.core.groupby.DataFrameGroupBy.aggregate. If `None`, defaults to
+ `['count', 'mean', 'std']`.
+ prefix: str (optional), default None
+ A string to prefix to new columns in output DataFrame. If `None`,
+ uses the value of `wrt`.
+ split: bool (optional), default False
+ If `True` and `wrt` in `data` has `list` or `numpy.ndarray` entries, will
+ attempt to split the entries into multiple columns for the output.
+
+ Returns
+ -------
+ df: pandas.DataFrame
+ A DataFrame with summary statistics on `wrt`.
+ """
+ if func is None:
+ func = ["count", "mean", "std"]
+ if prefix is None:
+ prefix = wrt
+ if isinstance(data, dict):
+ df = dict_to_df(data)
+ else:
+ df = data
+ group_by = list(group_by)
+ if len(group_by) > 0:
+ gb = df.groupby(group_by)[wrt]
+ else:
+ gb = df.groupby(lambda x: True)[wrt]
+ if (isinstance(func, list) and "mean" in func or "std" in func) and (
+ type(df[wrt].iloc[0]) in [list, np.ndarray]
+ ):
+ wrt_arrays = [
+ np.stack(d.tolist()) for d in (gb.get_group(grp) for grp in gb.groups)
+ ]
+ n_cols = wrt_arrays[0].shape[1]
+ df_out = pd.DataFrame(gb.agg("count"))
+ df_out.columns = [f"{prefix}-count"]
+ if "mean" in func:
+ if split:
+ col_means = [arr.mean(axis=0) for arr in wrt_arrays]
+ wrt_means = pd.DataFrame(
+ col_means,
+ columns=[f"{prefix}{i}-mean" for i in range(n_cols)],
+ index=gb.groups.keys(),
+ )
+ else:
+ col_means = [{f"{prefix}-mean": arr.mean(axis=0)} for arr in wrt_arrays]
+ wrt_means = pd.DataFrame(col_means, index=gb.groups.keys())
+ wrt_means.index.names = df_out.index.names
+ df_out = df_out.join(wrt_means)
+ if "std" in func:
+ if split:
+ col_stds = [arr.std(axis=0, ddof=1) for arr in wrt_arrays]
+ wrt_stds = pd.DataFrame(
+ col_stds,
+ columns=[f"{prefix}{i}-std" for i in range(n_cols)],
+ index=gb.groups.keys(),
+ )
+ else:
+ col_stds = [
+ {f"{prefix}-std": arr.std(axis=0, ddof=1)} for arr in wrt_arrays
+ ]
+ wrt_stds = pd.DataFrame(col_stds, index=gb.groups.keys())
+ wrt_stds.index.names = df_out.index.names
+ df_out = df_out.join(wrt_stds)
+ if "count" not in func:
+ df_out = df_out.drop(f"{prefix}-count")
+ else:
+ df_out = gb.agg(func)
+ df_out = df_out.reindex(sorted(df_out.columns), axis=1)
+ df_out.reset_index(inplace=True)
+ if len(group_by) > 0:
+ return df_out.sort_values(group_by[0])
+ return df_out
+
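Continuing the quickstart, a sketch that summarizes the "out" values grouped by model and metric (group names are assumed to match the Vset names):

```python
from vflow import perturbation_stats

# count/mean/std of each metric across the three subsampling reps
stats_df = perturbation_stats(binary_metrics, "modeling", "binary_metrics")
```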
+def sep_dicts(d: dict, n_out: int = 1, keys=None)
+
+Converts dictionary with value being saved as an iterable into multiple
+dictionaries. Assumes every value has same length n_out.
+
+def sep_dicts(d: dict, n_out: int = 1, keys=None):
+ """Converts dictionary with value being saved as an iterable into multiple dictionaries
+
+ Assumes every value has same length n_out
+
+ Parameters
+ ----------
+ d: dict
+ Dictionary with iterable values to be converted.
+ n_out: int, default 1
+ The number of dictionaries to separate d into.
+ keys: list-like, default None
+ Optional list of keys to use in output dicts.
+
+ Returns
+ -------
+ sep_dicts_list: list
+        List of separated dictionaries.
+
+ Examples
+ --------
+ >>> sep_dicts({k1: (x1, y1), k2: (x2, y2), ..., '__prev__': p})
+ [{k1: x1, k2: x2, ..., '__prev__': p}, {k1: y1, k2: y2, ..., '__prev__': p}]
+ """
+ if keys is None:
+ keys = []
+ if len(keys) > 0 and len(keys) != n_out:
+ raise ValueError(f"keys should be empty or have length n_out={n_out}")
+ # empty dict -- return empty dict
+ if n_out <= 1:
+ return d
+ # try separating dict into multiple dicts
+ sep_dicts_id = str(uuid4()) # w/ high prob, uuid4 is unique
+ sep_dicts_list = [{} for _ in range(n_out)]
+ for key, value in d.items():
+ if key != PREV_KEY:
+ for i in range(n_out):
+ # assumes the correct sub-key for item i is in the i-th position
+ if len(keys) == 0:
+ new_key = (key[i],) + key[n_out:]
+ else:
+ new_sub = Subkey(
+ value=keys[i], origin=key[-1].origin + "-" + str(i)
+ )
+ new_key = (new_sub,) + key
+ new_key[-1].sep_dicts_id = sep_dicts_id
+ if isinstance(value, VfuncPromise):
+ # return a promise to get the value at index i of the
+ # original promise
+ value_i = VfuncPromise(lambda v, x: v[x], value, i)
+ else:
+ value_i = value[i]
+ sep_dicts_list[i][new_key] = value_i
+
+ return sep_dicts_list
+
+def to_list(tup: tuple)
+
+Convert from tuple to packed list. Allows us to call function with arguments
+in a loop. Raises ValueError if passed an uneven number of arguments without
+a list.
+
+def to_list(tup: tuple):
+ """Convert from tuple to packed list
+
+ Allows us to call function with arguments in a loop
+
+ Parameters
+ ----------
+ tup: tuple
+ tuple of objects to convert to packed list
+
+ Raises
+ ------
+ ValueError
+ If passed uneven number of arguments without a list. Please wrap your args in a list.
+
+ Examples
+ --------
+ >>> to_list(([x1, x2, x3], [y1, y2, y3]))
+ [[x1, y1], [x2, y2], [x3, y3]]
+ >>> to_list(([x1], [y1]))
+ [[x1, y1]]
+ >>> to_list(([x1, x2, x3], ))
+ [[x1], [x2], [x3]]
+ >>> to_list((x1, ))
+ [[x1]]
+ >>> to_list((x1, y1))
+ [[x1, y1]]
+ >>> to_list((x1, x2, x3, y1, y2, y3))
+ [[x1, y1], [x2, y2], [x3, y3]]
+ """
+ n_tup = len(tup)
+ if n_tup == 0:
+ return []
+ if not isinstance(tup[0], list):
+ # the first element is data
+ if n_tup == 1:
+ return [list(tup)]
+ if n_tup % 2 != 0:
+ raise ValueError(
+ "Don't know how to handle uneven number of args "
+ "without a list. Please wrap your args in a list."
+ )
+ # assume first half of args is input and second half is outcome
+ return [list(el) for el in zip(tup[: (n_tup // 2)], tup[(n_tup // 2) :])]
+ if n_tup == 1:
+ return [[x] for x in tup[0]]
+ n_mods = len(tup[0])
+ lists_packed = [[] for _ in range(n_mods)]
+ for i in range(n_mods):
+ for j in range(n_tup):
+ lists_packed[i].append(tup[j][i])
+ return lists_packed
+
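+A quick sketch of both calling conventions, with plain integers standing in
+for data (import path assumed):
+
+from vflow.utils import to_list
+
+to_list(([1, 2, 3], [4, 5, 6]))  # [[1, 4], [2, 5], [3, 6]]
+# without lists, the first half is treated as inputs, the second as outcomes
+to_list((1, 2, 3, 4, 5, 6))      # [[1, 4], [2, 5], [3, 6]]
+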
+def to_tuple(lists: list)
+
+Convert from lists to unpacked tuple.
+Allows us to write X, y = to_tuple([[x1, y1], [x2, y2], [x3, y3]]).
+
+Parameters: lists : list
+
+>>> to_tuple([[x1, y1], [x2, y2], [x3, y3]])
+([x1, x2, x3], [y1, y2, y3])
+>>> to_tuple([[x1, y1]])
+([x1], [y1])
+>>> to_tuple([m1, m2, m3])
+[m1, m2, m3]
+
def to_tuple(lists: list):
+ """Convert from lists to unpacked tuple
+
+ Allows us to write `X, y = to_tuple([[x1, y1], [x2, y2], [x3, y3]])`
+
+ Parameters
+ ----------
+ lists: list
+ list of objects to convert to unpacked tuple
+
+ Examples
+ --------
+ >>> to_tuple([[x1, y1], [x2, y2], [x3, y3]])
+ ([x1, x2, x3], [y1, y2, y3])
+ >>> to_tuple([[x1, y1]])
+ ([x1], [y1])
+ >>> to_tuple([m1, m2, m3])
+ [m1, m2, m3]
+ """
+ n_mods = len(lists)
+ if n_mods <= 1:
+ return lists
+ if not isinstance(lists[0], list):
+ return lists
+ n_tup = len(lists[0])
+ tup = [[] for _ in range(n_tup)]
+ for i in range(n_mods):
+ for j in range(n_tup):
+ tup[j].append(lists[i][j])
+ return tuple(tup)
+
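+As a sketch, to_tuple inverts to_list (import path assumed as above):
+
+from vflow.utils import to_tuple
+
+X, y = to_tuple([[1, 4], [2, 5], [3, 6]])
+# X == [1, 2, 3], y == [4, 5, 6]
+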
+class AsyncVfunc
+(name: str = '', vfunc=<function AsyncVfunc.<lambda>>)
+
An asynchronous version of the Vfunc class.
class AsyncVfunc:
+ """An asynchronous version of the Vfunc class."""
+
+ def __init__(self, name: str = "", vfunc=lambda x: x):
+ self.name = name
+ if isinstance(vfunc, Vfunc):
+ self.vfunc = vfunc.vfunc
+ else:
+ assert hasattr(vfunc, "fit") or callable(
+ vfunc
+ ), "vfunc must be an object with a fit method or a callable"
+ self.vfunc = vfunc
+
+ def fit(self, *args, **kwargs):
+ """This function fits params for this vfunc"""
+ if hasattr(self.vfunc, "fit"):
+ return _remote_fun.remote(self.vfunc.fit, *args, **kwargs)
+ return _remote_fun.remote(self.vfunc, *args, **kwargs)
+
+ def transform(self, *args, **kwargs):
+ """This function transforms its input in some way"""
+ if hasattr(self.vfunc, "transform"):
+ return _remote_fun.remote(self.vfunc.transform, *args, **kwargs)
+ return _remote_fun.remote(self.vfunc, *args, **kwargs)
+
+ def __call__(self, *args, **kwargs):
+ """This should decide what to call"""
+ return self.fit(*args, **kwargs)
+
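+A hypothetical sketch of AsyncVfunc: calls are dispatched through the optional
+ray dependency, so ray must be initialized first, and fit returns a ray
+ObjectRef rather than a value (import path assumed):
+
+import ray
+from vflow.vfunc import AsyncVfunc
+
+ray.init(num_cpus=2)
+double = AsyncVfunc(name="double", vfunc=lambda x: 2 * x)
+ref = double.fit(21)  # schedules the call remotely, returns an ObjectRef
+print(ray.get(ref))   # 42
+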
+def fit(self, *args, **kwargs)
+
This function fits params for this vfunc
def fit(self, *args, **kwargs):
+ """This function fits params for this vfunc"""
+ if hasattr(self.vfunc, "fit"):
+ return _remote_fun.remote(self.vfunc.fit, *args, **kwargs)
+ return _remote_fun.remote(self.vfunc, *args, **kwargs)
+
+def transform(self, *args, **kwargs)
+
This function transforms its input in some way
def transform(self, *args, **kwargs):
+ """This function transforms its input in some way"""
+ if hasattr(self.vfunc, "transform"):
+ return _remote_fun.remote(self.vfunc.transform, *args, **kwargs)
+ return _remote_fun.remote(self.vfunc, *args, **kwargs)
+
+class PCSPipeline
+(steps=None, cache_dir=None)
+
+Parameters: steps : list; cache_dir : str, default=None -- the directory to
+use as data store by joblib. If None, won't do caching.
class PCSPipeline:
+ def __init__(self, steps=None, cache_dir=None):
+ """
+ Parameters
+ ----------
+ steps: list
+ a list of Vset instances
+ cache_dir: str, default=None
+ The directory to use as data store by `joblib`. If None, won't do
+ caching.
+ """
+ if steps is None:
+ steps = []
+ self.steps = steps
+ # set up the cache
+ self.memory = joblib.Memory(location=cache_dir)
+
+ def run(self, *args, **kwargs):
+ """Runs the pipeline"""
+ run_step_cached = self.memory.cache(_run_step)
+ for i, step in enumerate(self.steps):
+ try:
+ step_name = step.name
+ except AttributeError:
+ step_name = f"Step {i}"
+ print(step_name)
+ _, fitted_step = run_step_cached(step, *args, **kwargs)
+ self.steps[i] = fitted_step
+
+ def __getitem__(self, i):
+ """Accesses ith step of pipeline"""
+ return self.steps[i]
+
+ def __len__(self):
+ return len(self.steps)
+
+ def generate_names(self, as_pandas=True):
+ name_lists = []
+ if as_pandas:
+ for step in self.steps:
+ name_lists.append([f"{i}_{str(mod)[:8]}" for i, mod in enumerate(step)])
+ indexes = list(itertools.product(*name_lists))
+ return pd.DataFrame(indexes, columns=[step.name for step in self.steps])
+ for step in self.steps:
+ name_lists.append(
+ [f"{step.name}_{i}_{str(mod)[:8]}" for i, mod in enumerate(step)]
+ )
+ return list(itertools.product(*name_lists))
+
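+A minimal, hypothetical sketch of chaining Vsets in a PCSPipeline; it assumes
+init_args (as used elsewhere in vflow) wraps raw data into keyed dicts and
+that the module-level _run_step helper fits each step on the forwarded dicts:
+
+from vflow import Vset, init_args
+from vflow.pipeline import PCSPipeline  # import path assumed
+
+(data,) = init_args(([1, 2, 3, 4],), names=["data"])
+subsampling = Vset("subsampling", [lambda x: x[::2], lambda x: x[1::2]])
+pipeline = PCSPipeline(steps=[subsampling])  # cache_dir=None: no caching
+pipeline.run(data)  # prints "subsampling" and stores the fitted step
+print(len(pipeline), pipeline[0])
+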
+def generate_names(self, as_pandas=True)
+
def generate_names(self, as_pandas=True):
+ name_lists = []
+ if as_pandas:
+ for step in self.steps:
+ name_lists.append([f"{i}_{str(mod)[:8]}" for i, mod in enumerate(step)])
+ indexes = list(itertools.product(*name_lists))
+ return pd.DataFrame(indexes, columns=[step.name for step in self.steps])
+ for step in self.steps:
+ name_lists.append(
+ [f"{step.name}_{i}_{str(mod)[:8]}" for i, mod in enumerate(step)]
+ )
+ return list(itertools.product(*name_lists))
+
+def run(self, *args, **kwargs)
+
Runs the pipeline
def run(self, *args, **kwargs):
+ """Runs the pipeline"""
+ run_step_cached = self.memory.cache(_run_step)
+ for i, step in enumerate(self.steps):
+ try:
+ step_name = step.name
+ except AttributeError:
+ step_name = f"Step {i}"
+ print(step_name)
+ _, fitted_step = run_step_cached(step, *args, **kwargs)
+ self.steps[i] = fitted_step
+
+class Subkey
+(value, origin: str, output_matching: bool = False)
+
+Parameters: value : Any; origin : str; output_matching : bool (optional),
+default False
class Subkey:
+ def __init__(self, value, origin: str, output_matching: bool = False):
+ """
+ Parameters
+ ----------
+ value: Any
+ subkey value corresponding to a Vset vfunc
+ origin: str
+ name of the origin Vset of this Subkey
+ output_matching: bool (optional), default False
+ inherited from the Vset where the Subkey is created
+ """
+ self.value = value
+ self.origin = origin
+ self.output_matching = output_matching
+ # sep_dicts_id identifies the particular call to sep_dicts() that this
+ # key's dictionary went through (if any).
+ self.sep_dicts_id = None
+
+ def is_matching(self):
+ """Checks if subkey should be matched in other Vsets"""
+ return self.output_matching or self.sep_dicts_id is not None
+
+ def matches_sep_dict_id(self, other: object):
+ """Helper to match Subkey by _sep_dict_id"""
+ if isinstance(other, self.__class__):
+ return (
+ self.sep_dicts_id is not None
+ and self.sep_dicts_id == other.sep_dicts_id
+ )
+ return False
+
+ def matches(self, other: object):
+ """When Subkey matching is required, determines if this Subkey is compatible
+ with another, meaning that the origins and values match, and either the
+ _sep_dicts_id matches or both Subkeys have _output_matching True.
+ """
+ if isinstance(other, self.__class__):
+ # they're both matching
+ cond0 = self.is_matching() and other.is_matching()
+ # value and origins match
+ cond1 = self.value == other.value and self.origin == other.origin
+ # sep_dicts_id matches
+ cond2 = self.sep_dicts_id == other.sep_dicts_id or (
+ self.output_matching and other.output_matching
+ )
+ return cond0 and cond1 and cond2
+ return False
+
+ def mismatches(self, other: object):
+ """When Subkey matching is required, determines if this Subkey and another are
+ a bad match, meaning either:
+
+ 1. output_matching is True, origin is same, value is different
+ 2. output_matching is False, sep_dicts_id is same and not None, origin
+ is same, value is different
+ """
+ if isinstance(other, self.__class__):
+ # one of the two keys is output_matching
+ cond0 = self.output_matching or other.output_matching
+ # neither key is output_matching but sep_dict_ids not None and match
+ cond1 = not cond0 and self.matches_sep_dict_id(other)
+ # origins match and values mismatch
+ cond2 = self.origin == other.origin and self.value != other.value
+ return (cond0 or cond1) and cond2
+ return True
+
+ def __eq__(self, other: object):
+ """Mainly used for testing purposes."""
+ if isinstance(other, self.__class__):
+ # value and origins match
+ return self.value == other.value and self.origin == other.origin
+ return False
+
+ def __repr__(self):
+ return str(self.value)
+
+ def __hash__(self):
+ """Mainly used for testing purposes."""
+ return hash(self.value) ^ hash(self.origin) ^ hash(self.output_matching)
+
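+A small sketch of the matching semantics (import path assumed):
+
+from vflow.subkey import Subkey
+
+a = Subkey("RF", origin="modeling", output_matching=True)
+b = Subkey("RF", origin="modeling", output_matching=True)
+c = Subkey("LR", origin="modeling", output_matching=True)
+
+a.matches(b)     # True: same value and origin, both output_matching
+a.mismatches(c)  # True: same origin but different value
+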
+def is_matching(self)
+
Checks if subkey should be matched in other Vsets
def is_matching(self):
+ """Checks if subkey should be matched in other Vsets"""
+ return self.output_matching or self.sep_dicts_id is not None
+
+def matches(self, other: object)
+
When Subkey matching is required, determines if this Subkey is compatible with another, meaning that the origins and values match, and either the _sep_dicts_id matches or both Subkeys have _output_matching True.
def matches(self, other: object):
+ """When Subkey matching is required, determines if this Subkey is compatible
+ with another, meaning that the origins and values match, and either the
+ _sep_dicts_id matches or both Subkeys have _output_matching True.
+ """
+ if isinstance(other, self.__class__):
+ # they're both matching
+ cond0 = self.is_matching() and other.is_matching()
+ # value and origins match
+ cond1 = self.value == other.value and self.origin == other.origin
+ # sep_dicts_id matches
+ cond2 = self.sep_dicts_id == other.sep_dicts_id or (
+ self.output_matching and other.output_matching
+ )
+ return cond0 and cond1 and cond2
+ return False
+
+def matches_sep_dict_id(self, other: object)
+
Helper to match Subkey by _sep_dict_id
def matches_sep_dict_id(self, other: object):
+ """Helper to match Subkey by _sep_dict_id"""
+ if isinstance(other, self.__class__):
+ return (
+ self.sep_dicts_id is not None
+ and self.sep_dicts_id == other.sep_dicts_id
+ )
+ return False
+
+def mismatches(self, other: object)
+
When Subkey matching is required, determines if this Subkey and another are a bad match, meaning either:
+def mismatches(self, other: object):
+ """When Subkey matching is required, determines if this Subkey and another are
+ a bad match, meaning either:
+
+ 1. output_matching is True, origin is same, value is different
+ 2. output_matching is False, sep_dicts_id is same and not None, origin
+ is same, value is different
+ """
+ if isinstance(other, self.__class__):
+ # one of the two keys is output_matching
+ cond0 = self.output_matching or other.output_matching
+ # neither key is output_matching but sep_dict_ids not None and match
+ cond1 = not cond0 and self.matches_sep_dict_id(other)
+ # origins match and values mismatch
+ cond2 = self.origin == other.origin and self.value != other.value
+ return (cond0 or cond1) and cond2
+ return True
+
+class Vfunc
+(name: str = '', vfunc=<function Vfunc.<lambda>>)
+
Vfunc is basically a function along with a name attribute. It may support a "fit" function, but may also just have a "transform" function. If neither is supported, it need only be a function.
class Vfunc:
+ """Vfunc is basically a function along with a name attribute.
+ It may support a "fit" function, but may also just have a "transform" function.
+    If neither is supported, it need only be a function.
+ """
+
+ def __init__(self, name: str = "", vfunc=lambda x: x):
+ assert hasattr(vfunc, "fit") or callable(
+ vfunc
+ ), "vfunc must be an object with a fit method or a callable"
+ self.name = name
+ self.vfunc = vfunc
+
+ def fit(self, *args, **kwargs):
+ """This function fits params for this vfunc"""
+ if hasattr(self.vfunc, "fit"):
+ return self.vfunc.fit(*args, **kwargs)
+ return self.vfunc(*args, **kwargs)
+
+ def transform(self, *args, **kwargs):
+ """This function transforms its input in some way"""
+ if hasattr(self.vfunc, "transform"):
+ return self.vfunc.transform(*args, **kwargs)
+ return self.vfunc(*args, **kwargs)
+
+ def __call__(self, *args, **kwargs):
+ """This should decide what to call"""
+ return self.fit(*args, **kwargs)
+
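+For instance, a sketch wrapping both a plain callable and a scikit-learn
+estimator, which supplies its own fit method (import path assumed):
+
+from sklearn.preprocessing import StandardScaler
+from vflow.vfunc import Vfunc
+
+identity = Vfunc(name="identity", vfunc=lambda x: x)
+identity(3)  # __call__ -> fit -> falls back to the callable -> 3
+
+scaler = Vfunc(name="scaler", vfunc=StandardScaler())
+scaler.fit([[0.0], [1.0], [2.0]])  # delegates to StandardScaler.fit
+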
+def fit(self, *args, **kwargs)
+
This function fits params for this vfunc
def fit(self, *args, **kwargs):
+ """This function fits params for this vfunc"""
+ if hasattr(self.vfunc, "fit"):
+ return self.vfunc.fit(*args, **kwargs)
+ return self.vfunc(*args, **kwargs)
+
+def transform(self, *args, **kwargs)
+
This function transforms its input in some way
def transform(self, *args, **kwargs):
+ """This function transforms its input in some way"""
+ if hasattr(self.vfunc, "transform"):
+ return self.vfunc.transform(*args, **kwargs)
+ return self.vfunc(*args, **kwargs)
+
+class VfuncPromise
+(vfunc: callable, *args)
+
A Vfunc promise used for lazy evaluation.
class VfuncPromise:
+ """A Vfunc promise used for lazy evaluation."""
+
+ def __init__(self, vfunc: callable, *args):
+ self.vfunc = vfunc
+ self.args = args
+ self.called = False
+ self.value = None
+
+ def __call__(self):
+        """Evaluate the promise if needed and return the cached value"""
+ if self.called:
+ return self.value
+ tmp_args = []
+ for i, arg in enumerate(self.args):
+ tmp_args.append(arg)
+ while isinstance(tmp_args[i], VfuncPromise):
+ tmp_args[i] = tmp_args[i]()
+ while isinstance(self.vfunc, VfuncPromise):
+ self.vfunc = self.vfunc()
+ self.value = self.vfunc(*tmp_args)
+ self.called = True
+ return self.value
+
+ def _get_value(self):
+ if isinstance(self(), ray.ObjectRef):
+ self.value = ray.get(self.value)
+ return self.value
+
+ def transform(self, *args):
+ """This function transforms its input in some way"""
+ return self._get_value().transform(*args)
+
+ def predict(self, *args):
+ """This function calls predict on its inputs"""
+ return self._get_value().predict(*args)
+
+ def predict_proba(self, *args):
+ """This function calls predict_proba on its inputs"""
+ return self._get_value().predict_proba(*args)
+
+ def __repr__(self):
+ if self.called:
+ return f"Fulfilled VfuncPromise({self.value})"
+ return f"Unfulfilled VfuncPromise(func={self.vfunc}, args={self.args})"
+
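+A sketch of lazy evaluation with VfuncPromise (import path assumed): the
+wrapped call runs on the first invocation and is cached afterwards:
+
+from vflow.vfunc import VfuncPromise
+
+promise = VfuncPromise(lambda a, b: a + b, 2, 3)
+promise.called  # False -- nothing computed yet
+promise()       # 5, computed on first call
+promise()       # 5, served from the cached value
+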
+def predict(self, *args)
+
This function calls predict on its inputs
def predict(self, *args):
+ """This function calls predict on its inputs"""
+ return self._get_value().predict(*args)
+
+def predict_proba(self, *args)
+
This function calls predict_proba on its inputs
def predict_proba(self, *args):
+ """This function calls predict_proba on its inputs"""
+ return self._get_value().predict_proba(*args)
+
+def transform(self, *args)
+
This function transforms its input in some way
def transform(self, *args):
+ """This function transforms its input in some way"""
+ return self._get_value().transform(*args)
+
+class Vset
+(name: str, vfuncs, vfunc_keys: list = None, is_async: bool = False, output_matching: bool = False, lazy: bool = False, cache_dir: str = None, tracking_dir: str = None)
+
+Parameters: name : str; vfuncs : list or dict; vfunc_keys : list (optional);
+is_async : bool (optional) -- if True, vfuncs are computed asynchronously;
+output_matching : bool (optional); lazy : bool (optional) -- if True, outputs
+are vset.vfunc.VfuncPromise; cache_dir : str (optional) -- if provided, use
+cache_dir as the data store for joblib.Memory; tracking_dir : str (optional)
+-- if provided, use the mlflow.tracking api to log outputs as metrics with
+params determined by input keys.
class Vset:
+ def __init__(
+ self,
+ name: str,
+ vfuncs,
+ vfunc_keys: list = None,
+ is_async: bool = False,
+ output_matching: bool = False,
+ lazy: bool = False,
+ cache_dir: str = None,
+ tracking_dir: str = None,
+ ):
+ """
+ Parameters
+ ----------
+ name: str
+ Name of this Vset.
+ vfuncs: list or dict
+            List or dictionary of functions that we want to associate with this Vset
+ vfunc_keys: list (optional)
+ List of names corresponding to each vfunc
+ is_async: bool (optional)
+ If True, `vfuncs` are computed asynchronously
+ output_matching: bool (optional)
+ If True, then output keys from this Vset will be matched when used
+ in other Vsets
+ lazy: bool (optional)
+ If True, then vfuncs are evaluated lazily, i.e. outputs are `vset.vfunc.VfuncPromise`
+ cache_dir: str (optional)
+ If provided, do caching and use `cache_dir` as the data store for
+ `joblib.Memory`.
+ tracking_dir: str (optional)
+ If provided, use the `mlflow.tracking` api to log outputs as metrics
+ with params determined by input keys.
+
+ """
+ self.name = name
+ self._fitted = False
+ self.fitted_vfuncs = None # outputs
+ self._async = is_async
+ self._output_matching = output_matching
+ self._lazy = lazy
+ self._cache_dir = cache_dir
+ self._memory = joblib.Memory(self._cache_dir)
+ if tracking_dir is not None:
+ self._mlflow = MlflowClient(tracking_uri=tracking_dir)
+ experiment = self._mlflow.get_experiment_by_name(name=self.name)
+ if experiment is None:
+ self._exp_id = self._mlflow.create_experiment(name=self.name)
+ else:
+ self._exp_id = experiment.experiment_id
+ else:
+ self._mlflow = None
+ # check if any of the vfuncs are AsyncVfuncs
+        # if so, we'll make them all AsyncVfuncs later on
+ if not self._async and np.any([isinstance(vf, AsyncVfunc) for vf in vfuncs]):
+ self._async = True
+ if isinstance(vfuncs, dict):
+ self.vfuncs = vfuncs
+ elif isinstance(vfuncs, list):
+ if vfunc_keys is not None:
+ assert isinstance(
+ vfunc_keys, list
+ ), "vfuncs passed as list but vfunc_keys is not a list"
+ assert len(vfuncs) == len(
+ vfunc_keys
+ ), "vfuncs list and vfunc_keys list do not have the same length"
+ # TODO: how best to handle tuple subkeys?
+ vfunc_keys = [(self.__create_subkey(k),) for k in vfunc_keys]
+ else:
+ vfunc_keys = [
+ (self.__create_subkey(f"{name}_{i}"),) for i in range(len(vfuncs))
+ ]
+ # convert vfunc keys to singleton tuples
+ self.vfuncs = dict(zip(vfunc_keys, vfuncs))
+ # if needed, wrap the vfuncs in the Vfunc or AsyncVfunc class
+ for k, v in self.vfuncs.items():
+ if self._async:
+ if not isinstance(v, AsyncVfunc):
+ self.vfuncs[k] = AsyncVfunc(k[0], v)
+ elif not isinstance(v, Vfunc):
+ self.vfuncs[k] = Vfunc(k[0], v)
+
+ def _apply_func(self, *args, out_dict: dict = None):
+ """Apply functions in out_dict to combined args dict
+
+ Optionally logs output Subkeys and values as params and metrics using
+ `mlflow.tracking` if this Vset has a `_tracking_dir`.
+
+ Parameters
+ ----------
+ *args: dict
+ Takes multiple dicts and combines them into one.
+ Then runs vfuncs on each item in combined dict.
+ out_dict: dict (optional), default None
+ The dictionary to pass to the matching function. If None, defaults to self.vfuncs.
+
+ Returns
+ -------
+ out_dict: dict
+ Dictionary with items being determined by functions in vfunc set.
+ Functions and input dictionaries are currently matched using a cartesian matching format.
+
+ Examples
+ --------
+        >>> vfuncs, data = {LR : logistic}, {train_1 : [X1,y1], train_2 : [X2,y2]}
+ {(train_1, LR) : fitted logistic, (train_2, LR) : fitted logistic}
+ """
+ if out_dict is None:
+ out_dict = deepcopy(self.vfuncs)
+
+ apply_func_cached = self._memory.cache(_apply_func_cached)
+ out_dict = apply_func_cached(out_dict, self._async, self._lazy, *args)
+
+ prev = tuple()
+ for arg in args:
+ if PREV_KEY in arg:
+ prev += (arg[PREV_KEY],)
+ out_dict[PREV_KEY] = (self,) + prev
+
+ if self._mlflow is not None:
+ run_dict = {}
+ # log subkeys as params and value as metric
+ for k, v in out_dict.items():
+ if k == PREV_KEY:
+ continue
+ origins = np.array([subk.origin for subk in k])
+ # ignore init origins and the last origin (this Vset)
+ param_idx = [i for i in range(len(k[:-1])) if origins[i] != "init"]
+ # get or create mlflow run
+ run_dict_key = tuple(subk.value for subk in k[:-1])
+ if run_dict_key in run_dict:
+ run_id = run_dict[run_dict_key]
+ else:
+ run = self._mlflow.create_run(self._exp_id)
+ run_id = run.info.run_id
+ run_dict[run_dict_key] = run_id
+ # log params
+ for idx in param_idx:
+ subkey = k[idx]
+ param_name = subkey.origin
+ # check if the origin occurs multiple times
+ if np.sum(origins == param_name) > 1:
+                        occurrence = np.sum(origins[:idx] == param_name)
+                        param_name = param_name + str(occurrence)
+ self._mlflow.log_param(run_id, param_name, subkey.value)
+ self._mlflow.log_metric(run_id, k[-1].value, v)
+ return out_dict
+
+ def fit(self, *args):
+ """Fits to args using `_apply_func`"""
+ out_dict = {}
+ for k, v in self.vfuncs.items():
+ out_dict[k] = v.fit
+ self.fitted_vfuncs = self._apply_func(*args, out_dict=out_dict)
+ prev = self.fitted_vfuncs[PREV_KEY][1:]
+ if hasattr(self, FILTER_PREV_KEY):
+ prev = getattr(self, FILTER_PREV_KEY) + prev
+ setattr(self, PREV_KEY, prev)
+ self._fitted = True
+ return self
+
+ def fit_transform(self, *args):
+ """Fits to args and transforms only the first arg."""
+ return self.fit(*args).transform(args[0])
+
+ def transform(self, *args):
+ """Transforms args using `_apply_func`"""
+ if not self._fitted:
+ raise AttributeError(
+ "Please fit the Vset object before calling the transform method."
+ )
+ out_dict = {}
+ for k, v in self.fitted_vfuncs.items():
+ if hasattr(v, "transform"):
+ out_dict[k] = v.transform
+ return self._apply_func(*args, out_dict=out_dict)
+
+ def predict(self, *args, with_uncertainty: bool = False, group_by: list = None):
+ """Predicts args using `_apply_func`"""
+ if not self._fitted:
+ raise AttributeError("Please fit the Vset object before calling predict.")
+ pred_dict = {}
+ for k, v in self.fitted_vfuncs.items():
+ if hasattr(v, "predict"):
+ pred_dict[k] = v.predict
+ preds = self._apply_func(*args, out_dict=pred_dict)
+ if with_uncertainty:
+ return prediction_uncertainty(preds, group_by)
+ return preds
+
+ def predict_proba(
+ self, *args, with_uncertainty: bool = False, group_by: list = None
+ ):
+ """Calls predict_proba on args using `_apply_func`"""
+ if not self._fitted:
+ raise AttributeError(
+ "Please fit the Vset object before calling predict_proba."
+ )
+ pred_dict = {}
+ for k, v in self.fitted_vfuncs.items():
+ if hasattr(v, "predict_proba"):
+ pred_dict[k] = v.predict_proba
+ preds = self._apply_func(*args, out_dict=pred_dict)
+ if with_uncertainty:
+ return prediction_uncertainty(preds, group_by)
+ return preds
+
+ def evaluate(self, *args):
+ """Combines dicts before calling `_apply_func`"""
+ return self._apply_func(*args)
+
+ def __call__(self, *args, n_out: int = None, keys=None, **kwargs):
+        """Call args using `_apply_func`, optionally separating the
+        output dictionary into `n_out` dictionaries with `keys`
+ """
+ if keys is None:
+ keys = []
+ if n_out is None:
+ n_out = len(args)
+ out_dict = self._apply_func(*args)
+ if n_out == 1:
+ return out_dict
+ out_dicts = sep_dicts(out_dict, n_out=n_out, keys=keys)
+ # add back prev
+ prev = out_dict[PREV_KEY]
+ for i in range(n_out):
+ if n_out == len(args):
+ out_dicts[i][PREV_KEY] = (prev[0],) + (prev[i + 1],)
+ else:
+ out_dicts[i][PREV_KEY] = prev
+ return out_dicts
+
+ def __getitem__(self, i):
+ """Accesses ith item in the vfunc set"""
+ return self.vfuncs[i]
+
+ def __contains__(self, key):
+ """Returns true if vfuncs is a dict and key is one of its keys"""
+ if isinstance(self.vfuncs, dict):
+ return key in self.vfuncs.keys()
+ return False
+
+ def keys(self):
+ """Returns Vset vfunc keys"""
+ if isinstance(self.vfuncs, dict):
+ return self.vfuncs.keys()
+ return {}.keys()
+
+ def __len__(self):
+ return len(self.vfuncs)
+
+ def __str__(self):
+ return "Vset(" + self.name + ")"
+
+ def __create_subkey(self, value):
+ """Helper function to construct `Subkey` with
+ this Vset determining origin and output_matching
+ """
+ return Subkey(value, self.name, self._output_matching)
+
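+Following typical vflow usage, a sketch of a small modeling Vset; data is
+wrapped with init_args so inputs carry Subkeys (the sklearn models are
+illustrative):
+
+from sklearn.linear_model import LogisticRegression
+from sklearn.tree import DecisionTreeClassifier
+from vflow import Vset, init_args
+
+X = [[0.0], [1.0], [2.0], [3.0]]
+y = [0, 0, 1, 1]
+X_train, y_train = init_args((X, y), names=["X_train", "y_train"])
+
+modeling = Vset("modeling", [LogisticRegression(), DecisionTreeClassifier()],
+                vfunc_keys=["LR", "DT"])
+preds = modeling.fit(X_train, y_train).predict(X_train)
+# preds: dict keyed by Subkey tuples, one prediction set per model
+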
+def evaluate(self, *args)
+
Combines dicts before calling _apply_func
def evaluate(self, *args):
+ """Combines dicts before calling `_apply_func`"""
+ return self._apply_func(*args)
+
+def fit(self, *args)
+
Fits to args using _apply_func
def fit(self, *args):
+ """Fits to args using `_apply_func`"""
+ out_dict = {}
+ for k, v in self.vfuncs.items():
+ out_dict[k] = v.fit
+ self.fitted_vfuncs = self._apply_func(*args, out_dict=out_dict)
+ prev = self.fitted_vfuncs[PREV_KEY][1:]
+ if hasattr(self, FILTER_PREV_KEY):
+ prev = getattr(self, FILTER_PREV_KEY) + prev
+ setattr(self, PREV_KEY, prev)
+ self._fitted = True
+ return self
+
+def fit_transform(self, *args)
+
Fits to args and transforms only the first arg.
def fit_transform(self, *args):
+ """Fits to args and transforms only the first arg."""
+ return self.fit(*args).transform(args[0])
+
+def keys(self)
+
Returns Vset vfunc keys
def keys(self):
+ """Returns Vset vfunc keys"""
+ if isinstance(self.vfuncs, dict):
+ return self.vfuncs.keys()
+ return {}.keys()
+
+def predict(self, *args, with_uncertainty: bool = False, group_by: list = None)
+
Predicts args using _apply_func
def predict(self, *args, with_uncertainty: bool = False, group_by: list = None):
+ """Predicts args using `_apply_func`"""
+ if not self._fitted:
+ raise AttributeError("Please fit the Vset object before calling predict.")
+ pred_dict = {}
+ for k, v in self.fitted_vfuncs.items():
+ if hasattr(v, "predict"):
+ pred_dict[k] = v.predict
+ preds = self._apply_func(*args, out_dict=pred_dict)
+ if with_uncertainty:
+ return prediction_uncertainty(preds, group_by)
+ return preds
+
+def predict_proba(self, *args, with_uncertainty: bool = False, group_by: list = None)
+
Calls predict_proba on args using _apply_func
def predict_proba(
+ self, *args, with_uncertainty: bool = False, group_by: list = None
+):
+ """Calls predict_proba on args using `_apply_func`"""
+ if not self._fitted:
+ raise AttributeError(
+ "Please fit the Vset object before calling predict_proba."
+ )
+ pred_dict = {}
+ for k, v in self.fitted_vfuncs.items():
+ if hasattr(v, "predict_proba"):
+ pred_dict[k] = v.predict_proba
+ preds = self._apply_func(*args, out_dict=pred_dict)
+ if with_uncertainty:
+ return prediction_uncertainty(preds, group_by)
+ return preds
+
+def transform(self, *args)
+
Transforms args using _apply_func
def transform(self, *args):
+ """Transforms args using `_apply_func`"""
+ if not self._fitted:
+ raise AttributeError(
+ "Please fit the Vset object before calling the transform method."
+ )
+ out_dict = {}
+ for k, v in self.fitted_vfuncs.items():
+ if hasattr(v, "transform"):
+ out_dict[k] = v.transform
+ return self._apply_func(*args, out_dict=out_dict)
+