!clu #3617 API improvements related to CurrentClusterState

* Getter for CurrentClusterState in Cluster extension, updated via
  ClusterReadView
* Remove lazy init of readView. Otherwise the cluster.state will be
  empty on first access, wich is probably surprising
* Subscribe to several cluster event types at once, to ensure *one*
  CurrentClusterEvent followed by change events
* Deprecate publishCurrentClusterState, was a bad idea, use sendCurrentClusterState
  instead
* Possibility to subscribe with InitialStateAsEvents to receive events corresponding
  to CurrentClusterState
* CurrentClusterState not a ClusterDomainEvent, ticket #3614
This commit is contained in:
Patrik Nordwall 2014-01-08 14:14:48 +01:00
parent bfbee94fec
commit 2e5193347e
19 changed files with 342 additions and 99 deletions

View file

@ -29,6 +29,7 @@ import com.typesafe.config.Config
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import java.util.concurrent.ThreadFactory import java.util.concurrent.ThreadFactory
import scala.util.control.NonFatal import scala.util.control.NonFatal
import scala.annotation.varargs
/** /**
* Cluster Extension Id and factory for creating Cluster extension. * Cluster Extension Id and factory for creating Cluster extension.
@ -178,13 +179,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
} }
} }
@volatile private[cluster] val readView: ClusterReadView = new ClusterReadView(this)
private var readViewStarted = false
private[cluster] lazy val readView: ClusterReadView = {
val readView = new ClusterReadView(this)
readViewStarted = true
readView
}
system.registerOnTermination(shutdown()) system.registerOnTermination(shutdown())
@ -207,15 +202,38 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
def isTerminated: Boolean = _isTerminated.get def isTerminated: Boolean = _isTerminated.get
/** /**
* Subscribe to cluster domain events. * Current snapshot state of the cluster.
* The `to` Class can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]] */
* or subclass. def state: CurrentClusterState = readView.state
/**
* Subscribe to one or more cluster domain events.
* The `to` classes can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]]
* or subclasses.
* *
* A snapshot of [[akka.cluster.ClusterEvent.CurrentClusterState]] * A snapshot of [[akka.cluster.ClusterEvent.CurrentClusterState]]
* will be sent to the subscriber as the first event. * will be sent to the subscriber as the first message.
*/ */
def subscribe(subscriber: ActorRef, to: Class[_]): Unit = @varargs def subscribe(subscriber: ActorRef, to: Class[_]*): Unit =
clusterCore ! InternalClusterAction.Subscribe(subscriber, to) clusterCore ! InternalClusterAction.Subscribe(subscriber, initialStateMode = InitialStateAsSnapshot, to.toSet)
/**
* Subscribe to one or more cluster domain events.
* The `to` classes can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]]
* or subclasses.
*
* If `initialStateMode` is [[ClusterEvent.InitialStateAsEvents]] the events corresponding
* to the current state will be sent to the subscriber to mimic what you would
* have seen if you were listening to the events when they occurred in the past.
*
* If `initialStateMode` is [[ClusterEvent.InitialStateAsSnapshot]] a snapshot of
* [[akka.cluster.ClusterEvent.CurrentClusterState]] will be sent to the subscriber as the
* first message.
*
* Note that for large clusters it is more efficient to use `InitialStateAsSnapshot`.
*/
@varargs def subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, to: Class[_]*): Unit =
clusterCore ! InternalClusterAction.Subscribe(subscriber, initialStateMode, to.toSet)
/** /**
* Unsubscribe to all cluster domain events. * Unsubscribe to all cluster domain events.
@ -237,13 +255,15 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
* If you want this to happen periodically you need to schedule a call to * If you want this to happen periodically you need to schedule a call to
* this method yourself. * this method yourself.
*/ */
@deprecated("Use sendCurrentClusterState instead of publishCurrentClusterState", "2.3")
def publishCurrentClusterState(): Unit = def publishCurrentClusterState(): Unit =
clusterCore ! InternalClusterAction.PublishCurrentClusterState(None) clusterCore ! InternalClusterAction.PublishCurrentClusterState(None)
/** /**
* Publish current (full) state of the cluster to the specified * Publish current (full) state of the cluster to the specified
* receiver. If you want this to happen periodically you need to schedule * receiver. If you want this to happen periodically you need to schedule
* a call to this method yourself. * a call to this method yourself. Note that you can also retrieve the current
* state with [[#state]].
*/ */
def sendCurrentClusterState(receiver: ActorRef): Unit = def sendCurrentClusterState(receiver: ActorRef): Unit =
clusterCore ! InternalClusterAction.PublishCurrentClusterState(Some(receiver)) clusterCore ! InternalClusterAction.PublishCurrentClusterState(Some(receiver))
@ -333,7 +353,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
logInfo("Shutting down...") logInfo("Shutting down...")
system.stop(clusterDaemons) system.stop(clusterDaemons)
if (readViewStarted) readView.close() readView.close()
closeScheduler() closeScheduler()

View file

