diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index d3fe3308ea..9e126e75ed 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -102,8 +102,6 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with } "be able to send their routees" in { - val doneLatch = new TestLatch(1) - class TheActor extends Actor { val routee1 = context.actorOf(Props[TestActor], "routee1") val routee2 = context.actorOf(Props[TestActor], "routee2") @@ -114,19 +112,18 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with within = 5 seconds))) def receive = { - case RouterRoutees(iterable) ⇒ - iterable.exists(_.path.name == "routee1") must be(true) - iterable.exists(_.path.name == "routee2") must be(true) - iterable.exists(_.path.name == "routee3") must be(true) - doneLatch.countDown() - case "doIt" ⇒ - router ! CurrentRoutees + case "doIt" ⇒ router ! CurrentRoutees + case routees: RouterRoutees ⇒ testActor forward routees } } val theActor = system.actorOf(Props(new TheActor), "theActor") theActor ! "doIt" - Await.ready(doneLatch, remaining) + val routees = expectMsgPF() { + case RouterRoutees(routees) ⇒ routees.toSet + } + + routees.map(_.path.name) must be(Set("routee1", "routee2", "routee3")) } "use configured nr-of-instances when FromConfig" in { @@ -226,14 +223,9 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with } "send message to connection" in { - val doneLatch = new TestLatch(1) - - val counter = new AtomicInteger(0) - class Actor1 extends Actor { def receive = { - case "end" ⇒ doneLatch.countDown() - case _ ⇒ counter.incrementAndGet + case msg ⇒ testActor forward msg } } @@ -241,9 +233,8 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with routedActor ! "hello" routedActor ! "end" - Await.ready(doneLatch, remaining) - - counter.get must be(1) + expectMsg("hello") + expectMsg("end") } } diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index a42066d0d2..51d0c290dc 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -597,7 +597,7 @@ trait FSM[S, D] extends Listeners with ActorLogging { val timeout = if (currentState.timeout.isDefined) currentState.timeout else stateTimeouts(currentState.stateName) if (timeout.isDefined) { val t = timeout.get - if (t.finite_? && t.length >= 0) { + if (t.isFinite && t.length >= 0) { import context.dispatcher timeoutFuture = Some(context.system.scheduler.scheduleOnce(t, self, TimeoutMarker(generation))) } diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index a2311f596a..bfe419ec4d 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -63,6 +63,12 @@ akka { failure-detector { + # FQCN of the failure detector implementation. + # It must implement akka.cluster.akka.cluster and + # have constructor with akka.actor.ActorSystem and + # akka.cluster.ClusterSettings parameters + implementation-class = "akka.cluster.AccrualFailureDetector" + # defines the failure detector threshold # A low threshold is prone to generate many wrong suspicions but ensures # a quick detection in the event of a real crash. Conversely, a high @@ -84,8 +90,6 @@ akka { # network drop. acceptable-heartbeat-pause = 3s - implementation-class = "akka.cluster.AccrualFailureDetector" - max-sample-size = 1000 } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 5248db6ad8..496836f435 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -40,19 +40,7 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { override def lookup = Cluster - override def createExtension(system: ExtendedActorSystem): Cluster = { - val clusterSettings = new ClusterSettings(system.settings.config, system.name) - - val failureDetector = { - import clusterSettings.{ FailureDetectorImplementationClass ⇒ fqcn } - system.dynamicAccess.createInstanceFor[FailureDetector]( - fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> clusterSettings)).recover({ - case e ⇒ throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString) - }).get - } - - new Cluster(system, failureDetector) - } + override def createExtension(system: ExtendedActorSystem): Cluster = new Cluster(system) } /** @@ -69,7 +57,7 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { * if (Cluster(system).isLeader) { ... } * }}} */ -class Cluster(val system: ExtendedActorSystem, val failureDetector: FailureDetector) extends Extension with ClusterEnvironment { +class Cluster(val system: ExtendedActorSystem) extends Extension { import ClusterEvent._ @@ -86,6 +74,14 @@ class Cluster(val system: ExtendedActorSystem, val failureDetector: FailureDetec log.info("Cluster Node [{}] - is starting up...", selfAddress) + val failureDetector = { + import settings.{ FailureDetectorImplementationClass ⇒ fqcn } + system.dynamicAccess.createInstanceFor[FailureDetector]( + fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> settings)).recover({ + case e ⇒ throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString) + }).get + } + // ======================================================== // ===================== WORK DAEMONS ===================== // ======================================================== @@ -140,7 +136,7 @@ class Cluster(val system: ExtendedActorSystem, val failureDetector: FailureDetec // create supervisor for daemons under path "/system/cluster" private val clusterDaemons: ActorRef = { - system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new ClusterDaemon(this)). + system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new ClusterDaemon(settings)). withDispatcher(UseDispatcher), name = "cluster") } @@ -215,10 +211,12 @@ class Cluster(val system: ExtendedActorSystem, val failureDetector: FailureDetec // ======================================================== /** - * Make it possible to override/configure seedNodes from tests without - * specifying in config. Addresses are unknown before startup time. + * Make it possible to join the specified seed nodes without defining them + * in config. Especially useful from tests when Addresses are unknown + * before startup time. */ - private[cluster] def seedNodes: IndexedSeq[Address] = SeedNodes + private[cluster] def joinSeedNodes(seedNodes: IndexedSeq[Address]): Unit = + clusterCore ! InternalClusterAction.JoinSeedNodes(seedNodes) /** * INTERNAL API. diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index ee8d1e0374..f84bf100ea 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -58,6 +58,12 @@ private[cluster] object InternalClusterAction { */ case class JoinTo(address: Address) extends ClusterMessage + /** + * Command to initiate the process to join the specified + * seed nodes. + */ + case class JoinSeedNodes(seedNodes: IndexedSeq[Address]) + /** * Start message of the process to join one of the seed nodes. * The node sends `InitJoin` to all seed nodes, which replies @@ -128,33 +134,21 @@ private[cluster] object ClusterLeaderAction { case class Remove(address: Address) extends ClusterMessage } -/** - * INTERNAL API - * - * The contextual pieces that ClusterDaemon actors need. - * Makes it easier to test the actors without using the Cluster extension. - */ -private[cluster] trait ClusterEnvironment { - private[cluster] def settings: ClusterSettings - private[cluster] def failureDetector: FailureDetector - private[cluster] def selfAddress: Address - private[cluster] def scheduler: Scheduler - private[cluster] def seedNodes: IndexedSeq[Address] - private[cluster] def shutdown(): Unit -} - /** * INTERNAL API. * * Supervisor managing the different Cluster daemons. */ -private[cluster] final class ClusterDaemon(environment: ClusterEnvironment) extends Actor with ActorLogging { +private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Actor with ActorLogging { - val configuredDispatcher = environment.settings.UseDispatcher - val core = context.actorOf(Props(new ClusterCoreDaemon(environment)). - withDispatcher(configuredDispatcher), name = "core") - val heartbeat = context.actorOf(Props(new ClusterHeartbeatDaemon(environment)). - withDispatcher(configuredDispatcher), name = "heartbeat") + // Important - don't use Cluster(context.system) here because that would + // cause deadlock. The Cluster extension is currently being created and is waiting + // for response from GetClusterCoreRef in its constructor. + + val core = context.actorOf(Props[ClusterCoreDaemon]. + withDispatcher(context.props.dispatcher), name = "core") + val heartbeat = context.actorOf(Props[ClusterHeartbeatDaemon]. + withDispatcher(context.props.dispatcher), name = "heartbeat") def receive = { case InternalClusterAction.GetClusterCoreRef ⇒ sender ! core @@ -165,16 +159,14 @@ private[cluster] final class ClusterDaemon(environment: ClusterEnvironment) exte /** * INTERNAL API. */ -private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) extends Actor with ActorLogging { +private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging { import ClusterLeaderAction._ import InternalClusterAction._ import ClusterHeartbeatSender._ - def selfAddress = environment.selfAddress - def clusterScheduler = environment.scheduler - def failureDetector = environment.failureDetector - val settings = environment.settings - import settings._ + val cluster = Cluster(context.system) + import cluster.{ selfAddress, scheduler, failureDetector } + import cluster.settings._ val vclockNode = VectorClock.Node(selfAddress.toString) val selfHeartbeat = Heartbeat(selfAddress) @@ -186,55 +178,48 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) var stats = ClusterStats() - val heartbeatSender = context.actorOf(Props(new ClusterHeartbeatSender(environment)). + val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender]. withDispatcher(UseDispatcher), name = "heartbeatSender") - val coreSender = context.actorOf(Props(new ClusterCoreSender(selfAddress)). + val coreSender = context.actorOf(Props[ClusterCoreSender]. withDispatcher(UseDispatcher), name = "coreSender") - val publisher = context.actorOf(Props(new ClusterDomainEventPublisher(environment)). + val publisher = context.actorOf(Props[ClusterDomainEventPublisher]. withDispatcher(UseDispatcher), name = "publisher") import context.dispatcher // start periodic gossip to random nodes in cluster val gossipTask = - FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(GossipInterval), GossipInterval) { + FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(GossipInterval), GossipInterval) { self ! GossipTick } // start periodic heartbeat to all nodes in cluster val heartbeatTask = - FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval), HeartbeatInterval) { + FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval), HeartbeatInterval) { self ! HeartbeatTick } // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) val failureDetectorReaperTask = - FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval), UnreachableNodesReaperInterval) { + FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval), UnreachableNodesReaperInterval) { self ! ReapUnreachableTick } // start periodic leader action management (only applies for the current leader) private val leaderActionsTask = - FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval), LeaderActionsInterval) { + FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval), LeaderActionsInterval) { self ! LeaderActionsTick } // start periodic publish of current state private val publishStateTask: Option[Cancellable] = if (PublishStatsInterval == Duration.Zero) None - else Some(FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(PublishStatsInterval), PublishStatsInterval) { + else Some(FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(PublishStatsInterval), PublishStatsInterval) { self ! PublishStatsTick }) override def preStart(): Unit = { - if (AutoJoin) { - // only the node which is named first in the list of seed nodes will join itself - if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress) - self ! JoinTo(selfAddress) - else - context.actorOf(Props(new JoinSeedNodeProcess(environment)). - withDispatcher(UseDispatcher), name = "joinSeedNodeProcess") - } + if (AutoJoin) self ! JoinSeedNodes(SeedNodes) } override def postStop(): Unit = { @@ -248,6 +233,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) def uninitialized: Actor.Receive = { case InitJoin ⇒ // skip, not ready yet case JoinTo(address) ⇒ join(address) + case JoinSeedNodes(seedNodes) ⇒ joinSeedNodes(seedNodes) case msg: SubscriptionMessage ⇒ publisher forward msg case _: Tick ⇒ // ignore periodic tasks until initialized } @@ -282,6 +268,15 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) def initJoin(): Unit = sender ! InitJoinAck(selfAddress) + def joinSeedNodes(seedNodes: IndexedSeq[Address]): Unit = { + // only the node which is named first in the list of seed nodes will join itself + if (seedNodes.isEmpty || seedNodes.head == selfAddress) + self ! JoinTo(selfAddress) + else + context.actorOf(Props(new JoinSeedNodeProcess(seedNodes)). + withDispatcher(UseDispatcher), name = "joinSeedNodeProcess") + } + /** * Try to join this cluster node with the node specified by 'address'. * A 'Join(thisNodeAddress)' command is sent to the node to join. @@ -393,7 +388,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) // make sure the final (removed) state is published // before shutting down implicit val timeout = Timeout(5 seconds) - publisher ? PublishDone onComplete { case _ ⇒ environment.shutdown() } + publisher ? PublishDone onComplete { case _ ⇒ cluster.shutdown() } } /** @@ -796,8 +791,6 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) } } - def seedNodes: IndexedSeq[Address] = environment.seedNodes - def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] = if (addresses.isEmpty) None else Some(addresses(ThreadLocalRandom.current nextInt addresses.size)) @@ -865,22 +858,22 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) * 5. seed3 retries the join procedure and gets acks from seed2 first, and then joins to seed2 * */ -private[cluster] final class JoinSeedNodeProcess(environment: ClusterEnvironment) extends Actor with ActorLogging { +private[cluster] final class JoinSeedNodeProcess(seedNodes: IndexedSeq[Address]) extends Actor with ActorLogging { import InternalClusterAction._ - def selfAddress = environment.selfAddress + def selfAddress = Cluster(context.system).selfAddress - if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress) + if (seedNodes.isEmpty || seedNodes.head == selfAddress) throw new IllegalArgumentException("Join seed node should not be done") - context.setReceiveTimeout(environment.settings.SeedNodeTimeout) + context.setReceiveTimeout(Cluster(context.system).settings.SeedNodeTimeout) override def preStart(): Unit = self ! JoinSeedNode def receive = { case JoinSeedNode ⇒ // send InitJoin to all seed nodes (except myself) - environment.seedNodes.collect { + seedNodes.collect { case a if a != selfAddress ⇒ context.system.actorFor(context.parent.path.toStringWithAddress(a)) } foreach { _ ! InitJoin } case InitJoinAck(address) ⇒ @@ -901,9 +894,11 @@ private[cluster] final class JoinSeedNodeProcess(environment: ClusterEnvironment /** * INTERNAL API. */ -private[cluster] final class ClusterCoreSender(selfAddress: Address) extends Actor with ActorLogging { +private[cluster] final class ClusterCoreSender extends Actor with ActorLogging { import InternalClusterAction._ + val selfAddress = Cluster(context.system).selfAddress + /** * Looks up and returns the remote cluster command connection for the specific address. */ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index cbcee78cc9..ae1bd6b6ac 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -169,7 +169,7 @@ object ClusterEvent { * Responsible for domain event subscriptions and publishing of * domain events to event bus. */ -private[cluster] final class ClusterDomainEventPublisher(environment: ClusterEnvironment) extends Actor with ActorLogging { +private[cluster] final class ClusterDomainEventPublisher extends Actor with ActorLogging { import InternalClusterAction._ var latestGossip: Gossip = Gossip() diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index 932be34d2f..a7b4d52daa 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -23,10 +23,12 @@ case class Heartbeat(from: Address) extends ClusterMessage * Instantiated as a single instance for each Cluster - e.g. heartbeats are serialized * to Cluster message after message, but concurrent with other types of messages. */ -private[cluster] final class ClusterHeartbeatDaemon(environment: ClusterEnvironment) extends Actor with ActorLogging { +private[cluster] final class ClusterHeartbeatDaemon extends Actor with ActorLogging { + + val failureDetector = Cluster(context.system).failureDetector def receive = { - case Heartbeat(from) ⇒ environment.failureDetector heartbeat from + case Heartbeat(from) ⇒ failureDetector heartbeat from } } @@ -53,7 +55,7 @@ private[cluster] object ClusterHeartbeatSender { * address and thereby reduce the risk of irregular heartbeats to healty * nodes due to broken connections to other nodes. */ -private[cluster] final class ClusterHeartbeatSender(environment: ClusterEnvironment) extends Actor with ActorLogging { +private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging { import ClusterHeartbeatSender._ /** @@ -78,8 +80,7 @@ private[cluster] final class ClusterHeartbeatSender(environment: ClusterEnvironm val workerName = encodeChildName(to.toString) val worker = context.actorFor(workerName) match { case notFound if notFound.isTerminated ⇒ - context.actorOf(Props(new ClusterHeartbeatSenderWorker( - environment.settings.SendCircuitBreakerSettings, clusterHeartbeatConnectionFor(to))), workerName) + context.actorOf(Props(new ClusterHeartbeatSenderWorker(clusterHeartbeatConnectionFor(to))), workerName) case child ⇒ child } worker ! msg @@ -96,17 +97,19 @@ private[cluster] final class ClusterHeartbeatSender(environment: ClusterEnvironm * * @see ClusterHeartbeatSender */ -private[cluster] final class ClusterHeartbeatSenderWorker( - cbSettings: CircuitBreakerSettings, toRef: ActorRef) +private[cluster] final class ClusterHeartbeatSenderWorker(toRef: ActorRef) extends Actor with ActorLogging { import ClusterHeartbeatSender._ - val breaker = CircuitBreaker(context.system.scheduler, - cbSettings.maxFailures, cbSettings.callTimeout, cbSettings.resetTimeout). - onHalfOpen(log.debug("CircuitBreaker Half-Open for: [{}]", toRef)). - onOpen(log.debug("CircuitBreaker Open for [{}]", toRef)). - onClose(log.debug("CircuitBreaker Closed for [{}]", toRef)) + val breaker = { + val cbSettings = Cluster(context.system).settings.SendCircuitBreakerSettings + CircuitBreaker(context.system.scheduler, + cbSettings.maxFailures, cbSettings.callTimeout, cbSettings.resetTimeout). + onHalfOpen(log.debug("CircuitBreaker Half-Open for: [{}]", toRef)). + onOpen(log.debug("CircuitBreaker Open for [{}]", toRef)). + onClose(log.debug("CircuitBreaker Closed for [{}]", toRef)) + } // make sure it will cleanup when not used any more context.setReceiveTimeout(30 seconds) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index 8ca328ed18..49483d39ef 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -9,30 +9,32 @@ import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.actor.Address -object ClientDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { +case class ClientDowningNodeThatIsUnreachableMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") val fourth = role("fourth") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) } -class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy -class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy -class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy -class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy +class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) +class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) +class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) +class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) -class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) +class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) +class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) +class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) -abstract class ClientDowningNodeThatIsUnreachableSpec - extends MultiNodeSpec(ClientDowningNodeThatIsUnreachableMultiJvmSpec) - with MultiNodeClusterSpec with FailureDetectorStrategy { +abstract class ClientDowningNodeThatIsUnreachableSpec(multiNodeConfig: ClientDowningNodeThatIsUnreachableMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) + with MultiNodeClusterSpec { - import ClientDowningNodeThatIsUnreachableMultiJvmSpec._ + def this(failureDetectorPuppet: Boolean) = this(ClientDowningNodeThatIsUnreachableMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ "Client of a 4 node cluster" must { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala index e7d487a276..5a7308ec92 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -9,30 +9,32 @@ import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.actor.Address -object ClientDowningNodeThatIsUpMultiJvmSpec extends MultiNodeConfig { +case class ClientDowningNodeThatIsUpMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") val fourth = role("fourth") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) } -class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy -class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy -class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy -class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy +class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = true) +class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = true) +class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = true) +class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = true) -class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = false) +class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = false) +class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = false) +class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = false) -abstract class ClientDowningNodeThatIsUpSpec - extends MultiNodeSpec(ClientDowningNodeThatIsUpMultiJvmSpec) - with MultiNodeClusterSpec with FailureDetectorStrategy { +abstract class ClientDowningNodeThatIsUpSpec(multiNodeConfig: ClientDowningNodeThatIsUpMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) + with MultiNodeClusterSpec { - import ClientDowningNodeThatIsUpMultiJvmSpec._ + def this(failureDetectorPuppet: Boolean) = this(ClientDowningNodeThatIsUpMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ "Client of a 4 node cluster" must { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala index 89ad712815..97d1e870a5 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterAccrualFailureDetectorSpec.scala @@ -19,9 +19,9 @@ object ClusterAccrualFailureDetectorMultiJvmSpec extends MultiNodeConfig { withFallback(MultiNodeClusterSpec.clusterConfig)) } -class ClusterAccrualFailureDetectorMultiJvmNode1 extends ClusterAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy -class ClusterAccrualFailureDetectorMultiJvmNode2 extends ClusterAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy -class ClusterAccrualFailureDetectorMultiJvmNode3 extends ClusterAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy +class ClusterAccrualFailureDetectorMultiJvmNode1 extends ClusterAccrualFailureDetectorSpec +class ClusterAccrualFailureDetectorMultiJvmNode2 extends ClusterAccrualFailureDetectorSpec +class ClusterAccrualFailureDetectorMultiJvmNode3 extends ClusterAccrualFailureDetectorSpec abstract class ClusterAccrualFailureDetectorSpec extends MultiNodeSpec(ClusterAccrualFailureDetectorMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala index 239a7c7bcb..420b2acacb 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala @@ -12,7 +12,7 @@ import akka.testkit._ import scala.concurrent.util.duration._ import akka.actor.Address -object ConvergenceMultiJvmSpec extends MultiNodeConfig { +case class ConvergenceMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") @@ -20,24 +20,26 @@ object ConvergenceMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString("akka.cluster.failure-detector.threshold = 4")). - withFallback(MultiNodeClusterSpec.clusterConfig)) + withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) } -class ConvergenceWithFailureDetectorPuppetMultiJvmNode1 extends ConvergenceSpec with FailureDetectorPuppetStrategy -class ConvergenceWithFailureDetectorPuppetMultiJvmNode2 extends ConvergenceSpec with FailureDetectorPuppetStrategy -class ConvergenceWithFailureDetectorPuppetMultiJvmNode3 extends ConvergenceSpec with FailureDetectorPuppetStrategy -class ConvergenceWithFailureDetectorPuppetMultiJvmNode4 extends ConvergenceSpec with FailureDetectorPuppetStrategy +class ConvergenceWithFailureDetectorPuppetMultiJvmNode1 extends ConvergenceSpec(failureDetectorPuppet = true) +class ConvergenceWithFailureDetectorPuppetMultiJvmNode2 extends ConvergenceSpec(failureDetectorPuppet = true) +class ConvergenceWithFailureDetectorPuppetMultiJvmNode3 extends ConvergenceSpec(failureDetectorPuppet = true) +class ConvergenceWithFailureDetectorPuppetMultiJvmNode4 extends ConvergenceSpec(failureDetectorPuppet = true) -class ConvergenceWithAccrualFailureDetectorMultiJvmNode1 extends ConvergenceSpec with AccrualFailureDetectorStrategy -class ConvergenceWithAccrualFailureDetectorMultiJvmNode2 extends ConvergenceSpec with AccrualFailureDetectorStrategy -class ConvergenceWithAccrualFailureDetectorMultiJvmNode3 extends ConvergenceSpec with AccrualFailureDetectorStrategy -class ConvergenceWithAccrualFailureDetectorMultiJvmNode4 extends ConvergenceSpec with AccrualFailureDetectorStrategy +class ConvergenceWithAccrualFailureDetectorMultiJvmNode1 extends ConvergenceSpec(failureDetectorPuppet = false) +class ConvergenceWithAccrualFailureDetectorMultiJvmNode2 extends ConvergenceSpec(failureDetectorPuppet = false) +class ConvergenceWithAccrualFailureDetectorMultiJvmNode3 extends ConvergenceSpec(failureDetectorPuppet = false) +class ConvergenceWithAccrualFailureDetectorMultiJvmNode4 extends ConvergenceSpec(failureDetectorPuppet = false) -abstract class ConvergenceSpec - extends MultiNodeSpec(ConvergenceMultiJvmSpec) - with MultiNodeClusterSpec with FailureDetectorStrategy { +abstract class ConvergenceSpec(multiNodeConfig: ConvergenceMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) + with MultiNodeClusterSpec { - import ConvergenceMultiJvmSpec._ + def this(failureDetectorPuppet: Boolean) = this(ConvergenceMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ "A cluster of 3 members" must { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/FailureDetectorStrategy.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/FailureDetectorStrategy.scala deleted file mode 100644 index 86e03f9457..0000000000 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/FailureDetectorStrategy.scala +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.cluster - -import akka.actor.Address -import akka.remote.testkit.MultiNodeSpec -import akka.testkit._ - -/** - * Base trait for all failure detector strategies. - */ -trait FailureDetectorStrategy { - - /** - * Get or create the FailureDetector to be used in the cluster node. - * To be defined by subclass. - */ - def failureDetector: FailureDetector - - /** - * Marks a node as available in the failure detector. - * To be defined by subclass. - */ - def markNodeAsAvailable(address: Address): Unit - - /** - * Marks a node as unavailable in the failure detector. - * To be defined by subclass. - */ - def markNodeAsUnavailable(address: Address): Unit -} - -/** - * Defines a FailureDetectorPuppet-based FailureDetectorStrategy. - */ -trait FailureDetectorPuppetStrategy extends FailureDetectorStrategy { self: MultiNodeSpec ⇒ - - /** - * The puppet instance. Separated from 'failureDetector' field so we don't have to cast when using the puppet specific methods. - */ - private val puppet = new FailureDetectorPuppet(system) - - override def failureDetector: FailureDetector = puppet - - override def markNodeAsAvailable(address: Address): Unit = puppet markNodeAsAvailable address - - override def markNodeAsUnavailable(address: Address): Unit = puppet markNodeAsUnavailable address -} - -/** - * Defines a AccrualFailureDetector-based FailureDetectorStrategy. - */ -trait AccrualFailureDetectorStrategy extends FailureDetectorStrategy { self: MultiNodeSpec ⇒ - - override val failureDetector: FailureDetector = new AccrualFailureDetector(system, new ClusterSettings(system.settings.config, system.name)) - - override def markNodeAsAvailable(address: Address): Unit = () - - override def markNodeAsUnavailable(address: Address): Unit = () -} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala index 10d98cd86b..1391b80127 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala @@ -9,6 +9,7 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import scala.concurrent.util.duration._ +import akka.actor.Address object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig { val seed1 = role("seed1") @@ -18,15 +19,15 @@ object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig { val ordinary2 = role("ordinary2") commonConfig(debugConfig(on = false). - withFallback(ConfigFactory.parseString("akka.cluster.auto-join = on")). + withFallback(ConfigFactory.parseString("akka.cluster.auto-join = off")). withFallback(MultiNodeClusterSpec.clusterConfig)) } -class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy -class JoinSeedNodeMultiJvmNode2 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy -class JoinSeedNodeMultiJvmNode3 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy -class JoinSeedNodeMultiJvmNode4 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy -class JoinSeedNodeMultiJvmNode5 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy +class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec +class JoinSeedNodeMultiJvmNode2 extends JoinSeedNodeSpec +class JoinSeedNodeMultiJvmNode3 extends JoinSeedNodeSpec +class JoinSeedNodeMultiJvmNode4 extends JoinSeedNodeSpec +class JoinSeedNodeMultiJvmNode5 extends JoinSeedNodeSpec abstract class JoinSeedNodeSpec extends MultiNodeSpec(JoinSeedNodeMultiJvmSpec) @@ -34,9 +35,9 @@ abstract class JoinSeedNodeSpec import JoinSeedNodeMultiJvmSpec._ - override def seedNodes = IndexedSeq(seed1, seed2, seed3) + def seedNodes: IndexedSeq[Address] = IndexedSeq(seed1, seed2, seed3) - "A cluster with configured seed nodes" must { + "A cluster with seed nodes" must { "be able to start the seed nodes concurrently" taggedAs LongRunningTest in { runOn(seed1) { @@ -45,12 +46,16 @@ abstract class JoinSeedNodeSpec } runOn(seed1, seed2, seed3) { + cluster.joinSeedNodes(seedNodes) awaitUpConvergence(3) } enterBarrier("after-1") } - "join the seed nodes at startup" taggedAs LongRunningTest in { + "join the seed nodes" taggedAs LongRunningTest in { + runOn(ordinary1, ordinary2) { + cluster.joinSeedNodes(seedNodes) + } awaitUpConvergence(roles.size) enterBarrier("after-2") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala index d34a48f48e..efa9e3f25d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala @@ -17,15 +17,15 @@ object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig { val c1 = role("c1") val c2 = role("c2") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy -class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy -class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy -class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy -class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy -class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy +class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala index d4023d785b..01bacf9b14 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala @@ -12,7 +12,7 @@ import akka.testkit._ import akka.actor._ import scala.concurrent.util.duration._ -object LeaderDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { +case class LeaderDowningNodeThatIsUnreachableMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") @@ -20,24 +20,26 @@ object LeaderDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString("akka.cluster.auto-down = on")). - withFallback(MultiNodeClusterSpec.clusterConfig)) + withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) } -class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy -class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy -class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy -class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy +class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) +class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) +class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) +class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true) -class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy -class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy -class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy -class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy +class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) +class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) +class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) +class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false) -abstract class LeaderDowningNodeThatIsUnreachableSpec - extends MultiNodeSpec(LeaderDowningNodeThatIsUnreachableMultiJvmSpec) - with MultiNodeClusterSpec with FailureDetectorStrategy { +abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDowningNodeThatIsUnreachableMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) + with MultiNodeClusterSpec { - import LeaderDowningNodeThatIsUnreachableMultiJvmSpec._ + def this(failureDetectorPuppet: Boolean) = this(LeaderDowningNodeThatIsUnreachableMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ "The Leader in a 4 node cluster" must { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index 5d455251aa..8c2198dd7b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -9,33 +9,35 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ -object LeaderElectionMultiJvmSpec extends MultiNodeConfig { +case class LeaderElectionMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val controller = role("controller") val first = role("first") val second = role("second") val third = role("third") val fourth = role("fourth") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) } -class LeaderElectionWithFailureDetectorPuppetMultiJvmNode1 extends LeaderElectionSpec with FailureDetectorPuppetStrategy -class LeaderElectionWithFailureDetectorPuppetMultiJvmNode2 extends LeaderElectionSpec with FailureDetectorPuppetStrategy -class LeaderElectionWithFailureDetectorPuppetMultiJvmNode3 extends LeaderElectionSpec with FailureDetectorPuppetStrategy -class LeaderElectionWithFailureDetectorPuppetMultiJvmNode4 extends LeaderElectionSpec with FailureDetectorPuppetStrategy -class LeaderElectionWithFailureDetectorPuppetMultiJvmNode5 extends LeaderElectionSpec with FailureDetectorPuppetStrategy +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode1 extends LeaderElectionSpec(failureDetectorPuppet = true) +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode2 extends LeaderElectionSpec(failureDetectorPuppet = true) +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode3 extends LeaderElectionSpec(failureDetectorPuppet = true) +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode4 extends LeaderElectionSpec(failureDetectorPuppet = true) +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode5 extends LeaderElectionSpec(failureDetectorPuppet = true) -class LeaderElectionWithAccrualFailureDetectorMultiJvmNode1 extends LeaderElectionSpec with AccrualFailureDetectorStrategy -class LeaderElectionWithAccrualFailureDetectorMultiJvmNode2 extends LeaderElectionSpec with AccrualFailureDetectorStrategy -class LeaderElectionWithAccrualFailureDetectorMultiJvmNode3 extends LeaderElectionSpec with AccrualFailureDetectorStrategy -class LeaderElectionWithAccrualFailureDetectorMultiJvmNode4 extends LeaderElectionSpec with AccrualFailureDetectorStrategy -class LeaderElectionWithAccrualFailureDetectorMultiJvmNode5 extends LeaderElectionSpec with AccrualFailureDetectorStrategy +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode1 extends LeaderElectionSpec(failureDetectorPuppet = false) +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode2 extends LeaderElectionSpec(failureDetectorPuppet = false) +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode3 extends LeaderElectionSpec(failureDetectorPuppet = false) +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode4 extends LeaderElectionSpec(failureDetectorPuppet = false) +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode5 extends LeaderElectionSpec(failureDetectorPuppet = false) -abstract class LeaderElectionSpec - extends MultiNodeSpec(LeaderElectionMultiJvmSpec) - with MultiNodeClusterSpec with FailureDetectorStrategy { +abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) + with MultiNodeClusterSpec { - import LeaderElectionMultiJvmSpec._ + def this(failureDetectorPuppet: Boolean) = this(LeaderElectionMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ // sorted in the order used by the cluster lazy val sortedRoles = Seq(first, second, third, fourth).sorted diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala index 15e308cafb..eb0aa40591 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala @@ -23,12 +23,12 @@ object LeaderLeavingMultiJvmSpec extends MultiNodeConfig { .withFallback(ConfigFactory.parseString(""" # turn off unreachable reaper akka.cluster.unreachable-nodes-reaper-interval = 300 s""") - .withFallback(MultiNodeClusterSpec.clusterConfig))) + .withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))) } -class LeaderLeavingMultiJvmNode1 extends LeaderLeavingSpec with FailureDetectorPuppetStrategy -class LeaderLeavingMultiJvmNode2 extends LeaderLeavingSpec with FailureDetectorPuppetStrategy -class LeaderLeavingMultiJvmNode3 extends LeaderLeavingSpec with FailureDetectorPuppetStrategy +class LeaderLeavingMultiJvmNode1 extends LeaderLeavingSpec +class LeaderLeavingMultiJvmNode2 extends LeaderLeavingSpec +class LeaderLeavingMultiJvmNode3 extends LeaderLeavingSpec abstract class LeaderLeavingSpec extends MultiNodeSpec(LeaderLeavingMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala index 1d50fe0d37..afeec13d9e 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala @@ -26,12 +26,12 @@ object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig { unreachable-nodes-reaper-interval = 300 s # turn "off" reaping to unreachable node set } """) - .withFallback(MultiNodeClusterSpec.clusterConfig))) + .withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))) } -class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListenerExitingSpec with FailureDetectorPuppetStrategy -class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec with FailureDetectorPuppetStrategy -class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListenerExitingSpec +class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec +class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec abstract class MembershipChangeListenerExitingSpec extends MultiNodeSpec(MembershipChangeListenerExitingMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala index 7c14af7203..b2c69c7c17 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala @@ -17,11 +17,11 @@ object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec with FailureDetectorPuppetStrategy -class MembershipChangeListenerJoinMultiJvmNode2 extends MembershipChangeListenerJoinSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec +class MembershipChangeListenerJoinMultiJvmNode2 extends MembershipChangeListenerJoinSpec abstract class MembershipChangeListenerJoinSpec extends MultiNodeSpec(MembershipChangeListenerJoinMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala index b1a7663154..835c3d722e 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala @@ -24,12 +24,12 @@ object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig { .withFallback(ConfigFactory.parseString(""" akka.cluster.unreachable-nodes-reaper-interval = 300 s # turn "off" """)) - .withFallback(MultiNodeClusterSpec.clusterConfig)) + .withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListenerLeavingSpec with FailureDetectorPuppetStrategy -class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec with FailureDetectorPuppetStrategy -class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListenerLeavingSpec +class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec +class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec abstract class MembershipChangeListenerLeavingSpec extends MultiNodeSpec(MembershipChangeListenerLeavingMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala index c44e61df57..f4faf4234b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala @@ -16,12 +16,12 @@ object MembershipChangeListenerUpMultiJvmSpec extends MultiNodeConfig { val second = role("second") val third = role("third") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class MembershipChangeListenerUpMultiJvmNode1 extends MembershipChangeListenerUpSpec with FailureDetectorPuppetStrategy -class MembershipChangeListenerUpMultiJvmNode2 extends MembershipChangeListenerUpSpec with FailureDetectorPuppetStrategy -class MembershipChangeListenerUpMultiJvmNode3 extends MembershipChangeListenerUpSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerUpMultiJvmNode1 extends MembershipChangeListenerUpSpec +class MembershipChangeListenerUpMultiJvmNode2 extends MembershipChangeListenerUpSpec +class MembershipChangeListenerUpMultiJvmNode3 extends MembershipChangeListenerUpSpec abstract class MembershipChangeListenerUpSpec extends MultiNodeSpec(MembershipChangeListenerUpMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 161b3d3601..af47d869dc 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -20,6 +20,14 @@ import akka.actor.ActorPath import akka.actor.RootActorPath object MultiNodeClusterSpec { + + def clusterConfigWithFailureDetectorPuppet: Config = + ConfigFactory.parseString("akka.cluster.failure-detector.implementation-class = akka.cluster.FailureDetectorPuppet"). + withFallback(clusterConfig) + + def clusterConfig(failureDetectorPuppet: Boolean): Config = + if (failureDetectorPuppet) clusterConfigWithFailureDetectorPuppet else clusterConfig + def clusterConfig: Config = ConfigFactory.parseString(""" akka.actor.provider = akka.cluster.ClusterActorRefProvider akka.cluster { @@ -32,6 +40,7 @@ object MultiNodeClusterSpec { periodic-tasks-initial-delay = 300 ms publish-stats-interval = 0 s # always, when it happens } + akka.remote.log-remote-lifecycle-events = off akka.test { single-expect-default = 5 s } @@ -81,39 +90,12 @@ trait MultiNodeClusterSpec extends Suite { self: MultiNodeSpec ⇒ throw t } - /** - * Make it possible to override/configure seedNodes from tests without - * specifying in config. Addresses are unknown before startup time. - */ - protected def seedNodes: IndexedSeq[RoleName] = IndexedSeq.empty - - /** - * The cluster node instance. Needs to be lazily created. - */ - private lazy val clusterNode = this match { - case x: FailureDetectorStrategy ⇒ createTestCluster(x.failureDetector) - case _ if seedNodes.nonEmpty ⇒ - createTestCluster(new AccrualFailureDetector(system, new ClusterSettings(system.settings.config, system.name))) - case _ ⇒ Cluster(system) - - } - - private def createTestCluster(failureDetector: FailureDetector): Cluster = { - new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) { - override def seedNodes: IndexedSeq[Address] = { - val testSeedNodes = MultiNodeClusterSpec.this.seedNodes - if (testSeedNodes.isEmpty) super.seedNodes - else testSeedNodes map address - } - } - } - def clusterView: ClusterReadView = cluster.readView /** * Get the cluster node to use. */ - def cluster: Cluster = clusterNode + def cluster: Cluster = Cluster(system) /** * Use this method for the initial startup of the cluster node. @@ -224,5 +206,25 @@ trait MultiNodeClusterSpec extends Suite { self: MultiNodeSpec ⇒ def roleName(addr: Address): Option[RoleName] = roles.find(address(_) == addr) + /** + * Marks a node as available in the failure detector if + * [[akka.cluster.FailureDetectorPuppet]] is used as + * failure detector. + */ + def markNodeAsAvailable(address: Address): Unit = cluster.failureDetector match { + case puppet: FailureDetectorPuppet ⇒ puppet.markNodeAsAvailable(address) + case _ ⇒ + } + + /** + * Marks a node as unavailable in the failure detector if + * [[akka.cluster.FailureDetectorPuppet]] is used as + * failure detector. + */ + def markNodeAsUnavailable(address: Address): Unit = cluster.failureDetector match { + case puppet: FailureDetectorPuppet ⇒ puppet.markNodeAsUnavailable(address) + case _ ⇒ + } + } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala index 752857316b..3fec2f22ad 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala @@ -15,12 +15,12 @@ object NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec extends MultiNodeConfig val second = role("second") val third = role("third") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec with FailureDetectorPuppetStrategy -class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec with FailureDetectorPuppetStrategy -class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec with FailureDetectorPuppetStrategy +class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec +class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec +class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec abstract class NodeLeavingAndExitingAndBeingRemovedSpec extends MultiNodeSpec(NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala index 529866c433..0d0b45fd04 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala @@ -23,12 +23,12 @@ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig { .withFallback(ConfigFactory.parseString(""" # turn off unreachable reaper akka.cluster.unreachable-nodes-reaper-interval = 300 s""") - .withFallback(MultiNodeClusterSpec.clusterConfig))) + .withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))) } -class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec with FailureDetectorPuppetStrategy -class NodeLeavingAndExitingMultiJvmNode2 extends NodeLeavingAndExitingSpec with FailureDetectorPuppetStrategy -class NodeLeavingAndExitingMultiJvmNode3 extends NodeLeavingAndExitingSpec with FailureDetectorPuppetStrategy +class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec +class NodeLeavingAndExitingMultiJvmNode2 extends NodeLeavingAndExitingSpec +class NodeLeavingAndExitingMultiJvmNode3 extends NodeLeavingAndExitingSpec abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExitingMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala index 3df642afad..86e5dd71e9 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala @@ -13,12 +13,12 @@ object NodeMembershipMultiJvmSpec extends MultiNodeConfig { val second = role("second") val third = role("third") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec with FailureDetectorPuppetStrategy -class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec with FailureDetectorPuppetStrategy -class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec with FailureDetectorPuppetStrategy +class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec +class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec +class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala index 4318d0e79d..0b6cea8683 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala @@ -18,11 +18,11 @@ object NodeUpMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class NodeUpMultiJvmNode1 extends NodeUpSpec with FailureDetectorPuppetStrategy -class NodeUpMultiJvmNode2 extends NodeUpSpec with FailureDetectorPuppetStrategy +class NodeUpMultiJvmNode1 extends NodeUpSpec +class NodeUpMultiJvmNode2 extends NodeUpSpec abstract class NodeUpSpec extends MultiNodeSpec(NodeUpMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala index c0ac0f81de..d84ae89e5d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala @@ -9,7 +9,7 @@ import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import scala.concurrent.util.duration._ -object SingletonClusterMultiJvmSpec extends MultiNodeConfig { +case class SingletonClusterMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") val second = role("second") @@ -21,21 +21,23 @@ object SingletonClusterMultiJvmSpec extends MultiNodeConfig { failure-detector.threshold = 4 } """)). - withFallback(MultiNodeClusterSpec.clusterConfig)) + withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) } -class SingletonClusterWithFailureDetectorPuppetMultiJvmNode1 extends SingletonClusterSpec with FailureDetectorPuppetStrategy -class SingletonClusterWithFailureDetectorPuppetMultiJvmNode2 extends SingletonClusterSpec with FailureDetectorPuppetStrategy +class SingletonClusterWithFailureDetectorPuppetMultiJvmNode1 extends SingletonClusterSpec(failureDetectorPuppet = true) +class SingletonClusterWithFailureDetectorPuppetMultiJvmNode2 extends SingletonClusterSpec(failureDetectorPuppet = true) -class SingletonClusterWithAccrualFailureDetectorMultiJvmNode1 extends SingletonClusterSpec with AccrualFailureDetectorStrategy -class SingletonClusterWithAccrualFailureDetectorMultiJvmNode2 extends SingletonClusterSpec with AccrualFailureDetectorStrategy +class SingletonClusterWithAccrualFailureDetectorMultiJvmNode1 extends SingletonClusterSpec(failureDetectorPuppet = false) +class SingletonClusterWithAccrualFailureDetectorMultiJvmNode2 extends SingletonClusterSpec(failureDetectorPuppet = false) -abstract class SingletonClusterSpec - extends MultiNodeSpec(SingletonClusterMultiJvmSpec) - with MultiNodeClusterSpec with FailureDetectorStrategy { +abstract class SingletonClusterSpec(multiNodeConfig: SingletonClusterMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) + with MultiNodeClusterSpec { - import SingletonClusterMultiJvmSpec._ + def this(failureDetectorPuppet: Boolean) = this(SingletonClusterMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ "A cluster of 2 nodes" must { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala index 365940ab8a..22e3cae694 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala @@ -13,7 +13,7 @@ import scala.concurrent.util.duration._ import akka.actor.Address import akka.remote.testconductor.Direction -object SplitBrainMultiJvmSpec extends MultiNodeConfig { +case class SplitBrainMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") @@ -26,26 +26,28 @@ object SplitBrainMultiJvmSpec extends MultiNodeConfig { auto-down = on failure-detector.threshold = 4 }""")). - withFallback(MultiNodeClusterSpec.clusterConfig)) + withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) } -class SplitBrainWithFailureDetectorPuppetMultiJvmNode1 extends SplitBrainSpec with FailureDetectorPuppetStrategy -class SplitBrainWithFailureDetectorPuppetMultiJvmNode2 extends SplitBrainSpec with FailureDetectorPuppetStrategy -class SplitBrainWithFailureDetectorPuppetMultiJvmNode3 extends SplitBrainSpec with FailureDetectorPuppetStrategy -class SplitBrainWithFailureDetectorPuppetMultiJvmNode4 extends SplitBrainSpec with FailureDetectorPuppetStrategy -class SplitBrainWithFailureDetectorPuppetMultiJvmNode5 extends SplitBrainSpec with FailureDetectorPuppetStrategy +class SplitBrainWithFailureDetectorPuppetMultiJvmNode1 extends SplitBrainSpec(failureDetectorPuppet = true) +class SplitBrainWithFailureDetectorPuppetMultiJvmNode2 extends SplitBrainSpec(failureDetectorPuppet = true) +class SplitBrainWithFailureDetectorPuppetMultiJvmNode3 extends SplitBrainSpec(failureDetectorPuppet = true) +class SplitBrainWithFailureDetectorPuppetMultiJvmNode4 extends SplitBrainSpec(failureDetectorPuppet = true) +class SplitBrainWithFailureDetectorPuppetMultiJvmNode5 extends SplitBrainSpec(failureDetectorPuppet = true) -class SplitBrainWithAccrualFailureDetectorMultiJvmNode1 extends SplitBrainSpec with AccrualFailureDetectorStrategy -class SplitBrainWithAccrualFailureDetectorMultiJvmNode2 extends SplitBrainSpec with AccrualFailureDetectorStrategy -class SplitBrainWithAccrualFailureDetectorMultiJvmNode3 extends SplitBrainSpec with AccrualFailureDetectorStrategy -class SplitBrainWithAccrualFailureDetectorMultiJvmNode4 extends SplitBrainSpec with AccrualFailureDetectorStrategy -class SplitBrainWithAccrualFailureDetectorMultiJvmNode5 extends SplitBrainSpec with AccrualFailureDetectorStrategy +class SplitBrainWithAccrualFailureDetectorMultiJvmNode1 extends SplitBrainSpec(failureDetectorPuppet = false) +class SplitBrainWithAccrualFailureDetectorMultiJvmNode2 extends SplitBrainSpec(failureDetectorPuppet = false) +class SplitBrainWithAccrualFailureDetectorMultiJvmNode3 extends SplitBrainSpec(failureDetectorPuppet = false) +class SplitBrainWithAccrualFailureDetectorMultiJvmNode4 extends SplitBrainSpec(failureDetectorPuppet = false) +class SplitBrainWithAccrualFailureDetectorMultiJvmNode5 extends SplitBrainSpec(failureDetectorPuppet = false) -abstract class SplitBrainSpec - extends MultiNodeSpec(SplitBrainMultiJvmSpec) - with MultiNodeClusterSpec with FailureDetectorStrategy { +abstract class SplitBrainSpec(multiNodeConfig: SplitBrainMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) + with MultiNodeClusterSpec { - import SplitBrainMultiJvmSpec._ + def this(failureDetectorPuppet: Boolean) = this(SplitBrainMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ val side1 = IndexedSeq(first, second) val side2 = IndexedSeq(third, fourth, fifth) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index 3ff2935949..4df6032116 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -23,17 +23,16 @@ object TransitionMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString("akka.cluster.periodic-tasks-initial-delay = 300 s # turn off all periodic tasks")). - withFallback(MultiNodeClusterSpec.clusterConfig)) + withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } -class TransitionMultiJvmNode1 extends TransitionSpec with FailureDetectorPuppetStrategy -class TransitionMultiJvmNode2 extends TransitionSpec with FailureDetectorPuppetStrategy -class TransitionMultiJvmNode3 extends TransitionSpec with FailureDetectorPuppetStrategy +class TransitionMultiJvmNode1 extends TransitionSpec +class TransitionMultiJvmNode2 extends TransitionSpec +class TransitionMultiJvmNode3 extends TransitionSpec abstract class TransitionSpec extends MultiNodeSpec(TransitionMultiJvmSpec) with MultiNodeClusterSpec - with FailureDetectorStrategy with ImplicitSender { import TransitionMultiJvmSpec._ diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala index e3f21d86cf..97e15c0e4d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -14,7 +14,7 @@ import akka.actor.Address import akka.remote.testconductor.{ RoleName, Direction } import scala.concurrent.util.duration._ -object UnreachableNodeRejoinsClusterMultiJvmSpec extends MultiNodeConfig { +case class UnreachableNodeRejoinsClusterMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") @@ -23,20 +23,23 @@ object UnreachableNodeRejoinsClusterMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy -class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy -class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy -class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy +class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true) +class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true) +class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true) +class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true) -class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy -class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy -class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy -class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy +class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = false) +class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = false) +class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = false) +class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = false) -abstract class UnreachableNodeRejoinsClusterSpec - extends MultiNodeSpec(UnreachableNodeRejoinsClusterMultiJvmSpec) - with MultiNodeClusterSpec with FailureDetectorStrategy { - import UnreachableNodeRejoinsClusterMultiJvmSpec._ +abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNodeRejoinsClusterMultiNodeConfig) + extends MultiNodeSpec(multiNodeConfig) + with MultiNodeClusterSpec { + + def this(failureDetectorPuppet: Boolean) = this(UnreachableNodeRejoinsClusterMultiNodeConfig(failureDetectorPuppet)) + + import multiNodeConfig._ def allBut(role: RoleName, roles: Seq[RoleName] = roles): Seq[RoleName] = { roles.filterNot(_ == role) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 3086339517..6666e38cce 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -25,8 +25,10 @@ object ClusterSpec { auto-down = off periodic-tasks-initial-delay = 120 seconds // turn off scheduled tasks publish-stats-interval = 0 s # always, when it happens + failure-detector.implementation-class = akka.cluster.FailureDetectorPuppet } akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off akka.remote.netty.port = 0 # akka.loglevel = DEBUG """ @@ -40,9 +42,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { val selfAddress = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[ClusterActorRefProvider].transport.address - val failureDetector = new FailureDetectorPuppet(system) - - val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) + val cluster = Cluster(system) def clusterView = cluster.readView def leaderActions(): Unit = { diff --git a/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala index 9ddc9942b0..869df97f84 100644 --- a/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala +++ b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala @@ -6,6 +6,7 @@ package akka.cluster import akka.actor.{ Address, ActorSystem } import akka.event.{ Logging, LogSource } +import akka.remote.testkit.MultiNodeConfig /** * User controllable "puppet" failure detector. @@ -13,8 +14,6 @@ import akka.event.{ Logging, LogSource } class FailureDetectorPuppet(system: ActorSystem, settings: ClusterSettings) extends FailureDetector { import java.util.concurrent.ConcurrentHashMap - def this(system: ActorSystem) = this(system, new ClusterSettings(system.settings.config, system.name)) - trait Status object Up extends Status object Down extends Status @@ -63,3 +62,4 @@ class FailureDetectorPuppet(system: ActorSystem, settings: ClusterSettings) exte connections.clear() } } + diff --git a/akka-docs/_sphinx/pygments/setup.py b/akka-docs/_sphinx/pygments/setup.py index 7c86a6a681..cdfa31d397 100644 --- a/akka-docs/_sphinx/pygments/setup.py +++ b/akka-docs/_sphinx/pygments/setup.py @@ -15,5 +15,5 @@ setup( description = __doc__, author = "Akka", packages = ['styles'], - entry_points = entry_points + entry_points = entry_points ) diff --git a/akka-docs/cluster/cluster-usage.rst b/akka-docs/cluster/cluster-usage.rst index 7bf4e754db..0b72dbce70 100644 --- a/akka-docs/cluster/cluster-usage.rst +++ b/akka-docs/cluster/cluster-usage.rst @@ -18,7 +18,7 @@ The Akka cluster is a separate jar file. Make sure that you have the following d .. parsed-literal:: - "com.typesafe.akka" % "akka-cluster_|scalaVersion|" % "2.1-SNAPSHOT" + "com.typesafe.akka" %% "akka-cluster" % "2.1-SNAPSHOT" If you are using the latest nightly build you should pick a timestamped Akka version from ``_. diff --git a/akka-docs/conf.py b/akka-docs/conf.py index 2ca3a5c66c..63afaa3bb8 100644 --- a/akka-docs/conf.py +++ b/akka-docs/conf.py @@ -50,6 +50,7 @@ html_show_sourcelink = False html_show_sphinx = False html_show_copyright = True htmlhelp_basename = 'Akkadoc' +html_use_smartypants = False html_add_permalinks = '' html_context = { diff --git a/akka-docs/general/howdoi.rst b/akka-docs/general/howdoi.rst deleted file mode 100644 index 0dd9a12ed9..0000000000 --- a/akka-docs/general/howdoi.rst +++ /dev/null @@ -1,31 +0,0 @@ -.. _howdoi: - -How do I … -================================ - -This section of the Akka Documentation tries to answer common usage questions. - -… deal with blocking third-party code? --------------------------------------- - -Some times you cannot avoid doing blocking, and in that case you might want to explore the following: - - 1. Isolate the blocking to a dedicated ``ExecutionContext`` - 2. Configure the actor to have a bounded-mailbox as to prevent from excessive mailbox sizes - -.. note:: - - Before you do anything at all, measure! - - -… use persistence with Akka? ----------------------------- - -You just use it? -You might want to have a look at the answer to the question about blocking though. - -… use a pool of connections/whatnots ------------------------------------- - -You most probably want to wrap that pooling service as an Akka Extension, -see the docs for documentation on Java / Scala Extensions. \ No newline at end of file diff --git a/akka-docs/general/index.rst b/akka-docs/general/index.rst index edfebff5c5..e76a81a80c 100644 --- a/akka-docs/general/index.rst +++ b/akka-docs/general/index.rst @@ -11,5 +11,4 @@ General remoting jmm message-send-semantics - configuration - howdoi + configuration \ No newline at end of file diff --git a/akka-docs/intro/getting-started.rst b/akka-docs/intro/getting-started.rst index 9fae5281a8..11b699fa8f 100644 --- a/akka-docs/intro/getting-started.rst +++ b/akka-docs/intro/getting-started.rst @@ -14,9 +14,9 @@ The best way to start learning Akka is to download the Typesafe Stack and either the Akka Getting Started Tutorials or check out one of Akka Template Projects. Both comes in several flavours depending on your development environment preferences. -- `Download Typesafe Stack `_ -- `Getting Started Tutorials `_ -- `Template Projects `_ +- `Download Typesafe Stack `_ +- `Getting Started Tutorials `_ +- `Template Projects `_ Download -------- diff --git a/akka-docs/scala/code/docs/actor/FSMDocSpec.scala b/akka-docs/scala/code/docs/actor/FSMDocSpec.scala index ff9ae0cc14..5bc1ea8d70 100644 --- a/akka-docs/scala/code/docs/actor/FSMDocSpec.scala +++ b/akka-docs/scala/code/docs/actor/FSMDocSpec.scala @@ -5,11 +5,11 @@ package docs.actor import language.postfixOps +import akka.testkit.{ AkkaSpec ⇒ MyFavoriteTestFrameWorkPlusAkkaTestKit } //#test-code -import akka.testkit.AkkaSpec import akka.actor.Props -class FSMDocSpec extends AkkaSpec { +class FSMDocSpec extends MyFavoriteTestFrameWorkPlusAkkaTestKit { "simple finite state machine" must { //#fsm-code-elided diff --git a/akka-docs/scala/howto.rst b/akka-docs/scala/howto.rst index c588c2fe25..bcaa456b82 100644 --- a/akka-docs/scala/howto.rst +++ b/akka-docs/scala/howto.rst @@ -19,7 +19,7 @@ Throttling Messages Contributed by: Kaspar Fischer -A message throttler that ensures that messages are not sent out at too high a rate. +"A message throttler that ensures that messages are not sent out at too high a rate." The pattern is described `here `_. @@ -28,9 +28,9 @@ Balancing Workload Across Nodes Contributed by: Derek Wyatt -Often times, people want the functionality of the BalancingDispatcher with the +"Often times, people want the functionality of the BalancingDispatcher with the stipulation that the Actors doing the work have distinct Mailboxes on remote -nodes. In this post we’ll explore the implementation of such a concept. +nodes. In this post we’ll explore the implementation of such a concept." The pattern is described `here `_. @@ -39,15 +39,78 @@ Ordered Termination Contributed by: Derek Wyatt -When an Actor stops, its children stop in an undefined order. Child termination is +"When an Actor stops, its children stop in an undefined order. Child termination is asynchronous and thus non-deterministic. If an Actor has children that have order dependencies, then you might need to ensure a particular shutdown order of those children so that their postStop() methods get -called in the right order. +called in the right order." The pattern is described `here `_. +Akka AMQP Proxies +================= + +Contributed by: Fabrice Drouin + +"“AMQP proxies” is a simple way of integrating AMQP with Akka to distribute jobs across a network of computing nodes. +You still write “local” code, have very little to configure, and end up with a distributed, elastic, +fault-tolerant grid where computing nodes can be written in nearly every programming language." + +The pattern is described `here `_. + +Shutdown Patterns in Akka 2 +=========================== + +Contributed by: Derek Wyatt + +“How do you tell Akka to shut down the ActorSystem when everything’s finished? +It turns out that there’s no magical flag for this, no configuration setting, no special callback you can register for, +and neither will the illustrious shutdown fairy grace your application with her glorious presence at that perfect moment. +She’s just plain mean. + +In this post, we’ll discuss why this is the case and provide you with a simple option for shutting down “at the right time”, +as well as a not-so-simple-option for doing the exact same thing." + +The pattern is described `here `_. + +Distributed (in-memory) graph processing with Akka +================================================== + +Contributed by: Adelbert Chang + +"Graphs have always been an interesting structure to study in both mathematics and computer science (among other fields), +and have become even more interesting in the context of online social networks such as Facebook and Twitter, +whose underlying network structures are nicely represented by graphs." + +The pattern is described `here `_. + +Case Study: An Auto-Updating Cache Using Actors +=============================================== + +Contributed by: Eric Pederson + +"We recently needed to build a caching system in front of a slow backend system with the following requirements: + +The data in the backend system is constantly being updated so the caches need to be updated every N minutes. +Requests to the backend system need to be throttled. +The caching system we built used Akka actors and Scala’s support for functions as first class objects." + +The pattern is described `here `_. + +Discovering message flows in actor systems with the Spider Pattern +================================================================== + +Contributed by: Raymond Roestenburg + +"Building actor systems is fun but debugging them can be difficult, you mostly end up browsing through many log files +on several machines to find out what’s going on. I’m sure you have browsed through logs and thought, +“Hey, where did that message go?”, “Why did this message cause that effect” or “Why did this actor never get a message?” + +This is where the Spider pattern comes in." + +The pattern is described `here `_. + Template Pattern ================ diff --git a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala index 29f404e1f9..8ae386d4eb 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -54,6 +54,7 @@ abstract class MultiNodeConfig { receive = on fsm = on } + akka.remote.log-remote-lifecycle-events = on """) else ConfigFactory.parseString("akka.loglevel = INFO") diff --git a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala index 1b21b89ccb..f800872657 100644 --- a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala +++ b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala @@ -6,7 +6,6 @@ package akka.routing import com.typesafe.config.ConfigFactory import akka.actor.ActorContext import akka.actor.ActorRef -import akka.actor.ActorSystemImpl import akka.actor.Deploy import akka.actor.InternalActorRef import akka.actor.Props @@ -18,6 +17,7 @@ import akka.actor.Address import scala.collection.JavaConverters._ import java.util.concurrent.atomic.AtomicInteger import java.lang.IllegalStateException +import akka.actor.ActorCell /** * [[akka.routing.RouterConfig]] implementation for remote deployment on defined @@ -75,12 +75,14 @@ class RemoteRouteeProvider(nodes: Iterable[Address], _context: ActorContext, _ro format context.self.path.toString) override def createRoutees(nrOfInstances: Int): Unit = { - val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559 val refs = IndexedSeq.empty[ActorRef] ++ (for (i ← 1 to nrOfInstances) yield { val name = "c" + childNameCounter.incrementAndGet val deploy = Deploy("", ConfigFactory.empty(), routeeProps.routerConfig, RemoteScope(nodeAddressIter.next)) - impl.provider.actorOf(impl, routeeProps, context.self.asInstanceOf[InternalActorRef], context.self.path / name, - systemService = false, Some(deploy), lookupDeploy = false, async = false) + + // attachChild means that the provider will treat this call as if possibly done out of the wrong + // context and use RepointableActorRef instead of LocalActorRef. Seems like a slightly sub-optimal + // choice in a corner case (and hence not worth fixing). + context.asInstanceOf[ActorCell].attachChild(routeeProps.withDeploy(deploy), name, systemService = false) }) registerRoutees(refs) } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala index 1255489691..8d7effc5d9 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala @@ -67,10 +67,10 @@ akka.actor.deployment { nr-of-instances = 4 } }""").withFallback(system.settings.config) - val other = ActorSystem("remote-sys", conf) + val otherSystem = ActorSystem("remote-sys", conf) override def atTermination() { - other.shutdown() + otherSystem.shutdown() } "A Remote Router" must { @@ -199,6 +199,25 @@ akka.actor.deployment { system.stop(router) } + "set supplied supervisorStrategy" in { + val escalator = OneForOneStrategy() { + case e ⇒ + println("## " + e) + testActor ! e; SupervisorStrategy.Escalate + } + val router = system.actorOf(Props.empty.withRouter(new RemoteRouterConfig( + RoundRobinRouter(1, supervisorStrategy = escalator), + Seq(Address("akka", "remote-sys", "localhost", 12347)))), "blub3") + + router ! CurrentRoutees + EventFilter[ActorKilledException](occurrences = 1) intercept { + EventFilter[ActorKilledException](occurrences = 1).intercept { + expectMsgType[RouterRoutees].routees.head ! Kill + }(otherSystem) + } + expectMsgType[ActorKilledException] + } + } } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index e52fe741ea..a1721d2ffe 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -568,7 +568,7 @@ trait TestKitBase { val message = if (max == 0.seconds) { queue.pollFirst - } else if (max.finite_?) { + } else if (max.isFinite) { queue.pollFirst(max.length, max.unit) } else { queue.takeFirst diff --git a/project/Sphinx.scala b/project/Sphinx.scala index be35d83916..b349265b60 100644 --- a/project/Sphinx.scala +++ b/project/Sphinx.scala @@ -52,7 +52,7 @@ object Sphinx { val env = "PYTHONPATH" -> target.absolutePath s.log.debug("Command: " + command.mkString(" ") + "\nEnv:" + env) val exitCode = Process(command, cwd, env) ! logger - if (exitCode != 0) sys.error("Failed to install custom Sphinx pygments styles.") + if (exitCode != 0) sys.error("Failed to install custom Sphinx pygments styles: exit code " + exitCode) (pygments * ("*.egg-info" | "build" | "temp")).get.foreach(IO.delete) s.log.info("Sphinx pygments styles installed at: " + target) }