diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
index e626da6852..7d0e9408a1 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
@@ -14,6 +14,7 @@ import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props,
 import akka.actor.OneForOneStrategy
 import akka.actor.Status.Failure
 import akka.actor.SupervisorStrategy.Stop
+import akka.actor.Terminated
 import akka.event.EventStream
 import akka.pattern.ask
 import akka.util.Timeout
@@ -72,6 +73,8 @@ private[cluster] object InternalClusterAction {
    * The node sends `InitJoin` to all seed nodes, which replies
    * with `InitJoinAck`. The first reply is used others are discarded.
    * The node sends `Join` command to the seed node that replied first.
+   * If a node is uninitialized it will reply to `InitJoin` with
+   * `InitJoinNack`.
    */
   case object JoinSeedNode extends ClusterMessage
 
@@ -85,6 +88,11 @@ private[cluster] object InternalClusterAction {
    */
   case class InitJoinAck(address: Address) extends ClusterMessage
 
+  /**
+   * @see JoinSeedNode
+   */
+  case class InitJoinNack(address: Address) extends ClusterMessage
+
   /**
    * Marker interface for periodic tick messages
   */
@@ -229,6 +237,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
 
   var stats = ClusterStats()
 
+  var seedNodeProcess: Option[ActorRef] = None
+
   /**
    * Looks up and returns the remote cluster command connection for the specific address.
   */
@@ -270,7 +280,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
   }
 
   def uninitialized: Actor.Receive = {
-    case InitJoin                 ⇒ // skip, not ready yet
+    case InitJoin                 ⇒ sender ! InitJoinNack(selfAddress)
     case JoinTo(address)          ⇒ join(address)
     case JoinSeedNodes(seedNodes) ⇒ joinSeedNodes(seedNodes)
     case msg: SubscriptionMessage ⇒ publisher forward msg
@@ -306,12 +316,18 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
   def initJoin(): Unit = sender ! InitJoinAck(selfAddress)
 
   def joinSeedNodes(seedNodes: immutable.IndexedSeq[Address]): Unit = {
-    // only the node which is named first in the list of seed nodes will join itself
-    if (seedNodes.isEmpty || seedNodes.head == selfAddress)
-      self ! JoinTo(selfAddress)
-    else
-      context.actorOf(Props(new JoinSeedNodeProcess(seedNodes)).
-        withDispatcher(UseDispatcher), name = "joinSeedNodeProcess")
+    require(seedNodeProcess.isEmpty, "Join seed nodes is already in progress")
+    seedNodeProcess =
+      if (seedNodes.isEmpty || seedNodes == immutable.IndexedSeq(selfAddress)) {
+        self ! JoinTo(selfAddress)
+        None
+      } else if (seedNodes.head == selfAddress) {
+        Some(context.actorOf(Props(new FirstSeedNodeProcess(seedNodes)).
+          withDispatcher(UseDispatcher), name = "firstSeedNodeProcess"))
+      } else {
+        Some(context.actorOf(Props(new JoinSeedNodeProcess(seedNodes)).
+          withDispatcher(UseDispatcher), name = "joinSeedNodeProcess"))
+      }
   }
 
   /**
@@ -320,12 +336,26 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
    */
   def join(address: Address): Unit = {
     if (address.protocol != selfAddress.protocol)
-      log.info("Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]",
+      log.warning("Trying to join member with wrong protocol, but was ignored, expected [{}] but was [{}]",
         selfAddress.protocol, address.protocol)
     else if (address.system != selfAddress.system)
-      log.info("Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]",
+      log.warning("Trying to join member with wrong ActorSystem name, but was ignored, expected [{}] but was [{}]",
         selfAddress.system, address.system)
     else if (!latestGossip.members.exists(_.address == address)) {
+
+      // to support manual join when joining to seed nodes is stuck (no seed nodes available)
+      val snd = sender
+      seedNodeProcess match {
+        case Some(`snd`) ⇒
+          // seedNodeProcess completed, it will stop itself
+          seedNodeProcess = None
+        case Some(s) ⇒
+          // manual join, abort current seedNodeProcess
+          context stop s
+          seedNodeProcess = None
+        case None ⇒ // no seedNodeProcess in progress
+      }
+
       // wipe our state since a node that joins a cluster must be empty
       latestGossip = Gossip.empty
       // wipe the failure detector since we are starting fresh and shouldn't care about the past
@@ -347,37 +377,45 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
    * State transition to JOINING - new node joining.
    */
   def joining(node: Address): Unit = {
-    val localMembers = latestGossip.members
-    val localUnreachable = latestGossip.overview.unreachable
+    if (node.protocol != selfAddress.protocol)
+      log.warning("Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]",
+        selfAddress.protocol, node.protocol)
+    else if (node.system != selfAddress.system)
+      log.warning("Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]",
+        selfAddress.system, node.system)
+    else {
+      val localMembers = latestGossip.members
+      val localUnreachable = latestGossip.overview.unreachable
 
-    val alreadyMember = localMembers.exists(_.address == node)
-    val isUnreachable = latestGossip.overview.isNonDownUnreachable(node)
+      val alreadyMember = localMembers.exists(_.address == node)
+      val isUnreachable = latestGossip.overview.isNonDownUnreachable(node)
 
-    if (!alreadyMember && !isUnreachable) {
-      // remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster
-      val (rejoiningMember, newUnreachableMembers) = localUnreachable partition { _.address == node }
-      val newOverview = latestGossip.overview copy (unreachable = newUnreachableMembers)
+      if (!alreadyMember && !isUnreachable) {
+        // remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster
+        val (rejoiningMember, newUnreachableMembers) = localUnreachable partition { _.address == node }
+        val newOverview = latestGossip.overview copy (unreachable = newUnreachableMembers)
 
-      // remove the node from the failure detector if it is a DOWN node that is rejoining cluster
-      if (rejoiningMember.nonEmpty) failureDetector.remove(node)
+        // remove the node from the failure detector if it is a DOWN node that is rejoining cluster
+        if (rejoiningMember.nonEmpty) failureDetector.remove(node)
 
-      // add joining node as Joining
-      // add self in case someone else joins before self has joined (Set discards duplicates)
-      val newMembers = localMembers + Member(node, Joining) + Member(selfAddress, Joining)
-      val newGossip = latestGossip copy (overview = newOverview, members = newMembers)
+        // add joining node as Joining
+        // add self in case someone else joins before self has joined (Set discards duplicates)
+        val newMembers = localMembers + Member(node, Joining) + Member(selfAddress, Joining)
+        val newGossip = latestGossip copy (overview = newOverview, members = newMembers)
 
-      val versionedGossip = newGossip :+ vclockNode
-      val seenVersionedGossip = versionedGossip seen selfAddress
+        val versionedGossip = newGossip :+ vclockNode
+        val seenVersionedGossip = versionedGossip seen selfAddress
 
-      latestGossip = seenVersionedGossip
+        latestGossip = seenVersionedGossip
 
-      log.debug("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node)
-      // treat join as initial heartbeat, so that it becomes unavailable if nothing more happens
-      if (node != selfAddress) {
-        gossipTo(node)
+        log.debug("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node)
+        // treat join as initial heartbeat, so that it becomes unavailable if nothing more happens
+        if (node != selfAddress) {
+          gossipTo(node)
+        }
+
+        publish(latestGossip)
       }
-
-      publish(latestGossip)
     }
   }
 
@@ -847,7 +885,64 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
 /**
  * INTERNAL API.
  *
- * Sends InitJoinAck to all seed nodes (except itself) and expect
+ * Used only for the first seed node.
+ * Sends InitJoin to all seed nodes (except itself).
+ * If the other seed nodes are not part of the cluster yet they will reply with
+ * InitJoinNack or not respond at all, and then the first seed node
+ * will join itself to initialize the new cluster. When the first
+ * seed node is restarted and some other seed node is already part of the cluster,
+ * it will reply with InitJoinAck and the first seed node will then join
+ * that other seed node, i.e. join the existing cluster.
+ */
+private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSeq[Address]) extends Actor with ActorLogging {
+  import InternalClusterAction._
+
+  val cluster = Cluster(context.system)
+  def selfAddress = cluster.selfAddress
+
+  if (seedNodes.size <= 1 || seedNodes.head != selfAddress)
+    throw new IllegalArgumentException("Join seed node should not be done")
+
+  val timeout = Deadline.now + cluster.settings.SeedNodeTimeout
+
+  var remainingSeedNodes = seedNodes.toSet - selfAddress
+
+  // retry until one ack, or all nack, or timeout
+  import context.dispatcher
+  val retryTask = cluster.scheduler.schedule(1.second, 1.second, self, JoinSeedNode)
+  self ! JoinSeedNode
+
+  override def postStop(): Unit = retryTask.cancel()
+
+  def receive = {
+    case JoinSeedNode ⇒
+      if (timeout.hasTimeLeft) {
+        // send InitJoin to remaining seed nodes (except myself)
+        remainingSeedNodes foreach { a ⇒ context.actorFor(context.parent.path.toStringWithAddress(a)) ! InitJoin }
+      } else {
+        // no InitJoinAck received, initialize new cluster by joining myself
+        context.parent ! JoinTo(selfAddress)
+        context.stop(self)
+      }
+    case InitJoinAck(address) ⇒
+      // first InitJoinAck reply, join existing cluster
+      context.parent ! JoinTo(address)
+      context.stop(self)
+    case InitJoinNack(address) ⇒
+      remainingSeedNodes -= address
+      if (remainingSeedNodes.isEmpty) {
+        // initialize new cluster by joining myself when nacks from all other seed nodes
+        context.parent ! JoinTo(selfAddress)
+        context.stop(self)
+      }
+  }
+
+}
+
+/**
+ * INTERNAL API.
+ *
+ * Sends InitJoin to all seed nodes (except itself) and expect
  * InitJoinAck reply back. The seed node that replied first
  * will be used, joined to. InitJoinAck replies received after the
  * first one are ignored.
@@ -890,6 +985,7 @@ private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq
       // first InitJoinAck reply
       context.parent ! JoinTo(address)
       context.become(done)
+    case InitJoinNack(_) ⇒ // that seed was uninitialized
     case ReceiveTimeout ⇒
       // no InitJoinAck received, try again
       self ! JoinSeedNode
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala
new file mode 100644
index 0000000000..4e103c9a39
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala
@@ -0,0 +1,129 @@
+/**
+ * Copyright (C) 2009-2013 Typesafe Inc.
+ */
+package akka.cluster
+
+import language.postfixOps
+import scala.collection.immutable
+import com.typesafe.config.ConfigFactory
+import org.scalatest.BeforeAndAfter
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.testkit._
+import scala.concurrent.duration._
+import akka.actor.Address
+import akka.actor.ActorSystem
+import akka.actor.Props
+import akka.actor.Actor
+import akka.actor.RootActorPath
+import akka.cluster.MemberStatus._
+
+object RestartFirstSeedNodeMultiJvmSpec extends MultiNodeConfig {
+  val seed1 = role("seed1")
+  val seed2 = role("seed2")
+  val seed3 = role("seed3")
+
+  commonConfig(debugConfig(on = false).
+    withFallback(ConfigFactory.parseString("""
+      akka.cluster {
+        auto-join = off
+        auto-down = on
+      }
+      """)).withFallback(MultiNodeClusterSpec.clusterConfig))
+}
+
+class RestartFirstSeedNodeMultiJvmNode1 extends RestartFirstSeedNodeSpec
+class RestartFirstSeedNodeMultiJvmNode2 extends RestartFirstSeedNodeSpec
+class RestartFirstSeedNodeMultiJvmNode3 extends RestartFirstSeedNodeSpec
+
+abstract class RestartFirstSeedNodeSpec
+  extends MultiNodeSpec(RestartFirstSeedNodeMultiJvmSpec)
+  with MultiNodeClusterSpec with ImplicitSender {
+
+  import RestartFirstSeedNodeMultiJvmSpec._
+
+  @volatile var seedNode1Address: Address = _
+
+  // use a separate ActorSystem, to be able to simulate restart
+  lazy val seed1System = ActorSystem(system.name, system.settings.config)
+
+  def missingSeed = address(seed3).copy(port = Some(61313))
+  def seedNodes: immutable.IndexedSeq[Address] = Vector(seedNode1Address, seed2, seed3, missingSeed)
+
+  lazy val restartedSeed1System = ActorSystem(system.name,
+    ConfigFactory.parseString("akka.remote.netty.tcp.port=" + seedNodes.head.port.get).
+      withFallback(system.settings.config))
+
+  override def afterAll(): Unit = {
+    runOn(seed1) {
+      if (seed1System.isTerminated)
+        restartedSeed1System.shutdown()
+      else
+        seed1System.shutdown()
+    }
+    super.afterAll()
+  }
+
+  "Cluster seed nodes" must {
+    "be able to restart first seed node and join other seed nodes" taggedAs LongRunningTest in within(40 seconds) {
+      // seed1System is a separate ActorSystem, to be able to simulate restart
+      // we must transfer its address to seed2 and seed3
+      runOn(seed2, seed3) {
+        system.actorOf(Props(new Actor {
+          def receive = {
+            case a: Address ⇒
+              seedNode1Address = a
+              sender ! "ok"
+          }
+        }), name = "address-receiver")
+        enterBarrier("seed1-address-receiver-ready")
+      }
+
+      runOn(seed1) {
+        enterBarrier("seed1-address-receiver-ready")
+        seedNode1Address = Cluster(seed1System).selfAddress
+        List(seed2, seed3) foreach { r ⇒
+          system.actorFor(RootActorPath(r) / "user" / "address-receiver") ! seedNode1Address
+          expectMsg(5 seconds, "ok")
+        }
+      }
+      enterBarrier("seed1-address-transfered")
+
+      // now we can join seed1System, seed2, seed3 together
+      runOn(seed1) {
+        Cluster(seed1System).joinSeedNodes(seedNodes)
+        awaitCond(Cluster(seed1System).readView.members.size == 3)
+        awaitCond(Cluster(seed1System).readView.members.forall(_.status == Up))
+      }
+      runOn(seed2, seed3) {
+        cluster.joinSeedNodes(seedNodes)
+        awaitUpConvergence(3)
+      }
+      enterBarrier("started")
+
+      // shutdown seed1System
+      runOn(seed1) {
+        seed1System.shutdown()
+        seed1System.awaitTermination(remaining)
+      }
+      runOn(seed2, seed3) {
+        awaitUpConvergence(2, canNotBePartOfMemberRing = Set(seedNodes.head))
+        awaitCond(clusterView.unreachableMembers.exists(m ⇒ m.status == Down && m.address == seedNodes.head))
+      }
+      enterBarrier("seed1-shutdown")
+
+      // then start restartedSeed1System, which has the same address as seed1System
+      runOn(seed1) {
+        Cluster(restartedSeed1System).joinSeedNodes(seedNodes)
+        awaitCond(Cluster(restartedSeed1System).readView.members.size == 3)
+        awaitCond(Cluster(restartedSeed1System).readView.members.forall(_.status == Up))
+      }
+      runOn(seed2, seed3) {
+        awaitUpConvergence(3)
+      }
+      enterBarrier("seed1-restarted")
+
+    }
+
+  }
+}
diff --git a/akka-docs/rst/cluster/cluster-usage-java.rst b/akka-docs/rst/cluster/cluster-usage-java.rst
index 8f08dbaa7e..bd9bf2626d 100644
--- a/akka-docs/rst/cluster/cluster-usage-java.rst
+++ b/akka-docs/rst/cluster/cluster-usage-java.rst
@@ -118,16 +118,17 @@ sends a message to all seed nodes and then sends join command to the one that
 answers first. If no one of the seed nodes replied (might not be started yet)
 it retries this procedure until successful or shutdown.
 
-There is one thing to be aware of regarding the seed node configured as the
-first element in the ``seed-nodes`` configuration list.
 The seed nodes can be started in any order and it is not necessary to have all
-seed nodes running, but the first seed node must be started when initially
-starting a cluster, otherwise the other seed-nodes will not become initialized
-and no other node can join the cluster. Once more than two seed nodes have been
-started it is no problem to shut down the first seed node. If it goes down it
-must be manually joined to the cluster again.
-Automatic joining of the first seed node is not possible, it would only join
-itself. It is only the first seed node that has this restriction.
+seed nodes running, but the node configured as the first element in the ``seed-nodes``
+configuration list must be started when initially starting a cluster, otherwise the
+other seed-nodes will not become initialized and no other node can join the cluster.
+It is quickest to start all configured seed nodes at the same time (order doesn't matter),
+otherwise it can take up to the configured ``seed-node-timeout`` until the nodes
+can join.
+
+Once more than two seed nodes have been started it is no problem to shut down the first
+seed node. If the first seed node is restarted it will first try to join the other
+seed nodes in the existing cluster.
 
 You can disable automatic joining with configuration::
diff --git a/akka-docs/rst/cluster/cluster-usage-scala.rst b/akka-docs/rst/cluster/cluster-usage-scala.rst
index bc2e8b90a8..90b4d89437 100644
--- a/akka-docs/rst/cluster/cluster-usage-scala.rst
+++ b/akka-docs/rst/cluster/cluster-usage-scala.rst
@@ -96,16 +96,17 @@ sends a message to all seed nodes and then sends join command to the one that
 answers first. If no one of the seed nodes replied (might not be started yet)
 it retries this procedure until successful or shutdown.
 
-There is one thing to be aware of regarding the seed node configured as the
-first element in the ``seed-nodes`` configuration list.
 The seed nodes can be started in any order and it is not necessary to have all
-seed nodes running, but the first seed node must be started when initially
-starting a cluster, otherwise the other seed-nodes will not become initialized
-and no other node can join the cluster. Once more than two seed nodes have been
-started it is no problem to shut down the first seed node. If it goes down it
-must be manually joined to the cluster again.
-Automatic joining of the first seed node is not possible, it would only join
-itself. It is only the first seed node that has this restriction.
+seed nodes running, but the node configured as the first element in the ``seed-nodes``
+configuration list must be started when initially starting a cluster, otherwise the
+other seed-nodes will not become initialized and no other node can join the cluster.
+It is quickest to start all configured seed nodes at the same time (order doesn't matter),
+otherwise it can take up to the configured ``seed-node-timeout`` until the nodes
+can join.
+
+Once more than two seed nodes have been started it is no problem to shut down the first
+seed node. If the first seed node is restarted it will first try to join the other
+seed nodes in the existing cluster.
 
 You can disable automatic joining with configuration::
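
For reference, the programmatic counterpart of the ``seed-nodes`` configuration described
above is ``Cluster(system).joinSeedNodes``, which is what the new ``RestartFirstSeedNodeSpec``
drives. A minimal sketch of such a manual join follows; the system name, host names and
ports are made-up placeholders, and the config keys simply mirror the ones used in the
test above::

  import scala.collection.immutable
  import akka.actor.{ ActorSystem, Address }
  import akka.cluster.Cluster
  import com.typesafe.config.ConfigFactory

  object ManualSeedNodeJoin extends App {
    // Minimal clustered system; the port and system name are placeholders.
    val system = ActorSystem("ClusterSystem", ConfigFactory.parseString("""
      akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
      akka.remote.netty.tcp.port = 2551
      akka.cluster.auto-join = off
      """))

    // Placeholder seed node addresses. If this node is the one listed first,
    // FirstSeedNodeProcess handles the join: it joins itself (forming a new
    // cluster) when all other seed nodes reply InitJoinNack or stay silent until
    // seed-node-timeout, and it joins the existing cluster as soon as one of them
    // replies InitJoinAck. Otherwise JoinSeedNodeProcess keeps retrying until
    // some seed node answers InitJoinAck.
    val seedNodes: immutable.IndexedSeq[Address] = Vector(
      Address("akka.tcp", "ClusterSystem", "host1", 2551),
      Address("akka.tcp", "ClusterSystem", "host2", 2552))

    Cluster(system).joinSeedNodes(seedNodes)
  }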