+htp #18837 completely rewrite EntityStreamingSupport
added CSV examples updated docs EntityStreamingSupport is now an entry point, to all streaming things both read and write side use it it's easy to extend as well
This commit is contained in:
parent
6562ddd2df
commit
9cc32c3aba
28 changed files with 862 additions and 847 deletions
|
|
@ -5,17 +5,21 @@
|
|||
package docs.http.javadsl.server;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.http.javadsl.common.FramingWithContentType;
|
||||
import akka.http.javadsl.common.JsonSourceRenderingModes;
|
||||
import akka.http.javadsl.common.CsvEntityStreamingSupport;
|
||||
import akka.http.javadsl.common.JsonEntityStreamingSupport;
|
||||
import akka.http.javadsl.marshallers.jackson.Jackson;
|
||||
import akka.http.javadsl.marshalling.Marshaller;
|
||||
import akka.http.javadsl.model.*;
|
||||
import akka.http.javadsl.model.headers.Accept;
|
||||
import akka.http.javadsl.server.*;
|
||||
import akka.http.javadsl.testkit.JUnitRouteTest;
|
||||
import akka.http.javadsl.testkit.TestRoute;
|
||||
import akka.http.javadsl.unmarshalling.StringUnmarshallers;
|
||||
import akka.http.javadsl.common.EntityStreamingSupport;
|
||||
import akka.http.javadsl.unmarshalling.Unmarshaller;
|
||||
import akka.stream.javadsl.Flow;
|
||||
import akka.stream.javadsl.Source;
|
||||
import akka.util.ByteString;
|
||||
import docs.http.javadsl.server.testkit.MyAppService;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.CompletionStage;
|
||||
|
|
@ -29,12 +33,32 @@ public class JsonStreamingExamplesTest extends JUnitRouteTest {
|
|||
//#formats
|
||||
|
||||
//#response-streaming
|
||||
|
||||
// Step 1: Enable JSON streaming
|
||||
// we're not using this in the example, but it's the simplest way to start:
|
||||
// The default rendering is a JSON array: `[el, el, el , ...]`
|
||||
final JsonEntityStreamingSupport jsonStreaming = EntityStreamingSupport.json();
|
||||
|
||||
// Step 1.1: Enable and customise how we'll render the JSON, as a compact array:
|
||||
final ByteString start = ByteString.fromString("[");
|
||||
final ByteString between = ByteString.fromString(",");
|
||||
final ByteString end = ByteString.fromString("]");
|
||||
final Flow<ByteString, ByteString, NotUsed> compactArrayRendering =
|
||||
Flow.of(ByteString.class).intersperse(start, between, end);
|
||||
|
||||
final JsonEntityStreamingSupport compactJsonSupport = EntityStreamingSupport.json()
|
||||
.withFramingRendererFlow(compactArrayRendering);
|
||||
|
||||
|
||||
// Step 2: implement the route
|
||||
final Route responseStreaming = path("tweets", () ->
|
||||
get(() ->
|
||||
parameter(StringUnmarshallers.INTEGER, "n", n -> {
|
||||
final Source<JavaTweet, NotUsed> tws =
|
||||
Source.repeat(new JavaTweet("Hello World!")).take(n);
|
||||
return completeOKWithSource(tws, Jackson.marshaller(), JsonSourceRenderingModes.arrayCompact());
|
||||
Source.repeat(new JavaTweet(12, "Hello World!")).take(n);
|
||||
|
||||
// Step 3: call complete* with your source, marshaller, and stream rendering mode
|
||||
return completeOKWithSource(tws, Jackson.marshaller(), compactJsonSupport);
|
||||
})
|
||||
)
|
||||
);
|
||||
|
|
@ -44,9 +68,9 @@ public class JsonStreamingExamplesTest extends JUnitRouteTest {
|
|||
final Route incomingStreaming = path("tweets", () ->
|
||||
post(() ->
|
||||
extractMaterializer(mat -> {
|
||||
final FramingWithContentType jsonFraming = EntityStreamingSupport.bracketCountingJsonFraming(128);
|
||||
final JsonEntityStreamingSupport jsonSupport = EntityStreamingSupport.json();
|
||||
|
||||
return entityasSourceOf(JavaTweets, jsonFraming, sourceOfTweets -> {
|
||||
return entityAsSourceOf(JavaTweets, jsonSupport, sourceOfTweets -> {
|
||||
final CompletionStage<Integer> tweetsCount = sourceOfTweets.runFold(0, (acc, tweet) -> acc + 1, mat);
|
||||
return onComplete(tweetsCount, c -> complete("Total number of tweets: " + c));
|
||||
});
|
||||
|
|
@ -58,6 +82,29 @@ public class JsonStreamingExamplesTest extends JUnitRouteTest {
|
|||
|
||||
return responseStreaming.orElse(incomingStreaming);
|
||||
}
|
||||
|
||||
final Route csvTweets() {
|
||||
//#csv-example
|
||||
final Marshaller<JavaTweet, ByteString> renderAsCsv =
|
||||
Marshaller.withFixedContentType(ContentTypes.TEXT_CSV_UTF8, t ->
|
||||
ByteString.fromString(t.getId() + "," + t.getMessage())
|
||||
);
|
||||
|
||||
final CsvEntityStreamingSupport compactJsonSupport = EntityStreamingSupport.csv();
|
||||
|
||||
final Route responseStreaming = path("tweets", () ->
|
||||
get(() ->
|
||||
parameter(StringUnmarshallers.INTEGER, "n", n -> {
|
||||
final Source<JavaTweet, NotUsed> tws =
|
||||
Source.repeat(new JavaTweet(12, "Hello World!")).take(n);
|
||||
return completeWithSource(tws, renderAsCsv, compactJsonSupport);
|
||||
})
|
||||
)
|
||||
);
|
||||
//#csv-example
|
||||
|
||||
return responseStreaming;
|
||||
}
|
||||
//#routes
|
||||
|
||||
@Test
|
||||
|
|
@ -70,7 +117,7 @@ public class JsonStreamingExamplesTest extends JUnitRouteTest {
|
|||
final Accept acceptApplication = Accept.create(MediaRanges.create(MediaTypes.APPLICATION_JSON));
|
||||
routes.run(HttpRequest.GET("/tweets?n=2").addHeader(acceptApplication))
|
||||
.assertStatusCode(200)
|
||||
.assertEntity("[{\"message\":\"Hello World!\"},{\"message\":\"Hello World!\"}]");
|
||||
.assertEntity("[{\"id\":12,\"message\":\"Hello World!\"},{\"id\":12,\"message\":\"Hello World!\"}]");
|
||||
|
||||
// test responses to potential errors
|
||||
final Accept acceptText = Accept.create(MediaRanges.ALL_TEXT);
|
||||
|
|
@ -79,15 +126,46 @@ public class JsonStreamingExamplesTest extends JUnitRouteTest {
|
|||
.assertEntity("Resource representation is only available with these types:\napplication/json");
|
||||
//#response-streaming
|
||||
}
|
||||
|
||||
@Test
|
||||
public void csvExampleTweetsTest() {
|
||||
//#response-streaming
|
||||
// tests --------------------------------------------
|
||||
final TestRoute routes = testRoute(csvTweets());
|
||||
|
||||
// test happy path
|
||||
final Accept acceptCsv = Accept.create(MediaRanges.create(MediaTypes.TEXT_CSV));
|
||||
routes.run(HttpRequest.GET("/tweets?n=2").addHeader(acceptCsv))
|
||||
.assertStatusCode(200)
|
||||
.assertEntity("12,Hello World!\n" +
|
||||
"12,Hello World!");
|
||||
|
||||
// test responses to potential errors
|
||||
final Accept acceptText = Accept.create(MediaRanges.ALL_APPLICATION);
|
||||
routes.run(HttpRequest.GET("/tweets?n=3").addHeader(acceptText))
|
||||
.assertStatusCode(StatusCodes.NOT_ACCEPTABLE) // 406
|
||||
.assertEntity("Resource representation is only available with these types:\ntext/csv; charset=UTF-8");
|
||||
//#response-streaming
|
||||
}
|
||||
|
||||
//#models
|
||||
private static final class JavaTweet {
|
||||
private int id;
|
||||
private String message;
|
||||
|
||||
public JavaTweet(String message) {
|
||||
public JavaTweet(int id, String message) {
|
||||
this.id = id;
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
public int getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(int id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public void setMessage(String message) {
|
||||
this.message = message;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,18 +3,15 @@
|
|||
Source Streaming
|
||||
================
|
||||
|
||||
Akka HTTP supports completing a request with an Akka ``Source<T, ?>``, which makes it possible to easily build
|
||||
streaming end-to-end APIs which apply back-pressure throughout the entire stack.
|
||||
Akka HTTP supports completing a request with an Akka ``Source<T, _>``, which makes it possible to easily build
|
||||
and consume streaming end-to-end APIs which apply back-pressure throughout the entire stack.
|
||||
|
||||
It is possible to complete requests with raw ``Source<ByteString, ?>``, however often it is more convenient to
|
||||
It is possible to complete requests with raw ``Source<ByteString, _>``, however often it is more convenient to
|
||||
stream on an element-by-element basis, and allow Akka HTTP to handle the rendering internally - for example as a JSON array,
|
||||
or CSV stream (where each element is separated by a new-line).
|
||||
|
||||
In the following sections we investigate how to make use of the JSON Streaming infrastructure,
|
||||
however the general hints apply to any kind of element-by-element streaming you could imagine.
|
||||
|
||||
It is possible to implement your own framing for any content type you might need, including bianary formats
|
||||
by implementing :class:`FramingWithContentType`.
|
||||
however the general hints apply to any kind of element-by-element streaming you could imagine.
|
||||
|
||||
JSON Streaming
|
||||
==============
|
||||
|
|
@ -24,7 +21,7 @@ objects as a continuous HTTP request or response. The elements are most often se
|
|||
however do not have to be. Concatenating elements side-by-side or emitting "very long" JSON array is also another
|
||||
use case.
|
||||
|
||||
In the below examples, we'll be refering to the ``User`` and ``Measurement`` case classes as our model, which are defined as:
|
||||
In the below examples, we'll be refering to the ``Tweet`` and ``Measurement`` case classes as our model, which are defined as:
|
||||
|
||||
.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#models
|
||||
|
||||
|
|
@ -36,11 +33,21 @@ Responding with JSON Streams
|
|||
In this example we implement an API representing an infinite stream of tweets, very much like Twitter's `Streaming API`_.
|
||||
|
||||
Firstly, we'll need to get some additional marshalling infrastructure set up, that is able to marshal to and from an
|
||||
Akka Streams ``Source<T, ?>``. One such trait, containing the needed marshallers is ``SprayJsonSupport``, which uses
|
||||
spray-json (a high performance json parser library), and is shipped as part of Akka HTTP in the
|
||||
``akka-http-spray-json-experimental`` module.
|
||||
Akka Streams ``Source<T,_>``. Here we'll use the ``Jackson`` helper class from ``akka-http-jackson`` (a separate library
|
||||
that you should add as a dependency if you want to use Jackson with Akka HTTP).
|
||||
|
||||
The last bit of setup, before we can render a streaming json response
|
||||
First we enable JSON Streaming by making an implicit ``EntityStreamingSupport`` instance available (Step 1).
|
||||
|
||||
The default mode of rendering a ``Source`` is to represent it as an JSON Array. If you want to change this representation
|
||||
for example to use Twitter style new-line separated JSON objects, you can do so by configuring the support trait accordingly.
|
||||
|
||||
In Step 1.1. we demonstrate to configure configude the rendering to be new-line separated, and also how parallel marshalling
|
||||
can be applied. We configure the Support object to render the JSON as series of new-line separated JSON objects,
|
||||
simply by providing the ``start``, ``sep`` and ``end`` ByteStrings, which will be emitted at the apropriate
|
||||
places in the rendered stream. Although this format is *not* valid JSON, it is pretty popular since parsing it is relatively
|
||||
simple - clients need only to find the new-lines and apply JSON unmarshalling for an entire line of JSON.
|
||||
|
||||
The final step is simply completing a request using a Source of tweets, as simple as that:
|
||||
|
||||
.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#response-streaming
|
||||
|
||||
|
|
@ -60,15 +67,25 @@ will be applied automatically thanks to using Akka HTTP/Streams).
|
|||
|
||||
.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#incoming-request-streaming
|
||||
|
||||
Implementing custom (Un)Marshaller support for JSON streaming
|
||||
-------------------------------------------------------------
|
||||
|
||||
The following types that may need to be implemented by a custom framed-streaming support library are:
|
||||
Simple CSV streaming example
|
||||
----------------------------
|
||||
|
||||
- ``SourceRenderingMode`` which can customise how to render the begining / between-elements and ending of such
|
||||
stream (while writing a response, i.e. by calling ``complete(source)``).
|
||||
Implementations for JSON are available in ``akka.http.scaladsl.common.JsonSourceRenderingMode``.
|
||||
- ``FramingWithContentType`` which is needed to be able to split incoming ``ByteString``
|
||||
chunks into frames of the higher-level data type format that is understood by the provided unmarshallers.
|
||||
In the case of JSON it means chunking up ByteStrings such that each emitted element corresponds to exactly one JSON object,
|
||||
this framing is implemented in ``EntityStreamingSupport``.
|
||||
Akka HTTP provides another ``EntityStreamingSupport`` out of the box, namely ``csv`` (comma-separated values).
|
||||
For completeness, we demonstrate its usage in the below snippet. As you'll notice, switching betweeen streaming
|
||||
modes is fairly simple, one only has to make sure that an implicit ``Marshaller`` of the requested type is available,
|
||||
and that the streaming support operates on the same ``Content-Type`` as the rendered values. Otherwise you'll see
|
||||
an error during runtime that the marshaller did not expose the expected content type and thus we can not render
|
||||
the streaming response).
|
||||
|
||||
.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#csv-example
|
||||
|
||||
Implementing custom EntityStreamingSupport traits
|
||||
-------------------------------------------------
|
||||
|
||||
The ``EntityStreamingSupport`` infrastructure is open for extension and not bound to any single format, content type
|
||||
or marshalling library. The provided JSON support does not rely on Spray JSON directly, but uses ``Marshaller<T, ByteString>``
|
||||
instances, which can be provided using any JSON marshalling library (such as Circe, Jawn or Play JSON).
|
||||
|
||||
When implementing a custom support trait, one should simply extend the ``EntityStreamingSupport`` abstract class,
|
||||
and implement all of it's methods. It's best to use the existing implementations as a guideline.
|
||||
|
|
|
|||
|
|
@ -5,13 +5,15 @@
|
|||
package docs.http.scaladsl.server.directives
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.http.scaladsl.common.{ FramingWithContentType, JsonSourceRenderingModes }
|
||||
import akka.http.scaladsl.marshalling.ToResponseMarshallable
|
||||
import akka.http.scaladsl.common.{ EntityStreamingSupport, JsonEntityStreamingSupport }
|
||||
import akka.http.scaladsl.marshalling._
|
||||
import akka.http.scaladsl.model._
|
||||
import akka.http.scaladsl.model.headers.Accept
|
||||
import akka.http.scaladsl.server.{ UnacceptedResponseContentTypeRejection, UnsupportedRequestContentTypeRejection }
|
||||
import akka.stream.scaladsl.{ Flow, Source }
|
||||
import akka.util.ByteString
|
||||
import docs.http.scaladsl.server.RoutingSpec
|
||||
import spray.json.JsValue
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
|
|
@ -29,39 +31,41 @@ class JsonStreamingExamplesSpec extends RoutingSpec {
|
|||
Tweet(3, "You cannot enter the same river twice.")))
|
||||
|
||||
//#formats
|
||||
object MyJsonProtocol extends spray.json.DefaultJsonProtocol {
|
||||
object MyJsonProtocol
|
||||
extends akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
|
||||
with spray.json.DefaultJsonProtocol {
|
||||
|
||||
implicit val tweetFormat = jsonFormat2(Tweet.apply)
|
||||
implicit val measurementFormat = jsonFormat2(Measurement.apply)
|
||||
}
|
||||
//#
|
||||
|
||||
"spray-json-response-streaming" in {
|
||||
// [1] import generic spray-json marshallers support:
|
||||
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
|
||||
|
||||
// [2] import "my protocol", for marshalling Tweet objects:
|
||||
// [1] import "my protocol", for marshalling Tweet objects:
|
||||
import MyJsonProtocol._
|
||||
|
||||
// [3] pick json rendering mode:
|
||||
// HINT: if you extend `akka.http.scaladsl.server.EntityStreamingSupport`
|
||||
// it'll guide you to do so via abstract defs
|
||||
implicit val jsonRenderingMode = JsonSourceRenderingModes.LineByLine
|
||||
// [2] pick a Source rendering support trait:
|
||||
// Note that the default support renders the Source as JSON Array
|
||||
implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()
|
||||
|
||||
val route =
|
||||
path("tweets") {
|
||||
// [3] simply complete a request with a source of tweets:
|
||||
val tweets: Source[Tweet, NotUsed] = getTweets()
|
||||
complete(tweets)
|
||||
}
|
||||
|
||||
// tests:
|
||||
// tests ------------------------------------------------------------
|
||||
val AcceptJson = Accept(MediaRange(MediaTypes.`application/json`))
|
||||
val AcceptXml = Accept(MediaRange(MediaTypes.`text/xml`))
|
||||
|
||||
Get("/tweets").withHeaders(AcceptJson) ~> route ~> check {
|
||||
responseAs[String] shouldEqual
|
||||
"""{"uid":1,"txt":"#Akka rocks!"}""" + "\n" +
|
||||
"""{"uid":2,"txt":"Streaming is so hot right now!"}""" + "\n" +
|
||||
"""{"uid":3,"txt":"You cannot enter the same river twice."}"""
|
||||
"""[""" +
|
||||
"""{"uid":1,"txt":"#Akka rocks!"},""" +
|
||||
"""{"uid":2,"txt":"Streaming is so hot right now!"},""" +
|
||||
"""{"uid":3,"txt":"You cannot enter the same river twice."}""" +
|
||||
"""]"""
|
||||
}
|
||||
|
||||
// endpoint can only marshal Json, so it will *reject* requests for application/xml:
|
||||
|
|
@ -71,44 +75,115 @@ class JsonStreamingExamplesSpec extends RoutingSpec {
|
|||
}
|
||||
}
|
||||
|
||||
"response-streaming-modes" in {
|
||||
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
|
||||
"line-by-line-json-response-streaming" in {
|
||||
import MyJsonProtocol._
|
||||
implicit val jsonRenderingMode = JsonSourceRenderingModes.LineByLine
|
||||
|
||||
//#async-rendering
|
||||
path("tweets") {
|
||||
val tweets: Source[Tweet, NotUsed] = getTweets()
|
||||
complete(tweets.renderAsync(parallelism = 8))
|
||||
}
|
||||
//#
|
||||
// Configure the EntityStreamingSupport to render the elements as:
|
||||
// {"example":42}
|
||||
// {"example":43}
|
||||
// ...
|
||||
// {"example":1000}
|
||||
val start = ByteString.empty
|
||||
val sep = ByteString("\n")
|
||||
val end = ByteString.empty
|
||||
|
||||
//#async-unordered-rendering
|
||||
path("tweets" / "unordered") {
|
||||
val tweets: Source[Tweet, NotUsed] = getTweets()
|
||||
complete(tweets.renderAsyncUnordered(parallelism = 8))
|
||||
implicit val jsonStreamingSupport = EntityStreamingSupport.json()
|
||||
.withFramingRenderer(Flow[ByteString].intersperse(start, sep, end))
|
||||
|
||||
val route =
|
||||
path("tweets") {
|
||||
// [3] simply complete a request with a source of tweets:
|
||||
val tweets: Source[Tweet, NotUsed] = getTweets()
|
||||
complete(tweets)
|
||||
}
|
||||
|
||||
// tests ------------------------------------------------------------
|
||||
val AcceptJson = Accept(MediaRange(MediaTypes.`application/json`))
|
||||
|
||||
Get("/tweets").withHeaders(AcceptJson) ~> route ~> check {
|
||||
responseAs[String] shouldEqual
|
||||
"""{"uid":1,"txt":"#Akka rocks!"}""" + "\n" +
|
||||
"""{"uid":2,"txt":"Streaming is so hot right now!"}""" + "\n" +
|
||||
"""{"uid":3,"txt":"You cannot enter the same river twice."}"""
|
||||
}
|
||||
}
|
||||
|
||||
"csv-example" in {
|
||||
// [1] provide a marshaller to ByteString
|
||||
implicit val tweetAsCsv = Marshaller.strict[Tweet, ByteString] { t =>
|
||||
Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`, () => {
|
||||
val txt = t.txt.replaceAll(",", ".")
|
||||
val uid = t.uid
|
||||
ByteString(List(uid, txt).mkString(","))
|
||||
})
|
||||
}
|
||||
|
||||
// [2] enable csv streaming:
|
||||
implicit val csvStreaming = EntityStreamingSupport.csv()
|
||||
|
||||
val route =
|
||||
path("tweets") {
|
||||
val tweets: Source[Tweet, NotUsed] = getTweets()
|
||||
complete(tweets)
|
||||
}
|
||||
|
||||
// tests ------------------------------------------------------------
|
||||
val AcceptCsv = Accept(MediaRange(MediaTypes.`text/csv`))
|
||||
|
||||
Get("/tweets").withHeaders(AcceptCsv) ~> route ~> check {
|
||||
responseAs[String] shouldEqual
|
||||
"""|1,#Akka rocks!
|
||||
|2,Streaming is so hot right now!
|
||||
|3,You cannot enter the same river twice."""
|
||||
.stripMargin
|
||||
}
|
||||
}
|
||||
|
||||
"response-streaming-modes" in {
|
||||
|
||||
{
|
||||
//#async-rendering
|
||||
import MyJsonProtocol._
|
||||
implicit val jsonStreamingSupport: JsonEntityStreamingSupport =
|
||||
EntityStreamingSupport.json()
|
||||
.withParallelMarshalling(parallelism = 8, unordered = false)
|
||||
|
||||
path("tweets") {
|
||||
val tweets: Source[Tweet, NotUsed] = getTweets()
|
||||
complete(tweets)
|
||||
}
|
||||
//#
|
||||
}
|
||||
|
||||
{
|
||||
|
||||
//#async-unordered-rendering
|
||||
import MyJsonProtocol._
|
||||
implicit val jsonStreamingSupport: JsonEntityStreamingSupport =
|
||||
EntityStreamingSupport.json()
|
||||
.withParallelMarshalling(parallelism = 8, unordered = true)
|
||||
|
||||
path("tweets" / "unordered") {
|
||||
val tweets: Source[Tweet, NotUsed] = getTweets()
|
||||
complete(tweets)
|
||||
}
|
||||
//#
|
||||
}
|
||||
//#
|
||||
}
|
||||
|
||||
"spray-json-request-streaming" in {
|
||||
// [1] import generic spray-json (un)marshallers support:
|
||||
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
|
||||
|
||||
// [1.1] import framing mode
|
||||
import akka.http.scaladsl.server.EntityStreamingSupport
|
||||
implicit val jsonFramingMode: FramingWithContentType =
|
||||
EntityStreamingSupport.bracketCountingJsonFraming(Int.MaxValue)
|
||||
|
||||
// [2] import "my protocol", for unmarshalling Measurement objects:
|
||||
// [1] import "my protocol", for unmarshalling Measurement objects:
|
||||
import MyJsonProtocol._
|
||||
|
||||
// [3] prepare your persisting logic here
|
||||
// [2] enable Json Streaming
|
||||
implicit val jsonStreamingSupport = EntityStreamingSupport.json()
|
||||
|
||||
// prepare your persisting logic here
|
||||
val persistMetrics = Flow[Measurement]
|
||||
|
||||
val route =
|
||||
path("metrics") {
|
||||
// [4] extract Source[Measurement, _]
|
||||
// [3] extract Source[Measurement, _]
|
||||
entity(asSourceOf[Measurement]) { measurements =>
|
||||
// alternative syntax:
|
||||
// entity(as[Source[Measurement, NotUsed]]) { measurements =>
|
||||
|
|
@ -123,7 +198,8 @@ class JsonStreamingExamplesSpec extends RoutingSpec {
|
|||
}
|
||||
}
|
||||
|
||||
// tests:
|
||||
// tests ------------------------------------------------------------
|
||||
// uploading an array or newline separated values works out of the box
|
||||
val data = HttpEntity(
|
||||
ContentTypes.`application/json`,
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ Source Streaming
|
|||
================
|
||||
|
||||
Akka HTTP supports completing a request with an Akka ``Source[T, _]``, which makes it possible to easily build
|
||||
streaming end-to-end APIs which apply back-pressure throughout the entire stack.
|
||||
and consume streaming end-to-end APIs which apply back-pressure throughout the entire stack.
|
||||
|
||||
It is possible to complete requests with raw ``Source[ByteString, _]``, however often it is more convenient to
|
||||
stream on an element-by-element basis, and allow Akka HTTP to handle the rendering internally - for example as a JSON array,
|
||||
|
|
@ -13,9 +13,6 @@ or CSV stream (where each element is separated by a new-line).
|
|||
In the following sections we investigate how to make use of the JSON Streaming infrastructure,
|
||||
however the general hints apply to any kind of element-by-element streaming you could imagine.
|
||||
|
||||
It is possible to implement your own framing for any content type you might need, including bianary formats
|
||||
by implementing :class:`FramingWithContentType`.
|
||||
|
||||
JSON Streaming
|
||||
==============
|
||||
|
||||
|
|
@ -24,7 +21,7 @@ objects as a continuous HTTP request or response. The elements are most often se
|
|||
however do not have to be. Concatenating elements side-by-side or emitting "very long" JSON array is also another
|
||||
use case.
|
||||
|
||||
In the below examples, we'll be refering to the ``User`` and ``Measurement`` case classes as our model, which are defined as:
|
||||
In the below examples, we'll be refering to the ``Tweet`` and ``Measurement`` case classes as our model, which are defined as:
|
||||
|
||||
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
|
||||
:snippet: models
|
||||
|
|
@ -47,32 +44,53 @@ Akka Streams ``Source[T,_]``. One such trait, containing the needed marshallers
|
|||
spray-json (a high performance json parser library), and is shipped as part of Akka HTTP in the
|
||||
``akka-http-spray-json-experimental`` module.
|
||||
|
||||
Next we import our model's marshallers, generated by spray-json.
|
||||
Once the general infrastructure is prepared we import our model's marshallers, generated by spray-json (Step 1),
|
||||
and enable JSON Streaming by making an implicit ``EntityStreamingSupport`` instance available (Step 2).
|
||||
Akka HTTP pre-packages JSON and CSV entity streaming support, however it is simple to add your own, in case you'd
|
||||
like to stream a different content type (for example plists or protobuf).
|
||||
|
||||
The last bit of setup, before we can render a streaming json response
|
||||
The final step is simply completing a request using a Source of tweets, as simple as that:
|
||||
|
||||
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
|
||||
:snippet: spray-json-response-streaming
|
||||
|
||||
The reason the ``EntityStreamingSupport`` has to be enabled explicitly is that one might want to configure how the
|
||||
stream should be rendered. We'll dicuss this in depth in the next section though.
|
||||
|
||||
.. _Streaming API: https://dev.twitter.com/streaming/overview
|
||||
|
||||
Customising response rendering mode
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
The mode in which a response is marshalled and then rendered to the HttpResponse from the provided ``Source[T,_]``
|
||||
is customisable (thanks to conversions originating from ``Directives`` via ``EntityStreamingDirectives``).
|
||||
Since it is not always possible to directly and confidently answer the question of how a stream of ``T`` should look on
|
||||
the wire, the ``EntityStreamingSupport`` traits come into play and allow fine-tuning the streams rendered representation.
|
||||
|
||||
Since Marshalling is a potentially asynchronous operation in Akka HTTP (because transforming ``T`` to ``JsValue`` may
|
||||
potentially take a long time (depending on your definition of "long time"), we allow to run marshalling concurrently
|
||||
(up to ``parallelism`` concurrent marshallings) by using the ``renderAsync(parallelism)`` mode:
|
||||
For example, in case of JSON Streaming, there isn't really one standard about rendering the response. Some APIs prefer
|
||||
to render multiple JSON objects in a line-by-line fashion (Twitter's streaming APIs for example), while others simply return
|
||||
very large arrays, which could be streamed as well.
|
||||
|
||||
Akka defaults to the second one (streaming a JSON Array), as it is correct JSON and clients not expecting
|
||||
a streaming API would still be able to consume it in a naive way if they'd want to.
|
||||
|
||||
The line-by-line aproach however is also pretty popular even though it is not valid JSON. It's relatively simplicity for
|
||||
client-side parsing is a strong point in case to pick this format for your Streaming APIs.
|
||||
Below we demonstrate how to reconfigure the support trait to render the JSON as
|
||||
|
||||
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
|
||||
:snippet: line-by-line-json-response-streaming
|
||||
|
||||
Another interesting feature is parallel marshalling. Since marshalling can potentially take much time,
|
||||
it is possible to marshal multiple elements of the stream in parallel. This is simply a configuration
|
||||
option on ``EntityStreamingSupport`` and is configurable like this:
|
||||
|
||||
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
|
||||
:snippet: async-rendering
|
||||
|
||||
The ``renderAsync`` mode perserves ordering of the Source's elements, which may sometimes be a required property,
|
||||
The above shown mode perserves ordering of the Source's elements, which may sometimes be a required property,
|
||||
for example when streaming a strictly ordered dataset. Sometimes the contept of strict-order does not apply to the
|
||||
data being streamed though, which allows us to explit this property and use ``renderAsyncUnordered(parallelism)``,
|
||||
which will concurrently marshall up to ``parallelism`` elements and emit the first which is marshalled onto
|
||||
the HttpResponse:
|
||||
data being streamed though, which allows us to exploit this property and use an ``unordered`` rendering.
|
||||
|
||||
This also is a configuration option and is used as shown below. Effectively this will allow Akka's marshalling infrastructure
|
||||
to concurrently marshallup to ``parallelism`` elements and emit the first which is marshalled onto the ``HttpResponse``:
|
||||
|
||||
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
|
||||
:snippet: async-unordered-rendering
|
||||
|
|
@ -94,18 +112,25 @@ will be applied automatically thanks to using Akka HTTP/Streams).
|
|||
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
|
||||
:snippet: spray-json-request-streaming
|
||||
|
||||
Implementing custom (Un)Marshaller support for JSON streaming
|
||||
-------------------------------------------------------------
|
||||
Simple CSV streaming example
|
||||
----------------------------
|
||||
|
||||
While not provided by Akka HTTP directly, the infrastructure is extensible and by investigating how ``SprayJsonSupport``
|
||||
is implemented it is certainly possible to provide the same infrastructure for other marshaller implementations (such as
|
||||
Play JSON, or Jackson directly for example). Such support traits will want to extend the ``EntityStreamingSupport`` trait.
|
||||
Akka HTTP provides another ``EntityStreamingSupport`` out of the box, namely ``csv`` (comma-separated values).
|
||||
For completeness, we demonstrate its usage in the below snippet. As you'll notice, switching betweeen streaming
|
||||
modes is fairly simple, one only has to make sure that an implicit ``Marshaller`` of the requested type is available,
|
||||
and that the streaming support operates on the same ``Content-Type`` as the rendered values. Otherwise you'll see
|
||||
an error during runtime that the marshaller did not expose the expected content type and thus we can not render
|
||||
the streaming response).
|
||||
|
||||
The following types that may need to be implemented by a custom framed-streaming support library are:
|
||||
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
|
||||
:snippet: csv-example
|
||||
|
||||
- ``SourceRenderingMode`` which can customise how to render the begining / between-elements and ending of such stream (while writing a response, i.e. by calling ``complete(source)``).
|
||||
Implementations for JSON are available in ``akka.http.scaladsl.server.JsonSourceRenderingMode``.
|
||||
- ``FramingWithContentType`` which is needed to be able to split incoming ``ByteString`` chunks into frames
|
||||
of the higher-level data type format that is understood by the provided unmarshallers.
|
||||
In the case of JSON it means chunking up ByteStrings such that each emitted element corresponds to exactly one JSON object,
|
||||
this framing is implemented in ``EntityStreamingSupport``.
|
||||
Implementing custom EntityStreamingSupport traits
|
||||
-------------------------------------------------
|
||||
|
||||
The ``EntityStreamingSupport`` infrastructure is open for extension and not bound to any single format, content type
|
||||
or marshalling library. The provided JSON support does not rely on Spray JSON directly, but uses ``Marshaller[T, ByteString]``
|
||||
instances, which can be provided using any JSON marshalling library (such as Circe, Jawn or Play JSON).
|
||||
|
||||
When implementing a custom support trait, one should simply extend the ``EntityStreamingSupport`` abstract class,
|
||||
and implement all of it's methods. It's best to use the existing implementations as a guideline.
|
||||
|
|
|
|||
|
|
@ -4,11 +4,14 @@
|
|||
|
||||
package akka.http.scaladsl.marshallers.sprayjson
|
||||
|
||||
import akka.http.scaladsl.marshalling.{ Marshaller, ToByteStringMarshaller, ToEntityMarshaller }
|
||||
import akka.NotUsed
|
||||
import akka.http.scaladsl.common.EntityStreamingSupport
|
||||
import akka.http.scaladsl.marshalling._
|
||||
import akka.http.scaladsl.model.MediaTypes.`application/json`
|
||||
import akka.http.scaladsl.model.{ HttpCharsets, MediaTypes }
|
||||
import akka.http.scaladsl.unmarshalling.{ FromEntityUnmarshaller, Unmarshaller }
|
||||
import akka.http.scaladsl.unmarshalling.{ FromEntityUnmarshaller, FromRequestUnmarshaller, Unmarshaller }
|
||||
import akka.http.scaladsl.util.FastFuture
|
||||
import akka.stream.scaladsl.{ Flow, Keep, Source }
|
||||
import akka.util.ByteString
|
||||
import spray.json._
|
||||
|
||||
|
|
@ -44,7 +47,19 @@ trait SprayJsonSupport {
|
|||
sprayJsValueMarshaller compose writer.write
|
||||
implicit def sprayJsValueMarshaller(implicit printer: JsonPrinter = CompactPrinter): ToEntityMarshaller[JsValue] =
|
||||
Marshaller.StringMarshaller.wrap(MediaTypes.`application/json`)(printer)
|
||||
implicit def sprayByteStringMarshaller[T](implicit writer: RootJsonFormat[T], printer: JsonPrinter = CompactPrinter): ToByteStringMarshaller[T] =
|
||||
sprayJsValueMarshaller.map(s ⇒ ByteString(s.toString)) compose writer.write
|
||||
|
||||
// support for as[Source[T, NotUsed]]
|
||||
implicit def sprayJsonSourceReader[T](implicit rootJsonReader: RootJsonReader[T], support: EntityStreamingSupport): FromRequestUnmarshaller[Source[T, NotUsed]] =
|
||||
Unmarshaller.withMaterializer { implicit ec ⇒ implicit mat ⇒ r ⇒
|
||||
if (support.supported.matches(r.entity.contentType)) {
|
||||
val bytes = r.entity.dataBytes
|
||||
val frames = bytes.via(support.framingDecoder)
|
||||
val unmarshalling =
|
||||
if (support.unordered) Flow[ByteString].mapAsyncUnordered(support.parallelism)(bs ⇒ sprayJsonByteStringUnmarshaller(rootJsonReader)(bs))
|
||||
else Flow[ByteString].mapAsync(support.parallelism)(bs ⇒ sprayJsonByteStringUnmarshaller(rootJsonReader)(bs))
|
||||
val elements = frames.viaMat(unmarshalling)(Keep.right)
|
||||
FastFuture.successful(elements)
|
||||
} else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(support.supported))
|
||||
}
|
||||
}
|
||||
object SprayJsonSupport extends SprayJsonSupport
|
||||
|
|
|
|||
|
|
@ -8,12 +8,13 @@ import akka.actor.ActorSystem;
|
|||
import akka.http.javadsl.ConnectHttp;
|
||||
import akka.http.javadsl.Http;
|
||||
import akka.http.javadsl.ServerBinding;
|
||||
import akka.http.javadsl.common.EntityStreamingSupport;
|
||||
import akka.http.javadsl.marshallers.jackson.Jackson;
|
||||
import akka.http.javadsl.model.HttpEntity;
|
||||
import akka.http.javadsl.model.HttpRequest;
|
||||
import akka.http.javadsl.model.HttpResponse;
|
||||
import akka.http.javadsl.model.StatusCodes;
|
||||
import akka.http.javadsl.common.JsonSourceRenderingModes;
|
||||
import akka.http.javadsl.unmarshalling.StringUnmarshallers;
|
||||
import akka.http.javadsl.unmarshalling.Unmarshaller;
|
||||
import akka.stream.ActorMaterializer;
|
||||
import akka.stream.javadsl.Flow;
|
||||
import akka.stream.javadsl.Source;
|
||||
|
|
@ -70,12 +71,12 @@ public class JavaTestServer extends AllDirectives { // or import static Directiv
|
|||
get(() ->
|
||||
parameter(StringUnmarshallers.INTEGER, "n", n -> {
|
||||
final Source<JavaTweet, NotUsed> tws = Source.repeat(new JavaTweet("Hello World!")).take(n);
|
||||
return completeOKWithSource(tws, Jackson.marshaller(), JsonSourceRenderingModes.arrayCompact());
|
||||
return completeOKWithSource(tws, Jackson.marshaller(), EntityStreamingSupport.json());
|
||||
})
|
||||
).orElse(
|
||||
post(() ->
|
||||
extractMaterializer(mat ->
|
||||
entityasSourceOf(JavaTweets, null, sourceOfTweets -> {
|
||||
entityAsSourceOf(JavaTweets, null, sourceOfTweets -> {
|
||||
final CompletionStage<Integer> tweetsCount = sourceOfTweets.runFold(0, (acc, tweet) -> acc + 1, mat);
|
||||
return onComplete(tweetsCount, c -> complete("Total number of tweets: " + c));
|
||||
})
|
||||
|
|
|
|||
|
|
@ -13,8 +13,9 @@ import akka.actor.ActorSystem
|
|||
import akka.stream._
|
||||
import akka.stream.scaladsl._
|
||||
import akka.http.scaladsl.Http
|
||||
import akka.http.scaladsl.common.{ FramingWithContentType, JsonSourceRenderingModes, SourceRenderingMode }
|
||||
import akka.http.scaladsl.common.EntityStreamingSupport
|
||||
import akka.http.scaladsl.marshalling.ToResponseMarshallable
|
||||
import spray.json.RootJsonReader
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.io.StdIn
|
||||
|
|
@ -30,20 +31,12 @@ object TestServer extends App {
|
|||
import system.dispatcher
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
// --------- json streaming ---------
|
||||
import spray.json.DefaultJsonProtocol._
|
||||
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
|
||||
final case class Tweet(message: String)
|
||||
implicit val tweetFormat = jsonFormat1(Tweet)
|
||||
|
||||
// FIXME: Need to be able to support composive framing with content type (!!!!!!!)
|
||||
import akka.http.scaladsl.server.EntityStreamingSupport._
|
||||
/* override if extending EntityStreamingSupport */
|
||||
implicit val incomingEntityStreamFraming: FramingWithContentType = bracketCountingJsonFraming(128)
|
||||
/* override if extending EntityStreamingSupport */
|
||||
implicit val outgoingEntityStreamRendering: SourceRenderingMode = JsonSourceRenderingModes.LineByLine
|
||||
|
||||
// --------- end of json streaming ---------
|
||||
implicit val jsonStreaming = EntityStreamingSupport.json()
|
||||
|
||||
import ScalaXmlSupport._
|
||||
import Directives._
|
||||
|
|
@ -65,13 +58,7 @@ object TestServer extends App {
|
|||
} ~
|
||||
path("secure") {
|
||||
authenticateBasicPF("My very secure site", auth) { user ⇒
|
||||
complete(<html>
|
||||
<body>Hello
|
||||
<b>
|
||||
{user}
|
||||
</b>
|
||||
. Access has been granted!</body>
|
||||
</html>)
|
||||
complete(<html> <body> Hello <b>{user}</b>. Access has been granted! </body> </html>)
|
||||
}
|
||||
} ~
|
||||
path("ping") {
|
||||
|
|
@ -89,6 +76,14 @@ object TestServer extends App {
|
|||
complete(tweets)
|
||||
} ~
|
||||
post {
|
||||
entity(asSourceOf[Tweet]) { tweets ⇒
|
||||
onComplete(tweets.runFold(0)({ case (acc, t) => acc + 1 })) { count =>
|
||||
complete(s"Total tweets received: " + count)
|
||||
}
|
||||
}
|
||||
} ~
|
||||
put {
|
||||
// checking the alternative syntax also works:
|
||||
entity(as[Source[Tweet, NotUsed]]) { tweets ⇒
|
||||
onComplete(tweets.runFold(0)({ case (acc, t) => acc + 1 })) { count =>
|
||||
complete(s"Total tweets received: " + count)
|
||||
|
|
@ -103,7 +98,7 @@ object TestServer extends App {
|
|||
|
||||
val bindingFuture = Http().bindAndHandle(routes, interface = "0.0.0.0", port = 8080)
|
||||
|
||||
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
|
||||
println(s"Server online at http://0.0.0.0:8080/\nPress RETURN to stop...")
|
||||
StdIn.readLine()
|
||||
|
||||
bindingFuture.flatMap(_.unbind()).onComplete(_ ⇒ system.terminate())
|
||||
|
|
|
|||
|
|
@ -1,41 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.http.javadsl.common
|
||||
|
||||
import akka.http.javadsl.model.ContentType.WithCharset
|
||||
import akka.http.javadsl.model.ContentTypes
|
||||
import akka.util.ByteString
|
||||
|
||||
/**
|
||||
* Specialised rendering mode for streaming elements as CSV.
|
||||
*/
|
||||
trait CsvSourceRenderingMode extends SourceRenderingMode {
|
||||
override val contentType: WithCharset =
|
||||
ContentTypes.TEXT_CSV_UTF8
|
||||
}
|
||||
|
||||
object CsvSourceRenderingModes {
|
||||
|
||||
/**
|
||||
* Render sequence of values as row-by-row ('\n' separated) series of values.
|
||||
*/
|
||||
val create: CsvSourceRenderingMode =
|
||||
new CsvSourceRenderingMode {
|
||||
override def between: ByteString = ByteString("\n")
|
||||
override def end: ByteString = ByteString.empty
|
||||
override def start: ByteString = ByteString.empty
|
||||
}
|
||||
|
||||
/**
|
||||
* Render sequence of values as row-by-row (with custom row separator,
|
||||
* e.g. if you need to use '\r\n' instead of '\n') series of values.
|
||||
*/
|
||||
def custom(rowSeparator: String): CsvSourceRenderingMode =
|
||||
new CsvSourceRenderingMode {
|
||||
override def between: ByteString = ByteString(rowSeparator)
|
||||
override def end: ByteString = ByteString.empty
|
||||
override def start: ByteString = ByteString.empty
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,149 @@
|
|||
/*
|
||||
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.http.javadsl.common
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.http.javadsl.model.{ ContentType, ContentTypeRange }
|
||||
import akka.http.scaladsl.common
|
||||
import akka.stream.javadsl.Flow
|
||||
import akka.util.ByteString
|
||||
|
||||
/**
|
||||
* Entity streaming support trait allowing rendering and receiving incoming ``Source[T, _]`` from HTTP entities.
|
||||
*
|
||||
* See [[JsonEntityStreamingSupport]] or [[CsvEntityStreamingSupport]] for default implementations.
|
||||
*/
|
||||
abstract class EntityStreamingSupport {
|
||||
|
||||
/** Read-side, what content types it is able to frame and unmarshall. */
|
||||
def supported: ContentTypeRange
|
||||
/** Write-side, defines what Content-Type the Marshaller should offer and the final Content-Type of the response. */
|
||||
def contentType: ContentType
|
||||
|
||||
/**
|
||||
* Read-side, decode incoming framed entity.
|
||||
* For example with an incoming JSON array, chunk it up into JSON objects contained within that array.
|
||||
*/
|
||||
def getFramingDecoder: Flow[ByteString, ByteString, NotUsed]
|
||||
/**
|
||||
* Write-side, apply framing to outgoing entity stream.
|
||||
*
|
||||
* Most typical usage will be a variant of `Flow[ByteString].intersperse`.
|
||||
*
|
||||
* For example for rendering a JSON array one would return
|
||||
* `Flow[ByteString].intersperse(ByteString("["), ByteString(","), ByteString("]"))`
|
||||
* and for rendering a new-line separated CSV simply `Flow[ByteString].intersperse(ByteString("\n"))`.
|
||||
*/
|
||||
def getFramingRenderer: Flow[ByteString, ByteString, NotUsed]
|
||||
|
||||
/**
|
||||
* Read-side, allows changing what content types are accepted by this framing.
|
||||
*
|
||||
* EntityStreamingSupport traits MUST support re-configuring the accepted [[ContentTypeRange]].
|
||||
*
|
||||
* This is in order to support a-typical APIs which users still want to communicate with using
|
||||
* the provided support trait. Typical examples include APIs which return valid `application/json`
|
||||
* however advertise the content type as being `application/javascript` or vendor specific content types,
|
||||
* which still parse correctly as JSON, CSV or something else that a provided support trait is built for.
|
||||
*
|
||||
* NOTE: Implementations should specialize the return type to their own Type!
|
||||
*/
|
||||
def withSupported(range: ContentTypeRange): EntityStreamingSupport
|
||||
|
||||
/**
|
||||
* Write-side, defines what Content-Type the Marshaller should offer and the final Content-Type of the response.
|
||||
*
|
||||
* EntityStreamingSupport traits MUST support re-configuring the offered [[ContentType]].
|
||||
* This is due to the need integrating with existing systems which sometimes excpect custom Content-Types,
|
||||
* however really are just plain JSON or something else internally (perhaps with slight extensions).
|
||||
*
|
||||
* NOTE: Implementations should specialize the return type to their own Type!
|
||||
*/
|
||||
def withContentType(range: ContentType): EntityStreamingSupport
|
||||
|
||||
/**
|
||||
* Write-side / read-side, defines if (un)marshalling should be done in parallel.
|
||||
*
|
||||
* This may be beneficial marshalling the bottleneck in the pipeline.
|
||||
*
|
||||
* See also [[parallelism]] and [[withParallelMarshalling]].
|
||||
*/
|
||||
def parallelism: Int
|
||||
|
||||
/**
|
||||
* Write-side / read-side, defines if (un)marshalling of incoming stream elements should be perserved or not.
|
||||
*
|
||||
* Allowing for parallel and unordered (un)marshalling often yields higher throughput and also allows avoiding
|
||||
* head-of-line blocking if some elements are much larger than others.
|
||||
*
|
||||
* See also [[parallelism]] and [[withParallelMarshalling]].
|
||||
*/
|
||||
def unordered: Boolean
|
||||
|
||||
/**
|
||||
* Write-side / read-side, defines parallelism and if ordering should be preserved or not of Source element marshalling.
|
||||
*
|
||||
* Sometimes marshalling multiple elements at once (esp. when elements are not evenly sized, and ordering is not enforced)
|
||||
* may yield in higher throughput.
|
||||
*
|
||||
* NOTE: Implementations should specialize the return type to their own Type!
|
||||
*/
|
||||
def withParallelMarshalling(parallelism: Int, unordered: Boolean): EntityStreamingSupport
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Entity streaming support, independent of used Json parsing library etc.
|
||||
*/
|
||||
object EntityStreamingSupport {
|
||||
|
||||
/**
|
||||
* Default `application/json` entity streaming support.
|
||||
*
|
||||
* Provides framing (based on scanning the incoming dataBytes for valid JSON objects, so for example uploads using arrays or
|
||||
* new-line separated JSON objects are all parsed correctly) and rendering of Sources as JSON Arrays.
|
||||
* A different very popular style of returning streaming JSON is to separate JSON objects on a line-by-line basis,
|
||||
* you can configure the support trait to do so by calling `withFramingRendererFlow`.
|
||||
*
|
||||
* Limits the maximum JSON object length to 8KB, if you want to increase this limit provide a value explicitly.
|
||||
*
|
||||
* See also <a href="https://en.wikipedia.org/wiki/JSON_Streaming">https://en.wikipedia.org/wiki/JSON_Streaming</a>
|
||||
*/
|
||||
def json(): JsonEntityStreamingSupport = json(8 * 1024)
|
||||
/**
|
||||
* Default `application/json` entity streaming support.
|
||||
*
|
||||
* Provides framing (based on scanning the incoming dataBytes for valid JSON objects, so for example uploads using arrays or
|
||||
* new-line separated JSON objects are all parsed correctly) and rendering of Sources as JSON Arrays.
|
||||
* A different very popular style of returning streaming JSON is to separate JSON objects on a line-by-line basis,
|
||||
* you can configure the support trait to do so by calling `withFramingRendererFlow`.
|
||||
*
|
||||
* See also <a href="https://en.wikipedia.org/wiki/JSON_Streaming">https://en.wikipedia.org/wiki/JSON_Streaming</a>
|
||||
*/
|
||||
def json(maxObjectLength: Int): JsonEntityStreamingSupport = common.EntityStreamingSupport.json(maxObjectLength)
|
||||
|
||||
/**
|
||||
* Default `text/csv(UTF-8)` entity streaming support.
|
||||
* Provides framing and rendering of `\n` separated lines and marshalling Sources into such values.
|
||||
*
|
||||
* Limits the maximum line-length to 8KB, if you want to increase this limit provide a value explicitly.
|
||||
*/
|
||||
def csv(): CsvEntityStreamingSupport = csv(8 * 1024)
|
||||
/**
|
||||
* Default `text/csv(UTF-8)` entity streaming support.
|
||||
* Provides framing and rendering of `\n` separated lines and marshalling Sources into such values.
|
||||
*/
|
||||
def csv(maxLineLength: Int): CsvEntityStreamingSupport = common.EntityStreamingSupport.csv(maxLineLength)
|
||||
}
|
||||
|
||||
// extends Scala base, in order to get linearization right and (as we can't go into traits here, because companion object needed)
|
||||
abstract class JsonEntityStreamingSupport extends common.EntityStreamingSupport {
|
||||
def withFramingRendererFlow(flow: Flow[ByteString, ByteString, NotUsed]): JsonEntityStreamingSupport
|
||||
}
|
||||
|
||||
// extends Scala base, in order to get linearization right and (as we can't go into traits here, because companion object needed)
|
||||
abstract class CsvEntityStreamingSupport extends common.EntityStreamingSupport {
|
||||
def withFramingRendererFlow(flow: Flow[ByteString, ByteString, NotUsed]): CsvEntityStreamingSupport
|
||||
}
|
||||
|
|
@ -1,36 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.http.javadsl.common
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.event.Logging
|
||||
import akka.http.javadsl.model.ContentTypeRange
|
||||
import akka.stream.javadsl.{ Flow, Framing }
|
||||
import akka.util.ByteString
|
||||
|
||||
/**
|
||||
* Wraps a framing [[akka.stream.javadsl.Flow]] (as provided by [[Framing]] for example)
|
||||
* that chunks up incoming [[akka.util.ByteString]] according to some [[akka.http.javadsl.model.ContentType]]
|
||||
* specific logic.
|
||||
*/
|
||||
abstract class FramingWithContentType { self ⇒
|
||||
import akka.http.impl.util.JavaMapping.Implicits._
|
||||
|
||||
def getFlow: Flow[ByteString, ByteString, NotUsed]
|
||||
|
||||
def asScala: akka.http.scaladsl.common.FramingWithContentType =
|
||||
this match {
|
||||
case f: akka.http.scaladsl.common.FramingWithContentType ⇒ f
|
||||
case _ ⇒ new akka.http.scaladsl.common.FramingWithContentType {
|
||||
override def flow = self.getFlow.asScala
|
||||
override def supported = self.supported.asScala
|
||||
}
|
||||
}
|
||||
|
||||
def supported: ContentTypeRange
|
||||
def matches(ct: akka.http.javadsl.model.ContentType): Boolean = supported.matches(ct)
|
||||
|
||||
override def toString = s"${Logging.simpleName(getClass)}($supported)"
|
||||
}
|
||||
|
|
@ -1,103 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.http.javadsl.common
|
||||
|
||||
import akka.http.javadsl.model.{ ContentType, ContentTypes }
|
||||
|
||||
/**
|
||||
* Specialised rendering mode for streaming elements as JSON.
|
||||
*
|
||||
* See also: <a href="https://en.wikipedia.org/wiki/JSON_Streaming">JSON Streaming on Wikipedia</a>.
|
||||
*
|
||||
* See [[JsonSourceRenderingModes]] for commonly used pre-defined rendering modes.
|
||||
*/
|
||||
trait JsonSourceRenderingMode extends SourceRenderingMode {
|
||||
override val contentType: ContentType.WithFixedCharset =
|
||||
ContentTypes.APPLICATION_JSON
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides default JSON rendering modes.
|
||||
*/
|
||||
object JsonSourceRenderingModes {
|
||||
|
||||
/**
|
||||
* Most compact rendering mode.
|
||||
* It does not intersperse any separator between the signalled elements.
|
||||
*
|
||||
* It can be used with [[akka.stream.javadsl.JsonFraming.bracketCounting]].
|
||||
*
|
||||
* {{{
|
||||
* {"id":42}{"id":43}{"id":44}
|
||||
* }}}
|
||||
*/
|
||||
val compact = akka.http.scaladsl.common.JsonSourceRenderingModes.Compact
|
||||
|
||||
/**
|
||||
* Simple rendering mode, similar to [[compact]] however interspersing elements with a `\n` character.
|
||||
*
|
||||
* {{{
|
||||
* {"id":42},{"id":43},{"id":44}
|
||||
* }}}
|
||||
*/
|
||||
val compactCommaSeparated = akka.http.scaladsl.common.JsonSourceRenderingModes.CompactCommaSeparated
|
||||
|
||||
/**
|
||||
* Rendering mode useful when the receiving end expects a valid JSON Array.
|
||||
* It can be useful when the client wants to detect when the stream has been successfully received in-full,
|
||||
* which it can determine by seeing the terminating `]` character.
|
||||
*
|
||||
* The framing's terminal `]` will ONLY be emitted if the stream has completed successfully,
|
||||
* in other words - the stream has been emitted completely, without errors occuring before the final element has been signaled.
|
||||
*
|
||||
* {{{
|
||||
* [{"id":42},{"id":43},{"id":44}]
|
||||
* }}}
|
||||
*/
|
||||
val arrayCompact = akka.http.scaladsl.common.JsonSourceRenderingModes.ArrayCompact
|
||||
|
||||
/**
|
||||
* Rendering mode useful when the receiving end expects a valid JSON Array.
|
||||
* It can be useful when the client wants to detect when the stream has been successfully received in-full,
|
||||
* which it can determine by seeing the terminating `]` character.
|
||||
*
|
||||
* The framing's terminal `]` will ONLY be emitted if the stream has completed successfully,
|
||||
* in other words - the stream has been emitted completely, without errors occuring before the final element has been signaled.
|
||||
*
|
||||
* {{{
|
||||
* [{"id":42},
|
||||
* {"id":43},
|
||||
* {"id":44}]
|
||||
* }}}
|
||||
*/
|
||||
val arrayLineByLine = akka.http.scaladsl.common.JsonSourceRenderingModes.ArrayLineByLine
|
||||
|
||||
/**
|
||||
* Recommended rendering mode.
|
||||
*
|
||||
* It is a nice balance between valid and human-readable as well as resonably small size overhead (just the `\n` between elements).
|
||||
* A good example of API's using this syntax is Twitter's Firehose (last verified at 1.1 version of that API).
|
||||
*
|
||||
* {{{
|
||||
* {"id":42}
|
||||
* {"id":43}
|
||||
* {"id":44}
|
||||
* }}}
|
||||
*/
|
||||
val lineByLine = akka.http.scaladsl.common.JsonSourceRenderingModes.LineByLine
|
||||
|
||||
/**
|
||||
* Simple rendering mode interspersing each pair of elements with both `,\n`.
|
||||
* Picking the [[lineByLine]] format may be preferable, as it is slightly simpler to parse - each line being a valid json object (no need to trim the comma).
|
||||
*
|
||||
* {{{
|
||||
* {"id":42},
|
||||
* {"id":43},
|
||||
* {"id":44}
|
||||
* }}}
|
||||
*/
|
||||
val lineByLineCommaSeparated = akka.http.scaladsl.common.JsonSourceRenderingModes.LineByLineCommaSeparated
|
||||
|
||||
}
|
||||
|
|
@ -1,22 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.http.javadsl.common
|
||||
|
||||
import akka.http.javadsl.model.ContentType
|
||||
import akka.util.ByteString
|
||||
|
||||
/**
|
||||
* Defines how to render a [[akka.stream.javadsl.Source]] into a raw [[ByteString]]
|
||||
* output.
|
||||
*
|
||||
* This can be used to render a source into an [[akka.http.scaladsl.model.HttpEntity]].
|
||||
*/
|
||||
trait SourceRenderingMode {
|
||||
def contentType: ContentType
|
||||
|
||||
def start: ByteString
|
||||
def between: ByteString
|
||||
def end: ByteString
|
||||
}
|
||||
|
|
@ -1,41 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.http.javadsl.server
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.http.javadsl.common.{ FramingWithContentType, SourceRenderingMode }
|
||||
import akka.http.javadsl.model.{ ContentTypeRange, MediaRanges }
|
||||
import akka.http.scaladsl.server.ApplicationJsonBracketCountingFraming
|
||||
import akka.stream.javadsl.{ Flow, Framing }
|
||||
import akka.util.ByteString
|
||||
|
||||
/**
|
||||
* Entity streaming support, independent of used Json parsing library etc.
|
||||
*
|
||||
* Can be extended by various Support traits (e.g. "SprayJsonSupport"),
|
||||
* in order to provide users with both `framing` (this trait) and `marshalling`
|
||||
* (implemented by a library) by using a single trait.
|
||||
*/
|
||||
object EntityStreamingSupport {
|
||||
// in the ScalaDSL version we make users implement abstract methods that are supposed to be
|
||||
// implicit vals. This helps to guide in implementing the needed values, however in Java that would not really help.
|
||||
|
||||
/** `application/json` specific Framing implementation */
|
||||
def bracketCountingJsonFraming(maximumObjectLength: Int): FramingWithContentType =
|
||||
new ApplicationJsonBracketCountingFraming(maximumObjectLength)
|
||||
|
||||
/**
|
||||
* Frames incoming `text / *` entities on a line-by-line basis.
|
||||
* Useful for accepting `text/csv` uploads as a stream of rows.
|
||||
*/
|
||||
def newLineFraming(maximumObjectLength: Int, supportedContentTypes: ContentTypeRange): FramingWithContentType =
|
||||
new FramingWithContentType {
|
||||
override final val getFlow: Flow[ByteString, ByteString, NotUsed] =
|
||||
Flow.of(classOf[ByteString]).via(Framing.delimiter(ByteString("\n"), maximumObjectLength))
|
||||
|
||||
override final val supported: ContentTypeRange =
|
||||
akka.http.scaladsl.model.ContentTypeRange(akka.http.scaladsl.model.MediaRanges.`text/*`)
|
||||
}
|
||||
}
|
||||
|
|
@ -8,11 +8,13 @@ import java.util.concurrent.CompletionStage
|
|||
|
||||
import akka.http.impl.util.JavaMapping._
|
||||
import akka.http.impl.util._
|
||||
import akka.http.javadsl.common.EntityStreamingSupport
|
||||
import akka.http.{ javadsl, scaladsl }
|
||||
import akka.http.scaladsl.server.{ directives ⇒ sdirectives }
|
||||
import akka.http.scaladsl.{ common ⇒ scommon }
|
||||
import akka.http.javadsl.server.{ directives ⇒ jdirectives }
|
||||
import akka.http.javadsl.{ common ⇒ jcommon }
|
||||
|
||||
import scala.collection.immutable
|
||||
|
||||
/**
|
||||
|
|
@ -45,7 +47,7 @@ private[http] object RoutingJavaMapping {
|
|||
}
|
||||
implicit object convertRouteResult extends Inherited[javadsl.server.RouteResult, scaladsl.server.RouteResult]
|
||||
|
||||
implicit object convertSourceRenderingMode extends Inherited[jcommon.SourceRenderingMode, scommon.SourceRenderingMode]
|
||||
implicit object convertEntityStreamingSupport extends Inherited[EntityStreamingSupport, scommon.EntityStreamingSupport]
|
||||
|
||||
implicit object convertDirectoryRenderer extends Inherited[jdirectives.DirectoryRenderer, sdirectives.FileAndResourceDirectives.DirectoryRenderer]
|
||||
implicit object convertContentTypeResolver extends Inherited[jdirectives.ContentTypeResolver, sdirectives.ContentTypeResolver]
|
||||
|
|
|
|||
|
|
@ -7,10 +7,12 @@ import java.util.function.{ Function ⇒ JFunction }
|
|||
import java.util.{ List ⇒ JList, Map ⇒ JMap }
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.http.javadsl.common.{ FramingWithContentType, SourceRenderingMode }
|
||||
import akka.http.javadsl.common.EntityStreamingSupport
|
||||
import akka.http.javadsl.marshalling.Marshaller
|
||||
import akka.http.javadsl.model.{ HttpEntity, _ }
|
||||
import akka.http.javadsl.server.{ Marshaller, Route, Unmarshaller }
|
||||
import akka.http.scaladsl.marshalling.ToResponseMarshallable
|
||||
import akka.http.javadsl.server.Route
|
||||
import akka.http.javadsl.unmarshalling.Unmarshaller
|
||||
import akka.http.scaladsl.marshalling.{ Marshalling, ToByteStringMarshaller, ToResponseMarshallable }
|
||||
import akka.http.scaladsl.server.{ Directives ⇒ D }
|
||||
import akka.stream.javadsl.Source
|
||||
import akka.util.ByteString
|
||||
|
|
@ -18,56 +20,39 @@ import akka.util.ByteString
|
|||
/** EXPERIMENTAL API */
|
||||
abstract class FramedEntityStreamingDirectives extends TimeoutDirectives {
|
||||
|
||||
import akka.http.javadsl.server.RoutingJavaMapping._
|
||||
import akka.http.javadsl.server.RoutingJavaMapping.Implicits._
|
||||
|
||||
@CorrespondsTo("asSourceOf")
|
||||
def entityasSourceOf[T](um: Unmarshaller[ByteString, T], framing: FramingWithContentType,
|
||||
def entityAsSourceOf[T](um: Unmarshaller[ByteString, T], support: EntityStreamingSupport,
|
||||
inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter {
|
||||
D.entity(D.asSourceOf[T](framing.asScala)(um.asScala)) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒
|
||||
val umm = D.asSourceOf(um.asScala, support.asScala)
|
||||
D.entity(umm) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒
|
||||
inner(s.asJava).delegate
|
||||
}
|
||||
}
|
||||
|
||||
@CorrespondsTo("asSourceOfAsync")
|
||||
def entityAsSourceAsyncOf[T](
|
||||
parallelism: Int,
|
||||
um: Unmarshaller[ByteString, T], framing: FramingWithContentType,
|
||||
inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter {
|
||||
D.entity(D.asSourceOfAsync[T](parallelism, framing.asScala)(um.asScala)) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒
|
||||
inner(s.asJava).delegate
|
||||
}
|
||||
}
|
||||
|
||||
@CorrespondsTo("asSourceOfAsyncUnordered")
|
||||
def entityAsSourceAsyncUnorderedOf[T](
|
||||
parallelism: Int,
|
||||
um: Unmarshaller[ByteString, T], framing: FramingWithContentType,
|
||||
inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter {
|
||||
D.entity(D.asSourceOfAsyncUnordered[T](parallelism, framing.asScala)(um.asScala)) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒
|
||||
inner(s.asJava).delegate
|
||||
}
|
||||
}
|
||||
|
||||
// implicits used internally, Java caller does not benefit or use it
|
||||
// implicits and multiple parameter lists used internally, Java caller does not benefit or use it
|
||||
@CorrespondsTo("complete")
|
||||
def completeWithSource[T, M](implicit source: Source[T, M], m: Marshaller[T, ByteString], rendering: SourceRenderingMode): Route = RouteAdapter {
|
||||
implicit val mm = _sourceMarshaller(m.map(ByteStringAsEntityFn), rendering)
|
||||
val response = ToResponseMarshallable(source)
|
||||
def completeWithSource[T, M](source: Source[T, M])(implicit m: Marshaller[T, ByteString], support: EntityStreamingSupport): Route = RouteAdapter {
|
||||
import akka.http.scaladsl.marshalling.PredefinedToResponseMarshallers._
|
||||
val mm = m.map(ByteStringAsEntityFn).asScalaCastOutput[akka.http.scaladsl.model.RequestEntity]
|
||||
val mmm = fromEntityStreamingSupportAndEntityMarshaller[T, M](support.asScala, mm)
|
||||
val response = ToResponseMarshallable(source.asScala)(mmm)
|
||||
D.complete(response)
|
||||
}
|
||||
|
||||
// implicits and multiple parameter lists used internally, Java caller does not benefit or use it
|
||||
@CorrespondsTo("complete")
|
||||
def completeOKWithSource[T, M](implicit source: Source[T, M], m: Marshaller[T, RequestEntity], rendering: SourceRenderingMode): Route = RouteAdapter {
|
||||
implicit val mm = _sourceMarshaller[T, M](m, rendering)
|
||||
val response = ToResponseMarshallable(source)
|
||||
def completeOKWithSource[T, M](source: Source[T, M])(implicit m: Marshaller[T, RequestEntity], support: EntityStreamingSupport): Route = RouteAdapter {
|
||||
import akka.http.scaladsl.marshalling.PredefinedToResponseMarshallers._
|
||||
// don't try this at home:
|
||||
val mm = m.asScalaCastOutput[akka.http.scaladsl.model.RequestEntity].map(_.httpEntity.asInstanceOf[akka.http.scaladsl.model.RequestEntity])
|
||||
implicit val mmm = fromEntityStreamingSupportAndEntityMarshaller[T, M](support.asScala, mm)
|
||||
val response = ToResponseMarshallable(source.asScala)
|
||||
D.complete(response)
|
||||
}
|
||||
|
||||
implicit private def _sourceMarshaller[T, M](implicit m: Marshaller[T, HttpEntity], rendering: SourceRenderingMode) = {
|
||||
import akka.http.javadsl.server.RoutingJavaMapping.Implicits._
|
||||
import akka.http.javadsl.server.RoutingJavaMapping._
|
||||
val mm = m.asScalaCastOutput
|
||||
D._sourceMarshaller[T, M](mm, rendering.asScala).compose({ h: akka.stream.javadsl.Source[T, M] ⇒ h.asScala })
|
||||
}
|
||||
|
||||
private[this] val ByteStringAsEntityFn = new java.util.function.Function[ByteString, HttpEntity]() {
|
||||
override def apply(bs: ByteString): HttpEntity = HttpEntities.create(bs)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ abstract class FutureDirectives extends FormFieldDirectives {
|
|||
/**
|
||||
* "Unwraps" a `CompletionStage<T>` and runs the inner route after future
|
||||
* completion with the future's value as an extraction of type `Try<T>`.
|
||||
*
|
||||
*
|
||||
* @group future
|
||||
*/
|
||||
def onComplete[T](f: Supplier[CompletionStage[T]], inner: JFunction[Try[T], Route]) = RouteAdapter {
|
||||
|
|
@ -35,7 +35,7 @@ abstract class FutureDirectives extends FormFieldDirectives {
|
|||
/**
|
||||
* "Unwraps" a `CompletionStage<T>` and runs the inner route after future
|
||||
* completion with the future's value as an extraction of type `Try<T>`.
|
||||
*
|
||||
*
|
||||
* @group future
|
||||
*/
|
||||
def onComplete[T](cs: CompletionStage[T], inner: JFunction[Try[T], Route]) = RouteAdapter {
|
||||
|
|
@ -65,7 +65,7 @@ abstract class FutureDirectives extends FormFieldDirectives {
|
|||
* completion with the stage's value as an extraction of type `T`.
|
||||
* If the stage fails its failure Throwable is bubbled up to the nearest
|
||||
* ExceptionHandler.
|
||||
*
|
||||
*
|
||||
* @group future
|
||||
*/
|
||||
def onSuccess[T](f: Supplier[CompletionStage[T]], inner: JFunction[T, Route]) = RouteAdapter {
|
||||
|
|
@ -80,7 +80,7 @@ abstract class FutureDirectives extends FormFieldDirectives {
|
|||
* If the completion stage succeeds the request is completed using the values marshaller
|
||||
* (This directive therefore requires a marshaller for the completion stage value type to be
|
||||
* provided.)
|
||||
*
|
||||
*
|
||||
* @group future
|
||||
*/
|
||||
def completeOrRecoverWith[T](f: Supplier[CompletionStage[T]], marshaller: Marshaller[T, RequestEntity], inner: JFunction[Throwable, Route]): Route = RouteAdapter {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.http.scaladsl.common
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.event.Logging
|
||||
import akka.http.javadsl.{ common, model ⇒ jm }
|
||||
import akka.http.scaladsl.model.{ ContentType, ContentTypeRange, ContentTypes }
|
||||
import akka.stream.scaladsl.{ Flow, Framing }
|
||||
import akka.util.ByteString
|
||||
|
||||
final class CsvEntityStreamingSupport private[akka] (
|
||||
maxLineLength: Int,
|
||||
val supported: ContentTypeRange,
|
||||
val contentType: ContentType,
|
||||
val framingRenderer: Flow[ByteString, ByteString, NotUsed],
|
||||
val parallelism: Int,
|
||||
val unordered: Boolean
|
||||
) extends common.CsvEntityStreamingSupport {
|
||||
import akka.http.impl.util.JavaMapping.Implicits._
|
||||
|
||||
def this(maxObjectSize: Int) =
|
||||
this(
|
||||
maxObjectSize,
|
||||
ContentTypeRange(ContentTypes.`text/csv(UTF-8)`),
|
||||
ContentTypes.`text/csv(UTF-8)`,
|
||||
Flow[ByteString].intersperse(ByteString("\n")),
|
||||
1, false)
|
||||
|
||||
override val framingDecoder: Flow[ByteString, ByteString, NotUsed] =
|
||||
Framing.delimiter(ByteString("\n"), maxLineLength)
|
||||
|
||||
override def withFramingRendererFlow(framingRendererFlow: akka.stream.javadsl.Flow[ByteString, ByteString, NotUsed]): CsvEntityStreamingSupport =
|
||||
withFramingRenderer(framingRendererFlow.asScala)
|
||||
def withFramingRenderer(framingRendererFlow: Flow[ByteString, ByteString, NotUsed]): CsvEntityStreamingSupport =
|
||||
new CsvEntityStreamingSupport(maxLineLength, supported, contentType, framingRendererFlow, parallelism, unordered)
|
||||
|
||||
override def withContentType(ct: jm.ContentType): CsvEntityStreamingSupport =
|
||||
new CsvEntityStreamingSupport(maxLineLength, supported, ct.asScala, framingRenderer, parallelism, unordered)
|
||||
override def withSupported(range: jm.ContentTypeRange): CsvEntityStreamingSupport =
|
||||
new CsvEntityStreamingSupport(maxLineLength, range.asScala, contentType, framingRenderer, parallelism, unordered)
|
||||
override def withParallelMarshalling(parallelism: Int, unordered: Boolean): CsvEntityStreamingSupport =
|
||||
new CsvEntityStreamingSupport(maxLineLength, supported, contentType, framingRenderer, parallelism, unordered)
|
||||
|
||||
override def toString = s"""${Logging.simpleName(getClass)}($maxLineLength, $supported, $contentType)"""
|
||||
}
|
||||
|
|
@ -0,0 +1,141 @@
|
|||
/*
|
||||
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package akka.http.scaladsl.common
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.http.javadsl.{ common, model ⇒ jm }
|
||||
import akka.http.scaladsl.model._
|
||||
import akka.stream.scaladsl.Flow
|
||||
import akka.util.ByteString
|
||||
|
||||
/**
|
||||
* Entity streaming support trait allowing rendering and receiving incoming ``Source[T, _]`` from HTTP entities.
|
||||
*
|
||||
* See [[JsonEntityStreamingSupport]] or [[CsvEntityStreamingSupport]] for default implementations.
|
||||
*/
|
||||
abstract class EntityStreamingSupport extends common.EntityStreamingSupport {
|
||||
/** Read-side, what content types it is able to frame and unmarshall. */
|
||||
def supported: ContentTypeRange
|
||||
/** Write-side, defines what Content-Type the Marshaller should offer and the final Content-Type of the response. */
|
||||
def contentType: ContentType
|
||||
|
||||
/**
|
||||
* Read-side, decode incoming framed entity.
|
||||
* For example with an incoming JSON array, chunk it up into JSON objects contained within that array.
|
||||
*/
|
||||
def framingDecoder: Flow[ByteString, ByteString, NotUsed]
|
||||
override final def getFramingDecoder = framingDecoder.asJava
|
||||
|
||||
/**
|
||||
* Write-side, apply framing to outgoing entity stream.
|
||||
*
|
||||
* Most typical usage will be a variant of `Flow[ByteString].intersperse`.
|
||||
*
|
||||
* For example for rendering a JSON array one would return
|
||||
* `Flow[ByteString].intersperse(ByteString("["), ByteString(","), ByteString("]"))`
|
||||
* and for rendering a new-line separated CSV simply `Flow[ByteString].intersperse(ByteString("\n"))`.
|
||||
*/
|
||||
def framingRenderer: Flow[ByteString, ByteString, NotUsed]
|
||||
override final def getFramingRenderer = framingRenderer.asJava
|
||||
|
||||
/**
|
||||
* Read-side, allows changing what content types are accepted by this framing.
|
||||
*
|
||||
* EntityStreamingSupport traits MUST support re-configuring the accepted [[ContentTypeRange]].
|
||||
*
|
||||
* This is in order to support a-typical APIs which users still want to communicate with using
|
||||
* the provided support trait. Typical examples include APIs which return valid `application/json`
|
||||
* however advertise the content type as being `application/javascript` or vendor specific content types,
|
||||
* which still parse correctly as JSON, CSV or something else that a provided support trait is built for.
|
||||
*
|
||||
* NOTE: Implementations should specialize the return type to their own Type!
|
||||
*/
|
||||
override def withSupported(range: jm.ContentTypeRange): EntityStreamingSupport
|
||||
|
||||
/**
|
||||
* Write-side, defines what Content-Type the Marshaller should offer and the final Content-Type of the response.
|
||||
*
|
||||
* EntityStreamingSupport traits MUST support re-configuring the offered [[ContentType]].
|
||||
* This is due to the need integrating with existing systems which sometimes excpect custom Content-Types,
|
||||
* however really are just plain JSON or something else internally (perhaps with slight extensions).
|
||||
*
|
||||
* NOTE: Implementations should specialize the return type to their own Type!
|
||||
*/
|
||||
override def withContentType(range: jm.ContentType): EntityStreamingSupport
|
||||
|
||||
/**
|
||||
* Write-side / read-side, defines if (un)marshalling should be done in parallel.
|
||||
*
|
||||
* This may be beneficial marshalling the bottleneck in the pipeline.
|
||||
*
|
||||
* See also [[parallelism]] and [[withParallelMarshalling]].
|
||||
*/
|
||||
def parallelism: Int
|
||||
|
||||
/**
|
||||
* Write-side / read-side, defines if (un)marshalling of incoming stream elements should be perserved or not.
|
||||
*
|
||||
* Allowing for parallel and unordered (un)marshalling often yields higher throughput and also allows avoiding
|
||||
* head-of-line blocking if some elements are much larger than others.
|
||||
*
|
||||
* See also [[parallelism]] and [[withParallelMarshalling]].
|
||||
*/
|
||||
def unordered: Boolean
|
||||
|
||||
/**
|
||||
* Write-side / read-side, defines parallelism and if ordering should be preserved or not of Source element marshalling.
|
||||
*
|
||||
* Sometimes marshalling multiple elements at once (esp. when elements are not evenly sized, and ordering is not enforced)
|
||||
* may yield in higher throughput.
|
||||
*
|
||||
* NOTE: Implementations should specialize the return type to their own Type!
|
||||
*/
|
||||
def withParallelMarshalling(parallelism: Int, unordered: Boolean): EntityStreamingSupport
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Entity streaming support, independent of used Json parsing library etc.
|
||||
*/
|
||||
object EntityStreamingSupport {
|
||||
|
||||
/**
|
||||
* Default `application/json` entity streaming support.
|
||||
*
|
||||
* Provides framing (based on scanning the incoming dataBytes for valid JSON objects, so for example uploads using arrays or
|
||||
* new-line separated JSON objects are all parsed correctly) and rendering of Sources as JSON Arrays.
|
||||
* A different very popular style of returning streaming JSON is to separate JSON objects on a line-by-line basis,
|
||||
* you can configure the support trait to do so by calling `withFramingRendererFlow`.
|
||||
*
|
||||
* Limits the maximum JSON object length to 8KB, if you want to increase this limit provide a value explicitly.
|
||||
*
|
||||
* See also <a href="https://en.wikipedia.org/wiki/JSON_Streaming">https://en.wikipedia.org/wiki/JSON_Streaming</a>
|
||||
*/
|
||||
def json(): JsonEntityStreamingSupport = json(8 * 1024)
|
||||
/**
|
||||
* Default `application/json` entity streaming support.
|
||||
*
|
||||
* Provides framing (based on scanning the incoming dataBytes for valid JSON objects, so for example uploads using arrays or
|
||||
* new-line separated JSON objects are all parsed correctly) and rendering of Sources as JSON Arrays.
|
||||
* A different very popular style of returning streaming JSON is to separate JSON objects on a line-by-line basis,
|
||||
* you can configure the support trait to do so by calling `withFramingRendererFlow`.
|
||||
*
|
||||
* See also <a href="https://en.wikipedia.org/wiki/JSON_Streaming">https://en.wikipedia.org/wiki/JSON_Streaming</a>
|
||||
*/
|
||||
def json(maxObjectLength: Int): JsonEntityStreamingSupport = new JsonEntityStreamingSupport(maxObjectLength)
|
||||
|
||||
/**
|
||||
* Default `text/csv(UTF-8)` entity streaming support.
|
||||
* Provides framing and rendering of `\n` separated lines and marshalling Sources into such values.
|
||||
*
|
||||
* Limits the maximum line-length to 8KB, if you want to increase this limit provide a value explicitly.
|
||||
*/
|
||||
def csv(): CsvEntityStreamingSupport = csv(8 * 1024)
|
||||
/**
|
||||
* Default `text/csv(UTF-8)` entity streaming support.
|
||||
* Provides framing and rendering of `\n` separated lines and marshalling Sources into such values.
|
||||
*/
|
||||
def csv(maxLineLength: Int): CsvEntityStreamingSupport = new CsvEntityStreamingSupport(maxLineLength)
|
||||
}
|
||||
|
||||
|
|
@ -1,25 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.http.scaladsl.common
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.event.Logging
|
||||
import akka.http.scaladsl.model.ContentTypeRange
|
||||
import akka.stream.scaladsl.{ Flow, Framing }
|
||||
import akka.util.ByteString
|
||||
|
||||
/**
|
||||
* Wraps a framing [[akka.stream.scaladsl.Flow]] (as provided by [[Framing]] for example)
|
||||
* that chunks up incoming [[akka.util.ByteString]] according to some [[akka.http.javadsl.model.ContentType]]
|
||||
* specific logic.
|
||||
*/
|
||||
abstract class FramingWithContentType extends akka.http.javadsl.common.FramingWithContentType {
|
||||
def flow: Flow[ByteString, ByteString, NotUsed]
|
||||
override final def getFlow = flow.asJava
|
||||
override def supported: ContentTypeRange
|
||||
override def matches(ct: akka.http.javadsl.model.ContentType): Boolean = supported.matches(ct)
|
||||
|
||||
override def toString = s"${Logging.simpleName(getClass)}($supported)"
|
||||
}
|
||||
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.http.scaladsl.common
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.event.Logging
|
||||
import akka.http.javadsl.{ common, model ⇒ jm }
|
||||
import akka.http.scaladsl.model.{ ContentType, ContentTypeRange, ContentTypes }
|
||||
import akka.stream.scaladsl.Flow
|
||||
import akka.util.ByteString
|
||||
|
||||
final class JsonEntityStreamingSupport private[akka] (
|
||||
maxObjectSize: Int,
|
||||
val supported: ContentTypeRange,
|
||||
val contentType: ContentType,
|
||||
val framingRenderer: Flow[ByteString, ByteString, NotUsed],
|
||||
val parallelism: Int,
|
||||
val unordered: Boolean
|
||||
) extends common.JsonEntityStreamingSupport {
|
||||
import akka.http.impl.util.JavaMapping.Implicits._
|
||||
|
||||
def this(maxObjectSize: Int) =
|
||||
this(
|
||||
maxObjectSize,
|
||||
ContentTypeRange(ContentTypes.`application/json`),
|
||||
ContentTypes.`application/json`,
|
||||
Flow[ByteString].intersperse(ByteString("["), ByteString(","), ByteString("]")),
|
||||
1, false)
|
||||
|
||||
override val framingDecoder: Flow[ByteString, ByteString, NotUsed] =
|
||||
akka.stream.scaladsl.JsonFraming.objectScanner(maxObjectSize)
|
||||
|
||||
override def withFramingRendererFlow(framingRendererFlow: akka.stream.javadsl.Flow[ByteString, ByteString, NotUsed]): JsonEntityStreamingSupport =
|
||||
withFramingRenderer(framingRendererFlow.asScala)
|
||||
def withFramingRenderer(framingRendererFlow: Flow[ByteString, ByteString, NotUsed]): JsonEntityStreamingSupport =
|
||||
new JsonEntityStreamingSupport(maxObjectSize, supported, contentType, framingRendererFlow, parallelism, unordered)
|
||||
|
||||
override def withContentType(ct: jm.ContentType): JsonEntityStreamingSupport =
|
||||
new JsonEntityStreamingSupport(maxObjectSize, supported, ct.asScala, framingRenderer, parallelism, unordered)
|
||||
override def withSupported(range: jm.ContentTypeRange): JsonEntityStreamingSupport =
|
||||
new JsonEntityStreamingSupport(maxObjectSize, range.asScala, contentType, framingRenderer, parallelism, unordered)
|
||||
override def withParallelMarshalling(parallelism: Int, unordered: Boolean): JsonEntityStreamingSupport =
|
||||
new JsonEntityStreamingSupport(maxObjectSize, supported, contentType, framingRenderer, parallelism, unordered)
|
||||
|
||||
override def toString = s"""${Logging.simpleName(getClass)}($maxObjectSize, $supported, $contentType)"""
|
||||
|
||||
}
|
||||
|
|
@ -1,128 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.http.scaladsl.common
|
||||
|
||||
import akka.http.scaladsl.model.{ ContentType, ContentTypes }
|
||||
import akka.util.ByteString
|
||||
|
||||
/**
|
||||
* Specialised rendering mode for streaming elements as JSON.
|
||||
*
|
||||
* See also: <a href="https://en.wikipedia.org/wiki/JSON_Streaming">JSON Streaming on Wikipedia</a>.
|
||||
*
|
||||
* See [[JsonSourceRenderingModes]] for commonly used pre-defined rendering modes.
|
||||
*/
|
||||
trait JsonSourceRenderingMode extends akka.http.javadsl.common.JsonSourceRenderingMode with SourceRenderingMode {
|
||||
override val contentType: ContentType.WithFixedCharset =
|
||||
ContentTypes.`application/json`
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides default JSON rendering modes.
|
||||
*/
|
||||
object JsonSourceRenderingModes {
|
||||
|
||||
/**
|
||||
* Most compact rendering mode.
|
||||
* It does not intersperse any separator between the signalled elements.
|
||||
*
|
||||
* It is the most compact form to render JSON and can be framed properly by using [[akka.stream.javadsl.JsonFraming.bracketCounting]].
|
||||
*
|
||||
* {{{
|
||||
* {"id":42}{"id":43}{"id":44}
|
||||
* }}}
|
||||
*/
|
||||
object Compact extends JsonSourceRenderingMode {
|
||||
override val start: ByteString = ByteString.empty
|
||||
override val between: ByteString = ByteString.empty
|
||||
override val end: ByteString = ByteString.empty
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple rendering mode, similar to [[Compact]] however interspersing elements with a `\n` character.
|
||||
*
|
||||
* {{{
|
||||
* {"id":42},{"id":43},{"id":44}
|
||||
* }}}
|
||||
*/
|
||||
object CompactCommaSeparated extends JsonSourceRenderingMode {
|
||||
override val start: ByteString = ByteString.empty
|
||||
override val between: ByteString = ByteString(",")
|
||||
override val end: ByteString = ByteString.empty
|
||||
}
|
||||
|
||||
/**
|
||||
* Rendering mode useful when the receiving end expects a valid JSON Array.
|
||||
* It can be useful when the client wants to detect when the stream has been successfully received in-full,
|
||||
* which it can determine by seeing the terminating `]` character.
|
||||
*
|
||||
* The framing's terminal `]` will ONLY be emitted if the stream has completed successfully,
|
||||
* in other words - the stream has been emitted completely, without errors occuring before the final element has been signaled.
|
||||
*
|
||||
* {{{
|
||||
* [{"id":42},{"id":43},{"id":44}]
|
||||
* }}}
|
||||
*/
|
||||
object ArrayCompact extends JsonSourceRenderingMode {
|
||||
override val start: ByteString = ByteString("[")
|
||||
override val between: ByteString = ByteString(",")
|
||||
override val end: ByteString = ByteString("]")
|
||||
}
|
||||
|
||||
/**
|
||||
* Rendering mode useful when the receiving end expects a valid JSON Array.
|
||||
* It can be useful when the client wants to detect when the stream has been successfully received in-full,
|
||||
* which it can determine by seeing the terminating `]` character.
|
||||
*
|
||||
* The framing's terminal `]` will ONLY be emitted if the stream has completed successfully,
|
||||
* in other words - the stream has been emitted completely, without errors occuring before the final element has been signaled.
|
||||
*
|
||||
* {{{
|
||||
* [{"id":42},
|
||||
* {"id":43},
|
||||
* {"id":44}]
|
||||
* }}}
|
||||
*/
|
||||
object ArrayLineByLine extends JsonSourceRenderingMode {
|
||||
override val start: ByteString = ByteString("[")
|
||||
override val between: ByteString = ByteString(",\n")
|
||||
override val end: ByteString = ByteString("]")
|
||||
}
|
||||
|
||||
/**
|
||||
* Recommended rendering mode.
|
||||
*
|
||||
* It is a nice balance between valid and human-readable as well as resonably small size overhead (just the `\n` between elements).
|
||||
* A good example of API's using this syntax is Twitter's Firehose (last verified at 1.1 version of that API).
|
||||
*
|
||||
* {{{
|
||||
* {"id":42}
|
||||
* {"id":43}
|
||||
* {"id":44}
|
||||
* }}}
|
||||
*/
|
||||
object LineByLine extends JsonSourceRenderingMode {
|
||||
override val start: ByteString = ByteString.empty
|
||||
override val between: ByteString = ByteString("\n")
|
||||
override val end: ByteString = ByteString.empty
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple rendering mode interspersing each pair of elements with both `,\n`.
|
||||
* Picking the [[LineByLine]] format may be preferable, as it is slightly simpler to parse - each line being a valid json object (no need to trim the comma).
|
||||
*
|
||||
* {{{
|
||||
* {"id":42},
|
||||
* {"id":43},
|
||||
* {"id":44}
|
||||
* }}}
|
||||
*/
|
||||
object LineByLineCommaSeparated extends JsonSourceRenderingMode {
|
||||
override val start: ByteString = ByteString.empty
|
||||
override val between: ByteString = ByteString(",\n")
|
||||
override val end: ByteString = ByteString.empty
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,11 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.http.scaladsl.common
|
||||
|
||||
import akka.http.scaladsl.model.ContentType
|
||||
|
||||
trait SourceRenderingMode extends akka.http.javadsl.common.SourceRenderingMode {
|
||||
override def contentType: ContentType
|
||||
}
|
||||
|
|
@ -4,12 +4,19 @@
|
|||
|
||||
package akka.http.scaladsl.marshalling
|
||||
|
||||
import akka.http.scaladsl.common.EntityStreamingSupport
|
||||
import akka.stream.impl.ConstantFun
|
||||
|
||||
import scala.collection.immutable
|
||||
import akka.http.scaladsl.util.FastFuture._
|
||||
import akka.http.scaladsl.model.MediaTypes._
|
||||
import akka.http.scaladsl.model._
|
||||
import akka.http.scaladsl.server.ContentNegotiator
|
||||
import akka.http.scaladsl.util.FastFuture
|
||||
import akka.stream.scaladsl.Source
|
||||
import akka.util.ByteString
|
||||
|
||||
import scala.language.higherKinds
|
||||
|
||||
trait PredefinedToResponseMarshallers extends LowPriorityToResponseMarshallerImplicits {
|
||||
|
||||
|
|
@ -41,6 +48,37 @@ trait PredefinedToResponseMarshallers extends LowPriorityToResponseMarshallerImp
|
|||
Marshaller(implicit ec ⇒ {
|
||||
case (status, headers, value) ⇒ mt(value).fast map (_ map (_ map (HttpResponse(status, headers, _))))
|
||||
})
|
||||
|
||||
implicit def fromEntityStreamingSupportAndByteStringMarshaller[T, M](implicit s: EntityStreamingSupport, m: ToByteStringMarshaller[T]): ToResponseMarshaller[Source[T, M]] = {
|
||||
Marshaller[Source[T, M], HttpResponse] { implicit ec ⇒ source ⇒
|
||||
FastFuture successful {
|
||||
Marshalling.WithFixedContentType(s.contentType, () ⇒ {
|
||||
val availableMarshallingsPerElement = source.mapAsync(1) { t ⇒ m(t)(ec) }
|
||||
|
||||
// TODO optimise such that we pick the optimal marshalling only once (headAndTail needed?)
|
||||
// TODO, NOTE: this is somewhat duplicated from Marshal.scala it could be made DRYer
|
||||
val bestMarshallingPerElement = availableMarshallingsPerElement mapConcat { marshallings ⇒
|
||||
// pick the Marshalling that matches our EntityStreamingSupport
|
||||
(s.contentType match {
|
||||
case best @ (_: ContentType.Binary | _: ContentType.WithFixedCharset) ⇒
|
||||
marshallings collectFirst { case Marshalling.WithFixedContentType(`best`, marshal) ⇒ marshal }
|
||||
|
||||
case best @ ContentType.WithCharset(bestMT, bestCS) ⇒
|
||||
marshallings collectFirst {
|
||||
case Marshalling.WithFixedContentType(`best`, marshal) ⇒ marshal
|
||||
case Marshalling.WithOpenCharset(`bestMT`, marshal) ⇒ () ⇒ marshal(bestCS)
|
||||
}
|
||||
}).toList
|
||||
}
|
||||
val marshalledElements: Source[ByteString, M] =
|
||||
bestMarshallingPerElement.map(_.apply()) // marshal!
|
||||
.via(s.framingRenderer)
|
||||
|
||||
HttpResponse(entity = HttpEntity(s.contentType, marshalledElements))
|
||||
}) :: Nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trait LowPriorityToResponseMarshallerImplicits {
|
||||
|
|
@ -48,6 +86,40 @@ trait LowPriorityToResponseMarshallerImplicits {
|
|||
liftMarshaller(m)
|
||||
implicit def liftMarshaller[T](implicit m: ToEntityMarshaller[T]): ToResponseMarshaller[T] =
|
||||
PredefinedToResponseMarshallers.fromToEntityMarshaller()
|
||||
|
||||
// FIXME deduplicate this!!!
|
||||
implicit def fromEntityStreamingSupportAndEntityMarshaller[T, M](implicit s: EntityStreamingSupport, m: ToEntityMarshaller[T]): ToResponseMarshaller[Source[T, M]] = {
|
||||
Marshaller[Source[T, M], HttpResponse] { implicit ec ⇒ source ⇒
|
||||
FastFuture successful {
|
||||
Marshalling.WithFixedContentType(s.contentType, () ⇒ {
|
||||
val availableMarshallingsPerElement = source.mapAsync(1) { t ⇒ m(t)(ec) }
|
||||
|
||||
// TODO optimise such that we pick the optimal marshalling only once (headAndTail needed?)
|
||||
// TODO, NOTE: this is somewhat duplicated from Marshal.scala it could be made DRYer
|
||||
val bestMarshallingPerElement = availableMarshallingsPerElement mapConcat { marshallings ⇒
|
||||
// pick the Marshalling that matches our EntityStreamingSupport
|
||||
(s.contentType match {
|
||||
case best @ (_: ContentType.Binary | _: ContentType.WithFixedCharset) ⇒
|
||||
marshallings collectFirst { case Marshalling.WithFixedContentType(`best`, marshal) ⇒ marshal }
|
||||
|
||||
case best @ ContentType.WithCharset(bestMT, bestCS) ⇒
|
||||
marshallings collectFirst {
|
||||
case Marshalling.WithFixedContentType(`best`, marshal) ⇒ marshal
|
||||
case Marshalling.WithOpenCharset(`bestMT`, marshal) ⇒ () ⇒ marshal(bestCS)
|
||||
}
|
||||
}).toList
|
||||
}
|
||||
val marshalledElements: Source[ByteString, M] =
|
||||
bestMarshallingPerElement.map(_.apply()) // marshal!
|
||||
.flatMapConcat(_.dataBytes) // extract raw dataBytes
|
||||
.via(s.framingRenderer)
|
||||
|
||||
HttpResponse(entity = HttpEntity(s.contentType, marshalledElements))
|
||||
}) :: Nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object PredefinedToResponseMarshallers extends PredefinedToResponseMarshallers
|
||||
|
|
|
|||
|
|
@ -1,68 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.http.scaladsl.server
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.http.scaladsl.common.{ FramingWithContentType, SourceRenderingMode }
|
||||
import akka.http.scaladsl.model.{ ContentTypeRange, ContentTypes, MediaRanges }
|
||||
import akka.stream.scaladsl.{ Flow, Framing }
|
||||
import akka.util.ByteString
|
||||
|
||||
/**
|
||||
* Entity streaming support, independent of used Json parsing library etc.
|
||||
*
|
||||
* Can be extended by various Support traits (e.g. "SprayJsonSupport"),
|
||||
* in order to provide users with both `framing` (this trait) and `marshalling`
|
||||
* (implemented by a library) by using a single trait.
|
||||
*/
|
||||
trait EntityStreamingSupport extends EntityStreamingSupportBase {
|
||||
|
||||
/**
|
||||
* Implement as `implicit val` with required framing implementation, for example in
|
||||
* the case of streaming JSON uploads it could be `bracketCountingJsonFraming(maximumObjectLength)`.
|
||||
*/
|
||||
def incomingEntityStreamFraming: FramingWithContentType
|
||||
|
||||
/**
|
||||
* Implement as `implicit val` with the rendering mode to be used when redering `Source` instances.
|
||||
* For example for JSON it could be [[akka.http.scaladsl.common.JsonSourceRenderingMode.CompactArray]]
|
||||
* or [[akka.http.scaladsl.common.JsonSourceRenderingMode.LineByLine]].
|
||||
*/
|
||||
def outgoingEntityStreamRendering: SourceRenderingMode
|
||||
}
|
||||
|
||||
trait EntityStreamingSupportBase {
|
||||
/** `application/json` specific Framing implementation */
|
||||
def bracketCountingJsonFraming(maximumObjectLength: Int): FramingWithContentType =
|
||||
new ApplicationJsonBracketCountingFraming(maximumObjectLength)
|
||||
|
||||
/**
|
||||
* Frames incoming `text / *` entities on a line-by-line basis.
|
||||
* Useful for accepting `text/csv` uploads as a stream of rows.
|
||||
*/
|
||||
def newLineFraming(maximumObjectLength: Int, supportedContentTypes: ContentTypeRange): FramingWithContentType =
|
||||
new TextNewLineFraming(maximumObjectLength, supportedContentTypes)
|
||||
}
|
||||
|
||||
/**
|
||||
* Entity streaming support, independent of used Json parsing library etc.
|
||||
*
|
||||
* Can be extended by various Support traits (e.g. "SprayJsonSupport"),
|
||||
* in order to provide users with both `framing` (this trait) and `marshalling`
|
||||
* (implemented by a library) by using a single trait.
|
||||
*/
|
||||
object EntityStreamingSupport extends EntityStreamingSupportBase
|
||||
|
||||
final class ApplicationJsonBracketCountingFraming(maximumObjectLength: Int) extends FramingWithContentType {
|
||||
override final val flow = Flow[ByteString].via(akka.stream.scaladsl.JsonFraming.bracketCounting(maximumObjectLength))
|
||||
override final val supported = ContentTypeRange(ContentTypes.`application/json`)
|
||||
}
|
||||
|
||||
final class TextNewLineFraming(maximumLineLength: Int, supportedContentTypes: ContentTypeRange) extends FramingWithContentType {
|
||||
override final val flow: Flow[ByteString, ByteString, NotUsed] =
|
||||
Flow[ByteString].via(Framing.delimiter(ByteString("\n"), maximumLineLength))
|
||||
|
||||
override final val supported: ContentTypeRange =
|
||||
ContentTypeRange(MediaRanges.`text/*`)
|
||||
}
|
||||
|
|
@ -4,27 +4,24 @@
|
|||
package akka.http.scaladsl.server.directives
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.http.scaladsl.common.{ FramingWithContentType, SourceRenderingMode }
|
||||
import akka.http.scaladsl.common
|
||||
import akka.http.scaladsl.common.EntityStreamingSupport
|
||||
import akka.http.scaladsl.marshalling._
|
||||
import akka.http.scaladsl.model._
|
||||
import akka.http.scaladsl.unmarshalling.{ Unmarshal, Unmarshaller, _ }
|
||||
import akka.http.scaladsl.unmarshalling.{ Unmarshaller, _ }
|
||||
import akka.http.scaladsl.util.FastFuture
|
||||
import akka.stream.Materializer
|
||||
import akka.stream.impl.ConstantFun
|
||||
import akka.stream.scaladsl.{ Flow, Keep, Source }
|
||||
import akka.util.ByteString
|
||||
|
||||
import scala.concurrent.ExecutionContext
|
||||
import scala.language.implicitConversions
|
||||
|
||||
/**
|
||||
* Allows the [[MarshallingDirectives.entity]] directive to extract a [[Source]] of elements.
|
||||
*
|
||||
* See [[akka.http.scaladsl.server.EntityStreamingSupport]] for useful default [[FramingWithContentType]] instances and
|
||||
* See [[common.EntityStreamingSupport]] for useful default framing `Flow` instances and
|
||||
* support traits such as `SprayJsonSupport` (or your other favourite JSON library) to provide the needed [[Marshaller]] s.
|
||||
*/
|
||||
trait FramedEntityStreamingDirectives extends MarshallingDirectives {
|
||||
import FramedEntityStreamingDirectives._
|
||||
|
||||
type RequestToSourceUnmarshaller[T] = FromRequestUnmarshaller[Source[T, NotUsed]]
|
||||
|
||||
|
|
@ -32,41 +29,33 @@ trait FramedEntityStreamingDirectives extends MarshallingDirectives {
|
|||
* Extracts entity as [[Source]] of elements of type `T`.
|
||||
* This is achieved by applying the implicitly provided (in the following order):
|
||||
*
|
||||
* - 1st: [[FramingWithContentType]] in order to chunk-up the incoming [[ByteString]]s according to the
|
||||
* `Content-Type` aware framing (for example, [[akka.http.scaladsl.server.EntityStreamingSupport.bracketCountingJsonFraming]]).
|
||||
* - 2nd: [[Unmarshaller]] (from [[ByteString]] to `T`) for each of the respective "chunks" (e.g. for each JSON element contained within an array).
|
||||
* - 1st: chunk-up the incoming [[ByteString]]s by applying the `Content-Type`-aware framing
|
||||
* - 2nd: apply the [[Unmarshaller]] (from [[ByteString]] to `T`) for each of the respective "chunks" (e.g. for each JSON element contained within an array).
|
||||
*
|
||||
* The request will be rejected with an [[akka.http.scaladsl.server.UnsupportedRequestContentTypeRejection]] if
|
||||
* its [[ContentType]] is not supported by the used `framing` or `unmarshaller`.
|
||||
*
|
||||
* It is recommended to use the [[akka.http.scaladsl.server.EntityStreamingSupport]] trait in conjunction with this
|
||||
* directive as it helps provide the right [[FramingWithContentType]] and [[SourceRenderingMode]] for the most
|
||||
* typical usage scenarios (JSON, CSV, ...).
|
||||
*
|
||||
* Cancelling extracted [[Source]] closes the connection abruptly (same as cancelling the `entity.dataBytes`).
|
||||
*
|
||||
* If looking to improve marshalling performance in face of many elements (possibly of different sizes),
|
||||
* you may be interested in using [[asSourceOfAsyncUnordered]] instead.
|
||||
*
|
||||
* See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route.
|
||||
* By default the uploaded data is limited by the `akka.http.parsing.max-content-length`.
|
||||
*/
|
||||
final def asSourceOf[T](implicit um: Unmarshaller[ByteString, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] =
|
||||
asSourceOfAsync(1)(um, framing)
|
||||
final def asSourceOf[T](implicit um: FromByteStringUnmarshaller[T], support: EntityStreamingSupport): RequestToSourceUnmarshaller[T] =
|
||||
asSourceOfInternal(um, support)
|
||||
|
||||
/**
|
||||
* Extracts entity as [[Source]] of elements of type `T`.
|
||||
* This is achieved by applying the implicitly provided (in the following order):
|
||||
*
|
||||
* - 1st: [[FramingWithContentType]] in order to chunk-up the incoming [[ByteString]]s according to the
|
||||
* `Content-Type` aware framing (for example, [[akka.http.scaladsl.server.EntityStreamingSupport.bracketCountingJsonFraming]]).
|
||||
* - 1st: [[FramingFlow]] in order to chunk-up the incoming [[ByteString]]s according to the
|
||||
* `Content-Type` aware framing (for example, [[common.EntityStreamingSupport.bracketCountingJsonFraming]]).
|
||||
* - 2nd: [[Unmarshaller]] (from [[ByteString]] to `T`) for each of the respective "chunks" (e.g. for each JSON element contained within an array).
|
||||
*
|
||||
* The request will be rejected with an [[akka.http.scaladsl.server.UnsupportedRequestContentTypeRejection]] if
|
||||
* its [[ContentType]] is not supported by the used `framing` or `unmarshaller`.
|
||||
*
|
||||
* It is recommended to use the [[akka.http.scaladsl.server.EntityStreamingSupport]] trait in conjunction with this
|
||||
* directive as it helps provide the right [[FramingWithContentType]] and [[SourceRenderingMode]] for the most
|
||||
* It is recommended to use the [[common.EntityStreamingSupport]] trait in conjunction with this
|
||||
* directive as it helps provide the right [[FramingFlow]] and [[SourceRenderingMode]] for the most
|
||||
* typical usage scenarios (JSON, CSV, ...).
|
||||
*
|
||||
* Cancelling extracted [[Source]] closes the connection abruptly (same as cancelling the `entity.dataBytes`).
|
||||
|
|
@ -77,177 +66,25 @@ trait FramedEntityStreamingDirectives extends MarshallingDirectives {
|
|||
* See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route.
|
||||
* By default the uploaded data is limited by the `akka.http.parsing.max-content-length`.
|
||||
*/
|
||||
final def asSourceOf[T](framing: FramingWithContentType)(implicit um: Unmarshaller[ByteString, T]): RequestToSourceUnmarshaller[T] =
|
||||
asSourceOfAsync(1)(um, framing)
|
||||
|
||||
/**
|
||||
* Similar to [[asSourceOf]] however will apply at most `parallelism` unmarshallers in parallel.
|
||||
*
|
||||
* The source elements emitted preserve the order in which they are sent in the incoming [[HttpRequest]].
|
||||
* If you want to sacrivice ordering in favour of (potential) slight performance improvements in reading the input
|
||||
* you may want to use [[asSourceOfAsyncUnordered]] instead, which lifts the ordering guarantee.
|
||||
*
|
||||
* Refer to [[asSourceOf]] for more in depth-documentation and guidelines.
|
||||
*
|
||||
* If looking to improve marshalling performance in face of many elements (possibly of different sizes),
|
||||
* you may be interested in using [[asSourceOfAsyncUnordered]] instead.
|
||||
*
|
||||
* See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route.
|
||||
* By default the uploaded data is limited by the `akka.http.parsing.max-content-length`.
|
||||
*/
|
||||
final def asSourceOfAsync[T](parallelism: Int)(implicit um: Unmarshaller[ByteString, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] =
|
||||
asSourceOfInternal[T](framing, (ec, mat) ⇒ Flow[ByteString].mapAsync(parallelism)(Unmarshal(_).to[T](um, ec, mat)))
|
||||
|
||||
/**
|
||||
* Similar to [[asSourceOf]] however will apply at most `parallelism` unmarshallers in parallel.
|
||||
*
|
||||
* The source elements emitted preserve the order in which they are sent in the incoming [[HttpRequest]].
|
||||
* If you want to sacrivice ordering in favour of (potential) slight performance improvements in reading the input
|
||||
* you may want to use [[asSourceOfAsyncUnordered]] instead, which lifts the ordering guarantee.
|
||||
*
|
||||
* Refer to [[asSourceOf]] for more in depth-documentation and guidelines.
|
||||
*
|
||||
* See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route.
|
||||
* By default the uploaded data is limited by the `akka.http.parsing.max-content-length`.
|
||||
*/
|
||||
final def asSourceOfAsync[T](parallelism: Int, framing: FramingWithContentType)(implicit um: Unmarshaller[ByteString, T]): RequestToSourceUnmarshaller[T] =
|
||||
asSourceOfAsync(parallelism)(um, framing)
|
||||
|
||||
/**
|
||||
* Similar to [[asSourceOfAsync]], as it will apply at most `parallelism` unmarshallers in parallel.
|
||||
*
|
||||
* The source elements emitted preserve the order in which they are sent in the incoming [[HttpRequest]].
|
||||
* If you want to sacrivice ordering in favour of (potential) slight performance improvements in reading the input
|
||||
* you may want to use [[asSourceOfAsyncUnordered]] instead, which lifts the ordering guarantee.
|
||||
*
|
||||
* Refer to [[asSourceOf]] for more in depth-documentation and guidelines.
|
||||
*
|
||||
* See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route.
|
||||
* By default the uploaded data is limited by the `akka.http.parsing.max-content-length`.
|
||||
*/
|
||||
final def asSourceOfAsyncUnordered[T](parallelism: Int)(implicit um: Unmarshaller[ByteString, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] =
|
||||
asSourceOfInternal[T](framing, (ec, mat) ⇒ Flow[ByteString].mapAsyncUnordered(parallelism)(Unmarshal(_).to[T](um, ec, mat)))
|
||||
/**
|
||||
* Similar to [[asSourceOfAsync]], as it will apply at most `parallelism` unmarshallers in parallel.
|
||||
*
|
||||
* The source elements emitted preserve the order in which they are sent in the incoming [[HttpRequest]].
|
||||
* If you want to sacrivice ordering in favour of (potential) slight performance improvements in reading the input
|
||||
* you may want to use [[asSourceOfAsyncUnordered]] instead, which lifts the ordering guarantee.
|
||||
*
|
||||
* Refer to [[asSourceOf]] for more in depth-documentation and guidelines.
|
||||
*
|
||||
* See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route.
|
||||
* By default the uploaded data is limited by the `akka.http.parsing.max-content-length`.
|
||||
*/
|
||||
final def asSourceOfAsyncUnordered[T](parallelism: Int, framing: FramingWithContentType)(implicit um: Unmarshaller[ByteString, T]): RequestToSourceUnmarshaller[T] =
|
||||
asSourceOfAsyncUnordered(parallelism)(um, framing)
|
||||
final def asSourceOf[T](support: EntityStreamingSupport)(implicit um: FromByteStringUnmarshaller[T]): RequestToSourceUnmarshaller[T] =
|
||||
asSourceOfInternal(um, support)
|
||||
|
||||
// format: OFF
|
||||
private final def asSourceOfInternal[T](framing: FramingWithContentType, marshalling: (ExecutionContext, Materializer) => Flow[ByteString, ByteString, NotUsed]#ReprMat[T, NotUsed]): RequestToSourceUnmarshaller[T] =
|
||||
private final def asSourceOfInternal[T](um: Unmarshaller[ByteString, T], support: EntityStreamingSupport): RequestToSourceUnmarshaller[T] =
|
||||
Unmarshaller.withMaterializer[HttpRequest, Source[T, NotUsed]] { implicit ec ⇒ implicit mat ⇒ req ⇒
|
||||
val entity = req.entity
|
||||
if (framing.matches(entity.contentType)) {
|
||||
if (support.supported.matches(entity.contentType)) {
|
||||
val bytes = entity.dataBytes
|
||||
val frames = bytes.via(framing.flow)
|
||||
val elements = frames.viaMat(marshalling(ec, mat))(Keep.right)
|
||||
val frames = bytes.via(support.framingDecoder)
|
||||
val marshalling =
|
||||
if (support.unordered) Flow[ByteString].mapAsyncUnordered(support.parallelism)(bs => um(bs)(ec, mat))
|
||||
else Flow[ByteString].mapAsync(support.parallelism)(bs => um(bs)(ec, mat))
|
||||
|
||||
val elements = frames.viaMat(marshalling)(Keep.right)
|
||||
FastFuture.successful(elements)
|
||||
|
||||
} else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(framing.supported))
|
||||
} else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(support.supported))
|
||||
}
|
||||
// format: ON
|
||||
|
||||
// TODO note to self - we need the same of ease of streaming stuff for the client side - i.e. the twitter firehose case.
|
||||
|
||||
implicit def _asSourceUnmarshaller[T](implicit fem: FromEntityUnmarshaller[T], framing: FramingWithContentType): FromRequestUnmarshaller[Source[T, NotUsed]] = {
|
||||
Unmarshaller.withMaterializer[HttpRequest, Source[T, NotUsed]] { implicit ec ⇒ implicit mat ⇒ req ⇒
|
||||
val entity = req.entity
|
||||
if (framing.matches(entity.contentType)) {
|
||||
val bytes = entity.dataBytes
|
||||
val frames = bytes.viaMat(framing.flow)(Keep.right)
|
||||
val elements = frames.viaMat(Flow[ByteString].map(HttpEntity(entity.contentType, _)).mapAsync(1)(Unmarshal(_).to[T](fem, ec, mat)))(Keep.right)
|
||||
FastFuture.successful(elements)
|
||||
} else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(framing.supported))
|
||||
}
|
||||
}
|
||||
|
||||
implicit def _sourceMarshaller[T, M](implicit m: ToEntityMarshaller[T], mode: SourceRenderingMode): ToResponseMarshaller[Source[T, M]] =
|
||||
Marshaller[Source[T, M], HttpResponse] { implicit ec ⇒ source ⇒
|
||||
FastFuture successful {
|
||||
Marshalling.WithFixedContentType(mode.contentType, () ⇒ {
|
||||
val bytes = source
|
||||
.mapAsync(1)(t ⇒ Marshal(t).to[HttpEntity])
|
||||
.map(_.dataBytes)
|
||||
.flatMapConcat(ConstantFun.scalaIdentityFunction)
|
||||
.intersperse(mode.start, mode.between, mode.end)
|
||||
HttpResponse(entity = HttpEntity(mode.contentType, bytes))
|
||||
}) :: Nil
|
||||
}
|
||||
}
|
||||
|
||||
implicit def _sourceParallelismMarshaller[T](implicit m: ToEntityMarshaller[T], mode: SourceRenderingMode): ToResponseMarshaller[AsyncRenderingOf[T]] =
|
||||
Marshaller[AsyncRenderingOf[T], HttpResponse] { implicit ec ⇒ rendering ⇒
|
||||
FastFuture successful {
|
||||
Marshalling.WithFixedContentType(mode.contentType, () ⇒ {
|
||||
val bytes = rendering.source
|
||||
.mapAsync(rendering.parallelism)(t ⇒ Marshal(t).to[HttpEntity])
|
||||
.map(_.dataBytes)
|
||||
.flatMapConcat(ConstantFun.scalaIdentityFunction)
|
||||
.intersperse(mode.start, mode.between, mode.end)
|
||||
HttpResponse(entity = HttpEntity(mode.contentType, bytes))
|
||||
}) :: Nil
|
||||
}
|
||||
}
|
||||
|
||||
implicit def _sourceUnorderedMarshaller[T](implicit m: ToEntityMarshaller[T], mode: SourceRenderingMode): ToResponseMarshaller[AsyncUnorderedRenderingOf[T]] =
|
||||
Marshaller[AsyncUnorderedRenderingOf[T], HttpResponse] { implicit ec ⇒ rendering ⇒
|
||||
FastFuture successful {
|
||||
Marshalling.WithFixedContentType(mode.contentType, () ⇒ {
|
||||
val bytes = rendering.source
|
||||
.mapAsync(rendering.parallelism)(t ⇒ Marshal(t).to[HttpEntity])
|
||||
.map(_.dataBytes)
|
||||
.flatMapConcat(ConstantFun.scalaIdentityFunction)
|
||||
.intersperse(mode.start, mode.between, mode.end)
|
||||
HttpResponse(entity = HttpEntity(mode.contentType, bytes))
|
||||
}) :: Nil
|
||||
}
|
||||
}
|
||||
|
||||
// special rendering modes
|
||||
|
||||
implicit def _enableSpecialSourceRenderingModes[T](source: Source[T, Any]): EnableSpecialSourceRenderingModes[T] =
|
||||
new EnableSpecialSourceRenderingModes(source)
|
||||
|
||||
}
|
||||
/**
|
||||
* Allows the [[MarshallingDirectives.entity]] directive to extract a [[Source]] of elements.
|
||||
*
|
||||
* See [[FramedEntityStreamingDirectives]] for detailed documentation.
|
||||
*/
|
||||
object FramedEntityStreamingDirectives extends FramedEntityStreamingDirectives {
|
||||
sealed class AsyncSourceRenderingMode
|
||||
final class AsyncRenderingOf[T](val source: Source[T, Any], val parallelism: Int) extends AsyncSourceRenderingMode
|
||||
final class AsyncUnorderedRenderingOf[T](val source: Source[T, Any], val parallelism: Int) extends AsyncSourceRenderingMode
|
||||
|
||||
}
|
||||
|
||||
/** Provides DSL for special rendering modes, e.g. `complete(source.renderAsync)` */
|
||||
final class EnableSpecialSourceRenderingModes[T](val source: Source[T, Any]) extends AnyVal {
|
||||
/**
|
||||
* Causes the response stream to be marshalled asynchronously (up to `parallelism` elements at once),
|
||||
* while retaining the ordering of incoming elements.
|
||||
*
|
||||
* See also [[Source.mapAsync]].
|
||||
*/
|
||||
def renderAsync(parallelism: Int) = new FramedEntityStreamingDirectives.AsyncRenderingOf(source, parallelism)
|
||||
/**
|
||||
* Causes the response stream to be marshalled asynchronously (up to `parallelism` elements at once),
|
||||
* emitting the first one that finished marshalling onto the wire.
|
||||
*
|
||||
* This sacrifices ordering of the incoming data in regards to data actually rendered onto the wire,
|
||||
* but may be faster if some elements are smaller than other ones by not stalling the small elements
|
||||
* from being written while the large one still is being marshalled.
|
||||
*
|
||||
* See also [[Source.mapAsyncUnordered]].
|
||||
*/
|
||||
def renderAsyncUnordered(parallelism: Int) = new FramedEntityStreamingDirectives.AsyncUnorderedRenderingOf(source, parallelism)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,11 +6,9 @@ package akka.stream.scaladsl
|
|||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.impl.JsonObjectParser
|
||||
import akka.stream.scaladsl.Framing.FramingException
|
||||
import akka.stream.scaladsl.{ JsonFraming, Framing, Source }
|
||||
import akka.stream.testkit.scaladsl.TestSink
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.util.ByteString
|
||||
import org.scalatest.concurrent.ScalaFutures
|
||||
|
||||
import scala.collection.immutable.Seq
|
||||
import scala.concurrent.Await
|
||||
|
|
@ -26,21 +24,22 @@ class JsonFramingSpec extends AkkaSpec {
|
|||
"""
|
||||
|[
|
||||
| { "name" : "john" },
|
||||
| { "name" : "Ég get etið gler án þess að meiða mig" },
|
||||
| { "name" : "jack" },
|
||||
| { "name" : "katie" }
|
||||
|]
|
||||
|""".stripMargin // also should complete once notices end of array
|
||||
|
||||
val result = Source.single(ByteString(input))
|
||||
.via(JsonFraming.bracketCounting(Int.MaxValue))
|
||||
.via(JsonFraming.objectScanner(Int.MaxValue))
|
||||
.runFold(Seq.empty[String]) {
|
||||
case (acc, entry) ⇒ acc ++ Seq(entry.utf8String)
|
||||
}
|
||||
|
||||
result.futureValue shouldBe Seq(
|
||||
"""{ "name" : "john" }""",
|
||||
"""{ "name" : "jack" }""",
|
||||
"""{ "name" : "katie" }""")
|
||||
"""{ "name" : "Ég get etið gler án þess að meiða mig" }""",
|
||||
"""{ "name" : "jack" }"""
|
||||
)
|
||||
}
|
||||
|
||||
"emit single json element from string" in {
|
||||
|
|
@ -50,7 +49,7 @@ class JsonFramingSpec extends AkkaSpec {
|
|||
""".stripMargin
|
||||
|
||||
val result = Source.single(ByteString(input))
|
||||
.via(JsonFraming.bracketCounting(Int.MaxValue))
|
||||
.via(JsonFraming.objectScanner(Int.MaxValue))
|
||||
.take(1)
|
||||
.runFold(Seq.empty[String]) {
|
||||
case (acc, entry) ⇒ acc ++ Seq(entry.utf8String)
|
||||
|
|
@ -67,7 +66,7 @@ class JsonFramingSpec extends AkkaSpec {
|
|||
""".stripMargin
|
||||
|
||||
val result = Source.single(ByteString(input))
|
||||
.via(JsonFraming.bracketCounting(Int.MaxValue))
|
||||
.via(JsonFraming.objectScanner(Int.MaxValue))
|
||||
.runFold(Seq.empty[String]) {
|
||||
case (acc, entry) ⇒ acc ++ Seq(entry.utf8String)
|
||||
}
|
||||
|
|
@ -85,7 +84,7 @@ class JsonFramingSpec extends AkkaSpec {
|
|||
""".stripMargin
|
||||
|
||||
val result = Source.single(ByteString(input))
|
||||
.via(JsonFraming.bracketCounting(Int.MaxValue))
|
||||
.via(JsonFraming.objectScanner(Int.MaxValue))
|
||||
.runFold(Seq.empty[String]) {
|
||||
case (acc, entry) ⇒ acc ++ Seq(entry.utf8String)
|
||||
}
|
||||
|
|
@ -109,7 +108,7 @@ class JsonFramingSpec extends AkkaSpec {
|
|||
""""}]"""").map(ByteString(_))
|
||||
|
||||
val result = Source.apply(input)
|
||||
.via(JsonFraming.bracketCounting(Int.MaxValue))
|
||||
.via(JsonFraming.objectScanner(Int.MaxValue))
|
||||
.runFold(Seq.empty[String]) {
|
||||
case (acc, entry) ⇒ acc ++ Seq(entry.utf8String)
|
||||
}
|
||||
|
|
@ -410,7 +409,7 @@ class JsonFramingSpec extends AkkaSpec {
|
|||
""".stripMargin
|
||||
|
||||
val result = Source.single(ByteString(input))
|
||||
.via(JsonFraming.bracketCounting(5)).map(_.utf8String)
|
||||
.via(JsonFraming.objectScanner(5)).map(_.utf8String)
|
||||
.runFold(Seq.empty[String]) {
|
||||
case (acc, entry) ⇒ acc ++ Seq(entry)
|
||||
}
|
||||
|
|
@ -427,7 +426,7 @@ class JsonFramingSpec extends AkkaSpec {
|
|||
"""{ "name": "very very long name somehow. how did this happen?" }""").map(s ⇒ ByteString(s))
|
||||
|
||||
val probe = Source(input)
|
||||
.via(JsonFraming.bracketCounting(48))
|
||||
.via(JsonFraming.objectScanner(48))
|
||||
.runWith(TestSink.probe)
|
||||
|
||||
probe.ensureSubscription()
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ object JsonFraming {
|
|||
* @param maximumObjectLength The maximum length of allowed frames while decoding. If the maximum length is exceeded
|
||||
* this Flow will fail the stream.
|
||||
*/
|
||||
def bracketCounting(maximumObjectLength: Int): Flow[ByteString, ByteString, NotUsed] =
|
||||
akka.stream.scaladsl.JsonFraming.bracketCounting(maximumObjectLength).asJava
|
||||
def objectScanner(maximumObjectLength: Int): Flow[ByteString, ByteString, NotUsed] =
|
||||
akka.stream.scaladsl.JsonFraming.objectScanner(maximumObjectLength).asJava
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ object JsonFraming {
|
|||
|
||||
/**
|
||||
* Returns a Flow that implements a "brace counting" based framing stage for emitting valid JSON chunks.
|
||||
* It scans the incoming data stream for valid JSON objects and returns chunks of ByteStrings containing only those valid chunks.
|
||||
*
|
||||
* Typical examples of data that one may want to frame using this stage include:
|
||||
*
|
||||
|
|
@ -40,7 +41,7 @@ object JsonFraming {
|
|||
* @param maximumObjectLength The maximum length of allowed frames while decoding. If the maximum length is exceeded
|
||||
* this Flow will fail the stream.
|
||||
*/
|
||||
def bracketCounting(maximumObjectLength: Int): Flow[ByteString, ByteString, NotUsed] =
|
||||
def objectScanner(maximumObjectLength: Int): Flow[ByteString, ByteString, NotUsed] =
|
||||
Flow[ByteString].via(new SimpleLinearGraphStage[ByteString] {
|
||||
private[this] val buffer = new JsonObjectParser(maximumObjectLength)
|
||||
|
||||
|
|
@ -67,6 +68,6 @@ object JsonFraming {
|
|||
}
|
||||
}
|
||||
}
|
||||
}).named("jsonFraming(BracketCounting)")
|
||||
}).named("JsonFraming.objectScanner")
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue