Merge branch 'master' into wip-2175-problem-with-fault-injector-throttling-ban

This commit is contained in:
Björn Antonsson 2012-06-05 11:31:43 +02:00
commit 1dd1d98208
39 changed files with 638 additions and 193 deletions

View file

@ -25,13 +25,13 @@ akka {
periodic-tasks-initial-delay = 1s
# how often should the node send out gossip information?
gossip-frequency = 1s
gossip-interval = 1s
# how often should the leader perform maintenance tasks?
leader-actions-frequency = 1s
leader-actions-interval = 1s
# how often should the node move nodes, marked as unreachable by the failure detector, out of the membership ring?
unreachable-nodes-reaper-frequency = 1s
unreachable-nodes-reaper-interval = 1s
# accrual failure detection config
failure-detector {

View file

@ -380,9 +380,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
private val vclockNode = VectorClock.Node(selfAddress.toString)
private val periodicTasksInitialDelay = clusterSettings.PeriodicTasksInitialDelay
private val gossipFrequency = clusterSettings.GossipFrequency
private val leaderActionsFrequency = clusterSettings.LeaderActionsFrequency
private val unreachableNodesReaperFrequency = clusterSettings.UnreachableNodesReaperFrequency
private val gossipInterval = clusterSettings.GossipInterval
private val leaderActionsInterval = clusterSettings.LeaderActionsInterval
private val unreachableNodesReaperInterval = clusterSettings.UnreachableNodesReaperInterval
implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)
@ -424,17 +424,17 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
// ========================================================
// start periodic gossip to random nodes in cluster
private val gossipCanceller = system.scheduler.schedule(periodicTasksInitialDelay, gossipFrequency) {
private val gossipCanceller = system.scheduler.schedule(periodicTasksInitialDelay, gossipInterval) {
gossip()
}
// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
private val failureDetectorReaperCanceller = system.scheduler.schedule(periodicTasksInitialDelay, unreachableNodesReaperFrequency) {
private val failureDetectorReaperCanceller = system.scheduler.schedule(periodicTasksInitialDelay, unreachableNodesReaperInterval) {
reapUnreachableMembers()
}
// start periodic leader action management (only applies for the current leader)
private val leaderActionsCanceller = system.scheduler.schedule(periodicTasksInitialDelay, leaderActionsFrequency) {
private val leaderActionsCanceller = system.scheduler.schedule(periodicTasksInitialDelay, leaderActionsInterval) {
leaderActions()
}

View file

@ -20,9 +20,9 @@ class ClusterSettings(val config: Config, val systemName: String) {
case AddressFromURIString(addr) Some(addr)
}
val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS)
val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip-frequency"), MILLISECONDS)
val LeaderActionsFrequency = Duration(getMilliseconds("akka.cluster.leader-actions-frequency"), MILLISECONDS)
val UnreachableNodesReaperFrequency = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-frequency"), MILLISECONDS)
val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS)
val LeaderActionsInterval = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS)
val UnreachableNodesReaperInterval = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS)
val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons")
val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes")
val AutoDown = getBoolean("akka.cluster.auto-down")

View file

@ -26,6 +26,7 @@ class ClientDowningNodeThatIsUnreachableMultiJvmNode4 extends ClientDowningNodeT
class ClientDowningNodeThatIsUnreachableSpec
extends MultiNodeSpec(ClientDowningNodeThatIsUnreachableMultiJvmSpec)
with MultiNodeClusterSpec {
import ClientDowningNodeThatIsUnreachableMultiJvmSpec._
override def initialParticipants = 4
@ -34,7 +35,7 @@ class ClientDowningNodeThatIsUnreachableSpec
"be able to DOWN a node that is UNREACHABLE (killed)" taggedAs LongRunningTest in {
runOn(first) {
cluster.self
startClusterNode()
awaitUpConvergence(numberOfMembers = 4)
val thirdAddress = node(third).address

View file

@ -26,6 +26,7 @@ class ClientDowningNodeThatIsUpMultiJvmNode4 extends ClientDowningNodeThatIsUpSp
class ClientDowningNodeThatIsUpSpec
extends MultiNodeSpec(ClientDowningNodeThatIsUpMultiJvmSpec)
with MultiNodeClusterSpec {
import ClientDowningNodeThatIsUpMultiJvmSpec._
override def initialParticipants = 4
@ -34,7 +35,7 @@ class ClientDowningNodeThatIsUpSpec
"be able to DOWN a node that is UP (healthy and available)" taggedAs LongRunningTest in {
runOn(first) {
cluster.self
startClusterNode()
awaitUpConvergence(numberOfMembers = 4)
val thirdAddress = node(third).address

View file

@ -15,7 +15,7 @@ object GossipingAccrualFailureDetectorMultiJvmSpec extends MultiNodeConfig {
val third = role("third")
commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString("akka.cluster.failure-detector.threshold=4")).
withFallback(ConfigFactory.parseString("akka.cluster.failure-detector.threshold = 4")).
withFallback(MultiNodeClusterSpec.clusterConfig))
}
@ -38,7 +38,7 @@ abstract class GossipingAccrualFailureDetectorSpec extends MultiNodeSpec(Gossipi
"receive gossip heartbeats so that all member nodes in the cluster are marked 'available'" taggedAs LongRunningTest in {
// make sure that the node-to-join is started before other join
runOn(first) {
cluster.self
startClusterNode()
}
testConductor.enter("first-started")
@ -68,5 +68,4 @@ abstract class GossipingAccrualFailureDetectorSpec extends MultiNodeSpec(Gossipi
testConductor.enter("after-2")
}
}
}

View file

@ -27,7 +27,10 @@ class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec
class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec
class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec
abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender {
abstract class JoinTwoClustersSpec
extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec)
with MultiNodeClusterSpec {
import JoinTwoClustersMultiJvmSpec._
override def initialParticipants = 6
@ -41,7 +44,7 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm
"be able to 'elect' a single leader after joining (A -> B)" taggedAs LongRunningTest in {
// make sure that the node-to-join is started before other join
runOn(a1, b1, c1) {
cluster.self
startClusterNode()
}
testConductor.enter("first-started")
@ -75,7 +78,6 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm
assertLeader(c1, c2)
testConductor.enter("four-members")
}
"be able to 'elect' a single leader after joining (C -> A + B)" taggedAs LongRunningTest in {
@ -91,5 +93,4 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm
testConductor.enter("six-members")
}
}
}

View file

@ -34,6 +34,7 @@ class LeaderDowningNodeThatIsUnreachableMultiJvmNode4 extends LeaderDowningNodeT
class LeaderDowningNodeThatIsUnreachableSpec
extends MultiNodeSpec(LeaderDowningNodeThatIsUnreachableMultiJvmSpec)
with MultiNodeClusterSpec {
import LeaderDowningNodeThatIsUnreachableMultiJvmSpec._
override def initialParticipants = 4
@ -42,7 +43,7 @@ class LeaderDowningNodeThatIsUnreachableSpec
"be able to DOWN a 'last' node that is UNREACHABLE" taggedAs LongRunningTest in {
runOn(first) {
cluster.self
startClusterNode()
awaitUpConvergence(numberOfMembers = 4)
val fourthAddress = node(fourth).address

View file

@ -17,7 +17,6 @@ object LeaderElectionMultiJvmSpec extends MultiNodeConfig {
val fourth = role("fourth")
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
}
class LeaderElectionMultiJvmNode1 extends LeaderElectionSpec
@ -26,7 +25,10 @@ class LeaderElectionMultiJvmNode3 extends LeaderElectionSpec
class LeaderElectionMultiJvmNode4 extends LeaderElectionSpec
class LeaderElectionMultiJvmNode5 extends LeaderElectionSpec
abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSpec) with MultiNodeClusterSpec {
abstract class LeaderElectionSpec
extends MultiNodeSpec(LeaderElectionMultiJvmSpec)
with MultiNodeClusterSpec {
import LeaderElectionMultiJvmSpec._
override def initialParticipants = 5
@ -41,7 +43,7 @@ abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSp
"be able to 'elect' a single leader" taggedAs LongRunningTest in {
// make sure that the node-to-join is started before other join
runOn(first) {
cluster.self
startClusterNode()
}
testConductor.enter("first-started")
@ -91,7 +93,6 @@ abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSp
testConductor.enter("completed")
}
}
"be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in {
@ -102,5 +103,4 @@ abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSp
shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 1)
}
}
}

View file

@ -0,0 +1,84 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import scala.collection.immutable.SortedSet
import org.scalatest.BeforeAndAfter
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.util.duration._
object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(
debugConfig(on = false)
.withFallback(ConfigFactory.parseString("""
akka.cluster {
leader-actions-interval = 5 s # increase the leader action task interval
unreachable-nodes-reaper-interval = 30 s # turn "off" reaping to unreachable node set
}
""")
.withFallback(MultiNodeClusterSpec.clusterConfig)))
}
class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListenerExitingSpec
class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec
class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec
abstract class MembershipChangeListenerExitingSpec
extends MultiNodeSpec(MembershipChangeListenerExitingMultiJvmSpec)
with MultiNodeClusterSpec {
import MembershipChangeListenerExitingMultiJvmSpec._
override def initialParticipants = 3
lazy val firstAddress = node(first).address
lazy val secondAddress = node(second).address
lazy val thirdAddress = node(third).address
"A registered MembershipChangeListener" must {
"be notified when new node is EXITING" taggedAs LongRunningTest in {
runOn(first) {
startClusterNode()
}
testConductor.enter("first-started")
runOn(second, third) {
cluster.join(firstAddress)
}
awaitUpConvergence(numberOfMembers = 3)
testConductor.enter("rest-started")
runOn(first) {
testConductor.enter("registered-listener")
cluster.leave(secondAddress)
}
runOn(second) {
testConductor.enter("registered-listener")
}
runOn(third) {
val exitingLatch = TestLatch()
cluster.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
if (members.size == 3 && members.exists( m => m.address == secondAddress && m.status == MemberStatus.Exiting))
exitingLatch.countDown()
}
})
testConductor.enter("registered-listener")
exitingLatch.await
}
testConductor.enter("finished")
}
}
}

