Skip to content

Commit 2ace98b

Browse files
Implement action versioning
1 parent af16122 commit 2ace98b

28 files changed

Lines changed: 600 additions & 209 deletions

File tree

ansible/files/whisks_design_document_for_entities_db_v2.1.0.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
"triggers": {
2222
"map": "function (doc) {\n var PATHSEP = \"/\";\n var isTrigger = function (doc) { return (doc.exec === undefined && doc.binding === undefined && doc.parameters !== undefined) };\n if (isTrigger(doc)) try {\n var ns = doc.namespace.split(PATHSEP);\n var root = ns[0];\n var value = {\n namespace: doc.namespace,\n name: doc.name,\n version: doc.version,\n publish: doc.publish,\n annotations: doc.annotations,\n updated: doc.updated\n };\n emit([doc.namespace, doc.updated], value);\n if (root !== doc.namespace) {\n emit([root, doc.updated], value);\n }\n } catch (e) {}\n}",
2323
"reduce": "_count"
24+
},
25+
"action-versions": {
26+
"map": "function (doc) {\n var isAction = function (doc) { return (doc.exec !== undefined) };\n if (isAction(doc)) try {\n var value = {\n namespace: doc.namespace,\n name: doc.name,\n id: doc._id,\n version: doc.version,\n };\n emit([doc.namespace + \"/\" + doc.name], value);\n } catch (e) {}\n}"
2427
}
2528
}
26-
}
29+
}

