diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala index 0d67dcd847..cc7a5b94b9 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/JsonFramingSpec.scala @@ -123,7 +123,7 @@ class JsonFramingSpec extends AkkaSpec { """.stripMargin, """{ "na""", """me": "jack""", - """"}]"""").map(ByteString(_)) + """"}]""").map(ByteString(_)) val result = Source.apply(input).via(JsonFraming.objectScanner(Int.MaxValue)).runFold(Seq.empty[String]) { case (acc, entry) => acc ++ Seq(entry.utf8String) diff --git a/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala b/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala index b500974347..ad777371cd 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala @@ -5,7 +5,6 @@ package akka.stream.impl import scala.annotation.switch - import akka.annotation.InternalApi import akka.stream.scaladsl.Framing.FramingException import akka.util.ByteString @@ -50,61 +49,70 @@ import akka.util.ByteString @InternalApi private[akka] class JsonObjectParser(maximumObjectLength: Int = Int.MaxValue) { import JsonObjectParser._ - private var buffer: ByteString = ByteString.empty + private[this] var buffer: Array[Byte] = Array.empty - private var pos = 0 // latest position of pointer while scanning for json object end - private var trimFront = 0 // number of chars to drop from the front of the bytestring before emitting (skip whitespace etc) - private var depth = 0 // counter of object-nesting depth, once hits 0 an object should be emitted + private[this] var pos = 0 // latest position of pointer while scanning for json object end + private[this] var start = 0 // number of chars to drop from the front of the bytestring before emitting (skip whitespace etc) + private[this] var depth = 0 // counter of object-nesting depth, once hits 0 an object should be emitted - private var completedObject = false - private var inStringExpression = false - private var isStartOfEscapeSequence = false - private var lastInput = 0.toByte + private[this] var completedObject = false + private[this] var inStringExpression = false + private[this] var inBackslashEscape = false /** - * Appends input ByteString to internal byte string buffer. + * Appends input ByteString to internal buffer. * Use [[poll]] to extract contained JSON objects. */ - def offer(input: ByteString): Unit = - buffer ++= input + def offer(input: ByteString): Unit = { + val oldBuffer = buffer + buffer = new Array[Byte](oldBuffer.length - start + input.size) + System.arraycopy(oldBuffer, start, buffer, 0, oldBuffer.length - start) + input.copyToArray(buffer, oldBuffer.length - start) + pos -= start + start = 0 + } def isEmpty: Boolean = buffer.isEmpty /** `true` if the buffer is in a valid state to end framing. */ - def canComplete: Boolean = !insideObject + def canComplete: Boolean = depth == 0 /** * Attempt to locate next complete JSON object in buffered ByteString and returns `Some(it)` if found. * May throw a [[akka.stream.scaladsl.Framing.FramingException]] if the contained JSON is invalid or max object size is exceeded. */ - def poll(): Option[ByteString] = { - val foundObject = seekObject() - if (!foundObject) None - else - (pos: @switch) match { - case -1 | 0 => None - case _ => - val (emit, buf) = buffer.splitAt(pos) - buffer = buf.compact - pos = 0 - - val tf = trimFront - trimFront = 0 - - if (tf == 0) Some(emit) - else { - val trimmed = emit.drop(tf) - if (trimmed.isEmpty) None - else Some(trimmed) - } - } - } + def poll(): Option[ByteString] = + try { + val foundObject = seekObject() + if (!foundObject) None + else + (pos: @switch) match { + case -1 | 0 => None + case _ => + if (start == pos) None + else { + val res = ByteString.fromArrayUnsafe(buffer, start, pos - start) + start = pos + Some(res) + } + } + } catch { + case _: ArithmeticException => + throw new FramingException(s"Invalid JSON encountered at position [$pos] of [${ByteString(buffer).utf8String}]") + } /** @return true if an entire valid JSON object was found, false otherwise */ private def seekObject(): Boolean = { completedObject = false - val bufSize = buffer.size - while (pos != -1 && (pos < bufSize && pos < maximumObjectLength) && !completedObject) proceed(buffer(pos)) + val bufSize = buffer.length + + skipToNextObject(bufSize) + + while (pos < bufSize && pos < maximumObjectLength && !completedObject) { + val input = buffer(pos) + proceed(input) + pos += 1 + } if (pos >= maximumObjectLength) throw new FramingException(s"""JSON element exceeded maximumObjectLength ($maximumObjectLength bytes)!""") @@ -112,53 +120,49 @@ import akka.util.ByteString completedObject } - private def proceed(input: Byte): Unit = { - if (input == SquareBraceStart && outsideObject) { - // outer object is an array + private def skipToNextObject(bufSize: Int): Unit = + while (pos != -1 && pos < bufSize && pos < maximumObjectLength && depth == 0) { + val outer = outerChars(buffer(pos) & 0xff) + start += outer & 1 + depth = (outer & 2) >> 1 + + 1 / outer // HACK: use arithmetic exception to cover error case pos += 1 - trimFront += 1 - } else if (input == SquareBraceEnd && outsideObject) { - // outer array completed! - pos += 1 - trimFront += 1 - } else if (input == Comma && outsideObject) { - // do nothing - pos += 1 - trimFront += 1 - } else if (input == Backslash) { - if (lastInput == Backslash & isStartOfEscapeSequence) isStartOfEscapeSequence = false - else isStartOfEscapeSequence = true - pos += 1 - } else if (input == DoubleQuote) { - if (!isStartOfEscapeSequence) inStringExpression = !inStringExpression - isStartOfEscapeSequence = false - pos += 1 - } else if (input == CurlyBraceStart && !inStringExpression) { - isStartOfEscapeSequence = false - depth += 1 - pos += 1 - } else if (input == CurlyBraceEnd && !inStringExpression) { - isStartOfEscapeSequence = false - depth -= 1 - pos += 1 - if (depth == 0) completedObject = true - } else if (isWhitespace(input) && !inStringExpression) { - pos += 1 - if (depth == 0) trimFront += 1 - } else if (insideObject) { - isStartOfEscapeSequence = false - pos += 1 - } else { - throw new FramingException(s"Invalid JSON encountered at position [$pos] of [$buffer]") } - lastInput = input - } - - @inline private final def insideObject: Boolean = - !outsideObject - - @inline private final def outsideObject: Boolean = - depth == 0 + private def proceed(input: Byte): Unit = + if (inStringExpression) { + input match { + case DoubleQuote => + inStringExpression = inBackslashEscape + inBackslashEscape = false + case Backslash => + inBackslashEscape = !inBackslashEscape + case _ => + inBackslashEscape = false + } + } else + input match { + case DoubleQuote => + inStringExpression = true + case CurlyBraceStart => + depth += 1 + case CurlyBraceEnd => + depth -= 1 + completedObject = depth == 0 + case _ => + } + // lookup table to avoid branches while scanning over characters outside of objects + private val outerChars: Array[Byte] = + // 0x1: skip + // 0x2: depth + 1 + Array.tabulate[Byte](256) { i => + i.toByte match { + case CurlyBraceStart => 2 // found object + case SquareBraceStart | SquareBraceEnd | Comma => 1 // skip + case b if isWhitespace(b.toByte) => 1 // skip + case _ => 0 // error + } + } }