Persistence testkit implementation #15571 (#26825)

This commit is contained in:
Kirill Yankov 2020-03-20 22:18:43 +09:00 committed by GitHub
parent 630e712b9f
commit 41f20cbb81
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
43 changed files with 5726 additions and 3 deletions

View file

@ -0,0 +1,30 @@
##################################################
# Akka Persistence Testkit Reference Config File #
##################################################
# This is the reference config file that contains all the default settings.
# Make your edits/overrides in your application.conf.
akka.persistence.testkit {
# configuration for persistence testkit for events
events {
# enable serialization of the persisted events
serialize = true
# timeout for assertions
assert-timeout = 3s
# poll interval for assertions with timeout
assert-poll-interval = 100millis
}
# configuration for persistence testkit for snapshots
snapshots {
# enable serialization of the persisted snapshots
serialize = true
# timeout for assertions
assert-timeout = 3s
# poll interval for assertions with timeout
assert-poll-interval = 100millis
}
}

View file

@ -0,0 +1,156 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit
import akka.annotation.InternalApi
import akka.persistence.PersistentRepr
import akka.persistence.testkit.ProcessingPolicy.DefaultPolicies
import akka.persistence.testkit.internal.TestKitStorage
import akka.util.ccompat.JavaConverters._
import java.util.{ List => JList }
import scala.collection.immutable
import scala.util.{ Failure, Success, Try }
/**
* INTERNAL API
*/
@InternalApi
private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, PersistentRepr] {
import EventStorage._
def addAny(key: String, elem: Any): Unit =
addAny(key, immutable.Seq(elem))
def addAny(key: String, elems: immutable.Seq[Any]): Unit =
// need to use `updateExisting` because `mapAny` reads latest seqnum
// and therefore must be done at the same time with the update, not before
updateOrSetNew(key, v => v ++ mapAny(key, elems).toVector)
override def reprToSeqNum(repr: PersistentRepr): Long = repr.sequenceNr
def add(elems: immutable.Seq[PersistentRepr]): Unit =
elems.groupBy(_.persistenceId).foreach(gr => add(gr._1, gr._2))
override protected val DefaultPolicy = JournalPolicies.PassAll
/**
* @throws Exception from StorageFailure in the current writing policy
*/
def tryAdd(elems: immutable.Seq[PersistentRepr]): Try[Unit] = {
val grouped = elems.groupBy(_.persistenceId)
val processed = grouped.map {
case (pid, els) => currentPolicy.tryProcess(pid, WriteEvents(els.map(_.payload)))
}
val reduced: ProcessingResult =
processed.foldLeft[ProcessingResult](ProcessingSuccess)((left: ProcessingResult, right: ProcessingResult) =>
(left, right) match {
case (ProcessingSuccess, ProcessingSuccess) => ProcessingSuccess
case (f: StorageFailure, _) => f
case (_, f: StorageFailure) => f
case (r: Reject, _) => r
case (_, r: Reject) => r
})
reduced match {
case ProcessingSuccess =>
add(elems)
Success(())
case Reject(ex) => Failure(ex)
case StorageFailure(ex) => throw ex
}
}
def tryRead(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long,
max: Long): immutable.Seq[PersistentRepr] = {
val batch = read(persistenceId, fromSequenceNr, toSequenceNr, max)
currentPolicy.tryProcess(persistenceId, ReadEvents(batch)) match {
case ProcessingSuccess => batch
case Reject(ex) => throw ex
case StorageFailure(ex) => throw ex
}
}
def tryReadSeqNumber(persistenceId: String): Long = {
currentPolicy.tryProcess(persistenceId, ReadSeqNum) match {
case ProcessingSuccess => getHighestSeqNumber(persistenceId)
case Reject(ex) => throw ex
case StorageFailure(ex) => throw ex
}
}
def tryDelete(persistenceId: String, toSeqNumber: Long): Unit = {
currentPolicy.tryProcess(persistenceId, DeleteEvents(toSeqNumber)) match {
case ProcessingSuccess => deleteToSeqNumber(persistenceId, toSeqNumber)
case Reject(ex) => throw ex
case StorageFailure(ex) => throw ex
}
}
private def mapAny(key: String, elems: immutable.Seq[Any]): immutable.Seq[PersistentRepr] = {
val sn = getHighestSeqNumber(key) + 1
elems.zipWithIndex.map(p => PersistentRepr(p._1, p._2 + sn, key))
}
}
object EventStorage {
object JournalPolicies extends DefaultPolicies[JournalOperation]
}
/**
* INTERNAL API
*
* Persistent journal operations.
*/
@InternalApi
sealed trait JournalOperation
/**
* Read from journal operation with events that were read.
*/
final case class ReadEvents(batch: immutable.Seq[Any]) extends JournalOperation {
def getBatch(): JList[Any] = batch.asJava
}
/**
* Write in journal operation with events to be written.
*/
final case class WriteEvents(batch: immutable.Seq[Any]) extends JournalOperation {
def getBatch(): JList[Any] = batch.asJava
}
/**
* Read persistent actor's sequence number operation.
*/
case object ReadSeqNum extends JournalOperation {
/**
* Java API: the singleton instance.
*/
def getInstance() = this
}
/**
* Delete events in the journal up to `toSeqNumber` operation.
*/
final case class DeleteEvents(toSeqNumber: Long) extends JournalOperation {
def getToSeqNumber() = toSeqNumber
}

View file

@ -0,0 +1,105 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit
import akka.annotation.InternalApi
import akka.persistence._
import akka.persistence.journal.{ AsyncWriteJournal, Tagged }
import akka.persistence.snapshot.SnapshotStore
import akka.persistence.testkit.internal.{ InMemStorageExtension, SnapshotStorageEmulatorExtension }
import com.typesafe.config.{ Config, ConfigFactory }
import scala.collection.immutable
import scala.concurrent.Future
import scala.util.Try
/**
* INTERNAL API
*
* Persistence testkit plugin for events.
*/
@InternalApi
class PersistenceTestKitPlugin extends AsyncWriteJournal {
private final val storage = InMemStorageExtension(context.system)
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] =
Future.fromTry(Try(messages.map(aw => {
val data = aw.payload.map(pl =>
pl.payload match {
case Tagged(p, _) => pl.withPayload(p)
case _ => pl
})
storage.tryAdd(data)
})))
override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] =
Future.fromTry(Try(storage.tryDelete(persistenceId, toSequenceNr)))
override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(
recoveryCallback: PersistentRepr => Unit): Future[Unit] =
Future.fromTry(Try(storage.tryRead(persistenceId, fromSequenceNr, toSequenceNr, max).foreach(recoveryCallback)))
override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
Future.fromTry(Try {
val found = storage.tryReadSeqNumber(persistenceId)
if (found < fromSequenceNr) fromSequenceNr else found
})
}
object PersistenceTestKitPlugin {
val PluginId = "akka.persistence.testkit.journal.pluginid"
import akka.util.ccompat.JavaConverters._
def getInstance() = this
val config: Config = ConfigFactory.parseMap(
Map(
"akka.persistence.journal.plugin" -> PluginId,
s"$PluginId.class" -> s"${classOf[PersistenceTestKitPlugin].getName}").asJava)
}
/**
* INTERNAL API
*
* Persistence testkit plugin for snapshots.
*/
@InternalApi
class PersistenceTestKitSnapshotPlugin extends SnapshotStore {
private final val storage = SnapshotStorageEmulatorExtension(context.system)
override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] =
Future.fromTry(Try(storage.tryRead(persistenceId, criteria)))
override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] =
Future.fromTry(Try(storage.tryAdd(metadata, snapshot)))
override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] =
Future.fromTry(Try(storage.tryDelete(metadata)))
override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] =
Future.successful(Try(storage.tryDelete(persistenceId, criteria)))
}
object PersistenceTestKitSnapshotPlugin {
val PluginId = "akka.persistence.testkit.snapshotstore.pluginid"
import akka.util.ccompat.JavaConverters._
def getInstance() = this
val config: Config = ConfigFactory.parseMap(
Map(
"akka.persistence.snapshot-store.plugin" -> PluginId,
s"$PluginId.class" -> classOf[PersistenceTestKitSnapshotPlugin].getName).asJava)
}

View file

@ -0,0 +1,195 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit
import akka.annotation.{ ApiMayChange, InternalApi }
/**
* Policies allow to emulate behavior of the storage (failures and rejections).
*
* @tparam U type determines operations which storage can perform.
*/
@ApiMayChange
trait ProcessingPolicy[U] {
/**
* Emulates behavior of the storage.
* The function is invoked when any of the plugin's operations is executed.
* If you need this operation to succeed return [[ProcessingSuccess]],
* otherwise you should return some of the [[ProcessingFailure]]'s.
*
* @param processingUnit details about current operation to be executed
* @return needed result of processing the operation
*/
def tryProcess(persistenceId: String, processingUnit: U): ProcessingResult
}
/**
* INTERNAL API
*/
@InternalApi
object ProcessingPolicy {
/**
* INTERNAL API
*/
@InternalApi
private[testkit] trait DefaultPolicies[U] {
type PolicyType = ProcessingPolicy[U]
case object PassAll extends PolicyType {
override def tryProcess(persistenceId: String, processingUnit: U): ProcessingResult = ProcessingSuccess
}
class RejectNextNCond(
_numberToFail: Int,
_failureException: Throwable,
cond: (String, U) => Boolean,
onLimitExceed: => Unit = ())
extends CountNextNCond(_numberToFail, Reject(_failureException), ProcessingSuccess, cond, onLimitExceed)
class FailNextNCond(
_numberToFail: Int,
_failureException: Throwable,
cond: (String, U) => Boolean,
onLimitExceed: => Unit = ())
extends CountNextNCond(_numberToFail, StorageFailure(_failureException), ProcessingSuccess, cond, onLimitExceed)
class FailNextN(_numberToFail: Int, _failureException: Throwable, onLimitExceed: => Unit = ())
extends CountNextNCond(
_numberToFail,
StorageFailure(_failureException),
ProcessingSuccess,
(_, _) => true,
onLimitExceed)
class RejectNextN(_numberToReject: Int, _rejectionException: Throwable, onLimitExceed: => Unit = ())
extends CountNextNCond(
_numberToReject,
Reject(_rejectionException),
ProcessingSuccess,
(_, _) => true,
onLimitExceed)
class ReturnAfterNextNCond(
returnOnTrigger: => ProcessingResult,
returnNonTrigger: => ProcessingResult,
cond: (String, U) => Boolean)
extends PolicyType {
override def tryProcess(persistenceId: String, processingUnit: U): ProcessingResult = {
if (cond(persistenceId, processingUnit)) {
returnOnTrigger
} else {
returnNonTrigger
}
}
}
class CountNextNCond(
numberToCount: Int,
returnOnTrigger: => ProcessingResult,
returnNonTrigger: => ProcessingResult,
cond: (String, U) => Boolean,
onLimitExceed: => Unit)
extends ReturnAfterNextNCond(returnOnTrigger, returnNonTrigger, new Function2[String, U, Boolean] {
var counter = 0
override def apply(persistenceId: String, v1: U): Boolean = {
val intRes = cond(persistenceId, v1)
if (intRes && counter < numberToCount) {
counter += 1
if (counter == numberToCount) onLimitExceed
intRes
} else {
false
}
}
})
}
}
/**
* INTERNAL API
*/
@InternalApi
sealed trait ProcessingResult
sealed abstract class ProcessingSuccess extends ProcessingResult
/**
* Emulates successful processing of some operation.
*/
case object ProcessingSuccess extends ProcessingSuccess {
def getInstance(): ProcessingSuccess = this
}
sealed trait ProcessingFailure extends ProcessingResult {
def error: Throwable
}
sealed abstract class ExpectedRejection extends Throwable
object ExpectedRejection extends ExpectedRejection {
def getInstance(): ExpectedRejection = this
}
sealed abstract class ExpectedFailure extends Throwable
object ExpectedFailure extends ExpectedFailure {
def getInstance(): ExpectedFailure = this
}
/**
* Emulates rejection of operation by the journal with `error` exception.
* Has the same meaning as `StorageFailure` for snapshot storage,
* because it does not support rejections.
*/
final case class Reject(error: Throwable = ExpectedRejection) extends ProcessingFailure {
def getError(): Throwable = error
}
object Reject {
def create(error: Throwable): Reject = Reject(error)
def create(): Reject = Reject(ExpectedRejection)
}
/**
* Emulates exception thrown by the storage on the attempt to perform some operation.
*/
final case class StorageFailure(error: Throwable = ExpectedFailure) extends ProcessingFailure {
def getError(): Throwable = error
}
object StorageFailure {
def create(error: Throwable): StorageFailure = StorageFailure(error)
def create(): StorageFailure = StorageFailure(ExpectedFailure)
}

View file

@ -0,0 +1,150 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit
import akka.annotation.InternalApi
import akka.persistence.{ SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria }
import akka.persistence.testkit.ProcessingPolicy.DefaultPolicies
import akka.persistence.testkit.internal.TestKitStorage
import scala.util.Success
/**
* INTERNAL API
*/
@InternalApi
private[testkit] trait SnapshotStorage extends TestKitStorage[SnapshotOperation, (SnapshotMetadata, Any)] {
import SnapshotStorage._
override def reprToSeqNum(repr: (SnapshotMetadata, Any)): Long =
repr._1.sequenceNr
override protected val DefaultPolicy = SnapshotPolicies.PassAll
def tryAdd(meta: SnapshotMetadata, payload: Any): Unit = {
currentPolicy.tryProcess(meta.persistenceId, WriteSnapshot(SnapshotMeta(meta.sequenceNr, meta.timestamp), payload)) match {
case ProcessingSuccess =>
add(meta.persistenceId, (meta, payload))
Success(())
case f: ProcessingFailure => throw f.error
}
}
def tryRead(persistenceId: String, criteria: SnapshotSelectionCriteria): Option[SelectedSnapshot] = {
val selectedSnapshot =
read(persistenceId).flatMap(
_.reverseIterator.find(v => criteria.matches(v._1)).map(v => SelectedSnapshot(v._1, v._2)))
currentPolicy.tryProcess(persistenceId, ReadSnapshot(criteria, selectedSnapshot.map(_.snapshot))) match {
case ProcessingSuccess => selectedSnapshot
case f: ProcessingFailure => throw f.error
}
}
def tryDelete(persistenceId: String, selectionCriteria: SnapshotSelectionCriteria): Unit = {
currentPolicy.tryProcess(persistenceId, DeleteSnapshotsByCriteria(selectionCriteria)) match {
case ProcessingSuccess =>
delete(persistenceId, v => selectionCriteria.matches(v._1))
case f: ProcessingFailure => throw f.error
}
}
def tryDelete(meta: SnapshotMetadata): Unit = {
currentPolicy.tryProcess(meta.persistenceId, DeleteSnapshotByMeta(SnapshotMeta(meta.sequenceNr, meta.timestamp))) match {
case ProcessingSuccess =>
delete(meta.persistenceId, _._1.sequenceNr == meta.sequenceNr)
case f: ProcessingFailure => throw f.error
}
}
}
object SnapshotStorage {
object SnapshotPolicies extends DefaultPolicies[SnapshotOperation]
}
/**
* Snapshot metainformation.
*/
final case class SnapshotMeta(sequenceNr: Long, timestamp: Long = 0L) {
def getSequenceNr() = sequenceNr
def getTimestamp() = timestamp
}
case object SnapshotMeta {
def create(sequenceNr: Long, timestamp: Long) =
SnapshotMeta(sequenceNr, timestamp)
def create(sequenceNr: Long) = SnapshotMeta(sequenceNr)
}
/**
* INTERNAL API
* Operations supported by snapshot plugin
*/
@InternalApi
sealed trait SnapshotOperation
/**
*
* Storage read operation for recovery of the persistent actor.
*
* @param criteria criteria with which snapshot is searched
* @param snapshot snapshot found by criteria
*/
final case class ReadSnapshot(criteria: SnapshotSelectionCriteria, snapshot: Option[Any]) extends SnapshotOperation {
def getSnapshotSelectionCriteria() = criteria
def getSnapshot(): java.util.Optional[Any] =
snapshot.map(java.util.Optional.of[Any]).getOrElse(java.util.Optional.empty[Any]())
}
/**
* Storage write operation to persist snapshot in the storage.
*
* @param metadata snapshot metadata
* @param snapshot snapshot payload
*/
final case class WriteSnapshot(metadata: SnapshotMeta, snapshot: Any) extends SnapshotOperation {
def getMetadata() = metadata
def getSnapshot() = snapshot
}
/**
* INTERNAL API
*/
@InternalApi
sealed abstract class DeleteSnapshot extends SnapshotOperation
/**
* Delete snapshots from storage by criteria.
*/
final case class DeleteSnapshotsByCriteria(criteria: SnapshotSelectionCriteria) extends DeleteSnapshot {
def getCriteria() = criteria
}
/**
* Delete particular snapshot from storage by its metadata.
*/
final case class DeleteSnapshotByMeta(metadata: SnapshotMeta) extends DeleteSnapshot {
def getMetadata() = metadata
}

