Merge pull request #24006 from akka/wip-24003-permit-patriknw

fix double return of recovery permit, #24003
This commit is contained in:
Patrik Nordwall 2017-11-17 10:58:47 +01:00 committed by GitHub
commit b51d720b18
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 6 deletions

View file

@ -541,7 +541,9 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
setLastSequenceNr(highestSeqNr) setLastSequenceNr(highestSeqNr)
_recoveryRunning = false _recoveryRunning = false
try Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted) try Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted)
finally transitToProcessingState() finally transitToProcessingState() // in finally in case exception and resume strategy
// if exception from RecoveryCompleted the permit is returned in below catch
returnRecoveryPermit()
case ReplayMessagesFailure(cause) case ReplayMessagesFailure(cause)
timeoutCancellable.cancel() timeoutCancellable.cancel()
try onRecoveryFailure(cause, event = None) finally context.stop(self) try onRecoveryFailure(cause, event = None) finally context.stop(self)
@ -567,8 +569,6 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, self) extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, self)
private def transitToProcessingState(): Unit = { private def transitToProcessingState(): Unit = {
returnRecoveryPermit()
if (eventBatch.nonEmpty) flushBatch() if (eventBatch.nonEmpty) flushBatch()
if (pendingStashingPersistInvocations > 0) changeState(persistingEvents) if (pendingStashingPersistInvocations > 0) changeState(persistingEvents)

View file

@ -4,6 +4,8 @@
package akka.persistence package akka.persistence
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.actor._ import akka.actor._
import akka.event.Logging import akka.event.Logging
import akka.event.Logging.Warning import akka.event.Logging.Warning
@ -14,10 +16,13 @@ import akka.testkit.TestActors
object RecoveryPermitterSpec { object RecoveryPermitterSpec {
def testProps(name: String, probe: ActorRef): Props = class TestExc extends RuntimeException("simulated exc") with NoStackTrace
Props(new TestPersistentActor(name, probe))
class TestPersistentActor(name: String, probe: ActorRef) extends PersistentActor { def testProps(name: String, probe: ActorRef, throwFromRecoveryCompleted: Boolean = false): Props =
Props(new TestPersistentActor(name, probe, throwFromRecoveryCompleted))
class TestPersistentActor(name: String, probe: ActorRef, throwFromRecoveryCompleted: Boolean)
extends PersistentActor {
override def persistenceId = name override def persistenceId = name
@ -28,6 +33,8 @@ object RecoveryPermitterSpec {
override def receiveRecover: Receive = { override def receiveRecover: Receive = {
case RecoveryCompleted case RecoveryCompleted
probe ! RecoveryCompleted probe ! RecoveryCompleted
if (throwFromRecoveryCompleted)
throw new TestExc
} }
override def receiveCommand: Receive = { override def receiveCommand: Receive = {
case "stop" case "stop"
@ -46,6 +53,8 @@ class RecoveryPermitterSpec extends PersistenceSpec(ConfigFactory.parseString(
import RecoveryPermitterSpec._ import RecoveryPermitterSpec._
import RecoveryPermitter._ import RecoveryPermitter._
system.eventStream.publish(TestEvent.Mute(EventFilter[TestExc]()))
val permitter = Persistence(system).recoveryPermitter val permitter = Persistence(system).recoveryPermitter
val p1 = TestProbe() val p1 = TestProbe()
val p2 = TestProbe() val p2 = TestProbe()
@ -152,6 +161,31 @@ class RecoveryPermitterSpec extends PersistenceSpec(ConfigFactory.parseString(
permitter.tell(ReturnRecoveryPermit, p3.ref) permitter.tell(ReturnRecoveryPermit, p3.ref)
permitter.tell(ReturnRecoveryPermit, p4.ref) permitter.tell(ReturnRecoveryPermit, p4.ref)
} }
"return permit when actor throws from RecoveryCompleted" in {
requestPermit(p1)
requestPermit(p2)
val persistentActor = system.actorOf(testProps("p3", p3.ref, throwFromRecoveryCompleted = true))
p3.expectMsg(RecoveryCompleted)
p3.expectMsg("postStop")
// it's restarting
(1 to 5).foreach { _
p3.expectMsg(RecoveryCompleted)
p3.expectMsg("postStop")
}
// stop it
val stopProbe = TestProbe()
stopProbe.watch(persistentActor)
system.stop(persistentActor)
stopProbe.expectTerminated(persistentActor)
requestPermit(p4)
permitter.tell(ReturnRecoveryPermit, p1.ref)
permitter.tell(ReturnRecoveryPermit, p2.ref)
permitter.tell(ReturnRecoveryPermit, p4.ref)
}
} }
} }