Initial prototype of active active event sourcing (#29265)

This commit is contained in:
Christopher Batey 2020-06-22 12:46:57 +01:00
parent 94dc84d5d0
commit ad2d7e2d00
16 changed files with 761 additions and 57 deletions

View file

@ -21,7 +21,6 @@ akka.persistence.typed {
# enables automatic DEBUG level logging of messages stashed automatically by an EventSourcedBehavior,
# this may happen while it receives commands while it is recovering events or while it is persisting events
log-stashing = off
}
akka.reliable-delivery {

View file

@ -3,6 +3,7 @@
*/
package akka.persistence.typed
import akka.annotation.InternalApi
object PersistenceId {
@ -125,6 +126,21 @@ object PersistenceId {
def ofUniqueId(id: String): PersistenceId =
new PersistenceId(id)
/**
* Constructs a persistence id from a unique entity id that includes the replica id.
*/
@InternalApi
private[akka] def replicatedUniqueId(entityId: String, replicaId: String): PersistenceId = {
if (entityId.contains(DefaultSeparator))
throw new IllegalArgumentException(
s"entityId [$entityId] contains [$DefaultSeparator] which is a reserved character")
if (replicaId.contains(DefaultSeparator))
throw new IllegalArgumentException(
s"replicaId [$replicaId] contains [$DefaultSeparator] which is a reserved character")
new PersistenceId(entityId + DefaultSeparator + replicaId)
}
}
/**

View file

@ -6,14 +6,13 @@ package akka.persistence.typed.internal
import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal
import org.slf4j.{ Logger, MDC }
import akka.actor.{ ActorRef, Cancellable }
import akka.actor.typed.Signal
import akka.actor.typed.scaladsl.ActorContext
import akka.annotation.InternalApi
import akka.persistence._
import akka.persistence.typed.scaladsl.EventSourcedBehavior.ActiveActive
import akka.persistence.typed.{ EventAdapter, PersistenceId, SnapshotAdapter }
import akka.persistence.typed.scaladsl.{ EventSourcedBehavior, RetentionCriteria }
import akka.util.OptionVal
@ -48,7 +47,8 @@ private[akka] final class BehaviorSetup[C, E, S](
val retention: RetentionCriteria,
var holdingRecoveryPermit: Boolean,
val settings: EventSourcedSettings,
val stashState: StashState) {
val stashState: StashState,
val activeActive: Option[ActiveActive]) {
import BehaviorSetup._
import InternalProtocol.RecoveryTickEvent

View file

@ -36,6 +36,7 @@ import akka.persistence.typed.SnapshotAdapter
import akka.persistence.typed.SnapshotCompleted
import akka.persistence.typed.SnapshotFailed
import akka.persistence.typed.SnapshotSelectionCriteria
import akka.persistence.typed.scaladsl.EventSourcedBehavior.ActiveActive
import akka.persistence.typed.scaladsl._
import akka.persistence.typed.scaladsl.{ Recovery => TypedRecovery }
import akka.persistence.typed.scaladsl.RetentionCriteria
@ -87,7 +88,8 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
recovery: Recovery = Recovery(),
retention: RetentionCriteria = RetentionCriteria.disabled,
supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop,
override val signalHandler: PartialFunction[(State, Signal), Unit] = PartialFunction.empty)
override val signalHandler: PartialFunction[(State, Signal), Unit] = PartialFunction.empty,
activeActive: Option[ActiveActive] = None)
extends EventSourcedBehavior[Command, Event, State] {
import EventSourcedBehaviorImpl.WriterIdentity
@ -150,7 +152,8 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
retention,
holdingRecoveryPermit = false,
settings = settings,
stashState = stashState)
stashState = stashState,
activeActive = activeActive)
// needs to accept Any since we also can get messages from the journal
// not part of the user facing Command protocol
@ -237,6 +240,14 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
override def withRecovery(recovery: TypedRecovery): EventSourcedBehavior[Command, Event, State] = {
copy(recovery = recovery.toClassic)
}
override private[akka] def withActiveActive(
context: ActiveActiveContext,
id: String,
allIds: Set[String],
queryPluginId: String): EventSourcedBehavior[Command, Event, State] = {
copy(activeActive = Some(ActiveActive(id, allIds, context, queryPluginId)))
}
}
/** Protocol used internally by the eventsourced behaviors. */
@ -247,4 +258,10 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
final case class SnapshotterResponse(msg: akka.persistence.SnapshotProtocol.Response) extends InternalProtocol
final case class RecoveryTickEvent(snapshot: Boolean) extends InternalProtocol
final case class IncomingCommand[C](c: C) extends InternalProtocol
final case class ReplicatedEventEnvelope[E](event: ReplicatedEvent[E], ack: ActorRef[ReplicatedEventAck.type])
extends InternalProtocol
}
final case class ReplicatedEvent[E](event: E, originReplica: String, originSequenceNr: Long)
case object ReplicatedEventAck

View file

@ -27,32 +27,34 @@ private[akka] trait JournalInteractions[C, E, S] {
def setup: BehaviorSetup[C, E, S]
type EventOrTagged = Any // `Any` since can be `E` or `Tagged`
type EventOrTaggedOrReplicated = Any // `Any` since can be `E` or `Tagged` or a `ReplicatedEvent`
protected def internalPersist(
ctx: ActorContext[_],
cmd: Any,
state: Running.RunningState[S],
event: EventOrTagged,
event: EventOrTaggedOrReplicated,
eventAdapterManifest: String): Running.RunningState[S] = {
val newState = state.nextSequenceNr()
val newRunningState = state.nextSequenceNr()
val repr = PersistentRepr(
event,
persistenceId = setup.persistenceId.id,
sequenceNr = newState.seqNr,
sequenceNr = newRunningState.seqNr,
manifest = eventAdapterManifest,
writerUuid = setup.writerIdentity.writerUuid,
sender = ActorRef.noSender)
// FIXME check cinnamon is okay with this being null
// https://github.com/akka/akka/issues/29262
onWriteInitiated(ctx, cmd, repr)
val write = AtomicWrite(repr) :: Nil
setup.journal
.tell(JournalProtocol.WriteMessages(write, setup.selfClassic, setup.writerIdentity.instanceId), setup.selfClassic)
newState
newRunningState
}
@InternalStableApi
@ -65,7 +67,7 @@ private[akka] trait JournalInteractions[C, E, S] {
ctx: ActorContext[_],
cmd: Any,
state: Running.RunningState[S],
events: immutable.Seq[(EventOrTagged, String)]): Running.RunningState[S] = {
events: immutable.Seq[(EventOrTaggedOrReplicated, String)]): Running.RunningState[S] = {
if (events.nonEmpty) {
var newState = state

View file

@ -87,11 +87,14 @@ private[akka] final class ReplayingEvents[C, E, S](
()
override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = {
// FIXME deal with a replicated event and ack
// https://github.com/akka/akka/issues/29256
msg match {
case JournalResponse(r) => onJournalResponse(r)
case SnapshotterResponse(r) => onSnapshotterResponse(r)
case RecoveryTickEvent(snap) => onRecoveryTick(snap)
case cmd: IncomingCommand[C] => onCommand(cmd)
case evt: ReplicatedEventEnvelope[E] => onInternalCommand(evt)
case cmd: IncomingCommand[C] => onInternalCommand(cmd)
case get: GetState[S @unchecked] => stashInternal(get)
case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit
}
@ -154,7 +157,7 @@ private[akka] final class ReplayingEvents[C, E, S](
}
}
private def onCommand(cmd: InternalProtocol): Behavior[InternalProtocol] = {
private def onInternalCommand(cmd: InternalProtocol): Behavior[InternalProtocol] = {
// during recovery, stash all incoming commands
if (state.receivedPoisonPill) {
if (setup.settings.logOnStashing)
@ -236,8 +239,12 @@ private[akka] final class ReplayingEvents[C, E, S](
if (state.receivedPoisonPill && isInternalStashEmpty && !isUnstashAllInProgress)
Behaviors.stopped
else {
val seenPerReplica: Map[String, Long] =
setup.activeActive.map(aa => aa.allReplicas.map(replica => replica -> 0L).toMap).getOrElse(Map.empty)
val running =
Running[C, E, S](setup, Running.RunningState[S](state.seqNr, state.state, state.receivedPoisonPill))
Running[C, E, S](
setup,
Running.RunningState[S](state.seqNr, state.state, state.receivedPoisonPill, seenPerReplica))
tryUnstashOne(running)
}

View file

@ -60,6 +60,7 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
case SnapshotterResponse(r) => onSnapshotterResponse(r, receivedPoisonPill)
case JournalResponse(r) => onJournalResponse(r)
case RecoveryTickEvent(snapshot) => onRecoveryTick(snapshot)
case evt: ReplicatedEventEnvelope[E] => onReplicatedEvent(evt)
case cmd: IncomingCommand[C] =>
if (receivedPoisonPill) {
if (setup.settings.logOnStashing)
@ -122,6 +123,10 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
stashInternal(cmd)
}
def onReplicatedEvent(evt: InternalProtocol.ReplicatedEventEnvelope[E]): Behavior[InternalProtocol] = {
stashInternal(evt)
}
def onJournalResponse(response: JournalProtocol.Response): Behavior[InternalProtocol] = {
setup.log.debug(
"Unexpected response from journal: [{}], may be due to an actor restart, ignoring...",

View file

@ -6,10 +6,8 @@ package akka.persistence.typed.internal
import scala.annotation.tailrec
import scala.collection.immutable
import akka.actor.UnhandledMessage
import akka.actor.typed.Behavior
import akka.actor.typed.Signal
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Signal }
import akka.actor.typed.internal.PoisonPill
import akka.actor.typed.scaladsl.{ AbstractBehavior, ActorContext, Behaviors, LoggerOps }
import akka.annotation.{ InternalApi, InternalStableApi }
@ -26,20 +24,30 @@ import akka.persistence.SaveSnapshotFailure
import akka.persistence.SaveSnapshotSuccess
import akka.persistence.SnapshotProtocol
import akka.persistence.journal.Tagged
import akka.persistence.typed.DeleteEventsCompleted
import akka.persistence.typed.DeleteEventsFailed
import akka.persistence.typed.DeleteSnapshotsCompleted
import akka.persistence.typed.DeleteSnapshotsFailed
import akka.persistence.typed.DeletionTarget
import akka.persistence.typed.EventRejectedException
import akka.persistence.typed.SnapshotCompleted
import akka.persistence.typed.SnapshotFailed
import akka.persistence.typed.SnapshotMetadata
import akka.persistence.typed.SnapshotSelectionCriteria
import akka.persistence.query.{ EventEnvelope, PersistenceQuery }
import akka.persistence.query.scaladsl.EventsByPersistenceIdQuery
import akka.persistence.typed.{
DeleteEventsCompleted,
DeleteEventsFailed,
DeleteSnapshotsCompleted,
DeleteSnapshotsFailed,
DeletionTarget,
EventRejectedException,
PersistenceId,
SnapshotCompleted,
SnapshotFailed,
SnapshotMetadata,
SnapshotSelectionCriteria
}
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.GetState
import akka.persistence.typed.internal.InternalProtocol.ReplicatedEventEnvelope
import akka.persistence.typed.internal.Running.WithSeqNrAccessible
import akka.persistence.typed.scaladsl.Effect
import akka.util.unused
import akka.persistence.typed.scaladsl.EventSourcedBehavior.ActiveActive
import akka.stream.{ SharedKillSwitch, SystemMaterializer }
import akka.stream.scaladsl.{ RestartSource, Sink }
import akka.stream.typed.scaladsl.ActorFlow
import akka.util.{ unused, Timeout }
/**
* INTERNAL API
@ -66,7 +74,12 @@ private[akka] object Running {
def currentSequenceNumber: Long
}
final case class RunningState[State](seqNr: Long, state: State, receivedPoisonPill: Boolean) {
final case class RunningState[State](
seqNr: Long,
state: State,
receivedPoisonPill: Boolean,
seenPerReplica: Map[String, Long],
replicationKillSwitch: Option[SharedKillSwitch] = None) {
def nextSequenceNr(): RunningState[State] =
copy(seqNr = seqNr + 1)
@ -82,8 +95,40 @@ private[akka] object Running {
def apply[C, E, S](setup: BehaviorSetup[C, E, S], state: RunningState[S]): Behavior[InternalProtocol] = {
val running = new Running(setup.setMdcPhase(PersistenceMdc.RunningCmds))
setup.activeActive.foreach(aa => startReplicationStream(setup.context.system, setup.context.self, state, aa))
new running.HandlingCommands(state)
}
def startReplicationStream[E, S](
system: ActorSystem[_],
ref: ActorRef[InternalProtocol],
state: RunningState[S],
aa: ActiveActive): Unit = {
import scala.concurrent.duration._
val query = PersistenceQuery(system)
aa.allReplicas.foreach { replica =>
if (replica != aa.replicaId) {
val seqNr = state.seenPerReplica(replica)
val pid = PersistenceId.replicatedUniqueId(aa.aaContext.entityId, replica)
// FIXME support different configuration per replica https://github.com/akka/akka/issues/29257
val replication = query.readJournalFor[EventsByPersistenceIdQuery](aa.queryPluginId)
implicit val timeout = Timeout(30.seconds)
val source = RestartSource.withBackoff(2.seconds, 10.seconds, randomFactor = 0.2) { () =>
replication
.eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue)
.via(ActorFlow.ask[EventEnvelope, ReplicatedEventEnvelope[E], ReplicatedEventAck.type](ref) {
(eventEnvelope, replyTo) =>
ReplicatedEventEnvelope(eventEnvelope.event.asInstanceOf[ReplicatedEvent[E]], replyTo)
})
}
source.runWith(Sink.ignore)(SystemMaterializer(system).materializer)
}
}
}
}
// ===============================================
@ -106,8 +151,13 @@ private[akka] object Running {
_currentSequenceNumber = state.seqNr
private def alreadySeen(e: ReplicatedEvent[_]): Boolean = {
e.originSequenceNr <= state.seenPerReplica.getOrElse(e.originReplica, 0L)
}
def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match {
case IncomingCommand(c: C @unchecked) => onCommand(state, c)
case re: ReplicatedEventEnvelope[E @unchecked] => onReplicatedEvent(state, re)
case JournalResponse(r) => onDeleteEventsJournalResponse(r, state.state)
case SnapshotterResponse(r) => onDeleteSnapshotResponse(r, state.state)
case get: GetState[S @unchecked] => onGetState(get)
@ -130,12 +180,45 @@ private[akka] object Running {
else next
}
def onReplicatedEvent(
state: Running.RunningState[S],
envelope: ReplicatedEventEnvelope[E]): Behavior[InternalProtocol] = {
// FIXME set the details on the context https://github.com/akka/akka/issues/29258
setup.log.infoN(
"Replica {} received replicated event. Replica seqs nrs: {}",
setup.activeActive,
state.seenPerReplica)
envelope.ack ! ReplicatedEventAck
if (envelope.event.originReplica != setup.activeActive.get.replicaId && !alreadySeen(envelope.event)) {
setup.log.info("Saving event as first time")
handleReplicatedEventPersist(envelope.event)
} else {
setup.log.info("Filtering event as already seen")
this
}
}
// Used by EventSourcedBehaviorTestKit to retrieve the state.
def onGetState(get: GetState[S]): Behavior[InternalProtocol] = {
get.replyTo ! state.state
this
}
private def handleReplicatedEventPersist(event: ReplicatedEvent[E]): Behavior[InternalProtocol] = {
_currentSequenceNumber = state.seqNr + 1
val newState: RunningState[S] = state.applyEvent(setup, event.event)
val newState2: RunningState[S] = internalPersist(setup.context, null, newState, event, "")
val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event.event, newState2.seqNr)
// FIXME validate this is the correct sequence nr from that replica https://github.com/akka/akka/issues/29259
val updatedSeen = newState2.seenPerReplica.updated(event.originReplica, event.originSequenceNr)
persistingEvents(
newState2.copy(seenPerReplica = updatedSeen),
state,
numberOfEvents = 1,
shouldSnapshotAfterPersist,
Nil)
}
@tailrec def applyEffects(
msg: Any,
state: RunningState[S],
@ -162,8 +245,13 @@ private[akka] object Running {
val eventToPersist = adaptEvent(event)
val eventAdapterManifest = setup.eventAdapter.manifest(event)
val newState2 = internalPersist(setup.context, msg, newState, eventToPersist, eventAdapterManifest)
val newState2 = setup.activeActive match {
case Some(aa) =>
val replicatedEvent = ReplicatedEvent(eventToPersist, aa.replicaId, _currentSequenceNumber)
internalPersist(setup.context, msg, newState, replicatedEvent, eventAdapterManifest)
case None =>
internalPersist(setup.context, msg, newState, eventToPersist, eventAdapterManifest)
}
val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event, newState2.seqNr)
@ -254,6 +342,7 @@ private[akka] object Running {
msg match {
case JournalResponse(r) => onJournalResponse(r)
case in: IncomingCommand[C @unchecked] => onCommand(in)
case re: ReplicatedEventEnvelope[E @unchecked] => onReplicatedEvent(re)
case get: GetState[S @unchecked] => stashInternal(get)
case SnapshotterResponse(r) => onDeleteSnapshotResponse(r, visibleState.state)
case RecoveryTickEvent(_) => Behaviors.unhandled
@ -271,6 +360,14 @@ private[akka] object Running {
}
}
def onReplicatedEvent(event: InternalProtocol.ReplicatedEventEnvelope[E]): Behavior[InternalProtocol] = {
if (state.receivedPoisonPill) {
Behaviors.unhandled
} else {
stashInternal(event)
}
}
final def onJournalResponse(response: Response): Behavior[InternalProtocol] = {
if (setup.log.isDebugEnabled) {
setup.log.debug2(

View file

@ -0,0 +1,115 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.scaladsl
import akka.persistence.typed.PersistenceId
/**
* Utility class for comparing timestamp and data center
* identifier when implementing last-writer wins.
*/
final case class LwwTime(timestamp: Long, originDc: String) {
/**
* Create a new `LwwTime` that has a `timestamp` that is
* `max` of the given timestamp and previous timestamp + 1,
* i.e. monotonically increasing.
*/
def increase(t: Long, replicaId: String): LwwTime =
LwwTime(math.max(timestamp + 1, t), replicaId)
/**
* Compare this `LwwTime` with the `other`.
* Greatest timestamp wins. If both timestamps are
* equal the `dc` identifiers are compared and the
* one sorted first in alphanumeric order wins.
*/
def isAfter(other: LwwTime): Boolean = {
if (timestamp > other.timestamp) true
else if (timestamp < other.timestamp) false
else if (other.originDc.compareTo(originDc) > 0) true
else false
}
}
trait ActiveActiveContext {
def timestamp: Long
def origin: String
def concurrent: Boolean
def replicaId: String
def allReplicas: Set[String]
def persistenceId: PersistenceId
def recoveryRunning: Boolean
def entityId: String
def currentTimeMillis(): Long
}
// FIXME, parts of this can be set during initialisation
// Other fields will be set before executing the event handler as they change per event
// https://github.com/akka/akka/issues/29258
private[akka] class ActiveActiveContextImpl(val entityId: String, val replicaId: String, val allReplicas: Set[String])
extends ActiveActiveContext {
var _timestamp: Long = -1
var _origin: String = null
var _concurrent: Boolean = false
// FIXME check illegal access https://github.com/akka/akka/issues/29264
/**
* The timestamp of the event. Always increases per data center
* Undefined result if called from any where other than an event handler.
*/
override def timestamp: Long = _timestamp
/**
* The origin of the current event.
* Undefined result if called from anywhere other than an event handler.
*/
override def origin: String = _origin
/**
* Whether the happened concurrently with an event from another replica.
* Undefined result if called from any where other than an event handler.
*/
override def concurrent: Boolean = _concurrent
override def persistenceId: PersistenceId = PersistenceId.replicatedUniqueId(entityId, replicaId)
override def currentTimeMillis(): Long = {
// FIXME always increasing
System.currentTimeMillis()
}
override def recoveryRunning: Boolean = false
}
object ActiveActiveEventSourcing {
/**
* Initialize a replicated event sourced behavior.
*
* Events from each replica for the same entityId will be replicated to every copy.
* Care must be taken to handle events in any order as events can happen concurrently at different replicas.
*
* Using an replicated event sourced behavior means there is no longer the single writer guarantee.
*
* A different journal plugin id can be configured using withJournalPluginId after creation. Different databases
* can be used for each replica.
* The events from other replicas are read using PersistentQuery.
* TODO support a different query plugin id per replicas: https://github.com/akka/akka/issues/29257
*
* @param replicaId The unique identity for this entity. The underlying persistence id will include the replica.
* @param allReplicaIds All replica ids. These need to be known to receive events from all replicas.
* @param queryPluginId Used to read the events from other replicas. Must be the query side of your configured journal plugin.
* @return
*/
def apply[Command, Event, State](
entityId: String,
replicaId: String,
allReplicaIds: Set[String],
queryPluginId: String)(activeActiveContext: ActiveActiveContext => EventSourcedBehavior[Command, Event, State])
: EventSourcedBehavior[Command, Event, State] = {
val context = new ActiveActiveContextImpl(entityId, replicaId, allReplicaIds)
activeActiveContext(context).withActiveActive(context, replicaId, allReplicaIds, queryPluginId)
}
}

View file

@ -5,7 +5,6 @@
package akka.persistence.typed.scaladsl
import scala.annotation.tailrec
import akka.actor.typed.BackoffSupervisorStrategy
import akka.actor.typed.Behavior
import akka.actor.typed.Signal
@ -13,7 +12,7 @@ import akka.actor.typed.internal.BehaviorImpl.DeferredBehavior
import akka.actor.typed.internal.InterceptorImpl
import akka.actor.typed.internal.LoggerClass
import akka.actor.typed.scaladsl.ActorContext
import akka.annotation.DoNotInherit
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.SnapshotAdapter
@ -22,6 +21,13 @@ import akka.persistence.typed.internal._
object EventSourcedBehavior {
@InternalApi
private[akka] case class ActiveActive(
replicaId: String,
allReplicas: Set[String],
aaContext: ActiveActiveContext,
queryPluginId: String)
/**
* Type alias for the command handler function that defines how to act on commands.
*
@ -145,6 +151,12 @@ object EventSourcedBehavior {
*/
def withJournalPluginId(id: String): EventSourcedBehavior[Command, Event, State]
private[akka] def withActiveActive(
context: ActiveActiveContext,
replicaId: String,
allReplicaIds: Set[String],
queryPluginId: String): EventSourcedBehavior[Command, Event, State]
/**
* Change the snapshot store plugin id that this actor should use.
*/

View file

@ -5,7 +5,7 @@
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{ISO8601} %-5level %logger %marker - %msg MDC: {%mdc}%n</pattern>
<pattern>%date{ISO8601} [%-5level] [%logger] [%marker] [%X{akkaSource}] [%X{persistenceId}] - %msg %n</pattern>
</encoder>
</appender>

View file

@ -66,7 +66,8 @@ class EventSourcedBehaviorWatchSpec
RetentionCriteria.disabled,
holdingRecoveryPermit = false,
settings = settings,
stashState = new StashState(context.asInstanceOf[ActorContext[InternalProtocol]], settings))
stashState = new StashState(context.asInstanceOf[ActorContext[InternalProtocol]], settings),
None)
"A typed persistent parent actor watching a child" must {

View file

@ -0,0 +1,270 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.persistence.typed.aa
import java.time.Instant
import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit }
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, _ }
import akka.actor.typed.{ ActorRef, Behavior }
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.typed.scaladsl.{ ActiveActiveContext, ActiveActiveEventSourcing, Effect, EventSourcedBehavior }
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.{ Eventually, ScalaFutures }
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
object AAAuctionExampleSpec {
val config = ConfigFactory.parseString("""
akka.actor.provider = cluster
akka.loglevel = info
akka.persistence {
journal {
plugin = "akka.persistence.journal.inmem"
}
}
""")
type MoneyAmount = Int
case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originDc: String)
// commands
sealed trait AuctionCommand
case object Finish extends AuctionCommand // A timer needs to schedule this event at each replica
final case class OfferBid(bidder: String, offer: MoneyAmount) extends AuctionCommand
final case class GetHighestBid(replyTo: ActorRef[Bid]) extends AuctionCommand
final case class IsClosed(replyTo: ActorRef[Boolean]) extends AuctionCommand
private final case object Close extends AuctionCommand // Internal, should not be sent from the outside
sealed trait AuctionEvent
final case class BidRegistered(bid: Bid) extends AuctionEvent
final case class AuctionFinished(atDc: String) extends AuctionEvent
final case class WinnerDecided(atDc: String, winningBid: Bid, highestCounterOffer: MoneyAmount) extends AuctionEvent
sealed trait AuctionPhase
case object Running extends AuctionPhase
final case class Closing(finishedAtDc: Set[String]) extends AuctionPhase
case object Closed extends AuctionPhase
case class AuctionState(
phase: AuctionPhase,
highestBid: Bid,
highestCounterOffer: MoneyAmount // in ebay style auctions, we need to keep track of current highest counter offer
) {
def applyEvent(event: AuctionEvent): AuctionState =
event match {
case BidRegistered(b) =>
if (isHigherBid(b, highestBid))
withNewHighestBid(b)
else
withTooLowBid(b)
case AuctionFinished(atDc) =>
phase match {
case Running =>
copy(phase = Closing(Set(atDc)))
case Closing(alreadyFinishedDcs) =>
copy(phase = Closing(alreadyFinishedDcs + atDc))
case _ =>
this
}
case _: WinnerDecided =>
copy(phase = Closed)
}
def withNewHighestBid(bid: Bid): AuctionState = {
require(phase != Closed)
require(isHigherBid(bid, highestBid))
copy(highestBid = bid, highestCounterOffer = highestBid.offer // keep last highest bid around
)
}
def withTooLowBid(bid: Bid): AuctionState = {
require(phase != Closed)
require(isHigherBid(highestBid, bid))
copy(highestCounterOffer = highestCounterOffer.max(bid.offer)) // update highest counter offer
}
def isHigherBid(first: Bid, second: Bid): Boolean =
first.offer > second.offer ||
(first.offer == second.offer && first.timestamp.isBefore(second.timestamp)) || // if equal, first one wins
// If timestamps are equal, choose by dc where the offer was submitted
// In real auctions, this last comparison should be deterministic but unpredictable, so that submitting to a
// particular DC would not be an advantage.
(first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originDc.compareTo(
second.originDc) < 0)
}
case class AuctionSetup(
name: String,
initialBid: Bid, // the initial bid is basically the minimum price bidden at start time by the owner
closingAt: Instant,
responsibleForClosing: Boolean,
allDcs: Set[String])
def commandHandler(setup: AuctionSetup, ctx: ActorContext[AuctionCommand], aaContext: ActiveActiveContext)(
state: AuctionState,
command: AuctionCommand): Effect[AuctionEvent, AuctionState] = {
state.phase match {
case Closing(_) | Closed =>
command match {
case GetHighestBid(replyTo) =>
replyTo ! state.highestBid
Effect.none
case IsClosed(replyTo) =>
replyTo ! (state.phase == Closed)
Effect.none
case Finish =>
ctx.log.info("Finish")
Effect.persist(AuctionFinished(aaContext.replicaId))
case Close =>
ctx.log.info("Close")
require(shouldClose(setup, state))
// TODO send email (before or after persisting)
Effect.persist(WinnerDecided(aaContext.replicaId, state.highestBid, state.highestCounterOffer))
case _: OfferBid =>
// auction finished, no more bids accepted
Effect.unhandled
}
case Running =>
command match {
case OfferBid(bidder, offer) =>
Effect.persist(
BidRegistered(
Bid(bidder, offer, Instant.ofEpochMilli(aaContext.currentTimeMillis()), aaContext.replicaId)))
case GetHighestBid(replyTo) =>
replyTo ! state.highestBid
Effect.none
case Finish =>
Effect.persist(AuctionFinished(aaContext.replicaId))
case Close =>
ctx.log.warn("Premature close")
// Close should only be triggered when we have already finished
Effect.unhandled
case IsClosed(replyTo) =>
replyTo ! false
Effect.none
}
}
}
private def shouldClose(auctionSetup: AuctionSetup, state: AuctionState): Boolean = {
auctionSetup.responsibleForClosing && (state.phase match {
case Closing(alreadyFinishedAtDc) =>
val allDone = auctionSetup.allDcs.diff(alreadyFinishedAtDc).isEmpty
if (!allDone) {
println(
s"Not closing auction as not all DCs have reported finished. All DCs: ${auctionSetup.allDcs}. Reported finished ${alreadyFinishedAtDc}")
}
allDone
case _ =>
false
})
}
def eventHandler(ctx: ActorContext[AuctionCommand], aaCtx: ActiveActiveContext, setup: AuctionSetup)(
state: AuctionState,
event: AuctionEvent): AuctionState = {
val newState = state.applyEvent(event)
ctx.log.infoN("Applying event {}. New start {}", event, newState)
if (!aaCtx.recoveryRunning) {
eventTriggers(setup, ctx, aaCtx, event, newState)
}
newState
}
private def eventTriggers(
setup: AuctionSetup,
ctx: ActorContext[AuctionCommand],
aaCtx: ActiveActiveContext,
event: AuctionEvent,
newState: AuctionState) = {
event match {
case finished: AuctionFinished =>
newState.phase match {
case Closing(alreadyFinishedAtDc) =>
ctx.log.infoN(
"AuctionFinished at {}, already finished at [{}]",
finished.atDc,
alreadyFinishedAtDc.mkString(", "))
if (alreadyFinishedAtDc(aaCtx.replicaId)) {
if (shouldClose(setup, newState)) ctx.self ! Close
} else {
ctx.log.info("Sending finish to self")
ctx.self ! Finish
}
case _ => // no trigger for this state
}
case _ => // no trigger for this event
}
}
def initialState(setup: AuctionSetup) =
AuctionState(phase = Running, highestBid = setup.initialBid, highestCounterOffer = setup.initialBid.offer)
def behavior(replica: String, setup: AuctionSetup): Behavior[AuctionCommand] = Behaviors.setup[AuctionCommand] {
ctx =>
ActiveActiveEventSourcing(setup.name, replica, setup.allDcs, LeveldbReadJournal.Identifier) { aaCtx =>
EventSourcedBehavior(
aaCtx.persistenceId,
initialState(setup),
commandHandler(setup, ctx, aaCtx),
eventHandler(ctx, aaCtx, setup))
}
}
}
class AAAuctionExampleSpec
extends ScalaTestWithActorTestKit(AAAuctionExampleSpec.config)
with AnyWordSpecLike
with Matchers
with LogCapturing
with ScalaFutures
with Eventually {
import AAAuctionExampleSpec._
"Auction example" should {
"work" in {
val Replicas = Set("DC-A", "DC-B")
val setupA =
AuctionSetup(
"old-skis",
Bid("chbatey", 12, Instant.now(), "DC-A"),
Instant.now().plusSeconds(10),
responsibleForClosing = true,
Replicas)
val setupB = setupA.copy(responsibleForClosing = false)
val dcAReplica: ActorRef[AuctionCommand] = spawn(behavior("DC-A", setupA))
val dcBReplica: ActorRef[AuctionCommand] = spawn(behavior("DC-B", setupB))
dcAReplica ! OfferBid("me", 100)
dcAReplica ! OfferBid("me", 99)
dcAReplica ! OfferBid("me", 202)
eventually {
val replyProbe = createTestProbe[Bid]()
dcAReplica ! GetHighestBid(replyProbe.ref)
val bid = replyProbe.expectMessageType[Bid]
bid.offer shouldEqual 202
}
dcAReplica ! Finish
eventually {
val finishProbe = createTestProbe[Boolean]()
dcAReplica ! IsClosed(finishProbe.ref)
finishProbe.expectMessage(true)
}
eventually {
val finishProbe = createTestProbe[Boolean]()
dcBReplica ! IsClosed(finishProbe.ref)
finishProbe.expectMessage(true)
}
}
}
}

View file

@ -0,0 +1,162 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.persistence.typed.aa
import java.util.UUID
import akka.Done
import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit }
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.typed.scaladsl._
import akka.serialization.jackson.CborSerializable
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.{ Eventually, ScalaFutures }
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.{ Millis, Span }
import org.scalatest.wordspec.AnyWordSpecLike
object AABlogExampleSpec {
val config =
ConfigFactory.parseString(s"""
akka.actor.allow-java-serialization = true
// FIXME serializers for replicated event or akka persistence support for metadata: https://github.com/akka/akka/issues/29260
akka.actor.provider = cluster
akka.loglevel = debug
akka.persistence {
journal {
plugin = "akka.persistence.journal.leveldb"
leveldb {
native = off
dir = "target/journal-AABlogExampleSpec-${UUID.randomUUID()}"
}
}
}
""")
final case class BlogState(content: Option[PostContent], contentTimestamp: LwwTime, published: Boolean) {
def withContent(newContent: PostContent, timestamp: LwwTime): BlogState =
copy(content = Some(newContent), contentTimestamp = timestamp)
def isEmpty: Boolean = content.isEmpty
}
val emptyState: BlogState = BlogState(None, LwwTime(Long.MinValue, ""), published = false)
final case class PostContent(title: String, body: String)
final case class PostSummary(postId: String, title: String)
final case class Published(postId: String) extends BlogEvent
sealed trait BlogCommand
final case class AddPost(postId: String, content: PostContent, replyTo: ActorRef[AddPostDone]) extends BlogCommand
final case class AddPostDone(postId: String)
final case class GetPost(postId: String, replyTo: ActorRef[PostContent]) extends BlogCommand
final case class ChangeBody(postId: String, newContent: PostContent, replyTo: ActorRef[Done]) extends BlogCommand
final case class Publish(postId: String, replyTo: ActorRef[Done]) extends BlogCommand
sealed trait BlogEvent extends CborSerializable
final case class PostAdded(postId: String, content: PostContent, timestamp: LwwTime) extends BlogEvent
final case class BodyChanged(postId: String, newContent: PostContent, timestamp: LwwTime) extends BlogEvent
}
class AABlogExampleSpec
extends ScalaTestWithActorTestKit(AABlogExampleSpec.config)
with AnyWordSpecLike
with Matchers
with LogCapturing
with ScalaFutures
with Eventually {
import AABlogExampleSpec._
implicit val config: PatienceConfig = PatienceConfig(timeout = Span(timeout.duration.toMillis, Millis))
def behavior(aa: ActiveActiveContext, ctx: ActorContext[BlogCommand]) =
EventSourcedBehavior[BlogCommand, BlogEvent, BlogState](
aa.persistenceId,
emptyState,
(state, cmd) =>
cmd match {
case AddPost(_, content, replyTo) =>
val evt =
PostAdded(aa.persistenceId.id, content, state.contentTimestamp.increase(aa.timestamp, aa.replicaId))
Effect.persist(evt).thenRun { _ =>
replyTo ! AddPostDone(aa.entityId)
}
case ChangeBody(_, newContent, replyTo) =>
val evt =
BodyChanged(aa.persistenceId.id, newContent, state.contentTimestamp.increase(aa.timestamp, aa.replicaId))
Effect.persist(evt).thenRun { _ =>
replyTo ! Done
}
case p: Publish =>
Effect.persist(Published("id")).thenRun { _ =>
p.replyTo ! Done
}
case gp: GetPost =>
ctx.log.info("GetPost {}", state.content)
state.content.foreach(content => gp.replyTo ! content)
Effect.none
},
(state, event) => {
ctx.log.info(s"${aa.entityId}:${aa.replicaId} Received event $event")
event match {
case PostAdded(_, content, timestamp) =>
if (timestamp.isAfter(state.contentTimestamp)) {
val s = state.withContent(content, timestamp)
ctx.log.info("Updating content. New content is {}", s)
s
} else {
ctx.log.info("Ignoring event as timestamp is older")
state
}
case BodyChanged(_, newContent, timestamp) =>
if (timestamp.isAfter(state.contentTimestamp))
state.withContent(newContent, timestamp)
else state
case Published(_) =>
state.copy(published = true)
}
})
"Blog Example" should {
"work" in {
val refDcA: ActorRef[BlogCommand] =
spawn(Behaviors.setup[BlogCommand] { ctx =>
ActiveActiveEventSourcing("cat", "DC-A", Set("DC-A", "DC-B"), LeveldbReadJournal.Identifier) {
(aa: ActiveActiveContext) =>
behavior(aa, ctx)
}
}, "dc-a")
val refDcB: ActorRef[BlogCommand] =
spawn(Behaviors.setup[BlogCommand] { ctx =>
ActiveActiveEventSourcing("cat", "DC-B", Set("DC-A", "DC-B"), LeveldbReadJournal.Identifier) {
(aa: ActiveActiveContext) =>
behavior(aa, ctx)
}
}, "dc-b")
import akka.actor.typed.scaladsl.AskPattern._
import akka.util.Timeout
import scala.concurrent.duration._
implicit val timeout: Timeout = 3.seconds
val content = PostContent("cats are the bets", "yep")
val response =
refDcA.ask[AddPostDone](replyTo => AddPost("cat", content, replyTo)).futureValue
response shouldEqual AddPostDone("cat")
eventually {
refDcA.ask[PostContent](replyTo => GetPost("cat", replyTo)).futureValue shouldEqual content
}
eventually {
refDcB.ask[PostContent](replyTo => GetPost("cat", replyTo)).futureValue shouldEqual content
}
}
}
}

View file

@ -453,8 +453,9 @@ lazy val actorTyped = akkaModule("akka-actor-typed")
lazy val persistenceTyped = akkaModule("akka-persistence-typed")
.dependsOn(
actorTyped,
streamTyped,
persistence % "compile->compile;test->test",
persistenceQuery % "test",
persistenceQuery,
actorTestkitTyped % "test->test",
clusterTyped % "test->test",
actorTestkitTyped % "test->test",

View file

@ -255,7 +255,7 @@ object Dependencies {
val persistenceTestKit = l ++= Seq(Test.scalatest, Test.logback)
val persistenceShared = l ++= Seq(Provided.levelDB, Provided.levelDBNative)
val persistenceShared = l ++= Seq(Provided.levelDB, Provided.levelDBNative, Test.logback)
val jackson = l ++= Seq(
jacksonCore,