2012-05-25 12:10:37 +02:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
|
|
|
|
*/
|
|
|
|
|
package akka.cluster
|
|
|
|
|
|
2012-07-26 14:47:21 +02:00
|
|
|
import language.implicitConversions
|
|
|
|
|
|
2012-05-25 12:10:37 +02:00
|
|
|
import com.typesafe.config.Config
|
|
|
|
|
import com.typesafe.config.ConfigFactory
|
2012-06-15 14:37:51 +02:00
|
|
|
import akka.actor.{ Address, ExtendedActorSystem }
|
2012-05-25 12:10:37 +02:00
|
|
|
import akka.remote.testconductor.RoleName
|
|
|
|
|
import akka.remote.testkit.MultiNodeSpec
|
2012-05-28 11:06:02 +02:00
|
|
|
import akka.testkit._
|
2012-06-29 13:33:20 +02:00
|
|
|
import scala.concurrent.util.duration._
|
|
|
|
|
import scala.concurrent.util.Duration
|
2012-06-15 13:32:55 +02:00
|
|
|
import org.scalatest.Suite
|
2012-07-26 14:47:21 +02:00
|
|
|
import org.scalatest.exceptions.TestFailedException
|
2012-06-15 17:32:40 +02:00
|
|
|
import java.util.concurrent.ConcurrentHashMap
|
|
|
|
|
import akka.actor.ActorPath
|
|
|
|
|
import akka.actor.RootActorPath
|
2012-05-25 12:10:37 +02:00
|
|
|
|
|
|
|
|
/**
 * Companion holding the configuration shared by multi-node cluster specs.
 */
object MultiNodeClusterSpec {

  /**
   * Cluster and test-kit settings parsed from the inline config text below:
   * `auto-join` and `auto-down` are switched off, the periodic cluster
   * intervals are all below one second, and `single-expect-default` is 5 s.
   * The text is parsed again on every call.
   */
  def clusterConfig: Config = {
    val settings = """
    akka.cluster {
      auto-join = off
      auto-down = off
      gossip-interval = 200 ms
      heartbeat-interval = 400 ms
      leader-actions-interval = 200 ms
      unreachable-nodes-reaper-interval = 200 ms
      periodic-tasks-initial-delay = 300 ms
      publish-stats-interval = 0 s # always, when it happens
    }
    akka.test {
      single-expect-default = 5 s
    }
    """
    ConfigFactory.parseString(settings)
  }
}
|
|
|
|
|
|
2012-06-15 13:32:55 +02:00
|
|
|
trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: MultiNodeSpec ⇒
|
2012-05-25 12:10:37 +02:00
|
|
|
|
2012-06-08 09:23:36 +02:00
|
|
|
// The number of initial participants is the number of declared roles.
override def initialParticipants: Int = roles.size
|
|
|
|
|
|
2012-06-15 17:32:40 +02:00
|
|
|
// Addresses resolved so far, keyed by role; filled on demand by `address`.
private val cachedAddresses = new ConcurrentHashMap[RoleName, Address]

/**
 * Resolve the `Address` of a role. Because the method is implicit, a
 * `RoleName` can also be passed wherever an `Address` is expected.
 *
 * The first lookup of a role goes through `node(role).address`; the result
 * is cached and returned for every later lookup. As a consequence, stopping
 * a role (jvm) and restarting it with another address is not supported.
 */
implicit def address(role: RoleName): Address =
  Option(cachedAddresses.get(role)) getOrElse {
    val resolved = node(role).address
    cachedAddresses.put(role, resolved)
    resolved
  }
|
|
|
|
|
|
2012-06-15 13:32:55 +02:00
|
|
|
// The steps (test methods) of a cluster test build on each other: when one
// step has failed, the following steps can most likely not run either.
// This flag remembers the first failure so that later steps fail fast.
private var failed = false

override protected def withFixture(test: NoArgTest): Unit =
  try {
    if (!failed) super.withFixture(test)
    else {
      val previousFailure = new TestFailedException("Previous step failed", 0)
      // keep only the top stack frame to get a short stack trace
      previousFailure.setStackTrace(previousFailure.getStackTrace.take(1))
      throw previousFailure
    }
  } catch {
    case cause: Throwable ⇒
      // record the failure for the remaining steps, then propagate it unchanged
      failed = true
      throw cause
  }
