Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions datasources/pom-template.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@
<version>2.5</version>
<scope>provided</scope>
</dependency>
<!-- cache service -->
<dependency>
<groupId>dev.zio</groupId>
<artifactId>zio-cache_${scala-binary-version}</artifactId>
<version>0.1.5</version>
</dependency>

<!-- Test: rudder -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import com.normation.zio._
import com.normation.zio.ZioRuntime
import net.liftweb.common.Box
import org.joda.time.DateTime
import zio.duration._

/*
* An update hook which triggers a configuration generation if needed
Expand Down Expand Up @@ -88,6 +89,7 @@ object DatasourcesConf extends RudderPluginModule {
Cfg.interpolationCompiler,
regenerationHook.hook _,
() => Cfg.configService.rudder_global_policy_mode(),
Some(CacheParameters(Int.MaxValue, 2.minutes)),
ZioRuntime.environment
),
Cfg.stringUuidGenerator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ trait QueryDataSourceService {
* A version that only query one node - do not use if you want to query several nodes
*/
def queryOne(datasource: DataSource, nodeId: NodeId, cause: UpdateCause): IOResult[NodeUpdateResult]

// perhaps should be a different service
def resetCache: UIO[Unit]
}

/**
Expand All @@ -98,16 +101,23 @@ class HttpQueryDataSourceService(
interpolCompiler: InterpolatedValueCompiler,
onUpdatedHook: (Set[NodeId], UpdateCause) => IOResult[Unit],
globalPolicyMode: () => IOResult[GlobalPolicyMode],
cacheParam: Option[CacheParameters],
clock: Clock
) extends QueryDataSourceService {

val getHttp = new GetDataset(interpolCompiler)
val getHttp = new GetDataset(interpolCompiler, new QueryHttpServiceImpl(cacheParam))



/*
* We need a scheduler tailored for I/O, we are mostly doing http requests and
* database things here
*/

override def resetCache: UIO[Unit] = {
getHttp.resetCache
}

