+htp #18837 JSON framing and framed entity streaming directives
This commit is contained in:
parent
8e71346295
commit
24454f7f09
14 changed files with 1423 additions and 3 deletions
|
|
@ -0,0 +1,150 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package docs.http.scaladsl.server.directives
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.http.scaladsl.marshalling.ToResponseMarshallable
|
||||
import akka.http.scaladsl.model._
|
||||
import akka.http.scaladsl.model.headers.Accept
|
||||
import akka.http.scaladsl.server.{ UnsupportedRequestContentTypeRejection, UnacceptedResponseContentTypeRejection, JsonSourceRenderingMode }
|
||||
import akka.stream.scaladsl.{ Flow, Source }
|
||||
import docs.http.scaladsl.server.RoutingSpec
|
||||
import spray.json.{ JsValue, JsObject, DefaultJsonProtocol }
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
class JsonStreamingExamplesSpec extends RoutingSpec {
|
||||
|
||||
//#models
|
||||
case class Tweet(uid: Int, txt: String)
|
||||
case class Measurement(id: String, value: Int)
|
||||
//#
|
||||
|
||||
def getTweets() =
|
||||
Source(List(
|
||||
Tweet(1, "#Akka rocks!"),
|
||||
Tweet(2, "Streaming is so hot right now!"),
|
||||
Tweet(3, "You cannot enter the same river twice.")))
|
||||
|
||||
//#formats
|
||||
object MyJsonProtocol extends spray.json.DefaultJsonProtocol {
|
||||
implicit val userFormat = jsonFormat2(Tweet.apply)
|
||||
implicit val measurementFormat = jsonFormat2(Measurement.apply)
|
||||
}
|
||||
//#
|
||||
|
||||
"spray-json-response-streaming" in {
|
||||
// [1] import generic spray-json marshallers support:
|
||||
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
|
||||
|
||||
// [2] import "my protocol", for marshalling Tweet objects:
|
||||
import MyJsonProtocol._
|
||||
|
||||
// [3] pick json rendering mode:
|
||||
implicit val jsonRenderingMode = JsonSourceRenderingMode.LineByLine
|
||||
|
||||
val route =
|
||||
path("users") {
|
||||
val users: Source[Tweet, NotUsed] = getTweets()
|
||||
complete(ToResponseMarshallable(users))
|
||||
}
|
||||
|
||||
// tests:
|
||||
val AcceptJson = Accept(MediaRange(MediaTypes.`application/json`))
|
||||
val AcceptXml = Accept(MediaRange(MediaTypes.`text/xml`))
|
||||
|
||||
Get("/users").withHeaders(AcceptJson) ~> route ~> check {
|
||||
responseAs[String] shouldEqual
|
||||
"""{"uid":1,"txt":"#Akka rocks!"}""" + "\n" +
|
||||
"""{"uid":2,"txt":"Streaming is so hot right now!"}""" + "\n" +
|
||||
"""{"uid":3,"txt":"You cannot enter the same river twice."}"""
|
||||
}
|
||||
|
||||
// endpoint can only marshal Json, so it will *reject* requests for application/xml:
|
||||
Get("/users").withHeaders(AcceptXml) ~> route ~> check {
|
||||
handled should ===(false)
|
||||
rejection should ===(UnacceptedResponseContentTypeRejection(Set(ContentTypes.`application/json`)))
|
||||
}
|
||||
}
|
||||
|
||||
"response-streaming-modes" in {
|
||||
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
|
||||
import MyJsonProtocol._
|
||||
implicit val jsonRenderingMode = JsonSourceRenderingMode.LineByLine
|
||||
|
||||
//#async-rendering
|
||||
path("users") {
|
||||
val users: Source[Tweet, NotUsed] = getTweets()
|
||||
complete(users.renderAsync(parallelism = 8))
|
||||
}
|
||||
//#
|
||||
|
||||
//#async-unordered-rendering
|
||||
path("users" / "unordered") {
|
||||
val users: Source[Tweet, NotUsed] = getTweets()
|
||||
complete(users.renderAsyncUnordered(parallelism = 8))
|
||||
}
|
||||
//#
|
||||
}
|
||||
|
||||
"spray-json-request-streaming" in {
|
||||
// [1] import generic spray-json (un)marshallers support:
|
||||
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
|
||||
|
||||
// [1.1] import framing mode
|
||||
implicit val jsonFramingMode = akka.http.scaladsl.server.JsonEntityFramingSupport.bracketCountingJsonFraming(Int.MaxValue)
|
||||
|
||||
// [2] import "my protocol", for unmarshalling Measurement objects:
|
||||
import MyJsonProtocol._
|
||||
|
||||
// [3] prepareyour persisting logic here
|
||||
val persistMetrics = Flow[Measurement]
|
||||
|
||||
val route =
|
||||
path("metrics") {
|
||||
// [4] extract Source[Measurement, _]
|
||||
entity(stream[Measurement]) { measurements =>
|
||||
println("measurements = " + measurements)
|
||||
val measurementsSubmitted: Future[Int] =
|
||||
measurements
|
||||
.via(persistMetrics)
|
||||
.runFold(0) { (cnt, _) =>
|
||||
println("cnt = " + cnt)
|
||||
cnt + 1
|
||||
}
|
||||
|
||||
complete {
|
||||
measurementsSubmitted.map(n => Map("msg" -> s"""Total metrics received: $n"""))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// tests:
|
||||
val data = HttpEntity(
|
||||
ContentTypes.`application/json`,
|
||||
"""
|
||||
|{"id":"temp","value":32}
|
||||
|{"id":"temp","value":31}
|
||||
|
|
||||
""".stripMargin)
|
||||
|
||||
Post("/metrics", entity = data) ~> route ~> check {
|
||||
status should ===(StatusCodes.OK)
|
||||
responseAs[String] should ===("""{"msg":"Total metrics received: 2"}""")
|
||||
}
|
||||
|
||||
// the FramingWithContentType will reject any content type that it does not understand:
|
||||
val xmlData = HttpEntity(
|
||||
ContentTypes.`text/xml(UTF-8)`,
|
||||
"""|<data id="temp" value="32"/>
|
||||
|<data id="temp" value="31"/>""".stripMargin)
|
||||
|
||||
Post("/metrics", entity = xmlData) ~> route ~> check {
|
||||
handled should ===(false)
|
||||
rejection should ===(UnsupportedRequestContentTypeRejection(Set(ContentTypes.`application/json`)))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,96 @@
|
|||
.. _json-streaming-scala:
|
||||
|
||||
JSON Streaming
|
||||
==============
|
||||
|
||||
`JSON Streaming`_ is a term refering to streaming a (possibly infinite) stream of element as independent JSON
|
||||
objects onto one continious HTTP connection. The elements are most often separated using newlines,
|
||||
however do not have to be and concatenating elements side-by-side or emitting "very long" JSON array is also another
|
||||
use case.
|
||||
|
||||
In the below examples, we'll be refering to the ``User`` and ``Measurement`` case classes as our model, which are defined as:
|
||||
|
||||
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
|
||||
:snippet: models
|
||||
|
||||
And as always with spray-json, we provide our (Un)Marshaller instances as implicit values uding the ``jsonFormat##``
|
||||
method to generate them statically:
|
||||
|
||||
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
|
||||
:snippet: formats
|
||||
|
||||
.. _Json Streaming: https://en.wikipedia.org/wiki/JSON_Streaming
|
||||
|
||||
Responding with JSON Streams
|
||||
----------------------------
|
||||
|
||||
In this example we implement an API representing an infinite stream of tweets, very much like Twitter's `Streaming API`_.
|
||||
|
||||
Firstly, we'll need to get some additional marshalling infrastructure set up, that is able to marshal to and from an
|
||||
Akka Streams ``Source[T,_]``. One such trait, containing the needed marshallers is ``SprayJsonSupport``, which uses
|
||||
spray-json (a high performance json parser library), and is shipped as part of Akka HTTP in the
|
||||
``akka-http-spray-json-experimental`` module.
|
||||
to and from ``Source[T,_]`` by using spray-json provided
|
||||
|
||||
Next we import our model's marshallers, generated by spray-json.
|
||||
|
||||
The last bit of setup, before we can render a streaming json response
|
||||
|
||||
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
|
||||
:snippet: spray-json-response-streaming
|
||||
|
||||
.. _Streaming API: https://dev.twitter.com/streaming/overview
|
||||
|
||||
Customising response rendering mode
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
The mode in which a response is marshalled and then rendered to the HttpResponse from the provided ``Source[T,_]``
|
||||
is customisable (thanks to conversions originating from ``Directives`` via ``EntityStreamingDirectives``).
|
||||
|
||||
Since Marshalling is a potentially asynchronous operation in Akka HTTP (because transforming ``T`` to ``JsValue`` may
|
||||
potentially take a long time (depending on your definition of "long time"), we allow to run marshalling concurrently
|
||||
(up to ``parallelism`` concurrent marshallings) by using the ``renderAsync(parallelism)`` mode:
|
||||
|
||||
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
|
||||
:snippet: async-rendering
|
||||
|
||||
The ``renderAsync`` mode perserves ordering of the Source's elements, which may sometimes be a required property,
|
||||
for example when streaming a strictly ordered dataset. Sometimes the contept of strict-order does not apply to the
|
||||
data being streamed though, which allows us to explit this property and use ``renderAsyncUnordered(parallelism)``,
|
||||
which will concurrently marshall up to ``parallelism`` elements and emit the first which is marshalled onto
|
||||
the HttpResponse:
|
||||
|
||||
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
|
||||
:snippet: async-unordered-rendering
|
||||
|
||||
This allows us to _potentially_ render elements faster onto the HttpResponse, since it can avoid "head of line blocking",
|
||||
in case one element in front of the stream takes a long time to marshall, yet others after it are very quick to marshall.
|
||||
|
||||
Consuming JSON Streaming uploads
|
||||
--------------------------------
|
||||
|
||||
Sometimes the client may be sending in a streaming request, for example an embedded device initiated a connection with
|
||||
the server and is feeding it with one line of measurement data.
|
||||
|
||||
In this example, we want to consume this data in a streaming fashion from the request entity, and also apply
|
||||
back-pressure to the underlying TCP connection, if the server can not cope with the rate of incoming data (back-pressure
|
||||
will be applied automatically thanks to using Akka HTTP/Streams).
|
||||
|
||||
|
||||
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
|
||||
:snippet: spray-json-request-streaming
|
||||
|
||||
Implementing custom (Un)Marshaller support for JSON streaming
|
||||
-------------------------------------------------------------
|
||||
|
||||
While not provided by Akka HTTP directly, the infrastructure is extensible and by investigating how ``SprayJsonSupport``
|
||||
is implemented it is certainly possible to provide the same infrastructure for other marshaller implementations (such as
|
||||
Play JSON, or Jackson directly for example). Such support traits will want to extend the ``JsonEntityStreamingSupport`` trait.
|
||||
|
||||
The following types that may need to be implemented by a custom framed-streaming support library are:
|
||||
|
||||
- ``SourceRenderingMode`` which can customise how to render the begining / between-elements and ending of such stream (while writing a response, i.e. by calling ``complete(source)``).
|
||||
Implementations for JSON are available in ``akka.http.scaladsl.server.JsonSourceRenderingMode``.
|
||||
- ``FramingWithContentType`` which is needed to be able to split incoming ``ByteString`` chunks into frames
|
||||
of the higher-level data type format that is understood by the provided unmarshallers.
|
||||
In the case of JSON it means chunking up ByteStrings such that each emitted element corresponds to exactly one JSON object,
|
||||
this framing is implemented in ``JsonEntityStreamingSupport``.
|
||||
Loading…
Add table
Add a link
Reference in a new issue