+per #13815 akka-persistence-tck based on @krasserm's work
Original here: https://github.com/krasserm/akka-persistence-testkit New features: * merged martin's tests * usable from java (junit 4) * simple bench test, which helps checking if ordering is perserved under bigger workloads and simple perf checking * does NOT include tests for already deprecated features (deleteMessages) * docs Resolves #13815 Conflicts: project/AkkaBuild.scala
This commit is contained in:
parent
7ca3a9699e
commit
90bc347607
22 changed files with 965 additions and 186 deletions
|
|
@ -1,144 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.persistence
|
||||
|
||||
import org.openjdk.jmh.annotations._
|
||||
import org.openjdk.jmh._
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.actor._
|
||||
import akka.testkit.TestProbe
|
||||
import java.io.File
|
||||
import org.apache.commons.io.FileUtils
|
||||
import org.openjdk.jmh.annotations.Scope
|
||||
|
||||
@State(Scope.Benchmark)
|
||||
@BenchmarkMode(Array(Mode.Throughput))
|
||||
class PersistentActorThroughputBenchmark {
|
||||
|
||||
val config = PersistenceSpec.config("leveldb", "benchmark")
|
||||
|
||||
lazy val storageLocations = List(
|
||||
"akka.persistence.journal.leveldb.dir",
|
||||
"akka.persistence.journal.leveldb-shared.store.dir",
|
||||
"akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s)))
|
||||
|
||||
var system: ActorSystem = _
|
||||
|
||||
var probe: TestProbe = _
|
||||
var actor: ActorRef = _
|
||||
var persist1EventProcessor: ActorRef = _
|
||||
var persist1CommandProcessor: ActorRef = _
|
||||
var persistAsync1EventProcessor: ActorRef = _
|
||||
var persistAsync1QuickReplyEventProcessor: ActorRef = _
|
||||
|
||||
val data10k = (1 to 10000).toArray
|
||||
|
||||
@Setup
|
||||
def setup() {
|
||||
system = ActorSystem("test", config)
|
||||
|
||||
probe = TestProbe()(system)
|
||||
|
||||
storageLocations.foreach(FileUtils.deleteDirectory)
|
||||
actor = system.actorOf(Props(classOf[BaselineActor], data10k.last), "a-1")
|
||||
persist1CommandProcessor = system.actorOf(Props(classOf[Persist1CommandProcessor], data10k.last), "p-1")
|
||||
persist1EventProcessor = system.actorOf(Props(classOf[Persist1EventPersistentActor], data10k.last), "ep-1")
|
||||
persistAsync1EventProcessor = system.actorOf(Props(classOf[PersistAsync1EventPersistentActor], data10k.last), "epa-1")
|
||||
persistAsync1QuickReplyEventProcessor = system.actorOf(Props(classOf[PersistAsync1EventQuickReplyPersistentActor], data10k.last), "epa-2")
|
||||
}
|
||||
|
||||
@TearDown
|
||||
def shutdown() {
|
||||
system.shutdown()
|
||||
system.awaitTermination()
|
||||
|
||||
storageLocations.foreach(FileUtils.deleteDirectory)
|
||||
}
|
||||
|
||||
@GenerateMicroBenchmark
|
||||
@OperationsPerInvocation(10000)
|
||||
def tell_normalActor_reply_baseline() {
|
||||
for (i <- data10k) actor.tell(i, probe.ref)
|
||||
|
||||
probe.expectMsg(data10k.last)
|
||||
}
|
||||
|
||||
@GenerateMicroBenchmark
|
||||
@OperationsPerInvocation(10000)
|
||||
def tell_persist_reply() {
|
||||
for (i <- data10k) persist1EventProcessor.tell(i, probe.ref)
|
||||
|
||||
probe.expectMsg(Evt(data10k.last))
|
||||
}
|
||||
|
||||
@GenerateMicroBenchmark
|
||||
@OperationsPerInvocation(10000)
|
||||
def tell_commandPersist_reply() {
|
||||
for (i <- data10k) persist1CommandProcessor.tell(i, probe.ref)
|
||||
|
||||
probe.expectMsg(Evt(data10k.last))
|
||||
}
|
||||
|
||||
@GenerateMicroBenchmark
|
||||
@OperationsPerInvocation(10000)
|
||||
def tell_persistAsync_reply() {
|
||||
for (i <- data10k) persistAsync1EventProcessor.tell(i, probe.ref)
|
||||
|
||||
probe.expectMsg(Evt(data10k.last))
|
||||
}
|
||||
|
||||
@GenerateMicroBenchmark
|
||||
@OperationsPerInvocation(10000)
|
||||
def tell_persistAsync_replyRightOnCommandReceive() {
|
||||
for (i <- data10k) persistAsync1QuickReplyEventProcessor.tell(i, probe.ref)
|
||||
|
||||
probe.expectMsg(Evt(data10k.last))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class Persist1EventPersistentActor(respondAfter: Int) extends PersistentActor {
|
||||
|
||||
override def persistenceId: String = self.path.name
|
||||
|
||||
override def receiveCommand = {
|
||||
case n: Int => persist(Evt(n)) { e => if (e.i == respondAfter) sender() ! e }
|
||||
}
|
||||
override def receiveRecover = {
|
||||
case _ => // do nothing
|
||||
}
|
||||
|
||||
}
|
||||
class Persist1CommandProcessor(respondAfter: Int) extends Processor {
|
||||
override def receive = {
|
||||
case n: Int => if (n == respondAfter) sender() ! Evt(n)
|
||||
}
|
||||
}
|
||||
|
||||
class PersistAsync1EventPersistentActor(respondAfter: Int) extends PersistentActor {
|
||||
override def persistenceId: String = self.path.name
|
||||
|
||||
override def receiveCommand = {
|
||||
case n: Int =>
|
||||
persistAsync(Evt(n)) { e => if (e.i == respondAfter) sender() ! e }
|
||||
}
|
||||
override def receiveRecover = {
|
||||
case _ => // do nothing
|
||||
}
|
||||
}
|
||||
|
||||
class PersistAsync1EventQuickReplyPersistentActor(respondAfter: Int) extends PersistentActor {
|
||||
|
||||
override def persistenceId: String = self.path.name
|
||||
|
||||
override def receiveCommand = {
|
||||
case n: Int =>
|
||||
val e = Evt(n)
|
||||
if (n == respondAfter) sender() ! e
|
||||
persistAsync(e)(identity)
|
||||
}
|
||||
override def receiveRecover = {
|
||||
case _ => // do nothing
|
||||
}
|
||||
}
|
||||
|
|
@ -5,17 +5,27 @@
|
|||
package docs.persistence;
|
||||
|
||||
//#plugin-imports
|
||||
import akka.actor.UntypedActor;
|
||||
import scala.concurrent.Future;
|
||||
|
||||
import akka.actor.*;
|
||||
import akka.japi.Option;
|
||||
import akka.japi.Procedure;
|
||||
import akka.persistence.*;
|
||||
import akka.persistence.journal.japi.*;
|
||||
import akka.persistence.snapshot.japi.*;
|
||||
//#plugin-imports
|
||||
import akka.actor.*;
|
||||
import akka.persistence.japi.journal.JavaJournalSpec;
|
||||
import akka.persistence.japi.snapshot.JavaSnapshotStoreSpec;
|
||||
import akka.persistence.journal.japi.AsyncWriteJournal;
|
||||
import akka.persistence.journal.leveldb.SharedLeveldbJournal;
|
||||
import akka.persistence.journal.leveldb.SharedLeveldbStore;
|
||||
import akka.persistence.snapshot.japi.SnapshotStore;
|
||||
import com.typesafe.config.Config;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
import org.iq80.leveldb.util.FileUtils;
|
||||
import scala.concurrent.Future;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
//#plugin-imports
|
||||
|
||||
public class PersistencePluginDocTest {
|
||||
|
||||
|
|
@ -106,4 +116,68 @@ public class PersistencePluginDocTest {
|
|||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static Object o2 = new Object() {
|
||||
//#journal-tck-java
|
||||
class MyJournalSpecTest extends JavaJournalSpec {
|
||||
|
||||
public MyJournalSpecTest() {
|
||||
super(ConfigFactory.parseString(
|
||||
"persistence.journal.plugin = " +
|
||||
"\"akka.persistence.journal.leveldb-shared\""));
|
||||
}
|
||||
}
|
||||
//#journal-tck-java
|
||||
};
|
||||
|
||||
static Object o3 = new Object() {
|
||||
//#snapshot-store-tck-java
|
||||
class MySnapshotStoreTest extends JavaSnapshotStoreSpec {
|
||||
|
||||
public MySnapshotStoreTest() {
|
||||
super(ConfigFactory.parseString(
|
||||
"akka.persistence.snapshot-store.plugin = " +
|
||||
"\"akka.persistence.snapshot-store.local\""));
|
||||
}
|
||||
}
|
||||
//#snapshot-store-tck-java
|
||||
};
|
||||
|
||||
static Object o4 = new Object() {
|
||||
//#journal-tck-before-after-java
|
||||
class MyJournalSpecTest extends JavaJournalSpec {
|
||||
|
||||
List<File> storageLocations = new ArrayList<File>();
|
||||
|
||||
public MyJournalSpecTest() {
|
||||
super(ConfigFactory.parseString(
|
||||
"persistence.journal.plugin = " +
|
||||
"\"akka.persistence.journal.leveldb-shared\""));
|
||||
|
||||
Config config = system().settings().config();
|
||||
storageLocations.add(new File(
|
||||
config.getString("akka.persistence.journal.leveldb.dir")));
|
||||
storageLocations.add(new File(
|
||||
config.getString("akka.persistence.snapshot-store.local.dir")));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeAll() {
|
||||
for (File storageLocation : storageLocations) {
|
||||
FileUtils.deleteRecursively(storageLocation);
|
||||
}
|
||||
super.beforeAll();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterAll() {
|
||||
super.afterAll();
|
||||
for (File storageLocation : storageLocations) {
|
||||
FileUtils.deleteRecursively(storageLocation);
|
||||
}
|
||||
}
|
||||
}
|
||||
//#journal-tck-before-after-java
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -498,6 +498,40 @@ A snapshot store plugin can be activated with the following minimal configuratio
|
|||
The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher
|
||||
used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``.
|
||||
|
||||
Plugin TCK
|
||||
----------
|
||||
In order to help developers build correct and high quality storage plugins, we provide an Technology Compatibility Kit (`TCK <http://en.wikipedia.org/wiki/Technology_Compatibility_Kit>`_ for short).
|
||||
|
||||
The TCK is usable from Java as well as Scala projects, for Java you need to include the akka-persistence-tck-experimental dependency::
|
||||
|
||||
<dependency>
|
||||
<groupId>com.typesafe.akka</groupId>
|
||||
<artifactId>akka-persistence-tck-experimental_${scala.version}</artifactId>
|
||||
<version>2.3.5</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
To include the Journal TCK tests in your test suite simply extend the provided ``JavaJournalSpec``:
|
||||
|
||||
.. includecode:: ./code/docs/persistence/PersistencePluginDocTest.java#journal-tck-java
|
||||
|
||||
We also provide a simple benchmarking class ``JavaJournalPerfSpec`` which includes all the tests that ``JavaJournalSpec``
|
||||
has, and also performs some longer operations on the Journal while printing it's performance stats. While it is NOT aimed
|
||||
to provide a proper benchmarking environment it can be used to get a rough feel about your journals performance in the most
|
||||
typical scenarios.
|
||||
|
||||
In order to include the ``SnapshotStore`` TCK tests in your test suite simply extend the ``SnapshotStoreSpec:
|
||||
|
||||
.. includecode:: ./code/docs/persistence/PersistencePluginDocTest.java#snapshot-store-tck-java
|
||||
|
||||
In case your plugin requires some setting up (starting a mock database, removing temporary files etc.) you can override the
|
||||
``beforeAll`` and ``afterAll`` methods to hook into the tests lifecycle:
|
||||
|
||||
.. includecode:: ./code/docs/persistence/PersistencePluginDocTest.java#journal-tck-before-after-java
|
||||
|
||||
We *highly recommend* including these specifications in your test suite, as they cover a broad range of cases you
|
||||
might have otherwise forgotten to test for when writing a plugin from scratch.
|
||||
|
||||
Pre-packaged plugins
|
||||
====================
|
||||
|
||||
|
|
|
|||
|
|
@ -4,8 +4,9 @@
|
|||
|
||||
package docs.persistence
|
||||
|
||||
import akka.actor.{ Actor, ActorSystem, Props }
|
||||
import akka.actor.{Actor, ActorSystem, Props}
|
||||
import akka.persistence._
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.language.postfixOps
|
||||
|
|
@ -28,7 +29,7 @@ trait PersistenceDocSpec {
|
|||
|
||||
new AnyRef {
|
||||
//#definition
|
||||
import akka.persistence.{ PersistenceFailure, Persistent, Processor }
|
||||
import akka.persistence.{PersistenceFailure, Persistent, Processor}
|
||||
|
||||
class MyProcessor extends Processor {
|
||||
def receive = {
|
||||
|
|
@ -93,11 +94,11 @@ trait PersistenceDocSpec {
|
|||
|
||||
def receiveRecover: Receive = {
|
||||
case RecoveryCompleted => recoveryCompleted()
|
||||
case evt => //...
|
||||
case evt => //...
|
||||
}
|
||||
|
||||
def receiveCommand: Receive = {
|
||||
case msg => //...
|
||||
case msg => //...
|
||||
}
|
||||
|
||||
def recoveryCompleted(): Unit = {
|
||||
|
|
@ -134,7 +135,7 @@ trait PersistenceDocSpec {
|
|||
|
||||
new AnyRef {
|
||||
//#at-least-once-example
|
||||
import akka.actor.{ Actor, ActorPath, Props }
|
||||
import akka.actor.{Actor, ActorPath}
|
||||
import akka.persistence.AtLeastOnceDelivery
|
||||
|
||||
case class Msg(deliveryId: Long, s: String)
|
||||
|
|
@ -176,8 +177,8 @@ trait PersistenceDocSpec {
|
|||
|
||||
new AnyRef {
|
||||
//#channel-example
|
||||
import akka.actor.{ Actor, Props }
|
||||
import akka.persistence.{ Channel, Deliver, Persistent, Processor }
|
||||
import akka.actor.{Actor, Props}
|
||||
import akka.persistence.{Channel, Deliver, Persistent, Processor}
|
||||
|
||||
class MyProcessor extends Processor {
|
||||
val destination = context.actorOf(Props[MyDestination])
|
||||
|
|
@ -251,7 +252,7 @@ trait PersistenceDocSpec {
|
|||
new AnyRef {
|
||||
//#fsm-example
|
||||
import akka.actor.FSM
|
||||
import akka.persistence.{ Persistent, Processor }
|
||||
import akka.persistence.{Persistent, Processor}
|
||||
|
||||
class PersistentDoor extends Processor with FSM[String, Int] {
|
||||
startWith("closed", 0)
|
||||
|
|
|
|||
|
|
@ -5,20 +5,18 @@
|
|||
package docs.persistence
|
||||
|
||||
//#plugin-imports
|
||||
import scala.concurrent.Future
|
||||
import scala.collection.immutable.Seq
|
||||
//#plugin-imports
|
||||
|
||||
import com.typesafe.config._
|
||||
import org.scalatest.WordSpec
|
||||
import scala.concurrent.duration._
|
||||
import akka.testkit.TestKit
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
//#plugin-imports
|
||||
import akka.persistence._
|
||||
import akka.persistence.journal._
|
||||
import akka.persistence.snapshot._
|
||||
import akka.testkit.TestKit
|
||||
import com.typesafe.config._
|
||||
import org.scalatest.WordSpec
|
||||
|
||||
import scala.collection.immutable.Seq
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration._
|
||||
//#plugin-imports
|
||||
|
||||
object PersistencePluginDocSpec {
|
||||
|
|
@ -115,7 +113,6 @@ trait SharedLeveldbPluginDocSpec {
|
|||
|
||||
new AnyRef {
|
||||
import akka.actor._
|
||||
//#shared-store-creation
|
||||
import akka.persistence.journal.leveldb.SharedLeveldbStore
|
||||
|
||||
val store = system.actorOf(Props[SharedLeveldbStore], "store")
|
||||
|
|
@ -139,3 +136,60 @@ class MySnapshotStore extends SnapshotStore {
|
|||
def delete(metadata: SnapshotMetadata): Unit = ???
|
||||
def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Unit = ???
|
||||
}
|
||||
|
||||
object PersistenceTCKDoc {
|
||||
new AnyRef {
|
||||
import akka.persistence.journal.JournalSpec
|
||||
|
||||
//#journal-tck-scala
|
||||
class MyJournalSpec extends JournalSpec {
|
||||
override val config = ConfigFactory.parseString(
|
||||
"""
|
||||
|akka.persistence.journal.plugin = "my.journal.plugin"
|
||||
""".stripMargin)
|
||||
}
|
||||
//#journal-tck-scala
|
||||
}
|
||||
new AnyRef {
|
||||
import akka.persistence.snapshot.SnapshotStoreSpec
|
||||
|
||||
//#snapshot-store-tck-scala
|
||||
class MySnapshotStoreSpec extends SnapshotStoreSpec {
|
||||
override val config = ConfigFactory.parseString(
|
||||
"""
|
||||
|akka.persistence.snapshot-store.plugin = "my.snapshot-store.plugin"
|
||||
""".stripMargin)
|
||||
}
|
||||
//#snapshot-store-tck-scala
|
||||
}
|
||||
new AnyRef {
|
||||
import java.io.File
|
||||
|
||||
import akka.persistence.journal.JournalSpec
|
||||
import org.iq80.leveldb.util.FileUtils
|
||||
|
||||
//#journal-tck-before-after-scala
|
||||
class MyJournalSpec extends JournalSpec {
|
||||
override val config = ConfigFactory.parseString(
|
||||
"""
|
||||
|akka.persistence.journal.plugin = "my.journal.plugin"
|
||||
""".stripMargin)
|
||||
|
||||
val storageLocations = List(
|
||||
new File(system.settings.config.getString("akka.persistence.journal.leveldb.dir")),
|
||||
new File(config.getString("akka.persistence.snapshot-store.local.dir")))
|
||||
|
||||
override def beforeAll() {
|
||||
super.beforeAll()
|
||||
storageLocations foreach FileUtils.deleteRecursively
|
||||
}
|
||||
|
||||
override def afterAll() {
|
||||
storageLocations foreach FileUtils.deleteRecursively
|
||||
super.afterAll()
|
||||
}
|
||||
|
||||
}
|
||||
//#journal-tck-before-after-scala
|
||||
}
|
||||
}
|
||||
|
|
@ -500,6 +500,35 @@ A snapshot store plugin can be activated with the following minimal configuratio
|
|||
The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher
|
||||
used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``.
|
||||
|
||||
Plugin TCK
|
||||
----------
|
||||
In order to help developers build correct and high quality storage plugins, we provide an Technology Compatibility Kit (`TCK <http://en.wikipedia.org/wiki/Technology_Compatibility_Kit>`_ for short).
|
||||
|
||||
The TCK is usable from Java as well as Scala projects, for Scala you need to include the akka-persistence-tck-experimental dependency::
|
||||
|
||||
"com.typesafe.akka" %% "akka-persistence-tck-experimental" % "2.3.5" % "test"
|
||||
|
||||
To include the Journal TCK tests in your test suite simply extend the provided ``JournalSpec``:
|
||||
|
||||
.. includecode:: ./code/docs/persistence/PersistencePluginDocSpec.scala#journal-tck-scala
|
||||
|
||||
We also provide a simple benchmarking class ``JournalPerfSpec`` which includes all the tests that ``JournalSpec``
|
||||
has, and also performs some longer operations on the Journal while printing it's performance stats. While it is NOT aimed
|
||||
to provide a proper benchmarking environment it can be used to get a rough feel about your journals performance in the most
|
||||
typical scenarios.
|
||||
|
||||
In order to include the ``SnapshotStore`` TCK tests in your test suite simply extend the ``SnapshotStoreSpec``:
|
||||
|
||||
.. includecode:: ./code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-store-tck-scala
|
||||
|
||||
In case your plugin requires some setting up (starting a mock database, removing temporary files etc.) you can override the
|
||||
``beforeAll`` and ``afterAll`` methods to hook into the tests lifecycle:
|
||||
|
||||
.. includecode:: ./code/docs/persistence/PersistencePluginDocSpec.scala#journal-tck-before-after-scala
|
||||
|
||||
We *highly recommend* including these specifications in your test suite, as they cover a broad range of cases you
|
||||
might have otherwise forgotten to test for when writing a plugin from scratch.
|
||||
|
||||
.. _pre-packaged-plugins:
|
||||
|
||||
Pre-packaged plugins
|
||||
|
|
|
|||
23
akka-persistence-tck/build.sbt
Normal file
23
akka-persistence-tck/build.sbt
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
import akka.{ AkkaBuild, Dependencies, Formatting, OSGi, Unidoc }
|
||||
import com.typesafe.tools.mima.plugin.MimaKeys
|
||||
import akka.MultiNode
|
||||
|
||||
AkkaBuild.defaultSettings
|
||||
|
||||
AkkaBuild.experimentalSettings
|
||||
|
||||
Formatting.formatSettings
|
||||
|
||||
Unidoc.scaladocSettings
|
||||
|
||||
Unidoc.javadocSettings
|
||||
|
||||
// OSGi.persistenceTck TODO: we do need to export this as OSGi bundle too?
|
||||
|
||||
libraryDependencies ++= Dependencies.persistenceTck
|
||||
|
||||
MimaKeys.previousArtifact := None
|
||||
|
||||
fork in Test := true
|
||||
|
||||
javaOptions in Test := MultiNode.defaultMultiJvmOptions
|
||||
|
|
@ -0,0 +1,43 @@
|
|||
package akka.persistence
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
import scala.reflect.ClassTag
|
||||
|
||||
import akka.actor._
|
||||
import akka.testkit._
|
||||
|
||||
import com.typesafe.config._
|
||||
|
||||
import org.scalatest._
|
||||
|
||||
trait PluginSpec extends TestKitBase with WordSpecLike with Matchers with BeforeAndAfterAll with BeforeAndAfterEach {
|
||||
private val counter = new AtomicInteger(0)
|
||||
|
||||
private var _extension: Persistence = _
|
||||
private var _pid: String = _
|
||||
|
||||
// used to avoid messages be delivered to a restarted actor,
|
||||
// this is akka-persistence internals and journals themselfes don't really care
|
||||
protected val actorInstanceId = 1
|
||||
|
||||
override protected def beforeEach(): Unit = {
|
||||
_pid = s"p-${counter.incrementAndGet()}"
|
||||
}
|
||||
override protected def beforeAll(): Unit =
|
||||
_extension = Persistence(system)
|
||||
|
||||
override protected def afterAll(): Unit =
|
||||
shutdown(system)
|
||||
|
||||
val config: Config
|
||||
|
||||
def extension: Persistence =
|
||||
_extension
|
||||
|
||||
def pid: String =
|
||||
_pid
|
||||
|
||||
def subscribe[T: ClassTag](subscriber: ActorRef) =
|
||||
system.eventStream.subscribe(subscriber, implicitly[ClassTag[T]].runtimeClass)
|
||||
}
|
||||
|
|
@ -0,0 +1,50 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.persistence.japi.journal
|
||||
|
||||
import akka.persistence.journal.{ JournalPerfSpec, JournalSpec }
|
||||
import com.typesafe.config.Config
|
||||
import org.junit.runner.RunWith
|
||||
import org.scalatest.Informer
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
|
||||
/**
|
||||
* JAVA API
|
||||
*
|
||||
* Java / JUnit consumable equivalent of [[akka.persistence.journal.JournalPerfSpec]] and [[akka.persistence.journal.JournalSpec]].
|
||||
*
|
||||
* This spec measures execution times of the basic operations that an [[akka.persistence.PersistentActor]] provides,
|
||||
* using the provided Journal (plugin).
|
||||
*
|
||||
* It is *NOT* meant to be a comprehensive benchmark, but rather aims to help plugin developers to easily determine
|
||||
* if their plugin's performance is roughly as expected. It also validates the plugin still works under "more messages" scenarios.
|
||||
*
|
||||
* The measurements are by default printed to `System.out`, if you want to customise this please override the [[#info]] method.
|
||||
*
|
||||
* The benchmark iteration and message counts are easily customisable by overriding these methods:
|
||||
*
|
||||
* {{{
|
||||
* @Override
|
||||
* public long awaitDurationMillis() { return 10000; }
|
||||
*
|
||||
* @Override
|
||||
* public int eventsCount() { return 10 * 1000; }
|
||||
*
|
||||
* @Override
|
||||
* public int measurementIterations { return 10; }
|
||||
* }}}
|
||||
*
|
||||
* In case your journal plugin needs some kind of setup or teardown, override the `beforeAll` or `afterAll`
|
||||
* methods (don't forget to call `super` in your overriden methods).
|
||||
*
|
||||
* @see [[akka.persistence.journal.JournalSpec]]
|
||||
* @see [[akka.persistence.journal.JournalPerfSpec]]
|
||||
* @param config configures the Journal plugin to be tested
|
||||
*/
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class JavaJournalPerfSpec(val config: Config) extends JournalSpec with JournalPerfSpec {
|
||||
override protected def info: Informer = new Informer {
|
||||
override def apply(message: String, payload: Option[Any]): Unit = System.out.println(message)
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,24 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.persistence.japi.journal
|
||||
|
||||
import akka.persistence.journal.JournalSpec
|
||||
import com.typesafe.config.Config
|
||||
import org.junit.runner.RunWith
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
|
||||
/**
|
||||
* JAVA API
|
||||
*
|
||||
* Java / JUnit API for [[akka.persistence.journal.JournalSpec]].
|
||||
*
|
||||
* In case your journal plugin needs some kind of setup or teardown, override the `beforeAll` or `afterAll`
|
||||
* methods (don't forget to call `super` in your overriden methods).
|
||||
*
|
||||
* @see [[akka.persistence.journal.JournalSpec]]
|
||||
* @see [[akka.persistence.journal.JournalPerfSpec]]
|
||||
* @param config configures the Journal plugin to be tested
|
||||
*/
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class JavaJournalSpec(val config: Config) extends JournalSpec
|
||||
|
|
@ -0,0 +1,23 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.persistence.japi.snapshot
|
||||
|
||||
import akka.persistence.snapshot.{ SnapshotStore, SnapshotStoreSpec }
|
||||
import com.typesafe.config.Config
|
||||
import org.junit.runner.RunWith
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
|
||||
/**
|
||||
* JAVA API
|
||||
*
|
||||
* This spec aims to verify custom akka-persistence [[SnapshotStore]] implementations.
|
||||
* Plugin authors are highly encouraged to include it in their plugin's test suites.
|
||||
*
|
||||
* In case your snapshot-store plugin needs some kind of setup or teardown, override the `beforeAll` or `afterAll`
|
||||
* methods (don't forget to call `super` in your overriden methods).
|
||||
*
|
||||
* @see [[akka.persistence.snapshot.SnapshotStoreSpec]]
|
||||
*/
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class JavaSnapshotStoreSpec(val config: Config) extends SnapshotStoreSpec
|
||||
|
|
@ -0,0 +1,153 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.persistence.journal
|
||||
|
||||
import akka.actor.{ ActorLogging, ActorRef, Props }
|
||||
import akka.persistence.journal.JournalPerfSpec.{ BenchActor, Cmd, ResetCounter }
|
||||
import akka.persistence.{ PersistentActor, PluginSpec }
|
||||
import akka.testkit.TestProbe
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object JournalPerfSpec {
|
||||
class BenchActor(val persistenceId: String, replyTo: ActorRef, replyAfter: Int) extends PersistentActor
|
||||
with ActorLogging {
|
||||
|
||||
var counter = 0
|
||||
|
||||
override def receiveCommand: Receive = {
|
||||
case c @ Cmd("p", payload) ⇒
|
||||
persist(c) { d ⇒
|
||||
counter += 1
|
||||
require(d.payload == counter, s"Expected to receive [$counter] yet got: [${d.payload}]")
|
||||
if (counter == replyAfter) replyTo ! d.payload
|
||||
}
|
||||
|
||||
case c @ Cmd("pa", payload) ⇒
|
||||
persistAsync(c) { d ⇒
|
||||
counter += 1
|
||||
require(d.payload == counter, s"Expected to receive [$counter] yet got: [${d.payload}]")
|
||||
if (counter == replyAfter) replyTo ! d.payload
|
||||
}
|
||||
|
||||
case c @ Cmd("par", payload) ⇒
|
||||
counter += 1
|
||||
persistAsync(c) { d ⇒
|
||||
require(d.payload == counter, s"Expected to receive [$counter] yet got: [${d.payload}]")
|
||||
}
|
||||
if (counter == replyAfter) replyTo ! payload
|
||||
|
||||
case c @ Cmd("n", payload) ⇒
|
||||
counter += 1
|
||||
require(payload == counter, s"Expected to receive [$counter] yet got: [${payload}]")
|
||||
if (counter == replyAfter) replyTo ! payload
|
||||
|
||||
case ResetCounter ⇒
|
||||
counter = 0
|
||||
}
|
||||
|
||||
override def receiveRecover: Receive = {
|
||||
case Cmd(_, payload) ⇒
|
||||
counter += 1
|
||||
require(payload == counter, s"Expected to receive [$counter] yet got: [${payload}]")
|
||||
if (counter == replyAfter) replyTo ! payload
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
case object ResetCounter
|
||||
case class Cmd(mode: String, payload: Int)
|
||||
}
|
||||
|
||||
/**
|
||||
* This spec measures execution times of the basic operations that an [[akka.persistence.PersistentActor]] provides,
|
||||
* using the provided Journal (plugin).
|
||||
*
|
||||
* It is *NOT* meant to be a comprehensive benchmark, but rather aims to help plugin developers to easily determine
|
||||
* if their plugin's performance is roughly as expected. It also validates the plugin still works under "more messages" scenarios.
|
||||
*
|
||||
* In case your journal plugin needs some kind of setup or teardown, override the `beforeAll` or `afterAll`
|
||||
* methods (don't forget to call `super` in your overriden methods).
|
||||
*
|
||||
* For a Java and JUnit consumable version of the TCK please refer to [[akka.persistence.japi.journal.JavaJournalPerfSpec]].
|
||||
*
|
||||
* @see [[akka.persistence.journal.JournalSpec]]
|
||||
*/
|
||||
trait JournalPerfSpec extends PluginSpec {
|
||||
this: JournalSpec ⇒
|
||||
|
||||
private val testProbe = TestProbe()
|
||||
|
||||
def benchActor(replyAfter: Int): ActorRef =
|
||||
system.actorOf(Props(classOf[BenchActor], pid, testProbe.ref, replyAfter))
|
||||
|
||||
def feedAndExpectLast(actor: ActorRef, mode: String, cmnds: immutable.Seq[Int]): Unit = {
|
||||
cmnds foreach { c ⇒ actor ! Cmd(mode, c) }
|
||||
testProbe.expectMsg(awaitDuration, cmnds.last)
|
||||
}
|
||||
|
||||
/** Executes a block of code multiple times (no warmup) */
|
||||
def measure(msg: Duration ⇒ String)(block: ⇒ Unit): Unit = {
|
||||
val measurements = Array.ofDim[Duration](measurementIterations)
|
||||
var i = 0
|
||||
while (i < measurementIterations) {
|
||||
val start = System.nanoTime()
|
||||
|
||||
block
|
||||
|
||||
val stop = System.nanoTime()
|
||||
val d = (stop - start).nanos
|
||||
measurements(i) = d
|
||||
info(msg(d))
|
||||
|
||||
i += 1
|
||||
}
|
||||
info(s"Average time: ${(measurements.map(_.toNanos).sum / measurementIterations).nanos.toMillis} ms")
|
||||
}
|
||||
|
||||
/** Override in order to customize timeouts used for expectMsg, in order to tune the awaits to your journal's perf */
|
||||
def awaitDurationMillis: Long = 10.seconds.toMillis
|
||||
|
||||
/** Override in order to customize timeouts used for expectMsg, in order to tune the awaits to your journal's perf */
|
||||
private def awaitDuration: FiniteDuration = awaitDurationMillis.millis
|
||||
|
||||
/** Numbe of messages sent to the PersistentActor under test for each test iteration */
|
||||
def eventsCount: Int = 10 * 1000
|
||||
|
||||
/** Number of measurement iterations each test will be run. */
|
||||
def measurementIterations: Int = 10
|
||||
|
||||
private val commands = Vector(1 to eventsCount: _*)
|
||||
|
||||
"A PersistentActor's performance" must {
|
||||
s"measure: persistAsync()-ing $eventsCount events" in {
|
||||
val p1 = benchActor(eventsCount)
|
||||
|
||||
measure(d ⇒ s"PersistAsync()-ing $eventsCount took ${d.toMillis} ms") {
|
||||
feedAndExpectLast(p1, "pa", commands)
|
||||
p1 ! ResetCounter
|
||||
}
|
||||
}
|
||||
s"measure: persist()-ing $eventsCount events" in {
|
||||
val p1 = benchActor(eventsCount)
|
||||
|
||||
measure(d ⇒ s"Persist()-ing $eventsCount took ${d.toMillis} ms") {
|
||||
feedAndExpectLast(p1, "p", commands)
|
||||
p1 ! ResetCounter
|
||||
}
|
||||
}
|
||||
s"measure: recovering $eventsCount events" in {
|
||||
val p1 = benchActor(eventsCount)
|
||||
feedAndExpectLast(p1, "p", commands)
|
||||
|
||||
measure(d ⇒ s"Recovering $eventsCount took ${d.toMillis} ms") {
|
||||
benchActor(eventsCount)
|
||||
testProbe.expectMsg(max = awaitDuration, commands.last)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,198 @@
|
|||
package akka.persistence.journal
|
||||
|
||||
import scala.collection.immutable.Seq
|
||||
|
||||
import akka.actor._
|
||||
import akka.persistence._
|
||||
import akka.persistence.JournalProtocol._
|
||||
import akka.testkit._
|
||||
|
||||
import com.typesafe.config._
|
||||
|
||||
object JournalSpec {
|
||||
val config = ConfigFactory.parseString(
|
||||
"""
|
||||
|akka.persistence.publish-confirmations = on
|
||||
|akka.persistence.publish-plugin-commands = on
|
||||
""".stripMargin)
|
||||
|
||||
case class Confirmation(persistenceId: String, channelId: String, sequenceNr: Long) extends PersistentConfirmation
|
||||
}
|
||||
|
||||
/**
|
||||
* This spec aims to verify custom akka-persistence Journal implementations.
|
||||
* Plugin authors are highly encouraged to include it in their plugin's test suites.
|
||||
*
|
||||
* In case your journal plugin needs some kind of setup or teardown, override the `beforeAll` or `afterAll`
|
||||
* methods (don't forget to call `super` in your overriden methods).
|
||||
*
|
||||
* For a Java and JUnit consumable version of the TCK please refer to [[akka.persistence.japi.journal.JavaJournalSpec]].
|
||||
*
|
||||
* @see [[akka.persistence.journal.JournalPerfSpec]]
|
||||
* @see [[akka.persistence.japi.journal.JavaJournalPerfSpec]]
|
||||
*/
|
||||
trait JournalSpec extends PluginSpec {
|
||||
import JournalSpec._
|
||||
|
||||
implicit lazy val system: ActorSystem = ActorSystem("JournalSpec", config.withFallback(JournalSpec.config))
|
||||
|
||||
private var senderProbe: TestProbe = _
|
||||
private var receiverProbe: TestProbe = _
|
||||
|
||||
override protected def beforeEach(): Unit = {
|
||||
super.beforeEach()
|
||||
senderProbe = TestProbe()
|
||||
receiverProbe = TestProbe()
|
||||
writeMessages(1, 5, pid, senderProbe.ref)
|
||||
}
|
||||
|
||||
def journal: ActorRef =
|
||||
extension.journalFor(null)
|
||||
|
||||
def replayedMessage(snr: Long, deleted: Boolean = false, confirms: Seq[String] = Nil): ReplayedMessage =
|
||||
ReplayedMessage(PersistentImpl(s"a-${snr}", snr, pid, deleted, confirms, senderProbe.ref))
|
||||
|
||||
def writeMessages(from: Int, to: Int, pid: String, sender: ActorRef): Unit = {
|
||||
val msgs = from to to map { i ⇒ PersistentRepr(payload = s"a-${i}", sequenceNr = i, persistenceId = pid, sender = sender) }
|
||||
val probe = TestProbe()
|
||||
|
||||
journal ! WriteMessages(msgs, probe.ref, actorInstanceId)
|
||||
|
||||
probe.expectMsg(WriteMessagesSuccessful)
|
||||
from to to foreach { i ⇒
|
||||
probe.expectMsgPF() { case WriteMessageSuccess(PersistentImpl(payload, `i`, `pid`, _, _, `sender`), _) ⇒ payload should be(s"a-${i}") }
|
||||
}
|
||||
}
|
||||
|
||||
"A journal" must {
|
||||
"replay all messages" in {
|
||||
journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
|
||||
1 to 5 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
|
||||
receiverProbe.expectMsg(ReplayMessagesSuccess)
|
||||
}
|
||||
"replay messages using a lower sequence number bound" in {
|
||||
journal ! ReplayMessages(3, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
|
||||
3 to 5 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
|
||||
receiverProbe.expectMsg(ReplayMessagesSuccess)
|
||||
}
|
||||
"replay messages using an upper sequence number bound" in {
|
||||
journal ! ReplayMessages(1, 3, Long.MaxValue, pid, receiverProbe.ref)
|
||||
1 to 3 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
|
||||
receiverProbe.expectMsg(ReplayMessagesSuccess)
|
||||
}
|
||||
"replay messages using a count limit" in {
|
||||
journal ! ReplayMessages(1, Long.MaxValue, 3, pid, receiverProbe.ref)
|
||||
1 to 3 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
|
||||
receiverProbe.expectMsg(ReplayMessagesSuccess)
|
||||
}
|
||||
"replay messages using a lower and upper sequence number bound" in {
|
||||
journal ! ReplayMessages(2, 4, Long.MaxValue, pid, receiverProbe.ref)
|
||||
2 to 4 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
|
||||
receiverProbe.expectMsg(ReplayMessagesSuccess)
|
||||
}
|
||||
"replay messages using a lower and upper sequence number bound and a count limit" in {
|
||||
journal ! ReplayMessages(2, 4, 2, pid, receiverProbe.ref)
|
||||
2 to 3 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
|
||||
receiverProbe.expectMsg(ReplayMessagesSuccess)
|
||||
}
|
||||
"replay a single if lower sequence number bound equals upper sequence number bound" in {
|
||||
journal ! ReplayMessages(2, 2, Long.MaxValue, pid, receiverProbe.ref)
|
||||
2 to 2 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
|
||||
receiverProbe.expectMsg(ReplayMessagesSuccess)
|
||||
}
|
||||
"replay a single message if count limit equals 1" in {
|
||||
journal ! ReplayMessages(2, 4, 1, pid, receiverProbe.ref)
|
||||
2 to 2 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
|
||||
receiverProbe.expectMsg(ReplayMessagesSuccess)
|
||||
}
|
||||
"not replay messages if count limit equals 0" in {
|
||||
journal ! ReplayMessages(2, 4, 0, pid, receiverProbe.ref)
|
||||
receiverProbe.expectMsg(ReplayMessagesSuccess)
|
||||
}
|
||||
"not replay messages if lower sequence number bound is greater than upper sequence number bound" in {
|
||||
journal ! ReplayMessages(3, 2, Long.MaxValue, pid, receiverProbe.ref)
|
||||
receiverProbe.expectMsg(ReplayMessagesSuccess)
|
||||
}
|
||||
"not replay permanently deleted messages (range deletion)" in {
|
||||
val cmd = DeleteMessagesTo(pid, 3, true)
|
||||
val sub = TestProbe()
|
||||
|
||||
subscribe[DeleteMessagesTo](sub.ref)
|
||||
journal ! cmd
|
||||
sub.expectMsg(cmd)
|
||||
|
||||
journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
|
||||
List(4, 5) foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
|
||||
}
|
||||
"replay logically deleted messages with deleted field set to true (range deletion)" in {
|
||||
val cmd = DeleteMessagesTo(pid, 3, false)
|
||||
val sub = TestProbe()
|
||||
|
||||
subscribe[DeleteMessagesTo](sub.ref)
|
||||
journal ! cmd
|
||||
sub.expectMsg(cmd)
|
||||
|
||||
journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref, replayDeleted = true)
|
||||
1 to 5 foreach { i ⇒
|
||||
i match {
|
||||
case 1 | 2 | 3 ⇒ receiverProbe.expectMsg(replayedMessage(i, deleted = true))
|
||||
case 4 | 5 ⇒ receiverProbe.expectMsg(replayedMessage(i))
|
||||
}
|
||||
}
|
||||
}
|
||||
"replay confirmed messages with corresponding channel ids contained in the confirmed field" in {
|
||||
val confs = List(Confirmation(pid, "c1", 3), Confirmation(pid, "c2", 3))
|
||||
val lpid = pid
|
||||
|
||||
journal ! WriteConfirmations(confs, receiverProbe.ref)
|
||||
receiverProbe.expectMsg(WriteConfirmationsSuccess(confs))
|
||||
|
||||
journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref, replayDeleted = true)
|
||||
1 to 5 foreach { i ⇒
|
||||
i match {
|
||||
case 1 | 2 | 4 | 5 ⇒ receiverProbe.expectMsg(replayedMessage(i))
|
||||
case 3 ⇒ receiverProbe.expectMsgPF() {
|
||||
case ReplayedMessage(PersistentImpl(payload, `i`, `lpid`, false, confirms, _)) ⇒
|
||||
confirms should have length (2)
|
||||
confirms should contain("c1")
|
||||
confirms should contain("c2")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
"ignore orphan deletion markers" in {
|
||||
val msgIds = List(PersistentIdImpl(pid, 3), PersistentIdImpl(pid, 4))
|
||||
journal ! DeleteMessages(msgIds, true, Some(receiverProbe.ref)) // delete message
|
||||
receiverProbe.expectMsg(DeleteMessagesSuccess(msgIds))
|
||||
|
||||
journal ! DeleteMessages(msgIds, false, Some(receiverProbe.ref)) // write orphan marker
|
||||
receiverProbe.expectMsg(DeleteMessagesSuccess(msgIds))
|
||||
|
||||
journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
|
||||
List(1, 2, 5) foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
|
||||
}
|
||||
"ignore orphan confirmation markers" in {
|
||||
val msgIds = List(PersistentIdImpl(pid, 3))
|
||||
journal ! DeleteMessages(msgIds, true, Some(receiverProbe.ref)) // delete message
|
||||
receiverProbe.expectMsg(DeleteMessagesSuccess(msgIds))
|
||||
|
||||
val confs = List(Confirmation(pid, "c1", 3), Confirmation(pid, "c2", 3))
|
||||
journal ! WriteConfirmations(confs, receiverProbe.ref)
|
||||
receiverProbe.expectMsg(WriteConfirmationsSuccess(confs))
|
||||
|
||||
journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
|
||||
List(1, 2, 4, 5) foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
|
||||
}
|
||||
"return a highest stored sequence number > 0 if the persistent actor has already written messages and the message log is non-empty" in {
|
||||
journal ! ReadHighestSequenceNr(3L, pid, receiverProbe.ref)
|
||||
receiverProbe.expectMsg(ReadHighestSequenceNrSuccess(5))
|
||||
|
||||
journal ! ReadHighestSequenceNr(5L, pid, receiverProbe.ref)
|
||||
receiverProbe.expectMsg(ReadHighestSequenceNrSuccess(5))
|
||||
}
|
||||
"return a highest stored sequence number == 0 if the persistent actor has not yet written messages" in {
|
||||
journal ! ReadHighestSequenceNr(0L, "non-existing-pid", receiverProbe.ref)
|
||||
receiverProbe.expectMsg(ReadHighestSequenceNrSuccess(0))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,108 @@
|
|||
package akka.persistence.snapshot
|
||||
|
||||
import scala.collection.immutable.Seq
|
||||
|
||||
import akka.actor._
|
||||
import akka.persistence._
|
||||
import akka.persistence.SnapshotProtocol._
|
||||
import akka.testkit.TestProbe
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
object SnapshotStoreSpec {
|
||||
val config = ConfigFactory.parseString("akka.persistence.publish-plugin-commands = on")
|
||||
}
|
||||
|
||||
/**
|
||||
* This spec aims to verify custom akka-persistence [[SnapshotStore]] implementations.
|
||||
* Plugin authors are highly encouraged to include it in their plugin's test suites.
|
||||
*
|
||||
* In case your journal plugin needs some kind of setup or teardown, override the `beforeAll` or `afterAll`
|
||||
* methods (don't forget to call `super` in your overriden methods).
|
||||
*
|
||||
* For a Java and JUnit consumable version of the TCK please refer to [[akka.persistence.japi.snapshot.JavaSnapshotStoreSpec]].
|
||||
*
|
||||
* @see [[akka.persistence.japi.snapshot.JavaSnapshotStoreSpec]]
|
||||
*/
|
||||
trait SnapshotStoreSpec extends PluginSpec {
|
||||
implicit lazy val system = ActorSystem("SnapshotStoreSpec", config.withFallback(SnapshotStoreSpec.config))
|
||||
|
||||
private var senderProbe: TestProbe = _
|
||||
private var metadata: Seq[SnapshotMetadata] = Nil
|
||||
|
||||
override protected def beforeEach(): Unit = {
|
||||
super.beforeEach()
|
||||
senderProbe = TestProbe()
|
||||
metadata = writeSnapshots()
|
||||
}
|
||||
|
||||
def snapshotStore: ActorRef =
|
||||
extension.snapshotStoreFor(null)
|
||||
|
||||
def writeSnapshots(): Seq[SnapshotMetadata] = {
|
||||
1 to 5 map { i ⇒
|
||||
val metadata = SnapshotMetadata(pid, i + 10)
|
||||
snapshotStore.tell(SaveSnapshot(metadata, s"s-${i}"), senderProbe.ref)
|
||||
senderProbe.expectMsgPF() { case SaveSnapshotSuccess(md) ⇒ md }
|
||||
}
|
||||
}
|
||||
|
||||
"A snapshot store" must {
|
||||
"not load a snapshot given an invalid processor id" in {
|
||||
snapshotStore.tell(LoadSnapshot("invalid", SnapshotSelectionCriteria.Latest, Long.MaxValue), senderProbe.ref)
|
||||
senderProbe.expectMsg(LoadSnapshotResult(None, Long.MaxValue))
|
||||
}
|
||||
"not load a snapshot given non-matching timestamp criteria" in {
|
||||
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria.Latest.copy(maxTimestamp = 100), Long.MaxValue), senderProbe.ref)
|
||||
senderProbe.expectMsg(LoadSnapshotResult(None, Long.MaxValue))
|
||||
}
|
||||
"not load a snapshot given non-matching sequence number criteria" in {
|
||||
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria(7), Long.MaxValue), senderProbe.ref)
|
||||
senderProbe.expectMsg(LoadSnapshotResult(None, Long.MaxValue))
|
||||
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria.Latest, 7), senderProbe.ref)
|
||||
senderProbe.expectMsg(LoadSnapshotResult(None, 7))
|
||||
}
|
||||
"load the most recent snapshot" in {
|
||||
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria.Latest, Long.MaxValue), senderProbe.ref)
|
||||
senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(4), s"s-5")), Long.MaxValue))
|
||||
}
|
||||
"load the most recent snapshot matching an upper sequence number bound" in {
|
||||
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria(13), Long.MaxValue), senderProbe.ref)
|
||||
senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(2), s"s-3")), Long.MaxValue))
|
||||
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria.Latest, 13), senderProbe.ref)
|
||||
senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(2), s"s-3")), 13))
|
||||
}
|
||||
"load the most recent snapshot matching upper sequence number and timestamp bounds" in {
|
||||
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria(13, metadata(2).timestamp), Long.MaxValue), senderProbe.ref)
|
||||
senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(2), s"s-3")), Long.MaxValue))
|
||||
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria.Latest.copy(maxTimestamp = metadata(2).timestamp), 13), senderProbe.ref)
|
||||
senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(2), s"s-3")), 13))
|
||||
}
|
||||
"delete a single snapshot identified by snapshot metadata" in {
|
||||
val md = metadata(2)
|
||||
val cmd = DeleteSnapshot(md)
|
||||
val sub = TestProbe()
|
||||
|
||||
subscribe[DeleteSnapshot](sub.ref)
|
||||
snapshotStore ! cmd
|
||||
sub.expectMsg(cmd)
|
||||
|
||||
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria(md.sequenceNr, md.timestamp), Long.MaxValue), senderProbe.ref)
|
||||
senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(1), s"s-2")), Long.MaxValue))
|
||||
}
|
||||
"delete all snapshots matching upper sequence number and timestamp bounds" in {
|
||||
val md = metadata(2)
|
||||
val cmd = DeleteSnapshots(pid, SnapshotSelectionCriteria(md.sequenceNr, md.timestamp))
|
||||
val sub = TestProbe()
|
||||
|
||||
subscribe[DeleteSnapshots](sub.ref)
|
||||
snapshotStore ! cmd
|
||||
sub.expectMsg(cmd)
|
||||
|
||||
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria(md.sequenceNr, md.timestamp), Long.MaxValue), senderProbe.ref)
|
||||
senderProbe.expectMsg(LoadSnapshotResult(None, Long.MaxValue))
|
||||
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria(metadata(3).sequenceNr, metadata(3).timestamp), Long.MaxValue), senderProbe.ref)
|
||||
senderProbe.expectMsg(LoadSnapshotResult(Some(SelectedSnapshot(metadata(3), s"s-4")), Long.MaxValue))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,21 @@
|
|||
package akka.persistence
|
||||
|
||||
import java.io.File
|
||||
|
||||
import org.apache.commons.io.FileUtils
|
||||
|
||||
trait PluginCleanup extends PluginSpec {
|
||||
val storageLocations = List(
|
||||
"akka.persistence.journal.leveldb.dir",
|
||||
"akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s)))
|
||||
|
||||
override def beforeAll() {
|
||||
storageLocations.foreach(FileUtils.deleteDirectory)
|
||||
super.beforeAll()
|
||||
}
|
||||
|
||||
override def afterAll() {
|
||||
super.afterAll()
|
||||
storageLocations.foreach(FileUtils.deleteDirectory)
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,12 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.persistence.japi
|
||||
|
||||
import akka.persistence.japi.journal.JavaJournalSpec
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.DoNotDiscover
|
||||
|
||||
/* Only checking that compilation works with the constructor here as expected (no other abstract fields leaked) */
|
||||
@DoNotDiscover
|
||||
class JavaJournalSpecSpec extends JavaJournalSpec(ConfigFactory.parseString(""))
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
package akka.persistence.journal.leveldb
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
import akka.persistence.journal.{ JournalPerfSpec, JournalSpec }
|
||||
import akka.persistence.PluginCleanup
|
||||
|
||||
class LeveldbJournalJavaSpec extends JournalSpec with PluginCleanup {
|
||||
lazy val config = ConfigFactory.parseString(
|
||||
"""
|
||||
|akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
|
||||
|akka.persistence.journal.leveldb.native = off
|
||||
|akka.persistence.journal.leveldb.dir = "target/journal-java"
|
||||
|akka.persistence.snapshot-store.local.dir = "target/snapshots-java/"
|
||||
""".stripMargin)
|
||||
}
|
||||
|
|
@ -0,0 +1,21 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.persistence.journal.leveldb
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
import akka.persistence.journal.{ JournalPerfSpec, JournalSpec }
|
||||
import akka.persistence.PluginCleanup
|
||||
import org.scalatest.DoNotDiscover
|
||||
|
||||
@DoNotDiscover // because only checking that compilation is OK with JournalPerfSpec
|
||||
class LeveldbJournalNativePerfSpec extends JournalSpec with JournalPerfSpec with PluginCleanup {
|
||||
lazy val config = ConfigFactory.parseString(
|
||||
"""
|
||||
|akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
|
||||
|akka.persistence.journal.leveldb.native = on
|
||||
|akka.persistence.journal.leveldb.dir = "target/journal-native"
|
||||
|akka.persistence.snapshot-store.local.dir = "target/snapshots-native/"
|
||||
""".stripMargin)
|
||||
}
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
package akka.persistence.journal.leveldb
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
import akka.persistence.journal.{ JournalPerfSpec, JournalSpec }
|
||||
import akka.persistence.PluginCleanup
|
||||
|
||||
class LeveldbJournalNativeSpec extends JournalSpec with PluginCleanup {
|
||||
lazy val config = ConfigFactory.parseString(
|
||||
"""
|
||||
|akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
|
||||
|akka.persistence.journal.leveldb.native = on
|
||||
|akka.persistence.journal.leveldb.dir = "target/journal-native"
|
||||
|akka.persistence.snapshot-store.local.dir = "target/snapshots-native/"
|
||||
""".stripMargin)
|
||||
}
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
package akka.persistence.snapshot.local
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
import akka.persistence.PluginCleanup
|
||||
import akka.persistence.snapshot.SnapshotStoreSpec
|
||||
|
||||
class LocalSnapshotStoreSpec extends SnapshotStoreSpec with PluginCleanup {
|
||||
lazy val config = ConfigFactory.parseString(
|
||||
"""
|
||||
|akka.test.timefactor = 3
|
||||
|akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
|
||||
|akka.persistence.snapshot-store.local.dir = "target/snapshots"
|
||||
""".stripMargin)
|
||||
|
||||
}
|
||||
|
|
@ -4,23 +4,22 @@
|
|||
|
||||
package akka
|
||||
|
||||
import sbt._
|
||||
import sbt.Keys._
|
||||
import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.{ MultiJvm, extraOptions }
|
||||
import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings
|
||||
import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact
|
||||
import com.typesafe.tools.mima.plugin.MimaKeys.reportBinaryIssues
|
||||
import com.typesafe.tools.mima.plugin.MimaKeys.binaryIssueFilters
|
||||
import java.io.{InputStreamReader, FileInputStream, File}
|
||||
import java.io.{FileInputStream, InputStreamReader}
|
||||
import java.util.Properties
|
||||
import sbtunidoc.Plugin.UnidocKeys.unidoc
|
||||
import TestExtras.{ JUnitFileReporting, StatsDMetrics, GraphiteBuildEvents }
|
||||
import com.typesafe.sbt.S3Plugin.{ S3, s3Settings }
|
||||
import Unidoc.{ scaladocSettings, unidocSettings }
|
||||
import Formatting.docFormatSettings
|
||||
import MultiNode.multiJvmSettings
|
||||
|
||||
import akka.Formatting.docFormatSettings
|
||||
import akka.MultiNode.multiJvmSettings
|
||||
import akka.TestExtras.{GraphiteBuildEvents, JUnitFileReporting, StatsDMetrics}
|
||||
import akka.Unidoc.{scaladocSettings, unidocSettings}
|
||||
import com.typesafe.sbt.S3Plugin.{S3, s3Settings}
|
||||
import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.{MultiJvm, extraOptions}
|
||||
import com.typesafe.sbt.site.SphinxSupport
|
||||
import com.typesafe.sbt.site.SphinxSupport.Sphinx
|
||||
import com.typesafe.tools.mima.plugin.MimaKeys.{binaryIssueFilters, previousArtifact, reportBinaryIssues}
|
||||
import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings
|
||||
import sbt.Keys._
|
||||
import sbt._
|
||||
import sbtunidoc.Plugin.UnidocKeys.unidoc
|
||||
|
||||
object AkkaBuild extends Build {
|
||||
System.setProperty("akka.mode", "test") // Is there better place for this?
|
||||
|
|
@ -58,7 +57,7 @@ object AkkaBuild extends Build {
|
|||
validatePullRequest <<= (unidoc in Compile, SphinxSupport.generate in Sphinx in docs) map { (_, _) => }
|
||||
),
|
||||
aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j, agent,
|
||||
persistence, zeroMQ, kernel, osgi, docs, contrib, samples, multiNodeTestkit)
|
||||
persistence, persistenceTck, zeroMQ, kernel, osgi, docs, contrib, samples, multiNodeTestkit)
|
||||
)
|
||||
|
||||
lazy val akkaScalaNightly = Project(
|
||||
|
|
@ -67,7 +66,7 @@ object AkkaBuild extends Build {
|
|||
// remove dependencies that we have to build ourselves (Scala STM, ZeroMQ Scala Bindings)
|
||||
// samples don't work with dbuild right now
|
||||
aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j,
|
||||
persistence, kernel, osgi, contrib, multiNodeTestkit)
|
||||
persistence, persistenceTck, kernel, osgi, contrib, multiNodeTestkit)
|
||||
)
|
||||
|
||||
lazy val actor = Project(
|
||||
|
|
@ -129,6 +128,12 @@ object AkkaBuild extends Build {
|
|||
dependencies = Seq(actor, remote % "test->test", testkit % "test->test")
|
||||
)
|
||||
|
||||
lazy val persistenceTck = Project(
|
||||
id = "akka-persistence-experimental-tck",
|
||||
base = file("akka-persistence-tck"),
|
||||
dependencies = Seq(persistence % "compile;test->test", testkit % "compile;test->test")
|
||||
)
|
||||
|
||||
lazy val zeroMQ = Project(
|
||||
id = "akka-zeromq",
|
||||
base = file("akka-zeromq"),
|
||||
|
|
@ -238,7 +243,7 @@ object AkkaBuild extends Build {
|
|||
base = file("akka-docs"),
|
||||
dependencies = Seq(actor, testkit % "test->test",
|
||||
remote % "compile;test->test", cluster, slf4j, agent, zeroMQ, camel, osgi,
|
||||
persistence % "compile;test->test")
|
||||
persistence % "compile;test->test", persistenceTck)
|
||||
)
|
||||
|
||||
lazy val contrib = Project(
|
||||
|
|
|
|||
|
|
@ -70,7 +70,7 @@ object Dependencies {
|
|||
val karafExam = "org.apache.karaf.tooling.exam" % "org.apache.karaf.tooling.exam.container" % "2.3.1" % "test" // ApacheV2
|
||||
// mirrored in OSGi sample
|
||||
val paxExam = "org.ops4j.pax.exam" % "pax-exam-junit4" % "2.6.0" % "test" // ApacheV2
|
||||
val scalaXml = "org.scala-lang.modules" %% "scala-xml" % "1.0.1" % "test"
|
||||
val scalaXml = "org.scala-lang.modules" %% "scala-xml" % "1.0.1" % "test"
|
||||
|
||||
// metrics, measurements, perf testing
|
||||
val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.1" % "test" // ApacheV2
|
||||
|
|
@ -104,6 +104,8 @@ object Dependencies {
|
|||
val persistence = Seq(levelDB, levelDBNative, protobuf, Test.scalatest, Test.junit, Test.commonsIo) ++
|
||||
scalaXmlDepencency
|
||||
|
||||
val persistenceTck = Seq(Test.scalatest.copy(configurations = Some("compile")), Test.junit.copy(configurations = Some("compile")))
|
||||
|
||||
val kernel = Seq(Test.scalatest, Test.junit)
|
||||
|
||||
val camel = Seq(camelCore, Test.scalatest, Test.junit, Test.mockito, Test.logback, Test.commonsIo, Test.junitIntf)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue