diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala
index e7f58ab1e3..68b791b68f 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala
@@ -110,25 +110,21 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
   def active: Actor.Receive = {
     case HeartbeatTick ⇒ heartbeat()
     case HeartbeatRsp(from) ⇒ heartbeatRsp(from)
-    case MemberUp(m) ⇒ addMember(m)
-    case MemberWeaklyUp(m) ⇒ addMember(m)
     case MemberRemoved(m, _) ⇒ removeMember(m)
+    case evt: MemberEvent ⇒ addMember(evt.member)
     case UnreachableMember(m) ⇒ unreachableMember(m)
     case ReachableMember(m) ⇒ reachableMember(m)
-    case _: MemberEvent ⇒ // not interested in other types of MemberEvent
     case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from)
   }

   def init(snapshot: CurrentClusterState): Unit = {
-    val nodes: Set[UniqueAddress] = snapshot.members.collect {
-      case m if m.status == MemberStatus.Up || m.status == MemberStatus.WeaklyUp ⇒ m.uniqueAddress
-    }(collection.breakOut)
+    val nodes: Set[UniqueAddress] = snapshot.members.map(_.uniqueAddress)
     val unreachable: Set[UniqueAddress] = snapshot.unreachable.map(_.uniqueAddress)
     state = state.init(nodes, unreachable)
   }

   def addMember(m: Member): Unit =
-    if (m.uniqueAddress != selfUniqueAddress)
+    if (m.uniqueAddress != selfUniqueAddress && !state.contains(m.uniqueAddress))
       state = state.addMember(m.uniqueAddress)

   def removeMember(m: Member): Unit =
@@ -191,6 +187,8 @@ private[cluster] final case class ClusterHeartbeatSenderState(
   def init(nodes: Set[UniqueAddress], unreachable: Set[UniqueAddress]): ClusterHeartbeatSenderState =
     copy(ring = ring.copy(nodes = nodes + selfAddress, unreachable = unreachable))

+  def contains(node: UniqueAddress): Boolean = ring.nodes(node)
+
   def addMember(node: UniqueAddress): ClusterHeartbeatSenderState =
     membershipChange(ring :+ node)

diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode3Spec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode3Spec.scala
new file mode 100644
index 0000000000..f1733ffd8d
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode3Spec.scala
@@ -0,0 +1,144 @@
+/**
+ * Copyright (C) 2009-2015 Typesafe Inc.
+ */
+package akka.cluster
+
+import scala.collection.immutable
+import scala.concurrent.duration._
+
+import akka.actor.Actor
+import akka.actor.ActorSystem
+import akka.actor.Address
+import akka.actor.Deploy
+import akka.actor.Props
+import akka.actor.RootActorPath
+import akka.cluster.MemberStatus._
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.remote.transport.ThrottlerTransportAdapter.Direction
+import akka.testkit._
+import com.typesafe.config.ConfigFactory
+
+object RestartNode3MultiJvmSpec extends MultiNodeConfig {
+  val first = role("first")
+  val second = role("second")
+  val third = role("third")
+
+  commonConfig(debugConfig(on = false).
+    withFallback(ConfigFactory.parseString("akka.cluster.auto-down-unreachable-after = off")).
+    withFallback(MultiNodeClusterSpec.clusterConfig))
+
+  testTransport(on = true)
+}
+
+class RestartNode3MultiJvmNode1 extends RestartNode3Spec
+class RestartNode3MultiJvmNode2 extends RestartNode3Spec
+class RestartNode3MultiJvmNode3 extends RestartNode3Spec
+
+abstract class RestartNode3Spec
+  extends MultiNodeSpec(RestartNode3MultiJvmSpec)
+  with MultiNodeClusterSpec with ImplicitSender {
+
+  import RestartNode3MultiJvmSpec._
+
+  @volatile var secondUniqueAddress: UniqueAddress = _
+
+  // use a separate ActorSystem, to be able to simulate restart
+  lazy val secondSystem = ActorSystem(system.name, system.settings.config)
+
+  def seedNodes: immutable.IndexedSeq[Address] = Vector(first)
+
+  lazy val restartedSecondSystem = ActorSystem(system.name,
+    ConfigFactory.parseString("akka.remote.netty.tcp.port=" + secondUniqueAddress.address.port.get).
+      withFallback(system.settings.config))
+
+  override def afterAll(): Unit = {
+    runOn(second) {
+      if (secondSystem.whenTerminated.isCompleted)
+        shutdown(restartedSecondSystem)
+      else
+        shutdown(secondSystem)
+    }
+    super.afterAll()
+  }
+
+  override def expectedTestDuration = 2.minutes
+
+  "Cluster nodes" must {
+    "be able to restart and join again when Down before Up" taggedAs LongRunningTest in within(60.seconds) {
+      // secondSystem is a separate ActorSystem, to be able to simulate restart
+      // we must transfer its address to first
+      runOn(first, third) {
+        system.actorOf(Props(new Actor {
+          def receive = {
+            case a: UniqueAddress ⇒
+              secondUniqueAddress = a
+              sender() ! "ok"
+          }
+        }).withDeploy(Deploy.local), name = "address-receiver")
+        enterBarrier("second-address-receiver-ready")
+      }
+
+      runOn(second) {
+        enterBarrier("second-address-receiver-ready")
+        secondUniqueAddress = Cluster(secondSystem).selfUniqueAddress
+        List(first, third) foreach { r ⇒
+          system.actorSelection(RootActorPath(r) / "user" / "address-receiver") ! secondUniqueAddress
+          expectMsg(5.seconds, "ok")
+        }
+      }
+      enterBarrier("second-address-transfered")
+
+      // now we can join first, third together
+      runOn(first, third) {
+        cluster.joinSeedNodes(seedNodes)
+        awaitMembersUp(2)
+      }
+      enterBarrier("first-third-up")
+
+      // make third unreachable, so that leader can't perform its duties
+      runOn(first) {
+        testConductor.blackhole(first, third, Direction.Both).await
+        val thirdAddress = address(third)
+        awaitAssert(clusterView.unreachableMembers.map(_.address) should ===(Set(thirdAddress)))
+      }
+      enterBarrier("third-unreachable")
+
+      runOn(second) {
+        Cluster(secondSystem).joinSeedNodes(seedNodes)
+        awaitAssert(Cluster(secondSystem).readView.members.size should ===(3))
+        awaitAssert(Cluster(secondSystem).readView.members.collectFirst {
+          case m if m.address == Cluster(secondSystem).selfAddress ⇒ m.status
+        } should ===(Some(Joining)))
+      }
+      enterBarrier("second-joined")
+
+      // shutdown secondSystem
+      runOn(second) {
+        shutdown(secondSystem, remaining)
+      }
+      enterBarrier("second-shutdown")
+
+      // then immediately start restartedSecondSystem, which has the same address as secondSystem
+      runOn(first) {
+        testConductor.passThrough(first, third, Direction.Both).await
+      }
+      runOn(second) {
+        Cluster(restartedSecondSystem).joinSeedNodes(seedNodes)
+        awaitAssert(Cluster(restartedSecondSystem).readView.members.size should ===(3))
+        awaitAssert(Cluster(restartedSecondSystem).readView.members.map(_.status) should ===(Set(Up)))
+      }
+      runOn(first, third) {
+        awaitAssert {
+          Cluster(system).readView.members.size should ===(3)
+          Cluster(system).readView.members.exists { m ⇒
+            m.address == secondUniqueAddress.address && m.uniqueAddress.uid != secondUniqueAddress.uid
+          }
+        }
+      }
+      enterBarrier("second-restarted")
+
+    }
+
+  }
+}
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala
index 2e43e4ce19..48aea86078 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala
@@ -52,7 +52,7 @@ abstract class RestartNodeSpec
   override def afterAll(): Unit = {
     runOn(second) {
-      if (secondSystem.isTerminated)
+      if (secondSystem.whenTerminated.isCompleted)
        shutdown(restartedSecondSystem)
      else
        shutdown(secondSystem)
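
For context, the receive-block change routes every MemberEvent through addMember (Joining, WeaklyUp and Up alike), so the new state.contains guard is what keeps repeated events for the same node from triggering another ring rebuild. Below is a minimal, self-contained sketch of that idempotent-add pattern; the names (ToyHeartbeatState, node-a) are hypothetical and the real ClusterHeartbeatSenderState additionally maintains the heartbeat ring, the unreachable set and failure-detector bookkeeping.

// Minimal sketch of the idempotent-add guard, assuming a toy state class
// rather than the real ClusterHeartbeatSenderState.
final case class ToyHeartbeatState(nodes: Set[String] = Set.empty, ringRebuilds: Int = 0) {
  def contains(node: String): Boolean = nodes(node)
  // adding a member is treated as expensive (ring rebuild), so callers
  // check contains first, mirroring the guard added to addMember above
  def addMember(node: String): ToyHeartbeatState =
    copy(nodes = nodes + node, ringRebuilds = ringRebuilds + 1)
}

object ToyHeartbeatStateDemo extends App {
  var state = ToyHeartbeatState()
  // simulate the actor seeing several MemberEvents for the same node
  for (_ <- 1 to 3)
    if (!state.contains("node-a")) state = state.addMember("node-a")
  println(state) // ToyHeartbeatState(Set(node-a),1): only one ring rebuild
}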