diff --git a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala index c55e4f1d26..fc718acd8a 100644 --- a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala +++ b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala @@ -15,27 +15,26 @@ import akka.actor._ import com.typesafe.config.Config import akka.actor.Terminated import akka.io.IO.HasFailureMessage +import akka.util.Helpers.Requiring abstract class SelectionHandlerSettings(config: Config) { import config._ - val MaxChannels = getString("max-channels") match { + val MaxChannels: Int = getString("max-channels") match { case "unlimited" ⇒ -1 - case _ ⇒ getInt("max-channels") + case _ ⇒ getInt("max-channels") requiring (_ > 0, "max-channels must be > 0 or 'unlimited'") } - val SelectTimeout = getString("select-timeout") match { + val SelectTimeout: Duration = getString("select-timeout") match { case "infinite" ⇒ Duration.Inf - case x ⇒ Duration(x) + case _ ⇒ Duration(getMilliseconds("select-timeout"), MILLISECONDS) requiring ( + _ >= Duration.Zero, "select-timeout must not be negative") } - val SelectorAssociationRetries = getInt("selector-association-retries") + val SelectorAssociationRetries: Int = getInt("selector-association-retries") requiring ( + _ >= 0, "selector-association-retries must be >= 0") - val SelectorDispatcher = getString("selector-dispatcher") - val WorkerDispatcher = getString("worker-dispatcher") - val TraceLogging = getBoolean("trace-logging") - - require(MaxChannels == -1 || MaxChannels > 0, "max-channels must be > 0 or 'unlimited'") - require(SelectTimeout >= Duration.Zero, "select-timeout must not be negative") - require(SelectorAssociationRetries >= 0, "selector-association-retries must be >= 0") + val SelectorDispatcher: String = getString("selector-dispatcher") + val WorkerDispatcher: String = getString("worker-dispatcher") + val TraceLogging: Boolean = getBoolean("trace-logging") def MaxChannelsPerSelector: Int @@ -180,7 +179,7 @@ private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandler SelectTimeout match { case Duration.Zero ⇒ () ⇒ selector.selectNow() case Duration.Inf ⇒ () ⇒ selector.select() - case x ⇒ val millis = x.toMillis; () ⇒ selector.select(millis) + case x ⇒ { val millis = x.toMillis; () ⇒ selector.select(millis) } } def tryRun() { if (doSelect() > 0) { @@ -197,7 +196,7 @@ private[io] class SelectionHandler(manager: ActorRef, settings: SelectionHandler readyOps match { case OP_READ ⇒ connection ! ChannelReadable case OP_WRITE ⇒ connection ! ChannelWritable - case OP_READ_AND_WRITE ⇒ connection ! ChannelWritable; connection ! ChannelReadable + case OP_READ_AND_WRITE ⇒ { connection ! ChannelWritable; connection ! ChannelReadable } case x if (x & OP_ACCEPT) > 0 ⇒ connection ! ChannelAcceptable case x if (x & OP_CONNECT) > 0 ⇒ connection ! 
ChannelConnectable case x ⇒ log.warning("Invalid readyOps: [{}]", x) diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index 89ad3e1644..aa5d71adcd 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -11,6 +11,7 @@ import com.typesafe.config.Config import scala.concurrent.duration._ import scala.collection.immutable import akka.util.ByteString +import akka.util.Helpers.Requiring import akka.actor._ import java.lang.{ Iterable ⇒ JIterable } @@ -173,33 +174,27 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension { class Settings private[TcpExt] (_config: Config) extends SelectionHandlerSettings(_config) { import _config._ - val NrOfSelectors = getInt("nr-of-selectors") + val NrOfSelectors: Int = getInt("nr-of-selectors") requiring (_ > 0, "nr-of-selectors must be > 0") - val BatchAcceptLimit = getInt("batch-accept-limit") - val DirectBufferSize = getIntBytes("direct-buffer-size") - val MaxDirectBufferPoolSize = getInt("direct-buffer-pool-limit") - val RegisterTimeout = getString("register-timeout") match { + val BatchAcceptLimit: Int = getInt("batch-accept-limit") requiring (_ > 0, "batch-accept-limit must be > 0") + val DirectBufferSize: Int = getIntBytes("direct-buffer-size") + val MaxDirectBufferPoolSize: Int = getInt("direct-buffer-pool-limit") + val RegisterTimeout: Duration = getString("register-timeout") match { case "infinite" ⇒ Duration.Undefined - case x ⇒ Duration(x) + case x ⇒ Duration(getMilliseconds("register-timeout"), MILLISECONDS) } - val ReceivedMessageSizeLimit = getString("max-received-message-size") match { + val ReceivedMessageSizeLimit: Int = getString("max-received-message-size") match { case "unlimited" ⇒ Int.MaxValue case x ⇒ getIntBytes("received-message-size-limit") } - val ManagementDispatcher = getString("management-dispatcher") + val ManagementDispatcher: String = getString("management-dispatcher") val FileIODispatcher = getString("file-io-dispatcher") val TransferToLimit = getString("file-io-transferTo-limit") match { case "unlimited" ⇒ Int.MaxValue case _ ⇒ getIntBytes("file-io-transferTo-limit") } - require(NrOfSelectors > 0, "nr-of-selectors must be > 0") - require(MaxChannels == -1 || MaxChannels > 0, "max-channels must be > 0 or 'unlimited'") - require(SelectTimeout >= Duration.Zero, "select-timeout must not be negative") - require(SelectorAssociationRetries >= 0, "selector-association-retries must be >= 0") - require(BatchAcceptLimit > 0, "batch-accept-limit must be > 0") - - val MaxChannelsPerSelector = if (MaxChannels == -1) -1 else math.max(MaxChannels / NrOfSelectors, 1) + val MaxChannelsPerSelector: Int = if (MaxChannels == -1) -1 else math.max(MaxChannels / NrOfSelectors, 1) private[this] def getIntBytes(path: String): Int = { val size = getBytes(path) diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index 83bd842272..602c196711 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -92,8 +92,8 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, doWrite(handler) if (!writePending) // writing is now finished handleClose(handler, closeCommander, closedEvent) - case SendBufferFull(remaining) ⇒ pendingWrite = remaining; selector ! 
WriteInterest - case WriteFileFinished ⇒ pendingWrite = null; handleClose(handler, closeCommander, closedEvent) + case SendBufferFull(remaining) ⇒ { pendingWrite = remaining; selector ! WriteInterest } + case WriteFileFinished ⇒ { pendingWrite = null; handleClose(handler, closeCommander, closedEvent) } case WriteFileFailed(e) ⇒ handleError(handler, e) // rethrow exception from dispatcher task case Abort ⇒ handleClose(handler, Some(sender), Aborted) @@ -122,7 +122,7 @@ private[io] abstract class TcpConnection(val channel: SocketChannel, pendingWrite = createWrite(write) doWrite(handler) - case SendBufferFull(remaining) ⇒ pendingWrite = remaining; selector ! WriteInterest + case SendBufferFull(remaining) ⇒ { pendingWrite = remaining; selector ! WriteInterest } case WriteFileFinished ⇒ pendingWrite = null case WriteFileFailed(e) ⇒ handleError(handler, e) // rethrow exception from dispatcher task } diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index 9036d190f3..6c7f2e68fe 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -90,7 +90,7 @@ private[io] class TcpListener(val selectorRouter: ActorRef, if (limit > 0) { try channel.accept() catch { - case NonFatal(e) ⇒ log.error(e, "Accept error: could not accept new connection due to {}", e); null + case NonFatal(e) ⇒ { log.error(e, "Accept error: could not accept new connection due to {}", e); null } } } else null if (socketChannel != null) { diff --git a/akka-actor/src/main/scala/akka/io/Udp.scala b/akka-actor/src/main/scala/akka/io/Udp.scala index d6c6e0589a..198b41ce54 100644 --- a/akka-actor/src/main/scala/akka/io/Udp.scala +++ b/akka-actor/src/main/scala/akka/io/Udp.scala @@ -12,6 +12,7 @@ import akka.actor.ActorRef import akka.actor.ExtensionKey import akka.actor.ActorSystem import akka.util.ByteString +import akka.util.Helpers.Requiring import java.net.InetSocketAddress import scala.collection.immutable @@ -80,17 +81,14 @@ object Udp extends ExtensionKey[UdpExt] { private[io] class UdpSettings(_config: Config) extends SelectionHandlerSettings(_config) { import _config._ - val NrOfSelectors = getInt("nr-of-selectors") - val DirectBufferSize = getIntBytes("direct-buffer-size") - val MaxDirectBufferPoolSize = getInt("direct-buffer-pool-limit") - val BatchReceiveLimit = getInt("receive-throughput") + val NrOfSelectors: Int = getInt("nr-of-selectors") requiring (_ > 0, "nr-of-selectors must be > 0") + val DirectBufferSize: Int = getIntBytes("direct-buffer-size") + val MaxDirectBufferPoolSize: Int = getInt("direct-buffer-pool-limit") + val BatchReceiveLimit: Int = getInt("receive-throughput") - val ManagementDispatcher = getString("management-dispatcher") + val ManagementDispatcher: String = getString("management-dispatcher") - // FIXME: Use new requiring - require(NrOfSelectors > 0, "nr-of-selectors must be > 0") - - override val MaxChannelsPerSelector = if (MaxChannels == -1) -1 else math.max(MaxChannels / NrOfSelectors, 1) + override val MaxChannelsPerSelector: Int = if (MaxChannels == -1) -1 else math.max(MaxChannels / NrOfSelectors, 1) private[this] def getIntBytes(path: String): Int = { val size = getBytes(path) diff --git a/akka-cluster/src/main/protobuf/ClusterMessages.proto b/akka-cluster/src/main/protobuf/ClusterMessages.proto index ebbb04baf7..068068b0d3 100644 --- a/akka-cluster/src/main/protobuf/ClusterMessages.proto +++ b/akka-cluster/src/main/protobuf/ClusterMessages.proto @@ -9,14 +9,6 
@@ option optimize_for = SPEED; * Cluster User Messages ****************************************/ -/** - * Join - */ -message Join { - required Address address = 1; - repeated string roles = 2; -} - /** * Leave * Sends an Address @@ -31,6 +23,22 @@ message Join { * Internal Cluster Action Messages ****************************************/ +/** + * Join + */ +message Join { + required UniqueAddress node = 1; + repeated string roles = 2; +} + +/** + * Welcome, reply to Join + */ +message Welcome { + required UniqueAddress from = 1; + required Gossip gossip = 2; +} + /** * InitJoin * Sends Empty @@ -52,12 +60,12 @@ message Join { /** * Exit - * Sends an Address + * Sends a UniqueAddress */ /** - * Remove - * Sends an Address + * Shutdown + * Sends a UniqueAddress */ @@ -88,16 +96,17 @@ message Join { * Gossip Envelope */ message GossipEnvelope { - required Address from = 1; - required Gossip gossip = 2; - required bool conversation = 3; + required UniqueAddress from = 1; + required UniqueAddress to = 2; + required Gossip gossip = 3; + required bool conversation = 4; } /** * Gossip */ message Gossip { - repeated Address allAddresses = 1; + repeated UniqueAddress allAddresses = 1; repeated string allRoles = 2; repeated string allHashes = 3; repeated Member members = 4; @@ -222,3 +231,11 @@ message Address { required uint32 port = 3; optional string protocol = 4; } + +/** + * Defines a remote address with uid. + */ +message UniqueAddress { + required Address address = 1; + required uint32 uid = 2; +} diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 71b10b6ce6..b34c0c9eb1 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -18,6 +18,10 @@ akka { # how long to wait for one of the seed nodes to reply to initial join request seed-node-timeout = 5s + # If a join request fails it will be retried after this period. + # Disable join retry by specifying "off". + retry-unsuccessful-join-after = 10s + # Automatic join the seed-nodes at startup. # If seed-nodes is empty it will join itself and become a single node cluster. auto-join = on @@ -69,7 +73,8 @@ akka { unreachable-nodes-reaper-interval = 1s # How often the current internal stats should be published. - # A value of 0 s can be used to always publish the stats, when it happens. + # A value of 0s can be used to always publish the stats, when it happens. + # Disable with "off". publish-stats-interval = 10s # The id of the dispatcher to use for cluster actors. If not specified diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 05697baf03..92aa1d92f5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -32,10 +32,6 @@ import scala.util.control.NonFatal /** * Cluster Extension Id and factory for creating Cluster extension. - * Example: - * {{{ - * if (Cluster(system).isLeader) { ... } - * }}} */ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { override def get(system: ActorSystem): Cluster = super.get(system) @@ -53,11 +49,6 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { * During each round of gossip exchange it sends Gossip to random node with * newer or older state information, if any, based on the current gossip overview, * with some probability. Otherwise Gossip to any random live node. 
- * - * Example: - * {{{ - * if (Cluster(system).isLeader) { ... } - * }}} */ class Cluster(val system: ExtendedActorSystem) extends Extension { @@ -66,13 +57,21 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { val settings = new ClusterSettings(system.settings.config, system.name) import settings._ - val selfAddress: Address = system.provider match { - case c: ClusterActorRefProvider ⇒ c.transport.defaultAddress + /** + * INTERNAL API + */ + private[cluster] val selfUniqueAddress: UniqueAddress = system.provider match { + case c: ClusterActorRefProvider ⇒ + UniqueAddress(c.transport.defaultAddress, AddressUidExtension(system).addressUid) case other ⇒ throw new ConfigurationException( - "ActorSystem [%s] needs to have a 'ClusterActorRefProvider' enabled in the configuration, currently uses [%s]". - format(system, other.getClass.getName)) + s"ActorSystem [${system}] needs to have a 'ClusterActorRefProvider' enabled in the configuration, currently uses [${other.getClass.getName}]") } + /** + * The address of this cluster member. + */ + def selfAddress: Address = selfUniqueAddress.address + /** * roles that this member has */ @@ -241,10 +240,10 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { /** * Try to join this cluster node with the node specified by 'address'. - * A 'Join(thisNodeAddress)' command is sent to the node to join. + * A 'Join(selfAddress)' command is sent to the node to join. */ def join(address: Address): Unit = - clusterCore ! InternalClusterAction.JoinTo(address) + clusterCore ! ClusterUserAction.JoinTo(address) /** * Send command to issue state transition to LEAVING for the node specified by 'address'. diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index ca42a0f64a..71024dffc8 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -23,16 +23,19 @@ import akka.actor.ActorSelection trait ClusterMessage extends Serializable /** - * Cluster commands sent by the USER. + * INTERNAL API + * Cluster commands sent by the USER via + * [[akka.cluster.Cluster]] extension + * or JMX. */ -object ClusterUserAction { +private[cluster] object ClusterUserAction { /** - * Command to join the cluster. Sent when a node (represented by 'address') - * wants to join another node (the receiver). + * Command to initiate join another node (represented by `address`). + * Join will be sent to the other node. */ @SerialVersionUID(1L) - case class Join(address: Address, roles: Set[String]) extends ClusterMessage + case class JoinTo(address: Address) /** * Command to leave the cluster. @@ -54,10 +57,18 @@ object ClusterUserAction { private[cluster] object InternalClusterAction { /** - * Command to initiate join another node (represented by 'address'). - * Join will be sent to the other node. + * Command to join the cluster. Sent when a node wants to join another node (the receiver). + * @param node the node that wants to join the cluster */ - case class JoinTo(address: Address) + @SerialVersionUID(1L) + case class Join(node: UniqueAddress, roles: Set[String]) extends ClusterMessage + + /** + * Reply to Join + * @param from the sender node in the cluster, i.e. 
the node that received the Join command + */ + @SerialVersionUID(1L) + case class Welcome(from: UniqueAddress, gossip: Gossip) extends ClusterMessage /** * Command to initiate the process to join the specified @@ -134,7 +145,6 @@ private[cluster] object InternalClusterAction { sealed trait PublishMessage case class PublishChanges(newGossip: Gossip) extends PublishMessage case class PublishEvent(event: ClusterDomainEvent) extends PublishMessage - case object PublishStart extends PublishMessage } /** @@ -147,15 +157,17 @@ private[cluster] object ClusterLeaderAction { /** * Command to mark a node to be removed from the cluster immediately. * Can only be sent by the leader. + * @param node the node to exit, i.e. destination of the message */ @SerialVersionUID(1L) - case class Exit(address: Address) extends ClusterMessage + case class Exit(node: UniqueAddress) extends ClusterMessage /** * Command to remove a node from the cluster immediately. + * @param node the node to shutdown, i.e. destination of the message */ @SerialVersionUID(1L) - case class Remove(address: Address) extends ClusterMessage + case class Shutdown(node: UniqueAddress) extends ClusterMessage } /** @@ -227,11 +239,11 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto import InternalClusterAction._ val cluster = Cluster(context.system) - import cluster.{ selfAddress, scheduler, failureDetector } + import cluster.{ selfAddress, selfUniqueAddress, scheduler, failureDetector } import cluster.settings._ - // FIXME the UUID should not be needed when Address contains uid, ticket #2788 - val vclockNode = VectorClock.Node(selfAddress.toString + "-" + UUID.randomUUID()) + def vclockName(node: UniqueAddress): String = node.address + "-" + node.uid + val vclockNode = VectorClock.Node(vclockName(selfUniqueAddress)) // note that self is not initially member, // and the Gossip is not versioned for this 'Node' yet @@ -241,8 +253,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto var seedNodeProcess: Option[ActorRef] = None - var tryingToJoinWith: Option[Address] = None - /** * Looks up and returns the remote cluster command connection for the specific address. */ @@ -267,10 +277,11 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto LeaderActionsInterval, self, LeaderActionsTick) // start periodic publish of current stats - val publishStatsTask: Option[Cancellable] = - if (PublishStatsInterval == Duration.Zero) None - else Some(scheduler.schedule(PeriodicTasksInitialDelay.max(PublishStatsInterval), - PublishStatsInterval, self, PublishStatsTick)) + val publishStatsTask: Option[Cancellable] = PublishStatsInterval match { + case Duration.Zero | Duration.Undefined | Duration.Inf ⇒ None + case d: FiniteDuration ⇒ + Some(scheduler.schedule(PeriodicTasksInitialDelay.max(d), d, self, PublishStatsTick)) + } override def preStart(): Unit = { if (AutoJoin) self ! JoinSeedNodes(SeedNodes) @@ -284,28 +295,47 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto } def uninitialized: Actor.Receive = { - case InitJoin ⇒ sender ! InitJoinNack(selfAddress) - case JoinTo(address) ⇒ join(address) - case JoinSeedNodes(seedNodes) ⇒ joinSeedNodes(seedNodes) + case InitJoin ⇒ sender ! 
InitJoinNack(selfAddress) + case ClusterUserAction.JoinTo(address) ⇒ join(address) + case JoinSeedNodes(seedNodes) ⇒ joinSeedNodes(seedNodes) + case msg: SubscriptionMessage ⇒ publisher forward msg + case _: Tick ⇒ // ignore periodic tasks until initialized + } + + def tryingToJoin(joinWith: Address, deadline: Option[Deadline]): Actor.Receive = { + case Welcome(from, gossip) ⇒ welcome(joinWith, from, gossip) + case InitJoin ⇒ sender ! InitJoinNack(selfAddress) + case ClusterUserAction.JoinTo(address) ⇒ + context.become(uninitialized) + join(address) + case JoinSeedNodes(seedNodes) ⇒ + context.become(uninitialized) + joinSeedNodes(seedNodes) case msg: SubscriptionMessage ⇒ publisher forward msg - case _: Tick ⇒ // ignore periodic tasks until initialized + case _: Tick ⇒ + if (deadline.exists(_.isOverdue)) { + context.become(uninitialized) + if (AutoJoin) joinSeedNodes(SeedNodes) + else join(joinWith) + } } def initialized: Actor.Receive = { - case msg: GossipEnvelope ⇒ receiveGossip(msg) - case GossipTick ⇒ gossip() - case ReapUnreachableTick ⇒ reapUnreachableMembers() - case LeaderActionsTick ⇒ leaderActions() - case PublishStatsTick ⇒ publishInternalStats() - case InitJoin ⇒ initJoin() - case JoinTo(address) ⇒ join(address) - case ClusterUserAction.Join(address, roles) ⇒ joining(address, roles) - case ClusterUserAction.Down(address) ⇒ downing(address) - case ClusterUserAction.Leave(address) ⇒ leaving(address) - case Exit(address) ⇒ exiting(address) - case Remove(address) ⇒ removing(address) - case SendGossipTo(address) ⇒ gossipTo(address) - case msg: SubscriptionMessage ⇒ publisher forward msg + case msg: GossipEnvelope ⇒ receiveGossip(msg) + case GossipTick ⇒ gossip() + case ReapUnreachableTick ⇒ reapUnreachableMembers() + case LeaderActionsTick ⇒ leaderActions() + case PublishStatsTick ⇒ publishInternalStats() + case InitJoin ⇒ initJoin() + case Join(node, roles) ⇒ joining(node, roles) + case ClusterUserAction.Down(address) ⇒ downing(address) + case ClusterUserAction.Leave(address) ⇒ leaving(address) + case Exit(node) ⇒ exiting(node) + case Shutdown(node) ⇒ shutdown(node) + case SendGossipTo(address) ⇒ sendGossipTo(address) + case msg: SubscriptionMessage ⇒ publisher forward msg + case ClusterUserAction.JoinTo(address) ⇒ + log.info("Trying to join [{}] when already part of a cluster, ignoring", address) } @@ -322,7 +352,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto require(seedNodeProcess.isEmpty, "Join seed nodes is already in progress") seedNodeProcess = if (seedNodes.isEmpty || seedNodes == immutable.IndexedSeq(selfAddress)) { - self ! JoinTo(selfAddress) + self ! ClusterUserAction.JoinTo(selfAddress) None } else if (seedNodes.head == selfAddress) { Some(context.actorOf(Props(new FirstSeedNodeProcess(seedNodes)). @@ -334,8 +364,10 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto } /** - * Try to join this cluster node with the node specified by 'address'. - * A 'Join(thisNodeAddress)' command is sent to the node to join. + * Try to join this cluster node with the node specified by `address`. + * It's only allowed to join from an empty state, i.e. when not already a member. + * A `Join(selfUniqueAddress)` command is sent to the node to join, + * which will reply with a `Welcome` message. 
*/ def join(address: Address): Unit = { if (address.protocol != selfAddress.protocol) @@ -344,7 +376,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto else if (address.system != selfAddress.system) log.warning("Trying to join member with wrong ActorSystem name, but was ignored, expected [{}] but was [{}]", selfAddress.system, address.system) - else if (!latestGossip.members.exists(_.address == address)) { + else { + require(latestGossip.members.isEmpty, "Join can only be done from empty state") + // to support manual join when joining to seed nodes is stuck (no seed nodes available) val snd = sender seedNodeProcess match { @@ -358,61 +392,63 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto case None ⇒ // no seedNodeProcess in progress } - // only wipe the state if we're not in the process of joining this address - if (tryingToJoinWith.forall(_ != address)) { - tryingToJoinWith = Some(address) - // wipe our state since a node that joins a cluster must be empty - latestGossip = Gossip.empty - // wipe the failure detector since we are starting fresh and shouldn't care about the past - failureDetector.reset() - // wipe the publisher since we are starting fresh - publisher ! PublishStart - - publish(latestGossip) + if (address == selfAddress) { + context.become(initialized) + joining(selfUniqueAddress, cluster.selfRoles) + } else { + val joinDeadline = RetryUnsuccessfulJoinAfter match { + case Duration.Undefined | Duration.Inf ⇒ None + case d: FiniteDuration ⇒ Some(Deadline.now + d) + } + context.become(tryingToJoin(address, joinDeadline)) + clusterCore(address) ! Join(selfUniqueAddress, cluster.selfRoles) } - context.become(initialized) - if (address == selfAddress) - joining(address, cluster.selfRoles) - else - clusterCore(address) ! ClusterUserAction.Join(selfAddress, cluster.selfRoles) } } /** * State transition to JOINING - new node joining. + * Received `Join` message and replies with `Welcome` message, containing + * current gossip state, including the new joining member. 
*/ - def joining(node: Address, roles: Set[String]): Unit = { - if (node.protocol != selfAddress.protocol) + def joining(node: UniqueAddress, roles: Set[String]): Unit = { + if (node.address.protocol != selfAddress.protocol) log.warning("Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]", - selfAddress.protocol, node.protocol) - else if (node.system != selfAddress.system) + selfAddress.protocol, node.address.protocol) + else if (node.address.system != selfAddress.system) log.warning("Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]", - selfAddress.system, node.system) + selfAddress.system, node.address.system) else { val localMembers = latestGossip.members val localUnreachable = latestGossip.overview.unreachable - val alreadyMember = localMembers.exists(_.address == node) - val isUnreachable = localUnreachable.exists(_.address == node) + // check by address without uid to make sure that node with same host:port is not allowed + // to join until previous node with that host:port has been removed from the cluster + val alreadyMember = localMembers.exists(_.address == node.address) + val isUnreachable = localUnreachable.exists(_.address == node.address) - if (!alreadyMember && !isUnreachable) { + if (alreadyMember) + log.info("Existing member [{}] is trying to join, ignoring", node) + else if (isUnreachable) + log.info("Unreachable member [{}] is trying to join, ignoring", node) + else { // remove the node from the failure detector - failureDetector.remove(node) + failureDetector.remove(node.address) // add joining node as Joining // add self in case someone else joins before self has joined (Set discards duplicates) - val newMembers = localMembers + Member(node, Joining, roles) + Member(selfAddress, Joining, cluster.selfRoles) + val newMembers = localMembers + Member(node, roles) + Member(selfUniqueAddress, cluster.selfRoles) val newGossip = latestGossip copy (members = newMembers) val versionedGossip = newGossip :+ vclockNode - val seenVersionedGossip = versionedGossip seen selfAddress + val seenVersionedGossip = versionedGossip seen selfUniqueAddress latestGossip = seenVersionedGossip - log.info("Cluster Node [{}] - Node [{}] is JOINING, roles [{}]", selfAddress, node, roles.mkString(", ")) - if (node != selfAddress) { - gossipTo(node) + log.info("Cluster Node [{}] - Node [{}] is JOINING, roles [{}]", selfAddress, node.address, roles.mkString(", ")) + if (node != selfUniqueAddress) { + clusterCore(node.address) ! Welcome(selfUniqueAddress, latestGossip) } publish(latestGossip) @@ -420,8 +456,27 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto } } + /** + * Reply from Join request. + */ + def welcome(joinWith: Address, from: UniqueAddress, gossip: Gossip): Unit = { + require(latestGossip.members.isEmpty, "Join can only be done from empty state") + if (joinWith != from.address) + log.info("Ignoring welcome from [{}] when trying to join with [{}]", from.address, joinWith) + else { + log.info("Cluster Node [{}] - Welcome from [{}]", selfAddress, from.address) + latestGossip = gossip seen selfUniqueAddress + publish(latestGossip) + if (from != selfUniqueAddress) + oneWayGossipTo(from) + context.become(initialized) + } + } + /** * State transition to LEAVING. + * The node will eventually be removed by the leader, after hand-off in EXITING, and only after + * removal a new node with same address can join the cluster through the normal joining procedure. 
*/ def leaving(address: Address): Unit = { // only try to update if the node is available (in the member ring) @@ -430,7 +485,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val newGossip = latestGossip copy (members = newMembers) val versionedGossip = newGossip :+ vclockNode - val seenVersionedGossip = versionedGossip seen selfAddress + val seenVersionedGossip = versionedGossip seen selfUniqueAddress latestGossip = seenVersionedGossip @@ -442,31 +497,29 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto /** * State transition to EXITING. */ - def exiting(address: Address): Unit = { - log.info("Cluster Node [{}] - Marked node [{}] as [{}]", selfAddress, address, Exiting) - // FIXME implement when we implement hand-off - } + def exiting(node: UniqueAddress): Unit = + if (node == selfUniqueAddress) { + log.info("Cluster Node [{}] - Marked as [{}]", selfAddress, Exiting) + // TODO implement when we need hand-off + } /** - * State transition to REMOVED. - * - * This method is for now only called after the LEADER have sent a Removed message - telling the node + * This method is only called after the LEADER has sent a Shutdown message - telling the node * to shut down himself. - * - * In the future we might change this to allow the USER to send a Removed(address) message telling an - * arbitrary node to be moved directly from UP -> REMOVED. */ - def removing(address: Address): Unit = { - log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress) - cluster.shutdown() - } + def shutdown(node: UniqueAddress): Unit = + if (node == selfUniqueAddress) { + log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress) + cluster.shutdown() + } /** - * The node to DOWN is removed from the 'members' set and put in the 'unreachable' set (if not already there) - * and its status is set to DOWN. The node is also removed from the 'seen' table. + * State transition to DOW. + * The node to DOWN is removed from the `members` set and put in the `unreachable` set (if not already there) + * and its status is set to DOWN. The node is also removed from the `seen` table. * - * The node will reside as DOWN in the 'unreachable' set until an explicit command JOIN command is sent directly - * to this node and it will then go through the normal JOINING procedure. + * The node will eventually be removed by the leader, and only after removal a new node with same address can + * join the cluster through the normal joining procedure. */ def downing(address: Address): Unit = { val localGossip = latestGossip @@ -475,7 +528,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val localSeen = localOverview.seen val localUnreachableMembers = localOverview.unreachable - // 1. check if the node to DOWN is in the 'members' set + // 1. check if the node to DOWN is in the `members` set val downedMember: Option[Member] = localMembers.collectFirst { case m if m.address == address ⇒ m.copy(status = Down) } @@ -486,7 +539,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto case None ⇒ localMembers } - // 2. check if the node to DOWN is in the 'unreachable' set + // 2. 
check if the node to DOWN is in the `unreachable` set val newUnreachableMembers = localUnreachableMembers.map { member ⇒ // no need to DOWN members already DOWN @@ -496,17 +549,17 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto } else member } - // 3. add the newly DOWNED members from the 'members' (in step 1.) to the 'newUnreachableMembers' set. + // 3. add the newly DOWNED members from the `members` (in step 1.) to the `newUnreachableMembers` set. val newUnreachablePlusNewlyDownedMembers = newUnreachableMembers ++ downedMember - // 4. remove nodes marked as DOWN from the 'seen' table - val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect { case m if m.status == Down ⇒ m.address } + // 4. remove nodes marked as DOWN from the `seen` table + val newSeen = localSeen -- newUnreachablePlusNewlyDownedMembers.collect { case m if m.status == Down ⇒ m.uniqueAddress } // update gossip overview val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachablePlusNewlyDownedMembers) val newGossip = localGossip copy (overview = newOverview, members = newMembers) // update gossip val versionedGossip = newGossip :+ vclockNode - latestGossip = versionedGossip seen selfAddress + latestGossip = versionedGossip seen selfUniqueAddress publish(latestGossip) } @@ -519,14 +572,17 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val remoteGossip = envelope.gossip val localGossip = latestGossip - if (remoteGossip.overview.unreachable.exists(_.address == selfAddress)) { - log.debug("Ignoring received gossip with self [{}] as unreachable, from [{}]", selfAddress, from) - } else if (localGossip.overview.isNonDownUnreachable(from)) { - log.debug("Ignoring received gossip from unreachable [{}] ", from) - } else { - // if we're in the remote gossip and not Removed, then we're not joining - if (tryingToJoinWith.nonEmpty && remoteGossip.member(selfAddress).status != Removed) - tryingToJoinWith = None + if (envelope.to != selfUniqueAddress) + log.info("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to) + if (remoteGossip.overview.unreachable.exists(_.address == selfAddress)) + log.info("Ignoring received gossip with myself as unreachable, from [{}]", selfAddress, from.address) + else if (localGossip.overview.unreachable.exists(_.uniqueAddress == from)) + log.info("Ignoring received gossip from unreachable [{}] ", from) + else if (localGossip.members.forall(_.uniqueAddress != from)) + log.info("Ignoring received gossip from unknown [{}]", from) + else if (remoteGossip.members.forall(_.uniqueAddress != selfUniqueAddress)) + log.info("Ignoring received gossip that does not contain myself, from [{}]", from) + else { val comparison = remoteGossip.version tryCompareTo localGossip.version val conflict = comparison.isEmpty @@ -537,17 +593,17 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto (remoteGossip merge localGossip, true, stats.incrementMergeCount) case Some(0) ⇒ // same version - (remoteGossip mergeSeen localGossip, !remoteGossip.seenByAddress(selfAddress), stats.incrementSameCount) + (remoteGossip mergeSeen localGossip, !remoteGossip.seenByNode(selfUniqueAddress), stats.incrementSameCount) case Some(x) if x < 0 ⇒ // local is newer (localGossip, true, stats.incrementNewerCount) case _ ⇒ // remote is newer - (remoteGossip, !remoteGossip.seenByAddress(selfAddress), stats.incrementOlderCount) + (remoteGossip, 
!remoteGossip.seenByNode(selfUniqueAddress), stats.incrementOlderCount) } stats = newStats - latestGossip = winningGossip seen selfAddress + latestGossip = winningGossip seen selfUniqueAddress // for all new joining nodes we remove them from the failure detector latestGossip.members foreach { @@ -587,31 +643,32 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val preferredGossipTargets = if (ThreadLocalRandom.current.nextDouble() < GossipDifferentViewProbability) { // If it's time to try to gossip to some nodes with a different view // gossip to a random alive member with preference to a member with older or newer gossip version - val localMemberAddressesSet = localGossip.members map { _.address } + val localMemberAddressesSet = localGossip.members map { _.uniqueAddress } val nodesWithDifferentView = for { - (address, version) ← localGossip.overview.seen - if localMemberAddressesSet contains address + (node, version) ← localGossip.overview.seen + if localMemberAddressesSet contains node if version != localGossip.version - } yield address + } yield node nodesWithDifferentView.toIndexedSeq - } else Vector.empty[Address] + } else Vector.empty[UniqueAddress] gossipToRandomNodeOf( if (preferredGossipTargets.nonEmpty) preferredGossipTargets - else localGossip.members.toIndexedSeq.map(_.address) // Fall back to localGossip; important to not accidentally use `map` of the SortedSet, since the original order is not preserved) + else localGossip.members.toIndexedSeq.map(_.uniqueAddress) // Fall back to localGossip; important to not accidentally use `map` of the SortedSet, since the original order is not preserved) ) } } /** - * Runs periodic leader actions, such as auto-downing unreachable nodes, assigning partitions etc. + * Runs periodic leader actions, such as member status transitions, auto-downing unreachable nodes, + * assigning partitions etc. */ def leaderActions(): Unit = { val localGossip = latestGossip val localMembers = localGossip.members - val isLeader = localGossip.isLeader(selfAddress) + val isLeader = localGossip.isLeader(selfUniqueAddress) if (isLeader && isAvailable) { // only run the leader actions if we are the LEADER and available @@ -632,7 +689,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto // 5. Move UNREACHABLE => DOWN -- When the node is in the UNREACHABLE set it can be auto-down by leader // 6. Move DOWN => REMOVED -- When all nodes have seen that the node is DOWN (convergence) - remove the nodes from the node ring and seen table // 7. Updating the vclock version for the changes - // 8. Updating the 'seen' table + // 8. Updating the `seen` table // 9. Try to update the state with the new gossip // 10. If success - run all the side-effecting processing @@ -666,7 +723,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto member copy (status = Exiting) // Everyone else that is not Exiting stays as they are case member if member.status != Exiting && member.status != Down ⇒ member - // Move EXITING => REMOVED, DOWN => REMOVED - i.e. remove the nodes from the 'members' set/node ring and seen table + // Move EXITING => REMOVED, DOWN => REMOVED - i.e. 
remove the nodes from the `members` set/node ring and seen table } // ---------------------- @@ -687,8 +744,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val hasChangedState = removedMembers.nonEmpty || removedUnreachable.nonEmpty || upMembers.nonEmpty || exitingMembers.nonEmpty - // removing REMOVED nodes from the 'seen' table - val newSeen = localSeen -- removedMembers.map(_.address) -- removedUnreachable.map(_.address) + // removing REMOVED nodes from the `seen` table + val newSeen = localSeen -- removedMembers.map(_.uniqueAddress) -- removedUnreachable.map(_.uniqueAddress) val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachable) // update gossip overview val newGossip = localGossip copy (members = newMembers, overview = newOverview) // update gossip @@ -710,8 +767,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto // Check for the need to do side-effecting on successful state change val unreachableButNotDownedMembers = localUnreachableMembers filter (_.status != Down) - // removing nodes marked as DOWN from the 'seen' table - val newSeen = localSeen -- newUnreachableMembers.collect { case m if m.status == Down ⇒ m.address } + // removing nodes marked as DOWN from the `seen` table + val newSeen = localSeen -- newUnreachableMembers.collect { case m if m.status == Down ⇒ m.uniqueAddress } val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableMembers) // update gossip overview val newGossip = localGossip copy (overview = newOverview) // update gossip @@ -727,12 +784,12 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val versionedGossip = newGossip :+ vclockNode // ---------------------- - // Updating the 'seen' table + // Updating the `seen` table // Unless the leader (this node) is part of the removed members, i.e. the leader have moved himself from EXITING -> REMOVED // ---------------------- val seenVersionedGossip = - if (removedMembers.exists(_.address == selfAddress)) versionedGossip - else versionedGossip seen selfAddress + if (removedMembers.exists(_.uniqueAddress == selfUniqueAddress)) versionedGossip + else versionedGossip seen selfUniqueAddress // ---------------------- // Update the state with the new gossip @@ -754,7 +811,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val address = member.address log.info("Cluster Node [{}] - Leader is moving node [{}] from [{}] to [{}] - and removing node from node ring", selfAddress, address, member.status, Removed) - clusterCore(address) ! ClusterLeaderAction.Remove(address) + clusterCore(address) ! ClusterLeaderAction.Shutdown(member.uniqueAddress) } // tell all exiting members to exit @@ -762,7 +819,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val address = member.address log.info("Cluster Node [{}] - Leader is moving node [{}] from [{}] to [{}]", selfAddress, address, member.status, Exiting) - clusterCore(address) ! ClusterLeaderAction.Exit(address) // FIXME should use ? to await completion of handoff? + clusterCore(address) ! ClusterLeaderAction.Exit(member.uniqueAddress) // FIXME should wait for completion of handoff? 
} // log the auto-downing of the unreachable nodes @@ -781,7 +838,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto } /** - * Reaps the unreachable members (moves them to the 'unreachable' list in the cluster overview) according to the failure detector's verdict. + * Reaps the unreachable members (moves them to the `unreachable` list in the cluster overview) according to the failure detector's verdict. */ def reapUnreachableMembers(): Unit = { if (!isSingletonCluster && isAvailable) { @@ -793,7 +850,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val localUnreachableMembers = localGossip.overview.unreachable val newlyDetectedUnreachableMembers = localMembers filterNot { member ⇒ - member.address == selfAddress || failureDetector.isAvailable(member.address) + member.uniqueAddress == selfUniqueAddress || failureDetector.isAvailable(member.address) } if (newlyDetectedUnreachableMembers.nonEmpty) { @@ -804,9 +861,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val newOverview = localOverview copy (unreachable = newUnreachableMembers) val newGossip = localGossip copy (overview = newOverview, members = newMembers) - // updating vclock and 'seen' table + // updating vclock and `seen` table val versionedGossip = newGossip :+ vclockNode - val seenVersionedGossip = versionedGossip seen selfAddress + val seenVersionedGossip = versionedGossip seen selfUniqueAddress latestGossip = seenVersionedGossip @@ -817,39 +874,45 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto } } - def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] = - if (addresses.isEmpty) None - else Some(addresses(ThreadLocalRandom.current nextInt addresses.size)) + def selectRandomNode(nodes: IndexedSeq[UniqueAddress]): Option[UniqueAddress] = + if (nodes.isEmpty) None + else Some(nodes(ThreadLocalRandom.current nextInt nodes.size)) def isSingletonCluster: Boolean = latestGossip.isSingletonCluster - def isAvailable: Boolean = !latestGossip.isUnreachable(selfAddress) + def isAvailable: Boolean = !latestGossip.isUnreachable(selfUniqueAddress) /** * Gossips latest gossip to a random member in the set of members passed in as argument. * - * @return the used [[akka.actor.Address] if any + * @return the used [[UniqueAddress]] if any */ - private def gossipToRandomNodeOf(addresses: immutable.IndexedSeq[Address]): Option[Address] = { - log.debug("Cluster Node [{}] - Selecting random node to gossip to [{}]", selfAddress, addresses.mkString(", ")) + private def gossipToRandomNodeOf(nodes: immutable.IndexedSeq[UniqueAddress]): Option[UniqueAddress] = { // filter out myself - val peer = selectRandomNode(addresses filterNot (_ == selfAddress)) + val peer = selectRandomNode(nodes filterNot (_ == selfUniqueAddress)) peer foreach gossipTo peer } + // needed for tests + def sendGossipTo(address: Address): Unit = { + latestGossip.members.foreach(m ⇒ + if (m.address == address) + gossipTo(m.uniqueAddress)) + } + /** - * Gossips latest gossip to an address. + * Gossips latest gossip to a node. 
*/ - def gossipTo(address: Address): Unit = - gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = true)) + def gossipTo(node: UniqueAddress): Unit = + gossipTo(node, GossipEnvelope(selfUniqueAddress, node, latestGossip, conversation = true)) - def oneWayGossipTo(address: Address): Unit = - gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = false)) + def oneWayGossipTo(node: UniqueAddress): Unit = + gossipTo(node, GossipEnvelope(selfUniqueAddress, node, latestGossip, conversation = false)) - def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit = - if (address != selfAddress && gossipMsg.gossip.members.exists(_.address == address)) - clusterCore(address) ! gossipMsg + def gossipTo(node: UniqueAddress, gossipMsg: GossipEnvelope): Unit = + if (node != selfUniqueAddress && gossipMsg.gossip.members.exists(_.uniqueAddress == node)) + clusterCore(node.address) ! gossipMsg def publish(newGossip: Gossip): Unit = { publisher ! PublishChanges(newGossip) @@ -874,6 +937,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto */ private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSeq[Address]) extends Actor with ActorLogging { import InternalClusterAction._ + import ClusterUserAction.JoinTo val cluster = Cluster(context.system) def selfAddress = cluster.selfAddress @@ -943,6 +1007,7 @@ private[cluster] final class FirstSeedNodeProcess(seedNodes: immutable.IndexedSe */ private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq[Address]) extends Actor with ActorLogging { import InternalClusterAction._ + import ClusterUserAction.JoinTo def selfAddress = Cluster(context.system).selfAddress @@ -1007,7 +1072,7 @@ private[cluster] class OnMemberUpListener(callback: Runnable) extends Actor with } def isSelfUp(m: Member): Boolean = - m.address == cluster.selfAddress && m.status == MemberStatus.Up + m.uniqueAddress == cluster.selfUniqueAddress && m.status == MemberStatus.Up } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 8a436dbc36..f65c4fe2be 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -187,7 +187,7 @@ object ClusterEvent { if (newGossip eq oldGossip) Nil else { val newMembers = newGossip.members -- oldGossip.members - val membersGroupedByAddress = List(newGossip.members, oldGossip.members).flatten.groupBy(_.address) + val membersGroupedByAddress = List(newGossip.members, oldGossip.members).flatten.groupBy(_.uniqueAddress) val changedMembers = membersGroupedByAddress collect { case (_, newMember :: oldMember :: Nil) if newMember.status != oldMember.status ⇒ newMember } @@ -200,7 +200,7 @@ object ClusterEvent { val allNewUnreachable = newGossip.overview.unreachable -- oldGossip.overview.unreachable val unreachableGroupedByAddress = - List(newGossip.overview.unreachable, oldGossip.overview.unreachable).flatten.groupBy(_.address) + List(newGossip.overview.unreachable, oldGossip.overview.unreachable).flatten.groupBy(_.uniqueAddress) val unreachableDownMembers = unreachableGroupedByAddress collect { case (_, newMember :: oldMember :: Nil) if newMember.status == Down && newMember.status != oldMember.status ⇒ newMember @@ -218,7 +218,7 @@ object ClusterEvent { */ private[cluster] def diffLeader(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[LeaderChanged] = { val newLeader = newGossip.leader 
- if (newLeader != oldGossip.leader) List(LeaderChanged(newLeader)) + if (newLeader != oldGossip.leader) List(LeaderChanged(newLeader.map(_.address))) else Nil } @@ -230,7 +230,7 @@ object ClusterEvent { role ← (oldGossip.allRoles ++ newGossip.allRoles) newLeader = newGossip.roleLeader(role) if newLeader != oldGossip.roleLeader(role) - } yield RoleLeaderChanged(role, newLeader) + } yield RoleLeaderChanged(role, newLeader.map(_.address)) } /** @@ -242,7 +242,7 @@ object ClusterEvent { val newConvergence = newGossip.convergence val newSeenBy = newGossip.seenBy if (newConvergence != oldGossip.convergence || newSeenBy != oldGossip.seenBy) - List(SeenChanged(newConvergence, newSeenBy)) + List(SeenChanged(newConvergence, newSeenBy.map(_.address))) else Nil } } @@ -273,7 +273,6 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto case Subscribe(subscriber, to) ⇒ subscribe(subscriber, to) case Unsubscribe(subscriber, to) ⇒ unsubscribe(subscriber, to) case PublishEvent(event) ⇒ publish(event) - case PublishStart ⇒ publishStart() } def eventStream: EventStream = context.system.eventStream @@ -286,9 +285,9 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto val state = CurrentClusterState( members = latestGossip.members, unreachable = latestGossip.overview.unreachable, - seenBy = latestGossip.seenBy, - leader = latestGossip.leader, - roleLeaderMap = latestGossip.allRoles.map(r ⇒ r -> latestGossip.roleLeader(r))(collection.breakOut)) + seenBy = latestGossip.seenBy.map(_.address), + leader = latestGossip.leader.map(_.address), + roleLeaderMap = latestGossip.allRoles.map(r ⇒ r -> latestGossip.roleLeader(r).map(_.address))(collection.breakOut)) receiver match { case Some(ref) ⇒ ref ! state case None ⇒ publish(state) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index 39eaa63bfd..436ee38203 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -70,8 +70,9 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { } def self: Member = { - state.members.find(_.address == selfAddress).orElse(state.unreachable.find(_.address == selfAddress)). - getOrElse(Member(selfAddress, MemberStatus.Removed, cluster.selfRoles)) + import cluster.selfUniqueAddress + state.members.find(_.uniqueAddress == selfUniqueAddress).orElse(state.unreachable.find(_.uniqueAddress == selfUniqueAddress)). 
+ getOrElse(Member(selfUniqueAddress, cluster.selfRoles).copy(status = MemberStatus.Removed)) } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 64d02ca03d..8b25a283b3 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -44,11 +44,24 @@ class ClusterSettings(val config: Config, val systemName: String) { final val SeedNodes: immutable.IndexedSeq[Address] = immutableSeq(cc.getStringList("seed-nodes")).map { case AddressFromURIString(addr) ⇒ addr }.toVector final val SeedNodeTimeout: FiniteDuration = Duration(cc.getMilliseconds("seed-node-timeout"), MILLISECONDS) + final val RetryUnsuccessfulJoinAfter: Duration = { + val key = "retry-unsuccessful-join-after" + cc.getString(key).toLowerCase match { + case "off" ⇒ Duration.Undefined + case _ ⇒ Duration(cc.getMilliseconds(key), MILLISECONDS) requiring (_ > Duration.Zero, key + " > 0s, or off") + } + } final val PeriodicTasksInitialDelay: FiniteDuration = Duration(cc.getMilliseconds("periodic-tasks-initial-delay"), MILLISECONDS) final val GossipInterval: FiniteDuration = Duration(cc.getMilliseconds("gossip-interval"), MILLISECONDS) final val LeaderActionsInterval: FiniteDuration = Duration(cc.getMilliseconds("leader-actions-interval"), MILLISECONDS) final val UnreachableNodesReaperInterval: FiniteDuration = Duration(cc.getMilliseconds("unreachable-nodes-reaper-interval"), MILLISECONDS) - final val PublishStatsInterval: FiniteDuration = Duration(cc.getMilliseconds("publish-stats-interval"), MILLISECONDS) + final val PublishStatsInterval: Duration = { + val key = "publish-stats-interval" + cc.getString(key).toLowerCase match { + case "off" ⇒ Duration.Undefined + case _ ⇒ Duration(cc.getMilliseconds(key), MILLISECONDS) requiring (_ >= Duration.Zero, key + " >= 0s, or off") + } + } final val AutoJoin: Boolean = cc.getBoolean("auto-join") final val AutoDown: Boolean = cc.getBoolean("auto-down") final val Roles: Set[String] = immutableSeq(cc.getStringList("roles")).toSet @@ -78,5 +91,6 @@ class ClusterSettings(val config: Config, val systemName: String) { final val MetricsMovingAverageHalfLife: FiniteDuration = { Duration(cc.getMilliseconds("metrics.moving-average-half-life"), MILLISECONDS) } requiring (_ > Duration.Zero, "metrics.moving-average-half-life must be > 0") + } diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 5fadea4286..980f145919 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -4,7 +4,6 @@ package akka.cluster -import akka.actor.Address import scala.collection.immutable import MemberStatus._ @@ -78,7 +77,7 @@ private[cluster] case class Gossip( format (allowedLiveMemberStatus.mkString(", "), (members filter hasNotAllowedLiveMemberStatus).mkString(", "))) - val seenButNotMember = overview.seen.keySet -- members.map(_.address) -- overview.unreachable.map(_.address) + val seenButNotMember = overview.seen.keySet -- members.map(_.uniqueAddress) -- overview.unreachable.map(_.uniqueAddress) if (seenButNotMember.nonEmpty) throw new IllegalArgumentException("Nodes not part of cluster have marked the Gossip as seen, got [%s]" format seenButNotMember.mkString(", ")) @@ -102,40 +101,40 @@ private[cluster] case class Gossip( * Marks the gossip as seen by this node (address) by updating the address 
entry in the 'gossip.overview.seen' * Map with the VectorClock (version) for the new gossip. */ - def seen(address: Address): Gossip = { - if (seenByAddress(address)) this - else this copy (overview = overview copy (seen = overview.seen + (address -> version))) + def seen(node: UniqueAddress): Gossip = { + if (seenByNode(node)) this + else this copy (overview = overview copy (seen = overview.seen + (node -> version))) } /** * The nodes that have seen current version of the Gossip. */ - def seenBy: Set[Address] = { + def seenBy: Set[UniqueAddress] = { overview.seen.collect { - case (address, vclock) if vclock == version ⇒ address + case (node, vclock) if vclock == version ⇒ node }.toSet } /** - * Has this Gossip been seen by this address. + * Has this Gossip been seen by this node. */ - def seenByAddress(address: Address): Boolean = { - overview.seen.get(address).exists(_ == version) + def seenByNode(node: UniqueAddress): Boolean = { + overview.seen.get(node).exists(_ == version) } - private def mergeSeenTables(allowed: Set[Member], one: Map[Address, VectorClock], another: Map[Address, VectorClock]): Map[Address, VectorClock] = { - (Map.empty[Address, VectorClock] /: allowed) { + private def mergeSeenTables(allowed: Set[Member], one: Map[UniqueAddress, VectorClock], another: Map[UniqueAddress, VectorClock]): Map[UniqueAddress, VectorClock] = { + (Map.empty[UniqueAddress, VectorClock] /: allowed) { (merged, member) ⇒ - val address = member.address - (one.get(address), another.get(address)) match { + val node = member.uniqueAddress + (one.get(node), another.get(node)) match { case (None, None) ⇒ merged - case (Some(v1), None) ⇒ merged.updated(address, v1) - case (None, Some(v2)) ⇒ merged.updated(address, v2) + case (Some(v1), None) ⇒ merged.updated(node, v1) + case (None, Some(v2)) ⇒ merged.updated(node, v2) case (Some(v1), Some(v2)) ⇒ v1 tryCompareTo v2 match { case None ⇒ merged - case Some(x) if x > 0 ⇒ merged.updated(address, v1) - case _ ⇒ merged.updated(address, v2) + case Some(x) if x > 0 ⇒ merged.updated(node, v1) + case _ ⇒ merged.updated(node, v2) } } } @@ -184,19 +183,19 @@ private[cluster] case class Gossip( // status is in the seen table and has the latest vector clock // version overview.unreachable.forall(_.status == Down) && - !members.exists(m ⇒ Gossip.convergenceMemberStatus(m.status) && !seenByAddress(m.address)) + !members.exists(m ⇒ Gossip.convergenceMemberStatus(m.status) && !seenByNode(m.uniqueAddress)) } - def isLeader(address: Address): Boolean = leader == Some(address) + def isLeader(node: UniqueAddress): Boolean = leader == Some(node) - def leader: Option[Address] = leaderOf(members) + def leader: Option[UniqueAddress] = leaderOf(members) - def roleLeader(role: String): Option[Address] = leaderOf(members.filter(_.hasRole(role))) + def roleLeader(role: String): Option[UniqueAddress] = leaderOf(members.filter(_.hasRole(role))) - private def leaderOf(mbrs: immutable.SortedSet[Member]): Option[Address] = { + private def leaderOf(mbrs: immutable.SortedSet[Member]): Option[UniqueAddress] = { if (mbrs.isEmpty) None else mbrs.find(m ⇒ Gossip.leaderMemberStatus(m.status)). 
- orElse(Some(mbrs.min(Member.leaderStatusOrdering))).map(_.address) + orElse(Some(mbrs.min(Member.leaderStatusOrdering))).map(_.uniqueAddress) } def allRoles: Set[String] = members.flatMap(_.roles) @@ -206,20 +205,16 @@ private[cluster] case class Gossip( /** * Returns true if the node is in the unreachable set */ - def isUnreachable(address: Address): Boolean = - overview.unreachable exists { _.address == address } + def isUnreachable(node: UniqueAddress): Boolean = + overview.unreachable exists { _.uniqueAddress == node } - def member(address: Address): Member = { - members.find(_.address == address).orElse(overview.unreachable.find(_.address == address)). - getOrElse(Member(address, Removed, Set.empty)) + def member(node: UniqueAddress): Member = { + members.find(_.uniqueAddress == node).orElse(overview.unreachable.find(_.uniqueAddress == node)). + getOrElse(Member.removed(node)) // placeholder for removed member } override def toString = - "Gossip(" + - "overview = " + overview + - ", members = [" + members.mkString(", ") + - "], version = " + version + - ")" + s"Gossip(members = [${members.mkString(", ")}], overview = ${overview}, version = ${version})" } /** @@ -228,21 +223,20 @@ private[cluster] case class Gossip( */ @SerialVersionUID(1L) private[cluster] case class GossipOverview( - seen: Map[Address, VectorClock] = Map.empty, + seen: Map[UniqueAddress, VectorClock] = Map.empty, unreachable: Set[Member] = Set.empty) { - def isNonDownUnreachable(address: Address): Boolean = - unreachable.exists { m ⇒ m.address == address && m.status != Down } - override def toString = - "GossipOverview(seen = [" + seen.mkString(", ") + - "], unreachable = [" + unreachable.mkString(", ") + - "])" + s"GossipOverview(unreachable = [${unreachable.mkString(", ")}], seen = [${seen.mkString(", ")}])" } /** * INTERNAL API - * Envelope adding a sender address to the gossip. + * Envelope adding a sender and receiver address to the gossip. + * The reason for including the receiver address is to be able to + * ignore messages that were intended for a previous incarnation of + * the node with same host:port. The `uid` in the `UniqueAddress` is + * different in that case. */ @SerialVersionUID(1L) -private[cluster] case class GossipEnvelope(from: Address, gossip: Gossip, conversation: Boolean = true) extends ClusterMessage +private[cluster] case class GossipEnvelope(from: UniqueAddress, to: UniqueAddress, gossip: Gossip, conversation: Boolean = true) extends ClusterMessage diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala index d60ce2afc3..6929b2dfea 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -18,13 +18,20 @@ import MemberStatus._ * and roles. 
*/ @SerialVersionUID(1L) -class Member(val address: Address, val status: MemberStatus, val roles: Set[String]) extends Serializable { - override def hashCode = address.## +class Member private[cluster] ( + /** INTERNAL API **/ + private[cluster] val uniqueAddress: UniqueAddress, + val status: MemberStatus, + val roles: Set[String]) extends Serializable { + + def address: Address = uniqueAddress.address + + override def hashCode = uniqueAddress.## override def equals(other: Any) = other match { - case m: Member ⇒ address == m.address + case m: Member ⇒ uniqueAddress == m.uniqueAddress case _ ⇒ false } - override def toString = "Member(address = %s, status = %s)" format (address, status) + override def toString = s"Member(address = ${address}, status = ${status})" def hasRole(role: String): Boolean = roles.contains(role) @@ -40,7 +47,7 @@ class Member(val address: Address, val status: MemberStatus, val roles: Set[Stri else { require(allowedTransitions(oldStatus)(status), s"Invalid member status transition [ ${this} -> ${status}]") - new Member(address, status, roles) + new Member(uniqueAddress, status, roles) } } } @@ -52,8 +59,17 @@ object Member { val none = Set.empty[Member] - def apply(address: Address, status: MemberStatus, roles: Set[String]): Member = - new Member(address, status, roles) + /** + * INTERNAL API + * Create a new member with status Joining. + */ + private[cluster] def apply(uniqueAddress: UniqueAddress, roles: Set[String]): Member = + new Member(uniqueAddress, Joining, roles) + + /** + * INTERNAL API + */ + private[cluster] def removed(node: UniqueAddress): Member = new Member(node, Removed, Set.empty) /** * `Address` ordering type class, sorts addresses by host and port. @@ -87,12 +103,20 @@ object Member { * `Member` ordering type class, sorts members by host and port.
*/ implicit val ordering: Ordering[Member] = new Ordering[Member] { - def compare(a: Member, b: Member): Int = addressOrdering.compare(a.address, b.address) + def compare(a: Member, b: Member): Int = { + val result = addressOrdering.compare(a.address, b.address) + if (result == 0) { + val aUid = a.uniqueAddress.uid + val bUid = b.uniqueAddress.uid + if (aUid < bUid) -1 else if (aUid == bUid) 0 else 1 + } else + result + } } def pickHighestPriority(a: Set[Member], b: Set[Member]): Set[Member] = { // group all members by Address => Seq[Member] - val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.address) + val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.uniqueAddress) // pick highest MemberStatus (Member.none /: groupedByAddress) { case (acc, (_, members)) ⇒ acc + members.reduceLeft(highestPriorityOf) @@ -176,3 +200,9 @@ object MemberStatus { Exiting -> Set(Removed, Down), Removed -> Set.empty[MemberStatus]) } + +/** + * INTERNAL API + */ +@SerialVersionUID(1L) +private[cluster] case class UniqueAddress(address: Address, uid: Int) diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index a8bdfb5285..4b3e908f08 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -17,18 +17,23 @@ import java.{ lang ⇒ jl } class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializer { private val fromBinaryMap = collection.immutable.HashMap[Class[_ <: ClusterMessage], Array[Byte] ⇒ AnyRef]( - classOf[ClusterUserAction.Join] -> { + classOf[InternalClusterAction.Join] -> { case bytes ⇒ val m = msg.Join.defaultInstance.mergeFrom(bytes) - ClusterUserAction.Join(addressFromProto(m.address), m.roles.toSet) + InternalClusterAction.Join(uniqueAddressFromProto(m.node), m.roles.toSet) + }, + classOf[InternalClusterAction.Welcome] -> { + case bytes ⇒ + val m = msg.Welcome.defaultInstance.mergeFrom(bytes) + InternalClusterAction.Welcome(uniqueAddressFromProto(m.from), gossipFromProto(m.gossip)) }, classOf[ClusterUserAction.Leave] -> (bytes ⇒ ClusterUserAction.Leave(addressFromBinary(bytes))), classOf[ClusterUserAction.Down] -> (bytes ⇒ ClusterUserAction.Down(addressFromBinary(bytes))), InternalClusterAction.InitJoin.getClass -> (_ ⇒ InternalClusterAction.InitJoin), classOf[InternalClusterAction.InitJoinAck] -> (bytes ⇒ InternalClusterAction.InitJoinAck(addressFromBinary(bytes))), classOf[InternalClusterAction.InitJoinNack] -> (bytes ⇒ InternalClusterAction.InitJoinNack(addressFromBinary(bytes))), - classOf[ClusterLeaderAction.Exit] -> (bytes ⇒ ClusterLeaderAction.Exit(addressFromBinary(bytes))), - classOf[ClusterLeaderAction.Remove] -> (bytes ⇒ ClusterLeaderAction.Remove(addressFromBinary(bytes))), + classOf[ClusterLeaderAction.Exit] -> (bytes ⇒ ClusterLeaderAction.Exit(uniqueAddressFromBinary(bytes))), + classOf[ClusterLeaderAction.Shutdown] -> (bytes ⇒ ClusterLeaderAction.Shutdown(uniqueAddressFromBinary(bytes))), classOf[ClusterHeartbeatReceiver.Heartbeat] -> (bytes ⇒ ClusterHeartbeatReceiver.Heartbeat(addressFromBinary(bytes))), classOf[ClusterHeartbeatReceiver.EndHeartbeat] -> (bytes ⇒ ClusterHeartbeatReceiver.EndHeartbeat(addressFromBinary(bytes))), classOf[ClusterHeartbeatSender.HeartbeatRequest] -> (bytes ⇒ ClusterHeartbeatSender.HeartbeatRequest(addressFromBinary(bytes))), @@ -46,8 +51,10 @@ class ClusterMessageSerializer(val 
system: ExtendedActorSystem) extends Serializ gossipEnvelopeToProto(m) case m: MetricsGossipEnvelope ⇒ metricsGossipEnvelopeToProto(m) - case ClusterUserAction.Join(address, roles) ⇒ - msg.Join(addressToProto(address), roles.map(identity)(breakOut): Vector[String]) + case InternalClusterAction.Join(node, roles) ⇒ + msg.Join(uniqueAddressToProto(node), roles.map(identity)(breakOut): Vector[String]) + case InternalClusterAction.Welcome(from, gossip) ⇒ + msg.Welcome(uniqueAddressToProto(from), gossipToProto(gossip)) case ClusterUserAction.Leave(address) ⇒ addressToProto(address) case ClusterUserAction.Down(address) ⇒ @@ -58,10 +65,10 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ addressToProto(address) case InternalClusterAction.InitJoinNack(address) ⇒ addressToProto(address) - case ClusterLeaderAction.Exit(address) ⇒ - addressToProto(address) - case ClusterLeaderAction.Remove(address) ⇒ - addressToProto(address) + case ClusterLeaderAction.Exit(node) ⇒ + uniqueAddressToProto(node) + case ClusterLeaderAction.Shutdown(node) ⇒ + uniqueAddressToProto(node) case ClusterHeartbeatReceiver.EndHeartbeat(from) ⇒ addressToProto(from) case ClusterHeartbeatSender.HeartbeatRequest(from) ⇒ @@ -85,14 +92,26 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ addressFromProto(msg.Address.defaultInstance.mergeFrom(bytes)) } + private def uniqueAddressFromBinary(bytes: Array[Byte]): UniqueAddress = { + uniqueAddressFromProto(msg.UniqueAddress.defaultInstance.mergeFrom(bytes)) + } + private def addressToProto(address: Address): msg.Address = { msg.Address(address.system, address.host.getOrElse(""), address.port.getOrElse(0), Some(address.protocol)) } + private def uniqueAddressToProto(uniqueAddress: UniqueAddress): msg.UniqueAddress = { + msg.UniqueAddress(addressToProto(uniqueAddress.address), uniqueAddress.uid) + } + private def addressFromProto(address: msg.Address): Address = { Address(address.protocol.getOrElse(""), address.system, address.hostname, address.port) } + private def uniqueAddressFromProto(uniqueAddress: msg.UniqueAddress): UniqueAddress = { + UniqueAddress(addressFromProto(uniqueAddress.address), uniqueAddress.uid) + } + private val memberStatusToInt = scala.collection.immutable.HashMap[MemberStatus, Int]( MemberStatus.Joining -> msg.MemberStatus.Joining_VALUE, MemberStatus.Up -> msg.MemberStatus.Up_VALUE, @@ -108,10 +127,9 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ case _ ⇒ throw new IllegalArgumentException(s"Unknown ${unknown} [${value}] in cluster message") } - private def gossipEnvelopeToProto(envelope: GossipEnvelope): msg.GossipEnvelope = { - val gossip = envelope.gossip + private def gossipToProto(gossip: Gossip): msg.Gossip = { val allMembers = List(gossip.members, gossip.overview.unreachable).flatMap(identity) - val allAddresses = allMembers.map(_.address).to[Vector] + val allAddresses = allMembers.map(_.uniqueAddress).to[Vector] val addressMapping = allAddresses.zipWithIndex.toMap val allRoles = allMembers.flatMap(_.roles).to[Vector] val roleMapping = allRoles.zipWithIndex.toMap @@ -120,12 +138,12 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ }.to[Vector] val hashMapping = allHashes.zipWithIndex.toMap - def mapAddress(address: Address) = mapWithErrorMessage(addressMapping, address, "address") + def mapUniqueAddress(uniqueAddress: UniqueAddress) = mapWithErrorMessage(addressMapping, uniqueAddress, "address") def mapRole(role: String) = 
mapWithErrorMessage(roleMapping, role, "role") def mapHash(hash: String) = mapWithErrorMessage(hashMapping, hash, "hash") def memberToProto(member: Member) = { - msg.Member(mapAddress(member.address), msg.MemberStatus.valueOf(memberStatusToInt(member.status)), member.roles.map(mapRole).to[Vector]) + msg.Member(mapUniqueAddress(member.uniqueAddress), msg.MemberStatus.valueOf(memberStatusToInt(member.status)), member.roles.map(mapRole).to[Vector]) } def vectorClockToProto(version: VectorClock) = { @@ -133,9 +151,9 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ version.versions.map { case (n, t) ⇒ msg.VectorClock.Version(mapHash(n.hash), t.time) }.to[Vector]) } - def seenToProto(seen: (Address, VectorClock)) = seen match { - case (address: Address, version: VectorClock) ⇒ - msg.GossipOverview.Seen(mapAddress(address), vectorClockToProto(version)) + def seenToProto(seen: (UniqueAddress, VectorClock)) = seen match { + case (address: UniqueAddress, version: VectorClock) ⇒ + msg.GossipOverview.Seen(mapUniqueAddress(address), vectorClockToProto(version)) } val unreachable = gossip.overview.unreachable.map(memberToProto).to[Vector] @@ -144,23 +162,26 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ val overview = msg.GossipOverview(seen, unreachable) - msg.GossipEnvelope(addressToProto(envelope.from), msg.Gossip(allAddresses.map(addressToProto), - allRoles, allHashes, members, overview, vectorClockToProto(gossip.version)), - envelope.conversation) + msg.Gossip(allAddresses.map(uniqueAddressToProto), + allRoles, allHashes, members, overview, vectorClockToProto(gossip.version)) + } + + private def gossipEnvelopeToProto(envelope: GossipEnvelope): msg.GossipEnvelope = { + msg.GossipEnvelope(uniqueAddressToProto(envelope.from), uniqueAddressToProto(envelope.to), + gossipToProto(envelope.gossip), envelope.conversation) } private def gossipEnvelopeFromBinary(bytes: Array[Byte]): GossipEnvelope = { gossipEnvelopeFromProto(msg.GossipEnvelope.defaultInstance.mergeFrom(bytes)) } - private def gossipEnvelopeFromProto(envelope: msg.GossipEnvelope): GossipEnvelope = { - val gossip = envelope.gossip - val addressMapping = gossip.allAddresses.map(addressFromProto) + private def gossipFromProto(gossip: msg.Gossip): Gossip = { + val addressMapping = gossip.allAddresses.map(uniqueAddressFromProto) val roleMapping = gossip.allRoles val hashMapping = gossip.allHashes def memberFromProto(member: msg.Member) = { - Member(addressMapping(member.addressIndex), memberStatusFromInt(member.status.id), + new Member(addressMapping(member.addressIndex), memberStatusFromInt(member.status.id), member.rolesIndexes.map(roleMapping).to[Set]) } @@ -180,7 +201,12 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ val seen = gossip.overview.seen.map(seenFromProto).toMap val overview = GossipOverview(seen, unreachable) - GossipEnvelope(addressFromProto(envelope.from), Gossip(members, overview, vectorClockFromProto(gossip.version))) + Gossip(members, overview, vectorClockFromProto(gossip.version)) + } + + private def gossipEnvelopeFromProto(envelope: msg.GossipEnvelope): GossipEnvelope = { + GossipEnvelope(uniqueAddressFromProto(envelope.from), uniqueAddressFromProto(envelope.to), + gossipFromProto(envelope.gossip)) } private def metricsGossipEnvelopeToProto(envelope: MetricsGossipEnvelope): msg.MetricsGossipEnvelope = { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/DisallowJoinOfTwoClustersSpec.scala 
b/akka-cluster/src/multi-jvm/scala/akka/cluster/DisallowJoinOfTwoClustersSpec.scala new file mode 100644 index 0000000000..a729de52d6 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/DisallowJoinOfTwoClustersSpec.scala @@ -0,0 +1,78 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.cluster + +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ + +object DisallowJoinOfTwoClustersMultiJvmSpec extends MultiNodeConfig { + val a1 = role("a1") + val a2 = role("a2") + val b1 = role("b1") + val b2 = role("b2") + val c1 = role("c1") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) +} + +class DisallowJoinOfTwoClustersMultiJvmNode1 extends DisallowJoinOfTwoClustersSpec +class DisallowJoinOfTwoClustersMultiJvmNode2 extends DisallowJoinOfTwoClustersSpec +class DisallowJoinOfTwoClustersMultiJvmNode3 extends DisallowJoinOfTwoClustersSpec +class DisallowJoinOfTwoClustersMultiJvmNode4 extends DisallowJoinOfTwoClustersSpec +class DisallowJoinOfTwoClustersMultiJvmNode5 extends DisallowJoinOfTwoClustersSpec + +abstract class DisallowJoinOfTwoClustersSpec + extends MultiNodeSpec(DisallowJoinOfTwoClustersMultiJvmSpec) + with MultiNodeClusterSpec { + + import DisallowJoinOfTwoClustersMultiJvmSpec._ + + "Three different clusters (A, B and C)" must { + + "not be able to join" taggedAs LongRunningTest in { + // make sure that the node-to-join is started before other join + runOn(a1, b1, c1) { + startClusterNode() + } + enterBarrier("first-started") + + runOn(a1, a2) { + cluster.join(a1) + } + runOn(b1, b2) { + cluster.join(b1) + } + runOn(c1) { + cluster.join(c1) + } + + val expectedSize = if (myself == c1) 1 else 2 + awaitMembersUp(numberOfMembers = expectedSize) + + enterBarrier("two-members") + + runOn(b1) { + cluster.join(a1) + } + runOn(b2) { + cluster.join(c1) + } + runOn(c1) { + cluster.join(a2) + } + + // no change expected + 1 to 5 foreach { _ ⇒ + clusterView.members.size must be(expectedSize) + Thread.sleep(1000) + } + + enterBarrier("after-1") + } + + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala deleted file mode 100644 index db5e618a22..0000000000 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. 
- */ - -package akka.cluster - -import com.typesafe.config.ConfigFactory -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.testkit._ - -object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig { - val a1 = role("a1") - val a2 = role("a2") - val b1 = role("b1") - val b2 = role("b2") - val c1 = role("c1") - val c2 = role("c2") - - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) -} - -class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec -class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec -class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec -class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec -class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec -class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec - -abstract class JoinTwoClustersSpec - extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) - with MultiNodeClusterSpec { - - import JoinTwoClustersMultiJvmSpec._ - - "Three different clusters (A, B and C)" must { - - "be able to 'elect' a single leader after joining (A -> B)" taggedAs LongRunningTest in { - // make sure that the node-to-join is started before other join - runOn(a1, b1, c1) { - startClusterNode() - } - enterBarrier("first-started") - - runOn(a1, a2) { - cluster.join(a1) - } - runOn(b1, b2) { - cluster.join(b1) - } - runOn(c1, c2) { - cluster.join(c1) - } - - awaitMembersUp(numberOfMembers = 2) - - assertLeader(a1, a2) - assertLeader(b1, b2) - assertLeader(c1, c2) - - enterBarrier("two-members") - - runOn(b2) { - cluster.join(a1) - } - - runOn(a1, a2, b1, b2) { - awaitMembersUp(numberOfMembers = 4) - } - - assertLeader(a1, a2, b1, b2) - assertLeader(c1, c2) - - enterBarrier("four-members") - } - - "be able to 'elect' a single leader after joining (C -> A + B)" taggedAs LongRunningTest in { - - runOn(b2) { - cluster.join(c1) - } - - awaitMembersUp(numberOfMembers = 6) - - assertLeader(a1, a2, b1, b2, c1, c2) - - enterBarrier("six-members") - } - } -} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala index 8a20bc8efd..bfbf1f3ad4 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala @@ -17,13 +17,14 @@ case class SingletonClusterMultiNodeConfig(failureDetectorPuppet: Boolean) exten commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString(""" akka.cluster { - auto-join = on auto-down = on failure-detector.threshold = 4 } """)). 
withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) + nodeConfig(first)(ConfigFactory.parseString("akka.cluster.auto-join = on")) + } class SingletonClusterWithFailureDetectorPuppetMultiJvmNode1 extends SingletonClusterSpec(failureDetectorPuppet = true) @@ -45,8 +46,10 @@ abstract class SingletonClusterSpec(multiNodeConfig: SingletonClusterMultiNodeCo "A cluster of 2 nodes" must { "become singleton cluster when started with 'auto-join=on' and 'seed-nodes=[]'" taggedAs LongRunningTest in { - awaitMembersUp(1) - clusterView.isSingletonCluster must be(true) + runOn(first) { + awaitMembersUp(1) + clusterView.isSingletonCluster must be(true) + } enterBarrier("after-1") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index 9cf6ba710c..92e4be77b1 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -228,7 +228,7 @@ abstract class TransitionSpec runOn(third) { markNodeAsUnavailable(second) reapUnreachable() - awaitAssert(clusterView.unreachableMembers must contain(Member(second, Up, Set.empty))) + awaitAssert(clusterView.unreachableMembers.map(_.address) must contain(address(second))) awaitAssert(seenLatestGossip must be(Set(third))) } @@ -237,7 +237,7 @@ abstract class TransitionSpec third gossipTo first runOn(first, third) { - awaitAssert(clusterView.unreachableMembers must contain(Member(second, Up, Set.empty))) + awaitAssert(clusterView.unreachableMembers.map(_.address) must contain(address(second))) } runOn(first) { @@ -249,7 +249,7 @@ abstract class TransitionSpec first gossipTo third runOn(first, third) { - awaitAssert(clusterView.unreachableMembers must contain(Member(second, Down, Set.empty))) + awaitAssert(clusterView.unreachableMembers.map(_.address) must contain(address(second))) awaitMemberStatus(second, Down) awaitAssert(seenLatestGossip must be(Set(first, third))) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala similarity index 57% rename from akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala rename to akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala index 680bb87041..1fea5d3ff8 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeJoinsAgainSpec.scala @@ -4,20 +4,22 @@ package akka.cluster import language.postfixOps - -import org.scalatest.BeforeAndAfter +import scala.collection.immutable +import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory +import akka.actor.ActorSystem +import akka.actor.ExtendedActorSystem +import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec -import akka.testkit._ -import com.typesafe.config.ConfigFactory -import akka.actor.Address -import akka.remote.testconductor.RoleName -import scala.concurrent.duration._ -import scala.collection.immutable import akka.remote.transport.ThrottlerTransportAdapter.Direction +import akka.testkit._ +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.actor.RootActorPath -case class UnreachableNodeRejoinsClusterMultiNodeConfig(failureDetectorPuppet: Boolean) 
extends MultiNodeConfig { +object UnreachableNodeJoinsAgainMultiNodeConfig extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") @@ -31,29 +33,25 @@ case class UnreachableNodeRejoinsClusterMultiNodeConfig(failureDetectorPuppet: B akka.remote.log-remote-lifecycle-events = off akka.cluster.publish-stats-interval = 0s - akka.loglevel = INFO - """).withFallback(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet)))) + """).withFallback(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))) testTransport(on = true) + + class EndActor(testActor: ActorRef) extends Actor { + def receive = { case msg ⇒ testActor forward msg } + } } -class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true) -class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true) -class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true) -class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true) +class UnreachableNodeJoinsAgainMultiJvmNode1 extends UnreachableNodeJoinsAgainSpec +class UnreachableNodeJoinsAgainMultiJvmNode2 extends UnreachableNodeJoinsAgainSpec +class UnreachableNodeJoinsAgainMultiJvmNode3 extends UnreachableNodeJoinsAgainSpec +class UnreachableNodeJoinsAgainMultiJvmNode4 extends UnreachableNodeJoinsAgainSpec -class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = false) -class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = false) -class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = false) -class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = false) - -abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNodeRejoinsClusterMultiNodeConfig) - extends MultiNodeSpec(multiNodeConfig) +abstract class UnreachableNodeJoinsAgainSpec + extends MultiNodeSpec(UnreachableNodeJoinsAgainMultiNodeConfig) with MultiNodeClusterSpec { - def this(failureDetectorPuppet: Boolean) = this(UnreachableNodeRejoinsClusterMultiNodeConfig(failureDetectorPuppet)) - - import multiNodeConfig._ + import UnreachableNodeJoinsAgainMultiNodeConfig._ muteMarkingAsUnreachable() @@ -139,10 +137,21 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod awaitAssert(clusterView.unreachableMembers must be(Set.empty), 15 seconds) } + endBarrier() } - "allow node to REJOIN when the network is plugged back in" taggedAs LongRunningTest in { + "allow fresh node with same host:port to join again when the network is plugged back in" taggedAs LongRunningTest in { + val expectedNumberOfMembers = roles.size + + // victim actor system will be shutdown, not part of testConductor any more + // so we can't use barriers to synchronize with it + val masterAddress = address(master) + runOn(master) { + system.actorOf(Props(classOf[EndActor], testActor), "end") + } + enterBarrier("end-actor-created") + runOn(first) { // put 
the network back in allBut(victim).foreach { roleName ⇒ @@ -152,13 +161,48 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod enterBarrier("plug_in_victim") - runOn(victim) { - joinWithin(master, 10.seconds) + runOn(first) { + // will shutdown ActorSystem of victim + testConductor.removeNode(victim) } - awaitMembersUp(roles.size) + runOn(victim) { + val victimAddress = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + system.shutdown() + system.awaitTermination(10 seconds) + // create new ActorSystem with same host:port + val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s""" + akka.remote.netty.tcp { + hostname = ${victimAddress.host.get} + port = ${victimAddress.port.get} + } + """).withFallback(system.settings.config)) + + try { + Cluster(freshSystem).join(masterAddress) + Thread.sleep(5000) + within(15 seconds) { + awaitAssert(Cluster(freshSystem).readView.members.map(_.address) must contain(victimAddress)) + awaitAssert(Cluster(freshSystem).readView.members.size must be(expectedNumberOfMembers)) + awaitAssert(clusterView.members.map(_.status) must be(Set(MemberStatus.Up))) + } + freshSystem.actorSelection(RootActorPath(master) / "user" / "end") ! "done" + } finally { + freshSystem.shutdown() + freshSystem.awaitTermination(10 seconds) + } + // no barrier here, because it is not part of testConductor roles any more + } + + runOn(allBut(victim): _*) { + awaitMembersUp(expectedNumberOfMembers) + // don't end the test until the freshSystem is done + runOn(master) { + expectMsg("done") + } + endBarrier() + } - endBarrier() } } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 7e268660a3..613f86cf8b 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -25,6 +25,7 @@ class ClusterConfigSpec extends AkkaSpec { FailureDetectorImplementationClass must be(classOf[PhiAccrualFailureDetector].getName) SeedNodes must be(Seq.empty[String]) SeedNodeTimeout must be(5 seconds) + RetryUnsuccessfulJoinAfter must be(10 seconds) PeriodicTasksInitialDelay must be(1 seconds) GossipInterval must be(1 second) HeartbeatInterval must be(1 second) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala index fa9cf857ac..8e9fd04b0e 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala @@ -24,26 +24,26 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender { var publisher: ActorRef = _ - val aUp = Member(Address("akka.tcp", "sys", "a", 2552), Up, Set.empty) + val aUp = TestMember(Address("akka.tcp", "sys", "a", 2552), Up) val aLeaving = aUp.copy(status = Leaving) val aExiting = aLeaving.copy(status = Exiting) val aRemoved = aExiting.copy(status = Removed) - val bExiting = Member(Address("akka.tcp", "sys", "b", 2552), Exiting, Set.empty) + val bExiting = TestMember(Address("akka.tcp", "sys", "b", 2552), Exiting) val bRemoved = bExiting.copy(status = Removed) - val cJoining = Member(Address("akka.tcp", "sys", "c", 2552), Joining, Set("GRP")) + val cJoining = TestMember(Address("akka.tcp", "sys", "c", 2552), Joining, Set("GRP")) val cUp = cJoining.copy(status = Up) val 
cRemoved = cUp.copy(status = Removed) - val a51Up = Member(Address("akka.tcp", "sys", "a", 2551), Up, Set.empty) - val dUp = Member(Address("akka.tcp", "sys", "d", 2552), Up, Set("GRP")) + val a51Up = TestMember(Address("akka.tcp", "sys", "a", 2551), Up) + val dUp = TestMember(Address("akka.tcp", "sys", "d", 2552), Up, Set("GRP")) - val g0 = Gossip(members = SortedSet(aUp)).seen(aUp.address) - val g1 = Gossip(members = SortedSet(aUp, bExiting, cJoining)).seen(aUp.address).seen(bExiting.address).seen(cJoining.address) - val g2 = Gossip(members = SortedSet(aUp, bExiting, cUp)).seen(aUp.address) - val g3 = g2.seen(bExiting.address).seen(cUp.address) - val g4 = Gossip(members = SortedSet(a51Up, aUp, bExiting, cUp)).seen(aUp.address) - val g5 = Gossip(members = SortedSet(a51Up, aUp, bExiting, cUp)).seen(aUp.address).seen(bExiting.address).seen(cUp.address).seen(a51Up.address) - val g6 = Gossip(members = SortedSet(aLeaving, bExiting, cUp)).seen(aUp.address) - val g7 = Gossip(members = SortedSet(aExiting, bExiting, cUp)).seen(aUp.address) + val g0 = Gossip(members = SortedSet(aUp)).seen(aUp.uniqueAddress) + val g1 = Gossip(members = SortedSet(aUp, bExiting, cJoining)).seen(aUp.uniqueAddress).seen(bExiting.uniqueAddress).seen(cJoining.uniqueAddress) + val g2 = Gossip(members = SortedSet(aUp, bExiting, cUp)).seen(aUp.uniqueAddress) + val g3 = g2.seen(bExiting.uniqueAddress).seen(cUp.uniqueAddress) + val g4 = Gossip(members = SortedSet(a51Up, aUp, bExiting, cUp)).seen(aUp.uniqueAddress) + val g5 = Gossip(members = SortedSet(a51Up, aUp, bExiting, cUp)).seen(aUp.uniqueAddress).seen(bExiting.uniqueAddress).seen(cUp.uniqueAddress).seen(a51Up.uniqueAddress) + val g6 = Gossip(members = SortedSet(aLeaving, bExiting, cUp)).seen(aUp.uniqueAddress) + val g7 = Gossip(members = SortedSet(aExiting, bExiting, cUp)).seen(aUp.uniqueAddress) // created in beforeEach var memberSubscriber: TestProbe = _ @@ -136,20 +136,6 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec memberSubscriber.expectMsg(MemberUp(cUp)) } - "publish clean state when PublishStart" in { - val subscriber = TestProbe() - publisher ! Subscribe(subscriber.ref, classOf[ClusterDomainEvent]) - subscriber.expectMsgType[CurrentClusterState] - publisher ! PublishChanges(g3) - subscriber.expectMsg(MemberExited(bExiting)) - subscriber.expectMsg(MemberUp(cUp)) - subscriber.expectMsg(RoleLeaderChanged("GRP", Some(cUp.address))) - subscriber.expectMsgType[SeenChanged] - - publisher ! PublishStart - subscriber.expectMsgType[CurrentClusterState] must be(CurrentClusterState()) - } - "publish SeenChanged" in { val subscriber = TestProbe() publisher ! 
Subscribe(subscriber.ref, classOf[SeenChanged]) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala index d01540ae75..cba8900869 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala @@ -16,27 +16,27 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers { import ClusterEvent._ val aRoles = Set("AA", "AB") - val aJoining = Member(Address("akka.tcp", "sys", "a", 2552), Joining, aRoles) - val aUp = Member(Address("akka.tcp", "sys", "a", 2552), Up, aRoles) - val aRemoved = Member(Address("akka.tcp", "sys", "a", 2552), Removed, aRoles) + val aJoining = TestMember(Address("akka.tcp", "sys", "a", 2552), Joining, aRoles) + val aUp = TestMember(Address("akka.tcp", "sys", "a", 2552), Up, aRoles) + val aRemoved = TestMember(Address("akka.tcp", "sys", "a", 2552), Removed, aRoles) val bRoles = Set("AB", "BB") - val bUp = Member(Address("akka.tcp", "sys", "b", 2552), Up, bRoles) - val bDown = Member(Address("akka.tcp", "sys", "b", 2552), Down, bRoles) - val bRemoved = Member(Address("akka.tcp", "sys", "b", 2552), Removed, bRoles) + val bUp = TestMember(Address("akka.tcp", "sys", "b", 2552), Up, bRoles) + val bDown = TestMember(Address("akka.tcp", "sys", "b", 2552), Down, bRoles) + val bRemoved = TestMember(Address("akka.tcp", "sys", "b", 2552), Removed, bRoles) val cRoles = Set.empty[String] - val cUp = Member(Address("akka.tcp", "sys", "c", 2552), Up, cRoles) - val cLeaving = Member(Address("akka.tcp", "sys", "c", 2552), Leaving, cRoles) + val cUp = TestMember(Address("akka.tcp", "sys", "c", 2552), Up, cRoles) + val cLeaving = TestMember(Address("akka.tcp", "sys", "c", 2552), Leaving, cRoles) val dRoles = Set("DD", "DE") - val dLeaving = Member(Address("akka.tcp", "sys", "d", 2552), Leaving, dRoles) - val dExiting = Member(Address("akka.tcp", "sys", "d", 2552), Exiting, dRoles) - val dRemoved = Member(Address("akka.tcp", "sys", "d", 2552), Removed, dRoles) + val dLeaving = TestMember(Address("akka.tcp", "sys", "d", 2552), Leaving, dRoles) + val dExiting = TestMember(Address("akka.tcp", "sys", "d", 2552), Exiting, dRoles) + val dRemoved = TestMember(Address("akka.tcp", "sys", "d", 2552), Removed, dRoles) val eRoles = Set("EE", "DE") - val eJoining = Member(Address("akka.tcp", "sys", "e", 2552), Joining, eRoles) - val eUp = Member(Address("akka.tcp", "sys", "e", 2552), Up, eRoles) - val eDown = Member(Address("akka.tcp", "sys", "e", 2552), Down, eRoles) + val eJoining = TestMember(Address("akka.tcp", "sys", "e", 2552), Joining, eRoles) + val eUp = TestMember(Address("akka.tcp", "sys", "e", 2552), Up, eRoles) + val eDown = TestMember(Address("akka.tcp", "sys", "e", 2552), Down, eRoles) - private[cluster] def converge(gossip: Gossip): (Gossip, Set[Address]) = - ((gossip, Set.empty[Address]) /: gossip.members) { case ((gs, as), m) ⇒ (gs.seen(m.address), as + m.address) } + private[cluster] def converge(gossip: Gossip): (Gossip, Set[UniqueAddress]) = + ((gossip, Set.empty[UniqueAddress]) /: gossip.members) { case ((gs, as), m) ⇒ (gs.seen(m.uniqueAddress), as + m.uniqueAddress) } "Domain events" must { @@ -52,7 +52,7 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers { diffMemberEvents(g1, g2) must be(Seq(MemberUp(bUp))) diffUnreachable(g1, g2) must be(Seq.empty) - diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2))) + diffSeen(g1, g2) must 
be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) } "be produced for changed status of members" in { @@ -61,7 +61,7 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers { diffMemberEvents(g1, g2) must be(Seq(MemberUp(aUp))) diffUnreachable(g1, g2) must be(Seq.empty) - diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2))) + diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) } "be produced for members in unreachable" in { @@ -78,12 +78,12 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers { diffMemberEvents(g1, g2) must be(Seq(MemberRemoved(dRemoved))) diffUnreachable(g1, g2) must be(Seq.empty) - diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2))) + diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) } "be produced for convergence changes" in { - val g1 = Gossip(members = SortedSet(aUp, bUp, eJoining)).seen(aUp.address).seen(bUp.address).seen(eJoining.address) - val g2 = Gossip(members = SortedSet(aUp, bUp, eJoining)).seen(aUp.address).seen(bUp.address) + val g1 = Gossip(members = SortedSet(aUp, bUp, eJoining)).seen(aUp.uniqueAddress).seen(bUp.uniqueAddress).seen(eJoining.uniqueAddress) + val g2 = Gossip(members = SortedSet(aUp, bUp, eJoining)).seen(aUp.uniqueAddress).seen(bUp.uniqueAddress) diffMemberEvents(g1, g2) must be(Seq.empty) diffUnreachable(g1, g2) must be(Seq.empty) @@ -99,7 +99,7 @@ class ClusterDomainEventSpec extends WordSpec with MustMatchers { diffMemberEvents(g1, g2) must be(Seq(MemberRemoved(aRemoved))) diffUnreachable(g1, g2) must be(Seq.empty) - diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2))) + diffSeen(g1, g2) must be(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address)))) diffLeader(g1, g2) must be(Seq(LeaderChanged(Some(bUp.address)))) } diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index 8cd2b45508..58a75e82d4 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -14,18 +14,18 @@ class GossipSpec extends WordSpec with MustMatchers { import MemberStatus._ - val a1 = Member(Address("akka.tcp", "sys", "a", 2552), Up, Set.empty) - val a2 = Member(a1.address, Joining, Set.empty) - val b1 = Member(Address("akka.tcp", "sys", "b", 2552), Up, Set.empty) - val b2 = Member(b1.address, Removed, Set.empty) - val c1 = Member(Address("akka.tcp", "sys", "c", 2552), Leaving, Set.empty) - val c2 = Member(c1.address, Up, Set.empty) - val c3 = Member(c1.address, Exiting, Set.empty) - val d1 = Member(Address("akka.tcp", "sys", "d", 2552), Leaving, Set.empty) - val d2 = Member(d1.address, Removed, Set.empty) - val e1 = Member(Address("akka.tcp", "sys", "e", 2552), Joining, Set.empty) - val e2 = Member(e1.address, Up, Set.empty) - val e3 = Member(e1.address, Down, Set.empty) + val a1 = TestMember(Address("akka.tcp", "sys", "a", 2552), Up) + val a2 = TestMember(a1.address, Joining) + val b1 = TestMember(Address("akka.tcp", "sys", "b", 2552), Up) + val b2 = TestMember(b1.address, Removed) + val c1 = TestMember(Address("akka.tcp", "sys", "c", 2552), Leaving) + val c2 = TestMember(c1.address, Up) + val c3 = TestMember(c1.address, Exiting) + val d1 = TestMember(Address("akka.tcp", "sys", "d", 2552), Leaving) + val d2 = TestMember(d1.address, Removed) + val e1 = TestMember(Address("akka.tcp", "sys", "e", 2552), Joining) + 
val e2 = TestMember(e1.address, Up) + val e3 = TestMember(e1.address, Down) "A Gossip" must { @@ -89,33 +89,33 @@ class GossipSpec extends WordSpec with MustMatchers { } "not have non cluster members in seen table" in intercept[IllegalArgumentException] { - Gossip(members = SortedSet(a1, e1)).seen(a1.address).seen(e1.address).seen(b1.address) + Gossip(members = SortedSet(a1, e1)).seen(a1.uniqueAddress).seen(e1.uniqueAddress).seen(b1.uniqueAddress) } "have leader as first member based on ordering, except Exiting status" in { - Gossip(members = SortedSet(c2, e2)).leader must be(Some(c2.address)) - Gossip(members = SortedSet(c3, e2)).leader must be(Some(e2.address)) - Gossip(members = SortedSet(c3)).leader must be(Some(c3.address)) + Gossip(members = SortedSet(c2, e2)).leader must be(Some(c2.uniqueAddress)) + Gossip(members = SortedSet(c3, e2)).leader must be(Some(e2.uniqueAddress)) + Gossip(members = SortedSet(c3)).leader must be(Some(c3.uniqueAddress)) } "merge seen table correctly" in { val vclockNode = VectorClock.Node("something") - val g1 = (Gossip(members = SortedSet(a1, b1, c1, d1)) :+ vclockNode).seen(a1.address).seen(b1.address) - val g2 = (Gossip(members = SortedSet(a1, b1, c1, d1)) :+ vclockNode).seen(a1.address).seen(c1.address) - val g3 = (g1 copy (version = g2.version)).seen(d1.address) + val g1 = (Gossip(members = SortedSet(a1, b1, c1, d1)) :+ vclockNode).seen(a1.uniqueAddress).seen(b1.uniqueAddress) + val g2 = (Gossip(members = SortedSet(a1, b1, c1, d1)) :+ vclockNode).seen(a1.uniqueAddress).seen(c1.uniqueAddress) + val g3 = (g1 copy (version = g2.version)).seen(d1.uniqueAddress) def checkMerged(merged: Gossip) { val keys = merged.overview.seen.keys.toSeq keys.length must be(4) - keys.toSet must be(Set(a1.address, b1.address, c1.address, d1.address)) + keys.toSet must be(Set(a1.uniqueAddress, b1.uniqueAddress, c1.uniqueAddress, d1.uniqueAddress)) - merged seenByAddress (a1.address) must be(true) - merged seenByAddress (b1.address) must be(false) - merged seenByAddress (c1.address) must be(true) - merged seenByAddress (d1.address) must be(true) - merged seenByAddress (e1.address) must be(false) + merged seenByNode (a1.uniqueAddress) must be(true) + merged seenByNode (b1.uniqueAddress) must be(false) + merged seenByNode (c1.uniqueAddress) must be(true) + merged seenByNode (d1.uniqueAddress) must be(true) + merged seenByNode (e1.uniqueAddress) must be(false) - merged.overview.seen(b1.address) must be(g1.version) + merged.overview.seen(b1.uniqueAddress) must be(g1.version) } checkMerged(g3 merge g2) diff --git a/akka-cluster/src/test/scala/akka/cluster/MemberOrderingSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MemberOrderingSpec.scala index c8537b2bb0..17db59c43a 100644 --- a/akka-cluster/src/test/scala/akka/cluster/MemberOrderingSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/MemberOrderingSpec.scala @@ -17,7 +17,7 @@ class MemberOrderingSpec extends WordSpec with MustMatchers { import Member.addressOrdering import MemberStatus._ - def m(address: Address, status: MemberStatus): Member = Member(address, status, Set.empty) + def m(address: Address, status: MemberStatus): Member = TestMember(address, status) "An Ordering[Member]" must { @@ -52,7 +52,9 @@ class MemberOrderingSpec extends WordSpec with MustMatchers { "have stable equals and hashCode" in { val address = Address("akka.tcp", "sys1", "host1", 9000) val m1 = m(address, Joining) - val m2 = m(address, Up) + val m11 = Member(UniqueAddress(address, -3), Set.empty) + val m2 = m1.copy(status = Up) + val m22 = 
m11.copy(status = Up) val m3 = m(address.copy(port = Some(10000)), Up) m1 must be(m2) @@ -60,6 +62,13 @@ class MemberOrderingSpec extends WordSpec with MustMatchers { m3 must not be (m2) m3 must not be (m1) + + m11 must be(m22) + m11.hashCode must be(m22.hashCode) + + // different uid + m1 must not be (m11) + m2 must not be (m22) } "have consistent ordering and equals" in { @@ -71,6 +80,13 @@ class MemberOrderingSpec extends WordSpec with MustMatchers { val z = m(address2, Up) Member.ordering.compare(x, y) must be(0) Member.ordering.compare(x, z) must be(Member.ordering.compare(y, z)) + + // different uid + val a = m(address1, Joining) + val b = Member(UniqueAddress(address1, -3), Set.empty) + Member.ordering.compare(a, b) must be(1) + Member.ordering.compare(b, a) must be(-1) + } "work with SortedSet" in { @@ -84,6 +100,7 @@ class MemberOrderingSpec extends WordSpec with MustMatchers { (SortedSet(m(address2, Up), m(address3, Joining), m(address1, Exiting)) - m(address1, Removed)) must be( SortedSet(m(address2, Up), m(address3, Joining))) } + } "An Ordering[Address]" must { diff --git a/akka-cluster/src/test/scala/akka/cluster/TestMember.scala b/akka-cluster/src/test/scala/akka/cluster/TestMember.scala new file mode 100644 index 0000000000..2200f42fef --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/TestMember.scala @@ -0,0 +1,14 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.cluster + +import akka.actor.Address + +object TestMember { + def apply(address: Address, status: MemberStatus): Member = + apply(address, status, Set.empty) + + def apply(address: Address, status: MemberStatus, roles: Set[String]): Member = + new Member(UniqueAddress(address, 0), status, roles) +} \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala index ba2d885395..47aa7a8cfc 100644 --- a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala @@ -22,34 +22,39 @@ class ClusterMessageSerializerSpec extends AkkaSpec { import MemberStatus._ - val a1 = Member(Address("akka.tcp", "sys", "a", 2552), Joining, Set.empty) - val b1 = Member(Address("akka.tcp", "sys", "b", 2552), Up, Set("r1")) - val c1 = Member(Address("akka.tcp", "sys", "c", 2552), Leaving, Set("r2")) - val d1 = Member(Address("akka.tcp", "sys", "d", 2552), Exiting, Set("r1", "r2")) - val e1 = Member(Address("akka.tcp", "sys", "e", 2552), Down, Set("r3")) - val f1 = Member(Address("akka.tcp", "sys", "f", 2552), Removed, Set("r2", "r3")) + val a1 = TestMember(Address("akka.tcp", "sys", "a", 2552), Joining, Set.empty) + val b1 = TestMember(Address("akka.tcp", "sys", "b", 2552), Up, Set("r1")) + val c1 = TestMember(Address("akka.tcp", "sys", "c", 2552), Leaving, Set("r2")) + val d1 = TestMember(Address("akka.tcp", "sys", "d", 2552), Exiting, Set("r1", "r2")) + val e1 = TestMember(Address("akka.tcp", "sys", "e", 2552), Down, Set("r3")) + val f1 = TestMember(Address("akka.tcp", "sys", "f", 2552), Removed, Set("r2", "r3")) "ClusterMessages" must { "be serializable" in { val address = Address("akka.tcp", "system", "some.host.org", 4711) - checkSerialization(ClusterUserAction.Join(address, Set("foo", "bar"))) + val uniqueAddress = UniqueAddress(address, 17) + val address2 = Address("akka.tcp", "system", "other.host.org", 4711) + val uniqueAddress2 = 
UniqueAddress(address2, 18) + checkSerialization(InternalClusterAction.Join(uniqueAddress, Set("foo", "bar"))) checkSerialization(ClusterUserAction.Leave(address)) checkSerialization(ClusterUserAction.Down(address)) checkSerialization(InternalClusterAction.InitJoin) checkSerialization(InternalClusterAction.InitJoinAck(address)) checkSerialization(InternalClusterAction.InitJoinNack(address)) - checkSerialization(ClusterLeaderAction.Exit(address)) - checkSerialization(ClusterLeaderAction.Remove(address)) + checkSerialization(ClusterLeaderAction.Exit(uniqueAddress)) + checkSerialization(ClusterLeaderAction.Shutdown(uniqueAddress)) checkSerialization(ClusterHeartbeatReceiver.Heartbeat(address)) checkSerialization(ClusterHeartbeatReceiver.EndHeartbeat(address)) checkSerialization(ClusterHeartbeatSender.HeartbeatRequest(address)) val node1 = VectorClock.Node("node1") val node2 = VectorClock.Node("node2") - val g1 = (Gossip(SortedSet(a1, b1, c1, d1)) :+ node1).seen(a1.address).seen(b1.address) - val g2 = (g1 :+ node2).seen(a1.address).seen(c1.address) - checkSerialization(GossipEnvelope(a1.address, g2.copy(overview = g2.overview.copy(unreachable = Set(e1, f1))))) + val g1 = (Gossip(SortedSet(a1, b1, c1, d1)) :+ node1).seen(a1.uniqueAddress).seen(b1.uniqueAddress) + val g2 = (g1 :+ node2).seen(a1.uniqueAddress).seen(c1.uniqueAddress) + checkSerialization(GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g2.copy(overview = g2.overview.copy(unreachable = Set(e1, f1))))) + + checkSerialization(InternalClusterAction.Welcome(uniqueAddress, g2)) val mg = MetricsGossip(Set(NodeMetrics(a1.address, 4711, Set(Metric("foo", 1.2, None))), NodeMetrics(b1.address, 4712, Set(Metric("foo", 2.1, Some(EWMA(value = 100.0, alpha = 0.18))), diff --git a/akka-docs/rst/cluster/cluster-usage-java.rst b/akka-docs/rst/cluster/cluster-usage-java.rst index 66a18af618..9d924e79d1 100644 --- a/akka-docs/rst/cluster/cluster-usage-java.rst +++ b/akka-docs/rst/cluster/cluster-usage-java.rst @@ -126,6 +126,17 @@ seed nodes at all. Joining can also be performed programatically with ``Cluster.get(system).join(address)``. +Unsuccessful join attempts are automatically retried after the time period defined in +configuration property ``retry-unsuccessful-join-after``. When using auto-joining with +``seed-nodes`` this means that a new seed node is picked. When joining manually or +programmatically this means that the last join request is retried. Retries can be disabled by +setting the property to ``off``. + +An actor system can only join a cluster once. Additional attempts will be ignored. +When it has successfully joined it must be restarted to be able to join another +cluster or to join the same cluster again. It can use the same host name and port +after the restart, but it must have been removed from the cluster before the join +request is accepted. Automatic vs. Manual Downing ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/akka-docs/rst/cluster/cluster-usage-scala.rst b/akka-docs/rst/cluster/cluster-usage-scala.rst index 4321cc169d..0da45fb27a 100644 --- a/akka-docs/rst/cluster/cluster-usage-scala.rst +++ b/akka-docs/rst/cluster/cluster-usage-scala.rst @@ -119,6 +119,17 @@ seed nodes at all. Joining can also be performed programatically with ``Cluster(system).join(address)``. +Unsuccessful join attempts are automatically retried after the time period defined in +configuration property ``retry-unsuccessful-join-after``. When using auto-joining with +``seed-nodes`` this means that a new seed node is picked.
When joining manually or +programmatically this means that the last join request is retried. Retries can be disabled by +setting the property to ``off``. + +An actor system can only join a cluster once. Additional attempts will be ignored. +When it has successfully joined it must be restarted to be able to join another +cluster or to join the same cluster again. It can use the same host name and port +after the restart, but it must have been removed from the cluster before the join +request is accepted. Automatic vs. Manual Downing ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/akka-remote/src/main/scala/akka/remote/AddressUidExtension.scala b/akka-remote/src/main/scala/akka/remote/AddressUidExtension.scala new file mode 100644 index 0000000000..0f7fe50f68 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/AddressUidExtension.scala @@ -0,0 +1,29 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.remote + +import scala.concurrent.forkjoin.ThreadLocalRandom +import akka.actor.ActorSystem +import akka.actor.ExtendedActorSystem +import akka.actor.Extension +import akka.actor.ExtensionId +import akka.actor.ExtensionIdProvider + +/** + * Extension that holds a uid that is assigned as a random `Int`. + * The uid is intended to be used together with an [[akka.actor.Address]] + * to be able to distinguish restarted actor systems using the same host + * and port. + */ +object AddressUidExtension extends ExtensionId[AddressUidExtension] with ExtensionIdProvider { + override def get(system: ActorSystem): AddressUidExtension = super.get(system) + + override def lookup = AddressUidExtension + + override def createExtension(system: ExtendedActorSystem): AddressUidExtension = new AddressUidExtension(system) +} + +class AddressUidExtension(val system: ExtendedActorSystem) extends Extension { + val addressUid: Int = ThreadLocalRandom.current.nextInt() +} \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index 072d4f2bd8..9947fe4f61 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -135,8 +135,6 @@ private[akka] class RemoteSystemDaemon( case Identify(messageId) ⇒ sender !
ActorIdentity(messageId, Some(this)) - case t: Terminated ⇒ - case TerminationHook ⇒ terminating.switchOn { terminationHookDoneWhenNoChildren() diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala index 47a3cc1510..84751684cf 100644 --- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSingleMasterSpec.scala @@ -74,12 +74,14 @@ abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSing Cluster(system).subscribe(testActor, classOf[MemberUp]) expectMsgClass(classOf[CurrentClusterState]) - Cluster(system) join node(first).address + val firstAddress = node(first).address + val secondAddress = node(second).address + val thirdAddress = node(third).address - expectMsgAllOf( - MemberUp(Member(node(first).address, MemberStatus.Up, Set.empty)), - MemberUp(Member(node(second).address, MemberStatus.Up, Set.empty)), - MemberUp(Member(node(third).address, MemberStatus.Up, Set.empty))) + Cluster(system) join firstAddress + + receiveN(3).collect { case MemberUp(m) => m.address }.toSet must be ( + Set(firstAddress, secondAddress, thirdAddress)) Cluster(system).unsubscribe(testActor) diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala index 708512f657..31fdad6607 100644 --- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala @@ -97,10 +97,8 @@ abstract class StatsSampleSpec extends MultiNodeSpec(StatsSampleSpecConfig) system.actorOf(Props[StatsWorker], "statsWorker") system.actorOf(Props[StatsService], "statsService") - expectMsgAllOf( - MemberUp(Member(firstAddress, MemberStatus.Up, Set.empty)), - MemberUp(Member(secondAddress, MemberStatus.Up, Set.empty)), - MemberUp(Member(thirdAddress, MemberStatus.Up, Set.empty))) + receiveN(3).collect { case MemberUp(m) => m.address }.toSet must be ( + Set(firstAddress, secondAddress, thirdAddress)) Cluster(system).unsubscribe(testActor) diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleJapiSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleJapiSpec.scala index d101a3568e..41741d667e 100644 --- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleJapiSpec.scala +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleJapiSpec.scala @@ -82,10 +82,8 @@ abstract class StatsSampleJapiSpec extends MultiNodeSpec(StatsSampleJapiSpecConf system.actorOf(Props[StatsWorker], "statsWorker") system.actorOf(Props[StatsService], "statsService") - expectMsgAllOf( - MemberUp(Member(firstAddress, MemberStatus.Up, Set.empty)), - MemberUp(Member(secondAddress, MemberStatus.Up, Set.empty)), - MemberUp(Member(thirdAddress, MemberStatus.Up, Set.empty))) + receiveN(3).collect { case MemberUp(m) => m.address }.toSet must be ( + Set(firstAddress, secondAddress, thirdAddress)) Cluster(system).unsubscribe(testActor) diff --git 
a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleSingleMasterJapiSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleSingleMasterJapiSpec.scala index 56780043d2..9860b443db 100644 --- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleSingleMasterJapiSpec.scala +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleSingleMasterJapiSpec.scala @@ -74,12 +74,14 @@ abstract class StatsSampleSingleMasterJapiSpec extends MultiNodeSpec(StatsSample Cluster(system).subscribe(testActor, classOf[MemberUp]) expectMsgClass(classOf[CurrentClusterState]) - Cluster(system) join node(first).address + val firstAddress = node(first).address + val secondAddress = node(second).address + val thirdAddress = node(third).address - expectMsgAllOf( - MemberUp(Member(node(first).address, MemberStatus.Up, Set.empty)), - MemberUp(Member(node(second).address, MemberStatus.Up, Set.empty)), - MemberUp(Member(node(third).address, MemberStatus.Up, Set.empty))) + Cluster(system) join firstAddress + + receiveN(3).collect { case MemberUp(m) => m.address }.toSet must be ( + Set(firstAddress, secondAddress, thirdAddress)) Cluster(system).unsubscribe(testActor)
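The common thread through the patch above is the new UniqueAddress (an Address paired with the random uid assigned by AddressUidExtension) and the fact that Member equality, hashCode and ordering are now keyed on it. The following is a minimal sketch of those semantics, not part of the patch itself; it assumes compilation inside the akka.cluster package (UniqueAddress and the Member constructor are INTERNAL API), and the object name and literal uids are illustrative only.

package akka.cluster

import akka.actor.Address

// Two incarnations of a node on the same host:port, e.g. before and after an
// actor system restart, differ only in the uid part of their UniqueAddress.
object UniqueAddressSketch {
  val address = Address("akka.tcp", "sys", "host1", 2552)
  val firstIncarnation = new Member(UniqueAddress(address, 1), MemberStatus.Up, Set.empty)
  val secondIncarnation = new Member(UniqueAddress(address, 2), MemberStatus.Up, Set.empty)

  // equality and hashCode are based on the UniqueAddress, so these are distinct members
  require(firstIncarnation != secondIncarnation)
  // ordering compares the address first and falls back to the uid on a tie
  require(Member.ordering.compare(firstIncarnation, secondIncarnation) < 0)
  // the plain Address is still available for user-facing APIs
  require(firstIncarnation.address == secondIncarnation.address)
}

Keying member identity on UniqueAddress rather than on Address is what allows a fresh actor system started on the same host:port to be accepted as a new member once the previous incarnation has been removed, which is the scenario exercised by UnreachableNodeJoinsAgainSpec above.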