@@ -6,7 +6,7 @@ import blueeyes.persistence.mongo.json.BijectionsMongoJson.MongoToJson._
66import com .mongodb .casbah .commons .TypeImports .ObjectId
77import scalaz .{Monad , StreamT }
88import akka .dispatch .{Await , Future }
9- import blueeyes .json .{ JObject , JString , JParser , JValue }
9+ import blueeyes .json ._
1010import blueeyes .core .service .engines .HttpClientXLightWeb
1111import blueeyes .bkka .AkkaDefaults ._
1212import blueeyes .core .data .DefaultBijections ._
@@ -17,7 +17,17 @@ import java.nio.ByteBuffer
1717import akka .util .Duration
1818import blueeyes .core .http .HttpResponse
1919import blueeyes .core .service ._
20- import annotation .tailrec
20+ import com .precog .tools .importers .common ._
21+ import ConsoleUtils ._
22+ import scala .Left
23+ import scala .Some
24+ import scala .Right
25+ import scala .Left
26+ import scala .Some
27+ import scala .Right
28+ import com .mongodb
29+ import collection .JavaConversions .SeqWrapper
30+
2131
2232/**
2333 * User: gabriel
@@ -29,12 +39,50 @@ object ImportMongo {
// Implicit execution context (blueeyes' default future dispatch) and the scalaz Monad
// instance for Future that is built on top of it.
implicit val executionContext = defaultFutureDispatch
implicit val M: Monad[Future] = new FutureMonad(executionContext)

// Database, inside the source mongo instance, where the importer keeps its own configuration.
val configDb: String = "_precog_mongo_importer"
// Collection inside `configDb` that holds the descriptors of the collections to import.
val collsConfig: String = "collections_to_import"
// Maximum number of documents read when sampling a collection for its column names.
val sampleSize: Int = 100
45+
/** Parses `s` as an Int, yielding None (instead of throwing) when `s` is not a valid integer. */
def parseInt(s: String): Option[Int] =
  scala.util.control.Exception.catching(classOf[java.lang.NumberFormatException]).opt(s.toInt)
3751
/**
 * Lists the column (field) names of a mongo document, flattening nested documents into
 * dotted paths of the form "parent.child".
 *
 * Documents read through Casbah carry their nested documents as raw `DBObject`s, not as
 * `MongoDBObject` wrappers, so nested values are matched on `DBObject` (the wrapper case is
 * kept for locally built documents). Arrays (`BasicDBList`, which is itself a `DBObject`)
 * are reported as a single column rather than one column per index.
 *
 * No @tailrec, but we don't expect to get back from mongoDb a hierarchy deep enough to
 * blow the stack.
 *
 * @param bObject the document to inspect
 * @return the (possibly dotted) names of every leaf column found in `bObject`
 */
def columnsOf(bObject: MongoDBObject): Seq[String] =
  bObject.toSeq.flatMap {
    case (key, _: com.mongodb.BasicDBList) => Seq(key)
    case (key, nested: MongoDBObject)      => columnsOf(nested).map("%s.%s".format(key, _))
    // relies on the same DBObject => MongoDBObject implicit view the callers of columnsOf use
    case (key, nested: DBObject)           => columnsOf(nested).map("%s.%s".format(key, _))
    case (key, _)                          => Seq(key)
  }
59+
/**
 * Collects the distinct column names seen in the first `sampleSize` documents of the
 * collection `coll` in database `db`, as computed by `columnsOf`.
 */
def sampleColumns(db: String, coll: String)(implicit mongoConn: MongoConnection) = {
  val sampledDocs = mongoConn(db)(coll).find().take(sampleSize)
  (for {
    doc    <- sampledDocs
    column <- columnsOf(doc)
  } yield column).toSet
}
64+
/**
 * Interactively builds the import configuration: the user picks databases, then the
 * collections of each picked database, and may optionally pick a subset of sampled columns
 * for each collection.
 *
 * @param connection connection to the mongo instance being configured
 * @return one descriptor per selected collection, with "database" and "collection" keys
 *         and, when columns were selected, a "fields" key
 */
