From a563a9b69e9cd46061b841bfa18c9ab2de5e297a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Wed, 22 Jul 2020 09:22:38 +0200 Subject: [PATCH] Make sure to gracefully stop replication streams when aa actor stops #29406 --- .../typed/persistence-active-active.md | 4 +- .../typed/ActiveActiveCompileOnlyTest.java | 10 ++-- .../persistence/typed/AABlogExampleSpec.scala | 9 ++-- .../akka/persistence/typed/LwwTime.scala | 33 ------------ .../persistence/typed/internal/Running.scala | 51 +++++++++++-------- 5 files changed, 41 insertions(+), 66 deletions(-) delete mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/LwwTime.scala diff --git a/akka-docs/src/main/paradox/typed/persistence-active-active.md b/akka-docs/src/main/paradox/typed/persistence-active-active.md index 0f4da28a9c..bca97075c0 100644 --- a/akka-docs/src/main/paradox/typed/persistence-active-active.md +++ b/akka-docs/src/main/paradox/typed/persistence-active-active.md @@ -141,9 +141,9 @@ Sometimes it is enough to use timestamps to decide which update should win. Such ![images/lww.png](images/lww.png) -There is a small utility class @apidoc[akka.persistence.typed.LwwTime] that can be useful for implementing last writer wins semantics. +There is a small utility class @apidoc[LwwTime] that can be useful for implementing last writer wins semantics. It contains a timestamp representing current time when the event was persisted and an identifier of the -replica that persisted it. When comparing two @apidoc[akka.persistence.typed.LwwTime] the greatest timestamp wins. The replica +replica that persisted it. When comparing two @apidoc[LwwTime] the greatest timestamp wins. The replica identifier is used if the two timestamps are equal, and then the one from the data center sorted first in alphanumeric order wins. diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ActiveActiveCompileOnlyTest.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ActiveActiveCompileOnlyTest.java index 51085a48b3..4020954f9f 100644 --- a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ActiveActiveCompileOnlyTest.java +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ActiveActiveCompileOnlyTest.java @@ -13,15 +13,14 @@ public class ActiveActiveCompileOnlyTest { // dummy for docs example interface Command {} + interface Event {} + interface State {} static // #factory final class MyActiceActiveEventSourcedBehavior - extends ActiveActiveEventSourcedBehavior< - Command, - Event, - State> { + extends ActiveActiveEventSourcedBehavior { public MyActiceActiveEventSourcedBehavior(ActiveActiveContext activeActiveContext) { super(activeActiveContext); @@ -74,7 +73,8 @@ public class ActiveActiveCompileOnlyTest { allReplicasAndQueryPlugins.put(DCA, "journalForDCA"); allReplicasAndQueryPlugins.put(DCB, "journalForDCB"); - EventSourcedBehavior behavior = ActiveActiveEventSourcing.create( + EventSourcedBehavior behavior = + ActiveActiveEventSourcing.create( "entityId", DCA, allReplicasAndQueryPlugins, diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AABlogExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AABlogExampleSpec.scala index c3d7f4d365..f67fd19226 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AABlogExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AABlogExampleSpec.scala @@ -5,19 +5,18 @@ package docs.akka.persistence.typed import akka.Done -import akka.actor.testkit.typed.scaladsl.{LogCapturing, ScalaTestWithActorTestKit} +import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit } import akka.actor.typed.ActorRef -import akka.actor.typed.scaladsl.{ActorContext, Behaviors} +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } import akka.persistence.testkit.PersistenceTestKitPlugin import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal -import akka.persistence.typed.LwwTime import akka.persistence.typed.ReplicaId import akka.persistence.typed.crdt.LwwTime import akka.persistence.typed.scaladsl._ import akka.serialization.jackson.CborSerializable -import org.scalatest.concurrent.{Eventually, ScalaFutures} +import org.scalatest.concurrent.{ Eventually, ScalaFutures } import org.scalatest.matchers.should.Matchers -import org.scalatest.time.{Millis, Span} +import org.scalatest.time.{ Millis, Span } import org.scalatest.wordspec.AnyWordSpecLike object AABlogExampleSpec { diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/LwwTime.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/LwwTime.scala deleted file mode 100644 index 1342d8000c..0000000000 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/LwwTime.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (C) 2020 Lightbend Inc. - */ - -package akka.persistence.typed - -/** - * Utility class for comparing timestamp and data center - * identifier when implementing last-writer wins. - */ -final case class LwwTime(timestamp: Long, originDc: ReplicaId) { - - /** - * Create a new `LwwTime` that has a `timestamp` that is - * `max` of the given timestamp and previous timestamp + 1, - * i.e. monotonically increasing. - */ - def increase(t: Long, replicaId: ReplicaId): LwwTime = - LwwTime(math.max(timestamp + 1, t), replicaId) - - /** - * Compare this `LwwTime` with the `other`. - * Greatest timestamp wins. If both timestamps are - * equal the `dc` identifiers are compared and the - * one sorted first in alphanumeric order wins. - */ - def isAfter(other: LwwTime): Boolean = { - if (timestamp > other.timestamp) true - else if (timestamp < other.timestamp) false - else if (other.originDc.id.compareTo(originDc.id) > 0) true - else false - } -} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index c0d83579c5..d93db654be 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -56,6 +56,8 @@ import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.EventSourcedBehavior.ActiveActive import akka.stream.scaladsl.Keep import akka.stream.SystemMaterializer +import akka.stream.WatchedActorTerminatedException +import akka.stream.scaladsl.Source import akka.stream.scaladsl.{ RestartSource, Sink } import akka.stream.typed.scaladsl.ActorFlow import akka.util.OptionVal @@ -136,27 +138,34 @@ private[akka] object Running { val controlRef = new AtomicReference[ReplicationStreamControl]() - val source = RestartSource.withBackoff(2.seconds, 10.seconds, randomFactor = 0.2) { () => - replication - .eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue) - // from each replica, only get the events that originated there, this prevents most of the event filtering - // the downside is that events can't be received via other replicas in the event of an uneven network partition - .filter(_.eventMetadata.get.asInstanceOf[ReplicatedEventMetaData].originReplica == replicaId) - .viaMat(new FastForwardingFilter)(Keep.right) - .mapMaterializedValue(streamControl => controlRef.set(streamControl)) - .via(ActorFlow.ask[EventEnvelope, ReplicatedEventEnvelope[E], ReplicatedEventAck.type](ref) { - (eventEnvelope, replyTo) => - // Need to handle this not being available migration from non-active-active is supported - val meta = eventEnvelope.eventMetadata.get.asInstanceOf[ReplicatedEventMetaData] - val re = - ReplicatedEvent[E]( - eventEnvelope.event.asInstanceOf[E], - meta.originReplica, - meta.originSequenceNr, - meta.version) - ReplicatedEventEnvelope(re, replyTo) - }) - } + val source = RestartSource + .withBackoff(2.seconds, 10.seconds, randomFactor = 0.2) { () => + replication + .eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue) + // from each replica, only get the events that originated there, this prevents most of the event filtering + // the downside is that events can't be received via other replicas in the event of an uneven network partition + .filter(_.eventMetadata.get.asInstanceOf[ReplicatedEventMetaData].originReplica == replicaId) + .viaMat(new FastForwardingFilter)(Keep.right) + .mapMaterializedValue(streamControl => controlRef.set(streamControl)) + } + // needs to be outside of the restart source so that it actually cancels when terminating the replica + .via(ActorFlow + .ask[EventEnvelope, ReplicatedEventEnvelope[E], ReplicatedEventAck.type](ref) { (eventEnvelope, replyTo) => + // Need to handle this not being available migration from non-active-active is supported + val meta = eventEnvelope.eventMetadata.get.asInstanceOf[ReplicatedEventMetaData] + val re = + ReplicatedEvent[E]( + eventEnvelope.event.asInstanceOf[E], + meta.originReplica, + meta.originSequenceNr, + meta.version) + ReplicatedEventEnvelope(re, replyTo) + } + .recoverWithRetries(1, { + // not a failure, the replica is stopping, complete the stream + case _: WatchedActorTerminatedException => + Source.empty + })) source.runWith(Sink.ignore)(SystemMaterializer(system).materializer)