diff --git a/akka-actor/src/main/scala/akka/util/ConstantFun.scala b/akka-actor/src/main/scala/akka/util/ConstantFun.scala index 66bb54ea32..2d35a59eb8 100644 --- a/akka-actor/src/main/scala/akka/util/ConstantFun.scala +++ b/akka-actor/src/main/scala/akka/util/ConstantFun.scala @@ -44,6 +44,8 @@ import akka.japi.{ Pair ⇒ JPair } val oneInt = (_: Any) ⇒ 1 + val unitToUnit = () ⇒ () + private val _nullFun = (_: Any) ⇒ null private val conforms = (a: Any) ⇒ a diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index 6a5b619746..e959a661e1 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -10,13 +10,16 @@ import java.util.concurrent.atomic.AtomicInteger import scala.util.Failure import scala.util.Success import scala.util.Try +import scala.util.control.NonFatal import akka.Done import akka.actor.typed import akka.actor.typed.BackoffSupervisorStrategy import akka.actor.typed.Behavior import akka.actor.typed.BehaviorInterceptor +import akka.actor.typed.Logger import akka.actor.typed.PostStop +import akka.actor.typed.PreRestart import akka.actor.typed.Signal import akka.actor.typed.SupervisorStrategy import akka.actor.typed.scaladsl.ActorContext @@ -66,6 +69,8 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( journalPluginId: Option[String] = None, snapshotPluginId: Option[String] = None, recoveryCompleted: State ⇒ Unit = ConstantFun.scalaAnyToUnit, + postStop: () ⇒ Unit = ConstantFun.unitToUnit, + preRestart: () ⇒ Unit = ConstantFun.unitToUnit, tagger: Event ⇒ Set[String] = (_: Event) ⇒ Set.empty[String], eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event], snapshotWhen: (State, Event, Long) ⇒ Boolean = ConstantFun.scalaAnyThreeToFalse, @@ -127,6 +132,9 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( eventsourcedSetup.cancelRecoveryTimer() // clear stash to be GC friendly stashState.clearStashBuffers() + signalPostStop(eventsourcedSetup.log) + } else if (signal == PreRestart) { + signalPreRestart(eventsourcedSetup.log) } target(ctx, signal) } @@ -144,96 +152,64 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( }.onFailure[JournalFailureException](supervisionStrategy) } - /** - * The `callback` function is called to notify the actor that the recovery process - * is finished. - */ - def onRecoveryCompleted(callback: State ⇒ Unit): EventSourcedBehavior[Command, Event, State] = + def signalPostStop(log: Logger): Unit = { + try postStop() catch { + case NonFatal(e) ⇒ + log.warning("Exception in postStop: {}", e) + } + } + + def signalPreRestart(log: Logger): Unit = { + try preRestart() catch { + case NonFatal(e) ⇒ + log.warning("Exception in preRestart: {}", e) + } + } + + override def onRecoveryCompleted(callback: State ⇒ Unit): EventSourcedBehavior[Command, Event, State] = copy(recoveryCompleted = callback) - /** - * Initiates a snapshot if the given function returns true. - * When persisting multiple events at once the snapshot is triggered after all the events have - * been persisted. - * - * `predicate` receives the State, Event and the sequenceNr used for the Event - */ - def snapshotWhen(predicate: (State, Event, Long) ⇒ Boolean): EventSourcedBehavior[Command, Event, State] = + override def onPostStop(callback: () ⇒ Unit): EventSourcedBehavior[Command, Event, State] = + copy(postStop = callback) + + override def onPreRestart(callback: () ⇒ Unit): EventSourcedBehavior[Command, Event, State] = + copy(preRestart = callback) + + override def snapshotWhen(predicate: (State, Event, Long) ⇒ Boolean): EventSourcedBehavior[Command, Event, State] = copy(snapshotWhen = predicate) - /** - * Snapshot every N events - * - * `numberOfEvents` should be greater than 0 - */ - def snapshotEvery(numberOfEvents: Long): EventSourcedBehavior[Command, Event, State] = { + override def snapshotEvery(numberOfEvents: Long): EventSourcedBehavior[Command, Event, State] = { require(numberOfEvents > 0, s"numberOfEvents should be positive: Was $numberOfEvents") copy(snapshotWhen = (_, _, seqNr) ⇒ seqNr % numberOfEvents == 0) } - /** - * Change the journal plugin id that this actor should use. - */ - def withJournalPluginId(id: String): EventSourcedBehavior[Command, Event, State] = { + override def withJournalPluginId(id: String): EventSourcedBehavior[Command, Event, State] = { require(id != null, "journal plugin id must not be null; use empty string for 'default' journal") copy(journalPluginId = if (id != "") Some(id) else None) } - /** - * Change the snapshot store plugin id that this actor should use. - */ - def withSnapshotPluginId(id: String): EventSourcedBehavior[Command, Event, State] = { + override def withSnapshotPluginId(id: String): EventSourcedBehavior[Command, Event, State] = { require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store") copy(snapshotPluginId = if (id != "") Some(id) else None) } - /** - * Changes the snapshot selection criteria used by this behavior. - * By default the most recent snapshot is used, and the remaining state updates are recovered by replaying events - * from the sequence number up until which the snapshot reached. - * - * You may configure the behavior to skip replaying snapshots completely, in which case the recovery will be - * performed by replaying all events -- which may take a long time. - */ - def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): EventSourcedBehavior[Command, Event, State] = { + override def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): EventSourcedBehavior[Command, Event, State] = { copy(recovery = Recovery(selection)) } - /** - * The `tagger` function should give event tags, which will be used in persistence query - */ - def withTagger(tagger: Event ⇒ Set[String]): EventSourcedBehavior[Command, Event, State] = + override def withTagger(tagger: Event ⇒ Set[String]): EventSourcedBehavior[Command, Event, State] = copy(tagger = tagger) - /** - * Adapt the event before sending to the journal e.g. wrapping the event in a type - * the journal understands - */ - def eventAdapter(adapter: EventAdapter[Event, _]): EventSourcedBehavior[Command, Event, State] = + override def eventAdapter(adapter: EventAdapter[Event, _]): EventSourcedBehavior[Command, Event, State] = copy(eventAdapter = adapter.asInstanceOf[EventAdapter[Event, Any]]) - /** - * The `callback` function is called to notify the actor that a snapshot has finished - */ - def onSnapshot(callback: (SnapshotMetadata, Try[Done]) ⇒ Unit): EventSourcedBehavior[Command, Event, State] = + override def onSnapshot(callback: (SnapshotMetadata, Try[Done]) ⇒ Unit): EventSourcedBehavior[Command, Event, State] = copy(onSnapshot = callback) - /** - * Back off strategy for persist failures. - * - * Specifically BackOff to prevent resume being used. Resume is not allowed as - * it will be unknown if the event has been persisted. - * - * If not specified the actor will be stopped on failure. - */ - def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy): EventSourcedBehavior[Command, Event, State] = + override def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy): EventSourcedBehavior[Command, Event, State] = copy(supervisionStrategy = backoffStrategy) - /** - * The `callback` function is called to notify that recovery has failed. For setting a supervision - * strategy `onPersistFailure` - */ - def onRecoveryFailure(callback: Throwable ⇒ Unit): EventSourcedBehavior[Command, Event, State] = + override def onRecoveryFailure(callback: Throwable ⇒ Unit): EventSourcedBehavior[Command, Event, State] = copy(onRecoveryFailure = callback) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala index a290eeb2de..2342e796da 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventSourcedBehavior.scala @@ -87,17 +87,27 @@ abstract class EventSourcedBehavior[Command, Event, State >: Null] private[akka] EventHandlerBuilder.builder[State, Event]() /** - * The `callback` function is called to notify the actor that the recovery process + * The callback is invoked to notify the actor that the recovery process * is finished. */ def onRecoveryCompleted(state: State): Unit = () /** - * The `callback` function is called to notify the actor that the recovery process + * The callback is invoked to notify the actor that the recovery process * has failed */ def onRecoveryFailure(failure: Throwable): Unit = () + /** + * The callback is invoked to notify that the actor has stopped. + */ + def onPostStop(): Unit = () + + /** + * The callback is invoked to notify that the actor is restarted. + */ + def onPreRestart(): Unit = () + /** * Override to get notified when a snapshot is finished. * @@ -160,6 +170,8 @@ abstract class EventSourcedBehavior[Command, Event, State >: Null] private[akka] eventHandler()(_, _), getClass) .onRecoveryCompleted(onRecoveryCompleted) + .onPostStop(() ⇒ onPostStop()) + .onPreRestart(() ⇒ onPreRestart()) .snapshotWhen(snapshotWhen) .withTagger(tagger) .onSnapshot((meta, result) ⇒ { diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala index 3becc58b2c..6416c2d91c 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala @@ -107,6 +107,16 @@ object EventSourcedBehavior { */ def onRecoveryFailure(callback: Throwable ⇒ Unit): EventSourcedBehavior[Command, Event, State] + /** + * The `callback` function is called to notify that the actor has stopped. + */ + def onPostStop(callback: () ⇒ Unit): EventSourcedBehavior[Command, Event, State] + + /** + * The `callback` function is called to notify that the actor is restarted. + */ + def onPreRestart(callback: () ⇒ Unit): EventSourcedBehavior[Command, Event, State] + /** * The `callback` function is called to notify when a snapshot is complete. */ diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java index 46738c8c7c..7397874a1a 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java @@ -458,6 +458,23 @@ public class PersistentActorJavaDslTest extends JUnitSuite { probe.expectTerminated(c, Duration.ofSeconds(1)); } + @Test + public void postStop() { + TestProbe probe = testKit.createTestProbe(); + Behavior counter = + Behaviors.setup( + ctx -> + new CounterBehavior(new PersistenceId("c5"), ctx) { + @Override + public void onPostStop() { + probe.ref().tell("stopped"); + } + }); + ActorRef c = testKit.spawn(counter); + c.tell(StopThenLog.INSTANCE); + probe.expectMessage("stopped"); + } + @Test public void tapPersistentActor() { TestProbe interceptProbe = testKit.createTestProbe(); diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala index 82528d0eb5..74094a8614 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorFailureSpec.scala @@ -78,6 +78,8 @@ class EventSourcedBehaviorFailureSpec extends ScalaTestWithActorTestKit(EventSou EventSourcedBehavior[String, String, String]( pid, "", (_, cmd) ⇒ { + if (cmd == "wrong") + throw new TestException("wrong command") probe.tell("persisting") Effect.persist(cmd) }, @@ -87,18 +89,22 @@ class EventSourcedBehaviorFailureSpec extends ScalaTestWithActorTestKit(EventSou } ).onRecoveryCompleted { _ ⇒ probe.tell("starting") - }.onPersistFailure(SupervisorStrategy.restartWithBackoff(1.milli, 5.millis, 0.1) + } + .onPostStop(() ⇒ probe.tell("stopped")) + .onPreRestart(() ⇒ probe.tell("restarting")) + .onPersistFailure(SupervisorStrategy.restartWithBackoff(1.milli, 5.millis, 0.1) .withLoggingEnabled(enabled = false)) "A typed persistent actor (failures)" must { "call onRecoveryFailure when replay fails" in { - val notUsedProbe = TestProbe[String]() - val probe = TestProbe[Throwable]() - spawn(failingPersistentActor(PersistenceId("fail-recovery"), notUsedProbe.ref) - .onRecoveryFailure(t ⇒ probe.ref ! t)) + val probe = TestProbe[String]() + val excProbe = TestProbe[Throwable]() + spawn(failingPersistentActor(PersistenceId("fail-recovery"), probe.ref) + .onRecoveryFailure(t ⇒ excProbe.ref ! t)) - probe.expectMessageType[TestException].message shouldEqual "Nope" + excProbe.expectMessageType[TestException].message shouldEqual "Nope" + probe.expectMessage("restarting") } "handle exceptions in onRecoveryFailure" in { @@ -120,11 +126,13 @@ class EventSourcedBehaviorFailureSpec extends ScalaTestWithActorTestKit(EventSou c ! "one" probe.expectMessage("persisting") probe.expectMessage("one") + probe.expectMessage("restarting") probe.expectMessage("starting") // fail c ! "two" probe.expectMessage("persisting") probe.expectMessage("two") + probe.expectMessage("restarting") probe.expectMessage("starting") // work! c ! "three" @@ -139,6 +147,7 @@ class EventSourcedBehaviorFailureSpec extends ScalaTestWithActorTestKit(EventSou val behav = failingPersistentActor(PersistenceId("fail-recovery-once"), probe.ref) spawn(behav) // First time fails, second time should work and call onRecoveryComplete + probe.expectMessage("restarting") probe.expectMessage("starting") probe.expectNoMessage() } @@ -156,6 +165,7 @@ class EventSourcedBehaviorFailureSpec extends ScalaTestWithActorTestKit(EventSou c ! "one" probe.expectMessage("persisting") probe.expectMessage("one") + probe.expectMessage("restarting") probe.expectMessage("starting") c ! "two" probe.expectMessage("persisting") @@ -163,5 +173,24 @@ class EventSourcedBehaviorFailureSpec extends ScalaTestWithActorTestKit(EventSou // no restart probe.expectNoMessage() } + + "stop (default supervisor strategy) if command handler throws" in { + val probe = TestProbe[String]() + val behav = failingPersistentActor(PersistenceId("wrong-command-1"), probe.ref) + val c = spawn(behav) + probe.expectMessage("starting") + c ! "wrong" + probe.expectMessage("stopped") + } + + "restart supervisor strategy if command handler throws" in { + val probe = TestProbe[String]() + val behav = Behaviors.supervise(failingPersistentActor(PersistenceId("wrong-command-2"), probe.ref)) + .onFailure[TestException](SupervisorStrategy.restart) + val c = spawn(behav) + probe.expectMessage("starting") + c ! "wrong" + probe.expectMessage("restarting") + } } }