Skip to content

Commit

Permalink
Sync with ES Module v0.6.2 (#158)
Browse files Browse the repository at this point in the history
* Sync with ES Module v0.6.2
  • Loading branch information
anthonysena authored Sep 3, 2024
1 parent c1b365f commit 53c83d1
Show file tree
Hide file tree
Showing 6 changed files with 818 additions and 12 deletions.
1 change: 1 addition & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Imports:
methods,
ParallelLogger (>= 3.1.0),
purrr,
readr,
ResultModelManager (>= 0.5.8),
rlang,
SqlRender (>= 1.18.0)
Expand Down
42 changes: 30 additions & 12 deletions R/Module-EvidenceSynthesis.R
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ EvidenceSynthesisModule <- R6::R6Class(
settings = jobContext$settings$evidenceSynthesisAnalysisList,
esDiagnosticThresholds = jobContext$settings$esDiagnosticThresholds,
resultsFolder = resultsFolder,
minCellCount = jobContext$moduleExecutionSettings$minCellCount
minCellCount = jobContext$moduleExecutionSettings$minCellCount,
maxCores = jobContext$moduleExecutionSettings$maxCores
)

file.copy(
Expand Down Expand Up @@ -299,7 +300,7 @@ EvidenceSynthesisModule <- R6::R6Class(
fileName <- file.path(resultsFolder, paste0(outputTable, ".csv"))
private$.writeToCsv(data = diagnostics, fileName = fileName, append = FALSE)
},
.executeEvidenceSynthesis = function(connectionDetails, databaseSchema, settings, esDiagnosticThresholds, resultsFolder, minCellCount) {
.executeEvidenceSynthesis = function(connectionDetails, databaseSchema, settings, esDiagnosticThresholds, resultsFolder, minCellCount, maxCores) {
connection <- DatabaseConnector::connect(connectionDetails)
on.exit(DatabaseConnector::disconnect(connection))

Expand All @@ -321,11 +322,12 @@ EvidenceSynthesisModule <- R6::R6Class(
databaseSchema = databaseSchema,
resultsFolder = resultsFolder,
minCellCount = minCellCount,
esDiagnosticThresholds = esDiagnosticThresholds
esDiagnosticThresholds = esDiagnosticThresholds,
maxCores = maxCores
))
},
# analysisSettings = settings[[4]]
.doAnalysis = function(analysisSettings, connection, databaseSchema, resultsFolder, minCellCount, esDiagnosticThresholds) {
.doAnalysis = function(analysisSettings, connection, databaseSchema, resultsFolder, minCellCount, esDiagnosticThresholds, maxCores) {
perDbEstimates <- private$.getPerDatabaseEstimates(
connection = connection,
databaseSchema = databaseSchema,
Expand Down Expand Up @@ -357,7 +359,7 @@ EvidenceSynthesisModule <- R6::R6Class(
fullKeys <- perDbEstimates$estimates[, c(perDbEstimates$key, "analysisId")] |>
distinct()

cluster <- ParallelLogger::makeCluster(min(10, jobContext$moduleExecutionSettings$maxCores))
cluster <- ParallelLogger::makeCluster(min(10, maxCores))
ParallelLogger::clusterRequire(cluster, "dplyr")
on.exit(ParallelLogger::stopCluster(cluster))

Expand All @@ -379,13 +381,13 @@ EvidenceSynthesisModule <- R6::R6Class(
if (analysisSettings$evidenceSynthesisSource$sourceMethod == "CohortMethod") {
controlKey <- c("targetId", "comparatorId", "analysisId")
} else if (analysisSettings$evidenceSynthesisSource$sourceMethod == "SelfControlledCaseSeries") {
controlKey <- c("covariateId", "analysisId")
controlKey <- c("exposureId", "nestingCohortId", "covariateId", "analysisId")
}
} else if (analysisSettings$controlType == "exposure") {
if (analysisSettings$evidenceSynthesisSource$sourceMethod == "CohortMethod") {
controlKey <- c("outcomeId", "analysisId")
} else if (analysisSettings$evidenceSynthesisSource$sourceMethod == "SelfControlledCaseSeries") {
controlKey <- c("exposuresOutcomeSetId", "analysisId")
controlKey <- c("outcomeId", "analysisId")
}
} else {
stop(sprintf("Unknown control type '%s'", analysisSettings$controlType))
Expand Down Expand Up @@ -452,7 +454,7 @@ EvidenceSynthesisModule <- R6::R6Class(
},
# group = split(estimates, groupKeys)[[1]]
.calibrateEstimates = function(group) {
ncs <- group[group$trueEffectSize == 1 & !is.na(group$seLogRr), ]
ncs <- group[!is.na(group$trueEffectSize) & group$trueEffectSize == 1 & !is.na(group$seLogRr), ]
pcs <- group[!is.na(group$trueEffectSize) & group$trueEffectSize != 1 & !is.na(group$seLogRr), ]
if (nrow(ncs) >= 5) {
null <- EmpiricalCalibration::fitMcmcNull(logRr = ncs$logRr, seLogRr = ncs$seLogRr)
Expand Down Expand Up @@ -825,7 +827,7 @@ EvidenceSynthesisModule <- R6::R6Class(
.data$trueEffectSize
))
} else if (evidenceSynthesisSource$sourceMethod == "SelfControlledCaseSeries") {
key <- c("exposuresOutcomeSetId", "covariateId")
key <- c("exposureId", "nestingCohortId", "outcomeId", "exposuresOutcomeSetId", "covariateId")
databaseIds <- evidenceSynthesisSource$databaseIds
analysisIds <- evidenceSynthesisSource$analysisIds
if (private$.hasUnblindForEvidenceSynthesisColumn(connection, databaseSchema, "sccs_diagnostics_summary")) {
Expand All @@ -834,20 +836,25 @@ EvidenceSynthesisModule <- R6::R6Class(
unblindColumn <- "unblind"
}
sql <- "SELECT sccs_result.*,
sccs_covariate.era_id AS exposure_id,
outcome_id,
nesting_cohort_id,
mdrr,
CASE
WHEN @unblind_column IS NULL THEN CASE WHEN true_effect_size IS NULL THEN 0 ELSE 1 END
ELSE @unblind_column
END AS unblind
FROM @database_schema.sccs_result
INNER JOIN @database_schema.sccs_exposures_outcome_set
ON sccs_result.exposures_outcome_set_id = sccs_exposures_outcome_set.exposures_outcome_set_id
INNER JOIN @database_schema.sccs_covariate
ON sccs_result.database_id = sccs_covariate.database_id
AND sccs_result.exposures_outcome_set_id = sccs_covariate.exposures_outcome_set_id
AND sccs_result.covariate_id = sccs_covariate.covariate_id
AND sccs_result.analysis_id = sccs_covariate.analysis_id
INNER JOIN @database_schema.sccs_exposure
ON sccs_result.exposures_outcome_set_id = sccs_exposure.exposures_outcome_set_id
AND sccs_covariate.era_id = sccs_covariate.era_id
AND sccs_covariate.era_id = sccs_exposure.era_id
LEFT JOIN @database_schema.sccs_diagnostics_summary
ON sccs_result.exposures_outcome_set_id = sccs_diagnostics_summary.exposures_outcome_set_id
AND sccs_result.covariate_id = sccs_diagnostics_summary.covariate_id
Expand Down Expand Up @@ -879,6 +886,9 @@ EvidenceSynthesisModule <- R6::R6Class(
filter(.data$unblind == 1) |>
select(
"exposuresOutcomeSetId",
"exposureId",
"nestingCohortId",
"outcomeId",
"covariateId",
"analysisId",
"databaseId",
Expand All @@ -905,6 +915,9 @@ EvidenceSynthesisModule <- R6::R6Class(
filter(.data$unblind == 1) |>
select(
"exposuresOutcomeSetId",
"exposureId",
"nestingCohortId",
"outcomeId",
"covariateId",
"analysisId",
"databaseId",
Expand All @@ -914,11 +927,16 @@ EvidenceSynthesisModule <- R6::R6Class(
} else {
stop(sprintf("Unknown likelihood approximation '%s'.", evidenceSynthesisSource$likelihoodApproximation))
}
sql <- "SELECT DISTINCT sccs_covariate.analysis_id,
sccs_covariate.exposures_outcome_set_id,
sql <- "SELECT DISTINCT sccs_exposure.exposures_outcome_set_id,
sccs_covariate.analysis_id,
sccs_covariate.era_id AS exposure_id,
nesting_cohort_id,
outcome_id,
sccs_covariate.covariate_id,
true_effect_size
FROM @database_schema.sccs_exposure
INNER JOIN @database_schema.sccs_exposures_outcome_set
ON sccs_exposure.exposures_outcome_set_id = sccs_exposures_outcome_set.exposures_outcome_set_id
INNER JOIN @database_schema.sccs_covariate
ON sccs_exposure.era_id = sccs_covariate.era_id
AND sccs_exposure.exposures_outcome_set_id = sccs_covariate.exposures_outcome_set_id
Expand Down
99 changes: 99 additions & 0 deletions extras/ESModule-SimulateResultsForTesting.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# This file was part of EvidenceSynthesisModule

library(dplyr)
source("extras/ESModule-SimulationFunctions.R")

databaseFile <- "inst/testdata/esmodule/results.sqlite"
if (file.exists(databaseFile)) {
unlink(databaseFile)
}
connection <- DatabaseConnector::connect(dbms = "sqlite", server = databaseFile)

# Simulate CohortMethod data ---------------------------------------------------

targetId <- 1
comparatorId <- 2
# outcomeId <- 1
for (outcomeId in 1:26) {
message(sprintf("Simulating outcome %d", outcomeId))
outcomeOfInterest <- outcomeId == 1
trueEffectSize <- if_else(outcomeOfInterest, 2, 1)
cmTargetComparatorOutcome <- tibble(
targetId = targetId,
comparatorId = comparatorId,
outcomeId = outcomeId,
trueEffectSize = trueEffectSize,
outcomeOfInterest = outcomeOfInterest
)
DatabaseConnector::insertTable(
connection = connection,
databaseSchema = "main",
tableName = "cm_target_comparator_outcome",
data = cmTargetComparatorOutcome,
createTable = outcomeId == 1,
dropTableIfExists = FALSE,
camelCaseToSnakeCase = TRUE
)
for (analysisId in 1:4) {
simulateTco(targetId, comparatorId, outcomeId, analysisId, hazardRatio = trueEffectSize)
}
}

# Simulate SCCS data -----------------------------------------------------------
exposureId <- 100
# outcomeId <- 1
for (outcomeId in 1:26) {
message(sprintf("Simulating outcome %d", outcomeId))
outcomeOfInterest <- outcomeId == 1
trueEffectSize <- if_else(outcomeOfInterest, 2, 1)

sccsExposuresOutcomeSet <- tibble(
exposuresOutcomeSetId = outcomeId,
outcomeId = !!outcomeId,
nestingCohortId = NA
)
DatabaseConnector::insertTable(
connection = connection,
databaseSchema = "main",
tableName = "sccs_exposures_outcome_set",
data = sccsExposuresOutcomeSet,
createTable = outcomeId == 1,
dropTableIfExists = outcomeId == 1,
camelCaseToSnakeCase = TRUE
)

sccsExposure <- tibble(
exposuresOutcomeSetId = outcomeId,
eraId = !!exposureId,
trueEffectSize = ifelse(trueEffectSize == 1, 1, NA),
)
DatabaseConnector::insertTable(
connection = connection,
databaseSchema = "main",
tableName = "sccs_exposure",
data = sccsExposure,
createTable = outcomeId == 1,
dropTableIfExists = outcomeId == 1,
camelCaseToSnakeCase = TRUE
)

sccsCovariateAnalysis <- tibble(
analysisId = 1:4,
covariateAnalysisId = 1,
variableOfInterest = 1
)
DatabaseConnector::insertTable(
connection = connection,
databaseSchema = "main",
tableName = "sccs_covariate_analysis",
data = sccsCovariateAnalysis,
createTable = outcomeId == 1,
dropTableIfExists = outcomeId == 1,
camelCaseToSnakeCase = TRUE
)

for (analysisId in 1:4) {
simulateEo(exposureId = exposureId, outcomeId = outcomeId, analysisId = analysisId, incidenceRateRatio = trueEffectSize)
}
}
DatabaseConnector::disconnect(connection)
Loading

0 comments on commit 53c83d1

Please sign in to comment.