diff --git a/adapter_pipelines/Optimus/adapter.wdl b/adapter_pipelines/Optimus/adapter.wdl
index 0c93ba60..27936af3 100644
--- a/adapter_pipelines/Optimus/adapter.wdl
+++ b/adapter_pipelines/Optimus/adapter.wdl
@@ -138,7 +138,7 @@ workflow AdapterOptimus {
   Boolean record_http = false
   Boolean add_md5s = false
 
-  String pipeline_tools_version = "v0.56.2"
+  String pipeline_tools_version = "v0.56.3"
 
   call GetInputs as prep {
     input:
diff --git a/adapter_pipelines/cellranger/adapter.wdl b/adapter_pipelines/cellranger/adapter.wdl
index 806726f7..a19ec5a5 100644
--- a/adapter_pipelines/cellranger/adapter.wdl
+++ b/adapter_pipelines/cellranger/adapter.wdl
@@ -148,7 +148,7 @@ workflow Adapter10xCount {
   Boolean record_http = false
   Boolean add_md5s = false
 
-  String pipeline_tools_version = "v0.56.2"
+  String pipeline_tools_version = "v0.56.3"
 
   call GetInputs {
     input:
diff --git a/adapter_pipelines/ss2_single_end/adapter.wdl b/adapter_pipelines/ss2_single_end/adapter.wdl
index 0228736b..2e7f5c06 100644
--- a/adapter_pipelines/ss2_single_end/adapter.wdl
+++ b/adapter_pipelines/ss2_single_end/adapter.wdl
@@ -69,7 +69,7 @@ workflow AdapterSmartSeq2SingleCellUnpaired {
   Boolean record_http = false
   Boolean add_md5s = false
 
-  String pipeline_tools_version = "v0.56.2"
+  String pipeline_tools_version = "v0.56.3"
 
   call GetInputs as prep {
     input:
diff --git a/adapter_pipelines/ss2_single_sample/adapter.wdl b/adapter_pipelines/ss2_single_sample/adapter.wdl
index 79d60b33..9145357d 100644
--- a/adapter_pipelines/ss2_single_sample/adapter.wdl
+++ b/adapter_pipelines/ss2_single_sample/adapter.wdl
@@ -69,7 +69,7 @@ workflow AdapterSmartSeq2SingleCell{
   Boolean record_http = false
   Boolean add_md5s = false
 
-  String pipeline_tools_version = "v0.56.2"
+  String pipeline_tools_version = "v0.56.3"
 
   call GetInputs as prep {
     input:
diff --git a/pipeline_tools/pipelines/optimus/optimus.py b/pipeline_tools/pipelines/optimus/optimus.py
index 8f034528..7cafbd8d 100644
--- a/pipeline_tools/pipelines/optimus/optimus.py
+++ b/pipeline_tools/pipelines/optimus/optimus.py
@@ -1,6 +1,5 @@
 from pipeline_tools.shared import metadata_utils
 from pipeline_tools.shared import tenx_utils
-from pipeline_tools.shared.http_requests import HttpRequests
 from pipeline_tools.shared.reference_id import ReferenceId
 
 
@@ -18,13 +17,11 @@
 }
 
 
-def get_optimus_inputs(uuid, version, dss_url):
-    """Gather the necessary inputs for Optimus from the bundle metadata.
+def get_optimus_inputs(primary_bundle):
+    """Gather the necessary inputs for Optimus from the bundle metadata object.
 
     Args:
-        bundle_uuid (str): the bundle uuid.
-        bundle_version (str): the bundle version.
-        dss_url (str): the url for the DCP Data Storage Service.
+        primary_bundle (humancellatlas.data.metadata.api.Bundle): A bundle metadata object.
 
     Returns:
         tuple: tuple of the sample_id, ncbi_taxon_id, dict mapping flow cell lane indices
@@ -33,10 +30,6 @@ def get_optimus_inputs(uuid, version, dss_url):
     Raises:
         requests.HTTPError: on 4xx errors or 5xx errors beyond the timeout
     """
-    print(f"Getting bundle manifest for id {uuid}, version {version}")
-    primary_bundle = metadata_utils.get_bundle_metadata(
-        uuid=uuid, version=version, dss_url=dss_url, http_requests=HttpRequests()
-    )
     sample_id = metadata_utils.get_sample_id(primary_bundle)
     ncbi_taxon_id = metadata_utils.get_ncbi_taxon_id(primary_bundle)
     fastq_files = primary_bundle.sequencing_output
@@ -62,9 +55,11 @@ def get_optimus_inputs_to_hash(uuid, version, dss_url):
         requests.HTTPError: on 4xx errors or 5xx errors beyond the timeout
 
     """
-    sample_id, ncbi_taxon_id, lane_to_fastqs = get_optimus_inputs(
-        uuid, version, dss_url
+    print(f"Getting bundle manifest for id {uuid}, version {version}")
+    primary_bundle = metadata_utils.get_bundle_metadata(
+        uuid=uuid, version=version, dss_url=dss_url, directurls=False
     )
+    sample_id, ncbi_taxon_id, lane_to_fastqs = get_optimus_inputs(primary_bundle)
     sorted_lanes = sorted(lane_to_fastqs.keys(), key=int)
     file_hashes = ''
     for lane in sorted_lanes:
@@ -99,9 +94,11 @@ def create_optimus_input_tsv(uuid, version, dss_url):
     Raises:
         tenx_utils.LaneMissingFileError if any non-optional fastqs are missing
    """
-    sample_id, ncbi_taxon_id, lane_to_fastqs = get_optimus_inputs(
-        uuid, version, dss_url
+    print(f"Getting bundle manifest for id {uuid}, version {version}")
+    primary_bundle = metadata_utils.get_bundle_metadata(
+        uuid=uuid, version=version, dss_url=dss_url, directurls=True
     )
+    sample_id, ncbi_taxon_id, lane_to_fastqs = get_optimus_inputs(primary_bundle)
 
     # Stop if any fastqs are missing
     tenx_utils.validate_lanes(lane_to_fastqs)
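Note on the pattern above: `get_optimus_inputs` no longer performs any I/O; each entry point fetches the bundle itself and hands the resulting object along. The hashing path passes `directurls=False` because file hashes come from the bundle manifest alone, while the TSV path passes `directurls=True` because the direct file URLs end up in the pipeline inputs. A minimal sketch of the resulting call pattern (the uuid, version, and URL below are placeholders, not values from this diff):

```python
from pipeline_tools.shared import metadata_utils
from pipeline_tools.pipelines.optimus.optimus import get_optimus_inputs

# Placeholder coordinates for a primary bundle in the staging DSS.
uuid = 'some-bundle-uuid'
version = '2019-01-01T00:00:00.000Z'
dss_url = 'https://dss.staging.data.humancellatlas.org/v1'

# Fetch the bundle metadata once; ask for direct URLs only when they are needed.
primary_bundle = metadata_utils.get_bundle_metadata(
    uuid=uuid, version=version, dss_url=dss_url, directurls=True
)
sample_id, ncbi_taxon_id, lane_to_fastqs = get_optimus_inputs(primary_bundle)
```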
diff --git a/pipeline_tools/pipelines/smartseq2/smartseq2.py b/pipeline_tools/pipelines/smartseq2/smartseq2.py
index 5bee3e7e..2a932f4d 100644
--- a/pipeline_tools/pipelines/smartseq2/smartseq2.py
+++ b/pipeline_tools/pipelines/smartseq2/smartseq2.py
@@ -18,13 +18,11 @@
 }
 
-def get_ss2_paired_end_inputs(bundle_uuid, bundle_version, dss_url):
-    """Gather the necessary inputs for ss2 from the bundle metadata.
+def get_ss2_paired_end_inputs(primary_bundle):
+    """Gather the necessary inputs for ss2 from the bundle metadata object.
 
     Args:
-        bundle_uuid (str): the bundle uuid.
-        bundle_version (str): the bundle version.
-        dss_url (str): the url for the DCP Data Storage Service.
+        primary_bundle (humancellatlas.data.metadata.api.Bundle): A bundle metadata object.
 
     Returns:
         tuple: tuple of the sample_id, ncbi_taxon_id, fastq1_manifest object and
         fastq2_manifest object
@@ -32,17 +30,6 @@ def get_ss2_paired_end_inputs(bundle_uuid, bundle_version, dss_url):
     Raises:
         requests.HTTPError: on 4xx errors or 5xx errors beyond the timeout
     """
-    print(
-        "Getting bundle manifest for id {0}, version {1}".format(
-            bundle_uuid, bundle_version
-        )
-    )
-    primary_bundle = metadata_utils.get_bundle_metadata(
-        uuid=bundle_uuid,
-        version=bundle_version,
-        dss_url=dss_url,
-        http_requests=HttpRequests(),
-    )
     sample_id = metadata_utils.get_sample_id(primary_bundle)
     ncbi_taxon_id = metadata_utils.get_ncbi_taxon_id(primary_bundle)
     fastq1_manifest, fastq2_manifest = get_fastq_manifest_entry_for_ss2(primary_bundle)
@@ -50,8 +37,16 @@
 def get_ss2_paired_end_inputs_to_hash(bundle_uuid, bundle_version, dss_url):
+    print(
+        "Getting bundle manifest for id {0}, version {1}".format(
+            bundle_uuid, bundle_version
+        )
+    )
+    primary_bundle = metadata_utils.get_bundle_metadata(
+        uuid=bundle_uuid, version=bundle_version, dss_url=dss_url, directurls=False
+    )
     sample_id, ncbi_taxon_id, fastq1_manifest, fastq2_manifest = get_ss2_paired_end_inputs(
-        bundle_uuid, bundle_version, dss_url
+        primary_bundle
     )
     fastq1_hashes = metadata_utils.get_hashes_from_file_manifest(fastq1_manifest)
     fastq2_hashes = metadata_utils.get_hashes_from_file_manifest(fastq2_manifest)
@@ -77,10 +72,17 @@ def create_ss2_input_tsv(
     Raises:
         requests.HTTPError: for 4xx errors or 5xx errors beyond the timeout
     """
+    print(
+        "Getting bundle manifest for id {0}, version {1}".format(
+            bundle_uuid, bundle_version
+        )
+    )
+    primary_bundle = metadata_utils.get_bundle_metadata(
+        uuid=bundle_uuid, version=bundle_version, dss_url=dss_url, directurls=True
+    )
     sample_id, ncbi_taxon_id, fastq1_manifest, fastq2_manifest = get_ss2_paired_end_inputs(
-        bundle_uuid, bundle_version, dss_url
+        primary_bundle
     )
-
     tsv_headers = ['fastq_1', 'fastq_2', 'sample_id']
     tsv_values = [fastq1_manifest.url, fastq2_manifest.url, sample_id]
 
@@ -191,10 +193,7 @@ def _get_content_for_ss2_se_input_tsv(
         )
     )
     primary_bundle = metadata_utils.get_bundle_metadata(
-        uuid=bundle_uuid,
-        version=bundle_version,
-        dss_url=dss_url,
-        http_requests=http_requests,
+        uuid=bundle_uuid, version=bundle_version, dss_url=dss_url
     )
 
     sample_id = metadata_utils.get_sample_id(primary_bundle)
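The same split applies to ss2: `get_ss2_paired_end_inputs` is now pure metadata extraction, and only `create_ss2_input_tsv` asks for direct URLs, since it writes `fastq1_manifest.url` and `fastq2_manifest.url` into the TSV. A usage sketch mirroring the calls in this diff (the uuid, version, and URL are placeholders):

```python
from pipeline_tools.shared import metadata_utils
from pipeline_tools.pipelines.smartseq2.smartseq2 import get_ss2_paired_end_inputs

primary_bundle = metadata_utils.get_bundle_metadata(
    uuid='some-bundle-uuid',  # placeholder
    version='2019-01-01T00:00:00.000Z',  # placeholder
    dss_url='https://dss.staging.data.humancellatlas.org/v1',
    directurls=True,  # needed only because the fastq URLs are written out
)
sample_id, ncbi_taxon_id, fastq1_manifest, fastq2_manifest = get_ss2_paired_end_inputs(
    primary_bundle
)
print(fastq1_manifest.url, fastq2_manifest.url, sample_id)
```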
diff --git a/pipeline_tools/shared/dcp_utils.py b/pipeline_tools/shared/dcp_utils.py
index 0f20669e..c6a293c6 100644
--- a/pipeline_tools/shared/dcp_utils.py
+++ b/pipeline_tools/shared/dcp_utils.py
@@ -1,75 +1,6 @@
-import logging
-
 from pipeline_tools.shared.http_requests import HttpRequests  # noqa
 
 
-def get_file_by_uuid(file_id, dss_url, http_requests):
-    """Retrieve a JSON file from the Human Cell Atlas data storage service by its id.
-    Retry with exponentially increasing wait times between requests if there are any failures.
-
-    Args:
-        file_id (str): the id of the file to retrieve.
-        dss_url (str): the url for the HCA data storage service, e.g. "https://dss.staging.data.humancellatlas.org/v1".
-        http_requests (HttpRequests): the HttpRequests object to use
-
-    Returns:
-        dict: dict representing the contents of the JSON file
-
-    Raises:
-        requests.HTTPError: for 4xx errors or 5xx errors beyond timeout
-    """
-    url = '{dss_url}/files/{file_id}?replica=gcp'.format(
-        dss_url=dss_url, file_id=file_id
-    )
-    logging.info('GET {0}'.format(url))
-    response = http_requests.get(url)
-    logging.info(response.status_code)
-    logging.info(response.text)
-    return response.json()
-
-
-def get_manifest(bundle_uuid, bundle_version, dss_url, http_requests):
-    """Retrieve manifest JSON file for a given bundle uuid and version.
-
-    Retry with exponentially increasing wait times between requests if there are any failures.
-
-    TODO: Reduce the number of lines of code by switching to use DSS Python API client.
-
-    Instead of talking to the DSS API directly, using the DSS Python API can avoid a lot of potential issues,
-    especially those related to the Checkout Service. A simple example of using the DSS Python client and the
-    metadata-api to get the manifest would be:
-
-    ```python
-    from humancellatlas.data.metadata.helpers.dss import download_bundle_metadata, dss_client
-
-    client = dss_client()
-    version, manifest, metadata_files = download_bundle_metadata(client, 'gcp', bundle_uuid, directurls=True)
-    ```
-
-    Args:
-        bundle_uuid (str): the uuid of the bundle
-        bundle_version (str): the bundle version, e.g. "2017-10-23T17:50:26.894Z"
-        dss_url (str): The url for the Human Cell Atlas data storage service,
-            e.g. "https://dss.staging.data.humancellatlas.org/v1"
-        http_requests (HttpRequests): the HttpRequests object to use
-
-    Returns:
-        dict: A dict representing the full bundle manifest, with `directurls` for each file.
-
-    Raises:
-        requests.HTTPError: for 4xx errors or 5xx errors beyond timeout
-    """
-    url = '{dss_url}/bundles/{bundle_uuid}?version={bundle_version}&replica=gcp&directurls=true'.format(
-        dss_url=dss_url, bundle_uuid=bundle_uuid, bundle_version=bundle_version
-    )
-    logging.info('GET {0}'.format(url))
-    response = http_requests.get(url)
-    logging.info(response.status_code)
-    logging.info(response.text)
-    manifest = response.json()
-    return manifest
-
-
 def get_auth_token(
     http_requests,
     url="https://danielvaughan.eu.auth0.com/oauth/token",
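The removed `get_manifest` carried a TODO recommending exactly this migration, and its own docstring example is what `get_bundle_metadata` now implements. For reference, the equivalent direct use of the metadata-api helpers (the deployment name and uuid here are assumptions for illustration; see the URL parsing in the next file):

```python
from humancellatlas.data.metadata.helpers.dss import download_bundle_metadata, dss_client

client = dss_client(deployment='staging')  # assumed staging deployment
version, manifest, metadata_files = download_bundle_metadata(
    client=client, replica='gcp', uuid='some-bundle-uuid', directurls=True
)
```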
""" - manifest = dcp_utils.get_manifest( - bundle_uuid=uuid, - bundle_version=version, - dss_url=dss_url, - http_requests=http_requests, - )['bundle']['files'] - - metadata_files_dict = {f['name']: f for f in manifest if f['indexed']} - metadata_files = get_metadata_files( - metadata_files_dict=metadata_files_dict, dss_url=dss_url + dss_environment = dss_url.split('.')[1] + client = dss_client(deployment=dss_environment) + version, manifest, metadata_files = download_bundle_metadata( + client=client, replica='gcp', uuid=uuid, version=version, directurls=directurls ) - return Bundle( uuid=uuid, version=version, manifest=manifest, metadata_files=metadata_files ) @@ -73,67 +65,6 @@ def get_ncbi_taxon_id(bundle: Bundle): return first_taxon_id -def download_file(item, dss_url, http_requests=HttpRequests()): - """Download the metadata for a given bundle from the HCA data store (DSS). - - This function borrows a lot of existing code from the `metadata-api` code base for consistency, - and this won't be required after migrating to use the HCA DSS Python API `dss_client` directly. - - Args: - item (typing.ItemsView): A dictionary's ItemsView object consisting of file_name and the manifest_entry. - dss_url (str): The url for the DCP Data Storage Service. - http_requests (HttpRequests): The HttpRequests object to use. - """ - file_name, manifest_entry = item - file_uuid = manifest_entry['uuid'] - return ( - file_name, - dcp_utils.get_file_by_uuid( - file_id=file_uuid, dss_url=dss_url, http_requests=http_requests - ), - ) - - -def get_metadata_files(metadata_files_dict, dss_url, num_workers=None): - """Get the dictionary mapping the file name of each metadata file in the bundle to the JSON contents of that file. - - This function by default uses concurrent threads to accelerate the communication with Data Store service. - - Args: - metadata_files_dict (dict): A dictionary maps filename to indexed file content among the bundle manifest, - this will only be used for preparing the metadata_files dictionary. - dss_url (str): The url for the DCP Data Storage Service. - num_workers(int or None): The size of the thread pool to use for downloading metadata files in parallel. - If None, the default pool size will be used, typically a small multiple of the number of cores - on the system executing this function. If 0, no thread pool will be used and all files will be - downloaded sequentially by the current thread. - - Returns: - metadata_files (dict): A dictionary mapping the file name of each metadata file in the bundle to the JSON - contents of that file. 
- """ - if num_workers == 0: - metadata_files = dict( - map( - functools.partial( - download_file, dss_url=dss_url, http_requests=HttpRequests() - ), - metadata_files_dict.items(), - ) - ) - else: - with ThreadPoolExecutor(num_workers) as tpe: - metadata_files = dict( - tpe.map( - functools.partial( - download_file, dss_url=dss_url, http_requests=HttpRequests() - ), - metadata_files_dict.items(), - ) - ) - return metadata_files - - def get_hashes_from_file_manifest(file_manifest): """ Return a string that is a concatenation of the file hashes provided in the bundle manifest entry for a file: {sha1}{sha256}{s3_etag}{crc32c} diff --git a/pipeline_tools/tests/shared/test_dcp_utils.py b/pipeline_tools/tests/shared/test_dcp_utils.py index 8315b643..ea3a6f75 100644 --- a/pipeline_tools/tests/shared/test_dcp_utils.py +++ b/pipeline_tools/tests/shared/test_dcp_utils.py @@ -1,5 +1,4 @@ import pytest -import requests from pipeline_tools.shared import dcp_utils from pipeline_tools.shared.http_requests import HttpRequests @@ -23,64 +22,6 @@ class Data: class TestDCPUtils(object): - def test_get_file_by_uuid(self, requests_mock, test_data): - expect_file = {"file": "test", "id": test_data.FILE_ID} - url = '{dss_url}/files/{file_id}?replica=gcp'.format( - dss_url=test_data.DSS_URL, file_id=test_data.FILE_ID - ) - - def _request_callback(request, context): - context.status_code = 200 - return expect_file - - requests_mock.get(url, json=_request_callback) - - with HttpRequestsManager(): - json_response = dcp_utils.get_file_by_uuid( - test_data.FILE_ID, test_data.DSS_URL, HttpRequests() - ) - - assert json_response['file'] == expect_file['file'] - - assert requests_mock.call_count == 1 - - def test_get_file_by_uuid_retries_on_error(self, requests_mock, test_data): - url = '{dss_url}/files/{file_id}?replica=gcp'.format( - dss_url=test_data.DSS_URL, file_id=test_data.FILE_ID - ) - - def _request_callback(request, context): - context.status_code = 500 - return {'status': 'error', 'message': 'Internal Server Error'} - - requests_mock.get(url, json=_request_callback) - with pytest.raises(requests.HTTPError), HttpRequestsManager(): - dcp_utils.get_file_by_uuid( - test_data.FILE_ID, test_data.DSS_URL, HttpRequests() - ) - assert requests_mock.call_count == 3 - - def test_get_manifest_retries_on_error(self, requests_mock, test_data): - url = '{dss_url}/bundles/{bundle_uuid}?version={bundle_version}&replica=gcp&directurls=true'.format( - dss_url=test_data.DSS_URL, - bundle_uuid=test_data.BUNDLE_UUID, - bundle_version=test_data.BUNDLE_VERSION, - ) - - def _request_callback(request, context): - context.status_code = 500 - return {'status': 'error', 'message': 'Internal Server Error'} - - requests_mock.get(url, json=_request_callback) - with pytest.raises(requests.HTTPError), HttpRequestsManager(): - dcp_utils.get_manifest( - test_data.BUNDLE_UUID, - test_data.BUNDLE_VERSION, - test_data.DSS_URL, - HttpRequests(), - ) - assert requests_mock.call_count == 3 - def test_get_auth_token(self, requests_mock, test_data): url = "https://test.auth0" diff --git a/pipeline_tools/tests/shared/test_metadata_utils.py b/pipeline_tools/tests/shared/test_metadata_utils.py index 1d88cc2e..fe82ae85 100644 --- a/pipeline_tools/tests/shared/test_metadata_utils.py +++ b/pipeline_tools/tests/shared/test_metadata_utils.py @@ -1,13 +1,8 @@ import json import os import pytest - from humancellatlas.data.metadata.api import Bundle, ManifestEntry - - from pipeline_tools.shared import metadata_utils -from pipeline_tools.shared.http_requests import 
diff --git a/pipeline_tools/tests/shared/test_metadata_utils.py b/pipeline_tools/tests/shared/test_metadata_utils.py
index 1d88cc2e..fe82ae85 100644
--- a/pipeline_tools/tests/shared/test_metadata_utils.py
+++ b/pipeline_tools/tests/shared/test_metadata_utils.py
@@ -1,13 +1,8 @@
 import json
 import os
 import pytest
-
 from humancellatlas.data.metadata.api import Bundle, ManifestEntry
-
-
 from pipeline_tools.shared import metadata_utils
-from pipeline_tools.shared.http_requests import HttpRequests
-from pipeline_tools.tests.http_requests_manager import HttpRequestsManager
 from pathlib import Path
 
 
@@ -145,34 +140,6 @@ def test_get_ncbi_taxon_id(self, test_ss2_bundle_vx):
         ncbi_taxon_id = metadata_utils.get_ncbi_taxon_id(test_ss2_bundle_vx)
         assert ncbi_taxon_id == 9606
 
-    def test_download_file(self, requests_mock, test_ss2_bundle_manifest_vx):
-        manifest_dict = {'project.json': test_ss2_bundle_manifest_vx[0]}
-        item = tuple(manifest_dict.items())[
-            0
-        ]  # to test this func without calling `map`, we must convert typing here
-        dss_url = 'https://dss.mock.org/v0'
-        file_id = test_ss2_bundle_manifest_vx[0]['uuid']
-
-        expect_result_from_dcp_utils = {'file': 'test', 'id': file_id}
-        url = '{dss_url}/files/{file_id}?replica=gcp'.format(
-            dss_url=dss_url, file_id=file_id
-        )
-
-        def _request_callback(request, context):
-            context.status_code = 200
-            return expect_result_from_dcp_utils
-
-        requests_mock.get(url, json=_request_callback)
-
-        with HttpRequestsManager():
-            file_name, file_response_js = metadata_utils.download_file(
-                item=item, dss_url=dss_url, http_requests=HttpRequests()
-            )
-
-        assert file_name == 'project.json'
-        assert file_response_js['file'] == expect_result_from_dcp_utils['file']
-        assert requests_mock.call_count == 1
-
     def test_get_hashes_from_file_manifest(self, test_fastq_file_manifest):
         file_hashes = metadata_utils.get_hashes_from_file_manifest(
             test_fastq_file_manifest
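With the HTTP-level tests above removed, coverage of `get_bundle_metadata` would presumably shift to mocking the metadata-api helpers rather than raw DSS endpoints. A hypothetical sketch of such a test, not part of this diff (the test name is invented; the patched names mirror the imports in metadata_utils.py):

```python
from unittest import mock

from pipeline_tools.shared import metadata_utils


@mock.patch('pipeline_tools.shared.metadata_utils.Bundle')
@mock.patch('pipeline_tools.shared.metadata_utils.download_bundle_metadata')
@mock.patch('pipeline_tools.shared.metadata_utils.dss_client')
def test_get_bundle_metadata_uses_metadata_api(mock_client, mock_download, mock_bundle):
    # download_bundle_metadata returns (version, manifest, metadata_files).
    mock_download.return_value = ('2019-01-01T00:00:00.000Z', [], {})

    metadata_utils.get_bundle_metadata(
        uuid='some-uuid',
        version='2019-01-01T00:00:00.000Z',
        dss_url='https://dss.staging.data.humancellatlas.org/v1',
    )

    # The deployment is parsed out of the URL, and the Bundle is built
    # from whatever the helper returned.
    mock_client.assert_called_once_with(deployment='staging')
    mock_bundle.assert_called_once()
```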