From 836347fe08d6b880e73d56c3201e453ebc009285 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Mon, 9 Jul 2018 09:57:36 +0100 Subject: [PATCH] Typed persistence: Throw on persist failures, #24479 * Rather than stop so that users can add their own supervision e.g. restartWithBackOff * Only allow back off supervisoir for persistent behaviors * Handle persist rejections --- .../src/main/paradox/typed/persistence.md | 21 +++ .../typed/EventRejectedException.scala | 11 ++ .../EventsourcedReplayingEvents.scala | 10 +- .../typed/internal/EventsourcedRunning.scala | 21 +-- .../internal/JournalFailureException.scala | 18 +++ .../internal/PersistentBehaviorImpl.scala | 112 ++++++++------ .../typed/javadsl/PersistentBehavior.scala | 19 ++- .../typed/scaladsl/PersistentBehaviors.scala | 11 ++ .../javadsl/PersistentActorFailureTest.java | 93 ++++++++++++ .../javadsl/PersistentActorJavaDslTest.java | 19 +-- .../typed/BasicPersistentBehaviorsTest.java | 10 +- .../PersistentBehaviorFailureSpec.scala | 142 ++++++++++++++++++ .../scaladsl/PersistentBehaviorSpec.scala | 1 - .../BasicPersistentBehaviorsCompileOnly.scala | 13 +- .../remote/artery/SystemMessageDelivery.scala | 2 +- 15 files changed, 409 insertions(+), 94 deletions(-) create mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/EventRejectedException.scala create mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/JournalFailureException.scala create mode 100644 akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorFailureTest.java create mode 100644 akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorFailureSpec.scala diff --git a/akka-docs/src/main/paradox/typed/persistence.md b/akka-docs/src/main/paradox/typed/persistence.md index 5059581dfc..b8e587c737 100644 --- a/akka-docs/src/main/paradox/typed/persistence.md +++ b/akka-docs/src/main/paradox/typed/persistence.md @@ -269,3 +269,24 @@ Scala Java : @@snip [BasicPersistentBehaviorsTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java) { #wrapPersistentBehavior } + + +## Journal failures + +By default a `PersistentBehavior` will stop if an exception is thrown from the journal. It is possible to override this with +any `BackoffSupervisorStrategy`. It is not possible to use the normal supervision wrapping for this as it isn't valid to +`resume` a behavior on a journal failure as it is not known if the event was persisted. + + +Scala +: @@snip [BasicPersistentBehaviorsSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala) { #supervision } + +Java +: @@snip [BasicPersistentBehaviorsTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java) { #supervision } + +## Journal rejections + +Journals can reject events. The difference from a failure is that the journal must decide to reject an event before +trying to persist it e.g. because of a serialization exception. If an event is rejected it definitely won't be in the journal. +This is signalled to a `PersistentBehavior` via a `EventRejectedException` and can be handled with a @ref[supervisor](fault-tolerance.md). + diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventRejectedException.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventRejectedException.scala new file mode 100644 index 0000000000..e51e2d373f --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventRejectedException.scala @@ -0,0 +1,11 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.persistence.typed + +/** + * Thrown if a journal rejects an event e.g. due to a serialization error. + */ +final class EventRejectedException(persistenceId: String, sequenceNr: Long, cause: Throwable) + extends RuntimeException(s"PersistenceId $persistenceId sequenceNr: $sequenceNr", cause) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala index 7825a36789..513f6e04c6 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala @@ -121,7 +121,7 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set } else { cancelRecoveryTimer(setup.timers) val msg = s"Replay timed out, didn't get event within ]${setup.settings.recoveryEventTimeout}], highest sequence number seen [${state.seqNr}]" - onRecoveryFailure(new RecoveryTimedOut(msg), state.seqNr, None) // TODO allow users to hook into this? + onRecoveryFailure(new RecoveryTimedOut(msg), state.seqNr, None) } } else { // snapshot timeout, but we're already in the events recovery phase @@ -145,14 +145,14 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set cancelRecoveryTimer(setup.timers) tryReturnRecoveryPermit("on replay failure: " + cause.getMessage) - message match { + val msg = message match { case Some(evt) ⇒ - setup.log.error(cause, "Exception during recovery while handling [{}] with sequence number [{}].", evt.getClass.getName, sequenceNr) + s"Exception during recovery while handling [${evt.getClass.getName}] with sequence number [$sequenceNr]. PersistenceId: [${setup.persistence}]" case None ⇒ - setup.log.error(cause, "Exception during recovery. Last known sequence number [{}]", setup.persistenceId, sequenceNr) + s"Exception during recovery. Last known sequence number [$sequenceNr]. PersistenceId: [${setup.persistenceId}]" } - Behaviors.stopped + throw new JournalFailureException(msg, cause) } protected def onRecoveryCompleted(state: ReplayingState[S]): Behavior[InternalProtocol] = try { diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala index 4bfb5b5c0b..5dff502a6e 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala @@ -12,6 +12,7 @@ import akka.annotation.InternalApi import akka.persistence.JournalProtocol._ import akka.persistence._ import akka.persistence.journal.Tagged +import akka.persistence.typed.EventRejectedException import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, MDC } import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._ @@ -227,13 +228,12 @@ private[akka] object EventsourcedRunning { case WriteMessageRejected(p, cause, id) ⇒ if (id == setup.writerIdentity.instanceId) { - onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop (by design) - onWriteResponse(p) + throw new EventRejectedException(setup.persistenceId, p.sequenceNr, cause) } else this case WriteMessageFailure(p, cause, id) ⇒ if (id == setup.writerIdentity.instanceId) - onPersistFailureThenStop(cause, p.payload, p.sequenceNr) + throw new JournalFailureException(setup.persistenceId, p.sequenceNr, p.payload.getClass.getName, cause) else this case WriteMessagesSuccessful ⇒ @@ -250,21 +250,6 @@ private[akka] object EventsourcedRunning { } } - private def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = { - setup.log.error( - cause, - "Rejected to persist event type [{}] with sequence number [{}] for persistenceId [{}] due to [{}].", - event.getClass.getName, seqNr, setup.persistenceId, cause.getMessage) - } - - private def onPersistFailureThenStop(cause: Throwable, event: Any, seqNr: Long): Behavior[InternalProtocol] = { - setup.log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].", - event.getClass.getName, seqNr, setup.persistenceId) - - // FIXME see #24479 for reconsidering the stopping behaviour - Behaviors.stopped - } - } private def onSnapshotterResponse( diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/JournalFailureException.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/JournalFailureException.scala new file mode 100644 index 0000000000..9eb9266fee --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/JournalFailureException.scala @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.persistence.typed.internal + +import akka.annotation.InternalApi + +/** + * INTERNAL API + * + * Used for journal failures. Private to akka as only internal supervision strategies should use it. + */ +@InternalApi +final private[akka] class JournalFailureException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) { + def this(persistenceId: String, sequenceNr: Long, eventType: String, cause: Throwable) = + this(s"Failed to persist event type $eventType with sequence number $sequenceNr for persistenceId [$persistenceId]", cause) +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala index 11ea532309..5431bc8f7b 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala @@ -6,7 +6,7 @@ package akka.persistence.typed.internal import akka.Done import akka.actor.typed -import akka.actor.typed.Behavior +import akka.actor.typed.{ BackoffSupervisorStrategy, Behavior, SupervisorStrategy } import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } import akka.annotation.InternalApi import akka.persistence._ @@ -32,58 +32,59 @@ private[akka] object PersistentBehaviorImpl { @InternalApi private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( - persistenceId: String, - emptyState: State, - commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State], - eventHandler: PersistentBehaviors.EventHandler[State, Event], - journalPluginId: Option[String] = None, - snapshotPluginId: Option[String] = None, - recoveryCompleted: (ActorContext[Command], State) ⇒ Unit = ConstantFun.scalaAnyTwoToUnit, - tagger: Event ⇒ Set[String] = (_: Event) ⇒ Set.empty[String], - eventAdapter: EventAdapter[Event, _] = NoOpEventAdapter.instance[Event], - snapshotWhen: (State, Event, Long) ⇒ Boolean = ConstantFun.scalaAnyThreeToFalse, - recovery: Recovery = Recovery(), - onSnapshot: (ActorContext[Command], SnapshotMetadata, Try[Done]) ⇒ Unit = PersistentBehaviorImpl.defaultOnSnapshot[Command] _ + persistenceId: String, + emptyState: State, + commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State], + eventHandler: PersistentBehaviors.EventHandler[State, Event], + journalPluginId: Option[String] = None, + snapshotPluginId: Option[String] = None, + recoveryCompleted: (ActorContext[Command], State) ⇒ Unit = ConstantFun.scalaAnyTwoToUnit, + tagger: Event ⇒ Set[String] = (_: Event) ⇒ Set.empty[String], + eventAdapter: EventAdapter[Event, _] = NoOpEventAdapter.instance[Event], + snapshotWhen: (State, Event, Long) ⇒ Boolean = ConstantFun.scalaAnyThreeToFalse, + recovery: Recovery = Recovery(), + supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop, + onSnapshot: (ActorContext[Command], SnapshotMetadata, Try[Done]) ⇒ Unit = PersistentBehaviorImpl.defaultOnSnapshot[Command] _ ) extends PersistentBehavior[Command, Event, State] with EventsourcedStashReferenceManagement { override def apply(context: typed.ActorContext[Command]): Behavior[Command] = { - Behaviors.setup[EventsourcedBehavior.InternalProtocol] { ctx ⇒ - Behaviors.withTimers { timers ⇒ - val settings = EventsourcedSettings(ctx.system) - val internalStash = stashBuffer(settings) - Behaviors.tap( - onMessage = (_, _) ⇒ Unit, - onSignal = onSignalCleanup, - behavior = { - val setup = new EventsourcedSetup( - ctx, - timers, - persistenceId, - emptyState, - commandHandler, - eventHandler, - WriterIdentity.newIdentity(), - recoveryCompleted, - onSnapshot, - tagger, - eventAdapter, - snapshotWhen, - recovery, - holdingRecoveryPermit = false, - settings = settings, - internalStash = internalStash - ) - - EventsourcedRequestingRecoveryPermit(setup) - } - ) - } - }.widen[Any] { - case res: JournalProtocol.Response ⇒ InternalProtocol.JournalResponse(res) - case res: SnapshotProtocol.Response ⇒ InternalProtocol.SnapshotterResponse(res) - case RecoveryPermitter.RecoveryPermitGranted ⇒ InternalProtocol.RecoveryPermitGranted - case cmd: Command @unchecked ⇒ InternalProtocol.IncomingCommand(cmd) - }.narrow[Command] + Behaviors.supervise( + Behaviors.setup[EventsourcedBehavior.InternalProtocol] { ctx ⇒ + Behaviors.withTimers { timers ⇒ + val settings = EventsourcedSettings(ctx.system) + val internalStash = stashBuffer(settings) + Behaviors.tap( + onMessage = (_, _) ⇒ Unit, + onSignal = onSignalCleanup, + behavior = { + val setup = new EventsourcedSetup( + ctx, + timers, + persistenceId, + emptyState, + commandHandler, + eventHandler, + WriterIdentity.newIdentity(), + recoveryCompleted, + onSnapshot, + tagger, + eventAdapter, + snapshotWhen, + recovery, + holdingRecoveryPermit = false, + settings = settings, + internalStash = internalStash + ) + EventsourcedRequestingRecoveryPermit(setup) + } + ) + } + }.widen[Any] { + case res: JournalProtocol.Response ⇒ InternalProtocol.JournalResponse(res) + case res: SnapshotProtocol.Response ⇒ InternalProtocol.SnapshotterResponse(res) + case RecoveryPermitter.RecoveryPermitGranted ⇒ InternalProtocol.RecoveryPermitGranted + case cmd: Command @unchecked ⇒ InternalProtocol.IncomingCommand(cmd) + }.narrow[Command]).onFailure[JournalFailureException](supervisionStrategy) } /** @@ -159,4 +160,15 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( */ def onSnapshot(callback: (ActorContext[Command], SnapshotMetadata, Try[Done]) ⇒ Unit): PersistentBehavior[Command, Event, State] = copy(onSnapshot = callback) + + /** + * Back off strategy for persist failures. + * + * Specifically BackOff to prevent resume being used. Resume is not allowed as + * it will be unknown if the event has been persisted. + * + * If not specified the actor will be stopped on failure. + */ + def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy): PersistentBehavior[Command, Event, State] = + copy(supervisionStrategy = backoffStrategy) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala index a3fd9aea44..44c47951d2 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala @@ -8,7 +8,7 @@ import java.util.function.Predicate import java.util.{ Collections, Optional } import akka.actor.typed -import akka.actor.typed.Behavior +import akka.actor.typed.{ BackoffSupervisorStrategy, Behavior, SupervisorStrategy } import akka.actor.typed.Behavior.DeferredBehavior import akka.actor.typed.javadsl.ActorContext import akka.annotation.{ ApiMayChange, InternalApi } @@ -21,7 +21,15 @@ import akka.japi.pf.FI /** Java API */ @ApiMayChange -abstract class PersistentBehavior[Command, Event, State >: Null](val persistenceId: String) extends DeferredBehavior[Command] { +abstract class PersistentBehavior[Command, Event, State >: Null] private (val persistenceId: String, supervisorStrategy: Option[BackoffSupervisorStrategy]) extends DeferredBehavior[Command] { + + def this(persistenceId: String) = { + this(persistenceId, None) + } + + def this(persistenceId: String, backoffSupervisorStrategy: BackoffSupervisorStrategy) = { + this(persistenceId, Some(backoffSupervisorStrategy)) + } /** * Factory of effects. @@ -142,7 +150,7 @@ abstract class PersistentBehavior[Command, Event, State >: Null](val persistence else tags.asScala.toSet } - scaladsl.PersistentBehaviors.receive[Command, Event, State]( + val behavior = scaladsl.PersistentBehaviors.receive[Command, Event, State]( persistenceId, emptyState, (c, state, cmd) ⇒ commandHandler()(state, cmd).asInstanceOf[EffectImpl[Event, State]], @@ -163,6 +171,11 @@ abstract class PersistentBehavior[Command, Event, State >: Null](val persistence case Failure(t) ⇒ Optional.of(t) }) }).eventAdapter(eventAdapter()) + + if (supervisorStrategy.isDefined) + behavior.onPersistFailure(supervisorStrategy.get) + else + behavior } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala index 518be6efac..e2d9984d19 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala @@ -5,6 +5,7 @@ package akka.persistence.typed.scaladsl import akka.Done +import akka.actor.typed.BackoffSupervisorStrategy import akka.actor.typed.Behavior.DeferredBehavior import akka.actor.typed.scaladsl.ActorContext import akka.annotation.InternalApi @@ -148,5 +149,15 @@ trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command * in types Journals understand but is of a different type than `Event`. */ def eventAdapter(adapter: EventAdapter[Event, _]): PersistentBehavior[Command, Event, State] + + /** + * Back off strategy for persist failures. + * + * Specifically BackOff to prevent resume being used. Resume is not allowed as + * it will be unknown if the event has been persisted. + * + * If not specified the actor will be stopped on failure. + */ + def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy): PersistentBehavior[Command, Event, State] } diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorFailureTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorFailureTest.java new file mode 100644 index 0000000000..754b8d5d5b --- /dev/null +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorFailureTest.java @@ -0,0 +1,93 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.persistence.typed.javadsl; + +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; +import akka.actor.testkit.typed.javadsl.TestProbe; +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.SupervisorStrategy; +import akka.actor.typed.javadsl.ActorContext; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.junit.ClassRule; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; + +import java.time.Duration; + +import static akka.persistence.typed.scaladsl.PersistentBehaviorFailureSpec.conf; + +class FailingPersistentActor extends PersistentBehavior { + + private final ActorRef probe; + + FailingPersistentActor(String persistenceId, ActorRef probe) { + super(persistenceId, SupervisorStrategy.restartWithBackoff(Duration.ofMillis(1), Duration.ofMillis(5), 0.1)); + this.probe = probe; + } + + @Override + public void onRecoveryCompleted(String s) { + probe.tell("starting"); + } + + @Override + public String emptyState() { + return ""; + } + + @Override + public CommandHandler commandHandler() { + return (state, command) -> { + probe.tell("persisting"); + return Effect().persist(command); + }; + } + + @Override + public EventHandler eventHandler() { + return (state, event) -> { + probe.tell(event); + return state + event; + }; + } +} + +public class PersistentActorFailureTest extends JUnitSuite { + + public static final Config config = conf().withFallback(ConfigFactory.load()); + + @ClassRule + public static final TestKitJunitResource testKit = new TestKitJunitResource(config); + + public static Behavior fail(String pid, ActorRef probe) { + return new FailingPersistentActor(pid, probe); + } + + @Test + public void persistEvents() throws Exception { + TestProbe probe = testKit.createTestProbe(); + Behavior p1 = fail("fail-first-2", probe.ref()); + ActorRef c = testKit.spawn(p1); + probe.expectMessage("starting"); + // fail + c.tell("one"); + probe.expectMessage("persisting"); + probe.expectMessage("one"); + probe.expectMessage("starting"); + // fail + c.tell("two"); + probe.expectMessage("persisting"); + probe.expectMessage("two"); + probe.expectMessage("starting"); + // work + c.tell("three"); + probe.expectMessage("persisting"); + probe.expectMessage("three"); + // no starting as this one did not fail + probe.expectNoMessage(); + } +} diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java index 4209ca135c..24264b68d1 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java @@ -4,10 +4,7 @@ package akka.persistence.typed.javadsl; -import akka.actor.typed.ActorRef; -import akka.actor.typed.Behavior; -import akka.actor.typed.Signal; -import akka.actor.typed.SupervisorStrategy; +import akka.actor.typed.*; import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Adapter; import akka.actor.typed.javadsl.Behaviors; @@ -103,7 +100,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { } public static class Incremented implements Serializable { - private final int delta; + final int delta; public Incremented(int delta) { this.delta = delta; @@ -132,8 +129,8 @@ public class PersistentActorJavaDslTest extends JUnitSuite { } public static class State implements Serializable { - private final int value; - private final List history; + final int value; + final List history; public State(int value, List history) { this.value = value; @@ -240,7 +237,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { ); } - private Behavior counter( + private static Behavior counter( String persistentId, ActorRef> eventProbe, ActorRef loggingProbe, @@ -250,7 +247,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { EventAdapter transformer) { return Behaviors.setup(ctx -> { - return new PersistentBehavior(persistentId) { + return new PersistentBehavior(persistentId, SupervisorStrategy.restartWithBackoff(Duration.ofMillis(1), Duration.ofMillis(5), 0.1)) { @Override public CommandHandler commandHandler() { return commandHandlerBuilder(State.class) @@ -479,7 +476,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { .runWith(Sink.seq(), materializer).toCompletableFuture().get(); assertEquals(Lists.newArrayList( new EventEnvelope(new Sequence(1), "transform", 1, new Wrapper<>(new Incremented(1))) - ), events); + ), events); ActorRef c2 = testKit.spawn(counter("transform", eventProbe.ref(), new WrapperEventAdapter())); c2.tell(new GetValue(stateProbe.ref())); @@ -487,7 +484,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { } //event-wrapper - class WrapperEventAdapter extends EventAdapter { + class WrapperEventAdapter extends EventAdapter { @Override public Wrapper toJournal(Incremented incremented) { return new Wrapper<>(incremented); diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java index fe154f92cf..5d557d0ebd 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java @@ -5,12 +5,13 @@ package jdocs.akka.persistence.typed; import akka.actor.typed.Behavior; +import akka.actor.typed.SupervisorStrategy; import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Behaviors; import akka.persistence.typed.javadsl.CommandHandler; -import akka.persistence.typed.javadsl.Effect; import akka.persistence.typed.javadsl.EventHandler; import akka.persistence.typed.javadsl.PersistentBehavior; +import java.time.Duration; import java.util.Collections; import java.util.Set; @@ -22,11 +23,12 @@ public class BasicPersistentBehaviorsTest { public interface Event {} public static class State {} + //#supervision public static class MyPersistentBehavior extends PersistentBehavior { - public MyPersistentBehavior(String persistenceId) { - super(persistenceId); + super(persistenceId, SupervisorStrategy.restartWithBackoff(Duration.ofSeconds(10), Duration.ofSeconds(30), 0.2)); } + //#supervision @Override public State emptyState() { @@ -62,7 +64,7 @@ public class BasicPersistentBehaviorsTest { //#tagging } - static Behavior persistentBehavior = new MyPersistentBehavior("pid"); + static PersistentBehavior persistentBehavior = new MyPersistentBehavior("pid"); //#structure //#wrapPersistentBehavior diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorFailureSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorFailureSpec.scala new file mode 100644 index 0000000000..38a56aac71 --- /dev/null +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorFailureSpec.scala @@ -0,0 +1,142 @@ +/** + * Copyright (C) 2017-2018 Lightbend Inc. + */ + +package akka.persistence.typed.scaladsl + +import java.util.concurrent.atomic.AtomicInteger + +import akka.Done +import akka.actor.testkit.typed.TestKitSettings +import akka.actor.testkit.typed.scaladsl._ +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.{ ActorRef, Behavior, SupervisorStrategy, TypedAkkaSpecWithShutdown } +import akka.persistence.AtomicWrite +import akka.persistence.journal.inmem.InmemJournal +import akka.persistence.typed.EventRejectedException +import com.typesafe.config.{ Config, ConfigFactory } +import org.scalatest.concurrent.Eventually + +import scala.collection.immutable +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.Try + +class ChaosJournal extends InmemJournal { + var count = 0 + var failRecovery = true + var reject = true + override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = { + val pid = messages.head.persistenceId + if (pid == "fail-first-2" && count < 2) { + count += 1 + Future.failed(new RuntimeException("database says no")) + } else if (pid == "reject-first" && reject) { + reject = false + Future.successful(messages.map(aw ⇒ Try { + throw new RuntimeException("I don't like it") + })) + } else { + super.asyncWriteMessages(messages) + } + } + + override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = { + if (persistenceId == "fail-recovery-once" && failRecovery) { + failRecovery = false + Future.failed(new RuntimeException("Nah")) + } else { + super.asyncReadHighestSequenceNr(persistenceId, fromSequenceNr) + } + } +} + +object PersistentBehaviorFailureSpec { + + val conf = ConfigFactory.parseString( + s""" + akka.loglevel = DEBUG + akka.persistence.journal.plugin = "failure-journal" + failure-journal = $${akka.persistence.journal.inmem} + failure-journal { + class = "akka.persistence.typed.scaladsl.ChaosJournal" + } + """).withFallback(ConfigFactory.load("reference.conf")).resolve() +} + +class PersistentBehaviorFailureSpec extends ActorTestKit with TypedAkkaSpecWithShutdown with Eventually { + + import PersistentBehaviorSpec._ + + override lazy val config: Config = PersistentBehaviorFailureSpec.conf + + implicit val testSettings = TestKitSettings(system) + + def failingPersistentActor(pid: String, probe: ActorRef[String]): Behavior[String] = PersistentBehaviors.receive[String, String, String]( + pid, "", + (ctx, state, cmd) ⇒ { + probe.tell("persisting") + Effect.persist(cmd) + }, + (state, event) ⇒ { + probe.tell(event) + state + event + } + ).onRecoveryCompleted { (ctx, state) ⇒ + probe.tell("starting") + }.onPersistFailure(SupervisorStrategy.restartWithBackoff(1.milli, 5.millis, 0.1)) + + "A typed persistent actor (failures)" must { + "restart with backoff" in { + val probe = TestProbe[String]() + val behav = failingPersistentActor("fail-first-2", probe.ref) + val c = spawn(behav) + probe.expectMessage("starting") + // fail + c ! "one" + probe.expectMessage("persisting") + probe.expectMessage("one") + probe.expectMessage("starting") + // fail + c ! "two" + probe.expectMessage("persisting") + probe.expectMessage("two") + probe.expectMessage("starting") + // work! + c ! "three" + probe.expectMessage("persisting") + probe.expectMessage("three") + // no restart + probe.expectNoMessage() + } + + "restart with backoff for recovery" in { + val probe = TestProbe[String]() + val behav = failingPersistentActor("fail-recovery-once", probe.ref) + spawn(behav) + // First time fails, second time should work and call onRecoveryComplete + probe.expectMessage("starting") + probe.expectNoMessage() + } + + "handles rejections" in { + val probe = TestProbe[String]() + val behav = + Behaviors.supervise( + failingPersistentActor("reject-first", probe.ref)).onFailure[EventRejectedException]( + SupervisorStrategy.restartWithBackoff(1.milli, 5.millis, 0.1)) + val c = spawn(behav) + // First time fails, second time should work and call onRecoveryComplete + probe.expectMessage("starting") + c ! "one" + probe.expectMessage("persisting") + probe.expectMessage("one") + probe.expectMessage("starting") + c ! "two" + probe.expectMessage("persisting") + probe.expectMessage("two") + // no restart + probe.expectNoMessage() + } + } +} diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala index b92059af17..72feb718de 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala @@ -227,7 +227,6 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown "persist an event" in { val c = spawn(counter(nextPid)) - val probe = TestProbe[State] c ! Increment c ! GetValue(probe.ref) diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala index 102c5309c3..ae5a421c7d 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala @@ -4,10 +4,12 @@ package docs.akka.persistence.typed -import akka.actor.typed.Behavior +import akka.actor.typed.{ Behavior, SupervisorStrategy } import akka.actor.typed.scaladsl.Behaviors import akka.persistence.typed.scaladsl.PersistentBehaviors +import scala.concurrent.duration._ + object BasicPersistentBehaviorsCompileOnly { //#structure @@ -83,4 +85,13 @@ object BasicPersistentBehaviorsCompileOnly { }) } //#wrapPersistentBehavior + + //#supervision + val supervisedBehavior = samplePersistentBehavior.onPersistFailure( + SupervisorStrategy.restartWithBackoff( + minBackoff = 10.seconds, + maxBackoff = 60.seconds, + randomFactor = 0.1 + )) + //#supervision } diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index e67e735fae..91aa0fdc6d 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -61,7 +61,7 @@ import akka.util.OptionVal private case object ResendTick - // If other message types than SystemMesage need acked delivery they can extend this trait. + // If other message types than SystemMessage need acked delivery they can extend this trait. // Used in tests since real SystemMessage are somewhat cumbersome to create. trait AckedDeliveryMessage