diff --git a/akka-bench-jmh/src/main/scala/akka/stream/JsonFramingBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/JsonFramingBenchmark.scala index cf5d779415..8e7cfe884e 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/JsonFramingBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/JsonFramingBenchmark.scala @@ -33,7 +33,8 @@ class JsonFramingBenchmark { |{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false}, |{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false}, |{"fname":"Bob","name":"Smith","age":42,"id":1337,"boardMember":false}, - |{"fname":"Hank","name":"Smith","age":42,"id":1337,"boardMember":false}""".stripMargin) + |{"fname":"Hank","name":"Smith","age":42,"id":1337,"boardMember":false}""".stripMargin + ) val bracket = new JsonObjectParser diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala index fde94d9e4d..dc4a5ab001 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FramedEntityStreamingDirectives.scala @@ -47,22 +47,14 @@ trait FramedEntityStreamingDirectives extends MarshallingDirectives { * Extracts entity as [[Source]] of elements of type `T`. * This is achieved by applying the implicitly provided (in the following order): * - * - 1st: [[FramingFlow]] in order to chunk-up the incoming [[ByteString]]s according to the - * `Content-Type` aware framing (for example, [[common.EntityStreamingSupport.bracketCountingJsonFraming]]). - * - 2nd: [[Unmarshaller]] (from [[ByteString]] to `T`) for each of the respective "chunks" (e.g. for each JSON element contained within an array). + * - 1st: chunk-up the incoming [[ByteString]]s by applying the `Content-Type`-aware framing + * - 2nd: apply the [[Unmarshaller]] (from [[ByteString]] to `T`) for each of the respective "chunks" (e.g. for each JSON element contained within an array). * * The request will be rejected with an [[akka.http.scaladsl.server.UnsupportedRequestContentTypeRejection]] if * its [[ContentType]] is not supported by the used `framing` or `unmarshaller`. * - * It is recommended to use the [[common.EntityStreamingSupport]] trait in conjunction with this - * directive as it helps provide the right [[FramingFlow]] and [[SourceRenderingMode]] for the most - * typical usage scenarios (JSON, CSV, ...). - * * Cancelling extracted [[Source]] closes the connection abruptly (same as cancelling the `entity.dataBytes`). * - * If looking to improve marshalling performance in face of many elements (possibly of different sizes), - * you may be interested in using [[asSourceOfAsyncUnordered]] instead. - * * See also [[MiscDirectives.withoutSizeLimit]] as you may want to allow streaming infinite streams of data in this route. * By default the uploaded data is limited by the `akka.http.parsing.max-content-length`. */ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala index 37b2a5aad8..57bfa79349 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala @@ -16,8 +16,6 @@ import scala.concurrent.duration._ class JsonFramingSpec extends AkkaSpec { - override implicit val patience = PatienceConfig(timeout = 10.seconds) - implicit val mat = ActorMaterializer() "collecting multiple json" should { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala index 79e35909cc..f7c7aeb50e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala @@ -56,8 +56,12 @@ object JsonFraming { override def onPull(): Unit = tryPopBuffer() - override def onUpstreamFinish(): Unit = - if (buffer.isEmpty) completeStage() + override def onUpstreamFinish(): Unit = { + try buffer.poll() match { + case Some(json) ⇒ emit(out, json, () ⇒ completeStage()) + case _ ⇒ completeStage() + } + } def tryPopBuffer() = { try buffer.poll() match {