+per #3641 Storage plugin API

- Journal plugin API for storage backends with asynchronous client API (default impl: in-memory journal)
- Journal plugin API for storage backends with synchronous client API (default impl: LevelDB journal)
- Snapshot store plugin API (default impl: local filesystem snapshot store)
This commit is contained in:
Martin Krasser 2013-10-08 11:46:02 +02:00
parent 1bda2a43d5
commit da7490bbc9
33 changed files with 1454 additions and 474 deletions

View file

@ -391,6 +391,12 @@ akka-agent
.. literalinclude:: ../../../akka-agent/src/main/resources/reference.conf
:language: none
akka-persistence
~~~~~~~~~~~~~~~~
.. literalinclude:: ../../../akka-persistence/src/main/resources/reference.conf
:language: none
akka-zeromq
~~~~~~~~~~~

View file

@ -25,13 +25,20 @@ public class PersistenceDocTest {
class MyProcessor extends UntypedProcessor {
public void onReceive(Object message) throws Exception {
if (message instanceof Persistent) {
// message has been written to journal
// message successfully written to journal
Persistent persistent = (Persistent)message;
Object payload = persistent.payload();
Long sequenceNr = persistent.sequenceNr();
// ...
} else if (message instanceof PersistenceFailure) {
// message failed to be written to journal
PersistenceFailure failure = (PersistenceFailure)message;
Object payload = failure.payload();
Long sequenceNr = failure.sequenceNr();
Throwable cause = failure.cause();
// ...
} else {
// message has not been written to journal
// message not written to journal
}
}
}
@ -179,11 +186,11 @@ public class PersistenceDocTest {
public void onReceive(Object message) throws Exception {
if (message.equals("snap")) {
saveSnapshot(state);
} else if (message instanceof SaveSnapshotSucceeded) {
SnapshotMetadata metadata = ((SaveSnapshotSucceeded)message).metadata();
} else if (message instanceof SaveSnapshotSuccess) {
SnapshotMetadata metadata = ((SaveSnapshotSuccess)message).metadata();
// ...
} else if (message instanceof SaveSnapshotFailed) {
SnapshotMetadata metadata = ((SaveSnapshotFailed)message).metadata();
} else if (message instanceof SaveSnapshotFailure) {
SnapshotMetadata metadata = ((SaveSnapshotFailure)message).metadata();
// ...
}
}
@ -225,6 +232,5 @@ public class PersistenceDocTest {
//#snapshot-criteria
}
}
};
}

View file

@ -0,0 +1,141 @@
package docs.persistence;
//#plugin-imports
import scala.concurrent.Future;
import akka.japi.Option;
import akka.japi.Procedure;
import akka.persistence.*;
import akka.persistence.journal.japi.*;
import akka.persistence.snapshot.japi.*;
//#plugin-imports
public class PersistencePluginDocTest {
static Object o1 = new Object() {
abstract class MySnapshotStore extends SnapshotStore {
//#snapshot-store-plugin-api
/**
* Plugin Java API.
*
* Asynchronously loads a snapshot.
*
* @param processorId processor id.
* @param criteria selection criteria for loading.
*/
public abstract Future<Option<SelectedSnapshot>> doLoadAsync(String processorId, SnapshotSelectionCriteria criteria);
/**
* Plugin Java API.
*
* Asynchronously saves a snapshot.
*
* @param metadata snapshot metadata.
* @param snapshot snapshot.
*/
public abstract Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot);
/**
* Plugin Java API.
*
* Called after successful saving of a snapshot.
*
* @param metadata snapshot metadata.
*/
public abstract void onSaved(SnapshotMetadata metadata) throws Exception;
/**
* Plugin Java API.
*
* Deletes the snapshot identified by `metadata`.
*
* @param metadata snapshot metadata.
*/
public abstract void doDelete(SnapshotMetadata metadata) throws Exception;
//#snapshot-store-plugin-api
}
abstract class MySyncWriteJournal extends SyncWriteJournal {
//#sync-write-plugin-api
/**
* Plugin Java API.
*
* Synchronously writes a `persistent` message to the journal.
*/
@Override
public abstract void doWrite(PersistentImpl persistent) throws Exception;
/**
* Plugin Java API.
*
* Synchronously marks a `persistent` message as deleted.
*/
@Override
public abstract void doDelete(PersistentImpl persistent) throws Exception;
/**
* Plugin Java API.
*
* Synchronously writes a delivery confirmation to the journal.
*/
@Override
public abstract void doConfirm(String processorId, long sequenceNr, String channelId) throws Exception;
//#sync-write-plugin-api
}
abstract class MyAsyncWriteJournal extends AsyncWriteJournal {
//#async-write-plugin-api
/**
* Plugin Java API.
*
* Asynchronously writes a `persistent` message to the journal.
*/
@Override
public abstract Future<Void> doWriteAsync(PersistentImpl persistent);
/**
* Plugin Java API.
*
* Asynchronously marks a `persistent` message as deleted.
*/
@Override
public abstract Future<Void> doDeleteAsync(PersistentImpl persistent);
/**
* Plugin Java API.
*
* Asynchronously writes a delivery confirmation to the journal.
*/
@Override
public abstract Future<Void> doConfirmAsync(String processorId, long sequenceNr, String channelId);
//#async-write-plugin-api
}
abstract class MyAsyncReplay extends AsyncReplay {
//#async-replay-plugin-api
/**
* Plugin Java API.
*
* Asynchronously replays persistent messages. Implementations replay a message
* by calling `replayCallback`. The returned future must be completed when all
* messages (matching the sequence number bounds) have been replayed. The future
* `Long` value must be the highest stored sequence number in the journal for the
* specified processor. The future must be completed with a failure if any of
* the persistent messages could not be replayed.
*
* The `replayCallback` must also be called with messages that have been marked
* as deleted. In this case a replayed message's `deleted` field must be set to
* `true`.
*
* The channel ids of delivery confirmations that are available for a replayed
* message must be contained in that message's `confirms` sequence.
*
* @param processorId processor id.
* @param fromSequenceNr sequence number where replay should start.
* @param toSequenceNr sequence number where replay should end (inclusive).
* @param replayCallback called to replay a single message.
*/
@Override
public abstract Future<Long> doReplayAsync(String processorId, long fromSequenceNr, long toSequenceNr, Procedure<PersistentImpl> replayCallback);
//#async-replay-plugin-api
}
};
}

View file

@ -50,12 +50,12 @@ Configuration
By default, journaled messages are written to a directory named ``journal`` in the current working directory. This
can be changed by configuration where the specified path can be relative or absolute:
.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#journal-config
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#journal-config
The default storage location of :ref:`snapshots-java` is a directory named ``snapshots`` in the current working directory.
This can be changed by configuration where the specified path can be relative or absolute:
.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#snapshot-config
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-config
.. _processors-java:
@ -69,8 +69,12 @@ A processor can be implemented by extending the abstract ``UntypedProcessor`` cl
Processors only write messages of type ``Persistent`` to the journal, others are received without being persisted.
When a processor's ``onReceive`` method is called with a ``Persistent`` message it can safely assume that this message
has been successfully written to the journal. A ``UntypedProcessor`` itself is an ``Actor`` and can therefore
be instantiated with ``actorOf``.
has been successfully written to the journal. If a journal fails to write a ``Persistent`` message then the processor
receives a ``PersistenceFailure`` message instead of a ``Persistent`` message. In this case, a processor may want to
inform the sender about the failure, so that the sender can re-send the message, if needed, under the assumption that
the journal recovered from a temporary failure.
An ``UntypedProcessor`` itself is an ``Actor`` and can therefore be instantiated with ``actorOf``.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#usage
@ -226,7 +230,7 @@ Snapshots
Snapshots can dramatically reduce recovery times. Processors can save snapshots of internal state by calling the
``saveSnapshot`` method on ``Processor``. If saving of a snapshot succeeds, the processor will receive a
``SaveSnapshotSucceeded`` message, otherwise a ``SaveSnapshotFailed`` message.
``SaveSnapshotSuccess`` message, otherwise a ``SaveSnapshotFailure`` message.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#save-snapshot
@ -247,11 +251,59 @@ If not specified, they default to ``SnapshotSelectionCriteria.latest()`` which s
To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.none()``. A recovery where no
saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages.
Storage plugins
===============
Storage backends for journals and snapshot stores are plugins in akka-persistence. The default journal plugin writes
messages to LevelDB. The default snapshot store plugin writes snapshots as individual files to the local filesystem.
Applications can provide their own plugins by implementing a plugin API and activate them by configuration. Plugin
development requires the following imports:
.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#plugin-imports
Journal plugin API
------------------
A journal plugin either extends ``SyncWriteJournal`` or ``AsyncWriteJournal``. ``SyncWriteJournal`` is an
actor that should be extended when the storage backend API only supports synchronous, blocking writes. The
methods to be implemented in this case are:
.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#sync-write-plugin-api
``AsyncWriteJournal`` is an actor that should be extended if the storage backend API supports asynchronous,
non-blocking writes. The methods to be implemented in that case are:
.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#async-write-plugin-api
Message replays are always asynchronous, therefore, any journal plugin must implement:
.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#async-replay-plugin-api
A journal plugin can be activated with the following minimal configuration:
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#journal-plugin-config
The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher
used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``
for ``SyncWriteJournal`` plugins and ``akka.actor.default-dispatcher`` for ``AsyncWriteJournal`` plugins.
Snapshot store plugin API
-------------------------
A snapshot store plugin must extend the ``SnapshotStore`` actor and implement the following methods:
.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#snapshot-store-plugin-api
A snapshot store plugin can be activated with the following minimal configuration:
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-store-plugin-config
The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher
used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``.
Upcoming features
=================
* Journal plugin API
* Snapshot store plugin API
* Reliable channels
* Custom serialization of messages and snapshots
* Extended deletion of messages and snapshots

View file

