diff --git a/akka-persistence-typed/src/main/resources/reference.conf b/akka-persistence-typed/src/main/resources/reference.conf index 7361c8113f..99c897d34e 100644 --- a/akka-persistence-typed/src/main/resources/reference.conf +++ b/akka-persistence-typed/src/main/resources/reference.conf @@ -21,7 +21,6 @@ akka.persistence.typed { # enables automatic DEBUG level logging of messages stashed automatically by an EventSourcedBehavior, # this may happen while it receives commands while it is recovering events or while it is persisting events log-stashing = off - } akka.reliable-delivery { diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/PersistenceId.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/PersistenceId.scala index 28b9321339..7614edec35 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/PersistenceId.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/PersistenceId.scala @@ -3,6 +3,7 @@ */ package akka.persistence.typed +import akka.annotation.InternalApi object PersistenceId { @@ -125,6 +126,21 @@ object PersistenceId { def ofUniqueId(id: String): PersistenceId = new PersistenceId(id) + /** + * Constructs a persistence id from a unique entity id that includes the replica id. + */ + @InternalApi + private[akka] def replicatedUniqueId(entityId: String, replicaId: String): PersistenceId = { + if (entityId.contains(DefaultSeparator)) + throw new IllegalArgumentException( + s"entityId [$entityId] contains [$DefaultSeparator] which is a reserved character") + + if (replicaId.contains(DefaultSeparator)) + throw new IllegalArgumentException( + s"replicaId [$replicaId] contains [$DefaultSeparator] which is a reserved character") + + new PersistenceId(entityId + DefaultSeparator + replicaId) + } } /** diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala index 1ff5521cfd..6f87ad5134 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala @@ -6,14 +6,13 @@ package akka.persistence.typed.internal import scala.concurrent.ExecutionContext import scala.util.control.NonFatal - import org.slf4j.{ Logger, MDC } - import akka.actor.{ ActorRef, Cancellable } import akka.actor.typed.Signal import akka.actor.typed.scaladsl.ActorContext import akka.annotation.InternalApi import akka.persistence._ +import akka.persistence.typed.scaladsl.EventSourcedBehavior.ActiveActive import akka.persistence.typed.{ EventAdapter, PersistenceId, SnapshotAdapter } import akka.persistence.typed.scaladsl.{ EventSourcedBehavior, RetentionCriteria } import akka.util.OptionVal @@ -48,7 +47,8 @@ private[akka] final class BehaviorSetup[C, E, S]( val retention: RetentionCriteria, var holdingRecoveryPermit: Boolean, val settings: EventSourcedSettings, - val stashState: StashState) { + val stashState: StashState, + val activeActive: Option[ActiveActive]) { import BehaviorSetup._ import InternalProtocol.RecoveryTickEvent diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index e989af5bca..bc32587879 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -36,6 +36,7 @@ import akka.persistence.typed.SnapshotAdapter import akka.persistence.typed.SnapshotCompleted import akka.persistence.typed.SnapshotFailed import akka.persistence.typed.SnapshotSelectionCriteria +import akka.persistence.typed.scaladsl.EventSourcedBehavior.ActiveActive import akka.persistence.typed.scaladsl._ import akka.persistence.typed.scaladsl.{ Recovery => TypedRecovery } import akka.persistence.typed.scaladsl.RetentionCriteria @@ -87,7 +88,8 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( recovery: Recovery = Recovery(), retention: RetentionCriteria = RetentionCriteria.disabled, supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop, - override val signalHandler: PartialFunction[(State, Signal), Unit] = PartialFunction.empty) + override val signalHandler: PartialFunction[(State, Signal), Unit] = PartialFunction.empty, + activeActive: Option[ActiveActive] = None) extends EventSourcedBehavior[Command, Event, State] { import EventSourcedBehaviorImpl.WriterIdentity @@ -150,7 +152,8 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( retention, holdingRecoveryPermit = false, settings = settings, - stashState = stashState) + stashState = stashState, + activeActive = activeActive) // needs to accept Any since we also can get messages from the journal // not part of the user facing Command protocol @@ -237,6 +240,14 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( override def withRecovery(recovery: TypedRecovery): EventSourcedBehavior[Command, Event, State] = { copy(recovery = recovery.toClassic) } + + override private[akka] def withActiveActive( + context: ActiveActiveContext, + id: String, + allIds: Set[String], + queryPluginId: String): EventSourcedBehavior[Command, Event, State] = { + copy(activeActive = Some(ActiveActive(id, allIds, context, queryPluginId))) + } } /** Protocol used internally by the eventsourced behaviors. */ @@ -247,4 +258,10 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( final case class SnapshotterResponse(msg: akka.persistence.SnapshotProtocol.Response) extends InternalProtocol final case class RecoveryTickEvent(snapshot: Boolean) extends InternalProtocol final case class IncomingCommand[C](c: C) extends InternalProtocol + + final case class ReplicatedEventEnvelope[E](event: ReplicatedEvent[E], ack: ActorRef[ReplicatedEventAck.type]) + extends InternalProtocol } + +final case class ReplicatedEvent[E](event: E, originReplica: String, originSequenceNr: Long) +case object ReplicatedEventAck diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala index 4b8298f366..8345a26d30 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala @@ -27,32 +27,34 @@ private[akka] trait JournalInteractions[C, E, S] { def setup: BehaviorSetup[C, E, S] - type EventOrTagged = Any // `Any` since can be `E` or `Tagged` + type EventOrTaggedOrReplicated = Any // `Any` since can be `E` or `Tagged` or a `ReplicatedEvent` protected def internalPersist( ctx: ActorContext[_], cmd: Any, state: Running.RunningState[S], - event: EventOrTagged, + event: EventOrTaggedOrReplicated, eventAdapterManifest: String): Running.RunningState[S] = { - val newState = state.nextSequenceNr() + val newRunningState = state.nextSequenceNr() val repr = PersistentRepr( event, persistenceId = setup.persistenceId.id, - sequenceNr = newState.seqNr, + sequenceNr = newRunningState.seqNr, manifest = eventAdapterManifest, writerUuid = setup.writerIdentity.writerUuid, sender = ActorRef.noSender) + // FIXME check cinnamon is okay with this being null + // https://github.com/akka/akka/issues/29262 onWriteInitiated(ctx, cmd, repr) val write = AtomicWrite(repr) :: Nil setup.journal .tell(JournalProtocol.WriteMessages(write, setup.selfClassic, setup.writerIdentity.instanceId), setup.selfClassic) - newState + newRunningState } @InternalStableApi @@ -65,7 +67,7 @@ private[akka] trait JournalInteractions[C, E, S] { ctx: ActorContext[_], cmd: Any, state: Running.RunningState[S], - events: immutable.Seq[(EventOrTagged, String)]): Running.RunningState[S] = { + events: immutable.Seq[(EventOrTaggedOrReplicated, String)]): Running.RunningState[S] = { if (events.nonEmpty) { var newState = state diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index c61da7ce27..0b2735723d 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -87,13 +87,16 @@ private[akka] final class ReplayingEvents[C, E, S]( () override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = { + // FIXME deal with a replicated event and ack + // https://github.com/akka/akka/issues/29256 msg match { - case JournalResponse(r) => onJournalResponse(r) - case SnapshotterResponse(r) => onSnapshotterResponse(r) - case RecoveryTickEvent(snap) => onRecoveryTick(snap) - case cmd: IncomingCommand[C] => onCommand(cmd) - case get: GetState[S @unchecked] => stashInternal(get) - case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit + case JournalResponse(r) => onJournalResponse(r) + case SnapshotterResponse(r) => onSnapshotterResponse(r) + case RecoveryTickEvent(snap) => onRecoveryTick(snap) + case evt: ReplicatedEventEnvelope[E] => onInternalCommand(evt) + case cmd: IncomingCommand[C] => onInternalCommand(cmd) + case get: GetState[S @unchecked] => stashInternal(get) + case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit } } @@ -154,7 +157,7 @@ private[akka] final class ReplayingEvents[C, E, S]( } } - private def onCommand(cmd: InternalProtocol): Behavior[InternalProtocol] = { + private def onInternalCommand(cmd: InternalProtocol): Behavior[InternalProtocol] = { // during recovery, stash all incoming commands if (state.receivedPoisonPill) { if (setup.settings.logOnStashing) @@ -236,8 +239,12 @@ private[akka] final class ReplayingEvents[C, E, S]( if (state.receivedPoisonPill && isInternalStashEmpty && !isUnstashAllInProgress) Behaviors.stopped else { + val seenPerReplica: Map[String, Long] = + setup.activeActive.map(aa => aa.allReplicas.map(replica => replica -> 0L).toMap).getOrElse(Map.empty) val running = - Running[C, E, S](setup, Running.RunningState[S](state.seqNr, state.state, state.receivedPoisonPill)) + Running[C, E, S]( + setup, + Running.RunningState[S](state.seqNr, state.state, state.receivedPoisonPill, seenPerReplica)) tryUnstashOne(running) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala index e71ffec70b..6c471b9201 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala @@ -57,9 +57,10 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup def stay(receivedPoisonPill: Boolean): Behavior[InternalProtocol] = { Behaviors .receiveMessage[InternalProtocol] { - case SnapshotterResponse(r) => onSnapshotterResponse(r, receivedPoisonPill) - case JournalResponse(r) => onJournalResponse(r) - case RecoveryTickEvent(snapshot) => onRecoveryTick(snapshot) + case SnapshotterResponse(r) => onSnapshotterResponse(r, receivedPoisonPill) + case JournalResponse(r) => onJournalResponse(r) + case RecoveryTickEvent(snapshot) => onRecoveryTick(snapshot) + case evt: ReplicatedEventEnvelope[E] => onReplicatedEvent(evt) case cmd: IncomingCommand[C] => if (receivedPoisonPill) { if (setup.settings.logOnStashing) @@ -122,6 +123,10 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup stashInternal(cmd) } + def onReplicatedEvent(evt: InternalProtocol.ReplicatedEventEnvelope[E]): Behavior[InternalProtocol] = { + stashInternal(evt) + } + def onJournalResponse(response: JournalProtocol.Response): Behavior[InternalProtocol] = { setup.log.debug( "Unexpected response from journal: [{}], may be due to an actor restart, ignoring...", diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index e9456f9795..66e7f8732c 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -6,10 +6,8 @@ package akka.persistence.typed.internal import scala.annotation.tailrec import scala.collection.immutable - import akka.actor.UnhandledMessage -import akka.actor.typed.Behavior -import akka.actor.typed.Signal +import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Signal } import akka.actor.typed.internal.PoisonPill import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors, LoggerOps } import akka.annotation.{ InternalApi, InternalStableApi } @@ -26,20 +24,30 @@ import akka.persistence.SaveSnapshotFailure import akka.persistence.SaveSnapshotSuccess import akka.persistence.SnapshotProtocol import akka.persistence.journal.Tagged -import akka.persistence.typed.DeleteEventsCompleted -import akka.persistence.typed.DeleteEventsFailed -import akka.persistence.typed.DeleteSnapshotsCompleted -import akka.persistence.typed.DeleteSnapshotsFailed -import akka.persistence.typed.DeletionTarget -import akka.persistence.typed.EventRejectedException -import akka.persistence.typed.SnapshotCompleted -import akka.persistence.typed.SnapshotFailed -import akka.persistence.typed.SnapshotMetadata -import akka.persistence.typed.SnapshotSelectionCriteria +import akka.persistence.query.{ EventEnvelope, PersistenceQuery } +import akka.persistence.query.scaladsl.EventsByPersistenceIdQuery +import akka.persistence.typed.{ + DeleteEventsCompleted, + DeleteEventsFailed, + DeleteSnapshotsCompleted, + DeleteSnapshotsFailed, + DeletionTarget, + EventRejectedException, + PersistenceId, + SnapshotCompleted, + SnapshotFailed, + SnapshotMetadata, + SnapshotSelectionCriteria +} import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetState +import akka.persistence.typed.internal.InternalProtocol.ReplicatedEventEnvelope import akka.persistence.typed.internal.Running.WithSeqNrAccessible import akka.persistence.typed.scaladsl.Effect -import akka.util.unused +import akka.persistence.typed.scaladsl.EventSourcedBehavior.ActiveActive +import akka.stream.{ SharedKillSwitch, SystemMaterializer } +import akka.stream.scaladsl.{ RestartSource, Sink } +import akka.stream.typed.scaladsl.ActorFlow +import akka.util.{ unused, Timeout } /** * INTERNAL API @@ -66,7 +74,12 @@ private[akka] object Running { def currentSequenceNumber: Long } - final case class RunningState[State](seqNr: Long, state: State, receivedPoisonPill: Boolean) { + final case class RunningState[State]( + seqNr: Long, + state: State, + receivedPoisonPill: Boolean, + seenPerReplica: Map[String, Long], + replicationKillSwitch: Option[SharedKillSwitch] = None) { def nextSequenceNr(): RunningState[State] = copy(seqNr = seqNr + 1) @@ -82,8 +95,40 @@ private[akka] object Running { def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: RunningState[S]): Behavior[InternalProtocol] = { val running = new Running(setup.setMdcPhase(PersistenceMdc.RunningCmds)) + setup.activeActive.foreach(aa => startReplicationStream(setup.context.system, setup.context.self, state, aa)) new running.HandlingCommands(state) } + + def startReplicationStream[E, S]( + system: ActorSystem[_], + ref: ActorRef[InternalProtocol], + state: RunningState[S], + aa: ActiveActive): Unit = { + import scala.concurrent.duration._ + + val query = PersistenceQuery(system) + aa.allReplicas.foreach { replica => + if (replica != aa.replicaId) { + val seqNr = state.seenPerReplica(replica) + val pid = PersistenceId.replicatedUniqueId(aa.aaContext.entityId, replica) + // FIXME support different configuration per replica https://github.com/akka/akka/issues/29257 + val replication = query.readJournalFor[EventsByPersistenceIdQuery](aa.queryPluginId) + + implicit val timeout = Timeout(30.seconds) + + val source = RestartSource.withBackoff(2.seconds, 10.seconds, randomFactor = 0.2) { () => + replication + .eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue) + .via(ActorFlow.ask[EventEnvelope, ReplicatedEventEnvelope[E], ReplicatedEventAck.type](ref) { + (eventEnvelope, replyTo) => + ReplicatedEventEnvelope(eventEnvelope.event.asInstanceOf[ReplicatedEvent[E]], replyTo) + }) + } + + source.runWith(Sink.ignore)(SystemMaterializer(system).materializer) + } + } + } } // =============================================== @@ -106,12 +151,17 @@ private[akka] object Running { _currentSequenceNumber = state.seqNr + private def alreadySeen(e: ReplicatedEvent[_]): Boolean = { + e.originSequenceNr <= state.seenPerReplica.getOrElse(e.originReplica, 0L) + } + def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match { - case IncomingCommand(c: C @unchecked) => onCommand(state, c) - case JournalResponse(r) => onDeleteEventsJournalResponse(r, state.state) - case SnapshotterResponse(r) => onDeleteSnapshotResponse(r, state.state) - case get: GetState[S @unchecked] => onGetState(get) - case _ => Behaviors.unhandled + case IncomingCommand(c: C @unchecked) => onCommand(state, c) + case re: ReplicatedEventEnvelope[E @unchecked] => onReplicatedEvent(state, re) + case JournalResponse(r) => onDeleteEventsJournalResponse(r, state.state) + case SnapshotterResponse(r) => onDeleteSnapshotResponse(r, state.state) + case get: GetState[S @unchecked] => onGetState(get) + case _ => Behaviors.unhandled } override def onSignal: PartialFunction[Signal, Behavior[InternalProtocol]] = { @@ -130,12 +180,45 @@ private[akka] object Running { else next } + def onReplicatedEvent( + state: Running.RunningState[S], + envelope: ReplicatedEventEnvelope[E]): Behavior[InternalProtocol] = { + // FIXME set the details on the context https://github.com/akka/akka/issues/29258 + setup.log.infoN( + "Replica {} received replicated event. Replica seqs nrs: {}", + setup.activeActive, + state.seenPerReplica) + envelope.ack ! ReplicatedEventAck + if (envelope.event.originReplica != setup.activeActive.get.replicaId && !alreadySeen(envelope.event)) { + setup.log.info("Saving event as first time") + handleReplicatedEventPersist(envelope.event) + } else { + setup.log.info("Filtering event as already seen") + this + } + } + // Used by EventSourcedBehaviorTestKit to retrieve the state. def onGetState(get: GetState[S]): Behavior[InternalProtocol] = { get.replyTo ! state.state this } + private def handleReplicatedEventPersist(event: ReplicatedEvent[E]): Behavior[InternalProtocol] = { + _currentSequenceNumber = state.seqNr + 1 + val newState: RunningState[S] = state.applyEvent(setup, event.event) + val newState2: RunningState[S] = internalPersist(setup.context, null, newState, event, "") + val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event.event, newState2.seqNr) + // FIXME validate this is the correct sequence nr from that replica https://github.com/akka/akka/issues/29259 + val updatedSeen = newState2.seenPerReplica.updated(event.originReplica, event.originSequenceNr) + persistingEvents( + newState2.copy(seenPerReplica = updatedSeen), + state, + numberOfEvents = 1, + shouldSnapshotAfterPersist, + Nil) + } + @tailrec def applyEffects( msg: Any, state: RunningState[S], @@ -162,8 +245,13 @@ private[akka] object Running { val eventToPersist = adaptEvent(event) val eventAdapterManifest = setup.eventAdapter.manifest(event) - - val newState2 = internalPersist(setup.context, msg, newState, eventToPersist, eventAdapterManifest) + val newState2 = setup.activeActive match { + case Some(aa) => + val replicatedEvent = ReplicatedEvent(eventToPersist, aa.replicaId, _currentSequenceNumber) + internalPersist(setup.context, msg, newState, replicatedEvent, eventAdapterManifest) + case None => + internalPersist(setup.context, msg, newState, eventToPersist, eventAdapterManifest) + } val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event, newState2.seqNr) @@ -252,12 +340,13 @@ private[akka] object Running { override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = { msg match { - case JournalResponse(r) => onJournalResponse(r) - case in: IncomingCommand[C @unchecked] => onCommand(in) - case get: GetState[S @unchecked] => stashInternal(get) - case SnapshotterResponse(r) => onDeleteSnapshotResponse(r, visibleState.state) - case RecoveryTickEvent(_) => Behaviors.unhandled - case RecoveryPermitGranted => Behaviors.unhandled + case JournalResponse(r) => onJournalResponse(r) + case in: IncomingCommand[C @unchecked] => onCommand(in) + case re: ReplicatedEventEnvelope[E @unchecked] => onReplicatedEvent(re) + case get: GetState[S @unchecked] => stashInternal(get) + case SnapshotterResponse(r) => onDeleteSnapshotResponse(r, visibleState.state) + case RecoveryTickEvent(_) => Behaviors.unhandled + case RecoveryPermitGranted => Behaviors.unhandled } } @@ -271,6 +360,14 @@ private[akka] object Running { } } + def onReplicatedEvent(event: InternalProtocol.ReplicatedEventEnvelope[E]): Behavior[InternalProtocol] = { + if (state.receivedPoisonPill) { + Behaviors.unhandled + } else { + stashInternal(event) + } + } + final def onJournalResponse(response: Response): Behavior[InternalProtocol] = { if (setup.log.isDebugEnabled) { setup.log.debug2( diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ActiveActiveEventSourcing.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ActiveActiveEventSourcing.scala new file mode 100644 index 0000000000..847b2cbff6 --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ActiveActiveEventSourcing.scala @@ -0,0 +1,115 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.typed.scaladsl + +import akka.persistence.typed.PersistenceId + +/** + * Utility class for comparing timestamp and data center + * identifier when implementing last-writer wins. + */ +final case class LwwTime(timestamp: Long, originDc: String) { + + /** + * Create a new `LwwTime` that has a `timestamp` that is + * `max` of the given timestamp and previous timestamp + 1, + * i.e. monotonically increasing. + */ + def increase(t: Long, replicaId: String): LwwTime = + LwwTime(math.max(timestamp + 1, t), replicaId) + + /** + * Compare this `LwwTime` with the `other`. + * Greatest timestamp wins. If both timestamps are + * equal the `dc` identifiers are compared and the + * one sorted first in alphanumeric order wins. + */ + def isAfter(other: LwwTime): Boolean = { + if (timestamp > other.timestamp) true + else if (timestamp < other.timestamp) false + else if (other.originDc.compareTo(originDc) > 0) true + else false + } +} + +trait ActiveActiveContext { + def timestamp: Long + def origin: String + def concurrent: Boolean + def replicaId: String + def allReplicas: Set[String] + def persistenceId: PersistenceId + def recoveryRunning: Boolean + def entityId: String + def currentTimeMillis(): Long +} + +// FIXME, parts of this can be set during initialisation +// Other fields will be set before executing the event handler as they change per event +// https://github.com/akka/akka/issues/29258 +private[akka] class ActiveActiveContextImpl(val entityId: String, val replicaId: String, val allReplicas: Set[String]) + extends ActiveActiveContext { + var _timestamp: Long = -1 + var _origin: String = null + var _concurrent: Boolean = false + + // FIXME check illegal access https://github.com/akka/akka/issues/29264 + + /** + * The timestamp of the event. Always increases per data center + * Undefined result if called from any where other than an event handler. + */ + override def timestamp: Long = _timestamp + + /** + * The origin of the current event. + * Undefined result if called from anywhere other than an event handler. + */ + override def origin: String = _origin + + /** + * Whether the happened concurrently with an event from another replica. + * Undefined result if called from any where other than an event handler. + */ + override def concurrent: Boolean = _concurrent + override def persistenceId: PersistenceId = PersistenceId.replicatedUniqueId(entityId, replicaId) + override def currentTimeMillis(): Long = { + // FIXME always increasing + System.currentTimeMillis() + } + override def recoveryRunning: Boolean = false +} + +object ActiveActiveEventSourcing { + + /** + * Initialize a replicated event sourced behavior. + * + * Events from each replica for the same entityId will be replicated to every copy. + * Care must be taken to handle events in any order as events can happen concurrently at different replicas. + * + * Using an replicated event sourced behavior means there is no longer the single writer guarantee. + * + * A different journal plugin id can be configured using withJournalPluginId after creation. Different databases + * can be used for each replica. + * The events from other replicas are read using PersistentQuery. + * TODO support a different query plugin id per replicas: https://github.com/akka/akka/issues/29257 + * + * @param replicaId The unique identity for this entity. The underlying persistence id will include the replica. + * @param allReplicaIds All replica ids. These need to be known to receive events from all replicas. + * @param queryPluginId Used to read the events from other replicas. Must be the query side of your configured journal plugin. + * @return + */ + def apply[Command, Event, State]( + entityId: String, + replicaId: String, + allReplicaIds: Set[String], + queryPluginId: String)(activeActiveContext: ActiveActiveContext => EventSourcedBehavior[Command, Event, State]) + : EventSourcedBehavior[Command, Event, State] = { + val context = new ActiveActiveContextImpl(entityId, replicaId, allReplicaIds) + activeActiveContext(context).withActiveActive(context, replicaId, allReplicaIds, queryPluginId) + } + +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala index 7583715b9e..e0a4984587 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala @@ -5,7 +5,6 @@ package akka.persistence.typed.scaladsl import scala.annotation.tailrec - import akka.actor.typed.BackoffSupervisorStrategy import akka.actor.typed.Behavior import akka.actor.typed.Signal @@ -13,7 +12,7 @@ import akka.actor.typed.internal.BehaviorImpl.DeferredBehavior import akka.actor.typed.internal.InterceptorImpl import akka.actor.typed.internal.LoggerClass import akka.actor.typed.scaladsl.ActorContext -import akka.annotation.DoNotInherit +import akka.annotation.{ DoNotInherit, InternalApi } import akka.persistence.typed.EventAdapter import akka.persistence.typed.PersistenceId import akka.persistence.typed.SnapshotAdapter @@ -22,6 +21,13 @@ import akka.persistence.typed.internal._ object EventSourcedBehavior { + @InternalApi + private[akka] case class ActiveActive( + replicaId: String, + allReplicas: Set[String], + aaContext: ActiveActiveContext, + queryPluginId: String) + /** * Type alias for the command handler function that defines how to act on commands. * @@ -145,6 +151,12 @@ object EventSourcedBehavior { */ def withJournalPluginId(id: String): EventSourcedBehavior[Command, Event, State] + private[akka] def withActiveActive( + context: ActiveActiveContext, + replicaId: String, + allReplicaIds: Set[String], + queryPluginId: String): EventSourcedBehavior[Command, Event, State] + /** * Change the snapshot store plugin id that this actor should use. */ diff --git a/akka-persistence-typed/src/test/resources/logback-test.xml b/akka-persistence-typed/src/test/resources/logback-test.xml index 22c45c93b6..c980894390 100644 --- a/akka-persistence-typed/src/test/resources/logback-test.xml +++ b/akka-persistence-typed/src/test/resources/logback-test.xml @@ -5,7 +5,7 @@ - %date{ISO8601} %-5level %logger %marker - %msg MDC: {%mdc}%n + %date{ISO8601} [%-5level] [%logger] [%marker] [%X{akkaSource}] [%X{persistenceId}] - %msg %n diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala index b6d7bfe066..c033e614ed 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorWatchSpec.scala @@ -66,7 +66,8 @@ class EventSourcedBehaviorWatchSpec RetentionCriteria.disabled, holdingRecoveryPermit = false, settings = settings, - stashState = new StashState(context.asInstanceOf[ActorContext[InternalProtocol]], settings)) + stashState = new StashState(context.asInstanceOf[ActorContext[InternalProtocol]], settings), + None) "A typed persistent parent actor watching a child" must { diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/aa/AAAuctionExampleSpec.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/aa/AAAuctionExampleSpec.scala new file mode 100644 index 0000000000..9ca783e315 --- /dev/null +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/aa/AAAuctionExampleSpec.scala @@ -0,0 +1,270 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package docs.akka.persistence.typed.aa + +import java.time.Instant + +import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit } +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, _ } +import akka.actor.typed.{ ActorRef, Behavior } +import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal +import akka.persistence.typed.scaladsl.{ ActiveActiveContext, ActiveActiveEventSourcing, Effect, EventSourcedBehavior } +import com.typesafe.config.ConfigFactory +import org.scalatest.concurrent.{ Eventually, ScalaFutures } +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpecLike + +object AAAuctionExampleSpec { + val config = ConfigFactory.parseString(""" + akka.actor.provider = cluster + akka.loglevel = info + akka.persistence { + journal { + plugin = "akka.persistence.journal.inmem" + } + } + """) + type MoneyAmount = Int + + case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originDc: String) + + // commands + sealed trait AuctionCommand + case object Finish extends AuctionCommand // A timer needs to schedule this event at each replica + final case class OfferBid(bidder: String, offer: MoneyAmount) extends AuctionCommand + final case class GetHighestBid(replyTo: ActorRef[Bid]) extends AuctionCommand + final case class IsClosed(replyTo: ActorRef[Boolean]) extends AuctionCommand + private final case object Close extends AuctionCommand // Internal, should not be sent from the outside + + sealed trait AuctionEvent + final case class BidRegistered(bid: Bid) extends AuctionEvent + final case class AuctionFinished(atDc: String) extends AuctionEvent + final case class WinnerDecided(atDc: String, winningBid: Bid, highestCounterOffer: MoneyAmount) extends AuctionEvent + + sealed trait AuctionPhase + case object Running extends AuctionPhase + final case class Closing(finishedAtDc: Set[String]) extends AuctionPhase + case object Closed extends AuctionPhase + + case class AuctionState( + phase: AuctionPhase, + highestBid: Bid, + highestCounterOffer: MoneyAmount // in ebay style auctions, we need to keep track of current highest counter offer + ) { + + def applyEvent(event: AuctionEvent): AuctionState = + event match { + case BidRegistered(b) => + if (isHigherBid(b, highestBid)) + withNewHighestBid(b) + else + withTooLowBid(b) + case AuctionFinished(atDc) => + phase match { + case Running => + copy(phase = Closing(Set(atDc))) + case Closing(alreadyFinishedDcs) => + copy(phase = Closing(alreadyFinishedDcs + atDc)) + case _ => + this + } + case _: WinnerDecided => + copy(phase = Closed) + } + + def withNewHighestBid(bid: Bid): AuctionState = { + require(phase != Closed) + require(isHigherBid(bid, highestBid)) + copy(highestBid = bid, highestCounterOffer = highestBid.offer // keep last highest bid around + ) + } + def withTooLowBid(bid: Bid): AuctionState = { + require(phase != Closed) + require(isHigherBid(highestBid, bid)) + copy(highestCounterOffer = highestCounterOffer.max(bid.offer)) // update highest counter offer + } + + def isHigherBid(first: Bid, second: Bid): Boolean = + first.offer > second.offer || + (first.offer == second.offer && first.timestamp.isBefore(second.timestamp)) || // if equal, first one wins + // If timestamps are equal, choose by dc where the offer was submitted + // In real auctions, this last comparison should be deterministic but unpredictable, so that submitting to a + // particular DC would not be an advantage. + (first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originDc.compareTo( + second.originDc) < 0) + } + + case class AuctionSetup( + name: String, + initialBid: Bid, // the initial bid is basically the minimum price bidden at start time by the owner + closingAt: Instant, + responsibleForClosing: Boolean, + allDcs: Set[String]) + + def commandHandler(setup: AuctionSetup, ctx: ActorContext[AuctionCommand], aaContext: ActiveActiveContext)( + state: AuctionState, + command: AuctionCommand): Effect[AuctionEvent, AuctionState] = { + state.phase match { + case Closing(_) | Closed => + command match { + case GetHighestBid(replyTo) => + replyTo ! state.highestBid + Effect.none + case IsClosed(replyTo) => + replyTo ! (state.phase == Closed) + Effect.none + case Finish => + ctx.log.info("Finish") + Effect.persist(AuctionFinished(aaContext.replicaId)) + case Close => + ctx.log.info("Close") + require(shouldClose(setup, state)) + // TODO send email (before or after persisting) + Effect.persist(WinnerDecided(aaContext.replicaId, state.highestBid, state.highestCounterOffer)) + case _: OfferBid => + // auction finished, no more bids accepted + Effect.unhandled + } + case Running => + command match { + case OfferBid(bidder, offer) => + Effect.persist( + BidRegistered( + Bid(bidder, offer, Instant.ofEpochMilli(aaContext.currentTimeMillis()), aaContext.replicaId))) + case GetHighestBid(replyTo) => + replyTo ! state.highestBid + Effect.none + case Finish => + Effect.persist(AuctionFinished(aaContext.replicaId)) + case Close => + ctx.log.warn("Premature close") + // Close should only be triggered when we have already finished + Effect.unhandled + case IsClosed(replyTo) => + replyTo ! false + Effect.none + } + } + } + + private def shouldClose(auctionSetup: AuctionSetup, state: AuctionState): Boolean = { + auctionSetup.responsibleForClosing && (state.phase match { + case Closing(alreadyFinishedAtDc) => + val allDone = auctionSetup.allDcs.diff(alreadyFinishedAtDc).isEmpty + if (!allDone) { + println( + s"Not closing auction as not all DCs have reported finished. All DCs: ${auctionSetup.allDcs}. Reported finished ${alreadyFinishedAtDc}") + } + allDone + case _ => + false + }) + } + + def eventHandler(ctx: ActorContext[AuctionCommand], aaCtx: ActiveActiveContext, setup: AuctionSetup)( + state: AuctionState, + event: AuctionEvent): AuctionState = { + + val newState = state.applyEvent(event) + ctx.log.infoN("Applying event {}. New start {}", event, newState) + if (!aaCtx.recoveryRunning) { + eventTriggers(setup, ctx, aaCtx, event, newState) + } + newState + + } + + private def eventTriggers( + setup: AuctionSetup, + ctx: ActorContext[AuctionCommand], + aaCtx: ActiveActiveContext, + event: AuctionEvent, + newState: AuctionState) = { + event match { + case finished: AuctionFinished => + newState.phase match { + case Closing(alreadyFinishedAtDc) => + ctx.log.infoN( + "AuctionFinished at {}, already finished at [{}]", + finished.atDc, + alreadyFinishedAtDc.mkString(", ")) + if (alreadyFinishedAtDc(aaCtx.replicaId)) { + if (shouldClose(setup, newState)) ctx.self ! Close + } else { + ctx.log.info("Sending finish to self") + ctx.self ! Finish + } + + case _ => // no trigger for this state + } + case _ => // no trigger for this event + } + } + + def initialState(setup: AuctionSetup) = + AuctionState(phase = Running, highestBid = setup.initialBid, highestCounterOffer = setup.initialBid.offer) + + def behavior(replica: String, setup: AuctionSetup): Behavior[AuctionCommand] = Behaviors.setup[AuctionCommand] { + ctx => + ActiveActiveEventSourcing(setup.name, replica, setup.allDcs, LeveldbReadJournal.Identifier) { aaCtx => + EventSourcedBehavior( + aaCtx.persistenceId, + initialState(setup), + commandHandler(setup, ctx, aaCtx), + eventHandler(ctx, aaCtx, setup)) + } + } +} + +class AAAuctionExampleSpec + extends ScalaTestWithActorTestKit(AAAuctionExampleSpec.config) + with AnyWordSpecLike + with Matchers + with LogCapturing + with ScalaFutures + with Eventually { + import AAAuctionExampleSpec._ + + "Auction example" should { + "work" in { + val Replicas = Set("DC-A", "DC-B") + val setupA = + AuctionSetup( + "old-skis", + Bid("chbatey", 12, Instant.now(), "DC-A"), + Instant.now().plusSeconds(10), + responsibleForClosing = true, + Replicas) + + val setupB = setupA.copy(responsibleForClosing = false) + + val dcAReplica: ActorRef[AuctionCommand] = spawn(behavior("DC-A", setupA)) + val dcBReplica: ActorRef[AuctionCommand] = spawn(behavior("DC-B", setupB)) + + dcAReplica ! OfferBid("me", 100) + dcAReplica ! OfferBid("me", 99) + dcAReplica ! OfferBid("me", 202) + + eventually { + val replyProbe = createTestProbe[Bid]() + dcAReplica ! GetHighestBid(replyProbe.ref) + val bid = replyProbe.expectMessageType[Bid] + bid.offer shouldEqual 202 + } + + dcAReplica ! Finish + eventually { + val finishProbe = createTestProbe[Boolean]() + dcAReplica ! IsClosed(finishProbe.ref) + finishProbe.expectMessage(true) + } + eventually { + val finishProbe = createTestProbe[Boolean]() + dcBReplica ! IsClosed(finishProbe.ref) + finishProbe.expectMessage(true) + } + } + } +} diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/aa/AABlogExampleSpec.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/aa/AABlogExampleSpec.scala new file mode 100644 index 0000000000..9f5b288ee9 --- /dev/null +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/aa/AABlogExampleSpec.scala @@ -0,0 +1,162 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package docs.akka.persistence.typed.aa + +import java.util.UUID + +import akka.Done +import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit } +import akka.actor.typed.ActorRef +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } +import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal +import akka.persistence.typed.scaladsl._ +import akka.serialization.jackson.CborSerializable +import com.typesafe.config.ConfigFactory +import org.scalatest.concurrent.{ Eventually, ScalaFutures } +import org.scalatest.matchers.should.Matchers +import org.scalatest.time.{ Millis, Span } +import org.scalatest.wordspec.AnyWordSpecLike + +object AABlogExampleSpec { + val config = + ConfigFactory.parseString(s""" + + akka.actor.allow-java-serialization = true + // FIXME serializers for replicated event or akka persistence support for metadata: https://github.com/akka/akka/issues/29260 + akka.actor.provider = cluster + akka.loglevel = debug + akka.persistence { + journal { + plugin = "akka.persistence.journal.leveldb" + leveldb { + native = off + dir = "target/journal-AABlogExampleSpec-${UUID.randomUUID()}" + } + } + } + """) + + final case class BlogState(content: Option[PostContent], contentTimestamp: LwwTime, published: Boolean) { + def withContent(newContent: PostContent, timestamp: LwwTime): BlogState = + copy(content = Some(newContent), contentTimestamp = timestamp) + def isEmpty: Boolean = content.isEmpty + } + val emptyState: BlogState = BlogState(None, LwwTime(Long.MinValue, ""), published = false) + + final case class PostContent(title: String, body: String) + final case class PostSummary(postId: String, title: String) + final case class Published(postId: String) extends BlogEvent + + sealed trait BlogCommand + final case class AddPost(postId: String, content: PostContent, replyTo: ActorRef[AddPostDone]) extends BlogCommand + final case class AddPostDone(postId: String) + final case class GetPost(postId: String, replyTo: ActorRef[PostContent]) extends BlogCommand + final case class ChangeBody(postId: String, newContent: PostContent, replyTo: ActorRef[Done]) extends BlogCommand + final case class Publish(postId: String, replyTo: ActorRef[Done]) extends BlogCommand + + sealed trait BlogEvent extends CborSerializable + final case class PostAdded(postId: String, content: PostContent, timestamp: LwwTime) extends BlogEvent + final case class BodyChanged(postId: String, newContent: PostContent, timestamp: LwwTime) extends BlogEvent +} + +class AABlogExampleSpec + extends ScalaTestWithActorTestKit(AABlogExampleSpec.config) + with AnyWordSpecLike + with Matchers + with LogCapturing + with ScalaFutures + with Eventually { + import AABlogExampleSpec._ + + implicit val config: PatienceConfig = PatienceConfig(timeout = Span(timeout.duration.toMillis, Millis)) + + def behavior(aa: ActiveActiveContext, ctx: ActorContext[BlogCommand]) = + EventSourcedBehavior[BlogCommand, BlogEvent, BlogState]( + aa.persistenceId, + emptyState, + (state, cmd) => + cmd match { + case AddPost(_, content, replyTo) => + val evt = + PostAdded(aa.persistenceId.id, content, state.contentTimestamp.increase(aa.timestamp, aa.replicaId)) + Effect.persist(evt).thenRun { _ => + replyTo ! AddPostDone(aa.entityId) + } + case ChangeBody(_, newContent, replyTo) => + val evt = + BodyChanged(aa.persistenceId.id, newContent, state.contentTimestamp.increase(aa.timestamp, aa.replicaId)) + Effect.persist(evt).thenRun { _ => + replyTo ! Done + } + case p: Publish => + Effect.persist(Published("id")).thenRun { _ => + p.replyTo ! Done + } + case gp: GetPost => + ctx.log.info("GetPost {}", state.content) + state.content.foreach(content => gp.replyTo ! content) + Effect.none + }, + (state, event) => { + ctx.log.info(s"${aa.entityId}:${aa.replicaId} Received event $event") + event match { + case PostAdded(_, content, timestamp) => + if (timestamp.isAfter(state.contentTimestamp)) { + val s = state.withContent(content, timestamp) + ctx.log.info("Updating content. New content is {}", s) + s + } else { + ctx.log.info("Ignoring event as timestamp is older") + state + } + case BodyChanged(_, newContent, timestamp) => + if (timestamp.isAfter(state.contentTimestamp)) + state.withContent(newContent, timestamp) + else state + case Published(_) => + state.copy(published = true) + } + }) + + "Blog Example" should { + "work" in { + val refDcA: ActorRef[BlogCommand] = + spawn(Behaviors.setup[BlogCommand] { ctx => + ActiveActiveEventSourcing("cat", "DC-A", Set("DC-A", "DC-B"), LeveldbReadJournal.Identifier) { + (aa: ActiveActiveContext) => + behavior(aa, ctx) + } + }, "dc-a") + + val refDcB: ActorRef[BlogCommand] = + spawn(Behaviors.setup[BlogCommand] { ctx => + ActiveActiveEventSourcing("cat", "DC-B", Set("DC-A", "DC-B"), LeveldbReadJournal.Identifier) { + (aa: ActiveActiveContext) => + behavior(aa, ctx) + } + }, "dc-b") + + import akka.actor.typed.scaladsl.AskPattern._ + import akka.util.Timeout + + import scala.concurrent.duration._ + implicit val timeout: Timeout = 3.seconds + + val content = PostContent("cats are the bets", "yep") + val response = + refDcA.ask[AddPostDone](replyTo => AddPost("cat", content, replyTo)).futureValue + + response shouldEqual AddPostDone("cat") + + eventually { + refDcA.ask[PostContent](replyTo => GetPost("cat", replyTo)).futureValue shouldEqual content + } + + eventually { + refDcB.ask[PostContent](replyTo => GetPost("cat", replyTo)).futureValue shouldEqual content + } + } + } +} diff --git a/build.sbt b/build.sbt index a53103bdac..0e151f8fc4 100644 --- a/build.sbt +++ b/build.sbt @@ -453,8 +453,9 @@ lazy val actorTyped = akkaModule("akka-actor-typed") lazy val persistenceTyped = akkaModule("akka-persistence-typed") .dependsOn( actorTyped, + streamTyped, persistence % "compile->compile;test->test", - persistenceQuery % "test", + persistenceQuery, actorTestkitTyped % "test->test", clusterTyped % "test->test", actorTestkitTyped % "test->test", diff --git a/project/Dependencies.scala b/project/Dependencies.scala index b3cc7cb359..0072a183c4 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -255,7 +255,7 @@ object Dependencies { val persistenceTestKit = l ++= Seq(Test.scalatest, Test.logback) - val persistenceShared = l ++= Seq(Provided.levelDB, Provided.levelDBNative) + val persistenceShared = l ++= Seq(Provided.levelDB, Provided.levelDBNative, Test.logback) val jackson = l ++= Seq( jacksonCore,