Publish InstantMemberEvent immediately, see #2803

This commit is contained in:
Patrik Nordwall 2013-01-14 17:35:56 +01:00
parent c321f735f3
commit d07f331e78
5 changed files with 201 additions and 61 deletions

View file

@ -68,54 +68,140 @@ object ClusterEvent {
}
/**
* Marker interface for member related events.
* Marker interface for membership events.
* Only published after convergence, when all members have seen current
* state.
*/
sealed trait MemberEvent extends ClusterDomainEvent {
def member: Member
}
/**
* A new member joined the cluster. Only published after convergence.
* A new member joined the cluster.
* Only published after convergence, when all members have seen current
* state.
*/
case class MemberJoined(member: Member) extends MemberEvent {
if (member.status != Joining) throw new IllegalArgumentException("Expected Joining status, got: " + member)
}
/**
* Member status changed to Up. Only published after convergence.
* Member status changed to Up.
* Only published after convergence, when all members have seen current
* state.
*/
case class MemberUp(member: Member) extends MemberEvent {
if (member.status != Up) throw new IllegalArgumentException("Expected Up status, got: " + member)
}
/**
* Member status changed to Leaving. Only published after convergence.
* Member status changed to Leaving.
* Only published after convergence, when all members have seen current
* state.
*/
case class MemberLeft(member: Member) extends MemberEvent {
if (member.status != Leaving) throw new IllegalArgumentException("Expected Leaving status, got: " + member)
}
/**
* Member status changed to Exiting. Only published after convergence.
* Member status changed to Exiting.
* Only published after convergence, when all members have seen current
* state.
*/
case class MemberExited(member: Member) extends MemberEvent {
if (member.status != Exiting) throw new IllegalArgumentException("Expected Exiting status, got: " + member)
}
/**
* Member status changed to Down. Only published after convergence.
* Member status changed to Down.
* Only published after convergence, when all members have seen current
* state.
*/
case class MemberDowned(member: Member) extends MemberEvent {
if (member.status != Down) throw new IllegalArgumentException("Expected Down status, got: " + member)
}
/**
* Member completely removed from the cluster. Only published after convergence.
* Member completely removed from the cluster. Only published after convergence,
* when all other members have seen the state.
*/
case class MemberRemoved(member: Member) extends MemberEvent {
if (member.status != Removed) throw new IllegalArgumentException("Expected Removed status, got: " + member)
}
/**
* Current snapshot state of the cluster. Sent to new subscriber of
* [akka.cluster.ClusterEvent.InstantMemberEvent].
*/
case class InstantClusterState(members: immutable.SortedSet[Member] = immutable.SortedSet.empty)
extends ClusterDomainEvent {
/**
* Java API
* Read only
*/
def getMembers: java.lang.Iterable[Member] = {
import scala.collection.JavaConverters._
members.asJava
}
}
/**
* Marker interface for membership events published immediately.
* All other members might not have seen the state.
*/
sealed trait InstantMemberEvent extends ClusterDomainEvent {
def member: Member
}
/**
* A new member joined the cluster. Published immediately when it happened.
* All other members might not have seen the state.
*/
case class InstantMemberJoined(member: Member) extends InstantMemberEvent {
if (member.status != Joining) throw new IllegalArgumentException("Expected Joining status, got: " + member)
}
/**
* Member status changed to Up. Published immediately when it happened.
* All other members might not have seen the state.
*/
case class InstantMemberUp(member: Member) extends InstantMemberEvent {
if (member.status != Up) throw new IllegalArgumentException("Expected Up status, got: " + member)
}
/**
* Member status changed to Leaving. Published immediately when it happened.
* All other members might not have seen the state.
*/
case class InstantMemberLeft(member: Member) extends InstantMemberEvent {
if (member.status != Leaving) throw new IllegalArgumentException("Expected Leaving status, got: " + member)
}
/**
* Member status changed to Exiting. Published immediately when it happened.
* All other members might not have seen the state.
*/
case class InstantMemberExited(member: Member) extends InstantMemberEvent {
if (member.status != Exiting) throw new IllegalArgumentException("Expected Exiting status, got: " + member)
}
/**
* Member status changed to Down. Published immediately when it happened.
* All other members might not have seen the state.
*/
case class InstantMemberDowned(member: Member) extends InstantMemberEvent {
if (member.status != Down) throw new IllegalArgumentException("Expected Down status, got: " + member)
}
/**
* Member completely removed from the cluster. Published immediately when it happened.
* All other members might not have seen the state.
*/
case class InstantMemberRemoved(member: Member) extends InstantMemberEvent {
if (member.status != Removed) throw new IllegalArgumentException("Expected Removed status, got: " + member)
}
/**
* Leader of the cluster members changed. Only published after convergence.
*/
@ -209,6 +295,21 @@ object ClusterEvent {
++= removedEvents).result()
}
/**
* INTERNAL API
*/
private[cluster] def convertToInstantMemberEvents(memberEvents: immutable.Seq[MemberEvent]): immutable.Seq[InstantMemberEvent] =
memberEvents map { event
event match {
case MemberJoined(m) InstantMemberJoined(m)
case MemberUp(m) InstantMemberUp(m)
case MemberDowned(m) InstantMemberDowned(m)
case MemberLeft(m) InstantMemberLeft(m)
case MemberExited(m) InstantMemberExited(m)
case MemberRemoved(m) InstantMemberRemoved(m)
}
}
/**
* INTERNAL API
*/
@ -269,8 +370,21 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
}
}
def publishInstantClusterState(receiver: ActorRef): Unit = {
// The state is based on latest gossip to mimic what you
// would have seen if you where listening to the InstantMemberEvent stream.
receiver ! InstantClusterState(members = latestGossip.members)
}
def subscribe(subscriber: ActorRef, to: Class[_]): Unit = {
publishCurrentClusterState(Some(subscriber))
if (classOf[ClusterDomainEvent] == to) {
publishInstantClusterState(subscriber)
publishCurrentClusterState(Some(subscriber))
} else if (classOf[InstantMemberEvent].isAssignableFrom(to))
publishInstantClusterState(subscriber)
else
publishCurrentClusterState(Some(subscriber))
eventStream.subscribe(subscriber, to)
}
@ -285,8 +399,10 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
latestGossip = newGossip
// first publish the diffUnreachable between the last two gossips
diffUnreachable(oldGossip, newGossip) foreach publish
val newMemberEvents = diffMemberEvents(oldGossip, newGossip)
convertToInstantMemberEvents(newMemberEvents) foreach publish
// buffer up the MemberEvents waiting for convergence
memberEvents ++= diffMemberEvents(oldGossip, newGossip)
memberEvents ++= newMemberEvents
// if we have convergence then publish the MemberEvents and possibly a LeaderChanged
if (newGossip.convergence) {
val previousConvergedGossip = latestConvergedGossip

View file

@ -59,10 +59,11 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
case event: MemberEvent
// replace current member with new member (might have different status, only address is used in equals)
state = state.copy(members = state.members - event.member + event.member)
case LeaderChanged(leader) state = state.copy(leader = leader)
case s: CurrentClusterState state = s
case CurrentInternalStats(stats) _latestStats = stats
case ClusterMetricsChanged(nodes) _clusterMetrics = nodes
case LeaderChanged(leader) state = state.copy(leader = leader)
case s: CurrentClusterState state = s
case CurrentInternalStats(stats) _latestStats = stats
case ClusterMetricsChanged(nodes) _clusterMetrics = nodes
case _: InstantClusterState | _: InstantMemberEvent // not used here
}
}
}).withDispatcher(cluster.settings.UseDispatcher), name = "clusterEventBusListener")

