diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala index fab7e5d575..b0b9e8aa99 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala @@ -145,7 +145,7 @@ object ClusterSingletonManager { /** * The first event, corresponding to CurrentClusterState. */ - final case class InitialOldestState(oldest: Option[Address], memberCount: Int) + final case class InitialOldestState(oldest: Option[Address], safeToBeOldest: Boolean) final case class OldestChanged(oldest: Option[Address]) } @@ -191,8 +191,9 @@ object ClusterSingletonManager { def handleInitial(state: CurrentClusterState): Unit = { membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m ⇒ - m.status == MemberStatus.Up && matchingRole(m)) - val initial = InitialOldestState(membersByAge.headOption.map(_.address), membersByAge.size) + (m.status == MemberStatus.Up || m.status == MemberStatus.Leaving) && matchingRole(m)) + val safeToBeOldest = !state.members.exists { m ⇒ (m.status == MemberStatus.Down || m.status == MemberStatus.Exiting) } + val initial = InitialOldestState(membersByAge.headOption.map(_.address), safeToBeOldest) changes :+= initial } @@ -421,10 +422,10 @@ class ClusterSingletonManager( getNextOldestChanged() stay - case Event(InitialOldestState(oldestOption, memberCount), _) ⇒ + case Event(InitialOldestState(oldestOption, safeToBeOldest), _) ⇒ oldestChangedReceived = true - if (oldestOption == selfAddressOption && memberCount == 1) - // alone, oldest immediately + if (oldestOption == selfAddressOption && safeToBeOldest) + // oldest immediately gotoOldest() else if (oldestOption == selfAddressOption) goto(BecomingOldest) using BecomingOldestData(None) @@ -592,7 +593,10 @@ class ClusterSingletonManager( val newOldest = handOverTo.map(_.path.address) logInfo("Singleton terminated, hand-over done [{} -> {}]", cluster.selfAddress, newOldest) handOverTo foreach { _ ! HandOverDone } - if (selfExited || removed.contains(cluster.selfAddress)) + if (removed.contains(cluster.selfAddress)) { + logInfo("Self removed, stopping ClusterSingletonManager") + stop() + } else if (selfExited) goto(End) using EndData else goto(Younger) using YoungerData(newOldest) @@ -612,6 +616,9 @@ class ClusterSingletonManager( logInfo("Exited [{}]", m.address) } stay + case Event(MemberRemoved(m, _), _) if m.address == cluster.selfAddress && !selfExited ⇒ + logInfo("Self removed, stopping ClusterSingletonManager") + stop() case Event(MemberRemoved(m, _), _) ⇒ if (!selfExited) logInfo("Member removed [{}]", m.address) addRemoved(m.address) diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerLeaveSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerLeaveSpec.scala new file mode 100644 index 0000000000..0b04acb5d6 --- /dev/null +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerLeaveSpec.scala @@ -0,0 +1,135 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.contrib.pattern + +import language.postfixOps +import scala.collection.immutable +import scala.concurrent.duration._ +import com.typesafe.config.ConfigFactory +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.Address +import akka.actor.Props +import akka.actor.PoisonPill +import akka.actor.RootActorPath +import akka.cluster.Cluster +import akka.cluster.ClusterEvent._ +import akka.cluster.Member +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit._ +import akka.testkit.TestEvent._ +import akka.actor.Terminated +import akka.actor.ActorSelection +import akka.cluster.MemberStatus + +object ClusterSingletonManagerLeaveSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off + akka.cluster.auto-down-unreachable-after = off + """)) + + case object EchoStarted + /** + * The singleton actor + */ + class Echo(testActor: ActorRef) extends Actor { + override def postStop(): Unit = { + testActor ! "stopped" + } + + def receive = { + case _ ⇒ + sender() ! self + } + } +} + +class ClusterSingletonManagerLeaveMultiJvmNode1 extends ClusterSingletonManagerLeaveSpec +class ClusterSingletonManagerLeaveMultiJvmNode2 extends ClusterSingletonManagerLeaveSpec +class ClusterSingletonManagerLeaveMultiJvmNode3 extends ClusterSingletonManagerLeaveSpec + +class ClusterSingletonManagerLeaveSpec extends MultiNodeSpec(ClusterSingletonManagerLeaveSpec) with STMultiNodeSpec with ImplicitSender { + import ClusterSingletonManagerLeaveSpec._ + + override def initialParticipants = roles.size + + lazy val cluster = Cluster(system) + + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + cluster join node(to).address + createSingleton() + } + } + + def createSingleton(): ActorRef = { + system.actorOf(ClusterSingletonManager.props( + singletonProps = Props(classOf[Echo], testActor), + singletonName = "echo", + terminationMessage = PoisonPill, + role = None), + name = "singleton") + } + + lazy val echoProxy: ActorRef = { + system.actorOf(ClusterSingletonProxy.props( + singletonPath = "/user/singleton/echo", + role = None), + name = "echoProxy") + } + + "Leaving ClusterSingletonManager" must { + + "hand-over to new instance" in { + join(first, first) + + runOn(first) { + echoProxy ! "hello" + expectMsgType[ActorRef](5.seconds) + } + enterBarrier("first-active") + + join(second, first) + join(third, first) + within(10.seconds) { + awaitAssert(cluster.state.members.count(m ⇒ m.status == MemberStatus.Up) should be(3)) + } + + runOn(second) { + cluster.leave(node(first).address) + } + + runOn(first) { + expectMsg(10.seconds, "stopped") + } + enterBarrier("first-stopped") + + runOn(second, third) { + val p = TestProbe() + val firstAddress = node(first).address + p.within(10.seconds) { + p.awaitAssert { + echoProxy.tell("hello2", p.ref) + p.expectMsgType[ActorRef](1.seconds).path.address should not be (firstAddress) + + } + } + } + + enterBarrier("hand-over-done") + } + + } +} diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerStartupSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerStartupSpec.scala new file mode 100644 index 0000000000..55804d0cb2 --- /dev/null +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerStartupSpec.scala @@ -0,0 +1,111 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.contrib.pattern + +import language.postfixOps +import scala.collection.immutable +import scala.concurrent.duration._ +import com.typesafe.config.ConfigFactory +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.Address +import akka.actor.Props +import akka.actor.PoisonPill +import akka.actor.RootActorPath +import akka.cluster.Cluster +import akka.cluster.ClusterEvent._ +import akka.cluster.Member +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit._ +import akka.testkit.TestEvent._ +import akka.actor.Terminated +import akka.actor.ActorSelection +import akka.cluster.MemberStatus + +object ClusterSingletonManagerStartupSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off + akka.cluster.auto-down-unreachable-after = 0s + """)) + + case object EchoStarted + /** + * The singleton actor + */ + class Echo(testActor: ActorRef) extends Actor { + def receive = { + case _ ⇒ + sender() ! self + } + } +} + +class ClusterSingletonManagerStartupMultiJvmNode1 extends ClusterSingletonManagerStartupSpec +class ClusterSingletonManagerStartupMultiJvmNode2 extends ClusterSingletonManagerStartupSpec +class ClusterSingletonManagerStartupMultiJvmNode3 extends ClusterSingletonManagerStartupSpec + +class ClusterSingletonManagerStartupSpec extends MultiNodeSpec(ClusterSingletonManagerStartupSpec) with STMultiNodeSpec with ImplicitSender { + import ClusterSingletonManagerStartupSpec._ + + override def initialParticipants = roles.size + + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + Cluster(system) join node(to).address + createSingleton() + } + } + + def createSingleton(): ActorRef = { + system.actorOf(ClusterSingletonManager.props( + singletonProps = Props(classOf[Echo], testActor), + singletonName = "echo", + terminationMessage = PoisonPill, + role = None), + name = "singleton") + } + + lazy val echoProxy: ActorRef = { + system.actorOf(ClusterSingletonProxy.props( + singletonPath = "/user/singleton/echo", + role = None), + name = "echoProxy") + } + + "Startup of Cluster Singleton" must { + + "be quick" in { + join(first, first) + join(second, first) + join(third, first) + + within(7.seconds) { + awaitAssert { + val members = Cluster(system).state.members + members.size should be(3) + members.forall(_.status == MemberStatus.Up) should be(true) + } + } + enterBarrier("all-up") + + // the singleton instance is expected to start "instantly" + echoProxy ! "hello" + expectMsgType[ActorRef](3.seconds) + + enterBarrier("done") + } + + } +}