diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplicationSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplicationSpec.scala index 79952ccf1a..9b2653399e 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplicationSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ActiveActiveShardingDirectReplicationSpec.scala @@ -5,14 +5,13 @@ package akka.cluster.sharding.typed import org.scalatest.wordspec.AnyWordSpecLike - import akka.Done import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.eventstream.EventStream import akka.persistence.typed.PersistenceId import akka.persistence.typed.PublishedEvent -import akka.persistence.typed.internal.PublishedEventImpl +import akka.persistence.typed.internal.{ PublishedEventImpl, ReplicatedPublishedEventMetaData, VersionVector } class ActiveActiveShardingDirectReplicationSpec extends ScalaTestWithActorTestKit @@ -37,11 +36,11 @@ class ActiveActiveShardingDirectReplicationSpec upProbe.receiveMessage() // not bullet proof wrt to subscription being complete but good enough val event = PublishedEventImpl( - Some("ReplicaA"), PersistenceId.replicatedUniqueId("pid", "ReplicaA"), 1L, "event", - System.currentTimeMillis()) + System.currentTimeMillis(), + Some(new ReplicatedPublishedEventMetaData("ReplicaA", VersionVector.empty))) system.eventStream ! EventStream.Publish(event) replicaBProbe.receiveMessage().message should equal(event) diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveEventPublishingSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveEventPublishingSpec.scala index 1acb5e67e7..f844bb8538 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveEventPublishingSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveEventPublishingSpec.scala @@ -12,6 +12,7 @@ import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors import akka.persistence.testkit.PersistenceTestKitPlugin import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal +import akka.persistence.typed.internal.{ ReplicatedPublishedEventMetaData, VersionVector } import akka.persistence.typed.scaladsl.ActiveActiveEventSourcing import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.EventSourcedBehavior @@ -74,11 +75,11 @@ class ActiveActiveEventPublishingSpec // simulate a published event from another replica actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - Some("DC-B"), PersistenceId.replicatedUniqueId(id, "DC-B"), 1L, "two", - System.currentTimeMillis()) + System.currentTimeMillis(), + Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty))) actor ! MyActiveActive.Add("three", probe.ref) probe.expectMessage(Done) @@ -95,11 +96,11 @@ class ActiveActiveEventPublishingSpec // simulate a published event from another replica actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - Some("DC-B"), PersistenceId.replicatedUniqueId(id, "DC-B"), 2L, // missing 1L "two", - System.currentTimeMillis()) + System.currentTimeMillis(), + Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty))) actor ! MyActiveActive.Add("three", probe.ref) probe.expectMessage(Done) @@ -116,11 +117,11 @@ class ActiveActiveEventPublishingSpec // simulate a published event from another replica actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - Some("DC-C"), PersistenceId.replicatedUniqueId(id, "DC-C"), 1L, "two", - System.currentTimeMillis()) + System.currentTimeMillis(), + Some(new ReplicatedPublishedEventMetaData("DC-C", VersionVector.empty))) actor ! MyActiveActive.Add("three", probe.ref) probe.expectMessage(Done) @@ -137,18 +138,18 @@ class ActiveActiveEventPublishingSpec // simulate a published event from another replica actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - Some("DC-B"), PersistenceId.replicatedUniqueId("myId4", "DC-B"), 1L, "two", - System.currentTimeMillis()) + System.currentTimeMillis(), + Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty))) // simulate another published event from that replica actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - Some("DC-B"), PersistenceId.replicatedUniqueId(id, "DC-B"), 1L, "two-again", // ofc this would be the same in the real world, different just so we can detect - System.currentTimeMillis()) + System.currentTimeMillis(), + Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty))) actor ! MyActiveActive.Add("three", probe.ref) probe.expectMessage(Done) @@ -176,11 +177,11 @@ class ActiveActiveEventPublishingSpec // simulate a published event from another replica incarnation2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - Some("DC-B"), PersistenceId.replicatedUniqueId(id, "DC-B"), 1L, "two", - System.currentTimeMillis()) + System.currentTimeMillis(), + Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty))) incarnation2 ! MyActiveActive.Add("three", probe.ref) probe.expectMessage(Done) @@ -199,11 +200,11 @@ class ActiveActiveEventPublishingSpec // simulate a published event from another replica incarnationA1.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - Some("DC-B"), PersistenceId.replicatedUniqueId(id, "DC-B"), 1L, "two", - System.currentTimeMillis()) + System.currentTimeMillis(), + Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty))) incarnationA1 ! MyActiveActive.Stop probe.expectTerminated(incarnationA1) @@ -212,11 +213,11 @@ class ActiveActiveEventPublishingSpec // simulate a published event from another replica incarnationA2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl( - Some("DC-B"), PersistenceId.replicatedUniqueId(id, "DC-B"), 2L, "three", - System.currentTimeMillis()) + System.currentTimeMillis(), + Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty))) incarnationA2 ! MyActiveActive.Add("four", probe.ref) probe.expectMessage(Done) diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveSpec.scala index 5b7dba7ba9..58effb19cf 100644 --- a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveSpec.scala @@ -55,13 +55,13 @@ object ActiveActiveSpec { Effect.stop() }, (state, event) => { - probe.foreach(_ ! EventAndContext(event, aaContext.origin, aaContext.recoveryRunning)) + probe.foreach(_ ! EventAndContext(event, aaContext.origin, aaContext.recoveryRunning, aaContext.concurrent)) state.copy(all = event :: state.all) })) } -case class EventAndContext(event: Any, origin: String, recoveryRunning: Boolean = false) +case class EventAndContext(event: Any, origin: String, recoveryRunning: Boolean, concurrent: Boolean) class ActiveActiveSpec extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config) @@ -161,12 +161,12 @@ class ActiveActiveSpec val r2 = spawn(testBehavior(entityId, "R2", eventProbeR2.ref)) r1 ! StoreMe("from r1", replyProbe.ref) - eventProbeR2.expectMessage(EventAndContext("from r1", "R1")) - eventProbeR1.expectMessage(EventAndContext("from r1", "R1")) + eventProbeR2.expectMessage(EventAndContext("from r1", "R1", false, false)) + eventProbeR1.expectMessage(EventAndContext("from r1", "R1", false, false)) r2 ! StoreMe("from r2", replyProbe.ref) - eventProbeR1.expectMessage(EventAndContext("from r2", "R2")) - eventProbeR2.expectMessage(EventAndContext("from r2", "R2")) + eventProbeR1.expectMessage(EventAndContext("from r2", "R2", false, false)) + eventProbeR2.expectMessage(EventAndContext("from r2", "R2", false, false)) } "set recovery running" in { @@ -175,24 +175,33 @@ class ActiveActiveSpec val replyProbe = createTestProbe[Done]() val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref)) r1 ! StoreMe("Event", replyProbe.ref) - eventProbeR1.expectMessage(EventAndContext("Event", "R1", recoveryRunning = false)) + eventProbeR1.expectMessage(EventAndContext("Event", "R1", recoveryRunning = false, false)) replyProbe.expectMessage(Done) val recoveryProbe = createTestProbe[EventAndContext]() spawn(testBehavior(entityId, "R1", recoveryProbe.ref)) - recoveryProbe.expectMessage(EventAndContext("Event", "R1", recoveryRunning = true)) + recoveryProbe.expectMessage(EventAndContext("Event", "R1", recoveryRunning = true, false)) } "persist all" in { val entityId = nextEntityId val probe = createTestProbe[Done]() - val r1 = spawn(testBehavior(entityId, "R1")) + val eventProbeR1 = createTestProbe[EventAndContext]() + + val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref)) val r2 = spawn(testBehavior(entityId, "R2")) r1 ! StoreUs("1 from r1" :: "2 from r1" :: Nil, probe.ref) r2 ! StoreUs("1 from r2" :: "2 from r2" :: Nil, probe.ref) probe.receiveMessage() probe.receiveMessage() + // events at r2 happened concurrently with events at r1 + + eventProbeR1.expectMessage(EventAndContext("1 from r1", "R1", false, concurrent = false)) + eventProbeR1.expectMessage(EventAndContext("2 from r1", "R1", false, concurrent = false)) + eventProbeR1.expectMessage(EventAndContext("1 from r2", "R2", false, concurrent = true)) + eventProbeR1.expectMessage(EventAndContext("2 from r2", "R2", false, concurrent = true)) + eventually { val probe = createTestProbe[State]() r1 ! GetState(probe.ref) @@ -203,7 +212,116 @@ class ActiveActiveSpec r2 ! GetState(probe.ref) probe.expectMessageType[State].all.toSet shouldEqual Set("1 from r1", "2 from r1", "1 from r2", "2 from r2") } - } + + "replicate alternate events" in { + val entityId = nextEntityId + val probe = createTestProbe[Done]() + val eventProbeR1 = createTestProbe[EventAndContext]() + val eventProbeR2 = createTestProbe[EventAndContext]() + val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref)) + val r2 = spawn(testBehavior(entityId, "R2", eventProbeR2.ref)) + r1 ! StoreMe("from r1", probe.ref) // R1 0 R2 0 -> R1 1 R2 0 + r2 ! StoreMe("from r2", probe.ref) // R2 0 R1 0 -> R2 1 R1 0 + + // each gets its local event + eventProbeR1.expectMessage(EventAndContext("from r1", "R1", recoveryRunning = false, concurrent = false)) + eventProbeR2.expectMessage(EventAndContext("from r2", "R2", recoveryRunning = false, concurrent = false)) + + // then the replicated remote events, which will be concurrent + eventProbeR1.expectMessage(EventAndContext("from r2", "R2", recoveryRunning = false, concurrent = true)) + eventProbeR2.expectMessage(EventAndContext("from r1", "R1", recoveryRunning = false, concurrent = true)) + + // state is updated + eventually { + val probe = createTestProbe[State]() + r1 ! GetState(probe.ref) + probe.expectMessageType[State].all.toSet shouldEqual Set("from r1", "from r2") + } + eventually { + val probe = createTestProbe[State]() + r2 ! GetState(probe.ref) + probe.expectMessageType[State].all.toSet shouldEqual Set("from r1", "from r2") + } + + // Neither of these should be concurrent, nothing happening at r2 + r1 ! StoreMe("from r1 2", probe.ref) // R1 1 R2 1 + eventProbeR1.expectMessage(EventAndContext("from r1 2", "R1", false, concurrent = false)) + eventProbeR2.expectMessage(EventAndContext("from r1 2", "R1", false, concurrent = false)) + r1 ! StoreMe("from r1 3", probe.ref) // R2 2 R2 1 + eventProbeR1.expectMessage(EventAndContext("from r1 3", "R1", false, concurrent = false)) + eventProbeR2.expectMessage(EventAndContext("from r1 3", "R1", false, concurrent = false)) + eventually { + val probe = createTestProbe[State]() + r2 ! GetState(probe.ref) + probe.expectMessageType[State].all.toSet shouldEqual Set("from r1", "from r2", "from r1 2", "from r1 3") + } + + // not concurrent as the above asserts mean that all events are fully replicated + r2 ! StoreMe("from r2 2", probe.ref) + eventProbeR1.expectMessage(EventAndContext("from r2 2", "R2", false, concurrent = false)) + eventProbeR2.expectMessage(EventAndContext("from r2 2", "R2", false, concurrent = false)) + eventually { + val probe = createTestProbe[State]() + r1 ! GetState(probe.ref) + probe.expectMessageType[State].all.toSet shouldEqual Set( + "from r1", + "from r2", + "from r1 2", + "from r1 3", + "from r2 2") + } + } + + "receive each event only once" in { + val entityId = nextEntityId + val probe = createTestProbe[Done]() + val eventProbeR1 = createTestProbe[EventAndContext]() + val eventProbeR2 = createTestProbe[EventAndContext]() + val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref)) + val r2 = spawn(testBehavior(entityId, "R2", eventProbeR2.ref)) + r1 ! StoreMe("from r1 1", probe.ref) + probe.expectMessage(Done) + r1 ! StoreMe("from r1 2", probe.ref) + probe.expectMessage(Done) + + // r2 + eventProbeR2.expectMessage(EventAndContext("from r1 1", "R1", false, false)) + eventProbeR2.expectMessage(EventAndContext("from r1 2", "R1", false, false)) + + r2 ! StoreMe("from r2 1", probe.ref) + probe.expectMessage(Done) + r2 ! StoreMe("from r2 2", probe.ref) + probe.expectMessage(Done) + + // r3 should only get the events 1, not R2s stored version of them + val eventProbeR3 = createTestProbe[EventAndContext]() + spawn(testBehavior(entityId, "R3", eventProbeR3.ref)) + eventProbeR3.expectMessage(EventAndContext("from r1 1", "R1", false, false)) + eventProbeR3.expectMessage(EventAndContext("from r1 2", "R1", false, false)) + eventProbeR3.expectMessage(EventAndContext("from r2 1", "R2", false, false)) + eventProbeR3.expectMessage(EventAndContext("from r2 2", "R2", false, false)) + eventProbeR3.expectNoMessage() + } + + "set concurrent on replay of events" in { + val entityId = nextEntityId + val probe = createTestProbe[Done]() + val eventProbeR1 = createTestProbe[EventAndContext]() + val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref)) + val r2 = spawn(testBehavior(entityId, "R2")) + r1 ! StoreMe("from r1", probe.ref) // R1 0 R2 0 -> R1 1 R2 0 + r2 ! StoreMe("from r2", probe.ref) // R2 0 R1 0 -> R2 1 R1 0 + // local event isn't concurrent, remote event is + eventProbeR1.expectMessage(EventAndContext("from r1", "R1", recoveryRunning = false, concurrent = false)) + eventProbeR1.expectMessage(EventAndContext("from r2", "R2", recoveryRunning = false, concurrent = true)) + + // take 2 + val eventProbeR1Take2 = createTestProbe[EventAndContext]() + spawn(testBehavior(entityId, "R1", eventProbeR1Take2.ref)) + eventProbeR1Take2.expectMessage(EventAndContext("from r1", "R1", recoveryRunning = true, concurrent = false)) + eventProbeR1Take2.expectMessage(EventAndContext("from r2", "R2", recoveryRunning = true, concurrent = true)) + } + } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/PublishedEvent.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/PublishedEvent.scala index 67a9560b83..1311ea6574 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/PublishedEvent.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/PublishedEvent.scala @@ -7,6 +7,7 @@ package akka.persistence.typed import java.util.Optional import akka.annotation.DoNotInherit +import akka.persistence.typed.internal.ReplicatedPublishedEventMetaData /** * When using event publishing the events published to the system event stream will be in this form. @@ -17,10 +18,11 @@ import akka.annotation.DoNotInherit trait PublishedEvent { /** Scala API: When emitted from an Active Active actor this will contain the replica id */ - def replicaId: Option[String] + def replicatedMetaData: Option[ReplicatedPublishedEventMetaData] /** Java API: When emitted from an Active Active actor this will contain the replica id */ - def getReplicaId: Optional[String] + def getReplicatedMetaData: Optional[ReplicatedPublishedEventMetaData] + def persistenceId: PersistenceId def sequenceNumber: Long diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index a14d0735f4..5c46366211 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -274,29 +274,50 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( } // FIXME serializer +/** + * @param originReplica Where the event originally was created + * @param originSequenceNr The original sequenceNr in the origin DC + * @param version The version with which the event was persisted at the different DC. The same event will have different version vectors + * at each location as they are received at different times + */ @InternalApi -private[akka] final case class ReplicatedEventMetaData(originReplica: String, originSequenceNr: Long) +private[akka] final case class ReplicatedEventMetaData( + originReplica: String, + originSequenceNr: Long, + version: VersionVector, + concurrent: Boolean) // whether when the event handler was executed the event was concurrent + +/** + * An event replicated from a different replica. + * + * The version is for when it was persisted at the other replica. At the current replica it will be + * merged with the current local version. + */ @InternalApi -private[akka] final case class ReplicatedEvent[E](event: E, originReplica: String, originSequenceNr: Long) +private[akka] final case class ReplicatedEvent[E]( + event: E, + originReplica: String, + originSequenceNr: Long, + originVersion: VersionVector) @InternalApi private[akka] case object ReplicatedEventAck +final class ReplicatedPublishedEventMetaData(val replicaId: String, private[akka] val version: VersionVector) + /** * INTERNAL API */ @InternalApi private[akka] final case class PublishedEventImpl( - replicaId: Option[String], persistenceId: PersistenceId, sequenceNumber: Long, payload: Any, - timestamp: Long) + timestamp: Long, + replicatedMetaData: Option[ReplicatedPublishedEventMetaData]) extends PublishedEvent with InternalProtocol { import scala.compat.java8.OptionConverters._ - override def getReplicaId: Optional[String] = replicaId.asJava - def tags: Set[String] = payload match { case t: Tagged => t.tags case _ => Set.empty @@ -307,4 +328,5 @@ private[akka] final case class PublishedEventImpl( case _ => payload } + override def getReplicatedMetaData: Optional[ReplicatedPublishedEventMetaData] = replicatedMetaData.asJava } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index a4502bb699..d2612bce34 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -51,6 +51,7 @@ private[akka] object ReplayingEvents { toSeqNr: Long, receivedPoisonPill: Boolean, recoveryStartTime: Long, + version: VersionVector, seenSeqNrPerReplica: Map[String, Long]) def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: ReplayingState[S]): Behavior[InternalProtocol] = @@ -87,8 +88,6 @@ private[akka] final class ReplayingEvents[C, E, S]( () override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = { - // FIXME deal with a replicated event and ack - // https://github.com/akka/akka/issues/29256 msg match { case JournalResponse(r) => onJournalResponse(r) case SnapshotterResponse(r) => onSnapshotterResponse(r) @@ -132,7 +131,7 @@ private[akka] final class ReplayingEvents[C, E, S]( s"Active active enabled but existing event has no metadata. Migration isn't supported yet.") } - aa.setContext(recoveryRunning = true, meta.originReplica) + aa.setContext(recoveryRunning = true, meta.originReplica, meta.concurrent) Some(meta -> aa.replicaId) case None => None } @@ -144,6 +143,7 @@ private[akka] final class ReplayingEvents[C, E, S]( state = state.copy( state = newState, eventSeenInInterval = true, + version = meta.version, seenSeqNrPerReplica = state.seenSeqNrPerReplica + (meta.originReplica -> meta.originSequenceNr)) case _ => state = state.copy(state = newState, eventSeenInInterval = true) @@ -273,6 +273,7 @@ private[akka] final class ReplayingEvents[C, E, S]( seqNr = state.seqNr, state = state.state, receivedPoisonPill = state.receivedPoisonPill, + state.version, seenPerReplica = state.seenSeqNrPerReplica, replicationControl = Map.empty)) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala index 4977e36d78..9f2d464ec3 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala @@ -179,7 +179,8 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup toSnr, receivedPoisonPill, System.nanoTime(), - // FIXME seqNrs for other replicas needs to come from snapshot + VersionVector.empty, + // FIXME seqNrs for other replicas needs to come from snapshot. seenSeqNrPerReplica = setup.activeActive.map(_.allReplicas.map(replica => replica -> 0L).toMap).getOrElse(Map.empty))) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index e4aa609dcf..7c6af4a82a 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -54,7 +54,7 @@ import akka.persistence.typed.internal.Running.WithSeqNrAccessible import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.EventSourcedBehavior.ActiveActive import akka.stream.scaladsl.Keep -import akka.stream.{ SharedKillSwitch, SystemMaterializer } +import akka.stream.SystemMaterializer import akka.stream.scaladsl.{ RestartSource, Sink } import akka.stream.typed.scaladsl.ActorFlow import akka.util.OptionVal @@ -90,9 +90,9 @@ private[akka] object Running { seqNr: Long, state: State, receivedPoisonPill: Boolean, + version: VersionVector, seenPerReplica: Map[String, Long], - replicationControl: Map[String, ReplicationStreamControl], - replicationKillSwitch: Option[SharedKillSwitch] = None) { + replicationControl: Map[String, ReplicationStreamControl]) { def nextSequenceNr(): RunningState[State] = copy(seqNr = seqNr + 1) @@ -138,6 +138,9 @@ private[akka] object Running { val source = RestartSource.withBackoff(2.seconds, 10.seconds, randomFactor = 0.2) { () => replication .eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue) + // from each replica, only get the events that originated there, this prevents most of the event filtering + // the downside is that events can't be received via other replicas in the event of an uneven network partition + .filter(_.eventMetadata.get.asInstanceOf[ReplicatedEventMetaData].originReplica == replicaId) .viaMat(new FastForwardingFilter)(Keep.right) .mapMaterializedValue(streamControl => controlRef.set(streamControl)) .via(ActorFlow.ask[EventEnvelope, ReplicatedEventEnvelope[E], ReplicatedEventAck.type](ref) { @@ -145,7 +148,11 @@ private[akka] object Running { // Need to handle this not being available migration from non-active-active is supported val meta = eventEnvelope.eventMetadata.get.asInstanceOf[ReplicatedEventMetaData] val re = - ReplicatedEvent[E](eventEnvelope.event.asInstanceOf[E], meta.originReplica, meta.originSequenceNr) + ReplicatedEvent[E]( + eventEnvelope.event.asInstanceOf[E], + meta.originReplica, + meta.originSequenceNr, + meta.version) ReplicatedEventEnvelope(re, replyTo) }) } @@ -249,18 +256,17 @@ private[akka] object Running { envelope) envelope.ack ! ReplicatedEventAck if (envelope.event.originReplica != activeActive.replicaId && !alreadySeen(envelope.event)) { - activeActive.setContext(false, envelope.event.originReplica) setup.log.debug( "Saving event [{}] from [{}] as first time", envelope.event.originSequenceNr, envelope.event.originReplica) - handleExternalReplicatedEventPersist(envelope.event) + handleExternalReplicatedEventPersist(activeActive, envelope.event) } else { setup.log.debug( "Filtering event [{}] from [{}] as it was already seen", envelope.event.originSequenceNr, envelope.event.originReplica) - this + tryUnstashOne(this) } } @@ -272,12 +278,12 @@ private[akka] object Running { this case Some(activeActive) => - event.replicaId match { + event.replicatedMetaData match { case None => - setup.log.warn("Received published event for [{}] but with no replica id, dropping") + setup.log.warn("Received published event for [{}] but with no replicated metadata, dropping") this - case Some(replicaId) => - onPublishedEvent(state, activeActive, replicaId, event) + case Some(replicatedEventMetaData) => + onPublishedEvent(state, activeActive, replicatedEventMetaData, event) } } tryUnstashOne(newBehavior) @@ -286,11 +292,12 @@ private[akka] object Running { private def onPublishedEvent( state: Running.RunningState[S], activeActive: ActiveActive, - originReplicaId: String, + replicatedMetadata: ReplicatedPublishedEventMetaData, event: PublishedEventImpl): Behavior[InternalProtocol] = { val log = setup.log val separatorIndex = event.persistenceId.id.indexOf(PersistenceId.DefaultSeparator) val idPrefix = event.persistenceId.id.substring(0, separatorIndex) + val originReplicaId = replicatedMetadata.replicaId if (!setup.persistenceId.id.startsWith(idPrefix)) { log.warn("Ignoring published replicated event for the wrong actor [{}]", event.persistenceId) this @@ -326,7 +333,7 @@ private[akka] object Running { "Ignoring published replicated event with replication seqNr [{}] from replica [{}] " + "because expected replication seqNr was [{}] ", event.sequenceNumber, - event.replicaId, + originReplicaId, expectedSequenceNumber) } this @@ -343,7 +350,12 @@ private[akka] object Running { state.replicationControl.get(originReplicaId).foreach(_.fastForward(event.sequenceNumber)) handleExternalReplicatedEventPersist( - ReplicatedEvent(event.event.asInstanceOf[E], originReplicaId, event.sequenceNumber)) + activeActive, + ReplicatedEvent( + event.event.asInstanceOf[E], + originReplicaId, + event.sequenceNumber, + replicatedMetadata.version)) } } @@ -355,8 +367,22 @@ private[akka] object Running { this } - private def handleExternalReplicatedEventPersist(event: ReplicatedEvent[E]): Behavior[InternalProtocol] = { + private def handleExternalReplicatedEventPersist( + activeActive: ActiveActive, + event: ReplicatedEvent[E]): Behavior[InternalProtocol] = { _currentSequenceNumber = state.seqNr + 1 + val isConcurrent: Boolean = event.originVersion <> state.version + val updatedVersion = event.originVersion.merge(state.version) + activeActive.setContext(false, event.originReplica, isConcurrent) + + setup.log.debugN( + "Processing event [{}] with version [{}]. Local version: {}. Updated version {}. Concurrent? {}", + event.event, + event.originVersion, + state.version, + updatedVersion, + isConcurrent) + val newState: RunningState[S] = state.applyEvent(setup, event.event) val newState2: RunningState[S] = internalPersist( setup.context, @@ -364,12 +390,13 @@ private[akka] object Running { newState, event.event, "", - OptionVal.Some(ReplicatedEventMetaData(event.originReplica, event.originSequenceNr))) + OptionVal.Some( + ReplicatedEventMetaData(event.originReplica, event.originSequenceNr, updatedVersion, isConcurrent))) val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event.event, newState2.seqNr) // FIXME validate this is the correct sequence nr from that replica https://github.com/akka/akka/issues/29259 val updatedSeen = newState2.seenPerReplica.updated(event.originReplica, event.originSequenceNr) persistingEvents( - newState2.copy(seenPerReplica = updatedSeen), + newState2.copy(seenPerReplica = updatedSeen, version = updatedVersion), state, numberOfEvents = 1, shouldSnapshotAfterPersist, @@ -386,8 +413,10 @@ private[akka] object Running { _currentSequenceNumber = state.seqNr + 1 setup.activeActive.foreach { aa => - aa.setContext(recoveryRunning = false, aa.replicaId) + // set concurrent to false, local events are never concurrent + aa.setContext(recoveryRunning = false, aa.replicaId, concurrent = false) } + val newState: RunningState[S] = state.applyEvent(setup, event) val eventToPersist = adaptEvent(event) @@ -395,13 +424,20 @@ private[akka] object Running { val newState2 = setup.activeActive match { case Some(aa) => - internalPersist( + val updatedVersion = newState.version.updated(aa.replicaId, _currentSequenceNumber) + val r = internalPersist( setup.context, cmd, newState, eventToPersist, eventAdapterManifest, - OptionVal.Some(ReplicatedEventMetaData(aa.replicaId, _currentSequenceNumber))) + OptionVal.Some( + ReplicatedEventMetaData(aa.replicaId, _currentSequenceNumber, updatedVersion, concurrent = false))) + .copy(version = updatedVersion) + + setup.log.debug("Event persisted [{}]. Version vector after: [{}]", eventToPersist, r.version) + + r case None => internalPersist(setup.context, cmd, newState, eventToPersist, eventAdapterManifest, OptionVal.None) } @@ -422,14 +458,15 @@ private[akka] object Running { val metadataTemplate: Option[ReplicatedEventMetaData] = setup.activeActive match { case Some(aa) => - aa.setContext(recoveryRunning = false, aa.replicaId) - Some(ReplicatedEventMetaData(aa.replicaId, 0L)) // we replace it with actual seqnr later + aa.setContext(recoveryRunning = false, aa.replicaId, concurrent = false) // local events are never concurrent + Some(ReplicatedEventMetaData(aa.replicaId, 0L, state.version, concurrent = false)) // we replace it with actual seqnr later case None => None } var currentState = state var shouldSnapshotAfterPersist: SnapshotAfterPersist = NoSnapshot var eventsToPersist: List[EventToPersist] = Nil + events.foreach { event => _currentSequenceNumber += 1 if (shouldSnapshotAfterPersist == NoSnapshot) @@ -437,14 +474,19 @@ private[akka] object Running { val evtManifest = setup.eventAdapter.manifest(event) val adaptedEvent = adaptEvent(event) val eventMetadata = metadataTemplate match { - case Some(template) => Some(template.copy(originSequenceNr = _currentSequenceNumber)) - case None => None + case Some(template) => + val updatedVersion = currentState.version.updated(template.originReplica, _currentSequenceNumber) + setup.log.trace("Processing event [{}] with version vector [{}]", event, updatedVersion) + currentState = currentState.copy(version = updatedVersion) + Some(template.copy(originSequenceNr = _currentSequenceNumber, version = updatedVersion)) + case None => None } currentState = currentState.applyEvent(setup, event) eventsToPersist = EventToPersist(adaptedEvent, evtManifest, eventMetadata) :: eventsToPersist } - val newState2 = internalPersistAll(setup.context, cmd, currentState, eventsToPersist.reverse) + val newState2 = + internalPersistAll(setup.context, cmd, currentState, eventsToPersist.reverse) (persistingEvents(newState2, state, events.size, shouldSnapshotAfterPersist, sideEffects), false) } else { @@ -585,8 +627,9 @@ private[akka] object Running { onWriteSuccess(setup.context, p) if (setup.publishEvents) { + val meta = setup.activeActive.map(aa => new ReplicatedPublishedEventMetaData(aa.replicaId, state.version)) context.system.eventStream ! EventStream.Publish( - PublishedEventImpl(setup.replicaId, setup.persistenceId, p.sequenceNr, p.payload, p.timestamp)) + PublishedEventImpl(setup.persistenceId, p.sequenceNr, p.payload, p.timestamp, meta)) } // only once all things are applied we can revert back diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/VersionVector.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/VersionVector.scala new file mode 100644 index 0000000000..42e5b7bbf5 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/VersionVector.scala @@ -0,0 +1,323 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.typed.internal +import scala.annotation.tailrec +import scala.collection.immutable.TreeMap +import akka.annotation.InternalApi + +/** + * INTERNAL API + * + * VersionVector module with helper classes and methods. + */ +@InternalApi +object VersionVector { + + private val emptyVersions: TreeMap[String, Long] = TreeMap.empty + val empty: VersionVector = ManyVersionVector(emptyVersions) + + def apply(): VersionVector = empty + + def apply(versions: TreeMap[String, Long]): VersionVector = + if (versions.isEmpty) empty + else if (versions.size == 1) apply(versions.head._1, versions.head._2) + else ManyVersionVector(versions) + + def apply(key: String, version: Long): VersionVector = OneVersionVector(key, version) + + /** INTERNAL API */ + @InternalApi private[akka] def apply(versions: List[(String, Long)]): VersionVector = + if (versions.isEmpty) empty + else if (versions.tail.isEmpty) apply(versions.head._1, versions.head._2) + else apply(emptyVersions ++ versions) + + sealed trait Ordering + case object After extends Ordering + case object Before extends Ordering + case object Same extends Ordering + case object Concurrent extends Ordering + + /** + * Marker to ensure that we do a full order comparison instead of bailing out early. + */ + private case object FullOrder extends Ordering + + /** INTERNAL API */ + @InternalApi private[akka] object Timestamp { + final val Zero = 0L + final val EndMarker = Long.MinValue + } + + /** + * Marker to signal that we have reached the end of a version vector. + */ + private val cmpEndMarker = (null, Timestamp.EndMarker) + +} + +/** + * INTERNAL API + * + * Representation of a Vector-based clock (counting clock), inspired by Lamport logical clocks. + * {{{ + * Reference: + * 1) Leslie Lamport (1978). "Time, clocks, and the ordering of events in a distributed system". Communications of the ACM 21 (7): 558-565. + * 2) Friedemann Mattern (1988). "Virtual Time and Global States of Distributed Systems". Workshop on Parallel and Distributed Algorithms: pp. 215-226 + * }}} + * + * Based on `akka.cluster.ddata.VersionVector`. + * + * This class is immutable, i.e. "modifying" methods return a new instance. + */ +@SerialVersionUID(1L) +@InternalApi +sealed abstract class VersionVector extends Serializable { + + type T = VersionVector + + import VersionVector._ + + /** + * Increment the version for the key passed as argument. Returns a new VersionVector. + */ + def +(key: String): VersionVector = increment(key) + + /** + * Increment the version for the key passed as argument. Returns a new VersionVector. + */ + def increment(key: String): VersionVector + + def updated(key: String, version: Long): VersionVector + + def isEmpty: Boolean + + /** + * INTERNAL API + */ + @InternalApi private[akka] def size: Int + + def versionAt(key: String): Long + + /** + * INTERNAL API + */ + @InternalApi private[akka] def contains(key: String): Boolean + + /** + * Returns true if this and that are concurrent else false. + */ + def <>(that: VersionVector): Boolean = compareOnlyTo(that, Concurrent) eq Concurrent + + /** + * Returns true if this is before that else false. + */ + def <(that: VersionVector): Boolean = compareOnlyTo(that, Before) eq Before + + /** + * Returns true if this is after that else false. + */ + def >(that: VersionVector): Boolean = compareOnlyTo(that, After) eq After + + /** + * Returns true if this VersionVector has the same history as the 'that' VersionVector else false. + */ + def ==(that: VersionVector): Boolean = compareOnlyTo(that, Same) eq Same + + /** + * Version vector comparison according to the semantics described by compareTo, with the ability to bail + * out early if the we can't reach the Ordering that we are looking for. + * + * The ordering always starts with Same and can then go to Same, Before or After + * If we're on After we can only go to After or Concurrent + * If we're on Before we can only go to Before or Concurrent + * If we go to Concurrent we exit the loop immediately + * + * If you send in the ordering FullOrder, you will get a full comparison. + */ + private final def compareOnlyTo(that: VersionVector, order: Ordering): Ordering = { + def nextOrElse[A](iter: Iterator[A], default: A): A = if (iter.hasNext) iter.next() else default + + def compare(i1: Iterator[(String, Long)], i2: Iterator[(String, Long)], requestedOrder: Ordering): Ordering = { + @tailrec + def compareNext(nt1: (String, Long), nt2: (String, Long), currentOrder: Ordering): Ordering = + if ((requestedOrder ne FullOrder) && (currentOrder ne Same) && (currentOrder ne requestedOrder)) currentOrder + else if ((nt1 eq cmpEndMarker) && (nt2 eq cmpEndMarker)) currentOrder + // i1 is empty but i2 is not, so i1 can only be Before + else if (nt1 eq cmpEndMarker) { + if (currentOrder eq After) Concurrent else Before + } + // i2 is empty but i1 is not, so i1 can only be After + else if (nt2 eq cmpEndMarker) { + if (currentOrder eq Before) Concurrent else After + } else { + // compare the entries + val nc = nt1._1.compareTo(nt2._1) + if (nc == 0) { + // both entries exist compare the timestamps + // same timestamp so just continue with the next entry + if (nt1._2 == nt2._2) compareNext(nextOrElse(i1, cmpEndMarker), nextOrElse(i2, cmpEndMarker), currentOrder) + else if (nt1._2 < nt2._2) { + // t1 is less than t2, so i1 can only be Before + if (currentOrder eq After) Concurrent + else compareNext(nextOrElse(i1, cmpEndMarker), nextOrElse(i2, cmpEndMarker), Before) + } else { + // t2 is less than t1, so i1 can only be After + if (currentOrder eq Before) Concurrent + else compareNext(nextOrElse(i1, cmpEndMarker), nextOrElse(i2, cmpEndMarker), After) + } + } else if (nc < 0) { + // this entry only exists in i1 so i1 can only be After + if (currentOrder eq Before) Concurrent + else compareNext(nextOrElse(i1, cmpEndMarker), nt2, After) + } else { + // this entry only exists in i2 so i1 can only be Before + if (currentOrder eq After) Concurrent + else compareNext(nt1, nextOrElse(i2, cmpEndMarker), Before) + } + } + + compareNext(nextOrElse(i1, cmpEndMarker), nextOrElse(i2, cmpEndMarker), Same) + } + + if (this eq that) Same + else compare(this.versionsIterator, that.versionsIterator, if (order eq Concurrent) FullOrder else order) + } + + /** + * INTERNAL API + */ + @InternalApi private[akka] def versionsIterator: Iterator[(String, Long)] + + /** + * Compare two version vectors. The outcome will be one of the following: + *

