diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 30e425dafd..dadda1ba33 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -63,6 +63,12 @@ akka { failure-detector { + # FQCN of the failure detector implementation. + # It must implement akka.cluster.akka.cluster and + # have constructor with akka.actor.ActorSystem and + # akka.cluster.ClusterSettings parameters + implementation-class = "akka.cluster.AccrualFailureDetector" + # defines the failure detector threshold # A low threshold is prone to generate many wrong suspicions but ensures # a quick detection in the event of a real crash. Conversely, a high @@ -84,8 +90,6 @@ akka { # network drop. acceptable-heartbeat-pause = 3s - implementation-class = "akka.cluster.AccrualFailureDetector" - max-sample-size = 1000 } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index d648880396..ab4ca8b37b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -40,19 +40,7 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { override def lookup = Cluster - override def createExtension(system: ExtendedActorSystem): Cluster = { - val clusterSettings = new ClusterSettings(system.settings.config, system.name) - - val failureDetector = { - import clusterSettings.{ FailureDetectorImplementationClass ⇒ fqcn } - system.dynamicAccess.createInstanceFor[FailureDetector]( - fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> clusterSettings)).fold( - e ⇒ throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString), - identity) - } - - new Cluster(system, failureDetector) - } + override def createExtension(system: ExtendedActorSystem): Cluster = new Cluster(system) } /** @@ -69,7 +57,7 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { * if (Cluster(system).isLeader) { ... } * }}} */ -class Cluster(val system: ExtendedActorSystem, val failureDetector: FailureDetector) extends Extension with ClusterEnvironment { +class Cluster(val system: ExtendedActorSystem) extends Extension { import ClusterEvent._ @@ -88,6 +76,14 @@ class Cluster(val system: ExtendedActorSystem, val failureDetector: FailureDetec log.info("Cluster Node [{}] - is starting up...", selfAddress) + val failureDetector = { + import settings.{ FailureDetectorImplementationClass ⇒ fqcn } + system.dynamicAccess.createInstanceFor[FailureDetector]( + fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> settings)).fold( + e ⇒ throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString), + identity) + } + // ======================================================== // ===================== WORK DAEMONS ===================== // ======================================================== @@ -142,16 +138,14 @@ class Cluster(val system: ExtendedActorSystem, val failureDetector: FailureDetec // create supervisor for daemons under path "/system/cluster" private val clusterDaemons: ActorRef = { - system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new ClusterDaemon(this)). + system.asInstanceOf[ActorSystemImpl].systemActorOf(Props[ClusterDaemon]. withDispatcher(UseDispatcher), name = "cluster") } /** * INTERNAL API */ - private[cluster] lazy val clusterCore: ActorRef = { - // this val must be lazy for correct initialization order, - // ClusterDaemon children may use for example subscribe before we get the GetClusterCoreRef reply + private[cluster] val clusterCore: ActorRef = { implicit val timeout = system.settings.CreationTimeout Await.result((clusterDaemons ? InternalClusterAction.GetClusterCoreRef).mapTo[ActorRef], timeout.duration) } @@ -219,10 +213,12 @@ class Cluster(val system: ExtendedActorSystem, val failureDetector: FailureDetec // ======================================================== /** - * Make it possible to override/configure seedNodes from tests without - * specifying in config. Addresses are unknown before startup time. + * Make it possible to join the specified seed nodes without defining them + * in config. Especially useful from tests when Addresses are unknown + * before startup time. */ - private[cluster] def seedNodes: IndexedSeq[Address] = SeedNodes + private[cluster] def joinSeedNodes(seedNodes: IndexedSeq[Address]): Unit = + clusterCore ! InternalClusterAction.JoinSeedNodes(seedNodes) /** * INTERNAL API. diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index ee8d1e0374..063fc54363 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -58,6 +58,12 @@ private[cluster] object InternalClusterAction { */ case class JoinTo(address: Address) extends ClusterMessage + /** + * Command to initiate the process to join the specified + * seed nodes. + */ + case class JoinSeedNodes(seedNodes: IndexedSeq[Address]) + /** * Start message of the process to join one of the seed nodes. * The node sends `InitJoin` to all seed nodes, which replies @@ -128,33 +134,21 @@ private[cluster] object ClusterLeaderAction { case class Remove(address: Address) extends ClusterMessage } -/** - * INTERNAL API - * - * The contextual pieces that ClusterDaemon actors need. - * Makes it easier to test the actors without using the Cluster extension. - */ -private[cluster] trait ClusterEnvironment { - private[cluster] def settings: ClusterSettings - private[cluster] def failureDetector: FailureDetector - private[cluster] def selfAddress: Address - private[cluster] def scheduler: Scheduler - private[cluster] def seedNodes: IndexedSeq[Address] - private[cluster] def shutdown(): Unit -} - /** * INTERNAL API. * * Supervisor managing the different Cluster daemons. */ -private[cluster] final class ClusterDaemon(environment: ClusterEnvironment) extends Actor with ActorLogging { +private[cluster] final class ClusterDaemon extends Actor with ActorLogging { - val configuredDispatcher = environment.settings.UseDispatcher - val core = context.actorOf(Props(new ClusterCoreDaemon(environment)). - withDispatcher(configuredDispatcher), name = "core") - val heartbeat = context.actorOf(Props(new ClusterHeartbeatDaemon(environment)). - withDispatcher(configuredDispatcher), name = "heartbeat") + // Important - don't use Cluster(context.system) here because that would + // cause deadlock. The Cluster extension is currently being created and is waiting + // for response from GetClusterCoreRef in its constructor. + + val core = context.actorOf(Props[ClusterCoreDaemon]. + withDispatcher(context.props.dispatcher), name = "core") + val heartbeat = context.actorOf(Props[ClusterHeartbeatDaemon]. + withDispatcher(context.props.dispatcher), name = "heartbeat") def receive = { case InternalClusterAction.GetClusterCoreRef ⇒ sender ! core @@ -165,16 +159,16 @@ private[cluster] final class ClusterDaemon(environment: ClusterEnvironment) exte /** * INTERNAL API. */ -private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) extends Actor with ActorLogging { +private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging { import ClusterLeaderAction._ import InternalClusterAction._ import ClusterHeartbeatSender._ - def selfAddress = environment.selfAddress - def clusterScheduler = environment.scheduler - def failureDetector = environment.failureDetector - val settings = environment.settings - import settings._ + val cluster = Cluster(context.system) + def selfAddress = cluster.selfAddress + def clusterScheduler = cluster.scheduler + def failureDetector = cluster.failureDetector + import cluster.settings._ val vclockNode = VectorClock.Node(selfAddress.toString) val selfHeartbeat = Heartbeat(selfAddress) @@ -186,11 +180,11 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) var stats = ClusterStats() - val heartbeatSender = context.actorOf(Props(new ClusterHeartbeatSender(environment)). + val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender]. withDispatcher(UseDispatcher), name = "heartbeatSender") - val coreSender = context.actorOf(Props(new ClusterCoreSender(selfAddress)). + val coreSender = context.actorOf(Props[ClusterCoreSender]. withDispatcher(UseDispatcher), name = "coreSender") - val publisher = context.actorOf(Props(new ClusterDomainEventPublisher(environment)). + val publisher = context.actorOf(Props[ClusterDomainEventPublisher]. withDispatcher(UseDispatcher), name = "publisher") import context.dispatcher @@ -227,14 +221,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) }) override def preStart(): Unit = { - if (AutoJoin) { - // only the node which is named first in the list of seed nodes will join itself - if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress) - self ! JoinTo(selfAddress) - else - context.actorOf(Props(new JoinSeedNodeProcess(environment)). - withDispatcher(UseDispatcher), name = "joinSeedNodeProcess") - } + if (AutoJoin) self ! JoinSeedNodes(SeedNodes) } override def postStop(): Unit = { @@ -248,6 +235,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) def uninitialized: Actor.Receive = { case InitJoin ⇒ // skip, not ready yet case JoinTo(address) ⇒ join(address) + case JoinSeedNodes(seedNodes) ⇒ joinSeedNodes(seedNodes) case msg: SubscriptionMessage ⇒ publisher forward msg case _: Tick ⇒ // ignore periodic tasks until initialized } @@ -282,6 +270,15 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) def initJoin(): Unit = sender ! InitJoinAck(selfAddress) + def joinSeedNodes(seedNodes: IndexedSeq[Address]): Unit = { + // only the node which is named first in the list of seed nodes will join itself + if (seedNodes.isEmpty || seedNodes.head == selfAddress) + self ! JoinTo(selfAddress) + else + context.actorOf(Props(new JoinSeedNodeProcess(seedNodes)). + withDispatcher(UseDispatcher), name = "joinSeedNodeProcess") + } + /** * Try to join this cluster node with the node specified by 'address'. * A 'Join(thisNodeAddress)' command is sent to the node to join. @@ -393,7 +390,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) // make sure the final (removed) state is published // before shutting down implicit val timeout = Timeout(5 seconds) - publisher ? PublishDone onComplete { case _ ⇒ environment.shutdown() } + publisher ? PublishDone onComplete { case _ ⇒ cluster.shutdown() } } /** @@ -796,8 +793,6 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) } } - def seedNodes: IndexedSeq[Address] = environment.seedNodes - def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] = if (addresses.isEmpty) None else Some(addresses(ThreadLocalRandom.current nextInt addresses.size)) @@ -865,22 +860,22 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) * 5. seed3 retries the join procedure and gets acks from seed2 first, and then joins to seed2 * */ -private[cluster] final class JoinSeedNodeProcess(environment: ClusterEnvironment) extends Actor with ActorLogging { +private[cluster] final class JoinSeedNodeProcess(seedNodes: IndexedSeq[Address]) extends Actor with ActorLogging { import InternalClusterAction._ - def selfAddress = environment.selfAddress + def selfAddress = Cluster(context.system).selfAddress - if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress) + if (seedNodes.isEmpty || seedNodes.head == selfAddress) throw new IllegalArgumentException("Join seed node should not be done") - context.setReceiveTimeout(environment.settings.SeedNodeTimeout) + context.setReceiveTimeout(Cluster(context.system).settings.SeedNodeTimeout) override def preStart(): Unit = self ! JoinSeedNode def receive = { case JoinSeedNode ⇒ // send InitJoin to all seed nodes (except myself) - environment.seedNodes.collect { + seedNodes.collect { case a if a != selfAddress ⇒ context.system.actorFor(context.parent.path.toStringWithAddress(a)) } foreach { _ ! InitJoin } case InitJoinAck(address) ⇒ @@ -901,9 +896,11 @@ private[cluster] final class JoinSeedNodeProcess(environment: ClusterEnvironment /** * INTERNAL API. */ -private[cluster] final class ClusterCoreSender(selfAddress: Address) extends Actor with ActorLogging { +private[cluster] final class ClusterCoreSender extends Actor with ActorLogging { import InternalClusterAction._ + val selfAddress = Cluster(context.system).selfAddress + /** * Looks up and returns the remote cluster command connection for the specific address. */ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index cbcee78cc9..ae1bd6b6ac 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -169,7 +169,7 @@ object ClusterEvent { * Responsible for domain event subscriptions and publishing of * domain events to event bus. */ -private[cluster] final class ClusterDomainEventPublisher(environment: ClusterEnvironment) extends Actor with ActorLogging { +private[cluster] final class ClusterDomainEventPublisher extends Actor with ActorLogging { import InternalClusterAction._ var latestGossip: Gossip = Gossip() diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index 932be34d2f..a7b4d52daa 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -23,10 +23,12 @@ case class Heartbeat(from: Address) extends ClusterMessage * Instantiated as a single instance for each Cluster - e.g. heartbeats are serialized * to Cluster message after message, but concurrent with other types of messages. */ -private[cluster] final class ClusterHeartbeatDaemon(environment: ClusterEnvironment) extends Actor with ActorLogging { +private[cluster] final class ClusterHeartbeatDaemon extends Actor with ActorLogging { + + val failureDetector = Cluster(context.system).failureDetector def receive = { - case Heartbeat(from) ⇒ environment.failureDetector heartbeat from + case Heartbeat(from) ⇒ failureDetector heartbeat from } } @@ -53,7 +55,7 @@ private[cluster] object ClusterHeartbeatSender { * address and thereby reduce the risk of irregular heartbeats to healty * nodes due to broken connections to other nodes. */ -private[cluster] final class ClusterHeartbeatSender(environment: ClusterEnvironment) extends Actor with ActorLogging { +private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging { import ClusterHeartbeatSender._ /** @@ -78,8 +80,7 @@ private[cluster] final class ClusterHeartbeatSender(environment: ClusterEnvironm val workerName = encodeChildName(to.toString) val worker = context.actorFor(workerName) match { case notFound if notFound.isTerminated ⇒ - context.actorOf(Props(new ClusterHeartbeatSenderWorker( - environment.settings.SendCircuitBreakerSettings, clusterHeartbeatConnectionFor(to))), workerName) + context.actorOf(Props(new ClusterHeartbeatSenderWorker(clusterHeartbeatConnectionFor(to))), workerName) case child ⇒ child } worker ! msg @@ -96,17 +97,19 @@ private[cluster] final class ClusterHeartbeatSender(environment: ClusterEnvironm * * @see ClusterHeartbeatSender */ -private[cluster] final class ClusterHeartbeatSenderWorker( - cbSettings: CircuitBreakerSettings, toRef: ActorRef) +private[cluster] final class ClusterHeartbeatSenderWorker(toRef: ActorRef) extends Actor with ActorLogging { import ClusterHeartbeatSender._ - val breaker = CircuitBreaker(context.system.scheduler, - cbSettings.maxFailures, cbSettings.callTimeout, cbSettings.resetTimeout). - onHalfOpen(log.debug("CircuitBreaker Half-Open for: [{}]", toRef)). - onOpen(log.debug("CircuitBreaker Open for [{}]", toRef)). - onClose(log.debug("CircuitBreaker Closed for [{}]", toRef)) + val breaker = { + val cbSettings = Cluster(context.system).settings.SendCircuitBreakerSettings + CircuitBreaker(context.system.scheduler, + cbSettings.maxFailures, cbSettings.callTimeout, cbSettings.resetTimeout). + onHalfOpen(log.debug("CircuitBreaker Half-Open for: [{}]", toRef)). + onOpen(log.debug("CircuitBreaker Open for [{}]", toRef)). + onClose(log.debug("CircuitBreaker Closed for [{}]", toRef)) + } // make sure it will cleanup when not used any more context.setReceiveTimeout(30 seconds) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index e0440394a7..49483d39ef 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -9,30 +9,32 @@ import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.actor.Address -object ClientDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { +case class ClientDowningNodeThatIsUnreachableMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") val fourth = role("fourth") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) } -class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy -class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy -class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy -class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy +class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) +class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) +class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) +class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) -class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) +class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) +class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) +class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) -abstract class ClientDowningNodeThatIsUnreachableSpec - extends MultiNodeSpec(ClientDowningNodeThatIsUnreachableMultiJvmSpec) +abstract class ClientDowningNodeThatIsUnreachableSpec(multiNodeConfig: ClientDowningNodeThatIsUnreachableMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) with MultiNodeClusterSpec { - import ClientDowningNodeThatIsUnreachableMultiJvmSpec._ + def this(failureDetectorPuppet: Boolean) = this(ClientDowningNodeThatIsUnreachableMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ "Client of a 4 node cluster" must { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala index 82d90c81b5..5a7308ec92 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -9,30 +9,32 @@ import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.actor.Address -object ClientDowningNodeThatIsUpMultiJvmSpec extends MultiNodeConfig { +case class ClientDowningNodeThatIsUpMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") val fourth = role("fourth") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) } -class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy -class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy -class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy -class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy +class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = true) +class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = true) +class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = true) +class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = true) -class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = false) +class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = false) +class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = false) +class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = false) -abstract class ClientDowningNodeThatIsUpSpec - extends MultiNodeSpec(ClientDowningNodeThatIsUpMultiJvmSpec) +abstract class ClientDowningNodeThatIsUpSpec(multiNodeConfig: ClientDowningNodeThatIsUpMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) with MultiNodeClusterSpec { - import ClientDowningNodeThatIsUpMultiJvmSpec._ + def this(failureDetectorPuppet: Boolean) = this(ClientDowningNodeThatIsUpMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ "Client of a 4 node cluster" must { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala index 89ad712815..97d1e870a5 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala @@ -19,9 +19,9 @@ object ClusterAccrualFailureDetectorMultiJvmSpec extends MultiNodeConfig { withFallback(MultiNodeClusterSpec.clusterConfig)) } -class ClusterAccrualFailureDetectorMultiJvmNode1 extends ClusterAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy -class ClusterAccrualFailureDetectorMultiJvmNode2 extends ClusterAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy -class ClusterAccrualFailureDetectorMultiJvmNode3 extends ClusterAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy +class ClusterAccrualFailureDetectorMultiJvmNode1 extends ClusterAccrualFailureDetectorSpec +class ClusterAccrualFailureDetectorMultiJvmNode2 extends ClusterAccrualFailureDetectorSpec +class ClusterAccrualFailureDetectorMultiJvmNode3 extends ClusterAccrualFailureDetectorSpec abstract class ClusterAccrualFailureDetectorSpec extends MultiNodeSpec(ClusterAccrualFailureDetectorMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala index 1862b8ea40..420b2acacb 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala @@ -12,7 +12,7 @@ import akka.testkit._ import scala.concurrent.util.duration._ import akka.actor.Address -object ConvergenceMultiJvmSpec extends MultiNodeConfig { +case class ConvergenceMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") @@ -20,24 +20,26 @@ object ConvergenceMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString("akka.cluster.failure-detector.threshold = 4")). - withFallback(MultiNodeClusterSpec.clusterConfig)) + withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) } -class ConvergenceWithFailureDetectorPuppetMultiJvmNode1 extends ConvergenceSpec with FailureDetectorPuppetStrategy -class ConvergenceWithFailureDetectorPuppetMultiJvmNode2 extends ConvergenceSpec with FailureDetectorPuppetStrategy -class ConvergenceWithFailureDetectorPuppetMultiJvmNode3 extends ConvergenceSpec with FailureDetectorPuppetStrategy -class ConvergenceWithFailureDetectorPuppetMultiJvmNode4 extends ConvergenceSpec with FailureDetectorPuppetStrategy +class ConvergenceWithFailureDetectorPuppetMultiJvmNode1 extends ConvergenceSpec(failureDetectorPuppet = true) +class ConvergenceWithFailureDetectorPuppetMultiJvmNode2 extends ConvergenceSpec(failureDetectorPuppet = true) +class ConvergenceWithFailureDetectorPuppetMultiJvmNode3 extends ConvergenceSpec(failureDetectorPuppet = true) +class ConvergenceWithFailureDetectorPuppetMultiJvmNode4 extends ConvergenceSpec(failureDetectorPuppet = true) -class ConvergenceWithAccrualFailureDetectorMultiJvmNode1 extends ConvergenceSpec with AccrualFailureDetectorStrategy -class ConvergenceWithAccrualFailureDetectorMultiJvmNode2 extends ConvergenceSpec with AccrualFailureDetectorStrategy -class ConvergenceWithAccrualFailureDetectorMultiJvmNode3 extends ConvergenceSpec with AccrualFailureDetectorStrategy -class ConvergenceWithAccrualFailureDetectorMultiJvmNode4 extends ConvergenceSpec with AccrualFailureDetectorStrategy +class ConvergenceWithAccrualFailureDetectorMultiJvmNode1 extends ConvergenceSpec(failureDetectorPuppet = false) +class ConvergenceWithAccrualFailureDetectorMultiJvmNode2 extends ConvergenceSpec(failureDetectorPuppet = false) +class ConvergenceWithAccrualFailureDetectorMultiJvmNode3 extends ConvergenceSpec(failureDetectorPuppet = false) +class ConvergenceWithAccrualFailureDetectorMultiJvmNode4 extends ConvergenceSpec(failureDetectorPuppet = false) -abstract class ConvergenceSpec - extends MultiNodeSpec(ConvergenceMultiJvmSpec) +abstract class ConvergenceSpec(multiNodeConfig: ConvergenceMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) with MultiNodeClusterSpec { - import ConvergenceMultiJvmSpec._ + def this(failureDetectorPuppet: Boolean) = this(ConvergenceMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ "A cluster of 3 members" must { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/FailureDetectorStrategy.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/FailureDetectorStrategy.scala deleted file mode 100644 index 86e03f9457..0000000000 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/FailureDetectorStrategy.scala +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.cluster - -import akka.actor.Address -import akka.remote.testkit.MultiNodeSpec -import akka.testkit._ - -/** - * Base trait for all failure detector strategies. - */ -trait FailureDetectorStrategy { - - /** - * Get or create the FailureDetector to be used in the cluster node. - * To be defined by subclass. - */ - def failureDetector: FailureDetector - - /** - * Marks a node as available in the failure detector. - * To be defined by subclass. - */ - def markNodeAsAvailable(address: Address): Unit - - /** - * Marks a node as unavailable in the failure detector. - * To be defined by subclass. - */ - def markNodeAsUnavailable(address: Address): Unit -} - -/** - * Defines a FailureDetectorPuppet-based FailureDetectorStrategy. - */ -trait FailureDetectorPuppetStrategy extends FailureDetectorStrategy { self: MultiNodeSpec ⇒ - - /** - * The puppet instance. Separated from 'failureDetector' field so we don't have to cast when using the puppet specific methods. - */ - private val puppet = new FailureDetectorPuppet(system) - - override def failureDetector: FailureDetector = puppet - - override def markNodeAsAvailable(address: Address): Unit = puppet markNodeAsAvailable address - - override def markNodeAsUnavailable(address: Address): Unit = puppet markNodeAsUnavailable address -} - -/** - * Defines a AccrualFailureDetector-based FailureDetectorStrategy. - */ -trait AccrualFailureDetectorStrategy extends FailureDetectorStrategy { self: MultiNodeSpec ⇒ - - override val failureDetector: FailureDetector = new AccrualFailureDetector(system, new ClusterSettings(system.settings.config, system.name)) - - override def markNodeAsAvailable(address: Address): Unit = () - - override def markNodeAsUnavailable(address: Address): Unit = () -} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinInProgressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinInProgressSpec.scala index 61636f6358..e198694aab 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinInProgressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinInProgressSpec.scala @@ -29,8 +29,8 @@ object JoinInProgressMultiJvmSpec extends MultiNodeConfig { .withFallback(MultiNodeClusterSpec.clusterConfig))) } -class JoinInProgressMultiJvmNode1 extends JoinInProgressSpec with AccrualFailureDetectorStrategy -class JoinInProgressMultiJvmNode2 extends JoinInProgressSpec with AccrualFailureDetectorStrategy +class JoinInProgressMultiJvmNode1 extends JoinInProgressSpec +class JoinInProgressMultiJvmNode2 extends JoinInProgressSpec abstract class JoinInProgressSpec extends MultiNodeSpec(JoinInProgressMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala index 10d98cd86b..1391b80127 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala @@ -9,6 +9,7 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import scala.concurrent.util.duration._ +import akka.actor.Address object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig { val seed1 = role("seed1") @@ -18,15 +19,15 @@ object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig { val ordinary2 = role("ordinary2") commonConfig(debugConfig(on = false). - withFallback(ConfigFactory.parseString("akka.cluster.auto-join = on")). + withFallback(ConfigFactory.parseString("akka.cluster.auto-join = off")). withFallback(MultiNodeClusterSpec.clusterConfig)) } -class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy -class JoinSeedNodeMultiJvmNode2 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy -class JoinSeedNodeMultiJvmNode3 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy -class JoinSeedNodeMultiJvmNode4 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy -class JoinSeedNodeMultiJvmNode5 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy +class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec +class JoinSeedNodeMultiJvmNode2 extends JoinSeedNodeSpec +class JoinSeedNodeMultiJvmNode3 extends JoinSeedNodeSpec +class JoinSeedNodeMultiJvmNode4 extends JoinSeedNodeSpec +class JoinSeedNodeMultiJvmNode5 extends JoinSeedNodeSpec abstract class JoinSeedNodeSpec extends MultiNodeSpec(JoinSeedNodeMultiJvmSpec) @@ -34,9 +35,9 @@ abstract class JoinSeedNodeSpec import JoinSeedNodeMultiJvmSpec._ - override def seedNodes = IndexedSeq(seed1, seed2, seed3) + def seedNodes: IndexedSeq[Address] = IndexedSeq(seed1, seed2, seed3) - "A cluster with configured seed nodes" must { + "A cluster with seed nodes" must { "be able to start the seed nodes concurrently" taggedAs LongRunningTest in { runOn(seed1) { @@ -45,12 +46,16 @@ abstract class JoinSeedNodeSpec } runOn(seed1, seed2, seed3) { + cluster.joinSeedNodes(seedNodes) awaitUpConvergence(3) } enterBarrier("after-1") } - "join the seed nodes at startup" taggedAs LongRunningTest in { + "join the seed nodes" taggedAs LongRunningTest in { + runOn(ordinary1, ordinary2) { + cluster.joinSeedNodes(seedNodes) + } awaitUpConvergence(roles.size) enterBarrier("after-2") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala index d34a48f48e..efa9e3f25d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala @@ -17,15 +17,15 @@ object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig { val c1 = role("c1") val c2 = role("c2") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy -class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy -class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy -class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy -class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy -class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy +class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala index dd880a76d8..0f0bc0b429 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala @@ -70,11 +70,11 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig { """)) } -class LargeClusterMultiJvmNode1 extends LargeClusterSpec with AccrualFailureDetectorStrategy -class LargeClusterMultiJvmNode2 extends LargeClusterSpec with AccrualFailureDetectorStrategy -class LargeClusterMultiJvmNode3 extends LargeClusterSpec with AccrualFailureDetectorStrategy -class LargeClusterMultiJvmNode4 extends LargeClusterSpec with AccrualFailureDetectorStrategy -class LargeClusterMultiJvmNode5 extends LargeClusterSpec with AccrualFailureDetectorStrategy +class LargeClusterMultiJvmNode1 extends LargeClusterSpec +class LargeClusterMultiJvmNode2 extends LargeClusterSpec +class LargeClusterMultiJvmNode3 extends LargeClusterSpec +class LargeClusterMultiJvmNode4 extends LargeClusterSpec +class LargeClusterMultiJvmNode5 extends LargeClusterSpec abstract class LargeClusterSpec extends MultiNodeSpec(LargeClusterMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala index 2ae0a79483..01bacf9b14 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala @@ -12,7 +12,7 @@ import akka.testkit._ import akka.actor._ import scala.concurrent.util.duration._ -object LeaderDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { +case class LeaderDowningNodeThatIsUnreachableMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") @@ -20,24 +20,26 @@ object LeaderDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString("akka.cluster.auto-down = on")). - withFallback(MultiNodeClusterSpec.clusterConfig)) + withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) } -class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy -class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy -class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy -class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy +class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) +class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) +class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) +class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) -class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy -class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy -class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy -class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy +class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) +class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) +class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) +class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) -abstract class LeaderDowningNodeThatIsUnreachableSpec - extends MultiNodeSpec(LeaderDowningNodeThatIsUnreachableMultiJvmSpec) +abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDowningNodeThatIsUnreachableMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) with MultiNodeClusterSpec { - import LeaderDowningNodeThatIsUnreachableMultiJvmSpec._ + def this(failureDetectorPuppet: Boolean) = this(LeaderDowningNodeThatIsUnreachableMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ "The Leader in a 4 node cluster" must { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index 1a657b3da8..8c2198dd7b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -9,33 +9,35 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ -object LeaderElectionMultiJvmSpec extends MultiNodeConfig { +case class LeaderElectionMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val controller = role("controller") val first = role("first") val second = role("second") val third = role("third") val fourth = role("fourth") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) } -class LeaderElectionWithFailureDetectorPuppetMultiJvmNode1 extends LeaderElectionSpec with FailureDetectorPuppetStrategy -class LeaderElectionWithFailureDetectorPuppetMultiJvmNode2 extends LeaderElectionSpec with FailureDetectorPuppetStrategy -class LeaderElectionWithFailureDetectorPuppetMultiJvmNode3 extends LeaderElectionSpec with FailureDetectorPuppetStrategy -class LeaderElectionWithFailureDetectorPuppetMultiJvmNode4 extends LeaderElectionSpec with FailureDetectorPuppetStrategy -class LeaderElectionWithFailureDetectorPuppetMultiJvmNode5 extends LeaderElectionSpec with FailureDetectorPuppetStrategy +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode1 extends LeaderElectionSpec(failureDetectorPuppet = true) +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode2 extends LeaderElectionSpec(failureDetectorPuppet = true) +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode3 extends LeaderElectionSpec(failureDetectorPuppet = true) +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode4 extends LeaderElectionSpec(failureDetectorPuppet = true) +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode5 extends LeaderElectionSpec(failureDetectorPuppet = true) -class LeaderElectionWithAccrualFailureDetectorMultiJvmNode1 extends LeaderElectionSpec with AccrualFailureDetectorStrategy -class LeaderElectionWithAccrualFailureDetectorMultiJvmNode2 extends LeaderElectionSpec with AccrualFailureDetectorStrategy -class LeaderElectionWithAccrualFailureDetectorMultiJvmNode3 extends LeaderElectionSpec with AccrualFailureDetectorStrategy -class LeaderElectionWithAccrualFailureDetectorMultiJvmNode4 extends LeaderElectionSpec with AccrualFailureDetectorStrategy -class LeaderElectionWithAccrualFailureDetectorMultiJvmNode5 extends LeaderElectionSpec with AccrualFailureDetectorStrategy +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode1 extends LeaderElectionSpec(failureDetectorPuppet = false) +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode2 extends LeaderElectionSpec(failureDetectorPuppet = false) +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode3 extends LeaderElectionSpec(failureDetectorPuppet = false) +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode4 extends LeaderElectionSpec(failureDetectorPuppet = false) +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode5 extends LeaderElectionSpec(failureDetectorPuppet = false) -abstract class LeaderElectionSpec - extends MultiNodeSpec(LeaderElectionMultiJvmSpec) +abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) with MultiNodeClusterSpec { - import LeaderElectionMultiJvmSpec._ + def this(failureDetectorPuppet: Boolean) = this(LeaderElectionMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ // sorted in the order used by the cluster lazy val sortedRoles = Seq(first, second, third, fourth).sorted diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala index 15e308cafb..eb0aa40591 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala @@ -23,12 +23,12 @@ object LeaderLeavingMultiJvmSpec extends MultiNodeConfig { .withFallback(ConfigFactory.parseString(""" # turn off unreachable reaper akka.cluster.unreachable-nodes-reaper-interval = 300 s""") - .withFallback(MultiNodeClusterSpec.clusterConfig))) + .withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))) } -class LeaderLeavingMultiJvmNode1 extends LeaderLeavingSpec with FailureDetectorPuppetStrategy -class LeaderLeavingMultiJvmNode2 extends LeaderLeavingSpec with FailureDetectorPuppetStrategy -class LeaderLeavingMultiJvmNode3 extends LeaderLeavingSpec with FailureDetectorPuppetStrategy +class LeaderLeavingMultiJvmNode1 extends LeaderLeavingSpec +class LeaderLeavingMultiJvmNode2 extends LeaderLeavingSpec +class LeaderLeavingMultiJvmNode3 extends LeaderLeavingSpec abstract class LeaderLeavingSpec extends MultiNodeSpec(LeaderLeavingMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala index 1d50fe0d37..afeec13d9e 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala @@ -26,12 +26,12 @@ object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig { unreachable-nodes-reaper-interval = 300 s # turn "off" reaping to unreachable node set } """) - .withFallback(MultiNodeClusterSpec.clusterConfig))) + .withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))) } -class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListenerExitingSpec with FailureDetectorPuppetStrategy -class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec with FailureDetectorPuppetStrategy -class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListenerExitingSpec +class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec +class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec abstract class MembershipChangeListenerExitingSpec extends MultiNodeSpec(MembershipChangeListenerExitingMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala index 7c14af7203..b2c69c7c17 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala @@ -17,11 +17,11 @@ object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec with FailureDetectorPuppetStrategy -class MembershipChangeListenerJoinMultiJvmNode2 extends MembershipChangeListenerJoinSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec +class MembershipChangeListenerJoinMultiJvmNode2 extends MembershipChangeListenerJoinSpec abstract class MembershipChangeListenerJoinSpec extends MultiNodeSpec(MembershipChangeListenerJoinMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala index b1a7663154..835c3d722e 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala @@ -24,12 +24,12 @@ object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig { .withFallback(ConfigFactory.parseString(""" akka.cluster.unreachable-nodes-reaper-interval = 300 s # turn "off" """)) - .withFallback(MultiNodeClusterSpec.clusterConfig)) + .withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListenerLeavingSpec with FailureDetectorPuppetStrategy -class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec with FailureDetectorPuppetStrategy -class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListenerLeavingSpec +class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec +class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec abstract class MembershipChangeListenerLeavingSpec extends MultiNodeSpec(MembershipChangeListenerLeavingMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala index c44e61df57..f4faf4234b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala @@ -16,12 +16,12 @@ object MembershipChangeListenerUpMultiJvmSpec extends MultiNodeConfig { val second = role("second") val third = role("third") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class MembershipChangeListenerUpMultiJvmNode1 extends MembershipChangeListenerUpSpec with FailureDetectorPuppetStrategy -class MembershipChangeListenerUpMultiJvmNode2 extends MembershipChangeListenerUpSpec with FailureDetectorPuppetStrategy -class MembershipChangeListenerUpMultiJvmNode3 extends MembershipChangeListenerUpSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerUpMultiJvmNode1 extends MembershipChangeListenerUpSpec +class MembershipChangeListenerUpMultiJvmNode2 extends MembershipChangeListenerUpSpec +class MembershipChangeListenerUpMultiJvmNode3 extends MembershipChangeListenerUpSpec abstract class MembershipChangeListenerUpSpec extends MultiNodeSpec(MembershipChangeListenerUpMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 93c1f921ae..3b593ee87c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -20,6 +20,14 @@ import akka.actor.ActorPath import akka.actor.RootActorPath object MultiNodeClusterSpec { + + def clusterConfigWithFailureDetectorPuppet: Config = + ConfigFactory.parseString("akka.cluster.failure-detector.implementation-class = akka.cluster.FailureDetectorPuppet"). + withFallback(clusterConfig) + + def clusterConfig(failureDetectorPuppet: Boolean): Config = + if (failureDetectorPuppet) clusterConfigWithFailureDetectorPuppet else clusterConfig + def clusterConfig: Config = ConfigFactory.parseString(""" akka.cluster { auto-join = on @@ -31,13 +39,14 @@ object MultiNodeClusterSpec { periodic-tasks-initial-delay = 300 ms publish-stats-interval = 0 s # always, when it happens } + akka.remote.log-remote-lifecycle-events = off akka.test { single-expect-default = 5 s } """) } -trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: MultiNodeSpec ⇒ +trait MultiNodeClusterSpec extends Suite { self: MultiNodeSpec ⇒ override def initialParticipants = roles.size @@ -80,29 +89,12 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu throw t } - /** - * Make it possible to override/configure seedNodes from tests without - * specifying in config. Addresses are unknown before startup time. - */ - protected def seedNodes: IndexedSeq[RoleName] = IndexedSeq.empty - - /** - * The cluster node instance. Needs to be lazily created. - */ - private lazy val clusterNode = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) { - override def seedNodes: IndexedSeq[Address] = { - val testSeedNodes = MultiNodeClusterSpec.this.seedNodes - if (testSeedNodes.isEmpty) super.seedNodes - else testSeedNodes map address - } - } - def clusterView: ClusterReadView = cluster.readView /** * Get the cluster node to use. */ - def cluster: Cluster = clusterNode + def cluster: Cluster = Cluster(system) /** * Use this method for the initial startup of the cluster node. @@ -213,4 +205,24 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu def roleName(addr: Address): Option[RoleName] = roles.find(address(_) == addr) + /** + * Marks a node as available in the failure detector if + * [[akka.cluster.FailureDetectorPuppet]] is used as + * failure detector. + */ + def markNodeAsAvailable(address: Address): Unit = cluster.failureDetector match { + case puppet: FailureDetectorPuppet ⇒ puppet.markNodeAsAvailable(address) + case _ ⇒ + } + + /** + * Marks a node as unavailable in the failure detector if + * [[akka.cluster.FailureDetectorPuppet]] is used as + * failure detector. + */ + def markNodeAsUnavailable(address: Address): Unit = cluster.failureDetector match { + case puppet: FailureDetectorPuppet ⇒ puppet.markNodeAsUnavailable(address) + case _ ⇒ + } + } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala index 752857316b..3fec2f22ad 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala @@ -15,12 +15,12 @@ object NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec extends MultiNodeConfig val second = role("second") val third = role("third") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec with FailureDetectorPuppetStrategy -class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec with FailureDetectorPuppetStrategy -class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec with FailureDetectorPuppetStrategy +class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec +class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec +class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec abstract class NodeLeavingAndExitingAndBeingRemovedSpec extends MultiNodeSpec(NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala index 529866c433..0d0b45fd04 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala @@ -23,12 +23,12 @@ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig { .withFallback(ConfigFactory.parseString(""" # turn off unreachable reaper akka.cluster.unreachable-nodes-reaper-interval = 300 s""") - .withFallback(MultiNodeClusterSpec.clusterConfig))) + .withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))) } -class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec with FailureDetectorPuppetStrategy -class NodeLeavingAndExitingMultiJvmNode2 extends NodeLeavingAndExitingSpec with FailureDetectorPuppetStrategy -class NodeLeavingAndExitingMultiJvmNode3 extends NodeLeavingAndExitingSpec with FailureDetectorPuppetStrategy +class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec +class NodeLeavingAndExitingMultiJvmNode2 extends NodeLeavingAndExitingSpec +class NodeLeavingAndExitingMultiJvmNode3 extends NodeLeavingAndExitingSpec abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExitingMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala index 3df642afad..86e5dd71e9 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala @@ -13,12 +13,12 @@ object NodeMembershipMultiJvmSpec extends MultiNodeConfig { val second = role("second") val third = role("third") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec with FailureDetectorPuppetStrategy -class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec with FailureDetectorPuppetStrategy -class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec with FailureDetectorPuppetStrategy +class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec +class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec +class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala index 4318d0e79d..0b6cea8683 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala @@ -18,11 +18,11 @@ object NodeUpMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class NodeUpMultiJvmNode1 extends NodeUpSpec with FailureDetectorPuppetStrategy -class NodeUpMultiJvmNode2 extends NodeUpSpec with FailureDetectorPuppetStrategy +class NodeUpMultiJvmNode1 extends NodeUpSpec +class NodeUpMultiJvmNode2 extends NodeUpSpec abstract class NodeUpSpec extends MultiNodeSpec(NodeUpMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala index d044d90f72..d84ae89e5d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala @@ -9,7 +9,7 @@ import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import scala.concurrent.util.duration._ -object SingletonClusterMultiJvmSpec extends MultiNodeConfig { +case class SingletonClusterMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") val second = role("second") @@ -21,21 +21,23 @@ object SingletonClusterMultiJvmSpec extends MultiNodeConfig { failure-detector.threshold = 4 } """)). - withFallback(MultiNodeClusterSpec.clusterConfig)) + withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) } -class SingletonClusterWithFailureDetectorPuppetMultiJvmNode1 extends SingletonClusterSpec with FailureDetectorPuppetStrategy -class SingletonClusterWithFailureDetectorPuppetMultiJvmNode2 extends SingletonClusterSpec with FailureDetectorPuppetStrategy +class SingletonClusterWithFailureDetectorPuppetMultiJvmNode1 extends SingletonClusterSpec(failureDetectorPuppet = true) +class SingletonClusterWithFailureDetectorPuppetMultiJvmNode2 extends SingletonClusterSpec(failureDetectorPuppet = true) -class SingletonClusterWithAccrualFailureDetectorMultiJvmNode1 extends SingletonClusterSpec with AccrualFailureDetectorStrategy -class SingletonClusterWithAccrualFailureDetectorMultiJvmNode2 extends SingletonClusterSpec with AccrualFailureDetectorStrategy +class SingletonClusterWithAccrualFailureDetectorMultiJvmNode1 extends SingletonClusterSpec(failureDetectorPuppet = false) +class SingletonClusterWithAccrualFailureDetectorMultiJvmNode2 extends SingletonClusterSpec(failureDetectorPuppet = false) -abstract class SingletonClusterSpec - extends MultiNodeSpec(SingletonClusterMultiJvmSpec) +abstract class SingletonClusterSpec(multiNodeConfig: SingletonClusterMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) with MultiNodeClusterSpec { - import SingletonClusterMultiJvmSpec._ + def this(failureDetectorPuppet: Boolean) = this(SingletonClusterMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ "A cluster of 2 nodes" must { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala index d030734a71..22e3cae694 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala @@ -13,7 +13,7 @@ import scala.concurrent.util.duration._ import akka.actor.Address import akka.remote.testconductor.Direction -object SplitBrainMultiJvmSpec extends MultiNodeConfig { +case class SplitBrainMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") @@ -26,26 +26,28 @@ object SplitBrainMultiJvmSpec extends MultiNodeConfig { auto-down = on failure-detector.threshold = 4 }""")). - withFallback(MultiNodeClusterSpec.clusterConfig)) + withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) } -class SplitBrainWithFailureDetectorPuppetMultiJvmNode1 extends SplitBrainSpec with FailureDetectorPuppetStrategy -class SplitBrainWithFailureDetectorPuppetMultiJvmNode2 extends SplitBrainSpec with FailureDetectorPuppetStrategy -class SplitBrainWithFailureDetectorPuppetMultiJvmNode3 extends SplitBrainSpec with FailureDetectorPuppetStrategy -class SplitBrainWithFailureDetectorPuppetMultiJvmNode4 extends SplitBrainSpec with FailureDetectorPuppetStrategy -class SplitBrainWithFailureDetectorPuppetMultiJvmNode5 extends SplitBrainSpec with FailureDetectorPuppetStrategy +class SplitBrainWithFailureDetectorPuppetMultiJvmNode1 extends SplitBrainSpec(failureDetectorPuppet = true) +class SplitBrainWithFailureDetectorPuppetMultiJvmNode2 extends SplitBrainSpec(failureDetectorPuppet = true) +class SplitBrainWithFailureDetectorPuppetMultiJvmNode3 extends SplitBrainSpec(failureDetectorPuppet = true) +class SplitBrainWithFailureDetectorPuppetMultiJvmNode4 extends SplitBrainSpec(failureDetectorPuppet = true) +class SplitBrainWithFailureDetectorPuppetMultiJvmNode5 extends SplitBrainSpec(failureDetectorPuppet = true) -class SplitBrainWithAccrualFailureDetectorMultiJvmNode1 extends SplitBrainSpec with AccrualFailureDetectorStrategy -class SplitBrainWithAccrualFailureDetectorMultiJvmNode2 extends SplitBrainSpec with AccrualFailureDetectorStrategy -class SplitBrainWithAccrualFailureDetectorMultiJvmNode3 extends SplitBrainSpec with AccrualFailureDetectorStrategy -class SplitBrainWithAccrualFailureDetectorMultiJvmNode4 extends SplitBrainSpec with AccrualFailureDetectorStrategy -class SplitBrainWithAccrualFailureDetectorMultiJvmNode5 extends SplitBrainSpec with AccrualFailureDetectorStrategy +class SplitBrainWithAccrualFailureDetectorMultiJvmNode1 extends SplitBrainSpec(failureDetectorPuppet = false) +class SplitBrainWithAccrualFailureDetectorMultiJvmNode2 extends SplitBrainSpec(failureDetectorPuppet = false) +class SplitBrainWithAccrualFailureDetectorMultiJvmNode3 extends SplitBrainSpec(failureDetectorPuppet = false) +class SplitBrainWithAccrualFailureDetectorMultiJvmNode4 extends SplitBrainSpec(failureDetectorPuppet = false) +class SplitBrainWithAccrualFailureDetectorMultiJvmNode5 extends SplitBrainSpec(failureDetectorPuppet = false) -abstract class SplitBrainSpec - extends MultiNodeSpec(SplitBrainMultiJvmSpec) +abstract class SplitBrainSpec(multiNodeConfig: SplitBrainMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) with MultiNodeClusterSpec { - import SplitBrainMultiJvmSpec._ + def this(failureDetectorPuppet: Boolean) = this(SplitBrainMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ val side1 = IndexedSeq(first, second) val side2 = IndexedSeq(third, fourth, fifth) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index e1b3571bd2..ab521f4a5b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -31,11 +31,11 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { """)) } -class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy -class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy -class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy -class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy -class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy +class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec +class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec +class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec +class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec +class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec abstract class SunnyWeatherSpec extends MultiNodeSpec(SunnyWeatherMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index e3e446bd50..4df6032116 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -23,12 +23,12 @@ object TransitionMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString("akka.cluster.periodic-tasks-initial-delay = 300 s # turn off all periodic tasks")). - withFallback(MultiNodeClusterSpec.clusterConfig)) + withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class TransitionMultiJvmNode1 extends TransitionSpec with FailureDetectorPuppetStrategy -class TransitionMultiJvmNode2 extends TransitionSpec with FailureDetectorPuppetStrategy -class TransitionMultiJvmNode3 extends TransitionSpec with FailureDetectorPuppetStrategy +class TransitionMultiJvmNode1 extends TransitionSpec +class TransitionMultiJvmNode2 extends TransitionSpec +class TransitionMultiJvmNode3 extends TransitionSpec abstract class TransitionSpec extends MultiNodeSpec(TransitionMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala index ca5c8f3265..97e15c0e4d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -14,7 +14,7 @@ import akka.actor.Address import akka.remote.testconductor.{ RoleName, Direction } import scala.concurrent.util.duration._ -object UnreachableNodeRejoinsClusterMultiJvmSpec extends MultiNodeConfig { +case class UnreachableNodeRejoinsClusterMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") @@ -23,20 +23,23 @@ object UnreachableNodeRejoinsClusterMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy -class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy -class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy -class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy +class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true) +class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true) +class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true) +class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true) -class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy -class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy -class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy -class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy +class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = false) +class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = false) +class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = false) +class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = false) -abstract class UnreachableNodeRejoinsClusterSpec - extends MultiNodeSpec(UnreachableNodeRejoinsClusterMultiJvmSpec) +abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNodeRejoinsClusterMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) with MultiNodeClusterSpec { - import UnreachableNodeRejoinsClusterMultiJvmSpec._ + + def this(failureDetectorPuppet: Boolean) = this(UnreachableNodeRejoinsClusterMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ def allBut(role: RoleName, roles: Seq[RoleName] = roles): Seq[RoleName] = { roles.filterNot(_ == role) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 73364b853e..38ac359de4 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -26,8 +26,10 @@ object ClusterSpec { auto-down = off periodic-tasks-initial-delay = 120 seconds // turn off scheduled tasks publish-stats-interval = 0 s # always, when it happens + failure-detector.implementation-class = akka.cluster.FailureDetectorPuppet } akka.actor.provider = "akka.remote.RemoteActorRefProvider" + akka.remote.log-remote-lifecycle-events = off akka.remote.netty.port = 0 # akka.loglevel = DEBUG """ @@ -41,9 +43,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { val selfAddress = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.address - val failureDetector = new FailureDetectorPuppet(system) - - val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) + val cluster = Cluster(system) def clusterView = cluster.readView def leaderActions(): Unit = { diff --git a/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala index 9ddc9942b0..869df97f84 100644 --- a/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala +++ b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala @@ -6,6 +6,7 @@ package akka.cluster import akka.actor.{ Address, ActorSystem } import akka.event.{ Logging, LogSource } +import akka.remote.testkit.MultiNodeConfig /** * User controllable "puppet" failure detector. @@ -13,8 +14,6 @@ import akka.event.{ Logging, LogSource } class FailureDetectorPuppet(system: ActorSystem, settings: ClusterSettings) extends FailureDetector { import java.util.concurrent.ConcurrentHashMap - def this(system: ActorSystem) = this(system, new ClusterSettings(system.settings.config, system.name)) - trait Status object Up extends Status object Down extends Status @@ -63,3 +62,4 @@ class FailureDetectorPuppet(system: ActorSystem, settings: ClusterSettings) exte connections.clear() } } +