|
| 1 | +package com.precog.tools.importers.mongo |
| 2 | + |
| 3 | +import com.mongodb.casbah.Imports._ |
| 4 | +import blueeyes.persistence.mongo.json.BijectionsMongoJson._ |
| 5 | +import blueeyes.persistence.mongo.json.BijectionsMongoJson.MongoToJson._ |
| 6 | +import com.mongodb.casbah.commons.TypeImports.ObjectId |
| 7 | +import scalaz.{Monad, StreamT} |
| 8 | +import akka.dispatch.{Await, Future} |
| 9 | +import blueeyes.json.{JObject, JString, JParser, JValue} |
| 10 | +import blueeyes.core.service.engines.HttpClientXLightWeb |
| 11 | +import blueeyes.bkka.AkkaDefaults._ |
| 12 | +import blueeyes.core.data.DefaultBijections._ |
| 13 | +import blueeyes.bkka.AkkaDefaults.defaultFutureDispatch |
| 14 | +import blueeyes.bkka.FutureMonad |
| 15 | +import blueeyes.core.data.ByteChunk |
| 16 | +import java.nio.ByteBuffer |
| 17 | +import akka.util.Duration |
| 18 | +import blueeyes.core.http.HttpResponse |
| 19 | +import blueeyes.core.service._ |
| 20 | +import annotation.tailrec |
| 21 | + |
| 22 | +/** |
| 23 | + * User: gabriel |
| 24 | + * Date: 1/17/13 |
| 25 | + */ |
| 26 | +object ImportMongo { |
| 27 | + |
| 28 | + implicit val as=actorSystem |
| 29 | + implicit val executionContext = defaultFutureDispatch |
| 30 | + implicit val M: Monad[Future] = new FutureMonad(executionContext) |
| 31 | + |
| 32 | + def parseInt(s : String) : Option[Int] = try { |
| 33 | + Some(s.toInt) |
| 34 | + } catch { |
| 35 | + case _ : java.lang.NumberFormatException => None |
| 36 | + } |
| 37 | + |
| 38 | + def main(args:Array[String]){ |
| 39 | + |
| 40 | + if (args.length != 5) { |
| 41 | + println("Wrong number of parameters.") |
| 42 | + println("Usage: ImportMongo mongo_host mongo_port precog_host precog_ingest_path precog_apiKey") |
| 43 | + actorSystem.shutdown() |
| 44 | + sys.exit(1) |
| 45 | + } |
| 46 | + |
| 47 | + val mongoHost=args(0) |
| 48 | + val mongoPort=parseInt(args(1)).get |
| 49 | + |
| 50 | + val precogHost=args(2) |
| 51 | + val basePath=args(3) |
| 52 | + val apiKey=args(4) |
| 53 | + |
| 54 | + implicit val mongoConn= MongoConnection(mongoHost,mongoPort) |
| 55 | + |
| 56 | + @tailrec |
| 57 | + def readConfigLine(acc:List[String]):List[String]={ |
| 58 | + val line=readLine() |
| 59 | + if (line != null && line != ""){ |
| 60 | + if (line.startsWith("#")) readConfigLine(acc) //skip lines starting with # |
| 61 | + else readConfigLine(line::acc) |
| 62 | + } else acc |
| 63 | + } |
| 64 | + println("# enter json import descriptors, EOF or empty line to continue") |
| 65 | + println("""# format: { "database":"<database name>", "collection":"<database name>" } or { "database":"<database name>", "collection":"<database name>", "lastId":"<last id>" } """) |
| 66 | + val jsonImputs=readConfigLine(Nil) |
| 67 | + val fresults=jsonImputs.map(JParser.parseFromString(_).map(importCollection(precogHost,basePath,apiKey,_))).flatMap(_.toList) |
| 68 | + |
| 69 | + val continueJson=Await.result(Future.sequence(fresults), Duration("24 hours")) |
| 70 | + println("#################################################################") |
| 71 | + println("# to continue ingestion from last point, use the following imput:") |
| 72 | + println(continueJson.mkString("\n")) |
| 73 | + |
| 74 | + actorSystem.shutdown() |
| 75 | + |
| 76 | + } |
| 77 | + |
| 78 | + def importCollection(precogHost:String, basePath:String, apiKey:String, jparams: JValue) (implicit mongoConn: MongoConnection):Future[String]={ |
| 79 | + def strValue(jv: JValue) = (jv --> classOf[JString]).value |
| 80 | + val dbName = strValue(jparams \ "database") |
| 81 | + val collName = strValue(jparams \ "collection") |
| 82 | + val lastId = (jparams \? "lastId").map(strValue(_)) getOrElse ("000000000000000000000000") |
| 83 | + |
| 84 | + val fdsid = Future { |
| 85 | + readFromMongo(mongoConn, dbName, collName, lastId) |
| 86 | + } |
| 87 | + val (fds, fmaxId) = (fdsid map (_._1), fdsid map (_._2)) |
| 88 | + val fjsons = fds.map(_.flatMap(MongoToJson(_).toStream)) |
| 89 | + val fullPath = "%s/ingest/v1/sync/fs%s/%s".format(precogHost, basePath, collName) |
| 90 | + val data = StreamT.fromStream[Future, JValue](fjsons) |
| 91 | + val fresult = M.lift2((a: HttpResponse[ByteChunk], b: ObjectId) => (a, b))(sendToPrecog(fullPath, apiKey, data), fmaxId) |
| 92 | + |
| 93 | + fresult.map(r => { |
| 94 | + val (result, oid) = r |
| 95 | + result match { |
| 96 | + case HttpResponse(_, _, Some(Left(buffer)), _) => { |
| 97 | + println("### result from precog: %s".format(new String(buffer.array(), "UTF-8"))) |
| 98 | + } |
| 99 | + case _ => println("### error: %s".format(result.toString())) |
| 100 | + } |
| 101 | + """{ "database":"%s", "collection":"%s" "lastId":"%s" }""".format(dbName, collName, oid) |
| 102 | + }) |
| 103 | + } |
| 104 | + |
| 105 | + def readFromMongo(mongoConn: MongoConnection, dbName: String, colName: String, oid:String):(Stream[DBObject],ObjectId)={ |
| 106 | + val mongoDB = mongoConn(dbName) |
| 107 | + val mongoColl = mongoDB(colName) |
| 108 | + val q = "_id" $gt (new ObjectId(oid)) |
| 109 | + val rStrm=mongoColl.find(q).toStream //.view ? |
| 110 | + val (oids,dataStrm)=rStrm.map(m=>(m.get("_id").asInstanceOf[ObjectId],m)).unzip |
| 111 | + (dataStrm,oids.max) |
| 112 | + } |
| 113 | + |
| 114 | + def sendToPrecog(fullPath:String, apiKey:String, dataStream:StreamT[Future,JValue]): Future[HttpResponse[ByteChunk]] = { |
| 115 | + |
| 116 | + val httpClient = new HttpClientXLightWeb()(defaultFutureDispatch) |
| 117 | + |
| 118 | + val byteStream: StreamT[Future, ByteBuffer] = dataStream.map(jv => ByteBuffer.wrap({ |
| 119 | + val js = "%s\n".format(jv.renderCompact) |
| 120 | + print("# %s".format(js)) |
| 121 | + js |
| 122 | + }.getBytes("UTF-8"))) |
| 123 | + |
| 124 | + //get the last/biggest id |
| 125 | + val byteChunks: ByteChunk = Right(byteStream) |
| 126 | + httpClient.parameters('apiKey -> apiKey).post(fullPath)(byteChunks) |
| 127 | + } |
| 128 | +} |
0 commit comments