Reintroduce 'seed' nodes, see #2219

* Implement the join to seed nodes process
  When a new node is started it sends a message to all
  seed nodes and then sends join command to the one that answers
  first.
* Configuration of seed-nodes and auto-join
* New JoinSeedNodeSpec that verifies the auto join to seed nodes
* In tests seed nodes are configured by overriding seedNodes
  function, since addresses are not known before start
* Deputy nodes are the live members of the seed nodes (not sure if
  that will be the final solution, see ticket #2252)
* Updated cluster.rst with latest info about deputy and seed nodes
This commit is contained in:
Patrik Nordwall 2012-06-21 10:58:35 +02:00
parent a7c8d7da10
commit 42078e7083
10 changed files with 166 additions and 45 deletions

View file

@ -8,9 +8,15 @@
akka { akka {
cluster { cluster {
# node to join - the full URI defined by a string on the form of "akka://system@hostname:port" # Initial contact points of the cluster. Nodes to join at startup if auto-join = on.
# leave as empty string if the node should be a singleton cluster # The seed nodes also play the role of deputy nodes (the nodes responsible
node-to-join = "" # for breaking network partitions).
# Comma-separated list of full URIs, each a string of the form "akka://system@hostname:port".
# Leave empty if the node should be a singleton cluster.
seed-nodes = []
# should the node automatically join the seed-nodes at startup?
auto-join = on
# should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN? # should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN?
auto-down = on auto-down = on

View file

@ -11,7 +11,7 @@ import akka.dispatch.Await
import akka.dispatch.MonitorableThreadFactory import akka.dispatch.MonitorableThreadFactory
import akka.event.Logging import akka.event.Logging
import akka.jsr166y.ThreadLocalRandom import akka.jsr166y.ThreadLocalRandom
import akka.pattern.ask import akka.pattern._
import akka.remote._ import akka.remote._
import akka.routing._ import akka.routing._
import akka.util._ import akka.util._
@ -55,11 +55,29 @@ sealed trait ClusterMessage extends Serializable
object ClusterUserAction { object ClusterUserAction {
/** /**
* Command to join the cluster. Sent when a node (reprsesented by 'address') * Command to join the cluster. Sent when a node (represented by 'address')
* wants to join another node (the receiver). * wants to join another node (the receiver).
*/ */
case class Join(address: Address) extends ClusterMessage case class Join(address: Address) extends ClusterMessage
/**
* Start message of the process to join one of the seed nodes.
* The node sends `InitJoin` to all seed nodes, which reply
* with `InitJoinAck`. The first reply is used; the others are discarded.
* The node sends `Join` command to the seed node that replied first.
*/
case object JoinSeedNode extends ClusterMessage
/**
* @see JoinSeedNode
*/
case object InitJoin extends ClusterMessage
/**
* @see JoinSeedNode
*/
case class InitJoinAck(address: Address) extends ClusterMessage
/** /**
* Command to leave the cluster. * Command to leave the cluster.
*/ */
@ -343,11 +361,28 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto
val log = Logging(context.system, this) val log = Logging(context.system, this)
def receive = { def receive = {
case Join(address) cluster.joining(address) case JoinSeedNode joinSeedNode()
case Down(address) cluster.downing(address) case InitJoin sender ! InitJoinAck(cluster.selfAddress)
case Leave(address) cluster.leaving(address) case InitJoinAck(address) cluster.join(address)
case Exit(address) cluster.exiting(address) case Join(address) cluster.joining(address)
case Remove(address) cluster.removing(address) case Down(address) cluster.downing(address)
case Leave(address) cluster.leaving(address)
case Exit(address) cluster.exiting(address)
case Remove(address) cluster.removing(address)
}
def joinSeedNode(): Unit = {
val seedRoutees = for (address cluster.seedNodes; if address != cluster.selfAddress)
yield self.path.toStringWithAddress(address)
if (seedRoutees.nonEmpty) {
// FIXME config of within (use JoinInProgressTimeout when that is in master)
implicit val within = Timeout(5 seconds)
val seedRouter = context.actorOf(
Props.empty.withRouter(ScatterGatherFirstCompletedRouter(
routees = seedRoutees, within = within.duration)))
seedRouter ? InitJoin pipeTo self
seedRouter ! PoisonPill
}
} }
override def unhandled(unknown: Any) = log.error("Illegal command [{}]", unknown) override def unhandled(unknown: Any) = log.error("Illegal command [{}]", unknown)
@ -479,8 +514,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout) implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)
private val nodeToJoin: Option[Address] = NodeToJoin filter (_ != selfAddress)
private val serialization = remote.serialization private val serialization = remote.serialization
private val _isRunning = new AtomicBoolean(true) private val _isRunning = new AtomicBoolean(true)
@ -507,8 +540,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
new AtomicReference[State](State(seenVersionedGossip)) new AtomicReference[State](State(seenVersionedGossip))
} }
// try to join the node defined in the 'akka.cluster.node-to-join' option // try to join one of the nodes defined in the 'akka.cluster.seed-nodes'
autoJoin() if (AutoJoin) joinSeedNode()
// ======================================================== // ========================================================
// ===================== WORK DAEMONS ===================== // ===================== WORK DAEMONS =====================
@ -927,9 +960,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
private[cluster] def receiveHeartbeat(from: Address): Unit = failureDetector heartbeat from private[cluster] def receiveHeartbeat(from: Address): Unit = failureDetector heartbeat from
/** /**
* Joins the pre-configured contact point. * Joins the pre-configured contact points.
*/ */
private def autoJoin(): Unit = nodeToJoin foreach join private def joinSeedNode(): Unit = clusterCommandDaemon ! ClusterUserAction.JoinSeedNode
/** /**
* INTERNAL API. * INTERNAL API.
@ -999,6 +1032,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
// 1. gossip to alive members // 1. gossip to alive members
val gossipedToAlive = gossipToRandomNodeOf(localMemberAddresses) val gossipedToAlive = gossipToRandomNodeOf(localMemberAddresses)
// FIXME does this work as intended? See ticket #2252
// 2. gossip to unreachable members // 2. gossip to unreachable members
if (localUnreachableSize > 0) { if (localUnreachableSize > 0) {
val probability = gossipToUnreachableProbablity(localMembersSize, localUnreachableSize) val probability = gossipToUnreachableProbablity(localMembersSize, localUnreachableSize)
@ -1006,11 +1040,12 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
gossipToRandomNodeOf(localUnreachableMembers.map(_.address)) gossipToRandomNodeOf(localUnreachableMembers.map(_.address))
} }
// FIXME does this work as intended? See ticket #2252
// 3. gossip to a deputy nodes for facilitating partition healing // 3. gossip to a deputy nodes for facilitating partition healing
val deputies = deputyNodes(localMemberAddresses) val deputies = deputyNodes(localMemberAddresses)
val alreadyGossipedToDeputy = gossipedToAlive.map(deputies.contains(_)).getOrElse(false) val alreadyGossipedToDeputy = gossipedToAlive.map(deputies.contains(_)).getOrElse(false)
if ((!alreadyGossipedToDeputy || localMembersSize < NrOfDeputyNodes) && deputies.nonEmpty) { if ((!alreadyGossipedToDeputy || localMembersSize < seedNodes.size) && deputies.nonEmpty) {
val probability = gossipToDeputyProbablity(localMembersSize, localUnreachableSize, NrOfDeputyNodes) val probability = gossipToDeputyProbablity(localMembersSize, localUnreachableSize, seedNodes.size)
if (ThreadLocalRandom.current.nextDouble() < probability) if (ThreadLocalRandom.current.nextDouble() < probability)
gossipToRandomNodeOf(deputies) gossipToRandomNodeOf(deputies)
} }
@ -1337,7 +1372,15 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
* Gets the addresses of a all the 'deputy' nodes - excluding this node if part of the group. * Gets the addresses of a all the 'deputy' nodes - excluding this node if part of the group.
*/ */
private def deputyNodes(addresses: IndexedSeq[Address]): IndexedSeq[Address] = private def deputyNodes(addresses: IndexedSeq[Address]): IndexedSeq[Address] =
addresses drop 1 take NrOfDeputyNodes filterNot (_ == selfAddress) addresses filterNot (_ == selfAddress) intersect seedNodes
/**
* INTERNAL API.
*
* Make it possible to override/configure seedNodes from tests without
* specifying in config. Addresses are unknown before startup time.
*/
private[cluster] def seedNodes: IndexedSeq[Address] = SeedNodes
/** /**
* INTERNAL API. * INTERNAL API.

View file

@ -22,17 +22,16 @@ class ClusterSettings(val config: Config, val systemName: String) {
final val FailureDetectorAcceptableHeartbeatPause: Duration = final val FailureDetectorAcceptableHeartbeatPause: Duration =
Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS) Duration(getMilliseconds("akka.cluster.failure-detector.acceptable-heartbeat-pause"), MILLISECONDS)
final val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match { final val SeedNodes: IndexedSeq[Address] = getStringList("akka.cluster.seed-nodes").asScala.map {
case "" None case AddressFromURIString(addr) addr
case AddressFromURIString(addr) Some(addr) }.toIndexedSeq
}
final val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS) final val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS)
final val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS) final val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS)
final val HeartbeatInterval = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS) final val HeartbeatInterval = Duration(getMilliseconds("akka.cluster.heartbeat-interval"), MILLISECONDS)
final val LeaderActionsInterval = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS) final val LeaderActionsInterval = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS)
final val UnreachableNodesReaperInterval = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS) final val UnreachableNodesReaperInterval = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS)
final val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons") final val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons")
final val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes") final val AutoJoin = getBoolean("akka.cluster.auto-join")
final val AutoDown = getBoolean("akka.cluster.auto-down") final val AutoDown = getBoolean("akka.cluster.auto-down")
final val SchedulerTickDuration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS) final val SchedulerTickDuration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS)
final val SchedulerTicksPerWheel = getInt("akka.cluster.scheduler.ticks-per-wheel") final val SchedulerTicksPerWheel = getInt("akka.cluster.scheduler.ticks-per-wheel")

View file

@ -0,0 +1,46 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.util.duration._
/**
 * Multi-node configuration for the seed-node join test.
 *
 * Declares four roles: two named as seed nodes (`seed1`, `seed2`) and two
 * named as ordinary nodes (`ordinary1`, `ordinary2`). The common config is
 * the debug config switched off, falling back to the shared
 * `MultiNodeClusterSpec.clusterConfig`.
 */
object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig {
// NOTE(review): roles are presumably assigned to nodes in declaration order; keep this order stable.
val seed1 = role("seed1")
val seed2 = role("seed2")
val ordinary1 = role("ordinary1")
val ordinary2 = role("ordinary2")
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
}
// Concrete spec classes, all running the same JoinSeedNodeSpec with the
// FailureDetectorPuppetStrategy mixed in.
// NOTE(review): presumably one class per participating node (Node1..Node4 matching
// the four roles declared in JoinSeedNodeMultiJvmSpec) - confirm against the multi-jvm runner.
class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
class JoinSeedNodeMultiJvmNode2 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
class JoinSeedNodeMultiJvmNode3 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
class JoinSeedNodeMultiJvmNode4 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy
/**
 * Spec for a cluster with configured seed nodes.
 *
 * The seed nodes are supplied by overriding `seedNodes` with the `seed1`
 * and `seed2` roles instead of via configuration. The single test only
 * starts the cluster node on each participant and then waits for
 * convergence; it issues no explicit join command itself.
 */
abstract class JoinSeedNodeSpec
extends MultiNodeSpec(JoinSeedNodeMultiJvmSpec)
with MultiNodeClusterSpec {
import JoinSeedNodeMultiJvmSpec._
// Use the two seed roles as the seed nodes for every participant in this spec.
override def seedNodes = IndexedSeq(seed1, seed2)
"A cluster with configured seed nodes" must {
"join the seed nodes at startup" taggedAs LongRunningTest in {
// Only start the node; joining is expected to happen without an explicit join call.
startClusterNode()
enterBarrier("all-started")
// NOTE(review): presumably waits until all 4 members are Up and gossip has converged - confirm in MultiNodeClusterSpec.
awaitUpConvergence(4)
enterBarrier("after")
}
}
}

View file

@ -26,7 +26,6 @@ object MultiNodeClusterSpec {
leader-actions-interval = 200 ms leader-actions-interval = 200 ms
unreachable-nodes-reaper-interval = 200 ms unreachable-nodes-reaper-interval = 200 ms
periodic-tasks-initial-delay = 300 ms periodic-tasks-initial-delay = 300 ms
nr-of-deputy-nodes = 2
} }
akka.test { akka.test {
single-expect-default = 5 s single-expect-default = 5 s
@ -77,10 +76,22 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu
throw t throw t
} }
/**
* Make it possible to override/configure seedNodes from tests without
* specifying in config. Addresses are unknown before startup time.
*/
protected def seedNodes: IndexedSeq[RoleName] = IndexedSeq.empty
/** /**
* The cluster node instance. Needs to be lazily created. * The cluster node instance. Needs to be lazily created.
*/ */
private lazy val clusterNode = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) private lazy val clusterNode = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) {
override def seedNodes: IndexedSeq[Address] = {
val testSeedNodes = MultiNodeClusterSpec.this.seedNodes
if (testSeedNodes.isEmpty) super.seedNodes
else testSeedNodes map address
}
}
/** /**
* Get the cluster node to use. * Get the cluster node to use.

View file

@ -36,6 +36,8 @@ abstract class NodeJoinSpec
startClusterNode() startClusterNode()
} }
enterBarrier("first-started")
runOn(second) { runOn(second) {
cluster.join(first) cluster.join(first)
} }

View file

@ -21,7 +21,6 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString(""" commonConfig(ConfigFactory.parseString("""
akka.cluster { akka.cluster {
nr-of-deputy-nodes = 0
# FIXME remove this (use default) when ticket #2239 has been fixed # FIXME remove this (use default) when ticket #2239 has been fixed
gossip-interval = 400 ms gossip-interval = 400 ms
} }

View file

@ -21,14 +21,14 @@ class ClusterConfigSpec extends AkkaSpec {
FailureDetectorImplementationClass must be(classOf[AccrualFailureDetector].getName) FailureDetectorImplementationClass must be(classOf[AccrualFailureDetector].getName)
FailureDetectorMinStdDeviation must be(100 millis) FailureDetectorMinStdDeviation must be(100 millis)
FailureDetectorAcceptableHeartbeatPause must be(3 seconds) FailureDetectorAcceptableHeartbeatPause must be(3 seconds)
NodeToJoin must be(None) SeedNodes must be(Seq.empty[String])
PeriodicTasksInitialDelay must be(1 seconds) PeriodicTasksInitialDelay must be(1 seconds)
GossipInterval must be(1 second) GossipInterval must be(1 second)
HeartbeatInterval must be(1 second) HeartbeatInterval must be(1 second)
LeaderActionsInterval must be(1 second) LeaderActionsInterval must be(1 second)
UnreachableNodesReaperInterval must be(1 second) UnreachableNodesReaperInterval must be(1 second)
NrOfGossipDaemons must be(4) NrOfGossipDaemons must be(4)
NrOfDeputyNodes must be(3) AutoJoin must be(true)
AutoDown must be(true) AutoDown must be(true)
SchedulerTickDuration must be(33 millis) SchedulerTickDuration must be(33 millis)
SchedulerTicksPerWheel must be(512) SchedulerTicksPerWheel must be(512)

View file

@ -11,12 +11,13 @@ import akka.actor.ExtendedActorSystem
import akka.actor.Address import akka.actor.Address
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import org.scalatest.BeforeAndAfter import org.scalatest.BeforeAndAfter
import akka.remote.RemoteActorRefProvider
object ClusterSpec { object ClusterSpec {
val config = """ val config = """
akka.cluster { akka.cluster {
auto-join = off
auto-down = off auto-down = off
nr-of-deputy-nodes = 3
periodic-tasks-initial-delay = 120 seconds // turn off scheduled tasks periodic-tasks-initial-delay = 120 seconds // turn off scheduled tasks
} }
akka.actor.provider = "akka.remote.RemoteActorRefProvider" akka.actor.provider = "akka.remote.RemoteActorRefProvider"
@ -31,12 +32,24 @@ object ClusterSpec {
class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
import ClusterSpec._ import ClusterSpec._
val selfAddress = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.address
val addresses = IndexedSeq(
selfAddress,
Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 1),
Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 2),
Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 3),
Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 4),
Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 5))
val deterministicRandom = new AtomicInteger val deterministicRandom = new AtomicInteger
val failureDetector = new FailureDetectorPuppet(system) val failureDetector = new FailureDetectorPuppet(system)
val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) { val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) {
// 3 deputy nodes (addresses index 1, 2, 3)
override def seedNodes = addresses.slice(1, 4)
override def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] = { override def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] = {
if (addresses.isEmpty) None if (addresses.isEmpty) None
else Some(addresses.toSeq(deterministicRandom.getAndIncrement % addresses.size)) else Some(addresses.toSeq(deterministicRandom.getAndIncrement % addresses.size))
@ -68,15 +81,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
} }
val selfAddress = cluster.self.address
val addresses = IndexedSeq(
selfAddress,
Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 1),
Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 2),
Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 3),
Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 4),
Address("akka", system.name, selfAddress.host.get, selfAddress.port.get + 5))
def memberStatus(address: Address): Option[MemberStatus] = def memberStatus(address: Address): Option[MemberStatus] =
cluster.latestGossip.members.collectFirst { case m if m.address == address m.status } cluster.latestGossip.members.collectFirst { case m if m.address == address m.status }
@ -89,6 +93,11 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
"A Cluster" must { "A Cluster" must {
"use the address of the remote transport" in {
cluster.selfAddress must be(selfAddress)
cluster.self.address must be(selfAddress)
}
"initially be singleton cluster and reach convergence immediately" in { "initially be singleton cluster and reach convergence immediately" in {
cluster.isSingletonCluster must be(true) cluster.isSingletonCluster must be(true)
cluster.latestGossip.members.map(_.address) must be(Set(selfAddress)) cluster.latestGossip.members.map(_.address) must be(Set(selfAddress))
@ -161,7 +170,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
"gossip to duputy node" in { "gossip to duputy node" in {
cluster._gossipToDeputyProbablity = 1.0 // always cluster._gossipToDeputyProbablity = 1.0 // always
// we have configured 2 deputy nodes // we have configured 3 deputy nodes (seedNodes)
cluster.gossip() // 1 is deputy cluster.gossip() // 1 is deputy
cluster.gossip() // 2 is deputy cluster.gossip() // 2 is deputy
cluster.gossip() // 3 is deputy cluster.gossip() // 3 is deputy

View file

@ -183,14 +183,20 @@ according to the Failure Detector is considered unreachable. This means setting
the unreachable node status to ``down`` automatically. the unreachable node status to ``down`` automatically.
Seed Nodes
^^^^^^^^^^
The seed nodes are configured contact points for the initial join of the cluster.
When a new node is started it sends a message to all seed nodes and
then sends a join command to the one that answers first.
It is possible to turn off automatic join.
Deputy Nodes Deputy Nodes
^^^^^^^^^^^^ ^^^^^^^^^^^^
After gossip convergence a set of ``deputy`` nodes for the cluster can be The deputy nodes are the live members of the configured seed nodes.
determined. As with the ``leader``, there is no ``deputy`` election process, It is preferred to use deputy nodes in different racks/data centers.
the deputies can always be recognised deterministically by any node whenever there
is gossip convergence. The list of ``deputy`` nodes is simply the N - 1 number
of nodes (e.g. starting with the first node after the ``leader``) in sorted order.
The nodes defined as ``deputy`` nodes are just regular member nodes whose only The nodes defined as ``deputy`` nodes are just regular member nodes whose only
"special role" is to help breaking logical partitions as seen in the gossip "special role" is to help breaking logical partitions as seen in the gossip