diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala index f80acd3d78..e385724fc0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala @@ -6,10 +6,11 @@ package akka.stream.scaladsl import java.util.concurrent.atomic.AtomicInteger -import akka.stream.testkit.StreamSpec +import akka.stream.scaladsl.RestartWithBackoffFlow.Delay +import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped } import akka.stream.testkit.scaladsl.{ TestSink, TestSource } -import akka.stream.{ ActorMaterializer, OverflowStrategy } +import akka.stream.{ ActorMaterializer, Attributes, OverflowStrategy } import akka.testkit.{ DefaultTimeout, TestDuration } import akka.{ Done, NotUsed } @@ -464,10 +465,12 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1 def setupFlow(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, maxRestarts: Int = -1, onlyOnFailures: Boolean = false) = { val created = new AtomicInteger() - val (flowInSource, flowInProbe) = TestSource.probe[String] + + val (flowInSource: TestPublisher.Probe[String], flowInProbe: TestSubscriber.Probe[String]) = TestSource.probe[String] .buffer(4, OverflowStrategy.backpressure) .toMat(TestSink.probe)(Keep.both).run() - val (flowOutProbe, flowOutSource) = TestSource.probe[String].toMat(BroadcastHub.sink)(Keep.both).run() + + val (flowOutProbe: TestPublisher.Probe[String], flowOutSource: Source[String, NotUsed]) = TestSource.probe[String].toMat(BroadcastHub.sink)(Keep.both).run() // We can't just use ordinary probes here because we're expecting them to get started/restarted. Instead, we // simply use the probes as a message bus for feeding and capturing events. @@ -476,6 +479,10 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1 Flow.fromSinkAndSource( Flow[String] .takeWhile(_ != "cancel") + .map { + case "in error" ⇒ throw TE("in error") + case other ⇒ other + } .to(Sink.foreach(flowInSource.sendNext) .mapMaterializedValue(_.onComplete { case Success(_) ⇒ flowInSource.sendNext("in complete") @@ -736,5 +743,23 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1 created.get() should ===(2) } + "onFailuresWithBackoff, wrapped flow exception should restart configured times" in { + val flowCreations = new AtomicInteger(0) + val failsSomeTimes = Flow[Int].map { i ⇒ + if (i % 3 == 0) throw TE("fail") else i + } + + val restartOnFailures = + RestartFlow.onFailuresWithBackoff(1.second, 2.seconds, 0.2, 2)(() ⇒ { + flowCreations.incrementAndGet() + failsSomeTimes + }).addAttributes(Attributes(Delay(100.millis))) + + val elements = Source(1 to 7) + .via(restartOnFailures) + .runWith(Sink.seq).futureValue + elements shouldEqual List(1, 2, 4, 5, 7) + flowCreations.get() shouldEqual 3 + } } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala index c73eda8e04..2ab77a399c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Restart.scala @@ -5,11 +5,15 @@ package akka.stream.scaladsl import akka.NotUsed +import akka.annotation.ApiMayChange import akka.pattern.BackoffSupervisor +import akka.stream.Attributes.Attribute import akka.stream._ -import akka.stream.stage.{ GraphStage, InHandler, OutHandler, TimerGraphStageLogicWithLogging } +import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage +import akka.stream.scaladsl.RestartWithBackoffFlow.Delay +import akka.stream.stage._ -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ /** * A RestartSource wraps a [[Source]] that gets restarted when it completes or fails. @@ -363,17 +367,25 @@ private final class RestartWithBackoffFlow[In, Out]( val out = Outlet[Out]("RestartWithBackoffFlow.out") override def shape = FlowShape(in, out) + override def createLogic(inheritedAttributes: Attributes) = new RestartWithBackoffLogic( "Flow", shape, minBackoff, maxBackoff, randomFactor, onlyOnFailures, maxRestarts) { + val delay = inheritedAttributes.get[Delay](Delay(50.millis)).duration var activeOutIn: Option[(SubSourceOutlet[In], SubSinkInlet[Out])] = None override protected def logSource = self.getClass override protected def startGraph() = { - val sourceOut = createSubOutlet(in) - val sinkIn = createSubInlet(out) - Source.fromGraph(sourceOut.source).via(flowFactory()).runWith(sinkIn.sink)(subFusingMaterializer) + val sourceOut: SubSourceOutlet[In] = createSubOutlet(in) + val sinkIn: SubSinkInlet[Out] = createSubInlet(out) + + Source.fromGraph(sourceOut.source) + // Temp fix while waiting cause of cancellation. See #23909 + .via(RestartWithBackoffFlow.delayCancellation[In](delay)) + .via(flowFactory()) + .runWith(sinkIn.sink)(subFusingMaterializer) + if (isAvailable(out)) { sinkIn.pull() } @@ -419,6 +431,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape]( maxRestarts: Int) extends TimerGraphStageLogicWithLogging(shape) { var restartCount = 0 var resetDeadline = minBackoff.fromNow + // This is effectively only used for flows, if either the main inlet or outlet of this stage finishes, then we // don't want to restart the sub inlet when it finishes, we just finish normally. var finishing = false @@ -426,19 +439,27 @@ private abstract class RestartWithBackoffLogic[S <: Shape]( protected def startGraph(): Unit protected def backoff(): Unit + /** + * @param out The permanent outlet + * @return A sub sink inlet that's sink is attached to the wrapped stage + */ protected final def createSubInlet[T](out: Outlet[T]): SubSinkInlet[T] = { val sinkIn = new SubSinkInlet[T](s"RestartWithBackoff$name.subIn") sinkIn.setHandler(new InHandler { override def onPush() = push(out, sinkIn.grab()) + override def onUpstreamFinish() = { if (finishing || maxRestartsReached() || onlyOnFailures) { complete(out) } else { - log.debug("Restarting graph due to finished upstream") scheduleRestartTimer() } } + + /* + * Upstream in this context is the wrapped stage. + */ override def onUpstreamFailure(ex: Throwable) = { if (finishing || maxRestartsReached()) { fail(out, ex) @@ -456,10 +477,13 @@ private abstract class RestartWithBackoffLogic[S <: Shape]( sinkIn.cancel() } }) - sinkIn } + /** + * @param in The permanent inlet for this stage + * @return Temporary SubSourceOutlet for this "restart" + */ protected final def createSubOutlet[T](in: Inlet[T]): SubSourceOutlet[T] = { val sourceOut = new SubSourceOutlet[T](s"RestartWithBackoff$name.subOut") @@ -471,11 +495,17 @@ private abstract class RestartWithBackoffLogic[S <: Shape]( pull(in) } } + + /* + * Downstream in this context is the wrapped stage. + * + * Can either be a failure or a cancel in the wrapped state. + * onlyOnFailures is thus racy so a delay to cancellation is added in the case of a flow. + */ override def onDownstreamFinish() = { if (finishing || maxRestartsReached() || onlyOnFailures) { cancel(in) } else { - log.debug("Graph in finished") scheduleRestartTimer() } } @@ -498,7 +528,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape]( sourceOut } - protected final def maxRestartsReached() = { + protected final def maxRestartsReached(): Boolean = { // Check if the last start attempt was more than the minimum backoff if (resetDeadline.isOverdue()) { log.debug("Last restart attempt was more than {} ago, resetting restart count", minBackoff) @@ -508,7 +538,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape]( } // Set a timer to restart after the calculated delay - protected final def scheduleRestartTimer() = { + protected final def scheduleRestartTimer(): Unit = { val restartDelay = BackoffSupervisor.calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor) log.debug("Restarting graph in {}", restartDelay) scheduleOnce("RestartTimer", restartDelay) @@ -526,3 +556,51 @@ private abstract class RestartWithBackoffLogic[S <: Shape]( // When the stage starts, start the source override def preStart() = startGraph() } + +object RestartWithBackoffFlow { + + /** + * Temporary attribute that can override the time a [[RestartWithBackoffFlow]] waits + * for a failure before cancelling. + * + * See https://github.com/akka/akka/issues/24529 + * + * Will be removed if/when cancellation can include a cause. + */ + @ApiMayChange + case class Delay(duration: FiniteDuration) extends Attribute + + /** + * Returns a flow that is almost identity but delays propagation of cancellation from downstream to upstream. + * + * Once the down stream is finish calls to onPush are ignored. + */ + private def delayCancellation[T](duration: FiniteDuration): Flow[T, T, NotUsed] = + Flow.fromGraph(new DelayCancellationStage(duration)) + + private final class DelayCancellationStage[T](delay: FiniteDuration) extends SimpleLinearGraphStage[T] { + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler with StageLogging { + + setHandlers(in, out, this) + + def onPush(): Unit = push(out, grab(in)) + def onPull(): Unit = pull(in) + + override def onDownstreamFinish(): Unit = { + scheduleOnce("CompleteState", delay) + setHandler( + in, + new InHandler { + def onPush(): Unit = {} + } + ) + } + + override protected def onTimer(timerKey: Any): Unit = { + log.debug(s"Stage was canceled after delay of $delay") + completeStage() + } + } + } +}