Skip to content

Commit 67ed5da

Browse files
authored
Merge pull request #57 from X-DataInitiative/CNAM-152-FilteringMainRefactoring
CNAM-152-FilteringMainRefactoring
2 parents 19f3c38 + b71ac62 commit 67ed5da

32 files changed

Lines changed: 982 additions & 265 deletions
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
# Default filtering configuration.
# Each top-level key is an environment (default, cnam, cmap, test); the named
# environments inherit from `default` via HOCON substitution and override only
# what differs.
default = {
  env_name = "default"
  drug_categories = ["A10"]
  cancer_definition = "broad"
  disease_code = "C67"
  mco_death_code = 9
  limits = {
    min_year = 1900
    max_year = 2020
    min_month = 1
    max_month = 12
    min_gender = 1
    max_gender = 2
    min_age = 18
    max_age = 120
  }
  dates = {
    age_reference = [2006, 12, 31, 23, 59, 59]
  }
  paths = {
    input = {
      dcir = "/shared/flat_data/joins/DCIR"
      pmsi_mco = "/shared/flat_data/joins/MCO"
      pmsi_had = "/shared/flat_data/joins/HAD"
      pmsi_ssr = "/shared/flat_data/joins/SSR"
      ir_ben = "/shared/flat_data/IR_BEN_R"
      ir_imb = "/shared/flat_data/IR_IMB_R"
      ir_pha = "/shared/value_tables/IR_PHA_R"
      dosages = "/shared/value_tables/DOSE_PER_MOLECULE.CSV"
    }
    output = {
      root = "/shared/filtered_data"
      patients = "/shared/filtered_data/patients"
      flat_events = "/shared/filtered_data/flat_events"
    }
  }
}

# For the CNAM environment, we use the default values
cnam = ${default}
cnam.env_name = "cnam"

# Overriding only the dates for cmap environment:
cmap = ${default}
cmap.env_name = "cmap"
cmap.dates = {
  age_reference = [2010, 12, 31, 23, 59, 59]
}

# Overriding only the paths for test environment:
test = ${default}
test.env_name = "test"
test.paths = {
  input = {
    dcir = "src/test/resources/test-input/DCIR.parquet"
    pmsi_mco = "src/test/resources/test-input/MCO.parquet"
    pmsi_had = "src/test/resources/test-input/HAD.parquet"
    pmsi_ssr = "src/test/resources/test-input/SSR.parquet"
    ir_ben = "src/test/resources/test-input/IR_BEN_R.parquet"
    ir_imb = "src/test/resources/test-input/IR_IMB_R.parquet"
    ir_pha = "src/test/resources/test-input/IR_PHA_R.parquet"
    dosages = "src/test/resources/test-input/DOSE_PER_MOLECULE.CSV"
  }
  output = {
    root = "target/test/output"
    patients = "target/test/output/patients"
    flat_events = "target/test/output/flat_events"
  }
}

src/main/resources/filtering.conf

Lines changed: 0 additions & 53 deletions
This file was deleted.

src/main/scala/fr/polytechnique/cmap/cnam/Main.scala

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,18 @@ package fr.polytechnique.cmap.cnam
22

33
import java.util.{Locale, TimeZone}
44
import org.apache.log4j.{Level, Logger}
5+
import org.apache.spark.sql.Dataset
56
import org.apache.spark.sql.hive.HiveContext
67
import org.apache.spark.{SparkConf, SparkContext}
8+
import fr.polytechnique.cmap.cnam.flattening.FlatteningMain._
79

