From f3ba271842fbf0e12fedb9bc65bf00732b4a5e8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Thu, 17 Nov 2016 13:17:34 +0100 Subject: [PATCH] #21838: Fix ByteStringParser backpressure and convert it to simple push-pull --- .../akka/stream/io/ByteStringParserSpec.scala | 56 +++++++++++++++ .../stream/impl/io/ByteStringParser.scala | 71 ++++++++++++++----- 2 files changed, 109 insertions(+), 18 deletions(-) create mode 100644 akka-stream-tests/src/test/scala/akka/stream/io/ByteStringParserSpec.scala diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/ByteStringParserSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/ByteStringParserSpec.scala new file mode 100644 index 0000000000..33c7f5e01b --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/io/ByteStringParserSpec.scala @@ -0,0 +1,56 @@ +/** + * Copyright (C) 2014-2016 Lightbend Inc. + */ +package akka.stream.io + +import akka.stream.{ ActorMaterializer, Attributes, ThrottleMode } +import akka.stream.impl.io.ByteStringParser +import akka.stream.scaladsl.{ Sink, Source } +import akka.stream.stage.GraphStageLogic +import akka.stream.testkit.StreamSpec +import akka.util.ByteString + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class ByteStringParserSpec extends StreamSpec() { + implicit val materializer = ActorMaterializer() + + class Chunker extends ByteStringParser[ByteString] { + import ByteStringParser._ + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new ParsingLogic { + + lazy val step: ParseStep[ByteString] = new ParseStep[ByteString] { + override def parse(reader: ByteReader): ParseResult[ByteString] = { + val bytes = reader.take(2) + ParseResult(Some(bytes), step) + } + } + + startWith(step) + + } + + } + + "ByteStringParser" must { + + "respect backpressure" in { + // The Chunker produces two frames for one incoming 4 byte chunk. 
Hence, the rate on the incoming + // side of the Chunker should be only half of the rate on its outgoing side. + + val result = Source.repeat(ByteString("abcd")) + .take(500) + .throttle(1000, 1.second, 10, ThrottleMode.Enforcing) + .via(new Chunker) + .throttle(1000, 1.second, 10, ThrottleMode.Shaping) + .runWith(Sink.ignore) + + Await.result(result, 5.seconds) + + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/ByteStringParser.scala b/akka-stream/src/main/scala/akka/stream/impl/io/ByteStringParser.scala index 988f239815..553322fae3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/ByteStringParser.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/ByteStringParser.scala @@ -22,54 +22,87 @@ private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[By override def initialAttributes = Attributes.name("ByteStringParser") final override val shape = FlowShape(bytesIn, objOut) - class ParsingLogic extends GraphStageLogic(shape) with InHandler { - var pullOnParserRequest = false - override def preStart(): Unit = pull(bytesIn) - setHandler(objOut, eagerTerminateOutput) - + class ParsingLogic extends GraphStageLogic(shape) with InHandler with OutHandler { private var buffer = ByteString.empty private var current: ParseStep[T] = FinishedParser private var acceptUpstreamFinish: Boolean = true + private var untilCompact = CompactionThreshold final protected def startWith(step: ParseStep[T]): Unit = current = step - @tailrec private def doParse(): Unit = + /** + * doParse() is the main driver for the parser. It can be called from onPush, onPull and onUpstreamFinish. + * The general logic is that invocation of this method either results in an emitted parsed element, or an indication + * that there is more data needed. + * + * On completion there are various cases: + * - buffer is empty: parser accepts completion or fails. + * - buffer is non-empty, we wait for a pull.
This might result in a few more onPull-push cycles, served from the + * buffer. This can lead to two conditions: + * - drained, empty buffer. This is either accepted completion (acceptUpstreamFinish) or a truncation. + * - parser demands more data than in buffer. This is always a truncation. + */ + private def doParse(): Unit = { if (buffer.nonEmpty) { val reader = new ByteReader(buffer) - val cont = try { + try { val parseResult = current.parse(reader) acceptUpstreamFinish = parseResult.acceptUpstreamFinish - parseResult.result.map(emit(objOut, _)) + parseResult.result.foreach(push(objOut, _)) + if (parseResult.nextStep == FinishedParser) { completeStage() - false } else { buffer = reader.remainingData current = parseResult.nextStep - true } } catch { case NeedMoreData ⇒ acceptUpstreamFinish = false if (current.canWorkWithPartialData) buffer = reader.remainingData - pull(bytesIn) - false + + // Not enough data in buffer and upstream is closed + if (isClosed(bytesIn)) current.onTruncation() + else pull(bytesIn) } - if (cont) doParse() - } else pull(bytesIn) + } else { + if (isClosed(bytesIn)) { + // Buffer is empty and upstream is done. If the current phase accepts completion we are done, + // otherwise report truncation. + if (acceptUpstreamFinish) completeStage() + else current.onTruncation() + } else pull(bytesIn) + } + } + + // Completion is handled by doParse as the buffer either becomes empty after this call, or the parser requests + // data that we can no longer provide (truncation). + override def onPull(): Unit = doParse() def onPush(): Unit = { - pullOnParserRequest = false + // Buffer management before we call doParse(): + // - append new bytes + // - compact buffer if necessary buffer ++= grab(bytesIn) + untilCompact -= 1 + if (untilCompact == 0) { + // Compaction prevents an ever-growing tree (list) of ByteStrings if buffer contents overlap most of the + // time and hence keep referring to old buffer ByteStrings.
Compaction is performed only once in a while + // to reduce the cost of copying. + untilCompact = CompactionThreshold + buffer = buffer.compact + } doParse() } override def onUpstreamFinish(): Unit = { - if (buffer.isEmpty && acceptUpstreamFinish) completeStage() - else current.onTruncation() + // If we have no pending pull from downstream, attempt to invoke the parser again. This will handle + // truncation if necessary, or complete the stage (and maybe a final emit). + if (isAvailable(objOut)) doParse() + // Otherwise the pending pull will kick off doParse() } - setHandler(bytesIn, this) + setHandlers(bytesIn, objOut, this) } } @@ -78,6 +111,8 @@ private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[By */ private[akka] object ByteStringParser { + val CompactionThreshold = 16 + /** * @param result - parser can return some element for downstream or return None if no element was generated * @param nextStep - next parser