View file

@ -0,0 +1,29 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.internal
import akka.actor.{ ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider }
import akka.annotation.InternalApi
import akka.persistence.testkit.scaladsl.PersistenceTestKit
import akka.persistence.testkit.EventStorage
/**
* INTERNAL API
*/
@InternalApi
private[testkit] object InMemStorageExtension extends ExtensionId[EventStorage] with ExtensionIdProvider {
override def get(system: ActorSystem): EventStorage = super.get(system)
override def createExtension(system: ExtendedActorSystem) =
if (PersistenceTestKit.Settings(system).serialize) {
new SerializedEventStorageImpl(system)
} else {
new SimpleEventStorageImpl
}
override def lookup() = InMemStorageExtension
}

View file

@ -0,0 +1,43 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.internal
import akka.actor.{ ActorSystem, ExtendedActorSystem }
import akka.annotation.InternalApi
import akka.persistence.PersistentRepr
import akka.persistence.testkit.EventStorage
import akka.serialization.{ Serialization, SerializationExtension }
import scala.util.Try
/**
* INTERNAL API
*/
@InternalApi
private[testkit] class SerializedEventStorageImpl(system: ActorSystem) extends EventStorage {
override type InternalRepr = (Int, Array[Byte])
private lazy val serialization = SerializationExtension(system)
/**
* @return (serializer id, serialized bytes)
*/
override def toInternal(repr: PersistentRepr): (Int, Array[Byte]) =
Serialization.withTransportInformation(system.asInstanceOf[ExtendedActorSystem]) { () =>
val s = serialization.findSerializerFor(repr)
(s.identifier, s.toBinary(repr))
}
/**
* @param internal (serializer id, serialized bytes)
*/
override def toRepr(internal: (Int, Array[Byte])): PersistentRepr =
serialization
.deserialize(internal._2, internal._1, PersistentRepr.Undefined)
.flatMap(r => Try(r.asInstanceOf[PersistentRepr]))
.get
}

View file

@ -0,0 +1,34 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.internal
import akka.actor.{ ActorSystem, ExtendedActorSystem }
import akka.annotation.InternalApi
import akka.persistence.SnapshotMetadata
import akka.persistence.testkit.SnapshotStorage
import akka.serialization.{ Serialization, SerializationExtension, Serializers }
/**
* INTERNAL API
*/
@InternalApi
private[testkit] class SerializedSnapshotStorageImpl(system: ActorSystem) extends SnapshotStorage {
override type InternalRepr = (SnapshotMetadata, String, Int, Array[Byte])
private lazy val serialization = SerializationExtension(system)
override def toRepr(internal: (SnapshotMetadata, String, Int, Array[Byte])): (SnapshotMetadata, Any) =
(internal._1, serialization.deserialize(internal._4, internal._3, internal._2).get)
override def toInternal(repr: (SnapshotMetadata, Any)): (SnapshotMetadata, String, Int, Array[Byte]) =
Serialization.withTransportInformation(system.asInstanceOf[ExtendedActorSystem]) { () =>
val payload = repr._2.asInstanceOf[AnyRef]
val s = serialization.findSerializerFor(payload)
val manifest = Serializers.manifestFor(s, payload)
(repr._1, manifest, s.identifier, s.toBinary(payload))
}
}

View file

@ -0,0 +1,23 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.internal
import akka.annotation.InternalApi
import akka.persistence._
import akka.persistence.testkit.EventStorage
/**
* INTERNAL API
*/
@InternalApi
private[testkit] class SimpleEventStorageImpl extends EventStorage {
override type InternalRepr = PersistentRepr
override def toInternal(repr: PersistentRepr): PersistentRepr = repr
override def toRepr(internal: PersistentRepr): PersistentRepr = internal
}

View file

@ -0,0 +1,23 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.internal
import akka.annotation.InternalApi
import akka.persistence.SnapshotMetadata
import akka.persistence.testkit.SnapshotStorage
/**
* INTERNAL API
*/
@InternalApi
private[testkit] class SimpleSnapshotStorageImpl extends SnapshotStorage {
override type InternalRepr = (SnapshotMetadata, Any)
override def toRepr(internal: (SnapshotMetadata, Any)): (SnapshotMetadata, Any) = identity(internal)
override def toInternal(repr: (SnapshotMetadata, Any)): (SnapshotMetadata, Any) = identity(repr)
}

View file

@ -0,0 +1,29 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.internal
import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import akka.annotation.InternalApi
import akka.persistence.testkit.scaladsl.SnapshotTestKit
import akka.persistence.testkit.SnapshotStorage
/**
* INTERNAL API
*/
@InternalApi
private[testkit] object SnapshotStorageEmulatorExtension extends ExtensionId[SnapshotStorage] with ExtensionIdProvider {
override def get(system: ActorSystem): SnapshotStorage = super.get(system)
override def createExtension(system: ExtendedActorSystem): SnapshotStorage =
if (SnapshotTestKit.Settings(system).serialize) {
new SerializedSnapshotStorageImpl(system)
} else {
new SimpleSnapshotStorageImpl
}
override def lookup(): ExtensionId[_ <: Extension] =
SnapshotStorageEmulatorExtension
}

View file

@ -0,0 +1,154 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.internal
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference
import akka.actor.Extension
import akka.annotation.InternalApi
import akka.persistence.testkit.ProcessingPolicy
import scala.collection.immutable
/**
* INTERNAL API
*/
@InternalApi
sealed trait InternalReprSupport[R] {
type InternalRepr
private[testkit] def toInternal(repr: R): InternalRepr
private[testkit] def toRepr(internal: InternalRepr): R
}
/**
* INTERNAL API
*/
@InternalApi
sealed trait InMemStorage[K, R] extends InternalReprSupport[R] {
import scala.math._
private final val eventsMap: ConcurrentHashMap[K, (Long, Vector[InternalRepr])] =
new ConcurrentHashMap()
def reprToSeqNum(repr: R): Long
def findMany(key: K, fromInclusive: Int, maxNum: Int): Option[Vector[R]] =
read(key).flatMap(
value =>
if (value.size > fromInclusive)
Some(value.drop(fromInclusive).take(maxNum))
else None)
def findOneByIndex(key: K, index: Int): Option[R] =
Option(eventsMap.get(key))
.flatMap {
case (_, value) => if (value.size > index) Some(value(index)) else None
}
.map(toRepr)
def add(key: K, p: R): Unit =
add(key, List(p))
/**
* Adds elements ordered by seqnum, sets new seqnum as max(old, max(newElemsSeqNums)))
*/
def add(key: K, elems: immutable.Seq[R]): Unit =
updateOrSetNew(key, v => v ++ elems)
/**
* Deletes elements preserving highest sequence number.
*/
def delete(key: K, needsToBeDeleted: R => Boolean): Vector[R] =
updateOrSetNew(key, v => v.filterNot(needsToBeDeleted))
/**
* Sets new elements returned by updater ordered by seqnum. Sets new seqnum as max(old, max(newElemsFromUpdaterSeqNums))
*/
def updateOrSetNew(key: K, updater: Vector[R] => Vector[R]): Vector[R] =
eventsMap
.compute(
key,
(_: K, value: (Long, Vector[InternalRepr])) => {
val (sn, elems) = if (value != null) value else (0L, Vector.empty)
val upd = updater(elems.map(toRepr)).sortBy(reprToSeqNum)
(max(getLastSeqNumber(upd), sn), upd.map(toInternal))
})
._2
.map(toRepr)
def read(key: K): Option[Vector[R]] =
Option(eventsMap.get(key)).map(_._2.map(toRepr))
def clearAll(): Unit =
eventsMap.clear()
/**
* Removes key and the whole value including seqnum.
*/
def removeKey(key: K): Vector[R] =
Option(eventsMap.remove(key)).map(_._2).getOrElse(Vector.empty).map(toRepr)
/**
* Reads elems within the range of seqnums.
*/
def read(key: K, fromInclusive: Long, toInclusive: Long, maxNumber: Long): immutable.Seq[R] =
read(key)
.getOrElse(Vector.empty)
.dropWhile(reprToSeqNum(_) < fromInclusive)
// we dont need to read highestSeqNumber because it will in any case stop at it if toInclusive > highestSeqNumber
.takeWhile(reprToSeqNum(_) <= toInclusive)
.take(if (maxNumber > Int.MaxValue) Int.MaxValue else maxNumber.toInt)
def removePreservingSeqNumber(key: K): Unit =
updateOrSetNew(key, _ => Vector.empty)
def getHighestSeqNumber(key: K): Long =
Option(eventsMap.get(key)).map(_._1).getOrElse(0L)
def deleteToSeqNumber(key: K, toSeqNumberInclusive: Long): Unit =
updateOrSetNew(key, value => {
value.dropWhile(reprToSeqNum(_) <= toSeqNumberInclusive)
})
def clearAllPreservingSeqNumbers(): Unit =
eventsMap.forEachKey(1, removePreservingSeqNumber)
private def getLastSeqNumber(elems: immutable.Seq[R]): Long =
elems.lastOption.map(reprToSeqNum).getOrElse(0L)
}
/**
* INTERNAL API
*/
@InternalApi
sealed trait PolicyOps[U] {
type Policy = ProcessingPolicy[U]
protected val DefaultPolicy: Policy
private lazy val _processingPolicy: AtomicReference[Policy] =
new AtomicReference(DefaultPolicy)
def currentPolicy: Policy = _processingPolicy.get()
def setPolicy(policy: Policy): Unit = _processingPolicy.set(policy)
def returnDefaultPolicy(): Unit = setPolicy(DefaultPolicy)
}
/**
* INTERNAL API
*/
@InternalApi
private[testkit] trait TestKitStorage[P, R] extends InMemStorage[String, R] with PolicyOps[P] with Extension

View file

@ -0,0 +1,446 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.javadsl
import akka.actor.ActorSystem
import akka.persistence.testkit.scaladsl.{ PersistenceTestKit => ScalaTestKit }
import akka.util.JavaDurationConverters._
import akka.util.ccompat.JavaConverters._
import java.time.Duration
import java.util.{ List => JList }
import java.util.{ function => jf }
import akka.annotation.ApiMayChange
import akka.persistence.testkit.{ EventStorage, ExpectedFailure, ExpectedRejection, JournalOperation }
/**
* Class for testing persisted events in persistent actors.
*/
@ApiMayChange
class PersistenceTestKit(system: ActorSystem) {
private val scalaTestkit = new ScalaTestKit(system)
/**
* Check that nothing has been saved in the storage.
*/
def expectNothingPersisted(persistenceId: String): Unit = scalaTestkit.expectNothingPersisted(persistenceId)
/**
* Check for `max` time that nothing has been saved in the storage.
*/
def expectNothingPersisted(persistenceId: String, max: Duration): Unit =
scalaTestkit.expectNothingPersisted(persistenceId, max.asScala)
/**
* Check that `event` has been saved in the storage.
*/
def expectNextPersisted[A](persistenceId: String, event: A): A =
scalaTestkit.expectNextPersisted(persistenceId, event)
/**
* Check for `max` time that `event` has been saved in the storage.
*/
def expectNextPersisted[A](persistenceId: String, event: A, max: Duration): A =
scalaTestkit.expectNextPersisted(persistenceId, event, max.asScala)
/**
* Check that next persisted in storage for particular persistence id event has expected type.
*/
def expectNextPersistedClass[A](persistenceId: String, cla: Class[A]): A =
scalaTestkit.expectNextPersistedClass(persistenceId, cla)
/**
* Check for `max` time that next persisted in storage for particular persistence id event has expected type.
*/
def expectNextPersistedClass[A](persistenceId: String, cla: Class[A], max: Duration): A =
scalaTestkit.expectNextPersistedClass(persistenceId, cla, max.asScala)
/**
* Fail next `n` write operations with the `cause` exception for particular persistence id.
*/
def failNextNPersisted(persistenceId: String, n: Int, cause: Throwable): Unit =
scalaTestkit.failNextNPersisted(persistenceId, n, cause)
/**
* Fail next `n` write operations for particular persistence id.
*/
def failNextNPersisted(persistenceId: String, n: Int): Unit = failNextNPersisted(persistenceId, n, ExpectedFailure)
/**
* Fail next `n` write operations with the `cause` exception for any persistence id.
*/
def failNextNPersisted(n: Int, cause: Throwable): Unit = scalaTestkit.failNextNPersisted(n, cause)
/**
* Fail next `n` write operations with default exception for any persistence id.
*/
def failNextNPersisted(n: Int): Unit = failNextNPersisted(n, ExpectedFailure)
/**
* Fail next write operation with `cause` exception for particular persistence id.
*/
def failNextPersisted(persistenceId: String, cause: Throwable): Unit = failNextNPersisted(persistenceId, 1, cause)
/**
* Fail next write operation with default exception for particular persistence id.
*/
def failNextPersisted(persistenceId: String): Unit = failNextNPersisted(persistenceId, 1)
/**
* Fail next write operation event with `cause` exception for any persistence id.
*/
def failNextPersisted(cause: Throwable): Unit = failNextNPersisted(1, cause)
/**
* Fail next write operation with default exception for any persistence id.
*/
def failNextPersisted(): Unit = failNextNPersisted(1)
/**
* Fail next read from storage (recovery) attempt with `cause` exception for any persistence id.
*/
def failNextRead(cause: Throwable): Unit = failNextNReads(1, cause)
/**
* Fail next read from storage (recovery) attempt with default exception for any persistence id.
*/
def failNextRead(): Unit = failNextNReads(1)
/**
* Fail next read from storage (recovery) attempt with `cause` exception for particular persistence id.
*/
def failNextRead(persistenceId: String, cause: Throwable): Unit = failNextNReads(persistenceId, 1, cause)
/**
* Fail next read from storage (recovery) attempt with default exception for any persistence id.
*/
def failNextRead(persistenceId: String): Unit = failNextNReads(persistenceId, 1)
/**
* Fail next n read from storage (recovery) attempts with `cause` exception for any persistence id.
*/
def failNextNReads(n: Int, cause: Throwable): Unit = scalaTestkit.failNextNReads(n, cause)
/**
* Fail next n read from storage (recovery) attempts with default exception for any persistence id.
*/
def failNextNReads(n: Int): Unit = failNextNReads(n, ExpectedFailure)
/**
* Fail next n read from storage (recovery) attempts with `cause` exception for particular persistence id.
*/
def failNextNReads(persistenceId: String, n: Int, cause: Throwable): Unit =
scalaTestkit.failNextNReads(persistenceId, n, cause)
/**
* Fail next n read from storage (recovery) attempts with default exception for particular persistence id.
*/
def failNextNReads(persistenceId: String, n: Int): Unit = failNextNReads(persistenceId, n, ExpectedFailure)
/**
* Fail next delete from storage attempt with `cause` exception for any persistence id.
*/
def failNextDelete(cause: Throwable): Unit = failNextNDeletes(1, cause)
/**
* Fail next delete from storage attempt with default exception for any persistence id.
*/
def failNextDelete(): Unit = failNextNDeletes(1)
/**
* Fail next delete from storage attempt with `cause` exception for particular persistence id.
*/
def failNextDelete(persistenceId: String, cause: Throwable): Unit = failNextNDeletes(persistenceId, 1, cause)
/**
* Fail next delete from storage attempt with default exception for particular persistence id.
*/
def failNextDelete(persistenceId: String): Unit = failNextNDeletes(persistenceId, 1)
/**
* Fail next n delete from storage attempts with `cause` exception for any persistence id.
*/
def failNextNDeletes(n: Int, cause: Throwable): Unit = scalaTestkit.failNextNDeletes(n, cause)
/**
* Fail next n delete from storage attempts with default exception for any persistence id.
*/
def failNextNDeletes(n: Int): Unit = failNextNDeletes(n, ExpectedFailure)
/**
* Fail next n delete from storage attempts with `cause` exception for particular persistence id.
*/
def failNextNDeletes(persistenceId: String, n: Int, cause: Throwable): Unit =
scalaTestkit.failNextNDeletes(persistenceId, n, cause)
/**
* Fail next n delete from storage attempts with default exception for particular persistence id.
*/
def failNextNDeletes(persistenceId: String, n: Int): Unit = failNextNDeletes(persistenceId, n, ExpectedFailure)
/**
* Receive next n events from the storage.
*/
def receivePersisted[A](persistenceId: String, n: Int, cla: Class[A]): JList[A] =
scalaTestkit.receivePersisted(persistenceId, n, cla).asJava
/**
* Receive for `max` time next n events from the storage.
*/
def receivePersisted[A](persistenceId: String, n: Int, cla: Class[A], max: Duration): JList[A] =
scalaTestkit.receivePersisted(persistenceId, n, cla, max.asScala).asJava
/**
* Reject next n save in storage operations for particular persistence id with `cause` exception.
*/
def rejectNextNPersisted(persistenceId: String, n: Int, cause: Throwable): Unit =
scalaTestkit.rejectNextNPersisted(persistenceId, n, cause)
/**
* Reject next n save in storage operations for particular persistence id with default exception.
*/
def rejectNextNPersisted(persistenceId: String, n: Int): Unit =
rejectNextNPersisted(persistenceId, n, ExpectedRejection)
/**
* Reject next n save in storage operations for any persistence id with default exception.
*/
def rejectNextNPersisted(n: Int): Unit = rejectNextNPersisted(n, ExpectedRejection)
/**
* Reject next n save in storage operations for any persistence id with `cause` exception.
*/
def rejectNextNPersisted(n: Int, cause: Throwable): Unit = scalaTestkit.rejectNextNPersisted(n, cause)
/**
* Reject next save in storage operation for particular persistence id with default exception.
*/
def rejectNextPersisted(persistenceId: String): Unit = rejectNextNPersisted(persistenceId, 1)
/**
* Reject next save in storage operation for particular persistence id with `cause` exception.
*/
def rejectNextPersisted(persistenceId: String, cause: Throwable): Unit = rejectNextNPersisted(persistenceId, 1, cause)
/**
* Reject next save in storage operation for any persistence id with `cause` exception.
*/
def rejectNextPersisted(cause: Throwable): Unit = rejectNextNPersisted(1, cause)
/**
* Reject next save in storage operation for any persistence id with default exception.
*/
def rejectNextPersisted(): Unit = rejectNextNPersisted(1)
/**
* Reject next read from storage operation for any persistence id with default exception.
*/
def rejectNextRead(): Unit = rejectNextNReads(1)
/**
* Reject next read from storage operation for any persistence id with `cause` exception.
*/
def rejectNextRead(cause: Throwable): Unit = rejectNextNReads(1, cause)
/**
* Reject next n read from storage operations for any persistence id with default exception.
*/
def rejectNextNReads(n: Int): Unit = rejectNextNReads(n, ExpectedRejection)
/**
* Reject next n read from storage operations for any persistence id with `cause` exception.
*/
def rejectNextNReads(n: Int, cause: Throwable): Unit = scalaTestkit.rejectNextNReads(n, cause)
/**
* Reject next read from storage operation for particular persistence id with default exception.
*/
def rejectNextRead(persistenceId: String): Unit = rejectNextNReads(persistenceId, 1)
/**
* Reject next read from storage operation for particular persistence id with `cause` exception.
*/
def rejectNextRead(persistenceId: String, cause: Throwable): Unit = rejectNextNReads(persistenceId, 1, cause)
/**
* Reject next n read from storage operations for particular persistence id with default exception.
*/
def rejectNextNReads(persistenceId: String, n: Int): Unit = rejectNextNReads(persistenceId, n, ExpectedRejection)
/**
* Reject next n read from storage operations for particular persistence id with `cause` exception.
*/
def rejectNextNReads(persistenceId: String, n: Int, cause: Throwable): Unit =
scalaTestkit.rejectNextNReads(persistenceId, n, cause)
/**
* Reject next delete from storage operation for any persistence id with default exception.
*/
def rejectNextDelete(): Unit = rejectNextNDeletes(1)
/**
* Reject next delete from storage operation for any persistence id with `cause` exception.
*/
def rejectNextDelete(cause: Throwable): Unit = rejectNextNDeletes(1, cause)
/**
* Reject next n delete from storage operations for any persistence id with default exception.
*/
def rejectNextNDeletes(n: Int): Unit = rejectNextNDeletes(n, ExpectedRejection)
/**
* Reject next n delete from storage operations for any persistence id with `cause` exception.
*/
def rejectNextNDeletes(n: Int, cause: Throwable): Unit = scalaTestkit.rejectNextNDeletes(n, cause)
/**
* Reject next delete from storage operations for particular persistence id with default exception.
*/
def rejectNextDelete(persistenceId: String): Unit = rejectNextNDeletes(persistenceId, 1)
/**
* Reject next delete from storage operations for particular persistence id with `cause` exception.
*/
def rejectNextDelete(persistenceId: String, cause: Throwable): Unit = rejectNextNDeletes(persistenceId, 1, cause)
/**
* Reject next n delete from storage operations for particular persistence id with default exception.
*/
def rejectNextNDeletes(persistenceId: String, n: Int): Unit = rejectNextNDeletes(persistenceId, n, ExpectedRejection)
/**
* Reject next n delete from storage operations for particular persistence id with `cause` exception.
*/
def rejectNextNDeletes(persistenceId: String, n: Int, cause: Throwable): Unit =
scalaTestkit.rejectNextNDeletes(persistenceId, n, cause)
/**
* Reject `n` following journal operations depending on the condition `cond`.
* Rejection triggers, when `cond` returns true.
* Reject operations with default `ExpectedRejection` exception.
*/
def rejectNextNOpsCond(cond: jf.BiFunction[String, JournalOperation, Boolean], n: Int): Unit =
rejectNextNOpsCond(cond, n, ExpectedRejection)
/**
* Reject `n` following journal operations depending on the condition `cond`.
* Rejection triggers, when `cond` returns true.
* Rejects operations with the `cause` exception.
*/
def rejectNextNOpsCond(cond: jf.BiFunction[String, JournalOperation, Boolean], n: Int, cause: Throwable): Unit =
scalaTestkit.rejectNextNOpsCond((l: String, r: JournalOperation) => cond.apply(l, r), n, cause)
/**
* Reject n following journal operations regardless of their type.
* Rejects operations with default `ExpectedRejection` exception.
*/
def rejectNextNOps(n: Int): Unit = rejectNextNOps(n, ExpectedRejection)
/**
* Reject `n` following journal operations regardless of their type.
* Rejects operations with the `cause` exception.
*/
def rejectNextNOps(n: Int, cause: Throwable): Unit = scalaTestkit.rejectNextNOps(n, cause)
/**
* Persist `events` into storage in order.
*/
def persistForRecovery(persistenceId: String, events: JList[Any]): Unit =
scalaTestkit.persistForRecovery(persistenceId, events.asScala.toVector)
/**
* Retrieve all events saved in storage by persistence id.
*/
def persistedInStorage(persistenceId: String): JList[Any] = scalaTestkit.persistedInStorage(persistenceId).asJava
/**
* Clear all data from storage.
*
* NOTE! Also clears sequence numbers in storage!
*
* @see [[PersistenceTestKit.clearAllPreservingSeqNumbers()]]
*/
def clearAll(): Unit = scalaTestkit.clearAll()
/**
* Clear all data from storage for particular persistence id.
*
* NOTE! Also clears sequence number in storage!
*
* @see [[PersistenceTestKit.clearByIdPreservingSeqNumbers()]]
*/
def clearByPersistenceId(persistenceId: String): Unit = scalaTestkit.clearByPersistenceId(persistenceId)
/**
* Clear all data in storage preserving sequence numbers.
*
* @see [[PersistenceTestKit.clearAll()]]
*/
def clearAllPreservingSeqNumbers(): Unit = scalaTestkit.clearAllPreservingSeqNumbers()
/**
* Clear all data in storage for particular persistence id preserving sequence numbers.
*
* @see [[PersistenceTestKit.clearByPersistenceId()]]
*/
def clearByIdPreservingSeqNumbers(persistenceId: String): Unit =
scalaTestkit.clearByIdPreservingSeqNumbers(persistenceId)
/**
* Fail `n` following journal operations depending on the condition `cond`.
* Failure triggers, when `cond` returns true.
* Fails operations with default `ExpectedFailure` exception.
*/
def failNextNOpsCond(cond: jf.BiFunction[String, JournalOperation, Boolean], n: Int): Unit =
failNextNOpsCond(cond, n, ExpectedFailure)
/**
* Fail `n` following journal operations depending on the condition `cond`.
* Failure triggers, when `cond` returns true.
* Fails operations with the `cause` exception.
*/
def failNextNOpsCond(cond: jf.BiFunction[String, JournalOperation, Boolean], n: Int, cause: Throwable): Unit =
scalaTestkit.failNextNOpsCond((l: String, r: JournalOperation) => cond.apply(l, r), n, cause)
/**
* Fail n following journal operations regardless of their type.
* Fails operations with default `ExpectedFailure` exception.
*/
def failNextNOps(n: Int): Unit =
failNextNOps(n, ExpectedFailure)
/**
* Fail `n` following journal operations depending on the condition `cond`.
* Failure triggers, when `cond` returns true.
* Fails operations with the `cause` exception.
*/
def failNextNOps(n: Int, cause: Throwable): Unit = scalaTestkit.failNextNOps(n, cause)
/**
* Set new processing policy for journal operations.
* NOTE! Overrides previously invoked `failNext...` or `rejectNext...`
*/
def withPolicy(policy: EventStorage.JournalPolicies.PolicyType): PersistenceTestKit = {
scalaTestkit.withPolicy(policy)
this
}
/**
* Returns default policy if it was changed by [[PersistenceTestKit.withPolicy()]].
*/
def returnDefaultPolicy(): Unit = scalaTestkit.returnDefaultPolicy()
}
object PersistenceTestKit {
import akka.actor.typed.{ ActorSystem => TypedActorSystem }
def create(system: ActorSystem): PersistenceTestKit = new PersistenceTestKit(system)
def create(system: TypedActorSystem[_]): PersistenceTestKit = create(system.classicSystem)
}

View file

@ -0,0 +1,273 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.javadsl
import akka.actor.ActorSystem
import akka.persistence.testkit.scaladsl.{ SnapshotTestKit => ScalaTestKit }
import akka.persistence.testkit.{ ExpectedFailure, SnapshotMeta, SnapshotOperation, SnapshotStorage }
import akka.util.JavaDurationConverters._
import akka.util.ccompat.JavaConverters._
import java.time.Duration
import java.util.{ List => JList }
import java.util.{ function => jf }
import akka.annotation.ApiMayChange
import akka.japi.Pair
/**
* Class for testing persisted snapshots in persistent actors.
*/
@ApiMayChange
class SnapshotTestKit(system: ActorSystem) {
private val scalaTestkit = new ScalaTestKit(system)
/**
* Check that nothing has been saved in the storage.
*/
def expectNothingPersisted(persistenceId: String): Unit = scalaTestkit.expectNothingPersisted(persistenceId)
/**
* Check for `max` time that nothing has been saved in the storage.
*/
def expectNothingPersisted(persistenceId: String, max: Duration): Unit =
scalaTestkit.expectNothingPersisted(persistenceId, max.asScala)
/**
* Check that `snapshot` has been saved in the storage.
*/
def expectNextPersisted[A](persistenceId: String, snapshot: A): A =
scalaTestkit.expectNextPersisted(persistenceId, snapshot)
/**
* Check for `max` time that `snapshot` has been saved in the storage.
*/
def expectNextPersisted[A](persistenceId: String, snapshot: A, max: Duration): A =
scalaTestkit.expectNextPersisted(persistenceId, snapshot, max.asScala)
/**
* Check that next persisted in storage for particular persistence id snapshot has expected type.
*/
def expectNextPersistedClass[A](persistenceId: String, cla: Class[A]): A =
scalaTestkit.expectNextPersistedClass[A](persistenceId, cla)
/**
* Check for `max` time that next persisted in storage for particular persistence id snapshot has expected type.
*/
def expectNextPersistedClass[A](persistenceId: String, cla: Class[A], max: Duration): A =
scalaTestkit.expectNextPersistedClass[A](persistenceId, cla, max.asScala)
/**
* Fail next `n` write operations with the `cause` exception for particular persistence id.
*/
def failNextNPersisted(persistenceId: String, n: Int, cause: Throwable): Unit =
scalaTestkit.failNextNPersisted(persistenceId, n, cause)
/**
* Fail next `n` write operations for particular persistence id.
*/
def failNextNPersisted(persistenceId: String, n: Int): Unit = failNextNPersisted(persistenceId, n, ExpectedFailure)
/**
* Fail next `n` write operations with the `cause` exception for any persistence id.
*/
def failNextNPersisted(n: Int, cause: Throwable): Unit = scalaTestkit.failNextNPersisted(n, cause)
/**
* Fail next `n` write operations with default exception for any persistence id.
*/
def failNextNPersisted(n: Int): Unit = failNextNPersisted(n, ExpectedFailure)
/**
* Fail next write operations with `cause` exception for particular persistence id.
*/
def failNextPersisted(persistenceId: String, cause: Throwable): Unit = failNextNPersisted(persistenceId, 1, cause)
/**
* Fail next write operations with default exception for particular persistence id.
*/
def failNextPersisted(persistenceId: String): Unit = failNextNPersisted(persistenceId, 1)
/**
* Fail next write operations with `cause` exception for any persistence id.
*/
def failNextPersisted(cause: Throwable): Unit = failNextNPersisted(1, cause)
/**
* Fail next write operations with default exception for any persistence id.
*/
def failNextPersisted(): Unit = failNextNPersisted(1)
/**
* Fail next read from storage (recovery) attempt with `cause` exception for any persistence id.
*/
def failNextRead(cause: Throwable): Unit = failNextNReads(1, cause)
/**
* Fail next read from storage (recovery) attempt with default exception for any persistence id.
*/
def failNextRead(): Unit = failNextNReads(1)
/**
* Fail next read from storage (recovery) attempt with `cause` exception for particular persistence id.
*/
def failNextRead(persistenceId: String, cause: Throwable): Unit = failNextNReads(persistenceId, 1, cause)
/**
* Fail next read from storage (recovery) attempt with default exception for any persistence id.
*/
def failNextRead(persistenceId: String): Unit = failNextNReads(persistenceId, 1)
/**
* Fail next n read from storage (recovery) attempts with `cause` exception for any persistence id.
*/
def failNextNReads(n: Int, cause: Throwable): Unit = scalaTestkit.failNextNReads(n, cause)
/**
* Fail next n read from storage (recovery) attempts with default exception for any persistence id.
*/
def failNextNReads(n: Int): Unit = failNextNReads(n, ExpectedFailure)
/**
* Fail next n read from storage (recovery) attempts with `cause` exception for particular persistence id.
*/
def failNextNReads(persistenceId: String, n: Int, cause: Throwable): Unit =
scalaTestkit.failNextNReads(persistenceId, n, cause)
/**
* Fail next n read from storage (recovery) attempts with default exception for particular persistence id.
*/
def failNextNReads(persistenceId: String, n: Int): Unit = failNextNReads(persistenceId, n, ExpectedFailure)
/**
* Fail next delete from storage attempt with `cause` exception for any persistence id.
*/
def failNextDelete(cause: Throwable): Unit = failNextNDeletes(1, cause)
/**
* Fail next delete from storage attempt with default exception for any persistence id.
*/
def failNextDelete(): Unit = failNextNDeletes(1)
/**
* Fail next delete from storage attempt with `cause` exception for particular persistence id.
*/
def failNextDelete(persistenceId: String, cause: Throwable): Unit = failNextNDeletes(persistenceId, 1, cause)
/**
* Fail next delete from storage attempt with default exception for particular persistence id.
*/
def failNextDelete(persistenceId: String): Unit = failNextNDeletes(persistenceId, 1)
/**
* Fail next n delete from storage attempts with `cause` exception for any persistence id.
*/
def failNextNDeletes(n: Int, cause: Throwable): Unit = scalaTestkit.failNextNDeletes(n, cause)
/**
* Fail next n delete from storage attempts with default exception for any persistence id.
*/
def failNextNDeletes(n: Int): Unit = failNextNDeletes(n, ExpectedFailure)
/**
* Fail next n delete from storage attempts with `cause` exception for particular persistence id.
*/
def failNextNDeletes(persistenceId: String, n: Int, cause: Throwable): Unit =
scalaTestkit.failNextNDeletes(persistenceId, n, cause)
/**
* Fail next n delete from storage attempts with default exception for particular persistence id.
*/
def failNextNDeletes(persistenceId: String, n: Int): Unit = failNextNDeletes(persistenceId, n, ExpectedFailure)
/**
* Receive next `n` snapshots that have been persisted in the storage.
*/
def receivePersisted[A](persistenceId: String, n: Int, cla: Class[A]): JList[A] =
scalaTestkit.receivePersisted[A](persistenceId, n, cla).asJava
/**
* Receive for `max` time next `n` snapshots that have been persisted in the storage.
*/
def receivePersisted[A](persistenceId: String, n: Int, cla: Class[A], max: Duration): JList[A] =
scalaTestkit.receivePersisted[A](persistenceId, n, cla, max.asScala).asJava
/**
* Persist `snapshots` with metadata into storage in order.
*/
def persistForRecovery(persistenceId: String, snapshots: JList[Pair[SnapshotMeta, Any]]): Unit =
scalaTestkit.persistForRecovery(persistenceId, snapshots.asScala.toVector.map(_.toScala))
/**
* Retrieve all snapshots and their metadata saved in storage by persistence id.
*/
def persistedInStorage(persistenceId: String): JList[Pair[SnapshotMeta, Any]] =
scalaTestkit.persistedInStorage(persistenceId).map(p => Pair(p._1, p._2)).asJava
/**
* Clear all data from storage.
*/
def clearAll(): Unit = scalaTestkit.clearAll()
/**
* Clear all data from storage for particular persistence id.
*/
def clearByPersistenceId(persistenceId: String): Unit = scalaTestkit.clearByPersistenceId(persistenceId)
/**
* Fail `n` following journal operations depending on the condition `cond`.
* Failure triggers, when `cond` returns true.
* Fails operations with default `ExpectedFailure` exception.
*/
def failNextNOpsCond(cond: jf.BiFunction[String, SnapshotOperation, Boolean], n: Int): Unit =
failNextNOpsCond(cond, n, ExpectedFailure)
/**
* Fail `n` following journal operations depending on the condition `cond`.
* Failure triggers, when `cond` returns true.
* Fails operations with the `cause` exception.
*/
def failNextNOpsCond(cond: jf.BiFunction[String, SnapshotOperation, Boolean], n: Int, cause: Throwable): Unit =
scalaTestkit.failNextNOpsCond((l: String, r: SnapshotOperation) => cond.apply(l, r), n, cause)
/**
* Fail n following journal operations regardless of their type.
* Fails operations with default `ExpectedFailure` exception.
*/
def failNextNOps(n: Int): Unit =
failNextNOps(n, ExpectedFailure)
/**
* Fail `n` following journal operations depending on the condition `cond`.
* Failure triggers, when `cond` returns true.
* Fails operations with the `cause` exception.
*/
def failNextNOps(n: Int, cause: Throwable): Unit = scalaTestkit.failNextNOps(n, cause)
/**
* Set new processing policy for journal operations.
* NOTE! Overrides previously invoked `failNext...` or `rejectNext...`
*/
def withPolicy(policy: SnapshotStorage.SnapshotPolicies.PolicyType): SnapshotTestKit = {
scalaTestkit.withPolicy(policy)
this
}
/**
* Returns default policy if it was changed by [[SnapshotTestKit.withPolicy()]].
*/
def returnDefaultPolicy(): Unit = scalaTestkit.returnDefaultPolicy()
}
object SnapshotTestKit {
import akka.actor.typed.{ ActorSystem => TypedActorSystem }
def create(system: ActorSystem): SnapshotTestKit = new SnapshotTestKit(system)
def create(system: TypedActorSystem[_]): SnapshotTestKit = create(system.classicSystem)
}

View file

@ -0,0 +1,521 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.scaladsl
import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId }
import akka.actor.typed.{ ActorSystem => TypedActorSystem }
import akka.annotation.ApiMayChange
import akka.persistence.testkit._
import akka.persistence.testkit.internal.{ InMemStorageExtension, SnapshotStorageEmulatorExtension }
import akka.persistence.{ Persistence, PersistentRepr, SnapshotMetadata }
import akka.testkit.TestProbe
import com.typesafe.config.Config
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import scala.util.Try
private[testkit] trait CommonTestKitOps[S, P] extends ClearOps with PolicyOpsTestKit[P] {
this: HasStorage[P, S] =>
/**
* Check that nothing has been saved in the storage.
*/
def expectNothingPersisted(persistenceId: String): Unit
/**
* Check for `max` time that nothing has been saved in the storage.
*/
def expectNothingPersisted(persistenceId: String, max: FiniteDuration): Unit
/**
* Check that `event` has been saved in the storage.
*/
def expectNextPersisted[A](persistenceId: String, event: A): A
/**
* Check for `max` time that `event` has been saved in the storage.
*/
def expectNextPersisted[A](persistenceId: String, event: A, max: FiniteDuration): A
/**
* Fail next `n` write operations with the `cause` exception for particular persistence id.
*/
def failNextNPersisted(persistenceId: String, n: Int, cause: Throwable): Unit
/**
* Fail next `n` write operations for particular persistence id.
*/
def failNextNPersisted(persistenceId: String, n: Int): Unit = failNextNPersisted(persistenceId, n, ExpectedFailure)
/**
* Fail next `n` write operations with the `cause` exception for any persistence id.
*/
def failNextNPersisted(n: Int, cause: Throwable): Unit
/**
* Fail next `n` write operations with default exception for any persistence id.
*/
def failNextNPersisted(n: Int): Unit = failNextNPersisted(n, ExpectedFailure)
/**
* Fail next write operation with `cause` exception for particular persistence id.
*/
def failNextPersisted(persistenceId: String, cause: Throwable): Unit = failNextNPersisted(persistenceId, 1, cause)
/**
* Fail next write operation with default exception for particular persistence id.
*/
def failNextPersisted(persistenceId: String): Unit = failNextNPersisted(persistenceId, 1)
/**
* Fail next write operation with `cause` exception for any persistence id.
*/
def failNextPersisted(cause: Throwable): Unit = failNextNPersisted(1, cause)
/**
* Fail next write operation with default exception for any persistence id.
*/
def failNextPersisted(): Unit = failNextNPersisted(1)
/**
* Fail next read from storage (recovery) attempt with `cause` exception for any persistence id.
*/
def failNextRead(cause: Throwable): Unit = failNextNReads(1, cause)
/**
* Fail next read from storage (recovery) attempt with default exception for any persistence id.
*/
def failNextRead(): Unit = failNextNReads(1)
/**
* Fail next read from storage (recovery) attempt with `cause` exception for particular persistence id.
*/
def failNextRead(persistenceId: String, cause: Throwable): Unit = failNextNReads(persistenceId, 1, cause)
/**
* Fail next read from storage (recovery) attempt with default exception for any persistence id.
*/
def failNextRead(persistenceId: String): Unit = failNextNReads(persistenceId, 1)
/**
* Fail next n read from storage (recovery) attempts with `cause` exception for any persistence id.
*/
def failNextNReads(n: Int, cause: Throwable): Unit
/**
* Fail next n read from storage (recovery) attempts with default exception for any persistence id.
*/
def failNextNReads(n: Int): Unit = failNextNReads(n, ExpectedFailure)
/**
* Fail next n read from storage (recovery) attempts with `cause` exception for particular persistence id.
*/
def failNextNReads(persistenceId: String, n: Int, cause: Throwable): Unit
/**
* Fail next n read from storage (recovery) attempts with default exception for particular persistence id.
*/
def failNextNReads(persistenceId: String, n: Int): Unit = failNextNReads(persistenceId, n, ExpectedFailure)
/**
* Fail next delete from storage attempt with `cause` exception for any persistence id.
*/
def failNextDelete(cause: Throwable): Unit = failNextNDeletes(1, cause)
/**
* Fail next delete from storage attempt with default exception for any persistence id.
*/
def failNextDelete(): Unit = failNextNDeletes(1)
/**
* Fail next delete from storage attempt with `cause` exception for particular persistence id.
*/
def failNextDelete(persistenceId: String, cause: Throwable): Unit = failNextNDeletes(persistenceId, 1, cause)
/**
* Fail next delete from storage attempt with default exception for particular persistence id.
*/
def failNextDelete(persistenceId: String): Unit = failNextNDeletes(persistenceId, 1)
/**
* Fail next n delete from storage attempts with `cause` exception for any persistence id.
*/
def failNextNDeletes(n: Int, cause: Throwable): Unit
/**
* Fail next n delete from storage attempts with default exception for any persistence id.
*/
def failNextNDeletes(n: Int): Unit = failNextNDeletes(n, ExpectedFailure)
/**
* Fail next n delete from storage attempts with `cause` exception for particular persistence id.
*/
def failNextNDeletes(persistenceId: String, n: Int, cause: Throwable): Unit
/**
* Fail next n delete from storage attempts with default exception for particular persistence id.
*/
def failNextNDeletes(persistenceId: String, n: Int): Unit = failNextNDeletes(persistenceId, n, ExpectedFailure)
}
private[testkit] trait PersistenceTestKitOps[S, P]
extends RejectSupport[P]
with ClearPreservingSeqNums
with CommonTestKitOps[S, P] {
this: HasStorage[P, S] =>
/**
* Reject next n save in storage operations for particular persistence id with `cause` exception.
*/
def rejectNextNPersisted(persistenceId: String, n: Int, cause: Throwable): Unit
/**
* Reject next n save in storage operations for particular persistence id with default exception.
*/
def rejectNextNPersisted(persistenceId: String, n: Int): Unit =
rejectNextNPersisted(persistenceId, n, ExpectedRejection)
/**
* Reject next n save in storage operations for any persistence id with default exception.
*/
def rejectNextNPersisted(n: Int): Unit = rejectNextNPersisted(n, ExpectedRejection)
/**
* Reject next n save in storage operations for any persistence id with `cause` exception.
*/
def rejectNextNPersisted(n: Int, cause: Throwable): Unit
/**
* Reject next save in storage operation for particular persistence id with default exception.
*/
def rejectNextPersisted(persistenceId: String): Unit = rejectNextNPersisted(persistenceId, 1)
/**
* Reject next save in storage operation for particular persistence id with `cause` exception.
*/
def rejectNextPersisted(persistenceId: String, cause: Throwable): Unit = rejectNextNPersisted(persistenceId, 1, cause)
/**
* Reject next save in storage operation for any persistence id with `cause` exception.
*/
def rejectNextPersisted(cause: Throwable): Unit = rejectNextNPersisted(1, cause)
/**
* Reject next save in storage operation for any persistence id with default exception.
*/
def rejectNextPersisted(): Unit = rejectNextNPersisted(1)
/**
* Reject next read from storage operation for any persistence id with default exception.
*/
def rejectNextRead(): Unit = rejectNextNReads(1)
/**
* Reject next read from storage operation for any persistence id with `cause` exception.
*/
def rejectNextRead(cause: Throwable): Unit = rejectNextNReads(1, cause)
/**
* Reject next n read from storage operations for any persistence id with default exception.
*/
def rejectNextNReads(n: Int): Unit = rejectNextNReads(n, ExpectedRejection)
/**
* Reject next n read from storage operations for any persistence id with `cause` exception.
*/
def rejectNextNReads(n: Int, cause: Throwable): Unit
/**
* Reject next read from storage operation for particular persistence id with default exception.
*/
def rejectNextRead(persistenceId: String): Unit = rejectNextNReads(persistenceId, 1)
/**
* Reject next read from storage operation for particular persistence id with `cause` exception.
*/
def rejectNextRead(persistenceId: String, cause: Throwable): Unit = rejectNextNReads(persistenceId, 1, cause)
/**
* Reject next n read from storage operations for particular persistence id with default exception.
*/
def rejectNextNReads(persistenceId: String, n: Int): Unit = rejectNextNReads(persistenceId, n, ExpectedRejection)
/**
* Reject next n read from storage operations for particular persistence id with `cause` exception.
*/
def rejectNextNReads(persistenceId: String, n: Int, cause: Throwable): Unit
/**
* Reject next delete from storage operation for any persistence id with default exception.
*/
def rejectNextDelete(): Unit = rejectNextNDeletes(1)
/**
* Reject next delete from storage operation for any persistence id with `cause` exception.
*/
def rejectNextDelete(cause: Throwable): Unit = rejectNextNDeletes(1, cause)
/**
* Reject next n delete from storage operations for any persistence id with default exception.
*/
def rejectNextNDeletes(n: Int): Unit = rejectNextNDeletes(n, ExpectedRejection)
/**
* Reject next n delete from storage operations for any persistence id with `cause` exception.
*/
def rejectNextNDeletes(n: Int, cause: Throwable): Unit
/**
* Reject next delete from storage operations for particular persistence id with default exception.
*/
def rejectNextDelete(persistenceId: String): Unit = rejectNextNDeletes(persistenceId, 1)
/**
* Reject next delete from storage operations for particular persistence id with `cause` exception.
*/
def rejectNextDelete(persistenceId: String, cause: Throwable): Unit = rejectNextNDeletes(persistenceId, 1, cause)
/**
* Reject next n delete from storage operations for particular persistence id with default exception.
*/
def rejectNextNDeletes(persistenceId: String, n: Int): Unit = rejectNextNDeletes(persistenceId, n, ExpectedRejection)
/**
* Reject next n delete from storage operations for particular persistence id with `cause` exception.
*/
def rejectNextNDeletes(persistenceId: String, n: Int, cause: Throwable): Unit
/**
* Persist `snapshots` into storage in order.
*/
def persistForRecovery(persistenceId: String, snapshots: immutable.Seq[Any]): Unit
/**
* Retrieve all snapshots saved in storage by persistence id.
*/
def persistedInStorage(persistenceId: String): immutable.Seq[Any]
}
/**
* Class for testing snapshots of persistent actors.
*
* NOTE! ActorSystem must be configured with [[PersistenceTestKitSnapshotPlugin]].
* The configuration can be retrieved with [[PersistenceTestKitSnapshotPlugin.config]].
*/
@ApiMayChange
class SnapshotTestKit(system: ActorSystem)
extends CommonTestKitOps[(SnapshotMetadata, Any), SnapshotOperation]
with PolicyOpsTestKit[SnapshotOperation]
with ExpectOps[(SnapshotMetadata, Any)]
with HasStorage[SnapshotOperation, (SnapshotMetadata, Any)] {
require(
Try(Persistence(system).journalFor(PersistenceTestKitSnapshotPlugin.PluginId)).isSuccess,
"The test persistence plugin for snapshots is not configured.")
import SnapshotTestKit._
override protected val storage: SnapshotStorage = SnapshotStorageEmulatorExtension(system)
private val settings = Settings(system)
override private[testkit] val probe = TestProbe()(system)
override private[testkit] val pollInterval: FiniteDuration = settings.pollInterval
override private[testkit] val maxTimeout: FiniteDuration = settings.assertTimeout
override private[testkit] val Policies = SnapshotStorage.SnapshotPolicies
override def failNextNPersisted(persistenceId: String, n: Int, cause: Throwable): Unit =
failNextNOpsCond((pid, op) => pid == persistenceId && op.isInstanceOf[WriteSnapshot], n, cause)
override def failNextNPersisted(n: Int, cause: Throwable): Unit =
failNextNOpsCond((_, op) => op.isInstanceOf[WriteSnapshot], n, cause)
override def failNextNReads(n: Int, cause: Throwable): Unit =
failNextNOpsCond((_, op) => op.isInstanceOf[ReadSnapshot], n, cause)
override def failNextNReads(persistenceId: String, n: Int, cause: Throwable): Unit =
failNextNOpsCond((pid, op) => pid == persistenceId && op.isInstanceOf[ReadSnapshot], n, cause)
override def failNextNDeletes(n: Int, cause: Throwable): Unit =
failNextNOpsCond((_, op) => op.isInstanceOf[DeleteSnapshot], n, cause)
override def failNextNDeletes(persistenceId: String, n: Int, cause: Throwable): Unit =
failNextNOpsCond((pid, op) => pid == persistenceId && op.isInstanceOf[DeleteSnapshot], n, cause)
/**
* Persist `elems` pairs of (snapshot metadata, snapshot payload) into storage.
*/
def persistForRecovery(persistenceId: String, elems: immutable.Seq[(SnapshotMeta, Any)]): Unit =
elems.foreach {
case (m, p) =>
storage.add(persistenceId, (SnapshotMetadata(persistenceId, m.sequenceNr, m.timestamp), p))
addToIndex(persistenceId, 1)
}
/**
* Persist a pair of (snapshot metadata, snapshot payload) into storage.
*/
def persistForRecovery(persistenceId: String, elem: (SnapshotMeta, Any)): Unit =
persistForRecovery(persistenceId, immutable.Seq(elem))
/**
* Retrieve snapshots and their metadata from storage by persistence id.
*/
def persistedInStorage(persistenceId: String): immutable.Seq[(SnapshotMeta, Any)] =
storage
.read(persistenceId)
.map(_.map(m => (SnapshotMeta(m._1.sequenceNr, m._1.timestamp), m._2)))
.getOrElse(Vector.empty)
override private[testkit] def reprToAny(repr: (SnapshotMetadata, Any)) = repr._2
}
@ApiMayChange
object SnapshotTestKit {
def apply(implicit system: ActorSystem): SnapshotTestKit = new SnapshotTestKit(system)
def apply(implicit system: TypedActorSystem[_]): SnapshotTestKit = apply(system.classicSystem)
object Settings extends ExtensionId[Settings] {
val configPath = "akka.persistence.testkit.snapshots"
override def createExtension(system: ExtendedActorSystem): Settings =
new Settings(system.settings.config.getConfig(configPath))
override def get(system: ActorSystem): Settings = super.get(system)
}
class Settings(config: Config) extends Extension {
import akka.util.Helpers._
val serialize: Boolean = config.getBoolean("serialize")
val assertTimeout: FiniteDuration = config.getMillisDuration("assert-timeout")
val pollInterval: FiniteDuration = config.getMillisDuration("assert-poll-interval")
}
}
/**
* Class for testing events of persistent actors.
*
* NOTE! ActorSystem must be configured with [[PersistenceTestKitPlugin]].
* The configuration can be retrieved with [[PersistenceTestKitPlugin.config]].
*/
@ApiMayChange
class PersistenceTestKit(system: ActorSystem)
extends PersistenceTestKitOps[PersistentRepr, JournalOperation]
with ExpectOps[PersistentRepr]
with HasStorage[JournalOperation, PersistentRepr] {
require(
Try(Persistence(system).journalFor(PersistenceTestKitPlugin.PluginId)).isSuccess,
"The test persistence plugin is not configured.")
import PersistenceTestKit._
override protected val storage = InMemStorageExtension(system)
private final lazy val settings = Settings(system)
override private[testkit] val probe = TestProbe()(system)
override private[testkit] val Policies = EventStorage.JournalPolicies
override private[testkit] val pollInterval: FiniteDuration = settings.pollInterval
override private[testkit] val maxTimeout: FiniteDuration = settings.assertTimeout
override def rejectNextNPersisted(persistenceId: String, n: Int, cause: Throwable): Unit =
rejectNextNOpsCond((pid, op) => pid == persistenceId && op.isInstanceOf[WriteEvents], n, cause)
override def rejectNextNPersisted(n: Int, cause: Throwable): Unit =
rejectNextNOpsCond((_, op) => op.isInstanceOf[WriteEvents], n, cause)
override def rejectNextNReads(n: Int, cause: Throwable): Unit =
rejectNextNOpsCond((_, op) => op.isInstanceOf[ReadEvents] || op.isInstanceOf[ReadSeqNum.type], n, cause)
override def rejectNextNReads(persistenceId: String, n: Int, cause: Throwable): Unit =
rejectNextNOpsCond(
(pid, op) => (pid == persistenceId) && (op.isInstanceOf[ReadEvents] || op.isInstanceOf[ReadSeqNum.type]),
n,
cause)
override def rejectNextNDeletes(n: Int, cause: Throwable): Unit =
rejectNextNOpsCond((_, op) => op.isInstanceOf[DeleteEvents], n, cause)
override def rejectNextNDeletes(persistenceId: String, n: Int, cause: Throwable): Unit =
rejectNextNOpsCond((pid, op) => pid == persistenceId && op.isInstanceOf[DeleteEvents], n, cause)
override def failNextNPersisted(persistenceId: String, n: Int, cause: Throwable): Unit =
failNextNOpsCond((pid, op) => pid == persistenceId && op.isInstanceOf[WriteEvents], n, cause)
override def failNextNPersisted(n: Int, cause: Throwable): Unit =
failNextNOpsCond((_, op) => op.isInstanceOf[WriteEvents], n, cause)
override def failNextNReads(n: Int, cause: Throwable): Unit =
failNextNOpsCond((_, op) => op.isInstanceOf[ReadEvents] || op.isInstanceOf[ReadSeqNum.type], n, cause)
override def failNextNReads(persistenceId: String, n: Int, cause: Throwable): Unit =
failNextNOpsCond(
(pid, op) => (pid == persistenceId) && (op.isInstanceOf[ReadEvents] || op.isInstanceOf[ReadSeqNum.type]),
n,
cause)
override def failNextNDeletes(n: Int, cause: Throwable): Unit =
failNextNOpsCond((_, op) => op.isInstanceOf[DeleteEvents], n, cause)
override def failNextNDeletes(persistenceId: String, n: Int, cause: Throwable): Unit =
failNextNOpsCond((pid, op) => pid == persistenceId && op.isInstanceOf[DeleteEvents], n, cause)
def persistForRecovery(persistenceId: String, snapshots: immutable.Seq[Any]): Unit = {
storage.addAny(persistenceId, snapshots)
addToIndex(persistenceId, snapshots.size)
}
def persistedInStorage(persistenceId: String): immutable.Seq[Any] =
storage.read(persistenceId).getOrElse(List.empty).map(reprToAny)
override private[testkit] def reprToAny(repr: PersistentRepr): Any = repr.payload
}
@ApiMayChange
object PersistenceTestKit {
def apply(system: ActorSystem): PersistenceTestKit = new PersistenceTestKit(system)
def apply(system: TypedActorSystem[_]): PersistenceTestKit = apply(system.classicSystem)
object Settings extends ExtensionId[Settings] {
val configPath = "akka.persistence.testkit.events"
override def get(system: ActorSystem): Settings = super.get(system)
override def createExtension(system: ExtendedActorSystem): Settings =
new Settings(system.settings.config.getConfig(configPath))
}
class Settings(config: Config) extends Extension {
import akka.util.Helpers._
val serialize: Boolean = config.getBoolean("serialize")
val assertTimeout: FiniteDuration = config.getMillisDuration("assert-timeout")
val pollInterval: FiniteDuration = config.getMillisDuration("assert-poll-interval")
}
}

