Support ClusterSharding passivation in BackoffSupervisor (#25933)

* Specific final stop message predicate instead of arbitrary action
This commit is contained in:
Christopher Batey 2018-12-06 10:10:37 +00:00 committed by GitHub
parent 8b8c7355bf
commit e5c1fc02a9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 212 additions and 34 deletions

View file

@ -336,5 +336,21 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually
}
}
"stop restarting the child if final stop message received (Backoff.onStop)" in {
val stopMessage = "stop"
val supervisor: ActorRef = create(onStopOptions(maxNrOfRetries = 100).withFinalStopMessage(_ == stopMessage))
supervisor ! BackoffSupervisor.GetCurrentChild
val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get
watch(c1)
watch(supervisor)
supervisor ! stopMessage
expectMsg("stop")
c1 ! PoisonPill
expectTerminated(c1)
expectTerminated(supervisor)
}
}
}

View file

@ -57,3 +57,18 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.TcpConnection.abort"
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.ChannelRegistration.cancel")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.io.ChannelRegistration.cancelAndClose")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.SelectionHandler#ChannelRegistryImpl.this")
# Excludes for adding an option for backoff supervisor
# private
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOptionsImpl.copy")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOptionsImpl.apply")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOptionsImpl.this")
ProblemFilters.exclude[MissingTypesProblem]("akka.pattern.BackoffOptionsImpl$")
# private
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOnRestartSupervisor.this")
# DoNotInherit (should have been) new method
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.BackoffOptions.withFinalStopMessage")
# private[akka]
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.HandleBackoff.finalStopMessageReceived")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.HandleBackoff.finalStopMessageReceived_=")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.pattern.HandleBackoff.finalStopMessage")

View file

@ -23,7 +23,8 @@ private class BackoffOnRestartSupervisor(
val reset: BackoffReset,
randomFactor: Double,
strategy: OneForOneStrategy,
val replyWhileStopped: Option[Any])
val replyWhileStopped: Option[Any],
val finalStopMessage: Option[Any Boolean])
extends Actor with HandleBackoff
with ActorLogging {
@ -75,10 +76,11 @@ private class BackoffOnRestartSupervisor(
}
def onTerminated: Receive = {
case Terminated(child)
log.debug(s"Terminating, because child [$child] terminated itself")
case Terminated(c)
log.debug(s"Terminating, because child [$c] terminated itself")
stop(self)
}
def receive = onTerminated orElse handleBackoff
}

View file

