diff --git a/akka-docs/rst/java/http/routing-dsl/source-streaming-support.rst b/akka-docs/rst/java/http/routing-dsl/source-streaming-support.rst index b83de64dc3..f4a0e7c4b6 100644 --- a/akka-docs/rst/java/http/routing-dsl/source-streaming-support.rst +++ b/akka-docs/rst/java/http/routing-dsl/source-streaming-support.rst @@ -3,7 +3,7 @@ Source Streaming ================ -Akka HTTP supports completing a request with an Akka ``Source``, which makes it possible to very easily build +Akka HTTP supports completing a request with an Akka ``Source``, which makes it possible to easily build streaming end-to-end APIs which apply back-pressure throughout the entire stack. It is possible to complete requests with raw ``Source``, however often it is more convenient to diff --git a/akka-docs/rst/java/typed-actors.rst b/akka-docs/rst/java/typed-actors.rst index 11160b69e1..0b6622023c 100644 --- a/akka-docs/rst/java/typed-actors.rst +++ b/akka-docs/rst/java/typed-actors.rst @@ -25,7 +25,7 @@ lies in interfacing between private sphere and the public, but you don’t want that many doors inside your house, do you? For a longer discussion see `this blog post `_. -A bit more background: TypedActors can very easily be abused as RPC, and that +A bit more background: TypedActors can easily be abused as RPC, and that is an abstraction which is `well-known `_ to be leaky. Hence TypedActors are not what we think of first when we talk diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala index f019b6fef5..4f047ae595 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/JsonStreamingExamplesSpec.scala @@ -45,13 +45,12 @@ class JsonStreamingExamplesSpec extends RoutingSpec { // [3] pick json rendering mode: // HINT: if you extend `akka.http.scaladsl.server.EntityStreamingSupport` // it'll guide you to do so via abstract defs - val maximumObjectLength = 128 implicit val jsonRenderingMode = JsonSourceRenderingModes.LineByLine val route = path("tweets") { val tweets: Source[Tweet, NotUsed] = getTweets() - complete(ToResponseMarshallable(tweets)) + complete(tweets) } // tests: @@ -104,7 +103,7 @@ class JsonStreamingExamplesSpec extends RoutingSpec { // [2] import "my protocol", for unmarshalling Measurement objects: import MyJsonProtocol._ - // [3] prepareyour persisting logic here + // [3] prepare your persisting logic here val persistMetrics = Flow[Measurement] val route = diff --git a/akka-docs/rst/scala/http/routing-dsl/directives/index.rst b/akka-docs/rst/scala/http/routing-dsl/directives/index.rst index e0082a338a..4e30d49f50 100644 --- a/akka-docs/rst/scala/http/routing-dsl/directives/index.rst +++ b/akka-docs/rst/scala/http/routing-dsl/directives/index.rst @@ -224,4 +224,4 @@ When you combine directives producing extractions with the ``&`` operator all ex Directives offer a great way of constructing your web service logic from small building blocks in a plug and play fashion while maintaining DRYness and full type-safety. If the large range of :ref:`Predefined Directives` does not -fully satisfy your needs you can also very easily create :ref:`Custom Directives`. +fully satisfy your needs you can also easily create :ref:`Custom Directives`. diff --git a/akka-docs/rst/scala/http/routing-dsl/source-streaming-support.rst b/akka-docs/rst/scala/http/routing-dsl/source-streaming-support.rst index d6c7cbdc46..8ead39b766 100644 --- a/akka-docs/rst/scala/http/routing-dsl/source-streaming-support.rst +++ b/akka-docs/rst/scala/http/routing-dsl/source-streaming-support.rst @@ -3,7 +3,7 @@ Source Streaming ================ -Akka HTTP supports completing a request with an Akka ``Source[T, _]``, which makes it possible to very easily build +Akka HTTP supports completing a request with an Akka ``Source[T, _]``, which makes it possible to easily build streaming end-to-end APIs which apply back-pressure throughout the entire stack. It is possible to complete requests with raw ``Source[ByteString, _]``, however often it is more convenient to @@ -99,7 +99,7 @@ Implementing custom (Un)Marshaller support for JSON streaming While not provided by Akka HTTP directly, the infrastructure is extensible and by investigating how ``SprayJsonSupport`` is implemented it is certainly possible to provide the same infrastructure for other marshaller implementations (such as -Play JSON, or Jackson directly for example). Such support traits will want to extend the ``JsonEntityStreamingSupport`` trait. +Play JSON, or Jackson directly for example). Such support traits will want to extend the ``EntityStreamingSupport`` trait. The following types that may need to be implemented by a custom framed-streaming support library are: @@ -108,4 +108,4 @@ The following types that may need to be implemented by a custom framed-streaming - ``FramingWithContentType`` which is needed to be able to split incoming ``ByteString`` chunks into frames of the higher-level data type format that is understood by the provided unmarshallers. In the case of JSON it means chunking up ByteStrings such that each emitted element corresponds to exactly one JSON object, - this framing is implemented in ``JsonEntityStreamingSupport``. + this framing is implemented in ``EntityStreamingSupport``. diff --git a/akka-docs/rst/scala/typed-actors.rst b/akka-docs/rst/scala/typed-actors.rst index f9f5ab8fd5..75c72637f3 100644 --- a/akka-docs/rst/scala/typed-actors.rst +++ b/akka-docs/rst/scala/typed-actors.rst @@ -35,7 +35,7 @@ lies in interfacing between private sphere and the public, but you don’t want that many doors inside your house, do you? For a longer discussion see `this blog post `_. -A bit more background: TypedActors can very easily be abused as RPC, and that +A bit more background: TypedActors can easily be abused as RPC, and that is an abstraction which is `well-known `_ to be leaky. Hence TypedActors are not what we think of first when we talk diff --git a/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonByteStringParserInput.scala b/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonByteStringParserInput.scala new file mode 100644 index 0000000000..71f7fec8cb --- /dev/null +++ b/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonByteStringParserInput.scala @@ -0,0 +1,79 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ + +package akka.http.scaladsl.marshallers.sprayjson + +import java.nio.{ ByteBuffer, CharBuffer } +import java.nio.charset.{ Charset, StandardCharsets } + +import akka.util.ByteString +import spray.json.ParserInput.DefaultParserInput +import scala.annotation.tailrec + +/** + * ParserInput reading directly off a ByteString. (Based on the ByteArrayBasedParserInput) + * This avoids a separate decoding step but assumes that each byte represents exactly one character, + * which is encoded by ISO-8859-1! + * You can therefore use this ParserInput type only if you know that all input will be `ISO-8859-1`-encoded, + * or only contains 7-bit ASCII characters (which is a subset of ISO-8859-1)! + * + * Note that this ParserInput type will NOT work with general `UTF-8`-encoded input as this can contain + * character representations spanning multiple bytes. However, if you know that your input will only ever contain + * 7-bit ASCII characters (0x00-0x7F) then UTF-8 is fine, since the first 127 UTF-8 characters are + * encoded with only one byte that is identical to 7-bit ASCII and ISO-8859-1. + */ +final class SprayJsonByteStringParserInput(bytes: ByteString) extends DefaultParserInput { + + import SprayJsonByteStringParserInput._ + + private[this] val byteBuffer = ByteBuffer.allocate(4) + private[this] val charBuffer = CharBuffer.allocate(1) + + private[this] val decoder = Charset.forName("UTF-8").newDecoder() + + override def nextChar() = { + _cursor += 1 + if (_cursor < bytes.length) (bytes(_cursor) & 0xFF).toChar else EOI + } + + override def nextUtf8Char() = { + @tailrec def decode(byte: Byte, remainingBytes: Int): Char = { + byteBuffer.put(byte) + if (remainingBytes > 0) { + _cursor += 1 + if (_cursor < bytes.length) decode(bytes(_cursor), remainingBytes - 1) else ErrorChar + } else { + byteBuffer.flip() + val coderResult = decoder.decode(byteBuffer, charBuffer, false) + charBuffer.flip() + val result = if (coderResult.isUnderflow & charBuffer.hasRemaining) charBuffer.get() else ErrorChar + byteBuffer.clear() + charBuffer.clear() + result + } + } + + _cursor += 1 + if (_cursor < bytes.length) { + val byte = bytes(_cursor) + if (byte >= 0) byte.toChar // 7-Bit ASCII + else if ((byte & 0xE0) == 0xC0) decode(byte, 1) // 2-byte UTF-8 sequence + else if ((byte & 0xF0) == 0xE0) decode(byte, 2) // 3-byte UTF-8 sequence + else if ((byte & 0xF8) == 0xF0) decode(byte, 3) // 4-byte UTF-8 sequence, will probably produce an (unsupported) surrogate pair + else ErrorChar + } else EOI + } + + override def length: Int = bytes.size + override def sliceString(start: Int, end: Int): String = + bytes.slice(start, end - start).decodeString(StandardCharsets.ISO_8859_1) + override def sliceCharArray(start: Int, end: Int): Array[Char] = + StandardCharsets.ISO_8859_1.decode(bytes.slice(start, end).asByteBuffer).array() +} + +object SprayJsonByteStringParserInput { + private final val EOI = '\uFFFF' + // compile-time constant + private final val ErrorChar = '\uFFFD' // compile-time constant, universal UTF-8 replacement character '�' +} diff --git a/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala b/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala index d3f13379bd..cc5fe07495 100644 --- a/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala +++ b/akka-http-marshallers-scala/akka-http-spray-json/src/main/scala/akka/http/scaladsl/marshallers/sprayjson/SprayJsonSupport.scala @@ -4,15 +4,15 @@ package akka.http.scaladsl.marshallers.sprayjson +import akka.http.scaladsl.marshalling.{ Marshaller, ToByteStringMarshaller, ToEntityMarshaller } +import akka.http.scaladsl.model.MediaTypes.`application/json` +import akka.http.scaladsl.model.{ HttpCharsets, MediaTypes } +import akka.http.scaladsl.unmarshalling.{ FromEntityUnmarshaller, Unmarshaller } import akka.http.scaladsl.util.FastFuture import akka.util.ByteString +import spray.json._ import scala.language.implicitConversions -import akka.http.scaladsl.marshalling.{Marshaller, ToByteStringMarshaller, ToEntityMarshaller} -import akka.http.scaladsl.unmarshalling.{FromEntityUnmarshaller, Unmarshaller} -import akka.http.scaladsl.model.{ContentTypes, HttpCharsets, MediaTypes} -import akka.http.scaladsl.model.MediaTypes.`application/json` -import spray.json._ /** * A trait providing automatic to and from JSON marshalling/unmarshalling using an in-scope *spray-json* protocol. @@ -24,7 +24,11 @@ trait SprayJsonSupport { sprayJsValueUnmarshaller.map(jsonReader[T].read) implicit def sprayJsonByteStringUnmarshaller[T](implicit reader: RootJsonReader[T]): Unmarshaller[ByteString, T] = Unmarshaller.withMaterializer[ByteString, JsValue](_ ⇒ implicit mat ⇒ { bs ⇒ - FastFuture.successful(JsonParser(bs.toArray[Byte])) + // .compact so addressing into any address is very fast (also for large chunks) + // TODO we could optimise ByteStrings to better handle lienear access like this (or provide ByteStrings.linearAccessOptimised) + // TODO IF it's worth it. + val parserInput = new SprayJsonByteStringParserInput(bs.compact) + FastFuture.successful(JsonParser(parserInput)) }).map(jsonReader[T].read) implicit def sprayJsValueUnmarshaller: FromEntityUnmarshaller[JsValue] = Unmarshaller.byteStringUnmarshaller.forContentTypes(`application/json`).mapWithCharset { (data, charset) ⇒ diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala index 7711dc1b88..1512bcb0d9 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/TestServer.scala @@ -86,11 +86,13 @@ object TestServer extends App { (path("tweets") & parameter('n.as[Int])) { n => get { val tweets = Source.repeat(Tweet("Hello, world!")).take(n) - complete(ToResponseMarshallable(tweets)) + complete(tweets) } ~ post { entity(as[Source[Tweet, NotUsed]]) { tweets ⇒ - complete(s"Total tweets received: " + tweets.runFold(0)({ case (acc, t) => acc + 1 })) + onComplete(tweets.runFold(0)({ case (acc, t) => acc + 1 })) { count => + complete(s"Total tweets received: " + count) + } } } } diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/directives/FutureDirectives.scala b/akka-http/src/main/scala/akka/http/javadsl/server/directives/FutureDirectives.scala index dd48c4c187..50a0cc75ef 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/directives/FutureDirectives.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/directives/FutureDirectives.scala @@ -23,6 +23,8 @@ abstract class FutureDirectives extends FormFieldDirectives { /** * "Unwraps" a `CompletionStage` and runs the inner route after future * completion with the future's value as an extraction of type `Try`. + * + * @group future */ def onComplete[T](f: Supplier[CompletionStage[T]], inner: JFunction[Try[T], Route]) = RouteAdapter { D.onComplete(f.get.toScala.recover(unwrapCompletionException)) { value ⇒ @@ -33,6 +35,8 @@ abstract class FutureDirectives extends FormFieldDirectives { /** * "Unwraps" a `CompletionStage` and runs the inner route after future * completion with the future's value as an extraction of type `Try`. + * + * @group future */ def onComplete[T](cs: CompletionStage[T], inner: JFunction[Try[T], Route]) = RouteAdapter { D.onComplete(cs.toScala.recover(unwrapCompletionException)) { value ⇒ @@ -61,6 +65,8 @@ abstract class FutureDirectives extends FormFieldDirectives { * completion with the stage's value as an extraction of type `T`. * If the stage fails its failure Throwable is bubbled up to the nearest * ExceptionHandler. + * + * @group future */ def onSuccess[T](f: Supplier[CompletionStage[T]], inner: JFunction[T, Route]) = RouteAdapter { D.onSuccess(f.get.toScala.recover(unwrapCompletionException)) { value ⇒ @@ -74,6 +80,8 @@ abstract class FutureDirectives extends FormFieldDirectives { * If the completion stage succeeds the request is completed using the values marshaller * (This directive therefore requires a marshaller for the completion stage value type to be * provided.) + * + * @group future */ def completeOrRecoverWith[T](f: Supplier[CompletionStage[T]], marshaller: Marshaller[T, RequestEntity], inner: JFunction[Throwable, Route]): Route = RouteAdapter { val magnet = CompleteOrRecoverWithMagnet(f.get.toScala)(Marshaller.asScalaEntityMarshaller(marshaller)) diff --git a/akka-http/src/main/scala/akka/http/scaladsl/common/StrictForm.scala b/akka-http/src/main/scala/akka/http/scaladsl/common/StrictForm.scala index cfea86b285..f6a5d35206 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/common/StrictForm.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/common/StrictForm.scala @@ -61,9 +61,9 @@ object StrictForm { fsu(value.entity.data.decodeString(charsetName)) }) - @implicitNotFound(msg = + @implicitNotFound(msg = s"In order to unmarshal a `StrictForm.Field` to type `$${T}` you need to supply a " + - s"`FromStringUnmarshaller[$${T}]` and/or a `FromEntityUnmarshaller[$${T}]`") + s"`FromStringUnmarshaller[$${T}]` and/or a `FromEntityUnmarshaller[$${T}]`") sealed trait FieldUnmarshaller[T] { def unmarshalString(value: String)(implicit ec: ExecutionContext, mat: Materializer): Future[T] def unmarshalPart(value: Multipart.FormData.BodyPart.Strict)(implicit ec: ExecutionContext, mat: Materializer): Future[T] diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala index 61c39c6843..9f3dffd6df 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala @@ -148,7 +148,7 @@ trait FramedEntityStreamingDirectives extends MarshallingDirectives { val entity = req.entity if (framing.matches(entity.contentType)) { val bytes = entity.dataBytes - val frames = bytes.viaMat(framing.flow)(Keep.right) + val frames = bytes.via(framing.flow) val elements = frames.viaMat(marshalling(ec, mat))(Keep.right) FastFuture.successful(elements) diff --git a/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala b/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala index 3d0c6ec253..ed77964607 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala @@ -143,7 +143,7 @@ private[akka] class JsonObjectParser(maximumObjectLength: Int = Int.MaxValue) { isStartOfEscapeSequence = false pos += 1 } else { - throw new FramingException(s"Invalid JSON encountered as position [$pos] of [$buffer]") + throw new FramingException(s"Invalid JSON encountered at position [$pos] of [$buffer]") } @inline private final def insideObject: Boolean =