Merge branch 'master' into wip-scala210M4-√
This commit is contained in:
commit
32dc65aab5
7 changed files with 53 additions and 30 deletions
|
|
@ -18,7 +18,8 @@ akka {
|
||||||
# how long to wait for one of the seed nodes to reply to initial join request
|
# how long to wait for one of the seed nodes to reply to initial join request
|
||||||
seed-node-timeout = 5s
|
seed-node-timeout = 5s
|
||||||
|
|
||||||
# automatic join the seed-nodes at startup
|
# Automatic join the seed-nodes at startup.
|
||||||
|
# If seed-nodes is empty it will join itself and become a single node cluster.
|
||||||
auto-join = on
|
auto-join = on
|
||||||
|
|
||||||
# should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN?
|
# should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN?
|
||||||
|
|
|
||||||
|
|
@ -364,20 +364,23 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto
|
||||||
val log = Logging(context.system, this)
|
val log = Logging(context.system, this)
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
case JoinSeedNode ⇒ joinSeedNode()
|
case JoinSeedNode ⇒ joinSeedNode()
|
||||||
case InitJoin ⇒ sender ! InitJoinAck(cluster.selfAddress)
|
case InitJoin ⇒ sender ! InitJoinAck(cluster.selfAddress)
|
||||||
case InitJoinAck(address) ⇒ cluster.join(address)
|
case InitJoinAck(address) ⇒ cluster.join(address)
|
||||||
case Join(address) ⇒ cluster.joining(address)
|
case Join(address) ⇒ cluster.joining(address)
|
||||||
case Down(address) ⇒ cluster.downing(address)
|
case Down(address) ⇒ cluster.downing(address)
|
||||||
case Leave(address) ⇒ cluster.leaving(address)
|
case Leave(address) ⇒ cluster.leaving(address)
|
||||||
case Exit(address) ⇒ cluster.exiting(address)
|
case Exit(address) ⇒ cluster.exiting(address)
|
||||||
case Remove(address) ⇒ cluster.removing(address)
|
case Remove(address) ⇒ cluster.removing(address)
|
||||||
|
case Failure(e: AskTimeoutException) ⇒ joinSeedNodeTimeout()
|
||||||
}
|
}
|
||||||
|
|
||||||
def joinSeedNode(): Unit = {
|
def joinSeedNode(): Unit = {
|
||||||
val seedRoutees = for (address ← cluster.seedNodes; if address != cluster.selfAddress)
|
val seedRoutees = for (address ← cluster.seedNodes; if address != cluster.selfAddress)
|
||||||
yield self.path.toStringWithAddress(address)
|
yield self.path.toStringWithAddress(address)
|
||||||
if (seedRoutees.nonEmpty) {
|
if (seedRoutees.isEmpty) {
|
||||||
|
cluster join cluster.selfAddress
|
||||||
|
} else {
|
||||||
implicit val within = Timeout(cluster.clusterSettings.SeedNodeTimeout)
|
implicit val within = Timeout(cluster.clusterSettings.SeedNodeTimeout)
|
||||||
val seedRouter = context.actorOf(
|
val seedRouter = context.actorOf(
|
||||||
Props.empty.withRouter(ScatterGatherFirstCompletedRouter(
|
Props.empty.withRouter(ScatterGatherFirstCompletedRouter(
|
||||||
|
|
@ -387,6 +390,8 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def joinSeedNodeTimeout(): Unit = cluster join cluster.selfAddress
|
||||||
|
|
||||||
override def unhandled(unknown: Any) = log.error("Illegal command [{}]", unknown)
|
override def unhandled(unknown: Any) = log.error("Illegal command [{}]", unknown)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -534,10 +539,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
||||||
}
|
}
|
||||||
|
|
||||||
private val state = {
|
private val state = {
|
||||||
val member = Member(selfAddress, Joining)
|
// note that self is not initially member,
|
||||||
val versionedGossip = Gossip(members = Gossip.emptyMembers + member) :+ vclockNode // add me as member and update my vector clock
|
// and the Gossip is not versioned for this 'Node' yet
|
||||||
val seenVersionedGossip = versionedGossip seen selfAddress
|
new AtomicReference[State](State(Gossip(members = Gossip.emptyMembers)))
|
||||||
new AtomicReference[State](State(seenVersionedGossip))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// try to join one of the nodes defined in the 'akka.cluster.seed-nodes'
|
// try to join one of the nodes defined in the 'akka.cluster.seed-nodes'
|
||||||
|
|
@ -797,7 +801,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
||||||
val newUnreachableMembers = localUnreachable filterNot { _.address == node }
|
val newUnreachableMembers = localUnreachable filterNot { _.address == node }
|
||||||
val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers)
|
val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers)
|
||||||
|
|
||||||
val newMembers = localMembers + Member(node, Joining) // add joining node as Joining
|
// add joining node as Joining
|
||||||
|
// add self in case someone else joins before self has joined (Set discards duplicates)
|
||||||
|
val newMembers = localMembers + Member(node, Joining) + Member(selfAddress, Joining)
|
||||||
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
|
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
|
||||||
|
|
||||||
val versionedGossip = newGossip :+ vclockNode
|
val versionedGossip = newGossip :+ vclockNode
|
||||||
|
|
@ -939,11 +945,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
||||||
if (!localGossip.overview.isNonDownUnreachable(from)) {
|
if (!localGossip.overview.isNonDownUnreachable(from)) {
|
||||||
|
|
||||||
val winningGossip =
|
val winningGossip =
|
||||||
if (isSingletonCluster(localState) && localGossip.overview.unreachable.isEmpty && remoteGossip.members.contains(self)) {
|
if (remoteGossip.version <> localGossip.version) {
|
||||||
// a fresh singleton cluster that is joining, no need to merge, use received gossip
|
|
||||||
remoteGossip
|
|
||||||
|
|
||||||
} else if (remoteGossip.version <> localGossip.version) {
|
|
||||||
// concurrent
|
// concurrent
|
||||||
val mergedGossip = remoteGossip merge localGossip
|
val mergedGossip = remoteGossip merge localGossip
|
||||||
val versionedMergedGossip = mergedGossip :+ vclockNode
|
val versionedMergedGossip = mergedGossip :+ vclockNode
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,9 @@ object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig {
|
||||||
val ordinary1 = role("ordinary1")
|
val ordinary1 = role("ordinary1")
|
||||||
val ordinary2 = role("ordinary2")
|
val ordinary2 = role("ordinary2")
|
||||||
|
|
||||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
commonConfig(debugConfig(on = false).
|
||||||
|
withFallback(ConfigFactory.parseString("akka.cluster.auto-join = on")).
|
||||||
|
withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||||
}
|
}
|
||||||
|
|
||||||
class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
|
class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ import akka.actor.RootActorPath
|
||||||
object MultiNodeClusterSpec {
|
object MultiNodeClusterSpec {
|
||||||
def clusterConfig: Config = ConfigFactory.parseString("""
|
def clusterConfig: Config = ConfigFactory.parseString("""
|
||||||
akka.cluster {
|
akka.cluster {
|
||||||
|
auto-join = off
|
||||||
auto-down = off
|
auto-down = off
|
||||||
gossip-interval = 200 ms
|
gossip-interval = 200 ms
|
||||||
heartbeat-interval = 400 ms
|
heartbeat-interval = 400 ms
|
||||||
|
|
@ -99,10 +100,15 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu
|
||||||
def cluster: Cluster = clusterNode
|
def cluster: Cluster = clusterNode
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Use this method instead of 'cluster.self'
|
* Use this method for the initial startup of the cluster node.
|
||||||
* for the initial startup of the cluster node.
|
|
||||||
*/
|
*/
|
||||||
def startClusterNode(): Unit = cluster.self
|
def startClusterNode(): Unit = {
|
||||||
|
if (cluster.latestGossip.members.isEmpty) {
|
||||||
|
cluster join myself
|
||||||
|
awaitCond(cluster.latestGossip.members.exists(_.address == address(myself)))
|
||||||
|
} else
|
||||||
|
cluster.self
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize the cluster with the specified member
|
* Initialize the cluster with the specified member
|
||||||
|
|
|
||||||
|
|
@ -16,6 +16,7 @@ object SingletonClusterMultiJvmSpec extends MultiNodeConfig {
|
||||||
commonConfig(debugConfig(on = false).
|
commonConfig(debugConfig(on = false).
|
||||||
withFallback(ConfigFactory.parseString("""
|
withFallback(ConfigFactory.parseString("""
|
||||||
akka.cluster {
|
akka.cluster {
|
||||||
|
auto-join = on
|
||||||
auto-down = on
|
auto-down = on
|
||||||
failure-detector.threshold = 4
|
failure-detector.threshold = 4
|
||||||
}
|
}
|
||||||
|
|
@ -38,12 +39,20 @@ abstract class SingletonClusterSpec
|
||||||
|
|
||||||
"A cluster of 2 nodes" must {
|
"A cluster of 2 nodes" must {
|
||||||
|
|
||||||
"not be singleton cluster when joined" taggedAs LongRunningTest in {
|
"become singleton cluster when started with 'auto-join=on' and 'seed-nodes=[]'" taggedAs LongRunningTest in {
|
||||||
|
startClusterNode()
|
||||||
|
awaitUpConvergence(1)
|
||||||
|
cluster.isSingletonCluster must be(true)
|
||||||
|
|
||||||
|
enterBarrier("after-1")
|
||||||
|
}
|
||||||
|
|
||||||
|
"not be singleton cluster when joined with other node" taggedAs LongRunningTest in {
|
||||||
awaitClusterUp(first, second)
|
awaitClusterUp(first, second)
|
||||||
cluster.isSingletonCluster must be(false)
|
cluster.isSingletonCluster must be(false)
|
||||||
assertLeader(first, second)
|
assertLeader(first, second)
|
||||||
|
|
||||||
enterBarrier("after-1")
|
enterBarrier("after-2")
|
||||||
}
|
}
|
||||||
|
|
||||||
"become singleton cluster when one node is shutdown" taggedAs LongRunningTest in {
|
"become singleton cluster when one node is shutdown" taggedAs LongRunningTest in {
|
||||||
|
|
@ -58,7 +67,7 @@ abstract class SingletonClusterSpec
|
||||||
assertLeader(first)
|
assertLeader(first)
|
||||||
}
|
}
|
||||||
|
|
||||||
enterBarrier("after-2")
|
enterBarrier("after-3")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,7 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig {
|
||||||
akka.cluster {
|
akka.cluster {
|
||||||
# FIXME remove this (use default) when ticket #2239 has been fixed
|
# FIXME remove this (use default) when ticket #2239 has been fixed
|
||||||
gossip-interval = 400 ms
|
gossip-interval = 400 ms
|
||||||
|
auto-join = off
|
||||||
}
|
}
|
||||||
akka.loglevel = INFO
|
akka.loglevel = INFO
|
||||||
"""))
|
"""))
|
||||||
|
|
|
||||||
|
|
@ -89,11 +89,13 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
|
||||||
|
|
||||||
"use the address of the remote transport" in {
|
"use the address of the remote transport" in {
|
||||||
cluster.selfAddress must be(selfAddress)
|
cluster.selfAddress must be(selfAddress)
|
||||||
cluster.self.address must be(selfAddress)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"initially be singleton cluster and reach convergence immediately" in {
|
"initially become singleton cluster when joining itself and reach convergence" in {
|
||||||
cluster.isSingletonCluster must be(true)
|
cluster.isSingletonCluster must be(false) // auto-join = off
|
||||||
|
cluster.join(selfAddress)
|
||||||
|
awaitCond(cluster.isSingletonCluster)
|
||||||
|
cluster.self.address must be(selfAddress)
|
||||||
cluster.latestGossip.members.map(_.address) must be(Set(selfAddress))
|
cluster.latestGossip.members.map(_.address) must be(Set(selfAddress))
|
||||||
memberStatus(selfAddress) must be(Some(MemberStatus.Joining))
|
memberStatus(selfAddress) must be(Some(MemberStatus.Joining))
|
||||||
cluster.convergence.isDefined must be(true)
|
cluster.convergence.isDefined must be(true)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue