Merge pull request #1766 from eligosource/wip-3661-event-sourcing-krasserm

+per #3661 Event sourcing support
This commit is contained in:
Patrik Nordwall 2013-10-16 22:55:47 -07:00
commit f194860dfb
13 changed files with 948 additions and 71 deletions

View file

@ -295,19 +295,8 @@ components may consume the event stream as a means to replicate the component
state on a different continent or to react to changes). If the components
state is lost—due to a machine failure or by being pushed out of a cache—it can
easily be reconstructed by replaying the event stream (usually employing
snapshots to speed up the process). Read a lot more about `Event Sourcing`_.
Martin Krasser has written an implementation of event sourcing principles on
top of Akka called `eventsourced`_, including support for guaranteed delivery
semantics as described in the previous section.
A successor of `eventsourced` is now part of Akka (see :ref:`persistence`) which
is a general solution for actor state persistence. It journals messages before
they are received by an actor and can be used to implement both event sourcing
and command sourcing.
.. _Event Sourcing: http://martinfowler.com/eaaDev/EventSourcing.html
.. _eventsourced: https://github.com/eligosource/eventsourced
snapshots to speed up the process). :ref:`event-sourcing` is supported by
Akka (see :ref:`persistence`).
Mailbox with Explicit Acknowledgement
-------------------------------------

View file

@ -57,12 +57,6 @@ Architecture
* *Snapshot store*: A snapshot store persists snapshots of a processor's internal state. Snapshots are used for
optimizing recovery times. The storage backend of a snapshot store is pluggable.
Use cases
=========
* TODO: describe command sourcing
* TODO: describe event sourcing
Configuration
=============
@ -271,6 +265,59 @@ If not specified, they default to ``SnapshotSelectionCriteria.latest()`` which s
To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.none()``. A recovery where no
saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages.
Event sourcing
==============
In all the examples so far, messages that change a processor's state have been sent as ``Persistent`` messages
by an application, so that they can be replayed during recovery. From this point of view, the journal acts as
a write-ahead-log for whatever ``Persistent`` messages a processor receives. This is also known as *command
sourcing*. Commands, however, may fail and some applications cannot tolerate command failures during recovery.
For these applications `Event Sourcing`_ is a better choice. Applied to Akka persistence, the basic idea behind
event sourcing is quite simple. A processor receives a (non-persistent) command which is first validated if it
can be applied to the current state. Here, validation can mean anything, from simple inspection of a command
message's fields up to a conversation with several external services, for example. If validation succeeds, events
are generated from the command, representing the effect of the command. These events are then persisted and, after
successful persistence, used to change a processor's state. When the processor needs to be recovered, only the
persisted events are replayed of which we know that they can be successfully applied. In other words, events
cannot fail when being replayed to a processor, in contrast to commands. Eventsourced processors may of course
also process commands that do not change application state, such as query commands, for example.
.. _Event Sourcing: http://martinfowler.com/eaaDev/EventSourcing.html
Akka persistence supports event sourcing with the abstract ``UntypedEventsourcedProcessor`` class (which implements
event sourcing as a pattern on top of command sourcing). A processor that extends this abstract class does not handle
``Persistent`` messages directly but uses the ``persist`` method to persist and handle events. The behavior of an
``UntypedEventsourcedProcessor`` is defined by implementing ``onReceiveReplay`` and ``onReceiveCommand``. This is
best explained with an example (which is also part of ``akka-sample-persistence``).
.. includecode:: ../../../akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/EventsourcedExample.java#eventsourced-example
The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The
``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``.
The processor's ``onReceiveReplay`` method defines how ``state`` is updated during recovery by handling ``Evt``
and ``SnapshotOffer`` messages. The processor's ``onReceiveCommand`` method is a command handler. In this example,
a command is handled by generating two events which are then persisted and handled. Events are persisted by calling
``persist`` with an event (or a sequence of events) as first argument and an event handler as second argument.
The ``persist`` method persists events asynchronously and the event handler is executed for successfully persisted
events. Successfully persisted events are internally sent back to the processor as separate messages which trigger
the event handler execution. An event handler may therefore close over processor state and mutate it. The sender
of a persisted event is the sender of the corresponding command. This allows event handlers to reply to the sender
of a command (not shown).
The main responsibility of an event handler is changing processor state using event data and notifying others
about successful state changes by publishing events.
When persisting events with ``persist`` it is guaranteed that the processor will not receive new commands between
the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist``
calls in context of a single command.
The example also demonstrates how to change the processor's default behavior, defined by ``onReceiveCommand``, to
another behavior, defined by ``otherCommandHandler``, and back using ``getContext().become()`` and
``getContext().unbecome()``. See also the API docs of ``persist`` for further details.
Storage plugins
===============

View file

@ -53,12 +53,6 @@ Architecture
* *Snapshot store*: A snapshot store persists snapshots of a processor's internal state. Snapshots are used for
optimizing recovery times. The storage backend of a snapshot store is pluggable.
Use cases
=========
* TODO: describe command sourcing
* TODO: describe event sourcing
Configuration
=============
@ -282,6 +276,61 @@ If not specified, they default to ``SnapshotSelectionCriteria.Latest`` which sel
To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.None``. A recovery where no
saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages.
.. _event-sourcing:
Event sourcing
==============
In all the examples so far, messages that change a processor's state have been sent as ``Persistent`` messages
by an application, so that they can be replayed during recovery. From this point of view, the journal acts as
a write-ahead-log for whatever ``Persistent`` messages a processor receives. This is also known as *command
sourcing*. Commands, however, may fail and some applications cannot tolerate command failures during recovery.
For these applications `Event Sourcing`_ is a better choice. Applied to Akka persistence, the basic idea behind
event sourcing is quite simple. A processor receives a (non-persistent) command which is first validated if it
can be applied to the current state. Here, validation can mean anything, from simple inspection of a command
message's fields up to a conversation with several external services, for example. If validation succeeds, events
are generated from the command, representing the effect of the command. These events are then persisted and, after
successful persistence, used to change a processor's state. When the processor needs to be recovered, only the
persisted events are replayed of which we know that they can be successfully applied. In other words, events
cannot fail when being replayed to a processor, in contrast to commands. Eventsourced processors may of course
also process commands that do not change application state, such as query commands, for example.
.. _Event Sourcing: http://martinfowler.com/eaaDev/EventSourcing.html
Akka persistence supports event sourcing with the ``EventsourcedProcessor`` trait (which implements event sourcing
as a pattern on top of command sourcing). A processor that extends this trait does not handle ``Persistent`` messages
directly but uses the ``persist`` method to persist and handle events. The behavior of an ``EventsourcedProcessor``
is defined by implementing ``receiveReplay`` and ``receiveCommand``. This is best explained with an example (which
is also part of ``akka-sample-persistence``).
.. includecode:: ../../../akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/EventsourcedExample.scala#eventsourced-example
The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The
``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``.
The processor's ``receiveReplay`` method defines how ``state`` is updated during recovery by handling ``Evt``
and ``SnapshotOffer`` messages. The processor's ``receiveCommand`` method is a command handler. In this example,
a command is handled by generating two events which are then persisted and handled. Events are persisted by calling
``persist`` with an event (or a sequence of events) as first argument and an event handler as second argument.
The ``persist`` method persists events asynchronously and the event handler is executed for successfully persisted
events. Successfully persisted events are internally sent back to the processor as separate messages which trigger
the event handler execution. An event handler may therefore close over processor state and mutate it. The sender
of a persisted event is the sender of the corresponding command. This allows event handlers to reply to the sender
of a command (not shown).
The main responsibility of an event handler is changing processor state using event data and notifying others
about successful state changes by publishing events.
When persisting events with ``persist`` it is guaranteed that the processor will not receive new commands between
the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist``
calls in context of a single command.
The example also demonstrates how to change the processor's default behavior, defined by ``receiveCommand``, to
another behavior, defined by ``otherCommandHandler``, and back using ``context.become()`` and ``context.unbecome()``.
See also the API docs of ``persist`` for further details.
Storage plugins
===============

View file

