diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 92f6c4b072..42396b3ec4 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -469,6 +469,16 @@ trait Actor { def receive: Actor.Receive //#receive + /** + * INTERNAL API. + * + * Can be overridden to intercept calls to this actor's current behavior. + * + * @param receive current behavior. + * @param msg current message. + */ + protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = receive.applyOrElse(msg, unhandled) + /** * User overridable definition the strategy to use for supervising * child actors. diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 44a872f071..8e8d6f6501 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -495,7 +495,7 @@ private[akka] class ActorCell( } } - final def receiveMessage(msg: Any): Unit = behaviorStack.head.applyOrElse(msg, actor.unhandled) + final def receiveMessage(msg: Any): Unit = actor.aroundReceive(behaviorStack.head, msg) /* * ACTOR CONTEXT IMPLEMENTATION diff --git a/akka-actor/src/main/scala/akka/actor/Stash.scala b/akka-actor/src/main/scala/akka/actor/Stash.scala index 1b1edfe144..263c5218ad 100644 --- a/akka-actor/src/main/scala/akka/actor/Stash.scala +++ b/akka-actor/src/main/scala/akka/actor/Stash.scala @@ -77,10 +77,13 @@ trait UnrestrictedStash extends Actor { config.getInt("stash-capacity") } - /* The actor's deque-based message queue. + /** + * INTERNAL API. + * + * The actor's deque-based message queue. * `mailbox.queue` is the underlying `Deque`. */ - private val mailbox: DequeBasedMessageQueueSemantics = { + protected[akka] val mailbox: DequeBasedMessageQueueSemantics = { context.asInstanceOf[ActorCell].mailbox.messageQueue match { case queue: DequeBasedMessageQueueSemantics ⇒ queue case other ⇒ throw ActorInitializationException(self, s"DequeBasedMailbox required, got: ${other.getClass.getName}\n" + @@ -116,9 +119,26 @@ trait UnrestrictedStash extends Actor { * * The stash is guaranteed to be empty after calling `unstashAll()`. */ - def unstashAll(): Unit = { + def unstashAll(): Unit = unstashAll(_ ⇒ true) + + /** + * INTERNAL API. + * + * Prepends selected messages in the stash, applying `filterPredicate`, to the + * mailbox, and then clears the stash. + * + * Messages from the stash are enqueued to the mailbox until the capacity of the + * mailbox (if any) has been reached. In case a bounded mailbox overflows, a + * `MessageQueueAppendFailedException` is thrown. + * + * The stash is guaranteed to be empty after calling `unstashAll(Any => Boolean)`. + * + * @param filterPredicate only stashed messages selected by this predicate are + * prepended to the mailbox. + */ + protected[akka] def unstashAll(filterPredicate: Any ⇒ Boolean): Unit = { try { - val i = theStash.reverseIterator + val i = theStash.reverseIterator.filter(envelope ⇒ filterPredicate(envelope.message)) while (i.hasNext) mailbox.enqueueFirst(self, i.next()) } finally { theStash = Vector.empty[Envelope] diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java new file mode 100644 index 0000000000..57e09d633e --- /dev/null +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -0,0 +1,172 @@ +package docs.persistence; + +import scala.Option; + +import akka.actor.*; +import akka.persistence.*; + +public class PersistenceDocTest { + + public interface ProcessorMethods { + //#processor-id + public String processorId(); + //#processor-id + //#recovery-status + public boolean recoveryRunning(); + public boolean recoveryFinished(); + //#recovery-status + //#current-message + public Persistent getCurrentPersistentMessage(); + //#current-message + } + + static Object o1 = new Object() { + //#definition + class MyProcessor extends UntypedProcessor { + public void onReceive(Object message) throws Exception { + if (message instanceof Persistent) { + // message has been written to journal + Persistent persistent = (Persistent)message; + Object payload = persistent.payload(); + Long sequenceNr = persistent.sequenceNr(); + // ... + } else { + // message has not been written to journal + } + } + } + //#definition + + class MyActor extends UntypedActor { + ActorRef processor; + + public MyActor() { + //#usage + processor = getContext().actorOf(Props.create(MyProcessor.class), "myProcessor"); + + processor.tell(Persistent.create("foo"), null); + processor.tell("bar", null); + //#usage + } + + public void onReceive(Object message) throws Exception { + // ... + } + + private void recover() { + //#recover-explicit + processor.tell(Recover.create(), null); + //#recover-explicit + } + } + }; + + static Object o2 = new Object() { + abstract class MyProcessor1 extends UntypedProcessor { + //#recover-on-start-disabled + @Override + public void preStartProcessor() {} + //#recover-on-start-disabled + + //#recover-on-restart-disabled + @Override + public void preRestartProcessor(Throwable reason, Option message) {} + //#recover-on-restart-disabled + } + + abstract class MyProcessor2 extends UntypedProcessor { + //#recover-on-start-custom + @Override + public void preStartProcessor() { + getSelf().tell(Recover.create(457L), null); + } + //#recover-on-start-custom + } + + abstract class MyProcessor3 extends UntypedProcessor { + //#deletion + @Override + public void preRestartProcessor(Throwable reason, Option message) throws Exception { + if (message.isDefined() && message.get() instanceof Persistent) { + delete((Persistent) message.get()); + } + super.preRestartProcessor(reason, message); + } + //#deletion + } + + class MyProcessor4 extends UntypedProcessor implements ProcessorMethods { + //#processor-id-override + @Override + public String processorId() { + return "my-stable-processor-id"; + } + //#processor-id-override + @Override + public void onReceive(Object message) throws Exception {} + } + }; + + static Object o3 = new Object() { + //#channel-example + class MyProcessor extends UntypedProcessor { + private final ActorRef destination; + private final ActorRef channel; + + public MyProcessor() { + this.destination = getContext().actorOf(Props.create(MyDestination.class)); + this.channel = getContext().actorOf(Channel.props(), "myChannel"); + } + + public void onReceive(Object message) throws Exception { + if (message instanceof Persistent) { + Persistent p = (Persistent)message; + Persistent out = p.withPayload("done " + p.payload()); + channel.tell(Deliver.create(out, destination), getSelf()); + } + } + } + + class MyDestination extends UntypedActor { + public void onReceive(Object message) throws Exception { + if (message instanceof Persistent) { + Persistent p = (Persistent)message; + System.out.println("received " + p.payload()); + p.confirm(); + } + } + } + //#channel-example + + class MyProcessor2 extends UntypedProcessor { + private final ActorRef destination; + private final ActorRef channel; + + public MyProcessor2(ActorRef destination) { + this.destination = getContext().actorOf(Props.create(MyDestination.class)); + //#channel-id-override + this.channel = getContext().actorOf(Channel.props("my-stable-channel-id")); + //#channel-id-override + } + + public void onReceive(Object message) throws Exception { + if (message instanceof Persistent) { + Persistent p = (Persistent)message; + Persistent out = p.withPayload("done " + p.payload()); + channel.tell(Deliver.create(out, destination), getSelf()); + + //#channel-example-reply + channel.tell(Deliver.create(out, getSender()), getSelf()); + //#channel-example-reply + //#resolve-destination + channel.tell(Deliver.create(out, getSender(), Resolve.destination()), getSelf()); + //#resolve-destination + //#resolve-sender + channel.tell(Deliver.create(out, destination, Resolve.sender()), getSender()); + //#resolve-sender + + } + } + } + }; +} diff --git a/akka-docs/rst/java/index-actors.rst b/akka-docs/rst/java/index-actors.rst index 64a4c863ee..20b567295b 100644 --- a/akka-docs/rst/java/index-actors.rst +++ b/akka-docs/rst/java/index-actors.rst @@ -11,4 +11,5 @@ Actors mailboxes routing fsm + persistence testing diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst new file mode 100644 index 0000000000..fe8a1f0024 --- /dev/null +++ b/akka-docs/rst/java/persistence.rst @@ -0,0 +1,234 @@ +.. _persistence-java: + +########### +Persistence +########### + +This section describes an early access version of the Akka persistence module. Akka persistence is heavily inspired +by the `eventsourced`_ library. It follows the same concepts and architecture of `eventsourced`_ but significantly +differs on API and implementation level. + +.. warning:: + + This module is marked as **“experimental”** as of its introduction in Akka 2.3.0. We will continue to + improve this API based on our users’ feedback, which implies that while we try to keep incompatible + changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the + contents of the ``akka.persistence`` package. + +.. _eventsourced: https://github.com/eligosource/eventsourced + +Dependencies +============ + +Akka persistence is a separate jar file. Make sure that you have the following dependency in your project:: + + + com.typesafe.akka + akka-persistence_@binVersion@ + @version@ + + +Architecture +============ + +* *Processor*: A processor is a persistent actor. Messages sent to a processor are written to a journal before + its ``onReceive`` method is called. When a processor is started or restarted, journaled messages are replayed + to that processor, so that it can recover internal state from these messages. + +* *Channel*: Channels are used by processors to communicate with other actors. They prevent that replayed messages + are redundantly delivered to these actors. + +Use cases +========= + +* TODO: describe command sourcing +* TODO: describe event sourcing + +Configuration +============= + +By default, journaled messages are written to a directory named ``journal`` in the current working directory. This +can be changed by configuration where the specified path can be relative or absolute: + +.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#config + +.. _processors-java: + +Processors +========== + +A processor can be implemented by extending the abstract ``UntypedProcessor`` class and implementing the +``onReceive`` method. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#definition + +Processors only write messages of type ``Persistent`` to the journal, others are received without being persisted. +When a processor's ``onReceive`` method is called with a ``Persistent`` message it can safely assume that this message +has been successfully written to the journal. A ``UntypedProcessor`` itself is an ``Actor`` and can therefore +be instantiated with ``actorOf``. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#usage + +Recovery +-------- + +By default, a processor is automatically recovered on start and on restart by replaying persistent messages. +New messages sent to a processor during recovery do not interfere with replayed messages. New messages will +only be received by that processor after recovery completes. + +Recovery customization +^^^^^^^^^^^^^^^^^^^^^^ + +Automated recovery on start can be disabled by overriding ``preStartProcessor`` with an empty implementation. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-on-start-disabled + +In this case, a processor must be recovered explicitly by sending it a ``Recover`` message. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-explicit + +If not overridden, ``preStartProcessor`` sends a ``Recover`` message to ``getSelf()``. Applications may also override +``preStartProcessor`` to define further ``Recover`` parameters such as an upper sequence number bound, for example. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-on-start-custom + +Automated recovery on restart can be disabled by overriding ``preRestartProcessor`` with an empty implementation. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-on-restart-disabled + +This is useful in situations where processors are *resumed* by a supervisor (which keeps accumulated internal +state and makes a message replay unnecessary). + +Recovery status +^^^^^^^^^^^^^^^ + +A processor can query its own recovery status via the methods + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-status + +.. _failure-handling-java: + +Failure handling +^^^^^^^^^^^^^^^^ + +A persistent message that caused an exception will be received again by a processor after restart. To prevent +a replay of that message during recovery it can be marked as deleted. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#deletion + +Life cycle hooks +---------------- + +``UntypedProcessor`` implementation classes should override the ``preStartProcessor``, ``preRestartProcessor``, +``postRestartProcessor`` and ``postStopProcessor`` life cycle hooks and not ``preStart``, ``preRestart``, +``postRestart`` and ``postStop`` directly. + +Identifiers +----------- + +A processor must have an identifier that doesn't change across different actor incarnations. It defaults to the +``String`` representation of processor's path and can be obtained via the ``processorId`` method. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#processor-id + +Applications can customize a processor's id by specifying an actor name during processor creation as shown in +section :ref:`processors-java`. This works well when using local actor references but may cause problems with remote +actor references because their paths also contain deployment information such as host and port (and actor deployments +are likely to change during the lifetime of an application). In this case, ``UntypedProcessor`` implementation classes +should override ``processorId``. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#processor-id-override + +Later versions of the Akka persistence module will likely offer a possibility to migrate processor ids. + +Channels +======== + +Channels are special actors that are used by processors to communicate with other actors (channel destinations). +Channels prevent redundant delivery of replayed messages to destinations during processor recovery. A replayed +message is retained by a channel if its previous delivery has been confirmed by a destination. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#channel-example + +A channel is ready to use once it has been created, no recovery or further activation is needed. A ``Deliver`` +request instructs a channel to send a ``Persistent`` message to a destination where the sender of the ``Deliver`` +request is forwarded to the destination. A processor may also reply to a message sender directly by using +``getSender()`` as channel destination. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#channel-example-reply + +Channel destinations confirm the delivery of a ``Persistent`` message by calling its ``confirm()`` method. This +(asynchronously) writes a confirmation entry to the journal. Replayed messages internally contain these confirmation +entries which allows a channel to decide if a message should be retained or not. + +If an application crashes after a destination called ``confirm()`` but before the confirmation entry could have +been written to the journal then the unconfirmed message will be delivered again during next recovery and it is +the destination's responsibility to detect the duplicate or simply process the message again if it's an idempotent +receiver. Duplicates can be detected, for example, by tracking sequence numbers. + +Currently, channels do not store ``Deliver`` requests or retry delivery on network or destination failures. This +feature (*reliable channels*) will be available soon. + +Sender resolution +----------------- + +``ActorRef`` s of ``Persistent`` message senders are also stored in the journal. Consequently, they may become invalid if +an application is restarted and messages are replayed. For example, the stored ``ActorRef`` may then reference +a previous incarnation of a sender and a new incarnation of that sender cannot receive a reply from a processor. +This may be acceptable for many applications but others may require that a new sender incarnation receives the +reply (to reliably resume a conversation between actors after a JVM crash, for example). Here, a channel may +assist in resolving new sender incarnations by specifying a third ``Deliver`` argument: + +* ``Resolve.destination()`` if the sender of a persistent message is used as channel destination + + .. includecode:: code/docs/persistence/PersistenceDocTest.java#resolve-destination + +* ``Resolve.sender()`` if the sender of a persistent message is forwarded to a destination. + + .. includecode:: code/docs/persistence/PersistenceDocTest.java#resolve-sender + +Default is ``Resolve.off()`` which means no resolution. Find out more in the ``Deliver`` API docs. + +Identifiers +----------- + +In the same way as :ref:`processors-java`, channels also have an identifier that defaults to a channel's path. A channel +identifier can therefore be customized by using a custom actor name at channel creation. As already mentioned, this +works well when using local actor references but may cause problems with remote actor references. In this case, an +application-defined channel id should be provided as argument to ``Channel.props(String)`` + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#channel-id-override + +Persistent messages +=================== + +Payload +------- + +The payload of a ``Persistent`` message can be obtained via its ``payload`` method. Inside processors, new messages +must be derived from the current persistent message before sending them via a channel, either by calling ``p.withPayload(...)`` +or ``Persistent.create(..., getCurrentPersistentMessage())`` where ``getCurrentPersistentMessage()`` is defined on +``UntypedProcessor``. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#current-message + +This is necessary for delivery confirmations to work properly. Both +ways are equivalent but we recommend using ``p.withPayload(...)`` for clarity. It is not allowed to send a message +via a channel that has been created with ``Persistent.create(...)``. This would redeliver the message on every replay +even though its delivery was confirmed by a destination. + +Sequence number +--------------- + +The sequence number of a ``Persistent`` message can be obtained via its ``sequenceNr`` method. Persistent +messages are assigned sequence numbers on a per-processor basis. A sequence starts at ``1L`` and doesn't contain +gaps unless a processor marks a message as deleted. + +Upcoming features +================= + +* Snapshot based recovery +* Configurable serialization +* Reliable channels +* Journal plugin API +* ... diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala new file mode 100644 index 0000000000..8f3d56ddc1 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -0,0 +1,187 @@ +package docs.persistence + +import akka.actor.ActorSystem +import akka.persistence.{ Recover, Persistent, Processor } +import akka.testkit.{ ImplicitSender, AkkaSpec } + +trait PersistenceDocSpec { + val system: ActorSystem + val config = + """ + //#config + akka.persistence.journal.leveldb.dir = "target/journal" + //#config + """ + + import system._ + + new AnyRef { + //#definition + import akka.persistence.{ Persistent, Processor } + + class MyProcessor extends Processor { + def receive = { + case Persistent(payload, sequenceNr) ⇒ // message has been written to journal + case other ⇒ // message has not been written to journal + } + } + //#definition + + //#usage + import akka.actor.Props + + val processor = actorOf(Props[MyProcessor], name = "myProcessor") + + processor ! Persistent("foo") // will be journaled + processor ! "bar" // will not be journaled + //#usage + + //#recover-explicit + processor ! Recover() + //#recover-explicit + } + + new AnyRef { + trait MyProcessor1 extends Processor { + //#recover-on-start-disabled + override def preStartProcessor() = () + //#recover-on-start-disabled + //#recover-on-restart-disabled + override def preRestartProcessor(reason: Throwable, message: Option[Any]) = () + //#recover-on-restart-disabled + } + + trait MyProcessor2 extends Processor { + //#recover-on-start-custom + override def preStartProcessor() { + self ! Recover(toSequenceNr = 457L) + } + //#recover-on-start-custom + } + + trait MyProcessor3 extends Processor { + //#deletion + override def preRestartProcessor(reason: Throwable, message: Option[Any]) { + message match { + case Some(p: Persistent) ⇒ delete(p) + case _ ⇒ + } + super.preRestartProcessor(reason, message) + } + //#deletion + } + } + + new AnyRef { + trait ProcessorMethods { + //#processor-id + def processorId: String + //#processor-id + //#recovery-status + def recoveryRunning: Boolean + def recoveryFinished: Boolean + //#recovery-status + //#current-message + implicit def currentPersistentMessage: Option[Persistent] + //#current-message + } + class MyProcessor1 extends Processor with ProcessorMethods { + //#processor-id-override + override def processorId = "my-stable-processor-id" + //#processor-id-override + def receive = { + case _ ⇒ + } + } + } + + new AnyRef { + //#channel-example + import akka.actor.{ Actor, Props } + import akka.persistence.{ Channel, Deliver, Persistent, Processor } + + class MyProcessor extends Processor { + val destination = context.actorOf(Props[MyDestination]) + val channel = context.actorOf(Channel.props(), name = "myChannel") + + def receive = { + case p @ Persistent(payload, _) ⇒ { + channel ! Deliver(p.withPayload(s"processed ${payload}"), destination) + } + } + } + + class MyDestination extends Actor { + def receive = { + case p @ Persistent(payload, _) ⇒ { + println(s"received ${payload}") + p.confirm() + } + } + } + //#channel-example + + class MyProcessor2 extends Processor { + import akka.persistence.Resolve + + val destination = context.actorOf(Props[MyDestination]) + val channel = + //#channel-id-override + context.actorOf(Channel.props("my-stable-channel-id")) + //#channel-id-override + + def receive = { + case p @ Persistent(payload, _) ⇒ { + //#channel-example-reply + channel ! Deliver(p.withPayload(s"processed ${payload}"), sender) + //#channel-example-reply + //#resolve-destination + channel ! Deliver(p, sender, Resolve.Destination) + //#resolve-destination + //#resolve-sender + channel forward Deliver(p, destination, Resolve.Sender) + //#resolve-sender + } + } + } + + class MyProcessor3 extends Processor { + def receive = { + //#payload-pattern-matching + case Persistent(payload, _) ⇒ + //#payload-pattern-matching + } + } + + class MyProcessor4 extends Processor { + def receive = { + //#sequence-nr-pattern-matching + case Persistent(_, sequenceNr) ⇒ + //#sequence-nr-pattern-matching + } + } + } + + new AnyRef { + //#fsm-example + import akka.actor.FSM + import akka.persistence.{ Processor, Persistent } + + class PersistentDoor extends Processor with FSM[String, Int] { + startWith("closed", 0) + + when("closed") { + case Event(Persistent("open", _), counter) ⇒ { + goto("open") using (counter + 1) replying (counter) + } + } + + when("open") { + case Event(Persistent("close", _), counter) ⇒ { + goto("closed") using (counter + 1) replying (counter) + } + } + } + //#fsm-example + } +} diff --git a/akka-docs/rst/scala/index-actors.rst b/akka-docs/rst/scala/index-actors.rst index 3504ad8daf..3ff36e4d68 100644 --- a/akka-docs/rst/scala/index-actors.rst +++ b/akka-docs/rst/scala/index-actors.rst @@ -12,4 +12,5 @@ Actors mailboxes routing fsm + persistence testing diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst new file mode 100644 index 0000000000..078408a3d2 --- /dev/null +++ b/akka-docs/rst/scala/persistence.rst @@ -0,0 +1,252 @@ +.. _persistence: + +########### +Persistence +########### + +This section describes an early access version of the Akka persistence module. Akka persistence is heavily inspired +by the `eventsourced`_ library. It follows the same concepts and architecture of `eventsourced`_ but significantly +differs on API and implementation level. + +.. warning:: + + This module is marked as **“experimental”** as of its introduction in Akka 2.3.0. We will continue to + improve this API based on our users’ feedback, which implies that while we try to keep incompatible + changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the + contents of the ``akka.persistence`` package. + +.. _eventsourced: https://github.com/eligosource/eventsourced + +Dependencies +============ + +Akka persistence is a separate jar file. Make sure that you have the following dependency in your project:: + + "com.typesafe.akka" %% "akka-persistence" % "@version@" @crossString@ + +Architecture +============ + +* *Processor*: A processor is a persistent actor. Messages sent to a processor are written to a journal before + its ``receive`` method is called. When a processor is started or restarted, journaled messages are replayed + to that processor, so that it can recover internal state from these messages. + +* *Channel*: Channels are used by processors to communicate with other actors. They prevent that replayed messages + are redundantly delivered to these actors. + +Use cases +========= + +* TODO: describe command sourcing +* TODO: describe event sourcing + +Configuration +============= + +By default, journaled messages are written to a directory named ``journal`` in the current working directory. This +can be changed by configuration where the specified path can be relative or absolute: + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#config + +.. _processors: + +Processors +========== + +A processor can be implemented by extending the ``Processor`` trait and implementing the ``receive`` method. + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#definition + +Processors only write messages of type ``Persistent`` to the journal, others are received without being persisted. +When a processor's ``receive`` method is called with a ``Persistent`` message it can safely assume that this message +has been successfully written to the journal. A ``Processor`` itself is an ``Actor`` and can therefore be instantiated +with ``actorOf``. + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#usage + +Recovery +-------- + +By default, a processor is automatically recovered on start and on restart by replaying journaled messages. +New messages sent to a processor during recovery do not interfere with replayed messages. New messages will +only be received by that processor after recovery completes. + +Recovery customization +^^^^^^^^^^^^^^^^^^^^^^ + +Automated recovery on start can be disabled by overriding ``preStartProcessor`` with an empty implementation. + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-on-start-disabled + +In this case, a processor must be recovered explicitly by sending it a ``Recover()`` message. + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-explicit + +If not overridden, ``preStartProcessor`` sends a ``Recover()`` message to ``self``. Applications may also override +``preStartProcessor`` to define further ``Recover()`` parameters such as an upper sequence number bound, for example. + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-on-start-custom + +Automated recovery on restart can be disabled by overriding ``preRestartProcessor`` with an empty implementation. + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-on-restart-disabled + +This is useful in situations where processors are *resumed* by a supervisor (which keeps accumulated internal +state and makes a message replay unnecessary). + +Recovery status +^^^^^^^^^^^^^^^ + +A processor can query its own recovery status via the methods + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-status + +.. _failure-handling: + +Failure handling +^^^^^^^^^^^^^^^^ + +A persistent message that caused an exception will be received again by a processor after restart. To prevent +a replay of that message during recovery it can be marked as deleted. + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#deletion + +Life cycle hooks +---------------- + +``Processor`` implementation classes should override the ``preStartProcessor``, ``preRestartProcessor``, +``postRestartProcessor`` and ``postStopProcessor`` life cycle hooks and not ``preStart``, ``preRestart``, +``postRestart`` and ``postStop`` directly. The latter are nevertheless non-final to allow composition with +existing traits such as ``akka.actor.FSM``, for example. + +Identifiers +----------- + +A processor must have an identifier that doesn't change across different actor incarnations. It defaults to the +``String`` representation of processor's path and can be obtained via the ``processorId`` method. + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#processor-id + +Applications can customize a processor's id by specifying an actor name during processor creation as shown in +section :ref:`processors`. This works well when using local actor references but may cause problems with remote +actor references because their paths also contain deployment information such as host and port (and actor deployments +are likely to change during the lifetime of an application). In this case, ``Processor`` implementation classes +should override ``processorId``. + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#processor-id-override + +Later versions of the Akka persistence module will likely offer a possibility to migrate processor ids. + +Channels +======== + +Channels are special actors that are used by processors to communicate with other actors (channel destinations). +Channels prevent redundant delivery of replayed messages to destinations during processor recovery. A replayed +message is retained by a channel if its previous delivery has been confirmed by a destination. + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#channel-example + +A channel is ready to use once it has been created, no recovery or further activation is needed. A ``Deliver`` +request instructs a channel to send a ``Persistent`` message to a destination where the sender of the ``Deliver`` +request is forwarded to the destination. A processor may also reply to a message sender directly by using ``sender`` +as channel destination. + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#channel-example-reply + +Channel destinations confirm the delivery of a ``Persistent`` message by calling its ``confirm()`` method. This +(asynchronously) writes a confirmation entry to the journal. Replayed messages internally contain these confirmation +entries which allows a channel to decide if a message should be retained or not. + +If an application crashes after a destination called ``confirm()`` but before the confirmation entry could have +been written to the journal then the unconfirmed message will be delivered again during next recovery and it is +the destination's responsibility to detect the duplicate or simply process the message again if it's an idempotent +receiver. Duplicates can be detected, for example, by tracking sequence numbers. + +Currently, channels do not store ``Deliver`` requests or retry delivery on network or destination failures. This +feature (*reliable channels*) will be available soon. + +Sender resolution +----------------- + +``ActorRef`` s of ``Persistent`` message senders are also stored in the journal. Consequently, they may become invalid if +an application is restarted and messages are replayed. For example, the stored ``ActorRef`` may then reference +a previous incarnation of a sender and a new incarnation of that sender cannot receive a reply from a processor. +This may be acceptable for many applications but others may require that a new sender incarnation receives the +reply (to reliably resume a conversation between actors after a JVM crash, for example). Here, a channel may +assist in resolving new sender incarnations by specifying a third ``Deliver`` argument: + +* ``Resolve.Destination`` if the sender of a persistent message is used as channel destination + + .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#resolve-destination + +* ``Resolve.Sender`` if the sender of a persistent message is forwarded to a destination. + + .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#resolve-sender + +Default is ``Resolve.Off`` which means no resolution. Find out more in the ``Deliver`` API docs. + +Identifiers +----------- + +In the same way as :ref:`processors`, channels also have an identifier that defaults to a channel's path. A channel +identifier can therefore be customized by using a custom actor name at channel creation. As already mentioned, this +works well when using local actor references but may cause problems with remote actor references. In this case, an +application-defined channel id should be provided as argument to ``Channel.props(String)`` + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#channel-id-override + +Persistent messages +=================== + +Payload +------- + +The payload of a ``Persistent`` message can be obtained via its + +.. includecode:: ../../../akka-persistence/src/main/scala/akka/persistence/Persistent.scala#payload + +method or by pattern matching + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#payload-pattern-matching + +Inside processors, new persistent messages are derived from the current persistent message before sending them via a +channel, either by calling ``p.withPayload(...)`` or ``Persistent.create(...)`` where the latter uses the +implicit ``currentPersistentMessage`` made available by ``Processor``. + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#current-message + +This is necessary for delivery confirmations to work properly. Both ways are equivalent but we recommend +using ``p.withPayload(...)`` for clarity. + +Sequence number +--------------- + +The sequence number of a ``Persistent`` message can be obtained via its + +.. includecode:: ../../../akka-persistence/src/main/scala/akka/persistence/Persistent.scala#sequence-nr + +method or by pattern matching + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#sequence-nr-pattern-matching + +Persistent messages are assigned sequence numbers on a per-processor basis. A sequence starts at ``1L`` and +doesn't contain gaps unless a processor marks a message as deleted. + +Miscellaneous +============= + +State machines +-------------- + +State machines can be persisted by mixing in the ``FSM`` trait into processors. + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#fsm-example + +Upcoming features +================= + +* Snapshot based recovery +* Configurable serialization +* Reliable channels +* Journal plugin API +* ... diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf new file mode 100644 index 0000000000..f1842097d1 --- /dev/null +++ b/akka-persistence/src/main/resources/reference.conf @@ -0,0 +1,19 @@ +akka { + persistence { + journal { + use = "leveldb" + + inmem { + // ... + } + + leveldb { + dir = "journal" + dispatcher { + executor = "thread-pool-executor" + type = PinnedDispatcher + } + } + } + } +} \ No newline at end of file diff --git a/akka-persistence/src/main/scala/akka/persistence/Channel.scala b/akka-persistence/src/main/scala/akka/persistence/Channel.scala new file mode 100644 index 0000000000..982e4323ab --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/Channel.scala @@ -0,0 +1,271 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.persistence + +import akka.actor._ + +/** + * A channel is used by [[Processor]]s for sending received persistent messages to destinations. + * It prevents redundant delivery of messages to these destinations when a processor is recovered + * i.e. receives replayed messages. This requires that channel destinations confirm the receipt of + * persistent messages by calling `confirm()` on the [[Persistent]] message. + * + * A channel can be instructed to deliver a persistent message to a `destination` via the [[Deliver]] + * command. + * + * {{{ + * class ForwardExample extends Processor { + * val destination = context.actorOf(Props[MyDestination]) + * val channel = context.actorOf(Channel.props(), "myChannel") + * + * def receive = { + * case m @ Persistent(payload, _) => { + * // forward modified message to destination + * channel forward Deliver(m.withPayload(s"fw: ${payload}"), destination) + * } + * } + * } + * }}} + * + * To reply to the sender of a persistent message, the `sender` reference should be used as channel + * destination. + * + * {{{ + * class ReplyExample extends Processor { + * val channel = context.actorOf(Channel.props(), "myChannel") + * + * def receive = { + * case m @ Persistent(payload, _) => { + * // reply modified message to sender + * channel ! Deliver(m.withPayload(s"re: ${payload}"), sender) + * } + * } + * } + * }}} + * + * @see [[Deliver]] + */ +class Channel private (_channelId: Option[String]) extends Actor with Stash { + private val extension = Persistence(context.system) + private val id = _channelId match { + case Some(cid) ⇒ cid + case None ⇒ extension.channelId(self) + } + + /** + * Creates a new channel with a generated channel id. + */ + def this() = this(None) + + /** + * Creates a new channel with specified channel id. + * + * @param channelId channel id. + */ + def this(channelId: String) = this(Some(channelId)) + + import ResolvedDelivery._ + + private val delivering: Actor.Receive = { + case Deliver(p: PersistentImpl, destination, resolve) ⇒ { + if (!p.confirms.contains(id)) { + val msg = p.copy(channelId = id, + confirmTarget = extension.journalFor(p.processorId), + confirmMessage = Confirm(p.processorId, p.sequenceNr, id)) + resolve match { + case Resolve.Sender if !p.resolved ⇒ { + context.actorOf(Props(classOf[ResolvedSenderDelivery], msg, destination, sender)) ! DeliverResolved + context.become(buffering, false) + } + case Resolve.Destination if !p.resolved ⇒ { + context.actorOf(Props(classOf[ResolvedDestinationDelivery], msg, destination, sender)) ! DeliverResolved + context.become(buffering, false) + } + case _ ⇒ destination tell (msg, sender) + } + } + } + } + + private val buffering: Actor.Receive = { + case DeliveredResolved | DeliveredUnresolved ⇒ context.unbecome(); unstashAll() // TODO: optimize + case _: Deliver ⇒ stash() + } + + def receive = delivering +} + +object Channel { + /** + * Returns a channel configuration object for creating a [[Channel]] with a + * generated id. + */ + def props(): Props = Props(classOf[Channel]) + + /** + * Returns a channel configuration object for creating a [[Channel]] with the + * specified id. + * + * @param channelId channel id. + */ + def props(channelId: String): Props = Props(classOf[Channel], channelId) +} + +/** + * Instructs a [[Channel]] to deliver `persistent` message to destination `destination`. + * The `resolve` parameter can be: + * + * - `Resolve.Destination`: will resolve a new destination reference from the specified + * `destination`s path. The `persistent` message will be sent to the newly resolved + * destination. + * - `Resolve.Sender`: will resolve a new sender reference from this `Deliver` message's + * `sender` path. The `persistent` message will be sent to the specified `destination` + * using the newly resolved sender. + * - `Resolve.Off`: will not do any resolution (default). + * + * Resolving an actor reference means first obtaining an `ActorSelection` from the path of + * the reference to be resolved and then obtaining a new actor reference via an `Identify` + * - `ActorIdentity` conversation. Actor reference resolution does not change the original + * order of messages. + * + * Resolving actor references may become necessary when using the stored sender references + * of replayed messages. A stored sender reference may become invalid (for example, it may + * reference a previous sender incarnation, after a JVM restart). Depending on how a processor + * uses sender references, two resolution strategies are relevant. + * + * - `Resolve.Sender` when a processor forwards a replayed message to a destination. + * + * {{{ + * channel forward Deliver(message, destination, Resolve.Sender) + * }}} + * + * - `Resolve.Destination` when a processor replies to the sender of a replayed message. In + * this case the sender is used as channel destination. + * + * {{{ + * channel ! Deliver(message, sender, Resolve.Destination) + * }}} + * + * A destination or sender reference will only be resolved by a channel if + * + * - the `resolve` parameter is set to `Resolve.Destination` or `Resolve.Channel` + * - the message is replayed + * - the message is not retained by the channel and + * - there was no previous successful resolve action for that message + * + * @param persistent persistent message. + * @param destination persistent message destination. + * @param resolve resolve strategy. + */ +@SerialVersionUID(1L) +case class Deliver(persistent: Persistent, destination: ActorRef, resolve: Resolve.ResolveStrategy = Resolve.Off) + +object Deliver { + /** + * Java API. + */ + def create(persistent: Persistent, destination: ActorRef) = Deliver(persistent, destination) + + /** + * Java API. + */ + def create(persistent: Persistent, destination: ActorRef, resolve: Resolve.ResolveStrategy) = Deliver(persistent, destination, resolve) +} + +/** + * Actor reference resolution strategy. + * + * @see [[Deliver]] + */ +object Resolve { + sealed abstract class ResolveStrategy + + /** + * No resolution. + */ + @SerialVersionUID(1L) + case object Off extends ResolveStrategy + + /** + * [[Channel]] should resolve the `sender` of a [[Deliver]] message. + */ + @SerialVersionUID(1L) + case object Sender extends ResolveStrategy + + /** + * [[Channel]] should resolve the `destination` of a [[Deliver]] message. + */ + @SerialVersionUID(1L) + case object Destination extends ResolveStrategy + + /** + * Java API. + */ + def off() = Off + + /** + * Java API. + */ + def sender() = Sender + + /** + * Java API. + */ + def destination() = Destination +} + +/** + * Resolved delivery support. + */ +private trait ResolvedDelivery extends Actor { + import scala.concurrent.duration._ + import scala.language.postfixOps + import ResolvedDelivery._ + + context.setReceiveTimeout(5 seconds) // TODO: make configurable + + def path: ActorPath + def onResolveSuccess(ref: ActorRef): Unit + def onResolveFailure(): Unit + + def receive = { + case DeliverResolved ⇒ context.actorSelection(path) ! Identify(1) + case ActorIdentity(1, Some(ref)) ⇒ onResolveSuccess(ref); shutdown(DeliveredResolved) + case ActorIdentity(1, None) ⇒ onResolveFailure(); shutdown(DeliveredUnresolved) + case ReceiveTimeout ⇒ onResolveFailure(); shutdown(DeliveredUnresolved) + } + + def shutdown(message: Any) { + context.parent ! message + context.stop(self) + } +} + +private object ResolvedDelivery { + case object DeliverResolved + case object DeliveredResolved + case object DeliveredUnresolved +} + +/** + * Resolves `destination` before sending `persistent` message to the resolved destination using + * the specified sender (`sdr`) as message sender. + */ +private class ResolvedDestinationDelivery(persistent: PersistentImpl, destination: ActorRef, sdr: ActorRef) extends ResolvedDelivery { + val path = destination.path + def onResolveSuccess(ref: ActorRef) = ref tell (persistent.copy(resolved = true), sdr) + def onResolveFailure() = destination tell (persistent, sdr) +} + +/** + * Resolves `sdr` before sending `persistent` message to specified `destination` using + * the resolved sender as message sender. + */ +private class ResolvedSenderDelivery(persistent: PersistentImpl, destination: ActorRef, sdr: ActorRef) extends ResolvedDelivery { + val path = sdr.path + def onResolveSuccess(ref: ActorRef) = destination tell (persistent.copy(resolved = true), ref) + def onResolveFailure() = destination tell (persistent, sdr) +} + diff --git a/akka-persistence/src/main/scala/akka/persistence/Journal.scala b/akka-persistence/src/main/scala/akka/persistence/Journal.scala new file mode 100644 index 0000000000..a430e65222 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/Journal.scala @@ -0,0 +1,85 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.persistence + +import akka.actor._ + +private[persistence] trait JournalFactory { + /** + * + * Creates a new journal actor. + */ + def createJournal(implicit factory: ActorRefFactory): ActorRef +} + +/** + * Defines messages exchanged between processors, channels and a journal. + */ +private[persistence] object Journal { + /** + * Instructs a journal to mark the `persistent` message as deleted. + * A persistent message marked as deleted is not replayed during recovery. + * + * @param persistent persistent message. + */ + case class Delete(persistent: Persistent) + + /** + * Instructs a journal to persist a message. + * + * @param persistent message to be persisted. + * @param processor requesting processor. + */ + case class Write(persistent: PersistentImpl, processor: ActorRef) + + /** + * Reply message to a processor that `persistent` message has been journaled. + * + * @param persistent persistent message. + */ + case class Written(persistent: PersistentImpl) + + /** + * Instructs a journal to loop a `message` back to `processor`, without persisting the + * message. Looping of messages through a journal is required to preserve message order + * with persistent messages. + * + * @param message message to be looped through the journal. + * @param processor requesting processor. + */ + case class Loop(message: Any, processor: ActorRef) + + /** + * Reply message to a processor that a `message` has been looped through the journal. + * + * @param message looped message. + */ + case class Looped(message: Any) + + /** + * Instructs a journal to replay persistent messages to `processor`, identified by + * `processorId`. Messages are replayed up to sequence number `toSequenceNr` (inclusive). + * + * @param toSequenceNr upper sequence number bound (inclusive) for replay. + * @param processor processor that receives the replayed messages. + * @param processorId processor id. + */ + case class Replay(toSequenceNr: Long, processor: ActorRef, processorId: String) + + /** + * Wrapper for a replayed `persistent` message. + * + * @param persistent persistent message. + */ + case class Replayed(persistent: PersistentImpl) + + /** + * Message sent to a processor after the last [[Replayed]] message. + * + * @param maxSequenceNr the highest stored sequence number (for a processor). + */ + case class RecoveryEnd(maxSequenceNr: Long) +} + diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala new file mode 100644 index 0000000000..19632689b4 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -0,0 +1,66 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.persistence + +import com.typesafe.config.Config + +import akka.actor._ +import akka.persistence.journal._ + +/** + * Akka persistence extension. + */ +object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider { + class Settings(config: Config) { + val rootConfig = config.getConfig("akka.persistence.journal") + val journalName = rootConfig.getString("use") + val journalConfig = rootConfig.getConfig(journalName) + val journalFactory = journalName match { + case "inmem" ⇒ new InmemJournalSettings(journalConfig) + case "leveldb" ⇒ new LeveldbJournalSettings(journalConfig) + } + } + + /** + * Java API. + */ + override def get(system: ActorSystem): Persistence = super.get(system) + + def createExtension(system: ExtendedActorSystem): Persistence = new Persistence(system) + + def lookup() = Persistence +} + +/** + * Akka persistence extension. + */ +class Persistence(val system: ExtendedActorSystem) extends Extension { + private val settings = new Persistence.Settings(system.settings.config) + private val journal = settings.journalFactory.createJournal(system) + // TODO: journal should have its own dispatcher + + /** + * Returns a journal for processor identified by `pid`. + * + * @param processorId processor id. + */ + def journalFor(processorId: String): ActorRef = { + // Currently returns a journal singleton is returned but this methods allows + // for later optimisations where each processor can have its own journal actor. + journal + } + + /** + * Creates a canonical processor id from a processor actor ref. + */ + def processorId(processor: ActorRef): String = id(processor) + + /** + * Creates a canonical channel id from a channel actor ref. + */ + def channelId(channel: ActorRef): String = id(channel) + + private def id(ref: ActorRef) = ref.path.toStringWithAddress(system.provider.getDefaultAddress) +} diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala new file mode 100644 index 0000000000..08e50ee527 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -0,0 +1,103 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.persistence + +import akka.actor.ActorRef + +/** + * Persistent message. + */ +sealed abstract class Persistent { + /** + * This persistent message's payload. + */ + //#payload + def payload: Any + //#payload + + /** + * This persistent message's sequence number. + */ + //#sequence-nr + def sequenceNr: Long + //#sequence-nr + + /** + * Creates a new persistent message with the specified `payload`. + */ + def withPayload(payload: Any): Persistent + + /** + * Called by [[Channel]] destinations to confirm the receipt of a persistent message. + */ + def confirm(): Unit +} + +object Persistent { + /** + * Java API. + * + * Creates a new persistent message. Must only be used outside processors. + * + * @param payload payload of new persistent message. + */ + def create(payload: Any): Persistent = + create(payload, null) + + /** + * Java API. + * + * Creates a new persistent message, derived from the specified current message. The current + * message can be obtained inside a [[Processor]] by calling `getCurrentPersistentMessage()`. + * + * @param payload payload of new persistent message. + * @param currentPersistentMessage current persistent message. + */ + def create(payload: Any, currentPersistentMessage: Persistent): Persistent = + apply(payload)(Option(currentPersistentMessage)) + + /** + * Creates a new persistent message, derived from an implicit current message. + * When used inside a [[Processor]], this is the optional current [[Message]] + * of that processor. + * + * @param payload payload of the new persistent message. + * @param currentPersistentMessage optional current persistent message, defaults to `None`. + */ + def apply(payload: Any)(implicit currentPersistentMessage: Option[Persistent] = None): Persistent = + currentPersistentMessage.map(_.withPayload(payload)).getOrElse(PersistentImpl(payload)) + + /** + * Persistent message extractor. + */ + def unapply(persistent: Persistent): Option[(Any, Long)] = + Some((persistent.payload, persistent.sequenceNr)) +} + +/** + * INTERNAL API. + * + * Internal [[Persistent]] representation. + */ +private[persistence] case class PersistentImpl( + payload: Any, + sequenceNr: Long = 0L, + resolved: Boolean = true, + processorId: String = "", + channelId: String = "", + sender: String = "", + confirms: Seq[String] = Nil, + confirmTarget: ActorRef = null, + confirmMessage: Confirm = null) extends Persistent { + + def withPayload(payload: Any): Persistent = copy(payload = payload) + def confirm(): Unit = if (confirmTarget != null) confirmTarget ! confirmMessage +} + +/** + * Message to confirm the receipt of a persistent message (sent via a [[Channel]]). + */ +@SerialVersionUID(1L) +private[persistence] case class Confirm(processorId: String, sequenceNr: Long, channelId: String) diff --git a/akka-persistence/src/main/scala/akka/persistence/Processor.scala b/akka-persistence/src/main/scala/akka/persistence/Processor.scala new file mode 100644 index 0000000000..b16f5681ba --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/Processor.scala @@ -0,0 +1,415 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.persistence + +import akka.actor._ +import akka.dispatch._ + +/** + * An actor that persists (journals) messages of type [[Persistent]]. Messages of other types are not persisted. + * + * {{{ + * import akka.persistence.{ Persistent, Processor } + * + * class MyProcessor extends Processor { + * def receive = { + * case Persistent(payload, sequenceNr) => // message has been written to journal + * case other => // message has not been written to journal + * } + * } + * + * val processor = actorOf(Props[MyProcessor], name = "myProcessor") + * + * processor ! Persistent("foo") + * processor ! "bar" + * }}} + * + * + * During start and restart, persistent messages are replayed to a processor so that it can recover internal + * state from these messages. New messages sent to a processor during recovery do not interfere with replayed + * messages, hence applications don't need to wait for a processor to complete its recovery. + * + * Automated recovery can be turned off or customized by overriding the [[preStartProcessor]] and + * [[preRestartProcessor]] life cycle hooks. If automated recovery is turned off, an application can + * explicitly recover a processor by sending it a [[Recover]] message. + * + * [[Persistent]] messages are assigned sequence numbers that are generated on a per-processor basis. A sequence + * starts at `1L` and doesn't contain gaps unless a processor (logically) [[delete]]s a message. + * + * During recovery, a processor internally buffers new messages until recovery completes, so that new messages + * do not interfere with replayed messages. This internal buffer (the ''processor stash'') is isolated from the + * ''user stash'' inherited by `akka.actor.Stash`. `Processor` implementation classes can therefore use the + * ''user stash'' for stashing/unstashing both persistent and transient messages. + * + * @see [[UntypedProcessor]] + */ +trait Processor extends Actor with Stash { + import Journal._ + + private val extension = Persistence(context.system) + private val _processorId = extension.processorId(self) + + /** + * Processor state. + */ + private trait State { + /** + * State-specific message handler. + */ + def aroundReceive(receive: Actor.Receive, message: Any): Unit + + protected def process(receive: Actor.Receive, message: Any) = + receive.applyOrElse(message, unhandled) + + protected def processPersistent(receive: Actor.Receive, persistent: Persistent) = try { + _currentPersistent = persistent + updateLastSequenceNr(persistent) + receive.applyOrElse(persistent, unhandled) + } finally _currentPersistent = null + + protected def updateLastSequenceNr(persistent: Persistent) { + if (persistent.sequenceNr > _lastSequenceNr) _lastSequenceNr = persistent.sequenceNr + } + } + + /** + * Initial state, waits for `Recover` request, then changes to `recoveryStarted`. + */ + private val recoveryPending = new State { + override def toString: String = "recovery pending" + + def aroundReceive(receive: Actor.Receive, message: Any): Unit = message match { + case Recover(toSnr) ⇒ { + _currentState = recoveryStarted + journal ! Replay(toSnr, self, processorId) + } + case _ ⇒ stashInternal() + } + } + + /** + * Processes replayed messages. Changes to `recoverySucceeded` if all replayed + * messages have been successfully processed, otherwise, changes to `recoveryFailed`. + * In case of a failure, the exception is caught and stored for being thrown later + * in `prepareRestart`. + */ + private val recoveryStarted = new State { + override def toString: String = "recovery started" + + def aroundReceive(receive: Actor.Receive, message: Any) = message match { + case Replayed(p) ⇒ try { processPersistent(receive, p) } catch { + case t: Throwable ⇒ { + _currentState = recoveryFailed // delay throwing exception to prepareRestart + _recoveryFailureReason = t + _recoveryFailureMessage = currentEnvelope + } + } + case RecoveryEnd(maxSnr) ⇒ { + _currentState = recoverySucceeded + _sequenceNr = maxSnr + unstashAllInternal() + } + case Recover(_) ⇒ // ignore + case _ ⇒ stashInternal() + } + } + + /** + * Journals and processes new messages, both persistent and transient. + */ + private val recoverySucceeded = new State { + override def toString: String = "recovery finished" + + def aroundReceive(receive: Actor.Receive, message: Any) = message match { + case Recover(_) ⇒ // ignore + case Replayed(p) ⇒ processPersistent(receive, p) // can occur after unstash from user stash + case Written(p) ⇒ processPersistent(receive, p) + case Looped(p) ⇒ process(receive, p) + case p: PersistentImpl ⇒ journal forward Write(p.copy(processorId = processorId, sequenceNr = nextSequenceNr()), self) + case m ⇒ journal forward Loop(m, self) + } + } + + /** + * Consumes remaining replayed messages and then changes to `prepareRestart`. The + * message that caused the exception during replay, is re-added to the mailbox and + * re-received in `prepareRestart`. + */ + private val recoveryFailed = new State { + override def toString: String = "recovery failed" + + def aroundReceive(receive: Actor.Receive, message: Any) = message match { + case RecoveryEnd(maxSnr) ⇒ { + _currentState = prepareRestart + mailbox.enqueueFirst(self, _recoveryFailureMessage) + } + case Replayed(p) ⇒ updateLastSequenceNr(p) + case _ ⇒ // ignore + } + } + + /** + * Re-receives the replayed message that causes an exception during replay and throws + * that exception. + */ + private val prepareRestart = new State { + override def toString: String = "prepare restart" + + def aroundReceive(receive: Actor.Receive, message: Any) = message match { + case Replayed(_) ⇒ throw _recoveryFailureReason + case _ ⇒ // ignore + } + } + + private var _sequenceNr: Long = 0L + private var _lastSequenceNr: Long = 0L + + private var _currentPersistent: Persistent = _ + private var _currentState: State = recoveryPending + + private var _recoveryFailureReason: Throwable = _ + private var _recoveryFailureMessage: Envelope = _ + + private lazy val journal: ActorRef = extension.journalFor(processorId) + + /** + * Processor id. Defaults to this processor's path and can be overridden. + */ + def processorId: String = _processorId + + /** + * Highest received sequence number so far or `0L` if this processor hasn't received + * a persistent message yet. Usually equal to the sequence number of `currentPersistentMessage` + * (unless a processor implementation is about to re-order persistent messages using + * `stash()` and `unstash()`). + */ + def lastSequenceNr: Long = _lastSequenceNr + + /** + * Returns `true` if this processor is currently recovering. + */ + def recoveryRunning: Boolean = + _currentState == recoveryStarted || + _currentState == prepareRestart + + /** + * Returns `true` if this processor has successfully finished recovery. + */ + def recoveryFinished: Boolean = + _currentState == recoverySucceeded + + /** + * Returns the current persistent message if there is one. + */ + implicit def currentPersistentMessage: Option[Persistent] = Option(_currentPersistent) + + /** + * Marks the `persistent` message as deleted. A message marked as deleted is not replayed during + * recovery. This method is usually called inside `preRestartProcessor` when a persistent message + * caused an exception. Processors that want to re-receive that persistent message during recovery + * should not call this method. + */ + def delete(persistent: Persistent) { + journal ! Delete(persistent) + } + + /** + * INTERNAL API. + */ + final override protected[akka] def aroundReceive(receive: Actor.Receive, message: Any): Unit = { + _currentState.aroundReceive(receive, message) + } + + private def nextSequenceNr(): Long = { + _sequenceNr += 1L + _sequenceNr + } + + /** + * User-overridable callback. Called when a processor is started. Default implementation sends + * a `Recover()` to `self`. + */ + @throws(classOf[Exception]) + def preStartProcessor(): Unit = { + self ! Recover() + } + + /** + * User-overridable callback. Called when a processor is stopped. Empty default implementation. + */ + @throws(classOf[Exception]) + def postStopProcessor(): Unit = () + + /** + * User-overridable callback. Called before a processor is restarted. Default implementation sends + * a `Recover(lastSequenceNr)` message to `self` if `message` is defined, `Recover() otherwise`. + */ + @throws(classOf[Exception]) + def preRestartProcessor(reason: Throwable, message: Option[Any]): Unit = message match { + case Some(_) ⇒ self ! Recover(lastSequenceNr) + case None ⇒ self ! Recover() + } + + /** + * User-overridable callback. Called after a processor has been restarted. Empty default implementation. + */ + @throws(classOf[Exception]) + def postRestartProcessor(reason: Throwable): Unit = () + + /** + * Calls [[preStartProcessor]]. + */ + override def preStart() { + preStartProcessor() + super.preStart() + } + + /** + * Calls [[postStopProcessor]] and unstashes all messages from the ''user stash'' that cannot be + * replayed. The user stash is empty afterwards. + */ + override def postStop() { + postStopProcessor() + try unstashAll(unstashFilterPredicate) finally super.postStop() + } + + /** + * Calls [[preRestartDefault]] and then `super.preRestart()`. If processor implementation + * classes want to opt out from stopping child actors, they should override this method and + * call [[preRestartDefault]] only. + */ + override def preRestart(reason: Throwable, message: Option[Any]) { + try preRestartDefault(reason, message) finally super.preRestart(reason, message) + } + + /** + * Calls [[preRestartProcessor]] and unstashes all messages from the ''user stash'' that cannot be + * replayed. The user stash is empty afterwards. + */ + protected def preRestartDefault(reason: Throwable, message: Option[Any]) { + message match { + case Some(Written(m)) ⇒ preRestartProcessor(reason, Some(m)) + case Some(Looped(m)) ⇒ preRestartProcessor(reason, Some(m)) + case Some(Replayed(m)) ⇒ preRestartProcessor(reason, Some(m)) + case mo ⇒ preRestartProcessor(reason, None) + } + + unstashAll(unstashFilterPredicate) + unstashAllInternal() + } + + /** + * Calls [[postRestartProcessor]]. + */ + override def postRestart(reason: Throwable) { + postRestartProcessor(reason) + super.postRestart(reason) + } + + // ----------------------------------------------------- + // Processor-internal stash + // ----------------------------------------------------- + + private def unstashFilterPredicate: Any ⇒ Boolean = { + case _: Written ⇒ false + case _: Replayed ⇒ false + case _ ⇒ true + } + + private var processorStash = Vector.empty[Envelope] + + private def stashInternal(): Unit = { + processorStash :+= currentEnvelope + } + + private def unstashAllInternal(): Unit = try { + val i = processorStash.reverseIterator + while (i.hasNext) mailbox.enqueueFirst(self, i.next()) + } finally { + processorStash = Vector.empty[Envelope] + } + + private def currentEnvelope: Envelope = + context.asInstanceOf[ActorCell].currentMessage +} + +/** + * Java API. + * + * An actor that persists (journals) messages of type [[Persistent]]. Messages of other types are not persisted. + * + * {{{ + * import akka.persistence.Persistent; + * import akka.persistence.Processor; + * + * class MyProcessor extends UntypedProcessor { + * public void onReceive(Object message) throws Exception { + * if (message instanceof Persistent) { + * // message has been written to journal + * Persistent persistent = (Persistent)message; + * Object payload = persistent.payload(); + * Long sequenceNr = persistent.sequenceNr(); + * // ... + * } else { + * // message has not been written to journal + * } + * } + * } + * + * // ... + * + * ActorRef processor = getContext().actorOf(Props.create(MyProcessor.class), "myProcessor"); + * + * processor.tell(Persistent.create("foo"), null); + * processor.tell("bar", null); + * }}} + * + * During start and restart, persistent messages are replayed to a processor so that it can recover internal + * state from these messages. New messages sent to a processor during recovery do not interfere with replayed + * messages, hence applications don't need to wait for a processor to complete its recovery. + * + * Automated recovery can be turned off or customized by overriding the [[preStartProcessor]] and + * [[preRestartProcessor]] life cycle hooks. If automated recovery is turned off, an application can + * explicitly recover a processor by sending it a [[Recover]] message. + * + * [[Persistent]] messages are assigned sequence numbers that are generated on a per-processor basis. A sequence + * starts at `1L` and doesn't contain gaps unless a processor (logically) [[delete]]s a message. + * + * During recovery, a processor internally buffers new messages until recovery completes, so that new messages + * do not interfere with replayed messages. This internal buffer (the ''processor stash'') is isolated from the + * ''user stash'' inherited by `akka.actor.Stash`. `Processor` implementation classes can therefore use the + * ''user stash'' for stashing/unstashing both persistent and transient messages. + * + * @see [[Processor]] + */ +abstract class UntypedProcessor extends UntypedActor with Processor { + + /** + * Java API. + * + * Returns the current persistent message or `null` if there is none. + */ + def getCurrentPersistentMessage = currentPersistentMessage.getOrElse(null) +} + +/** + * Recovery request for a [[Processor]]. + * + * @param toSequenceNr upper sequence number bound (inclusive) for replayed messages. + */ +@SerialVersionUID(1L) +case class Recover(toSequenceNr: Long = Long.MaxValue) + +object Recover { + /** + * Java API. + */ + def create() = Recover() + + /** + * Java API. + */ + def create(toSequenceNr: Long) = Recover(toSequenceNr) +} diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/InmemJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/InmemJournal.scala new file mode 100644 index 0000000000..fc8e89533f --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/InmemJournal.scala @@ -0,0 +1,76 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.persistence.journal + +import com.typesafe.config.Config + +import akka.actor._ +import akka.pattern.PromiseActorRef +import akka.persistence._ + +/** + * In-memory journal configuration object. + */ +private[persistence] class InmemJournalSettings(config: Config) extends JournalFactory { + /** + * Creates a new in-memory journal actor from this configuration object. + */ + def createJournal(implicit factory: ActorRefFactory): ActorRef = factory.actorOf(Props(classOf[InmemJournal], this)) +} + +/** + * In-memory journal. + */ +private[persistence] class InmemJournal(settings: InmemJournalSettings) extends Actor { + // processorId => (message, sender, deleted) + private var messages = Map.empty[String, List[(PersistentImpl, ActorRef, Boolean)]] + // (processorId, sequenceNr) => confirming channels ids + private var confirms = Map.empty[(String, Long), List[String]] + + import Journal._ + + def receive = { + case Write(pm, p) ⇒ { + // must be done because PromiseActorRef instances have no uid set TODO: discuss + val ps = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender + messages = messages + (messages.get(pm.processorId) match { + case None ⇒ pm.processorId -> List((pm.copy(resolved = false), ps, false)) + case Some(mss) ⇒ pm.processorId -> ((pm.copy(resolved = false), ps, false) :: mss) + }) + p.tell(Written(pm), sender) + } + case c @ Confirm(pid, snr, cid) ⇒ { + val pair = (pid, snr) + confirms = confirms + (confirms.get(pair) match { + case None ⇒ pair -> List(cid) + case Some(cids) ⇒ pair -> (cid :: cids) + }) + // TODO: turn off by default and allow to turn on by configuration + context.system.eventStream.publish(c) + } + case Delete(pm: PersistentImpl) ⇒ { + val pid = pm.processorId + val snr = pm.sequenceNr + messages = messages map { + case (`pid`, mss) ⇒ pid -> (mss map { + case (msg, sdr, _) if msg.sequenceNr == snr ⇒ (msg, sdr, true) + case ms ⇒ ms + }) + case kv ⇒ kv + } + } + case Loop(m, p) ⇒ { + p.tell(Looped(m), sender) + } + case Replay(toSnr, p, pid) ⇒ { + val cfs = confirms.withDefaultValue(Nil) + for { + mss ← messages.get(pid) + (msg, sdr, del) ← mss.reverseIterator.filter(_._1.sequenceNr <= toSnr) + } if (!del) p.tell(Replayed(msg.copy(confirms = cfs((msg.processorId, msg.sequenceNr)))), sdr) + p.tell(RecoveryEnd(messages.get(pid).map(_.head._1.sequenceNr).getOrElse(0L)), self) + } + } +} diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/LeveldbJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/LeveldbJournal.scala new file mode 100644 index 0000000000..e764554be6 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/LeveldbJournal.scala @@ -0,0 +1,278 @@ +/** + * Copyright (C) 2012-2013 Eligotech BV. + */ + +package akka.persistence.journal + +import java.io.File +import java.nio.ByteBuffer + +import com.typesafe.config.Config + +import org.iq80.leveldb._ + +import akka.actor._ +import akka.pattern.PromiseActorRef +import akka.persistence._ +import akka.serialization.{ Serialization, SerializationExtension } + +/** + * LevelDB journal configuration object. + */ +private[persistence] class LeveldbJournalSettings(config: Config) extends JournalFactory { + /** + * Name of directory where journal files shall be stored. Can be a relative or absolute path. + */ + val dir: String = config.getString("dir") + + /** + * Currently `false`. + */ + val checksum = false + + /** + * Currently `false`. + */ + val fsync = false + + /** + * Creates a new LevelDB journal actor from this configuration object. + */ + def createJournal(implicit factory: ActorRefFactory): ActorRef = + factory.actorOf(Props(classOf[LeveldbJournal], this).withDispatcher("akka.persistence.journal.leveldb.dispatcher")) +} + +/** + * LevelDB journal. + */ +private[persistence] class LeveldbJournal(settings: LeveldbJournalSettings) extends Actor { + // TODO: support migration of processor and channel ids + // needed if default processor and channel ids are used + // (actor paths, which contain deployment information). + + val leveldbOptions = new Options().createIfMissing(true).compressionType(CompressionType.NONE) + val levelDbReadOptions = new ReadOptions().verifyChecksums(settings.checksum) + val levelDbWriteOptions = new WriteOptions().sync(settings.fsync) + + val leveldbFactory = org.iq80.leveldb.impl.Iq80DBFactory.factory + var leveldb: DB = _ + + val numericIdOffset = 10 + var pathMap: Map[String, Int] = Map.empty + + // TODO: use protobuf serializer for PersistentImpl + // TODO: use user-defined serializer for payload + val serializer = SerializationExtension(context.system).findSerializerFor("") + + import LeveldbJournal._ + import Journal._ + + def receive = { + case Write(pm, p) ⇒ { + val sm = withBatch { batch ⇒ + // must be done because PromiseActorRef instances have no uid set TODO: discuss + val ps = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender + val sm = pm.copy(sender = Serialization.serializedActorPath(ps)) + val pid = numericId(sm.processorId) + batch.put(keyToBytes(counterKey(pid)), counterToBytes(sm.sequenceNr)) + batch.put(keyToBytes(Key(pid, sm.sequenceNr, 0)), msgToBytes(sm.copy(resolved = false, confirmTarget = null, confirmMessage = null))) + sm + } + p.tell(Written(sm), sender) + } + case c @ Confirm(pid, snr, cid) ⇒ { + leveldb.put(keyToBytes(Key(numericId(pid), snr, numericId(cid))), cid.getBytes("UTF-8")) + // TODO: turn off by default and allow to turn on by configuration + context.system.eventStream.publish(c) + } + case Delete(pm: PersistentImpl) ⇒ { + leveldb.put(keyToBytes(deletionKey(numericId(pm.processorId), pm.sequenceNr)), Array.empty[Byte]) + } + case Loop(m, p) ⇒ { + p.tell(Looped(m), sender) + } + case Replay(toSnr, p, pid) ⇒ { + val options = levelDbReadOptions.snapshot(leveldb.getSnapshot) + val iter = leveldb.iterator(options) + val maxSnr = leveldb.get(keyToBytes(counterKey(numericId(pid))), options) match { + case null ⇒ 0L + case bytes ⇒ counterFromBytes(bytes) + } + context.actorOf(Props(classOf[LeveldbReplay], msgFromBytes _)) ! LeveldbReplay.Replay(toSnr, maxSnr, p, numericId(pid), iter) + } + } + + private def msgToBytes(m: PersistentImpl): Array[Byte] = serializer.toBinary(m) + private def msgFromBytes(a: Array[Byte]): PersistentImpl = serializer.fromBinary(a).asInstanceOf[PersistentImpl] + + // ---------------------------------------------------------- + // Path mapping + // ---------------------------------------------------------- + + private def numericId(processorId: String): Int = pathMap.get(processorId) match { + case None ⇒ writePathMapping(processorId, pathMap.size + numericIdOffset) + case Some(v) ⇒ v + } + + private def readPathMap(): Map[String, Int] = { + val iter = leveldb.iterator(levelDbReadOptions.snapshot(leveldb.getSnapshot)) + try { + iter.seek(keyToBytes(idToKey(numericIdOffset))) + readPathMap(Map.empty, iter) + } finally { + iter.close() + } + } + + private def readPathMap(pathMap: Map[String, Int], iter: DBIterator): Map[String, Int] = { + if (!iter.hasNext) pathMap else { + val nextEntry = iter.next() + val nextKey = keyFromBytes(nextEntry.getKey) + if (!isMappingKey(nextKey)) pathMap else { + val nextVal = new String(nextEntry.getValue, "UTF-8") + readPathMap(pathMap + (nextVal -> idFromKey(nextKey)), iter) + } + } + } + + private def writePathMapping(path: String, numericId: Int): Int = { + pathMap = pathMap + (path -> numericId) + leveldb.put(keyToBytes(idToKey(numericId)), path.getBytes("UTF-8")) + numericId + } + + // ---------------------------------------------------------- + // Batch write support + // ---------------------------------------------------------- + + def withBatch[R](body: WriteBatch ⇒ R): R = { + val batch = leveldb.createWriteBatch() + try { + val r = body(batch) + leveldb.write(batch, levelDbWriteOptions) + r + } finally { + batch.close() + } + } + + // ---------------------------------------------------------- + // Life cycle + // ---------------------------------------------------------- + + override def preStart() { + leveldb = leveldbFactory.open(new File(settings.dir), leveldbOptions) + pathMap = readPathMap() + } + + override def postStop() { + leveldb.close() + } +} + +private object LeveldbJournal { + case class Key( + processorId: Int, + sequenceNr: Long, + channelId: Int) + + def idToKey(id: Int) = Key(1, 0L, id) + def idFromKey(key: Key) = key.channelId + + def counterKey(processorId: Int): Key = Key(processorId, 0L, 0) + def isMappingKey(key: Key): Boolean = key.processorId == 1 + + def deletionKey(processorId: Int, sequenceNr: Long): Key = Key(processorId, sequenceNr, 1) + def isDeletionKey(key: Key): Boolean = key.channelId == 1 + + def counterToBytes(ctr: Long): Array[Byte] = ByteBuffer.allocate(8).putLong(ctr).array + def counterFromBytes(bytes: Array[Byte]): Long = ByteBuffer.wrap(bytes).getLong + + def keyToBytes(key: Key): Array[Byte] = { + val bb = ByteBuffer.allocate(20) + bb.putInt(key.processorId) + bb.putLong(key.sequenceNr) + bb.putInt(key.channelId) + bb.array + } + + def keyFromBytes(bytes: Array[Byte]): Key = { + val bb = ByteBuffer.wrap(bytes) + val aid = bb.getInt + val snr = bb.getLong + val cid = bb.getInt + new Key(aid, snr, cid) + } +} + +private class LeveldbReplay(deserialize: Array[Byte] ⇒ PersistentImpl) extends Actor { + val extension = Persistence(context.system) + + import LeveldbReplay._ + import LeveldbJournal._ + import Journal.{ Replayed, RecoveryEnd } + + // TODO: parent should stop replay actor if it crashes + // TODO: use a pinned dispatcher + + def receive = { + case Replay(toSnr, maxSnr, processor, processorId, iter) ⇒ { + try { + val startKey = Key(processorId, 1L, 0) + iter.seek(keyToBytes(startKey)) + replay(iter, startKey, toSnr, m ⇒ processor.tell(Replayed(m), extension.system.provider.resolveActorRef(m.sender))) + } finally { iter.close() } + processor.tell(RecoveryEnd(maxSnr), self) + context.stop(self) + } + } + + @scala.annotation.tailrec + private def replay(iter: DBIterator, key: Key, toSnr: Long, callback: PersistentImpl ⇒ Unit) { + if (iter.hasNext) { + val nextEntry = iter.next() + val nextKey = keyFromBytes(nextEntry.getKey) + if (nextKey.sequenceNr > toSnr) { + // end iteration here + } else if (nextKey.channelId != 0) { + // phantom confirmation (just advance iterator) + replay(iter, nextKey, toSnr, callback) + } else if (key.processorId == nextKey.processorId) { + val msg = deserialize(nextEntry.getValue) + val del = deletion(iter, nextKey) + val cnf = confirms(iter, nextKey, Nil) + if (!del) callback(msg.copy(confirms = cnf)) + replay(iter, nextKey, toSnr, callback) + } + } + } + + private def deletion(iter: DBIterator, key: Key): Boolean = { + if (iter.hasNext) { + val nextEntry = iter.peekNext() + val nextKey = keyFromBytes(nextEntry.getKey) + if (key.processorId == nextKey.processorId && key.sequenceNr == nextKey.sequenceNr && isDeletionKey(nextKey)) { + iter.next() + true + } else false + } else false + } + + @scala.annotation.tailrec + private def confirms(iter: DBIterator, key: Key, channelIds: List[String]): List[String] = { + if (iter.hasNext) { + val nextEntry = iter.peekNext() + val nextKey = keyFromBytes(nextEntry.getKey) + if (key.processorId == nextKey.processorId && key.sequenceNr == nextKey.sequenceNr) { + val nextValue = new String(nextEntry.getValue, "UTF-8") + iter.next() + confirms(iter, nextKey, nextValue :: channelIds) + } else channelIds + } else channelIds + } + +} + +private object LeveldbReplay { + case class Replay(toSequenceNr: Long, maxSequenceNr: Long, processor: ActorRef, pid: Int, iterator: DBIterator) +} diff --git a/akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala new file mode 100644 index 0000000000..b4d20176f5 --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala @@ -0,0 +1,129 @@ +package akka.persistence + +import akka.actor._ +import akka.testkit._ + +object ChannelSpec { + val config = + """ + |serialize-creators = on + |serialize-messages = on + |akka.persistence.journal.leveldb.dir = "target/journal-channel-spec" + """.stripMargin + + class TestProcessor extends Processor { + val destination = context.actorOf(Props[TestDestination]) + val channel = context.actorOf(Channel.props(), "channel") + + def receive = { + case m @ Persistent(s: String, _) if s.startsWith("a") ⇒ { + // forward to destination via channel, + // destination replies to initial sender + channel forward Deliver(m.withPayload(s"fw: ${s}"), destination) + } + case m @ Persistent(s: String, _) if s.startsWith("b") ⇒ { + // reply to sender via channel + channel ! Deliver(m.withPayload(s"re: ${s}"), sender) + } + } + } + + class TestDestination extends Actor { + def receive = { + case m: Persistent ⇒ sender ! m + } + } + + class TestReceiver(testActor: ActorRef) extends Actor { + def receive = { + case Persistent(payload, _) ⇒ testActor ! payload + } + } +} + +class ChannelSpec extends AkkaSpec(ChannelSpec.config) with PersistenceSpec with ImplicitSender { + import ChannelSpec._ + + override protected def beforeEach() { + super.beforeEach() + + val confirmProbe = TestProbe() + val forwardProbe = TestProbe() + val replyProbe = TestProbe() + + val processor = system.actorOf(Props[TestProcessor], name) + + system.eventStream.subscribe(confirmProbe.ref, classOf[Confirm]) + + processor tell (Persistent("a1"), forwardProbe.ref) + processor tell (Persistent("b1"), replyProbe.ref) + + forwardProbe.expectMsgPF() { case m @ Persistent("fw: a1", _) ⇒ m.confirm() } + replyProbe.expectMsgPF() { case m @ Persistent("re: b1", _) ⇒ m.confirm() } + + // wait for confirmations to be stored by journal (needed + // for replay so that channels can drop confirmed messages) + confirmProbe.expectMsgType[Confirm] + confirmProbe.expectMsgType[Confirm] + + stopAndAwaitTermination(processor) + } + + "A channel" must { + "forward un-confirmed messages to destination" in { + val processor = system.actorOf(Props[TestProcessor], name) + processor ! Persistent("a2") + expectMsgPF() { case m @ Persistent("fw: a2", _) ⇒ m.confirm() } + } + "reply un-confirmed messages to senders" in { + val processor = system.actorOf(Props[TestProcessor], name) + processor ! Persistent("b2") + expectMsgPF() { case m @ Persistent("re: b2", _) ⇒ m.confirm() } + } + "must resolve sender references and preserve message order" in { + val channel = system.actorOf(Channel.props(), "testChannel1") + val destination = system.actorOf(Props[TestDestination]) + val sender1 = system.actorOf(Props(classOf[TestReceiver], testActor), "testSender") + + channel tell (Deliver(Persistent("a"), destination), sender1) + expectMsg("a") + stopAndAwaitTermination(sender1) + + // create new incarnation of sender (with same actor path) + val sender2 = system.actorOf(Props(classOf[TestReceiver], testActor), "testSender") + + // replayed message (resolved = false) and invalid sender reference + channel tell (Deliver(PersistentImpl("a", resolved = false), destination, Resolve.Sender), sender1) + + // new messages (resolved = true) and valid sender references + channel tell (Deliver(Persistent("b"), destination), sender2) + channel tell (Deliver(Persistent("c"), destination), sender2) + + expectMsg("a") + expectMsg("b") + expectMsg("c") + } + "must resolve destination references and preserve message order" in { + val channel = system.actorOf(Channel.props(), "testChannel2") + val destination1 = system.actorOf(Props(classOf[TestReceiver], testActor), "testDestination") + + channel ! Deliver(Persistent("a"), destination1) + expectMsg("a") + stopAndAwaitTermination(destination1) + + // create new incarnation of destination (with same actor path) + val destination2 = system.actorOf(Props(classOf[TestReceiver], testActor), "testDestination") + + // replayed message (resolved = false) and invalid destination reference + channel ! Deliver(PersistentImpl("a", resolved = false), destination1, Resolve.Destination) + + // new messages (resolved = true) and valid destination references + channel ! Deliver(Persistent("b"), destination2) + channel ! Deliver(Persistent("c"), destination2) + + expectMsg("a") + expectMsg("b") + expectMsg("c") + } + } +} diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala new file mode 100644 index 0000000000..274aca85d4 --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala @@ -0,0 +1,46 @@ +package akka.persistence + +import java.io.File +import java.util.concurrent.atomic.AtomicInteger + +import org.apache.commons.io.FileUtils +import org.scalatest.BeforeAndAfterEach + +import akka.actor.ActorRef +import akka.testkit.AkkaSpec +import akka.testkit.TestActor.Watch + +trait PersistenceSpec extends BeforeAndAfterEach { this: AkkaSpec ⇒ + private var _name: String = _ + + val extension = Persistence(system) + val counter = new AtomicInteger(0) + + /** + * Unique name per test. + */ + def name = _name + + /** + * Prefix for generating a unique name per test. + */ + def namePrefix: String = "processor" + + protected def stopAndAwaitTermination(ref: ActorRef) { + testActor ! Watch(ref) + system.stop(ref) + expectTerminated(ref) + } + + override protected def beforeEach() { + _name = namePrefix + counter.incrementAndGet() + } + + override protected def afterTermination() { + FileUtils.deleteDirectory(new File(system.settings.config.getString("akka.persistence.journal.leveldb.dir"))) + } +} + +trait TurnOffRecoverOnStart { this: Processor ⇒ + override def preStartProcessor(): Unit = () +} diff --git a/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala new file mode 100644 index 0000000000..7f88277343 --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala @@ -0,0 +1,303 @@ +package akka.persistence + +import akka.actor._ +import akka.testkit._ + +object ProcessorSpec { + val config = + """ + |serialize-creators = on + |serialize-messages = on + |akka.persistence.journal.leveldb.dir = "target/journal-processor-spec" + """.stripMargin + + case object GetState + + class RecoverTestProcessor extends Processor { + var state = List.empty[String] + def receive = { + case "boom" ⇒ throw new Exception("boom") + case Persistent("boom", _) ⇒ throw new Exception("boom") + case Persistent(payload, snr) ⇒ state = s"${payload}-${snr}" :: state + case GetState ⇒ sender ! state.reverse + } + + override def preRestartProcessor(reason: Throwable, message: Option[Any]) = { + message match { + case Some(m: Persistent) ⇒ delete(m) // delete message from journal + case _ ⇒ // ignore + } + super.preRestartProcessor(reason, message) + } + } + + class RecoverOffTestProcessor extends RecoverTestProcessor with TurnOffRecoverOnStart + + class StoredSenderTestProcessor extends Processor { + def receive = { + case Persistent(payload, _) ⇒ sender ! payload + } + } + + class RecoveryStatusTestProcessor extends Processor { + def receive = { + case Persistent("c", _) if !recoveryRunning ⇒ sender ! "c" + case Persistent(payload, _) if recoveryRunning ⇒ sender ! payload + } + } + + class BehaviorChangeTestProcessor extends Processor { + val acceptA: Actor.Receive = { + case Persistent("a", _) ⇒ { + sender ! "a" + context.become(acceptB) + } + } + + val acceptB: Actor.Receive = { + case Persistent("b", _) ⇒ { + sender ! "b" + context.become(acceptA) + } + } + + def receive = acceptA + } + + class FsmTestProcessor extends Processor with FSM[String, Int] { + startWith("closed", 0) + + when("closed") { + case Event(Persistent("a", _), counter) ⇒ { + goto("open") using (counter + 1) replying (counter) + } + } + + when("open") { + case Event(Persistent("b", _), counter) ⇒ { + goto("closed") using (counter + 1) replying (counter) + } + } + } + + class OutboundMessageTestProcessor extends Processor { + def receive = { + case Persistent(payload, snr) ⇒ sender ! Persistent(snr) + } + } + + class ResumeTestException extends Exception("test") + + class ResumeTestSupervisor extends Actor { + val processor = context.actorOf(Props[ResumeTestProcessor], "processor") + + override val supervisorStrategy = + OneForOneStrategy() { + case _: ResumeTestException ⇒ SupervisorStrategy.Resume + } + + def receive = { + case m ⇒ processor forward m + } + } + + class ResumeTestProcessor extends Processor { + var state: List[String] = Nil + def receive = { + case "boom" ⇒ throw new ResumeTestException + case Persistent(payload, snr) ⇒ state = s"${payload}-${snr}" :: state + case GetState ⇒ sender ! state.reverse + } + } + + class LastReplayedMsgFailsTestProcessor extends RecoverTestProcessor { + override def preRestartProcessor(reason: Throwable, message: Option[Any]) = { + message match { + case Some(m: Persistent) ⇒ if (recoveryRunning) delete(m) + case _ ⇒ + } + super.preRestartProcessor(reason, message) + } + } + + class AnyReplayedMsgFailsTestProcessor extends RecoverTestProcessor { + val failOnReplayedA: Actor.Receive = { + case Persistent("a", _) if recoveryRunning ⇒ throw new Exception("boom") + } + + override def receive = failOnReplayedA orElse super.receive + } +} + +class ProcessorSpec extends AkkaSpec(ProcessorSpec.config) with PersistenceSpec with ImplicitSender { + import ProcessorSpec._ + + override protected def beforeEach() { + super.beforeEach() + + val processor = system.actorOf(Props[RecoverTestProcessor], name) + processor ! Persistent("a") + processor ! Persistent("b") + processor ! GetState + expectMsg(List("a-1", "b-2")) + stopAndAwaitTermination(processor) + } + + "A processor" must { + "recover state on explicit request" in { + val processor = system.actorOf(Props[RecoverOffTestProcessor], name) + processor ! Recover() + processor ! GetState + expectMsg(List("a-1", "b-2")) + } + "recover state automatically" in { + val processor = system.actorOf(Props[RecoverTestProcessor], name) + processor ! GetState + expectMsg(List("a-1", "b-2")) + } + "recover state automatically on restart" in { + val processor = system.actorOf(Props[RecoverTestProcessor], name) + processor ! "boom" + processor ! GetState + expectMsg(List("a-1", "b-2")) + } + "buffer new messages until recovery completed" in { + val processor = system.actorOf(Props[RecoverOffTestProcessor], name) + processor ! Persistent("c") + processor ! Recover() + processor ! Persistent("d") + processor ! GetState + expectMsg(List("a-1", "b-2", "c-3", "d-4")) + } + "ignore redundant recovery requests" in { + val processor = system.actorOf(Props[RecoverOffTestProcessor], name) + processor ! Persistent("c") + processor ! Recover() + processor ! Persistent("d") + processor ! Recover() + processor ! Persistent("e") + processor ! GetState + expectMsg(List("a-1", "b-2", "c-3", "d-4", "e-5")) + } + "buffer new messages until restart-recovery completed" in { + val processor = system.actorOf(Props[RecoverTestProcessor], name) + processor ! "boom" + processor ! Persistent("c") + processor ! Persistent("d") + processor ! GetState + expectMsg(List("a-1", "b-2", "c-3", "d-4")) + } + "allow deletion of journaled messages on failure" in { + val processor = system.actorOf(Props[RecoverTestProcessor], name) + processor ! Persistent("boom") // journaled message causes failure and will be deleted + processor ! GetState + expectMsg(List("a-1", "b-2")) + } + "allow deletion of journaled messages on failure and buffer new messages until restart-recovery completed" in { + val processor = system.actorOf(Props[RecoverTestProcessor], name) + processor ! Persistent("boom") // journaled message causes failure and will be deleted + processor ! Persistent("c") + processor ! Persistent("d") + processor ! GetState + expectMsg(List("a-1", "b-2", "c-4", "d-5")) // deleted message leaves gap in sequence + } + "store sender references and restore them for replayed messages" in { + system.actorOf(Props[StoredSenderTestProcessor], name) + List("a", "b") foreach (expectMsg(_)) + } + "properly indicate its recovery status" in { + val processor = system.actorOf(Props[RecoveryStatusTestProcessor], name) + processor ! Persistent("c") + List("a", "b", "c") foreach (expectMsg(_)) + } + "continue journaling when changing behavior" in { + val processor = system.actorOf(Props[BehaviorChangeTestProcessor], name) + processor ! Persistent("a") + processor ! Persistent("b") + List("a", "b", "a", "b") foreach (expectMsg(_)) + } + "derive outbound messages from the current message" in { + val processor = system.actorOf(Props[OutboundMessageTestProcessor], name) + processor ! Persistent("c") + 1 to 3 foreach { _ ⇒ expectMsgPF() { case Persistent(payload, snr) ⇒ payload must be(snr) } } + } + "support recovery with upper sequence number bound" in { + val processor = system.actorOf(Props[RecoverOffTestProcessor], name) + processor ! Recover(1L) + processor ! GetState + expectMsg(List("a-1")) + } + "never replace journaled messages" in { + val processor1 = system.actorOf(Props[RecoverOffTestProcessor], name) + processor1 ! Recover(1L) + processor1 ! Persistent("c") + processor1 ! GetState + expectMsg(List("a-1", "c-3")) + stopAndAwaitTermination(processor1) + + val processor2 = system.actorOf(Props[RecoverOffTestProcessor], name) + processor2 ! Recover() + processor2 ! GetState + expectMsg(List("a-1", "b-2", "c-3")) + } + "be able to skip restart recovery when being resumed" in { + val supervisor1 = system.actorOf(Props[ResumeTestSupervisor], name) + supervisor1 ! Persistent("a") + supervisor1 ! Persistent("b") + supervisor1 ! GetState + expectMsg(List("a-1", "b-2")) + stopAndAwaitTermination(supervisor1) + + val supervisor2 = system.actorOf(Props[ResumeTestSupervisor], name) + supervisor2 ! Persistent("c") + supervisor2 ! "boom" + supervisor2 ! Persistent("d") + supervisor2 ! GetState + expectMsg(List("a-1", "b-2", "c-3", "d-4")) + stopAndAwaitTermination(supervisor2) + + val supervisor3 = system.actorOf(Props[ResumeTestSupervisor], name) + supervisor3 ! GetState + expectMsg(List("a-1", "b-2", "c-3", "d-4")) + } + "be able to re-run restart recovery when it fails with last replayed message" in { + val processor = system.actorOf(Props[LastReplayedMsgFailsTestProcessor], name) + processor ! Persistent("c") + processor ! Persistent("boom") + processor ! Persistent("d") + processor ! GetState + expectMsg(List("a-1", "b-2", "c-3", "d-5")) + } + "be able to re-run initial recovery when it fails with a message that is not the last replayed message" in { + val processor = system.actorOf(Props[AnyReplayedMsgFailsTestProcessor], name) + processor ! Persistent("c") + processor ! GetState + expectMsg(List("b-2", "c-3")) + } + "be able to re-run restart recovery when it fails with a message that is not the last replayed message" in { + val processor = system.actorOf(Props[AnyReplayedMsgFailsTestProcessor], "other") // new processor, no initial replay + processor ! Persistent("b") + processor ! Persistent("a") + processor ! Persistent("c") + processor ! Persistent("d") + processor ! Persistent("e") + processor ! Persistent("f") + processor ! Persistent("g") + processor ! Persistent("h") + processor ! Persistent("i") + processor ! "boom" + processor ! Persistent("j") + processor ! GetState + expectMsg(List("b-1", "c-3", "d-4", "e-5", "f-6", "g-7", "h-8", "i-9", "j-10")) + } + } + + "A processor" can { + "be a finite state machine" in { + val processor = system.actorOf(Props[FsmTestProcessor], name) + processor ! Persistent("a") + processor ! Persistent("b") + List(0, 1, 2, 3) foreach (expectMsg(_)) + } + } +} diff --git a/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala new file mode 100644 index 0000000000..1260a6c60e --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala @@ -0,0 +1,127 @@ +package akka.persistence + +import akka.actor._ +import akka.testkit._ + +object ProcessorStashSpec { + val config = + """ + |serialize-creators = on + |serialize-messages = on + |akka.persistence.journal.leveldb.dir = "target/journal-processor-stash-spec" + """.stripMargin + + case object GetState + + class StashingProcessor extends Processor { + var state: List[String] = Nil + + val behaviorA: Actor.Receive = { + case Persistent("a", snr) ⇒ update("a", snr); context.become(behaviorB) + case Persistent("b", snr) ⇒ update("b", snr) + case Persistent("c", snr) ⇒ update("c", snr); unstashAll() + case "x" ⇒ update("x") + case "boom" ⇒ throw new Exception("boom") + case Persistent("boom", _) ⇒ throw new Exception("boom") + case GetState ⇒ sender ! state.reverse + } + + val behaviorB: Actor.Receive = { + case Persistent("b", _) ⇒ stash(); context.become(behaviorA) + case "x" ⇒ stash() + } + + def receive = behaviorA + + def update(payload: String, snr: Long = 0L) { + state = s"${payload}-${snr}" :: state + } + } + + class RecoveryFailureStashingProcessor extends StashingProcessor { + override def preRestartProcessor(reason: Throwable, message: Option[Any]) = { + message match { + case Some(m: Persistent) ⇒ if (recoveryRunning) delete(m) + case _ ⇒ + } + super.preRestartProcessor(reason, message) + } + } +} + +class ProcessorStashSpec extends AkkaSpec(ProcessorStashSpec.config) with PersistenceSpec with ImplicitSender { + import ProcessorStashSpec._ + + "A processor" must { + "support user stash and unstash operations for persistent messages" in { + val p1 = system.actorOf(Props[StashingProcessor], name) + p1 ! Persistent("a") + p1 ! Persistent("b") + p1 ! Persistent("c") + p1 ! GetState + expectMsg(List("a-1", "c-3", "b-2")) + stopAndAwaitTermination(p1) + + val p2 = system.actorOf(Props[StashingProcessor], name) + p2 ! Persistent("a") + p2 ! Persistent("b") + p2 ! Persistent("c") + p2 ! GetState + expectMsg(List("a-1", "c-3", "b-2", "a-4", "c-6", "b-5")) + } + "support user stash and unstash operations for persistent and transient messages" in { + val p1 = system.actorOf(Props[StashingProcessor], name) + p1 ! Persistent("a") + p1 ! "x" + p1 ! Persistent("b") + p1 ! Persistent("c") + p1 ! GetState + expectMsg(List("a-1", "c-3", "x-0", "b-2")) + stopAndAwaitTermination(p1) + + val p2 = system.actorOf(Props[StashingProcessor], name) + p2 ! Persistent("a") + p2 ! "x" + p2 ! Persistent("b") + p2 ! Persistent("c") + p2 ! GetState + expectMsg(List("a-1", "c-3", "b-2", "a-4", "c-6", "x-0", "b-5")) + } + "support restarts between user stash and unstash operations" in { + val p1 = system.actorOf(Props[StashingProcessor], name) + p1 ! Persistent("a") + p1 ! Persistent("b") + p1 ! "boom" + p1 ! Persistent("c") + p1 ! GetState + expectMsg(List("a-1", "c-3", "b-2")) + stopAndAwaitTermination(p1) + + val p2 = system.actorOf(Props[StashingProcessor], name) + p2 ! Persistent("a") + p2 ! Persistent("b") + p2 ! "boom" + p2 ! Persistent("c") + p2 ! GetState + expectMsg(List("a-1", "c-3", "b-2", "a-4", "c-6", "b-5")) + } + "support multiple restarts between user stash and unstash operations" in { + val p1 = system.actorOf(Props[RecoveryFailureStashingProcessor], name) + p1 ! Persistent("a") + p1 ! Persistent("b") + p1 ! Persistent("boom") + p1 ! Persistent("c") + p1 ! GetState + expectMsg(List("a-1", "c-4", "b-2")) + stopAndAwaitTermination(p1) + + val p2 = system.actorOf(Props[RecoveryFailureStashingProcessor], name) + p2 ! Persistent("a") + p2 ! Persistent("b") + p2 ! Persistent("boom") + p2 ! Persistent("c") + p2 ! GetState + expectMsg(List("a-1", "c-4", "b-2", "a-5", "c-8", "b-6")) + } + } +} diff --git a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorChannelExample.java b/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorChannelExample.java new file mode 100644 index 0000000000..91a8398d55 --- /dev/null +++ b/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorChannelExample.java @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package sample.persistence.japi; + +import akka.actor.*; +import akka.persistence.*; + +public class ProcessorChannelExample { + public static class ExampleProcessor extends UntypedProcessor { + private ActorRef destination; + private ActorRef channel; + + public ExampleProcessor(ActorRef destination) { + this.destination = destination; + this.channel = getContext().actorOf(Channel.props(), "channel"); + } + + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof Persistent) { + Persistent msg = (Persistent)message; + System.out.println("processed " + msg.payload()); + channel.tell(Deliver.create(msg.withPayload("processed " + msg.payload()), destination), getSelf()); + } + } + } + + public static class ExampleDestination extends UntypedActor { + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof Persistent) { + Persistent msg = (Persistent)message; + msg.confirm(); + System.out.println("received " + msg.payload()); + } + } + } + + public static void main(String... args) throws Exception { + final ActorSystem system = ActorSystem.create("example"); + final ActorRef destination = system.actorOf(Props.create(ExampleDestination.class)); + final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class, destination), "processor"); + + processor.tell(Persistent.create("a"), null); + processor.tell(Persistent.create("b"), null); + + Thread.sleep(1000); + system.shutdown(); + } +} diff --git a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorFailureExample.java b/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorFailureExample.java new file mode 100644 index 0000000000..6999635638 --- /dev/null +++ b/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorFailureExample.java @@ -0,0 +1,75 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package sample.persistence.japi; + +import java.util.ArrayList; + +import scala.Option; + +import akka.actor.*; +import akka.persistence.*; + +public class ProcessorFailureExample { + public static class ExampleProcessor extends UntypedProcessor { + private ArrayList received = new ArrayList(); + + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof Persistent) { + Persistent persistent = (Persistent)message; + if (persistent.payload() == "boom") { + throw new Exception("boom"); + } else { + received.add(persistent.payload()); + } + } else if (message == "boom") { + throw new Exception("boom"); + } else if (message == "print") { + System.out.println("received " + received); + } + } + + @Override + public void preRestartProcessor(Throwable reason, Option message) throws Exception { + if (message.isDefined() && message.get() instanceof Persistent) { + delete((Persistent) message.get()); + } + super.preRestartProcessor(reason, message); + } + } + + public static void main(String... args) throws Exception { + final ActorSystem system = ActorSystem.create("example"); + final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor"); + + processor.tell(Persistent.create("a"), null); + processor.tell("print", null); + processor.tell("boom", null); + processor.tell("print", null); + processor.tell(Persistent.create("b"), null); + processor.tell("print", null); + processor.tell(Persistent.create("boom"), null); + processor.tell("print", null); + processor.tell(Persistent.create("c"), null); + processor.tell("print", null); + + // Will print in a first run (i.e. with empty journal): + + // received [a] + // received [a, b] + // received [a, b, c] + + // Will print in a second run: + + // received [a, b, c, a] + // received [a, b, c, a, b] + // received [a, b, c, a, b, c] + + // etc ... + + Thread.sleep(1000); + system.shutdown(); + } +} diff --git a/akka-samples/akka-sample-persistence/src/main/resources/application.conf b/akka-samples/akka-sample-persistence/src/main/resources/application.conf new file mode 100644 index 0000000000..f483b177e8 --- /dev/null +++ b/akka-samples/akka-sample-persistence/src/main/resources/application.conf @@ -0,0 +1 @@ +akka.persistence.journal.leveldb.dir = "target/journal-example" diff --git a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ConversationRecoveryExample.scala b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ConversationRecoveryExample.scala new file mode 100644 index 0000000000..23645b0125 --- /dev/null +++ b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ConversationRecoveryExample.scala @@ -0,0 +1,58 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package sample.persistence + +import akka.actor._ +import akka.persistence._ + +object ConversationRecoveryExample extends App { + case object Ping + case object Pong + + class Ping extends Processor { + val pongChannel = context.actorOf(Channel.props, "pongChannel") + var counter = 0 + + def receive = { + case m @ Persistent(Ping, _) ⇒ { + counter += 1 + println(s"received ping ${counter} times ...") + m.confirm() + if (!recoveryRunning) Thread.sleep(2000) + pongChannel ! Deliver(m.withPayload(Pong), sender, Resolve.Destination) + } + case "init" ⇒ if (counter == 0) self forward Persistent(Ping) + } + + override def preStartProcessor() = () + } + + class Pong extends Processor { + val pingChannel = context.actorOf(Channel.props, "pingChannel") + var counter = 0 + + def receive = { + case m @ Persistent(Pong, _) ⇒ { + counter += 1 + println(s"received pong ${counter} times ...") + m.confirm() + if (!recoveryRunning) Thread.sleep(2000) + pingChannel ! Deliver(m.withPayload(Ping), sender, Resolve.Destination) + } + } + + override def preStartProcessor() = () + } + + val system = ActorSystem("example") + + val ping = system.actorOf(Props(classOf[Ping]), "ping") + val pong = system.actorOf(Props(classOf[Pong]), "pong") + + ping ! Recover() + pong ! Recover() + + ping tell ("init", pong) +} diff --git a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorChannelExample.scala b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorChannelExample.scala new file mode 100644 index 0000000000..eda76ecc13 --- /dev/null +++ b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorChannelExample.scala @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package sample.persistence + +import akka.actor._ +import akka.pattern.ask +import akka.persistence._ +import akka.util.Timeout + +object ProcessorChannelExample extends App { + class ExampleProcessor extends Processor { + val channel = context.actorOf(Channel.props, "channel") + val destination = context.actorOf(Props[ExampleDestination]) + var received: List[Persistent] = Nil + + def receive = { + case p @ Persistent(payload, _) ⇒ { + println(s"processed ${payload}") + channel forward Deliver(p.withPayload(s"processed ${payload}"), destination) + } + } + } + + class ExampleDestination extends Actor { + def receive = { + case p @ Persistent(payload, snr) ⇒ { + println(s"received ${payload}") + sender ! s"re: ${payload} (${snr})" + p.confirm() + } + } + } + + val system = ActorSystem("example") + val processor = system.actorOf(Props(classOf[ExampleProcessor]), "processor") + + implicit val timeout = Timeout(3000) + import system.dispatcher + + processor ? Persistent("a") onSuccess { case reply ⇒ println(s"reply = ${reply}") } + processor ? Persistent("b") onSuccess { case reply ⇒ println(s"reply = ${reply}") } + + Thread.sleep(1000) + system.shutdown() +} diff --git a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorFailureExample.scala b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorFailureExample.scala new file mode 100644 index 0000000000..c7961238e0 --- /dev/null +++ b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorFailureExample.scala @@ -0,0 +1,60 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package sample.persistence + +import akka.actor._ +import akka.persistence._ + +object ProcessorFailureExample extends App { + class ExampleProcessor extends Processor { + var received: List[String] = Nil // state + + def receive = { + case "print" ⇒ println(s"received ${received.reverse}") + case "boom" ⇒ throw new Exception("boom") + case Persistent("boom", _) ⇒ throw new Exception("boom") + case Persistent(payload: String, _) ⇒ received = payload :: received + } + + override def preRestartProcessor(reason: Throwable, message: Option[Any]) { + message match { + case Some(p: Persistent) if !recoveryRunning ⇒ delete(p) // mark failing message as deleted + case _ ⇒ // ignore + } + super.preRestartProcessor(reason, message) + } + } + + val system = ActorSystem("example") + val processor = system.actorOf(Props(classOf[ExampleProcessor]), "processor") + + processor ! Persistent("a") + processor ! "print" + processor ! "boom" // restart and recovery + processor ! "print" + processor ! Persistent("b") + processor ! "print" + processor ! Persistent("boom") // restart, recovery and deletion of message from journal + processor ! "print" + processor ! Persistent("c") + processor ! "print" + + // Will print in a first run (i.e. with empty journal): + + // received List(a) + // received List(a, b) + // received List(a, b, c) + + // Will print in a second run: + + // received List(a, b, c, a) + // received List(a, b, c, a, b) + // received List(a, b, c, a, b, c) + + // etc ... + + Thread.sleep(1000) + system.shutdown() +} diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 14d4c2d99e..1f0cb67c89 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -81,7 +81,7 @@ object AkkaBuild extends Build { ), aggregate = Seq(actor, testkit, actorTests, dataflow, remote, remoteTests, camel, cluster, slf4j, agent, transactor, - mailboxes, zeroMQ, kernel, akkaSbtPlugin, osgi, osgiAries, docs, contrib, samples, channels, channelsTests, + persistence, mailboxes, zeroMQ, kernel, akkaSbtPlugin, osgi, osgiAries, docs, contrib, samples, channels, channelsTests, multiNodeTestkit) ) @@ -276,6 +276,16 @@ object AkkaBuild extends Build { ) ) + lazy val persistence = Project( + id = "akka-persistence-experimental", + base = file("akka-persistence"), + dependencies = Seq(actor, testkit % "test->test"), + settings = defaultSettings ++ scaladocSettings ++ experimentalSettings ++ javadocSettings ++ OSGi.persistence ++ Seq( + libraryDependencies ++= Dependencies.persistence, + previousArtifact := akkaPreviousArtifact("akka-persistence") + ) + ) + val testMailbox = SettingKey[Boolean]("test-mailbox") lazy val mailboxes = Project( @@ -431,7 +441,7 @@ object AkkaBuild extends Build { id = "akka-samples", base = file("akka-samples"), settings = parentSettings, - aggregate = Seq(camelSample, fsmSample, helloSample, helloKernelSample, remoteSample, clusterSample, multiNodeSample, osgiDiningHakkersSample) + aggregate = Seq(camelSample, fsmSample, helloSample, helloKernelSample, remoteSample, persistenceSample, clusterSample, multiNodeSample, osgiDiningHakkersSample) ) lazy val camelSample = Project( @@ -469,6 +479,13 @@ object AkkaBuild extends Build { settings = sampleSettings ) + lazy val persistenceSample = Project( + id = "akka-sample-persistence", + base = file("akka-samples/akka-sample-persistence"), + dependencies = Seq(actor, persistence), + settings = sampleSettings + ) + lazy val clusterSample = Project( id = "akka-sample-cluster", base = file("akka-samples/akka-sample-cluster"), @@ -566,7 +583,8 @@ object AkkaBuild extends Build { id = "akka-docs", base = file("akka-docs"), dependencies = Seq(actor, testkit % "test->test", mailboxesCommon % "compile;test->test", channels, - remote % "compile;test->test", cluster, slf4j, agent, dataflow, transactor, fileMailbox, zeroMQ, camel, osgi, osgiAries), + remote % "compile;test->test", cluster, slf4j, agent, dataflow, transactor, fileMailbox, zeroMQ, camel, osgi, osgiAries, + persistence % "compile;test->test"), settings = defaultSettings ++ site.settings ++ site.sphinxSupport() ++ site.publishSite ++ sphinxPreprocessing ++ cpsPlugin ++ Seq( sourceDirectory in Sphinx <<= baseDirectory / "rst", sphinxPackages in Sphinx <+= baseDirectory { _ / "_sphinx" / "pygments" }, @@ -990,6 +1008,8 @@ object AkkaBuild extends Build { val transactor = exports(Seq("akka.transactor.*")) + val persistence = exports(Seq("akka.persistence.*")) + val testkit = exports(Seq("akka.testkit.*")) val zeroMQ = exports(Seq("akka.zeromq.*"), imports = Seq(protobufImport()) ) @@ -1050,6 +1070,7 @@ object Dependencies { val ariesBlueprint = "org.apache.aries.blueprint" % "org.apache.aries.blueprint" % "1.1.0" // ApacheV2 val osgiCore = "org.osgi" % "org.osgi.core" % "4.2.0" // ApacheV2 val osgiCompendium= "org.osgi" % "org.osgi.compendium" % "4.2.0" // ApacheV2 + val levelDB = "org.iq80.leveldb" % "leveldb" % "0.5" // ApacheV2 // Camel Sample val camelJetty = "org.apache.camel" % "camel-jetty" % camelCore.revision // ApacheV2 @@ -1100,6 +1121,8 @@ object Dependencies { val transactor = Seq(scalaStm, Test.scalatest, Test.junit) + val persistence = Seq(levelDB, Test.scalatest, Test.junit, Test.commonsIo) + val mailboxes = Seq(Test.scalatest, Test.junit) val fileMailbox = Seq(Test.commonsIo, Test.scalatest, Test.junit)