View file

@ -0,0 +1,336 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.scaladsl
import akka.persistence.testkit.ProcessingPolicy.DefaultPolicies
import akka.persistence.testkit.internal.TestKitStorage
import akka.persistence.testkit.{ ExpectedFailure, ExpectedRejection }
import akka.testkit.TestKitBase
import akka.util
import akka.util.BoxedType
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
private[testkit] trait RejectSupport[U] {
this: PolicyOpsTestKit[U] with HasStorage[U, _] =>
/**
* Reject `n` following journal operations depending on the condition `cond`.
* Rejection triggers, when `cond` returns true.
* Reject operations with default `ExpectedRejection` exception.
*/
def rejectNextNOpsCond(cond: (String, U) => Boolean, n: Int): Unit =
rejectNextNOpsCond(cond, n, ExpectedRejection)
/**
* Reject `n` following journal operations depending on the condition `cond`.
* Rejection triggers, when `cond` returns true.
* Rejects operations with the `cause` exception.
*/
def rejectNextNOpsCond(cond: (String, U) => Boolean, n: Int, cause: Throwable): Unit = {
val current = storage.currentPolicy
val pol = new Policies.RejectNextNCond(n, cause, cond, withPolicy(current))
withPolicy(pol)
}
/**
* Reject n following journal operations regardless of their type.
* Rejects operations with default `ExpectedRejection` exception.
*/
def rejectNextNOps(n: Int): Unit =
rejectNextNOps(n, ExpectedRejection)
/**
* Reject `n` following journal operations regardless of their type.
* Rejects operations with the `cause` exception.
*/
def rejectNextNOps(n: Int, cause: Throwable): Unit = {
val current = storage.currentPolicy
val pol = new Policies.RejectNextN(n, cause, withPolicy(current))
withPolicy(pol)
}
}
private[testkit] trait PolicyOpsTestKit[P] extends {
this: HasStorage[P, _] =>
private[testkit] val Policies: DefaultPolicies[P]
/**
* Fail `n` following journal operations depending on the condition `cond`.
* Failure triggers, when `cond` returns true.
* Fails operations with default `ExpectedFailure` exception.
*/
def failNextNOpsCond(cond: (String, P) => Boolean, n: Int): Unit =
failNextNOpsCond(cond, n, ExpectedFailure)
/**
* Fail `n` following journal operations depending on the condition `cond`.
* Failure triggers, when `cond` returns true.
* Fails operations with the `cause` exception.
*/
def failNextNOpsCond(cond: (String, P) => Boolean, n: Int, cause: Throwable): Unit = {
val current = storage.currentPolicy
val pol = new Policies.FailNextNCond(n, cause, cond, withPolicy(current))
withPolicy(pol)
}
/**
* Fail n following journal operations regardless of their type.
* Fails operations with default `ExpectedFailure` exception.
*/
def failNextNOps(n: Int): Unit =
failNextNOps(n, ExpectedFailure)
/**
* Fail `n` following journal operations regardless of their type.
* Fails operations with the `cause` exception.
*/
def failNextNOps(n: Int, cause: Throwable): Unit = {
val current = storage.currentPolicy
val pol = new Policies.FailNextN(n, cause, withPolicy(current))
withPolicy(pol)
}
/**
* Set new processing policy for journal operations.
* NOTE! Overrides previously invoked `failNext...` or `rejectNext...`
*/
def withPolicy(policy: Policies.PolicyType): this.type = {
storage.setPolicy(policy)
this
}
/**
* Returns default policy if it was changed by [[PolicyOpsTestKit.this.withPolicy()]].
*/
def returnDefaultPolicy(): Unit = storage.returnDefaultPolicy()
}
private[testkit] trait ExpectOps[U] {
this: HasStorage[_, U] =>
private[testkit] val probe: TestKitBase
import probe._
import akka.testkit._
private[testkit] def pollInterval: FiniteDuration
private[testkit] def maxTimeout: FiniteDuration
private[testkit] def reprToAny(repr: U): Any
/**
* Check that next persisted in storage for particular persistence id event/snapshot was `event`.
*/
def expectNextPersisted[A](persistenceId: String, event: A): A =
expectNextPersisted(persistenceId, event, maxTimeout)
/**
* Check for `max` time that next persisted in storage for particular persistence id event/snapshot was `event`.
*/
def expectNextPersisted[A](persistenceId: String, event: A, max: FiniteDuration): A = {
val nextInd = nextIndex(persistenceId)
val expected = Some(event)
val res = awaitAssert({
val actual = storage.findOneByIndex(persistenceId, nextInd).map(reprToAny)
assert(actual == expected, s"Failed to persist $event, got $actual instead")
actual
}, max = max.dilated, interval = pollInterval)
setIndex(persistenceId, nextInd + 1)
res.get.asInstanceOf[A]
}
/**
* Check that next persisted in storage for particular persistence id event/snapshot has expected type.
*/
def expectNextPersistedType[A](persistenceId: String)(implicit t: ClassTag[A]): A =
expectNextPersistedType(persistenceId, maxTimeout)
/**
* Check for `max` time that next persisted in storage for particular persistence id event/snapshot has expected type.
*/
def expectNextPersistedType[A](persistenceId: String, max: FiniteDuration)(implicit t: ClassTag[A]): A =
expectNextPersistedClass(persistenceId, t.runtimeClass.asInstanceOf[Class[A]], max)
/**
* Check that next persisted in storage for particular persistence id event/snapshot has expected type.
*/
def expectNextPersistedClass[A](persistenceId: String, cla: Class[A]): A =
expectNextPersistedClass(persistenceId, cla, maxTimeout)
/**
* Check for `max` time that next persisted in storage for particular persistence id event/snapshot has expected type.
*/
def expectNextPersistedClass[A](persistenceId: String, cla: Class[A], max: FiniteDuration): A = {
val nextInd = nextIndex(persistenceId)
val c = util.BoxedType(cla)
val res = awaitAssert({
val actual = storage.findOneByIndex(persistenceId, nextInd).map(reprToAny)
assert(actual.isDefined, s"Expected: $cla but got no event")
val a = actual.get
assert(c.isInstance(a), s"Expected: $cla but got unexpected ${a.getClass}")
a.asInstanceOf[A]
}, max.dilated, interval = pollInterval)
setIndex(persistenceId, nextInd + 1)
res
}
/**
* Check that nothing was persisted in storage for particular persistence id.
*/
def expectNothingPersisted(persistenceId: String): Unit =
expectNothingPersisted(persistenceId, maxTimeout)
/**
* Check for `max` time that nothing was persisted in storage for particular persistence id.
*/
def expectNothingPersisted(persistenceId: String, max: FiniteDuration): Unit = {
val nextInd = nextIndex(persistenceId)
assertForDuration({
val actual = storage.findOneByIndex(persistenceId, nextInd).map(reprToAny)
val res = actual.isEmpty
assert(res, s"Found persisted event $actual, but expected None instead")
}, max = max.dilated, interval = pollInterval)
}
/**
* Receive for `max` time next `n` events/snapshots that have been persisted in the storage.
*/
def receivePersisted[A](persistenceId: String, n: Int, max: FiniteDuration)(
implicit t: ClassTag[A]): immutable.Seq[A] =
receivePersisted(persistenceId, n, t.runtimeClass.asInstanceOf[Class[A]], max)
/**
* Receive next `n` events/snapshots that have been persisted in the storage.
*/
def receivePersisted[A](persistenceId: String, n: Int)(implicit t: ClassTag[A]): immutable.Seq[A] =
receivePersisted(persistenceId, n, t.runtimeClass.asInstanceOf[Class[A]], maxTimeout)
/**
* Receive next `n` events/snapshots that have been persisted in the storage.
*/
def receivePersisted[A](persistenceId: String, n: Int, cla: Class[A]): immutable.Seq[A] =
receivePersisted(persistenceId, n, cla, maxTimeout)
/**
* Receive for `max` time next `n` events/snapshots that have been persisted in the storage.
*/
def receivePersisted[A](persistenceId: String, n: Int, cla: Class[A], max: FiniteDuration): immutable.Seq[A] = {
val nextInd = nextIndex(persistenceId)
val bt = BoxedType(cla)
val res =
awaitAssert(
{
val actual = storage.findMany(persistenceId, nextInd, n)
actual match {
case Some(reprs) =>
val ls = reprs.map(reprToAny)
val filtered = ls.filter(e => !bt.isInstance(e))
assert(ls.size == n, s"Could read only ${ls.size} events instead of expected $n")
assert(filtered.isEmpty, s"Persisted events $filtered do not correspond to expected type")
case None => assert(false, "No events were persisted")
}
actual.get.map(reprToAny)
},
max = max.dilated,
interval = pollInterval)
setIndex(persistenceId, nextInd + n)
res.asInstanceOf[immutable.Seq[A]]
}
}
private[testkit] trait ClearOps {
this: HasStorage[_, _] =>
/**
* Clear all data from the storage.
*
* NOTE! Also clears sequence numbers in storage!
*
* @see [[ClearPreservingSeqNums.clearAllPreservingSeqNumbers()]]
*/
def clearAll(): Unit = {
storage.clearAll()
clearIndexStorage()
}
/**
* Clear all data from the storage for particular persistence id.
*
* NOTE! Also clears sequence number in the storage!
*
* @see [[ClearPreservingSeqNums.clearByIdPreservingSeqNumbers()]]
*/
def clearByPersistenceId(persistenceId: String): Unit = {
storage.removeKey(persistenceId)
removeLastIndex(persistenceId)
}
}
private[testkit] trait ClearPreservingSeqNums {
this: HasStorage[_, _] =>
/**
* Clear all data in the storage preserving sequence numbers.
*
* @see [[ClearOps.clearAll()]]
*/
def clearAllPreservingSeqNumbers(): Unit = {
storage.clearAllPreservingSeqNumbers()
clearIndexStorage()
}
/**
* Clear all data in the storage for particular persistence id preserving sequence numbers.
*
* @see [[ClearOps.clearByPersistenceId()]]
*/
def clearByIdPreservingSeqNumbers(persistenceId: String): Unit = {
storage.removePreservingSeqNumber(persistenceId)
removeLastIndex(persistenceId)
}
}
/**
* Abstract persistent storage for tests.
* Has additional methods for keeping track of the indexes of last events persisted in the storage during test.
*/
private[testkit] trait HasStorage[P, R] {
protected def storage: TestKitStorage[P, R]
//todo needs to be thread safe (atomic read-increment-write) for parallel tests. Do we need parallel tests support?
@volatile
private var nextIndexByPersistenceId: immutable.Map[String, Int] = Map.empty
private[testkit] def removeLastIndex(persistenceId: String): Unit =
nextIndexByPersistenceId -= persistenceId
private[testkit] def clearIndexStorage(): Unit =
nextIndexByPersistenceId = Map.empty
private[testkit] def nextIndex(persistenceId: String): Int =
nextIndexByPersistenceId.getOrElse(persistenceId, 0)
private[testkit] def setIndex(persistenceId: String, index: Int): Unit =
nextIndexByPersistenceId += persistenceId -> index
private[testkit] def addToIndex(persistenceId: String, add: Int): Unit = {
val nextInd = nextIndexByPersistenceId.getOrElse(persistenceId, 0)
nextIndexByPersistenceId += (persistenceId -> (nextInd + add))
}
}

View file

