diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
index 64854f243b..5e03085e3a 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
@@ -239,6 +239,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
 
   var seedNodeProcess: Option[ActorRef] = None
 
+  var tryingToJoinWith: Option[Address] = None
+
   /**
    * Looks up and returns the remote cluster command connection for the specific address.
    */
@@ -341,7 +343,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
       log.warning("Trying to join member with wrong ActorSystem name, but was ignored, expected [{}] but was [{}]",
         selfAddress.system, address.system)
     else if (!latestGossip.members.exists(_.address == address)) {
-
       // to support manual join when joining to seed nodes is stuck (no seed nodes available)
       val snd = sender
       seedNodeProcess match {
@@ -355,15 +356,18 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
         case None ⇒ // no seedNodeProcess in progress
       }
 
-      // wipe our state since a node that joins a cluster must be empty
-      latestGossip = Gossip.empty
-      // wipe the failure detector since we are starting fresh and shouldn't care about the past
-      failureDetector.reset()
-      // wipe the publisher since we are starting fresh
-      publisher ! PublishStart
-
-      publish(latestGossip)
+      // only wipe the state if we're not in the process of joining this address
+      if (tryingToJoinWith.forall(_ != address)) {
+        tryingToJoinWith = Some(address)
+        // wipe our state since a node that joins a cluster must be empty
+        latestGossip = Gossip.empty
+        // wipe the failure detector since we are starting fresh and shouldn't care about the past
+        failureDetector.reset()
+        // wipe the publisher since we are starting fresh
+        publisher ! PublishStart
+        publish(latestGossip)
+      }
 
       context.become(initialized)
       if (address == selfAddress) joining(address, cluster.selfRoles)
@@ -517,6 +521,10 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
       } else if (localGossip.overview.isNonDownUnreachable(from)) {
         log.debug("Ignoring received gossip from unreachable [{}] ", from)
       } else {
+        // if we're in the remote gossip and not Removed, then we're not joining
+        if (tryingToJoinWith.nonEmpty && remoteGossip.member(selfAddress).status != Removed)
+          tryingToJoinWith = None
+
         val comparison = remoteGossip.version tryCompareTo localGossip.version
         val conflict = comparison.isEmpty
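Note on the hunk above: the new tryingToJoinWith field makes a repeated Join for the same address idempotent, so a retry loop (such as joinWithin below) does not wipe gossip, failure-detector and publisher state on every attempt, and the flag is cleared once we see ourselves in remote gossip with a status other than Removed. A minimal, self-contained model of that guard, where Address, the wipe counter and the method names are illustrative stand-ins rather than the real ClusterCoreDaemon API:

    // Sketch: state is wiped on the first join attempt to an address, retries
    // to the same address leave the in-flight state alone, and completion
    // (seen via gossip) re-arms the wipe for future joins.
    object JoinGuardSketch extends App {
      final case class Address(host: String, port: Int)

      var tryingToJoinWith: Option[Address] = None
      var wipes = 0 // stands in for resetting gossip, failure detector, publisher

      def join(address: Address): Unit =
        if (tryingToJoinWith.forall(_ != address)) { // first attempt for this address
          tryingToJoinWith = Some(address)
          wipes += 1
        }

      def joinCompleted(): Unit =
        tryingToJoinWith = None // we appeared in remote gossip as non-Removed

      val seed = Address("127.0.0.1", 2551)
      join(seed); join(seed); join(seed) // retries while the join is in flight
      assert(wipes == 1)
      joinCompleted()
      join(seed) // a genuinely new join attempt wipes again
      assert(wipes == 2)
    }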
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
index f5d07b8724..52135b47d9 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
@@ -182,6 +182,27 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
     enterBarrier(roles.map(_.name).mkString("-") + "-joined")
   }
 
+  /**
+   * Join the specific node within the given period by sending repeated join
+   * requests at periodic intervals until we succeed.
+   */
+  def joinWithin(joinNode: RoleName, max: Duration = remaining, interval: Duration = 1.second): Unit = {
+    def memberInState(member: Address, status: Seq[MemberStatus]): Boolean =
+      clusterView.members.exists { m ⇒ (m.address == member) && status.contains(m.status) }
+
+    cluster join joinNode
+    awaitCond({
+      clusterView.refreshCurrentState()
+      if (memberInState(joinNode, List(MemberStatus.Up)) &&
+        memberInState(myself, List(MemberStatus.Joining, MemberStatus.Up)))
+        true
+      else {
+        cluster join joinNode
+        false
+      }
+    }, max, interval)
+  }
+
   /**
    * Assert that the member addresses match the expected addresses in the
    * sort order used by the cluster.
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala
index 9f076c1d78..fe1061e8a8 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala
@@ -25,10 +25,14 @@ case class UnreachableNodeRejoinsClusterMultiNodeConfig(failureDetectorPuppet: B
   commonConfig(ConfigFactory.parseString(
     """
+      # this setting is here to limit the number of retries and failures while the
+      # node is being blackholed
+      akka.remote.failure-detector.retry-gate-closed-for = 500 ms
+
       akka.remote.log-remote-lifecycle-events = off
       akka.cluster.publish-stats-interval = 0s
       akka.loglevel = INFO
-    """).withFallback(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)))
+    """).withFallback(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))))
 
   testTransport(on = true)
 }
@@ -74,8 +78,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod
       endBarrier
     }
 
-    // FIXME ignored due to ticket #2930 - timeout changing throttler mode
-    "mark a node as UNREACHABLE when we pull the network" taggedAs LongRunningTest ignore {
+    "mark a node as UNREACHABLE when we pull the network" taggedAs LongRunningTest in {
       // let them send at least one heartbeat to each other after the gossip convergence
       // because for new joining nodes we remove them from the failure detector when
      // receive gossip
@@ -125,8 +128,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod
       endBarrier
     }
 
-    // FIXME ignored due to ticket #2930 - timeout changing throttler mode
-    "mark the node as DOWN" taggedAs LongRunningTest ignore {
+    "mark the node as DOWN" taggedAs LongRunningTest in {
       runOn(master) {
         cluster down victim
       }
@@ -135,13 +137,12 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod
        awaitMembersUp(roles.size - 1, Set(victim)) // eventually removed
        awaitCond(clusterView.unreachableMembers.isEmpty, 15 seconds)
-      }
+    }
 
      endBarrier
    }
 
-    // FIXME ignored due to ticket #2930 - timeout changing throttler mode
-    "allow node to REJOIN when the network is plugged back in" taggedAs LongRunningTest ignore {
+    "allow node to REJOIN when the network is plugged back in" taggedAs LongRunningTest in {
      runOn(first) {
        // put the network back in
        allBut(victim).foreach { roleName ⇒
@@ -152,7 +153,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod
      enterBarrier("plug_in_victim")
 
      runOn(victim) {
-        cluster join master
+        joinWithin(master, 10.seconds)
      }
 
      awaitMembersUp(roles.size)
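The joinWithin helper added above boils down to a generic "re-send until the condition holds" loop, which is what lets the victim recover even if individual Join messages are dropped while the connection is still gated. A self-contained sketch of that pattern, where retryUntil and its parameters are illustrative names, not part of the Akka test kit:

    import scala.concurrent.duration._

    // Re-issue `attempt` on every poll until `condition` holds, failing after `max`.
    def retryUntil(max: FiniteDuration, interval: FiniteDuration)(
      attempt: () ⇒ Unit)(condition: () ⇒ Boolean): Unit = {
      val deadline = max.fromNow
      attempt()
      while (!condition()) {
        if (deadline.isOverdue()) throw new AssertionError(s"condition not met within $max")
        Thread.sleep(interval.toMillis)
        attempt() // re-send, in case the previous attempt was lost
      }
    }

    // Usage, mirroring joinWithin(master, 10.seconds) from the spec above:
    //   retryUntil(10.seconds, 1.second)(() ⇒ cluster join master)(() ⇒ bothSidesUp)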
diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala
index 2726d71191..55dfc152da 100644
--- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala
+++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala
@@ -147,10 +147,10 @@ object ThrottlerTransportAdapter {
    * Management Command to force disassociation of an address.
    */
   @SerialVersionUID(1L)
-  case class ForceDissociate(address: Address)
+  case class ForceDisassociate(address: Address)
 
   @SerialVersionUID(1L)
-  case object ForceDissociateAck {
+  case object ForceDisassociateAck {
     /**
      * Java API: get the singleton instance
      */
@@ -174,8 +174,8 @@ class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedA
     cmd match {
       case s: SetThrottle ⇒
         manager ? s map { case SetThrottleAck ⇒ true }
-      case f: ForceDissociate ⇒
-        manager ? f map { case ForceDissociateAck ⇒ true }
+      case f: ForceDisassociate ⇒
+        manager ? f map { case ForceDisassociateAck ⇒ true }
       case _ ⇒ wrappedTransport.managementCommand(cmd)
     }
   }
@@ -231,13 +231,13 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
         case _ ⇒ ok
       }
       Future.sequence(allAcks).map(_ ⇒ SetThrottleAck) pipeTo sender
-    case ForceDissociate(address) ⇒
+    case ForceDisassociate(address) ⇒
       val naked = nakedAddress(address)
       handleTable.foreach {
         case (`naked`, handle) ⇒ handle.disassociate()
         case _ ⇒
       }
-      sender ! ForceDissociateAck
+      sender ! ForceDisassociateAck
 
     case Checkin(origin, handle) ⇒
       val naked: Address = nakedAddress(origin)
diff --git a/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala
index 9f31b5a271..e2f935f109 100644
--- a/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/transport/ThrottlerTransportAdapterSpec.scala
@@ -74,10 +74,10 @@ class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSende
     Await.result(transport.managementCommand(SetThrottle(rootBAddress, direction, mode)), 3.seconds)
   }
 
-  def dissociate(): Boolean = {
+  def disassociate(): Boolean = {
     val rootBAddress = Address("akka", "systemB", "localhost", rootB.address.port.get)
     val transport = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport
-    Await.result(transport.managementCommand(ForceDissociate(rootBAddress)), 3.seconds)
+    Await.result(transport.managementCommand(ForceDisassociate(rootBAddress)), 3.seconds)
   }
 
   "ThrottlerTransportAdapter" must {
@@ -99,7 +99,7 @@ class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSende
       here ! "Blackhole 2"
       expectNoMsg(1.seconds)
 
-      dissociate() must be(true)
+      disassociate() must be(true)
       expectNoMsg(1.seconds)
 
       throttle(Direction.Both, Unthrottled) must be(true)
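With the spelling fixed, client code sends the management command exactly as the spec's disassociate() helper does. A sketch of such a helper, assuming an ExtendedActorSystem backed by the RemoteActorRefProvider; the method name forceDisassociate and the 3-second timeout are local choices mirroring the test above:

    import scala.concurrent.Await
    import scala.concurrent.duration._
    import akka.actor.{ Address, ExtendedActorSystem }
    import akka.remote.RemoteActorRefProvider
    import akka.remote.transport.ThrottlerTransportAdapter.ForceDisassociate

    // Ask the throttler layer to drop the association with `peer`; the manager
    // answers ForceDisassociateAck, which the adapter maps to true.
    def forceDisassociate(system: ExtendedActorSystem, peer: Address): Boolean = {
      val transport = system.provider.asInstanceOf[RemoteActorRefProvider].transport
      Await.result(transport.managementCommand(ForceDisassociate(peer)), 3.seconds)
    }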