=htp address review feedback on JSON streaming

Konrad Malawski 2016-07-29 16:29:50 +02:00
parent bc536be32c
commit 6562ddd2df
13 changed files with 114 additions and 22 deletions

View file

@ -3,7 +3,7 @@
 Source Streaming
 ================
-Akka HTTP supports completing a request with an Akka ``Source<T, ?>``, which makes it possible to very easily build
+Akka HTTP supports completing a request with an Akka ``Source<T, ?>``, which makes it possible to easily build
 streaming end-to-end APIs which apply back-pressure throughout the entire stack.
 It is possible to complete requests with raw ``Source<ByteString, ?>``, however often it is more convenient to

View file

@ -25,7 +25,7 @@ lies in interfacing between private sphere and the public, but you dont want
 that many doors inside your house, do you? For a longer discussion see `this
 blog post <http://letitcrash.com/post/19074284309/when-to-use-typedactors>`_.
-A bit more background: TypedActors can very easily be abused as RPC, and that
+A bit more background: TypedActors can easily be abused as RPC, and that
 is an abstraction which is `well-known
 <http://doc.akka.io/docs/misc/smli_tr-94-29.pdf>`_
 to be leaky. Hence TypedActors are not what we think of first when we talk

View file

@ -45,13 +45,12 @@ class JsonStreamingExamplesSpec extends RoutingSpec {
 // [3] pick json rendering mode:
 // HINT: if you extend `akka.http.scaladsl.server.EntityStreamingSupport`
 // it'll guide you to do so via abstract defs
-val maximumObjectLength = 128
 implicit val jsonRenderingMode = JsonSourceRenderingModes.LineByLine
 val route =
   path("tweets") {
     val tweets: Source[Tweet, NotUsed] = getTweets()
-    complete(ToResponseMarshallable(tweets))
+    complete(tweets)
   }
 // tests:
@ -104,7 +103,7 @@ class JsonStreamingExamplesSpec extends RoutingSpec {
 // [2] import "my protocol", for unmarshalling Measurement objects:
 import MyJsonProtocol._
-// [3] prepareyour persisting logic here
+// [3] prepare your persisting logic here
 val persistMetrics = Flow[Measurement]
 val route =

View file

@ -224,4 +224,4 @@ When you combine directives producing extractions with the ``&`` operator all ex
 Directives offer a great way of constructing your web service logic from small building blocks in a plug and play
 fashion while maintaining DRYness and full type-safety. If the large range of :ref:`Predefined Directives` does not
-fully satisfy your needs you can also very easily create :ref:`Custom Directives`.
+fully satisfy your needs you can also easily create :ref:`Custom Directives`.

View file

@ -3,7 +3,7 @@
 Source Streaming
 ================
-Akka HTTP supports completing a request with an Akka ``Source[T, _]``, which makes it possible to very easily build
+Akka HTTP supports completing a request with an Akka ``Source[T, _]``, which makes it possible to easily build
 streaming end-to-end APIs which apply back-pressure throughout the entire stack.
 It is possible to complete requests with raw ``Source[ByteString, _]``, however often it is more convenient to
@ -99,7 +99,7 @@ Implementing custom (Un)Marshaller support for JSON streaming
 While not provided by Akka HTTP directly, the infrastructure is extensible and by investigating how ``SprayJsonSupport``
 is implemented it is certainly possible to provide the same infrastructure for other marshaller implementations (such as
-Play JSON, or Jackson directly for example). Such support traits will want to extend the ``JsonEntityStreamingSupport`` trait.
+Play JSON, or Jackson directly for example). Such support traits will want to extend the ``EntityStreamingSupport`` trait.
 The following types that may need to be implemented by a custom framed-streaming support library are:
@ -108,4 +108,4 @@ The following types that may need to be implemented by a custom framed-streaming
 - ``FramingWithContentType`` which is needed to be able to split incoming ``ByteString`` chunks into frames
   of the higher-level data type format that is understood by the provided unmarshallers.
   In the case of JSON it means chunking up ByteStrings such that each emitted element corresponds to exactly one JSON object,
-  this framing is implemented in ``JsonEntityStreamingSupport``.
+  this framing is implemented in ``EntityStreamingSupport``.
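Note on the framing requirement in this hunk: "each emitted element corresponds to exactly one JSON object" can be illustrated without Akka. The following is a minimal sketch under simplifying assumptions (the whole input is already in memory as a `String`, and only top-level `{ ... }` objects are framed). It is not the `JsonObjectParser` implementation; `JsonFramingSketch` and `splitObjects` are hypothetical names chosen for illustration.

```scala
object JsonFramingSketch {

  /** Splits a buffer of concatenated or separator-delimited JSON objects into one String per object. */
  def splitObjects(input: String): List[String] = {
    val frames = List.newBuilder[String]
    var depth = 0        // current nesting depth of { }
    var inString = false // true while inside a JSON string literal
    var escaped = false  // true if the previous char inside a string was a backslash
    var start = -1       // index at which the current top-level object began

    var i = 0
    while (i < input.length) {
      val c = input.charAt(i)
      if (inString) {
        // braces inside string literals must not affect the depth
        if (escaped) escaped = false
        else if (c == '\\') escaped = true
        else if (c == '"') inString = false
      } else c match {
        case '"' => inString = true
        case '{' =>
          if (depth == 0) start = i
          depth += 1
        case '}' =>
          depth -= 1
          if (depth < 0)
            throw new IllegalArgumentException(s"Invalid JSON encountered at position [$i]")
          if (depth == 0) frames += input.substring(start, i + 1)
        case _ => // separators (commas, newlines, array brackets) and plain values are skipped
      }
      i += 1
    }
    frames.result()
  }
}
```

A streaming implementation would additionally have to carry this state across chunk boundaries and enforce a maximum object length, which the sketch leaves out.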

View file

@ -35,7 +35,7 @@ lies in interfacing between private sphere and the public, but you dont want
 that many doors inside your house, do you? For a longer discussion see `this
 blog post <http://letitcrash.com/post/19074284309/when-to-use-typedactors>`_.
-A bit more background: TypedActors can very easily be abused as RPC, and that
+A bit more background: TypedActors can easily be abused as RPC, and that
 is an abstraction which is `well-known
 <http://doc.akka.io/docs/misc/smli_tr-94-29.pdf>`_
 to be leaky. Hence TypedActors are not what we think of first when we talk

View file

@ -0,0 +1,79 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.scaladsl.marshallers.sprayjson
import java.nio.{ ByteBuffer, CharBuffer }
import java.nio.charset.{ Charset, StandardCharsets }
import akka.util.ByteString
import spray.json.ParserInput.DefaultParserInput
import scala.annotation.tailrec
/**
* ParserInput reading directly off a ByteString. (Based on the ByteArrayBasedParserInput)
* This avoids a separate decoding step but assumes that each byte represents exactly one character,
* which is encoded by ISO-8859-1!
* You can therefore use this ParserInput type only if you know that all input will be `ISO-8859-1`-encoded,
* or only contains 7-bit ASCII characters (which is a subset of ISO-8859-1)!
*
* Note that this ParserInput type will NOT work with general `UTF-8`-encoded input as this can contain
* character representations spanning multiple bytes. However, if you know that your input will only ever contain
* 7-bit ASCII characters (0x00-0x7F) then UTF-8 is fine, since the first 127 UTF-8 characters are
* encoded with only one byte that is identical to 7-bit ASCII and ISO-8859-1.
*/
final class SprayJsonByteStringParserInput(bytes: ByteString) extends DefaultParserInput {
import SprayJsonByteStringParserInput._
private[this] val byteBuffer = ByteBuffer.allocate(4)
private[this] val charBuffer = CharBuffer.allocate(1)
private[this] val decoder = Charset.forName("UTF-8").newDecoder()
override def nextChar() = {
_cursor += 1
if (_cursor < bytes.length) (bytes(_cursor) & 0xFF).toChar else EOI
}
override def nextUtf8Char() = {
@tailrec def decode(byte: Byte, remainingBytes: Int): Char = {
byteBuffer.put(byte)
if (remainingBytes > 0) {
_cursor += 1
if (_cursor < bytes.length) decode(bytes(_cursor), remainingBytes - 1) else ErrorChar
} else {
byteBuffer.flip()
val coderResult = decoder.decode(byteBuffer, charBuffer, false)
charBuffer.flip()
val result = if (coderResult.isUnderflow & charBuffer.hasRemaining) charBuffer.get() else ErrorChar
byteBuffer.clear()
charBuffer.clear()
result
}
}
_cursor += 1
if (_cursor < bytes.length) {
val byte = bytes(_cursor)
if (byte >= 0) byte.toChar // 7-Bit ASCII
else if ((byte & 0xE0) == 0xC0) decode(byte, 1) // 2-byte UTF-8 sequence
else if ((byte & 0xF0) == 0xE0) decode(byte, 2) // 3-byte UTF-8 sequence
else if ((byte & 0xF8) == 0xF0) decode(byte, 3) // 4-byte UTF-8 sequence, will probably produce an (unsupported) surrogate pair
else ErrorChar
} else EOI
}
override def length: Int = bytes.size
override def sliceString(start: Int, end: Int): String =
bytes.slice(start, end).decodeString(StandardCharsets.ISO_8859_1) // slice takes (from, until), not (from, length)
override def sliceCharArray(start: Int, end: Int): Array[Char] =
StandardCharsets.ISO_8859_1.decode(bytes.slice(start, end).asByteBuffer).array()
}
object SprayJsonByteStringParserInput {
private final val EOI = '\uFFFF'
// compile-time constant
private final val ErrorChar = '\uFFFD' // compile-time constant, universal UTF-8 replacement character '<27>'
}
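Note on the Scaladoc of this new file: the one-byte-per-character shortcut and the lead-byte checks in `nextUtf8Char` can be tried out with the JDK alone. The sketch below is illustrative only and does not touch spray-json or `ByteString`; `ByteDecodingSketch`, `byteToChar` and `utf8SequenceLength` are hypothetical names.

```scala
import java.nio.charset.StandardCharsets

object ByteDecodingSketch {

  /** One char per byte: correct for ISO-8859-1 (and therefore 7-bit ASCII) input only. */
  def byteToChar(b: Byte): Char = (b & 0xFF).toChar

  /** Length of the UTF-8 sequence introduced by `lead`, or 0 if `lead` cannot start a sequence. */
  def utf8SequenceLength(lead: Byte): Int =
    if (lead >= 0) 1                  // 0xxxxxxx: 7-bit ASCII
    else if ((lead & 0xE0) == 0xC0) 2 // 110xxxxx
    else if ((lead & 0xF0) == 0xE0) 3 // 1110xxxx
    else if ((lead & 0xF8) == 0xF0) 4 // 11110xxx
    else 0                            // continuation byte or invalid lead byte

  def main(args: Array[String]): Unit = {
    val ascii = "tweet".getBytes(StandardCharsets.UTF_8)
    val umlaut = "\u00FC".getBytes(StandardCharsets.UTF_8) // encoded as the two bytes 0xC3 0xBC

    // ASCII survives byte-per-char decoding unchanged
    println(ascii.map(byteToChar).mkString == "tweet")
    // a multi-byte character does not: two bytes become two unrelated chars
    println(umlaut.map(byteToChar).mkString == "\u00FC")
    println(utf8SequenceLength(umlaut(0)))
  }
}
```

This is why the byte-per-char path is only safe when the input is known to be ISO-8859-1 or pure ASCII, while general UTF-8 input has to go through the multi-byte decoding path.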

View file

@ -4,15 +4,15 @@
 package akka.http.scaladsl.marshallers.sprayjson
+import akka.http.scaladsl.marshalling.{ Marshaller, ToByteStringMarshaller, ToEntityMarshaller }
+import akka.http.scaladsl.model.MediaTypes.`application/json`
+import akka.http.scaladsl.model.{ HttpCharsets, MediaTypes }
+import akka.http.scaladsl.unmarshalling.{ FromEntityUnmarshaller, Unmarshaller }
 import akka.http.scaladsl.util.FastFuture
 import akka.util.ByteString
+import spray.json._
 import scala.language.implicitConversions
-import akka.http.scaladsl.marshalling.{Marshaller, ToByteStringMarshaller, ToEntityMarshaller}
-import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller}
-import akka.http.scaladsl.model.{ContentTypes, HttpCharsets, MediaTypes}
-import akka.http.scaladsl.model.MediaTypes.`application/json`
-import spray.json._
 /**
  * A trait providing automatic to and from JSON marshalling/unmarshalling using an in-scope *spray-json* protocol.
@ -24,7 +24,11 @@ trait SprayJsonSupport {
     sprayJsValueUnmarshaller.map(jsonReader[T].read)
   implicit def sprayJsonByteStringUnmarshaller[T](implicit reader: RootJsonReader[T]): Unmarshaller[ByteString, T] =
     Unmarshaller.withMaterializer[ByteString, JsValue](_ ⇒ implicit mat ⇒ { bs ⇒
-      FastFuture.successful(JsonParser(bs.toArray[Byte]))
+      // .compact so addressing into any address is very fast (also for large chunks)
+      // TODO we could optimise ByteStrings to better handle linear access like this (or provide ByteStrings.linearAccessOptimised)
+      // TODO IF it's worth it.
+      val parserInput = new SprayJsonByteStringParserInput(bs.compact)
+      FastFuture.successful(JsonParser(parserInput))
     }).map(jsonReader[T].read)
   implicit def sprayJsValueUnmarshaller: FromEntityUnmarshaller[JsValue] =
     Unmarshaller.byteStringUnmarshaller.forContentTypes(`application/json`).mapWithCharset { (data, charset) ⇒

View file

@ -86,11 +86,13 @@ object TestServer extends App {
 (path("tweets") & parameter('n.as[Int])) { n =>
   get {
     val tweets = Source.repeat(Tweet("Hello, world!")).take(n)
-    complete(ToResponseMarshallable(tweets))
+    complete(tweets)
   } ~
   post {
     entity(as[Source[Tweet, NotUsed]]) { tweets ⇒
-      complete(s"Total tweets received: " + tweets.runFold(0)({ case (acc, t) => acc + 1 }))
+      onComplete(tweets.runFold(0)({ case (acc, t) => acc + 1 })) { count =>
+        complete(s"Total tweets received: " + count)
+      }
     }
   }
 }
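Note on the `post` branch of this hunk: concatenating a `Future` onto a string embeds the future's `toString`, not its eventual value, which is why the count has to be extracted after completion. Since `onComplete` extracts a `Try[T]`, `count` in the hunk is a `Try[Int]` and would render as `Success(...)` when concatenated; pattern matching yields the bare number. The sketch below shows the idea with the standard library only; `CountRenderingSketch`, `countTweets` and `render` are hypothetical stand-ins, not Akka HTTP API.

```scala
import scala.concurrent.{ Await, Future }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{ Failure, Success, Try }

object CountRenderingSketch {

  // Stand-in for tweets.runFold(0)({ case (acc, t) => acc + 1 })
  def countTweets(tweets: List[String]): Future[Int] =
    Future(tweets.foldLeft(0)((acc, _) => acc + 1))

  // What a handler receiving the completed Try[Int] might render
  def render(result: Try[Int]): String = result match {
    case Success(count) => s"Total tweets received: $count"
    case Failure(cause) => s"Counting tweets failed: ${cause.getMessage}"
  }

  def main(args: Array[String]): Unit = {
    val count = countTweets(List("a", "b", "c"))
    // Concatenating the Future itself prints its toString, not the number
    println("Total tweets received: " + count)
    // Rendering only after completion yields the intended text
    val done: Try[Int] = Try(Await.result(count, 3.seconds))
    println(render(done))
  }
}
```

Blocking with `Await` is used here only to keep the sketch self-contained; inside a route the directive takes care of waiting for completion without blocking a thread.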

View file

@ -23,6 +23,8 @@ abstract class FutureDirectives extends FormFieldDirectives {
   /**
    * "Unwraps" a `CompletionStage<T>` and runs the inner route after future
    * completion with the future's value as an extraction of type `Try<T>`.
+   *
+   * @group future
    */
   def onComplete[T](f: Supplier[CompletionStage[T]], inner: JFunction[Try[T], Route]) = RouteAdapter {
     D.onComplete(f.get.toScala.recover(unwrapCompletionException)) { value ⇒
@ -33,6 +35,8 @@ abstract class FutureDirectives extends FormFieldDirectives {
   /**
    * "Unwraps" a `CompletionStage<T>` and runs the inner route after future
    * completion with the future's value as an extraction of type `Try<T>`.
+   *
+   * @group future
    */
   def onComplete[T](cs: CompletionStage[T], inner: JFunction[Try[T], Route]) = RouteAdapter {
     D.onComplete(cs.toScala.recover(unwrapCompletionException)) { value ⇒
@ -61,6 +65,8 @@ abstract class FutureDirectives extends FormFieldDirectives {
    * completion with the stage's value as an extraction of type `T`.
    * If the stage fails its failure Throwable is bubbled up to the nearest
    * ExceptionHandler.
+   *
+   * @group future
    */
   def onSuccess[T](f: Supplier[CompletionStage[T]], inner: JFunction[T, Route]) = RouteAdapter {
     D.onSuccess(f.get.toScala.recover(unwrapCompletionException)) { value ⇒
@ -74,6 +80,8 @@ abstract class FutureDirectives extends FormFieldDirectives {
    * If the completion stage succeeds the request is completed using the values marshaller
    * (This directive therefore requires a marshaller for the completion stage value type to be
    * provided.)
+   *
+   * @group future
    */
   def completeOrRecoverWith[T](f: Supplier[CompletionStage[T]], marshaller: Marshaller[T, RequestEntity], inner: JFunction[Throwable, Route]): Route = RouteAdapter {
     val magnet = CompleteOrRecoverWithMagnet(f.get.toScala)(Marshaller.asScalaEntityMarshaller(marshaller))

View file

@ -61,9 +61,9 @@ object StrictForm {
fsu(value.entity.data.decodeString(charsetName)) fsu(value.entity.data.decodeString(charsetName))
}) })
@implicitNotFound(msg = @implicitNotFound(msg =
s"In order to unmarshal a `StrictForm.Field` to type `$${T}` you need to supply a " + s"In order to unmarshal a `StrictForm.Field` to type `$${T}` you need to supply a " +
s"`FromStringUnmarshaller[$${T}]` and/or a `FromEntityUnmarshaller[$${T}]`") s"`FromStringUnmarshaller[$${T}]` and/or a `FromEntityUnmarshaller[$${T}]`")
sealed trait FieldUnmarshaller[T] { sealed trait FieldUnmarshaller[T] {
def unmarshalString(value: String)(implicit ec: ExecutionContext, mat: Materializer): Future[T] def unmarshalString(value: String)(implicit ec: ExecutionContext, mat: Materializer): Future[T]
def unmarshalPart(value: Multipart.FormData.BodyPart.Strict)(implicit ec: ExecutionContext, mat: Materializer): Future[T] def unmarshalPart(value: Multipart.FormData.BodyPart.Strict)(implicit ec: ExecutionContext, mat: Materializer): Future[T]

View file

@ -148,7 +148,7 @@ trait FramedEntityStreamingDirectives extends MarshallingDirectives {
 val entity = req.entity
 if (framing.matches(entity.contentType)) {
   val bytes = entity.dataBytes
-  val frames = bytes.viaMat(framing.flow)(Keep.right)
+  val frames = bytes.via(framing.flow)
   val elements = frames.viaMat(marshalling(ec, mat))(Keep.right)
   FastFuture.successful(elements)

View file

@ -143,7 +143,7 @@ private[akka] class JsonObjectParser(maximumObjectLength: Int = Int.MaxValue) {
     isStartOfEscapeSequence = false
     pos += 1
   } else {
-    throw new FramingException(s"Invalid JSON encountered as position [$pos] of [$buffer]")
+    throw new FramingException(s"Invalid JSON encountered at position [$pos] of [$buffer]")
   }
 @inline private final def insideObject: Boolean =