stream: optimize JsonFraming (#31380)

This commit is contained in:
Johannes Rudolph 2022-05-13 10:32:44 +02:00 committed by GitHub
parent b80b50a2e5
commit 6b30134c5c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 87 additions and 83 deletions

View file

@ -123,7 +123,7 @@ class JsonFramingSpec extends AkkaSpec {
""".stripMargin, """.stripMargin,
"""{ "na""", """{ "na""",
"""me": "jack""", """me": "jack""",
""""}]"""").map(ByteString(_)) """"}]""").map(ByteString(_))
val result = Source.apply(input).via(JsonFraming.objectScanner(Int.MaxValue)).runFold(Seq.empty[String]) { val result = Source.apply(input).via(JsonFraming.objectScanner(Int.MaxValue)).runFold(Seq.empty[String]) {
case (acc, entry) => acc ++ Seq(entry.utf8String) case (acc, entry) => acc ++ Seq(entry.utf8String)

View file

@ -5,7 +5,6 @@
package akka.stream.impl package akka.stream.impl
import scala.annotation.switch import scala.annotation.switch
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.stream.scaladsl.Framing.FramingException import akka.stream.scaladsl.Framing.FramingException
import akka.util.ByteString import akka.util.ByteString
@ -50,61 +49,70 @@ import akka.util.ByteString
@InternalApi private[akka] class JsonObjectParser(maximumObjectLength: Int = Int.MaxValue) { @InternalApi private[akka] class JsonObjectParser(maximumObjectLength: Int = Int.MaxValue) {
import JsonObjectParser._ import JsonObjectParser._
private var buffer: ByteString = ByteString.empty private[this] var buffer: Array[Byte] = Array.empty
private var pos = 0 // latest position of pointer while scanning for json object end private[this] var pos = 0 // latest position of pointer while scanning for json object end
private var trimFront = 0 // number of chars to drop from the front of the bytestring before emitting (skip whitespace etc) private[this] var start = 0 // number of chars to drop from the front of the bytestring before emitting (skip whitespace etc)
private var depth = 0 // counter of object-nesting depth, once hits 0 an object should be emitted private[this] var depth = 0 // counter of object-nesting depth, once hits 0 an object should be emitted
private var completedObject = false private[this] var completedObject = false
private var inStringExpression = false private[this] var inStringExpression = false
private var isStartOfEscapeSequence = false private[this] var inBackslashEscape = false
private var lastInput = 0.toByte
/** /**
* Appends input ByteString to internal byte string buffer. * Appends input ByteString to internal buffer.
* Use [[poll]] to extract contained JSON objects. * Use [[poll]] to extract contained JSON objects.
*/ */
def offer(input: ByteString): Unit = def offer(input: ByteString): Unit = {
buffer ++= input val oldBuffer = buffer
buffer = new Array[Byte](oldBuffer.length - start + input.size)
System.arraycopy(oldBuffer, start, buffer, 0, oldBuffer.length - start)
input.copyToArray(buffer, oldBuffer.length - start)
pos -= start
start = 0
}
def isEmpty: Boolean = buffer.isEmpty def isEmpty: Boolean = buffer.isEmpty
/** `true` if the buffer is in a valid state to end framing. */ /** `true` if the buffer is in a valid state to end framing. */
def canComplete: Boolean = !insideObject def canComplete: Boolean = depth == 0
/** /**
* Attempts to locate the next complete JSON object in the buffered input and returns `Some(it)` if found. * Attempts to locate the next complete JSON object in the buffered input and returns `Some(it)` if found.
* May throw a [[akka.stream.scaladsl.Framing.FramingException]] if the contained JSON is invalid or max object size is exceeded. * May throw a [[akka.stream.scaladsl.Framing.FramingException]] if the contained JSON is invalid or max object size is exceeded.
*/ */
def poll(): Option[ByteString] = { def poll(): Option[ByteString] =
val foundObject = seekObject() try {
if (!foundObject) None val foundObject = seekObject()
else if (!foundObject) None
(pos: @switch) match { else
case -1 | 0 => None (pos: @switch) match {
case _ => case -1 | 0 => None
val (emit, buf) = buffer.splitAt(pos) case _ =>
buffer = buf.compact if (start == pos) None
pos = 0 else {
val res = ByteString.fromArrayUnsafe(buffer, start, pos - start)
val tf = trimFront start = pos
trimFront = 0 Some(res)
}
if (tf == 0) Some(emit) }
else { } catch {
val trimmed = emit.drop(tf) case _: ArithmeticException =>
if (trimmed.isEmpty) None throw new FramingException(s"Invalid JSON encountered at position [$pos] of [${ByteString(buffer).utf8String}]")
else Some(trimmed) }
}
}
}
/** @return true if an entire valid JSON object was found, false otherwise */ /** @return true if an entire valid JSON object was found, false otherwise */
private def seekObject(): Boolean = { private def seekObject(): Boolean = {
completedObject = false completedObject = false
val bufSize = buffer.size val bufSize = buffer.length
while (pos != -1 && (pos < bufSize && pos < maximumObjectLength) && !completedObject) proceed(buffer(pos))
skipToNextObject(bufSize)
while (pos < bufSize && pos < maximumObjectLength && !completedObject) {
val input = buffer(pos)
proceed(input)
pos += 1
}
if (pos >= maximumObjectLength) if (pos >= maximumObjectLength)
throw new FramingException(s"""JSON element exceeded maximumObjectLength ($maximumObjectLength bytes)!""") throw new FramingException(s"""JSON element exceeded maximumObjectLength ($maximumObjectLength bytes)!""")
@ -112,53 +120,49 @@ import akka.util.ByteString
completedObject completedObject
} }
private def proceed(input: Byte): Unit = { private def skipToNextObject(bufSize: Int): Unit =
if (input == SquareBraceStart && outsideObject) { while (pos != -1 && pos < bufSize && pos < maximumObjectLength && depth == 0) {
// outer object is an array val outer = outerChars(buffer(pos) & 0xff)
start += outer & 1
depth = (outer & 2) >> 1
1 / outer // HACK: use arithmetic exception to cover error case
pos += 1 pos += 1
trimFront += 1
} else if (input == SquareBraceEnd && outsideObject) {
// outer array completed!
pos += 1
trimFront += 1
} else if (input == Comma && outsideObject) {
// do nothing
pos += 1
trimFront += 1
} else if (input == Backslash) {
if (lastInput == Backslash & isStartOfEscapeSequence) isStartOfEscapeSequence = false
else isStartOfEscapeSequence = true
pos += 1
} else if (input == DoubleQuote) {
if (!isStartOfEscapeSequence) inStringExpression = !inStringExpression
isStartOfEscapeSequence = false
pos += 1
} else if (input == CurlyBraceStart && !inStringExpression) {
isStartOfEscapeSequence = false
depth += 1
pos += 1
} else if (input == CurlyBraceEnd && !inStringExpression) {
isStartOfEscapeSequence = false
depth -= 1
pos += 1
if (depth == 0) completedObject = true
} else if (isWhitespace(input) && !inStringExpression) {
pos += 1
if (depth == 0) trimFront += 1
} else if (insideObject) {
isStartOfEscapeSequence = false
pos += 1
} else {
throw new FramingException(s"Invalid JSON encountered at position [$pos] of [$buffer]")
} }
lastInput = input private def proceed(input: Byte): Unit =
} if (inStringExpression) {
input match {
@inline private final def insideObject: Boolean = case DoubleQuote =>
!outsideObject inStringExpression = inBackslashEscape
inBackslashEscape = false
@inline private final def outsideObject: Boolean = case Backslash =>
depth == 0 inBackslashEscape = !inBackslashEscape
case _ =>
inBackslashEscape = false
}
} else
input match {
case DoubleQuote =>
inStringExpression = true
case CurlyBraceStart =>
depth += 1
case CurlyBraceEnd =>
depth -= 1
completedObject = depth == 0
case _ =>
}
// Byte classification table consulted while scanning between top-level objects, so that the
// scan loop needs no per-character branching. It is indexed by the unsigned value of the byte.
// Entry meanings:
//   0 - byte is not allowed outside of an object (error)
//   1 - byte is skipped (array brackets, comma, whitespace)
//   2 - byte opens an object (nesting depth goes up by one)
private val outerChars: Array[Byte] = {
  val invalid: Byte = 0
  val skip: Byte = 1
  val enterObject: Byte = 2
  Array.tabulate[Byte](256) { index =>
    val byte = index.toByte
    if (byte == CurlyBraceStart) enterObject
    else if (byte == SquareBraceStart || byte == SquareBraceEnd || byte == Comma) skip
    else if (isWhitespace(byte)) skip
    else invalid
  }
}
} }