From c44302bd1e8fca873cd5eaf89a5b1d3fe2150156 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Mon, 29 Jun 2020 08:06:59 +0200 Subject: [PATCH] Speculative replication - step 1 (#29289) * Step 1: general event-publishing-to-topic feature * Step 2: an actor subscribes to the topic and forwards events to the sharded replicas * Another half piece of the puzzle, receive the PublishedEvent in the ESB internals * Stash published events while replaying rather than drop * Publish on the event stream instead of a topic * Active active actor receiving a published event * Some smaller changes * Public API for published event * Better name for the sharding component * Naive test for the Active Active Sharding Replication * Java API for ActiveActiveShardingReplication * Spelling * Use ShardingEnvelope for publishing the event across sharding * Fast forwarding filter stage * Move test to testkit, enable the see-event-twice test (fails) * Use persistence testkit journal * Various smaller review feedback things * Trying to figure out why duplicate event write test fails * Missing unstash after processing published event Co-authored-by: Christopher Batey --- ...ctiveActiveShardingDirectReplication.scala | 90 ++++++++++ .../typed/Murmur2MessageExtractor.scala | 2 +- ...eActiveShardingDirectReplicationSpec.scala | 52 ++++++ .../src/test/resources/logback-test.xml | 4 +- .../ActiveActiveEventPublishingSpec.scala | 150 ++++++++++++++++ .../typed/EventPublishingSpec.scala | 84 +++++++++ .../persistence/typed/PublishedEvent.scala | 31 ++++ .../typed/internal/BehaviorSetup.scala | 13 +- .../internal/EventSourcedBehaviorImpl.scala | 42 ++++- .../typed/internal/FastForwardingFilter.scala | 72 ++++++++ .../typed/internal/ReplayingEvents.scala | 8 +- .../typed/internal/ReplayingSnapshot.scala | 5 + .../persistence/typed/internal/Running.scala | 166 ++++++++++++++++-- .../typed/internal/StashManagement.scala | 2 +- .../typed/scaladsl/EventSourcedBehavior.scala | 10 +- .../src/test/resources/logback-test.xml | 2 +- .../EventSourcedBehaviorWatchSpec.scala | 3 +- build.sbt | 2 +- 18 files changed, 709 insertions(+), 29 deletions(-) create mode 100644 akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplication.scala create mode 100644 akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplicationSpec.scala create mode 100644 akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveEventPublishingSpec.scala create mode 100644 akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/EventPublishingSpec.scala create mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/PublishedEvent.scala create mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/FastForwardingFilter.scala diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplication.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplication.scala new file mode 100644 index 0000000000..fdb843962b --- /dev/null +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplication.scala @@ -0,0 +1,90 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster.sharding.typed + +import akka.Done +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.eventstream.EventStream +import akka.actor.typed.scaladsl.Behaviors +import akka.annotation.ApiMayChange +import akka.annotation.DoNotInherit +import akka.annotation.InternalApi +import akka.persistence.typed.PublishedEvent + +import scala.collection.JavaConverters._ + +/** + * Used when sharding Active Active entities in multiple instances of sharding, for example one per DC in a Multi DC + * Akka Cluster. + * + * This actor should be started once on each node where Active Active entities will run (the same nodes that you start + * sharding on). + * + * Subscribes to locally written events through the event stream and sends the seen events to all the sharded replicas + * which can then fast forward their cross-replica event streams to improve latency while allowing less frequent poll + * for the cross replica queries. Note that since message delivery is at-most-once this can not be the only + * channel for replica events - the entities must still tail events from the journals of other replicas. + * + * The events are forwarded as [[akka.cluster.sharding.typed.ShardingEnvelope]] this will work out of the box both + * by default and with a custom extractor since the envelopes are handled internally. + */ +@ApiMayChange +object ActiveActiveShardingDirectReplication { + + /** + * Not for user extension + */ + @DoNotInherit + sealed trait Command + + /** + * INTERNAL API + */ + @InternalApi + private[akka] case class VerifyStarted(replyTo: ActorRef[Done]) extends Command + + /** + * Java API: + * @param selfReplica The replica id of the replica that runs on this node + * @param replicaShardingProxies A replica id to sharding proxy mapping for each replica in the system + */ + def create[T](selfReplica: String, replicaShardingProxies: java.util.Map[String, ActorRef[T]]): Behavior[Command] = + apply(selfReplica, replicaShardingProxies.asScala.toMap) + + /** + * Scala API: + * @param selfReplica The replica id of the replica that runs on this node + * @param replicaShardingProxies A replica id to sharding proxy mapping for each replica in the system + */ + def apply[T](selfReplica: String, replicaShardingProxies: Map[String, ActorRef[T]]): Behavior[Command] = + Behaviors + .setup[Any] { context => + context.log.debug( + "Subscribing to event stream to forward events to [{}] sharded replicas", + replicaShardingProxies.size - 1) + context.system.eventStream ! EventStream.Subscribe[PublishedEvent](context.self) + + Behaviors.receiveMessagePartial { + case event: PublishedEvent => + context.log.trace( + "Forwarding event for persistence id [{}] sequence nr [{}] to replicas", + event.persistenceId, + event.sequenceNumber) + replicaShardingProxies.foreach { + case (replica, proxy) => + val envelopedEvent = ShardingEnvelope(event.persistenceId.id, event) + if (replica != selfReplica) + proxy.asInstanceOf[ActorRef[ShardingEnvelope[PublishedEvent]]] ! envelopedEvent + } + Behaviors.same + case VerifyStarted(replyTo) => + replyTo ! Done + Behaviors.same + } + } + .narrow[Command] + +} diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/Murmur2MessageExtractor.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/Murmur2MessageExtractor.scala index 166a620218..631191270c 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/Murmur2MessageExtractor.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/Murmur2MessageExtractor.scala @@ -12,7 +12,7 @@ abstract class Murmur2NoEnvelopeMessageExtractor[M](val numberOfShards: Int) ext } /** - * The murmur2 message extractor uses the same algorithm as the default kafka partitoiner + * The murmur2 message extractor uses the same algorithm as the default kafka partitioner * allowing kafka partitions to be mapped to shards. * This can be used with the [[akka.cluster.sharding.external.ExternalShardAllocationStrategy]] to have messages * processed locally. diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplicationSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplicationSpec.scala new file mode 100644 index 0000000000..b9a939f387 --- /dev/null +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplicationSpec.scala @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster.sharding.typed + +import akka.Done +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.eventstream.EventStream +import akka.persistence.typed.PersistenceId +import akka.persistence.typed.internal.PublishedEventImpl +import org.scalatest.wordspec.AnyWordSpecLike + +class ActiveActiveShardingDirectReplicationSpec + extends ScalaTestWithActorTestKit + with AnyWordSpecLike + with LogCapturing { + + "Active active sharding replication" must { + + "replicate published events to all sharding proxies" in { + val replicaAProbe = createTestProbe[Any]() + val replicaBProbe = createTestProbe[Any]() + val replicaCProbe = createTestProbe[Any]() + + val replicationActor = spawn( + ActiveActiveShardingDirectReplication( + "ReplicaA", + replicaShardingProxies = + Map("ReplicaA" -> replicaAProbe.ref, "ReplicaB" -> replicaBProbe.ref, "ReplicaC" -> replicaCProbe.ref))) + + val upProbe = createTestProbe[Done]() + replicationActor ! ActiveActiveShardingDirectReplication.VerifyStarted(upProbe.ref) + upProbe.receiveMessage() // not bullet proof wrt to subscription being complete but good enough + + val event = PublishedEventImpl( + Some("ReplicaA"), + PersistenceId.replicatedUniqueId("pid", "ReplicaA"), + 1L, + "event", + System.currentTimeMillis()) + system.eventStream ! EventStream.Publish(event) + + replicaBProbe.expectMessageType[ShardingEnvelope[_]].message should equal(event) + replicaCProbe.expectMessageType[ShardingEnvelope[_]].message should equal(event) + replicaAProbe.expectNoMessage() // no publishing to the replica emitting it + } + + } + +} diff --git a/akka-persistence-typed-tests/src/test/resources/logback-test.xml b/akka-persistence-typed-tests/src/test/resources/logback-test.xml index c980894390..df3cd05c69 100644 --- a/akka-persistence-typed-tests/src/test/resources/logback-test.xml +++ b/akka-persistence-typed-tests/src/test/resources/logback-test.xml @@ -5,7 +5,7 @@ - %date{ISO8601} [%-5level] [%logger] [%marker] [%X{akkaSource}] [%X{persistenceId}] - %msg %n + %date{ISO8601} [%-5level] [%logger] [%marker] [%X{akkaSource}] [%X{persistencePhase}] [%X{persistenceId}] - %msg %n @@ -25,7 +25,7 @@ - + diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveEventPublishingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveEventPublishingSpec.scala new file mode 100644 index 0000000000..c1238b2531 --- /dev/null +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveEventPublishingSpec.scala @@ -0,0 +1,150 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.typed + +import akka.Done +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors +import akka.persistence.testkit.PersistenceTestKitPlugin +import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal +import akka.persistence.typed.scaladsl.ActiveActiveEventSourcing +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import org.scalatest.wordspec.AnyWordSpecLike + +object ActiveActiveEventPublishingSpec { + + object MyActiveActive { + trait Command + case class Add(text: String, replyTo: ActorRef[Done]) extends Command + case class Get(replyTo: ActorRef[Set[String]]) extends Command + + def apply(entityId: String, replicaId: String, allReplicas: Set[String]): Behavior[Command] = + Behaviors.setup { ctx => + ActiveActiveEventSourcing(entityId, replicaId, allReplicas, PersistenceTestKitReadJournal.Identifier)( + aactx => + EventSourcedBehavior[Command, String, Set[String]]( + aactx.persistenceId, + Set.empty, + (state, command) => + command match { + case Add(string, replyTo) => + ctx.log.debug("Persisting [{}]", string) + Effect.persist(string).thenRun { _ => + ctx.log.debug("Ack:ing [{}]", string) + replyTo ! Done + } + case Get(replyTo) => + replyTo ! state + Effect.none + }, + (state, string) => state + string)) + } + } +} + +class ActiveActiveEventPublishingSpec + extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config) + with AnyWordSpecLike + with LogCapturing { + + import ActiveActiveEventPublishingSpec._ + + "An active active actor" must { + "move forward when a published event from a replica is received" in { + + val actor = spawn(MyActiveActive("myId1", "DC-A", Set("DC-A", "DC-B"))) + val probe = createTestProbe[Any]() + actor ! MyActiveActive.Add("one", probe.ref) + probe.expectMessage(Done) + + // simulate a published event from another replica + actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( + Some("DC-B"), + PersistenceId.replicatedUniqueId("myId1", "DC-B"), + 1L, + "two", + System.currentTimeMillis()) + actor ! MyActiveActive.Add("three", probe.ref) + probe.expectMessage(Done) + + actor ! MyActiveActive.Get(probe.ref) + probe.expectMessage(Set("one", "two", "three")) + } + + "ignore a published event from a replica is received but the sequence number is unexpected" in { + val actor = spawn(MyActiveActive("myId2", "DC-A", Set("DC-A", "DC-B"))) + val probe = createTestProbe[Any]() + actor ! MyActiveActive.Add("one", probe.ref) + probe.expectMessage(Done) + + // simulate a published event from another replica + actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( + Some("DC-B"), + PersistenceId.replicatedUniqueId("myId2", "DC-B"), + 2L, // missing 1L + "two", + System.currentTimeMillis()) + actor ! MyActiveActive.Add("three", probe.ref) + probe.expectMessage(Done) + + actor ! MyActiveActive.Get(probe.ref) + probe.expectMessage(Set("one", "three")) + } + + "ignore a published event from an unknown replica" in { + val actor = spawn(MyActiveActive("myId3", "DC-A", Set("DC-A", "DC-B"))) + val probe = createTestProbe[Any]() + actor ! MyActiveActive.Add("one", probe.ref) + probe.expectMessage(Done) + + // simulate a published event from another replica + actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( + Some("DC-C"), + PersistenceId.replicatedUniqueId("myId3", "DC-C"), + 1L, + "two", + System.currentTimeMillis()) + actor ! MyActiveActive.Add("three", probe.ref) + probe.expectMessage(Done) + + actor ! MyActiveActive.Get(probe.ref) + probe.expectMessage(Set("one", "three")) + } + + "ignore an already seen event from a replica" in { + val actor = spawn(MyActiveActive("myId4", "DC-A", Set("DC-A", "DC-B"))) + val probe = createTestProbe[Any]() + actor ! MyActiveActive.Add("one", probe.ref) + probe.expectMessage(Done) + + // simulate a published event from another replica + actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( + Some("DC-B"), + PersistenceId.replicatedUniqueId("myId4", "DC-B"), + 1L, + "two", + System.currentTimeMillis()) + // simulate another published event from that replica + actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( + Some("DC-B"), + PersistenceId.replicatedUniqueId("myId4", "DC-B"), + 1L, + "two-again", // ofc this would be the same in the real world, different just so we can detect + System.currentTimeMillis()) + + actor ! MyActiveActive.Add("three", probe.ref) + probe.expectMessage(Done) + + actor ! MyActiveActive.Get(probe.ref) + probe.expectMessage(Set("one", "two", "three")) + } + + } + +} diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/EventPublishingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/EventPublishingSpec.scala new file mode 100644 index 0000000000..a899beebc4 --- /dev/null +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/EventPublishingSpec.scala @@ -0,0 +1,84 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.typed + +import akka.Done +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.eventstream.EventStream +import akka.persistence.testkit.PersistenceTestKitPlugin +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.serialization.jackson.CborSerializable +import org.scalatest.wordspec.AnyWordSpecLike + +object EventPublishingSpec { + + object WowSuchEventSourcingBehavior { + sealed trait Command + case class StoreThis(data: String, tagIt: Boolean, replyTo: ActorRef[Done]) extends Command + + final case class Event(data: String, tagIt: Boolean) extends CborSerializable + + def apply(id: PersistenceId): Behavior[Command] = + EventSourcedBehavior[Command, Event, Set[Event]]( + id, + Set.empty, + (_, command) => + command match { + case StoreThis(data, tagIt, replyTo) => + Effect.persist(Event(data, tagIt)).thenRun(_ => replyTo ! Done) + }, + (state, event) => state + event) + .withTagger(evt => if (evt.tagIt) Set("tag") else Set.empty) + .withEventPublishing() + } +} + +class EventPublishingSpec + extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config) + with AnyWordSpecLike + with LogCapturing { + + import EventPublishingSpec._ + + "EventPublishing support" must { + + "publish events after written for any actor" in { + val topicProbe = createTestProbe[Any]() + system.eventStream ! EventStream.Subscribe[PublishedEvent](topicProbe.ref.narrow) + // We don't verify subscription completed (no ack available), but expect the next steps to take enough time + // for subscription to complete + + val myId = PersistenceId.ofUniqueId("myId") + val wowSuchActor = spawn(WowSuchEventSourcingBehavior(myId)) + + val persistProbe = createTestProbe[Any]() + wowSuchActor ! WowSuchEventSourcingBehavior.StoreThis("great stuff", tagIt = false, replyTo = persistProbe.ref) + persistProbe.expectMessage(Done) + + val published1 = topicProbe.expectMessageType[PublishedEvent] + published1.persistenceId should ===(myId) + published1.event should ===(WowSuchEventSourcingBehavior.Event("great stuff", false)) + published1.sequenceNumber should ===(1L) + published1.tags should ===(Set.empty) + + val anotherId = PersistenceId.ofUniqueId("anotherId") + val anotherActor = spawn(WowSuchEventSourcingBehavior(anotherId)) + anotherActor ! WowSuchEventSourcingBehavior.StoreThis("another event", tagIt = true, replyTo = persistProbe.ref) + persistProbe.expectMessage(Done) + + val published2 = topicProbe.expectMessageType[PublishedEvent] + published2.persistenceId should ===(anotherId) + published2.event should ===(WowSuchEventSourcingBehavior.Event("another event", true)) + published2.sequenceNumber should ===(1L) + published2.tags should ===(Set("tag")) + } + + } + +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/PublishedEvent.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/PublishedEvent.scala new file mode 100644 index 0000000000..67a9560b83 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/PublishedEvent.scala @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.typed + +import java.util.Optional + +import akka.annotation.DoNotInherit + +/** + * When using event publishing the events published to the system event stream will be in this form. + * + * Not for user extension + */ +@DoNotInherit +trait PublishedEvent { + + /** Scala API: When emitted from an Active Active actor this will contain the replica id */ + def replicaId: Option[String] + + /** Java API: When emitted from an Active Active actor this will contain the replica id */ + def getReplicaId: Optional[String] + def persistenceId: PersistenceId + def sequenceNumber: Long + + /** User event */ + def event: Any + def timestamp: Long + def tags: Set[String] +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala index 6f87ad5134..20281bace5 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala @@ -7,7 +7,7 @@ package akka.persistence.typed.internal import scala.concurrent.ExecutionContext import scala.util.control.NonFatal import org.slf4j.{ Logger, MDC } -import akka.actor.{ ActorRef, Cancellable } +import akka.actor.{ Cancellable, ActorRef => ClassicActorRef } import akka.actor.typed.Signal import akka.actor.typed.scaladsl.ActorContext import akka.annotation.InternalApi @@ -48,7 +48,8 @@ private[akka] final class BehaviorSetup[C, E, S]( var holdingRecoveryPermit: Boolean, val settings: EventSourcedSettings, val stashState: StashState, - val activeActive: Option[ActiveActive]) { + val activeActive: Option[ActiveActive], + val publishEvents: Boolean) { import BehaviorSetup._ import InternalProtocol.RecoveryTickEvent @@ -57,10 +58,12 @@ private[akka] final class BehaviorSetup[C, E, S]( val persistence: Persistence = Persistence(context.system.toClassic) - val journal: ActorRef = persistence.journalFor(settings.journalPluginId) - val snapshotStore: ActorRef = persistence.snapshotStoreFor(settings.snapshotPluginId) + val journal: ClassicActorRef = persistence.journalFor(settings.journalPluginId) + val snapshotStore: ClassicActorRef = persistence.snapshotStoreFor(settings.snapshotPluginId) - def selfClassic: ActorRef = context.self.toClassic + val replicaId: Option[String] = activeActive.map(_.replicaId) + + def selfClassic: ClassicActorRef = context.self.toClassic private var mdcPhase = PersistenceMdc.Initializing def log: Logger = { diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index 0087e5d20b..a14d0735f4 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -4,6 +4,7 @@ package akka.persistence.typed.internal +import java.util.Optional import java.util.UUID import java.util.concurrent.atomic.AtomicInteger @@ -24,6 +25,7 @@ import akka.persistence.JournalProtocol import akka.persistence.Recovery import akka.persistence.RecoveryPermitter import akka.persistence.SnapshotProtocol +import akka.persistence.journal.Tagged import akka.persistence.typed.DeleteEventsCompleted import akka.persistence.typed.DeleteEventsFailed import akka.persistence.typed.DeleteSnapshotsCompleted @@ -32,6 +34,7 @@ import akka.persistence.typed.DeletionTarget import akka.persistence.typed.EventAdapter import akka.persistence.typed.NoOpEventAdapter import akka.persistence.typed.PersistenceId +import akka.persistence.typed.PublishedEvent import akka.persistence.typed.SnapshotAdapter import akka.persistence.typed.SnapshotCompleted import akka.persistence.typed.SnapshotFailed @@ -89,7 +92,8 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( retention: RetentionCriteria = RetentionCriteria.disabled, supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop, override val signalHandler: PartialFunction[(State, Signal), Unit] = PartialFunction.empty, - activeActive: Option[ActiveActive] = None) + activeActive: Option[ActiveActive] = None, + publishEvents: Boolean = false) extends EventSourcedBehavior[Command, Event, State] { import EventSourcedBehaviorImpl.WriterIdentity @@ -153,7 +157,8 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( holdingRecoveryPermit = false, settings = settings, stashState = stashState, - activeActive = activeActive) + activeActive = activeActive, + publishEvents = publishEvents) // needs to accept Any since we also can get messages from the journal // not part of the user facing Command protocol @@ -241,6 +246,10 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( copy(recovery = recovery.toClassic) } + override def withEventPublishing(): EventSourcedBehavior[Command, Event, State] = { + copy(publishEvents = true) + } + override private[akka] def withActiveActive( context: ActiveActiveContextImpl, id: String, @@ -261,6 +270,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( final case class ReplicatedEventEnvelope[E](event: ReplicatedEvent[E], ack: ActorRef[ReplicatedEventAck.type]) extends InternalProtocol + } // FIXME serializer @@ -270,3 +280,31 @@ private[akka] final case class ReplicatedEventMetaData(originReplica: String, or private[akka] final case class ReplicatedEvent[E](event: E, originReplica: String, originSequenceNr: Long) @InternalApi private[akka] case object ReplicatedEventAck + +/** + * INTERNAL API + */ +@InternalApi +private[akka] final case class PublishedEventImpl( + replicaId: Option[String], + persistenceId: PersistenceId, + sequenceNumber: Long, + payload: Any, + timestamp: Long) + extends PublishedEvent + with InternalProtocol { + import scala.compat.java8.OptionConverters._ + + override def getReplicaId: Optional[String] = replicaId.asJava + + def tags: Set[String] = payload match { + case t: Tagged => t.tags + case _ => Set.empty + } + + def event: Any = payload match { + case Tagged(event, _) => event + case _ => payload + } + +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/FastForwardingFilter.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/FastForwardingFilter.scala new file mode 100644 index 0000000000..a607f0b907 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/FastForwardingFilter.scala @@ -0,0 +1,72 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.typed.internal + +import akka.annotation.InternalApi +import akka.persistence.query.EventEnvelope +import akka.stream.Attributes +import akka.stream.FlowShape +import akka.stream.Inlet +import akka.stream.Outlet +import akka.stream.stage.GraphStageLogic +import akka.stream.stage.GraphStageWithMaterializedValue +import akka.stream.stage.InHandler +import akka.stream.stage.OutHandler + +/** + * INTERNAL API + */ +@InternalApi +private[akka] trait ReplicationStreamControl { + def fastForward(sequenceNumber: Long): Unit +} + +/** + * INTERNAL API + */ +@InternalApi +private[akka] class FastForwardingFilter + extends GraphStageWithMaterializedValue[FlowShape[EventEnvelope, EventEnvelope], ReplicationStreamControl] { + + val in = Inlet[EventEnvelope]("FastForwardingFilter.in") + val out = Outlet[EventEnvelope]("FastForwardingFilter.out") + + override val shape = FlowShape[EventEnvelope, EventEnvelope](in, out) + + override def createLogicAndMaterializedValue( + inheritedAttributes: Attributes): (GraphStageLogic, ReplicationStreamControl) = { + var replicationStreamControl: ReplicationStreamControl = null + val logic = new GraphStageLogic(shape) with InHandler with OutHandler { + // -1 means not currently fast forwarding + @volatile private var fastForwardTo = -1L + + override def onPush(): Unit = { + val eventEnvelope = grab(in) + if (fastForwardTo == -1L) + push(out, eventEnvelope) + else { + if (eventEnvelope.sequenceNr <= fastForwardTo) pull(in) + else { + fastForwardTo = -1L + push(out, eventEnvelope) + } + } + } + override def onPull(): Unit = pull(in) + + replicationStreamControl = new ReplicationStreamControl { + override def fastForward(sequenceNumber: Long): Unit = { + require(sequenceNumber > 0) // only the stage may complete a fast forward + fastForwardTo = sequenceNumber + } + } + + setHandlers(in, out, this) + } + + (logic, replicationStreamControl) + } + +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index eafe1e9eb6..c229c842a1 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -93,6 +93,7 @@ private[akka] final class ReplayingEvents[C, E, S]( case SnapshotterResponse(r) => onSnapshotterResponse(r) case RecoveryTickEvent(snap) => onRecoveryTick(snap) case evt: ReplicatedEventEnvelope[E] => onInternalCommand(evt) + case pe: PublishedEventImpl => onInternalCommand(pe) case cmd: IncomingCommand[C] => onInternalCommand(cmd) case get: GetState[S @unchecked] => stashInternal(get) case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit @@ -259,7 +260,12 @@ private[akka] final class ReplayingEvents[C, E, S]( val running = Running[C, E, S]( setup, - Running.RunningState[S](state.seqNr, state.state, state.receivedPoisonPill, seenPerReplica)) + Running.RunningState[S]( + seqNr = state.seqNr, + state = state.state, + receivedPoisonPill = state.receivedPoisonPill, + seenPerReplica = seenPerReplica, + replicationControl = Map.empty)) tryUnstashOne(running) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala index 6c471b9201..cfabb7a43c 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala @@ -61,6 +61,7 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup case JournalResponse(r) => onJournalResponse(r) case RecoveryTickEvent(snapshot) => onRecoveryTick(snapshot) case evt: ReplicatedEventEnvelope[E] => onReplicatedEvent(evt) + case pe: PublishedEventImpl => onPublishedEvent(pe) case cmd: IncomingCommand[C] => if (receivedPoisonPill) { if (setup.settings.logOnStashing) @@ -127,6 +128,10 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup stashInternal(evt) } + def onPublishedEvent(event: PublishedEventImpl): Behavior[InternalProtocol] = { + stashInternal(event) + } + def onJournalResponse(response: JournalProtocol.Response): Behavior[InternalProtocol] = { setup.log.debug( "Unexpected response from journal: [{}], may be due to an actor restart, ignoring...", diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index e932ccf873..f24914e4f8 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -4,13 +4,17 @@ package akka.persistence.typed.internal +import java.util.concurrent.atomic.AtomicReference + import scala.annotation.tailrec import scala.collection.immutable import akka.actor.UnhandledMessage -import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Signal } +import akka.actor.typed.eventstream.EventStream +import akka.actor.typed.{ Behavior, Signal } import akka.actor.typed.internal.PoisonPill import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors, LoggerOps } import akka.annotation.{ InternalApi, InternalStableApi } +import akka.event.Logging import akka.persistence.DeleteMessagesFailure import akka.persistence.DeleteMessagesSuccess import akka.persistence.DeleteSnapshotFailure @@ -44,10 +48,14 @@ import akka.persistence.typed.internal.InternalProtocol.ReplicatedEventEnvelope import akka.persistence.typed.internal.Running.WithSeqNrAccessible import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.EventSourcedBehavior.ActiveActive +import akka.stream.scaladsl.Keep import akka.stream.{ SharedKillSwitch, SystemMaterializer } import akka.stream.scaladsl.{ RestartSource, Sink } import akka.stream.typed.scaladsl.ActorFlow -import akka.util.{ unused, OptionVal, Timeout } +import akka.util.Helpers +import akka.util.OptionVal +import akka.util.unused +import akka.util.Timeout /** * INTERNAL API @@ -79,6 +87,7 @@ private[akka] object Running { state: State, receivedPoisonPill: Boolean, seenPerReplica: Map[String, Long], + replicationControl: Map[String, ReplicationStreamControl], replicationKillSwitch: Option[SharedKillSwitch] = None) { def nextSequenceNr(): RunningState[State] = @@ -95,31 +104,38 @@ private[akka] object Running { def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: RunningState[S]): Behavior[InternalProtocol] = { val running = new Running(setup.setMdcPhase(PersistenceMdc.RunningCmds)) - setup.activeActive.foreach(aa => startReplicationStream(setup.context.system, setup.context.self, state, aa)) - new running.HandlingCommands(state) + val initialState = setup.activeActive match { + case Some(aa) => startReplicationStream(setup, state, aa) + case None => state + } + new running.HandlingCommands(initialState) } - def startReplicationStream[E, S]( - system: ActorSystem[_], - ref: ActorRef[InternalProtocol], + def startReplicationStream[C, E, S]( + setup: BehaviorSetup[C, E, S], state: RunningState[S], - aa: ActiveActive): Unit = { + aa: ActiveActive): RunningState[S] = { import scala.concurrent.duration._ + val system = setup.context.system + val ref = setup.context.self val query = PersistenceQuery(system) - aa.allReplicas.foreach { replica => - if (replica != aa.replicaId) { - val seqNr = state.seenPerReplica(replica) - val pid = PersistenceId.replicatedUniqueId(aa.aaContext.entityId, replica) + aa.allReplicas.foldLeft(state) { (state, replicaId) => + if (replicaId != aa.replicaId) { + val seqNr = state.seenPerReplica(replicaId) + val pid = PersistenceId.replicatedUniqueId(aa.aaContext.entityId, replicaId) // FIXME support different configuration per replica https://github.com/akka/akka/issues/29257 val replication = query.readJournalFor[EventsByPersistenceIdQuery](aa.queryPluginId) implicit val timeout = Timeout(30.seconds) - // FIXME config + val controlRef = new AtomicReference[ReplicationStreamControl]() + val source = RestartSource.withBackoff(2.seconds, 10.seconds, randomFactor = 0.2) { () => replication .eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue) + .viaMat(new FastForwardingFilter)(Keep.right) + .mapMaterializedValue(streamControl => controlRef.set(streamControl)) .via(ActorFlow.ask[EventEnvelope, ReplicatedEventEnvelope[E], ReplicatedEventAck.type](ref) { (eventEnvelope, replyTo) => // Need to handle this not being available migration from non-active-active is supported @@ -131,6 +147,30 @@ private[akka] object Running { } source.runWith(Sink.ignore)(SystemMaterializer(system).materializer) + + // FIXME support from journal to fast forward https://github.com/akka/akka/issues/29311 + state.copy( + replicationControl = + state.replicationControl.updated(replicaId, new ReplicationStreamControl { + override def fastForward(sequenceNumber: Long): Unit = { + // (logging is safe here since invoked on message receive + OptionVal(controlRef.get) match { + case OptionVal.Some(control) => + if (setup.log.isDebugEnabled) + setup.log.debug("Fast forward replica [{}] to [{}]", replicaId, sequenceNumber) + control.fastForward(sequenceNumber) + case OptionVal.None => + // stream not started yet, ok, fast forward is an optimization + if (setup.log.isDebugEnabled) + setup.log.debug( + "Ignoring fast forward replica [{}] to [{}], stream not started yet", + replicaId, + sequenceNumber) + } + } + })) + } else { + state } } } @@ -163,6 +203,7 @@ private[akka] object Running { def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match { case IncomingCommand(c: C @unchecked) => onCommand(state, c) case re: ReplicatedEventEnvelope[E @unchecked] => onReplicatedEvent(state, re, setup.activeActive.get) + case pe: PublishedEventImpl => onPublishedEvent(state, pe) case JournalResponse(r) => onDeleteEventsJournalResponse(r, state.state) case SnapshotterResponse(r) => onDeleteSnapshotResponse(r, state.state) case get: GetState[S @unchecked] => onGetState(get) @@ -205,6 +246,91 @@ private[akka] object Running { } } + def onPublishedEvent(state: Running.RunningState[S], event: PublishedEventImpl): Behavior[InternalProtocol] = { + val newBehavior: Behavior[InternalProtocol] = setup.activeActive match { + case None => + setup.log + .warn("Received published event for [{}] but not an active active actor, dropping", event.persistenceId) + this + + case Some(activeActive) => + event.replicaId match { + case None => + setup.log.warn("Received published event for [{}] but with no replica id, dropping") + this + case Some(replicaId) => + onPublishedEvent(state, activeActive, replicaId, event) + } + } + tryUnstashOne(newBehavior) + } + + private def onPublishedEvent( + state: Running.RunningState[S], + activeActive: ActiveActive, + originReplicaId: String, + event: PublishedEventImpl): Behavior[InternalProtocol] = { + val log = setup.log + val separatorIndex = event.persistenceId.id.indexOf(PersistenceId.DefaultSeparator) + val idPrefix = event.persistenceId.id.substring(0, separatorIndex) + if (!setup.persistenceId.id.startsWith(idPrefix)) { + log.warn("Ignoring published replicated event for the wrong actor [{}]", event.persistenceId) + this + } else if (originReplicaId == activeActive.replicaId) { + if (log.isDebugEnabled) + log.debug( + "Ignoring published replicated event with seqNr [{}] from our own replica id [{}]", + event.sequenceNumber, + originReplicaId) + this + } else if (!activeActive.allReplicas.contains(originReplicaId)) { + log.warnN( + "Received published replicated event from replica [{}], which is unknown. Active active must be set up with a list of all replicas (known are [{}]).", + originReplicaId, + activeActive.allReplicas.mkString(", ")) + this + } else { + val expectedSequenceNumber = state.seenPerReplica(originReplicaId) + 1 + if (expectedSequenceNumber > event.sequenceNumber) { + // already seen + if (log.isDebugEnabled) + log.debugN( + "Ignoring published replicated event with seqNr [{}] from replica [{}] because it was already seen ([{}])", + event.sequenceNumber, + originReplicaId, + expectedSequenceNumber) + this + } else if (expectedSequenceNumber != event.sequenceNumber) { + // gap in sequence numbers (message lost or query and direct replication out of sync, should heal up by itself + // once the query catches up) + if (log.isDebugEnabled) { + log.debugN( + "Ignoring published replicated event with replication seqNr [{}] from replica [{}] " + + "because expected replication seqNr was [{}] ", + event.sequenceNumber, + event.replicaId, + expectedSequenceNumber) + } + this + } else { + if (log.isTraceEnabled) + log.traceN( + "Received published replicated event [{}] with timestamp [{}] from replica [{}] seqNr [{}]", + Logging.simpleName(event.event.getClass), + Helpers.timestamp(event.timestamp), + originReplicaId, + event.sequenceNumber) + + // fast forward stream for source replica + state.replicationControl.get(originReplicaId).foreach(_.fastForward(event.sequenceNumber)) + + handleExternalReplicatedEventPersist( + ReplicatedEvent(event.event.asInstanceOf[E], originReplicaId, event.sequenceNumber)) + } + + } + } + // Used by EventSourcedBehaviorTestKit to retrieve the state. def onGetState(get: GetState[S]): Behavior[InternalProtocol] = { get.replyTo ! state.state @@ -368,6 +494,7 @@ private[akka] object Running { case JournalResponse(r) => onJournalResponse(r) case in: IncomingCommand[C @unchecked] => onCommand(in) case re: ReplicatedEventEnvelope[E @unchecked] => onReplicatedEvent(re) + case pe: PublishedEventImpl => onPublishedEvent(pe) case get: GetState[S @unchecked] => stashInternal(get) case SnapshotterResponse(r) => onDeleteSnapshotResponse(r, visibleState.state) case RecoveryTickEvent(_) => Behaviors.unhandled @@ -393,6 +520,14 @@ private[akka] object Running { } } + def onPublishedEvent(event: PublishedEventImpl): Behavior[InternalProtocol] = { + if (state.receivedPoisonPill) { + Behaviors.unhandled + } else { + stashInternal(event) + } + } + final def onJournalResponse(response: Response): Behavior[InternalProtocol] = { if (setup.log.isDebugEnabled) { setup.log.debug2( @@ -407,6 +542,11 @@ private[akka] object Running { onWriteSuccess(setup.context, p) + if (setup.publishEvents) { + context.system.eventStream ! EventStream.Publish( + PublishedEventImpl(setup.replicaId, setup.persistenceId, p.sequenceNr, p.payload, p.timestamp)) + } + // only once all things are applied we can revert back if (eventCounter < numberOfEvents) { onWriteDone(setup.context, p) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/StashManagement.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/StashManagement.scala index 49df6ae06b..5fca6336d5 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/StashManagement.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/StashManagement.scala @@ -62,7 +62,7 @@ private[akka] trait StashManagement[C, E, S] { } /** - * `tryUnstashOne` is called at the end of processing each command or when persist is completed + * `tryUnstashOne` is called at the end of processing each command, published event, or when persist is completed */ protected def tryUnstashOne(behavior: Behavior[InternalProtocol]): Behavior[InternalProtocol] = { val buffer = diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala index e53a1918a0..921f9a014f 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala @@ -12,6 +12,7 @@ import akka.actor.typed.internal.BehaviorImpl.DeferredBehavior import akka.actor.typed.internal.InterceptorImpl import akka.actor.typed.internal.LoggerClass import akka.actor.typed.scaladsl.ActorContext +import akka.annotation.ApiMayChange import akka.annotation.{ DoNotInherit, InternalApi } import akka.persistence.typed.EventAdapter import akka.persistence.typed.PersistenceId @@ -21,8 +22,9 @@ import akka.persistence.typed.internal._ object EventSourcedBehavior { + // FIXME move to internal @InternalApi - private[akka] case class ActiveActive( + private[akka] final case class ActiveActive( replicaId: String, allReplicas: Set[String], aaContext: ActiveActiveContextImpl, @@ -237,4 +239,10 @@ object EventSourcedBehavior { * By default, snapshots and events are recovered. */ def withRecovery(recovery: Recovery): EventSourcedBehavior[Command, Event, State] + + /** + * Publish events to the system event stream as [[akka.persistence.typed.PublishedEvent]] after they have been persisted + */ + @ApiMayChange + def withEventPublishing(): EventSourcedBehavior[Command, Event, State] } diff --git a/akka-persistence-typed/src/test/resources/logback-test.xml b/akka-persistence-typed/src/test/resources/logback-test.xml index c980894390..901bdbb4aa 100644 --- a/akka-persistence-typed/src/test/resources/logback-test.xml +++ b/akka-persistence-typed/src/test/resources/logback-test.xml @@ -5,7 +5,7 @@ - %date{ISO8601} [%-5level] [%logger] [%marker] [%X{akkaSource}] [%X{persistenceId}] - %msg %n + %date{ISO8601} [%-5level] [%logger] [%marker] [%X{akkaSource}] [%X{persistencePhase}] [%X{persistenceId}] - %msg %n diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala index c033e614ed..b3175e5777 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala @@ -67,7 +67,8 @@ class EventSourcedBehaviorWatchSpec holdingRecoveryPermit = false, settings = settings, stashState = new StashState(context.asInstanceOf[ActorContext[InternalProtocol]], settings), - None) + activeActive = None, + publishEvents = false) "A typed persistent parent actor watching a child" must { diff --git a/build.sbt b/build.sbt index 72bc671395..85977aabed 100644 --- a/build.sbt +++ b/build.sbt @@ -499,7 +499,7 @@ lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed") clusterSharding % "compile->compile;compile->CompileJdk9;multi-jvm->multi-jvm", actorTestkitTyped % "test->test", actorTypedTests % "test->test", - persistenceTyped % "test->test", + persistenceTyped % "optional->compile;test->test", persistenceTestkit % "test->test", remote % "compile->CompileJdk9;test->test", remoteTests % "test->test",