From 9bcaeff87d7c3cfe2efcb78d3e20ccb836e0464a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bjo=CC=88rn=20Antonsson?= Date: Thu, 5 Jun 2014 14:07:17 +0200 Subject: [PATCH] +per #13944 Send RecoveryComplete message at end of recovery Fixes #13944 Conflicts: akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala akka-persistence/src/main/scala/akka/persistence/Processor.scala project/AkkaBuild.scala --- .../docs/persistence/PersistenceDocTest.java | 23 +++-- akka-docs/rst/java/lambda-persistence.rst | 8 +- akka-docs/rst/java/persistence.rst | 8 +- .../docs/persistence/PersistenceDocSpec.scala | 16 ++-- akka-docs/rst/scala/persistence.rst | 8 +- .../scala/akka/persistence/Eventsourced.scala | 8 ++ .../scala/akka/persistence/Processor.scala | 64 ++++++++------ .../PersistentActorFailureSpec.scala | 83 +++++++++++++++++++ .../persistence/PersistentActorSpec.scala | 44 +++++++++- .../akka/persistence/ProcessorSpec.scala | 5 +- .../SnapshotFailureRobustnessSpec.scala | 2 + .../SnapshotSerializationSpec.scala | 1 + .../scala/akka/persistence/SnapshotSpec.scala | 14 +++- .../java/doc/LambdaPersistenceDocTest.java | 20 +---- 14 files changed, 223 insertions(+), 81 deletions(-) create mode 100644 akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index f1d7e5bbf1..c18ed62c07 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -19,6 +19,8 @@ import static java.util.Arrays.asList; public class PersistenceDocTest { + public interface SomeOtherMessage {} + public interface ProcessorMethods { //#processor-id public String processorId(); @@ -49,9 +51,12 @@ public class PersistenceDocTest { Long sequenceNr = failure.sequenceNr(); Throwable cause = failure.cause(); // ... - } else { + } else if (message instanceof SomeOtherMessage) { // message not written to journal } + else { + unhandled(message); + } } } //#definition @@ -127,21 +132,12 @@ public class PersistenceDocTest { class MyProcessor5 extends UntypedProcessor { //#recovery-completed - @Override - public void preStart() throws Exception { - super.preStart(); - self().tell("FIRST", self()); - } - public void onReceive(Object message) throws Exception { - if (message.equals("FIRST")) { + if (message instanceof RecoveryCompleted) { recoveryCompleted(); getContext().become(active); - unstashAll(); - } else if (recoveryFinished()) { - stash(); } else { - active.apply(message); + unhandled(message); } } @@ -156,6 +152,9 @@ public class PersistenceDocTest { if (message instanceof Persistent) { // ... } + else { + unhandled(message); + } } }; //#recovery-completed diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index b70f5847f0..04caf62fcc 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -124,9 +124,11 @@ A processor can query its own recovery status via the methods Sometimes there is a need for performing additional initialization when the recovery has completed, before processing any other message sent to the processor. -The processor can send itself a message from ``preStart``. It will be stashed and received -after recovery. The mailbox may contain other messages that are queued in front of -that message and therefore you need to stash until you receive that message. +The processor will receive a special :class:`RecoveryCompleted` message right after recovery +and before any other received messages. If there is a problem with recovering the state of +the actor from the journal, the actor will be sent a :class:`RecoveryFailure` message that +it can choose to handle. If the actor doesn't handle the :class:`RecoveryFailure` message it +will be stopped. .. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recovery-completed diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 55f8aa5c5d..77c437cfb9 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -143,9 +143,11 @@ A processor can query its own recovery status via the methods Sometimes there is a need for performing additional initialization when the recovery has completed, before processing any other message sent to the processor. -The processor can send itself a message from ``preStart``. It will be stashed and received -after recovery. The mailbox may contain other messages that are queued in front of -that message and therefore you need to stash until you receive that message. +The processor will receive a special :class:`RecoveryCompleted` message right after recovery +and before any other received messages. If there is a problem with recovering the state of +the actor from the journal, the actor will be sent a :class:`RecoveryFailure` message that +it can choose to handle. If the actor doesn't handle the :class:`RecoveryFailure` message it +will be stopped. .. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-completed diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index 7b132e43b9..c054e40e0a 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -21,6 +21,8 @@ trait PersistenceDocSpec { //#auto-update """ + trait SomeOtherMessage + val system: ActorSystem import system._ @@ -35,7 +37,7 @@ trait PersistenceDocSpec { // message successfully written to journal case PersistenceFailure(payload, sequenceNr, cause) => // message failed to be written to journal - case other => + case m: SomeOtherMessage => // message not written to journal } } @@ -87,20 +89,12 @@ trait PersistenceDocSpec { class MyProcessor4 extends Processor { //#recovery-completed - override def preStart(): Unit = { - super.preStart() - self ! "FIRST" - } - - def receive = initializing.orElse(active) + def receive = initializing def initializing: Receive = { - case "FIRST" => + case RecoveryCompleted => recoveryCompleted() context.become(active) - unstashAll() - case other if recoveryFinished => - stash() } def recoveryCompleted(): Unit = { diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index dd3871b015..3325bc4b2d 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -132,9 +132,11 @@ A processor can query its own recovery status via the methods Sometimes there is a need for performing additional initialization when the recovery has completed, before processing any other message sent to the processor. -The processor can send itself a message from ``preStart``. It will be stashed and received -after recovery. The mailbox may contain other messages that are queued in front of -that message and therefore you need to stash until you receive that message. +The processor will receive a special :class:`RecoveryCompleted` message right after recovery +and before any other received messages. If there is a problem with recovering the state of +the actor from the journal, the actor will be sent a :class:`RecoveryFailure` message that +it can choose to handle. If the actor doesn't handle the :class:`RecoveryFailure` message it +will be stopped. .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-completed diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index c3190283bd..77324901d3 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -157,6 +157,8 @@ private[persistence] trait Eventsourced extends Processor { receiveRecover(s) case f: RecoveryFailure if receiveRecover.isDefinedAt(f) ⇒ receiveRecover(f) + case RecoveryCompleted if receiveRecover.isDefinedAt(RecoveryCompleted) ⇒ + receiveRecover(RecoveryCompleted) } sealed trait PersistInvocation { @@ -259,6 +261,9 @@ private[persistence] trait Eventsourced extends Processor { * should not perform actions that may fail, such as interacting with external services, * for example. * + * If recovery fails, the actor will be stopped. This can be customized by + * handling [[RecoveryFailure]]. + * * @see [[Recover]] */ def receiveRecover: Receive @@ -429,6 +434,9 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events * should not perform actions that may fail, such as interacting with external services, * for example. * + * If recovery fails, the actor will be stopped. This can be customized by + * handling [[RecoveryFailure]]. + * * @see [[Recover]] */ def onReceiveRecover(msg: Any): Unit diff --git a/akka-persistence/src/main/scala/akka/persistence/Processor.scala b/akka-persistence/src/main/scala/akka/persistence/Processor.scala index 6882eacdbe..4f8feb4f57 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Processor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Processor.scala @@ -69,6 +69,7 @@ trait Processor extends Actor with Recovery { _currentState = processing sequenceNr = highest receiverStash.unstashAll() + onRecoveryCompleted(receive) case ReadHighestSequenceNrFailure(cause) ⇒ onRecoveryFailure(receive, cause) case other ⇒ @@ -89,15 +90,7 @@ trait Processor extends Actor with Recovery { case ReplayedMessage(p) ⇒ processPersistent(receive, p) // can occur after unstash from user stash case WriteMessageSuccess(p) ⇒ processPersistent(receive, p) case WriteMessageFailure(p, cause) ⇒ - val notification = PersistenceFailure(p.payload, p.sequenceNr, cause) - if (receive.isDefinedAt(notification)) process(receive, notification) - else { - val errorMsg = "Processor killed after persistence failure " + - s"(processor id = [${processorId}], sequence nr = [${p.sequenceNr}], payload class = [${p.payload.getClass.getName}]). " + - "To avoid killing processors on persistence failure, a processor must handle PersistenceFailure messages. " + - "PersistenceFailure was caused by: " + cause - throw new ActorKilledException(errorMsg) - } + process(receive, PersistenceFailure(p.payload, p.sequenceNr, cause)) case LoopMessageSuccess(m) ⇒ process(receive, m) case WriteMessagesSuccessful | WriteMessagesFailed(_) ⇒ if (processorBatch.isEmpty) batching = false else journalBatch() @@ -148,18 +141,16 @@ trait Processor extends Actor with Recovery { onRecoveryFailure(receive, cause) /** - * Invokes this processor's behavior with a `RecoveryFailure` message, if handled, otherwise throws a - * `RecoveryFailureException`. + * Invokes this processor's behavior with a `RecoveryFailure` message. */ - private def onRecoveryFailure(receive: Receive, cause: Throwable): Unit = { - val notification = RecoveryFailure(cause) - if (receive.isDefinedAt(notification)) { - receive(notification) - } else { - val errorMsg = s"Recovery failure by journal (processor id = [${processorId}])" - throw new RecoveryException(errorMsg, cause) - } - } + private def onRecoveryFailure(receive: Receive, cause: Throwable): Unit = + receive.applyOrElse(RecoveryFailure(cause), unhandled) + + /** + * Invokes this processor's behavior with a `RecoveryFinished` message. + */ + private def onRecoveryCompleted(receive: Receive): Unit = + receive.applyOrElse(RecoveryCompleted, unhandled) private val _processorId = extension.processorId(self) @@ -296,6 +287,24 @@ trait Processor extends Actor with Recovery { try preRestart(reason, message) finally super.preRestart(reason, message) } + override def unhandled(message: Any): Unit = { + message match { + case RecoveryCompleted ⇒ // mute + case RecoveryFailure(cause) ⇒ + val errorMsg = s"Processor killed after recovery failure (processor id = [${processorId}]). " + + "To avoid killing processors on recovery failure, a processor must handle RecoveryFailure messages. " + + "RecoveryFailure was caused by: " + cause + throw new ActorKilledException(errorMsg) + case PersistenceFailure(payload, sequenceNumber, cause) ⇒ + val errorMsg = "Processor killed after persistence failure " + + s"(processor id = [${processorId}], sequence nr = [${sequenceNumber}], payload class = [${payload.getClass.getName}]). " + + "To avoid killing processors on persistence failure, a processor must handle PersistenceFailure messages. " + + "PersistenceFailure was caused by: " + cause + throw new ActorKilledException(errorMsg) + case m ⇒ super.unhandled(m) + } + } + private def nextSequenceNr(): Long = { sequenceNr += 1L sequenceNr @@ -321,19 +330,22 @@ final case class PersistenceFailure(payload: Any, sequenceNr: Long, cause: Throw /** * Sent to a [[Processor]] if a journal fails to replay messages or fetch that processor's - * highest sequence number. If not handled, a [[RecoveryException]] is thrown by that - * processor. + * highest sequence number. If not handled, the prossor will be stopped. */ @SerialVersionUID(1L) final case class RecoveryFailure(cause: Throwable) +abstract class RecoveryCompleted /** - * Thrown by a [[Processor]] if a journal fails to replay messages or fetch that processor's - * highest sequence number. This exception is only thrown if that processor doesn't handle - * [[RecoveryFailure]] messages. + * Sent to a [[Processor]] when the journal replay has been finished. */ @SerialVersionUID(1L) -final case class RecoveryException(message: String, cause: Throwable) extends AkkaException(message, cause) +case object RecoveryCompleted extends RecoveryCompleted { + /** + * Java API: get the singleton instance + */ + def getInstance = this +} /** * Java API: an actor that persists (journals) messages of type [[Persistent]]. Messages of other types diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala new file mode 100644 index 0000000000..dd6b986b09 --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorFailureSpec.scala @@ -0,0 +1,83 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.persistence + +import akka.actor._ +import akka.persistence.journal.AsyncWriteProxy +import akka.persistence.journal.inmem.InmemStore +import akka.testkit.{ ImplicitSender, AkkaSpec } +import akka.util.Timeout +import com.typesafe.config.Config +import scala.concurrent.duration._ +import akka.persistence.journal.AsyncWriteTarget.{ ReplayFailure, ReplaySuccess, ReplayMessages } + +import scala.language.postfixOps +import akka.persistence.journal.AsyncWriteTarget.ReplayFailure +import scala.Some +import akka.actor.OneForOneStrategy +import akka.persistence.journal.AsyncWriteTarget.ReplayMessages + +object PersistentActorFailureSpec { + class FailingInmemJournal extends AsyncWriteProxy { + import AsyncWriteProxy.SetStore + + val timeout = Timeout(5 seconds) + + override def preStart(): Unit = { + super.preStart() + self ! SetStore(context.actorOf(Props[FailingInmemStore])) + } + } + + class FailingInmemStore extends InmemStore { + def failingReceive: Receive = { + case ReplayMessages(pid, fromSnr, toSnr, max) ⇒ + val readFromStore = read(pid, fromSnr, toSnr, max) + if (readFromStore.length == 0) + sender ! ReplaySuccess + else + sender ! ReplayFailure(new IllegalArgumentException(s"blahonga $fromSnr $toSnr")) + } + + override def receive = failingReceive.orElse(super.receive) + } + + class Supervisor(testActor: ActorRef) extends Actor { + override def supervisorStrategy = OneForOneStrategy(loggingEnabled = false) { + case e ⇒ testActor ! e; SupervisorStrategy.Stop + } + + def receive = { + case props: Props ⇒ sender ! context.actorOf(props) + case m ⇒ sender ! m + } + } +} + +class PersistentActorFailureSpec extends AkkaSpec(PersistenceSpec.config("inmem", "SnapshotFailureRobustnessSpec", extraConfig = Some( + """ + |akka.persistence.journal.inmem.class = "akka.persistence.PersistentActorFailureSpec$FailingInmemJournal" + """.stripMargin))) with PersistenceSpec with ImplicitSender { + + import PersistentActorSpec._ + import PersistentActorFailureSpec._ + + override protected def beforeEach() { + super.beforeEach() + + val processor = namedProcessor[Behavior1Processor] + processor ! Cmd("a") + processor ! GetState + expectMsg(List("a-1", "a-2")) + } + + "A persistent actor" must { + "throw ActorKilledException if recovery from persisted events fail" in { + system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[Behavior1Processor], name) + expectMsgType[ActorRef] + expectMsgType[ActorKilledException] + } + } +} diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala index 7ed797c071..179bd46f30 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala @@ -135,7 +135,7 @@ object PersistentActorSpec { persist(Seq(Evt(s"${cmd.data}-41"), Evt(s"${cmd.data}-42")))(updateState) } - val receiveCommand: Receive = commonBehavior orElse { + def receiveCommand: Receive = commonBehavior orElse { case c: Cmd ⇒ handleCmd(c) case SaveSnapshotSuccess(_) ⇒ probe ! "saved" case "snap" ⇒ saveSnapshot(events) @@ -327,6 +327,27 @@ object PersistentActorSpec { case Cmd("a") ⇒ persist(5)(evt ⇒ sender() ! evt) } } + + class HandleRecoveryFinishedEventProcessor(name: String, probe: ActorRef) extends SnapshottingPersistentActor(name, probe) { + val sendingRecover: Receive = { + case msg: SnapshotOffer ⇒ + // sending ourself a normal message tests + // that we stash them until recovery is complete + self ! "I am the stashed" + super.receiveRecover(msg) + case RecoveryCompleted ⇒ + probe ! RecoveryCompleted + self ! "I am the recovered" + updateState(Evt(RecoveryCompleted)) + } + + override def receiveRecover = sendingRecover.orElse(super.receiveRecover) + + override def receiveCommand: Receive = super.receiveCommand orElse { + case s: String ⇒ probe ! s + } + + } } abstract class PersistentActorSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender { @@ -604,8 +625,25 @@ abstract class PersistentActorSpec(config: Config) extends AkkaSpec(config) with expectNoMsg(100.millis) } + "receive RecoveryFinished if it is handled after all events have been replayed" in { + val processor1 = system.actorOf(Props(classOf[SnapshottingPersistentActor], name, testActor)) + processor1 ! Cmd("b") + processor1 ! "snap" + processor1 ! Cmd("c") + expectMsg("saved") + processor1 ! GetState + expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42")) + + val processor2 = system.actorOf(Props(classOf[HandleRecoveryFinishedEventProcessor], name, testActor)) + expectMsg("offered") + expectMsg(RecoveryCompleted) + expectMsg("I am the stashed") + expectMsg("I am the recovered") + processor2 ! GetState + expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42", RecoveryCompleted)) + } } } -class LeveldbPersistentActorSpec extends PersistentActorSpec(PersistenceSpec.config("leveldb", "LeveldbEventsourcedSpec")) -class InmemPersistentActorSpec extends PersistentActorSpec(PersistenceSpec.config("inmem", "InmemEventsourcedSpec")) +class LeveldbPersistentActorSpec extends PersistentActorSpec(PersistenceSpec.config("leveldb", "LeveldbPersistentActorSpec")) +class InmemPersistentActorSpec extends PersistentActorSpec(PersistenceSpec.config("inmem", "InmemPersistentActorSpec")) diff --git a/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala index 7a812bedc0..40229f2cd0 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala @@ -128,10 +128,11 @@ object ProcessorSpec { final case class DeleteN(toSnr: Long) class DeleteMessageTestProcessor(name: String) extends RecoverTestProcessor(name) { - override def receive = { + override def receive = deleteReceive orElse super.receive + + def deleteReceive: Actor.Receive = { case Delete1(snr) ⇒ deleteMessage(snr) case DeleteN(toSnr) ⇒ deleteMessages(toSnr) - case m ⇒ super.receive(m) } } } diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala index 5612e205b0..56b69afbd8 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala @@ -55,6 +55,7 @@ class SnapshotFailureRobustnessSpec extends AkkaSpec(PersistenceSpec.config("lev val sProcessor = system.actorOf(Props(classOf[SaveSnapshotTestProcessor], name, testActor)) val processorId = name + expectMsg(RecoveryCompleted) sProcessor ! Persistent("blahonga") expectMsg(1) sProcessor ! Persistent("kablama") @@ -71,6 +72,7 @@ class SnapshotFailureRobustnessSpec extends AkkaSpec(PersistenceSpec.config("lev timestamp should be > (0L) } expectMsg("kablama-2") + expectMsg(RecoveryCompleted) expectNoMsg(1 second) } finally { system.eventStream.unsubscribe(testActor, classOf[Logging.Error]) diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala index c8daa98c6a..13beeb8d0e 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala @@ -52,6 +52,7 @@ object SnapshotSerializationSpec { case s: String ⇒ saveSnapshot(new MySnapshot(s)) case SaveSnapshotSuccess(md) ⇒ probe ! md.sequenceNr case SnapshotOffer(md, s) ⇒ probe ! ((md, s)) + case RecoveryCompleted ⇒ // ignore case other ⇒ probe ! other } } diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala index 06256a14cd..5176f60cd0 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala @@ -33,10 +33,10 @@ object SnapshotSpec { final case class DeleteN(criteria: SnapshotSelectionCriteria) class DeleteSnapshotTestProcessor(name: String, probe: ActorRef) extends LoadSnapshotTestProcessor(name, probe) { - override def receive = { + override def receive = receiveDelete orElse super.receive + def receiveDelete: Receive = { case Delete1(metadata) ⇒ deleteSnapshot(metadata.sequenceNr, metadata.timestamp) case DeleteN(criteria) ⇒ deleteSnapshots(criteria) - case other ⇒ super.receive(other) } } } @@ -75,6 +75,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS } expectMsg("e-5") expectMsg("f-6") + expectMsg(RecoveryCompleted) } "recover state starting from the most recent snapshot matching an upper sequence number bound" in { val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) @@ -88,6 +89,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS timestamp should be > (0L) } expectMsg("c-3") + expectMsg(RecoveryCompleted) } "recover state starting from the most recent snapshot matching an upper sequence number bound (without further replay)" in { val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) @@ -101,6 +103,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS state should be(List("a-1", "b-2", "c-3", "d-4").reverse) timestamp should be > (0L) } + expectMsg(RecoveryCompleted) expectMsg("done") } "recover state starting from the most recent snapshot matching criteria" in { @@ -118,6 +121,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS expectMsg("d-4") expectMsg("e-5") expectMsg("f-6") + expectMsg(RecoveryCompleted) } "recover state starting from the most recent snapshot matching criteria and an upper sequence number bound" in { val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) @@ -131,6 +135,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS timestamp should be > (0L) } expectMsg("c-3") + expectMsg(RecoveryCompleted) } "recover state from scratch if snapshot based recovery is disabled" in { val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) @@ -140,6 +145,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS expectMsg("a-1") expectMsg("b-2") expectMsg("c-3") + expectMsg(RecoveryCompleted) } "support single message deletions" in { val deleteProbe = TestProbe() @@ -158,6 +164,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS state should be(List("a-1", "b-2", "c-3", "d-4").reverse) md } + expectMsg(RecoveryCompleted) expectMsg("done") processor1 ! Delete1(metadata) @@ -174,6 +181,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS } expectMsg("c-3") expectMsg("d-4") + expectMsg(RecoveryCompleted) } "support bulk message deletions" in { val deleteProbe = TestProbe() @@ -190,6 +198,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS case (md @ SnapshotMetadata(`processorId`, 4, _), state) ⇒ state should be(List("a-1", "b-2", "c-3", "d-4").reverse) } + expectMsg(RecoveryCompleted) deleteProbe.expectMsgType[DeleteSnapshots] // recover processor from replayed messages (all snapshots deleted) @@ -200,6 +209,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS expectMsg("b-2") expectMsg("c-3") expectMsg("d-4") + expectMsg(RecoveryCompleted) } } } diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java index 4e2f985c2d..c924b2238c 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java @@ -19,6 +19,8 @@ import static java.util.Arrays.asList; public class LambdaPersistenceDocTest { + public interface SomeOtherMessage {} + public interface ProcessorMethods { //#processor-id public String processorId(); @@ -50,7 +52,7 @@ public class LambdaPersistenceDocTest { Throwable cause = failure.cause(); // ... }). - matchAny(otherwise -> { + match(SomeOtherMessage.class, message -> { // message not written to journal }).build() ); @@ -136,28 +138,14 @@ public class LambdaPersistenceDocTest { public MyProcessor5() { receive(ReceiveBuilder. - matchEquals("FIRST", s -> { + match(RecoveryCompleted.class, r -> { recoveryCompleted(); getContext().become(active); - unstashAll(); - }). - matchAny(message -> { - if (recoveryFinished()) { - stash(); - } else { - active.apply(message); - } }). build() ); } - @Override - public void preStart() throws Exception { - super.preStart(); - self().tell("FIRST", self()); - } - private void recoveryCompleted() { // perform init after recovery, before any other messages // ...