|
|
|
|
|
|
2012-06-21 10:58:35 +02:00
|
|
|
/**
 * Hook that lets a test choose the seed nodes in code instead of in config,
 * since addresses are unknown before startup time. Empty unless overridden.
 */
protected def seedNodes: IndexedSeq[RoleName] = IndexedSeq.empty[RoleName]
|
|
|
|
|
|
2012-06-04 23:21:28 +02:00
|
|
|
/**
 * The cluster node instance; it has to be created lazily.
 *
 * Seed nodes supplied through this spec's `seedNodes` hook are translated
 * to addresses and used; when the hook returns nothing, the seed nodes of
 * the `Cluster` superclass apply.
 */
private lazy val clusterNode = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) {
  override def seedNodes: IndexedSeq[Address] = {
    val fromTest = MultiNodeClusterSpec.this.seedNodes
    if (fromTest.nonEmpty) fromTest map address
    else super.seedNodes
  }
}
|
2012-06-11 14:32:17 +02:00
|
|
|
|
|
|
|
|
/**
 * Accessor for the lazily created cluster node of this spec.
 */
def cluster: Cluster = clusterNode
|
2012-05-25 12:10:37 +02:00
|
|
|
|
2012-06-04 23:21:28 +02:00
|
|
|
/**
 * Initial startup of the cluster node. While the node has no members yet it
 * joins itself and then waits until its own address shows up among the
 * members.
 */
def startClusterNode(): Unit = {
  if (cluster.members.nonEmpty) {
    // NOTE(review): the result is discarded; kept as in the original since it
    // cannot be seen from here whether `self` has a needed side effect.
    cluster.self
  } else {
    cluster join myself
    awaitCond(cluster.members.exists(_.address == address(myself)))
  }
}
|
2012-06-04 23:21:28 +02:00
|
|
|
|
2012-06-05 15:53:30 +02:00
|
|
|
/**
 * Start a cluster made up of the given member nodes (roles). The first
 * role is started first and the other roles then join it. Does not wait
 * for the members to become `Up`.
 */
def startCluster(roles: RoleName*): Unit =
  awaitStartCluster(upConvergence = false, roles = roles.toSeq)
|
2012-06-05 14:13:44 +02:00
|
|
|
|
2012-06-05 15:53:30 +02:00
|
|
|
/**
 * Start a cluster made up of the given member nodes (roles) and wait
 * until all of them have joined and are `Up`. The first role is started
 * first and the other roles then join it.
 */
def awaitClusterUp(roles: RoleName*): Unit =
  awaitStartCluster(upConvergence = true, roles = roles.toSeq)
|
|
|
|
|
|
|
|
|
|
/**
 * Shared implementation of `startCluster` and `awaitClusterUp`.
 *
 * The first role starts its cluster node, followed by the
 * "<first role>-started" barrier. The remaining roles then join the first
 * one. When `upConvergence` is set, every listed role waits in
 * `awaitUpConvergence` for `roles.length` members. The method ends with the
 * "<role names joined by '-'>-joined" barrier.
 *
 * @param upConvergence whether the listed roles wait for `Up` convergence
 * @param roles the member nodes; must not be empty, the others join the first
 * @throws IllegalArgumentException if `roles` is empty
 */
private def awaitStartCluster(upConvergence: Boolean = true, roles: Seq[RoleName]): Unit = {
  // fail with a clear message instead of the NoSuchElementException of `roles.head`
  require(roles.nonEmpty, "at least one role is required to start a cluster")
  val first = roles.head

  runOn(first) {
    // make sure that the node-to-join is started before other join
    startClusterNode()
  }
  enterBarrier(first.name + "-started")
  if (roles.tail.contains(myself)) {
    cluster.join(first)
  }
  if (upConvergence && roles.contains(myself)) {
    awaitUpConvergence(numberOfMembers = roles.length)
  }
  enterBarrier(roles.map(_.name).mkString("-") + "-joined")
}
|
|
|
|
|
|
2012-05-25 12:10:37 +02:00
|
|
|
/**
 * Assert that `gotMembers` has exactly as many entries as there are
 * expected addresses, and that the member addresses, position by position,
 * equal the expected addresses sorted in the order used by the cluster
 * (`Member.addressOrdering`).
 */
