Make joining to the same node multiple times work, and reenable blackhole test. See #2930

This commit is contained in:
Björn Antonsson 2013-03-20 10:32:18 +01:00
parent f18575e251
commit 5827a27b94
5 changed files with 57 additions and 27 deletions

View file

@@ -239,6 +239,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
var seedNodeProcess: Option[ActorRef] = None
var tryingToJoinWith: Option[Address] = None
/**
* Looks up and returns the remote cluster command connection for the specific address.
*/
@@ -341,7 +343,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
log.warning("Trying to join member with wrong ActorSystem name, but was ignored, expected [{}] but was [{}]",
selfAddress.system, address.system)
else if (!latestGossip.members.exists(_.address == address)) {
// to support manual join when joining to seed nodes is stuck (no seed nodes available)
val snd = sender
seedNodeProcess match {
@@ -355,15 +356,18 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
case None ⇒ // no seedNodeProcess in progress
}
// wipe our state since a node that joins a cluster must be empty
latestGossip = Gossip.empty
// wipe the failure detector since we are starting fresh and shouldn't care about the past
failureDetector.reset()
// wipe the publisher since we are starting fresh
publisher ! PublishStart
publish(latestGossip)
// only wipe the state if we're not in the process of joining this address
if (tryingToJoinWith.forall(_ != address)) {
tryingToJoinWith = Some(address)
// wipe our state since a node that joins a cluster must be empty
latestGossip = Gossip.empty
// wipe the failure detector since we are starting fresh and shouldn't care about the past
failureDetector.reset()
// wipe the publisher since we are starting fresh
publisher ! PublishStart
publish(latestGossip)
}
context.become(initialized)
if (address == selfAddress)
joining(address, cluster.selfRoles)
@@ -517,6 +521,10 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
} else if (localGossip.overview.isNonDownUnreachable(from)) {
log.debug("Ignoring received gossip from unreachable [{}] ", from)
} else {
// if we're in the remote gossip and not Removed, then we're not joining
if (tryingToJoinWith.nonEmpty && remoteGossip.member(selfAddress).status != Removed)
tryingToJoinWith = None
val comparison = remoteGossip.version tryCompareTo localGossip.version
val conflict = comparison.isEmpty
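
The net effect of the new tryingToJoinWith field is that local state is wiped only on the first Join attempt towards a given address, and the flag is cleared again once this node shows up in received gossip, so repeated Join commands to the same node no longer reset gossip, the failure detector, or the publisher. A minimal, self-contained sketch of that guard pattern (the JoinGuard class and the wipeState callback are hypothetical illustrations, not part of this commit):

import akka.actor.Address

// Hypothetical guard mirroring the logic above: wipe state only on the first
// attempt towards an address, and clear the flag once gossip confirms membership.
class JoinGuard {
  private var tryingToJoinWith: Option[Address] = None

  def onJoin(address: Address)(wipeState: () ⇒ Unit): Unit =
    if (tryingToJoinWith.forall(_ != address)) {
      tryingToJoinWith = Some(address)
      wipeState() // first attempt towards this address: start from a clean slate
    } // otherwise: a retry towards the same address, keep the accumulated state

  def onGossipReceived(selfIsMember: Boolean): Unit =
    if (selfIsMember) tryingToJoinWith = None // join completed, allow future re-joins
}

With such a guard a caller can re-send Join to the same address at an interval without losing what has already been gossiped.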

View file

@@ -182,6 +182,27 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
enterBarrier(roles.map(_.name).mkString("-") + "-joined")
}
/**
* Join the specified node within the given deadline by re-sending join
* requests at the given interval until the join succeeds.
*/
def joinWithin(joinNode: RoleName, max: Duration = remaining, interval: Duration = 1.second): Unit = {
def memberInState(member: Address, status: Seq[MemberStatus]): Boolean =
clusterView.members.exists { m ⇒ (m.address == member) && status.contains(m.status) }
cluster join joinNode
awaitCond({
clusterView.refreshCurrentState()
if (memberInState(joinNode, List(MemberStatus.up)) &&
memberInState(myself, List(MemberStatus.Joining, MemberStatus.Up)))
true
else {
cluster join joinNode
false
}
}, max, interval)
}
/**
* Assert that the member addresses match the expected addresses in the
* sort order used by the cluster.
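
The joinWithin helper added above is intended to replace a bare `cluster join` in tests: it re-sends the join until the target node is Up and the local member is at least Joining, or until the deadline expires. A hedged usage sketch (the first/second role names are placeholders, and the usual scala.concurrent.duration._ import is assumed):

// hypothetical usage inside a MultiNodeClusterSpec-based test
runOn(second) {
  // re-send Join to `first` every second, for at most 10 seconds
  joinWithin(first, max = 10.seconds, interval = 1.second)
}
enterBarrier("second-joined")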

View file

@@ -25,10 +25,14 @@ case class UnreachableNodeRejoinsClusterMultiNodeConfig(failureDetectorPuppet: B
commonConfig(ConfigFactory.parseString(
"""
# this setting is here to limit the number of retries and failures while the
# node is being blackholed
akka.remote.failure-detector.retry-gate-closed-for = 500 ms
akka.remote.log-remote-lifecycle-events = off
akka.cluster.publish-stats-interval = 0s
akka.loglevel = INFO
""").withFallback(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)))
""").withFallback(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))))
testTransport(on = true)
}
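
The spec-specific config above is layered onto the debug and cluster fallbacks with withFallback, so its keys win where they overlap. A small standalone sketch of that layering, assuming only the Typesafe Config library on the classpath (the keys below echo the ones used above; the fallback content is illustrative):

import com.typesafe.config.{ Config, ConfigFactory }

object ConfigLayeringSketch extends App {
  // spec-specific overrides: highest priority
  val specific: Config = ConfigFactory.parseString("""
    akka.remote.failure-detector.retry-gate-closed-for = 500 ms
    akka.cluster.publish-stats-interval = 0s
    akka.loglevel = INFO
  """)
  // stand-in for the debugConfig/clusterConfig fallbacks used by the spec
  val fallback: Config = ConfigFactory.parseString("""
    akka.loglevel = DEBUG
    akka.cluster.publish-stats-interval = 10s
  """)
  val merged = specific.withFallback(fallback)
  println(merged.getString("akka.loglevel"))                       // INFO
  println(merged.getString("akka.cluster.publish-stats-interval")) // 0s
}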
@@ -74,8 +78,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod
endBarrier
}
// FIXME ignored due to ticket #2930 - timeout changing throttler mode
"mark a node as UNREACHABLE when we pull the network" taggedAs LongRunningTest ignore {
"mark a node as UNREACHABLE when we pull the network" taggedAs LongRunningTest in {
// let them send at least one heartbeat to each other after the gossip convergence
// because new joining nodes are removed from the failure detector when
// gossip is received
@@ -125,8 +128,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod
endBarrier
}
// FIXME ignored due to ticket #2930 - timeout changing throttler mode
"mark the node as DOWN" taggedAs LongRunningTest ignore {
"mark the node as DOWN" taggedAs LongRunningTest in {
runOn(master) {
cluster down victim
}
@@ -135,13 +137,12 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod
awaitMembersUp(roles.size - 1, Set(victim))
// eventually removed
awaitCond(clusterView.unreachableMembers.isEmpty, 15 seconds)
}
}
endBarrier
}
// FIXME ignored due to ticket #2930 - timeout changing throttler mode
"allow node to REJOIN when the network is plugged back in" taggedAs LongRunningTest ignore {
"allow node to REJOIN when the network is plugged back in" taggedAs LongRunningTest in {
runOn(first) {
// put the network back in
allBut(victim).foreach { roleName ⇒
@@ -152,7 +153,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod
enterBarrier("plug_in_victim")
runOn(victim) {
cluster join master
joinWithin(master, 10.seconds)
}
awaitMembersUp(roles.size)

View file

@@ -147,10 +147,10 @@ object ThrottlerTransportAdapter {
* Management Command to force disassociation of an address.
*/
@SerialVersionUID(1L)
case class ForceDissociate(address: Address)
case class ForceDisassociate(address: Address)
@SerialVersionUID(1L)
case object ForceDissociateAck {
case object ForceDisassociateAck {
/**
* Java API: get the singleton instance
*/
@@ -174,8 +174,8 @@ class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedA
cmd match {
case s: SetThrottle ⇒
manager ? s map { case SetThrottleAck ⇒ true }
case f: ForceDissociate ⇒
manager ? f map { case ForceDissociateAck ⇒ true }
case f: ForceDisassociate ⇒
manager ? f map { case ForceDisassociateAck ⇒ true }
case _ ⇒ wrappedTransport.managementCommand(cmd)
}
}
@@ -231,13 +231,13 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
case _ ⇒ ok
}
Future.sequence(allAcks).map(_ ⇒ SetThrottleAck) pipeTo sender
case ForceDissociate(address) ⇒
case ForceDisassociate(address) ⇒
val naked = nakedAddress(address)
handleTable.foreach {
case (`naked`, handle) ⇒ handle.disassociate()
case _ ⇒
}
sender ! ForceDissociateAck
sender ! ForceDisassociateAck
case Checkin(origin, handle) ⇒
val naked: Address = nakedAddress(origin)
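
managementCommand above converts the ack-based actor protocol into a Future[Boolean] by asking the manager and mapping on the expected ack type. A standalone sketch of that ask-and-map pattern, with hypothetical DoIt/DoItAck messages and an Acker actor (none of them Akka API), assuming an Akka 2.x-era classpath:

import akka.actor.{ Actor, ActorSystem, Props }
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

// hypothetical messages and actor illustrating the pattern; the Boolean result
// comes purely from pattern-matching the ack message
case object DoIt
case object DoItAck

class Acker extends Actor {
  def receive = { case DoIt ⇒ sender ! DoItAck }
}

object AckPatternSketch extends App {
  val system = ActorSystem("sketch")
  val acker = system.actorOf(Props[Acker], "acker")

  implicit val timeout = Timeout(3.seconds)
  import system.dispatcher // execution context for the map

  val ok: Future[Boolean] = acker ? DoIt map { case DoItAck ⇒ true }
  println(Await.result(ok, 3.seconds)) // prints: true

  system.shutdown() // Akka 2.x-era shutdown call
}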

View file

@@ -74,10 +74,10 @@ class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSende
Await.result(transport.managementCommand(SetThrottle(rootBAddress, direction, mode)), 3.seconds)
}
def dissociate(): Boolean = {
def disassociate(): Boolean = {
val rootBAddress = Address("akka", "systemB", "localhost", rootB.address.port.get)
val transport = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport
Await.result(transport.managementCommand(ForceDissociate(rootBAddress)), 3.seconds)
Await.result(transport.managementCommand(ForceDisassociate(rootBAddress)), 3.seconds)
}
"ThrottlerTransportAdapter" must {
@@ -99,7 +99,7 @@ class ThrottlerTransportAdapterSpec extends AkkaSpec(configA) with ImplicitSende
here ! "Blackhole 2"
expectNoMsg(1.seconds)
dissociate() must be(true)
disassociate() must be(true)
expectNoMsg(1.seconds)
throttle(Direction.Both, Unthrottled) must be(true)