Synchronous TestKit for EventSourcedBehavior, #23712 (#28952)

* using real EventSourcedBehaviorImpl
* using new inmem journal (PersistenceTestKit)
* advantages compared to a "fake" driver
  * no difference in implementation details from real thing
  * no limitations
  * less maintance
* added internal messsages to EventSourcedBehaviorImpl to be able to grab state
  and persistenceId
  * GetState as InternalProtocol instead of Signal so that it is stashed
* serialization checks, using SerializationTestKit
* better testKitGuardian naming to allow multiple PersistenceTestKit
* support testing of restart
* support failure testing by using PersistenceTestKit
* update doc sample
* apidoc, reference docs, and javadsl
This commit is contained in:
Patrik Nordwall 2020-04-29 22:06:42 +02:00 committed by GitHub
parent a910bee99f
commit ef79738373
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 1254 additions and 235 deletions

View file

@ -5,6 +5,7 @@
package akka.actor.testkit.typed.scaladsl
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Await
import scala.concurrent.duration._
@ -34,6 +35,8 @@ import akka.util.Timeout
object ActorTestKit {
private val testKitGuardianCounter = new AtomicInteger(0)
/**
* Create a testkit named from the ActorTestKit class.
*
@ -62,7 +65,12 @@ object ActorTestKit {
* using default configuration from the reference.conf resources that ship with the Akka libraries.
*/
def apply(system: ActorSystem[_]): ActorTestKit = {
val testKitGuardian = system.systemActorOf(ActorTestKitGuardian.testKitGuardian, "test")
val name = testKitGuardianCounter.incrementAndGet() match {
case 1 => "test"
case n => s"test-$n"
}
val testKitGuardian =
system.systemActorOf(ActorTestKitGuardian.testKitGuardian, name)
new ActorTestKit(system, testKitGuardian, settings = None)
}

View file

@ -4,33 +4,28 @@
package jdocs.akka.cluster.sharding.typed;
import org.scalatestplus.junit.JUnitSuite;
import static jdocs.akka.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity;
// #test
import java.math.BigDecimal;
import java.util.UUID;
import akka.actor.testkit.typed.javadsl.LogCapturing;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.typed.ActorRef;
import akka.persistence.testkit.javadsl.EventSourcedBehaviorTestKit;
import akka.persistence.testkit.javadsl.EventSourcedBehaviorTestKit.CommandResultWithReply;
import akka.persistence.typed.PersistenceId;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import akka.actor.testkit.typed.javadsl.LogCapturing;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.ActorRef;
import akka.persistence.typed.PersistenceId;
import static org.junit.Assert.assertTrue;
// #test
// #test-events
import akka.actor.typed.eventstream.EventStream;
import akka.persistence.journal.inmem.InmemJournal;
// #test-events
import org.scalatestplus.junit.JUnitSuite;
import static jdocs.akka.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity;
// #test
public class AccountExampleDocTest
// #test
@ -38,100 +33,103 @@ public class AccountExampleDocTest
// #test
{
// #inmem-config
private static final String inmemConfig =
"akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n"
+ "akka.persistence.journal.inmem.test-serialization = on \n";
// #testkit
@ClassRule
public static final TestKitJunitResource testKit =
new TestKitJunitResource(EventSourcedBehaviorTestKit.config());
// #inmem-config
// #snapshot-store-config
private static final String snapshotConfig =
"akka.persistence.snapshot-store.plugin = \"akka.persistence.snapshot-store.local\" \n"
+ "akka.persistence.snapshot-store.local.dir = \"target/snapshot-"
+ UUID.randomUUID().toString()
+ "\" \n";
// #snapshot-store-config
private static final String config = inmemConfig + snapshotConfig;
@ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(config);
private EventSourcedBehaviorTestKit<
AccountEntity.Command, AccountEntity.Event, AccountEntity.Account>
eventSourcedTestKit =
EventSourcedBehaviorTestKit.create(
testKit.system(), AccountEntity.create("1", PersistenceId.of("Account", "1")));
// #testkit
@Rule public final LogCapturing logCapturing = new LogCapturing();
@Before
public void beforeEach() {
eventSourcedTestKit.clear();
}
@Test
public void createWithEmptyBalance() {
CommandResultWithReply<
AccountEntity.Command,
AccountEntity.Event,
AccountEntity.Account,
AccountEntity.OperationResult>
result = eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new);
assertEquals(AccountEntity.Confirmed.INSTANCE, result.reply());
assertEquals(AccountEntity.AccountCreated.INSTANCE, result.event());
assertEquals(BigDecimal.ZERO, result.stateOfType(AccountEntity.OpenedAccount.class).balance);
}
@Test
public void handleWithdraw() {
ActorRef<AccountEntity.Command> ref =
testKit.spawn(AccountEntity.create("1", PersistenceId.of("Account", "1")));
TestProbe<AccountEntity.OperationResult> probe =
testKit.createTestProbe(AccountEntity.OperationResult.class);
ref.tell(new AccountEntity.CreateAccount(probe.getRef()));
probe.expectMessage(AccountEntity.Confirmed.INSTANCE);
ref.tell(new AccountEntity.Deposit(BigDecimal.valueOf(100), probe.getRef()));
probe.expectMessage(AccountEntity.Confirmed.INSTANCE);
ref.tell(new AccountEntity.Withdraw(BigDecimal.valueOf(10), probe.getRef()));
probe.expectMessage(AccountEntity.Confirmed.INSTANCE);
eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new);
CommandResultWithReply<
AccountEntity.Command,
AccountEntity.Event,
AccountEntity.Account,
AccountEntity.OperationResult>
result1 =
eventSourcedTestKit.runCommand(
replyTo -> new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo));
assertEquals(AccountEntity.Confirmed.INSTANCE, result1.reply());
assertEquals(
BigDecimal.valueOf(100), result1.eventOfType(AccountEntity.Deposited.class).amount);
assertEquals(
BigDecimal.valueOf(100), result1.stateOfType(AccountEntity.OpenedAccount.class).balance);
CommandResultWithReply<
AccountEntity.Command,
AccountEntity.Event,
AccountEntity.Account,
AccountEntity.OperationResult>
result2 =
eventSourcedTestKit.runCommand(
replyTo -> new AccountEntity.Withdraw(BigDecimal.valueOf(10), replyTo));
assertEquals(AccountEntity.Confirmed.INSTANCE, result2.reply());
assertEquals(BigDecimal.valueOf(10), result2.eventOfType(AccountEntity.Withdrawn.class).amount);
assertEquals(
BigDecimal.valueOf(90), result2.stateOfType(AccountEntity.OpenedAccount.class).balance);
}
@Test
public void rejectWithdrawOverdraft() {
ActorRef<AccountEntity.Command> ref =
testKit.spawn(AccountEntity.create("2", PersistenceId.of("Account", "2")));
TestProbe<AccountEntity.OperationResult> probe =
testKit.createTestProbe(AccountEntity.OperationResult.class);
ref.tell(new AccountEntity.CreateAccount(probe.getRef()));
probe.expectMessage(AccountEntity.Confirmed.INSTANCE);
ref.tell(new AccountEntity.Deposit(BigDecimal.valueOf(100), probe.getRef()));
probe.expectMessage(AccountEntity.Confirmed.INSTANCE);
ref.tell(new AccountEntity.Withdraw(BigDecimal.valueOf(110), probe.getRef()));
probe.expectMessageClass(AccountEntity.Rejected.class);
eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new);
eventSourcedTestKit.runCommand(
(ActorRef<AccountEntity.OperationResult> replyTo) ->
new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo));
CommandResultWithReply<
AccountEntity.Command,
AccountEntity.Event,
AccountEntity.Account,
AccountEntity.OperationResult>
result =
eventSourcedTestKit.runCommand(
replyTo -> new AccountEntity.Withdraw(BigDecimal.valueOf(110), replyTo));
result.replyOfType(AccountEntity.Rejected.class);
assertTrue(result.hasNoEvents());
}
@Test
public void handleGetBalance() {
ActorRef<AccountEntity.Command> ref =
testKit.spawn(AccountEntity.create("3", PersistenceId.of("Account", "3")));
TestProbe<AccountEntity.OperationResult> opProbe =
testKit.createTestProbe(AccountEntity.OperationResult.class);
ref.tell(new AccountEntity.CreateAccount(opProbe.getRef()));
opProbe.expectMessage(AccountEntity.Confirmed.INSTANCE);
ref.tell(new AccountEntity.Deposit(BigDecimal.valueOf(100), opProbe.getRef()));
opProbe.expectMessage(AccountEntity.Confirmed.INSTANCE);
eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new);
eventSourcedTestKit.runCommand(
(ActorRef<AccountEntity.OperationResult> replyTo) ->
new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo));
TestProbe<AccountEntity.CurrentBalance> getProbe =
testKit.createTestProbe(AccountEntity.CurrentBalance.class);
ref.tell(new AccountEntity.GetBalance(getProbe.getRef()));
assertEquals(
BigDecimal.valueOf(100),
getProbe.expectMessageClass(AccountEntity.CurrentBalance.class).balance);
CommandResultWithReply<
AccountEntity.Command,
AccountEntity.Event,
AccountEntity.Account,
AccountEntity.CurrentBalance>
result = eventSourcedTestKit.runCommand(AccountEntity.GetBalance::new);
assertEquals(BigDecimal.valueOf(100), result.reply().balance);
}
// #test
// #test-events
@Test
public void storeEvents() {
TestProbe<InmemJournal.Operation> eventProbe = testKit.createTestProbe();
testKit
.system()
.eventStream()
.tell(new EventStream.Subscribe<>(InmemJournal.Operation.class, eventProbe.getRef()));
ActorRef<AccountEntity.Command> ref =
testKit.spawn(AccountEntity.create("4", PersistenceId.of("Account", "4")));
TestProbe<AccountEntity.OperationResult> probe =
testKit.createTestProbe(AccountEntity.OperationResult.class);
ref.tell(new AccountEntity.CreateAccount(probe.getRef()));
assertEquals(
AccountEntity.AccountCreated.INSTANCE,
eventProbe.expectMessageClass(InmemJournal.Write.class).event());
ref.tell(new AccountEntity.Deposit(BigDecimal.valueOf(100), probe.getRef()));
assertEquals(
BigDecimal.valueOf(100),
((AccountEntity.Deposited) eventProbe.expectMessageClass(InmemJournal.Write.class).event())
.amount);
}
// #test
// #test-events
}
// #test

