From 806b5efcdfa907048e04512b3ab99d8c4e06dae9 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 6 Sep 2012 10:04:52 +0200 Subject: [PATCH 01/14] Fix NPE due to initialization order, see #2473 --- akka-cluster/src/main/scala/akka/cluster/Cluster.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index c76b637164..d648880396 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -149,7 +149,9 @@ class Cluster(val system: ExtendedActorSystem, val failureDetector: FailureDetec /** * INTERNAL API */ - private[cluster] val clusterCore: ActorRef = { + private[cluster] lazy val clusterCore: ActorRef = { + // this val must be lazy for correct initialization order, + // ClusterDaemon children may use for example subscribe before we get the GetClusterCoreRef reply implicit val timeout = system.settings.CreationTimeout Await.result((clusterDaemons ? InternalClusterAction.GetClusterCoreRef).mapTo[ActorRef], timeout.duration) } From bd6c39178ce53cc370198c4acdbf45ac5ad7fc63 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 6 Sep 2012 21:48:40 +0200 Subject: [PATCH 02/14] Fix leaking this in constructor of Cluster, see #2473 * Major refactoring to remove the need to use special Cluster instance for testing. Use default Cluster extension instead. Most of it is trivial changes. * Used failure-detector.implementation-class from config to swap to Puppet * Removed FailureDetectorStrategy, since it doesn't add any value * Added Cluster.joinSeedNodes to be able to test seedNodes when Addresses are unknown before startup time. * Removed ClusterEnvironment that was passed around among the actors, instead they use the ordinary Cluster extension. * Overall much cleaner design --- .../src/main/resources/reference.conf | 8 +- .../src/main/scala/akka/cluster/Cluster.scala | 38 ++++---- .../scala/akka/cluster/ClusterDaemon.scala | 91 +++++++++---------- .../scala/akka/cluster/ClusterEvent.scala | 2 +- .../scala/akka/cluster/ClusterHeartbeat.scala | 27 +++--- ...ientDowningNodeThatIsUnreachableSpec.scala | 28 +++--- .../ClientDowningNodeThatIsUpSpec.scala | 28 +++--- .../ClusterAccrualFailureDetectorSpec.scala | 6 +- .../scala/akka/cluster/ConvergenceSpec.scala | 28 +++--- .../cluster/FailureDetectorStrategy.scala | 61 ------------- .../akka/cluster/JoinInProgressSpec.scala | 4 +- .../scala/akka/cluster/JoinSeedNodeSpec.scala | 23 +++-- .../akka/cluster/JoinTwoClustersSpec.scala | 14 +-- .../scala/akka/cluster/LargeClusterSpec.scala | 10 +- ...aderDowningNodeThatIsUnreachableSpec.scala | 28 +++--- .../akka/cluster/LeaderElectionSpec.scala | 32 ++++--- .../akka/cluster/LeaderLeavingSpec.scala | 8 +- .../MembershipChangeListenerExitingSpec.scala | 8 +- .../MembershipChangeListenerJoinSpec.scala | 6 +- .../MembershipChangeListenerLeavingSpec.scala | 8 +- .../MembershipChangeListenerUpSpec.scala | 8 +- .../akka/cluster/MultiNodeClusterSpec.scala | 50 ++++++---- ...LeavingAndExitingAndBeingRemovedSpec.scala | 8 +- .../cluster/NodeLeavingAndExitingSpec.scala | 8 +- .../akka/cluster/NodeMembershipSpec.scala | 8 +- .../scala/akka/cluster/NodeUpSpec.scala | 6 +- .../akka/cluster/SingletonClusterSpec.scala | 20 ++-- .../scala/akka/cluster/SplitBrainSpec.scala | 32 ++++--- .../scala/akka/cluster/SunnyWeatherSpec.scala | 10 +- .../scala/akka/cluster/TransitionSpec.scala | 8 +- .../UnreachableNodeRejoinsClusterSpec.scala | 27 +++--- .../test/scala/akka/cluster/ClusterSpec.scala | 6 +- .../akka/cluster/FailureDetectorPuppet.scala | 4 +- 33 files changed, 313 insertions(+), 340 deletions(-) delete mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/FailureDetectorStrategy.scala diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 30e425dafd..dadda1ba33 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -63,6 +63,12 @@ akka { failure-detector { + # FQCN of the failure detector implementation. + # It must implement akka.cluster.akka.cluster and + # have constructor with akka.actor.ActorSystem and + # akka.cluster.ClusterSettings parameters + implementation-class = "akka.cluster.AccrualFailureDetector" + # defines the failure detector threshold # A low threshold is prone to generate many wrong suspicions but ensures # a quick detection in the event of a real crash. Conversely, a high @@ -84,8 +90,6 @@ akka { # network drop. acceptable-heartbeat-pause = 3s - implementation-class = "akka.cluster.AccrualFailureDetector" - max-sample-size = 1000 } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index d648880396..ab4ca8b37b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -40,19 +40,7 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { override def lookup = Cluster - override def createExtension(system: ExtendedActorSystem): Cluster = { - val clusterSettings = new ClusterSettings(system.settings.config, system.name) - - val failureDetector = { - import clusterSettings.{ FailureDetectorImplementationClass ⇒ fqcn } - system.dynamicAccess.createInstanceFor[FailureDetector]( - fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> clusterSettings)).fold( - e ⇒ throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString), - identity) - } - - new Cluster(system, failureDetector) - } + override def createExtension(system: ExtendedActorSystem): Cluster = new Cluster(system) } /** @@ -69,7 +57,7 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { * if (Cluster(system).isLeader) { ... } * }}} */ -class Cluster(val system: ExtendedActorSystem, val failureDetector: FailureDetector) extends Extension with ClusterEnvironment { +class Cluster(val system: ExtendedActorSystem) extends Extension { import ClusterEvent._ @@ -88,6 +76,14 @@ class Cluster(val system: ExtendedActorSystem, val failureDetector: FailureDetec log.info("Cluster Node [{}] - is starting up...", selfAddress) + val failureDetector = { + import settings.{ FailureDetectorImplementationClass ⇒ fqcn } + system.dynamicAccess.createInstanceFor[FailureDetector]( + fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> settings)).fold( + e ⇒ throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString), + identity) + } + // ======================================================== // ===================== WORK DAEMONS ===================== // ======================================================== @@ -142,16 +138,14 @@ class Cluster(val system: ExtendedActorSystem, val failureDetector: FailureDetec // create supervisor for daemons under path "/system/cluster" private val clusterDaemons: ActorRef = { - system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new ClusterDaemon(this)). + system.asInstanceOf[ActorSystemImpl].systemActorOf(Props[ClusterDaemon]. withDispatcher(UseDispatcher), name = "cluster") } /** * INTERNAL API */ - private[cluster] lazy val clusterCore: ActorRef = { - // this val must be lazy for correct initialization order, - // ClusterDaemon children may use for example subscribe before we get the GetClusterCoreRef reply + private[cluster] val clusterCore: ActorRef = { implicit val timeout = system.settings.CreationTimeout Await.result((clusterDaemons ? InternalClusterAction.GetClusterCoreRef).mapTo[ActorRef], timeout.duration) } @@ -219,10 +213,12 @@ class Cluster(val system: ExtendedActorSystem, val failureDetector: FailureDetec // ======================================================== /** - * Make it possible to override/configure seedNodes from tests without - * specifying in config. Addresses are unknown before startup time. + * Make it possible to join the specified seed nodes without defining them + * in config. Especially useful from tests when Addresses are unknown + * before startup time. */ - private[cluster] def seedNodes: IndexedSeq[Address] = SeedNodes + private[cluster] def joinSeedNodes(seedNodes: IndexedSeq[Address]): Unit = + clusterCore ! InternalClusterAction.JoinSeedNodes(seedNodes) /** * INTERNAL API. diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index ee8d1e0374..063fc54363 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -58,6 +58,12 @@ private[cluster] object InternalClusterAction { */ case class JoinTo(address: Address) extends ClusterMessage + /** + * Command to initiate the process to join the specified + * seed nodes. + */ + case class JoinSeedNodes(seedNodes: IndexedSeq[Address]) + /** * Start message of the process to join one of the seed nodes. * The node sends `InitJoin` to all seed nodes, which replies @@ -128,33 +134,21 @@ private[cluster] object ClusterLeaderAction { case class Remove(address: Address) extends ClusterMessage } -/** - * INTERNAL API - * - * The contextual pieces that ClusterDaemon actors need. - * Makes it easier to test the actors without using the Cluster extension. - */ -private[cluster] trait ClusterEnvironment { - private[cluster] def settings: ClusterSettings - private[cluster] def failureDetector: FailureDetector - private[cluster] def selfAddress: Address - private[cluster] def scheduler: Scheduler - private[cluster] def seedNodes: IndexedSeq[Address] - private[cluster] def shutdown(): Unit -} - /** * INTERNAL API. * * Supervisor managing the different Cluster daemons. */ -private[cluster] final class ClusterDaemon(environment: ClusterEnvironment) extends Actor with ActorLogging { +private[cluster] final class ClusterDaemon extends Actor with ActorLogging { - val configuredDispatcher = environment.settings.UseDispatcher - val core = context.actorOf(Props(new ClusterCoreDaemon(environment)). - withDispatcher(configuredDispatcher), name = "core") - val heartbeat = context.actorOf(Props(new ClusterHeartbeatDaemon(environment)). - withDispatcher(configuredDispatcher), name = "heartbeat") + // Important - don't use Cluster(context.system) here because that would + // cause deadlock. The Cluster extension is currently being created and is waiting + // for response from GetClusterCoreRef in its constructor. + + val core = context.actorOf(Props[ClusterCoreDaemon]. + withDispatcher(context.props.dispatcher), name = "core") + val heartbeat = context.actorOf(Props[ClusterHeartbeatDaemon]. + withDispatcher(context.props.dispatcher), name = "heartbeat") def receive = { case InternalClusterAction.GetClusterCoreRef ⇒ sender ! core @@ -165,16 +159,16 @@ private[cluster] final class ClusterDaemon(environment: ClusterEnvironment) exte /** * INTERNAL API. */ -private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) extends Actor with ActorLogging { +private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging { import ClusterLeaderAction._ import InternalClusterAction._ import ClusterHeartbeatSender._ - def selfAddress = environment.selfAddress - def clusterScheduler = environment.scheduler - def failureDetector = environment.failureDetector - val settings = environment.settings - import settings._ + val cluster = Cluster(context.system) + def selfAddress = cluster.selfAddress + def clusterScheduler = cluster.scheduler + def failureDetector = cluster.failureDetector + import cluster.settings._ val vclockNode = VectorClock.Node(selfAddress.toString) val selfHeartbeat = Heartbeat(selfAddress) @@ -186,11 +180,11 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) var stats = ClusterStats() - val heartbeatSender = context.actorOf(Props(new ClusterHeartbeatSender(environment)). + val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender]. withDispatcher(UseDispatcher), name = "heartbeatSender") - val coreSender = context.actorOf(Props(new ClusterCoreSender(selfAddress)). + val coreSender = context.actorOf(Props[ClusterCoreSender]. withDispatcher(UseDispatcher), name = "coreSender") - val publisher = context.actorOf(Props(new ClusterDomainEventPublisher(environment)). + val publisher = context.actorOf(Props[ClusterDomainEventPublisher]. withDispatcher(UseDispatcher), name = "publisher") import context.dispatcher @@ -227,14 +221,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) }) override def preStart(): Unit = { - if (AutoJoin) { - // only the node which is named first in the list of seed nodes will join itself - if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress) - self ! JoinTo(selfAddress) - else - context.actorOf(Props(new JoinSeedNodeProcess(environment)). - withDispatcher(UseDispatcher), name = "joinSeedNodeProcess") - } + if (AutoJoin) self ! JoinSeedNodes(SeedNodes) } override def postStop(): Unit = { @@ -248,6 +235,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) def uninitialized: Actor.Receive = { case InitJoin ⇒ // skip, not ready yet case JoinTo(address) ⇒ join(address) + case JoinSeedNodes(seedNodes) ⇒ joinSeedNodes(seedNodes) case msg: SubscriptionMessage ⇒ publisher forward msg case _: Tick ⇒ // ignore periodic tasks until initialized } @@ -282,6 +270,15 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) def initJoin(): Unit = sender ! InitJoinAck(selfAddress) + def joinSeedNodes(seedNodes: IndexedSeq[Address]): Unit = { + // only the node which is named first in the list of seed nodes will join itself + if (seedNodes.isEmpty || seedNodes.head == selfAddress) + self ! JoinTo(selfAddress) + else + context.actorOf(Props(new JoinSeedNodeProcess(seedNodes)). + withDispatcher(UseDispatcher), name = "joinSeedNodeProcess") + } + /** * Try to join this cluster node with the node specified by 'address'. * A 'Join(thisNodeAddress)' command is sent to the node to join. @@ -393,7 +390,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) // make sure the final (removed) state is published // before shutting down implicit val timeout = Timeout(5 seconds) - publisher ? PublishDone onComplete { case _ ⇒ environment.shutdown() } + publisher ? PublishDone onComplete { case _ ⇒ cluster.shutdown() } } /** @@ -796,8 +793,6 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) } } - def seedNodes: IndexedSeq[Address] = environment.seedNodes - def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] = if (addresses.isEmpty) None else Some(addresses(ThreadLocalRandom.current nextInt addresses.size)) @@ -865,22 +860,22 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) * 5. seed3 retries the join procedure and gets acks from seed2 first, and then joins to seed2 * */ -private[cluster] final class JoinSeedNodeProcess(environment: ClusterEnvironment) extends Actor with ActorLogging { +private[cluster] final class JoinSeedNodeProcess(seedNodes: IndexedSeq[Address]) extends Actor with ActorLogging { import InternalClusterAction._ - def selfAddress = environment.selfAddress + def selfAddress = Cluster(context.system).selfAddress - if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress) + if (seedNodes.isEmpty || seedNodes.head == selfAddress) throw new IllegalArgumentException("Join seed node should not be done") - context.setReceiveTimeout(environment.settings.SeedNodeTimeout) + context.setReceiveTimeout(Cluster(context.system).settings.SeedNodeTimeout) override def preStart(): Unit = self ! JoinSeedNode def receive = { case JoinSeedNode ⇒ // send InitJoin to all seed nodes (except myself) - environment.seedNodes.collect { + seedNodes.collect { case a if a != selfAddress ⇒ context.system.actorFor(context.parent.path.toStringWithAddress(a)) } foreach { _ ! InitJoin } case InitJoinAck(address) ⇒ @@ -901,9 +896,11 @@ private[cluster] final class JoinSeedNodeProcess(environment: ClusterEnvironment /** * INTERNAL API. */ -private[cluster] final class ClusterCoreSender(selfAddress: Address) extends Actor with ActorLogging { +private[cluster] final class ClusterCoreSender extends Actor with ActorLogging { import InternalClusterAction._ + val selfAddress = Cluster(context.system).selfAddress + /** * Looks up and returns the remote cluster command connection for the specific address. */ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index cbcee78cc9..ae1bd6b6ac 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -169,7 +169,7 @@ object ClusterEvent { * Responsible for domain event subscriptions and publishing of * domain events to event bus. */ -private[cluster] final class ClusterDomainEventPublisher(environment: ClusterEnvironment) extends Actor with ActorLogging { +private[cluster] final class ClusterDomainEventPublisher extends Actor with ActorLogging { import InternalClusterAction._ var latestGossip: Gossip = Gossip() diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index 932be34d2f..a7b4d52daa 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -23,10 +23,12 @@ case class Heartbeat(from: Address) extends ClusterMessage * Instantiated as a single instance for each Cluster - e.g. heartbeats are serialized * to Cluster message after message, but concurrent with other types of messages. */ -private[cluster] final class ClusterHeartbeatDaemon(environment: ClusterEnvironment) extends Actor with ActorLogging { +private[cluster] final class ClusterHeartbeatDaemon extends Actor with ActorLogging { + + val failureDetector = Cluster(context.system).failureDetector def receive = { - case Heartbeat(from) ⇒ environment.failureDetector heartbeat from + case Heartbeat(from) ⇒ failureDetector heartbeat from } } @@ -53,7 +55,7 @@ private[cluster] object ClusterHeartbeatSender { * address and thereby reduce the risk of irregular heartbeats to healty * nodes due to broken connections to other nodes. */ -private[cluster] final class ClusterHeartbeatSender(environment: ClusterEnvironment) extends Actor with ActorLogging { +private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging { import ClusterHeartbeatSender._ /** @@ -78,8 +80,7 @@ private[cluster] final class ClusterHeartbeatSender(environment: ClusterEnvironm val workerName = encodeChildName(to.toString) val worker = context.actorFor(workerName) match { case notFound if notFound.isTerminated ⇒ - context.actorOf(Props(new ClusterHeartbeatSenderWorker( - environment.settings.SendCircuitBreakerSettings, clusterHeartbeatConnectionFor(to))), workerName) + context.actorOf(Props(new ClusterHeartbeatSenderWorker(clusterHeartbeatConnectionFor(to))), workerName) case child ⇒ child } worker ! msg @@ -96,17 +97,19 @@ private[cluster] final class ClusterHeartbeatSender(environment: ClusterEnvironm * * @see ClusterHeartbeatSender */ -private[cluster] final class ClusterHeartbeatSenderWorker( - cbSettings: CircuitBreakerSettings, toRef: ActorRef) +private[cluster] final class ClusterHeartbeatSenderWorker(toRef: ActorRef) extends Actor with ActorLogging { import ClusterHeartbeatSender._ - val breaker = CircuitBreaker(context.system.scheduler, - cbSettings.maxFailures, cbSettings.callTimeout, cbSettings.resetTimeout). - onHalfOpen(log.debug("CircuitBreaker Half-Open for: [{}]", toRef)). - onOpen(log.debug("CircuitBreaker Open for [{}]", toRef)). - onClose(log.debug("CircuitBreaker Closed for [{}]", toRef)) + val breaker = { + val cbSettings = Cluster(context.system).settings.SendCircuitBreakerSettings + CircuitBreaker(context.system.scheduler, + cbSettings.maxFailures, cbSettings.callTimeout, cbSettings.resetTimeout). + onHalfOpen(log.debug("CircuitBreaker Half-Open for: [{}]", toRef)). + onOpen(log.debug("CircuitBreaker Open for [{}]", toRef)). + onClose(log.debug("CircuitBreaker Closed for [{}]", toRef)) + } // make sure it will cleanup when not used any more context.setReceiveTimeout(30 seconds) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index e0440394a7..49483d39ef 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -9,30 +9,32 @@ import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.actor.Address -object ClientDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { +case class ClientDowningNodeThatIsUnreachableMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") val fourth = role("fourth") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) } -class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy -class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy -class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy -class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy +class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) +class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) +class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) +class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) -class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) +class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) +class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) +class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) -abstract class ClientDowningNodeThatIsUnreachableSpec - extends MultiNodeSpec(ClientDowningNodeThatIsUnreachableMultiJvmSpec) +abstract class ClientDowningNodeThatIsUnreachableSpec(multiNodeConfig: ClientDowningNodeThatIsUnreachableMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) with MultiNodeClusterSpec { - import ClientDowningNodeThatIsUnreachableMultiJvmSpec._ + def this(failureDetectorPuppet: Boolean) = this(ClientDowningNodeThatIsUnreachableMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ "Client of a 4 node cluster" must { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala index 82d90c81b5..5a7308ec92 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -9,30 +9,32 @@ import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.actor.Address -object ClientDowningNodeThatIsUpMultiJvmSpec extends MultiNodeConfig { +case class ClientDowningNodeThatIsUpMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") val fourth = role("fourth") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) } -class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy -class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy -class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy -class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy +class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = true) +class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = true) +class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = true) +class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = true) -class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = false) +class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = false) +class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = false) +class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = false) -abstract class ClientDowningNodeThatIsUpSpec - extends MultiNodeSpec(ClientDowningNodeThatIsUpMultiJvmSpec) +abstract class ClientDowningNodeThatIsUpSpec(multiNodeConfig: ClientDowningNodeThatIsUpMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) with MultiNodeClusterSpec { - import ClientDowningNodeThatIsUpMultiJvmSpec._ + def this(failureDetectorPuppet: Boolean) = this(ClientDowningNodeThatIsUpMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ "Client of a 4 node cluster" must { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala index 89ad712815..97d1e870a5 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala @@ -19,9 +19,9 @@ object ClusterAccrualFailureDetectorMultiJvmSpec extends MultiNodeConfig { withFallback(MultiNodeClusterSpec.clusterConfig)) } -class ClusterAccrualFailureDetectorMultiJvmNode1 extends ClusterAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy -class ClusterAccrualFailureDetectorMultiJvmNode2 extends ClusterAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy -class ClusterAccrualFailureDetectorMultiJvmNode3 extends ClusterAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy +class ClusterAccrualFailureDetectorMultiJvmNode1 extends ClusterAccrualFailureDetectorSpec +class ClusterAccrualFailureDetectorMultiJvmNode2 extends ClusterAccrualFailureDetectorSpec +class ClusterAccrualFailureDetectorMultiJvmNode3 extends ClusterAccrualFailureDetectorSpec abstract class ClusterAccrualFailureDetectorSpec extends MultiNodeSpec(ClusterAccrualFailureDetectorMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala index 1862b8ea40..420b2acacb 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala @@ -12,7 +12,7 @@ import akka.testkit._ import scala.concurrent.util.duration._ import akka.actor.Address -object ConvergenceMultiJvmSpec extends MultiNodeConfig { +case class ConvergenceMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") @@ -20,24 +20,26 @@ object ConvergenceMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString("akka.cluster.failure-detector.threshold = 4")). - withFallback(MultiNodeClusterSpec.clusterConfig)) + withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) } -class ConvergenceWithFailureDetectorPuppetMultiJvmNode1 extends ConvergenceSpec with FailureDetectorPuppetStrategy -class ConvergenceWithFailureDetectorPuppetMultiJvmNode2 extends ConvergenceSpec with FailureDetectorPuppetStrategy -class ConvergenceWithFailureDetectorPuppetMultiJvmNode3 extends ConvergenceSpec with FailureDetectorPuppetStrategy -class ConvergenceWithFailureDetectorPuppetMultiJvmNode4 extends ConvergenceSpec with FailureDetectorPuppetStrategy +class ConvergenceWithFailureDetectorPuppetMultiJvmNode1 extends ConvergenceSpec(failureDetectorPuppet = true) +class ConvergenceWithFailureDetectorPuppetMultiJvmNode2 extends ConvergenceSpec(failureDetectorPuppet = true) +class ConvergenceWithFailureDetectorPuppetMultiJvmNode3 extends ConvergenceSpec(failureDetectorPuppet = true) +class ConvergenceWithFailureDetectorPuppetMultiJvmNode4 extends ConvergenceSpec(failureDetectorPuppet = true) -class ConvergenceWithAccrualFailureDetectorMultiJvmNode1 extends ConvergenceSpec with AccrualFailureDetectorStrategy -class ConvergenceWithAccrualFailureDetectorMultiJvmNode2 extends ConvergenceSpec with AccrualFailureDetectorStrategy -class ConvergenceWithAccrualFailureDetectorMultiJvmNode3 extends ConvergenceSpec with AccrualFailureDetectorStrategy -class ConvergenceWithAccrualFailureDetectorMultiJvmNode4 extends ConvergenceSpec with AccrualFailureDetectorStrategy +class ConvergenceWithAccrualFailureDetectorMultiJvmNode1 extends ConvergenceSpec(failureDetectorPuppet = false) +class ConvergenceWithAccrualFailureDetectorMultiJvmNode2 extends ConvergenceSpec(failureDetectorPuppet = false) +class ConvergenceWithAccrualFailureDetectorMultiJvmNode3 extends ConvergenceSpec(failureDetectorPuppet = false) +class ConvergenceWithAccrualFailureDetectorMultiJvmNode4 extends ConvergenceSpec(failureDetectorPuppet = false) -abstract class ConvergenceSpec - extends MultiNodeSpec(ConvergenceMultiJvmSpec) +abstract class ConvergenceSpec(multiNodeConfig: ConvergenceMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) with MultiNodeClusterSpec { - import ConvergenceMultiJvmSpec._ + def this(failureDetectorPuppet: Boolean) = this(ConvergenceMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ "A cluster of 3 members" must { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/FailureDetectorStrategy.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/FailureDetectorStrategy.scala deleted file mode 100644 index 86e03f9457..0000000000 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/FailureDetectorStrategy.scala +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.cluster - -import akka.actor.Address -import akka.remote.testkit.MultiNodeSpec -import akka.testkit._ - -/** - * Base trait for all failure detector strategies. - */ -trait FailureDetectorStrategy { - - /** - * Get or create the FailureDetector to be used in the cluster node. - * To be defined by subclass. - */ - def failureDetector: FailureDetector - - /** - * Marks a node as available in the failure detector. - * To be defined by subclass. - */ - def markNodeAsAvailable(address: Address): Unit - - /** - * Marks a node as unavailable in the failure detector. - * To be defined by subclass. - */ - def markNodeAsUnavailable(address: Address): Unit -} - -/** - * Defines a FailureDetectorPuppet-based FailureDetectorStrategy. - */ -trait FailureDetectorPuppetStrategy extends FailureDetectorStrategy { self: MultiNodeSpec ⇒ - - /** - * The puppet instance. Separated from 'failureDetector' field so we don't have to cast when using the puppet specific methods. - */ - private val puppet = new FailureDetectorPuppet(system) - - override def failureDetector: FailureDetector = puppet - - override def markNodeAsAvailable(address: Address): Unit = puppet markNodeAsAvailable address - - override def markNodeAsUnavailable(address: Address): Unit = puppet markNodeAsUnavailable address -} - -/** - * Defines a AccrualFailureDetector-based FailureDetectorStrategy. - */ -trait AccrualFailureDetectorStrategy extends FailureDetectorStrategy { self: MultiNodeSpec ⇒ - - override val failureDetector: FailureDetector = new AccrualFailureDetector(system, new ClusterSettings(system.settings.config, system.name)) - - override def markNodeAsAvailable(address: Address): Unit = () - - override def markNodeAsUnavailable(address: Address): Unit = () -} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinInProgressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinInProgressSpec.scala index 61636f6358..e198694aab 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinInProgressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinInProgressSpec.scala @@ -29,8 +29,8 @@ object JoinInProgressMultiJvmSpec extends MultiNodeConfig { .withFallback(MultiNodeClusterSpec.clusterConfig))) } -class JoinInProgressMultiJvmNode1 extends JoinInProgressSpec with AccrualFailureDetectorStrategy -class JoinInProgressMultiJvmNode2 extends JoinInProgressSpec with AccrualFailureDetectorStrategy +class JoinInProgressMultiJvmNode1 extends JoinInProgressSpec +class JoinInProgressMultiJvmNode2 extends JoinInProgressSpec abstract class JoinInProgressSpec extends MultiNodeSpec(JoinInProgressMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala index 10d98cd86b..1391b80127 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala @@ -9,6 +9,7 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import scala.concurrent.util.duration._ +import akka.actor.Address object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig { val seed1 = role("seed1") @@ -18,15 +19,15 @@ object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig { val ordinary2 = role("ordinary2") commonConfig(debugConfig(on = false). - withFallback(ConfigFactory.parseString("akka.cluster.auto-join = on")). + withFallback(ConfigFactory.parseString("akka.cluster.auto-join = off")). withFallback(MultiNodeClusterSpec.clusterConfig)) } -class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy -class JoinSeedNodeMultiJvmNode2 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy -class JoinSeedNodeMultiJvmNode3 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy -class JoinSeedNodeMultiJvmNode4 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy -class JoinSeedNodeMultiJvmNode5 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy +class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec +class JoinSeedNodeMultiJvmNode2 extends JoinSeedNodeSpec +class JoinSeedNodeMultiJvmNode3 extends JoinSeedNodeSpec +class JoinSeedNodeMultiJvmNode4 extends JoinSeedNodeSpec +class JoinSeedNodeMultiJvmNode5 extends JoinSeedNodeSpec abstract class JoinSeedNodeSpec extends MultiNodeSpec(JoinSeedNodeMultiJvmSpec) @@ -34,9 +35,9 @@ abstract class JoinSeedNodeSpec import JoinSeedNodeMultiJvmSpec._ - override def seedNodes = IndexedSeq(seed1, seed2, seed3) + def seedNodes: IndexedSeq[Address] = IndexedSeq(seed1, seed2, seed3) - "A cluster with configured seed nodes" must { + "A cluster with seed nodes" must { "be able to start the seed nodes concurrently" taggedAs LongRunningTest in { runOn(seed1) { @@ -45,12 +46,16 @@ abstract class JoinSeedNodeSpec } runOn(seed1, seed2, seed3) { + cluster.joinSeedNodes(seedNodes) awaitUpConvergence(3) } enterBarrier("after-1") } - "join the seed nodes at startup" taggedAs LongRunningTest in { + "join the seed nodes" taggedAs LongRunningTest in { + runOn(ordinary1, ordinary2) { + cluster.joinSeedNodes(seedNodes) + } awaitUpConvergence(roles.size) enterBarrier("after-2") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala index d34a48f48e..efa9e3f25d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala @@ -17,15 +17,15 @@ object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig { val c1 = role("c1") val c2 = role("c2") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy -class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy -class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy -class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy -class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy -class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy +class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala index dd880a76d8..0f0bc0b429 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala @@ -70,11 +70,11 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig { """)) } -class LargeClusterMultiJvmNode1 extends LargeClusterSpec with AccrualFailureDetectorStrategy -class LargeClusterMultiJvmNode2 extends LargeClusterSpec with AccrualFailureDetectorStrategy -class LargeClusterMultiJvmNode3 extends LargeClusterSpec with AccrualFailureDetectorStrategy -class LargeClusterMultiJvmNode4 extends LargeClusterSpec with AccrualFailureDetectorStrategy -class LargeClusterMultiJvmNode5 extends LargeClusterSpec with AccrualFailureDetectorStrategy +class LargeClusterMultiJvmNode1 extends LargeClusterSpec +class LargeClusterMultiJvmNode2 extends LargeClusterSpec +class LargeClusterMultiJvmNode3 extends LargeClusterSpec +class LargeClusterMultiJvmNode4 extends LargeClusterSpec +class LargeClusterMultiJvmNode5 extends LargeClusterSpec abstract class LargeClusterSpec extends MultiNodeSpec(LargeClusterMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala index 2ae0a79483..01bacf9b14 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala @@ -12,7 +12,7 @@ import akka.testkit._ import akka.actor._ import scala.concurrent.util.duration._ -object LeaderDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { +case class LeaderDowningNodeThatIsUnreachableMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") @@ -20,24 +20,26 @@ object LeaderDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString("akka.cluster.auto-down = on")). - withFallback(MultiNodeClusterSpec.clusterConfig)) + withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) } -class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy -class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy -class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy -class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy +class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) +class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) +class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) +class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) -class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy -class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy -class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy -class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy +class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) +class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) +class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) +class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) -abstract class LeaderDowningNodeThatIsUnreachableSpec - extends MultiNodeSpec(LeaderDowningNodeThatIsUnreachableMultiJvmSpec) +abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDowningNodeThatIsUnreachableMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) with MultiNodeClusterSpec { - import LeaderDowningNodeThatIsUnreachableMultiJvmSpec._ + def this(failureDetectorPuppet: Boolean) = this(LeaderDowningNodeThatIsUnreachableMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ "The Leader in a 4 node cluster" must { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index 1a657b3da8..8c2198dd7b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -9,33 +9,35 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ -object LeaderElectionMultiJvmSpec extends MultiNodeConfig { +case class LeaderElectionMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val controller = role("controller") val first = role("first") val second = role("second") val third = role("third") val fourth = role("fourth") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) } -class LeaderElectionWithFailureDetectorPuppetMultiJvmNode1 extends LeaderElectionSpec with FailureDetectorPuppetStrategy -class LeaderElectionWithFailureDetectorPuppetMultiJvmNode2 extends LeaderElectionSpec with FailureDetectorPuppetStrategy -class LeaderElectionWithFailureDetectorPuppetMultiJvmNode3 extends LeaderElectionSpec with FailureDetectorPuppetStrategy -class LeaderElectionWithFailureDetectorPuppetMultiJvmNode4 extends LeaderElectionSpec with FailureDetectorPuppetStrategy -class LeaderElectionWithFailureDetectorPuppetMultiJvmNode5 extends LeaderElectionSpec with FailureDetectorPuppetStrategy +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode1 extends LeaderElectionSpec(failureDetectorPuppet = true) +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode2 extends LeaderElectionSpec(failureDetectorPuppet = true) +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode3 extends LeaderElectionSpec(failureDetectorPuppet = true) +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode4 extends LeaderElectionSpec(failureDetectorPuppet = true) +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode5 extends LeaderElectionSpec(failureDetectorPuppet = true) -class LeaderElectionWithAccrualFailureDetectorMultiJvmNode1 extends LeaderElectionSpec with AccrualFailureDetectorStrategy -class LeaderElectionWithAccrualFailureDetectorMultiJvmNode2 extends LeaderElectionSpec with AccrualFailureDetectorStrategy -class LeaderElectionWithAccrualFailureDetectorMultiJvmNode3 extends LeaderElectionSpec with AccrualFailureDetectorStrategy -class LeaderElectionWithAccrualFailureDetectorMultiJvmNode4 extends LeaderElectionSpec with AccrualFailureDetectorStrategy -class LeaderElectionWithAccrualFailureDetectorMultiJvmNode5 extends LeaderElectionSpec with AccrualFailureDetectorStrategy +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode1 extends LeaderElectionSpec(failureDetectorPuppet = false) +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode2 extends LeaderElectionSpec(failureDetectorPuppet = false) +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode3 extends LeaderElectionSpec(failureDetectorPuppet = false) +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode4 extends LeaderElectionSpec(failureDetectorPuppet = false) +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode5 extends LeaderElectionSpec(failureDetectorPuppet = false) -abstract class LeaderElectionSpec - extends MultiNodeSpec(LeaderElectionMultiJvmSpec) +abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) with MultiNodeClusterSpec { - import LeaderElectionMultiJvmSpec._ + def this(failureDetectorPuppet: Boolean) = this(LeaderElectionMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ // sorted in the order used by the cluster lazy val sortedRoles = Seq(first, second, third, fourth).sorted diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala index 15e308cafb..eb0aa40591 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala @@ -23,12 +23,12 @@ object LeaderLeavingMultiJvmSpec extends MultiNodeConfig { .withFallback(ConfigFactory.parseString(""" # turn off unreachable reaper akka.cluster.unreachable-nodes-reaper-interval = 300 s""") - .withFallback(MultiNodeClusterSpec.clusterConfig))) + .withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))) } -class LeaderLeavingMultiJvmNode1 extends LeaderLeavingSpec with FailureDetectorPuppetStrategy -class LeaderLeavingMultiJvmNode2 extends LeaderLeavingSpec with FailureDetectorPuppetStrategy -class LeaderLeavingMultiJvmNode3 extends LeaderLeavingSpec with FailureDetectorPuppetStrategy +class LeaderLeavingMultiJvmNode1 extends LeaderLeavingSpec +class LeaderLeavingMultiJvmNode2 extends LeaderLeavingSpec +class LeaderLeavingMultiJvmNode3 extends LeaderLeavingSpec abstract class LeaderLeavingSpec extends MultiNodeSpec(LeaderLeavingMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala index 1d50fe0d37..afeec13d9e 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala @@ -26,12 +26,12 @@ object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig { unreachable-nodes-reaper-interval = 300 s # turn "off" reaping to unreachable node set } """) - .withFallback(MultiNodeClusterSpec.clusterConfig))) + .withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))) } -class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListenerExitingSpec with FailureDetectorPuppetStrategy -class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec with FailureDetectorPuppetStrategy -class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListenerExitingSpec +class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec +class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec abstract class MembershipChangeListenerExitingSpec extends MultiNodeSpec(MembershipChangeListenerExitingMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala index 7c14af7203..b2c69c7c17 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala @@ -17,11 +17,11 @@ object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec with FailureDetectorPuppetStrategy -class MembershipChangeListenerJoinMultiJvmNode2 extends MembershipChangeListenerJoinSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec +class MembershipChangeListenerJoinMultiJvmNode2 extends MembershipChangeListenerJoinSpec abstract class MembershipChangeListenerJoinSpec extends MultiNodeSpec(MembershipChangeListenerJoinMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala index b1a7663154..835c3d722e 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala @@ -24,12 +24,12 @@ object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig { .withFallback(ConfigFactory.parseString(""" akka.cluster.unreachable-nodes-reaper-interval = 300 s # turn "off" """)) - .withFallback(MultiNodeClusterSpec.clusterConfig)) + .withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListenerLeavingSpec with FailureDetectorPuppetStrategy -class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec with FailureDetectorPuppetStrategy -class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListenerLeavingSpec +class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec +class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec abstract class MembershipChangeListenerLeavingSpec extends MultiNodeSpec(MembershipChangeListenerLeavingMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala index c44e61df57..f4faf4234b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala @@ -16,12 +16,12 @@ object MembershipChangeListenerUpMultiJvmSpec extends MultiNodeConfig { val second = role("second") val third = role("third") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class MembershipChangeListenerUpMultiJvmNode1 extends MembershipChangeListenerUpSpec with FailureDetectorPuppetStrategy -class MembershipChangeListenerUpMultiJvmNode2 extends MembershipChangeListenerUpSpec with FailureDetectorPuppetStrategy -class MembershipChangeListenerUpMultiJvmNode3 extends MembershipChangeListenerUpSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerUpMultiJvmNode1 extends MembershipChangeListenerUpSpec +class MembershipChangeListenerUpMultiJvmNode2 extends MembershipChangeListenerUpSpec +class MembershipChangeListenerUpMultiJvmNode3 extends MembershipChangeListenerUpSpec abstract class MembershipChangeListenerUpSpec extends MultiNodeSpec(MembershipChangeListenerUpMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 93c1f921ae..3b593ee87c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -20,6 +20,14 @@ import akka.actor.ActorPath import akka.actor.RootActorPath object MultiNodeClusterSpec { + + def clusterConfigWithFailureDetectorPuppet: Config = + ConfigFactory.parseString("akka.cluster.failure-detector.implementation-class = akka.cluster.FailureDetectorPuppet"). + withFallback(clusterConfig) + + def clusterConfig(failureDetectorPuppet: Boolean): Config = + if (failureDetectorPuppet) clusterConfigWithFailureDetectorPuppet else clusterConfig + def clusterConfig: Config = ConfigFactory.parseString(""" akka.cluster { auto-join = on @@ -31,13 +39,14 @@ object MultiNodeClusterSpec { periodic-tasks-initial-delay = 300 ms publish-stats-interval = 0 s # always, when it happens } + akka.remote.log-remote-lifecycle-events = off akka.test { single-expect-default = 5 s } """) } -trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: MultiNodeSpec ⇒ +trait MultiNodeClusterSpec extends Suite { self: MultiNodeSpec ⇒ override def initialParticipants = roles.size @@ -80,29 +89,12 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu throw t } - /** - * Make it possible to override/configure seedNodes from tests without - * specifying in config. Addresses are unknown before startup time. - */ - protected def seedNodes: IndexedSeq[RoleName] = IndexedSeq.empty - - /** - * The cluster node instance. Needs to be lazily created. - */ - private lazy val clusterNode = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) { - override def seedNodes: IndexedSeq[Address] = { - val testSeedNodes = MultiNodeClusterSpec.this.seedNodes - if (testSeedNodes.isEmpty) super.seedNodes - else testSeedNodes map address - } - } - def clusterView: ClusterReadView = cluster.readView /** * Get the cluster node to use. */ - def cluster: Cluster = clusterNode + def cluster: Cluster = Cluster(system) /** * Use this method for the initial startup of the cluster node. @@ -213,4 +205,24 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu def roleName(addr: Address): Option[RoleName] = roles.find(address(_) == addr) + /** + * Marks a node as available in the failure detector if + * [[akka.cluster.FailureDetectorPuppet]] is used as + * failure detector. + */ + def markNodeAsAvailable(address: Address): Unit = cluster.failureDetector match { + case puppet: FailureDetectorPuppet ⇒ puppet.markNodeAsAvailable(address) + case _ ⇒ + } + + /** + * Marks a node as unavailable in the failure detector if + * [[akka.cluster.FailureDetectorPuppet]] is used as + * failure detector. + */ + def markNodeAsUnavailable(address: Address): Unit = cluster.failureDetector match { + case puppet: FailureDetectorPuppet ⇒ puppet.markNodeAsUnavailable(address) + case _ ⇒ + } + } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala index 752857316b..3fec2f22ad 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala @@ -15,12 +15,12 @@ object NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec extends MultiNodeConfig val second = role("second") val third = role("third") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec with FailureDetectorPuppetStrategy -class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec with FailureDetectorPuppetStrategy -class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec with FailureDetectorPuppetStrategy +class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec +class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec +class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec abstract class NodeLeavingAndExitingAndBeingRemovedSpec extends MultiNodeSpec(NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala index 529866c433..0d0b45fd04 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala @@ -23,12 +23,12 @@ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig { .withFallback(ConfigFactory.parseString(""" # turn off unreachable reaper akka.cluster.unreachable-nodes-reaper-interval = 300 s""") - .withFallback(MultiNodeClusterSpec.clusterConfig))) + .withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))) } -class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec with FailureDetectorPuppetStrategy -class NodeLeavingAndExitingMultiJvmNode2 extends NodeLeavingAndExitingSpec with FailureDetectorPuppetStrategy -class NodeLeavingAndExitingMultiJvmNode3 extends NodeLeavingAndExitingSpec with FailureDetectorPuppetStrategy +class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec +class NodeLeavingAndExitingMultiJvmNode2 extends NodeLeavingAndExitingSpec +class NodeLeavingAndExitingMultiJvmNode3 extends NodeLeavingAndExitingSpec abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExitingMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala index 3df642afad..86e5dd71e9 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala @@ -13,12 +13,12 @@ object NodeMembershipMultiJvmSpec extends MultiNodeConfig { val second = role("second") val third = role("third") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec with FailureDetectorPuppetStrategy -class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec with FailureDetectorPuppetStrategy -class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec with FailureDetectorPuppetStrategy +class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec +class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec +class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala index 4318d0e79d..0b6cea8683 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala @@ -18,11 +18,11 @@ object NodeUpMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class NodeUpMultiJvmNode1 extends NodeUpSpec with FailureDetectorPuppetStrategy -class NodeUpMultiJvmNode2 extends NodeUpSpec with FailureDetectorPuppetStrategy +class NodeUpMultiJvmNode1 extends NodeUpSpec +class NodeUpMultiJvmNode2 extends NodeUpSpec abstract class NodeUpSpec extends MultiNodeSpec(NodeUpMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala index d044d90f72..d84ae89e5d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala @@ -9,7 +9,7 @@ import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import scala.concurrent.util.duration._ -object SingletonClusterMultiJvmSpec extends MultiNodeConfig { +case class SingletonClusterMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") val second = role("second") @@ -21,21 +21,23 @@ object SingletonClusterMultiJvmSpec extends MultiNodeConfig { failure-detector.threshold = 4 } """)). - withFallback(MultiNodeClusterSpec.clusterConfig)) + withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) } -class SingletonClusterWithFailureDetectorPuppetMultiJvmNode1 extends SingletonClusterSpec with FailureDetectorPuppetStrategy -class SingletonClusterWithFailureDetectorPuppetMultiJvmNode2 extends SingletonClusterSpec with FailureDetectorPuppetStrategy +class SingletonClusterWithFailureDetectorPuppetMultiJvmNode1 extends SingletonClusterSpec(failureDetectorPuppet = true) +class SingletonClusterWithFailureDetectorPuppetMultiJvmNode2 extends SingletonClusterSpec(failureDetectorPuppet = true) -class SingletonClusterWithAccrualFailureDetectorMultiJvmNode1 extends SingletonClusterSpec with AccrualFailureDetectorStrategy -class SingletonClusterWithAccrualFailureDetectorMultiJvmNode2 extends SingletonClusterSpec with AccrualFailureDetectorStrategy +class SingletonClusterWithAccrualFailureDetectorMultiJvmNode1 extends SingletonClusterSpec(failureDetectorPuppet = false) +class SingletonClusterWithAccrualFailureDetectorMultiJvmNode2 extends SingletonClusterSpec(failureDetectorPuppet = false) -abstract class SingletonClusterSpec - extends MultiNodeSpec(SingletonClusterMultiJvmSpec) +abstract class SingletonClusterSpec(multiNodeConfig: SingletonClusterMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) with MultiNodeClusterSpec { - import SingletonClusterMultiJvmSpec._ + def this(failureDetectorPuppet: Boolean) = this(SingletonClusterMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ "A cluster of 2 nodes" must { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala index d030734a71..22e3cae694 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala @@ -13,7 +13,7 @@ import scala.concurrent.util.duration._ import akka.actor.Address import akka.remote.testconductor.Direction -object SplitBrainMultiJvmSpec extends MultiNodeConfig { +case class SplitBrainMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") @@ -26,26 +26,28 @@ object SplitBrainMultiJvmSpec extends MultiNodeConfig { auto-down = on failure-detector.threshold = 4 }""")). - withFallback(MultiNodeClusterSpec.clusterConfig)) + withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) } -class SplitBrainWithFailureDetectorPuppetMultiJvmNode1 extends SplitBrainSpec with FailureDetectorPuppetStrategy -class SplitBrainWithFailureDetectorPuppetMultiJvmNode2 extends SplitBrainSpec with FailureDetectorPuppetStrategy -class SplitBrainWithFailureDetectorPuppetMultiJvmNode3 extends SplitBrainSpec with FailureDetectorPuppetStrategy -class SplitBrainWithFailureDetectorPuppetMultiJvmNode4 extends SplitBrainSpec with FailureDetectorPuppetStrategy -class SplitBrainWithFailureDetectorPuppetMultiJvmNode5 extends SplitBrainSpec with FailureDetectorPuppetStrategy +class SplitBrainWithFailureDetectorPuppetMultiJvmNode1 extends SplitBrainSpec(failureDetectorPuppet = true) +class SplitBrainWithFailureDetectorPuppetMultiJvmNode2 extends SplitBrainSpec(failureDetectorPuppet = true) +class SplitBrainWithFailureDetectorPuppetMultiJvmNode3 extends SplitBrainSpec(failureDetectorPuppet = true) +class SplitBrainWithFailureDetectorPuppetMultiJvmNode4 extends SplitBrainSpec(failureDetectorPuppet = true) +class SplitBrainWithFailureDetectorPuppetMultiJvmNode5 extends SplitBrainSpec(failureDetectorPuppet = true) -class SplitBrainWithAccrualFailureDetectorMultiJvmNode1 extends SplitBrainSpec with AccrualFailureDetectorStrategy -class SplitBrainWithAccrualFailureDetectorMultiJvmNode2 extends SplitBrainSpec with AccrualFailureDetectorStrategy -class SplitBrainWithAccrualFailureDetectorMultiJvmNode3 extends SplitBrainSpec with AccrualFailureDetectorStrategy -class SplitBrainWithAccrualFailureDetectorMultiJvmNode4 extends SplitBrainSpec with AccrualFailureDetectorStrategy -class SplitBrainWithAccrualFailureDetectorMultiJvmNode5 extends SplitBrainSpec with AccrualFailureDetectorStrategy +class SplitBrainWithAccrualFailureDetectorMultiJvmNode1 extends SplitBrainSpec(failureDetectorPuppet = false) +class SplitBrainWithAccrualFailureDetectorMultiJvmNode2 extends SplitBrainSpec(failureDetectorPuppet = false) +class SplitBrainWithAccrualFailureDetectorMultiJvmNode3 extends SplitBrainSpec(failureDetectorPuppet = false) +class SplitBrainWithAccrualFailureDetectorMultiJvmNode4 extends SplitBrainSpec(failureDetectorPuppet = false) +class SplitBrainWithAccrualFailureDetectorMultiJvmNode5 extends SplitBrainSpec(failureDetectorPuppet = false) -abstract class SplitBrainSpec - extends MultiNodeSpec(SplitBrainMultiJvmSpec) +abstract class SplitBrainSpec(multiNodeConfig: SplitBrainMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) with MultiNodeClusterSpec { - import SplitBrainMultiJvmSpec._ + def this(failureDetectorPuppet: Boolean) = this(SplitBrainMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ val side1 = IndexedSeq(first, second) val side2 = IndexedSeq(third, fourth, fifth) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index e1b3571bd2..ab521f4a5b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -31,11 +31,11 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { """)) } -class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy -class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy -class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy -class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy -class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy +class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec +class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec +class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec +class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec +class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec abstract class SunnyWeatherSpec extends MultiNodeSpec(SunnyWeatherMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index e3e446bd50..4df6032116 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -23,12 +23,12 @@ object TransitionMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString("akka.cluster.periodic-tasks-initial-delay = 300 s # turn off all periodic tasks")). - withFallback(MultiNodeClusterSpec.clusterConfig)) + withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class TransitionMultiJvmNode1 extends TransitionSpec with FailureDetectorPuppetStrategy -class TransitionMultiJvmNode2 extends TransitionSpec with FailureDetectorPuppetStrategy -class TransitionMultiJvmNode3 extends TransitionSpec with FailureDetectorPuppetStrategy +class TransitionMultiJvmNode1 extends TransitionSpec +class TransitionMultiJvmNode2 extends TransitionSpec +class TransitionMultiJvmNode3 extends TransitionSpec abstract class TransitionSpec extends MultiNodeSpec(TransitionMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala index ca5c8f3265..97e15c0e4d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -14,7 +14,7 @@ import akka.actor.Address import akka.remote.testconductor.{ RoleName, Direction } import scala.concurrent.util.duration._ -object UnreachableNodeRejoinsClusterMultiJvmSpec extends MultiNodeConfig { +case class UnreachableNodeRejoinsClusterMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") @@ -23,20 +23,23 @@ object UnreachableNodeRejoinsClusterMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy -class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy -class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy -class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy +class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true) +class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true) +class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true) +class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true) -class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy -class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy -class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy -class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy +class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = false) +class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = false) +class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = false) +class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = false) -abstract class UnreachableNodeRejoinsClusterSpec - extends MultiNodeSpec(UnreachableNodeRejoinsClusterMultiJvmSpec) +abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNodeRejoinsClusterMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) with MultiNodeClusterSpec { - import UnreachableNodeRejoinsClusterMultiJvmSpec._ + + def this(failureDetectorPuppet: Boolean) = this(UnreachableNodeRejoinsClusterMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ def allBut(role: RoleName, roles: Seq[RoleName] = roles): Seq[RoleName] = { roles.filterNot(_ == role) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 73364b853e..38ac359de4 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -26,8 +26,10 @@ object ClusterSpec { auto-down = off periodic-tasks-initial-delay = 120 seconds // turn off scheduled tasks publish-stats-interval = 0 s # always, when it happens + failure-detector.implementation-class = akka.cluster.FailureDetectorPuppet } akka.actor.provider = "akka.remote.RemoteActorRefProvider" + akka.remote.log-remote-lifecycle-events = off akka.remote.netty.port = 0 # akka.loglevel = DEBUG """ @@ -41,9 +43,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { val selfAddress = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.address - val failureDetector = new FailureDetectorPuppet(system) - - val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) + val cluster = Cluster(system) def clusterView = cluster.readView def leaderActions(): Unit = { diff --git a/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala index 9ddc9942b0..869df97f84 100644 --- a/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala +++ b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala @@ -6,6 +6,7 @@ package akka.cluster import akka.actor.{ Address, ActorSystem } import akka.event.{ Logging, LogSource } +import akka.remote.testkit.MultiNodeConfig /** * User controllable "puppet" failure detector. @@ -13,8 +14,6 @@ import akka.event.{ Logging, LogSource } class FailureDetectorPuppet(system: ActorSystem, settings: ClusterSettings) extends FailureDetector { import java.util.concurrent.ConcurrentHashMap - def this(system: ActorSystem) = this(system, new ClusterSettings(system.settings.config, system.name)) - trait Status object Up extends Status object Down extends Status @@ -63,3 +62,4 @@ class FailureDetectorPuppet(system: ActorSystem, settings: ClusterSettings) exte connections.clear() } } + From d895806b6e908ecd8973ccdd2e1af2234c15acbf Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 7 Sep 2012 15:48:39 +0200 Subject: [PATCH 03/14] Switching to autoselecting scala version for the artifact of akka-cluster --- akka-docs/cluster/cluster-usage.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-docs/cluster/cluster-usage.rst b/akka-docs/cluster/cluster-usage.rst index 7bf4e754db..0b72dbce70 100644 --- a/akka-docs/cluster/cluster-usage.rst +++ b/akka-docs/cluster/cluster-usage.rst @@ -18,7 +18,7 @@ The Akka cluster is a separate jar file. Make sure that you have the following d .. parsed-literal:: - "com.typesafe.akka" % "akka-cluster_|scalaVersion|" % "2.1-SNAPSHOT" + "com.typesafe.akka" %% "akka-cluster" % "2.1-SNAPSHOT" If you are using the latest nightly build you should pick a timestamped Akka version from ``_. From 5169630c4e6fff3399060d7927e3283c7d210e38 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 7 Sep 2012 16:03:37 +0200 Subject: [PATCH 04/14] #2427 - Hiding AkkaSpec from FSMDocSpec --- akka-docs/scala/code/docs/actor/FSMDocSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-docs/scala/code/docs/actor/FSMDocSpec.scala b/akka-docs/scala/code/docs/actor/FSMDocSpec.scala index ff9ae0cc14..5bc1ea8d70 100644 --- a/akka-docs/scala/code/docs/actor/FSMDocSpec.scala +++ b/akka-docs/scala/code/docs/actor/FSMDocSpec.scala @@ -5,11 +5,11 @@ package docs.actor import language.postfixOps +import akka.testkit.{ AkkaSpec ⇒ MyFavoriteTestFrameWorkPlusAkkaTestKit } //#test-code -import akka.testkit.AkkaSpec import akka.actor.Props -class FSMDocSpec extends AkkaSpec { +class FSMDocSpec extends MyFavoriteTestFrameWorkPlusAkkaTestKit { "simple finite state machine" must { //#fsm-code-elided From 4e60405708ea1dc92b9ce0419b2a12d4c1e9bc22 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 7 Sep 2012 16:14:04 +0200 Subject: [PATCH 05/14] #2476 - Fixing broken links to typesafe.com --- akka-docs/intro/getting-started.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/akka-docs/intro/getting-started.rst b/akka-docs/intro/getting-started.rst index 9fae5281a8..11b699fa8f 100644 --- a/akka-docs/intro/getting-started.rst +++ b/akka-docs/intro/getting-started.rst @@ -14,9 +14,9 @@ The best way to start learning Akka is to download the Typesafe Stack and either the Akka Getting Started Tutorials or check out one of Akka Template Projects. Both comes in several flavours depending on your development environment preferences. -- `Download Typesafe Stack `_ -- `Getting Started Tutorials `_ -- `Template Projects `_ +- `Download Typesafe Stack `_ +- `Getting Started Tutorials `_ +- `Template Projects `_ Download -------- From 79c9cdd98e934f0587f615fca954fe7c6645b582 Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 7 Sep 2012 16:56:42 +0200 Subject: [PATCH 06/14] unbreak the akka-actor-nightly build --- akka-actor/src/main/scala/akka/actor/FSM.scala | 2 +- akka-testkit/src/main/scala/akka/testkit/TestKit.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index a42066d0d2..51d0c290dc 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -597,7 +597,7 @@ trait FSM[S, D] extends Listeners with ActorLogging { val timeout = if (currentState.timeout.isDefined) currentState.timeout else stateTimeouts(currentState.stateName) if (timeout.isDefined) { val t = timeout.get - if (t.finite_? && t.length >= 0) { + if (t.isFinite && t.length >= 0) { import context.dispatcher timeoutFuture = Some(context.system.scheduler.scheduleOnce(t, self, TimeoutMarker(generation))) } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index e52fe741ea..a1721d2ffe 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -568,7 +568,7 @@ trait TestKitBase { val message = if (max == 0.seconds) { queue.pollFirst - } else if (max.finite_?) { + } else if (max.isFinite) { queue.pollFirst(max.length, max.unit) } else { queue.takeFirst From 83e7f5d6d626cd9a4eea6b5d191325e7c58afee0 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 7 Sep 2012 17:42:15 +0200 Subject: [PATCH 07/14] Incorparate review comments, see #2473 * Also added ClusterSettings in constructor of ClusterDaemon, because that will be needed to decide if the metrics actor is to be started --- .../src/main/scala/akka/cluster/Cluster.scala | 2 +- .../main/scala/akka/cluster/ClusterDaemon.scala | 16 +++++++--------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index bb027afade..6d06664004 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -138,7 +138,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { // create supervisor for daemons under path "/system/cluster" private val clusterDaemons: ActorRef = { - system.asInstanceOf[ActorSystemImpl].systemActorOf(Props[ClusterDaemon]. + system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new ClusterDaemon(settings)). withDispatcher(UseDispatcher), name = "cluster") } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 063fc54363..f84bf100ea 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -139,7 +139,7 @@ private[cluster] object ClusterLeaderAction { * * Supervisor managing the different Cluster daemons. */ -private[cluster] final class ClusterDaemon extends Actor with ActorLogging { +private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Actor with ActorLogging { // Important - don't use Cluster(context.system) here because that would // cause deadlock. The Cluster extension is currently being created and is waiting @@ -165,9 +165,7 @@ private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging { import ClusterHeartbeatSender._ val cluster = Cluster(context.system) - def selfAddress = cluster.selfAddress - def clusterScheduler = cluster.scheduler - def failureDetector = cluster.failureDetector + import cluster.{ selfAddress, scheduler, failureDetector } import cluster.settings._ val vclockNode = VectorClock.Node(selfAddress.toString) @@ -191,32 +189,32 @@ private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging { // start periodic gossip to random nodes in cluster val gossipTask = - FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(GossipInterval), GossipInterval) { + FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(GossipInterval), GossipInterval) { self ! GossipTick } // start periodic heartbeat to all nodes in cluster val heartbeatTask = - FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval), HeartbeatInterval) { + FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval), HeartbeatInterval) { self ! HeartbeatTick } // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) val failureDetectorReaperTask = - FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval), UnreachableNodesReaperInterval) { + FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval), UnreachableNodesReaperInterval) { self ! ReapUnreachableTick } // start periodic leader action management (only applies for the current leader) private val leaderActionsTask = - FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval), LeaderActionsInterval) { + FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval), LeaderActionsInterval) { self ! LeaderActionsTick } // start periodic publish of current state private val publishStateTask: Option[Cancellable] = if (PublishStatsInterval == Duration.Zero) None - else Some(FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(PublishStatsInterval), PublishStatsInterval) { + else Some(FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(PublishStatsInterval), PublishStatsInterval) { self ! PublishStatsTick }) From ac55ce769306e5c5bf689ec15d0b1562821fe7bb Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 7 Sep 2012 18:03:46 +0200 Subject: [PATCH 08/14] #2432 - turning off smarty pants so that our quotes don't get mangled in code samples. --- akka-docs/_sphinx/pygments/setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/akka-docs/_sphinx/pygments/setup.py b/akka-docs/_sphinx/pygments/setup.py index 7c86a6a681..a8ef553e8f 100644 --- a/akka-docs/_sphinx/pygments/setup.py +++ b/akka-docs/_sphinx/pygments/setup.py @@ -15,5 +15,6 @@ setup( description = __doc__, author = "Akka", packages = ['styles'], - entry_points = entry_points + entry_points = entry_points, + html_use_smartypants = false ) From 0c7094f6af4e0ee592bb0d6e06be3d5e94b4eea2 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 7 Sep 2012 18:29:31 +0200 Subject: [PATCH 09/14] #2441 - Updating howto.rst to include all the summer of blog posts and removed the howdoi.rst --- akka-docs/general/howdoi.rst | 31 --------------- akka-docs/general/index.rst | 3 +- akka-docs/scala/howto.rst | 73 +++++++++++++++++++++++++++++++++--- 3 files changed, 69 insertions(+), 38 deletions(-) delete mode 100644 akka-docs/general/howdoi.rst diff --git a/akka-docs/general/howdoi.rst b/akka-docs/general/howdoi.rst deleted file mode 100644 index 0dd9a12ed9..0000000000 --- a/akka-docs/general/howdoi.rst +++ /dev/null @@ -1,31 +0,0 @@ -.. _howdoi: - -How do I … -================================ - -This section of the Akka Documentation tries to answer common usage questions. - -… deal with blocking third-party code? --------------------------------------- - -Some times you cannot avoid doing blocking, and in that case you might want to explore the following: - - 1. Isolate the blocking to a dedicated ``ExecutionContext`` - 2. Configure the actor to have a bounded-mailbox as to prevent from excessive mailbox sizes - -.. note:: - - Before you do anything at all, measure! - - -… use persistence with Akka? ----------------------------- - -You just use it? -You might want to have a look at the answer to the question about blocking though. - -… use a pool of connections/whatnots ------------------------------------- - -You most probably want to wrap that pooling service as an Akka Extension, -see the docs for documentation on Java / Scala Extensions. \ No newline at end of file diff --git a/akka-docs/general/index.rst b/akka-docs/general/index.rst index edfebff5c5..e76a81a80c 100644 --- a/akka-docs/general/index.rst +++ b/akka-docs/general/index.rst @@ -11,5 +11,4 @@ General remoting jmm message-send-semantics - configuration - howdoi + configuration \ No newline at end of file diff --git a/akka-docs/scala/howto.rst b/akka-docs/scala/howto.rst index c588c2fe25..bcaa456b82 100644 --- a/akka-docs/scala/howto.rst +++ b/akka-docs/scala/howto.rst @@ -19,7 +19,7 @@ Throttling Messages Contributed by: Kaspar Fischer -A message throttler that ensures that messages are not sent out at too high a rate. +"A message throttler that ensures that messages are not sent out at too high a rate." The pattern is described `here `_. @@ -28,9 +28,9 @@ Balancing Workload Across Nodes Contributed by: Derek Wyatt -Often times, people want the functionality of the BalancingDispatcher with the +"Often times, people want the functionality of the BalancingDispatcher with the stipulation that the Actors doing the work have distinct Mailboxes on remote -nodes. In this post we’ll explore the implementation of such a concept. +nodes. In this post we’ll explore the implementation of such a concept." The pattern is described `here `_. @@ -39,15 +39,78 @@ Ordered Termination Contributed by: Derek Wyatt -When an Actor stops, its children stop in an undefined order. Child termination is +"When an Actor stops, its children stop in an undefined order. Child termination is asynchronous and thus non-deterministic. If an Actor has children that have order dependencies, then you might need to ensure a particular shutdown order of those children so that their postStop() methods get -called in the right order. +called in the right order." The pattern is described `here `_. +Akka AMQP Proxies +================= + +Contributed by: Fabrice Drouin + +"“AMQP proxies” is a simple way of integrating AMQP with Akka to distribute jobs across a network of computing nodes. +You still write “local” code, have very little to configure, and end up with a distributed, elastic, +fault-tolerant grid where computing nodes can be written in nearly every programming language." + +The pattern is described `here `_. + +Shutdown Patterns in Akka 2 +=========================== + +Contributed by: Derek Wyatt + +“How do you tell Akka to shut down the ActorSystem when everything’s finished? +It turns out that there’s no magical flag for this, no configuration setting, no special callback you can register for, +and neither will the illustrious shutdown fairy grace your application with her glorious presence at that perfect moment. +She’s just plain mean. + +In this post, we’ll discuss why this is the case and provide you with a simple option for shutting down “at the right time”, +as well as a not-so-simple-option for doing the exact same thing." + +The pattern is described `here `_. + +Distributed (in-memory) graph processing with Akka +================================================== + +Contributed by: Adelbert Chang + +"Graphs have always been an interesting structure to study in both mathematics and computer science (among other fields), +and have become even more interesting in the context of online social networks such as Facebook and Twitter, +whose underlying network structures are nicely represented by graphs." + +The pattern is described `here `_. + +Case Study: An Auto-Updating Cache Using Actors +=============================================== + +Contributed by: Eric Pederson + +"We recently needed to build a caching system in front of a slow backend system with the following requirements: + +The data in the backend system is constantly being updated so the caches need to be updated every N minutes. +Requests to the backend system need to be throttled. +The caching system we built used Akka actors and Scala’s support for functions as first class objects." + +The pattern is described `here `_. + +Discovering message flows in actor systems with the Spider Pattern +================================================================== + +Contributed by: Raymond Roestenburg + +"Building actor systems is fun but debugging them can be difficult, you mostly end up browsing through many log files +on several machines to find out what’s going on. I’m sure you have browsed through logs and thought, +“Hey, where did that message go?”, “Why did this message cause that effect” or “Why did this actor never get a message?” + +This is where the Spider pattern comes in." + +The pattern is described `here `_. + Template Pattern ================ From 14f66d9c050ee53ea3ab6e64c8fd59b7d8e89603 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 10 Sep 2012 11:45:13 +0200 Subject: [PATCH 10/14] received Supervise from unregistered child Actor, see #2485 * The problem was that RemoteRouteeProvider didn't attachChild when creating routees * Added test for it --- .../akka/routing/RemoteRouterConfig.scala | 6 ++--- .../scala/akka/remote/RemoteRouterSpec.scala | 23 +++++++++++++++++-- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala index 0584296238..73bf6948fe 100644 --- a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala +++ b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala @@ -6,7 +6,6 @@ package akka.routing import com.typesafe.config.ConfigFactory import akka.actor.ActorContext import akka.actor.ActorRef -import akka.actor.ActorSystemImpl import akka.actor.Deploy import akka.actor.InternalActorRef import akka.actor.Props @@ -18,6 +17,7 @@ import akka.actor.Address import scala.collection.JavaConverters._ import java.util.concurrent.atomic.AtomicInteger import java.lang.IllegalStateException +import akka.actor.ActorCell /** * [[akka.routing.RouterConfig]] implementation for remote deployment on defined @@ -72,12 +72,10 @@ class RemoteRouteeProvider(nodes: Iterable[Address], _context: ActorContext, _re format context.self.path.toString) case (n, Nil, ys) ⇒ - val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559 IndexedSeq.empty[ActorRef] ++ (for (i ← 1 to nrOfInstances) yield { val name = "c" + childNameCounter.incrementAndGet val deploy = Deploy("", ConfigFactory.empty(), props.routerConfig, RemoteScope(nodeAddressIter.next)) - impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name, - systemService = false, Some(deploy), lookupDeploy = false, async = false) + context.asInstanceOf[ActorCell].attachChild(props.withDeploy(deploy), name, systemService = false) }) case (_, xs, _) ⇒ throw new ConfigurationException("Remote target.nodes can not be combined with routees for [%s]" diff --git a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala index 1255489691..8d7effc5d9 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala @@ -67,10 +67,10 @@ akka.actor.deployment { nr-of-instances = 4 } }""").withFallback(system.settings.config) - val other = ActorSystem("remote-sys", conf) + val otherSystem = ActorSystem("remote-sys", conf) override def atTermination() { - other.shutdown() + otherSystem.shutdown() } "A Remote Router" must { @@ -199,6 +199,25 @@ akka.actor.deployment { system.stop(router) } + "set supplied supervisorStrategy" in { + val escalator = OneForOneStrategy() { + case e ⇒ + println("## " + e) + testActor ! e; SupervisorStrategy.Escalate + } + val router = system.actorOf(Props.empty.withRouter(new RemoteRouterConfig( + RoundRobinRouter(1, supervisorStrategy = escalator), + Seq(Address("akka", "remote-sys", "localhost", 12347)))), "blub3") + + router ! CurrentRoutees + EventFilter[ActorKilledException](occurrences = 1) intercept { + EventFilter[ActorKilledException](occurrences = 1).intercept { + expectMsgType[RouterRoutees].routees.head ! Kill + }(otherSystem) + } + expectMsgType[ActorKilledException] + } + } } From bb5ab968dbc372c4554aa634a8ac1089b3dcad2e Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 10 Sep 2012 16:20:38 +0200 Subject: [PATCH 11/14] Make RoutingSpec more robust, see #2484 --- .../test/scala/akka/routing/RoutingSpec.scala | 29 +++++++------------ 1 file changed, 10 insertions(+), 19 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index b7000779af..782f4fb399 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -102,8 +102,6 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with } "be able to send their routees" in { - val doneLatch = new TestLatch(1) - class TheActor extends Actor { val routee1 = context.actorOf(Props[TestActor], "routee1") val routee2 = context.actorOf(Props[TestActor], "routee2") @@ -114,19 +112,18 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with within = 5 seconds))) def receive = { - case RouterRoutees(iterable) ⇒ - iterable.exists(_.path.name == "routee1") must be(true) - iterable.exists(_.path.name == "routee2") must be(true) - iterable.exists(_.path.name == "routee3") must be(true) - doneLatch.countDown() - case "doIt" ⇒ - router ! CurrentRoutees + case "doIt" ⇒ router ! CurrentRoutees + case routees: RouterRoutees ⇒ testActor forward routees } } val theActor = system.actorOf(Props(new TheActor), "theActor") theActor ! "doIt" - Await.ready(doneLatch, remaining) + val routees = expectMsgPF() { + case RouterRoutees(routees) ⇒ routees.toSet + } + + routees.map(_.path.name) must be(Set("routee1", "routee2", "routee3")) } "use configured nr-of-instances when FromConfig" in { @@ -226,14 +223,9 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with } "send message to connection" in { - val doneLatch = new TestLatch(1) - - val counter = new AtomicInteger(0) - class Actor1 extends Actor { def receive = { - case "end" ⇒ doneLatch.countDown() - case _ ⇒ counter.incrementAndGet + case msg ⇒ testActor forward msg } } @@ -241,9 +233,8 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with routedActor ! "hello" routedActor ! "end" - Await.ready(doneLatch, remaining) - - counter.get must be(1) + expectMsg("hello") + expectMsg("end") } } From 3cac1928412672cc8d4c23a751de4dc0bef18d16 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 10 Sep 2012 16:37:29 +0200 Subject: [PATCH 12/14] There was an error in the placement of the smartypants setting, fixed that and added exit code to the output if pygment installation fails. --- akka-docs/conf.py | 1 + project/Sphinx.scala | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/akka-docs/conf.py b/akka-docs/conf.py index 2ca3a5c66c..63afaa3bb8 100644 --- a/akka-docs/conf.py +++ b/akka-docs/conf.py @@ -50,6 +50,7 @@ html_show_sourcelink = False html_show_sphinx = False html_show_copyright = True htmlhelp_basename = 'Akkadoc' +html_use_smartypants = False html_add_permalinks = '' html_context = { diff --git a/project/Sphinx.scala b/project/Sphinx.scala index be35d83916..b349265b60 100644 --- a/project/Sphinx.scala +++ b/project/Sphinx.scala @@ -52,7 +52,7 @@ object Sphinx { val env = "PYTHONPATH" -> target.absolutePath s.log.debug("Command: " + command.mkString(" ") + "\nEnv:" + env) val exitCode = Process(command, cwd, env) ! logger - if (exitCode != 0) sys.error("Failed to install custom Sphinx pygments styles.") + if (exitCode != 0) sys.error("Failed to install custom Sphinx pygments styles: exit code " + exitCode) (pygments * ("*.egg-info" | "build" | "temp")).get.foreach(IO.delete) s.log.info("Sphinx pygments styles installed at: " + target) } From d73c27106f86d32f3d7384a00d2773b414988bc3 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 10 Sep 2012 16:38:49 +0200 Subject: [PATCH 13/14] Removing the smartypants setting from setup.py --- akka-docs/_sphinx/pygments/setup.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/akka-docs/_sphinx/pygments/setup.py b/akka-docs/_sphinx/pygments/setup.py index a8ef553e8f..cdfa31d397 100644 --- a/akka-docs/_sphinx/pygments/setup.py +++ b/akka-docs/_sphinx/pygments/setup.py @@ -15,6 +15,5 @@ setup( description = __doc__, author = "Akka", packages = ['styles'], - entry_points = entry_points, - html_use_smartypants = false + entry_points = entry_points ) From 4b8a73cc7f2dad6c4f247920f751b5a5fe41300d Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 11 Sep 2012 09:28:05 +0200 Subject: [PATCH 14/14] Add comment about attachChild --- .../src/main/scala/akka/routing/RemoteRouterConfig.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala index 73bf6948fe..92d208092e 100644 --- a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala +++ b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala @@ -75,6 +75,10 @@ class RemoteRouteeProvider(nodes: Iterable[Address], _context: ActorContext, _re IndexedSeq.empty[ActorRef] ++ (for (i ← 1 to nrOfInstances) yield { val name = "c" + childNameCounter.incrementAndGet val deploy = Deploy("", ConfigFactory.empty(), props.routerConfig, RemoteScope(nodeAddressIter.next)) + + // attachChild means that the provider will treat this call as if possibly done out of the wrong + // context and use RepointableActorRef instead of LocalActorRef. Seems like a slightly sub-optimal + // choice in a corner case (and hence not worth fixing). context.asInstanceOf[ActorCell].attachChild(props.withDeploy(deploy), name, systemService = false) })