RestartWithBackOff delay cancel to wait for failure (#24795)

* RestartWithBackOff delay cancel to wait for failure

For wrapping a user flow with FlowRestart.onFlowWithFailures when the user
flow it fails signals a cancel upstream and a failure downstream.

These are intercepted by a SubSource/SubSink. In the case
the SubSource receives the cancel before the SubSink receives
the real upstream is wrongly canceled leading to an error
when the SubSink restarts the flow.

This commit introduces a delay for the cancel so that the failure
is more likely to win.

Would be far better to propagate a reason for cancel so this could
be deterministic. See https://github.com/akka/akka/pull/23909

Refs #24528 #24726
This commit is contained in:
Christopher Batey 2018-04-27 12:09:16 +01:00 committed by GitHub
parent ca2fe92f0d
commit 82e2e2c551
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 117 additions and 14 deletions

View file

@ -6,10 +6,11 @@ package akka.stream.scaladsl
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import akka.stream.testkit.StreamSpec import akka.stream.scaladsl.RestartWithBackoffFlow.Delay
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped } import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped }
import akka.stream.testkit.scaladsl.{ TestSink, TestSource } import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import akka.stream.{ ActorMaterializer, OverflowStrategy } import akka.stream.{ ActorMaterializer, Attributes, OverflowStrategy }
import akka.testkit.{ DefaultTimeout, TestDuration } import akka.testkit.{ DefaultTimeout, TestDuration }
import akka.{ Done, NotUsed } import akka.{ Done, NotUsed }
@ -464,10 +465,12 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
def setupFlow(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, maxRestarts: Int = -1, onlyOnFailures: Boolean = false) = { def setupFlow(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, maxRestarts: Int = -1, onlyOnFailures: Boolean = false) = {
val created = new AtomicInteger() val created = new AtomicInteger()
val (flowInSource, flowInProbe) = TestSource.probe[String]
val (flowInSource: TestPublisher.Probe[String], flowInProbe: TestSubscriber.Probe[String]) = TestSource.probe[String]
.buffer(4, OverflowStrategy.backpressure) .buffer(4, OverflowStrategy.backpressure)
.toMat(TestSink.probe)(Keep.both).run() .toMat(TestSink.probe)(Keep.both).run()
val (flowOutProbe, flowOutSource) = TestSource.probe[String].toMat(BroadcastHub.sink)(Keep.both).run()
val (flowOutProbe: TestPublisher.Probe[String], flowOutSource: Source[String, NotUsed]) = TestSource.probe[String].toMat(BroadcastHub.sink)(Keep.both).run()
// We can't just use ordinary probes here because we're expecting them to get started/restarted. Instead, we // We can't just use ordinary probes here because we're expecting them to get started/restarted. Instead, we
// simply use the probes as a message bus for feeding and capturing events. // simply use the probes as a message bus for feeding and capturing events.
@ -476,6 +479,10 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
Flow.fromSinkAndSource( Flow.fromSinkAndSource(
Flow[String] Flow[String]
.takeWhile(_ != "cancel") .takeWhile(_ != "cancel")
.map {
case "in error" throw TE("in error")
case other other
}
.to(Sink.foreach(flowInSource.sendNext) .to(Sink.foreach(flowInSource.sendNext)
.mapMaterializedValue(_.onComplete { .mapMaterializedValue(_.onComplete {
case Success(_) flowInSource.sendNext("in complete") case Success(_) flowInSource.sendNext("in complete")
@ -736,5 +743,23 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
created.get() should ===(2) created.get() should ===(2)
} }
"onFailuresWithBackoff, wrapped flow exception should restart configured times" in {
val flowCreations = new AtomicInteger(0)
val failsSomeTimes = Flow[Int].map { i
if (i % 3 == 0) throw TE("fail") else i
}
val restartOnFailures =
RestartFlow.onFailuresWithBackoff(1.second, 2.seconds, 0.2, 2)(() {
flowCreations.incrementAndGet()
failsSomeTimes
}).addAttributes(Attributes(Delay(100.millis)))
val elements = Source(1 to 7)
.via(restartOnFailures)
.runWith(Sink.seq).futureValue
elements shouldEqual List(1, 2, 4, 5, 7)
flowCreations.get() shouldEqual 3
}
} }
} }

View file

@ -5,11 +5,15 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.NotUsed import akka.NotUsed
import akka.annotation.ApiMayChange
import akka.pattern.BackoffSupervisor import akka.pattern.BackoffSupervisor
import akka.stream.Attributes.Attribute
import akka.stream._ import akka.stream._
import akka.stream.stage.{ GraphStage, InHandler, OutHandler, TimerGraphStageLogicWithLogging } import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.scaladsl.RestartWithBackoffFlow.Delay
import akka.stream.stage._
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration._
/** /**
* A RestartSource wraps a [[Source]] that gets restarted when it completes or fails. * A RestartSource wraps a [[Source]] that gets restarted when it completes or fails.
@ -363,17 +367,25 @@ private final class RestartWithBackoffFlow[In, Out](
val out = Outlet[Out]("RestartWithBackoffFlow.out") val out = Outlet[Out]("RestartWithBackoffFlow.out")
override def shape = FlowShape(in, out) override def shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic( override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic(
"Flow", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures, maxRestarts) { "Flow", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures, maxRestarts) {
val delay = inheritedAttributes.get[Delay](Delay(50.millis)).duration
var activeOutIn: Option[(SubSourceOutlet[In], SubSinkInlet[Out])] = None var activeOutIn: Option[(SubSourceOutlet[In], SubSinkInlet[Out])] = None
override protected def logSource = self.getClass override protected def logSource = self.getClass
override protected def startGraph() = { override protected def startGraph() = {
val sourceOut = createSubOutlet(in) val sourceOut: SubSourceOutlet[In] = createSubOutlet(in)
val sinkIn = createSubInlet(out) val sinkIn: SubSinkInlet[Out] = createSubInlet(out)
Source.fromGraph(sourceOut.source).via(flowFactory()).runWith(sinkIn.sink)(subFusingMaterializer)
Source.fromGraph(sourceOut.source)
// Temp fix while waiting cause of cancellation. See #23909
.via(RestartWithBackoffFlow.delayCancellation[In](delay))
.via(flowFactory())
.runWith(sinkIn.sink)(subFusingMaterializer)
if (isAvailable(out)) { if (isAvailable(out)) {
sinkIn.pull() sinkIn.pull()
} }
@ -419,6 +431,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
maxRestarts: Int) extends TimerGraphStageLogicWithLogging(shape) { maxRestarts: Int) extends TimerGraphStageLogicWithLogging(shape) {
var restartCount = 0 var restartCount = 0
var resetDeadline = minBackoff.fromNow var resetDeadline = minBackoff.fromNow
// This is effectively only used for flows, if either the main inlet or outlet of this stage finishes, then we // This is effectively only used for flows, if either the main inlet or outlet of this stage finishes, then we
// don't want to restart the sub inlet when it finishes, we just finish normally. // don't want to restart the sub inlet when it finishes, we just finish normally.
var finishing = false var finishing = false
@ -426,19 +439,27 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
protected def startGraph(): Unit protected def startGraph(): Unit
protected def backoff(): Unit protected def backoff(): Unit
/**
* @param out The permanent outlet
* @return A sub sink inlet that's sink is attached to the wrapped stage
*/
protected final def createSubInlet[T](out: Outlet[T]): SubSinkInlet[T] = { protected final def createSubInlet[T](out: Outlet[T]): SubSinkInlet[T] = {
val sinkIn = new SubSinkInlet[T](s"RestartWithBackoff$name.subIn") val sinkIn = new SubSinkInlet[T](s"RestartWithBackoff$name.subIn")
sinkIn.setHandler(new InHandler { sinkIn.setHandler(new InHandler {
override def onPush() = push(out, sinkIn.grab()) override def onPush() = push(out, sinkIn.grab())
override def onUpstreamFinish() = { override def onUpstreamFinish() = {
if (finishing || maxRestartsReached() || onlyOnFailures) { if (finishing || maxRestartsReached() || onlyOnFailures) {
complete(out) complete(out)
} else { } else {
log.debug("Restarting graph due to finished upstream")
scheduleRestartTimer() scheduleRestartTimer()
} }
} }
/*
* Upstream in this context is the wrapped stage.
*/
override def onUpstreamFailure(ex: Throwable) = { override def onUpstreamFailure(ex: Throwable) = {
if (finishing || maxRestartsReached()) { if (finishing || maxRestartsReached()) {
fail(out, ex) fail(out, ex)
@ -456,10 +477,13 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
sinkIn.cancel() sinkIn.cancel()
} }
}) })
sinkIn sinkIn
} }
/**
* @param in The permanent inlet for this stage
* @return Temporary SubSourceOutlet for this "restart"
*/
protected final def createSubOutlet[T](in: Inlet[T]): SubSourceOutlet[T] = { protected final def createSubOutlet[T](in: Inlet[T]): SubSourceOutlet[T] = {
val sourceOut = new SubSourceOutlet[T](s"RestartWithBackoff$name.subOut") val sourceOut = new SubSourceOutlet[T](s"RestartWithBackoff$name.subOut")
@ -471,11 +495,17 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
pull(in) pull(in)
} }
} }
/*
* Downstream in this context is the wrapped stage.
*
* Can either be a failure or a cancel in the wrapped state.
* onlyOnFailures is thus racy so a delay to cancellation is added in the case of a flow.
*/
override def onDownstreamFinish() = { override def onDownstreamFinish() = {
if (finishing || maxRestartsReached() || onlyOnFailures) { if (finishing || maxRestartsReached() || onlyOnFailures) {
cancel(in) cancel(in)
} else { } else {
log.debug("Graph in finished")
scheduleRestartTimer() scheduleRestartTimer()
} }
} }
@ -498,7 +528,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
sourceOut sourceOut
} }
protected final def maxRestartsReached() = { protected final def maxRestartsReached(): Boolean = {
// Check if the last start attempt was more than the minimum backoff // Check if the last start attempt was more than the minimum backoff
if (resetDeadline.isOverdue()) { if (resetDeadline.isOverdue()) {
log.debug("Last restart attempt was more than {} ago, resetting restart count", minBackoff) log.debug("Last restart attempt was more than {} ago, resetting restart count", minBackoff)
@ -508,7 +538,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
} }
// Set a timer to restart after the calculated delay // Set a timer to restart after the calculated delay
protected final def scheduleRestartTimer() = { protected final def scheduleRestartTimer(): Unit = {
val restartDelay = BackoffSupervisor.calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor) val restartDelay = BackoffSupervisor.calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor)
log.debug("Restarting graph in {}", restartDelay) log.debug("Restarting graph in {}", restartDelay)
scheduleOnce("RestartTimer", restartDelay) scheduleOnce("RestartTimer", restartDelay)
@ -526,3 +556,51 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
// When the stage starts, start the source // When the stage starts, start the source
override def preStart() = startGraph() override def preStart() = startGraph()
} }
object RestartWithBackoffFlow {
/**
* Temporary attribute that can override the time a [[RestartWithBackoffFlow]] waits
* for a failure before cancelling.
*
* See https://github.com/akka/akka/issues/24529
*
* Will be removed if/when cancellation can include a cause.
*/
@ApiMayChange
case class Delay(duration: FiniteDuration) extends Attribute
/**
* Returns a flow that is almost identity but delays propagation of cancellation from downstream to upstream.
*
* Once the down stream is finish calls to onPush are ignored.
*/
private def delayCancellation[T](duration: FiniteDuration): Flow[T, T, NotUsed] =
Flow.fromGraph(new DelayCancellationStage(duration))
private final class DelayCancellationStage[T](delay: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
setHandlers(in, out, this)
def onPush(): Unit = push(out, grab(in))
def onPull(): Unit = pull(in)
override def onDownstreamFinish(): Unit = {
scheduleOnce("CompleteState", delay)
setHandler(
in,
new InHandler {
def onPush(): Unit = {}
}
)
}
override protected def onTimer(timerKey: Any): Unit = {
log.debug(s"Stage was canceled after delay of $delay")
completeStage()
}
}
}
}