View file

@ -5,102 +5,76 @@
package docs.akka.cluster.sharding.typed
//#test
import java.util.UUID
import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit
import akka.persistence.typed.PersistenceId
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.persistence.typed.PersistenceId
import org.scalatest.BeforeAndAfterEach
import org.scalatest.wordspec.AnyWordSpecLike
//#test
//#test-events
import akka.persistence.journal.inmem.InmemJournal
import akka.actor.typed.eventstream.EventStream
//#test-events
import docs.akka.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity
object AccountExampleDocSpec {
val inmemConfig =
//#inmem-config
"""
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
akka.persistence.journal.inmem.test-serialization = on
"""
//#inmem-config
val snapshotConfig =
//#snapshot-store-config
s"""
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshot-${UUID.randomUUID().toString}"
"""
//#snapshot-store-config
}
//#test
class AccountExampleDocSpec extends ScalaTestWithActorTestKit(s"""
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshot-${UUID.randomUUID().toString}"
""") with AnyWordSpecLike with LogCapturing {
//#testkit
class AccountExampleDocSpec
extends ScalaTestWithActorTestKit(EventSourcedBehaviorTestKit.config)
//#testkit
with AnyWordSpecLike
with BeforeAndAfterEach
with LogCapturing {
private val eventSourcedTestKit =
EventSourcedBehaviorTestKit[AccountEntity.Command[_], AccountEntity.Event, AccountEntity.Account](
system,
AccountEntity("1", PersistenceId("Account", "1")))
override protected def beforeEach(): Unit = {
super.beforeEach()
eventSourcedTestKit.clear()
}
"Account" must {
"be created with zero balance" in {
val result = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_))
result.reply shouldBe AccountEntity.Confirmed
result.event shouldBe AccountEntity.AccountCreated
result.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 0
}
"handle Withdraw" in {
val probe = createTestProbe[AccountEntity.OperationResult]()
val ref = spawn(AccountEntity("1", PersistenceId("Account", "1")))
ref ! AccountEntity.CreateAccount(probe.ref)
probe.expectMessage(AccountEntity.Confirmed)
ref ! AccountEntity.Deposit(100, probe.ref)
probe.expectMessage(AccountEntity.Confirmed)
ref ! AccountEntity.Withdraw(10, probe.ref)
probe.expectMessage(AccountEntity.Confirmed)
eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_))
val result1 = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Deposit(100, _))
result1.reply shouldBe AccountEntity.Confirmed
result1.event shouldBe AccountEntity.Deposited(100)
result1.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 100
val result2 = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Withdraw(10, _))
result2.reply shouldBe AccountEntity.Confirmed
result2.event shouldBe AccountEntity.Withdrawn(10)
result2.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 90
}
"reject Withdraw overdraft" in {
val probe = createTestProbe[AccountEntity.OperationResult]()
val ref = spawn(AccountEntity("2", PersistenceId("Account", "2")))
ref ! AccountEntity.CreateAccount(probe.ref)
probe.expectMessage(AccountEntity.Confirmed)
ref ! AccountEntity.Deposit(100, probe.ref)
probe.expectMessage(AccountEntity.Confirmed)
ref ! AccountEntity.Withdraw(110, probe.ref)
probe.expectMessageType[AccountEntity.Rejected]
eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_))
eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Deposit(100, _))
val result = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Withdraw(110, _))
result.replyOfType[AccountEntity.Rejected]
result.hasNoEvents shouldBe true
}
"handle GetBalance" in {
val opProbe = createTestProbe[AccountEntity.OperationResult]()
val ref = spawn(AccountEntity("3", PersistenceId("Account", "3")))
ref ! AccountEntity.CreateAccount(opProbe.ref)
opProbe.expectMessage(AccountEntity.Confirmed)
ref ! AccountEntity.Deposit(100, opProbe.ref)
opProbe.expectMessage(AccountEntity.Confirmed)
eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_))
eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Deposit(100, _))
val getProbe = createTestProbe[AccountEntity.CurrentBalance]()
ref ! AccountEntity.GetBalance(getProbe.ref)
getProbe.expectMessage(AccountEntity.CurrentBalance(100))
val result = eventSourcedTestKit.runCommand[AccountEntity.CurrentBalance](AccountEntity.GetBalance(_))
result.reply.balance shouldBe 100
result.hasNoEvents shouldBe true
}
//#test
//#test-events
"store events" in {
val eventProbe = createTestProbe[InmemJournal.Operation]()
system.eventStream ! EventStream.Subscribe(eventProbe.ref)
val probe = createTestProbe[AccountEntity.OperationResult]()
val ref = spawn(AccountEntity("4", PersistenceId("Account", "4")))
ref ! AccountEntity.CreateAccount(probe.ref)
eventProbe.expectMessageType[InmemJournal.Write].event should ===(AccountEntity.AccountCreated)
ref ! AccountEntity.Deposit(100, probe.ref)
probe.expectMessage(AccountEntity.Confirmed)
eventProbe.expectMessageType[InmemJournal.Write].event should ===(AccountEntity.Deposited(100))
}
//#test-events
//#test
}
}
//#test

View file

