=str add safeguard against accidental logic cycles in user ByteStringParser logic

Before, such a cyclic logic might have spun on one thread. Now recursion
is limited to a configurable number of recursions.

The default number of 1000 is an arbitrary number high enough not to be a
limitation in realistic cases. Recursion only happens when a ParsingStep consumes
data without emitting an element immediately moving on to the next step.

In the unlikely case that a parsing logic requires more recursion, the logic
can override `ParsingLogic.recursionLimit`.
This commit is contained in:
Johannes Rudolph 2016-11-19 17:07:33 +01:00 committed by Konrad Malawski
parent a2402f7aff
commit 7fa7cc1624
2 changed files with 59 additions and 5 deletions

View file

@ -3,13 +3,12 @@
*/
package akka.stream.io
import akka.stream.{ ActorMaterializer, Attributes, ThrottleMode }
import akka.stream.impl.io.ByteStringParser
import akka.stream.impl.io.ByteStringParser.{ ByteReader, ParseResult, ParseStep }
import akka.stream.io.compression.LogByteStringTools
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.stage.GraphStageLogic
import akka.stream.testkit.StreamSpec
import akka.stream.{ ActorMaterializer, Attributes, ThrottleMode }
import akka.util.ByteString
import scala.concurrent.Await
@ -83,6 +82,33 @@ class ByteStringParserSpec extends StreamSpec() {
run(ByteString(0xca, 0xfe), ByteString(0xef, 0x12)) shouldEqual ByteString(0xef, 0x12)
run(ByteString(0xca, 0xfe, 0xef, 0x12)) shouldEqual ByteString(0xef, 0x12)
}
"don't spin when logic is flawed" in {
object SpinningLogic extends ByteStringParser[Int] {
def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new ParsingLogic {
object State1 extends ParseStep[Int] {
def parse(reader: ByteReader): ParseResult[Int] =
ParseResult(None, State2)
}
object State2 extends ParseStep[Int] {
def parse(reader: ByteReader): ParseResult[Int] =
ParseResult(None, State1)
}
startWith(State1)
override protected def recursionLimit: Int = 10 // fail even earlier than the default
}
}
(the[IllegalStateException] thrownBy Await.result(
Source.single(ByteString("abc"))
.via(SpinningLogic)
.runWith(Sink.ignore),
3.seconds)).getMessage shouldBe "Parsing logic didn't produce result after 10 steps. " +
"Aborting processing to avoid infinite cycles. In the unlikely case that the parsing logic needs more recursion, " +
"override ParsingLogic.recursionLimit."
}
}
}

View file

@ -7,6 +7,7 @@ import akka.stream._
import akka.stream.stage._
import akka.util.ByteString
import scala.annotation.tailrec
import scala.util.control.{ NoStackTrace, NonFatal }
/**
@ -29,6 +30,8 @@ private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[By
final protected def startWith(step: ParseStep[T]): Unit = current = step
protected def recursionLimit: Int = 1000
/**
* doParse() is the main driver for the parser. It can be called from onPush, onPull and onUpstreamFinish.
* The general logic is that invocation of this method either results in an emitted parsed element, or an indication
@ -40,8 +43,10 @@ private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[By
* buffer. This can lead to two conditions:
* - drained, empty buffer. This is either accepted completion (acceptUpstreamFinish) or a truncation.
* - parser demands more data than in buffer. This is always a truncation.
*
* If the return value is true the method must be called another time to continue processing.
*/
private def doParse(): Unit = {
private def doParseInner(): Boolean =
if (buffer.nonEmpty) {
val reader = new ByteReader(buffer)
try {
@ -51,13 +56,16 @@ private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[By
if (parseResult.nextStep == FinishedParser) {
completeStage()
DontRecurse
} else {
buffer = reader.remainingData
current = parseResult.nextStep
// If this step didn't produce a result, continue parsing.
if (parseResult.result.isEmpty)
doParse()
Recurse
else
DontRecurse
}
} catch {
case NeedMoreData
@ -67,8 +75,12 @@ private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[By
// Not enough data in buffer and upstream is closed
if (isClosed(bytesIn)) current.onTruncation()
else pull(bytesIn)
DontRecurse
case NonFatal(ex)
failStage(new ParsingException(s"Parsing failed in step $current", ex))
DontRecurse
}
} else {
if (isClosed(bytesIn)) {
@ -77,8 +89,21 @@ private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[By
if (acceptUpstreamFinish) completeStage()
else current.onTruncation()
} else pull(bytesIn)
DontRecurse
}
@tailrec private def doParse(remainingRecursions: Int = recursionLimit): Unit =
if (remainingRecursions == 0)
failStage(
new IllegalStateException(s"Parsing logic didn't produce result after $recursionLimit steps. " +
"Aborting processing to avoid infinite cycles. In the unlikely case that the parsing logic " +
"needs more recursion, override ParsingLogic.recursionLimit.")
)
else {
val recurse = doParseInner()
if (recurse) doParse(remainingRecursions - 1)
}
}
// Completion is handled by doParse as the buffer either gets empty after this call, or the parser requests
// data that we can no longer provide (truncation).
@ -118,6 +143,9 @@ private[akka] object ByteStringParser {
val CompactionThreshold = 16
private final val Recurse = true
private final val DontRecurse = false
/**
* @param result - parser can return some element for downstream or return None if no element was generated in this step
* and parsing should immediately continue with the next step.