@ -134,7 +134,7 @@ private[cluster] object InternalClusterAction {
case class AddOnMemberUpListener(callback: Runnable) extends NoSerializationVerificationNeeded case class AddOnMemberUpListener(callback: Runnable) extends NoSerializationVerificationNeeded
sealed trait SubscriptionMessage sealed trait SubscriptionMessage
case class Subscribe(subscriber: ActorRef, to: Class[_]) extends SubscriptionMessage case class Subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, to: Set[Class[_]]) extends SubscriptionMessage
case class Unsubscribe(subscriber: ActorRef, to: Option[Class[_]]) extends SubscriptionMessage case class Unsubscribe(subscriber: ActorRef, to: Option[Class[_]]) extends SubscriptionMessage
/** /**
* @param receiver if `receiver` is defined the event will only be sent to that * @param receiver if `receiver` is defined the event will only be sent to that

View file

@ -20,6 +20,31 @@ import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
* }}} * }}}
*/ */
object ClusterEvent { object ClusterEvent {
sealed abstract class SubscriptionInitialStateMode
/**
* When using this subscription mode a snapshot of
* [[akka.cluster.ClusterEvent.CurrentClusterState]] will be sent to the
* subscriber as the first message.
*/
case object InitialStateAsSnapshot extends SubscriptionInitialStateMode
/**
* When using this subscription mode the events corresponding
* to the current state will be sent to the subscriber to mimic what you would
* have seen if you were listening to the events when they occurred in the past.
*/
case object InitialStateAsEvents extends SubscriptionInitialStateMode
/**
* Java API
*/
def initialStateAsSnapshot = InitialStateAsSnapshot
/**
* Java API
*/
def initialStateAsEvents = InitialStateAsEvents
/** /**
* Marker interface for cluster domain events. * Marker interface for cluster domain events.
*/ */
@ -33,7 +58,7 @@ object ClusterEvent {
unreachable: Set[Member] = Set.empty, unreachable: Set[Member] = Set.empty,
seenBy: Set[Address] = Set.empty, seenBy: Set[Address] = Set.empty,
leader: Option[Address] = None, leader: Option[Address] = None,
roleLeaderMap: Map[String, Option[Address]] = Map.empty) extends ClusterDomainEvent { roleLeaderMap: Map[String, Option[Address]] = Map.empty) {
/** /**
* Java API: get current member list. * Java API: get current member list.
@ -102,7 +127,8 @@ object ClusterEvent {
} }
/** /**
* Member status changed to Exiting. * Member status changed to [[MemberStatus.Exiting]] and will be removed
* when all members have seen the `Exiting` status.
*/ */
case class MemberExited(member: Member) extends MemberEvent { case class MemberExited(member: Member) extends MemberEvent {
if (member.status != Exiting) throw new IllegalArgumentException("Expected Exiting status, got: " + member) if (member.status != Exiting) throw new IllegalArgumentException("Expected Exiting status, got: " + member)
@ -305,7 +331,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
case PublishChanges(newGossip) publishChanges(newGossip) case PublishChanges(newGossip) publishChanges(newGossip)
case currentStats: CurrentInternalStats publishInternalStats(currentStats) case currentStats: CurrentInternalStats publishInternalStats(currentStats)
case PublishCurrentClusterState(receiver) publishCurrentClusterState(receiver) case PublishCurrentClusterState(receiver) publishCurrentClusterState(receiver)
case Subscribe(subscriber, to) subscribe(subscriber, to) case Subscribe(subscriber, initMode, to) subscribe(subscriber, initMode, to)
case Unsubscribe(subscriber, to) unsubscribe(subscriber, to) case Unsubscribe(subscriber, to) unsubscribe(subscriber, to)
case PublishEvent(event) publish(event) case PublishEvent(event) publish(event)
} }
@ -314,7 +340,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
/** /**
* The current snapshot state corresponding to latest gossip * The current snapshot state corresponding to latest gossip
* to mimic what you would have seen if you where listening to the events. * to mimic what you would have seen if you were listening to the events.
*/ */
def publishCurrentClusterState(receiver: Option[ActorRef]): Unit = { def publishCurrentClusterState(receiver: Option[ActorRef]): Unit = {
val state = CurrentClusterState( val state = CurrentClusterState(
@ -329,9 +355,19 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
} }
} }
def subscribe(subscriber: ActorRef, to: Class[_]): Unit = { def subscribe(subscriber: ActorRef, initMode: SubscriptionInitialStateMode, to: Set[Class[_]]): Unit = {
publishCurrentClusterState(Some(subscriber)) initMode match {
eventStream.subscribe(subscriber, to) case InitialStateAsEvents
def pub(event: AnyRef): Unit = {
if (to.exists(_.isAssignableFrom(event.getClass)))
subscriber ! event
}
publishDiff(Gossip.empty, latestGossip, pub)
case InitialStateAsSnapshot
publishCurrentClusterState(Some(subscriber))
}
to foreach { eventStream.subscribe(subscriber, _) }
} }
def unsubscribe(subscriber: ActorRef, to: Option[Class[_]]): Unit = to match { def unsubscribe(subscriber: ActorRef, to: Option[Class[_]]): Unit = to match {
@ -343,14 +379,18 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
val oldGossip = latestGossip val oldGossip = latestGossip
// keep the latestGossip to be sent to new subscribers // keep the latestGossip to be sent to new subscribers
latestGossip = newGossip latestGossip = newGossip
diffUnreachable(oldGossip, newGossip) foreach publish publishDiff(oldGossip, newGossip, publish)
diffReachable(oldGossip, newGossip) foreach publish }
diffMemberEvents(oldGossip, newGossip) foreach publish
diffLeader(oldGossip, newGossip) foreach publish def publishDiff(oldGossip: Gossip, newGossip: Gossip, pub: AnyRef Unit): Unit = {
diffRolesLeader(oldGossip, newGossip) foreach publish diffMemberEvents(oldGossip, newGossip) foreach pub
diffUnreachable(oldGossip, newGossip) foreach pub
diffReachable(oldGossip, newGossip) foreach pub
diffLeader(oldGossip, newGossip) foreach pub
diffRolesLeader(oldGossip, newGossip) foreach pub
// publish internal SeenState for testing purposes // publish internal SeenState for testing purposes
diffSeen(oldGossip, newGossip) foreach publish diffSeen(oldGossip, newGossip) foreach pub
diffReachability(oldGossip, newGossip) foreach publish diffReachability(oldGossip, newGossip) foreach pub
} }
def publishInternalStats(currentStats: CurrentInternalStats): Unit = publish(currentStats) def publishInternalStats(currentStats: CurrentInternalStats): Unit = publish(currentStats)

View file

@ -77,8 +77,7 @@ private[cluster] class ClusterMetricsCollector(publisher: ActorRef) extends Acto
MetricsInterval, self, MetricsTick) MetricsInterval, self, MetricsTick)
override def preStart(): Unit = { override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent]) cluster.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent])
cluster.subscribe(self, classOf[ReachabilityEvent])
logInfo("Metrics collection has started successfully") logInfo("Metrics collection has started successfully")
} }

