2012-06-05 22:16:15 +02:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
package akka.cluster
|
|
|
|
|
|
|
|
|
|
import akka.testkit.AkkaSpec
|
2012-07-04 14:39:27 +02:00
|
|
|
import akka.testkit.ImplicitSender
|
2012-06-05 22:16:15 +02:00
|
|
|
import akka.util.duration._
|
|
|
|
|
import akka.util.Duration
|
|
|
|
|
import akka.actor.ExtendedActorSystem
|
|
|
|
|
import akka.actor.Address
|
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger
|
2012-06-21 10:58:35 +02:00
|
|
|
import akka.remote.RemoteActorRefProvider
|
2012-07-04 14:39:27 +02:00
|
|
|
import InternalClusterAction._
|
2012-06-05 22:16:15 +02:00
|
|
|
|
|
|
|
|
object ClusterSpec {
|
|
|
|
|
val config = """
|
|
|
|
|
akka.cluster {
|
2012-06-21 10:58:35 +02:00
|
|
|
auto-join = off
|
2012-06-05 22:16:15 +02:00
|
|
|
auto-down = off
|
|
|
|
|
periodic-tasks-initial-delay = 120 seconds // turn off scheduled tasks
|
2012-07-04 14:39:27 +02:00
|
|
|
publish-state-interval = 0 s # always, when it happens
|
2012-06-05 22:16:15 +02:00
|
|
|
}
|
|
|
|
|
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|
|
|
|
|
akka.remote.netty.port = 0
|
2012-06-07 13:32:12 +02:00
|
|
|
# akka.loglevel = DEBUG
|
2012-06-05 22:16:15 +02:00
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
case class GossipTo(address: Address)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
2012-07-04 14:39:27 +02:00
|
|
|
class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
|
2012-06-05 22:16:15 +02:00
|
|
|
import ClusterSpec._
|
|
|
|
|
|
2012-06-21 10:58:35 +02:00
|
|
|
val selfAddress = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.address
|
2012-06-05 22:16:15 +02:00
|
|
|
|
2012-06-15 14:45:15 +02:00
|
|
|
val failureDetector = new FailureDetectorPuppet(system)
|
|
|
|
|
|
2012-07-04 14:39:27 +02:00
|
|
|
val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector)
|
2012-07-04 11:37:56 +02:00
|
|
|
|
2012-07-04 14:39:27 +02:00
|
|
|
def leaderActions(): Unit = {
|
|
|
|
|
cluster.clusterCore ! LeaderActionsTick
|
|
|
|
|
awaitPing()
|
2012-06-05 22:16:15 +02:00
|
|
|
}
|
|
|
|
|
|
2012-07-04 14:39:27 +02:00
|
|
|
def awaitPing(): Unit = {
|
|
|
|
|
val ping = Ping()
|
|
|
|
|
cluster.clusterCore ! ping
|
|
|
|
|
expectMsgPF() { case pong @ Pong(`ping`, _) ⇒ pong }
|
2012-06-05 22:16:15 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"A Cluster" must {
|
|
|
|
|
|
2012-07-04 14:39:27 +02:00
|
|
|
"use the address of the remote transport" in {
|
2012-06-21 10:58:35 +02:00
|
|
|
cluster.selfAddress must be(selfAddress)
|
|
|
|
|
}
|
|
|
|
|
|
2012-07-04 14:39:27 +02:00
|
|
|
"initially become singleton cluster when joining itself and reach convergence" in {
|
2012-06-25 21:07:44 +02:00
|
|
|
cluster.isSingletonCluster must be(false) // auto-join = off
|
|
|
|
|
cluster.join(selfAddress)
|
|
|
|
|
awaitCond(cluster.isSingletonCluster)
|
|
|
|
|
cluster.self.address must be(selfAddress)
|
2012-06-05 22:16:15 +02:00
|
|
|
cluster.latestGossip.members.map(_.address) must be(Set(selfAddress))
|
2012-07-04 14:39:27 +02:00
|
|
|
cluster.status must be(MemberStatus.Joining)
|
2012-06-11 14:59:34 +02:00
|
|
|
cluster.convergence.isDefined must be(true)
|
2012-07-04 14:39:27 +02:00
|
|
|
leaderActions()
|
|
|
|
|
cluster.status must be(MemberStatus.Up)
|
2012-06-05 22:16:15 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}
|