View file

@ -0,0 +1,71 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import scala.collection.immutable.SortedSet
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.util.duration._
object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(
debugConfig(on = false)
.withFallback(ConfigFactory.parseString("""
akka.cluster {
leader-actions-interval = 5 s # increase the leader action task interval to allow time checking for JOIN before leader moves it to UP
}
""")
.withFallback(MultiNodeClusterSpec.clusterConfig)))
}
class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec
class MembershipChangeListenerJoinMultiJvmNode2 extends MembershipChangeListenerJoinSpec
abstract class MembershipChangeListenerJoinSpec
extends MultiNodeSpec(MembershipChangeListenerJoinMultiJvmSpec)
with MultiNodeClusterSpec {
import MembershipChangeListenerJoinMultiJvmSpec._
override def initialParticipants = 2
lazy val firstAddress = node(first).address
lazy val secondAddress = node(second).address
"A registered MembershipChangeListener" must {
"be notified when new node is JOINING" taggedAs LongRunningTest in {
runOn(first) {
startClusterNode()
}
runOn(second) {
testConductor.enter("registered-listener")
cluster.join(firstAddress)
}
runOn(first) {
val joinLatch = TestLatch()
cluster.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
if (members.size == 2 && members.exists(_.status == MemberStatus.Joining)) // second node is not part of node ring anymore
joinLatch.countDown()
}
})
testConductor.enter("registered-listener")
joinLatch.await
cluster.convergence.isDefined must be(true)
}
testConductor.enter("after")
}
}
}

View file

@ -0,0 +1,81 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import scala.collection.immutable.SortedSet
import org.scalatest.BeforeAndAfter
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(
debugConfig(on = false)
.withFallback(ConfigFactory.parseString("""
akka.cluster.leader-actions-interval = 5 s
akka.cluster.unreachable-nodes-reaper-interval = 30 s
"""))
.withFallback(MultiNodeClusterSpec.clusterConfig))
}
class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListenerLeavingSpec
class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec
class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec
abstract class MembershipChangeListenerLeavingSpec
extends MultiNodeSpec(MembershipChangeListenerLeavingMultiJvmSpec)
with MultiNodeClusterSpec {
import MembershipChangeListenerLeavingMultiJvmSpec._
override def initialParticipants = 3
lazy val firstAddress = node(first).address
lazy val secondAddress = node(second).address
lazy val thirdAddress = node(third).address
"A registered MembershipChangeListener" must {
"be notified when new node is LEAVING" taggedAs LongRunningTest in {
runOn(first) {
startClusterNode()
}
testConductor.enter("first-started")
runOn(second, third) {
cluster.join(firstAddress)
}
awaitUpConvergence(numberOfMembers = 3)
testConductor.enter("rest-started")
runOn(first) {
testConductor.enter("registered-listener")
cluster.leave(secondAddress)
}
runOn(second) {
testConductor.enter("registered-listener")
}
runOn(third) {
val latch = TestLatch()
cluster.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
if (members.size == 3 && members.exists( m => m.address == secondAddress && m.status == MemberStatus.Leaving))
latch.countDown()
}
})
testConductor.enter("registered-listener")
latch.await
}
testConductor.enter("finished")
}
}
}

View file

@ -15,7 +15,6 @@ object MembershipChangeListenerMultiJvmSpec extends MultiNodeConfig {
val third = role("third")
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
}
class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec
@ -55,7 +54,6 @@ abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChan
}
testConductor.enter("after-1")
}
"(when three systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in {
@ -75,8 +73,6 @@ abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChan
cluster.convergence.isDefined must be(true)
testConductor.enter("after-2")
}
}
}

View file

@ -0,0 +1,64 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import scala.collection.immutable.SortedSet
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.util.duration._
object MembershipChangeListenerUpMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
}
class MembershipChangeListenerUpMultiJvmNode1 extends MembershipChangeListenerUpSpec
class MembershipChangeListenerUpMultiJvmNode2 extends MembershipChangeListenerUpSpec
abstract class MembershipChangeListenerUpSpec
extends MultiNodeSpec(MembershipChangeListenerUpMultiJvmSpec)
with MultiNodeClusterSpec {
import MembershipChangeListenerUpMultiJvmSpec._
override def initialParticipants = 2
lazy val firstAddress = node(first).address
lazy val secondAddress = node(second).address
"A registered MembershipChangeListener" must {
"be notified when new node is marked as UP by the leader" taggedAs LongRunningTest in {
runOn(first) {
startClusterNode()
}
runOn(second) {
testConductor.enter("registered-listener")
cluster.join(firstAddress)
}
runOn(first) {
val upLatch = TestLatch()
cluster.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
if (members.size == 2 && members.forall(_.status == MemberStatus.Up))
upLatch.countDown()
}
})
testConductor.enter("registered-listener")
upLatch.await
awaitUpConvergence(numberOfMembers = 2)
}
testConductor.enter("after")
}
}
}

View file

