diff --git a/akka-actor-tests/src/test/scala/akka/pattern/BackoffSupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/BackoffSupervisorSpec.scala index fe36e787b0..4daf26bf9b 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/BackoffSupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/BackoffSupervisorSpec.scala @@ -336,5 +336,21 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually } } + + "stop restarting the child if final stop message received (Backoff.onStop)" in { + val stopMessage = "stop" + val supervisor: ActorRef = create(onStopOptions(maxNrOfRetries = 100).withFinalStopMessage(_ == stopMessage)) + supervisor ! BackoffSupervisor.GetCurrentChild + val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get + watch(c1) + watch(supervisor) + + supervisor ! stopMessage + expectMsg("stop") + c1 ! PoisonPill + expectTerminated(c1) + expectTerminated(supervisor) + + } } } diff --git a/akka-actor/src/main/mima-filters/2.5.18.backwards.excludes b/akka-actor/src/main/mima-filters/2.5.18.backwards.excludes index db14a2879c..f4ad80040f 100644 --- a/akka-actor/src/main/mima-filters/2.5.18.backwards.excludes +++ b/akka-actor/src/main/mima-filters/2.5.18.backwards.excludes @@ -57,3 +57,18 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.TcpConnection.abort" ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.ChannelRegistration.cancel") ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.io.ChannelRegistration.cancelAndClose") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.SelectionHandler#ChannelRegistryImpl.this") + +# Excludes for adding an option for backoff supervisor +# private +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOptionsImpl.copy") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOptionsImpl.apply") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOptionsImpl.this") +ProblemFilters.exclude[MissingTypesProblem]("akka.pattern.BackoffOptionsImpl$") +# private +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOnRestartSupervisor.this") +# DoNotInherit (should have been) new method +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.BackoffOptions.withFinalStopMessage") +# private[akka] +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.HandleBackoff.finalStopMessageReceived") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.HandleBackoff.finalStopMessageReceived_=") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.HandleBackoff.finalStopMessage") diff --git a/akka-actor/src/main/scala/akka/pattern/BackoffOnRestartSupervisor.scala b/akka-actor/src/main/scala/akka/pattern/BackoffOnRestartSupervisor.scala index be20bfda35..d227c0882c 100644 --- a/akka-actor/src/main/scala/akka/pattern/BackoffOnRestartSupervisor.scala +++ b/akka-actor/src/main/scala/akka/pattern/BackoffOnRestartSupervisor.scala @@ -23,7 +23,8 @@ private class BackoffOnRestartSupervisor( val reset: BackoffReset, randomFactor: Double, strategy: OneForOneStrategy, - val replyWhileStopped: Option[Any]) + val replyWhileStopped: Option[Any], + val finalStopMessage: Option[Any ⇒ Boolean]) extends Actor with HandleBackoff with ActorLogging { @@ -75,10 +76,11 @@ private class BackoffOnRestartSupervisor( } def onTerminated: Receive = { - case Terminated(child) ⇒ - log.debug(s"Terminating, because child [$child] terminated itself") + case Terminated(c) ⇒ + log.debug(s"Terminating, because child [$c] terminated itself") stop(self) } def receive = onTerminated orElse handleBackoff + } diff --git a/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala b/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala index 344f320750..c3ca29e184 100644 --- a/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala +++ b/akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala @@ -6,7 +6,8 @@ package akka.pattern import scala.concurrent.duration.{ Duration, FiniteDuration } import akka.util.JavaDurationConverters._ -import akka.actor.{ Props, OneForOneStrategy, SupervisorStrategy } +import akka.actor.{ OneForOneStrategy, Props, SupervisorStrategy } +import akka.annotation.DoNotInherit /** * Builds back-off options for creating a back-off supervisor. @@ -516,6 +517,7 @@ object Backoff { * context.actorOf(BackoffSupervisor.props(options), name) * }}} */ +@DoNotInherit trait BackoffOptions { /** * Returns a new BackoffOptions with automatic back-off reset. @@ -555,6 +557,14 @@ trait BackoffOptions { */ def withReplyWhileStopped(replyWhileStopped: Any): BackoffOptions + /** + * Predicate evaluated for each message, if it returns true and the supervised actor is + * stopped then the supervisor will stop its self. If it returns true while + * the supervised actor is running then it will be forwarded to the supervised actor and + * when the supervised actor stops its self the supervisor will stop its self. + */ + def withFinalStopMessage(isFinalStopMessage: Any ⇒ Boolean): BackoffOptions + /** * Returns a new BackoffOptions with a maximum number of retries to restart the child actor. * By default, the supervisor will retry infinitely. @@ -571,15 +581,17 @@ trait BackoffOptions { } private final case class BackoffOptionsImpl( - backoffType: BackoffType = RestartImpliesFailure, + backoffType: BackoffType = RestartImpliesFailure, childProps: Props, childName: String, minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, - reset: Option[BackoffReset] = None, - supervisorStrategy: OneForOneStrategy = OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider), - replyWhileStopped: Option[Any] = None) extends BackoffOptions { + reset: Option[BackoffReset] = None, + supervisorStrategy: OneForOneStrategy = OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider), + replyWhileStopped: Option[Any] = None, + finalStopMessage: Option[Any ⇒ Boolean] = None +) extends BackoffOptions { val backoffReset = reset.getOrElse(AutoReset(minBackoff)) @@ -589,6 +601,7 @@ private final case class BackoffOptionsImpl( def withDefaultStoppingStrategy = copy(supervisorStrategy = OneForOneStrategy(supervisorStrategy.maxNrOfRetries)(SupervisorStrategy.stoppingStrategy.decider)) def withReplyWhileStopped(replyWhileStopped: Any) = copy(replyWhileStopped = Some(replyWhileStopped)) def withMaxNrOfRetries(maxNrOfRetries: Int) = copy(supervisorStrategy = supervisorStrategy.withMaxNrOfRetries(maxNrOfRetries)) + def withFinalStopMessage(action: Any ⇒ Boolean) = copy(finalStopMessage = Some(action)) def props = { require(minBackoff > Duration.Zero, "minBackoff must be > 0") @@ -603,10 +616,10 @@ private final case class BackoffOptionsImpl( backoffType match { //onFailure method in companion object case RestartImpliesFailure ⇒ - Props(new BackoffOnRestartSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy, replyWhileStopped)) + Props(new BackoffOnRestartSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy, replyWhileStopped, finalStopMessage)) //onStop method in companion object case StopImpliesFailure ⇒ - Props(new BackoffSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy, replyWhileStopped)) + Props(new BackoffSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy, replyWhileStopped, finalStopMessage)) } } } diff --git a/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala b/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala index 460ef3f987..30317b47c8 100644 --- a/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala +++ b/akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala @@ -291,7 +291,8 @@ final class BackoffSupervisor( val reset: BackoffReset, randomFactor: Double, strategy: SupervisorStrategy, - val replyWhileStopped: Option[Any]) + val replyWhileStopped: Option[Any], + val finalStopMessage: Option[Any ⇒ Boolean]) extends Actor with HandleBackoff with ActorLogging { @@ -311,6 +312,17 @@ final class BackoffSupervisor( case s ⇒ s } + // for binary compatibility with 2.5.18 + def this( + childProps: Props, + childName: String, + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + reset: BackoffReset, + randomFactor: Double, + strategy: SupervisorStrategy, + replyWhileStopped: Option[Any]) = this(childProps, childName, minBackoff, maxBackoff, reset, randomFactor, strategy, replyWhileStopped, None) + // for binary compatibility with 2.4.1 def this( childProps: Props, @@ -319,7 +331,7 @@ final class BackoffSupervisor( maxBackoff: FiniteDuration, randomFactor: Double, supervisorStrategy: SupervisorStrategy) = - this(childProps, childName, minBackoff, maxBackoff, AutoReset(minBackoff), randomFactor, supervisorStrategy, None) + this(childProps, childName, minBackoff, maxBackoff, AutoReset(minBackoff), randomFactor, supervisorStrategy, None, None) // for binary compatibility with 2.4.0 def this( @@ -333,21 +345,26 @@ final class BackoffSupervisor( def onTerminated: Receive = { case Terminated(ref) if child.contains(ref) ⇒ child = None - val maxNrOfRetries = strategy match { - case oneForOne: OneForOneStrategy ⇒ oneForOne.maxNrOfRetries - case _ ⇒ -1 - } - - val nextRestartCount = restartCount + 1 - - if (maxNrOfRetries == -1 || nextRestartCount <= maxNrOfRetries) { - val restartDelay = calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor) - context.system.scheduler.scheduleOnce(restartDelay, self, StartChild) - restartCount = nextRestartCount - } else { - log.debug(s"Terminating on restart #{} which exceeds max allowed restarts ({})", nextRestartCount, maxNrOfRetries) + if (finalStopMessageReceived) { context.stop(self) + } else { + val maxNrOfRetries = strategy match { + case oneForOne: OneForOneStrategy ⇒ oneForOne.maxNrOfRetries + case _ ⇒ -1 + } + + val nextRestartCount = restartCount + 1 + + if (maxNrOfRetries == -1 || nextRestartCount <= maxNrOfRetries) { + val restartDelay = calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor) + context.system.scheduler.scheduleOnce(restartDelay, self, StartChild) + restartCount = nextRestartCount + } else { + log.debug(s"Terminating on restart #{} which exceeds max allowed restarts ({})", nextRestartCount, maxNrOfRetries) + context.stop(self) + } } + } def receive = onTerminated orElse handleBackoff @@ -358,9 +375,11 @@ private[akka] trait HandleBackoff { this: Actor ⇒ def childName: String def reset: BackoffReset def replyWhileStopped: Option[Any] + def finalStopMessage: Option[Any ⇒ Boolean] var child: Option[ActorRef] = None var restartCount = 0 + var finalStopMessageReceived = false import BackoffSupervisor._ import context.dispatcher @@ -404,11 +423,22 @@ private[akka] trait HandleBackoff { this: Actor ⇒ context.parent ! msg case msg ⇒ child match { - case Some(c) ⇒ c.forward(msg) - case None ⇒ replyWhileStopped match { - case Some(r) ⇒ sender ! r - case None ⇒ context.system.deadLetters.forward(msg) - } + case Some(c) ⇒ + c.forward(msg) + if (!finalStopMessageReceived && finalStopMessage.isDefined) { + finalStopMessageReceived = finalStopMessage.get.apply(msg) + } + case None ⇒ + replyWhileStopped match { + case None ⇒ context.system.deadLetters.forward(msg) + case Some(r) ⇒ sender() ! r + } + finalStopMessage match { + case None ⇒ + case Some(fsm) ⇒ + fsm(msg) + context.stop(self) + } } } } diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ProxyShardingSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ProxyShardingSpec.scala index 38216682da..75db031626 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ProxyShardingSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ProxyShardingSpec.scala @@ -22,7 +22,7 @@ object ProxyShardingSpec { class ProxyShardingSpec extends AkkaSpec(ProxyShardingSpec.config) { val role = "Shard" - val clusterSharding: ClusterSharding = ClusterSharding.get(system) + val clusterSharding: ClusterSharding = ClusterSharding(system) val shardingSettings: ClusterShardingSettings = ClusterShardingSettings.create(system) val messageExtractor = new ShardRegion.HashCodeMessageExtractor(10) { diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/SupervisionSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/SupervisionSpec.scala new file mode 100644 index 0000000000..cd1303e41e --- /dev/null +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/SupervisionSpec.scala @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.cluster.sharding + +import akka.actor.{ Actor, ActorLogging, ActorRef, PoisonPill, Props } +import akka.cluster.Cluster +import akka.cluster.sharding.ShardRegion.Passivate +import akka.pattern.{ Backoff, BackoffSupervisor } +import akka.testkit.{ AkkaSpec, ImplicitSender } +import com.typesafe.config.ConfigFactory + +import scala.concurrent.duration._ + +object SupervisionSpec { + val config = + ConfigFactory.parseString( + """ + akka.actor.provider = "cluster" + akka.loglevel = INFO + """) + + case class Msg(id: Long, msg: Any) + case class Response(self: ActorRef) + case object StopMessage + + val idExtractor: ShardRegion.ExtractEntityId = { + case Msg(id, msg) ⇒ (id.toString, msg) + } + + val shardResolver: ShardRegion.ExtractShardId = { + case Msg(id, msg) ⇒ (id % 2).toString + } + + class PassivatingActor extends Actor with ActorLogging { + + override def preStart(): Unit = { + log.info("Starting") + } + + override def postStop(): Unit = { + log.info("Stopping") + } + + override def receive: Receive = { + case "passivate" ⇒ + log.info("Passivating") + context.parent ! Passivate(StopMessage) + // simulate another message causing a stop before the region sends the stop message + // e.g. a persistent actor having a persist failure while processing the next message + context.stop(self) + case "hello" ⇒ + sender() ! Response(self) + case StopMessage ⇒ + log.info("Received stop from region") + context.parent ! PoisonPill + } + } + +} + +class SupervisionSpec extends AkkaSpec(SupervisionSpec.config) with ImplicitSender { + + import SupervisionSpec._ + + "Supervision for a sharded actor" must { + + "allow passivation" in { + + val supervisedProps = BackoffSupervisor.props(Backoff.onStop( + Props(new PassivatingActor()), + childName = "child", + minBackoff = 1.seconds, + maxBackoff = 30.seconds, + randomFactor = 0.2, + maxNrOfRetries = -1 + ).withFinalStopMessage(_ == StopMessage)) + + Cluster(system).join(Cluster(system).selfAddress) + val region = ClusterSharding(system).start( + "passy", + supervisedProps, + ClusterShardingSettings(system), + idExtractor, + shardResolver + ) + + region ! Msg(10, "hello") + val response = expectMsgType[Response](5.seconds) + watch(response.self) + + region ! Msg(10, "passivate") + expectTerminated(response.self) + + // This would fail before as sharded actor would be stuck passivating + region ! Msg(10, "hello") + expectMsgType[Response](20.seconds) + } + } + +} diff --git a/akka-persistence-shared/src/test/scala/akka/persistence/serialization/SerializerSpec.scala b/akka-persistence-shared/src/test/scala/akka/persistence/serialization/SerializerSpec.scala index 0cb43a339a..ab3ef02e22 100644 --- a/akka-persistence-shared/src/test/scala/akka/persistence/serialization/SerializerSpec.scala +++ b/akka-persistence-shared/src/test/scala/akka/persistence/serialization/SerializerSpec.scala @@ -258,7 +258,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) { val expected = PersistentRepr(MyPayload(".a."), 13, "p1", "", true, Actor.noSender) val serializer = serialization.findSerializerFor(expected) val deserialized = serializer.fromBinary(bytes, None).asInstanceOf[PersistentRepr] - deserialized.sender should not be (null) + deserialized.sender should not be null val deserializedWithoutSender = deserialized.update(sender = Actor.noSender) deserializedWithoutSender should be(expected) } diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpecSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpecSpec.scala index d88bc8d21c..efd106965d 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpecSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpecSpec.scala @@ -47,7 +47,7 @@ class AkkaSpecSpec extends WordSpec with Matchers { "stop correctly when sending PoisonPill to rootGuardian" in { val system = ActorSystem("AkkaSpec2", AkkaSpec.testConf) - val spec = new AkkaSpec(system) {} + new AkkaSpec(system) {} val latch = new TestLatch(1)(system) system.registerOnTermination(latch.countDown()) diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index bf1541ff78..87efe666f9 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -219,7 +219,7 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA } "support receive timeout" in { - val a = TestActorRef(new ReceiveTimeoutActor(testActor)) + TestActorRef(new ReceiveTimeoutActor(testActor)) expectMsg("timeout") }