diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index eae82b1bb3..cbff3233e6 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -316,6 +316,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas * @param handler handler for each persisted `event` */ def persist[A](event: A)(handler: A ⇒ Unit): Unit = { + if (recoveryRunning) throw new IllegalStateException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.") pendingStashingPersistInvocations += 1 pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) eventBatch ::= AtomicWrite(PersistentRepr(event, persistenceId = persistenceId, @@ -331,6 +332,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas * @param handler handler for each persisted `events` */ def persistAll[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = { + if (recoveryRunning) throw new IllegalStateException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.") if (events.nonEmpty) { events.foreach { event ⇒ pendingStashingPersistInvocations += 1 @@ -369,6 +371,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas * @param handler handler for each persisted `event` */ def persistAsync[A](event: A)(handler: A ⇒ Unit): Unit = { + if (recoveryRunning) throw new IllegalStateException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.") pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) eventBatch ::= AtomicWrite(PersistentRepr(event, persistenceId = persistenceId, sequenceNr = nextSequenceNr(), writerUuid = writerUuid, sender = sender())) @@ -382,7 +385,8 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas * @param events events to be persisted * @param handler handler for each persisted `events` */ - def persistAllAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = + def persistAllAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = { + if (recoveryRunning) throw new IllegalStateException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.") if (events.nonEmpty) { events.foreach { event ⇒ pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) @@ -390,6 +394,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas eventBatch ::= AtomicWrite(events.map(PersistentRepr(_, persistenceId = persistenceId, sequenceNr = nextSequenceNr(), writerUuid = writerUuid, sender = sender()))) } + } @deprecated("use persistAllAsync instead", "2.4") def persistAsync[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = @@ -413,6 +418,7 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas * @param handler handler for the given `event` */ def deferAsync[A](event: A)(handler: A ⇒ Unit): Unit = { + if (recoveryRunning) throw new IllegalStateException("Cannot persist during replay. Events can be persisted when receiving RecoveryCompleted or later.") if (pendingInvocations.isEmpty) { handler(event) } else { @@ -537,10 +543,11 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas context.system.scheduler.schedule(timeout, timeout, self, RecoveryTick(snapshot = false)) } var eventSeenInInterval = false + var _recoveryRunning = true override def toString: String = "replay started" - override def recoveryRunning: Boolean = true + override def recoveryRunning: Boolean = _recoveryRunning override def stateReceive(receive: Receive, message: Any) = message match { case ReplayedMessage(p) ⇒ @@ -556,18 +563,18 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas case RecoverySuccess(highestSeqNr) ⇒ timeoutCancellable.cancel() onReplaySuccess() // callback for subclass implementation - changeState(processingCommands) sequenceNr = highestSeqNr setLastSequenceNr(highestSeqNr) - internalStash.unstashAll() - Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted) + _recoveryRunning = false + try Eventsourced.super.aroundReceive(recoveryBehavior, RecoveryCompleted) + finally transitToProcessingState() case ReplayMessagesFailure(cause) ⇒ timeoutCancellable.cancel() try onRecoveryFailure(cause, event = None) finally context.stop(self) case RecoveryTick(false) if !eventSeenInInterval ⇒ timeoutCancellable.cancel() try onRecoveryFailure( - new RecoveryTimedOut(s"Recovery timed out, didn't get event within $timeout, highest sequence number seen $sequenceNr"), + new RecoveryTimedOut(s"Recovery timed out, didn't get event within $timeout, highest sequence number seen $lastSequenceNr"), event = None) finally context.stop(self) case RecoveryTick(false) ⇒ @@ -577,6 +584,17 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas case other ⇒ stashInternally(other) } + + private def transitToProcessingState(): Unit = { + if (eventBatch.nonEmpty) flushBatch() + + if (pendingStashingPersistInvocations > 0) changeState(persistingEvents) + else { + changeState(processingCommands) + internalStash.unstashAll() + } + + } } private def flushBatch() { diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index 6623d6d2d1..e6a661cdc6 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -43,7 +43,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { case "fail" ⇒ ReplayFilter.Fail case "warn" ⇒ ReplayFilter.Warn case other ⇒ throw new IllegalArgumentException( - s"invalid replay-filter.mode [$other], supported values [off, repair, fail, warn]") + s"invalid replay-filter.mode [$other], supported values [off, repair-by-discard-old, fail, warn]") } private def isReplayFilterEnabled: Boolean = replayFilterMode != ReplayFilter.Disabled private val replayFilterWindowSize: Int = config.getInt("replay-filter.window-size") diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala index 13750aee3a..9ce6bc8e04 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala @@ -646,6 +646,23 @@ object PersistentActorSpec { } } + class PersistInRecovery(name: String) extends ExamplePersistentActor(name) { + override def receiveRecover = { + case Evt("invalid") ⇒ + persist(Evt("invalid-recovery"))(updateState) + case e: Evt ⇒ updateState(e) + case RecoveryCompleted ⇒ + persistAsync(Evt("rc-1"))(updateState) + persist(Evt("rc-2"))(updateState) + persistAsync(Evt("rc-3"))(updateState) + } + + override def onRecoveryFailure(cause: scala.Throwable, event: Option[Any]): Unit = () + + def receiveCommand = commonBehavior orElse { + case Cmd(d) ⇒ persist(Evt(d))(updateState) + } + } } abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(config) with ImplicitSender { @@ -1119,6 +1136,20 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi persistentActor ! "Boom" expectMsg("failed with TestException while processing Boom") } + + "be able to persist events that happen during recovery" in { + val persistentActor = namedPersistentActor[PersistInRecovery] + persistentActor ! GetState + expectMsg(List("a-1", "a-2", "rc-1", "rc-2")) + persistentActor ! GetState + expectMsg(List("a-1", "a-2", "rc-1", "rc-2", "rc-3")) + persistentActor ! Cmd("invalid") + persistentActor ! GetState + expectMsg(List("a-1", "a-2", "rc-1", "rc-2", "rc-3", "invalid")) + watch(persistentActor) + persistentActor ! "boom" + expectTerminated(persistentActor) + } } }