=per,tck #18639 introduce capability flags for journal features
This commit is contained in:
parent
516b1f0954
commit
ec96c5ddb7
18 changed files with 397 additions and 187 deletions
|
|
@ -1 +1 @@
|
|||
1.6
|
||||
1.8
|
||||
|
|
|
|||
|
|
@ -4,179 +4,191 @@
|
|||
|
||||
package docs.persistence;
|
||||
|
||||
import akka.actor.*;
|
||||
import akka.dispatch.Futures;
|
||||
import akka.persistence.*;
|
||||
import akka.persistence.japi.journal.JavaJournalSpec;
|
||||
import akka.persistence.japi.snapshot.JavaSnapshotStoreSpec;
|
||||
import akka.persistence.journal.japi.AsyncWriteJournal;
|
||||
import akka.persistence.journal.leveldb.SharedLeveldbJournal;
|
||||
import akka.persistence.journal.leveldb.SharedLeveldbStore;
|
||||
import akka.persistence.snapshot.japi.SnapshotStore;
|
||||
import com.typesafe.config.Config;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
import org.iq80.leveldb.util.FileUtils;
|
||||
import scala.concurrent.Future;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Consumer;
|
||||
import akka.actor.*;
|
||||
import akka.dispatch.Futures;
|
||||
import com.typesafe.config.Config;
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
import org.iq80.leveldb.util.FileUtils;
|
||||
import akka.persistence.japi.journal.JavaJournalSpec;
|
||||
import akka.persistence.japi.snapshot.JavaSnapshotStoreSpec;
|
||||
import akka.persistence.journal.leveldb.SharedLeveldbJournal;
|
||||
import akka.persistence.journal.leveldb.SharedLeveldbStore;
|
||||
import scala.concurrent.Future;
|
||||
|
||||
//#plugin-imports
|
||||
import akka.persistence.*;
|
||||
import akka.persistence.journal.japi.AsyncWriteJournal;
|
||||
import akka.persistence.snapshot.japi.SnapshotStore;
|
||||
|
||||
//#plugin-imports
|
||||
|
||||
public class PersistencePluginDocTest {
|
||||
|
||||
|
||||
static Object o1 = new Object() {
|
||||
final ActorSystem system = null;
|
||||
//#shared-store-creation
|
||||
final ActorRef store = system.actorOf(Props.create(SharedLeveldbStore.class), "store");
|
||||
//#shared-store-creation
|
||||
static Object o1 = new Object() {
|
||||
final ActorSystem system = null;
|
||||
//#shared-store-creation
|
||||
final ActorRef store = system.actorOf(Props.create(SharedLeveldbStore.class), "store");
|
||||
//#shared-store-creation
|
||||
|
||||
//#shared-store-usage
|
||||
class SharedStorageUsage extends UntypedActor {
|
||||
@Override
|
||||
public void preStart() throws Exception {
|
||||
String path = "akka.tcp://example@127.0.0.1:2552/user/store";
|
||||
ActorSelection selection = getContext().actorSelection(path);
|
||||
selection.tell(new Identify(1), getSelf());
|
||||
//#shared-store-usage
|
||||
class SharedStorageUsage extends UntypedActor {
|
||||
@Override
|
||||
public void preStart() throws Exception {
|
||||
String path = "akka.tcp://example@127.0.0.1:2552/user/store";
|
||||
ActorSelection selection = getContext().actorSelection(path);
|
||||
selection.tell(new Identify(1), getSelf());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) throws Exception {
|
||||
if (message instanceof ActorIdentity) {
|
||||
ActorIdentity identity = (ActorIdentity) message;
|
||||
if (identity.correlationId().equals(1)) {
|
||||
ActorRef store = identity.getRef();
|
||||
if (store != null) {
|
||||
SharedLeveldbJournal.setStore(store, getContext().system());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) throws Exception {
|
||||
if (message instanceof ActorIdentity) {
|
||||
ActorIdentity identity = (ActorIdentity) message;
|
||||
if (identity.correlationId().equals(1)) {
|
||||
ActorRef store = identity.getRef();
|
||||
if (store != null) {
|
||||
SharedLeveldbJournal.setStore(store, getContext().system());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
//#shared-store-usage
|
||||
};
|
||||
|
||||
class MySnapshotStore extends SnapshotStore {
|
||||
@Override
|
||||
public Future<Optional<SelectedSnapshot>> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Void> doDeleteAsync(SnapshotMetadata metadata) {
|
||||
return Futures.successful(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
|
||||
return Futures.successful(null);
|
||||
}
|
||||
}
|
||||
|
||||
class MyAsyncJournal extends AsyncWriteJournal {
|
||||
//#sync-journal-plugin-api
|
||||
@Override
|
||||
public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(
|
||||
Iterable<AtomicWrite> messages) {
|
||||
try {
|
||||
Iterable<Optional<Exception>> result = new ArrayList<Optional<Exception>>();
|
||||
// blocking call here...
|
||||
// result.add(..)
|
||||
return Futures.successful(result);
|
||||
} catch (Exception e) {
|
||||
return Futures.failed(e);
|
||||
}
|
||||
}
|
||||
//#sync-journal-plugin-api
|
||||
}
|
||||
}
|
||||
//#shared-store-usage
|
||||
};
|
||||
|
||||
@Override
|
||||
public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Void> doAsyncReplayMessages(String persistenceId, long fromSequenceNr,
|
||||
long toSequenceNr, long max, Consumer<PersistentRepr> replayCallback) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Long> doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) {
|
||||
return null;
|
||||
}
|
||||
class MySnapshotStore extends SnapshotStore {
|
||||
@Override
|
||||
public Future<Optional<SelectedSnapshot>> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot) {
|
||||
return null;
|
||||
}
|
||||
|
||||
static Object o2 = new Object() {
|
||||
//#journal-tck-java
|
||||
class MyJournalSpecTest extends JavaJournalSpec {
|
||||
@Override
|
||||
public Future<Void> doDeleteAsync(SnapshotMetadata metadata) {
|
||||
return Futures.successful(null);
|
||||
}
|
||||
|
||||
public MyJournalSpecTest() {
|
||||
super(ConfigFactory.parseString(
|
||||
"persistence.journal.plugin = " +
|
||||
"\"akka.persistence.journal.leveldb-shared\""));
|
||||
}
|
||||
@Override
|
||||
public Future<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
|
||||
return Futures.successful(null);
|
||||
}
|
||||
}
|
||||
|
||||
class MyAsyncJournal extends AsyncWriteJournal {
|
||||
//#sync-journal-plugin-api
|
||||
@Override
|
||||
public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(
|
||||
Iterable<AtomicWrite> messages) {
|
||||
try {
|
||||
Iterable<Optional<Exception>> result = new ArrayList<Optional<Exception>>();
|
||||
// blocking call here...
|
||||
// result.add(..)
|
||||
return Futures.successful(result);
|
||||
} catch (Exception e) {
|
||||
return Futures.failed(e);
|
||||
}
|
||||
}
|
||||
//#sync-journal-plugin-api
|
||||
|
||||
@Override
|
||||
public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Void> doAsyncReplayMessages(String persistenceId, long fromSequenceNr,
|
||||
long toSequenceNr, long max, Consumer<PersistentRepr> replayCallback) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Long> doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static Object o2 = new Object() {
|
||||
//#journal-tck-java
|
||||
class MyJournalSpecTest extends JavaJournalSpec {
|
||||
|
||||
public MyJournalSpecTest() {
|
||||
super(ConfigFactory.parseString(
|
||||
"persistence.journal.plugin = " +
|
||||
"\"akka.persistence.journal.leveldb-shared\""));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CapabilityFlag supportsRejectingNonSerializableObjects() {
|
||||
return CapabilityFlag.off();
|
||||
}
|
||||
}
|
||||
//#journal-tck-java
|
||||
};
|
||||
|
||||
static Object o3 = new Object() {
|
||||
//#snapshot-store-tck-java
|
||||
class MySnapshotStoreTest extends JavaSnapshotStoreSpec {
|
||||
|
||||
public MySnapshotStoreTest() {
|
||||
super(ConfigFactory.parseString(
|
||||
"akka.persistence.snapshot-store.plugin = " +
|
||||
"\"akka.persistence.snapshot-store.local\""));
|
||||
}
|
||||
}
|
||||
//#snapshot-store-tck-java
|
||||
};
|
||||
|
||||
static Object o4 = new Object() {
|
||||
//#journal-tck-before-after-java
|
||||
class MyJournalSpecTest extends JavaJournalSpec {
|
||||
|
||||
List<File> storageLocations = new ArrayList<File>();
|
||||
|
||||
public MyJournalSpecTest() {
|
||||
super(ConfigFactory.parseString(
|
||||
"persistence.journal.plugin = " +
|
||||
"\"akka.persistence.journal.leveldb-shared\""));
|
||||
|
||||
Config config = system().settings().config();
|
||||
storageLocations.add(new File(
|
||||
config.getString("akka.persistence.journal.leveldb.dir")));
|
||||
storageLocations.add(new File(
|
||||
config.getString("akka.persistence.snapshot-store.local.dir")));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public CapabilityFlag supportsRejectingNonSerializableObjects() {
|
||||
return CapabilityFlag.on();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeAll() {
|
||||
for (File storageLocation : storageLocations) {
|
||||
FileUtils.deleteRecursively(storageLocation);
|
||||
}
|
||||
//#journal-tck-java
|
||||
};
|
||||
super.beforeAll();
|
||||
}
|
||||
|
||||
static Object o3 = new Object() {
|
||||
//#snapshot-store-tck-java
|
||||
class MySnapshotStoreTest extends JavaSnapshotStoreSpec {
|
||||
|
||||
public MySnapshotStoreTest() {
|
||||
super(ConfigFactory.parseString(
|
||||
"akka.persistence.snapshot-store.plugin = " +
|
||||
"\"akka.persistence.snapshot-store.local\""));
|
||||
}
|
||||
@Override
|
||||
public void afterAll() {
|
||||
super.afterAll();
|
||||
for (File storageLocation : storageLocations) {
|
||||
FileUtils.deleteRecursively(storageLocation);
|
||||
}
|
||||
//#snapshot-store-tck-java
|
||||
};
|
||||
|
||||
static Object o4 = new Object() {
|
||||
//#journal-tck-before-after-java
|
||||
class MyJournalSpecTest extends JavaJournalSpec {
|
||||
|
||||
List<File> storageLocations = new ArrayList<File>();
|
||||
|
||||
public MyJournalSpecTest() {
|
||||
super(ConfigFactory.parseString(
|
||||
"persistence.journal.plugin = " +
|
||||
"\"akka.persistence.journal.leveldb-shared\""));
|
||||
|
||||
Config config = system().settings().config();
|
||||
storageLocations.add(new File(
|
||||
config.getString("akka.persistence.journal.leveldb.dir")));
|
||||
storageLocations.add(new File(
|
||||
config.getString("akka.persistence.snapshot-store.local.dir")));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeAll() {
|
||||
for (File storageLocation : storageLocations) {
|
||||
FileUtils.deleteRecursively(storageLocation);
|
||||
}
|
||||
super.beforeAll();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterAll() {
|
||||
super.afterAll();
|
||||
for (File storageLocation : storageLocations) {
|
||||
FileUtils.deleteRecursively(storageLocation);
|
||||
}
|
||||
}
|
||||
}
|
||||
//#journal-tck-before-after-java
|
||||
};
|
||||
}
|
||||
}
|
||||
//#journal-tck-before-after-java
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -814,6 +814,10 @@ To include the Journal TCK tests in your test suite simply extend the provided `
|
|||
|
||||
.. includecode:: ./code/docs/persistence/PersistencePluginDocTest.java#journal-tck-java
|
||||
|
||||
Please note that some of the tests are optional, and by overriding the ``supports...`` methods you give the
|
||||
TCK the needed information about which tests to run. You can implement these methods using the provided
|
||||
``CapabilityFlag.on`` / ``CapabilityFlag.off`` values.
|
||||
|
||||
We also provide a simple benchmarking class ``JavaJournalPerfSpec`` which includes all the tests that ``JavaJournalSpec``
|
||||
has, and also performs some longer operations on the Journal while printing it's performance stats. While it is NOT aimed
|
||||
to provide a proper benchmarking environment it can be used to get a rough feel about your journals performance in the most
|
||||
|
|
|
|||
|
|
@ -176,9 +176,11 @@ object PersistenceTCKDoc {
|
|||
//#journal-tck-scala
|
||||
class MyJournalSpec extends JournalSpec(
|
||||
config = ConfigFactory.parseString(
|
||||
"""
|
||||
akka.persistence.journal.plugin = "my.journal.plugin"
|
||||
"""))
|
||||
"""akka.persistence.journal.plugin = "my.journal.plugin"""")) {
|
||||
|
||||
override def supportsRejectingNonSerializableObjects: CapabilityFlag =
|
||||
false // or CapabilityFlag.off
|
||||
}
|
||||
//#journal-tck-scala
|
||||
}
|
||||
new AnyRef {
|
||||
|
|
@ -205,6 +207,9 @@ object PersistenceTCKDoc {
|
|||
akka.persistence.journal.plugin = "my.journal.plugin"
|
||||
""")) {
|
||||
|
||||
override def supportsRejectingNonSerializableObjects: CapabilityFlag =
|
||||
true // or CapabilityFlag.on
|
||||
|
||||
val storageLocations = List(
|
||||
new File(system.settings.config.getString("akka.persistence.journal.leveldb.dir")),
|
||||
new File(config.getString("akka.persistence.snapshot-store.local.dir")))
|
||||
|
|
|
|||
|
|
@ -868,6 +868,10 @@ To include the Journal TCK tests in your test suite simply extend the provided `
|
|||
|
||||
.. includecode:: ./code/docs/persistence/PersistencePluginDocSpec.scala#journal-tck-scala
|
||||
|
||||
Please note that some of the tests are optional, and by overriding the ``supports...`` methods you give the
|
||||
TCK the needed information about which tests to run. You can implement these methods using boolean falues or the
|
||||
provided ``CapabilityFlag.on`` / ``CapabilityFlag.off`` values.
|
||||
|
||||
We also provide a simple benchmarking class ``JournalPerfSpec`` which includes all the tests that ``JournalSpec``
|
||||
has, and also performs some longer operations on the Journal while printing it's performance stats. While it is NOT aimed
|
||||
to provide a proper benchmarking environment it can be used to get a rough feel about your journals performance in the most
|
||||
|
|
|
|||
|
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.persistence
|
||||
|
||||
import scala.language.implicitConversions
|
||||
|
||||
sealed abstract class CapabilityFlag {
|
||||
private val capturedStack = (new Throwable().getStackTrace)
|
||||
.filter(_.getMethodName.startsWith("supports"))
|
||||
.find { el ⇒
|
||||
val clazz = Class.forName(el.getClassName)
|
||||
clazz.getDeclaredMethod(el.getMethodName).getReturnType == classOf[CapabilityFlag]
|
||||
} map { _.getMethodName } getOrElse "[unknown]"
|
||||
|
||||
def name: String = capturedStack
|
||||
def value: Boolean
|
||||
}
|
||||
object CapabilityFlag {
|
||||
def on(): CapabilityFlag =
|
||||
new CapabilityFlag { override def value = true }
|
||||
def off(): CapabilityFlag =
|
||||
new CapabilityFlag { override def value = true }
|
||||
|
||||
/** Java DSL */
|
||||
def create(`val`: Boolean): CapabilityFlag =
|
||||
new CapabilityFlag { override def value = `val` }
|
||||
|
||||
// conversions
|
||||
|
||||
implicit def mkFlag(v: Boolean): CapabilityFlag =
|
||||
new CapabilityFlag { override def value = v }
|
||||
}
|
||||
|
||||
sealed trait CapabilityFlags
|
||||
|
||||
//#journal-flags
|
||||
trait JournalCapabilityFlags extends CapabilityFlags {
|
||||
|
||||
/**
|
||||
* When `true` enables tests which check if the Journal properly rejects
|
||||
* writes of objects which are not `java.lang.Serializable`.
|
||||
*/
|
||||
protected def supportsRejectingNonSerializableObjects: CapabilityFlag
|
||||
|
||||
}
|
||||
//#journal-flags
|
||||
|
||||
//#snapshot-store-flags
|
||||
trait SnapshotStoreCapabilityFlags extends CapabilityFlags {
|
||||
// no flags currently
|
||||
}
|
||||
//#snapshot-store-flags
|
||||
|
|
@ -1,3 +1,6 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.persistence
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
|
|
|||
|
|
@ -3,7 +3,8 @@
|
|||
*/
|
||||
package akka.persistence.japi.journal
|
||||
|
||||
import akka.persistence.journal.{ JournalPerfSpec, JournalSpec }
|
||||
import akka.persistence.CapabilityFlag
|
||||
import akka.persistence.journal.JournalPerfSpec
|
||||
import com.typesafe.config.Config
|
||||
import org.junit.runner.RunWith
|
||||
import org.scalatest.Informer
|
||||
|
|
@ -47,4 +48,6 @@ class JavaJournalPerfSpec(config: Config) extends JournalPerfSpec(config) {
|
|||
override protected def info: Informer = new Informer {
|
||||
override def apply(message: String, payload: Option[Any]): Unit = System.out.println(message)
|
||||
}
|
||||
|
||||
override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = CapabilityFlag.on
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
*/
|
||||
package akka.persistence.japi.journal
|
||||
|
||||
import akka.persistence.CapabilityFlag
|
||||
import akka.persistence.journal.JournalSpec
|
||||
import com.typesafe.config.Config
|
||||
import org.junit.runner.RunWith
|
||||
|
|
@ -21,4 +22,6 @@ import org.scalatest.junit.JUnitRunner
|
|||
* @param config configures the Journal plugin to be tested
|
||||
*/
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class JavaJournalSpec(config: Config) extends JournalSpec(config)
|
||||
class JavaJournalSpec(config: Config) extends JournalSpec(config) {
|
||||
override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = CapabilityFlag.on
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
*/
|
||||
package akka.persistence.japi.snapshot
|
||||
|
||||
import akka.persistence.CapabilityFlag
|
||||
import akka.persistence.snapshot.{ SnapshotStore, SnapshotStoreSpec }
|
||||
import com.typesafe.config.Config
|
||||
import org.junit.runner.RunWith
|
||||
|
|
@ -20,4 +21,4 @@ import org.scalatest.junit.JUnitRunner
|
|||
* @see [[akka.persistence.snapshot.SnapshotStoreSpec]]
|
||||
*/
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class JavaSnapshotStoreSpec(config: Config) extends SnapshotStoreSpec(config)
|
||||
class JavaSnapshotStoreSpec(config: Config) extends SnapshotStoreSpec(config)
|
||||
|
|
@ -1,5 +1,10 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.persistence.journal
|
||||
|
||||
import akka.persistence.scalatest.{ MayVerb, OptionalTests }
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.collection.immutable.Seq
|
||||
|
||||
|
|
@ -29,7 +34,8 @@ object JournalSpec {
|
|||
* @see [[akka.persistence.journal.JournalPerfSpec]]
|
||||
* @see [[akka.persistence.japi.journal.JavaJournalPerfSpec]]
|
||||
*/
|
||||
abstract class JournalSpec(config: Config) extends PluginSpec(config) {
|
||||
abstract class JournalSpec(config: Config) extends PluginSpec(config) with MayVerb
|
||||
with OptionalTests with JournalCapabilityFlags {
|
||||
|
||||
implicit lazy val system: ActorSystem = ActorSystem("JournalSpec", config.withFallback(JournalSpec.config))
|
||||
|
||||
|
|
@ -181,33 +187,39 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) {
|
|||
journal ! ReplayMessages(0, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
|
||||
receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L))
|
||||
}
|
||||
}
|
||||
|
||||
"reject non-serializable events" in {
|
||||
// there is no chance that a journal could create a data representation for type of event
|
||||
val notSerializableEvent = new Object { override def toString = "not serializable" }
|
||||
val msgs = (6 to 8).map { i ⇒
|
||||
val event = if (i == 7) notSerializableEvent else s"b-$i"
|
||||
AtomicWrite(PersistentRepr(payload = event, sequenceNr = i, persistenceId = pid, sender = Actor.noSender,
|
||||
writerUuid = writerUuid))
|
||||
}
|
||||
"A Journal optionally" may {
|
||||
|
||||
val probe = TestProbe()
|
||||
journal ! WriteMessages(msgs, probe.ref, actorInstanceId)
|
||||
optional(flag = supportsRejectingNonSerializableObjects) {
|
||||
"reject non-serializable events" in {
|
||||
// there is no chance that a journal could create a data representation for type of event
|
||||
val notSerializableEvent = new Object {
|
||||
override def toString = "not serializable"
|
||||
}
|
||||
val msgs = (6 to 8).map { i ⇒
|
||||
val event = if (i == 7) notSerializableEvent else s"b-$i"
|
||||
AtomicWrite(PersistentRepr(payload = event, sequenceNr = i, persistenceId = pid, sender = Actor.noSender,
|
||||
writerUuid = writerUuid))
|
||||
}
|
||||
|
||||
probe.expectMsg(WriteMessagesSuccessful)
|
||||
val Pid = pid
|
||||
val WriterUuid = writerUuid
|
||||
probe.expectMsgPF() {
|
||||
case WriteMessageSuccess(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid), _) ⇒ payload should be(s"b-6")
|
||||
}
|
||||
probe.expectMsgPF() {
|
||||
case WriteMessageRejected(PersistentImpl(payload, 7L, Pid, _, _, Actor.noSender, WriterUuid), _, _) ⇒
|
||||
payload should be(notSerializableEvent)
|
||||
}
|
||||
probe.expectMsgPF() {
|
||||
case WriteMessageSuccess(PersistentImpl(payload, 8L, Pid, _, _, Actor.noSender, WriterUuid), _) ⇒ payload should be(s"b-8")
|
||||
}
|
||||
val probe = TestProbe()
|
||||
journal ! WriteMessages(msgs, probe.ref, actorInstanceId)
|
||||
|
||||
probe.expectMsg(WriteMessagesSuccessful)
|
||||
val Pid = pid
|
||||
val WriterUuid = writerUuid
|
||||
probe.expectMsgPF() {
|
||||
case WriteMessageSuccess(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid), _) ⇒ payload should be(s"b-6")
|
||||
}
|
||||
probe.expectMsgPF() {
|
||||
case WriteMessageRejected(PersistentImpl(payload, 7L, Pid, _, _, Actor.noSender, WriterUuid), _, _) ⇒
|
||||
payload should be(notSerializableEvent)
|
||||
}
|
||||
probe.expectMsgPF() {
|
||||
case WriteMessageSuccess(PersistentImpl(payload, 8L, Pid, _, _, Actor.noSender, WriterUuid), _) ⇒ payload should be(s"b-8")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,62 @@
|
|||
/**
|
||||
* Copyright (C) 2014-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.persistence.scalatest
|
||||
|
||||
import org.scalatest.exceptions.TestCanceledException
|
||||
import org.scalatest.words.StringVerbBlockRegistration
|
||||
|
||||
trait MayVerb {
|
||||
import MayVerb._
|
||||
|
||||
/**
|
||||
* Configurable number of frames to be shown when a MAY test fails (is canceled).
|
||||
*
|
||||
* Defaults to `3`.
|
||||
* Must be geater than `0`.
|
||||
*/
|
||||
def mayVerbStacktraceContextFrames = 3
|
||||
|
||||
def optional(whenSkippedMessage: String)(body: ⇒ Unit): Unit =
|
||||
try body catch {
|
||||
case cause: Throwable ⇒
|
||||
val shortTrace = cause.getStackTrace.take(mayVerbStacktraceContextFrames)
|
||||
throw new TestCanceledByFailure(whenSkippedMessage, shortTrace)
|
||||
}
|
||||
|
||||
trait StringMayWrapperForVerb {
|
||||
val leftSideString: String
|
||||
|
||||
/**
|
||||
* Block of tests which MAY pass, and if not should be ignored.
|
||||
* Such as rules which may be optionally implemented by Journals.
|
||||
*
|
||||
* MUST be used in conjunction with [[optional]] to provide explanation as to why it may be ok to fail this spec.
|
||||
*
|
||||
* The word `MAY` is to be understood as defined in RFC 2119
|
||||
*
|
||||
* @see <a href="https://www.rfc-editor.org/rfc/rfc2119.txt">RFC 2119</a>
|
||||
*/
|
||||
def may(right: ⇒ Unit)(implicit fun: StringVerbBlockRegistration) {
|
||||
fun(leftSideString, "may", right _)
|
||||
}
|
||||
}
|
||||
|
||||
import scala.language.implicitConversions
|
||||
|
||||
/**
|
||||
* Implicitly converts an object of type <code>String</code> to a <code>StringMayWrapper</code>,
|
||||
* to enable <code>may</code> methods to be invokable on that object.
|
||||
*/
|
||||
implicit def convertToStringMayWrapper(o: String): StringMayWrapperForVerb =
|
||||
new StringMayWrapperForVerb {
|
||||
val leftSideString = o.trim
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
object MayVerb {
|
||||
case class TestCanceledByFailure(msg: String, specialStackTrace: Array[StackTraceElement]) extends TestCanceledException(Some(msg), None, 2) {
|
||||
override def getStackTrace = specialStackTrace
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.persistence.scalatest
|
||||
|
||||
import akka.persistence.CapabilityFlag
|
||||
import org.scalatest.Informing
|
||||
|
||||
trait OptionalTests {
|
||||
this: Informing ⇒
|
||||
|
||||
def optional(flag: CapabilityFlag)(test: ⇒ Unit) = {
|
||||
val msg = s"CapabilityFlag `${flag.name}` was turned `" +
|
||||
(if (flag.value) "on" else "off") +
|
||||
"`. " + (if (!flag.value) "To enable the related tests override it with `CapabilityFlag.on` (or `true` in Scala)." else "")
|
||||
info(msg)
|
||||
if (flag.value)
|
||||
try test catch {
|
||||
case ex: Exception ⇒
|
||||
throw new AssertionError("Imlpementation did not pass this spec. " +
|
||||
"If your journal will be (by definition) unable to abide the here tested rule, you can disable this test," +
|
||||
s"by overriding [${flag.name}] with CapabilityFlag.off in your test class.")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,5 +1,10 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.persistence.snapshot
|
||||
|
||||
import akka.persistence.scalatest.OptionalTests
|
||||
|
||||
import scala.collection.immutable.Seq
|
||||
import akka.actor._
|
||||
import akka.persistence._
|
||||
|
|
@ -23,7 +28,8 @@ object SnapshotStoreSpec {
|
|||
*
|
||||
* @see [[akka.persistence.japi.snapshot.JavaSnapshotStoreSpec]]
|
||||
*/
|
||||
abstract class SnapshotStoreSpec(config: Config) extends PluginSpec(config) {
|
||||
abstract class SnapshotStoreSpec(config: Config) extends PluginSpec(config)
|
||||
with OptionalTests with SnapshotStoreCapabilityFlags {
|
||||
implicit lazy val system = ActorSystem("SnapshotStoreSpec", config.withFallback(SnapshotStoreSpec.config))
|
||||
|
||||
private var senderProbe: TestProbe = _
|
||||
|
|
|
|||
|
|
@ -1,11 +1,14 @@
|
|||
package akka.persistence.journal.leveldb
|
||||
|
||||
import akka.persistence.journal.JournalSpec
|
||||
import akka.persistence.{ PersistenceSpec, PluginCleanup }
|
||||
import akka.persistence.{ CapabilityFlag, PersistenceSpec, PluginCleanup }
|
||||
|
||||
class LeveldbJournalJavaSpec extends JournalSpec(
|
||||
config = PersistenceSpec.config(
|
||||
"leveldb",
|
||||
"LeveldbJournalJavaSpec",
|
||||
extraConfig = Some("akka.persistence.journal.leveldb.native = off")))
|
||||
with PluginCleanup
|
||||
with PluginCleanup {
|
||||
|
||||
override def supportsRejectingNonSerializableObjects = true
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
package akka.persistence.journal.leveldb
|
||||
|
||||
import akka.persistence.journal.{ JournalPerfSpec, JournalSpec }
|
||||
import akka.persistence.{ PersistenceSpec, PluginCleanup }
|
||||
import akka.persistence.{ CapabilityFlag, PersistenceSpec, PluginCleanup }
|
||||
import org.scalatest.DoNotDiscover
|
||||
|
||||
@DoNotDiscover // because only checking that compilation is OK with JournalPerfSpec
|
||||
|
|
@ -13,4 +13,8 @@ class LeveldbJournalNativePerfSpec extends JournalPerfSpec(
|
|||
"leveldb",
|
||||
"LeveldbJournalNativePerfSpec",
|
||||
extraConfig = Some("akka.persistence.journal.leveldb.native = on")))
|
||||
with PluginCleanup
|
||||
with PluginCleanup {
|
||||
|
||||
override def supportsRejectingNonSerializableObjects = true
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,4 +8,8 @@ class LeveldbJournalNativeSpec extends JournalSpec(
|
|||
"leveldb",
|
||||
"LeveldbJournalNativeSpec",
|
||||
extraConfig = Some("akka.persistence.journal.leveldb.native = on")))
|
||||
with PluginCleanup
|
||||
with PluginCleanup {
|
||||
|
||||
override def supportsRejectingNonSerializableObjects = true
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,5 +14,8 @@ class LeveldbJournalNoAtomicPersistMultipleEventsSpec extends JournalSpec(
|
|||
* Setting to false to test the single message atomic write behaviour of JournalSpec
|
||||
*/
|
||||
override def supportsAtomicPersistAllOfSeveralEvents = false
|
||||
|
||||
override def supportsRejectingNonSerializableObjects = true
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue