diff --git a/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorBenchmark.scala deleted file mode 100644 index 65dad576ef..0000000000 --- a/akka-bench-jmh/src/main/scala/akka/persistence/PersistentActorBenchmark.scala +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.persistence - -import org.openjdk.jmh.annotations._ -import org.openjdk.jmh._ -import com.typesafe.config.ConfigFactory -import akka.actor._ -import akka.testkit.TestProbe -import java.io.File -import org.apache.commons.io.FileUtils -import org.openjdk.jmh.annotations.Scope - -@State(Scope.Benchmark) -@BenchmarkMode(Array(Mode.Throughput)) -class PersistentActorThroughputBenchmark { - - val config = PersistenceSpec.config("leveldb", "benchmark") - - lazy val storageLocations = List( - "akka.persistence.journal.leveldb.dir", - "akka.persistence.journal.leveldb-shared.store.dir", - "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s))) - - var system: ActorSystem = _ - - var probe: TestProbe = _ - var actor: ActorRef = _ - var persist1EventProcessor: ActorRef = _ - var persist1CommandProcessor: ActorRef = _ - var persistAsync1EventProcessor: ActorRef = _ - var persistAsync1QuickReplyEventProcessor: ActorRef = _ - - val data10k = (1 to 10000).toArray - - @Setup - def setup() { - system = ActorSystem("test", config) - - probe = TestProbe()(system) - - storageLocations.foreach(FileUtils.deleteDirectory) - actor = system.actorOf(Props(classOf[BaselineActor], data10k.last), "a-1") - persist1CommandProcessor = system.actorOf(Props(classOf[Persist1CommandProcessor], data10k.last), "p-1") - persist1EventProcessor = system.actorOf(Props(classOf[Persist1EventPersistentActor], data10k.last), "ep-1") - persistAsync1EventProcessor = system.actorOf(Props(classOf[PersistAsync1EventPersistentActor], data10k.last), "epa-1") - persistAsync1QuickReplyEventProcessor = system.actorOf(Props(classOf[PersistAsync1EventQuickReplyPersistentActor], data10k.last), "epa-2") - } - - @TearDown - def shutdown() { - system.shutdown() - system.awaitTermination() - - storageLocations.foreach(FileUtils.deleteDirectory) - } - - @GenerateMicroBenchmark - @OperationsPerInvocation(10000) - def tell_normalActor_reply_baseline() { - for (i <- data10k) actor.tell(i, probe.ref) - - probe.expectMsg(data10k.last) - } - - @GenerateMicroBenchmark - @OperationsPerInvocation(10000) - def tell_persist_reply() { - for (i <- data10k) persist1EventProcessor.tell(i, probe.ref) - - probe.expectMsg(Evt(data10k.last)) - } - - @GenerateMicroBenchmark - @OperationsPerInvocation(10000) - def tell_commandPersist_reply() { - for (i <- data10k) persist1CommandProcessor.tell(i, probe.ref) - - probe.expectMsg(Evt(data10k.last)) - } - - @GenerateMicroBenchmark - @OperationsPerInvocation(10000) - def tell_persistAsync_reply() { - for (i <- data10k) persistAsync1EventProcessor.tell(i, probe.ref) - - probe.expectMsg(Evt(data10k.last)) - } - - @GenerateMicroBenchmark - @OperationsPerInvocation(10000) - def tell_persistAsync_replyRightOnCommandReceive() { - for (i <- data10k) persistAsync1QuickReplyEventProcessor.tell(i, probe.ref) - - probe.expectMsg(Evt(data10k.last)) - } - -} - -class Persist1EventPersistentActor(respondAfter: Int) extends PersistentActor { - - override def persistenceId: String = self.path.name - - override def receiveCommand = { - case n: Int => persist(Evt(n)) { e => if (e.i == respondAfter) sender() ! e } - } - override def receiveRecover = { - case _ => // do nothing - } - -} -class Persist1CommandProcessor(respondAfter: Int) extends Processor { - override def receive = { - case n: Int => if (n == respondAfter) sender() ! Evt(n) - } -} - -class PersistAsync1EventPersistentActor(respondAfter: Int) extends PersistentActor { - override def persistenceId: String = self.path.name - - override def receiveCommand = { - case n: Int => - persistAsync(Evt(n)) { e => if (e.i == respondAfter) sender() ! e } - } - override def receiveRecover = { - case _ => // do nothing - } -} - -class PersistAsync1EventQuickReplyPersistentActor(respondAfter: Int) extends PersistentActor { - - override def persistenceId: String = self.path.name - - override def receiveCommand = { - case n: Int => - val e = Evt(n) - if (n == respondAfter) sender() ! e - persistAsync(e)(identity) - } - override def receiveRecover = { - case _ => // do nothing - } -} diff --git a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java index fd4f826124..5e74073018 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java @@ -5,17 +5,27 @@ package docs.persistence; //#plugin-imports -import akka.actor.UntypedActor; -import scala.concurrent.Future; + +import akka.actor.*; import akka.japi.Option; import akka.japi.Procedure; import akka.persistence.*; -import akka.persistence.journal.japi.*; -import akka.persistence.snapshot.japi.*; -//#plugin-imports -import akka.actor.*; +import akka.persistence.japi.journal.JavaJournalSpec; +import akka.persistence.japi.snapshot.JavaSnapshotStoreSpec; +import akka.persistence.journal.japi.AsyncWriteJournal; import akka.persistence.journal.leveldb.SharedLeveldbJournal; import akka.persistence.journal.leveldb.SharedLeveldbStore; +import akka.persistence.snapshot.japi.SnapshotStore; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.iq80.leveldb.util.FileUtils; +import scala.concurrent.Future; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +//#plugin-imports public class PersistencePluginDocTest { @@ -106,4 +116,68 @@ public class PersistencePluginDocTest { return null; } } + + + static Object o2 = new Object() { + //#journal-tck-java + class MyJournalSpecTest extends JavaJournalSpec { + + public MyJournalSpecTest() { + super(ConfigFactory.parseString( + "persistence.journal.plugin = " + + "\"akka.persistence.journal.leveldb-shared\"")); + } + } + //#journal-tck-java + }; + + static Object o3 = new Object() { + //#snapshot-store-tck-java + class MySnapshotStoreTest extends JavaSnapshotStoreSpec { + + public MySnapshotStoreTest() { + super(ConfigFactory.parseString( + "akka.persistence.snapshot-store.plugin = " + + "\"akka.persistence.snapshot-store.local\"")); + } + } + //#snapshot-store-tck-java + }; + + static Object o4 = new Object() { + //#journal-tck-before-after-java + class MyJournalSpecTest extends JavaJournalSpec { + + List storageLocations = new ArrayList(); + + public MyJournalSpecTest() { + super(ConfigFactory.parseString( + "persistence.journal.plugin = " + + "\"akka.persistence.journal.leveldb-shared\"")); + + Config config = system().settings().config(); + storageLocations.add(new File( + config.getString("akka.persistence.journal.leveldb.dir"))); + storageLocations.add(new File( + config.getString("akka.persistence.snapshot-store.local.dir"))); + } + + @Override + public void beforeAll() { + for (File storageLocation : storageLocations) { + FileUtils.deleteRecursively(storageLocation); + } + super.beforeAll(); + } + + @Override + public void afterAll() { + super.afterAll(); + for (File storageLocation : storageLocations) { + FileUtils.deleteRecursively(storageLocation); + } + } + } + //#journal-tck-before-after-java + }; } diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index a280f6e71b..ce6cc1316a 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -498,6 +498,40 @@ A snapshot store plugin can be activated with the following minimal configuratio The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``. +Plugin TCK +---------- +In order to help developers build correct and high quality storage plugins, we provide an Technology Compatibility Kit (`TCK `_ for short). + +The TCK is usable from Java as well as Scala projects, for Java you need to include the akka-persistence-tck-experimental dependency:: + + + com.typesafe.akka + akka-persistence-tck-experimental_${scala.version} + 2.3.5 + test + + +To include the Journal TCK tests in your test suite simply extend the provided ``JavaJournalSpec``: + +.. includecode:: ./code/docs/persistence/PersistencePluginDocTest.java#journal-tck-java + +We also provide a simple benchmarking class ``JavaJournalPerfSpec`` which includes all the tests that ``JavaJournalSpec`` +has, and also performs some longer operations on the Journal while printing it's performance stats. While it is NOT aimed +to provide a proper benchmarking environment it can be used to get a rough feel about your journals performance in the most +typical scenarios. + +In order to include the ``SnapshotStore`` TCK tests in your test suite simply extend the ``SnapshotStoreSpec: + +.. includecode:: ./code/docs/persistence/PersistencePluginDocTest.java#snapshot-store-tck-java + +In case your plugin requires some setting up (starting a mock database, removing temporary files etc.) you can override the +``beforeAll`` and ``afterAll`` methods to hook into the tests lifecycle: + +.. includecode:: ./code/docs/persistence/PersistencePluginDocTest.java#journal-tck-before-after-java + +We *highly recommend* including these specifications in your test suite, as they cover a broad range of cases you +might have otherwise forgotten to test for when writing a plugin from scratch. + Pre-packaged plugins ==================== diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index f440507302..1528708ddf 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -4,8 +4,9 @@ package docs.persistence -import akka.actor.{ Actor, ActorSystem, Props } +import akka.actor.{Actor, ActorSystem, Props} import akka.persistence._ +import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ import scala.language.postfixOps @@ -28,7 +29,7 @@ trait PersistenceDocSpec { new AnyRef { //#definition - import akka.persistence.{ PersistenceFailure, Persistent, Processor } + import akka.persistence.{PersistenceFailure, Persistent, Processor} class MyProcessor extends Processor { def receive = { @@ -93,11 +94,11 @@ trait PersistenceDocSpec { def receiveRecover: Receive = { case RecoveryCompleted => recoveryCompleted() - case evt => //... + case evt => //... } def receiveCommand: Receive = { - case msg => //... + case msg => //... } def recoveryCompleted(): Unit = { @@ -134,7 +135,7 @@ trait PersistenceDocSpec { new AnyRef { //#at-least-once-example - import akka.actor.{ Actor, ActorPath, Props } + import akka.actor.{Actor, ActorPath} import akka.persistence.AtLeastOnceDelivery case class Msg(deliveryId: Long, s: String) @@ -176,8 +177,8 @@ trait PersistenceDocSpec { new AnyRef { //#channel-example - import akka.actor.{ Actor, Props } - import akka.persistence.{ Channel, Deliver, Persistent, Processor } + import akka.actor.{Actor, Props} + import akka.persistence.{Channel, Deliver, Persistent, Processor} class MyProcessor extends Processor { val destination = context.actorOf(Props[MyDestination]) @@ -251,7 +252,7 @@ trait PersistenceDocSpec { new AnyRef { //#fsm-example import akka.actor.FSM - import akka.persistence.{ Persistent, Processor } + import akka.persistence.{Persistent, Processor} class PersistentDoor extends Processor with FSM[String, Int] { startWith("closed", 0) diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala index 98e8a5443e..1887bf6f27 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala @@ -5,20 +5,18 @@ package docs.persistence //#plugin-imports -import scala.concurrent.Future -import scala.collection.immutable.Seq -//#plugin-imports - -import com.typesafe.config._ -import org.scalatest.WordSpec -import scala.concurrent.duration._ -import akka.testkit.TestKit import akka.actor.ActorSystem -//#plugin-imports import akka.persistence._ import akka.persistence.journal._ import akka.persistence.snapshot._ +import akka.testkit.TestKit +import com.typesafe.config._ +import org.scalatest.WordSpec + +import scala.collection.immutable.Seq +import scala.concurrent.Future +import scala.concurrent.duration._ //#plugin-imports object PersistencePluginDocSpec { @@ -115,7 +113,6 @@ trait SharedLeveldbPluginDocSpec { new AnyRef { import akka.actor._ - //#shared-store-creation import akka.persistence.journal.leveldb.SharedLeveldbStore val store = system.actorOf(Props[SharedLeveldbStore], "store") @@ -139,3 +136,60 @@ class MySnapshotStore extends SnapshotStore { def delete(metadata: SnapshotMetadata): Unit = ??? def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Unit = ??? } + +object PersistenceTCKDoc { + new AnyRef { + import akka.persistence.journal.JournalSpec + + //#journal-tck-scala + class MyJournalSpec extends JournalSpec { + override val config = ConfigFactory.parseString( + """ + |akka.persistence.journal.plugin = "my.journal.plugin" + """.stripMargin) + } + //#journal-tck-scala + } + new AnyRef { + import akka.persistence.snapshot.SnapshotStoreSpec + + //#snapshot-store-tck-scala + class MySnapshotStoreSpec extends SnapshotStoreSpec { + override val config = ConfigFactory.parseString( + """ + |akka.persistence.snapshot-store.plugin = "my.snapshot-store.plugin" + """.stripMargin) + } + //#snapshot-store-tck-scala + } + new AnyRef { + import java.io.File + + import akka.persistence.journal.JournalSpec + import org.iq80.leveldb.util.FileUtils + + //#journal-tck-before-after-scala + class MyJournalSpec extends JournalSpec { + override val config = ConfigFactory.parseString( + """ + |akka.persistence.journal.plugin = "my.journal.plugin" + """.stripMargin) + + val storageLocations = List( + new File(system.settings.config.getString("akka.persistence.journal.leveldb.dir")), + new File(config.getString("akka.persistence.snapshot-store.local.dir"))) + + override def beforeAll() { + super.beforeAll() + storageLocations foreach FileUtils.deleteRecursively + } + + override def afterAll() { + storageLocations foreach FileUtils.deleteRecursively + super.afterAll() + } + + } + //#journal-tck-before-after-scala + } +} \ No newline at end of file diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index a03188dcba..b071b28b70 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -500,6 +500,35 @@ A snapshot store plugin can be activated with the following minimal configuratio The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``. +Plugin TCK +---------- +In order to help developers build correct and high quality storage plugins, we provide an Technology Compatibility Kit (`TCK `_ for short). + +The TCK is usable from Java as well as Scala projects, for Scala you need to include the akka-persistence-tck-experimental dependency:: + + "com.typesafe.akka" %% "akka-persistence-tck-experimental" % "2.3.5" % "test" + +To include the Journal TCK tests in your test suite simply extend the provided ``JournalSpec``: + +.. includecode:: ./code/docs/persistence/PersistencePluginDocSpec.scala#journal-tck-scala + +We also provide a simple benchmarking class ``JournalPerfSpec`` which includes all the tests that ``JournalSpec`` +has, and also performs some longer operations on the Journal while printing it's performance stats. While it is NOT aimed +to provide a proper benchmarking environment it can be used to get a rough feel about your journals performance in the most +typical scenarios. + +In order to include the ``SnapshotStore`` TCK tests in your test suite simply extend the ``SnapshotStoreSpec``: + +.. includecode:: ./code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-store-tck-scala + +In case your plugin requires some setting up (starting a mock database, removing temporary files etc.) you can override the +``beforeAll`` and ``afterAll`` methods to hook into the tests lifecycle: + +.. includecode:: ./code/docs/persistence/PersistencePluginDocSpec.scala#journal-tck-before-after-scala + +We *highly recommend* including these specifications in your test suite, as they cover a broad range of cases you +might have otherwise forgotten to test for when writing a plugin from scratch. + .. _pre-packaged-plugins: Pre-packaged plugins diff --git a/akka-persistence-tck/build.sbt b/akka-persistence-tck/build.sbt new file mode 100644 index 0000000000..35fb4b92fa --- /dev/null +++ b/akka-persistence-tck/build.sbt @@ -0,0 +1,23 @@ +import akka.{ AkkaBuild, Dependencies, Formatting, OSGi, Unidoc } +import com.typesafe.tools.mima.plugin.MimaKeys +import akka.MultiNode + +AkkaBuild.defaultSettings + +AkkaBuild.experimentalSettings + +Formatting.formatSettings + +Unidoc.scaladocSettings + +Unidoc.javadocSettings + +// OSGi.persistenceTck TODO: we do need to export this as OSGi bundle too? + +libraryDependencies ++= Dependencies.persistenceTck + +MimaKeys.previousArtifact := None + +fork in Test := true + +javaOptions in Test := MultiNode.defaultMultiJvmOptions diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/PluginSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/PluginSpec.scala new file mode 100644 index 0000000000..4674d52dcc --- /dev/null +++ b/akka-persistence-tck/src/main/scala/akka/persistence/PluginSpec.scala @@ -0,0 +1,43 @@ +package akka.persistence + +import java.util.concurrent.atomic.AtomicInteger + +import scala.reflect.ClassTag + +import akka.actor._ +import akka.testkit._ + +import com.typesafe.config._ + +import org.scalatest._ + +trait PluginSpec extends TestKitBase with WordSpecLike with Matchers with BeforeAndAfterAll with BeforeAndAfterEach { + private val counter = new AtomicInteger(0) + + private var _extension: Persistence = _ + private var _pid: String = _ + + // used to avoid messages be delivered to a restarted actor, + // this is akka-persistence internals and journals themselfes don't really care + protected val actorInstanceId = 1 + + override protected def beforeEach(): Unit = { + _pid = s"p-${counter.incrementAndGet()}" + } + override protected def beforeAll(): Unit = + _extension = Persistence(system) + + override protected def afterAll(): Unit = + shutdown(system) + + val config: Config + + def extension: Persistence = + _extension + + def pid: String = + _pid + + def subscribe[T: ClassTag](subscriber: ActorRef) = + system.eventStream.subscribe(subscriber, implicitly[ClassTag[T]].runtimeClass) +} diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/japi/journal/JavaJournalPerfSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/japi/journal/JavaJournalPerfSpec.scala new file mode 100644 index 0000000000..36aacb1800 --- /dev/null +++ b/akka-persistence-tck/src/main/scala/akka/persistence/japi/journal/JavaJournalPerfSpec.scala @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.persistence.japi.journal + +import akka.persistence.journal.{ JournalPerfSpec, JournalSpec } +import com.typesafe.config.Config +import org.junit.runner.RunWith +import org.scalatest.Informer +import org.scalatest.junit.JUnitRunner + +/** + * JAVA API + * + * Java / JUnit consumable equivalent of [[akka.persistence.journal.JournalPerfSpec]] and [[akka.persistence.journal.JournalSpec]]. + * + * This spec measures execution times of the basic operations that an [[akka.persistence.PersistentActor]] provides, + * using the provided Journal (plugin). + * + * It is *NOT* meant to be a comprehensive benchmark, but rather aims to help plugin developers to easily determine + * if their plugin's performance is roughly as expected. It also validates the plugin still works under "more messages" scenarios. + * + * The measurements are by default printed to `System.out`, if you want to customise this please override the [[#info]] method. + * + * The benchmark iteration and message counts are easily customisable by overriding these methods: + * + * {{{ + * @Override + * public long awaitDurationMillis() { return 10000; } + * + * @Override + * public int eventsCount() { return 10 * 1000; } + * + * @Override + * public int measurementIterations { return 10; } + * }}} + * + * In case your journal plugin needs some kind of setup or teardown, override the `beforeAll` or `afterAll` + * methods (don't forget to call `super` in your overriden methods). + * + * @see [[akka.persistence.journal.JournalSpec]] + * @see [[akka.persistence.journal.JournalPerfSpec]] + * @param config configures the Journal plugin to be tested + */ +@RunWith(classOf[JUnitRunner]) +class JavaJournalPerfSpec(val config: Config) extends JournalSpec with JournalPerfSpec { + override protected def info: Informer = new Informer { + override def apply(message: String, payload: Option[Any]): Unit = System.out.println(message) + } +} diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/japi/journal/JavaJournalSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/japi/journal/JavaJournalSpec.scala new file mode 100644 index 0000000000..9255036f17 --- /dev/null +++ b/akka-persistence-tck/src/main/scala/akka/persistence/japi/journal/JavaJournalSpec.scala @@ -0,0 +1,24 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.persistence.japi.journal + +import akka.persistence.journal.JournalSpec +import com.typesafe.config.Config +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner + +/** + * JAVA API + * + * Java / JUnit API for [[akka.persistence.journal.JournalSpec]]. + * + * In case your journal plugin needs some kind of setup or teardown, override the `beforeAll` or `afterAll` + * methods (don't forget to call `super` in your overriden methods). + * + * @see [[akka.persistence.journal.JournalSpec]] + * @see [[akka.persistence.journal.JournalPerfSpec]] + * @param config configures the Journal plugin to be tested + */ +@RunWith(classOf[JUnitRunner]) +class JavaJournalSpec(val config: Config) extends JournalSpec diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/japi/snapshot/JavaSnapshotStoreSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/japi/snapshot/JavaSnapshotStoreSpec.scala new file mode 100644 index 0000000000..b93c66a253 --- /dev/null +++ b/akka-persistence-tck/src/main/scala/akka/persistence/japi/snapshot/JavaSnapshotStoreSpec.scala @@ -0,0 +1,23 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.persistence.japi.snapshot + +import akka.persistence.snapshot.{ SnapshotStore, SnapshotStoreSpec } +import com.typesafe.config.Config +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner + +/** + * JAVA API + * + * This spec aims to verify custom akka-persistence [[SnapshotStore]] implementations. + * Plugin authors are highly encouraged to include it in their plugin's test suites. + * + * In case your snapshot-store plugin needs some kind of setup or teardown, override the `beforeAll` or `afterAll` + * methods (don't forget to call `super` in your overriden methods). + * + * @see [[akka.persistence.snapshot.SnapshotStoreSpec]] + */ +@RunWith(classOf[JUnitRunner]) +class JavaSnapshotStoreSpec(val config: Config) extends SnapshotStoreSpec diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalPerfSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalPerfSpec.scala new file mode 100644 index 0000000000..18f6228df7 --- /dev/null +++ b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalPerfSpec.scala @@ -0,0 +1,153 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.persistence.journal + +import akka.actor.{ ActorLogging, ActorRef, Props } +import akka.persistence.journal.JournalPerfSpec.{ BenchActor, Cmd, ResetCounter } +import akka.persistence.{ PersistentActor, PluginSpec } +import akka.testkit.TestProbe + +import scala.collection.immutable +import scala.concurrent.duration._ + +object JournalPerfSpec { + class BenchActor(val persistenceId: String, replyTo: ActorRef, replyAfter: Int) extends PersistentActor + with ActorLogging { + + var counter = 0 + + override def receiveCommand: Receive = { + case c @ Cmd("p", payload) ⇒ + persist(c) { d ⇒ + counter += 1 + require(d.payload == counter, s"Expected to receive [$counter] yet got: [${d.payload}]") + if (counter == replyAfter) replyTo ! d.payload + } + + case c @ Cmd("pa", payload) ⇒ + persistAsync(c) { d ⇒ + counter += 1 + require(d.payload == counter, s"Expected to receive [$counter] yet got: [${d.payload}]") + if (counter == replyAfter) replyTo ! d.payload + } + + case c @ Cmd("par", payload) ⇒ + counter += 1 + persistAsync(c) { d ⇒ + require(d.payload == counter, s"Expected to receive [$counter] yet got: [${d.payload}]") + } + if (counter == replyAfter) replyTo ! payload + + case c @ Cmd("n", payload) ⇒ + counter += 1 + require(payload == counter, s"Expected to receive [$counter] yet got: [${payload}]") + if (counter == replyAfter) replyTo ! payload + + case ResetCounter ⇒ + counter = 0 + } + + override def receiveRecover: Receive = { + case Cmd(_, payload) ⇒ + counter += 1 + require(payload == counter, s"Expected to receive [$counter] yet got: [${payload}]") + if (counter == replyAfter) replyTo ! payload + } + + } + + case object ResetCounter + case class Cmd(mode: String, payload: Int) +} + +/** + * This spec measures execution times of the basic operations that an [[akka.persistence.PersistentActor]] provides, + * using the provided Journal (plugin). + * + * It is *NOT* meant to be a comprehensive benchmark, but rather aims to help plugin developers to easily determine + * if their plugin's performance is roughly as expected. It also validates the plugin still works under "more messages" scenarios. + * + * In case your journal plugin needs some kind of setup or teardown, override the `beforeAll` or `afterAll` + * methods (don't forget to call `super` in your overriden methods). + * + * For a Java and JUnit consumable version of the TCK please refer to [[akka.persistence.japi.journal.JavaJournalPerfSpec]]. + * + * @see [[akka.persistence.journal.JournalSpec]] + */ +trait JournalPerfSpec extends PluginSpec { + this: JournalSpec ⇒ + + private val testProbe = TestProbe() + + def benchActor(replyAfter: Int): ActorRef = + system.actorOf(Props(classOf[BenchActor], pid, testProbe.ref, replyAfter)) + + def feedAndExpectLast(actor: ActorRef, mode: String, cmnds: immutable.Seq[Int]): Unit = { + cmnds foreach { c ⇒ actor ! Cmd(mode, c) } + testProbe.expectMsg(awaitDuration, cmnds.last) + } + + /** Executes a block of code multiple times (no warmup) */ + def measure(msg: Duration ⇒ String)(block: ⇒ Unit): Unit = { + val measurements = Array.ofDim[Duration](measurementIterations) + var i = 0 + while (i < measurementIterations) { + val start = System.nanoTime() + + block + + val stop = System.nanoTime() + val d = (stop - start).nanos + measurements(i) = d + info(msg(d)) + + i += 1 + } + info(s"Average time: ${(measurements.map(_.toNanos).sum / measurementIterations).nanos.toMillis} ms") + } + + /** Override in order to customize timeouts used for expectMsg, in order to tune the awaits to your journal's perf */ + def awaitDurationMillis: Long = 10.seconds.toMillis + + /** Override in order to customize timeouts used for expectMsg, in order to tune the awaits to your journal's perf */ + private def awaitDuration: FiniteDuration = awaitDurationMillis.millis + + /** Numbe of messages sent to the PersistentActor under test for each test iteration */ + def eventsCount: Int = 10 * 1000 + + /** Number of measurement iterations each test will be run. */ + def measurementIterations: Int = 10 + + private val commands = Vector(1 to eventsCount: _*) + + "A PersistentActor's performance" must { + s"measure: persistAsync()-ing $eventsCount events" in { + val p1 = benchActor(eventsCount) + + measure(d ⇒ s"PersistAsync()-ing $eventsCount took ${d.toMillis} ms") { + feedAndExpectLast(p1, "pa", commands) + p1 ! ResetCounter + } + } + s"measure: persist()-ing $eventsCount events" in { + val p1 = benchActor(eventsCount) + + measure(d ⇒ s"Persist()-ing $eventsCount took ${d.toMillis} ms") { + feedAndExpectLast(p1, "p", commands) + p1 ! ResetCounter + } + } + s"measure: recovering $eventsCount events" in { + val p1 = benchActor(eventsCount) + feedAndExpectLast(p1, "p", commands) + + measure(d ⇒ s"Recovering $eventsCount took ${d.toMillis} ms") { + benchActor(eventsCount) + testProbe.expectMsg(max = awaitDuration, commands.last) + } + + } + } + +} diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala new file mode 100644 index 0000000000..8d51027ca1 --- /dev/null +++ b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala @@ -0,0 +1,198 @@ +package akka.persistence.journal + +import scala.collection.immutable.Seq + +import akka.actor._ +import akka.persistence._ +import akka.persistence.JournalProtocol._ +import akka.testkit._ + +import com.typesafe.config._ + +object JournalSpec { + val config = ConfigFactory.parseString( + """ + |akka.persistence.publish-confirmations = on + |akka.persistence.publish-plugin-commands = on + """.stripMargin) + + case class Confirmation(persistenceId: String, channelId: String, sequenceNr: Long) extends PersistentConfirmation +} + +/** + * This spec aims to verify custom akka-persistence Journal implementations. + * Plugin authors are highly encouraged to include it in their plugin's test suites. + * + * In case your journal plugin needs some kind of setup or teardown, override the `beforeAll` or `afterAll` + * methods (don't forget to call `super` in your overriden methods). + * + * For a Java and JUnit consumable version of the TCK please refer to [[akka.persistence.japi.journal.JavaJournalSpec]]. + * + * @see [[akka.persistence.journal.JournalPerfSpec]] + * @see [[akka.persistence.japi.journal.JavaJournalPerfSpec]] + */ +trait JournalSpec extends PluginSpec { + import JournalSpec._ + + implicit lazy val system: ActorSystem = ActorSystem("JournalSpec", config.withFallback(JournalSpec.config)) + + private var senderProbe: TestProbe = _ + private var receiverProbe: TestProbe = _ + + override protected def beforeEach(): Unit = { + super.beforeEach() + senderProbe = TestProbe() + receiverProbe = TestProbe() + writeMessages(1, 5, pid, senderProbe.ref) + } + + def journal: ActorRef = + extension.journalFor(null) + + def replayedMessage(snr: Long, deleted: Boolean = false, confirms: Seq[String] = Nil): ReplayedMessage = + ReplayedMessage(PersistentImpl(s"a-${snr}", snr, pid, deleted, confirms, senderProbe.ref)) + + def writeMessages(from: Int, to: Int, pid: String, sender: ActorRef): Unit = { + val msgs = from to to map { i ⇒ PersistentRepr(payload = s"a-${i}", sequenceNr = i, persistenceId = pid, sender = sender) } + val probe = TestProbe() + + journal ! WriteMessages(msgs, probe.ref, actorInstanceId) + + probe.expectMsg(WriteMessagesSuccessful) + from to to foreach { i ⇒ + probe.expectMsgPF() { case WriteMessageSuccess(PersistentImpl(payload, `i`, `pid`, _, _, `sender`), _) ⇒ payload should be(s"a-${i}") } + } + } + + "A journal" must { + "replay all messages" in { + journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref) + 1 to 5 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } + receiverProbe.expectMsg(ReplayMessagesSuccess) + } + "replay messages using a lower sequence number bound" in { + journal ! ReplayMessages(3, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref) + 3 to 5 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } + receiverProbe.expectMsg(ReplayMessagesSuccess) + } + "replay messages using an upper sequence number bound" in { + journal ! ReplayMessages(1, 3, Long.MaxValue, pid, receiverProbe.ref) + 1 to 3 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } + receiverProbe.expectMsg(ReplayMessagesSuccess) + } + "replay messages using a count limit" in { + journal ! ReplayMessages(1, Long.MaxValue, 3, pid, receiverProbe.ref) + 1 to 3 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } + receiverProbe.expectMsg(ReplayMessagesSuccess) + } + "replay messages using a lower and upper sequence number bound" in { + journal ! ReplayMessages(2, 4, Long.MaxValue, pid, receiverProbe.ref) + 2 to 4 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } + receiverProbe.expectMsg(ReplayMessagesSuccess) + } + "replay messages using a lower and upper sequence number bound and a count limit" in { + journal ! ReplayMessages(2, 4, 2, pid, receiverProbe.ref) + 2 to 3 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } + receiverProbe.expectMsg(ReplayMessagesSuccess) + } + "replay a single if lower sequence number bound equals upper sequence number bound" in { + journal ! ReplayMessages(2, 2, Long.MaxValue, pid, receiverProbe.ref) + 2 to 2 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } + receiverProbe.expectMsg(ReplayMessagesSuccess) + } + "replay a single message if count limit equals 1" in { + journal ! ReplayMessages(2, 4, 1, pid, receiverProbe.ref) + 2 to 2 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } + receiverProbe.expectMsg(ReplayMessagesSuccess) + } + "not replay messages if count limit equals 0" in { + journal ! ReplayMessages(2, 4, 0, pid, receiverProbe.ref) + receiverProbe.expectMsg(ReplayMessagesSuccess) + } + "not replay messages if lower sequence number bound is greater than upper sequence number bound" in { + journal ! ReplayMessages(3, 2, Long.MaxValue, pid, receiverProbe.ref) + receiverProbe.expectMsg(ReplayMessagesSuccess) + } + "not replay permanently deleted messages (range deletion)" in { + val cmd = DeleteMessagesTo(pid, 3, true) + val sub = TestProbe() + + subscribe[DeleteMessagesTo](sub.ref) + journal ! cmd + sub.expectMsg(cmd) + + journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref) + List(4, 5) foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } + } + "replay logically deleted messages with deleted field set to true (range deletion)" in { + val cmd = DeleteMessagesTo(pid, 3, false) + val sub = TestProbe() + + subscribe[DeleteMessagesTo](sub.ref) + journal ! cmd + sub.expectMsg(cmd) + + journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref, replayDeleted = true) + 1 to 5 foreach { i ⇒ + i match { + case 1 | 2 | 3 ⇒ receiverProbe.expectMsg(replayedMessage(i, deleted = true)) + case 4 | 5 ⇒ receiverProbe.expectMsg(replayedMessage(i)) + } + } + } + "replay confirmed messages with corresponding channel ids contained in the confirmed field" in { + val confs = List(Confirmation(pid, "c1", 3), Confirmation(pid, "c2", 3)) + val lpid = pid + + journal ! WriteConfirmations(confs, receiverProbe.ref) + receiverProbe.expectMsg(WriteConfirmationsSuccess(confs)) + + journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref, replayDeleted = true) + 1 to 5 foreach { i ⇒ + i match { + case 1 | 2 | 4 | 5 ⇒ receiverProbe.expectMsg(replayedMessage(i)) + case 3 ⇒ receiverProbe.expectMsgPF() { + case ReplayedMessage(PersistentImpl(payload, `i`, `lpid`, false, confirms, _)) ⇒ + confirms should have length (2) + confirms should contain("c1") + confirms should contain("c2") + } + } + } + } + "ignore orphan deletion markers" in { + val msgIds = List(PersistentIdImpl(pid, 3), PersistentIdImpl(pid, 4)) + journal ! DeleteMessages(msgIds, true, Some(receiverProbe.ref)) // delete message + receiverProbe.expectMsg(DeleteMessagesSuccess(msgIds)) + + journal ! DeleteMessages(msgIds, false, Some(receiverProbe.ref)) // write orphan marker + receiverProbe.expectMsg(DeleteMessagesSuccess(msgIds)) + + journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref) + List(1, 2, 5) foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } + } + "ignore orphan confirmation markers" in { + val msgIds = List(PersistentIdImpl(pid, 3)) + journal ! DeleteMessages(msgIds, true, Some(receiverProbe.ref)) // delete message + receiverProbe.expectMsg(DeleteMessagesSuccess(msgIds)) + + val confs = List(Confirmation(pid, "c1", 3), Confirmation(pid, "c2", 3)) + journal ! WriteConfirmations(confs, receiverProbe.ref) + receiverProbe.expectMsg(WriteConfirmationsSuccess(confs)) + + journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref) + List(1, 2, 4, 5) foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } + } + "return a highest stored sequence number > 0 if the persistent actor has already written messages and the message log is non-empty" in { + journal ! ReadHighestSequenceNr(3L, pid, receiverProbe.ref) + receiverProbe.expectMsg(ReadHighestSequenceNrSuccess(5)) + + journal ! ReadHighestSequenceNr(5L, pid, receiverProbe.ref) + receiverProbe.expectMsg(ReadHighestSequenceNrSuccess(5)) + } + "return a highest stored sequence number == 0 if the persistent actor has not yet written messages" in { + journal ! ReadHighestSequenceNr(0L, "non-existing-pid", receiverProbe.ref) + receiverProbe.expectMsg(ReadHighestSequenceNrSuccess(0)) + } + } +} diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/snapshot/SnapshotStoreSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/snapshot/SnapshotStoreSpec.scala new file mode 100644 index 0000000000..2bbcf86e10 --- /dev/null +++ b/akka-persistence-tck/src/main/scala/akka/persistence/snapshot/SnapshotStoreSpec.scala @@ -0,0 +1,108 @@ +package akka.persistence.snapshot + +import scala.collection.immutable.Seq + +import akka.actor._ +import akka.persistence._ +import akka.persistence.SnapshotProtocol._ +import akka.testkit.TestProbe + +import com.typesafe.config.ConfigFactory + +object SnapshotStoreSpec { + val config = ConfigFactory.parseString("akka.persistence.publish-plugin-commands = on") +} + +/** + * This spec aims to verify custom akka-persistence [[SnapshotStore]] implementations. + * Plugin authors are highly encouraged to include it in their plugin's test suites. + * + * In case your journal plugin needs some kind of setup or teardown, override the `beforeAll` or `afterAll` + * methods (don't forget to call `super` in your overriden methods). + * + * For a Java and JUnit consumable version of the TCK please refer to [[akka.persistence.japi.snapshot.JavaSnapshotStoreSpec]]. + * + * @see [[akka.persistence.japi.snapshot.JavaSnapshotStoreSpec]] + */ +trait SnapshotStoreSpec extends PluginSpec { + implicit lazy val system = ActorSystem("SnapshotStoreSpec", config.withFallback(SnapshotStoreSpec.config)) + + private var senderProbe: TestProbe = _ + private var metadata: Seq[SnapshotMetadata] = Nil + + override protected def beforeEach(): Unit = { + super.beforeEach() + senderProbe = TestProbe() + metadata = writeSnapshots() + } + + def snapshotStore: ActorRef = + extension.snapshotStoreFor(null) + + def writeSnapshots(): Seq[SnapshotMetadata] = { + 1 to 5 map { i ⇒ + val metadata = SnapshotMetadata(pid, i + 10) + snapshotStore.tell(SaveSnapshot(metadata, s"s-${i}"), senderProbe.ref) + senderProbe.expectMsgPF() { case SaveSnapshotSuccess(md) ⇒ md } + } + } + + "A snapshot store" must { + "not load a snapshot given an invalid processor id" in { + snapshotStore.tell(LoadSnapshot("invalid", SnapshotSelectionCriteria.Latest, Long.MaxValue), senderProbe.ref) + senderProbe.expectMsg(LoadSnapshotResult(None, Long.MaxValue)) + } + "not load a snapshot given non-matching timestamp criteria" in { + snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria.Latest.copy(maxTimestamp = 100), Long.MaxValue), senderProbe.ref) + senderProbe.expectMsg(LoadSnapshotResult(None, Long.MaxValue)) + } + "not load a snapshot given non-matching sequence number criteria" in { + snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria(7), Long.MaxValue), senderProbe.ref) + senderProbe.expectMsg(LoadSnapshotResult(None, Long.MaxValue)) + snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria.Latest, 7), senderProbe.ref) + senderProbe.expectMsg(LoadSnapshotResult(None, 7)) + } + "load the most recent snapshot" in { + snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria.Latest, Long.MaxValue), senderProbe.ref) + senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(4), s"s-5")), Long.MaxValue)) + } + "load the most recent snapshot matching an upper sequence number bound" in { + snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria(13), Long.MaxValue), senderProbe.ref) + senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(2), s"s-3")), Long.MaxValue)) + snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria.Latest, 13), senderProbe.ref) + senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(2), s"s-3")), 13)) + } + "load the most recent snapshot matching upper sequence number and timestamp bounds" in { + snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria(13, metadata(2).timestamp), Long.MaxValue), senderProbe.ref) + senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(2), s"s-3")), Long.MaxValue)) + snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria.Latest.copy(maxTimestamp = metadata(2).timestamp), 13), senderProbe.ref) + senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(2), s"s-3")), 13)) + } + "delete a single snapshot identified by snapshot metadata" in { + val md = metadata(2) + val cmd = DeleteSnapshot(md) + val sub = TestProbe() + + subscribe[DeleteSnapshot](sub.ref) + snapshotStore ! cmd + sub.expectMsg(cmd) + + snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria(md.sequenceNr, md.timestamp), Long.MaxValue), senderProbe.ref) + senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(1), s"s-2")), Long.MaxValue)) + } + "delete all snapshots matching upper sequence number and timestamp bounds" in { + val md = metadata(2) + val cmd = DeleteSnapshots(pid, SnapshotSelectionCriteria(md.sequenceNr, md.timestamp)) + val sub = TestProbe() + + subscribe[DeleteSnapshots](sub.ref) + snapshotStore ! cmd + sub.expectMsg(cmd) + + snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria(md.sequenceNr, md.timestamp), Long.MaxValue), senderProbe.ref) + senderProbe.expectMsg(LoadSnapshotResult(None, Long.MaxValue)) + snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria(metadata(3).sequenceNr, metadata(3).timestamp), Long.MaxValue), senderProbe.ref) + senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(3), s"s-4")), Long.MaxValue)) + } + } +} diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/PluginCleanup.scala b/akka-persistence-tck/src/test/scala/akka/persistence/PluginCleanup.scala new file mode 100644 index 0000000000..e5e663e8b1 --- /dev/null +++ b/akka-persistence-tck/src/test/scala/akka/persistence/PluginCleanup.scala @@ -0,0 +1,21 @@ +package akka.persistence + +import java.io.File + +import org.apache.commons.io.FileUtils + +trait PluginCleanup extends PluginSpec { + val storageLocations = List( + "akka.persistence.journal.leveldb.dir", + "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s))) + + override def beforeAll() { + storageLocations.foreach(FileUtils.deleteDirectory) + super.beforeAll() + } + + override def afterAll() { + super.afterAll() + storageLocations.foreach(FileUtils.deleteDirectory) + } +} diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/japi/JavaJournalSpecSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/japi/JavaJournalSpecSpec.scala new file mode 100644 index 0000000000..47b94509eb --- /dev/null +++ b/akka-persistence-tck/src/test/scala/akka/persistence/japi/JavaJournalSpecSpec.scala @@ -0,0 +1,12 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.persistence.japi + +import akka.persistence.japi.journal.JavaJournalSpec +import com.typesafe.config.ConfigFactory +import org.scalatest.DoNotDiscover + +/* Only checking that compilation works with the constructor here as expected (no other abstract fields leaked) */ +@DoNotDiscover +class JavaJournalSpecSpec extends JavaJournalSpec(ConfigFactory.parseString("")) diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalJavaSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalJavaSpec.scala new file mode 100644 index 0000000000..bb03a9955b --- /dev/null +++ b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalJavaSpec.scala @@ -0,0 +1,16 @@ +package akka.persistence.journal.leveldb + +import com.typesafe.config.ConfigFactory + +import akka.persistence.journal.{ JournalPerfSpec, JournalSpec } +import akka.persistence.PluginCleanup + +class LeveldbJournalJavaSpec extends JournalSpec with PluginCleanup { + lazy val config = ConfigFactory.parseString( + """ + |akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" + |akka.persistence.journal.leveldb.native = off + |akka.persistence.journal.leveldb.dir = "target/journal-java" + |akka.persistence.snapshot-store.local.dir = "target/snapshots-java/" + """.stripMargin) +} diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativePerfSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativePerfSpec.scala new file mode 100644 index 0000000000..bdcc391e5c --- /dev/null +++ b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativePerfSpec.scala @@ -0,0 +1,21 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.persistence.journal.leveldb + +import com.typesafe.config.ConfigFactory + +import akka.persistence.journal.{ JournalPerfSpec, JournalSpec } +import akka.persistence.PluginCleanup +import org.scalatest.DoNotDiscover + +@DoNotDiscover // because only checking that compilation is OK with JournalPerfSpec +class LeveldbJournalNativePerfSpec extends JournalSpec with JournalPerfSpec with PluginCleanup { + lazy val config = ConfigFactory.parseString( + """ + |akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" + |akka.persistence.journal.leveldb.native = on + |akka.persistence.journal.leveldb.dir = "target/journal-native" + |akka.persistence.snapshot-store.local.dir = "target/snapshots-native/" + """.stripMargin) +} diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativeSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativeSpec.scala new file mode 100644 index 0000000000..c83f6a9e68 --- /dev/null +++ b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativeSpec.scala @@ -0,0 +1,16 @@ +package akka.persistence.journal.leveldb + +import com.typesafe.config.ConfigFactory + +import akka.persistence.journal.{ JournalPerfSpec, JournalSpec } +import akka.persistence.PluginCleanup + +class LeveldbJournalNativeSpec extends JournalSpec with PluginCleanup { + lazy val config = ConfigFactory.parseString( + """ + |akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" + |akka.persistence.journal.leveldb.native = on + |akka.persistence.journal.leveldb.dir = "target/journal-native" + |akka.persistence.snapshot-store.local.dir = "target/snapshots-native/" + """.stripMargin) +} diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/snapshot/local/LocalSnapshotStoreSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/snapshot/local/LocalSnapshotStoreSpec.scala new file mode 100644 index 0000000000..a6c0009165 --- /dev/null +++ b/akka-persistence-tck/src/test/scala/akka/persistence/snapshot/local/LocalSnapshotStoreSpec.scala @@ -0,0 +1,16 @@ +package akka.persistence.snapshot.local + +import com.typesafe.config.ConfigFactory + +import akka.persistence.PluginCleanup +import akka.persistence.snapshot.SnapshotStoreSpec + +class LocalSnapshotStoreSpec extends SnapshotStoreSpec with PluginCleanup { + lazy val config = ConfigFactory.parseString( + """ + |akka.test.timefactor = 3 + |akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + |akka.persistence.snapshot-store.local.dir = "target/snapshots" + """.stripMargin) + +} diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 93a94d7595..42e0d21126 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -4,23 +4,22 @@ package akka -import sbt._ -import sbt.Keys._ -import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.{ MultiJvm, extraOptions } -import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings -import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact -import com.typesafe.tools.mima.plugin.MimaKeys.reportBinaryIssues -import com.typesafe.tools.mima.plugin.MimaKeys.binaryIssueFilters -import java.io.{InputStreamReader, FileInputStream, File} +import java.io.{FileInputStream, InputStreamReader} import java.util.Properties -import sbtunidoc.Plugin.UnidocKeys.unidoc -import TestExtras.{ JUnitFileReporting, StatsDMetrics, GraphiteBuildEvents } -import com.typesafe.sbt.S3Plugin.{ S3, s3Settings } -import Unidoc.{ scaladocSettings, unidocSettings } -import Formatting.docFormatSettings -import MultiNode.multiJvmSettings + +import akka.Formatting.docFormatSettings +import akka.MultiNode.multiJvmSettings +import akka.TestExtras.{GraphiteBuildEvents, JUnitFileReporting, StatsDMetrics} +import akka.Unidoc.{scaladocSettings, unidocSettings} +import com.typesafe.sbt.S3Plugin.{S3, s3Settings} +import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.{MultiJvm, extraOptions} import com.typesafe.sbt.site.SphinxSupport import com.typesafe.sbt.site.SphinxSupport.Sphinx +import com.typesafe.tools.mima.plugin.MimaKeys.{binaryIssueFilters, previousArtifact, reportBinaryIssues} +import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings +import sbt.Keys._ +import sbt._ +import sbtunidoc.Plugin.UnidocKeys.unidoc object AkkaBuild extends Build { System.setProperty("akka.mode", "test") // Is there better place for this? @@ -58,7 +57,7 @@ object AkkaBuild extends Build { validatePullRequest <<= (unidoc in Compile, SphinxSupport.generate in Sphinx in docs) map { (_, _) => } ), aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j, agent, - persistence, zeroMQ, kernel, osgi, docs, contrib, samples, multiNodeTestkit) + persistence, persistenceTck, zeroMQ, kernel, osgi, docs, contrib, samples, multiNodeTestkit) ) lazy val akkaScalaNightly = Project( @@ -67,7 +66,7 @@ object AkkaBuild extends Build { // remove dependencies that we have to build ourselves (Scala STM, ZeroMQ Scala Bindings) // samples don't work with dbuild right now aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j, - persistence, kernel, osgi, contrib, multiNodeTestkit) + persistence, persistenceTck, kernel, osgi, contrib, multiNodeTestkit) ) lazy val actor = Project( @@ -129,6 +128,12 @@ object AkkaBuild extends Build { dependencies = Seq(actor, remote % "test->test", testkit % "test->test") ) + lazy val persistenceTck = Project( + id = "akka-persistence-experimental-tck", + base = file("akka-persistence-tck"), + dependencies = Seq(persistence % "compile;test->test", testkit % "compile;test->test") + ) + lazy val zeroMQ = Project( id = "akka-zeromq", base = file("akka-zeromq"), @@ -238,7 +243,7 @@ object AkkaBuild extends Build { base = file("akka-docs"), dependencies = Seq(actor, testkit % "test->test", remote % "compile;test->test", cluster, slf4j, agent, zeroMQ, camel, osgi, - persistence % "compile;test->test") + persistence % "compile;test->test", persistenceTck) ) lazy val contrib = Project( diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 78cd124b83..301426ba9c 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -70,7 +70,7 @@ object Dependencies { val karafExam = "org.apache.karaf.tooling.exam" % "org.apache.karaf.tooling.exam.container" % "2.3.1" % "test" // ApacheV2 // mirrored in OSGi sample val paxExam = "org.ops4j.pax.exam" % "pax-exam-junit4" % "2.6.0" % "test" // ApacheV2 - val scalaXml = "org.scala-lang.modules" %% "scala-xml" % "1.0.1" % "test" + val scalaXml = "org.scala-lang.modules" %% "scala-xml" % "1.0.1" % "test" // metrics, measurements, perf testing val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.1" % "test" // ApacheV2 @@ -104,6 +104,8 @@ object Dependencies { val persistence = Seq(levelDB, levelDBNative, protobuf, Test.scalatest, Test.junit, Test.commonsIo) ++ scalaXmlDepencency + val persistenceTck = Seq(Test.scalatest.copy(configurations = Some("compile")), Test.junit.copy(configurations = Some("compile"))) + val kernel = Seq(Test.scalatest, Test.junit) val camel = Seq(camelCore, Test.scalatest, Test.junit, Test.mockito, Test.logback, Test.commonsIo, Test.junitIntf)