810
trait Main {
911

1012
Logger.getRootLogger.setLevel(Level.ERROR)
1113
Logger.getLogger("org").setLevel(Level.ERROR)
1214
Logger.getLogger("akka").setLevel(Level.ERROR)
13-
Logger.getLogger("fr.polytechnique").setLevel(Level.WARN)
15+
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
16+
Logger.getLogger("fr.polytechnique").setLevel(Level.INFO)
1417

1518
Locale.setDefault(Locale.US)
1619
TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
@@ -26,9 +29,23 @@ trait Main {
2629
def startContext(): Unit = {
2730
_sc = new SparkContext(new SparkConf().setAppName(this.appName))
2831
_sql = new HiveContext(_sc)
32+
_sql.setConf("spark.sql.autoBroadcastJoinThreshold", "104857600")
2933
}
3034
def stopContext(): Unit = _sc.stop()
3135

36+
// Expected args are in format "arg1=value1 arg2=value2 ..."
37+
def main(args: Array[String]): Unit = {
38+
startContext()
39+
val sqlCtx = sqlContext
40+
val argsMap = args.map(
41+
arg => arg.split("=")(0) -> arg.split("=")(1)
42+
).toMap
43+
try {
44+
run(sqlCtx, argsMap)
45+
}
46+
finally stopContext()
47+
}
48+
3249
def appName: String
33-
def main(args: Array[String]): Unit
50+
def run(sqlContext: HiveContext, argsMap: Map[String, String]): Option[Dataset[_]]
3451
}

src/main/scala/fr/polytechnique/cmap/cnam/filtering/DcirPatientTransformer.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,10 @@ import org.apache.spark.sql.expressions.Window
44
import org.apache.spark.sql.functions._
55
import org.apache.spark.sql.types.{DateType, IntegerType, StringType, TimestampType}
66
import org.apache.spark.sql.{Column, DataFrame, Dataset}
7-
87
import fr.polytechnique.cmap.cnam.utilities.ColumnUtilities._
98

109
object DcirPatientTransformer extends Transformer[Patient] with PatientsTransformer {
1110

12-
1311
def estimateBirthDateCol(ts1: Column, ts2: Column, birthYear: Column): Column = {
1412
unix_timestamp(
1513
concat(
@@ -34,7 +32,6 @@ object DcirPatientTransformer extends Transformer[Patient] with PatientsTransfor
3432

3533
implicit class PatientTransformer(data: DataFrame) {
3634

37-
3835
// The birth year for each patient is found by grouping by patientId and birthYear and then
3936
// by taking the most frequent birth year for each patient.
4037
def findBirthYears: DataFrame = {
@@ -105,5 +102,4 @@ object DcirPatientTransformer extends Transformer[Patient] with PatientsTransfor
105102
dcir.unpersist()
106103
result
107104
}
108-
109105
}

src/main/scala/fr/polytechnique/cmap/cnam/filtering/DiseaseTransformer.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
package fr.polytechnique.cmap.cnam.filtering
22

3-
import org.apache.spark.sql.{Column, Dataset}
43
import org.apache.spark.sql.functions._
54
import org.apache.spark.sql.types.TimestampType
5+
import org.apache.spark.sql.{Column, Dataset}
66

77
/**
88
* This trait contains the skeleton of the output events and the target disease code
99
*/
1010
trait DiseaseTransformer extends Transformer[Event] {
11-
final val DiseaseCode = "C67"
11+
final val DiseaseCode: String = FilteringConfig.diseaseCode
1212

1313
protected val outputColumns: List[Column] = List(
1414
col("patientID"),

src/main/scala/fr/polytechnique/cmap/cnam/filtering/DrugEventsTransformer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import fr.polytechnique.cmap.cnam.utilities.DrugEventsTransformerHelper
77

88
object DrugEventsTransformer extends Transformer[Event] {
99

10-
val drugCategories = List("A10") // Only anti-diabetics
10+
val drugCategories: List[String] = FilteringConfig.drugCategories
1111

1212
val dcirInputColumns: List[Column] = List(
1313
col("NUM_ENQ").cast(StringType).as("patientID"),
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
package fr.polytechnique.cmap.cnam.filtering

import java.sql.Timestamp
import scala.collection.JavaConverters._
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import com.typesafe.config.{Config, ConfigFactory}
import fr.polytechnique.cmap.cnam.utilities.functions._

/**
  * Centralized, lazily-loaded access to the filtering configuration.
  *
  * The configuration is resolved once, on first access, by merging an optional
  * external config file (whose path is read from the Spark SQL conf key "conf")
  * on top of the bundled "filtering-default.conf" resource, scoped to the
  * environment named by the Spark SQL conf key "env" (default: "test").
  */
object FilteringConfig {

  private lazy val conf: Config = {
    // NOTE(review): reading the config path and environment name from the
    // active SQLContext is a workaround (acknowledged as "a little hacky" by
    // the original author). A cleaner approach would pass them explicitly.
    val sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())
    val configPath: String = sqlContext.getConf("conf", "")
    val environment: String = sqlContext.getConf("env", "test")

    // Bundled defaults for the selected environment; the external file (if
    // any) overrides them. parseFile on a missing/empty path yields an empty
    // config, so the defaults are used as-is in that case.
    val defaultConfig = ConfigFactory.parseResources("filtering-default.conf").resolve().getConfig(environment)
    val newConfig = ConfigFactory.parseFile(new java.io.File(configPath))

    newConfig.withFallback(defaultConfig).resolve()
  }

  /** Paths of the flat input data sources (DCIR, PMSI, IR_* referentials, dosages). */
  case class InputPaths(
    dcir: String,
    pmsiMco: String,
    pmsiHad: String,
    pmsiSsr: String,
    irBen: String,
    irImb: String,
    irPha: String,
    dosages: String
  )

  /** Paths where the filtered outputs (patients, flat events) are written. */
  case class OutputPaths(root: String, patients: String, flatEvents: String)

  /** Validity bounds used to filter out implausible patient/event values. */
  case class Limits(
    minYear: Int,
    maxYear: Int,
    minMonth: Int,
    maxMonth: Int,
    minGender: Int,
    maxGender: Int,
    minAge: Int,
    maxAge: Int
  )

  /** Reference dates used by the pipeline (currently only the age reference). */
  case class Dates(ageReference: Timestamp)

  lazy val drugCategories: List[String] = conf.getStringList("drug_categories").asScala.toList
  lazy val cancerDefinition: String = conf.getString("cancer_definition")
  lazy val diseaseCode: String = conf.getString("disease_code")
  lazy val mcoDeathCode: Int = conf.getInt("mco_death_code")
  lazy val inputPaths: InputPaths = InputPaths(
    dcir = conf.getString("paths.input.dcir"),
    pmsiMco = conf.getString("paths.input.pmsi_mco"),
    pmsiHad = conf.getString("paths.input.pmsi_had"),
    pmsiSsr = conf.getString("paths.input.pmsi_ssr"),
    irBen = conf.getString("paths.input.ir_ben"),
    irImb = conf.getString("paths.input.ir_imb"),
    irPha = conf.getString("paths.input.ir_pha"),
    dosages = conf.getString("paths.input.dosages")
  )
  lazy val outputPaths: OutputPaths = OutputPaths(
    root = conf.getString("paths.output.root"),
    patients = conf.getString("paths.output.patients"),
    flatEvents = conf.getString("paths.output.flat_events")
  )
  lazy val limits: Limits = Limits(
    minYear = conf.getInt("limits.min_year"),
    maxYear = conf.getInt("limits.max_year"),
    minMonth = conf.getInt("limits.min_month"),
    maxMonth = conf.getInt("limits.max_month"),
    minGender = conf.getInt("limits.min_gender"),
    maxGender = conf.getInt("limits.max_gender"),
    minAge = conf.getInt("limits.min_age"),
    maxAge = conf.getInt("limits.max_age")
  )
  lazy val dates: Dates = Dates(
    // age_reference is a [year, month, day, hour, min, sec] list in the conf
    ageReference = makeTS(conf.getIntList("dates.age_reference").asScala.toList)
  )
}

0 commit comments

Comments
 (0)