+per #3746 Remote sharing of LevelDB for testing purposes

Further changes

- remove obsolete identity checks in Eventsourced
- fix wrong serialize-messages config in tests
This commit is contained in:
Martin Krasser 2013-11-25 12:02:29 +01:00
parent 4d05253391
commit d0bc8a6400
20 changed files with 649 additions and 179 deletions

View file

@ -5,6 +5,7 @@
package docs.persistence; package docs.persistence;
//#plugin-imports //#plugin-imports
import akka.actor.UntypedActor;
import scala.concurrent.Future; import scala.concurrent.Future;
import akka.japi.Option; import akka.japi.Option;
import akka.japi.Procedure; import akka.japi.Procedure;
@ -12,8 +13,44 @@ import akka.persistence.*;
import akka.persistence.journal.japi.*; import akka.persistence.journal.japi.*;
import akka.persistence.snapshot.japi.*; import akka.persistence.snapshot.japi.*;
//#plugin-imports //#plugin-imports
import akka.actor.*;
import akka.persistence.journal.leveldb.SharedLeveldbJournal;
import akka.persistence.journal.leveldb.SharedLeveldbStore;
public class PersistencePluginDocTest { public class PersistencePluginDocTest {
static Object o1 = new Object() {
final ActorSystem system = null;
//#shared-store-creation
final ActorRef store = system.actorOf(Props.create(SharedLeveldbStore.class), "store");
//#shared-store-creation
//#shared-store-usage
class SharedStorageUsage extends UntypedActor {
@Override
public void preStart() throws Exception {
String path = "akka.tcp://example@127.0.0.1:2552/user/store";
ActorSelection selection = getContext().actorSelection(path);
selection.tell(new Identify(1), getSelf());
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof ActorIdentity) {
ActorIdentity identity = (ActorIdentity) message;
if (identity.correlationId().equals(1)) {
ActorRef store = identity.getRef();
if (store != null) {
SharedLeveldbJournal.setStore(store, getContext().system());
}
}
}
}
}
//#shared-store-usage
};
class MySnapshotStore extends SnapshotStore { class MySnapshotStore extends SnapshotStore {
@Override @Override
public Future<Option<SelectedSnapshot>> doLoadAsync(String processorId, SnapshotSelectionCriteria criteria) { public Future<Option<SelectedSnapshot>> doLoadAsync(String processorId, SnapshotSelectionCriteria criteria) {

View file

@ -60,19 +60,6 @@ Architecture
* *Event sourcing*. Based on the building blocks described above, Akka persistence provides abstractions for the * *Event sourcing*. Based on the building blocks described above, Akka persistence provides abstractions for the
development of event sourced applications (see section :ref:`event-sourcing-java`) development of event sourced applications (see section :ref:`event-sourcing-java`)
Configuration
=============
By default, journaled messages are written to a directory named ``journal`` in the current working directory. This
can be changed by configuration where the specified path can be relative or absolute:
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#journal-config
The default storage location of :ref:`snapshots-java` is a directory named ``snapshots`` in the current working directory.
This can be changed by configuration where the specified path can be relative or absolute:
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-config
.. _processors-java: .. _processors-java:
Processors Processors
@ -407,10 +394,11 @@ will therefore never be done partially i.e. with only a subset of events persist
Storage plugins Storage plugins
=============== ===============
Storage backends for journals and snapshot stores are plugins in akka-persistence. The default journal plugin writes Storage backends for journals and snapshot stores are plugins in akka-persistence. The default journal plugin
messages to LevelDB. The default snapshot store plugin writes snapshots as individual files to the local filesystem. writes messages to LevelDB (see :ref:`local-leveldb-journal-java`). The default snapshot store plugin writes snapshots
Applications can provide their own plugins by implementing a plugin API and activate them by configuration. Plugin as individual files to the local filesystem (see :ref:`local-snapshot-store-java`). Applications can provide their own
development requires the following imports: plugins by implementing a plugin API and activate them by configuration. Plugin development requires the following
imports:
.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#plugin-imports .. includecode:: code/docs/persistence/PersistencePluginDocTest.java#plugin-imports
@ -454,6 +442,67 @@ A snapshot store plugin can be activated with the following minimal configuratio
The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher
used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``. used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``.
Pre-packaged plugins
====================
.. _local-leveldb-journal-java:
Local LevelDB journal
---------------------
The default journal plugin is ``akka.persistence.journal.leveldb`` which writes messages to a local LevelDB
instance. The default location of the LevelDB files is a directory named ``journal`` in the current working
directory. This location can be changed by configuration where the specified path can be relative or absolute:
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#journal-config
With this plugin, each actor system runs its own private LevelDB instance.
Shared LevelDB journal
----------------------
A LevelDB instance can also be shared by multiple actor systems (on the same or on different nodes). This, for
example, allows processors to failover to a backup node, assuming that the node, where the shared instance is
runnning, is accessible from the backup node.
.. warning::
A shared LevelDB instance is a single point of failure and should therefore only be used for testing
purposes.
A shared LevelDB instance can be created by instantiating the ``SharedLeveldbStore`` actor.
.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#shared-store-creation
By default, the shared instance writes journaled messages to a local directory named ``journal`` in the current
working directory. The storage location can be changed by configuration:
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#shared-store-config
Actor systems that use a shared LevelDB store must activate the ``akka.persistence.journal.leveldb-shared``
plugin.
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#shared-journal-config
This plugin must be initialized by injecting the (remote) ``SharedLeveldbStore`` actor reference. Injection is
done by calling the ``SharedLeveldbJournal.setStore`` method with the actor reference as argument.
.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#shared-store-usage
Internal journal commands (sent by processors) are buffered until injection completes. Injection is idempotent
i.e. only the first injection is used.
.. _local-snapshot-store-java:
Local snapshot store
--------------------
The default snapshot store plugin is ``akka.persistence.snapshot-store.local`` which writes snapshot files to
the local filesystem. The default storage location is a directory named ``snapshots`` in the current working
directory. This can be changed by configuration where the specified path can be relative or absolute:
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-config
Custom serialization Custom serialization
==================== ====================

View file

@ -71,6 +71,47 @@ class PersistencePluginDocSpec extends WordSpec {
} }
} }
object SharedLeveldbPluginDocSpec {
import akka.actor._
import akka.persistence.journal.leveldb.SharedLeveldbJournal
val config =
"""
//#shared-journal-config
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
//#shared-journal-config
//#shared-store-config
akka.persistence.journal.leveldb-shared.store.dir = "target/shared"
//#shared-store-config
"""
//#shared-store-usage
trait SharedStoreUsage extends Actor {
override def preStart(): Unit = {
context.actorSelection("akka.tcp://example@127.0.0.1:2552/user/store") ! Identify(1)
}
def receive = {
case ActorIdentity(1, Some(store))
SharedLeveldbJournal.setStore(store, context.system)
}
}
//#shared-store-usage
}
trait SharedLeveldbPluginDocSpec {
val system: ActorSystem
new AnyRef {
import akka.actor._
//#shared-store-creation
import akka.persistence.journal.leveldb.SharedLeveldbStore
val store = system.actorOf(Props[SharedLeveldbStore], "store")
//#shared-store-creation
}
}
class MyJournal extends AsyncWriteJournal { class MyJournal extends AsyncWriteJournal {
def writeAsync(persistentBatch: Seq[PersistentRepr]): Future[Unit] = ??? def writeAsync(persistentBatch: Seq[PersistentRepr]): Future[Unit] = ???
def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit] = ??? def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit] = ???

View file

@ -56,19 +56,6 @@ Architecture
* *Event sourcing*. Based on the building blocks described above, Akka persistence provides abstractions for the * *Event sourcing*. Based on the building blocks described above, Akka persistence provides abstractions for the
development of event sourced applications (see section :ref:`event-sourcing`) development of event sourced applications (see section :ref:`event-sourcing`)
Configuration
=============
By default, journaled messages are written to a directory named ``journal`` in the current working directory. This
can be changed by configuration where the specified path can be relative or absolute:
.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#journal-config
The default storage location of :ref:`snapshots` is a directory named ``snapshots`` in the current working directory.
This can be changed by configuration where the specified path can be relative or absolute:
.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-config
.. _processors: .. _processors:
Processors Processors
@ -418,10 +405,11 @@ will therefore never be done partially i.e. with only a subset of events persist
Storage plugins Storage plugins
=============== ===============
Storage backends for journals and snapshot stores are plugins in akka-persistence. The default journal plugin writes Storage backends for journals and snapshot stores are plugins in akka-persistence. The default journal plugin
messages to LevelDB. The default snapshot store plugin writes snapshots as individual files to the local filesystem. writes messages to LevelDB (see :ref:`local-leveldb-journal`). The default snapshot store plugin writes snapshots
Applications can provide their own plugins by implementing a plugin API and activate them by configuration. Plugin as individual files to the local filesystem (see :ref:`local-snapshot-store`). Applications can provide their own
development requires the following imports: plugins by implementing a plugin API and activate them by configuration. Plugin development requires the following
imports:
.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#plugin-imports .. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#plugin-imports
@ -465,6 +453,74 @@ A snapshot store plugin can be activated with the following minimal configuratio
The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher
used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``. used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``.
Pre-packaged plugins
====================
.. _local-leveldb-journal:
Local LevelDB journal
---------------------
The default journal plugin is ``akka.persistence.journal.leveldb`` which writes messages to a local LevelDB
instance. The default location of the LevelDB files is a directory named ``journal`` in the current working
directory. This location can be changed by configuration where the specified path can be relative or absolute:
.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#journal-config
With this plugin, each actor system runs its own private LevelDB instance.
Shared LevelDB journal
----------------------
A LevelDB instance can also be shared by multiple actor systems (on the same or on different nodes). This, for
example, allows processors to failover to a backup node, assuming that the node, where the shared instance is
runnning, is accessible from the backup node.
.. warning::
A shared LevelDB instance is a single point of failure and should therefore only be used for testing
purposes.
A shared LevelDB instance can be created by instantiating the ``SharedLeveldbStore`` actor.
.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#shared-store-creation
By default, the shared instance writes journaled messages to a local directory named ``journal`` in the current
working directory. The storage location can be changed by configuration:
.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#shared-store-config
Actor systems that use a shared LevelDB store must activate the ``akka.persistence.journal.leveldb-shared``
plugin.
.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#shared-journal-config
This plugin must be initialized by injecting the (remote) ``SharedLeveldbStore`` actor reference. Injection is
done by calling the ``SharedLeveldbJournal.setStore`` method with the actor reference as argument.
.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#shared-store-usage
Internal journal commands (sent by processors) are buffered until injection completes. Injection is idempotent
i.e. only the first injection is used.
.. _local-snapshot-store:
Local snapshot store
--------------------
The default snapshot store plugin is ``akka.persistence.snapshot-store.local`` which writes snapshot files to
the local filesystem. The default storage location is a directory named ``snapshots`` in the current working
directory. This can be changed by configuration where the specified path can be relative or absolute:
.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-config
Planned plugins
---------------
* Shared snapshot store (SPOF, for testing purposes)
* HA snapshot store backed by a distributed file system
* HA journal backed by a distributed (NoSQL) data store
Custom serialization Custom serialization
==================== ====================

View file

@ -69,6 +69,37 @@ akka {
# Native LevelDB (via JNI) or LevelDB Java port # Native LevelDB (via JNI) or LevelDB Java port
native = on native = on
} }
# Shared LevelDB journal plugin (for testing only).
leveldb-shared {
# Class name of the plugin.
class = "akka.persistence.journal.leveldb.SharedLeveldbJournal"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.actor.default-dispatcher"
store {
# Dispatcher for shared store actor.
store-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
# Dispatcher for message replay.
replay-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
# Storage location of LevelDB files.
dir = "journal"
# Use fsync on write
fsync = off
# Verify checksum on read.
checksum = off
# Native LevelDB (via JNI) or LevelDB Java port
native = on
}
}
} }
snapshot-store { snapshot-store {

View file

@ -75,10 +75,10 @@ private[persistence] trait Eventsourced extends Processor {
case p: PersistentRepr case p: PersistentRepr
deleteMessage(p.sequenceNr, true) deleteMessage(p.sequenceNr, true)
throw new UnsupportedOperationException("Persistent commands not supported") throw new UnsupportedOperationException("Persistent commands not supported")
case WriteSuccess(p) if identical(p.payload, persistInvocations.head._1) case WriteSuccess(p)
withCurrentPersistent(p)(p persistInvocations.head._2(p.payload)) withCurrentPersistent(p)(p persistInvocations.head._2(p.payload))
onWriteComplete() onWriteComplete()
case e @ WriteFailure(p, _) if identical(p.payload, persistInvocations.head._1) case e @ WriteFailure(p, _)
Eventsourced.super.aroundReceive(receive, message) // stops actor by default Eventsourced.super.aroundReceive(receive, message) // stops actor by default
onWriteComplete() onWriteComplete()
case s @ WriteBatchSuccess Eventsourced.super.aroundReceive(receive, s) case s @ WriteBatchSuccess Eventsourced.super.aroundReceive(receive, s)
@ -93,9 +93,6 @@ private[persistence] trait Eventsourced extends Processor {
processorStash.unstash() processorStash.unstash()
} }
} }
def identical(a: Any, b: Any): Boolean =
a.asInstanceOf[AnyRef] eq b.asInstanceOf[AnyRef]
} }
private var persistInvocations: List[(Any, Any Unit)] = Nil private var persistInvocations: List[(Any, Any Unit)] = Nil

