Remove data file checkout when only retrieving metadata (#162)
* Only checkout metadata to get inputs

Generating a hash of the workflow input metadata only requires certain metadata fields, so remove the checkout of data files. Note that the data file checkout is still required in the pipeline getInputs step to get gs:// URLs for the input fastqs.

* Use metadata-api library to download metadata

Many of the functions in pipeline-tools are also available in the metadata-api library, so we can simplify the pipeline-tools code by using that functionality (see the sketch below).

* Fix function calls

* Update pipeline-tools image for testing

* Update pipeline-tools version
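Taken together, these changes replace pipeline-tools' hand-rolled manifest and metadata downloads with the metadata-api helpers, and only request direct URLs (which, per the commit message, is what forces the data file checkout) when they are actually needed. A minimal sketch of the new download path, mirroring the `get_bundle_metadata` change in the metadata_utils.py diff below; the helper names come from the metadata-api library as used in that diff:

```python
from humancellatlas.data.metadata.api import Bundle
from humancellatlas.data.metadata.helpers.dss import (
    download_bundle_metadata,
    dss_client,
)


def get_bundle_metadata(uuid, version, dss_url, directurls=False):
    # e.g. 'https://dss.staging.data.humancellatlas.org/v1' -> 'staging'
    dss_environment = dss_url.split('.')[1]
    client = dss_client(deployment=dss_environment)
    # With directurls=False only the bundle metadata is downloaded; per the
    # commit message, skipping direct URLs avoids the data file checkout.
    version, manifest, metadata_files = download_bundle_metadata(
        client=client, replica='gcp', uuid=uuid, version=version, directurls=directurls
    )
    return Bundle(
        uuid=uuid, version=version, manifest=manifest, metadata_files=metadata_files
    )
```

Hashing code paths call this with `directurls=False`; the TSV-generating getInputs steps pass `directurls=True` because they need the gs:// URLs of the fastq files.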
samanehsan authored Aug 19, 2019
1 parent f14403c commit 8ee0184
Showing 10 changed files with 46 additions and 280 deletions.
2 changes: 1 addition & 1 deletion adapter_pipelines/Optimus/adapter.wdl
@@ -138,7 +138,7 @@ workflow AdapterOptimus {
     Boolean record_http = false
     Boolean add_md5s = false
-    String pipeline_tools_version = "v0.56.2"
+    String pipeline_tools_version = "v0.56.3"

     call GetInputs as prep {
         input:
2 changes: 1 addition & 1 deletion adapter_pipelines/cellranger/adapter.wdl
@@ -148,7 +148,7 @@ workflow Adapter10xCount {
     Boolean record_http = false
     Boolean add_md5s = false
-    String pipeline_tools_version = "v0.56.2"
+    String pipeline_tools_version = "v0.56.3"

     call GetInputs {
         input:
2 changes: 1 addition & 1 deletion adapter_pipelines/ss2_single_end/adapter.wdl
@@ -69,7 +69,7 @@ workflow AdapterSmartSeq2SingleCellUnpaired {
     Boolean record_http = false
     Boolean add_md5s = false
-    String pipeline_tools_version = "v0.56.2"
+    String pipeline_tools_version = "v0.56.3"

     call GetInputs as prep {
         input:
2 changes: 1 addition & 1 deletion adapter_pipelines/ss2_single_sample/adapter.wdl
@@ -69,7 +69,7 @@ workflow AdapterSmartSeq2SingleCell {
     Boolean record_http = false
     Boolean add_md5s = false
-    String pipeline_tools_version = "v0.56.2"
+    String pipeline_tools_version = "v0.56.3"

     call GetInputs as prep {
         input:
25 changes: 11 additions & 14 deletions pipeline_tools/pipelines/optimus/optimus.py
@@ -1,6 +1,5 @@
 from pipeline_tools.shared import metadata_utils
 from pipeline_tools.shared import tenx_utils
-from pipeline_tools.shared.http_requests import HttpRequests
 from pipeline_tools.shared.reference_id import ReferenceId


@@ -18,13 +17,11 @@
 }


-def get_optimus_inputs(uuid, version, dss_url):
-    """Gather the necessary inputs for Optimus from the bundle metadata.
+def get_optimus_inputs(primary_bundle):
+    """Gather the necessary inputs for Optimus from the bundle metadata object.

     Args:
-        bundle_uuid (str): the bundle uuid.
-        bundle_version (str): the bundle version.
-        dss_url (str): the url for the DCP Data Storage Service.
+        primary_bundle (humancellatlas.data.metadata.Bundle): the bundle metadata object.

     Returns:
         tuple: tuple of the sample_id, ncbi_taxon_id, dict mapping flow cell lane indices
@@ -33,10 +30,6 @@ def get_optimus_inputs(uuid, version, dss_url):
     Raises:
         requests.HTTPError: on 4xx errors or 5xx errors beyond the timeout
     """
-    print(f"Getting bundle manifest for id {uuid}, version {version}")
-    primary_bundle = metadata_utils.get_bundle_metadata(
-        uuid=uuid, version=version, dss_url=dss_url, http_requests=HttpRequests()
-    )
     sample_id = metadata_utils.get_sample_id(primary_bundle)
     ncbi_taxon_id = metadata_utils.get_ncbi_taxon_id(primary_bundle)
     fastq_files = primary_bundle.sequencing_output
@@ -62,9 +55,11 @@ def get_optimus_inputs_to_hash(uuid, version, dss_url):
         requests.HTTPError: on 4xx errors or 5xx errors beyond the timeout
     """
-    sample_id, ncbi_taxon_id, lane_to_fastqs = get_optimus_inputs(
-        uuid, version, dss_url
+    print(f"Getting bundle manifest for id {uuid}, version {version}")
+    primary_bundle = metadata_utils.get_bundle_metadata(
+        uuid=uuid, version=version, dss_url=dss_url, directurls=False
     )
+    sample_id, ncbi_taxon_id, lane_to_fastqs = get_optimus_inputs(primary_bundle)
     sorted_lanes = sorted(lane_to_fastqs.keys(), key=int)
     file_hashes = ''
     for lane in sorted_lanes:
@@ -99,9 +94,11 @@ def create_optimus_input_tsv(uuid, version, dss_url):
     Raises:
         tenx_utils.LaneMissingFileError if any non-optional fastqs are missing
     """
-    sample_id, ncbi_taxon_id, lane_to_fastqs = get_optimus_inputs(
-        uuid, version, dss_url
+    print(f"Getting bundle manifest for id {uuid}, version {version}")
+    primary_bundle = metadata_utils.get_bundle_metadata(
+        uuid=uuid, version=version, dss_url=dss_url, directurls=True
     )
+    sample_id, ncbi_taxon_id, lane_to_fastqs = get_optimus_inputs(primary_bundle)

     # Stop if any fastqs are missing
     tenx_utils.validate_lanes(lane_to_fastqs)
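For illustration, a hedged sketch of how the two Optimus entry points now differ; the functions come from this diff, the module path is inferred from the file path above, and the uuid, version, and dss_url values are placeholders:

```python
from pipeline_tools.shared import metadata_utils
from pipeline_tools.pipelines.optimus.optimus import get_optimus_inputs

# Placeholder inputs for illustration only.
uuid = 'deadbeef-0000-0000-0000-000000000000'
version = '2019-08-19T000000.000000Z'
dss_url = 'https://dss.staging.data.humancellatlas.org/v1'

# get_optimus_inputs_to_hash: only metadata fields are hashed, so the
# bundle is fetched without direct URLs (no data file checkout).
bundle = metadata_utils.get_bundle_metadata(
    uuid=uuid, version=version, dss_url=dss_url, directurls=False
)
sample_id, ncbi_taxon_id, lane_to_fastqs = get_optimus_inputs(bundle)

# create_optimus_input_tsv: the TSV needs gs:// URLs for the fastqs, so
# direct URLs are requested and the checkout still happens here.
bundle = metadata_utils.get_bundle_metadata(
    uuid=uuid, version=version, dss_url=dss_url, directurls=True
)
sample_id, ncbi_taxon_id, lane_to_fastqs = get_optimus_inputs(bundle)
```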
45 changes: 22 additions & 23 deletions pipeline_tools/pipelines/smartseq2/smartseq2.py
@@ -18,40 +18,35 @@
 }


-def get_ss2_paired_end_inputs(bundle_uuid, bundle_version, dss_url):
-    """Gather the necessary inputs for ss2 from the bundle metadata.
+def get_ss2_paired_end_inputs(primary_bundle):
+    """Gather the necessary inputs for ss2 from the bundle metadata object.

     Args:
-        bundle_uuid (str): the bundle uuid.
-        bundle_version (str): the bundle version.
-        dss_url (str): the url for the DCP Data Storage Service.
+        primary_bundle (humancellatlas.data.metadata.Bundle): the bundle metadata object.

     Returns:
         tuple: tuple of the sample_id, ncbi_taxon_id, fastq1_manifest object and fastq2_manifest object

     Raises:
         requests.HTTPError: on 4xx errors or 5xx errors beyond the timeout
     """
-    print(
-        "Getting bundle manifest for id {0}, version {1}".format(
-            bundle_uuid, bundle_version
-        )
-    )
-    primary_bundle = metadata_utils.get_bundle_metadata(
-        uuid=bundle_uuid,
-        version=bundle_version,
-        dss_url=dss_url,
-        http_requests=HttpRequests(),
-    )
     sample_id = metadata_utils.get_sample_id(primary_bundle)
     ncbi_taxon_id = metadata_utils.get_ncbi_taxon_id(primary_bundle)
     fastq1_manifest, fastq2_manifest = get_fastq_manifest_entry_for_ss2(primary_bundle)
     return sample_id, ncbi_taxon_id, fastq1_manifest, fastq2_manifest


 def get_ss2_paired_end_inputs_to_hash(bundle_uuid, bundle_version, dss_url):
+    print(
+        "Getting bundle manifest for id {0}, version {1}".format(
+            bundle_uuid, bundle_version
+        )
+    )
+    primary_bundle = metadata_utils.get_bundle_metadata(
+        uuid=bundle_uuid, version=bundle_version, dss_url=dss_url, directurls=False
+    )
     sample_id, ncbi_taxon_id, fastq1_manifest, fastq2_manifest = get_ss2_paired_end_inputs(
-        bundle_uuid, bundle_version, dss_url
+        primary_bundle
     )
     fastq1_hashes = metadata_utils.get_hashes_from_file_manifest(fastq1_manifest)
     fastq2_hashes = metadata_utils.get_hashes_from_file_manifest(fastq2_manifest)
@@ -77,10 +72,17 @@ def create_ss2_input_tsv(
     Raises:
         requests.HTTPError: for 4xx errors or 5xx errors beyond the timeout
     """
+    print(
+        "Getting bundle manifest for id {0}, version {1}".format(
+            bundle_uuid, bundle_version
+        )
+    )
+    primary_bundle = metadata_utils.get_bundle_metadata(
+        uuid=bundle_uuid, version=bundle_version, dss_url=dss_url, directurls=True
+    )
     sample_id, ncbi_taxon_id, fastq1_manifest, fastq2_manifest = get_ss2_paired_end_inputs(
-        bundle_uuid, bundle_version, dss_url
+        primary_bundle
     )

     tsv_headers = ['fastq_1', 'fastq_2', 'sample_id']
     tsv_values = [fastq1_manifest.url, fastq2_manifest.url, sample_id]

@@ -191,10 +193,7 @@ def _get_content_for_ss2_se_input_tsv(
         )
     )
     primary_bundle = metadata_utils.get_bundle_metadata(
-        uuid=bundle_uuid,
-        version=bundle_version,
-        dss_url=dss_url,
-        http_requests=http_requests,
+        uuid=bundle_uuid, version=bundle_version, dss_url=dss_url
     )

     sample_id = metadata_utils.get_sample_id(primary_bundle)
69 changes: 0 additions & 69 deletions pipeline_tools/shared/dcp_utils.py
@@ -1,75 +1,6 @@
 import logging

 from pipeline_tools.shared.http_requests import HttpRequests  # noqa


-def get_file_by_uuid(file_id, dss_url, http_requests):
-    """Retrieve a JSON file from the Human Cell Atlas data storage service by its id.
-
-    Retry with exponentially increasing wait times between requests if there are any failures.
-
-    Args:
-        file_id (str): the id of the file to retrieve.
-        dss_url (str): the url for the HCA data storage service, e.g. "https://dss.staging.data.humancellatlas.org/v1".
-        http_requests (HttpRequests): the HttpRequests object to use
-
-    Returns:
-        dict: dict representing the contents of the JSON file
-
-    Raises:
-        requests.HTTPError: for 4xx errors or 5xx errors beyond timeout
-    """
-    url = '{dss_url}/files/{file_id}?replica=gcp'.format(
-        dss_url=dss_url, file_id=file_id
-    )
-    logging.info('GET {0}'.format(url))
-    response = http_requests.get(url)
-    logging.info(response.status_code)
-    logging.info(response.text)
-    return response.json()
-
-
-def get_manifest(bundle_uuid, bundle_version, dss_url, http_requests):
-    """Retrieve manifest JSON file for a given bundle uuid and version.
-
-    Retry with exponentially increasing wait times between requests if there are any failures.
-
-    TODO: Reduce the number of lines of code by switching to use DSS Python API client.
-    Instead of talking to the DSS API directly, using the DSS Python API can avoid a lot of potential issues,
-    especially those related to the Checkout Service. A simple example of using the DSS Python client and the
-    metadata-api to get the manifest would be:
-
-    ```python
-    from humancellatlas.data.metadata.helpers.dss import download_bundle_metadata, dss_client
-    client = dss_client()
-    version, manifest, metadata_files = download_bundle_metadata(client, 'gcp', bundle_uuid, directurls=True)
-    ```
-
-    Args:
-        bundle_uuid (str): the uuid of the bundle
-        bundle_version (str): the bundle version, e.g. "2017-10-23T17:50:26.894Z"
-        dss_url (str): The url for the Human Cell Atlas data storage service,
-            e.g. "https://dss.staging.data.humancellatlas.org/v1"
-        http_requests (HttpRequests): the HttpRequests object to use
-
-    Returns:
-        dict: A dict representing the full bundle manifest, with `directurls` for each file.
-
-    Raises:
-        requests.HTTPError: for 4xx errors or 5xx errors beyond timeout
-    """
-    url = '{dss_url}/bundles/{bundle_uuid}?version={bundle_version}&replica=gcp&directurls=true'.format(
-        dss_url=dss_url, bundle_uuid=bundle_uuid, bundle_version=bundle_version
-    )
-    logging.info('GET {0}'.format(url))
-    response = http_requests.get(url)
-    logging.info(response.status_code)
-    logging.info(response.text)
-    manifest = response.json()
-    return manifest
-
-
 def get_auth_token(
     http_requests,
     url="https://danielvaughan.eu.auth0.com/oauth/token",
87 changes: 9 additions & 78 deletions pipeline_tools/shared/metadata_utils.py
@@ -1,35 +1,27 @@
-from pipeline_tools.shared import dcp_utils
 from humancellatlas.data.metadata.api import Bundle, CellSuspension
-from pipeline_tools.shared.http_requests import HttpRequests
+from humancellatlas.data.metadata.helpers.dss import (
+    download_bundle_metadata,
+    dss_client,
+)
 from pipeline_tools.shared.exceptions import UnsupportedOrganismException
-import functools
-from concurrent.futures import ThreadPoolExecutor


-def get_bundle_metadata(uuid, version, dss_url, http_requests):
+def get_bundle_metadata(uuid, version, dss_url, directurls=False):
     """Factory function to create a `humancellatlas.data.metadata.Bundle` object from bundle information and manifest.

     Args:
         bundle_uuid (str): The bundle uuid.
         bundle_version (str): The bundle version.
         dss_url (str): Url of Data Storage System to query
-        http_requests (HttpRequests): An HttpRequests object.

     Returns:
         humancellatlas.data.metadata.Bundle: A bundle metadata object.
     """
-    manifest = dcp_utils.get_manifest(
-        bundle_uuid=uuid,
-        bundle_version=version,
-        dss_url=dss_url,
-        http_requests=http_requests,
-    )['bundle']['files']
-
-    metadata_files_dict = {f['name']: f for f in manifest if f['indexed']}
-    metadata_files = get_metadata_files(
-        metadata_files_dict=metadata_files_dict, dss_url=dss_url
+    dss_environment = dss_url.split('.')[1]
+    client = dss_client(deployment=dss_environment)
+    version, manifest, metadata_files = download_bundle_metadata(
+        client=client, replica='gcp', uuid=uuid, version=version, directurls=directurls
     )

     return Bundle(
         uuid=uuid, version=version, manifest=manifest, metadata_files=metadata_files
     )
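One subtlety in this hunk: the metadata-api deployment name is derived from the DSS URL's hostname, which assumes the environment is the second dot-separated token. A quick check, using the example DSS URL from the removed dcp_utils docstrings:

```python
# e.g. ['https://dss', 'staging', 'data', 'humancellatlas', 'org/v1']
dss_url = 'https://dss.staging.data.humancellatlas.org/v1'
dss_environment = dss_url.split('.')[1]
assert dss_environment == 'staging'
```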
@@ -73,67 +65,6 @@ def get_ncbi_taxon_id(bundle: Bundle):
     return first_taxon_id


-def download_file(item, dss_url, http_requests=HttpRequests()):
-    """Download the metadata for a given bundle from the HCA data store (DSS).
-
-    This function borrows a lot of existing code from the `metadata-api` code base for consistency,
-    and this won't be required after migrating to use the HCA DSS Python API `dss_client` directly.
-
-    Args:
-        item (typing.ItemsView): A dictionary's ItemsView object consisting of file_name and the manifest_entry.
-        dss_url (str): The url for the DCP Data Storage Service.
-        http_requests (HttpRequests): The HttpRequests object to use.
-    """
-    file_name, manifest_entry = item
-    file_uuid = manifest_entry['uuid']
-    return (
-        file_name,
-        dcp_utils.get_file_by_uuid(
-            file_id=file_uuid, dss_url=dss_url, http_requests=http_requests
-        ),
-    )
-
-
-def get_metadata_files(metadata_files_dict, dss_url, num_workers=None):
-    """Get the dictionary mapping the file name of each metadata file in the bundle to the JSON contents of that file.
-
-    This function by default uses concurrent threads to accelerate the communication with Data Store service.
-
-    Args:
-        metadata_files_dict (dict): A dictionary maps filename to indexed file content among the bundle manifest,
-            this will only be used for preparing the metadata_files dictionary.
-        dss_url (str): The url for the DCP Data Storage Service.
-        num_workers (int or None): The size of the thread pool to use for downloading metadata files in parallel.
-            If None, the default pool size will be used, typically a small multiple of the number of cores
-            on the system executing this function. If 0, no thread pool will be used and all files will be
-            downloaded sequentially by the current thread.
-
-    Returns:
-        metadata_files (dict): A dictionary mapping the file name of each metadata file in the bundle to the JSON
-            contents of that file.
-    """
-    if num_workers == 0:
-        metadata_files = dict(
-            map(
-                functools.partial(
-                    download_file, dss_url=dss_url, http_requests=HttpRequests()
-                ),
-                metadata_files_dict.items(),
-            )
-        )
-    else:
-        with ThreadPoolExecutor(num_workers) as tpe:
-            metadata_files = dict(
-                tpe.map(
-                    functools.partial(
-                        download_file, dss_url=dss_url, http_requests=HttpRequests()
-                    ),
-                    metadata_files_dict.items(),
-                )
-            )
-    return metadata_files
-
-
 def get_hashes_from_file_manifest(file_manifest):
     """ Return a string that is a concatenation of the file hashes provided in the bundle manifest entry for a file:
         {sha1}{sha256}{s3_etag}{crc32c}