Test to ensure a coordinated shutdown is successful when a stream involving StreamRefs is running
This commit is contained in:
parent
8e2d378228
commit
93ce2b8696
1 changed files with 36 additions and 0 deletions
|
|
@ -20,6 +20,8 @@ import com.typesafe.config.ConfigFactory
|
|||
import akka.actor.CoordinatedShutdown
|
||||
import akka.cluster.ClusterEvent.MemberEvent
|
||||
import akka.cluster.ClusterEvent._
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.scaladsl.{ Sink, Source, StreamRefs }
|
||||
|
||||
import scala.concurrent.Await
|
||||
|
||||
|
|
@ -171,6 +173,40 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
|
|||
}
|
||||
}
|
||||
|
||||
"terminate ActorSystem via CoordinatedShutdown.run when a stream involving StreamRefs is running" in {
|
||||
val sys2 = ActorSystem("ClusterSpec2", ConfigFactory.parseString("""
|
||||
akka.actor.provider = "cluster"
|
||||
akka.remote.netty.tcp.port = 0
|
||||
akka.remote.artery.canonical.port = 0
|
||||
akka.coordinated-shutdown.terminate-actor-system = on
|
||||
"""))
|
||||
try {
|
||||
val probe = TestProbe()(sys2)
|
||||
Cluster(sys2).subscribe(probe.ref, classOf[MemberEvent])
|
||||
probe.expectMsgType[CurrentClusterState]
|
||||
Cluster(sys2).join(Cluster(sys2).selfAddress)
|
||||
probe.expectMsgType[MemberUp]
|
||||
val mat = ActorMaterializer()(sys2)
|
||||
val sink = Await.result(StreamRefs.sinkRef[String]().to(Sink.ignore).run()(mat), 10.seconds)
|
||||
Source.tick(1.milli, 10.millis, "tick").to(sink).run()(mat)
|
||||
|
||||
CoordinatedShutdown(sys2).run(CoordinatedShutdown.UnknownReason)
|
||||
probe.expectMsgType[MemberLeft]
|
||||
// MemberExited might not be published before MemberRemoved
|
||||
val removed = probe.fishForMessage() {
|
||||
case _: MemberExited ⇒ false
|
||||
case _: MemberRemoved ⇒ true
|
||||
}.asInstanceOf[MemberRemoved]
|
||||
removed.previousStatus should ===(MemberStatus.Exiting)
|
||||
|
||||
Await.result(sys2.whenTerminated, 10.seconds)
|
||||
Cluster(sys2).isTerminated should ===(true)
|
||||
CoordinatedShutdown(sys2).shutdownReason() should ===(Some(CoordinatedShutdown.UnknownReason))
|
||||
} finally {
|
||||
shutdown(sys2)
|
||||
}
|
||||
}
|
||||
|
||||
"leave via CoordinatedShutdown.run when member status is Joining" in {
|
||||
val sys2 = ActorSystem("ClusterSpec2", ConfigFactory.parseString("""
|
||||
akka.actor.provider = "cluster"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue