Skip to content

Commit 3ad68b3

Browse files
authored
Flatten operator for Optional columns. (#307)
1 parent be589b3 commit 3ad68b3

3 files changed

Lines changed: 92 additions & 2 deletions

File tree

dataset/src/main/scala/frameless/TypedDataset.scala

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1165,9 +1165,18 @@ class TypedDataset[T] protected[frameless](val dataset: Dataset[T])(implicit val
11651165
}
11661166

11671167
/**
1168-
* Explodes (flattens) a single column at a time. It only compiles if the type of column supports this operation.
1168+
* Explodes a single column at a time. It only compiles if the type of column supports this operation.
11691169
*
1170-
* @param column the column we wish to explode/flatten
1170+
* @example
1171+
*
1172+
* {{{
1173+
* case class X(i: Int, j: Array[Int])
1174+
* case class Y(i: Int, j: Int)
1175+
*
1176+
* val f: TypedDataset[X] = ???
1177+
* val fNew: TypedDataset[Y] = f.explode('j).as[Y]
1178+
* }}}
1179+
* @param column the column we wish to explode
11711180
*/
11721181
def explode[A, TRep <: HList, V[_], OutMod <: HList, OutModValues <: HList, Out]
11731182
(column: Witness.Lt[Symbol])
@@ -1189,6 +1198,40 @@ class TypedDataset[T] protected[frameless](val dataset: Dataset[T])(implicit val
11891198
sparkExplode(df(column.value.name))).as[Out](TypedExpressionEncoder[Out])
11901199
TypedDataset.create[Out](trans)
11911200
}
1201+
1202+
/**
1203+
* Flattens a column of type Option[A]. Compiles only if the selected column is of type Option[A].
1204+
*
1205+
*
1206+
* @example
1207+
*
1208+
* {{{
1209+
* case class X(i: Int, j: Option[Int])
1210+
* case class Y(i: Int, j: Int)
1211+
*
1212+
* val f: TypedDataset[X] = ???
1213+
* val fNew: TypedDataset[Y] = f.flattenOption('j).as[Y]
1214+
* }}}
1215+
*
1216+
* @param column the column we wish to flatten
1217+
*/
1218+
def flattenOption[A, TRep <: HList, V[_], OutMod <: HList, OutModValues <: HList, Out]
1219+
(column: Witness.Lt[Symbol])
1220+
(implicit
1221+
i0: TypedColumn.Exists[T, column.T, V[A]],
1222+
i1: TypedEncoder[A],
1223+
i2: V[A] =:= Option[A],
1224+
i3: LabelledGeneric.Aux[T, TRep],
1225+
i4: Modifier.Aux[TRep, column.T, V[A], A, OutMod],
1226+
i5: Values.Aux[OutMod, OutModValues],
1227+
i6: Tupler.Aux[OutModValues, Out],
1228+
i7: TypedEncoder[Out]
1229+
): TypedDataset[Out] = {
1230+
val df = dataset.toDF()
1231+
val trans =
1232+
df.filter(df(column.value.name).isNotNull).as[Out](TypedExpressionEncoder[Out])
1233+
TypedDataset.create[Out](trans)
1234+
}
11921235
}
11931236

11941237
object TypedDataset {
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package frameless
2+
3+
import org.scalacheck.Prop
4+
import org.scalacheck.Prop.forAll
5+
import org.scalacheck.Prop._
6+
7+
8+
class FlattenTests extends TypedDatasetSuite {
9+
test("simple flatten test") {
10+
val ds: TypedDataset[(Int,Option[Int])] = TypedDataset.create(Seq((1,Option(1))))
11+
ds.flattenOption('_2): TypedDataset[(Int,Int)]
12+
}
13+
14+
test("different Optional types") {
15+
def prop[A: TypedEncoder](xs: List[X1[Option[A]]]): Prop = {
16+
val tds: TypedDataset[X1[Option[A]]] = TypedDataset.create(xs)
17+
18+
val framelessResults: Seq[Tuple1[A]] = tds.flattenOption('a).collect().run().toVector
19+
val scalaResults = xs.flatMap(_.a).map(Tuple1(_)).toVector
20+
21+
framelessResults ?= scalaResults
22+
}
23+
24+
check(forAll(prop[Long] _))
25+
check(forAll(prop[Int] _))
26+
check(forAll(prop[Char] _))
27+
check(forAll(prop[String] _))
28+
}
29+
}

docs/src/main/tut/FeatureOverview.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,24 @@ val sampleStats = bedroomStats.select(
500500
sampleStats.show().run()
501501
```
502502

503+
In addition, optional columns can be flatten using the `.flattenOption` method on `TypedDatset`.
504+
The result contains the rows for which the flattened column is not None (or null). The schema
505+
is automatically adapted to reflect this change.
506+
507+
```tut:book
508+
val flattenStats = bedroomStats.flattenOption('AvgPriceBeds2)
509+
510+
511+
// The second Option[Double] is now of type Double, since all 'null' values are removed
512+
flattenStats: TypedDataset[(String, Option[Double], Double, Option[Double], Option[Double])]
513+
```
514+
515+
In a DataFrame, if you just ignore types, this would equivelantly be written as:
516+
517+
```tut:book
518+
bedroomStats.dataset.toDF().filter($"AvgPriceBeds2".isNotNull)
519+
```
520+
503521

504522
### Entire TypedDataset Aggregation
505523

0 commit comments

Comments
 (0)