diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf
index b3e90fac21..3df6dd3774 100644
--- a/akka-cluster/src/main/resources/reference.conf
+++ b/akka-cluster/src/main/resources/reference.conf
@@ -8,13 +8,9 @@ akka {
 
   cluster {
-    join {
-      # contact point on the form of "hostname:port" of a node to try to join
-      # leave as empty string if the node should be a singleton cluster
-      contact-point = ""
-      timeout = 30s
-      max-time-to-retry = 30s
-    }
+    # node to join - the full URI defined by a string on the form of "akka://system@hostname:port"
+    # leave as empty string if the node should be a singleton cluster
+    node-to-join = ""
 
     gossip {
       initialDelay = 5s
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
index 3e709cee49..f4d57bf1f6 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala
@@ -13,19 +13,12 @@ import akka.actor.AddressFromURIString
 class ClusterSettings(val config: Config, val systemName: String) {
   import config._
 
-  // cluster config section
   val FailureDetectorThreshold = getInt("akka.cluster.failure-detector.threshold")
   val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size")
-
-  // join config
-  val JoinContactPoint: Option[Address] = getString("akka.cluster.join.contact-point") match {
+  val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match {
     case "" ⇒ None
     case AddressExtractor(addr) ⇒ Some(addr)
   }
-  val JoinTimeout = Duration(config.getMilliseconds("akka.cluster.join.timeout"), MILLISECONDS)
-  val JoinMaxTimeToRetry = Duration(config.getMilliseconds("akka.cluster.join.max-time-to-retry"), MILLISECONDS)
-
-  // gossip config
   val GossipInitialDelay = Duration(getMilliseconds("akka.cluster.gossip.initialDelay"), MILLISECONDS)
   val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip.frequency"), MILLISECONDS)
 }
diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala b/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala
index 47536ff5d2..b134a9c54c 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Gossiper.scala
@@ -48,9 +48,9 @@ case class Join(node: Address) extends ClusterMessage
  * Represents the state of the cluster; cluster ring membership, ring convergence, meta data - all versioned by a vector clock.
  */
 case class Gossip(
-  member: Member,
+  self: Member,
   // sorted set of members with their status, sorted by name
-  members: SortedSet[Member] = SortedSet.empty[Member](Ordering.fromLessThan[Member](_.address.toString > _.address.toString)),
+  members: SortedSet[Member],
   unavailableMembers: Set[Member] = Set.empty[Member],
   // for ring convergence
   seen: Map[Member, VectorClock] = Map.empty[Member, VectorClock],
@@ -97,8 +97,8 @@ final class ClusterDaemon(system: ActorSystem, gossiper: Gossiper) extends Actor
   val log = Logging(system, "ClusterDaemon")
 
   def receive = {
-    case Join(address) ⇒ sender ! gossiper.latestGossip // TODO use address in Join(address) ?
-    case gossip: Gossip ⇒ gossiper.tell(gossip)
+    case Join(address) ⇒ gossiper.joining(address)
+    case gossip: Gossip ⇒ gossiper.receive(gossip)
     case unknown ⇒ log.error("Unknown message sent to cluster daemon [" + unknown + "]")
   }
 }
@@ -118,31 +118,30 @@ final class ClusterDaemon(system: ActorSystem, gossiper: Gossiper) extends Actor
  * gossip to random deputy with certain probability depending on number of unreachable, deputy and live members.
  *
  */
-case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
+case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
 
   /**
    * Represents the state for this Gossiper. Implemented using optimistic lockless concurrency,
    * all state is represented by this immutable case class and managed by an AtomicReference.
    */
   private case class State(
-    currentGossip: Gossip,
+    latestGossip: Gossip,
+    isSingletonCluster: Boolean = true, // starts as singleton cluster
     memberMembershipChangeListeners: Set[NodeMembershipChangeListener] = Set.empty[NodeMembershipChangeListener])
 
   val remoteSettings = new RemoteSettings(system.settings.config, system.name)
   val clusterSettings = new ClusterSettings(system.settings.config, system.name)
 
-  val protocol = "akka" // TODO should this be hardcoded?
-  val address = remote.transport.address
-  val memberFingerprint = address.##
+  val remoteAddress = remote.transport.address
+  val memberFingerprint = remoteAddress.##
 
   val gossipInitialDelay = clusterSettings.GossipInitialDelay
   val gossipFrequency = clusterSettings.GossipFrequency
 
-  implicit val joinTimeout = clusterSettings.JoinTimeout
   implicit val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)
 
-  private val contactPoint: Option[Member] =
-    clusterSettings.JoinContactPoint filter (_ != address) map (address ⇒ Member(address, MemberStatus.Up))
+  private val nodeToJoin: Option[Member] =
+    clusterSettings.NodeToJoin filter (_ != remoteAddress) map (address ⇒ Member(address, MemberStatus.Joining))
 
   private val serialization = remote.serialization
   private val failureDetector = new AccrualFailureDetector(
@@ -154,31 +153,42 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
   // Is it right to put this guy under the /system path or should we have a top-level /cluster or something else...?
   private val clusterDaemon = system.systemActorOf(Props(new ClusterDaemon(system, this)), "cluster")
 
-  private val state = new AtomicReference[State](State(currentGossip = newGossip()))
+
+  private val state = {
+    val member = Member(remoteAddress, MemberStatus.Joining)
+    val gossip = Gossip(
+      self = member,
+      members = SortedSet.empty[Member](Ordering.fromLessThan[Member](_.address.toString > _.address.toString)) + member) // add joining node as Joining
+    new AtomicReference[State](State(gossip))
+  }
 
   // FIXME manage connections in some other way so we can delete the RemoteConnectionManager (SINCE IT SUCKS!!!)
   private val connectionManager = new RemoteConnectionManager(system, remote, failureDetector, Map.empty[Address, ActorRef])
 
-  log.info("Starting cluster Gossiper...")
+  log.info("Node [{}] - Starting cluster Gossiper...", remoteAddress)
 
-  // join the cluster by connecting to one of the deputy members and retrieve current cluster state (Gossip)
-  joinContactPoint(clusterSettings.JoinMaxTimeToRetry fromNow)
+  // try to join the node defined in the 'akka.cluster.node-to-join' option
+  join()
 
   // start periodic gossip and cluster scrutinization
-  val initateGossipCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency)(initateGossip())
-  val scrutinizeCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency)(scrutinize())
+  val gossipCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) {
+    gossip()
+  }
+  val scrutinizeCanceller = system.scheduler.schedule(gossipInitialDelay, gossipFrequency) {
+    scrutinize()
+  }
 
   /**
    * Shuts down all connections to other members, the cluster daemon and the periodic gossip and cleanup tasks.
    */
   def shutdown() {
     if (isRunning.compareAndSet(true, false)) {
-      log.info("Shutting down Gossiper for [{}]...", address)
+      log.info("Node [{}] - Shutting down Gossiper", remoteAddress)
       try connectionManager.shutdown() finally {
         try system.stop(clusterDaemon) finally {
-          try initateGossipCanceller.cancel() finally {
+          try gossipCanceller.cancel() finally {
             try scrutinizeCanceller.cancel() finally {
-              log.info("Gossiper for [{}] is shut down", address)
+              log.info("Node [{}] - Gossiper is shut down", remoteAddress)
             }
           }
         }
@@ -186,60 +196,90 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
     }
   }
 
-  def latestGossip: Gossip = state.get.currentGossip
+  /**
+   * Latest gossip.
+   */
+  def latestGossip: Gossip = state.get.latestGossip
 
   /**
-   * Tell the gossiper some gossip.
+   * Member status for this node.
+   */
+  def self: Member = latestGossip.self
+
+  /**
+   * Is this node a singleton cluster?
+   */
+  def isSingletonCluster: Boolean = state.get.isSingletonCluster
+
+  /**
+   * New node joining.
+   */
+  @tailrec
+  final def joining(node: Address) {
+    log.debug("Node [{}] - Node [{}] is joining", remoteAddress, node)
+    val oldState = state.get
+    val oldGossip = oldState.latestGossip
+    val oldMembers = oldGossip.members
+    val newGossip = oldGossip copy (members = oldMembers + Member(node, MemberStatus.Joining)) // add joining node as Joining
+    val newState = oldState copy (latestGossip = incrementVersionForGossip(newGossip))
+    if (!state.compareAndSet(oldState, newState)) joining(node) // recur if we failed update
+  }
+
+  /**
+   * Receive new gossip.
    */
   //@tailrec
-  final def tell(newGossip: Gossip) {
-    val gossipingNode = newGossip.member
+  final def receive(newGossip: Gossip) {
+    val from = newGossip.self
+    log.debug("Node [{}] - Receiving gossip from [{}]", remoteAddress, from.address)
 
-    failureDetector heartbeat gossipingNode.address // update heartbeat in failure detector
+    failureDetector heartbeat from.address // update heartbeat in failure detector
+
+    // FIXME set flag state.isSingletonCluster = false (if true)
 
     // FIXME all below here is WRONG - redesign with cluster convergence in mind
 
     //     val oldState = state.get
     //     println("-------- NEW VERSION " + newGossip)
-    //     println("-------- OLD VERSION " + oldState.currentGossip)
-    //     val latestGossip = VectorClock.latestVersionOf(newGossip, oldState.currentGossip)
-    //     println("-------- WINNING VERSION " + latestGossip)
+    //     println("-------- OLD VERSION " + oldState.latestGossip)
+    //     val gossip = VectorClock.latestVersionOf(newGossip, oldState.latestGossip)
+    //     println("-------- WINNING VERSION " + gossip)
 
-    //     val latestAvailableNodes = latestGossip.members
-    //     val latestUnavailableNodes = latestGossip.unavailableMembers
-    //     println("=======>>> gossipingNode: " + gossipingNode)
+    //     val latestAvailableNodes = gossip.members
+    //     val latestUnavailableNodes = gossip.unavailableMembers
+    //     println("=======>>> myself: " + myself)
     //     println("=======>>> latestAvailableNodes: " + latestAvailableNodes)
 
-    //     if (!(latestAvailableNodes contains gossipingNode) && !(latestUnavailableNodes contains gossipingNode)) {
+    //     if (!(latestAvailableNodes contains myself) && !(latestUnavailableNodes contains myself)) {
     //       println("-------- NEW NODE")
 
     //       // we have a new member
-    //       val newGossip = latestGossip copy (availableNodes = latestAvailableNodes + gossipingNode)
-    //       val newState = oldState copy (currentGossip = incrementVersionForGossip(newGossip))
+    //       val newGossip = gossip copy (availableNodes = latestAvailableNodes + myself)
+    //       val newState = oldState copy (latestGossip = incrementVersionForGossip(newGossip))
 
     //       println("--------- new GOSSIP " + newGossip.members)
     //       println("--------- new STATE " + newState)
 
     //       // if we won the race then update else try again
-    //       if (!state.compareAndSet(oldState, newState)) tell(newGossip) // recur
+    //       if (!state.compareAndSet(oldState, newState)) receive(newGossip) // recur
     //       else {
     //         println("---------- WON RACE - setting state")
 
     //         // create connections for all new members in the latest gossip
-    //         (latestAvailableNodes + gossipingNode) foreach { member ⇒
+    //         (latestAvailableNodes + myself) foreach { member ⇒
     //           setUpConnectionTo(member)
     //           oldState.memberMembershipChangeListeners foreach (_ memberConnected member) // notify listeners about the new members
     //         }
     //       }
 
-    //     } else if (latestUnavailableNodes contains gossipingNode) {
+    //     } else if (latestUnavailableNodes contains myself) {
     //       // gossip from an old former dead member
-    //       val newUnavailableMembers = latestUnavailableNodes - gossipingNode
-    //       val newMembers = latestAvailableNodes + gossipingNode
+    //       val newUnavailableMembers = latestUnavailableNodes - myself
+    //       val newMembers = latestAvailableNodes + myself
 
-    //       val newGossip = latestGossip copy (availableNodes = newMembers, unavailableNodes = newUnavailableMembers)
-    //       val newState = oldState copy (currentGossip = incrementVersionForGossip(newGossip))
+    //       val newGossip = gossip copy (availableNodes = newMembers, unavailableNodes = newUnavailableMembers)
+    //       val newState = oldState copy (latestGossip = incrementVersionForGossip(newGossip))
 
     //       // if we won the race then update else try again
-    //       if (!state.compareAndSet(oldState, newState)) tell(newGossip) // recur
-    //       else oldState.memberMembershipChangeListeners foreach (_ memberConnected gossipingNode) // notify listeners on successful update of state
+    //       if (!state.compareAndSet(oldState, newState)) receive(newGossip) // recur
+    //       else oldState.memberMembershipChangeListeners foreach (_ memberConnected myself) // notify listeners on successful update of state
     //     }
   }
 
@@ -268,49 +308,20 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
   /**
    * Joins the pre-configured contact point and retrieves current gossip state.
    */
-  private def joinContactPoint(deadline: Deadline) {
-    def tryJoinContactPoint(connection: ActorRef, deadline: Deadline) {
-      try {
-        Await.result(connection ? Join(address), joinTimeout) match {
-          case initialGossip: Gossip ⇒
-            // just sets/overwrites the state/gossip regardless of what it was before
-            // since it should be treated as the initial state
-            state.set(state.get copy (currentGossip = initialGossip))
-            log.debug("Received initial gossip [{}]", initialGossip)
-
-          case unknown ⇒
-            throw new IllegalStateException("Expected initial gossip but received [" + unknown + "]")
-        }
-      } catch {
-        case e: Exception ⇒
-          log.error("Could not join contact point node - retrying for another {} seconds", deadline.timeLeft.toSeconds)
-
-          // retry joining the cluster unless
-          // 1. Gossiper is shut down
-          // 2. The connection time window has expired
-          if (isRunning.get && deadline.timeLeft.toMillis > 0) tryJoinContactPoint(connection, deadline) // recur
-          else throw new RemoteConnectionException(
-            "Could not join contact point node - giving up after trying for " + deadline.time.toSeconds + " seconds")
-      }
-    }
-
-    contactPoint match {
-      case None ⇒ log.info("Booting up in singleton cluster mode")
-      case Some(member) ⇒
-        log.info("Trying to join contact point node defined in the configuration [{}]", member)
-        setUpConnectionTo(member) match {
-          case None ⇒ log.error("Could not set up connection to join contact point node defined in the configuration [{}]", member)
-          case Some(connection) ⇒ tryJoinContactPoint(connection, deadline)
-        }
+  private def join() = nodeToJoin foreach { member ⇒
+    setUpConnectionTo(member) foreach { connection ⇒
+      val command = Join(remoteAddress)
+      log.info("Node [{}] - Sending [{}] to [{}] through connection [{}]", remoteAddress, command, member.address, connection)
+      connection ! command
     }
   }
 
   /**
    * Initates a new round of gossip.
   */
-  private def initateGossip() {
+  private def gossip() {
     val oldState = state.get
-    val oldGossip = oldState.currentGossip
+    val oldGossip = oldState.latestGossip
 
     val oldMembers = oldGossip.members
     val oldMembersSize = oldMembers.size
@@ -331,7 +342,7 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
 
     // 3. gossip to a deputy nodes for facilitating partition healing
     val deputies = deputyNodesWithoutMyself
-    if ((!shouldGossipToDeputy || oldMembersSize < 1) && (deputies.head != address)) {
+    if ((!shouldGossipToDeputy || oldMembersSize < 1) && !deputies.isEmpty) {
       if (oldMembersSize == 0) gossipToRandomNodeOf(deputies)
       else {
         val probability = 1.0 / oldMembersSize + oldUnavailableMembersSize
@@ -341,17 +352,24 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
   }
 
   /**
-   * Gossips to a random member in the set of members passed in as argument.
+   * Gossips latest gossip to a member.
+   */
+  private def gossipTo(member: Member) {
+    setUpConnectionTo(member) foreach { _ ! latestGossip }
+  }
+
+  /**
+   * Gossips latest gossip to a random member in the set of members passed in as argument.
    *
    * @return 'true' if it gossiped to a "deputy" member.
    */
   private def gossipToRandomNodeOf(members: Seq[Member]): Boolean = {
-    val peers = members filter (_.address != address) // filter out myself
+    val peers = members filter (_.address != remoteAddress) // filter out myself
     val peer = selectRandomNode(peers)
     val oldState = state.get
-    val oldGossip = oldState.currentGossip
+    val oldGossip = oldState.latestGossip
     // if connection can't be established/found => ignore it since the failure detector will take care of the potential problem
-    setUpConnectionTo(peer) foreach { _ ! newGossip }
+    gossipTo(peer)
     deputyNodesWithoutMyself exists (peer == _)
   }
 
@@ -369,7 +387,7 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
   @tailrec
   final private def scrutinize() {
     val oldState = state.get
-    val oldGossip = oldState.currentGossip
+    val oldGossip = oldState.latestGossip
 
     val oldMembers = oldGossip.members
     val oldUnavailableMembers = oldGossip.unavailableMembers
@@ -380,7 +398,7 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
       val newUnavailableMembers = oldUnavailableMembers ++ newlyDetectedUnavailableMembers
 
       val newGossip = oldGossip copy (members = newMembers, unavailableMembers = newUnavailableMembers)
-      val newState = oldState copy (currentGossip = incrementVersionForGossip(newGossip))
+      val newState = oldState copy (latestGossip = incrementVersionForGossip(newGossip))
 
       // if we won the race then update else try again
       if (!state.compareAndSet(oldState, newState)) scrutinize() // recur
@@ -420,22 +438,17 @@ case class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) {
   private def setUpConnectionTo(member: Member): Option[ActorRef] = {
     val address = member.address
     try {
-      Some(
-        connectionManager.putIfAbsent(
-          address,
-          () ⇒ system.actorFor(RootActorPath(Address(protocol, system.name)) / "system" / "cluster")))
+      Some(connectionManager.putIfAbsent(address, () ⇒ system.actorFor(RootActorPath(address) / "system" / "cluster")))
     } catch {
       case e: Exception ⇒ None
    }
   }
 
-  private def newGossip(): Gossip = Gossip(Member(address, MemberStatus.Joining)) // starts in Joining mode
-
   private def incrementVersionForGossip(from: Gossip): Gossip = {
     from copy (version = from.version.increment(memberFingerprint, newTimestamp))
   }
 
-  private def deputyNodesWithoutMyself: Seq[Member] = Seq.empty[Member] filter (_.address != address) // FIXME read in deputy nodes from gossip data - now empty seq
+  private def deputyNodesWithoutMyself: Seq[Member] = Seq.empty[Member] filter (_.address != remoteAddress) // FIXME read in deputy nodes from gossip data - now empty seq
 
   private def selectRandomNode(members: Seq[Member]): Member = members(random.nextInt(members.size))
 }
diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
index 7f1b26e553..78c836f0b5 100644
--- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala
@@ -25,11 +25,7 @@ class ClusterConfigSpec extends AkkaSpec(
       import settings._
       FailureDetectorThreshold must be(8)
       FailureDetectorMaxSampleSize must be(1000)
-
-      JoinContactPoint must be(None)
-      JoinTimeout must be(30 seconds)
-      JoinMaxTimeToRetry must be(30 seconds)
-
+      NodeToJoin must be(None)
       GossipInitialDelay must be(5 seconds)
       GossipFrequency must be(1 second)
     }
diff --git a/akka-cluster/src/test/scala/akka/cluster/NodeStartupSpec.scala b/akka-cluster/src/test/scala/akka/cluster/NodeStartupSpec.scala
new file mode 100644
index 0000000000..4f07650f62
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/NodeStartupSpec.scala
@@ -0,0 +1,90 @@
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc.
+ */
+package akka.cluster
+
+import java.net.InetSocketAddress
+
+import akka.testkit._
+import akka.dispatch._
+import akka.actor._
+import akka.remote._
+
+import com.typesafe.config._
+
+class NodeStartupSpec extends AkkaSpec("""
+  akka {
+    loglevel = "DEBUG"
+  }
+  """) with ImplicitSender {
+
+  var gossiper0: Gossiper = _
+  var gossiper1: Gossiper = _
+  var node0: ActorSystemImpl = _
+  var node1: ActorSystemImpl = _
+
+  try {
+    node0 = ActorSystem("NodeStartupSpec", ConfigFactory
+      .parseString("""
+        akka {
+          actor.provider = "akka.remote.RemoteActorRefProvider"
+          remote.netty {
+            hostname = localhost
+            port=5550
+          }
+        }""")
+      .withFallback(system.settings.config))
+      .asInstanceOf[ActorSystemImpl]
+    val remote0 = node0.provider.asInstanceOf[RemoteActorRefProvider]
+    gossiper0 = Gossiper(node0, remote0)
+
+    "A first cluster node with a 'node-to-join' config set to empty string" must {
+      "be in 'Joining' phase when started up" in {
+        val members = gossiper0.latestGossip.members
+        val joiningMember = members find (_.address.port.get == 5550)
+        joiningMember must be('defined)
+        joiningMember.get.status must be(MemberStatus.Joining)
+      }
+
+      "be a singleton cluster when started up" in {
+        gossiper0.isSingletonCluster must be(true)
+      }
+    }
+
+    node1 = ActorSystem("NodeStartupSpec", ConfigFactory
+      .parseString("""
+        akka {
+          actor.provider = "akka.remote.RemoteActorRefProvider"
+          remote.netty {
+            hostname = localhost
+            port=5551
+          }
+          cluster.node-to-join = "akka://NodeStartupSpec@localhost:5550"
+        }""")
+      .withFallback(system.settings.config))
+      .asInstanceOf[ActorSystemImpl]
+    val remote1 = node1.provider.asInstanceOf[RemoteActorRefProvider]
+    gossiper1 = Gossiper(node1, remote1)
+
+    "A second cluster node with a 'node-to-join' config defined" must {
+      "join the other node cluster as 'Joining' when sending a Join command" in {
+        Thread.sleep(1000) // give enough time for node1 to JOIN node0
+        val members = gossiper0.latestGossip.members
+        val joiningMember = members find (_.address.port.get == 5551)
+        joiningMember must be('defined)
+        joiningMember.get.status must be(MemberStatus.Joining)
+      }
+    }
+  } catch {
+    case e: Exception ⇒
+      e.printStackTrace
+      fail(e.toString)
+  }
+
+  override def atTermination() {
+    gossiper0.shutdown()
+    node0.shutdown()
+    gossiper1.shutdown()
+    node1.shutdown()
+  }
+}
diff --git a/akka-remote/src/main/scala/akka/remote/RemoteAddress.scala b/akka-remote/src/main/scala/akka/remote/RemoteAddress.scala
deleted file mode 100644
index f7274c2356..0000000000
--- a/akka-remote/src/main/scala/akka/remote/RemoteAddress.scala
+++ /dev/null
@@ -1,5 +0,0 @@
-/**
- * Copyright (C) 2009-2012 Typesafe Inc.
- */
-package akka.remote
-
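
Usage note, not part of the patch: a minimal sketch, modelled directly on NodeStartupSpec above, of how a node can be pointed at a seed node through the new 'akka.cluster.node-to-join' setting. The object name, hostname and ports are hypothetical; as in the test, constructing the Gossiper is what triggers the Join command.

package akka.cluster

import akka.actor.{ ActorSystem, ActorSystemImpl }
import akka.remote.RemoteActorRefProvider
import com.typesafe.config.ConfigFactory

// Hypothetical example object; system name, hostname and ports are made up.
object NodeToJoinExample extends App {
  val config = ConfigFactory.parseString("""
    akka {
      actor.provider = "akka.remote.RemoteActorRefProvider"
      remote.netty {
        hostname = localhost
        port = 5552
      }
      # full URI of the node to join; leave as "" to start as a singleton cluster
      cluster.node-to-join = "akka://NodeToJoinExample@localhost:5550"
    }
    """)

  // Mirrors NodeStartupSpec: constructing the Gossiper sends Join(remoteAddress)
  // to the configured node, which then adds this node to its gossip as Joining.
  val system = ActorSystem("NodeToJoinExample", config).asInstanceOf[ActorSystemImpl]
  val remote = system.provider.asInstanceOf[RemoteActorRefProvider]
  val gossiper = Gossiper(system, remote)
}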