Speculative replication - step 1 (#29289)

* Step 1: general event-publishing-to-topic feature

* Step 2: an actor subscribes to the topic and forwards events to the sharded replicas

* Another half piece of the puzzle, receive the PublishedEvent in the ESB internals

* Stash published events while replaying rather than drop

* Publish on the event stream instead of a topic

* Active active actor receiving a published event

* Some smaller changes

 * Public API for published event
 * Better name for the sharding component

* Naive test for the Active Active Sharding Replication

* Java API for ActiveActiveShardingReplication

* Spelling

* Use ShardingEnvelope for publishing the event across sharding

* Fast forwarding filter stage

* Move test to testkit, enable the see-event-twice test (fails)

* Use persistence testkit journal

* Various smaller review feedback things

* Trying to figure out why duplicate event write test fails

* Missing unstash after processing published event

Co-authored-by: Christopher Batey <christopher.batey@gmail.com>
This commit is contained in:
Johan Andrén 2020-06-29 08:06:59 +02:00 committed by Christopher Batey
parent e98f1311f3
commit c44302bd1e
18 changed files with 709 additions and 29 deletions

View file

@ -0,0 +1,90 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed
import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.persistence.typed.PublishedEvent
import scala.collection.JavaConverters._
/**
* Used when sharding Active Active entities in multiple instances of sharding, for example one per DC in a Multi DC
* Akka Cluster.
*
* This actor should be started once on each node where Active Active entities will run (the same nodes that you start
* sharding on).
*
* Subscribes to locally written events through the event stream and sends the seen events to all the sharded replicas
* which can then fast forward their cross-replica event streams to improve latency while allowing less frequent poll
* for the cross replica queries. Note that since message delivery is at-most-once this can not be the only
* channel for replica events - the entities must still tail events from the journals of other replicas.
*
* The events are forwarded as [[akka.cluster.sharding.typed.ShardingEnvelope]] this will work out of the box both
* by default and with a custom extractor since the envelopes are handled internally.
*/
@ApiMayChange
object ActiveActiveShardingDirectReplication {
/**
* Not for user extension
*/
@DoNotInherit
sealed trait Command
/**
* INTERNAL API
*/
@InternalApi
private[akka] case class VerifyStarted(replyTo: ActorRef[Done]) extends Command
/**
* Java API:
* @param selfReplica The replica id of the replica that runs on this node
* @param replicaShardingProxies A replica id to sharding proxy mapping for each replica in the system
*/
def create[T](selfReplica: String, replicaShardingProxies: java.util.Map[String, ActorRef[T]]): Behavior[Command] =
apply(selfReplica, replicaShardingProxies.asScala.toMap)
/**
* Scala API:
* @param selfReplica The replica id of the replica that runs on this node
* @param replicaShardingProxies A replica id to sharding proxy mapping for each replica in the system
*/
def apply[T](selfReplica: String, replicaShardingProxies: Map[String, ActorRef[T]]): Behavior[Command] =
Behaviors
.setup[Any] { context =>
context.log.debug(
"Subscribing to event stream to forward events to [{}] sharded replicas",
replicaShardingProxies.size - 1)
context.system.eventStream ! EventStream.Subscribe[PublishedEvent](context.self)
Behaviors.receiveMessagePartial {
case event: PublishedEvent =>
context.log.trace(
"Forwarding event for persistence id [{}] sequence nr [{}] to replicas",
event.persistenceId,
event.sequenceNumber)
replicaShardingProxies.foreach {
case (replica, proxy) =>
val envelopedEvent = ShardingEnvelope(event.persistenceId.id, event)
if (replica != selfReplica)
proxy.asInstanceOf[ActorRef[ShardingEnvelope[PublishedEvent]]] ! envelopedEvent
}
Behaviors.same
case VerifyStarted(replyTo) =>
replyTo ! Done
Behaviors.same
}
}
.narrow[Command]
}

View file

@ -12,7 +12,7 @@ abstract class Murmur2NoEnvelopeMessageExtractor[M](val numberOfShards: Int) ext
}
/**
* The murmur2 message extractor uses the same algorithm as the default kafka partitoiner
* The murmur2 message extractor uses the same algorithm as the default kafka partitioner
* allowing kafka partitions to be mapped to shards.
* This can be used with the [[akka.cluster.sharding.external.ExternalShardAllocationStrategy]] to have messages
* processed locally.

View file

@ -0,0 +1,52 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed
import akka.Done
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.eventstream.EventStream
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.internal.PublishedEventImpl
import org.scalatest.wordspec.AnyWordSpecLike
class ActiveActiveShardingDirectReplicationSpec
extends ScalaTestWithActorTestKit
with AnyWordSpecLike
with LogCapturing {
"Active active sharding replication" must {
"replicate published events to all sharding proxies" in {
val replicaAProbe = createTestProbe[Any]()
val replicaBProbe = createTestProbe[Any]()
val replicaCProbe = createTestProbe[Any]()
val replicationActor = spawn(
ActiveActiveShardingDirectReplication(
"ReplicaA",
replicaShardingProxies =
Map("ReplicaA" -> replicaAProbe.ref, "ReplicaB" -> replicaBProbe.ref, "ReplicaC" -> replicaCProbe.ref)))
val upProbe = createTestProbe[Done]()
replicationActor ! ActiveActiveShardingDirectReplication.VerifyStarted(upProbe.ref)
upProbe.receiveMessage() // not bullet proof wrt to subscription being complete but good enough
val event = PublishedEventImpl(
Some("ReplicaA"),
PersistenceId.replicatedUniqueId("pid", "ReplicaA"),
1L,
"event",
System.currentTimeMillis())
system.eventStream ! EventStream.Publish(event)
replicaBProbe.expectMessageType[ShardingEnvelope[_]].message should equal(event)
replicaCProbe.expectMessageType[ShardingEnvelope[_]].message should equal(event)
replicaAProbe.expectNoMessage() // no publishing to the replica emitting it
}
}
}

View file

@ -5,7 +5,7 @@
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{ISO8601} [%-5level] [%logger] [%marker] [%X{akkaSource}] [%X{persistenceId}] - %msg %n</pattern>
<pattern>%date{ISO8601} [%-5level] [%logger] [%marker] [%X{akkaSource}] [%X{persistencePhase}] [%X{persistenceId}] - %msg %n</pattern>
</encoder>
</appender>
@ -25,7 +25,7 @@
<appender-ref ref="STDOUT"/>
</logger>
<root level="DEBUG">
<root level="TRACE">
<appender-ref ref="CapturingAppender"/>
</root>

View file

@ -0,0 +1,150 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed
import akka.Done
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.scaladsl.ActiveActiveEventSourcing
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import org.scalatest.wordspec.AnyWordSpecLike
object ActiveActiveEventPublishingSpec {
object MyActiveActive {
trait Command
case class Add(text: String, replyTo: ActorRef[Done]) extends Command
case class Get(replyTo: ActorRef[Set[String]]) extends Command
def apply(entityId: String, replicaId: String, allReplicas: Set[String]): Behavior[Command] =
Behaviors.setup { ctx =>
ActiveActiveEventSourcing(entityId, replicaId, allReplicas, PersistenceTestKitReadJournal.Identifier)(
aactx =>
EventSourcedBehavior[Command, String, Set[String]](
aactx.persistenceId,
Set.empty,
(state, command) =>
command match {
case Add(string, replyTo) =>
ctx.log.debug("Persisting [{}]", string)
Effect.persist(string).thenRun { _ =>
ctx.log.debug("Ack:ing [{}]", string)
replyTo ! Done
}
case Get(replyTo) =>
replyTo ! state
Effect.none
},
(state, string) => state + string))
}
}
}
class ActiveActiveEventPublishingSpec
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config)
with AnyWordSpecLike
with LogCapturing {
import ActiveActiveEventPublishingSpec._
"An active active actor" must {
"move forward when a published event from a replica is received" in {
val actor = spawn(MyActiveActive("myId1", "DC-A", Set("DC-A", "DC-B")))
val probe = createTestProbe[Any]()
actor ! MyActiveActive.Add("one", probe.ref)
probe.expectMessage(Done)
// simulate a published event from another replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
Some("DC-B"),
PersistenceId.replicatedUniqueId("myId1", "DC-B"),
1L,
"two",
System.currentTimeMillis())
actor ! MyActiveActive.Add("three", probe.ref)
probe.expectMessage(Done)
actor ! MyActiveActive.Get(probe.ref)
probe.expectMessage(Set("one", "two", "three"))
}
"ignore a published event from a replica is received but the sequence number is unexpected" in {
val actor = spawn(MyActiveActive("myId2", "DC-A", Set("DC-A", "DC-B")))
val probe = createTestProbe[Any]()
actor ! MyActiveActive.Add("one", probe.ref)
probe.expectMessage(Done)
// simulate a published event from another replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
Some("DC-B"),
PersistenceId.replicatedUniqueId("myId2", "DC-B"),
2L, // missing 1L
"two",
System.currentTimeMillis())
actor ! MyActiveActive.Add("three", probe.ref)
probe.expectMessage(Done)
actor ! MyActiveActive.Get(probe.ref)
probe.expectMessage(Set("one", "three"))
}
"ignore a published event from an unknown replica" in {
val actor = spawn(MyActiveActive("myId3", "DC-A", Set("DC-A", "DC-B")))
val probe = createTestProbe[Any]()
actor ! MyActiveActive.Add("one", probe.ref)
probe.expectMessage(Done)
// simulate a published event from another replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
Some("DC-C"),
PersistenceId.replicatedUniqueId("myId3", "DC-C"),
1L,
"two",
System.currentTimeMillis())
actor ! MyActiveActive.Add("three", probe.ref)
probe.expectMessage(Done)
actor ! MyActiveActive.Get(probe.ref)
probe.expectMessage(Set("one", "three"))
}
"ignore an already seen event from a replica" in {
val actor = spawn(MyActiveActive("myId4", "DC-A", Set("DC-A", "DC-B")))
val probe = createTestProbe[Any]()
actor ! MyActiveActive.Add("one", probe.ref)
probe.expectMessage(Done)
// simulate a published event from another replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
Some("DC-B"),
PersistenceId.replicatedUniqueId("myId4", "DC-B"),
1L,
"two",
System.currentTimeMillis())
// simulate another published event from that replica
actor.asInstanceOf[ActorRef[Any]] ! internal.PublishedEventImpl(
Some("DC-B"),
PersistenceId.replicatedUniqueId("myId4", "DC-B"),
1L,
"two-again", // ofc this would be the same in the real world, different just so we can detect
System.currentTimeMillis())
actor ! MyActiveActive.Add("three", probe.ref)
probe.expectMessage(Done)
actor ! MyActiveActive.Get(probe.ref)
probe.expectMessage(Set("one", "two", "three"))
}
}
}

View file

@ -0,0 +1,84 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed
import akka.Done
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.eventstream.EventStream
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.serialization.jackson.CborSerializable
import org.scalatest.wordspec.AnyWordSpecLike
object EventPublishingSpec {
object WowSuchEventSourcingBehavior {
sealed trait Command
case class StoreThis(data: String, tagIt: Boolean, replyTo: ActorRef[Done]) extends Command
final case class Event(data: String, tagIt: Boolean) extends CborSerializable
def apply(id: PersistenceId): Behavior[Command] =
EventSourcedBehavior[Command, Event, Set[Event]](
id,
Set.empty,
(_, command) =>
command match {
case StoreThis(data, tagIt, replyTo) =>
Effect.persist(Event(data, tagIt)).thenRun(_ => replyTo ! Done)
},
(state, event) => state + event)
.withTagger(evt => if (evt.tagIt) Set("tag") else Set.empty)
.withEventPublishing()
}
}
class EventPublishingSpec
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config)
with AnyWordSpecLike
with LogCapturing {
import EventPublishingSpec._
"EventPublishing support" must {
"publish events after written for any actor" in {
val topicProbe = createTestProbe[Any]()
system.eventStream ! EventStream.Subscribe[PublishedEvent](topicProbe.ref.narrow)
// We don't verify subscription completed (no ack available), but expect the next steps to take enough time
// for subscription to complete
val myId = PersistenceId.ofUniqueId("myId")
val wowSuchActor = spawn(WowSuchEventSourcingBehavior(myId))
val persistProbe = createTestProbe[Any]()
wowSuchActor ! WowSuchEventSourcingBehavior.StoreThis("great stuff", tagIt = false, replyTo = persistProbe.ref)
persistProbe.expectMessage(Done)
val published1 = topicProbe.expectMessageType[PublishedEvent]
published1.persistenceId should ===(myId)
published1.event should ===(WowSuchEventSourcingBehavior.Event("great stuff", false))
published1.sequenceNumber should ===(1L)
published1.tags should ===(Set.empty)
val anotherId = PersistenceId.ofUniqueId("anotherId")
val anotherActor = spawn(WowSuchEventSourcingBehavior(anotherId))
anotherActor ! WowSuchEventSourcingBehavior.StoreThis("another event", tagIt = true, replyTo = persistProbe.ref)
persistProbe.expectMessage(Done)
val published2 = topicProbe.expectMessageType[PublishedEvent]
published2.persistenceId should ===(anotherId)
published2.event should ===(WowSuchEventSourcingBehavior.Event("another event", true))
published2.sequenceNumber should ===(1L)
published2.tags should ===(Set("tag"))
}
}
}

View file

@ -0,0 +1,31 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed
import java.util.Optional
import akka.annotation.DoNotInherit
/**
* When using event publishing the events published to the system event stream will be in this form.
*
* Not for user extension
*/
@DoNotInherit
trait PublishedEvent {
/** Scala API: When emitted from an Active Active actor this will contain the replica id */
def replicaId: Option[String]
/** Java API: When emitted from an Active Active actor this will contain the replica id */
def getReplicaId: Optional[String]
def persistenceId: PersistenceId
def sequenceNumber: Long
/** User event */
def event: Any
def timestamp: Long
def tags: Set[String]
}