@ -0,0 +1,105 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit
import java.util.UUID
import akka.actor.{ ActorRef, ActorSystem }
import akka.persistence._
import akka.testkit.TestKitBase
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
trait CommonUtils extends AnyWordSpecLike with TestKitBase {
protected def randomPid() = UUID.randomUUID().toString
import akka.util.ccompat.JavaConverters._
def initSystemWithEnabledPlugin(name: String, serializeMessages: Boolean, serializeSnapshots: Boolean) =
ActorSystem(
s"persistence-testkit-$name",
PersistenceTestKitSnapshotPlugin.config
.withFallback(PersistenceTestKitPlugin.config)
.withFallback(
ConfigFactory.parseMap(
Map(
// testing serialization of the events when persisting in the storage
// using default java serializers for convenience
"akka.actor.allow-java-serialization" -> true,
"akka.persistence.testkit.events.serialize" -> serializeMessages,
"akka.persistence.testkit.snapshots.serialize" -> serializeSnapshots).asJava))
.withFallback(ConfigFactory.parseString("akka.loggers = [\"akka.testkit.TestEventListener\"]"))
.withFallback(ConfigFactory.defaultApplication()))
}
case class NewSnapshot(state: Any)
case object DeleteAllMessages
case class DeleteSomeSnapshot(seqNum: Long)
case class DeleteSomeSnapshotByCriteria(crit: SnapshotSelectionCriteria)
case object AskMessageSeqNum
case object AskSnapshotSeqNum
case class DeleteSomeMessages(upToSeqNum: Long)
class C
case class B(i: Int)
class A(pid: String, notifyOnStateChange: Option[ActorRef]) extends PersistentActor {
import scala.collection.immutable
var recovered = immutable.List.empty[Any]
var snapshotState = 0
override def receiveRecover = {
case SnapshotOffer(_, snapshot: Int) =>
snapshotState = snapshot
case RecoveryCompleted =>
notifyOnStateChange.foreach(_ ! Tuple2(recovered, snapshotState))
case s => recovered :+= s
}
override def receiveCommand = {
case AskMessageSeqNum =>
notifyOnStateChange.foreach(_ ! lastSequenceNr)
case AskSnapshotSeqNum =>
notifyOnStateChange.foreach(_ ! snapshotSequenceNr)
case d @ DeleteMessagesFailure(_, _) =>
notifyOnStateChange.foreach(_ ! d)
case d @ DeleteMessagesSuccess(_) =>
notifyOnStateChange.foreach(_ ! d)
case s: SnapshotProtocol.Response =>
notifyOnStateChange.foreach(_ ! s)
case DeleteAllMessages =>
deleteMessages(lastSequenceNr)
case DeleteSomeSnapshot(sn) =>
deleteSnapshot(sn)
case DeleteSomeSnapshotByCriteria(crit) =>
deleteSnapshots(crit)
case DeleteSomeMessages(sn) =>
deleteMessages(sn)
case NewSnapshot(state: Int) =>
snapshotState = state: Int
saveSnapshot(state)
case NewSnapshot(other) =>
saveSnapshot(other)
case s =>
persist(s) { _ =>
sender() ! s
}
}
override def persistenceId = pid
}
trait TestCommand
case class Cmd(data: String) extends TestCommand
case object Passivate extends TestCommand
case class Evt(data: String)
case class EmptyState()
case object Recovered
case object Stopped

View file

@ -0,0 +1,523 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.javadsl
import akka.actor.Props
import akka.persistence._
import akka.persistence.testkit._
import akka.testkit.EventFilter
import org.scalatest.matchers.should.Matchers._
import akka.util.ccompat.JavaConverters._
import akka.japi.Pair
import akka.actor.typed.javadsl.Adapter
trait CommonSnapshotTests extends JavaDslUtils {
lazy val testKit = new SnapshotTestKit(system)
import testKit._
def specificTests(): Unit
"SnapshotTestKit" should {
"work with typed actors" in {
val pid = randomPid()
val act = Adapter.spawn(system, eventSourcedBehavior(pid), pid)
act ! Cmd("")
testKit.expectNextPersisted(pid, EmptyState())
testKit.expectNothingPersisted(pid)
}
"save snapshot" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
a ! NewSnapshot(1: Any)
a ! NewSnapshot(2: Any)
expectNextPersisted(pid, 1)
assertThrows[AssertionError] {
expectNextPersisted(pid, 3)
}
expectNextPersisted(pid, 2)
assertThrows[AssertionError] {
expectNextPersisted(pid, 3)
}
}
"successfully set and execute custom policy" in {
val pid = randomPid()
val err = new Exception("BOOM!")
val newPolicy = new SnapshotStorage.SnapshotPolicies.PolicyType {
override def tryProcess(persistenceId: String, processingUnit: SnapshotOperation): ProcessingResult = {
processingUnit match {
case WriteSnapshot(_, msgs) =>
val ex = msgs match {
case 777 => true
case _ => false
}
if (ex) {
ProcessingSuccess
} else {
StorageFailure(err)
}
case _ => ProcessingSuccess
}
}
}
withPolicy(newPolicy)
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
a ! NewSnapshot(1)
expectMsg((List.empty, 0))
expectMsgPF() { case SaveSnapshotFailure(_, ee) if ee.getMessage == err.getMessage => }
a ! NewSnapshot(777)
expectMsgPF() { case SaveSnapshotSuccess(_) => }
expectNextPersisted(pid, 777)
returnDefaultPolicy()
}
"expect next N valid snapshots in order" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
a ! NewSnapshot(2)
a ! NewSnapshot(1)
assertThrows[AssertionError] {
receivePersisted(pid, 3, classOf[Int])
}
assertThrows[AssertionError] {
receivePersisted(pid, 2, classOf[String])
}
assertThrows[AssertionError] {
receivePersisted(pid, 3, classOf[Int])
}
val li = receivePersisted(pid, 2, classOf[Int])
(li should contain).theSameElementsInOrderAs(List(2, 1))
}
"fail to receive" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
a ! NewSnapshot(2)
a ! NewSnapshot("data")
assertThrows[AssertionError] {
receivePersisted(pid, 3, classOf[Int])
}
assertThrows[AssertionError] {
receivePersisted(pid, 2, classOf[String])
}
assertThrows[AssertionError] {
receivePersisted(pid, 3, classOf[Int])
}
val li = receivePersisted(pid, 2, classOf[Any])
(li should contain).theSameElementsInOrderAs(List(2, "data"))
}
"fail next snapshot" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
//consecutive calls should stack
failNextPersisted()
failNextPersisted()
a ! NewSnapshot(1)
expectMsg((List.empty, 0))
expectMsgPF() { case SaveSnapshotFailure(_, ExpectedFailure) => }
val b = system.actorOf(Props(classOf[A], pid, Some(testActor)))
b ! NewSnapshot(2)
expectMsg((List.empty, 0))
expectMsgPF() { case SaveSnapshotFailure(_, ExpectedFailure) => }
val c = system.actorOf(Props(classOf[A], pid, None))
c ! NewSnapshot(3)
expectNextPersisted(pid, 3)
}
"fail next snapshot with custom error" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
val err = new Exception("Custom ERROR!")
failNextPersisted(err)
a ! NewSnapshot(1)
expectMsg((List.empty, 0))
expectMsgPF() { case SaveSnapshotFailure(_, ee) if err.getMessage == ee.getMessage => }
}
"expect nothingPersisted fails" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
expectNothingPersisted(pid)
a ! NewSnapshot(1)
assertThrows[AssertionError] {
expectNothingPersisted(pid)
}
}
"expect no snapshot persisted" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
expectNothingPersisted(pid)
a ! NewSnapshot(1)
expectNextPersisted(pid, 1)
expectNothingPersisted(pid)
}
"fail recovery" in {
val pid = randomPid()
failNextNOps(1)
val a = system.actorOf(Props(classOf[A], pid, None))
watch(a)
expectTerminated(a)
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 0))
}
"recover last persisted snapshot" in {
val pid = randomPid()
val preload = List((SnapshotMeta(0), 1), (SnapshotMeta(1), 2), (SnapshotMeta(2), 3))
.map(tpl => Pair[SnapshotMeta, Any](tpl._1, tpl._2))
.asJava
persistForRecovery(pid, preload)
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 3))
}
"fail to recover persisted snapshots for any actor" in {
val pid = randomPid()
val preload = List((SnapshotMeta(0), 1), (SnapshotMeta(1), 2), (SnapshotMeta(2), 3))
.map(tpl => Pair[SnapshotMeta, Any](tpl._1, tpl._2))
.asJava
persistForRecovery(pid, preload)
failNextRead()
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
watch(a)
expectTerminated(a)
}
"fail to recover persisted snapshots for any actor with custom error" in {
val pid = randomPid()
val preload = List((SnapshotMeta(0), 1), (SnapshotMeta(1), 2), (SnapshotMeta(2), 3))
.map(tpl => Pair[SnapshotMeta, Any](tpl._1, tpl._2))
.asJava
val err = new Exception("Custom ERROR!")
persistForRecovery(pid, preload)
failNextRead(err)
EventFilter.error(err.getMessage, occurrences = 1).intercept {
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
watch(a)
expectTerminated(a)
}
}
"fail to recover persisted snapshots for actor with particular persistenceId" in {
val pid = randomPid()
val preload = List((SnapshotMeta(0), 1), (SnapshotMeta(1), 2), (SnapshotMeta(2), 3))
.map(tpl => Pair[SnapshotMeta, Any](tpl._1, tpl._2))
.asJava
persistForRecovery(pid, preload)
failNextRead(pid)
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
watch(a)
expectTerminated(a)
}
"recover last persisted snapshot when fail for different persistenceId is set" in {
val pid = randomPid()
val preload = List((SnapshotMeta(0), 1), (SnapshotMeta(1), 2), (SnapshotMeta(2), 3))
.map(tpl => Pair[SnapshotMeta, Any](tpl._1, tpl._2))
.asJava
persistForRecovery(pid, preload)
val otherPid = randomPid()
failNextRead(otherPid)
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 3))
}
"persist and return persisted snapshots" in {
val pid = randomPid()
val saved = List((SnapshotMeta(0), 1), (SnapshotMeta(1), 2), (SnapshotMeta(2), 3))
.map(tpl => Pair[SnapshotMeta, Any](tpl._1, tpl._2))
.asJava
persistForRecovery(pid, saved)
val li = persistedInStorage(pid).asScala
(li should contain).theSameElementsInOrderAs(saved.asScala)
}
"fail next snapshot delete for any actor" in {
val pid = randomPid()
persistForRecovery(pid, List((SnapshotMeta(0), 1)).map(tpl => Pair[SnapshotMeta, Any](tpl._1, tpl._2)).asJava)
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
failNextDelete()
a ! DeleteSomeSnapshot(0)
expectMsg((List.empty, 1))
expectMsgPF() { case DeleteSnapshotFailure(_, ExpectedFailure) => }
a ! DeleteSomeSnapshot(0)
expectMsgPF() { case DeleteSnapshotSuccess(meta) if meta.sequenceNr == 0 => }
expectNoMessage()
}
"fail next snapshot delete for any actor with custom error" in {
val pid = randomPid()
persistForRecovery(pid, List((SnapshotMeta(0), 1)).map(tpl => Pair[SnapshotMeta, Any](tpl._1, tpl._2)).asJava)
val err = new Exception("Custom ERROR!")
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
failNextDelete(err)
a ! DeleteSomeSnapshot(0)
expectMsg((List.empty, 1))
expectMsgPF() { case DeleteSnapshotFailure(_, ee) if ee.getMessage == err.getMessage => }
a ! DeleteSomeSnapshot(0)
expectMsgPF() { case DeleteSnapshotSuccess(meta) if meta.sequenceNr == 0 => }
expectNoMessage()
}
"fail next delete for particular persistence id" in {
val pid = randomPid()
persistForRecovery(
pid,
List((SnapshotMeta(0), 1), (SnapshotMeta(1), 2), (SnapshotMeta(2), 3))
.map(tpl => Pair[SnapshotMeta, Any](tpl._1, tpl._2))
.asJava)
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
failNextDelete(pid)
a ! DeleteSomeSnapshot(0)
expectMsg((List.empty, 3))
expectMsgPF() { case DeleteSnapshotFailure(_, ExpectedFailure) => }
a ! DeleteSomeSnapshot(0)
expectMsgPF() { case DeleteSnapshotSuccess(meta) if meta.sequenceNr == 0 => }
expectNoMessage()
}
"not fail next delete for other persistence id" in {
val pid = randomPid()
persistForRecovery(
pid,
List((SnapshotMeta(0), 1), (SnapshotMeta(1), 2), (SnapshotMeta(2), 3))
.map(tpl => Pair[SnapshotMeta, Any](tpl._1, tpl._2))
.asJava)
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
val other = randomPid()
failNextDelete(other)
a ! DeleteSomeSnapshotByCriteria(SnapshotSelectionCriteria.Latest)
expectMsg((List.empty, 3))
expectMsgPF() { case DeleteSnapshotsSuccess(SnapshotSelectionCriteria.Latest) => }
}
"clear all" in {
val pid = randomPid()
persistForRecovery(
pid,
List((SnapshotMeta(0), 1), (SnapshotMeta(1), 2), (SnapshotMeta(2), 3))
.map(tpl => Pair[SnapshotMeta, Any](tpl._1, tpl._2))
.asJava)
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 3))
clearAll()
val aa = system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 0))
aa ! AskSnapshotSeqNum
expectMsg(0L)
}
"clear all for particular persistence id" in {
val pid = randomPid()
persistForRecovery(
pid,
List((SnapshotMeta(0), 1), (SnapshotMeta(1), 2), (SnapshotMeta(2), 3))
.map(tpl => Pair[SnapshotMeta, Any](tpl._1, tpl._2))
.asJava)
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 3))
clearByPersistenceId(pid)
val aa = system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 0))
aa ! AskSnapshotSeqNum
expectMsg(0L)
}
"preserve all for other persistence id" in {
val pid = randomPid()
persistForRecovery(
pid,
List((SnapshotMeta(0), 1), (SnapshotMeta(1), 2), (SnapshotMeta(2), 3))
.map(tpl => Pair[SnapshotMeta, Any](tpl._1, tpl._2))
.asJava)
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 3))
clearByPersistenceId(randomPid())
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 3))
}
specificTests()
}
}

View file

