!per persistAsync

Breaks binary compatibility because adding new methods to Eventsourced
trait. Since akka-persistence is experimental this is ok, yet
source-level compatibility has been perserved thankfuly :-)

Deprecates:
* Rename of EventsourcedProcessor -> PersistentActor
* Processor -> suggest using PersistentActor
* Migration guide for akka-persistence is separate, as wel'll deprecate in minor versions (its experimental)
* Persistent as well as ConfirmablePersistent - since Processor, their
  main user will be removed soon.

Other changes:
* persistAsync works as expected when mixed with persist
* A counter must be kept for pending stashing invocations
* Uses only 1 shared list buffer for persit / persistAsync
* Includes small benchmark
* Docs also include info about not using Persistent() wrapper
* uses java LinkedList, for best performance of append / head on
  persistInvocations; the get(0) is safe, because these msgs only
  come in response to persistInvocations
* Renamed internal *MessagesSuccess/Failure messages because we kept
  small mistakes seeing the class "with s" and "without s" as the same
* Updated everything that refered to EventsourcedProcessor to
  PersistentActor, including samples

Refs #15227

Conflicts:
	akka-docs/rst/project/migration-guides.rst
	akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala
	akka-persistence/src/main/scala/akka/persistence/Persistent.scala
	akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala
	project/AkkaBuild.scala
This commit is contained in:
Konrad 'ktoso' Malawski 2014-05-21 01:35:21 +02:00
parent 5f3d6029b1
commit d51b79c95a
32 changed files with 907 additions and 134 deletions

View file

