+per #13944 Send RecoveryComplete message at end of recovery

Fixes #13944

Conflicts:
	akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala
	akka-persistence/src/main/scala/akka/persistence/Processor.scala
	project/AkkaBuild.scala
This commit is contained in:
Björn Antonsson 2014-06-05 14:07:17 +02:00
parent 3c9483520a
commit 9bcaeff87d
14 changed files with 223 additions and 81 deletions

View file

@ -19,6 +19,8 @@ import static java.util.Arrays.asList;
public class PersistenceDocTest {
public interface SomeOtherMessage {}
public interface ProcessorMethods {
//#processor-id
public String processorId();
@ -49,9 +51,12 @@ public class PersistenceDocTest {
Long sequenceNr = failure.sequenceNr();
Throwable cause = failure.cause();
// ...
} else {
} else if (message instanceof SomeOtherMessage) {
// message not written to journal
}
else {
unhandled(message);
}
}
}
//#definition
@ -127,21 +132,12 @@ public class PersistenceDocTest {
class MyProcessor5 extends UntypedProcessor {
//#recovery-completed
@Override
public void preStart() throws Exception {
super.preStart();
self().tell("FIRST", self());
}
public void onReceive(Object message) throws Exception {
if (message.equals("FIRST")) {
if (message instanceof RecoveryCompleted) {
recoveryCompleted();
getContext().become(active);
unstashAll();
} else if (recoveryFinished()) {
stash();
} else {
active.apply(message);
unhandled(message);
}
}
@ -156,6 +152,9 @@ public class PersistenceDocTest {
if (message instanceof Persistent) {
// ...
}
else {
unhandled(message);
}
}
};
//#recovery-completed

View file

@ -124,9 +124,11 @@ A processor can query its own recovery status via the methods
Sometimes there is a need for performing additional initialization when the
recovery has completed, before processing any other message sent to the processor.
The processor can send itself a message from ``preStart``. It will be stashed and received
after recovery. The mailbox may contain other messages that are queued in front of
that message and therefore you need to stash until you receive that message.
The processor will receive a special :class:`RecoveryCompleted` message right after recovery
and before any other received messages. If there is a problem with recovering the state of
the actor from the journal, the actor will be sent a :class:`RecoveryFailure` message that
it can choose to handle. If the actor doesn't handle the :class:`RecoveryFailure` message it
will be stopped.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recovery-completed

View file

@ -143,9 +143,11 @@ A processor can query its own recovery status via the methods
Sometimes there is a need for performing additional initialization when the
recovery has completed, before processing any other message sent to the processor.
The processor can send itself a message from ``preStart``. It will be stashed and received
after recovery. The mailbox may contain other messages that are queued in front of
that message and therefore you need to stash until you receive that message.
The processor will receive a special :class:`RecoveryCompleted` message right after recovery
and before any other received messages. If there is a problem with recovering the state of
the actor from the journal, the actor will be sent a :class:`RecoveryFailure` message that
it can choose to handle. If the actor doesn't handle the :class:`RecoveryFailure` message it
will be stopped.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-completed

View file

@ -21,6 +21,8 @@ trait PersistenceDocSpec {
//#auto-update
"""
trait SomeOtherMessage
val system: ActorSystem
import system._
@ -35,7 +37,7 @@ trait PersistenceDocSpec {
// message successfully written to journal
case PersistenceFailure(payload, sequenceNr, cause) =>
// message failed to be written to journal
case other =>
case m: SomeOtherMessage =>
// message not written to journal
}
}
@ -87,20 +89,12 @@ trait PersistenceDocSpec {
class MyProcessor4 extends Processor {
//#recovery-completed
override def preStart(): Unit = {
super.preStart()
self ! "FIRST"
}
def receive = initializing.orElse(active)
def receive = initializing
def initializing: Receive = {
case "FIRST" =>
case RecoveryCompleted =>
recoveryCompleted()
context.become(active)
unstashAll()
case other if recoveryFinished =>
stash()
}
def recoveryCompleted(): Unit = {

View file

@ -132,9 +132,11 @@ A processor can query its own recovery status via the methods
Sometimes there is a need for performing additional initialization when the
recovery has completed, before processing any other message sent to the processor.
The processor can send itself a message from ``preStart``. It will be stashed and received
after recovery. The mailbox may contain other messages that are queued in front of
that message and therefore you need to stash until you receive that message.
The processor will receive a special :class:`RecoveryCompleted` message right after recovery
and before any other received messages. If there is a problem with recovering the state of
the actor from the journal, the actor will be sent a :class:`RecoveryFailure` message that
it can choose to handle. If the actor doesn't handle the :class:`RecoveryFailure` message it
will be stopped.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-completed

View file

@ -157,6 +157,8 @@ private[persistence] trait Eventsourced extends Processor {
receiveRecover(s)
case f: RecoveryFailure if receiveRecover.isDefinedAt(f)
receiveRecover(f)
case RecoveryCompleted if receiveRecover.isDefinedAt(RecoveryCompleted)
receiveRecover(RecoveryCompleted)
}
sealed trait PersistInvocation {
@ -259,6 +261,9 @@ private[persistence] trait Eventsourced extends Processor {
* should not perform actions that may fail, such as interacting with external services,
* for example.
*
* If recovery fails, the actor will be stopped. This can be customized by
* handling [[RecoveryFailure]].
*
* @see [[Recover]]
*/
def receiveRecover: Receive
@ -429,6 +434,9 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events
* should not perform actions that may fail, such as interacting with external services,
* for example.
*
* If recovery fails, the actor will be stopped. This can be customized by
* handling [[RecoveryFailure]].
*
* @see [[Recover]]
*/
def onReceiveRecover(msg: Any): Unit

View file

@ -69,6 +69,7 @@ trait Processor extends Actor with Recovery {
_currentState = processing
sequenceNr = highest
receiverStash.unstashAll()
onRecoveryCompleted(receive)
case ReadHighestSequenceNrFailure(cause)
onRecoveryFailure(receive, cause)
case other
@ -89,15 +90,7 @@ trait Processor extends Actor with Recovery {
case ReplayedMessage(p) processPersistent(receive, p) // can occur after unstash from user stash
case WriteMessageSuccess(p) processPersistent(receive, p)
case WriteMessageFailure(p, cause)
val notification = PersistenceFailure(p.payload, p.sequenceNr, cause)
if (receive.isDefinedAt(notification)) process(receive, notification)
else {
val errorMsg = "Processor killed after persistence failure " +
s"(processor id = [${processorId}], sequence nr = [${p.sequenceNr}], payload class = [${p.payload.getClass.getName}]). " +
"To avoid killing processors on persistence failure, a processor must handle PersistenceFailure messages. " +
"PersistenceFailure was caused by: " + cause
throw new ActorKilledException(errorMsg)
}
process(receive, PersistenceFailure(p.payload, p.sequenceNr, cause))
case LoopMessageSuccess(m) process(receive, m)
case WriteMessagesSuccessful | WriteMessagesFailed(_)
if (processorBatch.isEmpty) batching = false else journalBatch()
@ -148,18 +141,16 @@ trait Processor extends Actor with Recovery {
onRecoveryFailure(receive, cause)
/**
* Invokes this processor's behavior with a `RecoveryFailure` message, if handled, otherwise throws a
* `RecoveryFailureException`.
* Invokes this processor's behavior with a `RecoveryFailure` message.
*/
private def onRecoveryFailure(receive: Receive, cause: Throwable): Unit = {
val notification = RecoveryFailure(cause)
if (receive.isDefinedAt(notification)) {
receive(notification)
} else {
val errorMsg = s"Recovery failure by journal (processor id = [${processorId}])"
throw new RecoveryException(errorMsg, cause)
}
}
private def onRecoveryFailure(receive: Receive, cause: Throwable): Unit =
receive.applyOrElse(RecoveryFailure(cause), unhandled)
/**
* Invokes this processor's behavior with a `RecoveryFinished` message.
*/
private def onRecoveryCompleted(receive: Receive): Unit =
receive.applyOrElse(RecoveryCompleted, unhandled)
private val _processorId = extension.processorId(self)
@ -296,6 +287,24 @@ trait Processor extends Actor with Recovery {
try preRestart(reason, message) finally super.preRestart(reason, message)
}
override def unhandled(message: Any): Unit = {
message match {
case RecoveryCompleted // mute
case RecoveryFailure(cause)
val errorMsg = s"Processor killed after recovery failure (processor id = [${processorId}]). " +
"To avoid killing processors on recovery failure, a processor must handle RecoveryFailure messages. " +
"RecoveryFailure was caused by: " + cause
throw new ActorKilledException(errorMsg)
case PersistenceFailure(payload, sequenceNumber, cause)
val errorMsg = "Processor killed after persistence failure " +
s"(processor id = [${processorId}], sequence nr = [${sequenceNumber}], payload class = [${payload.getClass.getName}]). " +
"To avoid killing processors on persistence failure, a processor must handle PersistenceFailure messages. " +
"PersistenceFailure was caused by: " + cause
throw new ActorKilledException(errorMsg)
case m super.unhandled(m)
}
}
private def nextSequenceNr(): Long = {
sequenceNr += 1L
sequenceNr
@ -321,19 +330,22 @@ final case class PersistenceFailure(payload: Any, sequenceNr: Long, cause: Throw
/**
* Sent to a [[Processor]] if a journal fails to replay messages or fetch that processor's
* highest sequence number. If not handled, a [[RecoveryException]] is thrown by that
* processor.
* highest sequence number. If not handled, the prossor will be stopped.
*/
@SerialVersionUID(1L)
final case class RecoveryFailure(cause: Throwable)
abstract class RecoveryCompleted
/**
* Thrown by a [[Processor]] if a journal fails to replay messages or fetch that processor's
* highest sequence number. This exception is only thrown if that processor doesn't handle
* [[RecoveryFailure]] messages.
* Sent to a [[Processor]] when the journal replay has been finished.
*/
@SerialVersionUID(1L)
final case class RecoveryException(message: String, cause: Throwable) extends AkkaException(message, cause)
case object RecoveryCompleted extends RecoveryCompleted {
/**
* Java API: get the singleton instance
*/
def getInstance = this
}
/**
* Java API: an actor that persists (journals) messages of type [[Persistent]]. Messages of other types

View file

@ -0,0 +1,83 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import akka.actor._
import akka.persistence.journal.AsyncWriteProxy
import akka.persistence.journal.inmem.InmemStore
import akka.testkit.{ ImplicitSender, AkkaSpec }
import akka.util.Timeout
import com.typesafe.config.Config
import scala.concurrent.duration._
import akka.persistence.journal.AsyncWriteTarget.{ ReplayFailure, ReplaySuccess, ReplayMessages }
import scala.language.postfixOps
import akka.persistence.journal.AsyncWriteTarget.ReplayFailure
import scala.Some
import akka.actor.OneForOneStrategy
import akka.persistence.journal.AsyncWriteTarget.ReplayMessages
object PersistentActorFailureSpec {
class FailingInmemJournal extends AsyncWriteProxy {
import AsyncWriteProxy.SetStore
val timeout = Timeout(5 seconds)
override def preStart(): Unit = {
super.preStart()
self ! SetStore(context.actorOf(Props[FailingInmemStore]))
}
}
class FailingInmemStore extends InmemStore {
def failingReceive: Receive = {
case ReplayMessages(pid, fromSnr, toSnr, max)
val readFromStore = read(pid, fromSnr, toSnr, max)
if (readFromStore.length == 0)
sender ! ReplaySuccess
else
sender ! ReplayFailure(new IllegalArgumentException(s"blahonga $fromSnr $toSnr"))
}
override def receive = failingReceive.orElse(super.receive)
}
class Supervisor(testActor: ActorRef) extends Actor {
override def supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
case e testActor ! e; SupervisorStrategy.Stop
}
def receive = {
case props: Props sender ! context.actorOf(props)
case m sender ! m
}
}
}
class PersistentActorFailureSpec extends AkkaSpec(PersistenceSpec.config("inmem", "SnapshotFailureRobustnessSpec", extraConfig = Some(
"""
|akka.persistence.journal.inmem.class = "akka.persistence.PersistentActorFailureSpec$FailingInmemJournal"
""".stripMargin))) with PersistenceSpec with ImplicitSender {
import PersistentActorSpec._
import PersistentActorFailureSpec._
override protected def beforeEach() {
super.beforeEach()
val processor = namedProcessor[Behavior1Processor]
processor ! Cmd("a")
processor ! GetState
expectMsg(List("a-1", "a-2"))
}
"A persistent actor" must {
"throw ActorKilledException if recovery from persisted events fail" in {
system.actorOf(Props(classOf[Supervisor], testActor)) ! Props(classOf[Behavior1Processor], name)
expectMsgType[ActorRef]
expectMsgType[ActorKilledException]
}
}
}

View file

@ -135,7 +135,7 @@ object PersistentActorSpec {
persist(Seq(Evt(s"${cmd.data}-41"), Evt(s"${cmd.data}-42")))(updateState)
}
val receiveCommand: Receive = commonBehavior orElse {
def receiveCommand: Receive = commonBehavior orElse {
case c: Cmd handleCmd(c)
case SaveSnapshotSuccess(_) probe ! "saved"
case "snap" saveSnapshot(events)
@ -327,6 +327,27 @@ object PersistentActorSpec {
case Cmd("a") persist(5)(evt sender() ! evt)
}
}
class HandleRecoveryFinishedEventProcessor(name: String, probe: ActorRef) extends SnapshottingPersistentActor(name, probe) {
val sendingRecover: Receive = {
case msg: SnapshotOffer
// sending ourself a normal message tests
// that we stash them until recovery is complete
self ! "I am the stashed"
super.receiveRecover(msg)
case RecoveryCompleted
probe ! RecoveryCompleted
self ! "I am the recovered"
updateState(Evt(RecoveryCompleted))
}
override def receiveRecover = sendingRecover.orElse(super.receiveRecover)
override def receiveCommand: Receive = super.receiveCommand orElse {
case s: String probe ! s
}
}
}
abstract class PersistentActorSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
@ -604,8 +625,25 @@ abstract class PersistentActorSpec(config: Config) extends AkkaSpec(config) with
expectNoMsg(100.millis)
}
"receive RecoveryFinished if it is handled after all events have been replayed" in {
val processor1 = system.actorOf(Props(classOf[SnapshottingPersistentActor], name, testActor))
processor1 ! Cmd("b")
processor1 ! "snap"
processor1 ! Cmd("c")
expectMsg("saved")
processor1 ! GetState
expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42"))
val processor2 = system.actorOf(Props(classOf[HandleRecoveryFinishedEventProcessor], name, testActor))
expectMsg("offered")
expectMsg(RecoveryCompleted)
expectMsg("I am the stashed")
expectMsg("I am the recovered")
processor2 ! GetState
expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42", RecoveryCompleted))
}
}
}
class LeveldbPersistentActorSpec extends PersistentActorSpec(PersistenceSpec.config("leveldb", "LeveldbEventsourcedSpec"))
class InmemPersistentActorSpec extends PersistentActorSpec(PersistenceSpec.config("inmem", "InmemEventsourcedSpec"))
class LeveldbPersistentActorSpec extends PersistentActorSpec(PersistenceSpec.config("leveldb", "LeveldbPersistentActorSpec"))
class InmemPersistentActorSpec extends PersistentActorSpec(PersistenceSpec.config("inmem", "InmemPersistentActorSpec"))

View file

@ -128,10 +128,11 @@ object ProcessorSpec {
final case class DeleteN(toSnr: Long)
class DeleteMessageTestProcessor(name: String) extends RecoverTestProcessor(name) {
override def receive = {
override def receive = deleteReceive orElse super.receive
def deleteReceive: Actor.Receive = {
case Delete1(snr) deleteMessage(snr)
case DeleteN(toSnr) deleteMessages(toSnr)
case m super.receive(m)
}
}
}

View file

@ -55,6 +55,7 @@ class SnapshotFailureRobustnessSpec extends AkkaSpec(PersistenceSpec.config("lev
val sProcessor = system.actorOf(Props(classOf[SaveSnapshotTestProcessor], name, testActor))
val processorId = name
expectMsg(RecoveryCompleted)
sProcessor ! Persistent("blahonga")
expectMsg(1)
sProcessor ! Persistent("kablama")
@ -71,6 +72,7 @@ class SnapshotFailureRobustnessSpec extends AkkaSpec(PersistenceSpec.config("lev
timestamp should be > (0L)
}
expectMsg("kablama-2")
expectMsg(RecoveryCompleted)
expectNoMsg(1 second)
} finally {
system.eventStream.unsubscribe(testActor, classOf[Logging.Error])

View file

@ -52,6 +52,7 @@ object SnapshotSerializationSpec {
case s: String saveSnapshot(new MySnapshot(s))
case SaveSnapshotSuccess(md) probe ! md.sequenceNr
case SnapshotOffer(md, s) probe ! ((md, s))
case RecoveryCompleted // ignore
case other probe ! other
}
}

View file

@ -33,10 +33,10 @@ object SnapshotSpec {
final case class DeleteN(criteria: SnapshotSelectionCriteria)
class DeleteSnapshotTestProcessor(name: String, probe: ActorRef) extends LoadSnapshotTestProcessor(name, probe) {
override def receive = {
override def receive = receiveDelete orElse super.receive
def receiveDelete: Receive = {
case Delete1(metadata) deleteSnapshot(metadata.sequenceNr, metadata.timestamp)
case DeleteN(criteria) deleteSnapshots(criteria)
case other super.receive(other)
}
}
}
@ -75,6 +75,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS
}
expectMsg("e-5")
expectMsg("f-6")
expectMsg(RecoveryCompleted)
}
"recover state starting from the most recent snapshot matching an upper sequence number bound" in {
val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor))
@ -88,6 +89,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS
timestamp should be > (0L)
}
expectMsg("c-3")
expectMsg(RecoveryCompleted)
}
"recover state starting from the most recent snapshot matching an upper sequence number bound (without further replay)" in {
val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor))
@ -101,6 +103,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS
state should be(List("a-1", "b-2", "c-3", "d-4").reverse)
timestamp should be > (0L)
}
expectMsg(RecoveryCompleted)
expectMsg("done")
}
"recover state starting from the most recent snapshot matching criteria" in {
@ -118,6 +121,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS
expectMsg("d-4")
expectMsg("e-5")
expectMsg("f-6")
expectMsg(RecoveryCompleted)
}
"recover state starting from the most recent snapshot matching criteria and an upper sequence number bound" in {
val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor))
@ -131,6 +135,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS
timestamp should be > (0L)
}
expectMsg("c-3")
expectMsg(RecoveryCompleted)
}
"recover state from scratch if snapshot based recovery is disabled" in {
val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor))
@ -140,6 +145,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS
expectMsg("a-1")
expectMsg("b-2")
expectMsg("c-3")
expectMsg(RecoveryCompleted)
}
"support single message deletions" in {
val deleteProbe = TestProbe()
@ -158,6 +164,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS
state should be(List("a-1", "b-2", "c-3", "d-4").reverse)
md
}
expectMsg(RecoveryCompleted)
expectMsg("done")
processor1 ! Delete1(metadata)
@ -174,6 +181,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS
}
expectMsg("c-3")
expectMsg("d-4")
expectMsg(RecoveryCompleted)
}
"support bulk message deletions" in {
val deleteProbe = TestProbe()
@ -190,6 +198,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS
case (md @ SnapshotMetadata(`processorId`, 4, _), state)
state should be(List("a-1", "b-2", "c-3", "d-4").reverse)
}
expectMsg(RecoveryCompleted)
deleteProbe.expectMsgType[DeleteSnapshots]
// recover processor from replayed messages (all snapshots deleted)
@ -200,6 +209,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS
expectMsg("b-2")
expectMsg("c-3")
expectMsg("d-4")
expectMsg(RecoveryCompleted)
}
}
}

View file

@ -19,6 +19,8 @@ import static java.util.Arrays.asList;
public class LambdaPersistenceDocTest {
public interface SomeOtherMessage {}
public interface ProcessorMethods {
//#processor-id
public String processorId();
@ -50,7 +52,7 @@ public class LambdaPersistenceDocTest {
Throwable cause = failure.cause();
// ...
}).
matchAny(otherwise -> {
match(SomeOtherMessage.class, message -> {
// message not written to journal
}).build()
);
@ -136,28 +138,14 @@ public class LambdaPersistenceDocTest {
public MyProcessor5() {
receive(ReceiveBuilder.
matchEquals("FIRST", s -> {
match(RecoveryCompleted.class, r -> {
recoveryCompleted();
getContext().become(active);
unstashAll();
}).
matchAny(message -> {
if (recoveryFinished()) {
stash();
} else {
active.apply(message);
}
}).
build()
);
}
@Override
public void preStart() throws Exception {
super.preStart();
self().tell("FIRST", self());
}
private void recoveryCompleted() {
// perform init after recovery, before any other messages
// ...