Make cluster fault handling more robust, see #3030

* ClusterCoreDaemon and ClusterDomainEventPublisher can't be restarted
  because the state would be obsolete.
* Add an extra supervisor level for ClusterCoreDaemon and
  ClusterDomainEventPublisher, which shuts down the member
  when any of these children fails.
* Publish the final removed state on postStop in
  ClusterDomainEventPublisher. This also simplifies the removal
  process.
Patrik Nordwall 2013-02-11 10:40:01 +01:00
parent b002bda23f
commit cab78e5174
6 changed files with 86 additions and 32 deletions
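The first two bullets of the commit message boil down to the supervision pattern sketched below: an intermediate supervisor whose children are never restarted (their state would be stale), and which reacts to any child failure by stopping the child, stopping itself, and shutting the node down from postStop. This is a simplified, hypothetical sketch, not the actual ClusterCoreSupervisor from the diff; StatefulWorker, StopAndShutdownSupervisor and shutdownNode are illustrative names.

import akka.actor.{ Actor, ActorLogging, OneForOneStrategy, PoisonPill, Props }
import akka.actor.SupervisorStrategy.Stop
import scala.util.control.NonFatal

// A child whose in-memory state would be stale after a restart,
// so it must only ever be stopped, never restarted.
class StatefulWorker extends Actor {
  def receive = { case _ ⇒ () /* stateful work lives here */ }
}

// Intermediate supervisor: any child failure stops the child, poisons the
// supervisor itself and triggers node-level cleanup from postStop.
class StopAndShutdownSupervisor(shutdownNode: () ⇒ Unit) extends Actor with ActorLogging {
  val worker = context.actorOf(Props[StatefulWorker], "worker")

  override val supervisorStrategy = OneForOneStrategy() {
    case NonFatal(e) ⇒
      log.error(e, "worker crashed, shutting down this node")
      self ! PoisonPill // stop the supervisor too, after its mailbox is drained
      Stop              // never Restart: the worker's state would be obsolete
  }

  override def postStop(): Unit = shutdownNode()

  def receive = { case msg ⇒ worker forward msg }
}

Using PoisonPill rather than context.stop(self) lets any messages already queued ahead of it be processed before the supervisor stops and postStop runs.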


@@ -9,8 +9,10 @@ import scala.collection.immutable
 import scala.concurrent.duration._
 import scala.concurrent.forkjoin.ThreadLocalRandom
 import scala.util.control.NonFatal
-import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, ReceiveTimeout, RootActorPath, Scheduler }
+import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, PoisonPill, ReceiveTimeout, RootActorPath, Scheduler }
+import akka.actor.OneForOneStrategy
 import akka.actor.Status.Failure
+import akka.actor.SupervisorStrategy.Stop
 import akka.event.EventStream
 import akka.pattern.ask
 import akka.util.Timeout
@@ -103,6 +105,8 @@ private[cluster] object InternalClusterAction {
   case object GetClusterCoreRef
 
+  case class PublisherCreated(publisher: ActorRef)
+
   /**
    * Command to [[akka.cluster.ClusterDaemon]] to create a
    * [[akka.cluster.OnMemberUpListener]].
@@ -122,8 +126,6 @@ private[cluster] object InternalClusterAction {
   case class PublishChanges(newGossip: Gossip) extends PublishMessage
   case class PublishEvent(event: ClusterDomainEvent) extends PublishMessage
   case object PublishStart extends PublishMessage
-  case object PublishDone extends PublishMessage
-  case object PublishDoneFinished extends PublishMessage
 }
 
 /**
@@ -151,28 +153,61 @@ private[cluster] object ClusterLeaderAction {
  * Supervisor managing the different Cluster daemons.
  */
 private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Actor with ActorLogging {
+  import InternalClusterAction._
+
   // Important - don't use Cluster(context.system) here because that would
   // cause deadlock. The Cluster extension is currently being created and is waiting
   // for response from GetClusterCoreRef in its constructor.
-  val publisher = context.actorOf(Props[ClusterDomainEventPublisher].
-    withDispatcher(context.props.dispatcher), name = "publisher")
-  val core = context.actorOf(Props(new ClusterCoreDaemon(publisher)).
+  val coreSupervisor = context.actorOf(Props[ClusterCoreSupervisor].
     withDispatcher(context.props.dispatcher), name = "core")
   context.actorOf(Props[ClusterHeartbeatReceiver].
     withDispatcher(context.props.dispatcher), name = "heartbeatReceiver")
-  if (settings.MetricsEnabled) context.actorOf(Props(new ClusterMetricsCollector(publisher)).
-    withDispatcher(context.props.dispatcher), name = "metrics")
 
   def receive = {
-    case InternalClusterAction.GetClusterCoreRef ⇒ sender ! core
-    case InternalClusterAction.AddOnMemberUpListener(code) ⇒
+    case msg @ GetClusterCoreRef ⇒ coreSupervisor forward msg
+    case AddOnMemberUpListener(code) ⇒
       context.actorOf(Props(new OnMemberUpListener(code)))
+    case PublisherCreated(publisher) ⇒
+      if (settings.MetricsEnabled) {
+        // metrics must be started after core/publisher to be able
+        // to inject the publisher ref to the ClusterMetricsCollector
+        context.actorOf(Props(new ClusterMetricsCollector(publisher)).
+          withDispatcher(context.props.dispatcher), name = "metrics")
+      }
   }
 }
 
+/**
+ * INTERNAL API.
+ *
+ * ClusterCoreDaemon and ClusterDomainEventPublisher can't be restarted because the state
+ * would be obsolete. Shut down the member if any of those actors crashed.
+ */
+private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLogging {
+  import InternalClusterAction._
+
+  val publisher = context.actorOf(Props[ClusterDomainEventPublisher].
+    withDispatcher(context.props.dispatcher), name = "publisher")
+  val coreDaemon = context.watch(context.actorOf(Props(new ClusterCoreDaemon(publisher)).
+    withDispatcher(context.props.dispatcher), name = "daemon"))
+
+  context.parent ! PublisherCreated(publisher)
+
+  override val supervisorStrategy =
+    OneForOneStrategy() {
+      case NonFatal(e) ⇒
+        log.error(e, "Cluster node [{}] crashed, [{}] - shutting down...",
+          Cluster(context.system).selfAddress, e.getMessage)
+        self ! PoisonPill
+        Stop
+    }
+
+  override def postStop(): Unit = Cluster(context.system).shutdown()
+
+  def receive = {
+    case InternalClusterAction.GetClusterCoreRef ⇒ sender ! coreDaemon
+  }
+}
+
 /**
  * INTERNAL API.
  */
@@ -196,7 +231,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging {
    * Looks up and returns the remote cluster command connection for the specific address.
    */
   private def clusterCore(address: Address): ActorRef =
-    context.actorFor(RootActorPath(address) / "system" / "cluster" / "core")
+    context.actorFor(RootActorPath(address) / "system" / "cluster" / "core" / "daemon")
 
   val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender].
     withDispatcher(UseDispatcher), name = "heartbeatSender")
@@ -381,14 +416,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging {
    */
   def removing(address: Address): Unit = {
     log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress)
-    // just cleaning up the gossip state
-    latestGossip = Gossip.empty
-    publish(latestGossip)
-    context.become(removed)
-    // make sure the final (removed) state is published
-    // before shutting down
-    implicit val timeout = Timeout(5 seconds)
-    publisher ? PublishDone onComplete { case _ ⇒ cluster.shutdown() }
+    cluster.shutdown()
   }
 
   /**


@@ -341,6 +341,15 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with ActorLogging {
   var latestConvergedGossip: Gossip = Gossip.empty
   var bufferedEvents: immutable.IndexedSeq[ClusterDomainEvent] = Vector.empty
 
+  override def preRestart(reason: Throwable, message: Option[Any]) {
+    // don't postStop when restarted, no children to stop
+  }
+
+  override def postStop(): Unit = {
+    // publish the final removed state before shutting down
+    publishChanges(Gossip.empty)
+  }
+
   def receive = {
     case PublishChanges(newGossip) ⇒ publishChanges(newGossip)
     case currentStats: CurrentInternalStats ⇒ publishInternalStats(currentStats)
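The new postStop hook above is what implements the third bullet of the commit message: whenever the publisher is stopped (which happens as part of node shutdown), it publishes one last set of changes computed against an empty Gossip, so subscribers still observe the final Removed state without the old PublishDone/PublishDoneFinished handshake. The empty preRestart override matters because the default preRestart implementation calls postStop, and a mere restart should not emit the terminal state. Below is a minimal sketch of the same "terminal event from postStop" idea, with hypothetical names (Stopped, TerminalStatePublisher), not the Akka publisher itself.

import akka.actor.Actor
import akka.event.EventStream

case object Stopped // hypothetical terminal event

class TerminalStatePublisher extends Actor {
  def eventStream: EventStream = context.system.eventStream

  // normal publishing path while the actor is alive
  def receive = { case msg: AnyRef ⇒ eventStream.publish(msg) }

  // the default preRestart would call postStop; override it so a restart
  // does not emit the terminal event
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = ()

  // runs on every stop, so subscribers always see the terminal event
  // without an explicit ask/ack round trip
  override def postStop(): Unit = eventStream.publish(Stopped)
}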
@@ -349,7 +358,6 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with ActorLogging {
     case Unsubscribe(subscriber, to) ⇒ unsubscribe(subscriber, to)
     case PublishEvent(event) ⇒ publish(event)
     case PublishStart ⇒ publishStart()
-    case PublishDone ⇒ publishDone(sender)
   }
 
   def eventStream: EventStream = context.system.eventStream
@@ -435,11 +443,6 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with ActorLogging {
     publishCurrentClusterState(None)
   }
 
-  def publishDone(receiver: ActorRef): Unit = {
-    clearState()
-    receiver ! PublishDoneFinished
-  }
-
   def clearState(): Unit = {
     latestGossip = Gossip.empty
     latestConvergedGossip = Gossip.empty


@@ -59,9 +59,6 @@ abstract class LeaderLeavingSpec
       // verify that the LEADER is shut down
       awaitCond(cluster.isTerminated)
 
-      // verify that the LEADER is REMOVED
-      awaitCond(clusterView.status == Removed)
-
     } else {
 
       val leavingLatch = TestLatch()


@@ -51,7 +51,24 @@ abstract class MembershipChangeListenerExitingSpec
       }
 
       runOn(second) {
+        val exitingLatch = TestLatch()
+        val removedLatch = TestLatch()
+        val secondAddress = address(second)
+        cluster.subscribe(system.actorOf(Props(new Actor {
+          def receive = {
+            case state: CurrentClusterState ⇒
+              if (state.members.exists(m ⇒ m.address == secondAddress && m.status == Exiting))
+                exitingLatch.countDown()
+            case MemberExited(m) if m.address == secondAddress ⇒
+              exitingLatch.countDown()
+            case MemberRemoved(m) if m.address == secondAddress ⇒
+              removedLatch.countDown()
+            case _ ⇒ // ignore
+          }
+        })), classOf[MemberEvent])
         enterBarrier("registered-listener")
+        exitingLatch.await
+        removedLatch.await
       }
 
       runOn(third) {


@@ -50,9 +50,8 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec
       }
 
       runOn(second) {
-        // verify that the second node is shut down and has status REMOVED
+        // verify that the second node is shut down
         awaitCond(cluster.isTerminated, reaperWaitingTime)
-        awaitCond(clusterView.status == MemberStatus.Removed, reaperWaitingTime)
       }
 
       enterBarrier("finished")


@@ -91,5 +91,15 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
       expectMsgClass(classOf[ClusterEvent.CurrentClusterState])
     }
 
+    // this must be the last test step, since the cluster is shutdown
+    "publish MemberRemoved when shutdown" in {
+      cluster.subscribe(testActor, classOf[ClusterEvent.MemberRemoved])
+      // first, is in response to the subscription
+      expectMsgClass(classOf[ClusterEvent.CurrentClusterState])
+
+      cluster.shutdown()
+      expectMsgType[ClusterEvent.MemberRemoved].member.address must be(selfAddress)
+    }
+
   }
 }
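The new ClusterSpec step also documents what application code can now rely on: the final Removed state is published even while the node is shutting down. A minimal usage sketch under that assumption follows; CleanupOnRemoved is an illustrative name, the actor system must be configured with the cluster extension, and the single-argument MemberRemoved(member) signature matches the one used in the specs above.

import akka.actor.{ Actor, ActorLogging }
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{ CurrentClusterState, MemberRemoved }

// Reacts to this node's own removal from the cluster, e.g. to flush buffers
// or release resources before the ActorSystem goes away.
class CleanupOnRemoved extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  override def preStart(): Unit = cluster.subscribe(self, classOf[MemberRemoved])
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case _: CurrentClusterState ⇒ () // initial snapshot delivered on subscribe
    case MemberRemoved(m) if m.address == cluster.selfAddress ⇒
      log.info("Node [{}] was removed from the cluster, cleaning up", m.address)
      context.stop(self)
  }
}

Started with system.actorOf(Props[CleanupOnRemoved]) on each node, it runs its cleanup when the member is removed, including the shutdown path exercised by the new test.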