Introduce version vectors for replicated events (#29332)

* ActiveActive: Events with metadata and events by persistence id for  (#29287)
* Introduce version vectors for replicated events

* Set concurrent on recovery

* Group together AA related fields in a published event
This commit is contained in:
Christopher Batey 2020-07-08 09:04:59 +01:00
parent 82b8d699ca
commit 398ab2efe0
11 changed files with 582 additions and 70 deletions

View file

@ -5,14 +5,13 @@
package akka.cluster.sharding.typed
import org.scalatest.wordspec.AnyWordSpecLike
import akka.Done
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.eventstream.EventStream
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.PublishedEvent
import akka.persistence.typed.internal.PublishedEventImpl
import akka.persistence.typed.internal.{ PublishedEventImpl, ReplicatedPublishedEventMetaData, VersionVector }
class ActiveActiveShardingDirectReplicationSpec
extends ScalaTestWithActorTestKit
@ -37,11 +36,11 @@ class ActiveActiveShardingDirectReplicationSpec
upProbe.receiveMessage() // not bullet proof wrt to subscription being complete but good enough
val event = PublishedEventImpl(
Some("ReplicaA"),
PersistenceId.replicatedUniqueId("pid", "ReplicaA"),
1L,
"event",
System.currentTimeMillis())
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData("ReplicaA", VersionVector.empty)))
system.eventStream ! EventStream.Publish(event)
replicaBProbe.receiveMessage().message should equal(event)

View file

@ -12,6 +12,7 @@ import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.internal.{ ReplicatedPublishedEventMetaData, VersionVector }
import akka.persistence.typed.scaladsl.ActiveActiveEventSourcing
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
@ -74,11 +75,11 @@ class ActiveActiveEventPublishingSpec
// simulate a published event from another replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
Some("DC-B"),
PersistenceId.replicatedUniqueId(id, "DC-B"),
1L,
"two",
System.currentTimeMillis())
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty)))
actor ! MyActiveActive.Add("three", probe.ref)
probe.expectMessage(Done)
@ -95,11 +96,11 @@ class ActiveActiveEventPublishingSpec
// simulate a published event from another replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
Some("DC-B"),
PersistenceId.replicatedUniqueId(id, "DC-B"),
2L, // missing 1L
"two",
System.currentTimeMillis())
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty)))
actor ! MyActiveActive.Add("three", probe.ref)
probe.expectMessage(Done)
@ -116,11 +117,11 @@ class ActiveActiveEventPublishingSpec
// simulate a published event from another replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
Some("DC-C"),
PersistenceId.replicatedUniqueId(id, "DC-C"),
1L,
"two",
System.currentTimeMillis())
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData("DC-C", VersionVector.empty)))
actor ! MyActiveActive.Add("three", probe.ref)
probe.expectMessage(Done)
@ -137,18 +138,18 @@ class ActiveActiveEventPublishingSpec
// simulate a published event from another replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
Some("DC-B"),
PersistenceId.replicatedUniqueId("myId4", "DC-B"),
1L,
"two",
System.currentTimeMillis())
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty)))
// simulate another published event from that replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
Some("DC-B"),
PersistenceId.replicatedUniqueId(id, "DC-B"),
1L,
"two-again", // ofc this would be the same in the real world, different just so we can detect
System.currentTimeMillis())
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty)))
actor ! MyActiveActive.Add("three", probe.ref)
probe.expectMessage(Done)
@ -176,11 +177,11 @@ class ActiveActiveEventPublishingSpec
// simulate a published event from another replica
incarnation2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
Some("DC-B"),
PersistenceId.replicatedUniqueId(id, "DC-B"),
1L,
"two",
System.currentTimeMillis())
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty)))
incarnation2 ! MyActiveActive.Add("three", probe.ref)
probe.expectMessage(Done)
@ -199,11 +200,11 @@ class ActiveActiveEventPublishingSpec
// simulate a published event from another replica
incarnationA1.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
Some("DC-B"),
PersistenceId.replicatedUniqueId(id, "DC-B"),
1L,
"two",
System.currentTimeMillis())
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty)))
incarnationA1 ! MyActiveActive.Stop
probe.expectTerminated(incarnationA1)
@ -212,11 +213,11 @@ class ActiveActiveEventPublishingSpec
// simulate a published event from another replica
incarnationA2.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
Some("DC-B"),
PersistenceId.replicatedUniqueId(id, "DC-B"),
2L,
"three",
System.currentTimeMillis())
System.currentTimeMillis(),
Some(new ReplicatedPublishedEventMetaData("DC-B", VersionVector.empty)))
incarnationA2 ! MyActiveActive.Add("four", probe.ref)
probe.expectMessage(Done)

View file