@ -0,0 +1,248 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import java.lang.{ Iterable JIterable }
import scala.collection.immutable
import akka.japi.{ Procedure, Util }
import akka.persistence.JournalProtocol._
/**
* INTERNAL API.
*
* Event sourcing mixin for a [[Processor]].
*/
private[persistence] trait Eventsourced extends Processor {
private trait State {
def aroundReceive(receive: Receive, message: Any): Unit
}
/**
* Command processing state. If event persistence is pending after processing a
* command, event persistence is triggered and state changes to `persistingEvents`.
*/
private val processingCommands: State = new State {
def aroundReceive(receive: Receive, message: Any) = message match {
case m if (persistInvocations.isEmpty) {
Eventsourced.super.aroundReceive(receive, m)
if (!persistInvocations.isEmpty) {
persistInvocations = persistInvocations.reverse
persistCandidates = persistCandidates.reverse
persistCandidates.foreach(self forward Persistent(_))
currentState = persistingEvents
}
}
}
}
/**
* Event persisting state. Remains until pending events are persisted and then changes
* state to `processingCommands`. Only events to be persisted are processed. All other
* messages are stashed internally.
*/
private val persistingEvents: State = new State {
def aroundReceive(receive: Receive, message: Any) = message match {
case p: PersistentImpl if identical(p.payload, persistCandidates.head) {
Eventsourced.super.aroundReceive(receive, message)
persistCandidates = persistCandidates.tail
}
case WriteSuccess(p) if identical(p.payload, persistInvocations.head._1) {
withCurrentPersistent(p)(p persistInvocations.head._2(p.payload))
onWriteComplete()
}
case e @ WriteFailure(p, _) if identical(p.payload, persistInvocations.head._1) {
Eventsourced.super.aroundReceive(receive, message) // stops actor by default
onWriteComplete()
}
case other processorStash.stash()
}
def onWriteComplete(): Unit = {
persistInvocations = persistInvocations.tail
if (persistInvocations.isEmpty) {
currentState = processingCommands
processorStash.unstashAll()
}
}
def identical(a: Any, b: Any): Boolean =
a.asInstanceOf[AnyRef] eq b.asInstanceOf[AnyRef]
}
private var persistInvocations: List[(Any, Any Unit)] = Nil
private var persistCandidates: List[Any] = Nil
private var currentState: State = processingCommands
private val processorStash = createProcessorStash
/**
* Asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event. It is guaranteed that no new commands will be received by a processor
* between a call to `persist` and the execution of its `handler`. This also holds for
* multiple `persist` calls per received command. Internally, this is achieved by stashing new
* commands and unstashing them when the `event` has been persisted and handled. The stash used
* for that is an internal stash which doesn't interfere with the user stash inherited from
* [[Processor]].
*
* An event `handler` may close over processor state and modify it. The `sender` of a persisted
* event is the sender of the corresponding command. This means that one can reply to a command
* sender within an event `handler`.
*
* Within an event handler, applications usually update processor state using persisted event
* data, notify listeners and reply to command senders.
*
* If persistence of an event fails, the processor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[receiveCommand]].
*
* @param event event to be persisted.
* @param handler handler for each persisted `event`
*/
final def persist[A](event: A)(handler: A Unit): Unit = {
persistInvocations = (event, handler.asInstanceOf[Any Unit]) :: persistInvocations
persistCandidates = event :: persistCandidates
}
/**
* Asynchronously persists `events` in specified order. This is equivalent to calling
* `persist[A](event: A)(handler: A => Unit)` multiple times with the same `handler`.
*
* @param events events to be persisted.
* @param handler handler for each persisted `events`
*/
final def persist[A](events: immutable.Seq[A])(handler: A Unit): Unit =
events.foreach(persist(_)(handler))
/**
* Replay handler that receives persisted events during recovery. If a state snapshot
* has been captured and saved, this handler will receive a [[SnapshotOffer]] message
* followed by events that are younger than the offered snapshot.
*
* This handler must not have side-effects other than changing processor state i.e. it
* should not perform actions that may fail, such as interacting with external services,
* for example.
*
* @see [[Recover]]
*/
def receiveReplay: Receive
/**
* Command handler. Typically validates commands against current state (and/or by
* communication with other actors). On successful validation, one or more events are
* derived from a command and these events are then persisted by calling `persist`.
* Commands sent to event sourced processors should not be [[Persistent]] messages.
*/
def receiveCommand: Receive
/**
* INTERNAL API.
*/
final override protected[akka] def aroundReceive(receive: Receive, message: Any) {
currentState.aroundReceive(receive, message)
}
/**
* INTERNAL API.
*/
protected[persistence] val initialBehavior: Receive = {
case Persistent(payload, _) if receiveReplay.isDefinedAt(payload) && recoveryRunning
receiveReplay(payload)
case s: SnapshotOffer if receiveReplay.isDefinedAt(s)
receiveReplay(s)
case f: RecoveryFailure if receiveReplay.isDefinedAt(f)
receiveReplay(f)
case msg if receiveCommand.isDefinedAt(msg)
receiveCommand(msg)
}
}
/**
* An event sourced processor.
*/
trait EventsourcedProcessor extends Processor with Eventsourced {
final def receive = initialBehavior
}
/**
* Java API.
*
* An event sourced processor.
*/
abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Eventsourced {
final def onReceive(message: Any) = initialBehavior(message)
final def receiveReplay: Receive = {
case msg onReceiveReplay(msg)
}
final def receiveCommand: Receive = {
case msg onReceiveCommand(msg)
}
/**
* Java API.
*
* Asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event. It is guaranteed that no new commands will be received by a processor
* between a call to `persist` and the execution of its `handler`. This also holds for
* multiple `persist` calls per received command. Internally, this is achieved by stashing new
* commands and unstashing them when the `event` has been persisted and handled. The stash used
* for that is an internal stash which doesn't interfere with the user stash inherited from
* [[UntypedProcessor]].
*
* An event `handler` may close over processor state and modify it. The `getSender()` of a persisted
* event is the sender of the corresponding command. This means that one can reply to a command
* sender within an event `handler`.
*
* Within an event handler, applications usually update processor state using persisted event
* data, notify listeners and reply to command senders.
*
* If persistence of an event fails, the processor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[onReceiveCommand]].
*
* @param event event to be persisted.
* @param handler handler for each persisted `event`
*/
final def persist[A](event: A, handler: Procedure[A]): Unit =
persist(event)(event handler(event))
/**
* Java API.
*
* Asynchronously persists `events` in specified order. This is equivalent to calling
* `persist[A](event: A, handler: Procedure[A])` multiple times with the same `handler`.
*
* @param events events to be persisted.
* @param handler handler for each persisted `events`
*/
final def persist[A](events: JIterable[A], handler: Procedure[A]): Unit =
persist(Util.immutableSeq(events))(event handler(event))
/**
* Java API.
*
* Replay handler that receives persisted events during recovery. If a state snapshot
* has been captured and saved, this handler will receive a [[SnapshotOffer]] message
* followed by events that are younger than the offered snapshot.
*
* This handler must not have side-effects other than changing processor state i.e. it
* should not perform actions that may fail, such as interacting with external services,
* for example.
*
* @see [[Recover]]
*/
def onReceiveReplay(msg: Any): Unit
/**
* Java API.
*
* Command handler. Typically validates commands against current state (and/or by
* communication with other actors). On successful validation, one or more events are
* derived from a command and these events are then persisted by calling `persist`.
* Commands sent to event sourced processors should not be [[Persistent]] messages.
*/
def onReceiveCommand(msg: Any): Unit
}

