Skip to content

Commit ac1bc00

Browse files
committed
CNAM-153 moves all global variables under coxConfig and filteringConfig obj
1 parent 565dc37 commit ac1bc00

11 files changed

Lines changed: 227 additions & 74 deletions

File tree

src/main/resources/config/filtering-cmap.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ paths = {
1515
root = "/shared/filtered_data"
1616
patients = "/shared/filtered_data/patients"
1717
flat_events = "/shared/filtered_data/flat_events"
18+
cox_features = "/shared/features/cox_features"
19+
ltsccs_features = "/shared/features/ltsccs_features"
20+
mlpp_features = "/shared/features/mlpp_features"
1821
}
1922
}
2023

src/main/resources/config/filtering-cnam.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ paths = {
1515
root = "/shared/filtered_data"
1616
patients = "/shared/filtered_data/patients"
1717
flat_events = "/shared/filtered_data/flat_events"
18+
cox_features = "/shared/features/cox_features"
19+
ltsccs_features = "/shared/features/ltsccs_features"
20+
mlpp_features = "/shared/features/mlpp_features"
1821
}
1922
}
2023

src/main/resources/config/filtering-default.conf

Lines changed: 18 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,36 +15,25 @@ default = {
1515
min_age = 18
1616
max_age = 120
1717
}
18+
trackloss = {
19+
threshold = 4
20+
delay = 2
21+
}
22+
23+
cox_parameters = {
24+
follow-up-delay = 6 #Number of months after the observation start that is considered to be followup
25+
filter_delayed_patients = true #Whether to filter out patients whose first exposure comes too long after the study start (see delayed_entries_threshold).
26+
delayed_entries_threshold = 12 #Number of months after the study start beyond which an entry is considered delayed.
27+
filter_diagnosed_patients = true #Whether to filter out patients who were diagnosed with cancer before the study period.
1828

19-
cox_hypothesis = {
20-
trackloss = {
21-
threshold = 4
22-
delay = 2
23-
}
24-
follow-up = {
25-
delay = 6
26-
}
27-
observation = {
28-
start = "first_molecule" # or "study_start"
29-
end = "study_end"
30-
}
3129
exposures = {
32-
min_purchases = 2
33-
start_delay = 3
34-
purchases_window = 6
35-
only_first = false
36-
delayed_entries = {
37-
filter = true
38-
delay = 12
39-
}
40-
diagnosed_patients = {
41-
filter = true
42-
delay = 0
43-
}
30+
min_purchases = 2 #Minimum number of purchases that have to be made in order to be considered exposed.
31+
purchases_window = 6 #Purchase window, within which the min number of purchases have to be made.
32+
start_delay = 3 #Number of months after which a patient will be considered exposed after the min purchases, window.
4433
}
4534
}
4635

47-
ltsccs_hypothesis = {
36+
ltsccs_parameters = {
4837
observation = {
4938
start = "first_molecule" # "study_start"
5039
end = "study_end"
@@ -65,7 +54,7 @@ default = {
6554
}
6655
}
6756

68-
mlpp_hypothesis = {
57+
mlpp_parameters = {
6958
observation = {
7059
start = "first_molecule" # "study_start"
7160
end = "study_end"
@@ -94,4 +83,6 @@ cmap = ${default}
9483
cmap = include "filtering-cmap.conf"
9584

9685
test = ${default}
97-
test = include "filtering-test.conf"
86+
test = {
87+
include "filtering-test.conf"
88+
}

src/main/resources/config/filtering-test.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ paths = {
1515
root = "target/test/output"
1616
patients = "target/test/output/patients"
1717
flat_events = "target/test/output/flat_events"
18+
cox_features = "target/test/output/cox_features"
19+
ltsccs_features = "target/test/output/ltsccs_features"
20+
mlpp_features = "target/test/output/mlpp_features"
1821
}
1922
}
2023

src/main/scala/fr/polytechnique/cmap/cnam/filtering/FilteringConfig.scala

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import scala.collection.JavaConverters._
55
import org.apache.spark.SparkContext
66
import org.apache.spark.sql.SQLContext
77
import com.typesafe.config.{Config, ConfigFactory}
8+
import fr.polytechnique.cmap.cnam.filtering.mlpp.MLPPFeature
89
import fr.polytechnique.cmap.cnam.utilities.functions._
910

1011
object FilteringConfig {
@@ -60,7 +61,14 @@ object FilteringConfig {
6061
dosages: String
6162
)
6263

63-
case class OutputPaths(root: String, patients: String, flatEvents: String)
64+
case class OutputPaths(
65+
root: String,
66+
patients: String,
67+
flatEvents: String,
68+
coxFeatures: String,
69+
ltsccsFeatures: String,
70+
mlppFeatures: String
71+
)
6472

6573
case class Limits(
6674
minYear: Int,
@@ -73,7 +81,16 @@ object FilteringConfig {
7381
maxAge: Int
7482
)
7583

76-
case class Dates(ageReference: Timestamp)
84+
case class Dates(
85+
ageReference: Timestamp,
86+
studyStart: Timestamp,
87+
studyEnd: Timestamp
88+
)
89+
90+
case class TracklossDefinition(
91+
threshold: Int,
92+
delay: Int
93+
)
7794

7895
lazy val drugCategories: List[String] = conf.getStringList("drug_categories").asScala.toList
7996
lazy val cancerDefinition: String = conf.getString("cancer_definition")
@@ -92,7 +109,10 @@ object FilteringConfig {
92109
lazy val outputPaths = OutputPaths(
93110
root = conf.getString("paths.output.root"),
94111
patients = conf.getString("paths.output.patients"),
95-
flatEvents = conf.getString("paths.output.flat_events")
112+
flatEvents = conf.getString("paths.output.flat_events"),
113+
coxFeatures = conf.getString("paths.output.cox_features"),
114+
ltsccsFeatures = conf.getString("paths.output.ltsccs_features"),
115+
mlppFeatures = conf.getString("paths.output.mlpp_features")
96116
)
97117
lazy val limits = Limits(
98118
minYear = conf.getInt("limits.min_year"),
@@ -105,6 +125,14 @@ object FilteringConfig {
105125
maxAge = conf.getInt("limits.max_age")
106126
)
107127
lazy val dates = Dates(
108-
ageReference = makeTS(conf.getIntList("dates.age_reference").asScala.toList)
128+
ageReference = makeTS(conf.getIntList("dates.age_reference").asScala.toList),
129+
studyStart = makeTS(conf.getIntList("dates.study_start").asScala.toList),
130+
studyEnd = makeTS(conf.getIntList("dates.study_end").asScala.toList)
131+
)
132+
lazy val tracklossDefinition = TracklossDefinition(
133+
threshold = conf.getInt("trackloss.threshold"),
134+
delay = conf.getInt("trackloss.delay")
109135
)
136+
137+
def modelConfig(modelName: String): Config = conf.getConfig(modelName)
110138
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package fr.polytechnique.cmap.cnam.filtering.cox
2+
3+
import com.typesafe.config.Config
4+
import fr.polytechnique.cmap.cnam.filtering.FilteringConfig
5+
6+
/**
7+
* Created by sathiya on 23/11/16.
8+
*/
9+
object CoxConfig {
10+
11+
case class CoxExposureDefinition(
12+
minPurchases: Int,
13+
purchasesWindow: Int,
14+
startDelay: Int
15+
)
16+
17+
private lazy val modelParams: Config = FilteringConfig.modelConfig("cox_parameters")
18+
19+
lazy val followUpMonthsDelay: Int = modelParams.getInt("follow-up-delay")
20+
lazy val filterDelayedPatients: Boolean = modelParams.getBoolean("filter_delayed_patients")
21+
lazy val delayedEntriesThreshold: Int = modelParams.getInt("delayed_entries_threshold")
22+
lazy val filterDiagnosedPatients: Boolean = modelParams.getBoolean("filter_diagnosed_patients")
23+
24+
lazy val exposureDefinition = CoxExposureDefinition(
25+
minPurchases = modelParams.getInt("exposures.min_purchases"),
26+
startDelay = modelParams.getInt("exposures.start_delay"),
27+
purchasesWindow = modelParams.getInt("exposures.purchases_window")
28+
)
29+
}

src/main/scala/fr/polytechnique/cmap/cnam/filtering/cox/CoxExposuresTransformer.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,15 @@ import org.apache.spark.sql.expressions.Window
44
import org.apache.spark.sql.functions._
55
import org.apache.spark.sql.types.{BooleanType, TimestampType}
66
import org.apache.spark.sql.{Column, DataFrame, Dataset}
7-
import fr.polytechnique.cmap.cnam.filtering.{ExposuresTransformer, FlatEvent}
7+
import fr.polytechnique.cmap.cnam.filtering.{ExposuresTransformer, FilteringConfig, FlatEvent}
88

99
object CoxExposuresTransformer extends ExposuresTransformer {
1010

1111
// Delays and time windows used to compute exposures.
1212
// Their values are read from the configuration (CoxConfig and FilteringConfig) rather than hard-coded here.
13-
final val ExposureStartDelay = 3
14-
final val ExposureStartThreshold = 6
15-
final val DiseaseCode = "C67"
13+
final val ExposureStartDelay: Int = CoxConfig.exposureDefinition.startDelay
14+
final val ExposureStartThreshold: Int = CoxConfig.exposureDefinition.purchasesWindow
15+
final val DiseaseCode: String = FilteringConfig.diseaseCode
1616

1717
val outputColumns = List(
1818
col("patientID"),
@@ -41,7 +41,10 @@ object CoxExposuresTransformer extends ExposuresTransformer {
4141
).over(window).cast(BooleanType)
4242

4343
// Drop patients whose first molecule event is after StudyStart + the configured delayed-entries threshold (in months)
44-
val firstYearObservation = add_months(lit(StudyStart), 12).cast(TimestampType)
44+
val firstYearObservation = add_months(
45+
lit(StudyStart),
46+
CoxConfig.delayedEntriesThreshold
47+
).cast(TimestampType)
4548
val drugFilter = max(
4649
when(
4750
col("category") === "molecule" && (col("start") <= firstYearObservation),

src/main/scala/fr/polytechnique/cmap/cnam/filtering/cox/CoxFollowUpEventsTransformer.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ import org.apache.spark.sql.expressions.Window
44
import org.apache.spark.sql.functions._
55
import org.apache.spark.sql.types.TimestampType
66
import org.apache.spark.sql.{Column, DataFrame, Dataset}
7-
import fr.polytechnique.cmap.cnam.filtering.{DatasetTransformer, FlatEvent}
7+
import fr.polytechnique.cmap.cnam.filtering.{DatasetTransformer, FilteringConfig, FlatEvent}
88
import fr.polytechnique.cmap.cnam.utilities.ColumnUtilities._
99

1010
object CoxFollowUpEventsTransformer extends DatasetTransformer[FlatEvent, FlatEvent] {
1111

12-
final val followUpMonthsDelay = 6
13-
final val diseaseCode = "C67"
12+
final val followUpMonthsDelay: Int = CoxConfig.followUpMonthsDelay
13+
final val diseaseCode: String = FilteringConfig.diseaseCode
1414

1515
val outputColumns = List(
1616
col("patientID"),

src/main/scala/fr/polytechnique/cmap/cnam/filtering/cox/CoxMain.scala

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
package fr.polytechnique.cmap.cnam.filtering.cox
22

3-
import org.apache.spark.sql.{DataFrame, Dataset}
43
import org.apache.spark.sql.functions._
54
import org.apache.spark.sql.hive.HiveContext
6-
import com.typesafe.config.{Config, ConfigFactory}
5+
import org.apache.spark.sql.{DataFrame, Dataset}
76
import fr.polytechnique.cmap.cnam.Main
87
import fr.polytechnique.cmap.cnam.filtering._
98

@@ -14,26 +13,40 @@ object CoxMain extends Main {
1413

1514
def appName = "CoxFeaturing"
1615

17-
def coxFeaturing(sqlContext: HiveContext,
18-
config: Config,
19-
cancerDefinition: String,
20-
filterDelayedPatients: Boolean): Unit = {
21-
import sqlContext.implicits._
16+
def run(sqlContext: HiveContext, argsMap: Map[String, String]): Option[Dataset[_]] = {
17+
18+
val flatEvents: Dataset[FlatEvent] = FilteringMain.run(sqlContext, argsMap).get
19+
coxFeaturing(flatEvents, argsMap)
20+
}
21+
22+
def coxFeaturing(flatEvents: Dataset[FlatEvent], argsMap: Map[String, String]): Option[Dataset[_]] = {
23+
import flatEvents.sqlContext.implicits._
2224

23-
val flatEventPath = config.getString("paths.input.flatEvent")
24-
val flatDcirPath = config.getString("paths.input.flatDcir")
25-
val outputRoot = config.getString("paths.output.root")
25+
val sqlContext = flatEvents.sqlContext
26+
27+
argsMap.get("conf").foreach(sqlContext.setConf("conf", _))
28+
argsMap.get("env").foreach(sqlContext.setConf("env", _))
29+
30+
val cancerDefinition: String = FilteringConfig.cancerDefinition
31+
val filterDelayedPatients: Boolean = CoxConfig.filterDelayedPatients
32+
val outputRoot = FilteringConfig.outputPaths.coxFeatures
2633
val outputDir = s"$outputRoot/$cancerDefinition/$filterDelayedPatients"
2734

28-
logger.info(s"Reading flat events from $flatEventPath...")
35+
logger.info("Running FilteringMain...")
36+
37+
val dcirFlat: DataFrame = sqlContext.read.parquet(FilteringConfig.inputPaths.dcir)
2938

30-
val dcirFlat: DataFrame = sqlContext.read.parquet(flatDcirPath)
31-
val flatEvents: DataFrame = sqlContext.read.parquet(flatEventPath)
39+
val drugFlatEvents = flatEvents.filter(_.category == "molecule")
40+
val diseaseFlatEvents = flatEvents.filter(_.category == "disease")
3241

33-
val drugFlatEvents = flatEvents.filter(col("category") === "molecule").as[FlatEvent]
34-
val diseaseFlatEvents = flatEvents.filter(col("category") === "disease").as[FlatEvent]
35-
val patientColumns = Array($"patientID", $"gender", $"birthDate", $"deathDate")
36-
val patients = flatEvents.select(patientColumns: _*).distinct.as[Patient]
42+
val patients: Dataset[Patient] = flatEvents
43+
.map(
44+
x => Patient(
45+
x.patientID,
46+
x.gender,
47+
x.birthDate,
48+
x.deathDate)
49+
).distinct
3750

3851
logger.info("Number of drug events: " + drugFlatEvents.count)
3952
logger.info("Caching disease events...")
@@ -85,21 +98,7 @@ object CoxMain extends Main {
8598
import CoxFeaturesWriter._
8699
coxFeatures.toDF.write.parquet(s"$outputDir/cox")
87100
coxFeatures.writeCSV(s"$outputDir/cox.csv")
88-
}
89101

90-
override def main(args: Array[String]): Unit = {
91-
startContext()
92-
val (environment: String, cancerDefinition: String, filterDelayedPatients: Boolean) =
93-
args match {
94-
case Array(arg1, args2, args3) => (args(0), args(1), args(2).toBoolean)
95-
case Array(arg1, args2) => (args(0), args(1), true)
96-
case _ => ("test", "broad", true)
97-
}
98-
val config: Config = ConfigFactory.parseResources("config/filtering-default.conf").getConfig(environment)
99-
coxFeaturing(sqlContext, config, cancerDefinition, filterDelayedPatients)
100-
stopContext()
102+
Some(coxFeatures)
101103
}
102-
103-
// todo: refactor this function
104-
def run(sqlContext: HiveContext, argsMap: Map[String, String]): Option[Dataset[_]] = None
105104
}

src/main/scala/fr/polytechnique/cmap/cnam/filtering/cox/CoxTransformer.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
package fr.polytechnique.cmap.cnam.filtering.cox
22

3+
import java.sql.Timestamp
34
import org.apache.spark.sql.expressions.Window
45
import org.apache.spark.sql.functions._
56
import org.apache.spark.sql.types.IntegerType
67
import org.apache.spark.sql.{Column, DataFrame, Dataset}
7-
import fr.polytechnique.cmap.cnam.filtering.{DatasetTransformer, FlatEvent}
8-
import fr.polytechnique.cmap.cnam.utilities.functions._
8+
import fr.polytechnique.cmap.cnam.filtering.{DatasetTransformer, FilteringConfig, FlatEvent}
99

1010
// Start and End are expressed in month from the patient startObs
1111
case class CoxFeature(
@@ -35,7 +35,7 @@ object CoxTransformer extends DatasetTransformer[FlatEvent, CoxFeature] {
3535
"other"
3636
)
3737

38-
final val AgeReferenceDate = makeTS(2006, 12, 31, 23, 59, 59)
38+
final val AgeReferenceDate: Timestamp = FilteringConfig.dates.ageReference
3939

4040
implicit class CoxDataFrame(data: DataFrame) {
4141

0 commit comments

Comments
 (0)