Skip to content

Commit 79f96aa

Browse files
authored
Merge pull request #109 from jpolchlo/optimization/checkpoint-and-persist
Minor fixes and optimizations
2 parents cbeb773 + d4c5d36 commit 79f96aa

11 files changed

Lines changed: 127 additions & 19 deletions

File tree

README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,26 @@ calculation from geometric types. See
129129
[here](https://www.geomesa.org/documentation/user/spark/sparksql_functions.html)
130130
for a list of functions that operate on geometries.
131131

132+
#### A Note on Geocoding ####
133+
134+
VectorPipe provides the means to tag geometries with the country codes of the
135+
countries they interact with, but it does not provide the boundaries used to
136+
do the coding. That gives the user the option to select geometries
137+
appropriate to the task at hand—low resolution geometries for less fussy
138+
applications, high resolution when precision is important.
139+
140+
In order for an application to make use of `vectorpipe.util.Geocode`, it must
141+
supply a `countries.geojson` in in the root of its project's `resources`
142+
directory. That GeoJSON file must contain a `FeatureCollection`, with each
143+
entry having an `ADM0_A3` entry in its `properties` list.
144+
145+
One may employ the [Natural Earth Admin
146+
0](https://www.naturalearthdata.com/downloads/10m-cultural-vectors/10m-admin-0-boundary-lines/)
147+
resource for low-precision tasks, or use something like the [Global LSIB
148+
Polygons](http://geonode.state.gov/layers/geonode%3AGlobal_LSIB_Polygons_Detailed)
149+
for more precise tasks (though the latter resource does not tag its elements
150+
with the `ADM0_A3` three-letter codes, so some preprocessing would be required).
151+
132152
## The `internal` package ##
133153

134154
While most users will rely solely on the features exposed by the `OSM` object,

project/Version.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
object Version {
22
val awscala = "0.8.1"
3-
val vectorpipe = "1.1.0"
3+
val vectorpipe = "1.1.0-SNAPSHOT"
44
val scala = "2.11.12"
55
val geotrellis = "2.2.0"
66
val geomesa = "2.2.1"

src/main/scala/vectorpipe/VectorPipe.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import org.apache.spark.sql._
1313
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
1414
import org.apache.spark.sql.functions._
1515
import org.apache.spark.sql.types.StringType
16+
import org.apache.spark.storage.StorageLevel
1617
import org.locationtech.jts.{geom => jts}
1718

1819
object VectorPipe {
@@ -25,6 +26,8 @@ object VectorPipe {
2526
* @param srcCRS CRS of the original geometry
2627
* @param destCRS (optional) The CRS to produce vectortiles into. When omitted,
2728
* defaults to [[WebMercator]].
29+
* @param useCaching Allows intermediate results to be cached to disk. May require
30+
* additional disk space on executor nodes.
2831
* @param orderAreas Sorts polygonal geometries in vectortiles. In case of overlaps,
2932
* smaller geometries will draw on top of larger ones.
3033
* @param tileResolution Resolution of output tiles; i.e., the number of discretized bins
@@ -36,6 +39,7 @@ object VectorPipe {
3639
minZoom: Option[Int],
3740
srcCRS: CRS,
3841
destCRS: Option[CRS],
42+
useCaching: Boolean = false,
3943
orderAreas: Boolean = false,
4044
tileResolution: Int = 4096
4145
)
@@ -141,8 +145,8 @@ object VectorPipe {
141145
val simplify = udf { g: jts.Geometry => pipeline.simplify(g, level.layout) }
142146
val reduced = pipeline
143147
.reduce(working, level, keyColumn)
144-
.localCheckpoint()
145-
val prepared = reduced
148+
val persisted = if (options.useCaching) reduced.persist(StorageLevel.DISK_ONLY) else reduced
149+
val prepared = persisted
146150
.withColumn(geomColumn, simplify(col(geomColumn)))
147151
val vts = generateVectorTiles(prepared, level)
148152
saveVectorTiles(vts, zoom, pipeline.baseOutputURI)

src/main/scala/vectorpipe/functions/osm/package.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,8 @@ package object osm {
277277
when(comment.isNotNull and length(comment) > 0, extractHashtags(comment))
278278
.otherwise(typedLit(Seq.empty[String])) as 'hashtags
279279

280+
def isTagged(tags: Column): Column = size(map_keys(tags)) > 0 as 'isTagged
281+
280282
def isBuilding(tags: Column): Column =
281283
!lower(coalesce(tags.getItem("building"), lit("no"))).isin(FalsyValues: _*) as 'isBuilding
282284

src/main/scala/vectorpipe/sources/AugmentedDiffSource.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package vectorpipe.sources
33
import java.io.{ByteArrayInputStream, File}
44
import java.net.URI
55
import java.nio.charset.StandardCharsets
6+
import java.sql.Timestamp
7+
import java.time.Instant
68
import java.util.zip.GZIPInputStream
79

810
import cats.implicits._
@@ -16,6 +18,8 @@ import io.circe.generic.auto._
1618
import io.circe.{yaml, _}
1719
import org.apache.commons.io.IOUtils
1820
import org.apache.spark.internal.Logging
21+
import org.apache.spark.sql.Column
22+
import org.apache.spark.sql.functions.{floor, from_unixtime, to_timestamp, unix_timestamp}
1923
import org.joda.time.DateTime
2024
import vectorpipe.model.{AugmentedDiff, ElementWithSequence}
2125

@@ -105,5 +109,17 @@ object AugmentedDiffSource extends Logging {
105109
}
106110
}
107111

112+
def timestampToSequence(timestamp: Timestamp): Int =
113+
((timestamp.toInstant.getEpochSecond - 1347432900) / 60).toInt
114+
115+
def timestampToSequence(timestamp: Column): Column =
116+
floor((unix_timestamp(timestamp) - 1347432900) / 60)
117+
118+
def sequenceToTimestamp(sequence: Int): Timestamp =
119+
Timestamp.from(Instant.ofEpochSecond(sequence.toLong * 60 + 1347432900L))
120+
121+
def sequenceToTimestamp(sequence: Column): Column =
122+
to_timestamp(from_unixtime(sequence * 60 + 1347432900))
123+
108124
case class State(last_run: DateTime, sequence: Int)
109125
}

src/main/scala/vectorpipe/sources/ChangesetMicroBatchReader.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class ChangesetStreamBatchReader(baseURI: URI, sequences: Seq[Int])
2020
extends ReplicationStreamBatchReader[Changeset](baseURI, sequences) {
2121

2222
override def getSequence(baseURI: URI, sequence: Int): Seq[Changeset] =
23-
ChangesetSource.getSequence(baseURI, sequence)
23+
ChangesetSource.getChangeset(baseURI, sequence)
2424
}
2525

2626
class ChangesetMicroBatchReader(options: DataSourceOptions, checkpointLocation: String)
@@ -32,7 +32,7 @@ class ChangesetMicroBatchReader(options: DataSourceOptions, checkpointLocation:
3232
)
3333

3434
override def getCurrentSequence: Option[Int] =
35-
ChangesetSource.getCurrentSequence(baseURI)
35+
ChangesetSource.getCurrentSequence(baseURI).map(_.sequence.toInt)
3636

3737
override def createDataReaderFactories(): util.List[DataReaderFactory[Row]] =
3838
sequenceRange

src/main/scala/vectorpipe/sources/ChangesetReader.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ case class ChangesetReader(options: DataSourceOptions)
2828
}
2929

3030
override protected def getCurrentSequence: Option[Int] =
31-
ChangesetSource.getCurrentSequence(baseURI)
31+
ChangesetSource.getCurrentSequence(baseURI).map(_.sequence.toInt)
3232

3333
private def baseURI =
3434
new URI(

src/main/scala/vectorpipe/sources/ChangesetSource.scala

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package vectorpipe.sources
33
import java.io.{ByteArrayInputStream, IOException}
44
import java.net.URI
55
import java.nio.charset.StandardCharsets
6+
import java.time.Instant
67
import java.util.zip.GZIPInputStream
78

89
import cats.implicits._
@@ -27,7 +28,7 @@ object ChangesetSource extends Logging {
2728
private implicit val dateTimeDecoder: Decoder[DateTime] =
2829
Decoder.instance(a => a.as[String].map(DateTime.parse(_, formatter)))
2930

30-
def getSequence(baseURI: URI, sequence: Int): Seq[Changeset] = {
31+
def getChangeset(baseURI: URI, sequence: Int): Seq[Changeset] = {
3132
val s = f"$sequence%09d"
3233
val path = s"${s.slice(0, 3)}/${s.slice(3, 6)}/${s.slice(6, 9)}.osm.gz"
3334

@@ -40,7 +41,7 @@ object ChangesetSource extends Logging {
4041
if (response.code == 404) {
4142
logDebug(s"$sequence is not yet available, sleeping.")
4243
Thread.sleep(Delay.toMillis)
43-
getSequence(baseURI, sequence)
44+
getChangeset(baseURI, sequence)
4445
} else {
4546
// NOTE: if diff bodies get really large, switch to a SAX parser to help with the memory footprint
4647
val bais = new ByteArrayInputStream(response.body)
@@ -62,25 +63,27 @@ object ChangesetSource extends Logging {
6263
case e: IOException =>
6364
logWarning(s"Error fetching changeset $sequence", e)
6465
Thread.sleep(Delay.toMillis)
65-
getSequence(baseURI, sequence)
66+
getChangeset(baseURI, sequence)
6667
}
6768
}
6869

70+
case class Sequence(last_run: DateTime, sequence: Long)
71+
6972
@memoize(maxSize = 1, expiresAfter = 30 seconds)
70-
def getCurrentSequence(baseURI: URI): Option[Int] = {
73+
def getCurrentSequence(baseURI: URI): Option[Sequence] = {
7174
try {
7275
val response =
7376
Http(baseURI.resolve("state.yaml").toString).asString
7477

7578
val state = yaml.parser
7679
.parse(response.body)
7780
.leftMap(err => err: Error)
78-
.flatMap(_.as[State])
81+
.flatMap(_.as[Sequence])
7982
.valueOr(throw _)
8083

8184
logDebug(s"$baseURI state: ${state.sequence} @ ${state.last_run}")
8285

83-
Some(state.sequence)
86+
Some(state)
8487
} catch {
8588
case err: Throwable =>
8689
logError("Error fetching / parsing changeset state.", err)
@@ -89,5 +92,42 @@ object ChangesetSource extends Logging {
8992
}
9093
}
9194

92-
case class State(last_run: DateTime, sequence: Int)
95+
def getSequence(baseURI: URI, sequence: Long): Option[Sequence] = {
96+
val s = f"${sequence+1}%09d"
97+
val path = s"${s.slice(0, 3)}/${s.slice(3, 6)}/${s.slice(6, 9)}.state.txt"
98+
99+
try {
100+
val response =
101+
Http(baseURI.resolve(path).toString).asString
102+
103+
val state = yaml.parser
104+
.parse(response.body)
105+
.leftMap(err => err: Error)
106+
.flatMap(_.as[Sequence])
107+
.valueOr(throw _)
108+
109+
Some(state)
110+
} catch {
111+
case err: Throwable =>
112+
logError("Error fetching / parsing changeset state.", err)
113+
114+
None
115+
}
116+
}
117+
118+
def estimateSequenceNumber(modifiedTime: Instant, baseURI: URI): Long = {
119+
val current = getCurrentSequence(baseURI)
120+
val diffMinutes = (current.get.last_run.toInstant.getMillis/1000 - modifiedTime.getEpochSecond) / 60
121+
current.get.sequence - diffMinutes
122+
}
123+
124+
def findSequenceFor(modifiedTime: Instant, baseURI: URI): Long = {
125+
var guess = estimateSequenceNumber(modifiedTime, baseURI)
126+
val target = org.joda.time.Instant.parse(modifiedTime.toString)
127+
128+
while (getSequence(baseURI, guess).get.last_run.isAfter(target)) { guess -= 1 }
129+
while (getSequence(baseURI, guess).get.last_run.isBefore(target)) { guess += 1 }
130+
131+
getSequence(baseURI, guess).get.sequence
132+
}
93133
}

src/main/scala/vectorpipe/vectortile/Pipeline.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,8 @@ package vectorpipe.vectortile
33
import geotrellis.spark.SpatialKey
44
import geotrellis.spark.tiling._
55
import geotrellis.vector.{Feature, Geometry}
6-
import geotrellis.vectortile.Value
76
import org.apache.spark.sql.{DataFrame, Row}
87
import org.locationtech.jts.{geom => jts}
9-
import org.locationtech.jts.simplify
108

119
/**
1210
* The interface governing the transformation from processed OSM dataframes to

src/main/scala/vectorpipe/vectortile/package.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import geotrellis.proj4._
44
import geotrellis.spark.SpatialKey
55
import geotrellis.spark.tiling.LayoutDefinition
66
import geotrellis.vector._
7-
import geotrellis.vector.reproject._
87
import geotrellis.vectortile._
98
import org.apache.spark.sql._
109
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
@@ -27,15 +26,19 @@ package object vectortile {
2726
@transient lazy val st_reprojectGeom = udf { (g: jts.Geometry, srcProj: String, destProj: String) =>
2827
val trans = Proj4Transform(CRS.fromString(srcProj), CRS.fromString(destProj))
2928
if (Option(g).isDefined) {
30-
val gt = Geometry(g)
31-
gt.reproject(trans).jtsGeom
29+
if (g.isEmpty)
30+
g
31+
else {
32+
val gt = Geometry(g)
33+
gt.reproject(trans).jtsGeom
34+
}
3235
} else {
3336
null
3437
}
3538
}
3639

3740
def keyTo(layout: LayoutDefinition) = udf { g: jts.Geometry =>
38-
if (Option(g).isDefined) {
41+
if (Option(g).isDefined && !g.isEmpty) {
3942
layout.mapTransform.keysForGeometry(geotrellis.vector.Geometry(g)).toArray
4043
} else {
4144
Array.empty[SpatialKey]

0 commit comments

Comments
 (0)