def configureCollections(connection: MongoConnection): Seq[DBObject] = {
  implicit val c = connection // picked up by sampleColumns
  println("No configuration found in the mongo instance, creating a new one.")
  val databases = selectSet("database", connection.databaseNames)
  // Collections are chosen for every selected database before any column question is asked.
  val dbColls = databases.map { db =>
    println("Database %s".format(db))
    (db, selectSet("collection", connection(db).getCollectionNames().toSeq))
  }
  dbColls.flatMap { case (db, colls) =>
    colls.map { coll =>
      // readLine yields null on end of input; treat that like the default answer "N"
      // instead of failing with a NullPointerException on toLowerCase.
      val answer = Option(readLine("Sample and select columns of %s.%s? (y/N)".format(db, coll)))
      val fields =
        if (answer.exists(_.toLowerCase == "y")) Some(selectSet("column", sampleColumns(db, coll).toSeq))
        else None
      val dbObj = MongoDBObject("database" -> db, "collection" -> coll)
      fields.map(flds => dbObj ++ ("fields" -> flds)).getOrElse(dbObj)
    }
  }
}
85+
3886 def main (args: Array [String ]){
3987
4088 if (args.length != 5 ) {
@@ -50,65 +98,76 @@ object ImportMongo {
5098 val precogHost = args(2 )
5199 val basePath = args(3 )
52100 val apiKey = args(4 )
101+ try {
102+ implicit val mongoConn = MongoConnection (mongoHost,mongoPort)
53103
54- implicit val mongoConn = MongoConnection (mongoHost,mongoPort )
104+ val inputConfigColl = mongoConn(configDb)(collsConfig )
55105
56- @ tailrec
57- def readConfigLine (acc: List [String ]): List [String ]= {
58- val line = readLine()
59- if (line != null && line != " " ){
60- if (line.startsWith(" #" )) readConfigLine(acc) // skip lines starting with #
61- else readConfigLine(line:: acc)
62- } else acc
63- }
64- println(" # enter json import descriptors, EOF or empty line to continue" )
65- println(""" # format: { "database":"<database name>", "collection":"<database name>" } or { "database":"<database name>", "collection":"<database name>", "lastId":"<last id>" } """ )
66- val jsonImputs = readConfigLine(Nil )
67- val fresults = jsonImputs.map(JParser .parseFromString(_).map(importCollection(precogHost,basePath,apiKey,_))).flatMap(_.toList)
106+ // workaround
107+ if (inputConfigColl.isEmpty) {
108+ val configs = configureCollections(mongoConn)
109+ configs.map(inputConfigColl.save(_))
110+ }
111+ val jsonImputs = inputConfigColl.find().toList
68112
69- val continueJson = Await .result(Future .sequence(fresults), Duration (" 24 hours" ))
70- println(" #################################################################" )
71- println(" # to continue ingestion from last point, use the following imput:" )
72- println(continueJson.mkString(" \n " ))
113+ val fimports = jsonImputs.flatMap(x=> MongoToJson (x).toList.map(importCollection(precogHost,basePath,apiKey,_)))
73114
74- actorSystem.shutdown( )
115+ val fresults = Await .result( Future .sequence(fimports), Duration ( " 24 hours " ) )
75116
117+ jsonImputs.zip(fresults).map( r => {
118+ val (mDbObj,(result,lastId)) = r
119+ println(" %s" .format(result))
120+ inputConfigColl.save(mDbObj++ (" lastId" -> lastId)) // JsonToMongo(continueJson).map(inputConfigColl.save(_))
121+ }
122+ )
123+ } finally {
124+ println(" Shutting down..." )
125+ actorSystem.shutdown()
126+ }
76127 }
77128
78- def importCollection (precogHost: String , basePath: String , apiKey: String , jparams : JValue ) (implicit mongoConn : MongoConnection ): Future [String ]= {
79- def strValue (jv : JValue ) = (jv --> classOf [JString ]).value
80- val dbName = strValue(jparams \ " database" )
81- val collName = strValue(jparams \ " collection" )
82- val lastId = (jparams \? " lastId" ).map(strValue(_)) getOrElse (" 000000000000000000000000" )
/** Pairs a field name with the value `getter` extracts for it, as `(name, value)`. */
def pair[T](getter: String => T)(name: String) = Tuple2(name, getter(name))
83130
/** Looks up `field` in `jo` and reads it through `strValue`. */
def getString(jo: JObject)(field: String) = {
  val fieldValue = jo \ field
  strValue(fieldValue)
}
/** Looks up `field` in `jo` and reads it through `arrOfStrValues`. */
def getArray(jo: JObject)(field: String) = {
  val fieldValue = jo \ field
  arrOfStrValues(fieldValue)
}
133+
/** Casts `jv` to a JString (via `-->`) and returns the string it holds. */
def strValue(jv: JValue) = {
  val asString = jv --> classOf[JString]
  asString.value
}
/**
 * Reads `jv` as a JArray (via the optional cast `-->?`) and returns the `strValue` of each
 * element; yields Nil when the cast produces no JArray.
 */
def arrOfStrValues(jv: JValue) =
  (jv -->? classOf[JArray]) match {
    case Some(arr) => arr.elements.map(strValue(_))
    case None      => Nil
  }
136+
/**
 * Imports one mongo collection into precog.
 *
 * Reads the documents whose `_id` is greater than the descriptor's "lastId" (or than the
 * all-zero ObjectId when there is none), converts them to JSON and sends them to the sync
 * ingest endpoint for `basePath/<database>/<collection>`.
 *
 * @param precogHost base URL of the precog host
 * @param basePath   precog path under which the data is stored
 * @param apiKey     precog API key handed to `sendToPrecog`
 * @param jparams    import descriptor with "database" and "collection", and optionally
 *                   "fields" and "lastId"
 * @return a future of (outcome message, greatest `_id` read, or the previous "lastId" when
 *         nothing was read)
 */
def importCollection(precogHost: String, basePath: String, apiKey: String, jparams: JObject)(implicit mongoConn: MongoConnection): Future[(String, String)] = {
  val dbName = getString(jparams)("database")
  val collName = getString(jparams)("collection")
  val fieldNames = getArray(jparams)("fields")
  val lastId = (jparams \? "lastId").map(strValue(_)) getOrElse ("000000000000000000000000")
  val fdsid = Future {
    val rStrm = readFromMongo(mongoConn, dbName, collName, lastId, fieldNames)
    val (oids, dataStrm) = rStrm.map(m => (m.get("_id").asInstanceOf[ObjectId], m)).unzip
    // max on an empty stream would throw, so keep the previous id when nothing new was read
    val maxOid = if (oids.isEmpty) lastId else oids.max.toStringMongod
    (dataStrm, maxOid)
  }
  val (fds, fmaxId) = (fdsid map (_._1), fdsid map (_._2))

  val fjsons = fds.map(_.flatMap(MongoToJson(_).toStream))
  // <host>/ingest/v1/sync/fs<basePath>/<database>/<collection>; the database segment must
  // not carry any extra character, otherwise the data lands under a different path.
  val fullPath = "%s/ingest/v1/sync/fs%s/%s/%s".format(precogHost, basePath, dbName, collName)
  val data = StreamT.fromStream[Future, JObject](fjsons)
  val fsend = data.isEmpty.flatMap(isEmpty =>
    if (isEmpty) Future("No new data found in %s.%s".format(dbName, collName))
    else sendToPrecog(fullPath, apiKey, data) map {
      case HttpResponse(_, _, Some(Left(buffer)), _) =>
        "Result from precog: %s".format(new String(buffer.array(), "UTF-8"))
      case result => "Error: %s".format(result.toString())
    }
  )
  M.lift2((a: String, b: String) => (a, b))(fsend, fmaxId)
}
104164
/**
 * Streams the documents of `dbName.colName` whose `_id` is greater than `oid`.
 *
 * @param mongoConn  connection to the mongo instance
 * @param dbName     database to read from
 * @param colName    collection to read from
 * @param oid        hex string of the ObjectId after which reading starts
 * @param fieldNames fields to include in each returned document; an empty list produces an
 *                   empty projection, which mongo treats as "return every field"
 * @return the matching documents as a stream
 */
def readFromMongo(mongoConn: MongoConnection, dbName: String, colName: String, oid: String, fieldNames: Seq[String]): Stream[DBObject] = {
  val mongoDB = mongoConn(dbName)
  val mongoColl = mongoDB(colName)
  val q = "_id" $gt (new ObjectId(oid))
  // Use the documented inclusion value 1 rather than relying on how the server interprets
  // an empty string. `_id` is included by default in an inclusion projection.
  val fields = MongoDBObject(fieldNames.map(_ -> 1): _*)
  mongoColl.find(q, fields).toStream // .view ?
}
113172
114173 def sendToPrecog (fullPath: String , apiKey: String , dataStream: StreamT [Future ,JValue ]): Future [HttpResponse [ByteChunk ]] = {
@@ -117,7 +176,7 @@ object ImportMongo {
117176
118177 val byteStream : StreamT [Future , ByteBuffer ] = dataStream.map(jv => ByteBuffer .wrap({
119178 val js = " %s\n " .format(jv.renderCompact)
120- print(" # %s" .format(js))
179+ print(" %s" .format(js))
121180 js
122181 }.getBytes(" UTF-8" )))
123182
0 commit comments