Merge pull request #20778 from ktoso/revival-of-the-undead-json-streaming-of-doom-ktoso

+htp #18837 JSON framing and framed entity streaming directives
This commit is contained in:
Konrad Malawski 2016-08-02 15:59:58 +02:00 committed by GitHub
commit a712f0149a
43 changed files with 2323 additions and 61 deletions

View file

@ -91,7 +91,7 @@ object ConsistentHashingRouter {
* INTERNAL API
*/
private[akka] def hashMappingAdapter(mapper: ConsistentHashMapper): ConsistentHashMapping = {
case message if (mapper.hashKey(message).asInstanceOf[AnyRef] ne null)
case message if mapper.hashKey(message).asInstanceOf[AnyRef] ne null
mapper.hashKey(message)
}

View file

@ -0,0 +1,62 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import java.util.concurrent.TimeUnit
import akka.stream.impl.JsonObjectParser
import akka.util.ByteString
import org.openjdk.jmh.annotations._
// JMH benchmark comparing the old "collecting" JSON framing implementation with
// the new brace-counting `JsonObjectParser`; measured results are kept inline below.
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode(Array(Mode.Throughput))
class JsonFramingBenchmark {

  /*
    Benchmark                            Mode  Cnt      Score      Error  Units
    // old
    JsonFramingBenchmark.collecting_1       thrpt   20     81.476 ±   14.793  ops/s
    JsonFramingBenchmark.collecting_offer_5 thrpt   20     20.187 ±    2.291  ops/s
    // new
    JsonFramingBenchmark.counting_1         thrpt   20  10766.738 ± 1278.300  ops/s
    JsonFramingBenchmark.counting_offer_5   thrpt   20  28798.255 ± 2670.163  ops/s
   */

  // Seven comma-separated JSON objects, offered to the parser before each invocation.
  val json =
    ByteString(
      """|{"fname":"Frank","name":"Smith","age":42,"id":1337,"boardMember":false},
         |{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false},
         |{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false},
         |{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false},
         |{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false},
         |{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false},
         |{"fname":"Hank","name":"Smith","age":42,"id":1337,"boardMember":false}""".stripMargin
    )

  // Parser under test; shared across invocations within one benchmark iteration.
  val bracket = new JsonObjectParser

  // Re-feed the input before every single benchmark invocation so poll() has data.
  @Setup(Level.Invocation)
  def init(): Unit = {
    bracket.offer(json)
  }

  // Measures framing a single JSON object out of the buffered input.
  @Benchmark
  def counting_1: ByteString =
    bracket.poll().get

  // Measures offering more input plus polling several framed objects.
  // NOTE(review): annotated as 5 operations per invocation but performs 6 polls —
  // confirm whether the annotation or the extra poll() is intended.
  @Benchmark
  @OperationsPerInvocation(5)
  def counting_offer_5: ByteString = {
    bracket.offer(json)
    bracket.poll().get
    bracket.poll().get
    bracket.poll().get
    bracket.poll().get
    bracket.poll().get
    bracket.poll().get
  }
}

View file

@ -0,0 +1,179 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package docs.http.javadsl.server;
import akka.NotUsed;
import akka.http.javadsl.common.CsvEntityStreamingSupport;
import akka.http.javadsl.common.JsonEntityStreamingSupport;
import akka.http.javadsl.marshallers.jackson.Jackson;
import akka.http.javadsl.marshalling.Marshaller;
import akka.http.javadsl.model.*;
import akka.http.javadsl.model.headers.Accept;
import akka.http.javadsl.server.*;
import akka.http.javadsl.testkit.JUnitRouteTest;
import akka.http.javadsl.testkit.TestRoute;
import akka.http.javadsl.unmarshalling.StringUnmarshallers;
import akka.http.javadsl.common.EntityStreamingSupport;
import akka.http.javadsl.unmarshalling.Unmarshaller;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import org.junit.Test;
import java.util.concurrent.CompletionStage;
/**
 * Compiled documentation examples for JSON / CSV entity streaming (Java DSL).
 *
 * NOTE: the {@code //#...} snippet markers are referenced from the RST documentation
 * via {@code includecode} — keep them (and the code between them) intact.
 */
public class JsonStreamingExamplesTest extends JUnitRouteTest {

  //#routes
  final Route tweets() {
    //#formats
    final Unmarshaller<ByteString, JavaTweet> JavaTweets = Jackson.byteStringUnmarshaller(JavaTweet.class);
    //#formats

    //#response-streaming
    // Step 1: Enable JSON streaming
    // we're not using this in the example, but it's the simplest way to start:
    // The default rendering is a JSON array: `[el, el, el , ...]`
    final JsonEntityStreamingSupport jsonStreaming = EntityStreamingSupport.json();

    // Step 1.1: Enable and customise how we'll render the JSON, as a compact array:
    final ByteString start = ByteString.fromString("[");
    final ByteString between = ByteString.fromString(",");
    final ByteString end = ByteString.fromString("]");
    final Flow<ByteString, ByteString, NotUsed> compactArrayRendering =
      Flow.of(ByteString.class).intersperse(start, between, end);
    final JsonEntityStreamingSupport compactJsonSupport = EntityStreamingSupport.json()
      .withFramingRendererFlow(compactArrayRendering);

    // Step 2: implement the route
    final Route responseStreaming = path("tweets", () ->
      get(() ->
        parameter(StringUnmarshallers.INTEGER, "n", n -> {
          final Source<JavaTweet, NotUsed> tws =
            Source.repeat(new JavaTweet(12, "Hello World!")).take(n);

          // Step 3: call complete* with your source, marshaller, and stream rendering mode
          return completeOKWithSource(tws, Jackson.marshaller(), compactJsonSupport);
        })
      )
    );
    //#response-streaming

    //#incoming-request-streaming
    final Route incomingStreaming = path("tweets", () ->
      post(() ->
        extractMaterializer(mat -> {
          final JsonEntityStreamingSupport jsonSupport = EntityStreamingSupport.json();
          return entityAsSourceOf(JavaTweets, jsonSupport, sourceOfTweets -> {
            // count incoming tweets in a streaming, back-pressured fashion
            final CompletionStage<Integer> tweetsCount = sourceOfTweets.runFold(0, (acc, tweet) -> acc + 1, mat);
            return onComplete(tweetsCount, c -> complete("Total number of tweets: " + c));
          });
        })
      )
    );
    //#incoming-request-streaming

    return responseStreaming.orElse(incomingStreaming);
  }

  final Route csvTweets() {
    //#csv-example
    final Marshaller<JavaTweet, ByteString> renderAsCsv =
      Marshaller.withFixedContentType(ContentTypes.TEXT_CSV_UTF8, t ->
        ByteString.fromString(t.getId() + "," + t.getMessage())
      );

    // enable CSV (not JSON) streaming support
    final CsvEntityStreamingSupport csvStreamingSupport = EntityStreamingSupport.csv();

    final Route responseStreaming = path("tweets", () ->
      get(() ->
        parameter(StringUnmarshallers.INTEGER, "n", n -> {
          final Source<JavaTweet, NotUsed> tws =
            Source.repeat(new JavaTweet(12, "Hello World!")).take(n);
          return completeWithSource(tws, renderAsCsv, csvStreamingSupport);
        })
      )
    );
    //#csv-example

    return responseStreaming;
  }
  //#routes

  @Test
  public void getTweetsTest() {
    //#response-streaming
    // tests:
    final TestRoute routes = testRoute(tweets());

    // test happy path
    final Accept acceptApplication = Accept.create(MediaRanges.create(MediaTypes.APPLICATION_JSON));
    routes.run(HttpRequest.GET("/tweets?n=2").addHeader(acceptApplication))
      .assertStatusCode(200)
      .assertEntity("[{\"id\":12,\"message\":\"Hello World!\"},{\"id\":12,\"message\":\"Hello World!\"}]");

    // test responses to potential errors
    final Accept acceptText = Accept.create(MediaRanges.ALL_TEXT);
    routes.run(HttpRequest.GET("/tweets?n=3").addHeader(acceptText))
      .assertStatusCode(StatusCodes.NOT_ACCEPTABLE) // 406
      .assertEntity("Resource representation is only available with these types:\napplication/json");
    //#response-streaming
  }

  @Test
  public void csvExampleTweetsTest() {
    //#response-streaming
    // tests --------------------------------------------
    final TestRoute routes = testRoute(csvTweets());

    // test happy path
    final Accept acceptCsv = Accept.create(MediaRanges.create(MediaTypes.TEXT_CSV));
    routes.run(HttpRequest.GET("/tweets?n=2").addHeader(acceptCsv))
      .assertStatusCode(200)
      .assertEntity("12,Hello World!\n" +
        "12,Hello World!");

    // test responses to potential errors
    final Accept acceptText = Accept.create(MediaRanges.ALL_APPLICATION);
    routes.run(HttpRequest.GET("/tweets?n=3").addHeader(acceptText))
      .assertStatusCode(StatusCodes.NOT_ACCEPTABLE) // 406
      .assertEntity("Resource representation is only available with these types:\ntext/csv; charset=UTF-8");
    //#response-streaming
  }

  //#models
  // Simple mutable bean used by the examples; Jackson requires the getters/setters.
  private static final class JavaTweet {
    private int id;
    private String message;

    public JavaTweet(int id, String message) {
      this.id = id;
      this.message = message;
    }

    public int getId() {
      return id;
    }

    public void setId(int id) {
      this.id = id;
    }

    public void setMessage(String message) {
      this.message = message;
    }

    public String getMessage() {
      return message;
    }
  }
  //#models
}

View file

@ -18,6 +18,7 @@ To use the high-level API you need to add a dependency to the ``akka-http-experi
directives/index
marshalling
exception-handling
source-streaming-support
rejections
testkit
@ -51,7 +52,6 @@ in the :ref:`exception-handling-java` section of the documentation. You can use t
File uploads
^^^^^^^^^^^^
TODO not possible in Java DSL since there
For high level directives to handle uploads see the :ref:`FileUploadDirectives-java`.

View file

@ -0,0 +1,91 @@
.. _json-streaming-java:
Source Streaming
================
Akka HTTP supports completing a request with an Akka ``Source<T, _>``, which makes it possible to easily build
and consume streaming end-to-end APIs which apply back-pressure throughout the entire stack.
It is possible to complete requests with raw ``Source<ByteString, _>``, however often it is more convenient to
stream on an element-by-element basis, and allow Akka HTTP to handle the rendering internally - for example as a JSON array,
or CSV stream (where each element is separated by a new-line).
In the following sections we investigate how to make use of the JSON Streaming infrastructure,
however the general hints apply to any kind of element-by-element streaming you could imagine.
JSON Streaming
==============
`JSON Streaming`_ is a term referring to streaming a (possibly infinite) stream of elements as independent JSON
objects as a continuous HTTP request or response. The elements are most often separated using newlines,
however they do not have to be. Concatenating elements side-by-side or emitting a "very long" JSON array is also another
use case.
In the below examples, we'll be referring to the ``Tweet`` and ``Measurement`` case classes as our model, which are defined as:
.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#models
.. _Json Streaming: https://en.wikipedia.org/wiki/JSON_Streaming
Responding with JSON Streams
----------------------------
In this example we implement an API representing an infinite stream of tweets, very much like Twitter's `Streaming API`_.
Firstly, we'll need to get some additional marshalling infrastructure set up, that is able to marshal to and from an
Akka Streams ``Source<T,_>``. Here we'll use the ``Jackson`` helper class from ``akka-http-jackson`` (a separate library
that you should add as a dependency if you want to use Jackson with Akka HTTP).
First we enable JSON Streaming by making an implicit ``EntityStreamingSupport`` instance available (Step 1).
The default mode of rendering a ``Source`` is to represent it as a JSON Array. If you want to change this representation
for example to use Twitter style new-line separated JSON objects, you can do so by configuring the support trait accordingly.
In Step 1.1 we demonstrate how to configure the rendering to be new-line separated, and also how parallel marshalling
can be applied. We configure the Support object to render the JSON as series of new-line separated JSON objects,
simply by providing the ``start``, ``sep`` and ``end`` ByteStrings, which will be emitted at the appropriate
places in the rendered stream. Although this format is *not* valid JSON, it is pretty popular since parsing it is relatively
simple - clients need only to find the new-lines and apply JSON unmarshalling for an entire line of JSON.
The final step is simply completing a request using a Source of tweets, as simple as that:
.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#response-streaming
.. _Streaming API: https://dev.twitter.com/streaming/overview
Consuming JSON Streaming uploads
--------------------------------
Sometimes the client may be sending a streaming request, for example an embedded device initiated a connection with
the server and is feeding it with one line of measurement data.
In this example, we want to consume this data in a streaming fashion from the request entity, and also apply
back-pressure to the underlying TCP connection, if the server can not cope with the rate of incoming data (back-pressure
will be applied automatically thanks to using Akka HTTP/Streams).
.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#formats
.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#incoming-request-streaming
Simple CSV streaming example
----------------------------
Akka HTTP provides another ``EntityStreamingSupport`` out of the box, namely ``csv`` (comma-separated values).
For completeness, we demonstrate its usage in the below snippet. As you'll notice, switching between streaming
modes is fairly simple, one only has to make sure that an implicit ``Marshaller`` of the requested type is available,
and that the streaming support operates on the same ``Content-Type`` as the rendered values. Otherwise you'll see
an error during runtime signalling that the marshaller did not expose the expected content type and thus we can not render
the streaming response.
.. includecode:: ../../code/docs/http/javadsl/server/JsonStreamingExamplesTest.java#csv-example
Implementing custom EntityStreamingSupport traits
-------------------------------------------------
The ``EntityStreamingSupport`` infrastructure is open for extension and not bound to any single format, content type
or marshalling library. The provided JSON support does not rely on Spray JSON directly, but uses ``Marshaller<T, ByteString>``
instances, which can be provided using any JSON marshalling library (such as Circe, Jawn or Play JSON).
When implementing a custom support trait, one should simply extend the ``EntityStreamingSupport`` abstract class,
and implement all of its methods. It's best to use the existing implementations as a guideline.

View file

@ -25,7 +25,7 @@ lies in interfacing between private sphere and the public, but you don't want
that many doors inside your house, do you? For a longer discussion see `this
blog post <http://letitcrash.com/post/19074284309/when-to-use-typedactors>`_.
A bit more background: TypedActors can very easily be abused as RPC, and that
A bit more background: TypedActors can easily be abused as RPC, and that
is an abstraction which is `well-known
<http://doc.akka.io/docs/misc/smli_tr-94-29.pdf>`_
to be leaky. Hence TypedActors are not what we think of first when we talk

