+act #18575 Publish MemberJoined

This commit is contained in:
Heiko Seeberger 2015-10-15 08:08:01 +02:00
parent a6279b7b3f
commit 821dc2199b
7 changed files with 113 additions and 90 deletions

View file

@ -121,6 +121,13 @@ object ClusterEvent {
def member: Member
}
/**
* Member status changed to Joining.
*/
final case class MemberJoined(member: Member) extends MemberEvent {
if (member.status != Joining) throw new IllegalArgumentException("Expected Joining status, got: " + member)
}
/**
* Member status changed to WeaklyUp.
* A joining member can be moved to `WeaklyUp` if convergence
@ -138,6 +145,13 @@ object ClusterEvent {
if (member.status != Up) throw new IllegalArgumentException("Expected Up status, got: " + member)
}
/**
* Member status changed to Leaving.
*/
final case class MemberLeft(member: Member) extends MemberEvent {
if (member.status != Leaving) throw new IllegalArgumentException("Expected Leaving status, got: " + member)
}
/**
* Member status changed to `MemberStatus.Exiting` and will be removed
* when all members have seen the `Exiting` status.
@ -278,8 +292,10 @@ object ClusterEvent {
case (_, newMember :: oldMember :: Nil) if newMember.status != oldMember.status newMember
}
val memberEvents = (newMembers ++ changedMembers) collect {
case m if m.status == Joining MemberJoined(m)
case m if m.status == WeaklyUp MemberWeaklyUp(m)
case m if m.status == Up MemberUp(m)
case m if m.status == Leaving MemberLeft(m)
case m if m.status == Exiting MemberExited(m)
// no events for other transitions
}

View file

@ -95,11 +95,11 @@ abstract class ConvergenceSpec(multiNodeConfig: ConvergenceMultiNodeConfig)
runOn(first, second, fourth) {
for (n 1 to 5) {
awaitAssert(clusterView.members.size should ===(3))
awaitAssert(clusterView.members.size should ===(4))
awaitSeenSameState(first, second, fourth)
memberStatus(first) should ===(Some(MemberStatus.Up))
memberStatus(second) should ===(Some(MemberStatus.Up))
memberStatus(fourth) should ===(None)
memberStatus(fourth) should ===(Some(MemberStatus.Joining))
// wait and then check again
Thread.sleep(1.second.dilated.toMillis)
}

View file

@ -67,7 +67,7 @@ abstract class NodeChurnSpec
additionaSystems.foreach { s
val c = Cluster(s)
c.state.members.size should be(numberOfMembers)
c.state.members.forall(_.status == MemberStatus.Up)
c.state.members.forall(_.status == MemberStatus.Up) shouldBe true
}
}
}

View file

@ -44,7 +44,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
val dUp = TestMember(Address("akka.tcp", "sys", "d", 2552), Up, Set("GRP"))
val g0 = Gossip(members = SortedSet(aUp)).seen(aUp.uniqueAddress)
val g1 = Gossip(members = SortedSet(aUp, bExiting, cJoining)).seen(aUp.uniqueAddress).seen(bExiting.uniqueAddress).seen(cJoining.uniqueAddress)
val g1 = Gossip(members = SortedSet(aUp, cJoining)).seen(aUp.uniqueAddress).seen(cJoining.uniqueAddress)
val g2 = Gossip(members = SortedSet(aUp, bExiting, cUp)).seen(aUp.uniqueAddress)
val g3 = g2.seen(bExiting.uniqueAddress).seen(cUp.uniqueAddress)
val g4 = Gossip(members = SortedSet(a51Up, aUp, bExiting, cUp)).seen(aUp.uniqueAddress)
@ -71,6 +71,11 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
"ClusterDomainEventPublisher" must {
"publish MemberJoined" in {
publisher ! PublishChanges(g1)
memberSubscriber.expectMsg(MemberJoined(cJoining))
}
"publish MemberUp" in {
publisher ! PublishChanges(g2)
publisher ! PublishChanges(g3)
@ -92,7 +97,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
memberSubscriber.expectMsg(MemberExited(bExiting))
memberSubscriber.expectMsg(MemberUp(cUp))
publisher ! PublishChanges(g6)
memberSubscriber.expectNoMsg(500 millis)
memberSubscriber.expectMsg(MemberLeft(aLeaving))
publisher ! PublishChanges(g7)
memberSubscriber.expectMsg(MemberExited(aExiting))
memberSubscriber.expectMsg(LeaderChanged(Some(cUp.address)))

View file

@ -51,7 +51,7 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
val (g1, _) = converge(Gossip(members = SortedSet(aUp)))
val (g2, s2) = converge(Gossip(members = SortedSet(aUp, bUp, eJoining)))
diffMemberEvents(g1, g2) should ===(Seq(MemberUp(bUp)))
diffMemberEvents(g1, g2) should ===(Seq(MemberUp(bUp), MemberJoined(eJoining)))
diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty)
diffSeen(g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
}
@ -60,7 +60,7 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
val (g1, _) = converge(Gossip(members = SortedSet(aJoining, bUp, cUp)))
val (g2, s2) = converge(Gossip(members = SortedSet(aUp, bUp, cLeaving, eJoining)))
diffMemberEvents(g1, g2) should ===(Seq(MemberUp(aUp)))
diffMemberEvents(g1, g2) should ===(Seq(MemberUp(aUp), MemberLeft(cLeaving), MemberJoined(eJoining)))
diffUnreachable(g1, g2, selfDummyAddress) should ===(Seq.empty)
diffSeen(g1, g2, selfDummyAddress) should ===(Seq(SeenChanged(convergence = true, seenBy = s2.map(_.address))))
}

View file

@ -237,6 +237,7 @@ to the current state and it is not the full history of all changes that actually
The events to track the life-cycle of members are:
* ``ClusterEvent.MemberJoined`` - A new member has joined the cluster and its status has been changed to ``Joining``.
* ``ClusterEvent.MemberUp`` - A new member has joined the cluster and its status has been changed to ``Up``.
* ``ClusterEvent.MemberExited`` - A member is leaving the cluster and its status has been changed to ``Exiting``
Note that the node might already have been shutdown when this event is published on another node.

View file

@ -231,6 +231,7 @@ to the current state and it is not the full history of all changes that actually
The events to track the life-cycle of members are:
* ``ClusterEvent.MemberJoined`` - A new member has joined the cluster and its status has been changed to ``Joining``.
* ``ClusterEvent.MemberUp`` - A new member has joined the cluster and its status has been changed to ``Up``.
* ``ClusterEvent.MemberExited`` - A member is leaving the cluster and its status has been changed to ``Exiting``
Note that the node might already have been shutdown when this event is published on another node.