=per Allow persisting events when recovery has completed (#21893)

* be able to persist events that happen during recovery, #21736

* prohibit persiting events during recovery, #21736

* change error message #21736
This commit is contained in:
ortigali 2016-12-14 17:32:27 +05:00 committed by Konrad `ktoso` Malawski
parent e29e06f850
commit ef1e0e01a2
3 changed files with 56 additions and 7 deletions

View file

@ -316,6 +316,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
* @param handler handler for each persisted `event`
*/
def persist[A](event: A)(handler: A Unit): Unit = {
if (recoveryRunning) throw new IllegalStateException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.")
pendingStashingPersistInvocations += 1
pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any Unit])
eventBatch ::= AtomicWrite(PersistentRepr(event, persistenceId = persistenceId,
@ -331,6 +332,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
* @param handler handler for each persisted `events`
*/
def persistAll[A](events: immutable.Seq[A])(handler: A Unit): Unit = {
if (recoveryRunning) throw new IllegalStateException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.")
if (events.nonEmpty) {
events.foreach { event
pendingStashingPersistInvocations += 1
@ -369,6 +371,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
* @param handler handler for each persisted `event`
*/
def persistAsync[A](event: A)(handler: A Unit): Unit = {
if (recoveryRunning) throw new IllegalStateException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.")
pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any Unit])
eventBatch ::= AtomicWrite(PersistentRepr(event, persistenceId = persistenceId,
sequenceNr = nextSequenceNr(), writerUuid = writerUuid, sender = sender()))
@ -382,7 +385,8 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
* @param events events to be persisted
* @param handler handler for each persisted `events`
*/
def persistAllAsync[A](events: immutable.Seq[A])(handler: A Unit): Unit =
def persistAllAsync[A](events: immutable.Seq[A])(handler: A Unit): Unit = {
if (recoveryRunning) throw new IllegalStateException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.")
if (events.nonEmpty) {
events.foreach { event
pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any Unit])
@ -390,6 +394,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
eventBatch ::= AtomicWrite(events.map(PersistentRepr(_, persistenceId = persistenceId,
sequenceNr = nextSequenceNr(), writerUuid = writerUuid, sender = sender())))
}
}
@deprecated("use persistAllAsync instead", "2.4")
def persistAsync[A](events: immutable.Seq[A])(handler: A Unit): Unit =
@ -413,6 +418,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
* @param handler handler for the given `event`
*/
def deferAsync[A](event: A)(handler: A Unit): Unit = {
if (recoveryRunning) throw new IllegalStateException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.")
if (pendingInvocations.isEmpty) {
handler(event)
} else {
@ -537,10 +543,11 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
context.system.scheduler.schedule(timeout, timeout, self, RecoveryTick(snapshot = false))
}
var eventSeenInInterval = false
var _recoveryRunning = true
override def toString: String = "replay started"
override def recoveryRunning: Boolean = true
override def recoveryRunning: Boolean = _recoveryRunning
override def stateReceive(receive: Receive, message: Any) = message match {
case ReplayedMessage(p)
@ -556,18 +563,18 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
case RecoverySuccess(highestSeqNr)
timeoutCancellable.cancel()
onReplaySuccess() // callback for subclass implementation
changeState(processingCommands)
sequenceNr = highestSeqNr
setLastSequenceNr(highestSeqNr)
internalStash.unstashAll()
Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted)
_recoveryRunning = false
try Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted)
finally transitToProcessingState()
case ReplayMessagesFailure(cause)
timeoutCancellable.cancel()
try onRecoveryFailure(cause, event = None) finally context.stop(self)
case RecoveryTick(false) if !eventSeenInInterval
timeoutCancellable.cancel()
try onRecoveryFailure(
new RecoveryTimedOut(s"Recovery timed out, didn't get event within $timeout, highest sequence number seen $sequenceNr"),
new RecoveryTimedOut(s"Recovery timed out, didn't get event within $timeout, highest sequence number seen $lastSequenceNr"),
event = None)
finally context.stop(self)
case RecoveryTick(false)
@ -577,6 +584,17 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
case other
stashInternally(other)
}
private def transitToProcessingState(): Unit = {
if (eventBatch.nonEmpty) flushBatch()
if (pendingStashingPersistInvocations > 0) changeState(persistingEvents)
else {
changeState(processingCommands)
internalStash.unstashAll()
}
}
}
private def flushBatch() {

View file

@ -43,7 +43,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
case "fail" ReplayFilter.Fail
case "warn" ReplayFilter.Warn
case other throw new IllegalArgumentException(
s"invalid replay-filter.mode [$other], supported values [off, repair, fail, warn]")
s"invalid replay-filter.mode [$other], supported values [off, repair-by-discard-old, fail, warn]")
}
private def isReplayFilterEnabled: Boolean = replayFilterMode != ReplayFilter.Disabled
private val replayFilterWindowSize: Int = config.getInt("replay-filter.window-size")

View file

@ -646,6 +646,23 @@ object PersistentActorSpec {
}
}
class PersistInRecovery(name: String) extends ExamplePersistentActor(name) {
override def receiveRecover = {
case Evt("invalid")
persist(Evt("invalid-recovery"))(updateState)
case e: Evt updateState(e)
case RecoveryCompleted
persistAsync(Evt("rc-1"))(updateState)
persist(Evt("rc-2"))(updateState)
persistAsync(Evt("rc-3"))(updateState)
}
override def onRecoveryFailure(cause: scala.Throwable, event: Option[Any]): Unit = ()
def receiveCommand = commonBehavior orElse {
case Cmd(d) persist(Evt(d))(updateState)
}
}
}
abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(config) with ImplicitSender {
@ -1119,6 +1136,20 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi
persistentActor ! "Boom"
expectMsg("failed with TestException while processing Boom")
}
"be able to persist events that happen during recovery" in {
val persistentActor = namedPersistentActor[PersistInRecovery]
persistentActor ! GetState
expectMsg(List("a-1", "a-2", "rc-1", "rc-2"))
persistentActor ! GetState
expectMsg(List("a-1", "a-2", "rc-1", "rc-2", "rc-3"))
persistentActor ! Cmd("invalid")
persistentActor ! GetState
expectMsg(List("a-1", "a-2", "rc-1", "rc-2", "rc-3", "invalid"))
watch(persistentActor)
persistentActor ! "boom"
expectTerminated(persistentActor)
}
}
}