From 8ee0184815ac3ec194d7e8b4285807b75295f91c Mon Sep 17 00:00:00 2001
From: Saman Ehsan
Date: Mon, 19 Aug 2019 18:38:21 -0400
Subject: [PATCH] Remove data file checkout when only retrieving metadata
 (#162)

* Only checkout metadata to get inputs

Because generating a hash of the workflow input metadata only requires
certain metadata fields, remove the checkout of data files. Note: the
data file checkout is still required in the pipeline getInputs step to
get gs:// URLs to the input fastqs.

* Use metadata-api library to download metadata

Many of the functions in pipeline-tools are also available in the
metadata-api library, so we can simplify the pipeline-tools code by
using that functionality.

* Fix function calls

* Update pipeline-tools image for testing

* Update pipeline-tools version
---
 adapter_pipelines/Optimus/adapter.wdl          |  2 +-
 adapter_pipelines/cellranger/adapter.wdl       |  2 +-
 adapter_pipelines/ss2_single_end/adapter.wdl   |  2 +-
 .../ss2_single_sample/adapter.wdl              |  2 +-
 pipeline_tools/pipelines/optimus/optimus.py    | 25 +++---
 .../pipelines/smartseq2/smartseq2.py           | 45 +++++-----
 pipeline_tools/shared/dcp_utils.py             | 69 ---------------
 pipeline_tools/shared/metadata_utils.py        | 87 ++-----------------
 pipeline_tools/tests/shared/test_dcp_utils.py  | 59 -------------
 .../tests/shared/test_metadata_utils.py        | 33 -------
 10 files changed, 46 insertions(+), 280 deletions(-)

diff --git a/adapter_pipelines/Optimus/adapter.wdl b/adapter_pipelines/Optimus/adapter.wdl
index 0c93ba60..27936af3 100644
--- a/adapter_pipelines/Optimus/adapter.wdl
+++ b/adapter_pipelines/Optimus/adapter.wdl
@@ -138,7 +138,7 @@ workflow AdapterOptimus {
   Boolean record_http = false
   Boolean add_md5s = false
 
-  String pipeline_tools_version = "v0.56.2"
+  String pipeline_tools_version = "v0.56.3"
 
   call GetInputs as prep {
     input:
diff --git a/adapter_pipelines/cellranger/adapter.wdl b/adapter_pipelines/cellranger/adapter.wdl
index 806726f7..a19ec5a5 100644
--- a/adapter_pipelines/cellranger/adapter.wdl
+++ b/adapter_pipelines/cellranger/adapter.wdl
@@ -148,7 +148,7 @@ workflow Adapter10xCount {
   Boolean record_http = false
   Boolean add_md5s = false
 
-  String pipeline_tools_version = "v0.56.2"
+  String pipeline_tools_version = "v0.56.3"
 
   call GetInputs {
     input:
diff --git a/adapter_pipelines/ss2_single_end/adapter.wdl b/adapter_pipelines/ss2_single_end/adapter.wdl
index 0228736b..2e7f5c06 100644
--- a/adapter_pipelines/ss2_single_end/adapter.wdl
+++ b/adapter_pipelines/ss2_single_end/adapter.wdl
@@ -69,7 +69,7 @@ workflow AdapterSmartSeq2SingleCellUnpaired {
   Boolean record_http = false
   Boolean add_md5s = false
 
-  String pipeline_tools_version = "v0.56.2"
+  String pipeline_tools_version = "v0.56.3"
 
   call GetInputs as prep {
     input:
diff --git a/adapter_pipelines/ss2_single_sample/adapter.wdl b/adapter_pipelines/ss2_single_sample/adapter.wdl
index 79d60b33..9145357d 100644
--- a/adapter_pipelines/ss2_single_sample/adapter.wdl
+++ b/adapter_pipelines/ss2_single_sample/adapter.wdl
@@ -69,7 +69,7 @@ workflow AdapterSmartSeq2SingleCell{
   Boolean record_http = false
   Boolean add_md5s = false
 
-  String pipeline_tools_version = "v0.56.2"
+  String pipeline_tools_version = "v0.56.3"
 
   call GetInputs as prep {
     input:
diff --git a/pipeline_tools/pipelines/optimus/optimus.py b/pipeline_tools/pipelines/optimus/optimus.py
index 8f034528..7cafbd8d 100644
--- a/pipeline_tools/pipelines/optimus/optimus.py
+++ b/pipeline_tools/pipelines/optimus/optimus.py
@@ -1,6 +1,5 @@
 from pipeline_tools.shared import metadata_utils
 from pipeline_tools.shared import tenx_utils
-from pipeline_tools.shared.http_requests import HttpRequests
 from pipeline_tools.shared.reference_id import ReferenceId
 
 
@@ -18,13 +17,11 @@
 }
 
 
-def get_optimus_inputs(uuid, version, dss_url):
-    """Gather the necessary inputs for Optimus from the bundle metadata.
+def get_optimus_inputs(primary_bundle):
+    """Gather the necessary inputs for Optimus from the bundle metadata object.
 
     Args:
-        bundle_uuid (str): the bundle uuid.
-        bundle_version (str): the bundle version.
-        dss_url (str): the url for the DCP Data Storage Service.
+        primary_bundle (humancellatlas.data.metadata.Bundle): A bundle metadata object.
 
     Returns:
         tuple: tuple of the sample_id, ncbi_taxon_id, dict mapping flow cell lane indices
@@ -33,10 +30,6 @@ def get_optimus_inputs(uuid, version, dss_url):
     Raises:
         requests.HTTPError: on 4xx errors or 5xx errors beyond the timeout
     """
-    print(f"Getting bundle manifest for id {uuid}, version {version}")
-    primary_bundle = metadata_utils.get_bundle_metadata(
-        uuid=uuid, version=version, dss_url=dss_url, http_requests=HttpRequests()
-    )
     sample_id = metadata_utils.get_sample_id(primary_bundle)
     ncbi_taxon_id = metadata_utils.get_ncbi_taxon_id(primary_bundle)
     fastq_files = primary_bundle.sequencing_output
@@ -62,9 +55,11 @@ def get_optimus_inputs_to_hash(uuid, version, dss_url):
         requests.HTTPError: on 4xx errors or 5xx errors beyond the timeout
 
     """
-    sample_id, ncbi_taxon_id, lane_to_fastqs = get_optimus_inputs(
-        uuid, version, dss_url
+    print(f"Getting bundle manifest for id {uuid}, version {version}")
+    primary_bundle = metadata_utils.get_bundle_metadata(
+        uuid=uuid, version=version, dss_url=dss_url, directurls=False
     )
+    sample_id, ncbi_taxon_id, lane_to_fastqs = get_optimus_inputs(primary_bundle)
     sorted_lanes = sorted(lane_to_fastqs.keys(), key=int)
     file_hashes = ''
     for lane in sorted_lanes:
@@ -99,9 +94,11 @@ def create_optimus_input_tsv(uuid, version, dss_url):
     Raises:
         tenx_utils.LaneMissingFileError if any non-optional fastqs are missing
     """
-    sample_id, ncbi_taxon_id, lane_to_fastqs = get_optimus_inputs(
-        uuid, version, dss_url
+    print(f"Getting bundle manifest for id {uuid}, version {version}")
+    primary_bundle = metadata_utils.get_bundle_metadata(
+        uuid=uuid, version=version, dss_url=dss_url, directurls=True
     )
+    sample_id, ncbi_taxon_id, lane_to_fastqs = get_optimus_inputs(primary_bundle)
 
     # Stop if any fastqs are missing
     tenx_utils.validate_lanes(lane_to_fastqs)
diff --git a/pipeline_tools/pipelines/smartseq2/smartseq2.py b/pipeline_tools/pipelines/smartseq2/smartseq2.py
index 5bee3e7e..2a932f4d 100644
--- a/pipeline_tools/pipelines/smartseq2/smartseq2.py
+++ b/pipeline_tools/pipelines/smartseq2/smartseq2.py
@@ -18,13 +18,11 @@
 }
 
 
-def get_ss2_paired_end_inputs(bundle_uuid, bundle_version, dss_url):
-    """Gather the necessary inputs for ss2 from the bundle metadata.
+def get_ss2_paired_end_inputs(primary_bundle):
+    """Gather the necessary inputs for ss2 from the bundle metadata object.
 
     Args:
-        bundle_uuid (str): the bundle uuid.
-        bundle_version (str): the bundle version.
-        dss_url (str): the url for the DCP Data Storage Service.
+        primary_bundle (humancellatlas.data.metadata.Bundle): A bundle metadata object.
 
     Returns:
         tuple: tuple of the sample_id, ncbi_taxon_id, fastq1_manifest object
             and fastq2_manifest object
 
@@ -32,17 +30,6 @@ def get_ss2_paired_end_inputs(bundle_uuid, bundle_version, dss_url):
     Raises:
         requests.HTTPError: on 4xx errors or 5xx errors beyond the timeout
     """
-    print(
-        "Getting bundle manifest for id {0}, version {1}".format(
-            bundle_uuid, bundle_version
-        )
-    )
-    primary_bundle = metadata_utils.get_bundle_metadata(
-        uuid=bundle_uuid,
-        version=bundle_version,
-        dss_url=dss_url,
-        http_requests=HttpRequests(),
-    )
     sample_id = metadata_utils.get_sample_id(primary_bundle)
     ncbi_taxon_id = metadata_utils.get_ncbi_taxon_id(primary_bundle)
     fastq1_manifest, fastq2_manifest = get_fastq_manifest_entry_for_ss2(primary_bundle)
@@ -50,8 +37,16 @@ def get_ss2_paired_end_inputs_to_hash(bundle_uuid, bundle_version, dss_url):
+    print(
+        "Getting bundle manifest for id {0}, version {1}".format(
+            bundle_uuid, bundle_version
+        )
+    )
+    primary_bundle = metadata_utils.get_bundle_metadata(
+        uuid=bundle_uuid, version=bundle_version, dss_url=dss_url, directurls=False
+    )
     sample_id, ncbi_taxon_id, fastq1_manifest, fastq2_manifest = get_ss2_paired_end_inputs(
-        bundle_uuid, bundle_version, dss_url
+        primary_bundle
     )
     fastq1_hashes = metadata_utils.get_hashes_from_file_manifest(fastq1_manifest)
     fastq2_hashes = metadata_utils.get_hashes_from_file_manifest(fastq2_manifest)
@@ -77,10 +72,17 @@ def create_ss2_input_tsv(
     Raises:
         requests.HTTPError: for 4xx errors or 5xx errors beyond the timeout
     """
+    print(
+        "Getting bundle manifest for id {0}, version {1}".format(
+            bundle_uuid, bundle_version
+        )
+    )
+    primary_bundle = metadata_utils.get_bundle_metadata(
+        uuid=bundle_uuid, version=bundle_version, dss_url=dss_url, directurls=True
+    )
     sample_id, ncbi_taxon_id, fastq1_manifest, fastq2_manifest = get_ss2_paired_end_inputs(
-        bundle_uuid, bundle_version, dss_url
+        primary_bundle
     )
-
     tsv_headers = ['fastq_1', 'fastq_2', 'sample_id']
     tsv_values = [fastq1_manifest.url, fastq2_manifest.url, sample_id]
@@ -191,10 +193,7 @@ def _get_content_for_ss2_se_input_tsv(
         )
     )
     primary_bundle = metadata_utils.get_bundle_metadata(
-        uuid=bundle_uuid,
-        version=bundle_version,
-        dss_url=dss_url,
-        http_requests=http_requests,
+        uuid=bundle_uuid, version=bundle_version, dss_url=dss_url
     )
 
     sample_id = metadata_utils.get_sample_id(primary_bundle)
diff --git a/pipeline_tools/shared/dcp_utils.py b/pipeline_tools/shared/dcp_utils.py
index 0f20669e..c6a293c6 100644
--- a/pipeline_tools/shared/dcp_utils.py
+++ b/pipeline_tools/shared/dcp_utils.py
@@ -1,75 +1,6 @@
-import logging
-
 from pipeline_tools.shared.http_requests import HttpRequests  # noqa
 
 
-def get_file_by_uuid(file_id, dss_url, http_requests):
-    """Retrieve a JSON file from the Human Cell Atlas data storage service by its id.
-    Retry with exponentially increasing wait times between requests if there are any failures.
-
-    Args:
-        file_id (str): the id of the file to retrieve.
-        dss_url (str): the url for the HCA data storage service, e.g. "https://dss.staging.data.humancellatlas.org/v1".
-        http_requests (HttpRequests): the HttpRequests object to use
-
-    Returns:
-        dict: dict representing the contents of the JSON file
-
-    Raises:
-        requests.HTTPError: for 4xx errors or 5xx errors beyond timeout
-    """
-    url = '{dss_url}/files/{file_id}?replica=gcp'.format(
-        dss_url=dss_url, file_id=file_id
-    )
-    logging.info('GET {0}'.format(url))
-    response = http_requests.get(url)
-    logging.info(response.status_code)
-    logging.info(response.text)
-    return response.json()
-
-
-def get_manifest(bundle_uuid, bundle_version, dss_url, http_requests):
-    """Retrieve manifest JSON file for a given bundle uuid and version.
-
-    Retry with exponentially increasing wait times between requests if there are any failures.
-
-    TODO: Reduce the number of lines of code by switching to use DSS Python API client.
-
-    Instead of talking to the DSS API directly, using the DSS Python API can avoid a lot of potential issues,
-    especially those related to the Checkout Service. A simple example of using the DSS Python client and the
-    metadata-api to get the manifest would be:
-
-    ```python
-    from humancellatlas.data.metadata.helpers.dss import download_bundle_metadata, dss_client
-
-    client = dss_client()
-    version, manifest, metadata_files = download_bundle_metadata(client, 'gcp', bundle_uuid, directurls=True)
-    ```
-
-    Args:
-        bundle_uuid (str): the uuid of the bundle
-        bundle_version (str): the bundle version, e.g. "2017-10-23T17:50:26.894Z"
-        dss_url (str): The url for the Human Cell Atlas data storage service,
-            e.g. "https://dss.staging.data.humancellatlas.org/v1"
-        http_requests (HttpRequests): the HttpRequests object to use
-
-    Returns:
-        dict: A dict representing the full bundle manifest, with `directurls` for each file.
-
-    Raises:
-        requests.HTTPError: for 4xx errors or 5xx errors beyond timeout
-    """
-    url = '{dss_url}/bundles/{bundle_uuid}?version={bundle_version}&replica=gcp&directurls=true'.format(
-        dss_url=dss_url, bundle_uuid=bundle_uuid, bundle_version=bundle_version
-    )
-    logging.info('GET {0}'.format(url))
-    response = http_requests.get(url)
-    logging.info(response.status_code)
-    logging.info(response.text)
-    manifest = response.json()
-    return manifest
-
-
 def get_auth_token(
     http_requests,
     url="https://danielvaughan.eu.auth0.com/oauth/token",
diff --git a/pipeline_tools/shared/metadata_utils.py b/pipeline_tools/shared/metadata_utils.py
index 9a4ad3fe..1131f8f9 100644
--- a/pipeline_tools/shared/metadata_utils.py
+++ b/pipeline_tools/shared/metadata_utils.py
@@ -1,35 +1,28 @@
-from pipeline_tools.shared import dcp_utils
 from humancellatlas.data.metadata.api import Bundle, CellSuspension
-from pipeline_tools.shared.http_requests import HttpRequests
+from humancellatlas.data.metadata.helpers.dss import (
+    download_bundle_metadata,
+    dss_client,
+)
 from pipeline_tools.shared.exceptions import UnsupportedOrganismException
-import functools
-from concurrent.futures import ThreadPoolExecutor
 
 
-def get_bundle_metadata(uuid, version, dss_url, http_requests):
+def get_bundle_metadata(uuid, version, dss_url, directurls=False):
     """Factory function to create a `humancellatlas.data.metadata.Bundle` object from bundle information and manifest.
 
     Args:
-        bundle_uuid (str): The bundle uuid.
-        bundle_version (str): The bundle version.
+        uuid (str): The bundle uuid.
+        version (str): The bundle version.
         dss_url (str): Url of Data Storage System to query
-        http_requests (HttpRequests): An HttpRequests object.
+        directurls (bool): Whether to fetch direct-access URLs for the files in the bundle manifest.
 
     Returns:
         humancellatlas.data.metadata.Bundle: A bundle metadata object.
""" - manifest = dcp_utils.get_manifest( - bundle_uuid=uuid, - bundle_version=version, - dss_url=dss_url, - http_requests=http_requests, - )['bundle']['files'] - - metadata_files_dict = {f['name']: f for f in manifest if f['indexed']} - metadata_files = get_metadata_files( - metadata_files_dict=metadata_files_dict, dss_url=dss_url + dss_environment = dss_url.split('.')[1] + client = dss_client(deployment=dss_environment) + version, manifest, metadata_files = download_bundle_metadata( + client=client, replica='gcp', uuid=uuid, version=version, directurls=directurls ) - return Bundle( uuid=uuid, version=version, manifest=manifest, metadata_files=metadata_files ) @@ -73,67 +65,6 @@ def get_ncbi_taxon_id(bundle: Bundle): return first_taxon_id -def download_file(item, dss_url, http_requests=HttpRequests()): - """Download the metadata for a given bundle from the HCA data store (DSS). - - This function borrows a lot of existing code from the `metadata-api` code base for consistency, - and this won't be required after migrating to use the HCA DSS Python API `dss_client` directly. - - Args: - item (typing.ItemsView): A dictionary's ItemsView object consisting of file_name and the manifest_entry. - dss_url (str): The url for the DCP Data Storage Service. - http_requests (HttpRequests): The HttpRequests object to use. - """ - file_name, manifest_entry = item - file_uuid = manifest_entry['uuid'] - return ( - file_name, - dcp_utils.get_file_by_uuid( - file_id=file_uuid, dss_url=dss_url, http_requests=http_requests - ), - ) - - -def get_metadata_files(metadata_files_dict, dss_url, num_workers=None): - """Get the dictionary mapping the file name of each metadata file in the bundle to the JSON contents of that file. - - This function by default uses concurrent threads to accelerate the communication with Data Store service. - - Args: - metadata_files_dict (dict): A dictionary maps filename to indexed file content among the bundle manifest, - this will only be used for preparing the metadata_files dictionary. - dss_url (str): The url for the DCP Data Storage Service. - num_workers(int or None): The size of the thread pool to use for downloading metadata files in parallel. - If None, the default pool size will be used, typically a small multiple of the number of cores - on the system executing this function. If 0, no thread pool will be used and all files will be - downloaded sequentially by the current thread. - - Returns: - metadata_files (dict): A dictionary mapping the file name of each metadata file in the bundle to the JSON - contents of that file. 
- """ - if num_workers == 0: - metadata_files = dict( - map( - functools.partial( - download_file, dss_url=dss_url, http_requests=HttpRequests() - ), - metadata_files_dict.items(), - ) - ) - else: - with ThreadPoolExecutor(num_workers) as tpe: - metadata_files = dict( - tpe.map( - functools.partial( - download_file, dss_url=dss_url, http_requests=HttpRequests() - ), - metadata_files_dict.items(), - ) - ) - return metadata_files - - def get_hashes_from_file_manifest(file_manifest): """ Return a string that is a concatenation of the file hashes provided in the bundle manifest entry for a file: {sha1}{sha256}{s3_etag}{crc32c} diff --git a/pipeline_tools/tests/shared/test_dcp_utils.py b/pipeline_tools/tests/shared/test_dcp_utils.py index 8315b643..ea3a6f75 100644 --- a/pipeline_tools/tests/shared/test_dcp_utils.py +++ b/pipeline_tools/tests/shared/test_dcp_utils.py @@ -1,5 +1,4 @@ import pytest -import requests from pipeline_tools.shared import dcp_utils from pipeline_tools.shared.http_requests import HttpRequests @@ -23,64 +22,6 @@ class Data: class TestDCPUtils(object): - def test_get_file_by_uuid(self, requests_mock, test_data): - expect_file = {"file": "test", "id": test_data.FILE_ID} - url = '{dss_url}/files/{file_id}?replica=gcp'.format( - dss_url=test_data.DSS_URL, file_id=test_data.FILE_ID - ) - - def _request_callback(request, context): - context.status_code = 200 - return expect_file - - requests_mock.get(url, json=_request_callback) - - with HttpRequestsManager(): - json_response = dcp_utils.get_file_by_uuid( - test_data.FILE_ID, test_data.DSS_URL, HttpRequests() - ) - - assert json_response['file'] == expect_file['file'] - - assert requests_mock.call_count == 1 - - def test_get_file_by_uuid_retries_on_error(self, requests_mock, test_data): - url = '{dss_url}/files/{file_id}?replica=gcp'.format( - dss_url=test_data.DSS_URL, file_id=test_data.FILE_ID - ) - - def _request_callback(request, context): - context.status_code = 500 - return {'status': 'error', 'message': 'Internal Server Error'} - - requests_mock.get(url, json=_request_callback) - with pytest.raises(requests.HTTPError), HttpRequestsManager(): - dcp_utils.get_file_by_uuid( - test_data.FILE_ID, test_data.DSS_URL, HttpRequests() - ) - assert requests_mock.call_count == 3 - - def test_get_manifest_retries_on_error(self, requests_mock, test_data): - url = '{dss_url}/bundles/{bundle_uuid}?version={bundle_version}&replica=gcp&directurls=true'.format( - dss_url=test_data.DSS_URL, - bundle_uuid=test_data.BUNDLE_UUID, - bundle_version=test_data.BUNDLE_VERSION, - ) - - def _request_callback(request, context): - context.status_code = 500 - return {'status': 'error', 'message': 'Internal Server Error'} - - requests_mock.get(url, json=_request_callback) - with pytest.raises(requests.HTTPError), HttpRequestsManager(): - dcp_utils.get_manifest( - test_data.BUNDLE_UUID, - test_data.BUNDLE_VERSION, - test_data.DSS_URL, - HttpRequests(), - ) - assert requests_mock.call_count == 3 - def test_get_auth_token(self, requests_mock, test_data): url = "https://test.auth0" diff --git a/pipeline_tools/tests/shared/test_metadata_utils.py b/pipeline_tools/tests/shared/test_metadata_utils.py index 1d88cc2e..fe82ae85 100644 --- a/pipeline_tools/tests/shared/test_metadata_utils.py +++ b/pipeline_tools/tests/shared/test_metadata_utils.py @@ -1,13 +1,8 @@ import json import os import pytest - from humancellatlas.data.metadata.api import Bundle, ManifestEntry - - from pipeline_tools.shared import metadata_utils -from pipeline_tools.shared.http_requests import 
-from pipeline_tools.tests.http_requests_manager import HttpRequestsManager
 from pathlib import Path
 
 
@@ -145,34 +140,6 @@ def test_get_ncbi_taxon_id(self, test_ss2_bundle_vx):
         ncbi_taxon_id = metadata_utils.get_ncbi_taxon_id(test_ss2_bundle_vx)
         assert ncbi_taxon_id == 9606
 
-    def test_download_file(self, requests_mock, test_ss2_bundle_manifest_vx):
-        manifest_dict = {'project.json': test_ss2_bundle_manifest_vx[0]}
-        item = tuple(manifest_dict.items())[
-            0
-        ]  # to test this func without calling `map`, we must convert typing here
-        dss_url = 'https://dss.mock.org/v0'
-        file_id = test_ss2_bundle_manifest_vx[0]['uuid']
-
-        expect_result_from_dcp_utils = {'file': 'test', 'id': file_id}
-        url = '{dss_url}/files/{file_id}?replica=gcp'.format(
-            dss_url=dss_url, file_id=file_id
-        )
-
-        def _request_callback(request, context):
-            context.status_code = 200
-            return expect_result_from_dcp_utils
-
-        requests_mock.get(url, json=_request_callback)
-
-        with HttpRequestsManager():
-            file_name, file_response_js = metadata_utils.download_file(
-                item=item, dss_url=dss_url, http_requests=HttpRequests()
-            )
-
-        assert file_name == 'project.json'
-        assert file_response_js['file'] == expect_result_from_dcp_utils['file']
-        assert requests_mock.call_count == 1
-
     def test_get_hashes_from_file_manifest(self, test_fastq_file_manifest):
         file_hashes = metadata_utils.get_hashes_from_file_manifest(
            test_fastq_file_manifest
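
A minimal usage sketch of the new `get_bundle_metadata` code path, using only calls that
appear in this patch (`dss_client`, `download_bundle_metadata`, `Bundle`). The bundle uuid,
version, and DSS URL below are hypothetical placeholders:

```python
from humancellatlas.data.metadata.api import Bundle
from humancellatlas.data.metadata.helpers.dss import (
    download_bundle_metadata,
    dss_client,
)

# Hypothetical inputs; any real bundle uuid/version in the DSS would do.
uuid = '3e7c6f8e-0000-0000-0000-000000000000'
version = '2019-08-19T183821.000000Z'
dss_url = 'https://dss.staging.data.humancellatlas.org/v1'

# The refactored helper derives the metadata-api deployment stage from the
# DSS hostname, e.g. 'staging' from 'dss.staging.data.humancellatlas.org'.
dss_environment = dss_url.split('.')[1]
client = dss_client(deployment=dss_environment)

# directurls=False checks out only the metadata files, which is all the
# input-hashing code paths need; the create_*_input_tsv paths instead pass
# directurls=True so the manifest carries gs:// URLs for the fastq files.
version, manifest, metadata_files = download_bundle_metadata(
    client=client, replica='gcp', uuid=uuid, version=version, directurls=False
)

# Assemble the same Bundle object the pipelines consume downstream.
primary_bundle = Bundle(
    uuid=uuid, version=version, manifest=manifest, metadata_files=metadata_files
)
print(primary_bundle.sequencing_output)  # fastq ManifestEntry objects
```

The `dss_url.split('.')[1]` step assumes DSS URLs of the form
`https://dss.<deployment>.data.humancellatlas.org/v1`, so the staging URL above maps to
the metadata-api's 'staging' deployment.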