Cluster singleton testing (#21803)

* add another singleton restart test

* remove from singleton sorted set with unique address,
  because the upNumber used by the Ordering can change
Patrik Nordwall 2016-11-12 00:10:30 +01:00 committed by Konrad Malawski
parent 9c087d115e
commit 88a6bdb059
2 changed files with 137 additions and 2 deletions

akka/cluster/singleton/ClusterSingletonManager.scala

@@ -262,14 +262,17 @@ object ClusterSingletonManager {
 def add(m: Member): Unit = {
   if (matchingRole(m))
     trackChange { () ⇒
-      membersByAge -= m // replace
+      // replace, it's possible that the upNumber is changed
+      membersByAge = membersByAge.filterNot(_.uniqueAddress == m.uniqueAddress)
       membersByAge += m
     }
 }
 def remove(m: Member): Unit = {
   if (matchingRole(m))
-    trackChange { () ⇒ membersByAge -= m }
+    trackChange { () ⇒
+      membersByAge = membersByAge.filterNot(_.uniqueAddress == m.uniqueAddress)
+    }
 }
 def sendFirstChange(): Unit = {
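Why plain `-=` is not enough here: `membersByAge` is a `SortedSet` ordered by member age, and that `Ordering` consults the `upNumber`, which a restarted member re-joins with a new value of. A sorted set locates elements through its `Ordering`, so a lookup keyed on the new incarnation misses the old entry. A minimal sketch of the failure mode (not part of the commit), using a hypothetical `MemberStub` in place of Akka's `Member`:

import scala.collection.immutable.SortedSet

// hypothetical stand-in for akka.cluster.Member; only the fields the sketch needs
final case class MemberStub(uniqueAddress: String, upNumber: Int)

object StaleEntryDemo extends App {
  // oldest member first, in the spirit of the manager's age-based Ordering
  implicit val byAge: Ordering[MemberStub] =
    Ordering.by(m => (m.upNumber, m.uniqueAddress))

  val membersByAge = SortedSet(MemberStub("a:2552", 1), MemberStub("b:2552", 2))

  // "a:2552" restarts: same unique address field, new upNumber
  val restarted = MemberStub("a:2552", 3)

  // the sorted set looks the element up via the Ordering, so the old
  // incarnation (upNumber 1) is never found and the removal is a no-op
  println((membersByAge - restarted) + restarted)
  // TreeSet(MemberStub(a:2552,1), MemberStub(b:2552,2), MemberStub(a:2552,3))

  // the commit's fix: drop any entry with the same unique address, then re-add
  println(membersByAge.filterNot(_.uniqueAddress == restarted.uniqueAddress) + restarted)
  // TreeSet(MemberStub(b:2552,2), MemberStub(a:2552,3))
}

The underlying point is that a `TreeSet` finds elements via `compare`, never via `equals`, which is why the removal has to filter on the unique address explicitly.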

akka/cluster/singleton/ClusterSingletonRestart2Spec.scala

@@ -0,0 +1,132 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster.singleton

import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.PoisonPill
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.MemberStatus
import akka.cluster.UniqueAddress
import akka.remote.RARP
import akka.testkit.AkkaSpec
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
object ClusterSingletonRestart2Spec {
def singletonActorProps: Props = Props(new Singleton)
class Singleton extends Actor {
def receive = {
case _ ⇒ sender() ! Cluster(context.system).selfUniqueAddress
}
}
}

class ClusterSingletonRestart2Spec extends AkkaSpec("""
akka.loglevel = INFO
akka.cluster.roles = [singleton]
akka.actor.provider = akka.cluster.ClusterActorRefProvider
akka.remote {
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
artery.canonical {
hostname = "127.0.0.1"
port = 0
}
}
""") {
val sys1 = ActorSystem(system.name, system.settings.config)
val sys2 = ActorSystem(system.name, system.settings.config)
val sys3 = ActorSystem(
system.name,
ConfigFactory.parseString("akka.cluster.roles = [other]").withFallback(system.settings.config))
var sys4: ActorSystem = null
def join(from: ActorSystem, to: ActorSystem): Unit = {
if (Cluster(from).selfRoles.contains("singleton"))
from.actorOf(
ClusterSingletonManager.props(
singletonProps = ClusterSingletonRestart2Spec.singletonActorProps,
terminationMessage = PoisonPill,
settings = ClusterSingletonManagerSettings(from).withRole("singleton")),
name = "echo")
within(10.seconds) {
awaitAssert {
Cluster(from) join Cluster(to).selfAddress
Cluster(from).state.members.map(_.uniqueAddress) should contain(Cluster(from).selfUniqueAddress)
Cluster(from).state.members.map(_.status) should ===(Set(MemberStatus.Up))
}
}
}
"Restarting cluster node during hand over" must {
"start singletons in restarted node" in {
join(sys1, sys1)
join(sys2, sys1)
join(sys3, sys1)
val proxy3 = sys3.actorOf(ClusterSingletonProxy.props(
"user/echo",
ClusterSingletonProxySettings(sys3).withRole("singleton")), "proxy3")
within(5.seconds) {
awaitAssert {
val probe = TestProbe()(sys3)
proxy3.tell("hello", probe.ref)
probe.expectMsgType[UniqueAddress](1.second) should be(Cluster(sys1).selfUniqueAddress)
}
}
Cluster(sys1).leave(Cluster(sys1).selfAddress)
// at the same time, shutdown sys2, which would be the expected next singleton node
shutdown(sys2)
// it will be downed by the join attempts of the new incarnation
// then restart it
sys4 = {
val sys2port = Cluster(sys2).selfAddress.port.get
val sys4Config =
ConfigFactory.parseString(
if (RARP(sys1).provider.remoteSettings.Artery.Enabled) s"akka.remote.artery.canonical.port=$sys2port"
else s"akka.remote.netty.tcp.port=$sys2port").withFallback(system.settings.config)
ActorSystem(system.name, sys4Config)
}
join(sys4, sys3)
// let it stabilize
Thread.sleep(5000)
within(10.seconds) {
awaitAssert {
val probe = TestProbe()(sys3)
proxy3.tell("hello2", probe.ref)
// note that sys3 doesn't have the required singleton role, so singleton instance should be
// on the restarted node
probe.expectMsgType[UniqueAddress](1.second) should be(Cluster(sys4).selfUniqueAddress)
}
}
}
}
override def afterTermination(): Unit = {
shutdown(sys1)
shutdown(sys2)
shutdown(sys3)
if (sys4 != null)
shutdown(sys4)
}
}