136 lines
7.7 KiB
ReStructuredText
136 lines
7.7 KiB
ReStructuredText
.. _json-streaming-scala:
|
|
|
|
Source Streaming
|
|
================
|
|
|
|
Akka HTTP supports completing a request with an Akka ``Source[T, _]``, which makes it possible to easily build
|
|
and consume streaming end-to-end APIs which apply back-pressure throughout the entire stack.
|
|
|
|
It is possible to complete requests with raw ``Source[ByteString, _]``, however often it is more convenient to
|
|
stream on an element-by-element basis, and allow Akka HTTP to handle the rendering internally - for example as a JSON array,
|
|
or CSV stream (where each element is separated by a new-line).
|
|
|
|
In the following sections we investigate how to make use of the JSON Streaming infrastructure,
|
|
however the general hints apply to any kind of element-by-element streaming you could imagine.
|
|
|
|
JSON Streaming
|
|
==============
|
|
|
|
`JSON Streaming`_ is a term refering to streaming a (possibly infinite) stream of element as independent JSON
|
|
objects as a continuous HTTP request or response. The elements are most often separated using newlines,
|
|
however do not have to be. Concatenating elements side-by-side or emitting "very long" JSON array is also another
|
|
use case.
|
|
|
|
In the below examples, we'll be refering to the ``Tweet`` and ``Measurement`` case classes as our model, which are defined as:
|
|
|
|
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
|
|
:snippet: models
|
|
|
|
And as always with spray-json, we provide our (Un)Marshaller instances as implicit values using the ``jsonFormat##``
|
|
method to generate them statically:
|
|
|
|
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
|
|
:snippet: formats
|
|
|
|
.. _Json Streaming: https://en.wikipedia.org/wiki/JSON_Streaming
|
|
|
|
Responding with JSON Streams
|
|
----------------------------
|
|
|
|
In this example we implement an API representing an infinite stream of tweets, very much like Twitter's `Streaming API`_.
|
|
|
|
Firstly, we'll need to get some additional marshalling infrastructure set up, that is able to marshal to and from an
|
|
Akka Streams ``Source[T,_]``. One such trait, containing the needed marshallers is ``SprayJsonSupport``, which uses
|
|
spray-json (a high performance json parser library), and is shipped as part of Akka HTTP in the
|
|
``akka-http-spray-json-experimental`` module.
|
|
|
|
Once the general infrastructure is prepared we import our model's marshallers, generated by spray-json (Step 1),
|
|
and enable JSON Streaming by making an implicit ``EntityStreamingSupport`` instance available (Step 2).
|
|
Akka HTTP pre-packages JSON and CSV entity streaming support, however it is simple to add your own, in case you'd
|
|
like to stream a different content type (for example plists or protobuf).
|
|
|
|
The final step is simply completing a request using a Source of tweets, as simple as that:
|
|
|
|
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
|
|
:snippet: spray-json-response-streaming
|
|
|
|
The reason the ``EntityStreamingSupport`` has to be enabled explicitly is that one might want to configure how the
|
|
stream should be rendered. We'll dicuss this in depth in the next section though.
|
|
|
|
.. _Streaming API: https://dev.twitter.com/streaming/overview
|
|
|
|
Customising response rendering mode
|
|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
|
Since it is not always possible to directly and confidently answer the question of how a stream of ``T`` should look on
|
|
the wire, the ``EntityStreamingSupport`` traits come into play and allow fine-tuning the streams rendered representation.
|
|
|
|
For example, in case of JSON Streaming, there isn't really one standard about rendering the response. Some APIs prefer
|
|
to render multiple JSON objects in a line-by-line fashion (Twitter's streaming APIs for example), while others simply return
|
|
very large arrays, which could be streamed as well.
|
|
|
|
Akka defaults to the second one (streaming a JSON Array), as it is correct JSON and clients not expecting
|
|
a streaming API would still be able to consume it in a naive way if they'd want to.
|
|
|
|
The line-by-line aproach however is also pretty popular even though it is not valid JSON. It's relatively simplicity for
|
|
client-side parsing is a strong point in case to pick this format for your Streaming APIs.
|
|
Below we demonstrate how to reconfigure the support trait to render the JSON as
|
|
|
|
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
|
|
:snippet: line-by-line-json-response-streaming
|
|
|
|
Another interesting feature is parallel marshalling. Since marshalling can potentially take much time,
|
|
it is possible to marshal multiple elements of the stream in parallel. This is simply a configuration
|
|
option on ``EntityStreamingSupport`` and is configurable like this:
|
|
|
|
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
|
|
:snippet: async-rendering
|
|
|
|
The above shown mode perserves ordering of the Source's elements, which may sometimes be a required property,
|
|
for example when streaming a strictly ordered dataset. Sometimes the contept of strict-order does not apply to the
|
|
data being streamed though, which allows us to exploit this property and use an ``unordered`` rendering.
|
|
|
|
This also is a configuration option and is used as shown below. Effectively this will allow Akka's marshalling infrastructure
|
|
to concurrently marshallup to ``parallelism`` elements and emit the first which is marshalled onto the ``HttpResponse``:
|
|
|
|
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
|
|
:snippet: async-unordered-rendering
|
|
|
|
This allows us to _potentially_ render elements faster onto the HttpResponse, since it can avoid "head of line blocking",
|
|
in case one element in front of the stream takes a long time to marshall, yet others after it are very quick to marshall.
|
|
|
|
Consuming JSON Streaming uploads
|
|
--------------------------------
|
|
|
|
Sometimes the client may be sending a streaming request, for example an embedded device initiated a connection with
|
|
the server and is feeding it with one line of measurement data.
|
|
|
|
In this example, we want to consume this data in a streaming fashion from the request entity, and also apply
|
|
back-pressure to the underlying TCP connection, if the server can not cope with the rate of incoming data (back-pressure
|
|
will be applied automatically thanks to using Akka HTTP/Streams).
|
|
|
|
|
|
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
|
|
:snippet: spray-json-request-streaming
|
|
|
|
Simple CSV streaming example
|
|
----------------------------
|
|
|
|
Akka HTTP provides another ``EntityStreamingSupport`` out of the box, namely ``csv`` (comma-separated values).
|
|
For completeness, we demonstrate its usage in the below snippet. As you'll notice, switching betweeen streaming
|
|
modes is fairly simple, one only has to make sure that an implicit ``Marshaller`` of the requested type is available,
|
|
and that the streaming support operates on the same ``Content-Type`` as the rendered values. Otherwise you'll see
|
|
an error during runtime that the marshaller did not expose the expected content type and thus we can not render
|
|
the streaming response).
|
|
|
|
.. includecode2:: ../../code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala
|
|
:snippet: csv-example
|
|
|
|
Implementing custom EntityStreamingSupport traits
|
|
-------------------------------------------------
|
|
|
|
The ``EntityStreamingSupport`` infrastructure is open for extension and not bound to any single format, content type
|
|
or marshalling library. The provided JSON support does not rely on Spray JSON directly, but uses ``Marshaller[T, ByteString]``
|
|
instances, which can be provided using any JSON marshalling library (such as Circe, Jawn or Play JSON).
|
|
|
|
When implementing a custom support trait, one should simply extend the ``EntityStreamingSupport`` abstract class,
|
|
and implement all of it's methods. It's best to use the existing implementations as a guideline.
|