diff --git a/akka-bench-jmh/src/main/scala/akka/stream/FramingBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/FramingBenchmark.scala new file mode 100644 index 0000000000..8833638b55 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/stream/FramingBenchmark.scala @@ -0,0 +1,87 @@ +/* + * Copyright (C) 2009-2018 Lightbend Inc. + */ + +package akka.stream + +import java.util.concurrent.{ Semaphore, TimeUnit } + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.scaladsl.{ Framing, Sink, Source } +import akka.util.ByteString +import com.typesafe.config.{ Config, ConfigFactory } +import org.openjdk.jmh.annotations._ + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.util.Random + +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.SECONDS) +@BenchmarkMode(Array(Mode.Throughput)) +class FramingBenchmark { + + val config: Config = ConfigFactory.parseString( + """ + akka { + log-config-on-start = off + log-dead-letters-during-shutdown = off + stdout-loglevel = "OFF" + loglevel = "OFF" + actor.default-dispatcher { + #executor = "thread-pool-executor" + throughput = 1024 + } + actor.default-mailbox { + mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" + } + test { + timefactor = 1.0 + filter-leeway = 3s + single-expect-default = 3s + default-timeout = 5s + calling-thread-dispatcher { + type = akka.testkit.CallingThreadDispatcherConfigurator + } + } + }""".stripMargin + ).withFallback(ConfigFactory.load()) + + implicit val system: ActorSystem = ActorSystem("test", config) + + var materializer: ActorMaterializer = _ + + // Safe to be benchmark scoped because the flows we construct in this bench are stateless + var flow: Source[ByteString, NotUsed] = _ + + @Param(Array("32", "64", "128", "256", "512", "1024")) + var messageSize = 0 + + @Param(Array("1", "8", "16", "32", "64", "128")) + var framePerSeq = 0 + + @Setup + def setup(): Unit = { + materializer = ActorMaterializer() + + val frame = List.range(0, messageSize, 1).map(_ ⇒ Random.nextPrintableChar()).mkString + "\n" + flow = Source.repeat(ByteString(List.range(0, framePerSeq, 1).map(_ ⇒ frame).mkString)).take(100000) + .via(Framing.delimiter(ByteString("\n"), Int.MaxValue)) + } + + @TearDown + def shutdown(): Unit = { + Await.result(system.terminate(), 5.seconds) + } + + @Benchmark + @OperationsPerInvocation(100000) + def framing(): Unit = { + val lock = new Semaphore(1) + lock.acquire() + flow.runWith(Sink.onComplete(_ ⇒ lock.release()))(materializer) + lock.acquire() + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala index 52367355a4..cdfb03b422 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala @@ -9,11 +9,12 @@ import java.nio.ByteOrder import akka.NotUsed import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage -import akka.stream.{ Attributes, FlowShape, Inlet, Outlet } import akka.stream.stage._ -import akka.util.{ ByteIterator, ByteString } +import akka.stream.{ Attributes, FlowShape, Inlet, Outlet } +import akka.util.{ ByteIterator, ByteString, OptionVal } import scala.annotation.tailrec +import scala.reflect.ClassTag object Framing { @@ -26,9 +27,9 @@ object Framing { * If there are buffered bytes (an incomplete frame) when the input stream finishes and ''allowTruncation'' is set to * false then this Flow will fail the stream reporting a truncated frame. * - * @param delimiter The byte sequence to be treated as the end of the frame. - * @param allowTruncation If `false`, then when the last frame being decoded contains no valid delimiter this Flow - * fails the stream instead of returning a truncated frame. + * @param delimiter The byte sequence to be treated as the end of the frame. + * @param allowTruncation If `false`, then when the last frame being decoded contains no valid delimiter this Flow + * fails the stream instead of returning a truncated frame. * @param maximumFrameLength The maximum length of allowed frames while decoding. If the maximum length is * exceeded this Flow will fail the stream. */ @@ -43,12 +44,12 @@ object Framing { * If the input stream finishes before the last frame has been fully decoded, this Flow will fail the stream reporting * a truncated frame. * - * @param fieldLength The length of the "size" field in bytes - * @param fieldOffset The offset of the field from the beginning of the frame in bytes + * @param fieldLength The length of the "size" field in bytes + * @param fieldOffset The offset of the field from the beginning of the frame in bytes * @param maximumFrameLength The maximum length of allowed frames while decoding. If the maximum length is exceeded * this Flow will fail the stream. This length *includes* the header (i.e the offset and * the length of the size field) - * @param byteOrder The ''ByteOrder'' to be used when decoding the field + * @param byteOrder The ''ByteOrder'' to be used when decoding the field */ def lengthField( fieldLength: Int, @@ -67,16 +68,16 @@ object Framing { * If the input stream finishes before the last frame has been fully decoded, this Flow will fail the stream reporting * a truncated frame. * - * @param fieldLength The length of the "size" field in bytes - * @param fieldOffset The offset of the field from the beginning of the frame in bytes + * @param fieldLength The length of the "size" field in bytes + * @param fieldOffset The offset of the field from the beginning of the frame in bytes * @param maximumFrameLength The maximum length of allowed frames while decoding. If the maximum length is exceeded * this Flow will fail the stream. This length *includes* the header (i.e the offset and * the length of the size field) - * @param byteOrder The ''ByteOrder'' to be used when decoding the field - * @param computeFrameSize This function can be supplied if frame size is varied or needs to be computed in a special fashion. - * For example, frame can have a shape like this: `[offset bytes][body size bytes][body bytes][footer bytes]`. - * Then computeFrameSize can be used to compute the frame size: `(offset bytes, computed size) => (actual frame size)`. - * ''Actual frame size'' must be equal or bigger than sum of `fieldOffset` and `fieldLength`, the operator fails otherwise. + * @param byteOrder The ''ByteOrder'' to be used when decoding the field + * @param computeFrameSize This function can be supplied if frame size is varied or needs to be computed in a special fashion. + * For example, frame can have a shape like this: `[offset bytes][body size bytes][body bytes][footer bytes]`. + * Then computeFrameSize can be used to compute the frame size: `(offset bytes, computed size) => (actual frame size)`. + * ''Actual frame size'' must be equal or bigger than sum of `fieldOffset` and `fieldLength`, the operator fails otherwise. * */ def lengthField( @@ -197,18 +198,25 @@ object Framing { private var buffer = ByteString.empty private var nextPossibleMatch = 0 + // We use an efficient unsafe array implementation and must be use with caution. + // It contains all indices computed during search phase. + // The capacity is fixed at 256 to preserve fairness and prevent uneccessary allocation during parsing phase. + // This array provide a way to check remaining capacity and must be use to prevent out of bounds exception. + // In this use case, we compute all possibles indices up to 256 and then parse everything. + private val indices = new LightArray[(Int, Int)](256) + override def onPush(): Unit = { buffer ++= grab(in) - doParse() + searchIndices() } - override def onPull(): Unit = doParse() + override def onPull(): Unit = searchIndices() override def onUpstreamFinish(): Unit = { if (buffer.isEmpty) { completeStage() } else if (isAvailable(out)) { - doParse() + searchIndices() } // else swallow the termination and wait for pull } @@ -224,41 +232,112 @@ object Framing { } @tailrec - private def doParse(): Unit = { + private def searchIndices(): Unit = { + // Next possible position for the delimiter val possibleMatchPos = buffer.indexOf(firstSeparatorByte, from = nextPossibleMatch) - if (possibleMatchPos > maximumLineBytes) - failStage(new FramingException(s"Read ${buffer.size} bytes " + + + // Retrive previous position + val previous = indices.lastOption match { + case OptionVal.Some((_, i)) ⇒ i + separatorBytes.size + case OptionVal.None ⇒ 0 + } + + if (possibleMatchPos - previous > maximumLineBytes) { + failStage(new FramingException(s"Read ${possibleMatchPos - previous} bytes " + s"which is more than $maximumLineBytes without seeing a line terminator")) - else if (possibleMatchPos == -1) { - if (buffer.size > maximumLineBytes) - failStage(new FramingException(s"Read ${buffer.size} bytes " + + } else if (possibleMatchPos == -1) { + if (buffer.size - previous > maximumLineBytes) + failStage(new FramingException(s"Read ${buffer.size - previous} bytes " + s"which is more than $maximumLineBytes without seeing a line terminator")) else { // No matching character, we need to accumulate more bytes into the buffer nextPossibleMatch = buffer.size - tryPull() + doParse() } } else if (possibleMatchPos + separatorBytes.size > buffer.size) { // We have found a possible match (we found the first character of the terminator // sequence) but we don't have yet enough bytes. We remember the position to // retry from next time. nextPossibleMatch = possibleMatchPos - tryPull() + doParse() } else if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size) == separatorBytes) { - // Found a match - val parsedFrame = buffer.slice(0, possibleMatchPos).compact - buffer = buffer.drop(possibleMatchPos + separatorBytes.size).compact - nextPossibleMatch = 0 - if (isClosed(in) && buffer.isEmpty) { - push(out, parsedFrame) - completeStage() - } else push(out, parsedFrame) + // Found a match, mark start and end position and iterate if possible + indices += (previous, possibleMatchPos) + nextPossibleMatch = possibleMatchPos + separatorBytes.size + if (nextPossibleMatch == buffer.size || indices.isFull) { + doParse() + } else { + searchIndices() + } } else { // possibleMatchPos was not actually a match nextPossibleMatch += 1 - doParse() + searchIndices() } } + + private def doParse(): Unit = + if (indices.isEmpty) tryPull() + else if (indices.length == 1) { + // Emit result and compact buffer + val indice = indices(0) + push(out, buffer.slice(indice._1, indice._2).compact) + reset() + if (isClosed(in) && buffer.isEmpty) completeStage() + } else { + // Emit results and compact buffer + emitMultiple(out, new FrameIterator(), () ⇒ { + reset() + if (isClosed(in) && buffer.isEmpty) completeStage() + }) + } + + private def reset(): Unit = { + val previous = indices.lastOption match { + case OptionVal.Some((_, i)) ⇒ i + separatorBytes.size + case OptionVal.None ⇒ 0 + } + + buffer = buffer.drop(previous).compact + indices.setLength(0) + nextPossibleMatch = 0 + } + + // Iterator able to iterate over precompute frame based on start and end position + private class FrameIterator(private var index: Int = 0) extends Iterator[ByteString] { + def hasNext: Boolean = index != indices.length + + def next(): ByteString = { + val indice = indices(index) + index += 1 + buffer.slice(indice._1, indice._2).compact + } + } + + // Basic array implementation that allow unsafe resize. + private class LightArray[T: ClassTag](private val capacity: Int, private var index: Int = 0) { + + private val underlying = Array.ofDim[T](capacity) + + def apply(i: Int) = underlying(i) + + def +=(el: T): Unit = { + underlying(index) = el + index += 1 + } + + def isEmpty: Boolean = length == 0 + + def isFull: Boolean = capacity == length + + def setLength(length: Int): Unit = index = length + + def length: Int = index + + def lastOption: OptionVal[T] = + if (index > 0) OptionVal.Some(underlying(index - 1)) + else OptionVal.none + } setHandlers(in, out, this) } }