@ -0,0 +1,580 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.javadsl
import akka.actor.Props
import akka.persistence._
import akka.persistence.testkit._
import akka.testkit.EventFilter
import org.scalatest.matchers.should.Matchers._
import akka.util.ccompat.JavaConverters._
import akka.actor.typed.javadsl.Adapter
trait CommonTestKitTests extends JavaDslUtils {
lazy val testKit = new PersistenceTestKit(system)
import testKit._
def specificTests(): Unit
"PersistenceTestKit" should {
"work with typed actors" in {
val expectedId = randomPid()
val pid = randomPid()
val act = Adapter.spawn(system, eventSourcedBehavior(pid), pid)
act ! Cmd(expectedId)
testKit.expectNextPersisted(pid, Evt(expectedId))
testKit.expectNothingPersisted(pid)
}
"work with tagged events" in {
val expectedId = randomPid()
val pid = randomPid()
var act =
Adapter.spawn(system, eventSourcedBehavior(pid, true, Some(Adapter.toTyped[Any](testActor))), randomPid())
expectMsg(Recovered)
act ! Cmd(expectedId)
testKit.expectNextPersisted(pid, Evt(expectedId))
act ! Passivate
expectMsg(Stopped)
act = Adapter.spawn(system, eventSourcedBehavior(pid, true, Some(Adapter.toTyped[Any](testActor))), randomPid())
val expectedId2 = randomPid()
act ! Cmd(expectedId2)
expectMsg(Recovered)
testKit.expectNextPersisted(pid, Evt(expectedId2))
testKit.expectNothingPersisted(pid)
}
"expect next valid message" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
a ! B(1)
a ! B(2)
expectNextPersisted(pid, B(1))
assertThrows[AssertionError] {
expectNextPersisted(pid, B(3))
}
expectNextPersisted(pid, B(2))
assertThrows[AssertionError] {
expectNextPersisted(pid, B(3))
}
}
"expect next N valid messages in order" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
a ! B(1)
a ! B(2)
assertThrows[AssertionError] {
receivePersisted(pid, 3, classOf[B])
}
assertThrows[AssertionError] {
receivePersisted(pid, 2, classOf[C])
}
val li = receivePersisted(pid, 2, classOf[B])
(li should contain).theSameElementsInOrderAs(List(B(1), B(2)))
}
"successfully set and execute custom policy" in {
val pid = randomPid()
val err = new Exception("BOOM!")
val newPolicy = new EventStorage.JournalPolicies.PolicyType {
override def tryProcess(persistenceId: String, processingUnit: JournalOperation): ProcessingResult = {
processingUnit match {
case WriteEvents(msgs) =>
val ex = msgs.exists({
case B(666) => true
case _ => false
})
if (ex) {
ProcessingSuccess
} else {
StorageFailure(err)
}
case _ => ProcessingSuccess
}
}
}
withPolicy(newPolicy)
val a = system.actorOf(Props(classOf[A], pid, None))
EventFilter.error(err.getMessage, occurrences = 1).intercept {
a ! B(1)
}
watch(a)
expectTerminated(a)
val aa = system.actorOf(Props(classOf[A], pid, None))
aa ! B(666)
expectNextPersisted(pid, B(666))
returnDefaultPolicy()
}
"reject next persisted" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
//consecutive calls should stack
rejectNextPersisted()
rejectNextPersisted()
a ! B(1)
assertThrows[AssertionError] {
expectNextPersisted(pid, B(1))
}
a ! B(2)
assertThrows[AssertionError] {
expectNextPersisted(pid, B(2))
}
a ! B(3)
expectNextPersisted(pid, B(3))
}
"reject next persisted with custom exception" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
val err = new Exception("Custom ERROR!")
rejectNextPersisted(err)
EventFilter.error(err.getMessage, occurrences = 1).intercept {
a ! B(1)
}
a ! B(2)
expectNextPersisted(pid, B(2))
}
"fail next persisted" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
//consecutive calls should stack
failNextPersisted()
failNextPersisted()
a ! B(1)
watch(a)
expectTerminated(a)
val b = system.actorOf(Props(classOf[A], pid, None))
b ! B(2)
watch(b)
expectTerminated(b)
val c = system.actorOf(Props(classOf[A], pid, None))
c ! B(3)
expectNextPersisted(pid, B(3))
}
"fail next persisted with custom exception" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
val err = new Exception("Custom ERROR!")
failNextPersisted(err)
EventFilter.error(err.getMessage, occurrences = 1).intercept {
a ! B(1)
}
watch(a)
expectTerminated(a)
}
"expect nothingPersisted fails" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
expectNothingPersisted(pid)
a ! B(1)
assertThrows[AssertionError] {
expectNothingPersisted(pid)
}
}
"expect no message persisted" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
expectNothingPersisted(pid)
a ! B(1)
expectNextPersisted(pid, B(1))
expectNothingPersisted(pid)
}
"fail recovery" in {
val pid = randomPid()
failNextNOps(1)
val a = system.actorOf(Props(classOf[A], pid, None))
watch(a)
expectTerminated(a)
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 0))
}
"recover persisted messages" in {
val preload = List(B(1), B(2), B(3)).map(e => e: Any).asJava
val pid = randomPid()
persistForRecovery(pid, preload)
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((preload.asScala, 0))
}
"fail to recover persisted messages for any actor" in {
val preload = List(B(1), B(2), B(3)).map(e => e: Any).asJava
val pid = randomPid()
persistForRecovery(pid, preload)
failNextRead()
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
watch(a)
expectTerminated(a)
}
"fail to recover persisted messages for any actor with custom error" in {
val preload = List(B(1), B(2), B(3)).map(e => e: Any).asJava
val pid = randomPid()
val err = new Exception("BOOM!")
persistForRecovery(pid, preload)
failNextRead(err)
EventFilter.error(err.getMessage, occurrences = 1).intercept {
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
watch(a)
expectTerminated(a)
}
}
"fail to recover persisted messages for actor with particular persistenceId" in {
val preload = List(B(1), B(2), B(3)).map(e => e: Any).asJava
val pid = randomPid()
persistForRecovery(pid, preload)
failNextRead(pid)
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
watch(a)
expectTerminated(a)
}
"recover persisted messages when fail for different persistenceId is set" in {
val preload = List(B(1), B(2), B(3)).map(e => e: Any).asJava
val pid = randomPid()
persistForRecovery(pid, preload)
val otherPid = randomPid()
failNextRead(otherPid)
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((preload.asScala, 0))
}
"persist and return persisted messages" in {
val pid = randomPid()
val saved = List(B(1), B(2), B(3)).map(e => e: Any).asJava
persistForRecovery(pid, saved)
val li = persistedInStorage(pid).asScala
(li should contain).theSameElementsInOrderAs(saved.asScala)
}
"fail next delete for any actor" in {
val pid = randomPid()
val preload = List(B(1)).map(e => e: Any).asJava
persistForRecovery(pid, preload)
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
failNextDelete()
a ! DeleteAllMessages
expectMsg((preload.asScala, 0))
expectMsgPF() { case DeleteMessagesFailure(ExpectedFailure, _) => }
a ! DeleteAllMessages
expectMsgPF() { case DeleteMessagesSuccess(_) => }
}
"fail next delete for any actor with custom exception" in {
val pid = randomPid()
val err = new Exception("BOOM!")
val preload = List(B(1)).map(e => e: Any).asJava
persistForRecovery(pid, preload)
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
failNextDelete(err)
a ! DeleteAllMessages
expectMsg((preload.asScala, 0))
expectMsgPF() { case DeleteMessagesFailure(e, _) if e.getMessage == err.getMessage => }
a ! DeleteAllMessages
expectMsgPF() { case DeleteMessagesSuccess(_) => }
}
"fail next delete for particular persistence id" in {
val pid = randomPid()
val preload = List(B(1)).map(e => e: Any).asJava
persistForRecovery(pid, preload)
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
failNextDelete(pid)
a ! DeleteAllMessages
expectMsg((preload.asScala, 0))
expectMsgPF() { case DeleteMessagesFailure(ExpectedFailure, _) => }
a ! DeleteAllMessages
expectMsgPF() { case DeleteMessagesSuccess(_) => }
}
"not fail next delete for other persistence id" in {
val pid = randomPid()
persistForRecovery(pid, List(1).map(e => e: Any).asJava)
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
val other = randomPid()
failNextDelete(other)
a ! DeleteAllMessages
expectMsg((List(1), 0))
expectMsgPF() { case DeleteMessagesSuccess(_) => }
}
"clear all" in {
val pid = randomPid()
persistForRecovery(pid, List(B(1), B(2), B(3)).map(e => e: Any).asJava)
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List(B(1), B(2), B(3)), 0))
clearAll()
val aa = system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 0))
aa ! AskMessageSeqNum
expectMsg(0L)
}
"clear all for particular persistence id" in {
val pid = randomPid()
persistForRecovery(pid, List(B(1), B(2), B(3)).map(e => e: Any).asJava)
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List(B(1), B(2), B(3)), 0))
clearByPersistenceId(pid)
val aa = system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 0))
aa ! AskMessageSeqNum
expectMsg(0L)
}
"preserve all for other persistence id" in {
val pid = randomPid()
persistForRecovery(pid, List(B(1), B(2), B(3)).map(e => e: Any).asJava)
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List(B(1), B(2), B(3)), 0))
clearByPersistenceId(randomPid())
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List(B(1), B(2), B(3)), 0))
}
"clear all preserving seq nums" in {
val pid = randomPid()
persistForRecovery(pid, List(B(1), B(2), B(3)).map(e => e: Any).asJava)
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List(B(1), B(2), B(3)), 0))
clearAllPreservingSeqNumbers()
val aa = system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 0))
aa ! AskMessageSeqNum
expectMsg(3L)
}
"clear all preserving seq num for particular persistence id" in {
val pid = randomPid()
persistForRecovery(pid, List(B(1), B(2), B(3)).map(e => e: Any).asJava)
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List(B(1), B(2), B(3)), 0))
clearByIdPreservingSeqNumbers(pid)
val aa = system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 0))
aa ! AskMessageSeqNum
expectMsg(3L)
}
specificTests()
}
}

View file

@ -0,0 +1,51 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.javadsl
import java.util
import akka.actor.typed.ActorRef
import akka.persistence.typed.javadsl.{ CommandHandler, EventHandler, EventSourcedBehavior, SignalHandler }
import akka.persistence.testkit.{ Cmd, CommonUtils, EmptyState, Evt, Passivate, Recovered, Stopped, TestCommand }
import akka.persistence.typed.{ PersistenceId, RecoveryCompleted }
trait JavaDslUtils extends CommonUtils {
def eventSourcedBehavior(
pid: String,
setConstantTag: Boolean = false,
replyOnRecovery: Option[ActorRef[Any]] = None) =
new EventSourcedBehavior[TestCommand, Evt, EmptyState](PersistenceId.ofUniqueId(pid)) {
override protected def emptyState: EmptyState = EmptyState()
override protected def commandHandler(): CommandHandler[TestCommand, Evt, EmptyState] =
newCommandHandlerBuilder()
.forAnyState()
.onAnyCommand((command: TestCommand) => {
command match {
case Cmd(data) => Effect.persist(Evt(data))
case Passivate => Effect.stop().thenRun((_: EmptyState) => replyOnRecovery.foreach(_ ! Stopped))
}
})
override protected def eventHandler(): EventHandler[EmptyState, Evt] =
newEventHandlerBuilder().forAnyState().onAnyEvent(_ => emptyState)
override def shouldSnapshot(state: EmptyState, event: Evt, sequenceNr: Long): Boolean = true
override def signalHandler(): SignalHandler[EmptyState] =
newSignalHandlerBuilder().onSignal(RecoveryCompleted, _ => replyOnRecovery.foreach(_ ! Recovered)).build
override def tagsFor(event: Evt): util.Set[String] = {
if (setConstantTag) {
util.Collections.singleton("tag")
} else {
super.tagsFor(event)
}
}
}
}

View file

@ -0,0 +1,26 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.javadsl
import akka.actor.Props
import akka.persistence.testkit._
class SnapshotNotSerializeSpec extends CommonSnapshotTests {
override lazy val system = initSystemWithEnabledPlugin("SnapshotNotSerializeSpec", false, false)
import testKit._
override def specificTests(): Unit =
"succeed if trying to save nonserializable snapshot" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
val c = new C
a ! NewSnapshot(c)
expectNextPersisted(pid, c)
}
}

View file

@ -0,0 +1,27 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.javadsl
import java.io.NotSerializableException
import akka.actor.Props
import akka.persistence.SaveSnapshotFailure
import akka.persistence.testkit._
class SnapshotSerializeSpec extends CommonSnapshotTests {
override lazy val system = initSystemWithEnabledPlugin("SnapshotSerializeSpec", true, true)
override def specificTests(): Unit =
"fail if tries to save nonserializable snapshot" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
a ! NewSnapshot(new C)
expectMsg((List.empty, 0L))
expectMsgPF() { case SaveSnapshotFailure(_, _: NotSerializableException) => }
}
}

View file

@ -0,0 +1,25 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.javadsl
import akka.actor.Props
import akka.persistence.testkit._
class TestKitNotSerializeSpec extends CommonTestKitTests {
override lazy val system = initSystemWithEnabledPlugin("TestKitNotSerializeSpec", false, false)
import testKit._
override def specificTests() = "save next nonserializable persisted" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
val c = new C
a ! c
expectNextPersisted(pid, c)
}
}

View file

@ -0,0 +1,21 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.javadsl
import akka.actor.Props
import akka.persistence.testkit._
class TestKitSerializeSpec extends CommonTestKitTests {
override lazy val system = initSystemWithEnabledPlugin("TestKitSerializeSpec", true, true)
override def specificTests(): Unit = "fail next nonserializable persisted" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
a ! new C
watch(a)
expectTerminated(a)
}
}

View file

@ -0,0 +1,497 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.scaladsl
import akka.actor.Props
import akka.persistence._
import akka.persistence.testkit._
import akka.testkit.EventFilter
import org.scalatest.matchers.should.Matchers._
import akka.actor.typed.scaladsl.adapter._
trait CommonSnapshotTests extends ScalaDslUtils {
lazy val testKit = new SnapshotTestKit(system)
import testKit._
def specificTests(): Unit
"SnapshotTestKit" should {
"work with typed actors" in {
val pid = randomPid()
val act = system.spawn(eventSourcedBehavior(pid), pid)
act ! Cmd("")
testKit.expectNextPersisted(pid, EmptyState())
testKit.expectNothingPersisted(pid)
}
"save snapshot" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
a ! NewSnapshot(1: Any)
a ! NewSnapshot(2: Any)
expectNextPersisted(pid, 1)
assertThrows[AssertionError] {
expectNextPersisted(pid, 3)
}
expectNextPersisted(pid, 2)
assertThrows[AssertionError] {
expectNextPersisted(pid, 3)
}
}
"save snapshot and check type" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
a ! NewSnapshot(1: Any)
a ! NewSnapshot(2: Any)
expectNextPersistedType[Int](pid) should be(1)
assertThrows[AssertionError] {
expectNextPersistedType[String](pid)
}
expectNextPersistedType[Int](pid) should be(2)
assertThrows[AssertionError] {
expectNextPersistedType(pid)
}
}
"successfully set and execute custom policy" in {
val pid = randomPid()
val err = new Exception("BOOM!")
val newPolicy = new SnapshotStorage.SnapshotPolicies.PolicyType {
override def tryProcess(persistenceId: String, processingUnit: SnapshotOperation): ProcessingResult = {
processingUnit match {
case WriteSnapshot(_, msgs) =>
val ex = msgs match {
case 777 => true
case _ => false
}
if (ex) {
ProcessingSuccess
} else {
StorageFailure(err)
}
case _ => ProcessingSuccess
}
}
}
withPolicy(newPolicy)
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
a ! NewSnapshot(1)
expectMsg((List.empty, 0))
expectMsgPF() { case SaveSnapshotFailure(_, ee) if ee.getMessage == err.getMessage => }
a ! NewSnapshot(777)
expectMsgPF() { case SaveSnapshotSuccess(_) => }
expectNextPersisted(pid, 777)
returnDefaultPolicy()
}
"expect next N valid snapshots in order" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
a ! NewSnapshot(1)
a ! NewSnapshot(2)
assertThrows[AssertionError] {
receivePersisted[Int](pid, 3)
}
assertThrows[AssertionError] {
receivePersisted[String](pid, 2)
}
val li = receivePersisted[Int](pid, 2)
(li should contain).theSameElementsInOrderAs(List(1, 2))
}
"fail when receives wrong type" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
a ! NewSnapshot(1)
a ! NewSnapshot("data")
assertThrows[AssertionError] {
receivePersisted[Int](pid, 2)
}
}
"fail next snapshot" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
//consecutive calls should stack
failNextPersisted()
failNextPersisted()
a ! NewSnapshot(1)
expectMsg((List.empty, 0))
expectMsgPF() { case SaveSnapshotFailure(_, ExpectedFailure) => }
val b = system.actorOf(Props(classOf[A], pid, Some(testActor)))
b ! NewSnapshot(2)
expectMsg((List.empty, 0))
expectMsgPF() { case SaveSnapshotFailure(_, ExpectedFailure) => }
val c = system.actorOf(Props(classOf[A], pid, None))
c ! NewSnapshot(3)
expectNextPersisted(pid, 3)
}
"fail next snapshot with custom error" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
val err = new Exception("Custom ERROR!")
failNextPersisted(err)
a ! NewSnapshot(1)
expectMsg((List.empty, 0))
expectMsgPF() { case SaveSnapshotFailure(_, ee) if err.getMessage == ee.getMessage => }
}
"expect nothingPersisted fails" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
expectNothingPersisted(pid)
a ! NewSnapshot(1)
assertThrows[AssertionError] {
expectNothingPersisted(pid)
}
}
"expect no snapshot persisted" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
expectNothingPersisted(pid)
a ! NewSnapshot(1)
expectNextPersisted(pid, 1)
expectNothingPersisted(pid)
}
"fail recovery" in {
val pid = randomPid()
failNextNOps(1)
val a = system.actorOf(Props(classOf[A], pid, None))
watch(a)
expectTerminated(a)
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 0))
}
"recover last persisted snapshot" in {
val pid = randomPid()
val preload = List((SnapshotMeta(0), 1), (SnapshotMeta(1), 2), (SnapshotMeta(2), 3))
persistForRecovery(pid, preload)
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 3))
}
"fail to recover persisted snapshots for any actor" in {
val pid = randomPid()
val preload = List((SnapshotMeta(0), 1), (SnapshotMeta(1), 2), (SnapshotMeta(2), 3))
persistForRecovery(pid, preload)
failNextRead()
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
watch(a)
expectTerminated(a)
}
"fail to recover persisted snapshots for any actor with custom error" in {
val pid = randomPid()
val preload = List((SnapshotMeta(0), 1), (SnapshotMeta(1), 2), (SnapshotMeta(2), 3))
val err = new Exception("Custom ERROR!")
persistForRecovery(pid, preload)
failNextRead(err)
EventFilter.error(err.getMessage, occurrences = 1).intercept {
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
watch(a)
expectTerminated(a)
}
}
"fail to recover persisted snapshots for actor with particular persistenceId" in {
val pid = randomPid()
val preload = List((SnapshotMeta(0), 1), (SnapshotMeta(1), 2), (SnapshotMeta(2), 3))
persistForRecovery(pid, preload)
failNextRead(pid)
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
watch(a)
expectTerminated(a)
}
"recover last persisted snapshot when fail for different persistenceId is set" in {
val pid = randomPid()
val preload = List((SnapshotMeta(0), 1), (SnapshotMeta(1), 2), (SnapshotMeta(2), 3))
persistForRecovery(pid, preload)
val otherPid = randomPid()
failNextRead(otherPid)
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 3))
}
"persist and return persisted snapshots" in {
val pid = randomPid()
val saved = List((SnapshotMeta(0), 1), (SnapshotMeta(1), 2), (SnapshotMeta(2), 3))
persistForRecovery(pid, saved)
val li = persistedInStorage(pid)
(li should contain).theSameElementsInOrderAs(saved)
}
"fail next snapshot delete for any actor" in {
val pid = randomPid()
persistForRecovery(pid, List((SnapshotMeta(0), 1)))
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
failNextDelete()
a ! DeleteSomeSnapshot(0)
expectMsg((List.empty, 1))
expectMsgPF() { case DeleteSnapshotFailure(_, ExpectedFailure) => }
a ! DeleteSomeSnapshot(0)
expectMsgPF() { case DeleteSnapshotSuccess(meta) if meta.sequenceNr == 0 => }
expectNoMessage()
}
"fail next snapshot delete for any actor with custom error" in {
val pid = randomPid()
persistForRecovery(pid, List((SnapshotMeta(0), 1)))
val err = new Exception("Custom ERROR!")
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
failNextDelete(err)
a ! DeleteSomeSnapshot(0)
expectMsg((List.empty, 1))
expectMsgPF() { case DeleteSnapshotFailure(_, ee) if ee.getMessage == err.getMessage => }
a ! DeleteSomeSnapshot(0)
expectMsgPF() { case DeleteSnapshotSuccess(meta) if meta.sequenceNr == 0 => }
expectNoMessage()
}
"fail next delete for particular persistence id" in {
val pid = randomPid()
persistForRecovery(pid, List((SnapshotMeta(0), 1), (SnapshotMeta(1), 2), (SnapshotMeta(2), 3)))
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
failNextDelete(pid)
a ! DeleteSomeSnapshot(0)
expectMsg((List.empty, 3))
expectMsgPF() { case DeleteSnapshotFailure(_, ExpectedFailure) => }
a ! DeleteSomeSnapshot(0)
expectMsgPF() { case DeleteSnapshotSuccess(meta) if meta.sequenceNr == 0 => }
expectNoMessage()
}
"not fail next delete for other persistence id" in {
val pid = randomPid()
persistForRecovery(pid, List((SnapshotMeta(0), 1), (SnapshotMeta(1), 2), (SnapshotMeta(2), 3)))
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
val other = randomPid()
failNextDelete(other)
a ! DeleteSomeSnapshotByCriteria(SnapshotSelectionCriteria.Latest)
expectMsg((List.empty, 3))
expectMsgPF() { case DeleteSnapshotsSuccess(SnapshotSelectionCriteria.Latest) => }
}
"clear all" in {
val pid = randomPid()
persistForRecovery(pid, List((SnapshotMeta(0), 1), (SnapshotMeta(1), 2), (SnapshotMeta(2), 3)))
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 3))
clearAll()
val aa = system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 0))
aa ! AskSnapshotSeqNum
expectMsg(0L)
}
"clear all for particular persistence id" in {
val pid = randomPid()
persistForRecovery(pid, List((SnapshotMeta(0), 1), (SnapshotMeta(1), 2), (SnapshotMeta(2), 3)))
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 3))
clearByPersistenceId(pid)
val aa = system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 0))
aa ! AskSnapshotSeqNum
expectMsg(0L)
}
"preserve all for other persistence id" in {
val pid = randomPid()
persistForRecovery(pid, List((SnapshotMeta(0), 1), (SnapshotMeta(1), 2), (SnapshotMeta(2), 3)))
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 3))
clearByPersistenceId(randomPid())
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 3))
}
specificTests()
}
}

