diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala index 883c05504c..e8618be5dc 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala @@ -1141,7 +1141,7 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite rebalanceTask.cancel() } - override def receiveReplay: Receive = { + override def receiveRecover: Receive = { case evt: DomainEvent ⇒ evt match { case ShardRegionRegistered(region) ⇒ context.watch(region) diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala index bc6c541846..64a8ea9711 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala @@ -91,7 +91,7 @@ object ClusterShardingSpec extends MultiNodeConfig { def updateState(event: CounterChanged): Unit = count += event.delta - override def receiveReplay: Receive = { + override def receiveRecover: Receive = { case evt: CounterChanged ⇒ updateState(evt) } diff --git a/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java b/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java index e7df11dc98..0e7f874e80 100644 --- a/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java +++ b/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java @@ -124,7 +124,7 @@ public class ClusterShardingTest { } @Override - public void onReceiveReplay(Object msg) { + public void onReceiveRecover(Object msg) { if (msg instanceof CounterChanged) updateState((CounterChanged) msg); else diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index 7542a86351..636f551285 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -336,7 +336,7 @@ public class PersistenceDocTest { event, getCurrentPersistentMessage()), destination.path()), getSelf()); } - public void onReceiveReplay(Object msg) { + public void onReceiveRecover(Object msg) { if (msg instanceof String) { handleEvent((String)msg); } diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 900d743362..b16048ab02 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -434,7 +434,7 @@ also process commands that do not change application state, such as query comman Akka persistence supports event sourcing with the abstract ``UntypedEventsourcedProcessor`` class (which implements event sourcing as a pattern on top of command sourcing). A processor that extends this abstract class does not handle ``Persistent`` messages directly but uses the ``persist`` method to persist and handle events. The behavior of an -``UntypedEventsourcedProcessor`` is defined by implementing ``onReceiveReplay`` and ``onReceiveCommand``. This is +``UntypedEventsourcedProcessor`` is defined by implementing ``onReceiveRecover`` and ``onReceiveCommand``. This is demonstrated in the following example. .. includecode:: ../../../akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/EventsourcedExample.java#eventsourced-example @@ -442,7 +442,7 @@ demonstrated in the following example. The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The ``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``. -The processor's ``onReceiveReplay`` method defines how ``state`` is updated during recovery by handling ``Evt`` +The processor's ``onReceiveRecover`` method defines how ``state`` is updated during recovery by handling ``Evt`` and ``SnapshotOffer`` messages. The processor's ``onReceiveCommand`` method is a command handler. In this example, a command is handled by generating two events which are then persisted and handled. Events are persisted by calling ``persist`` with an event (or a sequence of events) as first argument and an event handler as second argument. @@ -465,7 +465,7 @@ Reliable event delivery ----------------------- Sending events from an event handler to another actor has at-most-once delivery semantics. For at-least-once delivery, -:ref:`channels-java` must be used. In this case, also replayed events (received by ``receiveReplay``) must be sent to a +:ref:`channels-java` must be used. In this case, also replayed events (received by ``receiveRecover``) must be sent to a channel, as shown in the following example: .. includecode:: code/docs/persistence/PersistenceDocTest.java#reliable-event-delivery diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index 029f54591b..f96450c619 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -295,7 +295,7 @@ trait PersistenceDocSpec { channel ! Deliver(Persistent(event), destination.path) } - def receiveReplay: Receive = { + def receiveRecover: Receive = { case event: String => handleEvent(event) } diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index b2ff70d781..3260d50fea 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -445,14 +445,14 @@ also process commands that do not change application state, such as query comman Akka persistence supports event sourcing with the ``EventsourcedProcessor`` trait (which implements event sourcing as a pattern on top of command sourcing). A processor that extends this trait does not handle ``Persistent`` messages directly but uses the ``persist`` method to persist and handle events. The behavior of an ``EventsourcedProcessor`` -is defined by implementing ``receiveReplay`` and ``receiveCommand``. This is demonstrated in the following example. +is defined by implementing ``receiveRecover`` and ``receiveCommand``. This is demonstrated in the following example. .. includecode:: ../../../akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/EventsourcedExample.scala#eventsourced-example The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The ``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``. -The processor's ``receiveReplay`` method defines how ``state`` is updated during recovery by handling ``Evt`` +The processor's ``receiveRecover`` method defines how ``state`` is updated during recovery by handling ``Evt`` and ``SnapshotOffer`` messages. The processor's ``receiveCommand`` method is a command handler. In this example, a command is handled by generating two events which are then persisted and handled. Events are persisted by calling ``persist`` with an event (or a sequence of events) as first argument and an event handler as second argument. @@ -475,7 +475,7 @@ Reliable event delivery ----------------------- Sending events from an event handler to another actor has at-most-once delivery semantics. For at-least-once delivery, -:ref:`channels` must be used. In this case, also replayed events (received by ``receiveReplay``) must be sent to a +:ref:`channels` must be used. In this case, also replayed events (received by ``receiveRecover``) must be sent to a channel, as shown in the following example: .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#reliable-event-delivery diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index bdb08dcec5..1d3fdb591c 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -137,7 +137,7 @@ private[persistence] trait Eventsourced extends Processor { events.foreach(persist(_)(handler)) /** - * Replay handler that receives persisted events during recovery. If a state snapshot + * Recovery handler that receives persisted events during recovery. If a state snapshot * has been captured and saved, this handler will receive a [[SnapshotOffer]] message * followed by events that are younger than the offered snapshot. * @@ -147,7 +147,7 @@ private[persistence] trait Eventsourced extends Processor { * * @see [[Recover]] */ - def receiveReplay: Receive + def receiveRecover: Receive /** * Command handler. Typically validates commands against current state (and/or by @@ -191,12 +191,12 @@ private[persistence] trait Eventsourced extends Processor { * INTERNAL API. */ protected[persistence] val initialBehavior: Receive = { - case Persistent(payload, _) if receiveReplay.isDefinedAt(payload) && recoveryRunning ⇒ - receiveReplay(payload) - case s: SnapshotOffer if receiveReplay.isDefinedAt(s) ⇒ - receiveReplay(s) - case f: RecoveryFailure if receiveReplay.isDefinedAt(f) ⇒ - receiveReplay(f) + case Persistent(payload, _) if receiveRecover.isDefinedAt(payload) && recoveryRunning ⇒ + receiveRecover(payload) + case s: SnapshotOffer if receiveRecover.isDefinedAt(s) ⇒ + receiveRecover(s) + case f: RecoveryFailure if receiveRecover.isDefinedAt(f) ⇒ + receiveRecover(f) case msg if receiveCommand.isDefinedAt(msg) ⇒ receiveCommand(msg) } @@ -215,8 +215,8 @@ trait EventsourcedProcessor extends Processor with Eventsourced { abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Eventsourced { final def onReceive(message: Any) = initialBehavior(message) - final def receiveReplay: Receive = { - case msg ⇒ onReceiveReplay(msg) + final def receiveRecover: Receive = { + case msg ⇒ onReceiveRecover(msg) } final def receiveCommand: Receive = { @@ -260,7 +260,7 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events persist(Util.immutableSeq(events))(event ⇒ handler(event)) /** - * Java API: replay handler that receives persisted events during recovery. If a state snapshot + * Java API: recovery handler that receives persisted events during recovery. If a state snapshot * has been captured and saved, this handler will receive a [[SnapshotOffer]] message * followed by events that are younger than the offered snapshot. * @@ -270,7 +270,7 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events * * @see [[Recover]] */ - def onReceiveReplay(msg: Any): Unit + def onReceiveRecover(msg: Any): Unit /** * Java API: command handler. Typically validates commands against current state (and/or by diff --git a/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala b/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala index 74dcbd637a..68fd7de503 100644 --- a/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala @@ -28,7 +28,7 @@ object EventsourcedSpec { case GetState ⇒ sender ! events.reverse } - def receiveReplay = updateState + def receiveRecover = updateState } class Behavior1Processor(name: String) extends ExampleProcessor(name) { @@ -123,7 +123,7 @@ object EventsourcedSpec { } class SnapshottingEventsourcedProcessor(name: String, probe: ActorRef) extends ExampleProcessor(name) { - override def receiveReplay = super.receiveReplay orElse { + override def receiveRecover = super.receiveRecover orElse { case SnapshotOffer(_, events: List[_]) ⇒ probe ! "offered" this.events = events diff --git a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala index b42a6979ec..f62dacb268 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala @@ -86,7 +86,7 @@ object PerformanceSpec { } class EventsourcedTestProcessor(name: String) extends PerformanceTestProcessor(name) with EventsourcedProcessor { - val receiveReplay: Receive = { + val receiveRecover: Receive = { case _ ⇒ if (lastSequenceNr % 1000 == 0) print("r") } @@ -99,7 +99,7 @@ object PerformanceSpec { } class StashingEventsourcedTestProcessor(name: String) extends PerformanceTestProcessor(name) with EventsourcedProcessor { - val receiveReplay: Receive = { + val receiveRecover: Receive = { case _ ⇒ if (lastSequenceNr % 1000 == 0) print("r") } diff --git a/akka-persistence/src/test/scala/akka/persistence/ProcessorChannelSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ProcessorChannelSpec.scala index ffeed8f9b9..25b0279915 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ProcessorChannelSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ProcessorChannelSpec.scala @@ -53,7 +53,7 @@ object ProcessorChannelSpec { channel ! Deliver(Persistent(event), destination.path) } - def receiveReplay: Receive = { + def receiveRecover: Receive = { case event: String ⇒ handleEvent(event) } diff --git a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/EventsourcedExample.java b/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/EventsourcedExample.java index 297a1c9a63..9ab21b9324 100644 --- a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/EventsourcedExample.java +++ b/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/EventsourcedExample.java @@ -70,7 +70,7 @@ class ExampleProcessor extends UntypedEventsourcedProcessor { return state.size(); } - public void onReceiveReplay(Object msg) { + public void onReceiveRecover(Object msg) { if (msg instanceof Evt) { state.update((Evt) msg); } else if (msg instanceof SnapshotOffer) { diff --git a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/EventsourcedExample.scala b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/EventsourcedExample.scala index 19a5eef071..007a8ac925 100644 --- a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/EventsourcedExample.scala +++ b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/EventsourcedExample.scala @@ -26,7 +26,7 @@ class ExampleProcessor extends EventsourcedProcessor { def numEvents = state.size - val receiveReplay: Receive = { + val receiveRecover: Receive = { case evt: Evt => updateState(evt) case SnapshotOffer(_, snapshot: ExampleState) => state = snapshot }