diff --git a/akka-docs/rst/java/code/docs/http/javadsl/server/JsonStreamingExamplesTest.java b/akka-docs/rst/java/code/docs/http/javadsl/server/JsonStreamingExamplesTest.java new file mode 100644 index 0000000000..acc6bbfac2 --- /dev/null +++ b/akka-docs/rst/java/code/docs/http/javadsl/server/JsonStreamingExamplesTest.java @@ -0,0 +1,101 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package docs.http.javadsl.server; + +import akka.NotUsed; +import akka.http.javadsl.common.FramingWithContentType; +import akka.http.javadsl.common.JsonSourceRenderingModes; +import akka.http.javadsl.marshallers.jackson.Jackson; +import akka.http.javadsl.model.*; +import akka.http.javadsl.model.headers.Accept; +import akka.http.javadsl.server.*; +import akka.http.javadsl.testkit.JUnitRouteTest; +import akka.http.javadsl.testkit.TestRoute; +import akka.stream.javadsl.Source; +import akka.util.ByteString; +import docs.http.javadsl.server.testkit.MyAppService; +import org.junit.Test; + +import java.util.concurrent.CompletionStage; + +public class JsonStreamingExamplesTest extends JUnitRouteTest { + + //#routes + final Route tweets() { + //#formats + final Unmarshaller JavaTweets = Jackson.byteStringUnmarshaller(JavaTweet.class); + //#formats + + //#response-streaming + final Route responseStreaming = path("tweets", () -> + get(() -> + parameter(StringUnmarshallers.INTEGER, "n", n -> { + final Source tws = + Source.repeat(new JavaTweet("Hello World!")).take(n); + return completeOKWithSource(tws, Jackson.marshaller(), JsonSourceRenderingModes.arrayCompact()); + }) + ) + ); + //#response-streaming + + //#incoming-request-streaming + final Route incomingStreaming = path("tweets", () -> + post(() -> + extractMaterializer(mat -> { + final FramingWithContentType jsonFraming = EntityStreamingSupport.bracketCountingJsonFraming(128); + + return entityasSourceOf(JavaTweets, jsonFraming, sourceOfTweets -> { + final CompletionStage tweetsCount = sourceOfTweets.runFold(0, (acc, tweet) -> acc + 1, mat); + return onComplete(tweetsCount, c -> complete("Total number of tweets: " + c)); + }); + } + ) + ) + ); + //#incoming-request-streaming + + return responseStreaming.orElse(incomingStreaming); + } + //#routes + + @Test + public void getTweetsTest() { + //#response-streaming + // tests: + final TestRoute routes = testRoute(tweets()); + + // test happy path + final Accept acceptApplication = Accept.create(MediaRanges.create(MediaTypes.APPLICATION_JSON)); + routes.run(HttpRequest.GET("/tweets?n=2").addHeader(acceptApplication)) + .assertStatusCode(200) + .assertEntity("[{\"message\":\"Hello World!\"},{\"message\":\"Hello World!\"}]"); + + // test responses to potential errors + final Accept acceptText = Accept.create(MediaRanges.ALL_TEXT); + routes.run(HttpRequest.GET("/tweets?n=3").addHeader(acceptText)) + .assertStatusCode(StatusCodes.NOT_ACCEPTABLE) // 406 + .assertEntity("Resource representation is only available with these types:\napplication/json"); + //#response-streaming + } + + //#models + private static final class JavaTweet { + private String message; + + public JavaTweet(String message) { + this.message = message; + } + + public void setMessage(String message) { + this.message = message; + } + + public String getMessage() { + return message; + } + + } + //#models +} diff --git a/akka-docs/rst/java/code/docs/http/javadsl/server/directives/FramedEntityStreamingExamplesTest.java b/akka-docs/rst/java/code/docs/http/javadsl/server/directives/FramedEntityStreamingExamplesTest.java deleted file mode 100644 index aee10b1935..0000000000 --- a/akka-docs/rst/java/code/docs/http/javadsl/server/directives/FramedEntityStreamingExamplesTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (C) 2015-2016 Lightbend Inc. - */ - -package docs.http.javadsl.server.directives; - -import akka.http.javadsl.model.HttpRequest; -import akka.http.javadsl.model.HttpResponse; -import akka.http.javadsl.server.Route; -import akka.http.javadsl.server.directives.FramedEntityStreamingDirectives; -import akka.http.javadsl.server.directives.LogEntry; -import akka.http.javadsl.testkit.JUnitRouteTest; -import akka.http.scaladsl.server.Rejection; -import org.junit.Test; - -import java.util.List; -import java.util.Optional; -import java.util.function.BiFunction; -import java.util.function.Function; -import java.util.stream.Collectors; - -import static akka.event.Logging.InfoLevel; - -import static akka.http.javadsl.server.directives.FramedEntityStreamingDirectives.*; - -public class FramedEntityStreamingExamplesTest extends JUnitRouteTest { - - @Test - public void testRenderSource() { - FramedEntityStreamingDirectives. - } - -} diff --git a/akka-docs/rst/java/http/routing-dsl/index.rst b/akka-docs/rst/java/http/routing-dsl/index.rst index f84f20759f..85fff2bcb5 100644 --- a/akka-docs/rst/java/http/routing-dsl/index.rst +++ b/akka-docs/rst/java/http/routing-dsl/index.rst @@ -18,6 +18,7 @@ To use the high-level API you need to add a dependency to the ``akka-http-experi directives/index marshalling exception-handling + source-streaming-support rejections testkit @@ -51,7 +52,6 @@ in the :ref:`exception-handling-java` section of the documtnation. You can use t File uploads ^^^^^^^^^^^^ -TODO not possible in Java DSL since there For high level directives to handle uploads see the :ref:`FileUploadDirectives-java`. diff --git a/akka-docs/rst/java/http/routing-dsl/source-streaming-support.rst b/akka-docs/rst/java/http/routing-dsl/source-streaming-support.rst new file mode 100644 index 0000000000..b83de64dc3 --- /dev/null +++ b/akka-docs/rst/java/http/routing-dsl/source-streaming-support.rst @@ -0,0 +1,74 @@ +.. _json-streaming-java: + +Source Streaming +================ + +Akka HTTP supports completing a request with an Akka ``Source``, which makes it possible to very easily build +streaming end-to-end APIs which apply back-pressure throughout the entire stack. + +It is possible to complete requests with raw ``Source``, however often it is more convenient to +stream on an element-by-element basis, and allow Akka HTTP to handle the rendering internally - for example as a JSON array, +or CSV stream (where each element is separated by a new-line). + +In the following sections we investigate how to make use of the JSON Streaming infrastructure, +however the general hints apply to any kind of element-by-element streaming you could imagine. + +It is possible to implement your own framing for any content type you might need, including bianary formats +by implementing :class:`FramingWithContentType`. + +JSON Streaming +============== + +`JSON Streaming`_ is a term refering to streaming a (possibly infinite) stream of element as independent JSON +objects as a continuous HTTP request or response. The elements are most often separated using newlines, +however do not have to be. Concatenating elements side-by-side or emitting "very long" JSON array is also another +use case. + +In the below examples, we'll be refering to the ``User`` and ``Measurement`` case classes as our model, which are defined as: + +.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#models + +.. _Json Streaming: https://en.wikipedia.org/wiki/JSON_Streaming + +Responding with JSON Streams +---------------------------- + +In this example we implement an API representing an infinite stream of tweets, very much like Twitter's `Streaming API`_. + +Firstly, we'll need to get some additional marshalling infrastructure set up, that is able to marshal to and from an +Akka Streams ``Source``. One such trait, containing the needed marshallers is ``SprayJsonSupport``, which uses +spray-json (a high performance json parser library), and is shipped as part of Akka HTTP in the +``akka-http-spray-json-experimental`` module. + +The last bit of setup, before we can render a streaming json response + +.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#response-streaming + +.. _Streaming API: https://dev.twitter.com/streaming/overview + +Consuming JSON Streaming uploads +-------------------------------- + +Sometimes the client may be sending a streaming request, for example an embedded device initiated a connection with +the server and is feeding it with one line of measurement data. + +In this example, we want to consume this data in a streaming fashion from the request entity, and also apply +back-pressure to the underlying TCP connection, if the server can not cope with the rate of incoming data (back-pressure +will be applied automatically thanks to using Akka HTTP/Streams). + +.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#formats + +.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#incoming-request-streaming + +Implementing custom (Un)Marshaller support for JSON streaming +------------------------------------------------------------- + +The following types that may need to be implemented by a custom framed-streaming support library are: + +- ``SourceRenderingMode`` which can customise how to render the begining / between-elements and ending of such + stream (while writing a response, i.e. by calling ``complete(source)``). + Implementations for JSON are available in ``akka.http.scaladsl.common.JsonSourceRenderingMode``. +- ``FramingWithContentType`` which is needed to be able to split incoming ``ByteString`` + chunks into frames of the higher-level data type format that is understood by the provided unmarshallers. + In the case of JSON it means chunking up ByteStrings such that each emitted element corresponds to exactly one JSON object, + this framing is implemented in ``EntityStreamingSupport``. diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala index b354504829..bb30f99612 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala @@ -5,13 +5,13 @@ package docs.http.scaladsl.server.directives import akka.NotUsed +import akka.http.scaladsl.common.{ FramingWithContentType, JsonSourceRenderingModes } import akka.http.scaladsl.marshalling.ToResponseMarshallable import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.Accept -import akka.http.scaladsl.server.{ UnsupportedRequestContentTypeRejection, UnacceptedResponseContentTypeRejection, JsonSourceRenderingMode } +import akka.http.scaladsl.server.{ UnacceptedResponseContentTypeRejection, UnsupportedRequestContentTypeRejection } import akka.stream.scaladsl.{ Flow, Source } import docs.http.scaladsl.server.RoutingSpec -import spray.json.{ JsValue, JsObject, DefaultJsonProtocol } import scala.concurrent.Future @@ -30,7 +30,7 @@ class JsonStreamingExamplesSpec extends RoutingSpec { //#formats object MyJsonProtocol extends spray.json.DefaultJsonProtocol { - implicit val userFormat = jsonFormat2(Tweet.apply) + implicit val tweetFormat = jsonFormat2(Tweet.apply) implicit val measurementFormat = jsonFormat2(Measurement.apply) } //# @@ -43,19 +43,22 @@ class JsonStreamingExamplesSpec extends RoutingSpec { import MyJsonProtocol._ // [3] pick json rendering mode: - implicit val jsonRenderingMode = JsonSourceRenderingMode.LineByLine + // HINT: if you extend `akka.http.scaladsl.server.EntityStreamingSupport` + // it'll guide you to do so via abstract defs + val maximumObjectLength = 128 + implicit val jsonRenderingMode = JsonSourceRenderingModes.LineByLine val route = - path("users") { - val users: Source[Tweet, NotUsed] = getTweets() - complete(ToResponseMarshallable(users)) + path("tweets") { + val tweets: Source[Tweet, NotUsed] = getTweets() + complete(ToResponseMarshallable(tweets)) } // tests: val AcceptJson = Accept(MediaRange(MediaTypes.`application/json`)) val AcceptXml = Accept(MediaRange(MediaTypes.`text/xml`)) - Get("/users").withHeaders(AcceptJson) ~> route ~> check { + Get("/tweets").withHeaders(AcceptJson) ~> route ~> check { responseAs[String] shouldEqual """{"uid":1,"txt":"#Akka rocks!"}""" + "\n" + """{"uid":2,"txt":"Streaming is so hot right now!"}""" + "\n" + @@ -63,7 +66,7 @@ class JsonStreamingExamplesSpec extends RoutingSpec { } // endpoint can only marshal Json, so it will *reject* requests for application/xml: - Get("/users").withHeaders(AcceptXml) ~> route ~> check { + Get("/tweets").withHeaders(AcceptXml) ~> route ~> check { handled should ===(false) rejection should ===(UnacceptedResponseContentTypeRejection(Set(ContentTypes.`application/json`))) } @@ -72,19 +75,19 @@ class JsonStreamingExamplesSpec extends RoutingSpec { "response-streaming-modes" in { import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import MyJsonProtocol._ - implicit val jsonRenderingMode = JsonSourceRenderingMode.LineByLine + implicit val jsonRenderingMode = JsonSourceRenderingModes.LineByLine //#async-rendering - path("users") { - val users: Source[Tweet, NotUsed] = getTweets() - complete(users.renderAsync(parallelism = 8)) + path("tweets") { + val tweets: Source[Tweet, NotUsed] = getTweets() + complete(tweets.renderAsync(parallelism = 8)) } //# //#async-unordered-rendering - path("users" / "unordered") { - val users: Source[Tweet, NotUsed] = getTweets() - complete(users.renderAsyncUnordered(parallelism = 8)) + path("tweets" / "unordered") { + val tweets: Source[Tweet, NotUsed] = getTweets() + complete(tweets.renderAsyncUnordered(parallelism = 8)) } //# } @@ -94,7 +97,9 @@ class JsonStreamingExamplesSpec extends RoutingSpec { import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ // [1.1] import framing mode - implicit val jsonFramingMode = akka.http.scaladsl.server.JsonEntityFramingSupport.bracketCountingJsonFraming(Int.MaxValue) + import akka.http.scaladsl.server.EntityStreamingSupport + implicit val jsonFramingMode: FramingWithContentType = + EntityStreamingSupport.bracketCountingJsonFraming(Int.MaxValue) // [2] import "my protocol", for unmarshalling Measurement objects: import MyJsonProtocol._ @@ -106,14 +111,10 @@ class JsonStreamingExamplesSpec extends RoutingSpec { path("metrics") { // [4] extract Source[Measurement, _] entity(asSourceOf[Measurement]) { measurements => - println("measurements = " + measurements) val measurementsSubmitted: Future[Int] = measurements .via(persistMetrics) - .runFold(0) { (cnt, _) => - println("cnt = " + cnt) - cnt + 1 - } + .runFold(0) { (cnt, _) => cnt + 1 } complete { measurementsSubmitted.map(n => Map("msg" -> s"""Total metrics received: $n""")) diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/MarshallingDirectivesExamplesSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/MarshallingDirectivesExamplesSpec.scala index a0720452cd..8f7c855dc9 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/MarshallingDirectivesExamplesSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/MarshallingDirectivesExamplesSpec.scala @@ -71,8 +71,8 @@ class MarshallingDirectivesExamplesSpec extends RoutingSpec { // tests: Get("/") ~> route ~> check { mediaType shouldEqual `application/json` - responseAs[String] should include(""""name": "Jane"""") - responseAs[String] should include(""""favoriteNumber": 42""") + responseAs[String] should include(""""name":"Jane"""") + responseAs[String] should include(""""favoriteNumber":42""") } } @@ -95,8 +95,8 @@ class MarshallingDirectivesExamplesSpec extends RoutingSpec { Post("/", HttpEntity(`application/json`, """{ "name": "Jane", "favoriteNumber" : 42 }""")) ~> route ~> check { mediaType shouldEqual `application/json` - responseAs[String] should include(""""name": "Jane"""") - responseAs[String] should include(""""favoriteNumber": 42""") + responseAs[String] should include(""""name":"Jane"""") + responseAs[String] should include(""""favoriteNumber":42""") } } } diff --git a/akka-docs/rst/scala/http/routing-dsl/index.rst b/akka-docs/rst/scala/http/routing-dsl/index.rst index a4e1ee5121..10942ef517 100644 --- a/akka-docs/rst/scala/http/routing-dsl/index.rst +++ b/akka-docs/rst/scala/http/routing-dsl/index.rst @@ -23,6 +23,7 @@ static content serving. exception-handling path-matchers case-class-extraction + source-streaming-support testkit websocket-support diff --git a/akka-docs/rst/scala/http/routing-dsl/json-streaming-support.rst b/akka-docs/rst/scala/http/routing-dsl/source-streaming-support.rst similarity index 80% rename from akka-docs/rst/scala/http/routing-dsl/json-streaming-support.rst rename to akka-docs/rst/scala/http/routing-dsl/source-streaming-support.rst index 34dda6235b..d6c7cbdc46 100644 --- a/akka-docs/rst/scala/http/routing-dsl/json-streaming-support.rst +++ b/akka-docs/rst/scala/http/routing-dsl/source-streaming-support.rst @@ -1,11 +1,27 @@ .. _json-streaming-scala: +Source Streaming +================ + +Akka HTTP supports completing a request with an Akka ``Source[T, _]``, which makes it possible to very easily build +streaming end-to-end APIs which apply back-pressure throughout the entire stack. + +It is possible to complete requests with raw ``Source[ByteString, _]``, however often it is more convenient to +stream on an element-by-element basis, and allow Akka HTTP to handle the rendering internally - for example as a JSON array, +or CSV stream (where each element is separated by a new-line). + +In the following sections we investigate how to make use of the JSON Streaming infrastructure, +however the general hints apply to any kind of element-by-element streaming you could imagine. + +It is possible to implement your own framing for any content type you might need, including bianary formats +by implementing :class:`FramingWithContentType`. + JSON Streaming ============== `JSON Streaming`_ is a term refering to streaming a (possibly infinite) stream of element as independent JSON -objects onto one continious HTTP connection. The elements are most often separated using newlines, -however do not have to be and concatenating elements side-by-side or emitting "very long" JSON array is also another +objects as a continuous HTTP request or response. The elements are most often separated using newlines, +however do not have to be. Concatenating elements side-by-side or emitting "very long" JSON array is also another use case. In the below examples, we'll be refering to the ``User`` and ``Measurement`` case classes as our model, which are defined as: @@ -30,7 +46,6 @@ Firstly, we'll need to get some additional marshalling infrastructure set up, th Akka Streams ``Source[T,_]``. One such trait, containing the needed marshallers is ``SprayJsonSupport``, which uses spray-json (a high performance json parser library), and is shipped as part of Akka HTTP in the ``akka-http-spray-json-experimental`` module. -to and from ``Source[T,_]`` by using spray-json provided Next we import our model's marshallers, generated by spray-json. @@ -68,7 +83,7 @@ in case one element in front of the stream takes a long time to marshall, yet ot Consuming JSON Streaming uploads -------------------------------- -Sometimes the client may be sending in a streaming request, for example an embedded device initiated a connection with +Sometimes the client may be sending a streaming request, for example an embedded device initiated a connection with the server and is feeding it with one line of measurement data. In this example, we want to consume this data in a streaming fashion from the request entity, and also apply diff --git a/akka-http-marshallers-java/akka-http-jackson/src/main/java/akka/http/javadsl/marshallers/jackson/Jackson.java b/akka-http-marshallers-java/akka-http-jackson/src/main/java/akka/http/javadsl/marshallers/jackson/Jackson.java index eb9e8ec78d..92cd6b9bb6 100644 --- a/akka-http-marshallers-java/akka-http-jackson/src/main/java/akka/http/javadsl/marshallers/jackson/Jackson.java +++ b/akka-http-marshallers-java/akka-http-jackson/src/main/java/akka/http/javadsl/marshallers/jackson/Jackson.java @@ -11,6 +11,7 @@ import akka.http.javadsl.model.RequestEntity; import akka.http.javadsl.marshalling.Marshaller; import akka.http.javadsl.unmarshalling.Unmarshaller; +import akka.util.ByteString; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.MapperFeature; import com.fasterxml.jackson.databind.ObjectMapper; @@ -31,6 +32,10 @@ public class Jackson { ); } + public static Unmarshaller byteStringUnmarshaller(Class expectedType) { + return byteStringUnmarshaller(defaultObjectMapper, expectedType); + } + public static Unmarshaller unmarshaller(Class expectedType) { return unmarshaller(defaultObjectMapper, expectedType); } @@ -39,6 +44,10 @@ public class Jackson { return Unmarshaller.forMediaType(MediaTypes.APPLICATION_JSON, Unmarshaller.entityToString()) .thenApply(s -> fromJSON(mapper, s, expectedType)); } + + public static Unmarshaller byteStringUnmarshaller(ObjectMapper mapper, Class expectedType) { + return Unmarshaller.sync(s -> fromJSON(mapper, s.utf8String(), expectedType)); + } private static String toJSON(ObjectMapper mapper, Object object) { try { diff --git a/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala b/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala index c62f51314f..ea99804502 100644 --- a/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala +++ b/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala @@ -34,11 +34,11 @@ trait SprayJsonSupport { JsonParser(input) } - implicit def sprayJsonMarshallerConverter[T](writer: RootJsonWriter[T])(implicit printer: JsonPrinter = PrettyPrinter): ToEntityMarshaller[T] = + implicit def sprayJsonMarshallerConverter[T](writer: RootJsonWriter[T])(implicit printer: JsonPrinter = CompactPrinter): ToEntityMarshaller[T] = sprayJsonMarshaller[T](writer, printer) implicit def sprayJsonMarshaller[T](implicit writer: RootJsonWriter[T], printer: JsonPrinter = CompactPrinter): ToEntityMarshaller[T] = sprayJsValueMarshaller compose writer.write - implicit def sprayJsValueMarshaller(implicit printer: JsonPrinter = PrettyPrinter): ToEntityMarshaller[JsValue] = + implicit def sprayJsValueMarshaller(implicit printer: JsonPrinter = CompactPrinter): ToEntityMarshaller[JsValue] = Marshaller.StringMarshaller.wrap(MediaTypes.`application/json`)(printer) implicit def sprayByteStringMarshaller[T](implicit writer: RootJsonFormat[T], printer: JsonPrinter = CompactPrinter): Marshaller[T, ByteString] = sprayJsValueMarshaller.map(s ⇒ ByteString(s.toString)) compose writer.write diff --git a/akka-http-tests/src/test/java/akka/http/javadsl/server/JavaTestServer.java b/akka-http-tests/src/test/java/akka/http/javadsl/server/JavaTestServer.java index 65ca1cec44..70900a9987 100644 --- a/akka-http-tests/src/test/java/akka/http/javadsl/server/JavaTestServer.java +++ b/akka-http-tests/src/test/java/akka/http/javadsl/server/JavaTestServer.java @@ -17,6 +17,7 @@ import akka.http.javadsl.common.JsonSourceRenderingModes; import akka.stream.ActorMaterializer; import akka.stream.javadsl.Flow; import akka.stream.javadsl.Source; +import akka.util.ByteString; import scala.concurrent.duration.Duration; import scala.runtime.BoxedUnit; @@ -64,7 +65,7 @@ public class JavaTestServer extends AllDirectives { // or import static Directiv path("java", () -> completeOKWithFutureString(CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Boom!"); })))) ); - final Unmarshaller JavaTweets = Jackson.unmarshaller(JavaTweet.class); + final Unmarshaller JavaTweets = Jackson.byteStringUnmarshaller(JavaTweet.class); final Route tweets = path("tweets", () -> get(() -> parameter(StringUnmarshallers.INTEGER, "n", n -> { diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/MarshallingDirectivesSpec.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/MarshallingDirectivesSpec.scala index 50be2b4c58..fc04936353 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/MarshallingDirectivesSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/MarshallingDirectivesSpec.scala @@ -184,7 +184,7 @@ class MarshallingDirectivesSpec extends RoutingSpec with Inside { "render JSON with UTF-8 encoding if no `Accept-Charset` request header is present" in { Get() ~> complete(foo) ~> check { - responseEntity shouldEqual HttpEntity(`application/json`, foo.toJson.prettyPrint) + responseEntity shouldEqual HttpEntity(`application/json`, foo.toJson.compactPrint) } } "reject JSON rendering if an `Accept-Charset` request header requests a non-UTF-8 encoding" in { diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/RouteDirectivesSpec.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/RouteDirectivesSpec.scala index 26e9f65b29..df9ee7c2b5 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/RouteDirectivesSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/directives/RouteDirectivesSpec.scala @@ -98,10 +98,7 @@ class RouteDirectivesSpec extends FreeSpec with GenericRoutingSpec { import akka.http.scaladsl.model.headers.Accept Get().withHeaders(Accept(MediaTypes.`application/json`)) ~> route ~> check { responseAs[String] shouldEqual - """{ - | "name": "Ida", - | "age": 83 - |}""".stripMarginWithNewline("\n") + """{"name":"Ida","age":83}""" } Get().withHeaders(Accept(MediaTypes.`text/xml`)) ~> route ~> check { responseAs[xml.NodeSeq] shouldEqual Ida83 diff --git a/akka-http/src/main/scala/akka/http/javadsl/common/FramingWithContentType.scala b/akka-http/src/main/scala/akka/http/javadsl/common/FramingWithContentType.scala index 127c9bf890..483b3d624f 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/common/FramingWithContentType.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/common/FramingWithContentType.scala @@ -4,13 +4,23 @@ package akka.http.javadsl.common +import akka.NotUsed +import akka.event.Logging import akka.http.javadsl.model.ContentTypeRange -import akka.stream.javadsl.Framing +import akka.stream.javadsl.{ Flow, Framing } +import akka.util.ByteString -trait FramingWithContentType extends Framing { self ⇒ +/** + * Wraps a framing [[akka.stream.javadsl.Flow]] (as provided by [[Framing]] for example) + * that chunks up incoming [[akka.util.ByteString]] according to some [[akka.http.javadsl.model.ContentType]] + * specific logic. + */ +abstract class FramingWithContentType { self ⇒ import akka.http.impl.util.JavaMapping.Implicits._ - override def asScala: akka.http.scaladsl.common.FramingWithContentType = + def getFlow: Flow[ByteString, ByteString, NotUsed] + + def asScala: akka.http.scaladsl.common.FramingWithContentType = this match { case f: akka.http.scaladsl.common.FramingWithContentType ⇒ f case _ ⇒ new akka.http.scaladsl.common.FramingWithContentType { @@ -21,4 +31,6 @@ trait FramingWithContentType extends Framing { self ⇒ def supported: ContentTypeRange def matches(ct: akka.http.javadsl.model.ContentType): Boolean = supported.matches(ct) + + override def toString = s"${Logging.simpleName(getClass)}($supported)" } diff --git a/akka-http/src/main/scala/akka/http/javadsl/common/JsonSourceRenderingMode.scala b/akka-http/src/main/scala/akka/http/javadsl/common/JsonSourceRenderingMode.scala index b5fe32fdf9..7d4ca7b727 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/common/JsonSourceRenderingMode.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/common/JsonSourceRenderingMode.scala @@ -10,6 +10,8 @@ import akka.http.javadsl.model.{ ContentType, ContentTypes } * Specialised rendering mode for streaming elements as JSON. * * See also: JSON Streaming on Wikipedia. + * + * See [[JsonSourceRenderingModes]] for commonly used pre-defined rendering modes. */ trait JsonSourceRenderingMode extends SourceRenderingMode { override val contentType: ContentType.WithFixedCharset = diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/EntityStreamingSupport.scala b/akka-http/src/main/scala/akka/http/javadsl/server/EntityStreamingSupport.scala new file mode 100644 index 0000000000..571867ac63 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/javadsl/server/EntityStreamingSupport.scala @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.http.javadsl.server + +import akka.NotUsed +import akka.http.javadsl.common.{ FramingWithContentType, SourceRenderingMode } +import akka.http.javadsl.model.{ ContentTypeRange, MediaRanges } +import akka.http.scaladsl.server.ApplicationJsonBracketCountingFraming +import akka.stream.javadsl.{ Flow, Framing } +import akka.util.ByteString + +/** + * Entity streaming support, independent of used Json parsing library etc. + * + * Can be extended by various Support traits (e.g. "SprayJsonSupport"), + * in order to provide users with both `framing` (this trait) and `marshalling` + * (implemented by a library) by using a single trait. + */ +object EntityStreamingSupport { + // in the ScalaDSL version we make users implement abstract methods that are supposed to be + // implicit vals. This helps to guide in implementing the needed values, however in Java that would not really help. + + /** `application/json` specific Framing implementation */ + def bracketCountingJsonFraming(maximumObjectLength: Int): FramingWithContentType = + new ApplicationJsonBracketCountingFraming(maximumObjectLength) + + /** + * Frames incoming `text / *` entities on a line-by-line basis. + * Useful for accepting `text/csv` uploads as a stream of rows. + */ + def newLineFraming(maximumObjectLength: Int, supportedContentTypes: ContentTypeRange): FramingWithContentType = + new FramingWithContentType { + override final val getFlow: Flow[ByteString, ByteString, NotUsed] = + Flow.of(classOf[ByteString]).via(Framing.delimiter(ByteString("\n"), maximumObjectLength)) + + override final val supported: ContentTypeRange = + akka.http.scaladsl.model.ContentTypeRange(akka.http.scaladsl.model.MediaRanges.`text/*`) + } +} diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/directives/FramedEntityStreamingDirectives.scala b/akka-http/src/main/scala/akka/http/javadsl/server/directives/FramedEntityStreamingDirectives.scala index e39288a8b2..153ee0601e 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/directives/FramedEntityStreamingDirectives.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/directives/FramedEntityStreamingDirectives.scala @@ -3,35 +3,25 @@ */ package akka.http.javadsl.server.directives -import akka.http.javadsl.model.{ ContentType, HttpEntity } -import akka.util.ByteString -import java.util.{ List ⇒ JList, Map ⇒ JMap } -import java.util.AbstractMap.SimpleImmutableEntry -import java.util.Optional import java.util.function.{ Function ⇒ JFunction } +import java.util.{ List ⇒ JList, Map ⇒ JMap } import akka.NotUsed - -import scala.collection.JavaConverters._ -import akka.http.impl.util.JavaMapping.Implicits._ import akka.http.javadsl.common.{ FramingWithContentType, SourceRenderingMode } +import akka.http.javadsl.model.{ HttpEntity, _ } import akka.http.javadsl.server.{ Marshaller, Route, Unmarshaller } -import akka.http.javadsl.model._ -import akka.http.scaladsl.marshalling.{ ToResponseMarshallable, ToResponseMarshaller } +import akka.http.scaladsl.marshalling.ToResponseMarshallable import akka.http.scaladsl.server.{ Directives ⇒ D } -import akka.http.scaladsl.unmarshalling import akka.stream.javadsl.Source +import akka.util.ByteString /** EXPERIMENTAL API */ abstract class FramedEntityStreamingDirectives extends TimeoutDirectives { - // important import, as we implicitly resolve some marshallers inside the below directives - import akka.http.scaladsl.server.directives.FramedEntityStreamingDirectives._ @CorrespondsTo("asSourceOf") - def entityasSourceOf[T](um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType, + def entityasSourceOf[T](um: Unmarshaller[ByteString, T], framing: FramingWithContentType, inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter { - val sum = um.asScalaCastInput[akka.http.scaladsl.model.HttpEntity] - D.entity(D.asSourceOf[T](framing.asScala)(sum)) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒ + D.entity(D.asSourceOf[T](framing.asScala)(um.asScala)) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒ inner(s.asJava).delegate } } @@ -39,10 +29,9 @@ abstract class FramedEntityStreamingDirectives extends TimeoutDirectives { @CorrespondsTo("asSourceOfAsync") def entityAsSourceAsyncOf[T]( parallelism: Int, - um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType, + um: Unmarshaller[ByteString, T], framing: FramingWithContentType, inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter { - val sum = um.asScalaCastInput[akka.http.scaladsl.model.HttpEntity] - D.entity(D.asSourceOfAsync[T](parallelism, framing.asScala)(sum)) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒ + D.entity(D.asSourceOfAsync[T](parallelism, framing.asScala)(um.asScala)) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒ inner(s.asJava).delegate } } @@ -50,18 +39,16 @@ abstract class FramedEntityStreamingDirectives extends TimeoutDirectives { @CorrespondsTo("asSourceOfAsyncUnordered") def entityAsSourceAsyncUnorderedOf[T]( parallelism: Int, - um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType, + um: Unmarshaller[ByteString, T], framing: FramingWithContentType, inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter { - val sum = um.asScalaCastInput[akka.http.scaladsl.model.HttpEntity] - D.entity(D.asSourceOfAsyncUnordered[T](parallelism, framing.asScala)(sum)) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒ + D.entity(D.asSourceOfAsyncUnordered[T](parallelism, framing.asScala)(um.asScala)) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒ inner(s.asJava).delegate } } - // implicit used internally, Java caller does not benefit or use it + // implicits used internally, Java caller does not benefit or use it @CorrespondsTo("complete") def completeWithSource[T, M](implicit source: Source[T, M], m: Marshaller[T, ByteString], rendering: SourceRenderingMode): Route = RouteAdapter { - import akka.http.scaladsl.marshalling.PredefinedToResponseMarshallers._ implicit val mm = _sourceMarshaller(m.map(ByteStringAsEntityFn), rendering) val response = ToResponseMarshallable(source) D.complete(response) @@ -75,8 +62,8 @@ abstract class FramedEntityStreamingDirectives extends TimeoutDirectives { } implicit private def _sourceMarshaller[T, M](implicit m: Marshaller[T, HttpEntity], rendering: SourceRenderingMode) = { - import akka.http.javadsl.server.RoutingJavaMapping._ import akka.http.javadsl.server.RoutingJavaMapping.Implicits._ + import akka.http.javadsl.server.RoutingJavaMapping._ val mm = m.asScalaCastOutput D._sourceMarshaller[T, M](mm, rendering.asScala).compose({ h: akka.stream.javadsl.Source[T, M] ⇒ h.asScala }) } diff --git a/akka-http/src/main/scala/akka/http/scaladsl/common/FramingWithContentType.scala b/akka-http/src/main/scala/akka/http/scaladsl/common/FramingWithContentType.scala index 75743b674d..13dc72ecba 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/common/FramingWithContentType.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/common/FramingWithContentType.scala @@ -6,16 +6,18 @@ package akka.http.scaladsl.common import akka.NotUsed import akka.event.Logging -import akka.http.scaladsl.model.{ ContentType, ContentTypeRange } +import akka.http.scaladsl.model.ContentTypeRange import akka.stream.scaladsl.{ Flow, Framing } import akka.util.ByteString /** - * Same as [[akka.stream.scaladsl.Framing]] but additionally can express which [[ContentType]] it supports, - * which can be used to reject routes if content type does not match used framing. + * Wraps a framing [[akka.stream.scaladsl.Flow]] (as provided by [[Framing]] for example) + * that chunks up incoming [[akka.util.ByteString]] according to some [[akka.http.javadsl.model.ContentType]] + * specific logic. */ -abstract class FramingWithContentType extends akka.http.javadsl.common.FramingWithContentType with Framing { +abstract class FramingWithContentType extends akka.http.javadsl.common.FramingWithContentType { def flow: Flow[ByteString, ByteString, NotUsed] + override final def getFlow = flow.asJava override def supported: ContentTypeRange override def matches(ct: akka.http.javadsl.model.ContentType): Boolean = supported.matches(ct) diff --git a/akka-http/src/main/scala/akka/http/scaladsl/common/JsonSourceRenderingMode.scala b/akka-http/src/main/scala/akka/http/scaladsl/common/JsonSourceRenderingMode.scala index 52e3d1b754..824af23b8c 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/common/JsonSourceRenderingMode.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/common/JsonSourceRenderingMode.scala @@ -11,6 +11,8 @@ import akka.util.ByteString * Specialised rendering mode for streaming elements as JSON. * * See also: JSON Streaming on Wikipedia. + * + * See [[JsonSourceRenderingModes]] for commonly used pre-defined rendering modes. */ trait JsonSourceRenderingMode extends akka.http.javadsl.common.JsonSourceRenderingMode with SourceRenderingMode { override val contentType: ContentType.WithFixedCharset = diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/EntityStreamingSupport.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/EntityStreamingSupport.scala index 9b6f9bf988..f4656e8612 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/EntityStreamingSupport.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/EntityStreamingSupport.scala @@ -4,13 +4,10 @@ package akka.http.scaladsl.server import akka.NotUsed -import akka.actor.ActorSystem -import akka.event.Logging import akka.http.scaladsl.common.{ FramingWithContentType, SourceRenderingMode } -import akka.http.scaladsl.model.{ ContentTypeRange, ContentTypes, MediaRange, MediaRanges } +import akka.http.scaladsl.model.{ ContentTypeRange, ContentTypes, MediaRanges } import akka.stream.scaladsl.{ Flow, Framing } import akka.util.ByteString -import com.typesafe.config.Config /** * Entity streaming support, independent of used Json parsing library etc. @@ -45,13 +42,7 @@ trait EntityStreamingSupportBase { * Useful for accepting `text/csv` uploads as a stream of rows. */ def newLineFraming(maximumObjectLength: Int, supportedContentTypes: ContentTypeRange): FramingWithContentType = - new FramingWithContentType { - override final val flow: Flow[ByteString, ByteString, NotUsed] = - Flow[ByteString].via(Framing.delimiter(ByteString("\n"), maximumObjectLength)) - - override final val supported: ContentTypeRange = - ContentTypeRange(MediaRanges.`text/*`) - } + new TextNewLineFraming(maximumObjectLength, supportedContentTypes) } /** @@ -67,3 +58,11 @@ final class ApplicationJsonBracketCountingFraming(maximumObjectLength: Int) exte override final val flow = Flow[ByteString].via(akka.stream.scaladsl.JsonFraming.bracketCounting(maximumObjectLength)) override final val supported = ContentTypeRange(ContentTypes.`application/json`) } + +final class TextNewLineFraming(maximumLineLength: Int, supportedContentTypes: ContentTypeRange) extends FramingWithContentType { + override final val flow: Flow[ByteString, ByteString, NotUsed] = + Flow[ByteString].via(Framing.delimiter(ByteString("\n"), maximumLineLength)) + + override final val supported: ContentTypeRange = + ContentTypeRange(MediaRanges.`text/*`) +} diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala index c93c6e4020..1e97170a11 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala @@ -11,48 +11,150 @@ import akka.http.scaladsl.unmarshalling.{ Unmarshal, Unmarshaller, _ } import akka.http.scaladsl.util.FastFuture import akka.stream.Materializer import akka.stream.impl.ConstantFun -import akka.stream.scaladsl.{ Flow, Source } +import akka.stream.scaladsl.{ Flow, Keep, Source } import akka.util.ByteString import scala.concurrent.ExecutionContext import scala.language.implicitConversions /** - * Allows the [[MarshallingDirectives.entity]] directive to extract a `stream[T]` for framed messages. - * See `JsonEntityStreamingSupport` and classes extending it, such as `SprayJsonSupport` to get marshallers. + * Allows the [[MarshallingDirectives.entity]] directive to extract a [[Source]] of elements. + * + * See [[akka.http.scaladsl.server.EntityStreamingSupport]] for useful default [[FramingWithContentType]] instances and + * support traits such as `SprayJsonSupport` (or your other favourite JSON library) to provide the needed [[Marshaller]] s. */ trait FramedEntityStreamingDirectives extends MarshallingDirectives { import FramedEntityStreamingDirectives._ type RequestToSourceUnmarshaller[T] = FromRequestUnmarshaller[Source[T, NotUsed]] - // TODO DOCS - - final def asSourceOf[T](implicit um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] = - asSourceOfAsync(1)(um, framing) - final def asSourceOf[T](framing: FramingWithContentType)(implicit um: Unmarshaller[HttpEntity, T]): RequestToSourceUnmarshaller[T] = + /** + * Extracts entity as [[Source]] of elements of type `T`. + * This is achieved by applying the implicitly provided (in the following order): + * + * - 1st: [[FramingWithContentType]] in order to chunk-up the incoming [[ByteString]]s according to the + * `Content-Type` aware framing (for example, [[akka.http.scaladsl.server.EntityStreamingSupport.bracketCountingJsonFraming]]). + * - 2nd: [[Unmarshaller]] (from [[ByteString]] to `T`) for each of the respective "chunks" (e.g. for each JSON element contained within an array). + * + * The request will be rejected with an [[akka.http.scaladsl.server.UnsupportedRequestContentTypeRejection]] if + * its [[ContentType]] is not supported by the used `framing` or `unmarshaller`. + * + * It is recommended to use the [[akka.http.scaladsl.server.EntityStreamingSupport]] trait in conjunction with this + * directive as it helps provide the right [[FramingWithContentType]] and [[SourceRenderingMode]] for the most + * typical usage scenarios (JSON, CSV, ...). + * + * Cancelling extracted [[Source]] closes the connection abruptly (same as cancelling the `entity.dataBytes`). + * + * If looking to improve marshalling performance in face of many elements (possibly of different sizes), + * you may be interested in using [[asSourceOfAsyncUnordered]] instead. + * + * See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route. + * By default the uploaded data is limited by the `akka.http.parsing.max-content-length`. + */ + final def asSourceOf[T](implicit um: Unmarshaller[ByteString, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] = asSourceOfAsync(1)(um, framing) - final def asSourceOfAsync[T](parallelism: Int)(implicit um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] = - asSourceOfInternal[T](framing, (ec, mat) ⇒ Flow[HttpEntity].mapAsync(parallelism)(Unmarshal(_).to[T](um, ec, mat))) - final def asSourceOfAsync[T](parallelism: Int, framing: FramingWithContentType)(implicit um: Unmarshaller[HttpEntity, T]): RequestToSourceUnmarshaller[T] = + /** + * Extracts entity as [[Source]] of elements of type `T`. + * This is achieved by applying the implicitly provided (in the following order): + * + * - 1st: [[FramingWithContentType]] in order to chunk-up the incoming [[ByteString]]s according to the + * `Content-Type` aware framing (for example, [[akka.http.scaladsl.server.EntityStreamingSupport.bracketCountingJsonFraming]]). + * - 2nd: [[Unmarshaller]] (from [[ByteString]] to `T`) for each of the respective "chunks" (e.g. for each JSON element contained within an array). + * + * The request will be rejected with an [[akka.http.scaladsl.server.UnsupportedRequestContentTypeRejection]] if + * its [[ContentType]] is not supported by the used `framing` or `unmarshaller`. + * + * It is recommended to use the [[akka.http.scaladsl.server.EntityStreamingSupport]] trait in conjunction with this + * directive as it helps provide the right [[FramingWithContentType]] and [[SourceRenderingMode]] for the most + * typical usage scenarios (JSON, CSV, ...). + * + * Cancelling extracted [[Source]] closes the connection abruptly (same as cancelling the `entity.dataBytes`). + * + * If looking to improve marshalling performance in face of many elements (possibly of different sizes), + * you may be interested in using [[asSourceOfAsyncUnordered]] instead. + * + * See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route. + * By default the uploaded data is limited by the `akka.http.parsing.max-content-length`. + */ + final def asSourceOf[T](framing: FramingWithContentType)(implicit um: Unmarshaller[ByteString, T]): RequestToSourceUnmarshaller[T] = + asSourceOfAsync(1)(um, framing) + + /** + * Similar to [[asSourceOf]] however will apply at most `parallelism` unmarshallers in parallel. + * + * The source elements emitted preserve the order in which they are sent in the incoming [[HttpRequest]]. + * If you want to sacrivice ordering in favour of (potential) slight performance improvements in reading the input + * you may want to use [[asSourceOfAsyncUnordered]] instead, which lifts the ordering guarantee. + * + * Refer to [[asSourceOf]] for more in depth-documentation and guidelines. + * + * If looking to improve marshalling performance in face of many elements (possibly of different sizes), + * you may be interested in using [[asSourceOfAsyncUnordered]] instead. + * + * See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route. + * By default the uploaded data is limited by the `akka.http.parsing.max-content-length`. + */ + final def asSourceOfAsync[T](parallelism: Int)(implicit um: Unmarshaller[ByteString, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] = + asSourceOfInternal[T](framing, (ec, mat) ⇒ Flow[ByteString].mapAsync(parallelism)(Unmarshal(_).to[T](um, ec, mat))) + + /** + * Similar to [[asSourceOf]] however will apply at most `parallelism` unmarshallers in parallel. + * + * The source elements emitted preserve the order in which they are sent in the incoming [[HttpRequest]]. + * If you want to sacrivice ordering in favour of (potential) slight performance improvements in reading the input + * you may want to use [[asSourceOfAsyncUnordered]] instead, which lifts the ordering guarantee. + * + * Refer to [[asSourceOf]] for more in depth-documentation and guidelines. + * + * See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route. + * By default the uploaded data is limited by the `akka.http.parsing.max-content-length`. + */ + final def asSourceOfAsync[T](parallelism: Int, framing: FramingWithContentType)(implicit um: Unmarshaller[ByteString, T]): RequestToSourceUnmarshaller[T] = asSourceOfAsync(parallelism)(um, framing) - final def asSourceOfAsyncUnordered[T](parallelism: Int)(implicit um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] = - asSourceOfInternal[T](framing, (ec, mat) ⇒ Flow[HttpEntity].mapAsyncUnordered(parallelism)(Unmarshal(_).to[T](um, ec, mat))) - final def asSourceOfAsyncUnordered[T](parallelism: Int, framing: FramingWithContentType)(implicit um: Unmarshaller[HttpEntity, T]): RequestToSourceUnmarshaller[T] = + /** + * Similar to [[asSourceOfAsync]], as it will apply at most `parallelism` unmarshallers in parallel. + * + * The source elements emitted preserve the order in which they are sent in the incoming [[HttpRequest]]. + * If you want to sacrivice ordering in favour of (potential) slight performance improvements in reading the input + * you may want to use [[asSourceOfAsyncUnordered]] instead, which lifts the ordering guarantee. + * + * Refer to [[asSourceOf]] for more in depth-documentation and guidelines. + * + * See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route. + * By default the uploaded data is limited by the `akka.http.parsing.max-content-length`. + */ + final def asSourceOfAsyncUnordered[T](parallelism: Int)(implicit um: Unmarshaller[ByteString, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] = + asSourceOfInternal[T](framing, (ec, mat) ⇒ Flow[ByteString].mapAsyncUnordered(parallelism)(Unmarshal(_).to[T](um, ec, mat))) + /** + * Similar to [[asSourceOfAsync]], as it will apply at most `parallelism` unmarshallers in parallel. + * + * The source elements emitted preserve the order in which they are sent in the incoming [[HttpRequest]]. + * If you want to sacrivice ordering in favour of (potential) slight performance improvements in reading the input + * you may want to use [[asSourceOfAsyncUnordered]] instead, which lifts the ordering guarantee. + * + * Refer to [[asSourceOf]] for more in depth-documentation and guidelines. + * + * See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route. + * By default the uploaded data is limited by the `akka.http.parsing.max-content-length`. + */ + final def asSourceOfAsyncUnordered[T](parallelism: Int, framing: FramingWithContentType)(implicit um: Unmarshaller[ByteString, T]): RequestToSourceUnmarshaller[T] = asSourceOfAsyncUnordered(parallelism)(um, framing) // format: OFF - private def asSourceOfInternal[T](framing: FramingWithContentType, marshalling: (ExecutionContext, Materializer) => Flow[HttpEntity, ByteString, NotUsed]#ReprMat[T, NotUsed]): RequestToSourceUnmarshaller[T] = + private final def asSourceOfInternal[T](framing: FramingWithContentType, marshalling: (ExecutionContext, Materializer) => Flow[ByteString, ByteString, NotUsed]#ReprMat[T, NotUsed]): RequestToSourceUnmarshaller[T] = Unmarshaller.withMaterializer[HttpRequest, Source[T, NotUsed]] { implicit ec ⇒ implicit mat ⇒ req ⇒ val entity = req.entity if (!framing.matches(entity.contentType)) { val supportedContentTypes = framing.supported FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(supportedContentTypes)) } else { -// val stream = entity.dataBytes.via(framing.flow).via(marshalling(ec, mat)).mapMaterializedValue(_ => NotUsed) - val stream = Source.single(entity.transformDataBytes(framing.flow)).via(marshalling(ec, mat)).mapMaterializedValue(_ => NotUsed) + val bytes = entity.dataBytes + val frames = bytes.viaMat(framing.flow)(Keep.right) + val elements = frames.viaMat(marshalling(ec, mat))(Keep.right) + val stream = elements.mapMaterializedValue(_ => NotUsed) +// val stream = Source.single(entity.transformDataBytes(framing.flow)).via(marshalling(ec, mat)).mapMaterializedValue(_ => NotUsed) FastFuture.successful(stream) } } @@ -108,6 +210,11 @@ trait FramedEntityStreamingDirectives extends MarshallingDirectives { new EnableSpecialSourceRenderingModes(source) } +/** + * Allows the [[MarshallingDirectives.entity]] directive to extract a [[Source]] of elements. + * + * See [[FramedEntityStreamingDirectives]] for detailed documentation. + */ object FramedEntityStreamingDirectives extends FramedEntityStreamingDirectives { sealed class AsyncSourceRenderingMode final class AsyncRenderingOf[T](val source: Source[T, Any], val parallelism: Int) extends AsyncSourceRenderingMode @@ -115,6 +222,7 @@ object FramedEntityStreamingDirectives extends FramedEntityStreamingDirectives { } +/** Provides DSL for special rendering modes, e.g. `complete(source.renderAsync)` */ final class EnableSpecialSourceRenderingModes[T](val source: Source[T, Any]) extends AnyVal { /** * Causes the response stream to be marshalled asynchronously (up to `parallelism` elements at once), diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala index ccd58690dd..e6b657fd59 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala @@ -21,7 +21,7 @@ class JsonFramingSpec extends AkkaSpec { implicit val mat = ActorMaterializer() "collecting multiple json" should { - "xoxo parse json array" in { + "parse json array" in { val input = """ |[ @@ -38,9 +38,9 @@ class JsonFramingSpec extends AkkaSpec { } result.futureValue shouldBe Seq( - """{ "name" : "john" }""".stripMargin, - """{ "name" : "jack" }""".stripMargin, - """{ "name" : "katie" }""".stripMargin) + """{ "name" : "john" }""", + """{ "name" : "jack" }""", + """{ "name" : "katie" }""") } "emit single json element from string" in { @@ -56,7 +56,7 @@ class JsonFramingSpec extends AkkaSpec { case (acc, entry) ⇒ acc ++ Seq(entry.utf8String) } - Await.result(result, 3.seconds) shouldBe Seq("""{ "name": "john" }""".stripMargin) + Await.result(result, 3.seconds) shouldBe Seq("""{ "name": "john" }""") } "parse line delimited" in { @@ -73,9 +73,9 @@ class JsonFramingSpec extends AkkaSpec { } Await.result(result, 3.seconds) shouldBe Seq( - """{ "name": "john" }""".stripMargin, - """{ "name": "jack" }""".stripMargin, - """{ "name": "katie" }""".stripMargin) + """{ "name": "john" }""", + """{ "name": "jack" }""", + """{ "name": "katie" }""") } "parse comma delimited" in { @@ -91,7 +91,7 @@ class JsonFramingSpec extends AkkaSpec { } result.futureValue shouldBe Seq( - """{ "name": "john" }""".stripMargin, + """{ "name": "john" }""", """{ "name": "jack" }""", """{ "name": "katie" }""") } @@ -121,7 +121,6 @@ class JsonFramingSpec extends AkkaSpec { } } - // TODO fold these specs into the previous section "collecting json buffer" when { "nothing is supplied" should { "return nothing" in { @@ -378,7 +377,7 @@ class JsonFramingSpec extends AkkaSpec { "returns none until valid json is encountered" in { val buffer = new JsonBracketCounting() - """{ "name": "john"""".stripMargin.foreach { + """{ "name": "john"""".foreach { c ⇒ buffer.offer(ByteString(c)) buffer.poll() should ===(None) @@ -434,7 +433,7 @@ class JsonFramingSpec extends AkkaSpec { probe.ensureSubscription() probe .request(1) - .expectNext(ByteString("""{ "name": "john" }""")) // FIXME we should not impact the given json in Framing + .expectNext(ByteString("""{ "name": "john" }""")) .request(1) .expectNext(ByteString("""{ "name": "jack" }""")) .request(1) diff --git a/akka-stream/src/main/scala/akka/stream/impl/JsonBracketCounting.scala b/akka-stream/src/main/scala/akka/stream/impl/JsonBracketCounting.scala index 7bd9e18ca2..850d3c0f68 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/JsonBracketCounting.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/JsonBracketCounting.scala @@ -8,15 +8,18 @@ import akka.util.ByteString import scala.annotation.switch -object JsonBracketCounting { +/** + * INTERNAL API: Use [[akka.stream.scaladsl.JsonFraming]] instead. + */ +private[akka] object JsonBracketCounting { - final val SquareBraceStart = "[".getBytes.head - final val SquareBraceEnd = "]".getBytes.head - final val CurlyBraceStart = "{".getBytes.head - final val CurlyBraceEnd = "}".getBytes.head - final val DoubleQuote = "\"".getBytes.head - final val Backslash = "\\".getBytes.head - final val Comma = ",".getBytes.head + final val SquareBraceStart = '['.toByte + final val SquareBraceEnd = ']'.toByte + final val CurlyBraceStart = '{'.toByte + final val CurlyBraceEnd = '}'.toByte + final val DoubleQuote = '\''.toByte + final val Backslash = '\\'.toByte + final val Comma = ','.toByte final val LineBreak = '\n'.toByte final val LineBreak2 = '\r'.toByte @@ -31,13 +34,15 @@ object JsonBracketCounting { } /** + * INTERNAL API: Use [[akka.stream.scaladsl.JsonFraming]] instead. + * * **Mutable** framing implementation that given any number of [[ByteString]] chunks, can emit JSON objects contained within them. * Typically JSON objects are separated by new-lines or comas, however a top-level JSON Array can also be understood and chunked up * into valid JSON objects by this framing implementation. * * Leading whitespace between elements will be trimmed. */ -class JsonBracketCounting(maximumObjectLength: Int = Int.MaxValue) { +private[akka] class JsonBracketCounting(maximumObjectLength: Int = Int.MaxValue) { import JsonBracketCounting._ private var buffer: ByteString = ByteString.empty diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Framing.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Framing.scala index 6ae30d8564..b7b21030b1 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Framing.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Framing.scala @@ -115,18 +115,3 @@ object Framing { scaladsl.Framing.simpleFramingProtocol(maximumMessageLength).asJava } - -/** - * Wrapper around a framing Flow (as provided by [[Framing.delimiter]] for example. - * Used for providing a framing implicitly for other components which may need one (such as framed entity streaming in Akka HTTP). - */ -trait Framing { - def asScala: akka.stream.scaladsl.Framing = - this match { - case f: akka.stream.scaladsl.Framing ⇒ f - case _ ⇒ new akka.stream.scaladsl.Framing { - override def flow = getFlow.asScala - } - } - def getFlow: Flow[ByteString, ByteString, NotUsed] -} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala index 4dadca9756..7ff38cae36 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala @@ -287,14 +287,3 @@ object Framing { } } - -/** - * Wrapper around a framing Flow (as provided by [[Framing.delimiter]] for example. - * Used for providing a framing implicitly for other components which may need one (such as framed entity streaming in Akka HTTP). - */ -trait Framing extends akka.stream.javadsl.Framing { - final def asJava: akka.stream.javadsl.Framing = this - override final def getFlow = flow.asJava - - def flow: Flow[ByteString, ByteString, NotUsed] -}