diff --git a/akka-docs/rst/general/configuration.rst b/akka-docs/rst/general/configuration.rst
index 76c8cec80b..4940dd14dc 100644
--- a/akka-docs/rst/general/configuration.rst
+++ b/akka-docs/rst/general/configuration.rst
@@ -391,6 +391,12 @@ akka-agent
 .. literalinclude:: ../../../akka-agent/src/main/resources/reference.conf
    :language: none
 
+akka-persistence
+~~~~~~~~~~~~~~~~
+
+.. literalinclude:: ../../../akka-persistence/src/main/resources/reference.conf
+   :language: none
+
 akka-zeromq
 ~~~~~~~~~~~
 
diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java
index 86ff753dca..06e2afb406 100644
--- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java
+++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java
@@ -25,13 +25,20 @@ public class PersistenceDocTest {
     class MyProcessor extends UntypedProcessor {
         public void onReceive(Object message) throws Exception {
             if (message instanceof Persistent) {
-                // message has been written to journal
+                // message successfully written to journal
                 Persistent persistent = (Persistent)message;
                 Object payload = persistent.payload();
                 Long sequenceNr = persistent.sequenceNr();
                 // ...
+            } else if (message instanceof PersistenceFailure) {
+                // message failed to be written to journal
+                PersistenceFailure failure = (PersistenceFailure)message;
+                Object payload = failure.payload();
+                Long sequenceNr = failure.sequenceNr();
+                Throwable cause = failure.cause();
+                // ...
             } else {
-                // message has not been written to journal
+                // message not written to journal
             }
         }
     }
@@ -179,11 +186,11 @@ public class PersistenceDocTest {
         public void onReceive(Object message) throws Exception {
             if (message.equals("snap")) {
                 saveSnapshot(state);
-            } else if (message instanceof SaveSnapshotSucceeded) {
-                SnapshotMetadata metadata = ((SaveSnapshotSucceeded)message).metadata();
+            } else if (message instanceof SaveSnapshotSuccess) {
+                SnapshotMetadata metadata = ((SaveSnapshotSuccess)message).metadata();
                 // ...
-            } else if (message instanceof SaveSnapshotFailed) {
-                SnapshotMetadata metadata = ((SaveSnapshotFailed)message).metadata();
+            } else if (message instanceof SaveSnapshotFailure) {
+                SnapshotMetadata metadata = ((SaveSnapshotFailure)message).metadata();
                 // ...
             }
         }
@@ -225,6 +232,5 @@ public class PersistenceDocTest {
             //#snapshot-criteria
         }
     }
-    };
 }
diff --git a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java
new file mode 100644
index 0000000000..5019ed42b1
--- /dev/null
+++ b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java
@@ -0,0 +1,141 @@
+package docs.persistence;
+
+//#plugin-imports
+import scala.concurrent.Future;
+import akka.japi.Option;
+import akka.japi.Procedure;
+import akka.persistence.*;
+import akka.persistence.journal.japi.*;
+import akka.persistence.snapshot.japi.*;
+//#plugin-imports
+
+public class PersistencePluginDocTest {
+    static Object o1 = new Object() {
+        abstract class MySnapshotStore extends SnapshotStore {
+            //#snapshot-store-plugin-api
+            /**
+             * Plugin Java API.
+             *
+             * Asynchronously loads a snapshot.
+             *
+             * @param processorId processor id.
+             * @param criteria selection criteria for loading.
+             */
+            public abstract Future<Option<SelectedSnapshot>> doLoadAsync(String processorId, SnapshotSelectionCriteria criteria);
+
+            /**
+             * Plugin Java API.
+             *
+             * Asynchronously saves a snapshot.
+             *
+             * @param metadata snapshot metadata.
+             * @param snapshot snapshot.
+             */
+            public abstract Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot);
+
+            /**
+             * Plugin Java API.
+             *
+             * Called after successful saving of a snapshot.
+             *
+             * @param metadata snapshot metadata.
+             */
+            public abstract void onSaved(SnapshotMetadata metadata) throws Exception;
+
+            /**
+             * Plugin Java API.
+             *
+             * Deletes the snapshot identified by `metadata`.
+             *
+             * @param metadata snapshot metadata.
+             */
+            public abstract void doDelete(SnapshotMetadata metadata) throws Exception;
+            //#snapshot-store-plugin-api
+        }
+
+        abstract class MySyncWriteJournal extends SyncWriteJournal {
+            //#sync-write-plugin-api
+            /**
+             * Plugin Java API.
+             *
+             * Synchronously writes a `persistent` message to the journal.
+             */
+            @Override
+            public abstract void doWrite(PersistentImpl persistent) throws Exception;
+
+            /**
+             * Plugin Java API.
+             *
+             * Synchronously marks a `persistent` message as deleted.
+             */
+            @Override
+            public abstract void doDelete(PersistentImpl persistent) throws Exception;
+
+            /**
+             * Plugin Java API.
+             *
+             * Synchronously writes a delivery confirmation to the journal.
+             */
+            @Override
+            public abstract void doConfirm(String processorId, long sequenceNr, String channelId) throws Exception;
+            //#sync-write-plugin-api
+        }
+
+        abstract class MyAsyncWriteJournal extends AsyncWriteJournal {
+            //#async-write-plugin-api
+            /**
+             * Plugin Java API.
+             *
+             * Asynchronously writes a `persistent` message to the journal.
+             */
+            @Override
+            public abstract Future<Void> doWriteAsync(PersistentImpl persistent);
+
+            /**
+             * Plugin Java API.
+             *
+             * Asynchronously marks a `persistent` message as deleted.
+             */
+            @Override
+            public abstract Future<Void> doDeleteAsync(PersistentImpl persistent);
+
+            /**
+             * Plugin Java API.
+             *
+             * Asynchronously writes a delivery confirmation to the journal.
+             */
+            @Override
+            public abstract Future<Void> doConfirmAsync(String processorId, long sequenceNr, String channelId);
+            //#async-write-plugin-api
+        }
+
+        abstract class MyAsyncReplay extends AsyncReplay {
+            //#async-replay-plugin-api
+            /**
+             * Plugin Java API.
+             *
+             * Asynchronously replays persistent messages. Implementations replay a message
+             * by calling `replayCallback`. The returned future must be completed when all
+             * messages (matching the sequence number bounds) have been replayed. The future
+             * `Long` value must be the highest stored sequence number in the journal for the
+             * specified processor. The future must be completed with a failure if any of
+             * the persistent messages could not be replayed.
+             *
+             * The `replayCallback` must also be called with messages that have been marked
+             * as deleted. In this case a replayed message's `deleted` field must be set to
+             * `true`.
+             *
+             * The channel ids of delivery confirmations that are available for a replayed
+             * message must be contained in that message's `confirms` sequence.
+             *
+             * @param processorId processor id.
+             * @param fromSequenceNr sequence number where replay should start.
+             * @param toSequenceNr sequence number where replay should end (inclusive).
+             * @param replayCallback called to replay a single message.
+             */
+            @Override
+            public abstract Future<Long> doReplayAsync(String processorId, long fromSequenceNr, long toSequenceNr, Procedure<PersistentImpl> replayCallback);
+            //#async-replay-plugin-api
+        }
+    };
+}
diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst
index 930e1e4572..d0ee60d162 100644
--- a/akka-docs/rst/java/persistence.rst
+++ b/akka-docs/rst/java/persistence.rst
@@ -50,12 +50,12 @@ Configuration
 By default, journaled messages are written to a directory named ``journal`` in the current working directory.
 This can be changed by configuration where the specified path can be relative or absolute:
 
-.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#journal-config
+.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#journal-config
 
 The default storage location of :ref:`snapshots-java` is a directory named ``snapshots`` in the current working
 directory. This can be changed by configuration where the specified path can be relative or absolute:
 
-.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#snapshot-config
+.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-config
 
 .. _processors-java:
 
@@ -69,8 +69,12 @@ A processor can be implemented by extending the abstract ``UntypedProcessor`` cl
 Processors only write messages of type ``Persistent`` to the journal, others are received without being persisted.
 When a processor's ``onReceive`` method is called with a ``Persistent`` message it can safely assume that this message
-has been successfully written to the journal. A ``UntypedProcessor`` itself is an ``Actor`` and can therefore
-be instantiated with ``actorOf``.
+has been successfully written to the journal. If a journal fails to write a ``Persistent`` message then the processor
+receives a ``PersistenceFailure`` message instead of a ``Persistent`` message. In this case, a processor may want to
+inform the sender about the failure, so that the sender can re-send the message, if needed, under the assumption that
+the journal recovered from a temporary failure.
+
+An ``UntypedProcessor`` itself is an ``Actor`` and can therefore be instantiated with ``actorOf``.
 
 .. includecode:: code/docs/persistence/PersistenceDocTest.java#usage
 
@@ -226,7 +230,7 @@ Snapshots
 
 Snapshots can dramatically reduce recovery times. Processors can save snapshots of internal state by calling the
 ``saveSnapshot`` method on ``Processor``. If saving of a snapshot succeeds, the processor will receive a
-``SaveSnapshotSucceeded`` message, otherwise a ``SaveSnapshotFailed`` message.
+``SaveSnapshotSuccess`` message, otherwise a ``SaveSnapshotFailure`` message.
 
 .. includecode:: code/docs/persistence/PersistenceDocTest.java#save-snapshot
 
@@ -247,11 +251,59 @@ If not specified, they default to ``SnapshotSelectionCriteria.latest()`` which s
 To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.none()``. A recovery where
 no saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages.
 
+Storage plugins
+===============
+
+Storage backends for journals and snapshot stores are plugins in akka-persistence. The default journal plugin writes
+messages to LevelDB. The default snapshot store plugin writes snapshots as individual files to the local filesystem.
+Applications can provide their own plugins by implementing a plugin API and activating them by configuration. Plugin
+development requires the following imports:
+
+.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#plugin-imports
+
+Journal plugin API
+------------------
+
+A journal plugin either extends ``SyncWriteJournal`` or ``AsyncWriteJournal``. ``SyncWriteJournal`` is an
+actor that should be extended when the storage backend API only supports synchronous, blocking writes. The
+methods to be implemented in this case are:
+
+.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#sync-write-plugin-api
+
+``AsyncWriteJournal`` is an actor that should be extended if the storage backend API supports asynchronous,
+non-blocking writes. The methods to be implemented in that case are:
+
+.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#async-write-plugin-api
+
+Message replays are always asynchronous; therefore, any journal plugin must implement:
+
+.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#async-replay-plugin-api
+
+A journal plugin can be activated with the following minimal configuration:
+
+.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#journal-plugin-config
+
+The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher
+used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``
+for ``SyncWriteJournal`` plugins and ``akka.actor.default-dispatcher`` for ``AsyncWriteJournal`` plugins.
+
+Snapshot store plugin API
+-------------------------
+
+A snapshot store plugin must extend the ``SnapshotStore`` actor and implement the following methods:
+
+.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#snapshot-store-plugin-api
+
+A snapshot store plugin can be activated with the following minimal configuration:
+
+.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-store-plugin-config
+
+The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher
+used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``.
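+
+For illustration, a minimal in-memory snapshot store plugin could look like the following sketch. The class name and
+its storage map are hypothetical and not part of akka-persistence, and a real plugin would write snapshots to durable
+storage instead of keeping them in memory::
+
+    import java.util.HashMap;
+    import java.util.Map;
+    import scala.concurrent.Future;
+    import akka.dispatch.Futures;
+    import akka.japi.Option;
+    import akka.persistence.SelectedSnapshot;
+    import akka.persistence.SnapshotMetadata;
+    import akka.persistence.SnapshotSelectionCriteria;
+    import akka.persistence.snapshot.japi.SnapshotStore;
+
+    public class MyInmemSnapshotStore extends SnapshotStore {
+        // snapshots keyed by processor id and sequence number; accessed only from
+        // the plugin actor's thread because all futures are completed synchronously
+        private final Map<String, Map<Long, SelectedSnapshot>> snapshots =
+            new HashMap<String, Map<Long, SelectedSnapshot>>();
+
+        @Override
+        public Future<Option<SelectedSnapshot>> doLoadAsync(String processorId, SnapshotSelectionCriteria criteria) {
+            SelectedSnapshot result = null;
+            Map<Long, SelectedSnapshot> ss = snapshots.get(processorId);
+            if (ss != null) {
+                for (SelectedSnapshot s : ss.values()) {
+                    // select the latest stored snapshot that matches the criteria
+                    if (s.metadata().sequenceNr() <= criteria.maxSequenceNr() &&
+                        s.metadata().timestamp() <= criteria.maxTimestamp() &&
+                        (result == null || s.metadata().sequenceNr() > result.metadata().sequenceNr())) {
+                        result = s;
+                    }
+                }
+            }
+            return Futures.successful(result == null ? Option.<SelectedSnapshot>none() : Option.some(result));
+        }
+
+        @Override
+        public Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot) {
+            Map<Long, SelectedSnapshot> ss = snapshots.get(metadata.processorId());
+            if (ss == null) {
+                ss = new HashMap<Long, SelectedSnapshot>();
+                snapshots.put(metadata.processorId(), ss);
+            }
+            ss.put(metadata.sequenceNr(), SelectedSnapshot.create(metadata, snapshot));
+            return Futures.<Void>successful(null);
+        }
+
+        @Override
+        public void onSaved(SnapshotMetadata metadata) {
+            // nothing to do after a successful save in this sketch
+        }
+
+        @Override
+        public void doDelete(SnapshotMetadata metadata) {
+            Map<Long, SelectedSnapshot> ss = snapshots.get(metadata.processorId());
+            if (ss != null) ss.remove(metadata.sequenceNr());
+        }
+    }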
+ Upcoming features ================= -* Journal plugin API -* Snapshot store plugin API * Reliable channels * Custom serialization of messages and snapshots * Extended deletion of messages and snapshots diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index f891e2c17a..29497dfbf8 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -2,31 +2,27 @@ package docs.persistence import akka.actor.ActorSystem import akka.persistence._ -import akka.persistence.SaveSnapshotSucceeded -import scala.Some trait PersistenceDocSpec { val system: ActorSystem - val config = - """ - //#journal-config - akka.persistence.journal.leveldb.dir = "target/journal" - //#journal-config - //#snapshot-config - akka.persistence.snapshot-store.local.dir = "target/snapshots" - //#snapshot-config - """ import system._ new AnyRef { //#definition - import akka.persistence.{ Persistent, Processor } + import akka.persistence.{ Persistent, PersistenceFailure, Processor } class MyProcessor extends Processor { def receive = { - case Persistent(payload, sequenceNr) ⇒ // message has been written to journal - case other ⇒ // message has not been written to journal + case Persistent(payload, sequenceNr) ⇒ { + // message successfully written to journal + } + case PersistenceFailure(payload, sequenceNr, cause) ⇒ { + // message failed to be written to journal + } + case other ⇒ { + // message not written to journal + } } } //#definition @@ -195,9 +191,9 @@ trait PersistenceDocSpec { var state: Any = _ def receive = { - case "snap" ⇒ saveSnapshot(state) - case SaveSnapshotSucceeded(metadata) ⇒ // ... - case SaveSnapshotFailed(metadata, reason) ⇒ // ... + case "snap" ⇒ saveSnapshot(state) + case SaveSnapshotSuccess(metadata) ⇒ // ... + case SaveSnapshotFailure(metadata, reason) ⇒ // ... } } //#save-snapshot diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala new file mode 100644 index 0000000000..19901b8102 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala @@ -0,0 +1,78 @@ +package docs.persistence + +//#plugin-imports +import scala.concurrent.Future +//#plugin-imports + +import com.typesafe.config._ + +import org.scalatest.WordSpec + +import akka.actor.ActorSystem +//#plugin-imports +import akka.persistence._ +import akka.persistence.journal._ +import akka.persistence.snapshot._ +//#plugin-imports + +object PersistencePluginDocSpec { + val config = + """ + //#journal-config + akka.persistence.journal.leveldb.dir = "target/journal" + //#journal-config + //#snapshot-config + akka.persistence.snapshot-store.local.dir = "target/snapshots" + //#snapshot-config + """ +} + +class PersistencePluginDocSpec extends WordSpec { + new AnyRef { + val providerConfig = + """ + //#journal-plugin-config + # Path to the journal plugin to be used + akka.persistence.journal.plugin = "my-journal" + + # My custom journal plugin + my-journal { + # Class name of the plugin. + class = "docs.persistence.MyJournal" + # Dispatcher for the plugin actor. 
+ plugin-dispatcher = "akka.actor.default-dispatcher" + } + //#journal-plugin-config + + //#snapshot-store-plugin-config + # Path to the snapshot store plugin to be used + akka.persistence.snapshot-store.plugin = "my-snapshot-store" + + # My custom snapshot store plugin + my-snapshot-store { + # Class name of the plugin. + class = "docs.persistence.MySnapshotStore" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + } + //#snapshot-store-plugin-config + """ + + val system = ActorSystem("doc", ConfigFactory.parseString(providerConfig).withFallback(ConfigFactory.parseString(PersistencePluginDocSpec.config))) + val extension = Persistence(system) + } +} + +class MyJournal extends AsyncWriteJournal { + def writeAsync(persistent: PersistentImpl): Future[Unit] = ??? + def deleteAsync(persistent: PersistentImpl): Future[Unit] = ??? + def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] = ??? + def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentImpl) ⇒ Unit): Future[Long] = ??? +} + +class MySnapshotStore extends SnapshotStore { + def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = ??? + def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = ??? + def saved(metadata: SnapshotMetadata) {} + def delete(metadata: SnapshotMetadata) {} +} diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 1850d4395d..fc3f7e280d 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -46,12 +46,12 @@ Configuration By default, journaled messages are written to a directory named ``journal`` in the current working directory. This can be changed by configuration where the specified path can be relative or absolute: -.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#journal-config +.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#journal-config The default storage location of :ref:`snapshots` is a directory named ``snapshots`` in the current working directory. This can be changed by configuration where the specified path can be relative or absolute: -.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#snapshot-config +.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-config .. _processors: @@ -64,8 +64,12 @@ A processor can be implemented by extending the ``Processor`` trait and implemen Processors only write messages of type ``Persistent`` to the journal, others are received without being persisted. When a processor's ``receive`` method is called with a ``Persistent`` message it can safely assume that this message -has been successfully written to the journal. A ``Processor`` itself is an ``Actor`` and can therefore be instantiated -with ``actorOf``. +has been successfully written to the journal. If a journal fails to write a ``Persistent`` message then the processor +receives a ``PersistenceFailure`` message instead of a ``Persistent`` message. In this case, a processor may want to +inform the sender about the failure, so that the sender can re-send the message, if needed, under the assumption that +the journal recovered from a temporary failure. + +A ``Processor`` itself is an ``Actor`` and can therefore be instantiated with ``actorOf``. .. 
includecode:: code/docs/persistence/PersistenceDocSpec.scala#usage @@ -233,7 +237,7 @@ Snapshots Snapshots can dramatically reduce recovery times. Processors can save snapshots of internal state by calling the ``saveSnapshot`` method on ``Processor``. If saving of a snapshot succeeds, the processor will receive a -``SaveSnapshotSucceeded`` message, otherwise a ``SaveSnapshotFailed`` message +``SaveSnapshotSuccess`` message, otherwise a ``SaveSnapshotFailure`` message .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#save-snapshot @@ -258,6 +262,56 @@ If not specified, they default to ``SnapshotSelectionCriteria.Latest`` which sel To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.None``. A recovery where no saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages. +Storage plugins +=============== + +Storage backends for journals and snapshot stores are plugins in akka-persistence. The default journal plugin writes +messages to LevelDB. The default snapshot store plugin writes snapshots as individual files to the local filesystem. +Applications can provide their own plugins by implementing a plugin API and activate them by configuration. Plugin +development requires the following imports: + +.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#plugin-imports + +Journal plugin API +------------------ + +A journal plugin either extends ``SyncWriteJournal`` or ``AsyncWriteJournal``. ``SyncWriteJournal`` is an +actor that should be extended when the storage backend API only supports synchronous, blocking writes. The +methods to be implemented in this case are: + +.. includecode:: ../../../akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala#journal-plugin-api + +``AsyncWriteJournal`` is an actor that should be extended if the storage backend API supports asynchronous, +non-blocking writes. The methods to be implemented in that case are: + +.. includecode:: ../../../akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala#journal-plugin-api + +Message replays are always asynchronous, therefore, any journal plugin must implement: + +.. includecode:: ../../../akka-persistence/src/main/scala/akka/persistence/journal/AsyncReplay.scala#journal-plugin-api + +A journal plugin can be activated with the following minimal configuration: + +.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#journal-plugin-config + +The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher +used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher`` +for ``SyncWriteJournal`` plugins and ``akka.actor.default-dispatcher`` for ``AsyncWriteJournal`` plugins. + +Snapshot store plugin API +------------------------- + +A snapshot store plugin must extend the ``SnapshotStore`` actor and implement the following methods: + +.. includecode:: ../../../akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala#snapshot-store-plugin-api + +A snapshot store plugin can be activated with the following minimal configuration: + +.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-store-plugin-config + +The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher +used for the plugin actor. 
If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``. + Miscellaneous ============= @@ -271,8 +325,6 @@ State machines can be persisted by mixing in the ``FSM`` trait into processors. Upcoming features ================= -* Journal plugin API -* Snapshot store plugin API * Reliable channels * Custom serialization of messages and snapshots * Extended deletion of messages and snapshots diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index 0d81389dee..7599d38e1d 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -1,35 +1,90 @@ +########################################## +# Akka Persistence Reference Config File # +########################################## + akka { + persistence { + journal { - use = "leveldb" + + # Path to the journal plugin to be used + plugin = "akka.persistence.journal.leveldb" + + # In-memory journal plugin. + inmem { + + # Class name of the plugin. + class = "akka.persistence.journal.inmem.InmemJournal" + + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.actor.default-dispatcher" + } + + # LevelDB journal plugin. leveldb { + + # Class name of the plugin. + class = "akka.persistence.journal.leveldb.LeveldbJournal" + + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + + # Dispatcher for message replay. + replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher" + + # Storage location of LevelDB files. dir = "journal" - write.dispatcher { - type = PinnedDispatcher - executor = "thread-pool-executor" - } - replay.dispatcher { - type = Dispatcher - executor = "thread-pool-executor" - thread-pool-executor { - core-pool-size-min = 2 - core-pool-size-max = 8 - } - } + + # Use fsync on write fsync = off + + # Verify checksum on read. + checksum = off } } + snapshot-store { - use = "local" + + # Path to the snapshot store plugin to be used + plugin = "akka.persistence.snapshot-store.local" + + # Local filesystem snapshot store plugin. local { + + # Class name of the plugin. + class = "akka.persistence.snapshot.local.LocalSnapshotStore" + + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + + # Dispatcher for streaming snapshot IO. + stream-dispatcher = "akka.persistence.dispatchers.default-stream-dispatcher" + + # Storage location of snapshot files. 
dir = "snapshots" - io.dispatcher { - type = Dispatcher - executor = "thread-pool-executor" - thread-pool-executor { - core-pool-size-min = 2 - core-pool-size-max = 8 - } + } + } + + dispatchers { + default-plugin-dispatcher { + type = PinnedDispatcher + executor = "thread-pool-executor" + } + default-replay-dispatcher { + type = Dispatcher + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = 2 + core-pool-size-max = 8 + } + } + default-stream-dispatcher { + type = Dispatcher + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = 2 + core-pool-size-max = 8 } } } diff --git a/akka-persistence/src/main/scala/akka/persistence/Journal.scala b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala similarity index 65% rename from akka-persistence/src/main/scala/akka/persistence/Journal.scala rename to akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala index 59adfe94fd..45c0228a83 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Journal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala @@ -6,17 +6,10 @@ package akka.persistence import akka.actor._ -private[persistence] trait JournalFactory { - /** - * Creates a new journal actor. - */ - def createJournal(implicit factory: ActorRefFactory): ActorRef -} - /** * Defines messages exchanged between processors, channels and a journal. */ -private[persistence] object Journal { +private[persistence] object JournalProtocol { /** * Instructs a journal to mark the `persistent` message as deleted. * A persistent message marked as deleted is not replayed during recovery. @@ -34,11 +27,19 @@ private[persistence] object Journal { case class Write(persistent: PersistentImpl, processor: ActorRef) /** - * Reply message to a processor that `persistent` message has been journaled. + * Reply message to a processor that `persistent` message has been successfully journaled. * * @param persistent persistent message. */ - case class Written(persistent: PersistentImpl) + case class WriteSuccess(persistent: PersistentImpl) + + /** + * Reply message to a processor that `persistent` message could not be journaled. + * + * @param persistent persistent message. + * @param cause failure cause. + */ + case class WriteFailure(persistent: PersistentImpl, cause: Throwable) /** * Instructs a journal to loop a `message` back to `processor`, without persisting the @@ -55,12 +56,17 @@ private[persistence] object Journal { * * @param message looped message. */ - case class Looped(message: Any) + case class LoopSuccess(message: Any) /** - * ... + * Instructs a journal to replay messages to `processor`. + * + * @param fromSequenceNr sequence number where replay should start. + * @param toSequenceNr sequence number where replay should end (inclusive). + * @param processorId requesting processor id. + * @param processor requesting processor. */ - case class Replay(fromSequenceNr: Long, toSequenceNr: Long, processor: ActorRef, processorId: String) + case class Replay(fromSequenceNr: Long, toSequenceNr: Long, processorId: String, processor: ActorRef) /** * Reply message to a processor that `persistent` message has been replayed. @@ -74,6 +80,12 @@ private[persistence] object Journal { * * @param maxSequenceNr the highest stored sequence number (for a processor). 
*/ - case class ReplayCompleted(maxSequenceNr: Long) + case class ReplaySuccess(maxSequenceNr: Long) + + /** + * Reply message to a processor that not all `persistent` messages could have been + * replayed. + */ + case class ReplayFailure(cause: Throwable) } diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index 9710f2343d..241e444dc8 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -7,29 +7,14 @@ package akka.persistence import com.typesafe.config.Config import akka.actor._ -import akka.persistence.journal.leveldb._ -import akka.persistence.snapshot.local._ +import akka.dispatch.Dispatchers +import akka.persistence.journal.AsyncWriteJournal /** * Persistence extension. */ object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider { class Settings(config: Config) { - val rootConfig = config.getConfig("akka.persistence") - - val journalsConfig = rootConfig.getConfig("journal") - val journalName = journalsConfig.getString("use") - val journalConfig = journalsConfig.getConfig(journalName) - val journalFactory = journalName match { - case "leveldb" ⇒ new LeveldbJournalSettings(journalConfig) - } - - val snapshotStoresConfig = rootConfig.getConfig("snapshot-store") - val snapshotStoreName = snapshotStoresConfig.getString("use") - val snapshotStoreConfig = snapshotStoresConfig.getConfig(snapshotStoreName) - val snapshotStoreFactory = snapshotStoreName match { - case "local" ⇒ new LocalSnapshotStoreSettings(snapshotStoreConfig) - } } /** @@ -46,9 +31,12 @@ object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider { * Persistence extension. */ class Persistence(val system: ExtendedActorSystem) extends Extension { - private val settings = new Persistence.Settings(system.settings.config) - private val journal = settings.journalFactory.createJournal(system) - private val snapshotStore = settings.snapshotStoreFactory.createSnapshotStore(system) + private val DefaultPluginDispatcherId = "akka.persistence.dispatchers.default-plugin-dispatcher" + + private val config = system.settings.config.getConfig("akka.persistence") + private val snapshotStore = createPlugin("snapshot-store", _ ⇒ DefaultPluginDispatcherId) + private val journal = createPlugin("journal", clazz ⇒ + if (classOf[AsyncWriteJournal].isAssignableFrom(clazz)) Dispatchers.DefaultDispatcherId else DefaultPluginDispatcherId) /** * Returns a snapshot store for a processor identified by `processorId`. 
@@ -78,5 +66,14 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { */ def channelId(channel: ActorRef): String = id(channel) + private def createPlugin(pluginType: String, dispatcherSelector: Class[_] ⇒ String) = { + val pluginConfigPath = config.getString(s"${pluginType}.plugin") + val pluginConfig = system.settings.config.getConfig(pluginConfigPath) + val pluginClassName = pluginConfig.getString("class") + val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get + val pluginDispatcherId = if (pluginConfig.hasPath("plugin-dispatcher")) pluginConfig.getString("plugin-dispatcher") else dispatcherSelector(pluginClass) + system.actorOf(Props(pluginClass).withDispatcher(pluginDispatcherId)) + } + private def id(ref: ActorRef) = ref.path.toStringWithAddress(system.provider.getDefaultAddress) } diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index 7ef3fafb56..4f17663280 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -4,6 +4,8 @@ package akka.persistence +import java.util.{ List ⇒ JList } + import akka.actor.ActorRef /** @@ -77,25 +79,75 @@ object Persistent { } /** - * INTERNAL API. + * Plugin API. * - * Internal [[Persistent]] representation. + * Internal [[Persistent]] message representation. + * + * @param resolved `true` by default, `false` for replayed messages. Set to `true` by a channel if this + * message is replayed and its sender reference was resolved. Channels use this field to + * avoid redundant sender reference resolutions. + * @param processorId Id of processor that journaled the message. + * @param channelId Id of last channel that delivered the message to a destination. + * @param sender Serialized sender reference. + * @param deleted `true` if this message is marked as deleted. + * @param confirms Channel ids of delivery confirmations that are available for this message. Only non-empty + * for replayed messages. + * @param confirmTarget Delivery confirmation target. + * @param confirmMessage Delivery confirmation message. + * + * @see [[Processor]] + * @see [[Channel]] + * @see [[Deliver]] */ -private[persistence] case class PersistentImpl( +case class PersistentImpl( payload: Any, sequenceNr: Long = 0L, resolved: Boolean = true, processorId: String = "", channelId: String = "", sender: String = "", + deleted: Boolean = false, confirms: Seq[String] = Nil, confirmTarget: ActorRef = null, confirmMessage: Confirm = null) extends Persistent { - def withPayload(payload: Any): Persistent = copy(payload = payload) - def confirm(): Unit = if (confirmTarget != null) confirmTarget ! confirmMessage + def withPayload(payload: Any): Persistent = + copy(payload = payload) + + def confirm(): Unit = + if (confirmTarget != null) confirmTarget ! confirmMessage + + import scala.collection.JavaConverters._ + + /** + * Java Plugin API. + */ + def getConfirms: JList[String] = confirms.asJava } +object PersistentImpl { + /** + * Java Plugin API. + */ + def create(payload: Any, sequenceNr: Long, resolved: Boolean, processorId: String, channelId: String, sender: String, deleted: Boolean, confirms: Seq[String]): PersistentImpl = + PersistentImpl(payload, sequenceNr, resolved, processorId, channelId, sender, deleted, confirms) + + /** + * Java Plugin API. 
+ */ + def create(payload: Any, sequenceNr: Long, resolved: Boolean, processorId: String, channelId: String, sender: String, deleted: Boolean, confirms: Seq[String], confirmTarget: ActorRef, confirmMessage: Confirm): PersistentImpl = + PersistentImpl(payload, sequenceNr, resolved, processorId, channelId, sender, deleted, confirms, confirmTarget, confirmMessage) +} + +/** + * Receive by a processor when a journal failed to write a [[Persistent]] message. + * + * @param payload payload of the persistent message. + * @param sequenceNr sequence number of the persistent message. + * @param cause failure cause. + */ +case class PersistenceFailure(payload: Any, sequenceNr: Long, cause: Throwable) + /** * Message to confirm the receipt of a persistent message (sent via a [[Channel]]). */ diff --git a/akka-persistence/src/main/scala/akka/persistence/Processor.scala b/akka-persistence/src/main/scala/akka/persistence/Processor.scala index 567b791deb..4cf1eda109 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Processor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Processor.scala @@ -43,11 +43,16 @@ import akka.dispatch._ * ''user stash'' inherited by `akka.actor.Stash`. `Processor` implementation classes can therefore use the * ''user stash'' for stashing/unstashing both persistent and transient messages. * + * Processors can also store snapshots of internal state by calling [[saveSnapshot]]. During recovery, a saved + * snapshot is offered to the processor with a [[SnapshotOffer]] message, followed by replayed messages, if any, + * that are younger than the snapshot. Default is to offer the latest saved snapshot. + * * @see [[UntypedProcessor]] + * @see [[Recover]] */ trait Processor extends Actor with Stash { - import Journal._ - import SnapshotStore._ + import JournalProtocol._ + import SnapshotProtocol._ private val extension = Persistence(context.system) private val _processorId = extension.processorId(self) @@ -100,23 +105,26 @@ trait Processor extends Actor with Stash { override def toString: String = "recovery started" def aroundReceive(receive: Actor.Receive, message: Any) = message match { - case LoadSnapshotCompleted(sso, toSnr) ⇒ sso match { - case Some(ss) ⇒ { - process(receive, SnapshotOffer(ss.metadata, ss.snapshot)) - journal ! Replay(ss.metadata.sequenceNr + 1L, toSnr, self, processorId) + case LoadSnapshotResult(sso, toSnr) ⇒ sso match { + case Some(SelectedSnapshot(metadata, snapshot)) ⇒ { + process(receive, SnapshotOffer(metadata, snapshot)) + journal ! Replay(metadata.sequenceNr + 1L, toSnr, processorId, self) } case None ⇒ { - journal ! Replay(1L, toSnr, self, processorId) + journal ! 
Replay(1L, toSnr, processorId, self) } } - case ReplayCompleted(maxSnr) ⇒ { + case ReplaySuccess(maxSnr) ⇒ { _currentState = recoverySucceeded _sequenceNr = maxSnr unstashAllInternal() } + case ReplayFailure(cause) ⇒ { + throw cause + } case Replayed(p) ⇒ try { processPersistent(receive, p) } catch { case t: Throwable ⇒ { _currentState = recoveryFailed // delay throwing exception to prepareRestart - _recoveryFailureReason = t + _recoveryFailureCause = t _recoveryFailureMessage = currentEnvelope } } @@ -132,14 +140,13 @@ trait Processor extends Actor with Stash { override def toString: String = "recovery finished" def aroundReceive(receive: Actor.Receive, message: Any) = message match { - case r: Recover ⇒ // ignore - case Replayed(p) ⇒ processPersistent(receive, p) // can occur after unstash from user stash - case Written(p) ⇒ processPersistent(receive, p) - case Looped(p) ⇒ process(receive, p) - case s: SaveSnapshotSucceeded ⇒ process(receive, s) - case f: SaveSnapshotFailed ⇒ process(receive, f) - case p: PersistentImpl ⇒ journal forward Write(p.copy(processorId = processorId, sequenceNr = nextSequenceNr()), self) - case m ⇒ journal forward Loop(m, self) + case r: Recover ⇒ // ignore + case Replayed(p) ⇒ processPersistent(receive, p) // can occur after unstash from user stash + case WriteSuccess(p) ⇒ processPersistent(receive, p) + case WriteFailure(p, cause) ⇒ process(receive, PersistenceFailure(p.payload, p.sequenceNr, cause)) + case LoopSuccess(m) ⇒ process(receive, m) + case p: PersistentImpl ⇒ journal forward Write(p.copy(processorId = processorId, sequenceNr = nextSequenceNr()), self) + case m ⇒ journal forward Loop(m, self) } } @@ -152,7 +159,7 @@ trait Processor extends Actor with Stash { override def toString: String = "recovery failed" def aroundReceive(receive: Actor.Receive, message: Any) = message match { - case ReplayCompleted(maxSnr) ⇒ { + case ReplaySuccess(maxSnr) ⇒ { _currentState = prepareRestart mailbox.enqueueFirst(self, _recoveryFailureMessage) } @@ -170,7 +177,7 @@ trait Processor extends Actor with Stash { override def toString: String = "prepare restart" def aroundReceive(receive: Actor.Receive, message: Any) = message match { - case Replayed(_) ⇒ throw _recoveryFailureReason + case Replayed(_) ⇒ throw _recoveryFailureCause case _ ⇒ // ignore } } @@ -181,7 +188,7 @@ trait Processor extends Actor with Stash { private var _currentPersistent: Persistent = _ private var _currentState: State = recoveryPending - private var _recoveryFailureReason: Throwable = _ + private var _recoveryFailureCause: Throwable = _ private var _recoveryFailureMessage: Envelope = _ private lazy val journal = extension.journalFor(processorId) @@ -230,7 +237,7 @@ trait Processor extends Actor with Stash { /** * Saves a `snapshot` of this processor's state. If saving succeeds, this processor will receive a - * [[SaveSnapshotSucceeded]] message, otherwise a [[SaveSnapshotFailed]] message. + * [[SaveSnapshotSuccess]] message, otherwise a [[SaveSnapshotFailure]] message. */ def saveSnapshot(snapshot: Any): Unit = { snapshotStore ! 
SaveSnapshot(SnapshotMetadata(processorId, lastSequenceNr), snapshot) @@ -266,10 +273,10 @@ trait Processor extends Actor with Stash { unstashAllInternal() } finally { message match { - case Some(Written(m)) ⇒ preRestartDefault(reason, Some(m)) - case Some(Looped(m)) ⇒ preRestartDefault(reason, Some(m)) - case Some(Replayed(m)) ⇒ preRestartDefault(reason, Some(m)) - case mo ⇒ preRestartDefault(reason, None) + case Some(WriteSuccess(m)) ⇒ preRestartDefault(reason, Some(m)) + case Some(LoopSuccess(m)) ⇒ preRestartDefault(reason, Some(m)) + case Some(Replayed(m)) ⇒ preRestartDefault(reason, Some(m)) + case mo ⇒ preRestartDefault(reason, None) } } } @@ -312,9 +319,9 @@ trait Processor extends Actor with Stash { // ----------------------------------------------------- private def unstashFilterPredicate: Any ⇒ Boolean = { - case _: Written ⇒ false - case _: Replayed ⇒ false - case _ ⇒ true + case _: WriteSuccess ⇒ false + case _: Replayed ⇒ false + case _ ⇒ true } private var processorStash = Vector.empty[Envelope] @@ -381,7 +388,12 @@ trait Processor extends Actor with Stash { * ''user stash'' inherited by `akka.actor.Stash`. `Processor` implementation classes can therefore use the * ''user stash'' for stashing/unstashing both persistent and transient messages. * + * Processors can also store snapshots of internal state by calling [[saveSnapshot]]. During recovery, a saved + * snapshot is offered to the processor with a [[SnapshotOffer]] message, followed by replayed messages, if any, + * that are younger than the snapshot. Default is to offer the latest saved snapshot. + * * @see [[Processor]] + * @see [[Recover]] */ abstract class UntypedProcessor extends UntypedActor with Processor { diff --git a/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala b/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala index 8dbfb14831..4a5b7ece69 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala @@ -4,11 +4,6 @@ package akka.persistence -import java.io._ - -import akka.actor._ -import akka.util.ClassLoaderObjectInputStream - /** * Snapshot metadata. * @@ -21,21 +16,21 @@ case class SnapshotMetadata(processorId: String, sequenceNr: Long, timestamp: Lo //#snapshot-metadata /** - * Indicates successful saving of a snapshot. + * Notification of a snapshot saving success. * * @param metadata snapshot metadata. */ @SerialVersionUID(1L) -case class SaveSnapshotSucceeded(metadata: SnapshotMetadata) +case class SaveSnapshotSuccess(metadata: SnapshotMetadata) /** - * Indicates failed saving of a snapshot. + * Notification of a snapshot saving success failure. * * @param metadata snapshot metadata. - * @param reason failure reason. + * @param cause failure cause. */ @SerialVersionUID(1L) -case class SaveSnapshotFailed(metadata: SnapshotMetadata, reason: Throwable) +case class SaveSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable) /** * Offers a [[Processor]] a previously saved `snapshot` during recovery. This offer is received @@ -45,7 +40,7 @@ case class SaveSnapshotFailed(metadata: SnapshotMetadata, reason: Throwable) case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any) /** - * Snapshot selection criteria for recovery. + * Selection criteria for loading snapshots. * * @param maxSequenceNr upper bound for a selected snapshot's sequence number. Default is no upper bound. * @param maxTimestamp upper bound for a selected snapshot's timestamp. Default is no upper bound. 
@@ -86,109 +81,43 @@ object SnapshotSelectionCriteria { def none() = None } -// TODO: support application-defined snapshot serializers -// TODO: support application-defined snapshot access - /** - * Snapshot serialization extension. + * Plugin API. + * + * A selected snapshot matching [[SnapshotSelectionCriteria]]. + * + * @param metadata snapshot metadata. + * @param snapshot snapshot. */ -private[persistence] object SnapshotSerialization extends ExtensionId[SnapshotSerialization] with ExtensionIdProvider { - def createExtension(system: ExtendedActorSystem): SnapshotSerialization = new SnapshotSerialization(system) - def lookup() = SnapshotSerialization +case class SelectedSnapshot(metadata: SnapshotMetadata, snapshot: Any) + +object SelectedSnapshot { + /** + * Plugin Java API. + */ + def create(metadata: SnapshotMetadata, snapshot: Any): SelectedSnapshot = + SelectedSnapshot(metadata, snapshot) } /** - * Snapshot serialization extension. + * Defines messages exchanged between processors and a snapshot store. */ -private[persistence] class SnapshotSerialization(val system: ExtendedActorSystem) extends Extension { - import akka.serialization.JavaSerializer - - /** - * Java serialization based snapshot serializer. - */ - val java = new SnapshotSerializer { - def serialize(stream: OutputStream, metadata: SnapshotMetadata, state: Any) = { - val out = new ObjectOutputStream(stream) - JavaSerializer.currentSystem.withValue(system) { out.writeObject(state) } - } - - def deserialize(stream: InputStream, metadata: SnapshotMetadata) = { - val in = new ClassLoaderObjectInputStream(system.dynamicAccess.classLoader, stream) - JavaSerializer.currentSystem.withValue(system) { in.readObject } - } - } -} - -/** - * Stream-based snapshot serializer. - */ -private[persistence] trait SnapshotSerializer { - /** - * Serializes a `snapshot` to an output stream. - */ - def serialize(stream: OutputStream, metadata: SnapshotMetadata, snapshot: Any): Unit - - /** - * Deserializes a snapshot from an input stream. - */ - def deserialize(stream: InputStream, metadata: SnapshotMetadata): Any -} - -/** - * Input and output stream management for snapshot serialization. - */ -private[persistence] trait SnapshotAccess { - /** - * Provides a managed output stream for serializing a snapshot. - * - * @param metadata snapshot metadata needed to create an output stream. - * @param body called with the managed output stream as argument. - */ - def withOutputStream(metadata: SnapshotMetadata)(body: OutputStream ⇒ Unit) - - /** - * Provides a managed input stream for deserializing a state object. - * - * @param metadata snapshot metadata needed to create an input stream. - * @param body called with the managed input stream as argument. - * @return read snapshot. - */ - def withInputStream(metadata: SnapshotMetadata)(body: InputStream ⇒ Any): Any - - /** - * Loads the snapshot metadata of all currently stored snapshots. - */ - def metadata: Set[SnapshotMetadata] - - /** - * Deletes the snapshot referenced by `metadata`. - */ - def delete(metadata: SnapshotMetadata) -} - -private[persistence] trait SnapshotStoreFactory { - /** - * Creates a new snapshot store actor. - */ - def createSnapshotStore(implicit factory: ActorRefFactory): ActorRef -} - -private[persistence] object SnapshotStore { +private[persistence] object SnapshotProtocol { /** * Instructs a snapshot store to load a snapshot. * * @param processorId processor id. - * @param criteria criteria for selecting a saved snapshot from which recovery should start. 
+ * @param criteria criteria for selecting a snapshot from which recovery should start. * @param toSequenceNr upper sequence number bound (inclusive) for recovery. */ case class LoadSnapshot(processorId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) /** - * Reply message to a processor that a snapshot loading attempt has been completed. + * Response message to a [[LoadSnapshot]] message. * - * @param savedSnapshot + * @param snapshot loaded snapshot, if any. */ - case class LoadSnapshotCompleted(savedSnapshot: Option[SavedSnapshot], toSequenceNr: Long) + case class LoadSnapshotResult(snapshot: Option[SelectedSnapshot], toSequenceNr: Long) /** * Instructs snapshot store to save a snapshot. @@ -197,12 +126,4 @@ private[persistence] object SnapshotStore { * @param snapshot snapshot. */ case class SaveSnapshot(metadata: SnapshotMetadata, snapshot: Any) - - /** - * In-memory representation of a saved snapshot. - * - * @param metadata snapshot metadata. - * @param snapshot saved snapshot. - */ - case class SavedSnapshot(metadata: SnapshotMetadata, snapshot: Any) } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncReplay.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncReplay.scala new file mode 100644 index 0000000000..f79bcd6f8c --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncReplay.scala @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.persistence.journal + +import scala.concurrent.Future + +import akka.persistence.PersistentImpl + +/** + * Asynchronous message replay interface. + */ +trait AsyncReplay { + //#journal-plugin-api + /** + * Plugin API. + * + * Asynchronously replays persistent messages. Implementations replay a message + * by calling `replayCallback`. The returned future must be completed when all + * messages (matching the sequence number bounds) have been replayed. The future + * `Long` value must be the highest stored sequence number in the journal for the + * specified processor. The future must be completed with a failure if any of + * the persistent messages could not be replayed. + * + * The `replayCallback` must also be called with messages that have been marked + * as deleted. In this case a replayed message's `deleted` field must be set to + * `true`. + * + * The channel ids of delivery confirmations that are available for a replayed + * message must be contained in that message's `confirms` sequence. + * + * @param processorId processor id. + * @param fromSequenceNr sequence number where replay should start. + * @param toSequenceNr sequence number where replay should end (inclusive). + * @param replayCallback called to replay a single message. + * + * @see [[AsyncWriteJournal]] + * @see [[SyncWriteJournal]] + */ + def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: PersistentImpl ⇒ Unit): Future[Long] + //#journal-plugin-api +} diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala new file mode 100644 index 0000000000..aedba26844 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -0,0 +1,119 @@ +/** + * Copyright (C) 2012-2013 Eligotech BV. 
+ */ + +package akka.persistence.journal + +import scala.concurrent.Future +import scala.util._ + +import akka.actor._ +import akka.pattern.{ pipe, PromiseActorRef } +import akka.persistence._ +import akka.persistence.JournalProtocol._ +import akka.serialization.Serialization + +/** + * Abstract journal, optimized for asynchronous, non-blocking writes. + */ +trait AsyncWriteJournal extends Actor with AsyncReplay { + import AsyncWriteJournal._ + import context.dispatcher + + private val extension = Persistence(context.system) + private val resequencer = context.actorOf(Props[Resequencer]) + private var resequencerCounter = 1L + + final def receive = { + case Write(persistent, processor) ⇒ { + val csdr = sender + val cctr = resequencerCounter + val psdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender + writeAsync(persistent.copy(sender = Serialization.serializedActorPath(psdr), resolved = false, confirmTarget = null, confirmMessage = null)) map { + _ ⇒ Desequenced(WriteSuccess(persistent), cctr, processor, csdr) + } recover { + case e ⇒ Desequenced(WriteFailure(persistent, e), cctr, processor, csdr) + } pipeTo (resequencer) + resequencerCounter += 1 + } + case Replay(fromSequenceNr, toSequenceNr, processorId, processor) ⇒ { + // Send replayed messages and replay result to processor directly. No need + // to resequence replayed messages relative to written and looped messages. + replayAsync(processorId, fromSequenceNr, toSequenceNr) { p ⇒ + if (!p.deleted) processor.tell(Replayed(p), extension.system.provider.resolveActorRef(p.sender)) + } map { + maxSnr ⇒ ReplaySuccess(maxSnr) + } recover { + case e ⇒ ReplayFailure(e) + } pipeTo (processor) + } + case c @ Confirm(processorId, sequenceNr, channelId) ⇒ { + confirmAsync(processorId, sequenceNr, channelId) onComplete { + case Success(_) ⇒ context.system.eventStream.publish(c) + case Failure(e) ⇒ // TODO: publish failure to event stream + } + context.system.eventStream.publish(c) + } + case Delete(persistent: PersistentImpl) ⇒ { + deleteAsync(persistent) onComplete { + case Success(_) ⇒ // TODO: publish success to event stream + case Failure(e) ⇒ // TODO: publish failure to event stream + } + } + case Loop(message, processor) ⇒ { + resequencer ! Desequenced(LoopSuccess(message), resequencerCounter, processor, sender) + resequencerCounter += 1 + } + } + + //#journal-plugin-api + /** + * Plugin API. + * + * Asynchronously writes a `persistent` message to the journal. + */ + def writeAsync(persistent: PersistentImpl): Future[Unit] + + /** + * Plugin API. + * + * Asynchronously marks a `persistent` message as deleted. + */ + def deleteAsync(persistent: PersistentImpl): Future[Unit] + + /** + * Plugin API. + * + * Asynchronously writes a delivery confirmation to the journal. 
+ */ + def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] + //#journal-plugin-api +} + +private[persistence] object AsyncWriteJournal { + case class Desequenced(msg: Any, snr: Long, target: ActorRef, sender: ActorRef) + + class Resequencer extends Actor { + import scala.collection.mutable.Map + + private val delayed = Map.empty[Long, Desequenced] + private var delivered = 0L + + def receive = { + case d: Desequenced ⇒ resequence(d) + } + + @scala.annotation.tailrec + private def resequence(d: Desequenced) { + if (d.snr == delivered + 1) { + delivered = d.snr + d.target tell (d.msg, d.sender) + } else { + delayed += (d.snr -> d) + } + val ro = delayed.remove(delivered + 1) + if (ro.isDefined) resequence(ro.get) + } + } +} + diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala new file mode 100644 index 0000000000..31f3db30ed --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2012-2013 Eligotech BV. + */ + +package akka.persistence.journal + +import scala.util._ + +import akka.actor.Actor +import akka.pattern.{ pipe, PromiseActorRef } +import akka.persistence._ +import akka.serialization.Serialization + +/** + * Abstract journal, optimized for synchronous writes. + */ +trait SyncWriteJournal extends Actor with AsyncReplay { + import JournalProtocol._ + import context.dispatcher + + private val extension = Persistence(context.system) + + final def receive = { + case Write(persistent, processor) ⇒ { + val sdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender + Try(write(persistent.copy(sender = Serialization.serializedActorPath(sdr), resolved = false, confirmTarget = null, confirmMessage = null))) match { + case Success(_) ⇒ processor forward WriteSuccess(persistent) + case Failure(e) ⇒ processor forward WriteFailure(persistent, e); throw e + } + } + case Replay(fromSequenceNr, toSequenceNr, processorId, processor) ⇒ { + replayAsync(processorId, fromSequenceNr, toSequenceNr) { p ⇒ + if (!p.deleted) processor.tell(Replayed(p), extension.system.provider.resolveActorRef(p.sender)) + } map { + maxSnr ⇒ ReplaySuccess(maxSnr) + } recover { + case e ⇒ ReplayFailure(e) + } pipeTo (processor) + } + case c @ Confirm(processorId, sequenceNr, channelId) ⇒ { + confirm(processorId, sequenceNr, channelId) + context.system.eventStream.publish(c) // TODO: turn off by default and allow to turn on by configuration + } + case Delete(persistent: PersistentImpl) ⇒ { + delete(persistent) + } + case Loop(message, processor) ⇒ { + processor forward LoopSuccess(message) + } + } + + //#journal-plugin-api + /** + * Plugin API. + * + * Synchronously writes a `persistent` message to the journal. + */ + def write(persistent: PersistentImpl): Unit + + /** + * Plugin API. + * + * Synchronously marks a `persistent` message as deleted. + */ + def delete(persistent: PersistentImpl): Unit + + /** + * Plugin API. + * + * Synchronously writes a delivery confirmation to the journal. 
+ */ + def confirm(processorId: String, sequenceNr: Long, channelId: String): Unit + //#journal-plugin-api +} diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala new file mode 100644 index 0000000000..fd8f723bc7 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala @@ -0,0 +1,90 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.persistence.journal.inmem + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.language.postfixOps + +import akka.actor._ +import akka.pattern.ask +import akka.persistence._ +import akka.persistence.journal.AsyncWriteJournal +import akka.util._ + +/** + * INTERNAL API. + * + * In-memory journal for testing purposes only. + */ +private[persistence] class InmemJournal extends AsyncWriteJournal { + val store = context.actorOf(Props[InmemStore]) + + implicit val timeout = Timeout(5 seconds) + + import InmemStore._ + + def writeAsync(persistent: PersistentImpl): Future[Unit] = + (store ? Write(persistent)).mapTo[Unit] + + def deleteAsync(persistent: PersistentImpl): Future[Unit] = + (store ? Delete(persistent)).mapTo[Unit] + + def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] = + (store ? Confirm(processorId, sequenceNr, channelId)).mapTo[Unit] + + def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentImpl) ⇒ Unit): Future[Long] = + (store ? Replay(processorId, fromSequenceNr, toSequenceNr, replayCallback)).mapTo[Long] +} + +private[persistence] class InmemStore extends Actor { + import InmemStore._ + + // processor id => persistent message + var messages = Map.empty[String, Vector[PersistentImpl]] + + def receive = { + case Write(p) ⇒ add(p); success() + case Delete(p) ⇒ update(p.processorId, p.sequenceNr)(_.copy(deleted = true)); success() + case Confirm(pid, snr, cid) ⇒ update(pid, snr)(p ⇒ p.copy(confirms = cid +: p.confirms)); success() + case Replay(pid, fromSnr, toSnr, callback) ⇒ { + for { + ms ← messages.get(pid) + m ← ms + if m.sequenceNr >= fromSnr && m.sequenceNr <= toSnr + } callback(m) + + success(maxSequenceNr(pid)) + } + } + + private def success(reply: Any = ()) = + sender ! 
reply + + private def add(p: PersistentImpl) = messages = messages + (messages.get(p.processorId) match { + case Some(ms) ⇒ p.processorId -> (ms :+ p) + case None ⇒ p.processorId -> Vector(p) + }) + + private def update(pid: String, snr: Long)(f: PersistentImpl ⇒ PersistentImpl) = messages = messages.get(pid) match { + case Some(ms) ⇒ messages + (pid -> ms.map(sp ⇒ if (sp.sequenceNr == snr) f(sp) else sp)) + case None ⇒ messages + } + + private def maxSequenceNr(pid: String): Long = { + val snro = for { + ms ← messages.get(pid) + m ← ms.lastOption + } yield m.sequenceNr + snro.getOrElse(0L) + } +} + +private[persistence] object InmemStore { + case class Write(p: PersistentImpl) + case class Delete(p: PersistentImpl) + case class Confirm(processorId: String, sequenceNr: Long, channelId: String) + case class Replay(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, replayCallback: (PersistentImpl) ⇒ Unit) +} diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncReplay.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncReplay.scala new file mode 100644 index 0000000000..948755faac --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncReplay.scala @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.persistence.journal.japi + +import java.lang.{ Long ⇒ JLong } + +import scala.concurrent.Future + +import akka.actor.Actor +import akka.japi.Procedure +import akka.persistence.journal.{ AsyncReplay ⇒ SAsyncReplay } +import akka.persistence.PersistentImpl + +abstract class AsyncReplay extends SAsyncReplay { this: Actor ⇒ + import context.dispatcher + + final def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentImpl) ⇒ Unit) = + doReplayAsync(processorId, fromSequenceNr, toSequenceNr, new Procedure[PersistentImpl] { + def apply(p: PersistentImpl) = replayCallback(p) + }).map(_.longValue) + + /** + * Plugin Java API. + * + * Asynchronously replays persistent messages. Implementations replay a message + * by calling `replayCallback`. The returned future must be completed when all + * messages (matching the sequence number bounds) have been replayed. The future + * `Long` value must be the highest stored sequence number in the journal for the + * specified processor. The future must be completed with a failure if any of + * the persistent messages could not be replayed. + * + * The `replayCallback` must also be called with messages that have been marked + * as deleted. In this case a replayed message's `deleted` field must be set to + * `true`. + * + * The channel ids of delivery confirmations that are available for a replayed + * message must be contained in that message's `confirms` sequence. + * + * @param processorId processor id. + * @param fromSequenceNr sequence number where replay should start. + * @param toSequenceNr sequence number where replay should end (inclusive). + * @param replayCallback called to replay a single message. 
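+ * @return future completed with the highest stored sequence number for
+ *         `processorId`, as described above.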
+ */ + def doReplayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, replayCallback: Procedure[PersistentImpl]): Future[JLong] +} diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala new file mode 100644 index 0000000000..4a5c2594d9 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.persistence.journal.japi + +import scala.concurrent.Future + +import akka.persistence.journal.{ AsyncWriteJournal ⇒ SAsyncWriteJournal } +import akka.persistence.PersistentImpl + +/** + * Java API. + * + * Abstract journal, optimized for asynchronous, non-blocking writes. + */ +abstract class AsyncWriteJournal extends AsyncReplay with SAsyncWriteJournal { + import context.dispatcher + + final def writeAsync(persistent: PersistentImpl) = + doWriteAsync(persistent).map(Unit.unbox) + + final def deleteAsync(persistent: PersistentImpl) = + doDeleteAsync(persistent).map(Unit.unbox) + + final def confirmAsync(processorId: String, sequenceNr: Long, channelId: String) = + doConfirmAsync(processorId, sequenceNr, channelId).map(Unit.unbox) + + /** + * Plugin Java API. + * + * Asynchronously writes a `persistent` message to the journal. + */ + def doWriteAsync(persistent: PersistentImpl): Future[Void] + + /** + * Plugin Java API. + * + * Asynchronously marks a `persistent` message as deleted. + */ + def doDeleteAsync(persistent: PersistentImpl): Future[Void] + + /** + * Plugin Java API. + * + * Asynchronously writes a delivery confirmation to the journal. + */ + def doConfirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Void] +} diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala new file mode 100644 index 0000000000..559594577b --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala @@ -0,0 +1,48 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.persistence.journal.japi + +import akka.persistence.journal.{ SyncWriteJournal ⇒ SSyncWriteJournal } +import akka.persistence.PersistentImpl + +/** + * Java API. + * + * Abstract journal, optimized for synchronous writes. + */ +abstract class SyncWriteJournal extends AsyncReplay with SSyncWriteJournal { + final def write(persistent: PersistentImpl) = + doWrite(persistent) + + final def delete(persistent: PersistentImpl) = + doDelete(persistent) + + final def confirm(processorId: String, sequenceNr: Long, channelId: String) = + doConfirm(processorId, sequenceNr, channelId) + + /** + * Plugin Java API. + * + * Synchronously writes a `persistent` message to the journal. + */ + @throws(classOf[Exception]) + def doWrite(persistent: PersistentImpl): Unit + + /** + * Plugin Java API. + * + * Synchronously marks a `persistent` message as deleted. + */ + @throws(classOf[Exception]) + def doDelete(persistent: PersistentImpl): Unit + + /** + * Plugin Java API. + * + * Synchronously writes a delivery confirmation to the journal. 
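+ *
+ * Invoked by the underlying journal actor when it handles a `Confirm`
+ * message (see the Scala `SyncWriteJournal.confirm` counterpart above).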
+ */ + @throws(classOf[Exception]) + def doConfirm(processorId: String, sequenceNr: Long, channelId: String): Unit +} diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala index 9e9793b22c..b5f0db7bef 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala @@ -4,12 +4,12 @@ package akka.persistence.journal.leveldb -import akka.actor.Actor - import org.iq80.leveldb.DBIterator +import akka.actor.Actor + /** - * Persistent mapping of `String`-based processor and channel ids to numeric ids. + * LevelDB backed persistent mapping of `String`-based processor and channel ids to numeric ids. */ private[persistence] trait LeveldbIdMapping extends Actor { this: LeveldbJournal ⇒ import Key._ diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala index cf8bd563fe..7a0063eb41 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala @@ -6,54 +6,25 @@ package akka.persistence.journal.leveldb import java.io.File -import scala.util._ - import org.iq80.leveldb._ -import com.typesafe.config.Config - -import akka.actor._ -import akka.pattern.PromiseActorRef import akka.persistence._ -import akka.serialization.{ Serialization, SerializationExtension } +import akka.persistence.journal.SyncWriteJournal +import akka.serialization.SerializationExtension /** - * LevelDB journal settings. + * INTERNAL API. + * + * LevelDB backed journal. */ -private[persistence] class LeveldbJournalSettings(config: Config) extends JournalFactory { - /** - * Name of directory where journal files shall be stored. Can be a relative or absolute path. - */ - val journalDir: File = new File(config.getString("dir")) - - /** - * Verify checksums on read. - */ - val checksum = false - - /** - * Synchronous writes to disk. - */ - val fsync: Boolean = config.getBoolean("fsync") - - /** - * Creates a new LevelDB journal actor from this configuration object. - */ - def createJournal(implicit factory: ActorRefFactory): ActorRef = - factory.actorOf(Props(classOf[LeveldbJournal], this).withDispatcher("akka.persistence.journal.leveldb.write.dispatcher")) -} - -/** - * LevelDB journal. 
- */ -private[persistence] class LeveldbJournal(val settings: LeveldbJournalSettings) extends Actor with LeveldbIdMapping with LeveldbReplay { - val extension = Persistence(context.system) +private[leveldb] class LeveldbJournal extends SyncWriteJournal with LeveldbIdMapping with LeveldbReplay { + val config = context.system.settings.config.getConfig("akka.persistence.journal.leveldb") val leveldbOptions = new Options().createIfMissing(true).compressionType(CompressionType.NONE) - val leveldbReadOptions = new ReadOptions().verifyChecksums(settings.checksum) - val leveldbWriteOptions = new WriteOptions().sync(settings.fsync) + val leveldbReadOptions = new ReadOptions().verifyChecksums(config.getBoolean("checksum")) + val leveldbWriteOptions = new WriteOptions().sync(config.getBoolean("fsync")) + val leveldbDir = new File(config.getString("dir")) - val leveldbDir = settings.journalDir val leveldbFactory = org.iq80.leveldb.impl.Iq80DBFactory.factory var leveldb: DB = _ @@ -65,40 +36,20 @@ private[persistence] class LeveldbJournal(val settings: LeveldbJournalSettings) // TODO: use user-defined serializer for payload val serializer = SerializationExtension(context.system).findSerializerFor("") - import Journal._ import Key._ - import context.dispatcher + def write(persistent: PersistentImpl) = withBatch { batch ⇒ + val nid = numericId(persistent.processorId) + batch.put(keyToBytes(counterKey(nid)), counterToBytes(persistent.sequenceNr)) + batch.put(keyToBytes(Key(nid, persistent.sequenceNr, 0)), persistentToBytes(persistent)) + } - def receive = { - case Write(persistent, processor) ⇒ { - val persisted = withBatch { batch ⇒ - val sdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender - val nid = numericId(persistent.processorId) - val prepared = persistent.copy(sender = Serialization.serializedActorPath(sdr)) - batch.put(keyToBytes(counterKey(nid)), counterToBytes(prepared.sequenceNr)) - batch.put(keyToBytes(Key(nid, prepared.sequenceNr, 0)), persistentToBytes(prepared.copy(resolved = false, confirmTarget = null, confirmMessage = null))) - prepared - } - processor.tell(Written(persisted), sender) - } - case c @ Confirm(processorId, sequenceNr, channelId) ⇒ { - leveldb.put(keyToBytes(Key(numericId(processorId), sequenceNr, numericId(channelId))), channelId.getBytes("UTF-8")) - context.system.eventStream.publish(c) // TODO: turn off by default and allow to turn on by configuration - } - case Delete(persistent: PersistentImpl) ⇒ { - leveldb.put(keyToBytes(deletionKey(numericId(persistent.processorId), persistent.sequenceNr)), Array.empty[Byte]) - } - case Loop(message, processor) ⇒ { - processor.tell(Looped(message), sender) - } - case Replay(fromSequenceNr, toSequenceNr, processor, processorId) ⇒ { - val maxSnr = maxSequenceNr(processorId) - replayAsync(fromSequenceNr, toSequenceNr, processor, processorId) onComplete { - case Success(_) ⇒ processor ! 
ReplayCompleted(maxSnr) - case Failure(e) ⇒ // TODO: send RecoveryFailed to processor - } - } + def delete(persistent: PersistentImpl) { + leveldb.put(keyToBytes(deletionKey(numericId(persistent.processorId), persistent.sequenceNr)), Array.empty[Byte]) + } + + def confirm(processorId: String, sequenceNr: Long, channelId: String) { + leveldb.put(keyToBytes(Key(numericId(processorId), sequenceNr, numericId(channelId))), channelId.getBytes("UTF-8")) } def leveldbSnapshot = leveldbReadOptions.snapshot(leveldb.getSnapshot) @@ -107,7 +58,7 @@ private[persistence] class LeveldbJournal(val settings: LeveldbJournalSettings) def persistentToBytes(p: PersistentImpl): Array[Byte] = serializer.toBinary(p) def persistentFromBytes(a: Array[Byte]): PersistentImpl = serializer.fromBinary(a).asInstanceOf[PersistentImpl] - def withBatch[R](body: WriteBatch ⇒ R): R = { + private def withBatch[R](body: WriteBatch ⇒ R): R = { val batch = leveldb.createWriteBatch() try { val r = body(batch) @@ -118,21 +69,13 @@ private[persistence] class LeveldbJournal(val settings: LeveldbJournalSettings) } } - def maxSequenceNr(processorId: String) = { - leveldb.get(keyToBytes(counterKey(numericId(processorId))), leveldbSnapshot) match { - case null ⇒ 0L - case bytes ⇒ counterFromBytes(bytes) - } - } - override def preStart() { leveldb = leveldbFactory.open(leveldbDir, leveldbOptions) super.preStart() } override def postStop() { - super.postStop() leveldb.close() + super.postStop() } } - diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbReplay.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbReplay.scala index bc864626c2..d76961fac4 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbReplay.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbReplay.scala @@ -6,26 +6,26 @@ package akka.persistence.journal.leveldb import scala.concurrent.Future -import org.iq80.leveldb.DBIterator - -import akka.actor._ import akka.persistence._ -import akka.persistence.Journal._ +import akka.persistence.journal.AsyncReplay /** - * Asynchronous replay support. + * LevelDB backed message replay. 
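+ *
+ * Replay work runs on the dispatcher configured under
+ * `akka.persistence.journal.leveldb.replay-dispatcher`, so long replays do
+ * not stall the journal actor's write handling.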
*/ -private[persistence] trait LeveldbReplay extends Actor { this: LeveldbJournal ⇒ +private[persistence] trait LeveldbReplay extends AsyncReplay { this: LeveldbJournal ⇒ import Key._ - private val executionContext = context.system.dispatchers.lookup("akka.persistence.journal.leveldb.replay.dispatcher") + private val replayDispatcherId = context.system.settings.config.getString("akka.persistence.journal.leveldb.replay-dispatcher") + private val replayDispatcher = context.system.dispatchers.lookup(replayDispatcherId) - def replayAsync(fromSequenceNr: Long, toSequenceNr: Long, processor: ActorRef, processorId: String): Future[Unit] = - Future(replay(fromSequenceNr: Long, toSequenceNr, processor, numericId(processorId), leveldbIterator))(executionContext) + def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: PersistentImpl ⇒ Unit): Future[Long] = + Future(replay(numericId(processorId), fromSequenceNr: Long, toSequenceNr)(replayCallback))(replayDispatcher) + + private def replay(processorId: Int, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: PersistentImpl ⇒ Unit): Long = { + val iter = leveldbIterator - private def replay(fromSequenceNr: Long, toSequenceNr: Long, processor: ActorRef, processorId: Int, iter: DBIterator): Unit = { @scala.annotation.tailrec - def go(key: Key)(callback: PersistentImpl ⇒ Unit) { + def go(key: Key, replayCallback: PersistentImpl ⇒ Unit) { if (iter.hasNext) { val nextEntry = iter.next() val nextKey = keyFromBytes(nextEntry.getKey) @@ -33,13 +33,13 @@ private[persistence] trait LeveldbReplay extends Actor { this: LeveldbJournal // end iteration here } else if (nextKey.channelId != 0) { // phantom confirmation (just advance iterator) - go(nextKey)(callback) + go(nextKey, replayCallback) } else if (key.processorId == nextKey.processorId) { val msg = persistentFromBytes(nextEntry.getValue) val del = deletion(nextKey) val cnf = confirms(nextKey, Nil) - if (!del) callback(msg.copy(confirms = cnf)) - go(nextKey)(callback) + replayCallback(msg.copy(confirms = cnf, deleted = del)) + go(nextKey, replayCallback) } } } @@ -71,9 +71,17 @@ private[persistence] trait LeveldbReplay extends Actor { this: LeveldbJournal try { val startKey = Key(processorId, if (fromSequenceNr < 1L) 1L else fromSequenceNr, 0) iter.seek(keyToBytes(startKey)) - go(startKey) { m ⇒ processor.tell(Replayed(m), extension.system.provider.resolveActorRef(m.sender)) } + go(startKey, replayCallback) + maxSequenceNr(processorId) } finally { iter.close() } } + + def maxSequenceNr(processorId: Int) = { + leveldb.get(keyToBytes(counterKey(processorId)), leveldbSnapshot) match { + case null ⇒ 0L + case bytes ⇒ counterFromBytes(bytes) + } + } } diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotSerialization.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotSerialization.scala new file mode 100644 index 0000000000..7e8968de4f --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotSerialization.scala @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2012-2013 Eligotech BV. + */ + +package akka.persistence.snapshot + +import java.io._ + +import akka.actor._ +import akka.persistence.SnapshotMetadata +import akka.util.ClassLoaderObjectInputStream + +/** + * Snapshot serialization extension. 
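+ *
+ * Currently provides a Java serialization based `SnapshotSerializer` only;
+ * see the `java` member below.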
+ */ +private[persistence] object SnapshotSerialization extends ExtensionId[SnapshotSerialization] with ExtensionIdProvider { + def createExtension(system: ExtendedActorSystem): SnapshotSerialization = new SnapshotSerialization(system) + def lookup() = SnapshotSerialization +} + +/** + * Snapshot serialization extension. + */ +private[persistence] class SnapshotSerialization(val system: ExtendedActorSystem) extends Extension { + import akka.serialization.JavaSerializer + + /** + * Java serialization based snapshot serializer. + */ + val java = new SnapshotSerializer { + def serialize(stream: OutputStream, metadata: SnapshotMetadata, state: Any) = { + val out = new ObjectOutputStream(stream) + JavaSerializer.currentSystem.withValue(system) { out.writeObject(state) } + } + + def deserialize(stream: InputStream, metadata: SnapshotMetadata) = { + val in = new ClassLoaderObjectInputStream(system.dynamicAccess.classLoader, stream) + JavaSerializer.currentSystem.withValue(system) { in.readObject } + } + } +} + +/** + * Stream-based snapshot serializer. + */ +private[persistence] trait SnapshotSerializer { + /** + * Serializes a `snapshot` to an output stream. + */ + def serialize(stream: OutputStream, metadata: SnapshotMetadata, snapshot: Any): Unit + + /** + * Deserializes a snapshot from an input stream. + */ + def deserialize(stream: InputStream, metadata: SnapshotMetadata): Any +} + diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala new file mode 100644 index 0000000000..addc796087 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala @@ -0,0 +1,88 @@ +/** + * Copyright (C) 2012-2013 Eligotech BV. + */ + +package akka.persistence.snapshot + +import scala.concurrent.Future +import scala.util._ + +import akka.actor._ +import akka.pattern.pipe +import akka.persistence._ + +/** + * Abstract snapshot store. + */ +trait SnapshotStore extends Actor { + import SnapshotProtocol._ + import context.dispatcher + + final def receive = { + case LoadSnapshot(processorId, criteria, toSequenceNr) ⇒ { + val p = sender + loadAsync(processorId, criteria.limit(toSequenceNr)) map { + sso ⇒ LoadSnapshotResult(sso, toSequenceNr) + } recover { + case e ⇒ LoadSnapshotResult(None, toSequenceNr) + } pipeTo (p) + } + case SaveSnapshot(metadata, snapshot) ⇒ { + val p = sender + val md = metadata.copy(timestamp = System.currentTimeMillis) + saveAsync(md, snapshot) map { + _ ⇒ SaveSnapshotSuccess(md) + } recover { + case e ⇒ SaveSnapshotFailure(metadata, e) + } to (self, p) + } + case evt @ SaveSnapshotSuccess(metadata) ⇒ { + saved(metadata) + sender ! evt // sender is processor + } + case evt @ SaveSnapshotFailure(metadata, _) ⇒ { + delete(metadata) + sender ! evt // sender is processor + } + } + + //#snapshot-store-plugin-api + /** + * Plugin API. + * + * Asynchronously loads a snapshot. + * + * @param processorId processor id. + * @param criteria selection criteria for loading. + */ + def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] + + /** + * Plugin API. + * + * Asynchronously saves a snapshot. + * + * @param metadata snapshot metadata. + * @param snapshot snapshot. + */ + def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] + + /** + * Plugin API. + * + * Called after successful saving of a snapshot. + * + * @param metadata snapshot metadata. 
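+ *
+ * Gives implementations a chance to update internal snapshot bookkeeping;
+ * the local snapshot store, for example, adds `metadata` to its in-memory
+ * index here.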
+ */ + def saved(metadata: SnapshotMetadata) + + /** + * Plugin API. + * + * Deletes the snapshot identified by `metadata`. + * + * @param metadata snapshot metadata. + */ + def delete(metadata: SnapshotMetadata) + //#snapshot-store-plugin-api +} diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala new file mode 100644 index 0000000000..d4594c3a50 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.persistence.snapshot.japi + +import scala.concurrent.Future + +import akka.japi.{ Option ⇒ JOption } +import akka.persistence._ +import akka.persistence.snapshot.{ SnapshotStore ⇒ SSnapshotStore } + +abstract class SnapshotStore extends SSnapshotStore { + import context.dispatcher + + final def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria) = + doLoadAsync(processorId, criteria).map(_.asScala) + + final def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = + doSaveAsync(metadata, snapshot).map(Unit.unbox) + + final def saved(metadata: SnapshotMetadata) = + onSaved(metadata) + + final def delete(metadata: SnapshotMetadata) = + doDelete(metadata) + + /** + * Plugin Java API. + * + * Asynchronously loads a snapshot. + * + * @param processorId processor id. + * @param criteria selection criteria for loading. + */ + def doLoadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[JOption[SelectedSnapshot]] + + /** + * Plugin Java API. + * + * Asynchronously saves a snapshot. + * + * @param metadata snapshot metadata. + * @param snapshot snapshot. + */ + def doSaveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Void] + + /** + * Plugin Java API. + * + * Called after successful saving of a snapshot. + * + * @param metadata snapshot metadata. + */ + @throws(classOf[Exception]) + def onSaved(metadata: SnapshotMetadata): Unit + + /** + * Plugin Java API. + * + * Deletes the snapshot identified by `metadata`. + * + * @param metadata snapshot metadata. + */ + @throws(classOf[Exception]) + def doDelete(metadata: SnapshotMetadata): Unit +} diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala index 0413c0680d..79277ba516 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala @@ -7,83 +7,58 @@ package akka.persistence.snapshot.local import java.io._ import java.net.{ URLDecoder, URLEncoder } -import scala.collection.SortedSet -import scala.concurrent._ +import scala.collection.immutable.SortedSet +import scala.concurrent.Future import scala.util._ -import com.typesafe.config.Config - -import akka.actor._ +import akka.actor.ActorLogging import akka.persistence._ +import akka.persistence.snapshot._ /** - * [[LocalSnapshotStore]] settings. + * INTERNAL API. + * + * Local filesystem backed snapshot store. */ -private[persistence] class LocalSnapshotStoreSettings(config: Config) extends SnapshotStoreFactory { - /** - * Name of directory where snapshot files shall be stored. 
- */ - val snapshotDir: File = new File(config.getString("dir")) +private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLogging { + private val FilenamePattern = """^snapshot-(.+)-(\d+)-(\d+)""".r - /** - * Creates a new snapshot store actor. - */ - def createSnapshotStore(implicit factory: ActorRefFactory): ActorRef = - factory.actorOf(Props(classOf[LocalSnapshotStore], this)) -} -/** - * Snapshot store that stores snapshots on local filesystem. - */ -private[persistence] class LocalSnapshotStore(settings: LocalSnapshotStoreSettings) extends Actor with ActorLogging { - private implicit val executionContext = context.system.dispatchers.lookup("akka.persistence.snapshot-store.local.io.dispatcher") + private val config = context.system.settings.config.getConfig("akka.persistence.snapshot-store.local") + private val streamDispatcher = context.system.dispatchers.lookup(config.getString("stream-dispatcher")) + private val snapshotDir = new File(config.getString("dir")) - // TODO: make snapshot access configurable // TODO: make snapshot serializer configurable - - private val snapshotDir = settings.snapshotDir - private val snapshotAccess = new LocalSnapshotAccess(snapshotDir) private val snapshotSerializer = SnapshotSerialization(context.system).java + private var snapshotMetadata = Map.empty[String, SortedSet[SnapshotMetadata]] - var snapshotMetadata = Map.empty[String, SortedSet[SnapshotMetadata]] + def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = + Future(load(processorId, criteria))(streamDispatcher) - import SnapshotStore._ + def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = + Future(save(metadata, snapshot))(streamDispatcher) - def receive = { - case LoadSnapshot(processorId, criteria, toSequenceNr) ⇒ { - val p = sender - loadSnapshotAsync(processorId, criteria.limit(toSequenceNr)) onComplete { - case Success(sso) ⇒ p ! LoadSnapshotCompleted(sso, toSequenceNr) - case Failure(_) ⇒ p ! LoadSnapshotCompleted(None, toSequenceNr) - } - } - case SaveSnapshot(metadata, snapshot) ⇒ { - val p = sender - val md = metadata.copy(timestamp = System.currentTimeMillis) - saveSnapshotAsync(md, snapshot) onComplete { - case Success(_) ⇒ self tell (SaveSnapshotSucceeded(md), p) - case Failure(e) ⇒ self tell (SaveSnapshotFailed(metadata, e), p) - } - } - case evt @ SaveSnapshotSucceeded(metadata) ⇒ { - updateMetadata(metadata) - sender ! evt // sender is processor - } - case evt @ SaveSnapshotFailed(metadata, reason) ⇒ { - deleteSnapshot(metadata) - sender ! 
evt // sender is processor - } + def saved(metadata: SnapshotMetadata) { + snapshotMetadata = snapshotMetadata + (snapshotMetadata.get(metadata.processorId) match { + case Some(mds) ⇒ metadata.processorId -> (mds + metadata) + case None ⇒ metadata.processorId -> SortedSet(metadata) + }) } - def loadSnapshotAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SavedSnapshot]] = - Future(loadSnapshot(processorId, criteria)) + def delete(metadata: SnapshotMetadata): Unit = { + snapshotMetadata = snapshotMetadata.get(metadata.processorId) match { + case Some(mds) ⇒ snapshotMetadata + (metadata.processorId -> (mds - metadata)) + case None ⇒ snapshotMetadata + } + snapshotFile(metadata).delete() + } - def loadSnapshot(processorId: String, criteria: SnapshotSelectionCriteria): Option[SavedSnapshot] = { + private def load(processorId: String, criteria: SnapshotSelectionCriteria): Option[SelectedSnapshot] = { @scala.annotation.tailrec - def load(metadata: SortedSet[SnapshotMetadata]): Option[SavedSnapshot] = metadata.lastOption match { + def load(metadata: SortedSet[SnapshotMetadata]): Option[SelectedSnapshot] = metadata.lastOption match { case None ⇒ None case Some(md) ⇒ { - Try(snapshotAccess.withInputStream(md)(snapshotSerializer.deserialize(_, md))) match { - case Success(ss) ⇒ Some(SavedSnapshot(md, ss)) + Try(withInputStream(md)(snapshotSerializer.deserialize(_, md))) match { + case Success(s) ⇒ Some(SelectedSnapshot(md, s)) case Failure(e) ⇒ { log.error(e, s"error loading snapshot ${md}") load(metadata.init) // try older snapshot @@ -100,56 +75,21 @@ private[persistence] class LocalSnapshotStore(settings: LocalSnapshotStoreSettin // succeed. // // TODO: make number of loading attempts configurable - // TODO: improve heuristics for remote snapshot loading for { - mds ← snapshotMetadata.get(processorId) - md ← load(mds.filter(md ⇒ + md ← load(metadata(processorId).filter(md ⇒ md.sequenceNr <= criteria.maxSequenceNr && md.timestamp <= criteria.maxTimestamp).takeRight(3)) } yield md } - def saveSnapshotAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = - Future(saveSnapshot(metadata, snapshot)) + private def save(metadata: SnapshotMetadata, snapshot: Any): Unit = + withOutputStream(metadata)(snapshotSerializer.serialize(_, metadata, snapshot)) - private def saveSnapshot(metadata: SnapshotMetadata, snapshot: Any): Unit = - snapshotAccess.withOutputStream(metadata)(snapshotSerializer.serialize(_, metadata, snapshot)) - - def deleteSnapshot(metadata: SnapshotMetadata): Unit = - snapshotAccess.delete(metadata) - - def updateMetadata(metadata: SnapshotMetadata): Unit = { - snapshotMetadata = snapshotMetadata + (snapshotMetadata.get(metadata.processorId) match { - case Some(mds) ⇒ metadata.processorId -> (mds + metadata) - case None ⇒ metadata.processorId -> SortedSet(metadata) - }) - } - - override def preStart() { - if (!snapshotDir.exists) snapshotDir.mkdirs() - snapshotMetadata = SortedSet.empty ++ snapshotAccess.metadata groupBy (_.processorId) - super.preStart() - } -} - -/** - * Access to snapshot files on local filesystem. 
- */ -private[persistence] class LocalSnapshotAccess(snapshotDir: File) extends SnapshotAccess { - private val FilenamePattern = """^snapshot-(.+)-(\d+)-(\d+)""".r - - def metadata: Set[SnapshotMetadata] = snapshotDir.listFiles.map(_.getName).collect { - case FilenamePattern(pid, snr, tms) ⇒ SnapshotMetadata(URLDecoder.decode(pid, "UTF-8"), snr.toLong, tms.toLong) - }.toSet - - def delete(metadata: SnapshotMetadata): Unit = - snapshotFile(metadata).delete() - - def withOutputStream(metadata: SnapshotMetadata)(p: (OutputStream) ⇒ Unit) = + private def withOutputStream(metadata: SnapshotMetadata)(p: (OutputStream) ⇒ Unit) = withStream(new BufferedOutputStream(new FileOutputStream(snapshotFile(metadata))), p) - def withInputStream(metadata: SnapshotMetadata)(p: (InputStream) ⇒ Any) = + private def withInputStream(metadata: SnapshotMetadata)(p: (InputStream) ⇒ Any) = withStream(new BufferedInputStream(new FileInputStream(snapshotFile(metadata))), p) private def withStream[A <: Closeable, B](stream: A, p: A ⇒ B): B = @@ -157,4 +97,17 @@ private[persistence] class LocalSnapshotAccess(snapshotDir: File) extends Snapsh private def snapshotFile(metadata: SnapshotMetadata): File = new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.processorId, "UTF-8")}-${metadata.sequenceNr}-${metadata.timestamp}") + + private def metadata(processorId: String): SortedSet[SnapshotMetadata] = + snapshotMetadata.getOrElse(processorId, SortedSet.empty) + + private def metadata: Seq[SnapshotMetadata] = snapshotDir.listFiles.map(_.getName).collect { + case FilenamePattern(pid, snr, tms) ⇒ SnapshotMetadata(URLDecoder.decode(pid, "UTF-8"), snr.toLong, tms.toLong) + } + + override def preStart() { + if (!snapshotDir.exists) snapshotDir.mkdirs() + snapshotMetadata = SortedSet.empty ++ metadata groupBy (_.processorId) + super.preStart() + } } diff --git a/akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala index 1756f2e0c5..08838277cb 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala @@ -1,17 +1,15 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. 
+ */ + package akka.persistence +import com.typesafe.config._ + import akka.actor._ import akka.testkit._ object ChannelSpec { - val config = - """ - |serialize-creators = on - |serialize-messages = on - |akka.persistence.journal.leveldb.dir = "target/journal-channel-spec" - |akka.persistence.snapshot-store.local.dir = ${akka.persistence.journal.leveldb.dir}/snapshots - """.stripMargin - class TestProcessor(name: String) extends NamedProcessor(name) { val destination = context.actorOf(Props[TestDestination]) val channel = context.actorOf(Channel.props("channel")) @@ -42,7 +40,7 @@ object ChannelSpec { } } -class ChannelSpec extends AkkaSpec(ChannelSpec.config) with PersistenceSpec with ImplicitSender { +abstract class ChannelSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender { import ChannelSpec._ override protected def beforeEach() { @@ -120,3 +118,6 @@ class ChannelSpec extends AkkaSpec(ChannelSpec.config) with PersistenceSpec with } } } + +class LeveldbChannelSpec extends ChannelSpec(PersistenceSpec.config("leveldb", "channel")) +class InmemChannelSpec extends ChannelSpec(PersistenceSpec.config("inmem", "channel")) diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala index fa7bfc6f7d..5617e1a0db 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala @@ -1,3 +1,7 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + package akka.persistence import java.io.File @@ -5,6 +9,8 @@ import java.util.concurrent.atomic.AtomicInteger import scala.reflect.ClassTag +import com.typesafe.config.ConfigFactory + import org.apache.commons.io.FileUtils import org.scalatest.BeforeAndAfterEach @@ -38,10 +44,23 @@ trait PersistenceSpec extends BeforeAndAfterEach { this: AkkaSpec ⇒ } override protected def afterTermination() { - FileUtils.deleteDirectory(new File(system.settings.config.getString("akka.persistence.journal.leveldb.dir"))) + List("akka.persistence.journal.leveldb.dir", "akka.persistence.snapshot-store.local.dir") foreach { s ⇒ + FileUtils.deleteDirectory(new File(system.settings.config.getString(s))) + } } } +object PersistenceSpec { + def config(plugin: String, test: String) = ConfigFactory.parseString( + s""" + |serialize-creators = on + |serialize-messages = on + |akka.persistence.journal.plugin = "akka.persistence.journal.${plugin}" + |akka.persistence.journal.leveldb.dir = "target/journal-${test}-spec" + |akka.persistence.snapshot-store.local.dir = "target/snapshots-${test}-spec/" + """.stripMargin) +} + abstract class NamedProcessor(name: String) extends Processor { override def processorId: String = name } diff --git a/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala index 5d3a6ca734..c9eb14fc7a 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala @@ -1,17 +1,15 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. 
+ */ + package akka.persistence +import com.typesafe.config._ + import akka.actor._ import akka.testkit._ object ProcessorSpec { - val config = - """ - |serialize-creators = on - |serialize-messages = on - |akka.persistence.journal.leveldb.dir = "target/journal-processor-spec" - |akka.persistence.snapshot-store.local.dir = ${akka.persistence.journal.leveldb.dir}/snapshots - """.stripMargin - class RecoverTestProcessor(name: String) extends NamedProcessor(name) { var state = List.empty[String] def receive = { @@ -128,7 +126,7 @@ object ProcessorSpec { } } -class ProcessorSpec extends AkkaSpec(ProcessorSpec.config) with PersistenceSpec with ImplicitSender { +abstract class ProcessorSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender { import ProcessorSpec._ override protected def beforeEach() { @@ -296,3 +294,6 @@ class ProcessorSpec extends AkkaSpec(ProcessorSpec.config) with PersistenceSpec } } } + +class LeveldbProcessorSpec extends ProcessorSpec(PersistenceSpec.config("leveldb", "processor")) +class InmemProcessorSpec extends ProcessorSpec(PersistenceSpec.config("inmem", "processor")) diff --git a/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala index 3df87e8f27..023bd3402c 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala @@ -1,17 +1,15 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + package akka.persistence +import com.typesafe.config._ + import akka.actor._ import akka.testkit._ object ProcessorStashSpec { - val config = - """ - |serialize-creators = on - |serialize-messages = on - |akka.persistence.journal.leveldb.dir = "target/journal-processor-stash-spec" - |akka.persistence.snapshot-store.local.dir = ${akka.persistence.journal.leveldb.dir}/snapshots - """.stripMargin - class StashingProcessor(name: String) extends NamedProcessor(name) { var state: List[String] = Nil @@ -48,7 +46,7 @@ object ProcessorStashSpec { } } -class ProcessorStashSpec extends AkkaSpec(ProcessorStashSpec.config) with PersistenceSpec with ImplicitSender { +abstract class ProcessorStashSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender { import ProcessorStashSpec._ "A processor" must { @@ -120,3 +118,6 @@ class ProcessorStashSpec extends AkkaSpec(ProcessorStashSpec.config) with Persis } } } + +class LeveldbProcessorStashSpec extends ProcessorStashSpec(PersistenceSpec.config("leveldb", "processor-stash")) +class InmemProcessorStashSpec extends ProcessorStashSpec(PersistenceSpec.config("inmem", "processor-stash")) diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala index 9b13de6277..f17a59243b 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala @@ -8,23 +8,15 @@ import akka.actor._ import akka.testkit._ object SnapshotSpec { - val config = - """ - |serialize-creators = on - |serialize-messages = on - |akka.persistence.journal.leveldb.dir = "target/journal-snapshot-spec" - |akka.persistence.snapshot-store.local.dir = ${akka.persistence.journal.leveldb.dir}/snapshots - """.stripMargin - case object TakeSnapshot class SaveSnapshotTestProcessor(name: String, probe: ActorRef) extends NamedProcessor(name) { var state = 
List.empty[String] def receive = { - case Persistent(payload, snr) ⇒ state = s"${payload}-${snr}" :: state - case TakeSnapshot ⇒ saveSnapshot(state) - case SaveSnapshotSucceeded(md) ⇒ probe ! md.sequenceNr - case GetState ⇒ probe ! state.reverse + case Persistent(payload, snr) ⇒ state = s"${payload}-${snr}" :: state + case TakeSnapshot ⇒ saveSnapshot(state) + case SaveSnapshotSuccess(md) ⇒ probe ! md.sequenceNr + case GetState ⇒ probe ! state.reverse } } @@ -38,7 +30,7 @@ object SnapshotSpec { } } -class SnapshotSpec extends AkkaSpec(SnapshotSpec.config) with PersistenceSpec with ImplicitSender { +class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "snapshot")) with PersistenceSpec with ImplicitSender { import SnapshotSpec._ override protected def beforeEach() { diff --git a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/SnapshotExample.scala b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/SnapshotExample.scala index ae6f678876..967a03ba2b 100644 --- a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/SnapshotExample.scala +++ b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/SnapshotExample.scala @@ -17,12 +17,12 @@ object SnapshotExample extends App { var state = ExampleState() def receive = { - case Persistent(s, snr) ⇒ state = state.update(s"${s}-${snr}") - case SaveSnapshotSucceeded(metadata) ⇒ // ... - case SaveSnapshotFailed(metadata, reason) ⇒ // ... - case SnapshotOffer(_, s: ExampleState) ⇒ println("offered state = " + s); state = s - case "print" ⇒ println("current state = " + state) - case "snap" ⇒ saveSnapshot(state) + case Persistent(s, snr) ⇒ state = state.update(s"${s}-${snr}") + case SaveSnapshotSuccess(metadata) ⇒ // ... + case SaveSnapshotFailure(metadata, reason) ⇒ // ... + case SnapshotOffer(_, s: ExampleState) ⇒ println("offered state = " + s); state = s + case "print" ⇒ println("current state = " + state) + case "snap" ⇒ saveSnapshot(state) } }
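
A minimal driver for the snapshot example above might look as follows. This is
a sketch only: the processor class name `ExampleProcessor`, the actor system
name, and the shutdown handling are assumptions, not part of this diff.

    import akka.actor._
    import akka.persistence._

    object SnapshotExampleDriver extends App {
      val system = ActorSystem("snapshot-example")
      // ExampleProcessor is assumed to be the processor defined in SnapshotExample.scala
      val processor = system.actorOf(Props[ExampleProcessor], "processor-1")

      processor ! Persistent("a") // journaled, then appended to state
      processor ! Persistent("b") // journaled, then appended to state
      processor ! "snap"          // triggers saveSnapshot(state)
      processor ! "print"         // prints the current state

      Thread.sleep(1000)          // crude: wait for asynchronous persistence to finish
      system.shutdown()
    }

On restart, the processor would receive a `SnapshotOffer` carrying the latest
snapshot, followed by replay of any messages journaled after it, as handled in
the `receive` block above.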