!per #3828 Rename receiveReplay to receiveRecover
- because it handles messages related to recovery (snapshots and replayed messages)
This commit is contained in:
parent
34f9f4cfb9
commit
e0f5cf5f2c
13 changed files with 30 additions and 30 deletions
|
|
@ -1141,7 +1141,7 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite
|
|||
rebalanceTask.cancel()
|
||||
}
|
||||
|
||||
override def receiveReplay: Receive = {
|
||||
override def receiveRecover: Receive = {
|
||||
case evt: DomainEvent ⇒ evt match {
|
||||
case ShardRegionRegistered(region) ⇒
|
||||
context.watch(region)
|
||||
|
|
|
|||
|
|
@ -91,7 +91,7 @@ object ClusterShardingSpec extends MultiNodeConfig {
|
|||
def updateState(event: CounterChanged): Unit =
|
||||
count += event.delta
|
||||
|
||||
override def receiveReplay: Receive = {
|
||||
override def receiveRecover: Receive = {
|
||||
case evt: CounterChanged ⇒ updateState(evt)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -124,7 +124,7 @@ public class ClusterShardingTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onReceiveReplay(Object msg) {
|
||||
public void onReceiveRecover(Object msg) {
|
||||
if (msg instanceof CounterChanged)
|
||||
updateState((CounterChanged) msg);
|
||||
else
|
||||
|
|
|
|||
|
|
@ -336,7 +336,7 @@ public class PersistenceDocTest {
|
|||
event, getCurrentPersistentMessage()), destination.path()), getSelf());
|
||||
}
|
||||
|
||||
public void onReceiveReplay(Object msg) {
|
||||
public void onReceiveRecover(Object msg) {
|
||||
if (msg instanceof String) {
|
||||
handleEvent((String)msg);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -434,7 +434,7 @@ also process commands that do not change application state, such as query comman
|
|||
Akka persistence supports event sourcing with the abstract ``UntypedEventsourcedProcessor`` class (which implements
|
||||
event sourcing as a pattern on top of command sourcing). A processor that extends this abstract class does not handle
|
||||
``Persistent`` messages directly but uses the ``persist`` method to persist and handle events. The behavior of an
|
||||
``UntypedEventsourcedProcessor`` is defined by implementing ``onReceiveReplay`` and ``onReceiveCommand``. This is
|
||||
``UntypedEventsourcedProcessor`` is defined by implementing ``onReceiveRecover`` and ``onReceiveCommand``. This is
|
||||
demonstrated in the following example.
|
||||
|
||||
.. includecode:: ../../../akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/EventsourcedExample.java#eventsourced-example
|
||||
|
|
@ -442,7 +442,7 @@ demonstrated in the following example.
|
|||
The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The
|
||||
``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``.
|
||||
|
||||
The processor's ``onReceiveReplay`` method defines how ``state`` is updated during recovery by handling ``Evt``
|
||||
The processor's ``onReceiveRecover`` method defines how ``state`` is updated during recovery by handling ``Evt``
|
||||
and ``SnapshotOffer`` messages. The processor's ``onReceiveCommand`` method is a command handler. In this example,
|
||||
a command is handled by generating two events which are then persisted and handled. Events are persisted by calling
|
||||
``persist`` with an event (or a sequence of events) as first argument and an event handler as second argument.
|
||||
|
|
@ -465,7 +465,7 @@ Reliable event delivery
|
|||
-----------------------
|
||||
|
||||
Sending events from an event handler to another actor has at-most-once delivery semantics. For at-least-once delivery,
|
||||
:ref:`channels-java` must be used. In this case, also replayed events (received by ``receiveReplay``) must be sent to a
|
||||
:ref:`channels-java` must be used. In this case, also replayed events (received by ``receiveRecover``) must be sent to a
|
||||
channel, as shown in the following example:
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#reliable-event-delivery
|
||||
|
|
|
|||
|
|
@ -295,7 +295,7 @@ trait PersistenceDocSpec {
|
|||
channel ! Deliver(Persistent(event), destination.path)
|
||||
}
|
||||
|
||||
def receiveReplay: Receive = {
|
||||
def receiveRecover: Receive = {
|
||||
case event: String => handleEvent(event)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -445,14 +445,14 @@ also process commands that do not change application state, such as query comman
|
|||
Akka persistence supports event sourcing with the ``EventsourcedProcessor`` trait (which implements event sourcing
|
||||
as a pattern on top of command sourcing). A processor that extends this trait does not handle ``Persistent`` messages
|
||||
directly but uses the ``persist`` method to persist and handle events. The behavior of an ``EventsourcedProcessor``
|
||||
is defined by implementing ``receiveReplay`` and ``receiveCommand``. This is demonstrated in the following example.
|
||||
is defined by implementing ``receiveRecover`` and ``receiveCommand``. This is demonstrated in the following example.
|
||||
|
||||
.. includecode:: ../../../akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/EventsourcedExample.scala#eventsourced-example
|
||||
|
||||
The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The
|
||||
``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``.
|
||||
|
||||
The processor's ``receiveReplay`` method defines how ``state`` is updated during recovery by handling ``Evt``
|
||||
The processor's ``receiveRecover`` method defines how ``state`` is updated during recovery by handling ``Evt``
|
||||
and ``SnapshotOffer`` messages. The processor's ``receiveCommand`` method is a command handler. In this example,
|
||||
a command is handled by generating two events which are then persisted and handled. Events are persisted by calling
|
||||
``persist`` with an event (or a sequence of events) as first argument and an event handler as second argument.
|
||||
|
|
@ -475,7 +475,7 @@ Reliable event delivery
|
|||
-----------------------
|
||||
|
||||
Sending events from an event handler to another actor has at-most-once delivery semantics. For at-least-once delivery,
|
||||
:ref:`channels` must be used. In this case, also replayed events (received by ``receiveReplay``) must be sent to a
|
||||
:ref:`channels` must be used. In this case, also replayed events (received by ``receiveRecover``) must be sent to a
|
||||
channel, as shown in the following example:
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#reliable-event-delivery
|
||||
|
|
|
|||
|
|
@ -137,7 +137,7 @@ private[persistence] trait Eventsourced extends Processor {
|
|||
events.foreach(persist(_)(handler))
|
||||
|
||||
/**
|
||||
* Replay handler that receives persisted events during recovery. If a state snapshot
|
||||
* Recovery handler that receives persisted events during recovery. If a state snapshot
|
||||
* has been captured and saved, this handler will receive a [[SnapshotOffer]] message
|
||||
* followed by events that are younger than the offered snapshot.
|
||||
*
|
||||
|
|
@ -147,7 +147,7 @@ private[persistence] trait Eventsourced extends Processor {
|
|||
*
|
||||
* @see [[Recover]]
|
||||
*/
|
||||
def receiveReplay: Receive
|
||||
def receiveRecover: Receive
|
||||
|
||||
/**
|
||||
* Command handler. Typically validates commands against current state (and/or by
|
||||
|
|
@ -191,12 +191,12 @@ private[persistence] trait Eventsourced extends Processor {
|
|||
* INTERNAL API.
|
||||
*/
|
||||
protected[persistence] val initialBehavior: Receive = {
|
||||
case Persistent(payload, _) if receiveReplay.isDefinedAt(payload) && recoveryRunning ⇒
|
||||
receiveReplay(payload)
|
||||
case s: SnapshotOffer if receiveReplay.isDefinedAt(s) ⇒
|
||||
receiveReplay(s)
|
||||
case f: RecoveryFailure if receiveReplay.isDefinedAt(f) ⇒
|
||||
receiveReplay(f)
|
||||
case Persistent(payload, _) if receiveRecover.isDefinedAt(payload) && recoveryRunning ⇒
|
||||
receiveRecover(payload)
|
||||
case s: SnapshotOffer if receiveRecover.isDefinedAt(s) ⇒
|
||||
receiveRecover(s)
|
||||
case f: RecoveryFailure if receiveRecover.isDefinedAt(f) ⇒
|
||||
receiveRecover(f)
|
||||
case msg if receiveCommand.isDefinedAt(msg) ⇒
|
||||
receiveCommand(msg)
|
||||
}
|
||||
|
|
@ -215,8 +215,8 @@ trait EventsourcedProcessor extends Processor with Eventsourced {
|
|||
abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Eventsourced {
|
||||
final def onReceive(message: Any) = initialBehavior(message)
|
||||
|
||||
final def receiveReplay: Receive = {
|
||||
case msg ⇒ onReceiveReplay(msg)
|
||||
final def receiveRecover: Receive = {
|
||||
case msg ⇒ onReceiveRecover(msg)
|
||||
}
|
||||
|
||||
final def receiveCommand: Receive = {
|
||||
|
|
@ -260,7 +260,7 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events
|
|||
persist(Util.immutableSeq(events))(event ⇒ handler(event))
|
||||
|
||||
/**
|
||||
* Java API: replay handler that receives persisted events during recovery. If a state snapshot
|
||||
* Java API: recovery handler that receives persisted events during recovery. If a state snapshot
|
||||
* has been captured and saved, this handler will receive a [[SnapshotOffer]] message
|
||||
* followed by events that are younger than the offered snapshot.
|
||||
*
|
||||
|
|
@ -270,7 +270,7 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events
|
|||
*
|
||||
* @see [[Recover]]
|
||||
*/
|
||||
def onReceiveReplay(msg: Any): Unit
|
||||
def onReceiveRecover(msg: Any): Unit
|
||||
|
||||
/**
|
||||
* Java API: command handler. Typically validates commands against current state (and/or by
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ object EventsourcedSpec {
|
|||
case GetState ⇒ sender ! events.reverse
|
||||
}
|
||||
|
||||
def receiveReplay = updateState
|
||||
def receiveRecover = updateState
|
||||
}
|
||||
|
||||
class Behavior1Processor(name: String) extends ExampleProcessor(name) {
|
||||
|
|
@ -123,7 +123,7 @@ object EventsourcedSpec {
|
|||
}
|
||||
|
||||
class SnapshottingEventsourcedProcessor(name: String, probe: ActorRef) extends ExampleProcessor(name) {
|
||||
override def receiveReplay = super.receiveReplay orElse {
|
||||
override def receiveRecover = super.receiveRecover orElse {
|
||||
case SnapshotOffer(_, events: List[_]) ⇒
|
||||
probe ! "offered"
|
||||
this.events = events
|
||||
|
|
|
|||
|
|
@ -86,7 +86,7 @@ object PerformanceSpec {
|
|||
}
|
||||
|
||||
class EventsourcedTestProcessor(name: String) extends PerformanceTestProcessor(name) with EventsourcedProcessor {
|
||||
val receiveReplay: Receive = {
|
||||
val receiveRecover: Receive = {
|
||||
case _ ⇒ if (lastSequenceNr % 1000 == 0) print("r")
|
||||
}
|
||||
|
||||
|
|
@ -99,7 +99,7 @@ object PerformanceSpec {
|
|||
}
|
||||
|
||||
class StashingEventsourcedTestProcessor(name: String) extends PerformanceTestProcessor(name) with EventsourcedProcessor {
|
||||
val receiveReplay: Receive = {
|
||||
val receiveRecover: Receive = {
|
||||
case _ ⇒ if (lastSequenceNr % 1000 == 0) print("r")
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ object ProcessorChannelSpec {
|
|||
channel ! Deliver(Persistent(event), destination.path)
|
||||
}
|
||||
|
||||
def receiveReplay: Receive = {
|
||||
def receiveRecover: Receive = {
|
||||
case event: String ⇒ handleEvent(event)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -70,7 +70,7 @@ class ExampleProcessor extends UntypedEventsourcedProcessor {
|
|||
return state.size();
|
||||
}
|
||||
|
||||
public void onReceiveReplay(Object msg) {
|
||||
public void onReceiveRecover(Object msg) {
|
||||
if (msg instanceof Evt) {
|
||||
state.update((Evt) msg);
|
||||
} else if (msg instanceof SnapshotOffer) {
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ class ExampleProcessor extends EventsourcedProcessor {
|
|||
def numEvents =
|
||||
state.size
|
||||
|
||||
val receiveReplay: Receive = {
|
||||
val receiveRecover: Receive = {
|
||||
case evt: Evt => updateState(evt)
|
||||
case SnapshotOffer(_, snapshot: ExampleState) => state = snapshot
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue