diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 4da4dd6620..d226506acc 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -18,7 +18,8 @@ akka { # how long to wait for one of the seed nodes to reply to initial join request seed-node-timeout = 5s - # automatic join the seed-nodes at startup + # Automatic join the seed-nodes at startup. + # If seed-nodes is empty it will join itself and become a single node cluster. auto-join = on # should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN? diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index caecf3906b..3eddb5bf60 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -364,20 +364,23 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto val log = Logging(context.system, this) def receive = { - case JoinSeedNode ⇒ joinSeedNode() - case InitJoin ⇒ sender ! InitJoinAck(cluster.selfAddress) - case InitJoinAck(address) ⇒ cluster.join(address) - case Join(address) ⇒ cluster.joining(address) - case Down(address) ⇒ cluster.downing(address) - case Leave(address) ⇒ cluster.leaving(address) - case Exit(address) ⇒ cluster.exiting(address) - case Remove(address) ⇒ cluster.removing(address) + case JoinSeedNode ⇒ joinSeedNode() + case InitJoin ⇒ sender ! InitJoinAck(cluster.selfAddress) + case InitJoinAck(address) ⇒ cluster.join(address) + case Join(address) ⇒ cluster.joining(address) + case Down(address) ⇒ cluster.downing(address) + case Leave(address) ⇒ cluster.leaving(address) + case Exit(address) ⇒ cluster.exiting(address) + case Remove(address) ⇒ cluster.removing(address) + case Failure(e: AskTimeoutException) ⇒ joinSeedNodeTimeout() } def joinSeedNode(): Unit = { val seedRoutees = for (address ← cluster.seedNodes; if address != cluster.selfAddress) yield self.path.toStringWithAddress(address) - if (seedRoutees.nonEmpty) { + if (seedRoutees.isEmpty) { + cluster join cluster.selfAddress + } else { implicit val within = Timeout(cluster.clusterSettings.SeedNodeTimeout) val seedRouter = context.actorOf( Props.empty.withRouter(ScatterGatherFirstCompletedRouter( @@ -387,6 +390,8 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto } } + def joinSeedNodeTimeout(): Unit = cluster join cluster.selfAddress + override def unhandled(unknown: Any) = log.error("Illegal command [{}]", unknown) } @@ -534,10 +539,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } private val state = { - val member = Member(selfAddress, Joining) - val versionedGossip = Gossip(members = Gossip.emptyMembers + member) :+ vclockNode // add me as member and update my vector clock - val seenVersionedGossip = versionedGossip seen selfAddress - new AtomicReference[State](State(seenVersionedGossip)) + // note that self is not initially member, + // and the Gossip is not versioned for this 'Node' yet + new AtomicReference[State](State(Gossip(members = Gossip.emptyMembers))) } // try to join one of the nodes defined in the 'akka.cluster.seed-nodes' @@ -797,7 +801,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val newUnreachableMembers = localUnreachable filterNot { _.address == node } val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers) - val newMembers = localMembers + Member(node, Joining) // add joining node as Joining + // add joining node as Joining + // add self in case someone else joins before self has joined (Set discards duplicates) + val newMembers = localMembers + Member(node, Joining) + Member(selfAddress, Joining) val newGossip = localGossip copy (overview = newOverview, members = newMembers) val versionedGossip = newGossip :+ vclockNode @@ -939,11 +945,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) if (!localGossip.overview.isNonDownUnreachable(from)) { val winningGossip = - if (isSingletonCluster(localState) && localGossip.overview.unreachable.isEmpty && remoteGossip.members.contains(self)) { - // a fresh singleton cluster that is joining, no need to merge, use received gossip - remoteGossip - - } else if (remoteGossip.version <> localGossip.version) { + if (remoteGossip.version <> localGossip.version) { // concurrent val mergedGossip = remoteGossip merge localGossip val versionedMergedGossip = mergedGossip :+ vclockNode diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala index bd161a435c..20dec26a45 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala @@ -16,7 +16,9 @@ object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig { val ordinary1 = role("ordinary1") val ordinary2 = role("ordinary2") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString("akka.cluster.auto-join = on")). + withFallback(MultiNodeClusterSpec.clusterConfig)) } class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 79e3a67e1e..ed95013bf4 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -20,6 +20,7 @@ import akka.actor.RootActorPath object MultiNodeClusterSpec { def clusterConfig: Config = ConfigFactory.parseString(""" akka.cluster { + auto-join = off auto-down = off gossip-interval = 200 ms heartbeat-interval = 400 ms @@ -99,10 +100,15 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu def cluster: Cluster = clusterNode /** - * Use this method instead of 'cluster.self' - * for the initial startup of the cluster node. + * Use this method for the initial startup of the cluster node. */ - def startClusterNode(): Unit = cluster.self + def startClusterNode(): Unit = { + if (cluster.latestGossip.members.isEmpty) { + cluster join myself + awaitCond(cluster.latestGossip.members.exists(_.address == address(myself))) + } else + cluster.self + } /** * Initialize the cluster with the specified member diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala index 9f79af2f13..3c35e95333 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala @@ -16,6 +16,7 @@ object SingletonClusterMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString(""" akka.cluster { + auto-join = on auto-down = on failure-detector.threshold = 4 } @@ -38,12 +39,20 @@ abstract class SingletonClusterSpec "A cluster of 2 nodes" must { - "not be singleton cluster when joined" taggedAs LongRunningTest in { + "become singleton cluster when started with 'auto-join=on' and 'seed-nodes=[]'" taggedAs LongRunningTest in { + startClusterNode() + awaitUpConvergence(1) + cluster.isSingletonCluster must be(true) + + enterBarrier("after-1") + } + + "not be singleton cluster when joined with other node" taggedAs LongRunningTest in { awaitClusterUp(first, second) cluster.isSingletonCluster must be(false) assertLeader(first, second) - enterBarrier("after-1") + enterBarrier("after-2") } "become singleton cluster when one node is shutdown" taggedAs LongRunningTest in { @@ -58,7 +67,7 @@ abstract class SingletonClusterSpec assertLeader(first) } - enterBarrier("after-2") + enterBarrier("after-3") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index d661f0cc51..3be082d2f3 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -25,6 +25,7 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { akka.cluster { # FIXME remove this (use default) when ticket #2239 has been fixed gossip-interval = 400 ms + auto-join = off } akka.loglevel = INFO """)) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 1e6c8d390f..302da5fc7f 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -89,11 +89,13 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { "use the address of the remote transport" in { cluster.selfAddress must be(selfAddress) - cluster.self.address must be(selfAddress) } - "initially be singleton cluster and reach convergence immediately" in { - cluster.isSingletonCluster must be(true) + "initially become singleton cluster when joining itself and reach convergence" in { + cluster.isSingletonCluster must be(false) // auto-join = off + cluster.join(selfAddress) + awaitCond(cluster.isSingletonCluster) + cluster.self.address must be(selfAddress) cluster.latestGossip.members.map(_.address) must be(Set(selfAddress)) memberStatus(selfAddress) must be(Some(MemberStatus.Joining)) cluster.convergence.isDefined must be(true)