#21838: Fix ByteStringParser backpressure

and convert it to simple push-pull
This commit is contained in:
Endre Sándor Varga 2016-11-17 13:17:34 +01:00
parent 88a6bdb059
commit f3ba271842
2 changed files with 109 additions and 18 deletions

View file

@ -0,0 +1,56 @@
/**
* Copyright (C) 2014-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.io
import akka.stream.{ ActorMaterializer, Attributes, ThrottleMode }
import akka.stream.impl.io.ByteStringParser
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.stage.GraphStageLogic
import akka.stream.testkit.StreamSpec
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._
class ByteStringParserSpec extends StreamSpec() {
implicit val materializer = ActorMaterializer()
class Chunker extends ByteStringParser[ByteString] {
import ByteStringParser._
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new ParsingLogic {
lazy val step: ParseStep[ByteString] = new ParseStep[ByteString] {
override def parse(reader: ByteReader): ParseResult[ByteString] = {
val bytes = reader.take(2)
ParseResult(Some(bytes), step)
}
}
startWith(step)
}
}
"ByteStringParser" must {
"respect backpressure" in {
// The Chunker produces two frames for one incoming 4 byte chunk. Hence, the rate in the incoming
// side of the Chunker should only be half than on its outgoing side.
val result = Source.repeat(ByteString("abcd"))
.take(500)
.throttle(1000, 1.second, 10, ThrottleMode.Enforcing)
.via(new Chunker)
.throttle(1000, 1.second, 10, ThrottleMode.Shaping)
.runWith(Sink.ignore)
Await.result(result, 5.seconds)
}
}
}

View file

@ -22,54 +22,87 @@ private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[By
override def initialAttributes = Attributes.name("ByteStringParser")
final override val shape = FlowShape(bytesIn, objOut)
class ParsingLogic extends GraphStageLogic(shape) with InHandler {
var pullOnParserRequest = false
override def preStart(): Unit = pull(bytesIn)
setHandler(objOut, eagerTerminateOutput)
class ParsingLogic extends GraphStageLogic(shape) with InHandler with OutHandler {
private var buffer = ByteString.empty
private var current: ParseStep[T] = FinishedParser
private var acceptUpstreamFinish: Boolean = true
private var untilCompact = CompactionThreshold
final protected def startWith(step: ParseStep[T]): Unit = current = step
@tailrec private def doParse(): Unit =
/**
* doParse() is the main driver for the parser. It can be called from onPush, onPull and onUpstreamFinish.
* The general logic is that invocation of this method either results in an emitted parsed element, or an indication
* that there is more data needed.
*
* On completion there are various cases:
* - buffer is empty: parser accepts completion or fails.
* - buffer is non-empty, we wait for a pull. This might result in a few more onPull-push cycles, served from the
* buffer. This can lead to two conditions:
* - drained, empty buffer. This is either accepted completion (acceptUpstreamFinish) or a truncation.
* - parser demands more data than in buffer. This is always a truncation.
*/
private def doParse(): Unit = {
if (buffer.nonEmpty) {
val reader = new ByteReader(buffer)
val cont = try {
try {
val parseResult = current.parse(reader)
acceptUpstreamFinish = parseResult.acceptUpstreamFinish
parseResult.result.map(emit(objOut, _))
parseResult.result.foreach(push(objOut, _))
if (parseResult.nextStep == FinishedParser) {
completeStage()
false
} else {
buffer = reader.remainingData
current = parseResult.nextStep
true
}
} catch {
case NeedMoreData
acceptUpstreamFinish = false
if (current.canWorkWithPartialData) buffer = reader.remainingData
pull(bytesIn)
false
// Not enough data in buffer and upstream is closed
if (isClosed(bytesIn)) current.onTruncation()
else pull(bytesIn)
}
if (cont) doParse()
} else pull(bytesIn)
} else {
if (isClosed(bytesIn)) {
// Buffer is empty and upstream is done. If the current phase accepts completion we are done,
// otherwise report truncation.
if (acceptUpstreamFinish) completeStage()
else current.onTruncation()
} else pull(bytesIn)
}
}
// Completion is handled by doParse as the buffer either gets empty after this call, or the parser requests
// data that we can no longer provide (truncation).
override def onPull(): Unit = doParse()
def onPush(): Unit = {
pullOnParserRequest = false
// Buffer management before we call doParse():
// - append new bytes
// - compact buffer if necessary
buffer ++= grab(bytesIn)
untilCompact -= 1
if (untilCompact == 0) {
// Compaction prevents of ever growing tree (list) of ByteString if buffer contents overlap most of the
// time and hence keep referring to old buffer ByteStrings. Compaction is performed only once in a while
// to reduce cost of copy.
untilCompact = CompactionThreshold
buffer = buffer.compact
}
doParse()
}
override def onUpstreamFinish(): Unit = {
if (buffer.isEmpty && acceptUpstreamFinish) completeStage()
else current.onTruncation()
// If we have no pending pull from downstream, attempt to invoke the parser again. This will handle
// truncation if necessary, or complete the stage (and maybe a final emit).
if (isAvailable(objOut)) doParse()
// Otherwise the pending pull will kick of doParse()
}
setHandler(bytesIn, this)
setHandlers(bytesIn, objOut, this)
}
}
@ -78,6 +111,8 @@ private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[By
*/
private[akka] object ByteStringParser {
val CompactionThreshold = 16
/**
* @param result - parser can return some element for downstream or return None if no element was generated
* @param nextStep - next parser