diff --git a/src/cooler/cli/cload.py b/src/cooler/cli/cload.py
index b2fa749..ded4b19 100644
--- a/src/cooler/cli/cload.py
+++ b/src/cooler/cli/cload.py
@@ -372,12 +372,6 @@ def pairix(
 @click.option(
     "--pos2", "-p2", help="pos2 field number (one-based)", type=int, required=True
 )
-@click.option(
-    "--chunksize",
-    help="Number of input lines to load at a time",
-    type=int,
-    default=int(15e6),
-)
 @click.option(
     "--zero-based",
     "-0",
@@ -433,6 +427,28 @@ def pairix(
 #     "specify at least one input field for aggregation as an alternative.",
 #     is_flag=True,
 #     default=False)
+@click.option(
+    "--chunksize",
+    "-c",
+    help="Size in number of lines/records of data chunks to read and process "
+    "from the input stream at a time. These chunks will be saved as temporary "
+    "partial coolers and then merged.",
+    type=int,
+    default=15_000_000,
+)
+@click.option(
+    "--mergebuf",
+    help="Total number of pixel records to buffer per epoch of merging data. "
+    "Defaults to the same value as `chunksize`.",
+    type=int,
+)
+@click.option(
+    "--max-merge",
+    help="Maximum number of chunks to merge in a single pass.",
+    type=int,
+    default=200,
+    show_default=True,
+)
 @click.option(
     "--temp-dir",
     help="Create temporary files in a specified directory. Pass ``-`` to use "
@@ -445,13 +461,6 @@ def pairix(
     is_flag=True,
     default=False,
 )
-@click.option(
-    "--max-merge",
-    help="Maximum number of chunks to merge before invoking recursive merging",
-    type=int,
-    default=200,
-    show_default=True,
-)
 @click.option(
     "--storage-options",
     help="Options to modify the data filter pipeline. Provide as a "
@@ -478,12 +487,13 @@ def pairs(
     cool_path,
     metadata,
     assembly,
-    chunksize,
     zero_based,
     comment_char,
     input_copy_status,
     no_symmetric_upper,
     field,
+    chunksize,
+    mergebuf,
     temp_dir,
     no_delete_temp,
     max_merge,
@@ -501,6 +511,8 @@ def pairs(
     """
     chromsizes, bins = parse_bins(bins)
+    if mergebuf is None:
+        mergebuf = chunksize
 
     symmetric_upper = not no_symmetric_upper
     tril_action = None
@@ -639,7 +651,7 @@ def pairs(
         dtypes=output_field_dtypes,
         metadata=metadata,
         assembly=assembly,
-        mergebuf=chunksize,
+        mergebuf=mergebuf,
         max_merge=max_merge,
         temp_dir=temp_dir,
         delete_temp=not no_delete_temp,
diff --git a/src/cooler/cli/load.py b/src/cooler/cli/load.py
index 837695c..6717ac4 100644
--- a/src/cooler/cli/load.py
+++ b/src/cooler/cli/load.py
@@ -46,16 +46,6 @@
     type=str,
     multiple=True,
 )
-@click.option(
-    "--chunksize",
-    "-c",
-    help="Size (in number of lines/records) of data chunks to read and process "
-    "from the input file at a time. These chunks will be saved as "
-    "temporary partial Coolers and merged at the end. Also specifies the "
-    "size of the buffer during the merge step.",
-    type=int,
-    default=int(20e6),
-)
 @click.option(
     "--count-as-float",
     is_flag=True,
@@ -99,12 +89,34 @@
     "distinct, use the ``--no-symmetric-upper`` option instead. ",
     show_default=True,
 )
+@click.option(
+    "--chunksize",
+    "-c",
+    help="Size in number of lines/records of data chunks to read and process "
+    "from the input stream at a time. These chunks will be saved as temporary "
+    "partial coolers and then merged.",
+    type=int,
+    default=20_000_000,
+)
+@click.option(
+    "--mergebuf",
+    help="Total number of records to buffer per epoch of merging data. Defaults "
+    "to the same value as `chunksize`.",
+    type=int,
+)
 @click.option(
     "--temp-dir",
     help="Create temporary files in a specified directory. Pass ``-`` to use "
     "the platform default temp dir.",
     type=click.Path(exists=True, file_okay=False, dir_okay=True, allow_dash=True),
 )
+@click.option(
+    "--max-merge",
+    help="Maximum number of chunks to merge in a single pass.",
+    type=int,
+    default=200,
+    show_default=True,
+)
 @click.option(
     "--no-delete-temp",
     help="Do not delete temporary files when finished.",
@@ -133,13 +145,15 @@ def load(
     format,
     metadata,
     assembly,
-    chunksize,
     field,
     count_as_float,
     one_based,
     comment_char,
     input_copy_status,
     no_symmetric_upper,
+    chunksize,
+    mergebuf,
+    max_merge,
     temp_dir,
     no_delete_temp,
     storage_options,
@@ -178,6 +192,8 @@ def load(
     """
     logger = get_logger(__name__)
     chromsizes, bins = parse_bins(bins_path)
+    if mergebuf is None:
+        mergebuf = chunksize
 
     symmetric_upper = not no_symmetric_upper
     tril_action = None
@@ -334,10 +350,11 @@ def load(
         dtypes=output_field_dtypes,
         metadata=metadata,
         assembly=assembly,
-        mergebuf=chunksize,
-        ensure_sorted=False,
+        mergebuf=mergebuf,
+        max_merge=max_merge,
         temp_dir=temp_dir,
         delete_temp=not no_delete_temp,
+        ensure_sorted=False,
         # boundscheck=True,
         # dupcheck=True,
         triucheck=True if symmetric_upper else False,
diff --git a/src/cooler/reduce.py b/src/cooler/reduce.py
index 35442ba..77729d4 100644
--- a/src/cooler/reduce.py
+++ b/src/cooler/reduce.py
@@ -44,68 +44,91 @@
 def merge_breakpoints(
-    indexes: list[np.ndarray],
-    maxbuf: int
+    indexes: list[np.ndarray | h5py.Dataset],
+    bufsize: int
 ) -> tuple[np.ndarray, np.ndarray]:
     """
-    Partition k offset arrays for performing a k-way external merge, such that
-    no single merge pass loads more than ``maxbuf`` records into memory, with
-    one exception (see Notes).
+    Given ``k`` bin1_offset indexes, determine how to partition the data from
+    ``k`` corresponding pixel tables for a k-way merge.
+
+    The partition is a subsequence of bin1 IDs, defining the bounds of chunks
+    of data that will be loaded into memory from each table in a single "epoch"
+    of merging data. The bounds are calculated such that no single epoch will
+    load more than ``bufsize`` records into memory.
+
+    However, the ``bufsize`` condition is not guaranteed and a warning will be
+    raised if it cannot be satisfied for one or more epochs (see Notes).
 
     Parameters
     ----------
-    indexes : sequence of 1D arrays of equal length
-        These offset-array indexes map non-negative integers to their offset
-        locations in a corresponding data table
-    maxbuf : int
-        Maximum cumulative number of records loaded into memory for a single
-        merge pass
+    indexes : sequence of 1D array-like of equal length
+        Offset arrays that map bin1 IDs to their offset locations in a
+        corresponding pixel table.
+    bufsize : int
+        Maximum number of pixel records loaded into memory in a single merge
+        epoch.
 
     Returns
     -------
-    breakpoints : 1D array
-        breakpoint locations to segment all the offset arrays
-    cum_offset : 1D array
-        cumulative number of records that will be processed at each breakpoint
+    bin1_partition : 1D array
+        Bin1 IDs defining where to partition all the tables for merging.
+    cum_nrecords : 1D array
+        Cumulative number of records (from all pixel tables combined) that will
+        be processed at each epoch.
 
     Notes
     -----
-    The one exception to the post-condition is if any single increment of the
-    indexes maps to more than ``maxbuf`` records, these will produce
-    oversized chunks.
-
+    The one exception to the post-condition is when a single bin1 increment in
+    a table contains more than ``bufsize`` records.
""" - # k = len(indexes) - - # the virtual cumulative index if no pixels were merged - cumindex = np.vstack(indexes).sum(axis=0) - cum_start = 0 - cum_nnz = cumindex[-1] - # n = len(cumindex) - - breakpoints = [0] - cum_offsets = [0] + # This is a "virtual" cumulative index if all the tables were concatenated + # and sorted but no pixel records were aggregated. It helps us track how + # many records would be processed at each merge epoch. + # NOTE: We sum these incrementally in case the indexes are lazy to avoid + # loading all indexes into memory at once. + combined_index = np.zeros(indexes[0].shape) + for i in range(len(indexes)): + combined_index += indexes[i] + combined_start = 0 + combined_nnz = combined_index[-1] + + bin1_partition = [0] + cum_nrecords = [0] lo = 0 while True: - # find the next mark - hi = bisect_right(cumindex, min(cum_start + maxbuf, cum_nnz), lo=lo) - 1 + # Find the next bin1 ID from the combined index + hi = bisect_right( + combined_index, + min(combined_start + bufsize, combined_nnz), + lo=lo + ) - 1 + if hi == lo: - # number of records to nearest mark exceeds `maxbuf` - # check for oversized chunks afterwards + # This means number of records to nearest mark exceeds `bufsize`. + # Check for oversized chunks afterwards. hi += 1 - breakpoints.append(hi) - cum_offsets.append(cumindex[hi]) + bin1_partition.append(hi) + cum_nrecords.append(combined_index[hi]) - if cumindex[hi] == cum_nnz: + if combined_index[hi] == combined_nnz: break lo = hi - cum_start = cumindex[hi] + combined_start = combined_index[hi] - breakpoints = np.array(breakpoints) - cum_offsets = np.array(cum_offsets) - return breakpoints, cum_offsets + bin1_partition = np.array(bin1_partition) + cum_nrecords = np.array(cum_nrecords) + + nrecords_per_epoch = np.diff(cum_nrecords) + n_over = (nrecords_per_epoch > bufsize).sum() + if n_over > 0: + warnings.warn( + f"{n_over} merge epochs will require buffering more than {bufsize} " + f"pixel records, with as many as {nrecords_per_epoch.max():g}." + ) + + return bin1_partition, cum_nrecords class CoolerMerger(ContactBinner): @@ -117,12 +140,12 @@ class CoolerMerger(ContactBinner): def __init__( self, coolers: list[Cooler], - maxbuf: int, + mergebuf: int, columns: list[str] | None = None, agg: dict[str, Any] | None = None ): self.coolers = list(coolers) - self.maxbuf = maxbuf + self.mergebuf = mergebuf self.columns = ["count"] if columns is None else columns self.agg = {col: "sum" for col in self.columns} if agg is not None: @@ -145,18 +168,23 @@ def __init__( raise ValueError("Coolers must have same bin structure") def __iter__(self) -> Iterator[dict[str, np.ndarray]]: - indexes = [c._load_dset("indexes/bin1_offset") for c in self.coolers] - breakpoints, cum_offsets = merge_breakpoints(indexes, self.maxbuf) - chunksizes = np.diff(cum_offsets) - if chunksizes.max() > self.maxbuf: - warnings.warn(f"Some merge passes will use more than {self.maxbuf} pixels") + # Load bin1_offset indexes lazily. + indexes = [c.open("r")["indexes/bin1_offset"] for c in self.coolers] + + # Calculate the common partition of bin1 offsets that define the epochs + # of merging data. 
+        bin1_partition, cum_nrecords = merge_breakpoints(indexes, self.mergebuf)
+        nrecords_per_epoch = np.diff(cum_nrecords)
+        logger.info(f"n_merge_epochs: {len(nrecords_per_epoch)}")
+        logger.debug(f"bin1_partition: {bin1_partition}")
+        logger.debug(f"nrecords_per_merge_epoch: {nrecords_per_epoch}")
+
         nnzs = [len(c.pixels()) for c in self.coolers]
         logger.info(f"nnzs: {nnzs}")
 
         starts = [0] * len(self.coolers)
-        for bp in breakpoints[1:]:
-            stops = [index[bp] for index in indexes]
-            logger.info(f"current: {stops}")
+        for bin1_id in bin1_partition[1:]:
+            stops = [index[bin1_id] for index in indexes]
 
             # extract, concat
             combined = pd.concat(
@@ -178,6 +206,7 @@ def __iter__(self) -> Iterator[dict[str, np.ndarray]]:
 
             yield {k: v.values for k, v in df.items()}
 
+            logger.info(f"records consumed: {stops}")
             starts = stops
 
@@ -265,7 +294,7 @@ def merge_coolers(
     bins = clrs[0].bins()[["chrom", "start", "end"]][:]
     assembly = clrs[0].info.get("genome-assembly", None)
 
-    iterator = CoolerMerger(clrs, maxbuf=mergebuf, columns=columns, agg=agg)
+    iterator = CoolerMerger(clrs, mergebuf=mergebuf, columns=columns, agg=agg)
 
     create(
         output_uri,
diff --git a/tests/test_create_ingest.py b/tests/test_create_ingest.py
index a0e5752..99a8f9c 100644
--- a/tests/test_create_ingest.py
+++ b/tests/test_create_ingest.py
@@ -301,17 +301,18 @@ def test_cload_pairs(bins_path, pairs_path, ref_path):
     kwargs = dict(
         metadata=None,
         assembly="hg19",
-        chunksize=int(15e6),
         zero_based=False,
         comment_char="#",
         input_copy_status="unique",
         no_symmetric_upper=False,
         field=(),
+        chunksize=int(15e6),
+        mergebuf=int(20e6),
+        max_merge=200,
         temp_dir=None,
         no_delete_temp=False,
         storage_options=None,
         no_count=False,
-        max_merge=200,
         chrom1=2,
         pos1=3,
         chrom2=4,
@@ -342,16 +343,17 @@ def test_cload_field(bins_path, pairs_path):
     kwargs = dict(
         metadata=None,
         assembly="toy",
-        chunksize=10,
         zero_based=False,
         comment_char="#",
         input_copy_status="unique",
         no_symmetric_upper=False,
+        chunksize=10,
+        mergebuf=10,
+        max_merge=200,
         temp_dir=None,
         no_delete_temp=False,
         storage_options=None,
         no_count=True,
-        max_merge=200,
         chrom1=2,
         pos1=3,
         chrom2=4,
@@ -383,17 +385,18 @@ def test_cload_custom_tempdir(bins_path, pairs_path):
         testcool_path,
         metadata=None,
         assembly="toy",
-        chunksize=10,
         zero_based=False,
         comment_char="#",
         input_copy_status="unique",
         no_symmetric_upper=False,
         field=(),
+        chunksize=10,
+        mergebuf=10,
         temp_dir=temp_dir,
         no_delete_temp=False,
+        max_merge=200,
         storage_options=None,
         no_count=True,
-        max_merge=200,
         chrom1=2,
         pos1=3,
         chrom2=4,
@@ -411,6 +414,8 @@ def test_load_bg2_vs_coo():
         metadata=None,
         assembly="hg19",
         chunksize=int(20e6),
+        mergebuf=int(15e6),
+        max_merge=200,
         field=(),
         count_as_float=False,
         one_based=False,
@@ -458,6 +463,8 @@ def test_load_zero_one_based_bg2():
         metadata=None,
         assembly="toy",
         chunksize=10,
+        mergebuf=10,
+        max_merge=200,
         field=(),
         count_as_float=False,
         comment_char="#",
@@ -507,6 +514,8 @@ def test_load_zero_one_based_coo():
         metadata=None,
         assembly="toy",
         chunksize=10,
+        mergebuf=10,
+        max_merge=200,
         field=(),
         count_as_float=False,
         comment_char="#",
@@ -566,6 +575,7 @@ def test_cload_append_mode():
         metadata=None,
         assembly="hg19",
         chunksize=int(15e6),
+        mergebuf=int(15e6),
         zero_based=False,
         comment_char="#",
         input_copy_status="unique",
@@ -608,6 +618,8 @@ def test_load_append_mode():
         metadata=None,
         assembly="hg19",
         chunksize=int(20e6),
+        mergebuf=int(20e6),
+        max_merge=200,
         field=(),
         count_as_float=False,
         one_based=False,
@@ -647,6 +659,8 @@ def test_load_custom_tempdir():
         metadata=None,
         assembly="hg19",
         chunksize=int(20e6),
+        mergebuf=int(20e6),
+        max_merge=200,
         field=(),
         count_as_float=False,
         one_based=False,
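
Illustrative sketch (not part of the patch above): the partitioning idea described in the reworked merge_breakpoints docstring can be reproduced on toy data with plain NumPy and bisect. The offset arrays and bufsize below are made-up values for demonstration only; the real function additionally returns the cumulative record counts and warns about oversized epochs.

    import numpy as np
    from bisect import bisect_right

    # Two toy bin1_offset indexes (length = n_bins + 1); index[i] is the row in
    # the corresponding pixel table where records with bin1_id == i begin.
    indexes = [
        np.array([0, 2, 5, 5, 9]),
        np.array([0, 1, 4, 8, 10]),
    ]
    bufsize = 10

    # Virtual bin1_offset index of the concatenated, unaggregated data.
    combined = np.sum(indexes, axis=0)

    bounds, start, lo = [0], 0, 0
    while True:
        hi = bisect_right(combined, min(start + bufsize, combined[-1]), lo=lo) - 1
        if hi == lo:
            # A single bin1 step alone exceeds bufsize: accept an oversized epoch.
            hi += 1
        bounds.append(hi)
        if combined[hi] == combined[-1]:
            break
        lo, start = hi, combined[hi]

    print(bounds)                     # [0, 2, 4]
    print(np.diff(combined[bounds]))  # [ 9 10] -> records buffered per epoch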