Make sure to gracefully stop replication streams when aa actor stops #29406

This commit is contained in:
Johan Andrén 2020-07-22 09:22:38 +02:00 committed by Christopher Batey
parent 9830988566
commit a563a9b69e
5 changed files with 41 additions and 66 deletions

View file

@ -141,9 +141,9 @@ Sometimes it is enough to use timestamps to decide which update should win. Such
![images/lww.png](images/lww.png)
There is a small utility class @apidoc[akka.persistence.typed.LwwTime] that can be useful for implementing last writer wins semantics.
There is a small utility class @apidoc[LwwTime] that can be useful for implementing last writer wins semantics.
It contains a timestamp representing current time when the event was persisted and an identifier of the
replica that persisted it. When comparing two @apidoc[akka.persistence.typed.LwwTime] the greatest timestamp wins. The replica
replica that persisted it. When comparing two @apidoc[LwwTime] the greatest timestamp wins. The replica
identifier is used if the two timestamps are equal, and then the one from the data center sorted first in
alphanumeric order wins.

View file

@ -13,15 +13,14 @@ public class ActiveActiveCompileOnlyTest {
// dummy for docs example
interface Command {}
interface Event {}
interface State {}
static // #factory
final class MyActiceActiveEventSourcedBehavior
extends ActiveActiveEventSourcedBehavior<
Command,
Event,
State> {
extends ActiveActiveEventSourcedBehavior<Command, Event, State> {
public MyActiceActiveEventSourcedBehavior(ActiveActiveContext activeActiveContext) {
super(activeActiveContext);
@ -74,7 +73,8 @@ public class ActiveActiveCompileOnlyTest {
allReplicasAndQueryPlugins.put(DCA, "journalForDCA");
allReplicasAndQueryPlugins.put(DCB, "journalForDCB");
EventSourcedBehavior<Command, Event, State> behavior = ActiveActiveEventSourcing.create(
EventSourcedBehavior<Command, Event, State> behavior =
ActiveActiveEventSourcing.create(
"entityId",
DCA,
allReplicasAndQueryPlugins,

View file

@ -5,19 +5,18 @@
package docs.akka.persistence.typed
import akka.Done
import akka.actor.testkit.typed.scaladsl.{LogCapturing, ScalaTestWithActorTestKit}
import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit }
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.LwwTime
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.crdt.LwwTime
import akka.persistence.typed.scaladsl._
import akka.serialization.jackson.CborSerializable
import org.scalatest.concurrent.{Eventually, ScalaFutures}
import org.scalatest.concurrent.{ Eventually, ScalaFutures }
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.{Millis, Span}
import org.scalatest.time.{ Millis, Span }
import org.scalatest.wordspec.AnyWordSpecLike
object AABlogExampleSpec {

View file

@ -1,33 +0,0 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed
/**
* Utility class for comparing timestamp and data center
* identifier when implementing last-writer wins.
*/
final case class LwwTime(timestamp: Long, originDc: ReplicaId) {
/**
* Create a new `LwwTime` that has a `timestamp` that is
* `max` of the given timestamp and previous timestamp + 1,
* i.e. monotonically increasing.
*/
def increase(t: Long, replicaId: ReplicaId): LwwTime =
LwwTime(math.max(timestamp + 1, t), replicaId)
/**
* Compare this `LwwTime` with the `other`.
* Greatest timestamp wins. If both timestamps are
* equal the `dc` identifiers are compared and the
* one sorted first in alphanumeric order wins.
*/
def isAfter(other: LwwTime): Boolean = {
if (timestamp > other.timestamp) true
else if (timestamp < other.timestamp) false
else if (other.originDc.id.compareTo(originDc.id) > 0) true
else false
}
}

View file

@ -56,6 +56,8 @@ import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior.ActiveActive
import akka.stream.scaladsl.Keep
import akka.stream.SystemMaterializer
import akka.stream.WatchedActorTerminatedException
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.{ RestartSource, Sink }
import akka.stream.typed.scaladsl.ActorFlow
import akka.util.OptionVal
@ -136,27 +138,34 @@ private[akka] object Running {
val controlRef = new AtomicReference[ReplicationStreamControl]()
val source = RestartSource.withBackoff(2.seconds, 10.seconds, randomFactor = 0.2) { () =>
replication
.eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue)
// from each replica, only get the events that originated there, this prevents most of the event filtering
// the downside is that events can't be received via other replicas in the event of an uneven network partition
.filter(_.eventMetadata.get.asInstanceOf[ReplicatedEventMetaData].originReplica == replicaId)
.viaMat(new FastForwardingFilter)(Keep.right)
.mapMaterializedValue(streamControl => controlRef.set(streamControl))
.via(ActorFlow.ask[EventEnvelope, ReplicatedEventEnvelope[E], ReplicatedEventAck.type](ref) {
(eventEnvelope, replyTo) =>
// Need to handle this not being available migration from non-active-active is supported
val meta = eventEnvelope.eventMetadata.get.asInstanceOf[ReplicatedEventMetaData]
val re =
ReplicatedEvent[E](
eventEnvelope.event.asInstanceOf[E],
meta.originReplica,
meta.originSequenceNr,
meta.version)
ReplicatedEventEnvelope(re, replyTo)
})
}
val source = RestartSource
.withBackoff(2.seconds, 10.seconds, randomFactor = 0.2) { () =>
replication
.eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue)
// from each replica, only get the events that originated there, this prevents most of the event filtering
// the downside is that events can't be received via other replicas in the event of an uneven network partition
.filter(_.eventMetadata.get.asInstanceOf[ReplicatedEventMetaData].originReplica == replicaId)
.viaMat(new FastForwardingFilter)(Keep.right)
.mapMaterializedValue(streamControl => controlRef.set(streamControl))
}
// needs to be outside of the restart source so that it actually cancels when terminating the replica
.via(ActorFlow
.ask[EventEnvelope, ReplicatedEventEnvelope[E], ReplicatedEventAck.type](ref) { (eventEnvelope, replyTo) =>
// Need to handle this not being available migration from non-active-active is supported
val meta = eventEnvelope.eventMetadata.get.asInstanceOf[ReplicatedEventMetaData]
val re =
ReplicatedEvent[E](
eventEnvelope.event.asInstanceOf[E],
meta.originReplica,
meta.originSequenceNr,
meta.version)
ReplicatedEventEnvelope(re, replyTo)
}
.recoverWithRetries(1, {
// not a failure, the replica is stopping, complete the stream
case _: WatchedActorTerminatedException =>
Source.empty
}))
source.runWith(Sink.ignore)(SystemMaterializer(system).materializer)