Merge pull request #24508 from psliwa/patch-1-defer-method
Adding defer method to PersistentActor
This commit is contained in:
commit
11f2ef5784
7 changed files with 230 additions and 75 deletions
|
|
@ -325,11 +325,12 @@ The callback will not be invoked if the actor is restarted (or stopped) in betwe
|
|||
### Deferring actions until preceding persist handlers have executed
|
||||
|
||||
Sometimes when working with `persistAsync` or `persist` you may find that it would be nice to define some actions in terms of
|
||||
''happens-after the previous `persistAsync`/`persist` handlers have been invoked''. `PersistentActor` provides an utility method
|
||||
called `deferAsync`, which works similarly to `persistAsync` yet does not persist the passed in event. It is recommended to
|
||||
use it for *read* operations, and actions which do not have corresponding events in your domain model.
|
||||
''happens-after the previous `persistAsync`/`persist` handlers have been invoked''. `PersistentActor` provides utility methods
|
||||
called `defer` and `deferAsync`, which work similarly to `persist` and `persistAsync` respectively yet do not persist the
|
||||
passed in event. It is recommended to use them for *read* operations, and actions which do not have corresponding events in your
|
||||
domain model.
|
||||
|
||||
Using this method is very similar to the persist family of methods, yet it does **not** persist the passed in event.
|
||||
Using those methods is very similar to the persist family of methods, yet they do **not** persist the passed in event.
|
||||
It will be kept in memory and used when invoking the handler.
|
||||
|
||||
Scala
|
||||
|
|
@ -339,7 +340,7 @@ Java
|
|||
: @@snip [LambdaPersistenceDocTest.java]($code$/java/jdocs/persistence/LambdaPersistenceDocTest.java) { #defer }
|
||||
|
||||
Notice that the `sender()` is **safe** to access in the handler callback, and will be pointing to the original sender
|
||||
of the command for which this `deferAsync` handler was called.
|
||||
of the command for which this `defer` or `deferAsync` handler was called.
|
||||
|
||||
The calling side will get the responses in this (guaranteed) order:
|
||||
|
||||
|
|
@ -349,7 +350,7 @@ Scala
|
|||
Java
|
||||
: @@snip [LambdaPersistenceDocTest.java]($code$/java/jdocs/persistence/LambdaPersistenceDocTest.java) { #defer-caller }
|
||||
|
||||
You can also call `deferAsync` with `persist`.
|
||||
You can also call `defer` or `deferAsync` with `persist`.
|
||||
|
||||
Scala
|
||||
: @@snip [PersistenceDocSpec.scala]($code$/scala/docs/persistence/PersistenceDocSpec.scala) { #defer-with-persist }
|
||||
|
|
@ -360,7 +361,7 @@ Java
|
|||
@@@ warning
|
||||
|
||||
The callback will not be invoked if the actor is restarted (or stopped) in between the call to
|
||||
`deferAsync` and the journal has processed and confirmed all preceding writes.
|
||||
`defer` or `deferAsync` and the journal has processed and confirmed all preceding writes.
|
||||
|
||||
@@@
|
||||
|
||||
|
|
|
|||
|
|
@ -439,7 +439,7 @@ public class LambdaPersistenceDocTest {
|
|||
sender().tell(e, self());
|
||||
});
|
||||
|
||||
deferAsync(String.format("evt-%s-3", c), e -> {
|
||||
defer(String.format("evt-%s-3", c), e -> {
|
||||
sender().tell(e, self());
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -295,7 +295,7 @@ object PersistenceDocSpec {
|
|||
sender() ! c
|
||||
persist(s"evt-$c-1") { e ⇒ sender() ! e }
|
||||
persist(s"evt-$c-2") { e ⇒ sender() ! e }
|
||||
deferAsync(s"evt-$c-3") { e ⇒ sender() ! e }
|
||||
defer(s"evt-$c-3") { e ⇒ sender() ! e }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,4 @@
|
|||
# #24508 Adding defer method to PersistentActor
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.PersistentActor.defer")
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.Eventsourced.internalDefer")
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.AbstractPersistentActorLike.defer")
|
||||
|
|
@ -375,6 +375,21 @@ private[persistence] trait Eventsourced extends Snapshotter with PersistenceStas
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal API
|
||||
*/
|
||||
@InternalApi
|
||||
final private[akka] def internalDefer[A](event: A)(handler: A ⇒ Unit): Unit = {
|
||||
if (recoveryRunning) throw new IllegalStateException("Cannot defer during replay. Events can be deferred when receiving RecoveryCompleted or later.")
|
||||
if (pendingInvocations.isEmpty) {
|
||||
handler(event)
|
||||
} else {
|
||||
pendingStashingPersistInvocations += 1
|
||||
pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
|
||||
eventBatch = NonPersistentRepr(event, sender()) :: eventBatch
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Permanently deletes all persistent messages with sequence numbers less than or equal `toSequenceNr`.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -264,6 +264,28 @@ trait PersistentActor extends Eventsourced with PersistenceIdentity {
|
|||
def deferAsync[A](event: A)(handler: A ⇒ Unit): Unit = {
|
||||
internalDeferAsync(event)(handler)
|
||||
}
|
||||
|
||||
/**
|
||||
* Defer the handler execution until all pending handlers have been executed. It is guaranteed that no new commands
|
||||
* will be received by a persistent actor between a call to `defer` and the execution of its `handler`.
|
||||
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
|
||||
* in respect to `persistAsync` or `persist` calls. That is, if `persistAsync` or `persist` was invoked before `defer`,
|
||||
* the corresponding handlers will be invoked in the same order as they were registered in.
|
||||
*
|
||||
* This call will NOT result in `event` being persisted, use `persist` or `persistAsync` instead
|
||||
* if the given event should possible to replay.
|
||||
*
|
||||
* If there are no pending persist handler calls, the handler will be called immediately.
|
||||
*
|
||||
* If persistence of an earlier event fails, the persistent actor will stop, and the `handler`
|
||||
* will not be run.
|
||||
*
|
||||
* @param event event to be handled in the future, when preceding persist operations have been processes
|
||||
* @param handler handler for the given `event`
|
||||
*/
|
||||
def defer[A](event: A)(handler: A ⇒ Unit): Unit = {
|
||||
internalDefer(event)(handler)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -378,6 +400,28 @@ abstract class UntypedPersistentActor extends UntypedActor with Eventsourced wit
|
|||
def deferAsync[A](event: A)(handler: Procedure[A]): Unit =
|
||||
internalDeferAsync(event)(event ⇒ handler(event))
|
||||
|
||||
/**
|
||||
* Defer the handler execution until all pending handlers have been executed. It is guaranteed that no new commands
|
||||
* will be received by a persistent actor between a call to `defer` and the execution of its `handler`.
|
||||
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
|
||||
* in respect to `persistAsync` or `persist` calls. That is, if `persistAsync` or `persist` was invoked before `defer`,
|
||||
* the corresponding handlers will be invoked in the same order as they were registered in.
|
||||
*
|
||||
* This call will NOT result in `event` being persisted, use `persist` or `persistAsync` instead
|
||||
* if the given event should possible to replay.
|
||||
*
|
||||
* If there are no pending persist handler calls, the handler will be called immediately.
|
||||
*
|
||||
* If persistence of an earlier event fails, the persistent actor will stop, and the `handler`
|
||||
* will not be run.
|
||||
*
|
||||
* @param event event to be handled in the future, when preceding persist operations have been processes
|
||||
* @param handler handler for the given `event`
|
||||
*/
|
||||
def defer[A](event: A)(handler: Procedure[A]): Unit = {
|
||||
internalDefer(event)(event ⇒ handler(event))
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API: recovery handler that receives persisted events during recovery. If a state snapshot
|
||||
* has been captured and saved, this handler will receive a [[SnapshotOffer]] message
|
||||
|
|
@ -562,6 +606,28 @@ abstract class AbstractPersistentActor extends AbstractActor with AbstractPersis
|
|||
def deferAsync[A](event: A)(handler: Procedure[A]): Unit =
|
||||
internalDeferAsync(event)(event ⇒ handler(event))
|
||||
|
||||
/**
|
||||
* Defer the handler execution until all pending handlers have been executed. It is guaranteed that no new commands
|
||||
* will be received by a persistent actor between a call to `defer` and the execution of its `handler`.
|
||||
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
|
||||
* in respect to `persistAsync` or `persist` calls. That is, if `persistAsync` or `persist` was invoked before `defer`,
|
||||
* the corresponding handlers will be invoked in the same order as they were registered in.
|
||||
*
|
||||
* This call will NOT result in `event` being persisted, use `persist` or `persistAsync` instead
|
||||
* if the given event should possible to replay.
|
||||
*
|
||||
* If there are no pending persist handler calls, the handler will be called immediately.
|
||||
*
|
||||
* If persistence of an earlier event fails, the persistent actor will stop, and the `handler`
|
||||
* will not be run.
|
||||
*
|
||||
* @param event event to be handled in the future, when preceding persist operations have been processes
|
||||
* @param handler handler for the given `event`
|
||||
*/
|
||||
def defer[A](event: A)(handler: Procedure[A]): Unit = {
|
||||
internalDefer(event)(event ⇒ handler(event))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -359,43 +359,74 @@ object PersistentActorSpec {
|
|||
}
|
||||
|
||||
}
|
||||
class DeferringWithPersistActor(name: String) extends ExamplePersistentActor(name) {
|
||||
trait DeferActor extends PersistentActor {
|
||||
def doDefer[A](event: A)(handler: A ⇒ Unit): Unit
|
||||
}
|
||||
trait DeferSync {
|
||||
this: PersistentActor ⇒
|
||||
def doDefer[A](event: A)(handler: A ⇒ Unit): Unit = defer(event)(handler)
|
||||
}
|
||||
trait DeferAsync {
|
||||
this: PersistentActor ⇒
|
||||
def doDefer[A](event: A)(handler: A ⇒ Unit): Unit = deferAsync(event)(handler)
|
||||
}
|
||||
abstract class DeferringWithPersistActor(name: String) extends ExamplePersistentActor(name) with DeferActor {
|
||||
val receiveCommand: Receive = {
|
||||
case Cmd(data) ⇒
|
||||
deferAsync("d-1") { sender() ! _ }
|
||||
doDefer("d-1") { sender() ! _ }
|
||||
persist(s"$data-2") { sender() ! _ }
|
||||
deferAsync("d-3") { sender() ! _ }
|
||||
deferAsync("d-4") { sender() ! _ }
|
||||
doDefer("d-3") { sender() ! _ }
|
||||
doDefer("d-4") { sender() ! _ }
|
||||
}
|
||||
}
|
||||
class DeferringWithAsyncPersistActor(name: String) extends ExamplePersistentActor(name) {
|
||||
class DeferringAsyncWithPersistActor(name: String) extends DeferringWithPersistActor(name) with DeferAsync
|
||||
class DeferringSyncWithPersistActor(name: String) extends DeferringWithPersistActor(name) with DeferSync
|
||||
abstract class DeferringWithAsyncPersistActor(name: String) extends ExamplePersistentActor(name) with DeferActor {
|
||||
val receiveCommand: Receive = {
|
||||
case Cmd(data) ⇒
|
||||
deferAsync(s"d-$data-1") { sender() ! _ }
|
||||
doDefer(s"d-$data-1") { sender() ! _ }
|
||||
persistAsync(s"pa-$data-2") { sender() ! _ }
|
||||
deferAsync(s"d-$data-3") { sender() ! _ }
|
||||
deferAsync(s"d-$data-4") { sender() ! _ }
|
||||
doDefer(s"d-$data-3") { sender() ! _ }
|
||||
doDefer(s"d-$data-4") { sender() ! _ }
|
||||
}
|
||||
}
|
||||
class DeferringMixedCallsPPADDPADPersistActor(name: String) extends ExamplePersistentActor(name) {
|
||||
class DeferringAsyncWithAsyncPersistActor(name: String) extends DeferringWithAsyncPersistActor(name) with DeferAsync
|
||||
class DeferringSyncWithAsyncPersistActor(name: String) extends DeferringWithAsyncPersistActor(name) with DeferSync
|
||||
abstract class DeferringMixedCallsPPADDPADPersistActor(name: String) extends ExamplePersistentActor(name) with DeferActor {
|
||||
val receiveCommand: Receive = {
|
||||
case Cmd(data) ⇒
|
||||
persist(s"p-$data-1") { sender() ! _ }
|
||||
persistAsync(s"pa-$data-2") { sender() ! _ }
|
||||
deferAsync(s"d-$data-3") { sender() ! _ }
|
||||
deferAsync(s"d-$data-4") { sender() ! _ }
|
||||
doDefer(s"d-$data-3") { sender() ! _ }
|
||||
doDefer(s"d-$data-4") { sender() ! _ }
|
||||
persistAsync(s"pa-$data-5") { sender() ! _ }
|
||||
deferAsync(s"d-$data-6") { sender() ! _ }
|
||||
doDefer(s"d-$data-6") { sender() ! _ }
|
||||
}
|
||||
}
|
||||
class DeferringWithNoPersistCallsPersistActor(name: String) extends ExamplePersistentActor(name) {
|
||||
class DeferringAsyncMixedCallsPPADDPADPersistActor(name: String) extends DeferringMixedCallsPPADDPADPersistActor(name) with DeferAsync
|
||||
class DeferringSyncMixedCallsPPADDPADPersistActor(name: String) extends DeferringMixedCallsPPADDPADPersistActor(name) with DeferSync
|
||||
abstract class DeferringWithNoPersistCallsPersistActor(name: String) extends ExamplePersistentActor(name) with DeferActor {
|
||||
val receiveCommand: Receive = {
|
||||
case Cmd(_) ⇒
|
||||
doDefer("d-1") { sender() ! _ }
|
||||
doDefer("d-2") { sender() ! _ }
|
||||
doDefer("d-3") { sender() ! _ }
|
||||
}
|
||||
}
|
||||
class DeferringAsyncWithNoPersistCallsPersistActor(name: String) extends DeferringWithNoPersistCallsPersistActor(name) with DeferAsync
|
||||
class DeferringSyncWithNoPersistCallsPersistActor(name: String) extends DeferringWithNoPersistCallsPersistActor(name) with DeferSync
|
||||
abstract class DeferringActor(name: String) extends ExamplePersistentActor(name) with DeferActor {
|
||||
val receiveCommand: Receive = {
|
||||
case Cmd(data) ⇒
|
||||
deferAsync("d-1") { sender() ! _ }
|
||||
deferAsync("d-2") { sender() ! _ }
|
||||
deferAsync("d-3") { sender() ! _ }
|
||||
sender() ! data
|
||||
persist(()) { _ ⇒ } // skip calling defer immediately because of empty pending invocations
|
||||
doDefer(Evt(s"$data-defer")) { evt ⇒
|
||||
sender() ! evt.data
|
||||
}
|
||||
}
|
||||
}
|
||||
class DeferringAsyncActor(name: String) extends DeferringActor(name) with DeferAsync
|
||||
class DeferringSyncActor(name: String) extends DeferringActor(name) with DeferSync
|
||||
|
||||
class StressOrdering(name: String) extends ExamplePersistentActor(name) {
|
||||
val receiveCommand: Receive = {
|
||||
|
|
@ -936,69 +967,107 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi
|
|||
expectMsg(5.seconds, "done")
|
||||
}
|
||||
"allow deferring handlers in order to provide ordered processing in respect to persist handlers" in {
|
||||
val persistentActor = namedPersistentActor[DeferringWithPersistActor]
|
||||
persistentActor ! Cmd("a")
|
||||
expectMsg("d-1")
|
||||
expectMsg("a-2")
|
||||
expectMsg("d-3")
|
||||
expectMsg("d-4")
|
||||
expectNoMsg(100.millis)
|
||||
def test(actor: ActorRef): Unit = {
|
||||
actor ! Cmd("a")
|
||||
expectMsg("d-1")
|
||||
expectMsg("a-2")
|
||||
expectMsg("d-3")
|
||||
expectMsg("d-4")
|
||||
expectNoMsg(100.millis)
|
||||
}
|
||||
|
||||
test(namedPersistentActor[DeferringAsyncWithPersistActor])
|
||||
test(namedPersistentActor[DeferringSyncWithPersistActor])
|
||||
}
|
||||
"allow deferring handlers in order to provide ordered processing in respect to asyncPersist handlers" in {
|
||||
val persistentActor = namedPersistentActor[DeferringWithAsyncPersistActor]
|
||||
persistentActor ! Cmd("a")
|
||||
expectMsg("d-a-1")
|
||||
expectMsg("pa-a-2")
|
||||
expectMsg("d-a-3")
|
||||
expectMsg("d-a-4")
|
||||
expectNoMsg(100.millis)
|
||||
def test(actor: ActorRef): Unit = {
|
||||
actor ! Cmd("a")
|
||||
expectMsg("d-a-1")
|
||||
expectMsg("pa-a-2")
|
||||
expectMsg("d-a-3")
|
||||
expectMsg("d-a-4")
|
||||
expectNoMsg(100.millis)
|
||||
}
|
||||
|
||||
test(namedPersistentActor[DeferringAsyncWithAsyncPersistActor])
|
||||
test(namedPersistentActor[DeferringSyncWithAsyncPersistActor])
|
||||
}
|
||||
"invoke deferred handlers, in presence of mixed a long series persist / persistAsync calls" in {
|
||||
val persistentActor = namedPersistentActor[DeferringMixedCallsPPADDPADPersistActor]
|
||||
val p1, p2 = TestProbe()
|
||||
def test(actor: ActorRef): Unit = {
|
||||
val p1, p2 = TestProbe()
|
||||
|
||||
persistentActor.tell(Cmd("a"), p1.ref)
|
||||
persistentActor.tell(Cmd("b"), p2.ref)
|
||||
p1.expectMsg("p-a-1")
|
||||
p1.expectMsg("pa-a-2")
|
||||
p1.expectMsg("d-a-3")
|
||||
p1.expectMsg("d-a-4")
|
||||
p1.expectMsg("pa-a-5")
|
||||
p1.expectMsg("d-a-6")
|
||||
actor.tell(Cmd("a"), p1.ref)
|
||||
actor.tell(Cmd("b"), p2.ref)
|
||||
p1.expectMsg("p-a-1")
|
||||
p1.expectMsg("pa-a-2")
|
||||
p1.expectMsg("d-a-3")
|
||||
p1.expectMsg("d-a-4")
|
||||
p1.expectMsg("pa-a-5")
|
||||
p1.expectMsg("d-a-6")
|
||||
|
||||
p2.expectMsg("p-b-1")
|
||||
p2.expectMsg("pa-b-2")
|
||||
p2.expectMsg("d-b-3")
|
||||
p2.expectMsg("d-b-4")
|
||||
p2.expectMsg("pa-b-5")
|
||||
p2.expectMsg("d-b-6")
|
||||
p2.expectMsg("p-b-1")
|
||||
p2.expectMsg("pa-b-2")
|
||||
p2.expectMsg("d-b-3")
|
||||
p2.expectMsg("d-b-4")
|
||||
p2.expectMsg("pa-b-5")
|
||||
p2.expectMsg("d-b-6")
|
||||
|
||||
expectNoMsg(100.millis)
|
||||
expectNoMsg(100.millis)
|
||||
}
|
||||
|
||||
test(namedPersistentActor[DeferringAsyncMixedCallsPPADDPADPersistActor])
|
||||
test(namedPersistentActor[DeferringSyncMixedCallsPPADDPADPersistActor])
|
||||
}
|
||||
"invoke deferred handlers right away, if there are no pending persist handlers registered" in {
|
||||
val persistentActor = namedPersistentActor[DeferringWithNoPersistCallsPersistActor]
|
||||
persistentActor ! Cmd("a")
|
||||
expectMsg("d-1")
|
||||
expectMsg("d-2")
|
||||
expectMsg("d-3")
|
||||
expectNoMsg(100.millis)
|
||||
def test(actor: ActorRef): Unit = {
|
||||
actor ! Cmd("a")
|
||||
expectMsg("d-1")
|
||||
expectMsg("d-2")
|
||||
expectMsg("d-3")
|
||||
expectNoMsg(100.millis)
|
||||
}
|
||||
|
||||
test(namedPersistentActor[DeferringAsyncWithNoPersistCallsPersistActor])
|
||||
test(namedPersistentActor[DeferringSyncWithNoPersistCallsPersistActor])
|
||||
}
|
||||
"invoke deferred handlers, preserving the original sender references" in {
|
||||
val persistentActor = namedPersistentActor[DeferringWithAsyncPersistActor]
|
||||
val p1, p2 = TestProbe()
|
||||
def test(actor: ActorRef): Unit = {
|
||||
val p1, p2 = TestProbe()
|
||||
|
||||
persistentActor.tell(Cmd("a"), p1.ref)
|
||||
persistentActor.tell(Cmd("b"), p2.ref)
|
||||
p1.expectMsg("d-a-1")
|
||||
p1.expectMsg("pa-a-2")
|
||||
p1.expectMsg("d-a-3")
|
||||
p1.expectMsg("d-a-4")
|
||||
actor.tell(Cmd("a"), p1.ref)
|
||||
actor.tell(Cmd("b"), p2.ref)
|
||||
p1.expectMsg("d-a-1")
|
||||
p1.expectMsg("pa-a-2")
|
||||
p1.expectMsg("d-a-3")
|
||||
p1.expectMsg("d-a-4")
|
||||
|
||||
p2.expectMsg("d-b-1")
|
||||
p2.expectMsg("pa-b-2")
|
||||
p2.expectMsg("d-b-3")
|
||||
p2.expectMsg("d-b-4")
|
||||
expectNoMsg(100.millis)
|
||||
p2.expectMsg("d-b-1")
|
||||
p2.expectMsg("pa-b-2")
|
||||
p2.expectMsg("d-b-3")
|
||||
p2.expectMsg("d-b-4")
|
||||
expectNoMsg(100.millis)
|
||||
}
|
||||
|
||||
test(namedPersistentActor[DeferringAsyncWithAsyncPersistActor])
|
||||
test(namedPersistentActor[DeferringSyncWithAsyncPersistActor])
|
||||
}
|
||||
"handle new messages before deferAsync handler is called" in {
|
||||
val persistentActor = namedPersistentActor[DeferringAsyncActor]
|
||||
persistentActor ! Cmd("x")
|
||||
persistentActor ! Cmd("y")
|
||||
expectMsg("x")
|
||||
expectMsg("y") // "y" command was processed before event persisted
|
||||
expectMsg("x-defer")
|
||||
expectMsg("y-defer")
|
||||
}
|
||||
"handle defer sequentially" in {
|
||||
val persistentActor = namedPersistentActor[DeferringSyncActor]
|
||||
persistentActor ! Cmd("x")
|
||||
persistentActor ! Cmd("y")
|
||||
expectMsg("x")
|
||||
expectMsg("x-defer")
|
||||
expectMsg("y")
|
||||
expectMsg("y-defer")
|
||||
}
|
||||
"receive RecoveryFinished if it is handled after all events have been replayed" in {
|
||||
val persistentActor1 = system.actorOf(Props(classOf[SnapshottingPersistentActor], name, testActor))
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue