Skip to content

Commit b71ac62

Browse files
committed
CNAM-152 Refactored run signature to return an optional Dataset
CNAM-152 Corrected csv file extension; CNAM-152 Small changes and review
1 parent 0381b7f commit b71ac62

9 files changed

Lines changed: 28 additions & 11 deletions

File tree

src/main/resources/filtering-default.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ test.paths = {
5959
ir_ben = "src/test/resources/test-input/IR_BEN_R.parquet"
6060
ir_imb = "src/test/resources/test-input/IR_IMB_R.parquet"
6161
ir_pha = "src/test/resources/test-input/IR_PHA_R.parquet"
62-
dosages = "src/test/resources/test-input/DOSE_PER_MOLECULE.csv"
62+
dosages = "src/test/resources/test-input/DOSE_PER_MOLECULE.CSV"
6363
}
6464
output = {
6565
root = "target/test/output"

src/main/scala/fr/polytechnique/cmap/cnam/Main.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ package fr.polytechnique.cmap.cnam
22

33
import java.util.{Locale, TimeZone}
44
import org.apache.log4j.{Level, Logger}
5+
import org.apache.spark.sql.Dataset
56
import org.apache.spark.sql.hive.HiveContext
67
import org.apache.spark.{SparkConf, SparkContext}
8+
import fr.polytechnique.cmap.cnam.flattening.FlatteningMain._
79

810
trait Main {
911

@@ -27,6 +29,7 @@ trait Main {
2729
def startContext(): Unit = {
2830
_sc = new SparkContext(new SparkConf().setAppName(this.appName))
2931
_sql = new HiveContext(_sc)
32+
_sql.setConf("spark.sql.autoBroadcastJoinThreshold", "104857600")
3033
}
3134
def stopContext(): Unit = _sc.stop()
3235

@@ -44,5 +47,5 @@ trait Main {
4447
}
4548

4649
def appName: String
47-
def run(sqlContext: HiveContext, argsMap: Map[String, String]): Unit = {}
50+
def run(sqlContext: HiveContext, argsMap: Map[String, String]): Option[Dataset[_]]
4851
}

src/main/scala/fr/polytechnique/cmap/cnam/filtering/FilteringMain.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ object FilteringMain extends Main {
1515
* "conf" -> "path/to/file.conf" (default: "$resources/filtering-default.conf")
1616
* "env" -> "cnam" | "cmap" | "test" (deafult: "test")
1717
*/
18-
override def run(sqlContext: HiveContext, argsMap: Map[String, String] = Map()): Unit = {
18+
def run(sqlContext: HiveContext, argsMap: Map[String, String] = Map()): Option[Dataset[FlatEvent]] = {
1919

2020
import implicits.SourceExtractor
2121
import sqlContext.implicits._
@@ -70,5 +70,7 @@ object FilteringMain extends Main {
7070
patients.toDF.write.parquet(outputPaths.patients)
7171
logger.info("Writing FlatEvents...")
7272
flatEvents.toDF.write.parquet(outputPaths.flatEvents)
73+
74+
Some(flatEvents)
7375
}
7476
}

src/main/scala/fr/polytechnique/cmap/cnam/filtering/PatientsTransformer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ trait PatientsTransformer {
2020
object PatientsTransformer extends Transformer[Patient] with PatientsTransformer {
2121

2222
def isDeathDateValid(deathDate: Column, birthDate: Column): Column =
23-
deathDate.between(birthDate, lit(makeTS(MaxYear, 1, 1)))
23+
deathDate.between(birthDate, makeTS(MaxYear, 1, 1))
2424

2525
def transform(sources: Sources): Dataset[Patient] = {
2626
val irBen = IrBenPatientTransformer.transform(sources).toDF.as("irBen")

src/main/scala/fr/polytechnique/cmap/cnam/filtering/cox/CoxMain.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,4 +121,7 @@ object CoxMain extends Main {
121121
coxFeaturing(sqlContext, config, cancerDefinition, filterDelayedPatients)
122122
stopContext()
123123
}
124+
125+
// todo: refactor this function
126+
def run(sqlContext: HiveContext, argsMap: Map[String, String]): Option[Dataset[_]] = None
124127
}

src/main/scala/fr/polytechnique/cmap/cnam/filtering/ltsccs/LTSCCSMain.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package fr.polytechnique.cmap.cnam.filtering.ltsccs
22

3+
import org.apache.spark.sql.Dataset
34
import org.apache.spark.sql.functions._
45
import org.apache.spark.sql.hive.HiveContext
56
import com.typesafe.config.{Config, ConfigFactory}
@@ -60,4 +61,7 @@ object LTSCCSMain extends Main {
6061
runLTSCCS(sqlContext, config)
6162
stopContext()
6263
}
64+
65+
// todo: refactor this function
66+
def run(sqlContext: HiveContext, argsMap: Map[String, String]): Option[Dataset[_]] = None
6367
}

src/main/scala/fr/polytechnique/cmap/cnam/filtering/mlpp/MLPPMain.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package fr.polytechnique.cmap.cnam.filtering.mlpp
22

33
import scala.collection.JavaConversions._
4+
import org.apache.spark.sql.Dataset
45
import org.apache.spark.sql.functions._
56
import org.apache.spark.sql.hive.HiveContext
67
import com.typesafe.config.{Config, ConfigFactory}
@@ -49,4 +50,7 @@ object MLPPMain extends Main {
4950
MLPPFeaturing(sqlContext, config)
5051
stopContext()
5152
}
53+
54+
// todo: refactor this function
55+
def run(sqlContext: HiveContext, argsMap: Map[String, String]): Option[Dataset[_]] = None
5256
}

src/main/scala/fr/polytechnique/cmap/cnam/flattening/FlatteningMain.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package fr.polytechnique.cmap.cnam.flattening
22

3-
import org.apache.spark.sql.DataFrame
3+
import org.apache.spark.sql.{DataFrame, Dataset}
4+
import org.apache.spark.sql.hive.HiveContext
45
import fr.polytechnique.cmap.cnam.Main
56
import fr.polytechnique.cmap.cnam.utilities.FlatteningConfig
67
import fr.polytechnique.cmap.cnam.utilities.FlatteningConfig._
@@ -60,10 +61,9 @@ object FlatteningMain extends Main {
6061
}
6162
}
6263

63-
override def main(args: Array[String]): Unit = {
64-
startContext()
65-
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "104857600")
64+
def run(sqlContext: HiveContext, argsMap: Map[String, String]): Option[Dataset[_]] = {
6665
loadToParquet()
6766
joinTables()
67+
None
6868
}
6969
}

src/main/scala/fr/polytechnique/cmap/cnam/flattening/ValidateFlattening.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ package fr.polytechnique.cmap.cnam.flattening
33
import fr.polytechnique.cmap.cnam.Main
44
import fr.polytechnique.cmap.cnam.statistics.Comparator
55
import fr.polytechnique.cmap.cnam.utilities.RichDataFrames._
6-
import org.apache.spark.sql.{Column, DataFrame}
6+
import org.apache.spark.sql.{Column, DataFrame, Dataset}
77
import org.apache.spark.sql.functions._
8+
import org.apache.spark.sql.hive.HiveContext
89
import fr.polytechnique.cmap.cnam.utilities.FlatteningConfig
910
import fr.polytechnique.cmap.cnam.utilities.FlatteningConfig._
1011

@@ -116,8 +117,8 @@ object ValidateFlattening extends Main {
116117
}
117118
}
118119

119-
override def main(args: Array[String]){
120-
startContext( )
120+
def run(sqlContext: HiveContext, argsMap: Map[String, String]): Option[Dataset[_]] = {
121121
computeStoreFlatAndInputDfsStat()
122+
None
122123
}
123124
}

0 commit comments

Comments (0)