@ -55,13 +55,13 @@ object ActiveActiveSpec {
Effect.stop()
},
(state, event) => {
probe.foreach(_ ! EventAndContext(event, aaContext.origin, aaContext.recoveryRunning))
probe.foreach(_ ! EventAndContext(event, aaContext.origin, aaContext.recoveryRunning, aaContext.concurrent))
state.copy(all = event :: state.all)
}))
}
case class EventAndContext(event: Any, origin: String, recoveryRunning: Boolean = false)
case class EventAndContext(event: Any, origin: String, recoveryRunning: Boolean, concurrent: Boolean)
class ActiveActiveSpec
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config)
@ -161,12 +161,12 @@ class ActiveActiveSpec
val r2 = spawn(testBehavior(entityId, "R2", eventProbeR2.ref))
r1 ! StoreMe("from r1", replyProbe.ref)
eventProbeR2.expectMessage(EventAndContext("from r1", "R1"))
eventProbeR1.expectMessage(EventAndContext("from r1", "R1"))
eventProbeR2.expectMessage(EventAndContext("from r1", "R1", false, false))
eventProbeR1.expectMessage(EventAndContext("from r1", "R1", false, false))
r2 ! StoreMe("from r2", replyProbe.ref)
eventProbeR1.expectMessage(EventAndContext("from r2", "R2"))
eventProbeR2.expectMessage(EventAndContext("from r2", "R2"))
eventProbeR1.expectMessage(EventAndContext("from r2", "R2", false, false))
eventProbeR2.expectMessage(EventAndContext("from r2", "R2", false, false))
}
"set recovery running" in {
@ -175,24 +175,33 @@ class ActiveActiveSpec
val replyProbe = createTestProbe[Done]()
val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref))
r1 ! StoreMe("Event", replyProbe.ref)
eventProbeR1.expectMessage(EventAndContext("Event", "R1", recoveryRunning = false))
eventProbeR1.expectMessage(EventAndContext("Event", "R1", recoveryRunning = false, false))
replyProbe.expectMessage(Done)
val recoveryProbe = createTestProbe[EventAndContext]()
spawn(testBehavior(entityId, "R1", recoveryProbe.ref))
recoveryProbe.expectMessage(EventAndContext("Event", "R1", recoveryRunning = true))
recoveryProbe.expectMessage(EventAndContext("Event", "R1", recoveryRunning = true, false))
}
"persist all" in {
val entityId = nextEntityId
val probe = createTestProbe[Done]()
val r1 = spawn(testBehavior(entityId, "R1"))
val eventProbeR1 = createTestProbe[EventAndContext]()
val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref))
val r2 = spawn(testBehavior(entityId, "R2"))
r1 ! StoreUs("1 from r1" :: "2 from r1" :: Nil, probe.ref)
r2 ! StoreUs("1 from r2" :: "2 from r2" :: Nil, probe.ref)
probe.receiveMessage()
probe.receiveMessage()
// events at r2 happened concurrently with events at r1
eventProbeR1.expectMessage(EventAndContext("1 from r1", "R1", false, concurrent = false))
eventProbeR1.expectMessage(EventAndContext("2 from r1", "R1", false, concurrent = false))
eventProbeR1.expectMessage(EventAndContext("1 from r2", "R2", false, concurrent = true))
eventProbeR1.expectMessage(EventAndContext("2 from r2", "R2", false, concurrent = true))
eventually {
val probe = createTestProbe[State]()
r1 ! GetState(probe.ref)
@ -203,7 +212,116 @@ class ActiveActiveSpec
r2 ! GetState(probe.ref)
probe.expectMessageType[State].all.toSet shouldEqual Set("1 from r1", "2 from r1", "1 from r2", "2 from r2")
}
}
"replicate alternate events" in {
val entityId = nextEntityId
val probe = createTestProbe[Done]()
val eventProbeR1 = createTestProbe[EventAndContext]()
val eventProbeR2 = createTestProbe[EventAndContext]()
val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref))
val r2 = spawn(testBehavior(entityId, "R2", eventProbeR2.ref))
r1 ! StoreMe("from r1", probe.ref) // R1 0 R2 0 -> R1 1 R2 0
r2 ! StoreMe("from r2", probe.ref) // R2 0 R1 0 -> R2 1 R1 0
// each gets its local event
eventProbeR1.expectMessage(EventAndContext("from r1", "R1", recoveryRunning = false, concurrent = false))
eventProbeR2.expectMessage(EventAndContext("from r2", "R2", recoveryRunning = false, concurrent = false))
// then the replicated remote events, which will be concurrent
eventProbeR1.expectMessage(EventAndContext("from r2", "R2", recoveryRunning = false, concurrent = true))
eventProbeR2.expectMessage(EventAndContext("from r1", "R1", recoveryRunning = false, concurrent = true))
// state is updated
eventually {
val probe = createTestProbe[State]()
r1 ! GetState(probe.ref)
probe.expectMessageType[State].all.toSet shouldEqual Set("from r1", "from r2")
}
eventually {
val probe = createTestProbe[State]()
r2 ! GetState(probe.ref)
probe.expectMessageType[State].all.toSet shouldEqual Set("from r1", "from r2")
}
// Neither of these should be concurrent, nothing happening at r2
r1 ! StoreMe("from r1 2", probe.ref) // R1 1 R2 1
eventProbeR1.expectMessage(EventAndContext("from r1 2", "R1", false, concurrent = false))
eventProbeR2.expectMessage(EventAndContext("from r1 2", "R1", false, concurrent = false))
r1 ! StoreMe("from r1 3", probe.ref) // R2 2 R2 1
eventProbeR1.expectMessage(EventAndContext("from r1 3", "R1", false, concurrent = false))
eventProbeR2.expectMessage(EventAndContext("from r1 3", "R1", false, concurrent = false))
eventually {
val probe = createTestProbe[State]()
r2 ! GetState(probe.ref)
probe.expectMessageType[State].all.toSet shouldEqual Set("from r1", "from r2", "from r1 2", "from r1 3")
}
// not concurrent as the above asserts mean that all events are fully replicated
r2 ! StoreMe("from r2 2", probe.ref)
eventProbeR1.expectMessage(EventAndContext("from r2 2", "R2", false, concurrent = false))
eventProbeR2.expectMessage(EventAndContext("from r2 2", "R2", false, concurrent = false))
eventually {
val probe = createTestProbe[State]()
r1 ! GetState(probe.ref)
probe.expectMessageType[State].all.toSet shouldEqual Set(
"from r1",
"from r2",
"from r1 2",
"from r1 3",
"from r2 2")
}
}
"receive each event only once" in {
val entityId = nextEntityId
val probe = createTestProbe[Done]()
val eventProbeR1 = createTestProbe[EventAndContext]()
val eventProbeR2 = createTestProbe[EventAndContext]()
val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref))
val r2 = spawn(testBehavior(entityId, "R2", eventProbeR2.ref))
r1 ! StoreMe("from r1 1", probe.ref)
probe.expectMessage(Done)
r1 ! StoreMe("from r1 2", probe.ref)
probe.expectMessage(Done)
// r2
eventProbeR2.expectMessage(EventAndContext("from r1 1", "R1", false, false))
eventProbeR2.expectMessage(EventAndContext("from r1 2", "R1", false, false))
r2 ! StoreMe("from r2 1", probe.ref)
probe.expectMessage(Done)
r2 ! StoreMe("from r2 2", probe.ref)
probe.expectMessage(Done)
// r3 should only get the events 1, not R2s stored version of them
val eventProbeR3 = createTestProbe[EventAndContext]()
spawn(testBehavior(entityId, "R3", eventProbeR3.ref))
eventProbeR3.expectMessage(EventAndContext("from r1 1", "R1", false, false))
eventProbeR3.expectMessage(EventAndContext("from r1 2", "R1", false, false))
eventProbeR3.expectMessage(EventAndContext("from r2 1", "R2", false, false))
eventProbeR3.expectMessage(EventAndContext("from r2 2", "R2", false, false))
eventProbeR3.expectNoMessage()
}
"set concurrent on replay of events" in {
val entityId = nextEntityId
val probe = createTestProbe[Done]()
val eventProbeR1 = createTestProbe[EventAndContext]()
val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref))
val r2 = spawn(testBehavior(entityId, "R2"))
r1 ! StoreMe("from r1", probe.ref) // R1 0 R2 0 -> R1 1 R2 0
r2 ! StoreMe("from r2", probe.ref) // R2 0 R1 0 -> R2 1 R1 0
// local event isn't concurrent, remote event is
eventProbeR1.expectMessage(EventAndContext("from r1", "R1", recoveryRunning = false, concurrent = false))
eventProbeR1.expectMessage(EventAndContext("from r2", "R2", recoveryRunning = false, concurrent = true))
// take 2
val eventProbeR1Take2 = createTestProbe[EventAndContext]()
spawn(testBehavior(entityId, "R1", eventProbeR1Take2.ref))
eventProbeR1Take2.expectMessage(EventAndContext("from r1", "R1", recoveryRunning = true, concurrent = false))
eventProbeR1Take2.expectMessage(EventAndContext("from r2", "R2", recoveryRunning = true, concurrent = true))
}
}
}

