diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index e3ae50d12a..f804f1f5ac 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -68,54 +68,140 @@ object ClusterEvent { } /** - * Marker interface for member related events. + * Marker interface for membership events. + * Only published after convergence, when all members have seen current + * state. */ sealed trait MemberEvent extends ClusterDomainEvent { def member: Member } /** - * A new member joined the cluster. Only published after convergence. + * A new member joined the cluster. + * Only published after convergence, when all members have seen current + * state. */ case class MemberJoined(member: Member) extends MemberEvent { if (member.status != Joining) throw new IllegalArgumentException("Expected Joining status, got: " + member) } /** - * Member status changed to Up. Only published after convergence. + * Member status changed to Up. + * Only published after convergence, when all members have seen current + * state. */ case class MemberUp(member: Member) extends MemberEvent { if (member.status != Up) throw new IllegalArgumentException("Expected Up status, got: " + member) } /** - * Member status changed to Leaving. Only published after convergence. + * Member status changed to Leaving. + * Only published after convergence, when all members have seen current + * state. */ case class MemberLeft(member: Member) extends MemberEvent { if (member.status != Leaving) throw new IllegalArgumentException("Expected Leaving status, got: " + member) } /** - * Member status changed to Exiting. Only published after convergence. + * Member status changed to Exiting. + * Only published after convergence, when all members have seen current + * state. */ case class MemberExited(member: Member) extends MemberEvent { if (member.status != Exiting) throw new IllegalArgumentException("Expected Exiting status, got: " + member) } /** - * Member status changed to Down. Only published after convergence. + * Member status changed to Down. + * Only published after convergence, when all members have seen current + * state. */ case class MemberDowned(member: Member) extends MemberEvent { if (member.status != Down) throw new IllegalArgumentException("Expected Down status, got: " + member) } /** - * Member completely removed from the cluster. Only published after convergence. + * Member completely removed from the cluster. Only published after convergence, + * when all other members have seen the state. */ case class MemberRemoved(member: Member) extends MemberEvent { if (member.status != Removed) throw new IllegalArgumentException("Expected Removed status, got: " + member) } + /** + * Current snapshot state of the cluster. Sent to new subscriber of + * [akka.cluster.ClusterEvent.InstantMemberEvent]. + */ + case class InstantClusterState(members: immutable.SortedSet[Member] = immutable.SortedSet.empty) + extends ClusterDomainEvent { + + /** + * Java API + * Read only + */ + def getMembers: java.lang.Iterable[Member] = { + import scala.collection.JavaConverters._ + members.asJava + } + } + + /** + * Marker interface for membership events published immediately. + * All other members might not have seen the state. + */ + sealed trait InstantMemberEvent extends ClusterDomainEvent { + def member: Member + } + + /** + * A new member joined the cluster. Published immediately when it happened. + * All other members might not have seen the state. + */ + case class InstantMemberJoined(member: Member) extends InstantMemberEvent { + if (member.status != Joining) throw new IllegalArgumentException("Expected Joining status, got: " + member) + } + + /** + * Member status changed to Up. Published immediately when it happened. + * All other members might not have seen the state. + */ + case class InstantMemberUp(member: Member) extends InstantMemberEvent { + if (member.status != Up) throw new IllegalArgumentException("Expected Up status, got: " + member) + } + + /** + * Member status changed to Leaving. Published immediately when it happened. + * All other members might not have seen the state. + */ + case class InstantMemberLeft(member: Member) extends InstantMemberEvent { + if (member.status != Leaving) throw new IllegalArgumentException("Expected Leaving status, got: " + member) + } + + /** + * Member status changed to Exiting. Published immediately when it happened. + * All other members might not have seen the state. + */ + case class InstantMemberExited(member: Member) extends InstantMemberEvent { + if (member.status != Exiting) throw new IllegalArgumentException("Expected Exiting status, got: " + member) + } + + /** + * Member status changed to Down. Published immediately when it happened. + * All other members might not have seen the state. + */ + case class InstantMemberDowned(member: Member) extends InstantMemberEvent { + if (member.status != Down) throw new IllegalArgumentException("Expected Down status, got: " + member) + } + + /** + * Member completely removed from the cluster. Published immediately when it happened. + * All other members might not have seen the state. + */ + case class InstantMemberRemoved(member: Member) extends InstantMemberEvent { + if (member.status != Removed) throw new IllegalArgumentException("Expected Removed status, got: " + member) + } + /** * Leader of the cluster members changed. Only published after convergence. */ @@ -209,6 +295,21 @@ object ClusterEvent { ++= removedEvents).result() } + /** + * INTERNAL API + */ + private[cluster] def convertToInstantMemberEvents(memberEvents: immutable.Seq[MemberEvent]): immutable.Seq[InstantMemberEvent] = + memberEvents map { event ⇒ + event match { + case MemberJoined(m) ⇒ InstantMemberJoined(m) + case MemberUp(m) ⇒ InstantMemberUp(m) + case MemberDowned(m) ⇒ InstantMemberDowned(m) + case MemberLeft(m) ⇒ InstantMemberLeft(m) + case MemberExited(m) ⇒ InstantMemberExited(m) + case MemberRemoved(m) ⇒ InstantMemberRemoved(m) + } + } + /** * INTERNAL API */ @@ -269,8 +370,21 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto } } + def publishInstantClusterState(receiver: ActorRef): Unit = { + // The state is based on latest gossip to mimic what you + // would have seen if you where listening to the InstantMemberEvent stream. + receiver ! InstantClusterState(members = latestGossip.members) + } + def subscribe(subscriber: ActorRef, to: Class[_]): Unit = { - publishCurrentClusterState(Some(subscriber)) + if (classOf[ClusterDomainEvent] == to) { + publishInstantClusterState(subscriber) + publishCurrentClusterState(Some(subscriber)) + } else if (classOf[InstantMemberEvent].isAssignableFrom(to)) + publishInstantClusterState(subscriber) + else + publishCurrentClusterState(Some(subscriber)) + eventStream.subscribe(subscriber, to) } @@ -285,8 +399,10 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto latestGossip = newGossip // first publish the diffUnreachable between the last two gossips diffUnreachable(oldGossip, newGossip) foreach publish + val newMemberEvents = diffMemberEvents(oldGossip, newGossip) + convertToInstantMemberEvents(newMemberEvents) foreach publish // buffer up the MemberEvents waiting for convergence - memberEvents ++= diffMemberEvents(oldGossip, newGossip) + memberEvents ++= newMemberEvents // if we have convergence then publish the MemberEvents and possibly a LeaderChanged if (newGossip.convergence) { val previousConvergedGossip = latestConvergedGossip diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index ce57413287..571148af41 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -59,10 +59,11 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { case event: MemberEvent ⇒ // replace current member with new member (might have different status, only address is used in equals) state = state.copy(members = state.members - event.member + event.member) - case LeaderChanged(leader) ⇒ state = state.copy(leader = leader) - case s: CurrentClusterState ⇒ state = s - case CurrentInternalStats(stats) ⇒ _latestStats = stats - case ClusterMetricsChanged(nodes) ⇒ _clusterMetrics = nodes + case LeaderChanged(leader) ⇒ state = state.copy(leader = leader) + case s: CurrentClusterState ⇒ state = s + case CurrentInternalStats(stats) ⇒ _latestStats = stats + case ClusterMetricsChanged(nodes) ⇒ _clusterMetrics = nodes + case _: InstantClusterState | _: InstantMemberEvent ⇒ // not used here } } }).withDispatcher(cluster.settings.UseDispatcher), name = "clusterEventBusListener") diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala index 12951447fe..2323dd4620 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala @@ -36,16 +36,18 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec val g4 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address) val g5 = Gossip(members = SortedSet(d1, a1, b1, c2)).seen(a1.address).seen(b1.address).seen(c2.address).seen(d1.address) - override def atStartup(): Unit = { - system.eventStream.subscribe(testActor, classOf[ClusterDomainEvent]) - } + // created in beforeEach + var memberSubscriber: TestProbe = _ override def beforeEach(): Unit = { + memberSubscriber = TestProbe() + system.eventStream.subscribe(memberSubscriber.ref, classOf[MemberEvent]) + system.eventStream.subscribe(memberSubscriber.ref, classOf[LeaderChanged]) + publisher = system.actorOf(Props[ClusterDomainEventPublisher]) publisher ! PublishChanges(g0) - expectMsg(MemberUp(a1)) - expectMsg(LeaderChanged(Some(a1.address))) - expectMsgType[SeenChanged] + memberSubscriber.expectMsg(MemberUp(a1)) + memberSubscriber.expectMsg(LeaderChanged(Some(a1.address))) } override def afterEach(): Unit = { @@ -56,94 +58,114 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec "not publish MemberUp when there is no convergence" in { publisher ! PublishChanges(g2) - expectMsgType[SeenChanged] } "publish MemberEvents when there is convergence" in { publisher ! PublishChanges(g2) - expectMsgType[SeenChanged] publisher ! PublishChanges(g3) - expectMsg(MemberUp(b1)) - expectMsg(MemberUp(c2)) - expectMsgType[SeenChanged] + memberSubscriber.expectMsg(MemberUp(b1)) + memberSubscriber.expectMsg(MemberUp(c2)) } "publish leader changed when new leader after convergence" in { publisher ! PublishChanges(g4) - expectMsgType[SeenChanged] - expectNoMsg(1 second) + memberSubscriber.expectNoMsg(1 second) publisher ! PublishChanges(g5) - expectMsg(MemberUp(d1)) - expectMsg(MemberUp(b1)) - expectMsg(MemberUp(c2)) - expectMsg(LeaderChanged(Some(d1.address))) - expectMsgType[SeenChanged] + memberSubscriber.expectMsg(MemberUp(d1)) + memberSubscriber.expectMsg(MemberUp(b1)) + memberSubscriber.expectMsg(MemberUp(c2)) + memberSubscriber.expectMsg(LeaderChanged(Some(d1.address))) } "publish leader changed when new leader and convergence both before and after" in { // convergence both before and after publisher ! PublishChanges(g3) - expectMsg(MemberUp(b1)) - expectMsg(MemberUp(c2)) - expectMsgType[SeenChanged] + memberSubscriber.expectMsg(MemberUp(b1)) + memberSubscriber.expectMsg(MemberUp(c2)) publisher ! PublishChanges(g5) - expectMsg(MemberUp(d1)) - expectMsg(LeaderChanged(Some(d1.address))) - expectMsgType[SeenChanged] + memberSubscriber.expectMsg(MemberUp(d1)) + memberSubscriber.expectMsg(LeaderChanged(Some(d1.address))) } "not publish leader changed when not convergence" in { publisher ! PublishChanges(g4) - expectMsgType[SeenChanged] - expectNoMsg(1 second) + memberSubscriber.expectNoMsg(1 second) } "not publish leader changed when changed convergence but still same leader" in { publisher ! PublishChanges(g5) - expectMsg(MemberUp(d1)) - expectMsg(MemberUp(b1)) - expectMsg(MemberUp(c2)) - expectMsg(LeaderChanged(Some(d1.address))) - expectMsgType[SeenChanged] + memberSubscriber.expectMsg(MemberUp(d1)) + memberSubscriber.expectMsg(MemberUp(b1)) + memberSubscriber.expectMsg(MemberUp(c2)) + memberSubscriber.expectMsg(LeaderChanged(Some(d1.address))) publisher ! PublishChanges(g4) - expectMsgType[SeenChanged] + memberSubscriber.expectNoMsg(1 second) publisher ! PublishChanges(g5) - expectMsgType[SeenChanged] + memberSubscriber.expectNoMsg(1 second) } "send CurrentClusterState when subscribe" in { val subscriber = TestProbe() publisher ! Subscribe(subscriber.ref, classOf[ClusterDomainEvent]) + subscriber.expectMsgType[InstantClusterState] subscriber.expectMsgType[CurrentClusterState] // but only to the new subscriber - expectNoMsg(1 second) + memberSubscriber.expectNoMsg(1 second) } "support unsubscribe" in { val subscriber = TestProbe() - publisher ! Subscribe(subscriber.ref, classOf[ClusterDomainEvent]) + publisher ! Subscribe(subscriber.ref, classOf[MemberEvent]) subscriber.expectMsgType[CurrentClusterState] - publisher ! Unsubscribe(subscriber.ref, Some(classOf[ClusterDomainEvent])) + publisher ! Unsubscribe(subscriber.ref, Some(classOf[MemberEvent])) publisher ! PublishChanges(g3) subscriber.expectNoMsg(1 second) - // but testActor is still subscriber - expectMsg(MemberUp(b1)) - expectMsg(MemberUp(c2)) - expectMsgType[SeenChanged] + // but memberSubscriber is still subscriber + memberSubscriber.expectMsg(MemberUp(b1)) + memberSubscriber.expectMsg(MemberUp(c2)) } "publish clean state when PublishStart" in { + val subscriber = TestProbe() + publisher ! Subscribe(subscriber.ref, classOf[ClusterDomainEvent]) + subscriber.expectMsgType[InstantClusterState] + subscriber.expectMsgType[CurrentClusterState] publisher ! PublishChanges(g3) - expectMsg(MemberUp(b1)) - expectMsg(MemberUp(c2)) - expectMsgType[SeenChanged] + subscriber.expectMsg(InstantMemberUp(b1)) + subscriber.expectMsg(InstantMemberUp(c2)) + subscriber.expectMsg(MemberUp(b1)) + subscriber.expectMsg(MemberUp(c2)) + subscriber.expectMsgType[SeenChanged] publisher ! PublishStart - expectMsgType[CurrentClusterState] must be(CurrentClusterState()) + subscriber.expectMsgType[CurrentClusterState] must be(CurrentClusterState()) + } + "publish immediately when subscribing to InstantMemberEvent" in { + val subscriber = TestProbe() + publisher ! Subscribe(subscriber.ref, classOf[InstantMemberEvent]) + subscriber.expectMsgType[InstantClusterState] + publisher ! PublishChanges(g2) + subscriber.expectMsg(InstantMemberUp(b1)) + subscriber.expectMsg(InstantMemberUp(c2)) + subscriber.expectNoMsg(1 second) + publisher ! PublishChanges(g3) + subscriber.expectNoMsg(1 second) + } + + "publish SeenChanged" in { + val subscriber = TestProbe() + publisher ! Subscribe(subscriber.ref, classOf[SeenChanged]) + subscriber.expectMsgType[CurrentClusterState] + publisher ! PublishChanges(g2) + subscriber.expectMsgType[SeenChanged] + subscriber.expectNoMsg(1 second) + publisher ! PublishChanges(g3) + subscriber.expectMsgType[SeenChanged] + subscriber.expectNoMsg(1 second) } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 7d766bf17e..55c35ea6e3 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -76,10 +76,11 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { try { cluster.subscribe(testActor, classOf[ClusterEvent.ClusterDomainEvent]) // first, is in response to the subscription - expectMsgClass(classOf[ClusterEvent.ClusterDomainEvent]) + expectMsgClass(classOf[ClusterEvent.InstantClusterState]) + expectMsgClass(classOf[ClusterEvent.CurrentClusterState]) cluster.publishCurrentClusterState() - expectMsgClass(classOf[ClusterEvent.ClusterDomainEvent]) + expectMsgClass(classOf[ClusterEvent.CurrentClusterState]) } finally { cluster.unsubscribe(testActor) } @@ -87,7 +88,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { "send CurrentClusterState to one receiver when requested" in { cluster.sendCurrentClusterState(testActor) - expectMsgClass(classOf[ClusterEvent.ClusterDomainEvent]) + expectMsgClass(classOf[ClusterEvent.CurrentClusterState]) } } diff --git a/akka-docs/rst/cluster/cluster-usage-java.rst b/akka-docs/rst/cluster/cluster-usage-java.rst index 0b418350b2..c9c158cc03 100644 --- a/akka-docs/rst/cluster/cluster-usage-java.rst +++ b/akka-docs/rst/cluster/cluster-usage-java.rst @@ -89,7 +89,7 @@ and becomes a member of the cluster. It's status changed to 'Up'. Switch over to the first terminal window and see in the log output that the member joined. -6. Start another node. Open a sbt session in yet another terminal window and run:: +6. Start another node. Open a maven session in yet another terminal window and run:: mvn exec:java -Dexec.mainClass="sample.cluster.simple.japi.SimpleClusterApp"