View file

@ -7,7 +7,7 @@ package akka.persistence.typed.internal
import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal
import org.slf4j.{ Logger, MDC }
import akka.actor.{ ActorRef, Cancellable }
import akka.actor.{ Cancellable, ActorRef => ClassicActorRef }
import akka.actor.typed.Signal
import akka.actor.typed.scaladsl.ActorContext
import akka.annotation.InternalApi
@ -48,7 +48,8 @@ private[akka] final class BehaviorSetup[C, E, S](
var holdingRecoveryPermit: Boolean,
val settings: EventSourcedSettings,
val stashState: StashState,
val activeActive: Option[ActiveActive]) {
val activeActive: Option[ActiveActive],
val publishEvents: Boolean) {
import BehaviorSetup._
import InternalProtocol.RecoveryTickEvent
@ -57,10 +58,12 @@ private[akka] final class BehaviorSetup[C, E, S](
val persistence: Persistence = Persistence(context.system.toClassic)
val journal: ActorRef = persistence.journalFor(settings.journalPluginId)
val snapshotStore: ActorRef = persistence.snapshotStoreFor(settings.snapshotPluginId)
val journal: ClassicActorRef = persistence.journalFor(settings.journalPluginId)
val snapshotStore: ClassicActorRef = persistence.snapshotStoreFor(settings.snapshotPluginId)
def selfClassic: ActorRef = context.self.toClassic
val replicaId: Option[String] = activeActive.map(_.replicaId)
def selfClassic: ClassicActorRef = context.self.toClassic
private var mdcPhase = PersistenceMdc.Initializing
def log: Logger = {

View file

@ -4,6 +4,7 @@
package akka.persistence.typed.internal
import java.util.Optional
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
@ -24,6 +25,7 @@ import akka.persistence.JournalProtocol
import akka.persistence.Recovery
import akka.persistence.RecoveryPermitter
import akka.persistence.SnapshotProtocol
import akka.persistence.journal.Tagged
import akka.persistence.typed.DeleteEventsCompleted
import akka.persistence.typed.DeleteEventsFailed
import akka.persistence.typed.DeleteSnapshotsCompleted
@ -32,6 +34,7 @@ import akka.persistence.typed.DeletionTarget
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.NoOpEventAdapter
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.PublishedEvent
import akka.persistence.typed.SnapshotAdapter
import akka.persistence.typed.SnapshotCompleted
import akka.persistence.typed.SnapshotFailed
@ -89,7 +92,8 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
retention: RetentionCriteria = RetentionCriteria.disabled,
supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop,
override val signalHandler: PartialFunction[(State, Signal), Unit] = PartialFunction.empty,
activeActive: Option[ActiveActive] = None)
activeActive: Option[ActiveActive] = None,
publishEvents: Boolean = false)
extends EventSourcedBehavior[Command, Event, State] {
import EventSourcedBehaviorImpl.WriterIdentity
@ -153,7 +157,8 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
holdingRecoveryPermit = false,
settings = settings,
stashState = stashState,
activeActive = activeActive)
activeActive = activeActive,
publishEvents = publishEvents)
// needs to accept Any since we also can get messages from the journal
// not part of the user facing Command protocol
@ -241,6 +246,10 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
copy(recovery = recovery.toClassic)
}
override def withEventPublishing(): EventSourcedBehavior[Command, Event, State] = {
copy(publishEvents = true)
}
override private[akka] def withActiveActive(
context: ActiveActiveContextImpl,
id: String,
@ -261,6 +270,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
final case class ReplicatedEventEnvelope[E](event: ReplicatedEvent[E], ack: ActorRef[ReplicatedEventAck.type])
extends InternalProtocol
}
// FIXME serializer
@ -270,3 +280,31 @@ private[akka] final case class ReplicatedEventMetaData(originReplica: String, or
private[akka] final case class ReplicatedEvent[E](event: E, originReplica: String, originSequenceNr: Long)
@InternalApi
private[akka] case object ReplicatedEventAck
/**
* INTERNAL API
*/
@InternalApi
private[akka] final case class PublishedEventImpl(
replicaId: Option[String],
persistenceId: PersistenceId,
sequenceNumber: Long,
payload: Any,
timestamp: Long)
extends PublishedEvent
with InternalProtocol {
import scala.compat.java8.OptionConverters._
override def getReplicaId: Optional[String] = replicaId.asJava
def tags: Set[String] = payload match {
case t: Tagged => t.tags
case _ => Set.empty
}
def event: Any = payload match {
case Tagged(event, _) => event
case _ => payload
}
}

View file

@ -0,0 +1,72 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.internal
import akka.annotation.InternalApi
import akka.persistence.query.EventEnvelope
import akka.stream.Attributes
import akka.stream.FlowShape
import akka.stream.Inlet
import akka.stream.Outlet
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.GraphStageWithMaterializedValue
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
/**
* INTERNAL API
*/
@InternalApi
private[akka] trait ReplicationStreamControl {
def fastForward(sequenceNumber: Long): Unit
}
/**
* INTERNAL API
*/
@InternalApi
private[akka] class FastForwardingFilter
extends GraphStageWithMaterializedValue[FlowShape[EventEnvelope, EventEnvelope], ReplicationStreamControl] {
val in = Inlet[EventEnvelope]("FastForwardingFilter.in")
val out = Outlet[EventEnvelope]("FastForwardingFilter.out")
override val shape = FlowShape[EventEnvelope, EventEnvelope](in, out)
override def createLogicAndMaterializedValue(
inheritedAttributes: Attributes): (GraphStageLogic, ReplicationStreamControl) = {
var replicationStreamControl: ReplicationStreamControl = null
val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
// -1 means not currently fast forwarding
@volatile private var fastForwardTo = -1L
override def onPush(): Unit = {
val eventEnvelope = grab(in)
if (fastForwardTo == -1L)
push(out, eventEnvelope)
else {
if (eventEnvelope.sequenceNr <= fastForwardTo) pull(in)
else {
fastForwardTo = -1L
push(out, eventEnvelope)
}
}
}
override def onPull(): Unit = pull(in)
replicationStreamControl = new ReplicationStreamControl {
override def fastForward(sequenceNumber: Long): Unit = {
require(sequenceNumber > 0) // only the stage may complete a fast forward
fastForwardTo = sequenceNumber
}
}
setHandlers(in, out, this)
}
(logic, replicationStreamControl)
}
}