override def queryAll(datasource: DataSource, cause: UpdateCause): IOResult[Set[NodeUpdateResult]] = {
query[Set[NodeUpdateResult]](
"fetch data for all node",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,15 @@ class DataSourceRepoImpl(

override def onUserAskUpdateAllNodes(actor: EventActor): IOResult[Unit] = {
DataSourceLoggerPure.info(s"Fetching data from data sources for all node because ${actor.name} asked for it") *>
fetch.resetCache *>
fetchAllNode(actor, None)
}

override def onUserAskUpdateAllNodesFor(actor: EventActor, datasourceId: DataSourceId): IOResult[Unit] = {
DataSourceLoggerPure.info(
s"Fetching data from data source '${datasourceId.value}' for all node because ${actor.name} asked for it"
) *>
fetch.resetCache *>
fetchAllNode(actor, Some(datasourceId))
}

Expand All @@ -375,13 +377,15 @@ class DataSourceRepoImpl(
DataSourceLoggerPure.info(
s"Fetching data from data source for node '${nodeId.value}' because '${actor.name}' asked for it"
) *>
fetch.resetCache *>
fetchOneNode(actor, nodeId, None)
}

override def onUserAskUpdateNodeFor(actor: EventActor, nodeId: NodeId, datasourceId: DataSourceId): IOResult[Unit] = {
DataSourceLoggerPure.info(
s"Fetching data from data source for node '${nodeId.value}' because '${actor.name}' asked for it"
) *>
fetch.resetCache *>
fetchOneNode(actor, nodeId, Some(datasourceId))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
package com.normation.plugins.datasources

import com.normation.errors.IOResult
import com.normation.errors.effectUioUnit
import com.normation.eventlog.EventActor
import com.normation.eventlog.ModificationId
import com.normation.plugins.PluginStatus
Expand Down Expand Up @@ -108,7 +107,7 @@ class DataSourceScheduler(
never.succeed
}

val msg = s"Automatically fetching data for data source '${datasource.name.value}' (${datasource.id.value}): ${schedule}"
val msg = s"Automatically fetching data for data source '${datasource.name.value}' (${datasource.id.value})"

// The full action with logging. We don't want it to be able to fail, because it would stop
// future update. So we catch all error and log them (in debug because they are (should) already log in error, we
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import com.normation.rudder.domain.properties.GlobalParameter
import com.normation.rudder.domain.properties.NodeProperty
import com.normation.rudder.services.policies.InterpolatedValueCompiler
import com.normation.rudder.services.policies.ParamInterpolationContext
import com.normation.zio.ZioRuntime
import com.softwaremill.quicklens._
import com.typesafe.config.ConfigValue
import net.minidev.json.JSONArray
Expand All @@ -60,6 +61,8 @@ import scala.util.control.NonFatal
import scalaj.http.Http
import scalaj.http.HttpOptions
import zio._
import zio.cache.Cache
import zio.cache.Lookup
import zio.duration._
import zio.syntax._

Expand All @@ -80,10 +83,12 @@ import zio.syntax._
* - parse the json result,
* - return a rudder property with the content.
*/
class GetDataset(valueCompiler: InterpolatedValueCompiler) {
class GetDataset(valueCompiler: InterpolatedValueCompiler, queryHttpService: QueryHttpService) {

val compiler = new InterpolateNode(valueCompiler)

def resetCache: UIO[Unit] = queryHttpService.resetCache

/**
* Get the node property for the configured datasource.
* Return an Option[NodeProperty], where None mean "don't change
Expand Down Expand Up @@ -127,7 +132,7 @@ class GetDataset(valueCompiler: InterpolatedValueCompiler) {
headers <- expandMap(expand, datasource.headers)
httpParams <- expandMap(expand, datasource.params)
time_0 <- UIO.effectTotal(System.currentTimeMillis)
body <- QueryHttp
body <- queryHttpService
.QUERY(datasource.httpMethod, url, headers, httpParams, datasource.sslCheck, connectionTimeout, readTimeOut)
.chainError(s"Error when fetching data from ${url}")
_ <- DataSourceLoggerPure.Timing.trace(
Expand Down Expand Up @@ -169,8 +174,77 @@ class GetDataset(valueCompiler: InterpolatedValueCompiler) {

}

trait QueryHttpService {

def QUERY(
method: HttpMethod,
url: String,
headers: Map[String, String],
params: Map[String, String],
checkSsl: Boolean,
connectionTimeout: Duration,
readTimeOut: Duration
): IOResult[Option[String]]

def resetCache: UIO[Unit]
}

final case class CacheParameters(cacheMaxItems: Int, cacheDuration: Duration)
final case class CacheKey(
method: HttpMethod,
url: String,
headers: Map[String, String],
params: Map[String, String],
checkSsl: Boolean,
connectionTimeout: Duration,
readTimeOut: Duration
)

/*
* This implementation uses a cache for nodes, so that is several datasources use the same URL for a node,
* only one query is done. Cache is limited in size (you should not limit that, else cache will be mostly ineffective,
* since a set of nodes will evicted before being used in next datasource (update is done by datasource, not by node)
*/
class QueryHttpServiceImpl(useCache: Option[CacheParameters]) extends QueryHttpService {

val cache = useCache.map { p =>
val c: Cache[CacheKey, RudderError, Option[String]] = ZioRuntime.unsafeRun(
Cache.make(
p.cacheMaxItems,
p.cacheDuration,
Lookup((key: CacheKey) =>
QueryHttp.QUERY(key.method, key.url, key.headers, key.params, key.checkSsl, key.connectionTimeout, key.readTimeOut)
)
)
)
c
}

override def resetCache: UIO[Unit] = {
cache match {
case None => ZIO.unit
case Some(c) => c.invalidateAll
}
}

override def QUERY(
method: HttpMethod,
url: String,
headers: Map[String, String],
params: Map[String, String],
checkSsl: Boolean,
connectionTimeout: Duration,
readTimeOut: Duration
): IOResult[Option[String]] = {
cache match {
case None => QueryHttp.QUERY(method, url, headers, params, checkSsl, connectionTimeout, readTimeOut)
case Some(c) => c.get(CacheKey(method, url, headers, params, checkSsl, connectionTimeout, readTimeOut))
}
}
}

/*
* Timeout are given in Milleseconds
* Timeout are given in Milliseconds
*/
object QueryHttp {

Expand Down Expand Up @@ -206,6 +280,7 @@ object QueryHttp {
}

for {
_ <- DataSourceLoggerPure.trace(s"Executing datasource request ${method.name} ${url}...")
response <- IOResult.effect(client.asString)
result <- if (response.isSuccess) {
Some(response.body).succeed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,8 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
List.empty
)
)
val fetch = new GetDataset(interpolation)
val fetch = new GetDataset(interpolation, new QueryHttpServiceImpl(None))
val fetchCached = new GetDataset(interpolation, new QueryHttpServiceImpl(Some(CacheParameters(10, 1.minute))))

val parameterRepo = new RoParameterRepository() {
def getAllGlobalParameters() = Seq().succeed
Expand Down Expand Up @@ -506,6 +507,7 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
interpolation,
noPostHook,
() => alwaysEnforce.succeed,
None,
realClock // this one need a real clock to be able to do the requests
)
val uuidGen = new StringUuidGeneratorImpl()
Expand Down Expand Up @@ -546,6 +548,7 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
interpolation,
noPostHook,
() => alwaysEnforce.succeed,
None,
realClock
)
val nodeIds = infos.getAll().toBox.openOrThrowException("test shall not throw").keySet
Expand Down Expand Up @@ -579,6 +582,7 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
interpolation,
noPostHook,
() => alwaysEnforce.succeed,
None,
realClock
)
val nodeIds = infos.getAll().toBox.openOrThrowException("test shall not throw").keySet
Expand Down Expand Up @@ -785,6 +789,7 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
interpolation,
noPostHook,
() => alwaysEnforce.succeed,
None,
realClock
),
MyDatasource.uuidGen,
Expand Down Expand Up @@ -825,6 +830,7 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
interpolation,
noPostHook,
() => alwaysEnforce.succeed,
None,
realClock
)

Expand Down Expand Up @@ -957,6 +963,24 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
}
}

"Getting a node several time with cache should query only one time" >> {
val datasource = httpDatasourceTemplate.copy(
url = s"${REST_SERVER_URL}/single_$${rudder.node.id}",
path = "$.store.${node.properties[get-that]}[:1]"
)

val res = fetch.getNode(DataSourceId("test-get-one-node"), datasource, n1, root, alwaysEnforce, Set(), 1.second, 5.seconds)
val resCache =
fetchCached.getNode(DataSourceId("test-get-one-node"), datasource, n1, root, alwaysEnforce, Set(), 1.second, 5.seconds)

// even with two queries, we only hit server one time when cache, two time when no cache
({ NodeDataset.reset(); res.either.runNow } must beRight) and (res.either.runNow must beRight) and
(NodeDataset.counterError.get.runNow must_=== 0) and (NodeDataset.counterSuccess.get.runNow must_=== 2) and
({ NodeDataset.reset(); resCache.either.runNow } must beRight) and (resCache.either.runNow must beRight) and
(NodeDataset.counterError.get.runNow must_=== 0) and (NodeDataset.counterSuccess.get.runNow must_=== 1)

}

"The full http service" should {
val datasource = NewDataSource(
"test-http-service",
Expand All @@ -972,6 +996,7 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
interpolation,
noPostHook,
() => alwaysEnforce.succeed,
None,
realClock
)

Expand Down Expand Up @@ -1067,6 +1092,7 @@ class UpdateHttpDatasetTest extends Specification with BoxSpecMatcher with Logga
interpolation,
noPostHook,
() => alwaysEnforce.succeed,
None,
realClock
)
val datasource = NewDataSource(propName, url = s"${REST_SERVER_URL}/404", path = "$.some.prop", onMissing = onMissing)
Expand Down