Resolve merge conflicts

commit 1f1f31f11c

25 changed files with 359 additions and 246 deletions
@@ -8,9 +8,18 @@
 akka {

   cluster {
-    # node to join - the full URI defined by a string on the form of "akka://system@hostname:port"
-    # leave as empty string if the node should be a singleton cluster
-    node-to-join = ""
+    # Initial contact points of the cluster. Nodes to join at startup if auto-join = on.
+    # The seed nodes also play the role of deputy nodes (the nodes responsible
+    # for breaking network partitions).
+    # Comma separated full URIs defined by a string on the form of "akka://system@hostname:port"
+    # Leave as empty if the node should be a singleton cluster.
+    seed-nodes = []
+
+    # how long to wait for one of the seed nodes to reply to initial join request
+    seed-node-timeout = 5s
+
+    # automatically join the seed-nodes at startup
+    auto-join = on

     # should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN?
     auto-down = on
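In user code the new contact points would look something like this (a sketch; the system name, hosts and ports are hypothetical):

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    val config = ConfigFactory.parseString("""
      akka.cluster.seed-nodes = [
        "akka://ClusterSystem@host1:2552",
        "akka://ClusterSystem@host2:2552"]
      """)

    // with auto-join = on (the default above) the node joins the seed that answers first
    val system = ActorSystem("ClusterSystem", config.withFallback(ConfigFactory.load()))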
@@ -57,7 +57,7 @@ object AccrualFailureDetector {
  * to this duration, with a with rather high standard deviation (since environment is unknown
  * in the beginning)
  *
- * @clock The clock, returning current time in milliseconds, but can be faked for testing
+ * @param clock The clock, returning current time in milliseconds, but can be faked for testing
  * purposes. It is only used for measuring intervals (duration).
  *
  */
@@ -68,7 +68,7 @@ class AccrualFailureDetector(
   val minStdDeviation: Duration,
   val acceptableHeartbeatPause: Duration,
   val firstHeartbeatEstimate: Duration,
-  val clock: () ⇒ Long) extends FailureDetector {
+  val clock: () ⇒ Long = AccrualFailureDetector.realClock) extends FailureDetector {

   import AccrualFailureDetector._
@@ -77,8 +77,7 @@ class AccrualFailureDetector(
    */
  def this(
    system: ActorSystem,
-   settings: ClusterSettings,
-   clock: () ⇒ Long = AccrualFailureDetector.realClock) =
+   settings: ClusterSettings) =
    this(
      system,
      settings.FailureDetectorThreshold,
@@ -86,7 +85,7 @@ class AccrualFailureDetector(
      settings.FailureDetectorAcceptableHeartbeatPause,
      settings.FailureDetectorMinStdDeviation,
      settings.HeartbeatInterval,
-     clock)
+     AccrualFailureDetector.realClock)

   private val log = Logging(system, "FailureDetector")
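Taken together, these hunks mean the detector can now be built from settings alone, while tests can still inject a clock through the primary constructor's defaulted last parameter; a minimal sketch, assuming an ActorSystem named system is in scope:

    val settings = new ClusterSettings(system.settings.config, system.name)

    // secondary constructor: picks AccrualFailureDetector.realClock by default
    val detector = new AccrualFailureDetector(system, settings)

    // tests can still pass a fake () => Long clock via the primary constructor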
@@ -11,7 +11,7 @@ import akka.dispatch.Await
 import akka.dispatch.MonitorableThreadFactory
 import akka.event.Logging
 import akka.jsr166y.ThreadLocalRandom
-import akka.pattern.ask
+import akka.pattern._
 import akka.remote._
 import akka.routing._
 import akka.util._
@@ -55,11 +55,29 @@ sealed trait ClusterMessage extends Serializable
 object ClusterUserAction {

   /**
-   * Command to join the cluster. Sent when a node (reprsesented by 'address')
+   * Command to join the cluster. Sent when a node (represented by 'address')
    * wants to join another node (the receiver).
    */
   case class Join(address: Address) extends ClusterMessage

+  /**
+   * Start message of the process to join one of the seed nodes.
+   * The node sends `InitJoin` to all seed nodes, which reply
+   * with `InitJoinAck`. The first reply is used, the others are discarded.
+   * The node sends the `Join` command to the seed node that replied first.
+   */
+  case object JoinSeedNode extends ClusterMessage
+
+  /**
+   * @see JoinSeedNode
+   */
+  case object InitJoin extends ClusterMessage
+
+  /**
+   * @see JoinSeedNode
+   */
+  case class InitJoinAck(address: Address) extends ClusterMessage
+
   /**
    * Command to leave the cluster.
    */
@@ -197,6 +215,9 @@ case class GossipOverview(
   seen: Map[Address, VectorClock] = Map.empty,
   unreachable: Set[Member] = Set.empty) {

+  def isNonDownUnreachable(address: Address): Boolean =
+    unreachable.exists { m ⇒ m.address == address && m.status != Down }
+
   override def toString =
     "GossipOverview(seen = [" + seen.mkString(", ") +
       "], unreachable = [" + unreachable.mkString(", ") +
@@ -343,11 +364,27 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto
   val log = Logging(context.system, this)

   def receive = {
-    case Join(address)   ⇒ cluster.joining(address)
-    case Down(address)   ⇒ cluster.downing(address)
-    case Leave(address)  ⇒ cluster.leaving(address)
-    case Exit(address)   ⇒ cluster.exiting(address)
-    case Remove(address) ⇒ cluster.removing(address)
+    case JoinSeedNode         ⇒ joinSeedNode()
+    case InitJoin             ⇒ sender ! InitJoinAck(cluster.selfAddress)
+    case InitJoinAck(address) ⇒ cluster.join(address)
+    case Join(address)        ⇒ cluster.joining(address)
+    case Down(address)        ⇒ cluster.downing(address)
+    case Leave(address)       ⇒ cluster.leaving(address)
+    case Exit(address)        ⇒ cluster.exiting(address)
+    case Remove(address)      ⇒ cluster.removing(address)
   }

+  def joinSeedNode(): Unit = {
+    val seedRoutees = for (address ← cluster.seedNodes; if address != cluster.selfAddress)
+      yield self.path.toStringWithAddress(address)
+    if (seedRoutees.nonEmpty) {
+      implicit val within = Timeout(cluster.clusterSettings.SeedNodeTimeout)
+      val seedRouter = context.actorOf(
+        Props.empty.withRouter(ScatterGatherFirstCompletedRouter(
+          routees = seedRoutees, within = within.duration)))
+      seedRouter ? InitJoin pipeTo self
+      seedRouter ! PoisonPill
+    }
+  }
+
   override def unhandled(unknown: Any) = log.error("Illegal command [{}]", unknown)
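The joinSeedNode helper relies on the generic scatter-gather pattern: ask every routee, act on the fastest answer. In isolation the same pattern looks roughly like this (a sketch; the actor and message names are hypothetical):

    import akka.actor._
    import akka.pattern._
    import akka.routing.ScatterGatherFirstCompletedRouter
    import akka.util.Timeout
    import akka.util.duration._

    class FirstResponder(paths: Seq[String]) extends Actor {
      implicit val timeout = Timeout(5 seconds)

      override def preStart() {
        // broadcasts the question to all paths, completes with the first reply
        val router = context.actorOf(Props.empty.withRouter(
          ScatterGatherFirstCompletedRouter(routees = paths, within = timeout.duration)))
        router ? "ping" pipeTo self // first reply wins, later replies are discarded
        router ! PoisonPill         // the router is one-shot, like the seed router above
      }

      def receive = {
        case firstReply ⇒ context.stop(self) // act on the winner here
      }
    }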
@@ -436,15 +473,12 @@ trait ClusterNodeMBean {
 /**
  * This module is responsible for Gossiping cluster information. The abstraction maintains the list of live
  * and dead members. Periodically i.e. every 1 second this module chooses a random member and initiates a round
- * of Gossip with it. Whenever it gets gossip updates it updates the Failure Detector with the liveness
- * information.
+ * of Gossip with it.
  * <p/>
- * During each of these runs the member initiates gossip exchange according to following rules (as defined in the
- * Cassandra documentation [http://wiki.apache.org/cassandra/ArchitectureGossip]:
+ * During each of these runs the member initiates gossip exchange according to following rules:
  * <pre>
  *   1) Gossip to random live member (if any)
- *   2) Gossip to random unreachable member with certain probability depending on number of unreachable and live members
- *   3) If the member gossiped to at (1) was not deputy, or the number of live members is less than number of deputy list,
+ *   2) If the member gossiped to at (1) was not deputy, or the number of live members is less than number of deputy list,
  *      gossip to random deputy with certain probability depending on number of unreachable, deputy and live members.
  * </pre>
  *
@@ -480,8 +514,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
   implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)

-  private val nodeToJoin: Option[Address] = NodeToJoin filter (_ != selfAddress)
-
   private val serialization = remote.serialization

   private val _isRunning = new AtomicBoolean(true)

@@ -508,8 +540,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
     new AtomicReference[State](State(seenVersionedGossip))
   }

-  // try to join the node defined in the 'akka.cluster.node-to-join' option
-  autoJoin()
+  // try to join one of the nodes defined in the 'akka.cluster.seed-nodes'
+  if (AutoJoin) joinSeedNode()

   // ========================================================
   // ===================== WORK DAEMONS =====================
@@ -647,6 +679,12 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
    */
  def isAvailable: Boolean = !isUnavailable(state.get)

+  /**
+   * Make it possible to override/configure seedNodes from tests without
+   * specifying in config. Addresses are unknown before startup time.
+   */
+  def seedNodes: IndexedSeq[Address] = SeedNodes
+
   /**
    * Registers a listener to subscribe to cluster membership changes.
    */
@@ -751,7 +789,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
     val localUnreachable = localGossip.overview.unreachable

     val alreadyMember = localMembers.exists(_.address == node)
-    val isUnreachable = localUnreachable.exists { m ⇒ m.address == node && m.status != Down }
+    val isUnreachable = localGossip.overview.isNonDownUnreachable(node)

     if (!alreadyMember && !isUnreachable) {
@@ -898,46 +936,49 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
     val localState = state.get
     val localGossip = localState.latestGossip

-    val winningGossip =
-      if (isSingletonCluster(localState) && localGossip.overview.unreachable.isEmpty && remoteGossip.members.contains(self)) {
-        // a fresh singleton cluster that is joining, no need to merge, use received gossip
-        remoteGossip
+    if (!localGossip.overview.isNonDownUnreachable(from)) {

-      } else if (remoteGossip.version <> localGossip.version) {
-        // concurrent
-        val mergedGossip = remoteGossip merge localGossip
-        val versionedMergedGossip = mergedGossip :+ vclockNode
+      val winningGossip =
+        if (isSingletonCluster(localState) && localGossip.overview.unreachable.isEmpty && remoteGossip.members.contains(self)) {
+          // a fresh singleton cluster that is joining, no need to merge, use received gossip
+          remoteGossip

-        log.debug(
-          """Can't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merging them into [{}]""",
-          remoteGossip, localGossip, versionedMergedGossip)
+        } else if (remoteGossip.version <> localGossip.version) {
+          // concurrent
+          val mergedGossip = remoteGossip merge localGossip
+          val versionedMergedGossip = mergedGossip :+ vclockNode

-        versionedMergedGossip
+          log.debug(
+            """Can't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merging them into [{}]""",
+            remoteGossip, localGossip, versionedMergedGossip)

-      } else if (remoteGossip.version < localGossip.version) {
-        // local gossip is newer
-        localGossip
+          versionedMergedGossip

-      } else {
-        // remote gossip is newer
-        remoteGossip
+        } else if (remoteGossip.version < localGossip.version) {
+          // local gossip is newer
+          localGossip
+
+        } else {
+          // remote gossip is newer
+          remoteGossip
+        }
+
+      val newJoinInProgress =
+        if (localState.joinInProgress.isEmpty) localState.joinInProgress
+        else localState.joinInProgress --
+          winningGossip.members.map(_.address) --
+          winningGossip.overview.unreachable.map(_.address)
+
+      val newState = localState copy (
+        latestGossip = winningGossip seen selfAddress,
+        joinInProgress = newJoinInProgress)
+
+      // if we won the race then update else try again
+      if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update
+      else {
+        log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
+        notifyMembershipChangeListeners(localState, newState)
       }
-
-    val newJoinInProgress =
-      if (localState.joinInProgress.isEmpty) localState.joinInProgress
-      else localState.joinInProgress --
-        winningGossip.members.map(_.address) --
-        winningGossip.overview.unreachable.map(_.address)
-
-    val newState = localState copy (
-      latestGossip = winningGossip seen selfAddress,
-      joinInProgress = newJoinInProgress)
-
-    // if we won the race then update else try again
-    if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update
-    else {
-      log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
-      notifyMembershipChangeListeners(localState, newState)
     }
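The state transition above follows the standard lock-free read-compute-compareAndSet loop, retrying on contention; factored out as a generic sketch (not cluster code):

    import java.util.concurrent.atomic.AtomicReference
    import scala.annotation.tailrec

    // retries f until the CAS wins; every retry recomputes from the fresh value
    @tailrec
    def update[A](ref: AtomicReference[A])(f: A ⇒ A): A = {
      val old = ref.get
      val next = f(old)
      if (ref.compareAndSet(old, next)) next else update(ref)(f)
    }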
@@ -947,9 +988,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
   private[cluster] def receiveHeartbeat(from: Address): Unit = failureDetector heartbeat from

   /**
-   * Joins the pre-configured contact point.
+   * Joins the pre-configured contact points.
    */
-  private def autoJoin(): Unit = nodeToJoin foreach join
+  private def joinSeedNode(): Unit = clusterCommandDaemon ! ClusterUserAction.JoinSeedNode

   /**
    * INTERNAL API.
@@ -975,15 +1016,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
     peer
   }

-  /**
-   * INTERNAL API.
-   */
-  private[cluster] def gossipToUnreachableProbablity(membersSize: Int, unreachableSize: Int): Double =
-    (membersSize + unreachableSize) match {
-      case 0   ⇒ 0.0
-      case sum ⇒ unreachableSize.toDouble / sum
-    }
-
   /**
    * INTERNAL API.
    */
@@ -1019,18 +1051,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
     // 1. gossip to alive members
     val gossipedToAlive = gossipToRandomNodeOf(localMemberAddresses)

-    // 2. gossip to unreachable members
-    if (localUnreachableSize > 0) {
-      val probability = gossipToUnreachableProbablity(localMembersSize, localUnreachableSize)
-      if (ThreadLocalRandom.current.nextDouble() < probability)
-        gossipToRandomNodeOf(localUnreachableMembers.map(_.address))
-    }
-
-    // 3. gossip to a deputy nodes for facilitating partition healing
+    // 2. gossip to a deputy nodes for facilitating partition healing
     val deputies = deputyNodes(localMemberAddresses)
     val alreadyGossipedToDeputy = gossipedToAlive.map(deputies.contains(_)).getOrElse(false)
-    if ((!alreadyGossipedToDeputy || localMembersSize < NrOfDeputyNodes) && deputies.nonEmpty) {
-      val probability = gossipToDeputyProbablity(localMembersSize, localUnreachableSize, NrOfDeputyNodes)
+    if ((!alreadyGossipedToDeputy || localMembersSize < seedNodes.size) && deputies.nonEmpty) {
+      val probability = gossipToDeputyProbablity(localMembersSize, localUnreachableSize, seedNodes.size)
       if (ThreadLocalRandom.current.nextDouble() < probability)
         gossipToRandomNodeOf(deputies)
     }
@@ -1373,7 +1398,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
    * Gets the addresses of a all the 'deputy' nodes - excluding this node if part of the group.
    */
   private def deputyNodes(addresses: IndexedSeq[Address]): IndexedSeq[Address] =
-    addresses drop 1 take NrOfDeputyNodes filterNot (_ == selfAddress)
+    addresses filterNot (_ == selfAddress) intersect seedNodes

   /**
    * INTERNAL API.
@@ -22,10 +22,10 @@ class ClusterSettings(val config: Config, val systemName: String) {
   final val FailureDetectorAcceptableHeartbeatPause: Duration =
     Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS)

-  final val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match {
-    case ""                         ⇒ None
-    case AddressFromURIString(addr) ⇒ Some(addr)
-  }
+  final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map {
+    case AddressFromURIString(addr) ⇒ addr
+  }.toIndexedSeq
+  final val SeedNodeTimeout: Duration = Duration(getMilliseconds("akka.cluster.seed-node-timeout"), MILLISECONDS)
   final val PeriodicTasksInitialDelay: Duration = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS)
   final val GossipInterval: Duration = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS)
   final val HeartbeatInterval: Duration = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS)
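AddressFromURIString is an extractor, so a config string pattern-matches straight into an Address; a small sketch:

    import akka.actor.{ Address, AddressFromURIString }

    // hypothetical URIs in the documented "akka://system@hostname:port" form
    val uris = Seq("akka://MySystem@host1:2552", "akka://MySystem@host2:2552")
    val parsed: Seq[Address] = uris map { case AddressFromURIString(addr) ⇒ addr }
    // a string that does not parse fails the match, just like in SeedNodes above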
@@ -33,6 +33,7 @@ class ClusterSettings(val config: Config, val systemName: String) {
   final val UnreachableNodesReaperInterval: Duration = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS)
   final val NrOfGossipDaemons: Int = getInt("akka.cluster.nr-of-gossip-daemons")
   final val NrOfDeputyNodes: Int = getInt("akka.cluster.nr-of-deputy-nodes")
+  final val AutoJoin: Boolean = getBoolean("akka.cluster.auto-join")
   final val AutoDown: Boolean = getBoolean("akka.cluster.auto-down")
   final val JoinTimeout: Duration = Duration(getMilliseconds("akka.cluster.join-timeout"), MILLISECONDS)
   final val SchedulerTickDuration: Duration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS)
@@ -0,0 +1,46 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
+ */
+package akka.cluster
+
+import com.typesafe.config.ConfigFactory
+import org.scalatest.BeforeAndAfter
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.testkit._
+import akka.util.duration._
+
+object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig {
+  val seed1 = role("seed1")
+  val seed2 = role("seed2")
+  val ordinary1 = role("ordinary1")
+  val ordinary2 = role("ordinary2")
+
+  commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
+}
+
+class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
+class JoinSeedNodeMultiJvmNode2 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
+class JoinSeedNodeMultiJvmNode3 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
+class JoinSeedNodeMultiJvmNode4 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
+
+abstract class JoinSeedNodeSpec
+  extends MultiNodeSpec(JoinSeedNodeMultiJvmSpec)
+  with MultiNodeClusterSpec {
+
+  import JoinSeedNodeMultiJvmSpec._
+
+  override def seedNodes = IndexedSeq(seed1, seed2)
+
+  "A cluster with configured seed nodes" must {
+    "join the seed nodes at startup" taggedAs LongRunningTest in {
+
+      startClusterNode()
+      enterBarrier("all-started")
+
+      awaitUpConvergence(4)
+
+      enterBarrier("after")
+    }
+  }
+}
@@ -26,7 +26,6 @@ object MultiNodeClusterSpec {
       leader-actions-interval           = 200 ms
       unreachable-nodes-reaper-interval = 200 ms
       periodic-tasks-initial-delay      = 300 ms
-      nr-of-deputy-nodes                = 2
     }
     akka.test {
       single-expect-default = 5 s
@@ -77,10 +76,22 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu
       throw t
   }

+  /**
+   * Make it possible to override/configure seedNodes from tests without
+   * specifying in config. Addresses are unknown before startup time.
+   */
+  protected def seedNodes: IndexedSeq[RoleName] = IndexedSeq.empty
+
   /**
    * The cluster node instance. Needs to be lazily created.
    */
-  private lazy val clusterNode = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector)
+  private lazy val clusterNode = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) {
+    override def seedNodes: IndexedSeq[Address] = {
+      val testSeedNodes = MultiNodeClusterSpec.this.seedNodes
+      if (testSeedNodes.isEmpty) super.seedNodes
+      else testSeedNodes map address
+    }
+  }

   /**
    * Get the cluster node to use.
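A concrete multi-node spec can then opt in simply by listing seed roles, exactly as the new JoinSeedNodeSpec in this commit does:

    // inside a spec that mixes in MultiNodeClusterSpec
    override def seedNodes = IndexedSeq(seed1, seed2)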
@@ -36,6 +36,8 @@ abstract class NodeJoinSpec
       startClusterNode()
     }

+    enterBarrier("first-started")
+
     runOn(second) {
       cluster.join(first)
     }
@@ -23,7 +23,8 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig {
   // not MultiNodeClusterSpec.clusterConfig
   commonConfig(ConfigFactory.parseString("""
     akka.cluster {
-      nr-of-deputy-nodes = 0
+      # FIXME remove this (use default) when ticket #2239 has been fixed
+      gossip-interval = 400 ms
     }
     akka.loglevel = INFO
     """))
@@ -21,7 +21,8 @@ class ClusterConfigSpec extends AkkaSpec {
       FailureDetectorImplementationClass must be(classOf[AccrualFailureDetector].getName)
       FailureDetectorMinStdDeviation must be(100 millis)
       FailureDetectorAcceptableHeartbeatPause must be(3 seconds)
-      NodeToJoin must be(None)
+      SeedNodes must be(Seq.empty[String])
+      SeedNodeTimeout must be(5 seconds)
       PeriodicTasksInitialDelay must be(1 seconds)
       GossipInterval must be(1 second)
       HeartbeatInterval must be(1 second)
@@ -29,7 +30,7 @@ class ClusterConfigSpec extends AkkaSpec {
       UnreachableNodesReaperInterval must be(1 second)
       JoinTimeout must be(60 seconds)
       NrOfGossipDaemons must be(4)
-      NrOfDeputyNodes must be(3)
+      AutoJoin must be(true)
       AutoDown must be(true)
       SchedulerTickDuration must be(33 millis)
       SchedulerTicksPerWheel must be(512)
@@ -11,12 +11,13 @@ import akka.actor.ExtendedActorSystem
 import akka.actor.Address
 import java.util.concurrent.atomic.AtomicInteger
 import org.scalatest.BeforeAndAfter
+import akka.remote.RemoteActorRefProvider

 object ClusterSpec {
   val config = """
     akka.cluster {
       auto-join                    = off
       auto-down                    = off
       nr-of-deputy-nodes           = 3
       periodic-tasks-initial-delay = 120 seconds // turn off scheduled tasks
     }
     akka.actor.provider = "akka.remote.RemoteActorRefProvider"
@@ -31,12 +32,24 @@ object ClusterSpec {
 class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
   import ClusterSpec._

+  val selfAddress = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.address
+  val addresses = IndexedSeq(
+    selfAddress,
+    Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 1),
+    Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 2),
+    Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 3),
+    Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 4),
+    Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 5))
+
   val deterministicRandom = new AtomicInteger

   val failureDetector = new FailureDetectorPuppet(system)

   val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) {

+    // 3 deputy nodes (addresses index 1, 2, 3)
+    override def seedNodes = addresses.slice(1, 4)
+
     override def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] = {
       if (addresses.isEmpty) None
       else Some(addresses.toSeq(deterministicRandom.getAndIncrement % addresses.size))
@@ -50,14 +63,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
       testActor ! GossipTo(address)
     }

-    @volatile
-    var _gossipToUnreachableProbablity = 0.0
-
-    override def gossipToUnreachableProbablity(membersSize: Int, unreachableSize: Int): Double = {
-      if (_gossipToUnreachableProbablity < 0.0) super.gossipToUnreachableProbablity(membersSize, unreachableSize)
-      else _gossipToUnreachableProbablity
-    }
-
     @volatile
     var _gossipToDeputyProbablity = 0.0

@@ -68,20 +73,10 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {

   }

-  val selfAddress = cluster.self.address
-  val addresses = IndexedSeq(
-    selfAddress,
-    Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 1),
-    Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 2),
-    Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 3),
-    Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 4),
-    Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 5))
-
   def memberStatus(address: Address): Option[MemberStatus] =
     cluster.latestGossip.members.collectFirst { case m if m.address == address ⇒ m.status }

   before {
-    cluster._gossipToUnreachableProbablity = 0.0
     cluster._gossipToDeputyProbablity = 0.0
     addresses foreach failureDetector.remove
     deterministicRandom.set(0)
@@ -89,6 +84,11 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {

   "A Cluster" must {

+    "use the address of the remote transport" in {
+      cluster.selfAddress must be(selfAddress)
+      cluster.self.address must be(selfAddress)
+    }
+
     "initially be singleton cluster and reach convergence immediately" in {
       cluster.isSingletonCluster must be(true)
       cluster.latestGossip.members.map(_.address) must be(Set(selfAddress))
@@ -133,17 +133,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
       expectNoMsg(1 second)
     }

-    "use certain probability for gossiping to unreachable node depending on the number of unreachable and live nodes" in {
-      cluster._gossipToUnreachableProbablity = -1.0 // use real impl
-      cluster.gossipToUnreachableProbablity(10, 1) must be < (cluster.gossipToUnreachableProbablity(9, 1))
-      cluster.gossipToUnreachableProbablity(10, 1) must be < (cluster.gossipToUnreachableProbablity(10, 2))
-      cluster.gossipToUnreachableProbablity(10, 5) must be < (cluster.gossipToUnreachableProbablity(10, 9))
-      cluster.gossipToUnreachableProbablity(0, 10) must be <= (1.0)
-      cluster.gossipToUnreachableProbablity(1, 10) must be <= (1.0)
-      cluster.gossipToUnreachableProbablity(10, 0) must be(0.0 plusOrMinus (0.0001))
-      cluster.gossipToUnreachableProbablity(0, 0) must be(0.0 plusOrMinus (0.0001))
-    }
-
     "use certain probability for gossiping to deputy node depending on the number of unreachable and live nodes" in {
       cluster._gossipToDeputyProbablity = -1.0 // use real impl
       cluster.gossipToDeputyProbablity(10, 1, 2) must be < (cluster.gossipToDeputyProbablity(9, 1, 2))
@@ -161,7 +150,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
     "gossip to duputy node" in {
       cluster._gossipToDeputyProbablity = 1.0 // always

-      // we have configured 2 deputy nodes
+      // we have configured 3 deputy nodes (seedNodes)
       cluster.gossip() // 1 is deputy
       cluster.gossip() // 2 is deputy
       cluster.gossip() // 3 is deputy
@@ -178,22 +167,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {

     }

-    "gossip to random unreachable node" in {
-      val dead = Set(addresses(1))
-      dead foreach failureDetector.markNodeAsUnavailable
-      cluster._gossipToUnreachableProbablity = 1.0 // always
-
-      cluster.reapUnreachableMembers()
-      cluster.latestGossip.overview.unreachable.map(_.address) must be(dead)
-
-      cluster.gossip()
-
-      expectMsg(GossipTo(addresses(2))) // first available
-      expectMsg(GossipTo(addresses(1))) // the unavailable
-
-      expectNoMsg(1 second)
-    }
-
     "gossip to random deputy node if number of live nodes is less than number of deputy nodes" in {
       cluster._gossipToDeputyProbablity = -1.0 // real impl
       // 0 and 2 still alive
@@ -183,14 +183,20 @@ according to the Failure Detector is considered unreachable. This means setting
 the unreachable node status to ``down`` automatically.


+Seed Nodes
+^^^^^^^^^^
+
+The seed nodes are configured contact points for initial join of the cluster.
+When a new node is started it sends a message to all seed nodes and then sends
+a join command to the one that answers first.
+
+It is possible to turn off automatic join.
+
 Deputy Nodes
 ^^^^^^^^^^^^

 After gossip convergence a set of ``deputy`` nodes for the cluster can be
 determined. As with the ``leader``, there is no ``deputy`` election process,
 the deputies can always be recognised deterministically by any node whenever there
-is gossip convergence. The list of ``deputy`` nodes is simply the N - 1 number
-of nodes (e.g. starting with the first node after the ``leader``) in sorted order.
+is gossip convergence. The deputy nodes are the live members of the configured
+seed nodes. It is preferred to use deputy nodes in different racks/data centers.

 The nodes defined as ``deputy`` nodes are just regular member nodes whose only
 "special role" is to help breaking logical partitions as seen in the gossip
@@ -213,7 +219,7 @@ nodes involved in a gossip exchange.

 Periodically, the default is every 1 second, each node chooses another random
 node to initiate a round of gossip with. The choice of node is random but can
-also include extra gossiping for unreachable nodes, ``deputy`` nodes, and nodes with
+also include extra gossiping for ``deputy`` nodes, and nodes with
 either newer or older state versions.

 The gossip overview contains the current state version for all nodes and also a
@@ -228,14 +234,11 @@ During each round of gossip exchange the following process is used:

 1. Gossip to random live node (if any)

-2. Gossip to random unreachable node with certain probability depending on the
-   number of unreachable and live nodes
-
-3. If the node gossiped to at (1) was not a ``deputy`` node, or the number of live
+2. If the node gossiped to at (1) was not a ``deputy`` node, or the number of live
    nodes is less than number of ``deputy`` nodes, gossip to random ``deputy`` node with
    certain probability depending on number of unreachable, ``deputy``, and live nodes.

-4. Gossip to random node with newer or older state information, based on the
+3. Gossip to random node with newer or older state information, based on the
   current gossip overview, with some probability (?)

 The gossiper only sends the gossip overview to the chosen node. The recipient of
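Rules (1) and (2) correspond directly to the reworked gossip() task earlier in this commit; condensed, using the names from that hunk:

    // 1. gossip to alive members
    val gossipedToAlive = gossipToRandomNodeOf(localMemberAddresses)

    // 2. gossip to a deputy, if (1) missed the deputies or the cluster is small
    val deputies = deputyNodes(localMemberAddresses)
    val alreadyGossipedToDeputy = gossipedToAlive.map(deputies.contains(_)).getOrElse(false)
    if ((!alreadyGossipedToDeputy || localMembersSize < seedNodes.size) && deputies.nonEmpty) {
      val probability = gossipToDeputyProbablity(localMembersSize, localUnreachableSize, seedNodes.size)
      if (ThreadLocalRandom.current.nextDouble() < probability)
        gossipToRandomNodeOf(deputies)
    }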
@@ -575,7 +575,7 @@ order as they have been received originally.
 Please note that the stash can only be used together with actors
 that have a deque-based mailbox. For this, configure the
 ``mailbox-type`` of the dispatcher to be a deque-based mailbox, such as
-``akka.dispatch.UnboundedDequeBasedMailbox``.
+``akka.dispatch.UnboundedDequeBasedMailbox`` (see :ref:`dispatchers-java`).

 Here is an example of the ``UntypedActorWithStash`` class in action:
@@ -585,7 +585,7 @@ Invoking ``stash()`` adds the current message (the message that the
 actor received last) to the actor's stash. It is typically invoked
 when handling the default case in the actor's message handler to stash
 messages that aren't handled by the other cases. It is illegal to
-stash the same message twice; to do so results in a
+stash the same message twice; to do so results in an
 ``IllegalStateException`` being thrown. The stash may also be bounded
 in which case invoking ``stash()`` may lead to a capacity violation,
 which results in a ``StashOverflowException``. The capacity of the
@@ -636,7 +636,7 @@ order as they have been received originally.
 Please note that the ``Stash`` can only be used together with actors
 that have a deque-based mailbox. For this, configure the
 ``mailbox-type`` of the dispatcher to be a deque-based mailbox, such as
-``akka.dispatch.UnboundedDequeBasedMailbox``.
+``akka.dispatch.UnboundedDequeBasedMailbox`` (see :ref:`dispatchers-scala`).

 Here is an example of the ``Stash`` in action:
@@ -646,7 +646,7 @@ Invoking ``stash()`` adds the current message (the message that the
 actor received last) to the actor's stash. It is typically invoked
 when handling the default case in the actor's message handler to stash
 messages that aren't handled by the other cases. It is illegal to
-stash the same message twice; to do so results in a
+stash the same message twice; to do so results in an
 ``IllegalStateException`` being thrown. The stash may also be bounded
 in which case invoking ``stash()`` may lead to a capacity violation,
 which results in a ``StashOverflowException``. The capacity of the
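Putting the two requirements from these docs together, a deque-based dispatcher plus the Stash trait look roughly like this (a sketch; the dispatcher and actor names are hypothetical):

    import akka.actor.{ Actor, ActorSystem, Props, Stash }
    import com.typesafe.config.ConfigFactory

    val config = ConfigFactory.parseString("""
      my-dispatcher {
        mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
      }
      """)

    class Gate extends Actor with Stash {
      def receive = {
        case "open" ⇒ unstashAll(); context.become(open) // redelivers in original order
        case _      ⇒ stash()                            // hold until the gate opens
      }
      def open: Actor.Receive = { case msg ⇒ /* handle normally */ }
    }

    val system = ActorSystem("demo", config)
    val gate = system.actorOf(Props[Gate].withDispatcher("my-dispatcher"))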
@@ -13,50 +13,50 @@ akka {
     file-based {
       # directory below which this queue resides
       directory-path = "./_mb"

       # attempting to add an item after the queue reaches this size (in items) will fail.
       max-items = 2147483647

       # attempting to add an item after the queue reaches this size (in bytes) will fail.
       max-size = 2147483647 bytes

       # attempting to add an item larger than this size (in bytes) will fail.
       max-item-size = 2147483647 bytes

       # maximum expiration time for this queue (seconds).
       max-age = 0s

       # maximum journal size before the journal should be rotated.
       max-journal-size = 16 MiB

       # maximum size of a queue before it drops into read-behind mode.
       max-memory-size = 128 MiB

       # maximum overflow (multiplier) of a journal file before we re-create it.
       max-journal-overflow = 10

       # absolute maximum size of a journal file until we rebuild it, no matter what.
       max-journal-size-absolute = 9223372036854775807 bytes

       # whether to drop older items (instead of newer) when the queue is full
-      discard-old-when-full    = on
+      discard-old-when-full = on

       # whether to keep a journal file at all
-      keep-journal             = on
+      keep-journal = on

       # whether to sync the journal after each transaction
       sync-journal = off

       # circuit breaker configuration
       circuit-breaker {
-      # maximum number of failures before opening breaker
-      max-failures = 3
+        # maximum number of failures before opening breaker
+        max-failures = 3

-      # duration of time beyond which a call is assumed to be timed out and considered a failure
-      call-timeout = 3 seconds
+        # duration of time beyond which a call is assumed to be timed out and considered a failure
+        call-timeout = 3 seconds

-      # duration of time to wait until attempting to reset the breaker during which all calls fail-fast
-      reset-timeout = 30 seconds
+        # duration of time to wait until attempting to reset the breaker during which all calls fail-fast
+        reset-timeout = 30 seconds
       }
     }
 }
@@ -34,7 +34,7 @@ class FileBasedMessageQueue(_owner: ActorRef, _system: ExtendedActorSystem, val
     (new java.io.File(settings.QueuePath)) match {
       case dir if dir.exists && !dir.isDirectory ⇒ throw new IllegalStateException("Path already occupied by non-directory " + dir)
       case dir if !dir.exists ⇒ if (!dir.mkdirs() && !dir.isDirectory) throw new IllegalStateException("Creation of directory failed " + dir)
-      case _ ⇒ //All good
+      case _ ⇒ // All good
     }
     val queue = new filequeue.PersistentQueue(settings.QueuePath, name, settings, log)
     queue.setup // replays journal
@@ -16,20 +16,20 @@ class FileBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val use
   val config = initialize
   import config._

-  val QueuePath: String = getString("directory-path")
-  val MaxItems: Int = getInt("max-items")
-  val MaxSize: Long = getBytes("max-size")
-  val MaxItemSize: Long = getBytes("max-item-size")
-  val MaxAge: Duration = Duration(getMilliseconds("max-age"), MILLISECONDS)
-  val MaxJournalSize: Long = getBytes("max-journal-size")
-  val MaxMemorySize: Long = getBytes("max-memory-size")
-  val MaxJournalOverflow: Int = getInt("max-journal-overflow")
-  val MaxJournalSizeAbsolute: Long = getBytes("max-journal-size-absolute")
-  val DiscardOldWhenFull: Boolean = getBoolean("discard-old-when-full")
-  val KeepJournal: Boolean = getBoolean("keep-journal")
-  val SyncJournal: Boolean = getBoolean("sync-journal")
+  final val QueuePath: String = getString("directory-path")
+  final val MaxItems: Int = getInt("max-items")
+  final val MaxSize: Long = getBytes("max-size")
+  final val MaxItemSize: Long = getBytes("max-item-size")
+  final val MaxAge: Duration = Duration(getMilliseconds("max-age"), MILLISECONDS)
+  final val MaxJournalSize: Long = getBytes("max-journal-size")
+  final val MaxMemorySize: Long = getBytes("max-memory-size")
+  final val MaxJournalOverflow: Int = getInt("max-journal-overflow")
+  final val MaxJournalSizeAbsolute: Long = getBytes("max-journal-size-absolute")
+  final val DiscardOldWhenFull: Boolean = getBoolean("discard-old-when-full")
+  final val KeepJournal: Boolean = getBoolean("keep-journal")
+  final val SyncJournal: Boolean = getBoolean("sync-journal")

-  val CircuitBreakerMaxFailures = getInt("circuit-breaker.max-failures")
-  val CircuitBreakerCallTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.call-timeout"))
-  val CircuitBreakerResetTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.reset-timeout"))
-}
+  final val CircuitBreakerMaxFailures = getInt("circuit-breaker.max-failures")
+  final val CircuitBreakerCallTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.call-timeout"))
+  final val CircuitBreakerResetTimeout = Duration.fromNanos(getNanoseconds("circuit-breaker.reset-timeout"))
+}
@@ -68,44 +68,44 @@ class PersistentQueue(persistencePath: String, val name: String, val settings: F
   def overlay[T](base: ⇒ T) = new OverlaySetting(base)

   // attempting to add an item after the queue reaches this size (in items) will fail.
-  val maxItems = overlay(PersistentQueue.maxItems)
+  final val maxItems = overlay(PersistentQueue.maxItems)

   // attempting to add an item after the queue reaches this size (in bytes) will fail.
-  val maxSize = overlay(PersistentQueue.maxSize)
+  final val maxSize = overlay(PersistentQueue.maxSize)

   // attempting to add an item larger than this size (in bytes) will fail.
-  val maxItemSize = overlay(PersistentQueue.maxItemSize)
+  final val maxItemSize = overlay(PersistentQueue.maxItemSize)

   // maximum expiration time for this queue (seconds).
-  val maxAge = overlay(PersistentQueue.maxAge)
+  final val maxAge = overlay(PersistentQueue.maxAge)

   // maximum journal size before the journal should be rotated.
-  val maxJournalSize = overlay(PersistentQueue.maxJournalSize)
+  final val maxJournalSize = overlay(PersistentQueue.maxJournalSize)

   // maximum size of a queue before it drops into read-behind mode.
-  val maxMemorySize = overlay(PersistentQueue.maxMemorySize)
+  final val maxMemorySize = overlay(PersistentQueue.maxMemorySize)

   // maximum overflow (multiplier) of a journal file before we re-create it.
-  val maxJournalOverflow = overlay(PersistentQueue.maxJournalOverflow)
+  final val maxJournalOverflow = overlay(PersistentQueue.maxJournalOverflow)

   // absolute maximum size of a journal file until we rebuild it, no matter what.
-  val maxJournalSizeAbsolute = overlay(PersistentQueue.maxJournalSizeAbsolute)
+  final val maxJournalSizeAbsolute = overlay(PersistentQueue.maxJournalSizeAbsolute)

   // whether to drop older items (instead of newer) when the queue is full
-  val discardOldWhenFull = overlay(PersistentQueue.discardOldWhenFull)
+  final val discardOldWhenFull = overlay(PersistentQueue.discardOldWhenFull)

   // whether to keep a journal file at all
-  val keepJournal = overlay(PersistentQueue.keepJournal)
+  final val keepJournal = overlay(PersistentQueue.keepJournal)

   // whether to sync the journal after each transaction
-  val syncJournal = overlay(PersistentQueue.syncJournal)
+  final val syncJournal = overlay(PersistentQueue.syncJournal)

   // (optional) move expired items over to this queue
-  val expiredQueue = overlay(PersistentQueue.expiredQueue)
+  final val expiredQueue = overlay(PersistentQueue.expiredQueue)

   private var journal = new Journal(new File(persistencePath, name).getCanonicalPath, syncJournal(), log)

-  // track tentative removals
+  // track tentative remofinal vals
   private var xidCounter: Int = 0
   private val openTransactions = new mutable.HashMap[Int, QItem]
   def openTransactionCount = openTransactions.size
@@ -68,11 +68,15 @@ trait DurableMessageSerialization { this: DurableMessageQueue ⇒
  * Conventional organization of durable mailbox settings:
  *
  * {{{
- * my-durable-dispatcher {
- *   mailbox-type = "my.durable.mailbox"
- *   my-durable-mailbox {
- *     setting1 = 1
- *     setting2 = 2
+ * akka {
+ *   actor {
+ *     my-durable-dispatcher {
+ *       mailbox-type = "my.durable.mailbox"
+ *       my-durable-mailbox {
+ *         setting1 = 1
+ *         setting2 = 2
+ *       }
+ *     }
 *   }
 * }
 * }}}
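Reading such nested settings back then anchors at the dispatcher path; a minimal sketch using the hypothetical placeholder names from the scaladoc above:

    import com.typesafe.config.{ Config, ConfigFactory }

    val root: Config = ConfigFactory.load()
    // "my-durable-dispatcher" / "my-durable-mailbox" are placeholders, as in the scaladoc
    val mailbox = root.getConfig("akka.actor.my-durable-dispatcher.my-durable-mailbox")
    val setting1 = mailbox.getInt("setting1")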
@@ -282,6 +282,8 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex
   import akka.actor.FSM._
   import Controller._

+  var roleName: RoleName = null
+
   startWith(Initial, None)

   whenUnhandled {
@@ -292,12 +294,15 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex
   }

   onTermination {
-    case _ ⇒ controller ! ClientDisconnected
+    case _ ⇒
+      controller ! ClientDisconnected(roleName)
+      channel.close()
   }

   when(Initial, stateTimeout = 10 seconds) {
     case Event(Hello(name, addr), _) ⇒
-      controller ! NodeInfo(RoleName(name), addr, self)
+      roleName = RoleName(name)
+      controller ! NodeInfo(roleName, addr, self)
       goto(Ready)
     case Event(x: NetworkOp, _) ⇒
       log.warning("client {} sent no Hello in first message (instead {}), disconnecting", getAddrString(channel), x)
@@ -334,10 +339,6 @@ private[akka] class ServerFSM(val controller: ActorRef, val channel: Channel) ex
   }

   initialize
-
-  onTermination {
-    case _ ⇒ channel.close()
-  }
 }

 /**
@@ -517,10 +518,13 @@ private[akka] class BarrierCoordinator extends Actor with LoggingFSM[BarrierCoor
       if (clients.find(_.name == n.name).isDefined) throw new DuplicateNode(d, n)
       stay using d.copy(clients = clients + n)
     case Event(ClientDisconnected(name), d @ Data(clients, _, arrived, _)) ⇒
-      if (clients.isEmpty) throw BarrierEmpty(d, "cannot disconnect " + name + ": no client to disconnect")
-      (clients find (_.name == name)) match {
-        case None    ⇒ stay
-        case Some(c) ⇒ throw ClientLost(d.copy(clients = clients - c, arrived = arrived filterNot (_ == c.fsm)), name)
+      if (arrived.isEmpty)
+        stay using d.copy(clients = clients.filterNot(_.name == name))
+      else {
+        (clients find (_.name == name)) match {
+          case None    ⇒ stay
+          case Some(c) ⇒ throw ClientLost(d.copy(clients = clients - c, arrived = arrived filterNot (_ == c.fsm)), name)
+        }
       }
   }

@@ -0,0 +1,36 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
+ */
+package akka.remote.testkit
+
+import akka.testkit.LongRunningTest
+
+object MultiNodeSpecMultiJvmSpec extends MultiNodeConfig {
+  commonConfig(debugConfig(on = false))
+
+  val node1 = role("node1")
+  val node2 = role("node2")
+  val node3 = role("node3")
+  val node4 = role("node4")
+}
+
+class MultiNodeSpecSpecMultiJvmNode1 extends MultiNodeSpecSpec
+class MultiNodeSpecSpecMultiJvmNode2 extends MultiNodeSpecSpec
+class MultiNodeSpecSpecMultiJvmNode3 extends MultiNodeSpecSpec
+class MultiNodeSpecSpecMultiJvmNode4 extends MultiNodeSpecSpec
+
+class MultiNodeSpecSpec extends MultiNodeSpec(MultiNodeSpecMultiJvmSpec) {
+
+  import MultiNodeSpecMultiJvmSpec._
+
+  def initialParticipants = 4
+
+  "A MultiNodeSpec" must {
+
+    "wait for all nodes to remove themselves before we shut the conductor down" taggedAs LongRunningTest in {
+      enterBarrier("startup")
+      // this test is empty here since it only exercises the shutdown code in the MultiNodeSpec
+    }
+
+  }
+}
@@ -59,14 +59,9 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
       val b = getBarrier()
       b ! NodeInfo(A, AddressFromURIString("akka://sys"), system.deadLetters)
       b ! ClientDisconnected(B)
-      EventFilter[ClientLost](occurrences = 1) intercept {
-        b ! ClientDisconnected(A)
-      }
-      expectMsg(Failed(b, ClientLost(Data(Set(), "", Nil, null), A)))
-      EventFilter[BarrierEmpty](occurrences = 1) intercept {
-        b ! ClientDisconnected(A)
-      }
-      expectMsg(Failed(b, BarrierEmpty(Data(Set(), "", Nil, null), "cannot disconnect RoleName(a): no client to disconnect")))
+      expectNoMsg(1 second)
+      b ! ClientDisconnected(A)
+      expectNoMsg(1 second)
     }

     "fail entering barrier when nobody registered" taggedAs TimingTest in {
@@ -264,12 +259,9 @@ class BarrierSpec extends AkkaSpec(BarrierSpec.config) with ImplicitSender {
       b ! NodeInfo(A, AddressFromURIString("akka://sys"), testActor)
       expectMsg(ToClient(Done))
       b ! ClientDisconnected(B)
-      EventFilter[ClientLost](occurrences = 1) intercept {
-        b ! ClientDisconnected(A)
-      }
-      EventFilter[BarrierEmpty](occurrences = 1) intercept {
-        b ! ClientDisconnected(A)
-      }
+      expectNoMsg(1 second)
+      b ! ClientDisconnected(A)
+      expectNoMsg(1 second)
     }

     "fail entering barrier when nobody registered" taggedAs TimingTest in {
@@ -7,12 +7,13 @@ import java.net.InetSocketAddress

 import com.typesafe.config.{ ConfigObject, ConfigFactory, Config }

-import akka.actor.{ RootActorPath, Deploy, ActorPath, ActorSystem, ExtendedActorSystem }
+import akka.actor.{ RootActorPath, ActorPath, ActorSystem, ExtendedActorSystem }
 import akka.dispatch.Await
+import akka.dispatch.Await.Awaitable
 import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName }
 import akka.testkit.AkkaSpec
-import akka.util.{ Timeout, NonFatal, Duration }
+import akka.util.{ Timeout, NonFatal }
 import akka.util.duration._

 /**
  * Configure the role names and participants of the test, including configuration settings.
@@ -261,4 +262,16 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
   // useful to see which jvm is running which role
   log.info("Role [{}] started", myself.name)

+  // wait for all nodes to remove themselves before we shut the conductor down
+  final override def beforeShutdown() = {
+    if (selfIndex == 0) {
+      testConductor.removeNode(myself)
+      within(testConductor.Settings.BarrierTimeout.duration) {
+        awaitCond {
+          testConductor.getNodes.await.filterNot(_ == myself).isEmpty
+        }
+      }
+    }
+  }
+
 }
@@ -74,6 +74,7 @@ abstract class AkkaSpec(_system: ActorSystem)
   }

   final override def afterAll {
+    beforeShutdown()
     system.shutdown()
     try system.awaitTermination(5 seconds) catch {
       case _: TimeoutException ⇒ system.log.warning("Failed to stop [{}] within 5 seconds", system.name)
@@ -83,6 +84,8 @@ abstract class AkkaSpec(_system: ActorSystem)

   protected def atStartup() {}

+  protected def beforeShutdown() {}
+
   protected def atTermination() {}

   def spawn(dispatcherId: String = Dispatchers.DefaultDispatcherId)(body: ⇒ Unit) {
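A spec that needs to coordinate with other JVMs before the ActorSystem goes away can now override the new hook; for instance (a hypothetical spec):

    class CoordinatedShutdownSpec extends AkkaSpec {
      // runs inside afterAll, just before system.shutdown()
      override protected def beforeShutdown() {
        system.log.info("deregistering before shutdown")
      }

      "a system" must {
        "shut down cleanly" in { /* test body */ }
      }
    }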
@@ -87,16 +87,15 @@ object Sphinx {
   def pdfTask = (sphinxLatex, streams) map {
     (latex, s) => {
-      val empty = (latex * "*.pdf").get.isEmpty
+      val pdf = latex / "Akka.pdf"
       def failed = sys.error("Failed to build Sphinx pdf documentation.")
-      if (empty) {
+      if (!pdf.exists) {
         s.log.info("Building Sphinx pdf documentation...")
         val logger = newLogger(s)
         val exitCode = Process(Seq("make", "all-pdf"), latex) ! logger
         if (exitCode != 0) failed
+        s.log.info("Sphinx pdf documentation created: %s" format pdf)
       }
-      val pdf = (latex * "*.pdf").get.headOption.getOrElse(failed)
-      if (empty) s.log.info("Sphinx pdf documentation created: %s" format pdf)
       pdf
     }
   }
@@ -1,9 +0,0 @@
-import akka.actor._
-import akka.dispatch.{ Future, Promise }
-import com.typesafe.config.ConfigFactory
-val config=ConfigFactory.parseString("akka.daemonic=on")
-val sys=ActorSystem("repl", config.withFallback(ConfigFactory.load())).asInstanceOf[ExtendedActorSystem]
-implicit val ec=sys.dispatcher
-import akka.util.duration._
-import akka.util.Timeout
-implicit val timeout=Timeout(5 seconds)