Actually check the shutdown status before materializing anything (#24307)
This commit is contained in:
parent
0529f1814b
commit
df40ef7bc0
2 changed files with 30 additions and 9 deletions
|
|
@ -6,9 +6,9 @@ import akka.stream.ActorMaterializerSpec.ActorWithMaterializer
|
||||||
import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
|
import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
|
||||||
import akka.stream.scaladsl.{ Flow, Sink, Source }
|
import akka.stream.scaladsl.{ Flow, Sink, Source }
|
||||||
import akka.stream.testkit.{ StreamSpec, TestPublisher }
|
import akka.stream.testkit.{ StreamSpec, TestPublisher }
|
||||||
import akka.testkit.{ ImplicitSender, TestActor, TestProbe }
|
import akka.testkit.{ ImplicitSender, TestActor, TestLatch, TestProbe }
|
||||||
|
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.{ Await, Future }
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import scala.util.{ Failure, Try }
|
import scala.util.{ Failure, Try }
|
||||||
|
|
||||||
|
|
@ -37,8 +37,21 @@ class ActorMaterializerSpec extends StreamSpec with ImplicitSender {
|
||||||
"refuse materialization after shutdown" in {
|
"refuse materialization after shutdown" in {
|
||||||
val m = ActorMaterializer.create(system)
|
val m = ActorMaterializer.create(system)
|
||||||
m.shutdown()
|
m.shutdown()
|
||||||
an[IllegalStateException] should be thrownBy
|
the[IllegalStateException] thrownBy {
|
||||||
Source(1 to 5).runForeach(println)(m)
|
Source(1 to 5).runWith(Sink.ignore)(m)
|
||||||
|
} should have message "Trying to materialize stream after materializer has been shutdown"
|
||||||
|
}
|
||||||
|
|
||||||
|
"refuse materialization when shutdown while materializing" in {
|
||||||
|
val m = ActorMaterializer.create(system)
|
||||||
|
|
||||||
|
the[IllegalStateException] thrownBy {
|
||||||
|
Source(1 to 5).mapMaterializedValue { _ ⇒
|
||||||
|
// shutdown while materializing
|
||||||
|
m.shutdown()
|
||||||
|
Thread.sleep(100)
|
||||||
|
}.runWith(Sink.ignore)(m)
|
||||||
|
} should have message "Materializer shutdown while materializing stream"
|
||||||
}
|
}
|
||||||
|
|
||||||
"shut down the supervisor actor it encapsulates" in {
|
"shut down the supervisor actor it encapsulates" in {
|
||||||
|
|
@ -90,7 +103,6 @@ object ActorMaterializerSpec {
|
||||||
Source.repeat("hello")
|
Source.repeat("hello")
|
||||||
.alsoTo(Flow[String].take(1).to(Sink.actorRef(p.ref, "one")))
|
.alsoTo(Flow[String].take(1).to(Sink.actorRef(p.ref, "one")))
|
||||||
.runWith(Sink.onComplete(signal ⇒ {
|
.runWith(Sink.onComplete(signal ⇒ {
|
||||||
println(signal)
|
|
||||||
p.ref ! signal
|
p.ref ! signal
|
||||||
}))
|
}))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -420,6 +420,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
|
||||||
defaultAttributes: Attributes,
|
defaultAttributes: Attributes,
|
||||||
defaultPhase: Phase[Any],
|
defaultPhase: Phase[Any],
|
||||||
phases: Map[IslandTag, Phase[Any]]): Mat = {
|
phases: Map[IslandTag, Phase[Any]]): Mat = {
|
||||||
|
if (isShutdown) throw new IllegalStateException("Trying to materialize stream after materializer has been shutdown")
|
||||||
val islandTracking = new IslandTracking(phases, settings, defaultAttributes, defaultPhase, this, islandNamePrefix = createFlowName() + "-")
|
val islandTracking = new IslandTracking(phases, settings, defaultAttributes, defaultPhase, this, islandNamePrefix = createFlowName() + "-")
|
||||||
|
|
||||||
var current: Traversal = graph.traversalBuilder.traversal
|
var current: Traversal = graph.traversalBuilder.traversal
|
||||||
|
|
@ -496,11 +497,19 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
islandTracking.getCurrentPhase.onIslandReady()
|
def shutdownWhileMaterializingFailure =
|
||||||
islandTracking.allNestedIslandsReady()
|
new IllegalStateException("Materializer shutdown while materializing stream")
|
||||||
|
try {
|
||||||
|
islandTracking.getCurrentPhase.onIslandReady()
|
||||||
|
islandTracking.allNestedIslandsReady()
|
||||||
|
|
||||||
|
if (Debug) println("--- Finished materialization")
|
||||||
|
matValueStack.peekLast().asInstanceOf[Mat]
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
if (isShutdown) throw shutdownWhileMaterializingFailure
|
||||||
|
}
|
||||||
|
|
||||||
if (Debug) println("--- Finished materialization")
|
|
||||||
matValueStack.peekLast().asInstanceOf[Mat]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private def wireInlets(islandTracking: IslandTracking, mod: StreamLayout.AtomicModule[Shape, Any], logic: Any): Unit = {
|
private def wireInlets(islandTracking: IslandTracking, mod: StreamLayout.AtomicModule[Shape, Any], logic: Any): Unit = {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue