Merge branch 'master' into wip-2103-cluster-routers-patriknw
Conflicts: akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala
This commit is contained in:
commit
018a949678
45 changed files with 428 additions and 421 deletions
|
|
@ -102,8 +102,6 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
|
|||
}
|
||||
|
||||
"be able to send their routees" in {
|
||||
val doneLatch = new TestLatch(1)
|
||||
|
||||
class TheActor extends Actor {
|
||||
val routee1 = context.actorOf(Props[TestActor], "routee1")
|
||||
val routee2 = context.actorOf(Props[TestActor], "routee2")
|
||||
|
|
@ -114,19 +112,18 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
|
|||
within = 5 seconds)))
|
||||
|
||||
def receive = {
|
||||
case RouterRoutees(iterable) ⇒
|
||||
iterable.exists(_.path.name == "routee1") must be(true)
|
||||
iterable.exists(_.path.name == "routee2") must be(true)
|
||||
iterable.exists(_.path.name == "routee3") must be(true)
|
||||
doneLatch.countDown()
|
||||
case "doIt" ⇒
|
||||
router ! CurrentRoutees
|
||||
case "doIt" ⇒ router ! CurrentRoutees
|
||||
case routees: RouterRoutees ⇒ testActor forward routees
|
||||
}
|
||||
}
|
||||
|
||||
val theActor = system.actorOf(Props(new TheActor), "theActor")
|
||||
theActor ! "doIt"
|
||||
Await.ready(doneLatch, remaining)
|
||||
val routees = expectMsgPF() {
|
||||
case RouterRoutees(routees) ⇒ routees.toSet
|
||||
}
|
||||
|
||||
routees.map(_.path.name) must be(Set("routee1", "routee2", "routee3"))
|
||||
}
|
||||
|
||||
"use configured nr-of-instances when FromConfig" in {
|
||||
|
|
@ -226,14 +223,9 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
|
|||
}
|
||||
|
||||
"send message to connection" in {
|
||||
val doneLatch = new TestLatch(1)
|
||||
|
||||
val counter = new AtomicInteger(0)
|
||||
|
||||
class Actor1 extends Actor {
|
||||
def receive = {
|
||||
case "end" ⇒ doneLatch.countDown()
|
||||
case _ ⇒ counter.incrementAndGet
|
||||
case msg ⇒ testActor forward msg
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -241,9 +233,8 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
|
|||
routedActor ! "hello"
|
||||
routedActor ! "end"
|
||||
|
||||
Await.ready(doneLatch, remaining)
|
||||
|
||||
counter.get must be(1)
|
||||
expectMsg("hello")
|
||||
expectMsg("end")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -597,7 +597,7 @@ trait FSM[S, D] extends Listeners with ActorLogging {
|
|||
val timeout = if (currentState.timeout.isDefined) currentState.timeout else stateTimeouts(currentState.stateName)
|
||||
if (timeout.isDefined) {
|
||||
val t = timeout.get
|
||||
if (t.finite_? && t.length >= 0) {
|
||||
if (t.isFinite && t.length >= 0) {
|
||||
import context.dispatcher
|
||||
timeoutFuture = Some(context.system.scheduler.scheduleOnce(t, self, TimeoutMarker(generation)))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -63,6 +63,12 @@ akka {
|
|||
|
||||
failure-detector {
|
||||
|
||||
# FQCN of the failure detector implementation.
|
||||
# It must implement akka.cluster.akka.cluster and
|
||||
# have constructor with akka.actor.ActorSystem and
|
||||
# akka.cluster.ClusterSettings parameters
|
||||
implementation-class = "akka.cluster.AccrualFailureDetector"
|
||||
|
||||
# defines the failure detector threshold
|
||||
# A low threshold is prone to generate many wrong suspicions but ensures
|
||||
# a quick detection in the event of a real crash. Conversely, a high
|
||||
|
|
@ -84,8 +90,6 @@ akka {
|
|||
# network drop.
|
||||
acceptable-heartbeat-pause = 3s
|
||||
|
||||
implementation-class = "akka.cluster.AccrualFailureDetector"
|
||||
|
||||
max-sample-size = 1000
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -40,19 +40,7 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
|
|||
|
||||
override def lookup = Cluster
|
||||
|
||||
override def createExtension(system: ExtendedActorSystem): Cluster = {
|
||||
val clusterSettings = new ClusterSettings(system.settings.config, system.name)
|
||||
|
||||
val failureDetector = {
|
||||
import clusterSettings.{ FailureDetectorImplementationClass ⇒ fqcn }
|
||||
system.dynamicAccess.createInstanceFor[FailureDetector](
|
||||
fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> clusterSettings)).recover({
|
||||
case e ⇒ throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString)
|
||||
}).get
|
||||
}
|
||||
|
||||
new Cluster(system, failureDetector)
|
||||
}
|
||||
override def createExtension(system: ExtendedActorSystem): Cluster = new Cluster(system)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -69,7 +57,7 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
|
|||
* if (Cluster(system).isLeader) { ... }
|
||||
* }}}
|
||||
*/
|
||||
class Cluster(val system: ExtendedActorSystem, val failureDetector: FailureDetector) extends Extension with ClusterEnvironment {
|
||||
class Cluster(val system: ExtendedActorSystem) extends Extension {
|
||||
|
||||
import ClusterEvent._
|
||||
|
||||
|
|
@ -86,6 +74,14 @@ class Cluster(val system: ExtendedActorSystem, val failureDetector: FailureDetec
|
|||
|
||||
log.info("Cluster Node [{}] - is starting up...", selfAddress)
|
||||
|
||||
val failureDetector = {
|
||||
import settings.{ FailureDetectorImplementationClass ⇒ fqcn }
|
||||
system.dynamicAccess.createInstanceFor[FailureDetector](
|
||||
fqcn, Seq(classOf[ActorSystem] -> system, classOf[ClusterSettings] -> settings)).recover({
|
||||
case e ⇒ throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString)
|
||||
}).get
|
||||
}
|
||||
|
||||
// ========================================================
|
||||
// ===================== WORK DAEMONS =====================
|
||||
// ========================================================
|
||||
|
|
@ -140,7 +136,7 @@ class Cluster(val system: ExtendedActorSystem, val failureDetector: FailureDetec
|
|||
|
||||
// create supervisor for daemons under path "/system/cluster"
|
||||
private val clusterDaemons: ActorRef = {
|
||||
system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new ClusterDaemon(this)).
|
||||
system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new ClusterDaemon(settings)).
|
||||
withDispatcher(UseDispatcher), name = "cluster")
|
||||
}
|
||||
|
||||
|
|
@ -215,10 +211,12 @@ class Cluster(val system: ExtendedActorSystem, val failureDetector: FailureDetec
|
|||
// ========================================================
|
||||
|
||||
/**
|
||||
* Make it possible to override/configure seedNodes from tests without
|
||||
* specifying in config. Addresses are unknown before startup time.
|
||||
* Make it possible to join the specified seed nodes without defining them
|
||||
* in config. Especially useful from tests when Addresses are unknown
|
||||
* before startup time.
|
||||
*/
|
||||
private[cluster] def seedNodes: IndexedSeq[Address] = SeedNodes
|
||||
private[cluster] def joinSeedNodes(seedNodes: IndexedSeq[Address]): Unit =
|
||||
clusterCore ! InternalClusterAction.JoinSeedNodes(seedNodes)
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
|
|
|
|||
|
|
@ -58,6 +58,12 @@ private[cluster] object InternalClusterAction {
|
|||
*/
|
||||
case class JoinTo(address: Address) extends ClusterMessage
|
||||
|
||||
/**
|
||||
* Command to initiate the process to join the specified
|
||||
* seed nodes.
|
||||
*/
|
||||
case class JoinSeedNodes(seedNodes: IndexedSeq[Address])
|
||||
|
||||
/**
|
||||
* Start message of the process to join one of the seed nodes.
|
||||
* The node sends `InitJoin` to all seed nodes, which replies
|
||||
|
|
@ -128,33 +134,21 @@ private[cluster] object ClusterLeaderAction {
|
|||
case class Remove(address: Address) extends ClusterMessage
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
* The contextual pieces that ClusterDaemon actors need.
|
||||
* Makes it easier to test the actors without using the Cluster extension.
|
||||
*/
|
||||
private[cluster] trait ClusterEnvironment {
|
||||
private[cluster] def settings: ClusterSettings
|
||||
private[cluster] def failureDetector: FailureDetector
|
||||
private[cluster] def selfAddress: Address
|
||||
private[cluster] def scheduler: Scheduler
|
||||
private[cluster] def seedNodes: IndexedSeq[Address]
|
||||
private[cluster] def shutdown(): Unit
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*
|
||||
* Supervisor managing the different Cluster daemons.
|
||||
*/
|
||||
private[cluster] final class ClusterDaemon(environment: ClusterEnvironment) extends Actor with ActorLogging {
|
||||
private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Actor with ActorLogging {
|
||||
|
||||
val configuredDispatcher = environment.settings.UseDispatcher
|
||||
val core = context.actorOf(Props(new ClusterCoreDaemon(environment)).
|
||||
withDispatcher(configuredDispatcher), name = "core")
|
||||
val heartbeat = context.actorOf(Props(new ClusterHeartbeatDaemon(environment)).
|
||||
withDispatcher(configuredDispatcher), name = "heartbeat")
|
||||
// Important - don't use Cluster(context.system) here because that would
|
||||
// cause deadlock. The Cluster extension is currently being created and is waiting
|
||||
// for response from GetClusterCoreRef in its constructor.
|
||||
|
||||
val core = context.actorOf(Props[ClusterCoreDaemon].
|
||||
withDispatcher(context.props.dispatcher), name = "core")
|
||||
val heartbeat = context.actorOf(Props[ClusterHeartbeatDaemon].
|
||||
withDispatcher(context.props.dispatcher), name = "heartbeat")
|
||||
|
||||
def receive = {
|
||||
case InternalClusterAction.GetClusterCoreRef ⇒ sender ! core
|
||||
|
|
@ -165,16 +159,14 @@ private[cluster] final class ClusterDaemon(environment: ClusterEnvironment) exte
|
|||
/**
|
||||
* INTERNAL API.
|
||||
*/
|
||||
private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) extends Actor with ActorLogging {
|
||||
private[cluster] final class ClusterCoreDaemon extends Actor with ActorLogging {
|
||||
import ClusterLeaderAction._
|
||||
import InternalClusterAction._
|
||||
import ClusterHeartbeatSender._
|
||||
|
||||
def selfAddress = environment.selfAddress
|
||||
def clusterScheduler = environment.scheduler
|
||||
def failureDetector = environment.failureDetector
|
||||
val settings = environment.settings
|
||||
import settings._
|
||||
val cluster = Cluster(context.system)
|
||||
import cluster.{ selfAddress, scheduler, failureDetector }
|
||||
import cluster.settings._
|
||||
|
||||
val vclockNode = VectorClock.Node(selfAddress.toString)
|
||||
val selfHeartbeat = Heartbeat(selfAddress)
|
||||
|
|
@ -186,55 +178,48 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
|
|||
|
||||
var stats = ClusterStats()
|
||||
|
||||
val heartbeatSender = context.actorOf(Props(new ClusterHeartbeatSender(environment)).
|
||||
val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender].
|
||||
withDispatcher(UseDispatcher), name = "heartbeatSender")
|
||||
val coreSender = context.actorOf(Props(new ClusterCoreSender(selfAddress)).
|
||||
val coreSender = context.actorOf(Props[ClusterCoreSender].
|
||||
withDispatcher(UseDispatcher), name = "coreSender")
|
||||
val publisher = context.actorOf(Props(new ClusterDomainEventPublisher(environment)).
|
||||
val publisher = context.actorOf(Props[ClusterDomainEventPublisher].
|
||||
withDispatcher(UseDispatcher), name = "publisher")
|
||||
|
||||
import context.dispatcher
|
||||
|
||||
// start periodic gossip to random nodes in cluster
|
||||
val gossipTask =
|
||||
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(GossipInterval), GossipInterval) {
|
||||
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(GossipInterval), GossipInterval) {
|
||||
self ! GossipTick
|
||||
}
|
||||
|
||||
// start periodic heartbeat to all nodes in cluster
|
||||
val heartbeatTask =
|
||||
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval), HeartbeatInterval) {
|
||||
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(HeartbeatInterval), HeartbeatInterval) {
|
||||
self ! HeartbeatTick
|
||||
}
|
||||
|
||||
// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
|
||||
val failureDetectorReaperTask =
|
||||
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval), UnreachableNodesReaperInterval) {
|
||||
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(UnreachableNodesReaperInterval), UnreachableNodesReaperInterval) {
|
||||
self ! ReapUnreachableTick
|
||||
}
|
||||
|
||||
// start periodic leader action management (only applies for the current leader)
|
||||
private val leaderActionsTask =
|
||||
FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval), LeaderActionsInterval) {
|
||||
FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(LeaderActionsInterval), LeaderActionsInterval) {
|
||||
self ! LeaderActionsTick
|
||||
}
|
||||
|
||||
// start periodic publish of current state
|
||||
private val publishStateTask: Option[Cancellable] =
|
||||
if (PublishStatsInterval == Duration.Zero) None
|
||||
else Some(FixedRateTask(clusterScheduler, PeriodicTasksInitialDelay.max(PublishStatsInterval), PublishStatsInterval) {
|
||||
else Some(FixedRateTask(scheduler, PeriodicTasksInitialDelay.max(PublishStatsInterval), PublishStatsInterval) {
|
||||
self ! PublishStatsTick
|
||||
})
|
||||
|
||||
override def preStart(): Unit = {
|
||||
if (AutoJoin) {
|
||||
// only the node which is named first in the list of seed nodes will join itself
|
||||
if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress)
|
||||
self ! JoinTo(selfAddress)
|
||||
else
|
||||
context.actorOf(Props(new JoinSeedNodeProcess(environment)).
|
||||
withDispatcher(UseDispatcher), name = "joinSeedNodeProcess")
|
||||
}
|
||||
if (AutoJoin) self ! JoinSeedNodes(SeedNodes)
|
||||
}
|
||||
|
||||
override def postStop(): Unit = {
|
||||
|
|
@ -248,6 +233,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
|
|||
def uninitialized: Actor.Receive = {
|
||||
case InitJoin ⇒ // skip, not ready yet
|
||||
case JoinTo(address) ⇒ join(address)
|
||||
case JoinSeedNodes(seedNodes) ⇒ joinSeedNodes(seedNodes)
|
||||
case msg: SubscriptionMessage ⇒ publisher forward msg
|
||||
case _: Tick ⇒ // ignore periodic tasks until initialized
|
||||
}
|
||||
|
|
@ -282,6 +268,15 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
|
|||
|
||||
def initJoin(): Unit = sender ! InitJoinAck(selfAddress)
|
||||
|
||||
def joinSeedNodes(seedNodes: IndexedSeq[Address]): Unit = {
|
||||
// only the node which is named first in the list of seed nodes will join itself
|
||||
if (seedNodes.isEmpty || seedNodes.head == selfAddress)
|
||||
self ! JoinTo(selfAddress)
|
||||
else
|
||||
context.actorOf(Props(new JoinSeedNodeProcess(seedNodes)).
|
||||
withDispatcher(UseDispatcher), name = "joinSeedNodeProcess")
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to join this cluster node with the node specified by 'address'.
|
||||
* A 'Join(thisNodeAddress)' command is sent to the node to join.
|
||||
|
|
@ -393,7 +388,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
|
|||
// make sure the final (removed) state is published
|
||||
// before shutting down
|
||||
implicit val timeout = Timeout(5 seconds)
|
||||
publisher ? PublishDone onComplete { case _ ⇒ environment.shutdown() }
|
||||
publisher ? PublishDone onComplete { case _ ⇒ cluster.shutdown() }
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -796,8 +791,6 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
|
|||
}
|
||||
}
|
||||
|
||||
def seedNodes: IndexedSeq[Address] = environment.seedNodes
|
||||
|
||||
def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] =
|
||||
if (addresses.isEmpty) None
|
||||
else Some(addresses(ThreadLocalRandom.current nextInt addresses.size))
|
||||
|
|
@ -865,22 +858,22 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment)
|
|||
* 5. seed3 retries the join procedure and gets acks from seed2 first, and then joins to seed2
|
||||
*
|
||||
*/
|
||||
private[cluster] final class JoinSeedNodeProcess(environment: ClusterEnvironment) extends Actor with ActorLogging {
|
||||
private[cluster] final class JoinSeedNodeProcess(seedNodes: IndexedSeq[Address]) extends Actor with ActorLogging {
|
||||
import InternalClusterAction._
|
||||
|
||||
def selfAddress = environment.selfAddress
|
||||
def selfAddress = Cluster(context.system).selfAddress
|
||||
|
||||
if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress)
|
||||
if (seedNodes.isEmpty || seedNodes.head == selfAddress)
|
||||
throw new IllegalArgumentException("Join seed node should not be done")
|
||||
|
||||
context.setReceiveTimeout(environment.settings.SeedNodeTimeout)
|
||||
context.setReceiveTimeout(Cluster(context.system).settings.SeedNodeTimeout)
|
||||
|
||||
override def preStart(): Unit = self ! JoinSeedNode
|
||||
|
||||
def receive = {
|
||||
case JoinSeedNode ⇒
|
||||
// send InitJoin to all seed nodes (except myself)
|
||||
environment.seedNodes.collect {
|
||||
seedNodes.collect {
|
||||
case a if a != selfAddress ⇒ context.system.actorFor(context.parent.path.toStringWithAddress(a))
|
||||
} foreach { _ ! InitJoin }
|
||||
case InitJoinAck(address) ⇒
|
||||
|
|
@ -901,9 +894,11 @@ private[cluster] final class JoinSeedNodeProcess(environment: ClusterEnvironment
|
|||
/**
|
||||
* INTERNAL API.
|
||||
*/
|
||||
private[cluster] final class ClusterCoreSender(selfAddress: Address) extends Actor with ActorLogging {
|
||||
private[cluster] final class ClusterCoreSender extends Actor with ActorLogging {
|
||||
import InternalClusterAction._
|
||||
|
||||
val selfAddress = Cluster(context.system).selfAddress
|
||||
|
||||
/**
|
||||
* Looks up and returns the remote cluster command connection for the specific address.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -169,7 +169,7 @@ object ClusterEvent {
|
|||
* Responsible for domain event subscriptions and publishing of
|
||||
* domain events to event bus.
|
||||
*/
|
||||
private[cluster] final class ClusterDomainEventPublisher(environment: ClusterEnvironment) extends Actor with ActorLogging {
|
||||
private[cluster] final class ClusterDomainEventPublisher extends Actor with ActorLogging {
|
||||
import InternalClusterAction._
|
||||
|
||||
var latestGossip: Gossip = Gossip()
|
||||
|
|
|
|||
|
|
@ -23,10 +23,12 @@ case class Heartbeat(from: Address) extends ClusterMessage
|
|||
* Instantiated as a single instance for each Cluster - e.g. heartbeats are serialized
|
||||
* to Cluster message after message, but concurrent with other types of messages.
|
||||
*/
|
||||
private[cluster] final class ClusterHeartbeatDaemon(environment: ClusterEnvironment) extends Actor with ActorLogging {
|
||||
private[cluster] final class ClusterHeartbeatDaemon extends Actor with ActorLogging {
|
||||
|
||||
val failureDetector = Cluster(context.system).failureDetector
|
||||
|
||||
def receive = {
|
||||
case Heartbeat(from) ⇒ environment.failureDetector heartbeat from
|
||||
case Heartbeat(from) ⇒ failureDetector heartbeat from
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -53,7 +55,7 @@ private[cluster] object ClusterHeartbeatSender {
|
|||
* address and thereby reduce the risk of irregular heartbeats to healty
|
||||
* nodes due to broken connections to other nodes.
|
||||
*/
|
||||
private[cluster] final class ClusterHeartbeatSender(environment: ClusterEnvironment) extends Actor with ActorLogging {
|
||||
private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogging {
|
||||
import ClusterHeartbeatSender._
|
||||
|
||||
/**
|
||||
|
|
@ -78,8 +80,7 @@ private[cluster] final class ClusterHeartbeatSender(environment: ClusterEnvironm
|
|||
val workerName = encodeChildName(to.toString)
|
||||
val worker = context.actorFor(workerName) match {
|
||||
case notFound if notFound.isTerminated ⇒
|
||||
context.actorOf(Props(new ClusterHeartbeatSenderWorker(
|
||||
environment.settings.SendCircuitBreakerSettings, clusterHeartbeatConnectionFor(to))), workerName)
|
||||
context.actorOf(Props(new ClusterHeartbeatSenderWorker(clusterHeartbeatConnectionFor(to))), workerName)
|
||||
case child ⇒ child
|
||||
}
|
||||
worker ! msg
|
||||
|
|
@ -96,17 +97,19 @@ private[cluster] final class ClusterHeartbeatSender(environment: ClusterEnvironm
|
|||
*
|
||||
* @see ClusterHeartbeatSender
|
||||
*/
|
||||
private[cluster] final class ClusterHeartbeatSenderWorker(
|
||||
cbSettings: CircuitBreakerSettings, toRef: ActorRef)
|
||||
private[cluster] final class ClusterHeartbeatSenderWorker(toRef: ActorRef)
|
||||
extends Actor with ActorLogging {
|
||||
|
||||
import ClusterHeartbeatSender._
|
||||
|
||||
val breaker = CircuitBreaker(context.system.scheduler,
|
||||
cbSettings.maxFailures, cbSettings.callTimeout, cbSettings.resetTimeout).
|
||||
onHalfOpen(log.debug("CircuitBreaker Half-Open for: [{}]", toRef)).
|
||||
onOpen(log.debug("CircuitBreaker Open for [{}]", toRef)).
|
||||
onClose(log.debug("CircuitBreaker Closed for [{}]", toRef))
|
||||
val breaker = {
|
||||
val cbSettings = Cluster(context.system).settings.SendCircuitBreakerSettings
|
||||
CircuitBreaker(context.system.scheduler,
|
||||
cbSettings.maxFailures, cbSettings.callTimeout, cbSettings.resetTimeout).
|
||||
onHalfOpen(log.debug("CircuitBreaker Half-Open for: [{}]", toRef)).
|
||||
onOpen(log.debug("CircuitBreaker Open for [{}]", toRef)).
|
||||
onClose(log.debug("CircuitBreaker Closed for [{}]", toRef))
|
||||
}
|
||||
|
||||
// make sure it will cleanup when not used any more
|
||||
context.setReceiveTimeout(30 seconds)
|
||||
|
|
|
|||
|
|
@ -9,30 +9,32 @@ import akka.remote.testkit.MultiNodeSpec
|
|||
import akka.testkit._
|
||||
import akka.actor.Address
|
||||
|
||||
object ClientDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig {
|
||||
case class ClientDowningNodeThatIsUnreachableMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
val third = role("third")
|
||||
val fourth = role("fourth")
|
||||
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet)))
|
||||
}
|
||||
|
||||
class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy
|
||||
class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy
|
||||
class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy
|
||||
class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy
|
||||
class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true)
|
||||
class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true)
|
||||
class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true)
|
||||
class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true)
|
||||
|
||||
class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy
|
||||
class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy
|
||||
class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy
|
||||
class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy
|
||||
class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false)
|
||||
class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false)
|
||||
class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false)
|
||||
class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false)
|
||||
|
||||
abstract class ClientDowningNodeThatIsUnreachableSpec
|
||||
extends MultiNodeSpec(ClientDowningNodeThatIsUnreachableMultiJvmSpec)
|
||||
with MultiNodeClusterSpec with FailureDetectorStrategy {
|
||||
abstract class ClientDowningNodeThatIsUnreachableSpec(multiNodeConfig: ClientDowningNodeThatIsUnreachableMultiNodeConfig)
|
||||
extends MultiNodeSpec(multiNodeConfig)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import ClientDowningNodeThatIsUnreachableMultiJvmSpec._
|
||||
def this(failureDetectorPuppet: Boolean) = this(ClientDowningNodeThatIsUnreachableMultiNodeConfig(failureDetectorPuppet))
|
||||
|
||||
import multiNodeConfig._
|
||||
|
||||
"Client of a 4 node cluster" must {
|
||||
|
||||
|
|
|
|||
|
|
@ -9,30 +9,32 @@ import akka.remote.testkit.MultiNodeSpec
|
|||
import akka.testkit._
|
||||
import akka.actor.Address
|
||||
|
||||
object ClientDowningNodeThatIsUpMultiJvmSpec extends MultiNodeConfig {
|
||||
case class ClientDowningNodeThatIsUpMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
val third = role("third")
|
||||
val fourth = role("fourth")
|
||||
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet)))
|
||||
}
|
||||
|
||||
class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy
|
||||
class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy
|
||||
class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy
|
||||
class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy
|
||||
class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = true)
|
||||
class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = true)
|
||||
class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = true)
|
||||
class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = true)
|
||||
|
||||
class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy
|
||||
class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy
|
||||
class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy
|
||||
class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy
|
||||
class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = false)
|
||||
class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = false)
|
||||
class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = false)
|
||||
class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec(failureDetectorPuppet = false)
|
||||
|
||||
abstract class ClientDowningNodeThatIsUpSpec
|
||||
extends MultiNodeSpec(ClientDowningNodeThatIsUpMultiJvmSpec)
|
||||
with MultiNodeClusterSpec with FailureDetectorStrategy {
|
||||
abstract class ClientDowningNodeThatIsUpSpec(multiNodeConfig: ClientDowningNodeThatIsUpMultiNodeConfig)
|
||||
extends MultiNodeSpec(multiNodeConfig)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import ClientDowningNodeThatIsUpMultiJvmSpec._
|
||||
def this(failureDetectorPuppet: Boolean) = this(ClientDowningNodeThatIsUpMultiNodeConfig(failureDetectorPuppet))
|
||||
|
||||
import multiNodeConfig._
|
||||
|
||||
"Client of a 4 node cluster" must {
|
||||
|
||||
|
|
|
|||
|
|
@ -19,9 +19,9 @@ object ClusterAccrualFailureDetectorMultiJvmSpec extends MultiNodeConfig {
|
|||
withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
}
|
||||
|
||||
class ClusterAccrualFailureDetectorMultiJvmNode1 extends ClusterAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy
|
||||
class ClusterAccrualFailureDetectorMultiJvmNode2 extends ClusterAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy
|
||||
class ClusterAccrualFailureDetectorMultiJvmNode3 extends ClusterAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy
|
||||
class ClusterAccrualFailureDetectorMultiJvmNode1 extends ClusterAccrualFailureDetectorSpec
|
||||
class ClusterAccrualFailureDetectorMultiJvmNode2 extends ClusterAccrualFailureDetectorSpec
|
||||
class ClusterAccrualFailureDetectorMultiJvmNode3 extends ClusterAccrualFailureDetectorSpec
|
||||
|
||||
abstract class ClusterAccrualFailureDetectorSpec
|
||||
extends MultiNodeSpec(ClusterAccrualFailureDetectorMultiJvmSpec)
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ import akka.testkit._
|
|||
import scala.concurrent.util.duration._
|
||||
import akka.actor.Address
|
||||
|
||||
object ConvergenceMultiJvmSpec extends MultiNodeConfig {
|
||||
case class ConvergenceMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
val third = role("third")
|
||||
|
|
@ -20,24 +20,26 @@ object ConvergenceMultiJvmSpec extends MultiNodeConfig {
|
|||
|
||||
commonConfig(debugConfig(on = false).
|
||||
withFallback(ConfigFactory.parseString("akka.cluster.failure-detector.threshold = 4")).
|
||||
withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet)))
|
||||
}
|
||||
|
||||
class ConvergenceWithFailureDetectorPuppetMultiJvmNode1 extends ConvergenceSpec with FailureDetectorPuppetStrategy
|
||||
class ConvergenceWithFailureDetectorPuppetMultiJvmNode2 extends ConvergenceSpec with FailureDetectorPuppetStrategy
|
||||
class ConvergenceWithFailureDetectorPuppetMultiJvmNode3 extends ConvergenceSpec with FailureDetectorPuppetStrategy
|
||||
class ConvergenceWithFailureDetectorPuppetMultiJvmNode4 extends ConvergenceSpec with FailureDetectorPuppetStrategy
|
||||
class ConvergenceWithFailureDetectorPuppetMultiJvmNode1 extends ConvergenceSpec(failureDetectorPuppet = true)
|
||||
class ConvergenceWithFailureDetectorPuppetMultiJvmNode2 extends ConvergenceSpec(failureDetectorPuppet = true)
|
||||
class ConvergenceWithFailureDetectorPuppetMultiJvmNode3 extends ConvergenceSpec(failureDetectorPuppet = true)
|
||||
class ConvergenceWithFailureDetectorPuppetMultiJvmNode4 extends ConvergenceSpec(failureDetectorPuppet = true)
|
||||
|
||||
class ConvergenceWithAccrualFailureDetectorMultiJvmNode1 extends ConvergenceSpec with AccrualFailureDetectorStrategy
|
||||
class ConvergenceWithAccrualFailureDetectorMultiJvmNode2 extends ConvergenceSpec with AccrualFailureDetectorStrategy
|
||||
class ConvergenceWithAccrualFailureDetectorMultiJvmNode3 extends ConvergenceSpec with AccrualFailureDetectorStrategy
|
||||
class ConvergenceWithAccrualFailureDetectorMultiJvmNode4 extends ConvergenceSpec with AccrualFailureDetectorStrategy
|
||||
class ConvergenceWithAccrualFailureDetectorMultiJvmNode1 extends ConvergenceSpec(failureDetectorPuppet = false)
|
||||
class ConvergenceWithAccrualFailureDetectorMultiJvmNode2 extends ConvergenceSpec(failureDetectorPuppet = false)
|
||||
class ConvergenceWithAccrualFailureDetectorMultiJvmNode3 extends ConvergenceSpec(failureDetectorPuppet = false)
|
||||
class ConvergenceWithAccrualFailureDetectorMultiJvmNode4 extends ConvergenceSpec(failureDetectorPuppet = false)
|
||||
|
||||
abstract class ConvergenceSpec
|
||||
extends MultiNodeSpec(ConvergenceMultiJvmSpec)
|
||||
with MultiNodeClusterSpec with FailureDetectorStrategy {
|
||||
abstract class ConvergenceSpec(multiNodeConfig: ConvergenceMultiNodeConfig)
|
||||
extends MultiNodeSpec(multiNodeConfig)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import ConvergenceMultiJvmSpec._
|
||||
def this(failureDetectorPuppet: Boolean) = this(ConvergenceMultiNodeConfig(failureDetectorPuppet))
|
||||
|
||||
import multiNodeConfig._
|
||||
|
||||
"A cluster of 3 members" must {
|
||||
|
||||
|
|
|
|||
|
|
@ -1,61 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.cluster
|
||||
|
||||
import akka.actor.Address
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
|
||||
/**
|
||||
* Base trait for all failure detector strategies.
|
||||
*/
|
||||
trait FailureDetectorStrategy {
|
||||
|
||||
/**
|
||||
* Get or create the FailureDetector to be used in the cluster node.
|
||||
* To be defined by subclass.
|
||||
*/
|
||||
def failureDetector: FailureDetector
|
||||
|
||||
/**
|
||||
* Marks a node as available in the failure detector.
|
||||
* To be defined by subclass.
|
||||
*/
|
||||
def markNodeAsAvailable(address: Address): Unit
|
||||
|
||||
/**
|
||||
* Marks a node as unavailable in the failure detector.
|
||||
* To be defined by subclass.
|
||||
*/
|
||||
def markNodeAsUnavailable(address: Address): Unit
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a FailureDetectorPuppet-based FailureDetectorStrategy.
|
||||
*/
|
||||
trait FailureDetectorPuppetStrategy extends FailureDetectorStrategy { self: MultiNodeSpec ⇒
|
||||
|
||||
/**
|
||||
* The puppet instance. Separated from 'failureDetector' field so we don't have to cast when using the puppet specific methods.
|
||||
*/
|
||||
private val puppet = new FailureDetectorPuppet(system)
|
||||
|
||||
override def failureDetector: FailureDetector = puppet
|
||||
|
||||
override def markNodeAsAvailable(address: Address): Unit = puppet markNodeAsAvailable address
|
||||
|
||||
override def markNodeAsUnavailable(address: Address): Unit = puppet markNodeAsUnavailable address
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a AccrualFailureDetector-based FailureDetectorStrategy.
|
||||
*/
|
||||
trait AccrualFailureDetectorStrategy extends FailureDetectorStrategy { self: MultiNodeSpec ⇒
|
||||
|
||||
override val failureDetector: FailureDetector = new AccrualFailureDetector(system, new ClusterSettings(system.settings.config, system.name))
|
||||
|
||||
override def markNodeAsAvailable(address: Address): Unit = ()
|
||||
|
||||
override def markNodeAsUnavailable(address: Address): Unit = ()
|
||||
}
|
||||
|
|
@ -9,6 +9,7 @@ import akka.remote.testkit.MultiNodeConfig
|
|||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
import scala.concurrent.util.duration._
|
||||
import akka.actor.Address
|
||||
|
||||
object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig {
|
||||
val seed1 = role("seed1")
|
||||
|
|
@ -18,15 +19,15 @@ object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig {
|
|||
val ordinary2 = role("ordinary2")
|
||||
|
||||
commonConfig(debugConfig(on = false).
|
||||
withFallback(ConfigFactory.parseString("akka.cluster.auto-join = on")).
|
||||
withFallback(ConfigFactory.parseString("akka.cluster.auto-join = off")).
|
||||
withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
}
|
||||
|
||||
class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
|
||||
class JoinSeedNodeMultiJvmNode2 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
|
||||
class JoinSeedNodeMultiJvmNode3 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
|
||||
class JoinSeedNodeMultiJvmNode4 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
|
||||
class JoinSeedNodeMultiJvmNode5 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
|
||||
class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec
|
||||
class JoinSeedNodeMultiJvmNode2 extends JoinSeedNodeSpec
|
||||
class JoinSeedNodeMultiJvmNode3 extends JoinSeedNodeSpec
|
||||
class JoinSeedNodeMultiJvmNode4 extends JoinSeedNodeSpec
|
||||
class JoinSeedNodeMultiJvmNode5 extends JoinSeedNodeSpec
|
||||
|
||||
abstract class JoinSeedNodeSpec
|
||||
extends MultiNodeSpec(JoinSeedNodeMultiJvmSpec)
|
||||
|
|
@ -34,9 +35,9 @@ abstract class JoinSeedNodeSpec
|
|||
|
||||
import JoinSeedNodeMultiJvmSpec._
|
||||
|
||||
override def seedNodes = IndexedSeq(seed1, seed2, seed3)
|
||||
def seedNodes: IndexedSeq[Address] = IndexedSeq(seed1, seed2, seed3)
|
||||
|
||||
"A cluster with configured seed nodes" must {
|
||||
"A cluster with seed nodes" must {
|
||||
"be able to start the seed nodes concurrently" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(seed1) {
|
||||
|
|
@ -45,12 +46,16 @@ abstract class JoinSeedNodeSpec
|
|||
}
|
||||
|
||||
runOn(seed1, seed2, seed3) {
|
||||
cluster.joinSeedNodes(seedNodes)
|
||||
awaitUpConvergence(3)
|
||||
}
|
||||
enterBarrier("after-1")
|
||||
}
|
||||
|
||||
"join the seed nodes at startup" taggedAs LongRunningTest in {
|
||||
"join the seed nodes" taggedAs LongRunningTest in {
|
||||
runOn(ordinary1, ordinary2) {
|
||||
cluster.joinSeedNodes(seedNodes)
|
||||
}
|
||||
awaitUpConvergence(roles.size)
|
||||
enterBarrier("after-2")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,15 +17,15 @@ object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig {
|
|||
val c1 = role("c1")
|
||||
val c2 = role("c2")
|
||||
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
|
||||
}
|
||||
|
||||
class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy
|
||||
class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy
|
||||
class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy
|
||||
class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy
|
||||
class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy
|
||||
class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy
|
||||
class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec
|
||||
class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec
|
||||
class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec
|
||||
class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec
|
||||
class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec
|
||||
class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec
|
||||
|
||||
abstract class JoinTwoClustersSpec
|
||||
extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec)
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ import akka.testkit._
|
|||
import akka.actor._
|
||||
import scala.concurrent.util.duration._
|
||||
|
||||
object LeaderDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig {
|
||||
case class LeaderDowningNodeThatIsUnreachableMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
val third = role("third")
|
||||
|
|
@ -20,24 +20,26 @@ object LeaderDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig {
|
|||
|
||||
commonConfig(debugConfig(on = false).
|
||||
withFallback(ConfigFactory.parseString("akka.cluster.auto-down = on")).
|
||||
withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet)))
|
||||
}
|
||||
|
||||
class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy
|
||||
class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy
|
||||
class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy
|
||||
class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy
|
||||
class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true)
|
||||
class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true)
|
||||
class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true)
|
||||
class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = true)
|
||||
|
||||
class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy
|
||||
class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy
|
||||
class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy
|
||||
class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy
|
||||
class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false)
|
||||
class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false)
|
||||
class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false)
|
||||
class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec(failureDetectorPuppet = false)
|
||||
|
||||
abstract class LeaderDowningNodeThatIsUnreachableSpec
|
||||
extends MultiNodeSpec(LeaderDowningNodeThatIsUnreachableMultiJvmSpec)
|
||||
with MultiNodeClusterSpec with FailureDetectorStrategy {
|
||||
abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDowningNodeThatIsUnreachableMultiNodeConfig)
|
||||
extends MultiNodeSpec(multiNodeConfig)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import LeaderDowningNodeThatIsUnreachableMultiJvmSpec._
|
||||
def this(failureDetectorPuppet: Boolean) = this(LeaderDowningNodeThatIsUnreachableMultiNodeConfig(failureDetectorPuppet))
|
||||
|
||||
import multiNodeConfig._
|
||||
|
||||
"The Leader in a 4 node cluster" must {
|
||||
|
||||
|
|
|
|||
|
|
@ -9,33 +9,35 @@ import akka.remote.testkit.MultiNodeConfig
|
|||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
|
||||
object LeaderElectionMultiJvmSpec extends MultiNodeConfig {
|
||||
case class LeaderElectionMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig {
|
||||
val controller = role("controller")
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
val third = role("third")
|
||||
val fourth = role("fourth")
|
||||
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet)))
|
||||
}
|
||||
|
||||
class LeaderElectionWithFailureDetectorPuppetMultiJvmNode1 extends LeaderElectionSpec with FailureDetectorPuppetStrategy
|
||||
class LeaderElectionWithFailureDetectorPuppetMultiJvmNode2 extends LeaderElectionSpec with FailureDetectorPuppetStrategy
|
||||
class LeaderElectionWithFailureDetectorPuppetMultiJvmNode3 extends LeaderElectionSpec with FailureDetectorPuppetStrategy
|
||||
class LeaderElectionWithFailureDetectorPuppetMultiJvmNode4 extends LeaderElectionSpec with FailureDetectorPuppetStrategy
|
||||
class LeaderElectionWithFailureDetectorPuppetMultiJvmNode5 extends LeaderElectionSpec with FailureDetectorPuppetStrategy
|
||||
class LeaderElectionWithFailureDetectorPuppetMultiJvmNode1 extends LeaderElectionSpec(failureDetectorPuppet = true)
|
||||
class LeaderElectionWithFailureDetectorPuppetMultiJvmNode2 extends LeaderElectionSpec(failureDetectorPuppet = true)
|
||||
class LeaderElectionWithFailureDetectorPuppetMultiJvmNode3 extends LeaderElectionSpec(failureDetectorPuppet = true)
|
||||
class LeaderElectionWithFailureDetectorPuppetMultiJvmNode4 extends LeaderElectionSpec(failureDetectorPuppet = true)
|
||||
class LeaderElectionWithFailureDetectorPuppetMultiJvmNode5 extends LeaderElectionSpec(failureDetectorPuppet = true)
|
||||
|
||||
class LeaderElectionWithAccrualFailureDetectorMultiJvmNode1 extends LeaderElectionSpec with AccrualFailureDetectorStrategy
|
||||
class LeaderElectionWithAccrualFailureDetectorMultiJvmNode2 extends LeaderElectionSpec with AccrualFailureDetectorStrategy
|
||||
class LeaderElectionWithAccrualFailureDetectorMultiJvmNode3 extends LeaderElectionSpec with AccrualFailureDetectorStrategy
|
||||
class LeaderElectionWithAccrualFailureDetectorMultiJvmNode4 extends LeaderElectionSpec with AccrualFailureDetectorStrategy
|
||||
class LeaderElectionWithAccrualFailureDetectorMultiJvmNode5 extends LeaderElectionSpec with AccrualFailureDetectorStrategy
|
||||
class LeaderElectionWithAccrualFailureDetectorMultiJvmNode1 extends LeaderElectionSpec(failureDetectorPuppet = false)
|
||||
class LeaderElectionWithAccrualFailureDetectorMultiJvmNode2 extends LeaderElectionSpec(failureDetectorPuppet = false)
|
||||
class LeaderElectionWithAccrualFailureDetectorMultiJvmNode3 extends LeaderElectionSpec(failureDetectorPuppet = false)
|
||||
class LeaderElectionWithAccrualFailureDetectorMultiJvmNode4 extends LeaderElectionSpec(failureDetectorPuppet = false)
|
||||
class LeaderElectionWithAccrualFailureDetectorMultiJvmNode5 extends LeaderElectionSpec(failureDetectorPuppet = false)
|
||||
|
||||
abstract class LeaderElectionSpec
|
||||
extends MultiNodeSpec(LeaderElectionMultiJvmSpec)
|
||||
with MultiNodeClusterSpec with FailureDetectorStrategy {
|
||||
abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig)
|
||||
extends MultiNodeSpec(multiNodeConfig)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import LeaderElectionMultiJvmSpec._
|
||||
def this(failureDetectorPuppet: Boolean) = this(LeaderElectionMultiNodeConfig(failureDetectorPuppet))
|
||||
|
||||
import multiNodeConfig._
|
||||
|
||||
// sorted in the order used by the cluster
|
||||
lazy val sortedRoles = Seq(first, second, third, fourth).sorted
|
||||
|
|
|
|||
|
|
@ -23,12 +23,12 @@ object LeaderLeavingMultiJvmSpec extends MultiNodeConfig {
|
|||
.withFallback(ConfigFactory.parseString("""
|
||||
# turn off unreachable reaper
|
||||
akka.cluster.unreachable-nodes-reaper-interval = 300 s""")
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfig)))
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)))
|
||||
}
|
||||
|
||||
class LeaderLeavingMultiJvmNode1 extends LeaderLeavingSpec with FailureDetectorPuppetStrategy
|
||||
class LeaderLeavingMultiJvmNode2 extends LeaderLeavingSpec with FailureDetectorPuppetStrategy
|
||||
class LeaderLeavingMultiJvmNode3 extends LeaderLeavingSpec with FailureDetectorPuppetStrategy
|
||||
class LeaderLeavingMultiJvmNode1 extends LeaderLeavingSpec
|
||||
class LeaderLeavingMultiJvmNode2 extends LeaderLeavingSpec
|
||||
class LeaderLeavingMultiJvmNode3 extends LeaderLeavingSpec
|
||||
|
||||
abstract class LeaderLeavingSpec
|
||||
extends MultiNodeSpec(LeaderLeavingMultiJvmSpec)
|
||||
|
|
|
|||
|
|
@ -26,12 +26,12 @@ object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig {
|
|||
unreachable-nodes-reaper-interval = 300 s # turn "off" reaping to unreachable node set
|
||||
}
|
||||
""")
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfig)))
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)))
|
||||
}
|
||||
|
||||
class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListenerExitingSpec with FailureDetectorPuppetStrategy
|
||||
class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec with FailureDetectorPuppetStrategy
|
||||
class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec with FailureDetectorPuppetStrategy
|
||||
class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListenerExitingSpec
|
||||
class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec
|
||||
class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec
|
||||
|
||||
abstract class MembershipChangeListenerExitingSpec
|
||||
extends MultiNodeSpec(MembershipChangeListenerExitingMultiJvmSpec)
|
||||
|
|
|
|||
|
|
@ -17,11 +17,11 @@ object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig {
|
|||
val first = role("first")
|
||||
val second = role("second")
|
||||
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
|
||||
}
|
||||
|
||||
class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec with FailureDetectorPuppetStrategy
|
||||
class MembershipChangeListenerJoinMultiJvmNode2 extends MembershipChangeListenerJoinSpec with FailureDetectorPuppetStrategy
|
||||
class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec
|
||||
class MembershipChangeListenerJoinMultiJvmNode2 extends MembershipChangeListenerJoinSpec
|
||||
|
||||
abstract class MembershipChangeListenerJoinSpec
|
||||
extends MultiNodeSpec(MembershipChangeListenerJoinMultiJvmSpec)
|
||||
|
|
|
|||
|
|
@ -24,12 +24,12 @@ object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig {
|
|||
.withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster.unreachable-nodes-reaper-interval = 300 s # turn "off"
|
||||
"""))
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
|
||||
}
|
||||
|
||||
class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListenerLeavingSpec with FailureDetectorPuppetStrategy
|
||||
class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec with FailureDetectorPuppetStrategy
|
||||
class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec with FailureDetectorPuppetStrategy
|
||||
class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListenerLeavingSpec
|
||||
class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec
|
||||
class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec
|
||||
|
||||
abstract class MembershipChangeListenerLeavingSpec
|
||||
extends MultiNodeSpec(MembershipChangeListenerLeavingMultiJvmSpec)
|
||||
|
|
|
|||
|
|
@ -16,12 +16,12 @@ object MembershipChangeListenerUpMultiJvmSpec extends MultiNodeConfig {
|
|||
val second = role("second")
|
||||
val third = role("third")
|
||||
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
|
||||
}
|
||||
|
||||
class MembershipChangeListenerUpMultiJvmNode1 extends MembershipChangeListenerUpSpec with FailureDetectorPuppetStrategy
|
||||
class MembershipChangeListenerUpMultiJvmNode2 extends MembershipChangeListenerUpSpec with FailureDetectorPuppetStrategy
|
||||
class MembershipChangeListenerUpMultiJvmNode3 extends MembershipChangeListenerUpSpec with FailureDetectorPuppetStrategy
|
||||
class MembershipChangeListenerUpMultiJvmNode1 extends MembershipChangeListenerUpSpec
|
||||
class MembershipChangeListenerUpMultiJvmNode2 extends MembershipChangeListenerUpSpec
|
||||
class MembershipChangeListenerUpMultiJvmNode3 extends MembershipChangeListenerUpSpec
|
||||
|
||||
abstract class MembershipChangeListenerUpSpec
|
||||
extends MultiNodeSpec(MembershipChangeListenerUpMultiJvmSpec)
|
||||
|
|
|
|||
|
|
@ -20,6 +20,14 @@ import akka.actor.ActorPath
|
|||
import akka.actor.RootActorPath
|
||||
|
||||
object MultiNodeClusterSpec {
|
||||
|
||||
def clusterConfigWithFailureDetectorPuppet: Config =
|
||||
ConfigFactory.parseString("akka.cluster.failure-detector.implementation-class = akka.cluster.FailureDetectorPuppet").
|
||||
withFallback(clusterConfig)
|
||||
|
||||
def clusterConfig(failureDetectorPuppet: Boolean): Config =
|
||||
if (failureDetectorPuppet) clusterConfigWithFailureDetectorPuppet else clusterConfig
|
||||
|
||||
def clusterConfig: Config = ConfigFactory.parseString("""
|
||||
akka.actor.provider = akka.cluster.ClusterActorRefProvider
|
||||
akka.cluster {
|
||||
|
|
@ -32,6 +40,7 @@ object MultiNodeClusterSpec {
|
|||
periodic-tasks-initial-delay = 300 ms
|
||||
publish-stats-interval = 0 s # always, when it happens
|
||||
}
|
||||
akka.remote.log-remote-lifecycle-events = off
|
||||
akka.test {
|
||||
single-expect-default = 5 s
|
||||
}
|
||||
|
|
@ -81,39 +90,12 @@ trait MultiNodeClusterSpec extends Suite { self: MultiNodeSpec ⇒
|
|||
throw t
|
||||
}
|
||||
|
||||
/**
|
||||
* Make it possible to override/configure seedNodes from tests without
|
||||
* specifying in config. Addresses are unknown before startup time.
|
||||
*/
|
||||
protected def seedNodes: IndexedSeq[RoleName] = IndexedSeq.empty
|
||||
|
||||
/**
|
||||
* The cluster node instance. Needs to be lazily created.
|
||||
*/
|
||||
private lazy val clusterNode = this match {
|
||||
case x: FailureDetectorStrategy ⇒ createTestCluster(x.failureDetector)
|
||||
case _ if seedNodes.nonEmpty ⇒
|
||||
createTestCluster(new AccrualFailureDetector(system, new ClusterSettings(system.settings.config, system.name)))
|
||||
case _ ⇒ Cluster(system)
|
||||
|
||||
}
|
||||
|
||||
private def createTestCluster(failureDetector: FailureDetector): Cluster = {
|
||||
new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) {
|
||||
override def seedNodes: IndexedSeq[Address] = {
|
||||
val testSeedNodes = MultiNodeClusterSpec.this.seedNodes
|
||||
if (testSeedNodes.isEmpty) super.seedNodes
|
||||
else testSeedNodes map address
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def clusterView: ClusterReadView = cluster.readView
|
||||
|
||||
/**
|
||||
* Get the cluster node to use.
|
||||
*/
|
||||
def cluster: Cluster = clusterNode
|
||||
def cluster: Cluster = Cluster(system)
|
||||
|
||||
/**
|
||||
* Use this method for the initial startup of the cluster node.
|
||||
|
|
@ -224,5 +206,25 @@ trait MultiNodeClusterSpec extends Suite { self: MultiNodeSpec ⇒
|
|||
|
||||
def roleName(addr: Address): Option[RoleName] = roles.find(address(_) == addr)
|
||||
|
||||
/**
|
||||
* Marks a node as available in the failure detector if
|
||||
* [[akka.cluster.FailureDetectorPuppet]] is used as
|
||||
* failure detector.
|
||||
*/
|
||||
def markNodeAsAvailable(address: Address): Unit = cluster.failureDetector match {
|
||||
case puppet: FailureDetectorPuppet ⇒ puppet.markNodeAsAvailable(address)
|
||||
case _ ⇒
|
||||
}
|
||||
|
||||
/**
|
||||
* Marks a node as unavailable in the failure detector if
|
||||
* [[akka.cluster.FailureDetectorPuppet]] is used as
|
||||
* failure detector.
|
||||
*/
|
||||
def markNodeAsUnavailable(address: Address): Unit = cluster.failureDetector match {
|
||||
case puppet: FailureDetectorPuppet ⇒ puppet.markNodeAsUnavailable(address)
|
||||
case _ ⇒
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -15,12 +15,12 @@ object NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec extends MultiNodeConfig
|
|||
val second = role("second")
|
||||
val third = role("third")
|
||||
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
|
||||
}
|
||||
|
||||
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec with FailureDetectorPuppetStrategy
|
||||
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec with FailureDetectorPuppetStrategy
|
||||
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec with FailureDetectorPuppetStrategy
|
||||
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec
|
||||
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec
|
||||
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec
|
||||
|
||||
abstract class NodeLeavingAndExitingAndBeingRemovedSpec
|
||||
extends MultiNodeSpec(NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec)
|
||||
|
|
|
|||
|
|
@ -23,12 +23,12 @@ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig {
|
|||
.withFallback(ConfigFactory.parseString("""
|
||||
# turn off unreachable reaper
|
||||
akka.cluster.unreachable-nodes-reaper-interval = 300 s""")
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfig)))
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)))
|
||||
}
|
||||
|
||||
class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec with FailureDetectorPuppetStrategy
|
||||
class NodeLeavingAndExitingMultiJvmNode2 extends NodeLeavingAndExitingSpec with FailureDetectorPuppetStrategy
|
||||
class NodeLeavingAndExitingMultiJvmNode3 extends NodeLeavingAndExitingSpec with FailureDetectorPuppetStrategy
|
||||
class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec
|
||||
class NodeLeavingAndExitingMultiJvmNode2 extends NodeLeavingAndExitingSpec
|
||||
class NodeLeavingAndExitingMultiJvmNode3 extends NodeLeavingAndExitingSpec
|
||||
|
||||
abstract class NodeLeavingAndExitingSpec
|
||||
extends MultiNodeSpec(NodeLeavingAndExitingMultiJvmSpec)
|
||||
|
|
|
|||
|
|
@ -13,12 +13,12 @@ object NodeMembershipMultiJvmSpec extends MultiNodeConfig {
|
|||
val second = role("second")
|
||||
val third = role("third")
|
||||
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
|
||||
}
|
||||
|
||||
class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec with FailureDetectorPuppetStrategy
|
||||
class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec with FailureDetectorPuppetStrategy
|
||||
class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec with FailureDetectorPuppetStrategy
|
||||
class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec
|
||||
class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec
|
||||
class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec
|
||||
|
||||
abstract class NodeMembershipSpec
|
||||
extends MultiNodeSpec(NodeMembershipMultiJvmSpec)
|
||||
|
|
|
|||
|
|
@ -18,11 +18,11 @@ object NodeUpMultiJvmSpec extends MultiNodeConfig {
|
|||
val first = role("first")
|
||||
val second = role("second")
|
||||
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
|
||||
}
|
||||
|
||||
class NodeUpMultiJvmNode1 extends NodeUpSpec with FailureDetectorPuppetStrategy
|
||||
class NodeUpMultiJvmNode2 extends NodeUpSpec with FailureDetectorPuppetStrategy
|
||||
class NodeUpMultiJvmNode1 extends NodeUpSpec
|
||||
class NodeUpMultiJvmNode2 extends NodeUpSpec
|
||||
|
||||
abstract class NodeUpSpec
|
||||
extends MultiNodeSpec(NodeUpMultiJvmSpec)
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ import akka.remote.testkit.MultiNodeSpec
|
|||
import akka.testkit._
|
||||
import scala.concurrent.util.duration._
|
||||
|
||||
object SingletonClusterMultiJvmSpec extends MultiNodeConfig {
|
||||
case class SingletonClusterMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
|
||||
|
|
@ -21,21 +21,23 @@ object SingletonClusterMultiJvmSpec extends MultiNodeConfig {
|
|||
failure-detector.threshold = 4
|
||||
}
|
||||
""")).
|
||||
withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet)))
|
||||
|
||||
}
|
||||
|
||||
class SingletonClusterWithFailureDetectorPuppetMultiJvmNode1 extends SingletonClusterSpec with FailureDetectorPuppetStrategy
|
||||
class SingletonClusterWithFailureDetectorPuppetMultiJvmNode2 extends SingletonClusterSpec with FailureDetectorPuppetStrategy
|
||||
class SingletonClusterWithFailureDetectorPuppetMultiJvmNode1 extends SingletonClusterSpec(failureDetectorPuppet = true)
|
||||
class SingletonClusterWithFailureDetectorPuppetMultiJvmNode2 extends SingletonClusterSpec(failureDetectorPuppet = true)
|
||||
|
||||
class SingletonClusterWithAccrualFailureDetectorMultiJvmNode1 extends SingletonClusterSpec with AccrualFailureDetectorStrategy
|
||||
class SingletonClusterWithAccrualFailureDetectorMultiJvmNode2 extends SingletonClusterSpec with AccrualFailureDetectorStrategy
|
||||
class SingletonClusterWithAccrualFailureDetectorMultiJvmNode1 extends SingletonClusterSpec(failureDetectorPuppet = false)
|
||||
class SingletonClusterWithAccrualFailureDetectorMultiJvmNode2 extends SingletonClusterSpec(failureDetectorPuppet = false)
|
||||
|
||||
abstract class SingletonClusterSpec
|
||||
extends MultiNodeSpec(SingletonClusterMultiJvmSpec)
|
||||
with MultiNodeClusterSpec with FailureDetectorStrategy {
|
||||
abstract class SingletonClusterSpec(multiNodeConfig: SingletonClusterMultiNodeConfig)
|
||||
extends MultiNodeSpec(multiNodeConfig)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import SingletonClusterMultiJvmSpec._
|
||||
def this(failureDetectorPuppet: Boolean) = this(SingletonClusterMultiNodeConfig(failureDetectorPuppet))
|
||||
|
||||
import multiNodeConfig._
|
||||
|
||||
"A cluster of 2 nodes" must {
|
||||
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ import scala.concurrent.util.duration._
|
|||
import akka.actor.Address
|
||||
import akka.remote.testconductor.Direction
|
||||
|
||||
object SplitBrainMultiJvmSpec extends MultiNodeConfig {
|
||||
case class SplitBrainMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
val third = role("third")
|
||||
|
|
@ -26,26 +26,28 @@ object SplitBrainMultiJvmSpec extends MultiNodeConfig {
|
|||
auto-down = on
|
||||
failure-detector.threshold = 4
|
||||
}""")).
|
||||
withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet)))
|
||||
}
|
||||
|
||||
class SplitBrainWithFailureDetectorPuppetMultiJvmNode1 extends SplitBrainSpec with FailureDetectorPuppetStrategy
|
||||
class SplitBrainWithFailureDetectorPuppetMultiJvmNode2 extends SplitBrainSpec with FailureDetectorPuppetStrategy
|
||||
class SplitBrainWithFailureDetectorPuppetMultiJvmNode3 extends SplitBrainSpec with FailureDetectorPuppetStrategy
|
||||
class SplitBrainWithFailureDetectorPuppetMultiJvmNode4 extends SplitBrainSpec with FailureDetectorPuppetStrategy
|
||||
class SplitBrainWithFailureDetectorPuppetMultiJvmNode5 extends SplitBrainSpec with FailureDetectorPuppetStrategy
|
||||
class SplitBrainWithFailureDetectorPuppetMultiJvmNode1 extends SplitBrainSpec(failureDetectorPuppet = true)
|
||||
class SplitBrainWithFailureDetectorPuppetMultiJvmNode2 extends SplitBrainSpec(failureDetectorPuppet = true)
|
||||
class SplitBrainWithFailureDetectorPuppetMultiJvmNode3 extends SplitBrainSpec(failureDetectorPuppet = true)
|
||||
class SplitBrainWithFailureDetectorPuppetMultiJvmNode4 extends SplitBrainSpec(failureDetectorPuppet = true)
|
||||
class SplitBrainWithFailureDetectorPuppetMultiJvmNode5 extends SplitBrainSpec(failureDetectorPuppet = true)
|
||||
|
||||
class SplitBrainWithAccrualFailureDetectorMultiJvmNode1 extends SplitBrainSpec with AccrualFailureDetectorStrategy
|
||||
class SplitBrainWithAccrualFailureDetectorMultiJvmNode2 extends SplitBrainSpec with AccrualFailureDetectorStrategy
|
||||
class SplitBrainWithAccrualFailureDetectorMultiJvmNode3 extends SplitBrainSpec with AccrualFailureDetectorStrategy
|
||||
class SplitBrainWithAccrualFailureDetectorMultiJvmNode4 extends SplitBrainSpec with AccrualFailureDetectorStrategy
|
||||
class SplitBrainWithAccrualFailureDetectorMultiJvmNode5 extends SplitBrainSpec with AccrualFailureDetectorStrategy
|
||||
class SplitBrainWithAccrualFailureDetectorMultiJvmNode1 extends SplitBrainSpec(failureDetectorPuppet = false)
|
||||
class SplitBrainWithAccrualFailureDetectorMultiJvmNode2 extends SplitBrainSpec(failureDetectorPuppet = false)
|
||||
class SplitBrainWithAccrualFailureDetectorMultiJvmNode3 extends SplitBrainSpec(failureDetectorPuppet = false)
|
||||
class SplitBrainWithAccrualFailureDetectorMultiJvmNode4 extends SplitBrainSpec(failureDetectorPuppet = false)
|
||||
class SplitBrainWithAccrualFailureDetectorMultiJvmNode5 extends SplitBrainSpec(failureDetectorPuppet = false)
|
||||
|
||||
abstract class SplitBrainSpec
|
||||
extends MultiNodeSpec(SplitBrainMultiJvmSpec)
|
||||
with MultiNodeClusterSpec with FailureDetectorStrategy {
|
||||
abstract class SplitBrainSpec(multiNodeConfig: SplitBrainMultiNodeConfig)
|
||||
extends MultiNodeSpec(multiNodeConfig)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import SplitBrainMultiJvmSpec._
|
||||
def this(failureDetectorPuppet: Boolean) = this(SplitBrainMultiNodeConfig(failureDetectorPuppet))
|
||||
|
||||
import multiNodeConfig._
|
||||
|
||||
val side1 = IndexedSeq(first, second)
|
||||
val side2 = IndexedSeq(third, fourth, fifth)
|
||||
|
|
|
|||
|
|
@ -23,17 +23,16 @@ object TransitionMultiJvmSpec extends MultiNodeConfig {
|
|||
|
||||
commonConfig(debugConfig(on = false).
|
||||
withFallback(ConfigFactory.parseString("akka.cluster.periodic-tasks-initial-delay = 300 s # turn off all periodic tasks")).
|
||||
withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
|
||||
}
|
||||
|
||||
class TransitionMultiJvmNode1 extends TransitionSpec with FailureDetectorPuppetStrategy
|
||||
class TransitionMultiJvmNode2 extends TransitionSpec with FailureDetectorPuppetStrategy
|
||||
class TransitionMultiJvmNode3 extends TransitionSpec with FailureDetectorPuppetStrategy
|
||||
class TransitionMultiJvmNode1 extends TransitionSpec
|
||||
class TransitionMultiJvmNode2 extends TransitionSpec
|
||||
class TransitionMultiJvmNode3 extends TransitionSpec
|
||||
|
||||
abstract class TransitionSpec
|
||||
extends MultiNodeSpec(TransitionMultiJvmSpec)
|
||||
with MultiNodeClusterSpec
|
||||
with FailureDetectorStrategy
|
||||
with ImplicitSender {
|
||||
|
||||
import TransitionMultiJvmSpec._
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ import akka.actor.Address
|
|||
import akka.remote.testconductor.{ RoleName, Direction }
|
||||
import scala.concurrent.util.duration._
|
||||
|
||||
object UnreachableNodeRejoinsClusterMultiJvmSpec extends MultiNodeConfig {
|
||||
case class UnreachableNodeRejoinsClusterMultiNodeConfig(failureDetectorPuppet: Boolean) extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
val third = role("third")
|
||||
|
|
@ -23,20 +23,23 @@ object UnreachableNodeRejoinsClusterMultiJvmSpec extends MultiNodeConfig {
|
|||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
}
|
||||
|
||||
class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy
|
||||
class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy
|
||||
class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy
|
||||
class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy
|
||||
class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true)
|
||||
class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true)
|
||||
class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true)
|
||||
class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true)
|
||||
|
||||
class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy
|
||||
class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy
|
||||
class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy
|
||||
class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy
|
||||
class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = false)
|
||||
class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = false)
|
||||
class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = false)
|
||||
class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = false)
|
||||
|
||||
abstract class UnreachableNodeRejoinsClusterSpec
|
||||
extends MultiNodeSpec(UnreachableNodeRejoinsClusterMultiJvmSpec)
|
||||
with MultiNodeClusterSpec with FailureDetectorStrategy {
|
||||
import UnreachableNodeRejoinsClusterMultiJvmSpec._
|
||||
abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNodeRejoinsClusterMultiNodeConfig)
|
||||
extends MultiNodeSpec(multiNodeConfig)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
def this(failureDetectorPuppet: Boolean) = this(UnreachableNodeRejoinsClusterMultiNodeConfig(failureDetectorPuppet))
|
||||
|
||||
import multiNodeConfig._
|
||||
|
||||
def allBut(role: RoleName, roles: Seq[RoleName] = roles): Seq[RoleName] = {
|
||||
roles.filterNot(_ == role)
|
||||
|
|
|
|||
|
|
@ -25,8 +25,10 @@ object ClusterSpec {
|
|||
auto-down = off
|
||||
periodic-tasks-initial-delay = 120 seconds // turn off scheduled tasks
|
||||
publish-stats-interval = 0 s # always, when it happens
|
||||
failure-detector.implementation-class = akka.cluster.FailureDetectorPuppet
|
||||
}
|
||||
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
|
||||
akka.remote.log-remote-lifecycle-events = off
|
||||
akka.remote.netty.port = 0
|
||||
# akka.loglevel = DEBUG
|
||||
"""
|
||||
|
|
@ -40,9 +42,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
|
|||
|
||||
val selfAddress = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[ClusterActorRefProvider].transport.address
|
||||
|
||||
val failureDetector = new FailureDetectorPuppet(system)
|
||||
|
||||
val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector)
|
||||
val cluster = Cluster(system)
|
||||
def clusterView = cluster.readView
|
||||
|
||||
def leaderActions(): Unit = {
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ package akka.cluster
|
|||
|
||||
import akka.actor.{ Address, ActorSystem }
|
||||
import akka.event.{ Logging, LogSource }
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
|
||||
/**
|
||||
* User controllable "puppet" failure detector.
|
||||
|
|
@ -13,8 +14,6 @@ import akka.event.{ Logging, LogSource }
|
|||
class FailureDetectorPuppet(system: ActorSystem, settings: ClusterSettings) extends FailureDetector {
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
def this(system: ActorSystem) = this(system, new ClusterSettings(system.settings.config, system.name))
|
||||
|
||||
trait Status
|
||||
object Up extends Status
|
||||
object Down extends Status
|
||||
|
|
@ -63,3 +62,4 @@ class FailureDetectorPuppet(system: ActorSystem, settings: ClusterSettings) exte
|
|||
connections.clear()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -15,5 +15,5 @@ setup(
|
|||
description = __doc__,
|
||||
author = "Akka",
|
||||
packages = ['styles'],
|
||||
entry_points = entry_points
|
||||
entry_points = entry_points
|
||||
)
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ The Akka cluster is a separate jar file. Make sure that you have the following d
|
|||
|
||||
.. parsed-literal::
|
||||
|
||||
"com.typesafe.akka" % "akka-cluster_|scalaVersion|" % "2.1-SNAPSHOT"
|
||||
"com.typesafe.akka" %% "akka-cluster" % "2.1-SNAPSHOT"
|
||||
|
||||
If you are using the latest nightly build you should pick a timestamped Akka version from `<http://repo.typesafe.com/typesafe/snapshots/com/typesafe/akka/akka-cluster/>`_.
|
||||
|
||||
|
|
|
|||
|
|
@ -50,6 +50,7 @@ html_show_sourcelink = False
|
|||
html_show_sphinx = False
|
||||
html_show_copyright = True
|
||||
htmlhelp_basename = 'Akkadoc'
|
||||
html_use_smartypants = False
|
||||
html_add_permalinks = ''
|
||||
|
||||
html_context = {
|
||||
|
|
|
|||
|
|
@ -1,31 +0,0 @@
|
|||
.. _howdoi:
|
||||
|
||||
How do I …
|
||||
================================
|
||||
|
||||
This section of the Akka Documentation tries to answer common usage questions.
|
||||
|
||||
… deal with blocking third-party code?
|
||||
--------------------------------------
|
||||
|
||||
Some times you cannot avoid doing blocking, and in that case you might want to explore the following:
|
||||
|
||||
1. Isolate the blocking to a dedicated ``ExecutionContext``
|
||||
2. Configure the actor to have a bounded-mailbox as to prevent from excessive mailbox sizes
|
||||
|
||||
.. note::
|
||||
|
||||
Before you do anything at all, measure!
|
||||
|
||||
|
||||
… use persistence with Akka?
|
||||
----------------------------
|
||||
|
||||
You just use it?
|
||||
You might want to have a look at the answer to the question about blocking though.
|
||||
|
||||
… use a pool of connections/whatnots
|
||||
------------------------------------
|
||||
|
||||
You most probably want to wrap that pooling service as an Akka Extension,
|
||||
see the docs for documentation on Java / Scala Extensions.
|
||||
|
|
@ -11,5 +11,4 @@ General
|
|||
remoting
|
||||
jmm
|
||||
message-send-semantics
|
||||
configuration
|
||||
howdoi
|
||||
configuration
|
||||
|
|
@ -14,9 +14,9 @@ The best way to start learning Akka is to download the Typesafe Stack and either
|
|||
the Akka Getting Started Tutorials or check out one of Akka Template Projects. Both comes
|
||||
in several flavours depending on your development environment preferences.
|
||||
|
||||
- `Download Typesafe Stack <http://typesafe.com/resources/getting-started/typesafe-stack/downloading-installing.html>`_
|
||||
- `Getting Started Tutorials <http://typesafe.com/resources/getting-started/tutorials/index.html>`_
|
||||
- `Template Projects <http://typesafe.com/resources/getting-started/typesafe-stack/downloading-installing.html#template-projects-for-scala-akka-and-play>`_
|
||||
- `Download Typesafe Stack <http://typesafe.com/stack/download>`_
|
||||
- `Getting Started Tutorials <http://typesafe.com/resources/getting-started>`_
|
||||
- `Template Projects <http://typesafe.com/stack/download#template>`_
|
||||
|
||||
Download
|
||||
--------
|
||||
|
|
|
|||
|
|
@ -5,11 +5,11 @@ package docs.actor
|
|||
|
||||
import language.postfixOps
|
||||
|
||||
import akka.testkit.{ AkkaSpec ⇒ MyFavoriteTestFrameWorkPlusAkkaTestKit }
|
||||
//#test-code
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.actor.Props
|
||||
|
||||
class FSMDocSpec extends AkkaSpec {
|
||||
class FSMDocSpec extends MyFavoriteTestFrameWorkPlusAkkaTestKit {
|
||||
|
||||
"simple finite state machine" must {
|
||||
//#fsm-code-elided
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ Throttling Messages
|
|||
|
||||
Contributed by: Kaspar Fischer
|
||||
|
||||
A message throttler that ensures that messages are not sent out at too high a rate.
|
||||
"A message throttler that ensures that messages are not sent out at too high a rate."
|
||||
|
||||
The pattern is described `here <http://letitcrash.com/post/28901663062/throttling-messages-in-akka-2>`_.
|
||||
|
||||
|
|
@ -28,9 +28,9 @@ Balancing Workload Across Nodes
|
|||
|
||||
Contributed by: Derek Wyatt
|
||||
|
||||
Often times, people want the functionality of the BalancingDispatcher with the
|
||||
"Often times, people want the functionality of the BalancingDispatcher with the
|
||||
stipulation that the Actors doing the work have distinct Mailboxes on remote
|
||||
nodes. In this post we’ll explore the implementation of such a concept.
|
||||
nodes. In this post we’ll explore the implementation of such a concept."
|
||||
|
||||
The pattern is described `here <http://letitcrash.com/post/29044669086/balancing-workload-across-nodes-with-akka-2>`_.
|
||||
|
||||
|
|
@ -39,15 +39,78 @@ Ordered Termination
|
|||
|
||||
Contributed by: Derek Wyatt
|
||||
|
||||
When an Actor stops, its children stop in an undefined order. Child termination is
|
||||
"When an Actor stops, its children stop in an undefined order. Child termination is
|
||||
asynchronous and thus non-deterministic.
|
||||
|
||||
If an Actor has children that have order dependencies, then you might need to ensure
|
||||
a particular shutdown order of those children so that their postStop() methods get
|
||||
called in the right order.
|
||||
called in the right order."
|
||||
|
||||
The pattern is described `here <http://letitcrash.com/post/29773618510/an-akka-2-terminator>`_.
|
||||
|
||||
Akka AMQP Proxies
|
||||
=================
|
||||
|
||||
Contributed by: Fabrice Drouin
|
||||
|
||||
"“AMQP proxies” is a simple way of integrating AMQP with Akka to distribute jobs across a network of computing nodes.
|
||||
You still write “local” code, have very little to configure, and end up with a distributed, elastic,
|
||||
fault-tolerant grid where computing nodes can be written in nearly every programming language."
|
||||
|
||||
The pattern is described `here <http://letitcrash.com/post/29988753572/akka-amqp-proxies>`_.
|
||||
|
||||
Shutdown Patterns in Akka 2
|
||||
===========================
|
||||
|
||||
Contributed by: Derek Wyatt
|
||||
|
||||
“How do you tell Akka to shut down the ActorSystem when everything’s finished?
|
||||
It turns out that there’s no magical flag for this, no configuration setting, no special callback you can register for,
|
||||
and neither will the illustrious shutdown fairy grace your application with her glorious presence at that perfect moment.
|
||||
She’s just plain mean.
|
||||
|
||||
In this post, we’ll discuss why this is the case and provide you with a simple option for shutting down “at the right time”,
|
||||
as well as a not-so-simple-option for doing the exact same thing."
|
||||
|
||||
The pattern is described `here <http://letitcrash.com/post/30165507578/shutdown-patterns-in-akka-2>`_.
|
||||
|
||||
Distributed (in-memory) graph processing with Akka
|
||||
==================================================
|
||||
|
||||
Contributed by: Adelbert Chang
|
||||
|
||||
"Graphs have always been an interesting structure to study in both mathematics and computer science (among other fields),
|
||||
and have become even more interesting in the context of online social networks such as Facebook and Twitter,
|
||||
whose underlying network structures are nicely represented by graphs."
|
||||
|
||||
The pattern is described `here <http://letitcrash.com/post/30257014291/distributed-in-memory-graph-processing-with-akka>`_.
|
||||
|
||||
Case Study: An Auto-Updating Cache Using Actors
|
||||
===============================================
|
||||
|
||||
Contributed by: Eric Pederson
|
||||
|
||||
"We recently needed to build a caching system in front of a slow backend system with the following requirements:
|
||||
|
||||
The data in the backend system is constantly being updated so the caches need to be updated every N minutes.
|
||||
Requests to the backend system need to be throttled.
|
||||
The caching system we built used Akka actors and Scala’s support for functions as first class objects."
|
||||
|
||||
The pattern is described `here <http://letitcrash.com/post/30509298968/case-study-an-auto-updating-cache-using-actors>`_.
|
||||
|
||||
Discovering message flows in actor systems with the Spider Pattern
|
||||
==================================================================
|
||||
|
||||
Contributed by: Raymond Roestenburg
|
||||
|
||||
"Building actor systems is fun but debugging them can be difficult, you mostly end up browsing through many log files
|
||||
on several machines to find out what’s going on. I’m sure you have browsed through logs and thought,
|
||||
“Hey, where did that message go?”, “Why did this message cause that effect” or “Why did this actor never get a message?”
|
||||
|
||||
This is where the Spider pattern comes in."
|
||||
|
||||
The pattern is described `here <http://letitcrash.com/post/30585282971/discovering-message-flows-in-actor-systems-with-the>`_.
|
||||
|
||||
Template Pattern
|
||||
================
|
||||
|
||||
|
|
|
|||
|
|
@ -54,6 +54,7 @@ abstract class MultiNodeConfig {
|
|||
receive = on
|
||||
fsm = on
|
||||
}
|
||||
akka.remote.log-remote-lifecycle-events = on
|
||||
""")
|
||||
else
|
||||
ConfigFactory.parseString("akka.loglevel = INFO")
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ package akka.routing
|
|||
import com.typesafe.config.ConfigFactory
|
||||
import akka.actor.ActorContext
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.ActorSystemImpl
|
||||
import akka.actor.Deploy
|
||||
import akka.actor.InternalActorRef
|
||||
import akka.actor.Props
|
||||
|
|
@ -18,6 +17,7 @@ import akka.actor.Address
|
|||
import scala.collection.JavaConverters._
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.lang.IllegalStateException
|
||||
import akka.actor.ActorCell
|
||||
|
||||
/**
|
||||
* [[akka.routing.RouterConfig]] implementation for remote deployment on defined
|
||||
|
|
@ -75,12 +75,14 @@ class RemoteRouteeProvider(nodes: Iterable[Address], _context: ActorContext, _ro
|
|||
format context.self.path.toString)
|
||||
|
||||
override def createRoutees(nrOfInstances: Int): Unit = {
|
||||
val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559
|
||||
val refs = IndexedSeq.empty[ActorRef] ++ (for (i ← 1 to nrOfInstances) yield {
|
||||
val name = "c" + childNameCounter.incrementAndGet
|
||||
val deploy = Deploy("", ConfigFactory.empty(), routeeProps.routerConfig, RemoteScope(nodeAddressIter.next))
|
||||
impl.provider.actorOf(impl, routeeProps, context.self.asInstanceOf[InternalActorRef], context.self.path / name,
|
||||
systemService = false, Some(deploy), lookupDeploy = false, async = false)
|
||||
|
||||
// attachChild means that the provider will treat this call as if possibly done out of the wrong
|
||||
// context and use RepointableActorRef instead of LocalActorRef. Seems like a slightly sub-optimal
|
||||
// choice in a corner case (and hence not worth fixing).
|
||||
context.asInstanceOf[ActorCell].attachChild(routeeProps.withDeploy(deploy), name, systemService = false)
|
||||
})
|
||||
registerRoutees(refs)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -67,10 +67,10 @@ akka.actor.deployment {
|
|||
nr-of-instances = 4
|
||||
}
|
||||
}""").withFallback(system.settings.config)
|
||||
val other = ActorSystem("remote-sys", conf)
|
||||
val otherSystem = ActorSystem("remote-sys", conf)
|
||||
|
||||
override def atTermination() {
|
||||
other.shutdown()
|
||||
otherSystem.shutdown()
|
||||
}
|
||||
|
||||
"A Remote Router" must {
|
||||
|
|
@ -199,6 +199,25 @@ akka.actor.deployment {
|
|||
system.stop(router)
|
||||
}
|
||||
|
||||
"set supplied supervisorStrategy" in {
|
||||
val escalator = OneForOneStrategy() {
|
||||
case e ⇒
|
||||
println("## " + e)
|
||||
testActor ! e; SupervisorStrategy.Escalate
|
||||
}
|
||||
val router = system.actorOf(Props.empty.withRouter(new RemoteRouterConfig(
|
||||
RoundRobinRouter(1, supervisorStrategy = escalator),
|
||||
Seq(Address("akka", "remote-sys", "localhost", 12347)))), "blub3")
|
||||
|
||||
router ! CurrentRoutees
|
||||
EventFilter[ActorKilledException](occurrences = 1) intercept {
|
||||
EventFilter[ActorKilledException](occurrences = 1).intercept {
|
||||
expectMsgType[RouterRoutees].routees.head ! Kill
|
||||
}(otherSystem)
|
||||
}
|
||||
expectMsgType[ActorKilledException]
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -568,7 +568,7 @@ trait TestKitBase {
|
|||
val message =
|
||||
if (max == 0.seconds) {
|
||||
queue.pollFirst
|
||||
} else if (max.finite_?) {
|
||||
} else if (max.isFinite) {
|
||||
queue.pollFirst(max.length, max.unit)
|
||||
} else {
|
||||
queue.takeFirst
|
||||
|
|
|
|||
|
|
@ -52,7 +52,7 @@ object Sphinx {
|
|||
val env = "PYTHONPATH" -> target.absolutePath
|
||||
s.log.debug("Command: " + command.mkString(" ") + "\nEnv:" + env)
|
||||
val exitCode = Process(command, cwd, env) ! logger
|
||||
if (exitCode != 0) sys.error("Failed to install custom Sphinx pygments styles.")
|
||||
if (exitCode != 0) sys.error("Failed to install custom Sphinx pygments styles: exit code " + exitCode)
|
||||
(pygments * ("*.egg-info" | "build" | "temp")).get.foreach(IO.delete)
|
||||
s.log.info("Sphinx pygments styles installed at: " + target)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue