supervision in AdapterClusterImpl (#27862)

* methods like cluster.join and cluster.subscribe may throw exception if called
  with invalid parameters
* the manager and subscriptions actors should survive such exceptions
This commit is contained in:
Patrik Nordwall 2019-10-03 09:02:29 +02:00 committed by Christopher Batey
parent b544359502
commit 064f06f5a6

View file

@ -4,7 +4,9 @@
package akka.cluster.typed.internal package akka.cluster.typed.internal
import akka.actor.typed.Behavior
import akka.actor.typed.Props import akka.actor.typed.Props
import akka.actor.typed.SupervisorStrategy
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.cluster.ClusterEvent.MemberEvent import akka.cluster.ClusterEvent.MemberEvent
import akka.cluster.{ ClusterEvent, Member, MemberStatus } import akka.cluster.{ ClusterEvent, Member, MemberStatus }
@ -25,8 +27,8 @@ private[akka] object AdapterClusterImpl {
private case object Up extends SeenState private case object Up extends SeenState
private case class Removed(previousStatus: MemberStatus) extends SeenState private case class Removed(previousStatus: MemberStatus) extends SeenState
private def subscriptionsBehavior(adaptedCluster: akka.cluster.Cluster) = Behaviors.setup[ClusterStateSubscription] { private def subscriptionsBehavior(adaptedCluster: akka.cluster.Cluster): Behavior[ClusterStateSubscription] =
ctx => Behaviors.setup[ClusterStateSubscription] { ctx =>
var seenState: SeenState = BeforeUp var seenState: SeenState = BeforeUp
var upSubscribers: List[ActorRef[SelfUp]] = Nil var upSubscribers: List[ActorRef[SelfUp]] = Nil
var removedSubscribers: List[ActorRef[SelfRemoved]] = Nil var removedSubscribers: List[ActorRef[SelfRemoved]] = Nil
@ -35,6 +37,7 @@ private[akka] object AdapterClusterImpl {
// important to not eagerly refer to it or we get a cycle here // important to not eagerly refer to it or we get a cycle here
lazy val cluster = Cluster(ctx.system) lazy val cluster = Cluster(ctx.system)
def onSelfMemberEvent(event: MemberEvent): Unit = { def onSelfMemberEvent(event: MemberEvent): Unit = {
event match { event match {
case ClusterEvent.MemberUp(_) => case ClusterEvent.MemberUp(_) =>
@ -54,48 +57,48 @@ private[akka] object AdapterClusterImpl {
} }
Behaviors Behaviors
.receive[AnyRef] { (ctx, msg) => .receiveMessage[AnyRef] {
msg match { case Subscribe(subscriber: ActorRef[SelfUp] @unchecked, clazz) if clazz == classOf[SelfUp] =>
case Subscribe(subscriber: ActorRef[SelfUp] @unchecked, clazz) if clazz == classOf[SelfUp] => seenState match {
seenState match { case Up => subscriber ! SelfUp(adaptedCluster.state)
case Up => subscriber ! SelfUp(adaptedCluster.state) case BeforeUp =>
case BeforeUp => ctx.watch(subscriber)
ctx.watch(subscriber) upSubscribers = subscriber :: upSubscribers
upSubscribers = subscriber :: upSubscribers case _: Removed =>
case _: Removed => // self did join, but is now no longer up, we want to avoid subscribing
// self did join, but is now no longer up, we want to avoid subscribing // to not get a memory leak, but also not signal anything
// to not get a memory leak, but also not signal anything }
} Behaviors.same
Behaviors.same
case Subscribe(subscriber: ActorRef[SelfRemoved] @unchecked, clazz) if clazz == classOf[SelfRemoved] => case Subscribe(subscriber: ActorRef[SelfRemoved] @unchecked, clazz) if clazz == classOf[SelfRemoved] =>
seenState match { seenState match {
case BeforeUp | Up => removedSubscribers = subscriber :: removedSubscribers case BeforeUp | Up => removedSubscribers = subscriber :: removedSubscribers
case Removed(s) => subscriber ! SelfRemoved(s) case Removed(s) => subscriber ! SelfRemoved(s)
} }
Behaviors.same Behaviors.same
case Subscribe(subscriber, eventClass) => case Subscribe(subscriber, eventClass) =>
adaptedCluster adaptedCluster.subscribe(
.subscribe(subscriber.toClassic, initialStateMode = ClusterEvent.initialStateAsEvents, eventClass) subscriber.toClassic,
Behaviors.same initialStateMode = ClusterEvent.initialStateAsEvents,
eventClass)
Behaviors.same
case Unsubscribe(subscriber) => case Unsubscribe(subscriber) =>
adaptedCluster.unsubscribe(subscriber.toClassic) adaptedCluster.unsubscribe(subscriber.toClassic)
Behaviors.same Behaviors.same
case GetCurrentState(sender) => case GetCurrentState(sender) =>
adaptedCluster.sendCurrentClusterState(sender.toClassic) adaptedCluster.sendCurrentClusterState(sender.toClassic)
Behaviors.same Behaviors.same
case evt: MemberEvent if evt.member.uniqueAddress == cluster.selfMember.uniqueAddress => case evt: MemberEvent if evt.member.uniqueAddress == cluster.selfMember.uniqueAddress =>
onSelfMemberEvent(evt) onSelfMemberEvent(evt)
Behaviors.same Behaviors.same
case _: MemberEvent => case _: MemberEvent =>
Behaviors.same Behaviors.same
}
} }
.receiveSignal { .receiveSignal {
@ -106,29 +109,29 @@ private[akka] object AdapterClusterImpl {
} }
.narrow[ClusterStateSubscription] .narrow[ClusterStateSubscription]
}
private def managerBehavior(adaptedCluster: akka.cluster.Cluster): Behavior[ClusterCommand] = {
Behaviors.receiveMessage {
case Join(address) =>
adaptedCluster.join(address)
Behaviors.same
case Leave(address) =>
adaptedCluster.leave(address)
Behaviors.same
case Down(address) =>
adaptedCluster.down(address)
Behaviors.same
case JoinSeedNodes(addresses) =>
adaptedCluster.joinSeedNodes(addresses)
Behaviors.same
}
} }
private def managerBehavior(adaptedCluster: akka.cluster.Cluster) =
Behaviors.receive[ClusterCommand]((_, msg) =>
msg match {
case Join(address) =>
adaptedCluster.join(address)
Behaviors.same
case Leave(address) =>
adaptedCluster.leave(address)
Behaviors.same
case Down(address) =>
adaptedCluster.down(address)
Behaviors.same
case JoinSeedNodes(addresses) =>
adaptedCluster.joinSeedNodes(addresses)
Behaviors.same
})
} }
/** /**
@ -147,9 +150,17 @@ private[akka] final class AdapterClusterImpl(system: ActorSystem[_]) extends Clu
// must not be lazy as it also updates the cached selfMember // must not be lazy as it also updates the cached selfMember
override val subscriptions: ActorRef[ClusterStateSubscription] = override val subscriptions: ActorRef[ClusterStateSubscription] =
system.internalSystemActorOf(subscriptionsBehavior(classicCluster), "clusterStateSubscriptions", Props.empty) system.internalSystemActorOf(
// resume supervision: has state that shouldn't be lost in case of failure
Behaviors.supervise(subscriptionsBehavior(classicCluster)).onFailure(SupervisorStrategy.resume),
"clusterStateSubscriptions",
Props.empty)
override lazy val manager: ActorRef[ClusterCommand] = override lazy val manager: ActorRef[ClusterCommand] =
system.internalSystemActorOf(managerBehavior(classicCluster), "clusterCommandManager", Props.empty) system.internalSystemActorOf(
// restart supervision: no state lost in case of failure
Behaviors.supervise(managerBehavior(classicCluster)).onFailure(SupervisorStrategy.restart),
"clusterCommandManager",
Props.empty)
} }