Typed persistence: Throw on persist failures, #24479

* Rather than stop so that users can add their own supervision e.g.
restartWithBackOff
* Only allow back off supervisoir for persistent behaviors
* Handle persist rejections
This commit is contained in:
Christopher Batey 2018-07-09 09:57:36 +01:00 committed by Patrik Nordwall
parent 9cecba3455
commit 836347fe08
15 changed files with 409 additions and 94 deletions

View file

@ -269,3 +269,24 @@ Scala
Java Java
: @@snip [BasicPersistentBehaviorsTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java) { #wrapPersistentBehavior } : @@snip [BasicPersistentBehaviorsTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java) { #wrapPersistentBehavior }
## Journal failures
By default a `PersistentBehavior` will stop if an exception is thrown from the journal. It is possible to override this with
any `BackoffSupervisorStrategy`. It is not possible to use the normal supervision wrapping for this as it isn't valid to
`resume` a behavior on a journal failure as it is not known if the event was persisted.
Scala
: @@snip [BasicPersistentBehaviorsSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala) { #supervision }
Java
: @@snip [BasicPersistentBehaviorsTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java) { #supervision }
## Journal rejections
Journals can reject events. The difference from a failure is that the journal must decide to reject an event before
trying to persist it e.g. because of a serialization exception. If an event is rejected it definitely won't be in the journal.
This is signalled to a `PersistentBehavior` via a `EventRejectedException` and can be handled with a @ref[supervisor](fault-tolerance.md).

View file

@ -0,0 +1,11 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed
/**
* Thrown if a journal rejects an event e.g. due to a serialization error.
*/
final class EventRejectedException(persistenceId: String, sequenceNr: Long, cause: Throwable)
extends RuntimeException(s"PersistenceId $persistenceId sequenceNr: $sequenceNr", cause)

View file

@ -121,7 +121,7 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set
} else { } else {
cancelRecoveryTimer(setup.timers) cancelRecoveryTimer(setup.timers)
val msg = s"Replay timed out, didn't get event within ]${setup.settings.recoveryEventTimeout}], highest sequence number seen [${state.seqNr}]" val msg = s"Replay timed out, didn't get event within ]${setup.settings.recoveryEventTimeout}], highest sequence number seen [${state.seqNr}]"
onRecoveryFailure(new RecoveryTimedOut(msg), state.seqNr, None) // TODO allow users to hook into this? onRecoveryFailure(new RecoveryTimedOut(msg), state.seqNr, None)
} }
} else { } else {
// snapshot timeout, but we're already in the events recovery phase // snapshot timeout, but we're already in the events recovery phase
@ -145,14 +145,14 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set
cancelRecoveryTimer(setup.timers) cancelRecoveryTimer(setup.timers)
tryReturnRecoveryPermit("on replay failure: " + cause.getMessage) tryReturnRecoveryPermit("on replay failure: " + cause.getMessage)
message match { val msg = message match {
case Some(evt) case Some(evt)
setup.log.error(cause, "Exception during recovery while handling [{}] with sequence number [{}].", evt.getClass.getName, sequenceNr) s"Exception during recovery while handling [${evt.getClass.getName}] with sequence number [$sequenceNr]. PersistenceId: [${setup.persistence}]"
case None case None
setup.log.error(cause, "Exception during recovery. Last known sequence number [{}]", setup.persistenceId, sequenceNr) s"Exception during recovery. Last known sequence number [$sequenceNr]. PersistenceId: [${setup.persistenceId}]"
} }
Behaviors.stopped throw new JournalFailureException(msg, cause)
} }
protected def onRecoveryCompleted(state: ReplayingState[S]): Behavior[InternalProtocol] = try { protected def onRecoveryCompleted(state: ReplayingState[S]): Behavior[InternalProtocol] = try {

View file

@ -12,6 +12,7 @@ import akka.annotation.InternalApi
import akka.persistence.JournalProtocol._ import akka.persistence.JournalProtocol._
import akka.persistence._ import akka.persistence._
import akka.persistence.journal.Tagged import akka.persistence.journal.Tagged
import akka.persistence.typed.EventRejectedException
import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, MDC } import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, MDC }
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._ import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._
@ -227,13 +228,12 @@ private[akka] object EventsourcedRunning {
case WriteMessageRejected(p, cause, id) case WriteMessageRejected(p, cause, id)
if (id == setup.writerIdentity.instanceId) { if (id == setup.writerIdentity.instanceId) {
onPersistRejected(cause, p.payload, p.sequenceNr) // does not stop (by design) throw new EventRejectedException(setup.persistenceId, p.sequenceNr, cause)
onWriteResponse(p)
} else this } else this
case WriteMessageFailure(p, cause, id) case WriteMessageFailure(p, cause, id)
if (id == setup.writerIdentity.instanceId) if (id == setup.writerIdentity.instanceId)
onPersistFailureThenStop(cause, p.payload, p.sequenceNr) throw new JournalFailureException(setup.persistenceId, p.sequenceNr, p.payload.getClass.getName, cause)
else this else this
case WriteMessagesSuccessful case WriteMessagesSuccessful
@ -250,21 +250,6 @@ private[akka] object EventsourcedRunning {
} }
} }
private def onPersistRejected(cause: Throwable, event: Any, seqNr: Long): Unit = {
setup.log.error(
cause,
"Rejected to persist event type [{}] with sequence number [{}] for persistenceId [{}] due to [{}].",
event.getClass.getName, seqNr, setup.persistenceId, cause.getMessage)
}
private def onPersistFailureThenStop(cause: Throwable, event: Any, seqNr: Long): Behavior[InternalProtocol] = {
setup.log.error(cause, "Failed to persist event type [{}] with sequence number [{}] for persistenceId [{}].",
event.getClass.getName, seqNr, setup.persistenceId)
// FIXME see #24479 for reconsidering the stopping behaviour
Behaviors.stopped
}
} }
private def onSnapshotterResponse( private def onSnapshotterResponse(

View file

@ -0,0 +1,18 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.internal
import akka.annotation.InternalApi
/**
* INTERNAL API
*
* Used for journal failures. Private to akka as only internal supervision strategies should use it.
*/
@InternalApi
final private[akka] class JournalFailureException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) {
def this(persistenceId: String, sequenceNr: Long, eventType: String, cause: Throwable) =
this(s"Failed to persist event type $eventType with sequence number $sequenceNr for persistenceId [$persistenceId]", cause)
}

View file

@ -6,7 +6,7 @@ package akka.persistence.typed.internal
import akka.Done import akka.Done
import akka.actor.typed import akka.actor.typed
import akka.actor.typed.Behavior import akka.actor.typed.{ BackoffSupervisorStrategy, Behavior, SupervisorStrategy }
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.persistence._ import akka.persistence._
@ -32,58 +32,59 @@ private[akka] object PersistentBehaviorImpl {
@InternalApi @InternalApi
private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
persistenceId: String, persistenceId: String,
emptyState: State, emptyState: State,
commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State], commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State],
eventHandler: PersistentBehaviors.EventHandler[State, Event], eventHandler: PersistentBehaviors.EventHandler[State, Event],
journalPluginId: Option[String] = None, journalPluginId: Option[String] = None,
snapshotPluginId: Option[String] = None, snapshotPluginId: Option[String] = None,
recoveryCompleted: (ActorContext[Command], State) Unit = ConstantFun.scalaAnyTwoToUnit, recoveryCompleted: (ActorContext[Command], State) Unit = ConstantFun.scalaAnyTwoToUnit,
tagger: Event Set[String] = (_: Event) Set.empty[String], tagger: Event Set[String] = (_: Event) Set.empty[String],
eventAdapter: EventAdapter[Event, _] = NoOpEventAdapter.instance[Event], eventAdapter: EventAdapter[Event, _] = NoOpEventAdapter.instance[Event],
snapshotWhen: (State, Event, Long) Boolean = ConstantFun.scalaAnyThreeToFalse, snapshotWhen: (State, Event, Long) Boolean = ConstantFun.scalaAnyThreeToFalse,
recovery: Recovery = Recovery(), recovery: Recovery = Recovery(),
onSnapshot: (ActorContext[Command], SnapshotMetadata, Try[Done]) Unit = PersistentBehaviorImpl.defaultOnSnapshot[Command] _ supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop,
onSnapshot: (ActorContext[Command], SnapshotMetadata, Try[Done]) Unit = PersistentBehaviorImpl.defaultOnSnapshot[Command] _
) extends PersistentBehavior[Command, Event, State] with EventsourcedStashReferenceManagement { ) extends PersistentBehavior[Command, Event, State] with EventsourcedStashReferenceManagement {
override def apply(context: typed.ActorContext[Command]): Behavior[Command] = { override def apply(context: typed.ActorContext[Command]): Behavior[Command] = {
Behaviors.setup[EventsourcedBehavior.InternalProtocol] { ctx Behaviors.supervise(
Behaviors.withTimers { timers Behaviors.setup[EventsourcedBehavior.InternalProtocol] { ctx
val settings = EventsourcedSettings(ctx.system) Behaviors.withTimers { timers
val internalStash = stashBuffer(settings) val settings = EventsourcedSettings(ctx.system)
Behaviors.tap( val internalStash = stashBuffer(settings)
onMessage = (_, _) Unit, Behaviors.tap(
onSignal = onSignalCleanup, onMessage = (_, _) Unit,
behavior = { onSignal = onSignalCleanup,
val setup = new EventsourcedSetup( behavior = {
ctx, val setup = new EventsourcedSetup(
timers, ctx,
persistenceId, timers,
emptyState, persistenceId,
commandHandler, emptyState,
eventHandler, commandHandler,
WriterIdentity.newIdentity(), eventHandler,
recoveryCompleted, WriterIdentity.newIdentity(),
onSnapshot, recoveryCompleted,
tagger, onSnapshot,
eventAdapter, tagger,
snapshotWhen, eventAdapter,
recovery, snapshotWhen,
holdingRecoveryPermit = false, recovery,
settings = settings, holdingRecoveryPermit = false,
internalStash = internalStash settings = settings,
) internalStash = internalStash
)
EventsourcedRequestingRecoveryPermit(setup) EventsourcedRequestingRecoveryPermit(setup)
} }
) )
} }
}.widen[Any] { }.widen[Any] {
case res: JournalProtocol.Response InternalProtocol.JournalResponse(res) case res: JournalProtocol.Response InternalProtocol.JournalResponse(res)
case res: SnapshotProtocol.Response InternalProtocol.SnapshotterResponse(res) case res: SnapshotProtocol.Response InternalProtocol.SnapshotterResponse(res)
case RecoveryPermitter.RecoveryPermitGranted InternalProtocol.RecoveryPermitGranted case RecoveryPermitter.RecoveryPermitGranted InternalProtocol.RecoveryPermitGranted
case cmd: Command @unchecked InternalProtocol.IncomingCommand(cmd) case cmd: Command @unchecked InternalProtocol.IncomingCommand(cmd)
}.narrow[Command] }.narrow[Command]).onFailure[JournalFailureException](supervisionStrategy)
} }
/** /**
@ -159,4 +160,15 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
*/ */
def onSnapshot(callback: (ActorContext[Command], SnapshotMetadata, Try[Done]) Unit): PersistentBehavior[Command, Event, State] = def onSnapshot(callback: (ActorContext[Command], SnapshotMetadata, Try[Done]) Unit): PersistentBehavior[Command, Event, State] =
copy(onSnapshot = callback) copy(onSnapshot = callback)
/**
* Back off strategy for persist failures.
*
* Specifically BackOff to prevent resume being used. Resume is not allowed as
* it will be unknown if the event has been persisted.
*
* If not specified the actor will be stopped on failure.
*/
def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy): PersistentBehavior[Command, Event, State] =
copy(supervisionStrategy = backoffStrategy)
} }