@ -15,11 +15,11 @@ import akka.util.Duration
object MultiNodeClusterSpec {
def clusterConfig: Config = ConfigFactory.parseString("""
akka.cluster {
auto-down = off
gossip-frequency = 200 ms
leader-actions-frequency = 200 ms
unreachable-nodes-reaper-frequency = 200 ms
periodic-tasks-initial-delay = 300 ms
auto-down = off
gossip-interval = 200 ms
leader-actions-interval = 200 ms
unreachable-nodes-reaper-interval = 200 ms
periodic-tasks-initial-delay = 300 ms
}
akka.test {
single-expect-default = 5 s
@ -29,8 +29,16 @@ object MultiNodeClusterSpec {
trait MultiNodeClusterSpec { self: MultiNodeSpec
/**
* Create a cluster node using 'Cluster(system)'.
*/
def cluster: Cluster = Cluster(system)
/**
* Use this method instead of 'cluster.self'.
*/
def startClusterNode(): Unit = cluster.self
/**
* Assert that the member addresses match the expected addresses in the
* sort order used by the cluster.

View file

@ -0,0 +1,57 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.util.duration._
object NodeJoinMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(
debugConfig(on = false)
.withFallback(ConfigFactory.parseString("""
akka.cluster {
leader-actions-interval = 5 s # increase the leader action task interval
}
""")
.withFallback(MultiNodeClusterSpec.clusterConfig)))
}
class NodeJoinMultiJvmNode1 extends NodeJoinSpec
class NodeJoinMultiJvmNode2 extends NodeJoinSpec
abstract class NodeJoinSpec
extends MultiNodeSpec(NodeJoinMultiJvmSpec)
with MultiNodeClusterSpec {
import NodeJoinMultiJvmSpec._
override def initialParticipants = 2
lazy val firstAddress = node(first).address
lazy val secondAddress = node(second).address
"A cluster node" must {
"join another cluster and get status JOINING - when sending a 'Join' command" taggedAs LongRunningTest in {
runOn(first) {
startClusterNode()
}
runOn(second) {
cluster.join(firstAddress)
}
awaitCond(cluster.latestGossip.members.exists { member member.address == secondAddress && member.status == MemberStatus.Joining })
testConductor.enter("after")
}
}
}

View file

@ -22,8 +22,10 @@ class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndEx
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec
abstract class NodeLeavingAndExitingAndBeingRemovedSpec extends MultiNodeSpec(NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec)
abstract class NodeLeavingAndExitingAndBeingRemovedSpec
extends MultiNodeSpec(NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec)
with MultiNodeClusterSpec {
import NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec._
override def initialParticipants = 3
@ -39,7 +41,7 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec extends MultiNodeSpec(No
"be moved to EXITING and then to REMOVED by the reaper" taggedAs LongRunningTest in {
runOn(first) {
cluster.self
startClusterNode()
}
testConductor.enter("first-started")

View file

@ -19,8 +19,8 @@ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig {
debugConfig(on = false)
.withFallback(ConfigFactory.parseString("""
akka.cluster {
leader-actions-frequency = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state
unreachable-nodes-reaper-frequency = 30 s # turn "off" reaping to unreachable node set
leader-actions-interval = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state
unreachable-nodes-reaper-interval = 30 s
}
""")
.withFallback(MultiNodeClusterSpec.clusterConfig)))
@ -30,8 +30,10 @@ class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec
class NodeLeavingAndExitingMultiJvmNode2 extends NodeLeavingAndExitingSpec
class NodeLeavingAndExitingMultiJvmNode3 extends NodeLeavingAndExitingSpec
abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExitingMultiJvmSpec)
abstract class NodeLeavingAndExitingSpec
extends MultiNodeSpec(NodeLeavingAndExitingMultiJvmSpec)
with MultiNodeClusterSpec {
import NodeLeavingAndExitingMultiJvmSpec._
override def initialParticipants = 3
@ -45,7 +47,7 @@ abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExi
"be moved to EXITING by the leader" taggedAs LongRunningTest in {
runOn(first) {
cluster.self
startClusterNode()
}
testConductor.enter("first-started")
@ -63,7 +65,7 @@ abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExi
runOn(first, third) {
// 1. Verify that 'second' node is set to LEAVING
// We have set the 'leader-actions-frequency' to 5 seconds to make sure that we get a
// We have set the 'leader-actions-interval' to 5 seconds to make sure that we get a
// chance to test the LEAVING state before the leader moves the node to EXITING
awaitCond(cluster.latestGossip.members.exists(_.status == MemberStatus.Leaving)) // wait on LEAVING
val hasLeft = cluster.latestGossip.members.find(_.status == MemberStatus.Leaving) // verify node that left

View file

@ -17,7 +17,7 @@ object NodeLeavingMultiJvmSpec extends MultiNodeConfig {
commonConfig(
debugConfig(on = false)
.withFallback(ConfigFactory.parseString("""
akka.cluster.unreachable-nodes-reaper-frequency = 30 s # turn "off" reaping to unreachable node set
akka.cluster.unreachable-nodes-reaper-frequency = 30 s
"""))
.withFallback(MultiNodeClusterSpec.clusterConfig))
}
@ -41,7 +41,7 @@ abstract class NodeLeavingSpec extends MultiNodeSpec(NodeLeavingMultiJvmSpec)
"be marked as LEAVING in the converged membership table" taggedAs LongRunningTest in {
runOn(first) {
cluster.self
startClusterNode()
}
testConductor.enter("first-started")

View file

@ -14,14 +14,16 @@ object NodeMembershipMultiJvmSpec extends MultiNodeConfig {
val third = role("third")
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
}
class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec
class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec
class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec
abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) with MultiNodeClusterSpec {
abstract class NodeMembershipSpec
extends MultiNodeSpec(NodeMembershipMultiJvmSpec)
with MultiNodeClusterSpec {
import NodeMembershipMultiJvmSpec._
override def initialParticipants = 3
@ -36,7 +38,7 @@ abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSp
// make sure that the node-to-join is started before other join
runOn(first) {
cluster.self
startClusterNode()
}
testConductor.enter("first-started")
@ -51,7 +53,6 @@ abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSp
}
testConductor.enter("after-1")
}
"(when three nodes) start gossiping to each other so that all nodes gets the same gossip info" taggedAs LongRunningTest in {
@ -68,8 +69,6 @@ abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSp
awaitCond(cluster.convergence.isDefined)
testConductor.enter("after-2")
}
}
}

View file

@ -37,7 +37,7 @@ abstract class NodeShutdownSpec extends MultiNodeSpec(NodeShutdownMultiJvmSpec)
"not be singleton cluster when joined" taggedAs LongRunningTest in {
// make sure that the node-to-join is started before other join
runOn(first) {
cluster.self
startClusterNode()
}
testConductor.enter("first-started")
@ -63,5 +63,4 @@ abstract class NodeShutdownSpec extends MultiNodeSpec(NodeShutdownMultiJvmSpec)
testConductor.enter("after-2")
}
}
}

View file

@ -1,61 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
object NodeStartupMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
}
class NodeStartupMultiJvmNode1 extends NodeStartupSpec
class NodeStartupMultiJvmNode2 extends NodeStartupSpec
abstract class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) with MultiNodeClusterSpec {
import NodeStartupMultiJvmSpec._
override def initialParticipants = 2
lazy val firstAddress = node(first).address
lazy val secondAddress = node(second).address
"A first cluster node with a 'node-to-join' config set to empty string (singleton cluster)" must {
"be a singleton cluster when started up" taggedAs LongRunningTest in {
runOn(first) {
awaitCond(cluster.isSingletonCluster)
awaitUpConvergence(numberOfMembers = 1)
assertLeader(first)
}
testConductor.enter("after-1")
}
}
"A second cluster node" must {
"join the other node cluster when sending a Join command" taggedAs LongRunningTest in {
runOn(second) {
cluster.join(firstAddress)
}
awaitCond {
cluster.latestGossip.members.exists { member
member.address == secondAddress && member.status == MemberStatus.Up
}
}
cluster.latestGossip.members.size must be(2)
awaitCond(cluster.convergence.isDefined)
assertLeader(first, second)
testConductor.enter("after-2")
}
}
}

View file

@ -0,0 +1,50 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.util.duration._
object NodeUpMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
}
class NodeUpMultiJvmNode1 extends NodeUpSpec
class NodeUpMultiJvmNode2 extends NodeUpSpec
abstract class NodeUpSpec
extends MultiNodeSpec(NodeUpMultiJvmSpec)
with MultiNodeClusterSpec {
import NodeUpMultiJvmSpec._
override def initialParticipants = 2
lazy val firstAddress = node(first).address
lazy val secondAddress = node(second).address
"A cluster node that is joining another cluster" must {
"be moved to UP by the leader after a convergence" taggedAs LongRunningTest in {
runOn(first) {
startClusterNode()
}
runOn(second) {
cluster.join(firstAddress)
}
awaitUpConvergence(numberOfMembers = 2)
testConductor.enter("after")
}
}
}

View file

@ -20,9 +20,9 @@ class ClusterConfigSpec extends AkkaSpec {
FailureDetectorMaxSampleSize must be(1000)
NodeToJoin must be(None)
PeriodicTasksInitialDelay must be(1 seconds)
GossipFrequency must be(1 second)
LeaderActionsFrequency must be(1 second)
UnreachableNodesReaperFrequency must be(1 second)
GossipInterval must be(1 second)
LeaderActionsInterval must be(1 second)
UnreachableNodesReaperInterval must be(1 second)
NrOfGossipDaemons must be(4)
NrOfDeputyNodes must be(3)
AutoDown must be(true)

View file

@ -43,7 +43,7 @@ Step Description
9, 10, 11 and tells the ``Counter`` that there is no ``Storage``.
12 The ``CounterService`` schedules a ``Reconnect`` message to itself.
13, 14 When it receives the ``Reconnect`` message it creates a new ``Storage`` ...
15, 16 and tells the the ``Counter`` to use the new ``Storage``
15, 16 and tells the ``Counter`` to use the new ``Storage``
=========== ==================================================================================
Full Source Code of the Fault Tolerance Sample (Java)

View file

@ -211,7 +211,7 @@ the first case and ``LoggerFactory.getLogger(String s)`` in the second).
.. note::
Beware that the the actor systems name is appended to a :class:`String` log
Beware that the actor systems name is appended to a :class:`String` log
source if the LoggingAdapter was created giving an :class:`ActorSystem` to
the factory. If this is not intended, give a :class:`LoggingBus` instead as
shown below:

View file

@ -92,6 +92,14 @@ As you can see from the example above the following pattern is used to find an `
akka://<actorsystemname>@<hostname>:<port>/<actor path>
.. note::
In order to ensure serializability of ``Props`` when passing constructor
arguments to the actor being created, do not make the factory a non-static
inner class: this will inherently capture a reference to its enclosing
object, which in most cases is not serializable. It is best to make a static
inner class which implements :class:`UntypedActorFactory`.
Programmatic Remote Deployment
------------------------------

View file

@ -586,7 +586,7 @@ What happens to the Message
---------------------------
If an exception is thrown while a message is being processed (so taken of his
mailbox and handed over the the receive), then this message will be lost. It is
mailbox and handed over to the receive), then this message will be lost. It is
important to understand that it is not put back on the mailbox. So if you want
to retry processing of a message, you need to deal with it yourself by catching
the exception and retry your flow. Make sure that you put a bound on the number

View file

@ -651,7 +651,7 @@ What happens to the Message
---------------------------
If an exception is thrown while a message is being processed (so taken of his
mailbox and handed over the the receive), then this message will be lost. It is
mailbox and handed over to the receive), then this message will be lost. It is
important to understand that it is not put back on the mailbox. So if you want
to retry processing of a message, you need to deal with it yourself by catching
the exception and retry your flow. Make sure that you put a bound on the number

View file

@ -14,6 +14,8 @@ import akka.dispatch.Futures
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
import akka.testkit.ImplicitSender
import akka.util.NonFatal
object TestkitDocSpec {
case object Say42
case object Unknown
@ -208,7 +210,7 @@ class TestkitDocSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
val probe = TestProbe()
val future = probe.ref ? "hello"
probe.expectMsg(0 millis, "hello") // TestActor runs on CallingThreadDispatcher
probe.sender ! "world"
probe.reply("world")
assert(future.isCompleted && future.value == Some(Right("world")))
//#test-probe-reply
}
@ -252,4 +254,22 @@ class TestkitDocSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
//#event-filter
}
"demonstrate TestKitBase" in {
//#test-kit-base
import akka.testkit.TestKitBase
class MyTest extends TestKitBase {
implicit lazy val system = ActorSystem()
//#put-your-test-code-here
val probe = TestProbe()
probe.send(testActor, "hello")
try expectMsg("hello") catch { case NonFatal(e) system.shutdown(); throw e }
//#put-your-test-code-here
system.shutdown()
}
//#test-kit-base
}
}

View file

@ -45,7 +45,7 @@ Step Description
9, 10, 11 and tells the ``Counter`` that there is no ``Storage``.
12 The ``CounterService`` schedules a ``Reconnect`` message to itself.
13, 14 When it receives the ``Reconnect`` message it creates a new ``Storage`` ...
15, 16 and tells the the ``Counter`` to use the new ``Storage``
15, 16 and tells the ``Counter`` to use the new ``Storage``
=========== ==================================================================================
Full Source Code of the Fault Tolerance Sample (Scala)

View file

@ -253,7 +253,7 @@ the first case and ``LoggerFactory.getLogger(s: String)`` in the second).
.. note::
Beware that the the actor systems name is appended to a :class:`String` log
Beware that the actor systems name is appended to a :class:`String` log
source if the LoggingAdapter was created giving an :class:`ActorSystem` to
the factory. If this is not intended, give a :class:`LoggingBus` instead as
shown below:

View file

@ -105,6 +105,14 @@ Once you have configured the properties above you would do the following in code
``SampleActor`` has to be available to the runtimes using it, i.e. the classloader of the
actor systems has to have a JAR containing the class.
.. note::
In order to ensure serializability of ``Props`` when passing constructor
arguments to the actor being created, do not make the factory an inner class:
this will inherently capture a reference to its enclosing object, which in
most cases is not serializable. It is best to create a factory method in the
companion object of the actors class.
Programmatic Remote Deployment
------------------------------

View file

@ -194,10 +194,10 @@ is a whole set of examination methods, e.g. receiving all consecutive messages
matching certain criteria, receiving a whole sequence of fixed messages or
classes, receiving nothing for some time, etc.
The ActorSystem passed in to the constructor of TestKit is accessible with
the the :obj:`system` member.
Remember to shut down the actor system after the test is finished (also in case
of failure) so that all actors—including the test actor—are stopped.
The ActorSystem passed in to the constructor of TestKit is accessible via the
:obj:`system` member. Remember to shut down the actor system after the test is
finished (also in case of failure) so that all actors—including the test
actor—are stopped.
Built-In Assertions
-------------------
@ -671,6 +671,25 @@ This section contains a collection of known gotchas with some other frameworks,
which is by no means exhaustive and does not imply endorsement or special
support.
When you need it to be a trait
------------------------------
If for some reason it is a problem to inherit from :class:`TestKit` due to it
being a concrete class instead of a trait, theres :class:`TestKitBase`:
.. includecode:: code/docs/testkit/TestkitDocSpec.scala
:include: test-kit-base
:exclude: put-your-test-code-here
The ``implicit lazy val system`` must be declared exactly like that (you can of
course pass arguments to the actor system factory as needed) because trait
:class:`TestKitBase` needs the system during its construction.
.. warning::
Use of the trait is discouraged because of potential issues with binary
backwards compatibility in the future, use at own risk.
Specs2
------

View file

@ -3,23 +3,16 @@
*/
package akka.remote.testkit
import akka.testkit.AkkaSpec
import akka.actor.{ ActorSystem, ExtendedActorSystem }
import akka.remote.testconductor.TestConductor
import java.net.InetAddress
import java.net.InetSocketAddress
import akka.remote.testconductor.TestConductorExt
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.dispatch.Await.Awaitable
import com.typesafe.config.{ ConfigObject, ConfigFactory, Config }
import akka.actor.{ RootActorPath, Deploy, ActorPath, ActorSystem, ExtendedActorSystem }
import akka.dispatch.Await
import akka.util.Duration
import akka.util.NonFatal
import akka.actor.ActorPath
import akka.actor.RootActorPath
import akka.remote.testconductor.RoleName
import akka.actor.Deploy
import com.typesafe.config.ConfigObject
import akka.dispatch.Await.Awaitable
import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName }
import akka.testkit.AkkaSpec
import akka.util.{ NonFatal, Duration }
/**
* Configure the role names and participants of the test, including configuration settings.
@ -144,7 +137,7 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, roles:
import MultiNodeSpec._
def this(config: MultiNodeConfig) =
this(config.myself, ActorSystem(AkkaSpec.getCallerName, config.config), config.roles, config.deployments)
this(config.myself, ActorSystem(AkkaSpec.getCallerName(classOf[MultiNodeSpec]), config.config), config.roles, config.deployments)
/*
* Test Class Interface
@ -252,4 +245,4 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, roles:
// useful to see which jvm is running which role
log.info("Role [{}] started", myself.name)
}
}

View file

@ -56,7 +56,17 @@ class TestActorRef[T <: Actor](
* thrown will be available to you, while still being able to use
* become/unbecome.
*/
def receive(o: Any): Unit = underlying.receiveMessage(o)
def receive(o: Any): Unit = receive(o, underlying.system.deadLetters)
/**
* Directly inject messages into actor receive behavior. Any exceptions
* thrown will be available to you, while still being able to use
* become/unbecome.
*/
def receive(o: Any, sender: ActorRef): Unit = try {
underlying.currentMessage = Envelope(o, if (sender eq null) underlying.system.deadLetters else sender)(underlying.system)
underlying.receiveMessage(o)
} finally underlying.currentMessage = null
/**
* Retrieve reference to the underlying actor, where the static type matches the factory used inside the

View file

@ -62,50 +62,28 @@ class TestActor(queue: BlockingDeque[TestActor.Message]) extends Actor {
}
/**
* Test kit for testing actors. Inheriting from this trait enables reception of
* replies from actors, which are queued by an internal actor and can be
* examined using the `expectMsg...` methods. Assertions and bounds concerning
* timing are available in the form of `within` blocks.
* Implementation trait behind the [[akka.testkit.TestKit]] class: you may use
* this if inheriting from a concrete class is not possible.
*
* <pre>
* class Test extends TestKit(ActorSystem()) {
* try {
* <b>Use of the trait is discouraged because of potential issues with binary
* backwards compatibility in the future, use at own risk.</b>
*
* val test = system.actorOf(Props[SomeActor]
* This trait requires the concrete class mixing it in to provide an
* [[akka.actor.ActorSystem]] which is available before this traitss
* constructor is run. The recommended way is this:
*
* within (1 second) {
* test ! SomeWork
* expectMsg(Result1) // bounded to 1 second
* expectMsg(Result2) // bounded to the remainder of the 1 second
* }
*
* } finally {
* system.shutdown()
* }
* {{{
* class MyTest extends TestKitBase {
* implicit lazy val system = ActorSystem() // may add arguments here
* ...
* }
* </pre>
*
* Beware of two points:
*
* - the ActorSystem passed into the constructor needs to be shutdown,
* otherwise thread pools and memory will be leaked
* - this trait is not thread-safe (only one actor with one queue, one stack
* of `within` blocks); it is expected that the code is executed from a
* constructor as shown above, which makes this a non-issue, otherwise take
* care not to run tests within a single test class instance in parallel.
*
* It should be noted that for CI servers and the like all maximum Durations
* are scaled using their Duration.dilated method, which uses the
* TestKitExtension.Settings.TestTimeFactor settable via akka.conf entry "akka.test.timefactor".
*
* @author Roland Kuhn
* @since 1.1
* }}}
*/
class TestKit(_system: ActorSystem) {
trait TestKitBase {
import TestActor.{ Message, RealMessage, NullMessage }
implicit val system = _system
implicit val system: ActorSystem
val testKitSettings = TestKitExtension(system)
private val queue = new LinkedBlockingDeque[Message]()
@ -579,6 +557,48 @@ class TestKit(_system: ActorSystem) {
private def format(u: TimeUnit, d: Duration) = "%.3f %s".format(d.toUnit(u), u.toString.toLowerCase)
}
/**
* Test kit for testing actors. Inheriting from this trait enables reception of
* replies from actors, which are queued by an internal actor and can be
* examined using the `expectMsg...` methods. Assertions and bounds concerning
* timing are available in the form of `within` blocks.
*
* <pre>
* class Test extends TestKit(ActorSystem()) {
* try {
*
* val test = system.actorOf(Props[SomeActor]
*
* within (1 second) {
* test ! SomeWork
* expectMsg(Result1) // bounded to 1 second
* expectMsg(Result2) // bounded to the remainder of the 1 second
* }
*
* } finally {
* system.shutdown()
* }
* }
* </pre>
*
* Beware of two points:
*
* - the ActorSystem passed into the constructor needs to be shutdown,
* otherwise thread pools and memory will be leaked
* - this trait is not thread-safe (only one actor with one queue, one stack
* of `within` blocks); it is expected that the code is executed from a
* constructor as shown above, which makes this a non-issue, otherwise take
* care not to run tests within a single test class instance in parallel.
*
* It should be noted that for CI servers and the like all maximum Durations
* are scaled using their Duration.dilated method, which uses the
* TestKitExtension.Settings.TestTimeFactor settable via akka.conf entry "akka.test.timefactor".
*
* @author Roland Kuhn
* @since 1.1
*/
class TestKit(_system: ActorSystem) extends { implicit val system = _system } with TestKitBase
object TestKit {
private[testkit] val testActorId = new AtomicInteger(0)
@ -640,22 +660,23 @@ class TestProbe(_application: ActorSystem) extends TestKit(_application) {
* Replies will be available for inspection with all of TestKit's assertion
* methods.
*/
def send(actor: ActorRef, msg: AnyRef) = {
actor.!(msg)(testActor)
}
def send(actor: ActorRef, msg: Any): Unit = actor.!(msg)(testActor)
/**
* Forward this message as if in the TestActor's receive method with self.forward.
*/
def forward(actor: ActorRef, msg: AnyRef = lastMessage.msg) {
actor.!(msg)(lastMessage.sender)
}
def forward(actor: ActorRef, msg: Any = lastMessage.msg): Unit = actor.!(msg)(lastMessage.sender)
/**
* Get sender of last received message.
*/
def sender = lastMessage.sender
/**
* Send message to the sender of the last dequeued message.
*/
def reply(msg: Any): Unit = sender.!(msg)(ref)
}
object TestProbe {

View file

@ -46,9 +46,13 @@ object AkkaSpec {
ConfigFactory.parseMap(map.asJava)
}
def getCallerName: String = {
def getCallerName(clazz: Class[_]): String = {
val s = Thread.currentThread.getStackTrace map (_.getClassName) drop 1 dropWhile (_ matches ".*AkkaSpec.?$")
s.head.replaceFirst(""".*\.""", "").replaceAll("[^a-zA-Z_0-9]", "_")
val reduced = s.lastIndexWhere(_ == clazz.getName) match {
case -1 s
case z s drop (z + 1)
}
reduced.head.replaceFirst(""".*\.""", "").replaceAll("[^a-zA-Z_0-9]", "_")
}
}
@ -56,13 +60,13 @@ object AkkaSpec {
abstract class AkkaSpec(_system: ActorSystem)
extends TestKit(_system) with WordSpec with MustMatchers with BeforeAndAfterAll {
def this(config: Config) = this(ActorSystem(AkkaSpec.getCallerName, config.withFallback(AkkaSpec.testConf)))
def this(config: Config) = this(ActorSystem(AkkaSpec.getCallerName(getClass), config.withFallback(AkkaSpec.testConf)))
def this(s: String) = this(ConfigFactory.parseString(s))
def this(configMap: Map[String, _]) = this(AkkaSpec.mapToConfig(configMap))
def this() = this(ActorSystem(AkkaSpec.getCallerName, AkkaSpec.testConf))
def this() = this(ActorSystem(AkkaSpec.getCallerName(getClass), AkkaSpec.testConf))
val log: LoggingAdapter = Logging(system, this.getClass)

View file

@ -246,11 +246,18 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA
a.underlying.dispatcher.getClass must be(classOf[Dispatcher])
}
"proxy receive for the underlying actor" in {
"proxy receive for the underlying actor without sender" in {
val ref = TestActorRef[WorkerActor]
ref.receive("work")
ref.isTerminated must be(true)
}
"proxy receive for the underlying actor with sender" in {
val ref = TestActorRef[WorkerActor]
ref.receive("work", testActor)
ref.isTerminated must be(true)
expectMsg("workDone")
}
}
}