diff --git a/akka-docs/rst/general/message-delivery-guarantees.rst b/akka-docs/rst/general/message-delivery-guarantees.rst index bbfc70fd1d..f0b14bcce8 100644 --- a/akka-docs/rst/general/message-delivery-guarantees.rst +++ b/akka-docs/rst/general/message-delivery-guarantees.rst @@ -295,19 +295,8 @@ components may consume the event stream as a means to replicate the component’ state on a different continent or to react to changes). If the component’s state is lost—due to a machine failure or by being pushed out of a cache—it can easily be reconstructed by replaying the event stream (usually employing -snapshots to speed up the process). Read a lot more about `Event Sourcing`_. - -Martin Krasser has written an implementation of event sourcing principles on -top of Akka called `eventsourced`_, including support for guaranteed delivery -semantics as described in the previous section. - -A successor of `eventsourced` is now part of Akka (see :ref:`persistence`) which -is a general solution for actor state persistence. It journals messages before -they are received by an actor and can be used to implement both event sourcing -and command sourcing. - -.. _Event Sourcing: http://martinfowler.com/eaaDev/EventSourcing.html -.. _eventsourced: https://github.com/eligosource/eventsourced +snapshots to speed up the process). :ref:`event-sourcing` is supported by +Akka (see :ref:`persistence`). Mailbox with Explicit Acknowledgement ------------------------------------- diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 8c7365bd44..17e929f1f7 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -57,12 +57,6 @@ Architecture * *Snapshot store*: A snapshot store persists snapshots of a processor's internal state. Snapshots are used for optimizing recovery times. The storage backend of a snapshot store is pluggable. -Use cases -========= - -* TODO: describe command sourcing -* TODO: describe event sourcing - Configuration ============= @@ -271,6 +265,59 @@ If not specified, they default to ``SnapshotSelectionCriteria.latest()`` which s To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.none()``. A recovery where no saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages. +Event sourcing +============== + +In all the examples so far, messages that change a processor's state have been sent as ``Persistent`` messages +by an application, so that they can be replayed during recovery. From this point of view, the journal acts as +a write-ahead-log for whatever ``Persistent`` messages a processor receives. This is also known as *command +sourcing*. Commands, however, may fail and some applications cannot tolerate command failures during recovery. + +For these applications `Event Sourcing`_ is a better choice. Applied to Akka persistence, the basic idea behind +event sourcing is quite simple. A processor receives a (non-persistent) command which is first validated if it +can be applied to the current state. Here, validation can mean anything, from simple inspection of a command +message's fields up to a conversation with several external services, for example. If validation succeeds, events +are generated from the command, representing the effect of the command. These events are then persisted and, after +successful persistence, used to change a processor's state. When the processor needs to be recovered, only the +persisted events are replayed of which we know that they can be successfully applied. In other words, events +cannot fail when being replayed to a processor, in contrast to commands. Eventsourced processors may of course +also process commands that do not change application state, such as query commands, for example. + +.. _Event Sourcing: http://martinfowler.com/eaaDev/EventSourcing.html + +Akka persistence supports event sourcing with the abstract ``UntypedEventsourcedProcessor`` class (which implements +event sourcing as a pattern on top of command sourcing). A processor that extends this abstract class does not handle +``Persistent`` messages directly but uses the ``persist`` method to persist and handle events. The behavior of an +``UntypedEventsourcedProcessor`` is defined by implementing ``onReceiveReplay`` and ``onReceiveCommand``. This is +best explained with an example (which is also part of ``akka-sample-persistence``). + +.. includecode:: ../../../akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/EventsourcedExample.java#eventsourced-example + +The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The +``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``. + +The processor's ``onReceiveReplay`` method defines how ``state`` is updated during recovery by handling ``Evt`` +and ``SnapshotOffer`` messages. The processor's ``onReceiveCommand`` method is a command handler. In this example, +a command is handled by generating two events which are then persisted and handled. Events are persisted by calling +``persist`` with an event (or a sequence of events) as first argument and an event handler as second argument. + +The ``persist`` method persists events asynchronously and the event handler is executed for successfully persisted +events. Successfully persisted events are internally sent back to the processor as separate messages which trigger +the event handler execution. An event handler may therefore close over processor state and mutate it. The sender +of a persisted event is the sender of the corresponding command. This allows event handlers to reply to the sender +of a command (not shown). + +The main responsibility of an event handler is changing processor state using event data and notifying others +about successful state changes by publishing events. + +When persisting events with ``persist`` it is guaranteed that the processor will not receive new commands between +the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist`` +calls in context of a single command. + +The example also demonstrates how to change the processor's default behavior, defined by ``onReceiveCommand``, to +another behavior, defined by ``otherCommandHandler``, and back using ``getContext().become()`` and +``getContext().unbecome()``. See also the API docs of ``persist`` for further details. + Storage plugins =============== diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 7dad84328e..20fc1318db 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -53,12 +53,6 @@ Architecture * *Snapshot store*: A snapshot store persists snapshots of a processor's internal state. Snapshots are used for optimizing recovery times. The storage backend of a snapshot store is pluggable. -Use cases -========= - -* TODO: describe command sourcing -* TODO: describe event sourcing - Configuration ============= @@ -282,6 +276,61 @@ If not specified, they default to ``SnapshotSelectionCriteria.Latest`` which sel To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.None``. A recovery where no saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages. +.. _event-sourcing: + +Event sourcing +============== + +In all the examples so far, messages that change a processor's state have been sent as ``Persistent`` messages +by an application, so that they can be replayed during recovery. From this point of view, the journal acts as +a write-ahead-log for whatever ``Persistent`` messages a processor receives. This is also known as *command +sourcing*. Commands, however, may fail and some applications cannot tolerate command failures during recovery. + +For these applications `Event Sourcing`_ is a better choice. Applied to Akka persistence, the basic idea behind +event sourcing is quite simple. A processor receives a (non-persistent) command which is first validated if it +can be applied to the current state. Here, validation can mean anything, from simple inspection of a command +message's fields up to a conversation with several external services, for example. If validation succeeds, events +are generated from the command, representing the effect of the command. These events are then persisted and, after +successful persistence, used to change a processor's state. When the processor needs to be recovered, only the +persisted events are replayed of which we know that they can be successfully applied. In other words, events +cannot fail when being replayed to a processor, in contrast to commands. Eventsourced processors may of course +also process commands that do not change application state, such as query commands, for example. + +.. _Event Sourcing: http://martinfowler.com/eaaDev/EventSourcing.html + +Akka persistence supports event sourcing with the ``EventsourcedProcessor`` trait (which implements event sourcing +as a pattern on top of command sourcing). A processor that extends this trait does not handle ``Persistent`` messages +directly but uses the ``persist`` method to persist and handle events. The behavior of an ``EventsourcedProcessor`` +is defined by implementing ``receiveReplay`` and ``receiveCommand``. This is best explained with an example (which +is also part of ``akka-sample-persistence``). + +.. includecode:: ../../../akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/EventsourcedExample.scala#eventsourced-example + +The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The +``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``. + +The processor's ``receiveReplay`` method defines how ``state`` is updated during recovery by handling ``Evt`` +and ``SnapshotOffer`` messages. The processor's ``receiveCommand`` method is a command handler. In this example, +a command is handled by generating two events which are then persisted and handled. Events are persisted by calling +``persist`` with an event (or a sequence of events) as first argument and an event handler as second argument. + +The ``persist`` method persists events asynchronously and the event handler is executed for successfully persisted +events. Successfully persisted events are internally sent back to the processor as separate messages which trigger +the event handler execution. An event handler may therefore close over processor state and mutate it. The sender +of a persisted event is the sender of the corresponding command. This allows event handlers to reply to the sender +of a command (not shown). + +The main responsibility of an event handler is changing processor state using event data and notifying others +about successful state changes by publishing events. + +When persisting events with ``persist`` it is guaranteed that the processor will not receive new commands between +the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist`` +calls in context of a single command. + +The example also demonstrates how to change the processor's default behavior, defined by ``receiveCommand``, to +another behavior, defined by ``otherCommandHandler``, and back using ``context.become()`` and ``context.unbecome()``. +See also the API docs of ``persist`` for further details. + Storage plugins =============== diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala new file mode 100644 index 0000000000..5257c52fd2 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -0,0 +1,248 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.persistence + +import java.lang.{ Iterable ⇒ JIterable } + +import scala.collection.immutable + +import akka.japi.{ Procedure, Util } +import akka.persistence.JournalProtocol._ + +/** + * INTERNAL API. + * + * Event sourcing mixin for a [[Processor]]. + */ +private[persistence] trait Eventsourced extends Processor { + private trait State { + def aroundReceive(receive: Receive, message: Any): Unit + } + + /** + * Command processing state. If event persistence is pending after processing a + * command, event persistence is triggered and state changes to `persistingEvents`. + */ + private val processingCommands: State = new State { + def aroundReceive(receive: Receive, message: Any) = message match { + case m if (persistInvocations.isEmpty) ⇒ { + Eventsourced.super.aroundReceive(receive, m) + if (!persistInvocations.isEmpty) { + persistInvocations = persistInvocations.reverse + persistCandidates = persistCandidates.reverse + persistCandidates.foreach(self forward Persistent(_)) + currentState = persistingEvents + } + } + } + } + + /** + * Event persisting state. Remains until pending events are persisted and then changes + * state to `processingCommands`. Only events to be persisted are processed. All other + * messages are stashed internally. + */ + private val persistingEvents: State = new State { + def aroundReceive(receive: Receive, message: Any) = message match { + case p: PersistentImpl if identical(p.payload, persistCandidates.head) ⇒ { + Eventsourced.super.aroundReceive(receive, message) + persistCandidates = persistCandidates.tail + } + case WriteSuccess(p) if identical(p.payload, persistInvocations.head._1) ⇒ { + withCurrentPersistent(p)(p ⇒ persistInvocations.head._2(p.payload)) + onWriteComplete() + } + case e @ WriteFailure(p, _) if identical(p.payload, persistInvocations.head._1) ⇒ { + Eventsourced.super.aroundReceive(receive, message) // stops actor by default + onWriteComplete() + } + case other ⇒ processorStash.stash() + } + + def onWriteComplete(): Unit = { + persistInvocations = persistInvocations.tail + if (persistInvocations.isEmpty) { + currentState = processingCommands + processorStash.unstashAll() + } + } + + def identical(a: Any, b: Any): Boolean = + a.asInstanceOf[AnyRef] eq b.asInstanceOf[AnyRef] + } + + private var persistInvocations: List[(Any, Any ⇒ Unit)] = Nil + private var persistCandidates: List[Any] = Nil + + private var currentState: State = processingCommands + private val processorStash = createProcessorStash + + /** + * Asynchronously persists `event`. On successful persistence, `handler` is called with the + * persisted event. It is guaranteed that no new commands will be received by a processor + * between a call to `persist` and the execution of its `handler`. This also holds for + * multiple `persist` calls per received command. Internally, this is achieved by stashing new + * commands and unstashing them when the `event` has been persisted and handled. The stash used + * for that is an internal stash which doesn't interfere with the user stash inherited from + * [[Processor]]. + * + * An event `handler` may close over processor state and modify it. The `sender` of a persisted + * event is the sender of the corresponding command. This means that one can reply to a command + * sender within an event `handler`. + * + * Within an event handler, applications usually update processor state using persisted event + * data, notify listeners and reply to command senders. + * + * If persistence of an event fails, the processor will be stopped. This can be customized by + * handling [[PersistenceFailure]] in [[receiveCommand]]. + * + * @param event event to be persisted. + * @param handler handler for each persisted `event` + */ + final def persist[A](event: A)(handler: A ⇒ Unit): Unit = { + persistInvocations = (event, handler.asInstanceOf[Any ⇒ Unit]) :: persistInvocations + persistCandidates = event :: persistCandidates + } + + /** + * Asynchronously persists `events` in specified order. This is equivalent to calling + * `persist[A](event: A)(handler: A => Unit)` multiple times with the same `handler`. + * + * @param events events to be persisted. + * @param handler handler for each persisted `events` + */ + final def persist[A](events: immutable.Seq[A])(handler: A ⇒ Unit): Unit = + events.foreach(persist(_)(handler)) + + /** + * Replay handler that receives persisted events during recovery. If a state snapshot + * has been captured and saved, this handler will receive a [[SnapshotOffer]] message + * followed by events that are younger than the offered snapshot. + * + * This handler must not have side-effects other than changing processor state i.e. it + * should not perform actions that may fail, such as interacting with external services, + * for example. + * + * @see [[Recover]] + */ + def receiveReplay: Receive + + /** + * Command handler. Typically validates commands against current state (and/or by + * communication with other actors). On successful validation, one or more events are + * derived from a command and these events are then persisted by calling `persist`. + * Commands sent to event sourced processors should not be [[Persistent]] messages. + */ + def receiveCommand: Receive + + /** + * INTERNAL API. + */ + final override protected[akka] def aroundReceive(receive: Receive, message: Any) { + currentState.aroundReceive(receive, message) + } + + /** + * INTERNAL API. + */ + protected[persistence] val initialBehavior: Receive = { + case Persistent(payload, _) if receiveReplay.isDefinedAt(payload) && recoveryRunning ⇒ + receiveReplay(payload) + case s: SnapshotOffer if receiveReplay.isDefinedAt(s) ⇒ + receiveReplay(s) + case f: RecoveryFailure if receiveReplay.isDefinedAt(f) ⇒ + receiveReplay(f) + case msg if receiveCommand.isDefinedAt(msg) ⇒ + receiveCommand(msg) + } +} + +/** + * An event sourced processor. + */ +trait EventsourcedProcessor extends Processor with Eventsourced { + final def receive = initialBehavior +} + +/** + * Java API. + * + * An event sourced processor. + */ +abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Eventsourced { + final def onReceive(message: Any) = initialBehavior(message) + + final def receiveReplay: Receive = { + case msg ⇒ onReceiveReplay(msg) + } + + final def receiveCommand: Receive = { + case msg ⇒ onReceiveCommand(msg) + } + + /** + * Java API. + * + * Asynchronously persists `event`. On successful persistence, `handler` is called with the + * persisted event. It is guaranteed that no new commands will be received by a processor + * between a call to `persist` and the execution of its `handler`. This also holds for + * multiple `persist` calls per received command. Internally, this is achieved by stashing new + * commands and unstashing them when the `event` has been persisted and handled. The stash used + * for that is an internal stash which doesn't interfere with the user stash inherited from + * [[UntypedProcessor]]. + * + * An event `handler` may close over processor state and modify it. The `getSender()` of a persisted + * event is the sender of the corresponding command. This means that one can reply to a command + * sender within an event `handler`. + * + * Within an event handler, applications usually update processor state using persisted event + * data, notify listeners and reply to command senders. + * + * If persistence of an event fails, the processor will be stopped. This can be customized by + * handling [[PersistenceFailure]] in [[onReceiveCommand]]. + * + * @param event event to be persisted. + * @param handler handler for each persisted `event` + */ + final def persist[A](event: A, handler: Procedure[A]): Unit = + persist(event)(event ⇒ handler(event)) + + /** + * Java API. + * + * Asynchronously persists `events` in specified order. This is equivalent to calling + * `persist[A](event: A, handler: Procedure[A])` multiple times with the same `handler`. + * + * @param events events to be persisted. + * @param handler handler for each persisted `events` + */ + final def persist[A](events: JIterable[A], handler: Procedure[A]): Unit = + persist(Util.immutableSeq(events))(event ⇒ handler(event)) + + /** + * Java API. + * + * Replay handler that receives persisted events during recovery. If a state snapshot + * has been captured and saved, this handler will receive a [[SnapshotOffer]] message + * followed by events that are younger than the offered snapshot. + * + * This handler must not have side-effects other than changing processor state i.e. it + * should not perform actions that may fail, such as interacting with external services, + * for example. + * + * @see [[Recover]] + */ + def onReceiveReplay(msg: Any): Unit + + /** + * Java API. + * + * Command handler. Typically validates commands against current state (and/or by + * communication with other actors). On successful validation, one or more events are + * derived from a command and these events are then persisted by calling `persist`. + * Commands sent to event sourced processors should not be [[Persistent]] messages. + */ + def onReceiveCommand(msg: Any): Unit +} \ No newline at end of file diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index 5aa94d741d..7d8b33d95f 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -136,7 +136,8 @@ object PersistentImpl { } /** - * Received by a processor when a journal failed to write a [[Persistent]] message. + * Sent to a [[Processor]] when a journal failed to write a [[Persistent]] message. If + * not handled, an `akka.actor.ActorKilledException` is thrown by that processor. * * @param payload payload of the persistent message. * @param sequenceNr sequence number of the persistent message. diff --git a/akka-persistence/src/main/scala/akka/persistence/Processor.scala b/akka-persistence/src/main/scala/akka/persistence/Processor.scala index 5592b68df4..0e1244075a 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Processor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Processor.scala @@ -4,16 +4,9 @@ package akka.persistence -import akka.AkkaException import akka.actor._ import akka.dispatch._ -/** - * Thrown by a [[Processor]] if a journal failed to replay all requested messages. - */ -@SerialVersionUID(1L) -case class ReplayFailureException(message: String, cause: Throwable) extends AkkaException(message, cause) - /** * An actor that persists (journals) messages of type [[Persistent]]. Messages of other types are not persisted. * @@ -76,15 +69,8 @@ trait Processor extends Actor with Stash { protected def process(receive: Actor.Receive, message: Any) = receive.applyOrElse(message, unhandled) - protected def processPersistent(receive: Actor.Receive, persistent: Persistent) = try { - _currentPersistent = persistent - updateLastSequenceNr(persistent) - receive.applyOrElse(persistent, unhandled) - } finally _currentPersistent = null - - protected def updateLastSequenceNr(persistent: Persistent) { - if (persistent.sequenceNr > _lastSequenceNr) _lastSequenceNr = persistent.sequenceNr - } + protected def processPersistent(receive: Actor.Receive, persistent: Persistent) = + withCurrentPersistent(persistent)(receive.applyOrElse(_, unhandled)) } /** @@ -98,7 +84,7 @@ trait Processor extends Actor with Stash { _currentState = recoveryStarted snapshotStore ! LoadSnapshot(processorId, fromSnap, toSnr) } - case _ ⇒ stashInternal() + case _ ⇒ processorStash.stash() } } @@ -123,11 +109,15 @@ trait Processor extends Actor with Stash { case ReplaySuccess(maxSnr) ⇒ { _currentState = recoverySucceeded _sequenceNr = maxSnr - unstashAllInternal() + processorStash.unstashAll() } case ReplayFailure(cause) ⇒ { - val errorMsg = s"Replay failure by journal (processor id = [${processorId}])" - throw new ReplayFailureException(errorMsg, cause) + val notification = RecoveryFailure(cause) + if (receive.isDefinedAt(notification)) process(receive, notification) + else { + val errorMsg = s"Replay failure by journal (processor id = [${processorId}])" + throw new RecoveryFailureException(errorMsg, cause) + } } case Replayed(p) ⇒ try { processPersistent(receive, p) } catch { case t: Throwable ⇒ { @@ -137,7 +127,7 @@ trait Processor extends Actor with Stash { } } case r: Recover ⇒ // ignore - case _ ⇒ stashInternal() + case _ ⇒ processorStash.stash() } } @@ -182,7 +172,7 @@ trait Processor extends Actor with Stash { } case Replayed(p) ⇒ updateLastSequenceNr(p) case r: Recover ⇒ // ignore - case _ ⇒ stashInternal() + case _ ⇒ processorStash.stash() } } @@ -263,7 +253,23 @@ trait Processor extends Actor with Stash { /** * INTERNAL API. */ - final override protected[akka] def aroundReceive(receive: Actor.Receive, message: Any): Unit = { + protected[persistence] def withCurrentPersistent(persistent: Persistent)(body: Persistent ⇒ Unit): Unit = try { + _currentPersistent = persistent + updateLastSequenceNr(persistent) + body(persistent) + } finally _currentPersistent = null + + /** + * INTERNAL API. + */ + protected[persistence] def updateLastSequenceNr(persistent: Persistent) { + if (persistent.sequenceNr > _lastSequenceNr) _lastSequenceNr = persistent.sequenceNr + } + + /** + * INTERNAL API. + */ + override protected[akka] def aroundReceive(receive: Actor.Receive, message: Any): Unit = { _currentState.aroundReceive(receive, message) } @@ -287,7 +293,7 @@ trait Processor extends Actor with Stash { final override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = { try { unstashAll(unstashFilterPredicate) - unstashAllInternal() + processorStash.unstashAll() } finally { message match { case Some(WriteSuccess(m)) ⇒ preRestartDefault(reason, Some(m)) @@ -335,27 +341,44 @@ trait Processor extends Actor with Stash { // Processor-internal stash // ----------------------------------------------------- - private def unstashFilterPredicate: Any ⇒ Boolean = { + private val unstashFilterPredicate: Any ⇒ Boolean = { case _: WriteSuccess ⇒ false case _: Replayed ⇒ false case _ ⇒ true } - private var processorStash = Vector.empty[Envelope] - - private def stashInternal(): Unit = { - processorStash :+= currentEnvelope - } - - private def unstashAllInternal(): Unit = try { - val i = processorStash.reverseIterator - while (i.hasNext) mailbox.enqueueFirst(self, i.next()) - } finally { - processorStash = Vector.empty[Envelope] - } + private val processorStash = + createProcessorStash private def currentEnvelope: Envelope = context.asInstanceOf[ActorCell].currentMessage + + /** + * INTERNAL API. + */ + private[persistence] def createProcessorStash = new ProcessorStash { + var theStash = Vector.empty[Envelope] + + def stash(): Unit = + theStash :+= currentEnvelope + + def unstashAll(): Unit = try { + val i = theStash.reverseIterator + while (i.hasNext) mailbox.enqueueFirst(self, i.next()) + } finally { + theStash = Vector.empty[Envelope] + } + } +} + +/** + * INTERNAL API. + * + * Processor specific stash used internally to avoid interference with user stash. + */ +private[persistence] trait ProcessorStash { + def stash() + def unstashAll() } /** @@ -421,3 +444,4 @@ abstract class UntypedProcessor extends UntypedActor with Processor { */ def getCurrentPersistentMessage = currentPersistentMessage.getOrElse(null) } + diff --git a/akka-persistence/src/main/scala/akka/persistence/Recover.scala b/akka-persistence/src/main/scala/akka/persistence/Recover.scala index 08b88878d1..5e0efa4bb1 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Recover.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Recover.scala @@ -4,6 +4,8 @@ package akka.persistence +import akka.AkkaException + /** * Instructs a processor to recover itself. Recovery will start from a snapshot if the processor has * previously saved one or more snapshots and at least one of these snapshots matches the specified @@ -53,3 +55,17 @@ object Recover { def create(fromSnapshot: SnapshotSelectionCriteria, toSequenceNr: Long) = Recover(fromSnapshot, toSequenceNr) } + +/** + * Sent to a [[Processor]] after failed recovery. If not handled, a + * [[RecoveryFailureException]] is thrown by that processor. + */ +@SerialVersionUID(1L) +case class RecoveryFailure(cause: Throwable) + +/** + * Thrown by a [[Processor]] if a journal failed to replay all requested messages. + */ +@SerialVersionUID(1L) +case class RecoveryFailureException(message: String, cause: Throwable) extends AkkaException(message, cause) + diff --git a/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala b/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala index 4a5b7ece69..9b1f4713ba 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala @@ -16,7 +16,7 @@ case class SnapshotMetadata(processorId: String, sequenceNr: Long, timestamp: Lo //#snapshot-metadata /** - * Notification of a snapshot saving success. + * Sent to a [[Processor]] after successful saving of a snapshot. * * @param metadata snapshot metadata. */ @@ -24,7 +24,7 @@ case class SnapshotMetadata(processorId: String, sequenceNr: Long, timestamp: Lo case class SaveSnapshotSuccess(metadata: SnapshotMetadata) /** - * Notification of a snapshot saving success failure. + * Sent to a [[Processor]] after failed saving of a snapshot. * * @param metadata snapshot metadata. * @param cause failure cause. diff --git a/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala b/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala new file mode 100644 index 0000000000..45a4be11b4 --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala @@ -0,0 +1,293 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.persistence + +import scala.collection.immutable.Seq + +import com.typesafe.config.Config + +import akka.actor._ +import akka.testkit.{ ImplicitSender, AkkaSpec } + +object EventsourcedSpec { + case class Cmd(data: Any) + case class Evt(data: Any) + + abstract class ExampleProcessor(name: String) extends NamedProcessor(name) with EventsourcedProcessor { + var events: List[Any] = Nil + + val updateState: Receive = { + case Evt(data) ⇒ events = data :: events + } + + val commonBehavior: Receive = { + case "boom" ⇒ throw new Exception("boom") + case GetState ⇒ sender ! events.reverse + } + + def receiveReplay = updateState + } + + class Behavior1Processor(name: String) extends ExampleProcessor(name) { + val receiveCommand: Receive = commonBehavior orElse { + case Cmd(data) ⇒ { + persist(Seq(Evt(s"${data}-1"), Evt(s"${data}-2")))(updateState) + } + } + } + + class Behavior2Processor(name: String) extends ExampleProcessor(name) { + val receiveCommand: Receive = commonBehavior orElse { + case Cmd(data) ⇒ { + persist(Seq(Evt(s"${data}-1"), Evt(s"${data}-2")))(updateState) + persist(Seq(Evt(s"${data}-3"), Evt(s"${data}-4")))(updateState) + } + } + } + + class Behavior3Processor(name: String) extends ExampleProcessor(name) { + val receiveCommand: Receive = commonBehavior orElse { + case Cmd(data) ⇒ { + persist(Seq(Evt(s"${data}-11"), Evt(s"${data}-12")))(updateState) + updateState(Evt(s"${data}-10")) + } + } + } + + class ChangeBehaviorInLastEventHandlerProcessor(name: String) extends ExampleProcessor(name) { + val newBehavior: Receive = { + case Cmd(data) ⇒ { + persist(Evt(s"${data}-21"))(updateState) + persist(Evt(s"${data}-22")) { event ⇒ + updateState(event) + context.unbecome() + } + } + } + + val receiveCommand: Receive = commonBehavior orElse { + case Cmd(data) ⇒ { + persist(Evt(s"${data}-0")) { event ⇒ + updateState(event) + context.become(newBehavior) + } + } + } + } + + class ChangeBehaviorInFirstEventHandlerProcessor(name: String) extends ExampleProcessor(name) { + val newBehavior: Receive = { + case Cmd(data) ⇒ { + persist(Evt(s"${data}-21")) { event ⇒ + updateState(event) + context.unbecome() + } + persist(Evt(s"${data}-22"))(updateState) + } + } + + val receiveCommand: Receive = commonBehavior orElse { + case Cmd(data) ⇒ { + persist(Evt(s"${data}-0")) { event ⇒ + updateState(event) + context.become(newBehavior) + } + } + } + } + + class ChangeBehaviorInCommandHandlerFirstProcessor(name: String) extends ExampleProcessor(name) { + val newBehavior: Receive = { + case Cmd(data) ⇒ { + context.unbecome() + persist(Seq(Evt(s"${data}-31"), Evt(s"${data}-32")))(updateState) + updateState(Evt(s"${data}-30")) + } + } + + val receiveCommand: Receive = commonBehavior orElse { + case Cmd(data) ⇒ { + context.become(newBehavior) + persist(Evt(s"${data}-0"))(updateState) + } + } + } + + class ChangeBehaviorInCommandHandlerLastProcessor(name: String) extends ExampleProcessor(name) { + val newBehavior: Receive = { + case Cmd(data) ⇒ { + persist(Seq(Evt(s"${data}-31"), Evt(s"${data}-32")))(updateState) + updateState(Evt(s"${data}-30")) + context.unbecome() + } + } + + val receiveCommand: Receive = commonBehavior orElse { + case Cmd(data) ⇒ { + persist(Evt(s"${data}-0"))(updateState) + context.become(newBehavior) + } + } + } + + class SnapshottingEventsourcedProcessor(name: String, probe: ActorRef) extends ExampleProcessor(name) { + override def receiveReplay = super.receiveReplay orElse { + case SnapshotOffer(_, events: List[_]) ⇒ { + probe ! "offered" + this.events = events + } + } + + val receiveCommand: Receive = commonBehavior orElse { + case Cmd(data) ⇒ { + persist(Seq(Evt(s"${data}-41"), Evt(s"${data}-42")))(updateState) + } + case SaveSnapshotSuccess(_) ⇒ probe ! "saved" + case "snap" ⇒ saveSnapshot(events) + } + } + + class ReplyInEventHandlerProcessor(name: String) extends ExampleProcessor(name) { + val receiveCommand: Receive = { + case Cmd("a") ⇒ persist(Evt("a"))(evt ⇒ sender ! evt.data) + } + } + + class UserStashProcessor(name: String) extends ExampleProcessor(name) { + var stashed = false + val receiveCommand: Receive = { + case Cmd("a") ⇒ if (!stashed) { stash(); stashed = true } else sender ! "a" + case Cmd("b") ⇒ persist(Evt("b"))(evt ⇒ sender ! evt.data) + case Cmd("c") ⇒ unstashAll(); sender ! "c" + } + } + + class AnyValEventProcessor(name: String) extends ExampleProcessor(name) { + val receiveCommand: Receive = { + case Cmd("a") ⇒ persist(5)(evt ⇒ sender ! evt) + } + } +} + +abstract class EventsourcedSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender { + import EventsourcedSpec._ + + override protected def beforeEach() { + super.beforeEach() + + val processor = namedProcessor[Behavior1Processor] + processor ! Cmd("a") + processor ! GetState + expectMsg(List("a-1", "a-2")) + } + + "An eventsourced processor" must { + "recover from persisted events" in { + val processor = namedProcessor[Behavior1Processor] + processor ! GetState + expectMsg(List("a-1", "a-2")) + } + "handle multiple emitted events in correct order (for a single persist call)" in { + val processor = namedProcessor[Behavior1Processor] + processor ! Cmd("b") + processor ! GetState + expectMsg(List("a-1", "a-2", "b-1", "b-2")) + } + "handle multiple emitted events in correct order (for multiple persist calls)" in { + val processor = namedProcessor[Behavior2Processor] + processor ! Cmd("b") + processor ! GetState + expectMsg(List("a-1", "a-2", "b-1", "b-2", "b-3", "b-4")) + } + "receive emitted events immediately after command" in { + val processor = namedProcessor[Behavior3Processor] + processor ! Cmd("b") + processor ! Cmd("c") + processor ! GetState + expectMsg(List("a-1", "a-2", "b-10", "b-11", "b-12", "c-10", "c-11", "c-12")) + } + "recover on command failure" in { + val processor = namedProcessor[Behavior3Processor] + processor ! Cmd("b") + processor ! "boom" + processor ! Cmd("c") + processor ! GetState + // cmd that was added to state before failure (b-10) is not replayed ... + expectMsg(List("a-1", "a-2", "b-11", "b-12", "c-10", "c-11", "c-12")) + } + "allow behavior changes in event handler (when handling first event)" in { + val processor = namedProcessor[ChangeBehaviorInFirstEventHandlerProcessor] + processor ! Cmd("b") + processor ! Cmd("c") + processor ! Cmd("d") + processor ! Cmd("e") + processor ! GetState + expectMsg(List("a-1", "a-2", "b-0", "c-21", "c-22", "d-0", "e-21", "e-22")) + } + "allow behavior changes in event handler (when handling last event)" in { + val processor = namedProcessor[ChangeBehaviorInLastEventHandlerProcessor] + processor ! Cmd("b") + processor ! Cmd("c") + processor ! Cmd("d") + processor ! Cmd("e") + processor ! GetState + expectMsg(List("a-1", "a-2", "b-0", "c-21", "c-22", "d-0", "e-21", "e-22")) + } + "allow behavior changes in command handler (as first action)" in { + val processor = namedProcessor[ChangeBehaviorInCommandHandlerFirstProcessor] + processor ! Cmd("b") + processor ! Cmd("c") + processor ! Cmd("d") + processor ! Cmd("e") + processor ! GetState + expectMsg(List("a-1", "a-2", "b-0", "c-30", "c-31", "c-32", "d-0", "e-30", "e-31", "e-32")) + } + "allow behavior changes in command handler (as last action)" in { + val processor = namedProcessor[ChangeBehaviorInCommandHandlerLastProcessor] + processor ! Cmd("b") + processor ! Cmd("c") + processor ! Cmd("d") + processor ! Cmd("e") + processor ! GetState + expectMsg(List("a-1", "a-2", "b-0", "c-30", "c-31", "c-32", "d-0", "e-30", "e-31", "e-32")) + } + "support snapshotting" in { + val processor1 = system.actorOf(Props(classOf[SnapshottingEventsourcedProcessor], name, testActor)) + processor1 ! Cmd("b") + processor1 ! "snap" + processor1 ! Cmd("c") + expectMsg("saved") + processor1 ! GetState + expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42")) + + val processor2 = system.actorOf(Props(classOf[SnapshottingEventsourcedProcessor], name, testActor)) + expectMsg("offered") + processor2 ! GetState + expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42")) + } + "be able to reply within an event handler" in { + val processor = namedProcessor[ReplyInEventHandlerProcessor] + processor ! Cmd("a") + expectMsg("a") + } + "not interfere with the user stash" in { + val processor = namedProcessor[UserStashProcessor] + processor ! Cmd("a") + processor ! Cmd("b") + processor ! Cmd("c") + expectMsg("b") + expectMsg("c") + expectMsg("a") + } + "be able to persist events that extend AnyVal" in { + val processor = namedProcessor[AnyValEventProcessor] + processor ! Cmd("a") + expectMsg(5) + } + } +} + +class LeveldbEventsourcedSpec extends EventsourcedSpec(PersistenceSpec.config("leveldb", "eventsourced")) +class InmemEventsourcedSpec extends EventsourcedSpec(PersistenceSpec.config("inmem", "eventsourced")) diff --git a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala index dce19ea370..17a2d2633d 100644 --- a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala @@ -123,18 +123,15 @@ object MessageSerializerRemotingSpec { } } - def port(system: ActorSystem, protocol: String) = - addr(system, protocol).port.get - - def addr(system: ActorSystem, protocol: String) = - system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + def port(system: ActorSystem) = + system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress.port.get } class MessageSerializerRemotingSpec extends AkkaSpec(config(systemA).withFallback(config(customSerializers, remoteCommon))) with ImplicitSender { import MessageSerializerRemotingSpec._ val remoteSystem = ActorSystem("remote", config(systemB).withFallback(config(customSerializers, remoteCommon))) - val localActor = system.actorOf(Props(classOf[LocalActor], port(remoteSystem, "tcp"))) + val localActor = system.actorOf(Props(classOf[LocalActor], port(remoteSystem))) override protected def atStartup() { remoteSystem.actorOf(Props[RemoteActor], "remote") diff --git a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/EventsourcedExample.java b/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/EventsourcedExample.java new file mode 100644 index 0000000000..297a1c9a63 --- /dev/null +++ b/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/EventsourcedExample.java @@ -0,0 +1,137 @@ +package sample.persistence.japi; + +//#eventsourced-example +import java.io.Serializable; +import java.util.ArrayList; + +import akka.actor.*; +import akka.japi.Procedure; +import akka.persistence.*; + +import static java.util.Arrays.asList; + +class Cmd implements Serializable { + private final String data; + + public Cmd(String data) { + this.data = data; + } + + public String getData() { + return data; + } +} + +class Evt implements Serializable { + private final String data; + + public Evt(String data) { + this.data = data; + } + + public String getData() { + return data; + } +} + +class ExampleState implements Serializable { + private final ArrayList events; + + public ExampleState() { + this(new ArrayList()); + } + + public ExampleState(ArrayList events) { + this.events = events; + } + + public ExampleState copy() { + return new ExampleState(new ArrayList(events)); + } + + public void update(Evt evt) { + events.add(evt.getData()); + } + + public int size() { + return events.size(); + } + + @Override + public String toString() { + return events.toString(); + } +} + +class ExampleProcessor extends UntypedEventsourcedProcessor { + private ExampleState state = new ExampleState(); + + public int getNumEvents() { + return state.size(); + } + + public void onReceiveReplay(Object msg) { + if (msg instanceof Evt) { + state.update((Evt) msg); + } else if (msg instanceof SnapshotOffer) { + state = (ExampleState)((SnapshotOffer)msg).snapshot(); + } + } + + public void onReceiveCommand(Object msg) { + if (msg instanceof Cmd) { + final String data = ((Cmd)msg).getData(); + final Evt evt1 = new Evt(data + "-" + getNumEvents()); + final Evt evt2 = new Evt(data + "-" + (getNumEvents() + 1)); + persist(asList(evt1, evt2), new Procedure() { + public void apply(Evt evt) throws Exception { + state.update(evt); + if (evt.equals(evt2)) { + getContext().system().eventStream().publish(evt); + if (data.equals("foo")) getContext().become(otherCommandHandler); + } + } + }); + } else if (msg.equals("snap")) { + // IMPORTANT: create a copy of snapshot + // because ExampleState is mutable !!! + saveSnapshot(state.copy()); + } else if (msg.equals("print")) { + System.out.println(state); + } + } + + private Procedure otherCommandHandler = new Procedure() { + public void apply(Object msg) throws Exception { + if (msg instanceof Cmd && ((Cmd)msg).getData().equals("bar")) { + persist(new Evt("bar-" + getNumEvents()), new Procedure() { + public void apply(Evt event) throws Exception { + state.update(event); + getContext().unbecome(); + } + }); + unstashAll(); + } else { + stash(); + } + } + }; +} +//#eventsourced-example + +public class EventsourcedExample { + public static void main(String... args) throws Exception { + final ActorSystem system = ActorSystem.create("example"); + final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-4-java"); + + processor.tell(new Cmd("foo"), null); + processor.tell(new Cmd("baz"), null); + processor.tell(new Cmd("bar"), null); + processor.tell("snap", null); + processor.tell(new Cmd("buzz"), null); + processor.tell("print", null); + + Thread.sleep(1000); + system.shutdown(); + } +} diff --git a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/SnapshotExample.java b/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/SnapshotExample.java index e58be657fe..5f5055b7f1 100644 --- a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/SnapshotExample.java +++ b/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/SnapshotExample.java @@ -51,6 +51,8 @@ public class SnapshotExample { } else if (message.equals("print")) { System.out.println("current state = " + state); } else if (message.equals("snap")) { + // IMPORTANT: create a copy of snapshot + // because ExampleState is mutable !!! saveSnapshot(state.copy()); } } diff --git a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/EventsourcedExample.scala b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/EventsourcedExample.scala new file mode 100644 index 0000000000..d357a26a66 --- /dev/null +++ b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/EventsourcedExample.scala @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package sample.persistence + +//#eventsourced-example +import akka.actor._ +import akka.persistence._ + +case class Cmd(data: String) +case class Evt(data: String) + +case class ExampleState(events: List[String] = Nil) { + def update(evt: Evt) = copy(evt.data :: events) + def size = events.length + override def toString: String = events.reverse.toString +} + +class ExampleProcessor extends EventsourcedProcessor { + var state = ExampleState() + + def updateState(event: Evt): Unit = + state = state.update(event) + + def numEvents = + state.size + + val receiveReplay: Receive = { + case evt: Evt ⇒ updateState(evt) + case SnapshotOffer(_, snapshot: ExampleState) ⇒ state = snapshot + } + + val receiveCommand: Receive = { + case Cmd(data) ⇒ { + persist(Evt(s"${data}-${numEvents}"))(updateState) + persist(Evt(s"${data}-${numEvents + 1}")) { event ⇒ + updateState(event) + context.system.eventStream.publish(event) + if (data == "foo") context.become(otherCommandHandler) + } + } + case "snap" ⇒ saveSnapshot(state) + case "print" ⇒ println(state) + } + + val otherCommandHandler: Receive = { + case Cmd("bar") ⇒ { + persist(Evt(s"bar-${numEvents}")) { event ⇒ + updateState(event) + context.unbecome() + } + unstashAll() + } + case other ⇒ stash() + } +} +//#eventsourced-example + +object EventsourcedExample extends App { + + val system = ActorSystem("example") + val processor = system.actorOf(Props[ExampleProcessor], "processor-4-scala") + + processor ! Cmd("foo") + processor ! Cmd("baz") // will be stashed + processor ! Cmd("bar") + processor ! "snap" + processor ! Cmd("buzz") + processor ! "print" + + Thread.sleep(1000) + system.shutdown() +}