Tagging for replicated event sourcing (#29442)

* Tagging for replicated event sourcing
* Docs improvements
* Support for currentEventsByTag in persistence testkit and test coverage for tags in replicated
This commit is contained in:
Johan Andrén 2020-07-31 14:07:37 +02:00 committed by Christopher Batey
parent c945fbd7a1
commit 9fb76bbea4
16 changed files with 585 additions and 216 deletions

View file

@ -66,7 +66,7 @@ Scala
: @@snip [ReplicatedEventSourcingCompileOnlySpec.scala](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala) { #replicas }
Java
: @@snip [ReplicatedEventSourcingCompileOnlyTest.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlyTest.java) { #replicas }
: @@snip [MyReplicatedBehavior.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/MyReplicatedBehavior.java) { #replicas }
Then to enable replication create the event sourced behavior with the factory method:
@ -75,7 +75,7 @@ Scala
: @@snip [ReplicatedEventSourcingCompileOnlySpec.scala](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala) { #factory }
Java
: @@snip [ReplicatedEventSourcingCompileOnlyTest.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlyTest.java) { #factory }
: @@snip [MyReplicatedBehavior.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/MyReplicatedBehavior.java) { #factory }
The factory takes in:
@ -90,7 +90,7 @@ Scala
: @@snip [ReplicatedEventSourcingCompileOnlySpec.scala](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlySpec.scala) { #factory-shared}
Java
: @@snip [ReplicatedEventSourcingCompileOnlyTest.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedEventSourcingCompileOnlyTest.java) { #factory-shared }
: @@snip [MyReplicatedBehavior.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/MyReplicatedBehavior.java) { #factory-shared }
@@@ div { .group-scala }
@ -271,6 +271,42 @@ Java
More advanced routing among the replicas is currently left as an exercise for the reader (or may be covered in a future release [#29281](https://github.com/akka/akka/issues/29281), [#29319](https://github.com/akka/akka/issues/29319)).
## Tagging events and running projections
Just like for regular `EventSourcedBehavior`s it is possible to tag events along with persisting them.
This is useful for later retrival of events for a given tag. The same @ref[API for tagging provided for EventSourcedBehavior](persistence.md#tagging) can
be used for replicated event sourced behaviors as well.
Tagging is useful in practice to build queries that lead to other data representations or aggregations of the these event
streams that can more directly serve user queries known as building the “read side” in @ref[CQRS](cqrs.md) based applications.
Creating read side projections is possible through [Akka Projection](https://doc.akka.io/docs/akka-projection/current/)
or through direct usage of the @ref[events by tag queries](../persistence-query.md#eventsbytag-and-currenteventsbytag).
The tagging is invoked in each replicas, which requires some special care in using tags, or else the same event will be
tagged one time for each replica and show up in the event by tag stream one time for each replica. In addition to this
the tags will be written in the respective journal of the replicas, which means that unless they all share a single journal
the tag streams will be local to the replica even if the same tag is used on multiple replicas.
One strategy for dealing with this is to include the replica id in the tag name, this means there will be a tagged stream of events
per replica that contains all replicated events, but since the events can arrive in different order, they can also come in different
order per replica tag.
Another strategy would be to tag only the events that are local to the replica and not events that are replicated. Either
using a tag that will be the same for all replicas, leading to a single stream of tagged events where the events from each
replica is present only once, or with a tag including the replica id meaning that there will be a stream of tagged events
with the events accepted locally for each replica.
Determining the replica id of the replicated actor itself and the origin replica id of an event is possible through the
@apidoc[ReplicationContext] when the tagger callback is invoked like this:
Scala
: @@snip [ReplicatedEventSourcingTaggingSpec.scala](/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ReplicatedEventSourcingTaggingSpec.scala) { #tagging }
Java
: @@snip [ReplicatedStringSet.java](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedStringSet.java) { #tagging }
In this sample we are using a shared journal, and single tag but only tagging local events to it and therefore ending up
with a single stream of tagged events from all replicas without duplicates.
## Direct Replication of Events

View file

@ -10,6 +10,7 @@ import scala.collection.immutable
import scala.util.{ Failure, Success, Try }
import akka.annotation.InternalApi
import akka.persistence.PersistentRepr
import akka.persistence.journal.Tagged
import akka.persistence.testkit.ProcessingPolicy.DefaultPolicies
import akka.persistence.testkit.internal.TestKitStorage
import akka.util.ccompat.JavaConverters._
@ -46,7 +47,11 @@ private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, Per
val grouped = elems.groupBy(_.persistenceId)
val processed = grouped.map {
case (pid, els) => currentPolicy.tryProcess(pid, WriteEvents(els.map(_.payload)))
case (pid, els) =>
currentPolicy.tryProcess(pid, WriteEvents(els.map(_.payload match {
case Tagged(payload, _) => payload
case nonTagged => nonTagged
})))
}
val reduced: ProcessingResult =
@ -81,6 +86,23 @@ private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, Per
}
}
def tryReadByTag(tag: String): immutable.Seq[PersistentRepr] = {
val batch = readAll()
.filter(repr =>
repr.payload match {
case Tagged(_, tags) => tags.contains(tag)
case _ => false
})
.toVector
.sortBy(_.timestamp)
currentPolicy.tryProcess(tag, ReadEvents(batch)) match {
case ProcessingSuccess => batch
case Reject(ex) => throw ex
case StorageFailure(ex) => throw ex
}
}
def tryReadSeqNumber(persistenceId: String): Long = {
currentPolicy.tryProcess(persistenceId, ReadSeqNum) match {
case ProcessingSuccess => getHighestSeqNumber(persistenceId)

View file

@ -12,7 +12,8 @@ import scala.util.Try
import com.typesafe.config.{ Config, ConfigFactory }
import akka.annotation.InternalApi
import akka.persistence._
import akka.persistence.journal.{ AsyncWriteJournal, Tagged }
import akka.persistence.journal.AsyncWriteJournal
import akka.persistence.journal.Tagged
import akka.persistence.snapshot.SnapshotStore
import akka.persistence.testkit.internal.{ InMemStorageExtension, SnapshotStorageEmulatorExtension }
import akka.util.unused
@ -35,7 +36,6 @@ class PersistenceTestKitPlugin(@unused cfg: Config, cfgPath: String) extends Asy
Future.fromTry(Try(messages.map(aw => {
val data = aw.payload.map(pl =>
pl.payload match {
case Tagged(p, _) => pl.withPayload(p).withTimestamp(System.currentTimeMillis())
case _ => pl.withTimestamp(System.currentTimeMillis())
})
@ -54,7 +54,19 @@ class PersistenceTestKitPlugin(@unused cfg: Config, cfgPath: String) extends Asy
override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(
recoveryCallback: PersistentRepr => Unit): Future[Unit] =
Future.fromTry(Try(storage.tryRead(persistenceId, fromSequenceNr, toSequenceNr, max).foreach(recoveryCallback)))
Future.fromTry(
Try(
storage
.tryRead(persistenceId, fromSequenceNr, toSequenceNr, max)
.map { repr =>
// we keep the tags in the repr, so remove those here
repr.payload match {
case Tagged(payload, _) => repr.withPayload(payload)
case _ => repr
}
}
.foreach(recoveryCallback)))
override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
Future.fromTry(Try {

View file

@ -7,6 +7,7 @@ package akka.persistence.testkit.internal
import akka.actor.{ ActorSystem, ExtendedActorSystem }
import akka.annotation.InternalApi
import akka.persistence.PersistentRepr
import akka.persistence.journal.Tagged
import akka.persistence.testkit.EventStorage
import akka.persistence.testkit.internal.SerializedEventStorageImpl.Serialized
import akka.serialization.{ Serialization, SerializationExtension, Serializers }
@ -20,6 +21,7 @@ private[testkit] object SerializedEventStorageImpl {
payloadSerManifest: String,
writerUuid: String,
payload: Array[Byte],
tags: Set[String],
metadata: Option[Any])
}
@ -38,7 +40,10 @@ private[testkit] class SerializedEventStorageImpl(system: ActorSystem) extends E
*/
override def toInternal(pr: PersistentRepr): Serialized =
Serialization.withTransportInformation(system.asInstanceOf[ExtendedActorSystem]) { () =>
val payload = pr.payload.asInstanceOf[AnyRef]
val (payload, tags) = pr.payload match {
case Tagged(event: AnyRef, tags) => (event, tags)
case event: AnyRef => (event, Set.empty[String])
}
val s = serialization.findSerializerFor(payload)
val manifest = Serializers.manifestFor(s, payload)
Serialized(
@ -48,6 +53,7 @@ private[testkit] class SerializedEventStorageImpl(system: ActorSystem) extends E
manifest,
pr.writerUuid,
s.toBinary(payload),
tags,
pr.metadata)
}
@ -56,7 +62,10 @@ private[testkit] class SerializedEventStorageImpl(system: ActorSystem) extends E
*/
override def toRepr(internal: Serialized): PersistentRepr = {
val event = serialization.deserialize(internal.payload, internal.payloadSerId, internal.payloadSerManifest).get
val pr = PersistentRepr(event, internal.sequenceNr, internal.persistenceId, writerUuid = internal.writerUuid)
val eventForRepr =
if (internal.tags.isEmpty) event
else Tagged(event, internal.tags)
val pr = PersistentRepr(eventForRepr, internal.sequenceNr, internal.persistenceId, writerUuid = internal.writerUuid)
internal.metadata.fold(pr)(meta => pr.withMetadata(meta))
}

View file

@ -8,6 +8,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference
import scala.collection.immutable
import scala.collection.JavaConverters._
import akka.annotation.InternalApi
import akka.persistence.testkit.ProcessingPolicy
@ -86,6 +87,10 @@ sealed trait InMemStorage[K, R] extends InternalReprSupport[R] {
def read(key: K): Option[Vector[R]] =
Option(eventsMap.get(key)).map(_._2.map(toRepr))
def readAll(): Iterable[R] = {
eventsMap.values().asScala.flatMap { case (_, events) => events }.map(toRepr)
}
def clearAll(): Unit =
eventsMap.clear()

View file

@ -5,6 +5,7 @@
package akka.persistence.testkit.query.internal
import akka.actor.ActorRef
import akka.annotation.InternalApi
import akka.persistence.journal.Tagged
import akka.persistence.query.{ EventEnvelope, Sequence }
import akka.persistence.testkit.{ EventStorage, PersistenceTestKitPlugin }
import akka.stream.{ Attributes, Outlet, SourceShape }
@ -49,15 +50,10 @@ final private[akka] class EventsByPersistenceIdStage(
log.debug("tryPush available. Query for {} {} result {}", currentSequenceNr, currentSequenceNr, event)
event.headOption match {
case Some(pr) =>
push(
out,
EventEnvelope(
Sequence(pr.sequenceNr),
pr.persistenceId,
pr.sequenceNr,
pr.payload,
pr.timestamp,
pr.metadata))
push(out, EventEnvelope(Sequence(pr.sequenceNr), pr.persistenceId, pr.sequenceNr, pr.payload match {
case Tagged(payload, _) => payload
case payload => payload
}, pr.timestamp, pr.metadata))
if (currentSequenceNr == toSequenceNr) {
completeStage()
} else {

View file

@ -5,7 +5,13 @@
package akka.persistence.testkit.query.javadsl
import akka.NotUsed
import akka.persistence.query.EventEnvelope
import akka.persistence.query.javadsl.{ CurrentEventsByPersistenceIdQuery, EventsByPersistenceIdQuery, ReadJournal }
import akka.persistence.query.Offset
import akka.persistence.query.javadsl.{
CurrentEventsByPersistenceIdQuery,
CurrentEventsByTagQuery,
EventsByPersistenceIdQuery,
ReadJournal
}
import akka.stream.javadsl.Source
import akka.persistence.testkit.query.scaladsl
@ -16,7 +22,8 @@ object PersistenceTestKitReadJournal {
final class PersistenceTestKitReadJournal(delegate: scaladsl.PersistenceTestKitReadJournal)
extends ReadJournal
with EventsByPersistenceIdQuery
with CurrentEventsByPersistenceIdQuery {
with CurrentEventsByPersistenceIdQuery
with CurrentEventsByTagQuery {
override def eventsByPersistenceId(
persistenceId: String,
@ -29,4 +36,8 @@ final class PersistenceTestKitReadJournal(delegate: scaladsl.PersistenceTestKitR
fromSequenceNr: Long,
toSequenceNr: Long): Source[EventEnvelope, NotUsed] =
delegate.currentEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr).asJava
override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
delegate.currentEventsByTag(tag, offset).asJava
}

View file

@ -5,6 +5,10 @@
package akka.persistence.testkit.query.scaladsl
import akka.NotUsed
import akka.actor.ExtendedActorSystem
import akka.persistence.journal.Tagged
import akka.persistence.query.NoOffset
import akka.persistence.query.Offset
import akka.persistence.query.scaladsl.CurrentEventsByTagQuery
import akka.persistence.query.{ EventEnvelope, Sequence }
import akka.persistence.query.scaladsl.{ CurrentEventsByPersistenceIdQuery, EventsByPersistenceIdQuery, ReadJournal }
import akka.persistence.testkit.EventStorage
@ -22,7 +26,8 @@ object PersistenceTestKitReadJournal {
final class PersistenceTestKitReadJournal(system: ExtendedActorSystem, @unused config: Config, configPath: String)
extends ReadJournal
with EventsByPersistenceIdQuery
with CurrentEventsByPersistenceIdQuery {
with CurrentEventsByPersistenceIdQuery
with CurrentEventsByTagQuery {
private val log = LoggerFactory.getLogger(getClass)
@ -33,6 +38,11 @@ final class PersistenceTestKitReadJournal(system: ExtendedActorSystem, @unused c
InMemStorageExtension(system).storageFor(storagePluginId)
}
private def unwrapTaggedPayload(payload: Any): Any = payload match {
case Tagged(payload, _) => payload
case payload => payload
}
override def eventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
@ -45,7 +55,30 @@ final class PersistenceTestKitReadJournal(system: ExtendedActorSystem, @unused c
fromSequenceNr: Long,
toSequenceNr: Long): Source[EventEnvelope, NotUsed] = {
Source(storage.tryRead(persistenceId, fromSequenceNr, toSequenceNr, Long.MaxValue)).map { pr =>
EventEnvelope(Sequence(pr.sequenceNr), persistenceId, pr.sequenceNr, pr.payload, pr.timestamp, pr.metadata)
EventEnvelope(
Sequence(pr.sequenceNr),
persistenceId,
pr.sequenceNr,
unwrapTaggedPayload(pr.payload),
pr.timestamp,
pr.metadata)
}
}
override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = {
offset match {
case NoOffset =>
case _ =>
throw new UnsupportedOperationException("Offsets not supported for persistence test kit currentEventsByTag yet")
}
Source(storage.tryReadByTag(tag)).map { pr =>
EventEnvelope(
Sequence(pr.timestamp),
pr.persistenceId,
pr.sequenceNr,
unwrapTaggedPayload(pr.payload),
pr.timestamp,
pr.metadata)
}
}
}

View file

@ -18,6 +18,7 @@ import akka.annotation.ApiMayChange
import akka.persistence.Persistence
import akka.persistence.PersistentRepr
import akka.persistence.SnapshotMetadata
import akka.persistence.journal.Tagged
import akka.persistence.testkit._
import akka.persistence.testkit.internal.InMemStorageExtension
import akka.persistence.testkit.internal.SnapshotStorageEmulatorExtension
@ -493,7 +494,10 @@ class PersistenceTestKit(system: ActorSystem)
def persistedInStorage(persistenceId: String): immutable.Seq[Any] =
storage.read(persistenceId).getOrElse(List.empty).map(reprToAny)
override private[testkit] def reprToAny(repr: PersistentRepr): Any = repr.payload
override private[testkit] def reprToAny(repr: PersistentRepr): Any = repr.payload match {
case Tagged(payload, _) => payload
case payload => payload
}
}
@ApiMayChange

View file

@ -0,0 +1,68 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.testkit.query
import akka.Done
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.persistence.query.NoOffset
import akka.persistence.query.PersistenceQuery
import akka.persistence.testkit.query.EventsByPersistenceIdSpec.Command
import akka.persistence.testkit.query.EventsByPersistenceIdSpec.testBehaviour
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.stream.scaladsl.Sink
import org.scalatest.wordspec.AnyWordSpecLike
class CurrentEventsByTagSpec
extends ScalaTestWithActorTestKit(EventsByPersistenceIdSpec.config)
with LogCapturing
with AnyWordSpecLike {
implicit val classic = system.classicSystem
val queries =
PersistenceQuery(system).readJournalFor[PersistenceTestKitReadJournal](PersistenceTestKitReadJournal.Identifier)
def setup(persistenceId: String): ActorRef[Command] = {
val probe = createTestProbe[Done]()
val ref = setupEmpty(persistenceId)
ref ! Command(s"$persistenceId-1", probe.ref)
ref ! Command(s"$persistenceId-2", probe.ref)
ref ! Command(s"$persistenceId-3", probe.ref)
probe.expectMessage(Done)
probe.expectMessage(Done)
probe.expectMessage(Done)
ref
}
def setupEmpty(persistenceId: String): ActorRef[Command] = {
spawn(
testBehaviour(persistenceId).withTagger(evt =>
if (evt.indexOf('-') > 0) Set(evt.split('-')(1), "all")
else Set("all")))
}
"Persistent test kit currentEventsByTag query" must {
"find tagged events ordered by insert time" in {
val probe = createTestProbe[Done]()
val ref1 = setupEmpty("taggedpid-1")
val ref2 = setupEmpty("taggedpid-2")
ref1 ! Command("evt-1", probe.ref)
ref1 ! Command("evt-2", probe.ref)
ref1 ! Command("evt-3", probe.ref)
probe.receiveMessages(3)
ref2 ! Command("evt-4", probe.ref)
probe.receiveMessage()
ref1 ! Command("evt-5", probe.ref)
probe.receiveMessage()
queries.currentEventsByTag("all", NoOffset).runWith(Sink.seq).futureValue.map(_.event) should ===(
Seq("evt-1", "evt-2", "evt-3", "evt-4", "evt-5"))
}
}
}

View file

@ -0,0 +1,69 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.persistence.typed;
import akka.actor.typed.Behavior;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.javadsl.*;
import java.util.*;
// #factory
public class MyReplicatedBehavior
extends ReplicatedEventSourcedBehavior<
MyReplicatedBehavior.Command, MyReplicatedBehavior.Event, MyReplicatedBehavior.State> {
// #factory
interface Command {}
interface State {}
interface Event {}
// #replicas
public static final ReplicaId DCA = new ReplicaId("DCA");
public static final ReplicaId DCB = new ReplicaId("DCB");
public static final Set<ReplicaId> ALL_REPLICAS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(DCA, DCB)));
// #replicas
// #factory-shared
public static Behavior<Command> create(
String entityId, ReplicaId replicaId, String queryPluginId) {
return ReplicatedEventSourcing.withSharedJournal(
entityId, replicaId, ALL_REPLICAS, queryPluginId, MyReplicatedBehavior::new);
}
// #factory-shared
// #factory
public static Behavior<Command> create(String entityId, ReplicaId replicaId) {
Map<ReplicaId, String> allReplicasAndQueryPlugins = new HashMap<>();
allReplicasAndQueryPlugins.put(DCA, "journalForDCA");
allReplicasAndQueryPlugins.put(DCB, "journalForDCB");
return ReplicatedEventSourcing.create(
entityId, replicaId, allReplicasAndQueryPlugins, MyReplicatedBehavior::new);
}
private MyReplicatedBehavior(ReplicationContext replicationContext) {
super(replicationContext);
}
// #factory
@Override
public State emptyState() {
throw new UnsupportedOperationException("dummy for example");
}
@Override
public CommandHandler<Command, Event, State> commandHandler() {
throw new UnsupportedOperationException("dummy for example");
}
@Override
public EventHandler<State, Event> eventHandler() {
throw new UnsupportedOperationException("dummy for example");
}
}

View file

@ -1,84 +0,0 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.persistence.typed;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.javadsl.*;
import java.util.*;
public class ReplicatedEventSourcingCompileOnlyTest {
// dummy for docs example
interface Command {}
interface Event {}
interface State {}
static // #factory
final class MyReplicatedEventSourcedBehavior
extends ReplicatedEventSourcedBehavior<Command, Event, State> {
public MyReplicatedEventSourcedBehavior(ReplicationContext replicationContext) {
super(replicationContext);
}
// ... implementation of abstract methods ...
// #factory
@Override
public State emptyState() {
return null;
}
@Override
public CommandHandler<Command, Event, State> commandHandler() {
return null;
}
@Override
public EventHandler<State, Event> eventHandler() {
return null;
}
// #factory
}
// #factory
{
// #replicas
ReplicaId DCA = new ReplicaId("DC-A");
ReplicaId DCB = new ReplicaId("DC-B");
Set<ReplicaId> allReplicas =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(DCA, DCB)));
// #replicas
String queryPluginId = "";
// #factory-shared
ReplicatedEventSourcing.withSharedJournal(
"entityId",
DCA,
allReplicas,
queryPluginId,
context -> new MyReplicatedEventSourcedBehavior(context));
// #factory-shared
// #factory
// bootstrap logic
Map<ReplicaId, String> allReplicasAndQueryPlugins = new HashMap<>();
allReplicasAndQueryPlugins.put(DCA, "journalForDCA");
allReplicasAndQueryPlugins.put(DCB, "journalForDCB");
EventSourcedBehavior<Command, Event, State> behavior =
ReplicatedEventSourcing.create(
"entityId",
DCA,
allReplicasAndQueryPlugins,
context -> new MyReplicatedEventSourcedBehavior(context));
// #factory
}
}

View file

@ -0,0 +1,86 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.persistence.typed;
import akka.actor.typed.Behavior;
import akka.persistence.testkit.query.javadsl.PersistenceTestKitReadJournal;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.javadsl.*;
import java.util.HashSet;
import java.util.Set;
public final class ReplicatedStringSet
extends ReplicatedEventSourcedBehavior<ReplicatedStringSet.Command, String, Set<String>> {
interface Command {}
public static final class AddString implements Command {
final String string;
public AddString(String string) {
this.string = string;
}
}
public static Behavior<Command> create(
String entityId, ReplicaId replicaId, Set<ReplicaId> allReplicas) {
return ReplicatedEventSourcing.withSharedJournal(
entityId,
replicaId,
allReplicas,
PersistenceTestKitReadJournal.Identifier(),
ReplicatedStringSet::new);
}
private ReplicatedStringSet(ReplicationContext replicationContext) {
super(replicationContext);
}
@Override
public Set<String> emptyState() {
return new HashSet<>();
}
@Override
public CommandHandler<Command, String, Set<String>> commandHandler() {
return newCommandHandlerBuilder()
.forAnyState()
.onCommand(
AddString.class,
(state, cmd) -> {
if (!state.contains(cmd.string)) return Effect().persist(cmd.string);
else return Effect().none();
})
.build();
}
@Override
public EventHandler<Set<String>, String> eventHandler() {
return newEventHandlerBuilder()
.forAnyState()
.onAnyEvent(
(set, string) -> {
HashSet<String> newState = new HashSet<>(set);
newState.add(string);
return newState;
});
}
// #tagging
@Override
public Set<String> tagsFor(String event) {
// don't apply tags if event was replicated here, it already will appear in queries by tag
// as the origin replica would have tagged it already
if (getReplicationContext().replicaId() != getReplicationContext().origin()) {
return new HashSet<>();
} else {
Set<String> tags = new HashSet<>();
tags.add("strings");
if (event.length() > 10) tags.add("long-strings");
return tags;
}
}
// #tagging
}

View file

@ -0,0 +1,114 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed
import java.util.concurrent.atomic.AtomicInteger
import akka.Done
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.persistence.query.NoOffset
import akka.persistence.query.scaladsl.CurrentEventsByTagQuery
import akka.persistence.query.PersistenceQuery
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.scaladsl.ReplicatedEventSourcing
import akka.stream.scaladsl.Sink
import akka.serialization.jackson.CborSerializable
import org.scalatest.concurrent.Eventually
import org.scalatest.wordspec.AnyWordSpecLike
object ReplicatedEventSourcingTaggingSpec {
val ReplicaId1 = ReplicaId("R1")
val ReplicaId2 = ReplicaId("R2")
val AllReplicas = Set(ReplicaId1, ReplicaId2)
val queryPluginId = PersistenceTestKitReadJournal.Identifier
object ReplicatedStringSet {
sealed trait Command
case class Add(description: String, replyTo: ActorRef[Done]) extends Command
case class GetStrings(replyTo: ActorRef[Set[String]]) extends Command
case class State(strings: Set[String]) extends CborSerializable
def apply(
entityId: String,
replica: ReplicaId,
allReplicas: Set[ReplicaId]): EventSourcedBehavior[Command, String, State] = {
// #tagging
ReplicatedEventSourcing.withSharedJournal(entityId, replica, allReplicas, queryPluginId)(
replicationContext =>
EventSourcedBehavior[Command, String, State](
replicationContext.persistenceId,
State(Set.empty),
(state, command) =>
command match {
case Add(string, ack) =>
if (state.strings.contains(string)) Effect.none.thenRun(_ => ack ! Done)
else Effect.persist(string).thenRun(_ => ack ! Done)
case GetStrings(replyTo) =>
replyTo ! state.strings
Effect.none
},
(state, event) => state.copy(strings = state.strings + event))
// use withTagger to define tagging logic
.withTagger(
event =>
// don't apply tags if event was replicated here, it already will appear in queries by tag
// as the origin replica would have tagged it already
if (replicationContext.origin != replicationContext.replicaId) Set.empty
else if (event.length > 10) Set("long-strings", "strings")
else Set("strings")))
// #tagging
}
}
}
class ReplicatedEventSourcingTaggingSpec
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config)
with AnyWordSpecLike
with LogCapturing
with Eventually {
import ReplicatedEventSourcingTaggingSpec._
val ids = new AtomicInteger(0)
def nextEntityId = s"e-${ids.getAndIncrement()}"
"ReplicatedEventSourcing" should {
"allow for tagging of events using the replication context" in {
val entityId = nextEntityId
val probe = createTestProbe[Done]()
val r1 = spawn(ReplicatedStringSet(entityId, ReplicaId1, AllReplicas))
val r2 = spawn(ReplicatedStringSet(entityId, ReplicaId2, AllReplicas))
r1 ! ReplicatedStringSet.Add("from r1", probe.ref)
r2 ! ReplicatedStringSet.Add("from r2", probe.ref)
probe.receiveMessages(2)
r1 ! ReplicatedStringSet.Add("a very long string from r1", probe.ref)
probe.receiveMessages(1)
val allEvents = Set("from r1", "from r2", "a very long string from r1")
for (replica <- r1 :: r2 :: Nil) {
eventually {
val probe = testKit.createTestProbe[Set[String]]()
replica ! ReplicatedStringSet.GetStrings(probe.ref)
probe.receiveMessage() should ===(allEvents)
}
}
val query =
PersistenceQuery(system).readJournalFor[CurrentEventsByTagQuery](PersistenceTestKitReadJournal.Identifier)
val stringTaggedEvents = query.currentEventsByTag("strings", NoOffset).runWith(Sink.seq).futureValue
stringTaggedEvents.map(_.event).toSet should equal(allEvents)
val longStrings = query.currentEventsByTag("long-strings", NoOffset).runWith(Sink.seq).futureValue
longStrings should have size (1)
}
}
}

View file

@ -125,7 +125,7 @@ private[akka] final class ReplayingEvents[C, E, S](
val aaMetaAndSelfReplica: Option[(ReplicatedEventMetadata, ReplicaId, ReplicationSetup)] =
setup.replication match {
case Some(aa) =>
case Some(replication) =>
val meta = repr.metadata match {
case Some(m) => m.asInstanceOf[ReplicatedEventMetadata]
case None =>
@ -133,16 +133,16 @@ private[akka] final class ReplayingEvents[C, E, S](
s"Replicated Event Sourcing enabled but existing event has no metadata. Migration isn't supported yet.")
}
aa.setContext(recoveryRunning = true, meta.originReplica, meta.concurrent)
Some((meta, aa.replicaId, aa))
replication.setContext(recoveryRunning = true, meta.originReplica, meta.concurrent)
Some((meta, replication.replicaId, replication))
case None => None
}
val newState = setup.eventHandler(state.state, event)
setup.replication match {
case Some(aa) =>
aa.clearContext()
case Some(replication) =>
replication.clearContext()
case None =>
}

View file

@ -398,13 +398,6 @@ private[akka] object Running {
this
}
def withContext[A](aa: ReplicationSetup, withReplication: ReplicationSetup => Unit, f: () => A): A = {
withReplication(aa)
val result = f()
aa.clearContext()
result
}
private def handleExternalReplicatedEventPersist(
replication: ReplicationSetup,
event: ReplicatedEvent[E]): Behavior[InternalProtocol] = {
@ -421,17 +414,20 @@ private[akka] object Running {
updatedVersion,
isConcurrent)
val newState: RunningState[S] = withContext(
replication,
aa => aa.setContext(recoveryRunning = false, event.originReplica, concurrent = isConcurrent),
() => state.applyEvent(setup, event.event))
replication.setContext(recoveryRunning = false, event.originReplica, concurrent = isConcurrent)
val stateAfterApply = state.applyEvent(setup, event.event)
val eventToPersist = adaptEvent(event.event)
val eventAdapterManifest = setup.eventAdapter.manifest(event.event)
replication.clearContext()
val newState2: RunningState[S] = internalPersist(
setup.context,
null,
newState,
event.event,
"",
stateAfterApply,
eventToPersist,
eventAdapterManifest,
OptionVal.Some(
ReplicatedEventMetadata(event.originReplica, event.originSequenceNr, updatedVersion, isConcurrent)))
val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event.event, newState2.seqNr)
@ -449,32 +445,25 @@ private[akka] object Running {
event: E,
cmd: Any,
sideEffects: immutable.Seq[SideEffect[S]]): (Behavior[InternalProtocol], Boolean) = {
try {
// apply the event before persist so that validation exception is handled before persisting
// the invalid event, in case such validation is implemented in the event handler.
// also, ensure that there is an event handler for each single event
_currentSequenceNumber = state.seqNr + 1
val newState: RunningState[S] = setup.replication match {
case Some(aa) =>
// set concurrent to false, local events are never concurrent
withContext(
aa,
aa => aa.setContext(recoveryRunning = false, aa.replicaId, concurrent = false),
() => state.applyEvent(setup, event))
case None =>
state.applyEvent(setup, event)
}
setup.replication.foreach(r => r.setContext(recoveryRunning = false, r.replicaId, concurrent = false))
val stateAfterApply = state.applyEvent(setup, event)
val eventToPersist = adaptEvent(event)
val eventAdapterManifest = setup.eventAdapter.manifest(event)
val newState2 = setup.replication match {
case Some(aa) =>
val updatedVersion = newState.version.updated(aa.replicaId.id, _currentSequenceNumber)
val updatedVersion = stateAfterApply.version.updated(aa.replicaId.id, _currentSequenceNumber)
val r = internalPersist(
setup.context,
cmd,
newState,
stateAfterApply,
eventToPersist,
eventAdapterManifest,
OptionVal.Some(
@ -489,11 +478,15 @@ private[akka] object Running {
r
case None =>
internalPersist(setup.context, cmd, newState, eventToPersist, eventAdapterManifest, OptionVal.None)
internalPersist(setup.context, cmd, stateAfterApply, eventToPersist, eventAdapterManifest, OptionVal.None)
}
val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event, newState2.seqNr)
(persistingEvents(newState2, state, numberOfEvents = 1, shouldSnapshotAfterPersist, sideEffects), false)
} finally {
setup.replication.foreach(_.clearContext())
}
}
private def handleEventPersistAll(
@ -501,6 +494,7 @@ private[akka] object Running {
cmd: Any,
sideEffects: immutable.Seq[SideEffect[S]]): (Behavior[InternalProtocol], Boolean) = {
if (events.nonEmpty) {
try {
// apply the event before persist so that validation exception is handled before persisting
// the invalid event, in case such validation is implemented in the event handler.
// also, ensure that there is an event handler for each single event
@ -536,15 +530,7 @@ private[akka] object Running {
case None => None
}
currentState = setup.replication match {
case Some(aa) =>
withContext(
aa,
aa => aa.setContext(recoveryRunning = false, aa.replicaId, concurrent = false),
() => currentState.applyEvent(setup, event))
case None =>
currentState.applyEvent(setup, event)
}
currentState = currentState.applyEvent(setup, event)
eventsToPersist = EventToPersist(adaptedEvent, evtManifest, eventMetadata) :: eventsToPersist
}
@ -553,12 +539,14 @@ private[akka] object Running {
internalPersistAll(setup.context, cmd, currentState, eventsToPersist.reverse)
(persistingEvents(newState2, state, events.size, shouldSnapshotAfterPersist, sideEffects), false)
} finally {
setup.replication.foreach(_.clearContext())
}
} else {
// run side-effects even when no events are emitted
(applySideEffects(sideEffects, state), true)
}
}
@tailrec def applyEffects(
msg: Any,
state: RunningState[S],