From 7fa7cc1624e53cf7687e4ae3ad81d98b032df71a Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Sat, 19 Nov 2016 17:07:33 +0100 Subject: [PATCH] =str add safeguard against accidental logic cycles in user ByteStringParser logic Before, such a cyclic logic might have spun on one thread. Now recursion is limited to a configurable number of recursions. The default number of 1000 is an arbitrary number high enough not to be a limitation in realistic cases. Recursion only happens when a ParsingStep consumes data without emitting an element immediately moving on to the next step. In the unlikely case that a parsing logic requires more recursion, the logic can override `ParsingLogic.recursionLimit`. --- .../akka/stream/io/ByteStringParserSpec.scala | 30 ++++++++++++++-- .../stream/impl/io/ByteStringParser.scala | 34 +++++++++++++++++-- 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/ByteStringParserSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/ByteStringParserSpec.scala index 7b46b8d175..e21a1fec28 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/ByteStringParserSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/ByteStringParserSpec.scala @@ -3,13 +3,12 @@ */ package akka.stream.io -import akka.stream.{ ActorMaterializer, Attributes, ThrottleMode } import akka.stream.impl.io.ByteStringParser import akka.stream.impl.io.ByteStringParser.{ ByteReader, ParseResult, ParseStep } -import akka.stream.io.compression.LogByteStringTools import akka.stream.scaladsl.{ Sink, Source } import akka.stream.stage.GraphStageLogic import akka.stream.testkit.StreamSpec +import akka.stream.{ ActorMaterializer, Attributes, ThrottleMode } import akka.util.ByteString import scala.concurrent.Await @@ -83,6 +82,33 @@ class ByteStringParserSpec extends StreamSpec() { run(ByteString(0xca, 0xfe), ByteString(0xef, 0x12)) shouldEqual ByteString(0xef, 0x12) run(ByteString(0xca, 0xfe, 0xef, 0x12)) shouldEqual ByteString(0xef, 0x12) } + + "don't spin when logic is flawed" in { + object SpinningLogic extends ByteStringParser[Int] { + def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new ParsingLogic { + object State1 extends ParseStep[Int] { + def parse(reader: ByteReader): ParseResult[Int] = + ParseResult(None, State2) + } + object State2 extends ParseStep[Int] { + def parse(reader: ByteReader): ParseResult[Int] = + ParseResult(None, State1) + } + + startWith(State1) + + override protected def recursionLimit: Int = 10 // fail even earlier than the default + } + } + + (the[IllegalStateException] thrownBy Await.result( + Source.single(ByteString("abc")) + .via(SpinningLogic) + .runWith(Sink.ignore), + 3.seconds)).getMessage shouldBe "Parsing logic didn't produce result after 10 steps. " + + "Aborting processing to avoid infinite cycles. In the unlikely case that the parsing logic needs more recursion, " + + "override ParsingLogic.recursionLimit." + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/ByteStringParser.scala b/akka-stream/src/main/scala/akka/stream/impl/io/ByteStringParser.scala index b9d55b0c1b..36a8e4efbd 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/ByteStringParser.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/ByteStringParser.scala @@ -7,6 +7,7 @@ import akka.stream._ import akka.stream.stage._ import akka.util.ByteString +import scala.annotation.tailrec import scala.util.control.{ NoStackTrace, NonFatal } /** @@ -29,6 +30,8 @@ private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[By final protected def startWith(step: ParseStep[T]): Unit = current = step + protected def recursionLimit: Int = 1000 + /** * doParse() is the main driver for the parser. It can be called from onPush, onPull and onUpstreamFinish. * The general logic is that invocation of this method either results in an emitted parsed element, or an indication @@ -40,8 +43,10 @@ private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[By * buffer. This can lead to two conditions: * - drained, empty buffer. This is either accepted completion (acceptUpstreamFinish) or a truncation. * - parser demands more data than in buffer. This is always a truncation. + * + * If the return value is true the method must be called another time to continue processing. */ - private def doParse(): Unit = { + private def doParseInner(): Boolean = if (buffer.nonEmpty) { val reader = new ByteReader(buffer) try { @@ -51,13 +56,16 @@ private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[By if (parseResult.nextStep == FinishedParser) { completeStage() + DontRecurse } else { buffer = reader.remainingData current = parseResult.nextStep // If this step didn't produce a result, continue parsing. if (parseResult.result.isEmpty) - doParse() + Recurse + else + DontRecurse } } catch { case NeedMoreData ⇒ @@ -67,8 +75,12 @@ private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[By // Not enough data in buffer and upstream is closed if (isClosed(bytesIn)) current.onTruncation() else pull(bytesIn) + + DontRecurse case NonFatal(ex) ⇒ failStage(new ParsingException(s"Parsing failed in step $current", ex)) + + DontRecurse } } else { if (isClosed(bytesIn)) { @@ -77,8 +89,21 @@ private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[By if (acceptUpstreamFinish) completeStage() else current.onTruncation() } else pull(bytesIn) + + DontRecurse + } + + @tailrec private def doParse(remainingRecursions: Int = recursionLimit): Unit = + if (remainingRecursions == 0) + failStage( + new IllegalStateException(s"Parsing logic didn't produce result after $recursionLimit steps. " + + "Aborting processing to avoid infinite cycles. In the unlikely case that the parsing logic " + + "needs more recursion, override ParsingLogic.recursionLimit.") + ) + else { + val recurse = doParseInner() + if (recurse) doParse(remainingRecursions - 1) } - } // Completion is handled by doParse as the buffer either gets empty after this call, or the parser requests // data that we can no longer provide (truncation). @@ -118,6 +143,9 @@ private[akka] object ByteStringParser { val CompactionThreshold = 16 + private final val Recurse = true + private final val DontRecurse = false + /** * @param result - parser can return some element for downstream or return None if no element was generated in this step * and parsing should immediately continue with the next step.