diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 2150e2d4b9..53c3446072 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -7,7 +7,7 @@ package akka.persistence import java.util.UUID import java.util.concurrent.atomic.AtomicInteger -import akka.actor.{ ActorCell, DeadLetter, StashOverflowException } +import akka.actor.{ Actor, ActorCell, DeadLetter, StashOverflowException } import akka.annotation.InternalApi import akka.dispatch.Envelope import akka.event.{ Logging, LoggingAdapter } @@ -459,7 +459,13 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas } private val recoveryBehavior: Receive = { - val _receiveRecover = receiveRecover + val _receiveRecover = try receiveRecover catch { + case NonFatal(e) ⇒ + try onRecoveryFailure(e, Some(e)) + finally context.stop(self) + returnRecoveryPermit() + Actor.emptyBehavior + } { case PersistentRepr(payload, _) if recoveryRunning && _receiveRecover.isDefinedAt(payload) ⇒ diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala index 7c8feb2a23..c4c378b97f 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala @@ -663,6 +663,11 @@ object PersistentActorSpec { case Cmd(d) ⇒ persist(Evt(d))(updateState) } } + + class ExceptionActor(name: String) extends ExamplePersistentActor(name) { + override def receiveCommand = commonBehavior + override def receiveRecover = throw new TestException("boom") + } } abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(config) with ImplicitSender { @@ -1176,6 +1181,12 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi persistentActor ! "boom" expectTerminated(persistentActor) } + + "stop actor when direct exception from receiveRecover" in { + val persistentActor = namedPersistentActor[ExceptionActor] + watch(persistentActor) + expectTerminated(persistentActor) + } } }