+clu #16736 add registerOnMemberRemoved to get notified when current member removed from the cluster

This commit is contained in:
hepin 2015-04-17 17:28:37 +08:00
parent 2d70599d22
commit ccca503b4d
6 changed files with 116 additions and 51 deletions

View file

@ -4,32 +4,24 @@
package akka.cluster
import language.implicitConversions
import akka.actor._
import akka.actor.Status._
import akka.ConfigurationException
import akka.dispatch.MonitorableThreadFactory
import akka.event.Logging
import akka.pattern._
import akka.remote._
import akka.routing._
import akka.util._
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.annotation.tailrec
import scala.collection.immutable
import java.io.Closeable
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ ExecutionContext, Await }
import com.typesafe.config.ConfigFactory
import akka.remote.DefaultFailureDetectorRegistry
import akka.remote.FailureDetector
import com.typesafe.config.Config
import akka.event.LoggingAdapter
import java.util.concurrent.ThreadFactory
import scala.util.control.NonFatal
import java.util.concurrent.atomic.AtomicBoolean
import akka.ConfigurationException
import akka.actor._
import akka.dispatch.MonitorableThreadFactory
import akka.event.{ Logging, LoggingAdapter }
import akka.pattern._
import akka.remote.{ DefaultFailureDetectorRegistry, FailureDetector, _ }
import com.typesafe.config.{ Config, ConfigFactory }
import scala.annotation.varargs
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext }
import scala.language.implicitConversions
import scala.util.control.NonFatal
/**
* Cluster Extension Id and factory for creating Cluster extension.
@ -65,8 +57,8 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
import ClusterEvent._
val settings = new ClusterSettings(system.settings.config, system.name)
import settings._
import InfoLogger._
import settings._
/**
* The address including a `uid` of this cluster member.
@ -119,7 +111,6 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
*/
private[cluster] val scheduler: Scheduler = {
if (system.scheduler.maxFrequency < 1.second / SchedulerTickDuration) {
import scala.collection.JavaConverters._
logInfo("Using a dedicated scheduler for cluster. Default scheduler can be used if configured " +
"with 'akka.scheduler.tick-duration' [{} ms] <= 'akka.cluster.scheduler.tick-duration' [{} ms].",
(1000 / system.scheduler.maxFrequency).toInt, SchedulerTickDuration.toMillis)
@ -314,7 +305,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
* a certain size.
*/
def registerOnMemberUp[T](code: T): Unit =
registerOnMemberUp(new Runnable { def run = code })
registerOnMemberUp(new Runnable { def run() = code })
/**
* Java API: The supplied callback will be run, once, when current cluster member is `Up`.
@ -322,8 +313,27 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
* to defer some action, such as starting actors, until the cluster has reached
* a certain size.
*/
def registerOnMemberUp(callback: Runnable): Unit = clusterDaemons ! InternalClusterAction.AddOnMemberUpListener(callback)
def registerOnMemberUp(callback: Runnable): Unit =
clusterDaemons ! InternalClusterAction.AddOnMemberUpListener(callback)
/**
* The supplied thunk will be run, once, when current cluster member is `Removed`.
* and if the cluster have been shutdown,that thunk will run on the caller thread immediately.
* Typically used together `cluster.leave(cluster.selfAddress)` and then `system.shutdown()`.
*/
def registerOnMemberRemoved[T](code: T): Unit =
registerOnMemberRemoved(new Runnable { override def run(): Unit = code })
/**
* Java API: The supplied thunk will be run, once, when current cluster member is `Removed`.
* and if the cluster have been shutdown,that thunk will run on the caller thread immediately.
* Typically used together `cluster.leave(cluster.selfAddress)` and then `system.shutdown()`.
*/
def registerOnMemberRemoved(callback: Runnable): Unit = {
if (_isTerminated.get())
callback.run()
else
clusterDaemons ! InternalClusterAction.AddOnMemberRemovedListener(callback)
}
// ========================================================
// ===================== INTERNAL API =====================
// ========================================================

View file

@ -129,10 +129,12 @@ private[cluster] object InternalClusterAction {
/**
* Comand to [[akka.cluster.ClusterDaemon]] to create a
* [[akka.cluster.OnMemberUpListener]].
* [[akka.cluster.OnMemberStatusChangedListener]].
*/
final case class AddOnMemberUpListener(callback: Runnable) extends NoSerializationVerificationNeeded
final case class AddOnMemberRemovedListener(callback: Runnable) extends NoSerializationVerificationNeeded
sealed trait SubscriptionMessage
final case class Subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, to: Set[Class[_]]) extends SubscriptionMessage
final case class Unsubscribe(subscriber: ActorRef, to: Option[Class[_]]) extends SubscriptionMessage
@ -165,7 +167,9 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
def receive = {
case msg: GetClusterCoreRef.type coreSupervisor forward msg
case AddOnMemberUpListener(code)
context.actorOf(Props(classOf[OnMemberUpListener], code).withDeploy(Deploy.local))
context.actorOf(Props(classOf[OnMemberStatusChangedListener], code, Up).withDeploy(Deploy.local))
case AddOnMemberRemovedListener(code)
context.actorOf(Props(classOf[OnMemberStatusChangedListener], code, Removed).withDeploy(Deploy.local))
case PublisherCreated(publisher)
if (settings.MetricsEnabled) {
// metrics must be started after core/publisher to be able
@ -1112,36 +1116,45 @@ private[cluster] final class JoinSeedNodeProcess(seedNodes: immutable.IndexedSeq
/**
* INTERNAL API
*
* The supplied callback will be run, once, when current cluster member is `Up`.
* The supplied callback will be run, once, when current cluster member come up with the same status.
*/
private[cluster] class OnMemberUpListener(callback: Runnable) extends Actor with ActorLogging {
private[cluster] class OnMemberStatusChangedListener(callback: Runnable, status: MemberStatus) extends Actor with ActorLogging {
import ClusterEvent._
val cluster = Cluster(context.system)
// subscribe to MemberUp, re-subscribe when restart
private val cluster = Cluster(context.system)
private val to = status match {
case Up
classOf[MemberUp]
case Removed
classOf[MemberRemoved]
}
override def preStart(): Unit =
cluster.subscribe(self, classOf[MemberUp])
cluster.subscribe(self, to)
override def postStop(): Unit =
cluster.unsubscribe(self)
def receive = {
case state: CurrentClusterState
if (state.members.exists(isSelfUp(_)))
if (state.members.exists(isTriggered))
done()
case MemberUp(m)
if (isSelfUp(m))
case MemberUp(member)
if (isTriggered(member))
done()
case MemberRemoved(member, _)
if (isTriggered(member))
done()
}
def done(): Unit = {
private def done(): Unit = {
try callback.run() catch {
case NonFatal(e) log.error(e, "OnMemberUp callback failed with [{}]", e.getMessage)
case NonFatal(e) log.error(e, "[{}] callback failed with [{}]", s"On${to.getSimpleName}", e.getMessage)
} finally {
context stop self
}
}
def isSelfUp(m: Member): Boolean =
m.uniqueAddress == cluster.selfUniqueAddress && m.status == MemberStatus.Up
private def isTriggered(m: Member): Boolean =
m.uniqueAddress == cluster.selfUniqueAddress && m.status == status
}

View file

@ -92,7 +92,7 @@ Joining programatically can be performed with ``Cluster.get(system).join``.
You can join to any node in the cluster. It does not have to be configured as a seed node.
Note that you can only join to an existing cluster member, which means that for bootstrapping some
node must join itself.
node must join itself,and then the following nodes could join them to make up a cluster.
You may also use ``Cluster.get(system).joinSeedNodes`` to join programmatically,
which is attractive when dynamically discovering other nodes at startup by using some external tool or API.
@ -108,8 +108,9 @@ that the last join request is retried. Retries can be disabled by setting the pr
An actor system can only join a cluster once. Additional attempts will be ignored.
When it has successfully joined it must be restarted to be able to join another
cluster or to join the same cluster again. It can use the same host name and port
after the restart, but it must have been removed from the cluster before the join
request is accepted.
after the restart, when it come up as new incarnation of existing member in the cluster,
trying to join in ,then the existing one will be removed from the cluster and then it will be allowed to join.
.. _automatic-vs-manual-downing-java:
@ -276,13 +277,25 @@ before the leader changes member status of 'Joining' members to 'Up'.
.. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/resources/factorial.conf#role-min-nr-of-members
You can start the actors in a ``registerOnMemberUp`` callback, which will
be invoked when the current member status is changed tp 'Up', i.e. the cluster
be invoked when the current member status is changed to 'Up', i.e. the cluster
has at least the defined number of members.
.. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontendMain.java#registerOnUp
This callback can be used for other things than starting actors.
You can do some clean up in a ``registerOnMemberRemoved`` callback, which will
be invoked when the current member status is changed to 'Removed' or the cluster have been shutdown,i.e.
terminate the actor system.
.. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontendMain.java#registerOnRemoved
.. note::
Register a OnMemberRemoved callback on a cluster that have been shutdown ,the callback will be invoked immediately on
the caller thread,otherwise it will be invoked later when the current member status changed to 'Removed'.You may
want to install some cleanup handling after the cluster was started up,but the cluster might already be shutting
down when you installing, and depending on the race is not healthy.
Cluster Singleton
^^^^^^^^^^^^^^^^^

View file

@ -86,7 +86,7 @@ Joining programatically can be performed with ``Cluster(system).join``.
You can join to any node in the cluster. It does not have to be configured as a seed node.
Note that you can only join to an existing cluster member, which means that for bootstrapping some
node must join itself.
node must join itself,and then the following nodes could join them to make up a cluster.
You may also use ``Cluster(system).joinSeedNodes`` to join programmatically,
which is attractive when dynamically discovering other nodes at startup by using some external tool or API.
@ -102,9 +102,9 @@ that the last join request is retried. Retries can be disabled by setting the pr
An actor system can only join a cluster once. Additional attempts will be ignored.
When it has successfully joined it must be restarted to be able to join another
cluster or to join the same cluster again. It can use the same host name and port
after the restart, but it must have been removed from the cluster before the join
request is accepted.
cluster or to join the same cluster again.It can use the same host name and port
after the restart, when it come up as new incarnation of existing member in the cluster,
trying to join in ,then the existing one will be removed from the cluster and then it will be allowed to join.
.. _automatic-vs-manual-downing-scala:
@ -271,13 +271,25 @@ before the leader changes member status of 'Joining' members to 'Up'.
.. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/resources/factorial.conf#role-min-nr-of-members
You can start the actors in a ``registerOnMemberUp`` callback, which will
be invoked when the current member status is changed tp 'Up', i.e. the cluster
be invoked when the current member status is changed to 'Up',i.e. the cluster
has at least the defined number of members.
.. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/factorial/FactorialFrontend.scala#registerOnUp
This callback can be used for other things than starting actors.
You can do some clean up in a ``registerOnMemberRemoved`` callback, which will
be invoked when the current member status is changed to 'Removed' or the cluster have been shutdown,i.e.
terminate the actor system.
.. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/factorial/FactorialFrontend.scala#registerOnRemoved
.. note::
Register a OnMemberRemoved callback on a cluster that have been shutdown ,the callback will be invoked immediately on
the caller thread,otherwise it will be invoked later when the current member status changed to 'Removed'.You may
want to install some cleanup handling after the cluster was started up,but the cluster might already be shutting
down when you installing, and depending on the race is not healthy.
Cluster Singleton
^^^^^^^^^^^^^^^^^

View file

@ -17,7 +17,7 @@ public class FactorialFrontendMain {
final ActorSystem system = ActorSystem.create("ClusterSystem", config);
system.log().info(
"Factorials will start when 2 backend members in the cluster.");
"Factorials will start when 2 backend members in the cluster.");
//#registerOnUp
Cluster.get(system).registerOnMemberUp(new Runnable() {
@Override
@ -27,6 +27,16 @@ public class FactorialFrontendMain {
}
});
//#registerOnUp
//#registerOnRemoved
Cluster.get(system).registerOnMemberRemoved(new Runnable() {
@Override
public void run() {
system.terminate();
}
});
//#registerOnRemoved
}
}

View file

@ -57,5 +57,12 @@ object FactorialFrontend {
name = "factorialFrontend")
}
//#registerOnUp
//#registerOnRemoved
Cluster(system).registerOnMemberRemoved{
system.terminate()
}
//#registerOnRemoved
}
}