View file

@ -24,7 +24,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
* Current state * Current state
*/ */
@volatile @volatile
private var state: CurrentClusterState = CurrentClusterState() private var _state: CurrentClusterState = CurrentClusterState()
@volatile @volatile
private var _reachability: Reachability = Reachability.empty private var _reachability: Reachability = Reachability.empty
@ -52,35 +52,37 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
def receive = { def receive = {
case e: ClusterDomainEvent e match { case e: ClusterDomainEvent e match {
case SeenChanged(convergence, seenBy) case SeenChanged(convergence, seenBy)
state = state.copy(seenBy = seenBy) _state = _state.copy(seenBy = seenBy)
case ReachabilityChanged(reachability) case ReachabilityChanged(reachability)
_reachability = reachability _reachability = reachability
case MemberRemoved(member, _) case MemberRemoved(member, _)
state = state.copy(members = state.members - member, unreachable = state.unreachable - member) _state = _state.copy(members = _state.members - member, unreachable = _state.unreachable - member)
case UnreachableMember(member) case UnreachableMember(member)
// replace current member with new member (might have different status, only address is used in equals) // replace current member with new member (might have different status, only address is used in equals)
state = state.copy(unreachable = state.unreachable - member + member) _state = _state.copy(unreachable = _state.unreachable - member + member)
case ReachableMember(member) case ReachableMember(member)
state = state.copy(unreachable = state.unreachable - member) _state = _state.copy(unreachable = _state.unreachable - member)
case event: MemberEvent case event: MemberEvent
// replace current member with new member (might have different status, only address is used in equals) // replace current member with new member (might have different status, only address is used in equals)
val newUnreachable = val newUnreachable =
if (state.unreachable.contains(event.member)) state.unreachable - event.member + event.member if (_state.unreachable.contains(event.member)) _state.unreachable - event.member + event.member
else state.unreachable else _state.unreachable
state = state.copy(members = state.members - event.member + event.member, _state = _state.copy(members = _state.members - event.member + event.member,
unreachable = newUnreachable) unreachable = newUnreachable)
case LeaderChanged(leader) case LeaderChanged(leader)
state = state.copy(leader = leader) _state = _state.copy(leader = leader)
case RoleLeaderChanged(role, leader) case RoleLeaderChanged(role, leader)
state = state.copy(roleLeaderMap = state.roleLeaderMap + (role -> leader)) _state = _state.copy(roleLeaderMap = _state.roleLeaderMap + (role -> leader))
case s: CurrentClusterState state = s
case stats: CurrentInternalStats _latestStats = stats case stats: CurrentInternalStats _latestStats = stats
case ClusterMetricsChanged(nodes) _clusterMetrics = nodes case ClusterMetricsChanged(nodes) _clusterMetrics = nodes
} }
case s: CurrentClusterState _state = s
} }
}).withDispatcher(cluster.settings.UseDispatcher).withDeploy(Deploy.local), name = "clusterEventBusListener") }).withDispatcher(cluster.settings.UseDispatcher).withDeploy(Deploy.local), name = "clusterEventBusListener")
} }
def state: CurrentClusterState = _state
def self: Member = { def self: Member = {
import cluster.selfUniqueAddress import cluster.selfUniqueAddress
state.members.find(_.uniqueAddress == selfUniqueAddress). state.members.find(_.uniqueAddress == selfUniqueAddress).

View file

@ -239,7 +239,7 @@ private[akka] trait ClusterRouterConfigBase extends RouterConfig {
// Intercept ClusterDomainEvent and route them to the ClusterRouterActor // Intercept ClusterDomainEvent and route them to the ClusterRouterActor
override def isManagementMessage(msg: Any): Boolean = override def isManagementMessage(msg: Any): Boolean =
(msg.isInstanceOf[ClusterDomainEvent]) || super.isManagementMessage(msg) (msg.isInstanceOf[ClusterDomainEvent]) || msg.isInstanceOf[CurrentClusterState] || super.isManagementMessage(msg)
} }
/** /**
@ -373,10 +373,9 @@ private[akka] trait ClusterRouterActor { this: RouterActor ⇒
def cluster: Cluster = Cluster(context.system) def cluster: Cluster = Cluster(context.system)
// re-subscribe when restart // re-subscribe when restart
override def preStart(): Unit = { override def preStart(): Unit =
cluster.subscribe(self, classOf[MemberEvent]) cluster.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent])
cluster.subscribe(self, classOf[ReachabilityEvent])
}
override def postStop(): Unit = cluster.unsubscribe(self) override def postStop(): Unit = cluster.unsubscribe(self)
var nodes: immutable.SortedSet[Address] = { var nodes: immutable.SortedSet[Address] = {

View file

@ -44,6 +44,8 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
val g5 = Gossip(members = SortedSet(a51Up, aUp, bExiting, cUp)).seen(aUp.uniqueAddress).seen(bExiting.uniqueAddress).seen(cUp.uniqueAddress).seen(a51Up.uniqueAddress) val g5 = Gossip(members = SortedSet(a51Up, aUp, bExiting, cUp)).seen(aUp.uniqueAddress).seen(bExiting.uniqueAddress).seen(cUp.uniqueAddress).seen(a51Up.uniqueAddress)
val g6 = Gossip(members = SortedSet(aLeaving, bExiting, cUp)).seen(aUp.uniqueAddress) val g6 = Gossip(members = SortedSet(aLeaving, bExiting, cUp)).seen(aUp.uniqueAddress)
val g7 = Gossip(members = SortedSet(aExiting, bExiting, cUp)).seen(aUp.uniqueAddress) val g7 = Gossip(members = SortedSet(aExiting, bExiting, cUp)).seen(aUp.uniqueAddress)
val g8 = Gossip(members = SortedSet(aUp, bExiting, cUp, dUp), overview = GossipOverview(reachability =
Reachability.empty.unreachable(aUp.uniqueAddress, dUp.uniqueAddress))).seen(aUp.uniqueAddress)
// created in beforeEach // created in beforeEach
var memberSubscriber: TestProbe = _ var memberSubscriber: TestProbe = _
@ -74,7 +76,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
memberSubscriber.expectMsg(MemberExited(bExiting)) memberSubscriber.expectMsg(MemberExited(bExiting))
memberSubscriber.expectMsg(MemberUp(cUp)) memberSubscriber.expectMsg(MemberUp(cUp))
memberSubscriber.expectMsg(LeaderChanged(Some(a51Up.address))) memberSubscriber.expectMsg(LeaderChanged(Some(a51Up.address)))
memberSubscriber.expectNoMsg(1 second) memberSubscriber.expectNoMsg(500 millis)
} }
"publish leader changed when old leader leaves and is removed" in { "publish leader changed when old leader leaves and is removed" in {
@ -82,11 +84,11 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
memberSubscriber.expectMsg(MemberExited(bExiting)) memberSubscriber.expectMsg(MemberExited(bExiting))
memberSubscriber.expectMsg(MemberUp(cUp)) memberSubscriber.expectMsg(MemberUp(cUp))
publisher ! PublishChanges(g6) publisher ! PublishChanges(g6)
memberSubscriber.expectNoMsg(1 second) memberSubscriber.expectNoMsg(500 millis)
publisher ! PublishChanges(g7) publisher ! PublishChanges(g7)
memberSubscriber.expectMsg(MemberExited(aExiting)) memberSubscriber.expectMsg(MemberExited(aExiting))
memberSubscriber.expectMsg(LeaderChanged(Some(cUp.address))) memberSubscriber.expectMsg(LeaderChanged(Some(cUp.address)))
memberSubscriber.expectNoMsg(1 second) memberSubscriber.expectNoMsg(500 millis)
// at the removed member a an empty gossip is the last thing // at the removed member a an empty gossip is the last thing
publisher ! PublishChanges(Gossip.empty) publisher ! PublishChanges(Gossip.empty)
memberSubscriber.expectMsg(MemberRemoved(aRemoved, Exiting)) memberSubscriber.expectMsg(MemberRemoved(aRemoved, Exiting))
@ -103,12 +105,12 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
memberSubscriber.expectMsg(LeaderChanged(Some(a51Up.address))) memberSubscriber.expectMsg(LeaderChanged(Some(a51Up.address)))
publisher ! PublishChanges(g5) publisher ! PublishChanges(g5)
memberSubscriber.expectNoMsg(1 second) memberSubscriber.expectNoMsg(500 millis)
} }
"publish role leader changed" in { "publish role leader changed" in {
val subscriber = TestProbe() val subscriber = TestProbe()
publisher ! Subscribe(subscriber.ref, classOf[RoleLeaderChanged]) publisher ! Subscribe(subscriber.ref, InitialStateAsSnapshot, Set(classOf[RoleLeaderChanged]))
subscriber.expectMsgType[CurrentClusterState] subscriber.expectMsgType[CurrentClusterState]
publisher ! PublishChanges(Gossip(members = SortedSet(cJoining, dUp))) publisher ! PublishChanges(Gossip(members = SortedSet(cJoining, dUp)))
subscriber.expectMsg(RoleLeaderChanged("GRP", Some(dUp.address))) subscriber.expectMsg(RoleLeaderChanged("GRP", Some(dUp.address)))
@ -118,19 +120,29 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
"send CurrentClusterState when subscribe" in { "send CurrentClusterState when subscribe" in {
val subscriber = TestProbe() val subscriber = TestProbe()
publisher ! Subscribe(subscriber.ref, classOf[ClusterDomainEvent]) publisher ! Subscribe(subscriber.ref, InitialStateAsSnapshot, Set(classOf[ClusterDomainEvent]))
subscriber.expectMsgType[CurrentClusterState] subscriber.expectMsgType[CurrentClusterState]
// but only to the new subscriber // but only to the new subscriber
memberSubscriber.expectNoMsg(1 second) memberSubscriber.expectNoMsg(500 millis)
}
"send events corresponding to current state when subscribe" in {
val subscriber = TestProbe()
publisher ! PublishChanges(g8)
publisher ! Subscribe(subscriber.ref, InitialStateAsEvents, Set(classOf[MemberEvent], classOf[ReachabilityEvent]))
subscriber.receiveN(4).toSet should be(Set(MemberUp(aUp), MemberUp(cUp), MemberUp(dUp), MemberExited(bExiting)))
subscriber.expectMsg(UnreachableMember(dUp))
subscriber.expectNoMsg(500 millis)
} }
"support unsubscribe" in { "support unsubscribe" in {
val subscriber = TestProbe() val subscriber = TestProbe()
publisher ! Subscribe(subscriber.ref, classOf[MemberEvent]) publisher ! Subscribe(subscriber.ref, InitialStateAsSnapshot, Set(classOf[MemberEvent]))
subscriber.expectMsgType[CurrentClusterState] subscriber.expectMsgType[CurrentClusterState]
publisher ! Unsubscribe(subscriber.ref, Some(classOf[MemberEvent])) publisher ! Unsubscribe(subscriber.ref, Some(classOf[MemberEvent]))
publisher ! PublishChanges(g3) publisher ! PublishChanges(g3)
subscriber.expectNoMsg(1 second) subscriber.expectNoMsg(500 millis)
// but memberSubscriber is still subscriber // but memberSubscriber is still subscriber
memberSubscriber.expectMsg(MemberExited(bExiting)) memberSubscriber.expectMsg(MemberExited(bExiting))
memberSubscriber.expectMsg(MemberUp(cUp)) memberSubscriber.expectMsg(MemberUp(cUp))
@ -138,14 +150,14 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec
"publish SeenChanged" in { "publish SeenChanged" in {
val subscriber = TestProbe() val subscriber = TestProbe()
publisher ! Subscribe(subscriber.ref, classOf[SeenChanged]) publisher ! Subscribe(subscriber.ref, InitialStateAsSnapshot, Set(classOf[SeenChanged]))
subscriber.expectMsgType[CurrentClusterState] subscriber.expectMsgType[CurrentClusterState]
publisher ! PublishChanges(g2) publisher ! PublishChanges(g2)
subscriber.expectMsgType[SeenChanged] subscriber.expectMsgType[SeenChanged]
subscriber.expectNoMsg(1 second) subscriber.expectNoMsg(500 millis)
publisher ! PublishChanges(g3) publisher ! PublishChanges(g3)
subscriber.expectMsgType[SeenChanged] subscriber.expectMsgType[SeenChanged]
subscriber.expectNoMsg(1 second) subscriber.expectNoMsg(500 millis)
} }
"publish Removed when stopped" in { "publish Removed when stopped" in {

View file

@ -68,9 +68,27 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
awaitAssert(clusterView.status should be(MemberStatus.Up)) awaitAssert(clusterView.status should be(MemberStatus.Up))
} }
"publish inital state as snapshot to subscribers" in {
try {
cluster.subscribe(testActor, ClusterEvent.InitialStateAsSnapshot, classOf[ClusterEvent.MemberEvent])
expectMsgClass(classOf[ClusterEvent.CurrentClusterState])
} finally {
cluster.unsubscribe(testActor)
}
}
"publish inital state as events to subscribers" in {
try {
cluster.subscribe(testActor, ClusterEvent.InitialStateAsEvents, classOf[ClusterEvent.MemberEvent])
expectMsgClass(classOf[ClusterEvent.MemberUp])
} finally {
cluster.unsubscribe(testActor)
}
}
"publish CurrentClusterState to subscribers when requested" in { "publish CurrentClusterState to subscribers when requested" in {
try { try {
cluster.subscribe(testActor, classOf[ClusterEvent.ClusterDomainEvent]) cluster.subscribe(testActor, classOf[ClusterEvent.ClusterDomainEvent], classOf[ClusterEvent.CurrentClusterState])
// first, is in response to the subscription // first, is in response to the subscription
expectMsgClass(classOf[ClusterEvent.CurrentClusterState]) expectMsgClass(classOf[ClusterEvent.CurrentClusterState])

View file

@ -652,16 +652,18 @@ class ShardRegion(
def receive = { def receive = {
case Terminated(ref) receiveTerminated(ref) case Terminated(ref) receiveTerminated(ref)
case evt: ClusterDomainEvent receiveClusterEvent(evt) case evt: ClusterDomainEvent receiveClusterEvent(evt)
case state: CurrentClusterState receiveClusterState(state)
case msg: CoordinatorMessage receiveCoordinatorMessage(msg) case msg: CoordinatorMessage receiveCoordinatorMessage(msg)
case cmd: ShardRegionCommand receiveCommand(cmd) case cmd: ShardRegionCommand receiveCommand(cmd)
case msg if idExtractor.isDefinedAt(msg) deliverMessage(msg, sender) case msg if idExtractor.isDefinedAt(msg) deliverMessage(msg, sender)
} }
def receiveClusterEvent(evt: ClusterDomainEvent): Unit = evt match { def receiveClusterState(state: CurrentClusterState): Unit = {
case state: CurrentClusterState changeMembers(immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m
changeMembers(immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m m.status == MemberStatus.Up && matchingRole(m)))
m.status == MemberStatus.Up && matchingRole(m))) }
def receiveClusterEvent(evt: ClusterDomainEvent): Unit = evt match {
case MemberUp(m) case MemberUp(m)
if (matchingRole(m)) if (matchingRole(m))
changeMembers(membersByAge + m) changeMembers(membersByAge + m)

View file

@ -389,8 +389,7 @@ class ClusterSingletonManager(
require(!cluster.isTerminated, "Cluster node must not be terminated") require(!cluster.isTerminated, "Cluster node must not be terminated")
// subscribe to cluster changes, re-subscribe when restart // subscribe to cluster changes, re-subscribe when restart
cluster.subscribe(self, classOf[MemberExited]) cluster.subscribe(self, classOf[MemberExited], classOf[MemberRemoved])
cluster.subscribe(self, classOf[MemberRemoved])
setTimer(CleanupTimer, Cleanup, 1.minute, repeat = true) setTimer(CleanupTimer, Cleanup, 1.minute, repeat = true)

View file

@ -44,12 +44,12 @@ An actor that uses the cluster extension may look like this:
.. literalinclude:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/simple/SimpleClusterListener.java .. literalinclude:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/simple/SimpleClusterListener.java
:language: java :language: java
The actor registers itself as subscriber of certain cluster events. It gets notified with a snapshot event, ``CurrentClusterState`` The actor registers itself as subscriber of certain cluster events. It receives events corresponding to the current state
that holds full state information of the cluster. After that it receives events for changes that happen in the cluster. of the cluster when the subscription starts and then it receives events for changes that happen in the cluster.
The easiest way to run this example yourself is to download `Typesafe Activator <http://typesafe.com/platform/getstarted>`_ The easiest way to run this example yourself is to download `Typesafe Activator <http://typesafe.com/platform/getstarted>`_
and open the tutorial named `Akka Cluster Samples with Java <http://typesafe.com/activator/template/akka-sample-cluster-java>`_. and open the tutorial named `Akka Cluster Samples with Java <http://typesafe.com/activator/template/akka-sample-cluster-java>`_.
It contains instructions of how to run the <code>SimpleClusterApp</code>. It contains instructions of how to run the ``SimpleClusterApp``.
Joining to Seed Nodes Joining to Seed Nodes
^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^
@ -166,15 +166,27 @@ Subscribe to Cluster Events
^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^
You can subscribe to change notifications of the cluster membership by using You can subscribe to change notifications of the cluster membership by using
``Cluster.get(system).subscribe(subscriber, to)``. A snapshot of the full state, ``Cluster.get(system).subscribe``.
``akka.cluster.ClusterEvent.CurrentClusterState``, is sent to the subscriber
as the first event, followed by events for incremental updates. .. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/simple/SimpleClusterListener2.java#subscribe
A snapshot of the full state, ``akka.cluster.ClusterEvent.CurrentClusterState``, is sent to the subscriber
as the first message, followed by events for incremental updates.
Note that you may receive an empty ``CurrentClusterState``, containing no members, Note that you may receive an empty ``CurrentClusterState``, containing no members,
if you start the subscription before the initial join procedure has completed. if you start the subscription before the initial join procedure has completed.
This is expected behavior. When the node has been accepted in the cluster you will This is expected behavior. When the node has been accepted in the cluster you will
receive ``MemberUp`` for that node, and other nodes. receive ``MemberUp`` for that node, and other nodes.
If you find it inconvenient to handle the ``CurrentClusterState`` you can use
``ClusterEvent.initialStateAsEvents()`` as parameter to ``subscribe``.
That means that instead of receiving ``CurrentClusterState`` as the first message you will receive
the events corresponding to the current state to mimic what you would have seen if you were
listening to the events when they occurred in the past. Note that those initial events only correspond
to the current state and it is not the full history of all changes that actually has occurred in the cluster.
.. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/simple/SimpleClusterListener.java#subscribe
The events to track the life-cycle of members are: The events to track the life-cycle of members are:
* ``ClusterEvent.MemberUp`` - A new member has joined the cluster and its status has been changed to ``Up``. * ``ClusterEvent.MemberUp`` - A new member has joined the cluster and its status has been changed to ``Up``.
@ -190,6 +202,10 @@ There are more types of change events, consult the API documentation
of classes that extends ``akka.cluster.ClusterEvent.ClusterDomainEvent`` of classes that extends ``akka.cluster.ClusterEvent.ClusterDomainEvent``
for details about the events. for details about the events.
Instead of subscribing to cluster events it can sometimes be convenient to only get the full membership state with
``Cluster.get(system).state()``. Note that this state is not necessarily in sync with the events published to a
cluster subscription.
Worker Dial-in Example Worker Dial-in Example
---------------------- ----------------------

View file

@ -133,4 +133,21 @@ The following, previously deprecated, features have been removed:
* `API changes to FSM and TestFSMRef <http://doc.akka.io/docs/akka/2.2.3/project/migration-guide-2.1.x-2.2.x.html#API_changes_to_FSM_and_TestFSMRef>`_ * `API changes to FSM and TestFSMRef <http://doc.akka.io/docs/akka/2.2.3/project/migration-guide-2.1.x-2.2.x.html#API_changes_to_FSM_and_TestFSMRef>`_
* DefaultScheduler superseded by LightArrayRevolverScheduler * DefaultScheduler superseded by LightArrayRevolverScheduler
publishCurrentClusterState is Deprecated
========================================
Use ``sendCurrentClusterState`` instead. Note that you can also retrieve the current cluster state
with the new ``Cluster(system).state``.
CurrentClusterState is not a ClusterDomainEvent
===============================================
``CurrentClusterState`` does not implement the ``ClusterDomainEvent`` marker interface any more.
Note the new ``initialStateMode`` parameter of ``Cluster.subscribe``, which makes it possible
to handle the initial state as events instead of ``CurrentClusterState``. See
:ref:`documentation for Scala <cluster_subscriber_scala>` and
:ref:`documentation for Java <cluster_subscriber_java>`.

View file

@ -38,12 +38,12 @@ An actor that uses the cluster extension may look like this:
.. literalinclude:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/simple/SimpleClusterListener.scala .. literalinclude:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/simple/SimpleClusterListener.scala
:language: scala :language: scala
The actor registers itself as subscriber of certain cluster events. It gets notified with a snapshot event, ``CurrentClusterState`` The actor registers itself as subscriber of certain cluster events. It receives events corresponding to the current state
that holds full state information of the cluster. After that it receives events for changes that happen in the cluster. of the cluster when the subscription starts and then it receives events for changes that happen in the cluster.
The easiest way to run this example yourself is to download `Typesafe Activator <http://typesafe.com/platform/getstarted>`_ The easiest way to run this example yourself is to download `Typesafe Activator <http://typesafe.com/platform/getstarted>`_
and open the tutorial named `Akka Cluster Samples with Scala <http://typesafe.com/activator/template/akka-sample-cluster-scala>`_. and open the tutorial named `Akka Cluster Samples with Scala <http://typesafe.com/activator/template/akka-sample-cluster-scala>`_.
It contains instructions of how to run the <code>SimpleClusterApp</code>. It contains instructions of how to run the ``SimpleClusterApp``.
Joining to Seed Nodes Joining to Seed Nodes
^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^
@ -160,15 +160,27 @@ Subscribe to Cluster Events
^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^
You can subscribe to change notifications of the cluster membership by using You can subscribe to change notifications of the cluster membership by using
``Cluster(system).subscribe(subscriber, to)``. A snapshot of the full state, ``Cluster(system).subscribe``.
``akka.cluster.ClusterEvent.CurrentClusterState``, is sent to the subscriber
as the first event, followed by events for incremental updates. .. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/simple/SimpleClusterListener2.scala#subscribe
A snapshot of the full state, ``akka.cluster.ClusterEvent.CurrentClusterState``, is sent to the subscriber
as the first message, followed by events for incremental updates.
Note that you may receive an empty ``CurrentClusterState``, containing no members, Note that you may receive an empty ``CurrentClusterState``, containing no members,
if you start the subscription before the initial join procedure has completed. if you start the subscription before the initial join procedure has completed.
This is expected behavior. When the node has been accepted in the cluster you will This is expected behavior. When the node has been accepted in the cluster you will
receive ``MemberUp`` for that node, and other nodes. receive ``MemberUp`` for that node, and other nodes.
If you find it inconvenient to handle the ``CurrentClusterState`` you can use
``ClusterEvent.InitialStateAsEvents`` as parameter to ``subscribe``.
That means that instead of receiving ``CurrentClusterState`` as the first message you will receive
the events corresponding to the current state to mimic what you would have seen if you were
listening to the events when they occurred in the past. Note that those initial events only correspond
to the current state and it is not the full history of all changes that actually has occurred in the cluster.
.. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/simple/SimpleClusterListener.scala#subscribe
The events to track the life-cycle of members are: The events to track the life-cycle of members are:
* ``ClusterEvent.MemberUp`` - A new member has joined the cluster and its status has been changed to ``Up``. * ``ClusterEvent.MemberUp`` - A new member has joined the cluster and its status has been changed to ``Up``.
@ -184,6 +196,10 @@ There are more types of change events, consult the API documentation
of classes that extends ``akka.cluster.ClusterEvent.ClusterDomainEvent`` of classes that extends ``akka.cluster.ClusterEvent.ClusterDomainEvent``
for details about the events. for details about the events.
Instead of subscribing to cluster events it can sometimes be convenient to only get the full membership state with
``Cluster(system).state``. Note that this state is not necessarily in sync with the events published to a
cluster subscription.
Worker Dial-in Example Worker Dial-in Example
---------------------- ----------------------

View file

@ -2,8 +2,8 @@ package sample.cluster.simple;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import akka.cluster.Cluster; import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.ClusterDomainEvent; import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.CurrentClusterState; import akka.cluster.ClusterEvent.MemberEvent;
import akka.cluster.ClusterEvent.MemberUp; import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.ClusterEvent.MemberRemoved; import akka.cluster.ClusterEvent.MemberRemoved;
import akka.cluster.ClusterEvent.UnreachableMember; import akka.cluster.ClusterEvent.UnreachableMember;
@ -14,10 +14,13 @@ public class SimpleClusterListener extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this); LoggingAdapter log = Logging.getLogger(getContext().system(), this);
Cluster cluster = Cluster.get(getContext().system()); Cluster cluster = Cluster.get(getContext().system());
//subscribe to cluster changes, MemberUp //subscribe to cluster changes
@Override @Override
public void preStart() { public void preStart() {
cluster.subscribe(getSelf(), ClusterDomainEvent.class); //#subscribe
cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(),
MemberEvent.class, UnreachableMember.class);
//#subscribe
} }
//re-subscribe when restart //re-subscribe when restart
@ -28,11 +31,7 @@ public class SimpleClusterListener extends UntypedActor {
@Override @Override
public void onReceive(Object message) { public void onReceive(Object message) {
if (message instanceof CurrentClusterState) { if (message instanceof MemberUp) {
CurrentClusterState state = (CurrentClusterState) message;
log.info("Current members: {}", state.members());
} else if (message instanceof MemberUp) {
MemberUp mUp = (MemberUp) message; MemberUp mUp = (MemberUp) message;
log.info("Member is Up: {}", mUp.member()); log.info("Member is Up: {}", mUp.member());
@ -44,7 +43,7 @@ public class SimpleClusterListener extends UntypedActor {
MemberRemoved mRemoved = (MemberRemoved) message; MemberRemoved mRemoved = (MemberRemoved) message;
log.info("Member is Removed: {}", mRemoved.member()); log.info("Member is Removed: {}", mRemoved.member());
} else if (message instanceof ClusterDomainEvent) { } else if (message instanceof MemberEvent) {
// ignore // ignore
} else { } else {

View file

@ -0,0 +1,57 @@
package sample.cluster.simple;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.ClusterEvent.MemberEvent;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.ClusterEvent.MemberRemoved;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class SimpleClusterListener2 extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
Cluster cluster = Cluster.get(getContext().system());
//subscribe to cluster changes
@Override
public void preStart() {
//#subscribe
cluster.subscribe(getSelf(), MemberEvent.class, UnreachableMember.class);
//#subscribe
}
//re-subscribe when restart
@Override
public void postStop() {
cluster.unsubscribe(getSelf());
}
@Override
public void onReceive(Object message) {
if (message instanceof CurrentClusterState) {
CurrentClusterState state = (CurrentClusterState) message;
log.info("Current members: {}", state.members());
} else if (message instanceof MemberUp) {
MemberUp mUp = (MemberUp) message;
log.info("Member is Up: {}", mUp.member());
} else if (message instanceof UnreachableMember) {
UnreachableMember mUnreachable = (UnreachableMember) message;
log.info("Member detected as unreachable: {}", mUnreachable.member());
} else if (message instanceof MemberRemoved) {
MemberRemoved mRemoved = (MemberRemoved) message;
log.info("Member is Removed: {}", mRemoved.member());
} else if (message instanceof MemberEvent) {
// ignore
} else {
unhandled(message);
}
}
}

View file

@ -17,9 +17,12 @@ import akka.actor.Address;
import akka.actor.Cancellable; import akka.actor.Cancellable;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import akka.cluster.Cluster; import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.cluster.ClusterEvent.ReachableMember;
import akka.cluster.ClusterEvent.CurrentClusterState; import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.ClusterEvent.MemberEvent; import akka.cluster.ClusterEvent.MemberEvent;
import akka.cluster.ClusterEvent.MemberUp; import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.ClusterEvent.ReachabilityEvent;
import akka.cluster.Member; import akka.cluster.Member;
import akka.cluster.MemberStatus; import akka.cluster.MemberStatus;
@ -44,7 +47,7 @@ public class StatsSampleClient extends UntypedActor {
//subscribe to cluster changes, MemberEvent //subscribe to cluster changes, MemberEvent
@Override @Override
public void preStart() { public void preStart() {
cluster.subscribe(getSelf(), MemberEvent.class); cluster.subscribe(getSelf(), MemberEvent.class, ReachabilityEvent.class);
} }
//re-subscribe when restart //re-subscribe when restart
@ -91,6 +94,15 @@ public class StatsSampleClient extends UntypedActor {
MemberEvent other = (MemberEvent) message; MemberEvent other = (MemberEvent) message;
nodes.remove(other.member().address()); nodes.remove(other.member().address());
} else if (message instanceof UnreachableMember) {
UnreachableMember unreachable = (UnreachableMember) message;
nodes.remove(unreachable.member().address());
} else if (message instanceof ReachableMember) {
ReachableMember reachable = (ReachableMember) message;
if (reachable.member().hasRole("compute"))
nodes.add(reachable.member().address());
} else { } else {
unhandled(message); unhandled(message);
} }

View file

@ -10,12 +10,15 @@ class SimpleClusterListener extends Actor with ActorLogging {
val cluster = Cluster(context.system) val cluster = Cluster(context.system)
// subscribe to cluster changes, re-subscribe when restart // subscribe to cluster changes, re-subscribe when restart
override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent]) override def preStart(): Unit = {
//#subscribe
cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
classOf[MemberEvent], classOf[UnreachableMember])
//#subscribe
}
override def postStop(): Unit = cluster.unsubscribe(self) override def postStop(): Unit = cluster.unsubscribe(self)
def receive = { def receive = {
case state: CurrentClusterState =>
log.info("Current members: {}", state.members.mkString(", "))
case MemberUp(member) => case MemberUp(member) =>
log.info("Member is Up: {}", member.address) log.info("Member is Up: {}", member.address)
case UnreachableMember(member) => case UnreachableMember(member) =>
@ -23,6 +26,6 @@ class SimpleClusterListener extends Actor with ActorLogging {
case MemberRemoved(member, previousStatus) => case MemberRemoved(member, previousStatus) =>
log.info("Member is Removed: {} after {}", log.info("Member is Removed: {} after {}",
member.address, previousStatus) member.address, previousStatus)
case _: ClusterDomainEvent => // ignore case _: MemberEvent => // ignore
} }
} }

View file

@ -0,0 +1,32 @@
package sample.cluster.simple
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.actor.ActorLogging
import akka.actor.Actor
class SimpleClusterListener2 extends Actor with ActorLogging {
val cluster = Cluster(context.system)
// subscribe to cluster changes, re-subscribe when restart
override def preStart(): Unit = {
//#subscribe
cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember])
//#subscribe
}
override def postStop(): Unit = cluster.unsubscribe(self)
def receive = {
case state: CurrentClusterState =>
log.info("Current members: {}", state.members.mkString(", "))
case MemberUp(member) =>
log.info("Member is Up: {}", member.address)
case UnreachableMember(member) =>
log.info("Member detected as unreachable: {}", member)
case MemberRemoved(member, previousStatus) =>
log.info("Member is Removed: {} after {}",
member.address, previousStatus)
case _: MemberEvent => // ignore
}
}

View file

@ -61,8 +61,7 @@ class StatsSampleClient(servicePath: String) extends Actor {
var nodes = Set.empty[Address] var nodes = Set.empty[Address]
override def preStart(): Unit = { override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent]) cluster.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent])
cluster.subscribe(self, classOf[UnreachableMember])
} }
override def postStop(): Unit = { override def postStop(): Unit = {
cluster.unsubscribe(self) cluster.unsubscribe(self)
@ -83,9 +82,10 @@ class StatsSampleClient(servicePath: String) extends Actor {
nodes = state.members.collect { nodes = state.members.collect {
case m if m.hasRole("compute") && m.status == MemberStatus.Up => m.address case m if m.hasRole("compute") && m.status == MemberStatus.Up => m.address
} }
case MemberUp(m) if m.hasRole("compute") => nodes += m.address case MemberUp(m) if m.hasRole("compute") => nodes += m.address
case other: MemberEvent => nodes -= other.member.address case other: MemberEvent => nodes -= other.member.address
case UnreachableMember(m) => nodes -= m.address case UnreachableMember(m) => nodes -= m.address
case ReachableMember(m) if m.hasRole("compute") => nodes += m.address
} }
} }