Skip to content

Commit d4c5d36

Browse files
author
jpolchlo
committed
Add sequence/timestamp conversions and tests
1 parent 4d56957 commit d4c5d36

2 files changed

Lines changed: 41 additions & 0 deletions

File tree

src/main/scala/vectorpipe/sources/AugmentedDiffSource.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package vectorpipe.sources
33
import java.io.{ByteArrayInputStream, File}
44
import java.net.URI
55
import java.nio.charset.StandardCharsets
6+
import java.sql.Timestamp
7+
import java.time.Instant
68
import java.util.zip.GZIPInputStream
79

810
import cats.implicits._
@@ -16,6 +18,8 @@ import io.circe.generic.auto._
1618
import io.circe.{yaml, _}
1719
import org.apache.commons.io.IOUtils
1820
import org.apache.spark.internal.Logging
21+
import org.apache.spark.sql.Column
22+
import org.apache.spark.sql.functions.{floor, from_unixtime, to_timestamp, unix_timestamp}
1923
import org.joda.time.DateTime
2024
import vectorpipe.model.{AugmentedDiff, ElementWithSequence}
2125

@@ -105,5 +109,17 @@ object AugmentedDiffSource extends Logging {
105109
}
106110
}
107111

112+
def timestampToSequence(timestamp: Timestamp): Int =
113+
((timestamp.toInstant.getEpochSecond - 1347432900) / 60).toInt
114+
115+
def timestampToSequence(timestamp: Column): Column =
116+
floor((unix_timestamp(timestamp) - 1347432900) / 60)
117+
118+
def sequenceToTimestamp(sequence: Int): Timestamp =
119+
Timestamp.from(Instant.ofEpochSecond(sequence.toLong * 60 + 1347432900L))
120+
121+
def sequenceToTimestamp(sequence: Column): Column =
122+
to_timestamp(from_unixtime(sequence * 60 + 1347432900))
123+
108124
case class State(last_run: DateTime, sequence: Int)
109125
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package vectorpipe.sources
2+
3+
import org.apache.spark.sql.Row
4+
import org.scalatest.{FunSpec, Matchers}
5+
import vectorpipe.TestEnvironment
6+
7+
class AugmentedDiffSourceSpec extends FunSpec with TestEnvironment with Matchers {
8+
9+
import ss.implicits._
10+
11+
describe("Timestamp to sequence conversion") {
12+
it("should provide a round trip for simple conversion") {
13+
AugmentedDiffSource.timestampToSequence(AugmentedDiffSource.sequenceToTimestamp(3700047)) should be (3700047)
14+
}
15+
16+
it("should provide a round trip for column functions") {
17+
val df = ss.createDataset(Seq(3700047)).toDF
18+
(df.select(AugmentedDiffSource.sequenceToTimestamp('value) as 'time)
19+
.select(AugmentedDiffSource.timestampToSequence('time) as 'value)
20+
.first
21+
.getLong(0)) should be (3700047)
22+
}
23+
}
24+
25+
}

0 commit comments

Comments
 (0)