Cluster member status transition guards, see #2802

This commit is contained in:
Patrik Nordwall 2013-04-05 12:38:09 +02:00
parent 7e79bcd4ae
commit d2548285ac
4 changed files with 53 additions and 27 deletions

View file

@ -422,7 +422,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
* State transition to LEAVING. * State transition to LEAVING.
*/ */
def leaving(address: Address): Unit = { def leaving(address: Address): Unit = {
if (latestGossip.members.exists(_.address == address)) { // only try to update if the node is available (in the member ring) // only try to update if the node is available (in the member ring)
if (latestGossip.members.exists(m m.address == address && m.status == Up)) {
val newMembers = latestGossip.members map { m if (m.address == address) m.copy(status = Leaving) else m } // mark node as LEAVING val newMembers = latestGossip.members map { m if (m.address == address) m.copy(status = Leaving) else m } // mark node as LEAVING
val newGossip = latestGossip copy (members = newMembers) val newGossip = latestGossip copy (members = newMembers)

View file

@ -17,7 +17,7 @@ import MemberStatus._
* Note: `hashCode` and `equals` are solely based on the underlying `Address`, not its `MemberStatus` * Note: `hashCode` and `equals` are solely based on the underlying `Address`, not its `MemberStatus`
* and roles. * and roles.
*/ */
case class Member(val address: Address, val status: MemberStatus, roles: Set[String]) extends ClusterMessage { class Member(val address: Address, val status: MemberStatus, val roles: Set[String]) extends ClusterMessage with Serializable {
override def hashCode = address.## override def hashCode = address.##
override def equals(other: Any) = other match { override def equals(other: Any) = other match {
case m: Member address == m.address case m: Member address == m.address
@ -32,6 +32,16 @@ case class Member(val address: Address, val status: MemberStatus, roles: Set[Str
*/ */
def getRoles: java.util.Set[String] = def getRoles: java.util.Set[String] =
scala.collection.JavaConverters.setAsJavaSetConverter(roles).asJava scala.collection.JavaConverters.setAsJavaSetConverter(roles).asJava
def copy(status: MemberStatus): Member = {
val oldStatus = this.status
if (status == oldStatus) this
else {
require(allowedTransitions(oldStatus)(status),
s"Invalid member status transition [ ${this} -> ${status}]")
new Member(address, status, roles)
}
}
} }
/** /**
@ -41,6 +51,9 @@ object Member {
val none = Set.empty[Member] val none = Set.empty[Member]
def apply(address: Address, status: MemberStatus, roles: Set[String]): Member =
new Member(address, status, roles)
/** /**
* `Address` ordering type class, sorts addresses by host and port. * `Address` ordering type class, sorts addresses by host and port.
*/ */
@ -149,4 +162,16 @@ object MemberStatus {
* Java API: retrieve the removed status singleton * Java API: retrieve the removed status singleton
*/ */
def removed: MemberStatus = Removed def removed: MemberStatus = Removed
/**
* INTERNAL API
*/
private[cluster] val allowedTransitions: Map[MemberStatus, Set[MemberStatus]] =
Map(
Joining -> Set(Up, Down, Removed),
Up -> Set(Leaving, Down, Removed),
Leaving -> Set(Exiting, Down, Removed),
Down -> Set(Removed),
Exiting -> Set(Removed, Down),
Removed -> Set.empty[MemberStatus])
} }

View file

@ -26,10 +26,10 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
var publisher: ActorRef = _ var publisher: ActorRef = _
val aUp = Member(Address("akka.tcp", "sys", "a", 2552), Up, Set.empty) val aUp = Member(Address("akka.tcp", "sys", "a", 2552), Up, Set.empty)
val aLeaving = aUp.copy(status = Leaving) val aLeaving = aUp.copy(status = Leaving)
val aExiting = aUp.copy(status = Exiting) val aExiting = aLeaving.copy(status = Exiting)
val aRemoved = aUp.copy(status = Removed) val aRemoved = aExiting.copy(status = Removed)
val bUp = Member(Address("akka.tcp", "sys", "b", 2552), Up, Set.empty) val bExiting = Member(Address("akka.tcp", "sys", "b", 2552), Exiting, Set.empty)
val bRemoved = bUp.copy(status = Removed) val bRemoved = bExiting.copy(status = Removed)
val cJoining = Member(Address("akka.tcp", "sys", "c", 2552), Joining, Set("GRP")) val cJoining = Member(Address("akka.tcp", "sys", "c", 2552), Joining, Set("GRP"))
val cUp = cJoining.copy(status = Up) val cUp = cJoining.copy(status = Up)
val cRemoved = cUp.copy(status = Removed) val cRemoved = cUp.copy(status = Removed)
@ -37,13 +37,13 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
val dUp = Member(Address("akka.tcp", "sys", "d", 2552), Up, Set("GRP")) val dUp = Member(Address("akka.tcp", "sys", "d", 2552), Up, Set("GRP"))
val g0 = Gossip(members = SortedSet(aUp)).seen(aUp.address) val g0 = Gossip(members = SortedSet(aUp)).seen(aUp.address)
val g1 = Gossip(members = SortedSet(aUp, bUp, cJoining)).seen(aUp.address).seen(bUp.address).seen(cJoining.address) val g1 = Gossip(members = SortedSet(aUp, bExiting, cJoining)).seen(aUp.address).seen(bExiting.address).seen(cJoining.address)
val g2 = Gossip(members = SortedSet(aUp, bUp, cUp)).seen(aUp.address) val g2 = Gossip(members = SortedSet(aUp, bExiting, cUp)).seen(aUp.address)
val g3 = g2.seen(bUp.address).seen(cUp.address) val g3 = g2.seen(bExiting.address).seen(cUp.address)
val g4 = Gossip(members = SortedSet(a51Up, aUp, bUp, cUp)).seen(aUp.address) val g4 = Gossip(members = SortedSet(a51Up, aUp, bExiting, cUp)).seen(aUp.address)
val g5 = Gossip(members = SortedSet(a51Up, aUp, bUp, cUp)).seen(aUp.address).seen(bUp.address).seen(cUp.address).seen(a51Up.address) val g5 = Gossip(members = SortedSet(a51Up, aUp, bExiting, cUp)).seen(aUp.address).seen(bExiting.address).seen(cUp.address).seen(a51Up.address)
val g6 = Gossip(members = SortedSet(aLeaving, bUp, cUp)).seen(aUp.address) val g6 = Gossip(members = SortedSet(aLeaving, bExiting, cUp)).seen(aUp.address)
val g7 = Gossip(members = SortedSet(aExiting, bUp, cUp)).seen(aUp.address) val g7 = Gossip(members = SortedSet(aExiting, bExiting, cUp)).seen(aUp.address)
// created in beforeEach // created in beforeEach
var memberSubscriber: TestProbe = _ var memberSubscriber: TestProbe = _
@ -64,14 +64,14 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
"publish MemberUp" in { "publish MemberUp" in {
publisher ! PublishChanges(g2) publisher ! PublishChanges(g2)
publisher ! PublishChanges(g3) publisher ! PublishChanges(g3)
memberSubscriber.expectMsg(MemberUp(bUp)) memberSubscriber.expectMsg(MemberExited(bExiting))
memberSubscriber.expectMsg(MemberUp(cUp)) memberSubscriber.expectMsg(MemberUp(cUp))
} }
"publish leader changed" in { "publish leader changed" in {
publisher ! PublishChanges(g4) publisher ! PublishChanges(g4)
memberSubscriber.expectMsg(MemberUp(a51Up)) memberSubscriber.expectMsg(MemberUp(a51Up))
memberSubscriber.expectMsg(MemberUp(bUp)) memberSubscriber.expectMsg(MemberExited(bExiting))
memberSubscriber.expectMsg(MemberUp(cUp)) memberSubscriber.expectMsg(MemberUp(cUp))
memberSubscriber.expectMsg(LeaderChanged(Some(a51Up.address))) memberSubscriber.expectMsg(LeaderChanged(Some(a51Up.address)))
memberSubscriber.expectNoMsg(1 second) memberSubscriber.expectNoMsg(1 second)
@ -79,13 +79,13 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
"publish leader changed when old leader leaves and is removed" in { "publish leader changed when old leader leaves and is removed" in {
publisher ! PublishChanges(g3) publisher ! PublishChanges(g3)
memberSubscriber.expectMsg(MemberUp(bUp)) memberSubscriber.expectMsg(MemberExited(bExiting))
memberSubscriber.expectMsg(MemberUp(cUp)) memberSubscriber.expectMsg(MemberUp(cUp))
publisher ! PublishChanges(g6) publisher ! PublishChanges(g6)
memberSubscriber.expectNoMsg(1 second) memberSubscriber.expectNoMsg(1 second)
publisher ! PublishChanges(g7) publisher ! PublishChanges(g7)
memberSubscriber.expectMsg(MemberExited(aExiting)) memberSubscriber.expectMsg(MemberExited(aExiting))
memberSubscriber.expectMsg(LeaderChanged(Some(bUp.address))) memberSubscriber.expectMsg(LeaderChanged(Some(cUp.address)))
memberSubscriber.expectNoMsg(1 second) memberSubscriber.expectNoMsg(1 second)
// at the removed member a an empty gossip is the last thing // at the removed member a an empty gossip is the last thing
publisher ! PublishChanges(Gossip.empty) publisher ! PublishChanges(Gossip.empty)
@ -98,7 +98,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
"not publish leader changed when same leader" in { "not publish leader changed when same leader" in {
publisher ! PublishChanges(g4) publisher ! PublishChanges(g4)
memberSubscriber.expectMsg(MemberUp(a51Up)) memberSubscriber.expectMsg(MemberUp(a51Up))
memberSubscriber.expectMsg(MemberUp(bUp)) memberSubscriber.expectMsg(MemberExited(bExiting))
memberSubscriber.expectMsg(MemberUp(cUp)) memberSubscriber.expectMsg(MemberUp(cUp))
memberSubscriber.expectMsg(LeaderChanged(Some(a51Up.address))) memberSubscriber.expectMsg(LeaderChanged(Some(a51Up.address)))
@ -132,7 +132,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
publisher ! PublishChanges(g3) publisher ! PublishChanges(g3)
subscriber.expectNoMsg(1 second) subscriber.expectNoMsg(1 second)
// but memberSubscriber is still subscriber // but memberSubscriber is still subscriber
memberSubscriber.expectMsg(MemberUp(bUp)) memberSubscriber.expectMsg(MemberExited(bExiting))
memberSubscriber.expectMsg(MemberUp(cUp)) memberSubscriber.expectMsg(MemberUp(cUp))
} }
@ -141,7 +141,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
publisher ! Subscribe(subscriber.ref, classOf[ClusterDomainEvent]) publisher ! Subscribe(subscriber.ref, classOf[ClusterDomainEvent])
subscriber.expectMsgType[CurrentClusterState] subscriber.expectMsgType[CurrentClusterState]
publisher ! PublishChanges(g3) publisher ! PublishChanges(g3)
subscriber.expectMsg(MemberUp(bUp)) subscriber.expectMsg(MemberExited(bExiting))
subscriber.expectMsg(MemberUp(cUp)) subscriber.expectMsg(MemberUp(cUp))
subscriber.expectMsg(RoleLeaderChanged("GRP", Some(cUp.address))) subscriber.expectMsg(RoleLeaderChanged("GRP", Some(cUp.address)))
subscriber.expectMsgType[SeenChanged] subscriber.expectMsgType[SeenChanged]

View file

@ -15,17 +15,17 @@ class GossipSpec extends WordSpec with MustMatchers {
import MemberStatus._ import MemberStatus._
val a1 = Member(Address("akka.tcp", "sys", "a", 2552), Up, Set.empty) val a1 = Member(Address("akka.tcp", "sys", "a", 2552), Up, Set.empty)
val a2 = a1.copy(status = Joining) val a2 = Member(a1.address, Joining, Set.empty)
val b1 = Member(Address("akka.tcp", "sys", "b", 2552), Up, Set.empty) val b1 = Member(Address("akka.tcp", "sys", "b", 2552), Up, Set.empty)
val b2 = b1.copy(status = Removed) val b2 = Member(b1.address, Removed, Set.empty)
val c1 = Member(Address("akka.tcp", "sys", "c", 2552), Leaving, Set.empty) val c1 = Member(Address("akka.tcp", "sys", "c", 2552), Leaving, Set.empty)
val c2 = c1.copy(status = Up) val c2 = Member(c1.address, Up, Set.empty)
val c3 = c1.copy(status = Exiting) val c3 = Member(c1.address, Exiting, Set.empty)
val d1 = Member(Address("akka.tcp", "sys", "d", 2552), Leaving, Set.empty) val d1 = Member(Address("akka.tcp", "sys", "d", 2552), Leaving, Set.empty)
val d2 = d1.copy(status = Removed) val d2 = Member(d1.address, Removed, Set.empty)
val e1 = Member(Address("akka.tcp", "sys", "e", 2552), Joining, Set.empty) val e1 = Member(Address("akka.tcp", "sys", "e", 2552), Joining, Set.empty)
val e2 = e1.copy(status = Up) val e2 = Member(e1.address, Up, Set.empty)
val e3 = e1.copy(status = Down) val e3 = Member(e1.address, Down, Set.empty)
"A Gossip" must { "A Gossip" must {