View file

@ -7,6 +7,7 @@ package akka.persistence.typed
import java.util.Optional
import akka.annotation.DoNotInherit
import akka.persistence.typed.internal.ReplicatedPublishedEventMetaData
/**
* When using event publishing the events published to the system event stream will be in this form.
@ -17,10 +18,11 @@ import akka.annotation.DoNotInherit
trait PublishedEvent {
/** Scala API: When emitted from an Active Active actor this will contain the replica id */
def replicaId: Option[String]
def replicatedMetaData: Option[ReplicatedPublishedEventMetaData]
/** Java API: When emitted from an Active Active actor this will contain the replica id */
def getReplicaId: Optional[String]
def getReplicatedMetaData: Optional[ReplicatedPublishedEventMetaData]
def persistenceId: PersistenceId
def sequenceNumber: Long

View file

@ -274,29 +274,50 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
}
// FIXME serializer
/**
* @param originReplica Where the event originally was created
* @param originSequenceNr The original sequenceNr in the origin DC
* @param version The version with which the event was persisted at the different DC. The same event will have different version vectors
* at each location as they are received at different times
*/
@InternalApi
private[akka] final case class ReplicatedEventMetaData(originReplica: String, originSequenceNr: Long)
private[akka] final case class ReplicatedEventMetaData(
originReplica: String,
originSequenceNr: Long,
version: VersionVector,
concurrent: Boolean) // whether when the event handler was executed the event was concurrent
/**
* An event replicated from a different replica.
*
* The version is for when it was persisted at the other replica. At the current replica it will be
* merged with the current local version.
*/
@InternalApi
private[akka] final case class ReplicatedEvent[E](event: E, originReplica: String, originSequenceNr: Long)
private[akka] final case class ReplicatedEvent[E](
event: E,
originReplica: String,
originSequenceNr: Long,
originVersion: VersionVector)
@InternalApi
private[akka] case object ReplicatedEventAck
final class ReplicatedPublishedEventMetaData(val replicaId: String, private[akka] val version: VersionVector)
/**
* INTERNAL API
*/
@InternalApi
private[akka] final case class PublishedEventImpl(
replicaId: Option[String],
persistenceId: PersistenceId,
sequenceNumber: Long,
payload: Any,
timestamp: Long)
timestamp: Long,
replicatedMetaData: Option[ReplicatedPublishedEventMetaData])
extends PublishedEvent
with InternalProtocol {
import scala.compat.java8.OptionConverters._
override def getReplicaId: Optional[String] = replicaId.asJava
def tags: Set[String] = payload match {
case t: Tagged => t.tags
case _ => Set.empty
@ -307,4 +328,5 @@ private[akka] final case class PublishedEventImpl(
case _ => payload
}
override def getReplicatedMetaData: Optional[ReplicatedPublishedEventMetaData] = replicatedMetaData.asJava
}

View file

@ -51,6 +51,7 @@ private[akka] object ReplayingEvents {
toSeqNr: Long,
receivedPoisonPill: Boolean,
recoveryStartTime: Long,
version: VersionVector,
seenSeqNrPerReplica: Map[String, Long])
def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: ReplayingState[S]): Behavior[InternalProtocol] =
@ -87,8 +88,6 @@ private[akka] final class ReplayingEvents[C, E, S](
()
override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = {
// FIXME deal with a replicated event and ack
// https://github.com/akka/akka/issues/29256
msg match {
case JournalResponse(r) => onJournalResponse(r)
case SnapshotterResponse(r) => onSnapshotterResponse(r)
@ -132,7 +131,7 @@ private[akka] final class ReplayingEvents[C, E, S](
s"Active active enabled but existing event has no metadata. Migration isn't supported yet.")
}
aa.setContext(recoveryRunning = true, meta.originReplica)
aa.setContext(recoveryRunning = true, meta.originReplica, meta.concurrent)
Some(meta -> aa.replicaId)
case None => None
}
@ -144,6 +143,7 @@ private[akka] final class ReplayingEvents[C, E, S](
state = state.copy(
state = newState,
eventSeenInInterval = true,
version = meta.version,
seenSeqNrPerReplica = state.seenSeqNrPerReplica + (meta.originReplica -> meta.originSequenceNr))
case _ =>
state = state.copy(state = newState, eventSeenInInterval = true)
@ -273,6 +273,7 @@ private[akka] final class ReplayingEvents[C, E, S](
seqNr = state.seqNr,
state = state.state,
receivedPoisonPill = state.receivedPoisonPill,
state.version,
seenPerReplica = state.seenSeqNrPerReplica,
replicationControl = Map.empty))

View file

@ -179,7 +179,8 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
toSnr,
receivedPoisonPill,
System.nanoTime(),
// FIXME seqNrs for other replicas needs to come from snapshot
VersionVector.empty,
// FIXME seqNrs for other replicas needs to come from snapshot.
seenSeqNrPerReplica =
setup.activeActive.map(_.allReplicas.map(replica => replica -> 0L).toMap).getOrElse(Map.empty)))
}

