Optimize compact during framing (#25902)

This commit is contained in:
Adrien Mannocci 2018-12-17 12:46:38 +01:00 committed by Arnout Engelen
parent 7e614a595f
commit 82dea881ce
2 changed files with 201 additions and 35 deletions

View file

@ -0,0 +1,87 @@
/*
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream
import java.util.concurrent.{ Semaphore, TimeUnit }
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Framing, Sink, Source }
import akka.util.ByteString
import com.typesafe.config.{ Config, ConfigFactory }
import org.openjdk.jmh.annotations._
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode(Array(Mode.Throughput))
class FramingBenchmark {
val config: Config = ConfigFactory.parseString(
"""
akka {
log-config-on-start = off
log-dead-letters-during-shutdown = off
stdout-loglevel = "OFF"
loglevel = "OFF"
actor.default-dispatcher {
#executor = "thread-pool-executor"
throughput = 1024
}
actor.default-mailbox {
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
}
test {
timefactor = 1.0
filter-leeway = 3s
single-expect-default = 3s
default-timeout = 5s
calling-thread-dispatcher {
type = akka.testkit.CallingThreadDispatcherConfigurator
}
}
}""".stripMargin
).withFallback(ConfigFactory.load())
implicit val system: ActorSystem = ActorSystem("test", config)
var materializer: ActorMaterializer = _
// Safe to be benchmark scoped because the flows we construct in this bench are stateless
var flow: Source[ByteString, NotUsed] = _
@Param(Array("32", "64", "128", "256", "512", "1024"))
var messageSize = 0
@Param(Array("1", "8", "16", "32", "64", "128"))
var framePerSeq = 0
@Setup
def setup(): Unit = {
materializer = ActorMaterializer()
val frame = List.range(0, messageSize, 1).map(_ Random.nextPrintableChar()).mkString + "\n"
flow = Source.repeat(ByteString(List.range(0, framePerSeq, 1).map(_ frame).mkString)).take(100000)
.via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
}
@TearDown
def shutdown(): Unit = {
Await.result(system.terminate(), 5.seconds)
}
@Benchmark
@OperationsPerInvocation(100000)
def framing(): Unit = {
val lock = new Semaphore(1)
lock.acquire()
flow.runWith(Sink.onComplete(_ lock.release()))(materializer)
lock.acquire()
}
}

View file

@ -9,11 +9,12 @@ import java.nio.ByteOrder
import akka.NotUsed
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.stage._
import akka.util.{ ByteIterator, ByteString }
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.util.{ ByteIterator, ByteString, OptionVal }
import scala.annotation.tailrec
import scala.reflect.ClassTag
object Framing {
@ -26,9 +27,9 @@ object Framing {
* If there are buffered bytes (an incomplete frame) when the input stream finishes and ''allowTruncation'' is set to
* false then this Flow will fail the stream reporting a truncated frame.
*
* @param delimiter The byte sequence to be treated as the end of the frame.
* @param allowTruncation If `false`, then when the last frame being decoded contains no valid delimiter this Flow
* fails the stream instead of returning a truncated frame.
* @param delimiter The byte sequence to be treated as the end of the frame.
* @param allowTruncation If `false`, then when the last frame being decoded contains no valid delimiter this Flow
* fails the stream instead of returning a truncated frame.
* @param maximumFrameLength The maximum length of allowed frames while decoding. If the maximum length is
* exceeded this Flow will fail the stream.
*/
@ -43,12 +44,12 @@ object Framing {
* If the input stream finishes before the last frame has been fully decoded, this Flow will fail the stream reporting
* a truncated frame.
*
* @param fieldLength The length of the "size" field in bytes
* @param fieldOffset The offset of the field from the beginning of the frame in bytes
* @param fieldLength The length of the "size" field in bytes
* @param fieldOffset The offset of the field from the beginning of the frame in bytes
* @param maximumFrameLength The maximum length of allowed frames while decoding. If the maximum length is exceeded
* this Flow will fail the stream. This length *includes* the header (i.e the offset and
* the length of the size field)
* @param byteOrder The ''ByteOrder'' to be used when decoding the field
* @param byteOrder The ''ByteOrder'' to be used when decoding the field
*/
def lengthField(
fieldLength: Int,
@ -67,16 +68,16 @@ object Framing {
* If the input stream finishes before the last frame has been fully decoded, this Flow will fail the stream reporting
* a truncated frame.
*
* @param fieldLength The length of the "size" field in bytes
* @param fieldOffset The offset of the field from the beginning of the frame in bytes
* @param fieldLength The length of the "size" field in bytes
* @param fieldOffset The offset of the field from the beginning of the frame in bytes
* @param maximumFrameLength The maximum length of allowed frames while decoding. If the maximum length is exceeded
* this Flow will fail the stream. This length *includes* the header (i.e the offset and
* the length of the size field)
* @param byteOrder The ''ByteOrder'' to be used when decoding the field
* @param computeFrameSize This function can be supplied if frame size is varied or needs to be computed in a special fashion.
* For example, frame can have a shape like this: `[offset bytes][body size bytes][body bytes][footer bytes]`.
* Then computeFrameSize can be used to compute the frame size: `(offset bytes, computed size) => (actual frame size)`.
* ''Actual frame size'' must be equal or bigger than sum of `fieldOffset` and `fieldLength`, the operator fails otherwise.
* @param byteOrder The ''ByteOrder'' to be used when decoding the field
* @param computeFrameSize This function can be supplied if frame size is varied or needs to be computed in a special fashion.
* For example, frame can have a shape like this: `[offset bytes][body size bytes][body bytes][footer bytes]`.
* Then computeFrameSize can be used to compute the frame size: `(offset bytes, computed size) => (actual frame size)`.
* ''Actual frame size'' must be equal or bigger than sum of `fieldOffset` and `fieldLength`, the operator fails otherwise.
*
*/
def lengthField(
@ -197,18 +198,25 @@ object Framing {
private var buffer = ByteString.empty
private var nextPossibleMatch = 0
// We use an efficient unsafe array implementation and must be use with caution.
// It contains all indices computed during search phase.
// The capacity is fixed at 256 to preserve fairness and prevent uneccessary allocation during parsing phase.
// This array provide a way to check remaining capacity and must be use to prevent out of bounds exception.
// In this use case, we compute all possibles indices up to 256 and then parse everything.
private val indices = new LightArray[(Int, Int)](256)
override def onPush(): Unit = {
buffer ++= grab(in)
doParse()
searchIndices()
}
override def onPull(): Unit = doParse()
override def onPull(): Unit = searchIndices()
override def onUpstreamFinish(): Unit = {
if (buffer.isEmpty) {
completeStage()
} else if (isAvailable(out)) {
doParse()
searchIndices()
} // else swallow the termination and wait for pull
}
@ -224,41 +232,112 @@ object Framing {
}
@tailrec
private def doParse(): Unit = {
private def searchIndices(): Unit = {
// Next possible position for the delimiter
val possibleMatchPos = buffer.indexOf(firstSeparatorByte, from = nextPossibleMatch)
if (possibleMatchPos > maximumLineBytes)
failStage(new FramingException(s"Read ${buffer.size} bytes " +
// Retrive previous position
val previous = indices.lastOption match {
case OptionVal.Some((_, i)) i + separatorBytes.size
case OptionVal.None 0
}
if (possibleMatchPos - previous > maximumLineBytes) {
failStage(new FramingException(s"Read ${possibleMatchPos - previous} bytes " +
s"which is more than $maximumLineBytes without seeing a line terminator"))
else if (possibleMatchPos == -1) {
if (buffer.size > maximumLineBytes)
failStage(new FramingException(s"Read ${buffer.size} bytes " +
} else if (possibleMatchPos == -1) {
if (buffer.size - previous > maximumLineBytes)
failStage(new FramingException(s"Read ${buffer.size - previous} bytes " +
s"which is more than $maximumLineBytes without seeing a line terminator"))
else {
// No matching character, we need to accumulate more bytes into the buffer
nextPossibleMatch = buffer.size
tryPull()
doParse()
}
} else if (possibleMatchPos + separatorBytes.size > buffer.size) {
// We have found a possible match (we found the first character of the terminator
// sequence) but we don't have yet enough bytes. We remember the position to
// retry from next time.
nextPossibleMatch = possibleMatchPos
tryPull()
doParse()
} else if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size) == separatorBytes) {
// Found a match
val parsedFrame = buffer.slice(0, possibleMatchPos).compact
buffer = buffer.drop(possibleMatchPos + separatorBytes.size).compact
nextPossibleMatch = 0
if (isClosed(in) && buffer.isEmpty) {
push(out, parsedFrame)
completeStage()
} else push(out, parsedFrame)
// Found a match, mark start and end position and iterate if possible
indices += (previous, possibleMatchPos)
nextPossibleMatch = possibleMatchPos + separatorBytes.size
if (nextPossibleMatch == buffer.size || indices.isFull) {
doParse()
} else {
searchIndices()
}
} else {
// possibleMatchPos was not actually a match
nextPossibleMatch += 1
doParse()
searchIndices()
}
}
private def doParse(): Unit =
if (indices.isEmpty) tryPull()
else if (indices.length == 1) {
// Emit result and compact buffer
val indice = indices(0)
push(out, buffer.slice(indice._1, indice._2).compact)
reset()
if (isClosed(in) && buffer.isEmpty) completeStage()
} else {
// Emit results and compact buffer
emitMultiple(out, new FrameIterator(), () {
reset()
if (isClosed(in) && buffer.isEmpty) completeStage()
})
}
private def reset(): Unit = {
val previous = indices.lastOption match {
case OptionVal.Some((_, i)) i + separatorBytes.size
case OptionVal.None 0
}
buffer = buffer.drop(previous).compact
indices.setLength(0)
nextPossibleMatch = 0
}
// Iterator able to iterate over precompute frame based on start and end position
private class FrameIterator(private var index: Int = 0) extends Iterator[ByteString] {
def hasNext: Boolean = index != indices.length
def next(): ByteString = {
val indice = indices(index)
index += 1
buffer.slice(indice._1, indice._2).compact
}
}
// Basic array implementation that allow unsafe resize.
private class LightArray[T: ClassTag](private val capacity: Int, private var index: Int = 0) {
private val underlying = Array.ofDim[T](capacity)
def apply(i: Int) = underlying(i)
def +=(el: T): Unit = {
underlying(index) = el
index += 1
}
def isEmpty: Boolean = length == 0
def isFull: Boolean = capacity == length
def setLength(length: Int): Unit = index = length
def length: Int = index
def lastOption: OptionVal[T] =
if (index > 0) OptionVal.Some(underlying(index - 1))
else OptionVal.none
}
setHandlers(in, out, this)
}
}