diff --git a/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md b/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md index f1073f7a2b..b0f555141b 100644 --- a/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md +++ b/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md @@ -66,7 +66,7 @@ Scala : @@snip [ReplicatedEventSourcingCompileOnlySpec.scala](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala) { #replicas } Java -: @@snip [ReplicatedEventSourcingCompileOnlyTest.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlyTest.java) { #replicas } +: @@snip [MyReplicatedBehavior.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/MyReplicatedBehavior.java) { #replicas } Then to enable replication create the event sourced behavior with the factory method: @@ -75,7 +75,7 @@ Scala : @@snip [ReplicatedEventSourcingCompileOnlySpec.scala](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala) { #factory } Java -: @@snip [ReplicatedEventSourcingCompileOnlyTest.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlyTest.java) { #factory } +: @@snip [MyReplicatedBehavior.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/MyReplicatedBehavior.java) { #factory } The factory takes in: @@ -90,7 +90,7 @@ Scala : @@snip [ReplicatedEventSourcingCompileOnlySpec.scala](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala) { #factory-shared} Java -: @@snip [ReplicatedEventSourcingCompileOnlyTest.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlyTest.java) { #factory-shared } +: @@snip [MyReplicatedBehavior.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/MyReplicatedBehavior.java) { #factory-shared } @@@ div { .group-scala } @@ -271,6 +271,42 @@ Java More advanced routing among the replicas is currently left as an exercise for the reader (or may be covered in a future release [#29281](https://github.com/akka/akka/issues/29281), [#29319](https://github.com/akka/akka/issues/29319)). +## Tagging events and running projections + +Just like for regular `EventSourcedBehavior`s it is possible to tag events along with persisting them. +This is useful for later retrival of events for a given tag. The same @ref[API for tagging provided for EventSourcedBehavior](persistence.md#tagging) can +be used for replicated event sourced behaviors as well. +Tagging is useful in practice to build queries that lead to other data representations or aggregations of the these event +streams that can more directly serve user queries – known as building the “read side” in @ref[CQRS](cqrs.md) based applications. + +Creating read side projections is possible through [Akka Projection](https://doc.akka.io/docs/akka-projection/current/) +or through direct usage of the @ref[events by tag queries](../persistence-query.md#eventsbytag-and-currenteventsbytag). + +The tagging is invoked in each replicas, which requires some special care in using tags, or else the same event will be +tagged one time for each replica and show up in the event by tag stream one time for each replica. In addition to this +the tags will be written in the respective journal of the replicas, which means that unless they all share a single journal +the tag streams will be local to the replica even if the same tag is used on multiple replicas. + +One strategy for dealing with this is to include the replica id in the tag name, this means there will be a tagged stream of events +per replica that contains all replicated events, but since the events can arrive in different order, they can also come in different +order per replica tag. + +Another strategy would be to tag only the events that are local to the replica and not events that are replicated. Either +using a tag that will be the same for all replicas, leading to a single stream of tagged events where the events from each +replica is present only once, or with a tag including the replica id meaning that there will be a stream of tagged events +with the events accepted locally for each replica. + +Determining the replica id of the replicated actor itself and the origin replica id of an event is possible through the +@apidoc[ReplicationContext] when the tagger callback is invoked like this: + +Scala +: @@snip [ReplicatedEventSourcingTaggingSpec.scala](/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingTaggingSpec.scala) { #tagging } + +Java +: @@snip [ReplicatedStringSet.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedStringSet.java) { #tagging } + +In this sample we are using a shared journal, and single tag but only tagging local events to it and therefore ending up +with a single stream of tagged events from all replicas without duplicates. ## Direct Replication of Events diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/EventStorage.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/EventStorage.scala index 14f8c90a46..d7b6ac0727 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/EventStorage.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/EventStorage.scala @@ -10,6 +10,7 @@ import scala.collection.immutable import scala.util.{ Failure, Success, Try } import akka.annotation.InternalApi import akka.persistence.PersistentRepr +import akka.persistence.journal.Tagged import akka.persistence.testkit.ProcessingPolicy.DefaultPolicies import akka.persistence.testkit.internal.TestKitStorage import akka.util.ccompat.JavaConverters._ @@ -46,7 +47,11 @@ private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, Per val grouped = elems.groupBy(_.persistenceId) val processed = grouped.map { - case (pid, els) => currentPolicy.tryProcess(pid, WriteEvents(els.map(_.payload))) + case (pid, els) => + currentPolicy.tryProcess(pid, WriteEvents(els.map(_.payload match { + case Tagged(payload, _) => payload + case nonTagged => nonTagged + }))) } val reduced: ProcessingResult = @@ -81,6 +86,23 @@ private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, Per } } + def tryReadByTag(tag: String): immutable.Seq[PersistentRepr] = { + val batch = readAll() + .filter(repr => + repr.payload match { + case Tagged(_, tags) => tags.contains(tag) + case _ => false + }) + .toVector + .sortBy(_.timestamp) + + currentPolicy.tryProcess(tag, ReadEvents(batch)) match { + case ProcessingSuccess => batch + case Reject(ex) => throw ex + case StorageFailure(ex) => throw ex + } + } + def tryReadSeqNumber(persistenceId: String): Long = { currentPolicy.tryProcess(persistenceId, ReadSeqNum) match { case ProcessingSuccess => getHighestSeqNumber(persistenceId) diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala index 8738d8c206..f0f67418c4 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala @@ -12,7 +12,8 @@ import scala.util.Try import com.typesafe.config.{ Config, ConfigFactory } import akka.annotation.InternalApi import akka.persistence._ -import akka.persistence.journal.{ AsyncWriteJournal, Tagged } +import akka.persistence.journal.AsyncWriteJournal +import akka.persistence.journal.Tagged import akka.persistence.snapshot.SnapshotStore import akka.persistence.testkit.internal.{ InMemStorageExtension, SnapshotStorageEmulatorExtension } import akka.util.unused @@ -35,8 +36,7 @@ class PersistenceTestKitPlugin(@unused cfg: Config, cfgPath: String) extends Asy Future.fromTry(Try(messages.map(aw => { val data = aw.payload.map(pl => pl.payload match { - case Tagged(p, _) => pl.withPayload(p).withTimestamp(System.currentTimeMillis()) - case _ => pl.withTimestamp(System.currentTimeMillis()) + case _ => pl.withTimestamp(System.currentTimeMillis()) }) val result: Try[Unit] = storage.tryAdd(data) @@ -54,7 +54,19 @@ class PersistenceTestKitPlugin(@unused cfg: Config, cfgPath: String) extends Asy override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)( recoveryCallback: PersistentRepr => Unit): Future[Unit] = - Future.fromTry(Try(storage.tryRead(persistenceId, fromSequenceNr, toSequenceNr, max).foreach(recoveryCallback))) + Future.fromTry( + Try( + storage + .tryRead(persistenceId, fromSequenceNr, toSequenceNr, max) + .map { repr => + // we keep the tags in the repr, so remove those here + repr.payload match { + case Tagged(payload, _) => repr.withPayload(payload) + case _ => repr + } + + } + .foreach(recoveryCallback))) override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = Future.fromTry(Try { diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SerializedEventStorageImpl.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SerializedEventStorageImpl.scala index 895874a423..8b4970bd5e 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SerializedEventStorageImpl.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SerializedEventStorageImpl.scala @@ -7,6 +7,7 @@ package akka.persistence.testkit.internal import akka.actor.{ ActorSystem, ExtendedActorSystem } import akka.annotation.InternalApi import akka.persistence.PersistentRepr +import akka.persistence.journal.Tagged import akka.persistence.testkit.EventStorage import akka.persistence.testkit.internal.SerializedEventStorageImpl.Serialized import akka.serialization.{ Serialization, SerializationExtension, Serializers } @@ -20,6 +21,7 @@ private[testkit] object SerializedEventStorageImpl { payloadSerManifest: String, writerUuid: String, payload: Array[Byte], + tags: Set[String], metadata: Option[Any]) } @@ -38,7 +40,10 @@ private[testkit] class SerializedEventStorageImpl(system: ActorSystem) extends E */ override def toInternal(pr: PersistentRepr): Serialized = Serialization.withTransportInformation(system.asInstanceOf[ExtendedActorSystem]) { () => - val payload = pr.payload.asInstanceOf[AnyRef] + val (payload, tags) = pr.payload match { + case Tagged(event: AnyRef, tags) => (event, tags) + case event: AnyRef => (event, Set.empty[String]) + } val s = serialization.findSerializerFor(payload) val manifest = Serializers.manifestFor(s, payload) Serialized( @@ -48,6 +53,7 @@ private[testkit] class SerializedEventStorageImpl(system: ActorSystem) extends E manifest, pr.writerUuid, s.toBinary(payload), + tags, pr.metadata) } @@ -56,7 +62,10 @@ private[testkit] class SerializedEventStorageImpl(system: ActorSystem) extends E */ override def toRepr(internal: Serialized): PersistentRepr = { val event = serialization.deserialize(internal.payload, internal.payloadSerId, internal.payloadSerManifest).get - val pr = PersistentRepr(event, internal.sequenceNr, internal.persistenceId, writerUuid = internal.writerUuid) + val eventForRepr = + if (internal.tags.isEmpty) event + else Tagged(event, internal.tags) + val pr = PersistentRepr(eventForRepr, internal.sequenceNr, internal.persistenceId, writerUuid = internal.writerUuid) internal.metadata.fold(pr)(meta => pr.withMetadata(meta)) } diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/TestKitStorage.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/TestKitStorage.scala index 6f85dd7d83..47c6ddc7fb 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/TestKitStorage.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/TestKitStorage.scala @@ -8,6 +8,7 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicReference import scala.collection.immutable +import scala.collection.JavaConverters._ import akka.annotation.InternalApi import akka.persistence.testkit.ProcessingPolicy @@ -86,6 +87,10 @@ sealed trait InMemStorage[K, R] extends InternalReprSupport[R] { def read(key: K): Option[Vector[R]] = Option(eventsMap.get(key)).map(_._2.map(toRepr)) + def readAll(): Iterable[R] = { + eventsMap.values().asScala.flatMap { case (_, events) => events }.map(toRepr) + } + def clearAll(): Unit = eventsMap.clear() diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/internal/EventsByPersistenceIdStage.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/internal/EventsByPersistenceIdStage.scala index 1f2bc623d0..5d82e6bb1e 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/internal/EventsByPersistenceIdStage.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/internal/EventsByPersistenceIdStage.scala @@ -5,6 +5,7 @@ package akka.persistence.testkit.query.internal import akka.actor.ActorRef import akka.annotation.InternalApi +import akka.persistence.journal.Tagged import akka.persistence.query.{ EventEnvelope, Sequence } import akka.persistence.testkit.{ EventStorage, PersistenceTestKitPlugin } import akka.stream.{ Attributes, Outlet, SourceShape } @@ -49,15 +50,10 @@ final private[akka] class EventsByPersistenceIdStage( log.debug("tryPush available. Query for {} {} result {}", currentSequenceNr, currentSequenceNr, event) event.headOption match { case Some(pr) => - push( - out, - EventEnvelope( - Sequence(pr.sequenceNr), - pr.persistenceId, - pr.sequenceNr, - pr.payload, - pr.timestamp, - pr.metadata)) + push(out, EventEnvelope(Sequence(pr.sequenceNr), pr.persistenceId, pr.sequenceNr, pr.payload match { + case Tagged(payload, _) => payload + case payload => payload + }, pr.timestamp, pr.metadata)) if (currentSequenceNr == toSequenceNr) { completeStage() } else { diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/javadsl/PersistenceTestKitReadJournal.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/javadsl/PersistenceTestKitReadJournal.scala index 9114994f10..2b96285343 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/javadsl/PersistenceTestKitReadJournal.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/javadsl/PersistenceTestKitReadJournal.scala @@ -5,7 +5,13 @@ package akka.persistence.testkit.query.javadsl import akka.NotUsed import akka.persistence.query.EventEnvelope -import akka.persistence.query.javadsl.{ CurrentEventsByPersistenceIdQuery, EventsByPersistenceIdQuery, ReadJournal } +import akka.persistence.query.Offset +import akka.persistence.query.javadsl.{ + CurrentEventsByPersistenceIdQuery, + CurrentEventsByTagQuery, + EventsByPersistenceIdQuery, + ReadJournal +} import akka.stream.javadsl.Source import akka.persistence.testkit.query.scaladsl @@ -16,7 +22,8 @@ object PersistenceTestKitReadJournal { final class PersistenceTestKitReadJournal(delegate: scaladsl.PersistenceTestKitReadJournal) extends ReadJournal with EventsByPersistenceIdQuery - with CurrentEventsByPersistenceIdQuery { + with CurrentEventsByPersistenceIdQuery + with CurrentEventsByTagQuery { override def eventsByPersistenceId( persistenceId: String, @@ -29,4 +36,8 @@ final class PersistenceTestKitReadJournal(delegate: scaladsl.PersistenceTestKitR fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed] = delegate.currentEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr).asJava + + override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = + delegate.currentEventsByTag(tag, offset).asJava + } diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala index 1df3f34bde..491032f04f 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala @@ -5,6 +5,10 @@ package akka.persistence.testkit.query.scaladsl import akka.NotUsed import akka.actor.ExtendedActorSystem +import akka.persistence.journal.Tagged +import akka.persistence.query.NoOffset +import akka.persistence.query.Offset +import akka.persistence.query.scaladsl.CurrentEventsByTagQuery import akka.persistence.query.{ EventEnvelope, Sequence } import akka.persistence.query.scaladsl.{ CurrentEventsByPersistenceIdQuery, EventsByPersistenceIdQuery, ReadJournal } import akka.persistence.testkit.EventStorage @@ -22,7 +26,8 @@ object PersistenceTestKitReadJournal { final class PersistenceTestKitReadJournal(system: ExtendedActorSystem, @unused config: Config, configPath: String) extends ReadJournal with EventsByPersistenceIdQuery - with CurrentEventsByPersistenceIdQuery { + with CurrentEventsByPersistenceIdQuery + with CurrentEventsByTagQuery { private val log = LoggerFactory.getLogger(getClass) @@ -33,6 +38,11 @@ final class PersistenceTestKitReadJournal(system: ExtendedActorSystem, @unused c InMemStorageExtension(system).storageFor(storagePluginId) } + private def unwrapTaggedPayload(payload: Any): Any = payload match { + case Tagged(payload, _) => payload + case payload => payload + } + override def eventsByPersistenceId( persistenceId: String, fromSequenceNr: Long, @@ -45,7 +55,30 @@ final class PersistenceTestKitReadJournal(system: ExtendedActorSystem, @unused c fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed] = { Source(storage.tryRead(persistenceId, fromSequenceNr, toSequenceNr, Long.MaxValue)).map { pr => - EventEnvelope(Sequence(pr.sequenceNr), persistenceId, pr.sequenceNr, pr.payload, pr.timestamp, pr.metadata) + EventEnvelope( + Sequence(pr.sequenceNr), + persistenceId, + pr.sequenceNr, + unwrapTaggedPayload(pr.payload), + pr.timestamp, + pr.metadata) + } + } + + override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = { + offset match { + case NoOffset => + case _ => + throw new UnsupportedOperationException("Offsets not supported for persistence test kit currentEventsByTag yet") + } + Source(storage.tryReadByTag(tag)).map { pr => + EventEnvelope( + Sequence(pr.timestamp), + pr.persistenceId, + pr.sequenceNr, + unwrapTaggedPayload(pr.payload), + pr.timestamp, + pr.metadata) } } } diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala index 2fa2033421..49b90c8b1c 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala @@ -18,6 +18,7 @@ import akka.annotation.ApiMayChange import akka.persistence.Persistence import akka.persistence.PersistentRepr import akka.persistence.SnapshotMetadata +import akka.persistence.journal.Tagged import akka.persistence.testkit._ import akka.persistence.testkit.internal.InMemStorageExtension import akka.persistence.testkit.internal.SnapshotStorageEmulatorExtension @@ -493,7 +494,10 @@ class PersistenceTestKit(system: ActorSystem) def persistedInStorage(persistenceId: String): immutable.Seq[Any] = storage.read(persistenceId).getOrElse(List.empty).map(reprToAny) - override private[testkit] def reprToAny(repr: PersistentRepr): Any = repr.payload + override private[testkit] def reprToAny(repr: PersistentRepr): Any = repr.payload match { + case Tagged(payload, _) => payload + case payload => payload + } } @ApiMayChange diff --git a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/query/CurrentEventsByTagSpec.scala b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/query/CurrentEventsByTagSpec.scala new file mode 100644 index 0000000000..0f38947075 --- /dev/null +++ b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/query/CurrentEventsByTagSpec.scala @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.testkit.query + +import akka.Done +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorRef +import akka.persistence.query.NoOffset +import akka.persistence.query.PersistenceQuery +import akka.persistence.testkit.query.EventsByPersistenceIdSpec.Command +import akka.persistence.testkit.query.EventsByPersistenceIdSpec.testBehaviour +import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal +import akka.stream.scaladsl.Sink +import org.scalatest.wordspec.AnyWordSpecLike + +class CurrentEventsByTagSpec + extends ScalaTestWithActorTestKit(EventsByPersistenceIdSpec.config) + with LogCapturing + with AnyWordSpecLike { + + implicit val classic = system.classicSystem + + val queries = + PersistenceQuery(system).readJournalFor[PersistenceTestKitReadJournal](PersistenceTestKitReadJournal.Identifier) + + def setup(persistenceId: String): ActorRef[Command] = { + val probe = createTestProbe[Done]() + val ref = setupEmpty(persistenceId) + ref ! Command(s"$persistenceId-1", probe.ref) + ref ! Command(s"$persistenceId-2", probe.ref) + ref ! Command(s"$persistenceId-3", probe.ref) + probe.expectMessage(Done) + probe.expectMessage(Done) + probe.expectMessage(Done) + ref + } + + def setupEmpty(persistenceId: String): ActorRef[Command] = { + spawn( + testBehaviour(persistenceId).withTagger(evt => + if (evt.indexOf('-') > 0) Set(evt.split('-')(1), "all") + else Set("all"))) + } + + "Persistent test kit currentEventsByTag query" must { + + "find tagged events ordered by insert time" in { + val probe = createTestProbe[Done]() + val ref1 = setupEmpty("taggedpid-1") + val ref2 = setupEmpty("taggedpid-2") + ref1 ! Command("evt-1", probe.ref) + ref1 ! Command("evt-2", probe.ref) + ref1 ! Command("evt-3", probe.ref) + probe.receiveMessages(3) + ref2 ! Command("evt-4", probe.ref) + probe.receiveMessage() + ref1 ! Command("evt-5", probe.ref) + probe.receiveMessage() + + queries.currentEventsByTag("all", NoOffset).runWith(Sink.seq).futureValue.map(_.event) should ===( + Seq("evt-1", "evt-2", "evt-3", "evt-4", "evt-5")) + } + } + +} diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/MyReplicatedBehavior.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/MyReplicatedBehavior.java new file mode 100644 index 0000000000..3ada97caf7 --- /dev/null +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/MyReplicatedBehavior.java @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package jdocs.akka.persistence.typed; + +import akka.actor.typed.Behavior; +import akka.persistence.typed.ReplicaId; +import akka.persistence.typed.javadsl.*; + +import java.util.*; + +// #factory +public class MyReplicatedBehavior + extends ReplicatedEventSourcedBehavior< + MyReplicatedBehavior.Command, MyReplicatedBehavior.Event, MyReplicatedBehavior.State> { + // #factory + interface Command {} + + interface State {} + + interface Event {} + + // #replicas + public static final ReplicaId DCA = new ReplicaId("DCA"); + public static final ReplicaId DCB = new ReplicaId("DCB"); + + public static final Set ALL_REPLICAS = + Collections.unmodifiableSet(new HashSet<>(Arrays.asList(DCA, DCB))); + // #replicas + + // #factory-shared + public static Behavior create( + String entityId, ReplicaId replicaId, String queryPluginId) { + return ReplicatedEventSourcing.withSharedJournal( + entityId, replicaId, ALL_REPLICAS, queryPluginId, MyReplicatedBehavior::new); + } + // #factory-shared + + // #factory + public static Behavior create(String entityId, ReplicaId replicaId) { + Map allReplicasAndQueryPlugins = new HashMap<>(); + allReplicasAndQueryPlugins.put(DCA, "journalForDCA"); + allReplicasAndQueryPlugins.put(DCB, "journalForDCB"); + + return ReplicatedEventSourcing.create( + entityId, replicaId, allReplicasAndQueryPlugins, MyReplicatedBehavior::new); + } + + private MyReplicatedBehavior(ReplicationContext replicationContext) { + super(replicationContext); + } + // #factory + + @Override + public State emptyState() { + throw new UnsupportedOperationException("dummy for example"); + } + + @Override + public CommandHandler commandHandler() { + throw new UnsupportedOperationException("dummy for example"); + } + + @Override + public EventHandler eventHandler() { + throw new UnsupportedOperationException("dummy for example"); + } +} diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlyTest.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlyTest.java deleted file mode 100644 index d9928f2bd6..0000000000 --- a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlyTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright (C) 2020 Lightbend Inc. - */ - -package jdocs.akka.persistence.typed; - -import akka.persistence.typed.ReplicaId; -import akka.persistence.typed.javadsl.*; - -import java.util.*; - -public class ReplicatedEventSourcingCompileOnlyTest { - - // dummy for docs example - interface Command {} - - interface Event {} - - interface State {} - - static // #factory - final class MyReplicatedEventSourcedBehavior - extends ReplicatedEventSourcedBehavior { - - public MyReplicatedEventSourcedBehavior(ReplicationContext replicationContext) { - super(replicationContext); - } - // ... implementation of abstract methods ... - // #factory - - @Override - public State emptyState() { - return null; - } - - @Override - public CommandHandler commandHandler() { - return null; - } - - @Override - public EventHandler eventHandler() { - return null; - } - // #factory - } - - // #factory - - { - // #replicas - ReplicaId DCA = new ReplicaId("DC-A"); - ReplicaId DCB = new ReplicaId("DC-B"); - Set allReplicas = - Collections.unmodifiableSet(new HashSet<>(Arrays.asList(DCA, DCB))); - // #replicas - - String queryPluginId = ""; - - // #factory-shared - ReplicatedEventSourcing.withSharedJournal( - "entityId", - DCA, - allReplicas, - queryPluginId, - context -> new MyReplicatedEventSourcedBehavior(context)); - // #factory-shared - - // #factory - - // bootstrap logic - Map allReplicasAndQueryPlugins = new HashMap<>(); - allReplicasAndQueryPlugins.put(DCA, "journalForDCA"); - allReplicasAndQueryPlugins.put(DCB, "journalForDCB"); - - EventSourcedBehavior behavior = - ReplicatedEventSourcing.create( - "entityId", - DCA, - allReplicasAndQueryPlugins, - context -> new MyReplicatedEventSourcedBehavior(context)); - // #factory - } -} diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedStringSet.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedStringSet.java new file mode 100644 index 0000000000..0dc9e078a0 --- /dev/null +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedStringSet.java @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package jdocs.akka.persistence.typed; + +import akka.actor.typed.Behavior; +import akka.persistence.testkit.query.javadsl.PersistenceTestKitReadJournal; +import akka.persistence.typed.ReplicaId; +import akka.persistence.typed.javadsl.*; + +import java.util.HashSet; +import java.util.Set; + +public final class ReplicatedStringSet + extends ReplicatedEventSourcedBehavior> { + interface Command {} + + public static final class AddString implements Command { + final String string; + + public AddString(String string) { + this.string = string; + } + } + + public static Behavior create( + String entityId, ReplicaId replicaId, Set allReplicas) { + return ReplicatedEventSourcing.withSharedJournal( + entityId, + replicaId, + allReplicas, + PersistenceTestKitReadJournal.Identifier(), + ReplicatedStringSet::new); + } + + private ReplicatedStringSet(ReplicationContext replicationContext) { + super(replicationContext); + } + + @Override + public Set emptyState() { + return new HashSet<>(); + } + + @Override + public CommandHandler> commandHandler() { + return newCommandHandlerBuilder() + .forAnyState() + .onCommand( + AddString.class, + (state, cmd) -> { + if (!state.contains(cmd.string)) return Effect().persist(cmd.string); + else return Effect().none(); + }) + .build(); + } + + @Override + public EventHandler, String> eventHandler() { + return newEventHandlerBuilder() + .forAnyState() + .onAnyEvent( + (set, string) -> { + HashSet newState = new HashSet<>(set); + newState.add(string); + return newState; + }); + } + + // #tagging + @Override + public Set tagsFor(String event) { + // don't apply tags if event was replicated here, it already will appear in queries by tag + // as the origin replica would have tagged it already + if (getReplicationContext().replicaId() != getReplicationContext().origin()) { + return new HashSet<>(); + } else { + Set tags = new HashSet<>(); + tags.add("strings"); + if (event.length() > 10) tags.add("long-strings"); + return tags; + } + } + // #tagging +} diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingTaggingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingTaggingSpec.scala new file mode 100644 index 0000000000..033a8dee14 --- /dev/null +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingTaggingSpec.scala @@ -0,0 +1,114 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.typed + +import java.util.concurrent.atomic.AtomicInteger + +import akka.Done +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorRef +import akka.persistence.query.NoOffset +import akka.persistence.query.scaladsl.CurrentEventsByTagQuery +import akka.persistence.query.PersistenceQuery +import akka.persistence.testkit.PersistenceTestKitPlugin +import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.persistence.typed.scaladsl.ReplicatedEventSourcing +import akka.stream.scaladsl.Sink +import akka.serialization.jackson.CborSerializable +import org.scalatest.concurrent.Eventually +import org.scalatest.wordspec.AnyWordSpecLike + +object ReplicatedEventSourcingTaggingSpec { + + val ReplicaId1 = ReplicaId("R1") + val ReplicaId2 = ReplicaId("R2") + val AllReplicas = Set(ReplicaId1, ReplicaId2) + val queryPluginId = PersistenceTestKitReadJournal.Identifier + + object ReplicatedStringSet { + + sealed trait Command + case class Add(description: String, replyTo: ActorRef[Done]) extends Command + case class GetStrings(replyTo: ActorRef[Set[String]]) extends Command + case class State(strings: Set[String]) extends CborSerializable + + def apply( + entityId: String, + replica: ReplicaId, + allReplicas: Set[ReplicaId]): EventSourcedBehavior[Command, String, State] = { + // #tagging + ReplicatedEventSourcing.withSharedJournal(entityId, replica, allReplicas, queryPluginId)( + replicationContext => + EventSourcedBehavior[Command, String, State]( + replicationContext.persistenceId, + State(Set.empty), + (state, command) => + command match { + case Add(string, ack) => + if (state.strings.contains(string)) Effect.none.thenRun(_ => ack ! Done) + else Effect.persist(string).thenRun(_ => ack ! Done) + case GetStrings(replyTo) => + replyTo ! state.strings + Effect.none + }, + (state, event) => state.copy(strings = state.strings + event)) + // use withTagger to define tagging logic + .withTagger( + event => + // don't apply tags if event was replicated here, it already will appear in queries by tag + // as the origin replica would have tagged it already + if (replicationContext.origin != replicationContext.replicaId) Set.empty + else if (event.length > 10) Set("long-strings", "strings") + else Set("strings"))) + // #tagging + } + } + +} + +class ReplicatedEventSourcingTaggingSpec + extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config) + with AnyWordSpecLike + with LogCapturing + with Eventually { + import ReplicatedEventSourcingTaggingSpec._ + val ids = new AtomicInteger(0) + def nextEntityId = s"e-${ids.getAndIncrement()}" + "ReplicatedEventSourcing" should { + "allow for tagging of events using the replication context" in { + val entityId = nextEntityId + val probe = createTestProbe[Done]() + val r1 = spawn(ReplicatedStringSet(entityId, ReplicaId1, AllReplicas)) + val r2 = spawn(ReplicatedStringSet(entityId, ReplicaId2, AllReplicas)) + r1 ! ReplicatedStringSet.Add("from r1", probe.ref) + r2 ! ReplicatedStringSet.Add("from r2", probe.ref) + probe.receiveMessages(2) + r1 ! ReplicatedStringSet.Add("a very long string from r1", probe.ref) + probe.receiveMessages(1) + + val allEvents = Set("from r1", "from r2", "a very long string from r1") + for (replica <- r1 :: r2 :: Nil) { + eventually { + val probe = testKit.createTestProbe[Set[String]]() + replica ! ReplicatedStringSet.GetStrings(probe.ref) + probe.receiveMessage() should ===(allEvents) + } + } + + val query = + PersistenceQuery(system).readJournalFor[CurrentEventsByTagQuery](PersistenceTestKitReadJournal.Identifier) + + val stringTaggedEvents = query.currentEventsByTag("strings", NoOffset).runWith(Sink.seq).futureValue + stringTaggedEvents.map(_.event).toSet should equal(allEvents) + + val longStrings = query.currentEventsByTag("long-strings", NoOffset).runWith(Sink.seq).futureValue + longStrings should have size (1) + + } + } +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index 037a056041..1bac5c2683 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -125,7 +125,7 @@ private[akka] final class ReplayingEvents[C, E, S]( val aaMetaAndSelfReplica: Option[(ReplicatedEventMetadata, ReplicaId, ReplicationSetup)] = setup.replication match { - case Some(aa) => + case Some(replication) => val meta = repr.metadata match { case Some(m) => m.asInstanceOf[ReplicatedEventMetadata] case None => @@ -133,16 +133,16 @@ private[akka] final class ReplayingEvents[C, E, S]( s"Replicated Event Sourcing enabled but existing event has no metadata. Migration isn't supported yet.") } - aa.setContext(recoveryRunning = true, meta.originReplica, meta.concurrent) - Some((meta, aa.replicaId, aa)) + replication.setContext(recoveryRunning = true, meta.originReplica, meta.concurrent) + Some((meta, replication.replicaId, replication)) case None => None } val newState = setup.eventHandler(state.state, event) setup.replication match { - case Some(aa) => - aa.clearContext() + case Some(replication) => + replication.clearContext() case None => } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index 0bfff4afed..e0fb66ffe8 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -398,13 +398,6 @@ private[akka] object Running { this } - def withContext[A](aa: ReplicationSetup, withReplication: ReplicationSetup => Unit, f: () => A): A = { - withReplication(aa) - val result = f() - aa.clearContext() - result - } - private def handleExternalReplicatedEventPersist( replication: ReplicationSetup, event: ReplicatedEvent[E]): Behavior[InternalProtocol] = { @@ -421,17 +414,20 @@ private[akka] object Running { updatedVersion, isConcurrent) - val newState: RunningState[S] = withContext( - replication, - aa => aa.setContext(recoveryRunning = false, event.originReplica, concurrent = isConcurrent), - () => state.applyEvent(setup, event.event)) + replication.setContext(recoveryRunning = false, event.originReplica, concurrent = isConcurrent) + + val stateAfterApply = state.applyEvent(setup, event.event) + val eventToPersist = adaptEvent(event.event) + val eventAdapterManifest = setup.eventAdapter.manifest(event.event) + + replication.clearContext() val newState2: RunningState[S] = internalPersist( setup.context, null, - newState, - event.event, - "", + stateAfterApply, + eventToPersist, + eventAdapterManifest, OptionVal.Some( ReplicatedEventMetadata(event.originReplica, event.originSequenceNr, updatedVersion, isConcurrent))) val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event.event, newState2.seqNr) @@ -449,51 +445,48 @@ private[akka] object Running { event: E, cmd: Any, sideEffects: immutable.Seq[SideEffect[S]]): (Behavior[InternalProtocol], Boolean) = { - // apply the event before persist so that validation exception is handled before persisting - // the invalid event, in case such validation is implemented in the event handler. - // also, ensure that there is an event handler for each single event - _currentSequenceNumber = state.seqNr + 1 + try { + // apply the event before persist so that validation exception is handled before persisting + // the invalid event, in case such validation is implemented in the event handler. + // also, ensure that there is an event handler for each single event + _currentSequenceNumber = state.seqNr + 1 - val newState: RunningState[S] = setup.replication match { - case Some(aa) => - // set concurrent to false, local events are never concurrent - withContext( - aa, - aa => aa.setContext(recoveryRunning = false, aa.replicaId, concurrent = false), - () => state.applyEvent(setup, event)) - case None => - state.applyEvent(setup, event) + setup.replication.foreach(r => r.setContext(recoveryRunning = false, r.replicaId, concurrent = false)) + + val stateAfterApply = state.applyEvent(setup, event) + val eventToPersist = adaptEvent(event) + val eventAdapterManifest = setup.eventAdapter.manifest(event) + + val newState2 = setup.replication match { + case Some(aa) => + val updatedVersion = stateAfterApply.version.updated(aa.replicaId.id, _currentSequenceNumber) + val r = internalPersist( + setup.context, + cmd, + stateAfterApply, + eventToPersist, + eventAdapterManifest, + OptionVal.Some( + ReplicatedEventMetadata(aa.replicaId, _currentSequenceNumber, updatedVersion, concurrent = false))) + .copy(version = updatedVersion) + + if (setup.log.isTraceEnabled()) + setup.log.traceN( + "Event persisted [{}]. Version vector after: [{}]", + Logging.simpleName(event.getClass), + r.version) + + r + case None => + internalPersist(setup.context, cmd, stateAfterApply, eventToPersist, eventAdapterManifest, OptionVal.None) + } + + val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event, newState2.seqNr) + (persistingEvents(newState2, state, numberOfEvents = 1, shouldSnapshotAfterPersist, sideEffects), false) + + } finally { + setup.replication.foreach(_.clearContext()) } - - val eventToPersist = adaptEvent(event) - val eventAdapterManifest = setup.eventAdapter.manifest(event) - - val newState2 = setup.replication match { - case Some(aa) => - val updatedVersion = newState.version.updated(aa.replicaId.id, _currentSequenceNumber) - val r = internalPersist( - setup.context, - cmd, - newState, - eventToPersist, - eventAdapterManifest, - OptionVal.Some( - ReplicatedEventMetadata(aa.replicaId, _currentSequenceNumber, updatedVersion, concurrent = false))) - .copy(version = updatedVersion) - - if (setup.log.isTraceEnabled()) - setup.log.traceN( - "Event persisted [{}]. Version vector after: [{}]", - Logging.simpleName(event.getClass), - r.version) - - r - case None => - internalPersist(setup.context, cmd, newState, eventToPersist, eventAdapterManifest, OptionVal.None) - } - - val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event, newState2.seqNr) - (persistingEvents(newState2, state, numberOfEvents = 1, shouldSnapshotAfterPersist, sideEffects), false) } private def handleEventPersistAll( @@ -501,64 +494,59 @@ private[akka] object Running { cmd: Any, sideEffects: immutable.Seq[SideEffect[S]]): (Behavior[InternalProtocol], Boolean) = { if (events.nonEmpty) { - // apply the event before persist so that validation exception is handled before persisting - // the invalid event, in case such validation is implemented in the event handler. - // also, ensure that there is an event handler for each single event - _currentSequenceNumber = state.seqNr + try { + // apply the event before persist so that validation exception is handled before persisting + // the invalid event, in case such validation is implemented in the event handler. + // also, ensure that there is an event handler for each single event + _currentSequenceNumber = state.seqNr - val metadataTemplate: Option[ReplicatedEventMetadata] = setup.replication match { - case Some(aa) => - aa.setContext(recoveryRunning = false, aa.replicaId, concurrent = false) // local events are never concurrent - Some(ReplicatedEventMetadata(aa.replicaId, 0L, state.version, concurrent = false)) // we replace it with actual seqnr later - case None => None - } - - var currentState = state - var shouldSnapshotAfterPersist: SnapshotAfterPersist = NoSnapshot - var eventsToPersist: List[EventToPersist] = Nil - - events.foreach { event => - _currentSequenceNumber += 1 - if (shouldSnapshotAfterPersist == NoSnapshot) - shouldSnapshotAfterPersist = setup.shouldSnapshot(currentState.state, event, _currentSequenceNumber) - val evtManifest = setup.eventAdapter.manifest(event) - val adaptedEvent = adaptEvent(event) - val eventMetadata = metadataTemplate match { - case Some(template) => - val updatedVersion = currentState.version.updated(template.originReplica.id, _currentSequenceNumber) - if (setup.log.isDebugEnabled) - setup.log.traceN( - "Processing event [{}] with version vector [{}]", - Logging.simpleName(event.getClass), - updatedVersion) - currentState = currentState.copy(version = updatedVersion) - Some(template.copy(originSequenceNr = _currentSequenceNumber, version = updatedVersion)) + val metadataTemplate: Option[ReplicatedEventMetadata] = setup.replication match { + case Some(aa) => + aa.setContext(recoveryRunning = false, aa.replicaId, concurrent = false) // local events are never concurrent + Some(ReplicatedEventMetadata(aa.replicaId, 0L, state.version, concurrent = false)) // we replace it with actual seqnr later case None => None } - currentState = setup.replication match { - case Some(aa) => - withContext( - aa, - aa => aa.setContext(recoveryRunning = false, aa.replicaId, concurrent = false), - () => currentState.applyEvent(setup, event)) - case None => - currentState.applyEvent(setup, event) + var currentState = state + var shouldSnapshotAfterPersist: SnapshotAfterPersist = NoSnapshot + var eventsToPersist: List[EventToPersist] = Nil + + events.foreach { event => + _currentSequenceNumber += 1 + if (shouldSnapshotAfterPersist == NoSnapshot) + shouldSnapshotAfterPersist = setup.shouldSnapshot(currentState.state, event, _currentSequenceNumber) + val evtManifest = setup.eventAdapter.manifest(event) + val adaptedEvent = adaptEvent(event) + val eventMetadata = metadataTemplate match { + case Some(template) => + val updatedVersion = currentState.version.updated(template.originReplica.id, _currentSequenceNumber) + if (setup.log.isDebugEnabled) + setup.log.traceN( + "Processing event [{}] with version vector [{}]", + Logging.simpleName(event.getClass), + updatedVersion) + currentState = currentState.copy(version = updatedVersion) + Some(template.copy(originSequenceNr = _currentSequenceNumber, version = updatedVersion)) + case None => None + } + + currentState = currentState.applyEvent(setup, event) + + eventsToPersist = EventToPersist(adaptedEvent, evtManifest, eventMetadata) :: eventsToPersist } - eventsToPersist = EventToPersist(adaptedEvent, evtManifest, eventMetadata) :: eventsToPersist + val newState2 = + internalPersistAll(setup.context, cmd, currentState, eventsToPersist.reverse) + + (persistingEvents(newState2, state, events.size, shouldSnapshotAfterPersist, sideEffects), false) + } finally { + setup.replication.foreach(_.clearContext()) } - - val newState2 = - internalPersistAll(setup.context, cmd, currentState, eventsToPersist.reverse) - - (persistingEvents(newState2, state, events.size, shouldSnapshotAfterPersist, sideEffects), false) } else { // run side-effects even when no events are emitted (applySideEffects(sideEffects, state), true) } } - @tailrec def applyEffects( msg: Any, state: RunningState[S],