Skip to content

Commit 8b7d942

Browse files
committed
CNAM-153 configures minPurchases hypothesis in withExposureStart function
1 parent ac1bc00 commit 8b7d942

3 files changed

Lines changed: 84 additions & 24 deletions

File tree

src/main/scala/fr/polytechnique/cmap/cnam/filtering/cox/CoxExposuresTransformer.scala

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@ import org.apache.spark.sql.expressions.Window
44
import org.apache.spark.sql.functions._
55
import org.apache.spark.sql.types.{BooleanType, TimestampType}
66
import org.apache.spark.sql.{Column, DataFrame, Dataset}
7+
import fr.polytechnique.cmap.cnam.filtering.cox.CoxConfig.CoxExposureDefinition
78
import fr.polytechnique.cmap.cnam.filtering.{ExposuresTransformer, FilteringConfig, FlatEvent}
89

910
object CoxExposuresTransformer extends ExposuresTransformer {
1011

1112
// Constant definitions for delays and time windows. Should be verified before compiling.
1213
// In the future, we may want to export them to an external file.
13-
final val ExposureStartDelay: Int = CoxConfig.exposureDefinition.startDelay
14-
final val ExposureStartThreshold: Int = CoxConfig.exposureDefinition.purchasesWindow
14+
final val ExposureDefinition: CoxExposureDefinition = CoxConfig.exposureDefinition
15+
final val DelayedEntriesThreshold: Int = CoxConfig.delayedEntriesThreshold
1516
final val DiseaseCode: String = FilteringConfig.diseaseCode
1617

1718
val outputColumns = List(
@@ -43,7 +44,7 @@ object CoxExposuresTransformer extends ExposuresTransformer {
4344
// Drop patients whose first molecule event is after PeriodStart + 1 year
4445
val firstYearObservation = add_months(
4546
lit(StudyStart),
46-
CoxConfig.delayedEntriesThreshold
47+
DelayedEntriesThreshold
4748
).cast(TimestampType)
4849
val drugFilter = max(
4950
when(
@@ -67,17 +68,21 @@ object CoxExposuresTransformer extends ExposuresTransformer {
6768
// Ideally, this method must receive only molecules events, otherwise they will treat diseases
6869
// as molecules and add an exposure start date for them.
6970
// The exposure start date will be null when the patient was not exposed.
70-
def withExposureStart: DataFrame = {
71+
def withExposureStart(exposureDefinition: CoxExposureDefinition): DataFrame = {
7172
val window = Window.partitionBy("patientID", "eventId")
7273

7374
val exposureStartRule: Column = when(
74-
months_between(col("start"), col("previousStartDate")) <= ExposureStartThreshold,
75-
add_months(col("start"), ExposureStartDelay).cast(TimestampType)
75+
months_between(col("start"), col("previousStartDate")) <= exposureDefinition.purchasesWindow,
76+
add_months(col("start"), exposureDefinition.startDelay).cast(TimestampType)
7677
)
7778

78-
// todo: think about adding the range in the window for the lag function
79+
val potentialExposureStart: Column = if(exposureDefinition.minPurchases == 1)
80+
col("start")
81+
else
82+
lag(col("start"), exposureDefinition.minPurchases - 1).over(window.orderBy("start"))
83+
7984
data
80-
.withColumn("previousStartDate", lag(col("start"), 1).over(window.orderBy("start")))
85+
.withColumn("previousStartDate", potentialExposureStart)
8186
.withColumn("exposureStart", exposureStartRule)
8287
.withColumn("exposureStart", when(col("exposureStart") < col("followUpStart"),
8388
col("followUpStart")).otherwise(col("exposureStart"))
@@ -98,7 +103,7 @@ object CoxExposuresTransformer extends ExposuresTransformer {
98103
.filterPatients(filterDelayedPatients)
99104
.where(col("category") === "molecule")
100105
.where(col("start") < col("followUpEnd"))
101-
.withExposureStart
106+
.withExposureStart(ExposureDefinition)
102107
.withExposureEnd
103108
.where(col("exposureEnd") > col("exposureStart"))
104109
.select(outputColumns: _*)

src/test/scala/fr/polytechnique/cmap/cnam/filtering/cox/CoxExposuresTransformerSuite.scala

Lines changed: 68 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -91,35 +91,42 @@ class CoxExposuresTransformerSuite extends SharedContext {
9191
assert(result === expected)
9292
}
9393

94-
"withExposureStart" should "add a column with the start of the exposure" in {
94+
"withExposureStart" should "add exposureStart column correctly if the minPurchases value " +
95+
"is passed as 2" in {
9596
val sqlCtx = sqlContext
9697
import sqlCtx.implicits._
9798

9899
// Given
100+
import fr.polytechnique.cmap.cnam.filtering.cox.CoxConfig.CoxExposureDefinition
101+
val coxExposureDefintion = CoxExposureDefinition(
102+
minPurchases = 2,
103+
purchasesWindow = 6,
104+
startDelay = 3
105+
)
99106
val input = Seq(
100-
("Patient_A", "molecule", "PIOGLITAZONE", makeTS(2008, 3, 1), makeTS(2008, 6, 29)),
101-
("Patient_A", "molecule", "PIOGLITAZONE", makeTS(2008, 1, 1), makeTS(2008, 6, 29)),
102-
("Patient_A", "molecule", "PIOGLITAZONE", makeTS(2008, 8, 1), makeTS(2008, 6, 29)),
103-
("Patient_A", "molecule", "SULFONYLUREA", makeTS(2008, 9, 1), makeTS(2008, 6, 29)),
104-
("Patient_A", "molecule", "SULFONYLUREA", makeTS(2008, 10, 1), makeTS(2008, 6, 29)),
105-
("Patient_B", "molecule", "PIOGLITAZONE", makeTS(2009, 1, 1), makeTS(2009, 6, 29)),
106-
("Patient_B", "molecule", "BENFLUOREX", makeTS(2007, 1, 1), makeTS(2009, 6, 29))
107+
("Patient_A", "molecule", "PIOGLITAZONE", makeTS(2008, 8, 1), makeTS(2008, 6, 29)),
108+
("Patient_A", "molecule", "PIOGLITAZONE", makeTS(2008, 1, 1), makeTS(2008, 6, 29)),
109+
("Patient_A", "molecule", "PIOGLITAZONE", makeTS(2008, 9, 1), makeTS(2008, 6, 29)),
110+
("Patient_A", "molecule", "SULFONYLUREA", makeTS(2009, 3, 1), makeTS(2008, 6, 29)),
111+
("Patient_A", "molecule", "SULFONYLUREA", makeTS(2009, 4, 1), makeTS(2008, 6, 29)),
112+
("Patient_B", "molecule", "PIOGLITAZONE", makeTS(2009, 1, 1), makeTS(2009, 6, 29)),
113+
("Patient_B", "molecule", "BENFLUOREX", makeTS(2007, 1, 1), makeTS(2009, 6, 29))
107114
).toDF("PatientID", "category", "eventId", "start", "followUpStart")
108115

109116
val expected = Seq(
110-
("Patient_A", "PIOGLITAZONE", Some(makeTS(2008, 6, 29))),
111-
("Patient_A", "PIOGLITAZONE", Some(makeTS(2008, 6, 29))),
112-
("Patient_A", "PIOGLITAZONE", Some(makeTS(2008, 6, 29))),
113-
("Patient_A", "SULFONYLUREA", Some(makeTS(2009, 1, 1))),
114-
("Patient_A", "SULFONYLUREA", Some(makeTS(2009, 1, 1))),
117+
("Patient_A", "PIOGLITAZONE", Some(makeTS(2008, 12, 1))),
118+
("Patient_A", "PIOGLITAZONE", Some(makeTS(2008, 12, 1))),
119+
("Patient_A", "PIOGLITAZONE", Some(makeTS(2008, 12, 1))),
120+
("Patient_A", "SULFONYLUREA", Some(makeTS(2009, 7, 1))),
121+
("Patient_A", "SULFONYLUREA", Some(makeTS(2009, 7, 1))),
115122
("Patient_B", "PIOGLITAZONE", None),
116123
("Patient_B", "BENFLUOREX", None)
117124
).toDF("PatientID", "eventId", "exposureStart")
118125

119126

120127
// When
121128
import fr.polytechnique.cmap.cnam.filtering.cox.CoxExposuresTransformer.ExposuresDataFrame
122-
val result = input.withExposureStart.select("PatientID", "eventId", "exposureStart")
129+
val result = input.withExposureStart(coxExposureDefintion).select("PatientID", "eventId", "exposureStart")
123130

124131
// Then
125132
import RichDataFrames._
@@ -130,6 +137,52 @@ class CoxExposuresTransformerSuite extends SharedContext {
130137
assert(result === expected)
131138
}
132139

140+
it should "compute the exposureStart date correctly if the minPurchases value is passed as 1" in {
141+
val sqlCtx = sqlContext
142+
import sqlCtx.implicits._
143+
144+
// Given
145+
import fr.polytechnique.cmap.cnam.filtering.cox.CoxConfig.CoxExposureDefinition
146+
val coxExposureDefintion = CoxExposureDefinition(
147+
minPurchases = 1,
148+
purchasesWindow = 6,
149+
startDelay = 3
150+
)
151+
val input = Seq(
152+
("Patient_A", "molecule", "PIOGLITAZONE", makeTS(2008, 3, 1), makeTS(2008, 6, 29)),
153+
("Patient_A", "molecule", "PIOGLITAZONE", makeTS(2008, 1, 1), makeTS(2008, 6, 29)),
154+
("Patient_A", "molecule", "PIOGLITAZONE", makeTS(2008, 8, 1), makeTS(2008, 6, 29)),
155+
("Patient_A", "molecule", "SULFONYLUREA", makeTS(2009, 3, 1), makeTS(2008, 6, 29)),
156+
("Patient_A", "molecule", "SULFONYLUREA", makeTS(2009, 4, 1), makeTS(2008, 6, 29)),
157+
("Patient_B", "molecule", "PIOGLITAZONE", makeTS(2009, 1, 1), makeTS(2009, 6, 29)),
158+
("Patient_B", "molecule", "BENFLUOREX", makeTS(2009, 6, 1), makeTS(2009, 6, 29))
159+
).toDF("PatientID", "category", "eventId", "start", "followUpStart")
160+
161+
val expected = Seq(
162+
("Patient_A", "PIOGLITAZONE", Some(makeTS(2008, 6, 29))),
163+
("Patient_A", "PIOGLITAZONE", Some(makeTS(2008, 6, 29))),
164+
("Patient_A", "PIOGLITAZONE", Some(makeTS(2008, 6, 29))),
165+
("Patient_A", "SULFONYLUREA", Some(makeTS(2009, 6, 1))),
166+
("Patient_A", "SULFONYLUREA", Some(makeTS(2009, 6, 1))),
167+
("Patient_B", "PIOGLITAZONE", Some(makeTS(2009, 6, 29))),
168+
("Patient_B", "BENFLUOREX", Some(makeTS(2009, 9, 1)))
169+
).toDF("PatientID", "eventId", "exposureStart")
170+
171+
// When
172+
import fr.polytechnique.cmap.cnam.filtering.cox.CoxExposuresTransformer.ExposuresDataFrame
173+
val result = input.withExposureStart(coxExposureDefintion)
174+
.select("PatientID", "eventId", "exposureStart")
175+
176+
// Then
177+
import RichDataFrames._
178+
println("Result:")
179+
result.show
180+
println("Expected:")
181+
expected.show
182+
assert(result === expected)
183+
184+
}
185+
133186
"transform" should "return a valid Dataset for a known input" in {
134187

135188
val sqlCtx = sqlContext
@@ -247,7 +300,7 @@ class CoxExposuresTransformerSuite extends SharedContext {
247300
).toDF
248301

249302
// When
250-
val result = CoxExposuresTransformer.transform(input, false)
303+
val result = CoxExposuresTransformer.transform(input, filterDelayedPatients = false)
251304

252305
// Then
253306
result.show

src/test/scala/fr/polytechnique/cmap/cnam/filtering/cox/CoxMainSuite.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ class CoxMainSuite extends SharedContext {
3636
assert(result === expectedResult)
3737
}
3838

39+
//TODO: We should update the dummy files and test only the above test case in the future.
40+
// We are testing it for the moment because the dummy data don't have any meaningful exposure.
3941
"coxFeaturing" should "correctly run the CoxFeaturing for the given FlatEvents" in {
4042
val sqlCtx = sqlContext
4143
import sqlCtx.implicits._

0 commit comments

Comments
 (0)