@ -18,6 +18,8 @@ import akka.actor.AbstractActor
* Event sourcing mixin for a [[Processor]].
*/
private[persistence] trait Eventsourced extends Processor {
// TODO consolidate these traits as PersistentActor #15230
/**
* Processor recovery state. Waits for recovery completion and then changes to
* `processingCommands`
@ -56,22 +58,44 @@ private[persistence] trait Eventsourced extends Processor {
throw new UnsupportedOperationException("Persistent command batches not supported")
case _: PersistentRepr
throw new UnsupportedOperationException("Persistent commands not supported")
case WriteMessageSuccess(p)
withCurrentPersistent(p)(p persistInvocations.get(0).handler(p.payload))
onWriteComplete()
case s @ WriteMessagesSuccessful Eventsourced.super.aroundReceive(receive, s)
case f: WriteMessagesFailed Eventsourced.super.aroundReceive(receive, f)
case _
doAroundReceive(receive, message)
}
private def doAroundReceive(receive: Receive, message: Any): Unit = {
Eventsourced.super.aroundReceive(receive, LoopMessageSuccess(message))
if (!persistInvocations.isEmpty) {
if (pendingStashingPersistInvocations > 0) {
currentState = persistingEvents
}
if (persistentEventBatch.nonEmpty) {
Eventsourced.super.aroundReceive(receive, PersistentBatch(persistentEventBatch.reverse))
persistInvocations = persistInvocations.reverse
persistentEventBatch = Nil
} else {
processorStash.unstash()
}
}
private def onWriteComplete(): Unit = {
persistInvocations.remove(0)
val nextIsStashing = !persistInvocations.isEmpty && persistInvocations.get(0).isInstanceOf[StashingPersistInvocation]
if (nextIsStashing) {
currentState = persistingEvents
}
if (persistInvocations.isEmpty) {
processorStash.unstash()
}
}
}
/**
@ -86,29 +110,38 @@ private[persistence] trait Eventsourced extends Processor {
case _: ConfirmablePersistent
processorStash.stash()
case PersistentBatch(b)
b.foreach(p deleteMessage(p.sequenceNr, true))
b.foreach(p deleteMessage(p.sequenceNr, permanent = true))
throw new UnsupportedOperationException("Persistent command batches not supported")
case p: PersistentRepr
deleteMessage(p.sequenceNr, true)
deleteMessage(p.sequenceNr, permanent = true)
throw new UnsupportedOperationException("Persistent commands not supported")
case WriteMessageSuccess(p)
withCurrentPersistent(p)(p persistInvocations.head._2(p.payload))
onWriteComplete()
val invocation = persistInvocations.get(0)
withCurrentPersistent(p)(p invocation.handler(p.payload))
onWriteComplete(invocation)
case e @ WriteMessageFailure(p, _)
Eventsourced.super.aroundReceive(receive, message) // stops actor by default
onWriteComplete()
case s @ WriteMessagesSuccess Eventsourced.super.aroundReceive(receive, s)
case f: WriteMessagesFailure Eventsourced.super.aroundReceive(receive, f)
case other processorStash.stash()
onWriteComplete(persistInvocations.get(0))
case s @ WriteMessagesSuccessful Eventsourced.super.aroundReceive(receive, s)
case f: WriteMessagesFailed Eventsourced.super.aroundReceive(receive, f)
case other processorStash.stash()
}
def onWriteComplete(): Unit = {
persistInvocations = persistInvocations.tail
if (persistInvocations.isEmpty) {
private def onWriteComplete(invocation: PersistInvocation): Unit = {
if (invocation.isInstanceOf[StashingPersistInvocation]) {
// enables an early return to `processingCommands`, because if this counter hits `0`,
// we know the remaining persistInvocations are all `persistAsync` created, which
// means we can go back to processing commands also - and these callbacks will be called as soon as possible
pendingStashingPersistInvocations -= 1
}
persistInvocations.remove(0)
if (persistInvocations.isEmpty || pendingStashingPersistInvocations == 0) {
currentState = processingCommands
processorStash.unstash()
}
}
}
/**
@ -126,7 +159,18 @@ private[persistence] trait Eventsourced extends Processor {
receiveRecover(f)
}
private var persistInvocations: List[(Any, Any Unit)] = Nil
sealed trait PersistInvocation {
def handler: Any Unit
}
/** forces processor to stash incoming commands untill all these invocations are handled */
final case class StashingPersistInvocation(evt: Any, handler: Any Unit) extends PersistInvocation
/** does not force the processor to stash commands */
final case class AsyncPersistInvocation(evt: Any, handler: Any Unit) extends PersistInvocation
/** Used instead of iterating `persistInvocations` in order to check if safe to revert to processing commands */
private var pendingStashingPersistInvocations: Long = 0
/** Holds user-supplied callbacks for persist/persistAsync calls */
private val persistInvocations = new java.util.LinkedList[PersistInvocation]() // we only append / isEmpty / get(0) on it
private var persistentEventBatch: List[PersistentRepr] = Nil
private var currentState: State = recovering
@ -151,11 +195,12 @@ private[persistence] trait Eventsourced extends Processor {
* If persistence of an event fails, the processor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[receiveCommand]].
*
* @param event event to be persisted.
* @param event event to be persisted
* @param handler handler for each persisted `event`
*/
final def persist[A](event: A)(handler: A Unit): Unit = {
persistInvocations = (event, handler.asInstanceOf[Any Unit]) :: persistInvocations
pendingStashingPersistInvocations += 1
persistInvocations addLast StashingPersistInvocation(event, handler.asInstanceOf[Any Unit])
persistentEventBatch = PersistentRepr(event) :: persistentEventBatch
}
@ -164,12 +209,47 @@ private[persistence] trait Eventsourced extends Processor {
* `persist[A](event: A)(handler: A => Unit)` multiple times with the same `handler`,
* except that `events` are persisted atomically with this method.
*
* @param events events to be persisted.
* @param events events to be persisted
* @param handler handler for each persisted `events`
*/
final def persist[A](events: immutable.Seq[A])(handler: A Unit): Unit =
events.foreach(persist(_)(handler))
/**
* Asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event.
*
* Unlike `persist` the processor will continue to receive incomming commands between the
* call to `persist` and executing it's `handler`. This asynchronous, non-stashing, version of
* of persist should be used when you favor throughput over the "command-2 only processed after
* command-1 effects' have been applied" guarantee, which is provided by the plain [[persist]] method.
*
* An event `handler` may close over processor state and modify it. The `sender` of a persisted
* event is the sender of the corresponding command. This means that one can reply to a command
* sender within an event `handler`.
*
* If persistence of an event fails, the processor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[receiveCommand]].
*
* @param event event to be persisted
* @param handler handler for each persisted `event`
*/
final def persistAsync[A](event: A)(handler: A Unit): Unit = {
persistInvocations addLast AsyncPersistInvocation(event, handler.asInstanceOf[Any Unit])
persistentEventBatch = PersistentRepr(event) :: persistentEventBatch
}
/**
* Asynchronously persists `events` in specified order. This is equivalent to calling
* `persistAsync[A](event: A)(handler: A => Unit)` multiple times with the same `handler`,
* except that `events` are persisted atomically with this method.
*
* @param events events to be persisted
* @param handler handler for each persisted `events`
*/
final def persistAsync[A](events: immutable.Seq[A])(handler: A Unit): Unit =
events.foreach(persistAsync(_)(handler))
/**
* Recovery handler that receives persisted events during recovery. If a state snapshot
* has been captured and saved, this handler will receive a [[SnapshotOffer]] message
@ -235,13 +315,31 @@ private[persistence] trait Eventsourced extends Processor {
/**
* An event sourced processor.
*/
@deprecated("EventsourcedProcessor will be removed in 2.4.x, instead extend the API equivalent `akka.persistence.PersistentProcessor`", since = "2.3.4")
trait EventsourcedProcessor extends Processor with Eventsourced {
// TODO remove Processor #15230
def receive = receiveCommand
}
/**
* An persistent Actor - can be used to implement command or event sourcing.
*/
// TODO remove EventsourcedProcessor / Processor #15230
trait PersistentActor extends EventsourcedProcessor
/**
* Java API: an persistent actor - can be used to implement command or event sourcing.
*/
abstract class UntypedPersistentActor extends UntypedEventsourcedProcessor
/**
* Java API: an persistent actor - can be used to implement command or event sourcing.
*/
abstract class AbstractPersistentActor extends AbstractEventsourcedProcessor
/**
* Java API: an event sourced processor.
*/
@deprecated("UntypedEventsourcedProcessor will be removed in 2.4.x, instead extend the API equivalent `akka.persistence.PersistentProcessor`", since = "2.3.4")
abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Eventsourced {
final def onReceive(message: Any) = onReceiveCommand(message)
@ -289,6 +387,39 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events
final def persist[A](events: JIterable[A], handler: Procedure[A]): Unit =
persist(Util.immutableSeq(events))(event handler(event))
/**
* JAVA API: asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event.
*
* Unlike `persist` the processor will continue to receive incomming commands between the
* call to `persist` and executing it's `handler`. This asynchronous, non-stashing, version of
* of persist should be used when you favor throughput over the "command-2 only processed after
* command-1 effects' have been applied" guarantee, which is provided by the plain [[persist]] method.
*
* An event `handler` may close over processor state and modify it. The `sender` of a persisted
* event is the sender of the corresponding command. This means that one can reply to a command
* sender within an event `handler`.
*
* If persistence of an event fails, the processor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[receiveCommand]].
*
* @param event event to be persisted
* @param handler handler for each persisted `event`
*/
final def persistAsync[A](event: A)(handler: Procedure[A]): Unit =
super[Eventsourced].persistAsync(event)(event handler(event))
/**
* JAVA API: asynchronously persists `events` in specified order. This is equivalent to calling
* `persistAsync[A](event: A)(handler: A => Unit)` multiple times with the same `handler`,
* except that `events` are persisted atomically with this method.
*
* @param events events to be persisted
* @param handler handler for each persisted `events`
*/
final def persistAsync[A](events: JIterable[A])(handler: A Unit): Unit =
super[Eventsourced].persistAsync(Util.immutableSeq(events))(event handler(event))
/**
* Java API: recovery handler that receives persisted events during recovery. If a state snapshot
* has been captured and saved, this handler will receive a [[SnapshotOffer]] message
@ -322,6 +453,7 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events
* [[PersistentBatch]] messages. In this case an `UnsupportedOperationException` is
* thrown by the processor.
*/
@deprecated("AbstractEventsourcedProcessor will be removed in 2.4.x, instead extend the API equivalent `akka.persistence.PersistentProcessor`", since = "2.3.4")
abstract class AbstractEventsourcedProcessor extends AbstractActor with EventsourcedProcessor {
/**
* Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the
@ -359,9 +491,37 @@ abstract class AbstractEventsourcedProcessor extends AbstractActor with Eventsou
final def persist[A](events: JIterable[A], handler: Procedure[A]): Unit =
persist(Util.immutableSeq(events))(event handler(event))
/**
* Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event.
*
* Unlike `persist` the processor will continue to receive incomming commands between the
* call to `persistAsync` and executing it's `handler`. This asynchronous, non-stashing, version of
* of persist should be used when you favor throughput over the strict ordering guarantees that `persist` guarantees.
*
* If persistence of an event fails, the processor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[receiveCommand]].
*
* @param event event to be persisted
* @param handler handler for each persisted `event`
*/
final def persistAsync[A](event: A, handler: Procedure[A]): Unit =
persistAsync(event)(event handler(event))
/**
* Java API: asynchronously persists `events` in specified order. This is equivalent to calling
* `persistAsync[A](event: A)(handler: A => Unit)` multiple times with the same `handler`,
* except that `events` are persisted atomically with this method.
*
* @param events events to be persisted
* @param handler handler for each persisted `events`
*/
final def persistAsync[A](events: JIterable[A], handler: Procedure[A]): Unit =
persistAsync(Util.immutableSeq(events))(event handler(event))
override def receive = super[EventsourcedProcessor].receive
override def receive(receive: Receive): Unit = {
throw new IllegalArgumentException("Define the behavior by overriding receiveRecover and receiveCommand")
}
}
}