From d0bc8a6400cd43ae3872bf18d3438d208a626636 Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Mon, 25 Nov 2013 12:02:29 +0100 Subject: [PATCH] +per #3746 Remote sharing of LevelDB for testing purposes Further changes - remove obsolete identity checks in Eventsourced - fix wrong serialize-messages config in tests --- .../persistence/PersistencePluginDocTest.java | 37 ++++++ akka-docs/rst/java/persistence.rst | 83 +++++++++--- .../PersistencePluginDocSpec.scala | 41 ++++++ akka-docs/rst/scala/persistence.rst | 90 ++++++++++--- .../src/main/resources/reference.conf | 31 +++++ .../scala/akka/persistence/Eventsourced.scala | 7 +- .../akka/persistence/JournalProtocol.scala | 2 + .../scala/akka/persistence/Persistent.scala | 3 + .../scala/akka/persistence/Snapshot.scala | 2 + .../journal/AsyncWriteJournal.scala | 5 +- .../persistence/journal/AsyncWriteProxy.scala | 110 ++++++++++++++++ .../journal/inmem/InmemJournal.scala | 60 +++------ .../journal/leveldb/LeveldbIdMapping.scala | 2 +- .../journal/leveldb/LeveldbJournal.scala | 106 ++++----------- .../journal/leveldb/LeveldbReplay.scala | 8 +- .../journal/leveldb/LeveldbStore.scala | 121 ++++++++++++++++++ .../akka/persistence/PerformanceSpec.scala | 2 +- .../akka/persistence/PersistenceSpec.scala | 7 +- .../leveldb/SharedLeveldbJournalSpec.scala | 99 ++++++++++++++ .../serialization/SerializerSpec.scala | 12 +- 20 files changed, 649 insertions(+), 179 deletions(-) create mode 100644 akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala create mode 100644 akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala create mode 100644 akka-persistence/src/test/scala/akka/persistence/journal/leveldb/SharedLeveldbJournalSpec.scala diff --git a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java index 97068da28b..962e40669b 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java @@ -5,6 +5,7 @@ package docs.persistence; //#plugin-imports +import akka.actor.UntypedActor; import scala.concurrent.Future; import akka.japi.Option; import akka.japi.Procedure; @@ -12,8 +13,44 @@ import akka.persistence.*; import akka.persistence.journal.japi.*; import akka.persistence.snapshot.japi.*; //#plugin-imports +import akka.actor.*; +import akka.persistence.journal.leveldb.SharedLeveldbJournal; +import akka.persistence.journal.leveldb.SharedLeveldbStore; public class PersistencePluginDocTest { + + + static Object o1 = new Object() { + final ActorSystem system = null; + //#shared-store-creation + final ActorRef store = system.actorOf(Props.create(SharedLeveldbStore.class), "store"); + //#shared-store-creation + + //#shared-store-usage + class SharedStorageUsage extends UntypedActor { + @Override + public void preStart() throws Exception { + String path = "akka.tcp://example@127.0.0.1:2552/user/store"; + ActorSelection selection = getContext().actorSelection(path); + selection.tell(new Identify(1), getSelf()); + } + + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof ActorIdentity) { + ActorIdentity identity = (ActorIdentity) message; + if (identity.correlationId().equals(1)) { + ActorRef store = identity.getRef(); + if (store != null) { + SharedLeveldbJournal.setStore(store, getContext().system()); + } + } + } + } + } + //#shared-store-usage + }; + class MySnapshotStore extends SnapshotStore { @Override public Future> doLoadAsync(String processorId, SnapshotSelectionCriteria criteria) { diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 7ecb5436c8..31fbb8fb32 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -60,19 +60,6 @@ Architecture * *Event sourcing*. Based on the building blocks described above, Akka persistence provides abstractions for the development of event sourced applications (see section :ref:`event-sourcing-java`) -Configuration -============= - -By default, journaled messages are written to a directory named ``journal`` in the current working directory. This -can be changed by configuration where the specified path can be relative or absolute: - -.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#journal-config - -The default storage location of :ref:`snapshots-java` is a directory named ``snapshots`` in the current working directory. -This can be changed by configuration where the specified path can be relative or absolute: - -.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-config - .. _processors-java: Processors @@ -407,10 +394,11 @@ will therefore never be done partially i.e. with only a subset of events persist Storage plugins =============== -Storage backends for journals and snapshot stores are plugins in akka-persistence. The default journal plugin writes -messages to LevelDB. The default snapshot store plugin writes snapshots as individual files to the local filesystem. -Applications can provide their own plugins by implementing a plugin API and activate them by configuration. Plugin -development requires the following imports: +Storage backends for journals and snapshot stores are plugins in akka-persistence. The default journal plugin +writes messages to LevelDB (see :ref:`local-leveldb-journal-java`). The default snapshot store plugin writes snapshots +as individual files to the local filesystem (see :ref:`local-snapshot-store-java`). Applications can provide their own +plugins by implementing a plugin API and activate them by configuration. Plugin development requires the following +imports: .. includecode:: code/docs/persistence/PersistencePluginDocTest.java#plugin-imports @@ -454,6 +442,67 @@ A snapshot store plugin can be activated with the following minimal configuratio The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``. +Pre-packaged plugins +==================== + +.. _local-leveldb-journal-java: + +Local LevelDB journal +--------------------- + +The default journal plugin is ``akka.persistence.journal.leveldb`` which writes messages to a local LevelDB +instance. The default location of the LevelDB files is a directory named ``journal`` in the current working +directory. This location can be changed by configuration where the specified path can be relative or absolute: + +.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#journal-config + +With this plugin, each actor system runs its own private LevelDB instance. + +Shared LevelDB journal +---------------------- + +A LevelDB instance can also be shared by multiple actor systems (on the same or on different nodes). This, for +example, allows processors to failover to a backup node, assuming that the node, where the shared instance is +runnning, is accessible from the backup node. + +.. warning:: + + A shared LevelDB instance is a single point of failure and should therefore only be used for testing + purposes. + +A shared LevelDB instance can be created by instantiating the ``SharedLeveldbStore`` actor. + +.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#shared-store-creation + +By default, the shared instance writes journaled messages to a local directory named ``journal`` in the current +working directory. The storage location can be changed by configuration: + +.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#shared-store-config + +Actor systems that use a shared LevelDB store must activate the ``akka.persistence.journal.leveldb-shared`` +plugin. + +.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#shared-journal-config + +This plugin must be initialized by injecting the (remote) ``SharedLeveldbStore`` actor reference. Injection is +done by calling the ``SharedLeveldbJournal.setStore`` method with the actor reference as argument. + +.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#shared-store-usage + +Internal journal commands (sent by processors) are buffered until injection completes. Injection is idempotent +i.e. only the first injection is used. + +.. _local-snapshot-store-java: + +Local snapshot store +-------------------- + +The default snapshot store plugin is ``akka.persistence.snapshot-store.local`` which writes snapshot files to +the local filesystem. The default storage location is a directory named ``snapshots`` in the current working +directory. This can be changed by configuration where the specified path can be relative or absolute: + +.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-config + Custom serialization ==================== diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala index 4c261c2902..f25d7686cd 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala @@ -71,6 +71,47 @@ class PersistencePluginDocSpec extends WordSpec { } } +object SharedLeveldbPluginDocSpec { + import akka.actor._ + import akka.persistence.journal.leveldb.SharedLeveldbJournal + + val config = + """ + //#shared-journal-config + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" + //#shared-journal-config + //#shared-store-config + akka.persistence.journal.leveldb-shared.store.dir = "target/shared" + //#shared-store-config + """ + + //#shared-store-usage + trait SharedStoreUsage extends Actor { + override def preStart(): Unit = { + context.actorSelection("akka.tcp://example@127.0.0.1:2552/user/store") ! Identify(1) + } + + def receive = { + case ActorIdentity(1, Some(store)) ⇒ + SharedLeveldbJournal.setStore(store, context.system) + } + } + //#shared-store-usage +} + +trait SharedLeveldbPluginDocSpec { + val system: ActorSystem + + new AnyRef { + import akka.actor._ + //#shared-store-creation + import akka.persistence.journal.leveldb.SharedLeveldbStore + + val store = system.actorOf(Props[SharedLeveldbStore], "store") + //#shared-store-creation + } +} + class MyJournal extends AsyncWriteJournal { def writeAsync(persistentBatch: Seq[PersistentRepr]): Future[Unit] = ??? def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit] = ??? diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 1d54ceb462..d98280b995 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -56,19 +56,6 @@ Architecture * *Event sourcing*. Based on the building blocks described above, Akka persistence provides abstractions for the development of event sourced applications (see section :ref:`event-sourcing`) -Configuration -============= - -By default, journaled messages are written to a directory named ``journal`` in the current working directory. This -can be changed by configuration where the specified path can be relative or absolute: - -.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#journal-config - -The default storage location of :ref:`snapshots` is a directory named ``snapshots`` in the current working directory. -This can be changed by configuration where the specified path can be relative or absolute: - -.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-config - .. _processors: Processors @@ -418,10 +405,11 @@ will therefore never be done partially i.e. with only a subset of events persist Storage plugins =============== -Storage backends for journals and snapshot stores are plugins in akka-persistence. The default journal plugin writes -messages to LevelDB. The default snapshot store plugin writes snapshots as individual files to the local filesystem. -Applications can provide their own plugins by implementing a plugin API and activate them by configuration. Plugin -development requires the following imports: +Storage backends for journals and snapshot stores are plugins in akka-persistence. The default journal plugin +writes messages to LevelDB (see :ref:`local-leveldb-journal`). The default snapshot store plugin writes snapshots +as individual files to the local filesystem (see :ref:`local-snapshot-store`). Applications can provide their own +plugins by implementing a plugin API and activate them by configuration. Plugin development requires the following +imports: .. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#plugin-imports @@ -465,6 +453,74 @@ A snapshot store plugin can be activated with the following minimal configuratio The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``. +Pre-packaged plugins +==================== + +.. _local-leveldb-journal: + +Local LevelDB journal +--------------------- + +The default journal plugin is ``akka.persistence.journal.leveldb`` which writes messages to a local LevelDB +instance. The default location of the LevelDB files is a directory named ``journal`` in the current working +directory. This location can be changed by configuration where the specified path can be relative or absolute: + +.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#journal-config + +With this plugin, each actor system runs its own private LevelDB instance. + +Shared LevelDB journal +---------------------- + +A LevelDB instance can also be shared by multiple actor systems (on the same or on different nodes). This, for +example, allows processors to failover to a backup node, assuming that the node, where the shared instance is +runnning, is accessible from the backup node. + +.. warning:: + + A shared LevelDB instance is a single point of failure and should therefore only be used for testing + purposes. + +A shared LevelDB instance can be created by instantiating the ``SharedLeveldbStore`` actor. + +.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#shared-store-creation + +By default, the shared instance writes journaled messages to a local directory named ``journal`` in the current +working directory. The storage location can be changed by configuration: + +.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#shared-store-config + +Actor systems that use a shared LevelDB store must activate the ``akka.persistence.journal.leveldb-shared`` +plugin. + +.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#shared-journal-config + +This plugin must be initialized by injecting the (remote) ``SharedLeveldbStore`` actor reference. Injection is +done by calling the ``SharedLeveldbJournal.setStore`` method with the actor reference as argument. + +.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#shared-store-usage + +Internal journal commands (sent by processors) are buffered until injection completes. Injection is idempotent +i.e. only the first injection is used. + +.. _local-snapshot-store: + +Local snapshot store +-------------------- + +The default snapshot store plugin is ``akka.persistence.snapshot-store.local`` which writes snapshot files to +the local filesystem. The default storage location is a directory named ``snapshots`` in the current working +directory. This can be changed by configuration where the specified path can be relative or absolute: + +.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-config + +Planned plugins +--------------- + +* Shared snapshot store (SPOF, for testing purposes) +* HA snapshot store backed by a distributed file system +* HA journal backed by a distributed (NoSQL) data store + Custom serialization ==================== diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index 85ea2edc11..5ff3c47843 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -69,6 +69,37 @@ akka { # Native LevelDB (via JNI) or LevelDB Java port native = on } + + # Shared LevelDB journal plugin (for testing only). + leveldb-shared { + + # Class name of the plugin. + class = "akka.persistence.journal.leveldb.SharedLeveldbJournal" + + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.actor.default-dispatcher" + + store { + + # Dispatcher for shared store actor. + store-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + + # Dispatcher for message replay. + replay-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + + # Storage location of LevelDB files. + dir = "journal" + + # Use fsync on write + fsync = off + + # Verify checksum on read. + checksum = off + + # Native LevelDB (via JNI) or LevelDB Java port + native = on + } + } } snapshot-store { diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index d904e48400..672e861b77 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -75,10 +75,10 @@ private[persistence] trait Eventsourced extends Processor { case p: PersistentRepr ⇒ deleteMessage(p.sequenceNr, true) throw new UnsupportedOperationException("Persistent commands not supported") - case WriteSuccess(p) if identical(p.payload, persistInvocations.head._1) ⇒ + case WriteSuccess(p) ⇒ withCurrentPersistent(p)(p ⇒ persistInvocations.head._2(p.payload)) onWriteComplete() - case e @ WriteFailure(p, _) if identical(p.payload, persistInvocations.head._1) ⇒ + case e @ WriteFailure(p, _) ⇒ Eventsourced.super.aroundReceive(receive, message) // stops actor by default onWriteComplete() case s @ WriteBatchSuccess ⇒ Eventsourced.super.aroundReceive(receive, s) @@ -93,9 +93,6 @@ private[persistence] trait Eventsourced extends Processor { processorStash.unstash() } } - - def identical(a: Any, b: Any): Boolean = - a.asInstanceOf[AnyRef] eq b.asInstanceOf[AnyRef] } private var persistInvocations: List[(Any, Any ⇒ Unit)] = Nil diff --git a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala index 8ce5b9c59b..52064ac5b1 100644 --- a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala @@ -9,6 +9,8 @@ import scala.collection.immutable import akka.actor._ /** + * INTERNAL API. + * * Defines messages exchanged between processors, channels and a journal. */ private[persistence] object JournalProtocol { diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index ac74e71b93..c071d3b98b 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -302,6 +302,9 @@ private[persistence] case class ConfirmablePersistentImpl( copy(sequenceNr = sequenceNr, processorId = processorId, deleted = deleted, resolved = resolved, confirms = confirms, confirmMessage = confirmMessage, confirmTarget = confirmTarget, sender = sender) } +/** + * INTERNAL API. + */ private[persistence] object ConfirmablePersistentImpl { def apply(persistent: PersistentRepr, confirmMessage: Confirm, confirmTarget: ActorRef): ConfirmablePersistentImpl = ConfirmablePersistentImpl(persistent.payload, persistent.sequenceNr, persistent.processorId, persistent.deleted, persistent.resolved, persistent.confirms, confirmMessage, confirmTarget, persistent.sender) diff --git a/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala b/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala index 1e27d6c2c3..adcef8426e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala @@ -108,6 +108,8 @@ object SelectedSnapshot { } /** + * INTERNAL API. + * * Defines messages exchanged between processors and a snapshot store. */ private[persistence] object SnapshotProtocol { diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index bb0b1b7e85..c20909842d 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -26,7 +26,7 @@ trait AsyncWriteJournal extends Actor with AsyncReplay { private val resequencer = context.actorOf(Props[Resequencer]) private var resequencerCounter = 1L - final def receive = { + def receive = { case WriteBatch(persistentBatch, processor) ⇒ val cctr = resequencerCounter def resequence(f: PersistentRepr ⇒ Any) = persistentBatch.zipWithIndex.foreach { @@ -92,6 +92,9 @@ trait AsyncWriteJournal extends Actor with AsyncReplay { //#journal-plugin-api } +/** + * INTERNAL API. + */ private[persistence] object AsyncWriteJournal { case class Desequenced(msg: Any, snr: Long, target: ActorRef, sender: ActorRef) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala new file mode 100644 index 0000000000..880015d2cf --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala @@ -0,0 +1,110 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.persistence.journal + +import scala.collection.immutable +import scala.concurrent._ +import scala.concurrent.duration.Duration +import scala.language.postfixOps + +import akka.AkkaException +import akka.actor._ +import akka.pattern.ask +import akka.persistence._ +import akka.util._ + +/** + * INTERNAL API. + * + * A journal that delegates actual storage to a target actor. For testing only. + */ +private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash { + import AsyncWriteProxy._ + import AsyncWriteTarget._ + + private val initialized = super.receive + private var store: ActorRef = _ + + override def receive = { + case SetStore(ref) ⇒ + store = ref + unstashAll() + context.become(initialized) + case _ ⇒ stash() + } + + implicit def timeout: Timeout + + def writeAsync(persistentBatch: immutable.Seq[PersistentRepr]): Future[Unit] = + (store ? WriteBatch(persistentBatch)).mapTo[Unit] + + def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit] = + (store ? Delete(processorId, fromSequenceNr, toSequenceNr, permanent)).mapTo[Unit] + + def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] = + (store ? Confirm(processorId, sequenceNr, channelId)).mapTo[Unit] + + def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentRepr) ⇒ Unit): Future[Long] = { + val replayCompletionPromise = Promise[Long] + val mediator = context.actorOf(Props(classOf[ReplayMediator], replayCallback, replayCompletionPromise, timeout.duration).withDeploy(Deploy.local)) + store.tell(Replay(processorId, fromSequenceNr, toSequenceNr), mediator) + replayCompletionPromise.future + } +} + +/** + * INTERNAL API. + */ +private[persistence] object AsyncWriteProxy { + case class SetStore(ref: ActorRef) +} + +/** + * INTERNAL API. + */ +private[persistence] object AsyncWriteTarget { + @SerialVersionUID(1L) + case class WriteBatch(pb: immutable.Seq[PersistentRepr]) + + @SerialVersionUID(1L) + case class Delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean) + + @SerialVersionUID(1L) + case class Confirm(processorId: String, sequenceNr: Long, channelId: String) + + @SerialVersionUID(1L) + case class Replay(processorId: String, fromSequenceNr: Long, toSequenceNr: Long) + + @SerialVersionUID(1L) + case class ReplaySuccess(maxSequenceNr: Long) + + @SerialVersionUID(1L) + case class ReplayFailure(cause: Throwable) +} + +/** + * Thrown if replay inactivity exceeds a specified timeout. + */ +@SerialVersionUID(1L) +class AsyncReplayTimeoutException(msg: String) extends AkkaException(msg) + +private class ReplayMediator(replayCallback: PersistentRepr ⇒ Unit, replayCompletionPromise: Promise[Long], replayTimeout: Duration) extends Actor { + import AsyncWriteTarget._ + + context.setReceiveTimeout(replayTimeout) + + def receive = { + case p: PersistentRepr ⇒ replayCallback(p) + case ReplaySuccess(maxSnr) ⇒ + replayCompletionPromise.success(maxSnr) + context.stop(self) + case ReplayFailure(cause) ⇒ + replayCompletionPromise.failure(cause) + context.stop(self) + case ReceiveTimeout ⇒ + replayCompletionPromise.failure(new AsyncReplayTimeoutException(s"replay timed out after ${replayTimeout.toSeconds} seconds inactivity")) + context.stop(self) + } +} \ No newline at end of file diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala index 9b7a93b416..6037702165 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala @@ -5,39 +5,29 @@ package akka.persistence.journal.inmem import scala.collection.immutable -import scala.concurrent.Future import scala.concurrent.duration._ import scala.language.postfixOps import akka.actor._ -import akka.pattern.ask import akka.persistence._ -import akka.persistence.journal.AsyncWriteJournal -import akka.util._ +import akka.persistence.journal.AsyncWriteProxy +import akka.persistence.journal.AsyncWriteTarget +import akka.util.Timeout /** * INTERNAL API. * * In-memory journal for testing purposes only. */ -private[persistence] class InmemJournal extends AsyncWriteJournal { - val store = context.actorOf(Props[InmemStore]) +private[persistence] class InmemJournal extends AsyncWriteProxy { + import AsyncWriteProxy.SetStore - implicit val timeout = Timeout(5 seconds) + val timeout = Timeout(5 seconds) - import InmemStore._ - - def writeAsync(persistentBatch: immutable.Seq[PersistentRepr]): Future[Unit] = - (store ? WriteBatch(persistentBatch)).mapTo[Unit] - - def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit] = - (store ? Delete(processorId, fromSequenceNr, toSequenceNr, permanent)).mapTo[Unit] - - def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] = - (store ? Confirm(processorId, sequenceNr, channelId)).mapTo[Unit] - - def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentRepr) ⇒ Unit): Future[Long] = - (store ? Replay(processorId, fromSequenceNr, toSequenceNr, replayCallback)).mapTo[Long] + override def preStart(): Unit = { + super.preStart() + self ! SetStore(context.actorOf(Props[InmemStore])) + } } /** @@ -80,33 +70,19 @@ private[persistence] trait InmemMessages { * INTERNAL API. */ private[persistence] class InmemStore extends Actor with InmemMessages { - import InmemStore._ + import AsyncWriteTarget._ def receive = { case WriteBatch(pb) ⇒ - pb.foreach(add) - success() + sender ! pb.foreach(add) case Delete(pid, fsnr, tsnr, false) ⇒ - fsnr to tsnr foreach { snr ⇒ update(pid, snr)(_.update(deleted = true)) } - success() + sender ! (fsnr to tsnr foreach { snr ⇒ update(pid, snr)(_.update(deleted = true)) }) case Delete(pid, fsnr, tsnr, true) ⇒ - fsnr to tsnr foreach { snr ⇒ delete(pid, snr) } - success() + sender ! (fsnr to tsnr foreach { snr ⇒ delete(pid, snr) }) case Confirm(pid, snr, cid) ⇒ - update(pid, snr)(p ⇒ p.update(confirms = cid +: p.confirms)) - success() - case Replay(pid, fromSnr, toSnr, callback) ⇒ - read(pid, fromSnr, toSnr).foreach(callback) - success(maxSequenceNr(pid)) + sender ! update(pid, snr)(p ⇒ p.update(confirms = cid +: p.confirms)) + case Replay(pid, fromSnr, toSnr) ⇒ + read(pid, fromSnr, toSnr).foreach(sender ! _) + sender ! ReplaySuccess(maxSequenceNr(pid)) } - - private def success(reply: Any = ()) = - sender ! reply -} - -private[persistence] object InmemStore { - case class WriteBatch(pb: Seq[PersistentRepr]) - case class Delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean) - case class Confirm(processorId: String, sequenceNr: Long, channelId: String) - case class Replay(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, replayCallback: (PersistentRepr) ⇒ Unit) } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala index d52f38fe6a..82147c8b7d 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala @@ -13,7 +13,7 @@ import akka.actor.Actor * * LevelDB backed persistent mapping of `String`-based processor and channel ids to numeric ids. */ -private[persistence] trait LeveldbIdMapping extends Actor { this: LeveldbJournal ⇒ +private[persistence] trait LeveldbIdMapping extends Actor { this: LeveldbStore ⇒ import Key._ private val idOffset = 10 diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala index 2d65853290..7b64c5d109 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala @@ -1,94 +1,38 @@ /** * Copyright (C) 2009-2013 Typesafe Inc. - * Copyright (C) 2012-2013 Eligotech BV. */ - package akka.persistence.journal.leveldb -import java.io.File +import scala.concurrent.duration._ +import scala.language.postfixOps -import scala.collection.immutable - -import org.iq80.leveldb._ - -import akka.persistence._ -import akka.persistence.journal.SyncWriteJournal -import akka.serialization.SerializationExtension +import akka.actor._ +import akka.persistence.Persistence +import akka.persistence.journal._ +import akka.util.Timeout /** * INTERNAL API. * - * LevelDB backed journal. + * Journal backed by a local LevelDB store. For production use. */ -private[persistence] class LeveldbJournal extends SyncWriteJournal with LeveldbIdMapping with LeveldbReplay { - val config = context.system.settings.config.getConfig("akka.persistence.journal.leveldb") - val nativeLeveldb = config.getBoolean("native") +private[persistence] class LeveldbJournal extends { val configPath = "akka.persistence.journal.leveldb" } with SyncWriteJournal with LeveldbStore - val leveldbOptions = new Options().createIfMissing(true) - val leveldbReadOptions = new ReadOptions().verifyChecksums(config.getBoolean("checksum")) - val leveldbWriteOptions = new WriteOptions().sync(config.getBoolean("fsync")) - val leveldbDir = new File(config.getString("dir")) - var leveldb: DB = _ - - def leveldbFactory = - if (nativeLeveldb) org.fusesource.leveldbjni.JniDBFactory.factory - else org.iq80.leveldb.impl.Iq80DBFactory.factory - - // TODO: support migration of processor and channel ids - // needed if default processor and channel ids are used - // (actor paths, which contain deployment information). - - val serialization = SerializationExtension(context.system) - - import Key._ - - def write(persistentBatch: immutable.Seq[PersistentRepr]) = - withBatch(batch ⇒ persistentBatch.foreach(persistent ⇒ addToBatch(persistent, batch))) - - def delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean) = withBatch { batch ⇒ - val nid = numericId(processorId) - if (permanent) fromSequenceNr to toSequenceNr foreach { sequenceNr ⇒ - batch.delete(keyToBytes(Key(nid, sequenceNr, 0))) // TODO: delete confirmations and deletion markers, if any. - } - else fromSequenceNr to toSequenceNr foreach { sequenceNr ⇒ - batch.put(keyToBytes(deletionKey(nid, sequenceNr)), Array.empty[Byte]) - } - } - - def confirm(processorId: String, sequenceNr: Long, channelId: String) { - leveldb.put(keyToBytes(Key(numericId(processorId), sequenceNr, numericId(channelId))), channelId.getBytes("UTF-8")) - } - - def leveldbSnapshot = leveldbReadOptions.snapshot(leveldb.getSnapshot) - def leveldbIterator = leveldb.iterator(leveldbSnapshot) - - def persistentToBytes(p: PersistentRepr): Array[Byte] = serialization.serialize(p).get - def persistentFromBytes(a: Array[Byte]): PersistentRepr = serialization.deserialize(a, classOf[PersistentRepr]).get - - private def addToBatch(persistent: PersistentRepr, batch: WriteBatch): Unit = { - val nid = numericId(persistent.processorId) - batch.put(keyToBytes(counterKey(nid)), counterToBytes(persistent.sequenceNr)) - batch.put(keyToBytes(Key(nid, persistent.sequenceNr, 0)), persistentToBytes(persistent)) - } - - private def withBatch[R](body: WriteBatch ⇒ R): R = { - val batch = leveldb.createWriteBatch() - try { - val r = body(batch) - leveldb.write(batch, leveldbWriteOptions) - r - } finally { - batch.close() - } - } - - override def preStart() { - leveldb = leveldbFactory.open(leveldbDir, if (nativeLeveldb) leveldbOptions else leveldbOptions.compressionType(CompressionType.NONE)) - super.preStart() - } - - override def postStop() { - leveldb.close() - super.postStop() - } +/** + * INTERNAL API. + * + * Journal backed by a [[SharedLeveldbStore]]. For testing only. + */ +private[persistence] class SharedLeveldbJournal extends AsyncWriteProxy { + val timeout: Timeout = Timeout(10 seconds) // TODO: make configurable +} + +object SharedLeveldbJournal { + /** + * Sets the shared LevelDB `store` for the given actor `system`. + * + * @see [[SharedLeveldbStore]] + */ + def setStore(store: ActorRef, system: ActorSystem): Unit = + Persistence(system).journalFor(null) ! AsyncWriteProxy.SetStore(store) } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbReplay.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbReplay.scala index afc0eee7d3..adb69fc915 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbReplay.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbReplay.scala @@ -15,16 +15,16 @@ import akka.persistence.journal.AsyncReplay * * LevelDB backed message replay. */ -private[persistence] trait LeveldbReplay extends AsyncReplay { this: LeveldbJournal ⇒ +private[persistence] trait LeveldbReplay extends AsyncReplay { this: LeveldbStore ⇒ import Key._ - private val replayDispatcherId = context.system.settings.config.getString("akka.persistence.journal.leveldb.replay-dispatcher") - private val replayDispatcher = context.system.dispatchers.lookup(replayDispatcherId) + private lazy val replayDispatcherId = config.getString("replay-dispatcher") + private lazy val replayDispatcher = context.system.dispatchers.lookup(replayDispatcherId) def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Long] = Future(replay(numericId(processorId), fromSequenceNr: Long, toSequenceNr)(replayCallback))(replayDispatcher) - private def replay(processorId: Int, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: PersistentRepr ⇒ Unit): Long = { + def replay(processorId: Int, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: PersistentRepr ⇒ Unit): Long = { val iter = leveldbIterator @scala.annotation.tailrec diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala new file mode 100644 index 0000000000..607a1d65f7 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala @@ -0,0 +1,121 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + * Copyright (C) 2012-2013 Eligotech BV. + */ + +package akka.persistence.journal.leveldb + +import java.io.File + +import scala.collection.immutable +import scala.util._ + +import org.iq80.leveldb._ + +import akka.actor._ +import akka.persistence._ +import akka.persistence.journal.AsyncWriteTarget +import akka.serialization.SerializationExtension + +/** + * INTERNAL API. + */ +private[persistence] trait LeveldbStore extends Actor with LeveldbIdMapping with LeveldbReplay { + val configPath: String + + val config = context.system.settings.config.getConfig(configPath) + val nativeLeveldb = config.getBoolean("native") + + val leveldbOptions = new Options().createIfMissing(true) + val leveldbReadOptions = new ReadOptions().verifyChecksums(config.getBoolean("checksum")) + val leveldbWriteOptions = new WriteOptions().sync(config.getBoolean("fsync")) + val leveldbDir = new File(config.getString("dir")) + var leveldb: DB = _ + + def leveldbFactory = + if (nativeLeveldb) org.fusesource.leveldbjni.JniDBFactory.factory + else org.iq80.leveldb.impl.Iq80DBFactory.factory + + // TODO: support migration of processor and channel ids + // needed if default processor and channel ids are used + // (actor paths, which contain deployment information). + + val serialization = SerializationExtension(context.system) + + import Key._ + + def write(persistentBatch: immutable.Seq[PersistentRepr]) = + withBatch(batch ⇒ persistentBatch.foreach(persistent ⇒ addToBatch(persistent, batch))) + + def delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean) = withBatch { batch ⇒ + val nid = numericId(processorId) + if (permanent) fromSequenceNr to toSequenceNr foreach { sequenceNr ⇒ + batch.delete(keyToBytes(Key(nid, sequenceNr, 0))) // TODO: delete confirmations and deletion markers, if any. + } + else fromSequenceNr to toSequenceNr foreach { sequenceNr ⇒ + batch.put(keyToBytes(deletionKey(nid, sequenceNr)), Array.empty[Byte]) + } + } + + def confirm(processorId: String, sequenceNr: Long, channelId: String) { + leveldb.put(keyToBytes(Key(numericId(processorId), sequenceNr, numericId(channelId))), channelId.getBytes("UTF-8")) + } + + def leveldbSnapshot = leveldbReadOptions.snapshot(leveldb.getSnapshot) + def leveldbIterator = leveldb.iterator(leveldbSnapshot) + + def persistentToBytes(p: PersistentRepr): Array[Byte] = serialization.serialize(p).get + def persistentFromBytes(a: Array[Byte]): PersistentRepr = serialization.deserialize(a, classOf[PersistentRepr]).get + + private def addToBatch(persistent: PersistentRepr, batch: WriteBatch): Unit = { + val nid = numericId(persistent.processorId) + batch.put(keyToBytes(counterKey(nid)), counterToBytes(persistent.sequenceNr)) + batch.put(keyToBytes(Key(nid, persistent.sequenceNr, 0)), persistentToBytes(persistent)) + } + + private def withBatch[R](body: WriteBatch ⇒ R): R = { + val batch = leveldb.createWriteBatch() + try { + val r = body(batch) + leveldb.write(batch, leveldbWriteOptions) + r + } finally { + batch.close() + } + } + + override def preStart() { + leveldb = leveldbFactory.open(leveldbDir, if (nativeLeveldb) leveldbOptions else leveldbOptions.compressionType(CompressionType.NONE)) + super.preStart() + } + + override def postStop() { + leveldb.close() + super.postStop() + } +} + +/** + * A LevelDB store that can be shared by multiple actor systems. The shared store must be + * set for each actor system that uses the store via `SharedLeveldbJournal.setStore`. The + * shared LevelDB store is for testing only. + */ +class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.leveldb-shared.store" } with LeveldbStore { + import AsyncWriteTarget._ + + def receive = { + case WriteBatch(pb) ⇒ sender ! write(pb) + case Delete(pid, fsnr, tsnr, permanent) ⇒ sender ! delete(pid, fsnr, tsnr, permanent) + case Confirm(pid, snr, cid) ⇒ sender ! confirm(pid, snr, cid) + case Replay(pid, fromSnr, toSnr) ⇒ + val npid = numericId(pid) + val res = for { + _ ← Try(replay(npid, fromSnr, toSnr)(sender ! _)) + max ← Try(maxSequenceNr(npid)) + } yield max + res match { + case Success(max) ⇒ sender ! ReplaySuccess(max) + case Failure(cause) ⇒ sender ! ReplayFailure(cause) + } + } +} diff --git a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala index c3b356f0e4..a6606d7022 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala @@ -93,7 +93,7 @@ object PerformanceSpec { } } -class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "performance").withFallback(ConfigFactory.parseString(PerformanceSpec.config))) with PersistenceSpec with ImplicitSender { +class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "performance", serialization = "off").withFallback(ConfigFactory.parseString(PerformanceSpec.config))) with PersistenceSpec with ImplicitSender { import PerformanceSpec._ val warmupCycles = system.settings.config.getInt("akka.persistence.performance.cycles.warmup") diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala index 9e8671e7e9..c1192ced3e 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala @@ -46,10 +46,10 @@ trait PersistenceSpec extends BeforeAndAfterEach with Cleanup { this: AkkaSpec } object PersistenceSpec { - def config(plugin: String, test: String) = ConfigFactory.parseString( + def config(plugin: String, test: String, serialization: String = "on") = ConfigFactory.parseString( s""" - serialize-creators = on - serialize-messages = on + akka.actor.serialize-creators = ${serialization} + akka.actor.serialize-messages = ${serialization} akka.persistence.publish-plugin-commands = on akka.persistence.journal.plugin = "akka.persistence.journal.${plugin}" akka.persistence.journal.leveldb.dir = "target/journal-${test}-spec" @@ -60,6 +60,7 @@ object PersistenceSpec { trait Cleanup { this: AkkaSpec ⇒ val storageLocations = List( "akka.persistence.journal.leveldb.dir", + "akka.persistence.journal.leveldb-shared.store.dir", "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s))) override protected def atStartup() { diff --git a/akka-persistence/src/test/scala/akka/persistence/journal/leveldb/SharedLeveldbJournalSpec.scala b/akka-persistence/src/test/scala/akka/persistence/journal/leveldb/SharedLeveldbJournalSpec.scala new file mode 100644 index 0000000000..b97150b724 --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/journal/leveldb/SharedLeveldbJournalSpec.scala @@ -0,0 +1,99 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.persistence.journal.leveldb + +import com.typesafe.config.ConfigFactory + +import akka.actor._ +import akka.persistence._ +import akka.testkit.{ TestProbe, AkkaSpec } + +object SharedLeveldbJournalSpec { + val config = + """ + akka { + actor { + provider = "akka.remote.RemoteActorRefProvider" + } + persistence { + journal { + plugin = "akka.persistence.journal.leveldb-shared" + leveldb-shared.store.dir = target/shared-journal + } + snapshot-store.local.dir = target/snapshot-store + } + remote { + enabled-transports = ["akka.remote.netty.tcp"] + netty.tcp { + hostname = "127.0.0.1" + port = 0 + } + } + loglevel = ERROR + log-dead-letters = 0 + log-dead-letters-during-shutdown = off + } + """ + + class ExampleProcessor(probe: ActorRef, name: String) extends NamedProcessor(name) { + def receive = { + case Persistent(payload, _) ⇒ probe ! payload + } + } + + class ExampleApp(probe: ActorRef, port: Int) extends Actor { + val processor = context.actorOf(Props(classOf[ExampleProcessor], probe, context.system.name)) + + def receive = { + case ActorIdentity(1, Some(store)) ⇒ SharedLeveldbJournal.setStore(store, context.system) + case m ⇒ processor forward m + } + + override def preStart(): Unit = { + context.actorSelection(s"akka.tcp://store@127.0.0.1:${port}/user/store") ! Identify(1) + } + } + + def port(system: ActorSystem) = + system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress.port.get +} + +class SharedLeveldbJournalSpec extends AkkaSpec(SharedLeveldbJournalSpec.config) with Cleanup { + import SharedLeveldbJournalSpec._ + + "A LevelDB store" can { + "be shared by multiple actor systems" in { + val storeSystem = ActorSystem("store", ConfigFactory.parseString(SharedLeveldbJournalSpec.config)) + val processorASystem = ActorSystem("processorA", ConfigFactory.parseString(SharedLeveldbJournalSpec.config)) + val processorBSystem = ActorSystem("processorB", ConfigFactory.parseString(SharedLeveldbJournalSpec.config)) + + val processorAProbe = new TestProbe(processorASystem) + val processorBProbe = new TestProbe(processorBSystem) + + storeSystem.actorOf(Props[SharedLeveldbStore], "store") + + val appA = processorASystem.actorOf(Props(classOf[ExampleApp], processorAProbe.ref, port(storeSystem))) + val appB = processorBSystem.actorOf(Props(classOf[ExampleApp], processorBProbe.ref, port(storeSystem))) + + appA ! Persistent("a1") + appB ! Persistent("b1") + + processorAProbe.expectMsg("a1") + processorBProbe.expectMsg("b1") + + val recoveredAppA = processorASystem.actorOf(Props(classOf[ExampleApp], processorAProbe.ref, port(storeSystem))) + val recoveredAppB = processorBSystem.actorOf(Props(classOf[ExampleApp], processorBProbe.ref, port(storeSystem))) + + recoveredAppA ! Persistent("a2") + recoveredAppB ! Persistent("b2") + + processorAProbe.expectMsg("a1") + processorAProbe.expectMsg("a2") + + processorBProbe.expectMsg("b1") + processorBProbe.expectMsg("b2") + } + } +} diff --git a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala index 09759d2b1c..3a92e9c20f 100644 --- a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala @@ -1,3 +1,7 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + package akka.persistence.serialization import scala.collection.immutable @@ -10,12 +14,6 @@ import akka.serialization._ import akka.testkit._ object SerializerSpecConfigs { - val common = - """ - serialize-creators = on - serialize-messages = on - """ - val customSerializers = """ akka.actor { @@ -50,7 +48,7 @@ object SerializerSpecConfigs { val systemB = "akka.remote.netty.tcp.port = 0" def config(configs: String*): Config = - configs.foldLeft(ConfigFactory.parseString(common))((r, c) ⇒ r.withFallback(ConfigFactory.parseString(c))) + configs.foldLeft(ConfigFactory.empty)((r, c) ⇒ r.withFallback(ConfigFactory.parseString(c))) } import SerializerSpecConfigs._