diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 8b79336755..efdc1db2e6 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -541,7 +541,9 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas setLastSequenceNr(highestSeqNr) _recoveryRunning = false try Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted) - finally transitToProcessingState() + finally transitToProcessingState() // in finally in case exception and resume strategy + // if exception from RecoveryCompleted the permit is returned in below catch + returnRecoveryPermit() case ReplayMessagesFailure(cause) ⇒ timeoutCancellable.cancel() try onRecoveryFailure(cause, event = None) finally context.stop(self) @@ -567,8 +569,6 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, self) private def transitToProcessingState(): Unit = { - returnRecoveryPermit() - if (eventBatch.nonEmpty) flushBatch() if (pendingStashingPersistInvocations > 0) changeState(persistingEvents) diff --git a/akka-persistence/src/test/scala/akka/persistence/RecoveryPermitterSpec.scala b/akka-persistence/src/test/scala/akka/persistence/RecoveryPermitterSpec.scala index ffc04155b7..6285d38a63 100644 --- a/akka-persistence/src/test/scala/akka/persistence/RecoveryPermitterSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/RecoveryPermitterSpec.scala @@ -4,6 +4,8 @@ package akka.persistence import scala.concurrent.duration._ +import scala.util.control.NoStackTrace + import akka.actor._ import akka.event.Logging import akka.event.Logging.Warning @@ -14,10 +16,13 @@ import akka.testkit.TestActors object RecoveryPermitterSpec { - def testProps(name: String, probe: ActorRef): Props = - Props(new TestPersistentActor(name, probe)) + class TestExc extends RuntimeException("simulated exc") with NoStackTrace - class TestPersistentActor(name: String, probe: ActorRef) extends PersistentActor { + def testProps(name: String, probe: ActorRef, throwFromRecoveryCompleted: Boolean = false): Props = + Props(new TestPersistentActor(name, probe, throwFromRecoveryCompleted)) + + class TestPersistentActor(name: String, probe: ActorRef, throwFromRecoveryCompleted: Boolean) + extends PersistentActor { override def persistenceId = name @@ -28,6 +33,8 @@ object RecoveryPermitterSpec { override def receiveRecover: Receive = { case RecoveryCompleted ⇒ probe ! RecoveryCompleted + if (throwFromRecoveryCompleted) + throw new TestExc } override def receiveCommand: Receive = { case "stop" ⇒ @@ -46,6 +53,8 @@ class RecoveryPermitterSpec extends PersistenceSpec(ConfigFactory.parseString( import RecoveryPermitterSpec._ import RecoveryPermitter._ + system.eventStream.publish(TestEvent.Mute(EventFilter[TestExc]())) + val permitter = Persistence(system).recoveryPermitter val p1 = TestProbe() val p2 = TestProbe() @@ -152,6 +161,31 @@ class RecoveryPermitterSpec extends PersistenceSpec(ConfigFactory.parseString( permitter.tell(ReturnRecoveryPermit, p3.ref) permitter.tell(ReturnRecoveryPermit, p4.ref) } + + "return permit when actor throws from RecoveryCompleted" in { + requestPermit(p1) + requestPermit(p2) + + val persistentActor = system.actorOf(testProps("p3", p3.ref, throwFromRecoveryCompleted = true)) + p3.expectMsg(RecoveryCompleted) + p3.expectMsg("postStop") + // it's restarting + (1 to 5).foreach { _ ⇒ + p3.expectMsg(RecoveryCompleted) + p3.expectMsg("postStop") + } + // stop it + val stopProbe = TestProbe() + stopProbe.watch(persistentActor) + system.stop(persistentActor) + stopProbe.expectTerminated(persistentActor) + + requestPermit(p4) + + permitter.tell(ReturnRecoveryPermit, p1.ref) + permitter.tell(ReturnRecoveryPermit, p2.ref) + permitter.tell(ReturnRecoveryPermit, p4.ref) + } } }