diff --git a/python-spec/src/somacore/query/_fast_csr.py b/python-spec/src/somacore/query/_fast_csr.py
index 17fb0aa2..511547a3 100644
--- a/python-spec/src/somacore/query/_fast_csr.py
+++ b/python-spec/src/somacore/query/_fast_csr.py
@@ -6,44 +6,46 @@
 import numba.typed
 import numpy as np
 import numpy.typing as npt
-import pandas as pd
 import pyarrow as pa
 from scipy import sparse
 
 from .. import data as scd
 from . import _eager_iter
+from . import types
 
 
-def read_scipy_csr(
-    matrix: scd.SparseNDArray, obs_joinids: pa.Array, var_joinids: pa.Array
-) -> sparse.csr_matrix:
-    """
-    Given a 2D SparseNDArray and joinids for the two dimensions, read the
-    slice and return it as an SciPy sparse.csr_matrix.
-    """
-    data, indptr, indices, shape = _read_csr(matrix, obs_joinids, var_joinids)
-    csr = _create_scipy_csr_matrix(data, indices, indptr, shape=shape)
-    return csr
+def read_csr(
+    matrix: scd.SparseNDArray,
+    obs_joinids: pa.Array,
+    var_joinids: pa.Array,
+    index_factory: types.IndexFactory,
+) -> "AccumulatedCSR":
+    if not isinstance(matrix, scd.SparseNDArray) or matrix.ndim != 2:
+        raise TypeError("Can only read from a 2D SparseNDArray")
+
+    max_workers = (os.cpu_count() or 4) + 2
+    with futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
+        acc = _CSRAccumulator(
+            obs_joinids=obs_joinids,
+            var_joinids=var_joinids,
+            pool=pool,
+            index_factory=index_factory,
+        )
+        for tbl in _eager_iter.EagerIterator(
+            matrix.read((obs_joinids, var_joinids)).tables(),
+            pool=pool,
+        ):
+            acc.append(tbl["soma_dim_0"], tbl["soma_dim_1"], tbl["soma_data"])
 
-
-def read_arrow_csr(
-    matrix: scd.SparseNDArray, obs_joinids: pa.Array, var_joinids: pa.Array
-) -> pa.SparseCSRMatrix:
-    """
-    Given a 2D SparseNDArray and joinids for the two dimensions, read the
-    slice and return it as a pyarrow.SparseCSRMatrix.
-    """
-    data, indptr, indices, shape = _read_csr(matrix, obs_joinids, var_joinids)
-    csr = pa.SparseCSRMatrix.from_numpy(data, indptr, indices, shape=shape)
-    return csr
+    return acc.finalize()
 
 
-class _CSRAccumulatorFinalResult(NamedTuple):
+class AccumulatedCSR(NamedTuple):
     """
     Private. Return type for the _CSRAccumulator.finalize method.
-    Contains a sparse CSR consituent elements
+    Contains a sparse CSR's constituent elements.
    """
 
     data: npt.NDArray[np.number]
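For orientation, an illustrative sketch (not part of the diff): callers now obtain a CSR in two steps, with `read_csr` accumulating the slice and `AccumulatedCSR.to_scipy`, added in the next hunk, converting it. Here `x`, `obs_ids`, and `var_ids` are hypothetical stand-ins for a 2D SparseNDArray and its joinid arrays, and `pd.Index` is the stock factory that query.py defaults to below:

    import pandas as pd
    from somacore.query import _fast_csr

    acc = _fast_csr.read_csr(x, obs_ids, var_ids, index_factory=pd.Index)
    csr = acc.to_scipy()  # scipy.sparse.csr_matrix, built without the O(N) validation scan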
+ """ + matrix = sparse.csr_matrix.__new__(sparse.csr_matrix) + matrix.data = self.data + matrix.indptr = self.indptr + matrix.indices = self.indices + matrix._shape = self.shape + return matrix + class _CSRAccumulator: """ @@ -62,14 +85,15 @@ def __init__( obs_joinids: npt.NDArray[np.int64], var_joinids: npt.NDArray[np.int64], pool: futures.Executor, + index_factory: types.IndexFactory, ): self.obs_joinids = obs_joinids self.var_joinids = var_joinids self.pool = pool self.shape: Tuple[int, int] = (len(self.obs_joinids), len(self.var_joinids)) - self.obs_indexer: pd.Index = pd.Index(self.obs_joinids) - self.var_indexer: pd.Index = pd.Index(self.var_joinids) + self.obs_indexer = index_factory(self.obs_joinids) + self.var_indexer = index_factory(self.var_joinids) self.row_length: npt.NDArray[np.int64] = np.zeros( (self.shape[0],), dtype=_select_dtype(self.shape[1]) ) @@ -91,6 +115,7 @@ def append( ) -> None: """ At accumulation time, do several things: + * re-index to positional indices, and if possible, cast to smaller dtype to minimize memory footprint (at cost of some amount of time) * accumulate column counts by row, i.e., build the basis of the indptr @@ -113,7 +138,7 @@ def append( self.coo_chunks.append((row_ind, col_ind, data.to_numpy())) _accum_row_length(self.row_length, row_ind) - def finalize(self) -> _CSRAccumulatorFinalResult: + def finalize(self) -> AccumulatedCSR: nnz = sum(len(chunk[2]) for chunk in self.coo_chunks) index_dtype = _select_dtype(nnz) if nnz == 0: @@ -121,7 +146,7 @@ def finalize(self) -> _CSRAccumulatorFinalResult: # an empty matrix. Float32 is used as a default type, as it is most # compatible with AnnData expectations. empty = sparse.csr_matrix((0, 0), dtype=np.float32) - return _CSRAccumulatorFinalResult( + return AccumulatedCSR( data=empty.data, indptr=empty.indptr, indices=empty.indices, @@ -160,7 +185,7 @@ def finalize(self) -> _CSRAccumulatorFinalResult: ] ) _finalize_indptr(indptr) - return _CSRAccumulatorFinalResult( + return AccumulatedCSR( data=data, indptr=indptr, indices=indices, shape=self.shape ) @@ -243,61 +268,8 @@ def _select_dtype( def _reindex_and_cast( - index: pd.Index, ids: npt.NDArray[np.int64], target_dtype: npt.DTypeLike + index: types.IndexLike, ids: npt.NDArray[np.int64], target_dtype: npt.DTypeLike ) -> npt.NDArray[np.int64]: return cast( npt.NDArray[np.int64], index.get_indexer(ids).astype(target_dtype, copy=False) ) - - -def _read_csr( - matrix: scd.SparseNDArray, obs_joinids: pa.Array, var_joinids: pa.Array -) -> Tuple[ - npt.NDArray[np.number], # data - npt.NDArray[np.integer], # indptr - npt.NDArray[np.integer], # indices - Tuple[int, int], # shape -]: - if not isinstance(matrix, scd.SparseNDArray) or matrix.ndim != 2: - raise TypeError("Can only read from a 2D SparseNDArray") - - max_workers = (os.cpu_count() or 4) + 2 - with futures.ThreadPoolExecutor(max_workers=max_workers) as pool: - acc = _CSRAccumulator( - obs_joinids=obs_joinids, var_joinids=var_joinids, pool=pool - ) - for tbl in _eager_iter.EagerIterator( - matrix.read((obs_joinids, var_joinids)).tables(), - pool=pool, - ): - acc.append(tbl["soma_dim_0"], tbl["soma_dim_1"], tbl["soma_data"]) - - data, indptr, indices, shape = acc.finalize() - - return data, indptr, indices, shape - - -def _create_scipy_csr_matrix( - data: npt.NDArray[np.number], - indices: npt.NDArray[np.integer], - indptr: npt.NDArray[np.integer], - shape: Tuple[int, int], -) -> sparse.csr_matrix: - """Create a Scipy sparse.csr_matrix from component elements. 
diff --git a/python-spec/src/somacore/query/query.py b/python-spec/src/somacore/query/query.py
index 9fed0f30..b8d4bc71 100644
--- a/python-spec/src/somacore/query/query.py
+++ b/python-spec/src/somacore/query/query.py
@@ -27,6 +27,7 @@
 from .. import options
 from . import _fast_csr
 from . import axis
+from . import types
 
 _RO_AUTO = options.ResultOrder.AUTO
 
@@ -88,6 +89,7 @@ def __init__(
         *,
         obs_query: axis.AxisQuery = axis.AxisQuery(),
         var_query: axis.AxisQuery = axis.AxisQuery(),
+        index_factory: types.IndexFactory = pd.Index,
     ):
         if measurement_name not in experiment.ms:
             raise ValueError("Measurement does not exist in the experiment")
@@ -97,7 +99,11 @@
         self._matrix_axis_query = _MatrixAxisQuery(obs=obs_query, var=var_query)
         self._joinids = _JoinIDCache(self)
 
-        self._indexer = AxisIndexer(self)
+        self._indexer = AxisIndexer(
+            self,
+            index_factory=index_factory,
+        )
+        self._index_factory = index_factory
         self._threadpool_: Optional[futures.ThreadPoolExecutor] = None
 
     def obs(
@@ -393,9 +399,12 @@ def _read_axis_mappings(fn, axis, keys: Sequence[str]) -> Dict[str, np.ndarray]:
 
         obs_table, var_table = self._read_both_axes(column_names)
 
         x_matrices = {
-            _xname: _fast_csr.read_scipy_csr(
-                all_x_arrays[_xname], self.obs_joinids(), self.var_joinids()
-            )
+            _xname: _fast_csr.read_csr(
+                all_x_arrays[_xname],
+                self.obs_joinids(),
+                self.var_joinids(),
+                index_factory=self._index_factory,
+            ).to_scipy()
             for _xname in all_x_arrays
         }
 
@@ -736,6 +745,7 @@ class AxisIndexer:
     """
 
     query: ExperimentAxisQuery
+    _index_factory: types.IndexFactory
     _cached_obs: Optional[pd.Index] = None
     _cached_var: Optional[pd.Index] = None
 
diff --git a/python-spec/src/somacore/query/types.py b/python-spec/src/somacore/query/types.py
new file mode 100644
index 00000000..2b63b22a
--- /dev/null
+++ b/python-spec/src/somacore/query/types.py
@@ -0,0 +1,33 @@
+"""Common types used across SOMA query modules."""
+
+from typing import Any, Callable
+
+import numpy as np
+import numpy.typing as npt
+from typing_extensions import Protocol
+
+
+class IndexLike(Protocol):
+    """The basics of what we expect an Index to be.
+
+    This is a basic description of the parts of the ``pandas.Index`` type
+    that we use. It is intended as a rough guide so an implementor can know
+    that they are probably passing the right "index" type into a function,
+    not as a full specification of the types and behavior of ``get_indexer``.
+    """
+
+    def get_indexer(
+        self,
+        target: npt.NDArray[np.int64],
+        method: object = ...,
+        limit: object = ...,
+        tolerance: object = ...,
+    ) -> Any:
+        """Something compatible with Pandas' Index.get_indexer method."""
+
+
+IndexFactory = Callable[[npt.NDArray[np.int64]], "IndexLike"]
+"""Function that builds an index over the given NDArray.
+
+This interface is implemented by the callable ``pandas.Index``.
+"""
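The `IndexLike` protocol is deliberately minimal, but it is concrete enough to support a non-pandas indexer. Below is a hypothetical sketch (none of these names exist in somacore): a binary-search factory wired into `ExperimentAxisQuery` through the new `index_factory` parameter. It assumes the factory receives an int64 NumPy array, as the `IndexFactory` alias states, and it omits the protocol's optional pandas-compatibility parameters for brevity; `experiment` and the measurement name are placeholders.

    import numpy as np
    import numpy.typing as npt


    class SearchSortedIndex:
        """Positions of int64 joinids via binary search, mimicking pandas.Index.get_indexer."""

        def __init__(self, keys: npt.NDArray[np.int64]):
            # Sort once up front; each get_indexer call is then O(len(target) * log n).
            self._order = np.argsort(keys)
            self._sorted = keys[self._order]

        def get_indexer(self, target: npt.NDArray[np.int64]) -> npt.NDArray[np.int64]:
            if self._sorted.size == 0:
                return np.full(target.shape, -1, dtype=np.int64)
            pos = np.clip(np.searchsorted(self._sorted, target), 0, self._sorted.size - 1)
            hit = self._sorted[pos] == target
            # Like pandas, report -1 for ids that are not present in the index.
            return np.where(hit, self._order[pos], -1)


    query = ExperimentAxisQuery(
        experiment, "RNA", index_factory=SearchSortedIndex  # hypothetical wiring
    )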