def assertMembers(gotMembers: Iterable[Member], expectedAddresses: Address*): Unit = {
  import Member.addressOrdering
  val actual = gotMembers.toIndexedSeq
  actual.size must be(expectedAddresses.length)
  // sizes are equal at this point, so zipping pairs every member with its expected address
  actual.zip(expectedAddresses.sorted).foreach {
    case (member, expected) ⇒ member.address must be(expected)
  }
}
|
|
|
|
|
|
2012-06-01 11:37:44 +02:00
|
|
|
/**
 * Varargs variant of `assertLeaderIn`; does nothing on a node that is not
 * among `nodesInCluster`.
 */
def assertLeader(nodesInCluster: RoleName*): Unit =
  if (nodesInCluster contains myself) assertLeaderIn(nodesInCluster)
|
|
|
|
|
|
2012-05-25 12:10:37 +02:00
|
|
|
/**
 * Assert that the cluster has elected the correct leader out of the given
 * nodes. The expected leader is the role returned by `roleOfLeader`, i.e.
 * the first member in the cluster ring. Also asserts that `cluster.status`
 * is `Up` or `Leaving`. Does nothing on a node that is not among
 * `nodesInCluster`.
 */
def assertLeaderIn(nodesInCluster: Seq[RoleName]): Unit =
  if (nodesInCluster.contains(myself)) {
    nodesInCluster.length must not be (0)
    val leaderRole = roleOfLeader(nodesInCluster)
    // true only on the node that is expected to be the leader
    val expectedToLead = ifNode(leaderRole)(true)(false)
    cluster.isLeader must be(expectedToLead)
    cluster.status must (be(MemberStatus.Up) or be(MemberStatus.Leaving))
  }
|
|
|
|
|
|
|
|
|
|
/**
 * Wait, within `timeout`, until the cluster has the expected number of
 * members, all members have status `Up` and convergence has been reached.
 * Afterwards also waits until none of the addresses in
 * `canNotBePartOfMemberRing` is a member of the cluster ring.
 */
def awaitUpConvergence(
  numberOfMembers: Int,
  canNotBePartOfMemberRing: Seq[Address] = Seq.empty[Address],
  timeout: Duration = 20.seconds): Unit = {
  within(timeout) {
    awaitCond(cluster.members.size == numberOfMembers)
    awaitCond(cluster.members.forall(_.status == MemberStatus.Up))
    awaitCond(cluster.convergence)
    if (canNotBePartOfMemberRing.nonEmpty) // skip the extra wait when nothing is excluded
      awaitCond(
        !canNotBePartOfMemberRing.exists(excluded ⇒ cluster.members.exists(_.address == excluded)))
  }
}
|
2012-05-28 14:15:44 +02:00
|
|
|
|
2012-06-01 15:15:53 +02:00
|
|
|
/**
 * Wait until every one of the given addresses is contained in
 * `cluster.seenBy`, i.e. the specified nodes have seen the same gossip
 * overview.
 */
def awaitSeenSameState(addresses: Address*): Unit = {
  val expected: Set[Address] = addresses.toSet
  // `cluster.seenBy` is read again on every poll of the condition
  awaitCond((expected -- cluster.seenBy).isEmpty)
}
|
2012-06-01 15:15:53 +02:00
|
|
|
|
2012-06-26 09:33:46 +02:00
|
|
|
/**
 * The role expected to be leader among `nodesInCluster` (all roles by
 * default): the first role according to the implicit `Ordering[RoleName]`
 * in scope. Fails the assertion when no nodes are given.
 */
def roleOfLeader(nodesInCluster: Seq[RoleName] = roles): RoleName = {
  nodesInCluster.length must not be (0)
  // the smallest element is the one that comes first after sorting
  nodesInCluster.min
}
|
|
|
|
|
|
|
|
|
|
/**
 * Orders roles the way the cluster orders them: two roles are compared by
 * comparing their addresses with `Member.addressOrdering`.
 */
implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] {
  def compare(x: RoleName, y: RoleName): Int =
    Member.addressOrdering.compare(address(x), address(y))
}
|
|
|
|
|
|
2012-06-18 11:16:30 +02:00
|
|
|
/**
 * Reverse lookup of `address`: the first role whose address equals `addr`,
 * or `None` when no role has that address.
 */
def roleName(addr: Address): Option[RoleName] = roles find { role ⇒ address(role) == addr }
|
2012-06-15 14:37:51 +02:00
|
|
|
|
2012-05-28 11:06:02 +02:00
|
|
|
}
|