Typed persistent event adapters/wrappers (#25050)

Typed persistent event adapters/wrappers
This commit is contained in:
Christopher Batey 2018-05-25 10:23:04 +01:00 committed by GitHub
parent c8064d069b
commit 8eb7b1ea81
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 507 additions and 66 deletions

View file

@ -19,6 +19,9 @@ private[akka] object ActorTestKitGuardian {
sealed trait TestKitCommand
final case class SpawnActor[T](name: String, behavior: Behavior[T], replyTo: ActorRef[ActorRef[T]], props: Props) extends TestKitCommand
final case class SpawnActorAnonymous[T](behavior: Behavior[T], replyTo: ActorRef[ActorRef[T]], props: Props) extends TestKitCommand
final case class StopActor[T](ref: ActorRef[T], replyTo: ActorRef[Ack.type]) extends TestKitCommand
final case object Ack
val testKitGuardian: Behavior[TestKitCommand] = Behaviors.receive[TestKitCommand] {
case (ctx, SpawnActor(name, behavior, reply, props))
@ -27,6 +30,10 @@ private[akka] object ActorTestKitGuardian {
case (ctx, SpawnActorAnonymous(behavior, reply, props))
reply ! ctx.spawnAnonymous(behavior, props)
Behaviors.same
case (ctx, StopActor(ref, reply))
ctx.stop(ref)
reply ! Ack
Behaviors.same
}
}
@ -43,7 +50,7 @@ private[akka] object TestKitUtils {
val startFrom = classToStartFrom.getName
val filteredStack = Thread.currentThread.getStackTrace.toIterator
.map(_.getClassName)
// drop until we find the first occurence of classToStartFrom
// drop until we find the first occurrence of classToStartFrom
.dropWhile(!_.startsWith(startFrom))
// then continue to the next entry after classToStartFrom that makes sense
.dropWhile {

View file

@ -11,7 +11,7 @@ import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.internal.{ ActorTestKitGuardian, TestKitUtils }
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.Await
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
object ActorTestKit {

View file

@ -4,14 +4,12 @@
package akka.actor.typed;
import akka.actor.*;
import akka.actor.setup.ActorSystemSetup;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import java.util.Optional;
import java.util.function.Function;
import static junit.framework.TestCase.assertSame;
@ -107,5 +105,4 @@ public class ExtensionsTest extends JUnitSuite {
}
}
}

View file

@ -29,6 +29,7 @@ import akka.japi.{ Pair ⇒ JPair }
def scalaAnyToNone[A, B]: A Option[B] = none
def scalaAnyTwoToNone[A, B, C]: (A, B) Option[C] = two2none
def scalaAnyTwoToUnit[A, B]: (A, B) Unit = two2unit
def scalaAnyThreeToUnit[A, B, C]: (A, B, C) Unit = three2unit
def scalaAnyTwoToTrue[A, B]: (A, B) Boolean = two2true
def scalaAnyThreeToFalse[A, B, C]: (A, B, C) Boolean = three2false
def scalaAnyThreeToThird[A, B, C]: (A, B, C) C = three2third.asInstanceOf[(A, B, C) C]
@ -53,6 +54,8 @@ import akka.japi.{ Pair ⇒ JPair }
private val two2unit = (_: Any, _: Any) ()
private val three2unit = (_: Any, _: Any, _: Any) ()
private val three2false = (_: Any, _: Any, _: Any) false
private val three2third = (_: Any, _: Any, third: Any) third

View file

@ -30,7 +30,7 @@ This module is currently marked as @ref:[may change](../common/may-change.md) in
Let's start with a simple example. The minimum required for a `PersistentBehavior` is:
Scala
: @@snip [BasicPersistentBehaviorsSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsSpec.scala) { #structure }
: @@snip [BasicPersistentBehaviorsCompileOnly.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala) { #structure }
Java
: @@snip [BasicPersistentBehaviorsTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java) { #structure }
@ -216,7 +216,7 @@ Since it is strongly discouraged to perform side effects in applyEvent,
side effects should be performed once recovery has completed @scala[in the `onRecoveryCompleted` callback.] @java[by overriding `onRecoveryCompleted`]
Scala
: @@snip [BasicPersistentBehaviorsSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsSpec.scala) { #recovery }
: @@snip [BasicPersistentBehaviorsCompileOnly.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala) { #recovery }
Java
: @@snip [BasicPersistentBehaviorsTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java) { #recovery }
@ -228,18 +228,40 @@ The `onRecoveryCompleted` takes on an `ActorContext` and the current `State`.
Persistence typed allows you to use event tags without using @ref[`EventAdapter`](../persistence.md#event-adapters):
Scala
: @@snip [BasicPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsSpec.scala) { #tagging }
: @@snip [BasicPersistentActorCompileOnly.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala) { #tagging }
Java
: @@snip [BasicPersistentBehaviorsTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java) { #tagging }
## Event adapters
Event adapters can be programmatically added to your `PersistentBehavior`s that can convert from your `Event` type
to another type that is then passed to the journal.
Defining an event adapter is done by extending an EventAdapter:
Scala
: @@snip [x]($akka$/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala) { #event-wrapper }
Java
: @@snip [x]($akka$/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #event-wrapper }
Then install it on a persistent behavior:
Scala
: @@snip [x]($akka$/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala) { #install-event-adapter }
Java
: @@snip [x]($akka$/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #install-event-adapter }
## Wrapping Persistent Behaviors
When creating a `PersistentBehavior`, it is possible to wrap `PersistentBehavior` in
other behaviors such as `Behaviors.setup` in order to access the `ActorContext` object. For instance
to access the actor logging upon taking snapshots for debug purpose.
Scala
: @@snip [BasicPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsSpec.scala) { #wrapPersistentBehavior }
: @@snip [BasicPersistentActorCompileOnly.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala) { #wrapPersistentBehavior }
Java
: @@snip [BasicPersistentBehaviorsTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java) { #wrapPersistentBehavior }

View file

@ -0,0 +1,42 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed
import akka.annotation.InternalApi
abstract class EventAdapter[E, P] {
/**
* Type of the event to persist
*/
type Per = P
/**
* Transform event on the way to the journal
*/
def toJournal(e: E): Per
/**
* Transform the event on recovery from the journal.
* Note that this is not called in any read side so will need to be applied
* manually when using Query.
*/
def fromJournal(p: Per): E
}
/**
* INTERNAL API
*/
@InternalApi private[akka] object NoOpEventAdapter {
private val i = new NoOpEventAdapter[Nothing]
def instance[E]: NoOpEventAdapter[E] = i.asInstanceOf[NoOpEventAdapter[E]]
}
/**
* INTERNAL API
*/
@InternalApi private[akka] class NoOpEventAdapter[E] extends EventAdapter[E, Any] {
override def toJournal(e: E): Any = e
override def fromJournal(p: Any): E = p.asInstanceOf[E]
}

View file

@ -79,7 +79,7 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set
try {
response match {
case ReplayedMessage(repr)
val event = repr.payload.asInstanceOf[E]
val event = setup.eventAdapter.fromJournal(repr.payload.asInstanceOf[setup.eventAdapter.Per])
try {
val newState = state.copy(

View file

@ -4,6 +4,7 @@
package akka.persistence.typed.internal
import akka.Done
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.MutableBehavior
@ -16,6 +17,7 @@ import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._
import scala.annotation.tailrec
import scala.collection.immutable
import scala.util.{ Failure, Success }
/**
* INTERNAL API
@ -99,7 +101,8 @@ private[akka] object EventsourcedRunning {
// the invalid event, in case such validation is implemented in the event handler.
// also, ensure that there is an event handler for each single event
val newState = state.applyEvent(setup, event)
val eventToPersist = tagEvent(event)
val eventToPersist = adaptEvent(event)
val newState2 = internalPersist(newState, eventToPersist)
@ -120,7 +123,7 @@ private[akka] object EventsourcedRunning {
(currentState.applyEvent(setup, event), shouldSnapshot)
}
val eventsToPersist = events.map(tagEvent)
val eventsToPersist = events.map(adaptEvent)
val newState2 = internalPersistAll(eventsToPersist, newState)
@ -143,9 +146,13 @@ private[akka] object EventsourcedRunning {
}
}
def tagEvent(event: E): Any = {
def adaptEvent(event: E): Any = {
val tags = setup.tagger(event)
if (tags.isEmpty) event else Tagged(event, tags)
val adaptedEvent = setup.eventAdapter.toJournal(event)
if (tags.isEmpty)
adaptedEvent
else
Tagged(adaptedEvent, tags)
}
setup.setMdc(runningCmdsMdc)
@ -265,11 +272,11 @@ private[akka] object EventsourcedRunning {
outer: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
response match {
case SaveSnapshotSuccess(meta)
setup.context.log.debug("Save snapshot successful, snapshot metadata: [{}]", meta)
setup.onSnapshot(commandContext, meta, Success(Done))
outer
case SaveSnapshotFailure(meta, ex)
setup.context.log.error(ex, "Save snapshot failed, snapshot metadata: [{}]", meta)
outer // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop
setup.onSnapshot(commandContext, meta, Failure(ex))
outer
// FIXME not implemented
case DeleteSnapshotFailure(_, _) ???

View file

@ -4,17 +4,21 @@
package akka.persistence.typed.internal
import akka.Done
import akka.actor.typed.Logger
import akka.actor.{ ActorRef, ExtendedActorSystem }
import akka.actor.typed.scaladsl.{ ActorContext, StashBuffer, TimerScheduler }
import akka.annotation.InternalApi
import akka.persistence._
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.internal.EventsourcedBehavior.MDC
import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity }
import akka.persistence.typed.scaladsl.PersistentBehaviors
import akka.util.Collections.EmptyImmutableSeq
import akka.util.OptionVal
import scala.util.Try
/**
* INTERNAL API: Carry state for the Persistent behavior implementation behaviors
*/
@ -28,7 +32,9 @@ private[persistence] final class EventsourcedSetup[C, E, S](
val eventHandler: (S, E) S,
val writerIdentity: WriterIdentity,
val recoveryCompleted: (ActorContext[C], S) Unit,
val onSnapshot: (ActorContext[C], SnapshotMetadata, Try[Done]) Unit,
val tagger: E Set[String],
val eventAdapter: EventAdapter[E, _],
val snapshotWhen: (S, E, Long) Boolean,
val recovery: Recovery,
var holdingRecoveryPermit: Boolean,

View file

@ -4,15 +4,32 @@
package akka.persistence.typed.internal
import akka.Done
import akka.actor.typed
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
import akka.annotation.InternalApi
import akka.persistence._
import akka.persistence.typed.{ EventAdapter, NoOpEventAdapter }
import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity }
import akka.persistence.typed.scaladsl.{ PersistentBehavior, PersistentBehaviors }
import akka.persistence.typed.scaladsl._
import akka.util.ConstantFun
import scala.util.{ Failure, Success, Try }
@InternalApi
private[akka] object PersistentBehaviorImpl {
def defaultOnSnapshot[A](ctx: ActorContext[A], meta: SnapshotMetadata, result: Try[Done]): Unit = {
result match {
case Success(_)
ctx.log.debug("Save snapshot successful, snapshot metadata: [{}]", meta)
case Failure(t)
ctx.log.error(t, "Save snapshot failed, snapshot metadata: [{}]", meta)
}
}
}
@InternalApi
private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
persistenceId: String,
@ -23,8 +40,10 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
snapshotPluginId: Option[String] = None,
recoveryCompleted: (ActorContext[Command], State) Unit = ConstantFun.scalaAnyTwoToUnit,
tagger: Event Set[String] = (_: Event) Set.empty[String],
eventAdapter: EventAdapter[Event, _] = NoOpEventAdapter.instance[Event],
snapshotWhen: (State, Event, Long) Boolean = ConstantFun.scalaAnyThreeToFalse,
recovery: Recovery = Recovery()
recovery: Recovery = Recovery(),
onSnapshot: (ActorContext[Command], SnapshotMetadata, Try[Done]) Unit = PersistentBehaviorImpl.defaultOnSnapshot[Command] _
) extends PersistentBehavior[Command, Event, State] with EventsourcedStashReferenceManagement {
override def apply(context: typed.ActorContext[Command]): Behavior[Command] = {
@ -45,7 +64,9 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
eventHandler,
WriterIdentity.newIdentity(),
recoveryCompleted,
onSnapshot,
tagger,
eventAdapter,
snapshotWhen,
recovery,
holdingRecoveryPermit = false,
@ -126,4 +147,16 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
def withTagger(tagger: Event Set[String]): PersistentBehavior[Command, Event, State] =
copy(tagger = tagger)
/**
* Adapt the event before sending to the journal e.g. wrapping the event in a type
* the journal understands
*/
def eventAdapter(adapter: EventAdapter[Event, _]): PersistentBehavior[Command, Event, State] =
copy(eventAdapter = adapter)
/**
* The `callback` function is called to notify the actor that a snapshot has finished
*/
def onSnapshot(callback: (ActorContext[Command], SnapshotMetadata, Try[Done]) Unit): PersistentBehavior[Command, Event, State] =
copy(onSnapshot = callback)
}

View file

@ -73,7 +73,7 @@ final class EventHandlerBuilder[Event, State >: Null]() {
}
result match {
case OptionVal.None throw new MatchError(s"No match found for event [${event.getClass}] and state [${state.getClass}]")
case OptionVal.None throw new MatchError(s"No match found for event [${event.getClass}] and state [${state.getClass}]. Has this event been stored using an EventAdapter?")
case OptionVal.Some(s) s
}
}

View file

@ -4,16 +4,19 @@
package akka.persistence.typed.javadsl
import java.util.Collections
import java.util.{ Collections, Optional }
import akka.actor.typed
import akka.actor.typed.Behavior
import akka.actor.typed.Behavior.DeferredBehavior
import akka.actor.typed.javadsl.ActorContext
import akka.annotation.ApiMayChange
import akka.persistence.typed._
import akka.annotation.{ ApiMayChange, InternalApi }
import akka.persistence.SnapshotMetadata
import akka.persistence.typed.{ EventAdapter, _ }
import akka.persistence.typed.internal._
import scala.util.{ Failure, Success }
/** Java API */
@ApiMayChange
abstract class PersistentBehavior[Command, Event, State >: Null](val persistenceId: String) extends DeferredBehavior[Command] {
@ -78,6 +81,21 @@ abstract class PersistentBehavior[Command, Event, State >: Null](val persistence
*/
def onRecoveryCompleted(ctx: ActorContext[Command], state: State): Unit = {}
/**
* Override to get notified when a snapshot is finished.
* The default implementation logs failures at error and success writes at
* debug.
*
* @param result None if successful otherwise contains the exception thrown when snapshotting
*/
def onSnapshot(ctx: ActorContext[Command], meta: SnapshotMetadata, result: Optional[Throwable]): Unit = {
if (result.isPresent) {
ctx.getLog.error(result.get(), "Save snapshot failed, snapshot metadata: [{}]", meta)
} else {
ctx.getLog.debug("Save snapshot successful, snapshot metadata: [{}]", meta)
}
}
/**
* Override and define that snapshot should be saved every N events.
*
@ -105,6 +123,8 @@ abstract class PersistentBehavior[Command, Event, State >: Null](val persistence
*/
def tagsFor(event: Event): java.util.Set[String] = Collections.emptySet()
def eventAdapter(): EventAdapter[Event, _] = NoOpEventAdapter.instance[Event]
/**
* INTERNAL API: DeferredBehavior init
*/
@ -133,6 +153,12 @@ abstract class PersistentBehavior[Command, Event, State >: Null](val persistence
.onRecoveryCompleted((ctx, state) onRecoveryCompleted(ctx.asJava, state))
.snapshotWhen(snapshotWhen)
.withTagger(tagger)
.onSnapshot((ctx, meta, result) {
onSnapshot(ctx.asJava, meta, result match {
case Success(_) Optional.empty()
case Failure(t) Optional.of(t)
})
}).eventAdapter(eventAdapter())
}
}

View file

@ -4,12 +4,16 @@
package akka.persistence.typed.scaladsl
import akka.Done
import akka.actor.typed.Behavior.DeferredBehavior
import akka.actor.typed.scaladsl.ActorContext
import akka.annotation.InternalApi
import akka.persistence._
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.internal._
import scala.util.Try
object PersistentBehaviors {
// we use this type internally, however it's easier for users to understand the function, so we use it in external api
@ -69,6 +73,11 @@ trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command
*/
def onRecoveryCompleted(callback: (ActorContext[Command], State) Unit): PersistentBehavior[Command, Event, State]
/**
* The `callback` function is called to notify when a snapshot is complete.
*/
def onSnapshot(callback: (ActorContext[Command], SnapshotMetadata, Try[Done]) Unit): PersistentBehavior[Command, Event, State]
/**
* Initiates a snapshot if the given function returns true.
* When persisting multiple events at once the snapshot is triggered after all the events have
@ -108,4 +117,11 @@ trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command
* The `tagger` function should give event tags, which will be used in persistence query
*/
def withTagger(tagger: Event Set[String]): PersistentBehavior[Command, Event, State]
/**
* Transform the event in another type before giving to the journal. Can be used to wrap events
* in types Journals understand but is of a different type than `Event`.
*/
def eventAdapter(adapter: EventAdapter[Event, _]): PersistentBehavior[Command, Event, State]
}

View file

@ -6,6 +6,7 @@ package akka.persistence.typed.javadsl;
import akka.actor.Scheduler;
import akka.actor.typed.ActorRef;
import akka.persistence.typed.EventAdapter;
import akka.actor.testkit.typed.javadsl.TestInbox;
import akka.util.Timeout;
@ -17,7 +18,32 @@ import static akka.actor.typed.javadsl.AskPattern.ask;
public class PersistentActorCompileOnlyTest {
public static abstract class Simple {
//#event-wrapper
public static class Wrapper<T> {
private final T t;
public Wrapper(T t) {
this.t = t;
}
public T getT() {
return t;
}
}
public static class EventAdapterExample extends EventAdapter<SimpleEvent, Wrapper<SimpleEvent>> {
@Override
public Wrapper<SimpleEvent> toJournal(SimpleEvent simpleEvent) {
return new Wrapper<>(simpleEvent);
}
@Override
public SimpleEvent fromJournal(Wrapper<SimpleEvent> simpleEventWrapper) {
return simpleEventWrapper.getT();
}
}
//#event-wrapper
//#command
public static class SimpleCommand {
public final String data;
@ -45,6 +71,7 @@ public class PersistentActorCompileOnlyTest {
SimpleState(List<String> events) {
this.events = events;
}
SimpleState() {
this.events = new ArrayList<>();
}
@ -79,7 +106,15 @@ public class PersistentActorCompileOnlyTest {
return (state, event) -> state.addEvent(event);
}
//#event-handler
//#install-event-adapter
@Override
public EventAdapter<SimpleEvent, Wrapper<SimpleEvent>> eventAdapter() {
return new EventAdapterExample();
}
//#install-event-adapter
};
//#behavior
}

View file

@ -8,35 +8,58 @@ import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.Signal;
import akka.actor.typed.SupervisorStrategy;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Adapter;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.japi.function.Function3;
import akka.persistence.typed.scaladsl.PersistentBehaviorSpec;
import akka.japi.function.Function;
import akka.persistence.SnapshotMetadata;
import akka.persistence.query.EventEnvelope;
import akka.persistence.query.NoOffset;
import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.Sequence;
import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal;
import akka.persistence.typed.EventAdapter;
import akka.persistence.typed.NoOpEventAdapter;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import java.io.Serializable;
import java.time.Duration;
import java.util.*;
import static akka.persistence.typed.scaladsl.PersistentBehaviorSpec.*;
import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
public class PersistentActorTest extends JUnitSuite {
public class PersistentActorJavaDslTest extends JUnitSuite {
public static final Config config = conf().withFallback(ConfigFactory.load());
@ClassRule
public static final TestKitJunitResource testKit = new TestKitJunitResource(PersistentBehaviorSpec.conf());
public static final TestKitJunitResource testKit = new TestKitJunitResource(config);
static final Incremented timeoutEvent = new Incremented(100);
static final State emptyState = new State(0, Collections.emptyList());
static final Incremented terminatedEvent = new Incremented(10);
public PersistentActorTest() {
super();
}
private LeveldbReadJournal queries = PersistenceQuery.get(Adapter.toUntyped(testKit.system()))
.getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier());
interface Command {
private ActorMaterializer materializer = ActorMaterializer.create(Adapter.toUntyped(testKit.system()));
interface Command extends Serializable {
}
public static class Increment implements Command {
@ -79,7 +102,7 @@ public class PersistentActorTest extends JUnitSuite {
}
}
public static class Incremented {
public static class Incremented implements Serializable {
private final int delta;
public Incremented(int delta) {
@ -108,7 +131,7 @@ public class PersistentActorTest extends JUnitSuite {
}
}
public static class State {
public static class State implements Serializable {
private final int value;
private final List<Integer> history;
@ -153,44 +176,78 @@ public class PersistentActorTest extends JUnitSuite {
private PersistentBehavior<Command, Incremented, State> counter(String persistenceId, ActorRef<Pair<State, Incremented>> probe) {
ActorRef<String> loggingProbe = TestProbe.create(String.class, testKit.system()).ref();
return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false);
ActorRef<Optional<Throwable>> snapshotProbe = TestProbe.<Optional<Throwable>>create(testKit.system()).ref();
return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, (e) -> Collections.emptySet(), snapshotProbe, new NoOpEventAdapter<>());
}
private PersistentBehavior<Command, Incremented, State> counter(String persistenceId,
ActorRef<Pair<State, Incremented>> probe,
Function<Incremented, Set<String>> tagger) {
ActorRef<String> loggingProbe = TestProbe.create(String.class, testKit.system()).ref();
ActorRef<Optional<Throwable>> snapshotProbe = TestProbe.<Optional<Throwable>>create(testKit.system()).ref();
return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, tagger, snapshotProbe, new NoOpEventAdapter<>());
}
private PersistentBehavior<Command, Incremented, State> counter(String persistenceId,
ActorRef<Pair<State, Incremented>> probe,
EventAdapter<Incremented, ?> transformer) {
ActorRef<String> loggingProbe = TestProbe.create(String.class, testKit.system()).ref();
ActorRef<Optional<Throwable>> snapshotProbe = TestProbe.<Optional<Throwable>>create(testKit.system()).ref();
return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, e -> Collections.emptySet(), snapshotProbe, transformer);
}
private PersistentBehavior<Command, Incremented, State> counter(String persistenceId) {
return counter(persistenceId,
TestProbe.<Pair<State, Incremented>>create(testKit.system()).ref(),
TestProbe.<String>create(testKit.system()).ref(),
(s, i, l) -> false);
(s, i, l) -> false,
(i) -> Collections.emptySet(),
TestProbe.<Optional<Throwable>>create(testKit.system()).ref(),
new NoOpEventAdapter<>()
);
}
private PersistentBehavior<Command, Incremented, State> counter(
String persistenceId,
Function3<State, Incremented, Long, Boolean> snapshot
Function3<State, Incremented, Long, Boolean> snapshot,
ActorRef<Optional<Throwable>> snapshotProbe
) {
return counter(persistenceId,
testKit.<Pair<State, Incremented>>createTestProbe().ref(),
testKit.<String>createTestProbe().ref(), snapshot);
testKit.<String>createTestProbe().ref(),
snapshot,
e -> Collections.emptySet(),
snapshotProbe,
new NoOpEventAdapter<>());
}
private PersistentBehavior<Command, Incremented, State> counter(
String persistentId,
ActorRef<Pair<State, Incremented>> eventProbe,
ActorRef<String> loggingProbe) {
return counter(persistentId, eventProbe, loggingProbe, (s, i, l) -> false);
return counter(persistentId, eventProbe, loggingProbe, (s, i, l) -> false, e -> Collections.emptySet(),
TestProbe.<Optional<Throwable>>create(testKit.system()).ref(),
new NoOpEventAdapter<>()
);
}
private PersistentBehavior<Command, Incremented, State> counter(
String persistentId,
ActorRef<Pair<State, Incremented>> eventProbe,
Function3<State, Incremented, Long, Boolean> snapshot) {
return counter(persistentId, eventProbe, testKit.<String>createTestProbe().ref(), snapshot);
return counter(persistentId, eventProbe, testKit.<String>createTestProbe().ref(), snapshot, (e) -> Collections.emptySet(),
TestProbe.<Optional<Throwable>>create(testKit.system()).ref(), new NoOpEventAdapter<>()
);
}
private PersistentBehavior<Command, Incremented, State> counter(
private <A> PersistentBehavior<Command, Incremented, State> counter(
String persistentId,
ActorRef<Pair<State, Incremented>> eventProbe,
ActorRef<String> loggingProbe,
Function3<State, Incremented, Long, Boolean> snapshot) {
Function3<State, Incremented, Long, Boolean> snapshot,
Function<Incremented, Set<String>> tagsFunction,
ActorRef<Optional<Throwable>> snapshotProbe,
EventAdapter<Incremented, A> transformer) {
return new PersistentBehavior<Command, Incremented, State>(persistentId) {
@Override
public CommandHandler<Command, Incremented, State> commandHandler() {
@ -251,9 +308,29 @@ public class PersistentActorTest extends JUnitSuite {
try {
return snapshot.apply(state, event, sequenceNr);
} catch (Exception e) {
return false;
throw new RuntimeException(e);
}
}
@Override
public Set<String> tagsFor(Incremented incremented) {
try {
return tagsFunction.apply(incremented);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void onSnapshot(ActorContext<Command> ctx, SnapshotMetadata meta, Optional<Throwable> result) {
snapshotProbe.tell(result);
}
@Override
public EventAdapter<Incremented, A> eventAdapter() {
return transformer;
}
};
}
@ -327,12 +404,15 @@ public class PersistentActorTest extends JUnitSuite {
@Test
public void snapshot() {
PersistentBehavior<Command, Incremented, State> snapshoter = counter("c11", (s, e, l) -> s.value % 2 == 0);
TestProbe<Optional<Throwable>> snapshotProbe = testKit.createTestProbe();
PersistentBehavior<Command, Incremented, State> snapshoter = counter("c11", (s, e, l) -> s.value % 2 == 0, snapshotProbe.ref());
ActorRef<Command> c = testKit.spawn(snapshoter);
c.tell(Increment.instance);
c.tell(Increment.instance);
snapshotProbe.expectMessage(Optional.empty());
c.tell(Increment.instance);
TestProbe<State> probe = testKit.createTestProbe();
c.tell(new GetValue(probe.ref()));
probe.expectMessage(new State(3, Arrays.asList(0, 1, 2)));
@ -366,5 +446,56 @@ public class PersistentActorTest extends JUnitSuite {
signalProbe.expectNoMessage();
}
@Test
public void tagEvent() throws Exception {
TestProbe<Pair<State, Incremented>> eventProbe = testKit.createTestProbe();
TestProbe<State> stateProbe = testKit.createTestProbe();
ActorRef<Command> c = testKit.spawn(counter("tagging", eventProbe.ref(), e -> Sets.newHashSet("tag1", "tag2")));
c.tell(new Increment());
c.tell(new GetValue(stateProbe.ref()));
stateProbe.expectMessage(new State(1, Collections.singletonList(0)));
List<EventEnvelope> events = queries.currentEventsByTag("tag1", NoOffset.getInstance()).runWith(Sink.seq(), materializer)
.toCompletableFuture().get();
assertEquals(Lists.newArrayList(
new EventEnvelope(new Sequence(1), "tagging", 1, new Incremented(1))
), events);
}
@Test
public void transformEvent() throws Exception {
TestProbe<Pair<State, Incremented>> eventProbe = testKit.createTestProbe();
TestProbe<State> stateProbe = testKit.createTestProbe();
ActorRef<Command> c = testKit.spawn(counter("transform", eventProbe.ref(), new WrapperEventAdapter()));
c.tell(new Increment());
c.tell(new GetValue(stateProbe.ref()));
stateProbe.expectMessage(new State(1, Collections.singletonList(0)));
List<EventEnvelope> events = queries.currentEventsByPersistenceId("transform", 0, Long.MAX_VALUE)
.runWith(Sink.seq(), materializer).toCompletableFuture().get();
assertEquals(Lists.newArrayList(
new EventEnvelope(new Sequence(1), "transform", 1, new Wrapper<>(new Incremented(1)))
), events);
ActorRef<Command> c2 = testKit.spawn(counter("transform", eventProbe.ref(), new WrapperEventAdapter()));
c2.tell(new GetValue(stateProbe.ref()));
stateProbe.expectMessage(new State(1, Collections.singletonList(0)));
}
//event-wrapper
class WrapperEventAdapter extends EventAdapter<Incremented,Wrapper> {
@Override
public Wrapper toJournal(Incremented incremented) {
return new Wrapper<>(incremented);
}
@Override
public Incremented fromJournal(Wrapper wrapper) {
return (Incremented) wrapper.t();
}
}
//event-wrapper
// FIXME test with by state command handler
}

View file

@ -4,12 +4,19 @@
package akka.persistence.typed.scaladsl
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import akka.Done
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, SupervisorStrategy, Terminated, TypedAkkaSpecWithShutdown }
import akka.persistence.query.{ EventEnvelope, PersistenceQuery, Sequence }
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.snapshot.SnapshotStore
import akka.persistence.typed.EventAdapter
import akka.persistence.{ SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.scaladsl._
import com.typesafe.config.{ Config, ConfigFactory }
@ -17,9 +24,18 @@ import org.scalatest.concurrent.Eventually
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{ Success, Try }
object PersistentBehaviorSpec {
//#event-wrapper
case class Wrapper[T](t: T)
class WrapperEventAdapter[T] extends EventAdapter[T, Wrapper[T]] {
override def toJournal(e: T): Wrapper[T] = Wrapper(e)
override def fromJournal(p: Wrapper[T]): T = p.t
}
//#event-wrapper
class InMemorySnapshotStore extends SnapshotStore {
private var state = Map.empty[String, (Any, SnapshotMetadata)]
@ -38,14 +54,14 @@ object PersistentBehaviorSpec {
}
// also used from PersistentActorTest
val conf: Config = ConfigFactory.parseString(
def conf: Config = ConfigFactory.parseString(
s"""
akka.loglevel = INFO
# akka.persistence.typed.log-stashing = INFO
akka.persistence.snapshot-store.inmem.class = "akka.persistence.typed.scaladsl.PersistentBehaviorSpec$$InMemorySnapshotStore"
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.inmem"
akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
""")
@ -75,18 +91,22 @@ object PersistentBehaviorSpec {
val secondLogging = "second logging"
def counter(persistenceId: String)(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] =
counter(persistenceId, loggingActor = TestProbe[String].ref, probe = TestProbe[(State, Event)].ref)
counter(persistenceId, loggingActor = TestProbe[String].ref, probe = TestProbe[(State, Event)].ref, TestProbe[Try[Done]].ref)
def counter(persistenceId: String, logging: ActorRef[String])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] =
counter(persistenceId, loggingActor = logging, probe = TestProbe[(State, Event)].ref)
counter(persistenceId, loggingActor = logging, probe = TestProbe[(State, Event)].ref, TestProbe[Try[Done]].ref)
def counterWithProbe(persistenceId: String, probe: ActorRef[(State, Event)])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] =
counter(persistenceId, TestProbe[String].ref, probe)
counter(persistenceId, TestProbe[String].ref, probe, TestProbe[Try[Done]].ref)
def counterWithSnapshotProbe(persistenceId: String, probe: ActorRef[Try[Done]])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] =
counter(persistenceId, TestProbe[String].ref, TestProbe[(State, Event)].ref, snapshotProbe = probe)
def counter(
persistenceId: String,
loggingActor: ActorRef[String],
probe: ActorRef[(State, Event)]): PersistentBehavior[Command, Event, State] = {
probe: ActorRef[(State, Event)],
snapshotProbe: ActorRef[Try[Done]]): PersistentBehavior[Command, Event, State] = {
PersistentBehaviors.receive[Command, Event, State](
persistenceId,
initialState = State(0, Vector.empty),
@ -172,18 +192,31 @@ object PersistentBehaviorSpec {
case Incremented(delta)
probe ! ((state, evt))
State(state.value + delta, state.history :+ state.value)
})
}).onRecoveryCompleted {
case (_, _)
}
.onSnapshot {
case (_, _, result)
snapshotProbe ! result
}
}
}
class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown with Eventually {
import PersistentBehaviorSpec._
override def config: Config = PersistentBehaviorSpec.conf
override lazy val config: Config = PersistentBehaviorSpec.conf
implicit val testSettings = TestKitSettings(system)
import akka.actor.typed.scaladsl.adapter._
implicit val materializer = ActorMaterializer()(system.toUntyped)
val queries: LeveldbReadJournal = PersistenceQuery(system.toUntyped).readJournalFor[LeveldbReadJournal](
LeveldbReadJournal.Identifier)
val pidCounter = new AtomicInteger(0)
private def nextPid(): String = s"c${pidCounter.incrementAndGet()}"
@ -326,16 +359,17 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
"snapshot via predicate" in {
val pid = nextPid
val snapshotProbe = TestProbe[Try[Done]]
val alwaysSnapshot: Behavior[Command] =
Behaviors.setup { _
counter(pid).snapshotWhen { (_, _, _) true }
counterWithSnapshotProbe(pid, snapshotProbe.ref).snapshotWhen { (_, _, _) true }
}
val c = spawn(alwaysSnapshot)
val watchProbe = watcher(c)
val replyProbe = TestProbe[State]()
c ! Increment
snapshotProbe.expectMessage(Success(Done))
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
c ! LogThenStop
@ -344,6 +378,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
val probe = TestProbe[(State, Event)]()
val c2 = spawn(counterWithProbe(pid, probe.ref))
// state should be rebuilt from snapshot, no events replayed
// Fails as snapshot is async (i think)
probe.expectNoMessage()
c2 ! Increment
c2 ! GetValue(replyProbe.ref)
@ -352,14 +387,17 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
"check all events for snapshot in PersistAll" in {
val pid = nextPid
val snapshotAtTwo = counter(pid).snapshotWhen { (s, _, _) s.value == 2 }
val snapshotProbe = TestProbe[Try[Done]]
val snapshotAtTwo = counterWithSnapshotProbe(pid, snapshotProbe.ref).snapshotWhen { (s, _, _) s.value == 2 }
val c: ActorRef[Command] = spawn(snapshotAtTwo)
val watchProbe = watcher(c)
val replyProbe = TestProbe[State]()
c ! IncrementWithPersistAll(3)
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(3, Vector(0, 1, 2)))
snapshotProbe.expectMessage(Success(Done))
c ! LogThenStop
watchProbe.expectMessage("Terminated")
@ -434,6 +472,83 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
probe.expectMessage("msg received")
}
"tag events" in {
val pid = nextPid
val c = spawn(counter(pid).withTagger(_ Set("tag1", "tag2")))
val replyProbe = TestProbe[State]()
c ! Increment
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
val events = queries.currentEventsByTag("tag1").runWith(Sink.seq).futureValue
events shouldEqual List(EventEnvelope(Sequence(1), pid, 1, Incremented(1)))
}
"adapt events" in {
val pid = nextPid
val persistentBehavior = counter(pid)
val c = spawn(
//#install-event-adapter
persistentBehavior.eventAdapter(new WrapperEventAdapter[Event]))
//#install-event-adapter
val replyProbe = TestProbe[State]()
c ! Increment
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
val events = queries.currentEventsByPersistenceId(pid).runWith(Sink.seq).futureValue
events shouldEqual List(EventEnvelope(Sequence(1), pid, 1, Wrapper(Incremented(1))))
val c2 = spawn(counter(pid).eventAdapter(new WrapperEventAdapter[Event]))
c2 ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
}
"adapter multiple events with persist all" in {
val pid = nextPid
val c = spawn(counter(pid).eventAdapter(new WrapperEventAdapter[Event]))
val replyProbe = TestProbe[State]()
c ! IncrementWithPersistAll(2)
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(2, Vector(0, 1)))
val events = queries.currentEventsByPersistenceId(pid).runWith(Sink.seq).futureValue
events shouldEqual List(
EventEnvelope(Sequence(1), pid, 1, Wrapper(Incremented(1))),
EventEnvelope(Sequence(2), pid, 2, Wrapper(Incremented(1)))
)
val c2 = spawn(counter(pid).eventAdapter(new WrapperEventAdapter[Event]))
c2 ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(2, Vector(0, 1)))
}
"adapt and tag events" in {
val pid = nextPid
val c = spawn(counter(pid)
.withTagger(_ Set("tag99"))
.eventAdapter(new WrapperEventAdapter[Event]))
val replyProbe = TestProbe[State]()
c ! Increment
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
val events = queries.currentEventsByPersistenceId(pid).runWith(Sink.seq).futureValue
events shouldEqual List(EventEnvelope(Sequence(1), pid, 1, Wrapper(Incremented(1))))
val c2 = spawn(counter(pid).eventAdapter(new WrapperEventAdapter[Event]))
c2 ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
val taggedEvents = queries.currentEventsByTag("tag99").runWith(Sink.seq).futureValue
taggedEvents shouldEqual List(EventEnvelope(Sequence(1), pid, 1, Wrapper(Incremented(1))))
}
def watcher(toWatch: ActorRef[_]): TestProbe[String] = {
val probe = TestProbe[String]()
val w = Behaviors.setup[Any] { (ctx)

View file

@ -8,7 +8,7 @@ import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.scaladsl.PersistentBehaviors
object BasicPersistentBehaviorsSpec {
object BasicPersistentBehaviorsCompileOnly {
//#structure
sealed trait Command

View file

@ -396,9 +396,10 @@ lazy val actorTyped = akkaModule("akka-actor-typed")
lazy val persistenceTyped = akkaModule("akka-persistence-typed")
.dependsOn(
actorTyped,
persistence,
actorTestkitTyped % "test->test",
actorTypedTests % "test->test"
persistence % "compile->compile;test->test",
persistenceQuery % "test",
actorTypedTests % "test->test",
actorTestkitTyped % "compile->compile;test->test"
)
.settings(Dependencies.persistenceShared)
.settings(AkkaBuild.mayChangeSettings)