@ -6,7 +6,8 @@ package akka.pattern
import scala.concurrent.duration.{ Duration, FiniteDuration }
import akka.util.JavaDurationConverters._
import akka.actor.{ Props, OneForOneStrategy, SupervisorStrategy }
import akka.actor.{ OneForOneStrategy, Props, SupervisorStrategy }
import akka.annotation.DoNotInherit
/**
* Builds back-off options for creating a back-off supervisor.
@ -516,6 +517,7 @@ object Backoff {
* context.actorOf(BackoffSupervisor.props(options), name)
* }}}
*/
@DoNotInherit
trait BackoffOptions {
/**
* Returns a new BackoffOptions with automatic back-off reset.
@ -555,6 +557,14 @@ trait BackoffOptions {
*/
def withReplyWhileStopped(replyWhileStopped: Any): BackoffOptions
/**
* Predicate evaluated for each message, if it returns true and the supervised actor is
* stopped then the supervisor will stop its self. If it returns true while
* the supervised actor is running then it will be forwarded to the supervised actor and
* when the supervised actor stops its self the supervisor will stop its self.
*/
def withFinalStopMessage(isFinalStopMessage: Any Boolean): BackoffOptions
/**
* Returns a new BackoffOptions with a maximum number of retries to restart the child actor.
* By default, the supervisor will retry infinitely.
@ -571,15 +581,17 @@ trait BackoffOptions {
}
private final case class BackoffOptionsImpl(
backoffType: BackoffType = RestartImpliesFailure,
backoffType: BackoffType = RestartImpliesFailure,
childProps: Props,
childName: String,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
reset: Option[BackoffReset] = None,
supervisorStrategy: OneForOneStrategy = OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider),
replyWhileStopped: Option[Any] = None) extends BackoffOptions {
reset: Option[BackoffReset] = None,
supervisorStrategy: OneForOneStrategy = OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider),
replyWhileStopped: Option[Any] = None,
finalStopMessage: Option[Any Boolean] = None
) extends BackoffOptions {
val backoffReset = reset.getOrElse(AutoReset(minBackoff))
@ -589,6 +601,7 @@ private final case class BackoffOptionsImpl(
def withDefaultStoppingStrategy = copy(supervisorStrategy = OneForOneStrategy(supervisorStrategy.maxNrOfRetries)(SupervisorStrategy.stoppingStrategy.decider))
def withReplyWhileStopped(replyWhileStopped: Any) = copy(replyWhileStopped = Some(replyWhileStopped))
def withMaxNrOfRetries(maxNrOfRetries: Int) = copy(supervisorStrategy = supervisorStrategy.withMaxNrOfRetries(maxNrOfRetries))
def withFinalStopMessage(action: Any Boolean) = copy(finalStopMessage = Some(action))
def props = {
require(minBackoff > Duration.Zero, "minBackoff must be > 0")
@ -603,10 +616,10 @@ private final case class BackoffOptionsImpl(
backoffType match {
//onFailure method in companion object
case RestartImpliesFailure
Props(new BackoffOnRestartSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy, replyWhileStopped))
Props(new BackoffOnRestartSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy, replyWhileStopped, finalStopMessage))
//onStop method in companion object
case StopImpliesFailure
Props(new BackoffSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy, replyWhileStopped))
Props(new BackoffSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy, replyWhileStopped, finalStopMessage))
}
}
}

View file

@ -291,7 +291,8 @@ final class BackoffSupervisor(
val reset: BackoffReset,
randomFactor: Double,
strategy: SupervisorStrategy,
val replyWhileStopped: Option[Any])
val replyWhileStopped: Option[Any],
val finalStopMessage: Option[Any Boolean])
extends Actor with HandleBackoff
with ActorLogging {
@ -311,6 +312,17 @@ final class BackoffSupervisor(
case s s
}
// for binary compatibility with 2.5.18
def this(
childProps: Props,
childName: String,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
reset: BackoffReset,
randomFactor: Double,
strategy: SupervisorStrategy,
replyWhileStopped: Option[Any]) = this(childProps, childName, minBackoff, maxBackoff, reset, randomFactor, strategy, replyWhileStopped, None)
// for binary compatibility with 2.4.1
def this(
childProps: Props,
@ -319,7 +331,7 @@ final class BackoffSupervisor(
maxBackoff: FiniteDuration,
randomFactor: Double,
supervisorStrategy: SupervisorStrategy) =
this(childProps, childName, minBackoff, maxBackoff, AutoReset(minBackoff), randomFactor, supervisorStrategy, None)
this(childProps, childName, minBackoff, maxBackoff, AutoReset(minBackoff), randomFactor, supervisorStrategy, None, None)
// for binary compatibility with 2.4.0
def this(
@ -333,21 +345,26 @@ final class BackoffSupervisor(
def onTerminated: Receive = {
case Terminated(ref) if child.contains(ref)
child = None
val maxNrOfRetries = strategy match {
case oneForOne: OneForOneStrategy oneForOne.maxNrOfRetries
case _ -1
}
val nextRestartCount = restartCount + 1
if (maxNrOfRetries == -1 || nextRestartCount <= maxNrOfRetries) {
val restartDelay = calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor)
context.system.scheduler.scheduleOnce(restartDelay, self, StartChild)
restartCount = nextRestartCount
} else {
log.debug(s"Terminating on restart #{} which exceeds max allowed restarts ({})", nextRestartCount, maxNrOfRetries)
if (finalStopMessageReceived) {
context.stop(self)
} else {
val maxNrOfRetries = strategy match {
case oneForOne: OneForOneStrategy oneForOne.maxNrOfRetries
case _ -1
}
val nextRestartCount = restartCount + 1
if (maxNrOfRetries == -1 || nextRestartCount <= maxNrOfRetries) {
val restartDelay = calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor)
context.system.scheduler.scheduleOnce(restartDelay, self, StartChild)
restartCount = nextRestartCount
} else {
log.debug(s"Terminating on restart #{} which exceeds max allowed restarts ({})", nextRestartCount, maxNrOfRetries)
context.stop(self)
}
}
}
def receive = onTerminated orElse handleBackoff
@ -358,9 +375,11 @@ private[akka] trait HandleBackoff { this: Actor ⇒
def childName: String
def reset: BackoffReset
def replyWhileStopped: Option[Any]
def finalStopMessage: Option[Any Boolean]
var child: Option[ActorRef] = None
var restartCount = 0
var finalStopMessageReceived = false
import BackoffSupervisor._
import context.dispatcher
@ -404,11 +423,22 @@ private[akka] trait HandleBackoff { this: Actor ⇒
context.parent ! msg
case msg child match {
case Some(c) c.forward(msg)
case None replyWhileStopped match {
case Some(r) sender ! r
case None context.system.deadLetters.forward(msg)
}
case Some(c)
c.forward(msg)
if (!finalStopMessageReceived && finalStopMessage.isDefined) {
finalStopMessageReceived = finalStopMessage.get.apply(msg)
}
case None
replyWhileStopped match {
case None context.system.deadLetters.forward(msg)
case Some(r) sender() ! r
}
finalStopMessage match {
case None
case Some(fsm)
fsm(msg)
context.stop(self)
}
}
}
}

View file

@ -22,7 +22,7 @@ object ProxyShardingSpec {
class ProxyShardingSpec extends AkkaSpec(ProxyShardingSpec.config) {
val role = "Shard"
val clusterSharding: ClusterSharding = ClusterSharding.get(system)
val clusterSharding: ClusterSharding = ClusterSharding(system)
val shardingSettings: ClusterShardingSettings =
ClusterShardingSettings.create(system)
val messageExtractor = new ShardRegion.HashCodeMessageExtractor(10) {

View file

@ -0,0 +1,102 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import akka.actor.{ Actor, ActorLogging, ActorRef, PoisonPill, Props }
import akka.cluster.Cluster
import akka.cluster.sharding.ShardRegion.Passivate
import akka.pattern.{ Backoff, BackoffSupervisor }
import akka.testkit.{ AkkaSpec, ImplicitSender }
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
object SupervisionSpec {
val config =
ConfigFactory.parseString(
"""
akka.actor.provider = "cluster"
akka.loglevel = INFO
""")
case class Msg(id: Long, msg: Any)
case class Response(self: ActorRef)
case object StopMessage
val idExtractor: ShardRegion.ExtractEntityId = {
case Msg(id, msg) (id.toString, msg)
}
val shardResolver: ShardRegion.ExtractShardId = {
case Msg(id, msg) (id % 2).toString
}
class PassivatingActor extends Actor with ActorLogging {
override def preStart(): Unit = {
log.info("Starting")
}
override def postStop(): Unit = {
log.info("Stopping")
}
override def receive: Receive = {
case "passivate"
log.info("Passivating")
context.parent ! Passivate(StopMessage)
// simulate another message causing a stop before the region sends the stop message
// e.g. a persistent actor having a persist failure while processing the next message
context.stop(self)
case "hello"
sender() ! Response(self)
case StopMessage
log.info("Received stop from region")
context.parent ! PoisonPill
}
}
}
class SupervisionSpec extends AkkaSpec(SupervisionSpec.config) with ImplicitSender {
import SupervisionSpec._
"Supervision for a sharded actor" must {
"allow passivation" in {
val supervisedProps = BackoffSupervisor.props(Backoff.onStop(
Props(new PassivatingActor()),
childName = "child",
minBackoff = 1.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2,
maxNrOfRetries = -1
).withFinalStopMessage(_ == StopMessage))
Cluster(system).join(Cluster(system).selfAddress)
val region = ClusterSharding(system).start(
"passy",
supervisedProps,
ClusterShardingSettings(system),
idExtractor,
shardResolver
)
region ! Msg(10, "hello")
val response = expectMsgType[Response](5.seconds)
watch(response.self)
region ! Msg(10, "passivate")
expectTerminated(response.self)
// This would fail before as sharded actor would be stuck passivating
region ! Msg(10, "hello")
expectMsgType[Response](20.seconds)
}
}
}

View file

@ -258,7 +258,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
val expected = PersistentRepr(MyPayload(".a."), 13, "p1", "", true, Actor.noSender)
val serializer = serialization.findSerializerFor(expected)
val deserialized = serializer.fromBinary(bytes, None).asInstanceOf[PersistentRepr]
deserialized.sender should not be (null)
deserialized.sender should not be null
val deserializedWithoutSender = deserialized.update(sender = Actor.noSender)
deserializedWithoutSender should be(expected)
}

View file

@ -47,7 +47,7 @@ class AkkaSpecSpec extends WordSpec with Matchers {
"stop correctly when sending PoisonPill to rootGuardian" in {
val system = ActorSystem("AkkaSpec2", AkkaSpec.testConf)
val spec = new AkkaSpec(system) {}
new AkkaSpec(system) {}
val latch = new TestLatch(1)(system)
system.registerOnTermination(latch.countDown())

View file

@ -219,7 +219,7 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA
}
"support receive timeout" in {
val a = TestActorRef(new ReceiveTimeoutActor(testActor))
TestActorRef(new ReceiveTimeoutActor(testActor))
expectMsg("timeout")
}