From df40ef7bc0863726c18f0046d7b3f94e43a90593 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Tue, 16 Jan 2018 09:10:21 +0100 Subject: [PATCH] Actually check the shutdown status before materializing anything (#24307) --- .../akka/stream/ActorMaterializerSpec.scala | 22 ++++++++++++++----- .../impl/PhasedFusingActorMaterializer.scala | 17 ++++++++++---- 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala index 241d6a2132..512064a391 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala @@ -6,9 +6,9 @@ import akka.stream.ActorMaterializerSpec.ActorWithMaterializer import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } import akka.stream.scaladsl.{ Flow, Sink, Source } import akka.stream.testkit.{ StreamSpec, TestPublisher } -import akka.testkit.{ ImplicitSender, TestActor, TestProbe } +import akka.testkit.{ ImplicitSender, TestActor, TestLatch, TestProbe } -import scala.concurrent.Await +import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ import scala.util.{ Failure, Try } @@ -37,8 +37,21 @@ class ActorMaterializerSpec extends StreamSpec with ImplicitSender { "refuse materialization after shutdown" in { val m = ActorMaterializer.create(system) m.shutdown() - an[IllegalStateException] should be thrownBy - Source(1 to 5).runForeach(println)(m) + the[IllegalStateException] thrownBy { + Source(1 to 5).runWith(Sink.ignore)(m) + } should have message "Trying to materialize stream after materializer has been shutdown" + } + + "refuse materialization when shutdown while materializing" in { + val m = ActorMaterializer.create(system) + + the[IllegalStateException] thrownBy { + Source(1 to 5).mapMaterializedValue { _ ⇒ + // shutdown while materializing + m.shutdown() + Thread.sleep(100) + }.runWith(Sink.ignore)(m) + } should have message "Materializer shutdown while materializing stream" } "shut down the supervisor actor it encapsulates" in { @@ -90,7 +103,6 @@ object ActorMaterializerSpec { Source.repeat("hello") .alsoTo(Flow[String].take(1).to(Sink.actorRef(p.ref, "one"))) .runWith(Sink.onComplete(signal ⇒ { - println(signal) p.ref ! signal })) diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index b81fe24a8f..588cfb787a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -420,6 +420,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff defaultAttributes: Attributes, defaultPhase: Phase[Any], phases: Map[IslandTag, Phase[Any]]): Mat = { + if (isShutdown) throw new IllegalStateException("Trying to materialize stream after materializer has been shutdown") val islandTracking = new IslandTracking(phases, settings, defaultAttributes, defaultPhase, this, islandNamePrefix = createFlowName() + "-") var current: Traversal = graph.traversalBuilder.traversal @@ -496,11 +497,19 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff } } - islandTracking.getCurrentPhase.onIslandReady() - islandTracking.allNestedIslandsReady() + def shutdownWhileMaterializingFailure = + new IllegalStateException("Materializer shutdown while materializing stream") + try { + islandTracking.getCurrentPhase.onIslandReady() + islandTracking.allNestedIslandsReady() + + if (Debug) println("--- Finished materialization") + matValueStack.peekLast().asInstanceOf[Mat] + + } finally { + if (isShutdown) throw shutdownWhileMaterializingFailure + } - if (Debug) println("--- Finished materialization") - matValueStack.peekLast().asInstanceOf[Mat] } private def wireInlets(islandTracking: IslandTracking, mod: StreamLayout.AtomicModule[Shape, Any], logic: Any): Unit = {