From 7c6025b06ab65f711ee79db0cd8f533f3cf55899 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Mon, 20 May 2019 13:56:23 +0200 Subject: [PATCH] Snapshot when stages stopped incorrect and can cause crash #26902 --- .../snapshot/MaterializerStateSpec.scala | 88 +++++++++++++++++-- .../stream/impl/ActorMaterializerImpl.scala | 38 ++++++-- .../stream/impl/fusing/GraphInterpreter.scala | 7 +- .../stream/snapshot/MaterializerState.scala | 13 +-- 4 files changed, 123 insertions(+), 23 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/snapshot/MaterializerStateSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/snapshot/MaterializerStateSpec.scala index ee3b241bc5..606c9198ce 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/snapshot/MaterializerStateSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/snapshot/MaterializerStateSpec.scala @@ -4,8 +4,9 @@ package akka.stream.snapshot -import akka.stream.ActorMaterializer -import akka.stream.scaladsl.{ Sink, Source } +import akka.stream.{ ActorMaterializer, FlowShape } +import akka.stream.scaladsl.{ Flow, GraphDSL, Keep, Merge, Partition, Sink, Source } +import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.StreamSpec import scala.concurrent.duration._ @@ -16,16 +17,85 @@ class MaterializerStateSpec extends StreamSpec { "snapshot a running stream" in { implicit val mat = ActorMaterializer() - Source.maybe[Int].map(_.toString).zipWithIndex.runWith(Sink.seq) + try { + Source.maybe[Int].map(_.toString).zipWithIndex.runWith(Sink.seq) - awaitAssert({ - val snapshot = MaterializerState.streamSnapshots(mat).futureValue + awaitAssert({ + val snapshot = MaterializerState.streamSnapshots(mat).futureValue - snapshot should have size (1) - snapshot.head.activeInterpreters should have size (1) - snapshot.head.activeInterpreters.head.logics should have size (4) // all 4 operators - }, 3.seconds) + snapshot should have size (1) + snapshot.head.activeInterpreters should have size (1) + snapshot.head.activeInterpreters.head.logics should have size (4) // all 4 operators + }, remainingOrDefault) + } finally { + mat.shutdown() + } } + + "snapshot a stream that has a stopped stage" in { + implicit val mat = ActorMaterializer() + try { + val probe = TestSink.probe[String](system) + val out = Source + .single("one") + .concat(Source.maybe[String]) // make sure we leave it running + .runWith(probe) + out.requestNext("one") + awaitAssert({ + val snapshot = MaterializerState.streamSnapshots(mat).futureValue + snapshot should have size (1) + snapshot.head.activeInterpreters should have size (1) + snapshot.head.activeInterpreters.head.stoppedLogics should have size (2) // Source.single and a detach + }, remainingOrDefault) + + } finally { + mat.shutdown() + } + } + + "snapshot a more complicated graph" in { + implicit val mat = ActorMaterializer() + try { + // snapshot before anything is running + MaterializerState.streamSnapshots(mat).futureValue + + val graph = Flow.fromGraph(GraphDSL.create() { implicit b => + import GraphDSL.Implicits._ + val partition = b.add(Partition[String](4, { + case "green" => 0 + case "red" => 1 + case "blue" => 2 + case _ => 3 + })) + val merge = b.add(Merge[String](4, eagerComplete = false)) + val discard = b.add(Sink.ignore.async) + val one = b.add(Source.single("purple")) + + partition.out(0) ~> merge.in(0) + partition.out(1).via(Flow[String].map(_.toUpperCase()).async) ~> merge.in(1) + partition.out(2).groupBy(2, identity).mergeSubstreams ~> merge.in(2) + partition.out(3) ~> discard + + one ~> merge.in(3) + + FlowShape(partition.in, merge.out) + }) + + val callMeMaybe = + Source.maybe[String].viaMat(graph)(Keep.left).toMat(Sink.ignore)(Keep.left).run() + + // just check that we can snapshot without errors + MaterializerState.streamSnapshots(mat).futureValue + callMeMaybe.success(Some("green")) + MaterializerState.streamSnapshots(mat).futureValue + Thread.sleep(100) // just to give it a bigger chance to cover different states of shutting down + MaterializerState.streamSnapshots(mat).futureValue + + } finally { + mat.shutdown() + } + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index cacdc9405d..2a7246d8b7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -10,13 +10,15 @@ import akka.actor._ import akka.annotation.{ DoNotInherit, InternalApi } import akka.dispatch.Dispatchers import akka.event.LoggingAdapter -import akka.pattern.ask +import akka.pattern.{ ask, pipe, retry } import akka.stream._ -import akka.stream.impl.fusing.GraphInterpreterShell -import akka.util.OptionVal +import akka.stream.impl.fusing.{ ActorGraphInterpreter, GraphInterpreterShell } +import akka.stream.snapshot.StreamSnapshot +import akka.util.{ OptionVal, Timeout } -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ Await, ExecutionContextExecutor } +import scala.collection.immutable +import scala.concurrent.duration._ +import scala.concurrent.{ Await, ExecutionContextExecutor, Future } /** * ExtendedActorMaterializer used by subtypes which delegates in-island wiring to [[akka.stream.impl.PhaseIsland]]s @@ -177,6 +179,11 @@ private[akka] class SubFusingActorMaterializerImpl( extends DeadLetterSuppression with NoSerializationVerificationNeeded + case object GetChildrenSnapshots + final case class ChildrenSnapshots(seq: immutable.Seq[StreamSnapshot]) + extends DeadLetterSuppression + with NoSerializationVerificationNeeded + /** Testing purpose */ case object GetChildren @@ -195,7 +202,7 @@ private[akka] class SubFusingActorMaterializerImpl( */ @InternalApi private[akka] class StreamSupervisor(haveShutDown: AtomicBoolean) extends Actor { import akka.stream.impl.StreamSupervisor._ - + implicit val ec = context.dispatcher override def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy def receive = { @@ -209,10 +216,29 @@ private[akka] class SubFusingActorMaterializerImpl( context.asInstanceOf[ActorCell].removeFunctionRef(ref) case GetChildren => sender() ! Children(context.children.toSet) + case GetChildrenSnapshots => + takeSnapshotsOfChildren().map(ChildrenSnapshots.apply _).pipeTo(sender()) + case StopChildren => context.children.foreach(context.stop) sender() ! StoppedChildren } + def takeSnapshotsOfChildren(): Future[immutable.Seq[StreamSnapshot]] = { + implicit val scheduler = context.system.scheduler + // Arbitrary timeout but should always be quick, the failure scenario is that + // the child/stream stopped, and we do retry below + implicit val timeout: Timeout = 1.second + def takeSnapshot() = { + val futureSnapshots = + context.children.toList.map(child => (child ? ActorGraphInterpreter.Snapshot).mapTo[StreamSnapshot]) + Future.sequence(futureSnapshots) + } + + // If the timeout hits it is likely because one of the streams stopped between looking at the list + // of children and asking it for a snapshot. We retry the entire snapshot in that case + retry(() => takeSnapshot(), 3, Duration.Zero) + } + override def postStop(): Unit = haveShutDown.set(true) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index 70fc0c7c8e..e859aca9a1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -680,12 +680,15 @@ import akka.stream.snapshot._ }) } + val stoppedStages: List[LogicSnapshot] = shutdownCounter.zipWithIndex.collect { + case (activeConnections, idx) if activeConnections < 1 => logicSnapshots(idx) + }.toList + RunningInterpreterImpl( logicSnapshots.toVector, connectionSnapshots.toVector, queueStatus, runningStages, - shutdownCounter.toList.map(n => logicSnapshots(n))) + stoppedStages) } - } diff --git a/akka-stream/src/main/scala/akka/stream/snapshot/MaterializerState.scala b/akka-stream/src/main/scala/akka/stream/snapshot/MaterializerState.scala index 3ba83ca04e..55d8fc7d78 100644 --- a/akka-stream/src/main/scala/akka/stream/snapshot/MaterializerState.scala +++ b/akka-stream/src/main/scala/akka/stream/snapshot/MaterializerState.scala @@ -8,8 +8,8 @@ import akka.actor.{ ActorPath, ActorRef } import akka.annotation.{ ApiMayChange, DoNotInherit, InternalApi } import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } import akka.pattern.ask -import akka.stream.{ Attributes, Materializer } import akka.stream.impl.fusing.ActorGraphInterpreter +import akka.stream.{ Attributes, Materializer } import akka.util.Timeout import scala.collection.immutable @@ -41,11 +41,9 @@ object MaterializerState { @InternalApi private[akka] def requestFromSupervisor(supervisor: ActorRef)( implicit ec: ExecutionContext): Future[immutable.Seq[StreamSnapshot]] = { - // FIXME arbitrary timeout + // Arbitrary timeout: operation should always be quick, when it times out it will be because the materializer stopped implicit val timeout: Timeout = 10.seconds - (supervisor ? StreamSupervisor.GetChildren) - .mapTo[StreamSupervisor.Children] - .flatMap(msg => Future.sequence(msg.children.toVector.map(requestFromChild))) + (supervisor ? StreamSupervisor.GetChildrenSnapshots).mapTo[StreamSupervisor.ChildrenSnapshots].map(_.seq) } /** INTERNAL API */ @@ -188,7 +186,10 @@ private[akka] final case class RunningInterpreterImpl( @InternalApi private[akka] final case class LogicSnapshotImpl(index: Int, label: String, attributes: Attributes) extends LogicSnapshot - with HideImpl + with HideImpl { + + override def toString: String = s"Logic($label)" +} /** * INTERNAL API