diff --git a/akka-docs/rst/dev/team.rst b/akka-docs/rst/dev/team.rst index a6ab2b7106..61341c3ced 100644 --- a/akka-docs/rst/dev/team.rst +++ b/akka-docs/rst/dev/team.rst @@ -19,10 +19,10 @@ Johannes Rudolph Committer Raymond Roestenburg Committer Piotr Gabryanczyk Committer Helena Edelson Committer +Martin Krasser Committer Henrik Engström Alumnus Peter Vlugter Alumnus -Martin Krasser Alumnus -Derek Williams Alumnus +Derek Williams Alumnus Debasish Ghosh Alumnus Ross McDonald Alumnus Eckhart Hertzler Alumnus diff --git a/akka-docs/rst/general/message-delivery-guarantees.rst b/akka-docs/rst/general/message-delivery-guarantees.rst index e398346254..bbfc70fd1d 100644 --- a/akka-docs/rst/general/message-delivery-guarantees.rst +++ b/akka-docs/rst/general/message-delivery-guarantees.rst @@ -301,6 +301,11 @@ Martin Krasser has written an implementation of event sourcing principles on top of Akka called `eventsourced`_, including support for guaranteed delivery semantics as described in the previous section. +A successor of `eventsourced` is now part of Akka (see :ref:`persistence`) which +is a general solution for actor state persistence. It journals messages before +they are received by an actor and can be used to implement both event sourcing +and command sourcing. + .. _Event Sourcing: http://martinfowler.com/eaaDev/EventSourcing.html .. _eventsourced: https://github.com/eligosource/eventsourced diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index 5adbcf0bfb..86ff753dca 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -88,7 +88,7 @@ public class PersistenceDocTest { @Override public void preRestart(Throwable reason, Option message) { if (message.isDefined() && message.get() instanceof Persistent) { - delete((Persistent) message.get()); + deleteMessage((Persistent) message.get()); } super.preRestart(reason, message); } @@ -169,4 +169,62 @@ public class PersistenceDocTest { } } }; + + static Object o4 = new Object() { + //#save-snapshot + class MyProcessor extends UntypedProcessor { + private Object state; + + @Override + public void onReceive(Object message) throws Exception { + if (message.equals("snap")) { + saveSnapshot(state); + } else if (message instanceof SaveSnapshotSucceeded) { + SnapshotMetadata metadata = ((SaveSnapshotSucceeded)message).metadata(); + // ... + } else if (message instanceof SaveSnapshotFailed) { + SnapshotMetadata metadata = ((SaveSnapshotFailed)message).metadata(); + // ... + } + } + } + //#save-snapshot + }; + + static Object o5 = new Object() { + //#snapshot-offer + class MyProcessor extends UntypedProcessor { + private Object state; + + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof SnapshotOffer) { + state = ((SnapshotOffer)message).snapshot(); + // ... + } else if (message instanceof Persistent) { + // ... + } + } + } + //#snapshot-offer + + class MyActor extends UntypedActor { + ActorRef processor; + + public MyActor() { + processor = getContext().actorOf(Props.create(MyProcessor.class)); + } + + public void onReceive(Object message) throws Exception { + // ... + } + + private void recover() { + //#snapshot-criteria + processor.tell(Recover.create(SnapshotSelectionCriteria.create(457L, System.currentTimeMillis())), null); + //#snapshot-criteria + } + } + + }; } diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index f26af4239f..930e1e4572 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -50,7 +50,12 @@ Configuration By default, journaled messages are written to a directory named ``journal`` in the current working directory. This can be changed by configuration where the specified path can be relative or absolute: -.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#config +.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#journal-config + +The default storage location of :ref:`snapshots-java` is a directory named ``snapshots`` in the current working directory. +This can be changed by configuration where the specified path can be relative or absolute: + +.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#snapshot-config .. _processors-java: @@ -214,11 +219,40 @@ The sequence number of a ``Persistent`` message can be obtained via its ``sequen messages are assigned sequence numbers on a per-processor basis. A sequence starts at ``1L`` and doesn't contain gaps unless a processor marks a message as deleted. +.. _snapshots-java: + +Snapshots +========= + +Snapshots can dramatically reduce recovery times. Processors can save snapshots of internal state by calling the +``saveSnapshot`` method on ``Processor``. If saving of a snapshot succeeds, the processor will receive a +``SaveSnapshotSucceeded`` message, otherwise a ``SaveSnapshotFailed`` message. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#save-snapshot + +During recovery, the processor is offered a previously saved snapshot via a ``SnapshotOffer`` message from +which it can initialize internal state. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#snapshot-offer + +The replayed messages that follow the ``SnapshotOffer`` message, if any, are younger than the offered snapshot. +They finally recover the processor to its current (i.e. latest) state. + +In general, a processor is only offered a snapshot if that processor has previously saved one or more snapshots +and at least one of these snapshots matches the ``SnapshotSelectionCriteria`` that can be specified for recovery. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#snapshot-criteria + +If not specified, they default to ``SnapshotSelectionCriteria.latest()`` which selects the latest (= youngest) snapshot. +To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.none()``. A recovery where no +saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages. + Upcoming features ================= -* Snapshot based recovery -* Configurable serialization -* Reliable channels * Journal plugin API +* Snapshot store plugin API +* Reliable channels +* Custom serialization of messages and snapshots +* Extended deletion of messages and snapshots * ... diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index b7f950a3f9..f891e2c17a 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -1,16 +1,20 @@ package docs.persistence import akka.actor.ActorSystem -import akka.persistence.{ Recover, Persistent, Processor } -import akka.testkit.{ ImplicitSender, AkkaSpec } +import akka.persistence._ +import akka.persistence.SaveSnapshotSucceeded +import scala.Some trait PersistenceDocSpec { val system: ActorSystem val config = """ - //#config + //#journal-config akka.persistence.journal.leveldb.dir = "target/journal" - //#config + //#journal-config + //#snapshot-config + akka.persistence.snapshot-store.local.dir = "target/snapshots" + //#snapshot-config """ import system._ @@ -63,7 +67,7 @@ trait PersistenceDocSpec { //#deletion override def preRestart(reason: Throwable, message: Option[Any]) { message match { - case Some(p: Persistent) ⇒ delete(p) + case Some(p: Persistent) ⇒ deleteMessage(p) case _ ⇒ } super.preRestart(reason, message) @@ -184,4 +188,41 @@ trait PersistenceDocSpec { } //#fsm-example } + + new AnyRef { + //#save-snapshot + class MyProcessor extends Processor { + var state: Any = _ + + def receive = { + case "snap" ⇒ saveSnapshot(state) + case SaveSnapshotSucceeded(metadata) ⇒ // ... + case SaveSnapshotFailed(metadata, reason) ⇒ // ... + } + } + //#save-snapshot + } + + new AnyRef { + //#snapshot-offer + class MyProcessor extends Processor { + var state: Any = _ + + def receive = { + case SnapshotOffer(metadata, offeredSnapshot) ⇒ state = offeredSnapshot + case Persistent(payload, sequenceNr) ⇒ // ... + } + } + //#snapshot-offer + + import akka.actor.Props + + val processor = system.actorOf(Props[MyProcessor]) + + //#snapshot-criteria + processor ! Recover(fromSnapshot = SnapshotSelectionCriteria( + maxSequenceNr = 457L, + maxTimestamp = System.currentTimeMillis)) + //#snapshot-criteria + } } diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 5392d92f1f..1850d4395d 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -46,7 +46,12 @@ Configuration By default, journaled messages are written to a directory named ``journal`` in the current working directory. This can be changed by configuration where the specified path can be relative or absolute: -.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#config +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#journal-config + +The default storage location of :ref:`snapshots` is a directory named ``snapshots`` in the current working directory. +This can be changed by configuration where the specified path can be relative or absolute: + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#snapshot-config .. _processors: @@ -221,6 +226,38 @@ method or by pattern matching Persistent messages are assigned sequence numbers on a per-processor basis. A sequence starts at ``1L`` and doesn't contain gaps unless a processor marks a message as deleted. +.. _snapshots: + +Snapshots +========= + +Snapshots can dramatically reduce recovery times. Processors can save snapshots of internal state by calling the +``saveSnapshot`` method on ``Processor``. If saving of a snapshot succeeds, the processor will receive a +``SaveSnapshotSucceeded`` message, otherwise a ``SaveSnapshotFailed`` message + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#save-snapshot + +where ``metadata`` is of type ``SnapshotMetadata``: + +.. includecode:: ../../../akka-persistence/src/main/scala/akka/persistence/Snapshot.scala#snapshot-metadata + +During recovery, the processor is offered a previously saved snapshot via a ``SnapshotOffer`` message from +which it can initialize internal state. + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#snapshot-offer + +The replayed messages that follow the ``SnapshotOffer`` message, if any, are younger than the offered snapshot. +They finally recover the processor to its current (i.e. latest) state. + +In general, a processor is only offered a snapshot if that processor has previously saved one or more snapshots +and at least one of these snapshots matches the ``SnapshotSelectionCriteria`` that can be specified for recovery. + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#snapshot-criteria + +If not specified, they default to ``SnapshotSelectionCriteria.Latest`` which selects the latest (= youngest) snapshot. +To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.None``. A recovery where no +saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages. + Miscellaneous ============= @@ -234,8 +271,9 @@ State machines can be persisted by mixing in the ``FSM`` trait into processors. Upcoming features ================= -* Snapshot based recovery -* Configurable serialization -* Reliable channels * Journal plugin API +* Snapshot store plugin API +* Reliable channels +* Custom serialization of messages and snapshots +* Extended deletion of messages and snapshots * ... diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index f1842097d1..0d81389dee 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -2,16 +2,34 @@ akka { persistence { journal { use = "leveldb" - - inmem { - // ... - } - leveldb { dir = "journal" - dispatcher { - executor = "thread-pool-executor" + write.dispatcher { type = PinnedDispatcher + executor = "thread-pool-executor" + } + replay.dispatcher { + type = Dispatcher + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = 2 + core-pool-size-max = 8 + } + } + fsync = off + } + } + snapshot-store { + use = "local" + local { + dir = "snapshots" + io.dispatcher { + type = Dispatcher + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = 2 + core-pool-size-max = 8 + } } } } diff --git a/akka-persistence/src/main/scala/akka/persistence/Channel.scala b/akka-persistence/src/main/scala/akka/persistence/Channel.scala index 982e4323ab..4d903d4272 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Channel.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Channel.scala @@ -69,17 +69,17 @@ class Channel private (_channelId: Option[String]) extends Actor with Stash { import ResolvedDelivery._ private val delivering: Actor.Receive = { - case Deliver(p: PersistentImpl, destination, resolve) ⇒ { - if (!p.confirms.contains(id)) { - val msg = p.copy(channelId = id, - confirmTarget = extension.journalFor(p.processorId), - confirmMessage = Confirm(p.processorId, p.sequenceNr, id)) + case Deliver(persistent: PersistentImpl, destination, resolve) ⇒ { + if (!persistent.confirms.contains(id)) { + val msg = persistent.copy(channelId = id, + confirmTarget = extension.journalFor(persistent.processorId), + confirmMessage = Confirm(persistent.processorId, persistent.sequenceNr, id)) resolve match { - case Resolve.Sender if !p.resolved ⇒ { + case Resolve.Sender if !persistent.resolved ⇒ { context.actorOf(Props(classOf[ResolvedSenderDelivery], msg, destination, sender)) ! DeliverResolved context.become(buffering, false) } - case Resolve.Destination if !p.resolved ⇒ { + case Resolve.Destination if !persistent.resolved ⇒ { context.actorOf(Props(classOf[ResolvedDestinationDelivery], msg, destination, sender)) ! DeliverResolved context.become(buffering, false) } diff --git a/akka-persistence/src/main/scala/akka/persistence/Journal.scala b/akka-persistence/src/main/scala/akka/persistence/Journal.scala index a430e65222..59adfe94fd 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Journal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Journal.scala @@ -8,7 +8,6 @@ import akka.actor._ private[persistence] trait JournalFactory { /** - * * Creates a new journal actor. */ def createJournal(implicit factory: ActorRefFactory): ActorRef @@ -59,27 +58,22 @@ private[persistence] object Journal { case class Looped(message: Any) /** - * Instructs a journal to replay persistent messages to `processor`, identified by - * `processorId`. Messages are replayed up to sequence number `toSequenceNr` (inclusive). - * - * @param toSequenceNr upper sequence number bound (inclusive) for replay. - * @param processor processor that receives the replayed messages. - * @param processorId processor id. + * ... */ - case class Replay(toSequenceNr: Long, processor: ActorRef, processorId: String) + case class Replay(fromSequenceNr: Long, toSequenceNr: Long, processor: ActorRef, processorId: String) /** - * Wrapper for a replayed `persistent` message. + * Reply message to a processor that `persistent` message has been replayed. * * @param persistent persistent message. */ case class Replayed(persistent: PersistentImpl) /** - * Message sent to a processor after the last [[Replayed]] message. + * Reply message to a processor that all `persistent` messages have been replayed. * * @param maxSequenceNr the highest stored sequence number (for a processor). */ - case class RecoveryEnd(maxSequenceNr: Long) + case class ReplayCompleted(maxSequenceNr: Long) } diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index 19632689b4..9710f2343d 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -7,20 +7,29 @@ package akka.persistence import com.typesafe.config.Config import akka.actor._ -import akka.persistence.journal._ +import akka.persistence.journal.leveldb._ +import akka.persistence.snapshot.local._ /** - * Akka persistence extension. + * Persistence extension. */ object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider { class Settings(config: Config) { - val rootConfig = config.getConfig("akka.persistence.journal") - val journalName = rootConfig.getString("use") - val journalConfig = rootConfig.getConfig(journalName) + val rootConfig = config.getConfig("akka.persistence") + + val journalsConfig = rootConfig.getConfig("journal") + val journalName = journalsConfig.getString("use") + val journalConfig = journalsConfig.getConfig(journalName) val journalFactory = journalName match { - case "inmem" ⇒ new InmemJournalSettings(journalConfig) case "leveldb" ⇒ new LeveldbJournalSettings(journalConfig) } + + val snapshotStoresConfig = rootConfig.getConfig("snapshot-store") + val snapshotStoreName = snapshotStoresConfig.getString("use") + val snapshotStoreConfig = snapshotStoresConfig.getConfig(snapshotStoreName) + val snapshotStoreFactory = snapshotStoreName match { + case "local" ⇒ new LocalSnapshotStoreSettings(snapshotStoreConfig) + } } /** @@ -34,21 +43,28 @@ object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider { } /** - * Akka persistence extension. + * Persistence extension. */ class Persistence(val system: ExtendedActorSystem) extends Extension { private val settings = new Persistence.Settings(system.settings.config) private val journal = settings.journalFactory.createJournal(system) - // TODO: journal should have its own dispatcher + private val snapshotStore = settings.snapshotStoreFactory.createSnapshotStore(system) /** - * Returns a journal for processor identified by `pid`. - * - * @param processorId processor id. + * Returns a snapshot store for a processor identified by `processorId`. + */ + def snapshotStoreFor(processorId: String): ActorRef = { + // Currently returns a snapshot store singleton but this methods allows for later + // optimizations where each processor can have its own snapshot store actor. + snapshotStore + } + + /** + * Returns a journal for a processor identified by `processorId`. */ def journalFor(processorId: String): ActorRef = { - // Currently returns a journal singleton is returned but this methods allows - // for later optimisations where each processor can have its own journal actor. + // Currently returns a journal singleton but this methods allows for later + // optimizations where each processor can have its own journal actor. journal } diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index 08e50ee527..7ef3fafb56 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -60,8 +60,8 @@ object Persistent { /** * Creates a new persistent message, derived from an implicit current message. - * When used inside a [[Processor]], this is the optional current [[Message]] - * of that processor. + * When used inside a [[Processor]], this is the optional current [[Persistent]] + * message of that processor. * * @param payload payload of the new persistent message. * @param currentPersistentMessage optional current persistent message, defaults to `None`. diff --git a/akka-persistence/src/main/scala/akka/persistence/Processor.scala b/akka-persistence/src/main/scala/akka/persistence/Processor.scala index b55c01d78c..567b791deb 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Processor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Processor.scala @@ -31,12 +31,12 @@ import akka.dispatch._ * state from these messages. New messages sent to a processor during recovery do not interfere with replayed * messages, hence applications don't need to wait for a processor to complete its recovery. * - * Automated recovery can be turned off or customized by overriding the [[preStartProcessor]] and - * [[preRestartProcessor]] life cycle hooks. If automated recovery is turned off, an application can - * explicitly recover a processor by sending it a [[Recover]] message. + * Automated recovery can be turned off or customized by overriding the [[preStart]] and [[preRestart]] life + * cycle hooks. If automated recovery is turned off, an application can explicitly recover a processor by + * sending it a [[Recover]] message. * * [[Persistent]] messages are assigned sequence numbers that are generated on a per-processor basis. A sequence - * starts at `1L` and doesn't contain gaps unless a processor (logically) [[delete]]s a message. + * starts at `1L` and doesn't contain gaps unless a processor (logically) deletes a message * * During recovery, a processor internally buffers new messages until recovery completes, so that new messages * do not interfere with replayed messages. This internal buffer (the ''processor stash'') is isolated from the @@ -47,6 +47,7 @@ import akka.dispatch._ */ trait Processor extends Actor with Stash { import Journal._ + import SnapshotStore._ private val extension = Persistence(context.system) private val _processorId = extension.processorId(self) @@ -81,24 +82,37 @@ trait Processor extends Actor with Stash { override def toString: String = "recovery pending" def aroundReceive(receive: Actor.Receive, message: Any): Unit = message match { - case Recover(toSnr) ⇒ { + case Recover(fromSnap, toSnr) ⇒ { _currentState = recoveryStarted - journal ! Replay(toSnr, self, processorId) + snapshotStore ! LoadSnapshot(processorId, fromSnap, toSnr) } case _ ⇒ stashInternal() } } /** - * Processes replayed messages. Changes to `recoverySucceeded` if all replayed - * messages have been successfully processed, otherwise, changes to `recoveryFailed`. - * In case of a failure, the exception is caught and stored for being thrown later - * in `prepareRestart`. + * Processes a loaded snapshot and replayed messages, if any. If processing of the loaded + * snapshot fails, the exception is thrown immediately. If processing of a replayed message + * fails, the exception is caught and stored for being thrown later and state is changed to + * `recoveryFailed`. */ private val recoveryStarted = new State { override def toString: String = "recovery started" def aroundReceive(receive: Actor.Receive, message: Any) = message match { + case LoadSnapshotCompleted(sso, toSnr) ⇒ sso match { + case Some(ss) ⇒ { + process(receive, SnapshotOffer(ss.metadata, ss.snapshot)) + journal ! Replay(ss.metadata.sequenceNr + 1L, toSnr, self, processorId) + } case None ⇒ { + journal ! Replay(1L, toSnr, self, processorId) + } + } + case ReplayCompleted(maxSnr) ⇒ { + _currentState = recoverySucceeded + _sequenceNr = maxSnr + unstashAllInternal() + } case Replayed(p) ⇒ try { processPersistent(receive, p) } catch { case t: Throwable ⇒ { _currentState = recoveryFailed // delay throwing exception to prepareRestart @@ -106,12 +120,7 @@ trait Processor extends Actor with Stash { _recoveryFailureMessage = currentEnvelope } } - case RecoveryEnd(maxSnr) ⇒ { - _currentState = recoverySucceeded - _sequenceNr = maxSnr - unstashAllInternal() - } - case Recover(_) ⇒ // ignore + case r: Recover ⇒ // ignore case _ ⇒ stashInternal() } } @@ -123,12 +132,14 @@ trait Processor extends Actor with Stash { override def toString: String = "recovery finished" def aroundReceive(receive: Actor.Receive, message: Any) = message match { - case Recover(_) ⇒ // ignore - case Replayed(p) ⇒ processPersistent(receive, p) // can occur after unstash from user stash - case Written(p) ⇒ processPersistent(receive, p) - case Looped(p) ⇒ process(receive, p) - case p: PersistentImpl ⇒ journal forward Write(p.copy(processorId = processorId, sequenceNr = nextSequenceNr()), self) - case m ⇒ journal forward Loop(m, self) + case r: Recover ⇒ // ignore + case Replayed(p) ⇒ processPersistent(receive, p) // can occur after unstash from user stash + case Written(p) ⇒ processPersistent(receive, p) + case Looped(p) ⇒ process(receive, p) + case s: SaveSnapshotSucceeded ⇒ process(receive, s) + case f: SaveSnapshotFailed ⇒ process(receive, f) + case p: PersistentImpl ⇒ journal forward Write(p.copy(processorId = processorId, sequenceNr = nextSequenceNr()), self) + case m ⇒ journal forward Loop(m, self) } } @@ -141,12 +152,13 @@ trait Processor extends Actor with Stash { override def toString: String = "recovery failed" def aroundReceive(receive: Actor.Receive, message: Any) = message match { - case RecoveryEnd(maxSnr) ⇒ { + case ReplayCompleted(maxSnr) ⇒ { _currentState = prepareRestart mailbox.enqueueFirst(self, _recoveryFailureMessage) } case Replayed(p) ⇒ updateLastSequenceNr(p) - case _ ⇒ // ignore + case r: Recover ⇒ // ignore + case _ ⇒ stashInternal() } } @@ -172,7 +184,8 @@ trait Processor extends Actor with Stash { private var _recoveryFailureReason: Throwable = _ private var _recoveryFailureMessage: Envelope = _ - private lazy val journal: ActorRef = extension.journalFor(processorId) + private lazy val journal = extension.journalFor(processorId) + private lazy val snapshotStore = extension.snapshotStoreFor(processorId) /** * Processor id. Defaults to this processor's path and can be overridden. @@ -211,10 +224,18 @@ trait Processor extends Actor with Stash { * caused an exception. Processors that want to re-receive that persistent message during recovery * should not call this method. */ - def delete(persistent: Persistent) { + def deleteMessage(persistent: Persistent): Unit = { journal ! Delete(persistent) } + /** + * Saves a `snapshot` of this processor's state. If saving succeeds, this processor will receive a + * [[SaveSnapshotSucceeded]] message, otherwise a [[SaveSnapshotFailed]] message. + */ + def saveSnapshot(snapshot: Any): Unit = { + snapshotStore ! SaveSnapshot(SnapshotMetadata(processorId, lastSequenceNr), snapshot) + } + /** * INTERNAL API. */ @@ -268,7 +289,7 @@ trait Processor extends Actor with Stash { */ override def preRestart(reason: Throwable, message: Option[Any]): Unit = { message match { - case Some(_) ⇒ self ! Recover(lastSequenceNr) + case Some(_) ⇒ self ! Recover(toSequenceNr = lastSequenceNr) case None ⇒ self ! Recover() } } @@ -348,12 +369,12 @@ trait Processor extends Actor with Stash { * state from these messages. New messages sent to a processor during recovery do not interfere with replayed * messages, hence applications don't need to wait for a processor to complete its recovery. * - * Automated recovery can be turned off or customized by overriding the [[preStartProcessor]] and - * [[preRestartProcessor]] life cycle hooks. If automated recovery is turned off, an application can - * explicitly recover a processor by sending it a [[Recover]] message. + * Automated recovery can be turned off or customized by overriding the [[preStart]] and [[preRestart]] life + * cycle hooks. If automated recovery is turned off, an application can explicitly recover a processor by + * sending it a [[Recover]] message. * * [[Persistent]] messages are assigned sequence numbers that are generated on a per-processor basis. A sequence - * starts at `1L` and doesn't contain gaps unless a processor (logically) [[delete]]s a message. + * starts at `1L` and doesn't contain gaps unless a processor (logically) deletes a message. * * During recovery, a processor internally buffers new messages until recovery completes, so that new messages * do not interfere with replayed messages. This internal buffer (the ''processor stash'') is isolated from the @@ -371,23 +392,3 @@ abstract class UntypedProcessor extends UntypedActor with Processor { */ def getCurrentPersistentMessage = currentPersistentMessage.getOrElse(null) } - -/** - * Recovery request for a [[Processor]]. - * - * @param toSequenceNr upper sequence number bound (inclusive) for replayed messages. - */ -@SerialVersionUID(1L) -case class Recover(toSequenceNr: Long = Long.MaxValue) - -object Recover { - /** - * Java API. - */ - def create() = Recover() - - /** - * Java API. - */ - def create(toSequenceNr: Long) = Recover(toSequenceNr) -} diff --git a/akka-persistence/src/main/scala/akka/persistence/Recover.scala b/akka-persistence/src/main/scala/akka/persistence/Recover.scala new file mode 100644 index 0000000000..08b88878d1 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/Recover.scala @@ -0,0 +1,55 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.persistence + +/** + * Instructs a processor to recover itself. Recovery will start from a snapshot if the processor has + * previously saved one or more snapshots and at least one of these snapshots matches the specified + * `fromSnapshot` criteria. Otherwise, recovery will start from scratch by replaying all journaled + * messages. + * + * If recovery starts from a snapshot, the processor is offered that snapshot with a [[SnapshotOffer]] + * message, followed by replayed messages, if any, that are younger than the snapshot, up to the + * specified upper sequence number bound (`toSequenceNr`). + * + * @param fromSnapshot criteria for selecting a saved snapshot from which recovery should start. Default + * is latest (= youngest) snapshot. + * @param toSequenceNr upper sequence number bound (inclusive) for recovery. Default is no upper bound. + */ +@SerialVersionUID(1L) +case class Recover(fromSnapshot: SnapshotSelectionCriteria = SnapshotSelectionCriteria.Latest, toSequenceNr: Long = Long.MaxValue) + +object Recover { + /** + * Java API. + * + * @see [[Recover]] + */ + def create() = Recover() + + /** + * Java API. + * + * @see [[Recover]] + */ + def create(toSequenceNr: Long) = + Recover(toSequenceNr = toSequenceNr) + + /** + * Java API. + * + * @see [[Recover]] + */ + def create(fromSnapshot: SnapshotSelectionCriteria) = + Recover(fromSnapshot = fromSnapshot) + + /** + * Java API. + * + * @see [[Recover]] + */ + def create(fromSnapshot: SnapshotSelectionCriteria, toSequenceNr: Long) = + Recover(fromSnapshot, toSequenceNr) +} diff --git a/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala b/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala new file mode 100644 index 0000000000..8dbfb14831 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala @@ -0,0 +1,208 @@ +/** + * Copyright (C) 2012-2013 Eligotech BV. + */ + +package akka.persistence + +import java.io._ + +import akka.actor._ +import akka.util.ClassLoaderObjectInputStream + +/** + * Snapshot metadata. + * + * @param processorId id of processor from which the snapshot was taken. + * @param sequenceNr sequence number at which the snapshot was taken. + * @param timestamp time at which the snapshot was saved. + */ +@SerialVersionUID(1L) //#snapshot-metadata +case class SnapshotMetadata(processorId: String, sequenceNr: Long, timestamp: Long = 0L) +//#snapshot-metadata + +/** + * Indicates successful saving of a snapshot. + * + * @param metadata snapshot metadata. + */ +@SerialVersionUID(1L) +case class SaveSnapshotSucceeded(metadata: SnapshotMetadata) + +/** + * Indicates failed saving of a snapshot. + * + * @param metadata snapshot metadata. + * @param reason failure reason. + */ +@SerialVersionUID(1L) +case class SaveSnapshotFailed(metadata: SnapshotMetadata, reason: Throwable) + +/** + * Offers a [[Processor]] a previously saved `snapshot` during recovery. This offer is received + * before any further replayed messages. + */ +@SerialVersionUID(1L) +case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any) + +/** + * Snapshot selection criteria for recovery. + * + * @param maxSequenceNr upper bound for a selected snapshot's sequence number. Default is no upper bound. + * @param maxTimestamp upper bound for a selected snapshot's timestamp. Default is no upper bound. + * + * @see [[Recover]] + */ +@SerialVersionUID(1L) +case class SnapshotSelectionCriteria(maxSequenceNr: Long = Long.MaxValue, maxTimestamp: Long = Long.MaxValue) { + private[persistence] def limit(toSequenceNr: Long): SnapshotSelectionCriteria = + if (toSequenceNr < maxSequenceNr) copy(maxSequenceNr = toSequenceNr) else this +} + +object SnapshotSelectionCriteria { + /** + * The latest saved snapshot. + */ + val Latest = SnapshotSelectionCriteria() + + /** + * No saved snapshot matches. + */ + val None = SnapshotSelectionCriteria(0L, 0L) + + /** + * Java API. + */ + def create(maxSequenceNr: Long, maxTimestamp: Long) = + SnapshotSelectionCriteria(maxSequenceNr, maxTimestamp) + + /** + * Java API. + */ + def latest() = Latest + + /** + * Java API. + */ + def none() = None +} + +// TODO: support application-defined snapshot serializers +// TODO: support application-defined snapshot access + +/** + * Snapshot serialization extension. + */ +private[persistence] object SnapshotSerialization extends ExtensionId[SnapshotSerialization] with ExtensionIdProvider { + def createExtension(system: ExtendedActorSystem): SnapshotSerialization = new SnapshotSerialization(system) + def lookup() = SnapshotSerialization +} + +/** + * Snapshot serialization extension. + */ +private[persistence] class SnapshotSerialization(val system: ExtendedActorSystem) extends Extension { + import akka.serialization.JavaSerializer + + /** + * Java serialization based snapshot serializer. + */ + val java = new SnapshotSerializer { + def serialize(stream: OutputStream, metadata: SnapshotMetadata, state: Any) = { + val out = new ObjectOutputStream(stream) + JavaSerializer.currentSystem.withValue(system) { out.writeObject(state) } + } + + def deserialize(stream: InputStream, metadata: SnapshotMetadata) = { + val in = new ClassLoaderObjectInputStream(system.dynamicAccess.classLoader, stream) + JavaSerializer.currentSystem.withValue(system) { in.readObject } + } + } +} + +/** + * Stream-based snapshot serializer. + */ +private[persistence] trait SnapshotSerializer { + /** + * Serializes a `snapshot` to an output stream. + */ + def serialize(stream: OutputStream, metadata: SnapshotMetadata, snapshot: Any): Unit + + /** + * Deserializes a snapshot from an input stream. + */ + def deserialize(stream: InputStream, metadata: SnapshotMetadata): Any +} + +/** + * Input and output stream management for snapshot serialization. + */ +private[persistence] trait SnapshotAccess { + /** + * Provides a managed output stream for serializing a snapshot. + * + * @param metadata snapshot metadata needed to create an output stream. + * @param body called with the managed output stream as argument. + */ + def withOutputStream(metadata: SnapshotMetadata)(body: OutputStream ⇒ Unit) + + /** + * Provides a managed input stream for deserializing a state object. + * + * @param metadata snapshot metadata needed to create an input stream. + * @param body called with the managed input stream as argument. + * @return read snapshot. + */ + def withInputStream(metadata: SnapshotMetadata)(body: InputStream ⇒ Any): Any + + /** + * Loads the snapshot metadata of all currently stored snapshots. + */ + def metadata: Set[SnapshotMetadata] + + /** + * Deletes the snapshot referenced by `metadata`. + */ + def delete(metadata: SnapshotMetadata) +} + +private[persistence] trait SnapshotStoreFactory { + /** + * Creates a new snapshot store actor. + */ + def createSnapshotStore(implicit factory: ActorRefFactory): ActorRef +} + +private[persistence] object SnapshotStore { + /** + * Instructs a snapshot store to load a snapshot. + * + * @param processorId processor id. + * @param criteria criteria for selecting a saved snapshot from which recovery should start. + * @param toSequenceNr upper sequence number bound (inclusive) for recovery. + */ + case class LoadSnapshot(processorId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) + + /** + * Reply message to a processor that a snapshot loading attempt has been completed. + * + * @param savedSnapshot + */ + case class LoadSnapshotCompleted(savedSnapshot: Option[SavedSnapshot], toSequenceNr: Long) + + /** + * Instructs snapshot store to save a snapshot. + * + * @param metadata snapshot metadata. + * @param snapshot snapshot. + */ + case class SaveSnapshot(metadata: SnapshotMetadata, snapshot: Any) + + /** + * In-memory representation of a saved snapshot. + * + * @param metadata snapshot metadata. + * @param snapshot saved snapshot. + */ + case class SavedSnapshot(metadata: SnapshotMetadata, snapshot: Any) +} diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/InmemJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/InmemJournal.scala deleted file mode 100644 index fc8e89533f..0000000000 --- a/akka-persistence/src/main/scala/akka/persistence/journal/InmemJournal.scala +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ - -package akka.persistence.journal - -import com.typesafe.config.Config - -import akka.actor._ -import akka.pattern.PromiseActorRef -import akka.persistence._ - -/** - * In-memory journal configuration object. - */ -private[persistence] class InmemJournalSettings(config: Config) extends JournalFactory { - /** - * Creates a new in-memory journal actor from this configuration object. - */ - def createJournal(implicit factory: ActorRefFactory): ActorRef = factory.actorOf(Props(classOf[InmemJournal], this)) -} - -/** - * In-memory journal. - */ -private[persistence] class InmemJournal(settings: InmemJournalSettings) extends Actor { - // processorId => (message, sender, deleted) - private var messages = Map.empty[String, List[(PersistentImpl, ActorRef, Boolean)]] - // (processorId, sequenceNr) => confirming channels ids - private var confirms = Map.empty[(String, Long), List[String]] - - import Journal._ - - def receive = { - case Write(pm, p) ⇒ { - // must be done because PromiseActorRef instances have no uid set TODO: discuss - val ps = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender - messages = messages + (messages.get(pm.processorId) match { - case None ⇒ pm.processorId -> List((pm.copy(resolved = false), ps, false)) - case Some(mss) ⇒ pm.processorId -> ((pm.copy(resolved = false), ps, false) :: mss) - }) - p.tell(Written(pm), sender) - } - case c @ Confirm(pid, snr, cid) ⇒ { - val pair = (pid, snr) - confirms = confirms + (confirms.get(pair) match { - case None ⇒ pair -> List(cid) - case Some(cids) ⇒ pair -> (cid :: cids) - }) - // TODO: turn off by default and allow to turn on by configuration - context.system.eventStream.publish(c) - } - case Delete(pm: PersistentImpl) ⇒ { - val pid = pm.processorId - val snr = pm.sequenceNr - messages = messages map { - case (`pid`, mss) ⇒ pid -> (mss map { - case (msg, sdr, _) if msg.sequenceNr == snr ⇒ (msg, sdr, true) - case ms ⇒ ms - }) - case kv ⇒ kv - } - } - case Loop(m, p) ⇒ { - p.tell(Looped(m), sender) - } - case Replay(toSnr, p, pid) ⇒ { - val cfs = confirms.withDefaultValue(Nil) - for { - mss ← messages.get(pid) - (msg, sdr, del) ← mss.reverseIterator.filter(_._1.sequenceNr <= toSnr) - } if (!del) p.tell(Replayed(msg.copy(confirms = cfs((msg.processorId, msg.sequenceNr)))), sdr) - p.tell(RecoveryEnd(messages.get(pid).map(_.head._1.sequenceNr).getOrElse(0L)), self) - } - } -} diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/LeveldbJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/LeveldbJournal.scala deleted file mode 100644 index e764554be6..0000000000 --- a/akka-persistence/src/main/scala/akka/persistence/journal/LeveldbJournal.scala +++ /dev/null @@ -1,278 +0,0 @@ -/** - * Copyright (C) 2012-2013 Eligotech BV. - */ - -package akka.persistence.journal - -import java.io.File -import java.nio.ByteBuffer - -import com.typesafe.config.Config - -import org.iq80.leveldb._ - -import akka.actor._ -import akka.pattern.PromiseActorRef -import akka.persistence._ -import akka.serialization.{ Serialization, SerializationExtension } - -/** - * LevelDB journal configuration object. - */ -private[persistence] class LeveldbJournalSettings(config: Config) extends JournalFactory { - /** - * Name of directory where journal files shall be stored. Can be a relative or absolute path. - */ - val dir: String = config.getString("dir") - - /** - * Currently `false`. - */ - val checksum = false - - /** - * Currently `false`. - */ - val fsync = false - - /** - * Creates a new LevelDB journal actor from this configuration object. - */ - def createJournal(implicit factory: ActorRefFactory): ActorRef = - factory.actorOf(Props(classOf[LeveldbJournal], this).withDispatcher("akka.persistence.journal.leveldb.dispatcher")) -} - -/** - * LevelDB journal. - */ -private[persistence] class LeveldbJournal(settings: LeveldbJournalSettings) extends Actor { - // TODO: support migration of processor and channel ids - // needed if default processor and channel ids are used - // (actor paths, which contain deployment information). - - val leveldbOptions = new Options().createIfMissing(true).compressionType(CompressionType.NONE) - val levelDbReadOptions = new ReadOptions().verifyChecksums(settings.checksum) - val levelDbWriteOptions = new WriteOptions().sync(settings.fsync) - - val leveldbFactory = org.iq80.leveldb.impl.Iq80DBFactory.factory - var leveldb: DB = _ - - val numericIdOffset = 10 - var pathMap: Map[String, Int] = Map.empty - - // TODO: use protobuf serializer for PersistentImpl - // TODO: use user-defined serializer for payload - val serializer = SerializationExtension(context.system).findSerializerFor("") - - import LeveldbJournal._ - import Journal._ - - def receive = { - case Write(pm, p) ⇒ { - val sm = withBatch { batch ⇒ - // must be done because PromiseActorRef instances have no uid set TODO: discuss - val ps = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender - val sm = pm.copy(sender = Serialization.serializedActorPath(ps)) - val pid = numericId(sm.processorId) - batch.put(keyToBytes(counterKey(pid)), counterToBytes(sm.sequenceNr)) - batch.put(keyToBytes(Key(pid, sm.sequenceNr, 0)), msgToBytes(sm.copy(resolved = false, confirmTarget = null, confirmMessage = null))) - sm - } - p.tell(Written(sm), sender) - } - case c @ Confirm(pid, snr, cid) ⇒ { - leveldb.put(keyToBytes(Key(numericId(pid), snr, numericId(cid))), cid.getBytes("UTF-8")) - // TODO: turn off by default and allow to turn on by configuration - context.system.eventStream.publish(c) - } - case Delete(pm: PersistentImpl) ⇒ { - leveldb.put(keyToBytes(deletionKey(numericId(pm.processorId), pm.sequenceNr)), Array.empty[Byte]) - } - case Loop(m, p) ⇒ { - p.tell(Looped(m), sender) - } - case Replay(toSnr, p, pid) ⇒ { - val options = levelDbReadOptions.snapshot(leveldb.getSnapshot) - val iter = leveldb.iterator(options) - val maxSnr = leveldb.get(keyToBytes(counterKey(numericId(pid))), options) match { - case null ⇒ 0L - case bytes ⇒ counterFromBytes(bytes) - } - context.actorOf(Props(classOf[LeveldbReplay], msgFromBytes _)) ! LeveldbReplay.Replay(toSnr, maxSnr, p, numericId(pid), iter) - } - } - - private def msgToBytes(m: PersistentImpl): Array[Byte] = serializer.toBinary(m) - private def msgFromBytes(a: Array[Byte]): PersistentImpl = serializer.fromBinary(a).asInstanceOf[PersistentImpl] - - // ---------------------------------------------------------- - // Path mapping - // ---------------------------------------------------------- - - private def numericId(processorId: String): Int = pathMap.get(processorId) match { - case None ⇒ writePathMapping(processorId, pathMap.size + numericIdOffset) - case Some(v) ⇒ v - } - - private def readPathMap(): Map[String, Int] = { - val iter = leveldb.iterator(levelDbReadOptions.snapshot(leveldb.getSnapshot)) - try { - iter.seek(keyToBytes(idToKey(numericIdOffset))) - readPathMap(Map.empty, iter) - } finally { - iter.close() - } - } - - private def readPathMap(pathMap: Map[String, Int], iter: DBIterator): Map[String, Int] = { - if (!iter.hasNext) pathMap else { - val nextEntry = iter.next() - val nextKey = keyFromBytes(nextEntry.getKey) - if (!isMappingKey(nextKey)) pathMap else { - val nextVal = new String(nextEntry.getValue, "UTF-8") - readPathMap(pathMap + (nextVal -> idFromKey(nextKey)), iter) - } - } - } - - private def writePathMapping(path: String, numericId: Int): Int = { - pathMap = pathMap + (path -> numericId) - leveldb.put(keyToBytes(idToKey(numericId)), path.getBytes("UTF-8")) - numericId - } - - // ---------------------------------------------------------- - // Batch write support - // ---------------------------------------------------------- - - def withBatch[R](body: WriteBatch ⇒ R): R = { - val batch = leveldb.createWriteBatch() - try { - val r = body(batch) - leveldb.write(batch, levelDbWriteOptions) - r - } finally { - batch.close() - } - } - - // ---------------------------------------------------------- - // Life cycle - // ---------------------------------------------------------- - - override def preStart() { - leveldb = leveldbFactory.open(new File(settings.dir), leveldbOptions) - pathMap = readPathMap() - } - - override def postStop() { - leveldb.close() - } -} - -private object LeveldbJournal { - case class Key( - processorId: Int, - sequenceNr: Long, - channelId: Int) - - def idToKey(id: Int) = Key(1, 0L, id) - def idFromKey(key: Key) = key.channelId - - def counterKey(processorId: Int): Key = Key(processorId, 0L, 0) - def isMappingKey(key: Key): Boolean = key.processorId == 1 - - def deletionKey(processorId: Int, sequenceNr: Long): Key = Key(processorId, sequenceNr, 1) - def isDeletionKey(key: Key): Boolean = key.channelId == 1 - - def counterToBytes(ctr: Long): Array[Byte] = ByteBuffer.allocate(8).putLong(ctr).array - def counterFromBytes(bytes: Array[Byte]): Long = ByteBuffer.wrap(bytes).getLong - - def keyToBytes(key: Key): Array[Byte] = { - val bb = ByteBuffer.allocate(20) - bb.putInt(key.processorId) - bb.putLong(key.sequenceNr) - bb.putInt(key.channelId) - bb.array - } - - def keyFromBytes(bytes: Array[Byte]): Key = { - val bb = ByteBuffer.wrap(bytes) - val aid = bb.getInt - val snr = bb.getLong - val cid = bb.getInt - new Key(aid, snr, cid) - } -} - -private class LeveldbReplay(deserialize: Array[Byte] ⇒ PersistentImpl) extends Actor { - val extension = Persistence(context.system) - - import LeveldbReplay._ - import LeveldbJournal._ - import Journal.{ Replayed, RecoveryEnd } - - // TODO: parent should stop replay actor if it crashes - // TODO: use a pinned dispatcher - - def receive = { - case Replay(toSnr, maxSnr, processor, processorId, iter) ⇒ { - try { - val startKey = Key(processorId, 1L, 0) - iter.seek(keyToBytes(startKey)) - replay(iter, startKey, toSnr, m ⇒ processor.tell(Replayed(m), extension.system.provider.resolveActorRef(m.sender))) - } finally { iter.close() } - processor.tell(RecoveryEnd(maxSnr), self) - context.stop(self) - } - } - - @scala.annotation.tailrec - private def replay(iter: DBIterator, key: Key, toSnr: Long, callback: PersistentImpl ⇒ Unit) { - if (iter.hasNext) { - val nextEntry = iter.next() - val nextKey = keyFromBytes(nextEntry.getKey) - if (nextKey.sequenceNr > toSnr) { - // end iteration here - } else if (nextKey.channelId != 0) { - // phantom confirmation (just advance iterator) - replay(iter, nextKey, toSnr, callback) - } else if (key.processorId == nextKey.processorId) { - val msg = deserialize(nextEntry.getValue) - val del = deletion(iter, nextKey) - val cnf = confirms(iter, nextKey, Nil) - if (!del) callback(msg.copy(confirms = cnf)) - replay(iter, nextKey, toSnr, callback) - } - } - } - - private def deletion(iter: DBIterator, key: Key): Boolean = { - if (iter.hasNext) { - val nextEntry = iter.peekNext() - val nextKey = keyFromBytes(nextEntry.getKey) - if (key.processorId == nextKey.processorId && key.sequenceNr == nextKey.sequenceNr && isDeletionKey(nextKey)) { - iter.next() - true - } else false - } else false - } - - @scala.annotation.tailrec - private def confirms(iter: DBIterator, key: Key, channelIds: List[String]): List[String] = { - if (iter.hasNext) { - val nextEntry = iter.peekNext() - val nextKey = keyFromBytes(nextEntry.getKey) - if (key.processorId == nextKey.processorId && key.sequenceNr == nextKey.sequenceNr) { - val nextValue = new String(nextEntry.getValue, "UTF-8") - iter.next() - confirms(iter, nextKey, nextValue :: channelIds) - } else channelIds - } else channelIds - } - -} - -private object LeveldbReplay { - case class Replay(toSequenceNr: Long, maxSequenceNr: Long, processor: ActorRef, pid: Int, iterator: DBIterator) -} diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala new file mode 100644 index 0000000000..9e9793b22c --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala @@ -0,0 +1,60 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.persistence.journal.leveldb + +import akka.actor.Actor + +import org.iq80.leveldb.DBIterator + +/** + * Persistent mapping of `String`-based processor and channel ids to numeric ids. + */ +private[persistence] trait LeveldbIdMapping extends Actor { this: LeveldbJournal ⇒ + import Key._ + + private val idOffset = 10 + private var idMap: Map[String, Int] = Map.empty + + /** + * Get the mapped numeric id for the specified processor or channel `id`. Creates and + * stores a new mapping if necessary. + */ + def numericId(id: String): Int = idMap.get(id) match { + case None ⇒ writeIdMapping(id, idMap.size + idOffset) + case Some(v) ⇒ v + } + + private def readIdMap(): Map[String, Int] = { + val iter = leveldbIterator + try { + iter.seek(keyToBytes(idKey(idOffset))) + readIdMap(Map.empty, iter) + } finally { + iter.close() + } + } + + private def readIdMap(pathMap: Map[String, Int], iter: DBIterator): Map[String, Int] = { + if (!iter.hasNext) pathMap else { + val nextEntry = iter.next() + val nextKey = keyFromBytes(nextEntry.getKey) + if (!isIdKey(nextKey)) pathMap else { + val nextVal = new String(nextEntry.getValue, "UTF-8") + readIdMap(pathMap + (nextVal -> id(nextKey)), iter) + } + } + } + + private def writeIdMapping(id: String, numericId: Int): Int = { + idMap = idMap + (id -> numericId) + leveldb.put(keyToBytes(idKey(numericId)), id.getBytes("UTF-8")) + numericId + } + + override def preStart() { + idMap = readIdMap() + super.preStart() + } +} diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala new file mode 100644 index 0000000000..cf8bd563fe --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala @@ -0,0 +1,138 @@ +/** + * Copyright (C) 2012-2013 Eligotech BV. + */ + +package akka.persistence.journal.leveldb + +import java.io.File + +import scala.util._ + +import org.iq80.leveldb._ + +import com.typesafe.config.Config + +import akka.actor._ +import akka.pattern.PromiseActorRef +import akka.persistence._ +import akka.serialization.{ Serialization, SerializationExtension } + +/** + * LevelDB journal settings. + */ +private[persistence] class LeveldbJournalSettings(config: Config) extends JournalFactory { + /** + * Name of directory where journal files shall be stored. Can be a relative or absolute path. + */ + val journalDir: File = new File(config.getString("dir")) + + /** + * Verify checksums on read. + */ + val checksum = false + + /** + * Synchronous writes to disk. + */ + val fsync: Boolean = config.getBoolean("fsync") + + /** + * Creates a new LevelDB journal actor from this configuration object. + */ + def createJournal(implicit factory: ActorRefFactory): ActorRef = + factory.actorOf(Props(classOf[LeveldbJournal], this).withDispatcher("akka.persistence.journal.leveldb.write.dispatcher")) +} + +/** + * LevelDB journal. + */ +private[persistence] class LeveldbJournal(val settings: LeveldbJournalSettings) extends Actor with LeveldbIdMapping with LeveldbReplay { + val extension = Persistence(context.system) + + val leveldbOptions = new Options().createIfMissing(true).compressionType(CompressionType.NONE) + val leveldbReadOptions = new ReadOptions().verifyChecksums(settings.checksum) + val leveldbWriteOptions = new WriteOptions().sync(settings.fsync) + + val leveldbDir = settings.journalDir + val leveldbFactory = org.iq80.leveldb.impl.Iq80DBFactory.factory + var leveldb: DB = _ + + // TODO: support migration of processor and channel ids + // needed if default processor and channel ids are used + // (actor paths, which contain deployment information). + + // TODO: use protobuf serializer for PersistentImpl + // TODO: use user-defined serializer for payload + val serializer = SerializationExtension(context.system).findSerializerFor("") + + import Journal._ + import Key._ + + import context.dispatcher + + def receive = { + case Write(persistent, processor) ⇒ { + val persisted = withBatch { batch ⇒ + val sdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender + val nid = numericId(persistent.processorId) + val prepared = persistent.copy(sender = Serialization.serializedActorPath(sdr)) + batch.put(keyToBytes(counterKey(nid)), counterToBytes(prepared.sequenceNr)) + batch.put(keyToBytes(Key(nid, prepared.sequenceNr, 0)), persistentToBytes(prepared.copy(resolved = false, confirmTarget = null, confirmMessage = null))) + prepared + } + processor.tell(Written(persisted), sender) + } + case c @ Confirm(processorId, sequenceNr, channelId) ⇒ { + leveldb.put(keyToBytes(Key(numericId(processorId), sequenceNr, numericId(channelId))), channelId.getBytes("UTF-8")) + context.system.eventStream.publish(c) // TODO: turn off by default and allow to turn on by configuration + } + case Delete(persistent: PersistentImpl) ⇒ { + leveldb.put(keyToBytes(deletionKey(numericId(persistent.processorId), persistent.sequenceNr)), Array.empty[Byte]) + } + case Loop(message, processor) ⇒ { + processor.tell(Looped(message), sender) + } + case Replay(fromSequenceNr, toSequenceNr, processor, processorId) ⇒ { + val maxSnr = maxSequenceNr(processorId) + replayAsync(fromSequenceNr, toSequenceNr, processor, processorId) onComplete { + case Success(_) ⇒ processor ! ReplayCompleted(maxSnr) + case Failure(e) ⇒ // TODO: send RecoveryFailed to processor + } + } + } + + def leveldbSnapshot = leveldbReadOptions.snapshot(leveldb.getSnapshot) + def leveldbIterator = leveldb.iterator(leveldbSnapshot) + + def persistentToBytes(p: PersistentImpl): Array[Byte] = serializer.toBinary(p) + def persistentFromBytes(a: Array[Byte]): PersistentImpl = serializer.fromBinary(a).asInstanceOf[PersistentImpl] + + def withBatch[R](body: WriteBatch ⇒ R): R = { + val batch = leveldb.createWriteBatch() + try { + val r = body(batch) + leveldb.write(batch, leveldbWriteOptions) + r + } finally { + batch.close() + } + } + + def maxSequenceNr(processorId: String) = { + leveldb.get(keyToBytes(counterKey(numericId(processorId))), leveldbSnapshot) match { + case null ⇒ 0L + case bytes ⇒ counterFromBytes(bytes) + } + } + + override def preStart() { + leveldb = leveldbFactory.open(leveldbDir, leveldbOptions) + super.preStart() + } + + override def postStop() { + super.postStop() + leveldb.close() + } +} + diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbKey.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbKey.scala new file mode 100644 index 0000000000..b5326aa5dc --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbKey.scala @@ -0,0 +1,45 @@ +/** + * Copyright (C) 2012-2013 Eligotech BV. + */ + +package akka.persistence.journal.leveldb + +import java.nio.ByteBuffer + +/** + * LevelDB key. + */ +private[leveldb] case class Key( + processorId: Int, + sequenceNr: Long, + channelId: Int) + +private[leveldb] object Key { + def keyToBytes(key: Key): Array[Byte] = { + val bb = ByteBuffer.allocate(20) + bb.putInt(key.processorId) + bb.putLong(key.sequenceNr) + bb.putInt(key.channelId) + bb.array + } + + def keyFromBytes(bytes: Array[Byte]): Key = { + val bb = ByteBuffer.wrap(bytes) + val aid = bb.getInt + val snr = bb.getLong + val cid = bb.getInt + new Key(aid, snr, cid) + } + + def counterKey(processorId: Int): Key = Key(processorId, 0L, 0) + def counterToBytes(ctr: Long): Array[Byte] = ByteBuffer.allocate(8).putLong(ctr).array + def counterFromBytes(bytes: Array[Byte]): Long = ByteBuffer.wrap(bytes).getLong + + def id(key: Key) = key.channelId + def idKey(id: Int) = Key(1, 0L, id) + def isIdKey(key: Key): Boolean = key.processorId == 1 + + def deletionKey(processorId: Int, sequenceNr: Long): Key = Key(processorId, sequenceNr, 1) + def isDeletionKey(key: Key): Boolean = key.channelId == 1 +} + diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbReplay.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbReplay.scala new file mode 100644 index 0000000000..bc864626c2 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbReplay.scala @@ -0,0 +1,79 @@ +/** + * Copyright (C) 2012-2013 Eligotech BV. + */ + +package akka.persistence.journal.leveldb + +import scala.concurrent.Future + +import org.iq80.leveldb.DBIterator + +import akka.actor._ +import akka.persistence._ +import akka.persistence.Journal._ + +/** + * Asynchronous replay support. + */ +private[persistence] trait LeveldbReplay extends Actor { this: LeveldbJournal ⇒ + import Key._ + + private val executionContext = context.system.dispatchers.lookup("akka.persistence.journal.leveldb.replay.dispatcher") + + def replayAsync(fromSequenceNr: Long, toSequenceNr: Long, processor: ActorRef, processorId: String): Future[Unit] = + Future(replay(fromSequenceNr: Long, toSequenceNr, processor, numericId(processorId), leveldbIterator))(executionContext) + + private def replay(fromSequenceNr: Long, toSequenceNr: Long, processor: ActorRef, processorId: Int, iter: DBIterator): Unit = { + @scala.annotation.tailrec + def go(key: Key)(callback: PersistentImpl ⇒ Unit) { + if (iter.hasNext) { + val nextEntry = iter.next() + val nextKey = keyFromBytes(nextEntry.getKey) + if (nextKey.sequenceNr > toSequenceNr) { + // end iteration here + } else if (nextKey.channelId != 0) { + // phantom confirmation (just advance iterator) + go(nextKey)(callback) + } else if (key.processorId == nextKey.processorId) { + val msg = persistentFromBytes(nextEntry.getValue) + val del = deletion(nextKey) + val cnf = confirms(nextKey, Nil) + if (!del) callback(msg.copy(confirms = cnf)) + go(nextKey)(callback) + } + } + } + + @scala.annotation.tailrec + def confirms(key: Key, channelIds: List[String]): List[String] = { + if (iter.hasNext) { + val nextEntry = iter.peekNext() + val nextKey = keyFromBytes(nextEntry.getKey) + if (key.processorId == nextKey.processorId && key.sequenceNr == nextKey.sequenceNr) { + val nextValue = new String(nextEntry.getValue, "UTF-8") + iter.next() + confirms(nextKey, nextValue :: channelIds) + } else channelIds + } else channelIds + } + + def deletion(key: Key): Boolean = { + if (iter.hasNext) { + val nextEntry = iter.peekNext() + val nextKey = keyFromBytes(nextEntry.getKey) + if (key.processorId == nextKey.processorId && key.sequenceNr == nextKey.sequenceNr && isDeletionKey(nextKey)) { + iter.next() + true + } else false + } else false + } + + try { + val startKey = Key(processorId, if (fromSequenceNr < 1L) 1L else fromSequenceNr, 0) + iter.seek(keyToBytes(startKey)) + go(startKey) { m ⇒ processor.tell(Replayed(m), extension.system.provider.resolveActorRef(m.sender)) } + } finally { + iter.close() + } + } +} diff --git a/akka-persistence/src/main/scala/akka/persistence/package.scala b/akka-persistence/src/main/scala/akka/persistence/package.scala new file mode 100644 index 0000000000..ebfcc1efb5 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/package.scala @@ -0,0 +1,13 @@ +/** + * Copyright (C) 2012-2013 Eligotech BV. + */ + +package akka + +package object persistence { + implicit val snapshotMetadataOrdering = new Ordering[SnapshotMetadata] { + def compare(x: SnapshotMetadata, y: SnapshotMetadata) = + if (x.processorId == y.processorId) math.signum(x.sequenceNr - y.sequenceNr).toInt + else x.processorId.compareTo(y.processorId) + } +} diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala new file mode 100644 index 0000000000..0413c0680d --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala @@ -0,0 +1,160 @@ +/** + * Copyright (C) 2012-2013 Eligotech BV. + */ + +package akka.persistence.snapshot.local + +import java.io._ +import java.net.{ URLDecoder, URLEncoder } + +import scala.collection.SortedSet +import scala.concurrent._ +import scala.util._ + +import com.typesafe.config.Config + +import akka.actor._ +import akka.persistence._ + +/** + * [[LocalSnapshotStore]] settings. + */ +private[persistence] class LocalSnapshotStoreSettings(config: Config) extends SnapshotStoreFactory { + /** + * Name of directory where snapshot files shall be stored. + */ + val snapshotDir: File = new File(config.getString("dir")) + + /** + * Creates a new snapshot store actor. + */ + def createSnapshotStore(implicit factory: ActorRefFactory): ActorRef = + factory.actorOf(Props(classOf[LocalSnapshotStore], this)) +} +/** + * Snapshot store that stores snapshots on local filesystem. + */ +private[persistence] class LocalSnapshotStore(settings: LocalSnapshotStoreSettings) extends Actor with ActorLogging { + private implicit val executionContext = context.system.dispatchers.lookup("akka.persistence.snapshot-store.local.io.dispatcher") + + // TODO: make snapshot access configurable + // TODO: make snapshot serializer configurable + + private val snapshotDir = settings.snapshotDir + private val snapshotAccess = new LocalSnapshotAccess(snapshotDir) + private val snapshotSerializer = SnapshotSerialization(context.system).java + + var snapshotMetadata = Map.empty[String, SortedSet[SnapshotMetadata]] + + import SnapshotStore._ + + def receive = { + case LoadSnapshot(processorId, criteria, toSequenceNr) ⇒ { + val p = sender + loadSnapshotAsync(processorId, criteria.limit(toSequenceNr)) onComplete { + case Success(sso) ⇒ p ! LoadSnapshotCompleted(sso, toSequenceNr) + case Failure(_) ⇒ p ! LoadSnapshotCompleted(None, toSequenceNr) + } + } + case SaveSnapshot(metadata, snapshot) ⇒ { + val p = sender + val md = metadata.copy(timestamp = System.currentTimeMillis) + saveSnapshotAsync(md, snapshot) onComplete { + case Success(_) ⇒ self tell (SaveSnapshotSucceeded(md), p) + case Failure(e) ⇒ self tell (SaveSnapshotFailed(metadata, e), p) + } + } + case evt @ SaveSnapshotSucceeded(metadata) ⇒ { + updateMetadata(metadata) + sender ! evt // sender is processor + } + case evt @ SaveSnapshotFailed(metadata, reason) ⇒ { + deleteSnapshot(metadata) + sender ! evt // sender is processor + } + } + + def loadSnapshotAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SavedSnapshot]] = + Future(loadSnapshot(processorId, criteria)) + + def loadSnapshot(processorId: String, criteria: SnapshotSelectionCriteria): Option[SavedSnapshot] = { + @scala.annotation.tailrec + def load(metadata: SortedSet[SnapshotMetadata]): Option[SavedSnapshot] = metadata.lastOption match { + case None ⇒ None + case Some(md) ⇒ { + Try(snapshotAccess.withInputStream(md)(snapshotSerializer.deserialize(_, md))) match { + case Success(ss) ⇒ Some(SavedSnapshot(md, ss)) + case Failure(e) ⇒ { + log.error(e, s"error loading snapshot ${md}") + load(metadata.init) // try older snapshot + } + } + } + } + + // Heuristics: + // + // Select youngest 3 snapshots that match upper bound. This may help in situations + // where saving of a snapshot could not be completed because of a JVM crash. Hence, + // an attempt to load that snapshot will fail but loading an older snapshot may + // succeed. + // + // TODO: make number of loading attempts configurable + // TODO: improve heuristics for remote snapshot loading + + for { + mds ← snapshotMetadata.get(processorId) + md ← load(mds.filter(md ⇒ + md.sequenceNr <= criteria.maxSequenceNr && + md.timestamp <= criteria.maxTimestamp).takeRight(3)) + } yield md + } + + def saveSnapshotAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = + Future(saveSnapshot(metadata, snapshot)) + + private def saveSnapshot(metadata: SnapshotMetadata, snapshot: Any): Unit = + snapshotAccess.withOutputStream(metadata)(snapshotSerializer.serialize(_, metadata, snapshot)) + + def deleteSnapshot(metadata: SnapshotMetadata): Unit = + snapshotAccess.delete(metadata) + + def updateMetadata(metadata: SnapshotMetadata): Unit = { + snapshotMetadata = snapshotMetadata + (snapshotMetadata.get(metadata.processorId) match { + case Some(mds) ⇒ metadata.processorId -> (mds + metadata) + case None ⇒ metadata.processorId -> SortedSet(metadata) + }) + } + + override def preStart() { + if (!snapshotDir.exists) snapshotDir.mkdirs() + snapshotMetadata = SortedSet.empty ++ snapshotAccess.metadata groupBy (_.processorId) + super.preStart() + } +} + +/** + * Access to snapshot files on local filesystem. + */ +private[persistence] class LocalSnapshotAccess(snapshotDir: File) extends SnapshotAccess { + private val FilenamePattern = """^snapshot-(.+)-(\d+)-(\d+)""".r + + def metadata: Set[SnapshotMetadata] = snapshotDir.listFiles.map(_.getName).collect { + case FilenamePattern(pid, snr, tms) ⇒ SnapshotMetadata(URLDecoder.decode(pid, "UTF-8"), snr.toLong, tms.toLong) + }.toSet + + def delete(metadata: SnapshotMetadata): Unit = + snapshotFile(metadata).delete() + + def withOutputStream(metadata: SnapshotMetadata)(p: (OutputStream) ⇒ Unit) = + withStream(new BufferedOutputStream(new FileOutputStream(snapshotFile(metadata))), p) + + def withInputStream(metadata: SnapshotMetadata)(p: (InputStream) ⇒ Any) = + withStream(new BufferedInputStream(new FileInputStream(snapshotFile(metadata))), p) + + private def withStream[A <: Closeable, B](stream: A, p: A ⇒ B): B = + try { p(stream) } finally { stream.close() } + + private def snapshotFile(metadata: SnapshotMetadata): File = + new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.processorId, "UTF-8")}-${metadata.sequenceNr}-${metadata.timestamp}") +} diff --git a/akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala index e7b5cb9a6e..1756f2e0c5 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala @@ -9,6 +9,7 @@ object ChannelSpec { |serialize-creators = on |serialize-messages = on |akka.persistence.journal.leveldb.dir = "target/journal-channel-spec" + |akka.persistence.snapshot-store.local.dir = ${akka.persistence.journal.leveldb.dir}/snapshots """.stripMargin class TestProcessor(name: String) extends NamedProcessor(name) { diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala index 79b0a33e19..fa7bfc6f7d 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala @@ -49,3 +49,5 @@ abstract class NamedProcessor(name: String) extends Processor { trait TurnOffRecoverOnStart { this: Processor ⇒ override def preStart(): Unit = () } + +case object GetState diff --git a/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala index ac10967abf..5d3a6ca734 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala @@ -9,10 +9,9 @@ object ProcessorSpec { |serialize-creators = on |serialize-messages = on |akka.persistence.journal.leveldb.dir = "target/journal-processor-spec" + |akka.persistence.snapshot-store.local.dir = ${akka.persistence.journal.leveldb.dir}/snapshots """.stripMargin - case object GetState - class RecoverTestProcessor(name: String) extends NamedProcessor(name) { var state = List.empty[String] def receive = { @@ -24,7 +23,7 @@ object ProcessorSpec { override def preRestart(reason: Throwable, message: Option[Any]) = { message match { - case Some(m: Persistent) ⇒ delete(m) // delete message from journal + case Some(m: Persistent) ⇒ deleteMessage(m) // delete message from journal case _ ⇒ // ignore } super.preRestart(reason, message) @@ -113,7 +112,7 @@ object ProcessorSpec { class LastReplayedMsgFailsTestProcessor(name: String) extends RecoverTestProcessor(name) { override def preRestart(reason: Throwable, message: Option[Any]) = { message match { - case Some(m: Persistent) ⇒ if (recoveryRunning) delete(m) + case Some(m: Persistent) ⇒ if (recoveryRunning) deleteMessage(m) case _ ⇒ } super.preRestart(reason, message) @@ -222,13 +221,13 @@ class ProcessorSpec extends AkkaSpec(ProcessorSpec.config) with PersistenceSpec } "support recovery with upper sequence number bound" in { val processor = namedProcessor[RecoverOffTestProcessor] - processor ! Recover(1L) + processor ! Recover(toSequenceNr = 1L) processor ! GetState expectMsg(List("a-1")) } "never replace journaled messages" in { val processor1 = namedProcessor[RecoverOffTestProcessor] - processor1 ! Recover(1L) + processor1 ! Recover(toSequenceNr = 1L) processor1 ! Persistent("c") processor1 ! GetState expectMsg(List("a-1", "c-3")) diff --git a/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala index 35c8184536..3df87e8f27 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala @@ -9,10 +9,9 @@ object ProcessorStashSpec { |serialize-creators = on |serialize-messages = on |akka.persistence.journal.leveldb.dir = "target/journal-processor-stash-spec" + |akka.persistence.snapshot-store.local.dir = ${akka.persistence.journal.leveldb.dir}/snapshots """.stripMargin - case object GetState - class StashingProcessor(name: String) extends NamedProcessor(name) { var state: List[String] = Nil @@ -41,7 +40,7 @@ object ProcessorStashSpec { class RecoveryFailureStashingProcessor(name: String) extends StashingProcessor(name) { override def preRestart(reason: Throwable, message: Option[Any]) = { message match { - case Some(m: Persistent) ⇒ if (recoveryRunning) delete(m) + case Some(m: Persistent) ⇒ if (recoveryRunning) deleteMessage(m) case _ ⇒ } super.preRestart(reason, message) diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala new file mode 100644 index 0000000000..9b13de6277 --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala @@ -0,0 +1,146 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.persistence + +import akka.actor._ +import akka.testkit._ + +object SnapshotSpec { + val config = + """ + |serialize-creators = on + |serialize-messages = on + |akka.persistence.journal.leveldb.dir = "target/journal-snapshot-spec" + |akka.persistence.snapshot-store.local.dir = ${akka.persistence.journal.leveldb.dir}/snapshots + """.stripMargin + + case object TakeSnapshot + + class SaveSnapshotTestProcessor(name: String, probe: ActorRef) extends NamedProcessor(name) { + var state = List.empty[String] + def receive = { + case Persistent(payload, snr) ⇒ state = s"${payload}-${snr}" :: state + case TakeSnapshot ⇒ saveSnapshot(state) + case SaveSnapshotSucceeded(md) ⇒ probe ! md.sequenceNr + case GetState ⇒ probe ! state.reverse + } + } + + class LoadSnapshotTestProcessor(name: String, probe: ActorRef) extends NamedProcessor(name) { + def receive = { + case Persistent(payload, snr) ⇒ probe ! s"${payload}-${snr}" + case SnapshotOffer(md, s) ⇒ probe ! ((md, s)) + case other ⇒ probe ! other + } + override def preStart() = () + } +} + +class SnapshotSpec extends AkkaSpec(SnapshotSpec.config) with PersistenceSpec with ImplicitSender { + import SnapshotSpec._ + + override protected def beforeEach() { + super.beforeEach() + + val processor = system.actorOf(Props(classOf[SaveSnapshotTestProcessor], name, testActor)) + processor ! Persistent("a") + processor ! TakeSnapshot + processor ! Persistent("b") + processor ! TakeSnapshot + processor ! Persistent("c") + processor ! Persistent("d") + processor ! TakeSnapshot + processor ! Persistent("e") + processor ! Persistent("f") + expectMsgAllOf(1L, 2L, 4L) + } + + "A processor" must { + "recover state starting from the most recent snapshot" in { + val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) + val processorId = name + + processor ! Recover() + + expectMsgPF() { + case (SnapshotMetadata(`processorId`, 4, timestamp), state) ⇒ { + state must be(List("a-1", "b-2", "c-3", "d-4").reverse) + timestamp must be > (0L) + } + } + expectMsg("e-5") + expectMsg("f-6") + } + "recover state starting from the most recent snapshot matching an upper sequence number bound" in { + val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) + val processorId = name + + processor ! Recover(toSequenceNr = 3) + + expectMsgPF() { + case (SnapshotMetadata(`processorId`, 2, timestamp), state) ⇒ { + state must be(List("a-1", "b-2").reverse) + timestamp must be > (0L) + } + } + expectMsg("c-3") + } + "recover state starting from the most recent snapshot matching an upper sequence number bound (without further replay)" in { + val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) + val processorId = name + + processor ! Recover(toSequenceNr = 4) + processor ! "done" + + expectMsgPF() { + case (SnapshotMetadata(`processorId`, 4, timestamp), state) ⇒ { + state must be(List("a-1", "b-2", "c-3", "d-4").reverse) + timestamp must be > (0L) + } + } + expectMsg("done") + } + "recover state starting from the most recent snapshot matching criteria" in { + val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) + val processorId = name + + processor ! Recover(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2)) + + expectMsgPF() { + case (SnapshotMetadata(`processorId`, 2, timestamp), state) ⇒ { + state must be(List("a-1", "b-2").reverse) + timestamp must be > (0L) + } + } + expectMsg("c-3") + expectMsg("d-4") + expectMsg("e-5") + expectMsg("f-6") + } + "recover state starting from the most recent snapshot matching criteria and an upper sequence number bound" in { + val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) + val processorId = name + + processor ! Recover(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2), toSequenceNr = 3) + + expectMsgPF() { + case (SnapshotMetadata(`processorId`, 2, timestamp), state) ⇒ { + state must be(List("a-1", "b-2").reverse) + timestamp must be > (0L) + } + } + expectMsg("c-3") + } + "recover state from scratch if snapshot based recovery is disabled" in { + val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) + + processor ! Recover(fromSnapshot = SnapshotSelectionCriteria.None, toSequenceNr = 3) + + expectMsg("a-1") + expectMsg("b-2") + expectMsg("c-3") + } + } +} diff --git a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorChannelExample.java b/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorChannelExample.java index 91a8398d55..c03c172ac1 100644 --- a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorChannelExample.java +++ b/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorChannelExample.java @@ -41,7 +41,7 @@ public class ProcessorChannelExample { public static void main(String... args) throws Exception { final ActorSystem system = ActorSystem.create("example"); final ActorRef destination = system.actorOf(Props.create(ExampleDestination.class)); - final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class, destination), "processor"); + final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class, destination), "processor-1"); processor.tell(Persistent.create("a"), null); processor.tell(Persistent.create("b"), null); diff --git a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorFailureExample.java b/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorFailureExample.java index e236887a5a..bdf9baf2af 100644 --- a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorFailureExample.java +++ b/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorFailureExample.java @@ -34,7 +34,7 @@ public class ProcessorFailureExample { @Override public void preRestart(Throwable reason, Option message) { if (message.isDefined() && message.get() instanceof Persistent) { - delete((Persistent) message.get()); + deleteMessage((Persistent) message.get()); } super.preRestart(reason, message); } @@ -42,7 +42,7 @@ public class ProcessorFailureExample { public static void main(String... args) throws Exception { final ActorSystem system = ActorSystem.create("example"); - final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor"); + final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-2"); processor.tell(Persistent.create("a"), null); processor.tell("print", null); diff --git a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/SnapshotExample.java b/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/SnapshotExample.java new file mode 100644 index 0000000000..e58be657fe --- /dev/null +++ b/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/SnapshotExample.java @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package sample.persistence.japi; + +import java.io.Serializable; +import java.util.ArrayList; + +import akka.actor.*; +import akka.persistence.*; + +public class SnapshotExample { + public static class ExampleState implements Serializable { + private final ArrayList received; + + public ExampleState() { + this(new ArrayList()); + } + + public ExampleState(ArrayList received) { + this.received = received; + } + + public ExampleState copy() { + return new ExampleState(new ArrayList(received)); + } + + public void update(String s) { + received.add(s); + } + + @Override + public String toString() { + return received.toString(); + } + } + + public static class ExampleProcessor extends UntypedProcessor { + private ExampleState state = new ExampleState(); + + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof Persistent) { + Persistent persistent = (Persistent)message; + state.update(String.format("%s-%d", persistent.payload(), persistent.sequenceNr())); + } else if (message instanceof SnapshotOffer) { + ExampleState s = (ExampleState)((SnapshotOffer)message).snapshot(); + System.out.println("offered state = " + s); + state = s; + } else if (message.equals("print")) { + System.out.println("current state = " + state); + } else if (message.equals("snap")) { + saveSnapshot(state.copy()); + } + } + } + + public static void main(String... args) throws Exception { + final ActorSystem system = ActorSystem.create("example"); + final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-3-java"); + + processor.tell(Persistent.create("a"), null); + processor.tell(Persistent.create("b"), null); + processor.tell("snap", null); + processor.tell(Persistent.create("c"), null); + processor.tell(Persistent.create("d"), null); + processor.tell("print", null); + + Thread.sleep(1000); + system.shutdown(); + } +} diff --git a/akka-samples/akka-sample-persistence/src/main/resources/application.conf b/akka-samples/akka-sample-persistence/src/main/resources/application.conf index f483b177e8..432132898d 100644 --- a/akka-samples/akka-sample-persistence/src/main/resources/application.conf +++ b/akka-samples/akka-sample-persistence/src/main/resources/application.conf @@ -1 +1,2 @@ -akka.persistence.journal.leveldb.dir = "target/journal-example" +akka.persistence.journal.leveldb.dir = "target/example/journal" +akka.persistence.snapshot-store.local.dir = "target/example/snapshots" diff --git a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorChannelExample.scala b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorChannelExample.scala index eda76ecc13..aac20be1be 100644 --- a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorChannelExample.scala +++ b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorChannelExample.scala @@ -34,7 +34,7 @@ object ProcessorChannelExample extends App { } val system = ActorSystem("example") - val processor = system.actorOf(Props(classOf[ExampleProcessor]), "processor") + val processor = system.actorOf(Props(classOf[ExampleProcessor]), "processor-1") implicit val timeout = Timeout(3000) import system.dispatcher diff --git a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorFailureExample.scala b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorFailureExample.scala index 3a781450eb..8ca24dc3d9 100644 --- a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorFailureExample.scala +++ b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorFailureExample.scala @@ -20,7 +20,7 @@ object ProcessorFailureExample extends App { override def preRestart(reason: Throwable, message: Option[Any]) { message match { - case Some(p: Persistent) if !recoveryRunning ⇒ delete(p) // mark failing message as deleted + case Some(p: Persistent) if !recoveryRunning ⇒ deleteMessage(p) // mark failing message as deleted case _ ⇒ // ignore } super.preRestart(reason, message) @@ -28,7 +28,7 @@ object ProcessorFailureExample extends App { } val system = ActorSystem("example") - val processor = system.actorOf(Props(classOf[ExampleProcessor]), "processor") + val processor = system.actorOf(Props(classOf[ExampleProcessor]), "processor-2") processor ! Persistent("a") processor ! "print" diff --git a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/SnapshotExample.scala b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/SnapshotExample.scala new file mode 100644 index 0000000000..ae6f678876 --- /dev/null +++ b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/SnapshotExample.scala @@ -0,0 +1,41 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package sample.persistence + +import akka.actor._ +import akka.persistence._ + +object SnapshotExample extends App { + case class ExampleState(received: List[String] = Nil) { + def update(s: String) = copy(s :: received) + override def toString = received.reverse.toString + } + + class ExampleProcessor extends Processor { + var state = ExampleState() + + def receive = { + case Persistent(s, snr) ⇒ state = state.update(s"${s}-${snr}") + case SaveSnapshotSucceeded(metadata) ⇒ // ... + case SaveSnapshotFailed(metadata, reason) ⇒ // ... + case SnapshotOffer(_, s: ExampleState) ⇒ println("offered state = " + s); state = s + case "print" ⇒ println("current state = " + state) + case "snap" ⇒ saveSnapshot(state) + } + } + + val system = ActorSystem("example") + val processor = system.actorOf(Props(classOf[ExampleProcessor]), "processor-3-scala") + + processor ! Persistent("a") + processor ! Persistent("b") + processor ! "snap" + processor ! Persistent("c") + processor ! Persistent("d") + processor ! "print" + + Thread.sleep(1000) + system.shutdown() +}