Skip to content

Commit 265995f

Browse files
committed
Implement multipart for netty-future, netty-cats & netty-zio
1 parent 31d1eac commit 265995f

27 files changed

Lines changed: 420 additions & 285 deletions

File tree

build.sbt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ val commonSettings = commonSmlBuildSettings ++ ossPublishSettings ++ Seq(
9292
evictionErrorLevel := Level.Info
9393
)
9494

95+
val testSettings = Seq(
96+
scalacOptions += "-Wconf:msg=unused value of type org.scalatest.Assertion:s",
97+
scalacOptions += "-Wconf:msg=unused value of type org.scalatest.compatible.Assertion:s"
98+
)
99+
95100
val versioningSchemeSettings = Seq(versionScheme := Some("early-semver"))
96101

97102
val enableMimaSettings = Seq(
@@ -539,6 +544,7 @@ lazy val testing: ProjectMatrix = (projectMatrix in file("testing"))
539544

540545
lazy val tests: ProjectMatrix = (projectMatrix in file("tests"))
541546
.settings(commonSettings)
547+
.settings(testSettings)
542548
.settings(
543549
name := "tapir-tests",
544550
libraryDependencies ++= Seq(
@@ -1375,6 +1381,7 @@ lazy val serverCore: ProjectMatrix = (projectMatrix in file("server/core"))
13751381

13761382
lazy val serverTests: ProjectMatrix = (projectMatrix in file("server/tests"))
13771383
.settings(commonSettings)
1384+
.settings(testSettings)
13781385
.settings(
13791386
name := "tapir-server-tests",
13801387
libraryDependencies ++= Seq(
@@ -1646,6 +1653,7 @@ lazy val nettyServer: ProjectMatrix = (projectMatrix in file("server/netty-serve
16461653
"io.netty" % "netty-all" % Versions.nettyAll,
16471654
"org.playframework.netty" % "netty-reactive-streams-http" % Versions.nettyReactiveStreams,
16481655
"org.apache.httpcomponents" % "httpmime" % "4.5.14",
1656+
"org.scala-lang.modules" %% "scala-java8-compat" % Versions.scalaJava8Compat,
16491657
slf4j
16501658
)
16511659
)
@@ -2059,6 +2067,7 @@ lazy val awsExamples2_13 = awsExamples.jvm(scala2_13).dependsOn(awsSam.jvm(scala
20592067

20602068
lazy val clientTests: ProjectMatrix = (projectMatrix in file("client/tests"))
20612069
.settings(commonSettings)
2070+
.settings(testSettings)
20622071
.settings(
20632072
name := "tapir-client-tests"
20642073
)
@@ -2326,6 +2335,7 @@ lazy val openapiCodegenCli: ProjectMatrix = (projectMatrix in file("openapi-code
23262335

23272336
lazy val examples: ProjectMatrix = (projectMatrix in file("examples"))
23282337
.settings(commonSettings)
2338+
.settings(testSettings)
23292339
.settings(
23302340
name := "tapir-examples",
23312341
libraryDependencies ++= Seq(

perf-tests/perf-tests-e2e/src/main/scala/sttp/tapir/perf/netty/cats/NettyCats.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@ import sttp.tapir.server.netty.cats.NettyCatsServer
1212
import sttp.tapir.server.netty.cats.NettyCatsServerOptions
1313
import sttp.capabilities.fs2.Fs2Streams
1414

15-
import scala.concurrent.duration._
16-
1715
object Tapir extends Endpoints {
1816
val wsResponseStream = Stream.fixedRate[IO](WebSocketSingleResponseLag, dampen = false)
1917
val wsEndpoint = wsBaseEndpoint
Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,36 @@
11
package sttp.tapir.server.netty.cats.internal
22

33
import cats.effect.Async
4-
import cats.syntax.all._
54
import fs2.Chunk
65
import fs2.io.file.{Files, Path}
76
import io.netty.handler.codec.http.HttpContent
8-
import org.playframework.netty.http.StreamedHttpRequest
97
import org.reactivestreams.Publisher
108
import sttp.capabilities.fs2.Fs2Streams
119
import sttp.monad.MonadError
1210
import sttp.tapir.integ.cats.effect.CatsMonadError
1311
import sttp.tapir.model.ServerRequest
14-
import sttp.tapir.server.interpreter.RawValue
1512
import sttp.tapir.server.netty.internal.{NettyStreamingRequestBody, StreamCompatible}
16-
import sttp.tapir.{RawBodyType, RawPart, TapirFile}
13+
import sttp.tapir.{RawPart, TapirFile}
1714

1815
private[cats] class NettyCatsRequestBody[F[_]: Async](
1916
val createFile: ServerRequest => F[TapirFile],
20-
val streamCompatible: StreamCompatible[Fs2Streams[F]]
21-
) extends NettyStreamingRequestBody[F, Fs2Streams[F]] {
17+
val streamCompatible: StreamCompatible[Fs2Streams[F]],
18+
multipartTempDirectory: Option[TapirFile] = None,
19+
multipartMinSizeForDisk: Option[Long] = None
20+
) extends NettyStreamingRequestBody[F, Fs2Streams[F]](
21+
multipartTempDirectory,
22+
multipartMinSizeForDisk
23+
) {
2224

2325
override implicit val monad: MonadError[F] = new CatsMonadError()
2426

27+
import cats.implicits._
28+
29+
override protected val listMonadToMonadOfList: List[F[RawPart]] => F[List[RawPart]] = _.sequence
30+
2531
override def publisherToBytes(publisher: Publisher[HttpContent], contentLength: Option[Long], maxBytes: Option[Long]): F[Array[Byte]] =
2632
streamCompatible.fromPublisher(publisher, maxBytes).compile.to(Chunk).map(_.toArray[Byte])
2733

28-
def publisherToMultipart(
29-
nettyRequest: StreamedHttpRequest,
30-
serverRequest: ServerRequest,
31-
m: RawBodyType.MultipartBody,
32-
maxBytes: Option[Long]
33-
): F[RawValue[Seq[RawPart]]] = monad.error(new UnsupportedOperationException("Multipart requests are not supported"))
34-
3534
override def writeToFile(serverRequest: ServerRequest, file: TapirFile, maxBytes: Option[Long]): F[Unit] =
3635
(toStream(serverRequest, maxBytes)
3736
.asInstanceOf[streamCompatible.streams.BinaryStream])
@@ -42,5 +41,5 @@ private[cats] class NettyCatsRequestBody[F[_]: Async](
4241
.drain
4342

4443
override def writeBytesToFile(bytes: Array[Byte], file: TapirFile): F[Unit] =
45-
monad.error(new UnsupportedOperationException("Multipart requests are not supported"))
44+
Async[F].fromFuture(Async[F].delay(writeBytesToFileFuture(bytes, file)))
4645
}

server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,15 @@ class NettyCatsServerTest extends TestSuite with EitherValues {
2424

2525
val interpreter = new NettyCatsTestServerInterpreter(eventLoopGroup, dispatcher)
2626
val createServerTest = new DefaultCreateServerTest(backend, interpreter)
27-
val ioSleeper: Sleeper[IO] = new Sleeper[IO] {
28-
override def sleep(duration: FiniteDuration): IO[Unit] = IO.sleep(duration)
29-
}
27+
val ioSleeper: Sleeper[IO] = (duration: FiniteDuration) => IO.sleep(duration)
3028
def drainFs2(stream: Fs2Streams[IO]#BinaryStream): IO[Unit] =
3129
stream.compile.drain.void
3230

3331
val tests = new AllServerTests(
3432
createServerTest,
3533
interpreter,
3634
backend,
37-
multipart = false
35+
partOtherHeaderSupport = false
3836
)
3937
.tests() ++
4038
new ServerStreamingTests(createServerTest).tests(Fs2Streams[IO])(drainFs2) ++

server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsTestServerInterpreter.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,17 @@ package sttp.tapir.server.netty.cats
22

33
import cats.effect.std.Dispatcher
44
import cats.effect.{IO, Resource}
5-
import io.netty.channel.nio.NioEventLoopGroup
5+
import io.netty.channel.EventLoopGroup
66
import sttp.capabilities.WebSockets
77
import sttp.tapir.server.ServerEndpoint
88
import sttp.tapir.server.netty.{NettyConfig, Route}
99
import sttp.tapir.server.tests.TestServerInterpreter
1010
import sttp.tapir.tests._
1111
import sttp.capabilities.fs2.Fs2Streams
12+
1213
import scala.concurrent.duration.FiniteDuration
1314

14-
class NettyCatsTestServerInterpreter(eventLoopGroup: NioEventLoopGroup, dispatcher: Dispatcher[IO])
15+
class NettyCatsTestServerInterpreter(eventLoopGroup: EventLoopGroup, dispatcher: Dispatcher[IO])
1516
extends TestServerInterpreter[IO, Fs2Streams[IO] with WebSockets, NettyCatsServerOptions[IO], Route[IO]] {
1617
override def route(es: List[ServerEndpoint[Fs2Streams[IO] with WebSockets, IO]], interceptors: Interceptors): Route[IO] = {
1718
val serverOptions: NettyCatsServerOptions[IO] = interceptors(

server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyFutureServerInterpreter.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ trait NettyFutureServerInterpreter {
2020
NettyServerInterpreter.toRoute(
2121
ses,
2222
nettyServerOptions.interceptors,
23-
new NettyFutureRequestBody(nettyServerOptions.createFile),
23+
new NettyFutureRequestBody(nettyServerOptions.createFile, None, None),
2424
new NettyToResponseBody[Future](RunAsync.Future),
2525
nettyServerOptions.deleteFile,
2626
RunAsync.Future

server/netty-server/src/main/scala/sttp/tapir/server/netty/NettyServerRequest.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import scala.collection.JavaConverters._
44
import scala.collection.immutable.Seq
55
import io.netty.channel.ChannelHandlerContext
66
import io.netty.handler.codec.http.{HttpRequest, QueryStringDecoder}
7-
import io.netty.handler.ssl.SslHandler
87
import sttp.model.{Header, Method, QueryParams, Uri}
98
import sttp.tapir.{AttributeKey, AttributeMap}
109
import sttp.tapir.model.{ConnectionInfo, ServerRequest}

server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyBootstrap.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package sttp.tapir.server.netty.internal
22

33
import io.netty.bootstrap.ServerBootstrap
4-
import io.netty.channel.{Channel, ChannelFuture, ChannelHandler, ChannelInitializer, ChannelOption, EventLoopGroup}
4+
import io.netty.channel.{Channel, ChannelFuture, ChannelInitializer, ChannelOption, EventLoopGroup}
55
import sttp.tapir.server.netty.NettyConfig
66

77
import java.net.{InetSocketAddress, SocketAddress}

server/netty-server/src/main/scala/sttp/tapir/server/netty/internal/NettyFutureRequestBody.scala

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,40 +7,38 @@ import sttp.capabilities
77
import sttp.monad.{FutureMonad, MonadError}
88
import sttp.tapir.capabilities.NoStreams
99
import sttp.tapir.model.ServerRequest
10-
import sttp.tapir.server.interpreter.RawValue
1110
import sttp.tapir.server.netty.internal.reactivestreams._
12-
import sttp.tapir.{RawBodyType, RawPart, TapirFile}
11+
import sttp.tapir.{RawPart, TapirFile}
1312

1413
import scala.concurrent.{ExecutionContext, Future}
1514

16-
private[netty] class NettyFutureRequestBody(val createFile: ServerRequest => Future[TapirFile])(implicit ec: ExecutionContext)
17-
extends NettyRequestBody[Future, NoStreams] {
15+
private[netty] class NettyFutureRequestBody(
16+
val createFile: ServerRequest => Future[TapirFile],
17+
multipartTempDirectory: Option[TapirFile],
18+
multipartMinSizeForDisk: Option[Long]
19+
)(implicit ec: ExecutionContext)
20+
extends NettyRequestBodyWithMultipartF[Future, NoStreams](multipartTempDirectory, multipartMinSizeForDisk) {
1821

1922
override val streams: capabilities.Streams[NoStreams] = NoStreams
2023
override implicit val monad: MonadError[Future] = new FutureMonad()
2124

25+
protected val listMonadToMonadOfList: List[Future[RawPart]] => Future[List[RawPart]] = l => Future.sequence(l)
26+
2227
override def publisherToBytes(
2328
publisher: Publisher[HttpContent],
2429
contentLength: Option[Long],
2530
maxBytes: Option[Long]
2631
): Future[Array[Byte]] =
2732
SimpleSubscriber.processAll(publisher, contentLength, maxBytes)
2833

29-
override def publisherToMultipart(
30-
nettyRequest: StreamedHttpRequest,
31-
serverRequest: ServerRequest,
32-
m: RawBodyType.MultipartBody,
33-
maxBytes: Option[Long]
34-
): Future[RawValue[Seq[RawPart]]] = Future.failed(new UnsupportedOperationException("Multipart requests are not supported"))
35-
3634
override def writeToFile(serverRequest: ServerRequest, file: TapirFile, maxBytes: Option[Long]): Future[Unit] =
3735
serverRequest.underlying match {
3836
case r: StreamedHttpRequest => FileWriterSubscriber.processAll(r, file.toPath, maxBytes)
3937
case _ => monad.unit(()) // Empty request
4038
}
4139

4240
override def writeBytesToFile(bytes: Array[Byte], file: TapirFile): Future[Unit] =
43-
Future.failed(new UnsupportedOperationException("Multipart requests are not supported"))
41+
writeBytesToFileFuture(bytes, file)
4442

4543
override def toStream(serverRequest: ServerRequest, maxBytes: Option[Long]): streams.BinaryStream =
4644
throw new UnsupportedOperationException()
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package sttp.tapir.server.netty.internal
2+
3+
import io.netty.handler.codec.http.multipart.{DefaultHttpDataFactory, HttpDataFactory}
4+
import sttp.capabilities.Streams
5+
import sttp.tapir.TapirFile
6+
7+
private[netty] abstract class NettyRequestBodyWithMultipart[F[_], S <: Streams[S]](
8+
multipartTempDirectory: Option[TapirFile],
9+
multipartMinSizeForDisk: Option[Long]
10+
) extends NettyRequestBody[F, S] {
11+
protected val httpDataFactory: HttpDataFactory = {
12+
val factory = multipartMinSizeForDisk match {
13+
case Some(minSize) => new DefaultHttpDataFactory(minSize)
14+
case None => new DefaultHttpDataFactory()
15+
}
16+
multipartTempDirectory.foreach(dir => factory.setBaseDir(dir.getPath))
17+
factory
18+
}
19+
20+
}

0 commit comments

Comments
 (0)