diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index 275b7d8dc1..aaab826af4 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -74,7 +74,7 @@ Processors A processor can be implemented by extending ``AbstractProcessor`` class and implementing the ``receive`` method. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#definition +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#definition Processors only write messages of type ``Persistent`` to the journal, others are received without being persisted. When a processor's ``receive`` method is called with a ``Persistent`` message it can safely assume that this message @@ -85,7 +85,7 @@ so that the sender can re-send the message, if needed. An ``AbstractProcessor`` itself is an ``Actor`` and can therefore be instantiated with ``actorOf``. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#usage +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#usage Recovery -------- @@ -99,28 +99,28 @@ Recovery customization Automated recovery on start can be disabled by overriding ``preStart`` with an empty implementation. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#recover-on-start-disabled +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recover-on-start-disabled In this case, a processor must be recovered explicitly by sending it a ``Recover`` message. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#recover-explicit +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recover-explicit If not overridden, ``preStart`` sends a ``Recover`` message to ``self()``. Applications may also override ``preStart`` to define further ``Recover`` parameters such as an upper sequence number bound, for example. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#recover-on-start-custom +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recover-on-start-custom Upper sequence number bounds can be used to recover a processor to past state instead of current state. Automated recovery on restart can be disabled by overriding ``preRestart`` with an empty implementation. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#recover-on-restart-disabled +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recover-on-restart-disabled Recovery status ^^^^^^^^^^^^^^^ A processor can query its own recovery status via the methods -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#recovery-status +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#recovery-status .. _failure-handling-java-lambda: @@ -130,7 +130,7 @@ Failure handling A persistent message that caused an exception will be received again by a processor after restart. To prevent a replay of that message during recovery it can be deleted. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#deletion +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#deletion Message deletion ---------------- @@ -149,13 +149,13 @@ A processor must have an identifier that doesn't change across different actor i ``String`` representation of processor's path without the address part and can be obtained via the ``processorId`` method. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#processor-id +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#processor-id Applications can customize a processor's id by specifying an actor name during processor creation as shown in section :ref:`processors-java`. This changes that processor's name in its actor hierarchy and hence influences only part of the processor id. To fully customize a processor's id, the ``processorId`` method must be overridden. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#processor-id-override +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#processor-id-override Overriding ``processorId`` is the recommended way to generate stable identifiers. @@ -168,7 +168,7 @@ Views can be implemented by extending the ``AbstractView`` abstract class and im ``processorId`` methods. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#view +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#view The ``processorId`` identifies the processor from which the view receives journaled messages. It is not necessary the referenced processor is actually running. Views read messages from a processor's journal directly. When a @@ -186,7 +186,7 @@ The default update interval of all views of an actor system is configurable: interval for a specific view class or view instance. Applications may also trigger additional updates at any time by sending a view an ``Update`` message. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#view-update +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#view-update If the ``await`` parameter is set to ``true``, messages that follow the ``Update`` request are processed when the incremental message replay, triggered by that update request, completed. If set to ``false`` (default), messages @@ -234,7 +234,7 @@ destinations). The following discusses channels in context of processors but thi Channels prevent redundant delivery of replayed messages to destinations during processor recovery. A replayed message is retained by a channel if its delivery has been confirmed by a destination. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#channel-example +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#channel-example A channel is ready to use once it has been created, no recovery or further activation is needed. A ``Deliver`` request instructs a channel to send a ``Persistent`` message to a destination. A destination is provided as @@ -244,7 +244,7 @@ preserved by a channel, therefore, a destination can reply to the sender of a `` If a processor wants to reply to a ``Persistent`` message sender it should use the ``sender()`` path as channel destination. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#channel-example-reply +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#channel-example-reply Persistent messages delivered by a channel are of type ``ConfirmablePersistent``. ``ConfirmablePersistent`` extends ``Persistent`` by adding the methods ``confirm`` and ``redeliveries`` (see also :ref:`redelivery-java-lambda`). A @@ -264,13 +264,13 @@ This timeout can be specified as ``redeliverInterval`` when creating a channel, maximum number of re-deliveries a channel should attempt for each unconfirmed message. The number of re-delivery attempts can be obtained via the ``redeliveries`` method on ``ConfirmablePersistent``. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#channel-custom-settings +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#channel-custom-settings A channel keeps messages in memory until their successful delivery has been confirmed or the maximum number of re-deliveries is reached. To be notified about messages that have reached the maximum number of re-deliveries, applications can register a listener at channel creation. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#channel-custom-listener +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#channel-custom-listener A listener receives ``RedeliverFailure`` notifications containing all messages that could not be delivered. On receiving a ``RedeliverFailure`` message, an application may decide to restart the sending processor to enforce @@ -313,13 +313,13 @@ Persistent channels are like transient channels but additionally persist message that have been persisted by a persistent channel are deleted when destinations confirm their delivery. A persistent channel can be created with ``PersistentChannel.props`` and configured with a ``PersistentChannelSettings`` object. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#persistent-channel-example +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#persistent-channel-example A persistent channel is useful for delivery of messages to slow destinations or destinations that are unavailable for a long time. It can constrain the number of pending confirmations based on the ``pendingConfirmationsMax`` and ``pendingConfirmationsMin`` parameters of ``PersistentChannelSettings``. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#persistent-channel-watermarks +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#persistent-channel-watermarks It suspends delivery when the number of pending confirmations reaches ``pendingConfirmationsMax`` and resumes delivery again when this number falls below ``pendingConfirmationsMin``. This prevents both, flooding destinations @@ -335,7 +335,7 @@ case of a sender JVM crash is an issue, persistent channels should be used. In t receive replies from the channel whether messages have been successfully persisted or not. This can be enabled by creating the channel with the ``replyPersistent`` configuration parameter set to ``true``: -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#persistent-channel-reply +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#persistent-channel-reply With this setting, either the successfully persisted message is replied to the sender or a ``PersistenceFailure`` message. In case the latter case, the sender should re-send the message. @@ -349,7 +349,7 @@ that channel's name in its actor hierarchy and hence influences only part of the a channel identifier, it should be provided as argument ``Channel.props(String)`` or ``PersistentChannel.props(String)`` (recommended to generate stable identifiers). -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#channel-id-override +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#channel-id-override Persistent messages =================== @@ -362,7 +362,7 @@ must be derived from the current persistent message before sending them via a ch or ``Persistent.create(..., getCurrentPersistentMessage())`` where ``getCurrentPersistentMessage()`` is defined on ``AbstractProcessor``. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#current-message +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#current-message This is necessary for delivery confirmations to work properly. Both ways are equivalent but we recommend using ``p.withPayload(...)`` for clarity. It is not allowed to send a message @@ -387,12 +387,12 @@ in context of processors but this is also applicable to views. Processors can save snapshots of internal state by calling the ``saveSnapshot`` method. If saving of a snapshot succeeds, the processor receives a ``SaveSnapshotSuccess`` message, otherwise a ``SaveSnapshotFailure`` message -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#save-snapshot +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#save-snapshot During recovery, the processor is offered a previously saved snapshot via a ``SnapshotOffer`` message from which it can initialize internal state. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#snapshot-offer +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#snapshot-offer The replayed messages that follow the ``SnapshotOffer`` message, if any, are younger than the offered snapshot. They finally recover the processor to its current (i.e. latest) state. @@ -400,7 +400,7 @@ They finally recover the processor to its current (i.e. latest) state. In general, a processor is only offered a snapshot if that processor has previously saved one or more snapshots and at least one of these snapshots matches the ``SnapshotSelectionCriteria`` that can be specified for recovery. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#snapshot-criteria +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#snapshot-criteria If not specified, they default to ``SnapshotSelectionCriteria.latest()`` which selects the latest (= youngest) snapshot. To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.none()``. A recovery where no @@ -441,7 +441,7 @@ event sourcing as a pattern on top of command sourcing). A processor that extend ``AbstractEventsourcedProcessor`` is defined by implementing ``receiveRecover`` and ``receiveCommand``. This is demonstrated in the following example. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/EventsourcedExample.java#eventsourced-example +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/EventsourcedExample.java#eventsourced-example The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The ``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``. @@ -462,13 +462,20 @@ about successful state changes by publishing events. When persisting events with ``persist`` it is guaranteed that the processor will not receive further commands between the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist`` -calls in context of a single command. The example also shows how to switch between command different command handlers -with ``context().become()`` and ``context().unbecome()``. +calls in context of a single command. The easiest way to run this example yourself is to download `Typesafe Activator `_ -and open the tutorial named `Akka Persistence Samples with Java `_. +and open the tutorial named `Akka Persistence Samples in Java with Lambdas `_. It contains instructions on how to run the ``EventsourcedExample``. +.. note:: + + It's also possible to switch between different command handlers during normal processing and recovery + with ``context().become()`` and ``context().unbecome()``. To get the actor into the same state after + recovery you need to take special care to perform the same state transitions with ``become`` and + ``unbecome`` in the ``receiveRecover`` method as you would have done in the command handler. + + Reliable event delivery ----------------------- @@ -476,7 +483,7 @@ Sending events from an event handler to another actor has at-most-once delivery :ref:`channels-java-lambda` must be used. In this case, also replayed events (received by ``receiveRecover``) must be sent to a channel, as shown in the following example: -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#reliable-event-delivery +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#reliable-event-delivery In larger integration scenarios, channel destinations may be actors that submit received events to an external message broker, for example. After having successfully submitted an event, they should call ``confirm()`` on the @@ -498,7 +505,7 @@ writing the previous batch. Batch writes are never timer-based which keeps laten Applications that want to have more explicit control over batch writes and batch sizes can send processors ``PersistentBatch`` messages. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#batch-write +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#batch-write ``Persistent`` messages contained in a ``PersistentBatch`` are always written atomically, even if the batch size is greater than ``max-message-batch-size``. Also, a ``PersistentBatch`` is written isolated from other batches. @@ -522,7 +529,7 @@ snapshots as individual files to the local filesystem (see :ref:`local-snapshot- provide their own plugins by implementing a plugin API and activate them by configuration. Plugin development requires the following imports: -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistencePluginDocTest.java#plugin-imports +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java#plugin-imports Journal plugin API ------------------ @@ -611,7 +618,7 @@ plugin. This plugin must be initialized by injecting the (remote) ``SharedLeveldbStore`` actor reference. Injection is done by calling the ``SharedLeveldbJournal.setStore`` method with the actor reference as argument. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistencePluginDocTest.java#shared-store-usage +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java#shared-store-usage Internal journal commands (sent by processors) are buffered until injection completes. Injection is idempotent i.e. only the first injection is used. diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 0266696b1c..1488c91c04 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -487,13 +487,20 @@ about successful state changes by publishing events. When persisting events with ``persist`` it is guaranteed that the processor will not receive further commands between the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist`` -calls in context of a single command. The example also shows how to switch between command different command handlers -with ``getContext().become()`` and ``getContext().unbecome()``. +calls in context of a single command. The easiest way to run this example yourself is to download `Typesafe Activator `_ and open the tutorial named `Akka Persistence Samples with Java `_. It contains instructions on how to run the ``EventsourcedExample``. +.. note:: + + It's also possible to switch between different command handlers during normal processing and recovery + with ``getContext().become()`` and ``getContext().unbecome()``. To get the actor into the same state after + recovery you need to take special care to perform the same state transitions with ``become`` and + ``unbecome`` in the ``receiveRecover`` method as you would have done in the command handler. + + Reliable event delivery ----------------------- diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 4ec3e89b75..8469bce878 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -497,13 +497,20 @@ about successful state changes by publishing events. When persisting events with ``persist`` it is guaranteed that the processor will not receive further commands between the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist`` -calls in context of a single command. The example also shows how to switch between command different command handlers -with ``context.become()`` and ``context.unbecome()``. +calls in context of a single command. The easiest way to run this example yourself is to download `Typesafe Activator `_ and open the tutorial named `Akka Persistence Samples with Scala `_. It contains instructions on how to run the ``EventsourcedExample``. +.. note:: + + It's also possible to switch between different command handlers during normal processing and recovery + with ``context.become()`` and ``context.unbecome()``. To get the actor into the same state after + recovery you need to take special care to perform the same state transitions with ``become`` and + ``unbecome`` in the ``receiveRecover`` method as you would have done in the command handler. + + Reliable event delivery ----------------------- diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 6e55bde0db..3b00ae4732 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -23,10 +23,14 @@ private[persistence] trait Eventsourced extends Processor { * `processingCommands` */ private val recovering: State = new State { + // cache the recoveryBehavior since it's a def for binary compatibility in 2.3.x + private val _recoveryBehavior: Receive = recoveryBehavior + override def toString: String = "recovering" def aroundReceive(receive: Receive, message: Any) { - Eventsourced.super.aroundReceive(receive, message) + // Since we are recovering we can ignore the receive behavior from the stack + Eventsourced.super.aroundReceive(_recoveryBehavior, message) message match { case _: ReadHighestSequenceNrSuccess | _: ReadHighestSequenceNrFailure ⇒ currentState = processingCommands @@ -93,6 +97,21 @@ private[persistence] trait Eventsourced extends Processor { } } + /** + * INTERNAL API. + * + * This is a def and not a val because of binary compatibility in 2.3.x. + * It is cached where it is used. + */ + private def recoveryBehavior: Receive = { + case Persistent(payload, _) if recoveryRunning && receiveRecover.isDefinedAt(payload) ⇒ + receiveRecover(payload) + case s: SnapshotOffer if receiveRecover.isDefinedAt(s) ⇒ + receiveRecover(s) + case f: RecoveryFailure if receiveRecover.isDefinedAt(f) ⇒ + receiveRecover(f) + } + private var persistInvocations: List[(Any, Any ⇒ Unit)] = Nil private var persistentEventBatch: List[PersistentRepr] = Nil @@ -190,14 +209,10 @@ private[persistence] trait Eventsourced extends Processor { /** * INTERNAL API. + * + * Only here for binary compatibility in 2.3.x. */ - protected[persistence] val initialBehavior: Receive = { - case Persistent(payload, _) if receiveRecover.isDefinedAt(payload) && recoveryRunning ⇒ - receiveRecover(payload) - case s: SnapshotOffer if receiveRecover.isDefinedAt(s) ⇒ - receiveRecover(s) - case f: RecoveryFailure if receiveRecover.isDefinedAt(f) ⇒ - receiveRecover(f) + protected[persistence] val initialBehavior: Receive = recoveryBehavior orElse { case msg if receiveCommand.isDefinedAt(msg) ⇒ receiveCommand(msg) } @@ -207,14 +222,14 @@ private[persistence] trait Eventsourced extends Processor { * An event sourced processor. */ trait EventsourcedProcessor extends Processor with Eventsourced { - final def receive = initialBehavior + final def receive = receiveCommand } /** * Java API: an event sourced processor. */ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Eventsourced { - final def onReceive(message: Any) = initialBehavior(message) + final def onReceive(message: Any) = onReceiveCommand(message) final def receiveRecover: Receive = { case msg ⇒ onReceiveRecover(msg) diff --git a/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala b/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala index 96dbee3939..e844f565f2 100644 --- a/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala @@ -137,6 +137,23 @@ object EventsourcedSpec { } } + class SnapshottingBecomingEventsourcedProcessor(name: String, probe: ActorRef) extends SnapshottingEventsourcedProcessor(name, probe) { + val becomingRecover: Receive = { + case msg: SnapshotOffer ⇒ + context.become(becomingCommand) + // sending ourself a normal message here also tests + // that we stash them until recovery is complete + self ! "It's changing me" + super.receiveRecover(msg) + } + + override def receiveRecover = becomingRecover.orElse(super.receiveRecover) + + val becomingCommand: Receive = receiveCommand orElse { + case "It's changing me" ⇒ probe ! "I am becoming" + } + } + class ReplyInEventHandlerProcessor(name: String) extends ExampleProcessor(name) { val receiveCommand: Receive = { case Cmd("a") ⇒ persist(Evt("a"))(evt ⇒ sender ! evt.data) @@ -297,6 +314,21 @@ abstract class EventsourcedSpec(config: Config) extends AkkaSpec(config) with Pe processor2 ! GetState expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42")) } + "support context.become during recovery" in { + val processor1 = system.actorOf(Props(classOf[SnapshottingEventsourcedProcessor], name, testActor)) + processor1 ! Cmd("b") + processor1 ! "snap" + processor1 ! Cmd("c") + expectMsg("saved") + processor1 ! GetState + expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42")) + + val processor2 = system.actorOf(Props(classOf[SnapshottingBecomingEventsourcedProcessor], name, testActor)) + expectMsg("offered") + expectMsg("I am becoming") + processor2 ! GetState + expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42")) + } "be able to reply within an event handler" in { val processor = namedProcessor[ReplyInEventHandlerProcessor] processor ! Cmd("a") diff --git a/akka-samples/akka-sample-persistence-java-lambda/activator.properties b/akka-samples/akka-sample-persistence-java-lambda/activator.properties new file mode 100644 index 0000000000..917d565c6e --- /dev/null +++ b/akka-samples/akka-sample-persistence-java-lambda/activator.properties @@ -0,0 +1,7 @@ +name=akka-sample-persistence-java-lambda +title=Akka Persistence Samples in Java with Lambdas +description=Akka Persistence Samples in Java with Lambdas +tags=akka,java,java8,sample,persistence,sample +authorName=Akka Team +authorLink=http://akka.io/ +sourceLink=https://github.com/akka/akka diff --git a/akka-samples/akka-sample-persistence-java-lambda/build.sbt b/akka-samples/akka-sample-persistence-java-lambda/build.sbt new file mode 100644 index 0000000000..1117440e4e --- /dev/null +++ b/akka-samples/akka-sample-persistence-java-lambda/build.sbt @@ -0,0 +1,12 @@ +name := "akka-sample-persistence-java-lambda" + +version := "1.0" + +scalaVersion := "2.10.3" + +javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint") + +libraryDependencies ++= Seq( + "com.typesafe.akka" %% "akka-persistence-experimental" % "2.3-SNAPSHOT" +) + diff --git a/akka-samples/akka-sample-persistence-java8/pom.xml b/akka-samples/akka-sample-persistence-java-lambda/pom.xml similarity index 100% rename from akka-samples/akka-sample-persistence-java8/pom.xml rename to akka-samples/akka-sample-persistence-java-lambda/pom.xml diff --git a/akka-samples/akka-sample-persistence-java-lambda/project/build.properties b/akka-samples/akka-sample-persistence-java-lambda/project/build.properties new file mode 100644 index 0000000000..37b489cb6e --- /dev/null +++ b/akka-samples/akka-sample-persistence-java-lambda/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.1 diff --git a/akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java similarity index 100% rename from akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java rename to akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java diff --git a/akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistencePluginDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java similarity index 100% rename from akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistencePluginDocTest.java rename to akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java diff --git a/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/ConversationRecoveryExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ConversationRecoveryExample.java similarity index 100% rename from akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/ConversationRecoveryExample.java rename to akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ConversationRecoveryExample.java diff --git a/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/EventsourcedExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/EventsourcedExample.java similarity index 88% rename from akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/EventsourcedExample.java rename to akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/EventsourcedExample.java index 48e17d1446..96351c3c14 100644 --- a/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/EventsourcedExample.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/EventsourcedExample.java @@ -95,23 +95,12 @@ class ExampleProcessor extends AbstractEventsourcedProcessor { state.update(evt); if (evt.equals(evt2)) { context().system().eventStream().publish(evt); - if (data.equals("foo")) { context().become(och, true); } } }); }). match(String.class, s -> s.equals("snap"), s -> saveSnapshot(state.copy())). match(String.class, s -> s.equals("print"), s -> System.out.println(state)).build(); } - - PartialFunction och = ReceiveBuilder. - match(Cmd.class, cmd -> cmd.getData().equals("bar"), cmd -> { - persist(new Evt("bar-" + getNumEvents()), event -> { - state.update(event); - context().unbecome(); - }); - unstashAll(); - }). - matchAny(o -> stash()).build(); } //#eventsourced-example diff --git a/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/ProcessorChannelExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ProcessorChannelExample.java similarity index 100% rename from akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/ProcessorChannelExample.java rename to akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ProcessorChannelExample.java diff --git a/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/ProcessorFailureExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ProcessorFailureExample.java similarity index 100% rename from akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/ProcessorFailureExample.java rename to akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ProcessorFailureExample.java diff --git a/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/SnapshotExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/SnapshotExample.java similarity index 100% rename from akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/SnapshotExample.java rename to akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/SnapshotExample.java diff --git a/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/ViewExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java similarity index 100% rename from akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/ViewExample.java rename to akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java diff --git a/akka-samples/akka-sample-persistence-java8/src/main/resources/application.conf b/akka-samples/akka-sample-persistence-java-lambda/src/main/resources/application.conf similarity index 100% rename from akka-samples/akka-sample-persistence-java8/src/main/resources/application.conf rename to akka-samples/akka-sample-persistence-java-lambda/src/main/resources/application.conf diff --git a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/EventsourcedExample.java b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/EventsourcedExample.java index f9d92ae4bc..5f3e51d00b 100644 --- a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/EventsourcedExample.java +++ b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/EventsourcedExample.java @@ -88,7 +88,6 @@ class ExampleProcessor extends UntypedEventsourcedProcessor { state.update(evt); if (evt.equals(evt2)) { getContext().system().eventStream().publish(evt); - if (data.equals("foo")) getContext().become(otherCommandHandler); } } }); @@ -100,22 +99,6 @@ class ExampleProcessor extends UntypedEventsourcedProcessor { System.out.println(state); } } - - private Procedure otherCommandHandler = new Procedure() { - public void apply(Object msg) throws Exception { - if (msg instanceof Cmd && ((Cmd)msg).getData().equals("bar")) { - persist(new Evt("bar-" + getNumEvents()), new Procedure() { - public void apply(Evt event) throws Exception { - state.update(event); - getContext().unbecome(); - } - }); - unstashAll(); - } else { - stash(); - } - } - }; } //#eventsourced-example diff --git a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/EventsourcedExample.scala b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/EventsourcedExample.scala index 1064deab86..180210af9c 100644 --- a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/EventsourcedExample.scala +++ b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/EventsourcedExample.scala @@ -33,21 +33,11 @@ class ExampleProcessor extends EventsourcedProcessor { persist(Evt(s"${data}-${numEvents + 1}")) { event => updateState(event) context.system.eventStream.publish(event) - if (data == "foo") context.become(otherCommandHandler) } case "snap" => saveSnapshot(state) case "print" => println(state) } - val otherCommandHandler: Receive = { - case Cmd("bar") => - persist(Evt(s"bar-${numEvents}")) { event => - updateState(event) - context.unbecome() - } - unstashAll() - case other => stash() - } } //#eventsourced-example @@ -57,7 +47,7 @@ object EventsourcedExample extends App { val processor = system.actorOf(Props[ExampleProcessor], "processor-4-scala") processor ! Cmd("foo") - processor ! Cmd("baz") // will be stashed + processor ! Cmd("baz") processor ! Cmd("bar") processor ! "snap" processor ! Cmd("buzz") diff --git a/scripts/build/extra-build-steps.sh b/scripts/build/extra-build-steps.sh index 56da1ab8c0..d633e35203 100755 --- a/scripts/build/extra-build-steps.sh +++ b/scripts/build/extra-build-steps.sh @@ -100,6 +100,6 @@ mvncleantest "$java8_home" "akka-samples/akka-docs-java-lambda" mvncleantest "$java8_home" "akka-samples/akka-sample-fsm-java-lambda" -mvncleantest "$java8_home" "akka-samples/akka-sample-persistence-java8" +mvncleantest "$java8_home" "akka-samples/akka-sample-persistence-java-lambda" mvncleantest "$java8_home" "akka-samples/akka-sample-supervision-java-lambda"