!per #3631 Snapshotting

- capture and save snapshots of processor state
- start processor recovery from saved snapshots
- snapshot storage on local filesystem
- snapshot store completely isolated from journal
- LevelDB journal modularized (and completely re-rwritten)
- In-memory journal removed
This commit is contained in:
Martin Krasser 2013-09-26 09:14:43 +02:00
parent c55189f615
commit 842ac672f7
34 changed files with 1348 additions and 477 deletions

View file

@ -19,9 +19,9 @@ Johannes Rudolph Committer
Raymond Roestenburg Committer
Piotr Gabryanczyk Committer
Helena Edelson Committer
Martin Krasser Committer
Henrik Engström Alumnus
Peter Vlugter Alumnus
Martin Krasser Alumnus
Derek Williams Alumnus
Debasish Ghosh Alumnus
Ross McDonald Alumnus

View file

@ -301,6 +301,11 @@ Martin Krasser has written an implementation of event sourcing principles on
top of Akka called `eventsourced`_, including support for guaranteed delivery
semantics as described in the previous section.
A successor of `eventsourced` is now part of Akka (see :ref:`persistence`) which
is a general solution for actor state persistence. It journals messages before
they are received by an actor and can be used to implement both event sourcing
and command sourcing.
.. _Event Sourcing: http://martinfowler.com/eaaDev/EventSourcing.html
.. _eventsourced: https://github.com/eligosource/eventsourced

View file

@ -88,7 +88,7 @@ public class PersistenceDocTest {
@Override
public void preRestart(Throwable reason, Option<Object> message) {
if (message.isDefined() && message.get() instanceof Persistent) {
delete((Persistent) message.get());
deleteMessage((Persistent) message.get());
}
super.preRestart(reason, message);
}
@ -169,4 +169,62 @@ public class PersistenceDocTest {
}
}
};
static Object o4 = new Object() {
//#save-snapshot
class MyProcessor extends UntypedProcessor {
private Object state;
@Override
public void onReceive(Object message) throws Exception {
if (message.equals("snap")) {
saveSnapshot(state);
} else if (message instanceof SaveSnapshotSucceeded) {
SnapshotMetadata metadata = ((SaveSnapshotSucceeded)message).metadata();
// ...
} else if (message instanceof SaveSnapshotFailed) {
SnapshotMetadata metadata = ((SaveSnapshotFailed)message).metadata();
// ...
}
}
}
//#save-snapshot
};
static Object o5 = new Object() {
//#snapshot-offer
class MyProcessor extends UntypedProcessor {
private Object state;
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof SnapshotOffer) {
state = ((SnapshotOffer)message).snapshot();
// ...
} else if (message instanceof Persistent) {
// ...
}
}
}
//#snapshot-offer
class MyActor extends UntypedActor {
ActorRef processor;
public MyActor() {
processor = getContext().actorOf(Props.create(MyProcessor.class));
}
public void onReceive(Object message) throws Exception {
// ...
}
private void recover() {
//#snapshot-criteria
processor.tell(Recover.create(SnapshotSelectionCriteria.create(457L, System.currentTimeMillis())), null);
//#snapshot-criteria
}
}
};
}

View file

@ -50,7 +50,12 @@ Configuration
By default, journaled messages are written to a directory named ``journal`` in the current working directory. This
can be changed by configuration where the specified path can be relative or absolute:
.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#config
.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#journal-config
The default storage location of :ref:`snapshots-java` is a directory named ``snapshots`` in the current working directory.
This can be changed by configuration where the specified path can be relative or absolute:
.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#snapshot-config
.. _processors-java:
@ -214,11 +219,40 @@ The sequence number of a ``Persistent`` message can be obtained via its ``sequen
messages are assigned sequence numbers on a per-processor basis. A sequence starts at ``1L`` and doesn't contain
gaps unless a processor marks a message as deleted.
.. _snapshots-java:
Snapshots
=========
Snapshots can dramatically reduce recovery times. Processors can save snapshots of internal state by calling the
``saveSnapshot`` method on ``Processor``. If saving of a snapshot succeeds, the processor will receive a
``SaveSnapshotSucceeded`` message, otherwise a ``SaveSnapshotFailed`` message.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#save-snapshot
During recovery, the processor is offered a previously saved snapshot via a ``SnapshotOffer`` message from
which it can initialize internal state.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#snapshot-offer
The replayed messages that follow the ``SnapshotOffer`` message, if any, are younger than the offered snapshot.
They finally recover the processor to its current (i.e. latest) state.
In general, a processor is only offered a snapshot if that processor has previously saved one or more snapshots
and at least one of these snapshots matches the ``SnapshotSelectionCriteria`` that can be specified for recovery.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#snapshot-criteria
If not specified, they default to ``SnapshotSelectionCriteria.latest()`` which selects the latest (= youngest) snapshot.
To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.none()``. A recovery where no
saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages.
Upcoming features
=================
* Snapshot based recovery
* Configurable serialization
* Reliable channels
* Journal plugin API
* Snapshot store plugin API
* Reliable channels
* Custom serialization of messages and snapshots
* Extended deletion of messages and snapshots
* ...

View file

@ -1,16 +1,20 @@
package docs.persistence
import akka.actor.ActorSystem
import akka.persistence.{ Recover, Persistent, Processor }
import akka.testkit.{ ImplicitSender, AkkaSpec }
import akka.persistence._
import akka.persistence.SaveSnapshotSucceeded
import scala.Some
trait PersistenceDocSpec {
val system: ActorSystem
val config =
"""
//#config
//#journal-config
akka.persistence.journal.leveldb.dir = "target/journal"
//#config
//#journal-config
//#snapshot-config
akka.persistence.snapshot-store.local.dir = "target/snapshots"
//#snapshot-config
"""
import system._
@ -63,7 +67,7 @@ trait PersistenceDocSpec {
//#deletion
override def preRestart(reason: Throwable, message: Option[Any]) {
message match {
case Some(p: Persistent) delete(p)
case Some(p: Persistent) deleteMessage(p)
case _
}
super.preRestart(reason, message)
@ -184,4 +188,41 @@ trait PersistenceDocSpec {
}
//#fsm-example
}
new AnyRef {
//#save-snapshot
class MyProcessor extends Processor {
var state: Any = _
def receive = {
case "snap" saveSnapshot(state)
case SaveSnapshotSucceeded(metadata) // ...
case SaveSnapshotFailed(metadata, reason) // ...
}
}
//#save-snapshot
}
new AnyRef {
//#snapshot-offer
class MyProcessor extends Processor {
var state: Any = _
def receive = {
case SnapshotOffer(metadata, offeredSnapshot) state = offeredSnapshot
case Persistent(payload, sequenceNr) // ...
}
}
//#snapshot-offer
import akka.actor.Props
val processor = system.actorOf(Props[MyProcessor])
//#snapshot-criteria
processor ! Recover(fromSnapshot = SnapshotSelectionCriteria(
maxSequenceNr = 457L,
maxTimestamp = System.currentTimeMillis))
//#snapshot-criteria
}
}

View file

@ -46,7 +46,12 @@ Configuration
By default, journaled messages are written to a directory named ``journal`` in the current working directory. This
can be changed by configuration where the specified path can be relative or absolute:
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#config
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#journal-config
The default storage location of :ref:`snapshots` is a directory named ``snapshots`` in the current working directory.
This can be changed by configuration where the specified path can be relative or absolute:
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#snapshot-config
.. _processors:
@ -221,6 +226,38 @@ method or by pattern matching
Persistent messages are assigned sequence numbers on a per-processor basis. A sequence starts at ``1L`` and
doesn't contain gaps unless a processor marks a message as deleted.
.. _snapshots:
Snapshots
=========
Snapshots can dramatically reduce recovery times. Processors can save snapshots of internal state by calling the
``saveSnapshot`` method on ``Processor``. If saving of a snapshot succeeds, the processor will receive a
``SaveSnapshotSucceeded`` message, otherwise a ``SaveSnapshotFailed`` message
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#save-snapshot
where ``metadata`` is of type ``SnapshotMetadata``:
.. includecode:: ../../../akka-persistence/src/main/scala/akka/persistence/Snapshot.scala#snapshot-metadata
During recovery, the processor is offered a previously saved snapshot via a ``SnapshotOffer`` message from
which it can initialize internal state.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#snapshot-offer
The replayed messages that follow the ``SnapshotOffer`` message, if any, are younger than the offered snapshot.
They finally recover the processor to its current (i.e. latest) state.
In general, a processor is only offered a snapshot if that processor has previously saved one or more snapshots
and at least one of these snapshots matches the ``SnapshotSelectionCriteria`` that can be specified for recovery.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#snapshot-criteria
If not specified, they default to ``SnapshotSelectionCriteria.Latest`` which selects the latest (= youngest) snapshot.
To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.None``. A recovery where no
saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages.
Miscellaneous
=============
@ -234,8 +271,9 @@ State machines can be persisted by mixing in the ``FSM`` trait into processors.
Upcoming features
=================
* Snapshot based recovery
* Configurable serialization
* Reliable channels
* Journal plugin API
* Snapshot store plugin API
* Reliable channels
* Custom serialization of messages and snapshots
* Extended deletion of messages and snapshots
* ...

View file

@ -2,16 +2,34 @@ akka {
persistence {
journal {
use = "leveldb"
inmem {
// ...
}
leveldb {
dir = "journal"
dispatcher {
executor = "thread-pool-executor"
write.dispatcher {
type = PinnedDispatcher
executor = "thread-pool-executor"
}
replay.dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 2
core-pool-size-max = 8
}
}
fsync = off
}
}
snapshot-store {
use = "local"
local {
dir = "snapshots"
io.dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 2
core-pool-size-max = 8
}
}
}
}

