=htp clean up json stream unmarshalling (#21233)
This commit is contained in:
parent
0c4d4c37ba
commit
0ed4a2aae9
6 changed files with 34 additions and 48 deletions
|
|
@ -13,7 +13,6 @@ import akka.http.scaladsl.server.{ UnacceptedResponseContentTypeRejection, Unsup
|
|||
import akka.stream.scaladsl.{ Flow, Source }
|
||||
import akka.util.ByteString
|
||||
import docs.http.scaladsl.server.RoutingSpec
|
||||
import spray.json.JsValue
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
|
|
@ -24,11 +23,11 @@ class JsonStreamingExamplesSpec extends RoutingSpec {
|
|||
case class Measurement(id: String, value: Int)
|
||||
//#
|
||||
|
||||
def getTweets() =
|
||||
Source(List(
|
||||
Tweet(1, "#Akka rocks!"),
|
||||
Tweet(2, "Streaming is so hot right now!"),
|
||||
Tweet(3, "You cannot enter the same river twice.")))
|
||||
val tweets = List(
|
||||
Tweet(1, "#Akka rocks!"),
|
||||
Tweet(2, "Streaming is so hot right now!"),
|
||||
Tweet(3, "You cannot enter the same river twice."))
|
||||
def getTweets = Source(tweets)
|
||||
|
||||
//#formats
|
||||
object MyJsonProtocol
|
||||
|
|
@ -51,7 +50,7 @@ class JsonStreamingExamplesSpec extends RoutingSpec {
|
|||
val route =
|
||||
path("tweets") {
|
||||
// [3] simply complete a request with a source of tweets:
|
||||
val tweets: Source[Tweet, NotUsed] = getTweets()
|
||||
val tweets: Source[Tweet, NotUsed] = getTweets
|
||||
complete(tweets)
|
||||
}
|
||||
|
||||
|
|
@ -93,7 +92,7 @@ class JsonStreamingExamplesSpec extends RoutingSpec {
|
|||
val route =
|
||||
path("tweets") {
|
||||
// [3] simply complete a request with a source of tweets:
|
||||
val tweets: Source[Tweet, NotUsed] = getTweets()
|
||||
val tweets: Source[Tweet, NotUsed] = getTweets
|
||||
complete(tweets)
|
||||
}
|
||||
|
||||
|
|
@ -123,7 +122,7 @@ class JsonStreamingExamplesSpec extends RoutingSpec {
|
|||
|
||||
val route =
|
||||
path("tweets") {
|
||||
val tweets: Source[Tweet, NotUsed] = getTweets()
|
||||
val tweets: Source[Tweet, NotUsed] = getTweets
|
||||
complete(tweets)
|
||||
}
|
||||
|
||||
|
|
@ -132,10 +131,9 @@ class JsonStreamingExamplesSpec extends RoutingSpec {
|
|||
|
||||
Get("/tweets").withHeaders(AcceptCsv) ~> route ~> check {
|
||||
responseAs[String] shouldEqual
|
||||
"""|1,#Akka rocks!
|
||||
|2,Streaming is so hot right now!
|
||||
|3,You cannot enter the same river twice."""
|
||||
.stripMargin
|
||||
"1,#Akka rocks!" + "\n" +
|
||||
"2,Streaming is so hot right now!" + "\n" +
|
||||
"3,You cannot enter the same river twice."
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -149,7 +147,7 @@ class JsonStreamingExamplesSpec extends RoutingSpec {
|
|||
.withParallelMarshalling(parallelism = 8, unordered = false)
|
||||
|
||||
path("tweets") {
|
||||
val tweets: Source[Tweet, NotUsed] = getTweets()
|
||||
val tweets: Source[Tweet, NotUsed] = getTweets
|
||||
complete(tweets)
|
||||
}
|
||||
//#
|
||||
|
|
@ -164,7 +162,7 @@ class JsonStreamingExamplesSpec extends RoutingSpec {
|
|||
.withParallelMarshalling(parallelism = 8, unordered = true)
|
||||
|
||||
path("tweets" / "unordered") {
|
||||
val tweets: Source[Tweet, NotUsed] = getTweets()
|
||||
val tweets: Source[Tweet, NotUsed] = getTweets
|
||||
complete(tweets)
|
||||
}
|
||||
//#
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
package akka.http.impl.engine.server
|
||||
|
||||
import java.net.{InetAddress, InetSocketAddress}
|
||||
import java.net.{ InetAddress, InetSocketAddress }
|
||||
|
||||
import akka.http.impl.util._
|
||||
import akka.http.scaladsl.Http.ServerLayer
|
||||
|
|
@ -17,7 +17,7 @@ import akka.http.scaladsl.settings.ServerSettings
|
|||
import akka.stream.scaladsl._
|
||||
import akka.stream.testkit.Utils.assertAllStagesStopped
|
||||
import akka.stream.testkit._
|
||||
import akka.stream.{ActorMaterializer, Fusing}
|
||||
import akka.stream.{ ActorMaterializer, Fusing }
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.util.ByteString
|
||||
import org.scalatest.Inside
|
||||
|
|
|
|||
|
|
@ -13,15 +13,7 @@ import scala.annotation.tailrec
|
|||
|
||||
/**
|
||||
* ParserInput reading directly off a ByteString. (Based on the ByteArrayBasedParserInput)
|
||||
* This avoids a separate decoding step but assumes that each byte represents exactly one character,
|
||||
* which is encoded by ISO-8859-1!
|
||||
* You can therefore use this ParserInput type only if you know that all input will be `ISO-8859-1`-encoded,
|
||||
* or only contains 7-bit ASCII characters (which is a subset of ISO-8859-1)!
|
||||
*
|
||||
* Note that this ParserInput type will NOT work with general `UTF-8`-encoded input as this can contain
|
||||
* character representations spanning multiple bytes. However, if you know that your input will only ever contain
|
||||
* 7-bit ASCII characters (0x00-0x7F) then UTF-8 is fine, since the first 127 UTF-8 characters are
|
||||
* encoded with only one byte that is identical to 7-bit ASCII and ISO-8859-1.
|
||||
* that avoids a separate decoding step.
|
||||
*/
|
||||
final class SprayJsonByteStringParserInput(bytes: ByteString) extends DefaultParserInput {
|
||||
|
||||
|
|
@ -67,9 +59,9 @@ final class SprayJsonByteStringParserInput(bytes: ByteString) extends DefaultPar
|
|||
|
||||
override def length: Int = bytes.size
|
||||
override def sliceString(start: Int, end: Int): String =
|
||||
bytes.slice(start, end - start).decodeString(StandardCharsets.ISO_8859_1)
|
||||
bytes.slice(start, end - start).decodeString(StandardCharsets.UTF_8)
|
||||
override def sliceCharArray(start: Int, end: Int): Array[Char] =
|
||||
StandardCharsets.ISO_8859_1.decode(bytes.slice(start, end).asByteBuffer).array()
|
||||
StandardCharsets.UTF_8.decode(bytes.slice(start, end).asByteBuffer).array()
|
||||
}
|
||||
|
||||
object SprayJsonByteStringParserInput {
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ import akka.http.scaladsl.common.EntityStreamingSupport
|
|||
import akka.http.scaladsl.marshalling._
|
||||
import akka.http.scaladsl.model.MediaTypes.`application/json`
|
||||
import akka.http.scaladsl.model.{ HttpCharsets, MediaTypes }
|
||||
import akka.http.scaladsl.unmarshalling.{ FromEntityUnmarshaller, FromRequestUnmarshaller, Unmarshaller }
|
||||
import akka.http.scaladsl.unmarshalling.{ FromByteStringUnmarshaller, FromEntityUnmarshaller, Unmarshaller }
|
||||
import akka.http.scaladsl.util.FastFuture
|
||||
import akka.stream.scaladsl.{ Flow, Keep, Source }
|
||||
import akka.util.ByteString
|
||||
|
|
@ -25,10 +25,10 @@ trait SprayJsonSupport {
|
|||
sprayJsonUnmarshaller(reader)
|
||||
implicit def sprayJsonUnmarshaller[T](implicit reader: RootJsonReader[T]): FromEntityUnmarshaller[T] =
|
||||
sprayJsValueUnmarshaller.map(jsonReader[T].read)
|
||||
implicit def sprayJsonByteStringUnmarshaller[T](implicit reader: RootJsonReader[T]): Unmarshaller[ByteString, T] =
|
||||
implicit def sprayJsonByteStringUnmarshaller[T](implicit reader: RootJsonReader[T]): FromByteStringUnmarshaller[T] =
|
||||
Unmarshaller.withMaterializer[ByteString, JsValue](_ ⇒ implicit mat ⇒ { bs ⇒
|
||||
// .compact so addressing into any address is very fast (also for large chunks)
|
||||
// TODO we could optimise ByteStrings to better handle lienear access like this (or provide ByteStrings.linearAccessOptimised)
|
||||
// TODO we could optimise ByteStrings to better handle linear access like this (or provide ByteStrings.linearAccessOptimised)
|
||||
// TODO IF it's worth it.
|
||||
val parserInput = new SprayJsonByteStringParserInput(bs.compact)
|
||||
FastFuture.successful(JsonParser(parserInput))
|
||||
|
|
@ -40,6 +40,19 @@ trait SprayJsonSupport {
|
|||
else ParserInput(data.decodeString(charset.nioCharset))
|
||||
JsonParser(input)
|
||||
}
|
||||
// support for as[Source[T, NotUsed]]
|
||||
implicit def sprayJsonSourceReader[T](implicit reader: RootJsonReader[T], support: EntityStreamingSupport): FromEntityUnmarshaller[Source[T, NotUsed]] =
|
||||
Unmarshaller.withMaterializer { implicit ec ⇒ implicit mat ⇒ e ⇒
|
||||
if (support.supported.matches(e.contentType)) {
|
||||
val frames = e.dataBytes.via(support.framingDecoder)
|
||||
val unmarshal = sprayJsonByteStringUnmarshaller(reader)(_)
|
||||
val unmarshallingFlow =
|
||||
if (support.unordered) Flow[ByteString].mapAsyncUnordered(support.parallelism)(unmarshal)
|
||||
else Flow[ByteString].mapAsync(support.parallelism)(unmarshal)
|
||||
val elements = frames.viaMat(unmarshallingFlow)(Keep.right)
|
||||
FastFuture.successful(elements)
|
||||
} else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(support.supported))
|
||||
}
|
||||
|
||||
implicit def sprayJsonMarshallerConverter[T](writer: RootJsonWriter[T])(implicit printer: JsonPrinter = CompactPrinter): ToEntityMarshaller[T] =
|
||||
sprayJsonMarshaller[T](writer, printer)
|
||||
|
|
@ -47,19 +60,5 @@ trait SprayJsonSupport {
|
|||
sprayJsValueMarshaller compose writer.write
|
||||
implicit def sprayJsValueMarshaller(implicit printer: JsonPrinter = CompactPrinter): ToEntityMarshaller[JsValue] =
|
||||
Marshaller.StringMarshaller.wrap(MediaTypes.`application/json`)(printer)
|
||||
|
||||
// support for as[Source[T, NotUsed]]
|
||||
implicit def sprayJsonSourceReader[T](implicit rootJsonReader: RootJsonReader[T], support: EntityStreamingSupport): FromRequestUnmarshaller[Source[T, NotUsed]] =
|
||||
Unmarshaller.withMaterializer { implicit ec ⇒ implicit mat ⇒ r ⇒
|
||||
if (support.supported.matches(r.entity.contentType)) {
|
||||
val bytes = r.entity.dataBytes
|
||||
val frames = bytes.via(support.framingDecoder)
|
||||
val unmarshalling =
|
||||
if (support.unordered) Flow[ByteString].mapAsyncUnordered(support.parallelism)(bs ⇒ sprayJsonByteStringUnmarshaller(rootJsonReader)(bs))
|
||||
else Flow[ByteString].mapAsync(support.parallelism)(bs ⇒ sprayJsonByteStringUnmarshaller(rootJsonReader)(bs))
|
||||
val elements = frames.viaMat(unmarshalling)(Keep.right)
|
||||
FastFuture.successful(elements)
|
||||
} else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(support.supported))
|
||||
}
|
||||
}
|
||||
object SprayJsonSupport extends SprayJsonSupport
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@ import scala.concurrent.duration._
|
|||
import scala.concurrent.Await
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
import java.util.concurrent.ThreadLocalRandom
|
||||
import scala.util.Random
|
||||
import scala.util.control.NoStackTrace
|
||||
import org.scalatest.{ Inspectors, WordSpec }
|
||||
import akka.util.ByteString
|
||||
|
|
|
|||
|
|
@ -14,8 +14,6 @@ import akka.stream._
|
|||
import akka.stream.scaladsl._
|
||||
import akka.http.scaladsl.Http
|
||||
import akka.http.scaladsl.common.EntityStreamingSupport
|
||||
import akka.http.scaladsl.marshalling.ToResponseMarshallable
|
||||
import spray.json.RootJsonReader
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.io.StdIn
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue