@@ -8,6 +8,17 @@ import blueeyes.bkka.AkkaDefaults.defaultFutureDispatch
88import scala .Some
99import blueeyes .core .service .engines .HttpClientXLightWeb
1010import Datatypes ._
11+ import blueeyes .bkka .FutureMonad
12+ import scalaz .{Hoist , StreamT , ~> }
13+ import akka .dispatch .{Await , ExecutionContext , Future }
14+ import java .nio .ByteBuffer
15+ import scalaz .Id ._
16+ import annotation .tailrec
17+ import akka .util .Duration
18+ import blueeyes .core .http .HttpResponse
19+ import blueeyes .core .data .ByteChunk
20+ import akka .dispatch .Future
21+
1122
1223/**
1324 * User: gabriel
@@ -17,6 +28,8 @@ object ImportJdbc {
1728
1829 import DbAccess ._
1930
/** HTTP client shared by every `ingest` call; created once, on the default future dispatcher. */
val httpClient: HttpClientXLightWeb = new HttpClientXLightWeb()(defaultFutureDispatch)
32+
/**
 * One table taking part in an import.
 *
 * @param name       table name
 * @param columns    names of the columns read for this table
 * @param baseOrJoin `Left` for a plain table, `Right` for a join
 */
case class ImportTable(name: String, columns: Seq[String], baseOrJoin: Either[Table, Join]) {
  /** `false` for a `Left`; for a `Right` it is the join's `exported` flag. */
  val isCollection = baseOrJoin.fold(_ => false, _.exported)
}
/** The tables that make up one ingest, in the order their columns appear in each result row. */
case class IngestInfo(tables: Seq[ImportTable])
2235
@@ -34,53 +47,63 @@ object ImportJdbc {
3447
/**
 * Lists every column of every table as a `table.column` string, in table order and then column order.
 *
 * NOTE(review): the format literal is reproduced exactly as found, including its leading space —
 * confirm that space is intended.
 */
def buildSort(ingestInfo: IngestInfo) =
  for {
    table  <- ingestInfo.tables
    column <- table.columns
  } yield " %s.%s".format(table.name, column)
3649
/**
 * Returns the elements of `o` when it holds a `JArray`.
 * An absent value, or any other kind of JSON value, yields `Nil`.
 */
def getElements(o: Option[JValue]): List[JValue] =
  o.collect { case array: JArray => array.elements }.getOrElse(Nil)
54+
/**
 * Narrows `o` to a `JObject`.
 * Any other value aborts with a runtime error raised through `sys.error`.
 */
def toJObject(o: JValue): JObject =
  o match {
    case obj: JObject => obj
    case notAnObject  => sys.error(" base value is not jobject!")
  }
3759
38- def mkPartialJson (baseName : String , ingestInfo : IngestInfo , s : Seq [ String ], prevMap : Map [ String , JValue ] = Map ()) = {
/**
 * Turns a (column name, column value) pair into a string-valued `JField`.
 * A `null` value produces `None`, so the field is left out.
 */
def buildField(nm: (String, String)) =
  for (value <- Option(nm._2)) yield JField(nm._1, JString(value))
3961
40- def getElements (o: Option [JValue ]): List [JValue ]= o match {
41- case Some (l: JArray ) => l.elements
42- case _ => Nil
43- }
44- def toJObject (o: JValue ): JObject = o match {
45- case j: JObject => j
46- case _ => sys.error(" base value is not jobject!" )
47- }
/** JSON values keyed by name; the builders in this object key it by upper-cased table name. */
type StrJVMap = Map[String, JValue]
4863
/**
 * Builds the JSON value contributed by one table from the front of a result row.
 *
 * The first `tblDesc.columns.length` entries of `s` are paired with the column names and turned
 * into fields through `buildField`. The result is keyed by the upper-cased table name:
 *  - no fields, collection table: an empty array
 *  - no fields, plain table: nothing
 *  - fields, collection table: the elements already stored in `map` under that key, plus the new object
 *  - fields, plain table: the new object
 *
 * @param map     values accumulated so far, consulted only for a collection's existing elements
 * @param s       the not-yet-consumed values of the current row
 * @param tblDesc the table whose columns come next in `s`
 * @return the optional (key, value) pair and the values of `s` left for the following tables
 */
def buildJValues(map: StrJVMap, s: Seq[String], tblDesc: ImportTable): (Option[(String, JValue)], Seq[String]) = {
  val (ownValues, remaining) = s.splitAt(tblDesc.columns.length)
  val fields = tblDesc.columns.zip(ownValues).flatMap(pair => buildField(pair)).toList
  val key = tblDesc.name.toUpperCase

  val entry: Option[(String, JValue)] =
    (fields.isEmpty, tblDesc.isCollection) match {
      case (true, true)   => Some((key, JArray.empty))
      case (true, false)  => None
      case (false, true)  => Some((key, JArray(getElements(map.get(key)) :+ JObject(fields))))
      case (false, false) => Some((key, JObject(fields)))
    }

  (entry, remaining)
}
77+
/**
 * Pulls one complete JSON object off the front of `dataStream`.
 *
 * Rows are folded into `prevMap` one at a time. After each row the following row is built on its
 * own and compared on the base-table entry: while that entry is equal, the next row is treated as
 * part of the same object and folding continues. Otherwise the base object's fields are merged
 * with every other entry of the map and returned together with the unread rest of the stream.
 *
 * The stream is inspected in a fixed order (`isEmpty`, `head`, `tail`, then `isEmpty`/`head` of
 * the tail); that order is kept deliberately.
 *
 * @param baseName   name of the base table; compared upper-cased
 * @param ingestInfo tables describing the layout of each row
 * @param dataStream rows still to be read
 * @param prevMap    values gathered from earlier rows of the same object
 * @return `None` when the stream is empty, otherwise the object and the remaining rows
 */
@tailrec
def mkPartialJson(baseName: String, ingestInfo: IngestInfo, dataStream: StreamT[Id, Seq[String]], prevMap: StrJVMap = Map()): Option[(JValue, StreamT[Id, Seq[String]])] =
  if (dataStream.isEmpty) {
    None
  } else {
    val row = dataStream.head
    val remaining = dataStream.tail
    val current = buildJsonObjMap(ingestInfo, prevMap, row)
    val baseKey = baseName.toUpperCase
    // Look one row ahead, built from scratch, to see whether it belongs to the same base object.
    val upcoming: StrJVMap =
      if (remaining.isEmpty) Map() else buildJsonObjMap(ingestInfo, Map(), remaining.head)
    val sameObjectContinues = !upcoming.isEmpty && current.get(baseKey) == upcoming.get(baseKey)

    if (sameObjectContinues) {
      mkPartialJson(baseKey, ingestInfo, remaining, current)
    } else {
      val base = toJObject(current(baseKey))
      val extraFields = (current - baseKey).map { case (name, value) => JField(name, value) }.toList
      Some((JObject(base.fields ++ extraFields), remaining))
    }
  }
6297
63- def buildField ( nm : (String ,String )) = Option (nm._2).map( s=> JField (nm._1,JString (s)))
64-
65- val jsonMap : Map [String ,JValue ]= ingestInfo.tables.foldLeft( (prevMap,s) )(
66- (ms,v) => {
67- val (opt,r)= buildJValues(ms,v)
68- val (m,_)= ms
69- opt.map( (kobj)=> {
70- val (k,obj) = kobj
71- if (k!= baseName)
72- (m+ (kobj),r)
73- else if (prevMap.isEmpty || prevMap(k)!= obj)
74- (Map (kobj),r)
75- else (m,r)
76- }).getOrElse((m,r))
77- } )._1
78-
79- val base : JObject = toJObject(jsonMap(baseName))
80- val values : List [JField ] = (jsonMap- baseName).map(nv => JField (nv._1, nv._2)).toList
81- (JObject (base.fields ++ values),jsonMap)
82- }
8398
/**
 * Folds one result row into `prevMap`.
 *
 * The tables of `ingestInfo` are visited in order. Each one consumes its share of the row through
 * `buildJValues`, and any (key, value) pair it produces is added to the map, replacing an existing
 * entry with the same key.
 *
 * @param ingestInfo tables describing the layout of the row
 * @param prevMap    map to start from
 * @param s          the row's values
 * @return the map after every table has been applied
 */
def buildJsonObjMap(ingestInfo: ImportJdbc.IngestInfo, prevMap: ImportJdbc.StrJVMap, s: Seq[String]): StrJVMap = {
  val (objects, _) = ingestInfo.tables.foldLeft((prevMap, s)) {
    case ((acc, unread), table) =>
      buildJValues(acc, unread, table) match {
        case (Some(entry), rest) => (acc + entry, rest)
        case (None, rest)        => (acc, rest)
      }
  }
  objects
}
84107
/** Projects each column to its `name`, keeping the order. */
def names(cs: Seq[Column]) =
  for (column <- cs) yield column.name
86109
@@ -92,33 +115,39 @@ object ImportJdbc {
92115 " select %s from %s order by %s" .format(colSelect,join,sort)
93116 }
94117
/**
 * Runs `query` on `connDb` and exposes the result as a stream of rows.
 *
 * Each row is the string value of every column, read by 1-based index with `getString`.
 * Neither the statement nor the result set is closed in this method.
 *
 * @return the row stream and the columns reported by `getColumns` for the prepared statement
 */
def executeQuery(connDb: Connection, query: String): (StreamT[Id, IndexedSeq[String]], IndexedSeq[Column]) = {
  val statement = connDb.prepareStatement(query)
  val columns = getColumns(statement)
  val resultSet = statement.executeQuery()
  val rows: StreamT[Id, IndexedSeq[String]] =
    rsStreamT(resultSet)(row => (1 to columns.size).map(index => row.getString(index)))
  (rows, columns)
}
101124
/**
 * Opens a JDBC connection.
 *
 * When `database` is given and `dbUrl` does not already end with it, the name is appended
 * directly to `dbUrl`. No separator is inserted, so `dbUrl` must already end where the name
 * belongs. The URL is built by plain concatenation so that no stray whitespace is put in
 * front of it.
 *
 * @param dbUrl    JDBC URL, with or without the database name
 * @param user     login user
 * @param password login password
 * @param database optional database name to append to `dbUrl`
 * @return the connection returned by `DriverManager.getConnection`
 */
def getConnection(dbUrl: String, user: String, password: String, database: Option[String]): Connection = {
  val uri = database match {
    case Some(dbName) if !dbUrl.endsWith(dbName) => dbUrl + dbName
    case _                                       => dbUrl
  }
  DriverManager.getConnection(uri, user, password)
}
105129
/**
 * Runs `query`, turns the rows into JSON objects and posts them to the ingest service as a
 * streamed request body.
 *
 * When `oTblDesc` is empty, a single table named `objName` covering every result column is used.
 * Each object is rendered compactly, echoed to standard output and encoded as UTF-8.
 *
 * @param connDb     connection the query runs on
 * @param objName    base table name; also the last segment of the ingest path
 * @param query      SQL to execute
 * @param oTblDesc   optional description of how the columns map to tables
 * @param ingestPath path below `fs` on the ingest service
 * @param host       base address of the ingest service
 * @param apiKey     value sent as the `apiKey` request parameter
 * @return the future response of the POST
 */
def ingest(connDb: Connection, objName: String, query: String, oTblDesc: Option[IngestInfo], ingestPath: => String, host: => String, apiKey: => String)(implicit executor: ExecutionContext): Future[HttpResponse[ByteChunk]] = {
  implicit val M = new FutureMonad(executor)
  val (data, columns) = executeQuery(connDb, query)
  // NOTE(review): this literal, the record format and the log message below are reproduced
  // exactly as found, including their leading spaces — confirm they are intended.
  val tblDesc = oTblDesc.getOrElse(IngestInfo(Seq(ImportTable(objName, names(columns), Left(Table(" base"))))))

  // The charset name must be exactly "UTF-8": a padded name is rejected by getBytes.
  val dataStream: StreamT[Future, ByteBuffer] = buildBody(data, objName, tblDesc)
    .map(jv => ByteBuffer.wrap({ val js = " %s\n ".format(jv.renderCompact); print(js); js }.getBytes("UTF-8")))

  val body: ByteChunk = Right(dataStream)
  // No leading whitespace here, so the formatted string starts with `host`.
  val fullPath = "%s/ingest/v1/sync/fs%s/%s".format(host, ingestPath, objName)
  // TODO add owner account id
  println(" sending to ingest: path=%s query=%s".format(fullPath, query))
  httpClient.parameters('apiKey -> apiKey).post(fullPath)(body)
}
115144
/**
 * Re-chunks a stream of result rows into a stream of JSON objects.
 *
 * Each step checks whether rows remain and, if so, lets `mkPartialJson` take as many rows as
 * belong to the next object. The stream ends once no rows are left.
 *
 * @param data      rows to convert
 * @param baseTable name of the base table
 * @param i         tables describing the layout of each row
 * @return one JSON value per assembled object, each step wrapped in a `Future`
 */
def buildBody(data: StreamT[Id, Seq[String]], baseTable: String, i: IngestInfo)(implicit executor: ExecutionContext, m: FutureMonad): StreamT[Future, JValue] =
  StreamT.unfoldM[Future, JValue, StreamT[Id, Seq[String]]](data) { remaining =>
    if (!remaining.isEmpty) Future(mkPartialJson(baseTable, i, remaining))
    else Future(None)
  }
149+
150+
122151}
123152
124153
0 commit comments