Merge pull request #1122 from akka/wip-3030-cluster-fault-handling-patriknw
Make cluster fault handling more robust, see #3030

commit 33bc2ed5f6
9 changed files with 114 additions and 40 deletions
@@ -9,8 +9,11 @@ import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.control.NonFatal
import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, ReceiveTimeout, RootActorPath, Scheduler }
import java.util.UUID
import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, PoisonPill, ReceiveTimeout, RootActorPath, Scheduler }
import akka.actor.OneForOneStrategy
import akka.actor.Status.Failure
import akka.actor.SupervisorStrategy.Stop
import akka.event.EventStream
import akka.pattern.ask
import akka.util.Timeout
@@ -103,6 +106,8 @@ private[cluster] object InternalClusterAction {

case object GetClusterCoreRef

case class PublisherCreated(publisher: ActorRef)

/**
* Comand to [[akka.cluster.ClusterDaemon]] to create a
* [[akka.cluster.OnMemberUpListener]].
@@ -122,8 +127,6 @@ private[cluster] object InternalClusterAction {
case class PublishChanges(newGossip: Gossip) extends PublishMessage
case class PublishEvent(event: ClusterDomainEvent) extends PublishMessage
case object PublishStart extends PublishMessage
case object PublishDone extends PublishMessage
case object PublishDoneFinished extends PublishMessage
}

/**
@@ -151,28 +154,61 @@ private[cluster] object ClusterLeaderAction {
* Supervisor managing the different Cluster daemons.
*/
private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Actor with ActorLogging {

import InternalClusterAction._

// Important - don't use Cluster(context.system) here because that would
// cause deadlock. The Cluster extension is currently being created and is waiting
// for response from GetClusterCoreRef in its constructor.

val publisher = context.actorOf(Props[ClusterDomainEventPublisher].
withDispatcher(context.props.dispatcher), name = "publisher")
val core = context.actorOf(Props(new ClusterCoreDaemon(publisher)).
val coreSupervisor = context.actorOf(Props[ClusterCoreSupervisor].
withDispatcher(context.props.dispatcher), name = "core")
context.actorOf(Props[ClusterHeartbeatReceiver].
withDispatcher(context.props.dispatcher), name = "heartbeatReceiver")
if (settings.MetricsEnabled) context.actorOf(Props(new ClusterMetricsCollector(publisher)).
withDispatcher(context.props.dispatcher), name = "metrics")

def receive = {
case InternalClusterAction.GetClusterCoreRef ⇒ sender ! core
case InternalClusterAction.AddOnMemberUpListener(code) ⇒
case msg @ GetClusterCoreRef ⇒ coreSupervisor forward msg
case AddOnMemberUpListener(code) ⇒
context.actorOf(Props(new OnMemberUpListener(code)))
case PublisherCreated(publisher) ⇒
if (settings.MetricsEnabled) {
// metrics must be started after core/publisher to be able
// to inject the publisher ref to the ClusterMetricsCollector
context.actorOf(Props(new ClusterMetricsCollector(publisher)).
withDispatcher(context.props.dispatcher), name = "metrics")
}
}

}

/**
* INTERNAL API.
*
* ClusterCoreDaemon and ClusterDomainEventPublisher can't be restarted because the state
* would be obsolete. Shutdown the member if any those actors crashed.
*/
private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLogging {
import InternalClusterAction._

val publisher = context.actorOf(Props[ClusterDomainEventPublisher].
withDispatcher(context.props.dispatcher), name = "publisher")
val coreDaemon = context.watch(context.actorOf(Props(new ClusterCoreDaemon(publisher)).
withDispatcher(context.props.dispatcher), name = "daemon"))

context.parent ! PublisherCreated(publisher)

override val supervisorStrategy =
OneForOneStrategy() {
case NonFatal(e) ⇒
log.error(e, "Cluster node [{}] crashed, [{}] - shutting down...", Cluster(context.system).selfAddress, e.getMessage)
self ! PoisonPill
Stop
}

override def postStop(): Unit = Cluster(context.system).shutdown()

def receive = {
case InternalClusterAction.GetClusterCoreRef ⇒ sender ! coreDaemon
}
}

/**
* INTERNAL API.
*/
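
The supervisor strategy above encodes the new fault-handling rule: the core daemon and the publisher hold state that cannot be recreated, so any crash escalates to shutting the whole node down. Below is a minimal stand-alone sketch of that pattern; the actor names and the plain `context.system.shutdown()` call are illustrative assumptions, not the actual akka cluster code.

import akka.actor.{ Actor, ActorLogging, OneForOneStrategy, PoisonPill, Props }
import akka.actor.SupervisorStrategy.Stop
import scala.util.control.NonFatal

// hypothetical child whose in-memory state would be obsolete after a restart
class StatefulWorker extends Actor {
  def receive = { case msg ⇒ /* stateful processing */ }
}

class FatalOnCrashSupervisor extends Actor with ActorLogging {
  val worker = context.actorOf(Props[StatefulWorker], name = "worker")

  override val supervisorStrategy =
    OneForOneStrategy() {
      case NonFatal(e) ⇒
        log.error(e, "Worker crashed - shutting down")
        self ! PoisonPill // stop this supervisor too, not only the failing child
        Stop
    }

  // once the supervisor itself stops, take the whole actor system down
  override def postStop(): Unit = context.system.shutdown()

  def receive = { case msg ⇒ worker forward msg }
}
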
@@ -184,7 +220,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
import cluster.{ selfAddress, scheduler, failureDetector }
import cluster.settings._

val vclockNode = VectorClock.Node(selfAddress.toString)
// FIXME the UUID should not be needed when Address contains uid, ticket #2788
val vclockNode = VectorClock.Node(selfAddress.toString + "-" + UUID.randomUUID())

// note that self is not initially member,
// and the Gossip is not versioned for this 'Node' yet
@@ -196,7 +233,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
* Looks up and returns the remote cluster command connection for the specific address.
*/
private def clusterCore(address: Address): ActorRef =
context.actorFor(RootActorPath(address) / "system" / "cluster" / "core")
context.actorFor(RootActorPath(address) / "system" / "cluster" / "core" / "daemon")

val heartbeatSender = context.actorOf(Props[ClusterHeartbeatSender].
withDispatcher(UseDispatcher), name = "heartbeatSender")
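
The lookup above addresses the relocated core daemon by its absolute actor path. As a small illustration (the system name, host, and port below are made up), such a path can be built and inspected like this:

import akka.actor.{ Address, RootActorPath }

object ClusterCorePathDemo extends App {
  val node = Address("akka.tcp", "sys", "host1", 2552)
  // path to the core daemon, now one level deeper under its supervisor
  val corePath = RootActorPath(node) / "system" / "cluster" / "core" / "daemon"
  println(corePath) // akka.tcp://sys@host1:2552/system/cluster/core/daemon
}
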
@@ -381,14 +418,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
*/
def removing(address: Address): Unit = {
log.info("Cluster Node [{}] - Node has been REMOVED by the leader - shutting down...", selfAddress)
// just cleaning up the gossip state
latestGossip = Gossip.empty
publish(latestGossip)
context.become(removed)
// make sure the final (removed) state is published
// before shutting down
implicit val timeout = Timeout(5 seconds)
publisher ? PublishDone onComplete { case _ ⇒ cluster.shutdown() }
cluster.shutdown()
}

/**
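
The `publisher ? PublishDone onComplete { ... }` line above is an instance of the ask pattern: request an acknowledgement and only shut down once the reply (or the timeout) arrives. A self-contained sketch of that pattern, with made-up actor and message names:

import akka.actor.{ Actor, ActorSystem, Props }
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

case object Flush
case object Flushed

class Publisher extends Actor {
  def receive = { case Flush ⇒ sender ! Flushed }
}

object AskThenShutdown extends App {
  val system = ActorSystem("demo")
  val publisher = system.actorOf(Props[Publisher], name = "publisher")

  implicit val timeout = Timeout(5.seconds)
  import system.dispatcher // ExecutionContext for onComplete
  // shut down once the publisher has acknowledged, or when the ask times out
  (publisher ? Flush) onComplete { case _ ⇒ system.shutdown() }
}
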
@@ -479,10 +509,10 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
val localGossip = latestGossip

if (remoteGossip.overview.unreachable.exists(_.address == selfAddress)) {
// FIXME how should we handle this situation?
log.debug("Received gossip with self as unreachable, from [{}]", from)

} else if (!localGossip.overview.isNonDownUnreachable(from)) {
log.debug("Ignoring received gossip with self [{}] as unreachable, from [{}]", selfAddress, from)
} else if (localGossip.overview.isNonDownUnreachable(from)) {
log.debug("Ignoring received gossip from unreachable [{}] ", from)
} else {

// leader handles merge conflicts, or when they have different views of how is leader
val handleMerge = localGossip.leader == Some(selfAddress) || localGossip.leader != remoteGossip.leader
@@ -802,7 +832,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = false))

def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit =
if (address != selfAddress) clusterCore(address) ! gossipMsg
if (address != selfAddress && gossipMsg.gossip.members.exists(_.address == address))
clusterCore(address) ! gossipMsg

def publish(newGossip: Gossip): Unit = {
publisher ! PublishChanges(newGossip)
@@ -341,6 +341,15 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
var latestConvergedGossip: Gossip = Gossip.empty
var bufferedEvents: immutable.IndexedSeq[ClusterDomainEvent] = Vector.empty

override def preRestart(reason: Throwable, message: Option[Any]) {
// don't postStop when restarted, no children to stop
}

override def postStop(): Unit = {
// publish the final removed state before shutting down
publishChanges(Gossip.empty)
}

def receive = {
case PublishChanges(newGossip) ⇒ publishChanges(newGossip)
case currentStats: CurrentInternalStats ⇒ publishInternalStats(currentStats)
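
The empty `preRestart` above relies on a detail of the actor lifecycle: the default `preRestart` implementation calls `postStop`, so overriding it with a no-op keeps the final publish in `postStop` from running on a mere restart. A small illustration (hypothetical actor, plain println instead of real publishing):

import akka.actor.Actor

class FinalStatePublisher extends Actor {
  // default preRestart would invoke postStop; keep it empty so the
  // final publish below happens only on a real stop, not on a restart
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = ()

  override def postStop(): Unit =
    println("publishing final state before stopping")

  def receive = { case _ ⇒ /* ... */ }
}
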
@@ -349,7 +358,6 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
case Unsubscribe(subscriber, to) ⇒ unsubscribe(subscriber, to)
case PublishEvent(event) ⇒ publish(event)
case PublishStart ⇒ publishStart()
case PublishDone ⇒ publishDone(sender)
}

def eventStream: EventStream = context.system.eventStream
@@ -435,11 +443,6 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
publishCurrentClusterState(None)
}

def publishDone(receiver: ActorRef): Unit = {
clearState()
receiver ! PublishDoneFinished
}

def clearState(): Unit = {
latestGossip = Gossip.empty
latestConvergedGossip = Gossip.empty
@@ -58,7 +58,8 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
state = state.copy(members = state.members - member, unreachable = state.unreachable - member + member)
case event: MemberEvent ⇒
// replace current member with new member (might have different status, only address is used in equals)
state = state.copy(members = state.members - event.member + event.member)
state = state.copy(members = state.members - event.member + event.member,
unreachable = state.unreachable - event.member)
case LeaderChanged(leader) ⇒ state = state.copy(leader = leader)
case s: CurrentClusterState ⇒ state = s
case CurrentInternalStats(stats) ⇒ _latestStats = stats
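
The `- event.member + event.member` dance above works because Member equality is defined on the address only, and an immutable Set keeps the element it already holds when an "equal" one is added. A toy illustration with a made-up key-based class (not the real Member):

// hypothetical record whose equality is keyed on `address` only
class Record(val address: String, val status: String) {
  override def equals(other: Any): Boolean = other match {
    case r: Record ⇒ r.address == address
    case _         ⇒ false
  }
  override def hashCode: Int = address.hashCode
  override def toString = "Record(" + address + "," + status + ")"
}

object ReplaceInSet extends App {
  val up      = new Record("node-1", "Up")
  val leaving = new Record("node-1", "Leaving")

  val members = Set(up)
  println(members + leaving)           // Set(Record(node-1,Up))      - old element kept
  println(members - leaving + leaving) // Set(Record(node-1,Leaving)) - remove first, then add
}
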
@@ -123,7 +123,12 @@ private[cluster] case class Gossip(
val mergedVClock = this.version merge that.version

// 2. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups
val mergedUnreachable = Member.pickHighestPriority(this.overview.unreachable, that.overview.unreachable)
// FIXME allowing Down -> Joining should be adjusted as part of ticket #2788
val mergedUnreachable = Member.pickHighestPriority(
this.overview.unreachable.filterNot(m1 ⇒
m1.status == Down && that.members.exists(m2 ⇒ m2.status == Joining && m2.address == m1.address)),
that.overview.unreachable.filterNot(m1 ⇒
m1.status == Down && this.members.exists(m2 ⇒ m2.status == Joining && m2.address == m1.address)))

// 3. merge members by selecting the single Member with highest MemberStatus out of the Member groups,
// and exclude unreachable
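
The two symmetric `filterNot` calls implement the Down -> Joining exception: an address that one side still lists as unreachable and Down is dropped from the merged unreachable set when the other side already counts it as a Joining member again, which is what the new GossipSpec test further down exercises. A toy version of that filter on plain tuples:

object DownToJoiningFilter extends App {
  type M = (String, String) // (address, status) - stand-in for a cluster Member

  // drop Down/unreachable entries that the other side already sees as Joining
  def filterUnreachable(unreachable: Set[M], otherMembers: Set[M]): Set[M] =
    unreachable.filterNot { case (addr, status) ⇒
      status == "Down" && otherMembers.exists { case (a, s) ⇒ s == "Joining" && a == addr }
    }

  val thisUnreachable = Set(("node-e", "Down"))
  val thatMembers     = Set(("node-a", "Up"), ("node-e", "Joining"))

  println(filterUnreachable(thisUnreachable, thatMembers)) // Set() - the Down entry is discarded
}
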
@@ -59,9 +59,6 @@ abstract class LeaderLeavingSpec
// verify that the LEADER is shut down
awaitCond(cluster.isTerminated)

// verify that the LEADER is REMOVED
awaitCond(clusterView.status == Removed)

} else {

val leavingLatch = TestLatch()
@@ -51,7 +51,24 @@ abstract class MembershipChangeListenerExitingSpec
}

runOn(second) {
val exitingLatch = TestLatch()
val removedLatch = TestLatch()
val secondAddress = address(second)
cluster.subscribe(system.actorOf(Props(new Actor {
def receive = {
case state: CurrentClusterState ⇒
if (state.members.exists(m ⇒ m.address == secondAddress && m.status == Exiting))
exitingLatch.countDown()
case MemberExited(m) if m.address == secondAddress ⇒
exitingLatch.countDown()
case MemberRemoved(m) if m.address == secondAddress ⇒
removedLatch.countDown()
case _ ⇒ // ignore
}
})), classOf[MemberEvent])
enterBarrier("registered-listener")
exitingLatch.await
removedLatch.await
}

runOn(third) {
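
The test above follows a common shape: subscribe a throwaway listener actor to cluster events and block the test thread on latches until the expected transitions have been observed. The same shape, written with a plain java.util.concurrent.CountDownLatch instead of the testkit's TestLatch (the actor and message names here are illustrative, not from the akka test):

import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.actor.{ Actor, ActorSystem, Props }

case class NodeRemoved(node: String)

class RemovalListener(removed: CountDownLatch, expected: String) extends Actor {
  def receive = {
    case NodeRemoved(node) if node == expected ⇒ removed.countDown()
    case _                                     ⇒ // ignore everything else
  }
}

object LatchDemo extends App {
  val system = ActorSystem("demo")
  val removed = new CountDownLatch(1)
  val listener = system.actorOf(Props(new RemovalListener(removed, "node-2")), "listener")

  listener ! NodeRemoved("node-2")
  // block the calling thread until the expected event has been observed
  removed.await(5, TimeUnit.SECONDS)
  system.shutdown()
}
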
@@ -50,9 +50,8 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec
}

runOn(second) {
// verify that the second node is shut down and has status REMOVED
// verify that the second node is shut down
awaitCond(cluster.isTerminated, reaperWaitingTime)
awaitCond(clusterView.status == MemberStatus.Removed, reaperWaitingTime)
}

enterBarrier("finished")
@@ -91,5 +91,15 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
expectMsgClass(classOf[ClusterEvent.CurrentClusterState])
}

// this must be the last test step, since the cluster is shutdown
"publish MemberRemoved when shutdown" in {
cluster.subscribe(testActor, classOf[ClusterEvent.MemberRemoved])
// first, is in response to the subscription
expectMsgClass(classOf[ClusterEvent.CurrentClusterState])

cluster.shutdown()
expectMsgType[ClusterEvent.MemberRemoved].member.address must be(selfAddress)
}

}
}
@@ -25,6 +25,7 @@ class GossipSpec extends WordSpec with MustMatchers {
val d2 = Member(Address("akka.tcp", "sys", "d", 2552), Removed)
val e1 = Member(Address("akka.tcp", "sys", "e", 2552), Joining)
val e2 = Member(Address("akka.tcp", "sys", "e", 2552), Up)
val e3 = Member(Address("akka.tcp", "sys", "e", 2552), Down)

"A Gossip" must {
@@ -78,6 +79,16 @@ class GossipSpec extends WordSpec with MustMatchers {

}

"merge by allowing Down -> Joining" in {
val g1 = Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(e3)))
val g2 = Gossip(members = SortedSet(a1, b1, e1), overview = GossipOverview(unreachable = Set.empty))

val merged2 = g2 merge g1
merged2.members must be(SortedSet(a1, b1, e1))
merged2.members.toSeq.map(_.status) must be(Seq(Up, Up, Joining))
merged2.overview.unreachable must be(Set.empty)
}

"start with fresh seen table after merge" in {
val g1 = Gossip(members = SortedSet(a1, e1)).seen(a1.address).seen(a1.address)
val g2 = Gossip(members = SortedSet(a2, e2)).seen(e2.address).seen(e2.address)