diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestKitUtils.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestKitUtils.scala index 335ebae056..f6c809cb25 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestKitUtils.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/internal/TestKitUtils.scala @@ -19,6 +19,9 @@ private[akka] object ActorTestKitGuardian { sealed trait TestKitCommand final case class SpawnActor[T](name: String, behavior: Behavior[T], replyTo: ActorRef[ActorRef[T]], props: Props) extends TestKitCommand final case class SpawnActorAnonymous[T](behavior: Behavior[T], replyTo: ActorRef[ActorRef[T]], props: Props) extends TestKitCommand + final case class StopActor[T](ref: ActorRef[T], replyTo: ActorRef[Ack.type]) extends TestKitCommand + + final case object Ack val testKitGuardian: Behavior[TestKitCommand] = Behaviors.receive[TestKitCommand] { case (ctx, SpawnActor(name, behavior, reply, props)) ⇒ @@ -27,6 +30,10 @@ private[akka] object ActorTestKitGuardian { case (ctx, SpawnActorAnonymous(behavior, reply, props)) ⇒ reply ! ctx.spawnAnonymous(behavior, props) Behaviors.same + case (ctx, StopActor(ref, reply)) ⇒ + ctx.stop(ref) + reply ! Ack + Behaviors.same } } @@ -43,7 +50,7 @@ private[akka] object TestKitUtils { val startFrom = classToStartFrom.getName val filteredStack = Thread.currentThread.getStackTrace.toIterator .map(_.getClassName) - // drop until we find the first occurence of classToStartFrom + // drop until we find the first occurrence of classToStartFrom .dropWhile(!_.startsWith(startFrom)) // then continue to the next entry after classToStartFrom that makes sense .dropWhile { diff --git a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/ActorTestKit.scala b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/ActorTestKit.scala index 412f580904..e94c0edf84 100644 --- a/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/ActorTestKit.scala +++ b/akka-actor-testkit-typed/src/main/scala/akka/actor/testkit/typed/scaladsl/ActorTestKit.scala @@ -11,7 +11,7 @@ import akka.actor.testkit.typed.TestKitSettings import akka.actor.testkit.typed.internal.{ ActorTestKitGuardian, TestKitUtils } import com.typesafe.config.{ Config, ConfigFactory } -import scala.concurrent.Await +import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ object ActorTestKit { diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/ExtensionsTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/ExtensionsTest.java index 9e83d094c5..3b85dcb1b1 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/ExtensionsTest.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/ExtensionsTest.java @@ -4,14 +4,12 @@ package akka.actor.typed; -import akka.actor.*; import akka.actor.setup.ActorSystemSetup; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import org.junit.Test; import org.scalatest.junit.JUnitSuite; -import java.util.Optional; import java.util.function.Function; import static junit.framework.TestCase.assertSame; @@ -107,5 +105,4 @@ public class ExtensionsTest extends JUnitSuite { } } - } diff --git a/akka-actor/src/main/scala/akka/util/ConstantFun.scala b/akka-actor/src/main/scala/akka/util/ConstantFun.scala index 8f82aeafc9..df650d5d6e 100644 --- a/akka-actor/src/main/scala/akka/util/ConstantFun.scala +++ b/akka-actor/src/main/scala/akka/util/ConstantFun.scala @@ -29,6 +29,7 @@ import akka.japi.{ Pair ⇒ JPair } def scalaAnyToNone[A, B]: A ⇒ Option[B] = none def scalaAnyTwoToNone[A, B, C]: (A, B) ⇒ Option[C] = two2none def scalaAnyTwoToUnit[A, B]: (A, B) ⇒ Unit = two2unit + def scalaAnyThreeToUnit[A, B, C]: (A, B, C) ⇒ Unit = three2unit def scalaAnyTwoToTrue[A, B]: (A, B) ⇒ Boolean = two2true def scalaAnyThreeToFalse[A, B, C]: (A, B, C) ⇒ Boolean = three2false def scalaAnyThreeToThird[A, B, C]: (A, B, C) ⇒ C = three2third.asInstanceOf[(A, B, C) ⇒ C] @@ -53,6 +54,8 @@ import akka.japi.{ Pair ⇒ JPair } private val two2unit = (_: Any, _: Any) ⇒ () + private val three2unit = (_: Any, _: Any, _: Any) ⇒ () + private val three2false = (_: Any, _: Any, _: Any) ⇒ false private val three2third = (_: Any, _: Any, third: Any) ⇒ third diff --git a/akka-docs/src/main/paradox/typed/persistence.md b/akka-docs/src/main/paradox/typed/persistence.md index b4b9e64d12..78d540a623 100644 --- a/akka-docs/src/main/paradox/typed/persistence.md +++ b/akka-docs/src/main/paradox/typed/persistence.md @@ -30,7 +30,7 @@ This module is currently marked as @ref:[may change](../common/may-change.md) in Let's start with a simple example. The minimum required for a `PersistentBehavior` is: Scala -: @@snip [BasicPersistentBehaviorsSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsSpec.scala) { #structure } +: @@snip [BasicPersistentBehaviorsCompileOnly.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala) { #structure } Java : @@snip [BasicPersistentBehaviorsTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java) { #structure } @@ -216,7 +216,7 @@ Since it is strongly discouraged to perform side effects in applyEvent, side effects should be performed once recovery has completed @scala[in the `onRecoveryCompleted` callback.] @java[by overriding `onRecoveryCompleted`] Scala -: @@snip [BasicPersistentBehaviorsSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsSpec.scala) { #recovery } +: @@snip [BasicPersistentBehaviorsCompileOnly.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala) { #recovery } Java : @@snip [BasicPersistentBehaviorsTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java) { #recovery } @@ -228,18 +228,40 @@ The `onRecoveryCompleted` takes on an `ActorContext` and the current `State`. Persistence typed allows you to use event tags without using @ref[`EventAdapter`](../persistence.md#event-adapters): Scala -: @@snip [BasicPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsSpec.scala) { #tagging } +: @@snip [BasicPersistentActorCompileOnly.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala) { #tagging } Java : @@snip [BasicPersistentBehaviorsTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java) { #tagging } +## Event adapters + +Event adapters can be programmatically added to your `PersistentBehavior`s that can convert from your `Event` type +to another type that is then passed to the journal. + +Defining an event adapter is done by extending an EventAdapter: + +Scala +: @@snip [x]($akka$/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala) { #event-wrapper } + +Java +: @@snip [x]($akka$/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #event-wrapper } + +Then install it on a persistent behavior: + +Scala +: @@snip [x]($akka$/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala) { #install-event-adapter } + +Java +: @@snip [x]($akka$/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #install-event-adapter } + ## Wrapping Persistent Behaviors + When creating a `PersistentBehavior`, it is possible to wrap `PersistentBehavior` in other behaviors such as `Behaviors.setup` in order to access the `ActorContext` object. For instance to access the actor logging upon taking snapshots for debug purpose. Scala -: @@snip [BasicPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsSpec.scala) { #wrapPersistentBehavior } +: @@snip [BasicPersistentActorCompileOnly.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala) { #wrapPersistentBehavior } Java : @@snip [BasicPersistentBehaviorsTest.java]($akka$/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/BasicPersistentBehaviorsTest.java) { #wrapPersistentBehavior } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventAdapter.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventAdapter.scala new file mode 100644 index 0000000000..0fd8f38fe3 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/EventAdapter.scala @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.persistence.typed + +import akka.annotation.InternalApi + +abstract class EventAdapter[E, P] { + /** + * Type of the event to persist + */ + type Per = P + /** + * Transform event on the way to the journal + */ + def toJournal(e: E): Per + + /** + * Transform the event on recovery from the journal. + * Note that this is not called in any read side so will need to be applied + * manually when using Query. + */ + def fromJournal(p: Per): E +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] object NoOpEventAdapter { + private val i = new NoOpEventAdapter[Nothing] + def instance[E]: NoOpEventAdapter[E] = i.asInstanceOf[NoOpEventAdapter[E]] +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] class NoOpEventAdapter[E] extends EventAdapter[E, Any] { + override def toJournal(e: E): Any = e + override def fromJournal(p: Any): E = p.asInstanceOf[E] +} + diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala index e976dc81de..7825a36789 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala @@ -79,7 +79,7 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set try { response match { case ReplayedMessage(repr) ⇒ - val event = repr.payload.asInstanceOf[E] + val event = setup.eventAdapter.fromJournal(repr.payload.asInstanceOf[setup.eventAdapter.Per]) try { val newState = state.copy( diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala index 126610655b..4bfb5b5c0b 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala @@ -4,6 +4,7 @@ package akka.persistence.typed.internal +import akka.Done import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.MutableBehavior @@ -16,6 +17,7 @@ import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._ import scala.annotation.tailrec import scala.collection.immutable +import scala.util.{ Failure, Success } /** * INTERNAL API @@ -99,7 +101,8 @@ private[akka] object EventsourcedRunning { // the invalid event, in case such validation is implemented in the event handler. // also, ensure that there is an event handler for each single event val newState = state.applyEvent(setup, event) - val eventToPersist = tagEvent(event) + + val eventToPersist = adaptEvent(event) val newState2 = internalPersist(newState, eventToPersist) @@ -120,7 +123,7 @@ private[akka] object EventsourcedRunning { (currentState.applyEvent(setup, event), shouldSnapshot) } - val eventsToPersist = events.map(tagEvent) + val eventsToPersist = events.map(adaptEvent) val newState2 = internalPersistAll(eventsToPersist, newState) @@ -143,9 +146,13 @@ private[akka] object EventsourcedRunning { } } - def tagEvent(event: E): Any = { + def adaptEvent(event: E): Any = { val tags = setup.tagger(event) - if (tags.isEmpty) event else Tagged(event, tags) + val adaptedEvent = setup.eventAdapter.toJournal(event) + if (tags.isEmpty) + adaptedEvent + else + Tagged(adaptedEvent, tags) } setup.setMdc(runningCmdsMdc) @@ -265,11 +272,11 @@ private[akka] object EventsourcedRunning { outer: Behavior[InternalProtocol]): Behavior[InternalProtocol] = { response match { case SaveSnapshotSuccess(meta) ⇒ - setup.context.log.debug("Save snapshot successful, snapshot metadata: [{}]", meta) + setup.onSnapshot(commandContext, meta, Success(Done)) outer case SaveSnapshotFailure(meta, ex) ⇒ - setup.context.log.error(ex, "Save snapshot failed, snapshot metadata: [{}]", meta) - outer // FIXME https://github.com/akka/akka/issues/24637 should we provide callback for this? to allow Stop + setup.onSnapshot(commandContext, meta, Failure(ex)) + outer // FIXME not implemented case DeleteSnapshotFailure(_, _) ⇒ ??? diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala index eaab929054..8787b1ec4b 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala @@ -4,17 +4,21 @@ package akka.persistence.typed.internal +import akka.Done import akka.actor.typed.Logger import akka.actor.{ ActorRef, ExtendedActorSystem } import akka.actor.typed.scaladsl.{ ActorContext, StashBuffer, TimerScheduler } import akka.annotation.InternalApi import akka.persistence._ +import akka.persistence.typed.EventAdapter import akka.persistence.typed.internal.EventsourcedBehavior.MDC import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity } import akka.persistence.typed.scaladsl.PersistentBehaviors import akka.util.Collections.EmptyImmutableSeq import akka.util.OptionVal +import scala.util.Try + /** * INTERNAL API: Carry state for the Persistent behavior implementation behaviors */ @@ -28,7 +32,9 @@ private[persistence] final class EventsourcedSetup[C, E, S]( val eventHandler: (S, E) ⇒ S, val writerIdentity: WriterIdentity, val recoveryCompleted: (ActorContext[C], S) ⇒ Unit, + val onSnapshot: (ActorContext[C], SnapshotMetadata, Try[Done]) ⇒ Unit, val tagger: E ⇒ Set[String], + val eventAdapter: EventAdapter[E, _], val snapshotWhen: (S, E, Long) ⇒ Boolean, val recovery: Recovery, var holdingRecoveryPermit: Boolean, diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala index f529832f7f..8f805d8db6 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala @@ -4,27 +4,46 @@ package akka.persistence.typed.internal +import akka.Done import akka.actor.typed import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } import akka.annotation.InternalApi import akka.persistence._ +import akka.persistence.typed.{ EventAdapter, NoOpEventAdapter } import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity } -import akka.persistence.typed.scaladsl.{ PersistentBehavior, PersistentBehaviors } +import akka.persistence.typed.scaladsl._ import akka.util.ConstantFun +import scala.util.{ Failure, Success, Try } + +@InternalApi +private[akka] object PersistentBehaviorImpl { + + def defaultOnSnapshot[A](ctx: ActorContext[A], meta: SnapshotMetadata, result: Try[Done]): Unit = { + result match { + case Success(_) ⇒ + ctx.log.debug("Save snapshot successful, snapshot metadata: [{}]", meta) + case Failure(t) ⇒ + ctx.log.error(t, "Save snapshot failed, snapshot metadata: [{}]", meta) + } + } +} + @InternalApi private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( persistenceId: String, initialState: State, commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State], eventHandler: (State, Event) ⇒ State, - journalPluginId: Option[String] = None, - snapshotPluginId: Option[String] = None, - recoveryCompleted: (ActorContext[Command], State) ⇒ Unit = ConstantFun.scalaAnyTwoToUnit, - tagger: Event ⇒ Set[String] = (_: Event) ⇒ Set.empty[String], - snapshotWhen: (State, Event, Long) ⇒ Boolean = ConstantFun.scalaAnyThreeToFalse, - recovery: Recovery = Recovery() + journalPluginId: Option[String] = None, + snapshotPluginId: Option[String] = None, + recoveryCompleted: (ActorContext[Command], State) ⇒ Unit = ConstantFun.scalaAnyTwoToUnit, + tagger: Event ⇒ Set[String] = (_: Event) ⇒ Set.empty[String], + eventAdapter: EventAdapter[Event, _] = NoOpEventAdapter.instance[Event], + snapshotWhen: (State, Event, Long) ⇒ Boolean = ConstantFun.scalaAnyThreeToFalse, + recovery: Recovery = Recovery(), + onSnapshot: (ActorContext[Command], SnapshotMetadata, Try[Done]) ⇒ Unit = PersistentBehaviorImpl.defaultOnSnapshot[Command] _ ) extends PersistentBehavior[Command, Event, State] with EventsourcedStashReferenceManagement { override def apply(context: typed.ActorContext[Command]): Behavior[Command] = { @@ -45,7 +64,9 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( eventHandler, WriterIdentity.newIdentity(), recoveryCompleted, + onSnapshot, tagger, + eventAdapter, snapshotWhen, recovery, holdingRecoveryPermit = false, @@ -126,4 +147,16 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( def withTagger(tagger: Event ⇒ Set[String]): PersistentBehavior[Command, Event, State] = copy(tagger = tagger) + /** + * Adapt the event before sending to the journal e.g. wrapping the event in a type + * the journal understands + */ + def eventAdapter(adapter: EventAdapter[Event, _]): PersistentBehavior[Command, Event, State] = + copy(eventAdapter = adapter) + + /** + * The `callback` function is called to notify the actor that a snapshot has finished + */ + def onSnapshot(callback: (ActorContext[Command], SnapshotMetadata, Try[Done]) ⇒ Unit): PersistentBehavior[Command, Event, State] = + copy(onSnapshot = callback) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventHandler.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventHandler.scala index c8057c6de1..3f65dc7e14 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventHandler.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventHandler.scala @@ -73,7 +73,7 @@ final class EventHandlerBuilder[Event, State >: Null]() { } result match { - case OptionVal.None ⇒ throw new MatchError(s"No match found for event [${event.getClass}] and state [${state.getClass}]") + case OptionVal.None ⇒ throw new MatchError(s"No match found for event [${event.getClass}] and state [${state.getClass}]. Has this event been stored using an EventAdapter?") case OptionVal.Some(s) ⇒ s } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala index cbda330565..d74024aeac 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala @@ -4,16 +4,19 @@ package akka.persistence.typed.javadsl -import java.util.Collections +import java.util.{ Collections, Optional } import akka.actor.typed import akka.actor.typed.Behavior import akka.actor.typed.Behavior.DeferredBehavior import akka.actor.typed.javadsl.ActorContext -import akka.annotation.ApiMayChange -import akka.persistence.typed._ +import akka.annotation.{ ApiMayChange, InternalApi } +import akka.persistence.SnapshotMetadata +import akka.persistence.typed.{ EventAdapter, _ } import akka.persistence.typed.internal._ +import scala.util.{ Failure, Success } + /** Java API */ @ApiMayChange abstract class PersistentBehavior[Command, Event, State >: Null](val persistenceId: String) extends DeferredBehavior[Command] { @@ -78,6 +81,21 @@ abstract class PersistentBehavior[Command, Event, State >: Null](val persistence */ def onRecoveryCompleted(ctx: ActorContext[Command], state: State): Unit = {} + /** + * Override to get notified when a snapshot is finished. + * The default implementation logs failures at error and success writes at + * debug. + * + * @param result None if successful otherwise contains the exception thrown when snapshotting + */ + def onSnapshot(ctx: ActorContext[Command], meta: SnapshotMetadata, result: Optional[Throwable]): Unit = { + if (result.isPresent) { + ctx.getLog.error(result.get(), "Save snapshot failed, snapshot metadata: [{}]", meta) + } else { + ctx.getLog.debug("Save snapshot successful, snapshot metadata: [{}]", meta) + } + } + /** * Override and define that snapshot should be saved every N events. * @@ -105,6 +123,8 @@ abstract class PersistentBehavior[Command, Event, State >: Null](val persistence */ def tagsFor(event: Event): java.util.Set[String] = Collections.emptySet() + def eventAdapter(): EventAdapter[Event, _] = NoOpEventAdapter.instance[Event] + /** * INTERNAL API: DeferredBehavior init */ @@ -133,6 +153,12 @@ abstract class PersistentBehavior[Command, Event, State >: Null](val persistence .onRecoveryCompleted((ctx, state) ⇒ onRecoveryCompleted(ctx.asJava, state)) .snapshotWhen(snapshotWhen) .withTagger(tagger) + .onSnapshot((ctx, meta, result) ⇒ { + onSnapshot(ctx.asJava, meta, result match { + case Success(_) ⇒ Optional.empty() + case Failure(t) ⇒ Optional.of(t) + }) + }).eventAdapter(eventAdapter()) } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala index 9258173691..d3a1aee04e 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala @@ -4,12 +4,16 @@ package akka.persistence.typed.scaladsl +import akka.Done import akka.actor.typed.Behavior.DeferredBehavior import akka.actor.typed.scaladsl.ActorContext import akka.annotation.InternalApi import akka.persistence._ +import akka.persistence.typed.EventAdapter import akka.persistence.typed.internal._ +import scala.util.Try + object PersistentBehaviors { // we use this type internally, however it's easier for users to understand the function, so we use it in external api @@ -69,6 +73,11 @@ trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command */ def onRecoveryCompleted(callback: (ActorContext[Command], State) ⇒ Unit): PersistentBehavior[Command, Event, State] + /** + * The `callback` function is called to notify when a snapshot is complete. + */ + def onSnapshot(callback: (ActorContext[Command], SnapshotMetadata, Try[Done]) ⇒ Unit): PersistentBehavior[Command, Event, State] + /** * Initiates a snapshot if the given function returns true. * When persisting multiple events at once the snapshot is triggered after all the events have @@ -108,4 +117,11 @@ trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command * The `tagger` function should give event tags, which will be used in persistence query */ def withTagger(tagger: Event ⇒ Set[String]): PersistentBehavior[Command, Event, State] + + /** + * Transform the event in another type before giving to the journal. Can be used to wrap events + * in types Journals understand but is of a different type than `Event`. + */ + def eventAdapter(adapter: EventAdapter[Event, _]): PersistentBehavior[Command, Event, State] } + diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java index bdaf97b012..929fed9dd7 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java @@ -6,6 +6,7 @@ package akka.persistence.typed.javadsl; import akka.actor.Scheduler; import akka.actor.typed.ActorRef; +import akka.persistence.typed.EventAdapter; import akka.actor.testkit.typed.javadsl.TestInbox; import akka.util.Timeout; @@ -17,7 +18,32 @@ import static akka.actor.typed.javadsl.AskPattern.ask; public class PersistentActorCompileOnlyTest { + public static abstract class Simple { + + //#event-wrapper + public static class Wrapper { + private final T t; + public Wrapper(T t) { + this.t = t; + } + public T getT() { + return t; + } + } + + public static class EventAdapterExample extends EventAdapter> { + @Override + public Wrapper toJournal(SimpleEvent simpleEvent) { + return new Wrapper<>(simpleEvent); + } + @Override + public SimpleEvent fromJournal(Wrapper simpleEventWrapper) { + return simpleEventWrapper.getT(); + } + } + //#event-wrapper + //#command public static class SimpleCommand { public final String data; @@ -45,6 +71,7 @@ public class PersistentActorCompileOnlyTest { SimpleState(List events) { this.events = events; } + SimpleState() { this.events = new ArrayList<>(); } @@ -79,7 +106,15 @@ public class PersistentActorCompileOnlyTest { return (state, event) -> state.addEvent(event); } //#event-handler + + //#install-event-adapter + @Override + public EventAdapter> eventAdapter() { + return new EventAdapterExample(); + } + //#install-event-adapter }; + //#behavior } diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java similarity index 62% rename from akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java rename to akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java index 4515f38147..e68293b5ee 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java @@ -8,35 +8,58 @@ import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; import akka.actor.typed.Signal; import akka.actor.typed.SupervisorStrategy; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Adapter; import akka.actor.typed.javadsl.Behaviors; import akka.japi.Pair; import akka.japi.function.Function3; -import akka.persistence.typed.scaladsl.PersistentBehaviorSpec; +import akka.japi.function.Function; +import akka.persistence.SnapshotMetadata; +import akka.persistence.query.EventEnvelope; +import akka.persistence.query.NoOffset; +import akka.persistence.query.PersistenceQuery; +import akka.persistence.query.Sequence; +import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal; +import akka.persistence.typed.EventAdapter; +import akka.persistence.typed.NoOpEventAdapter; +import akka.stream.ActorMaterializer; +import akka.stream.javadsl.Sink; import akka.actor.testkit.typed.javadsl.TestKitJunitResource; import akka.actor.testkit.typed.javadsl.TestProbe; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import org.junit.ClassRule; import org.junit.Test; import org.scalatest.junit.JUnitSuite; +import java.io.Serializable; import java.time.Duration; import java.util.*; +import static akka.persistence.typed.scaladsl.PersistentBehaviorSpec.*; import static java.util.Collections.singletonList; +import static org.junit.Assert.assertEquals; -public class PersistentActorTest extends JUnitSuite { +public class PersistentActorJavaDslTest extends JUnitSuite { + + public static final Config config = conf().withFallback(ConfigFactory.load()); @ClassRule - public static final TestKitJunitResource testKit = new TestKitJunitResource(PersistentBehaviorSpec.conf()); + public static final TestKitJunitResource testKit = new TestKitJunitResource(config); + static final Incremented timeoutEvent = new Incremented(100); static final State emptyState = new State(0, Collections.emptyList()); static final Incremented terminatedEvent = new Incremented(10); - public PersistentActorTest() { - super(); - } + private LeveldbReadJournal queries = PersistenceQuery.get(Adapter.toUntyped(testKit.system())) + .getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier()); - interface Command { + private ActorMaterializer materializer = ActorMaterializer.create(Adapter.toUntyped(testKit.system())); + + interface Command extends Serializable { } public static class Increment implements Command { @@ -79,7 +102,7 @@ public class PersistentActorTest extends JUnitSuite { } } - public static class Incremented { + public static class Incremented implements Serializable { private final int delta; public Incremented(int delta) { @@ -108,7 +131,7 @@ public class PersistentActorTest extends JUnitSuite { } } - public static class State { + public static class State implements Serializable { private final int value; private final List history; @@ -153,44 +176,78 @@ public class PersistentActorTest extends JUnitSuite { private PersistentBehavior counter(String persistenceId, ActorRef> probe) { ActorRef loggingProbe = TestProbe.create(String.class, testKit.system()).ref(); - return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false); + ActorRef> snapshotProbe = TestProbe.>create(testKit.system()).ref(); + return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, (e) -> Collections.emptySet(), snapshotProbe, new NoOpEventAdapter<>()); + } + + private PersistentBehavior counter(String persistenceId, + ActorRef> probe, + Function> tagger) { + ActorRef loggingProbe = TestProbe.create(String.class, testKit.system()).ref(); + ActorRef> snapshotProbe = TestProbe.>create(testKit.system()).ref(); + return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, tagger, snapshotProbe, new NoOpEventAdapter<>()); + } + + private PersistentBehavior counter(String persistenceId, + ActorRef> probe, + EventAdapter transformer) { + ActorRef loggingProbe = TestProbe.create(String.class, testKit.system()).ref(); + ActorRef> snapshotProbe = TestProbe.>create(testKit.system()).ref(); + return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, e -> Collections.emptySet(), snapshotProbe, transformer); } private PersistentBehavior counter(String persistenceId) { return counter(persistenceId, TestProbe.>create(testKit.system()).ref(), TestProbe.create(testKit.system()).ref(), - (s, i, l) -> false); + (s, i, l) -> false, + (i) -> Collections.emptySet(), + TestProbe.>create(testKit.system()).ref(), + new NoOpEventAdapter<>() + ); } private PersistentBehavior counter( String persistenceId, - Function3 snapshot + Function3 snapshot, + ActorRef> snapshotProbe ) { return counter(persistenceId, testKit.>createTestProbe().ref(), - testKit.createTestProbe().ref(), snapshot); + testKit.createTestProbe().ref(), + snapshot, + e -> Collections.emptySet(), + snapshotProbe, + new NoOpEventAdapter<>()); } private PersistentBehavior counter( String persistentId, ActorRef> eventProbe, ActorRef loggingProbe) { - return counter(persistentId, eventProbe, loggingProbe, (s, i, l) -> false); + return counter(persistentId, eventProbe, loggingProbe, (s, i, l) -> false, e -> Collections.emptySet(), + TestProbe.>create(testKit.system()).ref(), + new NoOpEventAdapter<>() + ); } private PersistentBehavior counter( String persistentId, ActorRef> eventProbe, Function3 snapshot) { - return counter(persistentId, eventProbe, testKit.createTestProbe().ref(), snapshot); + return counter(persistentId, eventProbe, testKit.createTestProbe().ref(), snapshot, (e) -> Collections.emptySet(), + TestProbe.>create(testKit.system()).ref(), new NoOpEventAdapter<>() + ); } - private PersistentBehavior counter( + private PersistentBehavior counter( String persistentId, ActorRef> eventProbe, ActorRef loggingProbe, - Function3 snapshot) { + Function3 snapshot, + Function> tagsFunction, + ActorRef> snapshotProbe, + EventAdapter transformer) { return new PersistentBehavior(persistentId) { @Override public CommandHandler commandHandler() { @@ -251,9 +308,29 @@ public class PersistentActorTest extends JUnitSuite { try { return snapshot.apply(state, event, sequenceNr); } catch (Exception e) { - return false; + throw new RuntimeException(e); } } + + @Override + public Set tagsFor(Incremented incremented) { + try { + return tagsFunction.apply(incremented); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void onSnapshot(ActorContext ctx, SnapshotMetadata meta, Optional result) { + snapshotProbe.tell(result); + } + + + @Override + public EventAdapter eventAdapter() { + return transformer; + } }; } @@ -327,12 +404,15 @@ public class PersistentActorTest extends JUnitSuite { @Test public void snapshot() { - PersistentBehavior snapshoter = counter("c11", (s, e, l) -> s.value % 2 == 0); + TestProbe> snapshotProbe = testKit.createTestProbe(); + PersistentBehavior snapshoter = counter("c11", (s, e, l) -> s.value % 2 == 0, snapshotProbe.ref()); ActorRef c = testKit.spawn(snapshoter); c.tell(Increment.instance); c.tell(Increment.instance); + snapshotProbe.expectMessage(Optional.empty()); c.tell(Increment.instance); TestProbe probe = testKit.createTestProbe(); + c.tell(new GetValue(probe.ref())); probe.expectMessage(new State(3, Arrays.asList(0, 1, 2))); @@ -358,7 +438,7 @@ public class PersistentActorTest extends JUnitSuite { TestProbe interceptProbe = testKit.createTestProbe(); TestProbe signalProbe = testKit.createTestProbe(); ActorRef c = testKit.spawn(Behaviors.tap(Command.class, - (ctx, cmd) -> interceptProbe.ref().tell(cmd), + (ctx, cmd) -> interceptProbe.ref().tell(cmd), (ctx, signal) -> signalProbe.ref().tell(signal), counter("tap1"))); c.tell(Increment.instance); @@ -366,5 +446,56 @@ public class PersistentActorTest extends JUnitSuite { signalProbe.expectNoMessage(); } + @Test + public void tagEvent() throws Exception { + TestProbe> eventProbe = testKit.createTestProbe(); + TestProbe stateProbe = testKit.createTestProbe(); + ActorRef c = testKit.spawn(counter("tagging", eventProbe.ref(), e -> Sets.newHashSet("tag1", "tag2"))); + c.tell(new Increment()); + c.tell(new GetValue(stateProbe.ref())); + stateProbe.expectMessage(new State(1, Collections.singletonList(0))); + + List events = queries.currentEventsByTag("tag1", NoOffset.getInstance()).runWith(Sink.seq(), materializer) + .toCompletableFuture().get(); + assertEquals(Lists.newArrayList( + new EventEnvelope(new Sequence(1), "tagging", 1, new Incremented(1)) + ), events); + } + + @Test + public void transformEvent() throws Exception { + TestProbe> eventProbe = testKit.createTestProbe(); + TestProbe stateProbe = testKit.createTestProbe(); + ActorRef c = testKit.spawn(counter("transform", eventProbe.ref(), new WrapperEventAdapter())); + + c.tell(new Increment()); + c.tell(new GetValue(stateProbe.ref())); + stateProbe.expectMessage(new State(1, Collections.singletonList(0))); + + List events = queries.currentEventsByPersistenceId("transform", 0, Long.MAX_VALUE) + .runWith(Sink.seq(), materializer).toCompletableFuture().get(); + assertEquals(Lists.newArrayList( + new EventEnvelope(new Sequence(1), "transform", 1, new Wrapper<>(new Incremented(1))) + ), events); + + ActorRef c2 = testKit.spawn(counter("transform", eventProbe.ref(), new WrapperEventAdapter())); + c2.tell(new GetValue(stateProbe.ref())); + stateProbe.expectMessage(new State(1, Collections.singletonList(0))); + } + + //event-wrapper + class WrapperEventAdapter extends EventAdapter { + @Override + public Wrapper toJournal(Incremented incremented) { + return new Wrapper<>(incremented); + } + + @Override + public Incremented fromJournal(Wrapper wrapper) { + return (Incremented) wrapper.t(); + } + } + //event-wrapper + // FIXME test with by state command handler } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala index 6000880184..a75c9889ed 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala @@ -4,12 +4,19 @@ package akka.persistence.typed.scaladsl +import java.util.UUID import java.util.concurrent.atomic.AtomicInteger +import akka.Done import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, SupervisorStrategy, Terminated, TypedAkkaSpecWithShutdown } +import akka.persistence.query.{ EventEnvelope, PersistenceQuery, Sequence } +import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal import akka.persistence.snapshot.SnapshotStore +import akka.persistence.typed.EventAdapter import akka.persistence.{ SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria } +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.Sink import akka.actor.testkit.typed.TestKitSettings import akka.actor.testkit.typed.scaladsl._ import com.typesafe.config.{ Config, ConfigFactory } @@ -17,9 +24,18 @@ import org.scalatest.concurrent.Eventually import scala.concurrent.Future import scala.concurrent.duration._ +import scala.util.{ Success, Try } object PersistentBehaviorSpec { + //#event-wrapper + case class Wrapper[T](t: T) + class WrapperEventAdapter[T] extends EventAdapter[T, Wrapper[T]] { + override def toJournal(e: T): Wrapper[T] = Wrapper(e) + override def fromJournal(p: Wrapper[T]): T = p.t + } + //#event-wrapper + class InMemorySnapshotStore extends SnapshotStore { private var state = Map.empty[String, (Any, SnapshotMetadata)] @@ -38,14 +54,14 @@ object PersistentBehaviorSpec { } // also used from PersistentActorTest - val conf: Config = ConfigFactory.parseString( + def conf: Config = ConfigFactory.parseString( s""" akka.loglevel = INFO # akka.persistence.typed.log-stashing = INFO - - akka.persistence.snapshot-store.inmem.class = "akka.persistence.typed.scaladsl.PersistentBehaviorSpec$$InMemorySnapshotStore" - akka.persistence.journal.plugin = "akka.persistence.journal.inmem" - akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.inmem" + akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}" + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "target/typed-persistence-${UUID.randomUUID().toString}" """) @@ -75,18 +91,22 @@ object PersistentBehaviorSpec { val secondLogging = "second logging" def counter(persistenceId: String)(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] = - counter(persistenceId, loggingActor = TestProbe[String].ref, probe = TestProbe[(State, Event)].ref) + counter(persistenceId, loggingActor = TestProbe[String].ref, probe = TestProbe[(State, Event)].ref, TestProbe[Try[Done]].ref) def counter(persistenceId: String, logging: ActorRef[String])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] = - counter(persistenceId, loggingActor = logging, probe = TestProbe[(State, Event)].ref) + counter(persistenceId, loggingActor = logging, probe = TestProbe[(State, Event)].ref, TestProbe[Try[Done]].ref) def counterWithProbe(persistenceId: String, probe: ActorRef[(State, Event)])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] = - counter(persistenceId, TestProbe[String].ref, probe) + counter(persistenceId, TestProbe[String].ref, probe, TestProbe[Try[Done]].ref) + + def counterWithSnapshotProbe(persistenceId: String, probe: ActorRef[Try[Done]])(implicit system: ActorSystem[_]): PersistentBehavior[Command, Event, State] = + counter(persistenceId, TestProbe[String].ref, TestProbe[(State, Event)].ref, snapshotProbe = probe) def counter( persistenceId: String, loggingActor: ActorRef[String], - probe: ActorRef[(State, Event)]): PersistentBehavior[Command, Event, State] = { + probe: ActorRef[(State, Event)], + snapshotProbe: ActorRef[Try[Done]]): PersistentBehavior[Command, Event, State] = { PersistentBehaviors.receive[Command, Event, State]( persistenceId, initialState = State(0, Vector.empty), @@ -172,18 +192,31 @@ object PersistentBehaviorSpec { case Incremented(delta) ⇒ probe ! ((state, evt)) State(state.value + delta, state.history :+ state.value) - }) + }).onRecoveryCompleted { + case (_, _) ⇒ + } + .onSnapshot { + case (_, _, result) ⇒ + snapshotProbe ! result + } } } class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown with Eventually { + import PersistentBehaviorSpec._ - override def config: Config = PersistentBehaviorSpec.conf + override lazy val config: Config = PersistentBehaviorSpec.conf implicit val testSettings = TestKitSettings(system) + import akka.actor.typed.scaladsl.adapter._ + + implicit val materializer = ActorMaterializer()(system.toUntyped) + val queries: LeveldbReadJournal = PersistenceQuery(system.toUntyped).readJournalFor[LeveldbReadJournal]( + LeveldbReadJournal.Identifier) + val pidCounter = new AtomicInteger(0) private def nextPid(): String = s"c${pidCounter.incrementAndGet()}" @@ -326,16 +359,17 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown "snapshot via predicate" in { val pid = nextPid + val snapshotProbe = TestProbe[Try[Done]] val alwaysSnapshot: Behavior[Command] = Behaviors.setup { _ ⇒ - counter(pid).snapshotWhen { (_, _, _) ⇒ true } + counterWithSnapshotProbe(pid, snapshotProbe.ref).snapshotWhen { (_, _, _) ⇒ true } } - val c = spawn(alwaysSnapshot) val watchProbe = watcher(c) val replyProbe = TestProbe[State]() c ! Increment + snapshotProbe.expectMessage(Success(Done)) c ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(1, Vector(0))) c ! LogThenStop @@ -344,6 +378,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown val probe = TestProbe[(State, Event)]() val c2 = spawn(counterWithProbe(pid, probe.ref)) // state should be rebuilt from snapshot, no events replayed + // Fails as snapshot is async (i think) probe.expectNoMessage() c2 ! Increment c2 ! GetValue(replyProbe.ref) @@ -352,14 +387,17 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown "check all events for snapshot in PersistAll" in { val pid = nextPid - val snapshotAtTwo = counter(pid).snapshotWhen { (s, _, _) ⇒ s.value == 2 } + val snapshotProbe = TestProbe[Try[Done]] + val snapshotAtTwo = counterWithSnapshotProbe(pid, snapshotProbe.ref).snapshotWhen { (s, _, _) ⇒ s.value == 2 } val c: ActorRef[Command] = spawn(snapshotAtTwo) val watchProbe = watcher(c) val replyProbe = TestProbe[State]() c ! IncrementWithPersistAll(3) + c ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(3, Vector(0, 1, 2))) + snapshotProbe.expectMessage(Success(Done)) c ! LogThenStop watchProbe.expectMessage("Terminated") @@ -434,6 +472,83 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown probe.expectMessage("msg received") } + "tag events" in { + val pid = nextPid + val c = spawn(counter(pid).withTagger(_ ⇒ Set("tag1", "tag2"))) + val replyProbe = TestProbe[State]() + + c ! Increment + c ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(1, Vector(0))) + + val events = queries.currentEventsByTag("tag1").runWith(Sink.seq).futureValue + events shouldEqual List(EventEnvelope(Sequence(1), pid, 1, Incremented(1))) + } + + "adapt events" in { + val pid = nextPid + val persistentBehavior = counter(pid) + val c = spawn( + //#install-event-adapter + persistentBehavior.eventAdapter(new WrapperEventAdapter[Event])) + //#install-event-adapter + val replyProbe = TestProbe[State]() + + c ! Increment + c ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(1, Vector(0))) + + val events = queries.currentEventsByPersistenceId(pid).runWith(Sink.seq).futureValue + events shouldEqual List(EventEnvelope(Sequence(1), pid, 1, Wrapper(Incremented(1)))) + + val c2 = spawn(counter(pid).eventAdapter(new WrapperEventAdapter[Event])) + c2 ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(1, Vector(0))) + + } + + "adapter multiple events with persist all" in { + val pid = nextPid + val c = spawn(counter(pid).eventAdapter(new WrapperEventAdapter[Event])) + val replyProbe = TestProbe[State]() + + c ! IncrementWithPersistAll(2) + c ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(2, Vector(0, 1))) + + val events = queries.currentEventsByPersistenceId(pid).runWith(Sink.seq).futureValue + events shouldEqual List( + EventEnvelope(Sequence(1), pid, 1, Wrapper(Incremented(1))), + EventEnvelope(Sequence(2), pid, 2, Wrapper(Incremented(1))) + ) + + val c2 = spawn(counter(pid).eventAdapter(new WrapperEventAdapter[Event])) + c2 ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(2, Vector(0, 1))) + } + + "adapt and tag events" in { + val pid = nextPid + val c = spawn(counter(pid) + .withTagger(_ ⇒ Set("tag99")) + .eventAdapter(new WrapperEventAdapter[Event])) + val replyProbe = TestProbe[State]() + + c ! Increment + c ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(1, Vector(0))) + + val events = queries.currentEventsByPersistenceId(pid).runWith(Sink.seq).futureValue + events shouldEqual List(EventEnvelope(Sequence(1), pid, 1, Wrapper(Incremented(1)))) + + val c2 = spawn(counter(pid).eventAdapter(new WrapperEventAdapter[Event])) + c2 ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(1, Vector(0))) + + val taggedEvents = queries.currentEventsByTag("tag99").runWith(Sink.seq).futureValue + taggedEvents shouldEqual List(EventEnvelope(Sequence(1), pid, 1, Wrapper(Incremented(1)))) + } + def watcher(toWatch: ActorRef[_]): TestProbe[String] = { val probe = TestProbe[String]() val w = Behaviors.setup[Any] { (ctx) ⇒ diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsSpec.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala similarity index 97% rename from akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsSpec.scala rename to akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala index ab474bb409..3e470c5e5e 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsSpec.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala @@ -8,7 +8,7 @@ import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors import akka.persistence.typed.scaladsl.PersistentBehaviors -object BasicPersistentBehaviorsSpec { +object BasicPersistentBehaviorsCompileOnly { //#structure sealed trait Command diff --git a/build.sbt b/build.sbt index ea2a8584df..af866f2b78 100644 --- a/build.sbt +++ b/build.sbt @@ -396,9 +396,10 @@ lazy val actorTyped = akkaModule("akka-actor-typed") lazy val persistenceTyped = akkaModule("akka-persistence-typed") .dependsOn( actorTyped, - persistence, - actorTestkitTyped % "test->test", - actorTypedTests % "test->test" + persistence % "compile->compile;test->test", + persistenceQuery % "test", + actorTypedTests % "test->test", + actorTestkitTyped % "compile->compile;test->test" ) .settings(Dependencies.persistenceShared) .settings(AkkaBuild.mayChangeSettings)