+htp #18837 more docs and final cleanups, complete java docs
This commit is contained in:
parent
db880a3db0
commit
c76ec2ac15
25 changed files with 476 additions and 179 deletions
|
|
@ -0,0 +1,101 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package docs.http.javadsl.server;
|
||||||
|
|
||||||
|
import akka.NotUsed;
|
||||||
|
import akka.http.javadsl.common.FramingWithContentType;
|
||||||
|
import akka.http.javadsl.common.JsonSourceRenderingModes;
|
||||||
|
import akka.http.javadsl.marshallers.jackson.Jackson;
|
||||||
|
import akka.http.javadsl.model.*;
|
||||||
|
import akka.http.javadsl.model.headers.Accept;
|
||||||
|
import akka.http.javadsl.server.*;
|
||||||
|
import akka.http.javadsl.testkit.JUnitRouteTest;
|
||||||
|
import akka.http.javadsl.testkit.TestRoute;
|
||||||
|
import akka.stream.javadsl.Source;
|
||||||
|
import akka.util.ByteString;
|
||||||
|
import docs.http.javadsl.server.testkit.MyAppService;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.concurrent.CompletionStage;
|
||||||
|
|
||||||
|
public class JsonStreamingExamplesTest extends JUnitRouteTest {
|
||||||
|
|
||||||
|
//#routes
|
||||||
|
final Route tweets() {
|
||||||
|
//#formats
|
||||||
|
final Unmarshaller<ByteString, JavaTweet> JavaTweets = Jackson.byteStringUnmarshaller(JavaTweet.class);
|
||||||
|
//#formats
|
||||||
|
|
||||||
|
//#response-streaming
|
||||||
|
final Route responseStreaming = path("tweets", () ->
|
||||||
|
get(() ->
|
||||||
|
parameter(StringUnmarshallers.INTEGER, "n", n -> {
|
||||||
|
final Source<JavaTweet, NotUsed> tws =
|
||||||
|
Source.repeat(new JavaTweet("Hello World!")).take(n);
|
||||||
|
return completeOKWithSource(tws, Jackson.marshaller(), JsonSourceRenderingModes.arrayCompact());
|
||||||
|
})
|
||||||
|
)
|
||||||
|
);
|
||||||
|
//#response-streaming
|
||||||
|
|
||||||
|
//#incoming-request-streaming
|
||||||
|
final Route incomingStreaming = path("tweets", () ->
|
||||||
|
post(() ->
|
||||||
|
extractMaterializer(mat -> {
|
||||||
|
final FramingWithContentType jsonFraming = EntityStreamingSupport.bracketCountingJsonFraming(128);
|
||||||
|
|
||||||
|
return entityasSourceOf(JavaTweets, jsonFraming, sourceOfTweets -> {
|
||||||
|
final CompletionStage<Integer> tweetsCount = sourceOfTweets.runFold(0, (acc, tweet) -> acc + 1, mat);
|
||||||
|
return onComplete(tweetsCount, c -> complete("Total number of tweets: " + c));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
//#incoming-request-streaming
|
||||||
|
|
||||||
|
return responseStreaming.orElse(incomingStreaming);
|
||||||
|
}
|
||||||
|
//#routes
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void getTweetsTest() {
|
||||||
|
//#response-streaming
|
||||||
|
// tests:
|
||||||
|
final TestRoute routes = testRoute(tweets());
|
||||||
|
|
||||||
|
// test happy path
|
||||||
|
final Accept acceptApplication = Accept.create(MediaRanges.create(MediaTypes.APPLICATION_JSON));
|
||||||
|
routes.run(HttpRequest.GET("/tweets?n=2").addHeader(acceptApplication))
|
||||||
|
.assertStatusCode(200)
|
||||||
|
.assertEntity("[{\"message\":\"Hello World!\"},{\"message\":\"Hello World!\"}]");
|
||||||
|
|
||||||
|
// test responses to potential errors
|
||||||
|
final Accept acceptText = Accept.create(MediaRanges.ALL_TEXT);
|
||||||
|
routes.run(HttpRequest.GET("/tweets?n=3").addHeader(acceptText))
|
||||||
|
.assertStatusCode(StatusCodes.NOT_ACCEPTABLE) // 406
|
||||||
|
.assertEntity("Resource representation is only available with these types:\napplication/json");
|
||||||
|
//#response-streaming
|
||||||
|
}
|
||||||
|
|
||||||
|
//#models
|
||||||
|
private static final class JavaTweet {
|
||||||
|
private String message;
|
||||||
|
|
||||||
|
public JavaTweet(String message) {
|
||||||
|
this.message = message;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMessage(String message) {
|
||||||
|
this.message = message;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getMessage() {
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
//#models
|
||||||
|
}
|
||||||
|
|
@ -1,33 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
|
|
||||||
*/
|
|
||||||
|
|
||||||
package docs.http.javadsl.server.directives;
|
|
||||||
|
|
||||||
import akka.http.javadsl.model.HttpRequest;
|
|
||||||
import akka.http.javadsl.model.HttpResponse;
|
|
||||||
import akka.http.javadsl.server.Route;
|
|
||||||
import akka.http.javadsl.server.directives.FramedEntityStreamingDirectives;
|
|
||||||
import akka.http.javadsl.server.directives.LogEntry;
|
|
||||||
import akka.http.javadsl.testkit.JUnitRouteTest;
|
|
||||||
import akka.http.scaladsl.server.Rejection;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.function.BiFunction;
|
|
||||||
import java.util.function.Function;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import static akka.event.Logging.InfoLevel;
|
|
||||||
|
|
||||||
import static akka.http.javadsl.server.directives.FramedEntityStreamingDirectives.*;
|
|
||||||
|
|
||||||
public class FramedEntityStreamingExamplesTest extends JUnitRouteTest {
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testRenderSource() {
|
|
||||||
FramedEntityStreamingDirectives.
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
@ -18,6 +18,7 @@ To use the high-level API you need to add a dependency to the ``akka-http-experi
|
||||||
directives/index
|
directives/index
|
||||||
marshalling
|
marshalling
|
||||||
exception-handling
|
exception-handling
|
||||||
|
source-streaming-support
|
||||||
rejections
|
rejections
|
||||||
testkit
|
testkit
|
||||||
|
|
||||||
|
|
@ -51,7 +52,6 @@ in the :ref:`exception-handling-java` section of the documtnation. You can use t
|
||||||
|
|
||||||
File uploads
|
File uploads
|
||||||
^^^^^^^^^^^^
|
^^^^^^^^^^^^
|
||||||
TODO not possible in Java DSL since there
|
|
||||||
|
|
||||||
For high level directives to handle uploads see the :ref:`FileUploadDirectives-java`.
|
For high level directives to handle uploads see the :ref:`FileUploadDirectives-java`.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,74 @@
|
||||||
|
.. _json-streaming-java:
|
||||||
|
|
||||||
|
Source Streaming
|
||||||
|
================
|
||||||
|
|
||||||
|
Akka HTTP supports completing a request with an Akka ``Source<T, ?>``, which makes it possible to very easily build
|
||||||
|
streaming end-to-end APIs which apply back-pressure throughout the entire stack.
|
||||||
|
|
||||||
|
It is possible to complete requests with raw ``Source<ByteString, ?>``, however often it is more convenient to
|
||||||
|
stream on an element-by-element basis, and allow Akka HTTP to handle the rendering internally - for example as a JSON array,
|
||||||
|
or CSV stream (where each element is separated by a new-line).
|
||||||
|
|
||||||
|
In the following sections we investigate how to make use of the JSON Streaming infrastructure,
|
||||||
|
however the general hints apply to any kind of element-by-element streaming you could imagine.
|
||||||
|
|
||||||
|
It is possible to implement your own framing for any content type you might need, including bianary formats
|
||||||
|
by implementing :class:`FramingWithContentType`.
|
||||||
|
|
||||||
|
JSON Streaming
|
||||||
|
==============
|
||||||
|
|
||||||
|
`JSON Streaming`_ is a term refering to streaming a (possibly infinite) stream of element as independent JSON
|
||||||
|
objects as a continuous HTTP request or response. The elements are most often separated using newlines,
|
||||||
|
however do not have to be. Concatenating elements side-by-side or emitting "very long" JSON array is also another
|
||||||
|
use case.
|
||||||
|
|
||||||
|
In the below examples, we'll be refering to the ``User`` and ``Measurement`` case classes as our model, which are defined as:
|
||||||
|
|
||||||
|
.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#models
|
||||||
|
|
||||||
|
.. _Json Streaming: https://en.wikipedia.org/wiki/JSON_Streaming
|
||||||
|
|
||||||
|
Responding with JSON Streams
|
||||||
|
----------------------------
|
||||||
|
|
||||||
|
In this example we implement an API representing an infinite stream of tweets, very much like Twitter's `Streaming API`_.
|
||||||
|
|
||||||
|
Firstly, we'll need to get some additional marshalling infrastructure set up, that is able to marshal to and from an
|
||||||
|
Akka Streams ``Source<T, ?>``. One such trait, containing the needed marshallers is ``SprayJsonSupport``, which uses
|
||||||
|
spray-json (a high performance json parser library), and is shipped as part of Akka HTTP in the
|
||||||
|
``akka-http-spray-json-experimental`` module.
|
||||||
|
|
||||||
|
The last bit of setup, before we can render a streaming json response
|
||||||
|
|
||||||
|
.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#response-streaming
|
||||||
|
|
||||||
|
.. _Streaming API: https://dev.twitter.com/streaming/overview
|
||||||
|
|
||||||
|
Consuming JSON Streaming uploads
|
||||||
|
--------------------------------
|
||||||
|
|
||||||
|
Sometimes the client may be sending a streaming request, for example an embedded device initiated a connection with
|
||||||
|
the server and is feeding it with one line of measurement data.
|
||||||
|
|
||||||
|
In this example, we want to consume this data in a streaming fashion from the request entity, and also apply
|
||||||
|
back-pressure to the underlying TCP connection, if the server can not cope with the rate of incoming data (back-pressure
|
||||||
|
will be applied automatically thanks to using Akka HTTP/Streams).
|
||||||
|
|
||||||
|
.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#formats
|
||||||
|
|
||||||
|
.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#incoming-request-streaming
|
||||||
|
|
||||||
|
Implementing custom (Un)Marshaller support for JSON streaming
|
||||||
|
-------------------------------------------------------------
|
||||||
|
|
||||||
|
The following types that may need to be implemented by a custom framed-streaming support library are:
|
||||||
|
|
||||||
|
- ``SourceRenderingMode`` which can customise how to render the begining / between-elements and ending of such
|
||||||
|
stream (while writing a response, i.e. by calling ``complete(source)``).
|
||||||
|
Implementations for JSON are available in ``akka.http.scaladsl.common.JsonSourceRenderingMode``.
|
||||||
|
- ``FramingWithContentType`` which is needed to be able to split incoming ``ByteString``
|
||||||
|
chunks into frames of the higher-level data type format that is understood by the provided unmarshallers.
|
||||||
|
In the case of JSON it means chunking up ByteStrings such that each emitted element corresponds to exactly one JSON object,
|
||||||
|
this framing is implemented in ``EntityStreamingSupport``.
|
||||||
|
|
@ -5,13 +5,13 @@
|
||||||
package docs.http.scaladsl.server.directives
|
package docs.http.scaladsl.server.directives
|
||||||
|
|
||||||
import akka.NotUsed
|
import akka.NotUsed
|
||||||
|
import akka.http.scaladsl.common.{ FramingWithContentType, JsonSourceRenderingModes }
|
||||||
import akka.http.scaladsl.marshalling.ToResponseMarshallable
|
import akka.http.scaladsl.marshalling.ToResponseMarshallable
|
||||||
import akka.http.scaladsl.model._
|
import akka.http.scaladsl.model._
|
||||||
import akka.http.scaladsl.model.headers.Accept
|
import akka.http.scaladsl.model.headers.Accept
|
||||||
import akka.http.scaladsl.server.{ UnsupportedRequestContentTypeRejection, UnacceptedResponseContentTypeRejection, JsonSourceRenderingMode }
|
import akka.http.scaladsl.server.{ UnacceptedResponseContentTypeRejection, UnsupportedRequestContentTypeRejection }
|
||||||
import akka.stream.scaladsl.{ Flow, Source }
|
import akka.stream.scaladsl.{ Flow, Source }
|
||||||
import docs.http.scaladsl.server.RoutingSpec
|
import docs.http.scaladsl.server.RoutingSpec
|
||||||
import spray.json.{ JsValue, JsObject, DefaultJsonProtocol }
|
|
||||||
|
|
||||||
import scala.concurrent.Future
|
import scala.concurrent.Future
|
||||||
|
|
||||||
|
|
@ -30,7 +30,7 @@ class JsonStreamingExamplesSpec extends RoutingSpec {
|
||||||
|
|
||||||
//#formats
|
//#formats
|
||||||
object MyJsonProtocol extends spray.json.DefaultJsonProtocol {
|
object MyJsonProtocol extends spray.json.DefaultJsonProtocol {
|
||||||
implicit val userFormat = jsonFormat2(Tweet.apply)
|
implicit val tweetFormat = jsonFormat2(Tweet.apply)
|
||||||
implicit val measurementFormat = jsonFormat2(Measurement.apply)
|
implicit val measurementFormat = jsonFormat2(Measurement.apply)
|
||||||
}
|
}
|
||||||
//#
|
//#
|
||||||
|
|
@ -43,19 +43,22 @@ class JsonStreamingExamplesSpec extends RoutingSpec {
|
||||||
import MyJsonProtocol._
|
import MyJsonProtocol._
|
||||||
|
|
||||||
// [3] pick json rendering mode:
|
// [3] pick json rendering mode:
|
||||||
implicit val jsonRenderingMode = JsonSourceRenderingMode.LineByLine
|
// HINT: if you extend `akka.http.scaladsl.server.EntityStreamingSupport`
|
||||||
|
// it'll guide you to do so via abstract defs
|
||||||
|
val maximumObjectLength = 128
|
||||||
|
implicit val jsonRenderingMode = JsonSourceRenderingModes.LineByLine
|
||||||
|
|
||||||
val route =
|
val route =
|
||||||
path("users") {
|
path("tweets") {
|
||||||
val users: Source[Tweet, NotUsed] = getTweets()
|
val tweets: Source[Tweet, NotUsed] = getTweets()
|
||||||
complete(ToResponseMarshallable(users))
|
complete(ToResponseMarshallable(tweets))
|
||||||
}
|
}
|
||||||
|
|
||||||
// tests:
|
// tests:
|
||||||
val AcceptJson = Accept(MediaRange(MediaTypes.`application/json`))
|
val AcceptJson = Accept(MediaRange(MediaTypes.`application/json`))
|
||||||
val AcceptXml = Accept(MediaRange(MediaTypes.`text/xml`))
|
val AcceptXml = Accept(MediaRange(MediaTypes.`text/xml`))
|
||||||
|
|
||||||
Get("/users").withHeaders(AcceptJson) ~> route ~> check {
|
Get("/tweets").withHeaders(AcceptJson) ~> route ~> check {
|
||||||
responseAs[String] shouldEqual
|
responseAs[String] shouldEqual
|
||||||
"""{"uid":1,"txt":"#Akka rocks!"}""" + "\n" +
|
"""{"uid":1,"txt":"#Akka rocks!"}""" + "\n" +
|
||||||
"""{"uid":2,"txt":"Streaming is so hot right now!"}""" + "\n" +
|
"""{"uid":2,"txt":"Streaming is so hot right now!"}""" + "\n" +
|
||||||
|
|
@ -63,7 +66,7 @@ class JsonStreamingExamplesSpec extends RoutingSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
// endpoint can only marshal Json, so it will *reject* requests for application/xml:
|
// endpoint can only marshal Json, so it will *reject* requests for application/xml:
|
||||||
Get("/users").withHeaders(AcceptXml) ~> route ~> check {
|
Get("/tweets").withHeaders(AcceptXml) ~> route ~> check {
|
||||||
handled should ===(false)
|
handled should ===(false)
|
||||||
rejection should ===(UnacceptedResponseContentTypeRejection(Set(ContentTypes.`application/json`)))
|
rejection should ===(UnacceptedResponseContentTypeRejection(Set(ContentTypes.`application/json`)))
|
||||||
}
|
}
|
||||||
|
|
@ -72,19 +75,19 @@ class JsonStreamingExamplesSpec extends RoutingSpec {
|
||||||
"response-streaming-modes" in {
|
"response-streaming-modes" in {
|
||||||
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
|
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
|
||||||
import MyJsonProtocol._
|
import MyJsonProtocol._
|
||||||
implicit val jsonRenderingMode = JsonSourceRenderingMode.LineByLine
|
implicit val jsonRenderingMode = JsonSourceRenderingModes.LineByLine
|
||||||
|
|
||||||
//#async-rendering
|
//#async-rendering
|
||||||
path("users") {
|
path("tweets") {
|
||||||
val users: Source[Tweet, NotUsed] = getTweets()
|
val tweets: Source[Tweet, NotUsed] = getTweets()
|
||||||
complete(users.renderAsync(parallelism = 8))
|
complete(tweets.renderAsync(parallelism = 8))
|
||||||
}
|
}
|
||||||
//#
|
//#
|
||||||
|
|
||||||
//#async-unordered-rendering
|
//#async-unordered-rendering
|
||||||
path("users" / "unordered") {
|
path("tweets" / "unordered") {
|
||||||
val users: Source[Tweet, NotUsed] = getTweets()
|
val tweets: Source[Tweet, NotUsed] = getTweets()
|
||||||
complete(users.renderAsyncUnordered(parallelism = 8))
|
complete(tweets.renderAsyncUnordered(parallelism = 8))
|
||||||
}
|
}
|
||||||
//#
|
//#
|
||||||
}
|
}
|
||||||
|
|
@ -94,7 +97,9 @@ class JsonStreamingExamplesSpec extends RoutingSpec {
|
||||||
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
|
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
|
||||||
|
|
||||||
// [1.1] import framing mode
|
// [1.1] import framing mode
|
||||||
implicit val jsonFramingMode = akka.http.scaladsl.server.JsonEntityFramingSupport.bracketCountingJsonFraming(Int.MaxValue)
|
import akka.http.scaladsl.server.EntityStreamingSupport
|
||||||
|
implicit val jsonFramingMode: FramingWithContentType =
|
||||||
|
EntityStreamingSupport.bracketCountingJsonFraming(Int.MaxValue)
|
||||||
|
|
||||||
// [2] import "my protocol", for unmarshalling Measurement objects:
|
// [2] import "my protocol", for unmarshalling Measurement objects:
|
||||||
import MyJsonProtocol._
|
import MyJsonProtocol._
|
||||||
|
|
@ -106,14 +111,10 @@ class JsonStreamingExamplesSpec extends RoutingSpec {
|
||||||
path("metrics") {
|
path("metrics") {
|
||||||
// [4] extract Source[Measurement, _]
|
// [4] extract Source[Measurement, _]
|
||||||
entity(asSourceOf[Measurement]) { measurements =>
|
entity(asSourceOf[Measurement]) { measurements =>
|
||||||
println("measurements = " + measurements)
|
|
||||||
val measurementsSubmitted: Future[Int] =
|
val measurementsSubmitted: Future[Int] =
|
||||||
measurements
|
measurements
|
||||||
.via(persistMetrics)
|
.via(persistMetrics)
|
||||||
.runFold(0) { (cnt, _) =>
|
.runFold(0) { (cnt, _) => cnt + 1 }
|
||||||
println("cnt = " + cnt)
|
|
||||||
cnt + 1
|
|
||||||
}
|
|
||||||
|
|
||||||
complete {
|
complete {
|
||||||
measurementsSubmitted.map(n => Map("msg" -> s"""Total metrics received: $n"""))
|
measurementsSubmitted.map(n => Map("msg" -> s"""Total metrics received: $n"""))
|
||||||
|
|
|
||||||
|
|
@ -71,8 +71,8 @@ class MarshallingDirectivesExamplesSpec extends RoutingSpec {
|
||||||
// tests:
|
// tests:
|
||||||
Get("/") ~> route ~> check {
|
Get("/") ~> route ~> check {
|
||||||
mediaType shouldEqual `application/json`
|
mediaType shouldEqual `application/json`
|
||||||
responseAs[String] should include(""""name": "Jane"""")
|
responseAs[String] should include(""""name":"Jane"""")
|
||||||
responseAs[String] should include(""""favoriteNumber": 42""")
|
responseAs[String] should include(""""favoriteNumber":42""")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -95,8 +95,8 @@ class MarshallingDirectivesExamplesSpec extends RoutingSpec {
|
||||||
Post("/", HttpEntity(`application/json`, """{ "name": "Jane", "favoriteNumber" : 42 }""")) ~>
|
Post("/", HttpEntity(`application/json`, """{ "name": "Jane", "favoriteNumber" : 42 }""")) ~>
|
||||||
route ~> check {
|
route ~> check {
|
||||||
mediaType shouldEqual `application/json`
|
mediaType shouldEqual `application/json`
|
||||||
responseAs[String] should include(""""name": "Jane"""")
|
responseAs[String] should include(""""name":"Jane"""")
|
||||||
responseAs[String] should include(""""favoriteNumber": 42""")
|
responseAs[String] should include(""""favoriteNumber":42""")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,7 @@ static content serving.
|
||||||
exception-handling
|
exception-handling
|
||||||
path-matchers
|
path-matchers
|
||||||
case-class-extraction
|
case-class-extraction
|
||||||
|
source-streaming-support
|
||||||
testkit
|
testkit
|
||||||
websocket-support
|
websocket-support
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,11 +1,27 @@
|
||||||
.. _json-streaming-scala:
|
.. _json-streaming-scala:
|
||||||
|
|
||||||
|
Source Streaming
|
||||||
|
================
|
||||||
|
|
||||||
|
Akka HTTP supports completing a request with an Akka ``Source[T, _]``, which makes it possible to very easily build
|
||||||
|
streaming end-to-end APIs which apply back-pressure throughout the entire stack.
|
||||||
|
|
||||||
|
It is possible to complete requests with raw ``Source[ByteString, _]``, however often it is more convenient to
|
||||||
|
stream on an element-by-element basis, and allow Akka HTTP to handle the rendering internally - for example as a JSON array,
|
||||||
|
or CSV stream (where each element is separated by a new-line).
|
||||||
|
|
||||||
|
In the following sections we investigate how to make use of the JSON Streaming infrastructure,
|
||||||
|
however the general hints apply to any kind of element-by-element streaming you could imagine.
|
||||||
|
|
||||||
|
It is possible to implement your own framing for any content type you might need, including bianary formats
|
||||||
|
by implementing :class:`FramingWithContentType`.
|
||||||
|
|
||||||
JSON Streaming
|
JSON Streaming
|
||||||
==============
|
==============
|
||||||
|
|
||||||
`JSON Streaming`_ is a term refering to streaming a (possibly infinite) stream of element as independent JSON
|
`JSON Streaming`_ is a term refering to streaming a (possibly infinite) stream of element as independent JSON
|
||||||
objects onto one continious HTTP connection. The elements are most often separated using newlines,
|
objects as a continuous HTTP request or response. The elements are most often separated using newlines,
|
||||||
however do not have to be and concatenating elements side-by-side or emitting "very long" JSON array is also another
|
however do not have to be. Concatenating elements side-by-side or emitting "very long" JSON array is also another
|
||||||
use case.
|
use case.
|
||||||
|
|
||||||
In the below examples, we'll be refering to the ``User`` and ``Measurement`` case classes as our model, which are defined as:
|
In the below examples, we'll be refering to the ``User`` and ``Measurement`` case classes as our model, which are defined as:
|
||||||
|
|
@ -30,7 +46,6 @@ Firstly, we'll need to get some additional marshalling infrastructure set up, th
|
||||||
Akka Streams ``Source[T,_]``. One such trait, containing the needed marshallers is ``SprayJsonSupport``, which uses
|
Akka Streams ``Source[T,_]``. One such trait, containing the needed marshallers is ``SprayJsonSupport``, which uses
|
||||||
spray-json (a high performance json parser library), and is shipped as part of Akka HTTP in the
|
spray-json (a high performance json parser library), and is shipped as part of Akka HTTP in the
|
||||||
``akka-http-spray-json-experimental`` module.
|
``akka-http-spray-json-experimental`` module.
|
||||||
to and from ``Source[T,_]`` by using spray-json provided
|
|
||||||
|
|
||||||
Next we import our model's marshallers, generated by spray-json.
|
Next we import our model's marshallers, generated by spray-json.
|
||||||
|
|
||||||
|
|
@ -68,7 +83,7 @@ in case one element in front of the stream takes a long time to marshall, yet ot
|
||||||
Consuming JSON Streaming uploads
|
Consuming JSON Streaming uploads
|
||||||
--------------------------------
|
--------------------------------
|
||||||
|
|
||||||
Sometimes the client may be sending in a streaming request, for example an embedded device initiated a connection with
|
Sometimes the client may be sending a streaming request, for example an embedded device initiated a connection with
|
||||||
the server and is feeding it with one line of measurement data.
|
the server and is feeding it with one line of measurement data.
|
||||||
|
|
||||||
In this example, we want to consume this data in a streaming fashion from the request entity, and also apply
|
In this example, we want to consume this data in a streaming fashion from the request entity, and also apply
|
||||||
|
|
@ -11,6 +11,7 @@ import akka.http.javadsl.model.RequestEntity;
|
||||||
import akka.http.javadsl.marshalling.Marshaller;
|
import akka.http.javadsl.marshalling.Marshaller;
|
||||||
import akka.http.javadsl.unmarshalling.Unmarshaller;
|
import akka.http.javadsl.unmarshalling.Unmarshaller;
|
||||||
|
|
||||||
|
import akka.util.ByteString;
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.MapperFeature;
|
import com.fasterxml.jackson.databind.MapperFeature;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
@ -31,6 +32,10 @@ public class Jackson {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static <T> Unmarshaller<ByteString, T> byteStringUnmarshaller(Class<T> expectedType) {
|
||||||
|
return byteStringUnmarshaller(defaultObjectMapper, expectedType);
|
||||||
|
}
|
||||||
|
|
||||||
public static <T> Unmarshaller<HttpEntity, T> unmarshaller(Class<T> expectedType) {
|
public static <T> Unmarshaller<HttpEntity, T> unmarshaller(Class<T> expectedType) {
|
||||||
return unmarshaller(defaultObjectMapper, expectedType);
|
return unmarshaller(defaultObjectMapper, expectedType);
|
||||||
}
|
}
|
||||||
|
|
@ -39,6 +44,10 @@ public class Jackson {
|
||||||
return Unmarshaller.forMediaType(MediaTypes.APPLICATION_JSON, Unmarshaller.entityToString())
|
return Unmarshaller.forMediaType(MediaTypes.APPLICATION_JSON, Unmarshaller.entityToString())
|
||||||
.thenApply(s -> fromJSON(mapper, s, expectedType));
|
.thenApply(s -> fromJSON(mapper, s, expectedType));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static <T> Unmarshaller<ByteString, T> byteStringUnmarshaller(ObjectMapper mapper, Class<T> expectedType) {
|
||||||
|
return Unmarshaller.sync(s -> fromJSON(mapper, s.utf8String(), expectedType));
|
||||||
|
}
|
||||||
|
|
||||||
private static String toJSON(ObjectMapper mapper, Object object) {
|
private static String toJSON(ObjectMapper mapper, Object object) {
|
||||||
try {
|
try {
|
||||||
|
|
|
||||||
|
|
@ -34,11 +34,11 @@ trait SprayJsonSupport {
|
||||||
JsonParser(input)
|
JsonParser(input)
|
||||||
}
|
}
|
||||||
|
|
||||||
implicit def sprayJsonMarshallerConverter[T](writer: RootJsonWriter[T])(implicit printer: JsonPrinter = PrettyPrinter): ToEntityMarshaller[T] =
|
implicit def sprayJsonMarshallerConverter[T](writer: RootJsonWriter[T])(implicit printer: JsonPrinter = CompactPrinter): ToEntityMarshaller[T] =
|
||||||
sprayJsonMarshaller[T](writer, printer)
|
sprayJsonMarshaller[T](writer, printer)
|
||||||
implicit def sprayJsonMarshaller[T](implicit writer: RootJsonWriter[T], printer: JsonPrinter = CompactPrinter): ToEntityMarshaller[T] =
|
implicit def sprayJsonMarshaller[T](implicit writer: RootJsonWriter[T], printer: JsonPrinter = CompactPrinter): ToEntityMarshaller[T] =
|
||||||
sprayJsValueMarshaller compose writer.write
|
sprayJsValueMarshaller compose writer.write
|
||||||
implicit def sprayJsValueMarshaller(implicit printer: JsonPrinter = PrettyPrinter): ToEntityMarshaller[JsValue] =
|
implicit def sprayJsValueMarshaller(implicit printer: JsonPrinter = CompactPrinter): ToEntityMarshaller[JsValue] =
|
||||||
Marshaller.StringMarshaller.wrap(MediaTypes.`application/json`)(printer)
|
Marshaller.StringMarshaller.wrap(MediaTypes.`application/json`)(printer)
|
||||||
implicit def sprayByteStringMarshaller[T](implicit writer: RootJsonFormat[T], printer: JsonPrinter = CompactPrinter): Marshaller[T, ByteString] =
|
implicit def sprayByteStringMarshaller[T](implicit writer: RootJsonFormat[T], printer: JsonPrinter = CompactPrinter): Marshaller[T, ByteString] =
|
||||||
sprayJsValueMarshaller.map(s ⇒ ByteString(s.toString)) compose writer.write
|
sprayJsValueMarshaller.map(s ⇒ ByteString(s.toString)) compose writer.write
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ import akka.http.javadsl.common.JsonSourceRenderingModes;
|
||||||
import akka.stream.ActorMaterializer;
|
import akka.stream.ActorMaterializer;
|
||||||
import akka.stream.javadsl.Flow;
|
import akka.stream.javadsl.Flow;
|
||||||
import akka.stream.javadsl.Source;
|
import akka.stream.javadsl.Source;
|
||||||
|
import akka.util.ByteString;
|
||||||
import scala.concurrent.duration.Duration;
|
import scala.concurrent.duration.Duration;
|
||||||
import scala.runtime.BoxedUnit;
|
import scala.runtime.BoxedUnit;
|
||||||
|
|
||||||
|
|
@ -64,7 +65,7 @@ public class JavaTestServer extends AllDirectives { // or import static Directiv
|
||||||
path("java", () -> completeOKWithFutureString(CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Boom!"); }))))
|
path("java", () -> completeOKWithFutureString(CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Boom!"); }))))
|
||||||
);
|
);
|
||||||
|
|
||||||
final Unmarshaller<HttpEntity, JavaTweet> JavaTweets = Jackson.unmarshaller(JavaTweet.class);
|
final Unmarshaller<ByteString, JavaTweet> JavaTweets = Jackson.byteStringUnmarshaller(JavaTweet.class);
|
||||||
final Route tweets = path("tweets", () ->
|
final Route tweets = path("tweets", () ->
|
||||||
get(() ->
|
get(() ->
|
||||||
parameter(StringUnmarshallers.INTEGER, "n", n -> {
|
parameter(StringUnmarshallers.INTEGER, "n", n -> {
|
||||||
|
|
|
||||||
|
|
@ -184,7 +184,7 @@ class MarshallingDirectivesSpec extends RoutingSpec with Inside {
|
||||||
|
|
||||||
"render JSON with UTF-8 encoding if no `Accept-Charset` request header is present" in {
|
"render JSON with UTF-8 encoding if no `Accept-Charset` request header is present" in {
|
||||||
Get() ~> complete(foo) ~> check {
|
Get() ~> complete(foo) ~> check {
|
||||||
responseEntity shouldEqual HttpEntity(`application/json`, foo.toJson.prettyPrint)
|
responseEntity shouldEqual HttpEntity(`application/json`, foo.toJson.compactPrint)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"reject JSON rendering if an `Accept-Charset` request header requests a non-UTF-8 encoding" in {
|
"reject JSON rendering if an `Accept-Charset` request header requests a non-UTF-8 encoding" in {
|
||||||
|
|
|
||||||
|
|
@ -98,10 +98,7 @@ class RouteDirectivesSpec extends FreeSpec with GenericRoutingSpec {
|
||||||
import akka.http.scaladsl.model.headers.Accept
|
import akka.http.scaladsl.model.headers.Accept
|
||||||
Get().withHeaders(Accept(MediaTypes.`application/json`)) ~> route ~> check {
|
Get().withHeaders(Accept(MediaTypes.`application/json`)) ~> route ~> check {
|
||||||
responseAs[String] shouldEqual
|
responseAs[String] shouldEqual
|
||||||
"""{
|
"""{"name":"Ida","age":83}"""
|
||||||
| "name": "Ida",
|
|
||||||
| "age": 83
|
|
||||||
|}""".stripMarginWithNewline("\n")
|
|
||||||
}
|
}
|
||||||
Get().withHeaders(Accept(MediaTypes.`text/xml`)) ~> route ~> check {
|
Get().withHeaders(Accept(MediaTypes.`text/xml`)) ~> route ~> check {
|
||||||
responseAs[xml.NodeSeq] shouldEqual <data><name>Ida</name><age>83</age></data>
|
responseAs[xml.NodeSeq] shouldEqual <data><name>Ida</name><age>83</age></data>
|
||||||
|
|
|
||||||
|
|
@ -4,13 +4,23 @@
|
||||||
|
|
||||||
package akka.http.javadsl.common
|
package akka.http.javadsl.common
|
||||||
|
|
||||||
|
import akka.NotUsed
|
||||||
|
import akka.event.Logging
|
||||||
import akka.http.javadsl.model.ContentTypeRange
|
import akka.http.javadsl.model.ContentTypeRange
|
||||||
import akka.stream.javadsl.Framing
|
import akka.stream.javadsl.{ Flow, Framing }
|
||||||
|
import akka.util.ByteString
|
||||||
|
|
||||||
trait FramingWithContentType extends Framing { self ⇒
|
/**
|
||||||
|
* Wraps a framing [[akka.stream.javadsl.Flow]] (as provided by [[Framing]] for example)
|
||||||
|
* that chunks up incoming [[akka.util.ByteString]] according to some [[akka.http.javadsl.model.ContentType]]
|
||||||
|
* specific logic.
|
||||||
|
*/
|
||||||
|
abstract class FramingWithContentType { self ⇒
|
||||||
import akka.http.impl.util.JavaMapping.Implicits._
|
import akka.http.impl.util.JavaMapping.Implicits._
|
||||||
|
|
||||||
override def asScala: akka.http.scaladsl.common.FramingWithContentType =
|
def getFlow: Flow[ByteString, ByteString, NotUsed]
|
||||||
|
|
||||||
|
def asScala: akka.http.scaladsl.common.FramingWithContentType =
|
||||||
this match {
|
this match {
|
||||||
case f: akka.http.scaladsl.common.FramingWithContentType ⇒ f
|
case f: akka.http.scaladsl.common.FramingWithContentType ⇒ f
|
||||||
case _ ⇒ new akka.http.scaladsl.common.FramingWithContentType {
|
case _ ⇒ new akka.http.scaladsl.common.FramingWithContentType {
|
||||||
|
|
@ -21,4 +31,6 @@ trait FramingWithContentType extends Framing { self ⇒
|
||||||
|
|
||||||
def supported: ContentTypeRange
|
def supported: ContentTypeRange
|
||||||
def matches(ct: akka.http.javadsl.model.ContentType): Boolean = supported.matches(ct)
|
def matches(ct: akka.http.javadsl.model.ContentType): Boolean = supported.matches(ct)
|
||||||
|
|
||||||
|
override def toString = s"${Logging.simpleName(getClass)}($supported)"
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,8 @@ import akka.http.javadsl.model.{ ContentType, ContentTypes }
|
||||||
* Specialised rendering mode for streaming elements as JSON.
|
* Specialised rendering mode for streaming elements as JSON.
|
||||||
*
|
*
|
||||||
* See also: <a href="https://en.wikipedia.org/wiki/JSON_Streaming">JSON Streaming on Wikipedia</a>.
|
* See also: <a href="https://en.wikipedia.org/wiki/JSON_Streaming">JSON Streaming on Wikipedia</a>.
|
||||||
|
*
|
||||||
|
* See [[JsonSourceRenderingModes]] for commonly used pre-defined rendering modes.
|
||||||
*/
|
*/
|
||||||
trait JsonSourceRenderingMode extends SourceRenderingMode {
|
trait JsonSourceRenderingMode extends SourceRenderingMode {
|
||||||
override val contentType: ContentType.WithFixedCharset =
|
override val contentType: ContentType.WithFixedCharset =
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,41 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package akka.http.javadsl.server
|
||||||
|
|
||||||
|
import akka.NotUsed
|
||||||
|
import akka.http.javadsl.common.{ FramingWithContentType, SourceRenderingMode }
|
||||||
|
import akka.http.javadsl.model.{ ContentTypeRange, MediaRanges }
|
||||||
|
import akka.http.scaladsl.server.ApplicationJsonBracketCountingFraming
|
||||||
|
import akka.stream.javadsl.{ Flow, Framing }
|
||||||
|
import akka.util.ByteString
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Entity streaming support, independent of used Json parsing library etc.
|
||||||
|
*
|
||||||
|
* Can be extended by various Support traits (e.g. "SprayJsonSupport"),
|
||||||
|
* in order to provide users with both `framing` (this trait) and `marshalling`
|
||||||
|
* (implemented by a library) by using a single trait.
|
||||||
|
*/
|
||||||
|
object EntityStreamingSupport {
|
||||||
|
// in the ScalaDSL version we make users implement abstract methods that are supposed to be
|
||||||
|
// implicit vals. This helps to guide in implementing the needed values, however in Java that would not really help.
|
||||||
|
|
||||||
|
/** `application/json` specific Framing implementation */
|
||||||
|
def bracketCountingJsonFraming(maximumObjectLength: Int): FramingWithContentType =
|
||||||
|
new ApplicationJsonBracketCountingFraming(maximumObjectLength)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Frames incoming `text / *` entities on a line-by-line basis.
|
||||||
|
* Useful for accepting `text/csv` uploads as a stream of rows.
|
||||||
|
*/
|
||||||
|
def newLineFraming(maximumObjectLength: Int, supportedContentTypes: ContentTypeRange): FramingWithContentType =
|
||||||
|
new FramingWithContentType {
|
||||||
|
override final val getFlow: Flow[ByteString, ByteString, NotUsed] =
|
||||||
|
Flow.of(classOf[ByteString]).via(Framing.delimiter(ByteString("\n"), maximumObjectLength))
|
||||||
|
|
||||||
|
override final val supported: ContentTypeRange =
|
||||||
|
akka.http.scaladsl.model.ContentTypeRange(akka.http.scaladsl.model.MediaRanges.`text/*`)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -3,35 +3,25 @@
|
||||||
*/
|
*/
|
||||||
package akka.http.javadsl.server.directives
|
package akka.http.javadsl.server.directives
|
||||||
|
|
||||||
import akka.http.javadsl.model.{ ContentType, HttpEntity }
|
|
||||||
import akka.util.ByteString
|
|
||||||
import java.util.{ List ⇒ JList, Map ⇒ JMap }
|
|
||||||
import java.util.AbstractMap.SimpleImmutableEntry
|
|
||||||
import java.util.Optional
|
|
||||||
import java.util.function.{ Function ⇒ JFunction }
|
import java.util.function.{ Function ⇒ JFunction }
|
||||||
|
import java.util.{ List ⇒ JList, Map ⇒ JMap }
|
||||||
|
|
||||||
import akka.NotUsed
|
import akka.NotUsed
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
|
||||||
import akka.http.impl.util.JavaMapping.Implicits._
|
|
||||||
import akka.http.javadsl.common.{ FramingWithContentType, SourceRenderingMode }
|
import akka.http.javadsl.common.{ FramingWithContentType, SourceRenderingMode }
|
||||||
|
import akka.http.javadsl.model.{ HttpEntity, _ }
|
||||||
import akka.http.javadsl.server.{ Marshaller, Route, Unmarshaller }
|
import akka.http.javadsl.server.{ Marshaller, Route, Unmarshaller }
|
||||||
import akka.http.javadsl.model._
|
import akka.http.scaladsl.marshalling.ToResponseMarshallable
|
||||||
import akka.http.scaladsl.marshalling.{ ToResponseMarshallable, ToResponseMarshaller }
|
|
||||||
import akka.http.scaladsl.server.{ Directives ⇒ D }
|
import akka.http.scaladsl.server.{ Directives ⇒ D }
|
||||||
import akka.http.scaladsl.unmarshalling
|
|
||||||
import akka.stream.javadsl.Source
|
import akka.stream.javadsl.Source
|
||||||
|
import akka.util.ByteString
|
||||||
|
|
||||||
/** EXPERIMENTAL API */
|
/** EXPERIMENTAL API */
|
||||||
abstract class FramedEntityStreamingDirectives extends TimeoutDirectives {
|
abstract class FramedEntityStreamingDirectives extends TimeoutDirectives {
|
||||||
// important import, as we implicitly resolve some marshallers inside the below directives
|
|
||||||
import akka.http.scaladsl.server.directives.FramedEntityStreamingDirectives._
|
|
||||||
|
|
||||||
@CorrespondsTo("asSourceOf")
|
@CorrespondsTo("asSourceOf")
|
||||||
def entityasSourceOf[T](um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType,
|
def entityasSourceOf[T](um: Unmarshaller[ByteString, T], framing: FramingWithContentType,
|
||||||
inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter {
|
inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter {
|
||||||
val sum = um.asScalaCastInput[akka.http.scaladsl.model.HttpEntity]
|
D.entity(D.asSourceOf[T](framing.asScala)(um.asScala)) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒
|
||||||
D.entity(D.asSourceOf[T](framing.asScala)(sum)) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒
|
|
||||||
inner(s.asJava).delegate
|
inner(s.asJava).delegate
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -39,10 +29,9 @@ abstract class FramedEntityStreamingDirectives extends TimeoutDirectives {
|
||||||
@CorrespondsTo("asSourceOfAsync")
|
@CorrespondsTo("asSourceOfAsync")
|
||||||
def entityAsSourceAsyncOf[T](
|
def entityAsSourceAsyncOf[T](
|
||||||
parallelism: Int,
|
parallelism: Int,
|
||||||
um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType,
|
um: Unmarshaller[ByteString, T], framing: FramingWithContentType,
|
||||||
inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter {
|
inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter {
|
||||||
val sum = um.asScalaCastInput[akka.http.scaladsl.model.HttpEntity]
|
D.entity(D.asSourceOfAsync[T](parallelism, framing.asScala)(um.asScala)) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒
|
||||||
D.entity(D.asSourceOfAsync[T](parallelism, framing.asScala)(sum)) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒
|
|
||||||
inner(s.asJava).delegate
|
inner(s.asJava).delegate
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -50,18 +39,16 @@ abstract class FramedEntityStreamingDirectives extends TimeoutDirectives {
|
||||||
@CorrespondsTo("asSourceOfAsyncUnordered")
|
@CorrespondsTo("asSourceOfAsyncUnordered")
|
||||||
def entityAsSourceAsyncUnorderedOf[T](
|
def entityAsSourceAsyncUnorderedOf[T](
|
||||||
parallelism: Int,
|
parallelism: Int,
|
||||||
um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType,
|
um: Unmarshaller[ByteString, T], framing: FramingWithContentType,
|
||||||
inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter {
|
inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter {
|
||||||
val sum = um.asScalaCastInput[akka.http.scaladsl.model.HttpEntity]
|
D.entity(D.asSourceOfAsyncUnordered[T](parallelism, framing.asScala)(um.asScala)) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒
|
||||||
D.entity(D.asSourceOfAsyncUnordered[T](parallelism, framing.asScala)(sum)) { s: akka.stream.scaladsl.Source[T, NotUsed] ⇒
|
|
||||||
inner(s.asJava).delegate
|
inner(s.asJava).delegate
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// implicit used internally, Java caller does not benefit or use it
|
// implicits used internally, Java caller does not benefit or use it
|
||||||
@CorrespondsTo("complete")
|
@CorrespondsTo("complete")
|
||||||
def completeWithSource[T, M](implicit source: Source[T, M], m: Marshaller[T, ByteString], rendering: SourceRenderingMode): Route = RouteAdapter {
|
def completeWithSource[T, M](implicit source: Source[T, M], m: Marshaller[T, ByteString], rendering: SourceRenderingMode): Route = RouteAdapter {
|
||||||
import akka.http.scaladsl.marshalling.PredefinedToResponseMarshallers._
|
|
||||||
implicit val mm = _sourceMarshaller(m.map(ByteStringAsEntityFn), rendering)
|
implicit val mm = _sourceMarshaller(m.map(ByteStringAsEntityFn), rendering)
|
||||||
val response = ToResponseMarshallable(source)
|
val response = ToResponseMarshallable(source)
|
||||||
D.complete(response)
|
D.complete(response)
|
||||||
|
|
@ -75,8 +62,8 @@ abstract class FramedEntityStreamingDirectives extends TimeoutDirectives {
|
||||||
}
|
}
|
||||||
|
|
||||||
implicit private def _sourceMarshaller[T, M](implicit m: Marshaller[T, HttpEntity], rendering: SourceRenderingMode) = {
|
implicit private def _sourceMarshaller[T, M](implicit m: Marshaller[T, HttpEntity], rendering: SourceRenderingMode) = {
|
||||||
import akka.http.javadsl.server.RoutingJavaMapping._
|
|
||||||
import akka.http.javadsl.server.RoutingJavaMapping.Implicits._
|
import akka.http.javadsl.server.RoutingJavaMapping.Implicits._
|
||||||
|
import akka.http.javadsl.server.RoutingJavaMapping._
|
||||||
val mm = m.asScalaCastOutput
|
val mm = m.asScalaCastOutput
|
||||||
D._sourceMarshaller[T, M](mm, rendering.asScala).compose({ h: akka.stream.javadsl.Source[T, M] ⇒ h.asScala })
|
D._sourceMarshaller[T, M](mm, rendering.asScala).compose({ h: akka.stream.javadsl.Source[T, M] ⇒ h.asScala })
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,16 +6,18 @@ package akka.http.scaladsl.common
|
||||||
|
|
||||||
import akka.NotUsed
|
import akka.NotUsed
|
||||||
import akka.event.Logging
|
import akka.event.Logging
|
||||||
import akka.http.scaladsl.model.{ ContentType, ContentTypeRange }
|
import akka.http.scaladsl.model.ContentTypeRange
|
||||||
import akka.stream.scaladsl.{ Flow, Framing }
|
import akka.stream.scaladsl.{ Flow, Framing }
|
||||||
import akka.util.ByteString
|
import akka.util.ByteString
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Same as [[akka.stream.scaladsl.Framing]] but additionally can express which [[ContentType]] it supports,
|
* Wraps a framing [[akka.stream.scaladsl.Flow]] (as provided by [[Framing]] for example)
|
||||||
* which can be used to reject routes if content type does not match used framing.
|
* that chunks up incoming [[akka.util.ByteString]] according to some [[akka.http.javadsl.model.ContentType]]
|
||||||
|
* specific logic.
|
||||||
*/
|
*/
|
||||||
abstract class FramingWithContentType extends akka.http.javadsl.common.FramingWithContentType with Framing {
|
abstract class FramingWithContentType extends akka.http.javadsl.common.FramingWithContentType {
|
||||||
def flow: Flow[ByteString, ByteString, NotUsed]
|
def flow: Flow[ByteString, ByteString, NotUsed]
|
||||||
|
override final def getFlow = flow.asJava
|
||||||
override def supported: ContentTypeRange
|
override def supported: ContentTypeRange
|
||||||
override def matches(ct: akka.http.javadsl.model.ContentType): Boolean = supported.matches(ct)
|
override def matches(ct: akka.http.javadsl.model.ContentType): Boolean = supported.matches(ct)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,8 @@ import akka.util.ByteString
|
||||||
* Specialised rendering mode for streaming elements as JSON.
|
* Specialised rendering mode for streaming elements as JSON.
|
||||||
*
|
*
|
||||||
* See also: <a href="https://en.wikipedia.org/wiki/JSON_Streaming">JSON Streaming on Wikipedia</a>.
|
* See also: <a href="https://en.wikipedia.org/wiki/JSON_Streaming">JSON Streaming on Wikipedia</a>.
|
||||||
|
*
|
||||||
|
* See [[JsonSourceRenderingModes]] for commonly used pre-defined rendering modes.
|
||||||
*/
|
*/
|
||||||
trait JsonSourceRenderingMode extends akka.http.javadsl.common.JsonSourceRenderingMode with SourceRenderingMode {
|
trait JsonSourceRenderingMode extends akka.http.javadsl.common.JsonSourceRenderingMode with SourceRenderingMode {
|
||||||
override val contentType: ContentType.WithFixedCharset =
|
override val contentType: ContentType.WithFixedCharset =
|
||||||
|
|
|
||||||
|
|
@ -4,13 +4,10 @@
|
||||||
package akka.http.scaladsl.server
|
package akka.http.scaladsl.server
|
||||||
|
|
||||||
import akka.NotUsed
|
import akka.NotUsed
|
||||||
import akka.actor.ActorSystem
|
|
||||||
import akka.event.Logging
|
|
||||||
import akka.http.scaladsl.common.{ FramingWithContentType, SourceRenderingMode }
|
import akka.http.scaladsl.common.{ FramingWithContentType, SourceRenderingMode }
|
||||||
import akka.http.scaladsl.model.{ ContentTypeRange, ContentTypes, MediaRange, MediaRanges }
|
import akka.http.scaladsl.model.{ ContentTypeRange, ContentTypes, MediaRanges }
|
||||||
import akka.stream.scaladsl.{ Flow, Framing }
|
import akka.stream.scaladsl.{ Flow, Framing }
|
||||||
import akka.util.ByteString
|
import akka.util.ByteString
|
||||||
import com.typesafe.config.Config
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Entity streaming support, independent of used Json parsing library etc.
|
* Entity streaming support, independent of used Json parsing library etc.
|
||||||
|
|
@ -45,13 +42,7 @@ trait EntityStreamingSupportBase {
|
||||||
* Useful for accepting `text/csv` uploads as a stream of rows.
|
* Useful for accepting `text/csv` uploads as a stream of rows.
|
||||||
*/
|
*/
|
||||||
def newLineFraming(maximumObjectLength: Int, supportedContentTypes: ContentTypeRange): FramingWithContentType =
|
def newLineFraming(maximumObjectLength: Int, supportedContentTypes: ContentTypeRange): FramingWithContentType =
|
||||||
new FramingWithContentType {
|
new TextNewLineFraming(maximumObjectLength, supportedContentTypes)
|
||||||
override final val flow: Flow[ByteString, ByteString, NotUsed] =
|
|
||||||
Flow[ByteString].via(Framing.delimiter(ByteString("\n"), maximumObjectLength))
|
|
||||||
|
|
||||||
override final val supported: ContentTypeRange =
|
|
||||||
ContentTypeRange(MediaRanges.`text/*`)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -67,3 +58,11 @@ final class ApplicationJsonBracketCountingFraming(maximumObjectLength: Int) exte
|
||||||
override final val flow = Flow[ByteString].via(akka.stream.scaladsl.JsonFraming.bracketCounting(maximumObjectLength))
|
override final val flow = Flow[ByteString].via(akka.stream.scaladsl.JsonFraming.bracketCounting(maximumObjectLength))
|
||||||
override final val supported = ContentTypeRange(ContentTypes.`application/json`)
|
override final val supported = ContentTypeRange(ContentTypes.`application/json`)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final class TextNewLineFraming(maximumLineLength: Int, supportedContentTypes: ContentTypeRange) extends FramingWithContentType {
|
||||||
|
override final val flow: Flow[ByteString, ByteString, NotUsed] =
|
||||||
|
Flow[ByteString].via(Framing.delimiter(ByteString("\n"), maximumLineLength))
|
||||||
|
|
||||||
|
override final val supported: ContentTypeRange =
|
||||||
|
ContentTypeRange(MediaRanges.`text/*`)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -11,48 +11,150 @@ import akka.http.scaladsl.unmarshalling.{ Unmarshal, Unmarshaller, _ }
|
||||||
import akka.http.scaladsl.util.FastFuture
|
import akka.http.scaladsl.util.FastFuture
|
||||||
import akka.stream.Materializer
|
import akka.stream.Materializer
|
||||||
import akka.stream.impl.ConstantFun
|
import akka.stream.impl.ConstantFun
|
||||||
import akka.stream.scaladsl.{ Flow, Source }
|
import akka.stream.scaladsl.{ Flow, Keep, Source }
|
||||||
import akka.util.ByteString
|
import akka.util.ByteString
|
||||||
|
|
||||||
import scala.concurrent.ExecutionContext
|
import scala.concurrent.ExecutionContext
|
||||||
import scala.language.implicitConversions
|
import scala.language.implicitConversions
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Allows the [[MarshallingDirectives.entity]] directive to extract a `stream[T]` for framed messages.
|
* Allows the [[MarshallingDirectives.entity]] directive to extract a [[Source]] of elements.
|
||||||
* See `JsonEntityStreamingSupport` and classes extending it, such as `SprayJsonSupport` to get marshallers.
|
*
|
||||||
|
* See [[akka.http.scaladsl.server.EntityStreamingSupport]] for useful default [[FramingWithContentType]] instances and
|
||||||
|
* support traits such as `SprayJsonSupport` (or your other favourite JSON library) to provide the needed [[Marshaller]] s.
|
||||||
*/
|
*/
|
||||||
trait FramedEntityStreamingDirectives extends MarshallingDirectives {
|
trait FramedEntityStreamingDirectives extends MarshallingDirectives {
|
||||||
import FramedEntityStreamingDirectives._
|
import FramedEntityStreamingDirectives._
|
||||||
|
|
||||||
type RequestToSourceUnmarshaller[T] = FromRequestUnmarshaller[Source[T, NotUsed]]
|
type RequestToSourceUnmarshaller[T] = FromRequestUnmarshaller[Source[T, NotUsed]]
|
||||||
|
|
||||||
// TODO DOCS
|
/**
|
||||||
|
* Extracts entity as [[Source]] of elements of type `T`.
|
||||||
final def asSourceOf[T](implicit um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] =
|
* This is achieved by applying the implicitly provided (in the following order):
|
||||||
asSourceOfAsync(1)(um, framing)
|
*
|
||||||
final def asSourceOf[T](framing: FramingWithContentType)(implicit um: Unmarshaller[HttpEntity, T]): RequestToSourceUnmarshaller[T] =
|
* - 1st: [[FramingWithContentType]] in order to chunk-up the incoming [[ByteString]]s according to the
|
||||||
|
* `Content-Type` aware framing (for example, [[akka.http.scaladsl.server.EntityStreamingSupport.bracketCountingJsonFraming]]).
|
||||||
|
* - 2nd: [[Unmarshaller]] (from [[ByteString]] to `T`) for each of the respective "chunks" (e.g. for each JSON element contained within an array).
|
||||||
|
*
|
||||||
|
* The request will be rejected with an [[akka.http.scaladsl.server.UnsupportedRequestContentTypeRejection]] if
|
||||||
|
* its [[ContentType]] is not supported by the used `framing` or `unmarshaller`.
|
||||||
|
*
|
||||||
|
* It is recommended to use the [[akka.http.scaladsl.server.EntityStreamingSupport]] trait in conjunction with this
|
||||||
|
* directive as it helps provide the right [[FramingWithContentType]] and [[SourceRenderingMode]] for the most
|
||||||
|
* typical usage scenarios (JSON, CSV, ...).
|
||||||
|
*
|
||||||
|
* Cancelling extracted [[Source]] closes the connection abruptly (same as cancelling the `entity.dataBytes`).
|
||||||
|
*
|
||||||
|
* If looking to improve marshalling performance in face of many elements (possibly of different sizes),
|
||||||
|
* you may be interested in using [[asSourceOfAsyncUnordered]] instead.
|
||||||
|
*
|
||||||
|
* See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route.
|
||||||
|
* By default the uploaded data is limited by the `akka.http.parsing.max-content-length`.
|
||||||
|
*/
|
||||||
|
final def asSourceOf[T](implicit um: Unmarshaller[ByteString, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] =
|
||||||
asSourceOfAsync(1)(um, framing)
|
asSourceOfAsync(1)(um, framing)
|
||||||
|
|
||||||
final def asSourceOfAsync[T](parallelism: Int)(implicit um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] =
|
/**
|
||||||
asSourceOfInternal[T](framing, (ec, mat) ⇒ Flow[HttpEntity].mapAsync(parallelism)(Unmarshal(_).to[T](um, ec, mat)))
|
* Extracts entity as [[Source]] of elements of type `T`.
|
||||||
final def asSourceOfAsync[T](parallelism: Int, framing: FramingWithContentType)(implicit um: Unmarshaller[HttpEntity, T]): RequestToSourceUnmarshaller[T] =
|
* This is achieved by applying the implicitly provided (in the following order):
|
||||||
|
*
|
||||||
|
* - 1st: [[FramingWithContentType]] in order to chunk-up the incoming [[ByteString]]s according to the
|
||||||
|
* `Content-Type` aware framing (for example, [[akka.http.scaladsl.server.EntityStreamingSupport.bracketCountingJsonFraming]]).
|
||||||
|
* - 2nd: [[Unmarshaller]] (from [[ByteString]] to `T`) for each of the respective "chunks" (e.g. for each JSON element contained within an array).
|
||||||
|
*
|
||||||
|
* The request will be rejected with an [[akka.http.scaladsl.server.UnsupportedRequestContentTypeRejection]] if
|
||||||
|
* its [[ContentType]] is not supported by the used `framing` or `unmarshaller`.
|
||||||
|
*
|
||||||
|
* It is recommended to use the [[akka.http.scaladsl.server.EntityStreamingSupport]] trait in conjunction with this
|
||||||
|
* directive as it helps provide the right [[FramingWithContentType]] and [[SourceRenderingMode]] for the most
|
||||||
|
* typical usage scenarios (JSON, CSV, ...).
|
||||||
|
*
|
||||||
|
* Cancelling extracted [[Source]] closes the connection abruptly (same as cancelling the `entity.dataBytes`).
|
||||||
|
*
|
||||||
|
* If looking to improve marshalling performance in face of many elements (possibly of different sizes),
|
||||||
|
* you may be interested in using [[asSourceOfAsyncUnordered]] instead.
|
||||||
|
*
|
||||||
|
* See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route.
|
||||||
|
* By default the uploaded data is limited by the `akka.http.parsing.max-content-length`.
|
||||||
|
*/
|
||||||
|
final def asSourceOf[T](framing: FramingWithContentType)(implicit um: Unmarshaller[ByteString, T]): RequestToSourceUnmarshaller[T] =
|
||||||
|
asSourceOfAsync(1)(um, framing)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Similar to [[asSourceOf]] however will apply at most `parallelism` unmarshallers in parallel.
|
||||||
|
*
|
||||||
|
* The source elements emitted preserve the order in which they are sent in the incoming [[HttpRequest]].
|
||||||
|
* If you want to sacrivice ordering in favour of (potential) slight performance improvements in reading the input
|
||||||
|
* you may want to use [[asSourceOfAsyncUnordered]] instead, which lifts the ordering guarantee.
|
||||||
|
*
|
||||||
|
* Refer to [[asSourceOf]] for more in depth-documentation and guidelines.
|
||||||
|
*
|
||||||
|
* If looking to improve marshalling performance in face of many elements (possibly of different sizes),
|
||||||
|
* you may be interested in using [[asSourceOfAsyncUnordered]] instead.
|
||||||
|
*
|
||||||
|
* See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route.
|
||||||
|
* By default the uploaded data is limited by the `akka.http.parsing.max-content-length`.
|
||||||
|
*/
|
||||||
|
final def asSourceOfAsync[T](parallelism: Int)(implicit um: Unmarshaller[ByteString, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] =
|
||||||
|
asSourceOfInternal[T](framing, (ec, mat) ⇒ Flow[ByteString].mapAsync(parallelism)(Unmarshal(_).to[T](um, ec, mat)))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Similar to [[asSourceOf]] however will apply at most `parallelism` unmarshallers in parallel.
|
||||||
|
*
|
||||||
|
* The source elements emitted preserve the order in which they are sent in the incoming [[HttpRequest]].
|
||||||
|
* If you want to sacrivice ordering in favour of (potential) slight performance improvements in reading the input
|
||||||
|
* you may want to use [[asSourceOfAsyncUnordered]] instead, which lifts the ordering guarantee.
|
||||||
|
*
|
||||||
|
* Refer to [[asSourceOf]] for more in depth-documentation and guidelines.
|
||||||
|
*
|
||||||
|
* See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route.
|
||||||
|
* By default the uploaded data is limited by the `akka.http.parsing.max-content-length`.
|
||||||
|
*/
|
||||||
|
final def asSourceOfAsync[T](parallelism: Int, framing: FramingWithContentType)(implicit um: Unmarshaller[ByteString, T]): RequestToSourceUnmarshaller[T] =
|
||||||
asSourceOfAsync(parallelism)(um, framing)
|
asSourceOfAsync(parallelism)(um, framing)
|
||||||
|
|
||||||
final def asSourceOfAsyncUnordered[T](parallelism: Int)(implicit um: Unmarshaller[HttpEntity, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] =
|
/**
|
||||||
asSourceOfInternal[T](framing, (ec, mat) ⇒ Flow[HttpEntity].mapAsyncUnordered(parallelism)(Unmarshal(_).to[T](um, ec, mat)))
|
* Similar to [[asSourceOfAsync]], as it will apply at most `parallelism` unmarshallers in parallel.
|
||||||
final def asSourceOfAsyncUnordered[T](parallelism: Int, framing: FramingWithContentType)(implicit um: Unmarshaller[HttpEntity, T]): RequestToSourceUnmarshaller[T] =
|
*
|
||||||
|
* The source elements emitted preserve the order in which they are sent in the incoming [[HttpRequest]].
|
||||||
|
* If you want to sacrivice ordering in favour of (potential) slight performance improvements in reading the input
|
||||||
|
* you may want to use [[asSourceOfAsyncUnordered]] instead, which lifts the ordering guarantee.
|
||||||
|
*
|
||||||
|
* Refer to [[asSourceOf]] for more in depth-documentation and guidelines.
|
||||||
|
*
|
||||||
|
* See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route.
|
||||||
|
* By default the uploaded data is limited by the `akka.http.parsing.max-content-length`.
|
||||||
|
*/
|
||||||
|
final def asSourceOfAsyncUnordered[T](parallelism: Int)(implicit um: Unmarshaller[ByteString, T], framing: FramingWithContentType): RequestToSourceUnmarshaller[T] =
|
||||||
|
asSourceOfInternal[T](framing, (ec, mat) ⇒ Flow[ByteString].mapAsyncUnordered(parallelism)(Unmarshal(_).to[T](um, ec, mat)))
|
||||||
|
/**
|
||||||
|
* Similar to [[asSourceOfAsync]], as it will apply at most `parallelism` unmarshallers in parallel.
|
||||||
|
*
|
||||||
|
* The source elements emitted preserve the order in which they are sent in the incoming [[HttpRequest]].
|
||||||
|
* If you want to sacrivice ordering in favour of (potential) slight performance improvements in reading the input
|
||||||
|
* you may want to use [[asSourceOfAsyncUnordered]] instead, which lifts the ordering guarantee.
|
||||||
|
*
|
||||||
|
* Refer to [[asSourceOf]] for more in depth-documentation and guidelines.
|
||||||
|
*
|
||||||
|
* See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route.
|
||||||
|
* By default the uploaded data is limited by the `akka.http.parsing.max-content-length`.
|
||||||
|
*/
|
||||||
|
final def asSourceOfAsyncUnordered[T](parallelism: Int, framing: FramingWithContentType)(implicit um: Unmarshaller[ByteString, T]): RequestToSourceUnmarshaller[T] =
|
||||||
asSourceOfAsyncUnordered(parallelism)(um, framing)
|
asSourceOfAsyncUnordered(parallelism)(um, framing)
|
||||||
|
|
||||||
// format: OFF
|
// format: OFF
|
||||||
private def asSourceOfInternal[T](framing: FramingWithContentType, marshalling: (ExecutionContext, Materializer) => Flow[HttpEntity, ByteString, NotUsed]#ReprMat[T, NotUsed]): RequestToSourceUnmarshaller[T] =
|
private final def asSourceOfInternal[T](framing: FramingWithContentType, marshalling: (ExecutionContext, Materializer) => Flow[ByteString, ByteString, NotUsed]#ReprMat[T, NotUsed]): RequestToSourceUnmarshaller[T] =
|
||||||
Unmarshaller.withMaterializer[HttpRequest, Source[T, NotUsed]] { implicit ec ⇒ implicit mat ⇒ req ⇒
|
Unmarshaller.withMaterializer[HttpRequest, Source[T, NotUsed]] { implicit ec ⇒ implicit mat ⇒ req ⇒
|
||||||
val entity = req.entity
|
val entity = req.entity
|
||||||
if (!framing.matches(entity.contentType)) {
|
if (!framing.matches(entity.contentType)) {
|
||||||
val supportedContentTypes = framing.supported
|
val supportedContentTypes = framing.supported
|
||||||
FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(supportedContentTypes))
|
FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(supportedContentTypes))
|
||||||
} else {
|
} else {
|
||||||
// val stream = entity.dataBytes.via(framing.flow).via(marshalling(ec, mat)).mapMaterializedValue(_ => NotUsed)
|
val bytes = entity.dataBytes
|
||||||
val stream = Source.single(entity.transformDataBytes(framing.flow)).via(marshalling(ec, mat)).mapMaterializedValue(_ => NotUsed)
|
val frames = bytes.viaMat(framing.flow)(Keep.right)
|
||||||
|
val elements = frames.viaMat(marshalling(ec, mat))(Keep.right)
|
||||||
|
val stream = elements.mapMaterializedValue(_ => NotUsed)
|
||||||
|
// val stream = Source.single(entity.transformDataBytes(framing.flow)).via(marshalling(ec, mat)).mapMaterializedValue(_ => NotUsed)
|
||||||
FastFuture.successful(stream)
|
FastFuture.successful(stream)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -108,6 +210,11 @@ trait FramedEntityStreamingDirectives extends MarshallingDirectives {
|
||||||
new EnableSpecialSourceRenderingModes(source)
|
new EnableSpecialSourceRenderingModes(source)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
/**
|
||||||
|
* Allows the [[MarshallingDirectives.entity]] directive to extract a [[Source]] of elements.
|
||||||
|
*
|
||||||
|
* See [[FramedEntityStreamingDirectives]] for detailed documentation.
|
||||||
|
*/
|
||||||
object FramedEntityStreamingDirectives extends FramedEntityStreamingDirectives {
|
object FramedEntityStreamingDirectives extends FramedEntityStreamingDirectives {
|
||||||
sealed class AsyncSourceRenderingMode
|
sealed class AsyncSourceRenderingMode
|
||||||
final class AsyncRenderingOf[T](val source: Source[T, Any], val parallelism: Int) extends AsyncSourceRenderingMode
|
final class AsyncRenderingOf[T](val source: Source[T, Any], val parallelism: Int) extends AsyncSourceRenderingMode
|
||||||
|
|
@ -115,6 +222,7 @@ object FramedEntityStreamingDirectives extends FramedEntityStreamingDirectives {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Provides DSL for special rendering modes, e.g. `complete(source.renderAsync)` */
|
||||||
final class EnableSpecialSourceRenderingModes[T](val source: Source[T, Any]) extends AnyVal {
|
final class EnableSpecialSourceRenderingModes[T](val source: Source[T, Any]) extends AnyVal {
|
||||||
/**
|
/**
|
||||||
* Causes the response stream to be marshalled asynchronously (up to `parallelism` elements at once),
|
* Causes the response stream to be marshalled asynchronously (up to `parallelism` elements at once),
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,7 @@ class JsonFramingSpec extends AkkaSpec {
|
||||||
implicit val mat = ActorMaterializer()
|
implicit val mat = ActorMaterializer()
|
||||||
|
|
||||||
"collecting multiple json" should {
|
"collecting multiple json" should {
|
||||||
"xoxo parse json array" in {
|
"parse json array" in {
|
||||||
val input =
|
val input =
|
||||||
"""
|
"""
|
||||||
|[
|
|[
|
||||||
|
|
@ -38,9 +38,9 @@ class JsonFramingSpec extends AkkaSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
result.futureValue shouldBe Seq(
|
result.futureValue shouldBe Seq(
|
||||||
"""{ "name" : "john" }""".stripMargin,
|
"""{ "name" : "john" }""",
|
||||||
"""{ "name" : "jack" }""".stripMargin,
|
"""{ "name" : "jack" }""",
|
||||||
"""{ "name" : "katie" }""".stripMargin)
|
"""{ "name" : "katie" }""")
|
||||||
}
|
}
|
||||||
|
|
||||||
"emit single json element from string" in {
|
"emit single json element from string" in {
|
||||||
|
|
@ -56,7 +56,7 @@ class JsonFramingSpec extends AkkaSpec {
|
||||||
case (acc, entry) ⇒ acc ++ Seq(entry.utf8String)
|
case (acc, entry) ⇒ acc ++ Seq(entry.utf8String)
|
||||||
}
|
}
|
||||||
|
|
||||||
Await.result(result, 3.seconds) shouldBe Seq("""{ "name": "john" }""".stripMargin)
|
Await.result(result, 3.seconds) shouldBe Seq("""{ "name": "john" }""")
|
||||||
}
|
}
|
||||||
|
|
||||||
"parse line delimited" in {
|
"parse line delimited" in {
|
||||||
|
|
@ -73,9 +73,9 @@ class JsonFramingSpec extends AkkaSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
Await.result(result, 3.seconds) shouldBe Seq(
|
Await.result(result, 3.seconds) shouldBe Seq(
|
||||||
"""{ "name": "john" }""".stripMargin,
|
"""{ "name": "john" }""",
|
||||||
"""{ "name": "jack" }""".stripMargin,
|
"""{ "name": "jack" }""",
|
||||||
"""{ "name": "katie" }""".stripMargin)
|
"""{ "name": "katie" }""")
|
||||||
}
|
}
|
||||||
|
|
||||||
"parse comma delimited" in {
|
"parse comma delimited" in {
|
||||||
|
|
@ -91,7 +91,7 @@ class JsonFramingSpec extends AkkaSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
result.futureValue shouldBe Seq(
|
result.futureValue shouldBe Seq(
|
||||||
"""{ "name": "john" }""".stripMargin,
|
"""{ "name": "john" }""",
|
||||||
"""{ "name": "jack" }""",
|
"""{ "name": "jack" }""",
|
||||||
"""{ "name": "katie" }""")
|
"""{ "name": "katie" }""")
|
||||||
}
|
}
|
||||||
|
|
@ -121,7 +121,6 @@ class JsonFramingSpec extends AkkaSpec {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO fold these specs into the previous section
|
|
||||||
"collecting json buffer" when {
|
"collecting json buffer" when {
|
||||||
"nothing is supplied" should {
|
"nothing is supplied" should {
|
||||||
"return nothing" in {
|
"return nothing" in {
|
||||||
|
|
@ -378,7 +377,7 @@ class JsonFramingSpec extends AkkaSpec {
|
||||||
"returns none until valid json is encountered" in {
|
"returns none until valid json is encountered" in {
|
||||||
val buffer = new JsonBracketCounting()
|
val buffer = new JsonBracketCounting()
|
||||||
|
|
||||||
"""{ "name": "john"""".stripMargin.foreach {
|
"""{ "name": "john"""".foreach {
|
||||||
c ⇒
|
c ⇒
|
||||||
buffer.offer(ByteString(c))
|
buffer.offer(ByteString(c))
|
||||||
buffer.poll() should ===(None)
|
buffer.poll() should ===(None)
|
||||||
|
|
@ -434,7 +433,7 @@ class JsonFramingSpec extends AkkaSpec {
|
||||||
probe.ensureSubscription()
|
probe.ensureSubscription()
|
||||||
probe
|
probe
|
||||||
.request(1)
|
.request(1)
|
||||||
.expectNext(ByteString("""{ "name": "john" }""")) // FIXME we should not impact the given json in Framing
|
.expectNext(ByteString("""{ "name": "john" }"""))
|
||||||
.request(1)
|
.request(1)
|
||||||
.expectNext(ByteString("""{ "name": "jack" }"""))
|
.expectNext(ByteString("""{ "name": "jack" }"""))
|
||||||
.request(1)
|
.request(1)
|
||||||
|
|
|
||||||
|
|
@ -8,15 +8,18 @@ import akka.util.ByteString
|
||||||
|
|
||||||
import scala.annotation.switch
|
import scala.annotation.switch
|
||||||
|
|
||||||
object JsonBracketCounting {
|
/**
|
||||||
|
* INTERNAL API: Use [[akka.stream.scaladsl.JsonFraming]] instead.
|
||||||
|
*/
|
||||||
|
private[akka] object JsonBracketCounting {
|
||||||
|
|
||||||
final val SquareBraceStart = "[".getBytes.head
|
final val SquareBraceStart = '['.toByte
|
||||||
final val SquareBraceEnd = "]".getBytes.head
|
final val SquareBraceEnd = ']'.toByte
|
||||||
final val CurlyBraceStart = "{".getBytes.head
|
final val CurlyBraceStart = '{'.toByte
|
||||||
final val CurlyBraceEnd = "}".getBytes.head
|
final val CurlyBraceEnd = '}'.toByte
|
||||||
final val DoubleQuote = "\"".getBytes.head
|
final val DoubleQuote = '\''.toByte
|
||||||
final val Backslash = "\\".getBytes.head
|
final val Backslash = '\\'.toByte
|
||||||
final val Comma = ",".getBytes.head
|
final val Comma = ','.toByte
|
||||||
|
|
||||||
final val LineBreak = '\n'.toByte
|
final val LineBreak = '\n'.toByte
|
||||||
final val LineBreak2 = '\r'.toByte
|
final val LineBreak2 = '\r'.toByte
|
||||||
|
|
@ -31,13 +34,15 @@ object JsonBracketCounting {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* INTERNAL API: Use [[akka.stream.scaladsl.JsonFraming]] instead.
|
||||||
|
*
|
||||||
* **Mutable** framing implementation that given any number of [[ByteString]] chunks, can emit JSON objects contained within them.
|
* **Mutable** framing implementation that given any number of [[ByteString]] chunks, can emit JSON objects contained within them.
|
||||||
* Typically JSON objects are separated by new-lines or comas, however a top-level JSON Array can also be understood and chunked up
|
* Typically JSON objects are separated by new-lines or comas, however a top-level JSON Array can also be understood and chunked up
|
||||||
* into valid JSON objects by this framing implementation.
|
* into valid JSON objects by this framing implementation.
|
||||||
*
|
*
|
||||||
* Leading whitespace between elements will be trimmed.
|
* Leading whitespace between elements will be trimmed.
|
||||||
*/
|
*/
|
||||||
class JsonBracketCounting(maximumObjectLength: Int = Int.MaxValue) {
|
private[akka] class JsonBracketCounting(maximumObjectLength: Int = Int.MaxValue) {
|
||||||
import JsonBracketCounting._
|
import JsonBracketCounting._
|
||||||
|
|
||||||
private var buffer: ByteString = ByteString.empty
|
private var buffer: ByteString = ByteString.empty
|
||||||
|
|
|
||||||
|
|
@ -115,18 +115,3 @@ object Framing {
|
||||||
scaladsl.Framing.simpleFramingProtocol(maximumMessageLength).asJava
|
scaladsl.Framing.simpleFramingProtocol(maximumMessageLength).asJava
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrapper around a framing Flow (as provided by [[Framing.delimiter]] for example.
|
|
||||||
* Used for providing a framing implicitly for other components which may need one (such as framed entity streaming in Akka HTTP).
|
|
||||||
*/
|
|
||||||
trait Framing {
|
|
||||||
def asScala: akka.stream.scaladsl.Framing =
|
|
||||||
this match {
|
|
||||||
case f: akka.stream.scaladsl.Framing ⇒ f
|
|
||||||
case _ ⇒ new akka.stream.scaladsl.Framing {
|
|
||||||
override def flow = getFlow.asScala
|
|
||||||
}
|
|
||||||
}
|
|
||||||
def getFlow: Flow[ByteString, ByteString, NotUsed]
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -287,14 +287,3 @@ object Framing {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Wrapper around a framing Flow (as provided by [[Framing.delimiter]] for example.
|
|
||||||
* Used for providing a framing implicitly for other components which may need one (such as framed entity streaming in Akka HTTP).
|
|
||||||
*/
|
|
||||||
trait Framing extends akka.stream.javadsl.Framing {
|
|
||||||
final def asJava: akka.stream.javadsl.Framing = this
|
|
||||||
override final def getFlow = flow.asJava
|
|
||||||
|
|
||||||
def flow: Flow[ByteString, ByteString, NotUsed]
|
|
||||||
}
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue