diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala index 57bfa79349..fc91396ca7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala @@ -6,6 +6,7 @@ package akka.stream.scaladsl import akka.stream.ActorMaterializer import akka.stream.impl.JsonObjectParser import akka.stream.scaladsl.Framing.FramingException +import akka.stream.testkit.{ TestPublisher, TestSubscriber } import akka.stream.testkit.scaladsl.TestSink import akka.testkit.AkkaSpec import akka.util.ByteString @@ -116,6 +117,34 @@ class JsonFramingSpec extends AkkaSpec { |}""".stripMargin, """{ "name": "jack"}""") } + + "emit all elements after input completes" in { + // coverage for #21150 + val input = TestPublisher.probe[ByteString]() + val output = TestSubscriber.probe[String]() + + val result = Source.fromPublisher(input) + .via(JsonFraming.objectScanner(Int.MaxValue)) + .map(_.utf8String) + .runWith(Sink.fromSubscriber(output)) + + output.request(1) + input.expectRequest() + input.sendNext(ByteString("""[{"a":0}, {"b":1}, {"c":2}, {"d":3}, {"e":4}]""")) + input.sendComplete() + Thread.sleep(10) // another of those races, we don't know the order of next and complete + output.expectNext("""{"a":0}""") + output.request(1) + output.expectNext("""{"b":1}""") + output.request(1) + output.expectNext("""{"c":2}""") + output.request(1) + output.expectNext("""{"d":3}""") + output.request(1) + output.expectNext("""{"e":4}""") + output.request(1) + output.expectComplete() + } } "collecting json buffer" when { diff --git a/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala b/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala index ed77964607..fc87c86569 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala @@ -37,7 +37,7 @@ private[akka] object JsonObjectParser { * INTERNAL API: Use [[akka.stream.scaladsl.JsonFraming]] instead. * * **Mutable** framing implementation that given any number of [[ByteString]] chunks, can emit JSON objects contained within them. - * Typically JSON objects are separated by new-lines or comas, however a top-level JSON Array can also be understood and chunked up + * Typically JSON objects are separated by new-lines or commas, however a top-level JSON Array can also be understood and chunked up * into valid JSON objects by this framing implementation. * * Leading whitespace between elements will be trimmed. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/JsonFraming.scala b/akka-stream/src/main/scala/akka/stream/javadsl/JsonFraming.scala index 4bad96f790..99ec1d03cf 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/JsonFraming.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/JsonFraming.scala @@ -29,8 +29,6 @@ object JsonFraming { * elements are separated by multiple newlines or other whitespace characters. And of course is insensitive * (and does not impact the emitting frame) to the JSON object's internal formatting. * - * Framing raw JSON values (such as integers or strings) is supported as well. - * * @param maximumObjectLength The maximum length of allowed frames while decoding. If the maximum length is exceeded * this Flow will fail the stream. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala index f7c7aeb50e..d0dffb52c9 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala @@ -36,16 +36,17 @@ object JsonFraming { * elements are separated by multiple newlines or other whitespace characters. And of course is insensitive * (and does not impact the emitting frame) to the JSON object's internal formatting. * - * Framing raw JSON values (such as integers or strings) is supported as well. - * * @param maximumObjectLength The maximum length of allowed frames while decoding. If the maximum length is exceeded * this Flow will fail the stream. */ def objectScanner(maximumObjectLength: Int): Flow[ByteString, ByteString, NotUsed] = Flow[ByteString].via(new SimpleLinearGraphStage[ByteString] { - private[this] val buffer = new JsonObjectParser(maximumObjectLength) + + override protected def initialAttributes: Attributes = Attributes.name("JsonFraming.objectScanner") override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler { + private val buffer = new JsonObjectParser(maximumObjectLength) + setHandlers(in, out, this) override def onPush(): Unit = { @@ -57,8 +58,8 @@ object JsonFraming { tryPopBuffer() override def onUpstreamFinish(): Unit = { - try buffer.poll() match { - case Some(json) ⇒ emit(out, json, () ⇒ completeStage()) + buffer.poll() match { + case Some(json) ⇒ emit(out, json) case _ ⇒ completeStage() } } @@ -72,6 +73,6 @@ object JsonFraming { } } } - }).named("JsonFraming.objectScanner") + }) }