View file

@ -69,17 +69,17 @@ class Channel private (_channelId: Option[String]) extends Actor with Stash {
import ResolvedDelivery._
private val delivering: Actor.Receive = {
case Deliver(p: PersistentImpl, destination, resolve) {
if (!p.confirms.contains(id)) {
val msg = p.copy(channelId = id,
confirmTarget = extension.journalFor(p.processorId),
confirmMessage = Confirm(p.processorId, p.sequenceNr, id))
case Deliver(persistent: PersistentImpl, destination, resolve) {
if (!persistent.confirms.contains(id)) {
val msg = persistent.copy(channelId = id,
confirmTarget = extension.journalFor(persistent.processorId),
confirmMessage = Confirm(persistent.processorId, persistent.sequenceNr, id))
resolve match {
case Resolve.Sender if !p.resolved {
case Resolve.Sender if !persistent.resolved {
context.actorOf(Props(classOf[ResolvedSenderDelivery], msg, destination, sender)) ! DeliverResolved
context.become(buffering, false)
}
case Resolve.Destination if !p.resolved {
case Resolve.Destination if !persistent.resolved {
context.actorOf(Props(classOf[ResolvedDestinationDelivery], msg, destination, sender)) ! DeliverResolved
context.become(buffering, false)
}

View file

@ -8,7 +8,6 @@ import akka.actor._
private[persistence] trait JournalFactory {
/**
*
* Creates a new journal actor.
*/
def createJournal(implicit factory: ActorRefFactory): ActorRef
@ -59,27 +58,22 @@ private[persistence] object Journal {
case class Looped(message: Any)
/**
* Instructs a journal to replay persistent messages to `processor`, identified by
* `processorId`. Messages are replayed up to sequence number `toSequenceNr` (inclusive).
*
* @param toSequenceNr upper sequence number bound (inclusive) for replay.
* @param processor processor that receives the replayed messages.
* @param processorId processor id.
* ...
*/
case class Replay(toSequenceNr: Long, processor: ActorRef, processorId: String)
case class Replay(fromSequenceNr: Long, toSequenceNr: Long, processor: ActorRef, processorId: String)
/**
* Wrapper for a replayed `persistent` message.
* Reply message to a processor that `persistent` message has been replayed.
*
* @param persistent persistent message.
*/
case class Replayed(persistent: PersistentImpl)
/**
* Message sent to a processor after the last [[Replayed]] message.
* Reply message to a processor that all `persistent` messages have been replayed.
*
* @param maxSequenceNr the highest stored sequence number (for a processor).
*/
case class RecoveryEnd(maxSequenceNr: Long)
case class ReplayCompleted(maxSequenceNr: Long)
}

View file

@ -7,20 +7,29 @@ package akka.persistence
import com.typesafe.config.Config
import akka.actor._
import akka.persistence.journal._
import akka.persistence.journal.leveldb._
import akka.persistence.snapshot.local._
/**
* Akka persistence extension.
* Persistence extension.
*/
object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider {
class Settings(config: Config) {
val rootConfig = config.getConfig("akka.persistence.journal")
val journalName = rootConfig.getString("use")
val journalConfig = rootConfig.getConfig(journalName)
val rootConfig = config.getConfig("akka.persistence")
val journalsConfig = rootConfig.getConfig("journal")
val journalName = journalsConfig.getString("use")
val journalConfig = journalsConfig.getConfig(journalName)
val journalFactory = journalName match {
case "inmem" new InmemJournalSettings(journalConfig)
case "leveldb" new LeveldbJournalSettings(journalConfig)
}
val snapshotStoresConfig = rootConfig.getConfig("snapshot-store")
val snapshotStoreName = snapshotStoresConfig.getString("use")
val snapshotStoreConfig = snapshotStoresConfig.getConfig(snapshotStoreName)
val snapshotStoreFactory = snapshotStoreName match {
case "local" new LocalSnapshotStoreSettings(snapshotStoreConfig)
}
}
/**
@ -34,21 +43,28 @@ object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider {
}
/**
* Akka persistence extension.
* Persistence extension.
*/
class Persistence(val system: ExtendedActorSystem) extends Extension {
private val settings = new Persistence.Settings(system.settings.config)
private val journal = settings.journalFactory.createJournal(system)
// TODO: journal should have its own dispatcher
private val snapshotStore = settings.snapshotStoreFactory.createSnapshotStore(system)
/**
* Returns a journal for processor identified by `pid`.
*
* @param processorId processor id.
* Returns a snapshot store for a processor identified by `processorId`.
*/
def snapshotStoreFor(processorId: String): ActorRef = {
// Currently returns a snapshot store singleton but this methods allows for later
// optimizations where each processor can have its own snapshot store actor.
snapshotStore
}
/**
* Returns a journal for a processor identified by `processorId`.
*/
def journalFor(processorId: String): ActorRef = {
// Currently returns a journal singleton is returned but this methods allows
// for later optimisations where each processor can have its own journal actor.
// Currently returns a journal singleton but this methods allows for later
// optimizations where each processor can have its own journal actor.
journal
}

View file

@ -60,8 +60,8 @@ object Persistent {
/**
* Creates a new persistent message, derived from an implicit current message.
* When used inside a [[Processor]], this is the optional current [[Message]]
* of that processor.
* When used inside a [[Processor]], this is the optional current [[Persistent]]
* message of that processor.
*
* @param payload payload of the new persistent message.
* @param currentPersistentMessage optional current persistent message, defaults to `None`.

View file

@ -31,12 +31,12 @@ import akka.dispatch._
* state from these messages. New messages sent to a processor during recovery do not interfere with replayed
* messages, hence applications don't need to wait for a processor to complete its recovery.
*
* Automated recovery can be turned off or customized by overriding the [[preStartProcessor]] and
* [[preRestartProcessor]] life cycle hooks. If automated recovery is turned off, an application can
* explicitly recover a processor by sending it a [[Recover]] message.
* Automated recovery can be turned off or customized by overriding the [[preStart]] and [[preRestart]] life
* cycle hooks. If automated recovery is turned off, an application can explicitly recover a processor by
* sending it a [[Recover]] message.
*
* [[Persistent]] messages are assigned sequence numbers that are generated on a per-processor basis. A sequence
* starts at `1L` and doesn't contain gaps unless a processor (logically) [[delete]]s a message.
* starts at `1L` and doesn't contain gaps unless a processor (logically) deletes a message
*
* During recovery, a processor internally buffers new messages until recovery completes, so that new messages
* do not interfere with replayed messages. This internal buffer (the ''processor stash'') is isolated from the
@ -47,6 +47,7 @@ import akka.dispatch._
*/
trait Processor extends Actor with Stash {
import Journal._
import SnapshotStore._
private val extension = Persistence(context.system)
private val _processorId = extension.processorId(self)
@ -81,24 +82,37 @@ trait Processor extends Actor with Stash {
override def toString: String = "recovery pending"
def aroundReceive(receive: Actor.Receive, message: Any): Unit = message match {
case Recover(toSnr) {
case Recover(fromSnap, toSnr) {
_currentState = recoveryStarted
journal ! Replay(toSnr, self, processorId)
snapshotStore ! LoadSnapshot(processorId, fromSnap, toSnr)
}
case _ stashInternal()
}
}
/**
* Processes replayed messages. Changes to `recoverySucceeded` if all replayed
* messages have been successfully processed, otherwise, changes to `recoveryFailed`.
* In case of a failure, the exception is caught and stored for being thrown later
* in `prepareRestart`.
* Processes a loaded snapshot and replayed messages, if any. If processing of the loaded
* snapshot fails, the exception is thrown immediately. If processing of a replayed message
* fails, the exception is caught and stored for being thrown later and state is changed to
* `recoveryFailed`.
*/
private val recoveryStarted = new State {
override def toString: String = "recovery started"
def aroundReceive(receive: Actor.Receive, message: Any) = message match {
case LoadSnapshotCompleted(sso, toSnr) sso match {
case Some(ss) {
process(receive, SnapshotOffer(ss.metadata, ss.snapshot))
journal ! Replay(ss.metadata.sequenceNr + 1L, toSnr, self, processorId)
} case None {
journal ! Replay(1L, toSnr, self, processorId)
}
}
case ReplayCompleted(maxSnr) {
_currentState = recoverySucceeded
_sequenceNr = maxSnr
unstashAllInternal()
}
case Replayed(p) try { processPersistent(receive, p) } catch {
case t: Throwable {
_currentState = recoveryFailed // delay throwing exception to prepareRestart
@ -106,12 +120,7 @@ trait Processor extends Actor with Stash {
_recoveryFailureMessage = currentEnvelope
}
}
case RecoveryEnd(maxSnr) {
_currentState = recoverySucceeded
_sequenceNr = maxSnr
unstashAllInternal()
}
case Recover(_) // ignore
case r: Recover // ignore
case _ stashInternal()
}
}
@ -123,10 +132,12 @@ trait Processor extends Actor with Stash {
override def toString: String = "recovery finished"
def aroundReceive(receive: Actor.Receive, message: Any) = message match {
case Recover(_) // ignore
case r: Recover // ignore
case Replayed(p) processPersistent(receive, p) // can occur after unstash from user stash
case Written(p) processPersistent(receive, p)
case Looped(p) process(receive, p)
case s: SaveSnapshotSucceeded process(receive, s)
case f: SaveSnapshotFailed process(receive, f)
case p: PersistentImpl journal forward Write(p.copy(processorId = processorId, sequenceNr = nextSequenceNr()), self)
case m journal forward Loop(m, self)
}
@ -141,12 +152,13 @@ trait Processor extends Actor with Stash {
override def toString: String = "recovery failed"
def aroundReceive(receive: Actor.Receive, message: Any) = message match {
case RecoveryEnd(maxSnr) {
case ReplayCompleted(maxSnr) {
_currentState = prepareRestart
mailbox.enqueueFirst(self, _recoveryFailureMessage)
}
case Replayed(p) updateLastSequenceNr(p)
case _ // ignore
case r: Recover // ignore
case _ stashInternal()
}
}
@ -172,7 +184,8 @@ trait Processor extends Actor with Stash {
private var _recoveryFailureReason: Throwable = _
private var _recoveryFailureMessage: Envelope = _
private lazy val journal: ActorRef = extension.journalFor(processorId)
private lazy val journal = extension.journalFor(processorId)
private lazy val snapshotStore = extension.snapshotStoreFor(processorId)
/**
* Processor id. Defaults to this processor's path and can be overridden.
@ -211,10 +224,18 @@ trait Processor extends Actor with Stash {
* caused an exception. Processors that want to re-receive that persistent message during recovery
* should not call this method.
*/
def delete(persistent: Persistent) {
def deleteMessage(persistent: Persistent): Unit = {
journal ! Delete(persistent)
}
/**
* Saves a `snapshot` of this processor's state. If saving succeeds, this processor will receive a
* [[SaveSnapshotSucceeded]] message, otherwise a [[SaveSnapshotFailed]] message.
*/
def saveSnapshot(snapshot: Any): Unit = {
snapshotStore ! SaveSnapshot(SnapshotMetadata(processorId, lastSequenceNr), snapshot)
}
/**
* INTERNAL API.
*/
@ -268,7 +289,7 @@ trait Processor extends Actor with Stash {
*/
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
message match {
case Some(_) self ! Recover(lastSequenceNr)
case Some(_) self ! Recover(toSequenceNr = lastSequenceNr)
case None self ! Recover()
}
}
@ -348,12 +369,12 @@ trait Processor extends Actor with Stash {
* state from these messages. New messages sent to a processor during recovery do not interfere with replayed
* messages, hence applications don't need to wait for a processor to complete its recovery.
*
* Automated recovery can be turned off or customized by overriding the [[preStartProcessor]] and
* [[preRestartProcessor]] life cycle hooks. If automated recovery is turned off, an application can
* explicitly recover a processor by sending it a [[Recover]] message.
* Automated recovery can be turned off or customized by overriding the [[preStart]] and [[preRestart]] life
* cycle hooks. If automated recovery is turned off, an application can explicitly recover a processor by
* sending it a [[Recover]] message.
*
* [[Persistent]] messages are assigned sequence numbers that are generated on a per-processor basis. A sequence
* starts at `1L` and doesn't contain gaps unless a processor (logically) [[delete]]s a message.
* starts at `1L` and doesn't contain gaps unless a processor (logically) deletes a message.
*
* During recovery, a processor internally buffers new messages until recovery completes, so that new messages
* do not interfere with replayed messages. This internal buffer (the ''processor stash'') is isolated from the
@ -371,23 +392,3 @@ abstract class UntypedProcessor extends UntypedActor with Processor {
*/
def getCurrentPersistentMessage = currentPersistentMessage.getOrElse(null)
}
/**
* Recovery request for a [[Processor]].
*
* @param toSequenceNr upper sequence number bound (inclusive) for replayed messages.
*/
@SerialVersionUID(1L)
case class Recover(toSequenceNr: Long = Long.MaxValue)
object Recover {
/**
* Java API.
*/
def create() = Recover()
/**
* Java API.
*/
def create(toSequenceNr: Long) = Recover(toSequenceNr)
}

View file

@ -0,0 +1,55 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
/**
* Instructs a processor to recover itself. Recovery will start from a snapshot if the processor has
* previously saved one or more snapshots and at least one of these snapshots matches the specified
* `fromSnapshot` criteria. Otherwise, recovery will start from scratch by replaying all journaled
* messages.
*
* If recovery starts from a snapshot, the processor is offered that snapshot with a [[SnapshotOffer]]
* message, followed by replayed messages, if any, that are younger than the snapshot, up to the
* specified upper sequence number bound (`toSequenceNr`).
*
* @param fromSnapshot criteria for selecting a saved snapshot from which recovery should start. Default
* is latest (= youngest) snapshot.
* @param toSequenceNr upper sequence number bound (inclusive) for recovery. Default is no upper bound.
*/
@SerialVersionUID(1L)
case class Recover(fromSnapshot: SnapshotSelectionCriteria = SnapshotSelectionCriteria.Latest, toSequenceNr: Long = Long.MaxValue)
object Recover {
/**
* Java API.
*
* @see [[Recover]]
*/
def create() = Recover()
/**
* Java API.
*
* @see [[Recover]]
*/
def create(toSequenceNr: Long) =
Recover(toSequenceNr = toSequenceNr)
/**
* Java API.
*
* @see [[Recover]]
*/
def create(fromSnapshot: SnapshotSelectionCriteria) =
Recover(fromSnapshot = fromSnapshot)
/**
* Java API.
*
* @see [[Recover]]
*/
def create(fromSnapshot: SnapshotSelectionCriteria, toSequenceNr: Long) =
Recover(fromSnapshot, toSequenceNr)
}

View file

@ -0,0 +1,208 @@
/**
* Copyright (C) 2012-2013 Eligotech BV.
*/
package akka.persistence
import java.io._
import akka.actor._
import akka.util.ClassLoaderObjectInputStream
/**
* Snapshot metadata.
*
* @param processorId id of processor from which the snapshot was taken.
* @param sequenceNr sequence number at which the snapshot was taken.
* @param timestamp time at which the snapshot was saved.
*/
@SerialVersionUID(1L) //#snapshot-metadata
case class SnapshotMetadata(processorId: String, sequenceNr: Long, timestamp: Long = 0L)
//#snapshot-metadata
/**
* Indicates successful saving of a snapshot.
*
* @param metadata snapshot metadata.
*/
@SerialVersionUID(1L)
case class SaveSnapshotSucceeded(metadata: SnapshotMetadata)
/**
* Indicates failed saving of a snapshot.
*
* @param metadata snapshot metadata.
* @param reason failure reason.
*/
@SerialVersionUID(1L)
case class SaveSnapshotFailed(metadata: SnapshotMetadata, reason: Throwable)
/**
* Offers a [[Processor]] a previously saved `snapshot` during recovery. This offer is received
* before any further replayed messages.
*/
@SerialVersionUID(1L)
case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any)
/**
* Snapshot selection criteria for recovery.
*
* @param maxSequenceNr upper bound for a selected snapshot's sequence number. Default is no upper bound.
* @param maxTimestamp upper bound for a selected snapshot's timestamp. Default is no upper bound.
*
* @see [[Recover]]
*/
@SerialVersionUID(1L)
case class SnapshotSelectionCriteria(maxSequenceNr: Long = Long.MaxValue, maxTimestamp: Long = Long.MaxValue) {
private[persistence] def limit(toSequenceNr: Long): SnapshotSelectionCriteria =
if (toSequenceNr < maxSequenceNr) copy(maxSequenceNr = toSequenceNr) else this
}
object SnapshotSelectionCriteria {
/**
* The latest saved snapshot.
*/
val Latest = SnapshotSelectionCriteria()
/**
* No saved snapshot matches.
*/
val None = SnapshotSelectionCriteria(0L, 0L)
/**
* Java API.
*/
def create(maxSequenceNr: Long, maxTimestamp: Long) =
SnapshotSelectionCriteria(maxSequenceNr, maxTimestamp)
/**
* Java API.
*/
def latest() = Latest
/**
* Java API.
*/
def none() = None
}
// TODO: support application-defined snapshot serializers
// TODO: support application-defined snapshot access
/**
* Snapshot serialization extension.
*/
private[persistence] object SnapshotSerialization extends ExtensionId[SnapshotSerialization] with ExtensionIdProvider {
def createExtension(system: ExtendedActorSystem): SnapshotSerialization = new SnapshotSerialization(system)
def lookup() = SnapshotSerialization
}
/**
* Snapshot serialization extension.
*/
private[persistence] class SnapshotSerialization(val system: ExtendedActorSystem) extends Extension {
import akka.serialization.JavaSerializer
/**
* Java serialization based snapshot serializer.
*/
val java = new SnapshotSerializer {
def serialize(stream: OutputStream, metadata: SnapshotMetadata, state: Any) = {
val out = new ObjectOutputStream(stream)
JavaSerializer.currentSystem.withValue(system) { out.writeObject(state) }
}
def deserialize(stream: InputStream, metadata: SnapshotMetadata) = {
val in = new ClassLoaderObjectInputStream(system.dynamicAccess.classLoader, stream)
JavaSerializer.currentSystem.withValue(system) { in.readObject }
}
}
}
/**
* Stream-based snapshot serializer.
*/
private[persistence] trait SnapshotSerializer {
/**
* Serializes a `snapshot` to an output stream.
*/
def serialize(stream: OutputStream, metadata: SnapshotMetadata, snapshot: Any): Unit
/**
* Deserializes a snapshot from an input stream.
*/
def deserialize(stream: InputStream, metadata: SnapshotMetadata): Any
}
/**
* Input and output stream management for snapshot serialization.
*/
private[persistence] trait SnapshotAccess {
/**
* Provides a managed output stream for serializing a snapshot.
*
* @param metadata snapshot metadata needed to create an output stream.
* @param body called with the managed output stream as argument.
*/
def withOutputStream(metadata: SnapshotMetadata)(body: OutputStream Unit)
/**
* Provides a managed input stream for deserializing a state object.
*
* @param metadata snapshot metadata needed to create an input stream.
* @param body called with the managed input stream as argument.
* @return read snapshot.
*/
def withInputStream(metadata: SnapshotMetadata)(body: InputStream Any): Any
/**
* Loads the snapshot metadata of all currently stored snapshots.
*/
def metadata: Set[SnapshotMetadata]
/**
* Deletes the snapshot referenced by `metadata`.
*/
def delete(metadata: SnapshotMetadata)
}
private[persistence] trait SnapshotStoreFactory {
/**
* Creates a new snapshot store actor.
*/
def createSnapshotStore(implicit factory: ActorRefFactory): ActorRef
}
private[persistence] object SnapshotStore {
/**
* Instructs a snapshot store to load a snapshot.
*
* @param processorId processor id.
* @param criteria criteria for selecting a saved snapshot from which recovery should start.
* @param toSequenceNr upper sequence number bound (inclusive) for recovery.
*/
case class LoadSnapshot(processorId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long)
/**
* Reply message to a processor that a snapshot loading attempt has been completed.
*
* @param savedSnapshot
*/
case class LoadSnapshotCompleted(savedSnapshot: Option[SavedSnapshot], toSequenceNr: Long)
/**
* Instructs snapshot store to save a snapshot.
*
* @param metadata snapshot metadata.
* @param snapshot snapshot.
*/
case class SaveSnapshot(metadata: SnapshotMetadata, snapshot: Any)
/**
* In-memory representation of a saved snapshot.
*
* @param metadata snapshot metadata.
* @param snapshot saved snapshot.
*/
case class SavedSnapshot(metadata: SnapshotMetadata, snapshot: Any)
}

View file

@ -1,76 +0,0 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.journal
import com.typesafe.config.Config
import akka.actor._
import akka.pattern.PromiseActorRef
import akka.persistence._
/**
* In-memory journal configuration object.
*/
private[persistence] class InmemJournalSettings(config: Config) extends JournalFactory {
/**
* Creates a new in-memory journal actor from this configuration object.
*/
def createJournal(implicit factory: ActorRefFactory): ActorRef = factory.actorOf(Props(classOf[InmemJournal], this))
}
/**
* In-memory journal.
*/
private[persistence] class InmemJournal(settings: InmemJournalSettings) extends Actor {
// processorId => (message, sender, deleted)
private var messages = Map.empty[String, List[(PersistentImpl, ActorRef, Boolean)]]
// (processorId, sequenceNr) => confirming channels ids
private var confirms = Map.empty[(String, Long), List[String]]
import Journal._
def receive = {
case Write(pm, p) {
// must be done because PromiseActorRef instances have no uid set TODO: discuss
val ps = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender
messages = messages + (messages.get(pm.processorId) match {
case None pm.processorId -> List((pm.copy(resolved = false), ps, false))
case Some(mss) pm.processorId -> ((pm.copy(resolved = false), ps, false) :: mss)
})
p.tell(Written(pm), sender)
}
case c @ Confirm(pid, snr, cid) {
val pair = (pid, snr)
confirms = confirms + (confirms.get(pair) match {
case None pair -> List(cid)
case Some(cids) pair -> (cid :: cids)
})
// TODO: turn off by default and allow to turn on by configuration
context.system.eventStream.publish(c)
}
case Delete(pm: PersistentImpl) {
val pid = pm.processorId
val snr = pm.sequenceNr
messages = messages map {
case (`pid`, mss) pid -> (mss map {
case (msg, sdr, _) if msg.sequenceNr == snr (msg, sdr, true)
case ms ms
})
case kv kv
}
}
case Loop(m, p) {
p.tell(Looped(m), sender)
}
case Replay(toSnr, p, pid) {
val cfs = confirms.withDefaultValue(Nil)
for {
mss messages.get(pid)
(msg, sdr, del) mss.reverseIterator.filter(_._1.sequenceNr <= toSnr)
} if (!del) p.tell(Replayed(msg.copy(confirms = cfs((msg.processorId, msg.sequenceNr)))), sdr)
p.tell(RecoveryEnd(messages.get(pid).map(_.head._1.sequenceNr).getOrElse(0L)), self)
}
}
}

View file

@ -1,278 +0,0 @@
/**
* Copyright (C) 2012-2013 Eligotech BV.
*/
package akka.persistence.journal
import java.io.File
import java.nio.ByteBuffer
import com.typesafe.config.Config
import org.iq80.leveldb._
import akka.actor._
import akka.pattern.PromiseActorRef
import akka.persistence._
import akka.serialization.{ Serialization, SerializationExtension }
/**
* LevelDB journal configuration object.
*/
private[persistence] class LeveldbJournalSettings(config: Config) extends JournalFactory {
/**
* Name of directory where journal files shall be stored. Can be a relative or absolute path.
*/
val dir: String = config.getString("dir")
/**
* Currently `false`.
*/
val checksum = false
/**
* Currently `false`.
*/
val fsync = false
/**
* Creates a new LevelDB journal actor from this configuration object.
*/
def createJournal(implicit factory: ActorRefFactory): ActorRef =
factory.actorOf(Props(classOf[LeveldbJournal], this).withDispatcher("akka.persistence.journal.leveldb.dispatcher"))
}
/**
* LevelDB journal.
*/
private[persistence] class LeveldbJournal(settings: LeveldbJournalSettings) extends Actor {
// TODO: support migration of processor and channel ids
// needed if default processor and channel ids are used
// (actor paths, which contain deployment information).
val leveldbOptions = new Options().createIfMissing(true).compressionType(CompressionType.NONE)
val levelDbReadOptions = new ReadOptions().verifyChecksums(settings.checksum)
val levelDbWriteOptions = new WriteOptions().sync(settings.fsync)
val leveldbFactory = org.iq80.leveldb.impl.Iq80DBFactory.factory
var leveldb: DB = _
val numericIdOffset = 10
var pathMap: Map[String, Int] = Map.empty
// TODO: use protobuf serializer for PersistentImpl
// TODO: use user-defined serializer for payload
val serializer = SerializationExtension(context.system).findSerializerFor("")
import LeveldbJournal._
import Journal._
def receive = {
case Write(pm, p) {
val sm = withBatch { batch
// must be done because PromiseActorRef instances have no uid set TODO: discuss
val ps = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender
val sm = pm.copy(sender = Serialization.serializedActorPath(ps))
val pid = numericId(sm.processorId)
batch.put(keyToBytes(counterKey(pid)), counterToBytes(sm.sequenceNr))
batch.put(keyToBytes(Key(pid, sm.sequenceNr, 0)), msgToBytes(sm.copy(resolved = false, confirmTarget = null, confirmMessage = null)))
sm
}
p.tell(Written(sm), sender)
}
case c @ Confirm(pid, snr, cid) {
leveldb.put(keyToBytes(Key(numericId(pid), snr, numericId(cid))), cid.getBytes("UTF-8"))
// TODO: turn off by default and allow to turn on by configuration
context.system.eventStream.publish(c)
}
case Delete(pm: PersistentImpl) {
leveldb.put(keyToBytes(deletionKey(numericId(pm.processorId), pm.sequenceNr)), Array.empty[Byte])
}
case Loop(m, p) {
p.tell(Looped(m), sender)
}
case Replay(toSnr, p, pid) {
val options = levelDbReadOptions.snapshot(leveldb.getSnapshot)
val iter = leveldb.iterator(options)
val maxSnr = leveldb.get(keyToBytes(counterKey(numericId(pid))), options) match {
case null 0L
case bytes counterFromBytes(bytes)
}
context.actorOf(Props(classOf[LeveldbReplay], msgFromBytes _)) ! LeveldbReplay.Replay(toSnr, maxSnr, p, numericId(pid), iter)
}
}
private def msgToBytes(m: PersistentImpl): Array[Byte] = serializer.toBinary(m)
private def msgFromBytes(a: Array[Byte]): PersistentImpl = serializer.fromBinary(a).asInstanceOf[PersistentImpl]
// ----------------------------------------------------------
// Path mapping
// ----------------------------------------------------------
private def numericId(processorId: String): Int = pathMap.get(processorId) match {
case None writePathMapping(processorId, pathMap.size + numericIdOffset)
case Some(v) v
}
private def readPathMap(): Map[String, Int] = {
val iter = leveldb.iterator(levelDbReadOptions.snapshot(leveldb.getSnapshot))
try {
iter.seek(keyToBytes(idToKey(numericIdOffset)))
readPathMap(Map.empty, iter)
} finally {
iter.close()
}
}
private def readPathMap(pathMap: Map[String, Int], iter: DBIterator): Map[String, Int] = {
if (!iter.hasNext) pathMap else {
val nextEntry = iter.next()
val nextKey = keyFromBytes(nextEntry.getKey)
if (!isMappingKey(nextKey)) pathMap else {
val nextVal = new String(nextEntry.getValue, "UTF-8")
readPathMap(pathMap + (nextVal -> idFromKey(nextKey)), iter)
}
}
}
private def writePathMapping(path: String, numericId: Int): Int = {
pathMap = pathMap + (path -> numericId)
leveldb.put(keyToBytes(idToKey(numericId)), path.getBytes("UTF-8"))
numericId
}
// ----------------------------------------------------------
// Batch write support
// ----------------------------------------------------------
def withBatch[R](body: WriteBatch R): R = {
val batch = leveldb.createWriteBatch()
try {
val r = body(batch)
leveldb.write(batch, levelDbWriteOptions)
r
} finally {
batch.close()
}
}
// ----------------------------------------------------------
// Life cycle
// ----------------------------------------------------------
override def preStart() {
leveldb = leveldbFactory.open(new File(settings.dir), leveldbOptions)
pathMap = readPathMap()
}
override def postStop() {
leveldb.close()
}
}
private object LeveldbJournal {
case class Key(
processorId: Int,
sequenceNr: Long,
channelId: Int)
def idToKey(id: Int) = Key(1, 0L, id)
def idFromKey(key: Key) = key.channelId
def counterKey(processorId: Int): Key = Key(processorId, 0L, 0)
def isMappingKey(key: Key): Boolean = key.processorId == 1
def deletionKey(processorId: Int, sequenceNr: Long): Key = Key(processorId, sequenceNr, 1)
def isDeletionKey(key: Key): Boolean = key.channelId == 1
def counterToBytes(ctr: Long): Array[Byte] = ByteBuffer.allocate(8).putLong(ctr).array
def counterFromBytes(bytes: Array[Byte]): Long = ByteBuffer.wrap(bytes).getLong
def keyToBytes(key: Key): Array[Byte] = {
val bb = ByteBuffer.allocate(20)
bb.putInt(key.processorId)
bb.putLong(key.sequenceNr)
bb.putInt(key.channelId)
bb.array
}
def keyFromBytes(bytes: Array[Byte]): Key = {
val bb = ByteBuffer.wrap(bytes)
val aid = bb.getInt
val snr = bb.getLong
val cid = bb.getInt
new Key(aid, snr, cid)
}
}
private class LeveldbReplay(deserialize: Array[Byte] PersistentImpl) extends Actor {
val extension = Persistence(context.system)
import LeveldbReplay._
import LeveldbJournal._
import Journal.{ Replayed, RecoveryEnd }
// TODO: parent should stop replay actor if it crashes
// TODO: use a pinned dispatcher
def receive = {
case Replay(toSnr, maxSnr, processor, processorId, iter) {
try {
val startKey = Key(processorId, 1L, 0)
iter.seek(keyToBytes(startKey))
replay(iter, startKey, toSnr, m processor.tell(Replayed(m), extension.system.provider.resolveActorRef(m.sender)))
} finally { iter.close() }
processor.tell(RecoveryEnd(maxSnr), self)
context.stop(self)
}
}
@scala.annotation.tailrec
private def replay(iter: DBIterator, key: Key, toSnr: Long, callback: PersistentImpl Unit) {
if (iter.hasNext) {
val nextEntry = iter.next()
val nextKey = keyFromBytes(nextEntry.getKey)
if (nextKey.sequenceNr > toSnr) {
// end iteration here
} else if (nextKey.channelId != 0) {
// phantom confirmation (just advance iterator)
replay(iter, nextKey, toSnr, callback)
} else if (key.processorId == nextKey.processorId) {
val msg = deserialize(nextEntry.getValue)
val del = deletion(iter, nextKey)
val cnf = confirms(iter, nextKey, Nil)
if (!del) callback(msg.copy(confirms = cnf))
replay(iter, nextKey, toSnr, callback)
}
}
}
private def deletion(iter: DBIterator, key: Key): Boolean = {
if (iter.hasNext) {
val nextEntry = iter.peekNext()
val nextKey = keyFromBytes(nextEntry.getKey)
if (key.processorId == nextKey.processorId && key.sequenceNr == nextKey.sequenceNr && isDeletionKey(nextKey)) {
iter.next()
true
} else false
} else false
}
@scala.annotation.tailrec
private def confirms(iter: DBIterator, key: Key, channelIds: List[String]): List[String] = {
if (iter.hasNext) {
val nextEntry = iter.peekNext()
val nextKey = keyFromBytes(nextEntry.getKey)
if (key.processorId == nextKey.processorId && key.sequenceNr == nextKey.sequenceNr) {
val nextValue = new String(nextEntry.getValue, "UTF-8")
iter.next()
confirms(iter, nextKey, nextValue :: channelIds)
} else channelIds
} else channelIds
}
}
private object LeveldbReplay {
case class Replay(toSequenceNr: Long, maxSequenceNr: Long, processor: ActorRef, pid: Int, iterator: DBIterator)
}

View file

@ -0,0 +1,60 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.journal.leveldb
import akka.actor.Actor
import org.iq80.leveldb.DBIterator
/**
* Persistent mapping of `String`-based processor and channel ids to numeric ids.
*/
private[persistence] trait LeveldbIdMapping extends Actor { this: LeveldbJournal
import Key._
private val idOffset = 10
private var idMap: Map[String, Int] = Map.empty
/**
* Get the mapped numeric id for the specified processor or channel `id`. Creates and
* stores a new mapping if necessary.
*/
def numericId(id: String): Int = idMap.get(id) match {
case None writeIdMapping(id, idMap.size + idOffset)
case Some(v) v
}
private def readIdMap(): Map[String, Int] = {
val iter = leveldbIterator
try {
iter.seek(keyToBytes(idKey(idOffset)))
readIdMap(Map.empty, iter)
} finally {
iter.close()
}
}
private def readIdMap(pathMap: Map[String, Int], iter: DBIterator): Map[String, Int] = {
if (!iter.hasNext) pathMap else {
val nextEntry = iter.next()
val nextKey = keyFromBytes(nextEntry.getKey)
if (!isIdKey(nextKey)) pathMap else {
val nextVal = new String(nextEntry.getValue, "UTF-8")
readIdMap(pathMap + (nextVal -> id(nextKey)), iter)
}
}
}
private def writeIdMapping(id: String, numericId: Int): Int = {
idMap = idMap + (id -> numericId)
leveldb.put(keyToBytes(idKey(numericId)), id.getBytes("UTF-8"))
numericId
}
override def preStart() {
idMap = readIdMap()
super.preStart()
}
}

View file

@ -0,0 +1,138 @@
/**
* Copyright (C) 2012-2013 Eligotech BV.
*/
package akka.persistence.journal.leveldb
import java.io.File
import scala.util._
import org.iq80.leveldb._
import com.typesafe.config.Config
import akka.actor._
import akka.pattern.PromiseActorRef
import akka.persistence._
import akka.serialization.{ Serialization, SerializationExtension }
/**
* LevelDB journal settings.
*/
private[persistence] class LeveldbJournalSettings(config: Config) extends JournalFactory {
/**
* Name of directory where journal files shall be stored. Can be a relative or absolute path.
*/
val journalDir: File = new File(config.getString("dir"))
/**
* Verify checksums on read.
*/
val checksum = false
/**
* Synchronous writes to disk.
*/
val fsync: Boolean = config.getBoolean("fsync")
/**
* Creates a new LevelDB journal actor from this configuration object.
*/
def createJournal(implicit factory: ActorRefFactory): ActorRef =
factory.actorOf(Props(classOf[LeveldbJournal], this).withDispatcher("akka.persistence.journal.leveldb.write.dispatcher"))
}
/**
* LevelDB journal.
*/
private[persistence] class LeveldbJournal(val settings: LeveldbJournalSettings) extends Actor with LeveldbIdMapping with LeveldbReplay {
val extension = Persistence(context.system)
val leveldbOptions = new Options().createIfMissing(true).compressionType(CompressionType.NONE)
val leveldbReadOptions = new ReadOptions().verifyChecksums(settings.checksum)
val leveldbWriteOptions = new WriteOptions().sync(settings.fsync)
val leveldbDir = settings.journalDir
val leveldbFactory = org.iq80.leveldb.impl.Iq80DBFactory.factory
var leveldb: DB = _
// TODO: support migration of processor and channel ids
// needed if default processor and channel ids are used
// (actor paths, which contain deployment information).
// TODO: use protobuf serializer for PersistentImpl
// TODO: use user-defined serializer for payload
val serializer = SerializationExtension(context.system).findSerializerFor("")
import Journal._
import Key._
import context.dispatcher
def receive = {
case Write(persistent, processor) {
val persisted = withBatch { batch
val sdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender
val nid = numericId(persistent.processorId)
val prepared = persistent.copy(sender = Serialization.serializedActorPath(sdr))
batch.put(keyToBytes(counterKey(nid)), counterToBytes(prepared.sequenceNr))
batch.put(keyToBytes(Key(nid, prepared.sequenceNr, 0)), persistentToBytes(prepared.copy(resolved = false, confirmTarget = null, confirmMessage = null)))
prepared
}
processor.tell(Written(persisted), sender)
}
case c @ Confirm(processorId, sequenceNr, channelId) {
leveldb.put(keyToBytes(Key(numericId(processorId), sequenceNr, numericId(channelId))), channelId.getBytes("UTF-8"))
context.system.eventStream.publish(c) // TODO: turn off by default and allow to turn on by configuration
}
case Delete(persistent: PersistentImpl) {
leveldb.put(keyToBytes(deletionKey(numericId(persistent.processorId), persistent.sequenceNr)), Array.empty[Byte])
}
case Loop(message, processor) {
processor.tell(Looped(message), sender)
}
case Replay(fromSequenceNr, toSequenceNr, processor, processorId) {
val maxSnr = maxSequenceNr(processorId)
replayAsync(fromSequenceNr, toSequenceNr, processor, processorId) onComplete {
case Success(_) processor ! ReplayCompleted(maxSnr)
case Failure(e) // TODO: send RecoveryFailed to processor
}
}
}
def leveldbSnapshot = leveldbReadOptions.snapshot(leveldb.getSnapshot)
def leveldbIterator = leveldb.iterator(leveldbSnapshot)
def persistentToBytes(p: PersistentImpl): Array[Byte] = serializer.toBinary(p)
def persistentFromBytes(a: Array[Byte]): PersistentImpl = serializer.fromBinary(a).asInstanceOf[PersistentImpl]
def withBatch[R](body: WriteBatch R): R = {
val batch = leveldb.createWriteBatch()
try {
val r = body(batch)
leveldb.write(batch, leveldbWriteOptions)
r
} finally {
batch.close()
}
}
def maxSequenceNr(processorId: String) = {
leveldb.get(keyToBytes(counterKey(numericId(processorId))), leveldbSnapshot) match {
case null 0L
case bytes counterFromBytes(bytes)
}
}
override def preStart() {
leveldb = leveldbFactory.open(leveldbDir, leveldbOptions)
super.preStart()
}
override def postStop() {
super.postStop()
leveldb.close()
}
}

View file

@ -0,0 +1,45 @@
/**
* Copyright (C) 2012-2013 Eligotech BV.
*/
package akka.persistence.journal.leveldb
import java.nio.ByteBuffer
/**
* LevelDB key.
*/
private[leveldb] case class Key(
processorId: Int,
sequenceNr: Long,
channelId: Int)
private[leveldb] object Key {
def keyToBytes(key: Key): Array[Byte] = {
val bb = ByteBuffer.allocate(20)
bb.putInt(key.processorId)
bb.putLong(key.sequenceNr)
bb.putInt(key.channelId)
bb.array
}
def keyFromBytes(bytes: Array[Byte]): Key = {
val bb = ByteBuffer.wrap(bytes)
val aid = bb.getInt
val snr = bb.getLong
val cid = bb.getInt
new Key(aid, snr, cid)
}
def counterKey(processorId: Int): Key = Key(processorId, 0L, 0)
def counterToBytes(ctr: Long): Array[Byte] = ByteBuffer.allocate(8).putLong(ctr).array
def counterFromBytes(bytes: Array[Byte]): Long = ByteBuffer.wrap(bytes).getLong
def id(key: Key) = key.channelId
def idKey(id: Int) = Key(1, 0L, id)
def isIdKey(key: Key): Boolean = key.processorId == 1
def deletionKey(processorId: Int, sequenceNr: Long): Key = Key(processorId, sequenceNr, 1)
def isDeletionKey(key: Key): Boolean = key.channelId == 1
}

View file

@ -0,0 +1,79 @@
/**
* Copyright (C) 2012-2013 Eligotech BV.
*/
package akka.persistence.journal.leveldb
import scala.concurrent.Future
import org.iq80.leveldb.DBIterator
import akka.actor._
import akka.persistence._
import akka.persistence.Journal._
/**
* Asynchronous replay support.
*/
private[persistence] trait LeveldbReplay extends Actor { this: LeveldbJournal
import Key._
private val executionContext = context.system.dispatchers.lookup("akka.persistence.journal.leveldb.replay.dispatcher")
def replayAsync(fromSequenceNr: Long, toSequenceNr: Long, processor: ActorRef, processorId: String): Future[Unit] =
Future(replay(fromSequenceNr: Long, toSequenceNr, processor, numericId(processorId), leveldbIterator))(executionContext)
private def replay(fromSequenceNr: Long, toSequenceNr: Long, processor: ActorRef, processorId: Int, iter: DBIterator): Unit = {
@scala.annotation.tailrec
def go(key: Key)(callback: PersistentImpl Unit) {
if (iter.hasNext) {
val nextEntry = iter.next()
val nextKey = keyFromBytes(nextEntry.getKey)
if (nextKey.sequenceNr > toSequenceNr) {
// end iteration here
} else if (nextKey.channelId != 0) {
// phantom confirmation (just advance iterator)
go(nextKey)(callback)
} else if (key.processorId == nextKey.processorId) {
val msg = persistentFromBytes(nextEntry.getValue)
val del = deletion(nextKey)
val cnf = confirms(nextKey, Nil)
if (!del) callback(msg.copy(confirms = cnf))
go(nextKey)(callback)
}
}
}
@scala.annotation.tailrec
def confirms(key: Key, channelIds: List[String]): List[String] = {
if (iter.hasNext) {
val nextEntry = iter.peekNext()
val nextKey = keyFromBytes(nextEntry.getKey)
if (key.processorId == nextKey.processorId && key.sequenceNr == nextKey.sequenceNr) {
val nextValue = new String(nextEntry.getValue, "UTF-8")
iter.next()
confirms(nextKey, nextValue :: channelIds)
} else channelIds
} else channelIds
}
def deletion(key: Key): Boolean = {
if (iter.hasNext) {
val nextEntry = iter.peekNext()
val nextKey = keyFromBytes(nextEntry.getKey)
if (key.processorId == nextKey.processorId && key.sequenceNr == nextKey.sequenceNr && isDeletionKey(nextKey)) {
iter.next()
true
} else false
} else false
}
try {
val startKey = Key(processorId, if (fromSequenceNr < 1L) 1L else fromSequenceNr, 0)
iter.seek(keyToBytes(startKey))
go(startKey) { m processor.tell(Replayed(m), extension.system.provider.resolveActorRef(m.sender)) }
} finally {
iter.close()
}
}
}

View file

@ -0,0 +1,13 @@
/**
* Copyright (C) 2012-2013 Eligotech BV.
*/
package akka
package object persistence {
implicit val snapshotMetadataOrdering = new Ordering[SnapshotMetadata] {
def compare(x: SnapshotMetadata, y: SnapshotMetadata) =
if (x.processorId == y.processorId) math.signum(x.sequenceNr - y.sequenceNr).toInt
else x.processorId.compareTo(y.processorId)
}
}

View file

@ -0,0 +1,160 @@
/**
* Copyright (C) 2012-2013 Eligotech BV.
*/
package akka.persistence.snapshot.local
import java.io._
import java.net.{ URLDecoder, URLEncoder }
import scala.collection.SortedSet
import scala.concurrent._
import scala.util._
import com.typesafe.config.Config
import akka.actor._
import akka.persistence._
/**
* [[LocalSnapshotStore]] settings.
*/
private[persistence] class LocalSnapshotStoreSettings(config: Config) extends SnapshotStoreFactory {
/**
* Name of directory where snapshot files shall be stored.
*/
val snapshotDir: File = new File(config.getString("dir"))
/**
* Creates a new snapshot store actor.
*/
def createSnapshotStore(implicit factory: ActorRefFactory): ActorRef =
factory.actorOf(Props(classOf[LocalSnapshotStore], this))
}
/**
* Snapshot store that stores snapshots on local filesystem.
*/
private[persistence] class LocalSnapshotStore(settings: LocalSnapshotStoreSettings) extends Actor with ActorLogging {
private implicit val executionContext = context.system.dispatchers.lookup("akka.persistence.snapshot-store.local.io.dispatcher")
// TODO: make snapshot access configurable
// TODO: make snapshot serializer configurable
private val snapshotDir = settings.snapshotDir
private val snapshotAccess = new LocalSnapshotAccess(snapshotDir)
private val snapshotSerializer = SnapshotSerialization(context.system).java
var snapshotMetadata = Map.empty[String, SortedSet[SnapshotMetadata]]
import SnapshotStore._
def receive = {
case LoadSnapshot(processorId, criteria, toSequenceNr) {
val p = sender
loadSnapshotAsync(processorId, criteria.limit(toSequenceNr)) onComplete {
case Success(sso) p ! LoadSnapshotCompleted(sso, toSequenceNr)
case Failure(_) p ! LoadSnapshotCompleted(None, toSequenceNr)
}
}
case SaveSnapshot(metadata, snapshot) {
val p = sender
val md = metadata.copy(timestamp = System.currentTimeMillis)
saveSnapshotAsync(md, snapshot) onComplete {
case Success(_) self tell (SaveSnapshotSucceeded(md), p)
case Failure(e) self tell (SaveSnapshotFailed(metadata, e), p)
}
}
case evt @ SaveSnapshotSucceeded(metadata) {
updateMetadata(metadata)
sender ! evt // sender is processor
}
case evt @ SaveSnapshotFailed(metadata, reason) {
deleteSnapshot(metadata)
sender ! evt // sender is processor
}
}
def loadSnapshotAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SavedSnapshot]] =
Future(loadSnapshot(processorId, criteria))
def loadSnapshot(processorId: String, criteria: SnapshotSelectionCriteria): Option[SavedSnapshot] = {
@scala.annotation.tailrec
def load(metadata: SortedSet[SnapshotMetadata]): Option[SavedSnapshot] = metadata.lastOption match {
case None None
case Some(md) {
Try(snapshotAccess.withInputStream(md)(snapshotSerializer.deserialize(_, md))) match {
case Success(ss) Some(SavedSnapshot(md, ss))
case Failure(e) {
log.error(e, s"error loading snapshot ${md}")
load(metadata.init) // try older snapshot
}
}
}
}
// Heuristics:
//
// Select youngest 3 snapshots that match upper bound. This may help in situations
// where saving of a snapshot could not be completed because of a JVM crash. Hence,
// an attempt to load that snapshot will fail but loading an older snapshot may
// succeed.
//
// TODO: make number of loading attempts configurable
// TODO: improve heuristics for remote snapshot loading
for {
mds snapshotMetadata.get(processorId)
md load(mds.filter(md
md.sequenceNr <= criteria.maxSequenceNr &&
md.timestamp <= criteria.maxTimestamp).takeRight(3))
} yield md
}
def saveSnapshotAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] =
Future(saveSnapshot(metadata, snapshot))
private def saveSnapshot(metadata: SnapshotMetadata, snapshot: Any): Unit =
snapshotAccess.withOutputStream(metadata)(snapshotSerializer.serialize(_, metadata, snapshot))
def deleteSnapshot(metadata: SnapshotMetadata): Unit =
snapshotAccess.delete(metadata)
def updateMetadata(metadata: SnapshotMetadata): Unit = {
snapshotMetadata = snapshotMetadata + (snapshotMetadata.get(metadata.processorId) match {
case Some(mds) metadata.processorId -> (mds + metadata)
case None metadata.processorId -> SortedSet(metadata)
})
}
override def preStart() {
if (!snapshotDir.exists) snapshotDir.mkdirs()
snapshotMetadata = SortedSet.empty ++ snapshotAccess.metadata groupBy (_.processorId)
super.preStart()
}
}
/**
* Access to snapshot files on local filesystem.
*/
private[persistence] class LocalSnapshotAccess(snapshotDir: File) extends SnapshotAccess {
private val FilenamePattern = """^snapshot-(.+)-(\d+)-(\d+)""".r
def metadata: Set[SnapshotMetadata] = snapshotDir.listFiles.map(_.getName).collect {
case FilenamePattern(pid, snr, tms) SnapshotMetadata(URLDecoder.decode(pid, "UTF-8"), snr.toLong, tms.toLong)
}.toSet
def delete(metadata: SnapshotMetadata): Unit =
snapshotFile(metadata).delete()
def withOutputStream(metadata: SnapshotMetadata)(p: (OutputStream) Unit) =
withStream(new BufferedOutputStream(new FileOutputStream(snapshotFile(metadata))), p)
def withInputStream(metadata: SnapshotMetadata)(p: (InputStream) Any) =
withStream(new BufferedInputStream(new FileInputStream(snapshotFile(metadata))), p)
private def withStream[A <: Closeable, B](stream: A, p: A B): B =
try { p(stream) } finally { stream.close() }
private def snapshotFile(metadata: SnapshotMetadata): File =
new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.processorId, "UTF-8")}-${metadata.sequenceNr}-${metadata.timestamp}")
}

View file

@ -9,6 +9,7 @@ object ChannelSpec {
|serialize-creators = on
|serialize-messages = on
|akka.persistence.journal.leveldb.dir = "target/journal-channel-spec"
|akka.persistence.snapshot-store.local.dir = ${akka.persistence.journal.leveldb.dir}/snapshots
""".stripMargin
class TestProcessor(name: String) extends NamedProcessor(name) {

View file

@ -49,3 +49,5 @@ abstract class NamedProcessor(name: String) extends Processor {
trait TurnOffRecoverOnStart { this: Processor
override def preStart(): Unit = ()
}
case object GetState

View file

@ -9,10 +9,9 @@ object ProcessorSpec {
|serialize-creators = on
|serialize-messages = on
|akka.persistence.journal.leveldb.dir = "target/journal-processor-spec"
|akka.persistence.snapshot-store.local.dir = ${akka.persistence.journal.leveldb.dir}/snapshots
""".stripMargin
case object GetState
class RecoverTestProcessor(name: String) extends NamedProcessor(name) {
var state = List.empty[String]
def receive = {
@ -24,7 +23,7 @@ object ProcessorSpec {
override def preRestart(reason: Throwable, message: Option[Any]) = {
message match {
case Some(m: Persistent) delete(m) // delete message from journal
case Some(m: Persistent) deleteMessage(m) // delete message from journal
case _ // ignore
}
super.preRestart(reason, message)
@ -113,7 +112,7 @@ object ProcessorSpec {
class LastReplayedMsgFailsTestProcessor(name: String) extends RecoverTestProcessor(name) {
override def preRestart(reason: Throwable, message: Option[Any]) = {
message match {
case Some(m: Persistent) if (recoveryRunning) delete(m)
case Some(m: Persistent) if (recoveryRunning) deleteMessage(m)
case _
}
super.preRestart(reason, message)
@ -222,13 +221,13 @@ class ProcessorSpec extends AkkaSpec(ProcessorSpec.config) with PersistenceSpec
}
"support recovery with upper sequence number bound" in {
val processor = namedProcessor[RecoverOffTestProcessor]
processor ! Recover(1L)
processor ! Recover(toSequenceNr = 1L)
processor ! GetState
expectMsg(List("a-1"))
}
"never replace journaled messages" in {
val processor1 = namedProcessor[RecoverOffTestProcessor]
processor1 ! Recover(1L)
processor1 ! Recover(toSequenceNr = 1L)
processor1 ! Persistent("c")
processor1 ! GetState
expectMsg(List("a-1", "c-3"))

View file

@ -9,10 +9,9 @@ object ProcessorStashSpec {
|serialize-creators = on
|serialize-messages = on
|akka.persistence.journal.leveldb.dir = "target/journal-processor-stash-spec"
|akka.persistence.snapshot-store.local.dir = ${akka.persistence.journal.leveldb.dir}/snapshots
""".stripMargin
case object GetState
class StashingProcessor(name: String) extends NamedProcessor(name) {
var state: List[String] = Nil
@ -41,7 +40,7 @@ object ProcessorStashSpec {
class RecoveryFailureStashingProcessor(name: String) extends StashingProcessor(name) {
override def preRestart(reason: Throwable, message: Option[Any]) = {
message match {
case Some(m: Persistent) if (recoveryRunning) delete(m)
case Some(m: Persistent) if (recoveryRunning) deleteMessage(m)
case _
}
super.preRestart(reason, message)

View file

@ -0,0 +1,146 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import akka.actor._
import akka.testkit._
object SnapshotSpec {
val config =
"""
|serialize-creators = on
|serialize-messages = on
|akka.persistence.journal.leveldb.dir = "target/journal-snapshot-spec"
|akka.persistence.snapshot-store.local.dir = ${akka.persistence.journal.leveldb.dir}/snapshots
""".stripMargin
case object TakeSnapshot
class SaveSnapshotTestProcessor(name: String, probe: ActorRef) extends NamedProcessor(name) {
var state = List.empty[String]
def receive = {
case Persistent(payload, snr) state = s"${payload}-${snr}" :: state
case TakeSnapshot saveSnapshot(state)
case SaveSnapshotSucceeded(md) probe ! md.sequenceNr
case GetState probe ! state.reverse
}
}
class LoadSnapshotTestProcessor(name: String, probe: ActorRef) extends NamedProcessor(name) {
def receive = {
case Persistent(payload, snr) probe ! s"${payload}-${snr}"
case SnapshotOffer(md, s) probe ! ((md, s))
case other probe ! other
}
override def preStart() = ()
}
}
class SnapshotSpec extends AkkaSpec(SnapshotSpec.config) with PersistenceSpec with ImplicitSender {
import SnapshotSpec._
override protected def beforeEach() {
super.beforeEach()
val processor = system.actorOf(Props(classOf[SaveSnapshotTestProcessor], name, testActor))
processor ! Persistent("a")
processor ! TakeSnapshot
processor ! Persistent("b")
processor ! TakeSnapshot
processor ! Persistent("c")
processor ! Persistent("d")
processor ! TakeSnapshot
processor ! Persistent("e")
processor ! Persistent("f")
expectMsgAllOf(1L, 2L, 4L)
}
"A processor" must {
"recover state starting from the most recent snapshot" in {
val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor))
val processorId = name
processor ! Recover()
expectMsgPF() {
case (SnapshotMetadata(`processorId`, 4, timestamp), state) {
state must be(List("a-1", "b-2", "c-3", "d-4").reverse)
timestamp must be > (0L)
}
}
expectMsg("e-5")
expectMsg("f-6")
}
"recover state starting from the most recent snapshot matching an upper sequence number bound" in {
val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor))
val processorId = name
processor ! Recover(toSequenceNr = 3)
expectMsgPF() {
case (SnapshotMetadata(`processorId`, 2, timestamp), state) {
state must be(List("a-1", "b-2").reverse)
timestamp must be > (0L)
}
}
expectMsg("c-3")
}
"recover state starting from the most recent snapshot matching an upper sequence number bound (without further replay)" in {
val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor))
val processorId = name
processor ! Recover(toSequenceNr = 4)
processor ! "done"
expectMsgPF() {
case (SnapshotMetadata(`processorId`, 4, timestamp), state) {
state must be(List("a-1", "b-2", "c-3", "d-4").reverse)
timestamp must be > (0L)
}
}
expectMsg("done")
}
"recover state starting from the most recent snapshot matching criteria" in {
val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor))
val processorId = name
processor ! Recover(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2))
expectMsgPF() {
case (SnapshotMetadata(`processorId`, 2, timestamp), state) {
state must be(List("a-1", "b-2").reverse)
timestamp must be > (0L)
}
}
expectMsg("c-3")
expectMsg("d-4")
expectMsg("e-5")
expectMsg("f-6")
}
"recover state starting from the most recent snapshot matching criteria and an upper sequence number bound" in {
val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor))
val processorId = name
processor ! Recover(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2), toSequenceNr = 3)
expectMsgPF() {
case (SnapshotMetadata(`processorId`, 2, timestamp), state) {
state must be(List("a-1", "b-2").reverse)
timestamp must be > (0L)
}
}
expectMsg("c-3")
}
"recover state from scratch if snapshot based recovery is disabled" in {
val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor))
processor ! Recover(fromSnapshot = SnapshotSelectionCriteria.None, toSequenceNr = 3)
expectMsg("a-1")
expectMsg("b-2")
expectMsg("c-3")
}
}
}

View file

@ -41,7 +41,7 @@ public class ProcessorChannelExample {
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef destination = system.actorOf(Props.create(ExampleDestination.class));
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class, destination), "processor");
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class, destination), "processor-1");
processor.tell(Persistent.create("a"), null);
processor.tell(Persistent.create("b"), null);

View file

@ -34,7 +34,7 @@ public class ProcessorFailureExample {
@Override
public void preRestart(Throwable reason, Option<Object> message) {
if (message.isDefined() && message.get() instanceof Persistent) {
delete((Persistent) message.get());
deleteMessage((Persistent) message.get());
}
super.preRestart(reason, message);
}
@ -42,7 +42,7 @@ public class ProcessorFailureExample {
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor");
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-2");
processor.tell(Persistent.create("a"), null);
processor.tell("print", null);

View file

@ -0,0 +1,73 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.persistence.japi;
import java.io.Serializable;
import java.util.ArrayList;
import akka.actor.*;
import akka.persistence.*;
public class SnapshotExample {
public static class ExampleState implements Serializable {
private final ArrayList<String> received;
public ExampleState() {
this(new ArrayList<String>());
}
public ExampleState(ArrayList<String> received) {
this.received = received;
}
public ExampleState copy() {
return new ExampleState(new ArrayList<String>(received));
}
public void update(String s) {
received.add(s);
}
@Override
public String toString() {
return received.toString();
}
}
public static class ExampleProcessor extends UntypedProcessor {
private ExampleState state = new ExampleState();
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof Persistent) {
Persistent persistent = (Persistent)message;
state.update(String.format("%s-%d", persistent.payload(), persistent.sequenceNr()));
} else if (message instanceof SnapshotOffer) {
ExampleState s = (ExampleState)((SnapshotOffer)message).snapshot();
System.out.println("offered state = " + s);
state = s;
} else if (message.equals("print")) {
System.out.println("current state = " + state);
} else if (message.equals("snap")) {
saveSnapshot(state.copy());
}
}
}
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-3-java");
processor.tell(Persistent.create("a"), null);
processor.tell(Persistent.create("b"), null);
processor.tell("snap", null);
processor.tell(Persistent.create("c"), null);
processor.tell(Persistent.create("d"), null);
processor.tell("print", null);
Thread.sleep(1000);
system.shutdown();
}
}

View file

@ -1 +1,2 @@
akka.persistence.journal.leveldb.dir = "target/journal-example"
akka.persistence.journal.leveldb.dir = "target/example/journal"
akka.persistence.snapshot-store.local.dir = "target/example/snapshots"

View file

@ -34,7 +34,7 @@ object ProcessorChannelExample extends App {
}
val system = ActorSystem("example")
val processor = system.actorOf(Props(classOf[ExampleProcessor]), "processor")
val processor = system.actorOf(Props(classOf[ExampleProcessor]), "processor-1")
implicit val timeout = Timeout(3000)
import system.dispatcher

View file

@ -20,7 +20,7 @@ object ProcessorFailureExample extends App {
override def preRestart(reason: Throwable, message: Option[Any]) {
message match {
case Some(p: Persistent) if !recoveryRunning delete(p) // mark failing message as deleted
case Some(p: Persistent) if !recoveryRunning deleteMessage(p) // mark failing message as deleted
case _ // ignore
}
super.preRestart(reason, message)
@ -28,7 +28,7 @@ object ProcessorFailureExample extends App {
}
val system = ActorSystem("example")
val processor = system.actorOf(Props(classOf[ExampleProcessor]), "processor")
val processor = system.actorOf(Props(classOf[ExampleProcessor]), "processor-2")
processor ! Persistent("a")
processor ! "print"

View file

@ -0,0 +1,41 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.persistence
import akka.actor._
import akka.persistence._
object SnapshotExample extends App {
case class ExampleState(received: List[String] = Nil) {
def update(s: String) = copy(s :: received)
override def toString = received.reverse.toString
}
class ExampleProcessor extends Processor {
var state = ExampleState()
def receive = {
case Persistent(s, snr) state = state.update(s"${s}-${snr}")
case SaveSnapshotSucceeded(metadata) // ...
case SaveSnapshotFailed(metadata, reason) // ...
case SnapshotOffer(_, s: ExampleState) println("offered state = " + s); state = s
case "print" println("current state = " + state)
case "snap" saveSnapshot(state)
}
}
val system = ActorSystem("example")
val processor = system.actorOf(Props(classOf[ExampleProcessor]), "processor-3-scala")
processor ! Persistent("a")
processor ! Persistent("b")
processor ! "snap"
processor ! Persistent("c")
processor ! Persistent("d")
processor ! "print"
Thread.sleep(1000)
system.shutdown()
}