EventSeq in Typed EventAdapter (#27130)

EventSeq in Typed EventAdapter, refs #26909
This commit is contained in:
Patrik Nordwall 2019-07-05 09:46:10 +02:00 committed by Arnout Engelen
parent 16f2009786
commit 72680e93bf
15 changed files with 494 additions and 133 deletions

View file

@ -53,6 +53,12 @@ private[akka] final class OptionVal[+A](val x: A) extends AnyVal {
def getOrElse[B >: A](default: B): B =
if (x == null) default else x
/**
* Convert to `scala.Option`
*/
def toOption: Option[A] =
Option(x)
def contains[B >: A](it: B): Boolean =
x != null && x == it

View file

@ -206,7 +206,7 @@ By default, these remoting features are disabled when not using Akka Cluster:
When used with Cluster, all previous behavior is the same except a remote watch of an actor is no longer possible before a node joins a cluster, only after.
To optionally enable them without Cluster, if you understand
the @ref[consequences](../remoting-artery.md#quarantine), set
the @ref[consequences](../remoting-artery.md#quarantine), set
```
akka.remote.use-unsafe-remote-features-without-cluster = on`.
```
@ -214,7 +214,7 @@ akka.remote.use-unsafe-remote-features-without-cluster = on`.
When used without Cluster
* An initial warning is logged on startup of `RemoteActorRefProvider`
* A warning will be logged on remote watch attempts, which you can suppress by setting
* A warning will be logged on remote watch attempts, which you can suppress by setting
```
akka.remote.warn-unsafe-watch-without-cluster = off
```
@ -305,7 +305,7 @@ akka.coordinated-shutdown.run-by-actor-system-terminate = off
`StreamConverters.fromInputStream` now always fails the materialized value in case of failure. It is no longer required
to both check the materialized value and the `Try[Done]` inside the @apidoc[IOResult]. In case of an IO failure
the exception will be @apidoc[IOOperationIncompleteException] instead of @apidoc[AbruptIOTerminationException].
the exception will be @apidoc[IOOperationIncompleteException] instead of @apidoc[AbruptIOTerminationException].
### Akka now uses Fork Join Pool from JDK
@ -387,6 +387,7 @@ made before finalizing the APIs. Compared to Akka 2.5.x the source incompatible
* `ActorContext` parameter removed in `javadsl.ReceiveBuilder` for the functional style in Java. Use `Behaviors.setup`
to retrieve `ActorContext`, and use an enclosing class to hold initialization parameters and `ActorContext`.
* Java @apidoc[akka.cluster.sharding.typed.javadsl.EntityRef] ask timeout now takes a `java.time.Duration` rather than a @apidoc[Timeout]
* Changed method signature for `EventAdapter.fromJournal` and support for `manifest` in `EventAdapter`.
* `BehaviorInterceptor`, `Behaviors.monitor`, `Behaviors.withMdc` and @scala[`widen`]@java[`Behaviors.widen`] takes
a @scala[`ClassTag` parameter (probably source compatible)]@java[`interceptMessageClass` parameter].
`interceptMessageType` method in `BehaviorInterceptor` is replaced with this @scala[`ClassTag`]@java[`Class`] parameter.

View file

@ -144,7 +144,7 @@ interpreted correctly on replay. Cluster Sharding ensures that there is only one
## Accessing the ActorContext
If the persistent behavior needs to use the `ActorContext`, for example to spawn child actors, it can be obtained by
If the `EventSourcedBehavior` needs to use the `ActorContext`, for example to spawn child actors, it can be obtained by
wrapping construction with `Behaviors.setup`:
Scala
@ -357,20 +357,20 @@ to another type that is then passed to the journal.
Defining an event adapter is done by extending an EventAdapter:
Scala
: @@snip [x](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala) { #event-wrapper }
: @@snip [x](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #event-wrapper }
Java
: @@snip [x](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #event-wrapper }
Then install it on a persistent behavior:
Then install it on a `EventSourcedBehavior`:
Scala
: @@snip [x](/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorSpec.scala) { #install-event-adapter }
: @@snip [x](/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorCompileOnly.scala) { #install-event-adapter }
Java
: @@snip [x](/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorCompileOnlyTest.java) { #install-event-adapter }
## Wrapping Persistent Behaviors
## Wrapping EventSourcedBehavior
When creating a `EventSourcedBehavior`, it is possible to wrap `EventSourcedBehavior` in
other behaviors such as `Behaviors.setup` in order to access the `ActorContext` object. For instance

View file

@ -4,26 +4,107 @@
package akka.persistence.typed
import scala.annotation.varargs
import scala.collection.immutable
import akka.annotation.InternalApi
/**
* Facility to convert from and to specialised data models, as may be required by specialized persistence Journals.
* Typical use cases include (but are not limited to):
* <ul>
* <li>extracting events from "envelopes"</li>
* <li>adapting events from a "domain model" to the "data model", e.g. converting to the Journals storage format,
* such as JSON, BSON or any specialised binary format</li>
* <li>adapting events from a "data model" to the "domain model"</li>
* <li>adding metadata that is understood by the journal</li>
* <li>migration by splitting up events into sequences of other events</li>
* <li>migration filtering out unused events, or replacing an event with another</li>
* </ul>
*/
abstract class EventAdapter[E, P] {
/**
* Type of the event to persist
* Convert domain event to journal event type.
*
* Some journal may require a specific type to be returned to them,
* for example if a primary key has to be associated with each event then a journal
* may require adapters to return `com.example.myjournal.EventWithPrimaryKey(event, key)`.
*
* The `toJournal` adaptation must be an 1-to-1 transformation.
* It is not allowed to drop incoming events during the `toJournal` adaptation.
*
* @param e the application-side domain event to be adapted to the journal model
* @return the adapted event object, possibly the same object if no adaptation was performed
*/
type Per = P
def toJournal(e: E): P
/**
* Transform event on the way to the journal
* Return the manifest (type hint) that will be provided in the `fromJournal` method.
* Use `""` if manifest is not needed.
*/
def toJournal(e: E): Per
def manifest(event: E): String
/**
* Transform the event on recovery from the journal.
* Note that this is not called in any read side so will need to be applied
* manually when using Query.
*
* Convert a event from its journal model to the applications domain model.
*
* One event may be adapter into multiple (or none) events which should be delivered to the `EventSourcedBehavior`.
* Use the specialised [[EventSeq.single]] method to emit exactly one event,
* or [[EventSeq.empty]] in case the adapter is not handling this event.
*
* @param p event to be adapted before delivering to the `EventSourcedBehavior`
* @param manifest optionally provided manifest (type hint) in case the Adapter has stored one
* for this event, `""` if none
* @return sequence containing the adapted events (possibly zero) which will be delivered to
* the `EventSourcedBehavior`
*/
def fromJournal(p: Per): E
def fromJournal(p: P, manifest: String): EventSeq[E]
}
sealed trait EventSeq[+A] {
def events: immutable.Seq[A]
def isEmpty: Boolean = events.isEmpty
def nonEmpty: Boolean = events.nonEmpty
def size: Int
}
object EventSeq {
final def empty[A]: EventSeq[A] = EmptyEventSeq.asInstanceOf[EventSeq[A]]
final def single[A](event: A): EventSeq[A] = SingleEventSeq(event)
@varargs final def many[A](events: A*): EventSeq[A] = EventsSeq(events.toList)
/** Java API */
final def create[A](events: java.util.List[A]): EventSeq[A] = {
import akka.util.ccompat.JavaConverters._
EventsSeq(events.asScala.toList)
}
/** Scala API */
final def apply[A](events: immutable.Seq[A]): EventSeq[A] = EventsSeq(events)
}
/** INTERNAL API */
@InternalApi private[akka] final case class SingleEventSeq[A](event: A) extends EventSeq[A] {
override def events: immutable.Seq[A] = List(event)
override def size: Int = 1
}
/** INTERNAL API */
@InternalApi private[akka] case object EmptyEventSeq extends EventSeq[Nothing] {
override def events: immutable.Seq[Nothing] = Nil
override def size: Int = 0
}
/** INTERNAL API */
@InternalApi private[akka] final case class EventsSeq[A](override val events: immutable.Seq[A]) extends EventSeq[A] {
override def size: Int = events.size
}
/**
@ -39,5 +120,6 @@ abstract class EventAdapter[E, P] {
*/
@InternalApi private[akka] class NoOpEventAdapter[E] extends EventAdapter[E, Any] {
override def toJournal(e: E): Any = e
override def fromJournal(p: Any): E = p.asInstanceOf[E]
override def fromJournal(p: Any, manifest: String): EventSeq[E] = EventSeq.single(p.asInstanceOf[E])
override def manifest(event: E): String = ""
}

View file

@ -44,7 +44,7 @@ private[akka] final class BehaviorSetup[C, E, S](
val writerIdentity: EventSourcedBehaviorImpl.WriterIdentity,
private val signalHandler: PartialFunction[(S, Signal), Unit],
val tagger: E => Set[String],
val eventAdapter: EventAdapter[E, _],
val eventAdapter: EventAdapter[E, Any],
val snapshotWhen: (S, E, Long) => Boolean,
val recovery: Recovery,
val retention: RetentionCriteria,

View file

@ -26,17 +26,20 @@ private[akka] trait JournalInteractions[C, E, S] {
type EventOrTagged = Any // `Any` since can be `E` or `Tagged`
protected def internalPersist(state: Running.RunningState[S], event: EventOrTagged): Running.RunningState[S] = {
protected def internalPersist(
state: Running.RunningState[S],
event: EventOrTagged,
eventAdapterManifest: String): Running.RunningState[S] = {
val newState = state.nextSequenceNr()
val senderNotKnownBecauseAkkaTyped = null
val repr = PersistentRepr(
event,
persistenceId = setup.persistenceId.id,
sequenceNr = newState.seqNr,
manifest = eventAdapterManifest,
writerUuid = setup.writerIdentity.writerUuid,
sender = senderNotKnownBecauseAkkaTyped)
sender = ActorRef.noSender)
val write = AtomicWrite(repr) :: Nil
setup.journal
@ -46,19 +49,21 @@ private[akka] trait JournalInteractions[C, E, S] {
}
protected def internalPersistAll(
events: immutable.Seq[EventOrTagged],
state: Running.RunningState[S]): Running.RunningState[S] = {
state: Running.RunningState[S],
events: immutable.Seq[(EventOrTagged, String)]): Running.RunningState[S] = {
if (events.nonEmpty) {
var newState = state
val writes = events.map { event =>
newState = newState.nextSequenceNr()
PersistentRepr(
event,
persistenceId = setup.persistenceId.id,
sequenceNr = newState.seqNr,
writerUuid = setup.writerIdentity.writerUuid,
sender = ActorRef.noSender)
val writes = events.map {
case (event, eventAdapterManifest) =>
newState = newState.nextSequenceNr()
PersistentRepr(
event,
persistenceId = setup.persistenceId.id,
sequenceNr = newState.seqNr,
manifest = eventAdapterManifest,
writerUuid = setup.writerIdentity.writerUuid,
sender = ActorRef.noSender)
}
val write = AtomicWrite(writes)

View file

@ -6,6 +6,7 @@ package akka.persistence.typed.internal
import scala.util.control.NonFatal
import scala.concurrent.duration._
import akka.actor.typed.{ Behavior, Signal }
import akka.actor.typed.internal.PoisonPill
import akka.actor.typed.internal.UnstashException
@ -14,10 +15,14 @@ import akka.annotation.{ InternalApi, InternalStableApi }
import akka.event.Logging
import akka.persistence.JournalProtocol._
import akka.persistence._
import akka.persistence.typed.EmptyEventSeq
import akka.persistence.typed.EventsSeq
import akka.persistence.typed.RecoveryFailed
import akka.persistence.typed.RecoveryCompleted
import akka.persistence.typed.SingleEventSeq
import akka.persistence.typed.internal.ReplayingEvents.ReplayingState
import akka.persistence.typed.internal.Running.WithSeqNrAccessible
import akka.util.OptionVal
import akka.util.unused
import akka.util.PrettyDuration._
@ -103,19 +108,31 @@ private[akka] final class ReplayingEvents[C, E, S](
try {
response match {
case ReplayedMessage(repr) =>
val event = setup.eventAdapter.fromJournal(repr.payload.asInstanceOf[setup.eventAdapter.Per])
var eventForErrorReporting: OptionVal[Any] = OptionVal.None
try {
state = state.copy(
seqNr = repr.sequenceNr,
state = setup.eventHandler(state.state, event),
eventSeenInInterval = true)
val eventSeq = setup.eventAdapter.fromJournal(repr.payload, repr.manifest)
def handleEvent(event: E): Unit = {
eventForErrorReporting = OptionVal.Some(event)
state = state.copy(
seqNr = repr.sequenceNr,
state = setup.eventHandler(state.state, event),
eventSeenInInterval = true)
}
eventSeq match {
case SingleEventSeq(event) => handleEvent(event)
case EventsSeq(events) => events.foreach(handleEvent)
case EmptyEventSeq => // no events
}
this
} catch {
case NonFatal(ex) =>
state = state.copy(repr.sequenceNr)
onRecoveryFailure(ex, Some(event))
onRecoveryFailure(ex, eventForErrorReporting.toOption)
}
case RecoverySuccess(highestSeqNr) =>
setup.log.debug("Recovery successful, recovered until sequenceNr: [{}]", highestSeqNr)
onRecoveryCompleted(state)

View file

@ -148,8 +148,9 @@ private[akka] object Running {
val newState = state.applyEvent(setup, event)
val eventToPersist = adaptEvent(event)
val eventAdapterManifest = setup.eventAdapter.manifest(event)
val newState2 = internalPersist(newState, eventToPersist)
val newState2 = internalPersist(newState, eventToPersist, eventAdapterManifest)
val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event, newState2.seqNr)
@ -169,9 +170,9 @@ private[akka] object Running {
(currentState.applyEvent(setup, event), shouldSnapshot)
}
val eventsToPersist = events.map(adaptEvent)
val eventsToPersist = events.map(evt => (adaptEvent(evt), setup.eventAdapter.manifest(evt)))
val newState2 = internalPersistAll(eventsToPersist, newState)
val newState2 = internalPersistAll(newState, eventsToPersist)
persistingEvents(newState2, state, events.size, shouldSnapshotAfterPersist, sideEffects)

View file

@ -149,6 +149,10 @@ abstract class EventSourcedBehavior[Command, Event, State] private[akka] (
*/
def tagsFor(@unused event: Event): java.util.Set[String] = Collections.emptySet()
/**
* Transform the event in another type before giving to the journal. Can be used to wrap events
* in types Journals understand but is of a different type than `Event`.
*/
def eventAdapter(): EventAdapter[Event, _] = NoOpEventAdapter.instance[Event]
/**

View file

@ -10,6 +10,7 @@ import akka.actor.typed.ActorRef;
import akka.actor.typed.Scheduler;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.function.Procedure;
import akka.persistence.typed.EventSeq;
import akka.persistence.typed.SnapshotSelectionCriteria;
import akka.persistence.typed.EventAdapter;
import akka.actor.testkit.typed.javadsl.TestInbox;
@ -27,14 +28,14 @@ public class PersistentActorCompileOnlyTest {
// #event-wrapper
public static class Wrapper<T> {
private final T t;
private final T event;
public Wrapper(T t) {
this.t = t;
public Wrapper(T event) {
this.event = event;
}
public T getT() {
return t;
public T getEvent() {
return event;
}
}
@ -46,12 +47,21 @@ public class PersistentActorCompileOnlyTest {
}
@Override
public SimpleEvent fromJournal(Wrapper<SimpleEvent> simpleEventWrapper) {
return simpleEventWrapper.getT();
public String manifest(SimpleEvent event) {
return "";
}
@Override
public EventSeq<SimpleEvent> fromJournal(
Wrapper<SimpleEvent> simpleEventWrapper, String manifest) {
return EventSeq.single(simpleEventWrapper.getEvent());
}
}
// #event-wrapper
// try varargs
private EventSeq<SimpleEvent> many = EventSeq.many(new SimpleEvent("a"), new SimpleEvent("b"));
public static class SimpleCommand {
public final String data;

View file

@ -17,6 +17,7 @@ import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.Sequence;
import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal;
import akka.persistence.typed.*;
import akka.persistence.typed.scaladsl.EventSourcedBehaviorSpec;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
@ -35,7 +36,6 @@ import java.time.Duration;
import java.util.*;
import static akka.Done.done;
import static akka.persistence.typed.scaladsl.EventSourcedBehaviorSpec.*;
import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
@ -43,7 +43,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
public static final Config config =
ConfigFactory.parseString("akka.loggers = [akka.testkit.TestEventListener]")
.withFallback(conf().withFallback(ConfigFactory.load()));
.withFallback(EventSourcedBehaviorSpec.conf().withFallback(ConfigFactory.load()));
@ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(config);
@ -583,15 +583,47 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
}
// event-wrapper
class WrapperEventAdapter extends EventAdapter<Incremented, Wrapper> {
public static class Wrapper<T> implements Serializable {
private final T t;
public Wrapper(T t) {
this.t = t;
}
public T getT() {
return t;
}
@Override
public Wrapper toJournal(Incremented incremented) {
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Wrapper<?> wrapper = (Wrapper<?>) o;
return t.equals(wrapper.t);
}
@Override
public int hashCode() {
return t.hashCode();
}
}
class WrapperEventAdapter extends EventAdapter<Incremented, Wrapper<Incremented>> {
@Override
public Wrapper<Incremented> toJournal(Incremented incremented) {
return new Wrapper<>(incremented);
}
@Override
public Incremented fromJournal(Wrapper wrapper) {
return (Incremented) wrapper.t();
public String manifest(Incremented event) {
return "";
}
@Override
public EventSeq<Incremented> fromJournal(Wrapper<Incremented> wrapper, String manifest) {
return EventSeq.single(wrapper.getT());
}
}
// event-wrapper

View file

@ -32,7 +32,6 @@ import akka.persistence.query.PersistenceQuery
import akka.persistence.query.Sequence
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.snapshot.SnapshotStore
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted
@ -51,14 +50,6 @@ import org.scalatest.WordSpecLike
object EventSourcedBehaviorSpec {
//#event-wrapper
case class Wrapper[T](t: T)
class WrapperEventAdapter[T] extends EventAdapter[T, Wrapper[T]] {
override def toJournal(e: T): Wrapper[T] = Wrapper(e)
override def fromJournal(p: Wrapper[T]): T = p.t
}
//#event-wrapper
class SlowInMemorySnapshotStore extends SnapshotStore {
private var state = Map.empty[String, (Any, UntypedSnapshotMetadata)]
@ -484,70 +475,6 @@ class EventSourcedBehaviorSpec extends ScalaTestWithActorTestKit(EventSourcedBeh
events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, Incremented(1)))
}
"adapt events" in {
val pid = nextPid
val c = spawn(Behaviors.setup[Command] { ctx =>
val persistentBehavior = counter(ctx, pid)
//#install-event-adapter
persistentBehavior.eventAdapter(new WrapperEventAdapter[Event])
//#install-event-adapter
})
val replyProbe = TestProbe[State]()
c ! Increment
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue
events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, Wrapper(Incremented(1))))
val c2 = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new WrapperEventAdapter[Event])))
c2 ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
}
"adapter multiple events with persist all" in {
val pid = nextPid
val c = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new WrapperEventAdapter[Event])))
val replyProbe = TestProbe[State]()
c ! IncrementWithPersistAll(2)
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(2, Vector(0, 1)))
val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue
events shouldEqual List(
EventEnvelope(Sequence(1), pid.id, 1, Wrapper(Incremented(1))),
EventEnvelope(Sequence(2), pid.id, 2, Wrapper(Incremented(1))))
val c2 = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new WrapperEventAdapter[Event])))
c2 ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(2, Vector(0, 1)))
}
"adapt and tag events" in {
val pid = nextPid
val c = spawn(Behaviors.setup[Command](ctx =>
counter(ctx, pid).withTagger(_ => Set("tag99")).eventAdapter(new WrapperEventAdapter[Event])))
val replyProbe = TestProbe[State]()
c ! Increment
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue
events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, Wrapper(Incremented(1))))
val c2 = spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new WrapperEventAdapter[Event])))
c2 ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
val taggedEvents = queries.currentEventsByTag("tag99").runWith(Sink.seq).futureValue
taggedEvents shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, Wrapper(Incremented(1))))
}
"handle scheduled message arriving before recovery completed " in {
val c = spawn(Behaviors.withTimers[Command] { timers =>
timers.startSingleTimer("tick", Increment, 1.millis)

View file

@ -0,0 +1,249 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.scaladsl
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.query.EventEnvelope
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.Sequence
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.EventSeq
import akka.persistence.typed.PersistenceId
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.testkit.EventFilter
import akka.testkit.TestEvent.Mute
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
object EventSourcedEventAdapterSpec {
private val conf = ConfigFactory.parseString(s"""
akka.loggers = [akka.testkit.TestEventListener]
akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
""")
case class Wrapper(t: String)
class WrapperEventAdapter extends EventAdapter[String, Wrapper] {
override def toJournal(e: String): Wrapper = Wrapper("<" + e)
override def fromJournal(p: Wrapper, manifest: String): EventSeq[String] = EventSeq.single(p.t + ">")
override def manifest(event: String): String = ""
}
class FilterEventAdapter extends EventAdapter[String, String] {
override def toJournal(e: String): String = e.toUpperCase()
override def fromJournal(p: String, manifest: String): EventSeq[String] = {
if (p == "B") EventSeq.empty
else EventSeq.single(p)
}
override def manifest(event: String): String = ""
}
class SplitEventAdapter extends EventAdapter[String, String] {
override def toJournal(e: String): String = e.toUpperCase()
override def fromJournal(p: String, manifest: String): EventSeq[String] = {
EventSeq(p.map("<" + _.toString + ">"))
}
override def manifest(event: String): String = ""
}
class EventAdapterWithManifest extends EventAdapter[String, String] {
override def toJournal(e: String): String = e.toUpperCase()
override def fromJournal(p: String, manifest: String): EventSeq[String] = {
EventSeq.single(p + manifest)
}
override def manifest(event: String): String = event.length.toString
}
case class GenericWrapper[T](t: T)
class GenericWrapperEventAdapter[T] extends EventAdapter[T, GenericWrapper[T]] {
override def toJournal(e: T): GenericWrapper[T] = GenericWrapper(e)
override def fromJournal(p: GenericWrapper[T], manifest: String): EventSeq[T] = EventSeq.single(p.t)
override def manifest(event: T): String = ""
}
}
class EventSourcedEventAdapterSpec
extends ScalaTestWithActorTestKit(EventSourcedEventAdapterSpec.conf)
with WordSpecLike {
import EventSourcedEventAdapterSpec._
import EventSourcedBehaviorSpec.{
counter,
Command,
Event,
GetValue,
Increment,
IncrementWithPersistAll,
Incremented,
State
}
import akka.actor.typed.scaladsl.adapter._
system.toUntyped.eventStream.publish(Mute(EventFilter.warning(start = "No default snapshot store", occurrences = 1)))
val pidCounter = new AtomicInteger(0)
private def nextPid(): PersistenceId = PersistenceId(s"c${pidCounter.incrementAndGet()})")
implicit val materializer = ActorMaterializer()(system.toUntyped)
val queries: LeveldbReadJournal =
PersistenceQuery(system.toUntyped).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
private def behavior(pid: PersistenceId, probe: ActorRef[String]): EventSourcedBehavior[String, String, String] =
EventSourcedBehavior(pid, "", commandHandler = { (_, command) =>
Effect.persist(command).thenRun(newState => probe ! newState)
}, eventHandler = { (state, evt) =>
state + evt
})
"Event adapter" must {
"wrap single events" in {
val probe = TestProbe[String]()
val pid = nextPid()
val ref = spawn(behavior(pid, probe.ref).eventAdapter(new WrapperEventAdapter))
ref ! "a"
ref ! "b"
probe.expectMessage("a")
probe.expectMessage("ab")
// replay
val ref2 = spawn(behavior(pid, probe.ref).eventAdapter(new WrapperEventAdapter))
ref2 ! "c"
probe.expectMessage("<a><b>c")
}
"filter unused events" in {
val probe = TestProbe[String]()
val pid = nextPid()
val ref = spawn(behavior(pid, probe.ref).eventAdapter(new FilterEventAdapter))
ref ! "a"
ref ! "b"
ref ! "c"
probe.expectMessage("a")
probe.expectMessage("ab")
probe.expectMessage("abc")
// replay
val ref2 = spawn(behavior(pid, probe.ref).eventAdapter(new FilterEventAdapter))
ref2 ! "d"
probe.expectMessage("ACd")
}
"split one event into several" in {
val probe = TestProbe[String]()
val pid = nextPid()
val ref = spawn(behavior(pid, probe.ref).eventAdapter(new SplitEventAdapter))
ref ! "a"
ref ! "bc"
probe.expectMessage("a")
probe.expectMessage("abc")
// replay
val ref2 = spawn(behavior(pid, probe.ref).eventAdapter(new SplitEventAdapter))
ref2 ! "d"
probe.expectMessage("<A><B><C>d")
}
"support manifest" in {
val probe = TestProbe[String]()
val pid = nextPid()
val ref = spawn(behavior(pid, probe.ref).eventAdapter(new EventAdapterWithManifest))
ref ! "a"
ref ! "bcd"
probe.expectMessage("a")
probe.expectMessage("abcd")
// replay
val ref2 = spawn(behavior(pid, probe.ref).eventAdapter(new EventAdapterWithManifest))
ref2 ! "e"
probe.expectMessage("A1BCD3e")
}
"adapt events" in {
val pid = nextPid()
val c = spawn(Behaviors.setup[Command] { ctx =>
val persistentBehavior = counter(ctx, pid)
persistentBehavior.eventAdapter(new GenericWrapperEventAdapter[Event])
})
val replyProbe = TestProbe[State]()
c ! Increment
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue
events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, GenericWrapper(Incremented(1))))
val c2 =
spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new GenericWrapperEventAdapter[Event])))
c2 ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
}
"adapter multiple events with persist all" in {
val pid = nextPid()
val c =
spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new GenericWrapperEventAdapter[Event])))
val replyProbe = TestProbe[State]()
c ! IncrementWithPersistAll(2)
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(2, Vector(0, 1)))
val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue
events shouldEqual List(
EventEnvelope(Sequence(1), pid.id, 1, GenericWrapper(Incremented(1))),
EventEnvelope(Sequence(2), pid.id, 2, GenericWrapper(Incremented(1))))
val c2 =
spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new GenericWrapperEventAdapter[Event])))
c2 ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(2, Vector(0, 1)))
}
"adapt and tag events" in {
val pid = nextPid()
val c = spawn(Behaviors.setup[Command](ctx =>
counter(ctx, pid).withTagger(_ => Set("tag99")).eventAdapter(new GenericWrapperEventAdapter[Event])))
val replyProbe = TestProbe[State]()
c ! Increment
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
val events = queries.currentEventsByPersistenceId(pid.id).runWith(Sink.seq).futureValue
events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, GenericWrapper(Incremented(1))))
val c2 =
spawn(Behaviors.setup[Command](ctx => counter(ctx, pid).eventAdapter(new GenericWrapperEventAdapter[Event])))
c2 ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
val taggedEvents = queries.currentEventsByTag("tag99").runWith(Sink.seq).futureValue
taggedEvents shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, GenericWrapper(Incremented(1))))
}
}
}

View file

@ -5,11 +5,14 @@
package docs.akka.persistence.typed
import scala.concurrent.duration._
import akka.actor.typed.Behavior
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.DeleteEventsFailed
import akka.persistence.typed.DeleteSnapshotsFailed
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.EventSeq
//#behavior
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.PersistenceId
@ -230,4 +233,23 @@ object BasicPersistentBehaviorCompileOnly {
}
//#retentionCriteriaWithSignals
//#event-wrapper
case class Wrapper[T](event: T)
class WrapperEventAdapter[T] extends EventAdapter[T, Wrapper[T]] {
override def toJournal(e: T): Wrapper[T] = Wrapper(e)
override def fromJournal(p: Wrapper[T], manifest: String): EventSeq[T] = EventSeq.single(p.event)
override def manifest(event: T): String = ""
}
//#event-wrapper
//#install-event-adapter
val eventAdapterBehavior: Behavior[Command] =
EventSourcedBehavior[Command, Event, State](
persistenceId = PersistenceId("abc"),
emptyState = State(),
commandHandler = (state, cmd) => throw new RuntimeException("TODO: process the command & return an Effect"),
eventHandler = (state, evt) => throw new RuntimeException("TODO: process the event return the next state"))
.eventAdapter(new WrapperEventAdapter[Event])
//#install-event-adapter
}

View file

@ -6,9 +6,10 @@ package akka.persistence
import akka.actor.{ ActorRef, NoSerializationVerificationNeeded }
import akka.persistence.serialization.Message
import scala.collection.immutable
import akka.annotation.DoNotInherit
/**
* INTERNAL API
*
@ -66,15 +67,17 @@ final case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) extends Per
* @see [[akka.persistence.journal.AsyncWriteJournal]]
* @see [[akka.persistence.journal.AsyncRecovery]]
*/
trait PersistentRepr extends Message {
@DoNotInherit trait PersistentRepr extends Message {
/**
* This persistent message's payload.
* This persistent message's payload (the event).
*/
def payload: Any
/**
* Returns the persistent payload's manifest if available
* Returns the event adapter manifest for the persistent payload (event) if available
* May be `""` if event adapter manifest is not used.
* Note that this is not the same as the manifest of the serialized representation of the `payload`.
*/
def manifest: String
@ -96,26 +99,28 @@ trait PersistentRepr extends Message {
def writerUuid: String
/**
* Creates a new persistent message with the specified `payload`.
* Creates a new persistent message with the specified `payload` (event).
*/
def withPayload(payload: Any): PersistentRepr
/**
* Creates a new persistent message with the specified `manifest`.
* Creates a new persistent message with the specified event adapter `manifest`.
*/
def withManifest(manifest: String): PersistentRepr
/**
* Not used, can always be `false`.
*
* Not used in new records stored with Akka v2.4, but
* old records from v2.3 may have this as `true` if
* it was a non-permanent delete.
*/
def deleted: Boolean
def deleted: Boolean // FIXME deprecate, issue #27278
/**
* Sender of this message.
* Not used, can be `null`
*/
def sender: ActorRef
def sender: ActorRef // FIXME deprecate, issue #27278
/**
* Creates a new copy of this [[PersistentRepr]].