View file

@ -36,16 +36,18 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
val g4 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address)
val g5 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address).seen(b1.address).seen(c2.address).seen(d1.address)
override def atStartup(): Unit = {
system.eventStream.subscribe(testActor, classOf[ClusterDomainEvent])
}
// created in beforeEach
var memberSubscriber: TestProbe = _
override def beforeEach(): Unit = {
memberSubscriber = TestProbe()
system.eventStream.subscribe(memberSubscriber.ref, classOf[MemberEvent])
system.eventStream.subscribe(memberSubscriber.ref, classOf[LeaderChanged])
publisher = system.actorOf(Props[ClusterDomainEventPublisher])
publisher ! PublishChanges(g0)
expectMsg(MemberUp(a1))
expectMsg(LeaderChanged(Some(a1.address)))
expectMsgType[SeenChanged]
memberSubscriber.expectMsg(MemberUp(a1))
memberSubscriber.expectMsg(LeaderChanged(Some(a1.address)))
}
override def afterEach(): Unit = {
@ -56,94 +58,114 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
"not publish MemberUp when there is no convergence" in {
publisher ! PublishChanges(g2)
expectMsgType[SeenChanged]
}
"publish MemberEvents when there is convergence" in {
publisher ! PublishChanges(g2)
expectMsgType[SeenChanged]
publisher ! PublishChanges(g3)
expectMsg(MemberUp(b1))
expectMsg(MemberUp(c2))
expectMsgType[SeenChanged]
memberSubscriber.expectMsg(MemberUp(b1))
memberSubscriber.expectMsg(MemberUp(c2))
}
"publish leader changed when new leader after convergence" in {
publisher ! PublishChanges(g4)
expectMsgType[SeenChanged]
expectNoMsg(1 second)
memberSubscriber.expectNoMsg(1 second)
publisher ! PublishChanges(g5)
expectMsg(MemberUp(d1))
expectMsg(MemberUp(b1))
expectMsg(MemberUp(c2))
expectMsg(LeaderChanged(Some(d1.address)))
expectMsgType[SeenChanged]
memberSubscriber.expectMsg(MemberUp(d1))
memberSubscriber.expectMsg(MemberUp(b1))
memberSubscriber.expectMsg(MemberUp(c2))
memberSubscriber.expectMsg(LeaderChanged(Some(d1.address)))
}
"publish leader changed when new leader and convergence both before and after" in {
// convergence both before and after
publisher ! PublishChanges(g3)
expectMsg(MemberUp(b1))
expectMsg(MemberUp(c2))
expectMsgType[SeenChanged]
memberSubscriber.expectMsg(MemberUp(b1))
memberSubscriber.expectMsg(MemberUp(c2))
publisher ! PublishChanges(g5)
expectMsg(MemberUp(d1))
expectMsg(LeaderChanged(Some(d1.address)))
expectMsgType[SeenChanged]
memberSubscriber.expectMsg(MemberUp(d1))
memberSubscriber.expectMsg(LeaderChanged(Some(d1.address)))
}
"not publish leader changed when not convergence" in {
publisher ! PublishChanges(g4)
expectMsgType[SeenChanged]
expectNoMsg(1 second)
memberSubscriber.expectNoMsg(1 second)
}
"not publish leader changed when changed convergence but still same leader" in {
publisher ! PublishChanges(g5)
expectMsg(MemberUp(d1))
expectMsg(MemberUp(b1))
expectMsg(MemberUp(c2))
expectMsg(LeaderChanged(Some(d1.address)))
expectMsgType[SeenChanged]
memberSubscriber.expectMsg(MemberUp(d1))
memberSubscriber.expectMsg(MemberUp(b1))
memberSubscriber.expectMsg(MemberUp(c2))
memberSubscriber.expectMsg(LeaderChanged(Some(d1.address)))
publisher ! PublishChanges(g4)
expectMsgType[SeenChanged]
memberSubscriber.expectNoMsg(1 second)
publisher ! PublishChanges(g5)
expectMsgType[SeenChanged]
memberSubscriber.expectNoMsg(1 second)
}
"send CurrentClusterState when subscribe" in {
val subscriber = TestProbe()
publisher ! Subscribe(subscriber.ref, classOf[ClusterDomainEvent])
subscriber.expectMsgType[InstantClusterState]
subscriber.expectMsgType[CurrentClusterState]
// but only to the new subscriber
expectNoMsg(1 second)
memberSubscriber.expectNoMsg(1 second)
}
"support unsubscribe" in {
val subscriber = TestProbe()
publisher ! Subscribe(subscriber.ref, classOf[ClusterDomainEvent])
publisher ! Subscribe(subscriber.ref, classOf[MemberEvent])
subscriber.expectMsgType[CurrentClusterState]
publisher ! Unsubscribe(subscriber.ref, Some(classOf[ClusterDomainEvent]))
publisher ! Unsubscribe(subscriber.ref, Some(classOf[MemberEvent]))
publisher ! PublishChanges(g3)
subscriber.expectNoMsg(1 second)
// but testActor is still subscriber
expectMsg(MemberUp(b1))
expectMsg(MemberUp(c2))
expectMsgType[SeenChanged]
// but memberSubscriber is still subscriber
memberSubscriber.expectMsg(MemberUp(b1))
memberSubscriber.expectMsg(MemberUp(c2))
}
"publish clean state when PublishStart" in {
val subscriber = TestProbe()
publisher ! Subscribe(subscriber.ref, classOf[ClusterDomainEvent])
subscriber.expectMsgType[InstantClusterState]
subscriber.expectMsgType[CurrentClusterState]
publisher ! PublishChanges(g3)
expectMsg(MemberUp(b1))
expectMsg(MemberUp(c2))
expectMsgType[SeenChanged]
subscriber.expectMsg(InstantMemberUp(b1))
subscriber.expectMsg(InstantMemberUp(c2))
subscriber.expectMsg(MemberUp(b1))
subscriber.expectMsg(MemberUp(c2))
subscriber.expectMsgType[SeenChanged]
publisher ! PublishStart
expectMsgType[CurrentClusterState] must be(CurrentClusterState())
subscriber.expectMsgType[CurrentClusterState] must be(CurrentClusterState())
}
"publish immediately when subscribing to InstantMemberEvent" in {
val subscriber = TestProbe()
publisher ! Subscribe(subscriber.ref, classOf[InstantMemberEvent])
subscriber.expectMsgType[InstantClusterState]
publisher ! PublishChanges(g2)
subscriber.expectMsg(InstantMemberUp(b1))
subscriber.expectMsg(InstantMemberUp(c2))
subscriber.expectNoMsg(1 second)
publisher ! PublishChanges(g3)
subscriber.expectNoMsg(1 second)
}
"publish SeenChanged" in {
val subscriber = TestProbe()
publisher ! Subscribe(subscriber.ref, classOf[SeenChanged])
subscriber.expectMsgType[CurrentClusterState]
publisher ! PublishChanges(g2)
subscriber.expectMsgType[SeenChanged]
subscriber.expectNoMsg(1 second)
publisher ! PublishChanges(g3)
subscriber.expectMsgType[SeenChanged]
subscriber.expectNoMsg(1 second)
}
}

