From ef797383732a0374179316688e54575f24faa3cd Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 29 Apr 2020 22:06:42 +0200 Subject: [PATCH] Synchronous TestKit for EventSourcedBehavior, #23712 (#28952) * using real EventSourcedBehaviorImpl * using new inmem journal (PersistenceTestKit) * advantages compared to a "fake" driver * no difference in implementation details from real thing * no limitations * less maintance * added internal messsages to EventSourcedBehaviorImpl to be able to grab state and persistenceId * GetState as InternalProtocol instead of Signal so that it is stashed * serialization checks, using SerializationTestKit * better testKitGuardian naming to allow multiple PersistenceTestKit * support testing of restart * support failure testing by using PersistenceTestKit * update doc sample * apidoc, reference docs, and javadsl --- .../testkit/typed/scaladsl/ActorTestKit.scala | 10 +- .../sharding/typed/AccountExampleDocTest.java | 190 ++++++----- .../typed/AccountExampleDocSpec.scala | 124 +++---- .../main/paradox/typed/persistence-testing.md | 65 ++-- .../EventSourcedBehaviorTestKitImpl.scala | 187 +++++++++++ .../javadsl/EventSourcedBehaviorTestKit.scala | 246 ++++++++++++++ .../testkit/javadsl/PersistenceTestKit.scala | 4 +- .../EventSourcedBehaviorTestKit.scala | 224 +++++++++++++ .../testkit/scaladsl/PersistenceTestKit.scala | 25 +- .../src/test/resources/logback-test.xml | 31 ++ .../EventSourcedBehaviorTestKitSpec.scala | 302 ++++++++++++++++++ .../issue-23712-testkit.excludes | 6 + .../internal/EventSourcedBehaviorImpl.scala | 13 + .../typed/internal/ReplayingEvents.scala | 13 +- .../typed/internal/ReplayingSnapshot.scala | 5 +- .../internal/RequestingRecoveryPermit.scala | 1 - .../persistence/typed/internal/Running.scala | 13 +- .../typed/internal/StashManagement.scala | 5 +- build.sbt | 2 + project/Dependencies.scala | 2 +- project/project-info.conf | 21 ++ 21 files changed, 1254 insertions(+), 235 deletions(-) create mode 100644 akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/EventSourcedBehaviorTestKitImpl.scala create mode 100644 akka-persistence-testkit/src/main/scala/akka/persistence/testkit/javadsl/EventSourcedBehaviorTestKit.scala create mode 100644 akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/EventSourcedBehaviorTestKit.scala create mode 100644 akka-persistence-testkit/src/test/resources/logback-test.xml create mode 100644 akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/EventSourcedBehaviorTestKitSpec.scala create mode 100644 akka-persistence-typed/src/main/mima-filters/2.6.4.backwards.excludes/issue-23712-testkit.excludes diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/ActorTestKit.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/ActorTestKit.scala index 45109abb42..6c8009a534 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/ActorTestKit.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/ActorTestKit.scala @@ -5,6 +5,7 @@ package akka.actor.testkit.typed.scaladsl import java.util.concurrent.TimeoutException +import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.Await import scala.concurrent.duration._ @@ -34,6 +35,8 @@ import akka.util.Timeout object ActorTestKit { + private val testKitGuardianCounter = new AtomicInteger(0) + /** * Create a testkit named from the ActorTestKit class. * @@ -62,7 +65,12 @@ object ActorTestKit { * using default configuration from the reference.conf resources that ship with the Akka libraries. */ def apply(system: ActorSystem[_]): ActorTestKit = { - val testKitGuardian = system.systemActorOf(ActorTestKitGuardian.testKitGuardian, "test") + val name = testKitGuardianCounter.incrementAndGet() match { + case 1 => "test" + case n => s"test-$n" + } + val testKitGuardian = + system.systemActorOf(ActorTestKitGuardian.testKitGuardian, name) new ActorTestKit(system, testKitGuardian, settings = None) } diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java index 28710ac8a4..b0c8d057d8 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java @@ -4,33 +4,28 @@ package jdocs.akka.cluster.sharding.typed; +import org.scalatestplus.junit.JUnitSuite; + +import static jdocs.akka.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity; + // #test import java.math.BigDecimal; -import java.util.UUID; +import akka.actor.testkit.typed.javadsl.LogCapturing; +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; +import akka.actor.typed.ActorRef; +import akka.persistence.testkit.javadsl.EventSourcedBehaviorTestKit; +import akka.persistence.testkit.javadsl.EventSourcedBehaviorTestKit.CommandResultWithReply; +import akka.persistence.typed.PersistenceId; +import org.junit.Before; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import static org.junit.Assert.assertEquals; - -import akka.actor.testkit.typed.javadsl.LogCapturing; -import akka.actor.testkit.typed.javadsl.TestKitJunitResource; -import akka.actor.testkit.typed.javadsl.TestProbe; -import akka.actor.typed.ActorRef; -import akka.persistence.typed.PersistenceId; +import static org.junit.Assert.assertTrue; // #test -// #test-events -import akka.actor.typed.eventstream.EventStream; -import akka.persistence.journal.inmem.InmemJournal; - -// #test-events - -import org.scalatestplus.junit.JUnitSuite; - -import static jdocs.akka.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity; - // #test public class AccountExampleDocTest // #test @@ -38,100 +33,103 @@ public class AccountExampleDocTest // #test { - // #inmem-config - private static final String inmemConfig = - "akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n" - + "akka.persistence.journal.inmem.test-serialization = on \n"; + // #testkit + @ClassRule + public static final TestKitJunitResource testKit = + new TestKitJunitResource(EventSourcedBehaviorTestKit.config()); - // #inmem-config - - // #snapshot-store-config - private static final String snapshotConfig = - "akka.persistence.snapshot-store.plugin = \"akka.persistence.snapshot-store.local\" \n" - + "akka.persistence.snapshot-store.local.dir = \"target/snapshot-" - + UUID.randomUUID().toString() - + "\" \n"; - // #snapshot-store-config - - private static final String config = inmemConfig + snapshotConfig; - - @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(config); + private EventSourcedBehaviorTestKit< + AccountEntity.Command, AccountEntity.Event, AccountEntity.Account> + eventSourcedTestKit = + EventSourcedBehaviorTestKit.create( + testKit.system(), AccountEntity.create("1", PersistenceId.of("Account", "1"))); + // #testkit @Rule public final LogCapturing logCapturing = new LogCapturing(); + @Before + public void beforeEach() { + eventSourcedTestKit.clear(); + } + + @Test + public void createWithEmptyBalance() { + CommandResultWithReply< + AccountEntity.Command, + AccountEntity.Event, + AccountEntity.Account, + AccountEntity.OperationResult> + result = eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new); + assertEquals(AccountEntity.Confirmed.INSTANCE, result.reply()); + assertEquals(AccountEntity.AccountCreated.INSTANCE, result.event()); + assertEquals(BigDecimal.ZERO, result.stateOfType(AccountEntity.OpenedAccount.class).balance); + } + @Test public void handleWithdraw() { - ActorRef ref = - testKit.spawn(AccountEntity.create("1", PersistenceId.of("Account", "1"))); - TestProbe probe = - testKit.createTestProbe(AccountEntity.OperationResult.class); - ref.tell(new AccountEntity.CreateAccount(probe.getRef())); - probe.expectMessage(AccountEntity.Confirmed.INSTANCE); - ref.tell(new AccountEntity.Deposit(BigDecimal.valueOf(100), probe.getRef())); - probe.expectMessage(AccountEntity.Confirmed.INSTANCE); - ref.tell(new AccountEntity.Withdraw(BigDecimal.valueOf(10), probe.getRef())); - probe.expectMessage(AccountEntity.Confirmed.INSTANCE); + eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new); + + CommandResultWithReply< + AccountEntity.Command, + AccountEntity.Event, + AccountEntity.Account, + AccountEntity.OperationResult> + result1 = + eventSourcedTestKit.runCommand( + replyTo -> new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo)); + assertEquals(AccountEntity.Confirmed.INSTANCE, result1.reply()); + assertEquals( + BigDecimal.valueOf(100), result1.eventOfType(AccountEntity.Deposited.class).amount); + assertEquals( + BigDecimal.valueOf(100), result1.stateOfType(AccountEntity.OpenedAccount.class).balance); + + CommandResultWithReply< + AccountEntity.Command, + AccountEntity.Event, + AccountEntity.Account, + AccountEntity.OperationResult> + result2 = + eventSourcedTestKit.runCommand( + replyTo -> new AccountEntity.Withdraw(BigDecimal.valueOf(10), replyTo)); + assertEquals(AccountEntity.Confirmed.INSTANCE, result2.reply()); + assertEquals(BigDecimal.valueOf(10), result2.eventOfType(AccountEntity.Withdrawn.class).amount); + assertEquals( + BigDecimal.valueOf(90), result2.stateOfType(AccountEntity.OpenedAccount.class).balance); } @Test public void rejectWithdrawOverdraft() { - ActorRef ref = - testKit.spawn(AccountEntity.create("2", PersistenceId.of("Account", "2"))); - TestProbe probe = - testKit.createTestProbe(AccountEntity.OperationResult.class); - ref.tell(new AccountEntity.CreateAccount(probe.getRef())); - probe.expectMessage(AccountEntity.Confirmed.INSTANCE); - ref.tell(new AccountEntity.Deposit(BigDecimal.valueOf(100), probe.getRef())); - probe.expectMessage(AccountEntity.Confirmed.INSTANCE); - ref.tell(new AccountEntity.Withdraw(BigDecimal.valueOf(110), probe.getRef())); - probe.expectMessageClass(AccountEntity.Rejected.class); + eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new); + eventSourcedTestKit.runCommand( + (ActorRef replyTo) -> + new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo)); + + CommandResultWithReply< + AccountEntity.Command, + AccountEntity.Event, + AccountEntity.Account, + AccountEntity.OperationResult> + result = + eventSourcedTestKit.runCommand( + replyTo -> new AccountEntity.Withdraw(BigDecimal.valueOf(110), replyTo)); + result.replyOfType(AccountEntity.Rejected.class); + assertTrue(result.hasNoEvents()); } @Test public void handleGetBalance() { - ActorRef ref = - testKit.spawn(AccountEntity.create("3", PersistenceId.of("Account", "3"))); - TestProbe opProbe = - testKit.createTestProbe(AccountEntity.OperationResult.class); - ref.tell(new AccountEntity.CreateAccount(opProbe.getRef())); - opProbe.expectMessage(AccountEntity.Confirmed.INSTANCE); - ref.tell(new AccountEntity.Deposit(BigDecimal.valueOf(100), opProbe.getRef())); - opProbe.expectMessage(AccountEntity.Confirmed.INSTANCE); + eventSourcedTestKit.runCommand(AccountEntity.CreateAccount::new); + eventSourcedTestKit.runCommand( + (ActorRef replyTo) -> + new AccountEntity.Deposit(BigDecimal.valueOf(100), replyTo)); - TestProbe getProbe = - testKit.createTestProbe(AccountEntity.CurrentBalance.class); - ref.tell(new AccountEntity.GetBalance(getProbe.getRef())); - assertEquals( - BigDecimal.valueOf(100), - getProbe.expectMessageClass(AccountEntity.CurrentBalance.class).balance); + CommandResultWithReply< + AccountEntity.Command, + AccountEntity.Event, + AccountEntity.Account, + AccountEntity.CurrentBalance> + result = eventSourcedTestKit.runCommand(AccountEntity.GetBalance::new); + assertEquals(BigDecimal.valueOf(100), result.reply().balance); } - - // #test - // #test-events - @Test - public void storeEvents() { - TestProbe eventProbe = testKit.createTestProbe(); - testKit - .system() - .eventStream() - .tell(new EventStream.Subscribe<>(InmemJournal.Operation.class, eventProbe.getRef())); - - ActorRef ref = - testKit.spawn(AccountEntity.create("4", PersistenceId.of("Account", "4"))); - TestProbe probe = - testKit.createTestProbe(AccountEntity.OperationResult.class); - ref.tell(new AccountEntity.CreateAccount(probe.getRef())); - assertEquals( - AccountEntity.AccountCreated.INSTANCE, - eventProbe.expectMessageClass(InmemJournal.Write.class).event()); - - ref.tell(new AccountEntity.Deposit(BigDecimal.valueOf(100), probe.getRef())); - assertEquals( - BigDecimal.valueOf(100), - ((AccountEntity.Deposited) eventProbe.expectMessageClass(InmemJournal.Write.class).event()) - .amount); - } - // #test - // #test-events } // #test diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleDocSpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleDocSpec.scala index e94ea0b306..a32dfe3e4a 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleDocSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleDocSpec.scala @@ -5,102 +5,76 @@ package docs.akka.cluster.sharding.typed //#test -import java.util.UUID - +import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit +import akka.persistence.typed.PersistenceId import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit -import akka.persistence.typed.PersistenceId +import org.scalatest.BeforeAndAfterEach import org.scalatest.wordspec.AnyWordSpecLike //#test -//#test-events -import akka.persistence.journal.inmem.InmemJournal -import akka.actor.typed.eventstream.EventStream - -//#test-events - import docs.akka.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity -object AccountExampleDocSpec { - val inmemConfig = - //#inmem-config - """ - akka.persistence.journal.plugin = "akka.persistence.journal.inmem" - akka.persistence.journal.inmem.test-serialization = on - """ - //#inmem-config - - val snapshotConfig = - //#snapshot-store-config - s""" - akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/snapshot-${UUID.randomUUID().toString}" - """ - //#snapshot-store-config -} - //#test -class AccountExampleDocSpec extends ScalaTestWithActorTestKit(s""" - akka.persistence.journal.plugin = "akka.persistence.journal.inmem" - akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" - akka.persistence.snapshot-store.local.dir = "target/snapshot-${UUID.randomUUID().toString}" - """) with AnyWordSpecLike with LogCapturing { +//#testkit +class AccountExampleDocSpec + extends ScalaTestWithActorTestKit(EventSourcedBehaviorTestKit.config) + //#testkit + with AnyWordSpecLike + with BeforeAndAfterEach + with LogCapturing { + + private val eventSourcedTestKit = + EventSourcedBehaviorTestKit[AccountEntity.Command[_], AccountEntity.Event, AccountEntity.Account]( + system, + AccountEntity("1", PersistenceId("Account", "1"))) + + override protected def beforeEach(): Unit = { + super.beforeEach() + eventSourcedTestKit.clear() + } "Account" must { + "be created with zero balance" in { + val result = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_)) + result.reply shouldBe AccountEntity.Confirmed + result.event shouldBe AccountEntity.AccountCreated + result.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 0 + } + "handle Withdraw" in { - val probe = createTestProbe[AccountEntity.OperationResult]() - val ref = spawn(AccountEntity("1", PersistenceId("Account", "1"))) - ref ! AccountEntity.CreateAccount(probe.ref) - probe.expectMessage(AccountEntity.Confirmed) - ref ! AccountEntity.Deposit(100, probe.ref) - probe.expectMessage(AccountEntity.Confirmed) - ref ! AccountEntity.Withdraw(10, probe.ref) - probe.expectMessage(AccountEntity.Confirmed) + eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_)) + + val result1 = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Deposit(100, _)) + result1.reply shouldBe AccountEntity.Confirmed + result1.event shouldBe AccountEntity.Deposited(100) + result1.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 100 + + val result2 = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Withdraw(10, _)) + result2.reply shouldBe AccountEntity.Confirmed + result2.event shouldBe AccountEntity.Withdrawn(10) + result2.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 90 } "reject Withdraw overdraft" in { - val probe = createTestProbe[AccountEntity.OperationResult]() - val ref = spawn(AccountEntity("2", PersistenceId("Account", "2"))) - ref ! AccountEntity.CreateAccount(probe.ref) - probe.expectMessage(AccountEntity.Confirmed) - ref ! AccountEntity.Deposit(100, probe.ref) - probe.expectMessage(AccountEntity.Confirmed) - ref ! AccountEntity.Withdraw(110, probe.ref) - probe.expectMessageType[AccountEntity.Rejected] + eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_)) + eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Deposit(100, _)) + + val result = eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Withdraw(110, _)) + result.replyOfType[AccountEntity.Rejected] + result.hasNoEvents shouldBe true } "handle GetBalance" in { - val opProbe = createTestProbe[AccountEntity.OperationResult]() - val ref = spawn(AccountEntity("3", PersistenceId("Account", "3"))) - ref ! AccountEntity.CreateAccount(opProbe.ref) - opProbe.expectMessage(AccountEntity.Confirmed) - ref ! AccountEntity.Deposit(100, opProbe.ref) - opProbe.expectMessage(AccountEntity.Confirmed) + eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.CreateAccount(_)) + eventSourcedTestKit.runCommand[AccountEntity.OperationResult](AccountEntity.Deposit(100, _)) - val getProbe = createTestProbe[AccountEntity.CurrentBalance]() - ref ! AccountEntity.GetBalance(getProbe.ref) - getProbe.expectMessage(AccountEntity.CurrentBalance(100)) + val result = eventSourcedTestKit.runCommand[AccountEntity.CurrentBalance](AccountEntity.GetBalance(_)) + result.reply.balance shouldBe 100 + result.hasNoEvents shouldBe true } - - //#test - //#test-events - "store events" in { - val eventProbe = createTestProbe[InmemJournal.Operation]() - system.eventStream ! EventStream.Subscribe(eventProbe.ref) - - val probe = createTestProbe[AccountEntity.OperationResult]() - val ref = spawn(AccountEntity("4", PersistenceId("Account", "4"))) - ref ! AccountEntity.CreateAccount(probe.ref) - eventProbe.expectMessageType[InmemJournal.Write].event should ===(AccountEntity.AccountCreated) - - ref ! AccountEntity.Deposit(100, probe.ref) - probe.expectMessage(AccountEntity.Confirmed) - eventProbe.expectMessageType[InmemJournal.Write].event should ===(AccountEntity.Deposited(100)) - } - //#test-events - //#test } } //#test diff --git a/akka-docs/src/main/paradox/typed/persistence-testing.md b/akka-docs/src/main/paradox/typed/persistence-testing.md index 05463b1fc9..30bac9073d 100644 --- a/akka-docs/src/main/paradox/typed/persistence-testing.md +++ b/akka-docs/src/main/paradox/typed/persistence-testing.md @@ -1,49 +1,38 @@ # Testing -## Dependency +## Module info -To use Akka Persistence and Actor TestKit, add the module to your project: +To use Akka Persistence TestKit, add the module to your project: @@dependency[sbt,Maven,Gradle] { group1=com.typesafe.akka artifact1=akka-persistence-typed_$scala.binary_version$ version1=$akka.version$ group2=com.typesafe.akka - artifact2=akka-actor-testkit-typed_$scala.binary_version$ + artifact2=akka-persistence-testkit_$scala.binary_version$ version2=$akka.version$ scope2=test } +@@project-info{ projectId="akka-persistence-testkit" } + ## Unit testing -Unit testing of `EventSourcedBehavior` can be done with the @ref:[ActorTestKit](testing-async.md) -in the same way as other behaviors. +**Note!** The `EventSourcedBehaviorTestKit` is a new feature, api may have changes breaking source compatibility in future versions. -@ref:[Synchronous behavior testing](testing-sync.md) for `EventSourcedBehavior` is not supported yet, but -tracked in @github[issue #23712](#23712). +Unit testing of `EventSourcedBehavior` can be done with the @apidoc[EventSourcedBehaviorTestKit]. It supports running +one command at a time and you can assert that the synchronously returned result is as expected. The result contains the +events emitted by the command and the new state after applying the events. It also has support for verifying the reply +to a command. -You need to configure a journal, and the in-memory journal is sufficient for unit testing. To enable the -in-memory journal you need to pass the following configuration to the @scala[`ScalaTestWithActorTestKit`]@java[`TestKitJunitResource`]. +You need to configure the `ActorSystem` with the `EventSourcedBehaviorTestKit.config`. The configuration enables +the in-memory journal and snapshot storage. Scala -: @@snip [AccountExampleDocSpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleDocSpec.scala) { #inmem-config } +: @@snip [AccountExampleDocSpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleDocSpec.scala) { #testkit } Java -: @@snip [AccountExampleDocTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java) { #inmem-config } - -The `test-serialization = on` configuration of the `InmemJournal` will verify that persisted events can be serialized and deserialized. - -Optionally you can also configure a snapshot store. To enable the file based snapshot store you need to pass the -following configuration to the @scala[`ScalaTestWithActorTestKit`]@java[`TestKitJunitResource`]. - -Scala -: @@snip [AccountExampleDocSpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleDocSpec.scala) { #snapshot-store-config } - -Java -: @@snip [AccountExampleDocTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java) { #snapshot-store-config } - -Then you can `spawn` the `EventSourcedBehavior` and verify the outcome of sending commands to the actor using -the facilities of the @ref:[ActorTestKit](testing-async.md). +: @@snip [AccountExampleDocTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java) { #testkit } A full test for the `AccountEntity`, which is shown in the @ref:[Persistence Style Guide](persistence-style.md), may look like this: @@ -53,21 +42,19 @@ Scala Java : @@snip [AccountExampleDocTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java) { #test } -Note that each test case is using a different `PersistenceId` to not interfere with each other. +Serialization of commands, events and state are verified automatically. The serialization checks can be +customized with the `SerializationSettings` when creating the `EventSourcedBehaviorTestKit`. By default, +the serialization roundtrip is checked but the equality of the result of the serialization is not checked. +`equals` must be implemented @scala[(or using `case class`)] in the commands, events and state if `verifyEquality` +is enabled. -The @apidoc[akka.persistence.journal.inmem.InmemJournal$] publishes `Write` and `Delete` operations to the -`eventStream`, which makes it possible to verify that the expected events have been emitted and stored by the -`EventSourcedBehavior`. You can subscribe to to the `eventStream` with a `TestProbe` like this: - -Scala -: @@snip [AccountExampleDocSpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleDocSpec.scala) { #test-events } - -Java -: @@snip [AccountExampleDocTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java) { #test-events } +To test recovery the `restart` method of the `EventSourcedBehaviorTestKit` can be used. It will restart the +behavior, which will then recover from stored snapshot and events from previous commands. It's also possible +to populate the storage with events or simulate failures by using the underlying @apidoc[PersistenceTestKit]. ## Persistence TestKit -**Note!** The testkit is a new feature, api may have changes breaking source compatibility in future versions. +**Note!** The `PersistenceTestKit` is a new feature, api may have changes breaking source compatibility in future versions. Persistence testkit allows to check events saved in a storage, emulate storage operations and exceptions. To use the testkit you need to add the following dependency in your project: @@ -187,8 +174,10 @@ to the @ref:[reference configuration](../general/configuration-reference.md#conf ## Integration testing -The in-memory journal and file based snapshot store can be used also for integration style testing of a single -`ActorSystem`, for example when using Cluster Sharding with a single Cluster node. +`EventSourcedBehavior` actors can be tested with the @ref:[ActorTestKit](testing-async.md) together with +other actors. The in-memory journal and snapshot storage from the @ref:[Persistence TestKit](#persistence-testkit) +can be used also for integration style testing of a single `ActorSystem`, for example when using Cluster Sharding +with a single Cluster node. For tests that involve more than one Cluster node you have to use another journal and snapshot store. While it's possible to use the @ref:[Persistence Plugin Proxy](../persistence-plugins.md#persistence-plugin-proxy) diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/EventSourcedBehaviorTestKitImpl.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/EventSourcedBehaviorTestKitImpl.scala new file mode 100644 index 0000000000..e520b22a4d --- /dev/null +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/EventSourcedBehaviorTestKitImpl.scala @@ -0,0 +1,187 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.testkit.internal + +import scala.collection.immutable +import scala.concurrent.duration._ +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import akka.actor.testkit.typed.scaladsl.ActorTestKit +import akka.actor.testkit.typed.scaladsl.SerializationTestKit +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.annotation.InternalApi +import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit +import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit.CommandResult +import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit.CommandResultWithReply +import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit.RestartResult +import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit.SerializationSettings +import akka.persistence.testkit.scaladsl.PersistenceTestKit +import akka.persistence.typed.PersistenceId +import akka.persistence.typed.internal.EventSourcedBehaviorImpl + +/** + * INTERNAL API + */ +@InternalApi private[akka] object EventSourcedBehaviorTestKitImpl { + final case class CommandResultImpl[Command, Event, State, Reply]( + command: Command, + events: immutable.Seq[Event], + state: State, + replyOption: Option[Reply]) + extends CommandResultWithReply[Command, Event, State, Reply] { + + override def hasNoEvents: Boolean = events.isEmpty + + override def event: Event = { + if (events.nonEmpty) events.head else throw new AssertionError("No events") + } + + override def eventOfType[E <: Event: ClassTag]: E = + ofType(event, "event") + + override def stateOfType[S <: State: ClassTag]: S = + ofType(state, "state") + + override def reply: Reply = replyOption.getOrElse(throw new AssertionError("No reply")) + + override def replyOfType[R <: Reply: ClassTag]: R = + ofType(reply, "reply") + + // cast with nice error message + private def ofType[A: ClassTag](obj: Any, errorParam: String): A = { + obj match { + case a: A => a + case other => + val expectedClass = implicitly[ClassTag[A]].runtimeClass + throw new AssertionError( + s"Expected $errorParam class [${expectedClass.getName}], " + + s"but was [${other.getClass.getName}]") + } + } + } + + final case class RestartResultImpl[State](state: State) extends RestartResult[State] +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] class EventSourcedBehaviorTestKitImpl[Command, Event, State]( + actorTestKit: ActorTestKit, + behavior: Behavior[Command], + serializationSettings: SerializationSettings) + extends EventSourcedBehaviorTestKit[Command, Event, State] { + + import EventSourcedBehaviorTestKitImpl._ + + private def system: ActorSystem[_] = actorTestKit.system + + override val persistenceTestKit: PersistenceTestKit = PersistenceTestKit(system) + + private val probe = actorTestKit.createTestProbe[Any]() + private val stateProbe = actorTestKit.createTestProbe[State]() + private var actor: ActorRef[Command] = actorTestKit.spawn(behavior) + private def internalActor = actor.unsafeUpcast[Any] + private val persistenceId: PersistenceId = { + internalActor ! EventSourcedBehaviorImpl.GetPersistenceId(probe.ref) + try { + probe.expectMessageType[PersistenceId] + } catch { + case NonFatal(_) => + throw new IllegalArgumentException("Only EventSourcedBehavior, or nested EventSourcedBehavior allowed.") + } + } + private val serializationTestKit = new SerializationTestKit(system) + + private var emptyStateVerified = false + + persistenceTestKit.clearByPersistenceId(persistenceId.id) + + override def runCommand(command: Command): CommandResult[Command, Event, State] = { + if (serializationSettings.enabled && serializationSettings.verifyCommands) + verifySerializationAndThrow(command, "Command") + + if (serializationSettings.enabled && !emptyStateVerified) { + internalActor ! EventSourcedBehaviorImpl.GetState(stateProbe.ref) + val emptyState = stateProbe.receiveMessage() + verifySerializationAndThrow(emptyState, "Empty State") + emptyStateVerified = true + } + + // FIXME we can expand the api of persistenceTestKit to read from storage from a seqNr instead + val oldEvents = + persistenceTestKit.persistedInStorage(persistenceId.id).map(_.asInstanceOf[Event]) + + actor ! command + + internalActor ! EventSourcedBehaviorImpl.GetState(stateProbe.ref) + val newState = stateProbe.receiveMessage() + + val newEvents = + persistenceTestKit.persistedInStorage(persistenceId.id).map(_.asInstanceOf[Event]).drop(oldEvents.size) + + if (serializationSettings.enabled) { + if (serializationSettings.verifyEvents) { + newEvents.foreach(verifySerializationAndThrow(_, "Event")) + } + + if (serializationSettings.verifyState) + verifySerializationAndThrow(newState, "State") + } + + CommandResultImpl[Command, Event, State, Nothing](command, newEvents, newState, None) + } + + override def runCommand[R](creator: ActorRef[R] => Command): CommandResultWithReply[Command, Event, State, R] = { + val replyProbe = actorTestKit.createTestProbe[R]() + val command = creator(replyProbe.ref) + val result = runCommand(command) + + val reply = try { + replyProbe.receiveMessage(Duration.Zero) + } catch { + case NonFatal(_) => + throw new AssertionError(s"Missing expected reply for command [$command].") + } finally { + replyProbe.stop() + } + + if (serializationSettings.enabled && serializationSettings.verifyCommands) + verifySerializationAndThrow(reply, "Reply") + + CommandResultImpl[Command, Event, State, R](result.command, result.events, result.state, Some(reply)) + } + + override def restart(): RestartResult[State] = { + actorTestKit.stop(actor) + actor = actorTestKit.spawn(behavior) + internalActor ! EventSourcedBehaviorImpl.GetState(stateProbe.ref) + try { + val state = stateProbe.receiveMessage() + RestartResultImpl(state) + } catch { + case NonFatal(_) => + throw new IllegalStateException("Could not restart. Maybe exception from event handler. See logs.") + } + } + + override def clear(): Unit = { + persistenceTestKit.clearByPersistenceId(persistenceId.id) + restart() + } + + private def verifySerializationAndThrow(obj: Any, errorMessagePrefix: String): Unit = { + try { + serializationTestKit.verifySerialization(obj, serializationSettings.verifyEquality) + } catch { + case NonFatal(exc) => + throw new IllegalArgumentException(s"$errorMessagePrefix [$obj] isn't serializable.", exc) + } + } + +} diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/javadsl/EventSourcedBehaviorTestKit.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/javadsl/EventSourcedBehaviorTestKit.scala new file mode 100644 index 0000000000..42b1330e00 --- /dev/null +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/javadsl/EventSourcedBehaviorTestKit.scala @@ -0,0 +1,246 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.testkit.javadsl + +import java.util.function.{ Function => JFunction } +import java.util.{ List => JList } + +import scala.reflect.ClassTag + +import com.typesafe.config.Config + +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.annotation.ApiMayChange +import akka.annotation.DoNotInherit +import akka.persistence.testkit.scaladsl +import akka.util.ccompat.JavaConverters._ + +/** + * Testing of [[akka.persistence.typed.javadsl.EventSourcedBehavior]] implementations. + * It supports running one command at a time and you can assert that the synchronously returned result is as expected. + * The result contains the events emitted by the command and the new state after applying the events. + * It also has support for verifying the reply to a command. + * + * Serialization of commands, events and state are verified automatically. + */ +@ApiMayChange +object EventSourcedBehaviorTestKit { + + /** + * The configuration to be included in the configuration of the `ActorSystem`. Typically used as + * constructor parameter to `TestKitJunitResource`. The configuration enables the in-memory + * journal and snapshot storage. + */ + val config: Config = scaladsl.EventSourcedBehaviorTestKit.config + + val enabledSerializationSettings: SerializationSettings = new SerializationSettings( + enabled = true, + verifyEquality = false, + verifyCommands = true, + verifyEvents = true, + verifyState = true) + + val disabledSerializationSettings: SerializationSettings = new SerializationSettings( + enabled = false, + verifyEquality = false, + verifyCommands = false, + verifyEvents = false, + verifyState = false) + + /** + * Customization of which serialization checks that are performed. + * `equals` must be implemented (or using `case class`) when `verifyEquality` is enabled. + */ + final class SerializationSettings( + val enabled: Boolean, + val verifyEquality: Boolean, + val verifyCommands: Boolean, + val verifyEvents: Boolean, + val verifyState: Boolean) { + + def withEnabled(value: Boolean): SerializationSettings = + copy(enabled = value) + + def withVerifyEquality(value: Boolean): SerializationSettings = + copy(verifyEquality = value) + + def withVerifyCommands(value: Boolean): SerializationSettings = + copy(verifyCommands = value) + + def withVerifyEvents(value: Boolean): SerializationSettings = + copy(verifyEvents = value) + + def withVerifyState(value: Boolean): SerializationSettings = + copy(verifyState = value) + + private def copy( + enabled: Boolean = this.enabled, + verifyEquality: Boolean = this.verifyEquality, + verifyCommands: Boolean = this.verifyCommands, + verifyEvents: Boolean = this.verifyEvents, + verifyState: Boolean = this.verifyState): SerializationSettings = { + new SerializationSettings(enabled, verifyEquality, verifyCommands, verifyEvents, verifyState) + } + } + + /** + * Factory method to create a new EventSourcedBehaviorTestKit. + */ + def create[Command, Event, State]( + system: ActorSystem[_], + behavior: Behavior[Command]): EventSourcedBehaviorTestKit[Command, Event, State] = + create(system, behavior, enabledSerializationSettings) + + /** + * Factory method to create a new EventSourcedBehaviorTestKit with custom [[SerializationSettings]]. + * + * Note that `equals` must be implemented in the commands, events and state if `verifyEquality` is enabled. + */ + def create[Command, Event, State]( + system: ActorSystem[_], + behavior: Behavior[Command], + serializationSettings: SerializationSettings): EventSourcedBehaviorTestKit[Command, Event, State] = { + val scaladslSettings = new scaladsl.EventSourcedBehaviorTestKit.SerializationSettings( + enabled = serializationSettings.enabled, + verifyEquality = serializationSettings.verifyEquality, + verifyCommands = serializationSettings.verifyCommands, + verifyEvents = serializationSettings.verifyEvents, + verifyState = serializationSettings.verifyState) + new EventSourcedBehaviorTestKit(scaladsl.EventSourcedBehaviorTestKit(system, behavior, scaladslSettings)) + } + + /** + * The result of running a command. + */ + @DoNotInherit class CommandResult[Command, Event, State]( + delegate: scaladsl.EventSourcedBehaviorTestKit.CommandResult[Command, Event, State]) { + + /** + * The command that was run. + */ + def command: Command = + delegate.command + + /** + * The events that were emitted by the command, and persisted. + * In many cases only one event is emitted and then it's more convenient to use [[CommandResult.event]] + * or [[CommandResult.eventOfType]]. + */ + def events: JList[Event] = + delegate.events.asJava + + /** + * `true` if no events were emitted by the command. + */ + def hasNoEvents: Boolean = + delegate.hasNoEvents + + /** + * The first event. It will throw `AssertionError` if there is no event. + */ + def event: Event = + delegate.event + + /** + * The first event as a given expected type. It will throw `AssertionError` if there is no event or + * if the event is of a different type. + */ + def eventOfType[E <: Event](eventClass: Class[E]): E = + delegate.eventOfType(ClassTag[E](eventClass)) + + /** + * The state after applying the events. + */ + def state: State = + delegate.state + + /** + * The state as a given expected type. It will throw `AssertionError` if the state is of a different type. + */ + def stateOfType[S <: State](stateClass: Class[S]): S = + delegate.stateOfType(ClassTag[S](stateClass)) + } + + /** + * The result of running a command with a `ActorRef replyTo`, i.e. the `runCommand` with a + * `Function, Command>` parameter. + */ + final class CommandResultWithReply[Command, Event, State, Reply]( + delegate: scaladsl.EventSourcedBehaviorTestKit.CommandResultWithReply[Command, Event, State, Reply]) + extends CommandResult[Command, Event, State](delegate) { + + /** + * The reply. It will throw `AssertionError` if there was no reply. + */ + def reply: Reply = + delegate.reply + + /** + * The reply as a given expected type. It will throw `AssertionError` if there is no reply or + * if the reply is of a different type. + */ + def replyOfType[R <: Reply](replyClass: Class[R]): R = + delegate.replyOfType(ClassTag[R](replyClass)) + } + + /** + * The result of restarting the behavior. + */ + final class RestartResult[State](delegate: scaladsl.EventSourcedBehaviorTestKit.RestartResult[State]) { + + /** + * The state after recovery. + */ + def state: State = + delegate.state + } + +} + +@ApiMayChange +final class EventSourcedBehaviorTestKit[Command, Event, State]( + delegate: scaladsl.EventSourcedBehaviorTestKit[Command, Event, State]) { + + import EventSourcedBehaviorTestKit._ + + private val _persistenceTestKit = new PersistenceTestKit(delegate.persistenceTestKit) + + /** + * Run one command through the behavior. The returned result contains emitted events and the state + * after applying the events. + */ + def runCommand(command: Command): CommandResult[Command, Event, State] = + new CommandResult(delegate.runCommand(command)) + + /** + * Run one command with a `replyTo: ActorRef` through the behavior. The returned result contains emitted events, + * the state after applying the events, and the reply. + */ + def runCommand[R](creator: JFunction[ActorRef[R], Command]): CommandResultWithReply[Command, Event, State, R] = + new CommandResultWithReply(delegate.runCommand(replyTo => creator.apply(replyTo))) + + /** + * Restart the behavior, which will then recover from stored snapshot and events. Can be used for testing + * that the recovery is correct. + */ + def restart(): RestartResult[State] = + new RestartResult(delegate.restart()) + + /** + * Clears the in-memory journal and snapshot storage and restarts the behavior. + */ + def clear(): Unit = + delegate.clear() + + /** + * The underlying `PersistenceTestKit` for the in-memory journal and snapshot storage. + * Can be useful for advanced testing scenarios, such as simulating failures or + * populating the journal with events that are used for replay. + */ + def persistenceTestKit: PersistenceTestKit = + _persistenceTestKit +} diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/javadsl/PersistenceTestKit.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/javadsl/PersistenceTestKit.scala index 75e381593c..cd91ff12c4 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/javadsl/PersistenceTestKit.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/javadsl/PersistenceTestKit.scala @@ -19,9 +19,9 @@ import akka.util.ccompat.JavaConverters._ * Class for testing persisted events in persistent actors. */ @ApiMayChange -class PersistenceTestKit(system: ActorSystem) { +class PersistenceTestKit(scalaTestkit: ScalaTestKit) { - private val scalaTestkit = new ScalaTestKit(system) + def this(system: ActorSystem) = this(new ScalaTestKit(system)) /** * Check that nothing has been saved in the storage. diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/EventSourcedBehaviorTestKit.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/EventSourcedBehaviorTestKit.scala new file mode 100644 index 0000000000..2aed301b88 --- /dev/null +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/EventSourcedBehaviorTestKit.scala @@ -0,0 +1,224 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.testkit.scaladsl + +import scala.collection.immutable +import scala.reflect.ClassTag + +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory + +import akka.actor.testkit.typed.scaladsl.ActorTestKit +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.annotation.ApiMayChange +import akka.annotation.DoNotInherit +import akka.persistence.testkit.PersistenceTestKitPlugin +import akka.persistence.testkit.internal.EventSourcedBehaviorTestKitImpl + +/** + * Testing of [[akka.persistence.typed.scaladsl.EventSourcedBehavior]] implementations. + * It supports running one command at a time and you can assert that the synchronously returned result is as expected. + * The result contains the events emitted by the command and the new state after applying the events. + * It also has support for verifying the reply to a command. + * + * Serialization of commands, events and state are verified automatically. + */ +@ApiMayChange +object EventSourcedBehaviorTestKit { + + /** + * The configuration to be included in the configuration of the `ActorSystem`. Typically used as + * constructor parameter to `ScalaTestWithActorTestKit`. The configuration enables the in-memory + * journal and snapshot storage. + */ + val config: Config = ConfigFactory.parseString(""" + akka.persistence.testkit.events.serialize = off + """).withFallback(PersistenceTestKitPlugin.config) + + object SerializationSettings { + val enabled: SerializationSettings = new SerializationSettings( + enabled = true, + verifyEquality = false, + verifyCommands = true, + verifyEvents = true, + verifyState = true) + + val disabled: SerializationSettings = new SerializationSettings( + enabled = false, + verifyEquality = false, + verifyCommands = false, + verifyEvents = false, + verifyState = false) + } + + /** + * Customization of which serialization checks that are performed. + * `equals` must be implemented (or using `case class`) when `verifyEquality` is enabled. + */ + final class SerializationSettings private[akka] ( + val enabled: Boolean, + val verifyEquality: Boolean, + val verifyCommands: Boolean, + val verifyEvents: Boolean, + val verifyState: Boolean) { + + def withEnabled(value: Boolean): SerializationSettings = + copy(enabled = value) + + def withVerifyEquality(value: Boolean): SerializationSettings = + copy(verifyEquality = value) + + def withVerifyCommands(value: Boolean): SerializationSettings = + copy(verifyCommands = value) + + def withVerifyEvents(value: Boolean): SerializationSettings = + copy(verifyEvents = value) + + def withVerifyState(value: Boolean): SerializationSettings = + copy(verifyState = value) + + private def copy( + enabled: Boolean = this.enabled, + verifyEquality: Boolean = this.verifyEquality, + verifyCommands: Boolean = this.verifyCommands, + verifyEvents: Boolean = this.verifyEvents, + verifyState: Boolean = this.verifyState): SerializationSettings = { + new SerializationSettings(enabled, verifyEquality, verifyCommands, verifyEvents, verifyState) + } + } + + /** + * Factory method to create a new EventSourcedBehaviorTestKit. + */ + def apply[Command, Event, State]( + system: ActorSystem[_], + behavior: Behavior[Command]): EventSourcedBehaviorTestKit[Command, Event, State] = + apply(system, behavior, SerializationSettings.enabled) + + /** + * Factory method to create a new EventSourcedBehaviorTestKit with custom [[SerializationSettings]]. + * + * Note that `equals` must be implemented (or using `case class`) in the commands, events and state if + * `verifyEquality` is enabled. + */ + def apply[Command, Event, State]( + system: ActorSystem[_], + behavior: Behavior[Command], + serializationSettings: SerializationSettings): EventSourcedBehaviorTestKit[Command, Event, State] = + new EventSourcedBehaviorTestKitImpl(ActorTestKit(system), behavior, serializationSettings) + + /** + * The result of running a command. + */ + @DoNotInherit trait CommandResult[Command, Event, State] { + + /** + * The command that was run. + */ + def command: Command + + /** + * The events that were emitted by the command, and persisted. + * In many cases only one event is emitted and then it's more convenient to use [[CommandResult.event]] + * or [[CommandResult.eventOfType]]. + */ + def events: immutable.Seq[Event] + + /** + * `true` if no events were emitted by the command. + */ + def hasNoEvents: Boolean + + /** + * The first event. It will throw `AssertionError` if there is no event. + */ + def event: Event + + /** + * The first event as a given expected type. It will throw `AssertionError` if there is no event or + * if the event is of a different type. + */ + def eventOfType[E <: Event: ClassTag]: E + + /** + * The state after applying the events. + */ + def state: State + + /** + * The state as a given expected type. It will throw `AssertionError` if the state is of a different type. + */ + def stateOfType[S <: State: ClassTag]: S + } + + /** + * The result of running a command with a `replyTo: ActorRef[R]`, i.e. the `runCommand` with a + * `ActorRef[R] => Command` parameter. + */ + @DoNotInherit trait CommandResultWithReply[Command, Event, State, Reply] + extends CommandResult[Command, Event, State] { + + /** + * The reply. It will throw `AssertionError` if there was no reply. + */ + def reply: Reply + + /** + * The reply as a given expected type. It will throw `AssertionError` if there is no reply or + * if the reply is of a different type. + */ + def replyOfType[R <: Reply: ClassTag]: R + } + + /** + * The result of restarting the behavior. + */ + @DoNotInherit trait RestartResult[State] { + + /** + * The state after recovery. + */ + def state: State + } + +} + +@ApiMayChange +@DoNotInherit trait EventSourcedBehaviorTestKit[Command, Event, State] { + + import EventSourcedBehaviorTestKit._ + + /** + * Run one command through the behavior. The returned result contains emitted events and the state + * after applying the events. + */ + def runCommand(command: Command): CommandResult[Command, Event, State] + + /** + * Run one command with a `replyTo: ActorRef[R]` through the behavior. The returned result contains emitted events, + * the state after applying the events, and the reply. + */ + def runCommand[R](creator: ActorRef[R] => Command): CommandResultWithReply[Command, Event, State, R] + + /** + * Restart the behavior, which will then recover from stored snapshot and events. Can be used for testing + * that the recovery is correct. + */ + def restart(): RestartResult[State] + + /** + * Clears the in-memory journal and snapshot storage and restarts the behavior. + */ + def clear(): Unit + + /** + * The underlying `PersistenceTestKit` for the in-memory journal and snapshot storage. + * Can be useful for advanced testing scenarios, such as simulating failures or + * populating the journal with events that are used for replay. + */ + def persistenceTestKit: PersistenceTestKit +} diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala index f8fa04a348..aae6f4de07 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala @@ -10,12 +10,19 @@ import scala.util.Try import com.typesafe.config.Config -import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId } +import akka.actor.ActorSystem +import akka.actor.ClassicActorSystemProvider +import akka.actor.ExtendedActorSystem +import akka.actor.Extension +import akka.actor.ExtensionId import akka.actor.typed.{ ActorSystem => TypedActorSystem } import akka.annotation.ApiMayChange -import akka.persistence.{ Persistence, PersistentRepr, SnapshotMetadata } +import akka.persistence.Persistence +import akka.persistence.PersistentRepr +import akka.persistence.SnapshotMetadata import akka.persistence.testkit._ -import akka.persistence.testkit.internal.{ InMemStorageExtension, SnapshotStorageEmulatorExtension } +import akka.persistence.testkit.internal.InMemStorageExtension +import akka.persistence.testkit.internal.SnapshotStorageEmulatorExtension import akka.testkit.TestProbe private[testkit] trait CommonTestKitOps[S, P] extends ClearOps with PolicyOpsTestKit[P] { @@ -293,7 +300,7 @@ private[testkit] trait PersistenceTestKitOps[S, P] /** * Persist `snapshots` into storage in order. */ - def persistForRecovery(persistenceId: String, snapshots: immutable.Seq[Any]): Unit + def persistForRecovery(persistenceId: String, events: immutable.Seq[Any]): Unit /** * Retrieve all snapshots saved in storage by persistence id. @@ -480,9 +487,9 @@ class PersistenceTestKit(system: ActorSystem) override def failNextNDeletes(persistenceId: String, n: Int, cause: Throwable): Unit = failNextNOpsCond((pid, op) => pid == persistenceId && op.isInstanceOf[DeleteEvents], n, cause) - def persistForRecovery(persistenceId: String, snapshots: immutable.Seq[Any]): Unit = { - storage.addAny(persistenceId, snapshots) - addToIndex(persistenceId, snapshots.size) + def persistForRecovery(persistenceId: String, events: immutable.Seq[Any]): Unit = { + storage.addAny(persistenceId, events) + addToIndex(persistenceId, events.size) } def persistedInStorage(persistenceId: String): immutable.Seq[Any] = @@ -494,9 +501,7 @@ class PersistenceTestKit(system: ActorSystem) @ApiMayChange object PersistenceTestKit { - def apply(system: ActorSystem): PersistenceTestKit = new PersistenceTestKit(system) - - def apply(system: TypedActorSystem[_]): PersistenceTestKit = apply(system.classicSystem) + def apply(system: ClassicActorSystemProvider): PersistenceTestKit = new PersistenceTestKit(system.classicSystem) object Settings extends ExtensionId[Settings] { diff --git a/akka-persistence-testkit/src/test/resources/logback-test.xml b/akka-persistence-testkit/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..3d08c776bd --- /dev/null +++ b/akka-persistence-testkit/src/test/resources/logback-test.xml @@ -0,0 +1,31 @@ + + + + + + + + %date{ISO8601} %-5level %logger %marker - %msg MDC: {%mdc}%n + + + + + + + + + + + + + + + diff --git a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/EventSourcedBehaviorTestKitSpec.scala b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/EventSourcedBehaviorTestKitSpec.scala new file mode 100644 index 0000000000..39869b475d --- /dev/null +++ b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/EventSourcedBehaviorTestKitSpec.scala @@ -0,0 +1,302 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.testkit.scaladsl + +import java.io.NotSerializableException + +import org.scalatest.wordspec.AnyWordSpecLike + +import akka.Done +import akka.actor.testkit.typed.TestException +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.LoggingTestKit +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.ActorContext +import akka.actor.typed.scaladsl.Behaviors +import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKitSpec.TestCounter.NotSerializableState +import akka.persistence.typed.PersistenceId +import akka.persistence.typed.internal.JournalFailureException +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.serialization.DisabledJavaSerializer +import akka.serialization.jackson.CborSerializable +import akka.util.unused + +object EventSourcedBehaviorTestKitSpec { + + object TestCounter { + sealed trait Command + case object Increment extends Command with CborSerializable + final case class IncrementWithConfirmation(replyTo: ActorRef[Done]) extends Command with CborSerializable + case class IncrementSeveral(n: Int) extends Command with CborSerializable + final case class GetValue(replyTo: ActorRef[State]) extends Command with CborSerializable + + sealed trait Event + final case class Incremented(delta: Int) extends Event with CborSerializable + + sealed trait State + final case class RealState(value: Int, history: Vector[Int]) extends State with CborSerializable + + case object IncrementWithNotSerializableEvent extends Command with CborSerializable + final case class NotSerializableEvent(delta: Int) extends Event + + case object IncrementWithNotSerializableState extends Command with CborSerializable + final case class IncrementedWithNotSerializableState(delta: Int) extends Event with CborSerializable + final case class NotSerializableState(value: Int, history: Vector[Int]) extends State + + case object NotSerializableCommand extends Command + + final case class IncrementWithNotSerializableReply(replyTo: ActorRef[NotSerializableReply.type]) + extends Command + with CborSerializable + object NotSerializableReply + + def apply(persistenceId: PersistenceId): Behavior[Command] = + apply(persistenceId, RealState(0, Vector.empty)) + + def apply(persistenceId: PersistenceId, emptyState: State): Behavior[Command] = + Behaviors.setup(ctx => counter(ctx, persistenceId, emptyState)) + + private def counter( + @unused ctx: ActorContext[Command], + persistenceId: PersistenceId, + emptyState: State): EventSourcedBehavior[Command, Event, State] = { + EventSourcedBehavior.withEnforcedReplies[Command, Event, State]( + persistenceId, + emptyState, + commandHandler = (state, command) => + command match { + case Increment => + Effect.persist(Incremented(1)).thenNoReply() + + case IncrementWithConfirmation(replyTo) => + Effect.persist(Incremented(1)).thenReply(replyTo)(_ => Done) + + case IncrementSeveral(n: Int) => + val events = (1 to n).map(_ => Incremented(1)) + Effect.persist(events).thenNoReply() + + case IncrementWithNotSerializableEvent => + Effect.persist(NotSerializableEvent(1)).thenNoReply() + + case IncrementWithNotSerializableState => + Effect.persist(IncrementedWithNotSerializableState(1)).thenNoReply() + + case IncrementWithNotSerializableReply(replyTo) => + Effect.persist(Incremented(1)).thenReply(replyTo)(_ => NotSerializableReply) + + case NotSerializableCommand => + Effect.noReply + + case GetValue(replyTo) => + Effect.reply(replyTo)(state) + + }, + eventHandler = { + case (RealState(value, history), Incremented(delta)) => + if (delta <= 0) + throw new IllegalStateException("Delta must be positive") + RealState(value + delta, history :+ value) + case (RealState(value, history), NotSerializableEvent(delta)) => + RealState(value + delta, history :+ value) + case (RealState(value, history), IncrementedWithNotSerializableState(delta)) => + NotSerializableState(value + delta, history :+ value) + case (state: NotSerializableState, _) => + throw new IllegalStateException(state.toString) + }) + } + } +} + +class EventSourcedBehaviorTestKitSpec + extends ScalaTestWithActorTestKit(EventSourcedBehaviorTestKit.config) + with AnyWordSpecLike + with LogCapturing { + import EventSourcedBehaviorTestKitSpec._ + + private val persistenceId = PersistenceId.ofUniqueId("test") + private val behavior = TestCounter(persistenceId) + + private def createTestKit() = { + EventSourcedBehaviorTestKit[TestCounter.Command, TestCounter.Event, TestCounter.State](system, behavior) + } + + "EventSourcedBehaviorTestKit" must { + + "run commands" in { + val eventSourcedTestKit = createTestKit() + + val result1 = eventSourcedTestKit.runCommand(TestCounter.Increment) + result1.event should ===(TestCounter.Incremented(1)) + result1.state should ===(TestCounter.RealState(1, Vector(0))) + + val result2 = eventSourcedTestKit.runCommand(TestCounter.Increment) + result2.event should ===(TestCounter.Incremented(1)) + result2.state should ===(TestCounter.RealState(2, Vector(0, 1))) + + result2.eventOfType[TestCounter.Incremented].delta should ===(1) + intercept[AssertionError] { + // wrong event type + result2.eventOfType[TestCounter.NotSerializableEvent].delta should ===(1) + } + } + + "run command emitting several events" in { + val eventSourcedTestKit = createTestKit() + + val result = eventSourcedTestKit.runCommand(TestCounter.IncrementSeveral(3)) + result.events should ===(List(TestCounter.Incremented(1), TestCounter.Incremented(1), TestCounter.Incremented(1))) + result.state should ===(TestCounter.RealState(3, Vector(0, 1, 2))) + } + + "run commands with reply" in { + val eventSourcedTestKit = createTestKit() + + val result1 = eventSourcedTestKit.runCommand[Done](TestCounter.IncrementWithConfirmation(_)) + result1.event should ===(TestCounter.Incremented(1)) + result1.state should ===(TestCounter.RealState(1, Vector(0))) + result1.reply should ===(Done) + + val result2 = eventSourcedTestKit.runCommand[Done](TestCounter.IncrementWithConfirmation(_)) + result2.event should ===(TestCounter.Incremented(1)) + result2.state should ===(TestCounter.RealState(2, Vector(0, 1))) + result2.reply should ===(Done) + } + + "detect missing reply" in { + val eventSourcedTestKit = createTestKit() + + intercept[AssertionError] { + eventSourcedTestKit.runCommand[Done](_ => TestCounter.Increment) + }.getMessage should include("Missing expected reply") + } + + "run command with reply that is not emitting events" in { + val eventSourcedTestKit = createTestKit() + + eventSourcedTestKit.runCommand(TestCounter.Increment) + val result = eventSourcedTestKit.runCommand[TestCounter.State](TestCounter.GetValue(_)) + result.hasNoEvents should ===(true) + intercept[AssertionError] { + result.event + } + result.state should ===(TestCounter.RealState(1, Vector(0))) + } + + "detect non-serializable events" in { + val eventSourcedTestKit = createTestKit() + + val exc = intercept[IllegalArgumentException] { + eventSourcedTestKit.runCommand(TestCounter.IncrementWithNotSerializableEvent) + } + (exc.getMessage should startWith).regex("Event.*isn't serializable") + exc.getCause.getClass should ===(classOf[DisabledJavaSerializer.JavaSerializationException]) + } + + "detect non-serializable state" in { + val eventSourcedTestKit = createTestKit() + + val exc = intercept[IllegalArgumentException] { + eventSourcedTestKit.runCommand(TestCounter.IncrementWithNotSerializableState) + } + (exc.getMessage should include).regex("State.*isn't serializable") + exc.getCause.getClass should ===(classOf[DisabledJavaSerializer.JavaSerializationException]) + } + + "detect non-serializable empty state" in { + val eventSourcedTestKit = + EventSourcedBehaviorTestKit[TestCounter.Command, TestCounter.Event, TestCounter.State]( + system, + TestCounter(persistenceId, NotSerializableState(0, Vector.empty))) + + val exc = intercept[IllegalArgumentException] { + eventSourcedTestKit.runCommand(TestCounter.Increment) + } + (exc.getMessage should include).regex("Empty State.*isn't serializable") + exc.getCause.getClass should ===(classOf[DisabledJavaSerializer.JavaSerializationException]) + } + + "detect non-serializable command" in { + val eventSourcedTestKit = createTestKit() + + val exc = intercept[IllegalArgumentException] { + eventSourcedTestKit.runCommand(TestCounter.NotSerializableCommand) + } + (exc.getMessage should include).regex("Command.*isn't serializable") + exc.getCause.getClass should ===(classOf[DisabledJavaSerializer.JavaSerializationException]) + } + + "detect non-serializable reply" in { + val eventSourcedTestKit = createTestKit() + + val exc = intercept[IllegalArgumentException] { + eventSourcedTestKit.runCommand(TestCounter.IncrementWithNotSerializableReply(_)) + } + (exc.getMessage should include).regex("Reply.*isn't serializable") + exc.getCause.getClass should ===(classOf[NotSerializableException]) + } + + "support test of replay" in { + val eventSourcedTestKit = createTestKit() + + eventSourcedTestKit.runCommand(TestCounter.Increment) + eventSourcedTestKit.runCommand(TestCounter.Increment) + val expectedState = TestCounter.RealState(3, Vector(0, 1, 2)) + eventSourcedTestKit.runCommand(TestCounter.Increment).state should ===(expectedState) + } + + "support test of replay from stored events" in { + val eventSourcedTestKit = createTestKit() + eventSourcedTestKit.persistenceTestKit + .persistForRecovery(persistenceId.id, List(TestCounter.Incremented(1), TestCounter.Incremented(1))) + eventSourcedTestKit.restart().state should ===(TestCounter.RealState(2, Vector(0, 1))) + } + + "support test of invalid events" in { + val eventSourcedTestKit = createTestKit() + eventSourcedTestKit.persistenceTestKit + .persistForRecovery(persistenceId.id, List(TestCounter.Incremented(1), TestCounter.Incremented(-1))) + intercept[IllegalStateException] { + eventSourcedTestKit.restart() + } + + } + + "only allow EventSourcedBehavior" in { + intercept[IllegalArgumentException] { + EventSourcedBehaviorTestKit[TestCounter.Command, TestCounter.Event, TestCounter.State]( + system, + Behaviors.empty[TestCounter.Command]) + } + } + + "support test of failures" in { + val eventSourcedTestKit = createTestKit() + eventSourcedTestKit.runCommand(TestCounter.Increment) + eventSourcedTestKit.runCommand(TestCounter.Increment) + eventSourcedTestKit.persistenceTestKit.failNextPersisted(persistenceId.id, TestException("DB err")) + LoggingTestKit.error[JournalFailureException].expect { + intercept[AssertionError] { + eventSourcedTestKit.runCommand(TestCounter.Increment) + } + } + + eventSourcedTestKit.restart().state should ===(TestCounter.RealState(2, Vector(0, 1))) + eventSourcedTestKit.runCommand(TestCounter.Increment).state should ===(TestCounter.RealState(3, Vector(0, 1, 2))) + } + + "have possibility to clear" in { + val eventSourcedTestKit = createTestKit() + eventSourcedTestKit.runCommand(TestCounter.Increment) + eventSourcedTestKit.runCommand(TestCounter.Increment).state should ===(TestCounter.RealState(2, Vector(0, 1))) + + eventSourcedTestKit.clear() + eventSourcedTestKit.runCommand(TestCounter.Increment).state should ===(TestCounter.RealState(1, Vector(0))) + } + } + +} diff --git a/akka-persistence-typed/src/main/mima-filters/2.6.4.backwards.excludes/issue-23712-testkit.excludes b/akka-persistence-typed/src/main/mima-filters/2.6.4.backwards.excludes/issue-23712-testkit.excludes new file mode 100644 index 0000000000..1e6c5bab15 --- /dev/null +++ b/akka-persistence-typed/src/main/mima-filters/2.6.4.backwards.excludes/issue-23712-testkit.excludes @@ -0,0 +1,6 @@ +# 23712 Internals for EventSourcedBehaviorTestKit +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.typed.internal.RequestingRecoveryPermit.stashInternal") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.typed.internal.ReplayingSnapshot.stashInternal") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.typed.internal.ReplayingEvents.stashInternal") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.typed.internal.StashManagement.stashInternal") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.typed.internal.Running.stashInternal") diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index db55ebf18d..5a4a926f3b 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -8,6 +8,7 @@ import java.util.UUID import java.util.concurrent.atomic.AtomicInteger import akka.actor.typed +import akka.actor.typed.ActorRef import akka.actor.typed.BackoffSupervisorStrategy import akka.actor.typed.Behavior import akka.actor.typed.BehaviorInterceptor @@ -57,6 +58,17 @@ private[akka] object EventSourcedBehaviorImpl { } final case class WriterIdentity(instanceId: Int, writerUuid: String) + /** + * Used by EventSourcedBehaviorTestKit to retrieve the `persistenceId`. + */ + final case class GetPersistenceId(replyTo: ActorRef[PersistenceId]) extends Signal + + /** + * Used by EventSourcedBehaviorTestKit to retrieve the state. + * Can't be a Signal because those are not stashed. + */ + final case class GetState[State](replyTo: ActorRef[State]) extends InternalProtocol + } @InternalApi @@ -113,6 +125,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( ctx.log.debug("Events successfully deleted to sequence number [{}].", toSequenceNr) case (_, DeleteEventsFailed(toSequenceNr, failure)) => ctx.log.warn2("Failed to delete events to sequence number [{}] due to: {}", toSequenceNr, failure.getMessage) + case (_, EventSourcedBehaviorImpl.GetPersistenceId(replyTo)) => replyTo ! persistenceId } // do this once, even if the actor is restarted diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index 4049b6ff5f..da06fb1294 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -20,6 +20,7 @@ import akka.persistence.typed.EventsSeq import akka.persistence.typed.RecoveryCompleted import akka.persistence.typed.RecoveryFailed import akka.persistence.typed.SingleEventSeq +import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetState import akka.persistence.typed.internal.ReplayingEvents.ReplayingState import akka.persistence.typed.internal.Running.WithSeqNrAccessible import akka.util.OptionVal @@ -87,11 +88,12 @@ private[akka] final class ReplayingEvents[C, E, S]( override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = { msg match { - case JournalResponse(r) => onJournalResponse(r) - case SnapshotterResponse(r) => onSnapshotterResponse(r) - case RecoveryTickEvent(snap) => onRecoveryTick(snap) - case cmd: IncomingCommand[C] => onCommand(cmd) - case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit + case JournalResponse(r) => onJournalResponse(r) + case SnapshotterResponse(r) => onSnapshotterResponse(r) + case RecoveryTickEvent(snap) => onRecoveryTick(snap) + case cmd: IncomingCommand[C] => onCommand(cmd) + case get: GetState[S @unchecked] => stashInternal(get) + case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit } } @@ -162,7 +164,6 @@ private[akka] final class ReplayingEvents[C, E, S]( Behaviors.unhandled } else { stashInternal(cmd) - Behaviors.same } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala index efd3db7055..e71ffec70b 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala @@ -12,6 +12,7 @@ import akka.persistence._ import akka.persistence.SnapshotProtocol.LoadSnapshotFailed import akka.persistence.SnapshotProtocol.LoadSnapshotResult import akka.persistence.typed.RecoveryFailed +import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetState import akka.util.unused /** @@ -66,7 +67,8 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup Behaviors.unhandled } else onCommand(cmd) - case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit + case get: GetState[S @unchecked] => stashInternal(get) + case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit } .receiveSignal(returnPermitOnStop.orElse { case (_, PoisonPill) => @@ -118,7 +120,6 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup def onCommand(cmd: IncomingCommand[C]): Behavior[InternalProtocol] = { // during recovery, stash all incoming commands stashInternal(cmd) - Behaviors.same } def onJournalResponse(response: JournalProtocol.Response): Behavior[InternalProtocol] = { diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/RequestingRecoveryPermit.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/RequestingRecoveryPermit.scala index fc8552e93b..93cd47e653 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/RequestingRecoveryPermit.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/RequestingRecoveryPermit.scala @@ -53,7 +53,6 @@ private[akka] class RequestingRecoveryPermit[C, E, S](override val setup: Behavi Behaviors.unhandled } else { stashInternal(other) - Behaviors.same } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index 480bd80503..a9174e5440 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -36,6 +36,7 @@ import akka.persistence.typed.SnapshotCompleted import akka.persistence.typed.SnapshotFailed import akka.persistence.typed.SnapshotMetadata import akka.persistence.typed.SnapshotSelectionCriteria +import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetState import akka.persistence.typed.internal.Running.WithSeqNrAccessible import akka.persistence.typed.scaladsl.Effect import akka.util.unused @@ -104,6 +105,7 @@ private[akka] object Running { case IncomingCommand(c: C @unchecked) => onCommand(state, c) case JournalResponse(r) => onDeleteEventsJournalResponse(r, state.state) case SnapshotterResponse(r) => onDeleteSnapshotResponse(r, state.state) + case get: GetState[S @unchecked] => onGetState(get) case _ => Behaviors.unhandled } @@ -121,6 +123,12 @@ private[akka] object Running { applyEffects(cmd, state, effect.asInstanceOf[EffectImpl[E, S]]) // TODO can we avoid the cast? } + // Used by EventSourcedBehaviorTestKit to retrieve the state. + def onGetState(get: GetState[S]): Behavior[InternalProtocol] = { + get.replyTo ! state.state + this + } + @tailrec def applyEffects( msg: Any, state: RunningState[S], @@ -236,6 +244,7 @@ private[akka] object Running { msg match { case JournalResponse(r) => onJournalResponse(r) case in: IncomingCommand[C @unchecked] => onCommand(in) + case get: GetState[S @unchecked] => stashInternal(get) case SnapshotterResponse(r) => onDeleteSnapshotResponse(r, visibleState.state) case RecoveryTickEvent(_) => Behaviors.unhandled case RecoveryPermitGranted => Behaviors.unhandled @@ -249,7 +258,6 @@ private[akka] object Running { Behaviors.unhandled } else { stashInternal(cmd) - this } } @@ -348,7 +356,6 @@ private[akka] object Running { Behaviors.unhandled } else { stashInternal(cmd) - Behaviors.same } } @@ -406,6 +413,8 @@ private[akka] object Running { case _ => onDeleteSnapshotResponse(response, state.state) } + case get: GetState[S @unchecked] => + stashInternal(get) case _ => Behaviors.unhandled } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/StashManagement.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/StashManagement.scala index d474038de8..49df6ae06b 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/StashManagement.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/StashManagement.scala @@ -7,6 +7,7 @@ package akka.persistence.typed.internal import akka.actor.Dropped import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.ActorContext +import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.LoggerOps import akka.actor.typed.scaladsl.StashBuffer import akka.actor.typed.scaladsl.StashOverflowException @@ -29,8 +30,10 @@ private[akka] trait StashManagement[C, E, S] { /** * Stash a command to the internal stash buffer, which is used while waiting for persist to be completed. */ - protected def stashInternal(msg: InternalProtocol): Unit = + protected def stashInternal(msg: InternalProtocol): Behavior[InternalProtocol] = { stash(msg, stashState.internalStashBuffer) + Behaviors.same + } /** * Stash a command to the user stash buffer, which is used when `Stash` effect is used. diff --git a/build.sbt b/build.sbt index eb929573fd..63c5e1c222 100644 --- a/build.sbt +++ b/build.sbt @@ -281,6 +281,7 @@ lazy val persistenceTestkit = akkaModule("akka-persistence-testkit") .dependsOn( persistenceTyped % "compile->compile;provided->provided;test->test", testkit % "compile->compile;test->test", + actorTestkitTyped, persistenceTck % "test") .settings(Dependencies.persistenceTestKit) .settings(AutomaticModuleName.settings("akka.persistence.testkit")) @@ -450,6 +451,7 @@ lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed") actorTestkitTyped % "test->test", actorTypedTests % "test->test", persistenceTyped % "test->test", + persistenceTestkit % "test->test", remote % "compile->CompileJdk9;test->test", remoteTests % "test->test", remoteTests % "test->test;multi-jvm->multi-jvm", diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 7efec97842..3dbe4a905a 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -243,7 +243,7 @@ object Dependencies { Provided.levelDB, Provided.levelDBNative) - val persistenceTestKit = l ++= Seq(Test.scalatest) + val persistenceTestKit = l ++= Seq(Test.scalatest, Test.logback) val persistenceShared = l ++= Seq(Provided.levelDB, Provided.levelDBNative) diff --git a/project/project-info.conf b/project/project-info.conf index ef6cf88c1e..0dadd1851b 100644 --- a/project/project-info.conf +++ b/project/project-info.conf @@ -407,6 +407,27 @@ project-info { } ] } + akka-persistence-testkit: ${project-info.shared-info} { + title: "Akka Persistence Testkit" + jpms-name: "akka.persistence.testkit" + levels: [ + { + readiness: Incubating + since: "2020-04-30" + since-version: "2.6.5" + } + ] + api-docs: [ + { + url: ${project-info.scaladoc}"persistence/testkit/scaladsl/index.html" + text: "API (Scaladoc)" + } + { + url: ${project-info.javadoc}"?akka/persistence/testkit/javadsl/package-summary.html" + text: "API (Javadoc)" + } + ] + } akka-remote: ${project-info.shared-info} { title: "Akka Remoting" jpms-name: "akka.remote"