diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 6635c7251f..dc56109ae3 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -4,32 +4,24 @@ package akka.cluster -import language.implicitConversions -import akka.actor._ -import akka.actor.Status._ -import akka.ConfigurationException -import akka.dispatch.MonitorableThreadFactory -import akka.event.Logging -import akka.pattern._ -import akka.remote._ -import akka.routing._ -import akka.util._ -import scala.concurrent.duration._ -import scala.concurrent.forkjoin.ThreadLocalRandom -import scala.annotation.tailrec -import scala.collection.immutable import java.io.Closeable -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.atomic.AtomicReference -import scala.concurrent.{ ExecutionContext, Await } -import com.typesafe.config.ConfigFactory -import akka.remote.DefaultFailureDetectorRegistry -import akka.remote.FailureDetector -import com.typesafe.config.Config -import akka.event.LoggingAdapter import java.util.concurrent.ThreadFactory -import scala.util.control.NonFatal +import java.util.concurrent.atomic.AtomicBoolean + +import akka.ConfigurationException +import akka.actor._ +import akka.dispatch.MonitorableThreadFactory +import akka.event.{ Logging, LoggingAdapter } +import akka.pattern._ +import akka.remote.{ DefaultFailureDetectorRegistry, FailureDetector, _ } +import com.typesafe.config.{ Config, ConfigFactory } + import scala.annotation.varargs +import scala.collection.immutable +import scala.concurrent.duration._ +import scala.concurrent.{ Await, ExecutionContext } +import scala.language.implicitConversions +import scala.util.control.NonFatal /** * Cluster Extension Id and factory for creating Cluster extension. @@ -65,8 +57,8 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { import ClusterEvent._ val settings = new ClusterSettings(system.settings.config, system.name) - import settings._ import InfoLogger._ + import settings._ /** * The address including a `uid` of this cluster member. @@ -119,7 +111,6 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { */ private[cluster] val scheduler: Scheduler = { if (system.scheduler.maxFrequency < 1.second / SchedulerTickDuration) { - import scala.collection.JavaConverters._ logInfo("Using a dedicated scheduler for cluster. Default scheduler can be used if configured " + "with 'akka.scheduler.tick-duration' [{} ms] <= 'akka.cluster.scheduler.tick-duration' [{} ms].", (1000 / system.scheduler.maxFrequency).toInt, SchedulerTickDuration.toMillis) @@ -314,7 +305,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { * a certain size. */ def registerOnMemberUp[T](code: ⇒ T): Unit = - registerOnMemberUp(new Runnable { def run = code }) + registerOnMemberUp(new Runnable { def run() = code }) /** * Java API: The supplied callback will be run, once, when current cluster member is `Up`. @@ -322,8 +313,27 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { * to defer some action, such as starting actors, until the cluster has reached * a certain size. */ - def registerOnMemberUp(callback: Runnable): Unit = clusterDaemons ! InternalClusterAction.AddOnMemberUpListener(callback) + def registerOnMemberUp(callback: Runnable): Unit = + clusterDaemons ! InternalClusterAction.AddOnMemberUpListener(callback) + /** + * The supplied thunk will be run, once, when current cluster member is `Removed`. + * and if the cluster have been shutdown,that thunk will run on the caller thread immediately. + * Typically used together `cluster.leave(cluster.selfAddress)` and then `system.shutdown()`. + */ + def registerOnMemberRemoved[T](code: ⇒ T): Unit = + registerOnMemberRemoved(new Runnable { override def run(): Unit = code }) + /** + * Java API: The supplied thunk will be run, once, when current cluster member is `Removed`. + * and if the cluster have been shutdown,that thunk will run on the caller thread immediately. + * Typically used together `cluster.leave(cluster.selfAddress)` and then `system.shutdown()`. + */ + def registerOnMemberRemoved(callback: Runnable): Unit = { + if (_isTerminated.get()) + callback.run() + else + clusterDaemons ! InternalClusterAction.AddOnMemberRemovedListener(callback) + } // ======================================================== // ===================== INTERNAL API ===================== // ======================================================== diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index f5d3157f95..7d9a7ec924 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -129,10 +129,12 @@ private[cluster] object InternalClusterAction { /** * Comand to [[akka.cluster.ClusterDaemon]] to create a - * [[akka.cluster.OnMemberUpListener]]. + * [[akka.cluster.OnMemberStatusChangedListener]]. */ final case class AddOnMemberUpListener(callback: Runnable) extends NoSerializationVerificationNeeded + final case class AddOnMemberRemovedListener(callback: Runnable) extends NoSerializationVerificationNeeded + sealed trait SubscriptionMessage final case class Subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, to: Set[Class[_]]) extends SubscriptionMessage final case class Unsubscribe(subscriber: ActorRef, to: Option[Class[_]]) extends SubscriptionMessage @@ -165,7 +167,9 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac def receive = { case msg: GetClusterCoreRef.type ⇒ coreSupervisor forward msg case AddOnMemberUpListener(code) ⇒ - context.actorOf(Props(classOf[OnMemberUpListener], code).withDeploy(Deploy.local)) + context.actorOf(Props(classOf[OnMemberStatusChangedListener], code, Up).withDeploy(Deploy.local)) + case AddOnMemberRemovedListener(code) ⇒ + context.actorOf(Props(classOf[OnMemberStatusChangedListener], code, Removed).withDeploy(Deploy.local)) case PublisherCreated(publisher) ⇒ if (settings.MetricsEnabled) { // metrics must be started after core/publisher to be able @@ -1112,36 +1116,45 @@ private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq /** * INTERNAL API * - * The supplied callback will be run, once, when current cluster member is `Up`. + * The supplied callback will be run, once, when current cluster member come up with the same status. */ -private[cluster] class OnMemberUpListener(callback: Runnable) extends Actor with ActorLogging { +private[cluster] class OnMemberStatusChangedListener(callback: Runnable, status: MemberStatus) extends Actor with ActorLogging { import ClusterEvent._ - val cluster = Cluster(context.system) - // subscribe to MemberUp, re-subscribe when restart + private val cluster = Cluster(context.system) + private val to = status match { + case Up ⇒ + classOf[MemberUp] + case Removed ⇒ + classOf[MemberRemoved] + } + override def preStart(): Unit = - cluster.subscribe(self, classOf[MemberUp]) + cluster.subscribe(self, to) override def postStop(): Unit = cluster.unsubscribe(self) def receive = { case state: CurrentClusterState ⇒ - if (state.members.exists(isSelfUp(_))) + if (state.members.exists(isTriggered)) done() - case MemberUp(m) ⇒ - if (isSelfUp(m)) + case MemberUp(member) ⇒ + if (isTriggered(member)) + done() + case MemberRemoved(member, _) ⇒ + if (isTriggered(member)) done() } - def done(): Unit = { + private def done(): Unit = { try callback.run() catch { - case NonFatal(e) ⇒ log.error(e, "OnMemberUp callback failed with [{}]", e.getMessage) + case NonFatal(e) ⇒ log.error(e, "[{}] callback failed with [{}]", s"On${to.getSimpleName}", e.getMessage) } finally { context stop self } } - def isSelfUp(m: Member): Boolean = - m.uniqueAddress == cluster.selfUniqueAddress && m.status == MemberStatus.Up + private def isTriggered(m: Member): Boolean = + m.uniqueAddress == cluster.selfUniqueAddress && m.status == status } diff --git a/akka-docs/rst/java/cluster-usage.rst b/akka-docs/rst/java/cluster-usage.rst index 148a03bf0b..6fb9d5db85 100644 --- a/akka-docs/rst/java/cluster-usage.rst +++ b/akka-docs/rst/java/cluster-usage.rst @@ -92,7 +92,7 @@ Joining programatically can be performed with ``Cluster.get(system).join``. You can join to any node in the cluster. It does not have to be configured as a seed node. Note that you can only join to an existing cluster member, which means that for bootstrapping some -node must join itself. +node must join itself,and then the following nodes could join them to make up a cluster. You may also use ``Cluster.get(system).joinSeedNodes`` to join programmatically, which is attractive when dynamically discovering other nodes at startup by using some external tool or API. @@ -108,8 +108,9 @@ that the last join request is retried. Retries can be disabled by setting the pr An actor system can only join a cluster once. Additional attempts will be ignored. When it has successfully joined it must be restarted to be able to join another cluster or to join the same cluster again. It can use the same host name and port -after the restart, but it must have been removed from the cluster before the join -request is accepted. +after the restart, when it come up as new incarnation of existing member in the cluster, +trying to join in ,then the existing one will be removed from the cluster and then it will be allowed to join. + .. _automatic-vs-manual-downing-java: @@ -276,13 +277,25 @@ before the leader changes member status of 'Joining' members to 'Up'. .. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/resources/factorial.conf#role-min-nr-of-members You can start the actors in a ``registerOnMemberUp`` callback, which will -be invoked when the current member status is changed tp 'Up', i.e. the cluster +be invoked when the current member status is changed to 'Up', i.e. the cluster has at least the defined number of members. .. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontendMain.java#registerOnUp This callback can be used for other things than starting actors. +You can do some clean up in a ``registerOnMemberRemoved`` callback, which will +be invoked when the current member status is changed to 'Removed' or the cluster have been shutdown,i.e. +terminate the actor system. + +.. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontendMain.java#registerOnRemoved + +.. note:: +Register a OnMemberRemoved callback on a cluster that have been shutdown ,the callback will be invoked immediately on + the caller thread,otherwise it will be invoked later when the current member status changed to 'Removed'.You may + want to install some cleanup handling after the cluster was started up,but the cluster might already be shutting + down when you installing, and depending on the race is not healthy. + Cluster Singleton ^^^^^^^^^^^^^^^^^ diff --git a/akka-docs/rst/scala/cluster-usage.rst b/akka-docs/rst/scala/cluster-usage.rst index 3e44ea31d6..b0312c0d38 100644 --- a/akka-docs/rst/scala/cluster-usage.rst +++ b/akka-docs/rst/scala/cluster-usage.rst @@ -86,7 +86,7 @@ Joining programatically can be performed with ``Cluster(system).join``. You can join to any node in the cluster. It does not have to be configured as a seed node. Note that you can only join to an existing cluster member, which means that for bootstrapping some -node must join itself. +node must join itself,and then the following nodes could join them to make up a cluster. You may also use ``Cluster(system).joinSeedNodes`` to join programmatically, which is attractive when dynamically discovering other nodes at startup by using some external tool or API. @@ -102,9 +102,9 @@ that the last join request is retried. Retries can be disabled by setting the pr An actor system can only join a cluster once. Additional attempts will be ignored. When it has successfully joined it must be restarted to be able to join another -cluster or to join the same cluster again. It can use the same host name and port -after the restart, but it must have been removed from the cluster before the join -request is accepted. +cluster or to join the same cluster again.It can use the same host name and port +after the restart, when it come up as new incarnation of existing member in the cluster, +trying to join in ,then the existing one will be removed from the cluster and then it will be allowed to join. .. _automatic-vs-manual-downing-scala: @@ -271,13 +271,25 @@ before the leader changes member status of 'Joining' members to 'Up'. .. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/resources/factorial.conf#role-min-nr-of-members You can start the actors in a ``registerOnMemberUp`` callback, which will -be invoked when the current member status is changed tp 'Up', i.e. the cluster +be invoked when the current member status is changed to 'Up',i.e. the cluster has at least the defined number of members. .. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/factorial/FactorialFrontend.scala#registerOnUp This callback can be used for other things than starting actors. +You can do some clean up in a ``registerOnMemberRemoved`` callback, which will +be invoked when the current member status is changed to 'Removed' or the cluster have been shutdown,i.e. +terminate the actor system. + +.. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/factorial/FactorialFrontend.scala#registerOnRemoved + +.. note:: + Register a OnMemberRemoved callback on a cluster that have been shutdown ,the callback will be invoked immediately on + the caller thread,otherwise it will be invoked later when the current member status changed to 'Removed'.You may + want to install some cleanup handling after the cluster was started up,but the cluster might already be shutting + down when you installing, and depending on the race is not healthy. + Cluster Singleton ^^^^^^^^^^^^^^^^^ diff --git a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontendMain.java b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontendMain.java index 492dbc5c95..69bc399ffc 100644 --- a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontendMain.java +++ b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontendMain.java @@ -17,7 +17,7 @@ public class FactorialFrontendMain { final ActorSystem system = ActorSystem.create("ClusterSystem", config); system.log().info( - "Factorials will start when 2 backend members in the cluster."); + "Factorials will start when 2 backend members in the cluster."); //#registerOnUp Cluster.get(system).registerOnMemberUp(new Runnable() { @Override @@ -27,6 +27,16 @@ public class FactorialFrontendMain { } }); //#registerOnUp + + //#registerOnRemoved + Cluster.get(system).registerOnMemberRemoved(new Runnable() { + @Override + public void run() { + system.terminate(); + } + }); + //#registerOnRemoved + } } diff --git a/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/factorial/FactorialFrontend.scala b/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/factorial/FactorialFrontend.scala index b9606d1456..e4671dae77 100644 --- a/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/factorial/FactorialFrontend.scala +++ b/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/factorial/FactorialFrontend.scala @@ -57,5 +57,12 @@ object FactorialFrontend { name = "factorialFrontend") } //#registerOnUp + + //#registerOnRemoved + Cluster(system).registerOnMemberRemoved{ + system.terminate() + } + //#registerOnRemoved + } } \ No newline at end of file