From c3308149be4100aa15c0fefe83211a85153eb322 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Mon, 25 Jul 2016 01:50:55 +0200 Subject: [PATCH] +htp #18837 allow as[Source[Tweet, NotUsed]] --- .../JsonStreamingExamplesSpec.scala | 2 ++ .../http/scaladsl/server/TestServer.scala | 3 +-- .../FramedEntityStreamingDirectives.scala | 24 ++++++++++++------- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala index bb30f99612..f019b6fef5 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala @@ -111,6 +111,8 @@ class JsonStreamingExamplesSpec extends RoutingSpec { path("metrics") { // [4] extract Source[Measurement, _] entity(asSourceOf[Measurement]) { measurements => + // alternative syntax: + // entity(as[Source[Measurement, NotUsed]]) { measurements => val measurementsSubmitted: Future[Int] = measurements .via(persistMetrics) diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala index b4901750d5..7711dc1b88 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala @@ -89,8 +89,7 @@ object TestServer extends App { complete(ToResponseMarshallable(tweets)) } ~ post { - entity(asSourceOf[Tweet]) { tweets ⇒ - // entity(asSourceOf[Tweet](bracketCountingJsonFraming(1024))) { tweets: Source[Tweet, NotUsed] ⇒ + entity(as[Source[Tweet, NotUsed]]) { tweets ⇒ complete(s"Total tweets received: " + tweets.runFold(0)({ case (acc, t) => acc + 1 })) } } diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala index 1e97170a11..61c39c6843 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala @@ -146,22 +146,30 @@ trait FramedEntityStreamingDirectives extends MarshallingDirectives { private final def asSourceOfInternal[T](framing: FramingWithContentType, marshalling: (ExecutionContext, Materializer) => Flow[ByteString, ByteString, NotUsed]#ReprMat[T, NotUsed]): RequestToSourceUnmarshaller[T] = Unmarshaller.withMaterializer[HttpRequest, Source[T, NotUsed]] { implicit ec ⇒ implicit mat ⇒ req ⇒ val entity = req.entity - if (!framing.matches(entity.contentType)) { - val supportedContentTypes = framing.supported - FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(supportedContentTypes)) - } else { + if (framing.matches(entity.contentType)) { val bytes = entity.dataBytes val frames = bytes.viaMat(framing.flow)(Keep.right) val elements = frames.viaMat(marshalling(ec, mat))(Keep.right) - val stream = elements.mapMaterializedValue(_ => NotUsed) -// val stream = Source.single(entity.transformDataBytes(framing.flow)).via(marshalling(ec, mat)).mapMaterializedValue(_ => NotUsed) - FastFuture.successful(stream) - } + FastFuture.successful(elements) + + } else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(framing.supported)) } // format: ON // TODO note to self - we need the same of ease of streaming stuff for the client side - i.e. the twitter firehose case. + implicit def _asSourceUnmarshaller[T](implicit fem: FromEntityUnmarshaller[T], framing: FramingWithContentType): FromRequestUnmarshaller[Source[T, NotUsed]] = { + Unmarshaller.withMaterializer[HttpRequest, Source[T, NotUsed]] { implicit ec ⇒ implicit mat ⇒ req ⇒ + val entity = req.entity + if (framing.matches(entity.contentType)) { + val bytes = entity.dataBytes + val frames = bytes.viaMat(framing.flow)(Keep.right) + val elements = frames.viaMat(Flow[ByteString].map(HttpEntity(entity.contentType, _)).mapAsync(1)(Unmarshal(_).to[T](fem, ec, mat)))(Keep.right) + FastFuture.successful(elements) + } else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(framing.supported)) + } + } + implicit def _sourceMarshaller[T, M](implicit m: ToEntityMarshaller[T], mode: SourceRenderingMode): ToResponseMarshaller[Source[T, M]] = Marshaller[Source[T, M], HttpResponse] { implicit ec ⇒ source ⇒ FastFuture successful {