!per Make persistent failures fatal

* remove PersistentFailure and RecoveryFailure messages
* use stop instead of ActorKilledException
* adjust PersistentView
* adjust AtLeastOnceDeliveryFailureSpec
* adjust sharding
* add BackoffSupervisor
Patrik Nordwall 2015-06-01 19:03:00 +02:00
parent 1eaebcedb8
commit 6d26b3e591
21 changed files with 566 additions and 446 deletions
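For context, a minimal usage sketch (not part of this commit's diff) of the new akka.persistence.BackoffSupervisor introduced below; MyPersistentActor, the actor names and the delay values are illustrative placeholders only:

import scala.concurrent.duration._
import akka.actor.{ ActorSystem, Props }
import akka.persistence.{ BackoffSupervisor, PersistentActor }

// Illustrative child: any persistent actor that stops itself on persistence failures.
class MyPersistentActor extends PersistentActor {
  override def persistenceId = "my-persistence-id"
  override def receiveCommand = { case cmd ⇒ persist(cmd) { evt ⇒ sender() ! evt } }
  override def receiveRecover = { case _ ⇒ /* rebuild state from replayed events */ }
}

object BackoffExample extends App {
  val system = ActorSystem("example")
  // The supervisor starts "myActor" and, whenever it terminates (e.g. after a
  // persistence failure), starts it again after an exponentially growing delay.
  val supervisorProps = BackoffSupervisor.props(
    childProps = Props(new MyPersistentActor),
    childName = "myActor",
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2)
  system.actorOf(supervisorProps, "mySupervisor")
}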

View file

@ -24,8 +24,9 @@ akka.cluster.sharding {
remember-entities = off
# If the coordinator can't store state changes it will be stopped
# and started again after this duration.
coordinator-failure-backoff = 10 s
# and started again after this duration, with an exponential back-off
# of up to 5 times this duration.
coordinator-failure-backoff = 5 s
# The ShardRegion retries registration and shard location requests to the
# ShardCoordinator with this interval if it does not reply.
@ -40,9 +41,9 @@ akka.cluster.sharding {
# Time given to a region to acknowledge it's hosting a shard.
shard-start-timeout = 10 s
# If the shard can't store state changes it will retry the action
# again after this duration. Any messages sent to an affected entity
# will be buffered until the state change is processed
# If the shard is remembering entities and can't store state changes
# it will be stopped and then started again after this duration. Any messages
# sent to an affected entity may be lost in this process.
shard-failure-backoff = 10 s
# If the shard is remembering entities and an entity stops itself without

View file

@ -449,7 +449,12 @@ private[akka] class ClusterShardingGuardian extends Actor {
val shardRegion = context.child(encName).getOrElse {
if (context.child(cName).isEmpty) {
val coordinatorProps = ShardCoordinator.props(typeName, settings, allocationStrategy)
val singletonProps = ShardCoordinatorSupervisor.props(coordinatorFailureBackoff, coordinatorProps)
val singletonProps = BackoffSupervisor.props(
childProps = coordinatorProps,
childName = "coordinator",
minBackoff = coordinatorFailureBackoff,
maxBackoff = coordinatorFailureBackoff * 5,
randomFactor = 0.2).withDeploy(Deploy.local)
val singletonSettings = settings.coordinatorSingletonSettings
.withSingletonName("singleton").withRole(role)
context.actorOf(ClusterSingletonManager.props(
@ -652,6 +657,12 @@ object ShardRegion {
private case object Retry extends ShardRegionCommand
/**
* When remembering entities and the shard stops unexpectedly (e.g. persist failure), we
* restart it after a back-off using this message.
*/
private final case class RestartShard(shardId: ShardId)
private def roleOption(role: String): Option[String] =
if (role == "") None else Option(role)
@ -884,14 +895,17 @@ class ShardRegion(
} else if (shardsByRef.contains(ref)) {
val shardId: ShardId = shardsByRef(ref)
//Are we meant to be handing off, or is this an unknown stop?
if (handingOff.contains(ref)) {
shardsByRef = shardsByRef - ref
shards = shards - shardId
if (handingOff.contains(ref)) {
handingOff = handingOff - ref
log.debug("Shard [{}] handoff complete", shardId)
} else {
throw new IllegalStateException(s"Shard [$shardId] terminated while not being handed off.")
// if persist fails it will stop
log.debug("Shard [{}] terminated while not being handed off", shardId)
if (rememberEntities) {
context.system.scheduler.scheduleOnce(shardFailureBackoff, self, RestartShard(shardId))
}
}
if (gracefulShutdownInProgress && shards.isEmpty && shardBuffers.isEmpty)
@ -924,28 +938,44 @@ class ShardRegion(
}
}
def deliverMessage(msg: Any, snd: ActorRef): Unit = {
val shard = extractShardId(msg)
regionByShard.get(shard) match {
case Some(ref) if ref == self
getShard(shard).tell(msg, snd)
def deliverMessage(msg: Any, snd: ActorRef): Unit =
msg match {
case RestartShard(shardId)
regionByShard.get(shardId) match {
case Some(ref)
log.debug("Forwarding request for shard [{}] to [{}]", shard, ref)
if (ref == self)
getShard(shardId)
case None
if (!shardBuffers.contains(shardId)) {
log.debug("Request shard [{}] home", shardId)
coordinator.foreach(_ ! GetShardHome(shardId))
}
val buf = shardBuffers.getOrElse(shardId, Vector.empty)
shardBuffers = shardBuffers.updated(shardId, buf :+ ((msg, snd)))
}
case _
val shardId = extractShardId(msg)
regionByShard.get(shardId) match {
case Some(ref) if ref == self
getShard(shardId).tell(msg, snd)
case Some(ref)
log.debug("Forwarding request for shard [{}] to [{}]", shardId, ref)
ref.tell(msg, snd)
case None if (shard == null || shard == "")
case None if (shardId == null || shardId == "")
log.warning("Shard must not be empty, dropping message [{}]", msg.getClass.getName)
context.system.deadLetters ! msg
case None
if (!shardBuffers.contains(shard)) {
log.debug("Request shard [{}] home", shard)
coordinator.foreach(_ ! GetShardHome(shard))
if (!shardBuffers.contains(shardId)) {
log.debug("Request shard [{}] home", shardId)
coordinator.foreach(_ ! GetShardHome(shardId))
}
if (totalBufferSize >= bufferSize) {
log.debug("Buffer is full, dropping message for shard [{}]", shard)
log.debug("Buffer is full, dropping message for shard [{}]", shardId)
context.system.deadLetters ! msg
} else {
val buf = shardBuffers.getOrElse(shard, Vector.empty)
shardBuffers = shardBuffers.updated(shard, buf :+ ((msg, snd)))
val buf = shardBuffers.getOrElse(shardId, Vector.empty)
shardBuffers = shardBuffers.updated(shardId, buf :+ ((msg, snd)))
}
}
}
@ -986,21 +1016,11 @@ class ShardRegion(
private[akka] object Shard {
import ShardRegion.EntityId
object State {
val Empty = State()
}
/**
* A Shard command
*/
sealed trait ShardCommand
/**
* When a `StateChange` fails to write to the journal, we will retry it after a back
* off.
*/
final case class RetryPersistence(payload: StateChange) extends ShardCommand
/**
* When remembering entities and the entity stops without issuing a `Passivate`, we
* restart it after a back-off using this message.
@ -1022,6 +1042,10 @@ private[akka] object Shard {
*/
@SerialVersionUID(1L) final case class EntityStopped(entityId: EntityId) extends StateChange
object State {
val Empty = State()
}
/**
* Persistent state of the Shard.
*/
@ -1061,7 +1085,7 @@ private[akka] class Shard(
import ShardRegion.{ handOffStopperProps, EntityId, Msg, Passivate }
import ShardCoordinator.Internal.{ HandOff, ShardStopped }
import Shard.{ State, RetryPersistence, RestartEntity, EntityStopped, EntityStarted }
import Shard.{ State, RestartEntity, EntityStopped, EntityStarted }
import akka.cluster.sharding.ShardCoordinator.Internal.CoordinatorMessage
import akka.cluster.sharding.ShardRegion.ShardRegionCommand
import akka.persistence.RecoveryCompleted
@ -1112,12 +1136,10 @@ private[akka] class Shard(
case msg: CoordinatorMessage ⇒ receiveCoordinatorMessage(msg)
case msg: ShardCommand ⇒ receiveShardCommand(msg)
case msg: ShardRegionCommand ⇒ receiveShardRegionCommand(msg)
case PersistenceFailure(payload: StateChange, _, _) ⇒ persistenceFailure(payload)
case msg if extractEntityId.isDefinedAt(msg) ⇒ deliverMessage(msg, sender())
}
def receiveShardCommand(msg: ShardCommand): Unit = msg match {
case RetryPersistence(payload) ⇒ retryPersistence(payload)
case RestartEntity(id) ⇒ getEntity(id)
}
@ -1132,26 +1154,6 @@ private[akka] class Shard(
case _ unhandled(msg)
}
def persistenceFailure(payload: StateChange): Unit = {
log.debug("Persistence of [{}] failed, will backoff and retry", payload)
if (!messageBuffers.isDefinedAt(payload.entityId)) {
messageBuffers = messageBuffers.updated(payload.entityId, Vector.empty)
}
import context.dispatcher
context.system.scheduler.scheduleOnce(shardFailureBackoff, self, RetryPersistence(payload))
}
def retryPersistence(payload: StateChange): Unit = {
log.debug("Retrying Persistence of [{}]", payload)
persist(payload) { _
payload match {
case msg: EntityStarted sendMsgBuffer(msg)
case msg: EntityStopped passivateCompleted(msg)
}
}
}
def handOff(replyTo: ActorRef): Unit = handOffStopper match {
case Some(_) log.warning("HandOff shard [{}] received during existing handOff", shardId)
case None
@ -1287,45 +1289,6 @@ private[akka] class Shard(
}
}
/**
* INTERNAL API
* @see [[ClusterSharding$ ClusterSharding extension]]
*/
private[akka] object ShardCoordinatorSupervisor {
/**
* Factory method for the [[akka.actor.Props]] of the [[ShardCoordinator]] actor.
*/
def props(failureBackoff: FiniteDuration, coordinatorProps: Props): Props =
Props(new ShardCoordinatorSupervisor(failureBackoff, coordinatorProps))
.withDeploy(Deploy.local)
/**
* INTERNAL API
*/
private case object StartCoordinator
}
/**
* INTERNAL API
*/
private[akka] class ShardCoordinatorSupervisor(failureBackoff: FiniteDuration, coordinatorProps: Props) extends Actor {
import ShardCoordinatorSupervisor._
def startCoordinator(): Unit = {
// it will be stopped in case of PersistenceFailure
context.watch(context.actorOf(coordinatorProps, "coordinator"))
}
override def preStart(): Unit = startCoordinator()
def receive = {
case Terminated(_)
import context.dispatcher
context.system.scheduler.scheduleOnce(failureBackoff, self, StartCoordinator)
case StartCoordinator startCoordinator()
}
}
/**
* @see [[ClusterSharding$ ClusterSharding extension]]
*/

View file

@ -177,7 +177,7 @@ class ClusterShardingLeavingSpec extends MultiNodeSpec(ClusterShardingLeavingSpe
runOn(first) {
watch(region)
expectTerminated(region, 5.seconds)
expectTerminated(region, 15.seconds)
}
enterBarrier("stopped")

View file

@ -26,6 +26,7 @@ import java.io.File
import org.apache.commons.io.FileUtils
import akka.cluster.singleton.ClusterSingletonManager
import akka.cluster.singleton.ClusterSingletonManagerSettings
import akka.persistence.BackoffSupervisor
object ClusterShardingSpec extends MultiNodeConfig {
val controller = role("controller")
@ -198,9 +199,14 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
List("counter", "rebalancingCounter", "PersistentCounterEntities", "AnotherPersistentCounter",
"PersistentCounter", "RebalancingPersistentCounter", "AutoMigrateRegionTest").foreach { typeName
val rebalanceEnabled = typeName.toLowerCase.startsWith("rebalancing")
val singletonProps = BackoffSupervisor.props(
childProps = coordinatorProps(typeName, rebalanceEnabled),
childName = "coordinator",
minBackoff = 5.seconds,
maxBackoff = 5.seconds,
randomFactor = 0.1).withDeploy(Deploy.local)
system.actorOf(ClusterSingletonManager.props(
singletonProps = ShardCoordinatorSupervisor.props(failureBackoff = 5.seconds,
coordinatorProps(typeName, rebalanceEnabled)),
singletonProps,
terminationMessage = PoisonPill,
settings = ClusterSingletonManagerSettings(system)),
name = typeName + "Coordinator")

View file

@ -4,6 +4,8 @@
package docs.persistence;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.Duration;
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
@ -12,8 +14,8 @@ import akka.actor.UntypedActor;
import akka.japi.Function;
import akka.japi.Procedure;
import akka.persistence.*;
import scala.Option;
import scala.Option;
import java.io.Serializable;
public class PersistenceDocTest {
@ -104,6 +106,23 @@ public class PersistenceDocTest {
}
//#recovery-completed
}
abstract class MyActor extends UntypedPersistentActor {
//#backoff
@Override
public void preStart() throws Exception {
final Props childProps = Props.create(MyPersistentActor1.class);
final Props props = BackoffSupervisor.props(
childProps,
"myActor",
Duration.create(3, TimeUnit.SECONDS),
Duration.create(30, TimeUnit.SECONDS),
0.2);
getContext().actorOf(props, "mySupervisor");
super.preStart();
}
//#backoff
}
};
static Object fullyDisabledRecoveryExample = new Object() {

View file

@ -137,9 +137,14 @@ When persisting events with ``persist`` it is guaranteed that the persistent act
the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist``
calls in context of a single command.
If persistence of an event fails, the persistent actor will be stopped by throwing :class:`ActorKilledException`.
This can be customized by handling ``PersistenceFailure`` message in ``receiveCommand`` and/or defining
``supervisorStrategy`` in parent actor.
If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default)
and the actor will unconditionally be stopped. The reason that it cannot resume when persist fails
is that it is unknown if the event was actually persisted or not, and therefore it is in an inconsistent
state. Restarting on persistence failures will most likely fail anyway, since the journal is probably
unavailable. It is better to stop the actor and after a back-off timeout start it again. The
``akka.persistence.BackoffSupervisor`` actor is provided to support such restarts.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#backoff
The easiest way to run this example yourself is to download `Typesafe Activator <http://www.typesafe.com/platform/getstarted>`_
and open the tutorial named `Akka Persistence Samples in Java with Lambdas <http://www.typesafe.com/activator/template/akka-sample-persistence-java-lambda>`_.
@ -215,9 +220,8 @@ and before any other received messages.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recovery-completed
If there is a problem with recovering the state of the actor from the journal, the actor will be
sent a :class:`RecoveryFailure` message that it can choose to handle in ``receiveRecover``. If the
actor doesn't handle the :class:`RecoveryFailure` message it will be stopped by throwing :class:`ActorKilledException`.
If there is a problem with recovering the state of the actor from the journal, the error will be logged and the
actor will be stopped.
Relaxed local consistency requirements and high throughput use-cases

View file

@ -138,9 +138,14 @@ When persisting events with ``persist`` it is guaranteed that the persistent act
the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist``
calls in context of a single command.
If persistence of an event fails, the persistent actor will be stopped by throwing :class:`ActorKilledException`.
This can be customized by handling ``PersistenceFailure`` message in ``onReceiveCommand`` and/or defining
``supervisorStrategy`` in parent actor.
If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default)
and the actor will unconditionally be stopped. The reason that it cannot resume when persist fails
is that it is unknown if the event was actually persisted or not, and therefore it is in an inconsistent
state. Restarting on persistence failures will most likely fail anyway, since the journal is probably
unavailable. It is better to stop the actor and after a back-off timeout start it again. The
``akka.persistence.BackoffSupervisor`` actor is provided to support such restarts.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#backoff
The easiest way to run this example yourself is to download `Typesafe Activator <http://www.typesafe.com/platform/getstarted>`_
and open the tutorial named `Akka Persistence Samples with Java <http://www.typesafe.com/activator/template/akka-sample-persistence-java>`_.
@ -217,9 +222,8 @@ and before any other received messages.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-completed
If there is a problem with recovering the state of the actor from the journal, the actor will be
sent a :class:`RecoveryFailure` message that it can choose to handle in ``receiveRecover``. If the
actor doesn't handle the :class:`RecoveryFailure` message it will be stopped by throwing :class:`ActorKilledException`.
If there is a problem with recovering the state of the actor from the journal, the error will be logged and the
actor will be stopped.
.. _persist-async-java:

View file

@ -265,7 +265,7 @@ the ``rememberEntities`` flag to true in ``ClusterShardingSettings`` when callin
``ClusterSharding.start``. When configured to remember entities, whenever a ``Shard``
is rebalanced onto another node or recovers after a crash it will recreate all the
entities which were previously running in that ``Shard``. To permanently stop entities,
a ``Passivate`` message must be sent to the parent the ``Shard``, otherwise the
a ``Passivate`` message must be sent to the parent of the entity actor, otherwise the
entity will be automatically restarted after the entity restart backoff specified in
the configuration.

View file

@ -100,6 +100,23 @@ object PersistenceDocSpec {
}
}
object Backoff {
abstract class MyActor extends Actor {
import PersistAsync.MyPersistentActor
//#backoff
val childProps = Props[MyPersistentActor]
val props = BackoffSupervisor.props(
childProps,
childName = "myActor",
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2)
context.actorOf(props, name = "mySupervisor")
//#backoff
}
}
object AtLeastOnce {
//#at-least-once-example
import akka.actor.{ Actor, ActorPath }

View file

@ -123,9 +123,14 @@ When persisting events with ``persist`` it is guaranteed that the persistent act
the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist``
calls in context of a single command.
If persistence of an event fails, the persistent actor will be stopped by throwing :class:`ActorKilledException`.
This can be customized by handling ``PersistenceFailure`` message in ``receiveCommand`` and/or defining
``supervisorStrategy`` in parent actor.
If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default)
and the actor will unconditionally be stopped. The reason that it cannot resume when persist fails
is that it is unknown if the event was actually persisted or not, and therefore it is in an inconsistent
state. Restarting on persistence failures will most likely fail anyway, since the journal is probably
unavailable. It is better to stop the actor and after a back-off timeout start it again. The
``akka.persistence.BackoffSupervisor`` actor is provided to support such restarts.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#backoff
The easiest way to run this example yourself is to download `Typesafe Activator <http://www.typesafe.com/platform/getstarted>`_
and open the tutorial named `Akka Persistence Samples with Scala <http://www.typesafe.com/activator/template/akka-sample-persistence-scala>`_.
@ -201,9 +206,8 @@ and before any other received messages.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-completed
If there is a problem with recovering the state of the actor from the journal, the actor will be
sent a :class:`RecoveryFailure` message that it can choose to handle in ``receiveRecover``. If the
actor doesn't handle the :class:`RecoveryFailure` message it will be stopped by throwing :class:`ActorKilledException`.
If there is a problem with recovering the state of the actor from the journal, the error will be logged and the
actor will be stopped.
.. _persist-async-scala:

View file

@ -300,7 +300,7 @@ trait AtLeastOnceDelivery extends Eventsourced {
super.aroundPostStop()
}
override private[persistence] def onReplaySuccess(): Unit = {
override private[akka] def onReplaySuccess(): Unit = {
redeliverOverdue()
super.onReplaySuccess()
}

View file

@ -0,0 +1,148 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.DeadLetterSuppression
import akka.actor.Props
import akka.actor.Terminated
import java.util.Optional
import scala.concurrent.duration.Duration
object BackoffSupervisor {
/**
* @param childProps the [[akka.actor.Props]] of the child actor that
* will be started and supervised
* @param childName name of the child actor
* @param minBackoff minimum (initial) duration until the child actor will be
* started again, if it is terminated
* @param maxBackoff the exponential back-off is capped to this duration
* @param randomFactor after calculation of the exponential back-off an additional
* random delay based on this factor is added, e.g. 0.2 adds up to 20%
* delay
*/
def props(
childProps: Props,
childName: String,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double): Props = {
require(minBackoff > Duration.Zero, "minBackoff must be > 0")
require(maxBackoff >= minBackoff, "maxBackoff must be >= minBackoff")
require(0.0 <= randomFactor && randomFactor <= 1.0, "randomFactor must be between 0.0 and 1.0")
Props(new BackoffSupervisor(childProps, childName, minBackoff, maxBackoff, randomFactor))
}
/**
* Send this message to the [[BackoffSupervisor]] and it will reply with
* [[BackoffSupervisor.CurrentChild]] containing the `ActorRef` of the current child, if any.
*/
final case object GetCurrentChild
/**
* Java API: Send this message to the [[BackoffSupervisor]] and it will reply with
* [[BackoffSupervisor.CurrentChild]] containing the `ActorRef` of the current child, if any.
*/
def getCurrentChild = GetCurrentChild
final case class CurrentChild(ref: Option[ActorRef]) {
/**
* Java API: The `ActorRef` of the current child, if any
*/
def getRef: Optional[ActorRef] = Optional.ofNullable(ref.orNull)
}
private case object StartChild extends DeadLetterSuppression
private case class ResetRestartCount(current: Int) extends DeadLetterSuppression
}
/**
* This actor can be used to supervise a child actor and start it again
* after a back-off duration if the child actor is stopped. This is useful
* for persistent actors, which are stopped in case of persistence failures.
* Just restarting them immediately would probably fail again (since the data
* store is probably unavailable). It is better to try again after a delay.
*
* It supports exponential back-off between the given `minBackoff` and
* `maxBackoff` durations. For example, if `minBackoff` is 3 seconds and
* `maxBackoff` 30 seconds the start attempts will be delayed with
* 3, 6, 12, 24, 30, 30 seconds. The exponential back-off counter is reset
* if the actor is not terminated within the `minBackoff` duration.
*
* In addition to the calculated exponential back-off an additional
* random delay based on the given `randomFactor` is added, e.g. 0.2 adds up to 20%
* delay. The reason for adding a random delay is to prevent all failing
* actors from hitting the backend resource at the same time.
*
* You can retrieve the current child `ActorRef` by sending `BackoffSupervisor.GetCurrentChild`
* message to this actor and it will reply with [[BackoffSupervisor.CurrentChild]] containing the
* `ActorRef` of the current child, if any.
*
* The `BackoffSupervisor` forwards all other messages to the child, if it is currently running.
*
* The child can stop itself and send a [[akka.actor.PoisonPill]] to the parent supervisor
* if it wants to do an intentional stop.
*/
final class BackoffSupervisor(
childProps: Props,
childName: String,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double)
extends Actor {
import BackoffSupervisor._
import context.dispatcher
private var child: Option[ActorRef] = None
private var restartCount = 0
override def preStart(): Unit = {
startChild()
super.preStart()
}
def startChild(): Unit =
if (child.isEmpty) {
child = Some(context.watch(context.actorOf(childProps, childName)))
}
  def receive = {
    case Terminated(ref) if child.contains(ref) ⇒
      child = None
      val rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor
      val restartDelay =
        if (restartCount >= 30) // Duration overflow protection (> 100 years)
          maxBackoff
        else
          (maxBackoff.min(minBackoff * math.pow(2, restartCount)) * rnd) match {
            case f: FiniteDuration ⇒ f
            case _                 ⇒ maxBackoff
          }
      context.system.scheduler.scheduleOnce(restartDelay, self, StartChild)
      restartCount += 1
    case StartChild ⇒
      startChild()
      context.system.scheduler.scheduleOnce(minBackoff, self, ResetRestartCount(restartCount))
    case ResetRestartCount(current) ⇒
      if (current == restartCount)
        restartCount = 0
    case GetCurrentChild ⇒
      sender() ! CurrentChild(child)
    case msg ⇒ child match {
      case Some(c) ⇒ c.forward(msg)
      case None    ⇒ context.system.deadLetters.forward(msg)
    }
  }
}
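As a quick sanity check of the restart delay formula above (plain arithmetic, not part of the diff; the random factor is left out), the capped exponential sequence for minBackoff = 3 s and maxBackoff = 30 s comes out exactly as documented:

object BackoffDelays extends App {
  // delay(n) = min(maxBackoff, minBackoff * 2^n), before the random factor is applied
  val minBackoff = 3.0  // seconds
  val maxBackoff = 30.0 // seconds
  val delays = (0 until 6).map(n ⇒ math.min(maxBackoff, minBackoff * math.pow(2, n)))
  println(delays) // Vector(3.0, 6.0, 12.0, 24.0, 30.0, 30.0)
}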

View file

@ -7,7 +7,6 @@ package akka.persistence
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.immutable
import scala.util.control.NonFatal
import akka.actor.ActorKilledException
import akka.actor.Stash
import akka.actor.StashFactory
import akka.event.Logging
@ -92,15 +91,46 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
* Called whenever a message replay succeeds.
* May be implemented by subclass.
*/
private[persistence] def onReplaySuccess(): Unit = ()
private[akka] def onReplaySuccess(): Unit = ()
/**
* INTERNAL API.
* Called whenever a message replay fails.
* May be implemented by subclass.
* Called whenever a message replay fails. By default it logs the error.
*
* Subclass may override to customize logging.
*
* The actor is always stopped after this method has been invoked.
*
* @param cause failure cause.
* @param event the event that was processed in `receiveRecover`, if the exception
* was thrown there
*/
private[persistence] def onReplayFailure(cause: Throwable): Unit = ()
  protected def onReplayFailure(cause: Throwable, event: Option[Any]): Unit =
    event match {
      case Some(evt) ⇒
        log.error(cause, "Exception in receiveRecover when replaying event type [{}] with sequence number [{}] for " +
          "persistenceId [{}].", evt.getClass.getName, lastSequenceNr, persistenceId)
      case None ⇒
        log.error(cause, "Persistence failure when replaying events for persistenceId [{}]. " +
          "Last known sequence number [{}]", persistenceId, lastSequenceNr)
    }
/**
* Called when persist fails. By default it logs the error.
* Subclass may override to customize logging and for example send negative
* acknowledgment to sender.
*
* The actor is always stopped after this method has been invoked.
*
* Note that the event may or may not have been saved, depending on the type of
* failure.
*
* @param cause failure cause.
* @param event the event that was to be persisted
*/
protected def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = {
log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].",
event.getClass.getName, seqNr, persistenceId)
}
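For illustration only (not part of this diff), a sketch of overriding the new onPersistFailure hook to notify another actor before the unconditional stop; SaveFailed and monitor are hypothetical application-level names:

import akka.actor.ActorRef
import akka.persistence.PersistentActor

// Hypothetical negative-acknowledgment message.
final case class SaveFailed(event: Any, cause: Throwable)

class NotifyingActor(monitor: ActorRef) extends PersistentActor {
  override def persistenceId = "notifying-actor"
  override def receiveCommand = { case cmd ⇒ persist(cmd) { evt ⇒ monitor ! evt } }
  override def receiveRecover = { case _ ⇒ /* rebuild state */ }

  override protected def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = {
    monitor ! SaveFailed(event, cause)          // inform someone that the write failed
    super.onPersistFailure(cause, event, seqNr) // keep the default error logging
  }
}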
/**
* User-overridable callback. Called when a persistent actor is started or restarted.
@ -156,17 +186,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
override def unhandled(message: Any): Unit = {
message match {
case RecoveryCompleted | ReadHighestSequenceNrSuccess | ReadHighestSequenceNrFailure // mute
case RecoveryFailure(cause)
val errorMsg = s"PersistentActor killed after recovery failure (persisten id = [${persistenceId}]). " +
"To avoid killing persistent actors on recovery failure, a PersistentActor must handle RecoveryFailure messages. " +
"RecoveryFailure was caused by: " + cause
throw new ActorKilledException(errorMsg)
case PersistenceFailure(payload, sequenceNumber, cause)
val errorMsg = "PersistentActor killed after persistence failure " +
s"(persistent id = [${persistenceId}], sequence nr = [${sequenceNumber}], payload class = [${payload.getClass.getName}]). " +
"To avoid killing persistent actors on persistence failure, a PersistentActor must handle PersistenceFailure messages. " +
"PersistenceFailure was caused by: " + cause
throw new ActorKilledException(errorMsg)
case m super.unhandled(m)
}
}
@ -203,9 +222,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
* should not perform actions that may fail, such as interacting with external services,
* for example.
*
* If recovery fails, the actor will be stopped by throwing ActorKilledException.
* This can be customized by handling [[RecoveryFailure]] message in `receiveRecover`
* and/or defining `supervisorStrategy` in parent actor.
* If there is a problem with recovering the state of the actor from the journal, the error
* will be logged and the actor will be stopped.
*
* @see [[Recover]]
*/
@ -233,9 +251,12 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
* Within an event handler, applications usually update persistent actor state using persisted event
* data, notify listeners and reply to command senders.
*
* If persistence of an event fails, the persistent actor will be stopped by throwing ActorKilledException.
* This can be customized by handling [[PersistenceFailure]] message in [[#receiveCommand]]
* and/or defining `supervisorStrategy` in parent actor.
* If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will
* unconditionally be stopped. The reason that it cannot resume when persist fails is that it
* is unknown if the event was actually persisted or not, and therefore it is in an inconsistent
* state. Restarting on persistence failures will most likely fail anyway, since the journal
* is probably unavailable. It is better to stop the actor and after a back-off timeout start
* it again.
*
* @param event event to be persisted
* @param handler handler for each persisted `event`
@ -270,9 +291,12 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
* event is the sender of the corresponding command. This means that one can reply to a command
* sender within an event `handler`.
*
* If persistence of an event fails, the persistent actor will be stopped by throwing ActorKilledException.
* This can be customized by handling [[PersistenceFailure]] message in [[#receiveCommand]]
* and/or defining `supervisorStrategy` in parent actor.
* If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will
* unconditionally be stopped. The reason that it cannot resume when persist fails is that it
* is unknown if the event was actually persisted or not, and therefore it is in an inconsistent
* state. Restarting on persistence failures will most likely fail anyway, since the journal
* is probably unavailable. It is better to stop the actor and after a back-off timeout start
* it again.
*
* @param event event to be persisted
* @param handler handler for each persisted `event`
@ -304,10 +328,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
*
* If there are no pending persist handler calls, the handler will be called immediately.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor by
* throwing ActorKilledException, thus the handlers will not be run.
* If persistence of an earlier event fails, the persistent actor will stop, and the `handler`
* will not be run.
*
* @param event event to be handled in the future, when preceding persist operations have been processed
* @param handler handler for the given `event`
@ -398,8 +420,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
_receiveRecover(payload)
case s: SnapshotOffer if _receiveRecover.isDefinedAt(s)
_receiveRecover(s)
case f: RecoveryFailure if _receiveRecover.isDefinedAt(f)
_receiveRecover(f)
case RecoveryCompleted if _receiveRecover.isDefinedAt(RecoveryCompleted)
_receiveRecover(RecoveryCompleted)
}
@ -428,12 +448,9 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
* events.
*
* If replay succeeds it switches to `initializing` state and requests the highest stored sequence
* number from the journal. Otherwise RecoveryFailure is emitted.
* number from the journal. Otherwise the actor is stopped.
* If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onReplayFailure`.
*
* If processing of a replayed event fails, the exception is caught and
* stored for later `RecoveryFailure` message and state is changed to `recoveryFailed`.
*
* All incoming messages are stashed.
*/
private def replayStarted(recoveryBehavior: Receive) = new State {
@ -448,50 +465,19 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
Eventsourced.super.aroundReceive(recoveryBehavior, p)
} catch {
case NonFatal(t)
changeState(replayFailed(recoveryBehavior, t, p))
try onReplayFailure(t, Some(p.payload)) finally context.stop(self)
}
case ReplayMessagesSuccess
onReplaySuccess() // callback for subclass implementation
changeState(initializing(recoveryBehavior))
journal ! ReadHighestSequenceNr(lastSequenceNr, persistenceId, self)
case ReplayMessagesFailure(cause)
// in case the actor resumes the state must be initializing
changeState(initializing(recoveryBehavior))
journal ! ReadHighestSequenceNr(lastSequenceNr, persistenceId, self)
onReplayFailure(cause) // callback for subclass implementation
Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryFailure(cause)(None))
try onReplayFailure(cause, event = None) finally context.stop(self)
case other
internalStash.stash()
}
}
/**
* Consumes remaining replayed messages and then emits RecoveryFailure to the
* `receiveRecover` behavior.
*/
private def replayFailed(recoveryBehavior: Receive, cause: Throwable, failed: PersistentRepr) = new State {
override def toString: String = "replay failed"
override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any) = message match {
case ReplayedMessage(p) updateLastSequenceNr(p)
case ReplayMessagesSuccess | ReplayMessagesFailure(_) replayCompleted()
case r: Recover // ignore
case _ internalStash.stash()
}
def replayCompleted(): Unit = {
// in case the actor resumes the state must be initializing
changeState(initializing(recoveryBehavior))
journal ! ReadHighestSequenceNr(failed.sequenceNr, persistenceId, self)
Eventsourced.super.aroundReceive(recoveryBehavior,
RecoveryFailure(cause)(Some((failed.sequenceNr, failed.payload))))
}
}
/**
* Processes the highest stored sequence number response from the journal and then switches
* to `processingCommands` state.
@ -536,10 +522,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
// while message is in flight, in that case the handler has already been discarded
if (id == instanceId) {
try {
Eventsourced.super.aroundReceive(receive, PersistenceFailure(p.payload, p.sequenceNr, cause)) // stops actor by default
onWriteMessageComplete(err = false)
} catch { case NonFatal(e) onWriteMessageComplete(err = true); throw e }
try onPersistFailure(cause, p.payload, p.sequenceNr) finally context.stop(self)
}
case LoopMessageSuccess(l, id)
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
@ -550,7 +534,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
onWriteMessageComplete(err = false)
} catch { case NonFatal(e) onWriteMessageComplete(err = true); throw e }
}
case WriteMessagesSuccessful | WriteMessagesFailed(_) // FIXME PN: WriteMessagesFailed?
case WriteMessagesSuccessful | WriteMessagesFailed(_)
// FIXME PN: WriteMessagesFailed?
if (journalBatch.isEmpty) writeInProgress = false else flushJournalBatch()
}

View file

@ -108,21 +108,21 @@ private[persistence] object JournalProtocol {
* @param persistent replayed message.
*/
final case class ReplayedMessage(persistent: PersistentRepr)
extends Response
extends Response with DeadLetterSuppression
/**
* Reply message to a successful [[ReplayMessages]] request. This reply is sent to the requestor
* after all [[ReplayedMessage]] have been sent (if any).
*/
case object ReplayMessagesSuccess
extends Response
extends Response with DeadLetterSuppression
/**
* Reply message to a failed [[ReplayMessages]] request. This reply is sent to the requestor
* if a replay could not be successfully completed.
*/
final case class ReplayMessagesFailure(cause: Throwable)
extends Response
extends Response with DeadLetterSuppression
/**
* Request to read the highest stored sequence number of a given persistent actor.

View file

@ -9,39 +9,6 @@ import akka.japi.Procedure
import akka.actor.AbstractActor
import akka.japi.Util
/**
* Sent to a [[PersistentActor]] if a journal fails to write a persistent message. If
* not handled, an `akka.actor.ActorKilledException` is thrown by that persistent actor.
*
* @param payload payload of the persistent message.
* @param sequenceNr sequence number of the persistent message.
* @param cause failure cause.
*/
@SerialVersionUID(1L)
case class PersistenceFailure(payload: Any, sequenceNr: Long, cause: Throwable)
/**
* Sent to a [[PersistentActor]] if a journal fails to replay messages or fetch that persistent actor's
* highest sequence number. If not handled, the actor will be stopped.
*
* Contains the [[#sequenceNr]] of the message that could not be replayed, if it
* failed at a specific message.
*
* Contains the [[#payload]] of the message that could not be replayed, if it
* failed at a specific message.
*/
@SerialVersionUID(1L)
case class RecoveryFailure(cause: Throwable)(failingMessage: Option[(Long, Any)]) {
override def toString: String = failingMessage match {
case Some((sequenceNr, payload)) s"RecoveryFailure(${cause.getMessage},$sequenceNr,$payload)"
case None s"RecoveryFailure(${cause.getMessage})"
}
def sequenceNr: Option[Long] = failingMessage.map { case (snr, _) snr }
def payload: Option[Any] = failingMessage.map { case (_, payload) payload }
}
abstract class RecoveryCompleted
/**
* Sent to a [[PersistentActor]] when the journal replay has been finished.
@ -150,8 +117,12 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit
* Within an event handler, applications usually update persistent actor state using persisted event
* data, notify listeners and reply to command senders.
*
* If persistence of an event fails, the persistent actor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[onReceiveCommand]].
* If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will
* unconditionally be stopped. The reason that it cannot resume when persist fails is that it
* is unknown if the event was actually persisted or not, and therefore it is in an inconsistent
* state. Restarting on persistence failures will most likely fail anyway, since the journal
* is probably unavailable. It is better to stop the actor and after a back-off timeout start
* it again.
*
* @param event event to be persisted.
* @param handler handler for each persisted `event`
@ -183,8 +154,12 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit
* event is the sender of the corresponding command. This means that one can reply to a command
* sender within an event `handler`.
*
* If persistence of an event fails, the persistent actor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[receiveCommand]].
* If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will
* unconditionally be stopped. The reason that it cannot resume when persist fails is that it
* is unknown if the event was actually persisted or not, and therefore it is in an inconsistent
* state. Restarting on persistence failures will most likely fail anyway, since the journal
* is probably unavailable. It is better to stop the actor and after a back-off timeout start
* it again.
*
* @param event event to be persisted
* @param handler handler for each persisted `event`
@ -214,9 +189,7 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit
*
* If there are no pending persist handler calls, the handler will be called immediately.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers
* If persistence of an earlier event fails, the persistent actor will stop, and the `handler`
* will not be run.
*
* @param event event to be handled in the future, when preceding persist operations have been processed
@ -234,8 +207,8 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit
* should not perform actions that may fail, such as interacting with external services,
* for example.
*
* If recovery fails, the actor will be stopped. This can be customized by
* handling [[RecoveryFailure]].
* If there is a problem with recovering the state of the actor from the journal, the error
* will be logged and the actor will be stopped.
*
* @see [[Recover]]
*/
@ -271,8 +244,12 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo
* Within an event handler, applications usually update persistent actor state using persisted event
* data, notify listeners and reply to command senders.
*
* If persistence of an event fails, the persistent actor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[receiveCommand]].
* If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will
* unconditionally be stopped. The reason that it cannot resume when persist fails is that it
* is unknown if the event was actually persisted or not, and therefore it is in an inconsistent
* state. Restarting on persistence failures will most likely fail anyway, since the journal
* is probably unavailable. It is better to stop the actor and after a back-off timeout start
* it again.
*
* @param event event to be persisted.
* @param handler handler for each persisted `event`
@ -299,8 +276,12 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo
* call to `persistAsync` and executing its `handler`. This asynchronous, non-stashing, version
* of persist should be used when you favor throughput over the strict ordering guarantees that `persist` guarantees.
*
* If persistence of an event fails, the persistent actor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[receiveCommand]].
* If persistence of an event fails, [[#onPersistFailure]] will be invoked and the actor will
* unconditionally be stopped. The reason that it cannot resume when persist fails is that it
* is unknown if the event was actually persisted or not, and therefore it is in an inconsistent
* state. Restarting on persistence failures will most likely fail anyway, since the journal
* is probably unavailable. It is better to stop the actor and after a back-off timeout start
* it again.
*
* @param event event to be persisted
* @param handler handler for each persisted `event`
@ -330,9 +311,7 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo
*
* If there are no pending persist handler calls, the handler will be called immediately.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers
* If persistence of an earlier event fails, the persistent actor will stop, and the `handler`
* will not be run.
*
* @param event event to be handled in the future, when preceding persist operations have been processed

View file

@ -8,12 +8,12 @@ import scala.concurrent.duration._
import scala.util.control.NonFatal
import akka.actor.AbstractActor
import akka.actor.Actor
import akka.actor.ActorKilledException
import akka.actor.Cancellable
import akka.actor.Stash
import akka.actor.StashFactory
import akka.actor.UntypedActor
import akka.dispatch.Envelope
import akka.actor.ActorLogging
/**
* Instructs a [[PersistentView]] to update itself. This will run a single incremental message replay with
@ -80,7 +80,8 @@ private[akka] object PersistentView {
* - [[autoUpdate]] for turning automated updates on or off
* - [[autoUpdateReplayMax]] for limiting the number of replayed messages per view update cycle
*/
trait PersistentView extends Actor with Snapshotter with Stash with StashFactory with PersistenceIdentity {
trait PersistentView extends Actor with Snapshotter with Stash with StashFactory with PersistenceIdentity
with ActorLogging {
import PersistentView._
import JournalProtocol._
import SnapshotProtocol.LoadSnapshotResult
@ -205,16 +206,15 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
super.postStop()
}
override def unhandled(message: Any): Unit = {
message match {
case RecoveryCompleted // mute
case RecoveryFailure(cause)
val errorMsg = s"PersistentView killed after recovery failure (persisten id = [${persistenceId}]). " +
"To avoid killing persistent actors on recovery failure, a PersistentView must handle RecoveryFailure messages. " +
"RecoveryFailure was caused by: " + cause
throw new ActorKilledException(errorMsg)
case m super.unhandled(m)
}
/**
* Called whenever a message replay fails. By default it logs the error.
* Subclass may override to customize logging.
* The `PersistentView` will not stop or throw an exception due to this.
* It will try again on the next update.
*/
protected def onReplayError(cause: Throwable): Unit = {
log.error(cause, "Persistence view failure when replaying events for persistenceId [{}]. " +
"Last known sequence number [{}]", persistenceId, lastSequenceNr)
}
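A sketch (not part of this diff) of overriding the new onReplayError hook in a view, for example to count failed update cycles instead of only logging them; the names are illustrative:

import akka.persistence.PersistentView

class CountingView extends PersistentView {
  override def persistenceId = "my-persistent-actor"
  override def viewId = "my-persistent-actor-view"

  private var replayErrors = 0

  def receive = {
    case payload if isPersistent ⇒ /* update view state from payload */
  }

  override protected def onReplayError(cause: Throwable): Unit = {
    replayErrors += 1          // e.g. expose as a metric
    super.onReplayError(cause) // keep the default error logging
  }
}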
private def changeState(state: State): Unit = {
@ -265,7 +265,6 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
sso.foreach {
case SelectedSnapshot(metadata, snapshot)
setLastSequenceNr(metadata.sequenceNr)
// Since we are recovering we can ignore the receive behavior from the stack
PersistentView.super.aroundReceive(receive, SnapshotOffer(metadata, snapshot))
}
changeState(replayStarted(await = true))
@ -279,64 +278,70 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
* events.
*
* If replay succeeds it switches to `initializing` state and requests the highest stored sequence
* number from the journal. Otherwise RecoveryFailure is emitted.
* If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onReplayFailure`.
* number from the journal.
*
* If replay succeeds the `onReplaySuccess` callback method is called, otherwise `onReplayError` is called and
* remaining replay events are consumed (ignored).
*
* If processing of a replayed event fails, the exception is caught and
* stored for later `RecoveryFailure` message and state is changed to `recoveryFailed`.
* stored for later and the state is changed to `ignoreRemainingReplay`.
*
* All incoming messages are stashed.
* All incoming messages are stashed when `await` is true.
*/
private def replayStarted(await: Boolean) = new State {
override def toString: String = s"replay started"
override def recoveryRunning: Boolean = true
private var stashUpdate = await
override def stateReceive(receive: Receive, message: Any) = message match {
case ScheduledUpdate(_) // ignore
case Update(false, _) // ignore
case u @ Update(true, _) if !stashUpdate
stashUpdate = true
internalStash.stash()
case r: Recover // ignore
case ReplayedMessage(p)
try {
updateLastSequenceNr(p)
PersistentView.super.aroundReceive(receive, p.payload)
} catch {
case NonFatal(t)
changeState(replayFailed(t, p))
changeState(ignoreRemainingReplay(t))
}
case ReplayMessagesSuccess
onReplayComplete(await)
onReplayComplete()
case ReplayMessagesFailure(cause)
onReplayComplete(await)
PersistentView.super.aroundReceive(receive, RecoveryFailure(cause)(None))
case other
try onReplayError(cause) finally onReplayComplete()
case ScheduledUpdate(_) // ignore
case r: Recover // ignore
case Update(a, _)
if (a)
internalStash.stash()
case other
if (await)
internalStash.stash()
else {
try {
PersistentView.super.aroundReceive(receive, other)
} catch {
case NonFatal(t)
changeState(ignoreRemainingReplay(t))
}
}
}
/**
* Switches to `idle` state and schedules the next update if `autoUpdate` returns `true`.
* Switches to `idle` state and unstashes buffered messages.
*/
private def onReplayComplete(await: Boolean): Unit = {
private def onReplayComplete(): Unit = {
changeState(idle)
if (await) internalStash.unstashAll()
internalStash.unstashAll()
}
}
/**
* Consumes remaining replayed messages and then emits RecoveryFailure to the
* `receive` behavior.
* Consumes remaining replayed messages and then throws the exception.
*/
private def replayFailed(cause: Throwable, failed: PersistentRepr) = new State {
private def ignoreRemainingReplay(cause: Throwable) = new State {
override def toString: String = "replay failed"
override def recoveryRunning: Boolean = true
override def stateReceive(receive: Receive, message: Any) = message match {
case ReplayedMessage(p) updateLastSequenceNr(p)
case ReplayedMessage(p)
case ReplayMessagesFailure(_)
replayCompleted(receive)
// journal couldn't tell the maximum stored sequence number, hence the next
@ -351,8 +356,8 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
def replayCompleted(receive: Receive): Unit = {
// in case the actor resumes the state must be `idle`
changeState(idle)
PersistentView.super.aroundReceive(receive, RecoveryFailure(cause)(Some((failed.sequenceNr, failed.payload))))
internalStash.unstashAll()
throw cause
}
}

View file

@ -36,8 +36,6 @@ object AtLeastOnceDeliveryFailureSpec {
case class Done(ints: Vector[Int])
case class Ack(i: Int)
case class ProcessingFailure(i: Int)
case class JournalingFailure(i: Int)
case class Msg(deliveryId: Long, i: Int)
case class Confirm(deliveryId: Long, i: Int)
@ -76,7 +74,6 @@ object AtLeastOnceDeliveryFailureSpec {
def receiveCommand: Receive = {
case i: Int
val failureRate = if (recoveryRunning) replayProcessingFailureRate else liveProcessingFailureRate
if (contains(i)) {
log.debug(debugMessage(s"ignored duplicate ${i}"))
sender() ! Ack(i)
@ -84,29 +81,24 @@ object AtLeastOnceDeliveryFailureSpec {
persist(MsgSent(i)) { evt
updateState(evt)
sender() ! Ack(i)
if (shouldFail(failureRate))
throw new TestException(debugMessage(s"failed at payload ${i}"))
if (shouldFail(liveProcessingFailureRate))
throw new TestException(debugMessage(s"failed at payload $i"))
else
log.debug(debugMessage(s"processed payload ${i}"))
log.debug(debugMessage(s"processed payload $i"))
}
}
case Confirm(deliveryId, i) persist(MsgConfirmed(deliveryId, i))(updateState)
case PersistenceFailure(MsgSent(i), _, _)
// inform sender about journaling failure so that it can resend
sender() ! JournalingFailure(i)
case PersistenceFailure(MsgConfirmed(_, i), _, _)
// ok, will be redelivered
}
def receiveRecover: Receive = {
case evt: Evt updateState(evt)
case RecoveryFailure(_)
// journal failed during recovery, throw exception to re-recover persistent actor
throw new TestException(debugMessage("recovery failed"))
case evt: Evt
updateState(evt)
if (shouldFail(replayProcessingFailureRate))
throw new TestException(debugMessage(s"replay failed at event $evt"))
else
log.debug(debugMessage(s"replayed event $evt"))
}
def updateState(evt: Evt): Unit = evt match {
@ -120,6 +112,14 @@ object AtLeastOnceDeliveryFailureSpec {
private def debugMessage(msg: String): String =
s"[sender] ${msg} (mode = ${if (recoveryRunning) "replay" else "live"} snr = ${lastSequenceNr} state = ${state.sorted})"
override protected def onReplayFailure(cause: Throwable, event: Option[Any]): Unit = {
// mute logging
}
override protected def onPersistFailure(cause: Throwable, event: Any, seqNr: Long): Unit = {
// mute logging
}
}
class ChaosDestination(val probe: ActorRef) extends Actor with ChaosSupport with ActorLogging {
@ -129,7 +129,7 @@ object AtLeastOnceDeliveryFailureSpec {
def receive = {
case m @ Msg(deliveryId, i)
if (shouldFail(confirmFailureRate)) {
log.error(debugMessage("confirm message failed", m))
log.debug(debugMessage("confirm message failed", m))
} else if (contains(i)) {
log.debug(debugMessage("ignored duplicate", m))
sender() ! Confirm(deliveryId, i)
@ -155,14 +155,8 @@ object AtLeastOnceDeliveryFailureSpec {
def receive = {
case Start 1 to numMessages foreach (snd ! _)
case Ack(i) acks += i
case ProcessingFailure(i)
snd ! i
log.debug(s"resent ${i} after processing failure")
case JournalingFailure(i)
snd ! i
log.debug(s"resent ${i} after journaling failure")
case Terminated(_)
// snd will be stopped if ReadHighestSequenceNr fails
// snd will be stopped if recovery or persist fails
log.debug(s"sender stopped, starting it again")
snd = createSender()
1 to numMessages foreach (i if (!acks(i)) snd ! i)
@ -173,6 +167,8 @@ object AtLeastOnceDeliveryFailureSpec {
class AtLeastOnceDeliveryFailureSpec extends AkkaSpec(AtLeastOnceDeliveryFailureSpec.config) with Cleanup with ImplicitSender {
import AtLeastOnceDeliveryFailureSpec._
muteDeadLetters(classOf[AnyRef])(system)
"AtLeastOnceDelivery" must {
"tolerate and recover from random failures" in {
system.actorOf(Props(classOf[ChaosApp], testActor), "chaosApp") ! Start

View file

@ -0,0 +1,50 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import scala.concurrent.duration._
import akka.actor._
import akka.testkit._
object BackoffSupervisorSpec {
object Child {
def props(probe: ActorRef): Props =
Props(new Child(probe))
}
class Child(probe: ActorRef) extends Actor {
def receive = {
case msg ⇒ probe ! msg
}
}
}
class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender {
import BackoffSupervisorSpec._
"BackoffSupervisor" must {
"start child again when it stops" in {
val supervisor = system.actorOf(
BackoffSupervisor.props(Child.props(testActor), "c1", 100.millis, 3.seconds, 0.2))
supervisor ! BackoffSupervisor.GetCurrentChild
val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get
watch(c1)
c1 ! PoisonPill
expectTerminated(c1)
awaitAssert {
supervisor ! BackoffSupervisor.GetCurrentChild
// new instance
expectMsgType[BackoffSupervisor.CurrentChild].ref.get should !==(c1)
}
}
"forward messages to the child" in {
val supervisor = system.actorOf(
BackoffSupervisor.props(Child.props(testActor), "c2", 100.millis, 3.seconds, 0.2))
supervisor ! "hello"
expectMsg("hello")
}
}
}

View file

@ -67,9 +67,6 @@ object PersistentActorFailureSpec {
class Supervisor(testActor: ActorRef) extends Actor {
override def supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
case e: ActorKilledException
testActor ! e
SupervisorStrategy.Stop
case e
testActor ! e
SupervisorStrategy.Restart
@ -100,12 +97,9 @@ object PersistentActorFailureSpec {
val failingRecover: Receive = {
case Evt(data) if data == "bad"
throw new SimulatedException("Simulated exception from receiveRecover")
case r @ RecoveryFailure(cause) if recoveryFailureProbe.isDefined
recoveryFailureProbe.foreach { _ ! r }
}
override def receiveRecover: Receive = failingRecover orElse super.receiveRecover
override def receiveRecover: Receive = failingRecover.orElse[Any, Unit](super.receiveRecover)
}
@ -150,7 +144,7 @@ class PersistentActorFailureSpec extends PersistenceSpec(PersistenceSpec.config(
}
"A persistent actor" must {
"throw ActorKilledException if recovery from persisted events fail" in {
"stop if recovery from persisted events fail" in {
val persistentActor = namedPersistentActor[Behavior1PersistentActor]
persistentActor ! Cmd("corrupt")
persistentActor ! GetState
@ -158,107 +152,36 @@ class PersistentActorFailureSpec extends PersistenceSpec(PersistenceSpec.config(
// recover by creating another with same name
system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[Behavior1PersistentActor], name)
expectMsgType[ActorRef]
expectMsgType[ActorKilledException]
val ref = expectMsgType[ActorRef]
watch(ref)
expectTerminated(ref)
}
"throw ActorKilledException if persist fails" in {
"stop if persist fails" in {
system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[Behavior1PersistentActor], name)
val persistentActor = expectMsgType[ActorRef]
watch(persistentActor)
persistentActor ! Cmd("wrong")
expectMsgType[ActorKilledException]
expectTerminated(persistentActor)
}
"throw ActorKilledException if persistAsync fails" in {
"stop if persistAsync fails" in {
system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[AsyncPersistPersistentActor], name)
val persistentActor = expectMsgType[ActorRef]
persistentActor ! Cmd("a")
watch(persistentActor)
expectMsg("a") // reply before persistAsync
expectMsg("a-1") // reply after successful persistAsync
persistentActor ! Cmd("wrong")
expectMsg("wrong") // reply before persistAsync
expectMsgType[ActorKilledException]
expectTerminated(persistentActor)
}
"throw ActorKilledException if receiveRecover fails" in {
"stop if receiveRecover fails" in {
prepareFailingRecovery()
// recover by creating another with same name
system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[FailingRecovery], name)
expectMsgType[ActorRef]
expectMsgType[ActorKilledException]
}
"include failing event in RecoveryFailure message" in {
prepareFailingRecovery()
// recover by creating another with same name
val probe = TestProbe()
system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[FailingRecovery], name, Some(probe.ref))
expectMsgType[ActorRef]
val recoveryFailure = probe.expectMsgType[RecoveryFailure]
recoveryFailure.payload should ===(Some(Evt("bad")))
recoveryFailure.sequenceNr should ===(Some(3L))
}
"continue by handling RecoveryFailure" in {
prepareFailingRecovery()
// recover by creating another with same name
val probe = TestProbe()
system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[FailingRecovery], name, Some(probe.ref))
val persistentActor = expectMsgType[ActorRef]
val recoveryFailure = probe.expectMsgType[RecoveryFailure]
// continue
persistentActor ! Cmd("d")
persistentActor ! GetState
// "bad" failed, and "c" was not replayed
expectMsg(List("a", "b", "d"))
}
"support resume after recovery failure" in {
prepareFailingRecovery()
// recover by creating another with same name
system.actorOf(Props(classOf[ResumingSupervisor], testActor)) ! Props(classOf[FailingRecovery], name)
val persistentActor = expectMsgType[ActorRef]
expectMsgType[ActorKilledException] // from supervisor
// resume
persistentActor ! Cmd("d")
persistentActor ! GetState
// "bad" failed, and "c" was not replayed
expectMsg(List("a", "b", "d"))
}
"support resume after persist failure" in {
system.actorOf(Props(classOf[ResumingSupervisor], testActor)) ! Props(classOf[Behavior1PersistentActor], name)
val persistentActor = expectMsgType[ActorRef]
persistentActor ! Cmd("a")
persistentActor ! Cmd("wrong")
persistentActor ! Cmd("b")
// Behavior1PersistentActor persists 2 events per Cmd,
// and therefore 2 exceptions from supervisor
expectMsgType[ActorKilledException]
expectMsgType[ActorKilledException]
persistentActor ! Cmd("c")
persistentActor ! GetState
expectMsg(List("a-1", "a-2", "b-1", "b-2", "c-1", "c-2"))
}
"support resume when persist followed by exception" in {
system.actorOf(Props(classOf[ResumingSupervisor], testActor)) ! Props(classOf[ThrowingActor1], name)
val persistentActor = expectMsgType[ActorRef]
persistentActor ! Cmd("a")
persistentActor ! Cmd("err")
persistentActor ! Cmd("b")
expectMsgType[SimulatedException] // from supervisor
persistentActor ! Cmd("c")
persistentActor ! GetState
expectMsg(List("a", "err", "b", "c"))
}
"support resume when persist handler throws exception" in {
system.actorOf(Props(classOf[ResumingSupervisor], testActor)) ! Props(classOf[ThrowingActor2], name)
val persistentActor = expectMsgType[ActorRef]
persistentActor ! Cmd("a")
persistentActor ! Cmd("b")
persistentActor ! Cmd("err")
persistentActor ! Cmd("c")
expectMsgType[SimulatedException] // from supervisor
persistentActor ! Cmd("d")
persistentActor ! GetState
expectMsg(List("a", "b", "c", "d"))
val ref = expectMsgType[ActorRef]
watch(ref)
expectTerminated(ref)
}
}

View file

@ -42,9 +42,6 @@ object PersistentViewSpec {
case "boom"
throw new TestException("boom")
case RecoveryFailure(cause)
throw cause // restart
case payload if isPersistent && shouldFailOn(payload)
throw new TestException("boom")

View file

@ -12,9 +12,11 @@ import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.*;
import scala.Option;
import scala.concurrent.duration.Duration;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
public class LambdaPersistenceDocTest {
@ -116,6 +118,23 @@ public class LambdaPersistenceDocTest {
}
//#recovery-completed
abstract class MyActor extends AbstractPersistentActor {
//#backoff
@Override
public void preStart() throws Exception {
final Props childProps = Props.create(MyPersistentActor1.class);
final Props props = BackoffSupervisor.props(
childProps,
"myActor",
Duration.create(3, TimeUnit.SECONDS),
Duration.create(30, TimeUnit.SECONDS),
0.2);
context().actorOf(props, "mySupervisor");
super.preStart();
}
//#backoff
}
};
static Object fullyDisabledRecoveyExample = new Object() {