View file

@ -9,6 +9,8 @@ import scala.collection.immutable
import akka.actor._ import akka.actor._
/** /**
* INTERNAL API.
*
* Defines messages exchanged between processors, channels and a journal. * Defines messages exchanged between processors, channels and a journal.
*/ */
private[persistence] object JournalProtocol { private[persistence] object JournalProtocol {

View file

@ -302,6 +302,9 @@ private[persistence] case class ConfirmablePersistentImpl(
copy(sequenceNr = sequenceNr, processorId = processorId, deleted = deleted, resolved = resolved, confirms = confirms, confirmMessage = confirmMessage, confirmTarget = confirmTarget, sender = sender) copy(sequenceNr = sequenceNr, processorId = processorId, deleted = deleted, resolved = resolved, confirms = confirms, confirmMessage = confirmMessage, confirmTarget = confirmTarget, sender = sender)
} }
/**
* INTERNAL API.
*/
private[persistence] object ConfirmablePersistentImpl { private[persistence] object ConfirmablePersistentImpl {
def apply(persistent: PersistentRepr, confirmMessage: Confirm, confirmTarget: ActorRef): ConfirmablePersistentImpl = def apply(persistent: PersistentRepr, confirmMessage: Confirm, confirmTarget: ActorRef): ConfirmablePersistentImpl =
ConfirmablePersistentImpl(persistent.payload, persistent.sequenceNr, persistent.processorId, persistent.deleted, persistent.resolved, persistent.confirms, confirmMessage, confirmTarget, persistent.sender) ConfirmablePersistentImpl(persistent.payload, persistent.sequenceNr, persistent.processorId, persistent.deleted, persistent.resolved, persistent.confirms, confirmMessage, confirmTarget, persistent.sender)

View file

@ -108,6 +108,8 @@ object SelectedSnapshot {
} }
/** /**
* INTERNAL API.
*
* Defines messages exchanged between processors and a snapshot store. * Defines messages exchanged between processors and a snapshot store.
*/ */
private[persistence] object SnapshotProtocol { private[persistence] object SnapshotProtocol {

View file

@ -26,7 +26,7 @@ trait AsyncWriteJournal extends Actor with AsyncReplay {
private val resequencer = context.actorOf(Props[Resequencer]) private val resequencer = context.actorOf(Props[Resequencer])
private var resequencerCounter = 1L private var resequencerCounter = 1L
final def receive = { def receive = {
case WriteBatch(persistentBatch, processor) case WriteBatch(persistentBatch, processor)
val cctr = resequencerCounter val cctr = resequencerCounter
def resequence(f: PersistentRepr Any) = persistentBatch.zipWithIndex.foreach { def resequence(f: PersistentRepr Any) = persistentBatch.zipWithIndex.foreach {
@ -92,6 +92,9 @@ trait AsyncWriteJournal extends Actor with AsyncReplay {
//#journal-plugin-api //#journal-plugin-api
} }
/**
* INTERNAL API.
*/
private[persistence] object AsyncWriteJournal { private[persistence] object AsyncWriteJournal {
case class Desequenced(msg: Any, snr: Long, target: ActorRef, sender: ActorRef) case class Desequenced(msg: Any, snr: Long, target: ActorRef, sender: ActorRef)

View file

@ -0,0 +1,110 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.journal
import scala.collection.immutable
import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.language.postfixOps
import akka.AkkaException
import akka.actor._
import akka.pattern.ask
import akka.persistence._
import akka.util._
/**
* INTERNAL API.
*
* A journal that delegates actual storage to a target actor. For testing only.
*/
private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash {
import AsyncWriteProxy._
import AsyncWriteTarget._
private val initialized = super.receive
private var store: ActorRef = _
override def receive = {
case SetStore(ref)
store = ref
unstashAll()
context.become(initialized)
case _ stash()
}
implicit def timeout: Timeout
def writeAsync(persistentBatch: immutable.Seq[PersistentRepr]): Future[Unit] =
(store ? WriteBatch(persistentBatch)).mapTo[Unit]
def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit] =
(store ? Delete(processorId, fromSequenceNr, toSequenceNr, permanent)).mapTo[Unit]
def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] =
(store ? Confirm(processorId, sequenceNr, channelId)).mapTo[Unit]
def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentRepr) Unit): Future[Long] = {
val replayCompletionPromise = Promise[Long]
val mediator = context.actorOf(Props(classOf[ReplayMediator], replayCallback, replayCompletionPromise, timeout.duration).withDeploy(Deploy.local))
store.tell(Replay(processorId, fromSequenceNr, toSequenceNr), mediator)
replayCompletionPromise.future
}
}
/**
* INTERNAL API.
*/
private[persistence] object AsyncWriteProxy {
case class SetStore(ref: ActorRef)
}
/**
* INTERNAL API.
*/
private[persistence] object AsyncWriteTarget {
@SerialVersionUID(1L)
case class WriteBatch(pb: immutable.Seq[PersistentRepr])
@SerialVersionUID(1L)
case class Delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean)
@SerialVersionUID(1L)
case class Confirm(processorId: String, sequenceNr: Long, channelId: String)
@SerialVersionUID(1L)
case class Replay(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)
@SerialVersionUID(1L)
case class ReplaySuccess(maxSequenceNr: Long)
@SerialVersionUID(1L)
case class ReplayFailure(cause: Throwable)
}
/**
* Thrown if replay inactivity exceeds a specified timeout.
*/
@SerialVersionUID(1L)
class AsyncReplayTimeoutException(msg: String) extends AkkaException(msg)
private class ReplayMediator(replayCallback: PersistentRepr Unit, replayCompletionPromise: Promise[Long], replayTimeout: Duration) extends Actor {
import AsyncWriteTarget._
context.setReceiveTimeout(replayTimeout)
def receive = {
case p: PersistentRepr replayCallback(p)
case ReplaySuccess(maxSnr)
replayCompletionPromise.success(maxSnr)
context.stop(self)
case ReplayFailure(cause)
replayCompletionPromise.failure(cause)
context.stop(self)
case ReceiveTimeout
replayCompletionPromise.failure(new AsyncReplayTimeoutException(s"replay timed out after ${replayTimeout.toSeconds} seconds inactivity"))
context.stop(self)
}
}

View file

@ -5,39 +5,29 @@
package akka.persistence.journal.inmem package akka.persistence.journal.inmem
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.language.postfixOps import scala.language.postfixOps
import akka.actor._ import akka.actor._
import akka.pattern.ask
import akka.persistence._ import akka.persistence._
import akka.persistence.journal.AsyncWriteJournal import akka.persistence.journal.AsyncWriteProxy
import akka.util._ import akka.persistence.journal.AsyncWriteTarget
import akka.util.Timeout
/** /**
* INTERNAL API. * INTERNAL API.
* *
* In-memory journal for testing purposes only. * In-memory journal for testing purposes only.
*/ */
private[persistence] class InmemJournal extends AsyncWriteJournal { private[persistence] class InmemJournal extends AsyncWriteProxy {
val store = context.actorOf(Props[InmemStore]) import AsyncWriteProxy.SetStore
implicit val timeout = Timeout(5 seconds) val timeout = Timeout(5 seconds)
import InmemStore._ override def preStart(): Unit = {
super.preStart()
def writeAsync(persistentBatch: immutable.Seq[PersistentRepr]): Future[Unit] = self ! SetStore(context.actorOf(Props[InmemStore]))
(store ? WriteBatch(persistentBatch)).mapTo[Unit] }
def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit] =
(store ? Delete(processorId, fromSequenceNr, toSequenceNr, permanent)).mapTo[Unit]
def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] =
(store ? Confirm(processorId, sequenceNr, channelId)).mapTo[Unit]
def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentRepr) Unit): Future[Long] =
(store ? Replay(processorId, fromSequenceNr, toSequenceNr, replayCallback)).mapTo[Long]
} }
/** /**
@ -80,33 +70,19 @@ private[persistence] trait InmemMessages {
* INTERNAL API. * INTERNAL API.
*/ */
private[persistence] class InmemStore extends Actor with InmemMessages { private[persistence] class InmemStore extends Actor with InmemMessages {
import InmemStore._ import AsyncWriteTarget._
def receive = { def receive = {
case WriteBatch(pb) case WriteBatch(pb)
pb.foreach(add) sender ! pb.foreach(add)
success()
case Delete(pid, fsnr, tsnr, false) case Delete(pid, fsnr, tsnr, false)
fsnr to tsnr foreach { snr update(pid, snr)(_.update(deleted = true)) } sender ! (fsnr to tsnr foreach { snr update(pid, snr)(_.update(deleted = true)) })
success()
case Delete(pid, fsnr, tsnr, true) case Delete(pid, fsnr, tsnr, true)
fsnr to tsnr foreach { snr delete(pid, snr) } sender ! (fsnr to tsnr foreach { snr delete(pid, snr) })
success()
case Confirm(pid, snr, cid) case Confirm(pid, snr, cid)
update(pid, snr)(p p.update(confirms = cid +: p.confirms)) sender ! update(pid, snr)(p p.update(confirms = cid +: p.confirms))
success() case Replay(pid, fromSnr, toSnr)
case Replay(pid, fromSnr, toSnr, callback) read(pid, fromSnr, toSnr).foreach(sender ! _)
read(pid, fromSnr, toSnr).foreach(callback) sender ! ReplaySuccess(maxSequenceNr(pid))
success(maxSequenceNr(pid))
} }
private def success(reply: Any = ()) =
sender ! reply
}
private[persistence] object InmemStore {
case class WriteBatch(pb: Seq[PersistentRepr])
case class Delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean)
case class Confirm(processorId: String, sequenceNr: Long, channelId: String)
case class Replay(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, replayCallback: (PersistentRepr) Unit)
} }

