Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ object OwSink {
*/
def combine[T, U, M1, M2](first: Sink[U, M1], second: Sink[U, M2])(
strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, (M1, M2)] = {
Sink.fromGraph(GraphDSL.create(first, second)((_, _)) { implicit b => (s1, s2) =>
Sink.fromGraph(GraphDSL.createGraph(first, second)((_, _)) { implicit b => (s1, s2) =>
import GraphDSL.Implicits._
val d = b.add(strategy(2))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import org.apache.pekko.http.scaladsl.model.Uri.Path
import org.apache.pekko.http.scaladsl.model.headers.Authorization
import org.apache.pekko.http.scaladsl.model.headers.BasicHttpCredentials
import org.apache.pekko.http.scaladsl.unmarshalling.Unmarshal
import org.apache.pekko.stream.OverflowStrategy
import org.apache.pekko.stream.QueueOfferResult
import org.apache.pekko.stream.scaladsl.Flow
import org.apache.pekko.stream.scaladsl.Keep
Expand Down Expand Up @@ -178,9 +177,10 @@ class SplunkLogStore(
.convertTo[String]}: ${l.fields(splunkConfig.logMessageField).convertTo[String].trim}"

//based on https://pekko.apache.org/docs/pekko-http/current/client-side/host-level.html
// BoundedSourceQueue automatically drops new elements when full, maintaining dropNew behavior
val queue =
Source
.queue[(HttpRequest, Promise[HttpResponse])](maxPendingRequests, OverflowStrategy.dropNew)
.queue[(HttpRequest, Promise[HttpResponse])](maxPendingRequests)
.via(httpFlow.getOrElse(defaultHttpFlow))
.toMat(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
Expand All @@ -190,7 +190,7 @@ class SplunkLogStore(

def queueRequest(request: HttpRequest): Future[HttpResponse] = {
val responsePromise = Promise[HttpResponse]()
queue.offer(request -> responsePromise).flatMap {
queue.offer(request -> responsePromise) match {
case QueueOfferResult.Enqueued => responsePromise.future
case QueueOfferResult.Dropped =>
Future.failed(new RuntimeException("Splunk API Client Queue overflowed. Try again later."))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class Batcher[T, R](batchSize: Int, concurrency: Int, retry: Int)(operation: (Se
completionMatcher = cm,
failureMatcher = PartialFunction.empty[Any, Throwable],
bufferSize = Int.MaxValue,
overflowStrategy = OverflowStrategy.dropNew)
overflowStrategy = OverflowStrategy.dropBuffer)
.batch(batchSize, Queue(_))((queue, element) => queue :+ element)
.mapAsyncUnordered(concurrency) { els =>
val elements = els.map(_._1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private[database] object StoreUtils {

def combinedSink[T](dest: Sink[ByteString, Future[T]])(
implicit ec: ExecutionContext): Sink[ByteString, Future[AttachmentUploadResult[T]]] = {
Sink.fromGraph(GraphDSL.create(digestSink(), lengthSink(), dest)(combineResult) {
Sink.fromGraph(GraphDSL.createGraph(digestSink(), lengthSink(), dest)(combineResult) {
implicit builder => (dgs, ls, dests) =>
import GraphDSL.Implicits._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ class AzureBlobAttachmentStore(client: BlobContainerAsyncClient, prefix: String,
case None =>
blobClient.exists().toFuture.toScala.map { exists =>
if (exists) {
val bbFlux = blobClient.download()
val bbFlux = blobClient.downloadStream()
Some(Source.fromPublisher(bbFlux).map(ByteString.fromByteBuffer))
} else {
throw NoDocumentException("Not found on 'readAttachment'.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,14 @@ class S3AttachmentStore(s3Settings: S3Settings, bucket: String, prefix: String,
s"[ATT_GET] '$prefix' finding attachment '$name' of document 'id: $docId'")
val source = getAttachmentSource(objectKey(docId, name))

val f = source.flatMap {
case Some(x) => x.withAttributes(s3attributes).runWith(sink)
case None => Future.failed(NoDocumentException("Not found on 'readAttachment'."))
}
val f = source
.flatMap { x =>
x.withAttributes(s3attributes).runWith(sink)
}
.recoverWith {
case e: Throwable if isMissingKeyException(e) =>
Future.failed(NoDocumentException("Not found on 'readAttachment'."))
}

val g = f.transform(
{ s =>
Expand All @@ -164,16 +168,14 @@ class S3AttachmentStore(s3Settings: S3Settings, bucket: String, prefix: String,
s"[ATT_GET] '$prefix' internal error, name: '$name', doc: 'id: $docId', failure: '${failure.getMessage}'")
}

private def getAttachmentSource(objectKey: String): Future[Option[Source[ByteString, Any]]] = urlSigner match {
case Some(signer) => getUrlContent(signer.getSignedURL(objectKey))
private def getAttachmentSource(objectKey: String): Future[Source[ByteString, Any]] = urlSigner match {
case Some(signer) =>
getUrlContent(signer.getSignedURL(objectKey)).map(_.get)

// When reading from S3 we get an optional source of ByteString and Metadata if the object exist
// For such case drop the metadata
// S3.getObject returns Source[ByteString, Future[ObjectMetadata]].
// The materialized future will fail if the object doesn't exist, which is handled by the caller.
case None =>
S3.download(bucket, objectKey)
.withAttributes(s3attributes)
.runWith(Sink.head)
.map(x => x.map(_._1))
Future.successful(S3.getObject(bucket, objectKey).withAttributes(s3attributes))
}

private def getUrlContent(uri: Uri): Future[Option[Source[ByteString, Any]]] = {
Expand All @@ -182,10 +184,9 @@ class S3AttachmentStore(s3Settings: S3Settings, bucket: String, prefix: String,
case HttpResponse(status, _, entity, _) if status.isSuccess() && !status.isRedirection() =>
Future.successful(Some(entity.dataBytes))
case HttpResponse(status, _, entity, _) =>
Unmarshal(entity).to[String].map { err =>
Unmarshal(entity).to[String].flatMap { err =>
//With CloudFront also the error message confirms to same S3 exception format
val exp = S3Exception(err, status)
if (isMissingKeyException(exp)) None else throw exp
Future.failed(S3Exception(err, status))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ trait WhiskActionsApi extends WhiskCollectionAPI with PostActionActivation with
}
case Failure(t: RecordTooLargeException) =>
logging.debug(this, s"[POST] action payload was too large")
terminate(PayloadTooLarge)
terminate(ContentTooLarge)
case Failure(RejectRequest(code, message)) =>
logging.debug(this, s"[POST] action rejected with code $code: $message")
terminate(code, message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.concurrent.Future
import scala.language.postfixOps
import scala.util.Try
import org.apache.pekko.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import org.apache.pekko.http.scaladsl.model.StatusCodes.PayloadTooLarge
import org.apache.pekko.http.scaladsl.model.StatusCodes.ContentTooLarge
import org.apache.pekko.http.scaladsl.model.headers.RawHeader
import org.apache.pekko.http.scaladsl.server.Directive0
import org.apache.pekko.http.scaladsl.server.Directives
Expand All @@ -45,7 +45,7 @@ protected[controller] trait ValidateRequestSize extends Directives {
new Directive0 {
override def tapply(f: Unit => Route) = {
check map {
case e: SizeError => terminate(PayloadTooLarge, Messages.entityTooBig(e))
case e: SizeError => terminate(ContentTooLarge, Messages.entityTooBig(e))
} getOrElse f(None)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ trait WhiskPackagesApi extends WhiskCollectionAPI with ReferencedEntities {
logging.debug(this, s"fetching package '$docid' for reference")
if (docid == wp.docid) {
logging.error(this, s"unexpected package binding refers to itself: $docid")
terminate(UnprocessableEntity, Messages.packageBindingCircularReference(b.fullyQualifiedName.toString))
terminate(UnprocessableContent, Messages.packageBindingCircularReference(b.fullyQualifiedName.toString))
} else {

/** Here's where I check package execute only case with package binding. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class PackageCollection(entityStore: EntityStore)(implicit logging: Logging) ext
logging.error(this, s"unexpected package binding refers to itself: $doc")
Future.failed(
RejectRequest(
UnprocessableEntity,
UnprocessableContent,
Messages.packageBindingCircularReference(binding.fullyQualifiedName.toString)))
} else {
checkPackageReadPermission(namespaces, pkgOwner, pkgDocid)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.openwhisk.core.containerpool.v2
import java.net.InetSocketAddress
import java.time.Instant
import org.apache.pekko.actor.Status.{Failure => FailureMessage}
import org.apache.pekko.actor.{actorRef2Scala, ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
import org.apache.pekko.actor.{ActorRef, ActorRefFactory, ActorSystem, FSM, Props, Stash}
import org.apache.pekko.event.Logging.InfoLevel
import org.apache.pekko.io.{IO, Tcp}
import org.apache.pekko.pattern.pipe
Expand Down
4 changes: 1 addition & 3 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,7 @@ if (scalaVersion == '2.12') {
depVersion : '2.12',
scoverageScalaVersion : '2.12.15',
scoverageVersion : '1.4.11',
// Temporarily disabled -Xfatal-warnings during Pekko migration to allow deprecated API usage
// BEFORE READY: Fix deprecation warnings and re-enable -Xfatal-warnings
compileFlags: ['-feature', '-unchecked', '-deprecation', '-Ywarn-unused-import']
compileFlags: ['-feature', '-unchecked', '-deprecation', '-Xfatal-warnings', '-Ywarn-unused-import']
]
} else {
println("Build using Scala 2.13")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
val exec: Exec = jsDefault(code)
val content = JsObject("exec" -> exec.toJson)
Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(creds)) ~> check {
status should be(PayloadTooLarge)
status should be(ContentTooLarge)
responseAs[String] should include {
Messages.entityTooBig(SizeError(WhiskAction.execFieldName, exec.size, Exec.sizeLimit))
}
Expand Down Expand Up @@ -513,7 +513,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
val content = JsObject("exec" -> exec.toJson)
put(entityStore, action)
Put(s"$collectionPath/${action.name}?overwrite=true", content) ~> Route.seal(routes(creds)) ~> check {
status should be(PayloadTooLarge)
status should be(ContentTooLarge)
responseAs[String] should include {
Messages.entityTooBig(SizeError(WhiskAction.execFieldName, exec.size, Exec.sizeLimit))
}
Expand All @@ -529,7 +529,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
} reduce (_ ++ _)
val content = s"""{"exec":{"kind":"nodejs:default","code":"??"},"parameters":$parameters}""".stripMargin
Put(s"$collectionPath/${aname()}", content.parseJson.asJsObject) ~> Route.seal(routes(creds)) ~> check {
status should be(PayloadTooLarge)
status should be(ContentTooLarge)
responseAs[String] should include {
Messages.entityTooBig(SizeError(WhiskEntity.paramsFieldName, parameters.size, Parameters.sizeLimit))
}
Expand Down Expand Up @@ -558,7 +558,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {

val content = s"""{"exec":{"kind":"nodejs:default","code":"??"},"parameters":$parameters}""".stripMargin
Put(s"$collectionPath/${aname()}", content.parseJson.asJsObject) ~> Route.seal(routes(credsWithLimits)) ~> check {
status should be(PayloadTooLarge)
status should be(ContentTooLarge)
responseAs[String] should include {
Messages.entityTooBig(SizeError(WhiskEntity.paramsFieldName, parameters.size, namespaceLimit))
}
Expand All @@ -574,7 +574,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
} reduce (_ ++ _)
val content = s"""{"exec":{"kind":"nodejs:default","code":"??"},"annotations":$annotations}""".stripMargin
Put(s"$collectionPath/${aname()}", content.parseJson.asJsObject) ~> Route.seal(routes(creds)) ~> check {
status should be(PayloadTooLarge)
status should be(ContentTooLarge)
responseAs[String] should include {
Messages.entityTooBig(SizeError(WhiskEntity.annotationsFieldName, annotations.size, Parameters.sizeLimit))
}
Expand Down Expand Up @@ -603,7 +603,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {

val content = s"""{"exec":{"kind":"nodejs:default","code":"??"},"annotations":$annotations}""".stripMargin
Put(s"$collectionPath/${aname()}", content.parseJson.asJsObject) ~> Route.seal(routes(credsWithLimits)) ~> check {
status should be(PayloadTooLarge)
status should be(ContentTooLarge)
responseAs[String] should include {
Messages.entityTooBig(SizeError(WhiskEntity.annotationsFieldName, annotations.size, namespaceLimit))
}
Expand Down Expand Up @@ -971,7 +971,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
val code = "a" * (systemPayloadLimit.toBytes.toInt + 1)
val content = s"""{"a":"$code"}""".stripMargin
Post(s"$collectionPath/${aname()}", content.parseJson.asJsObject) ~> Route.seal(routes(creds)) ~> check {
status should be(PayloadTooLarge)
status should be(ContentTooLarge)
responseAs[String] should include {
Messages.entityTooBig(SizeError(fieldDescriptionForSizeError, (content.length).B, systemPayloadLimit.toBytes.B))
}
Expand All @@ -983,7 +983,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi {
val code = "a" * (namespacePayloadLimit.toBytes.toInt + 1)
val content = s"""{"a":"$code"}""".stripMargin
Post(s"$collectionPath/${aname()}", content.parseJson.asJsObject) ~> Route.seal(routes(credsWithPayloadLimit)) ~> check {
status should be(PayloadTooLarge)
status should be(ContentTooLarge)
responseAs[String] should include {
Messages.entityTooBig(
SizeError(fieldDescriptionForSizeError, (content.length).B, namespacePayloadLimit.toBytes.B))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ class PackagesApiTests extends ControllerTestCommon with WhiskPackagesApi {
} reduce (_ ++ _)
val content = s"""{"annotations":$annotations}""".parseJson.asJsObject
Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(creds)) ~> check {
status should be(PayloadTooLarge)
status should be(ContentTooLarge)
responseAs[String] should include {
Messages.entityTooBig(SizeError(WhiskEntity.annotationsFieldName, annotations.size, Parameters.sizeLimit))
}
Expand All @@ -607,7 +607,7 @@ class PackagesApiTests extends ControllerTestCommon with WhiskPackagesApi {
} reduce (_ ++ _)
val content = s"""{"parameters":$parameters}""".parseJson.asJsObject
Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(creds)) ~> check {
status should be(PayloadTooLarge)
status should be(ContentTooLarge)
responseAs[String] should include {
Messages.entityTooBig(SizeError(WhiskEntity.paramsFieldName, parameters.size, Parameters.sizeLimit))
}
Expand All @@ -625,7 +625,7 @@ class PackagesApiTests extends ControllerTestCommon with WhiskPackagesApi {
val content = s"""{"parameters":$parameters}""".parseJson.asJsObject
put(entityStore, provider)
Put(s"$collectionPath/${aname()}?overwrite=true", content) ~> Route.seal(routes(creds)) ~> check {
status should be(PayloadTooLarge)
status should be(ContentTooLarge)
responseAs[String] should include {
Messages.entityTooBig(SizeError(WhiskEntity.paramsFieldName, parameters.size, Parameters.sizeLimit))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ class RulesApiTests extends ControllerTestCommon with WhiskRulesApi {
val t = get(entityStore, trigger.docid, WhiskTrigger)
deleteTrigger(t.docid)

status should be(PayloadTooLarge)
status should be(ContentTooLarge)
responseAs[String] should include {
Messages.entityTooBig(SizeError(WhiskEntity.annotationsFieldName, annotations.size, Parameters.sizeLimit))
}
Expand Down Expand Up @@ -556,7 +556,7 @@ class RulesApiTests extends ControllerTestCommon with WhiskRulesApi {
val t = get(entityStore, trigger.docid, WhiskTrigger)
deleteTrigger(t.docid)

status should be(PayloadTooLarge)
status should be(ContentTooLarge)
responseAs[String] should include {
Messages.entityTooBig(SizeError(WhiskEntity.annotationsFieldName, annotations.size, Parameters.sizeLimit))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ class TriggersApiTests extends ControllerTestCommon with WhiskTriggersApi {
val code = "a" * (systemPayloadLimit.toBytes.toInt + 1)
val content = s"""{"a":"$code"}""".stripMargin
Post(s"$collectionPath/${aname()}", content.parseJson.asJsObject) ~> Route.seal(routes(creds)) ~> check {
status should be(PayloadTooLarge)
status should be(ContentTooLarge)
responseAs[String] should include {
Messages.entityTooBig(SizeError(fieldDescriptionForSizeError, (content.length).B, systemPayloadLimit.toBytes.B))
}
Expand All @@ -289,7 +289,7 @@ class TriggersApiTests extends ControllerTestCommon with WhiskTriggersApi {
} reduce (_ ++ _)
val content = s"""{"parameters":$parameters}""".parseJson.asJsObject
Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(creds)) ~> check {
status should be(PayloadTooLarge)
status should be(ContentTooLarge)
responseAs[String] should include {
Messages.entityTooBig(SizeError(WhiskEntity.paramsFieldName, parameters.size, Parameters.sizeLimit))
}
Expand All @@ -305,7 +305,7 @@ class TriggersApiTests extends ControllerTestCommon with WhiskTriggersApi {
} reduce (_ ++ _)
val content = s"""{"annotations":$annotations}""".parseJson.asJsObject
Put(s"$collectionPath/${aname()}", content) ~> Route.seal(routes(creds)) ~> check {
status should be(PayloadTooLarge)
status should be(ContentTooLarge)
responseAs[String] should include {
Messages.entityTooBig(SizeError(WhiskEntity.annotationsFieldName, annotations.size, Parameters.sizeLimit))
}
Expand All @@ -323,7 +323,7 @@ class TriggersApiTests extends ControllerTestCommon with WhiskTriggersApi {
val content = s"""{"parameters":$parameters}""".parseJson.asJsObject
put(entityStore, trigger)
Put(s"$collectionPath/${trigger.name}?overwrite=true", content) ~> Route.seal(routes(creds)) ~> check {
status should be(PayloadTooLarge)
status should be(ContentTooLarge)
responseAs[String] should include {
Messages.entityTooBig(SizeError(WhiskEntity.paramsFieldName, parameters.size, Parameters.sizeLimit))
}
Expand Down
Loading
Loading