diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala index 684d7b9324..7285a43b8f 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/EventEnvelope.scala @@ -4,14 +4,26 @@ package akka.persistence.query -import scala.runtime.AbstractFunction4 +import java.util.Optional +import akka.annotation.InternalApi + +import scala.runtime.AbstractFunction4 import akka.util.HashCode // for binary compatibility (used to be a case class) object EventEnvelope extends AbstractFunction4[Offset, String, Long, Any, EventEnvelope] { def apply(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any, timestamp: Long): EventEnvelope = - new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp) + new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp, None) + + def apply( + offset: Offset, + persistenceId: String, + sequenceNr: Long, + event: Any, + timestamp: Long, + meta: Option[Any]): EventEnvelope = + new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp, meta) @deprecated("for binary compatibility", "2.6.2") override def apply(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any): EventEnvelope = @@ -34,13 +46,26 @@ final class EventEnvelope( val persistenceId: String, val sequenceNr: Long, val event: Any, - val timestamp: Long) + val timestamp: Long, + val eventMetadata: Option[Any]) extends Product4[Offset, String, Long, Any] with Serializable { @deprecated("for binary compatibility", "2.6.2") def this(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any) = - this(offset, persistenceId, sequenceNr, event, 0L) + this(offset, persistenceId, sequenceNr, event, 0L, None) + + // bin compat 2.6.7 + def this(offset: Offset, persistenceId: String, sequenceNr: Long, event: Any, timestamp: Long) = + this(offset, persistenceId, sequenceNr, event, timestamp, None) + + /** + * Java API + */ + def getEventMetaData(): Optional[Any] = { + import scala.compat.java8.OptionConverters._ + eventMetadata.asJava + } override def hashCode(): Int = { var result = HashCode.SEED @@ -59,7 +84,7 @@ final class EventEnvelope( } override def toString: String = - s"EventEnvelope($offset,$persistenceId,$sequenceNr,$event,$timestamp)" + s"EventEnvelope($offset,$persistenceId,$sequenceNr,$event,$timestamp,$eventMetadata)" // for binary compatibility (used to be a case class) def copy( @@ -67,7 +92,11 @@ final class EventEnvelope( persistenceId: String = this.persistenceId, sequenceNr: Long = this.sequenceNr, event: Any = this.event): EventEnvelope = - new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp) + new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp, this.eventMetadata) + + @InternalApi + private[akka] def withMetadata(metadata: Any): EventEnvelope = + new EventEnvelope(offset, persistenceId, sequenceNr, event, timestamp, Some(metadata)) // Product4, for binary compatibility (used to be a case class) override def productPrefix = "EventEnvelope" diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala index 70aebd84b8..dcfc755322 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala @@ -5,7 +5,6 @@ package akka.persistence.query.journal.leveldb import scala.concurrent.duration.FiniteDuration - import akka.actor.ActorRef import akka.annotation.InternalApi import akka.persistence.JournalProtocol.RecoverySuccess diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagStage.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagStage.scala index 610071f827..85d3558a5e 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagStage.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagStage.scala @@ -5,7 +5,6 @@ package akka.persistence.query.journal.leveldb import scala.concurrent.duration.FiniteDuration - import akka.actor.ActorRef import akka.annotation.InternalApi import akka.persistence.JournalProtocol.RecoverySuccess diff --git a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/TestActor.scala b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/TestActor.scala index 06c3f6ff57..693ee109a3 100644 --- a/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/TestActor.scala +++ b/akka-persistence-query/src/test/scala/akka/persistence/query/journal/leveldb/TestActor.scala @@ -6,12 +6,16 @@ package akka.persistence.query.journal.leveldb import akka.actor.Props import akka.persistence.PersistentActor +import akka.persistence.journal.EventWithMetaData +import akka.persistence.query.journal.leveldb.TestActor.WithMeta object TestActor { def props(persistenceId: String): Props = Props(new TestActor(persistenceId)) case class DeleteCmd(toSeqNr: Long = Long.MaxValue) + + case class WithMeta(payload: String, meta: Any) } class TestActor(override val persistenceId: String) extends PersistentActor { @@ -26,10 +30,14 @@ class TestActor(override val persistenceId: String) extends PersistentActor { case DeleteCmd(toSeqNr) => deleteMessages(toSeqNr) sender() ! s"$toSeqNr-deleted" + case WithMeta(payload, meta) => + persist(EventWithMetaData(payload, meta)) { _ => + sender() ! s"$payload-done" + } case cmd: String => persist(cmd) { evt => - sender() ! evt + "-done" + sender() ! s"$evt-done" } } diff --git a/akka-persistence-testkit/src/main/resources/reference.conf b/akka-persistence-testkit/src/main/resources/reference.conf index f4d10a05f7..223191e06b 100644 --- a/akka-persistence-testkit/src/main/resources/reference.conf +++ b/akka-persistence-testkit/src/main/resources/reference.conf @@ -28,3 +28,7 @@ akka.persistence.testkit { } } + +akka.persistence.testkit.query { + class = "akka.persistence.testkit.query.PersistenceTestKitReadJournalProvider" +} diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/EventStorage.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/EventStorage.scala index 070e39901f..ff458085c7 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/EventStorage.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/EventStorage.scala @@ -8,9 +8,9 @@ import java.util.{ List => JList } import scala.collection.immutable import scala.util.{ Failure, Success, Try } - import akka.annotation.InternalApi import akka.persistence.PersistentRepr +import akka.persistence.testkit.EventStorage.{ JournalPolicies, Metadata } import akka.persistence.testkit.ProcessingPolicy.DefaultPolicies import akka.persistence.testkit.internal.TestKitStorage import akka.util.ccompat.JavaConverters._ @@ -19,7 +19,7 @@ import akka.util.ccompat.JavaConverters._ * INTERNAL API */ @InternalApi -private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, PersistentRepr] { +private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, (PersistentRepr, Metadata)] { import EventStorage._ @@ -31,21 +31,23 @@ private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, Per // and therefore must be done at the same time with the update, not before updateOrSetNew(key, v => v ++ mapAny(key, elems).toVector) - override def reprToSeqNum(repr: PersistentRepr): Long = repr.sequenceNr + override def reprToSeqNum(repr: (PersistentRepr, Metadata)): Long = repr._1.sequenceNr - def add(elems: immutable.Seq[PersistentRepr]): Unit = - elems.groupBy(_.persistenceId).foreach(gr => add(gr._1, gr._2)) + def add(elems: immutable.Seq[(PersistentRepr, Metadata)]): Unit = + elems.groupBy(_._1.persistenceId).foreach { gr => + add(gr._1, gr._2) + } override protected val DefaultPolicy = JournalPolicies.PassAll /** * @throws Exception from StorageFailure in the current writing policy */ - def tryAdd(elems: immutable.Seq[PersistentRepr]): Try[Unit] = { - val grouped = elems.groupBy(_.persistenceId) + def tryAdd(elems: immutable.Seq[(PersistentRepr, Metadata)]): Try[Unit] = { + val grouped = elems.groupBy(_._1.persistenceId) val processed = grouped.map { - case (pid, els) => currentPolicy.tryProcess(pid, WriteEvents(els.map(_.payload))) + case (pid, els) => currentPolicy.tryProcess(pid, WriteEvents(els.map(_._1.payload))) } val reduced: ProcessingResult = @@ -71,8 +73,8 @@ private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, Per persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, - max: Long): immutable.Seq[PersistentRepr] = { - val batch = read(persistenceId, fromSequenceNr, toSequenceNr, max) + max: Long): immutable.Seq[(PersistentRepr, Metadata)] = { + val batch: immutable.Seq[(PersistentRepr, Metadata)] = read(persistenceId, fromSequenceNr, toSequenceNr, max) currentPolicy.tryProcess(persistenceId, ReadEvents(batch)) match { case ProcessingSuccess => batch case Reject(ex) => throw ex @@ -96,9 +98,9 @@ private[testkit] trait EventStorage extends TestKitStorage[JournalOperation, Per } } - private def mapAny(key: String, elems: immutable.Seq[Any]): immutable.Seq[PersistentRepr] = { + private def mapAny(key: String, elems: immutable.Seq[Any]): immutable.Seq[(PersistentRepr, Metadata)] = { val sn = getHighestSeqNumber(key) + 1 - elems.zipWithIndex.map(p => PersistentRepr(p._1, p._2 + sn, key)) + elems.zipWithIndex.map(p => (PersistentRepr(p._1, p._2 + sn, key), NoMetadata)) } } @@ -107,6 +109,24 @@ object EventStorage { object JournalPolicies extends DefaultPolicies[JournalOperation] + /** + * INTERNAL API + */ + @InternalApi + private[testkit] sealed trait Metadata + + /** + * INTERNAL API + */ + @InternalApi + private[testkit] case object NoMetadata extends Metadata + + /** + * INTERNAL API + */ + @InternalApi + private[testkit] final case class WithMetadata(payload: Any) extends Metadata + } /** diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala index 9730e8a918..7d8e899b89 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala @@ -7,13 +7,12 @@ package akka.persistence.testkit import scala.collection.immutable import scala.concurrent.Future import scala.util.Try - import com.typesafe.config.{ Config, ConfigFactory } - import akka.annotation.InternalApi import akka.persistence._ -import akka.persistence.journal.{ AsyncWriteJournal, Tagged } +import akka.persistence.journal.{ AsyncWriteJournal, EventWithMetaData, Tagged } import akka.persistence.snapshot.SnapshotStore +import akka.persistence.testkit.EventStorage.{ NoMetadata, WithMetadata } import akka.persistence.testkit.internal.{ InMemStorageExtension, SnapshotStorageEmulatorExtension } /** @@ -25,15 +24,25 @@ import akka.persistence.testkit.internal.{ InMemStorageExtension, SnapshotStorag class PersistenceTestKitPlugin extends AsyncWriteJournal { private final val storage = InMemStorageExtension(context.system) + private val eventStream = context.system.eventStream override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = Future.fromTry(Try(messages.map(aw => { val data = aw.payload.map(pl => pl.payload match { - case Tagged(p, _) => pl.withPayload(p) - case _ => pl + // TODO define how to handle tagged and metadata + case Tagged(p, _) => (pl.withPayload(p).withTimestamp(System.currentTimeMillis()), NoMetadata) + case evt: EventWithMetaData => + (pl.withPayload(evt.event).withTimestamp(System.currentTimeMillis()), WithMetadata(evt.metaData)) + case _ => (pl.withTimestamp(System.currentTimeMillis()), NoMetadata) }) - storage.tryAdd(data) + + val result: Try[Unit] = storage.tryAdd(data) + result.foreach { _ => + messages.foreach(aw => + eventStream.publish(PersistenceTestKitPlugin.Write(aw.persistenceId, aw.highestSequenceNr))) + } + result }))) override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = @@ -41,7 +50,8 @@ class PersistenceTestKitPlugin extends AsyncWriteJournal { override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)( recoveryCallback: PersistentRepr => Unit): Future[Unit] = - Future.fromTry(Try(storage.tryRead(persistenceId, fromSequenceNr, toSequenceNr, max).foreach(recoveryCallback))) + Future.fromTry( + Try(storage.tryRead(persistenceId, fromSequenceNr, toSequenceNr, max).map(_._1).foreach(recoveryCallback))) override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = Future.fromTry(Try { @@ -64,6 +74,8 @@ object PersistenceTestKitPlugin { "akka.persistence.journal.plugin" -> PluginId, s"$PluginId.class" -> s"${classOf[PersistenceTestKitPlugin].getName}").asJava) + private[testkit] case class Write(persistenceId: String, toSequenceNr: Long) + } /** diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SerializedEventStorageImpl.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SerializedEventStorageImpl.scala index 9d25479dba..dd6242ee95 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SerializedEventStorageImpl.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SerializedEventStorageImpl.scala @@ -4,40 +4,56 @@ package akka.persistence.testkit.internal -import scala.util.Try - import akka.actor.{ ActorSystem, ExtendedActorSystem } import akka.annotation.InternalApi import akka.persistence.PersistentRepr import akka.persistence.testkit.EventStorage -import akka.serialization.{ Serialization, SerializationExtension } +import akka.persistence.testkit.EventStorage.Metadata +import akka.persistence.testkit.internal.SerializedEventStorageImpl.Serialized +import akka.serialization.{ Serialization, SerializationExtension, Serializers } + +@InternalApi +private[testkit] object SerializedEventStorageImpl { + case class Serialized( + persistenceId: String, + sequenceNr: Long, + payloadSerId: Int, + payloadSerManifest: String, + writerUuid: String, + payload: Array[Byte], + metadata: Metadata) +} /** * INTERNAL API + * FIXME, once we add serializers for metadata serialize the metadata payload if present */ @InternalApi private[testkit] class SerializedEventStorageImpl(system: ActorSystem) extends EventStorage { - - override type InternalRepr = (Int, Array[Byte]) + override type InternalRepr = Serialized private lazy val serialization = SerializationExtension(system) /** * @return (serializer id, serialized bytes) */ - override def toInternal(repr: PersistentRepr): (Int, Array[Byte]) = + override def toInternal(repr: (PersistentRepr, Metadata)): Serialized = Serialization.withTransportInformation(system.asInstanceOf[ExtendedActorSystem]) { () => - val s = serialization.findSerializerFor(repr) - (s.identifier, s.toBinary(repr)) + val (pr, meta) = repr + val payload = pr.payload.asInstanceOf[AnyRef] + val s = serialization.findSerializerFor(payload) + val manifest = Serializers.manifestFor(s, payload) + Serialized(pr.persistenceId, pr.sequenceNr, s.identifier, manifest, pr.writerUuid, s.toBinary(payload), meta) } /** * @param internal (serializer id, serialized bytes) */ - override def toRepr(internal: (Int, Array[Byte])): PersistentRepr = - serialization - .deserialize(internal._2, internal._1, PersistentRepr.Undefined) - .flatMap(r => Try(r.asInstanceOf[PersistentRepr])) - .get + override def toRepr(internal: Serialized): (PersistentRepr, Metadata) = { + val event = serialization.deserialize(internal.payload, internal.payloadSerId, internal.payloadSerManifest).get + ( + PersistentRepr(event, internal.sequenceNr, internal.persistenceId, writerUuid = internal.writerUuid), + internal.metadata) + } } diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SimpleEventStorageImpl.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SimpleEventStorageImpl.scala index 712c311498..25b63c8042 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SimpleEventStorageImpl.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SimpleEventStorageImpl.scala @@ -7,6 +7,7 @@ package akka.persistence.testkit.internal import akka.annotation.InternalApi import akka.persistence._ import akka.persistence.testkit.EventStorage +import akka.persistence.testkit.EventStorage.Metadata /** * INTERNAL API @@ -14,10 +15,10 @@ import akka.persistence.testkit.EventStorage @InternalApi private[testkit] class SimpleEventStorageImpl extends EventStorage { - override type InternalRepr = PersistentRepr + override type InternalRepr = (PersistentRepr, Metadata) - override def toInternal(repr: PersistentRepr): PersistentRepr = repr + override def toInternal(repr: (PersistentRepr, Metadata)): (PersistentRepr, Metadata) = repr - override def toRepr(internal: PersistentRepr): PersistentRepr = internal + override def toRepr(internal: (PersistentRepr, Metadata)): (PersistentRepr, Metadata) = internal } diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/PersistenceTestKitReadJournalProvider.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/PersistenceTestKitReadJournalProvider.scala new file mode 100644 index 0000000000..dcd6246d40 --- /dev/null +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/PersistenceTestKitReadJournalProvider.scala @@ -0,0 +1,16 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.testkit.query +import akka.actor.ExtendedActorSystem +import akka.persistence.query.ReadJournalProvider + +class PersistenceTestKitReadJournalProvider(system: ExtendedActorSystem) extends ReadJournalProvider { + + override def scaladslReadJournal(): scaladsl.PersistenceTestKitReadJournal = + new scaladsl.PersistenceTestKitReadJournal(system) + + override def javadslReadJournal(): javadsl.PersistenceTestKitReadJournal = + new javadsl.PersistenceTestKitReadJournal(scaladslReadJournal()) +} diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/internal/EventsByPersistenceIdStage.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/internal/EventsByPersistenceIdStage.scala new file mode 100644 index 0000000000..af115ae4dc --- /dev/null +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/internal/EventsByPersistenceIdStage.scala @@ -0,0 +1,87 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.testkit.query.internal +import akka.actor.ActorRef +import akka.annotation.InternalApi +import akka.persistence.query.{ EventEnvelope, Sequence } +import akka.persistence.testkit.{ EventStorage, PersistenceTestKitPlugin } +import akka.persistence.testkit.EventStorage.{ NoMetadata, WithMetadata } +import akka.stream.{ Attributes, Outlet, SourceShape } +import akka.stream.stage.{ GraphStage, GraphStageLogic, GraphStageLogicWithLogging, OutHandler } + +/** + * INTERNAL API + */ +@InternalApi +final private[akka] class EventsByPersistenceIdStage( + persistenceId: String, + fromSequenceNr: Long, + toSequenceNr: Long, + storage: EventStorage) + extends GraphStage[SourceShape[EventEnvelope]] { + val out: Outlet[EventEnvelope] = Outlet("EventsByPersistenceIdSource") + override def shape: SourceShape[EventEnvelope] = SourceShape(out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { + new GraphStageLogicWithLogging(shape) with OutHandler { + private var currentSequenceNr = math.max(fromSequenceNr, 1) + private var stageActorRef: ActorRef = null + override def preStart(): Unit = { + stageActorRef = getStageActor(receiveNotifications).ref + materializer.system.eventStream.subscribe(stageActorRef, classOf[PersistenceTestKitPlugin.Write]) + } + + private def receiveNotifications(in: (ActorRef, Any)): Unit = { + val (_, msg) = in + msg match { + case PersistenceTestKitPlugin.Write(pid, toSequenceNr) if pid == persistenceId => + log.debug("Write notification {} {}", pid, toSequenceNr) + if (toSequenceNr >= currentSequenceNr) { + tryPush() + } + case _ => + } + } + + private def tryPush(): Unit = { + if (isAvailable(out)) { + val event = storage.tryRead(persistenceId, currentSequenceNr, currentSequenceNr, 1) + log.debug("tryPush available. Query for {} {} result {}", currentSequenceNr, currentSequenceNr, event) + event.headOption match { + case Some((pr, meta)) => + push( + out, + EventEnvelope( + Sequence(pr.sequenceNr), + pr.persistenceId, + pr.sequenceNr, + pr.payload, + pr.timestamp, + meta match { + case NoMetadata => None + case WithMetadata(m) => Some(m) + })) + if (currentSequenceNr == toSequenceNr) { + completeStage() + } else { + currentSequenceNr += 1 + } + case None => + } + } else { + log.debug("tryPush, no demand") + } + } + + override def onPull(): Unit = { + tryPush() + } + + setHandler(out, this) + } + + } + +} diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/javadsl/PersistenceTestKitReadJournal.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/javadsl/PersistenceTestKitReadJournal.scala new file mode 100644 index 0000000000..9114994f10 --- /dev/null +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/javadsl/PersistenceTestKitReadJournal.scala @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.testkit.query.javadsl +import akka.NotUsed +import akka.persistence.query.EventEnvelope +import akka.persistence.query.javadsl.{ CurrentEventsByPersistenceIdQuery, EventsByPersistenceIdQuery, ReadJournal } +import akka.stream.javadsl.Source +import akka.persistence.testkit.query.scaladsl + +object PersistenceTestKitReadJournal { + val Identifier = "akka.persistence.testkit.query" +} + +final class PersistenceTestKitReadJournal(delegate: scaladsl.PersistenceTestKitReadJournal) + extends ReadJournal + with EventsByPersistenceIdQuery + with CurrentEventsByPersistenceIdQuery { + + override def eventsByPersistenceId( + persistenceId: String, + fromSequenceNr: Long, + toSequenceNr: Long): Source[EventEnvelope, NotUsed] = + delegate.eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr).asJava + + override def currentEventsByPersistenceId( + persistenceId: String, + fromSequenceNr: Long, + toSequenceNr: Long): Source[EventEnvelope, NotUsed] = + delegate.currentEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr).asJava +} diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala new file mode 100644 index 0000000000..fc52243c67 --- /dev/null +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/query/scaladsl/PersistenceTestKitReadJournal.scala @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.testkit.query.scaladsl +import akka.NotUsed +import akka.actor.ExtendedActorSystem +import akka.persistence.query.{ EventEnvelope, Sequence } +import akka.persistence.query.scaladsl.{ CurrentEventsByPersistenceIdQuery, EventsByPersistenceIdQuery, ReadJournal } +import akka.persistence.testkit.EventStorage +import akka.persistence.testkit.EventStorage.{ NoMetadata, WithMetadata } +import akka.persistence.testkit.internal.InMemStorageExtension +import akka.persistence.testkit.query.internal.EventsByPersistenceIdStage +import akka.stream.scaladsl.Source + +object PersistenceTestKitReadJournal { + val Identifier = "akka.persistence.testkit.query" +} + +final class PersistenceTestKitReadJournal(system: ExtendedActorSystem) + extends ReadJournal + with EventsByPersistenceIdQuery + with CurrentEventsByPersistenceIdQuery { + + private final val storage: EventStorage = InMemStorageExtension(system) + + override def eventsByPersistenceId( + persistenceId: String, + fromSequenceNr: Long, + toSequenceNr: Long): Source[EventEnvelope, NotUsed] = { + Source.fromGraph(new EventsByPersistenceIdStage(persistenceId, fromSequenceNr, toSequenceNr, storage)) + } + + override def currentEventsByPersistenceId( + persistenceId: String, + fromSequenceNr: Long, + toSequenceNr: Long): Source[EventEnvelope, NotUsed] = { + Source(storage.tryRead(persistenceId, fromSequenceNr, toSequenceNr, Long.MaxValue)).map { + case (pr, meta) => + EventEnvelope(Sequence(pr.sequenceNr), persistenceId, pr.sequenceNr, pr.payload, pr.timestamp, meta match { + case NoMetadata => None + case WithMetadata(payload) => Some(payload) + }) + } + } +} diff --git a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala index aae6f4de07..4a959adfc1 100644 --- a/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala +++ b/akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala @@ -7,9 +7,7 @@ package akka.persistence.testkit.scaladsl import scala.collection.immutable import scala.concurrent.duration.FiniteDuration import scala.util.Try - import com.typesafe.config.Config - import akka.actor.ActorSystem import akka.actor.ClassicActorSystemProvider import akka.actor.ExtendedActorSystem @@ -20,6 +18,7 @@ import akka.annotation.ApiMayChange import akka.persistence.Persistence import akka.persistence.PersistentRepr import akka.persistence.SnapshotMetadata +import akka.persistence.testkit.EventStorage.Metadata import akka.persistence.testkit._ import akka.persistence.testkit.internal.InMemStorageExtension import akka.persistence.testkit.internal.SnapshotStorageEmulatorExtension @@ -424,9 +423,9 @@ object SnapshotTestKit { */ @ApiMayChange class PersistenceTestKit(system: ActorSystem) - extends PersistenceTestKitOps[PersistentRepr, JournalOperation] - with ExpectOps[PersistentRepr] - with HasStorage[JournalOperation, PersistentRepr] { + extends PersistenceTestKitOps[(PersistentRepr, Metadata), JournalOperation] + with ExpectOps[(PersistentRepr, Metadata)] + with HasStorage[JournalOperation, (PersistentRepr, Metadata)] { require( Try(Persistence(system).journalFor(PersistenceTestKitPlugin.PluginId)).isSuccess, "The test persistence plugin is not configured.") @@ -495,7 +494,7 @@ class PersistenceTestKit(system: ActorSystem) def persistedInStorage(persistenceId: String): immutable.Seq[Any] = storage.read(persistenceId).getOrElse(List.empty).map(reprToAny) - override private[testkit] def reprToAny(repr: PersistentRepr): Any = repr.payload + override private[testkit] def reprToAny(repr: (PersistentRepr, Metadata)): Any = repr._1.payload } @ApiMayChange diff --git a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/query/EventsByPersistenceIdSpec.scala b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/query/EventsByPersistenceIdSpec.scala new file mode 100644 index 0000000000..144a98108c --- /dev/null +++ b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/query/EventsByPersistenceIdSpec.scala @@ -0,0 +1,169 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.testkit.query + +import akka.{ Done, NotUsed } +import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit } +import akka.actor.typed.ActorRef +import akka.persistence.journal.EventWithMetaData +import akka.persistence.query.{ EventEnvelope, PersistenceQuery } +import akka.persistence.testkit.PersistenceTestKitPlugin +import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal +import akka.persistence.typed.{ EventAdapter, EventSeq, PersistenceId } +import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior } +import akka.stream.scaladsl.Source +import akka.stream.testkit.scaladsl.TestSink +import com.typesafe.config.ConfigFactory +import org.scalatest.wordspec.AnyWordSpecLike + +import scala.concurrent.duration._ + +object EventsByPersistenceIdSpec { + val config = PersistenceTestKitPlugin.config.withFallback( + ConfigFactory.parseString(""" + akka.loglevel = DEBUG + akka.loggers = ["akka.testkit.SilenceAllTestEventListener"] + akka.persistence.testkit.events.serialize = off + """)) + + case class Command(evt: String, ack: ActorRef[Done]) + case class State() + + def testBehaviour(persistenceId: String) = { + EventSourcedBehavior[Command, String, State]( + PersistenceId.ofUniqueId(persistenceId), + State(), + (_, command) => + Effect.persist(command.evt).thenRun { _ => + command.ack ! Done + }, + (state, _) => state) + }.eventAdapter(new EventAdapter[String, Any] { + override def toJournal(e: String): Any = { + if (e.startsWith("m")) { + EventWithMetaData(e, s"$e-meta") + } else { + e + } + } + override def manifest(event: String): String = "" + override def fromJournal(p: Any, manifest: String): EventSeq[String] = p match { + case e: EventWithMetaData => EventSeq.single(e.event.toString) + case _ => EventSeq.single(p.toString) + } + }) + +} + +class EventsByPersistenceIdSpec + extends ScalaTestWithActorTestKit(EventsByPersistenceIdSpec.config) + with LogCapturing + with AnyWordSpecLike { + import EventsByPersistenceIdSpec._ + + implicit val classic = system.classicSystem + + val queries = + PersistenceQuery(system).readJournalFor[PersistenceTestKitReadJournal](PersistenceTestKitReadJournal.Identifier) + + def setup(persistenceId: String): ActorRef[Command] = { + val probe = createTestProbe[Done]() + val ref = setupEmpty(persistenceId) + ref ! Command(s"$persistenceId-1", probe.ref) + ref ! Command(s"$persistenceId-2", probe.ref) + ref ! Command(s"$persistenceId-3", probe.ref) + probe.expectMessage(Done) + probe.expectMessage(Done) + probe.expectMessage(Done) + ref + } + + def setupEmpty(persistenceId: String): ActorRef[Command] = { + spawn(testBehaviour(persistenceId)) + } + + "Persistent test kit live query EventsByPersistenceId" must { + "find new events" in { + val ackProbe = createTestProbe[Done]() + val ref = setup("c") + val src = queries.eventsByPersistenceId("c", 0L, Long.MaxValue) + val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(5).expectNext("c-1", "c-2", "c-3") + + ref ! Command("c-4", ackProbe.ref) + ackProbe.expectMessage(Done) + + probe.expectNext("c-4") + } + + "find new events up to a sequence number" in { + val ackProbe = createTestProbe[Done]() + val ref = setup("d") + val src = queries.eventsByPersistenceId("d", 0L, 4L) + val probe = src.map(_.event).runWith(TestSink.probe[Any]).request(5).expectNext("d-1", "d-2", "d-3") + + ref ! Command("d-4", ackProbe.ref) + ackProbe.expectMessage(Done) + + probe.expectNext("d-4").expectComplete() + } + + "find new events after demand request" in { + val ackProbe = createTestProbe[Done]() + val ref = setup("e") + val src = queries.eventsByPersistenceId("e", 0L, Long.MaxValue) + val probe = + src.map(_.event).runWith(TestSink.probe[Any]).request(2).expectNext("e-1", "e-2").expectNoMessage(100.millis) + + ref ! Command("e-4", ackProbe.ref) + ackProbe.expectMessage(Done) + + probe.expectNoMessage(100.millis).request(5).expectNext("e-3").expectNext("e-4") + } + + "include timestamp in EventEnvelope" in { + setup("n") + + val src = queries.eventsByPersistenceId("n", 0L, Long.MaxValue) + val probe = src.runWith(TestSink.probe[EventEnvelope]) + + probe.request(5) + probe.expectNext().timestamp should be > 0L + probe.expectNext().timestamp should be > 0L + probe.cancel() + } + + "not complete for empty persistence id" in { + val ackProbe = createTestProbe[Done]() + val src = queries.eventsByPersistenceId("o", 0L, Long.MaxValue) + val probe = + src.map(_.event).runWith(TestSink.probe[Any]).request(2) + + probe.expectNoMessage(200.millis) // must not complete + + val ref = setupEmpty("o") + ref ! Command("o-1", ackProbe.ref) + ackProbe.expectMessage(Done) + + probe.cancel() + } + + "return metadata in queries" in { + val ackProbe = createTestProbe[Done]() + val ref = setupEmpty("with-meta") + ref ! Command("m-1", ackProbe.ref) + ref ! Command("m-2", ackProbe.ref) + val src: Source[EventEnvelope, NotUsed] = queries.eventsByPersistenceId("with-meta", 0L, Long.MaxValue) + val probe = + src.runWith(TestSink.probe[Any]).request(3) + probe.expectNextPF { + case e @ EventEnvelope(_, "with-meta", 1L, "m-1") if e.eventMetadata.contains("m-1-meta") => + } + + probe.expectNextPF { + case e @ EventEnvelope(_, "with-meta", 2L, "m-2") if e.eventMetadata.contains("m-2-meta") => + } + } + } +} diff --git a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/EventSourcedBehaviorTestKitSpec.scala b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/EventSourcedBehaviorTestKitSpec.scala index 39869b475d..38ce74b8ea 100644 --- a/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/EventSourcedBehaviorTestKitSpec.scala +++ b/akka-persistence-testkit/src/test/scala/akka/persistence/testkit/scaladsl/EventSourcedBehaviorTestKitSpec.scala @@ -234,7 +234,7 @@ class EventSourcedBehaviorTestKitSpec val eventSourcedTestKit = createTestKit() val exc = intercept[IllegalArgumentException] { - eventSourcedTestKit.runCommand(TestCounter.IncrementWithNotSerializableReply(_)) + eventSourcedTestKit.runCommand(TestCounter.IncrementWithNotSerializableReply) } (exc.getMessage should include).regex("Reply.*isn't serializable") exc.getCause.getClass should ===(classOf[NotSerializableException]) diff --git a/akka-persistence-typed-tests/src/test/resources/logback-test.xml b/akka-persistence-typed-tests/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..c980894390 --- /dev/null +++ b/akka-persistence-typed-tests/src/test/resources/logback-test.xml @@ -0,0 +1,32 @@ + + + + + + + + %date{ISO8601} [%-5level] [%logger] [%marker] [%X{akkaSource}] [%X{persistenceId}] - %msg %n + + + + + + + + + + + + + + + + diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/aa/AAAuctionExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs.akka.persistence.typed/AAAuctionExampleSpec.scala similarity index 94% rename from akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/aa/AAAuctionExampleSpec.scala rename to akka-persistence-typed-tests/src/test/scala/docs.akka.persistence.typed/AAAuctionExampleSpec.scala index 9ca783e315..4577431cb6 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/aa/AAAuctionExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs.akka.persistence.typed/AAAuctionExampleSpec.scala @@ -2,30 +2,23 @@ * Copyright (C) 2020 Lightbend Inc. */ -package docs.akka.persistence.typed.aa +package docs.akka.persistence.typed import java.time.Instant import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit } import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, _ } import akka.actor.typed.{ ActorRef, Behavior } -import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal +import akka.persistence.testkit.PersistenceTestKitPlugin +import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal import akka.persistence.typed.scaladsl.{ ActiveActiveContext, ActiveActiveEventSourcing, Effect, EventSourcedBehavior } -import com.typesafe.config.ConfigFactory +import akka.serialization.jackson.CborSerializable import org.scalatest.concurrent.{ Eventually, ScalaFutures } import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike object AAAuctionExampleSpec { - val config = ConfigFactory.parseString(""" - akka.actor.provider = cluster - akka.loglevel = info - akka.persistence { - journal { - plugin = "akka.persistence.journal.inmem" - } - } - """) + type MoneyAmount = Int case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originDc: String) @@ -38,7 +31,7 @@ object AAAuctionExampleSpec { final case class IsClosed(replyTo: ActorRef[Boolean]) extends AuctionCommand private final case object Close extends AuctionCommand // Internal, should not be sent from the outside - sealed trait AuctionEvent + sealed trait AuctionEvent extends CborSerializable final case class BidRegistered(bid: Bid) extends AuctionEvent final case class AuctionFinished(atDc: String) extends AuctionEvent final case class WinnerDecided(atDc: String, winningBid: Bid, highestCounterOffer: MoneyAmount) extends AuctionEvent @@ -208,7 +201,7 @@ object AAAuctionExampleSpec { def behavior(replica: String, setup: AuctionSetup): Behavior[AuctionCommand] = Behaviors.setup[AuctionCommand] { ctx => - ActiveActiveEventSourcing(setup.name, replica, setup.allDcs, LeveldbReadJournal.Identifier) { aaCtx => + ActiveActiveEventSourcing(setup.name, replica, setup.allDcs, PersistenceTestKitReadJournal.Identifier) { aaCtx => EventSourcedBehavior( aaCtx.persistenceId, initialState(setup), @@ -219,7 +212,7 @@ object AAAuctionExampleSpec { } class AAAuctionExampleSpec - extends ScalaTestWithActorTestKit(AAAuctionExampleSpec.config) + extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config) with AnyWordSpecLike with Matchers with LogCapturing @@ -228,6 +221,7 @@ class AAAuctionExampleSpec import AAAuctionExampleSpec._ "Auction example" should { + "work" in { val Replicas = Set("DC-A", "DC-B") val setupA = diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/aa/AABlogExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs.akka.persistence.typed/AABlogExampleSpec.scala similarity index 85% rename from akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/aa/AABlogExampleSpec.scala rename to akka-persistence-typed-tests/src/test/scala/docs.akka.persistence.typed/AABlogExampleSpec.scala index 9f5b288ee9..4fc552a0f1 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/aa/AABlogExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs.akka.persistence.typed/AABlogExampleSpec.scala @@ -2,41 +2,22 @@ * Copyright (C) 2020 Lightbend Inc. */ -package docs.akka.persistence.typed.aa - -import java.util.UUID +package docs.akka.persistence.typed import akka.Done import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit } import akka.actor.typed.ActorRef import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } -import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal +import akka.persistence.testkit.PersistenceTestKitPlugin +import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal import akka.persistence.typed.scaladsl._ import akka.serialization.jackson.CborSerializable -import com.typesafe.config.ConfigFactory import org.scalatest.concurrent.{ Eventually, ScalaFutures } import org.scalatest.matchers.should.Matchers import org.scalatest.time.{ Millis, Span } import org.scalatest.wordspec.AnyWordSpecLike object AABlogExampleSpec { - val config = - ConfigFactory.parseString(s""" - - akka.actor.allow-java-serialization = true - // FIXME serializers for replicated event or akka persistence support for metadata: https://github.com/akka/akka/issues/29260 - akka.actor.provider = cluster - akka.loglevel = debug - akka.persistence { - journal { - plugin = "akka.persistence.journal.leveldb" - leveldb { - native = off - dir = "target/journal-AABlogExampleSpec-${UUID.randomUUID()}" - } - } - } - """) final case class BlogState(content: Option[PostContent], contentTimestamp: LwwTime, published: Boolean) { def withContent(newContent: PostContent, timestamp: LwwTime): BlogState = @@ -62,7 +43,7 @@ object AABlogExampleSpec { } class AABlogExampleSpec - extends ScalaTestWithActorTestKit(AABlogExampleSpec.config) + extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config) with AnyWordSpecLike with Matchers with LogCapturing @@ -124,7 +105,7 @@ class AABlogExampleSpec "work" in { val refDcA: ActorRef[BlogCommand] = spawn(Behaviors.setup[BlogCommand] { ctx => - ActiveActiveEventSourcing("cat", "DC-A", Set("DC-A", "DC-B"), LeveldbReadJournal.Identifier) { + ActiveActiveEventSourcing("cat", "DC-A", Set("DC-A", "DC-B"), PersistenceTestKitReadJournal.Identifier) { (aa: ActiveActiveContext) => behavior(aa, ctx) } @@ -132,7 +113,7 @@ class AABlogExampleSpec val refDcB: ActorRef[BlogCommand] = spawn(Behaviors.setup[BlogCommand] { ctx => - ActiveActiveEventSourcing("cat", "DC-B", Set("DC-A", "DC-B"), LeveldbReadJournal.Identifier) { + ActiveActiveEventSourcing("cat", "DC-B", Set("DC-A", "DC-B"), PersistenceTestKitReadJournal.Identifier) { (aa: ActiveActiveContext) => behavior(aa, ctx) } diff --git a/akka-persistence-typed/src/main/mima-filters/2.6.7.backwards.excludes/29217-active-active-event-sourcing.excludes b/akka-persistence-typed/src/main/mima-filters/2.6.7.backwards.excludes/29217-active-active-event-sourcing.excludes new file mode 100644 index 0000000000..b0597f04c8 --- /dev/null +++ b/akka-persistence-typed/src/main/mima-filters/2.6.7.backwards.excludes/29217-active-active-event-sourcing.excludes @@ -0,0 +1,6 @@ +# Changes to internal/private +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.scaladsl.EventSourcedBehavior.withActiveActive") +ProblemFilters.exclude[Problem]("akka.persistence.typed.internal.Running*") +ProblemFilters.exclude[Problem]("akka.persistence.typed.internal.EventSourcedBehaviorImpl.*") +ProblemFilters.exclude[Problem]("akka.persistence.typed.internal.BehaviorSetup*") + diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index bc32587879..4b4f8205bc 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -263,5 +263,10 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( extends InternalProtocol } -final case class ReplicatedEvent[E](event: E, originReplica: String, originSequenceNr: Long) -case object ReplicatedEventAck +// FIXME serializer +@InternalApi +private[akka] final case class ReplicatedEventMetaData(originDc: String) +@InternalApi +private[akka] final case class ReplicatedEvent[E](event: E, originReplica: String, originSequenceNr: Long) +@InternalApi +private[akka] case object ReplicatedEventAck diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index 66e7f8732c..846051b758 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -23,7 +23,7 @@ import akka.persistence.PersistentRepr import akka.persistence.SaveSnapshotFailure import akka.persistence.SaveSnapshotSuccess import akka.persistence.SnapshotProtocol -import akka.persistence.journal.Tagged +import akka.persistence.journal.{ EventWithMetaData, Tagged } import akka.persistence.query.{ EventEnvelope, PersistenceQuery } import akka.persistence.query.scaladsl.EventsByPersistenceIdQuery import akka.persistence.typed.{ @@ -121,7 +121,11 @@ private[akka] object Running { .eventsByPersistenceId(pid.id, seqNr + 1, Long.MaxValue) .via(ActorFlow.ask[EventEnvelope, ReplicatedEventEnvelope[E], ReplicatedEventAck.type](ref) { (eventEnvelope, replyTo) => - ReplicatedEventEnvelope(eventEnvelope.event.asInstanceOf[ReplicatedEvent[E]], replyTo) + val re = ReplicatedEvent[E]( + eventEnvelope.event.asInstanceOf[E], + eventEnvelope.eventMetadata.get.asInstanceOf[ReplicatedEventMetaData].originDc, + eventEnvelope.sequenceNr) // FIXME, this is the wrong sequence nr, we need origin sequence nr, follow up with tests that show this + ReplicatedEventEnvelope(re, replyTo) }) } @@ -206,8 +210,9 @@ private[akka] object Running { private def handleReplicatedEventPersist(event: ReplicatedEvent[E]): Behavior[InternalProtocol] = { _currentSequenceNumber = state.seqNr + 1 + val replicatedEvent = new EventWithMetaData(event.event, ReplicatedEventMetaData(event.originReplica)) val newState: RunningState[S] = state.applyEvent(setup, event.event) - val newState2: RunningState[S] = internalPersist(setup.context, null, newState, event, "") + val newState2: RunningState[S] = internalPersist(setup.context, null, newState, replicatedEvent, "") val shouldSnapshotAfterPersist = setup.shouldSnapshot(newState2.state, event.event, newState2.seqNr) // FIXME validate this is the correct sequence nr from that replica https://github.com/akka/akka/issues/29259 val updatedSeen = newState2.seenPerReplica.updated(event.originReplica, event.originSequenceNr) @@ -241,10 +246,11 @@ private[akka] object Running { // the invalid event, in case such validation is implemented in the event handler. // also, ensure that there is an event handler for each single event _currentSequenceNumber = state.seqNr + 1 - val newState = state.applyEvent(setup, event) + val newState: RunningState[S] = state.applyEvent(setup, event) val eventToPersist = adaptEvent(event) val eventAdapterManifest = setup.eventAdapter.manifest(event) + val newState2 = setup.activeActive match { case Some(aa) => val replicatedEvent = ReplicatedEvent(eventToPersist, aa.replicaId, _currentSequenceNumber) diff --git a/akka-persistence/src/main/mima-filters/2.6.7.backwards.excludes/29217-active-active-event-sourcing.excludes b/akka-persistence/src/main/mima-filters/2.6.7.backwards.excludes/29217-active-active-event-sourcing.excludes new file mode 100644 index 0000000000..91d5ea01ed --- /dev/null +++ b/akka-persistence/src/main/mima-filters/2.6.7.backwards.excludes/29217-active-active-event-sourcing.excludes @@ -0,0 +1,4 @@ +# Changes to internal/private + +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.journal.inmem.InmemMessages.*") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.journal.inmem.InmemJournal.*") diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/EventWithMetaData.scala b/akka-persistence/src/main/scala/akka/persistence/journal/EventWithMetaData.scala new file mode 100644 index 0000000000..c1bfc6aa11 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/journal/EventWithMetaData.scala @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.persistence.journal + +object EventWithMetaData { + + def apply(event: Any, metaData: Any): EventWithMetaData = new EventWithMetaData(event, metaData) + + /** + * If meta data could not be deserialized it will not fail the replay/query. + * The "invalid" meta data is represented with this `UnknownMetaData` and + * it and the event will be wrapped in `EventWithMetaData`. + * + * The reason for not failing the replay/query is that meta data should be + * optional, e.g. the tool that wrote the meta data has been removed. This + * is typically because the serializer for the meta data has been removed + * from the class path (or configuration). + */ + final class UnknownMetaData(val serializerId: Int, val manifest: String) +} + +/** + * If the event is wrapped in this class the `metaData` will + * be serialized and stored separately from the event payload. This can be used by event + * adapters or other tools to store additional meta data without altering + * the actual domain event. + * + * Check the documentation of the persistence plugin in use to use if it supports + * EventWithMetaData. + */ +final class EventWithMetaData(val event: Any, val metaData: Any) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala index be2f0122ef..53cf784f70 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala @@ -4,21 +4,25 @@ package akka.persistence.journal.inmem +import akka.actor.ActorRef + import scala.collection.immutable import scala.concurrent.Future import scala.util.Try import scala.util.control.NonFatal - import com.typesafe.config.Config import com.typesafe.config.ConfigFactory - import akka.annotation.ApiMayChange import akka.annotation.InternalApi +import akka.event.Logging import akka.persistence.AtomicWrite +import akka.persistence.JournalProtocol.RecoverySuccess import akka.persistence.PersistentRepr -import akka.persistence.journal.{ AsyncWriteJournal, Tagged } +import akka.persistence.journal.inmem.InmemJournal.{ MessageWithMeta, ReplayWithMeta } +import akka.persistence.journal.{ AsyncWriteJournal, EventWithMetaData, Tagged } import akka.serialization.SerializationExtension import akka.serialization.Serializers +import akka.util.OptionVal /** * The InmemJournal publishes writes and deletes to the `eventStream`, which tests may use to @@ -32,8 +36,17 @@ object InmemJournal { sealed trait Operation final case class Write(event: Any, persistenceId: String, sequenceNr: Long) extends Operation - final case class Delete(persistenceId: String, toSequenceNr: Long) extends Operation + + @InternalApi + private[persistence] case class ReplayWithMeta( + from: Long, + to: Long, + limit: Long, + persistenceId: String, + replyTo: ActorRef) + @InternalApi + private[persistence] case class MessageWithMeta(pr: PersistentRepr, meta: OptionVal[Any]) } /** @@ -45,6 +58,8 @@ object InmemJournal { def this() = this(ConfigFactory.empty()) + private val log = Logging(context.system, classOf[InmemJournal]) + private val testSerialization = { val key = "test-serialization" if (cfg.hasPath(key)) cfg.getBoolean("test-serialization") @@ -78,7 +93,9 @@ object InmemJournal { recoveryCallback: PersistentRepr => Unit): Future[Unit] = { val highest = highestSequenceNr(persistenceId) if (highest != 0L && max != 0L) - read(persistenceId, fromSequenceNr, math.min(toSequenceNr, highest), max).foreach(recoveryCallback) + read(persistenceId, fromSequenceNr, math.min(toSequenceNr, highest), max).foreach { + case (pr, _) => recoveryCallback(pr) + } Future.successful(()) } @@ -93,6 +110,19 @@ object InmemJournal { Future.successful(()) } + override def receivePluginInternal: Receive = { + case ReplayWithMeta(fromSequenceNr, toSequenceNr, max, persistenceId, replyTo) => + log.debug("ReplayWithMeta {} {} {} {}", fromSequenceNr, toSequenceNr, max, persistenceId) + val highest = highestSequenceNr(persistenceId) + if (highest != 0L && max != 0L) { + read(persistenceId, fromSequenceNr, math.min(toSequenceNr, highest), max).foreach { + case (pr, meta) => replyTo ! MessageWithMeta(pr, meta) + } + } + replyTo ! RecoverySuccess(highest) + + } + private def verifySerialization(event: Any): Unit = { if (testSerialization) { val eventAnyRef = event.asInstanceOf[AnyRef] @@ -109,31 +139,35 @@ object InmemJournal { */ @InternalApi private[persistence] trait InmemMessages { // persistenceId -> persistent message - var messages = Map.empty[String, Vector[PersistentRepr]] + var messages = Map.empty[String, Vector[(PersistentRepr, OptionVal[Any])]] // persistenceId -> highest used sequence number private var highestSequenceNumbers = Map.empty[String, Long] + // FIXME, which way around should Tagged/EventWithMeta go? https://github.com/akka/akka/issues/29284 def add(p: PersistentRepr): Unit = { val pr = p.payload match { - case Tagged(payload, _) => p.withPayload(payload) - case _ => p + case Tagged(payload, _) => (p.withPayload(payload).withTimestamp(System.currentTimeMillis()), OptionVal.None) + case meta: EventWithMetaData => + (p.withPayload(meta.event).withTimestamp(System.currentTimeMillis()), OptionVal.Some(meta.metaData)) + case _ => (p.withTimestamp(System.currentTimeMillis()), OptionVal.None) } - messages = messages + (messages.get(pr.persistenceId) match { - case Some(ms) => pr.persistenceId -> (ms :+ pr) - case None => pr.persistenceId -> Vector(pr) + + messages = messages + (messages.get(p.persistenceId) match { + case Some(ms) => p.persistenceId -> (ms :+ pr) + case None => p.persistenceId -> Vector(pr) }) highestSequenceNumbers = - highestSequenceNumbers.updated(pr.persistenceId, math.max(highestSequenceNr(pr.persistenceId), pr.sequenceNr)) + highestSequenceNumbers.updated(p.persistenceId, math.max(highestSequenceNr(p.persistenceId), p.sequenceNr)) } def delete(pid: String, snr: Long): Unit = messages = messages.get(pid) match { - case Some(ms) => messages + (pid -> ms.filterNot(_.sequenceNr == snr)) + case Some(ms) => messages + (pid -> ms.filterNot(_._1.sequenceNr == snr)) case None => messages } - def read(pid: String, fromSnr: Long, toSnr: Long, max: Long): immutable.Seq[PersistentRepr] = + def read(pid: String, fromSnr: Long, toSnr: Long, max: Long): immutable.Seq[(PersistentRepr, OptionVal[Any])] = messages.get(pid) match { - case Some(ms) => ms.filter(m => m.sequenceNr >= fromSnr && m.sequenceNr <= toSnr).take(safeLongToInt(max)) + case Some(ms) => ms.filter(m => m._1.sequenceNr >= fromSnr && m._1.sequenceNr <= toSnr).take(safeLongToInt(max)) case None => Nil } diff --git a/akka-persistence/src/test/scala/akka/persistence/EventSourcedActorFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/EventSourcedActorFailureSpec.scala index 0712f9af4c..1b008cebb5 100644 --- a/akka-persistence/src/test/scala/akka/persistence/EventSourcedActorFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/EventSourcedActorFailureSpec.scala @@ -39,10 +39,10 @@ object EventSourcedActorFailureSpec { val readFromStore = read(persistenceId, fromSequenceNr, toSequenceNr, max) if (readFromStore.isEmpty) Future.successful(()) - else if (isCorrupt(readFromStore)) + else if (isCorrupt(readFromStore.map(_._1))) Future.failed(new SimulatedException(s"blahonga $fromSequenceNr $toSequenceNr")) else { - readFromStore.foreach(recoveryCallback) + readFromStore.map(_._1).foreach(recoveryCallback) Future.successful(()) } } diff --git a/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala b/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala index 1381222971..393c5c4ba9 100644 --- a/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala +++ b/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala @@ -66,11 +66,11 @@ class ChaosJournal extends AsyncWriteJournal { replayCallback: (PersistentRepr) => Unit): Future[Unit] = if (shouldFail(replayFailureRate)) { val rm = read(persistenceId, fromSequenceNr, toSequenceNr, max) - val sm = rm.take(random.nextInt(rm.length + 1)) + val sm = rm.take(random.nextInt(rm.length + 1)).map(_._1) sm.foreach(replayCallback) Future.failed(new ReplayFailedException(sm)) } else { - read(persistenceId, fromSequenceNr, toSequenceNr, max).foreach(replayCallback) + read(persistenceId, fromSequenceNr, toSequenceNr, max).map(_._1).foreach(replayCallback) Future.successful(()) } diff --git a/build.sbt b/build.sbt index 0e151f8fc4..72bc671395 100644 --- a/build.sbt +++ b/build.sbt @@ -314,6 +314,13 @@ lazy val persistenceTestkit = akkaModule("akka-persistence-testkit") .settings(AutomaticModuleName.settings("akka.persistence.testkit")) .disablePlugins(MimaPlugin) +lazy val persistenceTypedTests = akkaModule("akka-persistence-typed-tests") + .dependsOn(persistenceTyped, persistenceTestkit % "test", actorTestkitTyped % "test", jackson % "test->test") + .settings(AkkaBuild.mayChangeSettings) + .settings(Dependencies.persistenceTypedTests) + .disablePlugins(MimaPlugin) + .enablePlugins(NoPublish) + lazy val protobuf = akkaModule("akka-protobuf") .settings(OSGi.protobuf) .settings(AutomaticModuleName.settings("akka.protobuf")) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 0072a183c4..884fbae40a 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -255,6 +255,8 @@ object Dependencies { val persistenceTestKit = l ++= Seq(Test.scalatest, Test.logback) + val persistenceTypedTests = l ++= Seq(Test.scalatest, Test.logback) + val persistenceShared = l ++= Seq(Provided.levelDB, Provided.levelDBNative, Test.logback) val jackson = l ++= Seq(