diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala index 5b65df3317..f665299b1d 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala @@ -262,14 +262,17 @@ object ClusterSingletonManager { def add(m: Member): Unit = { if (matchingRole(m)) trackChange { () ⇒ - membersByAge -= m // replace + // replace, it's possible that the upNumber is changed + membersByAge = membersByAge.filterNot(_.uniqueAddress == m.uniqueAddress) membersByAge += m } } def remove(m: Member): Unit = { if (matchingRole(m)) - trackChange { () ⇒ membersByAge -= m } + trackChange { () ⇒ + membersByAge = membersByAge.filterNot(_.uniqueAddress == m.uniqueAddress) + } } def sendFirstChange(): Unit = { diff --git a/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonRestart2Spec.scala b/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonRestart2Spec.scala new file mode 100644 index 0000000000..6197790d49 --- /dev/null +++ b/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonRestart2Spec.scala @@ -0,0 +1,132 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ +package akka.cluster.singleton + +import scala.concurrent.duration._ + +import akka.actor.Actor +import akka.actor.ActorSystem +import akka.actor.PoisonPill +import akka.actor.Props +import akka.cluster.Cluster +import akka.cluster.MemberStatus +import akka.cluster.UniqueAddress +import akka.remote.RARP +import akka.testkit.AkkaSpec +import akka.testkit.TestProbe +import com.typesafe.config.ConfigFactory + +object ClusterSingletonRestart2Spec { + def singletonActorProps: Props = Props(new Singleton) + + class Singleton extends Actor { + def receive = { + case _ ⇒ sender() ! Cluster(context.system).selfUniqueAddress + } + } +} + +class ClusterSingletonRestart2Spec extends AkkaSpec(""" + akka.loglevel = INFO + akka.cluster.roles = [singleton] + akka.actor.provider = akka.cluster.ClusterActorRefProvider + akka.remote { + netty.tcp { + hostname = "127.0.0.1" + port = 0 + } + artery.canonical { + hostname = "127.0.0.1" + port = 0 + } + } + """) { + + val sys1 = ActorSystem(system.name, system.settings.config) + val sys2 = ActorSystem(system.name, system.settings.config) + val sys3 = ActorSystem( + system.name, + ConfigFactory.parseString("akka.cluster.roles = [other]").withFallback(system.settings.config)) + var sys4: ActorSystem = null + + def join(from: ActorSystem, to: ActorSystem): Unit = { + if (Cluster(from).selfRoles.contains("singleton")) + from.actorOf( + ClusterSingletonManager.props( + singletonProps = ClusterSingletonRestart2Spec.singletonActorProps, + terminationMessage = PoisonPill, + settings = ClusterSingletonManagerSettings(from).withRole("singleton")), + name = "echo") + + within(10.seconds) { + awaitAssert { + Cluster(from) join Cluster(to).selfAddress + Cluster(from).state.members.map(_.uniqueAddress) should contain(Cluster(from).selfUniqueAddress) + Cluster(from).state.members.map(_.status) should ===(Set(MemberStatus.Up)) + } + } + } + + "Restarting cluster node during hand over" must { + "start singletons in restarted node" in { + join(sys1, sys1) + join(sys2, sys1) + join(sys3, sys1) + + val proxy3 = sys3.actorOf(ClusterSingletonProxy.props( + "user/echo", + ClusterSingletonProxySettings(sys3).withRole("singleton")), "proxy3") + + within(5.seconds) { + awaitAssert { + val probe = TestProbe()(sys3) + proxy3.tell("hello", probe.ref) + probe.expectMsgType[UniqueAddress](1.second) should be(Cluster(sys1).selfUniqueAddress) + } + } + + Cluster(sys1).leave(Cluster(sys1).selfAddress) + + // at the same time, shutdown sys2, which would be the expected next singleton node + shutdown(sys2) + // it will be downed by the join attempts of the new incarnation + + // then restart it + sys4 = { + val sys2port = Cluster(sys2).selfAddress.port.get + + val sys4Config = + ConfigFactory.parseString( + if (RARP(sys1).provider.remoteSettings.Artery.Enabled) s"akka.remote.artery.canonical.port=$sys2port" + else s"akka.remote.netty.tcp.port=$sys2port").withFallback(system.settings.config) + + ActorSystem(system.name, sys4Config) + } + join(sys4, sys3) + + // let it stabilize + Thread.sleep(5000) + + within(10.seconds) { + awaitAssert { + val probe = TestProbe()(sys3) + proxy3.tell("hello2", probe.ref) + // note that sys3 doesn't have the required singleton role, so singleton instance should be + // on the restarted node + probe.expectMsgType[UniqueAddress](1.second) should be(Cluster(sys4).selfUniqueAddress) + } + } + + } + } + + override def afterTermination(): Unit = { + shutdown(sys1) + shutdown(sys2) + shutdown(sys3) + if (sys4 != null) + shutdown(sys4) + } +} +