add onPostStop and onPreRestart hooks to EventSourcedBehavior, #26404

This commit is contained in:
Patrik Nordwall 2019-02-18 16:54:46 +01:00
parent 537641c2a8
commit d82115988b
6 changed files with 117 additions and 71 deletions

View file

@ -44,6 +44,8 @@ import akka.japi.{ Pair ⇒ JPair }
val oneInt = (_: Any) 1
val unitToUnit = () ()
private val _nullFun = (_: Any) null
private val conforms = (a: Any) a

View file

@ -10,13 +10,16 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.util.control.NonFatal
import akka.Done
import akka.actor.typed
import akka.actor.typed.BackoffSupervisorStrategy
import akka.actor.typed.Behavior
import akka.actor.typed.BehaviorInterceptor
import akka.actor.typed.Logger
import akka.actor.typed.PostStop
import akka.actor.typed.PreRestart
import akka.actor.typed.Signal
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.ActorContext
@ -66,6 +69,8 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
journalPluginId: Option[String] = None,
snapshotPluginId: Option[String] = None,
recoveryCompleted: State Unit = ConstantFun.scalaAnyToUnit,
postStop: () Unit = ConstantFun.unitToUnit,
preRestart: () Unit = ConstantFun.unitToUnit,
tagger: Event Set[String] = (_: Event) Set.empty[String],
eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event],
snapshotWhen: (State, Event, Long) Boolean = ConstantFun.scalaAnyThreeToFalse,
@ -127,6 +132,9 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
eventsourcedSetup.cancelRecoveryTimer()
// clear stash to be GC friendly
stashState.clearStashBuffers()
signalPostStop(eventsourcedSetup.log)
} else if (signal == PreRestart) {
signalPreRestart(eventsourcedSetup.log)
}
target(ctx, signal)
}
@ -144,96 +152,64 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
}.onFailure[JournalFailureException](supervisionStrategy)
}
/**
* The `callback` function is called to notify the actor that the recovery process
* is finished.
*/
def onRecoveryCompleted(callback: State Unit): EventSourcedBehavior[Command, Event, State] =
def signalPostStop(log: Logger): Unit = {
try postStop() catch {
case NonFatal(e)
log.warning("Exception in postStop: {}", e)
}
}
def signalPreRestart(log: Logger): Unit = {
try preRestart() catch {
case NonFatal(e)
log.warning("Exception in preRestart: {}", e)
}
}
override def onRecoveryCompleted(callback: State Unit): EventSourcedBehavior[Command, Event, State] =
copy(recoveryCompleted = callback)
/**
* Initiates a snapshot if the given function returns true.
* When persisting multiple events at once the snapshot is triggered after all the events have
* been persisted.
*
* `predicate` receives the State, Event and the sequenceNr used for the Event
*/
def snapshotWhen(predicate: (State, Event, Long) Boolean): EventSourcedBehavior[Command, Event, State] =
override def onPostStop(callback: () Unit): EventSourcedBehavior[Command, Event, State] =
copy(postStop = callback)
override def onPreRestart(callback: () Unit): EventSourcedBehavior[Command, Event, State] =
copy(preRestart = callback)
override def snapshotWhen(predicate: (State, Event, Long) Boolean): EventSourcedBehavior[Command, Event, State] =
copy(snapshotWhen = predicate)
/**
* Snapshot every N events
*
* `numberOfEvents` should be greater than 0
*/
def snapshotEvery(numberOfEvents: Long): EventSourcedBehavior[Command, Event, State] = {
override def snapshotEvery(numberOfEvents: Long): EventSourcedBehavior[Command, Event, State] = {
require(numberOfEvents > 0, s"numberOfEvents should be positive: Was $numberOfEvents")
copy(snapshotWhen = (_, _, seqNr) seqNr % numberOfEvents == 0)
}
/**
* Change the journal plugin id that this actor should use.
*/
def withJournalPluginId(id: String): EventSourcedBehavior[Command, Event, State] = {
override def withJournalPluginId(id: String): EventSourcedBehavior[Command, Event, State] = {
require(id != null, "journal plugin id must not be null; use empty string for 'default' journal")
copy(journalPluginId = if (id != "") Some(id) else None)
}
/**
* Change the snapshot store plugin id that this actor should use.
*/
def withSnapshotPluginId(id: String): EventSourcedBehavior[Command, Event, State] = {
override def withSnapshotPluginId(id: String): EventSourcedBehavior[Command, Event, State] = {
require(id != null, "snapshot plugin id must not be null; use empty string for 'default' snapshot store")
copy(snapshotPluginId = if (id != "") Some(id) else None)
}
/**
* Changes the snapshot selection criteria used by this behavior.
* By default the most recent snapshot is used, and the remaining state updates are recovered by replaying events
* from the sequence number up until which the snapshot reached.
*
* You may configure the behavior to skip replaying snapshots completely, in which case the recovery will be
* performed by replaying all events -- which may take a long time.
*/
def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): EventSourcedBehavior[Command, Event, State] = {
override def withSnapshotSelectionCriteria(selection: SnapshotSelectionCriteria): EventSourcedBehavior[Command, Event, State] = {
copy(recovery = Recovery(selection))
}
/**
* The `tagger` function should give event tags, which will be used in persistence query
*/
def withTagger(tagger: Event Set[String]): EventSourcedBehavior[Command, Event, State] =
override def withTagger(tagger: Event Set[String]): EventSourcedBehavior[Command, Event, State] =
copy(tagger = tagger)
/**
* Adapt the event before sending to the journal e.g. wrapping the event in a type
* the journal understands
*/
def eventAdapter(adapter: EventAdapter[Event, _]): EventSourcedBehavior[Command, Event, State] =
override def eventAdapter(adapter: EventAdapter[Event, _]): EventSourcedBehavior[Command, Event, State] =
copy(eventAdapter = adapter.asInstanceOf[EventAdapter[Event, Any]])
/**
* The `callback` function is called to notify the actor that a snapshot has finished
*/
def onSnapshot(callback: (SnapshotMetadata, Try[Done]) Unit): EventSourcedBehavior[Command, Event, State] =
override def onSnapshot(callback: (SnapshotMetadata, Try[Done]) Unit): EventSourcedBehavior[Command, Event, State] =
copy(onSnapshot = callback)
/**
* Back off strategy for persist failures.
*
* Specifically BackOff to prevent resume being used. Resume is not allowed as
* it will be unknown if the event has been persisted.
*
* If not specified the actor will be stopped on failure.
*/
def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy): EventSourcedBehavior[Command, Event, State] =
override def onPersistFailure(backoffStrategy: BackoffSupervisorStrategy): EventSourcedBehavior[Command, Event, State] =
copy(supervisionStrategy = backoffStrategy)
/**
* The `callback` function is called to notify that recovery has failed. For setting a supervision
* strategy `onPersistFailure`
*/
def onRecoveryFailure(callback: Throwable Unit): EventSourcedBehavior[Command, Event, State] =
override def onRecoveryFailure(callback: Throwable Unit): EventSourcedBehavior[Command, Event, State] =
copy(onRecoveryFailure = callback)
}

View file

@ -87,17 +87,27 @@ abstract class EventSourcedBehavior[Command, Event, State >: Null] private[akka]
EventHandlerBuilder.builder[State, Event]()
/**
* The `callback` function is called to notify the actor that the recovery process
* The callback is invoked to notify the actor that the recovery process
* is finished.
*/
def onRecoveryCompleted(state: State): Unit = ()
/**
* The `callback` function is called to notify the actor that the recovery process
* The callback is invoked to notify the actor that the recovery process
* has failed
*/
def onRecoveryFailure(failure: Throwable): Unit = ()
/**
* The callback is invoked to notify that the actor has stopped.
*/
def onPostStop(): Unit = ()
/**
* The callback is invoked to notify that the actor is restarted.
*/
def onPreRestart(): Unit = ()
/**
* Override to get notified when a snapshot is finished.
*
@ -160,6 +170,8 @@ abstract class EventSourcedBehavior[Command, Event, State >: Null] private[akka]
eventHandler()(_, _),
getClass)
.onRecoveryCompleted(onRecoveryCompleted)
.onPostStop(() onPostStop())
.onPreRestart(() onPreRestart())
.snapshotWhen(snapshotWhen)
.withTagger(tagger)
.onSnapshot((meta, result) {

View file

@ -107,6 +107,16 @@ object EventSourcedBehavior {
*/
def onRecoveryFailure(callback: Throwable Unit): EventSourcedBehavior[Command, Event, State]
/**
* The `callback` function is called to notify that the actor has stopped.
*/
def onPostStop(callback: () Unit): EventSourcedBehavior[Command, Event, State]
/**
* The `callback` function is called to notify that the actor is restarted.
*/
def onPreRestart(callback: () Unit): EventSourcedBehavior[Command, Event, State]
/**
* The `callback` function is called to notify when a snapshot is complete.
*/

View file

@ -458,6 +458,23 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
probe.expectTerminated(c, Duration.ofSeconds(1));
}
@Test
public void postStop() {
TestProbe<String> probe = testKit.createTestProbe();
Behavior<Command> counter =
Behaviors.setup(
ctx ->
new CounterBehavior(new PersistenceId("c5"), ctx) {
@Override
public void onPostStop() {
probe.ref().tell("stopped");
}
});
ActorRef<Command> c = testKit.spawn(counter);
c.tell(StopThenLog.INSTANCE);
probe.expectMessage("stopped");
}
@Test
public void tapPersistentActor() {
TestProbe<Object> interceptProbe = testKit.createTestProbe();

View file

@ -78,6 +78,8 @@ class EventSourcedBehaviorFailureSpec extends ScalaTestWithActorTestKit(EventSou
EventSourcedBehavior[String, String, String](
pid, "",
(_, cmd) {
if (cmd == "wrong")
throw new TestException("wrong command")
probe.tell("persisting")
Effect.persist(cmd)
},
@ -87,18 +89,22 @@ class EventSourcedBehaviorFailureSpec extends ScalaTestWithActorTestKit(EventSou
}
).onRecoveryCompleted { _
probe.tell("starting")
}.onPersistFailure(SupervisorStrategy.restartWithBackoff(1.milli, 5.millis, 0.1)
}
.onPostStop(() probe.tell("stopped"))
.onPreRestart(() probe.tell("restarting"))
.onPersistFailure(SupervisorStrategy.restartWithBackoff(1.milli, 5.millis, 0.1)
.withLoggingEnabled(enabled = false))
"A typed persistent actor (failures)" must {
"call onRecoveryFailure when replay fails" in {
val notUsedProbe = TestProbe[String]()
val probe = TestProbe[Throwable]()
spawn(failingPersistentActor(PersistenceId("fail-recovery"), notUsedProbe.ref)
.onRecoveryFailure(t probe.ref ! t))
val probe = TestProbe[String]()
val excProbe = TestProbe[Throwable]()
spawn(failingPersistentActor(PersistenceId("fail-recovery"), probe.ref)
.onRecoveryFailure(t excProbe.ref ! t))
probe.expectMessageType[TestException].message shouldEqual "Nope"
excProbe.expectMessageType[TestException].message shouldEqual "Nope"
probe.expectMessage("restarting")
}
"handle exceptions in onRecoveryFailure" in {
@ -120,11 +126,13 @@ class EventSourcedBehaviorFailureSpec extends ScalaTestWithActorTestKit(EventSou
c ! "one"
probe.expectMessage("persisting")
probe.expectMessage("one")
probe.expectMessage("restarting")
probe.expectMessage("starting")
// fail
c ! "two"
probe.expectMessage("persisting")
probe.expectMessage("two")
probe.expectMessage("restarting")
probe.expectMessage("starting")
// work!
c ! "three"
@ -139,6 +147,7 @@ class EventSourcedBehaviorFailureSpec extends ScalaTestWithActorTestKit(EventSou
val behav = failingPersistentActor(PersistenceId("fail-recovery-once"), probe.ref)
spawn(behav)
// First time fails, second time should work and call onRecoveryComplete
probe.expectMessage("restarting")
probe.expectMessage("starting")
probe.expectNoMessage()
}
@ -156,6 +165,7 @@ class EventSourcedBehaviorFailureSpec extends ScalaTestWithActorTestKit(EventSou
c ! "one"
probe.expectMessage("persisting")
probe.expectMessage("one")
probe.expectMessage("restarting")
probe.expectMessage("starting")
c ! "two"
probe.expectMessage("persisting")
@ -163,5 +173,24 @@ class EventSourcedBehaviorFailureSpec extends ScalaTestWithActorTestKit(EventSou
// no restart
probe.expectNoMessage()
}
"stop (default supervisor strategy) if command handler throws" in {
val probe = TestProbe[String]()
val behav = failingPersistentActor(PersistenceId("wrong-command-1"), probe.ref)
val c = spawn(behav)
probe.expectMessage("starting")
c ! "wrong"
probe.expectMessage("stopped")
}
"restart supervisor strategy if command handler throws" in {
val probe = TestProbe[String]()
val behav = Behaviors.supervise(failingPersistentActor(PersistenceId("wrong-command-2"), probe.ref))
.onFailure[TestException](SupervisorStrategy.restart)
val c = spawn(behav)
probe.expectMessage("starting")
c ! "wrong"
probe.expectMessage("restarting")
}
}
}