diff --git a/datasources/pom-template.xml b/datasources/pom-template.xml index 0d460ac37..1ba678f5f 100644 --- a/datasources/pom-template.xml +++ b/datasources/pom-template.xml @@ -46,6 +46,12 @@ 2.5 provided + + + dev.zio + zio-cache_${scala-binary-version} + 0.1.5 + diff --git a/datasources/src/main/scala/bootstrap/rudder/plugin/DataSourcesConf.scala b/datasources/src/main/scala/bootstrap/rudder/plugin/DataSourcesConf.scala index 500b705e6..09413a134 100644 --- a/datasources/src/main/scala/bootstrap/rudder/plugin/DataSourcesConf.scala +++ b/datasources/src/main/scala/bootstrap/rudder/plugin/DataSourcesConf.scala @@ -53,6 +53,7 @@ import com.normation.zio._ import com.normation.zio.ZioRuntime import net.liftweb.common.Box import org.joda.time.DateTime +import zio.duration._ /* * An update hook which triggers a configuration generation if needed @@ -88,6 +89,7 @@ object DatasourcesConf extends RudderPluginModule { Cfg.interpolationCompiler, regenerationHook.hook _, () => Cfg.configService.rudder_global_policy_mode(), + Some(CacheParameters(Int.MaxValue, 2.minutes)), ZioRuntime.environment ), Cfg.stringUuidGenerator, diff --git a/datasources/src/main/scala/com/normation/plugins/datasources/QueryService.scala b/datasources/src/main/scala/com/normation/plugins/datasources/QueryService.scala index ae26bf9c3..cf864e694 100644 --- a/datasources/src/main/scala/com/normation/plugins/datasources/QueryService.scala +++ b/datasources/src/main/scala/com/normation/plugins/datasources/QueryService.scala @@ -85,6 +85,9 @@ trait QueryDataSourceService { * A version that only query one node - do not use if you want to query several nodes */ def queryOne(datasource: DataSource, nodeId: NodeId, cause: UpdateCause): IOResult[NodeUpdateResult] + + // perhaps should be a different service + def resetCache: UIO[Unit] } /** @@ -98,16 +101,23 @@ class HttpQueryDataSourceService( interpolCompiler: InterpolatedValueCompiler, onUpdatedHook: (Set[NodeId], UpdateCause) => IOResult[Unit], globalPolicyMode: () => IOResult[GlobalPolicyMode], + cacheParam: Option[CacheParameters], clock: Clock ) extends QueryDataSourceService { - val getHttp = new GetDataset(interpolCompiler) + val getHttp = new GetDataset(interpolCompiler, new QueryHttpServiceImpl(cacheParam)) + + /* * We need a scheduler tailored for I/O, we are mostly doing http requests and * database things here */ + override def resetCache: UIO[Unit] = { + getHttp.resetCache + } + override def queryAll(datasource: DataSource, cause: UpdateCause): IOResult[Set[NodeUpdateResult]] = { query[Set[NodeUpdateResult]]( "fetch data for all node", diff --git a/datasources/src/main/scala/com/normation/plugins/datasources/Repository.scala b/datasources/src/main/scala/com/normation/plugins/datasources/Repository.scala index df6b0826a..79bc0bbcc 100644 --- a/datasources/src/main/scala/com/normation/plugins/datasources/Repository.scala +++ b/datasources/src/main/scala/com/normation/plugins/datasources/Repository.scala @@ -351,6 +351,7 @@ class DataSourceRepoImpl( override def onUserAskUpdateAllNodes(actor: EventActor): IOResult[Unit] = { DataSourceLoggerPure.info(s"Fetching data from data sources for all node because ${actor.name} asked for it") *> + fetch.resetCache *> fetchAllNode(actor, None) } @@ -358,6 +359,7 @@ class DataSourceRepoImpl( DataSourceLoggerPure.info( s"Fetching data from data source '${datasourceId.value}' for all node because ${actor.name} asked for it" ) *> + fetch.resetCache *> fetchAllNode(actor, Some(datasourceId)) } @@ -375,6 +377,7 @@ class DataSourceRepoImpl( DataSourceLoggerPure.info( s"Fetching data from data source for node '${nodeId.value}' because '${actor.name}' asked for it" ) *> + fetch.resetCache *> fetchOneNode(actor, nodeId, None) } @@ -382,6 +385,7 @@ class DataSourceRepoImpl( DataSourceLoggerPure.info( s"Fetching data from data source for node '${nodeId.value}' because '${actor.name}' asked for it" ) *> + fetch.resetCache *> fetchOneNode(actor, nodeId, Some(datasourceId)) } diff --git a/datasources/src/main/scala/com/normation/plugins/datasources/Scheduler.scala b/datasources/src/main/scala/com/normation/plugins/datasources/Scheduler.scala index fca8c5cc1..841a77fb0 100644 --- a/datasources/src/main/scala/com/normation/plugins/datasources/Scheduler.scala +++ b/datasources/src/main/scala/com/normation/plugins/datasources/Scheduler.scala @@ -38,7 +38,6 @@ package com.normation.plugins.datasources import com.normation.errors.IOResult -import com.normation.errors.effectUioUnit import com.normation.eventlog.EventActor import com.normation.eventlog.ModificationId import com.normation.plugins.PluginStatus @@ -108,7 +107,7 @@ class DataSourceScheduler( never.succeed } - val msg = s"Automatically fetching data for data source '${datasource.name.value}' (${datasource.id.value}): ${schedule}" + val msg = s"Automatically fetching data for data source '${datasource.name.value}' (${datasource.id.value})" // The full action with logging. We don't want it to be able to fail, because it would stop // future update. So we catch all error and log them (in debug because they are (should) already log in error, we diff --git a/datasources/src/main/scala/com/normation/plugins/datasources/UpdateHttpDataset.scala b/datasources/src/main/scala/com/normation/plugins/datasources/UpdateHttpDataset.scala index e576249a5..41567a296 100644 --- a/datasources/src/main/scala/com/normation/plugins/datasources/UpdateHttpDataset.scala +++ b/datasources/src/main/scala/com/normation/plugins/datasources/UpdateHttpDataset.scala @@ -50,6 +50,7 @@ import com.normation.rudder.domain.properties.GlobalParameter import com.normation.rudder.domain.properties.NodeProperty import com.normation.rudder.services.policies.InterpolatedValueCompiler import com.normation.rudder.services.policies.ParamInterpolationContext +import com.normation.zio.ZioRuntime import com.softwaremill.quicklens._ import com.typesafe.config.ConfigValue import net.minidev.json.JSONArray @@ -60,6 +61,8 @@ import scala.util.control.NonFatal import scalaj.http.Http import scalaj.http.HttpOptions import zio._ +import zio.cache.Cache +import zio.cache.Lookup import zio.duration._ import zio.syntax._ @@ -80,10 +83,12 @@ import zio.syntax._ * - parse the json result, * - return a rudder property with the content. */ -class GetDataset(valueCompiler: InterpolatedValueCompiler) { +class GetDataset(valueCompiler: InterpolatedValueCompiler, queryHttpService: QueryHttpService) { val compiler = new InterpolateNode(valueCompiler) + def resetCache: UIO[Unit] = queryHttpService.resetCache + /** * Get the node property for the configured datasource. * Return an Option[NodeProperty], where None mean "don't change @@ -127,7 +132,7 @@ class GetDataset(valueCompiler: InterpolatedValueCompiler) { headers <- expandMap(expand, datasource.headers) httpParams <- expandMap(expand, datasource.params) time_0 <- UIO.effectTotal(System.currentTimeMillis) - body <- QueryHttp + body <- queryHttpService .QUERY(datasource.httpMethod, url, headers, httpParams, datasource.sslCheck, connectionTimeout, readTimeOut) .chainError(s"Error when fetching data from ${url}") _ <- DataSourceLoggerPure.Timing.trace( @@ -169,8 +174,77 @@ class GetDataset(valueCompiler: InterpolatedValueCompiler) { } +trait QueryHttpService { + + def QUERY( + method: HttpMethod, + url: String, + headers: Map[String, String], + params: Map[String, String], + checkSsl: Boolean, + connectionTimeout: Duration, + readTimeOut: Duration + ): IOResult[Option[String]] + + def resetCache: UIO[Unit] +} + +final case class CacheParameters(cacheMaxItems: Int, cacheDuration: Duration) +final case class CacheKey( + method: HttpMethod, + url: String, + headers: Map[String, String], + params: Map[String, String], + checkSsl: Boolean, + connectionTimeout: Duration, + readTimeOut: Duration +) + +/* + * This implementation uses a cache for nodes, so that is several datasources use the same URL for a node, + * only one query is done. Cache is limited in size (you should not limit that, else cache will be mostly ineffective, + * since a set of nodes will evicted before being used in next datasource (update is done by datasource, not by node) + */ +class QueryHttpServiceImpl(useCache: Option[CacheParameters]) extends QueryHttpService { + + val cache = useCache.map { p => + val c: Cache[CacheKey, RudderError, Option[String]] = ZioRuntime.unsafeRun( + Cache.make( + p.cacheMaxItems, + p.cacheDuration, + Lookup((key: CacheKey) => + QueryHttp.QUERY(key.method, key.url, key.headers, key.params, key.checkSsl, key.connectionTimeout, key.readTimeOut) + ) + ) + ) + c + } + + override def resetCache: UIO[Unit] = { + cache match { + case None => ZIO.unit + case Some(c) => c.invalidateAll + } + } + + override def QUERY( + method: HttpMethod, + url: String, + headers: Map[String, String], + params: Map[String, String], + checkSsl: Boolean, + connectionTimeout: Duration, + readTimeOut: Duration + ): IOResult[Option[String]] = { + cache match { + case None => QueryHttp.QUERY(method, url, headers, params, checkSsl, connectionTimeout, readTimeOut) + case Some(c) => c.get(CacheKey(method, url, headers, params, checkSsl, connectionTimeout, readTimeOut)) + } + } +} + /* - * Timeout are given in Milleseconds + * Timeout are given in Milliseconds */ object QueryHttp { @@ -206,6 +280,7 @@ object QueryHttp { } for { + _ <- DataSourceLoggerPure.trace(s"Executing datasource request ${method.name} ${url}...") response <- IOResult.effect(client.asString) result <- if (response.isSuccess) { Some(response.body).succeed diff --git a/datasources/src/test/scala/com/normation/plugins/datasources/UpdateHttpDatasetTest.scala b/datasources/src/test/scala/com/normation/plugins/datasources/UpdateHttpDatasetTest.scala index 6659cfaf0..9f19bb003 100644 --- a/datasources/src/test/scala/com/normation/plugins/datasources/UpdateHttpDatasetTest.scala +++ b/datasources/src/test/scala/com/normation/plugins/datasources/UpdateHttpDatasetTest.scala @@ -368,7 +368,8 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga List.empty ) ) - val fetch = new GetDataset(interpolation) + val fetch = new GetDataset(interpolation, new QueryHttpServiceImpl(None)) + val fetchCached = new GetDataset(interpolation, new QueryHttpServiceImpl(Some(CacheParameters(10, 1.minute)))) val parameterRepo = new RoParameterRepository() { def getAllGlobalParameters() = Seq().succeed @@ -506,6 +507,7 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga interpolation, noPostHook, () => alwaysEnforce.succeed, + None, realClock // this one need a real clock to be able to do the requests ) val uuidGen = new StringUuidGeneratorImpl() @@ -546,6 +548,7 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga interpolation, noPostHook, () => alwaysEnforce.succeed, + None, realClock ) val nodeIds = infos.getAll().toBox.openOrThrowException("test shall not throw").keySet @@ -579,6 +582,7 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga interpolation, noPostHook, () => alwaysEnforce.succeed, + None, realClock ) val nodeIds = infos.getAll().toBox.openOrThrowException("test shall not throw").keySet @@ -785,6 +789,7 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga interpolation, noPostHook, () => alwaysEnforce.succeed, + None, realClock ), MyDatasource.uuidGen, @@ -825,6 +830,7 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga interpolation, noPostHook, () => alwaysEnforce.succeed, + None, realClock ) @@ -957,6 +963,24 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga } } + "Getting a node several time with cache should query only one time" >> { + val datasource = httpDatasourceTemplate.copy( + url = s"${REST_SERVER_URL}/single_$${rudder.node.id}", + path = "$.store.${node.properties[get-that]}[:1]" + ) + + val res = fetch.getNode(DataSourceId("test-get-one-node"), datasource, n1, root, alwaysEnforce, Set(), 1.second, 5.seconds) + val resCache = + fetchCached.getNode(DataSourceId("test-get-one-node"), datasource, n1, root, alwaysEnforce, Set(), 1.second, 5.seconds) + + // even with two queries, we only hit server one time when cache, two time when no cache + ({ NodeDataset.reset(); res.either.runNow } must beRight) and (res.either.runNow must beRight) and + (NodeDataset.counterError.get.runNow must_=== 0) and (NodeDataset.counterSuccess.get.runNow must_=== 2) and + ({ NodeDataset.reset(); resCache.either.runNow } must beRight) and (resCache.either.runNow must beRight) and + (NodeDataset.counterError.get.runNow must_=== 0) and (NodeDataset.counterSuccess.get.runNow must_=== 1) + + } + "The full http service" should { val datasource = NewDataSource( "test-http-service", @@ -972,6 +996,7 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga interpolation, noPostHook, () => alwaysEnforce.succeed, + None, realClock ) @@ -1067,6 +1092,7 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga interpolation, noPostHook, () => alwaysEnforce.succeed, + None, realClock ) val datasource = NewDataSource(propName, url = s"${REST_SERVER_URL}/404", path = "$.some.prop", onMissing = onMissing)