diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala index c311515c1b..2f4b2e6766 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala @@ -13,7 +13,6 @@ import akka.http.scaladsl.server.{ UnacceptedResponseContentTypeRejection, Unsup import akka.stream.scaladsl.{ Flow, Source } import akka.util.ByteString import docs.http.scaladsl.server.RoutingSpec -import spray.json.JsValue import scala.concurrent.Future @@ -24,11 +23,11 @@ class JsonStreamingExamplesSpec extends RoutingSpec { case class Measurement(id: String, value: Int) //# - def getTweets() = - Source(List( - Tweet(1, "#Akka rocks!"), - Tweet(2, "Streaming is so hot right now!"), - Tweet(3, "You cannot enter the same river twice."))) + val tweets = List( + Tweet(1, "#Akka rocks!"), + Tweet(2, "Streaming is so hot right now!"), + Tweet(3, "You cannot enter the same river twice.")) + def getTweets = Source(tweets) //#formats object MyJsonProtocol @@ -51,7 +50,7 @@ class JsonStreamingExamplesSpec extends RoutingSpec { val route = path("tweets") { // [3] simply complete a request with a source of tweets: - val tweets: Source[Tweet, NotUsed] = getTweets() + val tweets: Source[Tweet, NotUsed] = getTweets complete(tweets) } @@ -93,7 +92,7 @@ class JsonStreamingExamplesSpec extends RoutingSpec { val route = path("tweets") { // [3] simply complete a request with a source of tweets: - val tweets: Source[Tweet, NotUsed] = getTweets() + val tweets: Source[Tweet, NotUsed] = getTweets complete(tweets) } @@ -123,7 +122,7 @@ class JsonStreamingExamplesSpec extends RoutingSpec { val route = path("tweets") { - val tweets: Source[Tweet, NotUsed] = getTweets() + val tweets: Source[Tweet, NotUsed] = getTweets complete(tweets) } @@ -132,10 +131,9 @@ class JsonStreamingExamplesSpec extends RoutingSpec { Get("/tweets").withHeaders(AcceptCsv) ~> route ~> check { responseAs[String] shouldEqual - """|1,#Akka rocks! - |2,Streaming is so hot right now! - |3,You cannot enter the same river twice.""" - .stripMargin + "1,#Akka rocks!" + "\n" + + "2,Streaming is so hot right now!" + "\n" + + "3,You cannot enter the same river twice." } } @@ -149,7 +147,7 @@ class JsonStreamingExamplesSpec extends RoutingSpec { .withParallelMarshalling(parallelism = 8, unordered = false) path("tweets") { - val tweets: Source[Tweet, NotUsed] = getTweets() + val tweets: Source[Tweet, NotUsed] = getTweets complete(tweets) } //# @@ -164,7 +162,7 @@ class JsonStreamingExamplesSpec extends RoutingSpec { .withParallelMarshalling(parallelism = 8, unordered = true) path("tweets" / "unordered") { - val tweets: Source[Tweet, NotUsed] = getTweets() + val tweets: Source[Tweet, NotUsed] = getTweets complete(tweets) } //# diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala index 221314df04..c943d2e303 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala @@ -4,7 +4,7 @@ package akka.http.impl.engine.server -import java.net.{InetAddress, InetSocketAddress} +import java.net.{ InetAddress, InetSocketAddress } import akka.http.impl.util._ import akka.http.scaladsl.Http.ServerLayer @@ -17,7 +17,7 @@ import akka.http.scaladsl.settings.ServerSettings import akka.stream.scaladsl._ import akka.stream.testkit.Utils.assertAllStagesStopped import akka.stream.testkit._ -import akka.stream.{ActorMaterializer, Fusing} +import akka.stream.{ ActorMaterializer, Fusing } import akka.testkit.AkkaSpec import akka.util.ByteString import org.scalatest.Inside diff --git a/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonByteStringParserInput.scala b/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonByteStringParserInput.scala index 71f7fec8cb..6e921267af 100644 --- a/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonByteStringParserInput.scala +++ b/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonByteStringParserInput.scala @@ -13,15 +13,7 @@ import scala.annotation.tailrec /** * ParserInput reading directly off a ByteString. (Based on the ByteArrayBasedParserInput) - * This avoids a separate decoding step but assumes that each byte represents exactly one character, - * which is encoded by ISO-8859-1! - * You can therefore use this ParserInput type only if you know that all input will be `ISO-8859-1`-encoded, - * or only contains 7-bit ASCII characters (which is a subset of ISO-8859-1)! - * - * Note that this ParserInput type will NOT work with general `UTF-8`-encoded input as this can contain - * character representations spanning multiple bytes. However, if you know that your input will only ever contain - * 7-bit ASCII characters (0x00-0x7F) then UTF-8 is fine, since the first 127 UTF-8 characters are - * encoded with only one byte that is identical to 7-bit ASCII and ISO-8859-1. + * that avoids a separate decoding step. */ final class SprayJsonByteStringParserInput(bytes: ByteString) extends DefaultParserInput { @@ -67,9 +59,9 @@ final class SprayJsonByteStringParserInput(bytes: ByteString) extends DefaultPar override def length: Int = bytes.size override def sliceString(start: Int, end: Int): String = - bytes.slice(start, end - start).decodeString(StandardCharsets.ISO_8859_1) + bytes.slice(start, end - start).decodeString(StandardCharsets.UTF_8) override def sliceCharArray(start: Int, end: Int): Array[Char] = - StandardCharsets.ISO_8859_1.decode(bytes.slice(start, end).asByteBuffer).array() + StandardCharsets.UTF_8.decode(bytes.slice(start, end).asByteBuffer).array() } object SprayJsonByteStringParserInput { diff --git a/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala b/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala index e399ca7706..7f91733aa4 100644 --- a/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala +++ b/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala @@ -9,7 +9,7 @@ import akka.http.scaladsl.common.EntityStreamingSupport import akka.http.scaladsl.marshalling._ import akka.http.scaladsl.model.MediaTypes.`application/json` import akka.http.scaladsl.model.{ HttpCharsets, MediaTypes } -import akka.http.scaladsl.unmarshalling.{ FromEntityUnmarshaller, FromRequestUnmarshaller, Unmarshaller } +import akka.http.scaladsl.unmarshalling.{ FromByteStringUnmarshaller, FromEntityUnmarshaller, Unmarshaller } import akka.http.scaladsl.util.FastFuture import akka.stream.scaladsl.{ Flow, Keep, Source } import akka.util.ByteString @@ -25,10 +25,10 @@ trait SprayJsonSupport { sprayJsonUnmarshaller(reader) implicit def sprayJsonUnmarshaller[T](implicit reader: RootJsonReader[T]): FromEntityUnmarshaller[T] = sprayJsValueUnmarshaller.map(jsonReader[T].read) - implicit def sprayJsonByteStringUnmarshaller[T](implicit reader: RootJsonReader[T]): Unmarshaller[ByteString, T] = + implicit def sprayJsonByteStringUnmarshaller[T](implicit reader: RootJsonReader[T]): FromByteStringUnmarshaller[T] = Unmarshaller.withMaterializer[ByteString, JsValue](_ ⇒ implicit mat ⇒ { bs ⇒ // .compact so addressing into any address is very fast (also for large chunks) - // TODO we could optimise ByteStrings to better handle lienear access like this (or provide ByteStrings.linearAccessOptimised) + // TODO we could optimise ByteStrings to better handle linear access like this (or provide ByteStrings.linearAccessOptimised) // TODO IF it's worth it. val parserInput = new SprayJsonByteStringParserInput(bs.compact) FastFuture.successful(JsonParser(parserInput)) @@ -40,6 +40,19 @@ trait SprayJsonSupport { else ParserInput(data.decodeString(charset.nioCharset)) JsonParser(input) } + // support for as[Source[T, NotUsed]] + implicit def sprayJsonSourceReader[T](implicit reader: RootJsonReader[T], support: EntityStreamingSupport): FromEntityUnmarshaller[Source[T, NotUsed]] = + Unmarshaller.withMaterializer { implicit ec ⇒ implicit mat ⇒ e ⇒ + if (support.supported.matches(e.contentType)) { + val frames = e.dataBytes.via(support.framingDecoder) + val unmarshal = sprayJsonByteStringUnmarshaller(reader)(_) + val unmarshallingFlow = + if (support.unordered) Flow[ByteString].mapAsyncUnordered(support.parallelism)(unmarshal) + else Flow[ByteString].mapAsync(support.parallelism)(unmarshal) + val elements = frames.viaMat(unmarshallingFlow)(Keep.right) + FastFuture.successful(elements) + } else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(support.supported)) + } implicit def sprayJsonMarshallerConverter[T](writer: RootJsonWriter[T])(implicit printer: JsonPrinter = CompactPrinter): ToEntityMarshaller[T] = sprayJsonMarshaller[T](writer, printer) @@ -47,19 +60,5 @@ trait SprayJsonSupport { sprayJsValueMarshaller compose writer.write implicit def sprayJsValueMarshaller(implicit printer: JsonPrinter = CompactPrinter): ToEntityMarshaller[JsValue] = Marshaller.StringMarshaller.wrap(MediaTypes.`application/json`)(printer) - - // support for as[Source[T, NotUsed]] - implicit def sprayJsonSourceReader[T](implicit rootJsonReader: RootJsonReader[T], support: EntityStreamingSupport): FromRequestUnmarshaller[Source[T, NotUsed]] = - Unmarshaller.withMaterializer { implicit ec ⇒ implicit mat ⇒ r ⇒ - if (support.supported.matches(r.entity.contentType)) { - val bytes = r.entity.dataBytes - val frames = bytes.via(support.framingDecoder) - val unmarshalling = - if (support.unordered) Flow[ByteString].mapAsyncUnordered(support.parallelism)(bs ⇒ sprayJsonByteStringUnmarshaller(rootJsonReader)(bs)) - else Flow[ByteString].mapAsync(support.parallelism)(bs ⇒ sprayJsonByteStringUnmarshaller(rootJsonReader)(bs)) - val elements = frames.viaMat(unmarshalling)(Keep.right) - FastFuture.successful(elements) - } else FastFuture.failed(Unmarshaller.UnsupportedContentTypeException(support.supported)) - } } object SprayJsonSupport extends SprayJsonSupport diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/coding/CoderSpec.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/coding/CoderSpec.scala index 8142d69f7d..1663f081e1 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/coding/CoderSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/coding/CoderSpec.scala @@ -14,7 +14,6 @@ import scala.concurrent.duration._ import scala.concurrent.Await import scala.concurrent.ExecutionContext.Implicits.global import java.util.concurrent.ThreadLocalRandom -import scala.util.Random import scala.util.control.NoStackTrace import org.scalatest.{ Inspectors, WordSpec } import akka.util.ByteString diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala index ea5cb71894..25fec791f3 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala @@ -14,8 +14,6 @@ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http import akka.http.scaladsl.common.EntityStreamingSupport -import akka.http.scaladsl.marshalling.ToResponseMarshallable -import spray.json.RootJsonReader import scala.concurrent.duration._ import scala.io.StdIn