Publish operations from InmemJournal (#28332)

* small feature that is useful for verifying that expected events were persisted
* doc example
* also enable serialization test config
This commit is contained in:
Patrik Nordwall 2020-01-13 09:46:43 +01:00 committed by GitHub
parent 4749b11be8
commit cdc45c128d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 177 additions and 10 deletions

View file

@ -7,6 +7,7 @@ package jdocs.akka.cluster.sharding.typed;
// #test
import java.math.BigDecimal;
import java.util.UUID;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
@ -20,6 +21,12 @@ import akka.persistence.typed.PersistenceId;
// #test
// #test-events
import akka.actor.typed.eventstream.EventStream;
import akka.persistence.journal.inmem.InmemJournal;
// #test-events
import org.scalatest.junit.JUnitSuite;
import static jdocs.akka.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity;
@ -33,7 +40,9 @@ public class AccountExampleDocTest
// #inmem-config
private static final String inmemConfig =
"akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n";
"akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n"
+ "akka.persistence.journal.inmem.test-serialization = on \n";
// #inmem-config
// #snapshot-store-config
@ -96,5 +105,33 @@ public class AccountExampleDocTest
BigDecimal.valueOf(100),
getProbe.expectMessageClass(AccountEntity.CurrentBalance.class).balance);
}
// #test
// #test-events
@Test
public void storeEvents() {
TestProbe<InmemJournal.Operation> eventProbe = testKit.createTestProbe();
testKit
.system()
.eventStream()
.tell(new EventStream.Subscribe<>(InmemJournal.Operation.class, eventProbe.getRef()));
ActorRef<AccountEntity.Command> ref =
testKit.spawn(AccountEntity.create("4", PersistenceId.of("Account", "4")));
TestProbe<AccountEntity.OperationResult> probe =
testKit.createTestProbe(AccountEntity.OperationResult.class);
ref.tell(new AccountEntity.CreateAccount(probe.getRef()));
assertEquals(
AccountEntity.AccountCreated.INSTANCE,
eventProbe.expectMessageClass(InmemJournal.Write.class).event());
ref.tell(new AccountEntity.Deposit(BigDecimal.valueOf(100), probe.getRef()));
assertEquals(
BigDecimal.valueOf(100),
((AccountEntity.Deposited) eventProbe.expectMessageClass(InmemJournal.Write.class).event())
.amount);
}
// #test
// #test-events
}
// #test

View file

