diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/JsonFramingSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/JsonFramingSpec.scala index 937dff96d9..8150274f27 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/JsonFramingSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/JsonFramingSpec.scala @@ -545,6 +545,22 @@ class JsonFramingSpec extends PekkoSpec { probe.request(1).expectError() shouldBe a[PartialObjectException] } + "complete when completing inside an empty chars" in { + val input = ByteString(" ") + val probe = Source.single(input).via(JsonFraming.objectScanner(48)).runWith(TestSink.probe) + + probe.ensureSubscription() + probe.request(1).expectComplete() + } + + "complete when completing with an empty byte string" in { + val input = ByteString() + val probe = Source.single(input).via(JsonFraming.objectScanner(48)).runWith(TestSink.probe) + + probe.ensureSubscription() + probe.request(1).expectComplete() + } + "fail when pushing and inside an object" in { val input = """ { "name": "john" }, { """ Source diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/JsonObjectParser.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/JsonObjectParser.scala index b7f24f7a0a..4b87c356bd 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/JsonObjectParser.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/JsonObjectParser.scala @@ -115,20 +115,21 @@ import pekko.util.ByteString private def seekObject(): Boolean = { completedObject = false val bufSize = buffer.length + if (bufSize > 0) { + skipToNextObject(bufSize) + val maxObjectLengthIndex = if (pos + maximumObjectLength < 0) Int.MaxValue else pos + maximumObjectLength - skipToNextObject(bufSize) - val maxObjectLengthIndex = if (pos + maximumObjectLength < 0) Int.MaxValue else pos + maximumObjectLength + while (pos < bufSize && pos < maxObjectLengthIndex && !completedObject) { + val input = buffer(pos) + proceed(input) + pos += 1 + } - while (pos < bufSize && pos < maxObjectLengthIndex && !completedObject) { - val input = buffer(pos) - proceed(input) - pos += 1 - } + if (pos >= maxObjectLengthIndex) + throw new FramingException(s"""JSON element exceeded maximumObjectLength ($maximumObjectLength bytes)!""") - if (pos >= maxObjectLengthIndex) - throw new FramingException(s"""JSON element exceeded maximumObjectLength ($maximumObjectLength bytes)!""") - - completedObject + completedObject + } else false } private def skipToNextObject(bufSize: Int): Unit =