From 884e8a42ea9e94d0ec3b1ce591791cdef95ca857 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Thu, 7 Nov 2024 14:13:12 -0500 Subject: [PATCH] `rm _{{test_,}eager_iter,fast_csr}.py` (moved to TileDB-SOMA) --- pyproject.toml | 1 - python-spec/requirements-py3.10.txt | 1 - python-spec/requirements-py3.11.txt | 1 - python-spec/requirements-py3.12.txt | 1 - python-spec/requirements-py3.9.txt | 1 - python-spec/src/somacore/query/_eager_iter.py | 51 ---- python-spec/src/somacore/query/_fast_csr.py | 275 ------------------ python-spec/testing/test_eager_iter.py | 64 ---- 8 files changed, 395 deletions(-) delete mode 100644 python-spec/src/somacore/query/_eager_iter.py delete mode 100644 python-spec/src/somacore/query/_fast_csr.py delete mode 100644 python-spec/testing/test_eager_iter.py diff --git a/pyproject.toml b/pyproject.toml index b3f31e2a..7e07880e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,7 +13,6 @@ readme = "./python-spec/README.md" dependencies = [ "anndata", "attrs>=22.1", - "numba", "numpy>=1.21", "pandas", "pyarrow", diff --git a/python-spec/requirements-py3.10.txt b/python-spec/requirements-py3.10.txt index c2fd0fe3..951c0cd9 100644 --- a/python-spec/requirements-py3.10.txt +++ b/python-spec/requirements-py3.10.txt @@ -5,7 +5,6 @@ exceptiongroup==1.2.1 h5py==3.11.0 llvmlite==0.43.0 natsort==8.4.0 -numba==0.60.0 numpy==2.0.0 packaging==24.1 pandas==2.2.2 diff --git a/python-spec/requirements-py3.11.txt b/python-spec/requirements-py3.11.txt index 665528ff..0dd6c8ad 100644 --- a/python-spec/requirements-py3.11.txt +++ b/python-spec/requirements-py3.11.txt @@ -4,7 +4,6 @@ attrs==23.2.0 h5py==3.11.0 llvmlite==0.43.0 natsort==8.4.0 -numba==0.60.0 numpy==2.0.0 packaging==24.1 pandas==2.2.2 diff --git a/python-spec/requirements-py3.12.txt b/python-spec/requirements-py3.12.txt index 0fe2f050..1d0585c5 100644 --- a/python-spec/requirements-py3.12.txt +++ b/python-spec/requirements-py3.12.txt @@ -4,7 +4,6 @@ attrs==23.2.0 h5py==3.11.0 llvmlite==0.43.0 natsort==8.4.0 -numba==0.60.0 numpy==2.0.0 packaging==24.1 pandas==2.2.2 diff --git a/python-spec/requirements-py3.9.txt b/python-spec/requirements-py3.9.txt index db2c51ae..fc763a1c 100644 --- a/python-spec/requirements-py3.9.txt +++ b/python-spec/requirements-py3.9.txt @@ -7,7 +7,6 @@ h5py==3.11.0 jmespath==1.0.1 llvmlite==0.43.0 natsort==8.4.0 -numba==0.60.0 numpy==2.0.0 packaging==24.1 pandas==2.2.2 diff --git a/python-spec/src/somacore/query/_eager_iter.py b/python-spec/src/somacore/query/_eager_iter.py deleted file mode 100644 index c42e52c1..00000000 --- a/python-spec/src/somacore/query/_eager_iter.py +++ /dev/null @@ -1,51 +0,0 @@ -from concurrent import futures -from typing import Iterator, Optional, TypeVar - -_T = TypeVar("_T") - - -class EagerIterator(Iterator[_T]): - def __init__( - self, - iterator: Iterator[_T], - pool: Optional[futures.Executor] = None, - ): - super().__init__() - self.iterator = iterator - self._pool = pool or futures.ThreadPoolExecutor() - self._own_pool = pool is None - self._preload_future = self._pool.submit(self.iterator.__next__) - - def __next__(self) -> _T: - stopped = False - try: - if self._preload_future.cancel(): - # If `.cancel` returns True, cancellation was successful. - # The self.iterator.__next__ call has not yet been started, - # and will never be started, so we can compute next ourselves. - # This prevents deadlocks if the thread pool is too small - # and we can never create a preload thread. - return next(self.iterator) - # `.cancel` returned false, so the preload is already running. - # Just wait for it. - return self._preload_future.result() - except StopIteration: - self._cleanup() - stopped = True - raise - finally: - if not stopped: - # If we have more to do, go for the next thing. - self._preload_future = self._pool.submit(self.iterator.__next__) - - def _cleanup(self) -> None: - if self._own_pool: - self._pool.shutdown() - - def __del__(self) -> None: - # Ensure the threadpool is cleaned up in the case where the - # iterator is not exhausted. For more information on __del__: - # https://docs.python.org/3/reference/datamodel.html#object.__del__ - self._cleanup() - super_del = getattr(super(), "__del__", lambda: None) - super_del() diff --git a/python-spec/src/somacore/query/_fast_csr.py b/python-spec/src/somacore/query/_fast_csr.py deleted file mode 100644 index 99a784d4..00000000 --- a/python-spec/src/somacore/query/_fast_csr.py +++ /dev/null @@ -1,275 +0,0 @@ -import os -from concurrent import futures -from typing import List, NamedTuple, Tuple, Type, cast - -import numba -import numba.typed -import numpy as np -import numpy.typing as npt -import pyarrow as pa -from scipy import sparse - -from .. import data as scd -from . import _eager_iter -from . import types - - -def read_csr( - matrix: scd.SparseNDArray, - obs_joinids: pa.IntegerArray, - var_joinids: pa.IntegerArray, - index_factory: types.IndexFactory, -) -> "AccumulatedCSR": - if not isinstance(matrix, scd.SparseNDArray) or matrix.ndim != 2: - raise TypeError("Can only read from a 2D SparseNDArray") - - max_workers = (os.cpu_count() or 4) + 2 - with futures.ThreadPoolExecutor(max_workers=max_workers) as pool: - acc = _CSRAccumulator( - obs_joinids=obs_joinids, - var_joinids=var_joinids, - pool=pool, - index_factory=index_factory, - ) - for tbl in _eager_iter.EagerIterator( - matrix.read((obs_joinids, var_joinids)).tables(), - pool=pool, - ): - acc.append(tbl["soma_dim_0"], tbl["soma_dim_1"], tbl["soma_data"]) - - return acc.finalize() - - -class AccumulatedCSR(NamedTuple): - """ - Private. - - Return type for the _CSRAccumulator.finalize method. - Contains a sparse CSR's constituent elements. - """ - - data: npt.NDArray[np.number] - indptr: npt.NDArray[np.integer] - indices: npt.NDArray[np.integer] - shape: Tuple[int, int] - - def to_scipy(self) -> sparse.csr_matrix: - """Create a Scipy sparse.csr_matrix from component elements. - - Conceptually, this is identical to:: - - sparse.csr_matrix((data, indices, indptr), shape=shape) - - This ugliness is to bypass the O(N) scan that - :meth:`sparse._cs_matrix.__init__` - does when a new compressed matrix is created. - - See `SciPy bug 11496 ` - for details. - """ - matrix = sparse.csr_matrix.__new__(sparse.csr_matrix) - matrix.data = self.data - matrix.indptr = self.indptr - matrix.indices = self.indices - matrix._shape = self.shape - return matrix - - -class _CSRAccumulator: - """ - Fast accumulator of a CSR, based upon COO input. - """ - - def __init__( - self, - obs_joinids: pa.IntegerArray, - var_joinids: pa.IntegerArray, - pool: futures.Executor, - index_factory: types.IndexFactory, - ): - self.obs_joinids = obs_joinids - self.var_joinids = var_joinids - self.pool = pool - - self.shape: Tuple[int, int] = (len(self.obs_joinids), len(self.var_joinids)) - self.obs_indexer = index_factory(self.obs_joinids) - self.var_indexer = index_factory(self.var_joinids) - self.row_length: npt.NDArray[np.int64] = np.zeros( - (self.shape[0],), dtype=_select_dtype(self.shape[1]) - ) - - # COO accumulated chunks, stored as list of triples (row_ind, col_ind, data) - self.coo_chunks: List[ - Tuple[ - npt.NDArray[np.integer], # row_ind - npt.NDArray[np.integer], # col_ind - npt.NDArray[np.number], # data - ] - ] = [] - - def append( - self, - row_joinids: pa.Array, - col_joinids: pa.Array, - data: pa.Array, - ) -> None: - """ - At accumulation time, do several things: - - * re-index to positional indices, and if possible, cast to smaller dtype - to minimize memory footprint (at cost of some amount of time) - * accumulate column counts by row, i.e., build the basis of the indptr - * cache the tuple of data, row, col - """ - rows_future = self.pool.submit( - _reindex_and_cast, - self.obs_indexer, - row_joinids.to_numpy(), - _select_dtype(self.shape[0]), - ) - cols_future = self.pool.submit( - _reindex_and_cast, - self.var_indexer, - col_joinids.to_numpy(), - _select_dtype(self.shape[1]), - ) - row_ind = rows_future.result() - col_ind = cols_future.result() - self.coo_chunks.append((row_ind, col_ind, data.to_numpy())) - _accum_row_length(self.row_length, row_ind) - - def finalize(self) -> AccumulatedCSR: - nnz = sum(len(chunk[2]) for chunk in self.coo_chunks) - index_dtype = _select_dtype(nnz) - if nnz == 0: - # There is no way to infer matrix dtype, so use a default and return - # an empty matrix. Float32 is used as a default type, as it is most - # compatible with AnnData expectations. - empty = sparse.csr_matrix((0, 0), dtype=np.float32) - return AccumulatedCSR( - data=empty.data, - indptr=empty.indptr, - indices=empty.indices, - shape=self.shape, - ) - - # cumsum row lengths to get indptr - indptr = np.empty((self.shape[0] + 1,), dtype=index_dtype) - indptr[0:1] = 0 - np.cumsum(self.row_length, out=indptr[1:]) - - # Parallel copy of data and column indices - indices = np.empty((nnz,), dtype=index_dtype) - data = np.empty((nnz,), dtype=self.coo_chunks[0][2].dtype) - - # Empirically determined value. Needs to be large enough for reasonable - # concurrency, without excessive write cache conflict. Controls the - # number of rows that are processed in a single thread, and therefore - # is the primary tuning parameter related to concurrency. - row_rng_mask_bits = 18 - - n_jobs = (self.shape[0] >> row_rng_mask_bits) + 1 - chunk_list = numba.typed.List(self.coo_chunks) - futures.wait( - [ - self.pool.submit( - _copy_chunklist_range, - chunk_list, - data, - indices, - indptr, - row_rng_mask_bits, - job, - ) - for job in range(n_jobs) - ] - ) - _finalize_indptr(indptr) - return AccumulatedCSR( - data=data, indptr=indptr, indices=indices, shape=self.shape - ) - - -@numba.jit(nopython=True, nogil=True) # type: ignore[attr-defined] -def _accum_row_length( - row_length: npt.NDArray[np.int64], row_ind: npt.NDArray[np.int64] -) -> None: - for rind in row_ind: - row_length[rind] += 1 - - -@numba.jit(nopython=True, nogil=True) # type: ignore[attr-defined] -def _copy_chunk_range( - row_ind_chunk: npt.NDArray[np.signedinteger], - col_ind_chunk: npt.NDArray[np.signedinteger], - data_chunk: npt.NDArray[np.number], - data: npt.NDArray[np.number], - indices: npt.NDArray[np.signedinteger], - indptr: npt.NDArray[np.signedinteger], - row_rng_mask: int, - row_rng_val: int, -): - for n in range(len(data_chunk)): - row = row_ind_chunk[n] - if (row & row_rng_mask) != row_rng_val: - continue - ptr = indptr[row] - indices[ptr] = col_ind_chunk[n] - data[ptr] = data_chunk[n] - indptr[row] += 1 - - -@numba.jit(nopython=True, nogil=True) # type: ignore[attr-defined] -def _copy_chunklist_range( - chunk_list: numba.typed.List, - data: npt.NDArray[np.number], - indices: npt.NDArray[np.signedinteger], - indptr: npt.NDArray[np.signedinteger], - row_rng_mask_bits: int, - job: int, -): - assert row_rng_mask_bits >= 1 and row_rng_mask_bits < 64 - row_rng_mask = (2**64 - 1) >> row_rng_mask_bits << row_rng_mask_bits - row_rng_val = job << row_rng_mask_bits - for row_ind_chunk, col_ind_chunk, data_chunk in chunk_list: - _copy_chunk_range( - row_ind_chunk, - col_ind_chunk, - data_chunk, - data, - indices, - indptr, - row_rng_mask, - row_rng_val, - ) - - -@numba.jit(nopython=True, nogil=True) # type: ignore[attr-defined] -def _finalize_indptr(indptr: npt.NDArray[np.signedinteger]): - prev = 0 - for r in range(len(indptr)): - t = indptr[r] - indptr[r] = prev - prev = t - - -def _select_dtype( - maxval: int, -) -> Type[np.signedinteger]: - """ - Ascertain the "best" dtype for a zero-based index. Given our - goal of minimizing memory use, "best" is currently defined as - smallest. - """ - if maxval > np.iinfo(np.int32).max: - return np.int64 - else: - return np.int32 - - -def _reindex_and_cast( - index: types.IndexLike, ids: npt.NDArray[np.int64], target_dtype: npt.DTypeLike -) -> npt.NDArray[np.int64]: - return cast( - npt.NDArray[np.int64], index.get_indexer(ids).astype(target_dtype, copy=False) - ) diff --git a/python-spec/testing/test_eager_iter.py b/python-spec/testing/test_eager_iter.py deleted file mode 100644 index 87f74b9c..00000000 --- a/python-spec/testing/test_eager_iter.py +++ /dev/null @@ -1,64 +0,0 @@ -import threading -import unittest -from concurrent import futures -from unittest import mock - -from somacore.query import _eager_iter - - -class EagerIterTest(unittest.TestCase): - def setUp(self): - super().setUp() - self.kiddie_pool = futures.ThreadPoolExecutor(1) - """Tiny thread pool for testing.""" - self.verify_pool = futures.ThreadPoolExecutor(1) - """Separate thread pool so verification is not blocked.""" - - def tearDown(self): - self.verify_pool.shutdown(wait=False) - self.kiddie_pool.shutdown(wait=False) - super().tearDown() - - def test_thread_starvation(self): - sem = threading.Semaphore() - try: - # Monopolize the threadpool. - sem.acquire() - self.kiddie_pool.submit(sem.acquire) - eager = _eager_iter.EagerIterator(iter("abc"), pool=self.kiddie_pool) - got_a = self.verify_pool.submit(lambda: next(eager)) - self.assertEqual("a", got_a.result(0.1)) - got_b = self.verify_pool.submit(lambda: next(eager)) - self.assertEqual("b", got_b.result(0.1)) - got_c = self.verify_pool.submit(lambda: next(eager)) - self.assertEqual("c", got_c.result(0.1)) - with self.assertRaises(StopIteration): - self.verify_pool.submit(lambda: next(eager)).result(0.1) - finally: - sem.release() - - def test_nesting(self): - inner = _eager_iter.EagerIterator(iter("abc"), pool=self.kiddie_pool) - outer = _eager_iter.EagerIterator(inner, pool=self.kiddie_pool) - self.assertEqual( - "a, b, c", self.verify_pool.submit(", ".join, outer).result(0.1) - ) - - def test_exceptions(self): - flaky = mock.MagicMock() - flaky.__next__.side_effect = [1, 2, ValueError(), 3, 4] - - eager_flaky = _eager_iter.EagerIterator(flaky, pool=self.kiddie_pool) - got_1 = self.verify_pool.submit(lambda: next(eager_flaky)) - self.assertEqual(1, got_1.result(0.1)) - got_2 = self.verify_pool.submit(lambda: next(eager_flaky)) - self.assertEqual(2, got_2.result(0.1)) - with self.assertRaises(ValueError): - self.verify_pool.submit(lambda: next(eager_flaky)).result(0.1) - got_3 = self.verify_pool.submit(lambda: next(eager_flaky)) - self.assertEqual(3, got_3.result(0.1)) - got_4 = self.verify_pool.submit(lambda: next(eager_flaky)) - self.assertEqual(4, got_4.result(0.1)) - for _ in range(5): - with self.assertRaises(StopIteration): - self.verify_pool.submit(lambda: next(eager_flaky)).result(0.1)