View file

@ -54,7 +54,7 @@ import akka.persistence.typed.internal.Running.WithSeqNrAccessible
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior.ActiveActive
import akka.stream.scaladsl.Keep
import akka.stream.{ SharedKillSwitch, SystemMaterializer }
import akka.stream.SystemMaterializer
import akka.stream.scaladsl.{ RestartSource, Sink }
import akka.stream.typed.scaladsl.ActorFlow
import akka.util.OptionVal
@ -90,9 +90,9 @@ private[akka] object Running {
seqNr: Long,
state: State,
receivedPoisonPill: Boolean,
version: VersionVector,
seenPerReplica: Map[String, Long],
replicationControl: Map[String, ReplicationStreamControl],
replicationKillSwitch: Option[SharedKillSwitch] = None) {
replicationControl: Map[String, ReplicationStreamControl]) {
def nextSequenceNr(): RunningState[State] =
copy(seqNr = seqNr + 1)
@ -138,6 +138,9 @@ private[akka] object Running {
val source = RestartSource.withBackoff(2.seconds, 10.seconds, randomFactor = 0.2) { () =>
replication
.eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue)
// from each replica, only get the events that originated there, this prevents most of the event filtering
// the downside is that events can't be received via other replicas in the event of an uneven network partition
.filter(_.eventMetadata.get.asInstanceOf[ReplicatedEventMetaData].originReplica == replicaId)
.viaMat(new FastForwardingFilter)(Keep.right)
.mapMaterializedValue(streamControl => controlRef.set(streamControl))
.via(ActorFlow.ask[EventEnvelope, ReplicatedEventEnvelope[E], ReplicatedEventAck.type](ref) {
@ -145,7 +148,11 @@ private[akka] object Running {
// Need to handle this not being available migration from non-active-active is supported
val meta = eventEnvelope.eventMetadata.get.asInstanceOf[ReplicatedEventMetaData]
val re =
ReplicatedEvent[E](eventEnvelope.event.asInstanceOf[E], meta.originReplica, meta.originSequenceNr)
ReplicatedEvent[E](
eventEnvelope.event.asInstanceOf[E],
meta.originReplica,
meta.originSequenceNr,
meta.version)
ReplicatedEventEnvelope(re, replyTo)
})
}
@ -249,18 +256,17 @@ private[akka] object Running {
envelope)
envelope.ack ! ReplicatedEventAck
if (envelope.event.originReplica != activeActive.replicaId && !alreadySeen(envelope.event)) {
activeActive.setContext(false, envelope.event.originReplica)
setup.log.debug(
"Saving event [{}] from [{}] as first time",
envelope.event.originSequenceNr,
envelope.event.originReplica)
handleExternalReplicatedEventPersist(envelope.event)
handleExternalReplicatedEventPersist(activeActive, envelope.event)
} else {
setup.log.debug(
"Filtering event [{}] from [{}] as it was already seen",
envelope.event.originSequenceNr,
envelope.event.originReplica)
this
tryUnstashOne(this)
}
}
@ -272,12 +278,12 @@ private[akka] object Running {
this
case Some(activeActive) =>
event.replicaId match {
event.replicatedMetaData match {
case None =>
setup.log.warn("Received published event for [{}] but with no replica id, dropping")
setup.log.warn("Received published event for [{}] but with no replicated metadata, dropping")
this
case Some(replicaId) =>
onPublishedEvent(state, activeActive, replicaId, event)
case Some(replicatedEventMetaData) =>
onPublishedEvent(state, activeActive, replicatedEventMetaData, event)
}
}
tryUnstashOne(newBehavior)
@ -286,11 +292,12 @@ private[akka] object Running {
private def onPublishedEvent(
state: Running.RunningState[S],
activeActive: ActiveActive,
originReplicaId: String,
replicatedMetadata: ReplicatedPublishedEventMetaData,
event: PublishedEventImpl): Behavior[InternalProtocol] = {
val log = setup.log
val separatorIndex = event.persistenceId.id.indexOf(PersistenceId.DefaultSeparator)
val idPrefix = event.persistenceId.id.substring(0, separatorIndex)
val originReplicaId = replicatedMetadata.replicaId
if (!setup.persistenceId.id.startsWith(idPrefix)) {
log.warn("Ignoring published replicated event for the wrong actor [{}]", event.persistenceId)
this
@ -326,7 +333,7 @@ private[akka] object Running {
"Ignoring published replicated event with replication seqNr [{}] from replica [{}] " +
"because expected replication seqNr was [{}] ",
event.sequenceNumber,
event.replicaId,
originReplicaId,
expectedSequenceNumber)
}
this
@ -343,7 +350,12 @@ private[akka] object Running {
state.replicationControl.get(originReplicaId).foreach(_.fastForward(event.sequenceNumber))
handleExternalReplicatedEventPersist(
ReplicatedEvent(event.event.asInstanceOf[E], originReplicaId, event.sequenceNumber))
activeActive,
ReplicatedEvent(
event.event.asInstanceOf[E],
originReplicaId,
event.sequenceNumber,
replicatedMetadata.version))
}
}
@ -355,8 +367,22 @@ private[akka] object Running {
this
}
private def handleExternalReplicatedEventPersist(event: ReplicatedEvent[E]): Behavior[InternalProtocol] = {
private def handleExternalReplicatedEventPersist(
activeActive: ActiveActive,
event: ReplicatedEvent[E]): Behavior[InternalProtocol] = {
_currentSequenceNumber = state.seqNr + 1
val isConcurrent: Boolean = event.originVersion <> state.version
val updatedVersion = event.originVersion.merge(state.version)
activeActive.setContext(false, event.originReplica, isConcurrent)
setup.log.debugN(
"Processing event [{}] with version [{}]. Local version: {}. Updated version {}. Concurrent? {}",
event.event,
event.originVersion,
state.version,
updatedVersion,
isConcurrent)
val newState: RunningState[S] = state.applyEvent(setup, event.event)
val newState2: RunningState[S] = internalPersist(
setup.context,
@ -364,12 +390,13 @@ private[akka] object Running {
newState,
event.event,
"",
OptionVal.Some(ReplicatedEventMetaData(event.originReplica, event.originSequenceNr)))
OptionVal.Some(
ReplicatedEventMetaData(event.originReplica, event.originSequenceNr, updatedVersion, isConcurrent)))
val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event.event, newState2.seqNr)
// FIXME validate this is the correct sequence nr from that replica https://github.com/akka/akka/issues/29259
val updatedSeen = newState2.seenPerReplica.updated(event.originReplica, event.originSequenceNr)
persistingEvents(
newState2.copy(seenPerReplica = updatedSeen),
newState2.copy(seenPerReplica = updatedSeen, version = updatedVersion),
state,
numberOfEvents = 1,
shouldSnapshotAfterPersist,
@ -386,8 +413,10 @@ private[akka] object Running {
_currentSequenceNumber = state.seqNr + 1
setup.activeActive.foreach { aa =>
aa.setContext(recoveryRunning = false, aa.replicaId)
// set concurrent to false, local events are never concurrent
aa.setContext(recoveryRunning = false, aa.replicaId, concurrent = false)
}
val newState: RunningState[S] = state.applyEvent(setup, event)
val eventToPersist = adaptEvent(event)
@ -395,13 +424,20 @@ private[akka] object Running {
val newState2 = setup.activeActive match {
case Some(aa) =>
internalPersist(
val updatedVersion = newState.version.updated(aa.replicaId, _currentSequenceNumber)
val r = internalPersist(
setup.context,
cmd,
newState,
eventToPersist,
eventAdapterManifest,
OptionVal.Some(ReplicatedEventMetaData(aa.replicaId, _currentSequenceNumber)))
OptionVal.Some(
ReplicatedEventMetaData(aa.replicaId, _currentSequenceNumber, updatedVersion, concurrent = false)))
.copy(version = updatedVersion)
setup.log.debug("Event persisted [{}]. Version vector after: [{}]", eventToPersist, r.version)
r
case None =>
internalPersist(setup.context, cmd, newState, eventToPersist, eventAdapterManifest, OptionVal.None)
}
@ -422,14 +458,15 @@ private[akka] object Running {
val metadataTemplate: Option[ReplicatedEventMetaData] = setup.activeActive match {
case Some(aa) =>
aa.setContext(recoveryRunning = false, aa.replicaId)
Some(ReplicatedEventMetaData(aa.replicaId, 0L)) // we replace it with actual seqnr later
aa.setContext(recoveryRunning = false, aa.replicaId, concurrent = false) // local events are never concurrent
Some(ReplicatedEventMetaData(aa.replicaId, 0L, state.version, concurrent = false)) // we replace it with actual seqnr later
case None => None
}
var currentState = state
var shouldSnapshotAfterPersist: SnapshotAfterPersist = NoSnapshot
var eventsToPersist: List[EventToPersist] = Nil
events.foreach { event =>
_currentSequenceNumber += 1
if (shouldSnapshotAfterPersist == NoSnapshot)
@ -437,14 +474,19 @@ private[akka] object Running {
val evtManifest = setup.eventAdapter.manifest(event)
val adaptedEvent = adaptEvent(event)
val eventMetadata = metadataTemplate match {
case Some(template) => Some(template.copy(originSequenceNr = _currentSequenceNumber))
case None => None
case Some(template) =>
val updatedVersion = currentState.version.updated(template.originReplica, _currentSequenceNumber)
setup.log.trace("Processing event [{}] with version vector [{}]", event, updatedVersion)
currentState = currentState.copy(version = updatedVersion)
Some(template.copy(originSequenceNr = _currentSequenceNumber, version = updatedVersion))
case None => None
}
currentState = currentState.applyEvent(setup, event)
eventsToPersist = EventToPersist(adaptedEvent, evtManifest, eventMetadata) :: eventsToPersist
}
val newState2 = internalPersistAll(setup.context, cmd, currentState, eventsToPersist.reverse)
val newState2 =
internalPersistAll(setup.context, cmd, currentState, eventsToPersist.reverse)
(persistingEvents(newState2, state, events.size, shouldSnapshotAfterPersist, sideEffects), false)
} else {
@ -585,8 +627,9 @@ private[akka] object Running {
onWriteSuccess(setup.context, p)
if (setup.publishEvents) {
val meta = setup.activeActive.map(aa => new ReplicatedPublishedEventMetaData(aa.replicaId, state.version))
context.system.eventStream ! EventStream.Publish(
PublishedEventImpl(setup.replicaId, setup.persistenceId, p.sequenceNr, p.payload, p.timestamp))
PublishedEventImpl(setup.persistenceId, p.sequenceNr, p.payload, p.timestamp, meta))
}
// only once all things are applied we can revert back

View file

@ -0,0 +1,323 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.internal
import scala.annotation.tailrec
import scala.collection.immutable.TreeMap
import akka.annotation.InternalApi
/**
* INTERNAL API
*
* VersionVector module with helper classes and methods.
*/
@InternalApi
object VersionVector {
private val emptyVersions: TreeMap[String, Long] = TreeMap.empty
val empty: VersionVector = ManyVersionVector(emptyVersions)
def apply(): VersionVector = empty
def apply(versions: TreeMap[String, Long]): VersionVector =
if (versions.isEmpty) empty
else if (versions.size == 1) apply(versions.head._1, versions.head._2)
else ManyVersionVector(versions)
def apply(key: String, version: Long): VersionVector = OneVersionVector(key, version)
/** INTERNAL API */
@InternalApi private[akka] def apply(versions: List[(String, Long)]): VersionVector =
if (versions.isEmpty) empty
else if (versions.tail.isEmpty) apply(versions.head._1, versions.head._2)
else apply(emptyVersions ++ versions)
sealed trait Ordering
case object After extends Ordering
case object Before extends Ordering
case object Same extends Ordering
case object Concurrent extends Ordering
/**
* Marker to ensure that we do a full order comparison instead of bailing out early.
*/
private case object FullOrder extends Ordering
/** INTERNAL API */
@InternalApi private[akka] object Timestamp {
final val Zero = 0L
final val EndMarker = Long.MinValue
}
/**
* Marker to signal that we have reached the end of a version vector.
*/
private val cmpEndMarker = (null, Timestamp.EndMarker)
}
/**
* INTERNAL API
*
* Representation of a Vector-based clock (counting clock), inspired by Lamport logical clocks.
* {{{
* Reference:
* 1) Leslie Lamport (1978). "Time, clocks, and the ordering of events in a distributed system". Communications of the ACM 21 (7): 558-565.
* 2) Friedemann Mattern (1988). "Virtual Time and Global States of Distributed Systems". Workshop on Parallel and Distributed Algorithms: pp. 215-226
* }}}
*
* Based on `akka.cluster.ddata.VersionVector`.
*
* This class is immutable, i.e. "modifying" methods return a new instance.
*/
@SerialVersionUID(1L)
@InternalApi
sealed abstract class VersionVector extends Serializable {
type T = VersionVector
import VersionVector._
/**
* Increment the version for the key passed as argument. Returns a new VersionVector.
*/
def +(key: String): VersionVector = increment(key)
/**
* Increment the version for the key passed as argument. Returns a new VersionVector.
*/
def increment(key: String): VersionVector
def updated(key: String, version: Long): VersionVector
def isEmpty: Boolean
/**
* INTERNAL API
*/
@InternalApi private[akka] def size: Int
def versionAt(key: String): Long
/**
* INTERNAL API
*/
@InternalApi private[akka] def contains(key: String): Boolean
/**
* Returns true if <code>this</code> and <code>that</code> are concurrent else false.
*/
def <>(that: VersionVector): Boolean = compareOnlyTo(that, Concurrent) eq Concurrent
/**
* Returns true if <code>this</code> is before <code>that</code> else false.
*/
def <(that: VersionVector): Boolean = compareOnlyTo(that, Before) eq Before
/**
* Returns true if <code>this</code> is after <code>that</code> else false.
*/
def >(that: VersionVector): Boolean = compareOnlyTo(that, After) eq After
/**
* Returns true if this VersionVector has the same history as the 'that' VersionVector else false.
*/
def ==(that: VersionVector): Boolean = compareOnlyTo(that, Same) eq Same
/**
* Version vector comparison according to the semantics described by compareTo, with the ability to bail
* out early if the we can't reach the Ordering that we are looking for.
*
* The ordering always starts with Same and can then go to Same, Before or After
* If we're on After we can only go to After or Concurrent
* If we're on Before we can only go to Before or Concurrent
* If we go to Concurrent we exit the loop immediately
*
* If you send in the ordering FullOrder, you will get a full comparison.
*/
private final def compareOnlyTo(that: VersionVector, order: Ordering): Ordering = {
def nextOrElse[A](iter: Iterator[A], default: A): A = if (iter.hasNext) iter.next() else default
def compare(i1: Iterator[(String, Long)], i2: Iterator[(String, Long)], requestedOrder: Ordering): Ordering = {
@tailrec
def compareNext(nt1: (String, Long), nt2: (String, Long), currentOrder: Ordering): Ordering =
if ((requestedOrder ne FullOrder) && (currentOrder ne Same) && (currentOrder ne requestedOrder)) currentOrder
else if ((nt1 eq cmpEndMarker) && (nt2 eq cmpEndMarker)) currentOrder
// i1 is empty but i2 is not, so i1 can only be Before
else if (nt1 eq cmpEndMarker) {
if (currentOrder eq After) Concurrent else Before
}
// i2 is empty but i1 is not, so i1 can only be After
else if (nt2 eq cmpEndMarker) {
if (currentOrder eq Before) Concurrent else After
} else {
// compare the entries
val nc = nt1._1.compareTo(nt2._1)
if (nc == 0) {
// both entries exist compare the timestamps
// same timestamp so just continue with the next entry
if (nt1._2 == nt2._2) compareNext(nextOrElse(i1, cmpEndMarker), nextOrElse(i2, cmpEndMarker), currentOrder)
else if (nt1._2 < nt2._2) {
// t1 is less than t2, so i1 can only be Before
if (currentOrder eq After) Concurrent
else compareNext(nextOrElse(i1, cmpEndMarker), nextOrElse(i2, cmpEndMarker), Before)
} else {
// t2 is less than t1, so i1 can only be After
if (currentOrder eq Before) Concurrent
else compareNext(nextOrElse(i1, cmpEndMarker), nextOrElse(i2, cmpEndMarker), After)
}
} else if (nc < 0) {
// this entry only exists in i1 so i1 can only be After
if (currentOrder eq Before) Concurrent
else compareNext(nextOrElse(i1, cmpEndMarker), nt2, After)
} else {
// this entry only exists in i2 so i1 can only be Before
if (currentOrder eq After) Concurrent
else compareNext(nt1, nextOrElse(i2, cmpEndMarker), Before)
}
}
compareNext(nextOrElse(i1, cmpEndMarker), nextOrElse(i2, cmpEndMarker), Same)
}
if (this eq that) Same
else compare(this.versionsIterator, that.versionsIterator, if (order eq Concurrent) FullOrder else order)
}
/**
* INTERNAL API
*/
@InternalApi private[akka] def versionsIterator: Iterator[(String, Long)]
/**
* Compare two version vectors. The outcome will be one of the following:
* <p/>
* {{{
* 1. Version 1 is SAME (==) as Version 2 iff for all i c1(i) == c2(i)
* 2. Version 1 is BEFORE (<) Version 2 iff for all i c1(i) <= c2(i) and there exist a j such that c1(j) < c2(j)
* 3. Version 1 is AFTER (>) Version 2 iff for all i c1(i) >= c2(i) and there exist a j such that c1(j) > c2(j).
* 4. Version 1 is CONCURRENT (<>) to Version 2 otherwise.
* }}}
*/
def compareTo(that: VersionVector): Ordering = {
compareOnlyTo(that, FullOrder)
}
def merge(that: VersionVector): VersionVector
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class OneVersionVector private[akka] (key: String, version: Long)
extends VersionVector {
import VersionVector.Timestamp
override def isEmpty: Boolean = false
/** INTERNAL API */
@InternalApi private[akka] override def size: Int = 1
override def increment(k: String): VersionVector = {
val v = version + 1
if (k == key) copy(version = v)
else ManyVersionVector(TreeMap(key -> version, k -> v))
}
override def updated(k: String, v: Long): VersionVector = {
if (k == key) copy(version = v)
else ManyVersionVector(TreeMap(key -> version, k -> v))
}
override def versionAt(k: String): Long =
if (k == key) version
else Timestamp.Zero
/** INTERNAL API */
@InternalApi private[akka] override def contains(k: String): Boolean =
k == key
/** INTERNAL API */
@InternalApi private[akka] override def versionsIterator: Iterator[(String, Long)] =
Iterator.single((key, version))
override def merge(that: VersionVector): VersionVector = {
that match {
case OneVersionVector(n2, v2) =>
if (key == n2) if (version >= v2) this else OneVersionVector(n2, v2)
else ManyVersionVector(TreeMap(key -> version, n2 -> v2))
case ManyVersionVector(vs2) =>
val v2 = vs2.getOrElse(key, Timestamp.Zero)
val mergedVersions =
if (v2 >= version) vs2
else vs2.updated(key, version)
VersionVector(mergedVersions)
}
}
override def toString: String =
s"VersionVector($key -> $version)"
}
// TODO we could add more specialized/optimized implementations for 2 and 3 entries, because
// that will be the typical number of data centers
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class ManyVersionVector(versions: TreeMap[String, Long]) extends VersionVector {
import VersionVector.Timestamp
override def isEmpty: Boolean = versions.isEmpty
/** INTERNAL API */
@InternalApi private[akka] override def size: Int = versions.size
override def increment(key: String): VersionVector = {
val v = versionAt(key) + 1
VersionVector(versions.updated(key, v))
}
override def updated(key: String, v: Long): VersionVector =
VersionVector(versions.updated(key, v))
override def versionAt(key: String): Long = versions.get(key) match {
case Some(v) => v
case None => Timestamp.Zero
}
/** INTERNAL API */
@InternalApi private[akka] override def contains(key: String): Boolean =
versions.contains(key)
/** INTERNAL API */
@InternalApi private[akka] override def versionsIterator: Iterator[(String, Long)] =
versions.iterator
override def merge(that: VersionVector): VersionVector = {
if (that.isEmpty) this
else if (this.isEmpty) that
else
that match {
case ManyVersionVector(vs2) =>
var mergedVersions = vs2
for ((key, time) <- versions) {
val mergedVersionsCurrentTime = mergedVersions.getOrElse(key, Timestamp.Zero)
if (time > mergedVersionsCurrentTime)
mergedVersions = mergedVersions.updated(key, time)
}
VersionVector(mergedVersions)
case OneVersionVector(n2, v2) =>
val v1 = versions.getOrElse(n2, Timestamp.Zero)
val mergedVersions =
if (v1 >= v2) versions
else versions.updated(n2, v2)
VersionVector(mergedVersions)
}
}
override def toString: String =
versions.map { case (k, v) => k + " -> " + v }.mkString("VersionVector(", ", ", ")")
}

View file

@ -56,6 +56,7 @@ private[akka] class ActiveActiveContextImpl(val entityId: String, val replicaId:
extends ActiveActiveContext {
var _origin: String = null
var _recoveryRunning: Boolean = false
var _concurrent: Boolean = false
// FIXME check illegal access https://github.com/akka/akka/issues/29264
@ -69,7 +70,7 @@ private[akka] class ActiveActiveContextImpl(val entityId: String, val replicaId:
* Whether the happened concurrently with an event from another replica.
* Undefined result if called from any where other than an event handler.
*/
override def concurrent: Boolean = throw new UnsupportedOperationException("TODO")
override def concurrent: Boolean = _concurrent
override def persistenceId: PersistenceId = PersistenceId.replicatedUniqueId(entityId, replicaId)

View file

@ -33,8 +33,9 @@ object EventSourcedBehavior {
/**
* Must only be called on the same thread that will execute the user code
*/
def setContext(recoveryRunning: Boolean, originReplica: String): Unit = {
def setContext(recoveryRunning: Boolean, originReplica: String, concurrent: Boolean): Unit = {
aaContext._recoveryRunning = recoveryRunning
aaContext._concurrent = concurrent
aaContext._origin = originReplica
}