/**
 * Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
 */

package akka.cluster

// TODO remove metrics

import language.implicitConversions
import org.scalatest.{ Suite, Outcome, Canceled }
import org.scalatest.exceptions.TestCanceledException
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.remote.testconductor.RoleName
import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeSpec }
import akka.testkit._
import akka.testkit.TestEvent._
import akka.actor.{ ActorSystem, Address }
import akka.event.Logging.ErrorLevel
import scala.concurrent.duration._
import scala.collection.immutable
import java.util.concurrent.ConcurrentHashMap
import akka.remote.DefaultFailureDetectorRegistry
import akka.actor.ActorRef
import akka.actor.Actor
import akka.actor.RootActorPath

object MultiNodeClusterSpec {

  def clusterConfigWithFailureDetectorPuppet: Config =
    ConfigFactory.parseString("akka.cluster.failure-detector.implementation-class = akka.cluster.FailureDetectorPuppet").
      withFallback(clusterConfig)

  def clusterConfig(failureDetectorPuppet: Boolean): Config =
    if (failureDetectorPuppet) clusterConfigWithFailureDetectorPuppet else clusterConfig

  def clusterConfig: Config = ConfigFactory.parseString("""
    akka.actor.provider = cluster
    akka.cluster {
      jmx.enabled                         = off
      gossip-interval                     = 200 ms
      leader-actions-interval             = 200 ms
      unreachable-nodes-reaper-interval   = 500 ms
      periodic-tasks-initial-delay        = 300 ms
      publish-stats-interval              = 0 s # always, when it happens
      failure-detector.heartbeat-interval = 500 ms
    }
    akka.loglevel = INFO
    akka.log-dead-letters = off
    akka.log-dead-letters-during-shutdown = off
    akka.remote.log-remote-lifecycle-events = off
    akka.loggers = ["akka.testkit.TestEventListener"]
    akka.test {
      single-expect-default = 5 s
    }
    """)

  // sometimes we need to coordinate test shutdown with messages instead of barriers
  object EndActor {
    case object SendEnd
    case object End
    case object EndAck
  }

  class EndActor(testActor: ActorRef, target: Option[Address]) extends Actor {
    import EndActor._
    def receive = {
      case SendEnd ⇒
        target foreach { t ⇒
          context.actorSelection(RootActorPath(t) / self.path.elements) ! End
        }
      case End ⇒
        testActor forward End
        sender() ! EndAck
      case EndAck ⇒
        testActor forward EndAck
    }
  }
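
  // Illustrative sketch (not part of the original file) of the message-based shutdown
  // handshake: the initiating node tells its local EndActor to SendEnd, the EndActor on
  // the target node forwards End to its testActor and replies with EndAck, which the
  // local EndActor forwards back to the initiating testActor. Names such as `endProbe`
  // and `firstAddress` are hypothetical.
  //
  //   val endProbe = TestProbe()
  //   val endActor = system.actorOf(
  //     Props(classOf[EndActor], endProbe.ref, Some(firstAddress)), "end")
  //   endActor ! EndActor.SendEnd
  //   endProbe.expectMsg(EndActor.EndAck)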
}

trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoroner { self: MultiNodeSpec ⇒

  override def initialParticipants = roles.size

  private val cachedAddresses = new ConcurrentHashMap[RoleName, Address]

  override def atStartup(): Unit = {
    startCoroner()
    muteLog()
  }

  override def afterTermination(): Unit = {
    stopCoroner()
  }

  override def expectedTestDuration = 60.seconds

  def muteLog(sys: ActorSystem = system): Unit = {
    if (!sys.log.isDebugEnabled) {
      Seq(
        ".*Metrics collection has started successfully.*",
        ".*Metrics will be retreived from MBeans.*",
        ".*Cluster Node.* - registered cluster JMX MBean.*",
        ".*Cluster Node.* - is starting up.*",
        ".*Shutting down cluster Node.*",
        ".*Cluster node successfully shut down.*",
        ".*Using a dedicated scheduler for cluster.*") foreach { s ⇒
          sys.eventStream.publish(Mute(EventFilter.info(pattern = s)))
        }

      muteDeadLetters(
        classOf[ClusterHeartbeatSender.Heartbeat],
        classOf[ClusterHeartbeatSender.HeartbeatRsp],
        classOf[GossipEnvelope],
        classOf[GossipStatus],
        classOf[MetricsGossipEnvelope],
        classOf[ClusterEvent.ClusterMetricsChanged],
        classOf[InternalClusterAction.Tick],
        classOf[akka.actor.PoisonPill],
        classOf[akka.dispatch.sysmsg.DeathWatchNotification],
        classOf[akka.remote.transport.AssociationHandle.Disassociated],
        // akka.remote.transport.AssociationHandle.Disassociated.getClass,
        classOf[akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying],
        // akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying.getClass,
        classOf[akka.remote.transport.AssociationHandle.InboundPayload])(sys)
    }
  }

  def muteMarkingAsUnreachable(sys: ActorSystem = system): Unit =
    if (!sys.log.isDebugEnabled)
      sys.eventStream.publish(Mute(EventFilter.error(pattern = ".*Marking.* as UNREACHABLE.*")))

  def muteMarkingAsReachable(sys: ActorSystem = system): Unit =
    if (!sys.log.isDebugEnabled)
      sys.eventStream.publish(Mute(EventFilter.info(pattern = ".*Marking.* as REACHABLE.*")))

  override def afterAll(): Unit = {
    if (!log.isDebugEnabled) {
      muteDeadLetters()()
      system.eventStream.setLogLevel(ErrorLevel)
    }
    super.afterAll()
  }

  /**
   * Lookup the Address for the role.
   *
   * Implicit conversion from RoleName to Address.
   *
   * It is cached, which has the implication that stopping
   * and then restarting a role (jvm) with another address is not
   * supported.
   */
  implicit def address(role: RoleName): Address = {
    cachedAddresses.get(role) match {
      case null ⇒
        val address = node(role).address
        cachedAddresses.put(role, address)
        address
      case address ⇒ address
    }
  }
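
  // Illustrative note (not part of the original file): because of the implicit
  // conversion above, a RoleName can be used wherever an Address is expected,
  // e.g. `cluster.join(first)` or `assertMembers(clusterView.members, first, second)`,
  // where `first` and `second` are hypothetical role names from the test config.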

  // Cluster tests are written so that if previous step (test method) failed
  // it will most likely not be possible to run next step. This ensures
  // fail fast of steps after the first failure.
  private var failed = false
  override protected def withFixture(test: NoArgTest): Outcome =
    if (failed) {
      Canceled(new TestCanceledException("Previous step failed", 0))
    } else {
      val out = super.withFixture(test)
      if (!out.isSucceeded)
        failed = true
      out
    }

  def clusterView: ClusterReadView = cluster.readView

  /**
   * Get the cluster node to use.
   */
  def cluster: Cluster = Cluster(system)

  /**
   * Use this method for the initial startup of the cluster node.
   */
  def startClusterNode(): Unit = {
    if (clusterView.members.isEmpty) {
      cluster join myself
      awaitAssert(clusterView.members.map(_.address) should contain(address(myself)))
    } else
      clusterView.self
  }

  /**
   * Initialize the cluster of the specified member
   * nodes (roles) and wait until all joined and `Up`.
   * First node will be started first and others will join
   * the first.
   */
  def awaitClusterUp(roles: RoleName*): Unit = {
    runOn(roles.head) {
      // make sure that the node-to-join is started before other join
      startClusterNode()
    }
    enterBarrier(roles.head.name + "-started")
    if (roles.tail.contains(myself)) {
      cluster.join(roles.head)
    }
    if (roles.contains(myself)) {
      awaitMembersUp(numberOfMembers = roles.length)
    }
    enterBarrier(roles.map(_.name).mkString("-") + "-joined")
  }
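
  // Illustrative sketch (not part of the original file): a typical first test step
  // forms the cluster on all nodes before exercising cluster behaviour. The role
  // names `first`, `second`, `third` are hypothetical and would come from the
  // spec's MultiNodeConfig.
  //
  //   "A cluster of three nodes" must {
  //     "form the cluster" in {
  //       awaitClusterUp(first, second, third)
  //       enterBarrier("cluster-up")
  //     }
  //   }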

  /**
   * Join the specific node within the given period by sending repeated join
   * requests at periodic intervals until we succeed.
   */
  def joinWithin(joinNode: RoleName, max: Duration = remainingOrDefault, interval: Duration = 1.second): Unit = {
    def memberInState(member: Address, status: Seq[MemberStatus]): Boolean =
      clusterView.members.exists { m ⇒ (m.address == member) && status.contains(m.status) }

    cluster join joinNode
    awaitCond({
      clusterView.refreshCurrentState()
      if (memberInState(joinNode, List(MemberStatus.up)) &&
        memberInState(myself, List(MemberStatus.Joining, MemberStatus.Up)))
        true
      else {
        cluster join joinNode
        false
      }
    }, max, interval)
  }

  /**
   * Assert that the member addresses match the expected addresses in the
   * sort order used by the cluster.
   */
  def assertMembers(gotMembers: Iterable[Member], expectedAddresses: Address*): Unit = {
    import Member.addressOrdering
    val members = gotMembers.toIndexedSeq
    members.size should ===(expectedAddresses.length)
    expectedAddresses.sorted.zipWithIndex.foreach { case (a, i) ⇒ members(i).address should ===(a) }
  }

  /**
   * Note that this can only be used for a cluster with all members
   * in Up status, i.e. use `awaitMembersUp` before using this method.
   * The reason for that is that the cluster leader is preferably a
   * member with status Up or Leaving and that information can't
   * be determined from the `RoleName`.
   */
  def assertLeader(nodesInCluster: RoleName*): Unit =
    if (nodesInCluster.contains(myself)) assertLeaderIn(nodesInCluster.to[immutable.Seq])

  /**
   * Assert that the cluster has elected the correct leader
   * out of all nodes in the cluster. First
   * member in the cluster ring is expected leader.
   *
   * Note that this can only be used for a cluster with all members
   * in Up status, i.e. use `awaitMembersUp` before using this method.
   * The reason for that is that the cluster leader is preferably a
   * member with status Up or Leaving and that information can't
   * be determined from the `RoleName`.
   */
  def assertLeaderIn(nodesInCluster: immutable.Seq[RoleName]): Unit =
    if (nodesInCluster.contains(myself)) {
      nodesInCluster.length should not be (0)
      val expectedLeader = roleOfLeader(nodesInCluster)
      val leader = clusterView.leader
      val isLeader = leader == Some(clusterView.selfAddress)
      assert(
        isLeader == isNode(expectedLeader),
        "expectedLeader [%s], got leader [%s], members [%s]".format(expectedLeader, leader, clusterView.members))
      clusterView.status should (be(MemberStatus.Up) or be(MemberStatus.Leaving))
    }

  /**
   * Wait until the expected number of members with status Up has been reached.
   * Also asserts that nodes in the 'canNotBePartOfMemberRing' are *not* part of the cluster ring.
   */
  def awaitMembersUp(
    numberOfMembers:          Int,
    canNotBePartOfMemberRing: Set[Address]   = Set.empty,
    timeout:                  FiniteDuration = 25.seconds): Unit = {
    within(timeout) {
      if (!canNotBePartOfMemberRing.isEmpty) // don't run this on an empty set
        awaitAssert(canNotBePartOfMemberRing foreach (a ⇒ clusterView.members.map(_.address) should not contain (a)))
      awaitAssert(clusterView.members.size should ===(numberOfMembers))
      awaitAssert(clusterView.members.map(_.status) should ===(Set(MemberStatus.Up)))
      // clusterView.leader is updated by LeaderChanged, await that to be updated also
      val expectedLeader = clusterView.members.headOption.map(_.address)
      awaitAssert(clusterView.leader should ===(expectedLeader))
    }
  }
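
  // Illustrative sketch (not part of the original file): after downing or shutting
  // down one node, the surviving nodes typically wait for the ring to converge
  // without it. `first`, `second` and `third` are hypothetical role names.
  //
  //   runOn(first, third) {
  //     awaitMembersUp(
  //       numberOfMembers = 2,
  //       canNotBePartOfMemberRing = Set(address(second)))
  //   }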

  def awaitAllReachable(): Unit =
    awaitAssert(clusterView.unreachableMembers should ===(Set.empty))

  /**
   * Wait until the specified nodes have seen the same gossip overview.
   */
  def awaitSeenSameState(addresses: Address*): Unit =
    awaitAssert((addresses.toSet diff clusterView.seenBy) should ===(Set.empty))

  /**
   * Leader according to the address ordering of the roles.
   * Note that this can only be used for a cluster with all members
   * in Up status, i.e. use `awaitMembersUp` before using this method.
   * The reason for that is that the cluster leader is preferably a
   * member with status Up or Leaving and that information can't
   * be determined from the `RoleName`.
   */
  def roleOfLeader(nodesInCluster: immutable.Seq[RoleName] = roles): RoleName = {
    nodesInCluster.length should not be (0)
    nodesInCluster.sorted.head
  }

  /**
   * Sort the roles in the address order used by the cluster node ring.
   */
  implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] {
    import Member.addressOrdering
    def compare(x: RoleName, y: RoleName) = addressOrdering.compare(address(x), address(y))
  }

  def roleName(addr: Address): Option[RoleName] = roles.find(address(_) == addr)

  /**
   * Marks a node as available in the failure detector if
   * [[akka.cluster.FailureDetectorPuppet]] is used as
   * failure detector.
   */
  def markNodeAsAvailable(address: Address): Unit =
    failureDetectorPuppet(address) foreach (_.markNodeAsAvailable())

  /**
   * Marks a node as unavailable in the failure detector if
   * [[akka.cluster.FailureDetectorPuppet]] is used as
   * failure detector.
   */
  def markNodeAsUnavailable(address: Address): Unit = {
    if (isFailureDetectorPuppet) {
      // before marking it as unavailable there should be at least one heartbeat
      // to create the FailureDetectorPuppet in the FailureDetectorRegistry
      cluster.failureDetector.heartbeat(address)
      failureDetectorPuppet(address) foreach (_.markNodeAsUnavailable())
    }
  }
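
  // Illustrative sketch (not part of the original file): with
  // clusterConfigWithFailureDetectorPuppet a test can simulate a node becoming
  // unreachable and then reachable again without actually killing its JVM.
  // `third` is a hypothetical role name.
  //
  //   markNodeAsUnavailable(third)
  //   awaitAssert(clusterView.unreachableMembers.map(_.address) should contain(address(third)))
  //   markNodeAsAvailable(third)
  //   awaitAllReachable()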

  private def isFailureDetectorPuppet: Boolean =
    cluster.settings.FailureDetectorImplementationClass == classOf[FailureDetectorPuppet].getName

  private def failureDetectorPuppet(address: Address): Option[FailureDetectorPuppet] =
    cluster.failureDetector match {
      case reg: DefaultFailureDetectorRegistry[Address] ⇒
        reg.failureDetector(address) collect { case p: FailureDetectorPuppet ⇒ p }
      case _ ⇒ None
    }

}