From 2f2ee9e67c472515af87a4c91f0197179de0a303 Mon Sep 17 00:00:00 2001 From: Dave Handy Date: Fri, 3 Jul 2020 11:22:40 -0400 Subject: [PATCH] Change JsonFraming to fail stage if completing within an object #29228 --- .../stream/scaladsl/JsonFramingSpec.scala | 40 +++++++++++++------ .../akka/stream/impl/JsonObjectParser.scala | 3 ++ .../akka/stream/scaladsl/JsonFraming.scala | 21 +++++++--- 3 files changed, 47 insertions(+), 17 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala index 6e2aced1eb..726118c41f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala @@ -4,18 +4,18 @@ package akka.stream.scaladsl +import akka.stream.impl.JsonObjectParser +import akka.stream.scaladsl.Framing.FramingException +import akka.stream.scaladsl.JsonFraming.PartialObjectException +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.testkit.{ TestPublisher, TestSubscriber } +import akka.testkit.AkkaSpec +import akka.util.ByteString + import scala.collection.immutable.Seq import scala.concurrent.Await import scala.concurrent.duration._ -import akka.stream.impl.JsonObjectParser -import akka.stream.scaladsl.Framing.FramingException -import akka.stream.testkit.TestPublisher -import akka.stream.testkit.TestSubscriber -import akka.stream.testkit.scaladsl.TestSink -import akka.testkit.AkkaSpec -import akka.util.ByteString - class JsonFramingSpec extends AkkaSpec { "collecting multiple json" should { @@ -515,13 +515,29 @@ class JsonFramingSpec extends AkkaSpec { probe.ensureSubscription() probe - .request(1) - .expectNext(ByteString("""{ "name": "john" }""")) - .request(1) - .expectNext(ByteString("""{ "name": "jack" }""")) + .requestNext(ByteString("""{ "name": "john" }""")) + .requestNext(ByteString("""{ "name": "jack" }""")) .request(1) .expectError() .getMessage should include("exceeded") } + + "fail when completing inside an object" in { + val input = ByteString("{") + val probe = Source.single(input).via(JsonFraming.objectScanner(48)).runWith(TestSink.probe) + + probe.ensureSubscription() + probe.request(1).expectError() shouldBe a[PartialObjectException] + } + + "fail when pushing and inside an object" in { + val input = """ { "name": "john" }, { """ + Source + .single(ByteString(input)) + .via(JsonFraming.objectScanner(Int.MaxValue)) + .runWith(Sink.last) + .failed + .futureValue shouldBe a[PartialObjectException] + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala b/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala index dd3a9486bc..b052849354 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala @@ -70,6 +70,9 @@ import akka.util.ByteString def isEmpty: Boolean = buffer.isEmpty + /** `true` if the buffer is in a valid state to end framing. */ + def canComplete: Boolean = !insideObject + /** * Attempt to locate next complete JSON object in buffered ByteString and returns `Some(it)` if found. * May throw a [[akka.stream.scaladsl.Framing.FramingException]] if the contained JSON is invalid or max object size is exceeded. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala index 8449264632..f81e8a1760 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/JsonFraming.scala @@ -4,18 +4,23 @@ package akka.stream.scaladsl -import scala.util.control.NonFatal - import akka.NotUsed import akka.stream.Attributes import akka.stream.impl.JsonObjectParser import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage +import akka.stream.scaladsl.Framing.FramingException import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler } import akka.util.ByteString +import scala.util.control.NonFatal + /** Provides JSON framing operators that can separate valid JSON objects from incoming [[ByteString]] objects. */ object JsonFraming { + /** Thrown if upstream completes with a partial object in the buffer. */ + class PartialObjectException(msg: String = "JSON stream completed with partial content in the buffer!") + extends FramingException(msg) + /** * Returns a Flow that implements a "brace counting" based framing operator for emitting valid JSON chunks. * It scans the incoming data stream for valid JSON objects and returns chunks of ByteStrings containing only those valid chunks. @@ -37,6 +42,8 @@ object JsonFraming { * elements are separated by multiple newlines or other whitespace characters. And of course is insensitive * (and does not impact the emitting frame) to the JSON object's internal formatting. * + * If the stream completes while mid-object, the stage will fail with a [[PartialObjectException]]. + * * @param maximumObjectLength The maximum length of allowed frames while decoding. If the maximum length is exceeded * this Flow will fail the stream. */ @@ -62,18 +69,22 @@ object JsonFraming { override def onUpstreamFinish(): Unit = { buffer.poll() match { case Some(json) => emit(out, json) - case _ => completeStage() + case _ => complete() } } - def tryPopBuffer() = { + def tryPopBuffer(): Unit = { try buffer.poll() match { case Some(json) => push(out, json) - case _ => if (isClosed(in)) completeStage() else pull(in) + case _ => if (isClosed(in)) complete() else pull(in) } catch { case NonFatal(ex) => failStage(ex) } } + + def complete(): Unit = + if (buffer.canComplete) completeStage() + else failStage(new PartialObjectException) } })