diff --git a/akka-docs/rst/java/code/docs/http/javadsl/server/JsonStreamingExamplesTest.java b/akka-docs/rst/java/code/docs/http/javadsl/server/JsonStreamingExamplesTest.java index acc6bbfac2..c43d6241e5 100644 --- a/akka-docs/rst/java/code/docs/http/javadsl/server/JsonStreamingExamplesTest.java +++ b/akka-docs/rst/java/code/docs/http/javadsl/server/JsonStreamingExamplesTest.java @@ -5,17 +5,21 @@ package docs.http.javadsl.server; import akka.NotUsed; -import akka.http.javadsl.common.FramingWithContentType; -import akka.http.javadsl.common.JsonSourceRenderingModes; +import akka.http.javadsl.common.CsvEntityStreamingSupport; +import akka.http.javadsl.common.JsonEntityStreamingSupport; import akka.http.javadsl.marshallers.jackson.Jackson; +import akka.http.javadsl.marshalling.Marshaller; import akka.http.javadsl.model.*; import akka.http.javadsl.model.headers.Accept; import akka.http.javadsl.server.*; import akka.http.javadsl.testkit.JUnitRouteTest; import akka.http.javadsl.testkit.TestRoute; +import akka.http.javadsl.unmarshalling.StringUnmarshallers; +import akka.http.javadsl.common.EntityStreamingSupport; +import akka.http.javadsl.unmarshalling.Unmarshaller; +import akka.stream.javadsl.Flow; import akka.stream.javadsl.Source; import akka.util.ByteString; -import docs.http.javadsl.server.testkit.MyAppService; import org.junit.Test; import java.util.concurrent.CompletionStage; @@ -29,12 +33,32 @@ public class JsonStreamingExamplesTest extends JUnitRouteTest { //#formats //#response-streaming + + // Step 1: Enable JSON streaming + // we're not using this in the example, but it's the simplest way to start: + // The default rendering is a JSON array: `[el, el, el , ...]` + final JsonEntityStreamingSupport jsonStreaming = EntityStreamingSupport.json(); + + // Step 1.1: Enable and customise how we'll render the JSON, as a compact array: + final ByteString start = ByteString.fromString("["); + final ByteString between = ByteString.fromString(","); + final ByteString end = ByteString.fromString("]"); + final Flow compactArrayRendering = + Flow.of(ByteString.class).intersperse(start, between, end); + + final JsonEntityStreamingSupport compactJsonSupport = EntityStreamingSupport.json() + .withFramingRendererFlow(compactArrayRendering); + + + // Step 2: implement the route final Route responseStreaming = path("tweets", () -> get(() -> parameter(StringUnmarshallers.INTEGER, "n", n -> { final Source tws = - Source.repeat(new JavaTweet("Hello World!")).take(n); - return completeOKWithSource(tws, Jackson.marshaller(), JsonSourceRenderingModes.arrayCompact()); + Source.repeat(new JavaTweet(12, "Hello World!")).take(n); + + // Step 3: call complete* with your source, marshaller, and stream rendering mode + return completeOKWithSource(tws, Jackson.marshaller(), compactJsonSupport); }) ) ); @@ -44,9 +68,9 @@ public class JsonStreamingExamplesTest extends JUnitRouteTest { final Route incomingStreaming = path("tweets", () -> post(() -> extractMaterializer(mat -> { - final FramingWithContentType jsonFraming = EntityStreamingSupport.bracketCountingJsonFraming(128); + final JsonEntityStreamingSupport jsonSupport = EntityStreamingSupport.json(); - return entityasSourceOf(JavaTweets, jsonFraming, sourceOfTweets -> { + return entityAsSourceOf(JavaTweets, jsonSupport, sourceOfTweets -> { final CompletionStage tweetsCount = sourceOfTweets.runFold(0, (acc, tweet) -> acc + 1, mat); return onComplete(tweetsCount, c -> complete("Total number of tweets: " + c)); }); @@ -58,6 +82,29 @@ public class JsonStreamingExamplesTest extends JUnitRouteTest { return responseStreaming.orElse(incomingStreaming); } + + final Route csvTweets() { + //#csv-example + final Marshaller renderAsCsv = + Marshaller.withFixedContentType(ContentTypes.TEXT_CSV_UTF8, t -> + ByteString.fromString(t.getId() + "," + t.getMessage()) + ); + + final CsvEntityStreamingSupport compactJsonSupport = EntityStreamingSupport.csv(); + + final Route responseStreaming = path("tweets", () -> + get(() -> + parameter(StringUnmarshallers.INTEGER, "n", n -> { + final Source tws = + Source.repeat(new JavaTweet(12, "Hello World!")).take(n); + return completeWithSource(tws, renderAsCsv, compactJsonSupport); + }) + ) + ); + //#csv-example + + return responseStreaming; + } //#routes @Test @@ -70,7 +117,7 @@ public class JsonStreamingExamplesTest extends JUnitRouteTest { final Accept acceptApplication = Accept.create(MediaRanges.create(MediaTypes.APPLICATION_JSON)); routes.run(HttpRequest.GET("/tweets?n=2").addHeader(acceptApplication)) .assertStatusCode(200) - .assertEntity("[{\"message\":\"Hello World!\"},{\"message\":\"Hello World!\"}]"); + .assertEntity("[{\"id\":12,\"message\":\"Hello World!\"},{\"id\":12,\"message\":\"Hello World!\"}]"); // test responses to potential errors final Accept acceptText = Accept.create(MediaRanges.ALL_TEXT); @@ -79,15 +126,46 @@ public class JsonStreamingExamplesTest extends JUnitRouteTest { .assertEntity("Resource representation is only available with these types:\napplication/json"); //#response-streaming } + + @Test + public void csvExampleTweetsTest() { + //#response-streaming + // tests -------------------------------------------- + final TestRoute routes = testRoute(csvTweets()); + + // test happy path + final Accept acceptCsv = Accept.create(MediaRanges.create(MediaTypes.TEXT_CSV)); + routes.run(HttpRequest.GET("/tweets?n=2").addHeader(acceptCsv)) + .assertStatusCode(200) + .assertEntity("12,Hello World!\n" + + "12,Hello World!"); + + // test responses to potential errors + final Accept acceptText = Accept.create(MediaRanges.ALL_APPLICATION); + routes.run(HttpRequest.GET("/tweets?n=3").addHeader(acceptText)) + .assertStatusCode(StatusCodes.NOT_ACCEPTABLE) // 406 + .assertEntity("Resource representation is only available with these types:\ntext/csv; charset=UTF-8"); + //#response-streaming + } //#models private static final class JavaTweet { + private int id; private String message; - public JavaTweet(String message) { + public JavaTweet(int id, String message) { + this.id = id; this.message = message; } + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + public void setMessage(String message) { this.message = message; } diff --git a/akka-docs/rst/java/http/routing-dsl/source-streaming-support.rst b/akka-docs/rst/java/http/routing-dsl/source-streaming-support.rst index f4a0e7c4b6..acf46421b6 100644 --- a/akka-docs/rst/java/http/routing-dsl/source-streaming-support.rst +++ b/akka-docs/rst/java/http/routing-dsl/source-streaming-support.rst @@ -3,18 +3,15 @@ Source Streaming ================ -Akka HTTP supports completing a request with an Akka ``Source``, which makes it possible to easily build -streaming end-to-end APIs which apply back-pressure throughout the entire stack. +Akka HTTP supports completing a request with an Akka ``Source``, which makes it possible to easily build +and consume streaming end-to-end APIs which apply back-pressure throughout the entire stack. -It is possible to complete requests with raw ``Source``, however often it is more convenient to +It is possible to complete requests with raw ``Source``, however often it is more convenient to stream on an element-by-element basis, and allow Akka HTTP to handle the rendering internally - for example as a JSON array, or CSV stream (where each element is separated by a new-line). In the following sections we investigate how to make use of the JSON Streaming infrastructure, -however the general hints apply to any kind of element-by-element streaming you could imagine. - -It is possible to implement your own framing for any content type you might need, including bianary formats -by implementing :class:`FramingWithContentType`. +however the general hints apply to any kind of element-by-element streaming you could imagine. JSON Streaming ============== @@ -24,7 +21,7 @@ objects as a continuous HTTP request or response. The elements are most often se however do not have to be. Concatenating elements side-by-side or emitting "very long" JSON array is also another use case. -In the below examples, we'll be refering to the ``User`` and ``Measurement`` case classes as our model, which are defined as: +In the below examples, we'll be refering to the ``Tweet`` and ``Measurement`` case classes as our model, which are defined as: .. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#models @@ -36,11 +33,21 @@ Responding with JSON Streams In this example we implement an API representing an infinite stream of tweets, very much like Twitter's `Streaming API`_. Firstly, we'll need to get some additional marshalling infrastructure set up, that is able to marshal to and from an -Akka Streams ``Source``. One such trait, containing the needed marshallers is ``SprayJsonSupport``, which uses -spray-json (a high performance json parser library), and is shipped as part of Akka HTTP in the -``akka-http-spray-json-experimental`` module. +Akka Streams ``Source``. Here we'll use the ``Jackson`` helper class from ``akka-http-jackson`` (a separate library +that you should add as a dependency if you want to use Jackson with Akka HTTP). -The last bit of setup, before we can render a streaming json response +First we enable JSON Streaming by making an implicit ``EntityStreamingSupport`` instance available (Step 1). + +The default mode of rendering a ``Source`` is to represent it as an JSON Array. If you want to change this representation +for example to use Twitter style new-line separated JSON objects, you can do so by configuring the support trait accordingly. + +In Step 1.1. we demonstrate to configure configude the rendering to be new-line separated, and also how parallel marshalling +can be applied. We configure the Support object to render the JSON as series of new-line separated JSON objects, +simply by providing the ``start``, ``sep`` and ``end`` ByteStrings, which will be emitted at the apropriate +places in the rendered stream. Although this format is *not* valid JSON, it is pretty popular since parsing it is relatively +simple - clients need only to find the new-lines and apply JSON unmarshalling for an entire line of JSON. + +The final step is simply completing a request using a Source of tweets, as simple as that: .. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#response-streaming @@ -60,15 +67,25 @@ will be applied automatically thanks to using Akka HTTP/Streams). .. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#incoming-request-streaming -Implementing custom (Un)Marshaller support for JSON streaming -------------------------------------------------------------- -The following types that may need to be implemented by a custom framed-streaming support library are: +Simple CSV streaming example +---------------------------- -- ``SourceRenderingMode`` which can customise how to render the begining / between-elements and ending of such - stream (while writing a response, i.e. by calling ``complete(source)``). - Implementations for JSON are available in ``akka.http.scaladsl.common.JsonSourceRenderingMode``. -- ``FramingWithContentType`` which is needed to be able to split incoming ``ByteString`` - chunks into frames of the higher-level data type format that is understood by the provided unmarshallers. - In the case of JSON it means chunking up ByteStrings such that each emitted element corresponds to exactly one JSON object, - this framing is implemented in ``EntityStreamingSupport``. +Akka HTTP provides another ``EntityStreamingSupport`` out of the box, namely ``csv`` (comma-separated values). +For completeness, we demonstrate its usage in the below snippet. As you'll notice, switching betweeen streaming +modes is fairly simple, one only has to make sure that an implicit ``Marshaller`` of the requested type is available, +and that the streaming support operates on the same ``Content-Type`` as the rendered values. Otherwise you'll see +an error during runtime that the marshaller did not expose the expected content type and thus we can not render +the streaming response). + +.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#csv-example + +Implementing custom EntityStreamingSupport traits +------------------------------------------------- + +The ``EntityStreamingSupport`` infrastructure is open for extension and not bound to any single format, content type +or marshalling library. The provided JSON support does not rely on Spray JSON directly, but uses ``Marshaller`` +instances, which can be provided using any JSON marshalling library (such as Circe, Jawn or Play JSON). + +When implementing a custom support trait, one should simply extend the ``EntityStreamingSupport`` abstract class, +and implement all of it's methods. It's best to use the existing implementations as a guideline. diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala index 4f047ae595..c311515c1b 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala @@ -5,13 +5,15 @@ package docs.http.scaladsl.server.directives import akka.NotUsed -import akka.http.scaladsl.common.{ FramingWithContentType, JsonSourceRenderingModes } -import akka.http.scaladsl.marshalling.ToResponseMarshallable +import akka.http.scaladsl.common.{ EntityStreamingSupport, JsonEntityStreamingSupport } +import akka.http.scaladsl.marshalling._ import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers.Accept import akka.http.scaladsl.server.{ UnacceptedResponseContentTypeRejection, UnsupportedRequestContentTypeRejection } import akka.stream.scaladsl.{ Flow, Source } +import akka.util.ByteString import docs.http.scaladsl.server.RoutingSpec +import spray.json.JsValue import scala.concurrent.Future @@ -29,39 +31,41 @@ class JsonStreamingExamplesSpec extends RoutingSpec { Tweet(3, "You cannot enter the same river twice."))) //#formats - object MyJsonProtocol extends spray.json.DefaultJsonProtocol { + object MyJsonProtocol + extends akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport + with spray.json.DefaultJsonProtocol { + implicit val tweetFormat = jsonFormat2(Tweet.apply) implicit val measurementFormat = jsonFormat2(Measurement.apply) } //# "spray-json-response-streaming" in { - // [1] import generic spray-json marshallers support: - import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ - - // [2] import "my protocol", for marshalling Tweet objects: + // [1] import "my protocol", for marshalling Tweet objects: import MyJsonProtocol._ - // [3] pick json rendering mode: - // HINT: if you extend `akka.http.scaladsl.server.EntityStreamingSupport` - // it'll guide you to do so via abstract defs - implicit val jsonRenderingMode = JsonSourceRenderingModes.LineByLine + // [2] pick a Source rendering support trait: + // Note that the default support renders the Source as JSON Array + implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json() val route = path("tweets") { + // [3] simply complete a request with a source of tweets: val tweets: Source[Tweet, NotUsed] = getTweets() complete(tweets) } - // tests: + // tests ------------------------------------------------------------ val AcceptJson = Accept(MediaRange(MediaTypes.`application/json`)) val AcceptXml = Accept(MediaRange(MediaTypes.`text/xml`)) Get("/tweets").withHeaders(AcceptJson) ~> route ~> check { responseAs[String] shouldEqual - """{"uid":1,"txt":"#Akka rocks!"}""" + "\n" + - """{"uid":2,"txt":"Streaming is so hot right now!"}""" + "\n" + - """{"uid":3,"txt":"You cannot enter the same river twice."}""" + """[""" + + """{"uid":1,"txt":"#Akka rocks!"},""" + + """{"uid":2,"txt":"Streaming is so hot right now!"},""" + + """{"uid":3,"txt":"You cannot enter the same river twice."}""" + + """]""" } // endpoint can only marshal Json, so it will *reject* requests for application/xml: @@ -71,44 +75,115 @@ class JsonStreamingExamplesSpec extends RoutingSpec { } } - "response-streaming-modes" in { - import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ + "line-by-line-json-response-streaming" in { import MyJsonProtocol._ - implicit val jsonRenderingMode = JsonSourceRenderingModes.LineByLine - //#async-rendering - path("tweets") { - val tweets: Source[Tweet, NotUsed] = getTweets() - complete(tweets.renderAsync(parallelism = 8)) - } - //# + // Configure the EntityStreamingSupport to render the elements as: + // {"example":42} + // {"example":43} + // ... + // {"example":1000} + val start = ByteString.empty + val sep = ByteString("\n") + val end = ByteString.empty - //#async-unordered-rendering - path("tweets" / "unordered") { - val tweets: Source[Tweet, NotUsed] = getTweets() - complete(tweets.renderAsyncUnordered(parallelism = 8)) + implicit val jsonStreamingSupport = EntityStreamingSupport.json() + .withFramingRenderer(Flow[ByteString].intersperse(start, sep, end)) + + val route = + path("tweets") { + // [3] simply complete a request with a source of tweets: + val tweets: Source[Tweet, NotUsed] = getTweets() + complete(tweets) + } + + // tests ------------------------------------------------------------ + val AcceptJson = Accept(MediaRange(MediaTypes.`application/json`)) + + Get("/tweets").withHeaders(AcceptJson) ~> route ~> check { + responseAs[String] shouldEqual + """{"uid":1,"txt":"#Akka rocks!"}""" + "\n" + + """{"uid":2,"txt":"Streaming is so hot right now!"}""" + "\n" + + """{"uid":3,"txt":"You cannot enter the same river twice."}""" + } + } + + "csv-example" in { + // [1] provide a marshaller to ByteString + implicit val tweetAsCsv = Marshaller.strict[Tweet, ByteString] { t => + Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`, () => { + val txt = t.txt.replaceAll(",", ".") + val uid = t.uid + ByteString(List(uid, txt).mkString(",")) + }) + } + + // [2] enable csv streaming: + implicit val csvStreaming = EntityStreamingSupport.csv() + + val route = + path("tweets") { + val tweets: Source[Tweet, NotUsed] = getTweets() + complete(tweets) + } + + // tests ------------------------------------------------------------ + val AcceptCsv = Accept(MediaRange(MediaTypes.`text/csv`)) + + Get("/tweets").withHeaders(AcceptCsv) ~> route ~> check { + responseAs[String] shouldEqual + """|1,#Akka rocks! + |2,Streaming is so hot right now! + |3,You cannot enter the same river twice.""" + .stripMargin + } + } + + "response-streaming-modes" in { + + { + //#async-rendering + import MyJsonProtocol._ + implicit val jsonStreamingSupport: JsonEntityStreamingSupport = + EntityStreamingSupport.json() + .withParallelMarshalling(parallelism = 8, unordered = false) + + path("tweets") { + val tweets: Source[Tweet, NotUsed] = getTweets() + complete(tweets) + } + //# + } + + { + + //#async-unordered-rendering + import MyJsonProtocol._ + implicit val jsonStreamingSupport: JsonEntityStreamingSupport = + EntityStreamingSupport.json() + .withParallelMarshalling(parallelism = 8, unordered = true) + + path("tweets" / "unordered") { + val tweets: Source[Tweet, NotUsed] = getTweets() + complete(tweets) + } + //# } - //# } "spray-json-request-streaming" in { - // [1] import generic spray-json (un)marshallers support: - import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ - - // [1.1] import framing mode - import akka.http.scaladsl.server.EntityStreamingSupport - implicit val jsonFramingMode: FramingWithContentType = - EntityStreamingSupport.bracketCountingJsonFraming(Int.MaxValue) - - // [2] import "my protocol", for unmarshalling Measurement objects: + // [1] import "my protocol", for unmarshalling Measurement objects: import MyJsonProtocol._ - // [3] prepare your persisting logic here + // [2] enable Json Streaming + implicit val jsonStreamingSupport = EntityStreamingSupport.json() + + // prepare your persisting logic here val persistMetrics = Flow[Measurement] val route = path("metrics") { - // [4] extract Source[Measurement, _] + // [3] extract Source[Measurement, _] entity(asSourceOf[Measurement]) { measurements => // alternative syntax: // entity(as[Source[Measurement, NotUsed]]) { measurements => @@ -123,7 +198,8 @@ class JsonStreamingExamplesSpec extends RoutingSpec { } } - // tests: + // tests ------------------------------------------------------------ + // uploading an array or newline separated values works out of the box val data = HttpEntity( ContentTypes.`application/json`, """ diff --git a/akka-docs/rst/scala/http/routing-dsl/source-streaming-support.rst b/akka-docs/rst/scala/http/routing-dsl/source-streaming-support.rst index 8ead39b766..4a2d121b80 100644 --- a/akka-docs/rst/scala/http/routing-dsl/source-streaming-support.rst +++ b/akka-docs/rst/scala/http/routing-dsl/source-streaming-support.rst @@ -4,7 +4,7 @@ Source Streaming ================ Akka HTTP supports completing a request with an Akka ``Source[T, _]``, which makes it possible to easily build -streaming end-to-end APIs which apply back-pressure throughout the entire stack. +and consume streaming end-to-end APIs which apply back-pressure throughout the entire stack. It is possible to complete requests with raw ``Source[ByteString, _]``, however often it is more convenient to stream on an element-by-element basis, and allow Akka HTTP to handle the rendering internally - for example as a JSON array, @@ -13,9 +13,6 @@ or CSV stream (where each element is separated by a new-line). In the following sections we investigate how to make use of the JSON Streaming infrastructure, however the general hints apply to any kind of element-by-element streaming you could imagine. -It is possible to implement your own framing for any content type you might need, including bianary formats -by implementing :class:`FramingWithContentType`. - JSON Streaming ============== @@ -24,7 +21,7 @@ objects as a continuous HTTP request or response. The elements are most often se however do not have to be. Concatenating elements side-by-side or emitting "very long" JSON array is also another use case. -In the below examples, we'll be refering to the ``User`` and ``Measurement`` case classes as our model, which are defined as: +In the below examples, we'll be refering to the ``Tweet`` and ``Measurement`` case classes as our model, which are defined as: .. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala :snippet: models @@ -47,32 +44,53 @@ Akka Streams ``Source[T,_]``. One such trait, containing the needed marshallers spray-json (a high performance json parser library), and is shipped as part of Akka HTTP in the ``akka-http-spray-json-experimental`` module. -Next we import our model's marshallers, generated by spray-json. +Once the general infrastructure is prepared we import our model's marshallers, generated by spray-json (Step 1), +and enable JSON Streaming by making an implicit ``EntityStreamingSupport`` instance available (Step 2). +Akka HTTP pre-packages JSON and CSV entity streaming support, however it is simple to add your own, in case you'd +like to stream a different content type (for example plists or protobuf). -The last bit of setup, before we can render a streaming json response +The final step is simply completing a request using a Source of tweets, as simple as that: .. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala :snippet: spray-json-response-streaming +The reason the ``EntityStreamingSupport`` has to be enabled explicitly is that one might want to configure how the +stream should be rendered. We'll dicuss this in depth in the next section though. + .. _Streaming API: https://dev.twitter.com/streaming/overview Customising response rendering mode ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -The mode in which a response is marshalled and then rendered to the HttpResponse from the provided ``Source[T,_]`` -is customisable (thanks to conversions originating from ``Directives`` via ``EntityStreamingDirectives``). +Since it is not always possible to directly and confidently answer the question of how a stream of ``T`` should look on +the wire, the ``EntityStreamingSupport`` traits come into play and allow fine-tuning the streams rendered representation. -Since Marshalling is a potentially asynchronous operation in Akka HTTP (because transforming ``T`` to ``JsValue`` may -potentially take a long time (depending on your definition of "long time"), we allow to run marshalling concurrently -(up to ``parallelism`` concurrent marshallings) by using the ``renderAsync(parallelism)`` mode: +For example, in case of JSON Streaming, there isn't really one standard about rendering the response. Some APIs prefer +to render multiple JSON objects in a line-by-line fashion (Twitter's streaming APIs for example), while others simply return +very large arrays, which could be streamed as well. + +Akka defaults to the second one (streaming a JSON Array), as it is correct JSON and clients not expecting +a streaming API would still be able to consume it in a naive way if they'd want to. + +The line-by-line aproach however is also pretty popular even though it is not valid JSON. It's relatively simplicity for +client-side parsing is a strong point in case to pick this format for your Streaming APIs. +Below we demonstrate how to reconfigure the support trait to render the JSON as + +.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala + :snippet: line-by-line-json-response-streaming + +Another interesting feature is parallel marshalling. Since marshalling can potentially take much time, +it is possible to marshal multiple elements of the stream in parallel. This is simply a configuration +option on ``EntityStreamingSupport`` and is configurable like this: .. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala :snippet: async-rendering -The ``renderAsync`` mode perserves ordering of the Source's elements, which may sometimes be a required property, +The above shown mode perserves ordering of the Source's elements, which may sometimes be a required property, for example when streaming a strictly ordered dataset. Sometimes the contept of strict-order does not apply to the -data being streamed though, which allows us to explit this property and use ``renderAsyncUnordered(parallelism)``, -which will concurrently marshall up to ``parallelism`` elements and emit the first which is marshalled onto -the HttpResponse: +data being streamed though, which allows us to exploit this property and use an ``unordered`` rendering. + +This also is a configuration option and is used as shown below. Effectively this will allow Akka's marshalling infrastructure +to concurrently marshallup to ``parallelism`` elements and emit the first which is marshalled onto the ``HttpResponse``: .. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala :snippet: async-unordered-rendering @@ -94,18 +112,25 @@ will be applied automatically thanks to using Akka HTTP/Streams). .. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala :snippet: spray-json-request-streaming -Implementing custom (Un)Marshaller support for JSON streaming -------------------------------------------------------------- +Simple CSV streaming example +---------------------------- -While not provided by Akka HTTP directly, the infrastructure is extensible and by investigating how ``SprayJsonSupport`` -is implemented it is certainly possible to provide the same infrastructure for other marshaller implementations (such as -Play JSON, or Jackson directly for example). Such support traits will want to extend the ``EntityStreamingSupport`` trait. +Akka HTTP provides another ``EntityStreamingSupport`` out of the box, namely ``csv`` (comma-separated values). +For completeness, we demonstrate its usage in the below snippet. As you'll notice, switching betweeen streaming +modes is fairly simple, one only has to make sure that an implicit ``Marshaller`` of the requested type is available, +and that the streaming support operates on the same ``Content-Type`` as the rendered values. Otherwise you'll see +an error during runtime that the marshaller did not expose the expected content type and thus we can not render +the streaming response). -The following types that may need to be implemented by a custom framed-streaming support library are: +.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala + :snippet: csv-example -- ``SourceRenderingMode`` which can customise how to render the begining / between-elements and ending of such stream (while writing a response, i.e. by calling ``complete(source)``). - Implementations for JSON are available in ``akka.http.scaladsl.server.JsonSourceRenderingMode``. -- ``FramingWithContentType`` which is needed to be able to split incoming ``ByteString`` chunks into frames - of the higher-level data type format that is understood by the provided unmarshallers. - In the case of JSON it means chunking up ByteStrings such that each emitted element corresponds to exactly one JSON object, - this framing is implemented in ``EntityStreamingSupport``. +Implementing custom EntityStreamingSupport traits +------------------------------------------------- + +The ``EntityStreamingSupport`` infrastructure is open for extension and not bound to any single format, content type +or marshalling library. The provided JSON support does not rely on Spray JSON directly, but uses ``Marshaller[T, ByteString]`` +instances, which can be provided using any JSON marshalling library (such as Circe, Jawn or Play JSON). + +When implementing a custom support trait, one should simply extend the ``EntityStreamingSupport`` abstract class, +and implement all of it's methods. It's best to use the existing implementations as a guideline. diff --git a/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala b/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala index cc5fe07495..e399ca7706 100644 --- a/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala +++ b/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala @@ -4,11 +4,14 @@ package akka.http.scaladsl.marshallers.sprayjson -import akka.http.scaladsl.marshalling.{ Marshaller, ToByteStringMarshaller, ToEntityMarshaller } +import akka.NotUsed +import akka.http.scaladsl.common.EntityStreamingSupport +import akka.http.scaladsl.marshalling._ import akka.http.scaladsl.model.MediaTypes.`application/json` import akka.http.scaladsl.model.{ HttpCharsets, MediaTypes } -import akka.http.scaladsl.unmarshalling.{ FromEntityUnmarshaller, Unmarshaller } +import akka.http.scaladsl.unmarshalling.{ FromEntityUnmarshaller, FromRequestUnmarshaller, Unmarshaller } import akka.http.scaladsl.util.FastFuture +import akka.stream.scaladsl.{ Flow, Keep, Source } import akka.util.ByteString import spray.json._ @@ -44,7 +47,19 @@ trait SprayJsonSupport { sprayJsValueMarshaller compose writer.write implicit def sprayJsValueMarshaller(implicit printer: JsonPrinter = CompactPrinter): ToEntityMarshaller[JsValue] = Marshaller.StringMarshaller.wrap(MediaTypes.`application/json`)(printer) - implicit def sprayByteStringMarshaller[T](implicit writer: RootJsonFormat[T], printer: JsonPrinter = CompactPrinter): ToByteStringMarshaller[T] = - sprayJsValueMarshaller.map(s ⇒ ByteString(s.toString)) compose writer.write + + // support for as[Source[T, NotUsed]] + implicit def sprayJsonSourceReader[T](implicit rootJsonReader: RootJsonReader[T], support: EntityStreamingSupport): FromRequestUnmarshaller[Source[T, NotUsed]] = + Unmarshaller.withMaterializer { implicit ec ⇒ implicit mat ⇒ r ⇒ + if (support.supported.matches(r.entity.contentType)) { + val bytes = r.entity.dataBytes + val frames = bytes.via(support.framingDecoder) + val unmarshalling = + if (support.unordered) Flow[ByteString].mapAsyncUnordered(support.parallelism)(bs ⇒ sprayJsonByteStringUnmarshaller(rootJsonReader)(bs)) + else Flow[ByteString].mapAsync(support.parallelism)(bs ⇒ sprayJsonByteStringUnmarshaller(rootJsonReader)(bs)) + val elements = frames.viaMat(unmarshalling)(Keep.right) + FastFuture.successful(elements) + } else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(support.supported)) + } } object SprayJsonSupport extends SprayJsonSupport diff --git a/akka-http-tests/src/test/java/akka/http/javadsl/server/JavaTestServer.java b/akka-http-tests/src/test/java/akka/http/javadsl/server/JavaTestServer.java index 70900a9987..761a8e2ba9 100644 --- a/akka-http-tests/src/test/java/akka/http/javadsl/server/JavaTestServer.java +++ b/akka-http-tests/src/test/java/akka/http/javadsl/server/JavaTestServer.java @@ -8,12 +8,13 @@ import akka.actor.ActorSystem; import akka.http.javadsl.ConnectHttp; import akka.http.javadsl.Http; import akka.http.javadsl.ServerBinding; +import akka.http.javadsl.common.EntityStreamingSupport; import akka.http.javadsl.marshallers.jackson.Jackson; -import akka.http.javadsl.model.HttpEntity; import akka.http.javadsl.model.HttpRequest; import akka.http.javadsl.model.HttpResponse; import akka.http.javadsl.model.StatusCodes; -import akka.http.javadsl.common.JsonSourceRenderingModes; +import akka.http.javadsl.unmarshalling.StringUnmarshallers; +import akka.http.javadsl.unmarshalling.Unmarshaller; import akka.stream.ActorMaterializer; import akka.stream.javadsl.Flow; import akka.stream.javadsl.Source; @@ -70,12 +71,12 @@ public class JavaTestServer extends AllDirectives { // or import static Directiv get(() -> parameter(StringUnmarshallers.INTEGER, "n", n -> { final Source tws = Source.repeat(new JavaTweet("Hello World!")).take(n); - return completeOKWithSource(tws, Jackson.marshaller(), JsonSourceRenderingModes.arrayCompact()); + return completeOKWithSource(tws, Jackson.marshaller(), EntityStreamingSupport.json()); }) ).orElse( post(() -> extractMaterializer(mat -> - entityasSourceOf(JavaTweets, null, sourceOfTweets -> { + entityAsSourceOf(JavaTweets, null, sourceOfTweets -> { final CompletionStage tweetsCount = sourceOfTweets.runFold(0, (acc, tweet) -> acc + 1, mat); return onComplete(tweetsCount, c -> complete("Total number of tweets: " + c)); }) diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala index 1512bcb0d9..ea5cb71894 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala @@ -13,8 +13,9 @@ import akka.actor.ActorSystem import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http -import akka.http.scaladsl.common.{ FramingWithContentType, JsonSourceRenderingModes, SourceRenderingMode } +import akka.http.scaladsl.common.EntityStreamingSupport import akka.http.scaladsl.marshalling.ToResponseMarshallable +import spray.json.RootJsonReader import scala.concurrent.duration._ import scala.io.StdIn @@ -30,20 +31,12 @@ object TestServer extends App { import system.dispatcher implicit val materializer = ActorMaterializer() - // --------- json streaming --------- import spray.json.DefaultJsonProtocol._ import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ final case class Tweet(message: String) implicit val tweetFormat = jsonFormat1(Tweet) - // FIXME: Need to be able to support composive framing with content type (!!!!!!!) - import akka.http.scaladsl.server.EntityStreamingSupport._ - /* override if extending EntityStreamingSupport */ - implicit val incomingEntityStreamFraming: FramingWithContentType = bracketCountingJsonFraming(128) - /* override if extending EntityStreamingSupport */ - implicit val outgoingEntityStreamRendering: SourceRenderingMode = JsonSourceRenderingModes.LineByLine - - // --------- end of json streaming --------- + implicit val jsonStreaming = EntityStreamingSupport.json() import ScalaXmlSupport._ import Directives._ @@ -65,13 +58,7 @@ object TestServer extends App { } ~ path("secure") { authenticateBasicPF("My very secure site", auth) { user ⇒ - complete( - Hello - - {user} - - . Access has been granted! - ) + complete( Hello {user}. Access has been granted! ) } } ~ path("ping") { @@ -89,6 +76,14 @@ object TestServer extends App { complete(tweets) } ~ post { + entity(asSourceOf[Tweet]) { tweets ⇒ + onComplete(tweets.runFold(0)({ case (acc, t) => acc + 1 })) { count => + complete(s"Total tweets received: " + count) + } + } + } ~ + put { + // checking the alternative syntax also works: entity(as[Source[Tweet, NotUsed]]) { tweets ⇒ onComplete(tweets.runFold(0)({ case (acc, t) => acc + 1 })) { count => complete(s"Total tweets received: " + count) @@ -103,7 +98,7 @@ object TestServer extends App { val bindingFuture = Http().bindAndHandle(routes, interface = "0.0.0.0", port = 8080) - println(s"Server online at http://localhost:8080/\nPress RETURN to stop...") + println(s"Server online at http://0.0.0.0:8080/\nPress RETURN to stop...") StdIn.readLine() bindingFuture.flatMap(_.unbind()).onComplete(_ ⇒ system.terminate()) diff --git a/akka-http/src/main/scala/akka/http/javadsl/common/CsvSourceRenderingMode.scala b/akka-http/src/main/scala/akka/http/javadsl/common/CsvSourceRenderingMode.scala deleted file mode 100644 index d755dd6bf2..0000000000 --- a/akka-http/src/main/scala/akka/http/javadsl/common/CsvSourceRenderingMode.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (C) 2016 Lightbend Inc. - */ - -package akka.http.javadsl.common - -import akka.http.javadsl.model.ContentType.WithCharset -import akka.http.javadsl.model.ContentTypes -import akka.util.ByteString - -/** - * Specialised rendering mode for streaming elements as CSV. - */ -trait CsvSourceRenderingMode extends SourceRenderingMode { - override val contentType: WithCharset = - ContentTypes.TEXT_CSV_UTF8 -} - -object CsvSourceRenderingModes { - - /** - * Render sequence of values as row-by-row ('\n' separated) series of values. - */ - val create: CsvSourceRenderingMode = - new CsvSourceRenderingMode { - override def between: ByteString = ByteString("\n") - override def end: ByteString = ByteString.empty - override def start: ByteString = ByteString.empty - } - - /** - * Render sequence of values as row-by-row (with custom row separator, - * e.g. if you need to use '\r\n' instead of '\n') series of values. - */ - def custom(rowSeparator: String): CsvSourceRenderingMode = - new CsvSourceRenderingMode { - override def between: ByteString = ByteString(rowSeparator) - override def end: ByteString = ByteString.empty - override def start: ByteString = ByteString.empty - } -} diff --git a/akka-http/src/main/scala/akka/http/javadsl/common/EntityStreamingSupport.scala b/akka-http/src/main/scala/akka/http/javadsl/common/EntityStreamingSupport.scala new file mode 100644 index 0000000000..b2c461f4c8 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/javadsl/common/EntityStreamingSupport.scala @@ -0,0 +1,149 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.http.javadsl.common + +import akka.NotUsed +import akka.http.javadsl.model.{ ContentType, ContentTypeRange } +import akka.http.scaladsl.common +import akka.stream.javadsl.Flow +import akka.util.ByteString + +/** + * Entity streaming support trait allowing rendering and receiving incoming ``Source[T, _]`` from HTTP entities. + * + * See [[JsonEntityStreamingSupport]] or [[CsvEntityStreamingSupport]] for default implementations. + */ +abstract class EntityStreamingSupport { + + /** Read-side, what content types it is able to frame and unmarshall. */ + def supported: ContentTypeRange + /** Write-side, defines what Content-Type the Marshaller should offer and the final Content-Type of the response. */ + def contentType: ContentType + + /** + * Read-side, decode incoming framed entity. + * For example with an incoming JSON array, chunk it up into JSON objects contained within that array. + */ + def getFramingDecoder: Flow[ByteString, ByteString, NotUsed] + /** + * Write-side, apply framing to outgoing entity stream. + * + * Most typical usage will be a variant of `Flow[ByteString].intersperse`. + * + * For example for rendering a JSON array one would return + * `Flow[ByteString].intersperse(ByteString("["), ByteString(","), ByteString("]"))` + * and for rendering a new-line separated CSV simply `Flow[ByteString].intersperse(ByteString("\n"))`. + */ + def getFramingRenderer: Flow[ByteString, ByteString, NotUsed] + + /** + * Read-side, allows changing what content types are accepted by this framing. + * + * EntityStreamingSupport traits MUST support re-configuring the accepted [[ContentTypeRange]]. + * + * This is in order to support a-typical APIs which users still want to communicate with using + * the provided support trait. Typical examples include APIs which return valid `application/json` + * however advertise the content type as being `application/javascript` or vendor specific content types, + * which still parse correctly as JSON, CSV or something else that a provided support trait is built for. + * + * NOTE: Implementations should specialize the return type to their own Type! + */ + def withSupported(range: ContentTypeRange): EntityStreamingSupport + + /** + * Write-side, defines what Content-Type the Marshaller should offer and the final Content-Type of the response. + * + * EntityStreamingSupport traits MUST support re-configuring the offered [[ContentType]]. + * This is due to the need integrating with existing systems which sometimes excpect custom Content-Types, + * however really are just plain JSON or something else internally (perhaps with slight extensions). + * + * NOTE: Implementations should specialize the return type to their own Type! + */ + def withContentType(range: ContentType): EntityStreamingSupport + + /** + * Write-side / read-side, defines if (un)marshalling should be done in parallel. + * + * This may be beneficial marshalling the bottleneck in the pipeline. + * + * See also [[parallelism]] and [[withParallelMarshalling]]. + */ + def parallelism: Int + + /** + * Write-side / read-side, defines if (un)marshalling of incoming stream elements should be perserved or not. + * + * Allowing for parallel and unordered (un)marshalling often yields higher throughput and also allows avoiding + * head-of-line blocking if some elements are much larger than others. + * + * See also [[parallelism]] and [[withParallelMarshalling]]. + */ + def unordered: Boolean + + /** + * Write-side / read-side, defines parallelism and if ordering should be preserved or not of Source element marshalling. + * + * Sometimes marshalling multiple elements at once (esp. when elements are not evenly sized, and ordering is not enforced) + * may yield in higher throughput. + * + * NOTE: Implementations should specialize the return type to their own Type! + */ + def withParallelMarshalling(parallelism: Int, unordered: Boolean): EntityStreamingSupport + +} + +/** + * Entity streaming support, independent of used Json parsing library etc. + */ +object EntityStreamingSupport { + + /** + * Default `application/json` entity streaming support. + * + * Provides framing (based on scanning the incoming dataBytes for valid JSON objects, so for example uploads using arrays or + * new-line separated JSON objects are all parsed correctly) and rendering of Sources as JSON Arrays. + * A different very popular style of returning streaming JSON is to separate JSON objects on a line-by-line basis, + * you can configure the support trait to do so by calling `withFramingRendererFlow`. + * + * Limits the maximum JSON object length to 8KB, if you want to increase this limit provide a value explicitly. + * + * See also https://en.wikipedia.org/wiki/JSON_Streaming + */ + def json(): JsonEntityStreamingSupport = json(8 * 1024) + /** + * Default `application/json` entity streaming support. + * + * Provides framing (based on scanning the incoming dataBytes for valid JSON objects, so for example uploads using arrays or + * new-line separated JSON objects are all parsed correctly) and rendering of Sources as JSON Arrays. + * A different very popular style of returning streaming JSON is to separate JSON objects on a line-by-line basis, + * you can configure the support trait to do so by calling `withFramingRendererFlow`. + * + * See also https://en.wikipedia.org/wiki/JSON_Streaming + */ + def json(maxObjectLength: Int): JsonEntityStreamingSupport = common.EntityStreamingSupport.json(maxObjectLength) + + /** + * Default `text/csv(UTF-8)` entity streaming support. + * Provides framing and rendering of `\n` separated lines and marshalling Sources into such values. + * + * Limits the maximum line-length to 8KB, if you want to increase this limit provide a value explicitly. + */ + def csv(): CsvEntityStreamingSupport = csv(8 * 1024) + /** + * Default `text/csv(UTF-8)` entity streaming support. + * Provides framing and rendering of `\n` separated lines and marshalling Sources into such values. + */ + def csv(maxLineLength: Int): CsvEntityStreamingSupport = common.EntityStreamingSupport.csv(maxLineLength) +} + +// extends Scala base, in order to get linearization right and (as we can't go into traits here, because companion object needed) +abstract class JsonEntityStreamingSupport extends common.EntityStreamingSupport { + def withFramingRendererFlow(flow: Flow[ByteString, ByteString, NotUsed]): JsonEntityStreamingSupport +} + +// extends Scala base, in order to get linearization right and (as we can't go into traits here, because companion object needed) +abstract class CsvEntityStreamingSupport extends common.EntityStreamingSupport { + def withFramingRendererFlow(flow: Flow[ByteString, ByteString, NotUsed]): CsvEntityStreamingSupport +} diff --git a/akka-http/src/main/scala/akka/http/javadsl/common/FramingWithContentType.scala b/akka-http/src/main/scala/akka/http/javadsl/common/FramingWithContentType.scala deleted file mode 100644 index 483b3d624f..0000000000 --- a/akka-http/src/main/scala/akka/http/javadsl/common/FramingWithContentType.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (C) 2016 Lightbend Inc. - */ - -package akka.http.javadsl.common - -import akka.NotUsed -import akka.event.Logging -import akka.http.javadsl.model.ContentTypeRange -import akka.stream.javadsl.{ Flow, Framing } -import akka.util.ByteString - -/** - * Wraps a framing [[akka.stream.javadsl.Flow]] (as provided by [[Framing]] for example) - * that chunks up incoming [[akka.util.ByteString]] according to some [[akka.http.javadsl.model.ContentType]] - * specific logic. - */ -abstract class FramingWithContentType { self ⇒ - import akka.http.impl.util.JavaMapping.Implicits._ - - def getFlow: Flow[ByteString, ByteString, NotUsed] - - def asScala: akka.http.scaladsl.common.FramingWithContentType = - this match { - case f: akka.http.scaladsl.common.FramingWithContentType ⇒ f - case _ ⇒ new akka.http.scaladsl.common.FramingWithContentType { - override def flow = self.getFlow.asScala - override def supported = self.supported.asScala - } - } - - def supported: ContentTypeRange - def matches(ct: akka.http.javadsl.model.ContentType): Boolean = supported.matches(ct) - - override def toString = s"${Logging.simpleName(getClass)}($supported)" -} diff --git a/akka-http/src/main/scala/akka/http/javadsl/common/JsonSourceRenderingMode.scala b/akka-http/src/main/scala/akka/http/javadsl/common/JsonSourceRenderingMode.scala deleted file mode 100644 index 7d4ca7b727..0000000000 --- a/akka-http/src/main/scala/akka/http/javadsl/common/JsonSourceRenderingMode.scala +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright (C) 2016 Lightbend Inc. - */ - -package akka.http.javadsl.common - -import akka.http.javadsl.model.{ ContentType, ContentTypes } - -/** - * Specialised rendering mode for streaming elements as JSON. - * - * See also: JSON Streaming on Wikipedia. - * - * See [[JsonSourceRenderingModes]] for commonly used pre-defined rendering modes. - */ -trait JsonSourceRenderingMode extends SourceRenderingMode { - override val contentType: ContentType.WithFixedCharset = - ContentTypes.APPLICATION_JSON -} - -/** - * Provides default JSON rendering modes. - */ -object JsonSourceRenderingModes { - - /** - * Most compact rendering mode. - * It does not intersperse any separator between the signalled elements. - * - * It can be used with [[akka.stream.javadsl.JsonFraming.bracketCounting]]. - * - * {{{ - * {"id":42}{"id":43}{"id":44} - * }}} - */ - val compact = akka.http.scaladsl.common.JsonSourceRenderingModes.Compact - - /** - * Simple rendering mode, similar to [[compact]] however interspersing elements with a `\n` character. - * - * {{{ - * {"id":42},{"id":43},{"id":44} - * }}} - */ - val compactCommaSeparated = akka.http.scaladsl.common.JsonSourceRenderingModes.CompactCommaSeparated - - /** - * Rendering mode useful when the receiving end expects a valid JSON Array. - * It can be useful when the client wants to detect when the stream has been successfully received in-full, - * which it can determine by seeing the terminating `]` character. - * - * The framing's terminal `]` will ONLY be emitted if the stream has completed successfully, - * in other words - the stream has been emitted completely, without errors occuring before the final element has been signaled. - * - * {{{ - * [{"id":42},{"id":43},{"id":44}] - * }}} - */ - val arrayCompact = akka.http.scaladsl.common.JsonSourceRenderingModes.ArrayCompact - - /** - * Rendering mode useful when the receiving end expects a valid JSON Array. - * It can be useful when the client wants to detect when the stream has been successfully received in-full, - * which it can determine by seeing the terminating `]` character. - * - * The framing's terminal `]` will ONLY be emitted if the stream has completed successfully, - * in other words - the stream has been emitted completely, without errors occuring before the final element has been signaled. - * - * {{{ - * [{"id":42}, - * {"id":43}, - * {"id":44}] - * }}} - */ - val arrayLineByLine = akka.http.scaladsl.common.JsonSourceRenderingModes.ArrayLineByLine - - /** - * Recommended rendering mode. - * - * It is a nice balance between valid and human-readable as well as resonably small size overhead (just the `\n` between elements). - * A good example of API's using this syntax is Twitter's Firehose (last verified at 1.1 version of that API). - * - * {{{ - * {"id":42} - * {"id":43} - * {"id":44} - * }}} - */ - val lineByLine = akka.http.scaladsl.common.JsonSourceRenderingModes.LineByLine - - /** - * Simple rendering mode interspersing each pair of elements with both `,\n`. - * Picking the [[lineByLine]] format may be preferable, as it is slightly simpler to parse - each line being a valid json object (no need to trim the comma). - * - * {{{ - * {"id":42}, - * {"id":43}, - * {"id":44} - * }}} - */ - val lineByLineCommaSeparated = akka.http.scaladsl.common.JsonSourceRenderingModes.LineByLineCommaSeparated - -} diff --git a/akka-http/src/main/scala/akka/http/javadsl/common/SourceRenderingMode.scala b/akka-http/src/main/scala/akka/http/javadsl/common/SourceRenderingMode.scala deleted file mode 100644 index 5144f336f6..0000000000 --- a/akka-http/src/main/scala/akka/http/javadsl/common/SourceRenderingMode.scala +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright (C) 2016 Lightbend Inc. - */ - -package akka.http.javadsl.common - -import akka.http.javadsl.model.ContentType -import akka.util.ByteString - -/** - * Defines how to render a [[akka.stream.javadsl.Source]] into a raw [[ByteString]] - * output. - * - * This can be used to render a source into an [[akka.http.scaladsl.model.HttpEntity]]. - */ -trait SourceRenderingMode { - def contentType: ContentType - - def start: ByteString - def between: ByteString - def end: ByteString -} diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/EntityStreamingSupport.scala b/akka-http/src/main/scala/akka/http/javadsl/server/EntityStreamingSupport.scala deleted file mode 100644 index 571867ac63..0000000000 --- a/akka-http/src/main/scala/akka/http/javadsl/server/EntityStreamingSupport.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (C) 2016 Lightbend Inc. - */ - -package akka.http.javadsl.server - -import akka.NotUsed -import akka.http.javadsl.common.{ FramingWithContentType, SourceRenderingMode } -import akka.http.javadsl.model.{ ContentTypeRange, MediaRanges } -import akka.http.scaladsl.server.ApplicationJsonBracketCountingFraming -import akka.stream.javadsl.{ Flow, Framing } -import akka.util.ByteString - -/** - * Entity streaming support, independent of used Json parsing library etc. - * - * Can be extended by various Support traits (e.g. "SprayJsonSupport"), - * in order to provide users with both `framing` (this trait) and `marshalling` - * (implemented by a library) by using a single trait. - */ -object EntityStreamingSupport { - // in the ScalaDSL version we make users implement abstract methods that are supposed to be - // implicit vals. This helps to guide in implementing the needed values, however in Java that would not really help. - - /** `application/json` specific Framing implementation */ - def bracketCountingJsonFraming(maximumObjectLength: Int): FramingWithContentType = - new ApplicationJsonBracketCountingFraming(maximumObjectLength) - - /** - * Frames incoming `text / *` entities on a line-by-line basis. - * Useful for accepting `text/csv` uploads as a stream of rows. - */ - def newLineFraming(maximumObjectLength: Int, supportedContentTypes: ContentTypeRange): FramingWithContentType = - new FramingWithContentType { - override final val getFlow: Flow[ByteString, ByteString, NotUsed] = - Flow.of(classOf[ByteString]).via(Framing.delimiter(ByteString("\n"), maximumObjectLength)) - - override final val supported: ContentTypeRange = - akka.http.scaladsl.model.ContentTypeRange(akka.http.scaladsl.model.MediaRanges.`text/*`) - } -} diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/RoutingJavaMapping.scala b/akka-http/src/main/scala/akka/http/javadsl/server/RoutingJavaMapping.scala index dda99cbb75..ebb6eeb313 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/RoutingJavaMapping.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/RoutingJavaMapping.scala @@ -8,11 +8,13 @@ import java.util.concurrent.CompletionStage import akka.http.impl.util.JavaMapping._ import akka.http.impl.util._ +import akka.http.javadsl.common.EntityStreamingSupport import akka.http.{ javadsl, scaladsl } import akka.http.scaladsl.server.{ directives ⇒ sdirectives } import akka.http.scaladsl.{ common ⇒ scommon } import akka.http.javadsl.server.{ directives ⇒ jdirectives } import akka.http.javadsl.{ common ⇒ jcommon } + import scala.collection.immutable /** @@ -45,7 +47,7 @@ private[http] object RoutingJavaMapping { } implicit object convertRouteResult extends Inherited[javadsl.server.RouteResult, scaladsl.server.RouteResult] - implicit object convertSourceRenderingMode extends Inherited[jcommon.SourceRenderingMode, scommon.SourceRenderingMode] + implicit object convertEntityStreamingSupport extends Inherited[EntityStreamingSupport, scommon.EntityStreamingSupport] implicit object convertDirectoryRenderer extends Inherited[jdirectives.DirectoryRenderer, sdirectives.FileAndResourceDirectives.DirectoryRenderer] implicit object convertContentTypeResolver extends Inherited[jdirectives.ContentTypeResolver, sdirectives.ContentTypeResolver] diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/directives/FramedEntityStreamingDirectives.scala b/akka-http/src/main/scala/akka/http/javadsl/server/directives/FramedEntityStreamingDirectives.scala index 153ee0601e..eef4a3d1f9 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/directives/FramedEntityStreamingDirectives.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/directives/FramedEntityStreamingDirectives.scala @@ -7,10 +7,12 @@ import java.util.function.{ Function ⇒ JFunction } import java.util.{ List ⇒ JList, Map ⇒ JMap } import akka.NotUsed -import akka.http.javadsl.common.{ FramingWithContentType, SourceRenderingMode } +import akka.http.javadsl.common.EntityStreamingSupport +import akka.http.javadsl.marshalling.Marshaller import akka.http.javadsl.model.{ HttpEntity, _ } -import akka.http.javadsl.server.{ Marshaller, Route, Unmarshaller } -import akka.http.scaladsl.marshalling.ToResponseMarshallable +import akka.http.javadsl.server.Route +import akka.http.javadsl.unmarshalling.Unmarshaller +import akka.http.scaladsl.marshalling.{ Marshalling, ToByteStringMarshaller, ToResponseMarshallable } import akka.http.scaladsl.server.{ Directives ⇒ D } import akka.stream.javadsl.Source import akka.util.ByteString @@ -18,56 +20,39 @@ import akka.util.ByteString /** EXPERIMENTAL API */ abstract class FramedEntityStreamingDirectives extends TimeoutDirectives { + import akka.http.javadsl.server.RoutingJavaMapping._ + import akka.http.javadsl.server.RoutingJavaMapping.Implicits._ + @CorrespondsTo("asSourceOf") - def entityasSourceOf[T](um: Unmarshaller[ByteString, T], framing: FramingWithContentType, + def entityAsSourceOf[T](um: Unmarshaller[ByteString, T], support: EntityStreamingSupport, inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter { - D.entity(D.asSourceOf[T](framing.asScala)(um.asScala)) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒ + val umm = D.asSourceOf(um.asScala, support.asScala) + D.entity(umm) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒ inner(s.asJava).delegate } } - @CorrespondsTo("asSourceOfAsync") - def entityAsSourceAsyncOf[T]( - parallelism: Int, - um: Unmarshaller[ByteString, T], framing: FramingWithContentType, - inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter { - D.entity(D.asSourceOfAsync[T](parallelism, framing.asScala)(um.asScala)) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒ - inner(s.asJava).delegate - } - } - - @CorrespondsTo("asSourceOfAsyncUnordered") - def entityAsSourceAsyncUnorderedOf[T]( - parallelism: Int, - um: Unmarshaller[ByteString, T], framing: FramingWithContentType, - inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter { - D.entity(D.asSourceOfAsyncUnordered[T](parallelism, framing.asScala)(um.asScala)) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒ - inner(s.asJava).delegate - } - } - - // implicits used internally, Java caller does not benefit or use it + // implicits and multiple parameter lists used internally, Java caller does not benefit or use it @CorrespondsTo("complete") - def completeWithSource[T, M](implicit source: Source[T, M], m: Marshaller[T, ByteString], rendering: SourceRenderingMode): Route = RouteAdapter { - implicit val mm = _sourceMarshaller(m.map(ByteStringAsEntityFn), rendering) - val response = ToResponseMarshallable(source) + def completeWithSource[T, M](source: Source[T, M])(implicit m: Marshaller[T, ByteString], support: EntityStreamingSupport): Route = RouteAdapter { + import akka.http.scaladsl.marshalling.PredefinedToResponseMarshallers._ + val mm = m.map(ByteStringAsEntityFn).asScalaCastOutput[akka.http.scaladsl.model.RequestEntity] + val mmm = fromEntityStreamingSupportAndEntityMarshaller[T, M](support.asScala, mm) + val response = ToResponseMarshallable(source.asScala)(mmm) D.complete(response) } + // implicits and multiple parameter lists used internally, Java caller does not benefit or use it @CorrespondsTo("complete") - def completeOKWithSource[T, M](implicit source: Source[T, M], m: Marshaller[T, RequestEntity], rendering: SourceRenderingMode): Route = RouteAdapter { - implicit val mm = _sourceMarshaller[T, M](m, rendering) - val response = ToResponseMarshallable(source) + def completeOKWithSource[T, M](source: Source[T, M])(implicit m: Marshaller[T, RequestEntity], support: EntityStreamingSupport): Route = RouteAdapter { + import akka.http.scaladsl.marshalling.PredefinedToResponseMarshallers._ + // don't try this at home: + val mm = m.asScalaCastOutput[akka.http.scaladsl.model.RequestEntity].map(_.httpEntity.asInstanceOf[akka.http.scaladsl.model.RequestEntity]) + implicit val mmm = fromEntityStreamingSupportAndEntityMarshaller[T, M](support.asScala, mm) + val response = ToResponseMarshallable(source.asScala) D.complete(response) } - implicit private def _sourceMarshaller[T, M](implicit m: Marshaller[T, HttpEntity], rendering: SourceRenderingMode) = { - import akka.http.javadsl.server.RoutingJavaMapping.Implicits._ - import akka.http.javadsl.server.RoutingJavaMapping._ - val mm = m.asScalaCastOutput - D._sourceMarshaller[T, M](mm, rendering.asScala).compose({ h: akka.stream.javadsl.Source[T, M] ⇒ h.asScala }) - } - private[this] val ByteStringAsEntityFn = new java.util.function.Function[ByteString, HttpEntity]() { override def apply(bs: ByteString): HttpEntity = HttpEntities.create(bs) } diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/directives/FutureDirectives.scala b/akka-http/src/main/scala/akka/http/javadsl/server/directives/FutureDirectives.scala index 50a0cc75ef..f89ac51563 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/directives/FutureDirectives.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/directives/FutureDirectives.scala @@ -23,7 +23,7 @@ abstract class FutureDirectives extends FormFieldDirectives { /** * "Unwraps" a `CompletionStage` and runs the inner route after future * completion with the future's value as an extraction of type `Try`. - * + * * @group future */ def onComplete[T](f: Supplier[CompletionStage[T]], inner: JFunction[Try[T], Route]) = RouteAdapter { @@ -35,7 +35,7 @@ abstract class FutureDirectives extends FormFieldDirectives { /** * "Unwraps" a `CompletionStage` and runs the inner route after future * completion with the future's value as an extraction of type `Try`. - * + * * @group future */ def onComplete[T](cs: CompletionStage[T], inner: JFunction[Try[T], Route]) = RouteAdapter { @@ -65,7 +65,7 @@ abstract class FutureDirectives extends FormFieldDirectives { * completion with the stage's value as an extraction of type `T`. * If the stage fails its failure Throwable is bubbled up to the nearest * ExceptionHandler. - * + * * @group future */ def onSuccess[T](f: Supplier[CompletionStage[T]], inner: JFunction[T, Route]) = RouteAdapter { @@ -80,7 +80,7 @@ abstract class FutureDirectives extends FormFieldDirectives { * If the completion stage succeeds the request is completed using the values marshaller * (This directive therefore requires a marshaller for the completion stage value type to be * provided.) - * + * * @group future */ def completeOrRecoverWith[T](f: Supplier[CompletionStage[T]], marshaller: Marshaller[T, RequestEntity], inner: JFunction[Throwable, Route]): Route = RouteAdapter { diff --git a/akka-http/src/main/scala/akka/http/scaladsl/common/CsvEntityStreamingSupport.scala b/akka-http/src/main/scala/akka/http/scaladsl/common/CsvEntityStreamingSupport.scala new file mode 100644 index 0000000000..536a86f003 --- /dev/null +++ b/akka-http/src/main/scala/akka/http/scaladsl/common/CsvEntityStreamingSupport.scala @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.http.scaladsl.common + +import akka.NotUsed +import akka.event.Logging +import akka.http.javadsl.{ common, model ⇒ jm } +import akka.http.scaladsl.model.{ ContentType, ContentTypeRange, ContentTypes } +import akka.stream.scaladsl.{ Flow, Framing } +import akka.util.ByteString + +final class CsvEntityStreamingSupport private[akka] ( + maxLineLength: Int, + val supported: ContentTypeRange, + val contentType: ContentType, + val framingRenderer: Flow[ByteString, ByteString, NotUsed], + val parallelism: Int, + val unordered: Boolean +) extends common.CsvEntityStreamingSupport { + import akka.http.impl.util.JavaMapping.Implicits._ + + def this(maxObjectSize: Int) = + this( + maxObjectSize, + ContentTypeRange(ContentTypes.`text/csv(UTF-8)`), + ContentTypes.`text/csv(UTF-8)`, + Flow[ByteString].intersperse(ByteString("\n")), + 1, false) + + override val framingDecoder: Flow[ByteString, ByteString, NotUsed] = + Framing.delimiter(ByteString("\n"), maxLineLength) + + override def withFramingRendererFlow(framingRendererFlow: akka.stream.javadsl.Flow[ByteString, ByteString, NotUsed]): CsvEntityStreamingSupport = + withFramingRenderer(framingRendererFlow.asScala) + def withFramingRenderer(framingRendererFlow: Flow[ByteString, ByteString, NotUsed]): CsvEntityStreamingSupport = + new CsvEntityStreamingSupport(maxLineLength, supported, contentType, framingRendererFlow, parallelism, unordered) + + override def withContentType(ct: jm.ContentType): CsvEntityStreamingSupport = + new CsvEntityStreamingSupport(maxLineLength, supported, ct.asScala, framingRenderer, parallelism, unordered) + override def withSupported(range: jm.ContentTypeRange): CsvEntityStreamingSupport = + new CsvEntityStreamingSupport(maxLineLength, range.asScala, contentType, framingRenderer, parallelism, unordered) + override def withParallelMarshalling(parallelism: Int, unordered: Boolean): CsvEntityStreamingSupport = + new CsvEntityStreamingSupport(maxLineLength, supported, contentType, framingRenderer, parallelism, unordered) + + override def toString = s"""${Logging.simpleName(getClass)}($maxLineLength, $supported, $contentType)""" +} diff --git a/akka-http/src/main/scala/akka/http/scaladsl/common/EntityStreamingSupport.scala b/akka-http/src/main/scala/akka/http/scaladsl/common/EntityStreamingSupport.scala new file mode 100644 index 0000000000..aea219666f --- /dev/null +++ b/akka-http/src/main/scala/akka/http/scaladsl/common/EntityStreamingSupport.scala @@ -0,0 +1,141 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.http.scaladsl.common + +import akka.NotUsed +import akka.http.javadsl.{ common, model ⇒ jm } +import akka.http.scaladsl.model._ +import akka.stream.scaladsl.Flow +import akka.util.ByteString + +/** + * Entity streaming support trait allowing rendering and receiving incoming ``Source[T, _]`` from HTTP entities. + * + * See [[JsonEntityStreamingSupport]] or [[CsvEntityStreamingSupport]] for default implementations. + */ +abstract class EntityStreamingSupport extends common.EntityStreamingSupport { + /** Read-side, what content types it is able to frame and unmarshall. */ + def supported: ContentTypeRange + /** Write-side, defines what Content-Type the Marshaller should offer and the final Content-Type of the response. */ + def contentType: ContentType + + /** + * Read-side, decode incoming framed entity. + * For example with an incoming JSON array, chunk it up into JSON objects contained within that array. + */ + def framingDecoder: Flow[ByteString, ByteString, NotUsed] + override final def getFramingDecoder = framingDecoder.asJava + + /** + * Write-side, apply framing to outgoing entity stream. + * + * Most typical usage will be a variant of `Flow[ByteString].intersperse`. + * + * For example for rendering a JSON array one would return + * `Flow[ByteString].intersperse(ByteString("["), ByteString(","), ByteString("]"))` + * and for rendering a new-line separated CSV simply `Flow[ByteString].intersperse(ByteString("\n"))`. + */ + def framingRenderer: Flow[ByteString, ByteString, NotUsed] + override final def getFramingRenderer = framingRenderer.asJava + + /** + * Read-side, allows changing what content types are accepted by this framing. + * + * EntityStreamingSupport traits MUST support re-configuring the accepted [[ContentTypeRange]]. + * + * This is in order to support a-typical APIs which users still want to communicate with using + * the provided support trait. Typical examples include APIs which return valid `application/json` + * however advertise the content type as being `application/javascript` or vendor specific content types, + * which still parse correctly as JSON, CSV or something else that a provided support trait is built for. + * + * NOTE: Implementations should specialize the return type to their own Type! + */ + override def withSupported(range: jm.ContentTypeRange): EntityStreamingSupport + + /** + * Write-side, defines what Content-Type the Marshaller should offer and the final Content-Type of the response. + * + * EntityStreamingSupport traits MUST support re-configuring the offered [[ContentType]]. + * This is due to the need integrating with existing systems which sometimes excpect custom Content-Types, + * however really are just plain JSON or something else internally (perhaps with slight extensions). + * + * NOTE: Implementations should specialize the return type to their own Type! + */ + override def withContentType(range: jm.ContentType): EntityStreamingSupport + + /** + * Write-side / read-side, defines if (un)marshalling should be done in parallel. + * + * This may be beneficial marshalling the bottleneck in the pipeline. + * + * See also [[parallelism]] and [[withParallelMarshalling]]. + */ + def parallelism: Int + + /** + * Write-side / read-side, defines if (un)marshalling of incoming stream elements should be perserved or not. + * + * Allowing for parallel and unordered (un)marshalling often yields higher throughput and also allows avoiding + * head-of-line blocking if some elements are much larger than others. + * + * See also [[parallelism]] and [[withParallelMarshalling]]. + */ + def unordered: Boolean + + /** + * Write-side / read-side, defines parallelism and if ordering should be preserved or not of Source element marshalling. + * + * Sometimes marshalling multiple elements at once (esp. when elements are not evenly sized, and ordering is not enforced) + * may yield in higher throughput. + * + * NOTE: Implementations should specialize the return type to their own Type! + */ + def withParallelMarshalling(parallelism: Int, unordered: Boolean): EntityStreamingSupport + +} + +/** + * Entity streaming support, independent of used Json parsing library etc. + */ +object EntityStreamingSupport { + + /** + * Default `application/json` entity streaming support. + * + * Provides framing (based on scanning the incoming dataBytes for valid JSON objects, so for example uploads using arrays or + * new-line separated JSON objects are all parsed correctly) and rendering of Sources as JSON Arrays. + * A different very popular style of returning streaming JSON is to separate JSON objects on a line-by-line basis, + * you can configure the support trait to do so by calling `withFramingRendererFlow`. + * + * Limits the maximum JSON object length to 8KB, if you want to increase this limit provide a value explicitly. + * + * See also https://en.wikipedia.org/wiki/JSON_Streaming + */ + def json(): JsonEntityStreamingSupport = json(8 * 1024) + /** + * Default `application/json` entity streaming support. + * + * Provides framing (based on scanning the incoming dataBytes for valid JSON objects, so for example uploads using arrays or + * new-line separated JSON objects are all parsed correctly) and rendering of Sources as JSON Arrays. + * A different very popular style of returning streaming JSON is to separate JSON objects on a line-by-line basis, + * you can configure the support trait to do so by calling `withFramingRendererFlow`. + * + * See also https://en.wikipedia.org/wiki/JSON_Streaming + */ + def json(maxObjectLength: Int): JsonEntityStreamingSupport = new JsonEntityStreamingSupport(maxObjectLength) + + /** + * Default `text/csv(UTF-8)` entity streaming support. + * Provides framing and rendering of `\n` separated lines and marshalling Sources into such values. + * + * Limits the maximum line-length to 8KB, if you want to increase this limit provide a value explicitly. + */ + def csv(): CsvEntityStreamingSupport = csv(8 * 1024) + /** + * Default `text/csv(UTF-8)` entity streaming support. + * Provides framing and rendering of `\n` separated lines and marshalling Sources into such values. + */ + def csv(maxLineLength: Int): CsvEntityStreamingSupport = new CsvEntityStreamingSupport(maxLineLength) +} + diff --git a/akka-http/src/main/scala/akka/http/scaladsl/common/FramingWithContentType.scala b/akka-http/src/main/scala/akka/http/scaladsl/common/FramingWithContentType.scala deleted file mode 100644 index 13dc72ecba..0000000000 --- a/akka-http/src/main/scala/akka/http/scaladsl/common/FramingWithContentType.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (C) 2016 Lightbend Inc. - */ - -package akka.http.scaladsl.common - -import akka.NotUsed -import akka.event.Logging -import akka.http.scaladsl.model.ContentTypeRange -import akka.stream.scaladsl.{ Flow, Framing } -import akka.util.ByteString - -/** - * Wraps a framing [[akka.stream.scaladsl.Flow]] (as provided by [[Framing]] for example) - * that chunks up incoming [[akka.util.ByteString]] according to some [[akka.http.javadsl.model.ContentType]] - * specific logic. - */ -abstract class FramingWithContentType extends akka.http.javadsl.common.FramingWithContentType { - def flow: Flow[ByteString, ByteString, NotUsed] - override final def getFlow = flow.asJava - override def supported: ContentTypeRange - override def matches(ct: akka.http.javadsl.model.ContentType): Boolean = supported.matches(ct) - - override def toString = s"${Logging.simpleName(getClass)}($supported)" -} diff --git a/akka-http/src/main/scala/akka/http/scaladsl/common/JsonEntityStreamingSupport.scala b/akka-http/src/main/scala/akka/http/scaladsl/common/JsonEntityStreamingSupport.scala new file mode 100644 index 0000000000..743f5fd8aa --- /dev/null +++ b/akka-http/src/main/scala/akka/http/scaladsl/common/JsonEntityStreamingSupport.scala @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.http.scaladsl.common + +import akka.NotUsed +import akka.event.Logging +import akka.http.javadsl.{ common, model ⇒ jm } +import akka.http.scaladsl.model.{ ContentType, ContentTypeRange, ContentTypes } +import akka.stream.scaladsl.Flow +import akka.util.ByteString + +final class JsonEntityStreamingSupport private[akka] ( + maxObjectSize: Int, + val supported: ContentTypeRange, + val contentType: ContentType, + val framingRenderer: Flow[ByteString, ByteString, NotUsed], + val parallelism: Int, + val unordered: Boolean +) extends common.JsonEntityStreamingSupport { + import akka.http.impl.util.JavaMapping.Implicits._ + + def this(maxObjectSize: Int) = + this( + maxObjectSize, + ContentTypeRange(ContentTypes.`application/json`), + ContentTypes.`application/json`, + Flow[ByteString].intersperse(ByteString("["), ByteString(","), ByteString("]")), + 1, false) + + override val framingDecoder: Flow[ByteString, ByteString, NotUsed] = + akka.stream.scaladsl.JsonFraming.objectScanner(maxObjectSize) + + override def withFramingRendererFlow(framingRendererFlow: akka.stream.javadsl.Flow[ByteString, ByteString, NotUsed]): JsonEntityStreamingSupport = + withFramingRenderer(framingRendererFlow.asScala) + def withFramingRenderer(framingRendererFlow: Flow[ByteString, ByteString, NotUsed]): JsonEntityStreamingSupport = + new JsonEntityStreamingSupport(maxObjectSize, supported, contentType, framingRendererFlow, parallelism, unordered) + + override def withContentType(ct: jm.ContentType): JsonEntityStreamingSupport = + new JsonEntityStreamingSupport(maxObjectSize, supported, ct.asScala, framingRenderer, parallelism, unordered) + override def withSupported(range: jm.ContentTypeRange): JsonEntityStreamingSupport = + new JsonEntityStreamingSupport(maxObjectSize, range.asScala, contentType, framingRenderer, parallelism, unordered) + override def withParallelMarshalling(parallelism: Int, unordered: Boolean): JsonEntityStreamingSupport = + new JsonEntityStreamingSupport(maxObjectSize, supported, contentType, framingRenderer, parallelism, unordered) + + override def toString = s"""${Logging.simpleName(getClass)}($maxObjectSize, $supported, $contentType)""" + +} diff --git a/akka-http/src/main/scala/akka/http/scaladsl/common/JsonSourceRenderingMode.scala b/akka-http/src/main/scala/akka/http/scaladsl/common/JsonSourceRenderingMode.scala deleted file mode 100644 index 824af23b8c..0000000000 --- a/akka-http/src/main/scala/akka/http/scaladsl/common/JsonSourceRenderingMode.scala +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright (C) 2016 Lightbend Inc. - */ - -package akka.http.scaladsl.common - -import akka.http.scaladsl.model.{ ContentType, ContentTypes } -import akka.util.ByteString - -/** - * Specialised rendering mode for streaming elements as JSON. - * - * See also: JSON Streaming on Wikipedia. - * - * See [[JsonSourceRenderingModes]] for commonly used pre-defined rendering modes. - */ -trait JsonSourceRenderingMode extends akka.http.javadsl.common.JsonSourceRenderingMode with SourceRenderingMode { - override val contentType: ContentType.WithFixedCharset = - ContentTypes.`application/json` -} - -/** - * Provides default JSON rendering modes. - */ -object JsonSourceRenderingModes { - - /** - * Most compact rendering mode. - * It does not intersperse any separator between the signalled elements. - * - * It is the most compact form to render JSON and can be framed properly by using [[akka.stream.javadsl.JsonFraming.bracketCounting]]. - * - * {{{ - * {"id":42}{"id":43}{"id":44} - * }}} - */ - object Compact extends JsonSourceRenderingMode { - override val start: ByteString = ByteString.empty - override val between: ByteString = ByteString.empty - override val end: ByteString = ByteString.empty - } - - /** - * Simple rendering mode, similar to [[Compact]] however interspersing elements with a `\n` character. - * - * {{{ - * {"id":42},{"id":43},{"id":44} - * }}} - */ - object CompactCommaSeparated extends JsonSourceRenderingMode { - override val start: ByteString = ByteString.empty - override val between: ByteString = ByteString(",") - override val end: ByteString = ByteString.empty - } - - /** - * Rendering mode useful when the receiving end expects a valid JSON Array. - * It can be useful when the client wants to detect when the stream has been successfully received in-full, - * which it can determine by seeing the terminating `]` character. - * - * The framing's terminal `]` will ONLY be emitted if the stream has completed successfully, - * in other words - the stream has been emitted completely, without errors occuring before the final element has been signaled. - * - * {{{ - * [{"id":42},{"id":43},{"id":44}] - * }}} - */ - object ArrayCompact extends JsonSourceRenderingMode { - override val start: ByteString = ByteString("[") - override val between: ByteString = ByteString(",") - override val end: ByteString = ByteString("]") - } - - /** - * Rendering mode useful when the receiving end expects a valid JSON Array. - * It can be useful when the client wants to detect when the stream has been successfully received in-full, - * which it can determine by seeing the terminating `]` character. - * - * The framing's terminal `]` will ONLY be emitted if the stream has completed successfully, - * in other words - the stream has been emitted completely, without errors occuring before the final element has been signaled. - * - * {{{ - * [{"id":42}, - * {"id":43}, - * {"id":44}] - * }}} - */ - object ArrayLineByLine extends JsonSourceRenderingMode { - override val start: ByteString = ByteString("[") - override val between: ByteString = ByteString(",\n") - override val end: ByteString = ByteString("]") - } - - /** - * Recommended rendering mode. - * - * It is a nice balance between valid and human-readable as well as resonably small size overhead (just the `\n` between elements). - * A good example of API's using this syntax is Twitter's Firehose (last verified at 1.1 version of that API). - * - * {{{ - * {"id":42} - * {"id":43} - * {"id":44} - * }}} - */ - object LineByLine extends JsonSourceRenderingMode { - override val start: ByteString = ByteString.empty - override val between: ByteString = ByteString("\n") - override val end: ByteString = ByteString.empty - } - - /** - * Simple rendering mode interspersing each pair of elements with both `,\n`. - * Picking the [[LineByLine]] format may be preferable, as it is slightly simpler to parse - each line being a valid json object (no need to trim the comma). - * - * {{{ - * {"id":42}, - * {"id":43}, - * {"id":44} - * }}} - */ - object LineByLineCommaSeparated extends JsonSourceRenderingMode { - override val start: ByteString = ByteString.empty - override val between: ByteString = ByteString(",\n") - override val end: ByteString = ByteString.empty - } - -} diff --git a/akka-http/src/main/scala/akka/http/scaladsl/common/SourceRenderingMode.scala b/akka-http/src/main/scala/akka/http/scaladsl/common/SourceRenderingMode.scala deleted file mode 100644 index 61abd85144..0000000000 --- a/akka-http/src/main/scala/akka/http/scaladsl/common/SourceRenderingMode.scala +++ /dev/null @@ -1,11 +0,0 @@ -/* - * Copyright (C) 2016 Lightbend Inc. - */ - -package akka.http.scaladsl.common - -import akka.http.scaladsl.model.ContentType - -trait SourceRenderingMode extends akka.http.javadsl.common.SourceRenderingMode { - override def contentType: ContentType -} diff --git a/akka-http/src/main/scala/akka/http/scaladsl/marshalling/PredefinedToResponseMarshallers.scala b/akka-http/src/main/scala/akka/http/scaladsl/marshalling/PredefinedToResponseMarshallers.scala index 62777103a2..35bc19f3d0 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/marshalling/PredefinedToResponseMarshallers.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/marshalling/PredefinedToResponseMarshallers.scala @@ -4,12 +4,19 @@ package akka.http.scaladsl.marshalling +import akka.http.scaladsl.common.EntityStreamingSupport import akka.stream.impl.ConstantFun import scala.collection.immutable import akka.http.scaladsl.util.FastFuture._ import akka.http.scaladsl.model.MediaTypes._ import akka.http.scaladsl.model._ +import akka.http.scaladsl.server.ContentNegotiator +import akka.http.scaladsl.util.FastFuture +import akka.stream.scaladsl.Source +import akka.util.ByteString + +import scala.language.higherKinds trait PredefinedToResponseMarshallers extends LowPriorityToResponseMarshallerImplicits { @@ -41,6 +48,37 @@ trait PredefinedToResponseMarshallers extends LowPriorityToResponseMarshallerImp Marshaller(implicit ec ⇒ { case (status, headers, value) ⇒ mt(value).fast map (_ map (_ map (HttpResponse(status, headers, _)))) }) + + implicit def fromEntityStreamingSupportAndByteStringMarshaller[T, M](implicit s: EntityStreamingSupport, m: ToByteStringMarshaller[T]): ToResponseMarshaller[Source[T, M]] = { + Marshaller[Source[T, M], HttpResponse] { implicit ec ⇒ source ⇒ + FastFuture successful { + Marshalling.WithFixedContentType(s.contentType, () ⇒ { + val availableMarshallingsPerElement = source.mapAsync(1) { t ⇒ m(t)(ec) } + + // TODO optimise such that we pick the optimal marshalling only once (headAndTail needed?) + // TODO, NOTE: this is somewhat duplicated from Marshal.scala it could be made DRYer + val bestMarshallingPerElement = availableMarshallingsPerElement mapConcat { marshallings ⇒ + // pick the Marshalling that matches our EntityStreamingSupport + (s.contentType match { + case best @ (_: ContentType.Binary | _: ContentType.WithFixedCharset) ⇒ + marshallings collectFirst { case Marshalling.WithFixedContentType(`best`, marshal) ⇒ marshal } + + case best @ ContentType.WithCharset(bestMT, bestCS) ⇒ + marshallings collectFirst { + case Marshalling.WithFixedContentType(`best`, marshal) ⇒ marshal + case Marshalling.WithOpenCharset(`bestMT`, marshal) ⇒ () ⇒ marshal(bestCS) + } + }).toList + } + val marshalledElements: Source[ByteString, M] = + bestMarshallingPerElement.map(_.apply()) // marshal! + .via(s.framingRenderer) + + HttpResponse(entity = HttpEntity(s.contentType, marshalledElements)) + }) :: Nil + } + } + } } trait LowPriorityToResponseMarshallerImplicits { @@ -48,6 +86,40 @@ trait LowPriorityToResponseMarshallerImplicits { liftMarshaller(m) implicit def liftMarshaller[T](implicit m: ToEntityMarshaller[T]): ToResponseMarshaller[T] = PredefinedToResponseMarshallers.fromToEntityMarshaller() + + // FIXME deduplicate this!!! + implicit def fromEntityStreamingSupportAndEntityMarshaller[T, M](implicit s: EntityStreamingSupport, m: ToEntityMarshaller[T]): ToResponseMarshaller[Source[T, M]] = { + Marshaller[Source[T, M], HttpResponse] { implicit ec ⇒ source ⇒ + FastFuture successful { + Marshalling.WithFixedContentType(s.contentType, () ⇒ { + val availableMarshallingsPerElement = source.mapAsync(1) { t ⇒ m(t)(ec) } + + // TODO optimise such that we pick the optimal marshalling only once (headAndTail needed?) + // TODO, NOTE: this is somewhat duplicated from Marshal.scala it could be made DRYer + val bestMarshallingPerElement = availableMarshallingsPerElement mapConcat { marshallings ⇒ + // pick the Marshalling that matches our EntityStreamingSupport + (s.contentType match { + case best @ (_: ContentType.Binary | _: ContentType.WithFixedCharset) ⇒ + marshallings collectFirst { case Marshalling.WithFixedContentType(`best`, marshal) ⇒ marshal } + + case best @ ContentType.WithCharset(bestMT, bestCS) ⇒ + marshallings collectFirst { + case Marshalling.WithFixedContentType(`best`, marshal) ⇒ marshal + case Marshalling.WithOpenCharset(`bestMT`, marshal) ⇒ () ⇒ marshal(bestCS) + } + }).toList + } + val marshalledElements: Source[ByteString, M] = + bestMarshallingPerElement.map(_.apply()) // marshal! + .flatMapConcat(_.dataBytes) // extract raw dataBytes + .via(s.framingRenderer) + + HttpResponse(entity = HttpEntity(s.contentType, marshalledElements)) + }) :: Nil + } + } + } + } object PredefinedToResponseMarshallers extends PredefinedToResponseMarshallers diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/EntityStreamingSupport.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/EntityStreamingSupport.scala deleted file mode 100644 index f4656e8612..0000000000 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/EntityStreamingSupport.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (C) 2009-2015 Typesafe Inc. - */ -package akka.http.scaladsl.server - -import akka.NotUsed -import akka.http.scaladsl.common.{ FramingWithContentType, SourceRenderingMode } -import akka.http.scaladsl.model.{ ContentTypeRange, ContentTypes, MediaRanges } -import akka.stream.scaladsl.{ Flow, Framing } -import akka.util.ByteString - -/** - * Entity streaming support, independent of used Json parsing library etc. - * - * Can be extended by various Support traits (e.g. "SprayJsonSupport"), - * in order to provide users with both `framing` (this trait) and `marshalling` - * (implemented by a library) by using a single trait. - */ -trait EntityStreamingSupport extends EntityStreamingSupportBase { - - /** - * Implement as `implicit val` with required framing implementation, for example in - * the case of streaming JSON uploads it could be `bracketCountingJsonFraming(maximumObjectLength)`. - */ - def incomingEntityStreamFraming: FramingWithContentType - - /** - * Implement as `implicit val` with the rendering mode to be used when redering `Source` instances. - * For example for JSON it could be [[akka.http.scaladsl.common.JsonSourceRenderingMode.CompactArray]] - * or [[akka.http.scaladsl.common.JsonSourceRenderingMode.LineByLine]]. - */ - def outgoingEntityStreamRendering: SourceRenderingMode -} - -trait EntityStreamingSupportBase { - /** `application/json` specific Framing implementation */ - def bracketCountingJsonFraming(maximumObjectLength: Int): FramingWithContentType = - new ApplicationJsonBracketCountingFraming(maximumObjectLength) - - /** - * Frames incoming `text / *` entities on a line-by-line basis. - * Useful for accepting `text/csv` uploads as a stream of rows. - */ - def newLineFraming(maximumObjectLength: Int, supportedContentTypes: ContentTypeRange): FramingWithContentType = - new TextNewLineFraming(maximumObjectLength, supportedContentTypes) -} - -/** - * Entity streaming support, independent of used Json parsing library etc. - * - * Can be extended by various Support traits (e.g. "SprayJsonSupport"), - * in order to provide users with both `framing` (this trait) and `marshalling` - * (implemented by a library) by using a single trait. - */ -object EntityStreamingSupport extends EntityStreamingSupportBase - -final class ApplicationJsonBracketCountingFraming(maximumObjectLength: Int) extends FramingWithContentType { - override final val flow = Flow[ByteString].via(akka.stream.scaladsl.JsonFraming.bracketCounting(maximumObjectLength)) - override final val supported = ContentTypeRange(ContentTypes.`application/json`) -} - -final class TextNewLineFraming(maximumLineLength: Int, supportedContentTypes: ContentTypeRange) extends FramingWithContentType { - override final val flow: Flow[ByteString, ByteString, NotUsed] = - Flow[ByteString].via(Framing.delimiter(ByteString("\n"), maximumLineLength)) - - override final val supported: ContentTypeRange = - ContentTypeRange(MediaRanges.`text/*`) -} diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala index 9f3dffd6df..fde94d9e4d 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala @@ -4,27 +4,24 @@ package akka.http.scaladsl.server.directives import akka.NotUsed -import akka.http.scaladsl.common.{ FramingWithContentType, SourceRenderingMode } +import akka.http.scaladsl.common +import akka.http.scaladsl.common.EntityStreamingSupport import akka.http.scaladsl.marshalling._ import akka.http.scaladsl.model._ -import akka.http.scaladsl.unmarshalling.{ Unmarshal, Unmarshaller, _ } +import akka.http.scaladsl.unmarshalling.{ Unmarshaller, _ } import akka.http.scaladsl.util.FastFuture -import akka.stream.Materializer -import akka.stream.impl.ConstantFun import akka.stream.scaladsl.{ Flow, Keep, Source } import akka.util.ByteString -import scala.concurrent.ExecutionContext import scala.language.implicitConversions /** * Allows the [[MarshallingDirectives.entity]] directive to extract a [[Source]] of elements. * - * See [[akka.http.scaladsl.server.EntityStreamingSupport]] for useful default [[FramingWithContentType]] instances and + * See [[common.EntityStreamingSupport]] for useful default framing `Flow` instances and * support traits such as `SprayJsonSupport` (or your other favourite JSON library) to provide the needed [[Marshaller]] s. */ trait FramedEntityStreamingDirectives extends MarshallingDirectives { - import FramedEntityStreamingDirectives._ type RequestToSourceUnmarshaller[T] = FromRequestUnmarshaller[Source[T, NotUsed]] @@ -32,41 +29,33 @@ trait FramedEntityStreamingDirectives extends MarshallingDirectives { * Extracts entity as [[Source]] of elements of type `T`. * This is achieved by applying the implicitly provided (in the following order): * - * - 1st: [[FramingWithContentType]] in order to chunk-up the incoming [[ByteString]]s according to the - * `Content-Type` aware framing (for example, [[akka.http.scaladsl.server.EntityStreamingSupport.bracketCountingJsonFraming]]). - * - 2nd: [[Unmarshaller]] (from [[ByteString]] to `T`) for each of the respective "chunks" (e.g. for each JSON element contained within an array). + * - 1st: chunk-up the incoming [[ByteString]]s by applying the `Content-Type`-aware framing + * - 2nd: apply the [[Unmarshaller]] (from [[ByteString]] to `T`) for each of the respective "chunks" (e.g. for each JSON element contained within an array). * * The request will be rejected with an [[akka.http.scaladsl.server.UnsupportedRequestContentTypeRejection]] if * its [[ContentType]] is not supported by the used `framing` or `unmarshaller`. * - * It is recommended to use the [[akka.http.scaladsl.server.EntityStreamingSupport]] trait in conjunction with this - * directive as it helps provide the right [[FramingWithContentType]] and [[SourceRenderingMode]] for the most - * typical usage scenarios (JSON, CSV, ...). - * * Cancelling extracted [[Source]] closes the connection abruptly (same as cancelling the `entity.dataBytes`). * - * If looking to improve marshalling performance in face of many elements (possibly of different sizes), - * you may be interested in using [[asSourceOfAsyncUnordered]] instead. - * * See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route. * By default the uploaded data is limited by the `akka.http.parsing.max-content-length`. */ - final def asSourceOf[T](implicit um: Unmarshaller[ByteString, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] = - asSourceOfAsync(1)(um, framing) + final def asSourceOf[T](implicit um: FromByteStringUnmarshaller[T], support: EntityStreamingSupport): RequestToSourceUnmarshaller[T] = + asSourceOfInternal(um, support) /** * Extracts entity as [[Source]] of elements of type `T`. * This is achieved by applying the implicitly provided (in the following order): * - * - 1st: [[FramingWithContentType]] in order to chunk-up the incoming [[ByteString]]s according to the - * `Content-Type` aware framing (for example, [[akka.http.scaladsl.server.EntityStreamingSupport.bracketCountingJsonFraming]]). + * - 1st: [[FramingFlow]] in order to chunk-up the incoming [[ByteString]]s according to the + * `Content-Type` aware framing (for example, [[common.EntityStreamingSupport.bracketCountingJsonFraming]]). * - 2nd: [[Unmarshaller]] (from [[ByteString]] to `T`) for each of the respective "chunks" (e.g. for each JSON element contained within an array). * * The request will be rejected with an [[akka.http.scaladsl.server.UnsupportedRequestContentTypeRejection]] if * its [[ContentType]] is not supported by the used `framing` or `unmarshaller`. * - * It is recommended to use the [[akka.http.scaladsl.server.EntityStreamingSupport]] trait in conjunction with this - * directive as it helps provide the right [[FramingWithContentType]] and [[SourceRenderingMode]] for the most + * It is recommended to use the [[common.EntityStreamingSupport]] trait in conjunction with this + * directive as it helps provide the right [[FramingFlow]] and [[SourceRenderingMode]] for the most * typical usage scenarios (JSON, CSV, ...). * * Cancelling extracted [[Source]] closes the connection abruptly (same as cancelling the `entity.dataBytes`). @@ -77,177 +66,25 @@ trait FramedEntityStreamingDirectives extends MarshallingDirectives { * See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route. * By default the uploaded data is limited by the `akka.http.parsing.max-content-length`. */ - final def asSourceOf[T](framing: FramingWithContentType)(implicit um: Unmarshaller[ByteString, T]): RequestToSourceUnmarshaller[T] = - asSourceOfAsync(1)(um, framing) - - /** - * Similar to [[asSourceOf]] however will apply at most `parallelism` unmarshallers in parallel. - * - * The source elements emitted preserve the order in which they are sent in the incoming [[HttpRequest]]. - * If you want to sacrivice ordering in favour of (potential) slight performance improvements in reading the input - * you may want to use [[asSourceOfAsyncUnordered]] instead, which lifts the ordering guarantee. - * - * Refer to [[asSourceOf]] for more in depth-documentation and guidelines. - * - * If looking to improve marshalling performance in face of many elements (possibly of different sizes), - * you may be interested in using [[asSourceOfAsyncUnordered]] instead. - * - * See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route. - * By default the uploaded data is limited by the `akka.http.parsing.max-content-length`. - */ - final def asSourceOfAsync[T](parallelism: Int)(implicit um: Unmarshaller[ByteString, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] = - asSourceOfInternal[T](framing, (ec, mat) ⇒ Flow[ByteString].mapAsync(parallelism)(Unmarshal(_).to[T](um, ec, mat))) - - /** - * Similar to [[asSourceOf]] however will apply at most `parallelism` unmarshallers in parallel. - * - * The source elements emitted preserve the order in which they are sent in the incoming [[HttpRequest]]. - * If you want to sacrivice ordering in favour of (potential) slight performance improvements in reading the input - * you may want to use [[asSourceOfAsyncUnordered]] instead, which lifts the ordering guarantee. - * - * Refer to [[asSourceOf]] for more in depth-documentation and guidelines. - * - * See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route. - * By default the uploaded data is limited by the `akka.http.parsing.max-content-length`. - */ - final def asSourceOfAsync[T](parallelism: Int, framing: FramingWithContentType)(implicit um: Unmarshaller[ByteString, T]): RequestToSourceUnmarshaller[T] = - asSourceOfAsync(parallelism)(um, framing) - - /** - * Similar to [[asSourceOfAsync]], as it will apply at most `parallelism` unmarshallers in parallel. - * - * The source elements emitted preserve the order in which they are sent in the incoming [[HttpRequest]]. - * If you want to sacrivice ordering in favour of (potential) slight performance improvements in reading the input - * you may want to use [[asSourceOfAsyncUnordered]] instead, which lifts the ordering guarantee. - * - * Refer to [[asSourceOf]] for more in depth-documentation and guidelines. - * - * See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route. - * By default the uploaded data is limited by the `akka.http.parsing.max-content-length`. - */ - final def asSourceOfAsyncUnordered[T](parallelism: Int)(implicit um: Unmarshaller[ByteString, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] = - asSourceOfInternal[T](framing, (ec, mat) ⇒ Flow[ByteString].mapAsyncUnordered(parallelism)(Unmarshal(_).to[T](um, ec, mat))) - /** - * Similar to [[asSourceOfAsync]], as it will apply at most `parallelism` unmarshallers in parallel. - * - * The source elements emitted preserve the order in which they are sent in the incoming [[HttpRequest]]. - * If you want to sacrivice ordering in favour of (potential) slight performance improvements in reading the input - * you may want to use [[asSourceOfAsyncUnordered]] instead, which lifts the ordering guarantee. - * - * Refer to [[asSourceOf]] for more in depth-documentation and guidelines. - * - * See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route. - * By default the uploaded data is limited by the `akka.http.parsing.max-content-length`. - */ - final def asSourceOfAsyncUnordered[T](parallelism: Int, framing: FramingWithContentType)(implicit um: Unmarshaller[ByteString, T]): RequestToSourceUnmarshaller[T] = - asSourceOfAsyncUnordered(parallelism)(um, framing) + final def asSourceOf[T](support: EntityStreamingSupport)(implicit um: FromByteStringUnmarshaller[T]): RequestToSourceUnmarshaller[T] = + asSourceOfInternal(um, support) // format: OFF - private final def asSourceOfInternal[T](framing: FramingWithContentType, marshalling: (ExecutionContext, Materializer) => Flow[ByteString, ByteString, NotUsed]#ReprMat[T, NotUsed]): RequestToSourceUnmarshaller[T] = + private final def asSourceOfInternal[T](um: Unmarshaller[ByteString, T], support: EntityStreamingSupport): RequestToSourceUnmarshaller[T] = Unmarshaller.withMaterializer[HttpRequest, Source[T, NotUsed]] { implicit ec ⇒ implicit mat ⇒ req ⇒ val entity = req.entity - if (framing.matches(entity.contentType)) { + if (support.supported.matches(entity.contentType)) { val bytes = entity.dataBytes - val frames = bytes.via(framing.flow) - val elements = frames.viaMat(marshalling(ec, mat))(Keep.right) + val frames = bytes.via(support.framingDecoder) + val marshalling = + if (support.unordered) Flow[ByteString].mapAsyncUnordered(support.parallelism)(bs => um(bs)(ec, mat)) + else Flow[ByteString].mapAsync(support.parallelism)(bs => um(bs)(ec, mat)) + + val elements = frames.viaMat(marshalling)(Keep.right) FastFuture.successful(elements) - } else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(framing.supported)) + } else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(support.supported)) } // format: ON - // TODO note to self - we need the same of ease of streaming stuff for the client side - i.e. the twitter firehose case. - - implicit def _asSourceUnmarshaller[T](implicit fem: FromEntityUnmarshaller[T], framing: FramingWithContentType): FromRequestUnmarshaller[Source[T, NotUsed]] = { - Unmarshaller.withMaterializer[HttpRequest, Source[T, NotUsed]] { implicit ec ⇒ implicit mat ⇒ req ⇒ - val entity = req.entity - if (framing.matches(entity.contentType)) { - val bytes = entity.dataBytes - val frames = bytes.viaMat(framing.flow)(Keep.right) - val elements = frames.viaMat(Flow[ByteString].map(HttpEntity(entity.contentType, _)).mapAsync(1)(Unmarshal(_).to[T](fem, ec, mat)))(Keep.right) - FastFuture.successful(elements) - } else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(framing.supported)) - } - } - - implicit def _sourceMarshaller[T, M](implicit m: ToEntityMarshaller[T], mode: SourceRenderingMode): ToResponseMarshaller[Source[T, M]] = - Marshaller[Source[T, M], HttpResponse] { implicit ec ⇒ source ⇒ - FastFuture successful { - Marshalling.WithFixedContentType(mode.contentType, () ⇒ { - val bytes = source - .mapAsync(1)(t ⇒ Marshal(t).to[HttpEntity]) - .map(_.dataBytes) - .flatMapConcat(ConstantFun.scalaIdentityFunction) - .intersperse(mode.start, mode.between, mode.end) - HttpResponse(entity = HttpEntity(mode.contentType, bytes)) - }) :: Nil - } - } - - implicit def _sourceParallelismMarshaller[T](implicit m: ToEntityMarshaller[T], mode: SourceRenderingMode): ToResponseMarshaller[AsyncRenderingOf[T]] = - Marshaller[AsyncRenderingOf[T], HttpResponse] { implicit ec ⇒ rendering ⇒ - FastFuture successful { - Marshalling.WithFixedContentType(mode.contentType, () ⇒ { - val bytes = rendering.source - .mapAsync(rendering.parallelism)(t ⇒ Marshal(t).to[HttpEntity]) - .map(_.dataBytes) - .flatMapConcat(ConstantFun.scalaIdentityFunction) - .intersperse(mode.start, mode.between, mode.end) - HttpResponse(entity = HttpEntity(mode.contentType, bytes)) - }) :: Nil - } - } - - implicit def _sourceUnorderedMarshaller[T](implicit m: ToEntityMarshaller[T], mode: SourceRenderingMode): ToResponseMarshaller[AsyncUnorderedRenderingOf[T]] = - Marshaller[AsyncUnorderedRenderingOf[T], HttpResponse] { implicit ec ⇒ rendering ⇒ - FastFuture successful { - Marshalling.WithFixedContentType(mode.contentType, () ⇒ { - val bytes = rendering.source - .mapAsync(rendering.parallelism)(t ⇒ Marshal(t).to[HttpEntity]) - .map(_.dataBytes) - .flatMapConcat(ConstantFun.scalaIdentityFunction) - .intersperse(mode.start, mode.between, mode.end) - HttpResponse(entity = HttpEntity(mode.contentType, bytes)) - }) :: Nil - } - } - - // special rendering modes - - implicit def _enableSpecialSourceRenderingModes[T](source: Source[T, Any]): EnableSpecialSourceRenderingModes[T] = - new EnableSpecialSourceRenderingModes(source) - -} -/** - * Allows the [[MarshallingDirectives.entity]] directive to extract a [[Source]] of elements. - * - * See [[FramedEntityStreamingDirectives]] for detailed documentation. - */ -object FramedEntityStreamingDirectives extends FramedEntityStreamingDirectives { - sealed class AsyncSourceRenderingMode - final class AsyncRenderingOf[T](val source: Source[T, Any], val parallelism: Int) extends AsyncSourceRenderingMode - final class AsyncUnorderedRenderingOf[T](val source: Source[T, Any], val parallelism: Int) extends AsyncSourceRenderingMode - -} - -/** Provides DSL for special rendering modes, e.g. `complete(source.renderAsync)` */ -final class EnableSpecialSourceRenderingModes[T](val source: Source[T, Any]) extends AnyVal { - /** - * Causes the response stream to be marshalled asynchronously (up to `parallelism` elements at once), - * while retaining the ordering of incoming elements. - * - * See also [[Source.mapAsync]]. - */ - def renderAsync(parallelism: Int) = new FramedEntityStreamingDirectives.AsyncRenderingOf(source, parallelism) - /** - * Causes the response stream to be marshalled asynchronously (up to `parallelism` elements at once), - * emitting the first one that finished marshalling onto the wire. - * - * This sacrifices ordering of the incoming data in regards to data actually rendered onto the wire, - * but may be faster if some elements are smaller than other ones by not stalling the small elements - * from being written while the large one still is being marshalled. - * - * See also [[Source.mapAsyncUnordered]]. - */ - def renderAsyncUnordered(parallelism: Int) = new FramedEntityStreamingDirectives.AsyncUnorderedRenderingOf(source, parallelism) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala index c69bfc5820..6f34dc2fd7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala @@ -6,11 +6,9 @@ package akka.stream.scaladsl import akka.stream.ActorMaterializer import akka.stream.impl.JsonObjectParser import akka.stream.scaladsl.Framing.FramingException -import akka.stream.scaladsl.{ JsonFraming, Framing, Source } import akka.stream.testkit.scaladsl.TestSink import akka.testkit.AkkaSpec import akka.util.ByteString -import org.scalatest.concurrent.ScalaFutures import scala.collection.immutable.Seq import scala.concurrent.Await @@ -26,21 +24,22 @@ class JsonFramingSpec extends AkkaSpec { """ |[ | { "name" : "john" }, + | { "name" : "Ég get etið gler án þess að meiða mig" }, | { "name" : "jack" }, - | { "name" : "katie" } |] |""".stripMargin // also should complete once notices end of array val result = Source.single(ByteString(input)) - .via(JsonFraming.bracketCounting(Int.MaxValue)) + .via(JsonFraming.objectScanner(Int.MaxValue)) .runFold(Seq.empty[String]) { case (acc, entry) ⇒ acc ++ Seq(entry.utf8String) } result.futureValue shouldBe Seq( """{ "name" : "john" }""", - """{ "name" : "jack" }""", - """{ "name" : "katie" }""") + """{ "name" : "Ég get etið gler án þess að meiða mig" }""", + """{ "name" : "jack" }""" + ) } "emit single json element from string" in { @@ -50,7 +49,7 @@ class JsonFramingSpec extends AkkaSpec { """.stripMargin val result = Source.single(ByteString(input)) - .via(JsonFraming.bracketCounting(Int.MaxValue)) + .via(JsonFraming.objectScanner(Int.MaxValue)) .take(1) .runFold(Seq.empty[String]) { case (acc, entry) ⇒ acc ++ Seq(entry.utf8String) @@ -67,7 +66,7 @@ class JsonFramingSpec extends AkkaSpec { """.stripMargin val result = Source.single(ByteString(input)) - .via(JsonFraming.bracketCounting(Int.MaxValue)) + .via(JsonFraming.objectScanner(Int.MaxValue)) .runFold(Seq.empty[String]) { case (acc, entry) ⇒ acc ++ Seq(entry.utf8String) } @@ -85,7 +84,7 @@ class JsonFramingSpec extends AkkaSpec { """.stripMargin val result = Source.single(ByteString(input)) - .via(JsonFraming.bracketCounting(Int.MaxValue)) + .via(JsonFraming.objectScanner(Int.MaxValue)) .runFold(Seq.empty[String]) { case (acc, entry) ⇒ acc ++ Seq(entry.utf8String) } @@ -109,7 +108,7 @@ class JsonFramingSpec extends AkkaSpec { """"}]"""").map(ByteString(_)) val result = Source.apply(input) - .via(JsonFraming.bracketCounting(Int.MaxValue)) + .via(JsonFraming.objectScanner(Int.MaxValue)) .runFold(Seq.empty[String]) { case (acc, entry) ⇒ acc ++ Seq(entry.utf8String) } @@ -410,7 +409,7 @@ class JsonFramingSpec extends AkkaSpec { """.stripMargin val result = Source.single(ByteString(input)) - .via(JsonFraming.bracketCounting(5)).map(_.utf8String) + .via(JsonFraming.objectScanner(5)).map(_.utf8String) .runFold(Seq.empty[String]) { case (acc, entry) ⇒ acc ++ Seq(entry) } @@ -427,7 +426,7 @@ class JsonFramingSpec extends AkkaSpec { """{ "name": "very very long name somehow. how did this happen?" }""").map(s ⇒ ByteString(s)) val probe = Source(input) - .via(JsonFraming.bracketCounting(48)) + .via(JsonFraming.objectScanner(48)) .runWith(TestSink.probe) probe.ensureSubscription() diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/JsonFraming.scala b/akka-stream/src/main/scala/akka/stream/javadsl/JsonFraming.scala index 3fb3c28638..4bad96f790 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/JsonFraming.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/JsonFraming.scala @@ -34,7 +34,7 @@ object JsonFraming { * @param maximumObjectLength The maximum length of allowed frames while decoding. If the maximum length is exceeded * this Flow will fail the stream. */ - def bracketCounting(maximumObjectLength: Int): Flow[ByteString, ByteString, NotUsed] = - akka.stream.scaladsl.JsonFraming.bracketCounting(maximumObjectLength).asJava + def objectScanner(maximumObjectLength: Int): Flow[ByteString, ByteString, NotUsed] = + akka.stream.scaladsl.JsonFraming.objectScanner(maximumObjectLength).asJava } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala index bc5f69d037..79e35909cc 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala @@ -17,6 +17,7 @@ object JsonFraming { /** * Returns a Flow that implements a "brace counting" based framing stage for emitting valid JSON chunks. + * It scans the incoming data stream for valid JSON objects and returns chunks of ByteStrings containing only those valid chunks. * * Typical examples of data that one may want to frame using this stage include: * @@ -40,7 +41,7 @@ object JsonFraming { * @param maximumObjectLength The maximum length of allowed frames while decoding. If the maximum length is exceeded * this Flow will fail the stream. */ - def bracketCounting(maximumObjectLength: Int): Flow[ByteString, ByteString, NotUsed] = + def objectScanner(maximumObjectLength: Int): Flow[ByteString, ByteString, NotUsed] = Flow[ByteString].via(new SimpleLinearGraphStage[ByteString] { private[this] val buffer = new JsonObjectParser(maximumObjectLength) @@ -67,6 +68,6 @@ object JsonFraming { } } } - }).named("jsonFraming(BracketCounting)") + }).named("JsonFraming.objectScanner") }