View file

@ -0,0 +1,228 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.http.scaladsl.server.directives
import akka.NotUsed
import akka.http.scaladsl.common.{ EntityStreamingSupport, JsonEntityStreamingSupport }
import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.Accept
import akka.http.scaladsl.server.{ UnacceptedResponseContentTypeRejection, UnsupportedRequestContentTypeRejection }
import akka.stream.scaladsl.{ Flow, Source }
import akka.util.ByteString
import docs.http.scaladsl.server.RoutingSpec
import spray.json.JsValue
import scala.concurrent.Future
/**
 * Compiled documentation examples for JSON / CSV entity streaming (Scala DSL).
 *
 * NOTE: the `//#...` snippet markers are referenced from the RST documentation
 * via `includecode2` — keep them (and the code between them) intact.
 */
class JsonStreamingExamplesSpec extends RoutingSpec {

  //#models
  case class Tweet(uid: Int, txt: String)
  case class Measurement(id: String, value: Int)
  //#

  // Small fixed source of tweets shared by all examples below.
  def getTweets() =
    Source(List(
      Tweet(1, "#Akka rocks!"),
      Tweet(2, "Streaming is so hot right now!"),
      Tweet(3, "You cannot enter the same river twice.")))

  //#formats
  object MyJsonProtocol
    extends akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
    with spray.json.DefaultJsonProtocol {

    implicit val tweetFormat = jsonFormat2(Tweet.apply)
    implicit val measurementFormat = jsonFormat2(Measurement.apply)
  }
  //#

  "spray-json-response-streaming" in {
    // [1] import "my protocol", for marshalling Tweet objects:
    import MyJsonProtocol._

    // [2] pick a Source rendering support trait:
    // Note that the default support renders the Source as JSON Array
    implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()

    val route =
      path("tweets") {
        // [3] simply complete a request with a source of tweets:
        val tweets: Source[Tweet, NotUsed] = getTweets()
        complete(tweets)
      }

    // tests ------------------------------------------------------------
    val AcceptJson = Accept(MediaRange(MediaTypes.`application/json`))
    val AcceptXml = Accept(MediaRange(MediaTypes.`text/xml`))

    // default rendering is a single JSON array containing all elements:
    Get("/tweets").withHeaders(AcceptJson) ~> route ~> check {
      responseAs[String] shouldEqual
        """[""" +
        """{"uid":1,"txt":"#Akka rocks!"},""" +
        """{"uid":2,"txt":"Streaming is so hot right now!"},""" +
        """{"uid":3,"txt":"You cannot enter the same river twice."}""" +
        """]"""
    }

    // endpoint can only marshal Json, so it will *reject* requests for application/xml:
    Get("/tweets").withHeaders(AcceptXml) ~> route ~> check {
      handled should ===(false)
      rejection should ===(UnacceptedResponseContentTypeRejection(Set(ContentTypes.`application/json`)))
    }
  }

  "line-by-line-json-response-streaming" in {
    import MyJsonProtocol._

    // Configure the EntityStreamingSupport to render the elements as:
    // {"example":42}
    // {"example":43}
    // ...
    // {"example":1000}
    val start = ByteString.empty
    val sep = ByteString("\n")
    val end = ByteString.empty
    implicit val jsonStreamingSupport = EntityStreamingSupport.json()
      .withFramingRenderer(Flow[ByteString].intersperse(start, sep, end))

    val route =
      path("tweets") {
        // [3] simply complete a request with a source of tweets:
        val tweets: Source[Tweet, NotUsed] = getTweets()
        complete(tweets)
      }

    // tests ------------------------------------------------------------
    val AcceptJson = Accept(MediaRange(MediaTypes.`application/json`))

    // elements are now rendered newline-separated instead of as a JSON array:
    Get("/tweets").withHeaders(AcceptJson) ~> route ~> check {
      responseAs[String] shouldEqual
        """{"uid":1,"txt":"#Akka rocks!"}""" + "\n" +
        """{"uid":2,"txt":"Streaming is so hot right now!"}""" + "\n" +
        """{"uid":3,"txt":"You cannot enter the same river twice."}"""
    }
  }

