fix double return of recovery permit, #24003
* when exception was thrown from RecoveryCompleted the permit was returned twice, resulting in negative balance
This commit is contained in:
parent
1f8b0137a8
commit
60e7a4088d
2 changed files with 40 additions and 6 deletions
|
|
@ -541,7 +541,9 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
|
||||||
setLastSequenceNr(highestSeqNr)
|
setLastSequenceNr(highestSeqNr)
|
||||||
_recoveryRunning = false
|
_recoveryRunning = false
|
||||||
try Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted)
|
try Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted)
|
||||||
finally transitToProcessingState()
|
finally transitToProcessingState() // in finally in case exception and resume strategy
|
||||||
|
// if exception from RecoveryCompleted the permit is returned in below catch
|
||||||
|
returnRecoveryPermit()
|
||||||
case ReplayMessagesFailure(cause) ⇒
|
case ReplayMessagesFailure(cause) ⇒
|
||||||
timeoutCancellable.cancel()
|
timeoutCancellable.cancel()
|
||||||
try onRecoveryFailure(cause, event = None) finally context.stop(self)
|
try onRecoveryFailure(cause, event = None) finally context.stop(self)
|
||||||
|
|
@ -567,8 +569,6 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
|
||||||
extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, self)
|
extension.recoveryPermitter.tell(RecoveryPermitter.ReturnRecoveryPermit, self)
|
||||||
|
|
||||||
private def transitToProcessingState(): Unit = {
|
private def transitToProcessingState(): Unit = {
|
||||||
returnRecoveryPermit()
|
|
||||||
|
|
||||||
if (eventBatch.nonEmpty) flushBatch()
|
if (eventBatch.nonEmpty) flushBatch()
|
||||||
|
|
||||||
if (pendingStashingPersistInvocations > 0) changeState(persistingEvents)
|
if (pendingStashingPersistInvocations > 0) changeState(persistingEvents)
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,8 @@
|
||||||
package akka.persistence
|
package akka.persistence
|
||||||
|
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
import scala.util.control.NoStackTrace
|
||||||
|
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.event.Logging
|
import akka.event.Logging
|
||||||
import akka.event.Logging.Warning
|
import akka.event.Logging.Warning
|
||||||
|
|
@ -14,10 +16,13 @@ import akka.testkit.TestActors
|
||||||
|
|
||||||
object RecoveryPermitterSpec {
|
object RecoveryPermitterSpec {
|
||||||
|
|
||||||
def testProps(name: String, probe: ActorRef): Props =
|
class TestExc extends RuntimeException("simulated exc") with NoStackTrace
|
||||||
Props(new TestPersistentActor(name, probe))
|
|
||||||
|
|
||||||
class TestPersistentActor(name: String, probe: ActorRef) extends PersistentActor {
|
def testProps(name: String, probe: ActorRef, throwFromRecoveryCompleted: Boolean = false): Props =
|
||||||
|
Props(new TestPersistentActor(name, probe, throwFromRecoveryCompleted))
|
||||||
|
|
||||||
|
class TestPersistentActor(name: String, probe: ActorRef, throwFromRecoveryCompleted: Boolean)
|
||||||
|
extends PersistentActor {
|
||||||
|
|
||||||
override def persistenceId = name
|
override def persistenceId = name
|
||||||
|
|
||||||
|
|
@ -28,6 +33,8 @@ object RecoveryPermitterSpec {
|
||||||
override def receiveRecover: Receive = {
|
override def receiveRecover: Receive = {
|
||||||
case RecoveryCompleted ⇒
|
case RecoveryCompleted ⇒
|
||||||
probe ! RecoveryCompleted
|
probe ! RecoveryCompleted
|
||||||
|
if (throwFromRecoveryCompleted)
|
||||||
|
throw new TestExc
|
||||||
}
|
}
|
||||||
override def receiveCommand: Receive = {
|
override def receiveCommand: Receive = {
|
||||||
case "stop" ⇒
|
case "stop" ⇒
|
||||||
|
|
@ -46,6 +53,8 @@ class RecoveryPermitterSpec extends PersistenceSpec(ConfigFactory.parseString(
|
||||||
import RecoveryPermitterSpec._
|
import RecoveryPermitterSpec._
|
||||||
import RecoveryPermitter._
|
import RecoveryPermitter._
|
||||||
|
|
||||||
|
system.eventStream.publish(TestEvent.Mute(EventFilter[TestExc]()))
|
||||||
|
|
||||||
val permitter = Persistence(system).recoveryPermitter
|
val permitter = Persistence(system).recoveryPermitter
|
||||||
val p1 = TestProbe()
|
val p1 = TestProbe()
|
||||||
val p2 = TestProbe()
|
val p2 = TestProbe()
|
||||||
|
|
@ -152,6 +161,31 @@ class RecoveryPermitterSpec extends PersistenceSpec(ConfigFactory.parseString(
|
||||||
permitter.tell(ReturnRecoveryPermit, p3.ref)
|
permitter.tell(ReturnRecoveryPermit, p3.ref)
|
||||||
permitter.tell(ReturnRecoveryPermit, p4.ref)
|
permitter.tell(ReturnRecoveryPermit, p4.ref)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"return permit when actor throws from RecoveryCompleted" in {
|
||||||
|
requestPermit(p1)
|
||||||
|
requestPermit(p2)
|
||||||
|
|
||||||
|
val persistentActor = system.actorOf(testProps("p3", p3.ref, throwFromRecoveryCompleted = true))
|
||||||
|
p3.expectMsg(RecoveryCompleted)
|
||||||
|
p3.expectMsg("postStop")
|
||||||
|
// it's restarting
|
||||||
|
(1 to 5).foreach { _ ⇒
|
||||||
|
p3.expectMsg(RecoveryCompleted)
|
||||||
|
p3.expectMsg("postStop")
|
||||||
|
}
|
||||||
|
// stop it
|
||||||
|
val stopProbe = TestProbe()
|
||||||
|
stopProbe.watch(persistentActor)
|
||||||
|
system.stop(persistentActor)
|
||||||
|
stopProbe.expectTerminated(persistentActor)
|
||||||
|
|
||||||
|
requestPermit(p4)
|
||||||
|
|
||||||
|
permitter.tell(ReturnRecoveryPermit, p1.ref)
|
||||||
|
permitter.tell(ReturnRecoveryPermit, p2.ref)
|
||||||
|
permitter.tell(ReturnRecoveryPermit, p4.ref)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue