Merge pull request #15381 from ktoso/port-defer

!per PersistentActor#defer (forward port)
This commit is contained in:
Konrad Malawski 2014-06-10 18:40:19 +02:00
commit ac671c2991
19 changed files with 905 additions and 159 deletions

View file

@ -0,0 +1,15 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import akka.actor.Actor
/** only as a "the best we could possibly get" baseline, does not persist anything */
class BaselineActor(respondAfter: Int) extends Actor {
override def receive = {
case n: Int => if (n == respondAfter) sender() ! n
}
}
final case class Evt(i: Int)

View file

@ -0,0 +1,140 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import org.openjdk.jmh.annotations._
import org.openjdk.jmh._
import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.testkit.TestProbe
import java.io.File
import org.apache.commons.io.FileUtils
import org.openjdk.jmh.annotations.Scope
/*
# OS: OSX 10.9.3
# CPU: Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz
# Date: Mon Jun 9 13:22:42 CEST 2014
[info] Benchmark Mode Samples Mean Mean error Units
[info] a.p.PersistentActorDeferBenchmark.tell_persistAsync_defer_persistAsync_reply thrpt 10 6.858 0.515 ops/ms
[info] a.p.PersistentActorDeferBenchmark.tell_persistAsync_defer_persistAsync_replyASAP thrpt 10 20.256 2.941 ops/ms
[info] a.p.PersistentActorDeferBenchmark.tell_processor_Persistent_reply thrpt 10 6.531 0.114 ops/ms
[info] a.p.PersistentActorDeferBenchmark.tell_processor_Persistent_replyASAP thrpt 10 26.000 0.694 ops/ms
*/
@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
class PersistentActorDeferBenchmark {
val config = PersistenceSpec.config("leveldb", "benchmark")
lazy val storageLocations = List(
"akka.persistence.journal.leveldb.dir",
"akka.persistence.journal.leveldb-shared.store.dir",
"akka.persistence.snapshot-store.local.dir").map(s new File(system.settings.config.getString(s)))
var system: ActorSystem = _
var probe: TestProbe = _
var processor: ActorRef = _
var processor_replyASAP: ActorRef = _
var persistAsync_defer: ActorRef = _
var persistAsync_defer_replyASAP: ActorRef = _
val data10k = (1 to 10000).toArray
@Setup
def setup() {
system = ActorSystem("test", config)
probe = TestProbe()(system)
storageLocations.foreach(FileUtils.deleteDirectory)
processor = system.actorOf(Props(classOf[`processor, forward Persistent, like defer`], data10k.last), "p-1")
processor_replyASAP = system.actorOf(Props(classOf[`processor, forward Persistent, reply ASAP`], data10k.last), "p-2")
persistAsync_defer = system.actorOf(Props(classOf[`persistAsync, defer`], data10k.last), "a-1")
persistAsync_defer_replyASAP = system.actorOf(Props(classOf[`persistAsync, defer, respond ASAP`], data10k.last), "a-2")
}
@TearDown
def shutdown() {
system.shutdown()
system.awaitTermination()
storageLocations.foreach(FileUtils.deleteDirectory)
}
@GenerateMicroBenchmark
@OperationsPerInvocation(10000)
def tell_processor_Persistent_reply() {
for (i <- data10k) processor.tell(i, probe.ref)
probe.expectMsg(data10k.last)
}
@GenerateMicroBenchmark
@OperationsPerInvocation(10000)
def tell_processor_Persistent_replyASAP() {
for (i <- data10k) processor_replyASAP.tell(i, probe.ref)
probe.expectMsg(data10k.last)
}
@GenerateMicroBenchmark
@OperationsPerInvocation(10000)
def tell_persistAsync_defer_persistAsync_reply() {
for (i <- data10k) persistAsync_defer.tell(i, probe.ref)
probe.expectMsg(data10k.last)
}
@GenerateMicroBenchmark
@OperationsPerInvocation(10000)
def tell_persistAsync_defer_persistAsync_replyASAP() {
for (i <- data10k) persistAsync_defer_replyASAP.tell(i, probe.ref)
probe.expectMsg(data10k.last)
}
}
class `processor, forward Persistent, like defer`(respondAfter: Int) extends Processor {
def receive = {
case n: Int =>
self forward Persistent(Evt(n))
self forward Evt(n)
case Persistent(p) => // ignore
case Evt(n) if n == respondAfter => sender() ! respondAfter
}
}
class `processor, forward Persistent, reply ASAP`(respondAfter: Int) extends Processor {
def receive = {
case n: Int =>
self forward Persistent(Evt(n))
if (n == respondAfter) sender() ! respondAfter
case _ => // ignore
}
}
class `persistAsync, defer`(respondAfter: Int) extends PersistentActor {
override def receiveCommand = {
case n: Int =>
persistAsync(Evt(n)) { e => }
defer(Evt(n)) { e => if (e.i == respondAfter) sender() ! e.i }
}
override def receiveRecover = {
case _ => // do nothing
}
}
class `persistAsync, defer, respond ASAP`(respondAfter: Int) extends PersistentActor {
override def receiveCommand = {
case n: Int =>
persistAsync(Evt(n)) { e => }
defer(Evt(n)) { e => }
if (n == respondAfter) sender() ! n
}
override def receiveRecover = {
case _ => // do nothing
}
}

View file

@ -61,7 +61,7 @@ class PersistentActorThroughputBenchmark {
def tell_normalActor_reply_baseline() {
for (i <- data10k) actor.tell(i, probe.ref)
probe.expectMsg(Evt(data10k.last))
probe.expectMsg(data10k.last)
}
@GenerateMicroBenchmark
@ -95,9 +95,9 @@ class PersistentActorThroughputBenchmark {
probe.expectMsg(Evt(data10k.last))
}
}
final case class Evt(i: Int)
class Persist1EventPersistentActor(respondAfter: Int) extends PersistentActor {
override def receiveCommand = {
@ -135,10 +135,3 @@ class PersistAsync1EventQuickReplyPersistentActor(respondAfter: Int) extends Per
case _ => // do nothing
}
}
/** only as a "the best we could possibly get" baseline, does not persist anything */
class BaselineActor(respondAfter: Int) extends Actor {
override def receive = {
case n: Int => if (n == respondAfter) sender() ! Evt(n)
}
}

View file

@ -319,9 +319,9 @@ public class PersistenceDocTest {
final ActorRef processor = system.actorOf(Props.create(MyProcessor.class));
public void batchWrite() {
processor.tell(PersistentBatch.create(asList(
Persistent.create("a"),
Persistent.create("b"))), null);
processor.tell(PersistentBatch.create(asList(
Persistent.create("a"),
Persistent.create("b"))), null);
}
// ...
@ -423,7 +423,7 @@ public class PersistenceDocTest {
public void usage() {
final ActorSystem system = ActorSystem.create("example");
//#view-update
//#persist-async-usage
final ActorRef processor = system.actorOf(Props.create(MyPersistentActor.class));
processor.tell("a", null);
processor.tell("b", null);
@ -435,11 +435,56 @@ public class PersistenceDocTest {
// evt-a-2
// evt-b-1
// evt-b-2
//#view-update
//#persist-async-usage
}
};
static Object o10 = new Object() {
//#defer
class MyPersistentActor extends UntypedPersistentActor {
@Override
public void onReceiveRecover(Object msg) {
// handle recovery here
}
@Override
public void onReceiveCommand(Object msg) {
final Procedure<String> replyToSender = new Procedure<String>() {
@Override
public void apply(String event) throws Exception {
sender().tell(event, self());
}
};
persistAsync(String.format("evt-%s-1", msg), replyToSender);
persistAsync(String.format("evt-%s-2", msg), replyToSender);
defer(String.format("evt-%s-3", msg), replyToSender);
}
}
//#defer
public void usage() {
final ActorSystem system = ActorSystem.create("example");
//#defer-caller
final ActorRef processor = system.actorOf(Props.create(MyPersistentActor.class));
processor.tell("a", null);
processor.tell("b", null);
// order of received messages:
// a
// b
// evt-a-1
// evt-a-2
// evt-a-3
// evt-b-1
// evt-b-2
// evt-b-3
//#defer-caller
}
};
static Object o11 = new Object() {
//#view
class MyView extends UntypedView {
@Override

View file

@ -40,9 +40,14 @@ Akka persistence is a separate jar file. Make sure that you have the following d
Architecture
============
* *Processor*: A processor is a persistent, stateful actor. Messages sent to a processor are written to a journal
before its behavior is called. When a processor is started or restarted, journaled messages are replayed
to that processor, so that it can recover internal state from these messages.
* *Processor* (deprecated, use *PersistentActor* instead): A processor is a persistent, stateful actor. Messages sent
to a processor are written to a journal before its ``onReceive`` method is called. When a processor is started or
restarted, journaled messages are replayed to that processor, so that it can recover internal state from these messages.
* *PersistentActor*: Is a persistent, stateful actor. It is able to persist events to a journal and can react to
them in a thread-safe manner. It can be used to implement both *command* as well as *event sourced* actors.
When a persistent actor is started or restarted, journaled messages are replayed to that actor, so that it can
recover internal state from these messages.
* *View*: A view is a persistent, stateful actor that receives journaled messages that have been written by another
processor. A view itself does not journal new messages, instead, it updates internal state only from a processor's
@ -427,6 +432,11 @@ use the ``deleteSnapshots`` method.
Event sourcing
==============
.. note::
The ``PersistentActor`` introduced in this section was previously known as ``EventsourcedProcessor``,
which was a subset of the ``PersistentActor``. Migrating your code to use persistent actors instead is
very simple and is explained in the :ref:`migration-guide-persistence-experimental-2.3.x-2.4.x`.
In all the examples so far, messages that change a processor's state have been sent as ``Persistent`` messages
by an application, so that they can be replayed during recovery. From this point of view, the journal acts as
a write-ahead-log for whatever ``Persistent`` messages a processor receives. This is also known as *command
@ -484,6 +494,50 @@ It contains instructions on how to run the ``PersistentActorExample``.
recovery you need to take special care to perform the same state transitions with ``become`` and
``unbecome`` in the ``receiveRecover`` method as you would have done in the command handler.
Relaxed local consistency requirements and high throughput use-cases
--------------------------------------------------------------------
If faced with Relaxed local consistency requirements and high throughput demands sometimes ``PersistentActor`` and it's
``persist`` may not be enough in terms of consuming incoming Commands at a high rate, because it has to wait until all
Events related to a given Command are processed in order to start processing the next Command. While this abstraction is
very useful for most cases, sometimes you may be faced with relaxed requirements about consistency for example you may
want to process commands as fast as you can, assuming that Event will eventually be persisted and handled properly in
the background and retroactively reacting to persistence failures if needed.
The ``persistAsync`` method provides a tool for implementing high-throughput processors. It will *not*
stash incoming Commands while the Journal is still working on persisting and/or user code is executing event callbacks.
In the below example, the event callbacks may be called "at any time", even after the next Command has been processed.
The ordering between events is still guaranteed ("evt-b-1" will be sent after "evt-a-2", which will be sent after "evt-a-1" etc.).
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#persist-async
Notice that the client does not have to wrap any messages in the `Persistent` class in order to obtain "command sourcing like"
semantics. It's up to the processor to decide about persisting (or not) of messages, unlike ``Processor`` where the sender had to be aware of this decision.
.. note::
In order to implement the pattern known as "*command sourcing*" simply ``persistAsync`` all incoming events right away,
and handle them in the callback.
.. _defer-java-lambda:
Deferring actions until preceeding persist handlers have executed
-----------------------------------------------------------------
Sometimes when working with ``persistAsync`` you may find that it would be nice to define some actions in terms of
''happens-after the previous ``persistAsync`` handlers have been invoked''. ``PersistentActor`` provides an utility method
called ``defer``, which works similarily to ``persistAsync`` yet does not persist the passed in event. It is recommended to
use it for *read* operations, and actions which do not have corresponding events in your domain model.
Using this method is very similar to the persist family of methods, yet it does **not** persist the passed in event.
It will be kept in memory and used when invoking the handler.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#defer
Notice that the ``sender()`` is **safe** to access in the handler callback, and will be pointing to the original sender
of the command for which this ``defer`` handler was called.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#defer-caller
Reliable event delivery
-----------------------

View file

@ -546,10 +546,34 @@ The ordering between events is still guaranteed ("evt-b-1" will be sent after "e
.. includecode:: code/docs/persistence/PersistenceDocTest.java#persist-async
Notice that the client does not have to wrap any messages in the `Persistent` class in order to obtain "command sourcing like"
semantics. It's up to the processor to decide about persisting (or not) of messages, unlike ``Processor`` where the sender had to be aware of this decision.
.. note::
In order to implement the pattern known as "*command sourcing*" simply ``persistAsync`` all incoming events right away,
and handle them in the callback.
.. _defer-java:
Deferring actions until preceeding persist handlers have executed
-----------------------------------------------------------------
Sometimes when working with ``persistAsync`` you may find that it would be nice to define some actions in terms of
''happens-after the previous ``persistAsync`` handlers have been invoked''. ``PersistentActor`` provides an utility method
called ``defer``, which works similarily to ``persistAsync`` yet does not persist the passed in event. It is recommended to
use it for *read* operations, and actions which do not have corresponding events in your domain model.
Using this method is very similar to the persist family of methods, yet it does **not** persist the passed in event.
It will be kept in memory and used when invoking the handler.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#defer
Notice that the ``sender()`` is **safe** to access in the handler callback, and will be pointing to the original sender
of the command for which this ``defer`` handler was called.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#defer-caller
Reliable event delivery
-----------------------

View file

@ -23,7 +23,7 @@ trait PersistenceDocSpec {
trait SomeOtherMessage
val system: ActorSystem
implicit val system: ActorSystem
import system._
@ -33,11 +33,11 @@ trait PersistenceDocSpec {
class MyProcessor extends Processor {
def receive = {
case Persistent(payload, sequenceNr) =>
case Persistent(payload, sequenceNr) =>
// message successfully written to journal
case PersistenceFailure(payload, sequenceNr, cause) =>
// message failed to be written to journal
case m: SomeOtherMessage =>
case m: SomeOtherMessage =>
// message not written to journal
}
}
@ -367,6 +367,45 @@ trait PersistenceDocSpec {
//#persist-async
}
new AnyRef {
import akka.actor.ActorRef
val processor = system.actorOf(Props[MyPersistentActor]())
//#defer
class MyPersistentActor extends PersistentActor {
def receiveRecover: Receive = {
case _ => // handle recovery here
}
def receiveCommand: Receive = {
case c: String => {
sender() ! c
persistAsync(s"evt-$c-1") { e => sender() ! e }
persistAsync(s"evt-$c-2") { e => sender() ! e }
defer(s"evt-$c-3") { e => sender() ! e }
}
}
}
//#defer
//#defer-caller
processor ! "a"
processor ! "b"
// order of received messages:
// a
// b
// evt-a-1
// evt-a-2
// evt-a-3
// evt-b-1
// evt-b-2
// evt-b-3
//#defer-caller
}
new AnyRef {
import akka.actor.Props
@ -385,4 +424,47 @@ trait PersistenceDocSpec {
view ! Update(await = true)
//#view-update
}
new AnyRef {
// ------------------------------------------------------------------------------------------------
// FIXME: uncomment once going back to project dependencies (in akka-stream-experimental)
// ------------------------------------------------------------------------------------------------
/*
//#producer-creation
import org.reactivestreams.api.Producer
import akka.persistence.Persistent
import akka.persistence.stream.{ PersistentFlow, PersistentPublisherSettings }
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow
val materializer = FlowMaterializer(MaterializerSettings())
val flow: Flow[Persistent] = PersistentFlow.fromProcessor("some-processor-id")
val producer: Producer[Persistent] = flow.toProducer(materializer)
//#producer-creation
//#producer-buffer-size
PersistentFlow.fromProcessor("some-processor-id", PersistentPublisherSettings(maxBufferSize = 200))
//#producer-buffer-size
//#producer-examples
// 1 producer and 2 consumers:
val producer1: Producer[Persistent] =
PersistentFlow.fromProcessor("processor-1").toProducer(materializer)
Flow(producer1).foreach(p => println(s"consumer-1: ${p.payload}")).consume(materializer)
Flow(producer1).foreach(p => println(s"consumer-2: ${p.payload}")).consume(materializer)
// 2 producers (merged) and 1 consumer:
val producer2: Producer[Persistent] =
PersistentFlow.fromProcessor("processor-2").toProducer(materializer)
val producer3: Producer[Persistent] =
PersistentFlow.fromProcessor("processor-3").toProducer(materializer)
Flow(producer2).merge(producer3).foreach { p =>
println(s"consumer-3: ${p.payload}")
}.consume(materializer)
//#producer-examples
*/
}
}

View file

@ -260,7 +260,7 @@ request instructs a channel to send a ``Persistent`` message to a destination.
preserved by a channel, therefore, a destination can reply to the sender of a ``Deliver`` request.
.. note::
Sending via a channel has at-least-once delivery semantics—by virtue of either
the sending actor or the channel being persistent—which means that the
semantics do not match those of a normal :class:`ActorRef` send operation:
@ -558,14 +558,34 @@ The ordering between events is still guaranteed ("evt-b-1" will be sent after "e
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#persist-async
Notice that the client does not have to wrap any messages in the `Persistent` class in order to obtain "command sourcing like"
semantics. It's up to the processor to decide about persisting (or not) of messages, unlike ``Processor`` where this decision
was made by the sender.
semantics. It's up to the processor to decide about persisting (or not) of messages, unlike ``Processor`` where the sender had to be aware of this decision.
.. note::
In order to implement the "*command sourcing*" simply call ``persistAsync(cmd)(...)`` right away on all incomming
messages right away, and handle them in the callback.
.. _defer-scala:
Deferring actions until preceeding persist handlers have executed
-----------------------------------------------------------------
Sometimes when working with ``persistAsync`` you may find that it would be nice to define some actions in terms of
''happens-after the previous ``persistAsync`` handlers have been invoked''. ``PersistentActor`` provides an utility method
called ``defer``, which works similarily to ``persistAsync`` yet does not persist the passed in event. It is recommended to
use it for *read* operations, and actions which do not have corresponding events in your domain model.
Using this method is very similar to the persist family of methods, yet it does **not** persist the passed in event.
It will be kept in memory and used when invoking the handler.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#defer
Notice that the ``sender()`` is **safe** to access in the handler callback, and will be pointing to the original sender
of the command for which this ``defer`` handler was called.
The calling side will get the responses in this (guaranteed) order:
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#defer-caller
Reliable event delivery
-----------------------

View file

@ -10,7 +10,7 @@ import scala.collection.immutable
import akka.japi.{ Procedure, Util }
import akka.persistence.JournalProtocol._
import akka.actor.AbstractActor
import akka.actor.{ ActorRef, AbstractActor }
/**
* INTERNAL API.
@ -58,8 +58,15 @@ private[persistence] trait Eventsourced extends Processor {
throw new UnsupportedOperationException("Persistent command batches not supported")
case _: PersistentRepr
throw new UnsupportedOperationException("Persistent commands not supported")
case WriteMessageSuccess(p)
withCurrentPersistent(p)(p persistInvocations.get(0).handler(p.payload))
case WriteMessageSuccess(r)
r match {
case p: PersistentRepr
withCurrentPersistent(p)(p pendingInvocations.peek().handler(p.payload))
case _ pendingInvocations.peek().handler(r.payload)
}
onWriteComplete()
case LoopMessageSuccess(l)
pendingInvocations.peek().handler(l)
onWriteComplete()
case s @ WriteMessagesSuccessful Eventsourced.super.aroundReceive(receive, s)
case f: WriteMessagesFailed Eventsourced.super.aroundReceive(receive, f)
@ -74,28 +81,13 @@ private[persistence] trait Eventsourced extends Processor {
currentState = persistingEvents
}
if (persistentEventBatch.nonEmpty) {
Eventsourced.super.aroundReceive(receive, PersistentBatch(persistentEventBatch.reverse))
persistentEventBatch = Nil
} else {
processorStash.unstash()
}
if (resequenceableEventBatch.nonEmpty) flushBatch()
else processorStash.unstash()
}
private def onWriteComplete(): Unit = {
persistInvocations.remove(0)
val nextIsStashing = !persistInvocations.isEmpty && persistInvocations.get(0).isInstanceOf[StashingPersistInvocation]
if (nextIsStashing) {
currentState = persistingEvents
}
if (persistInvocations.isEmpty) {
processorStash.unstash()
}
pendingInvocations.pop()
}
}
/**
@ -106,37 +98,48 @@ private[persistence] trait Eventsourced extends Processor {
private val persistingEvents: State = new State {
override def toString: String = "persisting events"
def aroundReceive(receive: Receive, message: Any) = message match {
def aroundReceive(receive: Receive, message: Any): Unit = message match {
case _: ConfirmablePersistent
processorStash.stash()
case PersistentBatch(b)
b.foreach(p deleteMessage(p.sequenceNr, permanent = true))
b foreach {
case p: PersistentRepr deleteMessage(p.sequenceNr, permanent = true)
case r // ignore, nothing to delete (was not a persistent message)
}
throw new UnsupportedOperationException("Persistent command batches not supported")
case p: PersistentRepr
deleteMessage(p.sequenceNr, permanent = true)
throw new UnsupportedOperationException("Persistent commands not supported")
case WriteMessageSuccess(p)
val invocation = persistInvocations.get(0)
withCurrentPersistent(p)(p invocation.handler(p.payload))
onWriteComplete(invocation)
case WriteMessageSuccess(m)
m match {
case p: PersistentRepr withCurrentPersistent(p)(p pendingInvocations.peek().handler(p.payload))
case _ pendingInvocations.peek().handler(m.payload)
}
onWriteComplete()
case e @ WriteMessageFailure(p, _)
Eventsourced.super.aroundReceive(receive, message) // stops actor by default
onWriteComplete(persistInvocations.get(0))
onWriteComplete()
case LoopMessageSuccess(l)
pendingInvocations.peek().handler(l)
onWriteComplete()
case s @ WriteMessagesSuccessful Eventsourced.super.aroundReceive(receive, s)
case f: WriteMessagesFailed Eventsourced.super.aroundReceive(receive, f)
case other processorStash.stash()
}
private def onWriteComplete(invocation: PersistInvocation): Unit = {
if (invocation.isInstanceOf[StashingPersistInvocation]) {
// enables an early return to `processingCommands`, because if this counter hits `0`,
// we know the remaining persistInvocations are all `persistAsync` created, which
// means we can go back to processing commands also - and these callbacks will be called as soon as possible
pendingStashingPersistInvocations -= 1
private def onWriteComplete(): Unit = {
pendingInvocations.pop() match {
case _: StashingHandlerInvocation
// enables an early return to `processingCommands`, because if this counter hits `0`,
// we know the remaining pendingInvocations are all `persistAsync` created, which
// means we can go back to processing commands also - and these callbacks will be called as soon as possible
pendingStashingPersistInvocations -= 1
case _ // do nothing
}
persistInvocations.remove(0)
if (persistInvocations.isEmpty || pendingStashingPersistInvocations == 0) {
if (pendingStashingPersistInvocations == 0) {
currentState = processingCommands
processorStash.unstash()
}
@ -161,23 +164,29 @@ private[persistence] trait Eventsourced extends Processor {
receiveRecover(RecoveryCompleted)
}
sealed trait PersistInvocation {
sealed trait PendingHandlerInvocation {
def evt: Any
def handler: Any Unit
}
/** forces processor to stash incoming commands untill all these invocations are handled */
final case class StashingPersistInvocation(evt: Any, handler: Any Unit) extends PersistInvocation
/** does not force the processor to stash commands */
final case class AsyncPersistInvocation(evt: Any, handler: Any Unit) extends PersistInvocation
final case class StashingHandlerInvocation(evt: Any, handler: Any Unit) extends PendingHandlerInvocation
/** does not force the processor to stash commands; Originates from either `persistAsync` or `defer` calls */
final case class AsyncHandlerInvocation(evt: Any, handler: Any Unit) extends PendingHandlerInvocation
/** Used instead of iterating `persistInvocations` in order to check if safe to revert to processing commands */
/** Used instead of iterating `pendingInvocations` in order to check if safe to revert to processing commands */
private var pendingStashingPersistInvocations: Long = 0
/** Holds user-supplied callbacks for persist/persistAsync calls */
private val persistInvocations = new java.util.LinkedList[PersistInvocation]() // we only append / isEmpty / get(0) on it
private var persistentEventBatch: List[PersistentRepr] = Nil
private val pendingInvocations = new java.util.LinkedList[PendingHandlerInvocation]() // we only append / isEmpty / get(0) on it
private var resequenceableEventBatch: List[Resequenceable] = Nil
private var currentState: State = recovering
private val processorStash = createStash()
private def flushBatch() {
Eventsourced.super.aroundReceive(receive, PersistentBatch(resequenceableEventBatch.reverse))
resequenceableEventBatch = Nil
}
/**
* Asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event. It is guaranteed that no new commands will be received by a processor
@ -202,8 +211,8 @@ private[persistence] trait Eventsourced extends Processor {
*/
final def persist[A](event: A)(handler: A Unit): Unit = {
pendingStashingPersistInvocations += 1
persistInvocations addLast StashingPersistInvocation(event, handler.asInstanceOf[Any Unit])
persistentEventBatch = PersistentRepr(event) :: persistentEventBatch
pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any Unit])
resequenceableEventBatch = PersistentRepr(event) :: resequenceableEventBatch
}
/**
@ -237,8 +246,8 @@ private[persistence] trait Eventsourced extends Processor {
* @param handler handler for each persisted `event`
*/
final def persistAsync[A](event: A)(handler: A Unit): Unit = {
persistInvocations addLast AsyncPersistInvocation(event, handler.asInstanceOf[Any Unit])
persistentEventBatch = PersistentRepr(event) :: persistentEventBatch
pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any Unit])
resequenceableEventBatch = PersistentRepr(event) :: resequenceableEventBatch
}
/**
@ -252,6 +261,56 @@ private[persistence] trait Eventsourced extends Processor {
final def persistAsync[A](events: immutable.Seq[A])(handler: A Unit): Unit =
events.foreach(persistAsync(_)(handler))
/**
* Defer the handler execution until all pending handlers have been executed.
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
* in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer,
* the corresponding handlers will be invoked in the same order as they were registered in.
*
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediatly.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers
* will not be run.
*
* @param event event to be handled in the future, when preceeding persist operations have been processes
* @param handler handler for the given `event`
*/
final def defer[A](event: A)(handler: A Unit): Unit = {
if (pendingInvocations.isEmpty) {
handler(event)
} else {
pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any Unit])
resequenceableEventBatch = NonPersistentRepr(event, sender()) :: resequenceableEventBatch
}
}
/**
* Defer the handler execution until all pending handlers have been executed.
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
* in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer,
* the corresponding handlers will be invoked in the same order as they were registered in.
*
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediatly.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers
* will not be run.
*
* @param events event to be handled in the future, when preceeding persist operations have been processes
* @param handler handler for each `event`
*/
final def defer[A](events: immutable.Seq[A])(handler: A Unit): Unit =
events.foreach(defer(_)(handler))
/**
* Recovery handler that receives persisted events during recovery. If a state snapshot
* has been captured and saved, this handler will receive a [[SnapshotOffer]] message
@ -425,6 +484,50 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events
final def persistAsync[A](events: JIterable[A])(handler: A Unit): Unit =
super[Eventsourced].persistAsync(Util.immutableSeq(events))(event handler(event))
/**
* Defer the handler execution until all pending handlers have been executed.
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
* in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer,
* the corresponding handlers will be invoked in the same order as they were registered in.
*
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediatly.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers
* will not be run.
*
* @param event event to be handled in the future, when preceeding persist operations have been processes
* @param handler handler for the given `event`
*/
final def defer[A](event: A)(handler: Procedure[A]): Unit =
super[Eventsourced].defer(event)(event handler(event))
/**
* Defer the handler execution until all pending handlers have been executed.
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
* in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer,
* the corresponding handlers will be invoked in the same order as they were registered in.
*
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediatly.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers
* will not be run.
*
* @param events event to be handled in the future, when preceeding persist operations have been processes
* @param handler handler for each `event`
*/
final def defer[A](events: JIterable[A])(handler: Procedure[A]): Unit =
super[Eventsourced].defer(Util.immutableSeq(events))(event handler(event))
/**
* Java API: recovery handler that receives persisted events during recovery. If a state snapshot
* has been captured and saved, this handler will receive a [[SnapshotOffer]] message
@ -446,7 +549,7 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events
* communication with other actors). On successful validation, one or more events are
* derived from a command and these events are then persisted by calling `persist`.
* Commands sent to event sourced processors must not be [[Persistent]] or
* [[PersistentBatch]] messages. In this case an `UnsupportedOperationException` is
* [[ResequenceableBatch]] messages. In this case an `UnsupportedOperationException` is
* thrown by the processor.
*/
def onReceiveCommand(msg: Any): Unit
@ -458,7 +561,7 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events
* communication with other actors). On successful validation, one or more events are
* derived from a command and these events are then persisted by calling `persist`.
* Commands sent to event sourced processors must not be [[Persistent]] or
* [[PersistentBatch]] messages. In this case an `UnsupportedOperationException` is
* [[ResequenceableBatch]] messages. In this case an `UnsupportedOperationException` is
* thrown by the processor.
*/
@deprecated("AbstractEventsourcedProcessor will be removed in 2.4.x, instead extend the API equivalent `akka.persistence.PersistentProcessor`", since = "2.3.4")
@ -516,6 +619,50 @@ abstract class AbstractEventsourcedProcessor extends AbstractActor with Eventsou
final def persistAsync[A](event: A, handler: Procedure[A]): Unit =
persistAsync(event)(event handler(event))
/**
* Defer the handler execution until all pending handlers have been executed.
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
* in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer,
* the corresponding handlers will be invoked in the same order as they were registered in.
*
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediatly.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers
* will not be run.
*
* @param event event to be handled in the future, when preceeding persist operations have been processes
* @param handler handler for the given `event`
*/
final def defer[A](event: A)(handler: Procedure[A]): Unit =
super.defer(event)(event handler(event))
/**
* Defer the handler execution until all pending handlers have been executed.
* Allows to define logic within the actor, which will respect the invocation-order-guarantee
* in respect to `persistAsync` calls. That is, if `persistAsync` was invoked before defer,
* the corresponding handlers will be invoked in the same order as they were registered in.
*
* This call will NOT result in `event` being persisted, please use `persist` or `persistAsync`,
* if the given event should possible to replay.
*
* If there are no pending persist handler calls, the handler will be called immediatly.
*
* In the event of persistence failures (indicated by [[PersistenceFailure]] messages being sent to the
* [[PersistentActor]], you can handle these messages, which in turn will enable the deferred handlers to run afterwards.
* If persistence failure messages are left `unhandled`, the default behavior is to stop the Actor, thus the handlers
* will not be run.
*
* @param events event to be handled in the future, when preceeding persist operations have been processes
* @param handler handler for each `event`
*/
final def defer[A](events: JIterable[A])(handler: Procedure[A]): Unit =
super.defer(Util.immutableSeq(events))(event handler(event))
/**
* Java API: asynchronously persists `events` in specified order. This is equivalent to calling
* `persistAsync[A](event: A)(handler: A => Unit)` multiple times with the same `handler`,

View file

@ -8,8 +8,6 @@ import scala.collection.immutable
import akka.actor._
import akka.persistence.serialization.Message
/**
* INTERNAL API.
*
@ -60,7 +58,7 @@ private[persistence] object JournalProtocol {
* @param messages messages to be written.
* @param processor write requestor.
*/
final case class WriteMessages(messages: immutable.Seq[PersistentRepr], processor: ActorRef)
final case class WriteMessages(messages: immutable.Seq[Resequenceable], processor: ActorRef)
/**
* Reply message to a successful [[WriteMessages]] request. This reply is sent to the requestor

View file

@ -15,12 +15,21 @@ import akka.pattern.PromiseActorRef
import akka.persistence.serialization.Message
/**
* Persistent message.
* Marks messages which can be resequenced by the [[akka.persistence.journal.AsyncWriteJournal]].
*
* In essence it is either an [[NonPersistentRepr]] or [[Persistent]].
*/
@deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " +
"which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.",
since = "2.3.4")
sealed abstract class Persistent {
sealed trait Resequenceable {
def payload: Any
def sender: ActorRef
}
/** Message which can be resequenced by the Journal, but will not be persisted. */
final case class NonPersistentRepr(payload: Any, sender: ActorRef) extends Resequenceable
/** Persistent message. */
@deprecated("Use akka.persistence.PersistentActor instead.", since = "2.3.4")
sealed abstract class Persistent extends Resequenceable {
/**
* This persistent message's payload.
*/
@ -41,9 +50,7 @@ sealed abstract class Persistent {
def withPayload(payload: Any): Persistent
}
@deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " +
"which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.",
since = "2.3.4")
@deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4")
object Persistent {
/**
* Java API: creates a new persistent message. Must only be used outside processors.
@ -71,9 +78,7 @@ object Persistent {
* @param payload payload of the new persistent message.
* @param currentPersistentMessage optional current persistent message, defaults to `None`.
*/
@deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " +
"which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.",
since = "2.3.4")
@deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4")
def apply(payload: Any)(implicit currentPersistentMessage: Option[Persistent] = None): Persistent =
currentPersistentMessage.map(_.withPayload(payload)).getOrElse(PersistentRepr(payload))
@ -88,9 +93,7 @@ object Persistent {
* Persistent message that has been delivered by a [[Channel]] or [[PersistentChannel]]. Channel
* destinations that receive messages of this type can confirm their receipt by calling [[confirm]].
*/
@deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " +
"which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.",
since = "2.3.4")
@deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4")
sealed abstract class ConfirmablePersistent extends Persistent {
/**
* Called by [[Channel]] and [[PersistentChannel]] destinations to confirm the receipt of a
@ -105,9 +108,7 @@ sealed abstract class ConfirmablePersistent extends Persistent {
def redeliveries: Int
}
@deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " +
"which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.",
since = "2.3.4")
@deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4")
object ConfirmablePersistent {
/**
* [[ConfirmablePersistent]] extractor.
@ -121,18 +122,7 @@ object ConfirmablePersistent {
* journal. The processor receives the written messages individually as [[Persistent]] messages.
* During recovery, they are also replayed individually.
*/
@deprecated("Messages wrapped in Persistent were only required by Processor and Command Sourcing, " +
"which is now deprecated. Use `akka.persistence.PersistentActor` and Event Sourcing instead.",
since = "2.3.4")
case class PersistentBatch(persistentBatch: immutable.Seq[Persistent]) extends Message {
// todo while we want to remove Persistent() from user-land, the batch may (probably?) become private[akka] to remain for journal internals #15230
/**
* INTERNAL API.
*/
private[persistence] def persistentReprList: List[PersistentRepr] =
persistentBatch.toList.asInstanceOf[List[PersistentRepr]]
}
case class PersistentBatch(batch: immutable.Seq[Resequenceable]) extends Message
/**
* Plugin API: confirmation entry written by journal plugins.
@ -170,7 +160,7 @@ private[persistence] final case class PersistentIdImpl(processorId: String, sequ
* @see [[journal.AsyncWriteJournal]]
* @see [[journal.AsyncRecovery]]
*/
trait PersistentRepr extends Persistent with PersistentId with Message {
trait PersistentRepr extends Persistent with Resequenceable with PersistentId with Message {
// todo we want to get rid of the Persistent() wrapper from user land; PersistentRepr is here to stay. #15230
import scala.collection.JavaConverters._

View file

@ -86,10 +86,11 @@ trait Processor extends Actor with Recovery {
private var batching = false
def aroundReceive(receive: Receive, message: Any) = message match {
case r: Recover // ignore
case ReplayedMessage(p) processPersistent(receive, p) // can occur after unstash from user stash
case WriteMessageSuccess(p) processPersistent(receive, p)
case WriteMessageFailure(p, cause)
case r: Recover // ignore
case ReplayedMessage(p) processPersistent(receive, p) // can occur after unstash from user stash
case WriteMessageSuccess(p: PersistentRepr) processPersistent(receive, p)
case WriteMessageSuccess(r: Resequenceable) process(receive, r)
case WriteMessageFailure(p, cause)
process(receive, PersistenceFailure(p.payload, p.sequenceNr, cause))
case LoopMessageSuccess(m) process(receive, m)
case WriteMessagesSuccessful | WriteMessagesFailed(_)
@ -108,11 +109,15 @@ trait Processor extends Actor with Recovery {
journal forward LoopMessage(m, self)
}
def addToBatch(p: PersistentRepr): Unit =
processorBatch = processorBatch :+ p.update(processorId = processorId, sequenceNr = nextSequenceNr(), sender = sender())
def addToBatch(p: Resequenceable): Unit = p match {
case p: PersistentRepr
processorBatch = processorBatch :+ p.update(processorId = processorId, sequenceNr = nextSequenceNr(), sender = sender())
case r
processorBatch = processorBatch :+ r
}
def addToBatch(pb: PersistentBatch): Unit =
pb.persistentReprList.foreach(addToBatch)
pb.batch.foreach(addToBatch)
def maxBatchSizeReached: Boolean =
processorBatch.length >= extension.settings.journal.maxMessageBatchSize
@ -151,10 +156,10 @@ trait Processor extends Actor with Recovery {
*/
private def onRecoveryCompleted(receive: Receive): Unit =
receive.applyOrElse(RecoveryCompleted, unhandled)
private val _processorId = extension.processorId(self)
private var processorBatch = Vector.empty[PersistentRepr]
private var processorBatch = Vector.empty[Resequenceable]
private var sequenceNr: Long = 0L
/**
@ -326,14 +331,14 @@ trait Processor extends Actor with Recovery {
* @param cause failure cause.
*/
@SerialVersionUID(1L)
final case class PersistenceFailure(payload: Any, sequenceNr: Long, cause: Throwable)
case class PersistenceFailure(payload: Any, sequenceNr: Long, cause: Throwable)
/**
* Sent to a [[Processor]] if a journal fails to replay messages or fetch that processor's
* highest sequence number. If not handled, the prossor will be stopped.
*/
@SerialVersionUID(1L)
final case class RecoveryFailure(cause: Throwable)
case class RecoveryFailure(cause: Throwable)
abstract class RecoveryCompleted
/**

View file

@ -16,7 +16,7 @@ import akka.persistence._
/**
* Abstract journal, optimized for asynchronous, non-blocking writes.
*/
trait AsyncWriteJournal extends Actor with AsyncRecovery {
trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
import JournalProtocol._
import AsyncWriteJournal._
import context.dispatcher
@ -24,16 +24,17 @@ trait AsyncWriteJournal extends Actor with AsyncRecovery {
private val extension = Persistence(context.system)
private val publish = extension.settings.internal.publishPluginCommands
private val resequencer = context.actorOf(Props[Resequencer])
private val resequencer = context.actorOf(Props[Resequencer]())
private var resequencerCounter = 1L
def receive = {
case WriteMessages(persistentBatch, processor)
case WriteMessages(resequenceables, processor)
val cctr = resequencerCounter
def resequence(f: PersistentRepr Any) = persistentBatch.zipWithIndex.foreach {
case (p, i) resequencer ! Desequenced(f(p), cctr + i + 1, processor, p.sender)
def resequence(f: PersistentRepr Any) = resequenceables.zipWithIndex.foreach {
case (p: PersistentRepr, i) resequencer ! Desequenced(f(p), cctr + i + 1, processor, p.sender)
case (r, i) resequencer ! Desequenced(LoopMessageSuccess(r.payload), cctr + i + 1, processor, r.sender)
}
asyncWriteMessages(persistentBatch.map(_.prepareWrite())) onComplete {
asyncWriteMessages(preparePersistentBatch(resequenceables)) onComplete {
case Success(_)
resequencer ! Desequenced(WriteMessagesSuccessful, cctr, processor, self)
resequence(WriteMessageSuccess(_))
@ -41,7 +42,7 @@ trait AsyncWriteJournal extends Actor with AsyncRecovery {
resequencer ! Desequenced(WriteMessagesFailed(e), cctr, processor, self)
resequence(WriteMessageFailure(_, e))
}
resequencerCounter += persistentBatch.length + 1
resequencerCounter += resequenceables.length + 1
case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, processorId, processor, replayDeleted)
// Send replayed messages and replay result to processor directly. No need
// to resequence replayed messages relative to written and looped messages.

View file

@ -15,7 +15,7 @@ import akka.persistence._
/**
* Abstract journal, optimized for synchronous writes.
*/
trait SyncWriteJournal extends Actor with AsyncRecovery {
trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
import JournalProtocol._
import context.dispatcher
@ -23,14 +23,20 @@ trait SyncWriteJournal extends Actor with AsyncRecovery {
private val publish = extension.settings.internal.publishPluginCommands
final def receive = {
case WriteMessages(persistentBatch, processor)
Try(writeMessages(persistentBatch.map(_.prepareWrite()))) match {
case WriteMessages(resequenceables, processor)
Try(writeMessages(preparePersistentBatch(resequenceables))) match {
case Success(_)
processor ! WriteMessagesSuccessful
persistentBatch.foreach(p processor.tell(WriteMessageSuccess(p), p.sender))
resequenceables.foreach {
case p: PersistentRepr processor.tell(WriteMessageSuccess(p), p.sender)
case r processor.tell(LoopMessageSuccess(r.payload), r.sender)
}
case Failure(e)
processor ! WriteMessagesFailed(e)
persistentBatch.foreach(p processor tell (WriteMessageFailure(p, e), p.sender))
resequenceables.foreach {
case p: PersistentRepr processor tell (WriteMessageFailure(p, e), p.sender)
case r processor tell (LoopMessageSuccess(r.payload), r.sender)
}
throw e
}
case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, processorId, processor, replayDeleted)

View file

@ -0,0 +1,24 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.journal
import akka.persistence.{ PersistentRepr, Resequenceable }
import akka.actor.Actor
import scala.collection.immutable
private[akka] trait WriteJournalBase {
this: Actor
protected def preparePersistentBatch(rb: immutable.Seq[Resequenceable]): immutable.Seq[PersistentRepr] =
rb.filter(persistentPrepareWrite).asInstanceOf[immutable.Seq[PersistentRepr]] // filter instead of flatMap to avoid Some allocations
private def persistentPrepareWrite(r: Resequenceable): Boolean = r match {
case p: PersistentRepr
p.prepareWrite(); true
case _
false
}
}

View file

@ -86,7 +86,9 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
private def persistentMessageBatchBuilder(persistentBatch: PersistentBatch) = {
val builder = PersistentMessageBatch.newBuilder
persistentBatch.persistentReprList.foreach(p builder.addBatch(persistentMessageBuilder(p)))
persistentBatch.batch.
filter(_.isInstanceOf[PersistentRepr]).
foreach(p builder.addBatch(persistentMessageBuilder(p.asInstanceOf[PersistentRepr])))
builder
}

View file

@ -18,7 +18,7 @@ object PersistentActorSpec {
final case class Cmd(data: Any)
final case class Evt(data: Any)
abstract class ExampleProcessor(name: String) extends NamedProcessor(name) with PersistentActor {
abstract class ExamplePersistentActor(name: String) extends NamedProcessor(name) with PersistentActor {
var events: List[Any] = Nil
val updateState: Receive = {
@ -33,14 +33,14 @@ object PersistentActorSpec {
def receiveRecover = updateState
}
class Behavior1Processor(name: String) extends ExampleProcessor(name) {
class Behavior1Processor(name: String) extends ExamplePersistentActor(name) {
val receiveCommand: Receive = commonBehavior orElse {
case Cmd(data)
persist(Seq(Evt(s"${data}-1"), Evt(s"${data}-2")))(updateState)
}
}
class Behavior2Processor(name: String) extends ExampleProcessor(name) {
class Behavior2Processor(name: String) extends ExamplePersistentActor(name) {
val receiveCommand: Receive = commonBehavior orElse {
case Cmd(data)
persist(Seq(Evt(s"${data}-1"), Evt(s"${data}-2")))(updateState)
@ -48,7 +48,7 @@ object PersistentActorSpec {
}
}
class Behavior3Processor(name: String) extends ExampleProcessor(name) {
class Behavior3Processor(name: String) extends ExamplePersistentActor(name) {
val receiveCommand: Receive = commonBehavior orElse {
case Cmd(data)
persist(Seq(Evt(s"${data}-11"), Evt(s"${data}-12")))(updateState)
@ -56,7 +56,7 @@ object PersistentActorSpec {
}
}
class ChangeBehaviorInLastEventHandlerProcessor(name: String) extends ExampleProcessor(name) {
class ChangeBehaviorInLastEventHandlerProcessor(name: String) extends ExamplePersistentActor(name) {
val newBehavior: Receive = {
case Cmd(data)
persist(Evt(s"${data}-21"))(updateState)
@ -75,7 +75,7 @@ object PersistentActorSpec {
}
}
class ChangeBehaviorInFirstEventHandlerProcessor(name: String) extends ExampleProcessor(name) {
class ChangeBehaviorInFirstEventHandlerProcessor(name: String) extends ExamplePersistentActor(name) {
val newBehavior: Receive = {
case Cmd(data)
persist(Evt(s"${data}-21")) { event
@ -94,7 +94,7 @@ object PersistentActorSpec {
}
}
class ChangeBehaviorInCommandHandlerFirstProcessor(name: String) extends ExampleProcessor(name) {
class ChangeBehaviorInCommandHandlerFirstProcessor(name: String) extends ExamplePersistentActor(name) {
val newBehavior: Receive = {
case Cmd(data)
context.unbecome()
@ -109,7 +109,7 @@ object PersistentActorSpec {
}
}
class ChangeBehaviorInCommandHandlerLastProcessor(name: String) extends ExampleProcessor(name) {
class ChangeBehaviorInCommandHandlerLastProcessor(name: String) extends ExamplePersistentActor(name) {
val newBehavior: Receive = {
case Cmd(data)
persist(Seq(Evt(s"${data}-31"), Evt(s"${data}-32")))(updateState)
@ -124,7 +124,7 @@ object PersistentActorSpec {
}
}
class SnapshottingPersistentActor(name: String, probe: ActorRef) extends ExampleProcessor(name) {
class SnapshottingPersistentActor(name: String, probe: ActorRef) extends ExamplePersistentActor(name) {
override def receiveRecover = super.receiveRecover orElse {
case SnapshotOffer(_, events: List[_])
probe ! "offered"
@ -160,14 +160,14 @@ object PersistentActorSpec {
}
}
class ReplyInEventHandlerProcessor(name: String) extends ExampleProcessor(name) {
class ReplyInEventHandlerProcessor(name: String) extends ExamplePersistentActor(name) {
val receiveCommand: Receive = {
case Cmd("a") persist(Evt("a"))(evt sender() ! evt.data)
case p: Persistent sender() ! p // not expected
}
}
class UserStashProcessor(name: String) extends ExampleProcessor(name) {
class UserStashProcessor(name: String) extends ExamplePersistentActor(name) {
var stashed = false
val receiveCommand: Receive = {
case Cmd("a") if (!stashed) { stash(); stashed = true } else sender() ! "a"
@ -176,7 +176,7 @@ object PersistentActorSpec {
}
}
class UserStashManyProcessor(name: String) extends ExampleProcessor(name) {
class UserStashManyProcessor(name: String) extends ExamplePersistentActor(name) {
val receiveCommand: Receive = commonBehavior orElse {
case Cmd("a") persist(Evt("a")) { evt
updateState(evt)
@ -196,7 +196,7 @@ object PersistentActorSpec {
case other stash()
}
}
class AsyncPersistProcessor(name: String) extends ExampleProcessor(name) {
class AsyncPersistProcessor(name: String) extends ExamplePersistentActor(name) {
var counter = 0
val receiveCommand: Receive = commonBehavior orElse {
@ -212,7 +212,7 @@ object PersistentActorSpec {
counter
}
}
class AsyncPersistThreeTimesProcessor(name: String) extends ExampleProcessor(name) {
class AsyncPersistThreeTimesProcessor(name: String) extends ExamplePersistentActor(name) {
var counter = 0
val receiveCommand: Receive = commonBehavior orElse {
@ -231,7 +231,7 @@ object PersistentActorSpec {
counter
}
}
class AsyncPersistSameEventTwiceProcessor(name: String) extends ExampleProcessor(name) {
class AsyncPersistSameEventTwiceProcessor(name: String) extends ExamplePersistentActor(name) {
// atomic because used from inside the *async* callbacks
val sendMsgCounter = new AtomicInteger()
@ -249,7 +249,7 @@ object PersistentActorSpec {
persistAsync(event) { evt sender() ! s"${evt.data}-b-${sendMsgCounter.incrementAndGet()}" }
}
}
class AsyncPersistAndPersistMixedSyncAsyncSyncProcessor(name: String) extends ExampleProcessor(name) {
class AsyncPersistAndPersistMixedSyncAsyncSyncProcessor(name: String) extends ExamplePersistentActor(name) {
var counter = 0
@ -276,12 +276,10 @@ object PersistentActorSpec {
counter
}
}
class AsyncPersistAndPersistMixedSyncAsyncProcessor(name: String) extends ExampleProcessor(name) {
class AsyncPersistAndPersistMixedSyncAsyncProcessor(name: String) extends ExamplePersistentActor(name) {
var sendMsgCounter = 0
val start = System.currentTimeMillis()
def time = s" ${System.currentTimeMillis() - start}ms"
val receiveCommand: Receive = commonBehavior orElse {
case Cmd(data)
sender() ! data
@ -301,7 +299,7 @@ object PersistentActorSpec {
}
}
class UserStashFailureProcessor(name: String) extends ExampleProcessor(name) {
class UserStashFailureProcessor(name: String) extends ExamplePersistentActor(name) {
val receiveCommand: Receive = commonBehavior orElse {
case Cmd(data)
if (data == "b-2") throw new TestException("boom")
@ -322,7 +320,7 @@ object PersistentActorSpec {
}
}
class AnyValEventProcessor(name: String) extends ExampleProcessor(name) {
class AnyValEventProcessor(name: String) extends ExamplePersistentActor(name) {
val receiveCommand: Receive = {
case Cmd("a") persist(5)(evt sender() ! evt)
}
@ -348,6 +346,44 @@ object PersistentActorSpec {
}
}
class DeferringWithPersistActor(name: String) extends ExamplePersistentActor(name) {
val receiveCommand: Receive = {
case Cmd(data)
defer("d-1") { sender() ! _ }
persist(s"$data-2") { sender() ! _ }
defer("d-3") { sender() ! _ }
defer("d-4") { sender() ! _ }
}
}
class DeferringWithAsyncPersistActor(name: String) extends ExamplePersistentActor(name) {
val receiveCommand: Receive = {
case Cmd(data)
defer(s"d-$data-1") { sender() ! _ }
persistAsync(s"pa-$data-2") { sender() ! _ }
defer(s"d-$data-3") { sender() ! _ }
defer(s"d-$data-4") { sender() ! _ }
}
}
class DeferringMixedCallsPPADDPADPersistActor(name: String) extends ExamplePersistentActor(name) {
val receiveCommand: Receive = {
case Cmd(data)
persist(s"p-$data-1") { sender() ! _ }
persistAsync(s"pa-$data-2") { sender() ! _ }
defer(s"d-$data-3") { sender() ! _ }
defer(s"d-$data-4") { sender() ! _ }
persistAsync(s"pa-$data-5") { sender() ! _ }
defer(s"d-$data-6") { sender() ! _ }
}
}
class DeferringWithNoPersistCallsPersistActor(name: String) extends ExamplePersistentActor(name) {
val receiveCommand: Receive = {
case Cmd(data)
defer("d-1") { sender() ! _ }
defer("d-2") { sender() ! _ }
defer("d-3") { sender() ! _ }
}
}
}
abstract class PersistentActorSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
@ -625,6 +661,71 @@ abstract class PersistentActorSpec(config: Config) extends AkkaSpec(config) with
expectNoMsg(100.millis)
}
"allow deferring handlers in order to provide ordered processing in respect to persist handlers" in {
val processor = namedProcessor[DeferringWithPersistActor]
processor ! Cmd("a")
expectMsg("d-1")
expectMsg("a-2")
expectMsg("d-3")
expectMsg("d-4")
expectNoMsg(100.millis)
}
"allow deferring handlers in order to provide ordered processing in respect to asyncPersist handlers" in {
val processor = namedProcessor[DeferringWithAsyncPersistActor]
processor ! Cmd("a")
expectMsg("d-a-1")
expectMsg("pa-a-2")
expectMsg("d-a-3")
expectMsg("d-a-4")
expectNoMsg(100.millis)
}
"invoke deferred handlers, in presence of mixed a long series persist / persistAsync calls" in {
val processor = namedProcessor[DeferringMixedCallsPPADDPADPersistActor]
val p1, p2 = TestProbe()
processor.tell(Cmd("a"), p1.ref)
processor.tell(Cmd("b"), p2.ref)
p1.expectMsg("p-a-1")
p1.expectMsg("pa-a-2")
p1.expectMsg("d-a-3")
p1.expectMsg("d-a-4")
p1.expectMsg("pa-a-5")
p1.expectMsg("d-a-6")
p2.expectMsg("p-b-1")
p2.expectMsg("pa-b-2")
p2.expectMsg("d-b-3")
p2.expectMsg("d-b-4")
p2.expectMsg("pa-b-5")
p2.expectMsg("d-b-6")
expectNoMsg(100.millis)
}
"invoke deferred handlers right away, if there are no pending persist handlers registered" in {
val processor = namedProcessor[DeferringWithNoPersistCallsPersistActor]
processor ! Cmd("a")
expectMsg("d-1")
expectMsg("d-2")
expectMsg("d-3")
expectNoMsg(100.millis)
}
"invoke deferred handlers, perserving the original sender references" in {
val processor = namedProcessor[DeferringWithAsyncPersistActor]
val p1, p2 = TestProbe()
processor.tell(Cmd("a"), p1.ref)
processor.tell(Cmd("b"), p2.ref)
p1.expectMsg("d-a-1")
p1.expectMsg("pa-a-2")
p1.expectMsg("d-a-3")
p1.expectMsg("d-a-4")
p2.expectMsg("d-b-1")
p2.expectMsg("pa-b-2")
p2.expectMsg("d-b-3")
p2.expectMsg("d-b-4")
expectNoMsg(100.millis)
}
"receive RecoveryFinished if it is handled after all events have been replayed" in {
val processor1 = system.actorOf(Props(classOf[SnapshottingPersistentActor], name, testActor))
processor1 ! Cmd("b")

View file

@ -4,17 +4,19 @@
package doc;
import java.util.concurrent.TimeUnit;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.*;
import scala.Option;
import scala.PartialFunction;
import scala.concurrent.duration.Duration;
import akka.actor.*;
import akka.persistence.*;
import scala.runtime.BoxedUnit;
import java.util.concurrent.TimeUnit;
import static java.util.Arrays.asList;
public class LambdaPersistenceDocTest {
@ -359,7 +361,7 @@ public class LambdaPersistenceDocTest {
static Object o8 = new Object() {
//#reliable-event-delivery
class MyEventsourcedProcessor extends AbstractEventsourcedProcessor {
class MyEventsourcedProcessor extends AbstractPersistentActor {
private ActorRef destination;
private ActorRef channel;
@ -392,6 +394,103 @@ public class LambdaPersistenceDocTest {
};
static Object o9 = new Object() {
//#persist-async
class MyPersistentActor extends AbstractPersistentActor {
private void handleCommand(String c) {
sender().tell(c, self());
persistAsync(String.format("evt-%s-1", c), e -> {
sender().tell(e, self());
});
persistAsync(String.format("evt-%s-2", c), e -> {
sender().tell(e, self());
});
}
@Override public PartialFunction<Object, BoxedUnit> receiveRecover() {
return ReceiveBuilder.
match(String.class, this::handleCommand).build();
}
@Override public PartialFunction<Object, BoxedUnit> receiveCommand() {
return ReceiveBuilder.
match(String.class, this::handleCommand).build();
}
}
//#persist-async
public void usage() {
final ActorSystem system = ActorSystem.create("example");
//#persist-async-usage
final ActorRef processor = system.actorOf(Props.create(MyPersistentActor.class));
processor.tell("a", null);
processor.tell("b", null);
// possible order of received messages:
// a
// b
// evt-a-1
// evt-a-2
// evt-b-1
// evt-b-2
//#persist-async-usage
}
};
static Object o10 = new Object() {
//#defer
class MyPersistentActor extends AbstractPersistentActor {
private void handleCommand(String c) {
persistAsync(String.format("evt-%s-1", c), e -> {
sender().tell(e, self());
});
persistAsync(String.format("evt-%s-2", c), e -> {
sender().tell(e, self());
});
defer(String.format("evt-%s-3", c), e -> {
sender().tell(e, self());
});
}
@Override public PartialFunction<Object, BoxedUnit> receiveRecover() {
return ReceiveBuilder.
match(String.class, this::handleCommand).build();
}
@Override public PartialFunction<Object, BoxedUnit> receiveCommand() {
return ReceiveBuilder.
match(String.class, this::handleCommand).build();
}
}
//#defer
public void usage() {
final ActorSystem system = ActorSystem.create("example");
final ActorRef sender = null; // your imaginary sender here
//#defer-caller
final ActorRef processor = system.actorOf(Props.create(MyPersistentActor.class));
processor.tell("a", sender);
processor.tell("b", sender);
// order of received messages:
// a
// b
// evt-a-1
// evt-a-2
// evt-a-3
// evt-b-1
// evt-b-2
// evt-b-3
//#defer-caller
}
};
static Object o11 = new Object() {
//#view
class MyView extends AbstractView {
@Override

View file

@ -48,7 +48,7 @@ class ExampleState implements Serializable {
private final ArrayList<String> events;
public ExampleState() {
this(new ArrayList<String>());
this(new ArrayList<>());
}
public ExampleState(ArrayList<String> events) {
@ -56,7 +56,7 @@ class ExampleState implements Serializable {
}
public ExampleState copy() {
return new ExampleState(new ArrayList<String>(events));
return new ExampleState(new ArrayList<>(events));
}
public void update(Evt evt) {