diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
index fafd58ab2d..b2f6014047 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
@@ -285,6 +285,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
     case InitJoin ⇒ sender ! InitJoinNack(selfAddress)
     case ClusterUserAction.JoinTo(address) ⇒ join(address)
     case JoinSeedNodes(seedNodes) ⇒ joinSeedNodes(seedNodes)
+    case Join(node, roles) ⇒ joiningUninitialized(node, roles)
     case msg: SubscriptionMessage ⇒ publisher forward msg
   }
 
@@ -292,25 +293,34 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
     case Welcome(from, gossip) ⇒ welcome(joinWith, from, gossip)
     case InitJoin ⇒ sender ! InitJoinNack(selfAddress)
     case ClusterUserAction.JoinTo(address) ⇒
-      context.become(uninitialized)
+      becomeUninitialized()
       join(address)
     case JoinSeedNodes(seedNodes) ⇒
-      context.become(uninitialized)
+      becomeUninitialized()
       joinSeedNodes(seedNodes)
+    case Join(node, roles) ⇒ joiningUninitialized(node, roles)
     case msg: SubscriptionMessage ⇒ publisher forward msg
     case _: Tick ⇒
       if (deadline.exists(_.isOverdue)) {
-        context.become(uninitialized)
+        becomeUninitialized()
         if (SeedNodes.nonEmpty) joinSeedNodes(SeedNodes)
         else join(joinWith)
       }
   }
 
+  def becomeUninitialized(): Unit = {
+    // make sure that join process is stopped
+    stopSeedNodeProcess()
+    context.become(uninitialized)
+  }
+
   def becomeInitialized(): Unit = {
     // start heartbeatSender here, and not in constructor to make sure that
     // heartbeating doesn't start before Welcome is received
     context.actorOf(Props[ClusterHeartbeatSender].
       withDispatcher(UseDispatcher), name = "heartbeatSender")
+    // make sure that join process is stopped
+    stopSeedNodeProcess()
     context.become(initialized)
   }
 
@@ -351,8 +361,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
   def initJoin(): Unit = sender ! InitJoinAck(selfAddress)
 
   def joinSeedNodes(seedNodes: immutable.IndexedSeq[Address]): Unit = {
-    require(seedNodeProcess.isEmpty, "Join seed nodes is already in progress")
     if (seedNodes.nonEmpty) {
+      stopSeedNodeProcess()
       seedNodeProcess =
         if (seedNodes == immutable.IndexedSeq(selfAddress)) {
           self ! ClusterUserAction.JoinTo(selfAddress)
@@ -384,17 +394,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
       require(latestGossip.members.isEmpty, "Join can only be done from empty state")
 
       // to support manual join when joining to seed nodes is stuck (no seed nodes available)
-      val snd = sender
-      seedNodeProcess match {
-        case Some(`snd`) ⇒
-          // seedNodeProcess completed, it will stop itself
-          seedNodeProcess = None
-        case Some(s) ⇒
-          // manual join, abort current seedNodeProcess
-          context stop s
-          seedNodeProcess = None
-        case None ⇒ // no seedNodeProcess in progress
-      }
+      stopSeedNodeProcess()
 
       if (address == selfAddress) {
         becomeInitialized()
@@ -410,6 +410,16 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
     }
   }
 
+  def stopSeedNodeProcess(): Unit = {
+    seedNodeProcess match {
+      case Some(s) ⇒
+        // abort the seedNodeProcess that is currently in progress
+        context stop s
+        seedNodeProcess = None
+      case None ⇒ // no seedNodeProcess in progress
+    }
+  }
+
   /**
    * State transition to JOINING - new node joining.
    * Received `Join` message and replies with `Welcome` message, containing
@@ -456,6 +466,16 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
     }
   }
 
+  /**
+   * Another node is joining when this node is uninitialized.
+   */
+  def joiningUninitialized(node: UniqueAddress, roles: Set[String]): Unit = {
+    require(latestGossip.members.isEmpty, "Joining an uninitialized node can only be done from empty state")
+    joining(node, roles)
+    if (latestGossip.hasMember(selfUniqueAddress))
+      becomeInitialized()
+  }
+
   /**
    * Reply from Join request.
    */
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala
index 43ca4fae3e..613a36ca5e 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala
@@ -34,7 +34,16 @@ abstract class NodeUpSpec
   "A cluster node that is joining another cluster" must {
     "be moved to UP by the leader after a convergence" taggedAs LongRunningTest in {
 
-      awaitClusterUp(first, second)
+      // it should be possible to join an uninitialized node
+      // both nodes join each other, to test the race on purpose
+      runOn(first) {
+        cluster.join(second)
+      }
+      runOn(second) {
+        cluster.join(first)
+      }
+
+      awaitMembersUp(2)
 
       enterBarrier("after-1")
     }