From 972a3127683ea53e9cec489c88213e7241df0258 Mon Sep 17 00:00:00 2001 From: "F.C. Martin" Date: Tue, 11 Jun 2024 13:03:54 +0200 Subject: [PATCH 01/16] Added OHDSI database connection --- Dockerfile | 2 +- Makefile | 8 +- setup.py | 1 + v6-kaplan-meier-py/central.py | 8 +- v6-kaplan-meier-py/partial.py | 168 +++++++++++++++-- v6-kaplan-meier-py/sql/standard_features.sql | 180 +++++++++++++++++++ 6 files changed, 351 insertions(+), 16 deletions(-) create mode 100644 v6-kaplan-meier-py/sql/standard_features.sql diff --git a/Dockerfile b/Dockerfile index bcb6b4e..d8f5964 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,7 +3,7 @@ # different image here (e.g. python:3). In that case it is important that # `vantage6-client` is a dependancy of you project as this contains the wrapper # we are using in this example. -FROM harbor2.vantage6.ai/infrastructure/algorithm-base:4.2 +FROM harbor2.vantage6.ai/infrastructure/algorithm-ohdsi-base:4.5 # Change this to the package name of your project. This needs to be the same # as what you specified for the name in the `setup.py`. diff --git a/Makefile b/Makefile index aecd9cb..3f8b167 100644 --- a/Makefile +++ b/Makefile @@ -15,11 +15,11 @@ ifeq ($(PUSH_REG), true) endif image: - @echo "Building ${REGISTRY}/algorithms/kaplan-meier:${TAG}-v6-${VANTAGE6_VERSION}" - @echo "Building ${REGISTRY}/algorithms/kaplan-meier:latest" + @echo "Building ${REGISTRY}/blueberry/kaplan-meier:${TAG}-v6-${VANTAGE6_VERSION}" + @echo "Building ${REGISTRY}/blueberry/kaplan-meier:latest" docker buildx build \ - --tag ${REGISTRY}/algorithms/kaplan-meier:${TAG} \ - --tag ${REGISTRY}/algorithms/kaplan-meier:latest \ + --tag ${REGISTRY}/blueberry/kaplan-meier:${TAG} \ + --tag ${REGISTRY}/blueberry/kaplan-meier:latest \ --platform ${PLATFORMS} \ -f ./Dockerfile \ $(if ${_condition_push},--push .,.) \ No newline at end of file diff --git a/setup.py b/setup.py index 8979ed1..37bf483 100644 --- a/setup.py +++ b/setup.py @@ -17,4 +17,5 @@ packages=find_packages(), python_requires=">=3.10", install_requires=["vantage6-algorithm-tools==4.4.1", "numpy", "pandas"], + package_data={"v6_kaplan_meier_py": ["sql/*.sql"]}, ) diff --git a/v6-kaplan-meier-py/central.py b/v6-kaplan-meier-py/central.py index 89ebaf1..5385242 100644 --- a/v6-kaplan-meier-py/central.py +++ b/v6-kaplan-meier-py/central.py @@ -21,6 +21,8 @@ @algorithm_client def central( client: AlgorithmClient, + cohort_task_id: int, + shared_cohort_id: int, time_column_name: str, censor_column_name: str, organizations_to_include: List[int] | None = None, @@ -69,6 +71,8 @@ def central( client=client, method="get_unique_event_times", organizations_to_include=organizations_to_include, + cohort_task_id=cohort_task_id, + shared_cohort_id=shared_cohort_id, time_column_name=time_column_name, ) @@ -82,9 +86,11 @@ def central( client=client, method="get_km_event_table", organizations_to_include=organizations_to_include, - unique_event_times=list(unique_event_times), + cohort_task_id=cohort_task_id, + shared_cohort_id=shared_cohort_id, time_column_name=time_column_name, censor_column_name=censor_column_name, + unique_event_times=list(unique_event_times), ) local_event_tables = [ pd.read_json(event_table) for event_table in local_km_per_node diff --git a/v6-kaplan-meier-py/partial.py b/v6-kaplan-meier-py/partial.py index e81508e..41a5da9 100644 --- a/v6-kaplan-meier-py/partial.py +++ b/v6-kaplan-meier-py/partial.py @@ -1,10 +1,21 @@ import re +import traceback +import pkg_resources import pandas as pd import numpy as np from typing import List + +from rpy2.robjects import RS4 +from rpy2.rinterface_lib.sexp import NACharacterType + from vantage6.algorithm.tools.util import get_env_var, info, warn, error -from vantage6.algorithm.tools.decorators import data +from vantage6.algorithm.tools.decorators import ( + database_connection, + metadata, + OHDSIMetaData, + RunMetaData, +) from vantage6.algorithm.tools.exceptions import InputError, EnvironmentVariableError from .utils import get_env_var_as_int, get_env_var_as_list, get_env_var_as_float @@ -16,16 +27,37 @@ ) from .enums import NoiseType +from ohdsi.sqlrender import read_sql, render, translate +from ohdsi.database_connector import query_sql +from ohdsi.common import convert_from_r -@data(1) -def get_unique_event_times(df: pd.DataFrame, time_column_name: str) -> List[str]: + +@metadata +@database_connection(types=["OMOP"], include_metadata=True) +def get_unique_event_times( + connection: RS4, + meta_omop: OHDSIMetaData, + meta_run: RunMetaData, + cohort_task_id: int, + shared_cohort_id: float, + time_column_name: str, +) -> List[str]: """ Get unique event times from a DataFrame. Parameters ---------- - df : pd.DataFrame - Input DataFrame supplied by the node. + connection : RS4 + Connection to the database. + meta_omop : OHDSIMetaData + Metadata of the OMOP database. + meta_run : RunMetaData + Metadata of the run. + cohort_task_id : int + Task ID of the task where the cohort is created in the database. + shared_cohort_id : float + Shared cohort ID, this id is the same over all nodes and is returned by the + task that created the cohort. time_column_name : str Name of the column representing time. @@ -39,19 +71,31 @@ def get_unique_event_times(df: pd.DataFrame, time_column_name: str) -> List[str] InputError If the time column is not found in the DataFrame. """ - info("Getting unique event times") + info("Getting unique event times per cohort") info(f"Time column name: {time_column_name}") + info("Creating cohort DataFrame from OMOP CDM") + df = __create_cohort_dataframe( + connection, meta_run, meta_omop, cohort_task_id, shared_cohort_id + ) + + info("Checking privacy guards") _privacy_gaurds(df, time_column_name) + info("Adding noise to event times") df = _add_noise_to_event_times(df, time_column_name) return df[time_column_name].unique().tolist() -@data(1) +@metadata +@database_connection(types=["OMOP"], include_metadata=True) def get_km_event_table( - df: pd.DataFrame, + connection: RS4, + meta_omop: OHDSIMetaData, + meta_run: RunMetaData, + cohort_task_id: int, + shared_cohort_id: float, time_column_name: str, censor_column_name: str, unique_event_times: List[int | float], @@ -61,8 +105,17 @@ def get_km_event_table( Parameters ---------- - df : pd.DataFrame - Input DataFrame. + connection : RS4 + Connection to the database. + meta_omop : OHDSIMetaData + Metadata of the OMOP database. + meta_run : RunMetaData + Metadata of the run. + cohort_task_id : int + Task ID of the task where the cohort is created in the database. + shared_cohort_id : float + Shared cohort ID, this id is the same over all nodes and is returned by the + task that created the cohort. time_column_name : str Name of the column representing time. censor_column_name : str @@ -75,6 +128,11 @@ def get_km_event_table( str The Kaplan-Meier event table as a JSON string. """ + info("Creating cohort DataFrame from OMOP CDM") + df = __create_cohort_dataframe( + connection, meta_run, meta_omop, cohort_task_id, shared_cohort_id + ) + _privacy_gaurds(df, time_column_name) df = _add_noise_to_event_times(df, time_column_name) @@ -134,6 +192,96 @@ def _privacy_gaurds(df: pd.DataFrame, time_column_name: str) -> pd.DataFrame: raise InputError(f"Column '{time_column_name}' not found in the data frame.") +def __create_cohort_dataframe( + connection: RS4, + meta_run: RunMetaData, + meta_omop: OHDSIMetaData, + cohort_task_id: int, + shared_cohort_id: float, +) -> list[pd.DataFrame]: + """ + Query the database for the data of the cohort. + + Parameters + ---------- + connection : RS4 + Connection to the database. + + Returns + ------- + pd.DataFrame + The data of the cohort. + """ + # Get the task id of the task that created the cohort at this node + cohort_table = f"cohort_{cohort_task_id}_{meta_run.node_id}" + + # Obtain the cohort IDs by combining the shared ids (equal over all nodes) with the + # local node id + cohort_id = float(f"{meta_run.node_id}{shared_cohort_id}") + + # Obtain SQL file for standard features + sql_path = pkg_resources.resource_filename( + "v6_kaplan_meier_py", "sql/standard_features.sql" + ) + + # SQL READ + try: + raw_sql = read_sql(sql_path) + except Exception as e: + error(f"Failed to read SQL file: {e}") + traceback.print_exc() + raise e + + df = _query_database(connection, raw_sql, cohort_table, cohort_id, meta_omop) + + # NACHARS + df["OBSERVATION_VAS"] = df["OBSERVATION_VAS"].apply( + lambda val: np.nan if isinstance(val, NACharacterType) else val + ) + + # DROP DUPLICATES + sub_df = df.drop_duplicates("SUBJECT_ID", keep="first") + info(f"Dropped {len(df) - len(sub_df)} rows") + + return sub_df + + +def _query_database( + connection: RS4, + sql: str, + cohort_table: str, + cohort_id: float, + meta_omop: OHDSIMetaData, +) -> pd.DataFrame: + + # RENDER + sql = render( + sql, + cohort_table=f"{meta_omop.results_schema}.{cohort_table}", + cohort_id=cohort_id, + cdm_database_schema=meta_omop.cdm_schema, + incl_condition_concept_id=["NULL"], + incl_procedure_concept_id=["NULL"], # 4066543 + incl_measurement_concept_id=["NULL"], + incl_drug_concept_id=["NULL"], #'ALL' ? @TODO in algo + ) + + # TRANSLATE + sql = translate(sql, target_dialect="postgresql") + + # QUERY + try: + data_r = query_sql(connection, sql) + except Exception as e: + error(f"Failed to query the database: {e}") + traceback.print_exc() + with open("errorReportSql.txt", "r") as f: + error(f.read()) + + # CONVERT + return convert_from_r(data_r) + + def _add_noise_to_event_times(df: pd.DataFrame, time_column_name: str) -> pd.DataFrame: """ Add noise to the event times in a DataFrame when this is requisted by the data- diff --git a/v6-kaplan-meier-py/sql/standard_features.sql b/v6-kaplan-meier-py/sql/standard_features.sql new file mode 100644 index 0000000..a137449 --- /dev/null +++ b/v6-kaplan-meier-py/sql/standard_features.sql @@ -0,0 +1,180 @@ +/* death_int +*/ +WITH death_query AS ( + SELECT + c.subject_id, + DATEDIFF(DAY, cohort_start_date, death_date) AS death_int + FROM + @cohort_table c + LEFT JOIN + @cdm_database_schema.death T + ON T.person_id = c.subject_id + {@cohort_id != -1} ? {AND cohort_definition_id = @cohort_id} +), + +/* gender, year of birth, birth int, age +*/ +person_query AS ( + SELECT + c.subject_id, + T.gender_concept_id AS gender, + T.year_of_birth, + DATEDIFF(DAY, birth_datetime, cohort_start_date) AS birth_int, + DATEDIFF(YEAR, birth_datetime, cohort_start_date) - CASE WHEN (MONTH(birth_datetime) > MONTH(cohort_start_date)) OR (MONTH(birth_datetime) = MONTH(cohort_start_date) AND DAY(birth_datetime) > DAY(cohort_start_date)) THEN 1 ELSE 0 END age + FROM + @cohort_table c + LEFT JOIN + @cdm_database_schema.person T + ON T.person_id = c.subject_id + {@cohort_id != -1} ? {AND cohort_definition_id = @cohort_id} +), + +/* condition_concept_id, condition_occurrence_start, condition_occurrence_end +*/ +condition_occurrence_query AS ( + SELECT + c.subject_id, + T.condition_concept_id, + DATEDIFF(DAY, cohort_start_date, condition_start_date) AS condition_occurrence_start, + DATEDIFF(DAY, cohort_start_date, condition_end_date) AS condition_occurrence_end + FROM + @cohort_table c + LEFT JOIN + @cdm_database_schema.condition_occurrence T + ON T.person_id = c.subject_id + {@cohort_id != -1} ? {AND cohort_definition_id = @cohort_id} +), + +/* drug_concept_id, drug_exposure_start, drug_exposure_end +*/ +drug_exposure_query AS ( + SELECT + c.subject_id, + T.drug_concept_id, + DATEDIFF(DAY, cohort_start_date, drug_exposure_start_date) AS drug_exposure_start, + DATEDIFF(DAY, cohort_start_date, drug_exposure_end_date) AS drug_exposure_end + FROM + @cohort_table c + LEFT JOIN + @cdm_database_schema.drug_exposure T + ON T.person_id = c.subject_id + {@cohort_id != -1} ? {AND cohort_definition_id = @cohort_id} +), + +/* procedure_concept_id, procedure_occurrence_start, procedure_occurrence_end +*/ +procedure_occurrence_query AS ( + SELECT + c.subject_id, + T.procedure_concept_id, + DATEDIFF(DAY, cohort_start_date, procedure_date) AS procedure_occurrence_start, + DATEDIFF(DAY, cohort_start_date, procedure_end_date) AS procedure_occurrence_end + FROM + @cohort_table c + LEFT JOIN + @cdm_database_schema.procedure_occurrence T + ON T.person_id = c.subject_id + {@cohort_id != -1} ? {AND cohort_definition_id = @cohort_id} +), + +/* measurement_concept_id, measurement_start, measurement_unit_concept_id, measurement_vac, + measurement_van, measurement_operator_concept_id +*/ +measurement_query AS ( + SELECT + c.subject_id, + T.measurement_concept_id, + DATEDIFF(DAY, cohort_start_date, measurement_date) AS measurement_start, + T.unit_concept_id AS measurement_unit_concept_id, + T.value_as_concept_id AS measurement_vac, + T.value_as_number AS measurement_van, + T.operator_concept_id AS measurement_operator_concept_id + FROM + @cohort_table c + LEFT JOIN + @cdm_database_schema.measurement T + ON T.person_id = c.subject_id + {@cohort_id != -1} ? {AND cohort_definition_id = @cohort_id} +), + +/* observation_concept_id, observation_start, observation_unit_concept_id, observation_vac, +observation_van, observation_vas, observation_qualifier_concept_id +*/ +observation_query AS ( + SELECT + c.subject_id, + T.observation_concept_id, + DATEDIFF(DAY, cohort_start_date, observation_date) AS observation_start, + T.unit_concept_id AS observation_unit_concept_id, + T.value_as_concept_id AS observation_vac, + T.value_as_number AS observation_van, + T.value_as_string AS observation_vas, + T.qualifier_concept_id AS observation_qualifier_concept_id + FROM + @cohort_table c + LEFT JOIN + @cdm_database_schema.observation T + ON T.person_id = c.subject_id + {@cohort_id != -1} ? {AND cohort_definition_id = @cohort_id} +) + + +SELECT + d.subject_id, + d.death_int, + p.gender, + p.year_of_birth, + p.birth_int, + p.age, + c.condition_concept_id, + c.condition_occurrence_start, + c.condition_occurrence_end, + de.drug_concept_id, + de.drug_exposure_start, + de.drug_exposure_end, + po.procedure_concept_id, + po.procedure_occurrence_start, + po.procedure_occurrence_end, + m.measurement_concept_id, + m.measurement_start, + m.measurement_unit_concept_id, + m.measurement_vac, + m.measurement_van, + m.measurement_operator_concept_id, + o.observation_concept_id, + o.observation_start, + o.observation_unit_concept_id, + o.observation_vac, + o.observation_van, + o.observation_vas, + o.observation_qualifier_concept_id +FROM + person_query p +FULL OUTER JOIN + death_query d + ON d.subject_id = p.subject_id +FULL OUTER JOIN + condition_occurrence_query c + ON p.subject_id = c.subject_id +FULL OUTER JOIN + drug_exposure_query de + ON p.subject_id = de.subject_id +FULL OUTER JOIN + procedure_occurrence_query po + ON p.subject_id = po.subject_id +FULL OUTER JOIN + measurement_query m + ON p.subject_id = m.subject_id +FULL OUTER JOIN + observation_query o + ON p.subject_id = o.subject_id + +WHERE (condition_concept_id IN (@incl_condition_concept_id) OR (@incl_condition_concept_id) IS NULL) +AND (procedure_concept_id IN (@incl_procedure_concept_id) OR (@incl_procedure_concept_id) IS NULL) +AND (measurement_concept_id IN (@incl_measurement_concept_id) OR (@incl_measurement_concept_id) IS NULL) +AND (drug_concept_id IN (@incl_drug_concept_id) OR (@incl_drug_concept_id) IS NULL) + +ORDER BY p.subject_id ASC -- LIMIT 100 + + + From 31b5e0041d219307ff02047c611ae18bedbc9001 Mon Sep 17 00:00:00 2001 From: "F.C. Martin" Date: Tue, 11 Jun 2024 15:13:53 +0200 Subject: [PATCH 02/16] Added censor column --- setup.py | 12 +++- v6-kaplan-meier-py/central.py | 12 ++-- v6-kaplan-meier-py/partial.py | 6 +- v6-kaplan-meier-py/sql/standard_features.sql | 65 ++++++++++---------- 4 files changed, 52 insertions(+), 43 deletions(-) diff --git a/setup.py b/setup.py index 37bf483..dfbec51 100644 --- a/setup.py +++ b/setup.py @@ -16,6 +16,14 @@ url="https://github.com/vantage6/v6-kaplan-meier-py", packages=find_packages(), python_requires=">=3.10", - install_requires=["vantage6-algorithm-tools==4.4.1", "numpy", "pandas"], - package_data={"v6_kaplan_meier_py": ["sql/*.sql"]}, + install_requires=[ + "vantage6-algorithm-tools==4.5.3", + "numpy", + "pandas", + "rpy2", + "ohdsi-common", + "ohdsi-database-connector", + "ohdsi-sqlrender", + ], + package_data={"v6-kaplan-meier-py": ["sql/*.sql"]}, ) diff --git a/v6-kaplan-meier-py/central.py b/v6-kaplan-meier-py/central.py index 5385242..b395701 100644 --- a/v6-kaplan-meier-py/central.py +++ b/v6-kaplan-meier-py/central.py @@ -22,7 +22,7 @@ def central( client: AlgorithmClient, cohort_task_id: int, - shared_cohort_id: int, + shared_cohort_id: str, time_column_name: str, censor_column_name: str, organizations_to_include: List[int] | None = None, @@ -60,11 +60,11 @@ def central( MINIMUM_ORGANIZATIONS = get_env_var_as_int( "KAPLAN_MEIER_MINIMUM_ORGANIZATIONS", KAPLAN_MEIER_MINIMUM_ORGANIZATIONS ) - if len(organizations_to_include) < MINIMUM_ORGANIZATIONS: - raise PrivacyThresholdViolation( - "Minimum number of organizations not met, should be at least " - f"{MINIMUM_ORGANIZATIONS}." - ) + # if len(organizations_to_include) < MINIMUM_ORGANIZATIONS: + # raise PrivacyThresholdViolation( + # "Minimum number of organizations not met, should be at least " + # f"{MINIMUM_ORGANIZATIONS}." + # ) info("Collecting unique event times") local_unique_event_times_per_node = _start_partial_and_collect_results( diff --git a/v6-kaplan-meier-py/partial.py b/v6-kaplan-meier-py/partial.py index 41a5da9..4de5be7 100644 --- a/v6-kaplan-meier-py/partial.py +++ b/v6-kaplan-meier-py/partial.py @@ -197,7 +197,7 @@ def __create_cohort_dataframe( meta_run: RunMetaData, meta_omop: OHDSIMetaData, cohort_task_id: int, - shared_cohort_id: float, + shared_cohort_id: str, ) -> list[pd.DataFrame]: """ Query the database for the data of the cohort. @@ -221,7 +221,7 @@ def __create_cohort_dataframe( # Obtain SQL file for standard features sql_path = pkg_resources.resource_filename( - "v6_kaplan_meier_py", "sql/standard_features.sql" + "v6-kaplan-meier-py", "sql/standard_features.sql" ) # SQL READ @@ -365,7 +365,7 @@ def __apply_poisson_noise(df: pd.DataFrame, time_column_name: str) -> pd.DataFra The DataFrame with Poisson noise applied to the event times column. """ __fix_random_seed() - df[time_column_name] = np.random.poisson(df[time_column_name]) + df[time_column_name] = np.random.poisson(df[time_column_name].astype(np.float64)) info("Poisson noise applied to the event times.") return df diff --git a/v6-kaplan-meier-py/sql/standard_features.sql b/v6-kaplan-meier-py/sql/standard_features.sql index a137449..92b1687 100644 --- a/v6-kaplan-meier-py/sql/standard_features.sql +++ b/v6-kaplan-meier-py/sql/standard_features.sql @@ -7,14 +7,14 @@ WITH death_query AS ( FROM @cohort_table c LEFT JOIN - @cdm_database_schema.death T + @cdm_database_schema.death T ON T.person_id = c.subject_id {@cohort_id != -1} ? {AND cohort_definition_id = @cohort_id} ), /* gender, year of birth, birth int, age */ -person_query AS ( +person_query AS ( SELECT c.subject_id, T.gender_concept_id AS gender, @@ -24,14 +24,14 @@ person_query AS ( FROM @cohort_table c LEFT JOIN - @cdm_database_schema.person T + @cdm_database_schema.person T ON T.person_id = c.subject_id {@cohort_id != -1} ? {AND cohort_definition_id = @cohort_id} ), /* condition_concept_id, condition_occurrence_start, condition_occurrence_end */ -condition_occurrence_query AS ( +condition_occurrence_query AS ( SELECT c.subject_id, T.condition_concept_id, @@ -40,14 +40,14 @@ condition_occurrence_query AS ( FROM @cohort_table c LEFT JOIN - @cdm_database_schema.condition_occurrence T + @cdm_database_schema.condition_occurrence T ON T.person_id = c.subject_id {@cohort_id != -1} ? {AND cohort_definition_id = @cohort_id} ), /* drug_concept_id, drug_exposure_start, drug_exposure_end */ -drug_exposure_query AS ( +drug_exposure_query AS ( SELECT c.subject_id, T.drug_concept_id, @@ -56,14 +56,14 @@ drug_exposure_query AS ( FROM @cohort_table c LEFT JOIN - @cdm_database_schema.drug_exposure T + @cdm_database_schema.drug_exposure T ON T.person_id = c.subject_id {@cohort_id != -1} ? {AND cohort_definition_id = @cohort_id} ), /* procedure_concept_id, procedure_occurrence_start, procedure_occurrence_end */ -procedure_occurrence_query AS ( +procedure_occurrence_query AS ( SELECT c.subject_id, T.procedure_concept_id, @@ -72,15 +72,15 @@ procedure_occurrence_query AS ( FROM @cohort_table c LEFT JOIN - @cdm_database_schema.procedure_occurrence T + @cdm_database_schema.procedure_occurrence T ON T.person_id = c.subject_id {@cohort_id != -1} ? {AND cohort_definition_id = @cohort_id} ), -/* measurement_concept_id, measurement_start, measurement_unit_concept_id, measurement_vac, +/* measurement_concept_id, measurement_start, measurement_unit_concept_id, measurement_vac, measurement_van, measurement_operator_concept_id */ -measurement_query AS ( +measurement_query AS ( SELECT c.subject_id, T.measurement_concept_id, @@ -92,15 +92,15 @@ measurement_query AS ( FROM @cohort_table c LEFT JOIN - @cdm_database_schema.measurement T + @cdm_database_schema.measurement T ON T.person_id = c.subject_id {@cohort_id != -1} ? {AND cohort_definition_id = @cohort_id} ), -/* observation_concept_id, observation_start, observation_unit_concept_id, observation_vac, +/* observation_concept_id, observation_start, observation_unit_concept_id, observation_vac, observation_van, observation_vas, observation_qualifier_concept_id */ -observation_query AS ( +observation_query AS ( SELECT c.subject_id, T.observation_concept_id, @@ -113,7 +113,7 @@ observation_query AS ( FROM @cohort_table c LEFT JOIN - @cdm_database_schema.observation T + @cdm_database_schema.observation T ON T.person_id = c.subject_id {@cohort_id != -1} ? {AND cohort_definition_id = @cohort_id} ) @@ -132,29 +132,30 @@ SELECT de.drug_concept_id, de.drug_exposure_start, de.drug_exposure_end, - po.procedure_concept_id, - po.procedure_occurrence_start, + po.procedure_concept_id, + po.procedure_occurrence_start, po.procedure_occurrence_end, - m.measurement_concept_id, - m.measurement_start, - m.measurement_unit_concept_id, - m.measurement_vac, - m.measurement_van, + m.measurement_concept_id, + m.measurement_start, + m.measurement_unit_concept_id, + m.measurement_vac, + m.measurement_van, m.measurement_operator_concept_id, - o.observation_concept_id, - o.observation_start, - o.observation_unit_concept_id, - o.observation_vac, - o.observation_van, - o.observation_vas, - o.observation_qualifier_concept_id + o.observation_concept_id, + o.observation_start, + o.observation_unit_concept_id, + o.observation_vac, + o.observation_van, + o.observation_vas, + o.observation_qualifier_concept_id, + CASE WHEN d.death_int IS NOT NULL THEN 1 ELSE 0 END AS censor FROM - person_query p + person_query p FULL OUTER JOIN death_query d ON d.subject_id = p.subject_id FULL OUTER JOIN - condition_occurrence_query c + condition_occurrence_query c ON p.subject_id = c.subject_id FULL OUTER JOIN drug_exposure_query de @@ -168,7 +169,7 @@ FULL OUTER JOIN FULL OUTER JOIN observation_query o ON p.subject_id = o.subject_id - + WHERE (condition_concept_id IN (@incl_condition_concept_id) OR (@incl_condition_concept_id) IS NULL) AND (procedure_concept_id IN (@incl_procedure_concept_id) OR (@incl_procedure_concept_id) IS NULL) AND (measurement_concept_id IN (@incl_measurement_concept_id) OR (@incl_measurement_concept_id) IS NULL) From 27fac072e3f4f3938461b847ff73c38e111ddf7e Mon Sep 17 00:00:00 2001 From: "F.C. Martin" Date: Wed, 12 Jun 2024 09:27:15 +0200 Subject: [PATCH 03/16] Only apply poisson noise to values (and not NaNs) --- v6-kaplan-meier-py/partial.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/v6-kaplan-meier-py/partial.py b/v6-kaplan-meier-py/partial.py index 4de5be7..62faf4d 100644 --- a/v6-kaplan-meier-py/partial.py +++ b/v6-kaplan-meier-py/partial.py @@ -365,7 +365,11 @@ def __apply_poisson_noise(df: pd.DataFrame, time_column_name: str) -> pd.DataFra The DataFrame with Poisson noise applied to the event times column. """ __fix_random_seed() - df[time_column_name] = np.random.poisson(df[time_column_name].astype(np.float64)) + info(f"Applying Poisson noise to the event times: {time_column_name}") + # df[time_column_name] = np.random.poisson(df[time_column_name]) + df.loc[df[time_column_name].notnull(), time_column_name] = np.random.poisson( + df.loc[df[time_column_name].notnull(), time_column_name] + ) info("Poisson noise applied to the event times.") return df From c60938942f0d4a00b2c2a789e239838cde05d814 Mon Sep 17 00:00:00 2001 From: "F.C. Martin" Date: Wed, 12 Jun 2024 14:01:52 +0200 Subject: [PATCH 04/16] Added `surv_int` to the output --- v6-kaplan-meier-py/sql/standard_features.sql | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/v6-kaplan-meier-py/sql/standard_features.sql b/v6-kaplan-meier-py/sql/standard_features.sql index 92b1687..0778ddd 100644 --- a/v6-kaplan-meier-py/sql/standard_features.sql +++ b/v6-kaplan-meier-py/sql/standard_features.sql @@ -4,6 +4,7 @@ WITH death_query AS ( SELECT c.subject_id, DATEDIFF(DAY, cohort_start_date, death_date) AS death_int + DATEDIFF(DAY, cohort_start_date, cohort_end_date) AS cohort_int FROM @cohort_table c LEFT JOIN @@ -148,7 +149,8 @@ SELECT o.observation_van, o.observation_vas, o.observation_qualifier_concept_id, - CASE WHEN d.death_int IS NOT NULL THEN 1 ELSE 0 END AS censor + CASE WHEN d.death_int IS NOT NULL THEN 1 ELSE 0 END AS censor, + COALESCE(d.death_int, d.cohort_int) AS surv_int FROM person_query p FULL OUTER JOIN From 42f9b12101be99ada844937de96888d76f0b9357 Mon Sep 17 00:00:00 2001 From: "F.C. Martin" Date: Wed, 12 Jun 2024 15:15:32 +0200 Subject: [PATCH 05/16] Fixed syntax error --- v6-kaplan-meier-py/sql/standard_features.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v6-kaplan-meier-py/sql/standard_features.sql b/v6-kaplan-meier-py/sql/standard_features.sql index 0778ddd..d7dcb54 100644 --- a/v6-kaplan-meier-py/sql/standard_features.sql +++ b/v6-kaplan-meier-py/sql/standard_features.sql @@ -3,7 +3,7 @@ WITH death_query AS ( SELECT c.subject_id, - DATEDIFF(DAY, cohort_start_date, death_date) AS death_int + DATEDIFF(DAY, cohort_start_date, death_date) AS death_int, DATEDIFF(DAY, cohort_start_date, cohort_end_date) AS cohort_int FROM @cohort_table c From ac498c144444777f3bfc6ebf489cd5eb57eac6fa Mon Sep 17 00:00:00 2001 From: "F.C. Martin" Date: Mon, 17 Jun 2024 09:49:40 +0200 Subject: [PATCH 06/16] Updated Makefile for consistency --- Makefile | 36 +++++++++++++++++++++++++++--------- requirements.txt | 2 +- setup.py | 2 +- 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/Makefile b/Makefile index 3f8b167..8334153 100644 --- a/Makefile +++ b/Makefile @@ -1,25 +1,43 @@ VANTAGE6_VERSION ?= 4.0.0 -TAG ?= latest +TAG ?= cotopaxi REGISTRY ?= harbor2.vantage6.ai +REGISTRY_PROJECT ?= blueberry PLATFORMS ?= linux/amd64 - -# Use `make PUSH_REG=true` to push images to registry after building -PUSH_REG ?= false +TAG ?= =latest +BASE ?= 4.5 +IMAGE ?= kaplan-meier # We use a conditional (true on any non-empty string) later. To avoid # accidents, we don't use user-controlled PUSH_REG directly. # See: https://www.gnu.org/software/make/manual/html_node/Conditional-Functions.html +PUSH_REG ?= false _condition_push := ifeq ($(PUSH_REG), true) _condition_push := not_empty_so_true endif +help: + @echo "Usage:" + @echo " make help - show this message" + @echo " make image - build the image" + @echo "" + @echo "Using " + @echo " registry: ${REGISTRY}/${REGISTRY_PROJECT}" + @echo " image: ${IMAGE}" + @echo " tag: ${TAG}-v6-${VANTAGE6_VERSION}" + @echo " base: ${BASE}" + @echo " platforms: ${PLATFORMS}" + @echo " vantage6: ${VANTAGE6_VERSION}" + @echo "" + image: - @echo "Building ${REGISTRY}/blueberry/kaplan-meier:${TAG}-v6-${VANTAGE6_VERSION}" - @echo "Building ${REGISTRY}/blueberry/kaplan-meier:latest" + @echo "Building ${REGISTRY}/${REGISTRY_PROJECT}/${IMAGE}:${TAG}-v6-${VANTAGE6_VERSION}" + @echo "Building ${REGISTRY}/${REGISTRY_PROJECT}/${IMAGE}:latest" docker buildx build \ - --tag ${REGISTRY}/blueberry/kaplan-meier:${TAG} \ - --tag ${REGISTRY}/blueberry/kaplan-meier:latest \ + --tag ${REGISTRY}/${REGISTRY_PROJECT}/${IMAGE}:${TAG}-v6-${VANTAGE6_VERSION} \ + --tag ${REGISTRY}/${REGISTRY_PROJECT}/${IMAGE}:latest \ --platform ${PLATFORMS} \ + --build-arg TAG=${TAG} \ + --build-arg BASE=${BASE} \ -f ./Dockerfile \ - $(if ${_condition_push},--push .,.) \ No newline at end of file + $(if ${_condition_push},--push .,.) diff --git a/requirements.txt b/requirements.txt index 308db1a..c0470e6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -vantage6-algorithm-tools==4.4.1 +vantage6-algorithm-tools lifelines==0.28.0 pytest==8.1.1 flake8==7.0.0 diff --git a/setup.py b/setup.py index dfbec51..febc986 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ packages=find_packages(), python_requires=">=3.10", install_requires=[ - "vantage6-algorithm-tools==4.5.3", + "vantage6-algorithm-tools", "numpy", "pandas", "rpy2", From 440ddaaf0a60983c9e087880ed704fc307dabcb8 Mon Sep 17 00:00:00 2001 From: "F.C. Martin" Date: Mon, 17 Jun 2024 11:09:08 +0200 Subject: [PATCH 07/16] Added algorithm store definitions --- .gitignore | 2 +- algorithm_store.json | 58 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 1 deletion(-) create mode 100644 algorithm_store.json diff --git a/.gitignore b/.gitignore index 0dce6bb..40efd82 100644 --- a/.gitignore +++ b/.gitignore @@ -2,7 +2,7 @@ __pycache__/ *.egg-info/ __pycache__/ -*.json + analysis/ .DS_Store .ipynb_checkpoints diff --git a/algorithm_store.json b/algorithm_store.json new file mode 100644 index 0000000..06298ae --- /dev/null +++ b/algorithm_store.json @@ -0,0 +1,58 @@ +{ + "name": "Kaplan-Meier on OMOP", + "partitioning": "horizontal", + "vantage6_version": "4.5", + "functions": [ + { + "name": "kaplan_meier_central", + "databases": [ + { + "name": "Database", + "description": "Database to use for the Kaplan-Meier curve" + } + ], + "ui_visualizations": [ + { + "name": "Survival time table", + "schema": { + "location": [], + "columns": [] + }, + "description": "Surival time table for the cohort.", + "type": "table" + } + ], + "arguments": [ + { + "type": "integer", + "description": "Task ID of the task that created the cohort in the database.", + "name": "cohort_task_id" + }, + { + "type": "string", + "description": "The cohort ID to use for the Kaplan-Meier curve.", + "name": "shared_cohort_id" + }, + { + "type": "string", + "description": "The column name of the time variable.", + "name": "time_column_name" + }, + { + "type": "string", + "description": "The column name of the censor.", + "name": "censor_column_name" + }, + { + "type": "organization_list", + "description": "List of organizations to include in the analysis.", + "name": "organizations_to_include" + } + ], + "description": "Compute a Kaplan-Meier curves for a cohort of patients.", + "type": "central" + } + ], + "description": "Compute a Kaplan-Meier curves.", + "image": "harbor2.vantage6.ai/blueberry/kaplan-meier" + } \ No newline at end of file From e819596ec4f033406256784965e63734c5e10326 Mon Sep 17 00:00:00 2001 From: "F.C. Martin" Date: Mon, 17 Jun 2024 11:09:28 +0200 Subject: [PATCH 08/16] Renamed central function --- v6-kaplan-meier-py/central.py | 2 +- v6-kaplan-meier-py/globals.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/v6-kaplan-meier-py/central.py b/v6-kaplan-meier-py/central.py index b395701..201ed37 100644 --- a/v6-kaplan-meier-py/central.py +++ b/v6-kaplan-meier-py/central.py @@ -19,7 +19,7 @@ @algorithm_client -def central( +def kaplan_meier_central( client: AlgorithmClient, cohort_task_id: int, shared_cohort_id: str, diff --git a/v6-kaplan-meier-py/globals.py b/v6-kaplan-meier-py/globals.py index 0f47305..7ab029d 100644 --- a/v6-kaplan-meier-py/globals.py +++ b/v6-kaplan-meier-py/globals.py @@ -4,7 +4,7 @@ KAPLAN_MEIER_MINIMUM_ORGANIZATIONS = 3 -KAPLAN_MEIER_MINIMUM_NUMBER_OF_RECORDS = 3 +KAPLAN_MEIER_MINIMUM_NUMBER_OF_RECORDS = 10 KAPLAN_MEIER_ALLOWED_EVENT_TIME_COLUMNS_REGEX = ".*" From a470ea59fbc8312b20267a6a63491c7801914fae Mon Sep 17 00:00:00 2001 From: "F.C. Martin" Date: Mon, 17 Jun 2024 11:26:11 +0200 Subject: [PATCH 09/16] Updated readme --- README.md | 157 +++++++++++------------------------------------------- 1 file changed, 31 insertions(+), 126 deletions(-) diff --git a/README.md b/README.md index f54e1e3..ded1546 100644 --- a/README.md +++ b/README.md @@ -1,142 +1,47 @@ -# Federated Kaplan-Meier Curve Calculation with vantage6 +