@ -2,31 +2,27 @@ package docs.persistence
import akka.actor.ActorSystem
import akka.persistence._
import akka.persistence.SaveSnapshotSucceeded
import scala.Some
trait PersistenceDocSpec {
val system: ActorSystem
val config =
"""
//#journal-config
akka.persistence.journal.leveldb.dir = "target/journal"
//#journal-config
//#snapshot-config
akka.persistence.snapshot-store.local.dir = "target/snapshots"
//#snapshot-config
"""
import system._
new AnyRef {
//#definition
import akka.persistence.{ Persistent, Processor }
import akka.persistence.{ Persistent, PersistenceFailure, Processor }
class MyProcessor extends Processor {
def receive = {
case Persistent(payload, sequenceNr) // message has been written to journal
case other // message has not been written to journal
case Persistent(payload, sequenceNr) {
// message successfully written to journal
}
case PersistenceFailure(payload, sequenceNr, cause) {
// message failed to be written to journal
}
case other {
// message not written to journal
}
}
}
//#definition
@ -196,8 +192,8 @@ trait PersistenceDocSpec {
def receive = {
case "snap" saveSnapshot(state)
case SaveSnapshotSucceeded(metadata) // ...
case SaveSnapshotFailed(metadata, reason) // ...
case SaveSnapshotSuccess(metadata) // ...
case SaveSnapshotFailure(metadata, reason) // ...
}
}
//#save-snapshot

View file

@ -0,0 +1,78 @@
package docs.persistence
//#plugin-imports
import scala.concurrent.Future
//#plugin-imports
import com.typesafe.config._
import org.scalatest.WordSpec
import akka.actor.ActorSystem
//#plugin-imports
import akka.persistence._
import akka.persistence.journal._
import akka.persistence.snapshot._
//#plugin-imports
object PersistencePluginDocSpec {
val config =
"""
//#journal-config
akka.persistence.journal.leveldb.dir = "target/journal"
//#journal-config
//#snapshot-config
akka.persistence.snapshot-store.local.dir = "target/snapshots"
//#snapshot-config
"""
}
class PersistencePluginDocSpec extends WordSpec {
new AnyRef {
val providerConfig =
"""
//#journal-plugin-config
# Path to the journal plugin to be used
akka.persistence.journal.plugin = "my-journal"
# My custom journal plugin
my-journal {
# Class name of the plugin.
class = "docs.persistence.MyJournal"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.actor.default-dispatcher"
}
//#journal-plugin-config
//#snapshot-store-plugin-config
# Path to the snapshot store plugin to be used
akka.persistence.snapshot-store.plugin = "my-snapshot-store"
# My custom snapshot store plugin
my-snapshot-store {
# Class name of the plugin.
class = "docs.persistence.MySnapshotStore"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
}
//#snapshot-store-plugin-config
"""
val system = ActorSystem("doc", ConfigFactory.parseString(providerConfig).withFallback(ConfigFactory.parseString(PersistencePluginDocSpec.config)))
val extension = Persistence(system)
}
}
class MyJournal extends AsyncWriteJournal {
def writeAsync(persistent: PersistentImpl): Future[Unit] = ???
def deleteAsync(persistent: PersistentImpl): Future[Unit] = ???
def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] = ???
def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentImpl) Unit): Future[Long] = ???
}
class MySnapshotStore extends SnapshotStore {
def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = ???
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = ???
def saved(metadata: SnapshotMetadata) {}
def delete(metadata: SnapshotMetadata) {}
}

View file

@ -46,12 +46,12 @@ Configuration
By default, journaled messages are written to a directory named ``journal`` in the current working directory. This
can be changed by configuration where the specified path can be relative or absolute:
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#journal-config
.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#journal-config
The default storage location of :ref:`snapshots` is a directory named ``snapshots`` in the current working directory.
This can be changed by configuration where the specified path can be relative or absolute:
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#snapshot-config
.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-config
.. _processors:
@ -64,8 +64,12 @@ A processor can be implemented by extending the ``Processor`` trait and implemen
Processors only write messages of type ``Persistent`` to the journal, others are received without being persisted.
When a processor's ``receive`` method is called with a ``Persistent`` message it can safely assume that this message
has been successfully written to the journal. A ``Processor`` itself is an ``Actor`` and can therefore be instantiated
with ``actorOf``.
has been successfully written to the journal. If a journal fails to write a ``Persistent`` message then the processor
receives a ``PersistenceFailure`` message instead of a ``Persistent`` message. In this case, a processor may want to
inform the sender about the failure, so that the sender can re-send the message, if needed, under the assumption that
the journal recovered from a temporary failure.
A ``Processor`` itself is an ``Actor`` and can therefore be instantiated with ``actorOf``.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#usage
@ -233,7 +237,7 @@ Snapshots
Snapshots can dramatically reduce recovery times. Processors can save snapshots of internal state by calling the
``saveSnapshot`` method on ``Processor``. If saving of a snapshot succeeds, the processor will receive a
``SaveSnapshotSucceeded`` message, otherwise a ``SaveSnapshotFailed`` message
``SaveSnapshotSuccess`` message, otherwise a ``SaveSnapshotFailure`` message
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#save-snapshot
@ -258,6 +262,56 @@ If not specified, they default to ``SnapshotSelectionCriteria.Latest`` which sel
To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.None``. A recovery where no
saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages.
Storage plugins
===============
Storage backends for journals and snapshot stores are plugins in akka-persistence. The default journal plugin writes
messages to LevelDB. The default snapshot store plugin writes snapshots as individual files to the local filesystem.
Applications can provide their own plugins by implementing a plugin API and activate them by configuration. Plugin
development requires the following imports:
.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#plugin-imports
Journal plugin API
------------------
A journal plugin either extends ``SyncWriteJournal`` or ``AsyncWriteJournal``. ``SyncWriteJournal`` is an
actor that should be extended when the storage backend API only supports synchronous, blocking writes. The
methods to be implemented in this case are:
.. includecode:: ../../../akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala#journal-plugin-api
``AsyncWriteJournal`` is an actor that should be extended if the storage backend API supports asynchronous,
non-blocking writes. The methods to be implemented in that case are:
.. includecode:: ../../../akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala#journal-plugin-api
Message replays are always asynchronous, therefore, any journal plugin must implement:
.. includecode:: ../../../akka-persistence/src/main/scala/akka/persistence/journal/AsyncReplay.scala#journal-plugin-api
A journal plugin can be activated with the following minimal configuration:
.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#journal-plugin-config
The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher
used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``
for ``SyncWriteJournal`` plugins and ``akka.actor.default-dispatcher`` for ``AsyncWriteJournal`` plugins.
Snapshot store plugin API
-------------------------
A snapshot store plugin must extend the ``SnapshotStore`` actor and implement the following methods:
.. includecode:: ../../../akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala#snapshot-store-plugin-api
A snapshot store plugin can be activated with the following minimal configuration:
.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-store-plugin-config
The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher
used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``.
Miscellaneous
=============
@ -271,8 +325,6 @@ State machines can be persisted by mixing in the ``FSM`` trait into processors.
Upcoming features
=================
* Journal plugin API
* Snapshot store plugin API
* Reliable channels
* Custom serialization of messages and snapshots
* Extended deletion of messages and snapshots

View file

