diff --git a/datasources/pom-template.xml b/datasources/pom-template.xml
index 0d460ac37..1ba678f5f 100644
--- a/datasources/pom-template.xml
+++ b/datasources/pom-template.xml
@@ -46,6 +46,12 @@
2.5
provided
+
+
+ dev.zio
+ zio-cache_${scala-binary-version}
+ 0.1.5
+
diff --git a/datasources/src/main/scala/bootstrap/rudder/plugin/DataSourcesConf.scala b/datasources/src/main/scala/bootstrap/rudder/plugin/DataSourcesConf.scala
index 500b705e6..09413a134 100644
--- a/datasources/src/main/scala/bootstrap/rudder/plugin/DataSourcesConf.scala
+++ b/datasources/src/main/scala/bootstrap/rudder/plugin/DataSourcesConf.scala
@@ -53,6 +53,7 @@ import com.normation.zio._
import com.normation.zio.ZioRuntime
import net.liftweb.common.Box
import org.joda.time.DateTime
+import zio.duration._
/*
* An update hook which triggers a configuration generation if needed
@@ -88,6 +89,7 @@ object DatasourcesConf extends RudderPluginModule {
Cfg.interpolationCompiler,
regenerationHook.hook _,
() => Cfg.configService.rudder_global_policy_mode(),
+ Some(CacheParameters(Int.MaxValue, 2.minutes)),
ZioRuntime.environment
),
Cfg.stringUuidGenerator,
diff --git a/datasources/src/main/scala/com/normation/plugins/datasources/QueryService.scala b/datasources/src/main/scala/com/normation/plugins/datasources/QueryService.scala
index ae26bf9c3..cf864e694 100644
--- a/datasources/src/main/scala/com/normation/plugins/datasources/QueryService.scala
+++ b/datasources/src/main/scala/com/normation/plugins/datasources/QueryService.scala
@@ -85,6 +85,9 @@ trait QueryDataSourceService {
* A version that only query one node - do not use if you want to query several nodes
*/
def queryOne(datasource: DataSource, nodeId: NodeId, cause: UpdateCause): IOResult[NodeUpdateResult]
+
+ // perhaps should be a different service
+ def resetCache: UIO[Unit]
}
/**
@@ -98,16 +101,23 @@ class HttpQueryDataSourceService(
interpolCompiler: InterpolatedValueCompiler,
onUpdatedHook: (Set[NodeId], UpdateCause) => IOResult[Unit],
globalPolicyMode: () => IOResult[GlobalPolicyMode],
+ cacheParam: Option[CacheParameters],
clock: Clock
) extends QueryDataSourceService {
- val getHttp = new GetDataset(interpolCompiler)
+ val getHttp = new GetDataset(interpolCompiler, new QueryHttpServiceImpl(cacheParam))
+
+
/*
* We need a scheduler tailored for I/O, we are mostly doing http requests and
* database things here
*/
+ override def resetCache: UIO[Unit] = {
+ getHttp.resetCache
+ }
+
override def queryAll(datasource: DataSource, cause: UpdateCause): IOResult[Set[NodeUpdateResult]] = {
query[Set[NodeUpdateResult]](
"fetch data for all node",
diff --git a/datasources/src/main/scala/com/normation/plugins/datasources/Repository.scala b/datasources/src/main/scala/com/normation/plugins/datasources/Repository.scala
index df6b0826a..79bc0bbcc 100644
--- a/datasources/src/main/scala/com/normation/plugins/datasources/Repository.scala
+++ b/datasources/src/main/scala/com/normation/plugins/datasources/Repository.scala
@@ -351,6 +351,7 @@ class DataSourceRepoImpl(
override def onUserAskUpdateAllNodes(actor: EventActor): IOResult[Unit] = {
DataSourceLoggerPure.info(s"Fetching data from data sources for all node because ${actor.name} asked for it") *>
+ fetch.resetCache *>
fetchAllNode(actor, None)
}
@@ -358,6 +359,7 @@ class DataSourceRepoImpl(
DataSourceLoggerPure.info(
s"Fetching data from data source '${datasourceId.value}' for all node because ${actor.name} asked for it"
) *>
+ fetch.resetCache *>
fetchAllNode(actor, Some(datasourceId))
}
@@ -375,6 +377,7 @@ class DataSourceRepoImpl(
DataSourceLoggerPure.info(
s"Fetching data from data source for node '${nodeId.value}' because '${actor.name}' asked for it"
) *>
+ fetch.resetCache *>
fetchOneNode(actor, nodeId, None)
}
@@ -382,6 +385,7 @@ class DataSourceRepoImpl(
DataSourceLoggerPure.info(
s"Fetching data from data source for node '${nodeId.value}' because '${actor.name}' asked for it"
) *>
+ fetch.resetCache *>
fetchOneNode(actor, nodeId, Some(datasourceId))
}
diff --git a/datasources/src/main/scala/com/normation/plugins/datasources/Scheduler.scala b/datasources/src/main/scala/com/normation/plugins/datasources/Scheduler.scala
index fca8c5cc1..841a77fb0 100644
--- a/datasources/src/main/scala/com/normation/plugins/datasources/Scheduler.scala
+++ b/datasources/src/main/scala/com/normation/plugins/datasources/Scheduler.scala
@@ -38,7 +38,6 @@
package com.normation.plugins.datasources
import com.normation.errors.IOResult
-import com.normation.errors.effectUioUnit
import com.normation.eventlog.EventActor
import com.normation.eventlog.ModificationId
import com.normation.plugins.PluginStatus
@@ -108,7 +107,7 @@ class DataSourceScheduler(
never.succeed
}
- val msg = s"Automatically fetching data for data source '${datasource.name.value}' (${datasource.id.value}): ${schedule}"
+ val msg = s"Automatically fetching data for data source '${datasource.name.value}' (${datasource.id.value})"
// The full action with logging. We don't want it to be able to fail, because it would stop
// future update. So we catch all error and log them (in debug because they are (should) already log in error, we
diff --git a/datasources/src/main/scala/com/normation/plugins/datasources/UpdateHttpDataset.scala b/datasources/src/main/scala/com/normation/plugins/datasources/UpdateHttpDataset.scala
index e576249a5..41567a296 100644
--- a/datasources/src/main/scala/com/normation/plugins/datasources/UpdateHttpDataset.scala
+++ b/datasources/src/main/scala/com/normation/plugins/datasources/UpdateHttpDataset.scala
@@ -50,6 +50,7 @@ import com.normation.rudder.domain.properties.GlobalParameter
import com.normation.rudder.domain.properties.NodeProperty
import com.normation.rudder.services.policies.InterpolatedValueCompiler
import com.normation.rudder.services.policies.ParamInterpolationContext
+import com.normation.zio.ZioRuntime
import com.softwaremill.quicklens._
import com.typesafe.config.ConfigValue
import net.minidev.json.JSONArray
@@ -60,6 +61,8 @@ import scala.util.control.NonFatal
import scalaj.http.Http
import scalaj.http.HttpOptions
import zio._
+import zio.cache.Cache
+import zio.cache.Lookup
import zio.duration._
import zio.syntax._
@@ -80,10 +83,12 @@ import zio.syntax._
* - parse the json result,
* - return a rudder property with the content.
*/
-class GetDataset(valueCompiler: InterpolatedValueCompiler) {
+class GetDataset(valueCompiler: InterpolatedValueCompiler, queryHttpService: QueryHttpService) {
val compiler = new InterpolateNode(valueCompiler)
+ def resetCache: UIO[Unit] = queryHttpService.resetCache
+
/**
* Get the node property for the configured datasource.
* Return an Option[NodeProperty], where None mean "don't change
@@ -127,7 +132,7 @@ class GetDataset(valueCompiler: InterpolatedValueCompiler) {
headers <- expandMap(expand, datasource.headers)
httpParams <- expandMap(expand, datasource.params)
time_0 <- UIO.effectTotal(System.currentTimeMillis)
- body <- QueryHttp
+ body <- queryHttpService
.QUERY(datasource.httpMethod, url, headers, httpParams, datasource.sslCheck, connectionTimeout, readTimeOut)
.chainError(s"Error when fetching data from ${url}")
_ <- DataSourceLoggerPure.Timing.trace(
@@ -169,8 +174,77 @@ class GetDataset(valueCompiler: InterpolatedValueCompiler) {
}
+trait QueryHttpService {
+
+ def QUERY(
+ method: HttpMethod,
+ url: String,
+ headers: Map[String, String],
+ params: Map[String, String],
+ checkSsl: Boolean,
+ connectionTimeout: Duration,
+ readTimeOut: Duration
+ ): IOResult[Option[String]]
+
+ def resetCache: UIO[Unit]
+}
+
+final case class CacheParameters(cacheMaxItems: Int, cacheDuration: Duration)
+final case class CacheKey(
+ method: HttpMethod,
+ url: String,
+ headers: Map[String, String],
+ params: Map[String, String],
+ checkSsl: Boolean,
+ connectionTimeout: Duration,
+ readTimeOut: Duration
+)
+
+/*
+ * This implementation uses a cache for nodes, so that is several datasources use the same URL for a node,
+ * only one query is done. Cache is limited in size (you should not limit that, else cache will be mostly ineffective,
+ * since a set of nodes will evicted before being used in next datasource (update is done by datasource, not by node)
+ */
+class QueryHttpServiceImpl(useCache: Option[CacheParameters]) extends QueryHttpService {
+
+ val cache = useCache.map { p =>
+ val c: Cache[CacheKey, RudderError, Option[String]] = ZioRuntime.unsafeRun(
+ Cache.make(
+ p.cacheMaxItems,
+ p.cacheDuration,
+ Lookup((key: CacheKey) =>
+ QueryHttp.QUERY(key.method, key.url, key.headers, key.params, key.checkSsl, key.connectionTimeout, key.readTimeOut)
+ )
+ )
+ )
+ c
+ }
+
+ override def resetCache: UIO[Unit] = {
+ cache match {
+ case None => ZIO.unit
+ case Some(c) => c.invalidateAll
+ }
+ }
+
+ override def QUERY(
+ method: HttpMethod,
+ url: String,
+ headers: Map[String, String],
+ params: Map[String, String],
+ checkSsl: Boolean,
+ connectionTimeout: Duration,
+ readTimeOut: Duration
+ ): IOResult[Option[String]] = {
+ cache match {
+ case None => QueryHttp.QUERY(method, url, headers, params, checkSsl, connectionTimeout, readTimeOut)
+ case Some(c) => c.get(CacheKey(method, url, headers, params, checkSsl, connectionTimeout, readTimeOut))
+ }
+ }
+}
+
/*
- * Timeout are given in Milleseconds
+ * Timeout are given in Milliseconds
*/
object QueryHttp {
@@ -206,6 +280,7 @@ object QueryHttp {
}
for {
+ _ <- DataSourceLoggerPure.trace(s"Executing datasource request ${method.name} ${url}...")
response <- IOResult.effect(client.asString)
result <- if (response.isSuccess) {
Some(response.body).succeed
diff --git a/datasources/src/test/scala/com/normation/plugins/datasources/UpdateHttpDatasetTest.scala b/datasources/src/test/scala/com/normation/plugins/datasources/UpdateHttpDatasetTest.scala
index 6659cfaf0..9f19bb003 100644
--- a/datasources/src/test/scala/com/normation/plugins/datasources/UpdateHttpDatasetTest.scala
+++ b/datasources/src/test/scala/com/normation/plugins/datasources/UpdateHttpDatasetTest.scala
@@ -368,7 +368,8 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
List.empty
)
)
- val fetch = new GetDataset(interpolation)
+ val fetch = new GetDataset(interpolation, new QueryHttpServiceImpl(None))
+ val fetchCached = new GetDataset(interpolation, new QueryHttpServiceImpl(Some(CacheParameters(10, 1.minute))))
val parameterRepo = new RoParameterRepository() {
def getAllGlobalParameters() = Seq().succeed
@@ -506,6 +507,7 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
interpolation,
noPostHook,
() => alwaysEnforce.succeed,
+ None,
realClock // this one need a real clock to be able to do the requests
)
val uuidGen = new StringUuidGeneratorImpl()
@@ -546,6 +548,7 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
interpolation,
noPostHook,
() => alwaysEnforce.succeed,
+ None,
realClock
)
val nodeIds = infos.getAll().toBox.openOrThrowException("test shall not throw").keySet
@@ -579,6 +582,7 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
interpolation,
noPostHook,
() => alwaysEnforce.succeed,
+ None,
realClock
)
val nodeIds = infos.getAll().toBox.openOrThrowException("test shall not throw").keySet
@@ -785,6 +789,7 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
interpolation,
noPostHook,
() => alwaysEnforce.succeed,
+ None,
realClock
),
MyDatasource.uuidGen,
@@ -825,6 +830,7 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
interpolation,
noPostHook,
() => alwaysEnforce.succeed,
+ None,
realClock
)
@@ -957,6 +963,24 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
}
}
+ "Getting a node several time with cache should query only one time" >> {
+ val datasource = httpDatasourceTemplate.copy(
+ url = s"${REST_SERVER_URL}/single_$${rudder.node.id}",
+ path = "$.store.${node.properties[get-that]}[:1]"
+ )
+
+ val res = fetch.getNode(DataSourceId("test-get-one-node"), datasource, n1, root, alwaysEnforce, Set(), 1.second, 5.seconds)
+ val resCache =
+ fetchCached.getNode(DataSourceId("test-get-one-node"), datasource, n1, root, alwaysEnforce, Set(), 1.second, 5.seconds)
+
+ // even with two queries, we only hit server one time when cache, two time when no cache
+ ({ NodeDataset.reset(); res.either.runNow } must beRight) and (res.either.runNow must beRight) and
+ (NodeDataset.counterError.get.runNow must_=== 0) and (NodeDataset.counterSuccess.get.runNow must_=== 2) and
+ ({ NodeDataset.reset(); resCache.either.runNow } must beRight) and (resCache.either.runNow must beRight) and
+ (NodeDataset.counterError.get.runNow must_=== 0) and (NodeDataset.counterSuccess.get.runNow must_=== 1)
+
+ }
+
"The full http service" should {
val datasource = NewDataSource(
"test-http-service",
@@ -972,6 +996,7 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
interpolation,
noPostHook,
() => alwaysEnforce.succeed,
+ None,
realClock
)
@@ -1067,6 +1092,7 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
interpolation,
noPostHook,
() => alwaysEnforce.succeed,
+ None,
realClock
)
val datasource = NewDataSource(propName, url = s"${REST_SERVER_URL}/404", path = "$.some.prop", onMissing = onMissing)