117 lines
4 KiB
Scala
117 lines
4 KiB
Scala
|
|
/**
|
||
|
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||
|
|
*/
|
||
|
|
|
||
|
|
package akka.cluster
|
||
|
|
|
||
|
|
import scala.collection.immutable.SortedSet
|
||
|
|
import scala.collection.GenTraversableOnce
|
||
|
|
import akka.actor.Address
|
||
|
|
import MemberStatus._
|
||
|
|
|
||
|
|
/**
|
||
|
|
* Represents the address and the current status of a cluster member node.
|
||
|
|
*
|
||
|
|
* Note: `hashCode` and `equals` are solely based on the underlying `Address`, not its `MemberStatus`.
|
||
|
|
*/
|
||
|
|
class Member(val address: Address, val status: MemberStatus) extends ClusterMessage {
|
||
|
|
override def hashCode = address.##
|
||
|
|
override def equals(other: Any) = Member.unapply(this) == Member.unapply(other)
|
||
|
|
override def toString = "Member(address = %s, status = %s)" format (address, status)
|
||
|
|
def copy(address: Address = this.address, status: MemberStatus = this.status): Member = new Member(address, status)
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* Module with factory and ordering methods for Member instances.
|
||
|
|
*/
|
||
|
|
object Member {
|
||
|
|
|
||
|
|
/**
|
||
|
|
* `Address` ordering type class, sorts addresses by host and port.
|
||
|
|
*/
|
||
|
|
implicit val addressOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒
|
||
|
|
if (a.host != b.host) a.host.getOrElse("").compareTo(b.host.getOrElse("")) < 0
|
||
|
|
else if (a.port != b.port) a.port.getOrElse(0) < b.port.getOrElse(0)
|
||
|
|
else false
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* `Member` ordering type class, sorts members by host and port with the exception that
|
||
|
|
* it puts all members that are in MemberStatus.EXITING last.
|
||
|
|
*/
|
||
|
|
implicit val ordering: Ordering[Member] = Ordering.fromLessThan[Member] { (a, b) ⇒
|
||
|
|
if (a.status == Exiting && b.status != Exiting) false
|
||
|
|
else if (a.status != Exiting && b.status == Exiting) true
|
||
|
|
else addressOrdering.compare(a.address, b.address) < 0
|
||
|
|
}
|
||
|
|
|
||
|
|
def apply(address: Address, status: MemberStatus): Member = new Member(address, status)
|
||
|
|
|
||
|
|
def unapply(other: Any) = other match {
|
||
|
|
case m: Member ⇒ Some(m.address)
|
||
|
|
case _ ⇒ None
|
||
|
|
}
|
||
|
|
|
||
|
|
def pickHighestPriority(a: Set[Member], b: Set[Member]): Set[Member] = {
|
||
|
|
// group all members by Address => Seq[Member]
|
||
|
|
val groupedByAddress = (a.toSeq ++ b.toSeq).groupBy(_.address)
|
||
|
|
// pick highest MemberStatus
|
||
|
|
(Set.empty[Member] /: groupedByAddress) {
|
||
|
|
case (acc, (_, members)) ⇒ acc + members.reduceLeft(highestPriorityOf)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* Picks the Member with the highest "priority" MemberStatus.
|
||
|
|
*/
|
||
|
|
def highestPriorityOf(m1: Member, m2: Member): Member = (m1.status, m2.status) match {
|
||
|
|
case (Removed, _) ⇒ m1
|
||
|
|
case (_, Removed) ⇒ m2
|
||
|
|
case (Down, _) ⇒ m1
|
||
|
|
case (_, Down) ⇒ m2
|
||
|
|
case (Exiting, _) ⇒ m1
|
||
|
|
case (_, Exiting) ⇒ m2
|
||
|
|
case (Leaving, _) ⇒ m1
|
||
|
|
case (_, Leaving) ⇒ m2
|
||
|
|
case (Up, Joining) ⇒ m2
|
||
|
|
case (Joining, Up) ⇒ m1
|
||
|
|
case (Joining, Joining) ⇒ m1
|
||
|
|
case (Up, Up) ⇒ m1
|
||
|
|
}
|
||
|
|
|
||
|
|
// FIXME Workaround for https://issues.scala-lang.org/browse/SI-5986
|
||
|
|
// SortedSet + and ++ operators replaces existing element
|
||
|
|
// Use these :+ and :++ operators for the Gossip members
|
||
|
|
implicit def sortedSetWorkaround(sortedSet: SortedSet[Member]): SortedSetWorkaround = new SortedSetWorkaround(sortedSet)
|
||
|
|
class SortedSetWorkaround(sortedSet: SortedSet[Member]) {
|
||
|
|
implicit def :+(elem: Member): SortedSet[Member] = {
|
||
|
|
if (sortedSet.contains(elem)) sortedSet
|
||
|
|
else sortedSet + elem
|
||
|
|
}
|
||
|
|
|
||
|
|
implicit def :++(elems: GenTraversableOnce[Member]): SortedSet[Member] =
|
||
|
|
sortedSet ++ (elems.toSet diff sortedSet)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
/**
|
||
|
|
* Defines the current status of a cluster member node
|
||
|
|
*
|
||
|
|
* Can be one of: Joining, Up, Leaving, Exiting and Down.
|
||
|
|
*/
|
||
|
|
sealed trait MemberStatus extends ClusterMessage {
|
||
|
|
|
||
|
|
/**
|
||
|
|
* Using the same notion for 'unavailable' as 'non-convergence': DOWN
|
||
|
|
*/
|
||
|
|
def isUnavailable: Boolean = this == Down
|
||
|
|
}
|
||
|
|
|
||
|
|
object MemberStatus {
|
||
|
|
case object Joining extends MemberStatus
|
||
|
|
case object Up extends MemberStatus
|
||
|
|
case object Leaving extends MemberStatus
|
||
|
|
case object Exiting extends MemberStatus
|
||
|
|
case object Down extends MemberStatus
|
||
|
|
case object Removed extends MemberStatus
|
||
|
|
}
|