+
+ vantage6 +

-This repository contains an implementation of the Kaplan-Meier curve calculation designed for federated learning environments via the vantage6 framework. It allows for the estimation of survival probabilities across distributed datasets without sharing the patient-specific information. This method supports privacy-preserving data analysis in medical research and other fields where event-time analysis is critical. +

+ A Privacy Enhancing Technologies Operations (PETOps) platform +

-The algorithm operates within the vantage6 infrastructure, a platform supporting federated learning, to enable institutions to perform survival analysis while maintaining data privacy. The initial idea was based on contributions from Benedetta Gottardelli (benedetta.gottardelli@unicatt.it). - -Follow the instructions in subsequent sections to set up and execute the federated Kaplan-Meier analysis. - - -## Usage - -This section provides a comprehensive guide on how to use the repository to perform federated Kaplan-Meier analysis, from initializing the client to executing the task and retrieving the results. - -To perform Kaplan-Meier curve calculation in a federated learning context using vantage6, follow these instructions: - -1. **Install vantage6 Client** (if not already installed): -```bash -pip install vantage6-client -``` - -2. **Initialize vantage6 Client** - -```python -from vantage6.client import Client - -# Load your configuration settings from a file or environment -config = { - 'server_url': '', - 'server_port': , - 'server_api': '', - 'username': '', - 'password': '', - 'organization_key': '' -} - -client = Client(config['server_url'], config['server_port'], config['server_api']) -client.authenticate(username=config['username'], password=config['password']) -client.setup_encryption(config['organization_key']) -``` - -Replace the placeholders in `config` with your actual configuration details. - -3. **Define Algorithm Input** -```python -input_ = { - 'method': 'master', - 'kwargs': { - 'time_column_name': 'time_to_event', - 'censor_column_name': 'event_occurred', - 'organization_ids': [1, 2, 3], # Example organization IDs - 'bin_size': None # Or a specific bin size - } -} -``` - -Set your specific time and censor column names, organization IDs, and bin size if needed. - -4. **Create and Run the Task** -```python -task = client.task.create( - collaboration=3, # Use your specific collaboration ID - organizations=[1, 2, 3], # List your organization IDs - name='Kaplan-Meier Task', # Give your task a specific name - image='ghcr.io/mdw-nl/v6-km-studyathon:v1', # Specify the desired algorithm Docker image version - description='Survival analysis using Kaplan-Meier', # Describe the task - databases=[{'label': 'my_database_label'}], # Use your database label - input_=input_ -) -``` +-------------------- +# Kaplan-Meier on OHDSI -Provide actual values for the `collaboration`, `organizations`, `name`, `image`, `description`, and `databases` fields. +This repository contains an implementation of the Kaplan-Meier curve calculation designed for federated learning environments via the vantage6 framework. It allows for the estimation of survival probabilities across distributed datasets without sharing the patient-specific information. This method supports privacy-enhancing data analysis in medical research and other fields where event-time analysis is critical. -5. **Monitor and Retrieve Results**: Utilize the vantage6 client methods to check the status of the task and retrieve the results when the task is complete. - -Ensure all prerequisites are met and configurations are set by referring to the 'Installation and Setup' section before proceeding with the above steps. - - -## Data Format and Preprocessing - -To ensure successful Kaplan-Meier curve calculation, databases at each node need to be structured with the necessary columns: - -- `time_column_name`: Indicates the time from the start point (e.g., diagnosis) to either an event of interest (e.g., death) or right censoring. Should be of a numeric dtype (integer or float). - -- `censor_column_name`: A binary column indicating whether the event of interest occurred (1) or if the data was censored (0). Needs to be of integer dtype. - -Optionally, a `patient_id` column can be included as a unique identifier for each subject, but it is not required for the analysis. - -### Sample Table Structure: - -| Column Name | Description | Dtype | Required | -|-----------------------|---------------------------------------------------|---------|----------| -| patient_id | Unique identifier for each patient (optional) | String | No | -| time_to_event | Duration until event of interest or censoring | Numeric | Yes | -| event_occurred | Event occurrence indicator (1: yes, 0: no) | Integer | Yes | -| additional_column1 | Description of optional additional data | ... | No | -| additional_column2 | Description of optional additional data | ... | No | -| ... | ... | ... | ... | - -`time_to_event` refers to your `time_column_name` and `event_occurred` to your `censor_column_name`, as defined in the input parameters of the algorithm. - -### Preprocessing Steps: - -1. Confirm no missing values in numeric columns like `time_column_name`. Handle any missing data through imputation or exclusion before proceeding. - -2. Ensure `censor_column_name` is binary (containing only 0s and 1s) and of integer dtype. - -3. Perform any necessary data cleaning, normalization, or datatype conversion on additional columns according to the specifics of your study and requirements for the analysis. - -Be mindful that any domain-specific preprocessing, such as adjusting time units or categorizing features, should be completed prior to analysis. - -Follow these specifications to prepare your data correctly for a federated analysis with the Kaplan-Meier algorithm on vantage6. +The algorithm operates within the vantage6 infrastructure, a platform supporting federated learning, to enable institutions to perform survival analysis while maintaining data privacy. The initial idea was based on contributions from Benedetta Gottardelli (benedetta.gottardelli@unicatt.it). +This initial version has been updated and adapted for the [BlueBerry](https://euracan.eu/registries/blueberry/) project. -## Output Interpretation +Follow the instructions in subsequent sections to set up and execute the federated Kaplan-Meier analysis. -The Kaplan-Meier curve calculation returns a DataFrame with the following columns, including their data types and descriptions: -| Column Name | Dtype | Description | -|-----------------------------|----------------|---------------------------------------------------------------------------------| -| `` | Numeric (float or int) | Timestamps of the events or censored data, based on the provided time data. | -| `removed` | Integer | Number of subjects removed from the risk set in each time interval. | -| `observed` | Integer | Observed number of events of interest (e.g., death or failure) at each timestamp.| -| `censored` | Integer | Number of subjects censored at each timestamp. | -| `at_risk` | Integer | Number of individuals at risk at each timestamp. | -| `hazard` | Float | Hazard rate at each timestamp, calculated as `observed / at_risk`. | -| `survival_cdf` | Float | Cumulative survival probability up to and including each timestamp. | +## Privacy Gaurds -* Replace `` with the column name you specified in the input configuration for the time data. +### Minimum number of organizations +In order to minimize the risk of reconstruction the number of organizations should be at least 3. The value of this threshold can be changed by setting `KAPLAN_MEIER_MINIMUM_ORGANIZATIONS`. Note that this threshold can be set by the aggregator party only! -### How to Interpret the Output: +### Minimum number of records +The algorithm will only share information if there are at least n records present in the local dataset. This can be set using the variable `KAPLAN_MEIER_MINIMUM_NUMBER_OF_RECORDS`. -- `` shows each recorded or estimated event/censoring timestamp, which is not an interval but discrete points in time. +### Fix event time column +In order to limit the options the user has for selecting the event time column the `KAPLAN_MEIER_ALLOWED_EVENT_TIME_COLUMNS_REGEX` can be set to a comma separated list. Each element in the list can be a regex pattern. -- `observed` provides the count of events that occurred, while `censored` shows how many subjects' data did not reach an event by the end of observation. +### Noise to event times +In order to protect the individual event times noise can be added to this column. The column is user defined, see “Fixed event time column” section. -- `at_risk` is critical as it denotes the number of subjects that could potentially experience the event at each timestamp. +The type of noise can be set through `KAPLAN_MEIER_TYPE_NOISE`. This can be one of the following: -- The `hazard` rate gives an indication of the instant risk of event occurrence over time. +* `NONE` – no noise will be added to the time event columns +* `GAUSSIAN` – Gaussian noise will be added, the amount of noise can be controlled to a signal to noise ratio: `KAPLAN_MEIER_PRIVACY_SNR_EVENT_TIME`. The SNR is defined as the amount of noise compared to the standard deviation of the original signal. +* `POISSON` – Poisson noise will be applied. -- `survival_cdf` is the key metric representing the estimated probability of surviving beyond each timestamp in ``. +## Build +In order to build its best to use the makefile. -The analysis is commonly graphed as the Kaplan-Meier curve plotting `survival_cdf` versus `` to depict survival trends over time. Periods with a high `censored` count should be carefully interpreted, as they may affect the accuracy of the survival analysis. \ No newline at end of file +```bash +make image VANTAGE6_VERSION=4.5.3 +``` \ No newline at end of file From eed9a8d65278d841a9492db8650bbb55f136745c Mon Sep 17 00:00:00 2001 From: "F.C. Martin" Date: Mon, 17 Jun 2024 11:47:27 +0200 Subject: [PATCH 10/16] Updated Dockerfile --- Dockerfile | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/Dockerfile b/Dockerfile index d8f5964..e9f2739 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,14 +1,13 @@ -# This specifies our base image. This base image contains some commonly used -# dependancies and an install from all vantage6 packages. You can specify a -# different image here (e.g. python:3). In that case it is important that -# `vantage6-client` is a dependancy of you project as this contains the wrapper -# we are using in this example. -FROM harbor2.vantage6.ai/infrastructure/algorithm-ohdsi-base:4.5 - -# Change this to the package name of your project. This needs to be the same -# as what you specified for the name in the `setup.py`. +ARG BASE=4.5 +ARG TAG=latest ARG PKG_NAME="v6-kaplan-meier-py" +FROM harbor2.vantage6.ai/infrastructure/algorithm-ohdsi-base:${BASE} + +LABEL version=${TAG} +LABEL maintainer="F.C. Martin " +LABEL maintainer="A.J. van Gestel " + # This will install your algorithm into this image. COPY . /app RUN pip install /app From 9f3b6c5d1c12f8da55760c4917d2471f69bb5fb1 Mon Sep 17 00:00:00 2001 From: "F.C. Martin" Date: Mon, 17 Jun 2024 14:46:24 +0200 Subject: [PATCH 11/16] Added line visualization --- algorithm_store.json | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/algorithm_store.json b/algorithm_store.json index 06298ae..d987aa0 100644 --- a/algorithm_store.json +++ b/algorithm_store.json @@ -20,6 +20,15 @@ }, "description": "Surival time table for the cohort.", "type": "table" + }, + { + "name": "Kaplan-Meier curve", + "properties": { + "x": "SURV_INT", + "y": "survival_cdf" + }, + "description": "Kaplan-Meier curve for the cohort.", + "type": "line" } ], "arguments": [ From 195aa05778a6eca791c8257ad7e1a3a422da3c07 Mon Sep 17 00:00:00 2001 From: "F.C. Martin" Date: Mon, 17 Jun 2024 15:08:50 +0200 Subject: [PATCH 12/16] Moved Docker `ARG`s bellow the `FROM` specification --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index e9f2739..bddbdeb 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,8 +1,8 @@ ARG BASE=4.5 ARG TAG=latest -ARG PKG_NAME="v6-kaplan-meier-py" FROM harbor2.vantage6.ai/infrastructure/algorithm-ohdsi-base:${BASE} +ARG PKG_NAME="v6-kaplan-meier-py" LABEL version=${TAG} LABEL maintainer="F.C. Martin " From b83b4c6e5cdf28ecb2b0d9a4df7761acf43bb81d Mon Sep 17 00:00:00 2001 From: "F.C. Martin" Date: Tue, 18 Jun 2024 09:38:28 +0200 Subject: [PATCH 13/16] Moved arguments --- Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index bddbdeb..a2614de 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,7 @@ +FROM harbor2.vantage6.ai/infrastructure/algorithm-ohdsi-base:${BASE} + ARG BASE=4.5 ARG TAG=latest - -FROM harbor2.vantage6.ai/infrastructure/algorithm-ohdsi-base:${BASE} ARG PKG_NAME="v6-kaplan-meier-py" LABEL version=${TAG} From 261c89fbe2535b799518fd0dde625affe1dd05d8 Mon Sep 17 00:00:00 2001 From: "F.C. Martin" Date: Wed, 19 Jun 2024 15:11:52 +0200 Subject: [PATCH 14/16] Updated database label for the UI --- algorithm_store.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/algorithm_store.json b/algorithm_store.json index d987aa0..685920f 100644 --- a/algorithm_store.json +++ b/algorithm_store.json @@ -7,7 +7,7 @@ "name": "kaplan_meier_central", "databases": [ { - "name": "Database", + "name": "OMOP CDM Database", "description": "Database to use for the Kaplan-Meier curve" } ], From ee19db13692dbc3146bb4cac76cd31d27069581b Mon Sep 17 00:00:00 2001 From: "F.C. Martin" Date: Thu, 4 Jul 2024 09:13:32 +0200 Subject: [PATCH 15/16] Some minor fixes --- Dockerfile | 5 +++-- v6-kaplan-meier-py/partial.py | 14 +++++++++++++- v6-kaplan-meier-py/sql/standard_features.sql | 14 +++++++------- 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/Dockerfile b/Dockerfile index a2614de..daf1354 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,8 @@ -FROM harbor2.vantage6.ai/infrastructure/algorithm-ohdsi-base:${BASE} - ARG BASE=4.5 ARG TAG=latest + +FROM harbor2.vantage6.ai/infrastructure/algorithm-ohdsi-base:${BASE} + ARG PKG_NAME="v6-kaplan-meier-py" LABEL version=${TAG} diff --git a/v6-kaplan-meier-py/partial.py b/v6-kaplan-meier-py/partial.py index 62faf4d..bc96840 100644 --- a/v6-kaplan-meier-py/partial.py +++ b/v6-kaplan-meier-py/partial.py @@ -173,7 +173,10 @@ def _privacy_gaurds(df: pd.DataFrame, time_column_name: str) -> pd.DataFrame: "KAPLAN_MEIER_MINIMUM_NUMBER_OF_RECORDS", KAPLAN_MEIER_MINIMUM_NUMBER_OF_RECORDS ) if len(df) <= MINIMUM_NUMBER_OF_RECORDS: - raise InputError("Number of records in 'df' must be greater than 3.") + raise InputError( + "Number of records in 'df' must be greater than " + f"{MINIMUM_NUMBER_OF_RECORDS}." + ) info("Check that the selected time column is allowed by the node") ALLOWED_EVENT_TIME_COLUMNS_REGEX = get_env_var_as_list( @@ -214,10 +217,12 @@ def __create_cohort_dataframe( """ # Get the task id of the task that created the cohort at this node cohort_table = f"cohort_{cohort_task_id}_{meta_run.node_id}" + info(f"Using cohort table: {cohort_table}") # Obtain the cohort IDs by combining the shared ids (equal over all nodes) with the # local node id cohort_id = float(f"{meta_run.node_id}{shared_cohort_id}") + info(f"Using cohort ID: {cohort_id}") # Obtain SQL file for standard features sql_path = pkg_resources.resource_filename( @@ -231,10 +236,13 @@ def __create_cohort_dataframe( error(f"Failed to read SQL file: {e}") traceback.print_exc() raise e + info(f"Read SQL file: {sql_path}") + info("Start query sequence the database") df = _query_database(connection, raw_sql, cohort_table, cohort_id, meta_omop) # NACHARS + info("Post-processing the data") df["OBSERVATION_VAS"] = df["OBSERVATION_VAS"].apply( lambda val: np.nan if isinstance(val, NACharacterType) else val ) @@ -255,6 +263,7 @@ def _query_database( ) -> pd.DataFrame: # RENDER + info("Rendering the SQL") sql = render( sql, cohort_table=f"{meta_omop.results_schema}.{cohort_table}", @@ -267,9 +276,11 @@ def _query_database( ) # TRANSLATE + info("Translating the SQL") sql = translate(sql, target_dialect="postgresql") # QUERY + info("Querying the database") try: data_r = query_sql(connection, sql) except Exception as e: @@ -278,6 +289,7 @@ def _query_database( with open("errorReportSql.txt", "r") as f: error(f.read()) + info("Convert") # CONVERT return convert_from_r(data_r) diff --git a/v6-kaplan-meier-py/sql/standard_features.sql b/v6-kaplan-meier-py/sql/standard_features.sql index d7dcb54..3b846b1 100644 --- a/v6-kaplan-meier-py/sql/standard_features.sql +++ b/v6-kaplan-meier-py/sql/standard_features.sql @@ -10,7 +10,7 @@ WITH death_query AS ( LEFT JOIN @cdm_database_schema.death T ON T.person_id = c.subject_id - {@cohort_id != -1} ? {AND cohort_definition_id = @cohort_id} + {@cohort_id != -1} ? {WHERE cohort_definition_id = @cohort_id} ), /* gender, year of birth, birth int, age @@ -27,7 +27,7 @@ person_query AS ( LEFT JOIN @cdm_database_schema.person T ON T.person_id = c.subject_id - {@cohort_id != -1} ? {AND cohort_definition_id = @cohort_id} + {@cohort_id != -1} ? {WHERE cohort_definition_id = @cohort_id} ), /* condition_concept_id, condition_occurrence_start, condition_occurrence_end @@ -43,7 +43,7 @@ condition_occurrence_query AS ( LEFT JOIN @cdm_database_schema.condition_occurrence T ON T.person_id = c.subject_id - {@cohort_id != -1} ? {AND cohort_definition_id = @cohort_id} + {@cohort_id != -1} ? {WHERE cohort_definition_id = @cohort_id} ), /* drug_concept_id, drug_exposure_start, drug_exposure_end @@ -59,7 +59,7 @@ drug_exposure_query AS ( LEFT JOIN @cdm_database_schema.drug_exposure T ON T.person_id = c.subject_id - {@cohort_id != -1} ? {AND cohort_definition_id = @cohort_id} + {@cohort_id != -1} ? {WHERE cohort_definition_id = @cohort_id} ), /* procedure_concept_id, procedure_occurrence_start, procedure_occurrence_end @@ -75,7 +75,7 @@ procedure_occurrence_query AS ( LEFT JOIN @cdm_database_schema.procedure_occurrence T ON T.person_id = c.subject_id - {@cohort_id != -1} ? {AND cohort_definition_id = @cohort_id} + {@cohort_id != -1} ? {WHERE cohort_definition_id = @cohort_id} ), /* measurement_concept_id, measurement_start, measurement_unit_concept_id, measurement_vac, @@ -95,7 +95,7 @@ measurement_query AS ( LEFT JOIN @cdm_database_schema.measurement T ON T.person_id = c.subject_id - {@cohort_id != -1} ? {AND cohort_definition_id = @cohort_id} + {@cohort_id != -1} ? {WHERE cohort_definition_id = @cohort_id} ), /* observation_concept_id, observation_start, observation_unit_concept_id, observation_vac, @@ -116,7 +116,7 @@ observation_query AS ( LEFT JOIN @cdm_database_schema.observation T ON T.person_id = c.subject_id - {@cohort_id != -1} ? {AND cohort_definition_id = @cohort_id} + {@cohort_id != -1} ? {WHERE cohort_definition_id = @cohort_id} ) From 60375f516b91a83d72c7e78cf26bc9d3558f58be Mon Sep 17 00:00:00 2001 From: "F.C. Martin" Date: Tue, 15 Oct 2024 09:49:59 +0200 Subject: [PATCH 16/16] Removed the node_id from the cohort_id --- v6-kaplan-meier-py/partial.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/v6-kaplan-meier-py/partial.py b/v6-kaplan-meier-py/partial.py index bc96840..60a8a9d 100644 --- a/v6-kaplan-meier-py/partial.py +++ b/v6-kaplan-meier-py/partial.py @@ -219,9 +219,7 @@ def __create_cohort_dataframe( cohort_table = f"cohort_{cohort_task_id}_{meta_run.node_id}" info(f"Using cohort table: {cohort_table}") - # Obtain the cohort IDs by combining the shared ids (equal over all nodes) with the - # local node id - cohort_id = float(f"{meta_run.node_id}{shared_cohort_id}") + cohort_id = float(f"{shared_cohort_id}") info(f"Using cohort ID: {cohort_id}") # Obtain SQL file for standard features