diff --git a/akka-actor/src/main/scala/akka/util/AlwaysIncreasingClock.scala b/akka-actor/src/main/scala/akka/util/AlwaysIncreasingClock.scala new file mode 100644 index 0000000000..8bc84f8261 --- /dev/null +++ b/akka-actor/src/main/scala/akka/util/AlwaysIncreasingClock.scala @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.util + +import java.util.concurrent.atomic.AtomicLong +import java.util.function.LongUnaryOperator + +import akka.annotation.InternalApi + +/** + * Always increasing wall clock time. + */ +@InternalApi +private[akka] final class AlwaysIncreasingClock() extends AtomicLong with WallClock { + + override def currentTimeMillis(): Long = { + val currentSystemTime = System.currentTimeMillis() + updateAndGet { + new LongUnaryOperator { + override def applyAsLong(time: Long): Long = { + if (currentSystemTime <= time) time + 1 + else currentSystemTime + } + } + } + } +} diff --git a/akka-actor/src/main/scala/akka/util/WallClock.scala b/akka-actor/src/main/scala/akka/util/WallClock.scala new file mode 100644 index 0000000000..1bda0e8485 --- /dev/null +++ b/akka-actor/src/main/scala/akka/util/WallClock.scala @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.util + +import akka.annotation.ApiMayChange + +/** + * A time source. + */ +@ApiMayChange +trait WallClock { + def currentTimeMillis(): Long +} + +object WallClock { + + /** + * Always increasing time source. + */ + val AlwaysIncreasingClock: WallClock = new AlwaysIncreasingClock() +} diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala index 7285a43b8f..ac9ed891b4 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala @@ -79,7 +79,7 @@ final class EventEnvelope( override def equals(obj: Any): Boolean = obj match { case other: EventEnvelope => offset == other.offset && persistenceId == other.persistenceId && sequenceNr == other.sequenceNr && - event == other.event // timestamp not included in equals for backwards compatibility + event == other.event // timestamp && metadata not included in equals for backwards compatibility case _ => false } diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/TestActor.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/TestActor.scala index 693ee109a3..1befbc27fe 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/TestActor.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/TestActor.scala @@ -6,16 +6,12 @@ package akka.persistence.query.journal.leveldb import akka.actor.Props import akka.persistence.PersistentActor -import akka.persistence.journal.EventWithMetaData -import akka.persistence.query.journal.leveldb.TestActor.WithMeta object TestActor { def props(persistenceId: String): Props = Props(new TestActor(persistenceId)) case class DeleteCmd(toSeqNr: Long = Long.MaxValue) - - case class WithMeta(payload: String, meta: Any) } class TestActor(override val persistenceId: String) extends PersistentActor { @@ -30,10 +26,6 @@ class TestActor(override val persistenceId: String) extends PersistentActor { case DeleteCmd(toSeqNr) => deleteMessages(toSeqNr) sender() ! s"$toSeqNr-deleted" - case WithMeta(payload, meta) => - persist(EventWithMetaData(payload, meta)) { _ => - sender() ! s"$payload-done" - } case cmd: String => persist(cmd) { evt => diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/CapabilityFlags.scala b/akka-persistence-tck/src/main/scala/akka/persistence/CapabilityFlags.scala index 24f5d91119..7434737402 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/CapabilityFlags.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/CapabilityFlags.scala @@ -52,6 +52,12 @@ trait JournalCapabilityFlags extends CapabilityFlags { */ protected def supportsSerialization: CapabilityFlag + /** + * When `true` enables tests which check if the Journal stores and returns + * metadata for an event + */ + protected def supportsMetadata: CapabilityFlag + } //#journal-flags diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala index 854b7faa70..1d6eaed53b 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala @@ -54,6 +54,8 @@ abstract class JournalSpec(config: Config) override protected def supportsSerialization: CapabilityFlag = true + override protected def supportsMetadata: CapabilityFlag = false + override protected def beforeEach(): Unit = { super.beforeEach() senderProbe = TestProbe() @@ -79,7 +81,7 @@ abstract class JournalSpec(config: Config) extension.journalFor(null) def replayedMessage(snr: Long, deleted: Boolean = false): ReplayedMessage = - ReplayedMessage(PersistentImpl(s"a-${snr}", snr, pid, "", deleted, Actor.noSender, writerUuid, 0L)) + ReplayedMessage(PersistentImpl(s"a-${snr}", snr, pid, "", deleted, Actor.noSender, writerUuid, 0L, None)) def writeMessages(fromSnr: Int, toSnr: Int, pid: String, sender: ActorRef, writerUuid: String): Unit = { @@ -112,7 +114,7 @@ abstract class JournalSpec(config: Config) probe.expectMsg(WriteMessagesSuccessful) (fromSnr to toSnr).foreach { i => probe.expectMsgPF() { - case WriteMessageSuccess(PersistentImpl(payload, `i`, `pid`, _, _, `sender`, `writerUuid`, _), _) => + case WriteMessageSuccess(PersistentImpl(payload, `i`, `pid`, _, _, `sender`, `writerUuid`, _, _), _) => payload should be(s"a-${i}") } } @@ -263,15 +265,15 @@ abstract class JournalSpec(config: Config) val Pid = pid val WriterUuid = writerUuid probe.expectMsgPF() { - case WriteMessageSuccess(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid, _), _) => + case WriteMessageSuccess(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid, _, _), _) => payload should be(s"b-6") } probe.expectMsgPF() { - case WriteMessageRejected(PersistentImpl(payload, 7L, Pid, _, _, Actor.noSender, WriterUuid, _), _, _) => + case WriteMessageRejected(PersistentImpl(payload, 7L, Pid, _, _, Actor.noSender, WriterUuid, _, _), _, _) => payload should be(notSerializableEvent) } probe.expectMsgPF() { - case WriteMessageSuccess(PersistentImpl(payload, 8L, Pid, _, _, Actor.noSender, WriterUuid, _), _) => + case WriteMessageSuccess(PersistentImpl(payload, 8L, Pid, _, _, Actor.noSender, WriterUuid, _, _), _) => payload should be(s"b-8") } } @@ -296,13 +298,13 @@ abstract class JournalSpec(config: Config) val Pid = pid val WriterUuid = writerUuid probe.expectMsgPF() { - case WriteMessageSuccess(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid, _), _) => + case WriteMessageSuccess(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid, _, _), _) => payload should be(event) } journal ! ReplayMessages(6, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref) receiverProbe.expectMsgPF() { - case ReplayedMessage(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid, _)) => + case ReplayedMessage(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid, _, _)) => payload should be(event) } receiverProbe.expectMsgPF() { @@ -310,5 +312,34 @@ abstract class JournalSpec(config: Config) } } } + + optional(flag = supportsMetadata) { + + "return metadata" in { + val probe = TestProbe() + val event = TestPayload(probe.ref) + val meta = "meta-data" + val aw = + AtomicWrite( + PersistentRepr( + payload = event, + sequenceNr = 1L, + persistenceId = pid, + sender = Actor.noSender, + writerUuid = writerUuid).withMetadata(meta)) + + journal ! WriteMessages(List(aw), probe.ref, actorInstanceId) + probe.expectMsg(WriteMessagesSuccessful) + + val Pid = pid + val WriterUuid = writerUuid + probe.expectMsgPF() { + case WriteMessageSuccess( + PersistentImpl(payload, 1L, Pid, _, _, Actor.noSender, WriterUuid, _, Some(`meta`)), + _) => + payload should be(event) + } + } + } } } diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/EventStorage.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/EventStorage.scala index ff458085c7..14f8c90a46 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/EventStorage.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/EventStorage.scala @@ -10,7 +10,6 @@ import scala.collection.immutable import scala.util.{ Failure, Success, Try } import akka.annotation.InternalApi import akka.persistence.PersistentRepr -import akka.persistence.testkit.EventStorage.{ JournalPolicies, Metadata } import akka.persistence.testkit.ProcessingPolicy.DefaultPolicies import akka.persistence.testkit.internal.TestKitStorage import akka.util.ccompat.JavaConverters._ @@ -19,7 +18,7 @@ import akka.util.ccompat.JavaConverters._ * INTERNAL API */ @InternalApi -private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, (PersistentRepr, Metadata)] { +private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, PersistentRepr] { import EventStorage._ @@ -31,10 +30,10 @@ private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, (Pe // and therefore must be done at the same time with the update, not before updateOrSetNew(key, v => v ++ mapAny(key, elems).toVector) - override def reprToSeqNum(repr: (PersistentRepr, Metadata)): Long = repr._1.sequenceNr + override def reprToSeqNum(repr: (PersistentRepr)): Long = repr.sequenceNr - def add(elems: immutable.Seq[(PersistentRepr, Metadata)]): Unit = - elems.groupBy(_._1.persistenceId).foreach { gr => + def add(elems: immutable.Seq[PersistentRepr]): Unit = + elems.groupBy(_.persistenceId).foreach { gr => add(gr._1, gr._2) } @@ -43,11 +42,11 @@ private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, (Pe /** * @throws Exception from StorageFailure in the current writing policy */ - def tryAdd(elems: immutable.Seq[(PersistentRepr, Metadata)]): Try[Unit] = { - val grouped = elems.groupBy(_._1.persistenceId) + def tryAdd(elems: immutable.Seq[PersistentRepr]): Try[Unit] = { + val grouped = elems.groupBy(_.persistenceId) val processed = grouped.map { - case (pid, els) => currentPolicy.tryProcess(pid, WriteEvents(els.map(_._1.payload))) + case (pid, els) => currentPolicy.tryProcess(pid, WriteEvents(els.map(_.payload))) } val reduced: ProcessingResult = @@ -73,8 +72,8 @@ private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, (Pe persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, - max: Long): immutable.Seq[(PersistentRepr, Metadata)] = { - val batch: immutable.Seq[(PersistentRepr, Metadata)] = read(persistenceId, fromSequenceNr, toSequenceNr, max) + max: Long): immutable.Seq[PersistentRepr] = { + val batch = read(persistenceId, fromSequenceNr, toSequenceNr, max) currentPolicy.tryProcess(persistenceId, ReadEvents(batch)) match { case ProcessingSuccess => batch case Reject(ex) => throw ex @@ -98,35 +97,15 @@ private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, (Pe } } - private def mapAny(key: String, elems: immutable.Seq[Any]): immutable.Seq[(PersistentRepr, Metadata)] = { + private def mapAny(key: String, elems: immutable.Seq[Any]): immutable.Seq[PersistentRepr] = { val sn = getHighestSeqNumber(key) + 1 - elems.zipWithIndex.map(p => (PersistentRepr(p._1, p._2 + sn, key), NoMetadata)) + elems.zipWithIndex.map(p => PersistentRepr(p._1, p._2 + sn, key)) } } object EventStorage { - object JournalPolicies extends DefaultPolicies[JournalOperation] - - /** - * INTERNAL API - */ - @InternalApi - private[testkit] sealed trait Metadata - - /** - * INTERNAL API - */ - @InternalApi - private[testkit] case object NoMetadata extends Metadata - - /** - * INTERNAL API - */ - @InternalApi - private[testkit] final case class WithMetadata(payload: Any) extends Metadata - } /** diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala index 7d8e899b89..77dcd86a25 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala @@ -10,9 +10,8 @@ import scala.util.Try import com.typesafe.config.{ Config, ConfigFactory } import akka.annotation.InternalApi import akka.persistence._ -import akka.persistence.journal.{ AsyncWriteJournal, EventWithMetaData, Tagged } +import akka.persistence.journal.{ AsyncWriteJournal, Tagged } import akka.persistence.snapshot.SnapshotStore -import akka.persistence.testkit.EventStorage.{ NoMetadata, WithMetadata } import akka.persistence.testkit.internal.{ InMemStorageExtension, SnapshotStorageEmulatorExtension } /** @@ -26,32 +25,30 @@ class PersistenceTestKitPlugin extends AsyncWriteJournal { private final val storage = InMemStorageExtension(context.system) private val eventStream = context.system.eventStream - override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = + override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = { Future.fromTry(Try(messages.map(aw => { val data = aw.payload.map(pl => pl.payload match { - // TODO define how to handle tagged and metadata - case Tagged(p, _) => (pl.withPayload(p).withTimestamp(System.currentTimeMillis()), NoMetadata) - case evt: EventWithMetaData => - (pl.withPayload(evt.event).withTimestamp(System.currentTimeMillis()), WithMetadata(evt.metaData)) - case _ => (pl.withTimestamp(System.currentTimeMillis()), NoMetadata) + case Tagged(p, _) => pl.withPayload(p).withTimestamp(System.currentTimeMillis()) + case _ => pl.withTimestamp(System.currentTimeMillis()) }) val result: Try[Unit] = storage.tryAdd(data) result.foreach { _ => - messages.foreach(aw => - eventStream.publish(PersistenceTestKitPlugin.Write(aw.persistenceId, aw.highestSequenceNr))) + messages.foreach { aw => + eventStream.publish(PersistenceTestKitPlugin.Write(aw.persistenceId, aw.highestSequenceNr)) + } } result }))) + } override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = Future.fromTry(Try(storage.tryDelete(persistenceId, toSequenceNr))) override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)( recoveryCallback: PersistentRepr => Unit): Future[Unit] = - Future.fromTry( - Try(storage.tryRead(persistenceId, fromSequenceNr, toSequenceNr, max).map(_._1).foreach(recoveryCallback))) + Future.fromTry(Try(storage.tryRead(persistenceId, fromSequenceNr, toSequenceNr, max).foreach(recoveryCallback))) override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = Future.fromTry(Try { diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SerializedEventStorageImpl.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SerializedEventStorageImpl.scala index dd6242ee95..895874a423 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SerializedEventStorageImpl.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SerializedEventStorageImpl.scala @@ -8,7 +8,6 @@ import akka.actor.{ ActorSystem, ExtendedActorSystem } import akka.annotation.InternalApi import akka.persistence.PersistentRepr import akka.persistence.testkit.EventStorage -import akka.persistence.testkit.EventStorage.Metadata import akka.persistence.testkit.internal.SerializedEventStorageImpl.Serialized import akka.serialization.{ Serialization, SerializationExtension, Serializers } @@ -21,7 +20,7 @@ private[testkit] object SerializedEventStorageImpl { payloadSerManifest: String, writerUuid: String, payload: Array[Byte], - metadata: Metadata) + metadata: Option[Any]) } /** @@ -37,23 +36,28 @@ private[testkit] class SerializedEventStorageImpl(system: ActorSystem) extends E /** * @return (serializer id, serialized bytes) */ - override def toInternal(repr: (PersistentRepr, Metadata)): Serialized = + override def toInternal(pr: PersistentRepr): Serialized = Serialization.withTransportInformation(system.asInstanceOf[ExtendedActorSystem]) { () => - val (pr, meta) = repr val payload = pr.payload.asInstanceOf[AnyRef] val s = serialization.findSerializerFor(payload) val manifest = Serializers.manifestFor(s, payload) - Serialized(pr.persistenceId, pr.sequenceNr, s.identifier, manifest, pr.writerUuid, s.toBinary(payload), meta) + Serialized( + pr.persistenceId, + pr.sequenceNr, + s.identifier, + manifest, + pr.writerUuid, + s.toBinary(payload), + pr.metadata) } /** * @param internal (serializer id, serialized bytes) */ - override def toRepr(internal: Serialized): (PersistentRepr, Metadata) = { + override def toRepr(internal: Serialized): PersistentRepr = { val event = serialization.deserialize(internal.payload, internal.payloadSerId, internal.payloadSerManifest).get - ( - PersistentRepr(event, internal.sequenceNr, internal.persistenceId, writerUuid = internal.writerUuid), - internal.metadata) + val pr = PersistentRepr(event, internal.sequenceNr, internal.persistenceId, writerUuid = internal.writerUuid) + internal.metadata.fold(pr)(meta => pr.withMetadata(meta)) } } diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SimpleEventStorageImpl.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SimpleEventStorageImpl.scala index 25b63c8042..712c311498 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SimpleEventStorageImpl.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SimpleEventStorageImpl.scala @@ -7,7 +7,6 @@ package akka.persistence.testkit.internal import akka.annotation.InternalApi import akka.persistence._ import akka.persistence.testkit.EventStorage -import akka.persistence.testkit.EventStorage.Metadata /** * INTERNAL API @@ -15,10 +14,10 @@ import akka.persistence.testkit.EventStorage.Metadata @InternalApi private[testkit] class SimpleEventStorageImpl extends EventStorage { - override type InternalRepr = (PersistentRepr, Metadata) + override type InternalRepr = PersistentRepr - override def toInternal(repr: (PersistentRepr, Metadata)): (PersistentRepr, Metadata) = repr + override def toInternal(repr: PersistentRepr): PersistentRepr = repr - override def toRepr(internal: (PersistentRepr, Metadata)): (PersistentRepr, Metadata) = internal + override def toRepr(internal: PersistentRepr): PersistentRepr = internal } diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/internal/EventsByPersistenceIdStage.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/internal/EventsByPersistenceIdStage.scala index af115ae4dc..1f2bc623d0 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/internal/EventsByPersistenceIdStage.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/internal/EventsByPersistenceIdStage.scala @@ -7,7 +7,6 @@ import akka.actor.ActorRef import akka.annotation.InternalApi import akka.persistence.query.{ EventEnvelope, Sequence } import akka.persistence.testkit.{ EventStorage, PersistenceTestKitPlugin } -import akka.persistence.testkit.EventStorage.{ NoMetadata, WithMetadata } import akka.stream.{ Attributes, Outlet, SourceShape } import akka.stream.stage.{ GraphStage, GraphStageLogic, GraphStageLogicWithLogging, OutHandler } @@ -37,7 +36,6 @@ final private[akka] class EventsByPersistenceIdStage( val (_, msg) = in msg match { case PersistenceTestKitPlugin.Write(pid, toSequenceNr) if pid == persistenceId => - log.debug("Write notification {} {}", pid, toSequenceNr) if (toSequenceNr >= currentSequenceNr) { tryPush() } @@ -50,7 +48,7 @@ final private[akka] class EventsByPersistenceIdStage( val event = storage.tryRead(persistenceId, currentSequenceNr, currentSequenceNr, 1) log.debug("tryPush available. Query for {} {} result {}", currentSequenceNr, currentSequenceNr, event) event.headOption match { - case Some((pr, meta)) => + case Some(pr) => push( out, EventEnvelope( @@ -59,10 +57,7 @@ final private[akka] class EventsByPersistenceIdStage( pr.sequenceNr, pr.payload, pr.timestamp, - meta match { - case NoMetadata => None - case WithMetadata(m) => Some(m) - })) + pr.metadata)) if (currentSequenceNr == toSequenceNr) { completeStage() } else { diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala index fc52243c67..ebc50c233a 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala @@ -8,7 +8,6 @@ import akka.actor.ExtendedActorSystem import akka.persistence.query.{ EventEnvelope, Sequence } import akka.persistence.query.scaladsl.{ CurrentEventsByPersistenceIdQuery, EventsByPersistenceIdQuery, ReadJournal } import akka.persistence.testkit.EventStorage -import akka.persistence.testkit.EventStorage.{ NoMetadata, WithMetadata } import akka.persistence.testkit.internal.InMemStorageExtension import akka.persistence.testkit.query.internal.EventsByPersistenceIdStage import akka.stream.scaladsl.Source @@ -22,7 +21,7 @@ final class PersistenceTestKitReadJournal(system: ExtendedActorSystem) with EventsByPersistenceIdQuery with CurrentEventsByPersistenceIdQuery { - private final val storage: EventStorage = InMemStorageExtension(system) + private val storage: EventStorage = InMemStorageExtension(system) override def eventsByPersistenceId( persistenceId: String, @@ -35,12 +34,8 @@ final class PersistenceTestKitReadJournal(system: ExtendedActorSystem) persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long): Source[EventEnvelope, NotUsed] = { - Source(storage.tryRead(persistenceId, fromSequenceNr, toSequenceNr, Long.MaxValue)).map { - case (pr, meta) => - EventEnvelope(Sequence(pr.sequenceNr), persistenceId, pr.sequenceNr, pr.payload, pr.timestamp, meta match { - case NoMetadata => None - case WithMetadata(payload) => Some(payload) - }) + Source(storage.tryRead(persistenceId, fromSequenceNr, toSequenceNr, Long.MaxValue)).map { pr => + EventEnvelope(Sequence(pr.sequenceNr), persistenceId, pr.sequenceNr, pr.payload, pr.timestamp, pr.metadata) } } } diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala index 4a959adfc1..1aa3910860 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala @@ -18,7 +18,6 @@ import akka.annotation.ApiMayChange import akka.persistence.Persistence import akka.persistence.PersistentRepr import akka.persistence.SnapshotMetadata -import akka.persistence.testkit.EventStorage.Metadata import akka.persistence.testkit._ import akka.persistence.testkit.internal.InMemStorageExtension import akka.persistence.testkit.internal.SnapshotStorageEmulatorExtension @@ -423,9 +422,9 @@ object SnapshotTestKit { */ @ApiMayChange class PersistenceTestKit(system: ActorSystem) - extends PersistenceTestKitOps[(PersistentRepr, Metadata), JournalOperation] - with ExpectOps[(PersistentRepr, Metadata)] - with HasStorage[JournalOperation, (PersistentRepr, Metadata)] { + extends PersistenceTestKitOps[PersistentRepr, JournalOperation] + with ExpectOps[PersistentRepr] + with HasStorage[JournalOperation, PersistentRepr] { require( Try(Persistence(system).journalFor(PersistenceTestKitPlugin.PluginId)).isSuccess, "The test persistence plugin is not configured.") @@ -494,7 +493,7 @@ class PersistenceTestKit(system: ActorSystem) def persistedInStorage(persistenceId: String): immutable.Seq[Any] = storage.read(persistenceId).getOrElse(List.empty).map(reprToAny) - override private[testkit] def reprToAny(repr: (PersistentRepr, Metadata)): Any = repr._1.payload + override private[testkit] def reprToAny(repr: PersistentRepr): Any = repr.payload } @ApiMayChange diff --git a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/query/EventsByPersistenceIdSpec.scala b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/query/EventsByPersistenceIdSpec.scala index 144a98108c..a642a79788 100644 --- a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/query/EventsByPersistenceIdSpec.scala +++ b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/query/EventsByPersistenceIdSpec.scala @@ -4,16 +4,14 @@ package akka.persistence.testkit.query -import akka.{ Done, NotUsed } +import akka.Done import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit } import akka.actor.typed.ActorRef -import akka.persistence.journal.EventWithMetaData import akka.persistence.query.{ EventEnvelope, PersistenceQuery } import akka.persistence.testkit.PersistenceTestKitPlugin import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal -import akka.persistence.typed.{ EventAdapter, EventSeq, PersistenceId } +import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior } -import akka.stream.scaladsl.Source import akka.stream.testkit.scaladsl.TestSink import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike @@ -40,20 +38,7 @@ object EventsByPersistenceIdSpec { command.ack ! Done }, (state, _) => state) - }.eventAdapter(new EventAdapter[String, Any] { - override def toJournal(e: String): Any = { - if (e.startsWith("m")) { - EventWithMetaData(e, s"$e-meta") - } else { - e - } - } - override def manifest(event: String): String = "" - override def fromJournal(p: Any, manifest: String): EventSeq[String] = p match { - case e: EventWithMetaData => EventSeq.single(e.event.toString) - case _ => EventSeq.single(p.toString) - } - }) + } } @@ -148,22 +133,5 @@ class EventsByPersistenceIdSpec probe.cancel() } - - "return metadata in queries" in { - val ackProbe = createTestProbe[Done]() - val ref = setupEmpty("with-meta") - ref ! Command("m-1", ackProbe.ref) - ref ! Command("m-2", ackProbe.ref) - val src: Source[EventEnvelope, NotUsed] = queries.eventsByPersistenceId("with-meta", 0L, Long.MaxValue) - val probe = - src.runWith(TestSink.probe[Any]).request(3) - probe.expectNextPF { - case e @ EventEnvelope(_, "with-meta", 1L, "m-1") if e.eventMetadata.contains("m-1-meta") => - } - - probe.expectNextPF { - case e @ EventEnvelope(_, "with-meta", 2L, "m-2") if e.eventMetadata.contains("m-2-meta") => - } - } } } diff --git a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/PersistenceTestKitJournalCompatSpec.scala b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/PersistenceTestKitJournalCompatSpec.scala index c2ea678c62..30e2996fab 100644 --- a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/PersistenceTestKitJournalCompatSpec.scala +++ b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/PersistenceTestKitJournalCompatSpec.scala @@ -37,6 +37,7 @@ class PersistenceTestKitJournalCompatSpec extends JournalSpec(config = Persisten } override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = true + override protected def supportsMetadata: CapabilityFlag = true } class PersistenceTestKitSnapshotStoreCompatSpec diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveSpec.scala new file mode 100644 index 0000000000..bb2ee63290 --- /dev/null +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/ActiveActiveSpec.scala @@ -0,0 +1,148 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.typed + +import java.util.concurrent.atomic.AtomicInteger + +import akka.Done +import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit } +import akka.actor.typed.{ ActorRef, Behavior } +import akka.persistence.testkit.PersistenceTestKitPlugin +import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal +import akka.persistence.typed.scaladsl.{ ActiveActiveEventSourcing, Effect, EventSourcedBehavior } +import org.scalatest.concurrent.Eventually +import org.scalatest.wordspec.AnyWordSpecLike + +object ActiveActiveSpec { + + val AllReplicas = Set("R1", "R2", "R3") + + sealed trait Command + case class GetState(replyTo: ActorRef[State]) extends Command + case class StoreMe(description: String, replyTo: ActorRef[Done]) extends Command + case class GetReplica(replyTo: ActorRef[(String, Set[String])]) extends Command + + case class State(all: List[String]) + def testBehavior(entityId: String, replicaId: String, probe: ActorRef[EventAndContext]): Behavior[Command] = + testBehavior(entityId, replicaId, Some(probe)) + + def testBehavior( + entityId: String, + replicaId: String, + probe: Option[ActorRef[EventAndContext]] = None): Behavior[Command] = + ActiveActiveEventSourcing(entityId, replicaId, AllReplicas, PersistenceTestKitReadJournal.Identifier)( + aaContext => + EventSourcedBehavior[Command, String, State]( + aaContext.persistenceId, + State(Nil), + (state, command) => + command match { + case GetState(replyTo) => + replyTo ! state + Effect.none + case GetReplica(replyTo) => + replyTo.tell((aaContext.replicaId, aaContext.allReplicas)) + Effect.none + case StoreMe(evt, ack) => + Effect.persist(evt).thenRun(_ => ack ! Done) + }, + (state, event) => { + probe.foreach(_ ! EventAndContext(event, aaContext.origin, aaContext.recoveryRunning)) + state.copy(all = event :: state.all) + })) + +} + +case class EventAndContext(event: Any, origin: String, recoveryRunning: Boolean = false) + +class ActiveActiveSpec + extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config) + with AnyWordSpecLike + with LogCapturing + with Eventually { + import ActiveActiveSpec._ + val ids = new AtomicInteger(0) + def nextEntityId = s"e-${ids.getAndIncrement()}" + "ActiveActiveEventSourcing" should { + "replicate events between entities" in { + val entityId = nextEntityId + val probe = createTestProbe[Done]() + val r1 = spawn(testBehavior(entityId, "R1")) + val r2 = spawn(testBehavior(entityId, "R2")) + r1 ! StoreMe("from r1", probe.ref) + r2 ! StoreMe("from r2", probe.ref) + eventually { + val probe = createTestProbe[State]() + r1 ! GetState(probe.ref) + probe.expectMessageType[State].all.toSet shouldEqual Set("from r1", "from r2") + } + eventually { + val probe = createTestProbe[State]() + r2 ! GetState(probe.ref) + probe.expectMessageType[State].all.toSet shouldEqual Set("from r1", "from r2") + } + } + "get all events in recovery" in { + val entityId = nextEntityId + val probe = createTestProbe[Done]() + val r1 = spawn(testBehavior(entityId, "R1")) + val r2 = spawn(testBehavior(entityId, "R2")) + r1 ! StoreMe("from r1", probe.ref) + r2 ! StoreMe("from r2", probe.ref) + r1 ! StoreMe("from r1 again", probe.ref) + + val r3 = spawn(testBehavior(entityId, "R3")) + eventually { + val probe = createTestProbe[State]() + r3 ! GetState(probe.ref) + probe.expectMessageType[State].all.toSet shouldEqual Set("from r1", "from r2", "from r1 again") + } + } + + "have access to replica information" in { + val entityId = nextEntityId + val probe = createTestProbe[(String, Set[String])]() + val r1 = spawn(testBehavior(entityId, "R1")) + r1 ! GetReplica(probe.ref) + probe.expectMessage(("R1", Set("R1", "R2", "R3"))) + } + + "have access to event origin" in { + val entityId = nextEntityId + val replyProbe = createTestProbe[Done]() + val eventProbeR1 = createTestProbe[EventAndContext]() + val eventProbeR2 = createTestProbe[EventAndContext]() + + val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref)) + val r2 = spawn(testBehavior(entityId, "R2", eventProbeR2.ref)) + + r1 ! StoreMe("from r1", replyProbe.ref) + eventProbeR2.expectMessage(EventAndContext("from r1", "R1")) + eventProbeR1.expectMessage(EventAndContext("from r1", "R1")) + + r2 ! StoreMe("from r2", replyProbe.ref) + eventProbeR1.expectMessage(EventAndContext("from r2", "R2")) + eventProbeR2.expectMessage(EventAndContext("from r2", "R2")) + } + + "set recovery running" in { + val entityId = nextEntityId + val eventProbeR1 = createTestProbe[EventAndContext]() + val replyProbe = createTestProbe[Done]() + val r1 = spawn(testBehavior(entityId, "R1", eventProbeR1.ref)) + r1 ! StoreMe("Event", replyProbe.ref) + eventProbeR1.expectMessage(EventAndContext("Event", "R1", recoveryRunning = false)) + replyProbe.expectMessage(Done) + + val recoveryProbe = createTestProbe[EventAndContext]() + spawn(testBehavior(entityId, "R1", recoveryProbe.ref)) + recoveryProbe.expectMessage(EventAndContext("Event", "R1", recoveryRunning = true)) + } + + "persist all" in { + pending + } + } +} diff --git a/akka-persistence-typed-tests/src/test/scala/docs.akka.persistence.typed/AABlogExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs.akka.persistence.typed/AABlogExampleSpec.scala index 4fc552a0f1..99106b5c71 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs.akka.persistence.typed/AABlogExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs.akka.persistence.typed/AABlogExampleSpec.scala @@ -61,13 +61,19 @@ class AABlogExampleSpec cmd match { case AddPost(_, content, replyTo) => val evt = - PostAdded(aa.persistenceId.id, content, state.contentTimestamp.increase(aa.timestamp, aa.replicaId)) + PostAdded( + aa.persistenceId.id, + content, + state.contentTimestamp.increase(aa.currentTimeMillis(), aa.replicaId)) Effect.persist(evt).thenRun { _ => replyTo ! AddPostDone(aa.entityId) } case ChangeBody(_, newContent, replyTo) => val evt = - BodyChanged(aa.persistenceId.id, newContent, state.contentTimestamp.increase(aa.timestamp, aa.replicaId)) + BodyChanged( + aa.persistenceId.id, + newContent, + state.contentTimestamp.increase(aa.currentTimeMillis(), aa.replicaId)) Effect.persist(evt).thenRun { _ => replyTo ! Done } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index 4b4f8205bc..0087e5d20b 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -242,7 +242,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( } override private[akka] def withActiveActive( - context: ActiveActiveContext, + context: ActiveActiveContextImpl, id: String, allIds: Set[String], queryPluginId: String): EventSourcedBehavior[Command, Event, State] = { @@ -265,7 +265,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( // FIXME serializer @InternalApi -private[akka] final case class ReplicatedEventMetaData(originDc: String) +private[akka] final case class ReplicatedEventMetaData(originReplica: String, originSequenceNr: Long) @InternalApi private[akka] final case class ReplicatedEvent[E](event: E, originReplica: String, originSequenceNr: Long) @InternalApi diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala index 8345a26d30..ec6291e5bf 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ExternalInteractions.scala @@ -5,7 +5,6 @@ package akka.persistence.typed.internal import scala.collection.immutable - import akka.actor.ActorRef import akka.actor.typed.Behavior import akka.actor.typed.PostStop @@ -19,7 +18,7 @@ import akka.annotation.InternalStableApi import akka.persistence._ import akka.persistence.JournalProtocol.ReplayMessages import akka.persistence.SnapshotProtocol.LoadSnapshot -import akka.util.unused +import akka.util.{ unused, OptionVal } /** INTERNAL API */ @InternalApi @@ -34,7 +33,8 @@ private[akka] trait JournalInteractions[C, E, S] { cmd: Any, state: Running.RunningState[S], event: EventOrTaggedOrReplicated, - eventAdapterManifest: String): Running.RunningState[S] = { + eventAdapterManifest: String, + metadata: OptionVal[Any]): Running.RunningState[S] = { val newRunningState = state.nextSequenceNr() @@ -50,7 +50,11 @@ private[akka] trait JournalInteractions[C, E, S] { // https://github.com/akka/akka/issues/29262 onWriteInitiated(ctx, cmd, repr) - val write = AtomicWrite(repr) :: Nil + val write = AtomicWrite(metadata match { + case OptionVal.Some(meta) => repr.withMetadata(meta) + case OptionVal.None => repr + }) :: Nil + setup.journal .tell(JournalProtocol.WriteMessages(write, setup.selfClassic, setup.writerIdentity.instanceId), setup.selfClassic) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index 0b2735723d..eafe1e9eb6 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -6,7 +6,6 @@ package akka.persistence.typed.internal import scala.concurrent.duration._ import scala.util.control.NonFatal - import akka.actor.typed.{ Behavior, Signal } import akka.actor.typed.internal.PoisonPill import akka.actor.typed.internal.UnstashException @@ -120,7 +119,21 @@ private[akka] final class ReplayingEvents[C, E, S]( def handleEvent(event: E): Unit = { eventForErrorReporting = OptionVal.Some(event) state = state.copy(seqNr = repr.sequenceNr) - state = state.copy(state = setup.eventHandler(state.state, event), eventSeenInInterval = true) + + setup.activeActive match { + case Some(aa) => + val meta = repr.metadata match { + case Some(m) => m.asInstanceOf[ReplicatedEventMetaData] + case None => + throw new IllegalStateException( + s"Active active enabled but existing event has no metadata. Migration isn't supported yet.") + + } + aa.setContext(recoveryRunning = true, meta.originReplica) + case None => + } + val newState = setup.eventHandler(state.state, event) + state = state.copy(state = newState, eventSeenInInterval = true) } eventSeq match { @@ -240,7 +253,9 @@ private[akka] final class ReplayingEvents[C, E, S]( Behaviors.stopped else { val seenPerReplica: Map[String, Long] = - setup.activeActive.map(aa => aa.allReplicas.map(replica => replica -> 0L).toMap).getOrElse(Map.empty) + setup.activeActive + .map(aa => aa.allReplicas.filterNot(_ == aa.replicaId).map(replica => replica -> 0L).toMap) + .getOrElse(Map.empty) val running = Running[C, E, S]( setup, diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index 846051b758..e932ccf873 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -23,7 +23,7 @@ import akka.persistence.PersistentRepr import akka.persistence.SaveSnapshotFailure import akka.persistence.SaveSnapshotSuccess import akka.persistence.SnapshotProtocol -import akka.persistence.journal.{ EventWithMetaData, Tagged } +import akka.persistence.journal.Tagged import akka.persistence.query.{ EventEnvelope, PersistenceQuery } import akka.persistence.query.scaladsl.EventsByPersistenceIdQuery import akka.persistence.typed.{ @@ -47,7 +47,7 @@ import akka.persistence.typed.scaladsl.EventSourcedBehavior.ActiveActive import akka.stream.{ SharedKillSwitch, SystemMaterializer } import akka.stream.scaladsl.{ RestartSource, Sink } import akka.stream.typed.scaladsl.ActorFlow -import akka.util.{ unused, Timeout } +import akka.util.{ unused, OptionVal, Timeout } /** * INTERNAL API @@ -116,15 +116,16 @@ private[akka] object Running { implicit val timeout = Timeout(30.seconds) + // FIXME config val source = RestartSource.withBackoff(2.seconds, 10.seconds, randomFactor = 0.2) { () => replication .eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue) .via(ActorFlow.ask[EventEnvelope, ReplicatedEventEnvelope[E], ReplicatedEventAck.type](ref) { (eventEnvelope, replyTo) => - val re = ReplicatedEvent[E]( - eventEnvelope.event.asInstanceOf[E], - eventEnvelope.eventMetadata.get.asInstanceOf[ReplicatedEventMetaData].originDc, - eventEnvelope.sequenceNr) // FIXME, this is the wrong sequence nr, we need origin sequence nr, follow up with tests that show this + // Need to handle this not being available migration from non-active-active is supported + val meta = eventEnvelope.eventMetadata.get.asInstanceOf[ReplicatedEventMetaData] + val re = + ReplicatedEvent[E](eventEnvelope.event.asInstanceOf[E], meta.originReplica, meta.originSequenceNr) ReplicatedEventEnvelope(re, replyTo) }) } @@ -161,7 +162,7 @@ private[akka] object Running { def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = msg match { case IncomingCommand(c: C @unchecked) => onCommand(state, c) - case re: ReplicatedEventEnvelope[E @unchecked] => onReplicatedEvent(state, re) + case re: ReplicatedEventEnvelope[E @unchecked] => onReplicatedEvent(state, re, setup.activeActive.get) case JournalResponse(r) => onDeleteEventsJournalResponse(r, state.state) case SnapshotterResponse(r) => onDeleteSnapshotResponse(r, state.state) case get: GetState[S @unchecked] => onGetState(get) @@ -186,18 +187,20 @@ private[akka] object Running { def onReplicatedEvent( state: Running.RunningState[S], - envelope: ReplicatedEventEnvelope[E]): Behavior[InternalProtocol] = { - // FIXME set the details on the context https://github.com/akka/akka/issues/29258 + envelope: ReplicatedEventEnvelope[E], + activeActive: ActiveActive): Behavior[InternalProtocol] = { setup.log.infoN( - "Replica {} received replicated event. Replica seqs nrs: {}", + "Replica {} received replicated event. Replica seqs nrs: {}. Envelope {}", setup.activeActive, - state.seenPerReplica) + state.seenPerReplica, + envelope) envelope.ack ! ReplicatedEventAck - if (envelope.event.originReplica != setup.activeActive.get.replicaId && !alreadySeen(envelope.event)) { - setup.log.info("Saving event as first time") - handleReplicatedEventPersist(envelope.event) + if (envelope.event.originReplica != activeActive.replicaId && !alreadySeen(envelope.event)) { + activeActive.setContext(false, envelope.event.originReplica) + setup.log.debug("Saving event as first time") + handleExternalReplicatedEventPersist(envelope.event) } else { - setup.log.info("Filtering event as already seen") + setup.log.debug("Filtering event as already seen") this } } @@ -208,11 +211,16 @@ private[akka] object Running { this } - private def handleReplicatedEventPersist(event: ReplicatedEvent[E]): Behavior[InternalProtocol] = { + private def handleExternalReplicatedEventPersist(event: ReplicatedEvent[E]): Behavior[InternalProtocol] = { _currentSequenceNumber = state.seqNr + 1 - val replicatedEvent = new EventWithMetaData(event.event, ReplicatedEventMetaData(event.originReplica)) val newState: RunningState[S] = state.applyEvent(setup, event.event) - val newState2: RunningState[S] = internalPersist(setup.context, null, newState, replicatedEvent, "") + val newState2: RunningState[S] = internalPersist( + setup.context, + null, + newState, + event.event, + "", + OptionVal.Some(ReplicatedEventMetaData(event.originReplica, event.originSequenceNr))) val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event.event, newState2.seqNr) // FIXME validate this is the correct sequence nr from that replica https://github.com/akka/akka/issues/29259 val updatedSeen = newState2.seenPerReplica.updated(event.originReplica, event.originSequenceNr) @@ -224,6 +232,37 @@ private[akka] object Running { Nil) } + private def handleEventPersist(event: E, cmd: Any, sideEffects: immutable.Seq[SideEffect[S]]) = { + // apply the event before persist so that validation exception is handled before persisting + // the invalid event, in case such validation is implemented in the event handler. + // also, ensure that there is an event handler for each single event + _currentSequenceNumber = state.seqNr + 1 + + setup.activeActive.foreach { aa => + aa.setContext(recoveryRunning = false, aa.replicaId) + } + val newState: RunningState[S] = state.applyEvent(setup, event) + + val eventToPersist = adaptEvent(event) + val eventAdapterManifest = setup.eventAdapter.manifest(event) + + val newState2 = setup.activeActive match { + case Some(aa) => + internalPersist( + setup.context, + cmd, + newState, + eventToPersist, + eventAdapterManifest, + OptionVal.Some(ReplicatedEventMetaData(aa.replicaId, _currentSequenceNumber))) + case None => + internalPersist(setup.context, cmd, newState, eventToPersist, eventAdapterManifest, OptionVal.None) + } + + val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event, newState2.seqNr) + (persistingEvents(newState2, state, numberOfEvents = 1, shouldSnapshotAfterPersist, sideEffects), false) + } + @tailrec def applyEffects( msg: Any, state: RunningState[S], @@ -242,27 +281,7 @@ private[akka] object Running { applyEffects(msg, state, eff, currentSideEffects ++ sideEffects) case Persist(event) => - // apply the event before persist so that validation exception is handled before persisting - // the invalid event, in case such validation is implemented in the event handler. - // also, ensure that there is an event handler for each single event - _currentSequenceNumber = state.seqNr + 1 - val newState: RunningState[S] = state.applyEvent(setup, event) - - val eventToPersist = adaptEvent(event) - val eventAdapterManifest = setup.eventAdapter.manifest(event) - - val newState2 = setup.activeActive match { - case Some(aa) => - val replicatedEvent = ReplicatedEvent(eventToPersist, aa.replicaId, _currentSequenceNumber) - internalPersist(setup.context, msg, newState, replicatedEvent, eventAdapterManifest) - case None => - internalPersist(setup.context, msg, newState, eventToPersist, eventAdapterManifest) - } - - val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event, newState2.seqNr) - - (persistingEvents(newState2, state, numberOfEvents = 1, shouldSnapshotAfterPersist, sideEffects), false) - + handleEventPersist(event, msg, sideEffects) case PersistAll(events) => if (events.nonEmpty) { // apply the event before persist so that validation exception is handled before persisting diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ActiveActiveEventSourcing.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ActiveActiveEventSourcing.scala index 847b2cbff6..f0695b7b8d 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ActiveActiveEventSourcing.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/ActiveActiveEventSourcing.scala @@ -5,6 +5,7 @@ package akka.persistence.typed.scaladsl import akka.persistence.typed.PersistenceId +import akka.util.WallClock /** * Utility class for comparing timestamp and data center @@ -34,8 +35,9 @@ final case class LwwTime(timestamp: Long, originDc: String) { } } +// FIXME docs trait ActiveActiveContext { - def timestamp: Long + def origin: String def concurrent: Boolean def replicaId: String @@ -44,6 +46,7 @@ trait ActiveActiveContext { def recoveryRunning: Boolean def entityId: String def currentTimeMillis(): Long + } // FIXME, parts of this can be set during initialisation @@ -51,18 +54,11 @@ trait ActiveActiveContext { // https://github.com/akka/akka/issues/29258 private[akka] class ActiveActiveContextImpl(val entityId: String, val replicaId: String, val allReplicas: Set[String]) extends ActiveActiveContext { - var _timestamp: Long = -1 var _origin: String = null - var _concurrent: Boolean = false + var _recoveryRunning: Boolean = false // FIXME check illegal access https://github.com/akka/akka/issues/29264 - /** - * The timestamp of the event. Always increases per data center - * Undefined result if called from any where other than an event handler. - */ - override def timestamp: Long = _timestamp - /** * The origin of the current event. * Undefined result if called from anywhere other than an event handler. @@ -73,13 +69,14 @@ private[akka] class ActiveActiveContextImpl(val entityId: String, val replicaId: * Whether the happened concurrently with an event from another replica. * Undefined result if called from any where other than an event handler. */ - override def concurrent: Boolean = _concurrent + override def concurrent: Boolean = throw new UnsupportedOperationException("TODO") + override def persistenceId: PersistenceId = PersistenceId.replicatedUniqueId(entityId, replicaId) + override def currentTimeMillis(): Long = { - // FIXME always increasing - System.currentTimeMillis() + WallClock.AlwaysIncreasingClock.currentTimeMillis() } - override def recoveryRunning: Boolean = false + override def recoveryRunning: Boolean = _recoveryRunning } object ActiveActiveEventSourcing { diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala index e0a4984587..e53a1918a0 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/EventSourcedBehavior.scala @@ -25,8 +25,18 @@ object EventSourcedBehavior { private[akka] case class ActiveActive( replicaId: String, allReplicas: Set[String], - aaContext: ActiveActiveContext, - queryPluginId: String) + aaContext: ActiveActiveContextImpl, + queryPluginId: String) { + + /** + * Must only be called on the same thread that will execute the user code + */ + def setContext(recoveryRunning: Boolean, originReplica: String): Unit = { + aaContext._recoveryRunning = recoveryRunning + aaContext._origin = originReplica + } + + } /** * Type alias for the command handler function that defines how to act on commands. @@ -152,7 +162,7 @@ object EventSourcedBehavior { def withJournalPluginId(id: String): EventSourcedBehavior[Command, Event, State] private[akka] def withActiveActive( - context: ActiveActiveContext, + context: ActiveActiveContextImpl, replicaId: String, allReplicaIds: Set[String], queryPluginId: String): EventSourcedBehavior[Command, Event, State] diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index f260f79124..b7285058f9 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -102,6 +102,10 @@ final case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) extends Per def withTimestamp(newTimestamp: Long): PersistentRepr + def metadata: Option[Any] + + def withMetadata(metadata: Any): PersistentRepr + /** * Unique identifier of the writing persistent actor. * Used to detect anomalies with overlapping writes from multiple @@ -163,7 +167,7 @@ object PersistentRepr { deleted: Boolean = false, sender: ActorRef = null, writerUuid: String = PersistentRepr.Undefined): PersistentRepr = - PersistentImpl(payload, sequenceNr, persistenceId, manifest, deleted, sender, writerUuid, 0L) + PersistentImpl(payload, sequenceNr, persistenceId, manifest, deleted, sender, writerUuid, 0L, None) /** * Java API, Plugin API. @@ -188,7 +192,8 @@ private[persistence] final case class PersistentImpl( override val deleted: Boolean, override val sender: ActorRef, override val writerUuid: String, - override val timestamp: Long) + override val timestamp: Long, + override val metadata: Option[Any]) extends PersistentRepr with NoSerializationVerificationNeeded { @@ -203,6 +208,10 @@ private[persistence] final case class PersistentImpl( if (this.timestamp == newTimestamp) this else copy(timestamp = newTimestamp) + override def withMetadata(metadata: Any): PersistentRepr = { + copy(metadata = Some(metadata)) + } + def update(sequenceNr: Long, persistenceId: String, deleted: Boolean, sender: ActorRef, writerUuid: String) = copy( sequenceNr = sequenceNr, @@ -221,6 +230,7 @@ private[persistence] final case class PersistentImpl( result = HashCode.hash(result, sender) result = HashCode.hash(result, writerUuid) // timestamp not included in equals for backwards compatibility + // meta not included in equals for backwards compatibility result } @@ -235,5 +245,4 @@ private[persistence] final case class PersistentImpl( override def toString: String = { s"PersistentRepr($persistenceId,$sequenceNr,$writerUuid,$timestamp)" } - } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/EventWithMetaData.scala b/akka-persistence/src/main/scala/akka/persistence/journal/EventWithMetaData.scala deleted file mode 100644 index c1bfc6aa11..0000000000 --- a/akka-persistence/src/main/scala/akka/persistence/journal/EventWithMetaData.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (C) 2020 Lightbend Inc. - */ - -package akka.persistence.journal - -object EventWithMetaData { - - def apply(event: Any, metaData: Any): EventWithMetaData = new EventWithMetaData(event, metaData) - - /** - * If meta data could not be deserialized it will not fail the replay/query. - * The "invalid" meta data is represented with this `UnknownMetaData` and - * it and the event will be wrapped in `EventWithMetaData`. - * - * The reason for not failing the replay/query is that meta data should be - * optional, e.g. the tool that wrote the meta data has been removed. This - * is typically because the serializer for the meta data has been removed - * from the class path (or configuration). - */ - final class UnknownMetaData(val serializerId: Int, val manifest: String) -} - -/** - * If the event is wrapped in this class the `metaData` will - * be serialized and stored separately from the event payload. This can be used by event - * adapters or other tools to store additional meta data without altering - * the actual domain event. - * - * Check the documentation of the persistence plugin in use to use if it supports - * EventWithMetaData. - */ -final class EventWithMetaData(val event: Any, val metaData: Any) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala index 53cf784f70..d07b37c3d3 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala @@ -19,7 +19,7 @@ import akka.persistence.AtomicWrite import akka.persistence.JournalProtocol.RecoverySuccess import akka.persistence.PersistentRepr import akka.persistence.journal.inmem.InmemJournal.{ MessageWithMeta, ReplayWithMeta } -import akka.persistence.journal.{ AsyncWriteJournal, EventWithMetaData, Tagged } +import akka.persistence.journal.{ AsyncWriteJournal, Tagged } import akka.serialization.SerializationExtension import akka.serialization.Serializers import akka.util.OptionVal @@ -143,13 +143,10 @@ object InmemJournal { // persistenceId -> highest used sequence number private var highestSequenceNumbers = Map.empty[String, Long] - // FIXME, which way around should Tagged/EventWithMeta go? https://github.com/akka/akka/issues/29284 def add(p: PersistentRepr): Unit = { val pr = p.payload match { case Tagged(payload, _) => (p.withPayload(payload).withTimestamp(System.currentTimeMillis()), OptionVal.None) - case meta: EventWithMetaData => - (p.withPayload(meta.event).withTimestamp(System.currentTimeMillis()), OptionVal.Some(meta.metaData)) - case _ => (p.withTimestamp(System.currentTimeMillis()), OptionVal.None) + case _ => (p.withTimestamp(System.currentTimeMillis()), OptionVal.None) } messages = messages + (messages.get(p.persistenceId) match {