@ -179,7 +179,7 @@ public class AccountExampleTest extends JUnitSuite {
.serializationTestKit()
.verifySerialization(new CurrentBalance(BigDecimal.valueOf(100)), false);
testKit.serializationTestKit().verifySerialization(new AccountCreated(), false);
testKit.serializationTestKit().verifySerialization(AccountCreated.INSTANCE, false);
testKit
.serializationTestKit()
.verifySerialization(new Deposited(BigDecimal.valueOf(100)), false);

View file

@ -120,7 +120,9 @@ public interface AccountExampleWithEventHandlersInState {
// Event
interface Event extends CborSerializable {}
public static class AccountCreated implements Event {}
public enum AccountCreated implements Event {
INSTANCE
}
public static class Deposited implements Event {
public final BigDecimal amount;
@ -219,7 +221,7 @@ public interface AccountExampleWithEventHandlersInState {
private ReplyEffect<Event, Account> createAccount(EmptyAccount account, CreateAccount command) {
return Effect()
.persist(new AccountCreated())
.persist(AccountCreated.INSTANCE)
.thenReply(command.replyTo, account2 -> Confirmed.INSTANCE);
}

View file

@ -114,7 +114,9 @@ public interface AccountExampleWithMutableState {
// Event
interface Event extends CborSerializable {}
public static class AccountCreated implements Event {}
public enum AccountCreated implements Event {
INSTANCE
}
public static class Deposited implements Event {
public final BigDecimal amount;
@ -212,7 +214,7 @@ public interface AccountExampleWithMutableState {
private ReplyEffect<Event, Account> createAccount(EmptyAccount account, CreateAccount command) {
return Effect()
.persist(new AccountCreated())
.persist(AccountCreated.INSTANCE)
.thenReply(command.replyTo, account2 -> Confirmed.INSTANCE);
}

View file

@ -114,7 +114,9 @@ public interface AccountExampleWithNullState {
// Event
interface Event extends CborSerializable {}
public static class AccountCreated implements Event {}
public enum AccountCreated implements Event {
INSTANCE
}
public static class Deposited implements Event {
public final BigDecimal amount;
@ -211,7 +213,7 @@ public interface AccountExampleWithNullState {
private ReplyEffect<Event, Account> createAccount(CreateAccount command) {
return Effect()
.persist(new AccountCreated())
.persist(AccountCreated.INSTANCE)
.thenReply(command.replyTo, account2 -> Confirmed.INSTANCE);
}

View file

@ -14,6 +14,12 @@ import org.scalatest.WordSpecLike
//#test
//#test-events
import akka.persistence.journal.inmem.InmemJournal
import akka.actor.typed.eventstream.EventStream
//#test-events
import docs.akka.cluster.sharding.typed.AccountExampleWithEventHandlersInState.AccountEntity
object AccountExampleDocSpec {
@ -21,6 +27,7 @@ object AccountExampleDocSpec {
//#inmem-config
"""
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
akka.persistence.journal.inmem.test-serialization = on
"""
//#inmem-config
@ -76,6 +83,24 @@ class AccountExampleDocSpec extends ScalaTestWithActorTestKit(s"""
ref ! AccountEntity.GetBalance(getProbe.ref)
getProbe.expectMessage(AccountEntity.CurrentBalance(100))
}
//#test
//#test-events
"store events" in {
val eventProbe = createTestProbe[InmemJournal.Operation]()
system.eventStream ! EventStream.Subscribe(eventProbe.ref)
val probe = createTestProbe[AccountEntity.OperationResult]()
val ref = spawn(AccountEntity("4", PersistenceId("Account", "4")))
ref ! AccountEntity.CreateAccount(probe.ref)
eventProbe.expectMessageType[InmemJournal.Write].event should ===(AccountEntity.AccountCreated)
ref ! AccountEntity.Deposit(100, probe.ref)
probe.expectMessage(AccountEntity.Confirmed)
eventProbe.expectMessageType[InmemJournal.Write].event should ===(AccountEntity.Deposited(100))
}
//#test-events
//#test
}
}
//#test

View file

@ -31,6 +31,8 @@ Scala
Java
: @@snip [AccountExampleDocTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java) { #inmem-config }
The `test-serialization = on` configuration of the `InmemJournal` will verify that persisted events can be serialized and deserialized.
Optionally you can also configure a snapshot store. To enable the file based snapshot store you need to pass the
following configuration to the @scala[`ScalaTestWithActorTestKit`]@java[`TestKitJunitResource`].
@ -53,6 +55,16 @@ Java
Note that each test case is using a different `PersistenceId` to not interfere with each other.
The @apidoc[akka.persistence.journal.inmem.InmemJournal$] publishes `Write` and `Delete` operations to the
`eventStream`, which makes it possible to verify that the expected events have been emitted and stored by the
`EventSourcedBehavior`. You can subscribe to to the `eventStream` with a `TestProbe` like this:
Scala
: @@snip [AccountExampleDocSpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/AccountExampleDocSpec.scala) { #test-events }
Java
: @@snip [AccountExampleDocTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/AccountExampleDocTest.java) { #test-events }
## Integration testing
The in-memory journal and file based snapshot store can be used also for integration style testing of a single

View file

@ -9,6 +9,8 @@ import scala.concurrent.Future
import scala.util.Try
import scala.util.control.NonFatal
import akka.annotation.ApiMayChange
import akka.annotation.InternalApi
import akka.persistence.journal.AsyncWriteJournal
import akka.persistence.PersistentRepr
import akka.persistence.AtomicWrite
@ -17,12 +19,28 @@ import akka.serialization.Serializers
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
/**
* The InmemJournal publishes writes and deletes to the `eventStream`, which tests may use to
* verify that expected events have been persisted or deleted.
*
* InmemJournal is only intended to be used for tests and therefore binary backwards compatibility
* of the published messages are not guaranteed.
*/
@ApiMayChange
object InmemJournal {
sealed trait Operation
final case class Write(event: Any, persistenceId: String, sequenceNr: Long) extends Operation
final case class Delete(persistenceId: String, toSequenceNr: Long) extends Operation
}
/**
* INTERNAL API.
*
* In-memory journal for testing purposes only.
*/
private[persistence] class InmemJournal(cfg: Config) extends AsyncWriteJournal with InmemMessages {
@InternalApi private[persistence] class InmemJournal(cfg: Config) extends AsyncWriteJournal with InmemMessages {
def this() = this(ConfigFactory.empty())
@ -34,11 +52,14 @@ private[persistence] class InmemJournal(cfg: Config) extends AsyncWriteJournal w
private val serialization = SerializationExtension(context.system)
private val eventStream = context.system.eventStream
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
try {
for (w <- messages; p <- w.payload) {
verifySerialization(p.payload)
add(p)
eventStream.publish(InmemJournal.Write(p.payload, p.persistenceId, p.sequenceNr))
}
Future.successful(Nil) // all good
} catch {
@ -67,6 +88,7 @@ private[persistence] class InmemJournal(cfg: Config) extends AsyncWriteJournal w
delete(persistenceId, snr)
snr += 1
}
eventStream.publish(InmemJournal.Delete(persistenceId, toSeqNr))
Future.successful(())
}
@ -84,7 +106,7 @@ private[persistence] class InmemJournal(cfg: Config) extends AsyncWriteJournal w
/**
* INTERNAL API.
*/
private[persistence] trait InmemMessages {
@InternalApi private[persistence] trait InmemMessages {
// persistenceId -> persistent message
var messages = Map.empty[String, Vector[PersistentRepr]]
// persistenceId -> highest used sequence number

View file

@ -0,0 +1,65 @@
/*
* Copyright (C) 2017-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.journal.inmem
import akka.actor.Props
import akka.persistence.PersistenceSpec
import akka.persistence.PersistentActor
import akka.testkit._
object InmemJournalSpec {
def testProps(name: String): Props =
Props(new TestPersistentActor(name))
final case class Cmd(s: String)
final case class Delete(toSeqNr: Long)
final case class Evt(s: String)
class TestPersistentActor(name: String) extends PersistentActor {
override def persistenceId: String = name
override def receiveRecover: Receive = {
case Evt(_) =>
}
override def receiveCommand: Receive = {
case Cmd(s) => persist(Evt(s))(_ => ())
case Delete(toSeqNr) => deleteMessages(toSeqNr)
}
}
}
class InmemJournalSpec
extends PersistenceSpec(PersistenceSpec.config("inmem", "InmemJournalSpec"))
with ImplicitSender {
import InmemJournalSpec._
system.eventStream.subscribe(testActor, classOf[InmemJournal.Operation])
"InmemJournal" must {
"publish writes" in {
val p1 = system.actorOf(testProps("p1"))
p1 ! Cmd("A")
p1 ! Cmd("B")
expectMsg(InmemJournal.Write(Evt("A"), "p1", 1L))
expectMsg(InmemJournal.Write(Evt("B"), "p1", 2L))
}
"publish deletes" in {
val p1 = system.actorOf(testProps("p2"))
p1 ! Cmd("A")
p1 ! Cmd("B")
p1 ! Cmd("C")
p1 ! Delete(2)
expectMsg(InmemJournal.Write(Evt("A"), "p2", 1L))
expectMsg(InmemJournal.Write(Evt("B"), "p2", 2L))
expectMsg(InmemJournal.Write(Evt("C"), "p2", 3L))
expectMsg(InmemJournal.Delete("p2", 2L))
}
}
}