Skip to content

Commit

Permalink
Merge pull request #194 from tcezard/EVA3455_variantload
Browse files Browse the repository at this point in the history
EVA-3455 - Job specific to variantLoad
  • Loading branch information
tcezard authored Jan 15, 2024
2 parents 0d5c6f0 + b4478b7 commit 682eb86
Show file tree
Hide file tree
Showing 4 changed files with 296 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class BeanNames {
public static final String ANNOTATE_VARIANTS_JOB = "annotate-variants-job";
public static final String INIT_DATABASE_JOB = "init-database-job";
public static final String GENOTYPED_VCF_JOB = "genotyped-vcf-job";
public static final String LOAD_VCF_JOB = "load-vcf-job";
public static final String CALCULATE_STATISTICS_JOB = "calculate-statistics-job";
public static final String DROP_STUDY_JOB = "drop-study-job";
public static final String ACCESSION_IMPORT_JOB = "accession-import-job";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2015-2017 EMBL - European Bioinformatics Institute
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package uk.ac.ebi.eva.pipeline.configuration.jobs;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.job.builder.FlowJobBuilder;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Scope;
import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.LoadFileStepConfiguration;
import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.LoadVariantsStepConfiguration;
import uk.ac.ebi.eva.pipeline.parameters.NewJobIncrementer;
import uk.ac.ebi.eva.pipeline.parameters.validation.job.LoadVcfJobParametersValidator;

import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.*;

/**
* Variant load pipeline workflow:
* <p>
* |
* transform ---> load -+
* |
* <p>
*
*/
@Configuration
@EnableBatchProcessing
@Import({LoadVariantsStepConfiguration.class, LoadFileStepConfiguration.class})
public class LoadVcfJobConfiguration {

private static final Logger logger = LoggerFactory.getLogger(LoadVcfJobConfiguration.class);

@Autowired
@Qualifier(LOAD_VARIANTS_STEP)
private Step variantLoaderStep;

@Autowired
@Qualifier(LOAD_FILE_STEP)
private Step loadFileStep;

@Bean(LOAD_VCF_JOB)
@Scope("prototype")
public Job genotypedVcfJob(JobBuilderFactory jobBuilderFactory) {
logger.debug("Building '" + LOAD_VCF_JOB + "'");

JobBuilder jobBuilder = jobBuilderFactory
.get(LOAD_VCF_JOB)
.incrementer(new NewJobIncrementer())
.validator(new LoadVcfJobParametersValidator());
FlowJobBuilder builder = jobBuilder
.flow(variantLoaderStep)
.next(loadFileStep)
.end();

return builder.build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2017 EMBL - European Bioinformatics Institute
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package uk.ac.ebi.eva.pipeline.parameters.validation.job;

import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.JobParametersValidator;
import org.springframework.batch.core.job.CompositeJobParametersValidator;
import org.springframework.batch.core.job.DefaultJobParametersValidator;
import uk.ac.ebi.eva.pipeline.configuration.jobs.LoadVcfJobConfiguration;
import uk.ac.ebi.eva.pipeline.parameters.validation.step.*;

import java.util.ArrayList;
import java.util.List;

/**
* Validates the job parameters necessary to execute an {@link LoadVcfJobConfiguration}
*/
public class LoadVcfJobParametersValidator extends DefaultJobParametersValidator {

@Override
public void validate(JobParameters parameters) throws JobParametersInvalidException {
compositeJobParametersValidator(parameters).validate(parameters);
}

private CompositeJobParametersValidator compositeJobParametersValidator(JobParameters jobParameters) {
List<JobParametersValidator> jobParametersValidators = new ArrayList<>();

jobParametersValidators.add(new LoadVariantsStepParametersValidator());
jobParametersValidators.add(new LoadFileStepParametersValidator());

CompositeJobParametersValidator compositeJobParametersValidator = new CompositeJobParametersValidator();
compositeJobParametersValidator.setValidators(jobParametersValidators);
return compositeJobParametersValidator;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright 2015-2017 EMBL - European Bioinformatics Institute
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package uk.ac.ebi.eva.pipeline.configuration.jobs;

import org.junit.*;
import org.junit.runner.RunWith;
import org.opencb.biodata.models.variant.Variant;
import org.opencb.datastore.core.QueryOptions;
import org.opencb.opencga.lib.common.Config;
import org.opencb.opencga.storage.core.StorageManagerFactory;
import org.opencb.opencga.storage.core.variant.VariantStorageManager;
import org.opencb.opencga.storage.core.variant.adaptors.VariantDBAdaptor;
import org.opencb.opencga.storage.core.variant.adaptors.VariantDBIterator;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import uk.ac.ebi.eva.pipeline.Application;
import uk.ac.ebi.eva.pipeline.configuration.BeanNames;
import uk.ac.ebi.eva.test.configuration.BatchTestConfiguration;
import uk.ac.ebi.eva.test.configuration.TemporaryRuleConfiguration;
import uk.ac.ebi.eva.test.rules.PipelineTemporaryFolderRule;
import uk.ac.ebi.eva.test.rules.TemporaryMongoRule;
import uk.ac.ebi.eva.test.utils.GenotypedVcfJobTestUtils;
import uk.ac.ebi.eva.test.utils.JobTestUtils;
import uk.ac.ebi.eva.utils.EvaJobParameterBuilder;

import java.io.File;
import java.io.FileInputStream;
import java.util.*;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static uk.ac.ebi.eva.test.utils.GenotypedVcfJobTestUtils.COLLECTION_ANNOTATIONS_NAME;
import static uk.ac.ebi.eva.test.utils.JobTestUtils.assertCompleted;
import static uk.ac.ebi.eva.test.utils.JobTestUtils.assertFailed;
import static uk.ac.ebi.eva.utils.FileUtils.getResource;

/**
* Test for {@link LoadVcfJobConfiguration}
*/

@RunWith(SpringRunner.class)
@ActiveProfiles({Application.VARIANT_WRITER_MONGO_PROFILE, Application.VARIANT_ANNOTATION_MONGO_PROFILE})
@TestPropertySource({"classpath:variant-aggregated.properties", "classpath:test-mongo.properties"})
@ContextConfiguration(classes = {LoadVcfJobConfiguration.class, BatchTestConfiguration.class, TemporaryRuleConfiguration.class})
public class LoadVcfJobTest {
public static final String INPUT = "/input-files/vcf/aggregated.vcf.gz";

private static final String COLLECTION_VARIANTS_NAME = "variants";

private static final String COLLECTION_FILES_NAME = "files";

@Autowired
@Rule
public TemporaryMongoRule mongoRule;

@Rule
public PipelineTemporaryFolderRule temporaryFolderRule = new PipelineTemporaryFolderRule();

@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;

public static final Set<String> EXPECTED_REQUIRED_STEP_NAMES = new TreeSet<>(
Arrays.asList(BeanNames.LOAD_VARIANTS_STEP, BeanNames.LOAD_FILE_STEP));

@Before
public void setUp() throws Exception {
Config.setOpenCGAHome(GenotypedVcfJobTestUtils.getDefaultOpencgaHome());
}

@Test
public void aggregatedLoadVcf() throws Exception {
String dbName = mongoRule.getRandomTemporaryDatabaseName();

JobParameters jobParameters = new EvaJobParameterBuilder()
.collectionFilesName(COLLECTION_FILES_NAME)
.collectionVariantsName(COLLECTION_VARIANTS_NAME)
.databaseName(dbName)
.inputStudyId("aggregated-job")
.inputStudyName("inputStudyName")
.inputStudyType("COLLECTION")
.inputVcf(getResource(INPUT).getAbsolutePath())
.inputVcfAggregation("BASIC")
.inputVcfId("1")
.timestamp()
.toJobParameters();
JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters);

// check execution flow
assertCompleted(jobExecution);

Collection<StepExecution> stepExecutions = jobExecution.getStepExecutions();
Set<String> names = stepExecutions.stream().map(StepExecution::getStepName)
.collect(Collectors.toSet());

assertEquals(EXPECTED_REQUIRED_STEP_NAMES, names);

StepExecution lastRequiredStep = new ArrayList<>(stepExecutions).get(EXPECTED_REQUIRED_STEP_NAMES.size() - 1);
assertEquals(BeanNames.LOAD_FILE_STEP, lastRequiredStep.getStepName());

// check ((documents in DB) == (lines in file))
VariantStorageManager variantStorageManager = StorageManagerFactory.getVariantStorageManager();
VariantDBAdaptor variantDBAdaptor = variantStorageManager.getDBAdaptor(dbName, null);
VariantDBIterator iterator = variantDBAdaptor.iterator(new QueryOptions());

File file = getResource(INPUT);
long lines = JobTestUtils.getLines(new GZIPInputStream(new FileInputStream(file)));
Assert.assertEquals(lines, JobTestUtils.count(iterator));

// check that stats are loaded properly
Variant variant = variantDBAdaptor.iterator(new QueryOptions()).next();
assertFalse(variant.getSourceEntries().values().iterator().next().getCohortStats().isEmpty());
}

@Test
public void GenotypedLoadVcfJob() throws Exception {
File inputFile = GenotypedVcfJobTestUtils.getInputFile();
String databaseName = mongoRule.getRandomTemporaryDatabaseName();
File fasta = temporaryFolderRule.newFile();

// Run the Job
JobParameters jobParameters = new EvaJobParameterBuilder()
.collectionFilesName(GenotypedVcfJobTestUtils.COLLECTION_FILES_NAME)
.collectionVariantsName(GenotypedVcfJobTestUtils.COLLECTION_VARIANTS_NAME)
.databaseName(databaseName)
.inputFasta(fasta.getAbsolutePath())
.inputStudyId(GenotypedVcfJobTestUtils.INPUT_STUDY_ID)
.inputStudyName("inputStudyName")
.inputStudyType("COLLECTION")
.inputVcf(inputFile.getAbsolutePath())
.inputVcfAggregation("NONE")
.inputVcfId(GenotypedVcfJobTestUtils.INPUT_VCF_ID)
.toJobParameters();

JobExecution jobExecution = jobLauncherTestUtils.launchJob(jobParameters);

assertCompleted(jobExecution);

GenotypedVcfJobTestUtils.checkLoadStep(mongoRule, databaseName);

}

}

0 comments on commit 682eb86

Please sign in to comment.