common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ case class ActivationMessage(override val transid: TransactionId,
5353
revision: DocRevision,
5454
user: Identity,
5555
activationId: ActivationId,
56+
actionId: DocId,
5657
rootControllerIndex: ControllerInstanceId,
5758
blocking: Boolean,
5859
content: Option[JsObject],
@@ -171,7 +172,7 @@ object ActivationMessage extends DefaultJsonProtocol {
171172
def parse(msg: String) = Try(serdes.read(msg.parseJson))
172173

173174
private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
174-
implicit val serdes = jsonFormat11(ActivationMessage.apply)
175+
implicit val serdes = jsonFormat12(ActivationMessage.apply)
175176
}
176177

177178
object CombinedCompletionAndResultMessage extends DefaultJsonProtocol {

common/scala/src/main/scala/org/apache/openwhisk/core/database/RemoteCacheInvalidation.scala

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,16 @@ import org.apache.openwhisk.core.WhiskConfig
3333
import org.apache.openwhisk.core.connector.Message
3434
import org.apache.openwhisk.core.connector.MessageFeed
3535
import org.apache.openwhisk.core.connector.MessagingProvider
36-
import org.apache.openwhisk.core.entity.CacheKey
37-
import org.apache.openwhisk.core.entity.ControllerInstanceId
38-
import org.apache.openwhisk.core.entity.WhiskAction
39-
import org.apache.openwhisk.core.entity.WhiskActionMetaData
40-
import org.apache.openwhisk.core.entity.WhiskPackage
41-
import org.apache.openwhisk.core.entity.WhiskRule
42-
import org.apache.openwhisk.core.entity.WhiskTrigger
36+
import org.apache.openwhisk.core.entity.{
37+
CacheKey,
38+
ControllerInstanceId,
39+
WhiskAction,
40+
WhiskActionMetaData,
41+
WhiskActionVersionList,
42+
WhiskPackage,
43+
WhiskRule,
44+
WhiskTrigger
45+
}
4346
import org.apache.openwhisk.spi.SpiLoader
4447

4548
case class CacheInvalidationMessage(key: CacheKey, instanceId: String) extends Message {
@@ -92,6 +95,7 @@ class RemoteCacheInvalidation(config: WhiskConfig, component: String, instance:
9295
WhiskPackage.removeId(msg.key)
9396
WhiskRule.removeId(msg.key)
9497
WhiskTrigger.removeId(msg.key)
98+
WhiskActionVersionList.removeId(msg.key)
9599
}
96100
}
97101
case Failure(t) => logging.error(this, s"failed processing message: $raw with $t")

common/scala/src/main/scala/org/apache/openwhisk/core/entity/SemVer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ protected[core] object SemVer {
7878
* @return SemVer instance
7979
* @thrown IllegalArgumentException if string is not a valid semantic version
8080
*/
81-
protected[entity] def apply(str: String): SemVer = {
81+
protected[core] def apply(str: String): SemVer = {
8282
try {
8383
val parts = if (str != null && str.nonEmpty) str.split('.') else Array[String]()
8484
val major = if (parts.size >= 1) parts(0).toInt else 0

common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala

Lines changed: 120 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,14 @@ import scala.concurrent.Future
2929
import scala.util.{Failure, Success, Try}
3030
import spray.json._
3131
import spray.json.DefaultJsonProtocol._
32-
import org.apache.openwhisk.common.TransactionId
33-
import org.apache.openwhisk.core.database.ArtifactStore
34-
import org.apache.openwhisk.core.database.DocumentFactory
35-
import org.apache.openwhisk.core.database.CacheChangeNotification
32+
import org.apache.openwhisk.common.{Logging, TransactionId}
33+
import org.apache.openwhisk.core.database.{
34+
ArtifactStore,
35+
CacheChangeNotification,
36+
DocumentFactory,
37+
MultipleReadersSingleWriterCache,
38+
StaleParameter
39+
}
3640
import org.apache.openwhisk.core.entity.Attachments._
3741
import org.apache.openwhisk.core.entity.types.EntityStore
3842

@@ -106,8 +110,10 @@ abstract class WhiskActionLike(override val name: EntityName) extends WhiskEntit
106110
"annotations" -> annotations.toJson)
107111
}
108112

109-
abstract class WhiskActionLikeMetaData(override val name: EntityName) extends WhiskActionLike(name) {
113+
abstract class WhiskActionLikeMetaData(override val name: EntityName, val docId: DocId) extends WhiskActionLike(name) {
110114
override def exec: ExecMetaDataBase
115+
116+
override def docid = docId
111117
}
112118

113119
/**
@@ -143,6 +149,8 @@ case class WhiskAction(namespace: EntityPath,
143149
require(exec != null, "exec undefined")
144150
require(limits != null, "limits undefined")
145151

152+
override def docid = DocId(fullyQualifiedName(true).asString)
153+
146154
/**
147155
* Merges parameters (usually from package) with existing action parameters.
148156
* Existing parameters supersede those in p.
@@ -173,7 +181,7 @@ case class WhiskAction(namespace: EntityPath,
173181
def toExecutableWhiskAction: Option[ExecutableWhiskAction] = exec match {
174182
case codeExec: CodeExec[_] =>
175183
Some(
176-
ExecutableWhiskAction(namespace, name, codeExec, parameters, limits, version, publish, annotations)
184+
ExecutableWhiskAction(namespace, name, docid, codeExec, parameters, limits, version, publish, annotations)
177185
.revision[ExecutableWhiskAction](rev))
178186
case _ => None
179187
}
@@ -198,6 +206,7 @@ case class WhiskAction(namespace: EntityPath,
198206
@throws[IllegalArgumentException]
199207
case class WhiskActionMetaData(namespace: EntityPath,
200208
override val name: EntityName,
209+
override val docId: DocId,
201210
exec: ExecMetaDataBase,
202211
parameters: Parameters = Parameters(),
203212
limits: ActionLimits = ActionLimits(),
@@ -206,7 +215,7 @@ case class WhiskActionMetaData(namespace: EntityPath,
206215
annotations: Parameters = Parameters(),
207216
override val updated: Instant = WhiskEntity.currentMillis(),
208217
binding: Option[EntityPath] = None)
209-
extends WhiskActionLikeMetaData(name) {
218+
extends WhiskActionLikeMetaData(name, docId) {
210219

211220
require(exec != null, "exec undefined")
212221
require(limits != null, "limits undefined")
@@ -238,6 +247,7 @@ case class WhiskActionMetaData(namespace: EntityPath,
238247
ExecutableWhiskActionMetaData(
239248
namespace,
240249
name,
250+
docId,
241251
execMetaData,
242252
parameters,
243253
limits,
@@ -276,6 +286,7 @@ case class WhiskActionMetaData(namespace: EntityPath,
276286
@throws[IllegalArgumentException]
277287
case class ExecutableWhiskAction(namespace: EntityPath,
278288
override val name: EntityName,
289+
docId: DocId,
279290
exec: CodeExec[_],
280291
parameters: Parameters = Parameters(),
281292
limits: ActionLimits = ActionLimits(),
@@ -322,20 +333,21 @@ case class ExecutableWhiskAction(namespace: EntityPath,
322333
@throws[IllegalArgumentException]
323334
case class ExecutableWhiskActionMetaData(namespace: EntityPath,
324335
override val name: EntityName,
336+
override val docId: DocId,
325337
exec: ExecMetaData,
326338
parameters: Parameters = Parameters(),
327339
limits: ActionLimits = ActionLimits(),
328340
version: SemVer = SemVer(),
329341
publish: Boolean = false,
330342
annotations: Parameters = Parameters(),
331343
binding: Option[EntityPath] = None)
332-
extends WhiskActionLikeMetaData(name) {
344+
extends WhiskActionLikeMetaData(name, docId) {
333345

334346
require(exec != null, "exec undefined")
335347
require(limits != null, "limits undefined")
336348

337349
def toWhiskAction =
338-
WhiskActionMetaData(namespace, name, exec, parameters, limits, version, publish, annotations, updated)
350+
WhiskActionMetaData(namespace, name, docId, exec, parameters, limits, version, publish, annotations, updated)
339351
.revision[WhiskActionMetaData](rev)
340352

341353
/**
@@ -346,6 +358,74 @@ case class ExecutableWhiskActionMetaData(namespace: EntityPath,
346358

347359
}
348360

361+
case class WhiskActionVersion(id: String, namespace: EntityPath, name: EntityName, version: SemVer)
362+
363+
object WhiskActionVersion {
364+
val serdes = jsonFormat4(WhiskActionVersion.apply)
365+
}
366+
367+
case class WhiskActionVersionList(namespace: EntityPath, name: EntityName, versions: Map[SemVer, String]) {
368+
def matchedDocId(version: Option[SemVer]): Option[DocId] = {
369+
version match {
370+
case Some(ver) =>
371+
versions.get(ver).map(DocId(_))
372+
case None if versions.nonEmpty =>
373+
Some(DocId(versions.maxBy(_._1.toString)._2))
374+
case _ =>
375+
None
376+
}
377+
}
378+
}
379+
380+
object WhiskActionVersionList extends MultipleReadersSingleWriterCache[WhiskActionVersionList, DocInfo] {
381+
lazy val viewName = WhiskQueries.entitiesView(collection = "action-versions").name
382+
383+
def cacheKey(action: FullyQualifiedEntityName): CacheKey = {
384+
CacheKey(action.fullPath.asString)
385+
}
386+
387+
def get(action: FullyQualifiedEntityName, datastore: EntityStore)(
388+
implicit transId: TransactionId): Future[WhiskActionVersionList] = {
389+
implicit val logger: Logging = datastore.logging
390+
implicit val ec = datastore.executionContext
391+
392+
val key = List(action.fullPath.asString)
393+
cacheLookup(
394+
cacheKey(action),
395+
datastore
396+
.query(
397+
viewName,
398+
startKey = key,
399+
endKey = key,
400+
skip = 0,
401+
limit = 0,
402+
includeDocs = false,
403+
descending = false,
404+
reduce = false,
405+
stale = StaleParameter.No)
406+
.map { result =>
407+
val values = result.map { row =>
408+
row.fields("value").asJsObject()
409+
}
410+
val mappings = values
411+
.map(WhiskActionVersion.serdes.read(_))
412+
.map { actionVersion =>
413+
(actionVersion.version, actionVersion.id)
414+
}
415+
.toMap
416+
WhiskActionVersionList(action.namespace.toPath, action.name, mappings)
417+
})
418+
}
419+
420+
// delete cache
421+
def deleteCache(action: FullyQualifiedEntityName)(implicit transId: TransactionId,
422+
ec: ExecutionContext,
423+
logger: Logging,
424+
notifier: Option[CacheChangeNotification]) = {
425+
cacheInvalidate(cacheKey(action), Future.successful(()))
426+
}
427+
}
428+
349429
object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[WhiskAction] with DefaultJsonProtocol {
350430
import WhiskActivation.instantSerdes
351431
val execFieldName = "exec"
@@ -410,7 +490,11 @@ object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[
410490
old)
411491
}
412492
} match {
413-
case Success(f) => f
493+
case Success(f) =>
494+
implicit val ec = db.executionContext
495+
implicit val logger = db.logging
496+
WhiskActionVersionList.deleteCache(doc.fullyQualifiedName(false))
497+
f
414498
case Failure(f) => Future.failed(f)
415499
}
416500
}
@@ -532,14 +616,18 @@ object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[
532616
* If it's the actual package, use its name directly as the package path name.
533617
* While traversing the package bindings, merge the parameters.
534618
*/
535-
def resolveActionAndMergeParameters(entityStore: EntityStore, fullyQualifiedName: FullyQualifiedEntityName)(
536-
implicit ec: ExecutionContext,
537-
transid: TransactionId): Future[WhiskAction] = {
619+
def resolveActionAndMergeParameters(
620+
entityStore: EntityStore,
621+
fullyQualifiedName: FullyQualifiedEntityName,
622+
version: Option[SemVer] = None)(implicit ec: ExecutionContext, transid: TransactionId): Future[WhiskAction] = {
538623
// first check that there is a package to be resolved
539624
val entityPath = fullyQualifiedName.path
540625
if (entityPath.defaultPackage) {
541626
// this is the default package, nothing to resolve
542-
WhiskAction.get(entityStore, fullyQualifiedName.toDocId)
627+
WhiskActionVersionList.get(fullyQualifiedName, entityStore).flatMap { result =>
628+
val docId = result.matchedDocId(version).getOrElse(fullyQualifiedName.toDocId)
629+
WhiskAction.get(entityStore, docId)
630+
}
543631
} else {
544632
// there is a package to be resolved
545633
val pkgDocid = fullyQualifiedName.path.toDocId
@@ -548,8 +636,12 @@ object WhiskAction extends DocumentFactory[WhiskAction] with WhiskEntityQueries[
548636
wp flatMap { resolvedPkg =>
549637
// fully resolved name for the action
550638
val fqnAction = resolvedPkg.fullyQualifiedName(withVersion = false).add(actionName)
639+
val action = WhiskActionVersionList.get(fqnAction, entityStore).flatMap { result =>
640+
val docId = result.matchedDocId(version).getOrElse(fqnAction.toDocId)
641+
WhiskAction.get(entityStore, docId)
642+
}
551643
// get the whisk action associate with it and inherit the parameters from the package/binding
552-
WhiskAction.get(entityStore, fqnAction.toDocId) map {
644+
action map {
553645
_.inherit(resolvedPkg.parameters)
554646
}
555647
}
@@ -571,6 +663,7 @@ object WhiskActionMetaData
571663
WhiskActionMetaData.apply,
572664
"namespace",
573665
"name",
666+
"_id",
574667
"exec",
575668
"parameters",
576669
"limits",
@@ -611,14 +704,19 @@ object WhiskActionMetaData
611704
* If it's the actual package, use its name directly as the package path name.
612705
* While traversing the package bindings, merge the parameters.
613706
*/
614-
def resolveActionAndMergeParameters(entityStore: EntityStore, fullyQualifiedName: FullyQualifiedEntityName)(
707+
def resolveActionAndMergeParameters(entityStore: EntityStore,
708+
fullyQualifiedName: FullyQualifiedEntityName,
709+
version: Option[SemVer] = None)(
615710
implicit ec: ExecutionContext,
616711
transid: TransactionId): Future[WhiskActionMetaData] = {
617712
// first check that there is a package to be resolved
618713
val entityPath = fullyQualifiedName.path
619714
if (entityPath.defaultPackage) {
620715
// this is the default package, nothing to resolve
621-
WhiskActionMetaData.get(entityStore, fullyQualifiedName.toDocId)
716+
WhiskActionVersionList.get(fullyQualifiedName, entityStore).flatMap { result =>
717+
val docId = result.matchedDocId(version).getOrElse(fullyQualifiedName.toDocId)
718+
WhiskActionMetaData.get(entityStore, docId)
719+
}
622720
} else {
623721
// there is a package to be resolved
624722
val pkgDocid = fullyQualifiedName.path.toDocId
@@ -627,8 +725,12 @@ object WhiskActionMetaData
627725
wp flatMap { resolvedPkg =>
628726
// fully resolved name for the action
629727
val fqnAction = resolvedPkg.fullyQualifiedName(withVersion = false).add(actionName)
728+
val action = WhiskActionVersionList.get(fqnAction, entityStore).flatMap { result =>
729+
val docId = result.matchedDocId(version).getOrElse(fqnAction.toDocId)
730+
WhiskActionMetaData.get(entityStore, docId)
731+
}
630732
// get the whisk action associate with it and inherit the parameters from the package/binding
631-
WhiskActionMetaData.get(entityStore, fqnAction.toDocId) map {
733+
action map {
632734
_.inherit(
633735
resolvedPkg.parameters,
634736
if (fullyQualifiedName.path.equals(resolvedPkg.fullPath)) None

common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskEntity.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ abstract class WhiskEntity protected[entity] (en: EntityName, val entityType: St
5858
FullyQualifiedEntityName(namespace, en, if (withVersion) Some(version) else None)
5959

6060
/** The primary key for the entity in the datastore */
61-
override final def docid = fullyQualifiedName(false).toDocId
61+
override def docid = fullyQualifiedName(false).toDocId
6262

6363
/**
6464
* Returns a JSON object with the fields specific to this abstract class.

0 commit comments

Comments
 (0)