View file

@ -93,6 +93,7 @@ private[akka] final class ReplayingEvents[C, E, S](
case SnapshotterResponse(r) => onSnapshotterResponse(r)
case RecoveryTickEvent(snap) => onRecoveryTick(snap)
case evt: ReplicatedEventEnvelope[E] => onInternalCommand(evt)
case pe: PublishedEventImpl => onInternalCommand(pe)
case cmd: IncomingCommand[C] => onInternalCommand(cmd)
case get: GetState[S @unchecked] => stashInternal(get)
case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit
@ -259,7 +260,12 @@ private[akka] final class ReplayingEvents[C, E, S](
val running =
Running[C, E, S](
setup,
Running.RunningState[S](state.seqNr, state.state, state.receivedPoisonPill, seenPerReplica))
Running.RunningState[S](
seqNr = state.seqNr,
state = state.state,
receivedPoisonPill = state.receivedPoisonPill,
seenPerReplica = seenPerReplica,
replicationControl = Map.empty))
tryUnstashOne(running)
}

View file

@ -61,6 +61,7 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
case JournalResponse(r) => onJournalResponse(r)
case RecoveryTickEvent(snapshot) => onRecoveryTick(snapshot)
case evt: ReplicatedEventEnvelope[E] => onReplicatedEvent(evt)
case pe: PublishedEventImpl => onPublishedEvent(pe)
case cmd: IncomingCommand[C] =>
if (receivedPoisonPill) {
if (setup.settings.logOnStashing)
@ -127,6 +128,10 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
stashInternal(evt)
}
def onPublishedEvent(event: PublishedEventImpl): Behavior[InternalProtocol] = {
stashInternal(event)
}
def onJournalResponse(response: JournalProtocol.Response): Behavior[InternalProtocol] = {
setup.log.debug(
"Unexpected response from journal: [{}], may be due to an actor restart, ignoring...",

View file

@ -4,13 +4,17 @@
package akka.persistence.typed.internal
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.immutable
import akka.actor.UnhandledMessage
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Signal }
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.{ Behavior, Signal }
import akka.actor.typed.internal.PoisonPill
import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors, LoggerOps }
import akka.annotation.{ InternalApi, InternalStableApi }
import akka.event.Logging
import akka.persistence.DeleteMessagesFailure
import akka.persistence.DeleteMessagesSuccess
import akka.persistence.DeleteSnapshotFailure
@ -44,10 +48,14 @@ import akka.persistence.typed.internal.InternalProtocol.ReplicatedEventEnvelope
import akka.persistence.typed.internal.Running.WithSeqNrAccessible
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior.ActiveActive
import akka.stream.scaladsl.Keep
import akka.stream.{ SharedKillSwitch, SystemMaterializer }
import akka.stream.scaladsl.{ RestartSource, Sink }
import akka.stream.typed.scaladsl.ActorFlow
import akka.util.{ unused, OptionVal, Timeout }
import akka.util.Helpers
import akka.util.OptionVal
import akka.util.unused
import akka.util.Timeout
/**
* INTERNAL API
@ -79,6 +87,7 @@ private[akka] object Running {
state: State,
receivedPoisonPill: Boolean,
seenPerReplica: Map[String, Long],
replicationControl: Map[String, ReplicationStreamControl],
replicationKillSwitch: Option[SharedKillSwitch] = None) {
def nextSequenceNr(): RunningState[State] =
@ -95,31 +104,38 @@ private[akka] object Running {
def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: RunningState[S]): Behavior[InternalProtocol] = {
val running = new Running(setup.setMdcPhase(PersistenceMdc.RunningCmds))
setup.activeActive.foreach(aa => startReplicationStream(setup.context.system, setup.context.self, state, aa))
new running.HandlingCommands(state)
val initialState = setup.activeActive match {
case Some(aa) => startReplicationStream(setup, state, aa)
case None => state
}
new running.HandlingCommands(initialState)
}
def startReplicationStream[E, S](
system: ActorSystem[_],
ref: ActorRef[InternalProtocol],
def startReplicationStream[C, E, S](
setup: BehaviorSetup[C, E, S],
state: RunningState[S],
aa: ActiveActive): Unit = {
aa: ActiveActive): RunningState[S] = {
import scala.concurrent.duration._
val system = setup.context.system
val ref = setup.context.self
val query = PersistenceQuery(system)
aa.allReplicas.foreach { replica =>
if (replica != aa.replicaId) {
val seqNr = state.seenPerReplica(replica)
val pid = PersistenceId.replicatedUniqueId(aa.aaContext.entityId, replica)
aa.allReplicas.foldLeft(state) { (state, replicaId) =>
if (replicaId != aa.replicaId) {
val seqNr = state.seenPerReplica(replicaId)
val pid = PersistenceId.replicatedUniqueId(aa.aaContext.entityId, replicaId)
// FIXME support different configuration per replica https://github.com/akka/akka/issues/29257
val replication = query.readJournalFor[EventsByPersistenceIdQuery](aa.queryPluginId)
implicit val timeout = Timeout(30.seconds)
// FIXME config
val controlRef = new AtomicReference[ReplicationStreamControl]()
val source = RestartSource.withBackoff(2.seconds, 10.seconds, randomFactor = 0.2) { () =>
replication
.eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue)
.viaMat(new FastForwardingFilter)(Keep.right)
.mapMaterializedValue(streamControl => controlRef.set(streamControl))
.via(ActorFlow.ask[EventEnvelope, ReplicatedEventEnvelope[E], ReplicatedEventAck.type](ref) {
(eventEnvelope, replyTo) =>
// Need to handle this not being available migration from non-active-active is supported
@ -131,6 +147,30 @@ private[akka] object Running {
}
source.runWith(Sink.ignore)(SystemMaterializer(system).materializer)
// FIXME support from journal to fast forward https://github.com/akka/akka/issues/29311
state.copy(
replicationControl =
state.replicationControl.updated(replicaId, new ReplicationStreamControl {
override def fastForward(sequenceNumber: Long): Unit = {
// (logging is safe here since invoked on message receive
OptionVal(controlRef.get) match {
case OptionVal.Some(control) =>
if (setup.log.isDebugEnabled)
setup.log.debug("Fast forward replica [{}] to [{}]", replicaId, sequenceNumber)
control.fastForward(sequenceNumber)
case OptionVal.None =>
// stream not started yet, ok, fast forward is an optimization
if (setup.log.isDebugEnabled)
setup.log.debug(
"Ignoring fast forward replica [{}] to [{}], stream not started yet",
replicaId,
sequenceNumber)
}
}
}))
} else {
state
}
}
}
@ -163,6 +203,7 @@ private[akka] object Running {
def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match {
case IncomingCommand(c: C @unchecked) => onCommand(state, c)
case re: ReplicatedEventEnvelope[E @unchecked] => onReplicatedEvent(state, re, setup.activeActive.get)
case pe: PublishedEventImpl => onPublishedEvent(state, pe)
case JournalResponse(r) => onDeleteEventsJournalResponse(r, state.state)
case SnapshotterResponse(r) => onDeleteSnapshotResponse(r, state.state)
case get: GetState[S @unchecked] => onGetState(get)
@ -205,6 +246,91 @@ private[akka] object Running {
}
}
def onPublishedEvent(state: Running.RunningState[S], event: PublishedEventImpl): Behavior[InternalProtocol] = {
val newBehavior: Behavior[InternalProtocol] = setup.activeActive match {
case None =>
setup.log
.warn("Received published event for [{}] but not an active active actor, dropping", event.persistenceId)
this
case Some(activeActive) =>
event.replicaId match {
case None =>
setup.log.warn("Received published event for [{}] but with no replica id, dropping")
this
case Some(replicaId) =>
onPublishedEvent(state, activeActive, replicaId, event)
}
}
tryUnstashOne(newBehavior)
}
private def onPublishedEvent(
state: Running.RunningState[S],
activeActive: ActiveActive,
originReplicaId: String,
event: PublishedEventImpl): Behavior[InternalProtocol] = {
val log = setup.log
val separatorIndex = event.persistenceId.id.indexOf(PersistenceId.DefaultSeparator)
val idPrefix = event.persistenceId.id.substring(0, separatorIndex)
if (!setup.persistenceId.id.startsWith(idPrefix)) {
log.warn("Ignoring published replicated event for the wrong actor [{}]", event.persistenceId)
this
} else if (originReplicaId == activeActive.replicaId) {
if (log.isDebugEnabled)
log.debug(
"Ignoring published replicated event with seqNr [{}] from our own replica id [{}]",
event.sequenceNumber,
originReplicaId)
this
} else if (!activeActive.allReplicas.contains(originReplicaId)) {
log.warnN(
"Received published replicated event from replica [{}], which is unknown. Active active must be set up with a list of all replicas (known are [{}]).",
originReplicaId,
activeActive.allReplicas.mkString(", "))
this
} else {
val expectedSequenceNumber = state.seenPerReplica(originReplicaId) + 1
if (expectedSequenceNumber > event.sequenceNumber) {
// already seen
if (log.isDebugEnabled)
log.debugN(
"Ignoring published replicated event with seqNr [{}] from replica [{}] because it was already seen ([{}])",
event.sequenceNumber,
originReplicaId,
expectedSequenceNumber)
this
} else if (expectedSequenceNumber != event.sequenceNumber) {
// gap in sequence numbers (message lost or query and direct replication out of sync, should heal up by itself
// once the query catches up)
if (log.isDebugEnabled) {
log.debugN(
"Ignoring published replicated event with replication seqNr [{}] from replica [{}] " +
"because expected replication seqNr was [{}] ",
event.sequenceNumber,
event.replicaId,
expectedSequenceNumber)
}
this
} else {
if (log.isTraceEnabled)
log.traceN(
"Received published replicated event [{}] with timestamp [{}] from replica [{}] seqNr [{}]",
Logging.simpleName(event.event.getClass),
Helpers.timestamp(event.timestamp),
originReplicaId,
event.sequenceNumber)
// fast forward stream for source replica
state.replicationControl.get(originReplicaId).foreach(_.fastForward(event.sequenceNumber))
handleExternalReplicatedEventPersist(
ReplicatedEvent(event.event.asInstanceOf[E], originReplicaId, event.sequenceNumber))
}
}
}
// Used by EventSourcedBehaviorTestKit to retrieve the state.
def onGetState(get: GetState[S]): Behavior[InternalProtocol] = {
get.replyTo ! state.state
@ -368,6 +494,7 @@ private[akka] object Running {
case JournalResponse(r) => onJournalResponse(r)
case in: IncomingCommand[C @unchecked] => onCommand(in)
case re: ReplicatedEventEnvelope[E @unchecked] => onReplicatedEvent(re)
case pe: PublishedEventImpl => onPublishedEvent(pe)
case get: GetState[S @unchecked] => stashInternal(get)
case SnapshotterResponse(r) => onDeleteSnapshotResponse(r, visibleState.state)
case RecoveryTickEvent(_) => Behaviors.unhandled
@ -393,6 +520,14 @@ private[akka] object Running {
}
}
def onPublishedEvent(event: PublishedEventImpl): Behavior[InternalProtocol] = {
if (state.receivedPoisonPill) {
Behaviors.unhandled
} else {
stashInternal(event)
}
}
final def onJournalResponse(response: Response): Behavior[InternalProtocol] = {
if (setup.log.isDebugEnabled) {
setup.log.debug2(
@ -407,6 +542,11 @@ private[akka] object Running {
onWriteSuccess(setup.context, p)
if (setup.publishEvents) {
context.system.eventStream ! EventStream.Publish(
PublishedEventImpl(setup.replicaId, setup.persistenceId, p.sequenceNr, p.payload, p.timestamp))
}
// only once all things are applied we can revert back
if (eventCounter < numberOfEvents) {
onWriteDone(setup.context, p)

View file

@ -62,7 +62,7 @@ private[akka] trait StashManagement[C, E, S] {
}
/**
* `tryUnstashOne` is called at the end of processing each command or when persist is completed
* `tryUnstashOne` is called at the end of processing each command, published event, or when persist is completed
*/
protected def tryUnstashOne(behavior: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
val buffer =

View file

@ -12,6 +12,7 @@ import akka.actor.typed.internal.BehaviorImpl.DeferredBehavior
import akka.actor.typed.internal.InterceptorImpl
import akka.actor.typed.internal.LoggerClass
import akka.actor.typed.scaladsl.ActorContext
import akka.annotation.ApiMayChange
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.PersistenceId
@ -21,8 +22,9 @@ import akka.persistence.typed.internal._
object EventSourcedBehavior {
// FIXME move to internal
@InternalApi
private[akka] case class ActiveActive(
private[akka] final case class ActiveActive(
replicaId: String,
allReplicas: Set[String],
aaContext: ActiveActiveContextImpl,
@ -237,4 +239,10 @@ object EventSourcedBehavior {
* By default, snapshots and events are recovered.
*/
def withRecovery(recovery: Recovery): EventSourcedBehavior[Command, Event, State]
/**
* Publish events to the system event stream as [[akka.persistence.typed.PublishedEvent]] after they have been persisted
*/
@ApiMayChange
def withEventPublishing(): EventSourcedBehavior[Command, Event, State]
}

View file

@ -5,7 +5,7 @@
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{ISO8601} [%-5level] [%logger] [%marker] [%X{akkaSource}] [%X{persistenceId}] - %msg %n</pattern>
<pattern>%date{ISO8601} [%-5level] [%logger] [%marker] [%X{akkaSource}] [%X{persistencePhase}] [%X{persistenceId}] - %msg %n</pattern>
</encoder>
</appender>

View file

@ -67,7 +67,8 @@ class EventSourcedBehaviorWatchSpec
holdingRecoveryPermit = false,
settings = settings,
stashState = new StashState(context.asInstanceOf[ActorContext[InternalProtocol]], settings),
None)
activeActive = None,
publishEvents = false)
"A typed persistent parent actor watching a child" must {

View file

@ -499,7 +499,7 @@ lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed")
clusterSharding % "compile->compile;compile->CompileJdk9;multi-jvm->multi-jvm",
actorTestkitTyped % "test->test",
actorTypedTests % "test->test",
persistenceTyped % "test->test",
persistenceTyped % "optional->compile;test->test",
persistenceTestkit % "test->test",
remote % "compile->CompileJdk9;test->test",
remoteTests % "test->test",