diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 106de783ae..c6154716b1 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -9,8 +9,10 @@ import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.forkjoin.ThreadLocalRandom import scala.util.control.NonFatal -import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, ReceiveTimeout, RootActorPath, Scheduler } +import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, PoisonPill, ReceiveTimeout, RootActorPath, Scheduler } +import akka.actor.OneForOneStrategy import akka.actor.Status.Failure +import akka.actor.SupervisorStrategy.Stop import akka.event.EventStream import akka.pattern.ask import akka.util.Timeout @@ -103,6 +105,8 @@ private[cluster] object InternalClusterAction { case object GetClusterCoreRef + case class PublisherCreated(publisher: ActorRef) + /** * Comand to [[akka.cluster.ClusterDaemon]] to create a * [[akka.cluster.OnMemberUpListener]]. @@ -122,8 +126,6 @@ private[cluster] object InternalClusterAction { case class PublishChanges(newGossip: Gossip) extends PublishMessage case class PublishEvent(event: ClusterDomainEvent) extends PublishMessage case object PublishStart extends PublishMessage - case object PublishDone extends PublishMessage - case object PublishDoneFinished extends PublishMessage } /** @@ -151,28 +153,61 @@ private[cluster] object ClusterLeaderAction { * Supervisor managing the different Cluster daemons. */ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Actor with ActorLogging { - + import InternalClusterAction._ // Important - don't use Cluster(context.system) here because that would // cause deadlock. The Cluster extension is currently being created and is waiting // for response from GetClusterCoreRef in its constructor. - - val publisher = context.actorOf(Props[ClusterDomainEventPublisher]. - withDispatcher(context.props.dispatcher), name = "publisher") - val core = context.actorOf(Props(new ClusterCoreDaemon(publisher)). + val coreSupervisor = context.actorOf(Props[ClusterCoreSupervisor]. withDispatcher(context.props.dispatcher), name = "core") context.actorOf(Props[ClusterHeartbeatReceiver]. withDispatcher(context.props.dispatcher), name = "heartbeatReceiver") - if (settings.MetricsEnabled) context.actorOf(Props(new ClusterMetricsCollector(publisher)). - withDispatcher(context.props.dispatcher), name = "metrics") def receive = { - case InternalClusterAction.GetClusterCoreRef ⇒ sender ! core - case InternalClusterAction.AddOnMemberUpListener(code) ⇒ + case msg @ GetClusterCoreRef ⇒ coreSupervisor forward msg + case AddOnMemberUpListener(code) ⇒ context.actorOf(Props(new OnMemberUpListener(code))) + case PublisherCreated(publisher) ⇒ + if (settings.MetricsEnabled) { + // metrics must be started after core/publisher to be able + // to inject the publisher ref to the ClusterMetricsCollector + context.actorOf(Props(new ClusterMetricsCollector(publisher)). + withDispatcher(context.props.dispatcher), name = "metrics") + } } } +/** + * INTERNAL API. + * + * ClusterCoreDaemon and ClusterDomainEventPublisher can't be restarted because the state + * would be obsolete. Shutdown the member if any those actors crashed. + */ +private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLogging { + import InternalClusterAction._ + + val publisher = context.actorOf(Props[ClusterDomainEventPublisher]. + withDispatcher(context.props.dispatcher), name = "publisher") + val coreDaemon = context.watch(context.actorOf(Props(new ClusterCoreDaemon(publisher)). + withDispatcher(context.props.dispatcher), name = "daemon")) + + context.parent ! PublisherCreated(publisher) + + override val supervisorStrategy = + OneForOneStrategy() { + case NonFatal(e) ⇒ + log.error(e, "Cluster node [{}] crashed, [{}] - shutting down...", Cluster(context.system).selfAddress, e.getMessage) + self ! PoisonPill + Stop + } + + override def postStop(): Unit = Cluster(context.system).shutdown() + + def receive = { + case InternalClusterAction.GetClusterCoreRef ⇒ sender ! coreDaemon + } +} + /** * INTERNAL API. */ @@ -196,7 +231,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto * Looks up and returns the remote cluster command connection for the specific address. */ private def clusterCore(address: Address): ActorRef = - context.actorFor(RootActorPath(address) / "system" / "cluster" / "core") + context.actorFor(RootActorPath(address) / "system" / "cluster" / "core" / "daemon") val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender]. withDispatcher(UseDispatcher), name = "heartbeatSender") @@ -381,14 +416,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto */ def removing(address: Address): Unit = { log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress) - // just cleaning up the gossip state - latestGossip = Gossip.empty - publish(latestGossip) - context.become(removed) - // make sure the final (removed) state is published - // before shutting down - implicit val timeout = Timeout(5 seconds) - publisher ? PublishDone onComplete { case _ ⇒ cluster.shutdown() } + cluster.shutdown() } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 8b0e7a4ffc..393f71cc57 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -341,6 +341,15 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto var latestConvergedGossip: Gossip = Gossip.empty var bufferedEvents: immutable.IndexedSeq[ClusterDomainEvent] = Vector.empty + override def preRestart(reason: Throwable, message: Option[Any]) { + // don't postStop when restarted, no children to stop + } + + override def postStop(): Unit = { + // publish the final removed state before shutting down + publishChanges(Gossip.empty) + } + def receive = { case PublishChanges(newGossip) ⇒ publishChanges(newGossip) case currentStats: CurrentInternalStats ⇒ publishInternalStats(currentStats) @@ -349,7 +358,6 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto case Unsubscribe(subscriber, to) ⇒ unsubscribe(subscriber, to) case PublishEvent(event) ⇒ publish(event) case PublishStart ⇒ publishStart() - case PublishDone ⇒ publishDone(sender) } def eventStream: EventStream = context.system.eventStream @@ -435,11 +443,6 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto publishCurrentClusterState(None) } - def publishDone(receiver: ActorRef): Unit = { - clearState() - receiver ! PublishDoneFinished - } - def clearState(): Unit = { latestGossip = Gossip.empty latestConvergedGossip = Gossip.empty diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala index bcd8b48c61..0f6ff1b904 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderLeavingSpec.scala @@ -59,9 +59,6 @@ abstract class LeaderLeavingSpec // verify that the LEADER is shut down awaitCond(cluster.isTerminated) - // verify that the LEADER is REMOVED - awaitCond(clusterView.status == Removed) - } else { val leavingLatch = TestLatch() diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala index 6b3bac9341..b83bcd12a8 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala @@ -51,7 +51,24 @@ abstract class MembershipChangeListenerExitingSpec } runOn(second) { + val exitingLatch = TestLatch() + val removedLatch = TestLatch() + val secondAddress = address(second) + cluster.subscribe(system.actorOf(Props(new Actor { + def receive = { + case state: CurrentClusterState ⇒ + if (state.members.exists(m ⇒ m.address == secondAddress && m.status == Exiting)) + exitingLatch.countDown() + case MemberExited(m) if m.address == secondAddress ⇒ + exitingLatch.countDown() + case MemberRemoved(m) if m.address == secondAddress ⇒ + removedLatch.countDown() + case _ ⇒ // ignore + } + })), classOf[MemberEvent]) enterBarrier("registered-listener") + exitingLatch.await + removedLatch.await } runOn(third) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala index f6d132df42..e3c5f6d1c7 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala @@ -50,9 +50,8 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec } runOn(second) { - // verify that the second node is shut down and has status REMOVED + // verify that the second node is shut down awaitCond(cluster.isTerminated, reaperWaitingTime) - awaitCond(clusterView.status == MemberStatus.Removed, reaperWaitingTime) } enterBarrier("finished") diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 242775fb40..20a5023f4a 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -91,5 +91,15 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { expectMsgClass(classOf[ClusterEvent.CurrentClusterState]) } + // this must be the last test step, since the cluster is shutdown + "publish MemberRemoved when shutdown" in { + cluster.subscribe(testActor, classOf[ClusterEvent.MemberRemoved]) + // first, is in response to the subscription + expectMsgClass(classOf[ClusterEvent.CurrentClusterState]) + + cluster.shutdown() + expectMsgType[ClusterEvent.MemberRemoved].member.address must be(selfAddress) + } + } }