View file

@ -136,7 +136,8 @@ object PersistentImpl {
}
/**
* Received by a processor when a journal failed to write a [[Persistent]] message.
* Sent to a [[Processor]] when a journal failed to write a [[Persistent]] message. If
* not handled, an `akka.actor.ActorKilledException` is thrown by that processor.
*
* @param payload payload of the persistent message.
* @param sequenceNr sequence number of the persistent message.

View file

@ -4,16 +4,9 @@
package akka.persistence
import akka.AkkaException
import akka.actor._
import akka.dispatch._
/**
* Thrown by a [[Processor]] if a journal failed to replay all requested messages.
*/
@SerialVersionUID(1L)
case class ReplayFailureException(message: String, cause: Throwable) extends AkkaException(message, cause)
/**
* An actor that persists (journals) messages of type [[Persistent]]. Messages of other types are not persisted.
*
@ -76,15 +69,8 @@ trait Processor extends Actor with Stash {
protected def process(receive: Actor.Receive, message: Any) =
receive.applyOrElse(message, unhandled)
protected def processPersistent(receive: Actor.Receive, persistent: Persistent) = try {
_currentPersistent = persistent
updateLastSequenceNr(persistent)
receive.applyOrElse(persistent, unhandled)
} finally _currentPersistent = null
protected def updateLastSequenceNr(persistent: Persistent) {
if (persistent.sequenceNr > _lastSequenceNr) _lastSequenceNr = persistent.sequenceNr
}
protected def processPersistent(receive: Actor.Receive, persistent: Persistent) =
withCurrentPersistent(persistent)(receive.applyOrElse(_, unhandled))
}
/**
@ -98,7 +84,7 @@ trait Processor extends Actor with Stash {
_currentState = recoveryStarted
snapshotStore ! LoadSnapshot(processorId, fromSnap, toSnr)
}
case _ stashInternal()
case _ processorStash.stash()
}
}
@ -123,11 +109,15 @@ trait Processor extends Actor with Stash {
case ReplaySuccess(maxSnr) {
_currentState = recoverySucceeded
_sequenceNr = maxSnr
unstashAllInternal()
processorStash.unstashAll()
}
case ReplayFailure(cause) {
val errorMsg = s"Replay failure by journal (processor id = [${processorId}])"
throw new ReplayFailureException(errorMsg, cause)
val notification = RecoveryFailure(cause)
if (receive.isDefinedAt(notification)) process(receive, notification)
else {
val errorMsg = s"Replay failure by journal (processor id = [${processorId}])"
throw new RecoveryFailureException(errorMsg, cause)
}
}
case Replayed(p) try { processPersistent(receive, p) } catch {
case t: Throwable {
@ -137,7 +127,7 @@ trait Processor extends Actor with Stash {
}
}
case r: Recover // ignore
case _ stashInternal()
case _ processorStash.stash()
}
}
@ -182,7 +172,7 @@ trait Processor extends Actor with Stash {
}
case Replayed(p) updateLastSequenceNr(p)
case r: Recover // ignore
case _ stashInternal()
case _ processorStash.stash()
}
}
@ -263,7 +253,23 @@ trait Processor extends Actor with Stash {
/**
* INTERNAL API.
*/
final override protected[akka] def aroundReceive(receive: Actor.Receive, message: Any): Unit = {
protected[persistence] def withCurrentPersistent(persistent: Persistent)(body: Persistent Unit): Unit = try {
_currentPersistent = persistent
updateLastSequenceNr(persistent)
body(persistent)
} finally _currentPersistent = null
/**
* INTERNAL API.
*/
protected[persistence] def updateLastSequenceNr(persistent: Persistent) {
if (persistent.sequenceNr > _lastSequenceNr) _lastSequenceNr = persistent.sequenceNr
}
/**
* INTERNAL API.
*/
override protected[akka] def aroundReceive(receive: Actor.Receive, message: Any): Unit = {
_currentState.aroundReceive(receive, message)
}
@ -287,7 +293,7 @@ trait Processor extends Actor with Stash {
final override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = {
try {
unstashAll(unstashFilterPredicate)
unstashAllInternal()
processorStash.unstashAll()
} finally {
message match {
case Some(WriteSuccess(m)) preRestartDefault(reason, Some(m))
@ -335,27 +341,44 @@ trait Processor extends Actor with Stash {
// Processor-internal stash
// -----------------------------------------------------
private def unstashFilterPredicate: Any Boolean = {
private val unstashFilterPredicate: Any Boolean = {
case _: WriteSuccess false
case _: Replayed false
case _ true
}
private var processorStash = Vector.empty[Envelope]
private def stashInternal(): Unit = {
processorStash :+= currentEnvelope
}
private def unstashAllInternal(): Unit = try {
val i = processorStash.reverseIterator
while (i.hasNext) mailbox.enqueueFirst(self, i.next())
} finally {
processorStash = Vector.empty[Envelope]
}
private val processorStash =
createProcessorStash
private def currentEnvelope: Envelope =
context.asInstanceOf[ActorCell].currentMessage
/**
* INTERNAL API.
*/
private[persistence] def createProcessorStash = new ProcessorStash {
var theStash = Vector.empty[Envelope]
def stash(): Unit =
theStash :+= currentEnvelope
def unstashAll(): Unit = try {
val i = theStash.reverseIterator
while (i.hasNext) mailbox.enqueueFirst(self, i.next())
} finally {
theStash = Vector.empty[Envelope]
}
}
}
/**
* INTERNAL API.
*
* Processor specific stash used internally to avoid interference with user stash.
*/
private[persistence] trait ProcessorStash {
def stash()
def unstashAll()
}
/**
@ -421,3 +444,4 @@ abstract class UntypedProcessor extends UntypedActor with Processor {
*/
def getCurrentPersistentMessage = currentPersistentMessage.getOrElse(null)
}

View file

@ -4,6 +4,8 @@
package akka.persistence
import akka.AkkaException
/**
* Instructs a processor to recover itself. Recovery will start from a snapshot if the processor has
* previously saved one or more snapshots and at least one of these snapshots matches the specified
@ -53,3 +55,17 @@ object Recover {
def create(fromSnapshot: SnapshotSelectionCriteria, toSequenceNr: Long) =
Recover(fromSnapshot, toSequenceNr)
}
/**
* Sent to a [[Processor]] after failed recovery. If not handled, a
* [[RecoveryFailureException]] is thrown by that processor.
*/
@SerialVersionUID(1L)
case class RecoveryFailure(cause: Throwable)
/**
* Thrown by a [[Processor]] if a journal failed to replay all requested messages.
*/
@SerialVersionUID(1L)
case class RecoveryFailureException(message: String, cause: Throwable) extends AkkaException(message, cause)

View file

@ -16,7 +16,7 @@ case class SnapshotMetadata(processorId: String, sequenceNr: Long, timestamp: Lo
//#snapshot-metadata
/**
* Notification of a snapshot saving success.
* Sent to a [[Processor]] after successful saving of a snapshot.
*
* @param metadata snapshot metadata.
*/
@ -24,7 +24,7 @@ case class SnapshotMetadata(processorId: String, sequenceNr: Long, timestamp: Lo
case class SaveSnapshotSuccess(metadata: SnapshotMetadata)
/**
* Notification of a snapshot saving success failure.
* Sent to a [[Processor]] after failed saving of a snapshot.
*
* @param metadata snapshot metadata.
* @param cause failure cause.

View file

@ -0,0 +1,293 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import scala.collection.immutable.Seq
import com.typesafe.config.Config
import akka.actor._
import akka.testkit.{ ImplicitSender, AkkaSpec }
object EventsourcedSpec {
case class Cmd(data: Any)
case class Evt(data: Any)
abstract class ExampleProcessor(name: String) extends NamedProcessor(name) with EventsourcedProcessor {
var events: List[Any] = Nil
val updateState: Receive = {
case Evt(data) events = data :: events
}
val commonBehavior: Receive = {
case "boom" throw new Exception("boom")
case GetState sender ! events.reverse
}
def receiveReplay = updateState
}
class Behavior1Processor(name: String) extends ExampleProcessor(name) {
val receiveCommand: Receive = commonBehavior orElse {
case Cmd(data) {
persist(Seq(Evt(s"${data}-1"), Evt(s"${data}-2")))(updateState)
}
}
}
class Behavior2Processor(name: String) extends ExampleProcessor(name) {
val receiveCommand: Receive = commonBehavior orElse {
case Cmd(data) {
persist(Seq(Evt(s"${data}-1"), Evt(s"${data}-2")))(updateState)
persist(Seq(Evt(s"${data}-3"), Evt(s"${data}-4")))(updateState)
}
}
}
class Behavior3Processor(name: String) extends ExampleProcessor(name) {
val receiveCommand: Receive = commonBehavior orElse {
case Cmd(data) {
persist(Seq(Evt(s"${data}-11"), Evt(s"${data}-12")))(updateState)
updateState(Evt(s"${data}-10"))
}
}
}
class ChangeBehaviorInLastEventHandlerProcessor(name: String) extends ExampleProcessor(name) {
val newBehavior: Receive = {
case Cmd(data) {
persist(Evt(s"${data}-21"))(updateState)
persist(Evt(s"${data}-22")) { event
updateState(event)
context.unbecome()
}
}
}
val receiveCommand: Receive = commonBehavior orElse {
case Cmd(data) {
persist(Evt(s"${data}-0")) { event
updateState(event)
context.become(newBehavior)
}
}
}
}
class ChangeBehaviorInFirstEventHandlerProcessor(name: String) extends ExampleProcessor(name) {
val newBehavior: Receive = {
case Cmd(data) {
persist(Evt(s"${data}-21")) { event
updateState(event)
context.unbecome()
}
persist(Evt(s"${data}-22"))(updateState)
}
}
val receiveCommand: Receive = commonBehavior orElse {
case Cmd(data) {
persist(Evt(s"${data}-0")) { event
updateState(event)
context.become(newBehavior)
}
}
}
}
class ChangeBehaviorInCommandHandlerFirstProcessor(name: String) extends ExampleProcessor(name) {
val newBehavior: Receive = {
case Cmd(data) {
context.unbecome()
persist(Seq(Evt(s"${data}-31"), Evt(s"${data}-32")))(updateState)
updateState(Evt(s"${data}-30"))
}
}
val receiveCommand: Receive = commonBehavior orElse {
case Cmd(data) {
context.become(newBehavior)
persist(Evt(s"${data}-0"))(updateState)
}
}
}
class ChangeBehaviorInCommandHandlerLastProcessor(name: String) extends ExampleProcessor(name) {
val newBehavior: Receive = {
case Cmd(data) {
persist(Seq(Evt(s"${data}-31"), Evt(s"${data}-32")))(updateState)
updateState(Evt(s"${data}-30"))
context.unbecome()
}
}
val receiveCommand: Receive = commonBehavior orElse {
case Cmd(data) {
persist(Evt(s"${data}-0"))(updateState)
context.become(newBehavior)
}
}
}
class SnapshottingEventsourcedProcessor(name: String, probe: ActorRef) extends ExampleProcessor(name) {
override def receiveReplay = super.receiveReplay orElse {
case SnapshotOffer(_, events: List[_]) {
probe ! "offered"
this.events = events
}
}
val receiveCommand: Receive = commonBehavior orElse {
case Cmd(data) {
persist(Seq(Evt(s"${data}-41"), Evt(s"${data}-42")))(updateState)
}
case SaveSnapshotSuccess(_) probe ! "saved"
case "snap" saveSnapshot(events)
}
}
class ReplyInEventHandlerProcessor(name: String) extends ExampleProcessor(name) {
val receiveCommand: Receive = {
case Cmd("a") persist(Evt("a"))(evt sender ! evt.data)
}
}
class UserStashProcessor(name: String) extends ExampleProcessor(name) {
var stashed = false
val receiveCommand: Receive = {
case Cmd("a") if (!stashed) { stash(); stashed = true } else sender ! "a"
case Cmd("b") persist(Evt("b"))(evt sender ! evt.data)
case Cmd("c") unstashAll(); sender ! "c"
}
}
class AnyValEventProcessor(name: String) extends ExampleProcessor(name) {
val receiveCommand: Receive = {
case Cmd("a") persist(5)(evt sender ! evt)
}
}
}
abstract class EventsourcedSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
import EventsourcedSpec._
override protected def beforeEach() {
super.beforeEach()
val processor = namedProcessor[Behavior1Processor]
processor ! Cmd("a")
processor ! GetState
expectMsg(List("a-1", "a-2"))
}
"An eventsourced processor" must {
"recover from persisted events" in {
val processor = namedProcessor[Behavior1Processor]
processor ! GetState
expectMsg(List("a-1", "a-2"))
}
"handle multiple emitted events in correct order (for a single persist call)" in {
val processor = namedProcessor[Behavior1Processor]
processor ! Cmd("b")
processor ! GetState
expectMsg(List("a-1", "a-2", "b-1", "b-2"))
}
"handle multiple emitted events in correct order (for multiple persist calls)" in {
val processor = namedProcessor[Behavior2Processor]
processor ! Cmd("b")
processor ! GetState
expectMsg(List("a-1", "a-2", "b-1", "b-2", "b-3", "b-4"))
}
"receive emitted events immediately after command" in {
val processor = namedProcessor[Behavior3Processor]
processor ! Cmd("b")
processor ! Cmd("c")
processor ! GetState
expectMsg(List("a-1", "a-2", "b-10", "b-11", "b-12", "c-10", "c-11", "c-12"))
}
"recover on command failure" in {
val processor = namedProcessor[Behavior3Processor]
processor ! Cmd("b")
processor ! "boom"
processor ! Cmd("c")
processor ! GetState
// cmd that was added to state before failure (b-10) is not replayed ...
expectMsg(List("a-1", "a-2", "b-11", "b-12", "c-10", "c-11", "c-12"))
}
"allow behavior changes in event handler (when handling first event)" in {
val processor = namedProcessor[ChangeBehaviorInFirstEventHandlerProcessor]
processor ! Cmd("b")
processor ! Cmd("c")
processor ! Cmd("d")
processor ! Cmd("e")
processor ! GetState
expectMsg(List("a-1", "a-2", "b-0", "c-21", "c-22", "d-0", "e-21", "e-22"))
}
"allow behavior changes in event handler (when handling last event)" in {
val processor = namedProcessor[ChangeBehaviorInLastEventHandlerProcessor]
processor ! Cmd("b")
processor ! Cmd("c")
processor ! Cmd("d")
processor ! Cmd("e")
processor ! GetState
expectMsg(List("a-1", "a-2", "b-0", "c-21", "c-22", "d-0", "e-21", "e-22"))
}
"allow behavior changes in command handler (as first action)" in {
val processor = namedProcessor[ChangeBehaviorInCommandHandlerFirstProcessor]
processor ! Cmd("b")
processor ! Cmd("c")
processor ! Cmd("d")
processor ! Cmd("e")
processor ! GetState
expectMsg(List("a-1", "a-2", "b-0", "c-30", "c-31", "c-32", "d-0", "e-30", "e-31", "e-32"))
}
"allow behavior changes in command handler (as last action)" in {
val processor = namedProcessor[ChangeBehaviorInCommandHandlerLastProcessor]
processor ! Cmd("b")
processor ! Cmd("c")
processor ! Cmd("d")
processor ! Cmd("e")
processor ! GetState
expectMsg(List("a-1", "a-2", "b-0", "c-30", "c-31", "c-32", "d-0", "e-30", "e-31", "e-32"))
}
"support snapshotting" in {
val processor1 = system.actorOf(Props(classOf[SnapshottingEventsourcedProcessor], name, testActor))
processor1 ! Cmd("b")
processor1 ! "snap"
processor1 ! Cmd("c")
expectMsg("saved")
processor1 ! GetState
expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42"))
val processor2 = system.actorOf(Props(classOf[SnapshottingEventsourcedProcessor], name, testActor))
expectMsg("offered")
processor2 ! GetState
expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42"))
}
"be able to reply within an event handler" in {
val processor = namedProcessor[ReplyInEventHandlerProcessor]
processor ! Cmd("a")
expectMsg("a")
}
"not interfere with the user stash" in {
val processor = namedProcessor[UserStashProcessor]
processor ! Cmd("a")
processor ! Cmd("b")
processor ! Cmd("c")
expectMsg("b")
expectMsg("c")
expectMsg("a")
}
"be able to persist events that extend AnyVal" in {
val processor = namedProcessor[AnyValEventProcessor]
processor ! Cmd("a")
expectMsg(5)
}
}
}
class LeveldbEventsourcedSpec extends EventsourcedSpec(PersistenceSpec.config("leveldb", "eventsourced"))
class InmemEventsourcedSpec extends EventsourcedSpec(PersistenceSpec.config("inmem", "eventsourced"))

View file

@ -123,18 +123,15 @@ object MessageSerializerRemotingSpec {
}
}
def port(system: ActorSystem, protocol: String) =
addr(system, protocol).port.get
def addr(system: ActorSystem, protocol: String) =
system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
def port(system: ActorSystem) =
system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress.port.get
}
class MessageSerializerRemotingSpec extends AkkaSpec(config(systemA).withFallback(config(customSerializers, remoteCommon))) with ImplicitSender {
import MessageSerializerRemotingSpec._
val remoteSystem = ActorSystem("remote", config(systemB).withFallback(config(customSerializers, remoteCommon)))
val localActor = system.actorOf(Props(classOf[LocalActor], port(remoteSystem, "tcp")))
val localActor = system.actorOf(Props(classOf[LocalActor], port(remoteSystem)))
override protected def atStartup() {
remoteSystem.actorOf(Props[RemoteActor], "remote")

View file

@ -0,0 +1,137 @@
package sample.persistence.japi;
//#eventsourced-example
import java.io.Serializable;
import java.util.ArrayList;
import akka.actor.*;
import akka.japi.Procedure;
import akka.persistence.*;
import static java.util.Arrays.asList;
class Cmd implements Serializable {
private final String data;
public Cmd(String data) {
this.data = data;
}
public String getData() {
return data;
}
}
class Evt implements Serializable {
private final String data;
public Evt(String data) {
this.data = data;
}
public String getData() {
return data;
}
}
class ExampleState implements Serializable {
private final ArrayList<String> events;
public ExampleState() {
this(new ArrayList<String>());
}
public ExampleState(ArrayList<String> events) {
this.events = events;
}
public ExampleState copy() {
return new ExampleState(new ArrayList<String>(events));
}
public void update(Evt evt) {
events.add(evt.getData());
}
public int size() {
return events.size();
}
@Override
public String toString() {
return events.toString();
}
}
class ExampleProcessor extends UntypedEventsourcedProcessor {
private ExampleState state = new ExampleState();
public int getNumEvents() {
return state.size();
}
public void onReceiveReplay(Object msg) {
if (msg instanceof Evt) {
state.update((Evt) msg);
} else if (msg instanceof SnapshotOffer) {
state = (ExampleState)((SnapshotOffer)msg).snapshot();
}
}
public void onReceiveCommand(Object msg) {
if (msg instanceof Cmd) {
final String data = ((Cmd)msg).getData();
final Evt evt1 = new Evt(data + "-" + getNumEvents());
final Evt evt2 = new Evt(data + "-" + (getNumEvents() + 1));
persist(asList(evt1, evt2), new Procedure<Evt>() {
public void apply(Evt evt) throws Exception {
state.update(evt);
if (evt.equals(evt2)) {
getContext().system().eventStream().publish(evt);
if (data.equals("foo")) getContext().become(otherCommandHandler);
}
}
});
} else if (msg.equals("snap")) {
// IMPORTANT: create a copy of snapshot
// because ExampleState is mutable !!!
saveSnapshot(state.copy());
} else if (msg.equals("print")) {
System.out.println(state);
}
}
private Procedure<Object> otherCommandHandler = new Procedure<Object>() {
public void apply(Object msg) throws Exception {
if (msg instanceof Cmd && ((Cmd)msg).getData().equals("bar")) {
persist(new Evt("bar-" + getNumEvents()), new Procedure<Evt>() {
public void apply(Evt event) throws Exception {
state.update(event);
getContext().unbecome();
}
});
unstashAll();
} else {
stash();
}
}
};
}
//#eventsourced-example
public class EventsourcedExample {
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-4-java");
processor.tell(new Cmd("foo"), null);
processor.tell(new Cmd("baz"), null);
processor.tell(new Cmd("bar"), null);
processor.tell("snap", null);
processor.tell(new Cmd("buzz"), null);
processor.tell("print", null);
Thread.sleep(1000);
system.shutdown();
}
}

View file

@ -51,6 +51,8 @@ public class SnapshotExample {
} else if (message.equals("print")) {
System.out.println("current state = " + state);
} else if (message.equals("snap")) {
// IMPORTANT: create a copy of snapshot
// because ExampleState is mutable !!!
saveSnapshot(state.copy());
}
}

View file

@ -0,0 +1,74 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.persistence
//#eventsourced-example
import akka.actor._
import akka.persistence._
case class Cmd(data: String)
case class Evt(data: String)
case class ExampleState(events: List[String] = Nil) {
def update(evt: Evt) = copy(evt.data :: events)
def size = events.length
override def toString: String = events.reverse.toString
}
class ExampleProcessor extends EventsourcedProcessor {
var state = ExampleState()
def updateState(event: Evt): Unit =
state = state.update(event)
def numEvents =
state.size
val receiveReplay: Receive = {
case evt: Evt updateState(evt)
case SnapshotOffer(_, snapshot: ExampleState) state = snapshot
}
val receiveCommand: Receive = {
case Cmd(data) {
persist(Evt(s"${data}-${numEvents}"))(updateState)
persist(Evt(s"${data}-${numEvents + 1}")) { event
updateState(event)
context.system.eventStream.publish(event)
if (data == "foo") context.become(otherCommandHandler)
}
}
case "snap" saveSnapshot(state)
case "print" println(state)
}
val otherCommandHandler: Receive = {
case Cmd("bar") {
persist(Evt(s"bar-${numEvents}")) { event
updateState(event)
context.unbecome()
}
unstashAll()
}
case other stash()
}
}
//#eventsourced-example
object EventsourcedExample extends App {
val system = ActorSystem("example")
val processor = system.actorOf(Props[ExampleProcessor], "processor-4-scala")
processor ! Cmd("foo")
processor ! Cmd("baz") // will be stashed
processor ! Cmd("bar")
processor ! "snap"
processor ! Cmd("buzz")
processor ! "print"
Thread.sleep(1000)
system.shutdown()
}