diff --git a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsCollector.scala b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsCollector.scala index a2fc73e04c..ef48bf9c2f 100644 --- a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsCollector.scala +++ b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsCollector.scala @@ -211,7 +211,7 @@ private[metrics] class ClusterMetricsCollector extends Actor with ActorLogging { * Updates the initial node ring for those nodes that are [[akka.cluster.MemberStatus]] `Up`. */ def receiveState(state: CurrentClusterState): Unit = - nodes = (state.members -- state.unreachable) collect { + nodes = (state.members diff state.unreachable) collect { case m if m.status == MemberStatus.Up || m.status == MemberStatus.WeaklyUp ⇒ m.address } diff --git a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/Metric.scala b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/Metric.scala index 7c2a1e3457..59294e7d53 100644 --- a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/Metric.scala +++ b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/Metric.scala @@ -282,7 +282,7 @@ final case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Met if (timestamp >= that.timestamp) this // that is older else { // equality is based on the name of the Metric and Set doesn't replace existing element - copy(metrics = that.metrics ++ metrics, timestamp = that.timestamp) + copy(metrics = that.metrics union metrics, timestamp = that.timestamp) } } @@ -303,7 +303,7 @@ final case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Met } // Append metrics missing from either latest or current. // Equality is based on the [[Metric.name]] and [[Set]] doesn't replace existing elements. - val merged = updated ++ latestNode.metrics ++ currentNode.metrics + val merged = updated union latestNode.metrics union currentNode.metrics copy(metrics = merged, timestamp = latestNode.timestamp) } diff --git a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/MetricsCollector.scala b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/MetricsCollector.scala index ee92a26b2e..85375d1c46 100644 --- a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/MetricsCollector.scala +++ b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/MetricsCollector.scala @@ -218,7 +218,7 @@ class SigarMetricsCollector(address: Address, decayFactor: Double, sigar: SigarP override def metrics(): Set[Metric] = { // Must obtain cpuPerc in one shot. See https://github.com/akka/akka/issues/16121 val cpuPerc = sigar.getCpuPerc - super.metrics ++ Set(cpuCombined(cpuPerc), cpuStolen(cpuPerc)).flatten + super.metrics union Set(cpuCombined(cpuPerc), cpuStolen(cpuPerc)).flatten } /** diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index fb6faa33a6..1e7ff93997 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -661,7 +661,7 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti rebalanceInProgress += shard log.debug("Rebalance shard [{}] from [{}]", shard, rebalanceFromRegion) context.actorOf(rebalanceWorkerProps(shard, rebalanceFromRegion, handOffTimeout, - state.regions.keySet ++ state.regionProxies) + state.regions.keySet union state.regionProxies) .withDispatcher(context.props.dispatcher)) case None ⇒ log.debug("Rebalance of non-existing shard [{}] is ignored", shard) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 6bb76ecb7d..a2b3329a73 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -255,7 +255,7 @@ class ShardRegion( val cluster = Cluster(context.system) // sort by age, oldest first - val ageOrdering = Ordering.fromLessThan[Member] { (a, b) ⇒ a.isOlderThan(b) } + val ageOrdering = Member.ageOrdering var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering) var regions = Map.empty[ActorRef, Set[ShardId]] @@ -318,14 +318,14 @@ class ShardRegion( } def receiveClusterState(state: CurrentClusterState): Unit = { - changeMembers(immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m ⇒ + changeMembers(immutable.SortedSet.empty(ageOrdering) union state.members.filter(m ⇒ m.status == MemberStatus.Up && matchingRole(m))) } def receiveClusterEvent(evt: ClusterDomainEvent): Unit = evt match { case MemberUp(m) ⇒ if (matchingRole(m)) - changeMembers(membersByAge + m) + changeMembers(membersByAge - m + m) // replace case MemberRemoved(m, _) ⇒ if (m.uniqueAddress == cluster.selfUniqueAddress) diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala index c90f75b4e9..ee3cb702d6 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala @@ -310,7 +310,7 @@ final class ClusterClient(settings: ClusterClientSettings) extends Actor with Ac def sendGetContacts(): Unit = { val sendTo = if (contacts.isEmpty) initialContactsSel - else if (contacts.size == 1) (initialContactsSel ++ contacts) + else if (contacts.size == 1) (initialContactsSel union contacts) else contacts if (log.isDebugEnabled) log.debug(s"""Sending GetContacts to [${sendTo.mkString(",")}]""") @@ -639,7 +639,7 @@ final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterRecep val slice = { val first = nodes.from(a).tail.take(numberOfContacts) if (first.size == numberOfContacts) first - else first ++ nodes.take(numberOfContacts - first.size) + else first union nodes.take(numberOfContacts - first.size) } val contacts = Contacts(slice.map(a ⇒ self.path.toStringWithAddress(a))(collection.breakOut)) if (log.isDebugEnabled) @@ -648,7 +648,7 @@ final class ClusterReceptionist(pubSubMediator: ActorRef, settings: ClusterRecep } case state: CurrentClusterState ⇒ - nodes = nodes.empty ++ state.members.collect { case m if m.status != MemberStatus.Joining && matchingRole(m) ⇒ m.address } + nodes = nodes.empty union state.members.collect { case m if m.status != MemberStatus.Joining && matchingRole(m) ⇒ m.address } consistentHash = ConsistentHash(nodes, virtualNodesFactor) case MemberUp(m) ⇒ diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala index dfcbe2b702..2d93b47986 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala @@ -226,7 +226,7 @@ object ClusterSingletonManager { val cluster = Cluster(context.system) // sort by age, oldest first - val ageOrdering = Ordering.fromLessThan[Member] { (a, b) ⇒ a.isOlderThan(b) } + val ageOrdering = Member.ageOrdering var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering) var changes = Vector.empty[AnyRef] @@ -251,7 +251,7 @@ object ClusterSingletonManager { } def handleInitial(state: CurrentClusterState): Unit = { - membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m ⇒ + membersByAge = immutable.SortedSet.empty(ageOrdering) union state.members.filter(m ⇒ (m.status == MemberStatus.Up || m.status == MemberStatus.Leaving) && matchingRole(m)) val safeToBeOldest = !state.members.exists { m ⇒ (m.status == MemberStatus.Down || m.status == MemberStatus.Exiting) } val initial = InitialOldestState(membersByAge.headOption.map(_.address), safeToBeOldest) @@ -260,7 +260,10 @@ object ClusterSingletonManager { def add(m: Member): Unit = { if (matchingRole(m)) - trackChange { () ⇒ membersByAge += m } + trackChange { () ⇒ + membersByAge -= m // replace + membersByAge += m + } } def remove(m: Member): Unit = { diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala index 43b396e96b..7c81a17419 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonProxy.scala @@ -139,9 +139,7 @@ final class ClusterSingletonProxy(singletonManagerPath: String, settings: Cluste val cluster = Cluster(context.system) var singleton: Option[ActorRef] = None // sort by age, oldest first - val ageOrdering = Ordering.fromLessThan[Member] { - (a, b) ⇒ a.isOlderThan(b) - } + val ageOrdering = Member.ageOrdering var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering) var buffer = new java.util.LinkedList[(Any, ActorRef)] @@ -170,7 +168,7 @@ final class ClusterSingletonProxy(singletonManagerPath: String, settings: Cluste def handleInitial(state: CurrentClusterState): Unit = { trackChange { () ⇒ - membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.collect { + membersByAge = immutable.SortedSet.empty(ageOrdering) union state.members.collect { case m if m.status == MemberStatus.Up && matchingRole(m) ⇒ m } } @@ -203,8 +201,9 @@ final class ClusterSingletonProxy(singletonManagerPath: String, settings: Cluste */ def add(m: Member): Unit = { if (matchingRole(m)) - trackChange { - () ⇒ membersByAge += m + trackChange { () ⇒ + membersByAge -= m // replace + membersByAge += m } } diff --git a/akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java b/akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java index e02e9f24d9..e363757e90 100644 --- a/akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java +++ b/akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java @@ -46,77 +46,6 @@ public class ClusterSingletonManagerTest { //#create-singleton-proxy } - static//documentation of how to keep track of the oldest member in user land - //#singleton-proxy - public class ConsumerProxy extends UntypedActor { - - final Cluster cluster = Cluster.get(getContext().system()); - - final Comparator ageComparator = new Comparator() { - public int compare(Member a, Member b) { - if (a.isOlderThan(b)) - return -1; - else if (b.isOlderThan(a)) - return 1; - else - return 0; - } - }; - final SortedSet membersByAge = new TreeSet(ageComparator); - - final String role = "worker"; - - //subscribe to cluster changes - @Override - public void preStart() { - cluster.subscribe(getSelf(), MemberEvent.class); - } - - //re-subscribe when restart - @Override - public void postStop() { - cluster.unsubscribe(getSelf()); - } - - @Override - public void onReceive(Object message) { - if (message instanceof CurrentClusterState) { - CurrentClusterState state = (CurrentClusterState) message; - List members = new ArrayList(); - for (Member m : state.getMembers()) { - if (m.status().equals(MemberStatus.up()) && m.hasRole(role)) - members.add(m); - } - membersByAge.clear(); - membersByAge.addAll(members); - - } else if (message instanceof MemberUp) { - Member m = ((MemberUp) message).member(); - if (m.hasRole(role)) - membersByAge.add(m); - - } else if (message instanceof MemberRemoved) { - Member m = ((MemberUp) message).member(); - if (m.hasRole(role)) - membersByAge.remove(m); - - } else if (message instanceof MemberEvent) { - // not interesting - - } else if (!membersByAge.isEmpty()) { - currentMaster().tell(message, getSender()); - - } - } - - ActorSelection currentMaster() { - return getContext().actorSelection(membersByAge.first().address() + "/user/singleton/statsService"); - } - - } - - //#singleton-proxy - public static class End { } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 7a09543853..d4d2977ed8 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -511,7 +511,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with updateLatestGossip(newGossip) logInfo("Node [{}] is JOINING, roles [{}]", node.address, roles.mkString(", ")) - if (node != selfUniqueAddress) + if (node == selfUniqueAddress) { + if (localMembers.isEmpty) + leaderActions() // important for deterministic oldest when bootstrapping + } else sender() ! Welcome(selfUniqueAddress, latestGossip) publish(latestGossip) @@ -923,11 +926,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with // handle changes // replace changed members - val newMembers = changedMembers ++ localMembers -- removedUnreachable + val newMembers = changedMembers union localMembers diff removedUnreachable // removing REMOVED nodes from the `seen` table val removed = removedUnreachable.map(_.uniqueAddress) - val newSeen = localSeen -- removed + val newSeen = localSeen diff removed // removing REMOVED nodes from the `reachability` table val newReachability = localOverview.reachability.remove(removed) val newOverview = localOverview copy (seen = newSeen, reachability = newReachability) @@ -985,7 +988,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with if (changedMembers.nonEmpty) { // replace changed members - val newMembers = changedMembers ++ localMembers + val newMembers = changedMembers union localMembers val newGossip = localGossip.copy(members = newMembers) updateLatestGossip(newGossip) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index be95f74148..00c52f36ab 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -272,10 +272,11 @@ object ClusterEvent { private[cluster] def diffMemberEvents(oldGossip: Gossip, newGossip: Gossip): immutable.Seq[MemberEvent] = if (newGossip eq oldGossip) Nil else { - val newMembers = newGossip.members -- oldGossip.members + val newMembers = newGossip.members diff oldGossip.members val membersGroupedByAddress = List(newGossip.members, oldGossip.members).flatten.groupBy(_.uniqueAddress) val changedMembers = membersGroupedByAddress collect { - case (_, newMember :: oldMember :: Nil) if newMember.status != oldMember.status ⇒ newMember + case (_, newMember :: oldMember :: Nil) if newMember.status != oldMember.status || newMember.upNumber != oldMember.upNumber ⇒ + newMember } val memberEvents = (newMembers ++ changedMembers) collect { case m if m.status == WeaklyUp ⇒ MemberWeaklyUp(m) @@ -284,7 +285,7 @@ object ClusterEvent { // no events for other transitions } - val removedMembers = oldGossip.members -- newGossip.members + val removedMembers = oldGossip.members diff newGossip.members val removedEvents = removedMembers.map(m ⇒ MemberRemoved(m.copy(status = Removed), m.status)) (new VectorBuilder[MemberEvent]() ++= memberEvents ++= removedEvents).result() @@ -304,7 +305,7 @@ object ClusterEvent { */ private[cluster] def diffRolesLeader(oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): Set[RoleLeaderChanged] = { for { - role ← (oldGossip.allRoles ++ newGossip.allRoles) + role ← (oldGossip.allRoles union newGossip.allRoles) newLeader = newGossip.roleLeader(role, selfUniqueAddress) if newLeader != oldGossip.roleLeader(role, selfUniqueAddress) } yield RoleLeaderChanged(role, newLeader.map(_.address)) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index a6e2f41d80..e7f58ab1e3 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -184,7 +184,7 @@ private[cluster] final case class ClusterHeartbeatSenderState( oldReceiversNowUnreachable: Set[UniqueAddress], failureDetector: FailureDetectorRegistry[Address]) { - val activeReceivers: Set[UniqueAddress] = ring.myReceivers ++ oldReceiversNowUnreachable + val activeReceivers: Set[UniqueAddress] = ring.myReceivers union oldReceiversNowUnreachable def selfAddress = ring.selfAddress @@ -212,7 +212,7 @@ private[cluster] final case class ClusterHeartbeatSenderState( private def membershipChange(newRing: HeartbeatNodeRing): ClusterHeartbeatSenderState = { val oldReceivers = ring.myReceivers - val removedReceivers = oldReceivers -- newRing.myReceivers + val removedReceivers = oldReceivers diff newRing.myReceivers var adjustedOldReceiversNowUnreachable = oldReceiversNowUnreachable removedReceivers foreach { a ⇒ if (failureDetector.isAvailable(a.address)) @@ -260,7 +260,7 @@ private[cluster] final case class HeartbeatNodeRing( ha < hb || (ha == hb && Member.addressOrdering.compare(a.address, b.address) < 0) } - immutable.SortedSet() ++ nodes + immutable.SortedSet() union nodes } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala index bf68babb9a..c82f32556d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala @@ -404,7 +404,7 @@ final case class NodeMetrics(address: Address, timestamp: Long, metrics: Set[Met if (timestamp >= that.timestamp) this // that is older else { // equality is based on the name of the Metric and Set doesn't replace existing element - copy(metrics = that.metrics ++ metrics, timestamp = that.timestamp) + copy(metrics = that.metrics union metrics, timestamp = that.timestamp) } } @@ -742,7 +742,7 @@ class SigarMetricsCollector(address: Address, decayFactor: Double, sigar: AnyRef } override def metrics: Set[Metric] = { - super.metrics.filterNot(_.name == SystemLoadAverage) ++ Set(systemLoadAverage, cpuCombined).flatten + super.metrics.filterNot(_.name == SystemLoadAverage) union Set(systemLoadAverage, cpuCombined).flatten } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala index 17ed4088c9..c1a21e5977 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala @@ -72,7 +72,7 @@ private[cluster] class ClusterRemoteWatcher( case state: CurrentClusterState ⇒ clusterNodes = state.members.collect { case m if m.address != selfAddress ⇒ m.address } clusterNodes foreach takeOverResponsibility - unreachable --= clusterNodes + unreachable = unreachable diff clusterNodes case MemberUp(m) ⇒ memberUp(m) case MemberWeaklyUp(m) ⇒ memberUp(m) case MemberRemoved(m, previousStatus) ⇒ memberRemoved(m, previousStatus) diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index c130b1d442..cb10b08cf0 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -73,12 +73,12 @@ private[cluster] final case class Gossip( throw new IllegalArgumentException(s"Live members must have status [${Removed}], " + s"got [${members.filter(_.status == Removed)}]") - val inReachabilityButNotMember = overview.reachability.allObservers -- members.map(_.uniqueAddress) + val inReachabilityButNotMember = overview.reachability.allObservers diff members.map(_.uniqueAddress) if (inReachabilityButNotMember.nonEmpty) throw new IllegalArgumentException("Nodes not part of cluster in reachability table, got [%s]" format inReachabilityButNotMember.mkString(", ")) - val seenButNotMember = overview.seen -- members.map(_.uniqueAddress) + val seenButNotMember = overview.seen diff members.map(_.uniqueAddress) if (seenButNotMember.nonEmpty) throw new IllegalArgumentException("Nodes not part of cluster have marked the Gossip as seen, got [%s]" format seenButNotMember.mkString(", ")) @@ -129,7 +129,7 @@ private[cluster] final case class Gossip( * Merges the seen table of two Gossip instances. */ def mergeSeen(that: Gossip): Gossip = - this copy (overview = overview copy (seen = overview.seen ++ that.overview.seen)) + this copy (overview = overview copy (seen = overview.seen union that.overview.seen)) /** * Merges two Gossip instances including membership tables, and the VectorClock histories. @@ -141,7 +141,7 @@ private[cluster] final case class Gossip( val mergedVClock = this.version merge that.version // 2. merge members by selecting the single Member with highest MemberStatus out of the Member groups - val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members) + val mergedMembers = Gossip.emptyMembers union Member.pickHighestPriority(this.members, that.members) // 3. merge reachability table by picking records with highest version val mergedReachability = this.overview.reachability.merge(mergedMembers.map(_.uniqueAddress), diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala index 8dc08dde6f..6fb54cbdd7 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -47,7 +47,11 @@ class Member private[cluster] ( * cluster. A member that joined after removal of another member may be * considered older than the removed member. */ - def isOlderThan(other: Member): Boolean = upNumber < other.upNumber + def isOlderThan(other: Member): Boolean = + if (upNumber == other.upNumber) + Member.addressOrdering.compare(address, other.address) < 0 + else + upNumber < other.upNumber def copy(status: MemberStatus): Member = { val oldStatus = this.status @@ -123,6 +127,13 @@ object Member { } } + /** + * Sort members by age, i.e. using [[Member#isOlderThan]]. + */ + val ageOrdering: Ordering[Member] = Ordering.fromLessThan[Member] { + (a, b) ⇒ a.isOlderThan(b) + } + def pickHighestPriority(a: Set[Member], b: Set[Member]): Set[Member] = { // group all members by Address => Seq[Member] val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.uniqueAddress) @@ -141,20 +152,25 @@ object Member { /** * Picks the Member with the highest "priority" MemberStatus. */ - def highestPriorityOf(m1: Member, m2: Member): Member = (m1.status, m2.status) match { - case (Removed, _) ⇒ m1 - case (_, Removed) ⇒ m2 - case (Down, _) ⇒ m1 - case (_, Down) ⇒ m2 - case (Exiting, _) ⇒ m1 - case (_, Exiting) ⇒ m2 - case (Leaving, _) ⇒ m1 - case (_, Leaving) ⇒ m2 - case (Joining, _) ⇒ m2 - case (_, Joining) ⇒ m1 - case (WeaklyUp, _) ⇒ m2 - case (_, WeaklyUp) ⇒ m1 - case (Up, Up) ⇒ m1 + def highestPriorityOf(m1: Member, m2: Member): Member = { + if (m1.status == m2.status) + // preserve the oldest in case of different upNumber + if (m1.isOlderThan(m2)) m1 else m2 + else (m1.status, m2.status) match { + case (Removed, _) ⇒ m1 + case (_, Removed) ⇒ m2 + case (Down, _) ⇒ m1 + case (_, Down) ⇒ m2 + case (Exiting, _) ⇒ m1 + case (_, Exiting) ⇒ m2 + case (Leaving, _) ⇒ m1 + case (_, Leaving) ⇒ m2 + case (Joining, _) ⇒ m2 + case (_, Joining) ⇒ m1 + case (WeaklyUp, _) ⇒ m2 + case (_, WeaklyUp) ⇒ m1 + case (Up, Up) ⇒ m1 + } } } diff --git a/akka-cluster/src/main/scala/akka/cluster/Reachability.scala b/akka-cluster/src/main/scala/akka/cluster/Reachability.scala index 83dbc575f0..27fb62b18a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Reachability.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Reachability.scala @@ -80,7 +80,7 @@ private[cluster] class Reachability private ( val observerRowsMap: Map[UniqueAddress, Map[UniqueAddress, Reachability.Record]] = mapBuilder.toMap val allTerminated: Set[UniqueAddress] = terminatedBuilder.result() - val allUnreachable: Set[UniqueAddress] = unreachableBuilder.result() -- allTerminated + val allUnreachable: Set[UniqueAddress] = unreachableBuilder.result() diff allTerminated (observerRowsMap, allUnreachable, allTerminated) } @@ -88,7 +88,7 @@ private[cluster] class Reachability private ( val allUnreachableOrTerminated: Set[UniqueAddress] = if (allTerminated.isEmpty) allUnreachable - else allUnreachable ++ allTerminated + else allUnreachable union allTerminated } diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index e0e7e745bb..919f7141c9 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -195,7 +195,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri val allMembers = gossip.members.toVector val allAddresses: Vector[UniqueAddress] = allMembers.map(_.uniqueAddress) val addressMapping = allAddresses.zipWithIndex.toMap - val allRoles = allMembers.foldLeft(Set.empty[String])((acc, m) ⇒ acc ++ m.roles).to[Vector] + val allRoles = allMembers.foldLeft(Set.empty[String])((acc, m) ⇒ acc union m.roles).to[Vector] val roleMapping = allRoles.zipWithIndex.toMap val allHashes = gossip.version.versions.keys.to[Vector] val hashMapping = allHashes.zipWithIndex.toMap diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/DeterministicOldestWhenJoiningSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/DeterministicOldestWhenJoiningSpec.scala new file mode 100644 index 0000000000..51b5d1c28e --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/DeterministicOldestWhenJoiningSpec.scala @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.cluster + +import scala.collection.immutable +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import scala.concurrent.duration._ +import akka.actor.Address +import akka.cluster.ClusterEvent.MemberUp +import akka.cluster.ClusterEvent.CurrentClusterState + +object DeterministicOldestWhenJoiningMultiJvmSpec extends MultiNodeConfig { + val seed1 = role("seed1") + val seed2 = role("seed2") + val seed3 = role("seed3") + + commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" + # not too quick to trigger problematic scenario more often + akka.cluster.leader-actions-interval = 2000 ms + akka.cluster.gossip-interval = 500 ms + """)).withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class DeterministicOldestWhenJoiningMultiJvmNode1 extends DeterministicOldestWhenJoiningSpec +class DeterministicOldestWhenJoiningMultiJvmNode2 extends DeterministicOldestWhenJoiningSpec +class DeterministicOldestWhenJoiningMultiJvmNode3 extends DeterministicOldestWhenJoiningSpec + +abstract class DeterministicOldestWhenJoiningSpec + extends MultiNodeSpec(DeterministicOldestWhenJoiningMultiJvmSpec) + with MultiNodeClusterSpec { + + import DeterministicOldestWhenJoiningMultiJvmSpec._ + + // reverse order because that expose the bug in issue #18554 + def seedNodes: immutable.IndexedSeq[Address] = + Vector(address(seed1), address(seed2), address(seed3)).sorted(Member.addressOrdering).reverse + val roleByAddress = Map(address(seed1) -> seed1, address(seed2) -> seed2, address(seed3) -> seed3) + + "Joining a cluster" must { + "result in deterministic oldest node" taggedAs LongRunningTest in { + cluster.subscribe(testActor, classOf[MemberUp]) + expectMsgType[CurrentClusterState] + + runOn(roleByAddress(seedNodes.head)) { + cluster.joinSeedNodes(seedNodes) + } + enterBarrier("first-seed-joined") + + runOn(roleByAddress(seedNodes(1)), roleByAddress(roleByAddress(seedNodes(2)))) { + cluster.joinSeedNodes(seedNodes) + } + + within(10.seconds) { + val ups = List(expectMsgType[MemberUp], expectMsgType[MemberUp], expectMsgType[MemberUp]) + ups.map(_.member).sorted(Member.ageOrdering).head.address should ===(seedNodes.head) + } + + enterBarrier("after-1") + } + + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 523dcca93f..9cd99742da 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -303,7 +303,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro * Wait until the specified nodes have seen the same gossip overview. */ def awaitSeenSameState(addresses: Address*): Unit = - awaitAssert((addresses.toSet -- clusterView.seenBy) should ===(Set.empty)) + awaitAssert((addresses.toSet diff clusterView.seenBy) should ===(Set.empty)) /** * Leader according to the address ordering of the roles. diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index 0de47bb541..61d30e4cf0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -3,8 +3,8 @@ */ package akka.cluster -// TODO remove metrics -// FIXME this test is not migrated to metrics extension +// TODO remove metrics +// FIXME this test is not migrated to metrics extension import language.postfixOps import scala.annotation.tailrec @@ -981,7 +981,7 @@ abstract class StressSpec timeout = remainingOrDefault) awaitAllReachable() } - val nextAddresses = clusterView.members.map(_.address) -- usedAddresses + val nextAddresses = clusterView.members.map(_.address) diff usedAddresses runOn(usedRoles: _*) { nextAddresses.size should ===(numberOfNodesJoinRemove) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index 6668ab272f..e9c7467398 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -46,7 +46,7 @@ abstract class TransitionSpec def nonLeader(roles: RoleName*) = roles.toSeq.sorted.tail def memberStatus(address: Address): MemberStatus = { - val statusOption = (clusterView.members ++ clusterView.unreachableMembers).collectFirst { + val statusOption = (clusterView.members union clusterView.unreachableMembers).collectFirst { case m if m.address == address ⇒ m.status } statusOption.getOrElse(Removed) @@ -91,7 +91,7 @@ abstract class TransitionSpec clusterView.latestStats.gossipStats.receivedGossipCount != oldCount // received gossip } // gossip chat will synchronize the views - awaitCond((Set(fromRole, toRole) -- seenLatestGossip).isEmpty) + awaitCond((Set(fromRole, toRole) diff seenLatestGossip).isEmpty) enterBarrier("after-gossip-" + gossipBarrierCounter) } runOn(fromRole) { @@ -99,7 +99,7 @@ abstract class TransitionSpec // send gossip cluster.clusterCore ! InternalClusterAction.SendGossipTo(toRole) // gossip chat will synchronize the views - awaitCond((Set(fromRole, toRole) -- seenLatestGossip).isEmpty) + awaitCond((Set(fromRole, toRole) diff seenLatestGossip).isEmpty) enterBarrier("after-gossip-" + gossipBarrierCounter) } runOn(roles.filterNot(r ⇒ r == fromRole || r == toRole): _*) { @@ -115,8 +115,7 @@ abstract class TransitionSpec runOn(first) { cluster join myself - awaitMemberStatus(myself, Joining) - leaderActions() + // first joining itself will immediately be moved to Up awaitMemberStatus(myself, Up) awaitCond(clusterView.isSingletonCluster) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinSpec.scala index d212199bbe..be5759e87c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinSpec.scala @@ -342,7 +342,7 @@ abstract class ClusterRoundRobinSpec extends MultiNodeSpec(ClusterRoundRobinMult def routeeAddresses = (routees map { case ActorRefRoutee(ref) ⇒ fullAddress(ref) }).toSet routees foreach { case ActorRefRoutee(ref) ⇒ watch(ref) } - val notUsedAddress = ((roles map address).toSet -- routeeAddresses).head + val notUsedAddress = ((roles map address).toSet diff routeeAddresses).head val downAddress = routeeAddresses.find(_ != address(first)).get val downRouteeRef = routees.collectFirst { case ActorRefRoutee(ref) if ref.path.address == downAddress ⇒ ref diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala index 605de99336..b481560264 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala @@ -163,7 +163,7 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with Matchers { val oldUnreachable = state.oldReceiversNowUnreachable state = state.addMember(node) // keep unreachable - (oldUnreachable -- state.activeReceivers) should ===(Set.empty) + (oldUnreachable diff state.activeReceivers) should ===(Set.empty) state.failureDetector.isMonitoring(node.address) should ===(false) state.failureDetector.isAvailable(node.address) should ===(true) } @@ -174,9 +174,9 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with Matchers { state = state.removeMember(node) // keep unreachable, unless it was the removed if (oldUnreachable(node)) - (oldUnreachable -- state.activeReceivers) should ===(Set(node)) + (oldUnreachable diff state.activeReceivers) should ===(Set(node)) else - (oldUnreachable -- state.activeReceivers) should ===(Set.empty) + (oldUnreachable diff state.activeReceivers) should ===(Set.empty) state.failureDetector.isMonitoring(node.address) should ===(false) state.failureDetector.isAvailable(node.address) should ===(true) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala index dc43cf33ea..5d0fd4244e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala @@ -63,7 +63,7 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with case _ ⇒ (p, Set.empty[String]) } if (tags.nonEmpty && hasTagSubscribers) - allTags ++= tags + allTags = allTags union tags require(!p2.persistenceId.startsWith(tagPersistenceIdPrefix), s"persistenceId [${p.persistenceId}] must not start with $tagPersistenceIdPrefix") diff --git a/akka-remote/src/test/scala/akka/remote/RemoteInitErrorSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteInitErrorSpec.scala index 9951007514..b40242800e 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteInitErrorSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteInitErrorSpec.scala @@ -41,7 +41,7 @@ class RemoteInitErrorSpec extends FlatSpec with Matchers { } """).resolve() - def currentThreadIds() = { + def currentThreadIds(): Set[Long] = { val threads = Thread.getAllStackTraces().keySet() threads.collect({ case t: Thread if (!t.isDaemon()) ⇒ t.getId() }) } @@ -56,7 +56,7 @@ class RemoteInitErrorSpec extends FlatSpec with Matchers { eventually(timeout(30 seconds), interval(800 milliseconds)) { val current = currentThreadIds() // no new threads should remain compared to the start state - (current -- start) should be(empty) + (current diff start) should be(empty) } } } diff --git a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala index fe19fe7d03..481547b653 100644 --- a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala @@ -74,8 +74,8 @@ object Configuration { } val engine = NettySSLSupport.initializeClientSSL(settings, NoLogging).getEngine - val gotAllSupported = enabled.toSet -- engine.getSupportedCipherSuites.toSet - val gotAllEnabled = enabled.toSet -- engine.getEnabledCipherSuites.toSet + val gotAllSupported = enabled.toSet diff engine.getSupportedCipherSuites.toSet + val gotAllEnabled = enabled.toSet diff engine.getEnabledCipherSuites.toSet gotAllSupported.isEmpty || (throw new IllegalArgumentException("Cipher Suite not supported: " + gotAllSupported)) gotAllEnabled.isEmpty || (throw new IllegalArgumentException("Cipher Suite not enabled: " + gotAllEnabled)) engine.getSupportedProtocols.contains(settings.SSLProtocol.get) ||