-
Notifications
You must be signed in to change notification settings - Fork 20
Expand file tree
/
Copy pathReplicationStreamBatchReader.scala
More file actions
46 lines (34 loc) · 1.46 KB
/
ReplicationStreamBatchReader.scala
File metadata and controls
46 lines (34 loc) · 1.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package vectorpipe.sources
import java.net.URI
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
import scala.collection.parallel.ForkJoinTaskSupport
import java.util.concurrent.ForkJoinPool
import scala.reflect.runtime.universe.TypeTag
/**
 * Base [[InputPartitionReader]] that eagerly loads every element of a set of
 * replication sequences on the first call to [[next]] and then serves them one
 * at a time as Spark [[InternalRow]]s.
 *
 * Subclasses supply [[getSequence]], which fetches the elements belonging to a
 * single sequence number.
 *
 * @param baseURI   base location handed unchanged to [[getSequence]]
 * @param sequences sequence numbers this partition is responsible for
 * @tparam T element type; an [[ExpressionEncoder]] is derived for it via its `TypeTag`
 */
abstract class ReplicationStreamBatchReader[T <: Product: TypeTag](baseURI: URI,
                                                                   sequences: Seq[Int])
    extends InputPartitionReader[InternalRow]
    with Logging {
  // NOTE(review): presumably registers the JTS geometry types with Spark SQL so that
  // encoders for T can resolve them — confirm against org.apache.spark.sql.jts.
  org.apache.spark.sql.jts.registerTypes()

  // Not referenced anywhere in this class; being lazy, it is never evaluated here.
  private lazy val rowEncoder = RowEncoder(encoder.schema).resolveAndBind()

  /** Position of the current element in `items`; -1 until the first call to [[next]]. */
  protected var index: Int = -1

  /** Elements loaded from all sequences; `null` until the first call to [[next]]. */
  protected var items: Vector[T] = _

  /** Parallelism of the fork-join pool used to fetch sequences. */
  val Concurrency: Int = 8

  private lazy val encoder = ExpressionEncoder[T]

  /**
   * Advances to the next element, loading all sequences on the first call.
   *
   * @return `true` if an element is available through [[get]], `false` once exhausted
   */
  override def next(): Boolean = {
    index += 1

    if (Option(items).isEmpty) {
      val parSequences = sequences.par
      val taskSupport = new ForkJoinTaskSupport(new ForkJoinPool(Concurrency))
      parSequences.tasksupport = taskSupport

      // Always release the pool's worker threads, even when a fetch fails;
      // otherwise every failed load would leak a ForkJoinPool.
      try {
        items = parSequences.flatMap(seq => getSequence(baseURI, seq)).toVector
      } finally {
        taskSupport.environment.shutdown()
      }
    }

    index < items.length
  }

  /** Encodes the element at the current position as an [[InternalRow]]. */
  override def get(): InternalRow = encoder.toRow(items(index))

  /** No resources are held between calls, so there is nothing to release. */
  override def close(): Unit = ()

  /**
   * Fetches the elements contained in a single sequence.
   *
   * @param baseURI  base location passed to this reader's constructor
   * @param sequence sequence number to fetch
   * @return the elements found in that sequence
   */
  protected def getSequence(baseURI: URI, sequence: Int): Seq[T]
}