diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index b23c0f2108..2cd6dca1c5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -72,15 +72,20 @@ private[cluster] object InternalClusterAction { */ case class InitJoinAck(address: Address) extends ClusterMessage - case object GossipTick + /** + * Marker interface for periodic tick messages + */ + trait Tick - case object HeartbeatTick + case object GossipTick extends Tick - case object ReapUnreachableTick + case object HeartbeatTick extends Tick - case object LeaderActionsTick + case object ReapUnreachableTick extends Tick - case object PublishStateTick + case object LeaderActionsTick extends Tick + + case object PublishStateTick extends Tick case class SendClusterMessage(to: Address, msg: ClusterMessage) @@ -223,7 +228,16 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) publishStateTask foreach { _.cancel() } } - def receive = { + def uninitialized: Actor.Receive = { + case JoinSeedNode ⇒ joinSeedNode() + case InitJoin ⇒ // skip, not ready yet + case InitJoinAck(address) ⇒ join(address) + case JoinTo(address) ⇒ join(address) + case Failure(e: AskTimeoutException) ⇒ joinSeedNodeTimeout() + case _: Tick ⇒ // ignore periodic tasks until initialized + } + + def initialized: Actor.Receive = { case msg: GossipEnvelope ⇒ receiveGossip(msg) case msg: GossipMergeConflict ⇒ receiveGossipMerge(msg) case GossipTick ⇒ gossip() @@ -231,10 +245,7 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) case ReapUnreachableTick ⇒ reapUnreachableMembers() case LeaderActionsTick ⇒ leaderActions() case PublishStateTick ⇒ publishState() - case JoinSeedNode ⇒ joinSeedNode() case InitJoin ⇒ initJoin() - case InitJoinAck(address) ⇒ join(address) - case Failure(e: AskTimeoutException) ⇒ joinSeedNodeTimeout() case JoinTo(address) ⇒ join(address) case ClusterUserAction.Join(address) ⇒ joining(address) case ClusterUserAction.Down(address) ⇒ downing(address) @@ -246,10 +257,14 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) } + def receive = uninitialized + def joinSeedNode(): Unit = { - val seedRoutees = environment.seedNodes.collect { case a if a != selfAddress ⇒ self.path.toStringWithAddress(a) } - if (seedRoutees.isEmpty) join(selfAddress) + // only the node which is named first in the list of seed nodes will join itself + if (environment.seedNodes.isEmpty || environment.seedNodes.head == selfAddress) + join(selfAddress) else { + val seedRoutees = environment.seedNodes.collect { case a if a != selfAddress ⇒ self.path.toStringWithAddress(a) } implicit val within = Timeout(SeedNodeTimeout) val seedRouter = context.actorOf(Props.empty.withRouter(ScatterGatherFirstCompletedRouter(routees = seedRoutees, within = within.duration))) seedRouter ! InitJoin @@ -259,7 +274,10 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) def initJoin(): Unit = sender ! InitJoinAck(selfAddress) - def joinSeedNodeTimeout(): Unit = join(selfAddress) + def joinSeedNodeTimeout(): Unit = { + // try again later, first seed node must be started before other seed nodes can join + clusterScheduler.scheduleOnce(SeedNodeTimeout, self, JoinSeedNode) + } /** * Try to join this cluster node with the node specified by 'address'. @@ -276,7 +294,11 @@ private[cluster] final class ClusterCoreDaemon(environment: ClusterEnvironment) notifyListeners(localGossip) - coreSender ! SendClusterMessage(address, ClusterUserAction.Join(selfAddress)) + context.become(initialized) + if (address == selfAddress) + joining(address) + else + coreSender ! SendClusterMessage(address, ClusterUserAction.Join(selfAddress)) } /** diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala index f71ebe3cc3..5dcb091960 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala @@ -13,6 +13,7 @@ import scala.concurrent.util.duration._ object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig { val seed1 = role("seed1") val seed2 = role("seed2") + val seed3 = role("seed3") val ordinary1 = role("ordinary1") val ordinary2 = role("ordinary2") @@ -25,6 +26,7 @@ class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec with FailureDetectorPup class JoinSeedNodeMultiJvmNode2 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy class JoinSeedNodeMultiJvmNode3 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy class JoinSeedNodeMultiJvmNode4 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy +class JoinSeedNodeMultiJvmNode5 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy abstract class JoinSeedNodeSpec extends MultiNodeSpec(JoinSeedNodeMultiJvmSpec) @@ -32,37 +34,22 @@ abstract class JoinSeedNodeSpec import JoinSeedNodeMultiJvmSpec._ - override def seedNodes = IndexedSeq(seed1, seed2) + override def seedNodes = IndexedSeq(seed3, seed2, seed1) "A cluster with configured seed nodes" must { - "start the seed nodes sequentially" taggedAs LongRunningTest in { + "be able to start the seed nodes concurrently" taggedAs LongRunningTest in { // without looking up the addresses first there might be // [akka://JoinSeedNodeSpec/user/TestConductorClient] cannot write GetAddress(RoleName(seed2)) while waiting for seed1 - roles foreach address + // roles foreach address - runOn(seed1) { - startClusterNode() - } - enterBarrier("seed1-started") - - runOn(seed2) { - startClusterNode() - } - enterBarrier("seed2-started") - - runOn(seed1, seed2) { - awaitUpConvergence(2) + runOn(seed1, seed2, seed3) { + awaitUpConvergence(3) } enterBarrier("after-1") } "join the seed nodes at startup" taggedAs LongRunningTest in { - - startClusterNode() - enterBarrier("all-started") - - awaitUpConvergence(4) - + awaitUpConvergence(roles.size) enterBarrier("after-2") } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 2362da8aef..f7b0352c42 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -22,7 +22,7 @@ import akka.actor.RootActorPath object MultiNodeClusterSpec { def clusterConfig: Config = ConfigFactory.parseString(""" akka.cluster { - auto-join = off + auto-join = on auto-down = off gossip-interval = 200 ms heartbeat-interval = 400 ms