Typed Persistence: onRecoveryFailure callback (#25993)

This commit is contained in:
Christopher Batey 2018-12-04 09:25:00 +00:00 committed by Arnout Engelen
parent 5ccfc12eb5
commit 8419671de2
7 changed files with 79 additions and 11 deletions

View file

@ -149,6 +149,11 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set
* @param message the message that was being processed when the exception was thrown
*/
protected def onRecoveryFailure(cause: Throwable, sequenceNr: Long, message: Option[Any]): Behavior[InternalProtocol] = {
try {
setup.onRecoveryFailure(cause)
} catch {
case NonFatal(t) setup.log.error(t, "onRecoveryFailure threw exception")
}
setup.cancelRecoveryTimer()
tryReturnRecoveryPermit("on replay failure: " + cause.getMessage)

View file

@ -36,6 +36,7 @@ private[persistence] final class EventsourcedSetup[C, E, S](
val eventHandler: PersistentBehavior.EventHandler[S, E],
val writerIdentity: WriterIdentity,
val recoveryCompleted: S Unit,
val onRecoveryFailure: Throwable Unit,
val onSnapshot: (SnapshotMetadata, Try[Done]) Unit,
val tagger: E Set[String],
val eventAdapter: EventAdapter[E, _],

View file

@ -45,7 +45,8 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
snapshotWhen: (State, Event, Long) Boolean = ConstantFun.scalaAnyThreeToFalse,
recovery: Recovery = Recovery(),
supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop,
onSnapshot: (SnapshotMetadata, Try[Done]) Unit = ConstantFun.scalaAnyTwoToUnit
onSnapshot: (SnapshotMetadata, Try[Done]) Unit = ConstantFun.scalaAnyTwoToUnit,
onRecoveryFailure: Throwable Unit = ConstantFun.scalaAnyToUnit
) extends PersistentBehavior[Command, Event, State] with EventsourcedStashReferenceManagement {
override def apply(context: typed.ActorContext[Command]): Behavior[Command] = {
@ -69,6 +70,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
eventHandler,
WriterIdentity.newIdentity(),
recoveryCompleted,
onRecoveryFailure,
actualOnSnapshot,
tagger,
eventAdapter,
@ -191,4 +193,11 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
*/
def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy): PersistentBehavior[Command, Event, State] =
copy(supervisionStrategy = backoffStrategy)
/**
* The `callback` function is called to notify that recovery has failed. For setting a supervision
* strategy `onPersistFailure`
*/
def onRecoveryFailure(callback: Throwable Unit): PersistentBehavior[Command, Event, State] =
copy(onRecoveryFailure = callback)
}

View file

@ -91,6 +91,12 @@ abstract class PersistentBehavior[Command, Event, State >: Null] private[akka] (
*/
def onRecoveryCompleted(state: State): Unit = ()
/**
* The `callback` function is called to notify the actor that the recovery process
* has failed
*/
def onRecoveryFailure(failure: Throwable): Unit = ()
/**
* Override to get notified when a snapshot is finished.
*
@ -167,7 +173,9 @@ abstract class PersistentBehavior[Command, Event, State >: Null] private[akka] (
case Success(_) Optional.empty()
case Failure(t) Optional.of(t)
})
}).eventAdapter(eventAdapter())
})
.eventAdapter(eventAdapter())
.onRecoveryFailure(onRecoveryFailure)
if (supervisorStrategy.isPresent)
behavior.onPersistFailure(supervisorStrategy.get)

View file

@ -93,10 +93,14 @@ object PersistentBehavior {
def persistenceId: PersistenceId
/**
* The `callback` function is called to notify the actor that the recovery process
* is finished.
* The `callback` function is called to notify that the recovery process has finished.
*/
def onRecoveryCompleted(callback: State Unit): PersistentBehavior[Command, Event, State]
/**
* The `callback` function is called to notify that recovery has failed. For setting a supervision
* strategy `onPersistFailure`
*/
def onRecoveryFailure(callback: Throwable Unit): PersistentBehavior[Command, Event, State]
/**
* The `callback` function is called to notify when a snapshot is complete.

View file

@ -4,12 +4,12 @@
package akka.persistence.typed.javadsl;
import akka.actor.testkit.typed.TE;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.SupervisorStrategy;
import akka.actor.typed.javadsl.ActorContext;
import akka.persistence.typed.PersistenceId;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@ -24,10 +24,12 @@ import static akka.persistence.typed.scaladsl.PersistentBehaviorFailureSpec.conf
class FailingPersistentActor extends PersistentBehavior<String, String, String> {
private final ActorRef<String> probe;
private final ActorRef<Throwable> recoveryFailureProbe;
FailingPersistentActor(PersistenceId persistenceId, ActorRef<String> probe) {
FailingPersistentActor(PersistenceId persistenceId, ActorRef<String> probe, ActorRef<Throwable> recoveryFailureProbe) {
super(persistenceId, SupervisorStrategy.restartWithBackoff(Duration.ofMillis(1), Duration.ofMillis(5), 0.1));
this.probe = probe;
this.recoveryFailureProbe = recoveryFailureProbe;
}
@Override
@ -35,6 +37,11 @@ class FailingPersistentActor extends PersistentBehavior<String, String, String>
probe.tell("starting");
}
@Override
public void onRecoveryFailure(Throwable failure) {
recoveryFailureProbe.tell(failure);
}
@Override
public String emptyState() {
return "";
@ -64,8 +71,20 @@ public class PersistentActorFailureTest extends JUnitSuite {
@ClassRule
public static final TestKitJunitResource testKit = new TestKitJunitResource(config);
public static Behavior<String> fail(PersistenceId pid, ActorRef<String> probe, ActorRef<Throwable> recoveryFailureProbe) {
return new FailingPersistentActor(pid, probe, recoveryFailureProbe);
}
public static Behavior<String> fail(PersistenceId pid, ActorRef<String> probe) {
return new FailingPersistentActor(pid, probe);
return fail(pid, probe, testKit.<Throwable>createTestProbe().ref());
}
@Test
public void notifyRecoveryFailure() {
TestProbe<String> probe = testKit.createTestProbe();
TestProbe<Throwable> recoveryFailureProbe = testKit.createTestProbe();
Behavior<String> p1 = fail(new PersistenceId("fail-recovery-once"), probe.ref(), recoveryFailureProbe.ref());
testKit.spawn(p1);
recoveryFailureProbe.expectMessageClass(TE.class);
}
@Test

View file

@ -7,24 +7,25 @@ package akka.persistence.typed.scaladsl
import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ ActorRef, Behavior, SupervisorStrategy }
import akka.actor.typed.{ ActorRef, SupervisorStrategy }
import akka.actor.testkit.typed.TE
import akka.persistence.AtomicWrite
import akka.persistence.journal.inmem.InmemJournal
import akka.persistence.typed.EventRejectedException
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Try
import akka.persistence.typed.PersistenceId
class ChaosJournal extends InmemJournal {
var count = 0
var failRecovery = true
var reject = true
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
val pid = messages.head.persistenceId
if (pid == "fail-first-2" && count < 2) {
@ -44,6 +45,8 @@ class ChaosJournal extends InmemJournal {
if (persistenceId == "fail-recovery-once" && failRecovery) {
failRecovery = false
Future.failed(TE("Nah"))
} else if (persistenceId == "fail-recovery") {
Future.failed(TE("Nope"))
} else {
super.asyncReadHighestSequenceNr(persistenceId, fromSequenceNr)
}
@ -67,7 +70,7 @@ class PersistentBehaviorFailureSpec extends ScalaTestWithActorTestKit(Persistent
implicit val testSettings = TestKitSettings(system)
def failingPersistentActor(pid: PersistenceId, probe: ActorRef[String]): Behavior[String] = PersistentBehavior[String, String, String](
def failingPersistentActor(pid: PersistenceId, probe: ActorRef[String] = TestProbe[String].ref): PersistentBehavior[String, String, String] = PersistentBehavior[String, String, String](
pid, "",
(_, cmd) {
probe.tell("persisting")
@ -77,11 +80,30 @@ class PersistentBehaviorFailureSpec extends ScalaTestWithActorTestKit(Persistent
probe.tell(event)
state + event
}
).onRecoveryCompleted { state
).onRecoveryCompleted { _
probe.tell("starting")
}.onPersistFailure(SupervisorStrategy.restartWithBackoff(1.milli, 5.millis, 0.1))
"A typed persistent actor (failures)" must {
"call onRecoveryFailure when replay fails" in {
val probe = TestProbe[Throwable]()
spawn(failingPersistentActor(PersistenceId("fail-recovery"))
.onRecoveryFailure(t probe.ref ! t))
probe.expectMessageType[TE].message shouldEqual "Nope"
}
"handle exceptions in onRecoveryFailure" in {
val probe = TestProbe[String]()
val pa = spawn(failingPersistentActor(PersistenceId("fail-recovery-twice"), probe.ref)
.onRecoveryFailure(t throw TE("recovery call back failure")))
pa ! "one"
probe.expectMessage("starting")
probe.expectMessage("persisting")
probe.expectMessage("one")
}
"restart with backoff" in {
val probe = TestProbe[String]()
val behav = failingPersistentActor(PersistenceId("fail-first-2"), probe.ref)