View file

@ -8,7 +8,7 @@ import java.util.function.Predicate
import java.util.{ Collections, Optional } import java.util.{ Collections, Optional }
import akka.actor.typed import akka.actor.typed
import akka.actor.typed.Behavior import akka.actor.typed.{ BackoffSupervisorStrategy, Behavior, SupervisorStrategy }
import akka.actor.typed.Behavior.DeferredBehavior import akka.actor.typed.Behavior.DeferredBehavior
import akka.actor.typed.javadsl.ActorContext import akka.actor.typed.javadsl.ActorContext
import akka.annotation.{ ApiMayChange, InternalApi } import akka.annotation.{ ApiMayChange, InternalApi }
@ -21,7 +21,15 @@ import akka.japi.pf.FI
/** Java API */ /** Java API */
@ApiMayChange @ApiMayChange
abstract class PersistentBehavior[Command, Event, State >: Null](val persistenceId: String) extends DeferredBehavior[Command] { abstract class PersistentBehavior[Command, Event, State >: Null] private (val persistenceId: String, supervisorStrategy: Option[BackoffSupervisorStrategy]) extends DeferredBehavior[Command] {
def this(persistenceId: String) = {
this(persistenceId, None)
}
def this(persistenceId: String, backoffSupervisorStrategy: BackoffSupervisorStrategy) = {
this(persistenceId, Some(backoffSupervisorStrategy))
}
/** /**
* Factory of effects. * Factory of effects.
@ -142,7 +150,7 @@ abstract class PersistentBehavior[Command, Event, State >: Null](val persistence
else tags.asScala.toSet else tags.asScala.toSet
} }
scaladsl.PersistentBehaviors.receive[Command, Event, State]( val behavior = scaladsl.PersistentBehaviors.receive[Command, Event, State](
persistenceId, persistenceId,
emptyState, emptyState,
(c, state, cmd) commandHandler()(state, cmd).asInstanceOf[EffectImpl[Event, State]], (c, state, cmd) commandHandler()(state, cmd).asInstanceOf[EffectImpl[Event, State]],
@ -163,6 +171,11 @@ abstract class PersistentBehavior[Command, Event, State >: Null](val persistence
case Failure(t) Optional.of(t) case Failure(t) Optional.of(t)
}) })
}).eventAdapter(eventAdapter()) }).eventAdapter(eventAdapter())
if (supervisorStrategy.isDefined)
behavior.onPersistFailure(supervisorStrategy.get)
else
behavior
} }
} }

View file

@ -5,6 +5,7 @@
package akka.persistence.typed.scaladsl package akka.persistence.typed.scaladsl
import akka.Done import akka.Done
import akka.actor.typed.BackoffSupervisorStrategy
import akka.actor.typed.Behavior.DeferredBehavior import akka.actor.typed.Behavior.DeferredBehavior
import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.ActorContext
import akka.annotation.InternalApi import akka.annotation.InternalApi
@ -148,5 +149,15 @@ trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command
* in types Journals understand but is of a different type than `Event`. * in types Journals understand but is of a different type than `Event`.
*/ */
def eventAdapter(adapter: EventAdapter[Event, _]): PersistentBehavior[Command, Event, State] def eventAdapter(adapter: EventAdapter[Event, _]): PersistentBehavior[Command, Event, State]
/**
* Back off strategy for persist failures.
*
* Specifically BackOff to prevent resume being used. Resume is not allowed as
* it will be unknown if the event has been persisted.
*
* If not specified the actor will be stopped on failure.
*/
def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy): PersistentBehavior[Command, Event, State]
} }

