+per #3661 Event sourcing support

This commit is contained in:
Martin Krasser 2013-10-15 09:01:07 +02:00
parent a30ca0d3d4
commit 0a2cfdc4d1
13 changed files with 948 additions and 71 deletions

View file

@ -4,16 +4,9 @@
package akka.persistence
import akka.AkkaException
import akka.actor._
import akka.dispatch._
/**
* Thrown by a [[Processor]] if a journal failed to replay all requested messages.
*/
@SerialVersionUID(1L)
case class ReplayFailureException(message: String, cause: Throwable) extends AkkaException(message, cause)
/**
* An actor that persists (journals) messages of type [[Persistent]]. Messages of other types are not persisted.
*
@ -76,15 +69,8 @@ trait Processor extends Actor with Stash {
protected def process(receive: Actor.Receive, message: Any) =
receive.applyOrElse(message, unhandled)
protected def processPersistent(receive: Actor.Receive, persistent: Persistent) = try {
_currentPersistent = persistent
updateLastSequenceNr(persistent)
receive.applyOrElse(persistent, unhandled)
} finally _currentPersistent = null
protected def updateLastSequenceNr(persistent: Persistent) {
if (persistent.sequenceNr > _lastSequenceNr) _lastSequenceNr = persistent.sequenceNr
}
protected def processPersistent(receive: Actor.Receive, persistent: Persistent) =
withCurrentPersistent(persistent)(receive.applyOrElse(_, unhandled))
}
/**
@ -98,7 +84,7 @@ trait Processor extends Actor with Stash {
_currentState = recoveryStarted
snapshotStore ! LoadSnapshot(processorId, fromSnap, toSnr)
}
case _ stashInternal()
case _ processorStash.stash()
}
}
@ -123,11 +109,15 @@ trait Processor extends Actor with Stash {
case ReplaySuccess(maxSnr) {
_currentState = recoverySucceeded
_sequenceNr = maxSnr
unstashAllInternal()
processorStash.unstashAll()
}
case ReplayFailure(cause) {
val errorMsg = s"Replay failure by journal (processor id = [${processorId}])"
throw new ReplayFailureException(errorMsg, cause)
val notification = RecoveryFailure(cause)
if (receive.isDefinedAt(notification)) process(receive, notification)
else {
val errorMsg = s"Replay failure by journal (processor id = [${processorId}])"
throw new RecoveryFailureException(errorMsg, cause)
}
}
case Replayed(p) try { processPersistent(receive, p) } catch {
case t: Throwable {
@ -137,7 +127,7 @@ trait Processor extends Actor with Stash {
}
}
case r: Recover // ignore
case _ stashInternal()
case _ processorStash.stash()
}
}
@ -182,7 +172,7 @@ trait Processor extends Actor with Stash {
}
case Replayed(p) updateLastSequenceNr(p)
case r: Recover // ignore
case _ stashInternal()
case _ processorStash.stash()
}
}
@ -263,7 +253,23 @@ trait Processor extends Actor with Stash {
/**
* INTERNAL API.
*/
final override protected[akka] def aroundReceive(receive: Actor.Receive, message: Any): Unit = {
protected[persistence] def withCurrentPersistent(persistent: Persistent)(body: Persistent Unit): Unit = try {
_currentPersistent = persistent
updateLastSequenceNr(persistent)
body(persistent)
} finally _currentPersistent = null
/**
* INTERNAL API.
*/
protected[persistence] def updateLastSequenceNr(persistent: Persistent) {
if (persistent.sequenceNr > _lastSequenceNr) _lastSequenceNr = persistent.sequenceNr
}
/**
* INTERNAL API.
*/
override protected[akka] def aroundReceive(receive: Actor.Receive, message: Any): Unit = {
_currentState.aroundReceive(receive, message)
}
@ -287,7 +293,7 @@ trait Processor extends Actor with Stash {
final override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = {
try {
unstashAll(unstashFilterPredicate)
unstashAllInternal()
processorStash.unstashAll()
} finally {
message match {
case Some(WriteSuccess(m)) preRestartDefault(reason, Some(m))
@ -335,27 +341,44 @@ trait Processor extends Actor with Stash {
// Processor-internal stash
// -----------------------------------------------------
private def unstashFilterPredicate: Any Boolean = {
private val unstashFilterPredicate: Any Boolean = {
case _: WriteSuccess false
case _: Replayed false
case _ true
}
private var processorStash = Vector.empty[Envelope]
private def stashInternal(): Unit = {
processorStash :+= currentEnvelope
}
private def unstashAllInternal(): Unit = try {
val i = processorStash.reverseIterator
while (i.hasNext) mailbox.enqueueFirst(self, i.next())
} finally {
processorStash = Vector.empty[Envelope]
}
private val processorStash =
createProcessorStash
private def currentEnvelope: Envelope =
context.asInstanceOf[ActorCell].currentMessage
/**
* INTERNAL API.
*/
private[persistence] def createProcessorStash = new ProcessorStash {
var theStash = Vector.empty[Envelope]
def stash(): Unit =
theStash :+= currentEnvelope
def unstashAll(): Unit = try {
val i = theStash.reverseIterator
while (i.hasNext) mailbox.enqueueFirst(self, i.next())
} finally {
theStash = Vector.empty[Envelope]
}
}
}
/**
* INTERNAL API.
*
* Processor specific stash used internally to avoid interference with user stash.
*/
private[persistence] trait ProcessorStash {
def stash()
def unstashAll()
}
/**
@ -421,3 +444,4 @@ abstract class UntypedProcessor extends UntypedActor with Processor {
*/
def getCurrentPersistentMessage = currentPersistentMessage.getOrElse(null)
}