diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 5ffeecfa39..4477f057fc 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -29,6 +29,7 @@ import com.typesafe.config.Config import akka.event.LoggingAdapter import java.util.concurrent.ThreadFactory import scala.util.control.NonFatal +import scala.annotation.varargs /** * Cluster Extension Id and factory for creating Cluster extension. @@ -178,13 +179,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { } } - @volatile - private var readViewStarted = false - private[cluster] lazy val readView: ClusterReadView = { - val readView = new ClusterReadView(this) - readViewStarted = true - readView - } + private[cluster] val readView: ClusterReadView = new ClusterReadView(this) system.registerOnTermination(shutdown()) @@ -207,15 +202,38 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { def isTerminated: Boolean = _isTerminated.get /** - * Subscribe to cluster domain events. - * The `to` Class can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]] - * or subclass. + * Current snapshot state of the cluster. + */ + def state: CurrentClusterState = readView.state + + /** + * Subscribe to one or more cluster domain events. + * The `to` classes can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]] + * or subclasses. * * A snapshot of [[akka.cluster.ClusterEvent.CurrentClusterState]] - * will be sent to the subscriber as the first event. + * will be sent to the subscriber as the first message. */ - def subscribe(subscriber: ActorRef, to: Class[_]): Unit = - clusterCore ! InternalClusterAction.Subscribe(subscriber, to) + @varargs def subscribe(subscriber: ActorRef, to: Class[_]*): Unit = + clusterCore ! InternalClusterAction.Subscribe(subscriber, initialStateMode = InitialStateAsSnapshot, to.toSet) + + /** + * Subscribe to one or more cluster domain events. + * The `to` classes can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]] + * or subclasses. + * + * If `initialStateMode` is [[ClusterEvent.InitialStateAsEvents]] the events corresponding + * to the current state will be sent to the subscriber to mimic what you would + * have seen if you were listening to the events when they occurred in the past. + * + * If `initialStateMode` is [[ClusterEvent.InitialStateAsSnapshot]] a snapshot of + * [[akka.cluster.ClusterEvent.CurrentClusterState]] will be sent to the subscriber as the + * first message. + * + * Note that for large clusters it is more efficient to use `InitialStateAsSnapshot`. + */ + @varargs def subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, to: Class[_]*): Unit = + clusterCore ! InternalClusterAction.Subscribe(subscriber, initialStateMode, to.toSet) /** * Unsubscribe to all cluster domain events. @@ -237,13 +255,15 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { * If you want this to happen periodically you need to schedule a call to * this method yourself. */ + @deprecated("Use sendCurrentClusterState instead of publishCurrentClusterState", "2.3") def publishCurrentClusterState(): Unit = clusterCore ! InternalClusterAction.PublishCurrentClusterState(None) /** * Publish current (full) state of the cluster to the specified * receiver. If you want this to happen periodically you need to schedule - * a call to this method yourself. + * a call to this method yourself. Note that you can also retrieve the current + * state with [[#state]]. */ def sendCurrentClusterState(receiver: ActorRef): Unit = clusterCore ! InternalClusterAction.PublishCurrentClusterState(Some(receiver)) @@ -333,7 +353,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { logInfo("Shutting down...") system.stop(clusterDaemons) - if (readViewStarted) readView.close() + readView.close() closeScheduler() diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index cd160019f0..7c41a5f5d7 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -134,7 +134,7 @@ private[cluster] object InternalClusterAction { case class AddOnMemberUpListener(callback: Runnable) extends NoSerializationVerificationNeeded sealed trait SubscriptionMessage - case class Subscribe(subscriber: ActorRef, to: Class[_]) extends SubscriptionMessage + case class Subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, to: Set[Class[_]]) extends SubscriptionMessage case class Unsubscribe(subscriber: ActorRef, to: Option[Class[_]]) extends SubscriptionMessage /** * @param receiver if `receiver` is defined the event will only be sent to that diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index fd7f522645..3f224394ed 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -20,6 +20,31 @@ import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } * }}} */ object ClusterEvent { + + sealed abstract class SubscriptionInitialStateMode + /** + * When using this subscription mode a snapshot of + * [[akka.cluster.ClusterEvent.CurrentClusterState]] will be sent to the + * subscriber as the first message. + */ + case object InitialStateAsSnapshot extends SubscriptionInitialStateMode + /** + * When using this subscription mode the events corresponding + * to the current state will be sent to the subscriber to mimic what you would + * have seen if you were listening to the events when they occurred in the past. + */ + case object InitialStateAsEvents extends SubscriptionInitialStateMode + + /** + * Java API + */ + def initialStateAsSnapshot = InitialStateAsSnapshot + + /** + * Java API + */ + def initialStateAsEvents = InitialStateAsEvents + /** * Marker interface for cluster domain events. */ @@ -33,7 +58,7 @@ object ClusterEvent { unreachable: Set[Member] = Set.empty, seenBy: Set[Address] = Set.empty, leader: Option[Address] = None, - roleLeaderMap: Map[String, Option[Address]] = Map.empty) extends ClusterDomainEvent { + roleLeaderMap: Map[String, Option[Address]] = Map.empty) { /** * Java API: get current member list. @@ -102,7 +127,8 @@ object ClusterEvent { } /** - * Member status changed to Exiting. + * Member status changed to [[MemberStatus.Exiting]] and will be removed + * when all members have seen the `Exiting` status. */ case class MemberExited(member: Member) extends MemberEvent { if (member.status != Exiting) throw new IllegalArgumentException("Expected Exiting status, got: " + member) @@ -305,7 +331,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto case PublishChanges(newGossip) ⇒ publishChanges(newGossip) case currentStats: CurrentInternalStats ⇒ publishInternalStats(currentStats) case PublishCurrentClusterState(receiver) ⇒ publishCurrentClusterState(receiver) - case Subscribe(subscriber, to) ⇒ subscribe(subscriber, to) + case Subscribe(subscriber, initMode, to) ⇒ subscribe(subscriber, initMode, to) case Unsubscribe(subscriber, to) ⇒ unsubscribe(subscriber, to) case PublishEvent(event) ⇒ publish(event) } @@ -314,7 +340,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto /** * The current snapshot state corresponding to latest gossip - * to mimic what you would have seen if you where listening to the events. + * to mimic what you would have seen if you were listening to the events. */ def publishCurrentClusterState(receiver: Option[ActorRef]): Unit = { val state = CurrentClusterState( @@ -329,9 +355,19 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto } } - def subscribe(subscriber: ActorRef, to: Class[_]): Unit = { - publishCurrentClusterState(Some(subscriber)) - eventStream.subscribe(subscriber, to) + def subscribe(subscriber: ActorRef, initMode: SubscriptionInitialStateMode, to: Set[Class[_]]): Unit = { + initMode match { + case InitialStateAsEvents ⇒ + def pub(event: AnyRef): Unit = { + if (to.exists(_.isAssignableFrom(event.getClass))) + subscriber ! event + } + publishDiff(Gossip.empty, latestGossip, pub) + case InitialStateAsSnapshot ⇒ + publishCurrentClusterState(Some(subscriber)) + } + + to foreach { eventStream.subscribe(subscriber, _) } } def unsubscribe(subscriber: ActorRef, to: Option[Class[_]]): Unit = to match { @@ -343,14 +379,18 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto val oldGossip = latestGossip // keep the latestGossip to be sent to new subscribers latestGossip = newGossip - diffUnreachable(oldGossip, newGossip) foreach publish - diffReachable(oldGossip, newGossip) foreach publish - diffMemberEvents(oldGossip, newGossip) foreach publish - diffLeader(oldGossip, newGossip) foreach publish - diffRolesLeader(oldGossip, newGossip) foreach publish + publishDiff(oldGossip, newGossip, publish) + } + + def publishDiff(oldGossip: Gossip, newGossip: Gossip, pub: AnyRef ⇒ Unit): Unit = { + diffMemberEvents(oldGossip, newGossip) foreach pub + diffUnreachable(oldGossip, newGossip) foreach pub + diffReachable(oldGossip, newGossip) foreach pub + diffLeader(oldGossip, newGossip) foreach pub + diffRolesLeader(oldGossip, newGossip) foreach pub // publish internal SeenState for testing purposes - diffSeen(oldGossip, newGossip) foreach publish - diffReachability(oldGossip, newGossip) foreach publish + diffSeen(oldGossip, newGossip) foreach pub + diffReachability(oldGossip, newGossip) foreach pub } def publishInternalStats(currentStats: CurrentInternalStats): Unit = publish(currentStats) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala index 7d914f2778..dedfa7aeb1 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala @@ -77,8 +77,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto MetricsInterval, self, MetricsTick) override def preStart(): Unit = { - cluster.subscribe(self, classOf[MemberEvent]) - cluster.subscribe(self, classOf[ReachabilityEvent]) + cluster.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent]) logInfo("Metrics collection has started successfully") } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index 58dc99f435..da02f92f69 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -24,7 +24,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { * Current state */ @volatile - private var state: CurrentClusterState = CurrentClusterState() + private var _state: CurrentClusterState = CurrentClusterState() @volatile private var _reachability: Reachability = Reachability.empty @@ -52,35 +52,37 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { def receive = { case e: ClusterDomainEvent ⇒ e match { case SeenChanged(convergence, seenBy) ⇒ - state = state.copy(seenBy = seenBy) + _state = _state.copy(seenBy = seenBy) case ReachabilityChanged(reachability) ⇒ _reachability = reachability case MemberRemoved(member, _) ⇒ - state = state.copy(members = state.members - member, unreachable = state.unreachable - member) + _state = _state.copy(members = _state.members - member, unreachable = _state.unreachable - member) case UnreachableMember(member) ⇒ // replace current member with new member (might have different status, only address is used in equals) - state = state.copy(unreachable = state.unreachable - member + member) + _state = _state.copy(unreachable = _state.unreachable - member + member) case ReachableMember(member) ⇒ - state = state.copy(unreachable = state.unreachable - member) + _state = _state.copy(unreachable = _state.unreachable - member) case event: MemberEvent ⇒ // replace current member with new member (might have different status, only address is used in equals) val newUnreachable = - if (state.unreachable.contains(event.member)) state.unreachable - event.member + event.member - else state.unreachable - state = state.copy(members = state.members - event.member + event.member, + if (_state.unreachable.contains(event.member)) _state.unreachable - event.member + event.member + else _state.unreachable + _state = _state.copy(members = _state.members - event.member + event.member, unreachable = newUnreachable) case LeaderChanged(leader) ⇒ - state = state.copy(leader = leader) + _state = _state.copy(leader = leader) case RoleLeaderChanged(role, leader) ⇒ - state = state.copy(roleLeaderMap = state.roleLeaderMap + (role -> leader)) - case s: CurrentClusterState ⇒ state = s + _state = _state.copy(roleLeaderMap = _state.roleLeaderMap + (role -> leader)) case stats: CurrentInternalStats ⇒ _latestStats = stats case ClusterMetricsChanged(nodes) ⇒ _clusterMetrics = nodes } + case s: CurrentClusterState ⇒ _state = s } }).withDispatcher(cluster.settings.UseDispatcher).withDeploy(Deploy.local), name = "clusterEventBusListener") } + def state: CurrentClusterState = _state + def self: Member = { import cluster.selfUniqueAddress state.members.find(_.uniqueAddress == selfUniqueAddress). diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index b20032f2c9..584ed2168a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -239,7 +239,7 @@ private[akka] trait ClusterRouterConfigBase extends RouterConfig { // Intercept ClusterDomainEvent and route them to the ClusterRouterActor override def isManagementMessage(msg: Any): Boolean = - (msg.isInstanceOf[ClusterDomainEvent]) || super.isManagementMessage(msg) + (msg.isInstanceOf[ClusterDomainEvent]) || msg.isInstanceOf[CurrentClusterState] || super.isManagementMessage(msg) } /** @@ -373,10 +373,9 @@ private[akka] trait ClusterRouterActor { this: RouterActor ⇒ def cluster: Cluster = Cluster(context.system) // re-subscribe when restart - override def preStart(): Unit = { - cluster.subscribe(self, classOf[MemberEvent]) - cluster.subscribe(self, classOf[ReachabilityEvent]) - } + override def preStart(): Unit = + cluster.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent]) + override def postStop(): Unit = cluster.unsubscribe(self) var nodes: immutable.SortedSet[Address] = { diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala index eae4f5da45..9a150fee75 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala @@ -44,6 +44,8 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec val g5 = Gossip(members = SortedSet(a51Up, aUp, bExiting, cUp)).seen(aUp.uniqueAddress).seen(bExiting.uniqueAddress).seen(cUp.uniqueAddress).seen(a51Up.uniqueAddress) val g6 = Gossip(members = SortedSet(aLeaving, bExiting, cUp)).seen(aUp.uniqueAddress) val g7 = Gossip(members = SortedSet(aExiting, bExiting, cUp)).seen(aUp.uniqueAddress) + val g8 = Gossip(members = SortedSet(aUp, bExiting, cUp, dUp), overview = GossipOverview(reachability = + Reachability.empty.unreachable(aUp.uniqueAddress, dUp.uniqueAddress))).seen(aUp.uniqueAddress) // created in beforeEach var memberSubscriber: TestProbe = _ @@ -74,7 +76,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec memberSubscriber.expectMsg(MemberExited(bExiting)) memberSubscriber.expectMsg(MemberUp(cUp)) memberSubscriber.expectMsg(LeaderChanged(Some(a51Up.address))) - memberSubscriber.expectNoMsg(1 second) + memberSubscriber.expectNoMsg(500 millis) } "publish leader changed when old leader leaves and is removed" in { @@ -82,11 +84,11 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec memberSubscriber.expectMsg(MemberExited(bExiting)) memberSubscriber.expectMsg(MemberUp(cUp)) publisher ! PublishChanges(g6) - memberSubscriber.expectNoMsg(1 second) + memberSubscriber.expectNoMsg(500 millis) publisher ! PublishChanges(g7) memberSubscriber.expectMsg(MemberExited(aExiting)) memberSubscriber.expectMsg(LeaderChanged(Some(cUp.address))) - memberSubscriber.expectNoMsg(1 second) + memberSubscriber.expectNoMsg(500 millis) // at the removed member a an empty gossip is the last thing publisher ! PublishChanges(Gossip.empty) memberSubscriber.expectMsg(MemberRemoved(aRemoved, Exiting)) @@ -103,12 +105,12 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec memberSubscriber.expectMsg(LeaderChanged(Some(a51Up.address))) publisher ! PublishChanges(g5) - memberSubscriber.expectNoMsg(1 second) + memberSubscriber.expectNoMsg(500 millis) } "publish role leader changed" in { val subscriber = TestProbe() - publisher ! Subscribe(subscriber.ref, classOf[RoleLeaderChanged]) + publisher ! Subscribe(subscriber.ref, InitialStateAsSnapshot, Set(classOf[RoleLeaderChanged])) subscriber.expectMsgType[CurrentClusterState] publisher ! PublishChanges(Gossip(members = SortedSet(cJoining, dUp))) subscriber.expectMsg(RoleLeaderChanged("GRP", Some(dUp.address))) @@ -118,19 +120,29 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec "send CurrentClusterState when subscribe" in { val subscriber = TestProbe() - publisher ! Subscribe(subscriber.ref, classOf[ClusterDomainEvent]) + publisher ! Subscribe(subscriber.ref, InitialStateAsSnapshot, Set(classOf[ClusterDomainEvent])) subscriber.expectMsgType[CurrentClusterState] // but only to the new subscriber - memberSubscriber.expectNoMsg(1 second) + memberSubscriber.expectNoMsg(500 millis) + + } + + "send events corresponding to current state when subscribe" in { + val subscriber = TestProbe() + publisher ! PublishChanges(g8) + publisher ! Subscribe(subscriber.ref, InitialStateAsEvents, Set(classOf[MemberEvent], classOf[ReachabilityEvent])) + subscriber.receiveN(4).toSet should be(Set(MemberUp(aUp), MemberUp(cUp), MemberUp(dUp), MemberExited(bExiting))) + subscriber.expectMsg(UnreachableMember(dUp)) + subscriber.expectNoMsg(500 millis) } "support unsubscribe" in { val subscriber = TestProbe() - publisher ! Subscribe(subscriber.ref, classOf[MemberEvent]) + publisher ! Subscribe(subscriber.ref, InitialStateAsSnapshot, Set(classOf[MemberEvent])) subscriber.expectMsgType[CurrentClusterState] publisher ! Unsubscribe(subscriber.ref, Some(classOf[MemberEvent])) publisher ! PublishChanges(g3) - subscriber.expectNoMsg(1 second) + subscriber.expectNoMsg(500 millis) // but memberSubscriber is still subscriber memberSubscriber.expectMsg(MemberExited(bExiting)) memberSubscriber.expectMsg(MemberUp(cUp)) @@ -138,14 +150,14 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec "publish SeenChanged" in { val subscriber = TestProbe() - publisher ! Subscribe(subscriber.ref, classOf[SeenChanged]) + publisher ! Subscribe(subscriber.ref, InitialStateAsSnapshot, Set(classOf[SeenChanged])) subscriber.expectMsgType[CurrentClusterState] publisher ! PublishChanges(g2) subscriber.expectMsgType[SeenChanged] - subscriber.expectNoMsg(1 second) + subscriber.expectNoMsg(500 millis) publisher ! PublishChanges(g3) subscriber.expectMsgType[SeenChanged] - subscriber.expectNoMsg(1 second) + subscriber.expectNoMsg(500 millis) } "publish Removed when stopped" in { diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 8992d30663..dd9485ea28 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -68,9 +68,27 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { awaitAssert(clusterView.status should be(MemberStatus.Up)) } + "publish inital state as snapshot to subscribers" in { + try { + cluster.subscribe(testActor, ClusterEvent.InitialStateAsSnapshot, classOf[ClusterEvent.MemberEvent]) + expectMsgClass(classOf[ClusterEvent.CurrentClusterState]) + } finally { + cluster.unsubscribe(testActor) + } + } + + "publish inital state as events to subscribers" in { + try { + cluster.subscribe(testActor, ClusterEvent.InitialStateAsEvents, classOf[ClusterEvent.MemberEvent]) + expectMsgClass(classOf[ClusterEvent.MemberUp]) + } finally { + cluster.unsubscribe(testActor) + } + } + "publish CurrentClusterState to subscribers when requested" in { try { - cluster.subscribe(testActor, classOf[ClusterEvent.ClusterDomainEvent]) + cluster.subscribe(testActor, classOf[ClusterEvent.ClusterDomainEvent], classOf[ClusterEvent.CurrentClusterState]) // first, is in response to the subscription expectMsgClass(classOf[ClusterEvent.CurrentClusterState]) diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala index b5129d458e..0fa8965e08 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala @@ -652,16 +652,18 @@ class ShardRegion( def receive = { case Terminated(ref) ⇒ receiveTerminated(ref) case evt: ClusterDomainEvent ⇒ receiveClusterEvent(evt) + case state: CurrentClusterState ⇒ receiveClusterState(state) case msg: CoordinatorMessage ⇒ receiveCoordinatorMessage(msg) case cmd: ShardRegionCommand ⇒ receiveCommand(cmd) case msg if idExtractor.isDefinedAt(msg) ⇒ deliverMessage(msg, sender) } - def receiveClusterEvent(evt: ClusterDomainEvent): Unit = evt match { - case state: CurrentClusterState ⇒ - changeMembers(immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m ⇒ - m.status == MemberStatus.Up && matchingRole(m))) + def receiveClusterState(state: CurrentClusterState): Unit = { + changeMembers(immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m ⇒ + m.status == MemberStatus.Up && matchingRole(m))) + } + def receiveClusterEvent(evt: ClusterDomainEvent): Unit = evt match { case MemberUp(m) ⇒ if (matchingRole(m)) changeMembers(membersByAge + m) diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala index a439d51ccd..7696ffcce0 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala @@ -389,8 +389,7 @@ class ClusterSingletonManager( require(!cluster.isTerminated, "Cluster node must not be terminated") // subscribe to cluster changes, re-subscribe when restart - cluster.subscribe(self, classOf[MemberExited]) - cluster.subscribe(self, classOf[MemberRemoved]) + cluster.subscribe(self, classOf[MemberExited], classOf[MemberRemoved]) setTimer(CleanupTimer, Cleanup, 1.minute, repeat = true) diff --git a/akka-docs/rst/java/cluster-usage.rst b/akka-docs/rst/java/cluster-usage.rst index 4d995ac4dc..cc3709b8fc 100644 --- a/akka-docs/rst/java/cluster-usage.rst +++ b/akka-docs/rst/java/cluster-usage.rst @@ -44,12 +44,12 @@ An actor that uses the cluster extension may look like this: .. literalinclude:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/simple/SimpleClusterListener.java :language: java -The actor registers itself as subscriber of certain cluster events. It gets notified with a snapshot event, ``CurrentClusterState`` -that holds full state information of the cluster. After that it receives events for changes that happen in the cluster. +The actor registers itself as subscriber of certain cluster events. It receives events corresponding to the current state +of the cluster when the subscription starts and then it receives events for changes that happen in the cluster. The easiest way to run this example yourself is to download `Typesafe Activator `_ and open the tutorial named `Akka Cluster Samples with Java `_. -It contains instructions of how to run the SimpleClusterApp. +It contains instructions of how to run the ``SimpleClusterApp``. Joining to Seed Nodes ^^^^^^^^^^^^^^^^^^^^^ @@ -166,15 +166,27 @@ Subscribe to Cluster Events ^^^^^^^^^^^^^^^^^^^^^^^^^^^ You can subscribe to change notifications of the cluster membership by using -``Cluster.get(system).subscribe(subscriber, to)``. A snapshot of the full state, -``akka.cluster.ClusterEvent.CurrentClusterState``, is sent to the subscriber -as the first event, followed by events for incremental updates. +``Cluster.get(system).subscribe``. + +.. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/simple/SimpleClusterListener2.java#subscribe + +A snapshot of the full state, ``akka.cluster.ClusterEvent.CurrentClusterState``, is sent to the subscriber +as the first message, followed by events for incremental updates. Note that you may receive an empty ``CurrentClusterState``, containing no members, if you start the subscription before the initial join procedure has completed. This is expected behavior. When the node has been accepted in the cluster you will receive ``MemberUp`` for that node, and other nodes. +If you find it inconvenient to handle the ``CurrentClusterState`` you can use +``ClusterEvent.initialStateAsEvents()`` as parameter to ``subscribe``. +That means that instead of receiving ``CurrentClusterState`` as the first message you will receive +the events corresponding to the current state to mimic what you would have seen if you were +listening to the events when they occurred in the past. Note that those initial events only correspond +to the current state and it is not the full history of all changes that actually has occurred in the cluster. + +.. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/simple/SimpleClusterListener.java#subscribe + The events to track the life-cycle of members are: * ``ClusterEvent.MemberUp`` - A new member has joined the cluster and its status has been changed to ``Up``. @@ -190,6 +202,10 @@ There are more types of change events, consult the API documentation of classes that extends ``akka.cluster.ClusterEvent.ClusterDomainEvent`` for details about the events. +Instead of subscribing to cluster events it can sometimes be convenient to only get the full membership state with +``Cluster.get(system).state()``. Note that this state is not necessarily in sync with the events published to a +cluster subscription. + Worker Dial-in Example ---------------------- diff --git a/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst b/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst index 422223f316..09c126d9ba 100644 --- a/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst +++ b/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst @@ -133,4 +133,21 @@ The following, previously deprecated, features have been removed: * `API changes to FSM and TestFSMRef `_ * DefaultScheduler superseded by LightArrayRevolverScheduler +publishCurrentClusterState is Deprecated +======================================== + +Use ``sendCurrentClusterState`` instead. Note that you can also retrieve the current cluster state +with the new ``Cluster(system).state``. + + +CurrentClusterState is not a ClusterDomainEvent +=============================================== + +``CurrentClusterState`` does not implement the ``ClusterDomainEvent`` marker interface any more. + +Note the new ``initialStateMode`` parameter of ``Cluster.subscribe``, which makes it possible +to handle the initial state as events instead of ``CurrentClusterState``. See +:ref:`documentation for Scala ` and +:ref:`documentation for Java `. + diff --git a/akka-docs/rst/scala/cluster-usage.rst b/akka-docs/rst/scala/cluster-usage.rst index 6c87346534..60a97d1f18 100644 --- a/akka-docs/rst/scala/cluster-usage.rst +++ b/akka-docs/rst/scala/cluster-usage.rst @@ -38,12 +38,12 @@ An actor that uses the cluster extension may look like this: .. literalinclude:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/simple/SimpleClusterListener.scala :language: scala -The actor registers itself as subscriber of certain cluster events. It gets notified with a snapshot event, ``CurrentClusterState`` -that holds full state information of the cluster. After that it receives events for changes that happen in the cluster. +The actor registers itself as subscriber of certain cluster events. It receives events corresponding to the current state +of the cluster when the subscription starts and then it receives events for changes that happen in the cluster. The easiest way to run this example yourself is to download `Typesafe Activator `_ and open the tutorial named `Akka Cluster Samples with Scala `_. -It contains instructions of how to run the SimpleClusterApp. +It contains instructions of how to run the ``SimpleClusterApp``. Joining to Seed Nodes ^^^^^^^^^^^^^^^^^^^^^ @@ -160,15 +160,27 @@ Subscribe to Cluster Events ^^^^^^^^^^^^^^^^^^^^^^^^^^^ You can subscribe to change notifications of the cluster membership by using -``Cluster(system).subscribe(subscriber, to)``. A snapshot of the full state, -``akka.cluster.ClusterEvent.CurrentClusterState``, is sent to the subscriber -as the first event, followed by events for incremental updates. +``Cluster(system).subscribe``. + +.. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/simple/SimpleClusterListener2.scala#subscribe + +A snapshot of the full state, ``akka.cluster.ClusterEvent.CurrentClusterState``, is sent to the subscriber +as the first message, followed by events for incremental updates. Note that you may receive an empty ``CurrentClusterState``, containing no members, if you start the subscription before the initial join procedure has completed. This is expected behavior. When the node has been accepted in the cluster you will receive ``MemberUp`` for that node, and other nodes. +If you find it inconvenient to handle the ``CurrentClusterState`` you can use +``ClusterEvent.InitialStateAsEvents`` as parameter to ``subscribe``. +That means that instead of receiving ``CurrentClusterState`` as the first message you will receive +the events corresponding to the current state to mimic what you would have seen if you were +listening to the events when they occurred in the past. Note that those initial events only correspond +to the current state and it is not the full history of all changes that actually has occurred in the cluster. + +.. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/simple/SimpleClusterListener.scala#subscribe + The events to track the life-cycle of members are: * ``ClusterEvent.MemberUp`` - A new member has joined the cluster and its status has been changed to ``Up``. @@ -184,6 +196,10 @@ There are more types of change events, consult the API documentation of classes that extends ``akka.cluster.ClusterEvent.ClusterDomainEvent`` for details about the events. +Instead of subscribing to cluster events it can sometimes be convenient to only get the full membership state with +``Cluster(system).state``. Note that this state is not necessarily in sync with the events published to a +cluster subscription. + Worker Dial-in Example ---------------------- diff --git a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/simple/SimpleClusterListener.java b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/simple/SimpleClusterListener.java index 755a73d749..37b763d6d9 100644 --- a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/simple/SimpleClusterListener.java +++ b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/simple/SimpleClusterListener.java @@ -2,8 +2,8 @@ package sample.cluster.simple; import akka.actor.UntypedActor; import akka.cluster.Cluster; -import akka.cluster.ClusterEvent.ClusterDomainEvent; -import akka.cluster.ClusterEvent.CurrentClusterState; +import akka.cluster.ClusterEvent; +import akka.cluster.ClusterEvent.MemberEvent; import akka.cluster.ClusterEvent.MemberUp; import akka.cluster.ClusterEvent.MemberRemoved; import akka.cluster.ClusterEvent.UnreachableMember; @@ -14,10 +14,13 @@ public class SimpleClusterListener extends UntypedActor { LoggingAdapter log = Logging.getLogger(getContext().system(), this); Cluster cluster = Cluster.get(getContext().system()); - //subscribe to cluster changes, MemberUp + //subscribe to cluster changes @Override public void preStart() { - cluster.subscribe(getSelf(), ClusterDomainEvent.class); + //#subscribe + cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), + MemberEvent.class, UnreachableMember.class); + //#subscribe } //re-subscribe when restart @@ -28,11 +31,7 @@ public class SimpleClusterListener extends UntypedActor { @Override public void onReceive(Object message) { - if (message instanceof CurrentClusterState) { - CurrentClusterState state = (CurrentClusterState) message; - log.info("Current members: {}", state.members()); - - } else if (message instanceof MemberUp) { + if (message instanceof MemberUp) { MemberUp mUp = (MemberUp) message; log.info("Member is Up: {}", mUp.member()); @@ -44,7 +43,7 @@ public class SimpleClusterListener extends UntypedActor { MemberRemoved mRemoved = (MemberRemoved) message; log.info("Member is Removed: {}", mRemoved.member()); - } else if (message instanceof ClusterDomainEvent) { + } else if (message instanceof MemberEvent) { // ignore } else { diff --git a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/simple/SimpleClusterListener2.java b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/simple/SimpleClusterListener2.java new file mode 100644 index 0000000000..da623b3976 --- /dev/null +++ b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/simple/SimpleClusterListener2.java @@ -0,0 +1,57 @@ +package sample.cluster.simple; + +import akka.actor.UntypedActor; +import akka.cluster.Cluster; +import akka.cluster.ClusterEvent.CurrentClusterState; +import akka.cluster.ClusterEvent.MemberEvent; +import akka.cluster.ClusterEvent.MemberUp; +import akka.cluster.ClusterEvent.MemberRemoved; +import akka.cluster.ClusterEvent.UnreachableMember; +import akka.event.Logging; +import akka.event.LoggingAdapter; + +public class SimpleClusterListener2 extends UntypedActor { + LoggingAdapter log = Logging.getLogger(getContext().system(), this); + Cluster cluster = Cluster.get(getContext().system()); + + //subscribe to cluster changes + @Override + public void preStart() { + //#subscribe + cluster.subscribe(getSelf(), MemberEvent.class, UnreachableMember.class); + //#subscribe + } + + //re-subscribe when restart + @Override + public void postStop() { + cluster.unsubscribe(getSelf()); + } + + @Override + public void onReceive(Object message) { + if (message instanceof CurrentClusterState) { + CurrentClusterState state = (CurrentClusterState) message; + log.info("Current members: {}", state.members()); + + } else if (message instanceof MemberUp) { + MemberUp mUp = (MemberUp) message; + log.info("Member is Up: {}", mUp.member()); + + } else if (message instanceof UnreachableMember) { + UnreachableMember mUnreachable = (UnreachableMember) message; + log.info("Member detected as unreachable: {}", mUnreachable.member()); + + } else if (message instanceof MemberRemoved) { + MemberRemoved mRemoved = (MemberRemoved) message; + log.info("Member is Removed: {}", mRemoved.member()); + + } else if (message instanceof MemberEvent) { + // ignore + + } else { + unhandled(message); + } + + } +} diff --git a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsSampleClient.java b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsSampleClient.java index acfaebee71..8c4a30ca24 100644 --- a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsSampleClient.java +++ b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsSampleClient.java @@ -17,9 +17,12 @@ import akka.actor.Address; import akka.actor.Cancellable; import akka.actor.UntypedActor; import akka.cluster.Cluster; +import akka.cluster.ClusterEvent.UnreachableMember; +import akka.cluster.ClusterEvent.ReachableMember; import akka.cluster.ClusterEvent.CurrentClusterState; import akka.cluster.ClusterEvent.MemberEvent; import akka.cluster.ClusterEvent.MemberUp; +import akka.cluster.ClusterEvent.ReachabilityEvent; import akka.cluster.Member; import akka.cluster.MemberStatus; @@ -44,7 +47,7 @@ public class StatsSampleClient extends UntypedActor { //subscribe to cluster changes, MemberEvent @Override public void preStart() { - cluster.subscribe(getSelf(), MemberEvent.class); + cluster.subscribe(getSelf(), MemberEvent.class, ReachabilityEvent.class); } //re-subscribe when restart @@ -91,6 +94,15 @@ public class StatsSampleClient extends UntypedActor { MemberEvent other = (MemberEvent) message; nodes.remove(other.member().address()); + } else if (message instanceof UnreachableMember) { + UnreachableMember unreachable = (UnreachableMember) message; + nodes.remove(unreachable.member().address()); + + } else if (message instanceof ReachableMember) { + ReachableMember reachable = (ReachableMember) message; + if (reachable.member().hasRole("compute")) + nodes.add(reachable.member().address()); + } else { unhandled(message); } diff --git a/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/simple/SimpleClusterListener.scala b/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/simple/SimpleClusterListener.scala index 7f06f38ba2..32c229402f 100644 --- a/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/simple/SimpleClusterListener.scala +++ b/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/simple/SimpleClusterListener.scala @@ -10,12 +10,15 @@ class SimpleClusterListener extends Actor with ActorLogging { val cluster = Cluster(context.system) // subscribe to cluster changes, re-subscribe when restart - override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent]) + override def preStart(): Unit = { + //#subscribe + cluster.subscribe(self, initialStateMode = InitialStateAsEvents, + classOf[MemberEvent], classOf[UnreachableMember]) + //#subscribe + } override def postStop(): Unit = cluster.unsubscribe(self) def receive = { - case state: CurrentClusterState => - log.info("Current members: {}", state.members.mkString(", ")) case MemberUp(member) => log.info("Member is Up: {}", member.address) case UnreachableMember(member) => @@ -23,6 +26,6 @@ class SimpleClusterListener extends Actor with ActorLogging { case MemberRemoved(member, previousStatus) => log.info("Member is Removed: {} after {}", member.address, previousStatus) - case _: ClusterDomainEvent => // ignore + case _: MemberEvent => // ignore } } \ No newline at end of file diff --git a/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/simple/SimpleClusterListener2.scala b/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/simple/SimpleClusterListener2.scala new file mode 100644 index 0000000000..13daf8c9e8 --- /dev/null +++ b/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/simple/SimpleClusterListener2.scala @@ -0,0 +1,32 @@ +package sample.cluster.simple + +import akka.cluster.Cluster +import akka.cluster.ClusterEvent._ +import akka.actor.ActorLogging +import akka.actor.Actor + +class SimpleClusterListener2 extends Actor with ActorLogging { + + val cluster = Cluster(context.system) + + // subscribe to cluster changes, re-subscribe when restart + override def preStart(): Unit = { + //#subscribe + cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember]) + //#subscribe + } + override def postStop(): Unit = cluster.unsubscribe(self) + + def receive = { + case state: CurrentClusterState => + log.info("Current members: {}", state.members.mkString(", ")) + case MemberUp(member) => + log.info("Member is Up: {}", member.address) + case UnreachableMember(member) => + log.info("Member detected as unreachable: {}", member) + case MemberRemoved(member, previousStatus) => + log.info("Member is Removed: {} after {}", + member.address, previousStatus) + case _: MemberEvent => // ignore + } +} \ No newline at end of file diff --git a/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsSample.scala b/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsSample.scala index ef286bd497..060de44d0c 100644 --- a/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsSample.scala +++ b/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsSample.scala @@ -61,8 +61,7 @@ class StatsSampleClient(servicePath: String) extends Actor { var nodes = Set.empty[Address] override def preStart(): Unit = { - cluster.subscribe(self, classOf[MemberEvent]) - cluster.subscribe(self, classOf[UnreachableMember]) + cluster.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent]) } override def postStop(): Unit = { cluster.unsubscribe(self) @@ -83,9 +82,10 @@ class StatsSampleClient(servicePath: String) extends Actor { nodes = state.members.collect { case m if m.hasRole("compute") && m.status == MemberStatus.Up => m.address } - case MemberUp(m) if m.hasRole("compute") => nodes += m.address - case other: MemberEvent => nodes -= other.member.address - case UnreachableMember(m) => nodes -= m.address + case MemberUp(m) if m.hasRole("compute") => nodes += m.address + case other: MemberEvent => nodes -= other.member.address + case UnreachableMember(m) => nodes -= m.address + case ReachableMember(m) if m.hasRole("compute") => nodes += m.address } }