diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala index 97fc784ae0..725cb90b8c 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala @@ -149,6 +149,11 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set * @param message the message that was being processed when the exception was thrown */ protected def onRecoveryFailure(cause: Throwable, sequenceNr: Long, message: Option[Any]): Behavior[InternalProtocol] = { + try { + setup.onRecoveryFailure(cause) + } catch { + case NonFatal(t) ⇒ setup.log.error(t, "onRecoveryFailure threw exception") + } setup.cancelRecoveryTimer() tryReturnRecoveryPermit("on replay failure: " + cause.getMessage) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala index 32031066fc..9980a5a93a 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala @@ -36,6 +36,7 @@ private[persistence] final class EventsourcedSetup[C, E, S]( val eventHandler: PersistentBehavior.EventHandler[S, E], val writerIdentity: WriterIdentity, val recoveryCompleted: S ⇒ Unit, + val onRecoveryFailure: Throwable ⇒ Unit, val onSnapshot: (SnapshotMetadata, Try[Done]) ⇒ Unit, val tagger: E ⇒ Set[String], val eventAdapter: EventAdapter[E, _], diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala index df4b0eac0b..e70ce0d6d0 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala @@ -45,7 +45,8 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( snapshotWhen: (State, Event, Long) ⇒ Boolean = ConstantFun.scalaAnyThreeToFalse, recovery: Recovery = Recovery(), supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop, - onSnapshot: (SnapshotMetadata, Try[Done]) ⇒ Unit = ConstantFun.scalaAnyTwoToUnit + onSnapshot: (SnapshotMetadata, Try[Done]) ⇒ Unit = ConstantFun.scalaAnyTwoToUnit, + onRecoveryFailure: Throwable ⇒ Unit = ConstantFun.scalaAnyToUnit ) extends PersistentBehavior[Command, Event, State] with EventsourcedStashReferenceManagement { override def apply(context: typed.ActorContext[Command]): Behavior[Command] = { @@ -69,6 +70,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( eventHandler, WriterIdentity.newIdentity(), recoveryCompleted, + onRecoveryFailure, actualOnSnapshot, tagger, eventAdapter, @@ -191,4 +193,11 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( */ def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy): PersistentBehavior[Command, Event, State] = copy(supervisionStrategy = backoffStrategy) + + /** + * The `callback` function is called to notify that recovery has failed. For setting a supervision + * strategy `onPersistFailure` + */ + def onRecoveryFailure(callback: Throwable ⇒ Unit): PersistentBehavior[Command, Event, State] = + copy(onRecoveryFailure = callback) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala index 331837a628..0527f65f70 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala @@ -91,6 +91,12 @@ abstract class PersistentBehavior[Command, Event, State >: Null] private[akka] ( */ def onRecoveryCompleted(state: State): Unit = () + /** + * The `callback` function is called to notify the actor that the recovery process + * has failed + */ + def onRecoveryFailure(failure: Throwable): Unit = () + /** * Override to get notified when a snapshot is finished. * @@ -167,7 +173,9 @@ abstract class PersistentBehavior[Command, Event, State >: Null] private[akka] ( case Success(_) ⇒ Optional.empty() case Failure(t) ⇒ Optional.of(t) }) - }).eventAdapter(eventAdapter()) + }) + .eventAdapter(eventAdapter()) + .onRecoveryFailure(onRecoveryFailure) if (supervisorStrategy.isPresent) behavior.onPersistFailure(supervisorStrategy.get) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehavior.scala index beecf44d19..0740dd3a3f 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehavior.scala @@ -93,10 +93,14 @@ object PersistentBehavior { def persistenceId: PersistenceId /** - * The `callback` function is called to notify the actor that the recovery process - * is finished. + * The `callback` function is called to notify that the recovery process has finished. */ def onRecoveryCompleted(callback: State ⇒ Unit): PersistentBehavior[Command, Event, State] + /** + * The `callback` function is called to notify that recovery has failed. For setting a supervision + * strategy `onPersistFailure` + */ + def onRecoveryFailure(callback: Throwable ⇒ Unit): PersistentBehavior[Command, Event, State] /** * The `callback` function is called to notify when a snapshot is complete. diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorFailureTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorFailureTest.java index 14c1b09a59..cb8bd7535a 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorFailureTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorFailureTest.java @@ -4,12 +4,12 @@ package akka.persistence.typed.javadsl; +import akka.actor.testkit.typed.TE; import akka.actor.testkit.typed.javadsl.TestKitJunitResource; import akka.actor.testkit.typed.javadsl.TestProbe; import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; import akka.actor.typed.SupervisorStrategy; -import akka.actor.typed.javadsl.ActorContext; import akka.persistence.typed.PersistenceId; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -24,10 +24,12 @@ import static akka.persistence.typed.scaladsl.PersistentBehaviorFailureSpec.conf class FailingPersistentActor extends PersistentBehavior { private final ActorRef probe; + private final ActorRef recoveryFailureProbe; - FailingPersistentActor(PersistenceId persistenceId, ActorRef probe) { + FailingPersistentActor(PersistenceId persistenceId, ActorRef probe, ActorRef recoveryFailureProbe) { super(persistenceId, SupervisorStrategy.restartWithBackoff(Duration.ofMillis(1), Duration.ofMillis(5), 0.1)); this.probe = probe; + this.recoveryFailureProbe = recoveryFailureProbe; } @Override @@ -35,6 +37,11 @@ class FailingPersistentActor extends PersistentBehavior probe.tell("starting"); } + @Override + public void onRecoveryFailure(Throwable failure) { + recoveryFailureProbe.tell(failure); + } + @Override public String emptyState() { return ""; @@ -64,8 +71,20 @@ public class PersistentActorFailureTest extends JUnitSuite { @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(config); + public static Behavior fail(PersistenceId pid, ActorRef probe, ActorRef recoveryFailureProbe) { + return new FailingPersistentActor(pid, probe, recoveryFailureProbe); + } public static Behavior fail(PersistenceId pid, ActorRef probe) { - return new FailingPersistentActor(pid, probe); + return fail(pid, probe, testKit.createTestProbe().ref()); + } + + @Test + public void notifyRecoveryFailure() { + TestProbe probe = testKit.createTestProbe(); + TestProbe recoveryFailureProbe = testKit.createTestProbe(); + Behavior p1 = fail(new PersistenceId("fail-recovery-once"), probe.ref(), recoveryFailureProbe.ref()); + testKit.spawn(p1); + recoveryFailureProbe.expectMessageClass(TE.class); } @Test diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorFailureSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorFailureSpec.scala index 9a53d62ed9..4c48d8039b 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorFailureSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorFailureSpec.scala @@ -7,24 +7,25 @@ package akka.persistence.typed.scaladsl import akka.actor.testkit.typed.TestKitSettings import akka.actor.testkit.typed.scaladsl._ import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.{ ActorRef, Behavior, SupervisorStrategy } +import akka.actor.typed.{ ActorRef, SupervisorStrategy } import akka.actor.testkit.typed.TE import akka.persistence.AtomicWrite import akka.persistence.journal.inmem.InmemJournal import akka.persistence.typed.EventRejectedException import com.typesafe.config.ConfigFactory import org.scalatest.WordSpecLike + import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Try - import akka.persistence.typed.PersistenceId class ChaosJournal extends InmemJournal { var count = 0 var failRecovery = true var reject = true + override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = { val pid = messages.head.persistenceId if (pid == "fail-first-2" && count < 2) { @@ -44,6 +45,8 @@ class ChaosJournal extends InmemJournal { if (persistenceId == "fail-recovery-once" && failRecovery) { failRecovery = false Future.failed(TE("Nah")) + } else if (persistenceId == "fail-recovery") { + Future.failed(TE("Nope")) } else { super.asyncReadHighestSequenceNr(persistenceId, fromSequenceNr) } @@ -67,7 +70,7 @@ class PersistentBehaviorFailureSpec extends ScalaTestWithActorTestKit(Persistent implicit val testSettings = TestKitSettings(system) - def failingPersistentActor(pid: PersistenceId, probe: ActorRef[String]): Behavior[String] = PersistentBehavior[String, String, String]( + def failingPersistentActor(pid: PersistenceId, probe: ActorRef[String] = TestProbe[String].ref): PersistentBehavior[String, String, String] = PersistentBehavior[String, String, String]( pid, "", (_, cmd) ⇒ { probe.tell("persisting") @@ -77,11 +80,30 @@ class PersistentBehaviorFailureSpec extends ScalaTestWithActorTestKit(Persistent probe.tell(event) state + event } - ).onRecoveryCompleted { state ⇒ + ).onRecoveryCompleted { _ ⇒ probe.tell("starting") }.onPersistFailure(SupervisorStrategy.restartWithBackoff(1.milli, 5.millis, 0.1)) "A typed persistent actor (failures)" must { + + "call onRecoveryFailure when replay fails" in { + val probe = TestProbe[Throwable]() + spawn(failingPersistentActor(PersistenceId("fail-recovery")) + .onRecoveryFailure(t ⇒ probe.ref ! t)) + + probe.expectMessageType[TE].message shouldEqual "Nope" + } + + "handle exceptions in onRecoveryFailure" in { + val probe = TestProbe[String]() + val pa = spawn(failingPersistentActor(PersistenceId("fail-recovery-twice"), probe.ref) + .onRecoveryFailure(t ⇒ throw TE("recovery call back failure"))) + pa ! "one" + probe.expectMessage("starting") + probe.expectMessage("persisting") + probe.expectMessage("one") + } + "restart with backoff" in { val probe = TestProbe[String]() val behav = failingPersistentActor(PersistenceId("fail-first-2"), probe.ref)