diff --git a/akka-docs/src/main/paradox/persistence.md b/akka-docs/src/main/paradox/persistence.md index 27f6e898b9..de72f297ab 100644 --- a/akka-docs/src/main/paradox/persistence.md +++ b/akka-docs/src/main/paradox/persistence.md @@ -326,11 +326,12 @@ The callback will not be invoked if the actor is restarted (or stopped) in betwe ### Deferring actions until preceding persist handlers have executed Sometimes when working with `persistAsync` or `persist` you may find that it would be nice to define some actions in terms of -''happens-after the previous `persistAsync`/`persist` handlers have been invoked''. `PersistentActor` provides an utility method -called `deferAsync`, which works similarly to `persistAsync` yet does not persist the passed in event. It is recommended to -use it for *read* operations, and actions which do not have corresponding events in your domain model. +''happens-after the previous `persistAsync`/`persist` handlers have been invoked''. `PersistentActor` provides utility methods +called `defer` and `deferAsync`, which work similarly to `persist` and `persistAsync` respectively yet do not persist the +passed in event. It is recommended to use them for *read* operations, and actions which do not have corresponding events in your +domain model. -Using this method is very similar to the persist family of methods, yet it does **not** persist the passed in event. +Using those methods is very similar to the persist family of methods, yet they do **not** persist the passed in event. It will be kept in memory and used when invoking the handler. Scala @@ -340,7 +341,7 @@ Java : @@snip [LambdaPersistenceDocTest.java]($code$/java/jdocs/persistence/LambdaPersistenceDocTest.java) { #defer } Notice that the `sender()` is **safe** to access in the handler callback, and will be pointing to the original sender -of the command for which this `deferAsync` handler was called. +of the command for which this `defer` or `deferAsync` handler was called. The calling side will get the responses in this (guaranteed) order: @@ -350,7 +351,7 @@ Scala Java : @@snip [LambdaPersistenceDocTest.java]($code$/java/jdocs/persistence/LambdaPersistenceDocTest.java) { #defer-caller } -You can also call `deferAsync` with `persist`. +You can also call `defer` or `deferAsync` with `persist`. Scala : @@snip [PersistenceDocSpec.scala]($code$/scala/docs/persistence/PersistenceDocSpec.scala) { #defer-with-persist } @@ -361,7 +362,7 @@ Java @@@ warning The callback will not be invoked if the actor is restarted (or stopped) in between the call to -`deferAsync` and the journal has processed and confirmed all preceding writes. +`defer` or `deferAsync` and the journal has processed and confirmed all preceding writes. @@@ diff --git a/akka-docs/src/test/java/jdocs/persistence/LambdaPersistenceDocTest.java b/akka-docs/src/test/java/jdocs/persistence/LambdaPersistenceDocTest.java index 84df46e70f..02fc53a25f 100644 --- a/akka-docs/src/test/java/jdocs/persistence/LambdaPersistenceDocTest.java +++ b/akka-docs/src/test/java/jdocs/persistence/LambdaPersistenceDocTest.java @@ -438,7 +438,7 @@ public class LambdaPersistenceDocTest { sender().tell(e, self()); }); - deferAsync(String.format("evt-%s-3", c), e -> { + defer(String.format("evt-%s-3", c), e -> { sender().tell(e, self()); }); } diff --git a/akka-docs/src/test/scala/docs/persistence/PersistenceDocSpec.scala b/akka-docs/src/test/scala/docs/persistence/PersistenceDocSpec.scala index 86615bed4f..ac792b0ed6 100644 --- a/akka-docs/src/test/scala/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/src/test/scala/docs/persistence/PersistenceDocSpec.scala @@ -295,7 +295,7 @@ object PersistenceDocSpec { sender() ! c persist(s"evt-$c-1") { e ⇒ sender() ! e } persist(s"evt-$c-2") { e ⇒ sender() ! e } - deferAsync(s"evt-$c-3") { e ⇒ sender() ! e } + defer(s"evt-$c-3") { e ⇒ sender() ! e } } } } diff --git a/akka-persistence/src/main/mima-filters/2.5.10.backwards.excludes b/akka-persistence/src/main/mima-filters/2.5.10.backwards.excludes new file mode 100644 index 0000000000..38469c2f05 --- /dev/null +++ b/akka-persistence/src/main/mima-filters/2.5.10.backwards.excludes @@ -0,0 +1,4 @@ +# #24508 Adding defer method to PersistentActor +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.PersistentActor.defer") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.Eventsourced.internalDefer") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.AbstractPersistentActorLike.defer") diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 2150e2d4b9..ed65c5c954 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -375,6 +375,21 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas } } + /** + * Internal API + */ + @InternalApi + final private[akka] def internalDefer[A](event: A)(handler: A ⇒ Unit): Unit = { + if (recoveryRunning) throw new IllegalStateException("Cannot defer during replay. Events can be deferred when receiving RecoveryCompleted or later.") + if (pendingInvocations.isEmpty) { + handler(event) + } else { + pendingStashingPersistInvocations += 1 + pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) + eventBatch = NonPersistentRepr(event, sender()) :: eventBatch + } + } + /** * Permanently deletes all persistent messages with sequence numbers less than or equal `toSequenceNr`. * diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala index d36755d72e..184084ec3e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala @@ -264,6 +264,28 @@ trait PersistentActor extends Eventsourced with PersistenceIdentity { def deferAsync[A](event: A)(handler: A ⇒ Unit): Unit = { internalDeferAsync(event)(handler) } + + /** + * Defer the handler execution until all pending handlers have been executed. It is guaranteed that no new commands + * will be received by a persistent actor between a call to `defer` and the execution of its `handler`. + * Allows to define logic within the actor, which will respect the invocation-order-guarantee + * in respect to `persistAsync` or `persist` calls. That is, if `persistAsync` or `persist` was invoked before `defer`, + * the corresponding handlers will be invoked in the same order as they were registered in. + * + * This call will NOT result in `event` being persisted, use `persist` or `persistAsync` instead + * if the given event should possible to replay. + * + * If there are no pending persist handler calls, the handler will be called immediately. + * + * If persistence of an earlier event fails, the persistent actor will stop, and the `handler` + * will not be run. + * + * @param event event to be handled in the future, when preceding persist operations have been processes + * @param handler handler for the given `event` + */ + def defer[A](event: A)(handler: A ⇒ Unit): Unit = { + internalDefer(event)(handler) + } } /** @@ -378,6 +400,28 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit def deferAsync[A](event: A)(handler: Procedure[A]): Unit = internalDeferAsync(event)(event ⇒ handler(event)) + /** + * Defer the handler execution until all pending handlers have been executed. It is guaranteed that no new commands + * will be received by a persistent actor between a call to `defer` and the execution of its `handler`. + * Allows to define logic within the actor, which will respect the invocation-order-guarantee + * in respect to `persistAsync` or `persist` calls. That is, if `persistAsync` or `persist` was invoked before `defer`, + * the corresponding handlers will be invoked in the same order as they were registered in. + * + * This call will NOT result in `event` being persisted, use `persist` or `persistAsync` instead + * if the given event should possible to replay. + * + * If there are no pending persist handler calls, the handler will be called immediately. + * + * If persistence of an earlier event fails, the persistent actor will stop, and the `handler` + * will not be run. + * + * @param event event to be handled in the future, when preceding persist operations have been processes + * @param handler handler for the given `event` + */ + def defer[A](event: A)(handler: Procedure[A]): Unit = { + internalDefer(event)(event ⇒ handler(event)) + } + /** * Java API: recovery handler that receives persisted events during recovery. If a state snapshot * has been captured and saved, this handler will receive a [[SnapshotOffer]] message @@ -562,6 +606,28 @@ abstract class AbstractPersistentActor extends AbstractActor with AbstractPersis def deferAsync[A](event: A)(handler: Procedure[A]): Unit = internalDeferAsync(event)(event ⇒ handler(event)) + /** + * Defer the handler execution until all pending handlers have been executed. It is guaranteed that no new commands + * will be received by a persistent actor between a call to `defer` and the execution of its `handler`. + * Allows to define logic within the actor, which will respect the invocation-order-guarantee + * in respect to `persistAsync` or `persist` calls. That is, if `persistAsync` or `persist` was invoked before `defer`, + * the corresponding handlers will be invoked in the same order as they were registered in. + * + * This call will NOT result in `event` being persisted, use `persist` or `persistAsync` instead + * if the given event should possible to replay. + * + * If there are no pending persist handler calls, the handler will be called immediately. + * + * If persistence of an earlier event fails, the persistent actor will stop, and the `handler` + * will not be run. + * + * @param event event to be handled in the future, when preceding persist operations have been processes + * @param handler handler for the given `event` + */ + def defer[A](event: A)(handler: Procedure[A]): Unit = { + internalDefer(event)(event ⇒ handler(event)) + } + } /** diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala index 7c8feb2a23..6b4030b5fc 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala @@ -359,43 +359,74 @@ object PersistentActorSpec { } } - class DeferringWithPersistActor(name: String) extends ExamplePersistentActor(name) { + trait DeferActor extends PersistentActor { + def doDefer[A](event: A)(handler: A ⇒ Unit): Unit + } + trait DeferSync { + this: PersistentActor ⇒ + def doDefer[A](event: A)(handler: A ⇒ Unit): Unit = defer(event)(handler) + } + trait DeferAsync { + this: PersistentActor ⇒ + def doDefer[A](event: A)(handler: A ⇒ Unit): Unit = deferAsync(event)(handler) + } + abstract class DeferringWithPersistActor(name: String) extends ExamplePersistentActor(name) with DeferActor { val receiveCommand: Receive = { case Cmd(data) ⇒ - deferAsync("d-1") { sender() ! _ } + doDefer("d-1") { sender() ! _ } persist(s"$data-2") { sender() ! _ } - deferAsync("d-3") { sender() ! _ } - deferAsync("d-4") { sender() ! _ } + doDefer("d-3") { sender() ! _ } + doDefer("d-4") { sender() ! _ } } } - class DeferringWithAsyncPersistActor(name: String) extends ExamplePersistentActor(name) { + class DeferringAsyncWithPersistActor(name: String) extends DeferringWithPersistActor(name) with DeferAsync + class DeferringSyncWithPersistActor(name: String) extends DeferringWithPersistActor(name) with DeferSync + abstract class DeferringWithAsyncPersistActor(name: String) extends ExamplePersistentActor(name) with DeferActor { val receiveCommand: Receive = { case Cmd(data) ⇒ - deferAsync(s"d-$data-1") { sender() ! _ } + doDefer(s"d-$data-1") { sender() ! _ } persistAsync(s"pa-$data-2") { sender() ! _ } - deferAsync(s"d-$data-3") { sender() ! _ } - deferAsync(s"d-$data-4") { sender() ! _ } + doDefer(s"d-$data-3") { sender() ! _ } + doDefer(s"d-$data-4") { sender() ! _ } } } - class DeferringMixedCallsPPADDPADPersistActor(name: String) extends ExamplePersistentActor(name) { + class DeferringAsyncWithAsyncPersistActor(name: String) extends DeferringWithAsyncPersistActor(name) with DeferAsync + class DeferringSyncWithAsyncPersistActor(name: String) extends DeferringWithAsyncPersistActor(name) with DeferSync + abstract class DeferringMixedCallsPPADDPADPersistActor(name: String) extends ExamplePersistentActor(name) with DeferActor { val receiveCommand: Receive = { case Cmd(data) ⇒ persist(s"p-$data-1") { sender() ! _ } persistAsync(s"pa-$data-2") { sender() ! _ } - deferAsync(s"d-$data-3") { sender() ! _ } - deferAsync(s"d-$data-4") { sender() ! _ } + doDefer(s"d-$data-3") { sender() ! _ } + doDefer(s"d-$data-4") { sender() ! _ } persistAsync(s"pa-$data-5") { sender() ! _ } - deferAsync(s"d-$data-6") { sender() ! _ } + doDefer(s"d-$data-6") { sender() ! _ } } } - class DeferringWithNoPersistCallsPersistActor(name: String) extends ExamplePersistentActor(name) { + class DeferringAsyncMixedCallsPPADDPADPersistActor(name: String) extends DeferringMixedCallsPPADDPADPersistActor(name) with DeferAsync + class DeferringSyncMixedCallsPPADDPADPersistActor(name: String) extends DeferringMixedCallsPPADDPADPersistActor(name) with DeferSync + abstract class DeferringWithNoPersistCallsPersistActor(name: String) extends ExamplePersistentActor(name) with DeferActor { + val receiveCommand: Receive = { + case Cmd(_) ⇒ + doDefer("d-1") { sender() ! _ } + doDefer("d-2") { sender() ! _ } + doDefer("d-3") { sender() ! _ } + } + } + class DeferringAsyncWithNoPersistCallsPersistActor(name: String) extends DeferringWithNoPersistCallsPersistActor(name) with DeferAsync + class DeferringSyncWithNoPersistCallsPersistActor(name: String) extends DeferringWithNoPersistCallsPersistActor(name) with DeferSync + abstract class DeferringActor(name: String) extends ExamplePersistentActor(name) with DeferActor { val receiveCommand: Receive = { case Cmd(data) ⇒ - deferAsync("d-1") { sender() ! _ } - deferAsync("d-2") { sender() ! _ } - deferAsync("d-3") { sender() ! _ } + sender() ! data + persist(()) { _ ⇒ } // skip calling defer immediately because of empty pending invocations + doDefer(Evt(s"$data-defer")) { evt ⇒ + sender() ! evt.data + } } } + class DeferringAsyncActor(name: String) extends DeferringActor(name) with DeferAsync + class DeferringSyncActor(name: String) extends DeferringActor(name) with DeferSync class StressOrdering(name: String) extends ExamplePersistentActor(name) { val receiveCommand: Receive = { @@ -931,69 +962,107 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi expectMsg(5.seconds, "done") } "allow deferring handlers in order to provide ordered processing in respect to persist handlers" in { - val persistentActor = namedPersistentActor[DeferringWithPersistActor] - persistentActor ! Cmd("a") - expectMsg("d-1") - expectMsg("a-2") - expectMsg("d-3") - expectMsg("d-4") - expectNoMsg(100.millis) + def test(actor: ActorRef): Unit = { + actor ! Cmd("a") + expectMsg("d-1") + expectMsg("a-2") + expectMsg("d-3") + expectMsg("d-4") + expectNoMsg(100.millis) + } + + test(namedPersistentActor[DeferringAsyncWithPersistActor]) + test(namedPersistentActor[DeferringSyncWithPersistActor]) } "allow deferring handlers in order to provide ordered processing in respect to asyncPersist handlers" in { - val persistentActor = namedPersistentActor[DeferringWithAsyncPersistActor] - persistentActor ! Cmd("a") - expectMsg("d-a-1") - expectMsg("pa-a-2") - expectMsg("d-a-3") - expectMsg("d-a-4") - expectNoMsg(100.millis) + def test(actor: ActorRef): Unit = { + actor ! Cmd("a") + expectMsg("d-a-1") + expectMsg("pa-a-2") + expectMsg("d-a-3") + expectMsg("d-a-4") + expectNoMsg(100.millis) + } + + test(namedPersistentActor[DeferringAsyncWithAsyncPersistActor]) + test(namedPersistentActor[DeferringSyncWithAsyncPersistActor]) } "invoke deferred handlers, in presence of mixed a long series persist / persistAsync calls" in { - val persistentActor = namedPersistentActor[DeferringMixedCallsPPADDPADPersistActor] - val p1, p2 = TestProbe() + def test(actor: ActorRef): Unit = { + val p1, p2 = TestProbe() - persistentActor.tell(Cmd("a"), p1.ref) - persistentActor.tell(Cmd("b"), p2.ref) - p1.expectMsg("p-a-1") - p1.expectMsg("pa-a-2") - p1.expectMsg("d-a-3") - p1.expectMsg("d-a-4") - p1.expectMsg("pa-a-5") - p1.expectMsg("d-a-6") + actor.tell(Cmd("a"), p1.ref) + actor.tell(Cmd("b"), p2.ref) + p1.expectMsg("p-a-1") + p1.expectMsg("pa-a-2") + p1.expectMsg("d-a-3") + p1.expectMsg("d-a-4") + p1.expectMsg("pa-a-5") + p1.expectMsg("d-a-6") - p2.expectMsg("p-b-1") - p2.expectMsg("pa-b-2") - p2.expectMsg("d-b-3") - p2.expectMsg("d-b-4") - p2.expectMsg("pa-b-5") - p2.expectMsg("d-b-6") + p2.expectMsg("p-b-1") + p2.expectMsg("pa-b-2") + p2.expectMsg("d-b-3") + p2.expectMsg("d-b-4") + p2.expectMsg("pa-b-5") + p2.expectMsg("d-b-6") - expectNoMsg(100.millis) + expectNoMsg(100.millis) + } + + test(namedPersistentActor[DeferringAsyncMixedCallsPPADDPADPersistActor]) + test(namedPersistentActor[DeferringSyncMixedCallsPPADDPADPersistActor]) } "invoke deferred handlers right away, if there are no pending persist handlers registered" in { - val persistentActor = namedPersistentActor[DeferringWithNoPersistCallsPersistActor] - persistentActor ! Cmd("a") - expectMsg("d-1") - expectMsg("d-2") - expectMsg("d-3") - expectNoMsg(100.millis) + def test(actor: ActorRef): Unit = { + actor ! Cmd("a") + expectMsg("d-1") + expectMsg("d-2") + expectMsg("d-3") + expectNoMsg(100.millis) + } + + test(namedPersistentActor[DeferringAsyncWithNoPersistCallsPersistActor]) + test(namedPersistentActor[DeferringSyncWithNoPersistCallsPersistActor]) } "invoke deferred handlers, preserving the original sender references" in { - val persistentActor = namedPersistentActor[DeferringWithAsyncPersistActor] - val p1, p2 = TestProbe() + def test(actor: ActorRef): Unit = { + val p1, p2 = TestProbe() - persistentActor.tell(Cmd("a"), p1.ref) - persistentActor.tell(Cmd("b"), p2.ref) - p1.expectMsg("d-a-1") - p1.expectMsg("pa-a-2") - p1.expectMsg("d-a-3") - p1.expectMsg("d-a-4") + actor.tell(Cmd("a"), p1.ref) + actor.tell(Cmd("b"), p2.ref) + p1.expectMsg("d-a-1") + p1.expectMsg("pa-a-2") + p1.expectMsg("d-a-3") + p1.expectMsg("d-a-4") - p2.expectMsg("d-b-1") - p2.expectMsg("pa-b-2") - p2.expectMsg("d-b-3") - p2.expectMsg("d-b-4") - expectNoMsg(100.millis) + p2.expectMsg("d-b-1") + p2.expectMsg("pa-b-2") + p2.expectMsg("d-b-3") + p2.expectMsg("d-b-4") + expectNoMsg(100.millis) + } + + test(namedPersistentActor[DeferringAsyncWithAsyncPersistActor]) + test(namedPersistentActor[DeferringSyncWithAsyncPersistActor]) + } + "handle new messages before deferAsync handler is called" in { + val persistentActor = namedPersistentActor[DeferringAsyncActor] + persistentActor ! Cmd("x") + persistentActor ! Cmd("y") + expectMsg("x") + expectMsg("y") // "y" command was processed before event persisted + expectMsg("x-defer") + expectMsg("y-defer") + } + "handle defer sequentially" in { + val persistentActor = namedPersistentActor[DeferringSyncActor] + persistentActor ! Cmd("x") + persistentActor ! Cmd("y") + expectMsg("x") + expectMsg("x-defer") + expectMsg("y") + expectMsg("y-defer") } "receive RecoveryFinished if it is handled after all events have been replayed" in { val persistentActor1 = system.actorOf(Props(classOf[SnapshottingPersistentActor], name, testActor))