diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf index e2f38377f7..e11b6f0c2d 100644 --- a/akka-cluster-sharding/src/main/resources/reference.conf +++ b/akka-cluster-sharding/src/main/resources/reference.conf @@ -24,8 +24,9 @@ akka.cluster.sharding { remember-entities = off # If the coordinator can't store state changes it will be stopped - # and started again after this duration. - coordinator-failure-backoff = 10 s + # and started again after this duration, with an exponential back-off + # of up to 5 times this duration. + coordinator-failure-backoff = 5 s # The ShardRegion retries registration and shard location requests to the # ShardCoordinator with this interval if it does not reply. @@ -40,9 +41,9 @@ akka.cluster.sharding { # Time given to a region to acknowledge it's hosting a shard. shard-start-timeout = 10 s - # If the shard can't store state changes it will retry the action - # again after this duration. Any messages sent to an affected entity - # will be buffered until the state change is processed + # If the shard is remembering entities and can't store state changes it + # will be stopped and then started again after this duration. Any messages + # sent to an affected entity may be lost in this process. shard-failure-backoff = 10 s # If the shard is remembering entities and an entity stops itself without diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index 58a520fef2..21910fa43e 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -449,7 +449,12 @@ private[akka] class ClusterShardingGuardian extends Actor { val shardRegion = context.child(encName).getOrElse { if (context.child(cName).isEmpty) { val coordinatorProps = ShardCoordinator.props(typeName, settings, allocationStrategy) - val singletonProps = ShardCoordinatorSupervisor.props(coordinatorFailureBackoff, coordinatorProps) + val singletonProps = BackoffSupervisor.props( + childProps = coordinatorProps, + childName = "coordinator", + minBackoff = coordinatorFailureBackoff, + maxBackoff = coordinatorFailureBackoff * 5, + randomFactor = 0.2).withDeploy(Deploy.local) val singletonSettings = settings.coordinatorSingletonSettings .withSingletonName("singleton").withRole(role) context.actorOf(ClusterSingletonManager.props( @@ -652,6 +657,12 @@ object ShardRegion { private case object Retry extends ShardRegionCommand + /** + * When remembering entities and the shard stops unexpectedly (e.g. persist failure), we + * restart it after a back-off using this message. + */ + private final case class RestartShard(shardId: ShardId) + private def roleOption(role: String): Option[String] = if (role == "") None else Option(role) @@ -884,14 +895,17 @@ class ShardRegion( } else if (shardsByRef.contains(ref)) { val shardId: ShardId = shardsByRef(ref) - //Are we meant to be handing off, or is this a unknown stop? 
+ shardsByRef = shardsByRef - ref + shards = shards - shardId if (handingOff.contains(ref)) { - shardsByRef = shardsByRef - ref - shards = shards - shardId handingOff = handingOff - ref log.debug("Shard [{}] handoff complete", shardId) } else { - throw new IllegalStateException(s"Shard [$shardId] terminated while not being handed off.") + // if persist fails it will stop + log.debug("Shard [{}] terminated while not being handed off", shardId) + if (rememberEntities) { + context.system.scheduler.scheduleOnce(shardFailureBackoff, self, RestartShard(shardId)) + } } if (gracefulShutdownInProgress && shards.isEmpty && shardBuffers.isEmpty) @@ -924,31 +938,47 @@ class ShardRegion( } } - def deliverMessage(msg: Any, snd: ActorRef): Unit = { - val shard = extractShardId(msg) - regionByShard.get(shard) match { - case Some(ref) if ref == self ⇒ - getShard(shard).tell(msg, snd) - case Some(ref) ⇒ - log.debug("Forwarding request for shard [{}] to [{}]", shard, ref) - ref.tell(msg, snd) - case None if (shard == null || shard == "") ⇒ - log.warning("Shard must not be empty, dropping message [{}]", msg.getClass.getName) - context.system.deadLetters ! msg - case None ⇒ - if (!shardBuffers.contains(shard)) { - log.debug("Request shard [{}] home", shard) - coordinator.foreach(_ ! GetShardHome(shard)) + def deliverMessage(msg: Any, snd: ActorRef): Unit = + msg match { + case RestartShard(shardId) ⇒ + regionByShard.get(shardId) match { + case Some(ref) ⇒ + if (ref == self) + getShard(shardId) + case None ⇒ + if (!shardBuffers.contains(shardId)) { + log.debug("Request shard [{}] home", shardId) + coordinator.foreach(_ ! GetShardHome(shardId)) + } + val buf = shardBuffers.getOrElse(shardId, Vector.empty) + shardBuffers = shardBuffers.updated(shardId, buf :+ ((msg, snd))) } - if (totalBufferSize >= bufferSize) { - log.debug("Buffer is full, dropping message for shard [{}]", shard) - context.system.deadLetters ! msg - } else { - val buf = shardBuffers.getOrElse(shard, Vector.empty) - shardBuffers = shardBuffers.updated(shard, buf :+ ((msg, snd))) + + case _ ⇒ + val shardId = extractShardId(msg) + regionByShard.get(shardId) match { + case Some(ref) if ref == self ⇒ + getShard(shardId).tell(msg, snd) + case Some(ref) ⇒ + log.debug("Forwarding request for shard [{}] to [{}]", shardId, ref) + ref.tell(msg, snd) + case None if (shardId == null || shardId == "") ⇒ + log.warning("Shard must not be empty, dropping message [{}]", msg.getClass.getName) + context.system.deadLetters ! msg + case None ⇒ + if (!shardBuffers.contains(shardId)) { + log.debug("Request shard [{}] home", shardId) + coordinator.foreach(_ ! GetShardHome(shardId)) + } + if (totalBufferSize >= bufferSize) { + log.debug("Buffer is full, dropping message for shard [{}]", shardId) + context.system.deadLetters ! msg + } else { + val buf = shardBuffers.getOrElse(shardId, Vector.empty) + shardBuffers = shardBuffers.updated(shardId, buf :+ ((msg, snd))) + } } } - } def getShard(id: ShardId): ActorRef = shards.getOrElse( id, @@ -986,21 +1016,11 @@ class ShardRegion( private[akka] object Shard { import ShardRegion.EntityId - object State { - val Empty = State() - } - /** * A Shard command */ sealed trait ShardCommand - /** - * When a `StateChange` fails to write to the journal, we will retry it after a back - * off. - */ - final case class RetryPersistence(payload: StateChange) extends ShardCommand - /** * When an remembering entities and the entity stops without issuing a `Passivate`, we * restart it after a back off using this message. 
@@ -1022,6 +1042,10 @@ private[akka] object Shard { */ @SerialVersionUID(1L) final case class EntityStopped(entityId: EntityId) extends StateChange + object State { + val Empty = State() + } + /** * Persistent state of the Shard. */ @@ -1061,7 +1085,7 @@ private[akka] class Shard( import ShardRegion.{ handOffStopperProps, EntityId, Msg, Passivate } import ShardCoordinator.Internal.{ HandOff, ShardStopped } - import Shard.{ State, RetryPersistence, RestartEntity, EntityStopped, EntityStarted } + import Shard.{ State, RestartEntity, EntityStopped, EntityStarted } import akka.cluster.sharding.ShardCoordinator.Internal.CoordinatorMessage import akka.cluster.sharding.ShardRegion.ShardRegionCommand import akka.persistence.RecoveryCompleted @@ -1108,17 +1132,15 @@ private[akka] class Shard( } override def receiveCommand: Receive = { - case Terminated(ref) ⇒ receiveTerminated(ref) - case msg: CoordinatorMessage ⇒ receiveCoordinatorMessage(msg) - case msg: ShardCommand ⇒ receiveShardCommand(msg) - case msg: ShardRegionCommand ⇒ receiveShardRegionCommand(msg) - case PersistenceFailure(payload: StateChange, _, _) ⇒ persistenceFailure(payload) - case msg if extractEntityId.isDefinedAt(msg) ⇒ deliverMessage(msg, sender()) + case Terminated(ref) ⇒ receiveTerminated(ref) + case msg: CoordinatorMessage ⇒ receiveCoordinatorMessage(msg) + case msg: ShardCommand ⇒ receiveShardCommand(msg) + case msg: ShardRegionCommand ⇒ receiveShardRegionCommand(msg) + case msg if extractEntityId.isDefinedAt(msg) ⇒ deliverMessage(msg, sender()) } def receiveShardCommand(msg: ShardCommand): Unit = msg match { - case RetryPersistence(payload) ⇒ retryPersistence(payload) - case RestartEntity(id) ⇒ getEntity(id) + case RestartEntity(id) ⇒ getEntity(id) } def receiveShardRegionCommand(msg: ShardRegionCommand): Unit = msg match { @@ -1132,26 +1154,6 @@ private[akka] class Shard( case _ ⇒ unhandled(msg) } - def persistenceFailure(payload: StateChange): Unit = { - log.debug("Persistence of [{}] failed, will backoff and retry", payload) - if (!messageBuffers.isDefinedAt(payload.entityId)) { - messageBuffers = messageBuffers.updated(payload.entityId, Vector.empty) - } - - import context.dispatcher - context.system.scheduler.scheduleOnce(shardFailureBackoff, self, RetryPersistence(payload)) - } - - def retryPersistence(payload: StateChange): Unit = { - log.debug("Retrying Persistence of [{}]", payload) - persist(payload) { _ ⇒ - payload match { - case msg: EntityStarted ⇒ sendMsgBuffer(msg) - case msg: EntityStopped ⇒ passivateCompleted(msg) - } - } - } - def handOff(replyTo: ActorRef): Unit = handOffStopper match { case Some(_) ⇒ log.warning("HandOff shard [{}] received during existing handOff", shardId) case None ⇒ @@ -1287,45 +1289,6 @@ private[akka] class Shard( } } -/** - * INTERNAL API - * @see [[ClusterSharding$ ClusterSharding extension]] - */ -private[akka] object ShardCoordinatorSupervisor { - /** - * Factory method for the [[akka.actor.Props]] of the [[ShardCoordinator]] actor. 
- */ - def props(failureBackoff: FiniteDuration, coordinatorProps: Props): Props = - Props(new ShardCoordinatorSupervisor(failureBackoff, coordinatorProps)) - .withDeploy(Deploy.local) - - /** - * INTERNAL API - */ - private case object StartCoordinator -} - -/** - * INTERNAL API - */ -private[akka] class ShardCoordinatorSupervisor(failureBackoff: FiniteDuration, coordinatorProps: Props) extends Actor { - import ShardCoordinatorSupervisor._ - - def startCoordinator(): Unit = { - // it will be stopped in case of PersistenceFailure - context.watch(context.actorOf(coordinatorProps, "coordinator")) - } - - override def preStart(): Unit = startCoordinator() - - def receive = { - case Terminated(_) ⇒ - import context.dispatcher - context.system.scheduler.scheduleOnce(failureBackoff, self, StartCoordinator) - case StartCoordinator ⇒ startCoordinator() - } -} - /** * @see [[ClusterSharding$ ClusterSharding extension]] */ diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala index 8cc98a7e58..e9bdc2195a 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala @@ -177,7 +177,7 @@ class ClusterShardingLeavingSpec extends MultiNodeSpec(ClusterShardingLeavingSpe runOn(first) { watch(region) - expectTerminated(region, 5.seconds) + expectTerminated(region, 15.seconds) } enterBarrier("stopped") diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index 8c29ad39f3..b5b0dc9f3e 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -26,6 +26,7 @@ import java.io.File import org.apache.commons.io.FileUtils import akka.cluster.singleton.ClusterSingletonManager import akka.cluster.singleton.ClusterSingletonManagerSettings +import akka.persistence.BackoffSupervisor object ClusterShardingSpec extends MultiNodeConfig { val controller = role("controller") @@ -198,9 +199,14 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult List("counter", "rebalancingCounter", "PersistentCounterEntities", "AnotherPersistentCounter", "PersistentCounter", "RebalancingPersistentCounter", "AutoMigrateRegionTest").foreach { typeName ⇒ val rebalanceEnabled = typeName.toLowerCase.startsWith("rebalancing") + val singletonProps = BackoffSupervisor.props( + childProps = coordinatorProps(typeName, rebalanceEnabled), + childName = "coordinator", + minBackoff = 5.seconds, + maxBackoff = 5.seconds, + randomFactor = 0.1).withDeploy(Deploy.local) system.actorOf(ClusterSingletonManager.props( - singletonProps = ShardCoordinatorSupervisor.props(failureBackoff = 5.seconds, - coordinatorProps(typeName, rebalanceEnabled)), + singletonProps, terminationMessage = PoisonPill, settings = ClusterSingletonManagerSettings(system)), name = typeName + "Coordinator") diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index e70f00b610..3ad450f5b5 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ 
b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -4,6 +4,8 @@ package docs.persistence; +import java.util.concurrent.TimeUnit; +import scala.concurrent.duration.Duration; import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -12,8 +14,8 @@ import akka.actor.UntypedActor; import akka.japi.Function; import akka.japi.Procedure; import akka.persistence.*; -import scala.Option; +import scala.Option; import java.io.Serializable; public class PersistenceDocTest { @@ -104,6 +106,23 @@ public class PersistenceDocTest { } //#recovery-completed } + + abstract class MyActor extends UntypedPersistentActor { + //#backoff + @Override + public void preStart() throws Exception { + final Props childProps = Props.create(MyPersistentActor1.class); + final Props props = BackoffSupervisor.props( + childProps, + "myActor", + Duration.create(3, TimeUnit.SECONDS), + Duration.create(30, TimeUnit.SECONDS), + 0.2); + getContext().actorOf(props, "mySupervisor"); + super.preStart(); + } + //#backoff + } }; static Object fullyDisabledRecoveryExample = new Object() { diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index 9cea3522ae..cf650d0bda 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -137,9 +137,14 @@ When persisting events with ``persist`` it is guaranteed that the persistent act the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist`` calls in context of a single command. -If persistence of an event fails, the persistent actor will be stopped by throwing :class:`ActorKilledException`. -This can be customized by handling ``PersistenceFailure`` message in ``receiveCommand`` and/or defining -``supervisorStrategy`` in parent actor. +If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default) +and the actor will unconditionally be stopped. The reason that it cannot resume when persist fails +is that it is unknown if the event was actually persisted or not, and therefore it is in an inconsistent +state. Restarting on persistence failures will most likely fail anyway, since the journal is probably +unavailable. It is better to stop the actor and after a back-off timeout start it again. The +``akka.persistence.BackoffSupervisor`` actor is provided to support such restarts. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#backoff The easiest way to run this example yourself is to download `Typesafe Activator `_ and open the tutorial named `Akka Persistence Samples in Java with Lambdas `_. @@ -215,9 +220,8 @@ and before any other received messages. .. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recovery-completed -If there is a problem with recovering the state of the actor from the journal, the actor will be -sent a :class:`RecoveryFailure` message that it can choose to handle in ``receiveRecover``. If the -actor doesn't handle the :class:`RecoveryFailure` message it will be stopped by throwing :class:`ActorKilledException`. +If there is a problem with recovering the state of the actor from the journal, the error will be logged and the +actor will be stopped. 
Relaxed local consistency requirements and high throughput use-cases diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index c47bb7cdc1..2f48d5648b 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -138,9 +138,14 @@ When persisting events with ``persist`` it is guaranteed that the persistent act the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist`` calls in context of a single command. -If persistence of an event fails, the persistent actor will be stopped by throwing :class:`ActorKilledException`. -This can be customized by handling ``PersistenceFailure`` message in ``onReceiveCommand`` and/or defining -``supervisorStrategy`` in parent actor. +If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default) +and the actor will unconditionally be stopped. The reason that it cannot resume when persist fails +is that it is unknown if the event was actually persisted or not, and therefore it is in an inconsistent +state. Restarting on persistence failures will most likely fail anyway, since the journal is probably +unavailable. It is better to stop the actor and after a back-off timeout start it again. The +``akka.persistence.BackoffSupervisor`` actor is provided to support such restarts. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#backoff The easiest way to run this example yourself is to download `Typesafe Activator `_ and open the tutorial named `Akka Persistence Samples with Java `_. @@ -217,9 +222,8 @@ and before any other received messages. .. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-completed -If there is a problem with recovering the state of the actor from the journal, the actor will be -sent a :class:`RecoveryFailure` message that it can choose to handle in ``receiveRecover``. If the -actor doesn't handle the :class:`RecoveryFailure` message it will be stopped by throwing :class:`ActorKilledException`. +If there is a problem with recovering the state of the actor from the journal, the error will be logged and the +actor will be stopped. .. _persist-async-java: diff --git a/akka-docs/rst/scala/cluster-sharding.rst b/akka-docs/rst/scala/cluster-sharding.rst index 6793d62793..0f7d17ec19 100644 --- a/akka-docs/rst/scala/cluster-sharding.rst +++ b/akka-docs/rst/scala/cluster-sharding.rst @@ -265,7 +265,7 @@ the ``rememberEntities`` flag to true in ``ClusterShardingSettings`` when callin ``ClusterSharding.start``. When configured to remember entities, whenever a ``Shard`` is rebalanced onto another node or recovers after a crash it will recreate all the entities which were previously running in that ``Shard``. To permanently stop entities, -a ``Passivate`` message must be sent to the parent the ``Shard``, otherwise the +a ``Passivate`` message must be sent to the parent of the entity actor, otherwise the entity will be automatically restarted after the entity restart backoff specified in the configuration. 
diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index c4fe4ff5bf..47a2dca14c 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -100,6 +100,23 @@ object PersistenceDocSpec { } } + object Backoff { + abstract class MyActor extends Actor { + import PersistAsync.MyPersistentActor + //#backoff + val childProps = Props[MyPersistentActor] + val props = BackoffSupervisor.props( + childProps, + childName = "myActor", + minBackoff = 3.seconds, + maxBackoff = 30.seconds, + randomFactor = 0.2) + context.actorOf(props, name = "mySupervisor") + //#backoff + } + + } + object AtLeastOnce { //#at-least-once-example import akka.actor.{ Actor, ActorPath } @@ -288,4 +305,4 @@ object PersistenceDocSpec { //#view-update } -} \ No newline at end of file +} diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 4654732a76..3b29dea7ea 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -123,9 +123,14 @@ When persisting events with ``persist`` it is guaranteed that the persistent act the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist`` calls in context of a single command. -If persistence of an event fails, the persistent actor will be stopped by throwing :class:`ActorKilledException`. -This can be customized by handling ``PersistenceFailure`` message in ``receiveCommand`` and/or defining -``supervisorStrategy`` in parent actor. +If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default) +and the actor will unconditionally be stopped. The reason that it cannot resume when persist fails +is that it is unknown if the event was actually persisted or not, and therefore it is in an inconsistent +state. Restarting on persistence failures will most likely fail anyway, since the journal is probably +unavailable. It is better to stop the actor and after a back-off timeout start it again. The +``akka.persistence.BackoffSupervisor`` actor is provided to support such restarts. + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#backoff The easiest way to run this example yourself is to download `Typesafe Activator `_ and open the tutorial named `Akka Persistence Samples with Scala `_. @@ -201,9 +206,8 @@ and before any other received messages. .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-completed -If there is a problem with recovering the state of the actor from the journal, the actor will be -sent a :class:`RecoveryFailure` message that it can choose to handle in ``receiveRecover``. If the -actor doesn't handle the :class:`RecoveryFailure` message it will be stopped by throwing :class:`ActorKilledException`. +If there is a problem with recovering the state of the actor from the journal, the error will be logged and the +actor will be stopped. .. 
_persist-async-scala: diff --git a/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala b/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala index b39df1425b..53020b460c 100644 --- a/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala @@ -300,7 +300,7 @@ trait AtLeastOnceDelivery extends Eventsourced { super.aroundPostStop() } - override private[persistence] def onReplaySuccess(): Unit = { + override private[akka] def onReplaySuccess(): Unit = { redeliverOverdue() super.onReplaySuccess() } diff --git a/akka-persistence/src/main/scala/akka/persistence/BackoffSupervisor.scala b/akka-persistence/src/main/scala/akka/persistence/BackoffSupervisor.scala new file mode 100644 index 0000000000..0388bfc906 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/BackoffSupervisor.scala @@ -0,0 +1,148 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.persistence + +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.forkjoin.ThreadLocalRandom +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.DeadLetterSuppression +import akka.actor.Props +import akka.actor.Terminated +import java.util.Optional +import scala.concurrent.duration.Duration + +object BackoffSupervisor { + + /** + * @param childProps the [[akka.actor.Props]] of the child actor that + * will be started and supervised + * @param childName name of the child actor + * @param minBackoff minimum (initial) duration until the child actor will + * started again, if it is terminated + * @param maxBackoff the exponential back-off is capped to this duration + * @param randomFactor after calculation of the exponential back-off an additional + * random delay based on this factor is added, e.g. 0.2 adds up to 20% + * delay + */ + def props( + childProps: Props, + childName: String, + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double): Props = { + require(minBackoff > Duration.Zero, "minBackoff must be > 0") + require(maxBackoff >= minBackoff, "maxBackoff must be >= minBackoff") + require(0.0 <= randomFactor && randomFactor <= 1.0, "randomFactor must be between 0.0 and 1.0") + Props(new BackoffSupervisor(childProps, childName, minBackoff, maxBackoff, randomFactor)) + } + + /** + * Send this message to the [[BackoffSupervisor]] and it will reply with + * [[BackoffSupervisor.CurrentChild]] containing the `ActorRef` of the current child, if any. + */ + final case object GetCurrentChild + + /** + * Java API: Send this message to the [[BackoffSupervisor]] and it will reply with + * [[BackoffSupervisor.CurrentChild]] containing the `ActorRef` of the current child, if any. + */ + def getCurrentChild = GetCurrentChild + + final case class CurrentChild(ref: Option[ActorRef]) { + /** + * Java API: The `ActorRef` of the current child, if any + */ + def getRef: Optional[ActorRef] = Optional.ofNullable(ref.orNull) + } + + private case object StartChild extends DeadLetterSuppression + private case class ResetRestartCount(current: Int) extends DeadLetterSuppression +} + +/** + * This actor can be used to supervise a child actor and start it again + * after a back-off duration if the child actor is stopped. This is useful + * for persistent actors, which are stopped in case of persistence failures. 
+ * Just restarting them immediately would probably fail again (since the data + * store is probably unavailable). It is better to try again after a delay. + * + * It supports exponential back-off between the given `minBackoff` and + * `maxBackoff` durations. For example, if `minBackoff` is 3 seconds and + * `maxBackoff` 30 seconds the start attempts will be delayed with + * 3, 6, 12, 24, 30, 30 seconds. The exponential back-off counter is reset + * if the actor is not terminated within the `minBackoff` duration. + * + * In addition to the calculated exponential back-off an additional + * random delay based on the given `randomFactor` is added, e.g. 0.2 adds up to 20% + * delay. The reason for adding a random delay is to avoid all failing + * actors hitting the backend resource at the same time. + * + * You can retrieve the current child `ActorRef` by sending `BackoffSupervisor.GetCurrentChild` + * message to this actor and it will reply with [[BackoffSupervisor.CurrentChild]] containing the + * `ActorRef` of the current child, if any. + * + * The `BackoffSupervisor` forwards all other messages to the child, if it is currently running. + * + * The child can stop itself and send a [[akka.actor.PoisonPill]] to the parent supervisor + * if it wants to do an intentional stop. + */ +final class BackoffSupervisor( + childProps: Props, + childName: String, + minBackoff: FiniteDuration, + maxBackoff: FiniteDuration, + randomFactor: Double) + extends Actor { + + import BackoffSupervisor._ + import context.dispatcher + + private var child: Option[ActorRef] = None + private var restartCount = 0 + + override def preStart(): Unit = { + startChild() + super.preStart() + } + + def startChild(): Unit = + if (child == None) { + child = Some(context.watch(context.actorOf(childProps, childName))) + } + + def receive = { + case Terminated(ref) if child.contains(ref) ⇒ + child = None + val rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor + val restartDelay = + if (restartCount >= 30) // Duration overflow protection (> 100 years) + maxBackoff + else + (maxBackoff.min(minBackoff * math.pow(2, restartCount)) * rnd) match { + case f: FiniteDuration ⇒ f + case _ ⇒ maxBackoff + } + context.system.scheduler.scheduleOnce(restartDelay, self, StartChild) + restartCount += 1 + + case StartChild ⇒ + startChild() + context.system.scheduler.scheduleOnce(minBackoff, self, ResetRestartCount(restartCount)) + + case ResetRestartCount(current) ⇒ + if (current == restartCount) + restartCount = 0 + + case GetCurrentChild ⇒ + sender() ! CurrentChild(child) + + case msg ⇒ child match { + case Some(c) ⇒ c.forward(msg) + case None ⇒ context.system.deadLetters.forward(msg) + } + } +} + diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index f5e79be932..8f7eee60b7 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -7,7 +7,6 @@ package akka.persistence import java.util.concurrent.atomic.AtomicInteger import scala.collection.immutable import scala.util.control.NonFatal -import akka.actor.ActorKilledException import akka.actor.Stash import akka.actor.StashFactory import akka.event.Logging @@ -92,15 +91,46 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * Called whenever a message replay succeeds. * May be implemented by subclass. 
*/ - private[persistence] def onReplaySuccess(): Unit = () + private[akka] def onReplaySuccess(): Unit = () /** - * INTERNAL API. - * Called whenever a message replay fails. - * May be implemented by subclass. + * Called whenever a message replay fails. By default it logs the error. + * + * Subclass may override to customize logging. + * + * The actor is always stopped after this method has been invoked. + * * @param cause failure cause. + * @param event the event that was processed in `receiveRecover`, if the exception + * was thrown there */ - private[persistence] def onReplayFailure(cause: Throwable): Unit = () + protected def onReplayFailure(cause: Throwable, event: Option[Any]): Unit = + event match { + case Some(evt) ⇒ + log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}] for " + + "persistenceId [{}].", evt.getClass.getName, lastSequenceNr, persistenceId) + case None ⇒ + log.error(cause, "Persistence failure when replaying events for persistenceId [{}]. " + + "Last known sequence number [{}]", persistenceId, lastSequenceNr) + } + + /** + * Called when persist fails. By default it logs the error. + * Subclass may override to customize logging and for example send negative + * acknowledgment to sender. + * + * The actor is always stopped after this method has been invoked. + * + * Note that the event may or may not have been saved, depending on the type of + * failure. + * + * @param cause failure cause. + * @param event the event that was to be persisted + */ + protected def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = { + log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].", + event.getClass.getName, seqNr, persistenceId) + } /** * User-overridable callback. Called when a persistent actor is started or restarted. @@ -156,17 +186,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas override def unhandled(message: Any): Unit = { message match { case RecoveryCompleted | ReadHighestSequenceNrSuccess | ReadHighestSequenceNrFailure ⇒ // mute - case RecoveryFailure(cause) ⇒ - val errorMsg = s"PersistentActor killed after recovery failure (persisten id = [${persistenceId}]). " + - "To avoid killing persistent actors on recovery failure, a PersistentActor must handle RecoveryFailure messages. " + - "RecoveryFailure was caused by: " + cause - throw new ActorKilledException(errorMsg) - case PersistenceFailure(payload, sequenceNumber, cause) ⇒ - val errorMsg = "PersistentActor killed after persistence failure " + - s"(persistent id = [${persistenceId}], sequence nr = [${sequenceNumber}], payload class = [${payload.getClass.getName}]). " + - "To avoid killing persistent actors on persistence failure, a PersistentActor must handle PersistenceFailure messages. " + - "PersistenceFailure was caused by: " + cause - throw new ActorKilledException(errorMsg) case m ⇒ super.unhandled(m) } } @@ -203,9 +222,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * should not perform actions that may fail, such as interacting with external services, * for example. * - * If recovery fails, the actor will be stopped by throwing ActorKilledException. - * This can be customized by handling [[RecoveryFailure]] message in `receiveRecover` - * and/or defining `supervisorStrategy` in parent actor. 
+ * If there is a problem with recovering the state of the actor from the journal, the error + * will be logged and the actor will be stopped. * * @see [[Recover]] */ @@ -233,9 +251,12 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * Within an event handler, applications usually update persistent actor state using persisted event * data, notify listeners and reply to command senders. * - * If persistence of an event fails, the persistent actor will be stopped by throwing ActorKilledException. - * This can be customized by handling [[PersistenceFailure]] message in [[#receiveCommand]] - * and/or defining `supervisorStrategy` in parent actor. + * If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will + * unconditionally be stopped. The reason that it cannot resume when persist fails is that it + * is unknown if the event was actually persisted or not, and therefore it is in an inconsistent + * state. Restarting on persistence failures will most likely fail anyway, since the journal + * is probably unavailable. It is better to stop the actor and after a back-off timeout start + * it again. * * @param event event to be persisted * @param handler handler for each persisted `event` @@ -270,9 +291,12 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * event is the sender of the corresponding command. This means that one can reply to a command * sender within an event `handler`. * - * If persistence of an event fails, the persistent actor will be stopped by throwing ActorKilledException. - * This can be customized by handling [[PersistenceFailure]] message in [[#receiveCommand]] - * and/or defining `supervisorStrategy` in parent actor. + * If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will + * unconditionally be stopped. The reason that it cannot resume when persist fails is that it + * is unknown if the event was actually persisted or not, and therefore it is in an inconsistent + * state. Restarting on persistence failures will most likely fail anyway, since the journal + * is probably unavailable. It is better to stop the actor and after a back-off timeout start + * it again. * * @param event event to be persisted * @param handler handler for each persisted `event` @@ -304,10 +328,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * * If there are no pending persist handler calls, the handler will be called immediately. * - * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the - * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. - * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor by - * throwing ActorKilledException, thus the handlers will not be run. + * If persistence of an earlier event fails, the persistent actor will stop, and the `handler` + * will not be run. 
* * @param event event to be handled in the future, when preceding persist operations have been processes * @param handler handler for the given `event` @@ -398,8 +420,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas _receiveRecover(payload) case s: SnapshotOffer if _receiveRecover.isDefinedAt(s) ⇒ _receiveRecover(s) - case f: RecoveryFailure if _receiveRecover.isDefinedAt(f) ⇒ - _receiveRecover(f) case RecoveryCompleted if _receiveRecover.isDefinedAt(RecoveryCompleted) ⇒ _receiveRecover(RecoveryCompleted) } @@ -428,12 +448,9 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * events. * * If replay succeeds it switches to `initializing` state and requests the highest stored sequence - * number from the journal. Otherwise RecoveryFailure is emitted. + * number from the journal. Otherwise the actor is stopped. * If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onReplayFailure`. * - * If processing of a replayed event fails, the exception is caught and - * stored for later `RecoveryFailure` message and state is changed to `recoveryFailed`. - * * All incoming messages are stashed. */ private def replayStarted(recoveryBehavior: Receive) = new State { @@ -448,50 +465,19 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas Eventsourced.super.aroundReceive(recoveryBehavior, p) } catch { case NonFatal(t) ⇒ - changeState(replayFailed(recoveryBehavior, t, p)) + try onReplayFailure(t, Some(p.payload)) finally context.stop(self) } case ReplayMessagesSuccess ⇒ onReplaySuccess() // callback for subclass implementation changeState(initializing(recoveryBehavior)) journal ! ReadHighestSequenceNr(lastSequenceNr, persistenceId, self) case ReplayMessagesFailure(cause) ⇒ - // in case the actor resumes the state must be initializing - changeState(initializing(recoveryBehavior)) - journal ! ReadHighestSequenceNr(lastSequenceNr, persistenceId, self) - - onReplayFailure(cause) // callback for subclass implementation - Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryFailure(cause)(None)) + try onReplayFailure(cause, event = None) finally context.stop(self) case other ⇒ internalStash.stash() } } - /** - * Consumes remaining replayed messages and then emits RecoveryFailure to the - * `receiveRecover` behavior. - */ - private def replayFailed(recoveryBehavior: Receive, cause: Throwable, failed: PersistentRepr) = new State { - - override def toString: String = "replay failed" - override def recoveryRunning: Boolean = true - - override def stateReceive(receive: Receive, message: Any) = message match { - case ReplayedMessage(p) ⇒ updateLastSequenceNr(p) - case ReplayMessagesSuccess | ReplayMessagesFailure(_) ⇒ replayCompleted() - case r: Recover ⇒ // ignore - case _ ⇒ internalStash.stash() - } - - def replayCompleted(): Unit = { - // in case the actor resumes the state must be initializing - changeState(initializing(recoveryBehavior)) - journal ! ReadHighestSequenceNr(failed.sequenceNr, persistenceId, self) - - Eventsourced.super.aroundReceive(recoveryBehavior, - RecoveryFailure(cause)(Some((failed.sequenceNr, failed.payload)))) - } - } - /** * Processes the highest stored sequence number response from the journal and then switches * to `processingCommands` state. 
@@ -536,10 +522,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas // instanceId mismatch can happen for persistAsync and defer in case of actor restart // while message is in flight, in that case the handler has already been discarded if (id == instanceId) { - try { - Eventsourced.super.aroundReceive(receive, PersistenceFailure(p.payload, p.sequenceNr, cause)) // stops actor by default - onWriteMessageComplete(err = false) - } catch { case NonFatal(e) ⇒ onWriteMessageComplete(err = true); throw e } + onWriteMessageComplete(err = false) + try onPersistFailure(cause, p.payload, p.sequenceNr) finally context.stop(self) } case LoopMessageSuccess(l, id) ⇒ // instanceId mismatch can happen for persistAsync and defer in case of actor restart @@ -550,7 +534,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas onWriteMessageComplete(err = false) } catch { case NonFatal(e) ⇒ onWriteMessageComplete(err = true); throw e } } - case WriteMessagesSuccessful | WriteMessagesFailed(_) ⇒ // FIXME PN: WriteMessagesFailed? + case WriteMessagesSuccessful | WriteMessagesFailed(_) ⇒ + // FIXME PN: WriteMessagesFailed? if (journalBatch.isEmpty) writeInProgress = false else flushJournalBatch() } diff --git a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala index 28b8548c25..8f460c4c17 100644 --- a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala @@ -108,21 +108,21 @@ private[persistence] object JournalProtocol { * @param persistent replayed message. */ final case class ReplayedMessage(persistent: PersistentRepr) - extends Response + extends Response with DeadLetterSuppression /** * Reply message to a successful [[ReplayMessages]] request. This reply is sent to the requestor * after all [[ReplayedMessage]] have been sent (if any). */ case object ReplayMessagesSuccess - extends Response + extends Response with DeadLetterSuppression /** * Reply message to a failed [[ReplayMessages]] request. This reply is sent to the requestor * if a replay could not be successfully completed. */ final case class ReplayMessagesFailure(cause: Throwable) - extends Response + extends Response with DeadLetterSuppression /** * Request to read the highest stored sequence number of a given persistent actor. diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala index 76c0d3e56b..f8ca8a9685 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala @@ -9,39 +9,6 @@ import akka.japi.Procedure import akka.actor.AbstractActor import akka.japi.Util -/** - * Sent to a [[PersistentActor]] if a journal fails to write a persistent message. If - * not handled, an `akka.actor.ActorKilledException` is thrown by that persistent actor. - * - * @param payload payload of the persistent message. - * @param sequenceNr sequence number of the persistent message. - * @param cause failure cause. - */ -@SerialVersionUID(1L) -case class PersistenceFailure(payload: Any, sequenceNr: Long, cause: Throwable) - -/** - * Sent to a [[PersistentActor]] if a journal fails to replay messages or fetch that persistent actor's - * highest sequence number. If not handled, the actor will be stopped. 
- * - * Contains the [[#sequenceNr]] of the message that could not be replayed, if it - * failed at a specific message. - * - * Contains the [[#payload]] of the message that could not be replayed, if it - * failed at a specific message. - */ -@SerialVersionUID(1L) -case class RecoveryFailure(cause: Throwable)(failingMessage: Option[(Long, Any)]) { - override def toString: String = failingMessage match { - case Some((sequenceNr, payload)) ⇒ s"RecoveryFailure(${cause.getMessage},$sequenceNr,$payload)" - case None ⇒ s"RecoveryFailure(${cause.getMessage})" - } - - def sequenceNr: Option[Long] = failingMessage.map { case (snr, _) ⇒ snr } - - def payload: Option[Any] = failingMessage.map { case (_, payload) ⇒ payload } -} - abstract class RecoveryCompleted /** * Sent to a [[PersistentActor]] when the journal replay has been finished. @@ -150,8 +117,12 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit * Within an event handler, applications usually update persistent actor state using persisted event * data, notify listeners and reply to command senders. * - * If persistence of an event fails, the persistent actor will be stopped. This can be customized by - * handling [[PersistenceFailure]] in [[onReceiveCommand]]. + * If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will + * unconditionally be stopped. The reason that it cannot resume when persist fails is that it + * is unknown if the even was actually persisted or not, and therefore it is in an inconsistent + * state. Restarting on persistent failures will most likely fail anyway, since the journal + * is probably unavailable. It is better to stop the actor and after a back-off timeout start + * it again. * * @param event event to be persisted. * @param handler handler for each persisted `event` @@ -183,8 +154,12 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit * event is the sender of the corresponding command. This means that one can reply to a command * sender within an event `handler`. * - * If persistence of an event fails, the persistent actor will be stopped. This can be customized by - * handling [[PersistenceFailure]] in [[receiveCommand]]. + * If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will + * unconditionally be stopped. The reason that it cannot resume when persist fails is that it + * is unknown if the even was actually persisted or not, and therefore it is in an inconsistent + * state. Restarting on persistent failures will most likely fail anyway, since the journal + * is probably unavailable. It is better to stop the actor and after a back-off timeout start + * it again. * * @param event event to be persisted * @param handler handler for each persisted `event` @@ -214,9 +189,7 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit * * If there are no pending persist handler calls, the handler will be called immediately. * - * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the - * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. - * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers + * If persistence of an earlier event fails, the persistent actor will stop, and the `handler` * will not be run. 
* * @param event event to be handled in the future, when preceding persist operations have been processes @@ -234,8 +207,8 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit * should not perform actions that may fail, such as interacting with external services, * for example. * - * If recovery fails, the actor will be stopped. This can be customized by - * handling [[RecoveryFailure]]. + * If there is a problem with recovering the state of the actor from the journal, the error + * will be logged and the actor will be stopped. * * @see [[Recover]] */ @@ -271,8 +244,12 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo * Within an event handler, applications usually update persistent actor state using persisted event * data, notify listeners and reply to command senders. * - * If persistence of an event fails, the persistent actor will be stopped. This can be customized by - * handling [[PersistenceFailure]] in [[receiveCommand]]. + * If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will + * unconditionally be stopped. The reason that it cannot resume when persist fails is that it + * is unknown if the even was actually persisted or not, and therefore it is in an inconsistent + * state. Restarting on persistent failures will most likely fail anyway, since the journal + * is probably unavailable. It is better to stop the actor and after a back-off timeout start + * it again. * * @param event event to be persisted. * @param handler handler for each persisted `event` @@ -299,8 +276,12 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo * call to `persistAsync` and executing it's `handler`. This asynchronous, non-stashing, version of * of persist should be used when you favor throughput over the strict ordering guarantees that `persist` guarantees. * - * If persistence of an event fails, the persistent actor will be stopped. This can be customized by - * handling [[PersistenceFailure]] in [[receiveCommand]]. + * If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will + * unconditionally be stopped. The reason that it cannot resume when persist fails is that it + * is unknown if the even was actually persisted or not, and therefore it is in an inconsistent + * state. Restarting on persistent failures will most likely fail anyway, since the journal + * is probably unavailable. It is better to stop the actor and after a back-off timeout start + * it again. * * @param event event to be persisted * @param handler handler for each persisted `event` @@ -330,9 +311,7 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo * * If there are no pending persist handler calls, the handler will be called immediately. * - * In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the - * [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards. - * If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers + * If persistence of an earlier event fails, the persistent actor will stop, and the `handler` * will not be run. 
* * @param event event to be handled in the future, when preceding persist operations have been processes diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala index f04740ba14..1c02334e45 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala @@ -8,12 +8,12 @@ import scala.concurrent.duration._ import scala.util.control.NonFatal import akka.actor.AbstractActor import akka.actor.Actor -import akka.actor.ActorKilledException import akka.actor.Cancellable import akka.actor.Stash import akka.actor.StashFactory import akka.actor.UntypedActor import akka.dispatch.Envelope +import akka.actor.ActorLogging /** * Instructs a [[PersistentView]] to update itself. This will run a single incremental message replay with @@ -80,7 +80,8 @@ private[akka] object PersistentView { * - [[autoUpdate]] for turning automated updates on or off * - [[autoUpdateReplayMax]] for limiting the number of replayed messages per view update cycle */ -trait PersistentView extends Actor with Snapshotter with Stash with StashFactory with PersistenceIdentity { +trait PersistentView extends Actor with Snapshotter with Stash with StashFactory with PersistenceIdentity + with ActorLogging { import PersistentView._ import JournalProtocol._ import SnapshotProtocol.LoadSnapshotResult @@ -205,16 +206,15 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory super.postStop() } - override def unhandled(message: Any): Unit = { - message match { - case RecoveryCompleted ⇒ // mute - case RecoveryFailure(cause) ⇒ - val errorMsg = s"PersistentView killed after recovery failure (persisten id = [${persistenceId}]). " + - "To avoid killing persistent actors on recovery failure, a PersistentView must handle RecoveryFailure messages. " + - "RecoveryFailure was caused by: " + cause - throw new ActorKilledException(errorMsg) - case m ⇒ super.unhandled(m) - } + /** + * Called whenever a message replay fails. By default it logs the error. + * Subclass may override to customize logging. + * The `PersistentView` will not stop or throw exception due to this. + * It will try again on next update. + */ + protected def onReplayError(cause: Throwable): Unit = { + log.error(cause, "Persistence view failure when replaying events for persistenceId [{}]. " + + "Last known sequence number [{}]", persistenceId, lastSequenceNr) } private def changeState(state: State): Unit = { @@ -265,7 +265,6 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory sso.foreach { case SelectedSnapshot(metadata, snapshot) ⇒ setLastSequenceNr(metadata.sequenceNr) - // Since we are recovering we can ignore the receive behavior from the stack PersistentView.super.aroundReceive(receive, SnapshotOffer(metadata, snapshot)) } changeState(replayStarted(await = true)) @@ -279,64 +278,70 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory * events. * * If replay succeeds it switches to `initializing` state and requests the highest stored sequence - * number from the journal. Otherwise RecoveryFailure is emitted. - * If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onReplayFailure`. + * number from the journal. + * + * If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onReplayError` is called and + * remaining replay events are consumed (ignored). 
* * If processing of a replayed event fails, the exception is caught and - stored for later `RecoveryFailure` message and state is changed to `recoveryFailed`. + stored, and the state is changed to `ignoreRemainingReplay`. - * - * All incoming messages are stashed. + * All incoming messages are stashed when `await` is true. */ private def replayStarted(await: Boolean) = new State { override def toString: String = s"replay started" override def recoveryRunning: Boolean = true - private var stashUpdate = await - override def stateReceive(receive: Receive, message: Any) = message match { - case ScheduledUpdate(_) ⇒ // ignore - case Update(false, _) ⇒ // ignore - case u @ Update(true, _) if !stashUpdate ⇒ - stashUpdate = true - internalStash.stash() - case r: Recover ⇒ // ignore case ReplayedMessage(p) ⇒ try { updateLastSequenceNr(p) PersistentView.super.aroundReceive(receive, p.payload) } catch { case NonFatal(t) ⇒ - changeState(replayFailed(t, p)) + changeState(ignoreRemainingReplay(t)) } case ReplayMessagesSuccess ⇒ - onReplayComplete(await) + onReplayComplete() case ReplayMessagesFailure(cause) ⇒ - onReplayComplete(await) - PersistentView.super.aroundReceive(receive, RecoveryFailure(cause)(None)) + try onReplayError(cause) finally onReplayComplete() + case ScheduledUpdate(_) ⇒ // ignore + case r: Recover ⇒ // ignore + case Update(a, _) ⇒ + if (a) + internalStash.stash() case other ⇒ - internalStash.stash() + if (await) + internalStash.stash() + else { + try { + PersistentView.super.aroundReceive(receive, other) + } catch { + case NonFatal(t) ⇒ + changeState(ignoreRemainingReplay(t)) + } + } } /** - * Switches to `idle` state and schedules the next update if `autoUpdate` returns `true`. + * Switches to `idle` state. */ - private def onReplayComplete(await: Boolean): Unit = { + private def onReplayComplete(): Unit = { changeState(idle) - if (await) internalStash.unstashAll() + internalStash.unstashAll() } } /** - * Consumes remaining replayed messages and then emits RecoveryFailure to the - * `receive` behavior. + * Consumes remaining replayed messages and then throws the exception. 
*/ - private def replayFailed(cause: Throwable, failed: PersistentRepr) = new State { + private def ignoreRemainingReplay(cause: Throwable) = new State { override def toString: String = "replay failed" override def recoveryRunning: Boolean = true override def stateReceive(receive: Receive, message: Any) = message match { - case ReplayedMessage(p) ⇒ updateLastSequenceNr(p) + case ReplayedMessage(p) ⇒ case ReplayMessagesFailure(_) ⇒ replayCompleted(receive) // journal couldn't tell the maximum stored sequence number, hence the next @@ -351,8 +356,8 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory def replayCompleted(receive: Receive): Unit = { // in case the actor resumes the state must be `idle` changeState(idle) - - PersistentView.super.aroundReceive(receive, RecoveryFailure(cause)(Some((failed.sequenceNr, failed.payload)))) + internalStash.unstashAll() + throw cause } } diff --git a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala index 02e190ad73..b22bfbde7e 100644 --- a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala @@ -36,8 +36,6 @@ object AtLeastOnceDeliveryFailureSpec { case class Done(ints: Vector[Int]) case class Ack(i: Int) - case class ProcessingFailure(i: Int) - case class JournalingFailure(i: Int) case class Msg(deliveryId: Long, i: Int) case class Confirm(deliveryId: Long, i: Int) @@ -76,7 +74,6 @@ object AtLeastOnceDeliveryFailureSpec { def receiveCommand: Receive = { case i: Int ⇒ - val failureRate = if (recoveryRunning) replayProcessingFailureRate else liveProcessingFailureRate if (contains(i)) { log.debug(debugMessage(s"ignored duplicate ${i}")) sender() ! Ack(i) @@ -84,29 +81,24 @@ object AtLeastOnceDeliveryFailureSpec { persist(MsgSent(i)) { evt ⇒ updateState(evt) sender() ! Ack(i) - if (shouldFail(failureRate)) - throw new TestException(debugMessage(s"failed at payload ${i}")) + if (shouldFail(liveProcessingFailureRate)) + throw new TestException(debugMessage(s"failed at payload $i")) else - log.debug(debugMessage(s"processed payload ${i}")) + log.debug(debugMessage(s"processed payload $i")) } } case Confirm(deliveryId, i) ⇒ persist(MsgConfirmed(deliveryId, i))(updateState) - - case PersistenceFailure(MsgSent(i), _, _) ⇒ - // inform sender about journaling failure so that it can resend - sender() ! 
JournalingFailure(i) - - case PersistenceFailure(MsgConfirmed(_, i), _, _) ⇒ - // ok, will be redelivered } def receiveRecover: Receive = { - case evt: Evt ⇒ updateState(evt) - case RecoveryFailure(_) ⇒ - // journal failed during recovery, throw exception to re-recover persistent actor - throw new TestException(debugMessage("recovery failed")) + case evt: Evt ⇒ + updateState(evt) + if (shouldFail(replayProcessingFailureRate)) + throw new TestException(debugMessage(s"replay failed at event $evt")) + else + log.debug(debugMessage(s"replayed event $evt")) } def updateState(evt: Evt): Unit = evt match { @@ -120,6 +112,14 @@ object AtLeastOnceDeliveryFailureSpec { private def debugMessage(msg: String): String = s"[sender] ${msg} (mode = ${if (recoveryRunning) "replay" else "live"} snr = ${lastSequenceNr} state = ${state.sorted})" + + override protected def onReplayFailure(cause: Throwable, event: Option[Any]): Unit = { + // mute logging + } + + override protected def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = { + // mute logging + } } class ChaosDestination(val probe: ActorRef) extends Actor with ChaosSupport with ActorLogging { @@ -129,7 +129,7 @@ object AtLeastOnceDeliveryFailureSpec { def receive = { case m @ Msg(deliveryId, i) ⇒ if (shouldFail(confirmFailureRate)) { - log.error(debugMessage("confirm message failed", m)) + log.debug(debugMessage("confirm message failed", m)) } else if (contains(i)) { log.debug(debugMessage("ignored duplicate", m)) sender() ! Confirm(deliveryId, i) @@ -155,14 +155,8 @@ object AtLeastOnceDeliveryFailureSpec { def receive = { case Start ⇒ 1 to numMessages foreach (snd ! _) case Ack(i) ⇒ acks += i - case ProcessingFailure(i) ⇒ - snd ! i - log.debug(s"resent ${i} after processing failure") - case JournalingFailure(i) ⇒ - snd ! i - log.debug(s"resent ${i} after journaling failure") case Terminated(_) ⇒ - // snd will be stopped if ReadHighestSequenceNr fails + // snd will be stopped if recovery or persist fails log.debug(s"sender stopped, starting it again") snd = createSender() 1 to numMessages foreach (i ⇒ if (!acks(i)) snd ! i) @@ -173,6 +167,8 @@ object AtLeastOnceDeliveryFailureSpec { class AtLeastOnceDeliveryFailureSpec extends AkkaSpec(AtLeastOnceDeliveryFailureSpec.config) with Cleanup with ImplicitSender { import AtLeastOnceDeliveryFailureSpec._ + muteDeadLetters(classOf[AnyRef])(system) + "AtLeastOnceDelivery" must { "tolerate and recover from random failures" in { system.actorOf(Props(classOf[ChaosApp], testActor), "chaosApp") ! Start diff --git a/akka-persistence/src/test/scala/akka/persistence/BackoffSupervisorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/BackoffSupervisorSpec.scala new file mode 100644 index 0000000000..6543ceb7cf --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/BackoffSupervisorSpec.scala @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ + +package akka.persistence + +import scala.concurrent.duration._ +import akka.actor._ +import akka.testkit._ + +object BackoffSupervisorSpec { + object Child { + def props(probe: ActorRef): Props = + Props(new Child(probe)) + } + + class Child(probe: ActorRef) extends Actor { + def receive = { + case msg ⇒ probe ! 
msg + } + } +} + +class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender { + import BackoffSupervisorSpec._ + + "BackoffSupervisor" must { + "start child again when it stops" in { + val supervisor = system.actorOf( + BackoffSupervisor.props(Child.props(testActor), "c1", 100.millis, 3.seconds, 0.2)) + supervisor ! BackoffSupervisor.GetCurrentChild + val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get + watch(c1) + c1 ! PoisonPill + expectTerminated(c1) + awaitAssert { + supervisor ! BackoffSupervisor.GetCurrentChild + // new instance + expectMsgType[BackoffSupervisor.CurrentChild].ref.get should !==(c1) + } + } + + "forward messages to the child" in { + val supervisor = system.actorOf( + BackoffSupervisor.props(Child.props(testActor), "c2", 100.millis, 3.seconds, 0.2)) + supervisor ! "hello" + expectMsg("hello") + } + } +} diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala index 6f9047fd16..0b19e9c463 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala @@ -67,9 +67,6 @@ object PersistentActorFailureSpec { class Supervisor(testActor: ActorRef) extends Actor { override def supervisorStrategy = OneForOneStrategy(loggingEnabled = false) { - case e: ActorKilledException ⇒ - testActor ! e - SupervisorStrategy.Stop case e ⇒ testActor ! e SupervisorStrategy.Restart @@ -100,12 +97,9 @@ object PersistentActorFailureSpec { val failingRecover: Receive = { case Evt(data) if data == "bad" ⇒ throw new SimulatedException("Simulated exception from receiveRecover") - - case r @ RecoveryFailure(cause) if recoveryFailureProbe.isDefined ⇒ - recoveryFailureProbe.foreach { _ ! r } } - override def receiveRecover: Receive = failingRecover orElse super.receiveRecover + override def receiveRecover: Receive = failingRecover.orElse[Any, Unit](super.receiveRecover) } @@ -150,7 +144,7 @@ class PersistentActorFailureSpec extends PersistenceSpec(PersistenceSpec.config( } "A persistent actor" must { - "throw ActorKilledException if recovery from persisted events fail" in { + "stop if recovery from persisted events fail" in { val persistentActor = namedPersistentActor[Behavior1PersistentActor] persistentActor ! Cmd("corrupt") persistentActor ! GetState @@ -158,107 +152,36 @@ class PersistentActorFailureSpec extends PersistenceSpec(PersistenceSpec.config( // recover by creating another with same name system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[Behavior1PersistentActor], name) - expectMsgType[ActorRef] - expectMsgType[ActorKilledException] + val ref = expectMsgType[ActorRef] + watch(ref) + expectTerminated(ref) } - "throw ActorKilledException if persist fails" in { + "stop if persist fails" in { system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[Behavior1PersistentActor], name) val persistentActor = expectMsgType[ActorRef] + watch(persistentActor) persistentActor ! Cmd("wrong") - expectMsgType[ActorKilledException] + expectTerminated(persistentActor) } - "throw ActorKilledException if persistAsync fails" in { + "stop if persistAsync fails" in { system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[AsyncPersistPersistentActor], name) val persistentActor = expectMsgType[ActorRef] persistentActor ! 
Cmd("a") + watch(persistentActor) expectMsg("a") // reply before persistAsync expectMsg("a-1") // reply after successful persistAsync persistentActor ! Cmd("wrong") expectMsg("wrong") // reply before persistAsync - expectMsgType[ActorKilledException] + expectTerminated(persistentActor) } - "throw ActorKilledException if receiveRecover fails" in { + "stop if receiveRecover fails" in { prepareFailingRecovery() // recover by creating another with same name system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[FailingRecovery], name) - expectMsgType[ActorRef] - expectMsgType[ActorKilledException] - } - "include failing event in RecoveryFailure message" in { - prepareFailingRecovery() - - // recover by creating another with same name - val probe = TestProbe() - system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[FailingRecovery], name, Some(probe.ref)) - expectMsgType[ActorRef] - val recoveryFailure = probe.expectMsgType[RecoveryFailure] - recoveryFailure.payload should ===(Some(Evt("bad"))) - recoveryFailure.sequenceNr should ===(Some(3L)) - } - "continue by handling RecoveryFailure" in { - prepareFailingRecovery() - - // recover by creating another with same name - val probe = TestProbe() - system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[FailingRecovery], name, Some(probe.ref)) - val persistentActor = expectMsgType[ActorRef] - val recoveryFailure = probe.expectMsgType[RecoveryFailure] - // continue - persistentActor ! Cmd("d") - persistentActor ! GetState - // "bad" failed, and "c" was not replayed - expectMsg(List("a", "b", "d")) - } - "support resume after recovery failure" in { - prepareFailingRecovery() - - // recover by creating another with same name - system.actorOf(Props(classOf[ResumingSupervisor], testActor)) ! Props(classOf[FailingRecovery], name) - val persistentActor = expectMsgType[ActorRef] - expectMsgType[ActorKilledException] // from supervisor - // resume - persistentActor ! Cmd("d") - persistentActor ! GetState - // "bad" failed, and "c" was not replayed - expectMsg(List("a", "b", "d")) - } - "support resume after persist failure" in { - system.actorOf(Props(classOf[ResumingSupervisor], testActor)) ! Props(classOf[Behavior1PersistentActor], name) - val persistentActor = expectMsgType[ActorRef] - persistentActor ! Cmd("a") - persistentActor ! Cmd("wrong") - persistentActor ! Cmd("b") - // Behavior1PersistentActor persists 2 events per Cmd, - // and therefore 2 exceptions from supervisor - expectMsgType[ActorKilledException] - expectMsgType[ActorKilledException] - persistentActor ! Cmd("c") - persistentActor ! GetState - expectMsg(List("a-1", "a-2", "b-1", "b-2", "c-1", "c-2")) - } - "support resume when persist followed by exception" in { - system.actorOf(Props(classOf[ResumingSupervisor], testActor)) ! Props(classOf[ThrowingActor1], name) - val persistentActor = expectMsgType[ActorRef] - persistentActor ! Cmd("a") - persistentActor ! Cmd("err") - persistentActor ! Cmd("b") - expectMsgType[SimulatedException] // from supervisor - persistentActor ! Cmd("c") - persistentActor ! GetState - expectMsg(List("a", "err", "b", "c")) - } - "support resume when persist handler throws exception" in { - system.actorOf(Props(classOf[ResumingSupervisor], testActor)) ! Props(classOf[ThrowingActor2], name) - val persistentActor = expectMsgType[ActorRef] - persistentActor ! Cmd("a") - persistentActor ! Cmd("b") - persistentActor ! Cmd("err") - persistentActor ! 
Cmd("c") - expectMsgType[SimulatedException] // from supervisor - persistentActor ! Cmd("d") - persistentActor ! GetState - expectMsg(List("a", "b", "c", "d")) + val ref = expectMsgType[ActorRef] + watch(ref) + expectTerminated(ref) } } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala index 2cc22d8195..b84bc636a3 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala @@ -42,9 +42,6 @@ object PersistentViewSpec { case "boom" ⇒ throw new TestException("boom") - case RecoveryFailure(cause) ⇒ - throw cause // restart - case payload if isPersistent && shouldFailOn(payload) ⇒ throw new TestException("boom") diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java index 4f3a19d263..65b6c2caba 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java @@ -12,9 +12,11 @@ import akka.actor.Props; import akka.japi.pf.ReceiveBuilder; import akka.persistence.*; import scala.Option; +import scala.concurrent.duration.Duration; import scala.PartialFunction; import scala.runtime.BoxedUnit; import java.io.Serializable; +import java.util.concurrent.TimeUnit; public class LambdaPersistenceDocTest { @@ -39,7 +41,7 @@ public class LambdaPersistenceDocTest { persistentActor.tell(Recover.create(), null); //#recover-explicit } - + }; static Object o2 = new Object() { @@ -83,14 +85,14 @@ public class LambdaPersistenceDocTest { public PartialFunction receiveRecover() { return ReceiveBuilder. match(String.class, evt -> {/* ... */}).build(); - } + } } //#recovery-completed class MyPersistentActor5 extends AbstractPersistentActor { - @Override public String persistenceId() { + @Override public String persistenceId() { return "my-stable-persistence-id"; } @@ -107,15 +109,32 @@ public class LambdaPersistenceDocTest { return ReceiveBuilder. match(String.class, s -> s.equals("cmd"), s -> persist("evt", this::handleEvent)).build(); - } - + } + private void handleEvent(String event) { // update state // ... 
} - + } //#recovery-completed + + abstract class MyActor extends AbstractPersistentActor { + //#backoff + @Override + public void preStart() throws Exception { + final Props childProps = Props.create(MyPersistentActor1.class); + final Props props = BackoffSupervisor.props( + childProps, + "myActor", + Duration.create(3, TimeUnit.SECONDS), + Duration.create(30, TimeUnit.SECONDS), + 0.2); + context().actorOf(props, "mySupervisor"); + super.preStart(); + } + //#backoff + } }; static Object fullyDisabledRecoveyExample = new Object() { @@ -129,32 +148,32 @@ public class LambdaPersistenceDocTest { static Object atLeastOnceExample = new Object() { //#at-least-once-example - + class Msg implements Serializable { private static final long serialVersionUID = 1L; public final long deliveryId; public final String s; - + public Msg(long deliveryId, String s) { this.deliveryId = deliveryId; this.s = s; } } - + class Confirm implements Serializable { private static final long serialVersionUID = 1L; public final long deliveryId; - + public Confirm(long deliveryId) { this.deliveryId = deliveryId; } } - - + + class MsgSent implements Serializable { private static final long serialVersionUID = 1L; public final String s; - + public MsgSent(String s) { this.s = s; } @@ -162,12 +181,12 @@ public class LambdaPersistenceDocTest { class MsgConfirmed implements Serializable { private static final long serialVersionUID = 1L; public final long deliveryId; - + public MsgConfirmed(long deliveryId) { this.deliveryId = deliveryId; } } - + class MyPersistentActor extends AbstractPersistentActorWithAtLeastOnceDelivery { private final ActorPath destination; @@ -175,7 +194,7 @@ public class LambdaPersistenceDocTest { this.destination = destination; } - @Override public String persistenceId() { + @Override public String persistenceId() { return "persistence-id"; } @@ -195,8 +214,8 @@ public class LambdaPersistenceDocTest { public PartialFunction receiveRecover() { return ReceiveBuilder. match(Object.class, evt -> updateState(evt)).build(); - } - + } + void updateState(Object event) { if (event instanceof MsgSent) { final MsgSent evt = (MsgSent) event; @@ -219,7 +238,7 @@ public class LambdaPersistenceDocTest { } } //#at-least-once-example - + }; static Object o4 = new Object() { @@ -243,7 +262,7 @@ public class LambdaPersistenceDocTest { } //#save-snapshot - @Override public String persistenceId() { + @Override public String persistenceId() { return "persistence-id"; } @@ -256,7 +275,7 @@ public class LambdaPersistenceDocTest { }; static Object o5 = new Object() { - + class MyPersistentActor extends AbstractPersistentActor { //#snapshot-offer private Object state; @@ -271,16 +290,16 @@ public class LambdaPersistenceDocTest { } //#snapshot-offer - @Override public String persistenceId() { + @Override public String persistenceId() { return "persistence-id"; } @Override public PartialFunction receiveCommand() { return ReceiveBuilder. 
match(String.class, s -> {/* ...*/}).build(); - } + } } - + class MyActor extends AbstractActor { ActorRef persistentActor; @@ -306,7 +325,7 @@ public class LambdaPersistenceDocTest { //#persist-async class MyPersistentActor extends AbstractPersistentActor { - @Override public String persistenceId() { + @Override public String persistenceId() { return "my-stable-persistence-id"; } @@ -356,7 +375,7 @@ public class LambdaPersistenceDocTest { //#defer class MyPersistentActor extends AbstractPersistentActor { - @Override public String persistenceId() { + @Override public String persistenceId() { return "my-stable-persistence-id"; }