@ -1,14 +1,77 @@
##########################################
# Akka Persistence Reference Config File #
##########################################
akka {
persistence {
journal {
use = "leveldb"
# Path to the journal plugin to be used
plugin = "akka.persistence.journal.leveldb"
# In-memory journal plugin.
inmem {
# Class name of the plugin.
class = "akka.persistence.journal.inmem.InmemJournal"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.actor.default-dispatcher"
}
# LevelDB journal plugin.
leveldb {
# Class name of the plugin.
class = "akka.persistence.journal.leveldb.LeveldbJournal"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
# Dispatcher for message replay.
replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher"
# Storage location of LevelDB files.
dir = "journal"
write.dispatcher {
# Use fsync on write
fsync = off
# Verify checksum on read.
checksum = off
}
}
snapshot-store {
# Path to the snapshot store plugin to be used
plugin = "akka.persistence.snapshot-store.local"
# Local filesystem snapshot store plugin.
local {
# Class name of the plugin.
class = "akka.persistence.snapshot.local.LocalSnapshotStore"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
# Dispatcher for streaming snapshot IO.
stream-dispatcher = "akka.persistence.dispatchers.default-stream-dispatcher"
# Storage location of snapshot files.
dir = "snapshots"
}
}
dispatchers {
default-plugin-dispatcher {
type = PinnedDispatcher
executor = "thread-pool-executor"
}
replay.dispatcher {
default-replay-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
@ -16,14 +79,7 @@ akka {
core-pool-size-max = 8
}
}
fsync = off
}
}
snapshot-store {
use = "local"
local {
dir = "snapshots"
io.dispatcher {
default-stream-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
@ -34,4 +90,3 @@ akka {
}
}
}
}

View file

@ -6,17 +6,10 @@ package akka.persistence
import akka.actor._
private[persistence] trait JournalFactory {
/**
* Creates a new journal actor.
*/
def createJournal(implicit factory: ActorRefFactory): ActorRef
}
/**
* Defines messages exchanged between processors, channels and a journal.
*/
private[persistence] object Journal {
private[persistence] object JournalProtocol {
/**
* Instructs a journal to mark the `persistent` message as deleted.
* A persistent message marked as deleted is not replayed during recovery.
@ -34,11 +27,19 @@ private[persistence] object Journal {
case class Write(persistent: PersistentImpl, processor: ActorRef)
/**
* Reply message to a processor that `persistent` message has been journaled.
* Reply message to a processor that `persistent` message has been successfully journaled.
*
* @param persistent persistent message.
*/
case class Written(persistent: PersistentImpl)
case class WriteSuccess(persistent: PersistentImpl)
/**
* Reply message to a processor that `persistent` message could not be journaled.
*
* @param persistent persistent message.
* @param cause failure cause.
*/
case class WriteFailure(persistent: PersistentImpl, cause: Throwable)
/**
* Instructs a journal to loop a `message` back to `processor`, without persisting the
@ -55,12 +56,17 @@ private[persistence] object Journal {
*
* @param message looped message.
*/
case class Looped(message: Any)
case class LoopSuccess(message: Any)
/**
* ...
* Instructs a journal to replay messages to `processor`.
*
* @param fromSequenceNr sequence number where replay should start.
* @param toSequenceNr sequence number where replay should end (inclusive).
* @param processorId requesting processor id.
* @param processor requesting processor.
*/
case class Replay(fromSequenceNr: Long, toSequenceNr: Long, processor: ActorRef, processorId: String)
case class Replay(fromSequenceNr: Long, toSequenceNr: Long, processorId: String, processor: ActorRef)
/**
* Reply message to a processor that `persistent` message has been replayed.
@ -74,6 +80,12 @@ private[persistence] object Journal {
*
* @param maxSequenceNr the highest stored sequence number (for a processor).
*/
case class ReplayCompleted(maxSequenceNr: Long)
case class ReplaySuccess(maxSequenceNr: Long)
/**
* Reply message to a processor that not all `persistent` messages could have been
* replayed.
*/
case class ReplayFailure(cause: Throwable)
}

View file

@ -7,29 +7,14 @@ package akka.persistence
import com.typesafe.config.Config
import akka.actor._
import akka.persistence.journal.leveldb._
import akka.persistence.snapshot.local._
import akka.dispatch.Dispatchers
import akka.persistence.journal.AsyncWriteJournal
/**
* Persistence extension.
*/
object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider {
class Settings(config: Config) {
val rootConfig = config.getConfig("akka.persistence")
val journalsConfig = rootConfig.getConfig("journal")
val journalName = journalsConfig.getString("use")
val journalConfig = journalsConfig.getConfig(journalName)
val journalFactory = journalName match {
case "leveldb" new LeveldbJournalSettings(journalConfig)
}
val snapshotStoresConfig = rootConfig.getConfig("snapshot-store")
val snapshotStoreName = snapshotStoresConfig.getString("use")
val snapshotStoreConfig = snapshotStoresConfig.getConfig(snapshotStoreName)
val snapshotStoreFactory = snapshotStoreName match {
case "local" new LocalSnapshotStoreSettings(snapshotStoreConfig)
}
}
/**
@ -46,9 +31,12 @@ object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider {
* Persistence extension.
*/
class Persistence(val system: ExtendedActorSystem) extends Extension {
private val settings = new Persistence.Settings(system.settings.config)
private val journal = settings.journalFactory.createJournal(system)
private val snapshotStore = settings.snapshotStoreFactory.createSnapshotStore(system)
private val DefaultPluginDispatcherId = "akka.persistence.dispatchers.default-plugin-dispatcher"
private val config = system.settings.config.getConfig("akka.persistence")
private val snapshotStore = createPlugin("snapshot-store", _ DefaultPluginDispatcherId)
private val journal = createPlugin("journal", clazz
if (classOf[AsyncWriteJournal].isAssignableFrom(clazz)) Dispatchers.DefaultDispatcherId else DefaultPluginDispatcherId)
/**
* Returns a snapshot store for a processor identified by `processorId`.
@ -78,5 +66,14 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
*/
def channelId(channel: ActorRef): String = id(channel)
private def createPlugin(pluginType: String, dispatcherSelector: Class[_] String) = {
val pluginConfigPath = config.getString(s"${pluginType}.plugin")
val pluginConfig = system.settings.config.getConfig(pluginConfigPath)
val pluginClassName = pluginConfig.getString("class")
val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get
val pluginDispatcherId = if (pluginConfig.hasPath("plugin-dispatcher")) pluginConfig.getString("plugin-dispatcher") else dispatcherSelector(pluginClass)
system.actorOf(Props(pluginClass).withDispatcher(pluginDispatcherId))
}
private def id(ref: ActorRef) = ref.path.toStringWithAddress(system.provider.getDefaultAddress)
}

View file

@ -4,6 +4,8 @@
package akka.persistence
import java.util.{ List JList }
import akka.actor.ActorRef
/**
@ -77,25 +79,75 @@ object Persistent {
}
/**
* INTERNAL API.
* Plugin API.
*
* Internal [[Persistent]] representation.
* Internal [[Persistent]] message representation.
*
* @param resolved `true` by default, `false` for replayed messages. Set to `true` by a channel if this
* message is replayed and its sender reference was resolved. Channels use this field to
* avoid redundant sender reference resolutions.
* @param processorId Id of processor that journaled the message.
* @param channelId Id of last channel that delivered the message to a destination.
* @param sender Serialized sender reference.
* @param deleted `true` if this message is marked as deleted.
* @param confirms Channel ids of delivery confirmations that are available for this message. Only non-empty
* for replayed messages.
* @param confirmTarget Delivery confirmation target.
* @param confirmMessage Delivery confirmation message.
*
* @see [[Processor]]
* @see [[Channel]]
* @see [[Deliver]]
*/
private[persistence] case class PersistentImpl(
case class PersistentImpl(
payload: Any,
sequenceNr: Long = 0L,
resolved: Boolean = true,
processorId: String = "",
channelId: String = "",
sender: String = "",
deleted: Boolean = false,
confirms: Seq[String] = Nil,
confirmTarget: ActorRef = null,
confirmMessage: Confirm = null) extends Persistent {
def withPayload(payload: Any): Persistent = copy(payload = payload)
def confirm(): Unit = if (confirmTarget != null) confirmTarget ! confirmMessage
def withPayload(payload: Any): Persistent =
copy(payload = payload)
def confirm(): Unit =
if (confirmTarget != null) confirmTarget ! confirmMessage
import scala.collection.JavaConverters._
/**
* Java Plugin API.
*/
def getConfirms: JList[String] = confirms.asJava
}
object PersistentImpl {
/**
* Java Plugin API.
*/
def create(payload: Any, sequenceNr: Long, resolved: Boolean, processorId: String, channelId: String, sender: String, deleted: Boolean, confirms: Seq[String]): PersistentImpl =
PersistentImpl(payload, sequenceNr, resolved, processorId, channelId, sender, deleted, confirms)
/**
* Java Plugin API.
*/
def create(payload: Any, sequenceNr: Long, resolved: Boolean, processorId: String, channelId: String, sender: String, deleted: Boolean, confirms: Seq[String], confirmTarget: ActorRef, confirmMessage: Confirm): PersistentImpl =
PersistentImpl(payload, sequenceNr, resolved, processorId, channelId, sender, deleted, confirms, confirmTarget, confirmMessage)
}
/**
* Receive by a processor when a journal failed to write a [[Persistent]] message.
*
* @param payload payload of the persistent message.
* @param sequenceNr sequence number of the persistent message.
* @param cause failure cause.
*/
case class PersistenceFailure(payload: Any, sequenceNr: Long, cause: Throwable)
/**
* Message to confirm the receipt of a persistent message (sent via a [[Channel]]).
*/

View file

@ -43,11 +43,16 @@ import akka.dispatch._
* ''user stash'' inherited by `akka.actor.Stash`. `Processor` implementation classes can therefore use the
* ''user stash'' for stashing/unstashing both persistent and transient messages.
*
* Processors can also store snapshots of internal state by calling [[saveSnapshot]]. During recovery, a saved
* snapshot is offered to the processor with a [[SnapshotOffer]] message, followed by replayed messages, if any,
* that are younger than the snapshot. Default is to offer the latest saved snapshot.
*
* @see [[UntypedProcessor]]
* @see [[Recover]]
*/
trait Processor extends Actor with Stash {
import Journal._
import SnapshotStore._
import JournalProtocol._
import SnapshotProtocol._
private val extension = Persistence(context.system)
private val _processorId = extension.processorId(self)
@ -100,23 +105,26 @@ trait Processor extends Actor with Stash {
override def toString: String = "recovery started"
def aroundReceive(receive: Actor.Receive, message: Any) = message match {
case LoadSnapshotCompleted(sso, toSnr) sso match {
case Some(ss) {
process(receive, SnapshotOffer(ss.metadata, ss.snapshot))
journal ! Replay(ss.metadata.sequenceNr + 1L, toSnr, self, processorId)
case LoadSnapshotResult(sso, toSnr) sso match {
case Some(SelectedSnapshot(metadata, snapshot)) {
process(receive, SnapshotOffer(metadata, snapshot))
journal ! Replay(metadata.sequenceNr + 1L, toSnr, processorId, self)
} case None {
journal ! Replay(1L, toSnr, self, processorId)
journal ! Replay(1L, toSnr, processorId, self)
}
}
case ReplayCompleted(maxSnr) {
case ReplaySuccess(maxSnr) {
_currentState = recoverySucceeded
_sequenceNr = maxSnr
unstashAllInternal()
}
case ReplayFailure(cause) {
throw cause
}
case Replayed(p) try { processPersistent(receive, p) } catch {
case t: Throwable {
_currentState = recoveryFailed // delay throwing exception to prepareRestart
_recoveryFailureReason = t
_recoveryFailureCause = t
_recoveryFailureMessage = currentEnvelope
}
}
@ -134,10 +142,9 @@ trait Processor extends Actor with Stash {
def aroundReceive(receive: Actor.Receive, message: Any) = message match {
case r: Recover // ignore
case Replayed(p) processPersistent(receive, p) // can occur after unstash from user stash
case Written(p) processPersistent(receive, p)
case Looped(p) process(receive, p)
case s: SaveSnapshotSucceeded process(receive, s)
case f: SaveSnapshotFailed process(receive, f)
case WriteSuccess(p) processPersistent(receive, p)
case WriteFailure(p, cause) process(receive, PersistenceFailure(p.payload, p.sequenceNr, cause))
case LoopSuccess(m) process(receive, m)
case p: PersistentImpl journal forward Write(p.copy(processorId = processorId, sequenceNr = nextSequenceNr()), self)
case m journal forward Loop(m, self)
}
@ -152,7 +159,7 @@ trait Processor extends Actor with Stash {
override def toString: String = "recovery failed"
def aroundReceive(receive: Actor.Receive, message: Any) = message match {
case ReplayCompleted(maxSnr) {
case ReplaySuccess(maxSnr) {
_currentState = prepareRestart
mailbox.enqueueFirst(self, _recoveryFailureMessage)
}
@ -170,7 +177,7 @@ trait Processor extends Actor with Stash {
override def toString: String = "prepare restart"
def aroundReceive(receive: Actor.Receive, message: Any) = message match {
case Replayed(_) throw _recoveryFailureReason
case Replayed(_) throw _recoveryFailureCause
case _ // ignore
}
}
@ -181,7 +188,7 @@ trait Processor extends Actor with Stash {
private var _currentPersistent: Persistent = _
private var _currentState: State = recoveryPending
private var _recoveryFailureReason: Throwable = _
private var _recoveryFailureCause: Throwable = _
private var _recoveryFailureMessage: Envelope = _
private lazy val journal = extension.journalFor(processorId)
@ -230,7 +237,7 @@ trait Processor extends Actor with Stash {
/**
* Saves a `snapshot` of this processor's state. If saving succeeds, this processor will receive a
* [[SaveSnapshotSucceeded]] message, otherwise a [[SaveSnapshotFailed]] message.
* [[SaveSnapshotSuccess]] message, otherwise a [[SaveSnapshotFailure]] message.
*/
def saveSnapshot(snapshot: Any): Unit = {
snapshotStore ! SaveSnapshot(SnapshotMetadata(processorId, lastSequenceNr), snapshot)
@ -266,8 +273,8 @@ trait Processor extends Actor with Stash {
unstashAllInternal()
} finally {
message match {
case Some(Written(m)) preRestartDefault(reason, Some(m))
case Some(Looped(m)) preRestartDefault(reason, Some(m))
case Some(WriteSuccess(m)) preRestartDefault(reason, Some(m))
case Some(LoopSuccess(m)) preRestartDefault(reason, Some(m))
case Some(Replayed(m)) preRestartDefault(reason, Some(m))
case mo preRestartDefault(reason, None)
}
@ -312,7 +319,7 @@ trait Processor extends Actor with Stash {
// -----------------------------------------------------
private def unstashFilterPredicate: Any Boolean = {
case _: Written false
case _: WriteSuccess false
case _: Replayed false
case _ true
}
@ -381,7 +388,12 @@ trait Processor extends Actor with Stash {
* ''user stash'' inherited by `akka.actor.Stash`. `Processor` implementation classes can therefore use the
* ''user stash'' for stashing/unstashing both persistent and transient messages.
*
* Processors can also store snapshots of internal state by calling [[saveSnapshot]]. During recovery, a saved
* snapshot is offered to the processor with a [[SnapshotOffer]] message, followed by replayed messages, if any,
* that are younger than the snapshot. Default is to offer the latest saved snapshot.
*
* @see [[Processor]]
* @see [[Recover]]
*/
abstract class UntypedProcessor extends UntypedActor with Processor {

View file

@ -4,11 +4,6 @@
package akka.persistence
import java.io._
import akka.actor._
import akka.util.ClassLoaderObjectInputStream
/**
* Snapshot metadata.
*
@ -21,21 +16,21 @@ case class SnapshotMetadata(processorId: String, sequenceNr: Long, timestamp: Lo
//#snapshot-metadata
/**
* Indicates successful saving of a snapshot.
* Notification of a snapshot saving success.
*
* @param metadata snapshot metadata.
*/
@SerialVersionUID(1L)
case class SaveSnapshotSucceeded(metadata: SnapshotMetadata)
case class SaveSnapshotSuccess(metadata: SnapshotMetadata)
/**
* Indicates failed saving of a snapshot.
* Notification of a snapshot saving success failure.
*
* @param metadata snapshot metadata.
* @param reason failure reason.
* @param cause failure cause.
*/
@SerialVersionUID(1L)
case class SaveSnapshotFailed(metadata: SnapshotMetadata, reason: Throwable)
case class SaveSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable)
/**
* Offers a [[Processor]] a previously saved `snapshot` during recovery. This offer is received
@ -45,7 +40,7 @@ case class SaveSnapshotFailed(metadata: SnapshotMetadata, reason: Throwable)
case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any)
/**
* Snapshot selection criteria for recovery.
* Selection criteria for loading snapshots.
*
* @param maxSequenceNr upper bound for a selected snapshot's sequence number. Default is no upper bound.
* @param maxTimestamp upper bound for a selected snapshot's timestamp. Default is no upper bound.
@ -86,109 +81,43 @@ object SnapshotSelectionCriteria {
def none() = None
}
// TODO: support application-defined snapshot serializers
// TODO: support application-defined snapshot access
/**
* Snapshot serialization extension.
*/
private[persistence] object SnapshotSerialization extends ExtensionId[SnapshotSerialization] with ExtensionIdProvider {
def createExtension(system: ExtendedActorSystem): SnapshotSerialization = new SnapshotSerialization(system)
def lookup() = SnapshotSerialization
}
/**
* Snapshot serialization extension.
*/
private[persistence] class SnapshotSerialization(val system: ExtendedActorSystem) extends Extension {
import akka.serialization.JavaSerializer
/**
* Java serialization based snapshot serializer.
*/
val java = new SnapshotSerializer {
def serialize(stream: OutputStream, metadata: SnapshotMetadata, state: Any) = {
val out = new ObjectOutputStream(stream)
JavaSerializer.currentSystem.withValue(system) { out.writeObject(state) }
}
def deserialize(stream: InputStream, metadata: SnapshotMetadata) = {
val in = new ClassLoaderObjectInputStream(system.dynamicAccess.classLoader, stream)
JavaSerializer.currentSystem.withValue(system) { in.readObject }
}
}
}
/**
* Stream-based snapshot serializer.
*/
private[persistence] trait SnapshotSerializer {
/**
* Serializes a `snapshot` to an output stream.
*/
def serialize(stream: OutputStream, metadata: SnapshotMetadata, snapshot: Any): Unit
/**
* Deserializes a snapshot from an input stream.
*/
def deserialize(stream: InputStream, metadata: SnapshotMetadata): Any
}
/**
* Input and output stream management for snapshot serialization.
*/
private[persistence] trait SnapshotAccess {
/**
* Provides a managed output stream for serializing a snapshot.
* Plugin API.
*
* @param metadata snapshot metadata needed to create an output stream.
* @param body called with the managed output stream as argument.
*/
def withOutputStream(metadata: SnapshotMetadata)(body: OutputStream Unit)
/**
* Provides a managed input stream for deserializing a state object.
* A selected snapshot matching [[SnapshotSelectionCriteria]].
*
* @param metadata snapshot metadata needed to create an input stream.
* @param body called with the managed input stream as argument.
* @return read snapshot.
* @param metadata snapshot metadata.
* @param snapshot snapshot.
*/
def withInputStream(metadata: SnapshotMetadata)(body: InputStream Any): Any
case class SelectedSnapshot(metadata: SnapshotMetadata, snapshot: Any)
object SelectedSnapshot {
/**
* Loads the snapshot metadata of all currently stored snapshots.
* Plugin Java API.
*/
def metadata: Set[SnapshotMetadata]
/**
* Deletes the snapshot referenced by `metadata`.
*/
def delete(metadata: SnapshotMetadata)
def create(metadata: SnapshotMetadata, snapshot: Any): SelectedSnapshot =
SelectedSnapshot(metadata, snapshot)
}
private[persistence] trait SnapshotStoreFactory {
/**
* Creates a new snapshot store actor.
* Defines messages exchanged between processors and a snapshot store.
*/
def createSnapshotStore(implicit factory: ActorRefFactory): ActorRef
}
private[persistence] object SnapshotStore {
private[persistence] object SnapshotProtocol {
/**
* Instructs a snapshot store to load a snapshot.
*
* @param processorId processor id.
* @param criteria criteria for selecting a saved snapshot from which recovery should start.
* @param criteria criteria for selecting a snapshot from which recovery should start.
* @param toSequenceNr upper sequence number bound (inclusive) for recovery.
*/
case class LoadSnapshot(processorId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long)
/**
* Reply message to a processor that a snapshot loading attempt has been completed.
* Response message to a [[LoadSnapshot]] message.
*
* @param savedSnapshot
* @param snapshot loaded snapshot, if any.
*/
case class LoadSnapshotCompleted(savedSnapshot: Option[SavedSnapshot], toSequenceNr: Long)
case class LoadSnapshotResult(snapshot: Option[SelectedSnapshot], toSequenceNr: Long)
/**
* Instructs snapshot store to save a snapshot.
@ -197,12 +126,4 @@ private[persistence] object SnapshotStore {
* @param snapshot snapshot.
*/
case class SaveSnapshot(metadata: SnapshotMetadata, snapshot: Any)
/**
* In-memory representation of a saved snapshot.
*
* @param metadata snapshot metadata.
* @param snapshot saved snapshot.
*/
case class SavedSnapshot(metadata: SnapshotMetadata, snapshot: Any)
}

View file

@ -0,0 +1,43 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.journal
import scala.concurrent.Future
import akka.persistence.PersistentImpl
/**
* Asynchronous message replay interface.
*/
trait AsyncReplay {
//#journal-plugin-api
/**
* Plugin API.
*
* Asynchronously replays persistent messages. Implementations replay a message
* by calling `replayCallback`. The returned future must be completed when all
* messages (matching the sequence number bounds) have been replayed. The future
* `Long` value must be the highest stored sequence number in the journal for the
* specified processor. The future must be completed with a failure if any of
* the persistent messages could not be replayed.
*
* The `replayCallback` must also be called with messages that have been marked
* as deleted. In this case a replayed message's `deleted` field must be set to
* `true`.
*
* The channel ids of delivery confirmations that are available for a replayed
* message must be contained in that message's `confirms` sequence.
*
* @param processorId processor id.
* @param fromSequenceNr sequence number where replay should start.
* @param toSequenceNr sequence number where replay should end (inclusive).
* @param replayCallback called to replay a single message.
*
* @see [[AsyncWriteJournal]]
* @see [[SyncWriteJournal]]
*/
def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: PersistentImpl Unit): Future[Long]
//#journal-plugin-api
}

View file

@ -0,0 +1,119 @@
/**
* Copyright (C) 2012-2013 Eligotech BV.
*/
package akka.persistence.journal
import scala.concurrent.Future
import scala.util._
import akka.actor._
import akka.pattern.{ pipe, PromiseActorRef }
import akka.persistence._
import akka.persistence.JournalProtocol._
import akka.serialization.Serialization
/**
* Abstract journal, optimized for asynchronous, non-blocking writes.
*/
trait AsyncWriteJournal extends Actor with AsyncReplay {
import AsyncWriteJournal._
import context.dispatcher
private val extension = Persistence(context.system)
private val resequencer = context.actorOf(Props[Resequencer])
private var resequencerCounter = 1L
final def receive = {
case Write(persistent, processor) {
val csdr = sender
val cctr = resequencerCounter
val psdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender
writeAsync(persistent.copy(sender = Serialization.serializedActorPath(psdr), resolved = false, confirmTarget = null, confirmMessage = null)) map {
_ Desequenced(WriteSuccess(persistent), cctr, processor, csdr)
} recover {
case e Desequenced(WriteFailure(persistent, e), cctr, processor, csdr)
} pipeTo (resequencer)
resequencerCounter += 1
}
case Replay(fromSequenceNr, toSequenceNr, processorId, processor) {
// Send replayed messages and replay result to processor directly. No need
// to resequence replayed messages relative to written and looped messages.
replayAsync(processorId, fromSequenceNr, toSequenceNr) { p
if (!p.deleted) processor.tell(Replayed(p), extension.system.provider.resolveActorRef(p.sender))
} map {
maxSnr ReplaySuccess(maxSnr)
} recover {
case e ReplayFailure(e)
} pipeTo (processor)
}
case c @ Confirm(processorId, sequenceNr, channelId) {
confirmAsync(processorId, sequenceNr, channelId) onComplete {
case Success(_) context.system.eventStream.publish(c)
case Failure(e) // TODO: publish failure to event stream
}
context.system.eventStream.publish(c)
}
case Delete(persistent: PersistentImpl) {
deleteAsync(persistent) onComplete {
case Success(_) // TODO: publish success to event stream
case Failure(e) // TODO: publish failure to event stream
}
}
case Loop(message, processor) {
resequencer ! Desequenced(LoopSuccess(message), resequencerCounter, processor, sender)
resequencerCounter += 1
}
}
//#journal-plugin-api
/**
* Plugin API.
*
* Asynchronously writes a `persistent` message to the journal.
*/
def writeAsync(persistent: PersistentImpl): Future[Unit]
/**
* Plugin API.
*
* Asynchronously marks a `persistent` message as deleted.
*/
def deleteAsync(persistent: PersistentImpl): Future[Unit]
/**
* Plugin API.
*
* Asynchronously writes a delivery confirmation to the journal.
*/
def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit]
//#journal-plugin-api
}
private[persistence] object AsyncWriteJournal {
case class Desequenced(msg: Any, snr: Long, target: ActorRef, sender: ActorRef)
class Resequencer extends Actor {
import scala.collection.mutable.Map
private val delayed = Map.empty[Long, Desequenced]
private var delivered = 0L
def receive = {
case d: Desequenced resequence(d)
}
@scala.annotation.tailrec
private def resequence(d: Desequenced) {
if (d.snr == delivered + 1) {
delivered = d.snr
d.target tell (d.msg, d.sender)
} else {
delayed += (d.snr -> d)
}
val ro = delayed.remove(delivered + 1)
if (ro.isDefined) resequence(ro.get)
}
}
}

View file

@ -0,0 +1,74 @@
/**
* Copyright (C) 2012-2013 Eligotech BV.
*/
package akka.persistence.journal
import scala.util._
import akka.actor.Actor
import akka.pattern.{ pipe, PromiseActorRef }
import akka.persistence._
import akka.serialization.Serialization
/**
* Abstract journal, optimized for synchronous writes.
*/
trait SyncWriteJournal extends Actor with AsyncReplay {
import JournalProtocol._
import context.dispatcher
private val extension = Persistence(context.system)
final def receive = {
case Write(persistent, processor) {
val sdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender
Try(write(persistent.copy(sender = Serialization.serializedActorPath(sdr), resolved = false, confirmTarget = null, confirmMessage = null))) match {
case Success(_) processor forward WriteSuccess(persistent)
case Failure(e) processor forward WriteFailure(persistent, e); throw e
}
}
case Replay(fromSequenceNr, toSequenceNr, processorId, processor) {
replayAsync(processorId, fromSequenceNr, toSequenceNr) { p
if (!p.deleted) processor.tell(Replayed(p), extension.system.provider.resolveActorRef(p.sender))
} map {
maxSnr ReplaySuccess(maxSnr)
} recover {
case e ReplayFailure(e)
} pipeTo (processor)
}
case c @ Confirm(processorId, sequenceNr, channelId) {
confirm(processorId, sequenceNr, channelId)
context.system.eventStream.publish(c) // TODO: turn off by default and allow to turn on by configuration
}
case Delete(persistent: PersistentImpl) {
delete(persistent)
}
case Loop(message, processor) {
processor forward LoopSuccess(message)
}
}
//#journal-plugin-api
/**
* Plugin API.
*
* Synchronously writes a `persistent` message to the journal.
*/
def write(persistent: PersistentImpl): Unit
/**
* Plugin API.
*
* Synchronously marks a `persistent` message as deleted.
*/
def delete(persistent: PersistentImpl): Unit
/**
* Plugin API.
*
* Synchronously writes a delivery confirmation to the journal.
*/
def confirm(processorId: String, sequenceNr: Long, channelId: String): Unit
//#journal-plugin-api
}

View file

@ -0,0 +1,90 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.journal.inmem
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
import akka.actor._
import akka.pattern.ask
import akka.persistence._
import akka.persistence.journal.AsyncWriteJournal
import akka.util._
/**
* INTERNAL API.
*
* In-memory journal for testing purposes only.
*/
private[persistence] class InmemJournal extends AsyncWriteJournal {
val store = context.actorOf(Props[InmemStore])
implicit val timeout = Timeout(5 seconds)
import InmemStore._
def writeAsync(persistent: PersistentImpl): Future[Unit] =
(store ? Write(persistent)).mapTo[Unit]
def deleteAsync(persistent: PersistentImpl): Future[Unit] =
(store ? Delete(persistent)).mapTo[Unit]
def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] =
(store ? Confirm(processorId, sequenceNr, channelId)).mapTo[Unit]
def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentImpl) Unit): Future[Long] =
(store ? Replay(processorId, fromSequenceNr, toSequenceNr, replayCallback)).mapTo[Long]
}
private[persistence] class InmemStore extends Actor {
import InmemStore._
// processor id => persistent message
var messages = Map.empty[String, Vector[PersistentImpl]]
def receive = {
case Write(p) add(p); success()
case Delete(p) update(p.processorId, p.sequenceNr)(_.copy(deleted = true)); success()
case Confirm(pid, snr, cid) update(pid, snr)(p p.copy(confirms = cid +: p.confirms)); success()
case Replay(pid, fromSnr, toSnr, callback) {
for {
ms messages.get(pid)
m ms
if m.sequenceNr >= fromSnr && m.sequenceNr <= toSnr
} callback(m)
success(maxSequenceNr(pid))
}
}
private def success(reply: Any = ()) =
sender ! reply
private def add(p: PersistentImpl) = messages = messages + (messages.get(p.processorId) match {
case Some(ms) p.processorId -> (ms :+ p)
case None p.processorId -> Vector(p)
})
private def update(pid: String, snr: Long)(f: PersistentImpl PersistentImpl) = messages = messages.get(pid) match {
case Some(ms) messages + (pid -> ms.map(sp if (sp.sequenceNr == snr) f(sp) else sp))
case None messages
}
private def maxSequenceNr(pid: String): Long = {
val snro = for {
ms messages.get(pid)
m ms.lastOption
} yield m.sequenceNr
snro.getOrElse(0L)
}
}
private[persistence] object InmemStore {
case class Write(p: PersistentImpl)
case class Delete(p: PersistentImpl)
case class Confirm(processorId: String, sequenceNr: Long, channelId: String)
case class Replay(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, replayCallback: (PersistentImpl) Unit)
}

View file

@ -0,0 +1,47 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.journal.japi
import java.lang.{ Long JLong }
import scala.concurrent.Future
import akka.actor.Actor
import akka.japi.Procedure
import akka.persistence.journal.{ AsyncReplay SAsyncReplay }
import akka.persistence.PersistentImpl
abstract class AsyncReplay extends SAsyncReplay { this: Actor
import context.dispatcher
final def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentImpl) Unit) =
doReplayAsync(processorId, fromSequenceNr, toSequenceNr, new Procedure[PersistentImpl] {
def apply(p: PersistentImpl) = replayCallback(p)
}).map(_.longValue)
/**
* Plugin Java API.
*
* Asynchronously replays persistent messages. Implementations replay a message
* by calling `replayCallback`. The returned future must be completed when all
* messages (matching the sequence number bounds) have been replayed. The future
* `Long` value must be the highest stored sequence number in the journal for the
* specified processor. The future must be completed with a failure if any of
* the persistent messages could not be replayed.
*
* The `replayCallback` must also be called with messages that have been marked
* as deleted. In this case a replayed message's `deleted` field must be set to
* `true`.
*
* The channel ids of delivery confirmations that are available for a replayed
* message must be contained in that message's `confirms` sequence.
*
* @param processorId processor id.
* @param fromSequenceNr sequence number where replay should start.
* @param toSequenceNr sequence number where replay should end (inclusive).
* @param replayCallback called to replay a single message.
*/
def doReplayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, replayCallback: Procedure[PersistentImpl]): Future[JLong]
}

View file

@ -0,0 +1,49 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.journal.japi
import scala.concurrent.Future
import akka.persistence.journal.{ AsyncWriteJournal SAsyncWriteJournal }
import akka.persistence.PersistentImpl
/**
* Java API.
*
* Abstract journal, optimized for asynchronous, non-blocking writes.
*/
abstract class AsyncWriteJournal extends AsyncReplay with SAsyncWriteJournal {
import context.dispatcher
final def writeAsync(persistent: PersistentImpl) =
doWriteAsync(persistent).map(Unit.unbox)
final def deleteAsync(persistent: PersistentImpl) =
doDeleteAsync(persistent).map(Unit.unbox)
final def confirmAsync(processorId: String, sequenceNr: Long, channelId: String) =
doConfirmAsync(processorId, sequenceNr, channelId).map(Unit.unbox)
/**
* Plugin Java API.
*
* Asynchronously writes a `persistent` message to the journal.
*/
def doWriteAsync(persistent: PersistentImpl): Future[Void]
/**
* Plugin Java API.
*
* Asynchronously marks a `persistent` message as deleted.
*/
def doDeleteAsync(persistent: PersistentImpl): Future[Void]
/**
* Plugin Java API.
*
* Asynchronously writes a delivery confirmation to the journal.
*/
def doConfirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Void]
}

View file

@ -0,0 +1,48 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.journal.japi
import akka.persistence.journal.{ SyncWriteJournal SSyncWriteJournal }
import akka.persistence.PersistentImpl
/**
* Java API.
*
* Abstract journal, optimized for synchronous writes.
*/
abstract class SyncWriteJournal extends AsyncReplay with SSyncWriteJournal {
final def write(persistent: PersistentImpl) =
doWrite(persistent)
final def delete(persistent: PersistentImpl) =
doDelete(persistent)
final def confirm(processorId: String, sequenceNr: Long, channelId: String) =
doConfirm(processorId, sequenceNr, channelId)
/**
* Plugin Java API.
*
* Synchronously writes a `persistent` message to the journal.
*/
@throws(classOf[Exception])
def doWrite(persistent: PersistentImpl): Unit
/**
* Plugin Java API.
*
* Synchronously marks a `persistent` message as deleted.
*/
@throws(classOf[Exception])
def doDelete(persistent: PersistentImpl): Unit
/**
* Plugin Java API.
*
* Synchronously writes a delivery confirmation to the journal.
*/
@throws(classOf[Exception])
def doConfirm(processorId: String, sequenceNr: Long, channelId: String): Unit
}

View file

@ -4,12 +4,12 @@
package akka.persistence.journal.leveldb
import akka.actor.Actor
import org.iq80.leveldb.DBIterator
import akka.actor.Actor
/**
* Persistent mapping of `String`-based processor and channel ids to numeric ids.
* LevelDB backed persistent mapping of `String`-based processor and channel ids to numeric ids.
*/
private[persistence] trait LeveldbIdMapping extends Actor { this: LeveldbJournal
import Key._

View file

@ -6,54 +6,25 @@ package akka.persistence.journal.leveldb
import java.io.File
import scala.util._
import org.iq80.leveldb._
import com.typesafe.config.Config
import akka.actor._
import akka.pattern.PromiseActorRef
import akka.persistence._
import akka.serialization.{ Serialization, SerializationExtension }
import akka.persistence.journal.SyncWriteJournal
import akka.serialization.SerializationExtension
/**
* LevelDB journal settings.
* INTERNAL API.
*
* LevelDB backed journal.
*/
private[persistence] class LeveldbJournalSettings(config: Config) extends JournalFactory {
/**
* Name of directory where journal files shall be stored. Can be a relative or absolute path.
*/
val journalDir: File = new File(config.getString("dir"))
/**
* Verify checksums on read.
*/
val checksum = false
/**
* Synchronous writes to disk.
*/
val fsync: Boolean = config.getBoolean("fsync")
/**
* Creates a new LevelDB journal actor from this configuration object.
*/
def createJournal(implicit factory: ActorRefFactory): ActorRef =
factory.actorOf(Props(classOf[LeveldbJournal], this).withDispatcher("akka.persistence.journal.leveldb.write.dispatcher"))
}
/**
* LevelDB journal.
*/
private[persistence] class LeveldbJournal(val settings: LeveldbJournalSettings) extends Actor with LeveldbIdMapping with LeveldbReplay {
val extension = Persistence(context.system)
private[leveldb] class LeveldbJournal extends SyncWriteJournal with LeveldbIdMapping with LeveldbReplay {
val config = context.system.settings.config.getConfig("akka.persistence.journal.leveldb")
val leveldbOptions = new Options().createIfMissing(true).compressionType(CompressionType.NONE)
val leveldbReadOptions = new ReadOptions().verifyChecksums(settings.checksum)
val leveldbWriteOptions = new WriteOptions().sync(settings.fsync)
val leveldbReadOptions = new ReadOptions().verifyChecksums(config.getBoolean("checksum"))
val leveldbWriteOptions = new WriteOptions().sync(config.getBoolean("fsync"))
val leveldbDir = new File(config.getString("dir"))
val leveldbDir = settings.journalDir
val leveldbFactory = org.iq80.leveldb.impl.Iq80DBFactory.factory
var leveldb: DB = _
@ -65,40 +36,20 @@ private[persistence] class LeveldbJournal(val settings: LeveldbJournalSettings)
// TODO: use user-defined serializer for payload
val serializer = SerializationExtension(context.system).findSerializerFor("")
import Journal._
import Key._
import context.dispatcher
def receive = {
case Write(persistent, processor) {
val persisted = withBatch { batch
val sdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender
def write(persistent: PersistentImpl) = withBatch { batch
val nid = numericId(persistent.processorId)
val prepared = persistent.copy(sender = Serialization.serializedActorPath(sdr))
batch.put(keyToBytes(counterKey(nid)), counterToBytes(prepared.sequenceNr))
batch.put(keyToBytes(Key(nid, prepared.sequenceNr, 0)), persistentToBytes(prepared.copy(resolved = false, confirmTarget = null, confirmMessage = null)))
prepared
batch.put(keyToBytes(counterKey(nid)), counterToBytes(persistent.sequenceNr))
batch.put(keyToBytes(Key(nid, persistent.sequenceNr, 0)), persistentToBytes(persistent))
}
processor.tell(Written(persisted), sender)
}
case c @ Confirm(processorId, sequenceNr, channelId) {
leveldb.put(keyToBytes(Key(numericId(processorId), sequenceNr, numericId(channelId))), channelId.getBytes("UTF-8"))
context.system.eventStream.publish(c) // TODO: turn off by default and allow to turn on by configuration
}
case Delete(persistent: PersistentImpl) {
def delete(persistent: PersistentImpl) {
leveldb.put(keyToBytes(deletionKey(numericId(persistent.processorId), persistent.sequenceNr)), Array.empty[Byte])
}
case Loop(message, processor) {
processor.tell(Looped(message), sender)
}
case Replay(fromSequenceNr, toSequenceNr, processor, processorId) {
val maxSnr = maxSequenceNr(processorId)
replayAsync(fromSequenceNr, toSequenceNr, processor, processorId) onComplete {
case Success(_) processor ! ReplayCompleted(maxSnr)
case Failure(e) // TODO: send RecoveryFailed to processor
}
}
def confirm(processorId: String, sequenceNr: Long, channelId: String) {
leveldb.put(keyToBytes(Key(numericId(processorId), sequenceNr, numericId(channelId))), channelId.getBytes("UTF-8"))
}
def leveldbSnapshot = leveldbReadOptions.snapshot(leveldb.getSnapshot)
@ -107,7 +58,7 @@ private[persistence] class LeveldbJournal(val settings: LeveldbJournalSettings)
def persistentToBytes(p: PersistentImpl): Array[Byte] = serializer.toBinary(p)
def persistentFromBytes(a: Array[Byte]): PersistentImpl = serializer.fromBinary(a).asInstanceOf[PersistentImpl]
def withBatch[R](body: WriteBatch R): R = {
private def withBatch[R](body: WriteBatch R): R = {
val batch = leveldb.createWriteBatch()
try {
val r = body(batch)
@ -118,21 +69,13 @@ private[persistence] class LeveldbJournal(val settings: LeveldbJournalSettings)
}
}
def maxSequenceNr(processorId: String) = {
leveldb.get(keyToBytes(counterKey(numericId(processorId))), leveldbSnapshot) match {
case null 0L
case bytes counterFromBytes(bytes)
}
}
override def preStart() {
leveldb = leveldbFactory.open(leveldbDir, leveldbOptions)
super.preStart()
}
override def postStop() {
super.postStop()
leveldb.close()
super.postStop()
}
}

View file

@ -6,26 +6,26 @@ package akka.persistence.journal.leveldb
import scala.concurrent.Future
import org.iq80.leveldb.DBIterator
import akka.actor._
import akka.persistence._
import akka.persistence.Journal._
import akka.persistence.journal.AsyncReplay
/**
* Asynchronous replay support.
* LevelDB backed message replay.
*/
private[persistence] trait LeveldbReplay extends Actor { this: LeveldbJournal
private[persistence] trait LeveldbReplay extends AsyncReplay { this: LeveldbJournal
import Key._
private val executionContext = context.system.dispatchers.lookup("akka.persistence.journal.leveldb.replay.dispatcher")
private val replayDispatcherId = context.system.settings.config.getString("akka.persistence.journal.leveldb.replay-dispatcher")
private val replayDispatcher = context.system.dispatchers.lookup(replayDispatcherId)
def replayAsync(fromSequenceNr: Long, toSequenceNr: Long, processor: ActorRef, processorId: String): Future[Unit] =
Future(replay(fromSequenceNr: Long, toSequenceNr, processor, numericId(processorId), leveldbIterator))(executionContext)
def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: PersistentImpl Unit): Future[Long] =
Future(replay(numericId(processorId), fromSequenceNr: Long, toSequenceNr)(replayCallback))(replayDispatcher)
private def replay(processorId: Int, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: PersistentImpl Unit): Long = {
val iter = leveldbIterator
private def replay(fromSequenceNr: Long, toSequenceNr: Long, processor: ActorRef, processorId: Int, iter: DBIterator): Unit = {
@scala.annotation.tailrec
def go(key: Key)(callback: PersistentImpl Unit) {
def go(key: Key, replayCallback: PersistentImpl Unit) {
if (iter.hasNext) {
val nextEntry = iter.next()
val nextKey = keyFromBytes(nextEntry.getKey)
@ -33,13 +33,13 @@ private[persistence] trait LeveldbReplay extends Actor { this: LeveldbJournal
// end iteration here
} else if (nextKey.channelId != 0) {
// phantom confirmation (just advance iterator)
go(nextKey)(callback)
go(nextKey, replayCallback)
} else if (key.processorId == nextKey.processorId) {
val msg = persistentFromBytes(nextEntry.getValue)
val del = deletion(nextKey)
val cnf = confirms(nextKey, Nil)
if (!del) callback(msg.copy(confirms = cnf))
go(nextKey)(callback)
replayCallback(msg.copy(confirms = cnf, deleted = del))
go(nextKey, replayCallback)
}
}
}
@ -71,9 +71,17 @@ private[persistence] trait LeveldbReplay extends Actor { this: LeveldbJournal
try {
val startKey = Key(processorId, if (fromSequenceNr < 1L) 1L else fromSequenceNr, 0)
iter.seek(keyToBytes(startKey))
go(startKey) { m processor.tell(Replayed(m), extension.system.provider.resolveActorRef(m.sender)) }
go(startKey, replayCallback)
maxSequenceNr(processorId)
} finally {
iter.close()
}
}
def maxSequenceNr(processorId: Int) = {
leveldb.get(keyToBytes(counterKey(processorId)), leveldbSnapshot) match {
case null 0L
case bytes counterFromBytes(bytes)
}
}
}

View file

@ -0,0 +1,57 @@
/**
* Copyright (C) 2012-2013 Eligotech BV.
*/
package akka.persistence.snapshot
import java.io._
import akka.actor._
import akka.persistence.SnapshotMetadata
import akka.util.ClassLoaderObjectInputStream
/**
* Snapshot serialization extension.
*/
private[persistence] object SnapshotSerialization extends ExtensionId[SnapshotSerialization] with ExtensionIdProvider {
def createExtension(system: ExtendedActorSystem): SnapshotSerialization = new SnapshotSerialization(system)
def lookup() = SnapshotSerialization
}
/**
* Snapshot serialization extension.
*/
private[persistence] class SnapshotSerialization(val system: ExtendedActorSystem) extends Extension {
import akka.serialization.JavaSerializer
/**
* Java serialization based snapshot serializer.
*/
val java = new SnapshotSerializer {
def serialize(stream: OutputStream, metadata: SnapshotMetadata, state: Any) = {
val out = new ObjectOutputStream(stream)
JavaSerializer.currentSystem.withValue(system) { out.writeObject(state) }
}
def deserialize(stream: InputStream, metadata: SnapshotMetadata) = {
val in = new ClassLoaderObjectInputStream(system.dynamicAccess.classLoader, stream)
JavaSerializer.currentSystem.withValue(system) { in.readObject }
}
}
}
/**
* Stream-based snapshot serializer.
*/
private[persistence] trait SnapshotSerializer {
/**
* Serializes a `snapshot` to an output stream.
*/
def serialize(stream: OutputStream, metadata: SnapshotMetadata, snapshot: Any): Unit
/**
* Deserializes a snapshot from an input stream.
*/
def deserialize(stream: InputStream, metadata: SnapshotMetadata): Any
}

View file

@ -0,0 +1,88 @@
/**
* Copyright (C) 2012-2013 Eligotech BV.
*/
package akka.persistence.snapshot
import scala.concurrent.Future
import scala.util._
import akka.actor._
import akka.pattern.pipe
import akka.persistence._
/**
* Abstract snapshot store.
*/
trait SnapshotStore extends Actor {
import SnapshotProtocol._
import context.dispatcher
final def receive = {
case LoadSnapshot(processorId, criteria, toSequenceNr) {
val p = sender
loadAsync(processorId, criteria.limit(toSequenceNr)) map {
sso LoadSnapshotResult(sso, toSequenceNr)
} recover {
case e LoadSnapshotResult(None, toSequenceNr)
} pipeTo (p)
}
case SaveSnapshot(metadata, snapshot) {
val p = sender
val md = metadata.copy(timestamp = System.currentTimeMillis)
saveAsync(md, snapshot) map {
_ SaveSnapshotSuccess(md)
} recover {
case e SaveSnapshotFailure(metadata, e)
} to (self, p)
}
case evt @ SaveSnapshotSuccess(metadata) {
saved(metadata)
sender ! evt // sender is processor
}
case evt @ SaveSnapshotFailure(metadata, _) {
delete(metadata)
sender ! evt // sender is processor
}
}
//#snapshot-store-plugin-api
/**
* Plugin API.
*
* Asynchronously loads a snapshot.
*
* @param processorId processor id.
* @param criteria selection criteria for loading.
*/
def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]]
/**
* Plugin API.
*
* Asynchronously saves a snapshot.
*
* @param metadata snapshot metadata.
* @param snapshot snapshot.
*/
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit]
/**
* Plugin API.
*
* Called after successful saving of a snapshot.
*
* @param metadata snapshot metadata.
*/
def saved(metadata: SnapshotMetadata)
/**
* Plugin API.
*
* Deletes the snapshot identified by `metadata`.
*
* @param metadata snapshot metadata.
*/
def delete(metadata: SnapshotMetadata)
//#snapshot-store-plugin-api
}

View file

@ -0,0 +1,67 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.snapshot.japi
import scala.concurrent.Future
import akka.japi.{ Option JOption }
import akka.persistence._
import akka.persistence.snapshot.{ SnapshotStore SSnapshotStore }
abstract class SnapshotStore extends SSnapshotStore {
import context.dispatcher
final def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria) =
doLoadAsync(processorId, criteria).map(_.asScala)
final def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] =
doSaveAsync(metadata, snapshot).map(Unit.unbox)
final def saved(metadata: SnapshotMetadata) =
onSaved(metadata)
final def delete(metadata: SnapshotMetadata) =
doDelete(metadata)
/**
* Plugin Java API.
*
* Asynchronously loads a snapshot.
*
* @param processorId processor id.
* @param criteria selection criteria for loading.
*/
def doLoadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[JOption[SelectedSnapshot]]
/**
* Plugin Java API.
*
* Asynchronously saves a snapshot.
*
* @param metadata snapshot metadata.
* @param snapshot snapshot.
*/
def doSaveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Void]
/**
* Plugin Java API.
*
* Called after successful saving of a snapshot.
*
* @param metadata snapshot metadata.
*/
@throws(classOf[Exception])
def onSaved(metadata: SnapshotMetadata): Unit
/**
* Plugin Java API.
*
* Deletes the snapshot identified by `metadata`.
*
* @param metadata snapshot metadata.
*/
@throws(classOf[Exception])
def doDelete(metadata: SnapshotMetadata): Unit
}

View file

@ -7,83 +7,58 @@ package akka.persistence.snapshot.local
import java.io._
import java.net.{ URLDecoder, URLEncoder }
import scala.collection.SortedSet
import scala.concurrent._
import scala.collection.immutable.SortedSet
import scala.concurrent.Future
import scala.util._
import com.typesafe.config.Config
import akka.actor._
import akka.actor.ActorLogging
import akka.persistence._
import akka.persistence.snapshot._
/**
* [[LocalSnapshotStore]] settings.
* INTERNAL API.
*
* Local filesystem backed snapshot store.
*/
private[persistence] class LocalSnapshotStoreSettings(config: Config) extends SnapshotStoreFactory {
/**
* Name of directory where snapshot files shall be stored.
*/
val snapshotDir: File = new File(config.getString("dir"))
private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLogging {
private val FilenamePattern = """^snapshot-(.+)-(\d+)-(\d+)""".r
/**
* Creates a new snapshot store actor.
*/
def createSnapshotStore(implicit factory: ActorRefFactory): ActorRef =
factory.actorOf(Props(classOf[LocalSnapshotStore], this))
}
/**
* Snapshot store that stores snapshots on local filesystem.
*/
private[persistence] class LocalSnapshotStore(settings: LocalSnapshotStoreSettings) extends Actor with ActorLogging {
private implicit val executionContext = context.system.dispatchers.lookup("akka.persistence.snapshot-store.local.io.dispatcher")
private val config = context.system.settings.config.getConfig("akka.persistence.snapshot-store.local")
private val streamDispatcher = context.system.dispatchers.lookup(config.getString("stream-dispatcher"))
private val snapshotDir = new File(config.getString("dir"))
// TODO: make snapshot access configurable
// TODO: make snapshot serializer configurable
private val snapshotDir = settings.snapshotDir
private val snapshotAccess = new LocalSnapshotAccess(snapshotDir)
private val snapshotSerializer = SnapshotSerialization(context.system).java
private var snapshotMetadata = Map.empty[String, SortedSet[SnapshotMetadata]]
var snapshotMetadata = Map.empty[String, SortedSet[SnapshotMetadata]]
def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] =
Future(load(processorId, criteria))(streamDispatcher)
import SnapshotStore._
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] =
Future(save(metadata, snapshot))(streamDispatcher)
def receive = {
case LoadSnapshot(processorId, criteria, toSequenceNr) {
val p = sender
loadSnapshotAsync(processorId, criteria.limit(toSequenceNr)) onComplete {
case Success(sso) p ! LoadSnapshotCompleted(sso, toSequenceNr)
case Failure(_) p ! LoadSnapshotCompleted(None, toSequenceNr)
}
}
case SaveSnapshot(metadata, snapshot) {
val p = sender
val md = metadata.copy(timestamp = System.currentTimeMillis)
saveSnapshotAsync(md, snapshot) onComplete {
case Success(_) self tell (SaveSnapshotSucceeded(md), p)
case Failure(e) self tell (SaveSnapshotFailed(metadata, e), p)
}
}
case evt @ SaveSnapshotSucceeded(metadata) {
updateMetadata(metadata)
sender ! evt // sender is processor
}
case evt @ SaveSnapshotFailed(metadata, reason) {
deleteSnapshot(metadata)
sender ! evt // sender is processor
}
def saved(metadata: SnapshotMetadata) {
snapshotMetadata = snapshotMetadata + (snapshotMetadata.get(metadata.processorId) match {
case Some(mds) metadata.processorId -> (mds + metadata)
case None metadata.processorId -> SortedSet(metadata)
})
}
def loadSnapshotAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SavedSnapshot]] =
Future(loadSnapshot(processorId, criteria))
def delete(metadata: SnapshotMetadata): Unit = {
snapshotMetadata = snapshotMetadata.get(metadata.processorId) match {
case Some(mds) snapshotMetadata + (metadata.processorId -> (mds - metadata))
case None snapshotMetadata
}
snapshotFile(metadata).delete()
}
def loadSnapshot(processorId: String, criteria: SnapshotSelectionCriteria): Option[SavedSnapshot] = {
private def load(processorId: String, criteria: SnapshotSelectionCriteria): Option[SelectedSnapshot] = {
@scala.annotation.tailrec
def load(metadata: SortedSet[SnapshotMetadata]): Option[SavedSnapshot] = metadata.lastOption match {
def load(metadata: SortedSet[SnapshotMetadata]): Option[SelectedSnapshot] = metadata.lastOption match {
case None None
case Some(md) {
Try(snapshotAccess.withInputStream(md)(snapshotSerializer.deserialize(_, md))) match {
case Success(ss) Some(SavedSnapshot(md, ss))
Try(withInputStream(md)(snapshotSerializer.deserialize(_, md))) match {
case Success(s) Some(SelectedSnapshot(md, s))
case Failure(e) {
log.error(e, s"error loading snapshot ${md}")
load(metadata.init) // try older snapshot
@ -100,56 +75,21 @@ private[persistence] class LocalSnapshotStore(settings: LocalSnapshotStoreSettin
// succeed.
//
// TODO: make number of loading attempts configurable
// TODO: improve heuristics for remote snapshot loading
for {
mds snapshotMetadata.get(processorId)
md load(mds.filter(md
md load(metadata(processorId).filter(md
md.sequenceNr <= criteria.maxSequenceNr &&
md.timestamp <= criteria.maxTimestamp).takeRight(3))
} yield md
}
def saveSnapshotAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] =
Future(saveSnapshot(metadata, snapshot))
private def save(metadata: SnapshotMetadata, snapshot: Any): Unit =
withOutputStream(metadata)(snapshotSerializer.serialize(_, metadata, snapshot))
private def saveSnapshot(metadata: SnapshotMetadata, snapshot: Any): Unit =
snapshotAccess.withOutputStream(metadata)(snapshotSerializer.serialize(_, metadata, snapshot))
def deleteSnapshot(metadata: SnapshotMetadata): Unit =
snapshotAccess.delete(metadata)
def updateMetadata(metadata: SnapshotMetadata): Unit = {
snapshotMetadata = snapshotMetadata + (snapshotMetadata.get(metadata.processorId) match {
case Some(mds) metadata.processorId -> (mds + metadata)
case None metadata.processorId -> SortedSet(metadata)
})
}
override def preStart() {
if (!snapshotDir.exists) snapshotDir.mkdirs()
snapshotMetadata = SortedSet.empty ++ snapshotAccess.metadata groupBy (_.processorId)
super.preStart()
}
}
/**
* Access to snapshot files on local filesystem.
*/
private[persistence] class LocalSnapshotAccess(snapshotDir: File) extends SnapshotAccess {
private val FilenamePattern = """^snapshot-(.+)-(\d+)-(\d+)""".r
def metadata: Set[SnapshotMetadata] = snapshotDir.listFiles.map(_.getName).collect {
case FilenamePattern(pid, snr, tms) SnapshotMetadata(URLDecoder.decode(pid, "UTF-8"), snr.toLong, tms.toLong)
}.toSet
def delete(metadata: SnapshotMetadata): Unit =
snapshotFile(metadata).delete()
def withOutputStream(metadata: SnapshotMetadata)(p: (OutputStream) Unit) =
private def withOutputStream(metadata: SnapshotMetadata)(p: (OutputStream) Unit) =
withStream(new BufferedOutputStream(new FileOutputStream(snapshotFile(metadata))), p)
def withInputStream(metadata: SnapshotMetadata)(p: (InputStream) Any) =
private def withInputStream(metadata: SnapshotMetadata)(p: (InputStream) Any) =
withStream(new BufferedInputStream(new FileInputStream(snapshotFile(metadata))), p)
private def withStream[A <: Closeable, B](stream: A, p: A B): B =
@ -157,4 +97,17 @@ private[persistence] class LocalSnapshotAccess(snapshotDir: File) extends Snapsh
private def snapshotFile(metadata: SnapshotMetadata): File =
new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.processorId, "UTF-8")}-${metadata.sequenceNr}-${metadata.timestamp}")
private def metadata(processorId: String): SortedSet[SnapshotMetadata] =
snapshotMetadata.getOrElse(processorId, SortedSet.empty)
private def metadata: Seq[SnapshotMetadata] = snapshotDir.listFiles.map(_.getName).collect {
case FilenamePattern(pid, snr, tms) SnapshotMetadata(URLDecoder.decode(pid, "UTF-8"), snr.toLong, tms.toLong)
}
override def preStart() {
if (!snapshotDir.exists) snapshotDir.mkdirs()
snapshotMetadata = SortedSet.empty ++ metadata groupBy (_.processorId)
super.preStart()
}
}

View file

@ -1,17 +1,15 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import com.typesafe.config._
import akka.actor._
import akka.testkit._
object ChannelSpec {
val config =
"""
|serialize-creators = on
|serialize-messages = on
|akka.persistence.journal.leveldb.dir = "target/journal-channel-spec"
|akka.persistence.snapshot-store.local.dir = ${akka.persistence.journal.leveldb.dir}/snapshots
""".stripMargin
class TestProcessor(name: String) extends NamedProcessor(name) {
val destination = context.actorOf(Props[TestDestination])
val channel = context.actorOf(Channel.props("channel"))
@ -42,7 +40,7 @@ object ChannelSpec {
}
}
class ChannelSpec extends AkkaSpec(ChannelSpec.config) with PersistenceSpec with ImplicitSender {
abstract class ChannelSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
import ChannelSpec._
override protected def beforeEach() {
@ -120,3 +118,6 @@ class ChannelSpec extends AkkaSpec(ChannelSpec.config) with PersistenceSpec with
}
}
}
class LeveldbChannelSpec extends ChannelSpec(PersistenceSpec.config("leveldb", "channel"))
class InmemChannelSpec extends ChannelSpec(PersistenceSpec.config("inmem", "channel"))

View file

@ -1,3 +1,7 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import java.io.File
@ -5,6 +9,8 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.reflect.ClassTag
import com.typesafe.config.ConfigFactory
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfterEach
@ -38,9 +44,22 @@ trait PersistenceSpec extends BeforeAndAfterEach { this: AkkaSpec ⇒
}
override protected def afterTermination() {
FileUtils.deleteDirectory(new File(system.settings.config.getString("akka.persistence.journal.leveldb.dir")))
List("akka.persistence.journal.leveldb.dir", "akka.persistence.snapshot-store.local.dir") foreach { s
FileUtils.deleteDirectory(new File(system.settings.config.getString(s)))
}
}
}
object PersistenceSpec {
def config(plugin: String, test: String) = ConfigFactory.parseString(
s"""
|serialize-creators = on
|serialize-messages = on
|akka.persistence.journal.plugin = "akka.persistence.journal.${plugin}"
|akka.persistence.journal.leveldb.dir = "target/journal-${test}-spec"
|akka.persistence.snapshot-store.local.dir = "target/snapshots-${test}-spec/"
""".stripMargin)
}
abstract class NamedProcessor(name: String) extends Processor {
override def processorId: String = name

View file

@ -1,17 +1,15 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import com.typesafe.config._
import akka.actor._
import akka.testkit._
object ProcessorSpec {
val config =
"""
|serialize-creators = on
|serialize-messages = on
|akka.persistence.journal.leveldb.dir = "target/journal-processor-spec"
|akka.persistence.snapshot-store.local.dir = ${akka.persistence.journal.leveldb.dir}/snapshots
""".stripMargin
class RecoverTestProcessor(name: String) extends NamedProcessor(name) {
var state = List.empty[String]
def receive = {
@ -128,7 +126,7 @@ object ProcessorSpec {
}
}
class ProcessorSpec extends AkkaSpec(ProcessorSpec.config) with PersistenceSpec with ImplicitSender {
abstract class ProcessorSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
import ProcessorSpec._
override protected def beforeEach() {
@ -296,3 +294,6 @@ class ProcessorSpec extends AkkaSpec(ProcessorSpec.config) with PersistenceSpec
}
}
}
class LeveldbProcessorSpec extends ProcessorSpec(PersistenceSpec.config("leveldb", "processor"))
class InmemProcessorSpec extends ProcessorSpec(PersistenceSpec.config("inmem", "processor"))

View file

@ -1,17 +1,15 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import com.typesafe.config._
import akka.actor._
import akka.testkit._
object ProcessorStashSpec {
val config =
"""
|serialize-creators = on
|serialize-messages = on
|akka.persistence.journal.leveldb.dir = "target/journal-processor-stash-spec"
|akka.persistence.snapshot-store.local.dir = ${akka.persistence.journal.leveldb.dir}/snapshots
""".stripMargin
class StashingProcessor(name: String) extends NamedProcessor(name) {
var state: List[String] = Nil
@ -48,7 +46,7 @@ object ProcessorStashSpec {
}
}
class ProcessorStashSpec extends AkkaSpec(ProcessorStashSpec.config) with PersistenceSpec with ImplicitSender {
abstract class ProcessorStashSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
import ProcessorStashSpec._
"A processor" must {
@ -120,3 +118,6 @@ class ProcessorStashSpec extends AkkaSpec(ProcessorStashSpec.config) with Persis
}
}
}
class LeveldbProcessorStashSpec extends ProcessorStashSpec(PersistenceSpec.config("leveldb", "processor-stash"))
class InmemProcessorStashSpec extends ProcessorStashSpec(PersistenceSpec.config("inmem", "processor-stash"))

View file

@ -8,14 +8,6 @@ import akka.actor._
import akka.testkit._
object SnapshotSpec {
val config =
"""
|serialize-creators = on
|serialize-messages = on
|akka.persistence.journal.leveldb.dir = "target/journal-snapshot-spec"
|akka.persistence.snapshot-store.local.dir = ${akka.persistence.journal.leveldb.dir}/snapshots
""".stripMargin
case object TakeSnapshot
class SaveSnapshotTestProcessor(name: String, probe: ActorRef) extends NamedProcessor(name) {
@ -23,7 +15,7 @@ object SnapshotSpec {
def receive = {
case Persistent(payload, snr) state = s"${payload}-${snr}" :: state
case TakeSnapshot saveSnapshot(state)
case SaveSnapshotSucceeded(md) probe ! md.sequenceNr
case SaveSnapshotSuccess(md) probe ! md.sequenceNr
case GetState probe ! state.reverse
}
}
@ -38,7 +30,7 @@ object SnapshotSpec {
}
}
class SnapshotSpec extends AkkaSpec(SnapshotSpec.config) with PersistenceSpec with ImplicitSender {
class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "snapshot")) with PersistenceSpec with ImplicitSender {
import SnapshotSpec._
override protected def beforeEach() {

View file

@ -18,8 +18,8 @@ object SnapshotExample extends App {
def receive = {
case Persistent(s, snr) state = state.update(s"${s}-${snr}")
case SaveSnapshotSucceeded(metadata) // ...
case SaveSnapshotFailed(metadata, reason) // ...
case SaveSnapshotSuccess(metadata) // ...
case SaveSnapshotFailure(metadata, reason) // ...
case SnapshotOffer(_, s: ExampleState) println("offered state = " + s); state = s
case "print" println("current state = " + state)
case "snap" saveSnapshot(state)