View file

@ -76,10 +76,11 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
try {
cluster.subscribe(testActor, classOf[ClusterEvent.ClusterDomainEvent])
// first, is in response to the subscription
expectMsgClass(classOf[ClusterEvent.ClusterDomainEvent])
expectMsgClass(classOf[ClusterEvent.InstantClusterState])
expectMsgClass(classOf[ClusterEvent.CurrentClusterState])
cluster.publishCurrentClusterState()
expectMsgClass(classOf[ClusterEvent.ClusterDomainEvent])
expectMsgClass(classOf[ClusterEvent.CurrentClusterState])
} finally {
cluster.unsubscribe(testActor)
}
@ -87,7 +88,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
"send CurrentClusterState to one receiver when requested" in {
cluster.sendCurrentClusterState(testActor)
expectMsgClass(classOf[ClusterEvent.ClusterDomainEvent])
expectMsgClass(classOf[ClusterEvent.CurrentClusterState])
}
}

View file

@ -89,7 +89,7 @@ and becomes a member of the cluster. It's status changed to 'Up'.
Switch over to the first terminal window and see in the log output that the member joined.
6. Start another node. Open a sbt session in yet another terminal window and run::
6. Start another node. Open a maven session in yet another terminal window and run::
mvn exec:java -Dexec.mainClass="sample.cluster.simple.japi.SimpleClusterApp"