+ * {{{ + * 1. Version 1 is SAME (==) as Version 2 iff for all i c1(i) == c2(i) + * 2. Version 1 is BEFORE (<) Version 2 iff for all i c1(i) <= c2(i) and there exist a j such that c1(j) < c2(j) + * 3. Version 1 is AFTER (>) Version 2 iff for all i c1(i) >= c2(i) and there exist a j such that c1(j) > c2(j). + * 4. Version 1 is CONCURRENT (<>) to Version 2 otherwise. + * }}} + */ + def compareTo(that: VersionVector): Ordering = { + compareOnlyTo(that, FullOrder) + } + + def merge(that: VersionVector): VersionVector + +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] final case class OneVersionVector private[akka] (key: String, version: Long) + extends VersionVector { + import VersionVector.Timestamp + + override def isEmpty: Boolean = false + + /** INTERNAL API */ + @InternalApi private[akka] override def size: Int = 1 + + override def increment(k: String): VersionVector = { + val v = version + 1 + if (k == key) copy(version = v) + else ManyVersionVector(TreeMap(key -> version, k -> v)) + } + + override def updated(k: String, v: Long): VersionVector = { + if (k == key) copy(version = v) + else ManyVersionVector(TreeMap(key -> version, k -> v)) + } + + override def versionAt(k: String): Long = + if (k == key) version + else Timestamp.Zero + + /** INTERNAL API */ + @InternalApi private[akka] override def contains(k: String): Boolean = + k == key + + /** INTERNAL API */ + @InternalApi private[akka] override def versionsIterator: Iterator[(String, Long)] = + Iterator.single((key, version)) + + override def merge(that: VersionVector): VersionVector = { + that match { + case OneVersionVector(n2, v2) => + if (key == n2) if (version >= v2) this else OneVersionVector(n2, v2) + else ManyVersionVector(TreeMap(key -> version, n2 -> v2)) + case ManyVersionVector(vs2) => + val v2 = vs2.getOrElse(key, Timestamp.Zero) + val mergedVersions = + if (v2 >= version) vs2 + else vs2.updated(key, version) + VersionVector(mergedVersions) + } + } + override def toString: String = + s"VersionVector($key -> $version)" + +} + +// TODO we could add more specialized/optimized implementations for 2 and 3 entries, because +// that will be the typical number of data centers + +/** + * INTERNAL API + */ +@InternalApi private[akka] final case class ManyVersionVector(versions: TreeMap[String, Long]) extends VersionVector { + import VersionVector.Timestamp + + override def isEmpty: Boolean = versions.isEmpty + + /** INTERNAL API */ + @InternalApi private[akka] override def size: Int = versions.size + + override def increment(key: String): VersionVector = { + val v = versionAt(key) + 1 + VersionVector(versions.updated(key, v)) + } + + override def updated(key: String, v: Long): VersionVector = + VersionVector(versions.updated(key, v)) + + override def versionAt(key: String): Long = versions.get(key) match { + case Some(v) => v + case None => Timestamp.Zero + } + + /** INTERNAL API */ + @InternalApi private[akka] override def contains(key: String): Boolean = + versions.contains(key) + + /** INTERNAL API */ + @InternalApi private[akka] override def versionsIterator: Iterator[(String, Long)] = + versions.iterator + + override def merge(that: VersionVector): VersionVector = { + if (that.isEmpty) this + else if (this.isEmpty) that + else + that match { + case ManyVersionVector(vs2) => + var mergedVersions = vs2 + for ((key, time) <- versions) { + val mergedVersionsCurrentTime = mergedVersions.getOrElse(key, Timestamp.Zero) + if (time > mergedVersionsCurrentTime) + mergedVersions = mergedVersions.updated(key, time) + } + VersionVector(mergedVersions) + case OneVersionVector(n2, v2) => + val v1 = versions.getOrElse(n2, Timestamp.Zero) + val mergedVersions = + if (v1 >= v2) versions + else versions.updated(n2, v2) + VersionVector(mergedVersions) + } + } + + override def toString: String = + versions.map { case (k, v) => k + " -> " + v }.mkString("VersionVector(", ", ", ")") +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ActiveActiveEventSourcing.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ActiveActiveEventSourcing.scala index f0695b7b8d..888c7e3487 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ActiveActiveEventSourcing.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ActiveActiveEventSourcing.scala @@ -56,6 +56,7 @@ private[akka] class ActiveActiveContextImpl(val entityId: String, val replicaId: extends ActiveActiveContext { var _origin: String = null var _recoveryRunning: Boolean = false + var _concurrent: Boolean = false // FIXME check illegal access https://github.com/akka/akka/issues/29264 @@ -69,7 +70,7 @@ private[akka] class ActiveActiveContextImpl(val entityId: String, val replicaId: * Whether the happened concurrently with an event from another replica. * Undefined result if called from any where other than an event handler. */ - override def concurrent: Boolean = throw new UnsupportedOperationException("TODO") + override def concurrent: Boolean = _concurrent override def persistenceId: PersistenceId = PersistenceId.replicatedUniqueId(entityId, replicaId) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala index 921f9a014f..c7357ac198 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala @@ -33,8 +33,9 @@ object EventSourcedBehavior { /** * Must only be called on the same thread that will execute the user code */ - def setContext(recoveryRunning: Boolean, originReplica: String): Unit = { + def setContext(recoveryRunning: Boolean, originReplica: String, concurrent: Boolean): Unit = { aaContext._recoveryRunning = recoveryRunning + aaContext._concurrent = concurrent aaContext._origin = originReplica }