  "csv-example" in {
    // [1] provide a marshaller to ByteString
    implicit val tweetAsCsv = Marshaller.strict[Tweet, ByteString] { t =>
      Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`, () => {
        // commas inside the text would break the CSV framing, so replace them
        val txt = t.txt.replaceAll(",", ".")
        val uid = t.uid
        ByteString(List(uid, txt).mkString(","))
      })
    }

    // [2] enable csv streaming:
    implicit val csvStreaming = EntityStreamingSupport.csv()

    val route =
      path("tweets") {
        val tweets: Source[Tweet, NotUsed] = getTweets()
        complete(tweets)
      }

    // tests ------------------------------------------------------------
    val AcceptCsv = Accept(MediaRange(MediaTypes.`text/csv`))

    Get("/tweets").withHeaders(AcceptCsv) ~> route ~> check {
      responseAs[String] shouldEqual
        """|1,#Akka rocks!
           |2,Streaming is so hot right now!
           |3,You cannot enter the same river twice."""
        .stripMargin
    }
  }

  "response-streaming-modes" in {
    {
      //#async-rendering
      import MyJsonProtocol._
      // marshal up to 8 elements concurrently, but emit them in upstream order
      implicit val jsonStreamingSupport: JsonEntityStreamingSupport =
        EntityStreamingSupport.json()
          .withParallelMarshalling(parallelism = 8, unordered = false)

      path("tweets") {
        val tweets: Source[Tweet, NotUsed] = getTweets()
        complete(tweets)
      }
      //#
    }

    {
      //#async-unordered-rendering
      import MyJsonProtocol._
      // marshal up to 8 elements concurrently and emit whichever finishes first
      implicit val jsonStreamingSupport: JsonEntityStreamingSupport =
        EntityStreamingSupport.json()
          .withParallelMarshalling(parallelism = 8, unordered = true)

      path("tweets" / "unordered") {
        val tweets: Source[Tweet, NotUsed] = getTweets()
        complete(tweets)
      }
      //#
    }
  }

  "spray-json-request-streaming" in {
    // [1] import "my protocol", for unmarshalling Measurement objects:
    import MyJsonProtocol._

    // [2] enable Json Streaming
    implicit val jsonStreamingSupport = EntityStreamingSupport.json()

    // prepare your persisting logic here
    val persistMetrics = Flow[Measurement]

    val route =
      path("metrics") {
        // [3] extract Source[Measurement, _]
        entity(asSourceOf[Measurement]) { measurements =>
          // alternative syntax:
          // entity(as[Source[Measurement, NotUsed]]) { measurements =>
          val measurementsSubmitted: Future[Int] =
            measurements
              .via(persistMetrics)
              .runFold(0) { (cnt, _) => cnt + 1 }

          complete {
            measurementsSubmitted.map(n => Map("msg" -> s"""Total metrics received: $n"""))
          }
        }
      }

    // tests ------------------------------------------------------------
    // uploading an array or newline separated values works out of the box
    val data = HttpEntity(
      ContentTypes.`application/json`,
      """
        |{"id":"temp","value":32}
        |{"id":"temp","value":31}
        |
      """.stripMargin)

    Post("/metrics", entity = data) ~> route ~> check {
      status should ===(StatusCodes.OK)
      responseAs[String] should ===("""{"msg":"Total metrics received: 2"}""")
    }

    // the FramingWithContentType will reject any content type that it does not understand:
    val xmlData = HttpEntity(
      ContentTypes.`text/xml(UTF-8)`,
      """|<data id="temp" value="32"/>
         |<data id="temp" value="31"/>""".stripMargin)

    Post("/metrics", entity = xmlData) ~> route ~> check {
      handled should ===(false)
      rejection should ===(UnsupportedRequestContentTypeRejection(Set(ContentTypes.`application/json`)))
    }
  }
}

View file

@ -71,8 +71,8 @@ class MarshallingDirectivesExamplesSpec extends RoutingSpec {
// tests:
Get("/") ~> route ~> check {
mediaType shouldEqual `application/json`
responseAs[String] should include(""""name": "Jane"""")
responseAs[String] should include(""""favoriteNumber": 42""")
responseAs[String] should include(""""name":"Jane"""")
responseAs[String] should include(""""favoriteNumber":42""")
}
}
@ -95,8 +95,8 @@ class MarshallingDirectivesExamplesSpec extends RoutingSpec {
Post("/", HttpEntity(`application/json`, """{ "name": "Jane", "favoriteNumber" : 42 }""")) ~>
route ~> check {
mediaType shouldEqual `application/json`
responseAs[String] should include(""""name": "Jane"""")
responseAs[String] should include(""""favoriteNumber": 42""")
responseAs[String] should include(""""name":"Jane"""")
responseAs[String] should include(""""favoriteNumber":42""")
}
}
}

View file

@ -224,4 +224,4 @@ When you combine directives producing extractions with the ``&`` operator all ex
Directives offer a great way of constructing your web service logic from small building blocks in a plug and play
fashion while maintaining DRYness and full type-safety. If the large range of :ref:`Predefined Directives` does not
fully satisfy your needs you can also very easily create :ref:`Custom Directives`.
fully satisfy your needs you can also easily create :ref:`Custom Directives`.

View file

@ -23,6 +23,7 @@ static content serving.
exception-handling
path-matchers
case-class-extraction
source-streaming-support
testkit
websocket-support

View file

@ -0,0 +1,136 @@
.. _json-streaming-scala:
Source Streaming
================
Akka HTTP supports completing a request with an Akka ``Source[T, _]``, which makes it possible to easily build
and consume streaming end-to-end APIs which apply back-pressure throughout the entire stack.
It is possible to complete requests with raw ``Source[ByteString, _]``, however often it is more convenient to
stream on an element-by-element basis, and allow Akka HTTP to handle the rendering internally - for example as a JSON array,
or CSV stream (where each element is separated by a new-line).
In the following sections we investigate how to make use of the JSON Streaming infrastructure,
however the general hints apply to any kind of element-by-element streaming you could imagine.
JSON Streaming
==============
`JSON Streaming`_ is a term referring to streaming a (possibly infinite) stream of elements as independent JSON
objects as a continuous HTTP request or response. The elements are most often separated using newlines,
however they do not have to be. Concatenating elements side-by-side or emitting a "very long" JSON array is also another
use case.
In the below examples, we'll be referring to the ``Tweet`` and ``Measurement`` case classes as our model, which are defined as:
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
:snippet: models
And as always with spray-json, we provide our (Un)Marshaller instances as implicit values using the ``jsonFormat##``
method to generate them statically:
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
:snippet: formats
.. _Json Streaming: https://en.wikipedia.org/wiki/JSON_Streaming
Responding with JSON Streams
----------------------------
In this example we implement an API representing an infinite stream of tweets, very much like Twitter's `Streaming API`_.
Firstly, we'll need to get some additional marshalling infrastructure set up, that is able to marshal to and from an
Akka Streams ``Source[T,_]``. One such trait, containing the needed marshallers is ``SprayJsonSupport``, which uses
spray-json (a high performance json parser library), and is shipped as part of Akka HTTP in the
``akka-http-spray-json-experimental`` module.
Once the general infrastructure is prepared we import our model's marshallers, generated by spray-json (Step 1),
and enable JSON Streaming by making an implicit ``EntityStreamingSupport`` instance available (Step 2).
Akka HTTP pre-packages JSON and CSV entity streaming support, however it is simple to add your own, in case you'd
like to stream a different content type (for example plists or protobuf).
The final step is simply completing a request using a Source of tweets, as simple as that:
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
:snippet: spray-json-response-streaming
The reason the ``EntityStreamingSupport`` has to be enabled explicitly is that one might want to configure how the
stream should be rendered. We'll discuss this in depth in the next section though.
.. _Streaming API: https://dev.twitter.com/streaming/overview
Customising response rendering mode
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Since it is not always possible to directly and confidently answer the question of how a stream of ``T`` should look on
the wire, the ``EntityStreamingSupport`` traits come into play and allow fine-tuning the streams rendered representation.
For example, in case of JSON Streaming, there isn't really one standard about rendering the response. Some APIs prefer
to render multiple JSON objects in a line-by-line fashion (Twitter's streaming APIs for example), while others simply return
very large arrays, which could be streamed as well.
Akka defaults to the second one (streaming a JSON Array), as it is correct JSON and clients not expecting
a streaming API would still be able to consume it in a naive way if they'd want to.
The line-by-line approach however is also pretty popular even though it is not valid JSON. Its relative simplicity for
client-side parsing is a strong point in favour of picking this format for your Streaming APIs.
Below we demonstrate how to reconfigure the support trait to render the JSON as
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
:snippet: line-by-line-json-response-streaming
Another interesting feature is parallel marshalling. Since marshalling can potentially take much time,
it is possible to marshal multiple elements of the stream in parallel. This is simply a configuration
option on ``EntityStreamingSupport`` and is configurable like this:
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
:snippet: async-rendering
The above shown mode preserves ordering of the Source's elements, which may sometimes be a required property,
for example when streaming a strictly ordered dataset. Sometimes the concept of strict-order does not apply to the
data being streamed though, which allows us to exploit this property and use an ``unordered`` rendering.
This also is a configuration option and is used as shown below. Effectively this will allow Akka's marshalling infrastructure
to concurrently marshal up to ``parallelism`` elements and emit the first which is marshalled onto the ``HttpResponse``:
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
:snippet: async-unordered-rendering
This allows us to *potentially* render elements faster onto the HttpResponse, since it can avoid "head of line blocking",
in case one element in front of the stream takes a long time to marshall, yet others after it are very quick to marshall.
Consuming JSON Streaming uploads
--------------------------------
Sometimes the client may be sending a streaming request, for example an embedded device initiated a connection with
the server and is feeding it with one line of measurement data.
In this example, we want to consume this data in a streaming fashion from the request entity, and also apply
back-pressure to the underlying TCP connection, if the server can not cope with the rate of incoming data (back-pressure
will be applied automatically thanks to using Akka HTTP/Streams).
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
:snippet: spray-json-request-streaming
Simple CSV streaming example
----------------------------
Akka HTTP provides another ``EntityStreamingSupport`` out of the box, namely ``csv`` (comma-separated values).
For completeness, we demonstrate its usage in the below snippet. As you'll notice, switching between streaming
modes is fairly simple, one only has to make sure that an implicit ``Marshaller`` of the requested type is available,
and that the streaming support operates on the same ``Content-Type`` as the rendered values. Otherwise you'll see
an error during runtime signalling that the marshaller did not expose the expected content type and thus we can not render
the streaming response.
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
:snippet: csv-example
Implementing custom EntityStreamingSupport traits
-------------------------------------------------
The ``EntityStreamingSupport`` infrastructure is open for extension and not bound to any single format, content type
or marshalling library. The provided JSON support does not rely on Spray JSON directly, but uses ``Marshaller[T, ByteString]``
instances, which can be provided using any JSON marshalling library (such as Circe, Jawn or Play JSON).
When implementing a custom support trait, one should simply extend the ``EntityStreamingSupport`` abstract class,
and implement all of its methods. It's best to use the existing implementations as a guideline.

View file

@ -35,7 +35,7 @@ lies in interfacing between private sphere and the public, but you don't want
that many doors inside your house, do you? For a longer discussion see `this
blog post <http://letitcrash.com/post/19074284309/when-to-use-typedactors>`_.
A bit more background: TypedActors can very easily be abused as RPC, and that
A bit more background: TypedActors can easily be abused as RPC, and that
is an abstraction which is `well-known
<http://doc.akka.io/docs/misc/smli_tr-94-29.pdf>`_
to be leaky. Hence TypedActors are not what we think of first when we talk

View file

@ -25,6 +25,9 @@ public final class ContentTypes {
public static final ContentType.WithCharset TEXT_XML_UTF8 =
akka.http.scaladsl.model.ContentTypes.text$divxml$u0028UTF$minus8$u0029();
public static final ContentType.WithCharset TEXT_CSV_UTF8 =
akka.http.scaladsl.model.ContentTypes.text$divcsv$u0028UTF$minus8$u0029();
public static ContentType.Binary create(MediaType.Binary mediaType) {
return ContentType$.MODULE$.apply((akka.http.scaladsl.model.MediaType.Binary) mediaType);
}

View file

@ -11,6 +11,7 @@ import akka.http.javadsl.model.RequestEntity;
import akka.http.javadsl.marshalling.Marshaller;
import akka.http.javadsl.unmarshalling.Unmarshaller;
import akka.util.ByteString;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -31,6 +32,10 @@ public class Jackson {
);
}
public static <T> Unmarshaller<ByteString, T> byteStringUnmarshaller(Class<T> expectedType) {
return byteStringUnmarshaller(defaultObjectMapper, expectedType);
}
public static <T> Unmarshaller<HttpEntity, T> unmarshaller(Class<T> expectedType) {
return unmarshaller(defaultObjectMapper, expectedType);
}
@ -39,6 +44,10 @@ public class Jackson {
return Unmarshaller.forMediaType(MediaTypes.APPLICATION_JSON, Unmarshaller.entityToString())
.thenApply(s -> fromJSON(mapper, s, expectedType));
}
public static <T> Unmarshaller<ByteString, T> byteStringUnmarshaller(ObjectMapper mapper, Class<T> expectedType) {
return Unmarshaller.sync(s -> fromJSON(mapper, s.utf8String(), expectedType));
}
private static String toJSON(ObjectMapper mapper, Object object) {
try {

View file

@ -0,0 +1,79 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.scaladsl.marshallers.sprayjson
import java.nio.{ ByteBuffer, CharBuffer }
import java.nio.charset.{ Charset, StandardCharsets }
import akka.util.ByteString
import spray.json.ParserInput.DefaultParserInput
import scala.annotation.tailrec
/**
* ParserInput reading directly off a ByteString. (Based on the ByteArrayBasedParserInput)
* This avoids a separate decoding step but assumes that each byte represents exactly one character,
* which is encoded by ISO-8859-1!
* You can therefore use this ParserInput type only if you know that all input will be `ISO-8859-1`-encoded,
* or only contains 7-bit ASCII characters (which is a subset of ISO-8859-1)!
*
* Note that this ParserInput type will NOT work with general `UTF-8`-encoded input as this can contain
* character representations spanning multiple bytes. However, if you know that your input will only ever contain
* 7-bit ASCII characters (0x00-0x7F) then UTF-8 is fine, since the first 127 UTF-8 characters are
* encoded with only one byte that is identical to 7-bit ASCII and ISO-8859-1.
*/
final class SprayJsonByteStringParserInput(bytes: ByteString) extends DefaultParserInput {
  import SprayJsonByteStringParserInput._

  // Scratch buffers reused across nextUtf8Char() calls; a UTF-8 code point is at most 4 bytes
  // and decodes to at most 1 char here (surrogate pairs are not supported, see class docs).
  private[this] val byteBuffer = ByteBuffer.allocate(4)
  private[this] val charBuffer = CharBuffer.allocate(1)

  private[this] val decoder = StandardCharsets.UTF_8.newDecoder()

  /** Advances the cursor and returns the next byte as an ISO-8859-1 character, or EOI at end of input. */
  override def nextChar() = {
    _cursor += 1
    if (_cursor < bytes.length) (bytes(_cursor) & 0xFF).toChar else EOI
  }

  /**
   * Advances the cursor and returns the next character, decoding UTF-8 multi-byte sequences.
   * Returns ErrorChar for malformed/truncated sequences and EOI at end of input.
   */
  override def nextUtf8Char() = {
    @tailrec def decode(byte: Byte, remainingBytes: Int): Char = {
      byteBuffer.put(byte)
      if (remainingBytes > 0) {
        _cursor += 1
        if (_cursor < bytes.length) decode(bytes(_cursor), remainingBytes - 1)
        else ErrorChar // truncated multi-byte sequence at end of input
      } else {
        byteBuffer.flip()
        val coderResult = decoder.decode(byteBuffer, charBuffer, false)
        charBuffer.flip()
        // `&&` (short-circuit) instead of bitwise `&`: don't touch the buffer unless decoding succeeded
        val result = if (coderResult.isUnderflow && charBuffer.hasRemaining) charBuffer.get() else ErrorChar
        byteBuffer.clear()
        charBuffer.clear()
        result
      }
    }
    _cursor += 1
    if (_cursor < bytes.length) {
      val byte = bytes(_cursor)
      if (byte >= 0) byte.toChar // 7-Bit ASCII
      else if ((byte & 0xE0) == 0xC0) decode(byte, 1) // 2-byte UTF-8 sequence
      else if ((byte & 0xF0) == 0xE0) decode(byte, 2) // 3-byte UTF-8 sequence
      else if ((byte & 0xF8) == 0xF0) decode(byte, 3) // 4-byte UTF-8 sequence, will probably produce an (unsupported) surrogate pair
      else ErrorChar
    } else EOI
  }

  override def length: Int = bytes.size

  override def sliceString(start: Int, end: Int): String =
    // FIX: ByteString.slice takes (from, until) indices — the previous `slice(start, end - start)`
    // was the (offset, length) convention of the array-based parser input and truncated the slice.
    bytes.slice(start, end).decodeString(StandardCharsets.ISO_8859_1)

  override def sliceCharArray(start: Int, end: Int): Array[Char] =
    StandardCharsets.ISO_8859_1.decode(bytes.slice(start, end).asByteBuffer).array()
}
object SprayJsonByteStringParserInput {
  // End-of-input sentinel returned by nextChar/nextUtf8Char (compile-time constant).
  private final val EOI = '\uFFFF'
  // Universal UTF-8 replacement character, returned for malformed input (compile-time constant).
  private final val ErrorChar = '\uFFFD'
}

View file

@ -4,13 +4,19 @@
package akka.http.scaladsl.marshallers.sprayjson
import scala.language.implicitConversions
import akka.http.scaladsl.marshalling.{ ToEntityMarshaller, Marshaller }
import akka.http.scaladsl.unmarshalling.{ FromEntityUnmarshaller, Unmarshaller }
import akka.http.scaladsl.model.{ MediaTypes, HttpCharsets }
import akka.NotUsed
import akka.http.scaladsl.common.EntityStreamingSupport
import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.model.MediaTypes.`application/json`
import akka.http.scaladsl.model.{ HttpCharsets, MediaTypes }
import akka.http.scaladsl.unmarshalling.{ FromEntityUnmarshaller, FromRequestUnmarshaller, Unmarshaller }
import akka.http.scaladsl.util.FastFuture
import akka.stream.scaladsl.{ Flow, Keep, Source }
import akka.util.ByteString
import spray.json._
import scala.language.implicitConversions
/**
* A trait providing automatic to and from JSON marshalling/unmarshalling using an in-scope *spray-json* protocol.
*/
@ -19,19 +25,41 @@ trait SprayJsonSupport {
sprayJsonUnmarshaller(reader)
implicit def sprayJsonUnmarshaller[T](implicit reader: RootJsonReader[T]): FromEntityUnmarshaller[T] =
sprayJsValueUnmarshaller.map(jsonReader[T].read)
implicit def sprayJsonByteStringUnmarshaller[T](implicit reader: RootJsonReader[T]): Unmarshaller[ByteString, T] =
Unmarshaller.withMaterializer[ByteString, JsValue](_ implicit mat { bs
// .compact so addressing into any address is very fast (also for large chunks)
// TODO we could optimise ByteStrings to better handle linear access like this (or provide ByteStrings.linearAccessOptimised)
// TODO IF it's worth it.
val parserInput = new SprayJsonByteStringParserInput(bs.compact)
FastFuture.successful(JsonParser(parserInput))
}).map(jsonReader[T].read)
implicit def sprayJsValueUnmarshaller: FromEntityUnmarshaller[JsValue] =
Unmarshaller.byteStringUnmarshaller.forContentTypes(`application/json`).mapWithCharset { (data, charset)
val input =
if (charset == HttpCharsets.`UTF-8`) ParserInput(data.toArray)
else ParserInput(data.decodeString(charset.nioCharset.name)) // FIXME: identify charset by instance, not by name!
else ParserInput(data.decodeString(charset.nioCharset))
JsonParser(input)
}
implicit def sprayJsonMarshallerConverter[T](writer: RootJsonWriter[T])(implicit printer: JsonPrinter = PrettyPrinter): ToEntityMarshaller[T] =
implicit def sprayJsonMarshallerConverter[T](writer: RootJsonWriter[T])(implicit printer: JsonPrinter = CompactPrinter): ToEntityMarshaller[T] =
sprayJsonMarshaller[T](writer, printer)
implicit def sprayJsonMarshaller[T](implicit writer: RootJsonWriter[T], printer: JsonPrinter = PrettyPrinter): ToEntityMarshaller[T] =
implicit def sprayJsonMarshaller[T](implicit writer: RootJsonWriter[T], printer: JsonPrinter = CompactPrinter): ToEntityMarshaller[T] =
sprayJsValueMarshaller compose writer.write
implicit def sprayJsValueMarshaller(implicit printer: JsonPrinter = PrettyPrinter): ToEntityMarshaller[JsValue] =
implicit def sprayJsValueMarshaller(implicit printer: JsonPrinter = CompactPrinter): ToEntityMarshaller[JsValue] =
Marshaller.StringMarshaller.wrap(MediaTypes.`application/json`)(printer)
// support for as[Source[T, NotUsed]]
implicit def sprayJsonSourceReader[T](implicit rootJsonReader: RootJsonReader[T], support: EntityStreamingSupport): FromRequestUnmarshaller[Source[T, NotUsed]] =
Unmarshaller.withMaterializer { implicit ec implicit mat r
if (support.supported.matches(r.entity.contentType)) {
val bytes = r.entity.dataBytes
val frames = bytes.via(support.framingDecoder)
val unmarshalling =
if (support.unordered) Flow[ByteString].mapAsyncUnordered(support.parallelism)(bs sprayJsonByteStringUnmarshaller(rootJsonReader)(bs))
else Flow[ByteString].mapAsync(support.parallelism)(bs sprayJsonByteStringUnmarshaller(rootJsonReader)(bs))
val elements = frames.viaMat(unmarshalling)(Keep.right)
FastFuture.successful(elements)
} else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(support.supported))
}
}
object SprayJsonSupport extends SprayJsonSupport
object SprayJsonSupport extends SprayJsonSupport

View file

@ -3,15 +3,22 @@
*/
package akka.http.javadsl.server;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBinding;
import akka.http.javadsl.common.EntityStreamingSupport;
import akka.http.javadsl.marshallers.jackson.Jackson;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.model.StatusCodes;
import akka.http.javadsl.unmarshalling.StringUnmarshallers;
import akka.http.javadsl.unmarshalling.Unmarshaller;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import scala.concurrent.duration.Duration;
import scala.runtime.BoxedUnit;
@ -55,18 +62,41 @@ public class JavaTestServer extends AllDirectives { // or import static Directiv
);
final Route crash = path("crash", () ->
path("scala", () -> completeOKWithFutureString(akka.dispatch.Futures.<String>failed(new Exception("Boom!")))).orElse(
path("java", () -> completeOKWithFutureString(CompletableFuture.<String>supplyAsync(() -> { throw new RuntimeException("Boom!"); }))))
path("scala", () -> completeOKWithFutureString(akka.dispatch.Futures.failed(new Exception("Boom!")))).orElse(
path("java", () -> completeOKWithFutureString(CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Boom!"); }))))
);
final Unmarshaller<ByteString, JavaTweet> JavaTweets = Jackson.byteStringUnmarshaller(JavaTweet.class);
final Route tweets = path("tweets", () ->
get(() ->
parameter(StringUnmarshallers.INTEGER, "n", n -> {
final Source<JavaTweet, NotUsed> tws = Source.repeat(new JavaTweet("Hello World!")).take(n);
return completeOKWithSource(tws, Jackson.marshaller(), EntityStreamingSupport.json());
})
).orElse(
post(() ->
extractMaterializer(mat ->
entityAsSourceOf(JavaTweets, null, sourceOfTweets -> {
final CompletionStage<Integer> tweetsCount = sourceOfTweets.runFold(0, (acc, tweet) -> acc + 1, mat);
return onComplete(tweetsCount, c -> complete("Total number of tweets: " + c));
})
)
))
);
final Route inner = path("inner", () ->
getFromResourceDirectory("someDir")
);
return get(() ->
index.orElse(secure).orElse(ping).orElse(crash).orElse(inner).orElse(requestTimeout)
);
return index
.orElse(secure)
.orElse(ping)
.orElse(crash)
.orElse(inner)
.orElse(requestTimeout)
.orElse(tweets)
;
}
private void silentSleep(int millis) {
@ -113,7 +143,7 @@ public class JavaTestServer extends AllDirectives { // or import static Directiv
final Flow<HttpRequest, HttpResponse, ?> flow = createRoute().flow(system, mat);
final CompletionStage<ServerBinding> binding =
Http.get(system).bindAndHandle(flow, ConnectHttp.toHost("127.0.0.1"), mat);
Http.get(system).bindAndHandle(flow, ConnectHttp.toHost("127.0.0.1", 8080), mat);
System.console().readLine("Press [ENTER] to quit...");
shutdown(binding);
@ -131,4 +161,21 @@ public class JavaTestServer extends AllDirectives { // or import static Directiv
}
});
}
// Mutable data holder for the /tweets demo route; (de)serialized by Jackson via the
// getter/setter pair below — NOTE(review): presumably Jackson binds the "message"
// JSON field through these bean accessors; confirm against the configured ObjectMapper.
private static final class JavaTweet {
    private String message;

    public JavaTweet(String message) {
        this.message = message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }
}
}

View file

@ -18,6 +18,7 @@ import scala.concurrent.Await
private[akka] trait IntegrationRoutingSpec extends WordSpecLike with Matchers with BeforeAndAfterAll
with Directives with RequestBuilding
with ScalaFutures with IntegrationPatience {
import IntegrationRoutingSpec._
implicit val system = ActorSystem(AkkaSpec.getCallerName(getClass))
implicit val mat = ActorMaterializer()
@ -31,8 +32,6 @@ private[akka] trait IntegrationRoutingSpec extends WordSpecLike with Matchers wi
def ~!>(route: Route) = new Prepped(request, route)
}
final case class Prepped(request: HttpRequest, route: Route)
implicit class Checking(p: Prepped) {
def ~!>(checking: HttpResponse Unit) = {
val (_, host, port) = TestUtils.temporaryServerHostnameAndPort()
@ -47,3 +46,7 @@ private[akka] trait IntegrationRoutingSpec extends WordSpecLike with Matchers wi
}
}
object IntegrationRoutingSpec {
final case class Prepped(request: HttpRequest, route: Route)
}

View file

@ -4,14 +4,19 @@
package akka.http.scaladsl.server
import akka.NotUsed
import akka.http.scaladsl.marshallers.xml.ScalaXmlSupport
import akka.http.scaladsl.model.{ StatusCodes, HttpResponse }
import akka.http.scaladsl.model.{ HttpResponse, StatusCodes }
import akka.http.scaladsl.server.directives.Credentials
import com.typesafe.config.{ ConfigFactory, Config }
import com.typesafe.config.{ Config, ConfigFactory }
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.common.EntityStreamingSupport
import akka.http.scaladsl.marshalling.ToResponseMarshallable
import spray.json.RootJsonReader
import scala.concurrent.duration._
import scala.io.StdIn
@ -21,10 +26,18 @@ object TestServer extends App {
akka.log-dead-letters = off
akka.stream.materializer.debug.fuzzing-mode = off
""")
implicit val system = ActorSystem("ServerTest", testConf)
import system.dispatcher
implicit val materializer = ActorMaterializer()
import spray.json.DefaultJsonProtocol._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
final case class Tweet(message: String)
implicit val tweetFormat = jsonFormat1(Tweet)
implicit val jsonStreaming = EntityStreamingSupport.json()
import ScalaXmlSupport._
import Directives._
@ -32,7 +45,8 @@ object TestServer extends App {
case p @ Credentials.Provided(name) if p.verify(name + "-password") name
}
val bindingFuture = Http().bindAndHandle({
// format: OFF
val routes = {
get {
path("") {
withRequestTimeout(1.milli, _ HttpResponse(
@ -42,21 +56,49 @@ object TestServer extends App {
complete(index)
}
} ~
path("secure") {
authenticateBasicPF("My very secure site", auth) { user
complete(<html><body>Hello <b>{ user }</b>. Access has been granted!</body></html>)
path("secure") {
authenticateBasicPF("My very secure site", auth) { user
complete(<html> <body> Hello <b>{user}</b>. Access has been granted! </body> </html>)
}
} ~
path("ping") {
complete("PONG!")
} ~
path("crash") {
complete(sys.error("BOOM!"))
} ~
path("tweet") {
complete(Tweet("Hello, world!"))
} ~
(path("tweets") & parameter('n.as[Int])) { n =>
get {
val tweets = Source.repeat(Tweet("Hello, world!")).take(n)
complete(tweets)
} ~
post {
entity(asSourceOf[Tweet]) { tweets
onComplete(tweets.runFold(0)({ case (acc, t) => acc + 1 })) { count =>
complete(s"Total tweets received: " + count)
}
}
} ~
path("ping") {
complete("PONG!")
} ~
path("crash") {
complete(sys.error("BOOM!"))
put {
// checking the alternative syntax also works:
entity(as[Source[Tweet, NotUsed]]) { tweets
onComplete(tweets.runFold(0)({ case (acc, t) => acc + 1 })) { count =>
complete(s"Total tweets received: " + count)
}
}
}
} ~ pathPrefix("inner")(getFromResourceDirectory("someDir"))
}, interface = "localhost", port = 8080)
}
} ~
pathPrefix("inner")(getFromResourceDirectory("someDir"))
}
// format: ON
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
val bindingFuture = Http().bindAndHandle(routes, interface = "0.0.0.0", port = 8080)
println(s"Server online at http://0.0.0.0:8080/\nPress RETURN to stop...")
StdIn.readLine()
bindingFuture.flatMap(_.unbind()).onComplete(_ system.terminate())

View file

@ -184,7 +184,7 @@ class MarshallingDirectivesSpec extends RoutingSpec with Inside {
"render JSON with UTF-8 encoding if no `Accept-Charset` request header is present" in {
Get() ~> complete(foo) ~> check {
responseEntity shouldEqual HttpEntity(`application/json`, foo.toJson.prettyPrint)
responseEntity shouldEqual HttpEntity(`application/json`, foo.toJson.compactPrint)
}
}
"reject JSON rendering if an `Accept-Charset` request header requests a non-UTF-8 encoding" in {

View file

@ -98,10 +98,7 @@ class RouteDirectivesSpec extends FreeSpec with GenericRoutingSpec {
import akka.http.scaladsl.model.headers.Accept
Get().withHeaders(Accept(MediaTypes.`application/json`)) ~> route ~> check {
responseAs[String] shouldEqual
"""{
| "name": "Ida",
| "age": 83
|}""".stripMarginWithNewline("\n")
"""{"name":"Ida","age":83}"""
}
Get().withHeaders(Accept(MediaTypes.`text/xml`)) ~> route ~> check {
responseAs[xml.NodeSeq] shouldEqual <data><name>Ida</name><age>83</age></data>

View file

@ -0,0 +1,149 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.javadsl.common
import akka.NotUsed
import akka.http.javadsl.model.{ ContentType, ContentTypeRange }
import akka.http.scaladsl.common
import akka.stream.javadsl.Flow
import akka.util.ByteString
/**
* Entity streaming support trait allowing rendering and receiving incoming ``Source[T, _]`` from HTTP entities.
*
* See [[JsonEntityStreamingSupport]] or [[CsvEntityStreamingSupport]] for default implementations.
*/
abstract class EntityStreamingSupport {

  /** Read-side, what content types it is able to frame and unmarshall. */
  def supported: ContentTypeRange

  /** Write-side, defines what Content-Type the Marshaller should offer and the final Content-Type of the response. */
  def contentType: ContentType

  /**
   * Read-side, decode incoming framed entity.
   * For example with an incoming JSON array, chunk it up into JSON objects contained within that array.
   */
  def getFramingDecoder: Flow[ByteString, ByteString, NotUsed]

  /**
   * Write-side, apply framing to outgoing entity stream.
   *
   * Most typical usage will be a variant of `Flow[ByteString].intersperse`.
   *
   * For example for rendering a JSON array one would return
   * `Flow[ByteString].intersperse(ByteString("["), ByteString(","), ByteString("]"))`
   * and for rendering a new-line separated CSV simply `Flow[ByteString].intersperse(ByteString("\n"))`.
   */
  def getFramingRenderer: Flow[ByteString, ByteString, NotUsed]

  /**
   * Read-side, allows changing what content types are accepted by this framing.
   *
   * EntityStreamingSupport traits MUST support re-configuring the accepted [[ContentTypeRange]].
   *
   * This is in order to support a-typical APIs which users still want to communicate with using
   * the provided support trait. Typical examples include APIs which return valid `application/json`
   * however advertise the content type as being `application/javascript` or vendor specific content types,
   * which still parse correctly as JSON, CSV or something else that a provided support trait is built for.
   *
   * NOTE: Implementations should specialize the return type to their own Type!
   */
  def withSupported(range: ContentTypeRange): EntityStreamingSupport

  /**
   * Write-side, defines what Content-Type the Marshaller should offer and the final Content-Type of the response.
   *
   * EntityStreamingSupport traits MUST support re-configuring the offered [[ContentType]].
   * This is due to the need of integrating with existing systems which sometimes expect custom Content-Types,
   * however really are just plain JSON or something else internally (perhaps with slight extensions).
   *
   * NOTE: Implementations should specialize the return type to their own Type!
   */
  def withContentType(range: ContentType): EntityStreamingSupport

  /**
   * Write-side / read-side, defines the parallelism with which (un)marshalling is performed.
   *
   * This may be beneficial when (un)marshalling is the bottleneck in the pipeline.
   *
   * See also [[parallelism]] and [[withParallelMarshalling]].
   */
  def parallelism: Int

  /**
   * Write-side / read-side, defines whether the ordering of (un)marshalled stream elements
   * may be relaxed (i.e. NOT preserved).
   *
   * Allowing for parallel and unordered (un)marshalling often yields higher throughput and also allows avoiding
   * head-of-line blocking if some elements are much larger than others.
   *
   * See also [[parallelism]] and [[withParallelMarshalling]].
   */
  def unordered: Boolean

  /**
   * Write-side / read-side, defines parallelism and if ordering should be preserved or not of Source element marshalling.
   *
   * Sometimes marshalling multiple elements at once (esp. when elements are not evenly sized, and ordering is not enforced)
   * may yield in higher throughput.
   *
   * NOTE: Implementations should specialize the return type to their own Type!
   */
  def withParallelMarshalling(parallelism: Int, unordered: Boolean): EntityStreamingSupport
}
/**
 * Entity streaming support, independent of the JSON parsing library used etc.
 * Factories for the default JSON and CSV streaming support implementations.
 */
object EntityStreamingSupport {

  /**
   * Default `application/json` entity streaming support.
   *
   * Provides framing (based on scanning the incoming dataBytes for valid JSON objects, so for example uploads using arrays or
   * new-line separated JSON objects are all parsed correctly) and rendering of Sources as JSON Arrays.
   * A different very popular style of returning streaming JSON is to separate JSON objects on a line-by-line basis,
   * you can configure the support trait to do so by calling `withFramingRendererFlow`.
   *
   * Limits the maximum JSON object length to 8KB, if you want to increase this limit provide a value explicitly.
   *
   * See also <a href="https://en.wikipedia.org/wiki/JSON_Streaming">https://en.wikipedia.org/wiki/JSON_Streaming</a>
   */
  def json(): JsonEntityStreamingSupport = json(8 * 1024)

  /**
   * Default `application/json` entity streaming support.
   *
   * Provides framing (based on scanning the incoming dataBytes for valid JSON objects, so for example uploads using arrays or
   * new-line separated JSON objects are all parsed correctly) and rendering of Sources as JSON Arrays.
   * A different very popular style of returning streaming JSON is to separate JSON objects on a line-by-line basis,
   * you can configure the support trait to do so by calling `withFramingRendererFlow`.
   *
   * See also <a href="https://en.wikipedia.org/wiki/JSON_Streaming">https://en.wikipedia.org/wiki/JSON_Streaming</a>
   */
  def json(maxObjectLength: Int): JsonEntityStreamingSupport = common.EntityStreamingSupport.json(maxObjectLength)

  /**
   * Default `text/csv(UTF-8)` entity streaming support.
   * Provides framing and rendering of `\n` separated lines and marshalling Sources into such values.
   *
   * Limits the maximum line-length to 8KB, if you want to increase this limit provide a value explicitly.
   */
  def csv(): CsvEntityStreamingSupport = csv(8 * 1024)

  /**
   * Default `text/csv(UTF-8)` entity streaming support.
   * Provides framing and rendering of `\n` separated lines and marshalling Sources into such values.
   */
  def csv(maxLineLength: Int): CsvEntityStreamingSupport = common.EntityStreamingSupport.csv(maxLineLength)
}
// Extends the Scala base class (not a trait) in order to get linearization right —
// a class is required here because the Scala side needs a companion object.
abstract class JsonEntityStreamingSupport extends common.EntityStreamingSupport {
  /** Replaces the write-side framing flow, e.g. to render new-line separated JSON instead of a JSON array. */
  def withFramingRendererFlow(flow: Flow[ByteString, ByteString, NotUsed]): JsonEntityStreamingSupport
}
// Extends the Scala base class (not a trait) in order to get linearization right —
// a class is required here because the Scala side needs a companion object.
abstract class CsvEntityStreamingSupport extends common.EntityStreamingSupport {
  /** Replaces the write-side framing flow, e.g. to use a different line separator than `\n`. */
  def withFramingRendererFlow(flow: Flow[ByteString, ByteString, NotUsed]): CsvEntityStreamingSupport
}

View file

@ -16,6 +16,7 @@ import akka.japi.Util
import akka.util.ByteString
import scala.concurrent.ExecutionContext
import scala.annotation.unchecked.uncheckedVariance
import scala.language.implicitConversions
object Marshaller {
@ -56,29 +57,29 @@ object Marshaller {
// TODO make sure these are actually usable in a sane way
def wrapEntity[A, C](f: function.BiFunction[ExecutionContext, C, A], m: Marshaller[A, RequestEntity], mediaType: MediaType): Marshaller[C, RequestEntity] = {
val scalaMarshaller = m.asScalaToEntityMarshaller
val scalaMarshaller = m.asScalaCastOutput
fromScala(scalaMarshaller.wrapWithEC(mediaType.asScala) { ctx c: C f(ctx, c) }(ContentTypeOverrider.forEntity))
}
def wrapEntity[A, C, E <: RequestEntity](f: function.Function[C, A], m: Marshaller[A, E], mediaType: MediaType): Marshaller[C, RequestEntity] = {
val scalaMarshaller = m.asScalaToEntityMarshaller
val scalaMarshaller = m.asScalaCastOutput
fromScala(scalaMarshaller.wrap(mediaType.asScala)((in: C) f.apply(in))(ContentTypeOverrider.forEntity))
}
def entityToOKResponse[A](m: Marshaller[A, _ <: RequestEntity]): Marshaller[A, HttpResponse] = {
fromScala(marshalling.Marshaller.fromToEntityMarshaller[A]()(m.asScalaToEntityMarshaller))
fromScala(marshalling.Marshaller.fromToEntityMarshaller[A]()(m.asScalaCastOutput))
}
def entityToResponse[A, R <: RequestEntity](status: StatusCode, m: Marshaller[A, R]): Marshaller[A, HttpResponse] = {
fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](status.asScala)(m.asScalaToEntityMarshaller))
fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](status.asScala)(m.asScalaCastOutput))
}
def entityToResponse[A](status: StatusCode, headers: java.lang.Iterable[HttpHeader], m: Marshaller[A, _ <: RequestEntity]): Marshaller[A, HttpResponse] = {
fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](status.asScala, Util.immutableSeq(headers).map(_.asScala))(m.asScalaToEntityMarshaller)) // TODO can we avoid the map() ?
fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](status.asScala, Util.immutableSeq(headers).map(_.asScala))(m.asScalaCastOutput)) // TODO can we avoid the map() ?
}
def entityToOKResponse[A](headers: java.lang.Iterable[HttpHeader], m: Marshaller[A, _ <: RequestEntity]): Marshaller[A, HttpResponse] = {
fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](headers = Util.immutableSeq(headers).map(_.asScala))(m.asScalaToEntityMarshaller)) // TODO avoid the map()
fromScala(marshalling.Marshaller.fromToEntityMarshaller[A](headers = Util.immutableSeq(headers).map(_.asScala))(m.asScalaCastOutput)) // TODO avoid the map()
}
// these are methods not varargs to avoid call site warning about unchecked type params
@ -140,13 +141,14 @@ object Marshaller {
m.asScala.map(_.asScala)
}
class Marshaller[A, B] private (implicit val asScala: marshalling.Marshaller[A, B]) {
class Marshaller[-A, +B] private (implicit val asScala: marshalling.Marshaller[A, B]) {
import Marshaller.fromScala
/** INTERNAL API: involves unsafe cast (however is very fast) */
// TODO would be nice to not need this special case
def asScalaToEntityMarshaller[C]: marshalling.Marshaller[A, C] = asScala.asInstanceOf[marshalling.Marshaller[A, C]]
private[akka] def asScalaCastOutput[C]: marshalling.Marshaller[A, C] = asScala.asInstanceOf[marshalling.Marshaller[A, C]]
def map[C](f: function.Function[B, C]): Marshaller[A, C] = fromScala(asScala.map(f.apply))
def map[C](f: function.Function[B @uncheckedVariance, C]): Marshaller[A, C] = fromScala(asScala.map(f.apply))
def compose[C](f: function.Function[C, A]): Marshaller[C, B] = fromScala(asScala.compose(f.apply))
def compose[C](f: function.Function[C, A @uncheckedVariance]): Marshaller[C, B] = fromScala(asScala.compose(f.apply))
}

View file

@ -5,11 +5,11 @@
package akka.http.javadsl.server
import akka.http.impl.util.JavaMapping
import akka.http.javadsl.server.directives.TimeoutDirectives
import akka.http.javadsl.server.directives.{ FramedEntityStreamingDirectives, TimeoutDirectives }
import scala.annotation.varargs
abstract class AllDirectives extends TimeoutDirectives
abstract class AllDirectives extends FramedEntityStreamingDirectives
/**
* INTERNAL API

View file

@ -8,9 +8,13 @@ import java.util.concurrent.CompletionStage
import akka.http.impl.util.JavaMapping._
import akka.http.impl.util._
import akka.http.javadsl.common.EntityStreamingSupport
import akka.http.{ javadsl, scaladsl }
import akka.http.scaladsl.server.{ directives sdirectives }
import akka.http.scaladsl.{ common scommon }
import akka.http.javadsl.server.{ directives jdirectives }
import akka.http.javadsl.{ common jcommon }
import scala.collection.immutable
/**
@ -43,6 +47,8 @@ private[http] object RoutingJavaMapping {
}
implicit object convertRouteResult extends Inherited[javadsl.server.RouteResult, scaladsl.server.RouteResult]
implicit object convertEntityStreamingSupport extends Inherited[EntityStreamingSupport, scommon.EntityStreamingSupport]
implicit object convertDirectoryRenderer extends Inherited[jdirectives.DirectoryRenderer, sdirectives.FileAndResourceDirectives.DirectoryRenderer]
implicit object convertContentTypeResolver extends Inherited[jdirectives.ContentTypeResolver, sdirectives.ContentTypeResolver]
implicit object convertDirectoryListing extends Inherited[jdirectives.DirectoryListing, sdirectives.DirectoryListing]

View file

@ -0,0 +1,61 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.javadsl.server.directives
import java.util.function.{ Function JFunction }
import java.util.{ List JList, Map JMap }
import akka.NotUsed
import akka.http.javadsl.common.EntityStreamingSupport
import akka.http.javadsl.marshalling.Marshaller
import akka.http.javadsl.model.{ HttpEntity, _ }
import akka.http.javadsl.server.Route
import akka.http.javadsl.unmarshalling.Unmarshaller
import akka.http.scaladsl.marshalling.{ Marshalling, ToByteStringMarshaller, ToResponseMarshallable }
import akka.http.scaladsl.server.{ Directives D }
import akka.stream.javadsl.Source
import akka.util.ByteString
/** EXPERIMENTAL API */
abstract class FramedEntityStreamingDirectives extends TimeoutDirectives {
  import akka.http.javadsl.server.RoutingJavaMapping._
  import akka.http.javadsl.server.RoutingJavaMapping.Implicits._

  /**
   * Reads the incoming request entity as a stream of `T`: frames the entity bytes using the given
   * streaming `support`, unmarshals each frame with `um`, and passes the resulting `Source` to `inner`.
   */
  @CorrespondsTo("asSourceOf")
  def entityAsSourceOf[T](um: Unmarshaller[ByteString, T], support: EntityStreamingSupport,
                          inner: java.util.function.Function[Source[T, NotUsed], Route]): Route = RouteAdapter {
    val umm = D.asSourceOf(um.asScala, support.asScala)
    // FIX: the function arrow was lost in transit; without it this does not parse.
    D.entity(umm) { (s: akka.stream.scaladsl.Source[T, NotUsed]) =>
      inner(s.asJava).delegate
    }
  }

  // implicits and multiple parameter lists used internally, Java caller does not benefit or use it
  @CorrespondsTo("complete")
  def completeWithSource[T, M](source: Source[T, M])(implicit m: Marshaller[T, ByteString], support: EntityStreamingSupport): Route = RouteAdapter {
    import akka.http.scaladsl.marshalling.PredefinedToResponseMarshallers._
    val mm = m.map(ByteStringAsEntityFn).asScalaCastOutput[akka.http.scaladsl.model.RequestEntity]
    val mmm = fromEntityStreamingSupportAndEntityMarshaller[T, M](support.asScala, mm)
    val response = ToResponseMarshallable(source.asScala)(mmm)
    D.complete(response)
  }

  // implicits and multiple parameter lists used internally, Java caller does not benefit or use it
  @CorrespondsTo("complete")
  def completeOKWithSource[T, M](source: Source[T, M])(implicit m: Marshaller[T, RequestEntity], support: EntityStreamingSupport): Route = RouteAdapter {
    import akka.http.scaladsl.marshalling.PredefinedToResponseMarshallers._
    // don't try this at home:
    val mm = m.asScalaCastOutput[akka.http.scaladsl.model.RequestEntity].map(_.httpEntity.asInstanceOf[akka.http.scaladsl.model.RequestEntity])
    implicit val mmm = fromEntityStreamingSupportAndEntityMarshaller[T, M](support.asScala, mm)
    val response = ToResponseMarshallable(source.asScala)
    D.complete(response)
  }

  // Adapts a ByteString-producing marshaller to an entity-producing one (used by completeWithSource above).
  private[this] val ByteStringAsEntityFn = new java.util.function.Function[ByteString, HttpEntity]() {
    override def apply(bs: ByteString): HttpEntity = HttpEntities.create(bs)
  }
}

object FramedEntityStreamingDirectives extends FramedEntityStreamingDirectives

View file

@ -23,6 +23,8 @@ abstract class FutureDirectives extends FormFieldDirectives {
/**
* "Unwraps" a `CompletionStage<T>` and runs the inner route after future
* completion with the future's value as an extraction of type `Try<T>`.
*
* @group future
*/
def onComplete[T](f: Supplier[CompletionStage[T]], inner: JFunction[Try[T], Route]) = RouteAdapter {
D.onComplete(f.get.toScala.recover(unwrapCompletionException)) { value
@ -30,6 +32,18 @@ abstract class FutureDirectives extends FormFieldDirectives {
}
}
/**
* "Unwraps" a `CompletionStage<T>` and runs the inner route after future
* completion with the future's value as an extraction of type `Try<T>`.
*
* @group future
*/
def onComplete[T](cs: CompletionStage[T], inner: JFunction[Try[T], Route]) = RouteAdapter {
D.onComplete(cs.toScala.recover(unwrapCompletionException)) { value
inner(value).delegate
}
}
/**
* "Unwraps" a `CompletionStage[T]` and runs the inner route after future
* completion with the future's value as an extraction of type `T` if
@ -51,6 +65,8 @@ abstract class FutureDirectives extends FormFieldDirectives {
* completion with the stage's value as an extraction of type `T`.
* If the stage fails its failure Throwable is bubbled up to the nearest
* ExceptionHandler.
*
* @group future
*/
def onSuccess[T](f: Supplier[CompletionStage[T]], inner: JFunction[T, Route]) = RouteAdapter {
D.onSuccess(f.get.toScala.recover(unwrapCompletionException)) { value
@ -64,6 +80,8 @@ abstract class FutureDirectives extends FormFieldDirectives {
* If the completion stage succeeds the request is completed using the values marshaller
* (This directive therefore requires a marshaller for the completion stage value type to be
* provided.)
*
* @group future
*/
def completeOrRecoverWith[T](f: Supplier[CompletionStage[T]], marshaller: Marshaller[T, RequestEntity], inner: JFunction[Throwable, Route]): Route = RouteAdapter {
val magnet = CompleteOrRecoverWithMagnet(f.get.toScala)(Marshaller.asScalaEntityMarshaller(marshaller))

View file

@ -101,6 +101,9 @@ abstract class Unmarshaller[-A, B] extends UnmarshallerBase[A, B] {
implicit def asScala: akka.http.scaladsl.unmarshalling.Unmarshaller[A, B]
/** INTERNAL API */
private[akka] def asScalaCastInput[I]: unmarshalling.Unmarshaller[I, B] = asScala.asInstanceOf[unmarshalling.Unmarshaller[I, B]]
def unmarshall(a: A, ec: ExecutionContext, mat: Materializer): CompletionStage[B] = asScala.apply(a)(ec, mat).toJava
/**

View file

@ -0,0 +1,48 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.scaladsl.common
import akka.NotUsed
import akka.event.Logging
import akka.http.javadsl.{ common, model jm }
import akka.http.scaladsl.model.{ ContentType, ContentTypeRange, ContentTypes }
import akka.stream.scaladsl.{ Flow, Framing }
import akka.util.ByteString
/**
 * `text/csv(UTF-8)` entity streaming support: frames incoming entity bytes on `\n`
 * (via `Framing.delimiter`) and renders outgoing element streams as `\n`-separated lines
 * (via `intersperse`).
 *
 * Instances are immutable; every `withX` method returns a reconfigured copy.
 */
final class CsvEntityStreamingSupport private[akka] (
  maxLineLength: Int,
  val supported: ContentTypeRange,
  val contentType: ContentType,
  val framingRenderer: Flow[ByteString, ByteString, NotUsed],
  val parallelism: Int,
  val unordered: Boolean
) extends common.CsvEntityStreamingSupport {
  import akka.http.impl.util.JavaMapping.Implicits._

  // NOTE(review): this parameter is named `maxObjectSize` (apparently copy-pasted from the
  // JSON variant) but is used as the maximum line length — consider renaming.
  def this(maxObjectSize: Int) =
    this(
      maxObjectSize,
      ContentTypeRange(ContentTypes.`text/csv(UTF-8)`),
      ContentTypes.`text/csv(UTF-8)`,
      Flow[ByteString].intersperse(ByteString("\n")),
      1, false) // parallelism = 1 (sequential), ordered marshalling by default

  // Read-side: split incoming bytes into `\n`-delimited frames of at most `maxLineLength` bytes.
  override val framingDecoder: Flow[ByteString, ByteString, NotUsed] =
    Framing.delimiter(ByteString("\n"), maxLineLength)

  // Java DSL adapter: delegates to the Scala DSL `withFramingRenderer`.
  override def withFramingRendererFlow(framingRendererFlow: akka.stream.javadsl.Flow[ByteString, ByteString, NotUsed]): CsvEntityStreamingSupport =
    withFramingRenderer(framingRendererFlow.asScala)
  def withFramingRenderer(framingRendererFlow: Flow[ByteString, ByteString, NotUsed]): CsvEntityStreamingSupport =
    new CsvEntityStreamingSupport(maxLineLength, supported, contentType, framingRendererFlow, parallelism, unordered)

  // Each `withX` below copies the instance, replacing exactly one configuration value.
  override def withContentType(ct: jm.ContentType): CsvEntityStreamingSupport =
    new CsvEntityStreamingSupport(maxLineLength, supported, ct.asScala, framingRenderer, parallelism, unordered)
  override def withSupported(range: jm.ContentTypeRange): CsvEntityStreamingSupport =
    new CsvEntityStreamingSupport(maxLineLength, range.asScala, contentType, framingRenderer, parallelism, unordered)
  override def withParallelMarshalling(parallelism: Int, unordered: Boolean): CsvEntityStreamingSupport =
    new CsvEntityStreamingSupport(maxLineLength, supported, contentType, framingRenderer, parallelism, unordered)

  override def toString = s"""${Logging.simpleName(getClass)}($maxLineLength, $supported, $contentType)"""
}

View file

@ -0,0 +1,141 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.scaladsl.common
import akka.NotUsed
import akka.http.javadsl.{ common, model jm }
import akka.http.scaladsl.model._
import akka.stream.scaladsl.Flow
import akka.util.ByteString
/**
 * Entity streaming support trait allowing rendering and receiving incoming ``Source[T, _]`` from HTTP entities.
 *
 * See [[JsonEntityStreamingSupport]] or [[CsvEntityStreamingSupport]] for default implementations.
 */
abstract class EntityStreamingSupport extends common.EntityStreamingSupport {

  /** Read-side, what content types it is able to frame and unmarshall. */
  def supported: ContentTypeRange

  /** Write-side, defines what Content-Type the Marshaller should offer and the final Content-Type of the response. */
  def contentType: ContentType

  /**
   * Read-side, decode incoming framed entity.
   * For example with an incoming JSON array, chunk it up into JSON objects contained within that array.
   */
  def framingDecoder: Flow[ByteString, ByteString, NotUsed]

  // Java DSL view of `framingDecoder`.
  override final def getFramingDecoder = framingDecoder.asJava

  /**
   * Write-side, apply framing to outgoing entity stream.
   *
   * Most typical usage will be a variant of `Flow[ByteString].intersperse`.
   *
   * For example for rendering a JSON array one would return
   * `Flow[ByteString].intersperse(ByteString("["), ByteString(","), ByteString("]"))`
   * and for rendering a new-line separated CSV simply `Flow[ByteString].intersperse(ByteString("\n"))`.
   */
  def framingRenderer: Flow[ByteString, ByteString, NotUsed]

  // Java DSL view of `framingRenderer`.
  override final def getFramingRenderer = framingRenderer.asJava

  /**
   * Read-side, allows changing what content types are accepted by this framing.
   *
   * EntityStreamingSupport traits MUST support re-configuring the accepted [[ContentTypeRange]].
   *
   * This is in order to support atypical APIs which users still want to communicate with using
   * the provided support trait. Typical examples include APIs which return valid `application/json`
   * however advertise the content type as being `application/javascript` or vendor specific content types,
   * which still parse correctly as JSON, CSV or something else that a provided support trait is built for.
   *
   * NOTE: Implementations should specialize the return type to their own Type!
   */
  override def withSupported(range: jm.ContentTypeRange): EntityStreamingSupport

  /**
   * Write-side, defines what Content-Type the Marshaller should offer and the final Content-Type of the response.
   *
   * EntityStreamingSupport traits MUST support re-configuring the offered [[ContentType]].
   * This is due to the need of integrating with existing systems which sometimes expect custom Content-Types,
   * however really are just plain JSON or something else internally (perhaps with slight extensions).
   *
   * NOTE: Implementations should specialize the return type to their own Type!
   */
  override def withContentType(range: jm.ContentType): EntityStreamingSupport

  /**
   * Write-side / read-side, defines if (un)marshalling should be done in parallel.
   *
   * This may be beneficial when marshalling is the bottleneck in the pipeline.
   *
   * See also [[parallelism]] and [[withParallelMarshalling]].
   */
  def parallelism: Int

  /**
   * Write-side / read-side, defines whether the ordering of incoming stream elements should be
   * preserved during parallel (un)marshalling.
   *
   * Allowing for parallel and unordered (un)marshalling often yields higher throughput and also allows avoiding
   * head-of-line blocking if some elements are much larger than others.
   *
   * See also [[parallelism]] and [[withParallelMarshalling]].
   */
  def unordered: Boolean

  /**
   * Write-side / read-side, defines parallelism and if ordering should be preserved or not of Source element marshalling.
   *
   * Sometimes marshalling multiple elements at once (esp. when elements are not evenly sized, and ordering is not enforced)
   * may yield higher throughput.
   *
   * NOTE: Implementations should specialize the return type to their own Type!
   */
  def withParallelMarshalling(parallelism: Int, unordered: Boolean): EntityStreamingSupport
}
/**
 * Entity streaming support factories, independent of the JSON parsing library used, etc.
 */
object EntityStreamingSupport {

  // Default upper bound (8 KiB) on a single framed JSON object / CSV line,
  // shared by the no-argument factories below.
  private final val DefaultMaxFrameLength = 8 * 1024

  /**
   * Default `application/json` entity streaming support.
   *
   * Provides framing (based on scanning the incoming dataBytes for valid JSON objects, so for example uploads using arrays or
   * new-line separated JSON objects are all parsed correctly) and rendering of Sources as JSON Arrays.
   * A different very popular style of returning streaming JSON is to separate JSON objects on a line-by-line basis,
   * you can configure the support trait to do so by calling `withFramingRendererFlow`.
   *
   * Limits the maximum JSON object length to 8KB, if you want to increase this limit provide a value explicitly.
   *
   * See also <a href="https://en.wikipedia.org/wiki/JSON_Streaming">https://en.wikipedia.org/wiki/JSON_Streaming</a>
   */
  def json(): JsonEntityStreamingSupport = json(DefaultMaxFrameLength)

  /**
   * Default `application/json` entity streaming support with an explicit maximum JSON object length.
   *
   * Provides framing (based on scanning the incoming dataBytes for valid JSON objects) and rendering
   * of Sources as JSON Arrays; configure line-by-line rendering via `withFramingRendererFlow`.
   *
   * See also <a href="https://en.wikipedia.org/wiki/JSON_Streaming">https://en.wikipedia.org/wiki/JSON_Streaming</a>
   */
  def json(maxObjectLength: Int): JsonEntityStreamingSupport = new JsonEntityStreamingSupport(maxObjectLength)

  /**
   * Default `text/csv(UTF-8)` entity streaming support.
   * Provides framing and rendering of `\n` separated lines and marshalling Sources into such values.
   *
   * Limits the maximum line-length to 8KB, if you want to increase this limit provide a value explicitly.
   */
  def csv(): CsvEntityStreamingSupport = csv(DefaultMaxFrameLength)

  /**
   * Default `text/csv(UTF-8)` entity streaming support with an explicit maximum line length.
   * Provides framing and rendering of `\n` separated lines and marshalling Sources into such values.
   */
  def csv(maxLineLength: Int): CsvEntityStreamingSupport = new CsvEntityStreamingSupport(maxLineLength)
}

View file

@ -0,0 +1,49 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.scaladsl.common
import akka.NotUsed
import akka.event.Logging
import akka.http.javadsl.{ common, model jm }
import akka.http.scaladsl.model.{ ContentType, ContentTypeRange, ContentTypes }
import akka.stream.scaladsl.Flow
import akka.util.ByteString
/**
 * `application/json` entity streaming support: frames incoming entity bytes by scanning for
 * complete JSON objects (`JsonFraming.objectScanner`) and renders outgoing element streams
 * as a JSON array (`[` / `,` / `]` via `intersperse`).
 *
 * Instances are immutable; every `withX` method returns a reconfigured copy.
 */
final class JsonEntityStreamingSupport private[akka] (
  maxObjectSize: Int,
  val supported: ContentTypeRange,
  val contentType: ContentType,
  val framingRenderer: Flow[ByteString, ByteString, NotUsed],
  val parallelism: Int,
  val unordered: Boolean
) extends common.JsonEntityStreamingSupport {
  import akka.http.impl.util.JavaMapping.Implicits._

  def this(maxObjectSize: Int) =
    this(
      maxObjectSize,
      ContentTypeRange(ContentTypes.`application/json`),
      ContentTypes.`application/json`,
      Flow[ByteString].intersperse(ByteString("["), ByteString(","), ByteString("]")),
      1, false) // parallelism = 1 (sequential), ordered marshalling by default

  // Read-side: chunk incoming bytes into individual JSON objects of at most `maxObjectSize` bytes.
  override val framingDecoder: Flow[ByteString, ByteString, NotUsed] =
    akka.stream.scaladsl.JsonFraming.objectScanner(maxObjectSize)

  // Java DSL adapter: delegates to the Scala DSL `withFramingRenderer`.
  override def withFramingRendererFlow(framingRendererFlow: akka.stream.javadsl.Flow[ByteString, ByteString, NotUsed]): JsonEntityStreamingSupport =
    withFramingRenderer(framingRendererFlow.asScala)
  def withFramingRenderer(framingRendererFlow: Flow[ByteString, ByteString, NotUsed]): JsonEntityStreamingSupport =
    new JsonEntityStreamingSupport(maxObjectSize, supported, contentType, framingRendererFlow, parallelism, unordered)

  // Each `withX` below copies the instance, replacing exactly one configuration value.
  override def withContentType(ct: jm.ContentType): JsonEntityStreamingSupport =
    new JsonEntityStreamingSupport(maxObjectSize, supported, ct.asScala, framingRenderer, parallelism, unordered)
  override def withSupported(range: jm.ContentTypeRange): JsonEntityStreamingSupport =
    new JsonEntityStreamingSupport(maxObjectSize, range.asScala, contentType, framingRenderer, parallelism, unordered)
  override def withParallelMarshalling(parallelism: Int, unordered: Boolean): JsonEntityStreamingSupport =
    new JsonEntityStreamingSupport(maxObjectSize, supported, contentType, framingRenderer, parallelism, unordered)

  override def toString = s"""${Logging.simpleName(getClass)}($maxObjectSize, $supported, $contentType)"""
}

View file

@ -61,8 +61,9 @@ object StrictForm {
fsu(value.entity.data.decodeString(charsetName))
})
@implicitNotFound("In order to unmarshal a `StrictForm.Field` to type `${T}` you need to supply a " +
"`FromStringUnmarshaller[${T}]` and/or a `FromEntityUnmarshaller[${T}]`")
@implicitNotFound(msg =
s"In order to unmarshal a `StrictForm.Field` to type `$${T}` you need to supply a " +
s"`FromStringUnmarshaller[$${T}]` and/or a `FromEntityUnmarshaller[$${T}]`")
sealed trait FieldUnmarshaller[T] {
def unmarshalString(value: String)(implicit ec: ExecutionContext, mat: Materializer): Future[T]
def unmarshalPart(value: Multipart.FormData.BodyPart.Strict)(implicit ec: ExecutionContext, mat: Materializer): Future[T]

View file

@ -4,7 +4,9 @@
package akka.http.scaladsl.marshalling
import scala.concurrent.{ Future, ExecutionContext }
import akka.http.scaladsl.marshalling.Marshalling.Opaque
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.control.NonFatal
import akka.http.scaladsl.model._
import akka.http.scaladsl.util.FastFuture
@ -174,4 +176,4 @@ object Marshalling {
def map[B](f: A B): Opaque[B] = copy(marshal = () f(marshal()))
}
}
//#
//#

View file

@ -4,12 +4,19 @@
package akka.http.scaladsl.marshalling
import akka.http.scaladsl.common.EntityStreamingSupport
import akka.stream.impl.ConstantFun
import scala.collection.immutable
import akka.http.scaladsl.util.FastFuture._
import akka.http.scaladsl.model.MediaTypes._
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.ContentNegotiator
import akka.http.scaladsl.util.FastFuture
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.language.higherKinds
trait PredefinedToResponseMarshallers extends LowPriorityToResponseMarshallerImplicits {
@ -41,6 +48,37 @@ trait PredefinedToResponseMarshallers extends LowPriorityToResponseMarshallerImp
Marshaller(implicit ec {
case (status, headers, value) mt(value).fast map (_ map (_ map (HttpResponse(status, headers, _))))
})
implicit def fromEntityStreamingSupportAndByteStringMarshaller[T, M](implicit s: EntityStreamingSupport, m: ToByteStringMarshaller[T]): ToResponseMarshaller[Source[T, M]] = {
Marshaller[Source[T, M], HttpResponse] { implicit ec source
FastFuture successful {
Marshalling.WithFixedContentType(s.contentType, () {
val availableMarshallingsPerElement = source.mapAsync(1) { t m(t)(ec) }
// TODO optimise such that we pick the optimal marshalling only once (headAndTail needed?)
// TODO, NOTE: this is somewhat duplicated from Marshal.scala it could be made DRYer
val bestMarshallingPerElement = availableMarshallingsPerElement mapConcat { marshallings
// pick the Marshalling that matches our EntityStreamingSupport
(s.contentType match {
case best @ (_: ContentType.Binary | _: ContentType.WithFixedCharset)
marshallings collectFirst { case Marshalling.WithFixedContentType(`best`, marshal) marshal }
case best @ ContentType.WithCharset(bestMT, bestCS)
marshallings collectFirst {
case Marshalling.WithFixedContentType(`best`, marshal) marshal
case Marshalling.WithOpenCharset(`bestMT`, marshal) () marshal(bestCS)
}
}).toList
}
val marshalledElements: Source[ByteString, M] =
bestMarshallingPerElement.map(_.apply()) // marshal!
.via(s.framingRenderer)
HttpResponse(entity = HttpEntity(s.contentType, marshalledElements))
}) :: Nil
}
}
}
}
trait LowPriorityToResponseMarshallerImplicits {
@ -48,6 +86,40 @@ trait LowPriorityToResponseMarshallerImplicits {
liftMarshaller(m)
implicit def liftMarshaller[T](implicit m: ToEntityMarshaller[T]): ToResponseMarshaller[T] =
PredefinedToResponseMarshallers.fromToEntityMarshaller()
// FIXME deduplicate this!!!
implicit def fromEntityStreamingSupportAndEntityMarshaller[T, M](implicit s: EntityStreamingSupport, m: ToEntityMarshaller[T]): ToResponseMarshaller[Source[T, M]] = {
Marshaller[Source[T, M], HttpResponse] { implicit ec source
FastFuture successful {
Marshalling.WithFixedContentType(s.contentType, () {
val availableMarshallingsPerElement = source.mapAsync(1) { t m(t)(ec) }
// TODO optimise such that we pick the optimal marshalling only once (headAndTail needed?)
// TODO, NOTE: this is somewhat duplicated from Marshal.scala it could be made DRYer
val bestMarshallingPerElement = availableMarshallingsPerElement mapConcat { marshallings
// pick the Marshalling that matches our EntityStreamingSupport
(s.contentType match {
case best @ (_: ContentType.Binary | _: ContentType.WithFixedCharset)
marshallings collectFirst { case Marshalling.WithFixedContentType(`best`, marshal) marshal }
case best @ ContentType.WithCharset(bestMT, bestCS)
marshallings collectFirst {
case Marshalling.WithFixedContentType(`best`, marshal) marshal
case Marshalling.WithOpenCharset(`bestMT`, marshal) () marshal(bestCS)
}
}).toList
}
val marshalledElements: Source[ByteString, M] =
bestMarshallingPerElement.map(_.apply()) // marshal!
.flatMapConcat(_.dataBytes) // extract raw dataBytes
.via(s.framingRenderer)
HttpResponse(entity = HttpEntity(s.contentType, marshalledElements))
}) :: Nil
}
}
}
}
/** Default importable instance of [[PredefinedToResponseMarshallers]]. */
object PredefinedToResponseMarshallers extends PredefinedToResponseMarshallers

View file

@ -6,10 +6,12 @@ package akka.http.scaladsl
import scala.collection.immutable
import akka.http.scaladsl.model._
import akka.util.ByteString
package object marshalling {
//# marshaller-aliases
type ToEntityMarshaller[T] = Marshaller[T, MessageEntity]
type ToByteStringMarshaller[T] = Marshaller[T, ByteString]
type ToHeadersAndEntityMarshaller[T] = Marshaller[T, (immutable.Seq[HttpHeader], MessageEntity)]
type ToResponseMarshaller[T] = Marshaller[T, HttpResponse]
type ToRequestMarshaller[T] = Marshaller[T, HttpRequest]

View file

@ -36,5 +36,6 @@ trait Directives extends RouteConcatenation
with SchemeDirectives
with SecurityDirectives
with WebSocketDirectives
with FramedEntityStreamingDirectives
object Directives extends Directives

View file

@ -0,0 +1,82 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http.scaladsl.server.directives
import akka.NotUsed
import akka.http.scaladsl.common
import akka.http.scaladsl.common.EntityStreamingSupport
import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.{ Unmarshaller, _ }
import akka.http.scaladsl.util.FastFuture
import akka.stream.scaladsl.{ Flow, Keep, Source }
import akka.util.ByteString
import scala.language.implicitConversions
/**
* Allows the [[MarshallingDirectives.entity]] directive to extract a [[Source]] of elements.
*
* See [[common.EntityStreamingSupport]] for useful default framing `Flow` instances and
* support traits such as `SprayJsonSupport` (or your other favourite JSON library) to provide the needed [[Marshaller]] s.
*/
trait FramedEntityStreamingDirectives extends MarshallingDirectives {

  type RequestToSourceUnmarshaller[T] = FromRequestUnmarshaller[Source[T, NotUsed]]

  /**
   * Extracts entity as [[Source]] of elements of type `T`.
   * This is achieved by applying the implicitly provided (in the following order):
   *
   * - 1st: chunk-up the incoming [[ByteString]]s by applying the `Content-Type`-aware framing
   * - 2nd: apply the [[Unmarshaller]] (from [[ByteString]] to `T`) for each of the respective "chunks" (e.g. for each JSON element contained within an array).
   *
   * The request will be rejected with an [[akka.http.scaladsl.server.UnsupportedRequestContentTypeRejection]] if
   * its [[ContentType]] is not supported by the used `framing` or `unmarshaller`.
   *
   * Cancelling extracted [[Source]] closes the connection abruptly (same as cancelling the `entity.dataBytes`).
   *
   * See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route.
   * By default the uploaded data is limited by the `akka.http.parsing.max-content-length`.
   */
  final def asSourceOf[T](implicit um: FromByteStringUnmarshaller[T], support: EntityStreamingSupport): RequestToSourceUnmarshaller[T] =
    asSourceOfInternal(um, support)

  /**
   * Variant of `asSourceOf` taking the [[EntityStreamingSupport]] explicitly instead of implicitly;
   * otherwise identical in behavior (see the overload above for the full contract).
   */
  final def asSourceOf[T](support: EntityStreamingSupport)(implicit um: FromByteStringUnmarshaller[T]): RequestToSourceUnmarshaller[T] =
    asSourceOfInternal(um, support)

  // Shared implementation: frame the entity's bytes, then unmarshal each frame into a `T`.
  // NOTE(review): the three `⇒` arrows of the curried lambda below were missing from the
  // received source (extraction artifact); reconstructed as ASCII `=>` — behavior unchanged.
  private final def asSourceOfInternal[T](um: Unmarshaller[ByteString, T], support: EntityStreamingSupport): RequestToSourceUnmarshaller[T] =
    Unmarshaller.withMaterializer[HttpRequest, Source[T, NotUsed]] { implicit ec => implicit mat => req =>
      val entity = req.entity
      if (support.supported.matches(entity.contentType)) {
        val bytes = entity.dataBytes
        val frames = bytes.via(support.framingDecoder)
        // Unmarshal frames with the configured parallelism; unordered mode trades
        // element ordering for throughput.
        val marshalling =
          if (support.unordered) Flow[ByteString].mapAsyncUnordered(support.parallelism)(bs => um(bs)(ec, mat))
          else Flow[ByteString].mapAsync(support.parallelism)(bs => um(bs)(ec, mat))
        val elements = frames.viaMat(marshalling)(Keep.right)
        FastFuture.successful(elements)
      } else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(support.supported))
    }
}

View file

@ -6,9 +6,15 @@ package akka.http.scaladsl.unmarshalling
import scala.collection.immutable
import akka.http.scaladsl.util.FastFuture
import akka.util.ByteString
trait PredefinedFromStringUnmarshallers {
implicit def _fromStringUnmarshallerFromByteStringUnmarshaller[T](implicit bsum: FromByteStringUnmarshaller[T]): Unmarshaller[String, T] = {
val bs = Unmarshaller.strict[String, ByteString](s ByteString(s))
bs.flatMap(implicit ec implicit mat bsum(_))
}
implicit val byteFromStringUnmarshaller: Unmarshaller[String, Byte] =
numberUnmarshaller(_.toByte, "8-bit signed integer")

View file

@ -6,6 +6,7 @@ package akka.http.scaladsl
import akka.http.scaladsl.common.StrictForm
import akka.http.scaladsl.model._
import akka.util.ByteString
package object unmarshalling {
//# unmarshaller-aliases
@ -13,6 +14,7 @@ package object unmarshalling {
type FromMessageUnmarshaller[T] = Unmarshaller[HttpMessage, T]
type FromResponseUnmarshaller[T] = Unmarshaller[HttpResponse, T]
type FromRequestUnmarshaller[T] = Unmarshaller[HttpRequest, T]
type FromByteStringUnmarshaller[T] = Unmarshaller[ByteString, T]
type FromStringUnmarshaller[T] = Unmarshaller[String, T]
type FromStrictFormFieldUnmarshaller[T] = Unmarshaller[StrictForm.Field, T]
//#

View file

@ -0,0 +1,440 @@
/*
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.ActorMaterializer
import akka.stream.impl.JsonObjectParser
import akka.stream.scaladsl.Framing.FramingException
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.AkkaSpec
import akka.util.ByteString
import scala.collection.immutable.Seq
import scala.concurrent.Await
import scala.concurrent.duration._
class JsonFramingSpec extends AkkaSpec {
implicit val mat = ActorMaterializer()
"collecting multiple json" should {
"parse json array" in {
val input =
"""
|[
| { "name" : "john" },
| { "name" : "Ég get etið gler án þess meiða mig" },
| { "name" : "jack" },
|]
|""".stripMargin // also should complete once notices end of array
val result = Source.single(ByteString(input))
.via(JsonFraming.objectScanner(Int.MaxValue))
.runFold(Seq.empty[String]) {
case (acc, entry) acc ++ Seq(entry.utf8String)
}
result.futureValue shouldBe Seq(
"""{ "name" : "john" }""",
"""{ "name" : "Ég get etið gler án þess að meiða mig" }""",
"""{ "name" : "jack" }"""
)
}
"emit single json element from string" in {
val input =
"""| { "name": "john" }
| { "name": "jack" }
""".stripMargin
val result = Source.single(ByteString(input))
.via(JsonFraming.objectScanner(Int.MaxValue))
.take(1)
.runFold(Seq.empty[String]) {
case (acc, entry) acc ++ Seq(entry.utf8String)
}
Await.result(result, 3.seconds) shouldBe Seq("""{ "name": "john" }""")
}
"parse line delimited" in {
val input =
"""| { "name": "john" }
| { "name": "jack" }
| { "name": "katie" }
""".stripMargin
val result = Source.single(ByteString(input))
.via(JsonFraming.objectScanner(Int.MaxValue))
.runFold(Seq.empty[String]) {
case (acc, entry) acc ++ Seq(entry.utf8String)
}
Await.result(result, 3.seconds) shouldBe Seq(
"""{ "name": "john" }""",
"""{ "name": "jack" }""",
"""{ "name": "katie" }""")
}
"parse comma delimited" in {
val input =
""" { "name": "john" }, { "name": "jack" }, { "name": "katie" } """
val result = Source.single(ByteString(input))
.via(JsonFraming.objectScanner(Int.MaxValue))
.runFold(Seq.empty[String]) {
case (acc, entry) acc ++ Seq(entry.utf8String)
}
result.futureValue shouldBe Seq(
"""{ "name": "john" }""",
"""{ "name": "jack" }""",
"""{ "name": "katie" }""")
}
"parse chunks successfully" in {
val input: Seq[ByteString] = Seq(
"""
|[
| { "name": "john"""".stripMargin,
"""
|},
""".stripMargin,
"""{ "na""",
"""me": "jack""",
""""}]"""").map(ByteString(_))
val result = Source.apply(input)
.via(JsonFraming.objectScanner(Int.MaxValue))
.runFold(Seq.empty[String]) {
case (acc, entry) acc ++ Seq(entry.utf8String)
}
result.futureValue shouldBe Seq(
"""{ "name": "john"
|}""".stripMargin,
"""{ "name": "jack"}""")
}
}
"collecting json buffer" when {
"nothing is supplied" should {
"return nothing" in {
val buffer = new JsonObjectParser()
buffer.poll() should ===(None)
}
}
"valid json is supplied" which {
"has one object" should {
"successfully parse empty object" in {
val buffer = new JsonObjectParser()
buffer.offer(ByteString("""{}"""))
buffer.poll().get.utf8String shouldBe """{}"""
}
"successfully parse single field having string value" in {
val buffer = new JsonObjectParser()
buffer.offer(ByteString("""{ "name": "john"}"""))
buffer.poll().get.utf8String shouldBe """{ "name": "john"}"""
}
"successfully parse single field having string value containing space" in {
val buffer = new JsonObjectParser()
buffer.offer(ByteString("""{ "name": "john doe"}"""))
buffer.poll().get.utf8String shouldBe """{ "name": "john doe"}"""
}
"successfully parse single field having string value containing curly brace" in {
val buffer = new JsonObjectParser()
buffer.offer(ByteString("""{ "name": "john{"""))
buffer.offer(ByteString("}"))
buffer.offer(ByteString("\""))
buffer.offer(ByteString("}"))
buffer.poll().get.utf8String shouldBe """{ "name": "john{}"}"""
}
"successfully parse single field having string value containing curly brace and escape character" in {
val buffer = new JsonObjectParser()
buffer.offer(ByteString("""{ "name": "john"""))
buffer.offer(ByteString("\\\""))
buffer.offer(ByteString("{"))
buffer.offer(ByteString("}"))
buffer.offer(ByteString("\\\""))
buffer.offer(ByteString(" "))
buffer.offer(ByteString("hey"))
buffer.offer(ByteString("\""))
buffer.offer(ByteString("}"))
buffer.poll().get.utf8String shouldBe """{ "name": "john\"{}\" hey"}"""
}
"successfully parse single field having integer value" in {
val buffer = new JsonObjectParser()
buffer.offer(ByteString("""{ "age": 101}"""))
buffer.poll().get.utf8String shouldBe """{ "age": 101}"""
}
"successfully parse single field having decimal value" in {
val buffer = new JsonObjectParser()
buffer.offer(ByteString("""{ "age": 101}"""))
buffer.poll().get.utf8String shouldBe """{ "age": 101}"""
}
"successfully parse single field having nested object" in {
val buffer = new JsonObjectParser()
buffer.offer(ByteString(
"""
|{ "name": "john",
| "age": 101,
| "address": {
| "street": "Straight Street",
| "postcode": 1234
| }
|}
| """.stripMargin))
buffer.poll().get.utf8String shouldBe """{ "name": "john",
| "age": 101,
| "address": {
| "street": "Straight Street",
| "postcode": 1234
| }
|}""".stripMargin
}
"successfully parse single field having multiple level of nested object" in {
val buffer = new JsonObjectParser()
buffer.offer(ByteString(
"""
|{ "name": "john",
| "age": 101,
| "address": {
| "street": {
| "name": "Straight",
| "type": "Avenue"
| },
| "postcode": 1234
| }
|}
| """.stripMargin))
buffer.poll().get.utf8String shouldBe """{ "name": "john",
| "age": 101,
| "address": {
| "street": {
| "name": "Straight",
| "type": "Avenue"
| },
| "postcode": 1234
| }
|}""".stripMargin
}
}
"has nested array" should {
"successfully parse" in {
val buffer = new JsonObjectParser()
buffer.offer(ByteString(
"""
|{ "name": "john",
| "things": [
| 1,
| "hey",
| 3,
| "there"
| ]
|}
| """.stripMargin))
buffer.poll().get.utf8String shouldBe """{ "name": "john",
| "things": [
| 1,
| "hey",
| 3,
| "there"
| ]
|}""".stripMargin
}
}
"has complex object graph" should {
"successfully parse" in {
val buffer = new JsonObjectParser()
buffer.offer(ByteString(
"""
|{
| "name": "john",
| "addresses": [
| {
| "street": "3 Hopson Street",
| "postcode": "ABC-123",
| "tags": ["work", "office"],
| "contactTime": [
| {"time": "0900-1800", "timezone", "UTC"}
| ]
| },
| {
| "street": "12 Adielie Road",
| "postcode": "ZZY-888",
| "tags": ["home"],
| "contactTime": [
| {"time": "0800-0830", "timezone", "UTC"},
| {"time": "1800-2000", "timezone", "UTC"}
| ]
| }
| ]
|}
| """.stripMargin))
buffer.poll().get.utf8String shouldBe """{
| "name": "john",
| "addresses": [
| {
| "street": "3 Hopson Street",
| "postcode": "ABC-123",
| "tags": ["work", "office"],
| "contactTime": [
| {"time": "0900-1800", "timezone", "UTC"}
| ]
| },
| {
| "street": "12 Adielie Road",
| "postcode": "ZZY-888",
| "tags": ["home"],
| "contactTime": [
| {"time": "0800-0830", "timezone", "UTC"},
| {"time": "1800-2000", "timezone", "UTC"}
| ]
| }
| ]
|}""".stripMargin
}
}
"has multiple fields" should {
"parse successfully" in {
val buffer = new JsonObjectParser()
buffer.offer(ByteString("""{ "name": "john", "age": 101}"""))
buffer.poll().get.utf8String shouldBe """{ "name": "john", "age": 101}"""
}
"parse successfully despite valid whitespaces around json" in {
val buffer = new JsonObjectParser()
buffer.offer(ByteString(
"""
|
|
|{"name": "john"
|, "age": 101}""".stripMargin))
buffer.poll().get.utf8String shouldBe
"""{"name": "john"
|, "age": 101}""".stripMargin
}
}
"has multiple objects" should {
"pops the right object as buffer is filled" in {
val input =
"""
| {
| "name": "john",
| "age": 32
| },
| {
| "name": "katie",
| "age": 25
| }
""".stripMargin
val buffer = new JsonObjectParser()
buffer.offer(ByteString(input))
buffer.poll().get.utf8String shouldBe
"""{
| "name": "john",
| "age": 32
| }""".stripMargin
buffer.poll().get.utf8String shouldBe
"""{
| "name": "katie",
| "age": 25
| }""".stripMargin
buffer.poll() should ===(None)
buffer.offer(ByteString("""{"name":"jenkins","age": """))
buffer.poll() should ===(None)
buffer.offer(ByteString("65 }"))
buffer.poll().get.utf8String shouldBe """{"name":"jenkins","age": 65 }"""
}
}
"returns none until valid json is encountered" in {
val buffer = new JsonObjectParser()
"""{ "name": "john"""".foreach {
c
buffer.offer(ByteString(c))
buffer.poll() should ===(None)
}
buffer.offer(ByteString("}"))
buffer.poll().get.utf8String shouldBe """{ "name": "john"}"""
}
"invalid json is supplied" should {
"fail if it's broken from the start" in {
val buffer = new JsonObjectParser()
buffer.offer(ByteString("""THIS IS NOT VALID { "name": "john"}"""))
a[FramingException] shouldBe thrownBy { buffer.poll() }
}
"fail if it's broken at the end" in {
val buffer = new JsonObjectParser()
buffer.offer(ByteString("""{ "name": "john"} THIS IS NOT VALID"""))
buffer.poll() // first emitting the valid element
a[FramingException] shouldBe thrownBy { buffer.poll() }
}
}
}
"fail on too large initial object" in {
val input =
"""
| { "name": "john" }, { "name": "jack" }
""".stripMargin
val result = Source.single(ByteString(input))
.via(JsonFraming.objectScanner(5)).map(_.utf8String)
.runFold(Seq.empty[String]) {
case (acc, entry) acc ++ Seq(entry)
}
a[FramingException] shouldBe thrownBy {
Await.result(result, 3.seconds)
}
}
"fail when 2nd object is too large" in {
val input = List(
"""{ "name": "john" }""",
"""{ "name": "jack" }""",
"""{ "name": "very very long name somehow. how did this happen?" }""").map(s ByteString(s))
val probe = Source(input)
.via(JsonFraming.objectScanner(48))
.runWith(TestSink.probe)
probe.ensureSubscription()
probe
.request(1)
.expectNext(ByteString("""{ "name": "john" }"""))
.request(1)
.expectNext(ByteString("""{ "name": "jack" }"""))
.request(1)
.expectError().getMessage should include("exceeded")
}
}
}

View file

@ -0,0 +1,155 @@
/**
* Copyright (C) 2014-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.stream.scaladsl.Framing.FramingException
import akka.util.ByteString
import scala.annotation.switch
/**
 * INTERNAL API: Use [[akka.stream.scaladsl.JsonFraming]] instead.
 *
 * Byte constants for the structural characters the brace-counting framer cares about.
 */
private[akka] object JsonObjectParser {

  final val SquareBraceStart = '['.toByte
  final val SquareBraceEnd = ']'.toByte
  final val CurlyBraceStart = '{'.toByte
  final val CurlyBraceEnd = '}'.toByte
  // JSON (RFC 8259) delimits strings with double quotes only. This was previously
  // '\''.toByte (a single quote), which meant string mode was never toggled and a
  // brace inside a string value would corrupt the depth counting.
  final val DoubleQuote = '"'.toByte
  final val Backslash = '\\'.toByte
  final val Comma = ','.toByte

  final val LineBreak = '\n'.toByte
  final val LineBreak2 = '\r'.toByte
  final val Tab = '\t'.toByte
  final val Space = ' '.toByte

  final val Whitespace = Set(LineBreak, LineBreak2, Tab, Space)

  /** @return true if the byte is one of the four insignificant-whitespace bytes */
  def isWhitespace(input: Byte): Boolean =
    Whitespace.contains(input)
}
/**
 * INTERNAL API: Use [[akka.stream.scaladsl.JsonFraming]] instead.
 *
 * **Mutable** framing implementation that given any number of [[ByteString]] chunks, can emit JSON objects contained within them.
 * Typically JSON objects are separated by new-lines or commas, however a top-level JSON Array can also be understood and chunked up
 * into valid JSON objects by this framing implementation.
 *
 * Leading whitespace between elements will be trimmed.
 *
 * Not thread-safe: a single instance is meant to be owned by one stage/materialization.
 *
 * @param maximumObjectLength frames longer than this many bytes cause a
 *                            [[akka.stream.scaladsl.Framing.FramingException]] on [[poll]]
 */
private[akka] class JsonObjectParser(maximumObjectLength: Int = Int.MaxValue) {
  import JsonObjectParser._

  private var buffer: ByteString = ByteString.empty

  private var pos = 0 // latest position of pointer while scanning for json object end; -1 once the outer array is closed
  private var trimFront = 0 // number of chars to drop from the front of the bytestring before emitting (skip whitespace etc)
  private var depth = 0 // counter of object-nesting depth, once hits 0 an object should be emitted

  private var completedObject = false
  private var inStringExpression = false // true while scanning between unescaped double quotes
  private var isStartOfEscapeSequence = false // true when the previous byte was an unmatched backslash

  /**
   * Appends input ByteString to internal byte string buffer.
   * Use [[poll]] to extract contained JSON objects.
   */
  def offer(input: ByteString): Unit =
    buffer ++= input

  def isEmpty: Boolean = buffer.isEmpty

  /**
   * Attempt to locate next complete JSON object in buffered ByteString and returns `Some(it)` if found.
   * May throw a [[akka.stream.scaladsl.Framing.FramingException]] if the contained JSON is invalid or max object size is exceeded.
   */
  def poll(): Option[ByteString] = {
    val foundObject = seekObject()
    if (!foundObject) None
    else
      (pos: @switch) match {
        case -1 | 0 ⇒ None // outer array closed, or no bytes consumed — nothing to emit
        case _ ⇒
          val (emit, buf) = buffer.splitAt(pos)
          buffer = buf.compact
          pos = 0

          val tf = trimFront
          trimFront = 0

          if (tf == 0) Some(emit)
          else {
            // drop leading separators/whitespace accumulated while seeking
            val trimmed = emit.drop(tf)
            if (trimmed.isEmpty) None
            else Some(trimmed)
          }
      }
  }

  /** @return true if an entire valid JSON object was found, false otherwise */
  private def seekObject(): Boolean = {
    completedObject = false
    val bufSize = buffer.size
    while (pos != -1 && (pos < bufSize && pos < maximumObjectLength) && !completedObject)
      proceed(buffer(pos))

    if (pos >= maximumObjectLength)
      throw new FramingException(s"""JSON element exceeded maximumObjectLength ($maximumObjectLength bytes)!""")

    completedObject
  }

  /** Advances the scan by one byte, updating depth / string / escape state. */
  private def proceed(input: Byte): Unit =
    if (input == SquareBraceStart && outsideObject) {
      // outer object is an array
      pos += 1
      trimFront += 1
    } else if (input == SquareBraceEnd && outsideObject) {
      // outer array completed!
      pos = -1
    } else if (input == Comma && outsideObject) {
      // separator between top-level elements — skip it
      pos += 1
      trimFront += 1
    } else if (input == Backslash) {
      // Toggle (not set) the escape flag: a pair of backslashes ("\\") is an escaped
      // backslash and must NOT escape the character that follows. The previous
      // unconditional `= true` meant a string ending in an escaped backslash
      // (e.g. "a\\") never saw its closing quote.
      isStartOfEscapeSequence = !isStartOfEscapeSequence
      pos += 1
    } else if (input == DoubleQuote) {
      // an unescaped quote toggles string mode; an escaped one is just content
      if (!isStartOfEscapeSequence) inStringExpression = !inStringExpression
      isStartOfEscapeSequence = false
      pos += 1
    } else if (input == CurlyBraceStart && !inStringExpression) {
      isStartOfEscapeSequence = false
      depth += 1
      pos += 1
    } else if (input == CurlyBraceEnd && !inStringExpression) {
      isStartOfEscapeSequence = false
      depth -= 1
      pos += 1
      if (depth == 0)
        completedObject = true // back at top level: one full object is in [trimFront, pos)
    } else if (isWhitespace(input) && !inStringExpression) {
      pos += 1
      if (depth == 0) trimFront += 1 // whitespace between elements is trimmed, inside objects kept
    } else if (insideObject) {
      isStartOfEscapeSequence = false
      pos += 1
    } else {
      // any other byte at top level is not valid JSON framing
      throw new FramingException(s"Invalid JSON encountered at position [$pos] of [$buffer]")
    }

  @inline private final def insideObject: Boolean =
    !outsideObject

  @inline private final def outsideObject: Boolean =
    depth == 0
}

View file

@ -0,0 +1,40 @@
/**
* Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.javadsl
import akka.NotUsed
import akka.util.ByteString
/** Provides JSON framing stages that can separate valid JSON objects from incoming [[akka.util.ByteString]] objects. */
object JsonFraming {

  /**
   * Java API: returns a Flow that performs "brace counting" framing, emitting one complete
   * JSON element per output [[akka.util.ByteString]].
   *
   * Handles both very large top-level arrays:
   * {{{
   * [{"id": 1}, {"id": 2}, [...], {"id": 999}]
   * }}}
   *
   * and concatenated objects (with or without commas between them):
   * {{{
   * {"id": 1}, {"id": 2}, [...], {"id": 999}
   * }}}
   *
   * Framing is insensitive to whitespace and formatting between and within elements:
   * the bytes of each emitted frame are exactly as they appeared in the input.
   * Raw JSON values (such as integers or strings) can be framed as well.
   *
   * @param maximumObjectLength The maximum length of allowed frames while decoding. If the maximum length is exceeded
   *                            this Flow will fail the stream.
   */
  def objectScanner(maximumObjectLength: Int): Flow[ByteString, ByteString, NotUsed] = {
    // delegate to the scaladsl implementation and adapt it to the Java DSL
    val scalaFlow = akka.stream.scaladsl.JsonFraming.objectScanner(maximumObjectLength)
    scalaFlow.asJava
  }
}

View file

@ -0,0 +1,77 @@
/**
* Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.NotUsed
import akka.stream.Attributes
import akka.stream.impl.JsonObjectParser
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.stage.{ InHandler, OutHandler, GraphStageLogic }
import akka.util.ByteString
import scala.util.control.NonFatal
/** Provides JSON framing stages that can separate valid JSON objects from incoming [[ByteString]] objects. */
object JsonFraming {

  /**
   * Returns a Flow that implements a "brace counting" based framing stage for emitting valid JSON chunks.
   * It scans the incoming data stream for valid JSON objects and returns chunks of ByteStrings containing only those valid chunks.
   *
   * Typical examples of data that one may want to frame using this stage include:
   *
   * **Very large arrays**:
   * {{{
   * [{"id": 1}, {"id": 2}, [...], {"id": 999}]
   * }}}
   *
   * **Multiple concatenated JSON objects** (with, or without commas between them):
   *
   * {{{
   * {"id": 1}, {"id": 2}, [...], {"id": 999}
   * }}}
   *
   * The framing works independently of formatting, i.e. it will still emit valid JSON elements even if two
   * elements are separated by multiple newlines or other whitespace characters. And of course is insensitive
   * (and does not impact the emitting frame) to the JSON object's internal formatting.
   *
   * Framing raw JSON values (such as integers or strings) is supported as well.
   *
   * @param maximumObjectLength The maximum length of allowed frames while decoding. If the maximum length is exceeded
   *                            this Flow will fail the stream.
   */
  def objectScanner(maximumObjectLength: Int): Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString].via(new SimpleLinearGraphStage[ByteString] {

      override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {
        // The parser is mutable, so it must live inside the logic: one fresh instance per
        // materialization. Holding it in the enclosing stage (as before) shared parser
        // state across all materializations of the same Flow, corrupting the framing.
        private[this] val buffer = new JsonObjectParser(maximumObjectLength)

        setHandlers(in, out, this)

        override def onPush(): Unit = {
          buffer.offer(grab(in))
          tryPopBuffer()
        }

        override def onPull(): Unit =
          tryPopBuffer()

        override def onUpstreamFinish(): Unit =
          try buffer.poll() match {
            case Some(json) ⇒ emit(out, json, () ⇒ completeStage())
            case _          ⇒ completeStage()
          } catch {
            // trailing invalid JSON must fail the stream, not escape the handler
            // (previously this `try` had no `catch` at all)
            case NonFatal(ex) ⇒ failStage(ex)
          }

        /** Emits the next buffered frame if available, otherwise pulls or completes. */
        def tryPopBuffer() = {
          try buffer.poll() match {
            case Some(json) ⇒ push(out, json)
            case _          ⇒ if (isClosed(in)) completeStage() else pull(in)
          } catch {
            case NonFatal(ex) ⇒ failStage(ex)
          }
        }
      }
    }).named("JsonFraming.objectScanner")
}