View file

@ -0,0 +1,596 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.scaladsl
import akka.actor.Props
import akka.persistence.{ DeleteMessagesFailure, DeleteMessagesSuccess }
import akka.testkit.EventFilter
import akka.persistence.testkit._
import org.scalatest.matchers.should.Matchers._
import akka.actor.typed.scaladsl.adapter._
trait CommonTestKitTests extends ScalaDslUtils {
lazy val testKit = new PersistenceTestKit(system)
import testKit._
def specificTests(): Unit
"PersistenceTestKit" should {
"work with typed actors" in {
val expectedId = randomPid()
val pid = randomPid()
val act = system.spawn(eventSourcedBehavior(pid), pid)
act ! Cmd(expectedId)
testKit.expectNextPersisted(pid, Evt(expectedId))
testKit.expectNothingPersisted(pid)
}
"work with tagged events" in {
val expectedId = randomPid()
val pid = randomPid()
var act =
system.spawn(eventSourcedBehavior(pid, Some(testActor.toTyped[Any])).withTagger(_ => Set("tag")), randomPid())
expectMsg(Recovered)
act ! Cmd(expectedId)
testKit.expectNextPersisted(pid, Evt(expectedId))
act ! Passivate
expectMsg(Stopped)
act =
system.spawn(eventSourcedBehavior(pid, Some(testActor.toTyped[Any])).withTagger(_ => Set("tag")), randomPid())
val expectedId2 = randomPid()
act ! Cmd(expectedId2)
expectMsg(Recovered)
testKit.expectNextPersisted(pid, Evt(expectedId2))
testKit.expectNothingPersisted(pid)
}
"expect next valid message" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
a ! B(1)
a ! B(2)
expectNextPersisted(pid, B(1))
assertThrows[AssertionError] {
expectNextPersisted(pid, B(3))
}
expectNextPersisted(pid, B(2))
assertThrows[AssertionError] {
expectNextPersisted(pid, B(3))
}
}
"expect next valid message and check type" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
a ! B(1)
a ! B(2)
expectNextPersistedType[B](pid)
assertThrows[AssertionError] {
expectNextPersistedType[A](pid)
}
expectNextPersistedType[B](pid)
assertThrows[AssertionError] {
expectNextPersistedType[B](pid)
}
}
"expect next N valid messages in order" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
a ! B(1)
a ! B(2)
assertThrows[AssertionError] {
receivePersisted[B](pid, 3)
}
assertThrows[AssertionError] {
receivePersisted[C](pid, 2)
}
val li = receivePersisted[B](pid, 2)
(li should contain).theSameElementsInOrderAs(List(B(1), B(2)))
}
"successfully set and execute custom policy" in {
val pid = randomPid()
val err = new Exception("BOOM!")
val newPolicy = new EventStorage.JournalPolicies.PolicyType {
override def tryProcess(persistenceId: String, processingUnit: JournalOperation): ProcessingResult = {
processingUnit match {
case WriteEvents(msgs) =>
val ex = msgs.exists({
case B(666) => true
case _ => false
})
if (ex) {
ProcessingSuccess
} else {
StorageFailure(err)
}
case _ => ProcessingSuccess
}
}
}
withPolicy(newPolicy)
val a = system.actorOf(Props(classOf[A], pid, None))
EventFilter.error(err.getMessage, occurrences = 1).intercept {
a ! B(1)
}
watch(a)
expectTerminated(a)
val aa = system.actorOf(Props(classOf[A], pid, None))
aa ! B(666)
expectNextPersisted(pid, B(666))
returnDefaultPolicy()
}
"reject next persisted" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
//consecutive calls should stack
rejectNextPersisted()
rejectNextPersisted()
a ! B(1)
assertThrows[AssertionError] {
expectNextPersisted(pid, B(1))
}
a ! B(2)
assertThrows[AssertionError] {
expectNextPersisted(pid, B(2))
}
a ! B(3)
expectNextPersisted(pid, B(3))
}
"reject next persisted with custom exception" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
val err = new Exception("Custom ERROR!")
rejectNextPersisted(err)
EventFilter.error(err.getMessage, occurrences = 1).intercept {
a ! B(1)
}
a ! B(2)
expectNextPersisted(pid, B(2))
}
"fail next persisted" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
//consecutive calls should stack
failNextPersisted()
failNextPersisted()
a ! B(1)
watch(a)
expectTerminated(a)
val b = system.actorOf(Props(classOf[A], pid, None))
b ! B(2)
watch(b)
expectTerminated(b)
val c = system.actorOf(Props(classOf[A], pid, None))
c ! B(3)
expectNextPersisted(pid, B(3))
}
"fail next persisted with custom exception" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
val err = new Exception("Custom ERROR!")
failNextPersisted(err)
EventFilter.error(err.getMessage, occurrences = 1).intercept {
a ! B(1)
}
watch(a)
expectTerminated(a)
}
"expect nothingPersisted fails" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
expectNothingPersisted(pid)
a ! B(1)
assertThrows[AssertionError] {
expectNothingPersisted(pid)
}
}
"expect no message persisted" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
expectNothingPersisted(pid)
a ! B(1)
expectNextPersisted(pid, B(1))
expectNothingPersisted(pid)
}
"fail recovery" in {
val pid = randomPid()
failNextNOps(1)
val a = system.actorOf(Props(classOf[A], pid, None))
watch(a)
expectTerminated(a)
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 0))
}
"recover persisted messages" in {
val preload = List(B(1), B(2), B(3))
val pid = randomPid()
persistForRecovery(pid, preload)
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((preload, 0))
}
"fail to recover persisted messages for any actor" in {
val preload = List(B(1), B(2), B(3))
val pid = randomPid()
persistForRecovery(pid, preload)
failNextRead()
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
watch(a)
expectTerminated(a)
}
"fail to recover persisted messages for any actor with custom error" in {
val preload = List(B(1), B(2), B(3))
val pid = randomPid()
val err = new Exception("BOOM!")
persistForRecovery(pid, preload)
failNextRead(err)
EventFilter.error(err.getMessage, occurrences = 1).intercept {
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
watch(a)
expectTerminated(a)
}
}
"fail to recover persisted messages for actor with particular persistenceId" in {
val preload = List(B(1), B(2), B(3))
val pid = randomPid()
persistForRecovery(pid, preload)
failNextRead(pid)
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
watch(a)
expectTerminated(a)
}
"recover persisted messages when fail for different persistenceId is set" in {
val preload = List(B(1), B(2), B(3))
val pid = randomPid()
persistForRecovery(pid, preload)
val otherPid = randomPid()
failNextRead(otherPid)
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((preload, 0))
}
"persist and return persisted messages" in {
val pid = randomPid()
val saved = List(B(1), B(2), B(3))
persistForRecovery(pid, saved)
val li = persistedInStorage(pid)
(li should contain).theSameElementsInOrderAs(saved)
}
"fail next delete for any actor" in {
val pid = randomPid()
val preload = List(B(1))
persistForRecovery(pid, preload)
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
failNextDelete()
a ! DeleteAllMessages
expectMsg((preload, 0))
expectMsgPF() { case DeleteMessagesFailure(ExpectedFailure, _) => }
a ! DeleteAllMessages
expectMsgPF() { case DeleteMessagesSuccess(_) => }
}
"fail next delete for any actor with custom exception" in {
val pid = randomPid()
val err = new Exception("BOOM!")
val preload = List(B(1))
persistForRecovery(pid, preload)
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
failNextDelete(err)
a ! DeleteAllMessages
expectMsg((preload, 0))
expectMsgPF() { case DeleteMessagesFailure(e, _) if e.getMessage == err.getMessage => }
a ! DeleteAllMessages
expectMsgPF() { case DeleteMessagesSuccess(_) => }
}
"fail next delete for particular persistence id" in {
val pid = randomPid()
val preload = List(B(1))
persistForRecovery(pid, preload)
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
failNextDelete(pid)
a ! DeleteAllMessages
expectMsg((preload, 0))
expectMsgPF() { case DeleteMessagesFailure(ExpectedFailure, _) => }
a ! DeleteAllMessages
expectMsgPF() { case DeleteMessagesSuccess(_) => }
}
"not fail next delete for other persistence id" in {
val pid = randomPid()
persistForRecovery(pid, List(1))
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
val other = randomPid()
failNextDelete(other)
a ! DeleteAllMessages
expectMsg((List(1), 0))
expectMsgPF() { case DeleteMessagesSuccess(_) => }
}
"clear all" in {
val pid = randomPid()
persistForRecovery(pid, List(B(1), B(2), B(3)))
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List(B(1), B(2), B(3)), 0))
clearAll()
val aa = system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 0))
aa ! AskMessageSeqNum
expectMsg(0L)
}
"clear all for particular persistence id" in {
val pid = randomPid()
persistForRecovery(pid, List(B(1), B(2), B(3)))
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List(B(1), B(2), B(3)), 0))
clearByPersistenceId(pid)
val aa = system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 0))
aa ! AskMessageSeqNum
expectMsg(0L)
}
"preserve all for other persistence id" in {
val pid = randomPid()
persistForRecovery(pid, List(B(1), B(2), B(3)))
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List(B(1), B(2), B(3)), 0))
clearByPersistenceId(randomPid())
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List(B(1), B(2), B(3)), 0))
}
"clear all preserving seq nums" in {
val pid = randomPid()
persistForRecovery(pid, List(B(1), B(2), B(3)))
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List(B(1), B(2), B(3)), 0))
clearAllPreservingSeqNumbers()
val aa = system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 0))
aa ! AskMessageSeqNum
expectMsg(3L)
}
"clear all preserving seq num for particular persistence id" in {
val pid = randomPid()
persistForRecovery(pid, List(B(1), B(2), B(3)))
system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List(B(1), B(2), B(3)), 0))
clearByIdPreservingSeqNumbers(pid)
val aa = system.actorOf(Props(classOf[A], pid, Some(testActor)))
expectMsg((List.empty, 0))
aa ! AskMessageSeqNum
expectMsg(3L)
}
specificTests()
}
}

View file

@ -0,0 +1,43 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.scaladsl
import java.io.NotSerializableException
import akka.persistence.CapabilityFlag
import akka.persistence.journal.JournalSpec
import akka.persistence.snapshot.SnapshotStoreSpec
import akka.persistence.testkit._
import akka.persistence.testkit.EventStorage.JournalPolicies
import akka.persistence.testkit.Reject
import akka.persistence.testkit.internal.InMemStorageExtension
class PersistenceTestKitJournalCompatSpec extends JournalSpec(config = PersistenceTestKitPlugin.config) {
override def beforeAll(): Unit = {
super.beforeAll()
InMemStorageExtension(system).setPolicy(new JournalPolicies.PolicyType {
override def tryProcess(persistenceId: String, op: JournalOperation): ProcessingResult = {
op match {
case WriteEvents(batch) =>
val allSerializable =
batch.filter(_.isInstanceOf[AnyRef]).forall(_.isInstanceOf[java.io.Serializable])
if (allSerializable) {
ProcessingSuccess
} else {
Reject(new NotSerializableException("Some objects in the batch were not serializable"))
}
case _ => ProcessingSuccess
}
}
})
}
override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = true
}
class PersistenceTestKitSnapshotStoreCompatSpec
extends SnapshotStoreSpec(config = PersistenceTestKitSnapshotPlugin.config)

View file

@ -0,0 +1,25 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.scaladsl
import akka.actor.typed.ActorRef
import akka.persistence.typed.RecoveryCompleted
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior }
import akka.persistence.testkit.{ Cmd, CommonUtils, EmptyState, Evt, Passivate, Recovered, Stopped, TestCommand }
import akka.persistence.typed.PersistenceId
trait ScalaDslUtils extends CommonUtils {
def eventSourcedBehavior(pid: String, replyOnRecovery: Option[ActorRef[Any]] = None) =
EventSourcedBehavior[TestCommand, Evt, EmptyState](PersistenceId.ofUniqueId(pid), EmptyState(), (_, cmd) => {
cmd match {
case Cmd(data) => Effect.persist(Evt(data))
case Passivate => Effect.stop().thenRun(_ => replyOnRecovery.foreach(_ ! Stopped))
}
}, (_, _) => EmptyState()).snapshotWhen((_, _, _) => true).receiveSignal {
case (_, RecoveryCompleted) => replyOnRecovery.foreach(_ ! Recovered)
}
}

View file

@ -0,0 +1,26 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.scaladsl
import akka.actor.Props
import akka.persistence.testkit._
class SnapshotNotSerializeSpec extends CommonSnapshotTests {
override lazy val system = initSystemWithEnabledPlugin("SnapshotNotSerializeSpec", false, false)
import testKit._
override def specificTests(): Unit =
"succeed if trying to save nonserializable snapshot" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
val c = new C
a ! NewSnapshot(c)
expectNextPersisted(pid, c)
}
}

View file

@ -0,0 +1,27 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.scaladsl
import java.io.NotSerializableException
import akka.actor.Props
import akka.persistence.SaveSnapshotFailure
import akka.persistence.testkit._
class SnapshotSerializeSpec extends CommonSnapshotTests {
override lazy val system = initSystemWithEnabledPlugin("SnapshotSerializeSpec", true, true)
override def specificTests(): Unit =
"fail if tries to save nonserializable snapshot" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, Some(testActor)))
a ! NewSnapshot(new C)
expectMsg((List.empty, 0L))
expectMsgPF() { case SaveSnapshotFailure(_, _: NotSerializableException) => }
}
}

View file

@ -0,0 +1,25 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.scaladsl
import akka.actor.Props
import akka.persistence.testkit._
class TestKitNotSerializeSpec extends CommonTestKitTests {
override lazy val system = initSystemWithEnabledPlugin("TestKitNotSerializeSpec", false, false)
import testKit._
override def specificTests() = "save next nonserializable persisted" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
val c = new C
a ! c
expectNextPersisted(pid, c)
}
}

View file

@ -0,0 +1,22 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.scaladsl
import akka.actor.Props
import akka.persistence.testkit._
class TestKitSerializeSpec extends CommonTestKitTests {
override lazy val system = initSystemWithEnabledPlugin("TestKitSerializeSpec", true, true)
override def specificTests(): Unit = "fail next nonserializable persisted" in {
val pid = randomPid()
val a = system.actorOf(Props(classOf[A], pid, None))
a ! new C
watch(a)
expectTerminated(a)
}
}