View file

@ -0,0 +1,93 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.javadsl;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.SupervisorStrategy;
import akka.actor.typed.javadsl.ActorContext;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import java.time.Duration;
import static akka.persistence.typed.scaladsl.PersistentBehaviorFailureSpec.conf;
class FailingPersistentActor extends PersistentBehavior<String, String, String> {
private final ActorRef<String> probe;
FailingPersistentActor(String persistenceId, ActorRef<String> probe) {
super(persistenceId, SupervisorStrategy.restartWithBackoff(Duration.ofMillis(1), Duration.ofMillis(5), 0.1));
this.probe = probe;
}
@Override
public void onRecoveryCompleted(String s) {
probe.tell("starting");
}
@Override
public String emptyState() {
return "";
}
@Override
public CommandHandler<String, String, String> commandHandler() {
return (state, command) -> {
probe.tell("persisting");
return Effect().persist(command);
};
}
@Override
public EventHandler<String, String> eventHandler() {
return (state, event) -> {
probe.tell(event);
return state + event;
};
}
}
public class PersistentActorFailureTest extends JUnitSuite {
public static final Config config = conf().withFallback(ConfigFactory.load());
@ClassRule
public static final TestKitJunitResource testKit = new TestKitJunitResource(config);
public static Behavior<String> fail(String pid, ActorRef<String> probe) {
return new FailingPersistentActor(pid, probe);
}
@Test
public void persistEvents() throws Exception {
TestProbe<String> probe = testKit.createTestProbe();
Behavior<String> p1 = fail("fail-first-2", probe.ref());
ActorRef<String> c = testKit.spawn(p1);
probe.expectMessage("starting");
// fail
c.tell("one");
probe.expectMessage("persisting");
probe.expectMessage("one");
probe.expectMessage("starting");
// fail
c.tell("two");
probe.expectMessage("persisting");
probe.expectMessage("two");
probe.expectMessage("starting");
// work
c.tell("three");
probe.expectMessage("persisting");
probe.expectMessage("three");
// no starting as this one did not fail
probe.expectNoMessage();
}
}

View file

@ -4,10 +4,7 @@
package akka.persistence.typed.javadsl; package akka.persistence.typed.javadsl;
import akka.actor.typed.ActorRef; import akka.actor.typed.*;
import akka.actor.typed.Behavior;
import akka.actor.typed.Signal;
import akka.actor.typed.SupervisorStrategy;
import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Adapter; import akka.actor.typed.javadsl.Adapter;
import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Behaviors;
@ -103,7 +100,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
} }
public static class Incremented implements Serializable { public static class Incremented implements Serializable {
private final int delta; final int delta;
public Incremented(int delta) { public Incremented(int delta) {
this.delta = delta; this.delta = delta;
@ -132,8 +129,8 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
} }
public static class State implements Serializable { public static class State implements Serializable {
private final int value; final int value;
private final List<Integer> history; final List<Integer> history;
public State(int value, List<Integer> history) { public State(int value, List<Integer> history) {
this.value = value; this.value = value;
@ -240,7 +237,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
); );
} }
private <A> Behavior<Command> counter( private static <A> Behavior<Command> counter(
String persistentId, String persistentId,
ActorRef<Pair<State, Incremented>> eventProbe, ActorRef<Pair<State, Incremented>> eventProbe,
ActorRef<String> loggingProbe, ActorRef<String> loggingProbe,
@ -250,7 +247,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
EventAdapter<Incremented, A> transformer) { EventAdapter<Incremented, A> transformer) {
return Behaviors.setup(ctx -> { return Behaviors.setup(ctx -> {
return new PersistentBehavior<Command, Incremented, State>(persistentId) { return new PersistentBehavior<Command, Incremented, State>(persistentId, SupervisorStrategy.restartWithBackoff(Duration.ofMillis(1), Duration.ofMillis(5), 0.1)) {
@Override @Override
public CommandHandler<Command, Incremented, State> commandHandler() { public CommandHandler<Command, Incremented, State> commandHandler() {
return commandHandlerBuilder(State.class) return commandHandlerBuilder(State.class)
@ -479,7 +476,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
.runWith(Sink.seq(), materializer).toCompletableFuture().get(); .runWith(Sink.seq(), materializer).toCompletableFuture().get();
assertEquals(Lists.newArrayList( assertEquals(Lists.newArrayList(
new EventEnvelope(new Sequence(1), "transform", 1, new Wrapper<>(new Incremented(1))) new EventEnvelope(new Sequence(1), "transform", 1, new Wrapper<>(new Incremented(1)))
), events); ), events);
ActorRef<Command> c2 = testKit.spawn(counter("transform", eventProbe.ref(), new WrapperEventAdapter())); ActorRef<Command> c2 = testKit.spawn(counter("transform", eventProbe.ref(), new WrapperEventAdapter()));
c2.tell(new GetValue(stateProbe.ref())); c2.tell(new GetValue(stateProbe.ref()));
@ -487,7 +484,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
} }
//event-wrapper //event-wrapper
class WrapperEventAdapter extends EventAdapter<Incremented,Wrapper> { class WrapperEventAdapter extends EventAdapter<Incremented, Wrapper> {
@Override @Override
public Wrapper toJournal(Incremented incremented) { public Wrapper toJournal(Incremented incremented) {
return new Wrapper<>(incremented); return new Wrapper<>(incremented);

View file

@ -5,12 +5,13 @@
package jdocs.akka.persistence.typed; package jdocs.akka.persistence.typed;
import akka.actor.typed.Behavior; import akka.actor.typed.Behavior;
import akka.actor.typed.SupervisorStrategy;
import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors; import akka.actor.typed.javadsl.Behaviors;
import akka.persistence.typed.javadsl.CommandHandler; import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.Effect;
import akka.persistence.typed.javadsl.EventHandler; import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.PersistentBehavior; import akka.persistence.typed.javadsl.PersistentBehavior;
import java.time.Duration;
import java.util.Collections; import java.util.Collections;
import java.util.Set; import java.util.Set;
@ -22,11 +23,12 @@ public class BasicPersistentBehaviorsTest {
public interface Event {} public interface Event {}
public static class State {} public static class State {}
//#supervision
public static class MyPersistentBehavior extends PersistentBehavior<Command, Event, State> { public static class MyPersistentBehavior extends PersistentBehavior<Command, Event, State> {
public MyPersistentBehavior(String persistenceId) { public MyPersistentBehavior(String persistenceId) {
super(persistenceId); super(persistenceId, SupervisorStrategy.restartWithBackoff(Duration.ofSeconds(10), Duration.ofSeconds(30), 0.2));
} }
//#supervision
@Override @Override
public State emptyState() { public State emptyState() {
@ -62,7 +64,7 @@ public class BasicPersistentBehaviorsTest {
//#tagging //#tagging
} }
static Behavior<Command> persistentBehavior = new MyPersistentBehavior("pid"); static PersistentBehavior<Command, Event, State> persistentBehavior = new MyPersistentBehavior("pid");
//#structure //#structure
//#wrapPersistentBehavior //#wrapPersistentBehavior

View file

@ -0,0 +1,142 @@
/**
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.scaladsl
import java.util.concurrent.atomic.AtomicInteger
import akka.Done
import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ ActorRef, Behavior, SupervisorStrategy, TypedAkkaSpecWithShutdown }
import akka.persistence.AtomicWrite
import akka.persistence.journal.inmem.InmemJournal
import akka.persistence.typed.EventRejectedException
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.concurrent.Eventually
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Try
class ChaosJournal extends InmemJournal {
var count = 0
var failRecovery = true
var reject = true
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
val pid = messages.head.persistenceId
if (pid == "fail-first-2" && count < 2) {
count += 1
Future.failed(new RuntimeException("database says no"))
} else if (pid == "reject-first" && reject) {
reject = false
Future.successful(messages.map(aw Try {
throw new RuntimeException("I don't like it")
}))
} else {
super.asyncWriteMessages(messages)
}
}
override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
if (persistenceId == "fail-recovery-once" && failRecovery) {
failRecovery = false
Future.failed(new RuntimeException("Nah"))
} else {
super.asyncReadHighestSequenceNr(persistenceId, fromSequenceNr)
}
}
}
object PersistentBehaviorFailureSpec {
val conf = ConfigFactory.parseString(
s"""
akka.loglevel = DEBUG
akka.persistence.journal.plugin = "failure-journal"
failure-journal = $${akka.persistence.journal.inmem}
failure-journal {
class = "akka.persistence.typed.scaladsl.ChaosJournal"
}
""").withFallback(ConfigFactory.load("reference.conf")).resolve()
}
class PersistentBehaviorFailureSpec extends ActorTestKit with TypedAkkaSpecWithShutdown with Eventually {
import PersistentBehaviorSpec._
override lazy val config: Config = PersistentBehaviorFailureSpec.conf
implicit val testSettings = TestKitSettings(system)
def failingPersistentActor(pid: String, probe: ActorRef[String]): Behavior[String] = PersistentBehaviors.receive[String, String, String](
pid, "",
(ctx, state, cmd) {
probe.tell("persisting")
Effect.persist(cmd)
},
(state, event) {
probe.tell(event)
state + event
}
).onRecoveryCompleted { (ctx, state)
probe.tell("starting")
}.onPersistFailure(SupervisorStrategy.restartWithBackoff(1.milli, 5.millis, 0.1))
"A typed persistent actor (failures)" must {
"restart with backoff" in {
val probe = TestProbe[String]()
val behav = failingPersistentActor("fail-first-2", probe.ref)
val c = spawn(behav)
probe.expectMessage("starting")
// fail
c ! "one"
probe.expectMessage("persisting")
probe.expectMessage("one")
probe.expectMessage("starting")
// fail
c ! "two"
probe.expectMessage("persisting")
probe.expectMessage("two")
probe.expectMessage("starting")
// work!
c ! "three"
probe.expectMessage("persisting")
probe.expectMessage("three")
// no restart
probe.expectNoMessage()
}
"restart with backoff for recovery" in {
val probe = TestProbe[String]()
val behav = failingPersistentActor("fail-recovery-once", probe.ref)
spawn(behav)
// First time fails, second time should work and call onRecoveryComplete
probe.expectMessage("starting")
probe.expectNoMessage()
}
"handles rejections" in {
val probe = TestProbe[String]()
val behav =
Behaviors.supervise(
failingPersistentActor("reject-first", probe.ref)).onFailure[EventRejectedException](
SupervisorStrategy.restartWithBackoff(1.milli, 5.millis, 0.1))
val c = spawn(behav)
// First time fails, second time should work and call onRecoveryComplete
probe.expectMessage("starting")
c ! "one"
probe.expectMessage("persisting")
probe.expectMessage("one")
probe.expectMessage("starting")
c ! "two"
probe.expectMessage("persisting")
probe.expectMessage("two")
// no restart
probe.expectNoMessage()
}
}
}

View file

@ -227,7 +227,6 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
"persist an event" in { "persist an event" in {
val c = spawn(counter(nextPid)) val c = spawn(counter(nextPid))
val probe = TestProbe[State] val probe = TestProbe[State]
c ! Increment c ! Increment
c ! GetValue(probe.ref) c ! GetValue(probe.ref)

View file

@ -4,10 +4,12 @@
package docs.akka.persistence.typed package docs.akka.persistence.typed
import akka.actor.typed.Behavior import akka.actor.typed.{ Behavior, SupervisorStrategy }
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.scaladsl.PersistentBehaviors import akka.persistence.typed.scaladsl.PersistentBehaviors
import scala.concurrent.duration._
object BasicPersistentBehaviorsCompileOnly { object BasicPersistentBehaviorsCompileOnly {
//#structure //#structure
@ -83,4 +85,13 @@ object BasicPersistentBehaviorsCompileOnly {
}) })
} }
//#wrapPersistentBehavior //#wrapPersistentBehavior
//#supervision
val supervisedBehavior = samplePersistentBehavior.onPersistFailure(
SupervisorStrategy.restartWithBackoff(
minBackoff = 10.seconds,
maxBackoff = 60.seconds,
randomFactor = 0.1
))
//#supervision
} }

View file

@ -61,7 +61,7 @@ import akka.util.OptionVal
private case object ResendTick private case object ResendTick
// If other message types than SystemMesage need acked delivery they can extend this trait. // If other message types than SystemMessage need acked delivery they can extend this trait.
// Used in tests since real SystemMessage are somewhat cumbersome to create. // Used in tests since real SystemMessage are somewhat cumbersome to create.
trait AckedDeliveryMessage trait AckedDeliveryMessage