Add more description about ClusterEvent.InitialStateAsSnapshot (Closes #25172)

This commit is contained in:
Richard S. Imaoka 2018-08-25 15:02:56 +09:00 committed by Konrad Malawski
parent ffc08d0b7a
commit 5dd487311d
3 changed files with 44 additions and 3 deletions

View file

@ -422,10 +422,28 @@ A snapshot of the full state, `akka.cluster.ClusterEvent.CurrentClusterState`, i
as the first message, followed by events for incremental updates.
Note that you may receive an empty `CurrentClusterState`, containing no members,
followed by `MemberUp` events from other nodes which already joined,
if you start the subscription before the initial join procedure has completed.
This may for example happen when you start the subscription immediately after `cluster.join()` like below.
This is expected behavior. When the node has been accepted in the cluster you will
receive `MemberUp` for that node, and other nodes.
Scala
: @@snip [SimpleClusterListener2.scala]($code$/scala/docs/cluster/SimpleClusterListener2.scala) { #join #subscribe }
Java
: @@snip [SimpleClusterListener2.java]($code$/java/jdocs/cluster/SimpleClusterListener2.java) { #join #subscribe }
To avoid receiving an empty `CurrentClusterState` at the beginning, you can use it like shown in the following example,
to defer subscription until the `MemberUp` event for the own node is received:
Scala
: @@snip [SimpleClusterListener2.scala]($code$/scala/docs/cluster/SimpleClusterListener2.scala) { #join #register-on-memberup }
Java
: @@snip [SimpleClusterListener2.java]($code$/java/jdocs/cluster/SimpleClusterListener2.java) { #join #register-on-memberup }
If you find it inconvenient to handle the `CurrentClusterState` you can use
@scala[`ClusterEvent.InitialStateAsEvents`] @java[`ClusterEvent.initialStateAsEvents()`] as parameter to `subscribe`.
That means that instead of receiving `CurrentClusterState` as the first message you will receive

View file

@ -16,14 +16,26 @@ import akka.event.LoggingAdapter;
public class SimpleClusterListener2 extends AbstractActor {
LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
//#join
Cluster cluster = Cluster.get(getContext().getSystem());
//#join
//subscribe to cluster changes
@Override
public void preStart() {
//#join
cluster.join(cluster.selfAddress());
//#join
//#subscribe
cluster.subscribe(getSelf(), MemberEvent.class, UnreachableMember.class);
//#subscribe
//#register-on-memberup
cluster.registerOnMemberUp(
() -> cluster.subscribe(getSelf(), MemberEvent.class, UnreachableMember.class)
);
//#register-on-memberup
}
//re-subscribe when restart

View file

@ -2,22 +2,33 @@
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package scala.docs.cluster
package docs.cluster
import akka.actor.{ Actor, ActorLogging }
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.actor.ActorLogging
import akka.actor.Actor
class SimpleClusterListener2 extends Actor with ActorLogging {
//#join
val cluster = Cluster(context.system)
//#join
// subscribe to cluster changes, re-subscribe when restart
override def preStart(): Unit = {
//#join
cluster.join(cluster.selfAddress)
//#join
//#subscribe
cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember])
//#subscribe
//#register-on-memberup
cluster.registerOnMemberUp {
cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember])
}
//#register-on-memberup
}
override def postStop(): Unit = cluster.unsubscribe(self)