@ -1,49 +1,38 @@
# Testing
## Dependency
## Module info
To use Akka Persistence and Actor TestKit, add the module to your project:
To use Akka Persistence TestKit, add the module to your project:
@@dependency[sbt,Maven,Gradle] {
group1=com.typesafe.akka
artifact1=akka-persistence-typed_$scala.binary_version$
version1=$akka.version$
group2=com.typesafe.akka
artifact2=akka-actor-testkit-typed_$scala.binary_version$
artifact2=akka-persistence-testkit_$scala.binary_version$
version2=$akka.version$
scope2=test
}
@@project-info{ projectId="akka-persistence-testkit" }
## Unit testing
Unit testing of `EventSourcedBehavior` can be done with the @ref:[ActorTestKit](testing-async.md)
in the same way as other behaviors.
**Note!** The `EventSourcedBehaviorTestKit` is a new feature, api may have changes breaking source compatibility in future versions.
@ref:[Synchronous behavior testing](testing-sync.md) for `EventSourcedBehavior` is not supported yet, but
tracked in @github[issue #23712](#23712).
Unit testing of `EventSourcedBehavior` can be done with the @apidoc[EventSourcedBehaviorTestKit]. It supports running
one command at a time and you can assert that the synchronously returned result is as expected. The result contains the
events emitted by the command and the new state after applying the events. It also has support for verifying the reply
to a command.
You need to configure a journal, and the in-memory journal is sufficient for unit testing. To enable the
in-memory journal you need to pass the following configuration to the @scala[`ScalaTestWithActorTestKit`]@java[`TestKitJunitResource`].
You need to configure the `ActorSystem` with the `EventSourcedBehaviorTestKit.config`. The configuration enables
the in-memory journal and snapshot storage.
Scala
: @@snip [AccountExampleDocSpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleDocSpec.scala) { #inmem-config }
: @@snip [AccountExampleDocSpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleDocSpec.scala) { #testkit }
Java
: @@snip [AccountExampleDocTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java) { #inmem-config }
The `test-serialization = on` configuration of the `InmemJournal` will verify that persisted events can be serialized and deserialized.
Optionally you can also configure a snapshot store. To enable the file based snapshot store you need to pass the
following configuration to the @scala[`ScalaTestWithActorTestKit`]@java[`TestKitJunitResource`].
Scala
: @@snip [AccountExampleDocSpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleDocSpec.scala) { #snapshot-store-config }
Java
: @@snip [AccountExampleDocTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java) { #snapshot-store-config }
Then you can `spawn` the `EventSourcedBehavior` and verify the outcome of sending commands to the actor using
the facilities of the @ref:[ActorTestKit](testing-async.md).
: @@snip [AccountExampleDocTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java) { #testkit }
A full test for the `AccountEntity`, which is shown in the @ref:[Persistence Style Guide](persistence-style.md), may look like this:
@ -53,21 +42,19 @@ Scala
Java
: @@snip [AccountExampleDocTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java) { #test }
Note that each test case is using a different `PersistenceId` to not interfere with each other.
Serialization of commands, events and state are verified automatically. The serialization checks can be
customized with the `SerializationSettings` when creating the `EventSourcedBehaviorTestKit`. By default,
the serialization roundtrip is checked but the equality of the result of the serialization is not checked.
`equals` must be implemented @scala[(or using `case class`)] in the commands, events and state if `verifyEquality`
is enabled.
The @apidoc[akka.persistence.journal.inmem.InmemJournal$] publishes `Write` and `Delete` operations to the
`eventStream`, which makes it possible to verify that the expected events have been emitted and stored by the
`EventSourcedBehavior`. You can subscribe to to the `eventStream` with a `TestProbe` like this:
Scala
: @@snip [AccountExampleDocSpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleDocSpec.scala) { #test-events }
Java
: @@snip [AccountExampleDocTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java) { #test-events }
To test recovery the `restart` method of the `EventSourcedBehaviorTestKit` can be used. It will restart the
behavior, which will then recover from stored snapshot and events from previous commands. It's also possible
to populate the storage with events or simulate failures by using the underlying @apidoc[PersistenceTestKit].
## Persistence TestKit
**Note!** The testkit is a new feature, api may have changes breaking source compatibility in future versions.
**Note!** The `PersistenceTestKit` is a new feature, api may have changes breaking source compatibility in future versions.
Persistence testkit allows to check events saved in a storage, emulate storage operations and exceptions.
To use the testkit you need to add the following dependency in your project:
@ -187,8 +174,10 @@ to the @ref:[reference configuration](../general/configuration-reference.md#conf
## Integration testing
The in-memory journal and file based snapshot store can be used also for integration style testing of a single
`ActorSystem`, for example when using Cluster Sharding with a single Cluster node.
`EventSourcedBehavior` actors can be tested with the @ref:[ActorTestKit](testing-async.md) together with
other actors. The in-memory journal and snapshot storage from the @ref:[Persistence TestKit](#persistence-testkit)
can be used also for integration style testing of a single `ActorSystem`, for example when using Cluster Sharding
with a single Cluster node.
For tests that involve more than one Cluster node you have to use another journal and snapshot store.
While it's possible to use the @ref:[Persistence Plugin Proxy](../persistence-plugins.md#persistence-plugin-proxy)

View file

@ -0,0 +1,187 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.internal
import scala.collection.immutable
import scala.concurrent.duration._
import scala.reflect.ClassTag
import scala.util.control.NonFatal
import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.testkit.typed.scaladsl.SerializationTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.annotation.InternalApi
import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit
import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit.CommandResult
import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit.CommandResultWithReply
import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit.RestartResult
import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit.SerializationSettings
import akka.persistence.testkit.scaladsl.PersistenceTestKit
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.internal.EventSourcedBehaviorImpl
/**
* INTERNAL API
*/
@InternalApi private[akka] object EventSourcedBehaviorTestKitImpl {
final case class CommandResultImpl[Command, Event, State, Reply](
command: Command,
events: immutable.Seq[Event],
state: State,
replyOption: Option[Reply])
extends CommandResultWithReply[Command, Event, State, Reply] {
override def hasNoEvents: Boolean = events.isEmpty
override def event: Event = {
if (events.nonEmpty) events.head else throw new AssertionError("No events")
}
override def eventOfType[E <: Event: ClassTag]: E =
ofType(event, "event")
override def stateOfType[S <: State: ClassTag]: S =
ofType(state, "state")
override def reply: Reply = replyOption.getOrElse(throw new AssertionError("No reply"))
override def replyOfType[R <: Reply: ClassTag]: R =
ofType(reply, "reply")
// cast with nice error message
private def ofType[A: ClassTag](obj: Any, errorParam: String): A = {
obj match {
case a: A => a
case other =>
val expectedClass = implicitly[ClassTag[A]].runtimeClass
throw new AssertionError(
s"Expected $errorParam class [${expectedClass.getName}], " +
s"but was [${other.getClass.getName}]")
}
}
}
final case class RestartResultImpl[State](state: State) extends RestartResult[State]
}
/**
* INTERNAL API
*/
@InternalApi private[akka] class EventSourcedBehaviorTestKitImpl[Command, Event, State](
actorTestKit: ActorTestKit,
behavior: Behavior[Command],
serializationSettings: SerializationSettings)
extends EventSourcedBehaviorTestKit[Command, Event, State] {
import EventSourcedBehaviorTestKitImpl._
private def system: ActorSystem[_] = actorTestKit.system
override val persistenceTestKit: PersistenceTestKit = PersistenceTestKit(system)
private val probe = actorTestKit.createTestProbe[Any]()
private val stateProbe = actorTestKit.createTestProbe[State]()
private var actor: ActorRef[Command] = actorTestKit.spawn(behavior)
private def internalActor = actor.unsafeUpcast[Any]
private val persistenceId: PersistenceId = {
internalActor ! EventSourcedBehaviorImpl.GetPersistenceId(probe.ref)
try {
probe.expectMessageType[PersistenceId]
} catch {
case NonFatal(_) =>
throw new IllegalArgumentException("Only EventSourcedBehavior, or nested EventSourcedBehavior allowed.")
}
}
private val serializationTestKit = new SerializationTestKit(system)
private var emptyStateVerified = false
persistenceTestKit.clearByPersistenceId(persistenceId.id)
override def runCommand(command: Command): CommandResult[Command, Event, State] = {
if (serializationSettings.enabled && serializationSettings.verifyCommands)
verifySerializationAndThrow(command, "Command")
if (serializationSettings.enabled && !emptyStateVerified) {
internalActor ! EventSourcedBehaviorImpl.GetState(stateProbe.ref)
val emptyState = stateProbe.receiveMessage()
verifySerializationAndThrow(emptyState, "Empty State")
emptyStateVerified = true
}
// FIXME we can expand the api of persistenceTestKit to read from storage from a seqNr instead
val oldEvents =
persistenceTestKit.persistedInStorage(persistenceId.id).map(_.asInstanceOf[Event])
actor ! command
internalActor ! EventSourcedBehaviorImpl.GetState(stateProbe.ref)
val newState = stateProbe.receiveMessage()
val newEvents =
persistenceTestKit.persistedInStorage(persistenceId.id).map(_.asInstanceOf[Event]).drop(oldEvents.size)
if (serializationSettings.enabled) {
if (serializationSettings.verifyEvents) {
newEvents.foreach(verifySerializationAndThrow(_, "Event"))
}
if (serializationSettings.verifyState)
verifySerializationAndThrow(newState, "State")
}
CommandResultImpl[Command, Event, State, Nothing](command, newEvents, newState, None)
}
override def runCommand[R](creator: ActorRef[R] => Command): CommandResultWithReply[Command, Event, State, R] = {
val replyProbe = actorTestKit.createTestProbe[R]()
val command = creator(replyProbe.ref)
val result = runCommand(command)
val reply = try {
replyProbe.receiveMessage(Duration.Zero)
} catch {
case NonFatal(_) =>
throw new AssertionError(s"Missing expected reply for command [$command].")
} finally {
replyProbe.stop()
}
if (serializationSettings.enabled && serializationSettings.verifyCommands)
verifySerializationAndThrow(reply, "Reply")
CommandResultImpl[Command, Event, State, R](result.command, result.events, result.state, Some(reply))
}
override def restart(): RestartResult[State] = {
actorTestKit.stop(actor)
actor = actorTestKit.spawn(behavior)
internalActor ! EventSourcedBehaviorImpl.GetState(stateProbe.ref)
try {
val state = stateProbe.receiveMessage()
RestartResultImpl(state)
} catch {
case NonFatal(_) =>
throw new IllegalStateException("Could not restart. Maybe exception from event handler. See logs.")
}
}
override def clear(): Unit = {
persistenceTestKit.clearByPersistenceId(persistenceId.id)
restart()
}
private def verifySerializationAndThrow(obj: Any, errorMessagePrefix: String): Unit = {
try {
serializationTestKit.verifySerialization(obj, serializationSettings.verifyEquality)
} catch {
case NonFatal(exc) =>
throw new IllegalArgumentException(s"$errorMessagePrefix [$obj] isn't serializable.", exc)
}
}
}

View file

@ -0,0 +1,246 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.javadsl
import java.util.function.{ Function => JFunction }
import java.util.{ List => JList }
import scala.reflect.ClassTag
import com.typesafe.config.Config
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
import akka.persistence.testkit.scaladsl
import akka.util.ccompat.JavaConverters._
/**
* Testing of [[akka.persistence.typed.javadsl.EventSourcedBehavior]] implementations.
* It supports running one command at a time and you can assert that the synchronously returned result is as expected.
* The result contains the events emitted by the command and the new state after applying the events.
* It also has support for verifying the reply to a command.
*
* Serialization of commands, events and state are verified automatically.
*/
@ApiMayChange
object EventSourcedBehaviorTestKit {
/**
* The configuration to be included in the configuration of the `ActorSystem`. Typically used as
* constructor parameter to `TestKitJunitResource`. The configuration enables the in-memory
* journal and snapshot storage.
*/
val config: Config = scaladsl.EventSourcedBehaviorTestKit.config
val enabledSerializationSettings: SerializationSettings = new SerializationSettings(
enabled = true,
verifyEquality = false,
verifyCommands = true,
verifyEvents = true,
verifyState = true)
val disabledSerializationSettings: SerializationSettings = new SerializationSettings(
enabled = false,
verifyEquality = false,
verifyCommands = false,
verifyEvents = false,
verifyState = false)
/**
* Customization of which serialization checks that are performed.
* `equals` must be implemented (or using `case class`) when `verifyEquality` is enabled.
*/
final class SerializationSettings(
val enabled: Boolean,
val verifyEquality: Boolean,
val verifyCommands: Boolean,
val verifyEvents: Boolean,
val verifyState: Boolean) {
def withEnabled(value: Boolean): SerializationSettings =
copy(enabled = value)
def withVerifyEquality(value: Boolean): SerializationSettings =
copy(verifyEquality = value)
def withVerifyCommands(value: Boolean): SerializationSettings =
copy(verifyCommands = value)
def withVerifyEvents(value: Boolean): SerializationSettings =
copy(verifyEvents = value)
def withVerifyState(value: Boolean): SerializationSettings =
copy(verifyState = value)
private def copy(
enabled: Boolean = this.enabled,
verifyEquality: Boolean = this.verifyEquality,
verifyCommands: Boolean = this.verifyCommands,
verifyEvents: Boolean = this.verifyEvents,
verifyState: Boolean = this.verifyState): SerializationSettings = {
new SerializationSettings(enabled, verifyEquality, verifyCommands, verifyEvents, verifyState)
}
}
/**
* Factory method to create a new EventSourcedBehaviorTestKit.
*/
def create[Command, Event, State](
system: ActorSystem[_],
behavior: Behavior[Command]): EventSourcedBehaviorTestKit[Command, Event, State] =
create(system, behavior, enabledSerializationSettings)
/**
* Factory method to create a new EventSourcedBehaviorTestKit with custom [[SerializationSettings]].
*
* Note that `equals` must be implemented in the commands, events and state if `verifyEquality` is enabled.
*/
def create[Command, Event, State](
system: ActorSystem[_],
behavior: Behavior[Command],
serializationSettings: SerializationSettings): EventSourcedBehaviorTestKit[Command, Event, State] = {
val scaladslSettings = new scaladsl.EventSourcedBehaviorTestKit.SerializationSettings(
enabled = serializationSettings.enabled,
verifyEquality = serializationSettings.verifyEquality,
verifyCommands = serializationSettings.verifyCommands,
verifyEvents = serializationSettings.verifyEvents,
verifyState = serializationSettings.verifyState)
new EventSourcedBehaviorTestKit(scaladsl.EventSourcedBehaviorTestKit(system, behavior, scaladslSettings))
}
/**
* The result of running a command.
*/
@DoNotInherit class CommandResult[Command, Event, State](
delegate: scaladsl.EventSourcedBehaviorTestKit.CommandResult[Command, Event, State]) {
/**
* The command that was run.
*/
def command: Command =
delegate.command
/**
* The events that were emitted by the command, and persisted.
* In many cases only one event is emitted and then it's more convenient to use [[CommandResult.event]]
* or [[CommandResult.eventOfType]].
*/
def events: JList[Event] =
delegate.events.asJava
/**
* `true` if no events were emitted by the command.
*/
def hasNoEvents: Boolean =
delegate.hasNoEvents
/**
* The first event. It will throw `AssertionError` if there is no event.
*/
def event: Event =
delegate.event
/**
* The first event as a given expected type. It will throw `AssertionError` if there is no event or
* if the event is of a different type.
*/
def eventOfType[E <: Event](eventClass: Class[E]): E =
delegate.eventOfType(ClassTag[E](eventClass))
/**
* The state after applying the events.
*/
def state: State =
delegate.state
/**
* The state as a given expected type. It will throw `AssertionError` if the state is of a different type.
*/
def stateOfType[S <: State](stateClass: Class[S]): S =
delegate.stateOfType(ClassTag[S](stateClass))
}
/**
* The result of running a command with a `ActorRef<R> replyTo`, i.e. the `runCommand` with a
* `Function<ActorRef<R>, Command>` parameter.
*/
final class CommandResultWithReply[Command, Event, State, Reply](
delegate: scaladsl.EventSourcedBehaviorTestKit.CommandResultWithReply[Command, Event, State, Reply])
extends CommandResult[Command, Event, State](delegate) {
/**
* The reply. It will throw `AssertionError` if there was no reply.
*/
def reply: Reply =
delegate.reply
/**
* The reply as a given expected type. It will throw `AssertionError` if there is no reply or
* if the reply is of a different type.
*/
def replyOfType[R <: Reply](replyClass: Class[R]): R =
delegate.replyOfType(ClassTag[R](replyClass))
}
/**
* The result of restarting the behavior.
*/
final class RestartResult[State](delegate: scaladsl.EventSourcedBehaviorTestKit.RestartResult[State]) {
/**
* The state after recovery.
*/
def state: State =
delegate.state
}
}
@ApiMayChange
final class EventSourcedBehaviorTestKit[Command, Event, State](
delegate: scaladsl.EventSourcedBehaviorTestKit[Command, Event, State]) {
import EventSourcedBehaviorTestKit._
private val _persistenceTestKit = new PersistenceTestKit(delegate.persistenceTestKit)
/**
* Run one command through the behavior. The returned result contains emitted events and the state
* after applying the events.
*/
def runCommand(command: Command): CommandResult[Command, Event, State] =
new CommandResult(delegate.runCommand(command))
/**
* Run one command with a `replyTo: ActorRef` through the behavior. The returned result contains emitted events,
* the state after applying the events, and the reply.
*/
def runCommand[R](creator: JFunction[ActorRef[R], Command]): CommandResultWithReply[Command, Event, State, R] =
new CommandResultWithReply(delegate.runCommand(replyTo => creator.apply(replyTo)))
/**
* Restart the behavior, which will then recover from stored snapshot and events. Can be used for testing
* that the recovery is correct.
*/
def restart(): RestartResult[State] =
new RestartResult(delegate.restart())
/**
* Clears the in-memory journal and snapshot storage and restarts the behavior.
*/
def clear(): Unit =
delegate.clear()
/**
* The underlying `PersistenceTestKit` for the in-memory journal and snapshot storage.
* Can be useful for advanced testing scenarios, such as simulating failures or
* populating the journal with events that are used for replay.
*/
def persistenceTestKit: PersistenceTestKit =
_persistenceTestKit
}

View file

@ -19,9 +19,9 @@ import akka.util.ccompat.JavaConverters._
* Class for testing persisted events in persistent actors.
*/
@ApiMayChange
class PersistenceTestKit(system: ActorSystem) {
class PersistenceTestKit(scalaTestkit: ScalaTestKit) {
private val scalaTestkit = new ScalaTestKit(system)
def this(system: ActorSystem) = this(new ScalaTestKit(system))
/**
* Check that nothing has been saved in the storage.

View file

@ -0,0 +1,224 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.scaladsl
import scala.collection.immutable
import scala.reflect.ClassTag
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.internal.EventSourcedBehaviorTestKitImpl
/**
* Testing of [[akka.persistence.typed.scaladsl.EventSourcedBehavior]] implementations.
* It supports running one command at a time and you can assert that the synchronously returned result is as expected.
* The result contains the events emitted by the command and the new state after applying the events.
* It also has support for verifying the reply to a command.
*
* Serialization of commands, events and state are verified automatically.
*/
@ApiMayChange
object EventSourcedBehaviorTestKit {
/**
* The configuration to be included in the configuration of the `ActorSystem`. Typically used as
* constructor parameter to `ScalaTestWithActorTestKit`. The configuration enables the in-memory
* journal and snapshot storage.
*/
val config: Config = ConfigFactory.parseString("""
akka.persistence.testkit.events.serialize = off
""").withFallback(PersistenceTestKitPlugin.config)
object SerializationSettings {
val enabled: SerializationSettings = new SerializationSettings(
enabled = true,
verifyEquality = false,
verifyCommands = true,
verifyEvents = true,
verifyState = true)
val disabled: SerializationSettings = new SerializationSettings(
enabled = false,
verifyEquality = false,
verifyCommands = false,
verifyEvents = false,
verifyState = false)
}
/**
* Customization of which serialization checks that are performed.
* `equals` must be implemented (or using `case class`) when `verifyEquality` is enabled.
*/
final class SerializationSettings private[akka] (
val enabled: Boolean,
val verifyEquality: Boolean,
val verifyCommands: Boolean,
val verifyEvents: Boolean,
val verifyState: Boolean) {
def withEnabled(value: Boolean): SerializationSettings =
copy(enabled = value)
def withVerifyEquality(value: Boolean): SerializationSettings =
copy(verifyEquality = value)
def withVerifyCommands(value: Boolean): SerializationSettings =
copy(verifyCommands = value)
def withVerifyEvents(value: Boolean): SerializationSettings =
copy(verifyEvents = value)
def withVerifyState(value: Boolean): SerializationSettings =
copy(verifyState = value)
private def copy(
enabled: Boolean = this.enabled,
verifyEquality: Boolean = this.verifyEquality,
verifyCommands: Boolean = this.verifyCommands,
verifyEvents: Boolean = this.verifyEvents,
verifyState: Boolean = this.verifyState): SerializationSettings = {
new SerializationSettings(enabled, verifyEquality, verifyCommands, verifyEvents, verifyState)
}
}
/**
* Factory method to create a new EventSourcedBehaviorTestKit.
*/
def apply[Command, Event, State](
system: ActorSystem[_],
behavior: Behavior[Command]): EventSourcedBehaviorTestKit[Command, Event, State] =
apply(system, behavior, SerializationSettings.enabled)
/**
* Factory method to create a new EventSourcedBehaviorTestKit with custom [[SerializationSettings]].
*
* Note that `equals` must be implemented (or using `case class`) in the commands, events and state if
* `verifyEquality` is enabled.
*/
def apply[Command, Event, State](
system: ActorSystem[_],
behavior: Behavior[Command],
serializationSettings: SerializationSettings): EventSourcedBehaviorTestKit[Command, Event, State] =
new EventSourcedBehaviorTestKitImpl(ActorTestKit(system), behavior, serializationSettings)
/**
* The result of running a command.
*/
@DoNotInherit trait CommandResult[Command, Event, State] {
/**
* The command that was run.
*/
def command: Command
/**
* The events that were emitted by the command, and persisted.
* In many cases only one event is emitted and then it's more convenient to use [[CommandResult.event]]
* or [[CommandResult.eventOfType]].
*/
def events: immutable.Seq[Event]
/**
* `true` if no events were emitted by the command.
*/
def hasNoEvents: Boolean
/**
* The first event. It will throw `AssertionError` if there is no event.
*/
def event: Event
/**
* The first event as a given expected type. It will throw `AssertionError` if there is no event or
* if the event is of a different type.
*/
def eventOfType[E <: Event: ClassTag]: E
/**
* The state after applying the events.
*/
def state: State
/**
* The state as a given expected type. It will throw `AssertionError` if the state is of a different type.
*/
def stateOfType[S <: State: ClassTag]: S
}
/**
* The result of running a command with a `replyTo: ActorRef[R]`, i.e. the `runCommand` with a
* `ActorRef[R] => Command` parameter.
*/
@DoNotInherit trait CommandResultWithReply[Command, Event, State, Reply]
extends CommandResult[Command, Event, State] {
/**
* The reply. It will throw `AssertionError` if there was no reply.
*/
def reply: Reply
/**
* The reply as a given expected type. It will throw `AssertionError` if there is no reply or
* if the reply is of a different type.
*/
def replyOfType[R <: Reply: ClassTag]: R
}
/**
* The result of restarting the behavior.
*/
@DoNotInherit trait RestartResult[State] {
/**
* The state after recovery.
*/
def state: State
}
}
@ApiMayChange
@DoNotInherit trait EventSourcedBehaviorTestKit[Command, Event, State] {
import EventSourcedBehaviorTestKit._
/**
* Run one command through the behavior. The returned result contains emitted events and the state
* after applying the events.
*/
def runCommand(command: Command): CommandResult[Command, Event, State]
/**
* Run one command with a `replyTo: ActorRef[R]` through the behavior. The returned result contains emitted events,
* the state after applying the events, and the reply.
*/
def runCommand[R](creator: ActorRef[R] => Command): CommandResultWithReply[Command, Event, State, R]
/**
* Restart the behavior, which will then recover from stored snapshot and events. Can be used for testing
* that the recovery is correct.
*/
def restart(): RestartResult[State]
/**
* Clears the in-memory journal and snapshot storage and restarts the behavior.
*/
def clear(): Unit
/**
* The underlying `PersistenceTestKit` for the in-memory journal and snapshot storage.
* Can be useful for advanced testing scenarios, such as simulating failures or
* populating the journal with events that are used for replay.
*/
def persistenceTestKit: PersistenceTestKit
}

View file

@ -10,12 +10,19 @@ import scala.util.Try
import com.typesafe.config.Config
import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId }
import akka.actor.ActorSystem
import akka.actor.ClassicActorSystemProvider
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.typed.{ ActorSystem => TypedActorSystem }
import akka.annotation.ApiMayChange
import akka.persistence.{ Persistence, PersistentRepr, SnapshotMetadata }
import akka.persistence.Persistence
import akka.persistence.PersistentRepr
import akka.persistence.SnapshotMetadata
import akka.persistence.testkit._
import akka.persistence.testkit.internal.{ InMemStorageExtension, SnapshotStorageEmulatorExtension }
import akka.persistence.testkit.internal.InMemStorageExtension
import akka.persistence.testkit.internal.SnapshotStorageEmulatorExtension
import akka.testkit.TestProbe
private[testkit] trait CommonTestKitOps[S, P] extends ClearOps with PolicyOpsTestKit[P] {
@ -293,7 +300,7 @@ private[testkit] trait PersistenceTestKitOps[S, P]
/**
* Persist `snapshots` into storage in order.
*/
def persistForRecovery(persistenceId: String, snapshots: immutable.Seq[Any]): Unit
def persistForRecovery(persistenceId: String, events: immutable.Seq[Any]): Unit
/**
* Retrieve all snapshots saved in storage by persistence id.
@ -480,9 +487,9 @@ class PersistenceTestKit(system: ActorSystem)
override def failNextNDeletes(persistenceId: String, n: Int, cause: Throwable): Unit =
failNextNOpsCond((pid, op) => pid == persistenceId && op.isInstanceOf[DeleteEvents], n, cause)
def persistForRecovery(persistenceId: String, snapshots: immutable.Seq[Any]): Unit = {
storage.addAny(persistenceId, snapshots)
addToIndex(persistenceId, snapshots.size)
def persistForRecovery(persistenceId: String, events: immutable.Seq[Any]): Unit = {
storage.addAny(persistenceId, events)
addToIndex(persistenceId, events.size)
}
def persistedInStorage(persistenceId: String): immutable.Seq[Any] =
@ -494,9 +501,7 @@ class PersistenceTestKit(system: ActorSystem)
@ApiMayChange
object PersistenceTestKit {
def apply(system: ActorSystem): PersistenceTestKit = new PersistenceTestKit(system)
def apply(system: TypedActorSystem[_]): PersistenceTestKit = apply(system.classicSystem)
def apply(system: ClassicActorSystemProvider): PersistenceTestKit = new PersistenceTestKit(system.classicSystem)
object Settings extends ExtensionId[Settings] {

View file

@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- Silence initial setup logging from Logback -->
<statusListener class="ch.qos.logback.core.status.NopStatusListener" />
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{ISO8601} %-5level %logger %marker - %msg MDC: {%mdc}%n</pattern>
</encoder>
</appender>
<!--
Logging from tests are silenced by this appender. When there is a test failure
the captured logging events are flushed to the appenders defined for the
akka.actor.testkit.typed.internal.CapturingAppenderDelegate logger.
-->
<appender name="CapturingAppender" class="akka.actor.testkit.typed.internal.CapturingAppender" />
<!--
The appenders defined for this CapturingAppenderDelegate logger are used
when there is a test failure and all logging events from the test are
flushed to these appenders.
-->
<logger name="akka.actor.testkit.typed.internal.CapturingAppenderDelegate" >
<appender-ref ref="STDOUT"/>
</logger>
<root level="TRACE">
<appender-ref ref="CapturingAppender"/>
</root>
</configuration>

View file

@ -0,0 +1,302 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.scaladsl
import java.io.NotSerializableException
import org.scalatest.wordspec.AnyWordSpecLike
import akka.Done
import akka.actor.testkit.typed.TestException
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.LoggingTestKit
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKitSpec.TestCounter.NotSerializableState
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.internal.JournalFailureException
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.serialization.DisabledJavaSerializer
import akka.serialization.jackson.CborSerializable
import akka.util.unused
object EventSourcedBehaviorTestKitSpec {
object TestCounter {
sealed trait Command
case object Increment extends Command with CborSerializable
final case class IncrementWithConfirmation(replyTo: ActorRef[Done]) extends Command with CborSerializable
case class IncrementSeveral(n: Int) extends Command with CborSerializable
final case class GetValue(replyTo: ActorRef[State]) extends Command with CborSerializable
sealed trait Event
final case class Incremented(delta: Int) extends Event with CborSerializable
sealed trait State
final case class RealState(value: Int, history: Vector[Int]) extends State with CborSerializable
case object IncrementWithNotSerializableEvent extends Command with CborSerializable
final case class NotSerializableEvent(delta: Int) extends Event
case object IncrementWithNotSerializableState extends Command with CborSerializable
final case class IncrementedWithNotSerializableState(delta: Int) extends Event with CborSerializable
final case class NotSerializableState(value: Int, history: Vector[Int]) extends State
case object NotSerializableCommand extends Command
final case class IncrementWithNotSerializableReply(replyTo: ActorRef[NotSerializableReply.type])
extends Command
with CborSerializable
object NotSerializableReply
def apply(persistenceId: PersistenceId): Behavior[Command] =
apply(persistenceId, RealState(0, Vector.empty))
def apply(persistenceId: PersistenceId, emptyState: State): Behavior[Command] =
Behaviors.setup(ctx => counter(ctx, persistenceId, emptyState))
private def counter(
@unused ctx: ActorContext[Command],
persistenceId: PersistenceId,
emptyState: State): EventSourcedBehavior[Command, Event, State] = {
EventSourcedBehavior.withEnforcedReplies[Command, Event, State](
persistenceId,
emptyState,
commandHandler = (state, command) =>
command match {
case Increment =>
Effect.persist(Incremented(1)).thenNoReply()
case IncrementWithConfirmation(replyTo) =>
Effect.persist(Incremented(1)).thenReply(replyTo)(_ => Done)
case IncrementSeveral(n: Int) =>
val events = (1 to n).map(_ => Incremented(1))
Effect.persist(events).thenNoReply()
case IncrementWithNotSerializableEvent =>
Effect.persist(NotSerializableEvent(1)).thenNoReply()
case IncrementWithNotSerializableState =>
Effect.persist(IncrementedWithNotSerializableState(1)).thenNoReply()
case IncrementWithNotSerializableReply(replyTo) =>
Effect.persist(Incremented(1)).thenReply(replyTo)(_ => NotSerializableReply)
case NotSerializableCommand =>
Effect.noReply
case GetValue(replyTo) =>
Effect.reply(replyTo)(state)
},
eventHandler = {
case (RealState(value, history), Incremented(delta)) =>
if (delta <= 0)
throw new IllegalStateException("Delta must be positive")
RealState(value + delta, history :+ value)
case (RealState(value, history), NotSerializableEvent(delta)) =>
RealState(value + delta, history :+ value)
case (RealState(value, history), IncrementedWithNotSerializableState(delta)) =>
NotSerializableState(value + delta, history :+ value)
case (state: NotSerializableState, _) =>
throw new IllegalStateException(state.toString)
})
}
}
}
class EventSourcedBehaviorTestKitSpec
extends ScalaTestWithActorTestKit(EventSourcedBehaviorTestKit.config)
with AnyWordSpecLike
with LogCapturing {
import EventSourcedBehaviorTestKitSpec._
private val persistenceId = PersistenceId.ofUniqueId("test")
private val behavior = TestCounter(persistenceId)
private def createTestKit() = {
EventSourcedBehaviorTestKit[TestCounter.Command, TestCounter.Event, TestCounter.State](system, behavior)
}
"EventSourcedBehaviorTestKit" must {
"run commands" in {
val eventSourcedTestKit = createTestKit()
val result1 = eventSourcedTestKit.runCommand(TestCounter.Increment)
result1.event should ===(TestCounter.Incremented(1))
result1.state should ===(TestCounter.RealState(1, Vector(0)))
val result2 = eventSourcedTestKit.runCommand(TestCounter.Increment)
result2.event should ===(TestCounter.Incremented(1))
result2.state should ===(TestCounter.RealState(2, Vector(0, 1)))
result2.eventOfType[TestCounter.Incremented].delta should ===(1)
intercept[AssertionError] {
// wrong event type
result2.eventOfType[TestCounter.NotSerializableEvent].delta should ===(1)
}
}
"run command emitting several events" in {
val eventSourcedTestKit = createTestKit()
val result = eventSourcedTestKit.runCommand(TestCounter.IncrementSeveral(3))
result.events should ===(List(TestCounter.Incremented(1), TestCounter.Incremented(1), TestCounter.Incremented(1)))
result.state should ===(TestCounter.RealState(3, Vector(0, 1, 2)))
}
"run commands with reply" in {
val eventSourcedTestKit = createTestKit()
val result1 = eventSourcedTestKit.runCommand[Done](TestCounter.IncrementWithConfirmation(_))
result1.event should ===(TestCounter.Incremented(1))
result1.state should ===(TestCounter.RealState(1, Vector(0)))
result1.reply should ===(Done)
val result2 = eventSourcedTestKit.runCommand[Done](TestCounter.IncrementWithConfirmation(_))
result2.event should ===(TestCounter.Incremented(1))
result2.state should ===(TestCounter.RealState(2, Vector(0, 1)))
result2.reply should ===(Done)
}
"detect missing reply" in {
val eventSourcedTestKit = createTestKit()
intercept[AssertionError] {
eventSourcedTestKit.runCommand[Done](_ => TestCounter.Increment)
}.getMessage should include("Missing expected reply")
}
"run command with reply that is not emitting events" in {
val eventSourcedTestKit = createTestKit()
eventSourcedTestKit.runCommand(TestCounter.Increment)
val result = eventSourcedTestKit.runCommand[TestCounter.State](TestCounter.GetValue(_))
result.hasNoEvents should ===(true)
intercept[AssertionError] {
result.event
}
result.state should ===(TestCounter.RealState(1, Vector(0)))
}
"detect non-serializable events" in {
val eventSourcedTestKit = createTestKit()
val exc = intercept[IllegalArgumentException] {
eventSourcedTestKit.runCommand(TestCounter.IncrementWithNotSerializableEvent)
}
(exc.getMessage should startWith).regex("Event.*isn't serializable")
exc.getCause.getClass should ===(classOf[DisabledJavaSerializer.JavaSerializationException])
}
"detect non-serializable state" in {
val eventSourcedTestKit = createTestKit()
val exc = intercept[IllegalArgumentException] {
eventSourcedTestKit.runCommand(TestCounter.IncrementWithNotSerializableState)
}
(exc.getMessage should include).regex("State.*isn't serializable")
exc.getCause.getClass should ===(classOf[DisabledJavaSerializer.JavaSerializationException])
}
"detect non-serializable empty state" in {
val eventSourcedTestKit =
EventSourcedBehaviorTestKit[TestCounter.Command, TestCounter.Event, TestCounter.State](
system,
TestCounter(persistenceId, NotSerializableState(0, Vector.empty)))
val exc = intercept[IllegalArgumentException] {
eventSourcedTestKit.runCommand(TestCounter.Increment)
}
(exc.getMessage should include).regex("Empty State.*isn't serializable")
exc.getCause.getClass should ===(classOf[DisabledJavaSerializer.JavaSerializationException])
}
"detect non-serializable command" in {
val eventSourcedTestKit = createTestKit()
val exc = intercept[IllegalArgumentException] {
eventSourcedTestKit.runCommand(TestCounter.NotSerializableCommand)
}
(exc.getMessage should include).regex("Command.*isn't serializable")
exc.getCause.getClass should ===(classOf[DisabledJavaSerializer.JavaSerializationException])
}
"detect non-serializable reply" in {
val eventSourcedTestKit = createTestKit()
val exc = intercept[IllegalArgumentException] {
eventSourcedTestKit.runCommand(TestCounter.IncrementWithNotSerializableReply(_))
}
(exc.getMessage should include).regex("Reply.*isn't serializable")
exc.getCause.getClass should ===(classOf[NotSerializableException])
}
"support test of replay" in {
val eventSourcedTestKit = createTestKit()
eventSourcedTestKit.runCommand(TestCounter.Increment)
eventSourcedTestKit.runCommand(TestCounter.Increment)
val expectedState = TestCounter.RealState(3, Vector(0, 1, 2))
eventSourcedTestKit.runCommand(TestCounter.Increment).state should ===(expectedState)
}
"support test of replay from stored events" in {
val eventSourcedTestKit = createTestKit()
eventSourcedTestKit.persistenceTestKit
.persistForRecovery(persistenceId.id, List(TestCounter.Incremented(1), TestCounter.Incremented(1)))
eventSourcedTestKit.restart().state should ===(TestCounter.RealState(2, Vector(0, 1)))
}
"support test of invalid events" in {
val eventSourcedTestKit = createTestKit()
eventSourcedTestKit.persistenceTestKit
.persistForRecovery(persistenceId.id, List(TestCounter.Incremented(1), TestCounter.Incremented(-1)))
intercept[IllegalStateException] {
eventSourcedTestKit.restart()
}
}
"only allow EventSourcedBehavior" in {
intercept[IllegalArgumentException] {
EventSourcedBehaviorTestKit[TestCounter.Command, TestCounter.Event, TestCounter.State](
system,
Behaviors.empty[TestCounter.Command])
}
}
"support test of failures" in {
val eventSourcedTestKit = createTestKit()
eventSourcedTestKit.runCommand(TestCounter.Increment)
eventSourcedTestKit.runCommand(TestCounter.Increment)
eventSourcedTestKit.persistenceTestKit.failNextPersisted(persistenceId.id, TestException("DB err"))
LoggingTestKit.error[JournalFailureException].expect {
intercept[AssertionError] {
eventSourcedTestKit.runCommand(TestCounter.Increment)
}
}
eventSourcedTestKit.restart().state should ===(TestCounter.RealState(2, Vector(0, 1)))
eventSourcedTestKit.runCommand(TestCounter.Increment).state should ===(TestCounter.RealState(3, Vector(0, 1, 2)))
}
"have possibility to clear" in {
val eventSourcedTestKit = createTestKit()
eventSourcedTestKit.runCommand(TestCounter.Increment)
eventSourcedTestKit.runCommand(TestCounter.Increment).state should ===(TestCounter.RealState(2, Vector(0, 1)))
eventSourcedTestKit.clear()
eventSourcedTestKit.runCommand(TestCounter.Increment).state should ===(TestCounter.RealState(1, Vector(0)))
}
}
}

View file

@ -0,0 +1,6 @@
# 23712 Internals for EventSourcedBehaviorTestKit
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.typed.internal.RequestingRecoveryPermit.stashInternal")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.typed.internal.ReplayingSnapshot.stashInternal")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.typed.internal.ReplayingEvents.stashInternal")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.typed.internal.StashManagement.stashInternal")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.typed.internal.Running.stashInternal")

View file

@ -8,6 +8,7 @@ import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.typed
import akka.actor.typed.ActorRef
import akka.actor.typed.BackoffSupervisorStrategy
import akka.actor.typed.Behavior
import akka.actor.typed.BehaviorInterceptor
@ -57,6 +58,17 @@ private[akka] object EventSourcedBehaviorImpl {
}
final case class WriterIdentity(instanceId: Int, writerUuid: String)
/**
* Used by EventSourcedBehaviorTestKit to retrieve the `persistenceId`.
*/
final case class GetPersistenceId(replyTo: ActorRef[PersistenceId]) extends Signal
/**
* Used by EventSourcedBehaviorTestKit to retrieve the state.
* Can't be a Signal because those are not stashed.
*/
final case class GetState[State](replyTo: ActorRef[State]) extends InternalProtocol
}
@InternalApi
@ -113,6 +125,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
ctx.log.debug("Events successfully deleted to sequence number [{}].", toSequenceNr)
case (_, DeleteEventsFailed(toSequenceNr, failure)) =>
ctx.log.warn2("Failed to delete events to sequence number [{}] due to: {}", toSequenceNr, failure.getMessage)
case (_, EventSourcedBehaviorImpl.GetPersistenceId(replyTo)) => replyTo ! persistenceId
}
// do this once, even if the actor is restarted

View file

@ -20,6 +20,7 @@ import akka.persistence.typed.EventsSeq
import akka.persistence.typed.RecoveryCompleted
import akka.persistence.typed.RecoveryFailed
import akka.persistence.typed.SingleEventSeq
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetState
import akka.persistence.typed.internal.ReplayingEvents.ReplayingState
import akka.persistence.typed.internal.Running.WithSeqNrAccessible
import akka.util.OptionVal
@ -87,11 +88,12 @@ private[akka] final class ReplayingEvents[C, E, S](
override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = {
msg match {
case JournalResponse(r) => onJournalResponse(r)
case SnapshotterResponse(r) => onSnapshotterResponse(r)
case RecoveryTickEvent(snap) => onRecoveryTick(snap)
case cmd: IncomingCommand[C] => onCommand(cmd)
case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit
case JournalResponse(r) => onJournalResponse(r)
case SnapshotterResponse(r) => onSnapshotterResponse(r)
case RecoveryTickEvent(snap) => onRecoveryTick(snap)
case cmd: IncomingCommand[C] => onCommand(cmd)
case get: GetState[S @unchecked] => stashInternal(get)
case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit
}
}
@ -162,7 +164,6 @@ private[akka] final class ReplayingEvents[C, E, S](
Behaviors.unhandled
} else {
stashInternal(cmd)
Behaviors.same
}
}

View file

@ -12,6 +12,7 @@ import akka.persistence._
import akka.persistence.SnapshotProtocol.LoadSnapshotFailed
import akka.persistence.SnapshotProtocol.LoadSnapshotResult
import akka.persistence.typed.RecoveryFailed
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetState
import akka.util.unused
/**
@ -66,7 +67,8 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
Behaviors.unhandled
} else
onCommand(cmd)
case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit
case get: GetState[S @unchecked] => stashInternal(get)
case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit
}
.receiveSignal(returnPermitOnStop.orElse {
case (_, PoisonPill) =>
@ -118,7 +120,6 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = {
// during recovery, stash all incoming commands
stashInternal(cmd)
Behaviors.same
}
def onJournalResponse(response: JournalProtocol.Response): Behavior[InternalProtocol] = {

View file

@ -53,7 +53,6 @@ private[akka] class RequestingRecoveryPermit[C, E, S](override val setup: Behavi
Behaviors.unhandled
} else {
stashInternal(other)
Behaviors.same
}
}

View file

@ -36,6 +36,7 @@ import akka.persistence.typed.SnapshotCompleted
import akka.persistence.typed.SnapshotFailed
import akka.persistence.typed.SnapshotMetadata
import akka.persistence.typed.SnapshotSelectionCriteria
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetState
import akka.persistence.typed.internal.Running.WithSeqNrAccessible
import akka.persistence.typed.scaladsl.Effect
import akka.util.unused
@ -104,6 +105,7 @@ private[akka] object Running {
case IncomingCommand(c: C @unchecked) => onCommand(state, c)
case JournalResponse(r) => onDeleteEventsJournalResponse(r, state.state)
case SnapshotterResponse(r) => onDeleteSnapshotResponse(r, state.state)
case get: GetState[S @unchecked] => onGetState(get)
case _ => Behaviors.unhandled
}
@ -121,6 +123,12 @@ private[akka] object Running {
applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast?
}
// Used by EventSourcedBehaviorTestKit to retrieve the state.
def onGetState(get: GetState[S]): Behavior[InternalProtocol] = {
get.replyTo ! state.state
this
}
@tailrec def applyEffects(
msg: Any,
state: RunningState[S],
@ -236,6 +244,7 @@ private[akka] object Running {
msg match {
case JournalResponse(r) => onJournalResponse(r)
case in: IncomingCommand[C @unchecked] => onCommand(in)
case get: GetState[S @unchecked] => stashInternal(get)
case SnapshotterResponse(r) => onDeleteSnapshotResponse(r, visibleState.state)
case RecoveryTickEvent(_) => Behaviors.unhandled
case RecoveryPermitGranted => Behaviors.unhandled
@ -249,7 +258,6 @@ private[akka] object Running {
Behaviors.unhandled
} else {
stashInternal(cmd)
this
}
}
@ -348,7 +356,6 @@ private[akka] object Running {
Behaviors.unhandled
} else {
stashInternal(cmd)
Behaviors.same
}
}
@ -406,6 +413,8 @@ private[akka] object Running {
case _ =>
onDeleteSnapshotResponse(response, state.state)
}
case get: GetState[S @unchecked] =>
stashInternal(get)
case _ =>
Behaviors.unhandled
}

View file

@ -7,6 +7,7 @@ package akka.persistence.typed.internal
import akka.actor.Dropped
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.LoggerOps
import akka.actor.typed.scaladsl.StashBuffer
import akka.actor.typed.scaladsl.StashOverflowException
@ -29,8 +30,10 @@ private[akka] trait StashManagement[C, E, S] {
/**
* Stash a command to the internal stash buffer, which is used while waiting for persist to be completed.
*/
protected def stashInternal(msg: InternalProtocol): Unit =
protected def stashInternal(msg: InternalProtocol): Behavior[InternalProtocol] = {
stash(msg, stashState.internalStashBuffer)
Behaviors.same
}
/**
* Stash a command to the user stash buffer, which is used when `Stash` effect is used.

View file

@ -281,6 +281,7 @@ lazy val persistenceTestkit = akkaModule("akka-persistence-testkit")
.dependsOn(
persistenceTyped % "compile->compile;provided->provided;test->test",
testkit % "compile->compile;test->test",
actorTestkitTyped,
persistenceTck % "test")
.settings(Dependencies.persistenceTestKit)
.settings(AutomaticModuleName.settings("akka.persistence.testkit"))
@ -450,6 +451,7 @@ lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed")
actorTestkitTyped % "test->test",
actorTypedTests % "test->test",
persistenceTyped % "test->test",
persistenceTestkit % "test->test",
remote % "compile->CompileJdk9;test->test",
remoteTests % "test->test",
remoteTests % "test->test;multi-jvm->multi-jvm",

View file

@ -243,7 +243,7 @@ object Dependencies {
Provided.levelDB,
Provided.levelDBNative)
val persistenceTestKit = l ++= Seq(Test.scalatest)
val persistenceTestKit = l ++= Seq(Test.scalatest, Test.logback)
val persistenceShared = l ++= Seq(Provided.levelDB, Provided.levelDBNative)

View file

@ -407,6 +407,27 @@ project-info {
}
]
}
akka-persistence-testkit: ${project-info.shared-info} {
title: "Akka Persistence Testkit"
jpms-name: "akka.persistence.testkit"
levels: [
{
readiness: Incubating
since: "2020-04-30"
since-version: "2.6.5"
}
]
api-docs: [
{
url: ${project-info.scaladoc}"persistence/testkit/scaladsl/index.html"
text: "API (Scaladoc)"
}
{
url: ${project-info.javadoc}"?akka/persistence/testkit/javadsl/package-summary.html"
text: "API (Javadoc)"
}
]
}
akka-remote: ${project-info.shared-info} {
title: "Akka Remoting"
jpms-name: "akka.remote"