From 064f06f5a62d812037c041cb39c18988a66465a4 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 3 Oct 2019 09:02:29 +0200 Subject: [PATCH] supervision in AdapterClusterImpl (#27862) * methods like cluster.join and cluster.subscribe may throw exception if called with invalid parameters * the manager and subscriptions actors should survive such exceptions --- .../typed/internal/AdaptedClusterImpl.scala | 131 ++++++++++-------- 1 file changed, 71 insertions(+), 60 deletions(-) diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterImpl.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterImpl.scala index e59d6aaffd..e4563a514b 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterImpl.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterImpl.scala @@ -4,7 +4,9 @@ package akka.cluster.typed.internal +import akka.actor.typed.Behavior import akka.actor.typed.Props +import akka.actor.typed.SupervisorStrategy import akka.annotation.InternalApi import akka.cluster.ClusterEvent.MemberEvent import akka.cluster.{ ClusterEvent, Member, MemberStatus } @@ -25,8 +27,8 @@ private[akka] object AdapterClusterImpl { private case object Up extends SeenState private case class Removed(previousStatus: MemberStatus) extends SeenState - private def subscriptionsBehavior(adaptedCluster: akka.cluster.Cluster) = Behaviors.setup[ClusterStateSubscription] { - ctx => + private def subscriptionsBehavior(adaptedCluster: akka.cluster.Cluster): Behavior[ClusterStateSubscription] = + Behaviors.setup[ClusterStateSubscription] { ctx => var seenState: SeenState = BeforeUp var upSubscribers: List[ActorRef[SelfUp]] = Nil var removedSubscribers: List[ActorRef[SelfRemoved]] = Nil @@ -35,6 +37,7 @@ private[akka] object AdapterClusterImpl { // important to not eagerly refer to it or we get a cycle here lazy val cluster = Cluster(ctx.system) + def onSelfMemberEvent(event: MemberEvent): Unit = { event match { case ClusterEvent.MemberUp(_) => @@ -54,48 +57,48 @@ private[akka] object AdapterClusterImpl { } Behaviors - .receive[AnyRef] { (ctx, msg) => - msg match { - case Subscribe(subscriber: ActorRef[SelfUp] @unchecked, clazz) if clazz == classOf[SelfUp] => - seenState match { - case Up => subscriber ! SelfUp(adaptedCluster.state) - case BeforeUp => - ctx.watch(subscriber) - upSubscribers = subscriber :: upSubscribers - case _: Removed => - // self did join, but is now no longer up, we want to avoid subscribing - // to not get a memory leak, but also not signal anything - } - Behaviors.same + .receiveMessage[AnyRef] { + case Subscribe(subscriber: ActorRef[SelfUp] @unchecked, clazz) if clazz == classOf[SelfUp] => + seenState match { + case Up => subscriber ! SelfUp(adaptedCluster.state) + case BeforeUp => + ctx.watch(subscriber) + upSubscribers = subscriber :: upSubscribers + case _: Removed => + // self did join, but is now no longer up, we want to avoid subscribing + // to not get a memory leak, but also not signal anything + } + Behaviors.same - case Subscribe(subscriber: ActorRef[SelfRemoved] @unchecked, clazz) if clazz == classOf[SelfRemoved] => - seenState match { - case BeforeUp | Up => removedSubscribers = subscriber :: removedSubscribers - case Removed(s) => subscriber ! SelfRemoved(s) - } - Behaviors.same + case Subscribe(subscriber: ActorRef[SelfRemoved] @unchecked, clazz) if clazz == classOf[SelfRemoved] => + seenState match { + case BeforeUp | Up => removedSubscribers = subscriber :: removedSubscribers + case Removed(s) => subscriber ! SelfRemoved(s) + } + Behaviors.same - case Subscribe(subscriber, eventClass) => - adaptedCluster - .subscribe(subscriber.toClassic, initialStateMode = ClusterEvent.initialStateAsEvents, eventClass) - Behaviors.same + case Subscribe(subscriber, eventClass) => + adaptedCluster.subscribe( + subscriber.toClassic, + initialStateMode = ClusterEvent.initialStateAsEvents, + eventClass) + Behaviors.same - case Unsubscribe(subscriber) => - adaptedCluster.unsubscribe(subscriber.toClassic) - Behaviors.same + case Unsubscribe(subscriber) => + adaptedCluster.unsubscribe(subscriber.toClassic) + Behaviors.same - case GetCurrentState(sender) => - adaptedCluster.sendCurrentClusterState(sender.toClassic) - Behaviors.same + case GetCurrentState(sender) => + adaptedCluster.sendCurrentClusterState(sender.toClassic) + Behaviors.same - case evt: MemberEvent if evt.member.uniqueAddress == cluster.selfMember.uniqueAddress => - onSelfMemberEvent(evt) - Behaviors.same + case evt: MemberEvent if evt.member.uniqueAddress == cluster.selfMember.uniqueAddress => + onSelfMemberEvent(evt) + Behaviors.same - case _: MemberEvent => - Behaviors.same + case _: MemberEvent => + Behaviors.same - } } .receiveSignal { @@ -106,29 +109,29 @@ private[akka] object AdapterClusterImpl { } .narrow[ClusterStateSubscription] + } + + private def managerBehavior(adaptedCluster: akka.cluster.Cluster): Behavior[ClusterCommand] = { + Behaviors.receiveMessage { + case Join(address) => + adaptedCluster.join(address) + Behaviors.same + + case Leave(address) => + adaptedCluster.leave(address) + Behaviors.same + + case Down(address) => + adaptedCluster.down(address) + Behaviors.same + + case JoinSeedNodes(addresses) => + adaptedCluster.joinSeedNodes(addresses) + Behaviors.same + + } } - private def managerBehavior(adaptedCluster: akka.cluster.Cluster) = - Behaviors.receive[ClusterCommand]((_, msg) => - msg match { - case Join(address) => - adaptedCluster.join(address) - Behaviors.same - - case Leave(address) => - adaptedCluster.leave(address) - Behaviors.same - - case Down(address) => - adaptedCluster.down(address) - Behaviors.same - - case JoinSeedNodes(addresses) => - adaptedCluster.joinSeedNodes(addresses) - Behaviors.same - - }) - } /** @@ -147,9 +150,17 @@ private[akka] final class AdapterClusterImpl(system: ActorSystem[_]) extends Clu // must not be lazy as it also updates the cached selfMember override val subscriptions: ActorRef[ClusterStateSubscription] = - system.internalSystemActorOf(subscriptionsBehavior(classicCluster), "clusterStateSubscriptions", Props.empty) + system.internalSystemActorOf( + // resume supervision: has state that shouldn't be lost in case of failure + Behaviors.supervise(subscriptionsBehavior(classicCluster)).onFailure(SupervisorStrategy.resume), + "clusterStateSubscriptions", + Props.empty) override lazy val manager: ActorRef[ClusterCommand] = - system.internalSystemActorOf(managerBehavior(classicCluster), "clusterCommandManager", Props.empty) + system.internalSystemActorOf( + // restart supervision: no state lost in case of failure + Behaviors.supervise(managerBehavior(classicCluster)).onFailure(SupervisorStrategy.restart), + "clusterCommandManager", + Props.empty) }