View file

@ -13,7 +13,7 @@ import akka.actor.Actor
* *
* LevelDB backed persistent mapping of `String`-based processor and channel ids to numeric ids. * LevelDB backed persistent mapping of `String`-based processor and channel ids to numeric ids.
*/ */
private[persistence] trait LeveldbIdMapping extends Actor { this: LeveldbJournal private[persistence] trait LeveldbIdMapping extends Actor { this: LeveldbStore
import Key._ import Key._
private val idOffset = 10 private val idOffset = 10

View file

@ -1,94 +1,38 @@
/** /**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
* Copyright (C) 2012-2013 Eligotech BV.
*/ */
package akka.persistence.journal.leveldb package akka.persistence.journal.leveldb
import java.io.File import scala.concurrent.duration._
import scala.language.postfixOps
import scala.collection.immutable import akka.actor._
import akka.persistence.Persistence
import org.iq80.leveldb._ import akka.persistence.journal._
import akka.util.Timeout
import akka.persistence._
import akka.persistence.journal.SyncWriteJournal
import akka.serialization.SerializationExtension
/** /**
* INTERNAL API. * INTERNAL API.
* *
* LevelDB backed journal. * Journal backed by a local LevelDB store. For production use.
*/ */
private[persistence] class LeveldbJournal extends SyncWriteJournal with LeveldbIdMapping with LeveldbReplay { private[persistence] class LeveldbJournal extends { val configPath = "akka.persistence.journal.leveldb" } with SyncWriteJournal with LeveldbStore
val config = context.system.settings.config.getConfig("akka.persistence.journal.leveldb")
val nativeLeveldb = config.getBoolean("native")
val leveldbOptions = new Options().createIfMissing(true) /**
val leveldbReadOptions = new ReadOptions().verifyChecksums(config.getBoolean("checksum")) * INTERNAL API.
val leveldbWriteOptions = new WriteOptions().sync(config.getBoolean("fsync")) *
val leveldbDir = new File(config.getString("dir")) * Journal backed by a [[SharedLeveldbStore]]. For testing only.
var leveldb: DB = _ */
private[persistence] class SharedLeveldbJournal extends AsyncWriteProxy {
def leveldbFactory = val timeout: Timeout = Timeout(10 seconds) // TODO: make configurable
if (nativeLeveldb) org.fusesource.leveldbjni.JniDBFactory.factory }
else org.iq80.leveldb.impl.Iq80DBFactory.factory
object SharedLeveldbJournal {
// TODO: support migration of processor and channel ids /**
// needed if default processor and channel ids are used * Sets the shared LevelDB `store` for the given actor `system`.
// (actor paths, which contain deployment information). *
* @see [[SharedLeveldbStore]]
val serialization = SerializationExtension(context.system) */
def setStore(store: ActorRef, system: ActorSystem): Unit =
import Key._ Persistence(system).journalFor(null) ! AsyncWriteProxy.SetStore(store)
def write(persistentBatch: immutable.Seq[PersistentRepr]) =
withBatch(batch persistentBatch.foreach(persistent addToBatch(persistent, batch)))
def delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean) = withBatch { batch
val nid = numericId(processorId)
if (permanent) fromSequenceNr to toSequenceNr foreach { sequenceNr
batch.delete(keyToBytes(Key(nid, sequenceNr, 0))) // TODO: delete confirmations and deletion markers, if any.
}
else fromSequenceNr to toSequenceNr foreach { sequenceNr
batch.put(keyToBytes(deletionKey(nid, sequenceNr)), Array.empty[Byte])
}
}
def confirm(processorId: String, sequenceNr: Long, channelId: String) {
leveldb.put(keyToBytes(Key(numericId(processorId), sequenceNr, numericId(channelId))), channelId.getBytes("UTF-8"))
}
def leveldbSnapshot = leveldbReadOptions.snapshot(leveldb.getSnapshot)
def leveldbIterator = leveldb.iterator(leveldbSnapshot)
def persistentToBytes(p: PersistentRepr): Array[Byte] = serialization.serialize(p).get
def persistentFromBytes(a: Array[Byte]): PersistentRepr = serialization.deserialize(a, classOf[PersistentRepr]).get
private def addToBatch(persistent: PersistentRepr, batch: WriteBatch): Unit = {
val nid = numericId(persistent.processorId)
batch.put(keyToBytes(counterKey(nid)), counterToBytes(persistent.sequenceNr))
batch.put(keyToBytes(Key(nid, persistent.sequenceNr, 0)), persistentToBytes(persistent))
}
private def withBatch[R](body: WriteBatch R): R = {
val batch = leveldb.createWriteBatch()
try {
val r = body(batch)
leveldb.write(batch, leveldbWriteOptions)
r
} finally {
batch.close()
}
}
override def preStart() {
leveldb = leveldbFactory.open(leveldbDir, if (nativeLeveldb) leveldbOptions else leveldbOptions.compressionType(CompressionType.NONE))
super.preStart()
}
override def postStop() {
leveldb.close()
super.postStop()
}
} }

View file

@ -15,16 +15,16 @@ import akka.persistence.journal.AsyncReplay
* *
* LevelDB backed message replay. * LevelDB backed message replay.
*/ */
private[persistence] trait LeveldbReplay extends AsyncReplay { this: LeveldbJournal private[persistence] trait LeveldbReplay extends AsyncReplay { this: LeveldbStore
import Key._ import Key._
private val replayDispatcherId = context.system.settings.config.getString("akka.persistence.journal.leveldb.replay-dispatcher") private lazy val replayDispatcherId = config.getString("replay-dispatcher")
private val replayDispatcher = context.system.dispatchers.lookup(replayDispatcherId) private lazy val replayDispatcher = context.system.dispatchers.lookup(replayDispatcherId)
def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: PersistentRepr Unit): Future[Long] = def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: PersistentRepr Unit): Future[Long] =
Future(replay(numericId(processorId), fromSequenceNr: Long, toSequenceNr)(replayCallback))(replayDispatcher) Future(replay(numericId(processorId), fromSequenceNr: Long, toSequenceNr)(replayCallback))(replayDispatcher)
private def replay(processorId: Int, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: PersistentRepr Unit): Long = { def replay(processorId: Int, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: PersistentRepr Unit): Long = {
val iter = leveldbIterator val iter = leveldbIterator
@scala.annotation.tailrec @scala.annotation.tailrec

View file

@ -0,0 +1,121 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
* Copyright (C) 2012-2013 Eligotech BV.
*/
package akka.persistence.journal.leveldb
import java.io.File
import scala.collection.immutable
import scala.util._
import org.iq80.leveldb._
import akka.actor._
import akka.persistence._
import akka.persistence.journal.AsyncWriteTarget
import akka.serialization.SerializationExtension
/**
* INTERNAL API.
*/
private[persistence] trait LeveldbStore extends Actor with LeveldbIdMapping with LeveldbReplay {
val configPath: String
val config = context.system.settings.config.getConfig(configPath)
val nativeLeveldb = config.getBoolean("native")
val leveldbOptions = new Options().createIfMissing(true)
val leveldbReadOptions = new ReadOptions().verifyChecksums(config.getBoolean("checksum"))
val leveldbWriteOptions = new WriteOptions().sync(config.getBoolean("fsync"))
val leveldbDir = new File(config.getString("dir"))
var leveldb: DB = _
def leveldbFactory =
if (nativeLeveldb) org.fusesource.leveldbjni.JniDBFactory.factory
else org.iq80.leveldb.impl.Iq80DBFactory.factory
// TODO: support migration of processor and channel ids
// needed if default processor and channel ids are used
// (actor paths, which contain deployment information).
val serialization = SerializationExtension(context.system)
import Key._
def write(persistentBatch: immutable.Seq[PersistentRepr]) =
withBatch(batch persistentBatch.foreach(persistent addToBatch(persistent, batch)))
def delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean) = withBatch { batch
val nid = numericId(processorId)
if (permanent) fromSequenceNr to toSequenceNr foreach { sequenceNr
batch.delete(keyToBytes(Key(nid, sequenceNr, 0))) // TODO: delete confirmations and deletion markers, if any.
}
else fromSequenceNr to toSequenceNr foreach { sequenceNr
batch.put(keyToBytes(deletionKey(nid, sequenceNr)), Array.empty[Byte])
}
}
def confirm(processorId: String, sequenceNr: Long, channelId: String) {
leveldb.put(keyToBytes(Key(numericId(processorId), sequenceNr, numericId(channelId))), channelId.getBytes("UTF-8"))
}
def leveldbSnapshot = leveldbReadOptions.snapshot(leveldb.getSnapshot)
def leveldbIterator = leveldb.iterator(leveldbSnapshot)
def persistentToBytes(p: PersistentRepr): Array[Byte] = serialization.serialize(p).get
def persistentFromBytes(a: Array[Byte]): PersistentRepr = serialization.deserialize(a, classOf[PersistentRepr]).get
private def addToBatch(persistent: PersistentRepr, batch: WriteBatch): Unit = {
val nid = numericId(persistent.processorId)
batch.put(keyToBytes(counterKey(nid)), counterToBytes(persistent.sequenceNr))
batch.put(keyToBytes(Key(nid, persistent.sequenceNr, 0)), persistentToBytes(persistent))
}
private def withBatch[R](body: WriteBatch R): R = {
val batch = leveldb.createWriteBatch()
try {
val r = body(batch)
leveldb.write(batch, leveldbWriteOptions)
r
} finally {
batch.close()
}
}
override def preStart() {
leveldb = leveldbFactory.open(leveldbDir, if (nativeLeveldb) leveldbOptions else leveldbOptions.compressionType(CompressionType.NONE))
super.preStart()
}
override def postStop() {
leveldb.close()
super.postStop()
}
}
/**
* A LevelDB store that can be shared by multiple actor systems. The shared store must be
* set for each actor system that uses the store via `SharedLeveldbJournal.setStore`. The
* shared LevelDB store is for testing only.
*/
class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.leveldb-shared.store" } with LeveldbStore {
import AsyncWriteTarget._
def receive = {
case WriteBatch(pb) sender ! write(pb)
case Delete(pid, fsnr, tsnr, permanent) sender ! delete(pid, fsnr, tsnr, permanent)
case Confirm(pid, snr, cid) sender ! confirm(pid, snr, cid)
case Replay(pid, fromSnr, toSnr)
val npid = numericId(pid)
val res = for {
_ Try(replay(npid, fromSnr, toSnr)(sender ! _))
max Try(maxSequenceNr(npid))
} yield max
res match {
case Success(max) sender ! ReplaySuccess(max)
case Failure(cause) sender ! ReplayFailure(cause)
}
}
}

View file

@ -93,7 +93,7 @@ object PerformanceSpec {
} }
} }
class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "performance").withFallback(ConfigFactory.parseString(PerformanceSpec.config))) with PersistenceSpec with ImplicitSender { class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "performance", serialization = "off").withFallback(ConfigFactory.parseString(PerformanceSpec.config))) with PersistenceSpec with ImplicitSender {
import PerformanceSpec._ import PerformanceSpec._
val warmupCycles = system.settings.config.getInt("akka.persistence.performance.cycles.warmup") val warmupCycles = system.settings.config.getInt("akka.persistence.performance.cycles.warmup")

View file

@ -46,10 +46,10 @@ trait PersistenceSpec extends BeforeAndAfterEach with Cleanup { this: AkkaSpec
} }
object PersistenceSpec { object PersistenceSpec {
def config(plugin: String, test: String) = ConfigFactory.parseString( def config(plugin: String, test: String, serialization: String = "on") = ConfigFactory.parseString(
s""" s"""
serialize-creators = on akka.actor.serialize-creators = ${serialization}
serialize-messages = on akka.actor.serialize-messages = ${serialization}
akka.persistence.publish-plugin-commands = on akka.persistence.publish-plugin-commands = on
akka.persistence.journal.plugin = "akka.persistence.journal.${plugin}" akka.persistence.journal.plugin = "akka.persistence.journal.${plugin}"
akka.persistence.journal.leveldb.dir = "target/journal-${test}-spec" akka.persistence.journal.leveldb.dir = "target/journal-${test}-spec"
@ -60,6 +60,7 @@ object PersistenceSpec {
trait Cleanup { this: AkkaSpec trait Cleanup { this: AkkaSpec
val storageLocations = List( val storageLocations = List(
"akka.persistence.journal.leveldb.dir", "akka.persistence.journal.leveldb.dir",
"akka.persistence.journal.leveldb-shared.store.dir",
"akka.persistence.snapshot-store.local.dir").map(s new File(system.settings.config.getString(s))) "akka.persistence.snapshot-store.local.dir").map(s new File(system.settings.config.getString(s)))
override protected def atStartup() { override protected def atStartup() {

View file

@ -0,0 +1,99 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.journal.leveldb
import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.persistence._
import akka.testkit.{ TestProbe, AkkaSpec }
object SharedLeveldbJournalSpec {
val config =
"""
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
persistence {
journal {
plugin = "akka.persistence.journal.leveldb-shared"
leveldb-shared.store.dir = target/shared-journal
}
snapshot-store.local.dir = target/snapshot-store
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
loglevel = ERROR
log-dead-letters = 0
log-dead-letters-during-shutdown = off
}
"""
class ExampleProcessor(probe: ActorRef, name: String) extends NamedProcessor(name) {
def receive = {
case Persistent(payload, _) probe ! payload
}
}
class ExampleApp(probe: ActorRef, port: Int) extends Actor {
val processor = context.actorOf(Props(classOf[ExampleProcessor], probe, context.system.name))
def receive = {
case ActorIdentity(1, Some(store)) SharedLeveldbJournal.setStore(store, context.system)
case m processor forward m
}
override def preStart(): Unit = {
context.actorSelection(s"akka.tcp://store@127.0.0.1:${port}/user/store") ! Identify(1)
}
}
def port(system: ActorSystem) =
system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress.port.get
}
class SharedLeveldbJournalSpec extends AkkaSpec(SharedLeveldbJournalSpec.config) with Cleanup {
import SharedLeveldbJournalSpec._
"A LevelDB store" can {
"be shared by multiple actor systems" in {
val storeSystem = ActorSystem("store", ConfigFactory.parseString(SharedLeveldbJournalSpec.config))
val processorASystem = ActorSystem("processorA", ConfigFactory.parseString(SharedLeveldbJournalSpec.config))
val processorBSystem = ActorSystem("processorB", ConfigFactory.parseString(SharedLeveldbJournalSpec.config))
val processorAProbe = new TestProbe(processorASystem)
val processorBProbe = new TestProbe(processorBSystem)
storeSystem.actorOf(Props[SharedLeveldbStore], "store")
val appA = processorASystem.actorOf(Props(classOf[ExampleApp], processorAProbe.ref, port(storeSystem)))
val appB = processorBSystem.actorOf(Props(classOf[ExampleApp], processorBProbe.ref, port(storeSystem)))
appA ! Persistent("a1")
appB ! Persistent("b1")
processorAProbe.expectMsg("a1")
processorBProbe.expectMsg("b1")
val recoveredAppA = processorASystem.actorOf(Props(classOf[ExampleApp], processorAProbe.ref, port(storeSystem)))
val recoveredAppB = processorBSystem.actorOf(Props(classOf[ExampleApp], processorBProbe.ref, port(storeSystem)))
recoveredAppA ! Persistent("a2")
recoveredAppB ! Persistent("b2")
processorAProbe.expectMsg("a1")
processorAProbe.expectMsg("a2")
processorBProbe.expectMsg("b1")
processorBProbe.expectMsg("b2")
}
}
}

View file

@ -1,3 +1,7 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.serialization package akka.persistence.serialization
import scala.collection.immutable import scala.collection.immutable
@ -10,12 +14,6 @@ import akka.serialization._
import akka.testkit._ import akka.testkit._
object SerializerSpecConfigs { object SerializerSpecConfigs {
val common =
"""
serialize-creators = on
serialize-messages = on
"""
val customSerializers = val customSerializers =
""" """
akka.actor { akka.actor {
@ -50,7 +48,7 @@ object SerializerSpecConfigs {
val systemB = "akka.remote.netty.tcp.port = 0" val systemB = "akka.remote.netty.tcp.port = 0"
def config(configs: String*): Config = def config(configs: String*): Config =
configs.foldLeft(ConfigFactory.parseString(common))((r, c) r.withFallback(ConfigFactory.parseString(c))) configs.foldLeft(ConfigFactory.empty)((r, c) r.withFallback(ConfigFactory.parseString(c)))
} }
import SerializerSpecConfigs._ import SerializerSpecConfigs._