From 93ce2b86960dddea0ccacd997a3b4a283506a3d5 Mon Sep 17 00:00:00 2001 From: Arnaud Burlet Date: Mon, 4 Feb 2019 08:09:28 +0100 Subject: [PATCH] Test to ensure a coordinated shutdown is successful when a stream involving StreamRefs is running --- .../test/scala/akka/cluster/ClusterSpec.scala | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index ff01b29e05..99a24137d3 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -20,6 +20,8 @@ import com.typesafe.config.ConfigFactory import akka.actor.CoordinatedShutdown import akka.cluster.ClusterEvent.MemberEvent import akka.cluster.ClusterEvent._ +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{ Sink, Source, StreamRefs } import scala.concurrent.Await @@ -171,6 +173,40 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { } } + "terminate ActorSystem via CoordinatedShutdown.run when a stream involving StreamRefs is running" in { + val sys2 = ActorSystem("ClusterSpec2", ConfigFactory.parseString(""" + akka.actor.provider = "cluster" + akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + akka.coordinated-shutdown.terminate-actor-system = on + """)) + try { + val probe = TestProbe()(sys2) + Cluster(sys2).subscribe(probe.ref, classOf[MemberEvent]) + probe.expectMsgType[CurrentClusterState] + Cluster(sys2).join(Cluster(sys2).selfAddress) + probe.expectMsgType[MemberUp] + val mat = ActorMaterializer()(sys2) + val sink = Await.result(StreamRefs.sinkRef[String]().to(Sink.ignore).run()(mat), 10.seconds) + Source.tick(1.milli, 10.millis, "tick").to(sink).run()(mat) + + CoordinatedShutdown(sys2).run(CoordinatedShutdown.UnknownReason) + probe.expectMsgType[MemberLeft] + // MemberExited might not be published before MemberRemoved + val removed = probe.fishForMessage() { + case _: MemberExited ⇒ false + case _: MemberRemoved ⇒ true + }.asInstanceOf[MemberRemoved] + removed.previousStatus should ===(MemberStatus.Exiting) + + Await.result(sys2.whenTerminated, 10.seconds) + Cluster(sys2).isTerminated should ===(true) + CoordinatedShutdown(sys2).shutdownReason() should ===(Some(CoordinatedShutdown.UnknownReason)) + } finally { + shutdown(sys2) + } + } + "leave via CoordinatedShutdown.run when member status is Joining" in { val sys2 = ActorSystem("ClusterSpec2", ConfigFactory.parseString(""" akka.actor.provider = "cluster"