Change JsonFraming to fail stage if completing within an object #29228

This commit is contained in:
Dave Handy 2020-07-03 11:22:40 -04:00 committed by GitHub
parent f6ceb4d49a
commit 2f2ee9e67c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 47 additions and 17 deletions

View file

@ -4,18 +4,18 @@
package akka.stream.scaladsl
import akka.stream.impl.JsonObjectParser
import akka.stream.scaladsl.Framing.FramingException
import akka.stream.scaladsl.JsonFraming.PartialObjectException
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.{ TestPublisher, TestSubscriber }
import akka.testkit.AkkaSpec
import akka.util.ByteString
import scala.collection.immutable.Seq
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.stream.impl.JsonObjectParser
import akka.stream.scaladsl.Framing.FramingException
import akka.stream.testkit.TestPublisher
import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.AkkaSpec
import akka.util.ByteString
class JsonFramingSpec extends AkkaSpec {
"collecting multiple json" should {
@ -515,13 +515,29 @@ class JsonFramingSpec extends AkkaSpec {
probe.ensureSubscription()
probe
.request(1)
.expectNext(ByteString("""{ "name": "john" }"""))
.request(1)
.expectNext(ByteString("""{ "name": "jack" }"""))
.requestNext(ByteString("""{ "name": "john" }"""))
.requestNext(ByteString("""{ "name": "jack" }"""))
.request(1)
.expectError()
.getMessage should include("exceeded")
}
"fail when completing inside an object" in {
val input = ByteString("{")
val probe = Source.single(input).via(JsonFraming.objectScanner(48)).runWith(TestSink.probe)
probe.ensureSubscription()
probe.request(1).expectError() shouldBe a[PartialObjectException]
}
"fail when pushing and inside an object" in {
val input = """ { "name": "john" }, { """
Source
.single(ByteString(input))
.via(JsonFraming.objectScanner(Int.MaxValue))
.runWith(Sink.last)
.failed
.futureValue shouldBe a[PartialObjectException]
}
}
}

View file

@ -70,6 +70,9 @@ import akka.util.ByteString
def isEmpty: Boolean = buffer.isEmpty
/** `true` if the buffer is in a valid state to end framing. */
def canComplete: Boolean = !insideObject
/**
* Attempt to locate next complete JSON object in buffered ByteString and returns `Some(it)` if found.
* May throw a [[akka.stream.scaladsl.Framing.FramingException]] if the contained JSON is invalid or max object size is exceeded.

View file

@ -4,18 +4,23 @@
package akka.stream.scaladsl
import scala.util.control.NonFatal
import akka.NotUsed
import akka.stream.Attributes
import akka.stream.impl.JsonObjectParser
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.scaladsl.Framing.FramingException
import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler }
import akka.util.ByteString
import scala.util.control.NonFatal
/** Provides JSON framing operators that can separate valid JSON objects from incoming [[ByteString]] objects. */
object JsonFraming {
/** Thrown if upstream completes with a partial object in the buffer. */
class PartialObjectException(msg: String = "JSON stream completed with partial content in the buffer!")
extends FramingException(msg)
/**
* Returns a Flow that implements a "brace counting" based framing operator for emitting valid JSON chunks.
* It scans the incoming data stream for valid JSON objects and returns chunks of ByteStrings containing only those valid chunks.
@ -37,6 +42,8 @@ object JsonFraming {
* elements are separated by multiple newlines or other whitespace characters. And of course is insensitive
* (and does not impact the emitting frame) to the JSON object's internal formatting.
*
* If the stream completes while mid-object, the stage will fail with a [[PartialObjectException]].
*
* @param maximumObjectLength The maximum length of allowed frames while decoding. If the maximum length is exceeded
* this Flow will fail the stream.
*/
@ -62,18 +69,22 @@ object JsonFraming {
override def onUpstreamFinish(): Unit = {
buffer.poll() match {
case Some(json) => emit(out, json)
case _ => completeStage()
case _ => complete()
}
}
def tryPopBuffer() = {
def tryPopBuffer(): Unit = {
try buffer.poll() match {
case Some(json) => push(out, json)
case _ => if (isClosed(in)) completeStage() else pull(in)
case _ => if (isClosed(in)) complete() else pull(in)
} catch {
case NonFatal(ex) => failStage(ex)
}
}
def complete(): Unit =
if (buffer.canComplete) completeStage()
else failStage(new PartialObjectException)
}
})