From 6b90b20334e540874cd10767aac8ee5bbf80ba0b Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Mon, 29 May 2023 17:27:37 +0200 Subject: [PATCH] stream: fix regression in JsonFraming (#44) * stream: fix regression in JsonFraming Was introduced in 6b30134c5c7e8deefdfc7ff9a3e74e0bbd2d433b where an invariant was changed what `pos` can be when the loop is run. * fix when maximumObjectLength is near Int.MaxValue --- .../org/apache/pekko/stream/scaladsl/JsonFramingSpec.scala | 7 ++++++- .../org/apache/pekko/stream/impl/JsonObjectParser.scala | 7 ++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/JsonFramingSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/JsonFramingSpec.scala index 0182969538..52508c41a0 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/JsonFramingSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/JsonFramingSpec.scala @@ -41,7 +41,7 @@ class JsonFramingSpec extends PekkoSpec { |""".stripMargin // also should complete once notices end of array val result = - Source.single(ByteString(input)).via(JsonFraming.objectScanner(Int.MaxValue)).runFold(Seq.empty[String]) { + Source.single(ByteString(input)).via(JsonFraming.objectScanner(64)).runFold(Seq.empty[String]) { case (acc, entry) => acc ++ Seq(entry.utf8String) } // #using-json-framing @@ -494,6 +494,11 @@ class JsonFramingSpec extends PekkoSpec { a[FramingException] shouldBe thrownBy { buffer.poll() } } } + "maximumObjectLength is near Int.MaxValue" in { + val buffer = new JsonObjectParser(Int.MaxValue - 1) + buffer.offer(ByteString(""" {}""")) + buffer.poll().get.utf8String shouldBe """{}""" + } } "fail on too large initial object" in { diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/JsonObjectParser.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/JsonObjectParser.scala index fc4413458f..3584a1467b 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/JsonObjectParser.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/JsonObjectParser.scala @@ -117,21 +117,22 @@ import pekko.util.ByteString val bufSize = buffer.length skipToNextObject(bufSize) + val maxObjectLengthIndex = if (pos + maximumObjectLength < 0) Int.MaxValue else pos + maximumObjectLength - while (pos < bufSize && pos < maximumObjectLength && !completedObject) { + while (pos < bufSize && pos < maxObjectLengthIndex && !completedObject) { val input = buffer(pos) proceed(input) pos += 1 } - if (pos >= maximumObjectLength) + if (pos >= maxObjectLengthIndex) throw new FramingException(s"""JSON element exceeded maximumObjectLength ($maximumObjectLength bytes)!""") completedObject } private def skipToNextObject(bufSize: Int): Unit = - while (pos != -1 && pos < bufSize && pos < maximumObjectLength && depth == 0) { + while (pos != -1 && pos < bufSize && depth == 0) { val outer = outerChars(buffer(pos) & 0xFF) start += outer & 1 depth = (outer & 2) >> 1