From 5dd487311d3e2578893faf8a253ef621cdc190be Mon Sep 17 00:00:00 2001 From: "Richard S. Imaoka" Date: Sat, 25 Aug 2018 15:02:56 +0900 Subject: [PATCH] Add more description about ClusterEvent.InitialStateAsSnapshot (Closes #25172) --- akka-docs/src/main/paradox/cluster-usage.md | 18 ++++++++++++++++++ .../jdocs/cluster/SimpleClusterListener2.java | 12 ++++++++++++ .../docs/cluster/SimpleClusterListener2.scala | 17 ++++++++++++++--- 3 files changed, 44 insertions(+), 3 deletions(-) diff --git a/akka-docs/src/main/paradox/cluster-usage.md b/akka-docs/src/main/paradox/cluster-usage.md index 5187e6dab4..ecf0917879 100644 --- a/akka-docs/src/main/paradox/cluster-usage.md +++ b/akka-docs/src/main/paradox/cluster-usage.md @@ -422,10 +422,28 @@ A snapshot of the full state, `akka.cluster.ClusterEvent.CurrentClusterState`, i as the first message, followed by events for incremental updates. Note that you may receive an empty `CurrentClusterState`, containing no members, +followed by `MemberUp` events from other nodes which already joined, if you start the subscription before the initial join procedure has completed. +This may for example happen when you start the subscription immediately after `cluster.join()` like below. This is expected behavior. When the node has been accepted in the cluster you will receive `MemberUp` for that node, and other nodes. +Scala +: @@snip [SimpleClusterListener2.scala]($code$/scala/docs/cluster/SimpleClusterListener2.scala) { #join #subscribe } + +Java +: @@snip [SimpleClusterListener2.java]($code$/java/jdocs/cluster/SimpleClusterListener2.java) { #join #subscribe } + +To avoid receiving an empty `CurrentClusterState` at the beginning, you can use it like shown in the following example, +to defer subscription until the `MemberUp` event for the own node is received: + +Scala +: @@snip [SimpleClusterListener2.scala]($code$/scala/docs/cluster/SimpleClusterListener2.scala) { #join #register-on-memberup } + +Java +: @@snip [SimpleClusterListener2.java]($code$/java/jdocs/cluster/SimpleClusterListener2.java) { #join #register-on-memberup } + + If you find it inconvenient to handle the `CurrentClusterState` you can use @scala[`ClusterEvent.InitialStateAsEvents`] @java[`ClusterEvent.initialStateAsEvents()`] as parameter to `subscribe`. That means that instead of receiving `CurrentClusterState` as the first message you will receive diff --git a/akka-docs/src/test/java/jdocs/cluster/SimpleClusterListener2.java b/akka-docs/src/test/java/jdocs/cluster/SimpleClusterListener2.java index ab2e0b52a4..6831d45be4 100644 --- a/akka-docs/src/test/java/jdocs/cluster/SimpleClusterListener2.java +++ b/akka-docs/src/test/java/jdocs/cluster/SimpleClusterListener2.java @@ -16,14 +16,26 @@ import akka.event.LoggingAdapter; public class SimpleClusterListener2 extends AbstractActor { LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); + //#join Cluster cluster = Cluster.get(getContext().getSystem()); + //#join //subscribe to cluster changes @Override public void preStart() { + //#join + cluster.join(cluster.selfAddress()); + //#join + //#subscribe cluster.subscribe(getSelf(), MemberEvent.class, UnreachableMember.class); //#subscribe + + //#register-on-memberup + cluster.registerOnMemberUp( + () -> cluster.subscribe(getSelf(), MemberEvent.class, UnreachableMember.class) + ); + //#register-on-memberup } //re-subscribe when restart diff --git a/akka-docs/src/test/scala/docs/cluster/SimpleClusterListener2.scala b/akka-docs/src/test/scala/docs/cluster/SimpleClusterListener2.scala index 6d4036461c..9695a48182 100644 --- a/akka-docs/src/test/scala/docs/cluster/SimpleClusterListener2.scala +++ b/akka-docs/src/test/scala/docs/cluster/SimpleClusterListener2.scala @@ -2,22 +2,33 @@ * Copyright (C) 2018 Lightbend Inc. */ -package scala.docs.cluster +package docs.cluster +import akka.actor.{ Actor, ActorLogging } import akka.cluster.Cluster import akka.cluster.ClusterEvent._ -import akka.actor.ActorLogging -import akka.actor.Actor class SimpleClusterListener2 extends Actor with ActorLogging { + //#join val cluster = Cluster(context.system) + //#join // subscribe to cluster changes, re-subscribe when restart override def preStart(): Unit = { + //#join + cluster.join(cluster.selfAddress) + //#join + //#subscribe cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember]) //#subscribe + + //#register-on-memberup + cluster.registerOnMemberUp { + cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember]) + } + //#register-on-memberup } override def postStop(): Unit = cluster.unsubscribe(self)